import asynciofrom praisonaiagents.ui.a2a import A2AClientasync def main(): async with A2AClient("http://localhost:8000") as client: # Discover agent capabilities card = await client.get_agent_card() print(f"Agent: {card.name}") # Send a message result = await client.send_message("What are AI trends?") print(result)asyncio.run(main())
2
With Authentication
Use bearer token authentication:
from praisonaiagents.ui.a2a import A2AClientasync with A2AClient( base_url="http://localhost:8000", auth_token="sk-secret", timeout=30.0) as client: result = await client.send_message("Hello!") print(result)
# First messageresult1 = await client.send_message("What is Python?")context_id = result1["result"]["contextId"]# Follow-up messageresult2 = await client.send_message( "What about its type system?", context_id=context_id)
Send a message and receive real-time Server-Sent Events:
async for event in client.send_message_streaming("Write a haiku"): if "result" in event: task = event["result"] # Check task status updates if "status" in task: print(f"Status: {task['status']['state']}") # Process response chunks if "artifact" in task: for part in task["artifact"]["parts"]: print(part.get("text", ""))
# List all tasksall_tasks = await client.list_tasks()print(f"Total tasks: {len(all_tasks['result'])}")# List tasks for specific conversationctx_tasks = await client.list_tasks(context_id="ctx-uuid-123")
Complete example showing agent-to-agent delegation:
import asynciofrom praisonaiagents import Agentfrom praisonaiagents.ui.a2a import A2A, A2AClient# Start remote agent server (separate process)# agent = Agent(name="Expert", role="AI Expert", goal="Answer questions")# a2a = A2A(agent=agent)# a2a.serve(port=8001)async def main(): # Client agent calls remote agent async with A2AClient("http://localhost:8001") as remote: # Discover capabilities card = await remote.get_agent_card() print(f"Connected to: {card.name}") # Delegate task result = await remote.send_message("What are the top AI trends?") task = result["result"] print(f"Status: {task['status']['state']}") # Stream long responses async for event in remote.send_message_streaming("Write a detailed report"): if "artifact" in event.get("result", {}): for part in event["result"]["artifact"]["parts"]: print(part.get("text", ""), end="")asyncio.run(main())
Always fetch the agent card first to understand capabilities before sending messages.
async with A2AClient(url) as client: card = await client.get_agent_card() # Check if streaming is supported if card.capabilities.streaming: # Use streaming for long responses async for event in client.send_message_streaming(text): process_event(event) else: # Use regular messaging result = await client.send_message(text)
Context Preservation
Maintain conversation context across multiple message exchanges.
Monitor long-running tasks with periodic status checks.
async def monitor_task(client, task_id): while True: task = await client.get_task(task_id) state = task["result"]["status"]["state"] if state in ["completed", "failed", "cancelled"]: break await asyncio.sleep(1) # Poll every second return task
Batch Operations
Process multiple tasks concurrently using asyncio.
Always use the context manager pattern to ensure proper session cleanup and avoid connection leaks.
# ✅ Good - automatic cleanupasync with A2AClient(url) as client: result = await client.send_message("Hello")# ❌ Bad - manual cleanup requiredclient = A2AClient(url)result = await client.send_message("Hello")# Session not closed!
Error Recovery
Implement retry logic for transient network errors and timeouts.