Traditional data pipelines have predictable timing. An ETL job that loads a database takes 10 minutes today and, very likely, will take 10 minutes tomorrow. You can set timeouts. You can predict costs.
AI workflows break all these assumptions:
import flyte

env = flyte.TaskEnvironment(name="data-analysis")


@env.task
async def analyze_data(dataset_path: str) -> dict:
    """Agent-driven data analysis - how long will this take?"""
    # This could take 30 seconds or 30 minutes
    # The AI agent decides how many tools to call
    # Each tool call might trigger more tool calls
    # (`agent`, the tools, and `process_agent_response` are placeholders defined elsewhere)
    response = await agent.analyze(dataset_path, tools=[
        query_database,
        run_statistical_analysis,
        fetch_external_data,
        train_small_model,  # This one might take hours!
    ])
    return process_agent_response(response)
The challenges:
Unknown duration: An AI agent might reason for 5 minutes or 5 hours
Heterogeneous resource demands: GPU training, large model inference, data downloads
Dynamic behavior: The same input might trigger different tool sequences
External dependencies: API calls, database queries, file uploads that can't be easily repeated
When these workflows crash (and they will), what happens to:
That $50,000 GPU training job?
The 100GB dataset you just downloaded?
The API calls that created resources in external systems?
The 6 hours of AI agent reasoning?
Traditional orchestration restarts execution from scratch. At these costs, that is not acceptable.
Enter Durable Executions
Durable executions are a programming model that guarantees your workflow can survive any failure and resume exactly where it left off—as if the failure never happened.
Think of it like a video game with autosave. When your game crashes, you don't restart from the beginning. You load from the last checkpoint and continue playing. The gameplay after the crash is identical to what it would have been without the crash.
That's what durable executions provide for your AI workflows.
The Three Guarantees
A truly durable execution system provides three critical guarantees:
1. Atomic Actions: All-or-Nothing Operations
When an action starts, it runs to completion without interruption. You never observe partial states.
@env.task
async def train_model(data: pd.DataFrame, config: dict) -> flyte.io.File:
    """
    This entire training run is atomic.
    It either completes fully or hasn't started.
    You never see a half-trained model.
    """
    model = initialize_model(config)
    # Even if this takes 48 hours, it's treated as one atomic operation
    for epoch in range(config["epochs"]):
        train_one_epoch(model)
        save_checkpoint(f"checkpoint-{epoch}")
    model_path = model.save("final_model.pkl")
    return flyte.io.File.from_local(model_path)
2. Progress Tracking: Never Lose Your Place
The system logs every state transition. If something fails, it knows exactly where to resume.
@env.task
async def multi_step_pipeline(input_data: dict) -> dict:
    """
    Each step is tracked. Failure recovery resumes from the last completed step.
    """
    # Step 1: Logged when complete
    cleaned = await clean_data(data=input_data)
    # Step 2: Logged when complete
    features = await extract_features(data=cleaned)
    # Step 3: Logged when complete (and expensive!)
    model_file = await train_expensive_model(features=features)  # $50K, 48 hours
    # Step 4: Logged when complete
    results = await evaluate_model(model=model_file)
    return results

# If crash happens after Step 2:
# Step 1: Skip (already completed)
# Step 2: Skip (already completed)
# Step 3: Execute (hasn't started yet)
# Step 4: Pending
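The skip-and-resume behavior described in the comments above can be imitated in plain Python. This is a minimal sketch, not how Flyte V2 is implemented: `StepJournal`, `run_step`, and the step names are hypothetical, and the journal lives in memory where a real system would persist it durably.

```python
from typing import Any, Callable, Dict, List


class StepJournal:
    """Hypothetical in-memory record of completed steps and their outputs."""

    def __init__(self) -> None:
        self.completed: Dict[str, Any] = {}
        self.executed: List[str] = []  # steps that actually ran, for inspection

    def run_step(self, name: str, fn: Callable[[], Any]) -> Any:
        # A step already in the journal is skipped and its stored output reused.
        if name in self.completed:
            return self.completed[name]
        result = fn()
        # The step is recorded only after it finishes, so a crash mid-step
        # leaves no entry and the step runs again on recovery.
        self.completed[name] = result
        self.executed.append(name)
        return result


def pipeline(journal: StepJournal, crash_before_train: bool = False) -> str:
    cleaned = journal.run_step("clean", lambda: "cleaned")
    features = journal.run_step("features", lambda: cleaned + "+features")
    if crash_before_train:
        raise RuntimeError("simulated crash after step 2")
    model = journal.run_step("train", lambda: features + "+model")
    return journal.run_step("evaluate", lambda: model + "+evaluated")


journal = StepJournal()
try:
    pipeline(journal, crash_before_train=True)
except RuntimeError:
    pass  # the journal survives the failed attempt

first_attempt = list(journal.executed)
final_result = pipeline(journal)  # recovery: steps 1 and 2 are skipped
```

After the second call, `journal.executed` lists each step once, even though `pipeline` was invoked twice.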
3. Failure Transparency: Crashes Are Invisible
After recovery, the execution continues as if nothing happened. The sequence of operations is identical to a failure-free run.
This is the magic that prevents waste:
No duplicate API calls
No re-downloading data
No restarting expensive computations
No inconsistent states
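One way to make "identical to a failure-free run" concrete is to compare the external side effects of a clean run with those of a crashed-and-recovered run. The sketch below is illustrative only: `STEPS`, `execute`, and the step names are hypothetical, and the side effects are simulated by appending to a list.

```python
from typing import Dict, List, Optional

STEPS = ["create_bucket", "upload_dataset", "start_training", "register_model"]


def execute(
    journal: Dict[str, str],
    effects: List[str],
    crash_before: Optional[str] = None,
) -> None:
    for step in STEPS:
        if step in journal:
            continue  # already recorded: the side effect is not repeated
        if step == crash_before:
            raise RuntimeError(f"simulated crash before {step}")
        effects.append(step)    # stands in for an external side effect
        journal[step] = "done"  # recorded only after the effect succeeds


# Failure-free run.
clean_effects: List[str] = []
execute({}, clean_effects)

# Run that crashes halfway, then recovers with the same journal.
journal: Dict[str, str] = {}
recovered_effects: List[str] = []
try:
    execute(journal, recovered_effects, crash_before="start_training")
except RuntimeError:
    pass
execute(journal, recovered_effects)
```

Both effect lists end up equal. Note the gap this toy leaves open: a crash between performing an effect and recording it would repeat that effect on recovery, which is why external calls generally still benefit from being idempotent.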
How Flyte V2 Implements Durable Executions
Flyte V2 achieves durability through two key mechanisms working together: Run-to-Completion Semantics and Trace-Based Checkpointing.
Mechanism 1: Run-to-Completion Semantics
Every task decorated with `@env.task` becomes an atomic action. Once it starts executing, it proceeds until reaching a terminal state (success or failure) without preemption.
This provides exactly-once execution semantics: each task runs observably exactly once, producing exactly one result or error.
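The idea of a single observable outcome per action can be sketched as a small state machine. This is an assumption-laden illustration rather than Flyte V2's internals: `Phase` and `Action` are hypothetical names, and the only rule shown is that an action in a terminal state is never executed again.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable, Optional


class Phase(Enum):
    PENDING = "pending"
    RUNNING = "running"
    SUCCEEDED = "succeeded"
    FAILED = "failed"


@dataclass
class Action:
    name: str
    phase: Phase = Phase.PENDING
    result: Any = None
    error: Optional[str] = None

    def run(self, fn: Callable[[], Any]) -> None:
        # A terminal action is never re-executed; its outcome is final.
        if self.phase in (Phase.SUCCEEDED, Phase.FAILED):
            return
        self.phase = Phase.RUNNING
        try:
            self.result = fn()
            self.phase = Phase.SUCCEEDED
        except Exception as exc:
            self.error = str(exc)
            self.phase = Phase.FAILED


calls = {"count": 0}


def train() -> str:
    calls["count"] += 1
    return "model-v1"


def broken() -> str:
    raise ValueError("bad config")


action = Action(name="train_model")
action.run(train)
action.run(train)  # no effect: the action already reached a terminal state

failed = Action(name="broken_step")
failed.run(broken)
```

Callers only ever see one result or one error per action, regardless of how many times `run` is invoked.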
Mechanism 2: Trace-Based Checkpointing
For fine-grained checkpointing within a task, Flyte V2 uses Traces, which play a role loosely similar to a database write-ahead log. The result of each traced operation is recorded once the operation completes, building a history that recovery can replay.
@env.task
async def process_large_batch(items: list[str]) -> list[dict]:
    """
    Process thousands of items with external API calls.
    Without traces: crash means restart from zero.
    With traces: resume from last successful checkpoint.
    """
    # Phase 1: Submit all items to external service (traced/checkpointed)
    @flyte.trace
    async def submit_jobs(items: list[str]) -> dict[str, str]:
        """Submit returns job IDs. This is checkpointed."""
        job_ids = {}
        for item in items:
            job_id = await external_api.submit(item)
            job_ids[item] = job_id
        return job_ids

    # Phase 2: Wait for all jobs to complete (traced/checkpointed)
    @flyte.trace
    async def wait_for_completion(job_mapping: dict[str, str]) -> list[dict]:
        """Poll job status until done. This is checkpointed."""
        results = []
        for item, job_id in job_mapping.items():
            result = await external_api.poll_until_complete(job_id)
            results.append(result)
        return results

    # Execute with automatic checkpointing
    job_mapping = await submit_jobs(items)  # Checkpoint 1
    results = await wait_for_completion(job_mapping)  # Checkpoint 2
    return results

# Timeline with crash:
# 10:00 - Start execution
# 10:15 - submit_jobs completes → CHECKPOINT 1 saved (job_mapping stored)
# 10:20 - wait_for_completion starts polling
# 10:35 - CRASH (network failure, pod eviction, etc.)
# 10:40 - System recovers, workflow resumes
# 10:40 - submit_jobs: SKIP (checkpoint exists, use cached job_mapping)
# 10:40 - wait_for_completion: RESUME polling with same job IDs
# 10:50 - Complete successfully
#
# Result: No duplicate submissions! No wasted API calls!
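The timeline above can be reproduced with a toy stand-in for a trace decorator. This sketch makes several assumptions: `traced` is a hypothetical helper and not `flyte.trace`, the journal is an in-memory dict rather than durable storage, and arguments must be JSON-serializable so they can form a lookup key.

```python
import asyncio
import functools
import json
from typing import Any, Callable, Dict, List

journal: Dict[str, Any] = {}      # recorded results, keyed by call signature
submissions: List[str] = []       # stands in for calls to an external API
network = {"down": True}          # toggled to simulate a crash


def traced(fn: Callable) -> Callable:
    """Hypothetical trace-like decorator: replay recorded results, else run."""

    @functools.wraps(fn)
    async def wrapper(*args: Any, **kwargs: Any) -> Any:
        key = json.dumps([fn.__name__, args, kwargs], sort_keys=True)
        if key in journal:
            return journal[key]   # replay: the body is not executed again
        result = await fn(*args, **kwargs)
        journal[key] = result     # recorded only after successful completion
        return result

    return wrapper


@traced
async def submit_jobs(items: List[str]) -> Dict[str, str]:
    submissions.extend(items)
    return {item: f"job-{index}" for index, item in enumerate(items)}


@traced
async def wait_for_completion(job_mapping: Dict[str, str]) -> List[str]:
    if network["down"]:
        raise ConnectionError("simulated network failure while polling")
    return [f"{job_id}:done" for job_id in job_mapping.values()]


async def workflow(items: List[str]) -> List[str]:
    job_mapping = await submit_jobs(items)
    return await wait_for_completion(job_mapping)


async def main() -> List[str]:
    try:
        await workflow(["a", "b"])    # first attempt crashes while polling
    except ConnectionError:
        pass
    network["down"] = False
    return await workflow(["a", "b"])  # recovery reuses the recorded job IDs


results = asyncio.run(main())
```

Although `workflow` runs twice, `submissions` contains each item once, because the second run replays the recorded result of `submit_jobs` instead of calling it again.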
Why this matters for AI workloads:
Expensive LLM calls: Each API call costs money. Traces prevent re-calling on restart.
Agent reasoning: AI agents build up context over time. Traces preserve this context.
External state changes: File uploads, database writes, resource creation—all recorded and not repeated.
Putting It Together: A Complete Example
Here's how atomic actions and traces work together to create a truly durable AI workflow:
import flyte
import httpx
import pandas as pd

image = flyte.Image.from_debian_base().with_pip_packages("pandas", "httpx", "openai")
env = flyte.TaskEnvironment(name="ai-pipeline", image=image)


@env.task(cache="auto")
async def download_training_data(urls: list[str]) -> pd.DataFrame:
    """
    ATOMIC ACTION: Downloads all data or fails completely.
    CACHED: Won't re-execute on workflow restart.
    """
    data = []
    async with httpx.AsyncClient() as client:
        for url in urls:
            response = await client.get(url)  # Might download GBs
            data.append(parse_content(response.text))
    return pd.DataFrame(data)


@env.task
async def ai_agent_training(dataset: pd.DataFrame, config: dict) -> flyte.io.File:
    """
    ATOMIC ACTION with TRACES: Long-running AI agent with checkpoints.
    If crash occurs, resumes after the last completed trace.
    """
    # Phase 1: Agent explores hyperparameter space (traced)
    @flyte.trace
    async def hyperparameter_search(data: pd.DataFrame) -> dict:
        """Agent-driven search. Duration unknown. Result checkpointed."""
        agent = OptimizerAgent()
        best_config = await agent.optimize(
            data=data,
            max_iterations=100,  # Could take minutes or hours
            tools=[train_small_model, evaluate_metrics],
        )
        return best_config

    # Phase 2: Train final model with best config (traced)
    @flyte.trace
    async def train_final_model(data: pd.DataFrame, hyperparams: dict) -> flyte.io.File:
        """Expensive training. $50K in compute. Result checkpointed."""
        model = LargeLanguageModel()
        await model.train(
            data=data,
            config=hyperparams,
            epochs=100,  # 48 hours on 8x A100 GPUs
        )
        model_path = model.save_to_storage()
        return flyte.io.File.from_local(model_path)

    # Execute phases with automatic checkpointing
    best_config = await hyperparameter_search(data=dataset)  # Checkpoint 1
    model_file = await train_final_model(data=dataset, hyperparams=best_config)  # Checkpoint 2
    return model_file


@env.task
async def deploy_model(model_file: flyte.io.File) -> str:
    """
    ATOMIC ACTION: Deployment is all-or-nothing.
    """
    local_path = model_file.download()
    endpoint = await production_deployer.deploy(local_path)
    await production_deployer.run_smoke_tests(endpoint)
    return endpoint


@env.task
async def complete_ml_pipeline(data_urls: list[str], config: dict) -> str:
    """
    DURABLE WORKFLOW: Can survive any failure at any point.
    Failure scenarios handled:
    - Crash during data download → Rerun the download (or skip if cached)
    - Crash during hyperparameter search → Rerun the search (no trace recorded yet)
    - Crash during training → Reuse the traced search result, rerun training
    - Crash during deployment → Retry deployment (must be idempotent)
    """
    # Step 1: Atomic, cached
    training_data = await download_training_data(urls=data_urls)
    # Step 2: Atomic with internal traces
    model_file = await ai_agent_training(dataset=training_data, config=config)
    # Step 3: Atomic
    endpoint = await deploy_model(model_file=model_file)
    return endpoint
The Developer Experience
The power of Flyte V2's durable executions is that you get all these guarantees with minimal code changes. Here's a diff comparison showing what gets removed (and what little gets added):
- from tenacity import retry, stop_after_attempt, wait_exponential
- import pickle
- import os
+ import flyte
+ import httpx
- # Global checkpoint management
- CHECKPOINT_DIR = "/tmp/checkpoints"
+ # Define task environment
+ image = flyte.Image.from_debian_base().with_pip_packages("httpx")
+ env = flyte.TaskEnvironment(name="data-pipeline", image=image)
- @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=4, max=60))
- def download_with_retry(url: str):
-     """Manual retry logic for every operation"""
-     checkpoint_file = f"{CHECKPOINT_DIR}/{hash(url)}.pkl"
-
-     # Check if already downloaded
-     if os.path.exists(checkpoint_file):
-         with open(checkpoint_file, 'rb') as f:
-             return pickle.load(f)
-
-     try:
-         data = httpx.get(url).content
-         # Manual checkpoint
-         with open(checkpoint_file, 'wb') as f:
-             pickle.dump(data, f)
-         return data
-     except Exception as e:
-         if os.path.exists(checkpoint_file):
-             with open(checkpoint_file, 'rb') as f:
-                 return pickle.load(f)
-         raise
+ @env.task(cache="auto")
+ async def download_data(url: str) -> flyte.io.File:
+     """Just write the happy path. Flyte V2 handles everything."""
+     async with httpx.AsyncClient() as client:
+         content = await client.get(url)
+         return flyte.io.File.from_string(content.text, "data.txt")
- def train_with_retry(data: list):
-     """Manual retry logic for training"""
-     checkpoint_file = f"{CHECKPOINT_DIR}/training_checkpoint.pkl"
-
-     if os.path.exists(checkpoint_file):
-         with open(checkpoint_file, 'rb') as f:
-             return pickle.load(f)
-
-     try:
-         model = Model()
-         model.train(data)
-         result = model.save()
-
-         # Manual checkpoint
-         with open(checkpoint_file, 'wb') as f:
-             pickle.dump(result, f)
-         return result
-     except Exception as e:
-         if os.path.exists(checkpoint_file):
-             with open(checkpoint_file, 'rb') as f:
-                 return pickle.load(f)
-         raise
+ @env.task
+ async def train_model(data_file: flyte.io.File) -> flyte.io.File:
+     """No try/except. No checkpointing code. No retry logic."""
+     data = data_file.read()
+     model = Model()
+     model.train(data)
+     model_path = model.save()
+     return flyte.io.File.from_local(model_path)
- def my_workflow(urls: list[str]):
-     """Every function needs try/except, checkpointing, retry logic"""
-
-     # Check workflow-level checkpoint
-     if os.path.exists(f"{CHECKPOINT_DIR}/workflow_state.pkl"):
-         with open(f"{CHECKPOINT_DIR}/workflow_state.pkl", 'rb') as f:
-             state = pickle.load(f)
-             step = state['current_step']
-     else:
-         step = 0
-
-     try:
-         if step == 0:
-             data = [download_with_retry(url) for url in urls]
-             # Save checkpoint
-             with open(f"{CHECKPOINT_DIR}/workflow_state.pkl", 'wb') as f:
-                 pickle.dump({'current_step': 1, 'data': data}, f)
-             step = 1
-
-         if step == 1:
-             # Load previous data
-             with open(f"{CHECKPOINT_DIR}/workflow_state.pkl", 'rb') as f:
-                 data = pickle.load(f)['data']
-
-             model = train_with_retry(data)
-             # Save checkpoint
-             with open(f"{CHECKPOINT_DIR}/workflow_state.pkl", 'wb') as f:
-                 pickle.dump({'current_step': 2, 'model': model}, f)
-             step = 2
-
-         # ... more steps with similar boilerplate ...
-
-     except Exception as e:
-         # Handle workflow-level failure
-         log_failure(e)
-         raise
+ @env.task
+ async def my_workflow(urls: list[str]) -> flyte.io.File:
+     """Clean. Readable. Fully durable."""
+     # Download all data files
+     data_files = [await download_data(url=url) for url in urls]
+
+     # Train model with first file (or combine them)
+     model_file = await train_model(data_file=data_files[0])
+
+     return model_file
What changed:
Removed ~75 lines of manual error handling, checkpointing, and retry logic
Added ~25 lines of simple, declarative task definitions
Roughly 65% less code
No hand-written recovery paths left to test or maintain
The complexity doesn't disappear. It moves to the platform, where it belongs.
You write business logic. Flyte V2 handles reliability.
Conclusion: Durable Executions as a Primitive
Durable executions aren't a nice-to-have feature; they're a fundamental requirement for modern AI systems. As workflows become more autonomous, more expensive, and more unpredictable, the ability to survive failures transparently becomes critical.
Flyte V2 makes durable executions a first-class primitive in your AI infrastructure.