Production Deployment Guide

This guide covers best practices, configurations, and strategies for deploying PraisonAI agents in production environments.

Overview

Deploying AI agents in production requires careful consideration of:
  • Performance and scalability
  • Security and compliance
  • Monitoring and observability
  • Cost optimization
  • Error handling and recovery

Pre-Deployment Checklist

1. Code Preparation

# config/production.py
import os
from typing import Optional

class ProductionConfig:
    # API Keys (from environment)
    OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
    ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY")
    
    # Model Configuration
    DEFAULT_MODEL = os.getenv("DEFAULT_MODEL", "gpt-4")
    TEMPERATURE = float(os.getenv("TEMPERATURE", "0.7"))
    MAX_RETRIES = int(os.getenv("MAX_RETRIES", "3"))
    
    # Performance
    REQUEST_TIMEOUT = int(os.getenv("REQUEST_TIMEOUT", "300"))
    MAX_CONCURRENT_AGENTS = int(os.getenv("MAX_CONCURRENT_AGENTS", "10"))
    
    # Security
    ENABLE_CONTENT_FILTERING = os.getenv("ENABLE_CONTENT_FILTERING", "true").lower() == "true"
    ALLOWED_TOOLS = [t for t in os.getenv("ALLOWED_TOOLS", "").split(",") if t]  # empty env var yields no tools, not ['']
    
    # Monitoring
    TELEMETRY_ENABLED = os.getenv("TELEMETRY_ENABLED", "true").lower() == "true"
    TELEMETRY_ENDPOINT = os.getenv("TELEMETRY_ENDPOINT")

# utils/error_handler.py
import asyncio
import logging
import traceback
from functools import wraps
from typing import Callable, Any

logger = logging.getLogger(__name__)

class ProductionError(Exception):
    """Base exception for production errors"""
    pass

def production_error_handler(
    fallback_result: Any = None,
    log_errors: bool = True,
    raise_errors: bool = False
):
    """Decorator for production error handling"""
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            try:
                return await func(*args, **kwargs)
            except Exception as e:
                if log_errors:
                    logger.error(f"Error in {func.__name__}: {str(e)}")
                    logger.debug(traceback.format_exc())
                
                if raise_errors:
                    raise ProductionError(f"Production error in {func.__name__}: {str(e)}")
                
                return fallback_result
        
        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                if log_errors:
                    logger.error(f"Error in {func.__name__}: {str(e)}")
                    logger.debug(traceback.format_exc())
                
                if raise_errors:
                    raise ProductionError(f"Production error in {func.__name__}: {str(e)}")
                
                return fallback_result
        
        return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper
    return decorator
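
A minimal usage sketch, assuming an agent object exposing an arun coroutine (as elsewhere in this guide):

# usage sketch: failures are logged and degrade to the fallback value
@production_error_handler(fallback_result="Service temporarily unavailable")
async def run_research_task(agent, task: str):
    return await agent.arun(task)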

# security/validator.py
import re
from typing import List, Dict, Any

from config.production import ProductionConfig  # defined in the config module above

class SecurityValidator:
    def __init__(self, config: ProductionConfig):
        self.config = config
        self.blocked_patterns = [
            r"(api_key|password|secret)",
            r"(eval|exec|__import__)",
            r"(system|popen|subprocess)"
        ]
    
    def validate_input(self, input_text: str) -> bool:
        """Validate user input for security threats"""
        # Check for injection attempts
        for pattern in self.blocked_patterns:
            if re.search(pattern, input_text, re.IGNORECASE):
                return False
        
        # Check input length
        if len(input_text) > 10000:  # 10KB limit
            return False
        
        return True
    
    def sanitize_output(self, output: str) -> str:
        """Remove sensitive information from output"""
        # Remove potential secrets
        output = re.sub(r'(sk-|api_key=)[a-zA-Z0-9]+', '[REDACTED]', output)
        return output
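
A sketch of wiring the validator into a request path; the handler and the agent call are illustrative:

# usage sketch: reject early, scrub late
validator = SecurityValidator(ProductionConfig())

async def handle_user_request(agent, user_input: str) -> str:
    if not validator.validate_input(user_input):
        return "Input rejected by security policy."
    raw_output = await agent.arun(user_input)
    return validator.sanitize_output(raw_output)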

2. Infrastructure Setup

Container Deployment

# Dockerfile
FROM python:3.11-slim

# Security: Run as non-root user
RUN groupadd -r appuser && useradd -r -g appuser appuser

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application
COPY . .
RUN chown -R appuser:appuser /app

# Switch to non-root user
USER appuser

# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
  CMD python -c "import requests; requests.get('http://localhost:8000/health').raise_for_status()"

EXPOSE 8000

CMD ["gunicorn", "app:app", "--bind", "0.0.0.0:8000", "--workers", "4"]

Kubernetes Deployment

# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: praisonai-agents
  labels:
    app: praisonai-agents
spec:
  replicas: 3
  selector:
    matchLabels:
      app: praisonai-agents
  template:
    metadata:
      labels:
        app: praisonai-agents
    spec:
      containers:
      - name: praisonai
        image: your-registry/praisonai-agents:latest
        ports:
        - containerPort: 8000
        env:
        - name: OPENAI_API_KEY
          valueFrom:
            secretKeyRef:
              name: api-keys
              key: openai-key
        resources:
          requests:
            memory: "256Mi"
            cpu: "250m"
          limits:
            memory: "512Mi"
            cpu: "500m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: praisonai-service
spec:
  selector:
    app: praisonai-agents
  ports:
  - port: 80
    targetPort: 8000
  type: LoadBalancer

Performance Optimization

1. Caching Strategy

# cache/agent_cache.py
import hashlib
import json
import redis
from typing import Optional, Any

class AgentCache:
    def __init__(self, redis_url: Optional[str] = None):
        self.redis_client = redis.from_url(redis_url) if redis_url else None
        self.local_cache = {}
    
    def cache_key(self, agent_name: str, input_text: str) -> str:
        """Generate cache key"""
        content = f"{agent_name}:{input_text}"
        return hashlib.md5(content.encode()).hexdigest()
    
    async def get(self, agent_name: str, input_text: str) -> Optional[Any]:
        """Get cached result"""
        key = self.cache_key(agent_name, input_text)
        
        # Try local cache first
        if key in self.local_cache:
            return self.local_cache[key]
        
        # Try Redis
        if self.redis_client:
            cached = self.redis_client.get(key)
            if cached:
                return json.loads(cached)
        
        return None
    
    async def set(self, agent_name: str, input_text: str, result: Any, ttl: int = 3600):
        """Cache result"""
        key = self.cache_key(agent_name, input_text)
        
        # Local cache
        self.local_cache[key] = result
        
        # Redis cache
        if self.redis_client:
            self.redis_client.setex(key, ttl, json.dumps(result))
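
A sketch of a read-through wrapper around an agent call; the REDIS_URL variable and the arun coroutine are assumptions:

import os

cache = AgentCache(redis_url=os.getenv("REDIS_URL"))

async def cached_run(agent, task: str):
    # serve repeated queries from cache instead of paying for a model call
    hit = await cache.get(agent.name, task)
    if hit is not None:
        return hit
    result = await agent.arun(task)
    await cache.set(agent.name, task, result, ttl=3600)
    return result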

2. Connection Pooling

# utils/connection_pool.py
import asyncio
from typing import Dict, Any
import aiohttp

class ConnectionPool:
    def __init__(self, max_connections: int = 100):
        self.connector = aiohttp.TCPConnector(
            limit=max_connections,
            limit_per_host=30,
            ttl_dns_cache=300
        )
        self.session = None
    
    async def __aenter__(self):
        self.session = aiohttp.ClientSession(connector=self.connector)
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()
    
    async def request(self, method: str, url: str, **kwargs) -> Dict[str, Any]:
        async with self.session.request(method, url, **kwargs) as response:
            return await response.json()
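
A short usage sketch; the URL is a placeholder:

async def fetch_status():
    # the pooled session is created on enter and closed on exit
    async with ConnectionPool(max_connections=50) as pool:
        return await pool.request("GET", "https://api.example.com/status")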

3. Load Balancing

# load_balancer/agent_balancer.py
import random
from collections import defaultdict
from typing import Any, List, Optional

from praisonaiagents import Agent

class AgentLoadBalancer:
    def __init__(self, agents: List[Agent]):
        self.agents = agents
        self.agent_loads = defaultdict(int)
        self.agent_errors = defaultdict(int)
    
    def select_agent(self) -> Optional[Agent]:
        """Select agent with least load"""
        available_agents = [
            agent for agent in self.agents
            if self.agent_errors[agent.name] < 3  # Skip agents with many errors
        ]
        
        if not available_agents:
            return None
        
        # Select agent with minimum load
        return min(available_agents, key=lambda a: self.agent_loads[a.name])
    
    async def execute_with_balancing(self, task: str) -> Any:
        agent = self.select_agent()
        if not agent:
            raise Exception("No available agents")
        
        self.agent_loads[agent.name] += 1
        try:
            result = await agent.arun(task)
            self.agent_errors[agent.name] = 0  # Reset error count on success
            return result
        except Exception as e:
            self.agent_errors[agent.name] += 1
            raise
        finally:
            self.agent_loads[agent.name] -= 1
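
Wiring the balancer in front of a pool of interchangeable agents might look like this (agent construction omitted):

balancer = AgentLoadBalancer(agents=[agent_a, agent_b, agent_c])

async def handle_task(task: str):
    # routes to the least-loaded healthy agent; raises if all are failing
    return await balancer.execute_with_balancing(task)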

Monitoring and Observability

1. Metrics Collection

# monitoring/metrics.py
from prometheus_client import Counter, Histogram, Gauge
import time

# Define metrics
agent_requests = Counter('agent_requests_total', 'Total agent requests', ['agent_name', 'status'])
agent_latency = Histogram('agent_request_duration_seconds', 'Agent request latency', ['agent_name'])
active_agents = Gauge('active_agents', 'Number of active agents')
token_usage = Counter('token_usage_total', 'Total tokens used', ['model', 'agent_name'])

class MetricsCollector:
    @staticmethod
    def record_request(agent_name: str, status: str):
        agent_requests.labels(agent_name=agent_name, status=status).inc()
    
    @staticmethod
    def record_latency(agent_name: str, duration: float):
        agent_latency.labels(agent_name=agent_name).observe(duration)
    
    @staticmethod
    def record_tokens(model: str, agent_name: str, tokens: int):
        token_usage.labels(model=model, agent_name=agent_name).inc(tokens)
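
Prometheus scrapes metrics over HTTP; start_http_server from prometheus_client serves the default registry. A sketch of instrumenting an agent call, continuing the module above (the port and agent object are illustrative):

from prometheus_client import start_http_server

start_http_server(9090)  # exposes /metrics for the Prometheus scraper

async def instrumented_run(agent, task: str):
    start = time.time()
    try:
        result = await agent.arun(task)
        MetricsCollector.record_request(agent.name, "success")
        return result
    except Exception:
        MetricsCollector.record_request(agent.name, "error")
        raise
    finally:
        MetricsCollector.record_latency(agent.name, time.time() - start)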

2. Logging Configuration

# logging_config.py
import logging
from pythonjsonlogger import jsonlogger

def setup_production_logging():
    # JSON formatter for structured logs
    logHandler = logging.StreamHandler()
    formatter = jsonlogger.JsonFormatter(
        fmt='%(asctime)s %(name)s %(levelname)s %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )
    logHandler.setFormatter(formatter)
    
    # Configure root logger
    logging.root.setLevel(logging.INFO)
    logging.root.addHandler(logHandler)
    
    # Configure specific loggers
    logging.getLogger('praisonaiagents').setLevel(logging.INFO)
    logging.getLogger('httpx').setLevel(logging.WARNING)
    
    return logging.getLogger(__name__)

3. Health Checks

# health/health_check.py
import asyncio
from typing import Any, Dict, List

from praisonaiagents import Agent

class HealthChecker:
    def __init__(self, agents: List[Agent], dependencies: Dict[str, Any]):
        self.agents = agents
        self.dependencies = dependencies
    
    async def check_health(self) -> Dict[str, Any]:
        """Comprehensive health check"""
        health_status = {
            "status": "healthy",
            "checks": {}
        }
        
        # Check agents (note: this issues a real model call per agent;
        # use a cheaper signal if probes run frequently)
        for agent in self.agents:
            try:
                await agent.arun("test")
                health_status["checks"][f"agent_{agent.name}"] = "healthy"
            except Exception as e:
                health_status["status"] = "unhealthy"
                health_status["checks"][f"agent_{agent.name}"] = str(e)
        
        # Check dependencies
        for name, dependency in self.dependencies.items():
            try:
                await dependency.ping()
                health_status["checks"][name] = "healthy"
            except Exception as e:
                health_status["status"] = "unhealthy"
                health_status["checks"][name] = str(e)
        
        return health_status
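
The Dockerfile's gunicorn command and the Kubernetes probes above assume the app exposes /health and /ready. A minimal sketch using Flask (an assumption; any WSGI/ASGI framework works):

# app.py -- minimal probe endpoints matching the manifests above
import asyncio
from flask import Flask, jsonify

app = Flask(__name__)
checker = HealthChecker(agents=[], dependencies={})  # register real agents/dependencies here

@app.route("/health")
def health():
    status = asyncio.run(checker.check_health())
    return jsonify(status), 200 if status["status"] == "healthy" else 503

@app.route("/ready")
def ready():
    return jsonify({"status": "ready"}), 200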

Security Best Practices

1. API Key Management

# security/key_manager.py
import logging
import os
from typing import Optional

import boto3
from cryptography.fernet import Fernet

logger = logging.getLogger(__name__)

class SecureKeyManager:
    def __init__(self, use_aws_secrets: bool = False):
        self.use_aws_secrets = use_aws_secrets
        # NOTE: a freshly generated fallback key cannot decrypt existing
        # secrets; always set ENCRYPTION_KEY in production
        self.fernet = Fernet(os.getenv("ENCRYPTION_KEY", Fernet.generate_key()))
        
        if use_aws_secrets:
            self.secrets_client = boto3.client('secretsmanager')
    
    def get_api_key(self, key_name: str) -> Optional[str]:
        """Securely retrieve API key"""
        if self.use_aws_secrets:
            try:
                response = self.secrets_client.get_secret_value(SecretId=key_name)
                return response['SecretString']
            except Exception as e:
                logger.error(f"Failed to retrieve secret: {e}")
                return None
        else:
            # Get from environment and decrypt
            encrypted = os.getenv(key_name)
            if encrypted:
                return self.fernet.decrypt(encrypted.encode()).decode()
            return None

2. Rate Limiting

# security/rate_limiter.py
import time
from collections import defaultdict
from typing import Dict, List, Optional, Tuple

class RateLimiter:
    def __init__(self, requests_per_minute: int = 60):
        self.requests_per_minute = requests_per_minute
        self.requests: Dict[str, List[float]] = defaultdict(list)
    
    def is_allowed(self, client_id: str) -> Tuple[bool, Optional[float]]:
        """Check if request is allowed"""
        now = time.time()
        minute_ago = now - 60
        
        # Clean old requests
        self.requests[client_id] = [
            req_time for req_time in self.requests[client_id]
            if req_time > minute_ago
        ]
        
        if len(self.requests[client_id]) >= self.requests_per_minute:
            # Calculate wait time
            oldest_request = min(self.requests[client_id])
            wait_time = 60 - (now - oldest_request)
            return False, wait_time
        
        self.requests[client_id].append(now)
        return True, None
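
A sketch of applying the limiter at the top of a request handler; how you derive client_id is up to your auth layer:

limiter = RateLimiter(requests_per_minute=60)

async def guarded_run(agent, task: str, client_id: str):
    allowed, wait_time = limiter.is_allowed(client_id)
    if not allowed:
        raise RuntimeError(f"Rate limit exceeded; retry in {wait_time:.1f}s")
    return await agent.arun(task)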

Cost Optimization

1. Model Selection Strategy

# optimization/model_selector.py
from typing import Dict, Optional

class CostOptimizedModelSelector:
    def __init__(self):
        # approximate USD per 1K input tokens; verify against current provider pricing
        self.model_costs = {
            "gpt-4": 0.03,
            "gpt-3.5-turbo": 0.002,
            "claude-3-opus": 0.015,
            "claude-3-sonnet": 0.003
        }
        self.model_capabilities = {
            "gpt-4": ["complex_reasoning", "code", "analysis"],
            "gpt-3.5-turbo": ["general", "conversation"],
            "claude-3-opus": ["complex_reasoning", "long_context"],
            "claude-3-sonnet": ["general", "fast"]
        }
    
    def select_model(self, task_type: str, max_cost: Optional[float] = None) -> str:
        """Select most cost-effective model for task"""
        suitable_models = [
            model for model, capabilities in self.model_capabilities.items()
            if task_type in capabilities or "general" in capabilities
        ]
        
        if max_cost:
            suitable_models = [
                model for model in suitable_models
                if self.model_costs[model] <= max_cost
            ]
        
        if not suitable_models:
            raise ValueError(f"No model satisfies '{task_type}' within max_cost={max_cost}")
        
        # Return cheapest suitable model
        return min(suitable_models, key=lambda m: self.model_costs[m])

2. Token Usage Optimization

# optimization/token_optimizer.py
import tiktoken

class TokenOptimizer:
    def __init__(self, model: str = "gpt-4"):
        self.encoder = tiktoken.encoding_for_model(model)
        self.max_tokens = 8000  # Leave room for response
    
    def optimize_prompt(self, prompt: str, context: str) -> str:
        """Optimize prompt to fit within token limits"""
        prompt_tokens = len(self.encoder.encode(prompt))
        context_tokens = len(self.encoder.encode(context))
        
        total_tokens = prompt_tokens + context_tokens
        
        if total_tokens <= self.max_tokens:
            return f"{prompt}\n\nContext: {context}"
        
        # Truncate context to fit
        available_tokens = self.max_tokens - prompt_tokens - 50  # Buffer
        context_parts = context.split('. ')
        
        optimized_context = ""
        current_tokens = 0
        
        for part in context_parts:
            part_tokens = len(self.encoder.encode(part))
            if current_tokens + part_tokens <= available_tokens:
                optimized_context += part + ". "
                current_tokens += part_tokens
            else:
                break
        
        return f"{prompt}\n\nContext (truncated): {optimized_context}"

Deployment Checklist

Pre-Deployment
  • All tests passing
  • Security scan completed
  • Performance benchmarks met
  • Documentation updated

Deployment
  • Environment variables configured
  • Secrets management setup

Post-Deployment
  • Monitoring dashboards created
  • Alerting rules configured
  • Backup strategy defined
  • Rollback plan prepared

Troubleshooting Production Issues

Common Issues and Solutions

High Latency

Symptoms: Slow response times, timeouts

Diagnostics:
# Check agent performance (agent and logger come from your application context)
import time

async def diagnose_latency():
    start = time.time()
    result = await agent.arun("test")
    latency = time.time() - start
    
    if latency > 5:  # 5 seconds threshold
        logger.warning(f"High latency detected: {latency}s")
        # Check cache hit rate
        # Check model response time
        # Check network latency
Solutions:
  • Enable caching for common queries
  • Use faster models for simple tasks
  • Implement request queuing (see the sketch below)
  • Add more agent instances
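
A minimal request-queuing sketch using asyncio.Semaphore; the limit mirrors MAX_CONCURRENT_AGENTS from the production config:

import asyncio

# excess requests wait here instead of overloading the agents
semaphore = asyncio.Semaphore(10)  # e.g. ProductionConfig.MAX_CONCURRENT_AGENTS

async def queued_run(agent, task: str):
    async with semaphore:
        return await agent.arun(task)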

Memory Leaks

Symptoms: Increasing memory usage, OOM errors

Diagnostics:
import tracemalloc
import gc

# Track memory usage
tracemalloc.start()

# ... run agents ...

snapshot = tracemalloc.take_snapshot()
top_stats = snapshot.statistics('lineno')

for stat in top_stats[:10]:
    print(stat)
Solutions:
  • Implement proper cleanup in agents
  • Use context managers
  • Limit conversation history
  • Regular garbage collection

Rate Limit Errors

Symptoms: 429 errors, rejected requests

Solutions:
  • Implement exponential backoff (see the sketch below)
  • Use multiple API keys
  • Add request queuing
  • Cache frequent requests
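
A sketch of exponential backoff with jitter around an agent call; the retry counts and broad except are illustrative (catch your provider's rate-limit exception in practice):

import asyncio
import random

async def run_with_backoff(agent, task: str, max_retries: int = 3):
    for attempt in range(max_retries + 1):
        try:
            return await agent.arun(task)
        except Exception:
            if attempt == max_retries:
                raise
            # 1s, 2s, 4s... plus jitter to avoid synchronized retries
            await asyncio.sleep(2 ** attempt + random.random())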

Next Steps

  1. Review the Monitoring Guide
  2. Set up Telemetry
  3. Configure Multi-Provider Setup
  4. Implement Cost Optimization
