Push notifications deliver real-time messages from a PraisonAI gateway to subscribed clients over WebSocket (with HTTP polling fallback).
PushClient requires the praisonai wrapper package (pip install praisonai). The core praisonaiagents package ships only the push protocols and the ChannelMessage model; the concrete client and transport implementations live in the wrapper. You can import PushClient either through the lazy re-export (from praisonaiagents.push import PushClient) or directly (from praisonai.push import PushClient).

Quick Start

1. Install Prerequisites

PushClient and its transports ship in the praisonai wrapper package. Install it before importing:
pip install praisonai
The core praisonaiagents package contains only the push protocols and the ChannelMessage model. The concrete PushClient, WebSocketTransport, and PollingTransport classes live in the wrapper.

2. Enable Push on Gateway

from praisonaiagents import GatewayConfig, PushConfig

config = GatewayConfig(
    push=PushConfig(enabled=True)
)

3. Connect PushClient

from praisonaiagents.push import PushClient

client = PushClient("ws://localhost:8765/ws", auth_token="my-token")
await client.connect()

4. Subscribe and Receive Messages

@client.on("channel_message")
async def on_message(msg):
    print(f"Received on {msg.channel}: {msg.data}")

await client.subscribe("alerts")
await client.wait_closed()

Agent-Centric Example

from praisonaiagents import Agent
from praisonaiagents.push import PushClient

agent = Agent(
    name="alerts-agent",
    instructions="Watch the 'alerts' channel and summarise incoming events"
)

client = PushClient("ws://localhost:8765/ws", auth_token="my-token")
await client.connect()

@client.on("channel_message")
async def on_msg(msg):
    summary = agent.start(f"Summarise this alert: {msg.data}")
    print(summary)

await client.subscribe("alerts")
await client.wait_closed()

Import Paths

Two equivalent import paths are supported:
# Recommended (explicit wrapper import)
from praisonai.push import PushClient, WebSocketTransport, PollingTransport

# Backward-compatible (lazy re-export from core)
from praisonaiagents.push import PushClient, WebSocketTransport, PollingTransport
Both resolve to the same classes in the praisonai wrapper package. The praisonaiagents.push path is kept for backward compatibility and raises a clear ImportError if the praisonai wrapper is not installed.

On its own, the core praisonaiagents.push module exports only protocol-level types:
from praisonaiagents.push import ChannelMessage           # dataclass
from praisonaiagents.push import PushTransportProtocol    # typing.Protocol
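
To make the lazy re-export behaviour concrete, here is a minimal sketch of one way such a re-export can be built, using the module-level `__getattr__` hook from PEP 562. This is an illustration only and is not taken from the PraisonAI source; `make_lazy_module` and the module names passed to it are hypothetical.

```python
import importlib
import types


def make_lazy_module(name, wrapper_module, lazy_names):
    """Build a module whose listed attributes are imported from wrapper_module on first access.

    Hypothetical helper for illustration; not part of praisonaiagents.
    """
    module = types.ModuleType(name)

    def lazy_getattr(attr):
        if attr not in lazy_names:
            raise AttributeError(f"module {name!r} has no attribute {attr!r}")
        try:
            wrapper = importlib.import_module(wrapper_module)
        except ImportError as exc:
            # Surface a clear message instead of a bare "No module named ..."
            raise ImportError(
                f"{attr} requires the {wrapper_module!r} package, which is not installed"
            ) from exc
        value = getattr(wrapper, attr)
        setattr(module, attr, value)  # cache so later lookups skip the hook
        return value

    # PEP 562: a module's __getattr__ is consulted only for missing attributes
    module.__getattr__ = lazy_getattr
    return module
```

With this shape, importing the core module stays cheap, and the wrapper is only loaded (or the ImportError raised) when a wrapper-backed name is first accessed.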

How It Works

| Component | Purpose |
| --- | --- |
| PushClient | Client SDK with auto-reconnect and transport fallback |
| WebSocket | Primary real-time transport |
| HTTP Polling | Fallback transport for restricted networks |
| Channels | Named message streams for pub/sub |
| Presence | Track online/offline status of clients |
| Delivery Guarantees | At-least-once message delivery with ACKs |
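
At-least-once delivery means a message can arrive more than once, for example when an ACK is lost and the gateway retries. Handlers that trigger side effects may therefore want to skip duplicates. The sketch below shows one generic way to do that with a bounded set of recently seen ids. The `Deduplicator` class is hypothetical, and it assumes each message carries some unique identifier; this page does not document which ChannelMessage field (if any) provides one.

```python
from collections import OrderedDict


class Deduplicator:
    """Remember recently seen message ids so redelivered messages can be skipped.

    Hypothetical helper; the id you pass in is whatever unique key your messages carry.
    """

    def __init__(self, max_size=1000):
        self.max_size = max_size
        self._seen = OrderedDict()

    def is_new(self, message_id):
        if message_id in self._seen:
            self._seen.move_to_end(message_id)  # refresh recency on a repeat
            return False
        self._seen[message_id] = True
        if len(self._seen) > self.max_size:
            self._seen.popitem(last=False)  # evict the oldest id
        return True
```

Inside a channel_message handler, a guard such as `if not dedup.is_new(some_id): return` would then make redeliveries harmless.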

Configuration Options

PushConfig

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| enabled | bool | False | Feature toggle (push is opt-in; zero overhead when disabled) |
| redis | Optional[RedisConfig] | None | Redis config for cross-server scaling |
| presence | PresenceConfig | PresenceConfig() | Presence tracking settings |
| delivery | DeliveryConfig | DeliveryConfig() | Delivery guarantee settings |
| polling | PollingConfig | PollingConfig() | Polling fallback settings |

RedisConfig

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| url | Optional[str] | None | Full Redis URL (takes precedence over host/port) |
| host | str | "localhost" | Redis host |
| port | int | 6379 | Redis port |
| db | int | 0 | Redis database number |
| password | Optional[str] | None | Redis password |
| prefix | str | "praison:push:" | Key prefix namespace |
| max_connections | int | 20 | Connection pool size |
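
The precedence rule for url can be illustrated with a small sketch. `RedisSettings` is a local stand-in that mirrors some of the documented fields (it is not the library's RedisConfig), and `effective_url` is a hypothetical helper; how the gateway actually combines these fields internally is an assumption here.

```python
from dataclasses import dataclass
from typing import Optional
from urllib.parse import quote


@dataclass
class RedisSettings:
    """Local mirror of documented RedisConfig fields, for illustration only."""
    url: Optional[str] = None
    host: str = "localhost"
    port: int = 6379
    db: int = 0
    password: Optional[str] = None


def effective_url(cfg: RedisSettings) -> str:
    """Return cfg.url when set; otherwise assemble a redis:// URL from the parts."""
    if cfg.url:
        return cfg.url  # a full URL wins over host/port/db/password
    # Percent-encode the password so characters like '@' do not break the URL
    auth = f":{quote(cfg.password, safe='')}@" if cfg.password else ""
    return f"redis://{auth}{cfg.host}:{cfg.port}/{cfg.db}"
```

Under this assumption, setting both url and host means the host value is simply ignored.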

PresenceConfig

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| enabled | bool | True | Toggle presence tracking |
| heartbeat_interval | int | 15 | Expected heartbeat frequency (seconds) |
| offline_timeout | int | 45 | Mark offline after this many seconds without heartbeat |
| broadcast_changes | bool | True | Broadcast presence changes to subscribed channels |
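
The relationship between heartbeat_interval and offline_timeout is easier to tune with the arithmetic written out. The sketch below is an illustration of the documented semantics, not the gateway's code; both function names are hypothetical, and whether the real comparison is strict or inclusive is an assumption.

```python
def missed_heartbeats_allowed(heartbeat_interval: int, offline_timeout: int) -> int:
    """How many whole heartbeat intervals fit inside the offline timeout."""
    return offline_timeout // heartbeat_interval


def is_offline(last_heartbeat: float, now: float, offline_timeout: int) -> bool:
    """True once more than offline_timeout seconds have passed since the last heartbeat."""
    return now - last_heartbeat > offline_timeout
```

With the defaults (15 and 45), a client can miss three consecutive heartbeats before being marked offline. Keeping offline_timeout at a multiple of heartbeat_interval preserves that tolerance when you change either value.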

DeliveryConfig

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| enabled | bool | True | Toggle delivery guarantees |
| ack_timeout | int | 30 | Seconds to wait for ACK before retrying |
| max_retries | int | 3 | Maximum retry attempts |
| retry_backoff | float | 2.0 | Exponential backoff multiplier |
| message_ttl | int | 86400 | How long to retain unacknowledged messages (seconds) |
| store_backend | str | "memory" | "memory" or "redis" |
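
To get a feel for how ack_timeout, max_retries, and retry_backoff interact, the following sketch computes a retry schedule. It assumes the wait before retry n is ack_timeout multiplied by retry_backoff raised to n; the exact formula the gateway uses is not stated on this page, so treat both the formula and the `retry_schedule` name as assumptions.

```python
def retry_schedule(ack_timeout=30, max_retries=3, retry_backoff=2.0):
    """Return the assumed wait in seconds before each retry attempt.

    Assumption: wait_n = ack_timeout * retry_backoff ** n for n = 0 .. max_retries - 1.
    """
    return [ack_timeout * retry_backoff ** attempt for attempt in range(max_retries)]


schedule = retry_schedule()
print(schedule)       # [30.0, 60.0, 120.0]
print(sum(schedule))  # 210.0
```

Under that assumption, the defaults give up on a message after roughly three and a half minutes of unacknowledged retries, well inside the default message_ttl of 86400 seconds.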

PollingConfig

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| enabled | bool | True | Toggle polling fallback |
| long_poll_timeout | int | 30 | Long-poll hang duration (seconds) |
| max_batch_size | int | 100 | Max messages per poll response |
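
The two polling settings describe a standard long-poll loop: hold the request open until a message arrives or the timeout expires, then return at most one batch. The sketch below models that with an asyncio.Queue standing in for a client's pending messages. It is a generic illustration under that assumption, not the gateway's implementation, and `long_poll` is a hypothetical name.

```python
import asyncio


async def long_poll(queue, long_poll_timeout=30.0, max_batch_size=100):
    """Wait up to long_poll_timeout for a first message, then drain up to max_batch_size."""
    try:
        first = await asyncio.wait_for(queue.get(), timeout=long_poll_timeout)
    except asyncio.TimeoutError:
        return []  # nothing arrived: respond empty so the client can poll again
    batch = [first]
    # Collect whatever else is already waiting, without blocking further
    while len(batch) < max_batch_size and not queue.empty():
        batch.append(queue.get_nowait())
    return batch
```

A larger long_poll_timeout reduces request churn on idle channels, while max_batch_size caps response size when a client has fallen behind.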

Custom Transports (Advanced)

Implement PushTransportProtocol to plug in a custom transport (e.g. SSE, gRPC):
from praisonaiagents.push import PushTransportProtocol

class MyTransport:
    @property
    def is_connected(self) -> bool: ...
    async def connect(self) -> None: ...
    async def disconnect(self) -> None: ...
    async def send(self, data: dict) -> None: ...
    async def receive(self) -> dict: ...

| Method / Property | Purpose |
| --- | --- |
| is_connected (property) | Whether the transport is currently connected |
| connect() | Establish the underlying connection |
| disconnect() | Close the connection |
| send(data) | Send a JSON-serialisable dict to the gateway |
| receive() | Await and return the next JSON message from the gateway |
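
As a worked example of the method set above, here is a minimal in-memory transport that echoes every sent dict back through receive(), which can be handy in unit tests. To keep the sketch runnable without praisonaiagents installed, it declares a local `TransportShape` protocol mirroring the documented members instead of importing PushTransportProtocol. Both `TransportShape` and `LoopbackTransport` are hypothetical names, and how a custom transport is handed to PushClient is not covered here.

```python
import asyncio
from typing import Optional, Protocol, runtime_checkable


@runtime_checkable
class TransportShape(Protocol):
    """Local mirror of the documented members (stand-in for PushTransportProtocol)."""

    @property
    def is_connected(self) -> bool: ...
    async def connect(self) -> None: ...
    async def disconnect(self) -> None: ...
    async def send(self, data: dict) -> None: ...
    async def receive(self) -> dict: ...


class LoopbackTransport:
    """Echo transport: whatever is sent becomes the next received message."""

    def __init__(self) -> None:
        self._inbox: Optional[asyncio.Queue] = None

    @property
    def is_connected(self) -> bool:
        return self._inbox is not None

    async def connect(self) -> None:
        # Create the queue here so it belongs to the running event loop
        self._inbox = asyncio.Queue()

    async def disconnect(self) -> None:
        self._inbox = None

    async def send(self, data: dict) -> None:
        if self._inbox is None:
            raise ConnectionError("transport is not connected")
        await self._inbox.put(data)

    async def receive(self) -> dict:
        if self._inbox is None:
            raise ConnectionError("transport is not connected")
        return await self._inbox.get()
```

Because the protocol is structural, LoopbackTransport does not need to inherit from anything; matching the member names is enough for `isinstance(LoopbackTransport(), TransportShape)` to hold.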

Common Patterns

Publish/Subscribe

from praisonaiagents.push import PushClient

# Subscriber: register the handler and subscribe first, so the event
# published below is not sent before anyone is listening
subscriber = PushClient("ws://localhost:8765/ws")
await subscriber.connect()

@subscriber.on("channel_message")
async def handle_event(msg):
    print(f"Event: {msg.data}")

await subscriber.subscribe("events")

# Publisher
publisher = PushClient("ws://localhost:8765/ws")
await publisher.connect()
await publisher.publish("events", {"type": "user_login", "user_id": 123})

One-shot Wait

import asyncio

from praisonaiagents.push import PushClient

client = PushClient("ws://localhost:8765/ws")
await client.connect()

# Wait for next message on channel with timeout
try:
    msg = await client.wait_for("notifications", timeout=60.0)
    print(f"Got notification: {msg.data}")
except asyncio.TimeoutError:
    print("No notification received within 60 seconds")

Presence Tracking

from praisonaiagents.push import PushClient

client = PushClient("ws://localhost:8765/ws")
await client.connect()

# Set your status
await client.set_status("available", {"role": "agent"})

# Query who's online
@client.on("presence_list")
async def on_presence(data):
    for user in data.get("clients", []):
        print(f"{user['client_id']}: {user['status']}")

await client.get_presence()

Best Practices

Enable Redis when running multiple gateway servers that need to share push channels:
from praisonaiagents import GatewayConfig, PushConfig, RedisConfig

config = GatewayConfig(
    push=PushConfig(
        enabled=True,
        redis=RedisConfig(
            host="redis.example.com",
            password="your-redis-password"
        )
    )
)
Without Redis, channels only exist on a single gateway instance.
Use the memory store backend for low-latency scenarios where message loss on restart is acceptable:
delivery=DeliveryConfig(store_backend="memory")
Use the redis store backend for high-availability scenarios where messages must survive server restarts:
delivery=DeliveryConfig(store_backend="redis")
Adjust the presence heartbeat interval and offline timeout to match network conditions and the responsiveness you need:
# Fast networks, quick presence updates
presence=PresenceConfig(
    heartbeat_interval=10,
    offline_timeout=30
)

# Slow networks, conservative timeouts
presence=PresenceConfig(
    heartbeat_interval=30,
    offline_timeout=90
)
The polling fallback keeps clients reachable behind corporate firewalls and on mobile networks that block WebSocket connections:
# Ensure fallback is enabled (default)
client = PushClient(
    "ws://localhost:8765/ws",
    fallback_to_polling=True
)
Clients automatically switch to HTTP polling if the WebSocket connection fails.

Choosing a Transport


Gateway

The gateway that hosts push notification channels

A2A Push Notifications

Webhook-based task updates (different from real-time channels)