Overview
In Serverless Workflow DSL, data flow management is crucial to ensure that the right data is passed between tasks and to the workflow itself. The DSL provides comprehensive mechanisms for validating, transforming, and managing data as it flows through the workflow execution.
Data flows through multiple transformation stages, each with specific purposes for validation, filtering, and context management.
Data Flow Pipeline
Data flows through a workflow in a structured pipeline with multiple stages:
Validation
Before the workflow starts, input data can be validated against a JSON Schema:
input:
schema:
document:
type: object
properties:
userId:
type: string
pattern: "^[a-zA-Z0-9]{8,}$"
email:
type: string
format: email
age:
type: integer
minimum: 18
maximum: 120
required:
- userId
- email
JSON Schema to validate the workflow input before execution begins
If validation fails, the workflow execution faults with a ValidationError (https://serverlessworkflow.io/spec/1.0.0/errors/validation).
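A validation fault might surface as a Problem Details style error object such as the following; the title, detail, and instance values are illustrative, and the exact payload depends on the runtime:

```yaml
# Illustrative error raised when workflow input validation fails.
type: https://serverlessworkflow.io/spec/1.0.0/errors/validation
status: 400                       # status code conventionally used for validation errors
title: Validation Failed          # illustrative
detail: "'userId' does not match pattern '^[a-zA-Z0-9]{8,}$'"  # illustrative
instance: /input                  # pointer to the invalid data (illustrative)
```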
Transformation
The input data can be transformed so that only relevant data, in the expected format, is passed into the workflow context:
input:
from: ${ { id: .userId, contact: .email, metadata: { timestamp: now, version: "1.0" } } }
Runtime expression evaluated against the raw workflow input. Defaults to the identity expression ${ . }, which leaves the input unchanged.
The result of the input.from expression sets the initial value for the $input runtime expression argument and is passed to the first task.
document:
dsl: '1.0.3'
namespace: users
name: user-registration
version: '1.0.0'
input:
schema:
document:
type: object
properties:
username:
type: string
minLength: 3
maxLength: 20
email:
type: string
format: email
profile:
type: object
properties:
firstName:
type: string
lastName:
type: string
dateOfBirth:
type: string
format: date
required:
- username
- email
- profile
from: ${ {
username: .username,
email: .email,
fullName: "\(.profile.firstName) \(.profile.lastName)",
registeredAt: now
} }
do:
- createUser:
call: userService
with:
data: ${ $input }
Task Input Processing
Each task receives raw input, which is:
- For the first task: the transformed workflow input
- For subsequent tasks: the transformed output of the previous task
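This handoff can be sketched with two tasks, where the first task's transformed output becomes the second task's raw input (quoteService and discountService are hypothetical functions):

```yaml
do:
  - getQuote:
      call: quoteService                    # hypothetical function
      output:
        as: ${ { price: .body.price } }     # transformed output of this task...
  - applyDiscount:
      call: discountService                 # hypothetical function
      with:
        amount: ${ .price }                 # ...is the raw input of this one
```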
Before a task executes, its raw input can be validated:
processOrder:
input:
schema:
document:
type: object
properties:
orderId:
type: string
items:
type: array
minItems: 1
items:
type: object
properties:
productId:
type: string
quantity:
type: integer
minimum: 1
required:
- orderId
- items
call: orderProcessor
with:
order: ${ . }
JSON Schema to validate task input before execution
The input data for a task can be transformed to match specific requirements:
processData:
input:
from: ${ {
id: .userId,
data: .payload.data,
options: {
validate: true,
normalize: true
}
} }
call: dataProcessor
with:
input: ${ $input }
Runtime expression evaluated against the raw task input. Defaults to the identity expression ${ . }.
The result of input.from is set as the $input runtime expression argument and is used to evaluate any runtime expressions within the task definition.
do:
- fetchUserData:
call: http
with:
method: get
endpoint:
uri: https://api.example.com/users/${ .userId }
- processUserData:
input:
from: ${ {
userId: .fetchUserData.output.id,
profile: {
name: .fetchUserData.output.firstName + " " + .fetchUserData.output.lastName,
email: .fetchUserData.output.email,
verified: .fetchUserData.output.emailVerified
},
preferences: .fetchUserData.output.settings // { marketing: true }
} }
call: userProcessor
with:
user: ${ $input }
Task Output Processing
After completing a task, its output can be transformed before passing it to the next task:
fetchData:
call: http
with:
method: get
endpoint:
uri: https://api.example.com/data
output:
as: ${ {
items: .body.results,
count: .body.total,
processedAt: now
} }
Runtime expression evaluated against the raw task output. Defaults to the identity expression ${ . }.
The result of output.as becomes the input for the next task and is set as the $output runtime expression argument.
Output Validation
After transformation, the task output can be validated:
processData:
call: processor
with:
data: ${ .inputData }
output:
as: ${ . }
schema:
document:
type: object
properties:
result:
type: string
enum: [success, failure]
data:
type: object
timestamp:
type: string
format: date-time
required:
- result
- timestamp
JSON Schema to validate the transformed task output
do:
- fetchProducts:
call: http
with:
method: get
endpoint:
uri: https://api.example.com/products
output:
as: ${
.body.products | map({
id: .id,
name: .name,
price: .price,
available: .inventory > 0
})
}
Context Management
The workflow context ($context) is a persistent data store that maintains state across task executions.
Initial Context
The initial context is set to the transformed workflow input:
input:
from: ${ { userId: .id, timestamp: now } }
do:
- firstTask:
call: someFunction
with:
context: ${ $context } # Contains { userId: "...", timestamp: "..." }
Exporting to Context
Tasks can update the workflow context using the export.as expression:
fetchUser:
call: http
with:
method: get
endpoint:
uri: https://api.example.com/users/${ .userId }
export:
as: ${ $context + { user: $output } }
Runtime expression evaluated against the transformed task output; its result becomes the new context. When omitted, the existing context is left unchanged.
The result of export.as replaces the workflow’s current context and updates the $context runtime expression argument.
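Because export.as replaces the context rather than merging into it, include the existing $context explicitly whenever you only want to add keys. A minimal sketch of the two behaviors:

```yaml
# Replaces the entire context with a single key:
export:
  as: ${ { user: $output } }

# Preserves the existing context and adds a key:
export:
  as: ${ $context + { user: $output } }
```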
Context Validation
The exported context can be validated:
addUserData:
call: processor
with:
data: ${ .data }
export:
as: ${ $context + { userData: $output } }
schema:
document:
type: object
properties:
userId:
type: string
userData:
type: object
required:
- userId
- userData
JSON Schema to validate the exported context
Example: Building Context Incrementally
do:
- fetchUser:
call: http
with:
method: get
endpoint:
uri: https://api.example.com/users/${ .userId }
export:
as: ${ $context + { user: $output.body } }
- fetchOrders:
call: http
with:
method: get
endpoint:
uri: https://api.example.com/orders?userId=${ $context.user.id }
export:
as: ${ $context + { orders: $output.body } }
- fetchPreferences:
call: http
with:
method: get
endpoint:
uri: https://api.example.com/preferences/${ $context.user.id }
export:
as: ${ $context + { preferences: $output.body } }
- generateReport:
call: reportGenerator
with:
user: ${ $context.user }
orders: ${ $context.orders }
preferences: ${ $context.preferences }
Workflow Output Processing
The final workflow output can be transformed:
do:
- processData:
call: processor
- finalizeResults:
call: finalizer
output:
as: ${ {
status: "completed",
result: .finalizeResults.output.result,
processedAt: now,
summary: {
itemsProcessed: .processData.output.count,
duration: "5 minutes"
}
} }
Runtime expression evaluated against the last task's transformed output. Defaults to the identity expression ${ . }.
Output Validation
The transformed workflow output can be validated:
output:
as: ${ . }
schema:
document:
type: object
properties:
status:
type: string
enum: [success, failure]
result:
type: object
timestamp:
type: string
format: date-time
required:
- status
- result
- timestamp
JSON Schema to validate the transformed workflow output
Runtime Expression Arguments
Different runtime expression arguments are available at different stages:
| Expression Location | $context | $input | $output | $secrets | $task | $workflow | $runtime |
|---|---|---|---|---|---|---|---|
| Workflow input.from |   |   |   | ✔ |   | ✔ | ✔ |
| Task if | ✔ |   |   | ✔ | ✔ | ✔ | ✔ |
| Task input.from | ✔ |   |   | ✔ | ✔ | ✔ | ✔ |
| Task definition | ✔ | ✔ |   | ✔ | ✔ | ✔ | ✔ |
| Task output.as | ✔ | ✔ |   | ✔ | ✔ | ✔ | ✔ |
| Task export.as | ✔ | ✔ | ✔ | ✔ | ✔ | ✔ | ✔ |
| Workflow output.as | ✔ |   |   | ✔ |   | ✔ | ✔ |
Use $secrets with caution: incorporating them in expressions or passing them as call inputs may inadvertently expose sensitive information. For this reason, and to avoid unintentional bleeding, secrets can only be used in the input.from runtime expression.
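As a sketch, a secret declared by the workflow could be merged into a task's input like this; the secret name apiToken and the endpoint are illustrative:

```yaml
use:
  secrets:
    - apiToken                    # provided by the runtime; name is illustrative
do:
  - callSecuredApi:
      input:
        from: ${ . + { token: $secrets.apiToken } }  # secrets referenced in input.from only
      call: http
      with:
        method: get
        endpoint:
          uri: https://api.example.com/secured       # illustrative
```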
Data Flow Patterns
Pattern: Data Enrichment Pipeline
do:
- fetchBaseData:
call: http
with:
method: get
endpoint:
uri: https://api.example.com/base/${ .id }
output:
as: ${ .body }
export:
as: ${ { base: $output } }
- enrichWithDetails:
call: http
with:
method: get
endpoint:
uri: https://api.example.com/details/${ $context.base.id }
output:
as: ${ .body }
export:
as: ${ $context + { details: $output } }
- enrichWithMetadata:
call: http
with:
method: get
endpoint:
uri: https://api.example.com/metadata/${ $context.base.id }
output:
as: ${ .body }
export:
as: ${ $context + { metadata: $output } }
- combineData:
set:
enrichedData: ${ {
id: $context.base.id,
name: $context.base.name,
details: $context.details,
metadata: $context.metadata,
enrichedAt: now
} }
Pattern: Data Filtering and Projection
do:
- fetchAllUsers:
call: http
with:
method: get
endpoint:
uri: https://api.example.com/users
output:
as: ${
.body.users |
map(select(.active == true)) |
map({ id: .id, name: .name, email: .email })
}
Pattern: Conditional Transformation
do:
- processWithCondition:
call: dataSource
output:
as: ${
if .status == "premium" then
{ data: .fullData, tier: "premium" }
else
{ data: .basicData, tier: "basic" }
end
}
Pattern: Data Aggregation
do:
- fetchMultipleSources:
fork:
branches:
- source1:
call: http
with:
method: get
endpoint:
uri: https://api1.example.com/data
- source2:
call: http
with:
method: get
endpoint:
uri: https://api2.example.com/data
- source3:
call: http
with:
method: get
endpoint:
uri: https://api3.example.com/data
- aggregateResults:
set:
combined: ${ {
source1Data: .fetchMultipleSources.source1.output.body,
source2Data: .fetchMultipleSources.source2.output.body,
source3Data: .fetchMultipleSources.source3.output.body,
aggregatedAt: now
} }
Pattern: Data Validation Pipeline
do:
- validateStructure:
input:
schema:
document:
type: object
properties:
data:
type: object
required:
- data
call: structureValidator
with:
input: ${ . }
- validateBusinessRules:
call: businessRuleValidator
with:
data: ${ .validateStructure.output }
- validateIntegrity:
call: integrityValidator
with:
data: ${ .validateBusinessRules.output }
output:
schema:
document:
type: object
properties:
valid:
type: boolean
data:
type: object
required:
- valid
- data
Best Practices
Always validate inputs
Use JSON Schema validation for workflow and task inputs to catch errors early and ensure data integrity.
Transform data as close to the source as possible
Apply input transformations immediately after receiving data to work with clean, relevant data structures.
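For example, projecting an HTTP response down to just the fields later tasks need, right at the call, keeps the rest of the workflow simple (the endpoint and field names are illustrative):

```yaml
- fetchInvoice:
    call: http
    with:
      method: get
      endpoint:
        uri: https://api.example.com/invoices/latest            # illustrative
    output:
      as: ${ { total: .body.total, currency: .body.currency } } # project immediately
```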
Keep context minimal
Only export data to context that needs to be shared across multiple tasks. Avoid cluttering context with unnecessary data.
Use explicit transformations
Make data transformations explicit and readable. Avoid complex nested expressions that are hard to understand.
Validate at boundaries
Validate data at workflow boundaries (input/output) and at critical transformation points.
Document data structures
Use descriptive property names and include schema documentation for complex data structures.
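JSON Schema's title and description keywords are one way to keep that documentation next to the validation rules; the schema below is a minimal illustrative sketch:

```yaml
input:
  schema:
    document:
      type: object
      title: Order request
      description: Payload accepted by the order-processing workflow
      properties:
        orderId:
          type: string
          description: Unique identifier assigned by the storefront
        items:
          type: array
          description: Line items; at least one is required
          minItems: 1
      required:
        - orderId
        - items
```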
Common Pitfalls
Avoiding Context Pollution
# Bad: Exporting everything to context
export:
as: ${ $context + $output }
# Good: Exporting only what's needed
export:
as: ${ $context + { userId: $output.id, status: $output.status } }
Handling Null Values
# Bad: Assuming data exists
output:
as: ${ .user.profile.email }
# Good: Handling potential null values
output:
as: ${ .user.profile.email // "[email protected]" }
Simplifying Complex Transformations
# Bad: Complex inline transformation
output:
as: ${ { a: .data.items[0].values[2].nested.prop, b: (.data.summary.total / .data.summary.count) * 100 } }
# Good: Step-by-step transformation
do:
- extractValue:
set:
extractedProp: ${ .data.items[0].values[2].nested.prop }
- calculatePercentage:
set:
percentage: ${ (.data.summary.total / .data.summary.count) * 100 }
- combineResults:
set:
result: ${ { a: .extractedProp, b: .percentage } }