AI/ML notes

Ch 7. Processing data

Introduction

What foundational issues should workflows address when handling data?

Workflows must address how to consume and produce data efficiently and reliably. Key questions include:

1. What types of data sources might a workflow interact with?

  • Prototyping: local files (e.g., CSVs via IncludeFile)
  • Small/medium companies: relational databases (e.g., PostgreSQL)
  • Larger systems:
    • Federated query engines: Trino (Presto)
    • Data lakes: cloud-based storage with tools like:
      • Apache Flink (streaming)
      • Apache Iceberg (metadata handling)
      • Apache Spark (batch processing)

2. What data types and modalities should be considered?

  • Structured data: tables, columns, SQL queries
  • Unstructured data: free text, images, audio
  • Semi-structured data: JSON, logs, event streams (partially structured)

3. What are the core concerns for data access?

  • Performance:

    • Fast, efficient loading of large datasets
    • Avoid bottlenecks that slow down prototyping
  • Data Selection:

    • Find relevant subsets efficiently (typically via SQL)
    • Support for structured and semi-structured data
  • Feature Engineering:

    • Transform raw data into model-friendly formats
    • Preprocessing, encoding, aggregations, etc.

4. How often should the application react to new data?

  • Batch processing:

    • Triggered at intervals (e.g., every 15 minutes)
    • Simpler, easier to manage
  • Streaming:

    • Real-time, continuous updates
    • More complex infrastructure

5. Who owns the data access logic?

  • Varies by organization, but typical roles include:
    • Data engineers: ensure raw data is available and clean
    • Data scientists: explore and model the data

Why does this matter?

Efficient data access and transformation patterns directly impact:

  • Speed of iteration (prototyping loop)
  • Workflow reliability and scalability
  • Collaboration between engineering and data science teams

Although loading data from a data warehouse to a workflow seems like a technical detail, it impacts how teams work, how infrastructure is architected, and how scalable and efficient workflows become.


What is the traditional approach to data access in analytics?

  • Data stays inside the warehouse.
  • Applications express their needs via complex SQL queries.
  • The warehouse returns a small, filtered result to tools like Tableau dashboards.
  • Benefits:
    • Centralized control over data.
    • Efficient for business intelligence queries.
  • Limitations for ML:
    • Cannot run compute-heavy tasks like model training.
    • Tightly couples compute with the data layer, limiting scalability.

Why doesn’t this work well for machine learning?

  • ML applications often need simple but high-volume queries like SELECT * FROM table.
  • ML workloads require:
    • Bulk data access
    • Decoupling of compute from data
    • Autoscaling compute resources, such as GPU-backed instances
  • Problems encountered:
    • Data warehouses may be inefficient at handling large reads.
    • Client libraries often aren’t optimized for bulk transfer.

What is the case for decoupling data and compute?

  • Enables use of optimal compute layers (e.g., AWS Batch, GPUs).
  • Allows data scientists to iterate freely, without affecting production databases.
  • Reduces coordination overhead between teams.
  • Tradeoff: Loses centralized control over how data is used, increasing the need for proper data access management.

How do ML and BI applications differ in data access?

  Characteristic          BI (e.g., Tableau)          ML Workflows
  ----------------------  --------------------------  ------------------------------------
  Query complexity        High                        Low (SELECT *)
  Result size             Small                       Large (full table)
  Optimized for           Filtered insights           Bulk ingestion
  Performance bottleneck  Query logic                 Data transfer/loading
  Compute needs           Light (local/client-side)   Heavy (external, autoscaled compute)

S3

How does loading data from S3 compare to local files?

  • Local files are often preferred by data scientists: they are simple, fast, stable, and universally supported.
  • Downsides of local files:
    • Not suitable for cloud workflows.
    • Require manual updates.
    • Don’t adhere to governance policies.

Why use S3 instead?

  • Pros:

    • Cloud-compatible and governance-friendly.
    • Supports scalable workflows.
    • Can outperform local files, especially for large datasets.
  • Benchmark Summary (based on listing s3benchmark.py):

    • When dataset fits in memory, S3 loading uses disk cache, enabling very high read speeds.
    • Throughput depends on:
      • Instance size: Larger instances (e.g., m5n.24xlarge) = higher performance.
      • File size: Avoid many small files; prefer tens of MB or larger.
      • Region alignment: Run compute in same region as the S3 bucket to avoid latency and transfer costs.

Key code concepts from the S3 benchmark

  • Listing S3 files:

    files = list(s3.list_recursive([URL]))[:num]
  • Downloading multiple objects:

    loaded = s3.get_many([f.url for f in files])
  • Parallel file reading:

    parallel_map(lambda x: len(open(x, 'rb').read()), files)
  • Disk cache advantage:

    • S3-loaded files stay in memory (via OS-level disk cache) if dataset fits.
    • Local files are often read from disk, making them slower on under-provisioned machines.

Best practices for loading from S3

  • Use @resources to allocate enough memory for datasets to stay in RAM.
  • Use large files (tens of MB or more) for optimal S3 throughput.
  • Favor cloud-based instances for benchmarking or heavy workloads.
  • Prefer metaflow.S3 over ad hoc S3 clients for consistency and performance.
  • Keep compute and S3 storage in the same AWS region.

s3benchmark.py

This script benchmarks the performance of loading data from either:

  • Local files (--local_dir specified), or
  • S3 (via metaflow.S3) if no local directory is given.
import os
from metaflow import FlowSpec, step, Parameter, S3, profile, parallel_map

URL = "s3://commoncrawl/crawl-data/CC-MAIN-2021-25/segments/1623488519735.70/wet/"


def load_s3(s3, num):
    files = list(s3.list_recursive([URL]))[:num]
    total_size = sum(f.size for f in files) / 1024**3  # in GB
    stats = {}

    with profile("downloading", stats_dict=stats):
        loaded = s3.get_many([f.url for f in files])

    s3_gbps = (total_size * 8) / (stats["downloading"] / 1000.0)
    print("S3->EC2 throughput: %2.1f Gb/s" % s3_gbps)
    return [obj.path for obj in loaded]


class S3BenchmarkFlow(FlowSpec):

    local_dir = Parameter("local_dir", help="Read local files from this directory")

    num = Parameter("num_files", help="Maximum number of files to read", default=50)

    @step
    def start(self):
        with S3() as s3:
            with profile("Loading and processing"):
                if self.local_dir:
                    files = [
                        os.path.join(self.local_dir, f)
                        for f in os.listdir(self.local_dir)
                    ][: self.num]
                else:
                    files = load_s3(s3, self.num)

                print("Reading %d objects" % len(files))

                stats = {}
                with profile("reading", stats_dict=stats):
                    size = (
                        sum(parallel_map(lambda x:
                          len(open(x, "rb").read()), files)) / 1024**3
                    )  # in GB

                read_gbps = (size * 8) / (stats["reading"] / 1000.0)
                print("Read %2.1f GB. Throughput: %2.1f Gb/s" % (size, read_gbps))

        self.next(self.end)

    @step
    def end(self):
        pass


if __name__ == "__main__":
    S3BenchmarkFlow()
  • profile: Measures timing for download and read phases.
  • parallel_map: Leverages multiple CPU cores to load data in parallel.
  • S3() context: Automatically handles cleanup of temporary files.
  • --local_dir: Optional CLI parameter to test local file performance.
  • @resources: Add this if running large workloads on Batch for better performance.

  • Run locally:

    python s3benchmark.py run --num_files 10
  • Run with cloud compute:

    python s3benchmark.py run --with batch:memory=16000
  • Run with local files:

    python s3benchmark.py run --num_files 10 --local_dir local_data


Recommendations

  • Use S3 for all scalable workflows.
  • Benchmark file loading realistically using appropriate instances.
  • Choose instance size based on dataset size to ensure memory fits.

Working with tabular data

  • Shifts focus from raw byte movement to structured/semi-structured tabular data, typically handled as dataframes.
  • Structured data is common in business data science, such as data from relational databases.

  • Example: a tabular dataset with three columns (name, age, role) and three rows of employee records can be stored in either format:
    • CSV (Comma-Separated Values): Plain text, one row per line, values separated by commas.
    • Parquet: A column-oriented binary format.

How do Parquet and CSV compare for structured data?

  • Column Storage:

    • Parquet stores each column independently.
    • Benefits include type-specific compression and efficient column-based querying.
  • Schema Handling:

    • Parquet includes an explicit schema and metadata.
    • CSV treats all values as strings, lacking schema awareness.
  • Efficiency:

    • Columnar layout in Parquet supports efficient access, e.g., SELECT name, role FROM table or SELECT AVG(age).
    • Parquet uses binary compression, reducing file size and transfer time.

How is Parquet read into memory and what library is used?

  • CSV: Read using Python’s built-in csv module.
  • Parquet: Read using Apache Arrow, which:
    • Decodes Parquet.
    • Offers efficient in-memory representation for processing.

How does the benchmark flow (ParquetBenchmarkFlow) operate?

import os
from metaflow import FlowSpec, step, conda_base, resources, S3, profile

URL = 's3://ursa-labs-taxi-data/2014/12/data.parquet'

@conda_base(python='3.8.10',
        libraries={'pyarrow': '5.0.0', 'pandas': '1.3.2', 'memory_profiler': '0.58.0'})
class ParquetBenchmarkFlow(FlowSpec):

    @step
    def start(self):
        import pyarrow.parquet as pq
        with S3() as s3:
            res = s3.get(URL)
            table = pq.read_table(res.path)
            os.rename(res.path, 'taxi.parquet')
        table.to_pandas().to_csv('taxi.csv')
        self.stats = {}
        self.next(self.load_csv, self.load_parquet, self.load_pandas)

    @step
    def load_csv(self):
        with profile('load_csv', stats_dict=self.stats):
            import csv
            with open('taxi.csv') as csvfile:
                for row in csv.reader(csvfile):
                    pass
        self.next(self.join)

    @step
    def load_parquet(self):
        with profile('load_parquet', stats_dict=self.stats):
            import pyarrow.parquet as pq
            table = pq.read_table('taxi.parquet')
        self.next(self.join)

    @step
    def load_pandas(self):
        with profile('load_pandas', stats_dict=self.stats):
            import pandas as pd
            df = pd.read_parquet('taxi.parquet')
        self.next(self.join)

    @step
    def join(self, inputs):
        for inp in inputs:
            print(list(inp.stats.items())[0])
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == '__main__':
    ParquetBenchmarkFlow()
  • start:

    • Downloads Parquet from S3.
    • Reads it with PyArrow.
    • Converts to CSV using pandas for benchmarking.
  • load_csv:

    • Reads all rows from CSV with csv.reader.
  • load_parquet:

    • Loads data using PyArrow (read_table).
  • load_pandas:

    • Loads Parquet using pandas (read_parquet).
  • join:

    • Prints load timing from each method.
  • Run with:

    python parquet_benchmark.py --environment=conda run --max-workers 1
  • Use --max-workers 1 to ensure sequential execution for fair timing.

  • start step takes longer (downloads 319MB file, converts to 1.6GB CSV).

  • Use resume to skip slow start during iteration (e.g., resume load_csv).


What are the performance results?

  • load_parquet: Fastest (~973 ms).
  • load_pandas: Slower (~1853 ms).
  • load_csv: Slowest (~19.5 s). If the rows are also retained in memory, time jumps to ~70 s and RAM usage to ~20 GB.

What is the takeaway regarding file format choice?

  • Recommendation: Use Parquet whenever feasible.
  • Exceptions:
    • Sharing small datasets.
    • Environments that can't handle Parquet.
    • Simpler inspection with CSV using standard tools (though Parquet viewers exist).

The in-memory data stack

What are the core components and relationships in the in-memory data stack?

  • Parquet is a data storage format that is more efficient than CSV for storing and transferring data.
  • Parquet files can be loaded efficiently from cloud storage like S3 using libraries such as metaflow.S3.
  • Arrow (specifically PyArrow in Python) decodes Parquet files into an efficient in-memory representation.
  • Arrow's in-memory data structures can be used directly or passed to other libraries like pandas and NumPy.
  • Arrow's superpower: avoids unnecessary data copying, enabling high-performance processing in memory-constrained or speed-critical workflows.
  • pandas is user-friendly and rich in data manipulation features but can be memory-intensive and slower compared to Arrow.
  • Depending on the task:
    • Use pandas for data wrangling tasks.
    • Use Arrow or NumPy when memory and performance are priorities.
    • Prefer direct consumption by ML libraries that accept Arrow or NumPy formats.

How can we profile memory usage in Metaflow steps?

  • Use the memory_profiler library to monitor memory consumption.
  • A decorator @profile_memory is defined to wrap Metaflow steps and record peak memory usage.
from functools import wraps

def profile_memory(mf_step):
    @wraps(mf_step)
    def func(self):
        from memory_profiler import memory_usage
        self.mem_usage = memory_usage(
            (mf_step, (self,), {}),
            max_iterations=1,
            max_usage=True,
            interval=0.2
        )
    return func
  • Save this in a file named metaflow_memory.py.
  • To use:
    • Import it: from metaflow_memory import profile_memory
    • Ensure memory_profiler is listed in @conda_base:
      @conda_base(libraries={'memory_profiler': '0.58.0'})
    • Apply to steps like:
      @profile_memory
      @step
      def load_csv(self):
          ...

How can memory usage be collected and displayed across branches?

  • Add the decorator to every branch where profiling is needed.
  • In the join step, access and print each branch’s memory usage:
@step
def join(self, inputs):
    for inp in inputs:
        print(list(inp.stats.items())[0], inp.mem_usage)
    self.next(self.end)
  • To simulate peak memory usage when reading CSV:
    • Use list(csv.reader(csvfile)) instead of looping through and discarding rows.
    • This may require >16 GB RAM.
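The streaming-versus-materializing difference can be sketched with the standard library alone (the inline data is invented for illustration):

```python
import csv
import io

data = "name,age\nalice,34\nbob,28\ncarol,45\n"

# Streaming: each parsed row is discarded immediately, so peak memory
# stays at roughly one row regardless of file size.
streamed = 0
for row in csv.reader(io.StringIO(data)):
    streamed += 1

# Materializing: list() keeps every row alive as Python objects, which
# is what inflates peak memory far beyond the on-disk file size.
rows = list(csv.reader(io.StringIO(data)))
print(streamed, len(rows))
```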

What were the results of the CSV vs. Arrow vs. pandas memory benchmark?

  • Holding CSV data in memory as raw Python objects:
    • Consumed nearly 17 GB RAM.
  • Using pandas:
    • About 1 GB more memory than Arrow, due to additional Python-friendly structures.
  • Using Arrow:
    • Most memory-efficient, ideal for large datasets.

What is the memory-efficiency recommendation?

  • Avoid storing rows as Python objects when possible.
  • Converting to pandas should be avoided if memory is a concern.
  • Prefer Arrow or NumPy for memory- and performance-sensitive applications.

What to do when dataset size exceeds memory limits?

  • When data exceeds available instance memory (e.g., beyond @resources(memory=256000)):
    • Use upstream infrastructure like query engines to filter, join, and prepare data before processing.
    • This allows workflows to handle arbitrary volumes of raw data in manageable chunks.

Interfacing with data infrastructure

Why should data science workflows integrate with existing data infrastructure?

  • Most companies already have centralized data infrastructure to support analytics, product features, and other applications.
  • Consistency in data use across applications—including data science—is important.
  • Differences in usage:
    • Data science workflows often require large-scale data extracts (tens to hundreds of GB).
    • Compute demands are higher for model training compared to dashboards or analytics apps.
  • Despite differences, it's beneficial to align data science and data infrastructure to avoid duplication and reduce operational overhead.

What are the layers of modern data infrastructure?

Each component adds capability, starting from the core (data) outward:

  • Data: Stored as files (CSV, Parquet) or in databases; assumed to be already available.
  • Durable storage:
    • E.g., AWS S3, replicated databases.
    • Can be enhanced with metadata layers (e.g., Apache Hive, Iceberg) to support query engines.
  • Query engine:
    • Translates queries (e.g., SQL) into data subsets.
    • Traditional: tightly integrated stacks (Postgres, Teradata).
    • Modern: loosely coupled (Trino/Presto, Apache Spark); streaming options include Apache Druid, Pinot.
  • ETL / ELT systems:
    • Move and transform data between systems.
    • ELT model (e.g., Snowflake, Spark) loads raw data first, then transforms.
    • Tools: DBT (transform management), Great Expectations (data quality).
  • Workflow orchestrator:
    • Executes DAG-based pipelines (ETL or data science).
    • Examples: AWS Step Functions, Apache Airflow, Apache Flink (streaming).
  • Data management:
    • Tools like Amundsen for cataloging.
    • Governance, monitoring, and policy enforcement (e.g., access control, lineage tracking).

How does the data science infrastructure integrate with data infrastructure?

  • Integration increases over time as systems evolve.
  • Components often shared:
    • Job scheduling: Centralized orchestrators can manage both data and data science workflows (e.g., triggering model retraining after data updates).
    • Data versioning: Data catalogs can support lineage from raw data to models (e.g., tracking with Metaflow artifacts).
    • Monitoring: Source data monitoring can prevent model failure.
    • Feature engineering: Catalogs help discover usable features; some double as feature stores.
  • Python libraries often act as connectors (e.g., AWS Data Wrangler).
  • Components should be adopted incrementally, based on evolving needs.

How can work be divided between data engineers and data scientists?

  • Data Engineer responsibilities:

    • Acquire and curate raw data.
    • Ensure data quality, validity, and structure.
    • Produce stable, authoritative datasets (e.g., structured tables).
    • Should avoid making interpretive transformations; focus on raw facts.
  • Data Scientist responsibilities:

    • Build, deploy, and maintain data science workflows and models.
    • Create project-specific tables/views based on curated datasets.
    • Can iterate quickly and adjust feature engineering independently.
    • Responsible for performance-sensitive aspects like partitioning and layout.

Why is modern infrastructure critical to enable this division?

  • Query engines must be robust enough to tolerate less optimized queries from non-experts.
  • In the past, data warehouses could be easily overloaded by inefficient queries.
  • Modern systems isolate and handle workloads gracefully, allowing safe use by both engineers and scientists.

Preparing datasets in SQL

  • Use SQL engines (e.g., Trino, Spark) or warehouses (e.g., Redshift, Snowflake) to prepare datasets for data science workflows.
  • Demonstrate dataset preparation using AWS Athena, a managed query engine compatible with the Apache Hive metastore.
  • Use CTAS (Create Table As Select) queries to create optimized subsets of data for efficient consumption.
  • Employ AWS Data Wrangler to interface with Athena and simplify reading/writing from AWS services.

Dataset preparation

  • Upload source data to S3 and register it as a table with schema metadata.

  • Athena metadata format is compatible with Apache Hive.

  • Partitioning strategy is used to improve query performance by organizing data into subdirectories by month.

    Example S3 path structure:

    s3://my-metaflow-bucket/metaflow/data/TaxiDataLoader/12/nyc_taxi/month=11/file.parquet
    
    • Only the suffix month=11 is crucial for partitioning.
    • Prefix paths will differ depending on Metaflow configuration.

  • Ensure IAM user or Batch role has AmazonAthenaFullAccess policy.
  • Set AWS region via AWS_DEFAULT_REGION (e.g., us-east-1).

  • Load a year’s worth of data (~160 million rows).
  • Partition the table by month using directory structure for optimized querying.
  • Convert schema from Parquet to Hive-compatible format.
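The partition-path convention above can be sketched in plain Python (the helper name is ours, not part of the flow):

```python
def partition_key(table, month, filename):
    # Build a Hive-style key: only the month=<n> directory matters for
    # partition pruning; the prefix before it is arbitrary.
    return "%s/month=%s/%s" % (table, month, filename)

print(partition_key("nyc_taxi", 11, "file.parquet"))
```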

How does the Metaflow TaxiDataLoader workflow implement this logic?

from metaflow import FlowSpec, Parameter, step, conda, profile, S3

GLUE_DB = 'dsinfra_test'
URL = 's3://ursa-labs-taxi-data/2014/'

TYPES = {'timestamp[us]': 'bigint', 'int8': 'tinyint'}

class TaxiDataLoader(FlowSpec):

    table = Parameter('table', help='Table name', default='nyc_taxi')

    @conda(python='3.8.10', libraries={'pyarrow': '5.0.0'})
    @step
    def start(self):
        import pyarrow.parquet as pq

        def make_key(obj):
            key = '%s/month=%s/%s' % tuple([self.table] + obj.key.split('/'))
            return key, obj.path

        def hive_field(f):
            return f.name, TYPES.get(str(f.type), str(f.type))

        with S3() as s3down:
            with profile('Downloading data'):
                loaded = list(map(make_key, s3down.get_recursive([URL])))
            table = pq.read_table(loaded[0][1])
            self.schema = dict(map(hive_field, table.schema))

            with S3(run=self) as s3up:
                with profile('Uploading data'):
                    uploaded = s3up.put_files(loaded)
                key, url = uploaded[0]
                self.s3_prefix = url[:-(len(key) - len(self.table))]
        self.next(self.end)

    @conda(python='3.8.10', libraries={'awswrangler': '1.10.1'})
    @step
    def end(self):
        import awswrangler as wr
        try:
            wr.catalog.create_database(name=GLUE_DB)
        except Exception:
            # The Glue database may already exist
            pass
        wr.athena.create_athena_bucket()
        with profile('Creating table'):
            wr.catalog.create_parquet_table(
                database=GLUE_DB,
                table=self.table,
                path=self.s3_prefix,
                columns_types=self.schema,
                partitions_types={'month': 'int'},
                mode='overwrite'
            )
            wr.athena.repair_table(self.table, database=GLUE_DB)

if __name__ == '__main__':
    TaxiDataLoader()
  • start step:

    • Copies and reorganizes Parquet files to use partitioning structure.
    • Extracts and transforms schema from Parquet to Hive-compatible format.
  • end step:

    • Creates a Glue database (dsinfra_test) if it doesn’t exist.
    • Sets up Athena result bucket.
    • Registers the table using awswrangler.catalog.create_parquet_table.
    • Calls repair_table to register partitions.
  • Run with:

    python taxi_loader.py --environment=conda run
  • Prefer running on AWS Batch or a cloud workstation due to ~4.2GB data volume.
  • Verify in Athena console:
    • Table nyc_taxi should appear under dsinfra_test.
    • Run test query:
      SELECT * FROM nyc_taxi LIMIT 10;

How can project-specific CTAS queries be integrated into a workflow?

  • Avoid decoupling SQL logic from data science workflows by executing queries inside flows.
  • Use a CTAS (Create Table As Select) approach within the flow to generate result tables dynamically.
  • Store SQL in external .sql files for better IDE support and maintainability.

What does the sample SQL file (sql/taxi_etl.sql) look like?

SELECT * FROM nyc_taxi
WHERE hour(from_unixtime(pickup_at / 1000)) BETWEEN 9 AND 17
  • Converts pickup_at from milliseconds to seconds.
  • Filters for business-hour rides (9 a.m. to 5 p.m.).
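The same predicate can be sketched in plain Python (assuming, as the query does, that pickup_at holds epoch milliseconds; UTC is used here for determinism, whereas Athena applies its session time zone):

```python
from datetime import datetime, timezone

def is_business_hours(pickup_at_ms):
    # Mirror of the SQL predicate: hour(from_unixtime(pickup_at / 1000))
    # BETWEEN 9 AND 17 (inclusive).
    hour = datetime.fromtimestamp(pickup_at_ms / 1000, tz=timezone.utc).hour
    return 9 <= hour <= 17

print(is_business_hours(1401624000000))  # noon UTC on 2014-06-01
```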

How is the CTAS query executed in the flow?

from metaflow import FlowSpec, project, profile, S3, step, current, conda

GLUE_DB = 'dsinfra_test'

@project(name='nyc_taxi')
class TaxiETLFlow(FlowSpec):

    def athena_ctas(self, sql):
        import awswrangler as wr
        table = 'mf_ctas_%s' % current.pathspec.replace('/', '_')
        self.ctas = "CREATE TABLE %s AS %s" % (table, sql)
        with profile('Running query'):
            query = wr.athena.start_query_execution(self.ctas, database=GLUE_DB)
            output = wr.athena.wait_query(query)
            loc = output['ResultConfiguration']['OutputLocation']
            with S3() as s3:
                return [obj.url for obj in s3.list_recursive([loc + '/'])]

    @conda(python='3.8.10', libraries={'awswrangler': '1.10.1'})
    @step
    def start(self):
        with open('sql/taxi_etl.sql') as f:
            self.paths = self.athena_ctas(f.read())
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == '__main__':
    TaxiETLFlow()
  • athena_ctas method:

    • Converts the SQL string into a CTAS query.
    • Runs it using AWS Wrangler.
    • Waits for query completion and stores output file paths in self.paths.
  • start step:

    • Reads SQL file content.
    • Runs the CTAS process and stores results.
  • Artifacts stored:

    • self.ctas: the executed SQL statement.
    • self.paths: list of output file locations.

How do you run the flow and ensure .sql file inclusion?

python taxi_etl.py --environment=conda --package-suffixes .sql run
  • --package-suffixes .sql ensures SQL files are bundled with the flow.

How does this pattern support traceability and debugging?

  • CTAS table names include the run and task IDs, e.g., mf_ctas_TaxiETLFlow_1631494834745839_start_1.
  • By logging:
    • The SQL query (self.ctas)
    • The result locations (self.paths)
  • Enables full data lineage: from raw data → queries → results → models.

How does versioned storage work with S3(run=self)?

  • Automatically stores data in run-specific paths.
  • Enables:
    • Isolation across runs.
    • Versioning without collisions.
  • Facilitates reproducibility and traceability of data across runs.

What are best practices for managing old CTAS query results?

  • S3 Lifecycle Policy:

    • Configure automatic deletion of objects at result paths (e.g., after 30 days).
  • Glue Table Cleanup:

    • Delete table metadata manually or via:
      aws glue batch-delete-table
    • Or programmatically using AWS Data Wrangler:
      wr.catalog.delete_table(database="dsinfra_test", table="table_name")

Miscellaneous Notes

  • You can replicate the CTAS pattern with Spark, Redshift, or Snowflake.
  • Suitable for integrating scalable query processing into data science workflows.
  • Compatible with production schedulers for routine ETL tasks.

Distributed data processing

  • Single-instance loading (e.g., via Parquet + Arrow + NumPy) is fast and efficient when data fits in memory.
  • Limits arise when:
    • Dataset size exceeds memory.
    • Processing becomes too slow on one machine.
  • In such cases, distribute computation across multiple instances using Metaflow’s foreach.

What are common distributed data processing use cases?

  1. Subset-specific processing (e.g., per country, per hour).
  2. Efficient data preprocessing:
    • Extract with SQL (e.g., via CTAS).
    • Preprocess further in Python.
  3. Sharded data processing:
    • Input split into chunks.
    • Processed in parallel.
    • Results aggregated in a join step (MapReduce style).
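The sharding logic used by TaxiPlotterFlow's start step can be sketched on its own (the helper name and file names are illustrative):

```python
def make_shards(paths, num_shards):
    # Split paths into num_shards roughly equal groups; the last shard
    # absorbs any remainder.
    n = round(len(paths) / num_shards)
    shards = [paths[i * n:(i + 1) * n] for i in range(num_shards - 1)]
    shards.append(paths[(num_shards - 1) * n:])
    return shards

shards = make_shards(["part-%d.parquet" % i for i in range(10)], 4)
print([len(s) for s in shards])
```

Each shard then becomes one branch of a foreach, and the join step plays the role of the reduce.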

How does the helper function for visualization work?

from io import BytesIO

CANVAS = {
    'plot_width': 1000,
    'plot_height': 1000,
    'x_range': (-74.03, -73.92),
    'y_range': (40.70, 40.78)
}

def visualize(lat, lon):
    from pandas import DataFrame
    import datashader as ds
    from datashader import transfer_functions as tf
    from datashader.colors import Greys9

    canvas = ds.Canvas(**CANVAS)
    agg = canvas.points(DataFrame({'x': lon, 'y': lat}), 'x', 'y')
    img = tf.shade(agg, cmap=Greys9, how='log')
    img = tf.set_background(img, 'white')

    buf = BytesIO()
    img.to_pil().save(buf, format='png')
    return buf.getvalue()
  • Plots millions of GPS coordinates efficiently.
  • Uses logarithmic shading for better contrast.
  • Returns an in-memory PNG image as bytes.

How does the distributed visualization flow (TaxiPlotterFlow) work?

from metaflow import FlowSpec, step, conda, Parameter, S3, resources, project, Flow
import taxiviz

URL = 's3://ursa-labs-taxi-data/2014/'
NUM_SHARDS = 4

def process_data(table):
    return table.filter(table['passenger_count'].to_numpy() > 1)

@project(name='nyc_taxi')
class TaxiPlotterFlow(FlowSpec):

    use_ctas = Parameter('use_ctas_data', help='Use CTAS data', default=False)

    @conda(python='3.8.10')
    @step
    def start(self):
        if self.use_ctas:
            self.paths = Flow('TaxiETLFlow').latest_run.data.paths
        else:
            with S3() as s3:
                objs = s3.list_recursive([URL])
                self.paths = [obj.url for obj in objs]

        print("Processing %d Parquet files" % len(self.paths))
        n = round(len(self.paths) / NUM_SHARDS)
        self.shards = [self.paths[i*n:(i+1)*n] for i in range(NUM_SHARDS - 1)]
        self.shards.append(self.paths[(NUM_SHARDS - 1) * n:])
        self.next(self.preprocess_data, foreach='shards')

    @resources(memory=16000)
    @conda(python='3.8.10', libraries={'pyarrow': '5.0.0'})
    @step
    def preprocess_data(self):
        with S3() as s3:
            from pyarrow.parquet import ParquetDataset
            if self.input:
                objs = s3.get_many(self.input)
                orig_table = ParquetDataset([obj.path for obj in objs]).read()
                self.num_rows_before = orig_table.num_rows
                table = process_data(orig_table)
                self.num_rows_after = table.num_rows
                print('selected %d/%d rows' % (self.num_rows_after, self.num_rows_before))
                self.lat = table['pickup_latitude'].to_numpy()
                self.lon = table['pickup_longitude'].to_numpy()
        self.next(self.join)

    @resources(memory=16000)
    @conda(python='3.8.10', libraries={'pyarrow': '5.0.0', 'datashader': '0.13.0'})
    @step
    def join(self, inputs):
        import numpy
        lat = numpy.concatenate([inp.lat for inp in inputs])
        lon = numpy.concatenate([inp.lon for inp in inputs])
        print("Plotting %d locations" % len(lat))
        self.image = taxiviz.visualize(lat, lon)
        self.next(self.end)

    @conda(python='3.8.10')
    @step
    def end(self):
        pass

if __name__ == '__main__':
    TaxiPlotterFlow()
  • start step:

    • Selects CTAS output or raw dataset.
    • Shards paths into 4 roughly equal groups.
  • preprocess_data step:

    • Loads and filters Parquet data (e.g., trips with >1 passenger).
    • Stores only lat/lon as NumPy arrays for efficiency.
  • join step:

    • Merges coordinate arrays.
    • Generates PNG using visualize().
  • Run with:

    python taxi_plotter.py --environment=conda run --use_ctas_data=True
  • Omitting --use_ctas_data=True uses raw S3 data.

View output in a Jupyter notebook:

from metaflow import Flow
from IPython.display import Image
run = Flow('TaxiPlotterFlow').latest_run
Image(run.data.image)

What design principles does this flow illustrate?

  • Namespace isolation with @project and Flow(...).latest_run:

    • Ensures downstream flows access the correct upstream version.
  • Custom UDF processing (process_data):

    • Simplifies logic outside SQL.
    • Acts like a flexible SQL UDF.
  • Efficient artifact usage:

    • Stores NumPy arrays instead of full datasets.
    • Speeds up storage, retrieval, and merging.

What alternative distributed tools could be used?

  • Dask:

    • Pandas-like syntax.
    • Automatic sharding and parallelism.
    • Adds infrastructure overhead.
  • PySpark:

    • Similar benefits and tradeoffs as Dask.
    • Greater complexity in deployment and debugging.
  • Note: These tools can be invoked from within Metaflow steps if a cluster is available.


Miscellaneous Notes

  • Distributing data via foreach aligns with the MapReduce model (Map = preprocess_data, Reduce = join).
  • Datashader handles overplotting gracefully, making it suitable for dense geospatial visualizations.
  • The approach demonstrates how scalable infrastructure and Python-based flexibility can coexist in large-scale data science pipelines.
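The MapReduce correspondence noted above can be sketched in plain Python, without Metaflow: each foreach branch is a map over one shard, and the join step is the reduce. The shard contents and the filtering rule below are synthetic stand-ins for the Parquet loading and `process_data` logic.

```python
import numpy as np

# Map: each task processes one shard independently
# (in Metaflow, one foreach branch of preprocess_data).
def map_shard(shard):
    lat, lon = shard
    # Hypothetical per-shard filter: drop rows with unset coordinates.
    keep = (lat != 0) & (lon != 0)
    return lat[keep], lon[keep]

# Reduce: the join step concatenates the per-shard results.
def reduce_shards(results):
    lat = np.concatenate([r[0] for r in results])
    lon = np.concatenate([r[1] for r in results])
    return lat, lon

# Four synthetic shards standing in for the Parquet path groups.
rng = np.random.default_rng(0)
shards = [(rng.uniform(40, 41, 100), rng.uniform(-74, -73, 100))
          for _ in range(4)]
lat, lon = reduce_shards([map_shard(s) for s in shards])
print(len(lat))  # → 400 (no zeros in these uniform draws, so all rows pass)
```

Because the map tasks share no state, they parallelize trivially; only the small per-shard arrays travel to the reduce step.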

From data to features

What is the interface between data and models, and how does feature engineering fit in?

  • Data science workflows often require general-purpose data processing before modeling starts.
  • Feature engineering bridges raw data (facts) and models, transforming objective observations into model-relevant inputs.
  • Unlike foundational infrastructure (e.g., compute, storage), feature engineering benefits from domain-specific flexibility.
  • Recommendation: Build or adopt domain-specific feature layers that sit on top of foundational infrastructure without competing with it.

What is the difference between facts and features?

  • Facts:

    • Direct, objective observations from reality (e.g., a user clicks "Play").
    • Collected by data engineers with minimal interpretation.
    • Stored in reliable fact tables for reuse across projects.
    • Can be biased or noisy but represent raw events as-is.
  • Features:

    • Interpretations or transformations of facts, crafted for use in models.
    • Created by data scientists and iterated based on modeling results.
    • Numerous valid features can be derived from the same fact.
    • Tailored to the use case and modeling task.
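The fact/feature distinction can be made concrete with a toy example: from the same immutable event records, different projects derive different features. The event schema and feature definitions below are hypothetical.

```python
from collections import Counter
from datetime import datetime

# Facts: raw, objective events, stored as-is by data engineers.
facts = [
    {'user': 'a', 'action': 'play', 'ts': datetime(2021, 1, 1, 10)},
    {'user': 'a', 'action': 'play', 'ts': datetime(2021, 1, 2, 9)},
    {'user': 'b', 'action': 'play', 'ts': datetime(2021, 1, 1, 23)},
]

# Feature 1: play count per user (e.g., for an engagement model).
play_counts = Counter(f['user'] for f in facts if f['action'] == 'play')

# Feature 2: does the user's latest event fall late at night?
# (e.g., for a scheduling model) - a different interpretation of the same facts.
night_owl = {f['user']: f['ts'].hour >= 22 for f in facts}

print(play_counts['a'], night_owl['b'])  # → 2 True
```

The fact list never changes; each feature is a downstream interpretation that a data scientist can redefine and recompute without touching the fact store.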

Why is distinguishing facts from features practically important?

  • Divides responsibilities:

    • Data engineers handle infrastructure and data integrity.
    • Data scientists focus on experimentation and modeling logic.
  • Enables iteration:

    • Reliable facts allow repeatable extraction of different feature sets.
    • Supports fast prototyping and evaluation cycles.
  • Encourages modularity:

    • Fact tables are centralized and reusable.
    • Feature generation happens downstream in workflows (e.g., using foreach and Python UDFs).

How do facts and features relate to workflow patterns in this chapter?

  • Fact storage:

    • Data engineers maintain high-quality, centralized fact tables.
  • CTAS pattern:

    • Data scientists extract project-specific views from fact tables using SQL.
  • MapReduce-style workflows:

    • Feature extraction and transformation occur in parallel (e.g., using foreach in Metaflow).

What are suggested tools and patterns for working with features?

  • Start with a simple, baseline solution using native workflows (e.g., Metaflow, SQL, Python).
  • Consider integrating feature stores, labeling services, or custom libraries as needs grow.
  • Ensure:
    • Easy access to facts
    • Fast feature iteration
    • Versioning support (e.g., via Metaflow) to maintain traceability of data and feature sets.

Miscellaneous Notes

  • There is no one-size-fits-all solution for feature engineering.
  • Different data types (e.g., tabular, audio, time series) require different approaches.
  • Domain expertise plays a critical role in choosing and transforming the right features.

Encoding features

What is feature encoding, and why is separating it from modeling important?

  • Feature encoding (or featurization) transforms raw data (facts) into features usable by models.
  • Done through feature encoder functions, which can number in the dozens or hundreds per project.
  • Separation of featurization from modeling:
    • Ensures maintainability and offline-online consistency (same encoders used during training and inference).
    • Allows versioning and repeatability of ML workflows.
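One common way to get offline-online consistency is to define each encoder as a standalone function in a shared module, imported by both the training flow and the inference service. A minimal sketch (the encoder itself is hypothetical):

```python
import numpy as np

# A feature encoder defined once, in a module both paths import.
def encode_trip_distance(distance_miles):
    """Log-scale the distance; the same code runs offline and online."""
    return np.log1p(np.asarray(distance_miles, dtype='float64'))

# Offline (training): applied to a whole column at once.
train_features = encode_trip_distance([0.0, 1.0, 10.0])

# Online (inference): applied to a single incoming value.
request_feature = encode_trip_distance(2.5)

print(train_features, request_feature)
```

Since both call sites share one implementation, a change to the encoding logic cannot silently diverge between training and serving, and versioning the module versions the encoder.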

Why is managing time in feature encoding important?

  • Prevents data leakage in time-series/backtesting scenarios by enforcing correct train-test splits.
  • Enables modeling strategies like:
    • Backtesting using a historical cut-off point.
    • Concept drift detection by tracking how target feature statistics evolve.
  • A well-structured encoding pipeline should respect time boundaries and modeling context.
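A time boundary can be enforced with a split like the following: rows at or after the cut-off are never visible to statistics computed on the training side. Field names and timestamps are illustrative.

```python
from datetime import datetime

rows = [
    {'ts': datetime(2021, 6, 1), 'amount': 10.0},
    {'ts': datetime(2021, 7, 1), 'amount': 12.0},
    {'ts': datetime(2021, 8, 1), 'amount': 30.0},
]

def backtest_split(rows, cutoff):
    """Train on strictly-before-cutoff rows; evaluate on the rest."""
    train = [r for r in rows if r['ts'] < cutoff]
    test = [r for r in rows if r['ts'] >= cutoff]
    return train, test

train, test = backtest_split(rows, datetime(2021, 8, 1))

# Any statistic used by an encoder (e.g., a normalization mean) is
# computed from the training side only, so no future data leaks in.
train_mean = sum(r['amount'] for r in train) / len(train)
print(len(train), len(test), train_mean)  # → 2 1 11.0
```

Sliding the cut-off forward over a series of dates gives a simple backtest; comparing `train_mean`-style statistics across windows is one cheap signal of concept drift.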

Why is Python-based feature encoding sometimes preferable to SQL?

  • Faster iteration: Testing assumptions (e.g., 2% outlier thresholds) is easier in Python.
  • Complex logic: Multivariate outlier filtering and random sampling are simpler and more concise in Python.
  • Performance: Leveraging Apache Arrow and NumPy avoids conversion to pandas, keeping computation fast and memory-efficient.

What utility functions support flexible featurization?

def filter_outliers(table, clean_fields):
    import numpy
    # Keep only rows whose values fall strictly between the 2nd and
    # 98th percentiles in every listed field.
    valid = numpy.ones(table.num_rows, dtype='bool')
    for field in clean_fields:
        column = table[field].to_numpy()
        minval = numpy.percentile(column, 2)
        maxval = numpy.percentile(column, 98)
        valid &= (column > minval) & (column < maxval)
    return table.filter(valid)

def sample(table, p):
    import numpy
    # Keep each row independently with probability p.
    return table.filter(numpy.random.random(table.num_rows) < p)
  • Filters top/bottom 2% of values in selected fields.
  • Uniformly samples rows with probability p.

How is modeling and visualization handled?

def fit(features):
    from sklearn.linear_model import LinearRegression
    # Regress total fare amount on trip distance (a single feature).
    d = features['trip_distance'].reshape(-1, 1)
    model = LinearRegression().fit(d, features['total_amount'])
    return model

def visualize(model, features):
    import matplotlib.pyplot as plt
    from io import BytesIO
    import numpy
    # Evaluate the fitted line over the observed distance range.
    maxval = max(features['trip_distance'])
    line = numpy.arange(0, maxval, maxval / 1000)
    pred = model.predict(line.reshape(-1, 1))
    plt.rcParams.update({'font.size': 22})
    plt.scatter(data=features,
                x='trip_distance',
                y='total_amount',
                alpha=0.01,
                linewidth=0.5)
    plt.plot(line, pred, linewidth=2, color='black')
    plt.xlabel('Distance')
    plt.ylabel('Amount')
    fig = plt.gcf()
    fig.set_size_inches(18, 10)
    # Return the figure as PNG bytes so it can be stored as an artifact.
    buf = BytesIO()
    fig.savefig(buf)
    return buf.getvalue()
  • Fits a linear regression model on distance vs. amount.
  • Visualizes the scatterplot with a fitted regression line.

How does the TaxiRegressionFlow demonstrate feature encoding in a workflow?

from metaflow import FlowSpec, step, conda, conda_base, Parameter, \
    S3, resources, project, Flow

URLS = ['s3://ursa-labs-taxi-data/2014/10/',
        's3://ursa-labs-taxi-data/2014/11/']
NUM_SHARDS = 4
FIELDS = ['trip_distance', 'total_amount']

@conda_base(python='3.8.10')
@project(name='taxi_nyc')
class TaxiRegressionFlow(FlowSpec):

    sample = Parameter('sample', default=0.1)
    use_ctas = Parameter('use_ctas_data', help='Use CTAS data', default=False)

    @step
    def start(self):
        if self.use_ctas:
            self.paths = Flow('TaxiETLFlow').latest_run.data.paths
        else:
            with S3() as s3:
                objs = s3.list_recursive(URLS)
                self.paths = [obj.url for obj in objs]

        print("Processing %d Parquet files" % len(self.paths))
        n = max(round(len(self.paths) / NUM_SHARDS), 1)
        self.shards = [self.paths[i*n:(i+1)*n] for i in range(NUM_SHARDS - 1)]
        self.shards.append(self.paths[(NUM_SHARDS - 1) * n:])
        self.next(self.preprocess_data, foreach='shards')

    @resources(memory=16000)
    @conda(libraries={'pyarrow': '5.0.0'})
    @step
    def preprocess_data(self):
        from table_utils import filter_outliers, sample
        self.shard = None
        with S3() as s3:
            from pyarrow.parquet import ParquetDataset
            if self.input:
                objs = s3.get_many(self.input)
                table = ParquetDataset([obj.path for obj in objs]).read()
                table = sample(filter_outliers(table, FIELDS), self.sample)
                self.shard = {field: table[field].to_numpy() for field in FIELDS}
        self.next(self.join)

    @resources(memory=8000)
    @conda(libraries={'numpy': '1.21.1'})
    @step
    def join(self, inputs):
        from numpy import concatenate
        self.features = {}
        for f in FIELDS:
            shards = [inp.shard[f] for inp in inputs if inp.shard]
            self.features[f] = concatenate(shards)
        self.next(self.regress)

    @resources(memory=8000)
    @conda(libraries={'numpy': '1.21.1',
                      'scikit-learn': '0.24.1',
                      'matplotlib': '3.4.3'})
    @step
    def regress(self):
        from taxi_model import fit, visualize
        self.model = fit(self.features)
        self.viz = visualize(self.model, self.features)
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == '__main__':
    TaxiRegressionFlow()
  • Supports CTAS-based or raw S3 data.
  • Uses foreach to parallelize preprocessing.
  • Filters and samples data shards.
  • Aggregates feature arrays for model fitting.
  • Trains and visualizes a regression model.

python taxi_regression.py --environment=conda run --use_ctas_data=True
  • Omit --use_ctas_data=True to use raw data.
  • Run takes ~1–2 minutes depending on sample size and instance size.

What are the key lessons from this example?

  • Demonstrates a flexible and performant feature pipeline using:
    • Query engines for extraction (CTAS).
    • Python for encoding (Arrow + NumPy).
    • Scikit-learn for modeling.
  • Encodes features using a MapReduce-style pattern with foreach and join.
  • Model results and visualizations are versioned in Metaflow.

Miscellaneous Notes

  • This example is on the flexibility-focused end of the spectrum; not all best practices (e.g., online-offline consistency) are enforced.
  • In Chapter 9, the pipeline will be expanded with more robust and extensible feature engineering patterns.
  • Starting with a flexible system helps rapid iteration. Rigid systems for correctness can be introduced later as needed.