-
-
Notifications
You must be signed in to change notification settings - Fork 772
Expand file tree
/
Copy pathworkflow_checkpoints.py
More file actions
64 lines (55 loc) · 1.83 KB
/
workflow_checkpoints.py
File metadata and controls
64 lines (55 loc) · 1.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
"""
Workflow Checkpoints Example
Demonstrates saving and resuming workflow execution using checkpoints.
Useful for long-running workflows that may be interrupted.
"""
from praisonaiagents import AgentFlow, Task
from praisonaiagents import AgentFlowManager
# Create a multi-step workflow
workflow = AgentFlow(
name="Long Process",
description="A workflow with checkpoints for resumability",
steps=[
Task(
name="step1",
action="Initialize the process and prepare data."
),
Task(
name="step2",
action="Process the first batch of data."
),
Task(
name="step3",
action="Process the second batch of data."
),
Task(
name="step4",
action="Finalize and generate report."
)
]
)
if __name__ == "__main__":
manager = WorkflowManager()
manager.workflows["Long Process"] = workflow
# Execute with checkpoint - saves after each step
print("=== Starting workflow with checkpoint ===")
result = manager.execute(
"Long Process",
default_llm="gpt-4o-mini",
checkpoint="my-checkpoint" # Saves progress after each step
)
print(f"Completed {len(result['results'])} steps")
# List available checkpoints
print("\n=== Available checkpoints ===")
checkpoints = manager.list_checkpoints()
for cp in checkpoints:
print(f" {cp['name']}: {cp['completed_steps']} steps completed")
# Resume from checkpoint (if workflow was interrupted)
# result = manager.execute(
# "Long Process",
# default_llm="gpt-4o-mini",
# resume="my-checkpoint" # Resumes from last saved state
# )
# Clean up checkpoint
manager.delete_checkpoint("my-checkpoint")
print("\n✅ Checkpoint deleted")