Skip to main content

Documentation Index

Fetch the complete documentation index at: https://docs.praison.ai/llms.txt

Use this file to discover all available pages before exploring further.

Concurrency controls let you limit parallel agent execution and set timeouts for tool calls to prevent resource exhaustion.

Quick Start

1

Limit parallel runs of an agent

Control how many instances of the same agent can run concurrently:
from praisonaiagents import Agent
from praisonaiagents.agent.concurrency import ConcurrencyRegistry

registry = ConcurrencyRegistry()
registry.set_limit("researcher", 2)  # at most 2 concurrent runs

agent = Agent(name="researcher", instructions="Research topics")

# Sync context
registry.acquire_sync("researcher")
try:
    agent.start("Research Mars exploration")
finally:
    registry.release("researcher")
2

Same, async

Use async context for better resource utilization:
await registry.acquire("researcher")
try:
    await agent.astart("Research Mars exploration")
finally:
    registry.release("researcher")
3

Bound tool time with tool_timeout

Prevent slow tools from blocking agent execution:
from praisonaiagents import Agent

agent = Agent(
    name="Assistant",
    instructions="Use tools to help users",
    tools=["get_weather"],
    tool_timeout=30,  # seconds; slow tools return a timeout dict
)
agent.start("What's the weather in Tokyo?")

How It Works

ComponentPurposeThread Safety
ConcurrencyRegistryLimits parallel agent runs✅ Thread-safe
Tool ExecutorRuns tools with timeout✅ Per-agent pool
Plugin APIsEnable/disable plugins✅ Lock-protected

Sync vs Async Rule

The concurrency registry enforces strict separation between sync and async contexts:
ContextMethodWhat happens if you mix
Syncregistry.acquire_sync(name)✅ Works correctly
Asyncawait registry.acquire(name)✅ Works correctly
Mixedacquire_sync() in async❌ Raises RuntimeError
Calling acquire_sync() from an async context raises RuntimeError("acquire_sync('<agent_name>') cannot be called with a running event loop; use async acquire() in async contexts."). Use await acquire() instead.
Example error:
async def bad_example():
    registry.acquire_sync("agent")  # RuntimeError!

async def good_example():
    await registry.acquire("agent")  # ✅ Correct

Tool Timeout Behavior

When tool_timeout is set, tools run in a dedicated executor with these characteristics:

Timeout Return Shape

On timeout, tools return a dict (not an exception):
{
    "error": "Tool timed out after 30s", 
    "timeout": True
}

Executor Details

  • One executor per Agent instance (lazy creation)
  • max_workers=2 threads per agent
  • Thread name prefix: tool-<agent_name> — useful for log filtering
  • Reused across calls — no resource leak
Which timeout to choose:

Common Patterns

Limit FastAPI Route Concurrency

from praisonaiagents import Agent
from praisonaiagents.agent.concurrency import ConcurrencyRegistry

registry = ConcurrencyRegistry()
registry.set_limit("chat_agent", 5)

@app.post("/chat")
async def chat_endpoint(message: str):
    await registry.acquire("chat_agent")
    try:
        agent = Agent(name="chat_agent", instructions="Help users")
        response = await agent.astart(message)
        return {"response": response}
    finally:
        registry.release("chat_agent")

Async Context Manager Helper

from contextlib import asynccontextmanager

@asynccontextmanager
async def throttled_agent(name: str, max_concurrent: int = 3):
    registry = ConcurrencyRegistry()
    registry.set_limit(name, max_concurrent)
    await registry.acquire(name)
    try:
        yield
    finally:
        registry.release(name)

# Usage
async with throttled_agent("researcher", 2):
    agent = Agent(name="researcher", instructions="Research topics")
    result = await agent.astart("Study quantum computing")

Timeout Selection by Tool Type

def get_agent_with_timeouts():
    return Agent(
        name="MultiTool Assistant",
        instructions="Help with various tasks",
        tools=[
            "web_search",      # Network IO
            "file_processor",  # Local computation  
            "simple_math"      # Fast operation
        ],
        tool_timeout=45  # Good balance for mixed workload
    )

Best Practices

Prevents deadlocks when exceptions occur:
registry.acquire_sync("agent")
try:
    # Agent work here
    agent.start("task")
finally:
    registry.release("agent")  # Always runs
Keep acquisition method consistent with execution context:
# ✅ Good - sync context, sync acquire
def sync_handler():
    registry.acquire_sync("agent")
    try:
        agent.start("task")
    finally:
        registry.release("agent")

# ✅ Good - async context, async acquire  
async def async_handler():
    await registry.acquire("agent")
    try:
        await agent.astart("task")
    finally:
        registry.release("agent")
Any tool that does network IO should have a timeout:
# Tools that need timeouts
network_tools = ["web_search", "api_call", "download_file"]
local_tools = ["calculate", "format_text", "parse_json"]

agent = Agent(
    name="Assistant",
    tools=network_tools + local_tools,
    tool_timeout=30  # Protects against slow network
)
Filter logs by agent name using the thread prefix:
# Filter tool execution logs by agent
grep "tool-researcher" app.log

# Or in Python logging
import logging
logging.basicConfig(format='%(threadName)s: %(message)s')

Plugins

Plugin thread-safety and concurrent execution

Tool Configuration

Tool timeout settings and performance tuning

Async Bridge

Safe sync↔async boundary crossing utilities

Thread Safety

Chat history and state protection mechanisms