Skip to main content

Documentation Index

Fetch the complete documentation index at: https://docs.praison.ai/llms.txt

Use this file to discover all available pages before exploring further.

Thread-Safe Agent State

PraisonAI Agents v0.5.0+ includes thread-safe management of chat history and caches; PR #1567 makes the underlying lock re-entrant and adds a per-event-loop async lock.
Behaviour change in PR #1548: run_sync() now raises RuntimeError when called from inside a running event loop. Previously it would auto-fallback to a background loop.
# Before PR #1548 (worked, but unsafe)
async def handler():
    result = run_sync(some_coro())  # silently used background loop

# After PR #1548 (raises RuntimeError)
async def handler():
    result = await some_coro()  # use this from async context

Thread-Safe Components

Chat History

The chat_history property is now fully thread-safe with automatic locking. The SDK protects chat history mutations through internal helper methods and a locked setter:
from praisonaiagents import Agent
import threading

agent = Agent(
    name="ThreadSafeAgent",
    instructions="You are helpful."
)

def worker(prompt):
    # Safe to call from multiple threads
    response = agent.chat(prompt)
    print(f"Response: {response[:50]}...")

# Create multiple threads
threads = [
    threading.Thread(target=worker, args=(f"Question {i}",))
    for i in range(5)
]

# Start all threads
for t in threads:
    t.start()

# Wait for completion
for t in threads:
    t.join()

What changed in PR #1488

Prior to PR #1488, chat_history mutations bypassed thread-safety locks at 31+ call sites. The SDK now uses internal helper methods that properly acquire locks:
  • _append_to_chat_history(message) - Thread-safe message appending
  • _truncate_chat_history(length) - Thread-safe history truncation
  • _replace_chat_history(new_history) - Thread-safe full replacement
  • chat_history setter now acquires the AsyncSafeState lock for assignments

What changed in PR #1514

PR #1514 enhanced thread-safety in three key areas:1. Locked Memory Initialization: Task.initialize_memory() now uses threading.Lock with double-checked locking pattern. A new async variant initialize_memory_async() uses asyncio.Lock and offloads construction with asyncio.to_thread() to prevent event loop blocking.2. Async-Locked Workflow State: New _set_workflow_finished(value) method uses async locks to safely update workflow completion status across concurrent tasks.3. Non-Mutating Task Context: Task execution no longer mutates task.description during runs. Per-execution context is stored in _execution_context field, keeping the user-facing task.description stable across multiple executions.

Safe operations

# These operations are now thread-safe out of the box:
agent.chat_history = []  # Full replacement - uses locked setter
agent.chat("Hello")      # Appends safely via internal methods

# Reading is always safe:
history = agent.chat_history
print(f"History has {len(history)} messages")

Caches

Internal caches use threading.RLock for reentrant locking:
  • _system_prompt_cache - Cached system prompts
  • _formatted_tools_cache - Cached tool definitions

Rate Limiter

RateLimiter can be shared across threads and agents. Both the sync and async method families are fully locked — see Rate Limiter → Thread Safety & Multi-Agent Use for patterns.

LiteAgent Thread Safety

The lite package also provides thread-safe operations:
from praisonaiagents.lite import LiteAgent, create_openai_llm_fn
import threading

llm_fn = create_openai_llm_fn(model="gpt-4o-mini")
agent = LiteAgent(name="LiteThreadSafe", llm_fn=llm_fn)

def concurrent_chat(message):
    return agent.chat(message)

# Safe concurrent access
with threading.ThreadPoolExecutor(max_workers=5) as executor:
    futures = [executor.submit(concurrent_chat, f"Q{i}") for i in range(10)]
    results = [f.result() for f in futures]

Implementation Details

Lock Types

ComponentLock TypeReason
chat_historyAsyncSafeState (DualLock: RLock + per-loop asyncio.Lock)Re-entrant on same thread; non-blocking in async contexts
_cache_lockthreading.RLockAllows reentrant access from cached helpers
RateLimiter (sync)threading.LockProtects _tokens, _api_tokens, and refill state from races in multi-threaded acquire calls
RateLimiter (async)asyncio.LockSame protection for coroutine contexts

Lock Usage Pattern

# Internal implementation pattern
class Agent:
    def __init__(self):
        self.__chat_history_state = AsyncSafeState([])
        self._cache_lock = threading.RLock()
    
    @property
    def _history_lock(self):
        return self.__chat_history_state
    
    def _append_to_chat_history(self, message):
        with self._history_lock.lock():
            self._history_lock.value.append(message)

Why a re-entrant lock?

Nested calls (e.g. a helper that holds the lock and then assigns chat_history, which itself acquires the lock) used to deadlock. RLock permits the same thread to re-enter. See PR #1567 for details.

Best Practices

Do: Use Agent Methods

# Good - thread-safe
response = agent.chat("Hello")

Don’t: Bypass the Property Interface

# Bad - bypasses locks (direct list mutation)
agent.chat_history.append({"role": "user", "content": "Hello"})

# Good - uses locked setter
agent.chat_history = agent.chat_history + [{"role": "user", "content": "Hello"}]

# Better - use agent methods
agent.chat("Hello")
Reads and full replacements via agent.chat_history = [...] are now safe out-of-the-box. The wrapper is only needed for custom compound operations that require atomic read-modify-write sequences.

Do: Clear History Safely

# Good - use provided method
agent.clear_history()  # Thread-safe

Async Considerations

agent.chat_history is async-aware out of the box — no external asyncio.Lock is required when all calls are inside the same event loop.
import asyncio
from praisonaiagents import Agent

agent = Agent(name="AsyncAgent")

async def async_chat(prompt):
    # No external lock needed - AsyncSafeState handles this
    return agent.chat(prompt)

async def main():
    tasks = [async_chat(f"Question {i}") for i in range(5)]
    results = await asyncio.gather(*tasks)
Since PR #1567, DualLock.sync() and DualLock.async_lock() use independent locks. A sync caller holding the lock will not block an async caller from acquiring it, and vice versa. Within a single context (all-sync or all-async) the lock works as expected; across contexts it does not coordinate. If you mutate agent.chat_history from both sync and async code paths, serialise the boundary yourself.
An external lock is still useful for serialising chat-history mutations from a thread pool that mixes sync and async callers:
import asyncio
import threading
from praisonaiagents import Agent

agent = Agent(name="MixedAgent")
external_lock = threading.Lock()

def sync_worker(prompt):
    with external_lock:
        return agent.chat(prompt)

async def async_worker(prompt):
    # Convert to sync context for coordination
    loop = asyncio.get_event_loop()
    with external_lock:
        return await loop.run_in_executor(None, agent.chat, prompt)

Verifying Thread Safety

Test thread safety with concurrent access:
import threading
from praisonaiagents.lite import LiteAgent

def test_thread_safety():
    agent = LiteAgent(
        name="Test",
        llm_fn=lambda m: "Response"
    )
    
    errors = []
    
    def worker():
        try:
            for _ in range(100):
                agent.chat("Test")
        except Exception as e:
            errors.append(e)
    
    threads = [threading.Thread(target=worker) for _ in range(10)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    
    assert len(errors) == 0, f"Thread safety errors: {errors}"
    print("Thread safety test passed!")

test_thread_safety()

Multi-team HTTP launch

PraisonAI provides comprehensive thread-safety for HTTP server deployment:
  • Multiple Agent / Agents instances may call .launch(port=N) concurrently from different threads — registration is atomic.
  • If two launch calls use the same path on the same port, the second gets an auto-suffixed path (/path_abc123) and a warning is logged.
  • Server readiness is signalled deterministically (no fixed sleep); .launch() returns only after the port is accepting connections (5s timeout).
  • aworkflow() state lock is created inside the running async context, so workflows remain stable when invoked under pytest-asyncio or when nested inside another loop.
import threading
from praisonaiagents import AgentTeam

def launch_team(team_name, port, path):
    team = AgentTeam(name=team_name)
    team.launch(port=port, path=path)

# Safe concurrent launches
thread1 = threading.Thread(target=launch_team, args=("TeamA", 8000, "/team_a"))
thread2 = threading.Thread(target=launch_team, args=("TeamB", 8000, "/team_b"))

thread1.start()
thread2.start()

thread1.join()
thread2.join()

# Both teams available at:
# - http://localhost:8000/team_a
# - http://localhost:8000/team_b

Wrapper-layer thread safety (praisonai package)

The praisonai wrapper layer (distinct from the praisonaiagents content above) provides thread-safe OpenAI client management and CLI command discovery.

Key-aware OpenAI client

The OpenAI client is now cached per (api_key, base_url) tuple, allowing multiple keys in the same process without cross-talk:
from praisonai import PraisonAI
import threading

def worker_with_different_key(api_key, task_name):
    # Each thread gets its own client based on the key
    praisonai = PraisonAI(
        auto=f"Create a {task_name}",
        api_key=api_key  # Different key per thread
    )
    result = praisonai.run()
    print(f"{task_name} completed")

# Two threads with different OpenAI keys
thread1 = threading.Thread(
    target=worker_with_different_key,
    args=("sk-key-team-a", "marketing plan")
)
thread2 = threading.Thread(
    target=worker_with_different_key, 
    args=("sk-key-team-b", "technical doc")
)

thread1.start()
thread2.start()
thread1.join()
thread2.join()

Thread-safe Typer command discovery

Embedding python -m praisonai from multiple threads is now safe. The CLI command discovery uses a double-check lock pattern and doesn’t poison the cache on failure:
import threading
import subprocess

def run_cli_command(command):
    # Safe to call from multiple threads
    result = subprocess.run(
        ["python", "-m", "praisonai"] + command,
        capture_output=True, text=True
    )
    return result.stdout

# Multiple threads can safely use the CLI
threads = [
    threading.Thread(target=run_cli_command, args=(["--version"],))
    for _ in range(5)
]

for t in threads:
    t.start()
for t in threads:
    t.join()

Failure-safe cache

A transient discovery error does not lock the CLI into a broken state — the next call retries instead of permanently breaking dispatch. This ensures reliable operation in multi-threaded server environments where temporary import failures might occur.

New Thread-Safe Components in PR #1548

AsyncAgentScheduler is now loop-aware. The start() method binds its async primitives (asyncio.Event, asyncio.Lock) to the running loop, and stop() raises RuntimeError if called from a different loop than start(). Lazy loaders in praisonai/auto.py are now thread-safe. A single _load_optional(key, loader) helper with a module-level lock replaces the previous unguarded module-level globals. Integration registry (praisonai/integrations/registry.py) now has a per-instance threading.Lock guarding register/unregister/create/list_registered operations.