AI/ML notes


Artifacts

πŸ” 1. Flow & Step: Organizing the Workflow

  • A flow is the whole pipeline (workflow) you define in Metaflow. It’s a Python class that subclasses FlowSpec — subclassing FlowSpec is what marks the class as a flow (there is no @Flow decorator).
  • A step is a method inside a flow class, decorated with @step. Each step represents a node in the workflow DAG.
from metaflow import FlowSpec, step

class HelloFlow(FlowSpec):

    @step
    def start(self):
        self.message = "Hello"
        self.next(self.end)

    @step
    def end(self):
        print(self.message)

if __name__ == '__main__':
    HelloFlow()

πŸ“¦ 2. Artifacts: Data Moving Through Steps

Metaflow uses the concept of artifacts to pass data between steps.

What is an artifact?

An artifact is any variable assigned to self.<var_name> inside a @step method. These variables are automatically serialized, stored, and versioned by Metaflow after the step completes, and then loaded into the next step when it's executed.

Example:

@step
def prepare_data(self):
    self.df = pd.read_csv("data.csv")  # <- df becomes an artifact
    self.next(self.train)

@step
def train(self):
    model = train_model(self.df)  # <- df is available here
    self.model = model            # <- model is a new artifact

Behind the Scenes:

  • Artifacts are pickled and stored in Metaflow's default datastore (can be S3, local disk, or custom store).
  • When you resume or inspect a flow run, Metaflow knows exactly where to fetch those artifacts.
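Conceptually, the save/load cycle for an artifact is a pickle round-trip. A simplified sketch (this illustrates the idea, not Metaflow's actual storage code):

```python
import pickle

# A stand-in for an artifact assigned to self.<var> in a step.
artifact = {"rows": [1, 2, 3], "source": "data.csv"}

# After the step completes: the artifact is serialized and persisted.
blob = pickle.dumps(artifact)

# In a later step (or when inspecting a run): it is loaded back.
restored = pickle.loads(blob)
print(restored == artifact)  # True
```

In the real system the bytes land in the configured datastore (local disk or S3) keyed by flow, run, and step, which is what makes later inspection and resumption possible.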

🧬 3. Data Lineage and Versioning

Each run of a flow (i.e., python flow.py run) gets a unique run_id. Every step inside it is versioned, and artifacts are tied to a specific step and run.

You can access past runs via the CLI or programmatically:

python flow.py show                 # structure of the flow
python flow.py logs <run-id>/train  # logs of the train step in a given run

Or from Python:

from metaflow import Flow

run = Flow('HelloFlow').latest_run
print(run['train'].task.data.model)  # Access the model artifact from the train step

This supports:

  • Reproducibility: You can rerun any past flow with the exact same data.
  • Experiment tracking: Inspect how data evolved through steps.
  • Reuse: resume reruns a flow from a failed step, reusing the artifacts of steps that already completed.

🧠 4. Artifact Types & Serialization

Metaflow serializes artifacts with pickle by default, so anything picklable can be an artifact:

  • Built-in Python objects (dicts, lists, strings, numbers)
  • Custom Python objects, as long as they pickle cleanly
  • Large data (e.g., NumPy arrays, Pandas DataFrames), stored via the configured datastore (local disk or S3)

For huge data or flaky infrastructure, you can combine artifacts with:

  • the @batch decorator to run steps on AWS Batch
  • @retry or @catch to handle transient failures

πŸ“ˆ 5. Integration with ML/DS tools

  • Artifacts can include:
    • Pandas DataFrames
    • NumPy arrays
    • PyTorch models
    • Pickled Python objects
  • You can visualize steps and results with Metaflow cards (the @card decorator)
  • Integrate with MLflow or other trackers by calling their APIs inside steps

πŸ“ 6. Datastore: Where Artifacts Live

The storage backend is configurable:

  • Local datastore (default for local runs): the .metaflow directory
  • S3 for production environments
  • Custom stores for enterprise use

You can configure via:

export METAFLOW_DEFAULT_DATASTORE=s3
export METAFLOW_DATASTORE_SYSROOT_S3=s3://your-bucket/metaflow

πŸ§ͺ 7. Artifacts in Branching & Parallel Steps

Artifacts in Metaflow are isolated per branch:

  • In a self.next(self.step1, self.step2) fan-out:
    • Each parallel branch writes its own copy of the artifacts
    • Branches converge in a join step: a regular @step whose method takes an inputs argument
    • Diverging artifacts are resolved in the join, manually or with self.merge_artifacts(inputs)
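The data flow can be pictured in plain Python (an illustration of the semantics, not Metaflow code):

```python
# Each branch carries its own artifact namespace until the join.
branch1 = {"value": 10}   # artifacts written in step1
branch2 = {"value": 20}   # artifacts written in step2

# The join step receives one input per branch, in fan-out order.
inputs = [branch1, branch2]

# Diverging artifacts are aggregated explicitly in the join.
values = [inp["value"] for inp in inputs]
print(values)  # [10, 20]
```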

πŸ”„ 8. Artifacts in Retry and Resume

If a step fails, or a run is interrupted:

  • Artifacts are preserved up to the last completed step
  • You can resume from the failure point with python flow.py resume
  • Only the failed step and the steps downstream of it are rerun; artifacts from completed steps are reused

πŸ’‘ Summary Table

Concept     Description
Flow        The entire pipeline (a Python class)
Step        A node in the pipeline (a method decorated with @step)
Artifact    Data saved via self.var and passed between steps
Run         A specific execution of the flow
Versioning  Each flow, step, and artifact is versioned for reproducibility
Datastore   Backend where artifacts are stored
Lineage     Full trace of the data and code used in a flow
Resuming    Reuse of artifacts from previous runs

merge_artifacts

🧩 Context: When Is merge_artifacts Used?

You use self.merge_artifacts(inputs) inside a join step when you're combining multiple branches, typically after a foreach fan-out (self.next(self.process, foreach='items')) or a static branch (self.next(self.step1, self.step2)).

In those scenarios:

  • inputs is a list of objects, one per incoming branch.
  • Each input has its own set of artifacts (e.g., input.result, input.score, etc.)

By default, artifacts that diverge across branches are not carried forward automatically; you collect them manually:

self.results = [inp.result for inp in inputs]

self.merge_artifacts(inputs) handles the rest: it propagates every artifact whose value is unambiguous across the branches onto the current step’s self, so you don’t have to re-assign them one by one.


πŸ§ͺ Example

from metaflow import FlowSpec, step

class MergeExampleFlow(FlowSpec):

    @step
    def start(self):
        self.next(self.branch1, self.branch2)

    @step
    def branch1(self):
        self.branch_name = 'branch1'
        self.value = 10
        self.next(self.join)

    @step
    def branch2(self):
        self.branch_name = 'branch2'
        self.value = 20
        self.next(self.join)

    @step
    def join(self, inputs):
        # merge_artifacts is a FlowSpec method, not a separate import.
        # branch_name and value differ between the branches, so resolve
        # them manually first; merge_artifacts raises on unresolved conflicts.
        self.branch_names = [inp.branch_name for inp in inputs]
        self.values = [inp.value for inp in inputs]
        self.merge_artifacts(inputs, exclude=['branch_name', 'value'])

        print(self.branch_names)  # ['branch1', 'branch2']
        print(self.values)        # [10, 20]

        self.next(self.end)

    @step
    def end(self):
        print("Done.")

if __name__ == '__main__':
    MergeExampleFlow()

πŸ” How Does merge_artifacts(self, inputs) Work?

It:

  • Scans all input steps' artifacts
  • Finds common artifact names
  • Merges them into lists (in self.<artifact>)

So for artifacts that genuinely diverge you still collect the values yourself:

self.values = [i.value for i in inputs]
self.names = [i.branch_name for i in inputs]

while everything that doesn’t conflict is carried forward with a single call:

self.merge_artifacts(inputs, exclude=['value', 'branch_name'])

βœ… Clean
βœ… Robust
βœ… Less boilerplate


⚠️ Caveats

  • Artifacts with the same name and the same value in every branch are propagated as-is.
  • Artifacts with the same name but different values are conflicts: resolve them manually before the call or pass them in exclude, otherwise Metaflow raises an error.
  • It propagates values; it does not build lists or deep-merge dicts for you.
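merge_artifacts propagates artifacts whose values agree across branches and flags conflicts. That rule can be sketched in plain Python (an illustration of the behavior, not Metaflow's implementation):

```python
def merge_sketch(target, inputs, exclude=()):
    """Propagate artifacts that are unambiguous across branches."""
    names = set().union(*(inp.keys() for inp in inputs)) - set(exclude)
    for name in names:
        values = [inp[name] for inp in inputs if name in inp]
        if all(v == values[0] for v in values):
            target[name] = values[0]  # unambiguous: propagate as-is
        else:
            # Metaflow raises a similar error for unresolved conflicts
            raise ValueError(f"conflicting artifact: {name}")

# Both branches agree on 'config'; 'value' conflicts and must be excluded.
inputs = [{"config": "a", "value": 10}, {"config": "a", "value": 20}]
joined = {}
merge_sketch(joined, inputs, exclude=("value",))
print(joined)  # {'config': 'a'}
```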

βœ… When Should You Use It?

  • After a foreach branch (e.g., grid search over parameters)
  • After a manual multi-branch fan-out (self.next(stepA, stepB, ...))
  • When you want to aggregate results from each branch into one step
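For the grid-search case, the fan-out/fan-in pattern that a foreach plus join expresses boils down to the following (plain Python, with a hypothetical train function standing in for the per-branch step):

```python
# Hypothetical per-branch work: train a model for one parameter value.
def train(lr):
    return {"lr": lr, "score": 1.0 - lr}  # dummy score for illustration

params = [0.1, 0.01, 0.001]  # the list the foreach fans out over

# One "branch" per parameter; in Metaflow each runs as a separate task.
branch_results = [train(lr) for lr in params]

# The join step aggregates across inputs, e.g. picking the best run.
best = max(branch_results, key=lambda r: r["score"])
print(best["lr"])  # 0.001
```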