Skip to main content

Documentation Index

Fetch the complete documentation index at: https://docs.praison.ai/llms.txt

Use this file to discover all available pages before exploring further.

A2AClient enables PraisonAI applications to discover and interact with remote A2A agents through an async HTTP interface.

Quick Start

1

Simple Usage

Connect to an A2A server and send a message:
import asyncio
from praisonaiagents.ui.a2a import A2AClient

async def main():
    """Connect to a local A2A server, show the agent's name, and send one message."""
    server = A2AClient("http://localhost:8000")
    async with server as client:
        # The agent card describes the remote agent; only its name is shown here.
        agent_card = await client.get_agent_card()
        print(f"Agent: {agent_card.name}")

        # The reply from send_message is printed as returned.
        reply = await client.send_message("What are AI trends?")
        print(reply)

asyncio.run(main())
2

With Authentication

Use bearer token authentication:
import asyncio
import os

from praisonaiagents.ui.a2a import A2AClient


async def main():
    """Send one message to an A2A server using bearer-token authentication."""
    async with A2AClient(
        base_url="http://localhost:8000",
        # Read the token from the environment rather than hard-coding a secret.
        auth_token=os.getenv("A2A_AUTH_TOKEN"),
        timeout=30.0,
    ) as client:
        result = await client.send_message("Hello!")
        print(result)


# `async with` is only valid inside a coroutine, so run it through asyncio.run().
asyncio.run(main())

How It Works

| Step | Description |
|------|-------------|
| Discovery | Fetch agent capabilities via Agent Card |
| Authentication | Optional Bearer token validation |
| Messaging | Send messages using JSON-RPC protocol |
| Task Management | Track, list, and cancel tasks |

Configuration Options

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| base_url | str | — | Base URL of the A2A server |
| auth_token | str | None | Bearer token for authentication |
| timeout | float | 30.0 | HTTP request timeout in seconds |

Discovery Methods

Get Agent Card

Fetch basic agent information from /.well-known/agent.json:
async with A2AClient("http://localhost:8000") as client:
    card = await client.get_agent_card()
    
    print(card.name)           # "Research Assistant"
    print(card.capabilities)   # AgentCapabilities(streaming=True, ...)
    print(card.skills)         # [AgentSkill(name="search_web", ...)]

Get Extended Card

Fetch detailed agent information (requires authentication):
client = A2AClient("http://localhost:8000", auth_token="sk-secret")
extended = await client.get_extended_card()
print(extended.advanced_features)

Message Methods

Send Message

Send a text message and get the complete task result:
# Basic message
result = await client.send_message("Explain quantum computing")

# Access task details
task = result["result"]
print(task["status"]["state"])  # "completed"
print(task["artifacts"])         # Agent response artifacts
Multi-turn conversations with context:
# First message
result1 = await client.send_message("What is Python?")
context_id = result1["result"]["contextId"]

# Follow-up message
result2 = await client.send_message(
    "What about its type system?",
    context_id=context_id
)

Streaming Messages

Send a message and receive real-time Server-Sent Events:
async for event in client.send_message_streaming("Write a haiku"):
    if "result" in event:
        task = event["result"]
        
        # Check task status updates
        if "status" in task:
            print(f"Status: {task['status']['state']}")
        
        # Process response chunks
        if "artifact" in task:
            for part in task["artifact"]["parts"]:
                print(part.get("text", ""))

Task Management

Get Task Details

task = await client.get_task("task-uuid-123")
print(task["result"]["status"]["state"])  # "completed", "running", "failed"

List Tasks

# List all tasks
all_tasks = await client.list_tasks()
print(f"Total tasks: {len(all_tasks['result'])}")

# List tasks for specific conversation
ctx_tasks = await client.list_tasks(context_id="ctx-uuid-123")

Cancel Task

cancelled = await client.cancel_task("task-uuid-123")
print(cancelled["result"]["status"]["state"])  # "cancelled"

Context Manager Pattern

Recommended approach with automatic session cleanup:
# Auto-closes HTTP session
async with A2AClient("http://localhost:8000") as client:
    card = await client.get_agent_card()
    result = await client.send_message("Hello!")
Manual management when needed:
client = A2AClient("http://localhost:8000")
try:
    result = await client.send_message("Hello!")
finally:
    await client.close()  # Must call explicitly

Agent-to-Agent Communication

Complete example showing agent-to-agent delegation:
import asyncio
from praisonaiagents import Agent
from praisonaiagents.ui.a2a import A2A, A2AClient

# Start remote agent server (separate process)
# agent = Agent(name="Expert", role="AI Expert", goal="Answer questions")
# a2a = A2A(agent=agent)
# a2a.serve(port=8001)

async def main():
    """Act as a client agent: discover a remote A2A agent, delegate a task, then stream a reply."""
    async with A2AClient("http://localhost:8001") as remote:
        # Discovery: read the remote agent's card.
        agent_card = await remote.get_agent_card()
        print(f"Connected to: {agent_card.name}")

        # Delegation: one request/response round trip.
        response = await remote.send_message("What are the top AI trends?")
        delegated = response["result"]
        print(f"Status: {delegated['status']['state']}")

        # Streaming: print text parts as they arrive, skipping events
        # that carry no artifact.
        async for event in remote.send_message_streaming("Write a detailed report"):
            payload = event.get("result", {})
            if "artifact" not in payload:
                continue
            for part in payload["artifact"]["parts"]:
                print(part.get("text", ""), end="")

asyncio.run(main())

Error Handling

Handle common HTTP and connection errors:
import aiohttp

try:
    result = await client.send_message("Hello")
    
except aiohttp.ClientResponseError as e:
    if e.status == 401:
        print("Authentication required")
    elif e.status == 404:
        print("A2A endpoint not found")
    elif e.status == 500:
        print("Server error")
        
except aiohttp.ClientError as e:
    print(f"Connection error: {e}")
    
except asyncio.TimeoutError:
    print("Request timed out")

Common Patterns

Always fetch the agent card first to understand capabilities before sending messages.
async with A2AClient(url) as client:
    card = await client.get_agent_card()
    
    # Check if streaming is supported
    if card.capabilities.streaming:
        # Use streaming for long responses
        async for event in client.send_message_streaming(text):
            process_event(event)
    else:
        # Use regular messaging
        result = await client.send_message(text)
Maintain conversation context across multiple message exchanges.
context_id = None

async with A2AClient(url) as client:
    # Start conversation
    result1 = await client.send_message("Hello", context_id=context_id)
    context_id = result1["result"]["contextId"]
    
    # Continue conversation  
    result2 = await client.send_message("Follow up", context_id=context_id)
Monitor long-running tasks with periodic status checks.
async def monitor_task(client, task_id, poll_interval=1.0, timeout=None):
    """Poll a task until it reaches a terminal state and return the last response.

    Args:
        client: Object exposing an awaitable ``get_task(task_id)`` that returns
            a dict shaped like ``{"result": {"status": {"state": ...}}}``.
        task_id: Identifier of the task to watch.
        poll_interval: Seconds to sleep between status checks (default 1).
        timeout: Maximum total seconds to wait; ``None`` (default) waits
            indefinitely, matching the original behaviour.

    Returns:
        The full ``get_task`` response observed when the task finished.

    Raises:
        asyncio.TimeoutError: If ``timeout`` elapses before the task finishes.
    """
    # Both spellings of the cancel state are accepted, and "rejected" is also
    # final; without them a task ending in those states would be polled forever.
    terminal_states = {"completed", "failed", "cancelled", "canceled", "rejected"}

    async def _poll_until_done():
        while True:
            task = await client.get_task(task_id)
            if task["result"]["status"]["state"] in terminal_states:
                return task
            await asyncio.sleep(poll_interval)

    # wait_for with timeout=None simply awaits the coroutine to completion.
    return await asyncio.wait_for(_poll_until_done(), timeout)
Process multiple tasks concurrently using asyncio.
async def batch_messages(client, messages):
    """Send every message concurrently and return the replies in input order."""
    # Each send is scheduled as its own task up front so they run concurrently.
    pending = [asyncio.create_task(client.send_message(text)) for text in messages]
    return await asyncio.gather(*pending)

Best Practices

Always use the context manager pattern to ensure proper session cleanup and avoid connection leaks.
# ✅ Good - automatic cleanup
async with A2AClient(url) as client:
    result = await client.send_message("Hello")

# ❌ Bad - manual cleanup required
client = A2AClient(url)
result = await client.send_message("Hello")
# Session not closed!
Implement retry logic for transient network errors and timeouts.
import asyncio

import aiohttp
from tenacity import (
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential,
)

# Error types treated as transient; anything else fails immediately.
_TRANSIENT_ERRORS = (aiohttp.ClientError, asyncio.TimeoutError)


@retry(
    retry=retry_if_exception_type(_TRANSIENT_ERRORS),
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10),
)
async def robust_send_message(client, message):
    """Send a message, retrying up to three attempts on transient failures.

    Only aiohttp client errors and timeouts trigger a retry, with exponential
    backoff between attempts. Other exceptions propagate on the first
    failure. Once the attempts are exhausted, tenacity raises its own
    RetryError wrapping the last failure.
    """
    try:
        return await client.send_message(message)
    except _TRANSIENT_ERRORS as e:
        # Log and re-raise so the retry decorator sees the failure.
        print(f"Retry after error: {e}")
        raise
Store auth tokens securely and never expose them in logs or error messages.
import os

# ✅ Good - from environment
auth_token = os.getenv("A2A_AUTH_TOKEN")
client = A2AClient(url, auth_token=auth_token)

# ❌ Bad - hardcoded token
client = A2AClient(url, auth_token="sk-hardcoded-secret")
Handle streaming events gracefully with proper error handling and backpressure.
async def process_stream(client, message):
    """Consume a streaming reply, passing each event's result to process_event_chunk.

    Cancellation is reported and re-raised. Any other exception is reported
    and suppressed, so the coroutine then finishes without a return value.
    """
    try:
        async for event in client.send_message_streaming(message):
            if "result" not in event:
                continue
            # Each chunk is handled on arrival rather than accumulated.
            process_event_chunk(event["result"])

    except asyncio.CancelledError:
        print("Stream cancelled")
        raise
    except Exception as exc:
        print(f"Stream error: {exc}")
        # Handle gracefully

A2A Protocol

Setup A2A servers and protocol basics

Multi-Agent Patterns

Agent orchestration and communication patterns