# Artifacts
## 🔄 1. Flow & Step: Organizing the Workflow

- A flow is the whole pipeline (workflow) you define in Metaflow. It's a Python class that subclasses `FlowSpec`.
- A step is a method inside a flow class, decorated with `@step`. Each step represents a node in the workflow DAG.
```python
from metaflow import FlowSpec, step

class HelloFlow(FlowSpec):

    @step
    def start(self):
        self.message = "Hello"
        self.next(self.end)

    @step
    def end(self):
        print(self.message)

if __name__ == '__main__':
    HelloFlow()
```
## 📦 2. Artifacts: Data Moving Through Steps
Metaflow uses the concept of artifacts to pass data between steps.
What is an artifact?
An artifact is any variable assigned to self.<var_name> inside a @step method. These variables are automatically serialized, stored, and versioned by Metaflow after the step completes, and then loaded into the next step when it's executed.
Example:

```python
@step
def prepare_data(self):
    import pandas as pd                  # assumes pandas is installed
    self.df = pd.read_csv("data.csv")    # <- df becomes an artifact
    self.next(self.train)

@step
def train(self):
    model = train_model(self.df)         # <- df is available here; train_model defined elsewhere
    self.model = model                   # <- model is a new artifact
```
Behind the Scenes:
- Artifacts are pickled and stored in Metaflow's default datastore (can be S3, local disk, or custom store).
- When you resume or inspect a flow run, Metaflow knows exactly where to fetch those artifacts.
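To make the "pickled and stored" part concrete, here is a minimal, hypothetical sketch of the round trip Metaflow performs for you behind the scenes (the file name and layout are illustrative, not Metaflow's actual datastore format):

```python
import os
import pickle
import tempfile

# An "artifact": any picklable value assigned to self.<name> in a step.
artifact = {"rows": 3, "columns": ["a", "b"]}

# After the step completes, the value is serialized and written to the datastore.
path = os.path.join(tempfile.mkdtemp(), "artifact.pkl")
with open(path, "wb") as f:
    pickle.dump(artifact, f)

# When the next step runs, the value is loaded back transparently.
with open(path, "rb") as f:
    restored = pickle.load(f)

print(restored == artifact)  # True: the next step sees the same value
```

The real datastore adds compression, hashing, and addressing by flow/run/step, but the serialization core is this pickle round trip.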
## 🧬 3. Data Lineage and Versioning
Each run of a flow (i.e., `python flow.py run`) gets a unique `run_id`. Every step inside it is versioned, and artifacts are tied to a specific step and run.
You can access past runs via the CLI or programmatically:
```shell
python hello_flow.py logs <run-id>/train   # inspect the step's logs
python hello_flow.py dump <run-id>/train   # print the step's artifacts
```
Or from Python:
```python
from metaflow import Flow

run = Flow('HelloFlow').latest_run
print(run['train'].task.data.model)  # access the model artifact from the train step
```
This supports:
- Reproducibility: You can rerun any past flow with the exact same data.
- Experiment tracking: Inspect how data evolved through steps.
- Resuming: `python flow.py resume` reuses the artifacts of every step that already completed successfully, rerunning only from the failure point.
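The addressing scheme behind this can be modeled with plain Python: artifacts are looked up by flow, run, step, and name, which is what makes reproducibility and inspection possible. This is a toy in-memory model, not Metaflow's real storage layout:

```python
# A hypothetical in-memory 'datastore' keyed the way Metaflow addresses artifacts:
# (flow, run_id, step, artifact_name) -> value
store = {}

def save(flow, run_id, step, name, value):
    store[(flow, run_id, step, name)] = value

def load(flow, run_id, step, name):
    return store[(flow, run_id, step, name)]

# Two runs of the same flow keep separate, versioned copies of 'model'.
save("HelloFlow", "1", "train", "model", "model-v1")
save("HelloFlow", "2", "train", "model", "model-v2")

print(load("HelloFlow", "1", "train", "model"))  # model-v1
print(load("HelloFlow", "2", "train", "model"))  # model-v2
```

Because old keys are never overwritten, any past run's artifacts remain retrievable, which is exactly the property lineage and resuming rely on.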
## 🧠 4. Artifact Types & Serialization
Metaflow serializes artifacts with pickle by default, and the datastore handles them efficiently:

- Artifacts are compressed and stored content-addressed in the datastore (local disk or S3)
- Large objects (e.g., NumPy arrays, pandas DataFrames) work out of the box as long as they are picklable
- For data too large to pickle comfortably, you can read and write S3 directly with the `metaflow.S3` datatools client
For heavy or flaky workloads, you can combine this with decorators such as `@batch` (run the step on AWS Batch) and `@retry` / `@catch` (handle transient failures).
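Conceptually, `@retry` just re-executes a failing step a bounded number of times before giving up. Here is a pure-Python sketch of that idea (an illustration only, not Metaflow's implementation):

```python
import functools

def retry(times=3):
    """Toy version of the retry idea: rerun a function when it raises."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(times):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt == times - 1:
                        raise  # out of attempts: propagate the failure
        return wrapper
    return decorator

calls = {"n": 0}

@retry(times=3)
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient failure")
    return "ok"

print(flaky())      # ok (succeeded on the third attempt)
print(calls["n"])   # 3
```

In Metaflow, the same pattern applies at the step level, and because artifacts from completed steps are already persisted, a retried step only recomputes its own work.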
## 📈 5. Integration with ML/DS Tools
- Artifacts can include:
  - pandas DataFrames
  - NumPy arrays
  - PyTorch models
  - any picklable Python object
- You can visualize results with Metaflow cards (`@card`) or integrate with external experiment trackers such as MLflow.
## 📁 6. Datastore: Where Artifacts Live
The storage backend is configurable:
- Local datastore (default for local runs): the `.metaflow` directory
- S3 for production environments
- Custom stores for enterprise use
You can configure it via environment variables:

```shell
export METAFLOW_DEFAULT_DATASTORE=s3
export METAFLOW_DATASTORE_SYSROOT_S3=s3://your-bucket/metaflow
```
## 🧪 7. Artifacts in Branching & Parallel Steps
Artifacts in Metaflow are isolated per branch:

- In a `self.next(self.step1, self.step2)` scenario, each parallel path works on its own copy of the artifacts
- Branches converge in a join step (a `@step` method that takes an `inputs` argument)
- In the join step you decide how artifacts are merged; `self.merge_artifacts(inputs)` automates the unambiguous cases
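Branch isolation can be modeled with plain Python: each branch mutates its own copy of the state, and the join decides how to combine the copies. This is a toy model of the dataflow, not Metaflow internals:

```python
import copy

# State at the fan-out point.
start_state = {"config": "shared"}

def branch1(state):
    state["value"] = 10
    return state

def branch2(state):
    state["value"] = 20
    return state

# Each branch gets its own isolated copy of the artifacts.
inputs = [branch1(copy.deepcopy(start_state)),
          branch2(copy.deepcopy(start_state))]

# The join step decides how to combine the branch results.
joined = {"config": inputs[0]["config"],           # identical in all branches
          "values": [s["value"] for s in inputs]}  # conflicting -> collect into a list

print(joined)  # {'config': 'shared', 'values': [10, 20]}
```

The deep copies are what guarantee that one branch's writes never leak into a sibling branch.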
## 🔁 8. Artifacts in Retry and Resume
If a step fails, or a run is interrupted:
- Artifacts are preserved up to the last completed step
- You can `resume` from the failure point (`python flow.py resume`)
- Only the failed step and the steps downstream of it are rerun
## 💡 Summary Table
| Concept | Description |
|---|---|
| Flow | The entire pipeline (a Python class) |
| Step | A node in the pipeline (a method with @step) |
| Artifact | Data saved via `self.<var>`, passed between steps |
| Run | A specific execution of the flow |
| Versioning | Each flow, step, and artifact is versioned for reproducibility |
| Datastore | Backend where artifacts are stored |
| Lineage | Full trace of data + code used in a flow |
| Resuming | Reuse of artifacts from previous runs |
# merge_artifacts
## 🧩 Context: When Is merge_artifacts Used?
You call `self.merge_artifacts(inputs)` inside a join step when you're combining multiple branches, typically after a `foreach` fan-out (`self.next(self.body, foreach='items')`) or a static branch (`self.next(self.step1, self.step2)`).
In those scenarios:

- `inputs` is a list-like object with one entry per incoming branch
- Each input carries its own set of artifacts (e.g., `input.result`, `input.score`, etc.)
By default, those artifacts are not merged, and you have to manually collect them:
```python
self.results = [inp.result for inp in inputs]
```
With `merge_artifacts`, you don't have to copy the unambiguous artifacts by hand: it propagates every artifact that agrees across branches (or was set in only one of them) onto the current step's `self`, so you only resolve the genuine conflicts:
## 🧪 Example
```python
from metaflow import FlowSpec, step

class MergeExampleFlow(FlowSpec):

    @step
    def start(self):
        self.common = 'shared'      # same value flows into every branch
        self.next(self.branch1, self.branch2)

    @step
    def branch1(self):
        self.value = 10             # conflicts with branch2's value
        self.next(self.join)

    @step
    def branch2(self):
        self.value = 20             # conflicts with branch1's value
        self.next(self.join)

    @step
    def join(self, inputs):
        # Resolve the conflicting artifact manually...
        self.values = [inp.value for inp in inputs]
        # ...then let merge_artifacts propagate the rest.
        self.merge_artifacts(inputs, exclude=['value'])
        print(self.common)   # 'shared' (propagated automatically)
        print(self.values)   # [10, 20]
        self.next(self.end)

    @step
    def end(self):
        print("Done.")

if __name__ == '__main__':
    MergeExampleFlow()
```
## 🔍 How Does self.merge_artifacts(inputs) Work?

It:

- Scans the artifacts of all incoming branches
- Copies every unambiguous artifact onto `self`: one that has the same value in all branches, or was set in only one of them
- Raises an error for artifacts with conflicting values, unless you have already set them on `self` or listed them in `exclude=[...]` (you can also narrow the merge with `include=[...]`)

So instead of manually copying every shared artifact forward one by one, you resolve just the true conflicts and write:

```python
self.merge_artifacts(inputs, exclude=['value'])
```
✅ Clean
✅ Robust
✅ Less boilerplate
## ⚠️ Caveats

- It does not collect conflicting values into a list for you; an artifact with different values across branches raises an error unless you resolve or exclude it
- Resolve conflicts on `self` before calling it, or pass `exclude=[...]` / `include=[...]`
- It only propagates top-level artifacts; it doesn't deep-merge dicts or nested objects
## ✅ When Should You Use It?

- After a `foreach` fan-out (e.g., a grid search over parameters)
- After a manual multi-branch fan-out (`self.next(self.stepA, self.stepB, ...)`)
- In any join step where most artifacts are shared and only a few need per-branch handling
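To pin down the semantics, here is a pure-Python sketch of what a `merge_artifacts`-style helper does (illustrative only: the real method lives on `FlowSpec`, operates on step inputs rather than dicts, and raises Metaflow-specific exceptions):

```python
def merge_artifacts(target, inputs, exclude=()):
    """Copy unambiguous artifacts from branch inputs onto target (a dict here)."""
    names = set().union(*(inp.keys() for inp in inputs)) - set(exclude)
    for name in names:
        values = [inp[name] for inp in inputs if name in inp]
        if all(v == values[0] for v in values):
            target[name] = values[0]   # same everywhere (or set once): propagate
        elif name in target:
            pass                       # already resolved by the join step
        else:
            raise ValueError(f"conflicting artifact: {name}")

branch_a = {"config": "shared", "value": 10}
branch_b = {"config": "shared", "value": 20}

# Resolve the conflict manually, then merge the rest.
joined = {"values": [branch_a["value"], branch_b["value"]]}
merge_artifacts(joined, [branch_a, branch_b], exclude=["value"])

print(joined)  # {'values': [10, 20], 'config': 'shared'}
```

Note that `config` is propagated because it agrees across branches, while the conflicting `value` must be handled explicitly, which mirrors the behavior described in the caveats above.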