Run an agent on a recurring schedule with async-native execution, cooperative cancellation, and built-in retries.

Quick Start

1. Simple Usage

import asyncio
from praisonaiagents import Agent
from praisonai.async_agent_scheduler import AsyncAgentScheduler

# Migration note: this import will move to praisonai.scheduler.async_agent_scheduler
# See import paths section below

async def main():
    agent = Agent(
        name="NewsChecker",
        instructions="Summarise today's AI news in 3 bullet points.",
    )

    scheduler = AsyncAgentScheduler(agent, task="Check the latest AI news")
    await scheduler.start("hourly", max_retries=3, run_immediately=True)

    # ... run your app ...

    await scheduler.stop()
    print(await scheduler.get_stats())

asyncio.run(main())

2. With Callbacks

import asyncio
from praisonaiagents import Agent
from praisonai.async_agent_scheduler import AsyncAgentScheduler

def on_success(result):
    print(f"Agent completed successfully: {result}")

def on_failure(error):
    print(f"Agent failed: {error}")

async def main():
    agent = Agent(
        name="DataProcessor",
        instructions="Process incoming data efficiently",
    )

    scheduler = AsyncAgentScheduler(
        agent, 
        task="Process latest batch of data",
        on_success=on_success,
        on_failure=on_failure
    )

    # Run every 30 minutes
    await scheduler.start("*/30m", max_retries=3, run_immediately=True)

    # Keep running
    try:
        await asyncio.sleep(3600)  # Run for 1 hour
    finally:
        await scheduler.stop()
        stats = await scheduler.get_stats()
        print(f"Completed {stats['success_count']} successful executions")

asyncio.run(main())

How It Works

The AsyncAgentScheduler uses async-native execution with cooperative cancellation, replacing the old thread-based scheduler.
The scheduler’s async primitives (_stop_event, _cancel_event, _stats_lock) are now created lazily inside _ensure_async_primitives() and bound to the loop that start() runs on. Tests that call stop() without first calling start() must invoke scheduler._ensure_async_primitives() explicitly — see tests/unit/scheduler/test_async_agent_scheduler.py in PR #1583 for the canonical pattern.
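
To make cooperative cancellation concrete, here is a minimal sketch of an interval loop driven by an asyncio.Event. It is not PraisonAI's implementation: run_on_interval, job, and stop_event are hypothetical names, the retry handling is a simplification, and only the standard library is used.

```python
import asyncio

async def run_on_interval(job, interval, stop_event, max_retries=3):
    """Call `job` every `interval` seconds until `stop_event` is set.

    Hypothetical helper for illustration; not part of praisonai.
    """
    while not stop_event.is_set():
        # Retry the job a bounded number of times before giving up on this tick.
        for attempt in range(1, max_retries + 1):
            try:
                await job()
                break
            except Exception as exc:
                print(f"attempt {attempt}/{max_retries} failed: {exc}")
        # Sleep for the interval, but wake up early if a stop is requested.
        try:
            await asyncio.wait_for(stop_event.wait(), timeout=interval)
        except asyncio.TimeoutError:
            pass  # Interval elapsed without a stop request: run again.

async def demo():
    runs = []

    async def job():
        runs.append(len(runs))

    # The event is created inside the running loop, in the spirit of the
    # lazily created primitives described above.
    stop_event = asyncio.Event()
    task = asyncio.create_task(run_on_interval(job, 0.01, stop_event))
    await asyncio.sleep(0.05)
    stop_event.set()  # Cooperative: ask the loop to finish on its own.
    await asyncio.wait_for(task, timeout=1)
    return runs

runs = asyncio.run(demo())
```

Because the loop checks the event between runs and while sleeping, a stop request never interrupts a job halfway through; the loop simply does not start the next one.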

Schedule Expression Reference

| Expression | Interval | Description |
| --- | --- | --- |
| "daily" | 86400s | Every 24 hours |
| "hourly" | 3600s | Every hour |
| "*/30m" | 1800s | Every 30 minutes |
| "*/1h" | 3600s | Every 1 hour |
| "*/5s" | 5s | Every 5 seconds |
| "60" | 60s | Custom seconds (plain digits) |

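The mapping in the table above can be sketched as a small parser. This is an illustration only, not the library's actual parsing code: interval_seconds is a hypothetical name, and it accepts just the forms listed in the table (the two named shortcuts, plain digits, and */N with an s, m, or h suffix).

```python
import re

# Named shortcuts and unit multipliers taken from the table above.
NAMED = {"daily": 86400, "hourly": 3600}
UNITS = {"s": 1, "m": 60, "h": 3600}

def interval_seconds(expr: str) -> int:
    """Return the interval in seconds for a schedule expression (illustrative)."""
    expr = expr.strip()
    if expr in NAMED:
        return NAMED[expr]
    if expr.isdigit():
        return int(expr)  # Plain digits are treated as seconds.
    match = re.fullmatch(r"\*/(\d+)([smh])", expr)
    if match:
        return int(match.group(1)) * UNITS[match.group(2)]
    raise ValueError(f"Unsupported schedule expression: {expr!r}")
```

For example, interval_seconds("*/30m") returns 1800 and interval_seconds("60") returns 60, matching the rows above.
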
Configuration Options

AsyncAgentScheduler Constructor

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| agent | Any | Required | Agent instance to schedule |
| task | str | Required | Task description to execute |
| config | Optional[Dict[str, Any]] | None | Optional configuration dictionary |
| on_success | Optional[Callable[[Any], None]] | None | Callback function on successful execution |
| on_failure | Optional[Callable[[Exception], None]] | None | Callback function on failed execution |

start() Method Options

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| schedule_expr | str | Required | Schedule expression (e.g., "hourly", "*/1h", "3600") |
| max_retries | int | 3 | Maximum retry attempts on failure |
| run_immediately | bool | False | If True, run agent immediately before starting schedule |

get_stats() Response

| Field | Type | Description |
| --- | --- | --- |
| is_running | bool | Whether scheduler is currently running |
| execution_count | int | Total number of execution attempts |
| success_count | int | Number of successful executions |
| failure_count | int | Number of failed executions |
| agent_name | str | Name of the agent being scheduled |
| task | str | Task description being executed |
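
As a rough illustration of consuming this response, the sketch below turns a stats dictionary into a one-line summary. The summarize helper is hypothetical and the sample values are invented for the example; only the field names come from the table above.

```python
def summarize(stats: dict) -> str:
    """Format a get_stats()-style dict as a short status line (illustrative)."""
    attempts = stats["execution_count"]
    # Guard against division by zero before the first run.
    rate = stats["success_count"] / attempts if attempts else 0.0
    state = "running" if stats["is_running"] else "stopped"
    return (
        f"{stats['agent_name']} ({state}): "
        f"{stats['success_count']}/{attempts} succeeded ({rate:.0%})"
    )

# Sample data shaped like the table above; the numbers are made up.
sample = {
    "is_running": False,
    "execution_count": 4,
    "success_count": 3,
    "failure_count": 1,
    "agent_name": "NewsChecker",
    "task": "Check the latest AI news",
}
line = summarize(sample)
```

With the sample above, line is "NewsChecker (stopped): 3/4 succeeded (75%)".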

Common Patterns

Running in FastAPI Application

from contextlib import asynccontextmanager
from fastapi import FastAPI
from praisonaiagents import Agent
from praisonai.async_agent_scheduler import AsyncAgentScheduler

scheduler = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    global scheduler
    agent = Agent(name="BackgroundWorker", instructions="Process background tasks")
    scheduler = AsyncAgentScheduler(agent, task="Process pending tasks")
    await scheduler.start("*/5m", max_retries=2)
    
    yield
    
    # Shutdown
    if scheduler:
        await scheduler.stop()

app = FastAPI(lifespan=lifespan)

Error Handling with Logging

import asyncio
import logging
from praisonaiagents import Agent
from praisonai.async_agent_scheduler import AsyncAgentScheduler

logging.basicConfig(level=logging.INFO)

def handle_failure(error):
    logging.error(f"Agent execution failed: {error}")
    # Send alert, write to database, etc.

async def main():
    agent = Agent(name="MonitoringAgent", instructions="Monitor system health")
    scheduler = AsyncAgentScheduler(
        agent, 
        task="Check system status",
        on_failure=handle_failure
    )
    
    await scheduler.start("*/10m")
    
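    try:
        await asyncio.sleep(float('inf'))
    finally:
        # Under asyncio.run, Ctrl+C typically reaches this coroutine as a
        # cancellation rather than KeyboardInterrupt, so clean up in finally.
        await scheduler.stop()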

asyncio.run(main())

Graceful Shutdown on SIGINT

import asyncio
import signal
from praisonaiagents import Agent
from praisonai.async_agent_scheduler import AsyncAgentScheduler

async def main():
    shutdown = asyncio.Event()

    # Register handlers on the running loop; the handler only sets an event.
    # loop.add_signal_handler is available on Unix event loops, not on Windows.
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, shutdown.set)

    agent = Agent(name="LongRunningAgent", instructions="Process data continuously")
    scheduler = AsyncAgentScheduler(agent, task="Process data batch")

    await scheduler.start("*/15m", run_immediately=True)

    try:
        # Keep running until a signal arrives
        await shutdown.wait()
        print("Received signal, shutting down gracefully...")
    finally:
        await scheduler.stop()
        stats = await scheduler.get_stats()
        print(f"Final stats: {stats}")

asyncio.run(main())

Best Practices

Always stop the scheduler in a finally block. The stop() method waits up to 30 seconds for the current execution to complete before cancelling it, which gives an in-flight run a chance to finish and keeps shutdown clean.

# Good
try:
    await scheduler.start("hourly")
    await asyncio.sleep(3600)
finally:
    await scheduler.stop()

# Bad - may interrupt agent mid-execution
await scheduler.start("hourly")
await asyncio.sleep(3600)
# Exit without stopping
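
The bounded wait described above (give the current run a grace period, then cancel) can be sketched with asyncio.wait. This is an assumption about the general technique, not the library's code: stop_gracefully is a hypothetical helper, and the 30-second default simply mirrors the figure quoted in the text.

```python
import asyncio

async def stop_gracefully(task: asyncio.Task, grace: float = 30.0) -> bool:
    """Wait up to `grace` seconds for `task`; cancel it if it is still running.

    Returns True if the task finished on its own, False if it was cancelled.
    """
    done, _pending = await asyncio.wait({task}, timeout=grace)
    if task in done:
        return True
    task.cancel()
    try:
        await task  # Let the cancellation propagate and the task unwind.
    except asyncio.CancelledError:
        pass
    return False

async def demo():
    quick = asyncio.create_task(asyncio.sleep(0.01))
    slow = asyncio.create_task(asyncio.sleep(10))
    return (
        await stop_gracefully(quick, grace=1.0),   # Finishes within the grace period.
        await stop_gracefully(slow, grace=0.05),   # Exceeds it and is cancelled.
    )

results = asyncio.run(demo())
```

Skipping stop() entirely removes the grace period, which is why the "Bad" variant above risks cutting an execution short.
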
Enable run_immediately=True to verify that your agent works correctly before waiting for the first scheduled interval.

# Test immediately, then schedule
await scheduler.start("hourly", run_immediately=True)

# Good for smoke tests
await scheduler.start("daily", run_immediately=True)

Success and failure callbacks are called synchronously, so heavy operations should be offloaded to avoid blocking the scheduler.

# Good - lightweight logging
def on_success(result):
    logger.info(f"Agent completed: {result}")

# Bad - heavy database operations
def on_success(result):
    database.save_large_dataset(result)  # Blocks scheduler
    
# Better - offload heavy work
def on_success(result):
    asyncio.create_task(save_to_database(result))
For new code, use AsyncAgentScheduler instead of the legacy AgentScheduler. The async version provides better cancellation, no daemon threads, and fits naturally into async applications.

# New async approach
from praisonai.async_agent_scheduler import AsyncAgentScheduler
scheduler = AsyncAgentScheduler(agent, task)
await scheduler.start("hourly")

# Legacy thread-based (avoid for new code)
from praisonai.scheduler import AgentScheduler
scheduler = AgentScheduler(agent, task)
scheduler.start("hourly")

Import paths (PR #1552):
  • Pending deprecation (still works, emits PendingDeprecationWarning — will move to praisonai.scheduler.async_agent_scheduler in a future release): from praisonai.async_agent_scheduler import AsyncAgentScheduler
  • Canonical: from praisonai.scheduler import AgentScheduler (sync scheduler)
  • Deprecated (still works, emits DeprecationWarning): from praisonai.agent_scheduler import AgentScheduler
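
Code that has to work across the planned move can try several import paths in order. The helper below is a generic sketch, not something PraisonAI ships: import_first is a hypothetical name, and whether praisonai.scheduler.async_agent_scheduler is importable depends on the installed release.

```python
import importlib

def import_first(attr, module_paths):
    """Return `attr` from the first module in `module_paths` that provides it."""
    for path in module_paths:
        try:
            return getattr(importlib.import_module(path), attr)
        except (ImportError, AttributeError):
            continue  # Try the next candidate path.
    raise ImportError(f"{attr} not found in any of: {', '.join(module_paths)}")

# Usage with the paths listed above (requires praisonai to be installed):
# AsyncAgentScheduler = import_first(
#     "AsyncAgentScheduler",
#     ["praisonai.scheduler.async_agent_scheduler", "praisonai.async_agent_scheduler"],
# )
```

Listing the future path first means the pending-deprecation path is only touched on releases where the new module does not exist yet. Note that catching ImportError this broadly can also hide a genuine import failure inside a candidate module.
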
Migration from Legacy Scheduler: AsyncAgentScheduler replaces the thread-based AgentScheduler for new applications. The sync-looking public CLI (praisonai schedule ...) is unchanged and continues to work as before.

Jupyter/Event Loop Compatibility: Starting with PR #1448, PraisonAI no longer calls nest_asyncio.apply() or asyncio.set_event_loop() on your behalf when ACP/LSP is enabled. If you embed PraisonAI inside a Jupyter kernel or another running event loop, either call nest_asyncio.apply() yourself at the top of your notebook, or run PraisonAI from a separate process.
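
To tell whether you are in the embedded case, you can check for a running event loop before calling asyncio.run. The sketch below uses only the standard library; has_running_loop and run_entrypoint are hypothetical helpers, and what to do when a loop is already running (awaiting directly, nest_asyncio.apply(), or a separate process) is left to the caller.

```python
import asyncio

def has_running_loop() -> bool:
    """Return True when called from inside a running event loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return False
    return True

def run_entrypoint(coro_factory):
    """Run `coro_factory()` with asyncio.run, or refuse if a loop is running."""
    if has_running_loop():
        # Typical in Jupyter: asyncio.run would fail here.
        raise RuntimeError(
            "An event loop is already running; await the coroutine directly, "
            "call nest_asyncio.apply() first, or use a separate process."
        )
    return asyncio.run(coro_factory())

async def probe():
    return has_running_loop()

inside = run_entrypoint(probe)  # True: probe runs inside the loop asyncio.run created.
outside = has_running_loop()    # False in a plain script.
```

Taking a factory rather than a coroutine object avoids creating a coroutine that is never awaited when the helper refuses to run.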

Scheduler CLI

Command-line interface for scheduling agents

Background Tasks

Running agents as background processes