Skip to main content

Documentation Index

Fetch the complete documentation index at: https://docs.praison.ai/llms.txt

Use this file to discover all available pages before exploring further.

Performance Tuning Guidelines

Optimizing performance in multi-agent systems requires a systematic approach to identify bottlenecks and implement targeted improvements. This guide provides strategies for achieving optimal performance. To measure where time goes in your agents, see Profiling.

Performance Analysis Framework

Key Performance Indicators (KPIs)

  1. Response Time: End-to-end request latency
  2. Throughput: Requests processed per second
  3. Resource Utilization: CPU, memory, and I/O usage
  4. Concurrency: Parallel agent execution efficiency
  5. Token Efficiency: Tokens used per task

Performance Profiling

1. Agent Performance Profiler

Comprehensive profiling for multi-agent systems:
import time
import psutil
import cProfile
import pstats
from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional
import threading
from contextlib import contextmanager
import io

@dataclass
class PerformanceMetrics:
    agent_name: str
    operation: str
    start_time: float
    end_time: Optional[float] = None
    cpu_percent_start: float = 0
    cpu_percent_end: float = 0
    memory_mb_start: float = 0
    memory_mb_end: float = 0
    tokens_used: int = 0
    cache_hits: int = 0
    cache_misses: int = 0
    custom_metrics: Dict[str, Any] = field(default_factory=dict)

class PerformanceProfiler:
    def __init__(self):
        self.metrics: List[PerformanceMetrics] = []
        self.active_profiles: Dict[str, cProfile.Profile] = {}
        self._lock = threading.Lock()
        self.process = psutil.Process()
    
    @contextmanager
    def profile_operation(self, agent_name: str, operation: str):
        """Profile a specific operation"""
        # Start profiling
        profile = cProfile.Profile()
        profile_key = f"{agent_name}:{operation}"
        
        # Collect initial metrics
        metric = PerformanceMetrics(
            agent_name=agent_name,
            operation=operation,
            start_time=time.time(),
            cpu_percent_start=self.process.cpu_percent(),
            memory_mb_start=self.process.memory_info().rss / 1024 / 1024
        )
        
        with self._lock:
            self.active_profiles[profile_key] = profile
        
        profile.enable()
        
        try:
            yield metric
        finally:
            profile.disable()
            
            # Collect final metrics
            metric.end_time = time.time()
            metric.cpu_percent_end = self.process.cpu_percent()
            metric.memory_mb_end = self.process.memory_info().rss / 1024 / 1024
            
            with self._lock:
                self.metrics.append(metric)
                if profile_key in self.active_profiles:
                    del self.active_profiles[profile_key]
    
    def get_profile_stats(self, agent_name: str, operation: str, 
                         top_n: int = 20) -> str:
        """Get detailed profile statistics"""
        profile_key = f"{agent_name}:{operation}"
        
        # Find all metrics for this operation
        operation_metrics = [
            m for m in self.metrics 
            if m.agent_name == agent_name and m.operation == operation
        ]
        
        if not operation_metrics:
            return f"No profiling data for {profile_key}"
        
        # Aggregate statistics
        total_time = sum(m.end_time - m.start_time for m in operation_metrics if m.end_time)
        avg_time = total_time / len(operation_metrics)
        
        # Memory statistics
        memory_deltas = [
            m.memory_mb_end - m.memory_mb_start 
            for m in operation_metrics 
            if m.memory_mb_end > 0
        ]
        avg_memory_delta = sum(memory_deltas) / len(memory_deltas) if memory_deltas else 0
        
        stats = f"""
Performance Profile: {profile_key}
===================================
Executions: {len(operation_metrics)}
Total Time: {total_time:.2f}s
Average Time: {avg_time:.2f}s
Average Memory Delta: {avg_memory_delta:.2f}MB

Top Time-Consuming Operations:
"""
        
        # Add detailed timing breakdown if available
        if hasattr(operation_metrics[-1], '_profile_stats'):
            s = io.StringIO()
            ps = pstats.Stats(operation_metrics[-1]._profile_stats, stream=s)
            ps.strip_dirs().sort_stats('cumulative').print_stats(top_n)
            stats += s.getvalue()
        
        return stats
    
    def identify_bottlenecks(self, threshold_seconds: float = 1.0) -> List[Dict[str, Any]]:
        """Identify performance bottlenecks"""
        bottlenecks = []
        
        # Group metrics by agent and operation
        operation_groups = {}
        for metric in self.metrics:
            if metric.end_time is None:
                continue
                
            key = f"{metric.agent_name}:{metric.operation}"
            if key not in operation_groups:
                operation_groups[key] = []
            operation_groups[key].append(metric)
        
        # Analyze each operation
        for key, metrics in operation_groups.items():
            execution_times = [m.end_time - m.start_time for m in metrics]
            avg_time = sum(execution_times) / len(execution_times)
            
            if avg_time > threshold_seconds:
                memory_deltas = [
                    m.memory_mb_end - m.memory_mb_start 
                    for m in metrics 
                    if m.memory_mb_end > 0
                ]
                
                bottlenecks.append({
                    "operation": key,
                    "avg_execution_time": avg_time,
                    "max_execution_time": max(execution_times),
                    "total_executions": len(metrics),
                    "avg_memory_delta": sum(memory_deltas) / len(memory_deltas) if memory_deltas else 0,
                    "severity": "high" if avg_time > threshold_seconds * 2 else "medium"
                })
        
        # Sort by average execution time
        bottlenecks.sort(key=lambda x: x["avg_execution_time"], reverse=True)
        
        return bottlenecks

2. Async Performance Monitor

Monitor async operations and concurrency:
import asyncio
from typing import Set

class AsyncPerformanceMonitor:
    def __init__(self):
        self.active_tasks: Set[str] = set()
        self.task_metrics: Dict[str, Dict[str, Any]] = {}
        self._lock = asyncio.Lock()
        self.max_concurrent_tasks = 0
    
    @contextmanager
    async def monitor_async_operation(self, operation_name: str):
        """Monitor an async operation"""
        task_id = f"{operation_name}_{id(asyncio.current_task())}"
        
        async with self._lock:
            self.active_tasks.add(task_id)
            self.max_concurrent_tasks = max(
                self.max_concurrent_tasks, 
                len(self.active_tasks)
            )
            
            self.task_metrics[task_id] = {
                "operation": operation_name,
                "start_time": time.time(),
                "status": "running"
            }
        
        try:
            yield
            
            async with self._lock:
                self.task_metrics[task_id]["status"] = "completed"
                self.task_metrics[task_id]["end_time"] = time.time()
                
        except Exception as e:
            async with self._lock:
                self.task_metrics[task_id]["status"] = "failed"
                self.task_metrics[task_id]["error"] = str(e)
                self.task_metrics[task_id]["end_time"] = time.time()
            raise
            
        finally:
            async with self._lock:
                self.active_tasks.discard(task_id)
    
    def get_concurrency_report(self) -> Dict[str, Any]:
        """Get concurrency performance report"""
        completed_tasks = [
            m for m in self.task_metrics.values() 
            if m["status"] == "completed" and "end_time" in m
        ]
        
        if not completed_tasks:
            return {"message": "No completed tasks"}
        
        # Calculate concurrency metrics
        execution_times = [
            t["end_time"] - t["start_time"] 
            for t in completed_tasks
        ]
        
        # Group by operation
        operation_stats = {}
        for task in completed_tasks:
            op = task["operation"]
            if op not in operation_stats:
                operation_stats[op] = {
                    "count": 0,
                    "total_time": 0,
                    "max_concurrent": 0
                }
            
            operation_stats[op]["count"] += 1
            operation_stats[op]["total_time"] += task["end_time"] - task["start_time"]
        
        return {
            "max_concurrent_tasks": self.max_concurrent_tasks,
            "current_active_tasks": len(self.active_tasks),
            "total_completed_tasks": len(completed_tasks),
            "avg_execution_time": sum(execution_times) / len(execution_times),
            "operation_stats": operation_stats
        }

Optimization Strategies

1. Caching Strategy

Implement intelligent caching for expensive operations:
from functools import lru_cache
import hashlib
import pickle

class IntelligentCache:
    def __init__(self, max_size: int = 1000, ttl_seconds: int = 3600):
        self.max_size = max_size
        self.ttl_seconds = ttl_seconds
        self.cache: Dict[str, Dict[str, Any]] = {}
        self.access_count: Dict[str, int] = {}
        self.computation_time: Dict[str, float] = {}
        self._lock = threading.Lock()
    
    def _generate_key(self, func_name: str, args: tuple, kwargs: dict) -> str:
        """Generate cache key from function arguments"""
        key_data = {
            "func": func_name,
            "args": args,
            "kwargs": kwargs
        }
        
        # Create hash of the data
        key_str = pickle.dumps(key_data)
        return hashlib.sha256(key_str).hexdigest()
    
    def cached(self, ttl_override: Optional[int] = None):
        """Decorator for caching function results"""
        def decorator(func):
            def wrapper(*args, **kwargs):
                cache_key = self._generate_key(func.__name__, args, kwargs)
                
                # Check cache
                with self._lock:
                    if cache_key in self.cache:
                        entry = self.cache[cache_key]
                        
                        # Check TTL
                        age = time.time() - entry["timestamp"]
                        ttl = ttl_override or self.ttl_seconds
                        
                        if age < ttl:
                            self.access_count[cache_key] = self.access_count.get(cache_key, 0) + 1
                            return entry["value"]
                
                # Compute value
                start_time = time.time()
                result = func(*args, **kwargs)
                computation_time = time.time() - start_time
                
                # Store in cache
                with self._lock:
                    # Evict if necessary
                    if len(self.cache) >= self.max_size:
                        self._evict_least_valuable()
                    
                    self.cache[cache_key] = {
                        "value": result,
                        "timestamp": time.time()
                    }
                    self.access_count[cache_key] = 1
                    self.computation_time[cache_key] = computation_time
                
                return result
            
            return wrapper
        return decorator
    
    def _evict_least_valuable(self):
        """Evict least valuable cache entry"""
        if not self.cache:
            return
        
        # Calculate value score for each entry
        scores = {}
        current_time = time.time()
        
        for key, entry in self.cache.items():
            age = current_time - entry["timestamp"]
            access_count = self.access_count.get(key, 1)
            comp_time = self.computation_time.get(key, 0.1)
            
            # Value = (access_count * computation_time) / age
            value_score = (access_count * comp_time) / max(age, 1)
            scores[key] = value_score
        
        # Evict lowest score
        evict_key = min(scores.keys(), key=lambda k: scores[k])
        del self.cache[evict_key]
        self.access_count.pop(evict_key, None)
        self.computation_time.pop(evict_key, None)
    
    def get_cache_stats(self) -> Dict[str, Any]:
        """Get cache performance statistics"""
        with self._lock:
            if not self.access_count:
                return {"cache_size": len(self.cache)}
            
            total_hits = sum(count - 1 for count in self.access_count.values())
            total_misses = len(self.access_count)
            hit_rate = total_hits / (total_hits + total_misses) if (total_hits + total_misses) > 0 else 0
            
            # Calculate time saved
            time_saved = sum(
                (count - 1) * self.computation_time.get(key, 0)
                for key, count in self.access_count.items()
            )
            
            return {
                "cache_size": len(self.cache),
                "hit_rate": hit_rate,
                "total_hits": total_hits,
                "total_misses": total_misses,
                "estimated_time_saved": time_saved,
                "avg_computation_time": sum(self.computation_time.values()) / len(self.computation_time)
            }

2. Batch Processing Optimization

Optimize batch operations for better throughput:
from typing import TypeVar, Generic

T = TypeVar('T')
R = TypeVar('R')

class BatchProcessor(Generic[T, R]):
    def __init__(self,
                 batch_size: int = 100,
                 max_wait_time: float = 0.1,
                 max_workers: int = 4):
        self.batch_size = batch_size
        self.max_wait_time = max_wait_time
        self.max_workers = max_workers
        
        self.pending_items: List[Tuple[T, asyncio.Future]] = []
        self.processing = False
        self._lock = asyncio.Lock()
    
    async def process(self, item: T, processor_func: Callable[[List[T]], List[R]]) -> R:
        """Add item for batch processing"""
        future = asyncio.Future()
        
        async with self._lock:
            self.pending_items.append((item, future))
            
            # Start processing if not already running
            if not self.processing:
                asyncio.create_task(self._process_batches(processor_func))
        
        return await future
    
    async def _process_batches(self, processor_func: Callable[[List[T]], List[R]]):
        """Process items in batches"""
        self.processing = True
        
        try:
            while True:
                # Wait for batch to fill or timeout
                start_wait = time.time()
                
                while len(self.pending_items) < self.batch_size:
                    if time.time() - start_wait > self.max_wait_time:
                        break
                    
                    if not self.pending_items:
                        await asyncio.sleep(0.01)
                        continue
                    
                    await asyncio.sleep(0.001)
                
                # Get batch
                async with self._lock:
                    if not self.pending_items:
                        break
                    
                    batch_items = self.pending_items[:self.batch_size]
                    self.pending_items = self.pending_items[self.batch_size:]
                
                # Process batch
                items = [item for item, _ in batch_items]
                futures = [future for _, future in batch_items]
                
                try:
                    # Process in parallel if supported
                    if asyncio.iscoroutinefunction(processor_func):
                        results = await processor_func(items)
                    else:
                        # Run in thread pool for CPU-bound operations
                        loop = asyncio.get_event_loop()
                        results = await loop.run_in_executor(None, processor_func, items)
                    
                    # Distribute results
                    for future, result in zip(futures, results):
                        future.set_result(result)
                        
                except Exception as e:
                    # Set exception for all futures in batch
                    for future in futures:
                        future.set_exception(e)
        
        finally:
            self.processing = False

3. Connection Pooling

Optimize resource connections:
import asyncio
from asyncio import Queue
import aiohttp

class ConnectionPool:
    def __init__(self,
                 min_connections: int = 5,
                 max_connections: int = 20,
                 connection_timeout: float = 30.0):
        self.min_connections = min_connections
        self.max_connections = max_connections
        self.connection_timeout = connection_timeout
        
        self.available_connections: Queue = Queue()
        self.active_connections = 0
        self.total_connections = 0
        
        self._lock = asyncio.Lock()
        self._connector = None
        self._stats = {
            "connections_created": 0,
            "connections_reused": 0,
            "connection_errors": 0,
            "wait_time_total": 0
        }
    
    async def initialize(self):
        """Initialize connection pool"""
        self._connector = aiohttp.TCPConnector(
            limit=self.max_connections,
            limit_per_host=self.max_connections
        )
        
        # Create minimum connections
        for _ in range(self.min_connections):
            conn = await self._create_connection()
            await self.available_connections.put(conn)
    
    async def acquire(self) -> aiohttp.ClientSession:
        """Acquire a connection from the pool"""
        start_time = time.time()
        
        # Try to get available connection
        try:
            connection = await asyncio.wait_for(
                self.available_connections.get(),
                timeout=0.1
            )
            
            self._stats["connections_reused"] += 1
            
        except asyncio.TimeoutError:
            # Create new connection if under limit
            async with self._lock:
                if self.total_connections < self.max_connections:
                    connection = await self._create_connection()
                else:
                    # Wait for connection to become available
                    connection = await self.available_connections.get()
                    self._stats["connections_reused"] += 1
        
        self._stats["wait_time_total"] += time.time() - start_time
        
        async with self._lock:
            self.active_connections += 1
        
        return connection
    
    async def release(self, connection: aiohttp.ClientSession):
        """Release connection back to pool"""
        async with self._lock:
            self.active_connections -= 1
        
        # Check if connection is still valid
        if not connection.closed:
            await self.available_connections.put(connection)
        else:
            async with self._lock:
                self.total_connections -= 1
            
            # Create replacement if below minimum
            if self.total_connections < self.min_connections:
                try:
                    new_conn = await self._create_connection()
                    await self.available_connections.put(new_conn)
                except Exception:
                    pass
    
    async def _create_connection(self) -> aiohttp.ClientSession:
        """Create a new connection"""
        try:
            session = aiohttp.ClientSession(
                connector=self._connector,
                timeout=aiohttp.ClientTimeout(total=self.connection_timeout)
            )
            
            async with self._lock:
                self.total_connections += 1
                self._stats["connections_created"] += 1
            
            return session
            
        except Exception as e:
            self._stats["connection_errors"] += 1
            raise
    
    def get_pool_stats(self) -> Dict[str, Any]:
        """Get connection pool statistics"""
        return {
            "total_connections": self.total_connections,
            "active_connections": self.active_connections,
            "available_connections": self.available_connections.qsize(),
            "connection_reuse_rate": (
                self._stats["connections_reused"] / 
                max(self._stats["connections_created"] + self._stats["connections_reused"], 1)
            ),
            **self._stats
        }

4. Memory Optimization

Optimize memory usage patterns:
import gc
import weakref
from pympler import tracker

class MemoryOptimizer:
    def __init__(self):
        self.memory_tracker = tracker.SummaryTracker()
        self.large_objects = weakref.WeakValueDictionary()
        self.gc_stats = []
    
    def track_large_object(self, obj_id: str, obj: Any):
        """Track large objects for memory optimization"""
        self.large_objects[obj_id] = obj
    
    def optimize_memory(self) -> Dict[str, Any]:
        """Perform memory optimization"""
        stats = {}
        
        # Get current memory usage
        process = psutil.Process()
        stats["memory_before_mb"] = process.memory_info().rss / 1024 / 1024
        
        # Clear weak references
        stats["weak_refs_cleared"] = len(self.large_objects)
        self.large_objects.clear()
        
        # Run garbage collection
        gc_stats_before = gc.get_stats()
        collected = gc.collect(2)  # Full collection
        gc_stats_after = gc.get_stats()
        
        stats["objects_collected"] = collected
        stats["memory_after_mb"] = process.memory_info().rss / 1024 / 1024
        stats["memory_freed_mb"] = stats["memory_before_mb"] - stats["memory_after_mb"]
        
        # Track GC statistics
        self.gc_stats.append({
            "timestamp": time.time(),
            "collected": collected,
            "memory_freed": stats["memory_freed_mb"]
        })
        
        return stats
    
    def get_memory_report(self) -> Dict[str, Any]:
        """Get detailed memory usage report"""
        # Get memory summary
        summary = self.memory_tracker.create_summary()
        
        # Find top memory consumers
        top_consumers = []
        for entry in sorted(summary, key=lambda x: x[2], reverse=True)[:10]:
            filename, lineno, size, count = entry
            top_consumers.append({
                "location": f"{filename}:{lineno}",
                "size_mb": size / 1024 / 1024,
                "count": count
            })
        
        # Process memory info
        process = psutil.Process()
        memory_info = process.memory_info()
        
        return {
            "current_memory_mb": memory_info.rss / 1024 / 1024,
            "peak_memory_mb": memory_info.peak_wset / 1024 / 1024 if hasattr(memory_info, 'peak_wset') else 0,
            "top_consumers": top_consumers,
            "gc_stats": self.gc_stats[-10:],  # Last 10 GC runs
            "large_objects_tracked": len(self.large_objects)
        }
    
    @staticmethod
    def optimize_data_structure(data: Any) -> Any:
        """Optimize data structures for memory efficiency"""
        if isinstance(data, list):
            # Use array for homogeneous numeric data
            if all(isinstance(x, (int, float)) for x in data):
                import array
                return array.array('d' if any(isinstance(x, float) for x in data) else 'l', data)
        
        elif isinstance(data, dict):
            # Use __slots__ for objects if possible
            if len(data) < 10:  # Small dicts
                class SlottedDict:
                    __slots__ = list(data.keys())
                    
                    def __init__(self, **kwargs):
                        for k, v in kwargs.items():
                            setattr(self, k, v)
                
                return SlottedDict(**data)
        
        return data

Performance Testing

Load Testing Framework

import asyncio
import statistics
from typing import List, Callable

class LoadTester:
    def __init__(self):
        self.results = []
        
    async def run_load_test(self,
                          test_func: Callable,
                          concurrent_users: int = 10,
                          requests_per_user: int = 100,
                          ramp_up_time: float = 10.0) -> Dict[str, Any]:
        """Run load test with gradual ramp-up"""
        
        # Calculate ramp-up delay
        ramp_up_delay = ramp_up_time / concurrent_users
        
        # Create user tasks
        user_tasks = []
        
        for user_id in range(concurrent_users):
            # Stagger user start times
            start_delay = user_id * ramp_up_delay
            
            user_task = asyncio.create_task(
                self._simulate_user(
                    user_id,
                    test_func,
                    requests_per_user,
                    start_delay
                )
            )
            user_tasks.append(user_task)
        
        # Wait for all users to complete
        await asyncio.gather(*user_tasks)
        
        # Calculate statistics
        return self._calculate_statistics()
    
    async def _simulate_user(self,
                           user_id: int,
                           test_func: Callable,
                           num_requests: int,
                           start_delay: float):
        """Simulate a single user"""
        
        # Wait for ramp-up
        await asyncio.sleep(start_delay)
        
        for request_id in range(num_requests):
            start_time = time.time()
            success = False
            error = None
            
            try:
                await test_func(user_id, request_id)
                success = True
            except Exception as e:
                error = str(e)
            
            response_time = time.time() - start_time
            
            self.results.append({
                "user_id": user_id,
                "request_id": request_id,
                "response_time": response_time,
                "success": success,
                "error": error,
                "timestamp": start_time
            })
    
    def _calculate_statistics(self) -> Dict[str, Any]:
        """Calculate load test statistics"""
        if not self.results:
            return {"error": "No results collected"}
        
        # Response times
        response_times = [r["response_time"] for r in self.results]
        successful_times = [r["response_time"] for r in self.results if r["success"]]
        
        # Error rate
        total_requests = len(self.results)
        failed_requests = sum(1 for r in self.results if not r["success"])
        error_rate = failed_requests / total_requests
        
        # Throughput
        test_duration = max(r["timestamp"] for r in self.results) - min(r["timestamp"] for r in self.results)
        throughput = total_requests / test_duration if test_duration > 0 else 0
        
        return {
            "total_requests": total_requests,
            "successful_requests": total_requests - failed_requests,
            "failed_requests": failed_requests,
            "error_rate": error_rate,
            "throughput_rps": throughput,
            "response_time": {
                "min": min(response_times),
                "max": max(response_times),
                "mean": statistics.mean(response_times),
                "median": statistics.median(response_times),
                "p95": statistics.quantiles(response_times, n=20)[18] if len(response_times) > 20 else max(response_times),
                "p99": statistics.quantiles(response_times, n=100)[98] if len(response_times) > 100 else max(response_times)
            }
        }

Optimization Checklist

1. Code-Level Optimizations

class OptimizationChecker:
    """Check for common optimization opportunities"""
    
    @staticmethod
    def check_n_plus_one_queries(operations: List[Dict]) -> List[str]:
        """Detect N+1 query patterns"""
        issues = []
        
        # Group operations by type and timing
        operation_groups = {}
        
        for op in operations:
            key = op["type"]
            if key not in operation_groups:
                operation_groups[key] = []
            operation_groups[key].append(op["timestamp"])
        
        # Check for repeated operations in tight loops
        for op_type, timestamps in operation_groups.items():
            if len(timestamps) > 10:
                # Check if operations are clustered
                timestamps.sort()
                
                clusters = []
                current_cluster = [timestamps[0]]
                
                for ts in timestamps[1:]:
                    if ts - current_cluster[-1] < 0.1:  # Within 100ms
                        current_cluster.append(ts)
                    else:
                        if len(current_cluster) > 5:
                            clusters.append(current_cluster)
                        current_cluster = [ts]
                
                if clusters:
                    issues.append(
                        f"Potential N+1 query pattern detected for {op_type}: "
                        f"{len(clusters)} clusters with avg size {sum(len(c) for c in clusters) / len(clusters):.1f}"
                    )
        
        return issues
    
    @staticmethod
    def check_synchronous_io(code_metrics: Dict) -> List[str]:
        """Check for synchronous I/O in async context"""
        issues = []
        
        sync_io_patterns = [
            "time.sleep",
            "requests.get",
            "open(",
            "file.read",
            "file.write"
        ]
        
        for pattern in sync_io_patterns:
            if pattern in code_metrics.get("function_calls", []):
                issues.append(
                    f"Synchronous I/O detected: {pattern}. "
                    f"Consider using async alternative."
                )
        
        return issues

Best Practices

  1. Profile Before Optimizing: Always measure before making changes
    with profiler.profile_operation("agent", "inference"):
        result = await agent.process_request(request)
    
  2. Set Performance Budgets: Define acceptable performance thresholds
    PERFORMANCE_BUDGETS = {
        "api_response_time_p95": 1.0,  # seconds
        "memory_per_request": 50,       # MB
        "tokens_per_request": 1000,     # max tokens
    }
    
  3. Monitor Production Performance: Track real-world metrics
    @app.middleware("http")
    async def add_performance_monitoring(request, call_next):
        start_time = time.time()
        response = await call_next(request)
        
        # Record metrics
        latency = time.time() - start_time
        metrics.record("http_request_duration", latency, {
            "method": request.method,
            "endpoint": request.url.path,
            "status": response.status_code
        })
        
        return response
    

Testing Performance Optimizations

import pytest
import asyncio

@pytest.mark.asyncio
async def test_batch_processor():
    processor = BatchProcessor[int, int](batch_size=5, max_wait_time=0.05)
    
    # Process function that doubles values
    def double_batch(items: List[int]) -> List[int]:
        return [x * 2 for x in items]
    
    # Submit multiple items
    tasks = []
    for i in range(10):
        task = processor.process(i, double_batch)
        tasks.append(task)
    
    results = await asyncio.gather(*tasks)
    
    assert results == [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

def test_intelligent_cache():
    cache = IntelligentCache(max_size=3)
    
    call_count = 0
    
    @cache.cached()
    def expensive_function(x):
        nonlocal call_count
        call_count += 1
        time.sleep(0.1)  # Simulate expensive operation
        return x * 2
    
    # First call - cache miss
    result1 = expensive_function(5)
    assert result1 == 10
    assert call_count == 1
    
    # Second call - cache hit
    result2 = expensive_function(5)
    assert result2 == 10
    assert call_count == 1
    
    # Check cache stats
    stats = cache.get_cache_stats()
    assert stats["hit_rate"] == 0.5
    assert stats["estimated_time_saved"] >= 0.1

@pytest.mark.asyncio
async def test_connection_pool():
    pool = ConnectionPool(min_connections=2, max_connections=5)
    await pool.initialize()
    
    # Acquire multiple connections
    connections = []
    for _ in range(3):
        conn = await pool.acquire()
        connections.append(conn)
    
    stats = pool.get_pool_stats()
    assert stats["active_connections"] == 3
    
    # Release connections
    for conn in connections:
        await pool.release(conn)
    
    stats = pool.get_pool_stats()
    assert stats["active_connections"] == 0

Conclusion

Performance tuning in multi-agent systems requires a systematic approach combining profiling, analysis, and targeted optimizations. By following these guidelines and continuously monitoring performance metrics, you can build systems that scale efficiently while maintaining responsiveness.