Skip to main content

What is a workflow?

A workflow is a function decorated with @DBOS.workflow() that orchestrates multiple steps into a durable, fault-tolerant process. When a workflow executes, DBOS automatically checkpoints its progress in Postgres. If your application crashes or restarts, DBOS automatically resumes all active workflows from their last completed step.
from dbos import DBOS

@DBOS.workflow()
def process_order(order_id: str):
    # Each step is checkpointed automatically
    payment_result = charge_payment(order_id)
    inventory_result = reserve_inventory(order_id)
    ship_order(order_id)
    return {"order_id": order_id, "status": "completed"}

@DBOS.step()
def charge_payment(order_id: str):
    # Call payment processor
    return {"charged": True}

@DBOS.step()
def reserve_inventory(order_id: str):
    # Reserve items
    return {"reserved": True}

@DBOS.step()
def ship_order(order_id: str):
    # Trigger shipping
    pass

Key characteristics

Durable execution

Workflows automatically resume from the last completed step after any interruption

Automatic checkpointing

DBOS stores workflow state in Postgres without any manual code

Composable

Workflows can call other workflows, steps, and transactions

Observable

Query workflow status and history programmatically via DBOSClient

How durability works

When you call a step or transaction from within a workflow, DBOS:
  1. Records the call in the system database before executing it
  2. Executes the operation (step or transaction)
  3. Records the result in the system database after completion
If your application crashes during step 2, when DBOS restarts:
  • It detects the incomplete workflow
  • It skips any steps that already completed (using their recorded results)
  • It resumes from the first incomplete step
Workflows must be deterministic in their control flow. Use steps for any non-deterministic operations like random number generation, API calls, or getting the current time.

Workflow decorator

The @DBOS.workflow() decorator accepts optional parameters:
name
str
Custom name for the workflow. Defaults to the function name.
validate_input
Callable
Function to validate workflow arguments before execution.
@DBOS.workflow(name="process_payment_v2")
def process_payment(amount: float, currency: str):
    validate_amount(amount)
    return charge_payment(amount, currency)

Starting workflows

Synchronous execution

Call a workflow directly to execute it synchronously:
result = process_order("order-123")

Asynchronous execution

Use DBOS.start_workflow() to start a workflow in the background and get a handle:
handle = DBOS.start_workflow(process_order, "order-123")
workflow_id = handle.get_workflow_id()
result = handle.get_result()  # Blocks until completion

From a queue

Enqueue workflows for background processing with concurrency control:
from dbos import Queue

order_queue = Queue("orders", concurrency=10)

@DBOS.workflow()
def process_batch(orders):
    handles = [order_queue.enqueue(process_order, o) for o in orders]
    return [h.get_result() for h in handles]

Child workflows

Workflows can call other workflows to build complex processes:
@DBOS.workflow()
def parent_workflow():
    # Call child workflow synchronously
    result1 = child_workflow("task1")
    
    # Start child workflow asynchronously
    handle = DBOS.start_workflow(child_workflow, "task2")
    
    # Do other work
    do_something_else()
    
    # Wait for async child
    result2 = handle.get_result()
    
    return [result1, result2]

@DBOS.workflow()
def child_workflow(task: str):
    return perform_task(task)

Workflow context

Access workflow metadata and utilities via the workflow context:
from dbos import DBOS

@DBOS.workflow()
def my_workflow():
    ctx = DBOS.get_context()
    
    # Access workflow metadata
    workflow_id = ctx.workflow_id
    authenticated_user = ctx.authenticated_user
    
    # Use durable sleep (survives restarts)
    DBOS.sleep(60)  # Sleep for 60 seconds
    
    # Access logger
    DBOS.logger.info(f"Processing workflow {workflow_id}")
    
    return "done"

Determinism requirements

Workflow code must be deterministic to ensure correct recovery. This means the workflow’s control flow must be the same on every execution.

❌ Don’t do this

import random
import datetime

@DBOS.workflow()
def bad_workflow():
    # Non-deterministic! Different on replay
    if random.random() > 0.5:
        step_one()
    
    # Non-deterministic! Time changes
    if datetime.datetime.now().hour < 12:
        morning_task()

✅ Do this instead

@DBOS.workflow()
def good_workflow():
    # Put non-deterministic logic in steps
    random_val = get_random_value()
    if random_val > 0.5:
        step_one()
    
    current_hour = get_current_hour()
    if current_hour < 12:
        morning_task()

@DBOS.step()
def get_random_value():
    import random
    return random.random()

@DBOS.step()
def get_current_hour():
    import datetime
    return datetime.datetime.now().hour

Recovery behavior

When your application restarts, DBOS automatically:
  1. Detects all workflows that were running when the application stopped
  2. For each workflow, replays it from the beginning
  3. Skips any steps that already completed (reusing their stored results)
  4. Continues execution from the first incomplete step
@DBOS.workflow()
def recovery_example():
    step_one()    # Already completed → skipped, uses stored result
    step_two()    # Already completed → skipped, uses stored result
    step_three()  # Not completed → executes now
    step_four()   # Not completed → executes now
Recovery happens automatically when you call DBOS.launch(). You don’t need to write any recovery code.

Workflow isolation

Each workflow execution is isolated:
  • Workflows have unique IDs (generated automatically or set via SetWorkflowID)
  • Workflows can run concurrently without interfering
  • Each workflow has its own context and state
from dbos import SetWorkflowID

# Set a custom workflow ID for idempotency
with SetWorkflowID("order-123-payment"):
    result = process_payment("order-123")

# Calling again with the same ID returns the cached result
with SetWorkflowID("order-123-payment"):
    result = process_payment("order-123")  # Returns previous result

Next steps

Steps

Learn about workflow steps

Transactions

Learn about database transactions

Workflow tutorial

Build your first workflow

Queue tutorial

Process workflows in queues

Build docs developers (and LLMs) love