GraphWorkflow¶
A powerful workflow orchestration system that creates directed graphs of agents for complex multi-agent collaboration and task execution.
Overview¶
The GraphWorkflow
class is a sophisticated workflow management system that enables the creation and execution of complex multi-agent workflows. It represents workflows as directed graphs where nodes are agents and edges represent data flow and dependencies between agents. The system supports parallel execution, automatic compilation optimization, and comprehensive visualization capabilities.
Key features:
Feature | Description |
---|---|
Agent-based nodes | Each node represents an agent that can process tasks |
Directed graph structure | Edges define the flow of data between agents |
Parallel execution | Multiple agents can run simultaneously within layers |
Automatic compilation | Optimizes workflow structure for efficient execution |
Rich visualization | Generate visual representations using Graphviz |
Serialization | Save and load workflows as JSON |
Pattern detection | Automatically identifies parallel processing patterns |
Architecture¶
graph TB
subgraph "GraphWorkflow Architecture"
A[GraphWorkflow] --> B[Node Collection]
A --> C[Edge Collection]
A --> D[NetworkX Graph]
A --> E[Execution Engine]
B --> F[Agent Nodes]
C --> G[Directed Edges]
D --> H[Topological Sort]
E --> I[Parallel Execution]
E --> J[Layer Processing]
subgraph "Node Types"
F --> K[Agent Node]
K --> L[Agent Instance]
K --> M[Node Metadata]
end
subgraph "Edge Types"
G --> N[Simple Edge]
G --> O[Fan-out Edge]
G --> P[Fan-in Edge]
G --> Q[Parallel Chain]
end
subgraph "Execution Patterns"
I --> R[Thread Pool]
I --> S[Concurrent Futures]
J --> T[Layer-by-layer]
J --> U[Dependency Resolution]
end
end
Class Reference¶
Parameter | Type | Description | Default |
---|---|---|---|
id |
Optional[str] |
Unique identifier for the workflow | Auto-generated UUID |
name |
Optional[str] |
Human-readable name for the workflow | "Graph-Workflow-01" |
description |
Optional[str] |
Detailed description of the workflow | Generic description |
nodes |
Optional[Dict[str, Node]] |
Initial collection of nodes | {} |
edges |
Optional[List[Edge]] |
Initial collection of edges | [] |
entry_points |
Optional[List[str]] |
Node IDs that serve as starting points | [] |
end_points |
Optional[List[str]] |
Node IDs that serve as ending points | [] |
max_loops |
int |
Maximum number of execution loops | 1 |
task |
Optional[str] |
The task to be executed by the workflow | None |
auto_compile |
bool |
Whether to automatically compile the workflow | True |
verbose |
bool |
Whether to enable detailed logging | False |
Core Methods¶
add_node(agent: Agent, **kwargs)
¶
Adds an agent node to the workflow graph.
Parameter | Type | Description |
---|---|---|
agent |
Agent |
The agent to add as a node |
**kwargs |
Any |
Additional keyword arguments for the node |
Raises:
ValueError
: If a node with the same ID already exists
Example:
workflow = GraphWorkflow()
agent = Agent(agent_name="ResearchAgent", model_name="gpt-4")
workflow.add_node(agent, metadata={"priority": "high"})
add_edge(edge_or_source, target=None, **kwargs)
¶
Adds an edge to connect nodes in the workflow.
Parameter | Type | Description |
---|---|---|
edge_or_source |
Edge or str |
Either an Edge object or source node ID |
target |
str |
Target node ID (required if edge_or_source is not an Edge) |
**kwargs |
Any |
Additional keyword arguments for the edge |
Raises:
ValueError
: If source or target nodes don't exist
Example:
# Using Edge object
edge = Edge(source="agent1", target="agent2")
workflow.add_edge(edge)
# Using node IDs
workflow.add_edge("agent1", "agent2", metadata={"priority": "high"})
add_edges_from_source(source, targets, **kwargs)
¶
Creates a fan-out pattern where one source connects to multiple targets.
Parameter | Type | Description |
---|---|---|
source |
str |
Source node ID |
targets |
List[str] |
List of target node IDs |
**kwargs |
Any |
Additional keyword arguments for all edges |
Returns:
List[Edge]
: List of created Edge objects
Example:
workflow.add_edges_from_source(
"DataCollector",
["TechnicalAnalyst", "FundamentalAnalyst", "SentimentAnalyst"]
)
add_edges_to_target(sources, target, **kwargs)
¶
Creates a fan-in pattern where multiple sources connect to one target.
Parameter | Type | Description |
---|---|---|
sources |
List[str] |
List of source node IDs |
target |
str |
Target node ID |
**kwargs |
Any |
Additional keyword arguments for all edges |
Returns:
List[Edge]
: List of created Edge objects
Example:
workflow.add_edges_to_target(
["TechnicalAnalyst", "FundamentalAnalyst", "SentimentAnalyst"],
"SynthesisAgent"
)
add_parallel_chain(sources, targets, **kwargs)
¶
Creates a full mesh connection between multiple sources and targets.
Parameter | Type | Description |
---|---|---|
sources |
List[str] |
List of source node IDs |
targets |
List[str] |
List of target node IDs |
**kwargs |
Any |
Additional keyword arguments for all edges |
Returns:
List[Edge]
: List of created Edge objects
Example:
workflow.add_parallel_chain(
["DataCollector1", "DataCollector2"],
["Analyst1", "Analyst2", "Analyst3"]
)
Execution Methods¶
run(task: str = None, img: Optional[str] = None, *args, **kwargs) -> Dict[str, Any]
¶
Executes the workflow with optimized parallel agent execution.
Parameter | Type | Description |
---|---|---|
task |
str |
Task to execute (uses self.task if not provided) |
img |
Optional[str] |
Image path for vision-enabled agents |
*args |
Any |
Additional positional arguments |
**kwargs |
Any |
Additional keyword arguments |
Returns:
Dict[str, Any]
: Execution results from all nodes
Example:
arun(task: str = None, *args, **kwargs) -> Dict[str, Any]
¶
Async version of run for better performance with I/O bound operations.
Parameter | Type | Description |
---|---|---|
task |
str |
Task to execute |
*args |
Any |
Additional positional arguments |
**kwargs |
Any |
Additional keyword arguments |
Returns:
Dict[str, Any]
: Execution results from all nodes
Example:
Compilation and Optimization¶
compile()
¶
Pre-computes expensive operations for faster execution.
Example:
workflow.compile()
status = workflow.get_compilation_status()
print(f"Compiled: {status['is_compiled']}")
get_compilation_status() -> Dict[str, Any]
¶
Returns detailed compilation status information.
Returns:
Dict[str, Any]
: Compilation status including cache state and performance metrics
Example:
status = workflow.get_compilation_status()
print(f"Layers: {status['cached_layers_count']}")
print(f"Max workers: {status['max_workers']}")
Visualization Methods¶
visualize(format: str = "png", view: bool = True, engine: str = "dot", show_summary: bool = False) -> str
¶
Generates a visual representation of the workflow using Graphviz.
Parameter | Type | Description | Default |
---|---|---|---|
format |
str |
Output format ('png', 'svg', 'pdf', 'dot') | "png" |
view |
bool |
Whether to open the visualization | True |
engine |
str |
Graphviz layout engine | "dot" |
show_summary |
bool |
Whether to print parallel processing summary | False |
Returns:
str
: Path to the generated visualization file
Example:
output_file = workflow.visualize(
format="svg",
show_summary=True
)
print(f"Visualization saved to: {output_file}")
visualize_simple() -> str
¶
Generates a simple text-based visualization.
Returns:
str
: Text representation of the workflow
Example:
Serialization Methods¶
to_json(fast: bool = True, include_conversation: bool = False, include_runtime_state: bool = False) -> str
¶
Serializes the workflow to JSON format.
Parameter | Type | Description | Default |
---|---|---|---|
fast |
bool |
Whether to use fast JSON serialization | True |
include_conversation |
bool |
Whether to include conversation history | False |
include_runtime_state |
bool |
Whether to include runtime state | False |
Returns:
str
: JSON representation of the workflow
Example:
from_json(json_str: str, restore_runtime_state: bool = False) -> GraphWorkflow
¶
Deserializes a workflow from JSON format.
Parameter | Type | Description | Default |
---|---|---|---|
json_str |
str |
JSON string representation | Required |
restore_runtime_state |
bool |
Whether to restore runtime state | False |
Returns:
GraphWorkflow
: A new GraphWorkflow instance
Example:
save_to_file(filepath: str, include_conversation: bool = False, include_runtime_state: bool = False, overwrite: bool = False) -> str
¶
Saves the workflow to a JSON file.
Parameter | Type | Description | Default |
---|---|---|---|
filepath |
str |
Path to save the JSON file | Required |
include_conversation |
bool |
Whether to include conversation history | False |
include_runtime_state |
bool |
Whether to include runtime state | False |
overwrite |
bool |
Whether to overwrite existing files | False |
Returns:
str
: Path to the saved file
Example:
load_from_file(filepath: str, restore_runtime_state: bool = False) -> GraphWorkflow
¶
Loads a workflow from a JSON file.
Parameter | Type | Description | Default |
---|---|---|---|
filepath |
str |
Path to the JSON file | Required |
restore_runtime_state |
bool |
Whether to restore runtime state | False |
Returns:
GraphWorkflow
: Loaded workflow instance
Example:
Utility Methods¶
export_summary() -> Dict[str, Any]
¶
Generates a human-readable summary of the workflow.
Returns:
Dict[str, Any]
: Comprehensive workflow summary
Example:
summary = workflow.export_summary()
print(f"Workflow has {summary['structure']['nodes']} nodes")
print(f"Compilation status: {summary['compilation_status']['is_compiled']}")
set_entry_points(entry_points: List[str])
¶
Sets the entry points for the workflow.
Parameter | Type | Description |
---|---|---|
entry_points |
List[str] |
List of node IDs to serve as entry points |
Example:
set_end_points(end_points: List[str])
¶
Sets the end points for the workflow.
Parameter | Type | Description |
---|---|---|
end_points |
List[str] |
List of node IDs to serve as end points |
Example:
Class Methods¶
from_spec(agents, edges, entry_points=None, end_points=None, task=None, **kwargs) -> GraphWorkflow
¶
Constructs a workflow from a list of agents and connections.
Parameter | Type | Description | Default |
---|---|---|---|
agents |
List |
List of agents or Node objects | Required |
edges |
List |
List of edges or edge tuples | Required |
entry_points |
List[str] |
List of entry point node IDs | None |
end_points |
List[str] |
List of end point node IDs | None |
task |
str |
Task to be executed by the workflow | None |
**kwargs |
Any |
Additional keyword arguments | {} |
Returns:
GraphWorkflow
: A new GraphWorkflow instance
Example:
workflow = GraphWorkflow.from_spec(
agents=[agent1, agent2, agent3],
edges=[
("agent1", "agent2"),
("agent2", "agent3"),
("agent1", ["agent2", "agent3"]) # Fan-out
],
task="Analyze market data"
)
Examples¶
Basic Sequential Workflow¶
from swarms import Agent, GraphWorkflow
from swarms.prompts.multi_agent_collab_prompt import MULTI_AGENT_COLLAB_PROMPT_TWO
# Create agents
research_agent = Agent(
agent_name="ResearchAgent",
model_name="gpt-4",
system_prompt=MULTI_AGENT_COLLAB_PROMPT_TWO,
max_loops=1
)
analysis_agent = Agent(
agent_name="AnalysisAgent",
model_name="gpt-4",
system_prompt=MULTI_AGENT_COLLAB_PROMPT_TWO,
max_loops=1
)
# Build workflow
workflow = GraphWorkflow(name="Research-Analysis-Workflow")
workflow.add_node(research_agent)
workflow.add_node(analysis_agent)
workflow.add_edge("ResearchAgent", "AnalysisAgent")
# Execute
results = workflow.run("What are the latest trends in AI?")
print(results)
Parallel Processing Workflow¶
from swarms import Agent, GraphWorkflow
# Create specialized agents
data_collector = Agent(agent_name="DataCollector", model_name="gpt-4")
technical_analyst = Agent(agent_name="TechnicalAnalyst", model_name="gpt-4")
fundamental_analyst = Agent(agent_name="FundamentalAnalyst", model_name="gpt-4")
sentiment_analyst = Agent(agent_name="SentimentAnalyst", model_name="gpt-4")
synthesis_agent = Agent(agent_name="SynthesisAgent", model_name="gpt-4")
# Build parallel workflow
workflow = GraphWorkflow(name="Market-Analysis-Workflow")
# Add all agents
for agent in [data_collector, technical_analyst, fundamental_analyst,
sentiment_analyst, synthesis_agent]:
workflow.add_node(agent)
# Create fan-out pattern: data collector feeds all analysts
workflow.add_edges_from_source(
"DataCollector",
["TechnicalAnalyst", "FundamentalAnalyst", "SentimentAnalyst"]
)
# Create fan-in pattern: all analysts feed synthesis agent
workflow.add_edges_to_target(
["TechnicalAnalyst", "FundamentalAnalyst", "SentimentAnalyst"],
"SynthesisAgent"
)
# Execute
results = workflow.run("Analyze Bitcoin market trends")
print(results)
Complex Multi-Layer Workflow¶
from swarms import Agent, GraphWorkflow
# Create agents for different stages
data_collectors = [
Agent(agent_name=f"DataCollector{i}", model_name="gpt-4")
for i in range(1, 4)
]
analysts = [
Agent(agent_name=f"Analyst{i}", model_name="gpt-4")
for i in range(1, 4)
]
validators = [
Agent(agent_name=f"Validator{i}", model_name="gpt-4")
for i in range(1, 3)
]
synthesis_agent = Agent(agent_name="SynthesisAgent", model_name="gpt-4")
# Build complex workflow
workflow = GraphWorkflow(name="Complex-Research-Workflow")
# Add all agents
all_agents = data_collectors + analysts + validators + [synthesis_agent]
for agent in all_agents:
workflow.add_node(agent)
# Layer 1: Data collectors feed all analysts in parallel
workflow.add_parallel_chain(
[agent.agent_name for agent in data_collectors],
[agent.agent_name for agent in analysts]
)
# Layer 2: Analysts feed validators
workflow.add_parallel_chain(
[agent.agent_name for agent in analysts],
[agent.agent_name for agent in validators]
)
# Layer 3: Validators feed synthesis agent
workflow.add_edges_to_target(
[agent.agent_name for agent in validators],
"SynthesisAgent"
)
# Visualize and execute
workflow.visualize(show_summary=True)
results = workflow.run("Comprehensive analysis of renewable energy markets")
Workflow with Custom Metadata¶
from swarms import Agent, GraphWorkflow, Edge
# Create agents with specific roles
research_agent = Agent(agent_name="ResearchAgent", model_name="gpt-4")
analysis_agent = Agent(agent_name="AnalysisAgent", model_name="gpt-4")
# Build workflow with metadata
workflow = GraphWorkflow(
name="Metadata-Workflow",
description="Workflow demonstrating metadata usage"
)
workflow.add_node(research_agent, metadata={"priority": "high", "timeout": 300})
workflow.add_node(analysis_agent, metadata={"priority": "medium", "timeout": 600})
# Add edge with metadata
edge = Edge(
source="ResearchAgent",
target="AnalysisAgent",
metadata={"data_type": "research_findings", "priority": "high"}
)
workflow.add_edge(edge)
# Execute with custom parameters
results = workflow.run(
"Analyze the impact of climate change on agriculture",
max_loops=2
)
Workflow Serialization and Persistence¶
from swarms import Agent, GraphWorkflow
# Create workflow
research_agent = Agent(agent_name="ResearchAgent", model_name="gpt-4")
analysis_agent = Agent(agent_name="AnalysisAgent", model_name="gpt-4")
workflow = GraphWorkflow(name="Persistent-Workflow")
workflow.add_node(research_agent)
workflow.add_node(analysis_agent)
workflow.add_edge("ResearchAgent", "AnalysisAgent")
# Execute and get conversation
results = workflow.run("Research quantum computing applications")
# Save workflow with conversation history
filepath = workflow.save_to_file(
"quantum_research_workflow.json",
include_conversation=True,
include_runtime_state=True
)
# Load workflow later
loaded_workflow = GraphWorkflow.load_from_file(
filepath,
restore_runtime_state=True
)
# Continue execution
new_results = loaded_workflow.run("Continue with quantum cryptography analysis")
Advanced Pattern Detection¶
from swarms import Agent, GraphWorkflow
# Create a complex workflow with multiple patterns
workflow = GraphWorkflow(name="Pattern-Detection-Workflow", verbose=True)
# Create agents
agents = {
"collector": Agent(agent_name="DataCollector", model_name="gpt-4"),
"tech_analyst": Agent(agent_name="TechnicalAnalyst", model_name="gpt-4"),
"fund_analyst": Agent(agent_name="FundamentalAnalyst", model_name="gpt-4"),
"sentiment_analyst": Agent(agent_name="SentimentAnalyst", model_name="gpt-4"),
"risk_analyst": Agent(agent_name="RiskAnalyst", model_name="gpt-4"),
"synthesis": Agent(agent_name="SynthesisAgent", model_name="gpt-4"),
"validator": Agent(agent_name="Validator", model_name="gpt-4")
}
# Add all agents
for agent in agents.values():
workflow.add_node(agent)
# Create complex patterns
# Fan-out from collector
workflow.add_edges_from_source(
"DataCollector",
["TechnicalAnalyst", "FundamentalAnalyst", "SentimentAnalyst", "RiskAnalyst"]
)
# Fan-in to synthesis
workflow.add_edges_to_target(
["TechnicalAnalyst", "FundamentalAnalyst", "SentimentAnalyst", "RiskAnalyst"],
"SynthesisAgent"
)
# Final validation step
workflow.add_edge("SynthesisAgent", "Validator")
# Compile and get status
workflow.compile()
status = workflow.get_compilation_status()
print(f"Compilation status: {status}")
print(f"Layers: {status['cached_layers_count']}")
print(f"Max workers: {status['max_workers']}")
# Visualize with pattern detection
workflow.visualize(show_summary=True, format="png")
Error Handling and Recovery¶
from swarms import Agent, GraphWorkflow
import logging
# Set up logging
logging.basicConfig(level=logging.INFO)
# Create workflow with error handling
workflow = GraphWorkflow(
name="Error-Handling-Workflow",
verbose=True,
max_loops=1
)
# Create agents
try:
research_agent = Agent(agent_name="ResearchAgent", model_name="gpt-4")
analysis_agent = Agent(agent_name="AnalysisAgent", model_name="gpt-4")
workflow.add_node(research_agent)
workflow.add_node(analysis_agent)
workflow.add_edge("ResearchAgent", "AnalysisAgent")
# Execute with error handling
try:
results = workflow.run("Analyze market trends")
print("Workflow completed successfully")
print(results)
except Exception as e:
print(f"Workflow execution failed: {e}")
# Get workflow summary for debugging
summary = workflow.export_summary()
print(f"Workflow state: {summary['structure']}")
except Exception as e:
print(f"Workflow setup failed: {e}")
Conclusion¶
The GraphWorkflow
class provides a powerful and flexible framework for orchestrating complex multi-agent workflows. Its key benefits include:
Benefits¶
Benefit | Description |
---|---|
Scalability | Supports workflows with hundreds of agents through efficient parallel execution |
Flexibility | Multiple connection patterns (sequential, fan-out, fan-in, parallel chains) |
Performance | Automatic compilation and optimization for faster execution |
Visualization | Rich visual representations for workflow understanding and debugging |
Persistence | Complete serialization and deserialization capabilities |
Error Handling | Comprehensive error handling and recovery mechanisms |
Monitoring | Detailed logging and status reporting |
Use Cases¶
Use Case | Description |
---|---|
Research Workflows | Multi-stage research with data collection, analysis, and synthesis |
Content Generation | Parallel content creation with validation and refinement |
Data Processing | Complex ETL pipelines with multiple processing stages |
Decision Making | Multi-agent decision systems with voting and consensus |
Quality Assurance | Multi-stage validation and verification processes |
Automated Testing | Complex test orchestration with parallel execution |
Best Practices¶
Best Practice | Description |
---|---|
Use meaningful agent names | Helps with debugging and visualization |
Leverage parallel patterns | Use fan-out and fan-in for better performance |
Compile workflows | Always compile before execution for optimal performance |
Monitor execution | Use verbose mode and status reporting for debugging |
Save important workflows | Use serialization for workflow persistence |
Handle errors gracefully | Implement proper error handling and recovery |
Visualize complex workflows | Use visualization to understand and debug workflows |
The GraphWorkflow system represents a significant advancement in multi-agent orchestration, providing the tools needed to build complex, scalable, and maintainable AI workflows.