Workflow API
PraisonAI provides a simple, powerful workflow system for chaining agents and functions.
Quick Start
from praisonaiagents import AgentFlow , WorkflowContext , StepResult
def validate ( ctx : WorkflowContext ) -> StepResult :
return StepResult ( output = f "Valid: { ctx . input } " )
def process ( ctx : WorkflowContext ) -> StepResult :
return StepResult ( output = f "Done: { ctx . previous_result } " )
workflow = AgentFlow ( steps =[ validate , process ])
result = workflow . start ( " Hello World " )
print ( result [ " output " ]) # "Done: Valid: Hello World"
Import
from praisonaiagents import AgentFlow , Task , WorkflowContext , StepResult
# Pipeline is an alias for Workflow (same thing)
from praisonaiagents import Pipeline
# Or from workflows module
from praisonaiagents import AgentFlowManager , AgentFlow , Task
# Pattern helpers
from praisonaiagents import route , parallel , loop , repeat
Pipeline and Workflow are interchangeable - they refer to the same class.
Use whichever term fits your mental model better.
Callbacks
Workflow supports callbacks for monitoring and custom logic:
def on_start ( workflow , input_text ):
print ( f "Starting workflow with: { input_text } " )
def on_complete ( workflow , result ):
print ( f "Workflow completed: { result [ ' status ' ] } " )
def on_step_start ( step_name , context ):
print ( f "Starting step: { step_name } " )
def on_step_complete ( step_name , result ):
print ( f "Step { step_name } completed: { result . output [: 50 ] } ..." )
def on_step_error ( step_name , error ):
print ( f "Step { step_name } failed: { error } " )
workflow = AgentFlow (
steps =[ step1 , step2 ],
on_workflow_start = on_start ,
on_workflow_complete = on_complete ,
on_step_start = on_step_start ,
on_step_complete = on_step_complete ,
on_step_error = on_step_error
)
Guardrails
Add validation to steps with automatic retry:
def validate_output ( result ):
if " error " in result . output . lower ():
return ( False , " Output contains error, please fix " )
return ( True , None )
workflow = AgentFlow ( steps =[
Task (
name = " generator " ,
handler = my_generator ,
guardrails = validate_output ,
max_retries = 3
)
])
When validation fails:
The step is retried (up to max_retries)
Validation feedback is passed to the step via ctx.variables["validation_feedback"]
For agent steps, feedback is appended to the prompt
Status Tracking
Track workflow and step execution status:
workflow = AgentFlow ( steps =[ step1 , step2 ])
print ( workflow . status ) # "not_started"
result = workflow . start ( " input " )
print ( workflow . status ) # "completed"
print ( workflow . step_statuses ) # {"step1": "completed", "step2": "completed"}
# Result includes status
print ( result [ " status " ]) # "completed"
print ( result [ " steps " ][ 0 ][ " status " ]) # "completed"
print ( result [ " steps " ][ 0 ][ " retries " ]) # 0
WorkflowContext
Context passed to step handlers containing workflow state.
Constructor
WorkflowContext (
input : str = "" ,
previous_result : Optional [ str ] = None ,
current_step : str = "" ,
variables : Dict [ str , Any ] = {}
)
Attributes
Attribute Type Description inputstrOriginal workflow input previous_resultOptional[str]Output from previous step current_stepstrCurrent step name variablesDict[str, Any]All workflow variables
StepResult
Result returned from step handlers.
Constructor
StepResult (
output : str = "" ,
stop_workflow : bool = False ,
variables : Dict [ str , Any ] = {}
)
Attributes
Attribute Type Default Description outputstr""Step output content stop_workflowboolFalseIf True, stop the entire workflow variablesDict[str, Any]{}Variables to add/update
Example
def validate ( ctx : WorkflowContext ) -> StepResult :
if " error " in ctx . input :
return StepResult ( output = " Invalid " , stop_workflow = True )
return StepResult ( output = " Valid " , variables ={ " validated " : True })
Workflow
A complete workflow with multiple steps.
Constructor
Workflow (
name : str = " Workflow " ,
description : str = "" ,
steps : List = [],
variables : Dict [ str , Any ] = {},
default_llm : Optional [ str ] = None ,
default_agent_config : Optional [ Dict [ str , Any ]] = None
)
Parameters
Parameter Type Default Description namestr"Workflow"Workflow name descriptionstr""Workflow description stepsList[]List of steps (Agent, function, or Task) variablesDict[str, Any]{}Initial variables default_llmOptional[str]NoneDefault LLM for action-based steps default_agent_configOptional[Dict]NoneDefault agent config planningboolFalseEnable planning mode planning_llmOptional[str]NoneLLM for planning reasoningboolFalseEnable chain-of-thought reasoning verboseboolFalseEnable verbose output memory_configOptional[Dict]NoneMemory configuration
Methods
start()
Run the workflow with the given input.
def start (
input : str = "" ,
llm : Optional [ str ] = None ,
verbose : bool = False
) -> Dict [ str , Any ]
Parameter Type Default Description inputstr""Input text for the workflow llmOptional[str]NoneLLM model override verboseboolFalsePrint step progress
Returns: Dict with output, steps, variables, and status
astart() / arun()
Async version of start() for async workflow execution.
async def astart (
input : str = "" ,
llm : Optional [ str ] = None ,
verbose : bool = False
) -> Dict [ str , Any ]
Example:
import asyncio
async def main ():
workflow = AgentFlow ( steps =[ step1 , step2 ])
result = await workflow . astart ( " Hello World " )
print ( result [ " output " ])
asyncio . run ( main ())
Step Types
Workflows accept three types of steps:
Functions - Automatically wrapped as handlers
Agents - Executed with the input
Task - Full configuration
from praisonaiagents import AgentFlow , Agent , Task
workflow = AgentFlow (
steps =[
my_function , # Function
Agent ( name = " Writer " , ... ), # Agent
Task ( name = " custom " , handler = my_handler ) # Task
]
)
Task
A dataclass representing a single step in a workflow.
Constructor
Task (
name : str ,
description : str = "" ,
action : str = "" ,
handler : Optional [ Callable ] = None ,
should_run : Optional [ Callable ] = None ,
agent : Optional [ Agent ] = None ,
agent_config : Optional [ Dict [ str , Any ]] = None ,
condition : Optional [ str ] = None ,
on_error : Literal [ " stop " , " continue " , " retry " ] = " stop " ,
max_retries : int = 1 ,
context_from : Optional [ List [ str ]] = None ,
retain_full_context : bool = True ,
output_variable : Optional [ str ] = None ,
tools : Optional [ List [ Any ]] = None ,
next_steps : Optional [ List [ str ]] = None ,
branch_condition : Optional [ Dict [ str , List [ str ]]] = None ,
loop_over : Optional [ str ] = None ,
loop_var : str = " item "
)
Parameters
Parameter Type Default Description namestrrequired Step name descriptionstr""Step description actionstr""The action/prompt to execute handlerOptional[Callable]NoneCustom function (ctx) -> StepResult should_runOptional[Callable]NoneCondition function (ctx) -> bool agentOptional[Agent]NoneDirect Agent instance agent_configOptional[Dict]NonePer-step agent configuration conditionOptional[str]NoneCondition string for execution on_errorLiteral[...]"stop"Error handling: “stop”, “continue”, “retry” max_retriesint1Maximum retry attempts context_fromOptional[List[str]]NoneSteps to include context from retain_full_contextboolTrueInclude all previous outputs output_variableOptional[str]NoneCustom variable name for output toolsOptional[List[Any]]NoneTools for this step next_stepsOptional[List[str]]NoneNext step names for branching branch_conditionOptional[Dict]NoneConditional branching rules loop_overOptional[str]NoneVariable name to iterate over loop_varstr"item"Variable name for current item guardrailOptional[Callable]NoneValidation function (result) -> (bool, feedback) output_fileOptional[str]NoneSave step output to file output_jsonOptional[Any]NonePydantic model for JSON output output_pydanticOptional[Any]NonePydantic model for structured output imagesOptional[List[str]]NoneImage paths/URLs for vision tasks async_executionboolFalseMark step for async execution quality_checkboolTrueEnable quality validation rerunboolTrueAllow step to be rerun
Handler Function
Custom handler functions receive WorkflowContext and return StepResult:
def my_handler ( ctx : WorkflowContext ) -> StepResult :
# Access context
print ( f "Input: { ctx . input } " )
print ( f "Previous: { ctx . previous_result } " )
print ( f "Variables: { ctx . variables } " )
# Return result
return StepResult (
output = " Step completed " ,
stop_workflow = False , # Set True to stop workflow
variables ={ " key " : " value " } # Add/update variables
)
should_run Function
Conditional execution - return True to run the step, False to skip:
def is_sensitive ( ctx : WorkflowContext ) -> bool :
return " legal " in ctx . input . lower ()
step = Task (
name = " compliance " ,
handler = check_compliance ,
should_run = is_sensitive # Only runs for sensitive content
)
Agent Config Options
When using agent_config, you can specify:
Key Type Description rolestrAgent role (e.g., “Researcher”) goalstrAgent goal backstorystrAgent backstory llmstrLLM model override verboseboolEnable verbose output
Example
step = Task (
name = " research " ,
action = " Research {{ topic }} " ,
agent_config ={
" role " : " Researcher " ,
" goal " : " Find comprehensive information " ,
" backstory " : " Expert researcher "
},
tools =[ " tavily_search " ],
output = TaskOutputConfig ( variable = " research_data " )
)
Branching Example
from praisonaiagents import TaskRoutingConfig
# Decision step with conditional branching
decision_step = Task (
name = " evaluate " ,
action = " Evaluate if the task is complete. Reply with 'success' or 'failure'. " ,
routing = TaskRoutingConfig (
next_steps =[ " success_handler " , " failure_handler " ],
branches ={
" success " : [ " success_handler " ],
" failure " : [ " failure_handler " ]
}
)
)
Loop Example
# Loop step that iterates over a list
loop_step = Task (
name = " process_items " ,
action = " Process item: {{ current_item }} " ,
loop_over = " items " , # Variable containing the list
loop_var = " current_item " # Variable name for each item
)
# Execute with items
result = manager . execute (
" my_workflow " ,
variables ={ " items " : [ " item1 " , " item2 " , " item3 " ]}
)
Workflow
A dataclass representing a complete workflow with multiple steps.
Constructor
Workflow (
name : str ,
description : str = "" ,
steps : List [ Task ] = [],
variables : Dict [ str , Any ] = {},
file_path : Optional [ str ] = None ,
default_agent_config : Optional [ Dict [ str , Any ]] = None ,
default_llm : Optional [ str ] = None ,
memory_config : Optional [ Dict [ str , Any ]] = None ,
planning : bool = False ,
planning_llm : Optional [ str ] = None
)
Parameters
Parameter Type Default Description namestrrequired Workflow name descriptionstr""Workflow description stepsList[Task][]List of workflow steps variablesDict[str, Any]{}Default variables file_pathOptional[str]NoneSource file path default_agent_configOptional[Dict[str, Any]]NoneDefault agent config for all steps default_llmOptional[str]NoneDefault LLM model memory_configOptional[Dict[str, Any]]NoneMemory configuration planningboolFalseEnable planning mode planning_llmOptional[str]NoneLLM for planning
Example
workflow = AgentFlow (
name = " research_pipeline " ,
description = " Multi-agent research workflow " ,
default_llm = " gpt-4o-mini " ,
planning = True ,
steps =[
Task ( name = " research " , action = " Research AI " ),
Task ( name = " write " , action = " Write report " )
],
variables ={ " topic " : " AI trends " }
)
WorkflowManager
The main class for managing and executing workflows.
Constructor
WorkflowManager (
workspace_path : Optional [ str ] = None ,
verbose : int = 0
)
Parameters
Parameter Type Default Description workspace_pathOptional[str]NonePath to workspace (defaults to cwd) verboseint0Verbosity level (0-3)
Methods
execute()
Execute a workflow synchronously.
def execute (
workflow_name : str ,
executor : Optional [ Callable [[ str ], str ]] = None ,
variables : Optional [ Dict [ str , Any ]] = None ,
on_step : Optional [ Callable [[ Task , int ], None ]] = None ,
on_result : Optional [ Callable [[ Task , str ], None ]] = None ,
default_agent : Optional [ Any ] = None ,
default_llm : Optional [ str ] = None ,
memory : Optional [ Any ] = None ,
planning : bool = False ,
stream : bool = False ,
verbose : int = 0 ,
checkpoint : Optional [ str ] = None ,
resume : Optional [ str ] = None
) -> Dict [ str , Any ]
Parameters
Parameter Type Default Description workflow_namestrrequired Name of workflow to execute executorOptional[Callable]NoneFunction to execute each step variablesOptional[Dict]NoneVariables to substitute on_stepOptional[Callable]NoneCallback before each step on_resultOptional[Callable]NoneCallback after each step default_agentOptional[Any]NoneDefault agent for steps default_llmOptional[str]NoneDefault LLM model memoryOptional[Any]NoneShared memory instance planningboolFalseEnable planning mode streamboolFalseEnable streaming output verboseint0Verbosity level checkpointOptional[str]NoneSave checkpoint after each step with this name resumeOptional[str]NoneResume from checkpoint with this name
Returns
{
" success " : bool ,
" workflow " : str ,
" results " : [
{
" step " : str ,
" status " : " success " | " failed " | " skipped " ,
" output " : str | None ,
" error " : str | None
}
],
" variables " : Dict [ str , Any ]
}
Example
from praisonaiagents import Agent
from praisonaiagents import AgentFlowManager
agent = Agent ( name = " Assistant " , llm = " gpt-4o-mini " )
manager = WorkflowManager ()
result = manager . execute (
" deploy " ,
default_agent = agent ,
variables ={ " environment " : " production " },
on_step = lambda step , i : print ( f "Starting: { step . name } " ),
on_result = lambda step , output : print ( f "Done: { step . name } " )
)
if result [ " success " ]:
print ( " Workflow completed! " )
aexecute()
Execute a workflow asynchronously.
async def aexecute (
workflow_name : str ,
executor : Optional [ Callable [[ str ], str ]] = None ,
variables : Optional [ Dict [ str , Any ]] = None ,
on_step : Optional [ Callable [[ Task , int ], None ]] = None ,
on_result : Optional [ Callable [[ Task , str ], None ]] = None ,
default_agent : Optional [ Any ] = None ,
default_llm : Optional [ str ] = None ,
memory : Optional [ Any ] = None ,
planning : bool = False ,
stream : bool = False ,
verbose : int = 0
) -> Dict [ str , Any ]
Parameters
Same as execute().
Example
import asyncio
from praisonaiagents import AgentFlowManager
manager = WorkflowManager ()
async def main ():
# Run multiple workflows concurrently
results = await asyncio . gather (
manager . aexecute ( " research " , default_llm = " gpt-4o-mini " ),
manager . aexecute ( " analysis " , default_llm = " gpt-4o-mini " ),
)
return results
results = asyncio . run ( main ())
list_workflows()
List all available workflows.
def list_workflows () -> List [ Workflow ]
Returns
List of Workflow objects.
Example
manager = WorkflowManager ()
workflows = manager . list_workflows ()
for workflow in workflows :
print ( f " { workflow . name } : { len ( workflow . steps ) } steps" )
get_workflow()
Get a specific workflow by name.
def get_workflow ( name : str ) -> Optional [ Workflow ]
Parameters
Parameter Type Description namestrWorkflow name (case-insensitive)
Returns
Workflow object or None if not found.
create_workflow()
Create a new workflow file.
def create_workflow (
name : str ,
description : str = "" ,
steps : Optional [ List [ Dict [ str , str ]]] = None ,
variables : Optional [ Dict [ str , Any ]] = None
) -> Workflow
Parameters
Parameter Type Default Description namestrrequired Workflow name descriptionstr""Workflow description stepsOptional[List[Dict]]NoneList of step definitions variablesOptional[Dict]NoneDefault variables
Example
manager = WorkflowManager ()
workflow = manager . create_workflow (
name = " Code Review " ,
description = " Review code changes " ,
steps =[
{ " name " : " Lint " , " action " : " Run linting " },
{ " name " : " Test " , " action " : " Run tests " },
{ " name " : " Review " , " action " : " Review code " }
],
variables ={ " branch " : " main " }
)
get_stats()
Get workflow statistics.
def get_stats () -> Dict [ str , Any ]
Returns
{
" total_workflows " : int ,
" total_steps " : int ,
" workflows_dir " : str
}
reload()
Reload workflows from disk.
Variable Substitution
Workflows support variable substitution using {{variable}} syntax:
Variable Description {{variable_name}}User-defined variable {{previous_output}}Output from previous step {{step_name_output}}Output from specific step
Example
workflow = AgentFlow (
name = " pipeline " ,
variables ={ " topic " : " AI " },
steps =[
Task (
name = " research " ,
action = " Research {{ topic }} " ,
output = TaskOutputConfig ( variable = " research_data " )
),
Task (
name = " analyze " ,
action = " Analyze: {{ research_data }} "
),
Task (
name = " write " ,
action = " Write about {{ previous_output }} "
)
]
)
list_checkpoints()
List all saved workflow checkpoints.
def list_checkpoints () -> List [ Dict [ str , Any ]]
Returns
List of checkpoint info dicts with keys: name, workflow, completed_steps, saved_at.
Example
manager = WorkflowManager ()
checkpoints = manager . list_checkpoints ()
for cp in checkpoints :
print ( f " { cp [ ' name ' ] } : { cp [ ' completed_steps ' ] } steps completed" )
delete_checkpoint()
Delete a saved checkpoint.
def delete_checkpoint ( name : str ) -> bool
Parameters
Parameter Type Description namestrCheckpoint name to delete
Returns
True if deleted successfully, False if not found.
Example
manager = WorkflowManager ()
# Execute with checkpoint
result = manager . execute ( " deploy " , checkpoint = " deploy-v1 " )
# Resume if interrupted
result = manager . execute ( " deploy " , resume = " deploy-v1 " )
# Clean up
manager . delete_checkpoint ( " deploy-v1 " )
Workflow Patterns
PraisonAI provides helper functions for common workflow patterns.
Import
from praisonaiagents import AgentFlow , WorkflowContext , StepResult
from praisonaiagents import route , parallel , loop , repeat
route() - Decision-Based Branching
Routes to different steps based on the previous output.
route (
routes : Dict [ str , List ], # Key: pattern to match, Value: steps to execute
default : Optional [ List ] = None # Fallback steps
) -> Route
Example:
workflow = AgentFlow ( steps =[
classify_request , # Returns "approve" or "reject"
route ({
" approve " : [ approve_handler , notify_user ],
" reject " : [ reject_handler ],
" default " : [ fallback_handler ]
})
])
parallel() - Concurrent Execution
Execute multiple steps concurrently and combine results.
parallel ( steps : List ) -> Parallel
Example:
workflow = AgentFlow ( steps =[
parallel ([ research_market , research_competitors , research_customers ]),
summarize_results # Access via ctx.variables["parallel_outputs"]
])
loop() - Iterate Over Data
Execute a step for each item in a list, CSV file, or text file.
loop (
step : Any , # Step to execute for each item
over : Optional [ str ] = None , # Variable name containing list
from_csv : Optional [ str ] = None , # CSV file path
from_file : Optional [ str ] = None , # Text file path
var_name : str = " item " # Variable name for current item
) -> Loop
Examples:
# Loop over list variable
workflow = AgentFlow (
steps =[ loop ( process_item , over = " items " )],
variables ={ " items " : [ " a " , " b " , " c " ]}
)
# Loop over CSV file
workflow = AgentFlow ( steps =[
loop ( process_row , from_csv = " data.csv " )
])
In your handler, access the current item:
def process_item ( ctx : WorkflowContext ) -> StepResult :
item = ctx . variables [ " item " ] # Current item
index = ctx . variables [ " loop_index " ] # Current index
return StepResult ( output = f "Processed: { item } " )
repeat() - Evaluator-Optimizer Pattern
Repeat a step until a condition is met.
repeat (
step : Any , # Step to repeat
until : Optional [ Callable [[ WorkflowContext ], bool ]] = None , # Stop condition
max_iterations : int = 10 # Maximum iterations
) -> Repeat
Example:
def is_complete ( ctx : WorkflowContext ) -> bool :
return " done " in ctx . previous_result . lower ()
workflow = AgentFlow ( steps =[
repeat (
generator ,
until = is_complete ,
max_iterations = 5
)
])
Pattern Combinations
Patterns can be combined for complex workflows:
workflow = AgentFlow ( steps =[
# Step 1: Parallel research
parallel ([ research_a , research_b ]),
# Step 2: Route based on findings
route ({
" positive " : [ expand_research ],
" negative " : [ summarize_and_stop ]
}),
# Step 3: Iterate over results
loop ( process_finding , over = " findings " ),
# Step 4: Repeat until quality threshold
repeat ( refine_output , until = is_high_quality , max_iterations = 3 )
])
See Also
Workflows Guide Complete workflows documentation
Agent API Agent class reference