Skip to main content

Background Tasks

Execute agent tasks and recipes asynchronously without blocking the main thread. Monitor progress, cancel running tasks, and manage concurrent execution.

Quick Start

Agent-Centric Usage

import asyncio
from praisonaiagents import Agent
from praisonaiagents.background import BackgroundRunner, BackgroundConfig

async def main():
    """Run one agent prompt through a background runner and print its result."""
    # The runner is configured with a limit of three concurrent tasks.
    runner_config = BackgroundConfig(max_concurrent_tasks=3)
    runner = BackgroundRunner(config=runner_config)

    # Attach the runner to the agent; it is used below as `agent.background`.
    agent = Agent(
        name="AsyncAssistant",
        instructions="You are a research assistant.",
        background=runner,
    )

    # Hand the prompt to the runner and keep the returned task object.
    research = await agent.background.submit_agent(
        agent=agent,
        prompt="Research AI trends in 2025",
        name="research_task",
    )

    # Other work can be done here while the task runs.

    # Wait (60 second timeout) for the task, then print its result.
    await research.wait(timeout=60.0)
    print(research.result)

asyncio.run(main())

Using Recipe Operations

import asyncio

from praisonai import recipe


async def main():
    """Submit a recipe as a background task, then check, await and cancel it."""
    # Submit recipe as background task
    task = recipe.run_background(
        "my-recipe",
        input={"query": "What is AI?"},
        config={"max_tokens": 1000},
        session_id="session_123",
        timeout_sec=300,
    )

    print(f"Task ID: {task.task_id}")

    # Check status
    status = await task.status()

    # Wait for completion
    result = await task.wait(timeout=600)
    print(f"Result: {result}")

    # Cancel if needed
    await task.cancel()


# `await` is only valid inside a coroutine, so the example is driven by asyncio.run().
asyncio.run(main())

Features

  • Async Execution: Run tasks without blocking
  • Concurrency Control: Limit concurrent tasks
  • Progress Tracking: Monitor task status
  • Timeout Support: Set execution time limits
  • Cancellation: Cancel running tasks

Configuration

from praisonaiagents.background import BackgroundConfig

config = BackgroundConfig(
    max_concurrent_tasks=5,    # Max parallel tasks
    default_timeout=300.0,     # 5 minute default timeout
    auto_cleanup=True          # Auto-remove completed tasks
)

Task Status

from praisonaiagents.background import TaskStatus

# Check status
if task.status == TaskStatus.COMPLETED:
    print(f"Result: {task.result}")
elif task.status == TaskStatus.FAILED:
    print(f"Error: {task.error}")
elif task.status == TaskStatus.RUNNING:
    print("Still running...")

CLI Usage

# Submit a recipe as background task
praisonai background submit --recipe my-recipe

# List tasks
praisonai background list

# Check status
praisonai background status <task_id>

# Cancel task
praisonai background cancel <task_id>

# Clear completed
praisonai background clear

Safe Defaults

| Setting | Default | Description |
|---|---|---|
| timeout_sec | 300 | Maximum execution time (5 minutes) |
| max_concurrent | 5 | Maximum concurrent tasks |
| cleanup_delay_sec | 3600 | Time before completed tasks are cleaned up |

Low-level API Reference

BackgroundRunner Direct Usage

import asyncio
from praisonaiagents.background import BackgroundRunner, BackgroundConfig

async def main():
    """Submit a small coroutine to a BackgroundRunner and print its result."""

    async def my_task(name: str) -> str:
        """Sleep for two seconds, then return a completion message for `name`."""
        await asyncio.sleep(2)
        return f"Task {name} completed"

    # Build the runner with a limit of three concurrent tasks.
    runner = BackgroundRunner(config=BackgroundConfig(max_concurrent_tasks=3))

    # Submit the coroutine with its positional argument and a display name.
    submitted = await runner.submit(my_task, args=("example",), name="my_task")
    short_id = submitted.id[:8]  # first eight characters of the task id
    print(f"Submitted: {short_id}")

    # Wait (10 second timeout) for the task, then print its result.
    await submitted.wait(timeout=10.0)
    print(f"Result: {submitted.result}")

asyncio.run(main())

Submitting Tasks

# Submit async function
task = await runner.submit(
    func=my_async_function,
    args=(arg1, arg2),
    kwargs={"key": "value"},
    name="descriptive_name",
    timeout=60.0
)

# Submit sync function (runs in thread pool)
task = await runner.submit(
    func=my_sync_function,
    args=(arg1,),
    name="sync_task"
)

Task Management

# List all tasks
for task in runner.tasks:
    print(f"{task.name}: {task.status.value}")

# Get running tasks
running = runner.running_tasks

# Get pending tasks
pending = runner.pending_tasks

# Clear completed tasks
runner.clear_completed()

Synchronous Job Manager

For simpler use cases, BackgroundJobManager provides synchronous job management:
from praisonaiagents.background.job_manager import BackgroundJobManager, JobStatus

# Create manager with auto-background threshold
manager = BackgroundJobManager(auto_background_threshold=5.0)

# Start a job
job_id = manager.start_job(lambda: expensive_computation())

# Check status
status = manager.get_status(job_id)
if status == JobStatus.COMPLETED:
    result = manager.get_result(job_id)
elif status == JobStatus.FAILED:
    error = manager.get_error(job_id)

# List all jobs
# (the loop uses its own name so `job_id` still refers to the job started above)
for listed_id, info in manager.list_jobs().items():
    print(f"{listed_id}: {info.status}")

# Cancel a running job
manager.cancel_job(job_id)

Zero Performance Impact

The background module is loaded lazily, so it adds no overhead when it is not used:
# Only loads when accessed
from praisonaiagents.background import BackgroundRunner

See Also