Skip to main content

Documentation Index

Fetch the complete documentation index at: https://docs.praison.ai/llms.txt

Use this file to discover all available pages before exploring further.

Memory Cleanup for Long-Running Apps

As of PR #1558, Session.close() automatically calls memory.close_connections(), which in turn calls .close() on the active memory adapter. Most users no longer need manual cleanup hooks for basic memory management.
Long-running multi-agent applications can accumulate memory over time, leading to performance degradation and potential crashes. This guide covers best practices for effective memory management.

Understanding Memory Issues

Common Memory Problems

  1. Memory Leaks: Unreleased references to objects
  2. Conversation History Accumulation: Growing chat histories
  3. Cache Overflow: Unbounded caching
  4. Circular References: Objects referencing each other
  5. Resource Handles: Unclosed files, connections, etc.

Bounding In-Memory Growth

For long-running agents, prevent unbounded memory growth using the InMemoryAdapter with size limits:
from praisonaiagents import Agent
from praisonaiagents.memory.adapters.in_memory_adapter import InMemoryAdapter

# Create bounded in-memory storage with FIFO eviction
adapter = InMemoryAdapter(max_size=1000)  # Limit to 1000 entries
agent = Agent(
    name="Long-running Assistant",
    instructions="Help users continuously",
    memory=adapter
)

# Memory automatically evicts oldest entries when limit exceeded
for i in range(1500):
    agent.store_memory(f"Entry {i}", memory_type="short_term")

# Only the most recent 1000 entries remain
print(f"Memory entries: {len(adapter._data)}")  # 1000
The existing helper class ResourcePool in this page with max_size=10 is unrelated to the new InMemoryAdapter(max_size=...) parameter. They serve different purposes: ResourcePool manages connection pools, while InMemoryAdapter manages memory entry limits.

Memory Management Strategies

1. Conversation History Management

Implement sliding window or summary-based history management:
from collections import deque
from typing import List, Dict, Any
import hashlib

class MemoryEfficientConversationManager:
    def __init__(self, max_history_length: int = 100, summary_threshold: int = 50):
        self.max_history_length = max_history_length
        self.summary_threshold = summary_threshold
        self.conversation_history = deque(maxlen=max_history_length)
        self.summaries = []
        
    def add_message(self, message: Dict[str, Any]):
        """Add a message to conversation history with automatic cleanup"""
        self.conversation_history.append(message)
        
        # Create summary when threshold is reached
        if len(self.conversation_history) >= self.summary_threshold:
            self._create_summary()
    
    def _create_summary(self):
        """Create a summary of older messages"""
        messages_to_summarize = list(self.conversation_history)[:self.summary_threshold//2]
        
        # In production, use an LLM to create actual summaries
        summary = {
            "type": "summary",
            "message_count": len(messages_to_summarize),
            "timestamp": messages_to_summarize[0]["timestamp"],
            "key_points": self._extract_key_points(messages_to_summarize)
        }
        
        self.summaries.append(summary)
        
        # Remove summarized messages
        for _ in range(len(messages_to_summarize)):
            self.conversation_history.popleft()
    
    def _extract_key_points(self, messages: List[Dict]) -> List[str]:
        """Extract key points from messages (simplified version)"""
        # In production, use NLP or LLM for better extraction
        return [msg.get("content", "")[:50] + "..." for msg in messages[-3:]]
    
    def get_context(self, last_n: int = 10) -> List[Dict]:
        """Get recent context including summaries"""
        context = []
        
        # Add relevant summaries
        if self.summaries:
            context.extend(self.summaries[-2:])  # Last 2 summaries
        
        # Add recent messages
        recent_messages = list(self.conversation_history)[-last_n:]
        context.extend(recent_messages)
        
        return context
    
    def cleanup(self):
        """Explicit cleanup method"""
        self.conversation_history.clear()
        self.summaries.clear()

2. Agent Memory Management

Memory construction is now thread-safe and async-safe. Concurrent Tasks sharing a memory_config will coordinate through locks rather than each creating duplicate stores.
from praisonaiagents import Agent, Task, PraisonAIAgents

memory_config = {"storage": {"path": "./shared.db"}, "provider": "file"}

agents  = [Agent(name=f"A{i}", instructions="Summarize one line.") for i in range(4)]
tasks   = [Task(description=f"Summarize doc {i}.", agent=agents[i], config={"memory_config": memory_config}) for i in range(4)]

PraisonAIAgents(agents=agents, tasks=tasks).start()
Implement memory limits and cleanup for agents:
import gc
import weakref
from datetime import datetime, timedelta

class MemoryManagedAgent:
    def __init__(self, name: str, memory_limit_mb: int = 100):
        self.name = name
        self.memory_limit_mb = memory_limit_mb
        self.created_at = datetime.now()
        self.last_cleanup = datetime.now()
        self._memory_store = {}
        self._weak_refs = weakref.WeakValueDictionary()
        
    def store_memory(self, key: str, value: Any, weak: bool = False):
        """Store data with option for weak references"""
        if weak:
            self._weak_refs[key] = value
        else:
            self._memory_store[key] = value
            
        # Check memory usage
        if self._estimate_memory_usage() > self.memory_limit_mb:
            self._cleanup_old_memories()
    
    def _estimate_memory_usage(self) -> float:
        """Estimate memory usage in MB"""
        import sys
        total_size = 0
        
        for obj in self._memory_store.values():
            total_size += sys.getsizeof(obj)
        
        return total_size / (1024 * 1024)
    
    def _cleanup_old_memories(self):
        """Remove old or less important memories"""
        # Sort by age or importance (simplified)
        if hasattr(self, 'memory_importance'):
            sorted_keys = sorted(
                self._memory_store.keys(),
                key=lambda k: self.memory_importance.get(k, 0)
            )
        else:
            sorted_keys = list(self._memory_store.keys())
        
        # Remove least important/oldest 20%
        remove_count = len(sorted_keys) // 5
        for key in sorted_keys[:remove_count]:
            del self._memory_store[key]
        
        # Force garbage collection
        gc.collect()
        self.last_cleanup = datetime.now()
    
    def periodic_cleanup(self):
        """Run periodic cleanup tasks"""
        if datetime.now() - self.last_cleanup > timedelta(minutes=30):
            self._cleanup_old_memories()
            gc.collect()

3. Resource Pool Management

Implement resource pooling to prevent resource exhaustion:
from contextlib import contextmanager
import threading
from queue import Queue, Empty

class ResourcePool:
    def __init__(self, factory, max_size: int = 10, cleanup_func=None):
        self.factory = factory
        self.max_size = max_size
        self.cleanup_func = cleanup_func
        self.pool = Queue(maxsize=max_size)
        self.size = 0
        self.lock = threading.Lock()
        
    @contextmanager
    def acquire(self, timeout: float = 30):
        """Acquire a resource from the pool"""
        resource = None
        try:
            # Try to get from pool
            try:
                resource = self.pool.get(timeout=timeout)
            except Empty:
                # Create new resource if under limit
                with self.lock:
                    if self.size < self.max_size:
                        resource = self.factory()
                        self.size += 1
                    else:
                        raise TimeoutError("Resource pool exhausted")
            
            yield resource
            
        finally:
            # Return resource to pool
            if resource is not None:
                try:
                    self.pool.put_nowait(resource)
                except:
                    # Pool is full, cleanup resource
                    if self.cleanup_func:
                        self.cleanup_func(resource)
                    with self.lock:
                        self.size -= 1
    
    def cleanup_all(self):
        """Clean up all resources in the pool"""
        while not self.pool.empty():
            try:
                resource = self.pool.get_nowait()
                if self.cleanup_func:
                    self.cleanup_func(resource)
            except Empty:
                break
        self.size = 0

4. Cache Management

Implement LRU cache with memory limits:
from functools import lru_cache
import sys
from collections import OrderedDict

class MemoryBoundedLRUCache:
    def __init__(self, max_memory_mb: int = 50, max_items: int = 1000):
        self.max_memory_mb = max_memory_mb
        self.max_items = max_items
        self.cache = OrderedDict()
        self.memory_usage = 0
        
    def get(self, key):
        """Get item from cache"""
        if key in self.cache:
            # Move to end (most recently used)
            self.cache.move_to_end(key)
            return self.cache[key]
        return None
    
    def put(self, key, value):
        """Put item in cache with memory management"""
        value_size = sys.getsizeof(value)
        
        # Remove items if memory limit exceeded
        while (self.memory_usage + value_size > self.max_memory_mb * 1024 * 1024 or
               len(self.cache) >= self.max_items):
            if not self.cache:
                break
            
            # Remove least recently used
            oldest_key = next(iter(self.cache))
            oldest_value = self.cache.pop(oldest_key)
            self.memory_usage -= sys.getsizeof(oldest_value)
        
        # Add new item
        self.cache[key] = value
        self.memory_usage += value_size
    
    def clear(self):
        """Clear the cache"""
        self.cache.clear()
        self.memory_usage = 0

Memory Monitoring

1. Memory Usage Tracking

import psutil
import os
from datetime import datetime

class MemoryMonitor:
    def __init__(self, alert_threshold_percent: float = 80):
        self.alert_threshold_percent = alert_threshold_percent
        self.process = psutil.Process(os.getpid())
        self.memory_history = []
        
    def get_memory_info(self) -> Dict[str, Any]:
        """Get current memory usage information"""
        memory_info = self.process.memory_info()
        memory_percent = self.process.memory_percent()
        
        info = {
            "timestamp": datetime.now(),
            "rss_mb": memory_info.rss / 1024 / 1024,
            "vms_mb": memory_info.vms / 1024 / 1024,
            "percent": memory_percent,
            "available_mb": psutil.virtual_memory().available / 1024 / 1024
        }
        
        self.memory_history.append(info)
        
        # Keep only last hour of history
        cutoff = datetime.now() - timedelta(hours=1)
        self.memory_history = [
            h for h in self.memory_history 
            if h["timestamp"] > cutoff
        ]
        
        return info
    
    def check_memory_health(self) -> Tuple[bool, str]:
        """Check if memory usage is healthy"""
        info = self.get_memory_info()
        
        if info["percent"] > self.alert_threshold_percent:
            return False, f"Memory usage critical: {info['percent']:.1f}%"
        
        # Check for memory growth trend
        if len(self.memory_history) > 10:
            recent = self.memory_history[-10:]
            growth_rate = (recent[-1]["rss_mb"] - recent[0]["rss_mb"]) / len(recent)
            
            if growth_rate > 10:  # Growing > 10MB per check
                return False, f"Memory growing rapidly: {growth_rate:.1f}MB/check"
        
        return True, "Memory usage normal"

2. Automatic Garbage Collection

import gc
import schedule
from typing import Callable

class AutomaticMemoryManager:
    def __init__(self):
        self.cleanup_callbacks: List[Callable] = []
        self.last_gc_stats = None
        
    def register_cleanup(self, callback: Callable):
        """Register a cleanup callback"""
        self.cleanup_callbacks.append(callback)
    
    def aggressive_cleanup(self):
        """Perform aggressive memory cleanup"""
        # Run all registered cleanup callbacks
        for callback in self.cleanup_callbacks:
            try:
                callback()
            except Exception as e:
                print(f"Cleanup callback failed: {e}")
        
        # Force garbage collection
        gc.collect(2)  # Full collection
        
        # Get statistics
        self.last_gc_stats = {
            "collected": sum(gc.get_count()),
            "uncollectable": len(gc.garbage),
            "timestamp": datetime.now()
        }
        
        return self.last_gc_stats
    
    def setup_automatic_cleanup(self, interval_minutes: int = 30):
        """Setup periodic automatic cleanup"""
        schedule.every(interval_minutes).minutes.do(self.aggressive_cleanup)
        
        # Also cleanup on high memory usage
        def conditional_cleanup():
            memory_monitor = MemoryMonitor()
            healthy, _ = memory_monitor.check_memory_health()
            if not healthy:
                self.aggressive_cleanup()
        
        schedule.every(5).minutes.do(conditional_cleanup)

3. Agent Garbage Collection Safety Net

Since PR #1514, Agent.__del__ runs a best-effort close_connections() during garbage collection as a safety net. However, this may be skipped by the Python interpreter and must not be relied upon. Always use explicit cleanup:
# Preferred: Explicit cleanup
agent = Agent(name="Analyst", instructions="Analyze data.")
try:
    result = agent.start("Analyze quarterly numbers.")
finally:
    agent.close()  # Guaranteed cleanup

# Better: Context manager (recommended)  
with Agent(name="Analyst", instructions="Analyze data.") as agent:
    result = agent.start("Analyze quarterly numbers.")
# Cleanup happens automatically here

Profiler buffers are bounded

Enabling Profiler no longer leaks memory in long-running processes. All record lists are now deque(maxlen=PRAISONAI_PROFILE_MAX) (default 10,000) which fixes the prior class-level List[...] storage that grew unbounded. Key improvements:
  • Memory-safe: Fixed maximum memory usage per buffer
  • Ring buffer: Only keeps the most recent N records
  • Configurable: Set PRAISONAI_PROFILE_MAX environment variable
  • Production-ready: Safe for long-running agents
import os
from praisonai.profiler import Profiler
from praisonaiagents import Agent

# Configure buffer size for production
os.environ["PRAISONAI_PROFILE_MAX"] = "5000"  # Smaller for memory-constrained environments

# Enable profiling safely
Profiler.enable()

# Long-running agent - memory usage stays flat
agent = Agent(name="LongRunning", instructions="Process requests")
for i in range(100000):
    agent.start(f"Process request {i}")
    # Memory usage doesn't grow - only last 5000 records kept per buffer

# Each buffer type has bounded size:
# - _timings: deque(maxlen=5000)
# - _imports: deque(maxlen=5000)
# - _flow: deque(maxlen=5000)
# - _api_calls: deque(maxlen=5000)
# - _streaming: deque(maxlen=5000)  
# - _memory: deque(maxlen=5000)
# - _cprofile_stats: deque(maxlen=5000)
For more details on profiling configuration and best practices, see Profiling.

Best Practices

1. Use Context Managers

Always use context managers for resource management:
class ManagedAgentSession:
    def __init__(self, agent_factory):
        self.agent_factory = agent_factory
        self.agents = []
        
    def __enter__(self):
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        # Cleanup all agents
        for agent in self.agents:
            agent.cleanup()
        self.agents.clear()
        gc.collect()
    
    def create_agent(self, *args, **kwargs):
        agent = self.agent_factory(*args, **kwargs)
        self.agents.append(agent)
        return agent

2. Implement Memory Budgets

Set memory budgets for different components:
class MemoryBudgetManager:
    def __init__(self, total_budget_mb: int = 1000):
        self.total_budget_mb = total_budget_mb
        self.allocations = {}
        
    def allocate(self, component: str, budget_mb: int) -> bool:
        """Allocate memory budget to a component"""
        current_allocated = sum(self.allocations.values())
        
        if current_allocated + budget_mb > self.total_budget_mb:
            return False
        
        self.allocations[component] = budget_mb
        return True
    
    def check_usage(self, component: str, current_usage_mb: float) -> bool:
        """Check if component is within budget"""
        if component not in self.allocations:
            return False
        
        return current_usage_mb <= self.allocations[component]

3. Profile Memory Usage

Regular profiling helps identify memory issues:
import tracemalloc
from typing import List, Tuple

class MemoryProfiler:
    def __init__(self):
        self.snapshots = []
        
    def start_profiling(self):
        """Start memory profiling"""
        tracemalloc.start()
        self.take_snapshot("start")
    
    def take_snapshot(self, label: str):
        """Take a memory snapshot"""
        snapshot = tracemalloc.take_snapshot()
        self.snapshots.append((label, snapshot))
    
    def get_top_allocations(self, limit: int = 10) -> List[Tuple[str, float]]:
        """Get top memory allocations"""
        if len(self.snapshots) < 2:
            return []
        
        _, snapshot1 = self.snapshots[-2]
        _, snapshot2 = self.snapshots[-1]
        
        stats = snapshot2.compare_to(snapshot1, 'lineno')
        
        results = []
        for stat in stats[:limit]:
            results.append((
                f"{stat.traceback[0].filename}:{stat.traceback[0].lineno}",
                stat.size_diff / 1024 / 1024  # Convert to MB
            ))
        
        return results
    
    def stop_profiling(self):
        """Stop profiling and cleanup"""
        tracemalloc.stop()
        self.snapshots.clear()

Common Pitfalls

  1. Unbounded Collections: Always set limits on collections
  2. Circular References: Use weak references where appropriate
  3. Global State: Minimize global state that accumulates data
  4. Event Listeners: Always unregister event listeners
  5. Thread Local Storage: Clean up thread-local data

Testing Memory Management

import pytest
import time

def test_memory_bounded_cache():
    cache = MemoryBoundedLRUCache(max_memory_mb=1, max_items=100)
    
    # Fill cache beyond memory limit
    for i in range(200):
        cache.put(f"key_{i}", "x" * 10000)  # ~10KB per item
    
    # Cache should have evicted old items
    assert len(cache.cache) < 200
    assert cache.get("key_0") is None  # Old items evicted
    assert cache.get("key_199") is not None  # Recent items kept

def test_conversation_manager_cleanup():
    manager = MemoryEfficientConversationManager(max_history_length=10)
    
    # Add many messages
    for i in range(20):
        manager.add_message({"content": f"Message {i}", "timestamp": time.time()})
    
    # Should not exceed max length
    assert len(manager.conversation_history) <= 10
    
    # Should have created summaries
    assert len(manager.summaries) > 0

Conclusion

Effective memory management is crucial for long-running multi-agent applications. By implementing proper cleanup strategies, monitoring, and resource management, you can ensure your applications remain stable and performant over extended periods.