Documentation Index
Fetch the complete documentation index at: https://docs.praison.ai/llms.txt
Use this file to discover all available pages before exploring further.
Performance Tuning Guidelines
Optimizing performance in multi-agent systems requires a systematic approach to identify bottlenecks and implement targeted improvements. This guide provides strategies for achieving optimal performance. To measure where time goes in your agents, see Profiling.Performance Analysis Framework
Key Performance Indicators (KPIs)
- Response Time: End-to-end request latency
- Throughput: Requests processed per second
- Resource Utilization: CPU, memory, and I/O usage
- Concurrency: Parallel agent execution efficiency
- Token Efficiency: Tokens used per task
Performance Profiling
1. Agent Performance Profiler
Comprehensive profiling for multi-agent systems:import time
import psutil
import cProfile
import pstats
from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional
import threading
from contextlib import contextmanager
import io
@dataclass
class PerformanceMetrics:
agent_name: str
operation: str
start_time: float
end_time: Optional[float] = None
cpu_percent_start: float = 0
cpu_percent_end: float = 0
memory_mb_start: float = 0
memory_mb_end: float = 0
tokens_used: int = 0
cache_hits: int = 0
cache_misses: int = 0
custom_metrics: Dict[str, Any] = field(default_factory=dict)
class PerformanceProfiler:
def __init__(self):
self.metrics: List[PerformanceMetrics] = []
self.active_profiles: Dict[str, cProfile.Profile] = {}
self._lock = threading.Lock()
self.process = psutil.Process()
@contextmanager
def profile_operation(self, agent_name: str, operation: str):
"""Profile a specific operation"""
# Start profiling
profile = cProfile.Profile()
profile_key = f"{agent_name}:{operation}"
# Collect initial metrics
metric = PerformanceMetrics(
agent_name=agent_name,
operation=operation,
start_time=time.time(),
cpu_percent_start=self.process.cpu_percent(),
memory_mb_start=self.process.memory_info().rss / 1024 / 1024
)
with self._lock:
self.active_profiles[profile_key] = profile
profile.enable()
try:
yield metric
finally:
profile.disable()
# Collect final metrics
metric.end_time = time.time()
metric.cpu_percent_end = self.process.cpu_percent()
metric.memory_mb_end = self.process.memory_info().rss / 1024 / 1024
with self._lock:
self.metrics.append(metric)
if profile_key in self.active_profiles:
del self.active_profiles[profile_key]
def get_profile_stats(self, agent_name: str, operation: str,
top_n: int = 20) -> str:
"""Get detailed profile statistics"""
profile_key = f"{agent_name}:{operation}"
# Find all metrics for this operation
operation_metrics = [
m for m in self.metrics
if m.agent_name == agent_name and m.operation == operation
]
if not operation_metrics:
return f"No profiling data for {profile_key}"
# Aggregate statistics
total_time = sum(m.end_time - m.start_time for m in operation_metrics if m.end_time)
avg_time = total_time / len(operation_metrics)
# Memory statistics
memory_deltas = [
m.memory_mb_end - m.memory_mb_start
for m in operation_metrics
if m.memory_mb_end > 0
]
avg_memory_delta = sum(memory_deltas) / len(memory_deltas) if memory_deltas else 0
stats = f"""
Performance Profile: {profile_key}
===================================
Executions: {len(operation_metrics)}
Total Time: {total_time:.2f}s
Average Time: {avg_time:.2f}s
Average Memory Delta: {avg_memory_delta:.2f}MB
Top Time-Consuming Operations:
"""
# Add detailed timing breakdown if available
if hasattr(operation_metrics[-1], '_profile_stats'):
s = io.StringIO()
ps = pstats.Stats(operation_metrics[-1]._profile_stats, stream=s)
ps.strip_dirs().sort_stats('cumulative').print_stats(top_n)
stats += s.getvalue()
return stats
def identify_bottlenecks(self, threshold_seconds: float = 1.0) -> List[Dict[str, Any]]:
"""Identify performance bottlenecks"""
bottlenecks = []
# Group metrics by agent and operation
operation_groups = {}
for metric in self.metrics:
if metric.end_time is None:
continue
key = f"{metric.agent_name}:{metric.operation}"
if key not in operation_groups:
operation_groups[key] = []
operation_groups[key].append(metric)
# Analyze each operation
for key, metrics in operation_groups.items():
execution_times = [m.end_time - m.start_time for m in metrics]
avg_time = sum(execution_times) / len(execution_times)
if avg_time > threshold_seconds:
memory_deltas = [
m.memory_mb_end - m.memory_mb_start
for m in metrics
if m.memory_mb_end > 0
]
bottlenecks.append({
"operation": key,
"avg_execution_time": avg_time,
"max_execution_time": max(execution_times),
"total_executions": len(metrics),
"avg_memory_delta": sum(memory_deltas) / len(memory_deltas) if memory_deltas else 0,
"severity": "high" if avg_time > threshold_seconds * 2 else "medium"
})
# Sort by average execution time
bottlenecks.sort(key=lambda x: x["avg_execution_time"], reverse=True)
return bottlenecks
2. Async Performance Monitor
Monitor async operations and concurrency:import asyncio
from typing import Set
class AsyncPerformanceMonitor:
def __init__(self):
self.active_tasks: Set[str] = set()
self.task_metrics: Dict[str, Dict[str, Any]] = {}
self._lock = asyncio.Lock()
self.max_concurrent_tasks = 0
@contextmanager
async def monitor_async_operation(self, operation_name: str):
"""Monitor an async operation"""
task_id = f"{operation_name}_{id(asyncio.current_task())}"
async with self._lock:
self.active_tasks.add(task_id)
self.max_concurrent_tasks = max(
self.max_concurrent_tasks,
len(self.active_tasks)
)
self.task_metrics[task_id] = {
"operation": operation_name,
"start_time": time.time(),
"status": "running"
}
try:
yield
async with self._lock:
self.task_metrics[task_id]["status"] = "completed"
self.task_metrics[task_id]["end_time"] = time.time()
except Exception as e:
async with self._lock:
self.task_metrics[task_id]["status"] = "failed"
self.task_metrics[task_id]["error"] = str(e)
self.task_metrics[task_id]["end_time"] = time.time()
raise
finally:
async with self._lock:
self.active_tasks.discard(task_id)
def get_concurrency_report(self) -> Dict[str, Any]:
"""Get concurrency performance report"""
completed_tasks = [
m for m in self.task_metrics.values()
if m["status"] == "completed" and "end_time" in m
]
if not completed_tasks:
return {"message": "No completed tasks"}
# Calculate concurrency metrics
execution_times = [
t["end_time"] - t["start_time"]
for t in completed_tasks
]
# Group by operation
operation_stats = {}
for task in completed_tasks:
op = task["operation"]
if op not in operation_stats:
operation_stats[op] = {
"count": 0,
"total_time": 0,
"max_concurrent": 0
}
operation_stats[op]["count"] += 1
operation_stats[op]["total_time"] += task["end_time"] - task["start_time"]
return {
"max_concurrent_tasks": self.max_concurrent_tasks,
"current_active_tasks": len(self.active_tasks),
"total_completed_tasks": len(completed_tasks),
"avg_execution_time": sum(execution_times) / len(execution_times),
"operation_stats": operation_stats
}
Optimization Strategies
1. Caching Strategy
Implement intelligent caching for expensive operations:from functools import lru_cache
import hashlib
import pickle
class IntelligentCache:
def __init__(self, max_size: int = 1000, ttl_seconds: int = 3600):
self.max_size = max_size
self.ttl_seconds = ttl_seconds
self.cache: Dict[str, Dict[str, Any]] = {}
self.access_count: Dict[str, int] = {}
self.computation_time: Dict[str, float] = {}
self._lock = threading.Lock()
def _generate_key(self, func_name: str, args: tuple, kwargs: dict) -> str:
"""Generate cache key from function arguments"""
key_data = {
"func": func_name,
"args": args,
"kwargs": kwargs
}
# Create hash of the data
key_str = pickle.dumps(key_data)
return hashlib.sha256(key_str).hexdigest()
def cached(self, ttl_override: Optional[int] = None):
"""Decorator for caching function results"""
def decorator(func):
def wrapper(*args, **kwargs):
cache_key = self._generate_key(func.__name__, args, kwargs)
# Check cache
with self._lock:
if cache_key in self.cache:
entry = self.cache[cache_key]
# Check TTL
age = time.time() - entry["timestamp"]
ttl = ttl_override or self.ttl_seconds
if age < ttl:
self.access_count[cache_key] = self.access_count.get(cache_key, 0) + 1
return entry["value"]
# Compute value
start_time = time.time()
result = func(*args, **kwargs)
computation_time = time.time() - start_time
# Store in cache
with self._lock:
# Evict if necessary
if len(self.cache) >= self.max_size:
self._evict_least_valuable()
self.cache[cache_key] = {
"value": result,
"timestamp": time.time()
}
self.access_count[cache_key] = 1
self.computation_time[cache_key] = computation_time
return result
return wrapper
return decorator
def _evict_least_valuable(self):
"""Evict least valuable cache entry"""
if not self.cache:
return
# Calculate value score for each entry
scores = {}
current_time = time.time()
for key, entry in self.cache.items():
age = current_time - entry["timestamp"]
access_count = self.access_count.get(key, 1)
comp_time = self.computation_time.get(key, 0.1)
# Value = (access_count * computation_time) / age
value_score = (access_count * comp_time) / max(age, 1)
scores[key] = value_score
# Evict lowest score
evict_key = min(scores.keys(), key=lambda k: scores[k])
del self.cache[evict_key]
self.access_count.pop(evict_key, None)
self.computation_time.pop(evict_key, None)
def get_cache_stats(self) -> Dict[str, Any]:
"""Get cache performance statistics"""
with self._lock:
if not self.access_count:
return {"cache_size": len(self.cache)}
total_hits = sum(count - 1 for count in self.access_count.values())
total_misses = len(self.access_count)
hit_rate = total_hits / (total_hits + total_misses) if (total_hits + total_misses) > 0 else 0
# Calculate time saved
time_saved = sum(
(count - 1) * self.computation_time.get(key, 0)
for key, count in self.access_count.items()
)
return {
"cache_size": len(self.cache),
"hit_rate": hit_rate,
"total_hits": total_hits,
"total_misses": total_misses,
"estimated_time_saved": time_saved,
"avg_computation_time": sum(self.computation_time.values()) / len(self.computation_time)
}
2. Batch Processing Optimization
Optimize batch operations for better throughput:from typing import TypeVar, Generic
T = TypeVar('T')
R = TypeVar('R')
class BatchProcessor(Generic[T, R]):
def __init__(self,
batch_size: int = 100,
max_wait_time: float = 0.1,
max_workers: int = 4):
self.batch_size = batch_size
self.max_wait_time = max_wait_time
self.max_workers = max_workers
self.pending_items: List[Tuple[T, asyncio.Future]] = []
self.processing = False
self._lock = asyncio.Lock()
async def process(self, item: T, processor_func: Callable[[List[T]], List[R]]) -> R:
"""Add item for batch processing"""
future = asyncio.Future()
async with self._lock:
self.pending_items.append((item, future))
# Start processing if not already running
if not self.processing:
asyncio.create_task(self._process_batches(processor_func))
return await future
async def _process_batches(self, processor_func: Callable[[List[T]], List[R]]):
"""Process items in batches"""
self.processing = True
try:
while True:
# Wait for batch to fill or timeout
start_wait = time.time()
while len(self.pending_items) < self.batch_size:
if time.time() - start_wait > self.max_wait_time:
break
if not self.pending_items:
await asyncio.sleep(0.01)
continue
await asyncio.sleep(0.001)
# Get batch
async with self._lock:
if not self.pending_items:
break
batch_items = self.pending_items[:self.batch_size]
self.pending_items = self.pending_items[self.batch_size:]
# Process batch
items = [item for item, _ in batch_items]
futures = [future for _, future in batch_items]
try:
# Process in parallel if supported
if asyncio.iscoroutinefunction(processor_func):
results = await processor_func(items)
else:
# Run in thread pool for CPU-bound operations
loop = asyncio.get_event_loop()
results = await loop.run_in_executor(None, processor_func, items)
# Distribute results
for future, result in zip(futures, results):
future.set_result(result)
except Exception as e:
# Set exception for all futures in batch
for future in futures:
future.set_exception(e)
finally:
self.processing = False
3. Connection Pooling
Optimize resource connections:import asyncio
from asyncio import Queue
import aiohttp
class ConnectionPool:
def __init__(self,
min_connections: int = 5,
max_connections: int = 20,
connection_timeout: float = 30.0):
self.min_connections = min_connections
self.max_connections = max_connections
self.connection_timeout = connection_timeout
self.available_connections: Queue = Queue()
self.active_connections = 0
self.total_connections = 0
self._lock = asyncio.Lock()
self._connector = None
self._stats = {
"connections_created": 0,
"connections_reused": 0,
"connection_errors": 0,
"wait_time_total": 0
}
async def initialize(self):
"""Initialize connection pool"""
self._connector = aiohttp.TCPConnector(
limit=self.max_connections,
limit_per_host=self.max_connections
)
# Create minimum connections
for _ in range(self.min_connections):
conn = await self._create_connection()
await self.available_connections.put(conn)
async def acquire(self) -> aiohttp.ClientSession:
"""Acquire a connection from the pool"""
start_time = time.time()
# Try to get available connection
try:
connection = await asyncio.wait_for(
self.available_connections.get(),
timeout=0.1
)
self._stats["connections_reused"] += 1
except asyncio.TimeoutError:
# Create new connection if under limit
async with self._lock:
if self.total_connections < self.max_connections:
connection = await self._create_connection()
else:
# Wait for connection to become available
connection = await self.available_connections.get()
self._stats["connections_reused"] += 1
self._stats["wait_time_total"] += time.time() - start_time
async with self._lock:
self.active_connections += 1
return connection
async def release(self, connection: aiohttp.ClientSession):
"""Release connection back to pool"""
async with self._lock:
self.active_connections -= 1
# Check if connection is still valid
if not connection.closed:
await self.available_connections.put(connection)
else:
async with self._lock:
self.total_connections -= 1
# Create replacement if below minimum
if self.total_connections < self.min_connections:
try:
new_conn = await self._create_connection()
await self.available_connections.put(new_conn)
except Exception:
pass
async def _create_connection(self) -> aiohttp.ClientSession:
"""Create a new connection"""
try:
session = aiohttp.ClientSession(
connector=self._connector,
timeout=aiohttp.ClientTimeout(total=self.connection_timeout)
)
async with self._lock:
self.total_connections += 1
self._stats["connections_created"] += 1
return session
except Exception as e:
self._stats["connection_errors"] += 1
raise
def get_pool_stats(self) -> Dict[str, Any]:
"""Get connection pool statistics"""
return {
"total_connections": self.total_connections,
"active_connections": self.active_connections,
"available_connections": self.available_connections.qsize(),
"connection_reuse_rate": (
self._stats["connections_reused"] /
max(self._stats["connections_created"] + self._stats["connections_reused"], 1)
),
**self._stats
}
4. Memory Optimization
Optimize memory usage patterns:import gc
import weakref
from pympler import tracker
class MemoryOptimizer:
def __init__(self):
self.memory_tracker = tracker.SummaryTracker()
self.large_objects = weakref.WeakValueDictionary()
self.gc_stats = []
def track_large_object(self, obj_id: str, obj: Any):
"""Track large objects for memory optimization"""
self.large_objects[obj_id] = obj
def optimize_memory(self) -> Dict[str, Any]:
"""Perform memory optimization"""
stats = {}
# Get current memory usage
process = psutil.Process()
stats["memory_before_mb"] = process.memory_info().rss / 1024 / 1024
# Clear weak references
stats["weak_refs_cleared"] = len(self.large_objects)
self.large_objects.clear()
# Run garbage collection
gc_stats_before = gc.get_stats()
collected = gc.collect(2) # Full collection
gc_stats_after = gc.get_stats()
stats["objects_collected"] = collected
stats["memory_after_mb"] = process.memory_info().rss / 1024 / 1024
stats["memory_freed_mb"] = stats["memory_before_mb"] - stats["memory_after_mb"]
# Track GC statistics
self.gc_stats.append({
"timestamp": time.time(),
"collected": collected,
"memory_freed": stats["memory_freed_mb"]
})
return stats
def get_memory_report(self) -> Dict[str, Any]:
"""Get detailed memory usage report"""
# Get memory summary
summary = self.memory_tracker.create_summary()
# Find top memory consumers
top_consumers = []
for entry in sorted(summary, key=lambda x: x[2], reverse=True)[:10]:
filename, lineno, size, count = entry
top_consumers.append({
"location": f"{filename}:{lineno}",
"size_mb": size / 1024 / 1024,
"count": count
})
# Process memory info
process = psutil.Process()
memory_info = process.memory_info()
return {
"current_memory_mb": memory_info.rss / 1024 / 1024,
"peak_memory_mb": memory_info.peak_wset / 1024 / 1024 if hasattr(memory_info, 'peak_wset') else 0,
"top_consumers": top_consumers,
"gc_stats": self.gc_stats[-10:], # Last 10 GC runs
"large_objects_tracked": len(self.large_objects)
}
@staticmethod
def optimize_data_structure(data: Any) -> Any:
"""Optimize data structures for memory efficiency"""
if isinstance(data, list):
# Use array for homogeneous numeric data
if all(isinstance(x, (int, float)) for x in data):
import array
return array.array('d' if any(isinstance(x, float) for x in data) else 'l', data)
elif isinstance(data, dict):
# Use __slots__ for objects if possible
if len(data) < 10: # Small dicts
class SlottedDict:
__slots__ = list(data.keys())
def __init__(self, **kwargs):
for k, v in kwargs.items():
setattr(self, k, v)
return SlottedDict(**data)
return data
Performance Testing
Load Testing Framework
import asyncio
import statistics
from typing import List, Callable
class LoadTester:
def __init__(self):
self.results = []
async def run_load_test(self,
test_func: Callable,
concurrent_users: int = 10,
requests_per_user: int = 100,
ramp_up_time: float = 10.0) -> Dict[str, Any]:
"""Run load test with gradual ramp-up"""
# Calculate ramp-up delay
ramp_up_delay = ramp_up_time / concurrent_users
# Create user tasks
user_tasks = []
for user_id in range(concurrent_users):
# Stagger user start times
start_delay = user_id * ramp_up_delay
user_task = asyncio.create_task(
self._simulate_user(
user_id,
test_func,
requests_per_user,
start_delay
)
)
user_tasks.append(user_task)
# Wait for all users to complete
await asyncio.gather(*user_tasks)
# Calculate statistics
return self._calculate_statistics()
async def _simulate_user(self,
user_id: int,
test_func: Callable,
num_requests: int,
start_delay: float):
"""Simulate a single user"""
# Wait for ramp-up
await asyncio.sleep(start_delay)
for request_id in range(num_requests):
start_time = time.time()
success = False
error = None
try:
await test_func(user_id, request_id)
success = True
except Exception as e:
error = str(e)
response_time = time.time() - start_time
self.results.append({
"user_id": user_id,
"request_id": request_id,
"response_time": response_time,
"success": success,
"error": error,
"timestamp": start_time
})
def _calculate_statistics(self) -> Dict[str, Any]:
"""Calculate load test statistics"""
if not self.results:
return {"error": "No results collected"}
# Response times
response_times = [r["response_time"] for r in self.results]
successful_times = [r["response_time"] for r in self.results if r["success"]]
# Error rate
total_requests = len(self.results)
failed_requests = sum(1 for r in self.results if not r["success"])
error_rate = failed_requests / total_requests
# Throughput
test_duration = max(r["timestamp"] for r in self.results) - min(r["timestamp"] for r in self.results)
throughput = total_requests / test_duration if test_duration > 0 else 0
return {
"total_requests": total_requests,
"successful_requests": total_requests - failed_requests,
"failed_requests": failed_requests,
"error_rate": error_rate,
"throughput_rps": throughput,
"response_time": {
"min": min(response_times),
"max": max(response_times),
"mean": statistics.mean(response_times),
"median": statistics.median(response_times),
"p95": statistics.quantiles(response_times, n=20)[18] if len(response_times) > 20 else max(response_times),
"p99": statistics.quantiles(response_times, n=100)[98] if len(response_times) > 100 else max(response_times)
}
}
Optimization Checklist
1. Code-Level Optimizations
class OptimizationChecker:
"""Check for common optimization opportunities"""
@staticmethod
def check_n_plus_one_queries(operations: List[Dict]) -> List[str]:
"""Detect N+1 query patterns"""
issues = []
# Group operations by type and timing
operation_groups = {}
for op in operations:
key = op["type"]
if key not in operation_groups:
operation_groups[key] = []
operation_groups[key].append(op["timestamp"])
# Check for repeated operations in tight loops
for op_type, timestamps in operation_groups.items():
if len(timestamps) > 10:
# Check if operations are clustered
timestamps.sort()
clusters = []
current_cluster = [timestamps[0]]
for ts in timestamps[1:]:
if ts - current_cluster[-1] < 0.1: # Within 100ms
current_cluster.append(ts)
else:
if len(current_cluster) > 5:
clusters.append(current_cluster)
current_cluster = [ts]
if clusters:
issues.append(
f"Potential N+1 query pattern detected for {op_type}: "
f"{len(clusters)} clusters with avg size {sum(len(c) for c in clusters) / len(clusters):.1f}"
)
return issues
@staticmethod
def check_synchronous_io(code_metrics: Dict) -> List[str]:
"""Check for synchronous I/O in async context"""
issues = []
sync_io_patterns = [
"time.sleep",
"requests.get",
"open(",
"file.read",
"file.write"
]
for pattern in sync_io_patterns:
if pattern in code_metrics.get("function_calls", []):
issues.append(
f"Synchronous I/O detected: {pattern}. "
f"Consider using async alternative."
)
return issues
Best Practices
-
Profile Before Optimizing: Always measure before making changes
with profiler.profile_operation("agent", "inference"): result = await agent.process_request(request) -
Set Performance Budgets: Define acceptable performance thresholds
PERFORMANCE_BUDGETS = { "api_response_time_p95": 1.0, # seconds "memory_per_request": 50, # MB "tokens_per_request": 1000, # max tokens } -
Monitor Production Performance: Track real-world metrics
@app.middleware("http") async def add_performance_monitoring(request, call_next): start_time = time.time() response = await call_next(request) # Record metrics latency = time.time() - start_time metrics.record("http_request_duration", latency, { "method": request.method, "endpoint": request.url.path, "status": response.status_code }) return response
Testing Performance Optimizations
import pytest
import asyncio
@pytest.mark.asyncio
async def test_batch_processor():
processor = BatchProcessor[int, int](batch_size=5, max_wait_time=0.05)
# Process function that doubles values
def double_batch(items: List[int]) -> List[int]:
return [x * 2 for x in items]
# Submit multiple items
tasks = []
for i in range(10):
task = processor.process(i, double_batch)
tasks.append(task)
results = await asyncio.gather(*tasks)
assert results == [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
def test_intelligent_cache():
cache = IntelligentCache(max_size=3)
call_count = 0
@cache.cached()
def expensive_function(x):
nonlocal call_count
call_count += 1
time.sleep(0.1) # Simulate expensive operation
return x * 2
# First call - cache miss
result1 = expensive_function(5)
assert result1 == 10
assert call_count == 1
# Second call - cache hit
result2 = expensive_function(5)
assert result2 == 10
assert call_count == 1
# Check cache stats
stats = cache.get_cache_stats()
assert stats["hit_rate"] == 0.5
assert stats["estimated_time_saved"] >= 0.1
@pytest.mark.asyncio
async def test_connection_pool():
pool = ConnectionPool(min_connections=2, max_connections=5)
await pool.initialize()
# Acquire multiple connections
connections = []
for _ in range(3):
conn = await pool.acquire()
connections.append(conn)
stats = pool.get_pool_stats()
assert stats["active_connections"] == 3
# Release connections
for conn in connections:
await pool.release(conn)
stats = pool.get_pool_stats()
assert stats["active_connections"] == 0

