Event Bus Module
The Event Bus provides a typed, publish-subscribe event system for PraisonAI Agents. It enables decoupled communication between components with support for both synchronous and asynchronous subscribers.
Features
- Typed Events - Predefined event types for common operations
- Sync/Async Subscribers - Support for both synchronous and async handlers
- Event Filtering - Subscribe to specific event types
- Event History - Optional event history tracking
- Global Default Bus - Shared bus instance for application-wide events
Installation
The Event Bus is included in the core praisonaiagents package:
pip install praisonaiagents
Quick Start
from praisonaiagents.bus import EventBus, Event, EventType

# Create an event bus
bus = EventBus()

# Subscribe to events
def on_message(event):
    print(f"Received: {event.data}")

bus.subscribe(on_message, event_type=EventType.MESSAGE_CREATED)

# Publish an event
bus.publish(Event(
    type=EventType.MESSAGE_CREATED,
    data={"text": "Hello, World!"}
))
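According to the API Reference, `subscribe` returns a subscription ID and `unsubscribe` takes that ID and returns a boolean. The sketch below shows one way such a contract could behave, using a stand-alone stand-in class. `SubscriptionSketch` is a hypothetical name, not part of praisonaiagents, and the meaning of the boolean (whether a matching subscription was found) is an assumption.

```python
import uuid
from typing import Any, Callable, Dict


class SubscriptionSketch:
    """Hypothetical stand-in illustrating a subscribe/unsubscribe contract."""

    def __init__(self) -> None:
        # Subscription ID -> callback
        self._subscribers: Dict[str, Callable[[Any], None]] = {}

    def subscribe(self, callback: Callable[[Any], None]) -> str:
        # Hand back an opaque ID the caller can keep for later removal.
        subscription_id = uuid.uuid4().hex
        self._subscribers[subscription_id] = callback
        return subscription_id

    def unsubscribe(self, subscription_id: str) -> bool:
        # Assumption: True means a subscription with this ID existed.
        return self._subscribers.pop(subscription_id, None) is not None

    def publish(self, event: Any) -> None:
        # Iterate over a copy so callbacks may unsubscribe during dispatch.
        for callback in list(self._subscribers.values()):
            callback(event)


sketch_bus = SubscriptionSketch()
received = []
sub_id = sketch_bus.subscribe(received.append)

sketch_bus.publish("first")                     # delivered
removed = sketch_bus.unsubscribe(sub_id)        # subscription found and removed
sketch_bus.publish("second")                    # no subscribers left
removed_again = sketch_bus.unsubscribe(sub_id)  # ID is no longer known
```

Keeping the returned ID is what makes it possible to detach a handler later, for example when a component shuts down.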
Event Types
The following event types are available:
| Event Type | Description |
|---|---|
| SESSION_CREATED | New session created |
| SESSION_UPDATED | Session modified |
| SESSION_DELETED | Session deleted |
| SESSION_FORKED | Session forked |
| MESSAGE_CREATED | New message added |
| TOOL_STARTED | Tool execution started |
| TOOL_COMPLETED | Tool execution completed |
| AGENT_STARTED | Agent execution started |
| AGENT_COMPLETED | Agent execution completed |
| SNAPSHOT_CREATED | File snapshot created |
| COMPACTION_COMPLETED | Context compaction done |
| CUSTOM | Custom event type |
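To illustrate how filtering by event type might work with an enumeration like the one listed above, here is a minimal stand-alone sketch. `SketchEventType`, `register`, and `dispatch` are hypothetical names introduced for illustration, and only three of the listed types are mirrored. This is not the library's implementation.

```python
from enum import Enum, auto
from typing import Callable, Dict, List


class SketchEventType(Enum):
    """Hypothetical stand-in mirroring three of the event types listed above."""
    MESSAGE_CREATED = auto()
    TOOL_STARTED = auto()
    TOOL_COMPLETED = auto()


# Event type -> handlers registered for that type
handlers: Dict[SketchEventType, List[Callable[[dict], None]]] = {}


def register(event_type: SketchEventType, handler: Callable[[dict], None]) -> None:
    handlers.setdefault(event_type, []).append(handler)


def dispatch(event_type: SketchEventType, payload: dict) -> int:
    """Call only the handlers registered for this type; return how many ran."""
    matching = handlers.get(event_type, [])
    for handler in matching:
        handler(payload)
    return len(matching)


seen: List[dict] = []
register(SketchEventType.TOOL_COMPLETED, seen.append)

# No handler is registered for TOOL_STARTED, so nothing is delivered.
ignored = dispatch(SketchEventType.TOOL_STARTED, {"tool": "bash"})
# The TOOL_COMPLETED handler receives this payload.
delivered = dispatch(SketchEventType.TOOL_COMPLETED, {"tool": "bash", "result": "success"})
```

Using an enumeration rather than free-form strings means a misspelled event name fails at attribute lookup instead of silently matching no subscribers.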
API Reference
EventBus
class EventBus:
    def subscribe(
        self,
        callback: Callable[[Event], None],
        event_type: Optional[EventType] = None
    ) -> str:
        """Subscribe to events. Returns subscription ID."""

    def unsubscribe(self, subscription_id: str) -> bool:
        """Unsubscribe from events."""

    def publish(self, event: Event) -> None:
        """Publish an event to all subscribers."""

    async def publish_async(self, event: Event) -> None:
        """Publish an event asynchronously."""

    def get_history(self, limit: int = 100) -> List[Event]:
        """Get recent event history."""
Event
@dataclass
class Event:
    type: EventType        # Event type
    data: Dict[str, Any]   # Event payload
    source: Optional[str]  # Source identifier
    timestamp: float       # Unix timestamp
    id: str                # Unique event ID
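The Quick Start constructs an `Event` with only `type` and `data`, which suggests the remaining fields are filled in automatically. The sketch below shows one way a dataclass could do that with `default_factory`. `SketchEvent` is a hypothetical stand-in, and the specific defaults (current time, random hex ID, `None` source) are assumptions rather than documented behaviour.

```python
import time
import uuid
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class SketchEvent:
    """Hypothetical stand-in with the same field names as Event."""
    type: str                                             # a plain string keeps the sketch self-contained
    data: Dict[str, Any] = field(default_factory=dict)    # fresh dict per instance
    source: Optional[str] = None                          # assumed to default to None
    timestamp: float = field(default_factory=time.time)   # Unix time at creation
    id: str = field(default_factory=lambda: uuid.uuid4().hex)  # unique per event


first = SketchEvent(type="custom", data={"index": 0})
second = SketchEvent(type="custom", data={"index": 1})
```

`default_factory` matters here because a plain default value would be evaluated once at class definition, giving every event the same timestamp and ID.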
Examples
Async Subscriber
import asyncio
from praisonaiagents.bus import EventBus, Event, EventType

bus = EventBus()

async def async_handler(event):
    await asyncio.sleep(0.1)
    print(f"Async received: {event.data}")

bus.subscribe(async_handler, event_type=EventType.TOOL_COMPLETED)

async def main():
    # Publish asynchronously; `await` is only valid inside a coroutine
    await bus.publish_async(Event(
        type=EventType.TOOL_COMPLETED,
        data={"tool": "bash", "result": "success"}
    ))

asyncio.run(main())
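As a rough picture of how a bus could serve synchronous and asynchronous subscribers from a single asynchronous publish call, the sketch below calls each handler and awaits whatever awaitables come back. `publish_to_all` is a hypothetical helper; how `publish_async` actually schedules handlers is not described here, so treat this as one possible approach.

```python
import asyncio
import inspect
from typing import Any, Callable, List, Tuple


async def publish_to_all(handlers: List[Callable[[Any], Any]], event: Any) -> None:
    """Call every handler; await the ones that returned an awaitable."""
    pending = []
    for handler in handlers:
        result = handler(event)  # sync handlers finish here
        if inspect.isawaitable(result):
            pending.append(result)  # async handlers return a coroutine
    if pending:
        # Run all async handlers concurrently and wait for them to finish.
        await asyncio.gather(*pending)


log: List[Tuple[str, str]] = []


def sync_handler(event: str) -> None:
    log.append(("sync", event))


async def async_handler(event: str) -> None:
    await asyncio.sleep(0)  # yield to the event loop once
    log.append(("async", event))


asyncio.run(publish_to_all([sync_handler, async_handler], "tool_completed"))
```

Checking the return value with `inspect.isawaitable` rather than inspecting the function up front also covers callables such as `functools.partial` wrappers around coroutine functions.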
Global Event Bus
from praisonaiagents.bus import get_default_bus, Event, EventType
# Get the global bus instance
bus = get_default_bus()
# All components can share this bus
bus.publish(Event(type=EventType.CUSTOM, data={"action": "startup"}))
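A shared default instance is commonly provided through a lazily created module-level singleton. The sketch below shows that pattern in isolation. `SharedBusSketch` and `get_sketch_default_bus` are hypothetical names, and whether `get_default_bus` uses a lock or lazy creation is an assumption.

```python
import threading
from typing import Callable, List, Optional


class SharedBusSketch:
    """Hypothetical stand-in for a bus object shared across components."""

    def __init__(self) -> None:
        self.subscribers: List[Callable] = []


_default_bus: Optional[SharedBusSketch] = None
_default_bus_lock = threading.Lock()


def get_sketch_default_bus() -> SharedBusSketch:
    """Create the shared instance on first use, then keep returning it."""
    global _default_bus
    with _default_bus_lock:  # avoid two threads creating separate instances
        if _default_bus is None:
            _default_bus = SharedBusSketch()
        return _default_bus


# Two independent callers end up with the same object.
bus_a = get_sketch_default_bus()
bus_b = get_sketch_default_bus()
bus_a.subscribers.append(print)
```

Because every caller receives the same object, a subscriber registered in one module sees events published from any other module that uses the shared instance.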
Event History
from praisonaiagents.bus import EventBus, Event, EventType

bus = EventBus(history_size=1000)

# Publish some events
for i in range(10):
    bus.publish(Event(type=EventType.CUSTOM, data={"index": i}))

# Get recent history
history = bus.get_history(limit=5)
print(f"Last 5 events: {len(history)}")
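A bounded history like the one configured by `history_size` can be pictured as a fixed-length queue that drops the oldest entry once it is full. The sketch below uses `collections.deque` for that. `HistorySketch` is a hypothetical class, and the ordering of results (oldest first) is an assumption, since the ordering returned by `get_history` is not specified here.

```python
from collections import deque
from typing import Any, Deque, List


class HistorySketch:
    """Hypothetical stand-in for bounded event history."""

    def __init__(self, history_size: int = 100) -> None:
        # deque(maxlen=...) silently discards the oldest item when full.
        self._history: Deque[Any] = deque(maxlen=history_size)

    def record(self, event: Any) -> None:
        self._history.append(event)

    def get_history(self, limit: int = 100) -> List[Any]:
        # Return at most `limit` of the newest events, oldest first.
        if limit <= 0:
            return []
        return list(self._history)[-limit:]


history_sketch = HistorySketch(history_size=3)
for i in range(5):
    history_sketch.record({"index": i})

# Only the three newest events (index 2, 3, 4) are still retained.
retained = history_sketch.get_history()
newest_two = history_sketch.get_history(limit=2)
```

The cap keeps memory use constant on long-running processes, at the cost of losing events older than the configured size.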
Integration with Agents
The Event Bus integrates with PraisonAI Agents to provide real-time notifications:
from praisonaiagents import Agent
from praisonaiagents.bus import get_default_bus, EventType

bus = get_default_bus()

# Monitor agent activity
bus.subscribe(
    lambda e: print(f"Agent started: {e.data}"),
    event_type=EventType.AGENT_STARTED
)
bus.subscribe(
    lambda e: print(f"Tool used: {e.data}"),
    event_type=EventType.TOOL_COMPLETED
)

# Agent will emit events during execution
agent = Agent(name="Assistant")
agent.chat("Hello!")