You can run this notebook in a live session on Binder or view it on GitHub.

Parallel and Distributed Machine Learning

Dask-ML has resources for parallel and distributed machine learning.

Types of Scaling

There are a couple of distinct scaling problems you might face. The scaling strategy depends on which problem you’re facing.

  • CPU-bound: Data fits in RAM, but training takes too long. Many hyperparameter combinations, a large ensemble of many models, etc.

  • Memory-bound: Data is larger than RAM, and sampling isn’t an option.


  • For in-memory problems, just use scikit-learn (or your favorite ML library).

  • For large models, use your favorite scikit-learn estimator with the Dask joblib backend (demonstrated below).

  • For large datasets, use dask_ml estimators.

Scikit-Learn in 5 Minutes

Scikit-Learn has a nice, consistent API.

  • You instantiate an Estimator (e.g. LinearRegression, RandomForestClassifier, etc.). All of the model's hyperparameters (user-specified parameters, not the ones learned by the estimator) are passed to the estimator when it's created.

  • You call estimator.fit(X, y) to train the estimator.

  • You use the estimator to inspect attributes, make predictions, etc. (a minimal sketch of this pattern follows the list).
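To make that pattern concrete, here is a minimal illustrative sketch (an addition, not part of the original notebook) using LinearRegression on a tiny made-up dataset; the names X_demo and y_demo are invented for the example:

[ ]:
    import numpy as np
    from sklearn.linear_model import LinearRegression

    # Tiny made-up dataset, just to show the shape of the API.
    X_demo = np.array([[0.0], [1.0], [2.0], [3.0]])
    y_demo = np.array([0.0, 1.0, 2.0, 3.0])

    est = LinearRegression(fit_intercept=True)  # 1. instantiate, passing hyperparameters
    est.fit(X_demo, y_demo)                     # 2. train on data
    est.coef_, est.predict([[4.0]])             # 3. inspect learned attributes / predict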

Let’s generate some random data.

[ ]:
    from sklearn.datasets import make_classification

    X, y = make_classification(n_samples=10000, n_features=4, random_state=0)
    X[:8]

[ ]:
    y[:8]

We’ll fit a Support Vector Classifier (SVC).

[ ]:
    from sklearn.svm import SVC

Create the estimator and fit it.

[ ]:
    estimator = SVC(random_state=0)
    estimator.fit(X, y)

Inspect the learned attributes.

[ ]:
    estimator.support_vectors_[:4]

Check the accuracy.

[ ]:
    estimator.score(X, y)

Hyperparameters

Most models have hyperparameters. They affect the fit, but are specified up front instead of learned during training.
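As a side note (an addition, not in the original notebook), every scikit-learn estimator reports its hyperparameters and their current values via get_params(), which is handy when deciding what to tune:

[ ]:
    from sklearn.svm import SVC

    # All hyperparameters SVC accepts, with the values this instance will use.
    SVC(C=0.00001, shrinking=False, random_state=0).get_params()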

[ ]:
    estimator = SVC(C=0.00001, shrinking=False, random_state=0)
    estimator.fit(X, y)
    estimator.support_vectors_[:4]

[ ]:
    estimator.score(X, y)

Hyperparameter Optimization

There are a few ways to learn the best hyperparameters while training. One is GridSearchCV. As the name implies, this does a brute-force search over a grid of hyperparameter combinations. With two values of C, two kernels, and two CV folds, the search below fits 2 × 2 × 2 = 8 models in total (plus a final refit on the full dataset).

[ ]:
    from sklearn.model_selection import GridSearchCV

[ ]:
    %%time
    estimator = SVC(gamma='auto', random_state=0, probability=True)
    param_grid = {
        'C': [0.001, 10.0],
        'kernel': ['rbf', 'poly'],
    }

    grid_search = GridSearchCV(estimator, param_grid, verbose=2, cv=2)
    grid_search.fit(X, y)
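Once the grid search above has been fit, the per-combination results can be inspected through cv_results_. The cell below is an illustrative addition that loads them into a pandas DataFrame and keeps just the columns of interest:

[ ]:
    import pandas as pd

    # One row per hyperparameter combination; mean_test_score averages over the CV folds.
    pd.DataFrame(grid_search.cv_results_)[['param_C', 'param_kernel', 'mean_test_score']]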

Single-machine parallelism with scikit-learn


Scikit-Learn has nice single-machine parallelism, via Joblib. Any scikit-learn estimator that can operate in parallel exposes an n_jobs keyword. This controls the number of CPU cores that will be used.

[ ]:
    %%time
    grid_search = GridSearchCV(estimator, param_grid, verbose=2, cv=2, n_jobs=-1)
    grid_search.fit(X, y)

Multi-machine parallelism with Dask


Dask can talk to scikit-learn (via joblib) so that your cluster is used to train a model.

If you run this on a laptop, it will take quite some time, but the CPU usage will be satisfyingly near 100% for the duration. To run faster, you would need a distributed cluster. That would mean passing the scheduler's address to the Client call, something like

    c = Client('tcp://my.scheduler.address:8786')

Details on the many ways to create a cluster can be found in the Dask documentation on deploying clusters.
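For reference only, one such way is to start a LocalCluster explicitly and connect a Client to it. The cell below is an illustrative addition (the worker counts are arbitrary) and isn't meant to run alongside the default Client created further down:

[ ]:
    from dask.distributed import Client, LocalCluster

    # Four single-threaded worker processes on this machine.
    # On a real deployment you would instead connect to a running scheduler,
    # e.g. Client('tcp://my.scheduler.address:8786').
    cluster = LocalCluster(n_workers=4, threads_per_worker=1)
    client = Client(cluster)
    client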

Let’s try it on a larger problem (more hyperparameters).

[ ]:
    import joblib
    import dask.distributed

    c = dask.distributed.Client()

[ ]:
    param_grid = {
        'C': [0.001, 0.1, 1.0, 2.5, 5, 10.0],
        # Uncomment this for larger Grid searches on a cluster
        # 'kernel': ['rbf', 'poly', 'linear'],
        # 'shrinking': [True, False],
    }

    grid_search = GridSearchCV(estimator, param_grid, verbose=2, cv=5, n_jobs=-1)

[ ]:
    %%time
    with joblib.parallel_backend("dask", scatter=[X, y]):
        grid_search.fit(X, y)

[ ]:
    grid_search.best_params_, grid_search.best_score_

Training on Large Datasets

Sometimes you’ll want to train on a larger-than-memory dataset. dask-ml has implemented estimators that work well on dask arrays and dataframes that may be larger than your machine’s RAM.

[ ]:
    import dask.array as da
    import dask.delayed
    from sklearn.datasets import make_blobs
    import numpy as np

We’ll make a small (random) dataset locally using scikit-learn.

[ ]:
    n_centers = 12
    n_features = 20

    X_small, y_small = make_blobs(n_samples=1000, centers=n_centers, n_features=n_features, random_state=0)

    centers = np.zeros((n_centers, n_features))

    for i in range(n_centers):
        centers[i] = X_small[y_small == i].mean(0)

    centers[:4]

The small dataset will be the template for our large random dataset. We’ll use dask.delayed to adapt sklearn.datasets.make_blobs, so that the actual dataset is being generated on our workers.

[ ]:
    n_samples_per_block = 200000
    n_blocks = 500

    delayeds = [dask.delayed(make_blobs)(n_samples=n_samples_per_block,
                                         centers=centers,
                                         n_features=n_features,
                                         random_state=i)[0]
                for i in range(n_blocks)]
    # Use X_small's dtype here, since X is reassigned just below.
    arrays = [da.from_delayed(obj, shape=(n_samples_per_block, n_features), dtype=X_small.dtype)
              for obj in delayeds]
    X = da.concatenate(arrays)
    X
[ ]:
    X = X.persist()  # Only run this on the cluster.
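As a quick sanity check (an addition to the original notebook), the array's metadata already tells us how much data the delayed calls will generate, without computing anything:

[ ]:
    # 500 blocks x 200,000 samples x 20 features x 8 bytes is about 16 GB,
    # comfortably larger than the RAM of most laptops.
    X.nbytes / 1e9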

The algorithms implemented in Dask-ML are scalable. They handle larger-than-memory datasets just fine.

They follow the scikit-learn API, so if you’re familiar with scikit-learn, you’ll feel at home with Dask-ML.

[ ]:
    from dask_ml.cluster import KMeans

[ ]:
    clf = KMeans(init_max_iter=3, oversampling_factor=10)

[ ]:
    %time clf.fit(X)

[ ]:
    clf.labels_

[ ]:
    clf.labels_[:10].compute()
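Assuming the fit above completed, the learned cluster centers are also available as a small in-memory array. This last inspection cell is an addition; note that KMeans used its default n_clusters=8, since we never told it the data came from 12 blobs:

[ ]:
    # The fitted centers live on the client as a regular (small) array.
    clf.cluster_centers_[:4]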