A workflow is a function decorated with @DBOS.workflow() that orchestrates multiple steps into a durable, fault-tolerant process. When a workflow executes, DBOS automatically checkpoints its progress in Postgres. If your application crashes or restarts, DBOS automatically resumes all active workflows from their last completed step.
When you call a step or transaction from within a workflow, DBOS:

1. Records the call in the system database before executing it
2. Executes the operation (step or transaction)
3. Records the result in the system database after completion
If your application crashes during step 2, when DBOS restarts:

1. It detects the incomplete workflow
2. It skips any steps that already completed (using their recorded results)
3. It resumes from the first incomplete step
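The recovery logic above can be illustrated with a toy sketch (this is not the real DBOS implementation, just the idea): completed steps are looked up in a checkpoint table instead of being re-executed, so a restarted workflow skips them and resumes at the first incomplete step.

```python
# Stands in for the Postgres system database: step name -> recorded result.
checkpoints: dict[str, object] = {}

def run_step(name, fn):
    if name in checkpoints:
        # Step completed before the crash: reuse its recorded result.
        return checkpoints[name]
    result = fn()                # execute the operation
    checkpoints[name] = result   # record the result after completion
    return result

executions = []

def charge_card():
    executions.append("charge")
    return "charged"

# First run executes the step and records its result.
first = run_step("charge", charge_card)
# A "recovered" run skips re-execution and returns the recorded result.
second = run_step("charge", charge_card)
```

Both calls return `"charged"`, but `charge_card` only actually runs once; that is why recovered workflows do not repeat side effects of completed steps.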
Workflows must be deterministic in their control flow. Use steps for any non-deterministic operations like random number generation, API calls, or getting the current time.
Enqueue workflows for background processing with concurrency control:
```python
from dbos import DBOS, Queue

order_queue = Queue("orders", concurrency=10)

@DBOS.workflow()
def process_batch(orders):
    handles = [order_queue.enqueue(process_order, o) for o in orders]
    return [h.get_result() for h in handles]
```
```python
import datetime
import random

from dbos import DBOS

@DBOS.workflow()
def bad_workflow():
    # Non-deterministic! Different on replay
    if random.random() > 0.5:
        step_one()
    # Non-deterministic! Time changes
    if datetime.datetime.now().hour < 12:
        morning_task()
```
- Workflows have unique IDs (generated automatically or set via SetWorkflowID)
- Workflows can run concurrently without interfering
- Each workflow has its own context and state
```python
from dbos import SetWorkflowID

# Set a custom workflow ID for idempotency
with SetWorkflowID("order-123-payment"):
    result = process_payment("order-123")

# Calling again with the same ID returns the cached result
with SetWorkflowID("order-123-payment"):
    result = process_payment("order-123")  # Returns previous result
```