-
-
Notifications
You must be signed in to change notification settings - Fork 772
Expand file tree
/
Copy pathworkflow_conditional.py
More file actions
83 lines (71 loc) · 2.58 KB
/
workflow_conditional.py
File metadata and controls
83 lines (71 loc) · 2.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
"""
Agentic Conditional Workflow Example
Demonstrates using should_run to conditionally execute agent steps
based on input or previous results.
"""
from praisonaiagents import Agent, Workflow, Task
# Condition functions
def is_technical(ctx) -> bool:
    """Report whether the workflow input mentions a technical keyword.

    The input text is lower-cased and searched for each keyword as a plain
    substring, so matching is case-insensitive and also fires on longer
    words that contain a keyword.

    Args:
        ctx: Object exposing the workflow input as a string in ``ctx.input``.

    Returns:
        True if at least one technical keyword occurs in the input.
    """
    text = ctx.input.lower()
    for term in ("code", "programming", "algorithm", "database", "api"):
        if term in text:
            return True
    return False
def needs_creative(ctx) -> bool:
    """Report whether the workflow input mentions a creative-writing keyword.

    The check is a case-insensitive substring search over ``ctx.input``;
    a keyword embedded in a longer word still counts as a match.

    Args:
        ctx: Object exposing the workflow input as a string in ``ctx.input``.

    Returns:
        True if at least one creative keyword occurs in the input.
    """
    creative_terms = ("story", "poem", "creative", "write", "narrative")
    lowered = ctx.input.lower()
    # str.__contains__ bound to the lowered text is the same test as `term in lowered`.
    return any(map(lowered.__contains__, creative_terms))
# Create agents
# Four module-level agents used as the steps of the workflow defined below.
# Each is configured with a display name, a role, a goal and free-text
# instructions; no other Agent options are set here.

# First step of the pipeline: analyzes the raw input.
analyzer = Agent(
    name="Analyzer",
    role="Content Analyzer",
    goal="Analyze and process content",
    instructions="Analyze the given content and provide insights."
)
# Wrapped in the "tech_review" Task, gated by is_technical.
tech_reviewer = Agent(
    name="TechReviewer",
    role="Technical Reviewer",
    goal="Review technical content",
    instructions="Review the technical aspects and provide technical feedback."
)
# Wrapped in the "creative_enhance" Task, gated by needs_creative.
creative_enhancer = Agent(
    name="CreativeEnhancer",
    role="Creative Enhancer",
    goal="Enhance creative content",
    instructions="Enhance the creative aspects and make it more engaging."
)
# Last step of the pipeline: produces the polished final version.
finalizer = Agent(
    name="Finalizer",
    role="Content Finalizer",
    goal="Finalize the content",
    instructions="Provide the final polished version of the content."
)
# Create workflow with conditional steps
# NOTE: this must use the `Workflow` class imported at the top of the file.
# The previous spelling, `AgentFlow`, is never imported here and raised
# NameError as soon as the module was loaded.
#
# Steps are given as a list mixing bare agents with Task wrappers. A Task
# attaches a `should_run` predicate to its agent; the predicate receives a
# context object and returns a bool (see is_technical / needs_creative).
# Presumably the steps execute in list order -- confirm against the
# praisonaiagents workflow documentation.
workflow = Workflow(
    name="Conditional Agentic Pipeline",
    steps=[
        analyzer,  # Always runs
        Task(
            name="tech_review",
            agent=tech_reviewer,
            should_run=is_technical,  # Only runs for technical content
        ),
        Task(
            name="creative_enhance",
            agent=creative_enhancer,
            should_run=needs_creative,  # Only runs for creative content
        ),
        finalizer,  # Always runs
    ],
)
if __name__ == "__main__":
    # Each entry is (banner, prompt, trailer). The trailer reproduces the
    # blank line printed after every run except the last one.
    demo_runs = (
        # Test 1: Technical content ("code" matches is_technical)
        (
            "=== Test 1: Technical Content ===",
            "Review this Python code for best practices",
            "\n",
        ),
        # Test 2: Creative content ("write" / "story" match needs_creative)
        (
            "=== Test 2: Creative Content ===",
            "Write a short story about a robot",
            "\n",
        ),
        # Test 3: General content (neither technical nor creative)
        (
            "=== Test 3: General Content ===",
            "What is the weather like today?",
            "",
        ),
    )

    for banner, prompt, trailer in demo_runs:
        print(banner)
        result = workflow.start(prompt)
        # Show only the first 200 characters of the workflow output.
        preview = result['output'][:200]
        print(f"Output: {preview}...{trailer}")