“Scaling Machine Learning with Spark” Author Interview and Reading Notes
📚 Some people may know that I host a DS/ML book club where we read one book per month. This month, we read ‘Scaling Machine Learning with Spark’ by Adi Polak. This book covers distributed ML systems, Spark/PySpark basics, MLflow, Spark MLlib, and deep learning frameworks PyTorch and TensorFlow.
In this blog post, I’d like to share an interview we had with the author that’s full of insights. I’ll also be sharing my personal reading notes for those who are curious about this book.
🔗 Book link: https://amzn.to/3scTtcn
🔗 Github repo: https://github.com/adipolak/ml-with-apache-spark
Chapter 1 Distributed Machine Learning Terminology and Concepts
The Stages of the Machine Learning Workflow:
- collect and load/ingest data; explore and validate the data; clean/preprocess the data; extract features/perform feature engineering; split the data into a training set and a validation set; train and tune the model; evaluate the model with test data; deploy the model; monitor the model
Tools and Technologies in the Machine Learning Pipeline
Distributed Computing Models
- general-purpose models: MapReduce: in practice, every task is split into multiple map and reduce functions, preserving data locality as much as possible; MPI (Message Passing Interface): a low-level standard that models a parallel program running on a distributed-memory system and standardizes communication between machines; Barrier: a synchronization method that makes a group of machines stop at a certain point and wait for the rest to finish; Shared memory.
- dedicated distributed computing models: parameter server
Introduction to Distributed Systems Architecture:
- network topologies: physical topologies describe how the computers are arranged and connected; logical topologies describe the way data flows in the system and how the computers exchange information over the network.
- centralized system: all nodes depend on a single node to make decisions; decentralized systems: nodes are independent and make their own decisions, more tolerant of faults, can benefit from a multicloud/hybrid cloud architecture
- interaction models: client/server; peer-to-peer; geo-distributed: aim to solve issues like data privacy and resource allocation.
- communication in a distributed setting: asynchronous: the underlying mechanism is a queue; synchronous.
Introduction to Ensemble Methods
- distributed training topologies: centralized ensemble learning: all requests from the distributed models go through main servers; decentralized decision trees; centralized, distributed training with parameter servers; centralized, distributed training in a P2P topology: gossip learning
The Challenges of Distributed Machine Learning Systems
- performance: data parallelism: data split into shards or partitions, which are distributed among nodes; the same code/logic is also distributed to all machines; model parallelism: the model itself is split and distributed across machines; combining data parallelism and model parallelism
- resource management
- fault tolerance: many distributed computation frameworks have a built-in procedure — replicating the data and writing info to disk between stages for faster recovery; faulty agents/adversaries
- privacy: avoiding the restrictions of centralizing data, e.g., with federated learning
- portability
Chapter 2 Introduction to Spark and PySpark
Apache Spark Architecture:
- Driver program/Spark driver: executes the SparkSession, which encapsulates the SparkContext. Holds the DAG scheduler, task scheduler, block manager, and everything needed to turn code into jobs.
- Executor: a process launched for a particular Spark application on a worker node. A JVM process that communicates with the cluster manager and receives tasks to execute.
- Worker node: responsible for executing the work.
- Cluster manager: orchestrate the distributed system
Intro to PySpark:
- Python (code), JVM (query optimization, computation, distribution of tasks to clusters), _gateway (passes the Py4J application to a JVM Spark server)
- Py4J: a library written in Python and Java that enables Python programs running in a Python interpreter to dynamically access Java objects in a JVM.
Apache Spark Software architecture:
- API/libraries: work directly with Spark’s DataFrame or Dataset APIs; define a custom schema with StructType and save the schema to a JSON file (see the sketch after this list); MLlib, GraphFrames, Structured Streaming
- DataFrames, Datasets, and RDDs are immutable storage
- lazy execution
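As a quick illustration of the custom-schema point above, here is a minimal PySpark sketch; the column names and file paths are placeholders of my own, not examples from the book.

```python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

spark = SparkSession.builder.appName("schema-example").getOrCreate()

# Hypothetical schema for a small product dataset
schema = StructType([
    StructField("product_id", StringType(), nullable=False),
    StructField("price", DoubleType(), nullable=True),
])

# Read with an explicit schema instead of relying on schema inference
df = spark.read.schema(schema).json("/data/products.json")

# The schema can be serialized to JSON and stored for later reuse
with open("/tmp/products_schema.json", "w") as f:
    f.write(schema.json())
```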
PySpark and Functional Programming:
- Spark borrows many concepts from functional programming like anonymous function
- Two main principles: immutability, disciplined state
- The PySpark shell is responsible for linking the Python API to the Spark core and initializing the SparkSession and SparkContext
pandas DataFrames vs. Spark DataFrames
- pandas is not built for scale; it doesn’t have Spark’s distributed architecture. It also doesn’t adhere to functional programming principles: pandas DataFrames are mutable, and pandas doesn’t support lazy evaluation.
Scikit-Learn vs. MLlib:
- scikit-learn: mutable datasets, data must fit in memory, the model can be pickled to disk and reloaded (e.g., served behind a REST API)
- MLlib: immutable datasets, distributed, supports file formats such as Parquet (typically with snappy compression)
Chapter 3 Managing the Machine Learning Experiment Lifecycle with MLflow
Machine Learning Lifecycle
- model development, model validation, wrapping model and testing in staging, model deployment and monitoring, archiving models
- software development lifecycle (SDLC): planning, analysis, design, development, testing, implementation, maintenance
- Machine learning lifecycle management requirements: reproducibility, code version control, data version control (lakeFS, DVC)
MLflow
- Software components of the MLflow platform: storage (support for connecting multiple storage types), backend server (communicates information between the storage/database and the UI, SDK, and CLI clients), frontend, API and CLI
- Four main components: MLflow tracking, MLflow projects, MLflow models, MLflow model registry
MLflow Tracking:
- record runs, mlflow.log_param/log_metric/log_artifact/log_model;
- autolog automatically logs parameters, metrics, and the model for supported libraries, covering the whole lifecycle of a run
- To log the dataset path and version together with the model name and path during training, use log_param
- set tags on the run
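A minimal tracking sketch covering the calls listed above; the experiment name, parameter values, and artifact path are placeholders.

```python
import mlflow

mlflow.set_experiment("book-club-demo")  # placeholder experiment name

with mlflow.start_run():
    mlflow.log_param("data_version", "v2")        # e.g., dataset path/version
    mlflow.log_param("max_iter", 100)
    mlflow.log_metric("rmse", 0.87)
    mlflow.set_tag("team", "ds-book-club")
    mlflow.log_artifact("reports/eval_plot.png")  # any local file

# Alternatively, mlflow.autolog() captures params, metrics, and models
# automatically for supported libraries.
```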
MLflow Projects
- standard format for packaging code
- a directory containing a set of code files plus a descriptor YAML file (the MLproject file) that specifies its dependencies and how to run the code
MLflow models
- The MLflow Models component is used for packaging machine learning models in multiple formats, called flavors.
- MLflow Models provides several standard flavors that MLflow’s built-in deployment tools support, such as a python_function flavor that describes how to run the model as a Python function.
MLflow Model Registry
- centralized storage for models
- transitioning between stages: the MlflowClient API’s transition_model_version_stage method lets us update the model stage and invoke the desired CI/CD scripts
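A small sketch of such a stage transition, assuming a registered model; the model name and version number are placeholders.

```python
from mlflow.tracking import MlflowClient

client = MlflowClient()

# Promote version 3 of a registered model from Staging to Production
# (name and version are placeholders).
client.transition_model_version_stage(
    name="churn-classifier",
    version=3,
    stage="Production",
    archive_existing_versions=True,
)
```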
Using MLflow at Scale
- Two components for storage: a backend store for information about experiments and an artifact store for storing the models themselves and any related files, objects, model summaries etc.
- You can store experiment-related entities in a database like PostgreSQL or SQLite
- You can run MLflow on your local machine while the tracking server and stores reside remotely
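For example, pointing the client at a remote tracking server is just a configuration call; the server URL and experiment name below are placeholders.

```python
import mlflow

# Experiment metadata goes to the server's backend store (e.g., PostgreSQL),
# artifacts go to its artifact store (e.g., object storage).
mlflow.set_tracking_uri("http://mlflow.example.internal:5000")  # placeholder URL
mlflow.set_experiment("scaling-ml-with-spark")

with mlflow.start_run():
    mlflow.log_metric("val_accuracy", 0.91)
```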
Chapter 4 Data Ingestion, Preprocessing, and Descriptive Statistics
Data Ingestion with Spark
- work with batch and streaming data using the Spark batch API and, for streaming, either the legacy DStream API or the newer Structured Streaming API
- spark.read.format
- work with image files: MLlib converts compressed images into the OpenCV data format. It’s recommended to use the more efficient binary format.
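A short sketch of both read paths mentioned above (a regular batch read and the binary file source often preferred for images); the paths and glob pattern are placeholders, and `spark` is an existing SparkSession.

```python
# Batch read of a Parquet dataset (placeholder path)
events_df = spark.read.format("parquet").load("/data/events")

# Binary file source, often more efficient than the image source for DL workloads
images_df = (
    spark.read.format("binaryFile")
    .option("pathGlobFilter", "*.png")   # only pick up PNG files
    .load("/data/images")
)
```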
Preprocessing Data
- structured data, semi-structured data, unstructured
- MLlib data types: vector (VectorUDT: DenseVector, SparseVector), matrix (MatrixUDT)
- MLlib Transformers: pyspark.ml.feature, text data transformers (Tokenizer, RegexTokenizer, HashingTF, NGram, StopWordsRemover), categorical feature transformers (StringIndexer, IndexToString, OneHotEncoder, VectorIndexer), continuous numerical transformers (Binarizer, Bucketizer, MaxAbsScaler, MinMaxScaler, Normalizer, QuantileDiscretizer, RobustScaler, StandardScaler), and others (DCT, ElementwiseProduct, Imputer, Interaction, PCA, PolynomialExpansion, SQLTransformer, VectorAssembler); see the sketch after this list
- image data: extract labels, transform labels to indices, extract image size (Pillow, @pandas_udf — a pandas user-defined function that can run in a distributed fashion on our Spark executor, optimized with Apache Arrow and faster for grouped operations)
- save the data
- avoid the small files problem: a small file is any file that’s significantly smaller than the storage block size; repartition: creates entirely new partitions, shuffling the data over the network with the goal of distributing it evenly over the specified number of partitions; coalesce: detects the existing partitions and then shuffles only the necessary data, and can only reduce the number of partitions
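A minimal sketch chaining a few of the transformers listed above; the toy DataFrame and column names are my own, and `spark` is an existing SparkSession.

```python
from pyspark.ml.feature import Tokenizer, StringIndexer, VectorAssembler, MinMaxScaler

# Toy data with a text column, a categorical column, and a numeric column
df = spark.createDataFrame(
    [("spark is fast", "red", 10.0), ("mllib scales", "blue", 3.0)],
    ["text", "color", "amount"],
)

tokens = Tokenizer(inputCol="text", outputCol="words").transform(df)
indexed = StringIndexer(inputCol="color", outputCol="color_idx").fit(tokens).transform(tokens)

# Scalers expect a vector column, hence the VectorAssembler step
assembled = VectorAssembler(inputCols=["amount"], outputCol="amount_vec").transform(indexed)
scaled = MinMaxScaler(inputCol="amount_vec", outputCol="amount_scaled").fit(assembled).transform(assembled)

# Repartition before writing to avoid producing many tiny files
scaled.repartition(4).write.mode("overwrite").parquet("/tmp/preprocessed")
```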
Descriptive Statistics
- from pyspark.ml.stat import Summarizer
- from pyspark.ml.feature import VectorAssembler
- from pyspark.sql.functions import skewness
- from pyspark.ml.stat import Correlation
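Putting the four imports above together on a toy DataFrame (the data itself is made up):

```python
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Summarizer, Correlation
from pyspark.sql.functions import skewness

df = spark.createDataFrame([(1.0, 2.0), (3.0, 5.0), (5.0, 11.0)], ["x", "y"])
vec_df = VectorAssembler(inputCols=["x", "y"], outputCol="features").transform(df)

# Column-wise summary statistics over the assembled feature vector
vec_df.select(Summarizer.metrics("mean", "variance").summary(vec_df.features)).show(truncate=False)

# Skewness of a single column and pairwise Pearson correlations
df.select(skewness("x")).show()
Correlation.corr(vec_df, "features", "pearson").show(truncate=False)
```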
Chapter 5 Feature Engineering
- handling missing data, extracting features from text, categorical encoding, feature scaling/data normalization
MLlib Featurization Tools:
- extractors: TF-IDF, Word2Vec, CountVectorizer, FeatureHasher
- selectors: VectorSlicer, RFormula, ChiSqSelector, UnivariateFeatureSelector
The Image Featurization Process:
- RGB image vs Grayscale
- defining image boundaries using image gradients: the Laplacian operator highlights areas of rapid intensity change
Extracting Features with Spark APIs
- Spark pandas UDF types: SCALAR, SCALAR_ITER, GROUPED_MAP, GROUPED_AGG
- applyInPandas operates over a grouped Spark DataFrame, while mapInPandas maps batches of the whole DataFrame; both return a new DataFrame (see the sketch after this list)
- Compared to a pandas DataFrame, a Spark DataFrame supports parallel execution and lazy evaluation, and it is immutable.
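A small sketch of a SCALAR pandas UDF and a grouped applyInPandas call; the toy data and function names are mine.

```python
import pandas as pd
from pyspark.sql.functions import pandas_udf

# SCALAR pandas UDF: receives a pandas Series per Arrow batch on the executors
@pandas_udf("double")
def add_tax(price: pd.Series) -> pd.Series:
    return price * 1.08

prices = spark.createDataFrame([(10.0,), (20.0,)], ["price"])
prices.withColumn("price_with_tax", add_tax("price")).show()

# applyInPandas: each group arrives as a pandas DataFrame, and a new Spark DataFrame is returned
def demean(pdf: pd.DataFrame) -> pd.DataFrame:
    pdf["value"] = pdf["value"] - pdf["value"].mean()
    return pdf

grouped = spark.createDataFrame([("a", 1.0), ("a", 3.0), ("b", 5.0)], ["key", "value"])
grouped.groupBy("key").applyInPandas(demean, schema="key string, value double").show()
```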
The Text Featurization Process
- bag-of-words, tf-idf, n-gram, topic extraction
Feature stores
- In 2017, Uber introduced the concept of a feature store with its ML platform, Michelangelo
- store features in optimized, relatively fast storage, with comprehensive metadata to allow fast access and low-latency querying by the ML algorithm
- help with caching, deal with data types, and bridge multiple platforms
Chapter 6 Training Models with Spark MLlib
Supervised ML
Classification:
- binary, multiclass, multilabel
- APIs: LogisticRegression, DecisionTreeClassifier, RandomForestClassifier, GBTClassifier, MultilayerPerceptronClassifier, LinearSVC, OneVsRest, NaiveBayes, FMClassifier
- Multilabel classification: not supported directly by MLlib; use PyTorch/TensorFlow, or train a binary classifier for each label
- imbalanced class labels: filter the over-represented classes and sample them to downsize the number of entries, or use ensemble algorithms based on decision trees such as GBTClassifier, GBTRegressor, or RandomForestClassifier and set featureSubsetStrategy to auto (see the sketch after this list)
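A rough sketch of the downsampling idea, assuming a binary `train_df` with `features` and `label` columns where label 0 dominates; the sampling fraction is arbitrary.

```python
from pyspark.ml.classification import GBTClassifier

# Downsample the majority class (label 0) before training
majority = train_df.filter("label = 0").sample(fraction=0.1, seed=42)
minority = train_df.filter("label = 1")
balanced = majority.unionByName(minority)

gbt = GBTClassifier(
    featuresCol="features",
    labelCol="label",
    featureSubsetStrategy="auto",
    maxIter=20,
)
model = gbt.fit(balanced)
```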
Regression:
- simple, multiple, multivariate
- FeatureHasher: speeds up the process of indexing and hashing; its output is of type SparseVector
- UnivariateFeatureSelector: select features automatically
- AFTSurvivalRegression, DecisionTreeRegressor, GBTRegressor, FMRegression
Recommendation Systems
- Content-based, Collaborative filtering, Neural networks
- ALS for collaborative filtering
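A minimal ALS sketch on made-up ratings; the column names and hyperparameters are placeholders.

```python
from pyspark.ml.recommendation import ALS

ratings = spark.createDataFrame(
    [(0, 10, 4.0), (0, 11, 1.0), (1, 10, 5.0), (1, 12, 3.0)],
    ["userId", "itemId", "rating"],
)

als = ALS(
    userCol="userId", itemCol="itemId", ratingCol="rating",
    rank=5, maxIter=5, coldStartStrategy="drop",
)
model = als.fit(ratings)
model.recommendForAllUsers(2).show(truncate=False)
```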
Unsupervised ML
- Frequent Pattern Mining
- Clustering: LDA, GaussianMixture, KMeans, BisectingKMeans, PowerIterationClustering (the Lin and Cohen algorithm); compare the likelihood scores between models.
Evaluating
- Supervised Evaluators: confusion matrix; BinaryClassificationEvaluator, MulticlassClassificationEvaluator (hammingLoss), MultilabelClassificationEvaluator (microPrecision, microRecall), RegressionEvaluator (mse, rmse), RankingEvaluator (RankingMetrics API)
- Unsupervised Evaluators: ClusteringEvaluator computes the silhouette measure.
Hyperparameters and Tuning Experiments
- Building a parameter grid: ParamGridBuilder()
- by default, TrainValidationSplit uses 75% of the data for training and 25% for validation
- CrossValidator: k-fold cross-validation over the parameter grid (a minimal sketch follows)
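A minimal tuning sketch, assuming a `train_df` with `features` and `label` columns; the grid values are arbitrary.

```python
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

lr = LogisticRegression(featuresCol="features", labelCol="label")

grid = (ParamGridBuilder()
        .addGrid(lr.regParam, [0.01, 0.1])
        .addGrid(lr.maxIter, [50, 100])
        .build())

cv = CrossValidator(
    estimator=lr,
    estimatorParamMaps=grid,
    evaluator=BinaryClassificationEvaluator(labelCol="label"),
    numFolds=3,
)
cv_model = cv.fit(train_df)   # the best model is available as cv_model.bestModel
```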
Machine Learning Pipelines:
- Transformer and Estimator
- Constructing a pipeline with a Pipeline object
- Persistence: .write().save(path); you can also export the model to a portable format like ONNX
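A short pipeline and persistence sketch, again assuming a `train_df` whose column names (`color`, `amount`, `label`) are placeholders.

```python
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression

pipeline = Pipeline(stages=[
    StringIndexer(inputCol="color", outputCol="color_idx"),
    VectorAssembler(inputCols=["color_idx", "amount"], outputCol="features"),
    LogisticRegression(featuresCol="features", labelCol="label"),
])

model = pipeline.fit(train_df)

# Persist the fitted pipeline and reload it elsewhere
model.write().overwrite().save("/models/demo_pipeline")
reloaded = PipelineModel.load("/models/demo_pipeline")
```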
Chapter 7 Bridging Spark and Deep Learning Frameworks
Two Clusters Approach
- One dedicated cluster runs all the Spark workloads; the data is saved into a distributed filesystem or object store and used in the second cluster — the dedicated DL cluster.
Implementing a Dedicated Data Access Layer (DAL)
- A DAL is responsible for the accessibility of the storage.
- Features of a DAL: scalable and supports distributed systems; discoverability with a dedicated data catalog like Hive Metastore; support for columnar formats; allows row filtering; data versioning
Petastorm
- Open source data access library that allows us to train and evaluate deep learning models directly using multiterabyte datasets in Apache Parquet format.
- supports efficient implementations of row filtering, data sharding, and shuffling; can access a subset of fields and handle time-series data
- Two options to use Petastorm: 1) leverage it as a converter or translator and keep the data in strict Parquet format; 2) integrate the Petastorm format with the Apache Parquet store, saving the data into a dedicated Petastorm dataset
- SparkDatasetConverter: reads data from cache or persists it in Parquet. It caches intermediate files and cleans out the cache. Inputs include parquet_row_group_size_bytes, compression_codec, and dtype. We can define additional preprocessing functions using TransformSpec. (See the sketch after this list.)
- Petastorm as a Parquet store: define a new schema with UnischemaField; supports various codec types for images, scalar data, etc.
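A minimal converter sketch (option 1 above), assuming a Spark DataFrame `train_df` and a local cache directory; both are placeholders.

```python
from petastorm.spark import SparkDatasetConverter, make_spark_converter

# Cache location for the intermediate Parquet files the converter materializes
spark.conf.set(
    SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF,
    "file:///tmp/petastorm_cache",
)

converter = make_spark_converter(train_df)

with converter.make_tf_dataset(batch_size=32) as dataset:
    # dataset is a tf.data.Dataset that can be fed straight into model.fit(...)
    pass

converter.delete()   # clean up the cached files when done
```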
Project Hydrogen
- improve Apache Spark’s support for deep learning/neural network distributed training
- Barrier Execution Mode: create gates or barriers between sets of operations and make the operations across barriers sequential
- Accelerator-Aware Scheduling: validate that the distributed system scheduler, responsible for managing the operations and available resources, is aware of the availability of GPUs for hardware acceleration
Horovod Estimator API
- Horovod’s goal is to allow a single-GPU training script to be scaled out across multiple GPUs
- The Horovod Estimator hides the complexity of gluing Spark DataFrames to a deep learning training script.
- Horovod helps us configure GPUs and define a distributed training strategy.
Chapter 8 TensorFlow Distributed Machine Learning Approach
TensorFlow Basics:
- tf.Tensor (immutable multidimensional array), tf.Variable (mutable), tf.Operation (a node in a graph that performs some computation), tf.function (a decorator that compiles a function into a callable TF graph), tf.Graph (graph execution — operations are added to a tf.Graph to be executed later), tf.Module, model.compile (configures a model for training, specifying the loss function, optimizer, and metrics)
- Neural networks: tf.keras.layers.Layer, tf.keras.Model
- TensorFlow cluster process roles and responsibilities: worker, parameter server (PS — keeps track of variables’ values and state), chief (similar to a worker but with additional responsibilities related to cluster health), evaluator
Loading Parquet Data into a TensorFlow DataSet
- tf.data.Dataset. A TF dataset is an abstraction of the actual data.
- TF doesn’t support loading Parquet out of the box.
- Petastorm: use the make_petastorm_dataset function that creates a tf.data.Dataset instance, using an instance of petastorm.reader.Reader.
- Use make_batch_reader to create the reader: it normalizes the dataset URL, locates filesystem paths, analyzes the Parquet metadata, validates the cache format, determines the reader pool type, and returns the reader instance (which encapsulates ArrowReaderWorker, which in turn uses pyarrow); see the sketch below
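A short sketch of that flow, assuming an existing (plain) Parquet directory; the path is a placeholder.

```python
from petastorm import make_batch_reader
from petastorm.tf_utils import make_petastorm_dataset

# Stream a non-Petastorm Parquet directory straight into a tf.data.Dataset
with make_batch_reader("file:///tmp/train_parquet") as reader:
    dataset = make_petastorm_dataset(reader)
    for batch in dataset.take(1):
        print(batch)   # a named tuple of columns, batched per Parquet row group
```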
TensorFlow’s Distributed Machine Learning Strategies (tf.distribute.Strategy)
- ParameterServerStrategy: the oldest approach; each machine takes on the role of a worker or a parameter server, and TF divides the tasks into worker tasks and parameter server tasks. Rules: variables are stored on parameter servers and are read/updated by workers during training; each variable is stored on a single parameter server; workers perform tasks independently and communicate only with parameter servers via asynchronous remote procedure calls (RPCs); there may be one or multiple parameter servers.
- CentralStorageStrategy (one machine, multiple processors): one machine with multiple CPUs and GPUs where CPUs hold variables and GPUs execute operations.
- MirroredStrategy (one machine, multiple processors, local copy): every processor that holds a replica of the training operations’ logic also holds its own local copy of every variable. Use an all-reduce algorithm to ensure identical updates across processors.
- MultiWorkerMirroredStrategy (multiple machines, synchronous): synchronous training distributed across machines, each of which can have more than one processor. Each variable is replicated and synced across machines and processors. Good when there is good connectivity between machines.
- TPUStrategy: for synchronous training on TPUs and TPU pods, supports communication between individual processors within a machine.
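All of the strategies listed above share the same usage pattern: build and compile the model inside strategy.scope(). A minimal sketch with MirroredStrategy (the model itself is a throwaway example of mine):

```python
import tensorflow as tf

strategy = tf.distribute.MirroredStrategy()   # swap in another strategy as needed

with strategy.scope():
    # Variables created here are mirrored/placed according to the strategy
    model = tf.keras.Sequential([
        tf.keras.layers.Dense(64, activation="relu", input_shape=(10,)),
        tf.keras.layers.Dense(1),
    ])
    model.compile(optimizer="adam", loss="mse", metrics=["mae"])

# model.fit(train_dataset, epochs=5)   # train_dataset would be a tf.data.Dataset
```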
Training APIs
- Keras API
- Custom Training Loop: TF’s custom training loop API provides granular control over training and evaluating loops built from scratch.
- Estimator API: This is an old API that should not be used for new code.
Chapter 9 PyTorch Distributed Machine Learning Approach
PyTorch Overview
- Computation Graph: dynamic; the graph is built at run time and execution starts before the graph is complete; the automatic differentiation engine (autograd) records operations as they execute and computes the gradients automatically during the backward pass
- PyTorch Mechanics and Concepts: torch.Tensor, torch.autograd, AutogradMeta, Variable, torch.layout, torch.mm, torch.utils.data.DataLoader, torch.optim.Optimizer
PyTorch Distributed Strategies for Training Models
- PyTorch’s Distributed Approach: torch.distributed, three main components: 1) distributed data-parallel training (DDP): data parallelism at the application module level; 2) RPC-based distributed training: supports the training process at a higher level and provides mechanisms to enable remote communication between machines; 3) collective communication (c10d): expands the communication structure and supports sending tensors across processes within a group
- Distributed Data-Parallel Training: divides communication into buckets, which hold the indices of where each value in the input belongs and can consist of multiple layers or one layer (see the sketch after this list)
- RPC-Based Distributed Training: enables a program on the local machine to start a program on another machine; enables the implementation of a parameter server, model parallelism, and pipeline parallelism; the APIs can be grouped into four categories: remote execution (init_rpc), remote references (RRefs: a distributed shared pointer to an object), distributed autograd (stitches together the local autograd engines on all the machines so the backward pass can run in a distributed fashion), and the distributed optimizer
- Communication Topologies: collective communication is defined as any communication that involves a group of processes/machines; all_reduce, all_gather, peer-to-peer communication APIs; guidelines for which backend to use: rule of thumb use NCCL for distributed GPU training and use Gloo for distributed CPU training; collective communication APIs: scatter, gather, reduce, all_reduce, all_gather, broadcast; peer-to-peer communication: send, recv, isend, irecv
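A minimal DDP sketch, assuming the script is launched with a multi-process launcher such as torchrun (which sets the rank/world-size environment variables); the tiny model and random data are placeholders.

```python
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP

def train():
    dist.init_process_group(backend="gloo")   # use "nccl" for distributed GPU training
    model = torch.nn.Linear(10, 1)
    ddp_model = DDP(model)   # gradients are all-reduced across processes in buckets

    optimizer = torch.optim.SGD(ddp_model.parameters(), lr=0.01)
    inputs, targets = torch.randn(32, 10), torch.randn(32, 1)

    loss = torch.nn.functional.mse_loss(ddp_model(inputs), targets)
    loss.backward()
    optimizer.step()

    dist.destroy_process_group()

if __name__ == "__main__":
    train()
```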
Loading Data with PyTorch and Petastorm:
- open a Petastorm reader on the Parquet file directory and create a PyTorch DataLoader based on this reader (see the sketch after this list)
- Limitations: NumPy strings, arrays of strings, object arrays, and object classes are not supported; the data needs to be processed first
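A minimal sketch, assuming the data was written as a Petastorm dataset (as in Chapter 7); the path is a placeholder.

```python
from petastorm import make_reader
from petastorm.pytorch import DataLoader

with DataLoader(make_reader("file:///tmp/petastorm_train"), batch_size=64) as loader:
    for batch in loader:
        # each batch holds the dataset's columns as torch tensors
        break
```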
Troubleshooting Guidance
- mismatched data types
- straggling workers
PyTorch vs. TensorFlow
- viz and debugging: TensorFlow has better tools
- computation graph: dynamic vs. static
- programming limitations: PyTorch is less generic; TensorFlow has lots of boilerplate code
- language support for model loading: TensorFlow supports model loading in other languages like Java and C++
- supported deployment options: TorchScript vs. TensorFlow Serving, a Flask web server, mobile.
Chapter 10 Deployment Patterns for Machine Learning Models
Deployment Patterns
- Batch Prediction: Pros — easy to implement, scales easily to large amounts of data, results are returned quickly. Cons — harder to scale with complex inputs, might not be able to compute all possible outputs in the time available, users might get outdated or stale predictions
- Model-in-Service: the model is packaged up with the client-facing app and deployed to a web server; the server loads the model and makes predictions in real time. Issues: the web server backend may be written in a different language, coupling the app and the model means they share the same release schedule, the model may compete for resources with other functions, the web server hardware is not optimized for models, hard to scale
- Model-as-a-Service: deploy the model itself as a service to avoid coupling with the backend server hardware and possibly conflicting scaling requirements. Drawbacks: add latency to every call, add the complexity of deploying a new service and managing the interaction with it, need to monitor this service
- Which pattern to use: the batch processing method has the highest throughput and highest latency, while real-time methods have the lowest throughput and lowest latency; consider the level of guarantees around missing a deadline: hard, firm, or soft. Real-time vs. near real-time.
Production Software Requirements:
- model application deployment, model package deployment, dependency management, model runtime environment, REST APIs, performance optimization, concurrency, model distillation/compression, caching layer, horizontal scaling, managed options
Monitoring ML in Production
- Data Drift: instantaneous drift, gradual drift, periodic drift, temporary drift
- Model Drift, Concept Drift
- Distributional Domain Shift
- What Metrics to Monitor? model metrics, business metrics, model predictions vs. actual behavior, hardware/networking metrics
- How to Measure Change? Define a reference and measure it against fresh metric values — rule-based distance metrics, D1 distance, Kolmogorov-Smirnov statistics, KL divergence
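For instance, a Kolmogorov-Smirnov check on a single feature could look like this sketch (synthetic data stands in for the logged reference and current windows):

```python
import numpy as np
from scipy.stats import ks_2samp

reference = np.random.normal(loc=0.0, scale=1.0, size=5_000)  # stand-in for the reference window
current = np.random.normal(loc=0.3, scale=1.0, size=5_000)    # stand-in for fresh production values

statistic, p_value = ks_2samp(reference, current)
if p_value < 0.01:
    print(f"Possible data drift detected (KS statistic = {statistic:.3f})")
```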
The Production Feedback Loop
- two data pipelines: one compares recent data to the reference data to detect data drift, and one compares the model’s predictions with the actual results to detect model drift
Deploying with MLlib
- MLlib’s model format holds metadata as well as the actual data of the model; all this info can help you load the model in a different environment
- Structured Streaming: set up the pipeline with a schema, a stream reader, and the ML model; you can capture predictions using two data streams and run a stream-stream join on the two streaming DataFrames, which is hard to do efficiently; think of it as micro-batch joins
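A rough sketch of streaming inference with a persisted MLlib pipeline; the paths and `input_schema` are placeholders.

```python
from pyspark.ml import PipelineModel

model = PipelineModel.load("/models/demo_pipeline")        # placeholder path

stream_df = (spark.readStream
             .schema(input_schema)        # streaming file sources require an explicit schema
             .parquet("/data/incoming"))  # placeholder directory

predictions = model.transform(stream_df)

query = (predictions.writeStream
         .format("parquet")
         .option("path", "/data/predictions")
         .option("checkpointLocation", "/data/checkpoints/predictions")
         .start())
```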
Deploying with MLflow
- Two deployment options: as a microservice and as a UDF in a Spark flow
- Define an ML wrapper: mlflow.pyfunc.PyFuncModel wraps the model and its metadata (the MLmodel file); the interface has three functions: __init__, load_context, and predict (predict must be implemented every time)
- Deploying the model as a microservice: mlflow.deployments.BaseDeploymentClient exposes APIs that enable deployment to custom serving tools
- Loading the model as a Spark UDF: use the spark_udf function to combine the UDF with a Spark DataFrame and create a new Spark DataFrame with the predictions
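A short sketch of the UDF option, assuming a registered model and an existing input DataFrame `df`; the model URI is a placeholder.

```python
import mlflow.pyfunc
from pyspark.sql.functions import struct

# Load the model as a Spark UDF and score the DataFrame
predict_udf = mlflow.pyfunc.spark_udf(spark, model_uri="models:/churn-classifier/Production")

scored_df = df.withColumn("prediction", predict_udf(struct(*df.columns)))
```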
Develop Your System Iteratively
- Crawl: manual deployment operation
- Walk: add automated testing and other automation
- Run: add more testing, fine-tune the alerts, capture any changes in data, monitor throughput and results
- Fly: connect the feedback loop, so that alerts and captured data drift or changes in production trigger a new training process