Skip to main content
A task is the fundamental building block and extension point of Flyte. Every unit of computation — from a simple data transformation to a distributed Spark job — is expressed as a task.

What is a task?

A Flyte task is characterized by:
  • Containerized execution: Each task runs in its own container on a Kubernetes Pod, isolated from all other tasks.
  • Strong typing: All inputs and outputs must be annotated with Python type hints. Flyte validates these at compile time and execution time.
  • Versioning: Tasks are versioned (typically aligned with the Git SHA) and immutable once registered.
  • Independent executability: Tasks can be run individually, outside of a workflow, for testing and development.
from typing import List
from flytekit import task


@task
def mean(values: List[float]) -> float:
    return sum(values) / len(values)
This is all it takes to define a Flyte task: a regular Python function decorated with @task. The type annotations on inputs and outputs are required.
Tasks and workflows must always be called with keyword arguments:
result = mean(values=[1.0, 2.0, 3.0])  # correct
result = mean([1.0, 2.0, 3.0])         # raises an error

Running a task locally

You can execute a Flyte task just like any regular Python function:
result = mean(values=[float(i) for i in range(1, 11)])
print(result)  # 5.5
Or via the pyflyte run CLI:
pyflyte run task.py mean --values '[1.0,2.0,3.0,4.0,5.0]'
To run it on a remote Flyte cluster:
pyflyte run --remote task.py mean --values '[1.0,2.0,3.0,4.0,5.0]'

Task types

Flyte distinguishes between tasks based on where and how they execute.
The most common task type. Any Python function decorated with @task becomes a PythonFunctionTask. When run on a cluster, it executes inside a container on a Kubernetes Pod.
from flytekit import task

@task
def process_data(x: List[float]) -> float:
    return sum(x)
Run arbitrary shell commands or binaries inside a container, without writing Python:
from flytekit import ContainerTask, kwtypes

container_task = ContainerTask(
    name="run_script",
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    inputs=kwtypes(script_args=str),
    outputs=kwtypes(result=str),
    image="python:3.11-slim",
    command=["python", "/scripts/run.py"],
)
Flyte includes backend plugins for running queries on distributed data warehouses. These tasks do not require a Python function body — they delegate execution to the external service:
  • Athena (flytekitplugins-aws-athena)
  • BigQuery (flytekitplugins-bigquery)
  • Snowflake (flytekitplugins-snowflake)
  • Hive (via the Hive backend plugin)
Submit Apache Spark jobs directly from a Flyte task using the pyspark plugin:
from flytekitplugins.spark import Spark
from flytekit import task

@task(
    task_config=Spark(
        spark_conf={
            "spark.driver.memory": "1000M",
            "spark.executor.memory": "1000M",
            "spark.executor.cores": "1",
            "spark.executor.instances": "2",
        }
    )
)
def my_spark_task(partitions: int) -> int:
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.getOrCreate()
    # ... your Spark logic
    return partitions
Use the Kubeflow PyTorch plugin to run distributed training jobs:
import numpy as np
import torch.nn as nn
from flytekit import task, Resources
from flytekitplugins.kfpytorch import PyTorch

@task(
    task_config=PyTorch(num_workers=2),
    requests=Resources(cpu="2", gpu="1", mem="8Gi"),
    limits=Resources(cpu="4", gpu="2", mem="16Gi"),
    image="ghcr.io/flyteorg/flytecookbook:kfpytorch-latest",
)
def train_model(features: np.ndarray, target: np.ndarray) -> nn.Module:
    ...  # distributed training logic

Caching task outputs

Flyte supports memoization of task outputs. When you enable caching, Flyte checks whether an identical invocation (same inputs and cache_version) was executed before and returns the stored output instead of re-running the task.
from typing import List
from flytekit import task


@task(cache=True, cache_version="1")
def compute_mean(data: List[float]) -> float:
    return sum(data) / len(data)
  • cache=True enables caching for this task.
  • cache_version is a string you control. Change it when you want to invalidate the cache (for example, after fixing a bug in the task logic).
Caching works both locally and on a Flyte cluster.
For file-like data types such as FlyteFile and offloaded types like pandas.DataFrame, you can provide a hash function that determines the cache key based on file content.

Retries

Flyte categorizes failures into two types and handles them independently:
Error typeDescriptionConfiguration
User errorsApplication-level failures: logic errors, invalid inputs, value errorsretries parameter in @task
System errorsInfrastructure failures: spot preemptions, network issues, hardware faultsPlatform-level config (max-node-retries-system-failures)
from flytekit import task
from flytekit.exceptions.user import FlyteRecoverableException
import random


@task(retries=3)
def flaky_task(data: List[float]) -> float:
    if random.random() < 0.05:
        raise FlyteRecoverableException("Transient error, please retry")
    return sum(data) / len(data)
The number of user retries must be 10 or fewer. All user exceptions are considered non-recoverable unless the exception subclasses FlyteRecoverableException.

Spot instances and retries

Tasks marked interruptible=True run on preemptible (spot) instances. Preemptions count against the system retry budget, not your user retry budget. The last system retry automatically runs on a non-preemptible instance to guarantee completion:
@task(
    retries=3,           # user retry budget
    interruptible=True   # enables spot instances
)
def my_spot_task() -> None:
    ...

Timeouts

Use the timeout parameter to protect against tasks that hang indefinitely. After the timeout period elapses, the task is marked as failed:
from datetime import timedelta
from flytekit import task


@task(timeout=timedelta(hours=1))
def long_running_task(data: List[float]) -> float:
    return sum(data) / len(data)
A timed-out task is retried if it has a retry strategy defined.

Resource allocation

Flyte tasks can declare their resource requirements using the Resources object. This allows workflows to be composed of tasks with heterogeneous hardware needs:
from flytekit import task, Resources


@task(requests=Resources(cpu="2", mem="100Mi"))
def light_task() -> float:
    ...


@task(requests=Resources(cpu="16", mem="16Gi", gpu="1"))
def heavy_task() -> float:
    ...
You can also set limits to cap the maximum resources the task may use:
@task(
    requests=Resources(cpu="2", mem="8Gi"),
    limits=Resources(cpu="4", mem="16Gi"),
)
def scalable_task() -> None:
    ...

Map tasks

Use map_task to parallelize a task over a list of inputs without writing explicit parallelism logic:
import math
from typing import List, Tuple
from flytekit import task, workflow, map_task


@task
def sum_and_length(data: List[float]) -> List[float]:
    return [sum(data), float(len(data))]


@task
def prepare_partitions(data: List[float], n_partitions: int) -> List[List[float]]:
    size = math.ceil(len(data) / n_partitions)
    return [data[size * i: size * (i + 1)] for i in range(n_partitions)]


@task
def reduce(results: List[List[float]]) -> float:
    total, length = 0.0, 0.0
    for sub_total, sub_length in results:
        total += sub_total
        length += sub_length
    return total / length


@workflow
def parallelized_mean(data: List[float], n_partitions: int = 10) -> float:
    partitioned = prepare_partitions(data=data, n_partitions=n_partitions)
    results = map_task(sum_and_length)(data=partitioned)
    return reduce(results=results)

What makes a good Flyte task?

When deciding whether a unit of work is a good candidate for a Flyte task, consider:
  • Well-defined exit criteria: A task is expected to exit after processing its inputs. Long-running daemons do not fit the task model.
  • Repeatability: Under certain circumstances (retries, re-runs), a task may be executed multiple times with the same inputs. It should produce the same output every time. Avoid using random seeds based on the current clock.
  • Minimal side effects: Tasks should be pure functions where possible. When side effects are unavoidable (e.g., writing to a database), ensure the operation is idempotent.

Build docs developers (and LLMs) love