Use this file to discover all available pages before exploring further.
ClickHouse is a columnar analytics database designed for high-performance analytics on large volumes of conversation data and for real-time reporting.
import clickhouse_connect
from praisonaiagents import Agent, db

# ClickHouse is typically used for analytics, not primary storage
client = clickhouse_connect.get_client(
    host="localhost",
    port=8123,
    username="default",
    password="",
)

# Create agent with primary storage elsewhere
agent = Agent(
    name="AnalyticsBot",
    instructions="You are a helpful assistant.",
    # Use SQLite/PostgreSQL for conversations
    db=db(database_url="sqlite:///conversations.db"),
    session_id="analytics-session",
)

# Agent conversations stored in primary DB
response = agent.chat("Generate analytics on our conversation patterns")
print(response)
2
Analytics Table Setup
from datetime import datetime

import clickhouse_connect

client = clickhouse_connect.get_client(host="localhost", port=8123)

# Create analytics table for conversation metrics.
# IF NOT EXISTS makes the statement safe to re-run.
client.command("""
    CREATE TABLE IF NOT EXISTS conversation_analytics (
        session_id String,
        agent_name String,
        message_count UInt32,
        total_tokens UInt32,
        response_time Float64,
        user_satisfaction UInt8,
        timestamp DateTime,
        metadata String
    )
    ENGINE = MergeTree()
    ORDER BY (timestamp, session_id)
""")

print("Analytics table created successfully")
import clickhouse_connect
import pandas as pd

client = clickhouse_connect.get_client(host="localhost", port=8123)


def get_conversation_analytics(days=30):
    """Get comprehensive conversation analytics.

    Runs three aggregate queries against the ``conversation_events`` table,
    each restricted to rows whose ``timestamp`` falls within the last
    ``days`` days.

    Args:
        days: Size of the look-back window in days. The value is coerced
            with ``int()`` before it is placed in the SQL text.

    Returns:
        A dict with the keys ``'daily_trends'``, ``'hourly_patterns'`` and
        ``'user_behavior'``, each holding the result of ``client.query_df``
        for the corresponding query.

    Raises:
        TypeError, ValueError: If ``days`` cannot be converted to an integer.
    """
    # The window size is interpolated into the SQL text below, so force it
    # to a plain integer first; anything else fails here instead of reaching
    # the server as part of the query.
    days = int(days)

    # Daily conversation trends
    daily_trends = client.query_df(f"""
        SELECT
            toDate(timestamp) as date,
            agent_name,
            count() as conversations,
            sum(tokens) as total_tokens,
            avg(response_time_ms) as avg_response_time,
            countDistinct(user_id) as unique_users
        FROM conversation_events
        WHERE timestamp >= now() - INTERVAL {days} DAY
        GROUP BY date, agent_name
        ORDER BY date DESC
    """)

    # Peak hours analysis
    hourly_patterns = client.query_df(f"""
        SELECT
            toHour(timestamp) as hour,
            count() as message_count,
            avg(response_time_ms) as avg_response_time
        FROM conversation_events
        WHERE timestamp >= now() - INTERVAL {days} DAY
        GROUP BY hour
        ORDER BY hour
    """)

    # User behavior analysis.
    # first_seen uses min(timestamp): a window function over the raw
    # `timestamp` column cannot be combined with GROUP BY user_id, because
    # `timestamp` is neither grouped nor aggregated at that point.
    user_behavior = client.query_df(f"""
        SELECT
            user_id,
            count() as total_interactions,
            countDistinct(session_id) as unique_sessions,
            sum(tokens) as total_tokens,
            min(timestamp) as first_seen,
            max(timestamp) as last_seen
        FROM conversation_events
        WHERE timestamp >= now() - INTERVAL {days} DAY
        GROUP BY user_id
        HAVING total_interactions > 5
        ORDER BY total_interactions DESC
    """)

    return {
        'daily_trends': daily_trends,
        'hourly_patterns': hourly_patterns,
        'user_behavior': user_behavior,
    }


# Get analytics
analytics = get_conversation_analytics()

print("Top conversation days:")
print(analytics['daily_trends'].head())

print("\nPeak hours:")
print(analytics['hourly_patterns'])

print(f"\nActive users: {len(analytics['user_behavior'])}")
import clickhouse_connect

# Connect to ClickHouse cluster
cluster_client = clickhouse_connect.get_client(
    host="clickhouse-lb.example.com",
    port=8123,
    username="analytics_user",
    password="secure_password",
    # Settings passed along with the connection to the server.
    settings={
        'distributed_product_mode': 'global',
        'max_execution_time': 600,
    },
)

# Create distributed table.
# It copies the structure of conversation_events and routes rows with rand().
cluster_client.command("""
    CREATE TABLE IF NOT EXISTS conversation_events_distributed
    AS conversation_events
    ENGINE = Distributed(analytics_cluster, default, conversation_events, rand())
""")

# Query across cluster
cluster_analytics = cluster_client.query_df("""
    SELECT
        agent_name,
        count() as total_events,
        uniq(user_id) as unique_users,
        avg(response_time_ms) as avg_response_time
    FROM conversation_events_distributed
    WHERE timestamp >= today() - 30
    GROUP BY agent_name
    ORDER BY total_events DESC
""")

print("Cluster-wide analytics:")
print(cluster_analytics)