Skip to main content

Overview

The SequentialWorkflow class orchestrates the execution of a sequence of agents in a defined workflow. This class enables the construction and execution of a workflow where multiple agents (or callables) are executed in a specified order, passing tasks and optional data through the chain.

Key Features

  • Sequential Execution: Agents execute one after another in a defined order
  • Synchronous and Asynchronous Support: Both blocking and non-blocking execution modes
  • Batched Processing: Execute multiple tasks through the same workflow
  • Concurrent Task Processing: Run multiple tasks concurrently using thread pools
  • Team Awareness: Agents can be aware of their position in the team structure
  • Auto-save Support: Automatically save conversation history to workspace
  • Flexible Output Types: Support for various output formats (dict, list, string)

Installation

pip install -U swarms

Class Definition

class SequentialWorkflow:
    def __init__(
        self,
        id: str = "sequential_workflow",
        name: str = "SequentialWorkflow",
        description: str = "Sequential Workflow, where agents are executed in a sequence.",
        agents: List[Union[Agent, Callable]] = None,
        max_loops: int = 1,
        output_type: OutputType = "dict",
        shared_memory_system: callable = None,
        multi_agent_collab_prompt: bool = False,
        team_awareness: bool = False,
        autosave: bool = True,
        verbose: bool = False,
        *args,
        **kwargs,
    )

Parameters

id
str
default:"sequential_workflow"
Unique identifier for the workflow instance
name
str
default:"SequentialWorkflow"
Human-readable name for the workflow
description
str
Description of the workflow’s purpose
agents
List[Union[Agent, Callable]]
required
List of agents or callables to execute in sequence. Cannot be None or empty.
max_loops
int
default:"1"
Maximum number of times to execute the workflow. Must be greater than 0.
output_type
OutputType
default:"dict"
Format of the output from the workflow. Options include “dict”, “list”, “str”, etc.
shared_memory_system
callable
default:"None"
Optional callable for managing shared memory between agents
multi_agent_collab_prompt
bool
default:"False"
If True, appends a collaborative prompt to each agent’s system prompt
team_awareness
bool
default:"False"
Whether agents are aware of the team structure and their position
autosave
bool
default:"True"
Whether to enable autosaving of conversation history to workspace
verbose
bool
default:"False"
Whether to enable verbose logging

Methods

run(task, img=None, imgs=None, *args, **kwargs)

Executes a specified task through the agents in the dynamically constructed flow.
task
str
required
The task for the agents to execute
img
Optional[str]
default:"None"
An optional image input for the agents
imgs
Optional[List[str]]
default:"None"
Optional list of images for the agents
result
str
The final result after processing through all agents

run_batched(tasks)

Executes a batch of tasks through the agents sequentially.
tasks
List[str]
required
A list of tasks for the agents to execute. Must be a non-empty list of strings.
results
List[str]
A list of final results after processing through all agents

run_async(task)

Executes the specified task through the agents asynchronously.
task
str
required
The task for the agents to execute. Must be a non-empty string.
result
str
The final result after processing through all agents

run_concurrent(tasks)

Executes a batch of tasks through the agents concurrently using ThreadPoolExecutor.
tasks
List[str]
required
A list of tasks for the agents to execute. Must be a non-empty list of strings.
results
List[str]
A list of final results after processing through all agents

sequential_flow()

Constructs a string representation of the agent execution order.
flow
str
A string showing the order of agent execution (e.g., “AgentA -> AgentB -> AgentC”)

reliability_check()

Validates the workflow configuration and prepares agents for execution. Raises:
  • ValueError: If the agents list is None or empty, or if max_loops is set to 0

Attributes

AttributeTypeDescription
idstrUnique identifier for the workflow instance
namestrHuman-readable name for the workflow
descriptionstrDescription of the workflow’s purpose
agentsList[Union[Agent, Callable]]List of agents or callables to execute in sequence
max_loopsintMaximum number of times to execute the workflow
output_typeOutputTypeFormat of the output from the workflow
flowstrString representation of the agent execution order
agent_rearrangeAgentRearrangeInternal helper for managing agent execution

Usage Examples

Basic Sequential Workflow

from swarms import Agent, SequentialWorkflow
from swarms.models import OpenAIChat

# Initialize the language model
llm = OpenAIChat(
    model_name="gpt-4o",
    temperature=0.5,
)

# Create specialized agents
researcher = Agent(
    agent_name="Researcher",
    llm=llm,
    max_loops=1,
    system_prompt="You are a research specialist. Gather and analyze information."
)

writer = Agent(
    agent_name="Writer",
    llm=llm,
    max_loops=1,
    system_prompt="You are a content writer. Create engaging content based on research."
)

editor = Agent(
    agent_name="Editor",
    llm=llm,
    max_loops=1,
    system_prompt="You are an editor. Review and polish the content."
)

# Create sequential workflow
workflow = SequentialWorkflow(
    name="Content Creation Pipeline",
    description="Research -> Write -> Edit pipeline",
    agents=[researcher, writer, editor],
    max_loops=1,
    verbose=True
)

# Run the workflow
result = workflow.run("Create a blog post about artificial intelligence")
print(result)

Batched Task Processing

# Process multiple tasks through the same workflow
tasks = [
    "Analyze the impact of AI on healthcare",
    "Discuss the future of renewable energy",
    "Examine cybersecurity trends"
]

results = workflow.run_batched(tasks)

for task, result in zip(tasks, results):
    print(f"Task: {task}")
    print(f"Result: {result}")
    print("-" * 50)

Concurrent Task Execution

import asyncio

# Run multiple tasks concurrently
tasks = [
    "Research quantum computing",
    "Research blockchain technology",
    "Research machine learning"
]

results = workflow.run_concurrent(tasks)
print(f"Processed {len(results)} tasks concurrently")

Async Execution

import asyncio

async def process_task():
    result = await workflow.run_async("Analyze climate change solutions")
    return result

# Run async
result = asyncio.run(process_task())
print(result)

With Team Awareness

# Create workflow with team awareness
workflow = SequentialWorkflow(
    name="Collaborative Pipeline",
    agents=[researcher, writer, editor],
    team_awareness=True,  # Agents know about each other
    multi_agent_collab_prompt=True,  # Add collaboration prompt
    verbose=True
)

result = workflow.run("Create a comprehensive market analysis")

Custom Output Types

# Different output formats
workflow_dict = SequentialWorkflow(
    agents=[researcher, writer],
    output_type="dict"  # Returns dict format
)

workflow_list = SequentialWorkflow(
    agents=[researcher, writer],
    output_type="list"  # Returns list format
)

workflow_str = SequentialWorkflow(
    agents=[researcher, writer],
    output_type="str"  # Returns string format
)

With Autosave

import os

# Set workspace directory for autosave
os.environ["WORKSPACE_DIR"] = "./my_workspace"

workflow = SequentialWorkflow(
    name="SavedWorkflow",
    agents=[researcher, writer, editor],
    autosave=True,  # Automatically save conversation history
    verbose=True
)

result = workflow.run("Generate a report")
# Conversation history saved to ./my_workspace/swarms/SequentialWorkflow/

Error Handling

try:
    workflow = SequentialWorkflow(
        agents=[researcher, writer],
        max_loops=1
    )
    result = workflow.run("Process this task")
except ValueError as e:
    print(f"Configuration error: {e}")
except Exception as e:
    print(f"Execution error: {e}")

Best Practices

  1. Agent Design: Ensure each agent has a clear, specific role in the sequence
  2. Error Handling: Use try-except blocks around workflow execution
  3. Verbose Mode: Enable verbose logging during development for debugging
  4. Autosave: Enable autosave for important workflows to preserve conversation history
  5. Task Clarity: Provide clear, specific tasks to get the best results
  6. Resource Management: Use run_concurrent for I/O-bound tasks to improve performance
  7. Team Awareness: Enable team awareness for complex workflows where context matters

Common Use Cases

  • Content Creation Pipelines: Research -> Write -> Edit workflows
  • Data Processing: Extract -> Transform -> Load (ETL) pipelines
  • Analysis Workflows: Collect -> Analyze -> Report sequences
  • Quality Assurance: Create -> Review -> Approve chains
  • Multi-Stage Reasoning: Break complex problems into sequential steps

Build docs developers (and LLMs) love