Documentation Index
Fetch the complete documentation index at: https://docs.praison.ai/llms.txt
Use this file to discover all available pages before exploring further.
Latency Tracking
Implement comprehensive performance monitoring to track, analyse, and optimise latency across your PraisonAI applications.Overview
Latency tracking in PraisonAI enables:- Phase-specific performance monitoring
- Request-level metrics collection
- Thread-safe concurrent tracking
- Integration with monitoring systems
- Performance optimisation insights
Basic Latency Tracking
Using the Latency Tracker Tool
from praisonaiagents import Agent
from examples.custom_tools import LatencyTrackerTool
# Create latency tracker
latency_tracker = LatencyTrackerTool()
# Create agent with latency tracking
agent = Agent(
name="TrackedAgent",
instructions="Process requests with performance monitoring",
tools=[latency_tracker, other_tools]
)
# Use the agent
response = agent.chat("Analyse this data and generate a report")
# Get metrics
metrics = latency_tracker.get_metrics()
print(f"Total requests: {metrics['total_requests']}")
print(f"Average latency: {metrics['average_latency_ms']}ms")
Direct Phase Tracking
# Track specific phases
with latency_tracker.track("data_processing"):
# Process data
processed_data = process_large_dataset()
with latency_tracker.track("llm_generation"):
# Generate response
response = agent.chat(processed_data)
# Get phase-specific metrics
phase_metrics = latency_tracker.get_metrics_by_phase()
for phase, stats in phase_metrics.items():
print(f"{phase}: avg={stats['avg']:.2f}ms, max={stats['max']:.2f}ms")
Advanced Tracking Patterns
Context Manager Pattern
import time
from contextlib import contextmanager
class EnhancedLatencyTracker:
def __init__(self):
self.metrics = {}
self.active_timers = {}
@contextmanager
def track_phase(self, phase_name, metadata=None):
start_time = time.time()
timer_id = f"{phase_name}_{id(start_time)}"
self.active_timers[timer_id] = {
"phase": phase_name,
"start": start_time,
"metadata": metadata or {}
}
try:
yield timer_id
finally:
duration = (time.time() - start_time) * 1000 # ms
if phase_name not in self.metrics:
self.metrics[phase_name] = []
self.metrics[phase_name].append({
"duration": duration,
"timestamp": start_time,
"metadata": self.active_timers[timer_id]["metadata"]
})
del self.active_timers[timer_id]
# Usage
tracker = EnhancedLatencyTracker()
with tracker.track_phase("api_call", {"endpoint": "/analyse"}):
# Make API call
result = make_api_call()
with tracker.track_phase("post_processing", {"size": len(result)}):
# Process results
final_result = process_results(result)
Decorator Pattern
from functools import wraps
import asyncio
def track_latency(phase_name=None):
def decorator(func):
actual_phase = phase_name or func.__name__
@wraps(func)
def sync_wrapper(*args, **kwargs):
tracker = get_global_tracker() # Get tracker instance
with tracker.track(actual_phase):
return func(*args, **kwargs)
@wraps(func)
async def async_wrapper(*args, **kwargs):
tracker = get_global_tracker()
with tracker.track(actual_phase):
return await func(*args, **kwargs)
if asyncio.iscoroutinefunction(func):
return async_wrapper
return sync_wrapper
return decorator
# Usage
@track_latency("database_query")
def fetch_user_data(user_id):
# Database operation
return db.query(f"SELECT * FROM users WHERE id = {user_id}")
@track_latency()
async def process_request(request):
# Async processing
result = await async_operation(request)
return result
Agent Wrapper Pattern
class TrackedAgent:
def __init__(self, agent, tracker):
self.agent = agent
self.tracker = tracker
self.request_count = 0
def chat(self, message, **kwargs):
self.request_count += 1
request_id = f"req_{self.request_count}"
# Track overall request
with self.tracker.track(f"request_{request_id}"):
# Track planning phase
with self.tracker.track("planning"):
plan = self.agent.plan(message)
# Track execution phase
with self.tracker.track("execution"):
result = self.agent.execute(plan, **kwargs)
# Track formatting phase
with self.tracker.track("formatting"):
response = self.agent.format_response(result)
return response
def get_performance_report(self):
metrics = self.tracker.get_metrics()
return {
"total_requests": self.request_count,
"average_request_time": metrics.get("avg", 0),
"phase_breakdown": self.tracker.get_metrics_by_phase(),
"slowest_phase": self._identify_slowest_phase()
}
def _identify_slowest_phase(self):
phase_metrics = self.tracker.get_metrics_by_phase()
slowest = max(
phase_metrics.items(),
key=lambda x: x[1]["avg"],
default=(None, {"avg": 0})
)
return slowest[0]
MCP Server Integration
Tracking MCP Requests
from praisonaiagents import Agent
class MCPLatencyTracker:
def __init__(self):
self.mcp_metrics = {}
def track_mcp_request(self, server_name, tool_name):
key = f"{server_name}.{tool_name}"
if key not in self.mcp_metrics:
self.mcp_metrics[key] = {
"count": 0,
"total_ms": 0,
"errors": 0
}
start_time = time.time()
def record_result(success=True):
duration = (time.time() - start_time) * 1000
self.mcp_metrics[key]["count"] += 1
self.mcp_metrics[key]["total_ms"] += duration
if not success:
self.mcp_metrics[key]["errors"] += 1
return record_result
# Track MCP server calls
tracker = MCPLatencyTracker()
# Example with filesystem MCP
record = tracker.track_mcp_request("filesystem", "read_file")
try:
content = mcp_filesystem.read_file("data.txt")
record(success=True)
except Exception:
record(success=False)
raise
# Get MCP-specific metrics
for key, stats in tracker.mcp_metrics.items():
avg_latency = stats["total_ms"] / stats["count"] if stats["count"] > 0 else 0
error_rate = stats["errors"] / stats["count"] if stats["count"] > 0 else 0
print(f"{key}: {avg_latency:.2f}ms avg, {error_rate:.1%} error rate")
Metrics Analysis
Statistical Analysis
import numpy as np
from collections import defaultdict
class LatencyAnalyser:
def __init__(self, tracker):
self.tracker = tracker
def analyse_phase(self, phase_name):
metrics = self.tracker.get_metrics_by_phase().get(phase_name, {})
if not metrics.get("count"):
return None
# Get all latency values
latencies = metrics.get("all_values", [])
if not latencies:
return None
return {
"mean": np.mean(latencies),
"median": np.median(latencies),
"std_dev": np.std(latencies),
"percentiles": {
"p50": np.percentile(latencies, 50),
"p90": np.percentile(latencies, 90),
"p95": np.percentile(latencies, 95),
"p99": np.percentile(latencies, 99)
},
"outliers": self._detect_outliers(latencies)
}
def _detect_outliers(self, values):
q1 = np.percentile(values, 25)
q3 = np.percentile(values, 75)
iqr = q3 - q1
lower_bound = q1 - 1.5 * iqr
upper_bound = q3 + 1.5 * iqr
return [v for v in values if v < lower_bound or v > upper_bound]
def generate_report(self):
report = {
"summary": self.tracker.get_metrics(),
"phases": {}
}
for phase in self.tracker.get_metrics_by_phase():
analysis = self.analyse_phase(phase)
if analysis:
report["phases"][phase] = analysis
return report
Trend Detection
class LatencyTrendDetector:
def __init__(self, window_size=100):
self.window_size = window_size
self.history = defaultdict(list)
def add_measurement(self, phase, latency):
self.history[phase].append({
"timestamp": time.time(),
"latency": latency
})
# Keep only recent measurements
if len(self.history[phase]) > self.window_size:
self.history[phase].pop(0)
def detect_trend(self, phase):
if len(self.history[phase]) < 10:
return "insufficient_data"
latencies = [m["latency"] for m in self.history[phase]]
# Simple linear regression
x = np.arange(len(latencies))
coefficients = np.polyfit(x, latencies, 1)
slope = coefficients[0]
# Determine trend
if abs(slope) < 0.1:
return "stable"
elif slope > 0:
return "increasing"
else:
return "decreasing"
def get_trend_report(self):
return {
phase: {
"trend": self.detect_trend(phase),
"recent_avg": np.mean([m["latency"] for m in self.history[phase][-10:]])
if len(self.history[phase]) >= 10 else None
}
for phase in self.history
}
Integration with Monitoring Systems
Prometheus Integration
from prometheus_client import Counter, Histogram, Gauge
import time
class PrometheusLatencyTracker:
def __init__(self):
# Define metrics
self.request_count = Counter(
'praisonai_requests_total',
'Total number of requests',
['agent', 'phase']
)
self.request_duration = Histogram(
'praisonai_request_duration_seconds',
'Request duration in seconds',
['agent', 'phase'],
buckets=(0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0)
)
self.active_requests = Gauge(
'praisonai_active_requests',
'Number of active requests',
['agent']
)
def track_request(self, agent_name, phase):
self.request_count.labels(agent=agent_name, phase=phase).inc()
self.active_requests.labels(agent=agent_name).inc()
start_time = time.time()
def complete():
duration = time.time() - start_time
self.request_duration.labels(
agent=agent_name,
phase=phase
).observe(duration)
self.active_requests.labels(agent=agent_name).dec()
return complete
# Usage with agent
prom_tracker = PrometheusLatencyTracker()
class MonitoredAgent(Agent):
def chat(self, message):
# Track overall request
complete_request = prom_tracker.track_request(self.name, "total")
try:
# Track planning
complete_planning = prom_tracker.track_request(self.name, "planning")
plan = self.plan(message)
complete_planning()
# Track execution
complete_execution = prom_tracker.track_request(self.name, "execution")
result = self.execute(plan)
complete_execution()
return result
finally:
complete_request()
CloudWatch Integration
import boto3
from datetime import datetime
class CloudWatchLatencyTracker:
def __init__(self, namespace='PraisonAI'):
self.cloudwatch = boto3.client('cloudwatch')
self.namespace = namespace
self.metrics_buffer = []
def record_latency(self, agent_name, phase, latency_ms):
metric = {
'MetricName': 'Latency',
'Value': latency_ms,
'Unit': 'Milliseconds',
'Timestamp': datetime.utcnow(),
'Dimensions': [
{
'Name': 'Agent',
'Value': agent_name
},
{
'Name': 'Phase',
'Value': phase
}
]
}
self.metrics_buffer.append(metric)
# Batch send when buffer is full
if len(self.metrics_buffer) >= 20:
self.flush_metrics()
def flush_metrics(self):
if self.metrics_buffer:
self.cloudwatch.put_metric_data(
Namespace=self.namespace,
MetricData=self.metrics_buffer
)
self.metrics_buffer = []
def create_alarm(self, agent_name, threshold_ms=1000):
self.cloudwatch.put_metric_alarm(
AlarmName=f'{agent_name}-HighLatency',
ComparisonOperator='GreaterThanThreshold',
EvaluationPeriods=2,
MetricName='Latency',
Namespace=self.namespace,
Period=300,
Statistic='Average',
Threshold=threshold_ms,
ActionsEnabled=True,
AlarmDescription=f'Alarm when {agent_name} latency exceeds {threshold_ms}ms',
Dimensions=[
{
'Name': 'Agent',
'Value': agent_name
}
]
)
Performance Optimisation
Identifying Bottlenecks
class PerformanceOptimiser:
def __init__(self, tracker):
self.tracker = tracker
def identify_bottlenecks(self, threshold_percentile=90):
phase_metrics = self.tracker.get_metrics_by_phase()
bottlenecks = []
# Calculate total average time
total_avg = sum(m["avg"] for m in phase_metrics.values())
for phase, metrics in phase_metrics.items():
# Check if phase takes disproportionate time
phase_percentage = (metrics["avg"] / total_avg) * 100
if phase_percentage > threshold_percentile:
bottlenecks.append({
"phase": phase,
"percentage": phase_percentage,
"avg_latency": metrics["avg"],
"recommendation": self._get_recommendation(phase, metrics)
})
return sorted(bottlenecks, key=lambda x: x["percentage"], reverse=True)
def _get_recommendation(self, phase, metrics):
recommendations = {
"llm_generation": "Consider using a faster model or implementing caching",
"tool_execution": "Optimise tool implementations or add parallelisation",
"planning": "Simplify agent instructions or add planning caches",
"memory_search": "Optimise embeddings or implement better indexing"
}
return recommendations.get(phase, "Investigate implementation for optimisation opportunities")
Caching Strategy
from functools import lru_cache
import hashlib
class CachedLatencyTracker:
def __init__(self, tracker):
self.tracker = tracker
self.cache_hits = 0
self.cache_misses = 0
@lru_cache(maxsize=1000)
def cached_operation(self, operation_key):
# Track cache miss
self.cache_misses += 1
with self.tracker.track("cached_operation"):
result = expensive_operation(operation_key)
return result
def get_with_tracking(self, key):
# Check if in cache
if key in self.cached_operation.cache_info():
self.cache_hits += 1
with self.tracker.track("cache_hit"):
return self.cached_operation(key)
else:
with self.tracker.track("cache_miss"):
return self.cached_operation(key)
def get_cache_statistics(self):
cache_info = self.cached_operation.cache_info()
return {
"hit_rate": self.cache_hits / (self.cache_hits + self.cache_misses)
if (self.cache_hits + self.cache_misses) > 0 else 0,
"hits": self.cache_hits,
"misses": self.cache_misses,
"cache_size": cache_info.currsize,
"max_size": cache_info.maxsize
}
Best Practices
1. Granular Tracking
# Track at appropriate granularity
with tracker.track("api_request"):
with tracker.track("api_request.auth"):
authenticate()
with tracker.track("api_request.fetch"):
data = fetch_data()
with tracker.track("api_request.process"):
result = process_data(data)
2. Conditional Tracking
# Only track in production or when debugging
if os.getenv("ENABLE_LATENCY_TRACKING", "false").lower() == "true":
tracker = LatencyTrackerTool()
agent = TrackedAgent(base_agent, tracker)
else:
agent = base_agent
3. Sampling
import random
class SampledTracker:
def __init__(self, base_tracker, sample_rate=0.1):
self.base_tracker = base_tracker
self.sample_rate = sample_rate
def track(self, phase):
if random.random() < self.sample_rate:
return self.base_tracker.track(phase)
else:
# Return no-op context manager
return nullcontext()
Common Patterns
Request Tracing
import uuid
def trace_request(request_id=None):
request_id = request_id or str(uuid.uuid4())
# Add request ID to all tracking
def track_with_id(phase):
return tracker.track(f"{request_id}.{phase}")
return track_with_id, request_id
# Usage
track, request_id = trace_request()
with track("total"):
with track("phase1"):
do_phase1()
with track("phase2"):
do_phase2()
print(f"Request {request_id} completed")
Performance Budgets
class PerformanceBudget:
def __init__(self, budgets):
self.budgets = budgets # {"phase": max_ms}
self.violations = []
def check(self, phase, latency_ms):
if phase in self.budgets and latency_ms > self.budgets[phase]:
self.violations.append({
"phase": phase,
"latency": latency_ms,
"budget": self.budgets[phase],
"exceeded_by": latency_ms - self.budgets[phase]
})
return False
return True
def get_violations(self):
return self.violations
Troubleshooting
High Latency Issues
- Check phase breakdown to identify slow components
- Look for outliers that skew averages
- Monitor trends to detect degradation
- Review concurrent request handling
Memory Leaks
- Limit metric history size
- Implement cleanup for old metrics
- Use weak references where appropriate
Threading Issues
- Use thread-safe data structures
- Implement proper locking for shared state
- Test with concurrent workloads

