AI/ML notes

Ch 3. Introducing Metaflow

The basics of Metaflow

Why was Metaflow created, and what problem does it solve?

  • Origin: Developed at Netflix in 2017 to empower data scientists to independently build, deliver, and operate complete data science applications.
  • Business need:
    • Large companies like Netflix have numerous potential use cases.
    • Rapid experimentation with minimal overhead is essential.
    • Promising experiments should easily transition to production without requiring large teams.

How does Metaflow align with the four Vs of data science infrastructure?

  • Volume:

    • Supports more applications with fewer people.
    • Reduces accidental complexity by providing a unified workflow-based approach.
  • Variety:

    • Flexible across different types of data science applications.
    • Opinionated about lower infrastructure layers (to ensure reliability), less opinionated about top layers (to support diverse use cases).
  • Velocity:

    • Streamlines prototyping and production interaction.
    • Uses idiomatic Python to enhance developer speed and comfort.
  • Validity:

    • Enforces best practices to support production-grade robustness, even without DevOps expertise.

What is Metaflow’s role from an engineering perspective?

  • Acts as a substrate for integration, not a replacement for existing tools.
  • Recognizes that most companies already use solid tools (e.g., for storage, compute, ML libraries).
  • Focuses on bridging and binding these components into cohesive data science workflows.
  • Enables integration into business systems rather than isolation on a separate platform.

How is Metaflow designed for flexibility and extensibility?

  • Plugin architecture:

    • Allows different backends for different stack layers.
    • Supports a range of operations using cloud-native primitives.
    • Compatible with major cloud providers (compute and storage abstractions).
  • Gentle adoption curve:

    • Start on a local machine in “single-player mode.”
    • Scale to cloud infrastructure incrementally as project needs grow.

Metaflow workflow

What are the key ideas for writing a basic Metaflow workflow?

  • A workflow simplifies structuring data science applications by organizing logic into sequential steps, similar to notebook cells.
  • Each workflow is defined as a directed acyclic graph (DAG) where steps (nodes) connect via directed edges.
  • Metaflow provides structure, built-in execution control, and automatic experiment tracking.

What are the seven rules for defining a Metaflow workflow?

  1. Class definition:

    • A workflow is a Python class derived from FlowSpec.
    • Class names conventionally end with Flow.
  2. Defining steps:

    • A step is a method with the @step decorator.
    • Method body can contain arbitrary Python code.
    • Optional docstrings can describe each step.
  3. Step execution:

    • Metaflow runs each step as a separate task, an atomic unit of execution.
  4. Starting step:

    • The first step must be named start.
  5. Defining step transitions:

    • Call self.next(self.step_name) as the final line in a step to define the next step.
  6. Ending step:

    • The final step must be named end, with no self.next() required.
  7. Single flow per module:

    • One flow per Python file.
    • Instantiate the flow inside if __name__ == '__main__':.

Hello world

from metaflow import FlowSpec, step

class HelloWorldFlow(FlowSpec):

    @step
    def start(self):
        """Starting point"""
        print("This is start step")
        self.next(self.hello)

    @step
    def hello(self):
        """Just saying hi"""
        print("Hello World!")
        self.next(self.end)

    @step
    def end(self):
        """Finish line"""
        print("This is end step")

if __name__ == '__main__':
    HelloWorldFlow()

How to read and understand a Metaflow workflow?

  1. Locate the start step: Marks the beginning of the flow.
  2. Trace execution using self.next(): Identify the sequence of steps.
  3. Read subsequent steps: Continue tracing until reaching the end step.

How to run and validate a Metaflow flow?

  • Save the flow in helloworld.py.

  • Validate structure without running steps:

    python helloworld.py
  • Disable linter (optional):

    python helloworld.py --no-pylint
  • Show DAG structure:

    python helloworld.py show
  • Run the flow:

    python helloworld.py run

What information is shown in the run output header?

  • Timestamp: When output was generated.
  • Pathspec: Uniquely identifies the task (run_id/step/task_id).
  • Step name: Current step being executed.
  • PID (process ID): OS-level process executing the task.
  • Log message: Output from Metaflow or user-defined code.

Why are unique identifiers important in Metaflow?

  • Enable automatic experiment tracking.
  • Help identify and compare different runs.
  • Support reproducibility by tracking exact versions and results.

How to inspect logs from a specific past run?

  • Use the logs subcommand with a valid pathspec:

    python helloworld.py logs <pathspec>
  • Example:

    python helloworld.py logs 1609557277904772/start/1
  • View available options:

    python helloworld.py logs --help
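
A pathspec such as 1609557277904772/start/1 packs three identifiers into one string. The sketch below only illustrates that structure; TaskPathspec and parse_pathspec are hypothetical helpers written for these notes, not part of Metaflow.

```python
from typing import NamedTuple


class TaskPathspec(NamedTuple):
    """Hypothetical container for the three parts of a task pathspec."""
    run_id: str
    step_name: str
    task_id: str


def parse_pathspec(pathspec: str) -> TaskPathspec:
    """Split 'run_id/step/task_id' into its three components."""
    parts = pathspec.split("/")
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"expected run_id/step/task_id, got {pathspec!r}")
    return TaskPathspec(*parts)


spec = parse_pathspec("1609557277904772/start/1")
print(spec.run_id, spec.step_name, spec.task_id)
```

Keeping the parts separate makes it easy to compare the same step across different runs.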

What built-in features does Metaflow provide?

  • Automatic conversion of Python script into a CLI tool.
  • No manual setup for command-line parsing or log management.
  • Each step runs as a separate subprocess.
  • Steps resemble notebook cells but are executed as independent OS tasks.
  • Experiment tracking is automatic, no additional setup needed.
  • The structure ensures clarity and simplifies debugging, unlike unstructured Python scripts.

What are the key questions to determine data flow in a workflow?

  1. Input access:

    • How the workflow accesses raw or source data.
  2. Internal state transfer:

    • How intermediate (transformed) data is passed between steps.
  3. Output accessibility:

    • How the final results or artifacts are made available to external systems.

How does step execution determine data flow in a Metaflow workflow?

  • Steps are executed in a directed order, from the start to the end.
  • Data flows downstream only, from earlier steps to later ones.
  • Reverse flow is not possible: a later step cannot pass data back to an earlier one.

Why does explicit data flow matter in workflows compared to notebooks?

  • Notebook behavior:

    • Allows arbitrary execution order.
    • Jupyter kernel maintains an implicit hidden state.
    • Results can become unpredictable or inconsistent, depending on execution order.
  • Workflow behavior:

    • Enforces explicit, ordered execution.
    • Each step’s output becomes input to the next step.
    • Ensures reproducibility and consistency of results.
  • Notebooks are well-suited for exploratory tasks and ad-hoc analysis.

  • For production workflows or complex pipelines, explicit data flow in a DAG structure is preferred for accuracy and maintainability.


How do workflows improve upon notebook-style development for serious applications?

  • Eliminates risk of accidental re-evaluation inconsistencies.
  • Forces clear dependencies between transformations.
  • Ensures that all outputs are produced in a deterministic manner.
  • Makes workflows easier to debug, test, and maintain.

Managing data flow

What is the challenge of managing state in distributed workflows?

  • In single-process software, state can remain in memory between steps.
  • In distributed workflows (e.g., using multiple computers or GPUs), in-memory state cannot be shared directly.
  • Therefore, state must be transferred between steps running on different machines.

How can we transfer state between workflow steps?

  • Persist the state after a step completes:
    • Save relevant data (variables, intermediate results) to storage.
  • Load the state before a new step starts:
    • Retrieve previously saved data so the step can continue processing.

What are common implementation options for persisting state?

  • Workflow frameworks often do not enforce a specific method.
  • Users choose their own storage method (e.g., databases).
  • Each step includes custom logic to:
    • Load necessary data.
    • Store output state after execution.
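
To make the boilerplate concrete, here is a minimal sketch of manual persistence in plain Python. It assumes pickle files in a temporary directory as the storage method; STATE_DIR, save_state, and load_state are hypothetical names chosen for illustration, and a real setup might use a database or object store instead.

```python
import pickle
import tempfile
from pathlib import Path

# Hypothetical storage location standing in for a database or object store.
STATE_DIR = Path(tempfile.mkdtemp())


def save_state(step_name: str, state: dict) -> None:
    """Persist a step's output state so that a later step can load it."""
    with open(STATE_DIR / f"{step_name}.pkl", "wb") as f:
        pickle.dump(state, f)


def load_state(step_name: str) -> dict:
    """Load the state persisted by an earlier step."""
    with open(STATE_DIR / f"{step_name}.pkl", "rb") as f:
        return pickle.load(f)


def start() -> None:
    save_state("start", {"count": 0})


def add() -> None:
    state = load_state("start")   # boilerplate: load inputs
    state["count"] += 1           # the actual work of the step
    save_state("add", state)      # boilerplate: store outputs


start()
add()
print(load_state("add")["count"])  # 1
```

Only one line of add() does real work; the rest is loading and storing, which is the overhead a transparent mechanism aims to remove.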

What are the downsides of manual persistence?

  • Adds boilerplate code to every step.
  • Increases cognitive load for the data scientist.
  • Leads to workarounds:
    • Merging unrelated operations into one step to avoid boilerplate.
    • Persisting only outputs strictly needed by downstream steps.
  • Workflow logic becomes harder to follow (poor readability).
  • Debugging failures in production becomes more difficult.
  • Missing historical context reduces observability and traceability.

What is a better approach to state transfer?

  • Use a transparent mechanism for state transfer:
    • Abstracts away physical separation of computing resources.
    • Minimizes boilerplate code.
  • Encourage liberal persistence of data, even if not strictly required:
    • Enhances monitoring, debugging, reproducibility, and sharing of workflows.
    • Storage cost is minimal relative to the cost of a data scientist’s time.
  • Avoid redundant input data storage (addressed in detail in Chapter 7).
  • Workflow design should prioritize logical clarity over implementation shortcuts.

Metaflow Artifacts

How does Metaflow make data flow nearly transparent to the user?

  • Automatic persistence of all instance variables (self.var) in each step.
  • These persisted instance variables are called artifacts.
  • Artifacts can be any serializable Python object (via pickle).
  • Stored in a central datastore managed by Metaflow.

What is the lifecycle of an artifact in Metaflow?

  • Step completion:
    • All instance variables are serialized and stored as immutable artifacts in the datastore.
    • Each artifact is permanently associated with its producing step via a pathspec.
  • Subsequent steps:
    • May read artifacts from previous steps.
    • May create new versions, but never overwrite previous ones.

Example: CounterFlow using persisted state

from metaflow import FlowSpec, step

class CounterFlow(FlowSpec):

    @step
    def start(self):
        self.count = 0
        self.next(self.add)

    @step
    def add(self):
        print("The count is", self.count, "before incrementing")
        self.count += 1
        self.next(self.end)

    @step
    def end(self):
        self.count += 1
        print("The final count is", self.count)

if __name__ == '__main__':
    CounterFlow()
  • self.count = 0 is initialized in start.
  • Incremented in add → persisted as a new artifact.
  • Incremented again in end.
  • Run using:
    python counter.py run

What happens under the hood when managing state?

  • Each task runs in its own process, possibly on a different computer.
  • Snapshotting occurs at the end of each step:
    • All self variables are serialized and stored.
    • Enables resuming, debugging, and cross-machine execution.

How can you inspect artifacts?

  • Use the dump command to inspect step outputs:
    python counter.py dump <run_id>/end/3
  • Use the Metaflow Client API for programmatic access (covered later in section 3.3.2).

Rule of thumb for using instance variables

  • Use self.variable for:
    • Any data that must persist across steps.
    • Any data that may be helpful for debugging.
  • Use local variables for:
    • Temporary, within-step computations.

How does Metaflow’s datastore function internally?

  • After task execution:
    • Metaflow inspects instance variables.
    • Each is serialized (currently with pickle) into a byte blob, identified by a hash of its content (e.g., ab0ef2).
  • Stored as immutable artifacts:
    • Linked to step metadata (e.g., start -> ab0ef2).
  • When accessed by another step:
    • Artifact is fetched from the datastore using metadata and flow order.
  • Datastore is content-addressed (similar to Git):
    • Artifacts are hashed by content.
    • Automatic deduplication prevents redundant storage.
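
The content-addressing idea can be sketched with a toy in-memory store. This is an illustration under simplifying assumptions (SHA-1 over pickled bytes, Python dicts instead of real storage), not Metaflow's actual datastore; ContentAddressedStore is a hypothetical class.

```python
import hashlib
import pickle


class ContentAddressedStore:
    """Toy in-memory datastore keyed by the hash of each serialized artifact."""

    def __init__(self):
        self.blobs = {}     # content hash -> serialized bytes
        self.metadata = {}  # (step, artifact name) -> content hash

    def save(self, step, name, value):
        blob = pickle.dumps(value)
        key = hashlib.sha1(blob).hexdigest()
        # Identical content maps to the same key, so it is stored only once.
        self.blobs.setdefault(key, blob)
        self.metadata[(step, name)] = key
        return key

    def load(self, step, name):
        return pickle.loads(self.blobs[self.metadata[(step, name)]])


store = ContentAddressedStore()
store.save("start", "creature", "dog")
store.save("add", "creature", "dog")  # unchanged value: no new blob
store.save("start", "count", 0)
store.save("add", "count", 1)         # new value: new blob, old one kept
print(len(store.blobs))  # 3
```

Four saves produce three blobs because the unchanged creature value is deduplicated, while both versions of count remain readable.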

Benefits of this approach

  • Minimizes user effort in managing state explicitly.
  • Enables reproducibility, observability, and debugging.
  • Efficient storage without user intervention.
  • Artifact versioning enables easy tracking and auditing of workflow runs.

More on step

What makes a good step in a Metaflow workflow?

  • Think of steps as checkpoints:
    • Artifacts are persisted at the end of each step.
    • Workflow execution can be resumed from any completed step.
    • Artifacts are available for inspection after step completion.

Why keep steps reasonably small?

  • Improves resilience:
    • Less work lost in case of failure.
  • Enhances observability:
    • You can monitor progress and intermediate state more frequently.
  • Enables fine-grained debugging:
    • Narrow down where issues occur.
  • Supports incremental development:
    • Easier to build and test workflows in parts.

Why not make steps too small?

  • Each step incurs overhead:
    • Task launching.
    • Artifact persistence.
  • Too many tiny steps may:
    • Slow down the workflow.
    • Create unnecessary complexity.
  • If overhead becomes apparent, you can merge small steps later.

How does step size affect code readability?

  • Use python flow.py show to inspect the workflow graph.
  • Large steps:
    • May hide logical structure.
    • Can make the graph less meaningful.
  • Smaller, focused steps:
    • Improve code clarity.
    • Make workflow structure more comprehensible.

Rule of thumb

  • Organize workflows into logical, explainable steps.
  • When uncertain:
    • Favor smaller steps—easier to understand and debug.
    • You can always refactor and combine them if needed.

Metaflow parameters

How do you pass data into a Metaflow flow?

  • Use Parameters to pass external values at the time of running the flow.
  • Unlike artifacts, which pass data between steps, Parameters provide data to the flow at the start.

What are Metaflow Parameters?

  • Special class-level constructs that:
    • Are immutable once the run starts.
    • Are available to all steps, including start.
    • Are treated as artifacts, so they're tracked and inspectable.
  • Parameters simplify experiment tracking by making runs reproducible and traceable.
  • All parameters are recorded like other artifacts, aiding future inspection and debugging.

How do you define a Parameter?

Use the Parameter() constructor at the class level. Four main elements are required:

  1. Instantiate Parameter at class scope.
  2. Assign it to a variable (e.g., animal, count).
  3. Specify a name shown to users (e.g., 'creature', 'count').
  4. Set a type:
    • Either via type=... (e.g., float, int)
    • Or by providing a default value (type is inferred)

Example:

from metaflow import FlowSpec, Parameter, step

class ParameterFlow(FlowSpec):

    animal = Parameter('creature', help="Specify an animal", required=True)
    count = Parameter('count', help="Number of animals", default=1)
    ratio = Parameter('ratio', help="Ratio between 0.0 and 1.0", type=float)

    @step
    def start(self):
        print(self.animal, "is a string of", len(self.animal), "characters")
        print("Count is an integer: %s+1=%s" % (self.count, self.count + 1))
        print("Ratio is a", type(self.ratio), "whose value is", self.ratio)
        self.next(self.end)

    @step
    def end(self):
        print("done!")

if __name__ == '__main__':
    ParameterFlow()

How do you run a flow with parameters?

  • Required parameters must be specified via CLI:
    python parameters.py run --creature seal
  • Optional parameters use defaults unless specified:
    python parameters.py run --creature seal --count 10 --ratio 0.3
  • Metaflow automatically converts values to specified types.

Can you change a parameter’s value in your code?

  • No: parameters are immutable.
  • To manipulate a parameter, copy it to another artifact:
    self.mutable_count = self.count
    self.mutable_count += 1

How to avoid specifying parameters repeatedly?

  • Set parameters using environment variables:
    export METAFLOW_RUN_CREATURE=dinosaur
    python parameters.py run --ratio 0.25
  • CLI values override environment variables:
    python parameters.py run --creature otter --count 10 --ratio 0.3
  • Here creature is set to "otter" instead of "dinosaur".
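
The precedence described above (CLI value first, then the METAFLOW_RUN_ environment variable, then the default) can be sketched as follows. resolve_parameter is a hypothetical helper that mimics the behavior from the outside; it is not how Metaflow implements it.

```python
import os
from typing import Optional


def resolve_parameter(name: str,
                      cli_value: Optional[str],
                      default: Optional[str] = None) -> Optional[str]:
    """Pick a value: CLI option first, then METAFLOW_RUN_<NAME>, then the default."""
    if cli_value is not None:
        return cli_value
    env_value = os.environ.get(f"METAFLOW_RUN_{name.upper()}")
    if env_value is not None:
        return env_value
    return default


os.environ["METAFLOW_RUN_CREATURE"] = "dinosaur"
print(resolve_parameter("creature", cli_value=None))     # dinosaur
print(resolve_parameter("creature", cli_value="otter"))  # otter
```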

Complex parameters

How do you define complex parameters in Metaflow?

  • Basic parameters support scalar types: str, int, float, bool.
  • For more complex structures (e.g., lists, dicts), use JSON-encoded parameters.

How do JSON-encoded parameters work?

  • JSON values are passed as strings on the command line.
  • Use the JSONType type from Metaflow for decoding.
from metaflow import FlowSpec, Parameter, step, JSONType

class JSONParameterFlow(FlowSpec):
    mapping = Parameter('mapping',
                        help="Specify a mapping",
                        default='{"some": "default"}',
                        type=JSONType)

    @step
    def start(self):
        for key, value in self.mapping.items():
            print('key', key, 'value', value)
        self.next(self.end)

    @step
    def end(self):
        print('done!')

if __name__ == '__main__':
    JSONParameterFlow()
  • Run with:

    python json_parameter.py run --mapping '{"mykey": "myvalue"}'
  • Use single quotes to prevent shell issues with special characters.
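
As a rough picture of the decoding step, the standard json module turns the command-line string into a Python dict. This is a stand-in for what a JSON-typed parameter hands to the flow, not Metaflow's own code.

```python
import json

raw = '{"mykey": "myvalue"}'  # the string as it arrives from the command line
mapping = json.loads(raw)     # decoded into a regular Python dict

for key, value in mapping.items():
    print("key", key, "value", value)
```

After decoding, the step can treat the value like any other dict, which is why the flow above can call self.mapping.items() directly.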


How do you handle large JSON parameters?

  • Store JSON in a file (e.g., myconfig.json):

    echo '{"largekey": "largevalue"}' > myconfig.json
  • Pass using shell substitution:

    python json_parameter.py run --mapping "$(cat myconfig.json)"

How do you include actual files as parameters?

  • Use IncludeFile for small/medium-size files (e.g., CSV, JSON).
  • Artifacts created via IncludeFile are snapshotted and versioned.
from metaflow import FlowSpec, Parameter, step, IncludeFile
from io import StringIO
import csv

class CSVFileFlow(FlowSpec):
    data = IncludeFile('csv',
                       help="CSV file to be parsed",
                       is_text=True)
    delimiter = Parameter('delimiter',
                          help="delimiter",
                          default=',')

    @step
    def start(self):
        fileobj = StringIO(self.data)
        for i, row in enumerate(csv.reader(fileobj, delimiter=self.delimiter)):
            print("row %d: %s" % (i, row))
        self.next(self.end)

    @step
    def end(self):
        print('done!')

if __name__ == '__main__':
    CSVFileFlow()
  • Create a test CSV:

    printf 'first,second,third\na,b,c\n' > test.csv
  • Run with:

    python csv_file.py run --csv test.csv

Why use IncludeFile instead of reading files directly?

  • Files passed via IncludeFile become immutable artifacts:
    • Versioned alongside the flow run.
    • Available even if the original file is deleted or changed later.
  • Supports reproducibility and traceability of input data.

Miscellaneous Notes

  • Use JSONType for structured config-like inputs.
  • Use IncludeFile for small to medium-sized datasets and other file-based inputs.
  • IncludeFile is not meant for very large files—chapter 7 covers scalable data handling.

Branches

What is the role of branches in Metaflow workflows?

  • Branches allow concurrent execution of steps.
  • Enables efficient use of resources like multicore CPUs or distributed clusters.
  • Provides a simple abstraction of parallelism, easier than low-level concurrency models like multithreading.

When should you use branches?

  • Use branches when steps can run independently based on data flow:
    • If step A produces data needed by both B and C, but B and C don’t need each other’s output → branch B and C from A.
    • Join their outputs later in a common step D if needed.
  • Example DAG:
    • A (fetch data)
    • B (train model 1) & C (train model 2) — run in parallel
    • D (compare and choose best model)

What are the benefits of branching?

  • Performance: Steps B and C execute in parallel.
  • Clarity: Reflects the true data flow and step dependencies.
  • Maintainability: Easier to understand and reason about.

Rule of thumb

When two or more steps can be executed independently, define them as parallel branches.


Why not automate parallelization?

  • Automatic parallelism is hard with arbitrary Python:
    • Python code often lacks enough structure for safe auto-parallelization.
    • Steps may use external libraries or services.
  • Manual control leads to better clarity and fewer errors.
  • Workflows are also communication tools, meant to be human-readable.

What are valid DAG structures in Metaflow?

  • Split step: A step that initiates branching (e.g., A).
  • Join step: A step that merges branches back together (e.g., D).
  • Every split must have a corresponding join.

Rules for DAG validity:

  1. Every split must be matched by a join (like matching parentheses).
  2. A join step can only join steps that share a common split parent.
  • Invalid: Joining steps that came from different splits.
  • Valid: All joined branches must originate from the same split step.


Why are these constraints necessary?

  • Metaflow’s DAG constraints ensure clean, reproducible parallel workflows.
  • Branching is a core feature to speed up processing and clarify structure.
  • The constraints prevent ambiguous artifact lineage.
  • Keeps DAGs clear and readable:
    • In a valid DAG, no arrows (edges) have to cross.
  • Simplifies tracking and debugging data flow across steps.

Static branch

What is a static DAG in Metaflow?

  • A static DAG has a fully defined structure before execution begins.
  • All branches and joins are declared explicitly using self.next(...).

How is branching defined in a static DAG?

  • Split step: Use self.next(step1, step2, ...) to define parallel branches.
  • Join step: Add an extra inputs parameter to access upstream branches.

Why do we need to merge artifacts after a branch?

  • Branches create divergent data flows—artifacts may have different values in each branch.
  • Join steps act as barriers: downstream steps can't access any upstream artifacts unless explicitly reassigned or merged.
  • Even unchanged artifacts (like creature) must be explicitly reassigned in the join step to make them available downstream.

Example: CounterBranchFlow with static branches and manual merging

from metaflow import FlowSpec, step

class CounterBranchFlow(FlowSpec):

    @step
    def start(self):
        self.creature = "dog"
        self.count = 0
        self.next(self.add_one, self.add_two)

    @step
    def add_one(self):
        self.count += 1
        self.next(self.join)

    @step
    def add_two(self):
        self.count += 2
        self.next(self.join)

    @step
    def join(self, inputs):
        self.count = max(inp.count for inp in inputs)
        print("count from add_one", inputs.add_one.count)
        print("count from add_two", inputs.add_two.count)
        self.creature = inputs[0].creature  # manual reassignment
        self.next(self.end)

    @step
    def end(self):
        print("The creature is", self.creature)
        print("The final count is", self.count)

if __name__ == '__main__':
    CounterBranchFlow()
  • Run with:

    python counter_branch.py run
  • Comment out the self.creature reassignment in join to see the flow fail due to a missing artifact.

  • You can access branches in the join step via:

    • inputs[0] (first branch)
    • inputs.branch_name
    • for inp in inputs (for aggregation with sum, max, etc.)
  • Metaflow executes branches in parallel processes, enabling true concurrency on multicore systems.


How does merge_artifacts() help?

  • Automatically reassigns non-divergent artifacts.
  • Replaces boilerplate like self.creature = inputs[0].creature.
  • Must be called after manually merging divergent artifacts like count.
    self.merge_artifacts(inputs)

How to exclude specific artifacts from merging?

  • Use the exclude option for artifacts you don’t want to propagate downstream:
    self.merge_artifacts(inputs, exclude=['increment'])
  • Useful for internal or debug-only variables.
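
The merging rules above can be modeled with plain dicts. The sketch below is a simplified illustration, assuming artifacts are compared by value; merge_branch_artifacts is a hypothetical function, not the Metaflow method, and the real implementation may differ in details.

```python
def merge_branch_artifacts(current, branches, exclude=()):
    """Toy model of merging: copy artifacts whose value agrees across branches.

    current:  artifacts already set in the join step (e.g., a manually merged count)
    branches: one dict of artifacts per incoming branch
    exclude:  names that should not be propagated downstream
    """
    merged = dict(current)
    names = set().union(*(b.keys() for b in branches))
    for name in sorted(names):
        if name in merged or name in exclude:
            continue  # already merged by hand, or deliberately dropped
        values = [b[name] for b in branches if name in b]
        if any(v != values[0] for v in values[1:]):
            raise ValueError(f"artifact {name!r} diverged; merge it manually or exclude it")
        merged[name] = values[0]
    return merged


add_one = {"creature": "dog", "count": 1, "increment": 1}
add_two = {"creature": "dog", "count": 2, "increment": 2}
joined = merge_branch_artifacts({"count": 2}, [add_one, add_two], exclude=["increment"])
print(joined)  # {'count': 2, 'creature': 'dog'}
```

This also shows why the manual merge of count has to come first: without it, count would be detected as divergent and the merge would fail.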

Example:

from metaflow import FlowSpec, step

class CounterBranchHelperFlow(FlowSpec):

    @step
    def start(self):
        self.creature = "dog"
        self.count = 0
        self.next(self.add_one, self.add_two)

    @step
    def add_one(self):
        self.increment = 1
        self.count += self.increment
        self.next(self.join)

    @step
    def add_two(self):
        self.increment = 2
        self.count += self.increment
        self.next(self.join)

    @step
    def join(self, inputs):
        self.count = max(inp.count for inp in inputs)
        print("count from add_one", inputs.add_one.count)
        print("count from add_two", inputs.add_two.count)
        # count is merged manually above; propagate the rest, but drop 'increment'
        self.merge_artifacts(inputs, exclude=['increment'])
        self.next(self.end)

    @step
    def end(self):
        print("The creature is", self.creature)
        print("The final count", self.count)

if __name__ == '__main__':
    CounterBranchHelperFlow()
  • Run with:
    python counter_branch_helper.py run


What happens if you don’t merge artifacts properly?

  • Metaflow raises an error if:
    • merge_artifacts() finds a divergent artifact that was neither merged manually beforehand nor excluded.
    • A downstream step accesses an artifact that was not carried through the join step.

Summary of branching and merging in Metaflow:

  • Use self.next(...) for static branching.
  • Join step must handle all artifacts—either by:
    • Manual reassignment
    • merge_artifacts(), with or without exclude
  • Branching enables parallelism, but merging ensures correct and consistent data flow.

Dynamic branch

What is dynamic branching in Metaflow?

  • Dynamic branches are created at runtime based on data values.
  • Implemented using the foreach construct.
  • Commonly used for data parallelism:
    • Running the same step multiple times with different data (e.g., training models per country).
    • This differs from static branches, where each branch runs different code.

When should you use dynamic branches?

  • When the same operation needs to be applied to each item in a list.
  • Typical use cases:
    • Parallel training, scoring, evaluation
    • Hyperparameter tuning
    • Batch data processing

Syntax for a foreach split

  • Use self.next(step_name, foreach='list_artifact')
  • The referenced artifact must be a Python list
from metaflow import FlowSpec, step

class ForeachFlow(FlowSpec):

    @step
    def start(self):
        self.creatures = ['bird', 'mouse', 'dog']
        self.next(self.analyze_creatures, foreach='creatures')

    @step
    def analyze_creatures(self):
        print("Analyzing", self.input)
        self.creature = self.input
        self.score = len(self.creature)
        self.next(self.join)

    @step
    def join(self, inputs):
        self.best = max(inputs, key=lambda x: x.score).creature
        self.next(self.end)

    @step
    def end(self):
        print(self.best, 'won!')

if __name__ == '__main__':
    ForeachFlow()
  • Run the flow:
    python foreach.py run

Key concepts for foreach steps

  • self.input is available inside the foreach step:
    • It contains one item from the input list.
    • Must be copied to another artifact if you need to access it later.
  • All iterations of the foreach step run in parallel.

Merging after foreach

  • join step works the same as in static branching:
    • Receives inputs, a list of branches.
    • You can use aggregation patterns, e.g., max(), sum(), etc.
  • Use merge_artifacts(inputs) to merge non-divergent values.

Parallelism patterns and foreach

  • The foreach pattern maps to common parallel computing models:
    • MapReduce
    • Bulk Synchronous Parallel
    • Fork-Join
    • Parallel Map
  • It matches how tools like TensorFlow or PyTorch optimize computation at a lower level.
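
The correspondence with a parallel map followed by a reduce can be sketched with the standard library. This is an analogy only: it uses threads in one process, whereas Metaflow runs foreach tasks as separate processes, and analyze is a hypothetical stand-in for a foreach step.

```python
from concurrent.futures import ThreadPoolExecutor

creatures = ["bird", "mouse", "dog"]


def analyze(creature):
    """The same code runs once per item, like a foreach step."""
    return {"creature": creature, "score": len(creature)}


# "foreach": apply the same function to every item concurrently.
with ThreadPoolExecutor() as pool:
    results = list(pool.map(analyze, creatures))

# "join": reduce the per-item results to a single value.
best = max(results, key=lambda r: r["score"])["creature"]
print(best, "won!")  # mouse won!
```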

Philosophy of Metaflow

  • Metaflow focuses on human-centric application structure.
  • Delegates machine-level optimization (like matrix parallelism) to ML libraries.
  • Encourages defining clear workflows with data parallelism handled through foreach.

Miscellaneous Notes

  • You can see unique task identifiers in logs like analyze_creatures/2.
  • These help track individual branches and their outputs.
  • Dynamic branching is ideal for scalable, repeatable operations on datasets.

Control concurrency

How does Metaflow control concurrency in foreach branches?

  • foreach can create thousands of tasks from a list dynamically.
  • By default, Metaflow limits the number of parallel splits and concurrent tasks to protect your system.

What are the two key concurrency control options?

1. --max-num-splits

  • Controls the maximum number of tasks a foreach can spawn.
  • Prevents accidentally spawning massive numbers of tasks.
  • Default limit: 100
  • Override with:
    python wide_foreach.py run --max-num-splits 10000
  • Or set an environment variable:
    export METAFLOW_RUN_MAX_NUM_SPLITS=100000

2. --max-workers

  • Controls how many tasks run concurrently (processes).

  • Default limit: 16

  • Can be set lower (e.g., to save memory) or higher (if your system allows):

    python wide_foreach.py run --max-num-splits 1000 --max-workers 8

Example: WideForeachFlow with 1,000 tasks

from metaflow import FlowSpec, step

class WideForeachFlow(FlowSpec):

    @step
    def start(self):
        self.ints = list(range(1000))
        self.next(self.multiply, foreach='ints')

    @step
    def multiply(self):
        self.result = self.input * 1000
        self.next(self.join)

    @step
    def join(self, inputs):
        self.total = sum(inp.result for inp in inputs)
        self.next(self.end)

    @step
    def end(self):
        print('Total sum is', self.total)

if __name__ == '__main__':
    WideForeachFlow()
  • Run (will fail without override):
    python wide_foreach.py run
  • Fix with:
    python wide_foreach.py run --max-num-splits 1000

How does Metaflow execute these tasks?

  • Uses a local scheduler:
    • Converts workflow steps to tasks (processes).
    • Manages execution order and concurrency.
    • Respects --max-workers as the concurrency cap.
  • Tasks are queued if they exceed --max-workers.

  • If a foreach splits into 8 tasks:

    • --max-num-splits=8 allows it.
    • --max-workers=2 means only 2 tasks run at once.
    • With tasks of similar duration, the 8 tasks run in about 4 rounds of 2.
  • These limits do not affect workflow logic, only execution.

  • Helps safely test large-scale workflows locally.

  • Metaflow’s scheduler handles resource management automatically during runs.
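
The effect of a worker cap can be observed with a small standard-library experiment. The sketch below is an analogy using a thread pool, not Metaflow's local scheduler; MAX_WORKERS, stats, and task are names invented for the example.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

MAX_WORKERS = 2                      # plays the role of --max-workers
stats = {"running": 0, "peak": 0}    # tracks how many tasks run at once
lock = threading.Lock()


def task(i):
    with lock:
        stats["running"] += 1
        stats["peak"] = max(stats["peak"], stats["running"])
    time.sleep(0.01)                 # stand-in for real work
    with lock:
        stats["running"] -= 1
    return i * 1000


# Eight tasks are submitted, but only MAX_WORKERS of them run at any time.
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
    results = list(pool.map(task, range(8)))

print("peak concurrency:", stats["peak"])
print("total:", sum(results))  # 28000
```

The total is the same whatever the cap is set to, which mirrors the point that these limits change execution, not workflow logic.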

Practical tips

  • Set --max-workers to match or stay below your CPU core count.
  • Lower it further if tasks use significant memory or GPU resources.
  • Use time to benchmark performance under different settings:
    time python wide_foreach.py run --max-num-splits 1000 --max-workers 8
    time python wide_foreach.py run --max-num-splits 1000 --max-workers 64

Starting a new project

How can you approach a new data science project using workflows?

  • Use the spiral development recipe: build workflows iteratively, from the outer structure toward the core logic.
  • Helps overcome blank-file paralysis and keeps the focus on key elements.

Spiral development path

  1. Define the business problem you are solving.
  2. Identify the input data and how to read it.
  3. Define the desired outputs and how to store or deliver them.
  4. Choose initial modeling techniques to turn input into output.
  • This approach ensures the problem, data, and business context guide the solution.

Why this method works well

  1. Clarifies objectives early to avoid building the wrong thing.
  2. Data prep is often underestimated—starting early saves time.
  3. Output integration can be complex—discovering requirements early helps.
  4. Start simple—a working baseline allows measurement and iterative improvement.

Infrastructure requirement

  • Should support iterative development and easy experimentation.
  • Metaflow is built to enable this exact pattern.

Example: Starting a new classification project

  • Business problem: Classify wine types using Scikit-Learn’s wine dataset.
  • Start with a minimal flow that loads and splits the data.

Code: classifier_train_v1.py

  • This skeleton serves as a reusable template for future classification tasks with different datasets.
  • Focus on getting a minimal version running before expanding functionality.
from metaflow import FlowSpec, step

class ClassifierTrainFlow(FlowSpec):

    @step
    def start(self):
        from sklearn import datasets
        from sklearn.model_selection import train_test_split

        X, y = datasets.load_wine(return_X_y=True)
        self.train_data, \
        self.test_data, \
        self.train_labels, \
        self.test_labels = train_test_split(X, y, test_size=0.2, random_state=0)

        print("Data loaded successfully")
        self.next(self.end)

    @step
    def end(self):
        self.model = 'nothingburger'
        print('done')

if __name__ == '__main__':
    ClassifierTrainFlow()
  • Save to: classifier_train_v1.py

  • Install Scikit-Learn (if not installed):

    pip install scikit-learn
  • Run the flow:

    python classifier_train_v1.py run

How to inspect artifacts to confirm data was loaded

  • Use dump with a real pathspec from your run:
    python classifier_train_v1.py dump <your_run_id>/start/1
  • Output confirms artifacts exist (though large arrays may not display inline).
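
To know what the artifacts should contain, the same split can be reproduced outside the flow. The sketch below repeats the calls from the start step in plain Python (no Metaflow involved) and prints the array shapes; the wine dataset has 178 samples with 13 features, so a 20% test split yields 36 test rows and 142 training rows.

```python
from sklearn import datasets
from sklearn.model_selection import train_test_split

# Same calls and arguments as in the start step of ClassifierTrainFlow.
X, y = datasets.load_wine(return_X_y=True)
train_data, test_data, train_labels, test_labels = train_test_split(
    X, y, test_size=0.2, random_state=0)

print(train_data.shape, test_data.shape)      # (142, 13) (36, 13)
print(train_labels.shape, test_labels.shape)  # (142,) (36,)
```

If the artifacts of a run show different shapes, the data loading step is the first place to look.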

Notes

  • In Metaflow, it's good practice to place import statements inside the steps where they are used.
    • Reduces unnecessary imports and improves modularity.
  • The current flow loads and splits the data; further steps (e.g., model training, evaluation) will be added incrementally.

Accessing results with the Client API

How do you access results programmatically in Metaflow?

  • Use the Client API to explore and access artifacts from past runs.
  • Artifacts are read-only and tied to steps, tasks, and runs.
  • Helpful for:
    • Debugging
    • Reusing outputs
    • Chaining workflows

Client API container hierarchy

Each layer nests within the previous:

  1. Metaflow: All flows in the environment.
  2. Flow: All runs of a specific flow.
  3. Run: All steps executed during a single run.
  4. Step: All tasks in a step (one or many for foreach).
  5. Task: Holds artifacts and logs.
  6. Data artifact: A specific variable stored in a task.
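
The hierarchy maps onto slash-separated pathspecs such as ClassifierTrainFlow/1611541088765447/start/1. The sketch below only illustrates that naming scheme: parse_pathspec is a hypothetical helper written for these notes, not a Metaflow function.

```python
# One name per level of the hierarchy (the top-level Metaflow container has none).
LEVELS = ("flow", "run", "step", "task", "artifact")


def parse_pathspec(pathspec):
    """Split a pathspec into a dict keyed by hierarchy level."""
    parts = pathspec.strip("/").split("/")
    if not 1 <= len(parts) <= len(LEVELS) or not all(parts):
        raise ValueError("unexpected pathspec: %r" % pathspec)
    return dict(zip(LEVELS, parts))


print(parse_pathspec("ClassifierTrainFlow/1611541088765447"))
print(parse_pathspec("ClassifierTrainFlow/1611541088765447/start/1"))
```

A shorter pathspec addresses a container higher up (a run), a longer one an object further down (a task).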

Accessing artifacts via pathspec or shortcuts

  • Three ways to navigate:

    # 1. Direct pathspec
    from metaflow import Run

    run = Run("ClassifierTrainFlow/1611541088765447")
    run['start'].task.data.train_data  # access step and artifact

    # 2. Shortcut to the latest run
    from metaflow import Flow

    run = Flow("ClassifierTrainFlow").latest_run
    train_data = run['start'].task.data.train_data

    # 3. Iterate over all steps of a run
    for step in run:
        print(step.id)

  • Open Jupyter in your flow directory:

    jupyter-notebook
  • In a notebook cell:

    from metaflow import Flow
    
    run = Flow("ClassifierTrainFlow").latest_run
    run['start'].task.data.train_data  # inspect artifact
  • Try inspecting:

    • train_labels
    • test_data
    • model

Exercise ideas

  • Run the training flow again → .latest_run will change.
  • Use older runs by iterating over Flow("ClassifierTrainFlow").runs().
  • Use help() to inspect object properties:
    help(run)

Reusing artifacts in another flow

  • Use the Client API to access outputs from a different flow.
  • Example: Create a prediction flow that reuses the trained model from ClassifierTrainFlow.
from metaflow import FlowSpec, step, Flow, Parameter, JSONType

class ClassifierPredictFlow(FlowSpec):

    vector = Parameter('vector', type=JSONType, required=True)

    @step
    def start(self):
        run = Flow('ClassifierTrainFlow').latest_run
        self.train_run_id = run.pathspec
        self.model = run['end'].task.data.model
        print("Input vector", self.vector)
        self.next(self.end)

    @step
    def end(self):
        print("Model", self.model)

if __name__ == '__main__':
    ClassifierPredictFlow()
  • Save to: classifier_predict_v1.py

  • Run with a test vector:

    python classifier_predict_v1.py run --vector '[1,2]'
  • Output should print:

    Model nothingburger
    

Notes

  • self.train_run_id = run.pathspec captures the training run’s pathspec (flow name plus run ID).
  • Useful for:
    • Traceability
    • Debugging surprising predictions
    • Auditing production results
  • The Client API works across notebooks, scripts, and interactive shells.
  • It’s ideal for chaining flows and building multi-stage pipelines without coupling training and inference.

Debugging failures

Common situation: failure during prototyping

  • In ClassifierTrainFlow, we train two models in parallel: KNN and SVM.

  • train_svm fails due to an invalid kernel name:

    svm.SVC(kernel='polynomial')  # ❌ should be 'poly'
  • Expected error (the exact wording varies across Scikit-Learn versions):

    ValueError: 'polynomial' is not in list
    

Step-by-step debugging guide

1. Find the error message

  • When running the flow:

    • Error trace is printed with the failing step name (e.g., train_svm).
    • If many steps/logs are printed, use the logs command:
      python classifier_train.py logs <run_id>/train_svm/1
  • Or use the Client API to find and inspect failed tasks:

    from metaflow import Flow
    
    for step in Flow("ClassifierTrainFlow").latest_run:
        for task in step:
            if not task.successful:
                print("Task %s failed:" % task.pathspec)
                print("-- Stdout --")
                print(task.stdout)
                print("-- Stderr --")
                print(task.stderr)
  • Optional: add filters like if 'svm' in task.stderr: for targeted debugging.


2. Understand why the code failed

  • Investigate the error using:

    • Documentation

    • Google

    • Notebook inspection of artifacts leading up to the failure:

      from metaflow import Flow
      
      run = Flow("ClassifierTrainFlow").latest_run
      run['train_svm'].task.data.train_data  # or any other relevant artifact
  • Optional: Use IDE debuggers (e.g., in VS Code or PyCharm) with extra config for subprocesses.


3. Test and resume execution

  • Fix the line in train_svm:

    self.model = svm.SVC(kernel='poly')  # ✅ correct argument
  • Instead of restarting the full run, use the resume command:

    python classifier_train.py resume
  • This resumes from the failed step and reuses successful results (e.g., start and train_knn).


Additional resume options

  • Resume from a specific step (and all following steps):

    python classifier_train.py resume train_knn
  • Resume from a specific past run:

    python classifier_train.py resume --origin-run-id 1611609148294496 train_knn
  • Parameters cannot be changed during resume (to ensure consistency with reused artifacts).

Note:

  • Resumed runs receive their own unique Run ID.
  • They are fully tracked and accessible via the Client API.
  • Steps that are not rerun keep the results of the origin run; only the rerun steps produce new artifacts.
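
The reuse rule can be pictured with a toy model of the flow graph. This is a sketch of the idea only, not Metaflow's implementation: plan_resume and downstream are hypothetical helpers, and the graph is written out by hand for the KNN/SVM flow.

```python
GRAPH = {
    "start": ["train_knn", "train_svm"],
    "train_knn": ["choose_model"],
    "train_svm": ["choose_model"],
    "choose_model": ["end"],
    "end": [],
}


def downstream(graph, step):
    """Return the given step plus every step reachable from it."""
    seen, stack = set(), [step]
    while stack:
        current = stack.pop()
        if current not in seen:
            seen.add(current)
            stack.extend(graph[current])
    return seen


def plan_resume(graph, succeeded, resume_from):
    """Split the steps into (reused, rerun) for a resume at resume_from."""
    # Rerun the chosen step, everything after it, and anything that never succeeded.
    rerun = downstream(graph, resume_from) | {s for s in graph if s not in succeeded}
    reused = [s for s in graph if s not in rerun]
    return reused, [s for s in graph if s in rerun]


reused, rerun = plan_resume(GRAPH, {"start", "train_knn"}, "train_svm")
print("reused:", reused)  # reused: ['start', 'train_knn']
print("rerun:", rerun)    # rerun: ['train_svm', 'choose_model', 'end']
```

Resuming at train_knn instead would leave only start to be reused, which is why resuming at the failed step is usually the cheapest option.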

Final flow after the fix

from metaflow import FlowSpec, step

class ClassifierTrainFlow(FlowSpec):

    @step
    def start(self):
        from sklearn import datasets
        from sklearn.model_selection import train_test_split

        X, y = datasets.load_wine(return_X_y=True)
        self.train_data, self.test_data, self.train_labels, self.test_labels = \
            train_test_split(X, y, test_size=0.2, random_state=0)

        self.next(self.train_knn, self.train_svm)

    @step
    def train_knn(self):
        from sklearn.neighbors import KNeighborsClassifier

        self.model = KNeighborsClassifier()
        self.model.fit(self.train_data, self.train_labels)
        self.next(self.choose_model)

    @step
    def train_svm(self):
        from sklearn import svm

        self.model = svm.SVC(kernel='poly')  # ✅ fixed
        self.model.fit(self.train_data, self.train_labels)
        self.next(self.choose_model)

    @step
    def choose_model(self, inputs):
        def score(inp):
            return inp.model, inp.model.score(inp.test_data, inp.test_labels)

        self.results = sorted(map(score, inputs), key=lambda x: -x[1])
        self.model = self.results[0][0]
        self.next(self.end)

    @step
    def end(self):
        print("Scores:")
        print("\n".join("%s %f" % res for res in self.results))

if __name__ == '__main__':
    ClassifierTrainFlow()
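
The ranking logic in choose_model can be tried in isolation. The sketch below reuses the score function and the sorted call from the flow, but feeds them hypothetical stand-ins: FixedScoreModel and the accuracy values 0.72 and 0.81 are invented for illustration and are not results from the wine dataset.

```python
from types import SimpleNamespace


class FixedScoreModel:
    """Hypothetical stand-in for a fitted classifier with a known accuracy."""

    def __init__(self, name, accuracy):
        self.name = name
        self.accuracy = accuracy

    def score(self, data, labels):
        return self.accuracy

    def __repr__(self):
        return self.name


# Each element mimics one incoming branch of the join step.
inputs = [
    SimpleNamespace(model=FixedScoreModel("knn", 0.72), test_data=None, test_labels=None),
    SimpleNamespace(model=FixedScoreModel("svm", 0.81), test_data=None, test_labels=None),
]


def score(inp):
    return inp.model, inp.model.score(inp.test_data, inp.test_labels)


# Sort by descending score; the first entry is the winner.
results = sorted(map(score, inputs), key=lambda x: -x[1])
best = results[0][0]
print("\n".join("%s %f" % res for res in results))
```

Because the negated score is the sort key, the best model always ends up at index 0, which is what the flow stores in self.model.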

What are the final steps to complete your Metaflow-based ML application?

With the training flow (ClassifierTrainFlow) now producing a valid model, the prediction flow (ClassifierPredictFlow) can be finalized with just one additional line.


Update to ClassifierPredictFlow

Add this line to the end step to make a prediction:

print("Predicted class", self.model.predict([self.vector])[0])

Final version of the flow:

from metaflow import FlowSpec, step, Flow, Parameter, JSONType

class ClassifierPredictFlow(FlowSpec):

    vector = Parameter('vector', type=JSONType, required=True)

    @step
    def start(self):
        run = Flow('ClassifierTrainFlow').latest_run
        self.train_run_id = run.pathspec
        self.model = run['end'].task.data.model
        print("Input vector", self.vector)
        self.next(self.end)

    @step
    def end(self):
        print("Model", self.model)
        print("Predicted class", self.model.predict([self.vector])[0])

if __name__ == '__main__':
    ClassifierPredictFlow()

Test the prediction

Run the flow with a valid vector (13 attributes from the wine dataset):

python classifier_predict.py run --vector \
'[14.3,1.92,2.72,20.0,120.0,2.8,3.14,0.33,1.97,6.2,1.07,2.65,1280.0]'
  • Expected output:
    Predicted class 0
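
It helps to see what shape the input takes before it reaches predict. The sketch below assumes that a JSONType parameter is decoded the way json.loads would decode it; parse_vector is a hypothetical helper that adds a length check the flow itself does not perform.

```python
import json


def parse_vector(raw, n_features=13):
    """Decode a JSON list and check that it has one value per feature."""
    vector = json.loads(raw)
    if not isinstance(vector, list) or len(vector) != n_features:
        raise ValueError("expected a JSON list of %d numbers" % n_features)
    return [float(v) for v in vector]


raw = '[14.3,1.92,2.72,20.0,120.0,2.8,3.14,0.33,1.97,6.2,1.07,2.65,1280.0]'
vector = parse_vector(raw)

# predict() expects a 2D input (rows of samples), hence the extra brackets
# in self.model.predict([self.vector]); [0] then picks the single prediction.
batch = [vector]
print(len(batch), len(batch[0]))  # 1 13
```

A vector of the wrong length, such as the earlier test input '[1,2]', would be rejected here instead of failing inside the model.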
    

What if the predictions seem incorrect?

  • Use the Client API and Scikit-Learn’s model diagnostics:
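    • Use cross_val_score() or a confusion matrix to check accuracy; .coef_ and .feature_importances_ apply only to models that expose them (for example, linear models and tree ensembles), not to KNN or a poly-kernel SVC.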
    • Load model artifacts in a notebook for exploration.
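
A cross-validation check might look like the following in a notebook. This sketch trains a fresh KNeighborsClassifier on the full wine dataset rather than loading the stored model, so it estimates how well this type of model suits the data; the choice of 5 folds is an assumption, not something the flow prescribes.

```python
from sklearn import datasets
from sklearn.model_selection import cross_val_score
from sklearn.neighbors import KNeighborsClassifier

X, y = datasets.load_wine(return_X_y=True)

# One accuracy value per fold; a wide spread hints at an unstable model.
scores = cross_val_score(KNeighborsClassifier(), X, y, cv=5)
print("fold accuracies:", scores)
print("mean accuracy: %.3f" % scores.mean())
```

Comparing these numbers with the scores printed by the end step shows whether a single train/test split gave a misleading picture.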

Summary of the final architecture

  1. Input step:

    • Loads and splits the dataset.
    • Stores training/testing data as artifacts.
  2. Parallel model training:

    • Trains KNN and SVM in parallel steps.
  3. Model selection:

    • Evaluates both models using test data.
    • Selects and stores the best model.
  4. Artifact inspection:

    • All intermediate and final data stored as artifacts.
    • Accessible in notebooks via the Client API.
  5. Prediction flow:

    • Loads the latest model and classifies new data using the same interface.

Real-world scalability preview

This architecture scales to production scenarios. For example:

  • Replace --vector with a CSV file (IncludeFile).
  • Handle larger data inputs, multiple models, and long training times (e.g., with foreach and parallel compute).
  • Use versioning, automation, and scheduling (covered in future chapters).

Questions answered in future chapters

  • How to handle large-scale datasets (TBs).
  • How to scale up model training (e.g., 2,000 models).
  • How to organize long flows across multiple files.
  • How to schedule flows for production.
  • How to isolate runs and use different dependencies per run.