You can run this notebook in a live session or view it on GitHub.
Parallel and Distributed Machine Learning
Dask-ML has resources for parallel and distributed machine learning.
Types of Scaling
There are a couple of distinct scaling problems you might face. The scaling strategy depends on which problem you’re facing.
- CPU-bound: Data fits in RAM, but training takes too long. Many hyperparameter combinations, a large ensemble of many models, etc.
- Memory-bound: Data is larger than RAM, and sampling isn’t an option.
- For in-memory problems, just use scikit-learn (or your favorite ML library).
- For large models, use dask_ml.joblib and your favorite scikit-learn estimator.
- For large datasets, use dask_ml estimators.
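As a rough orientation, the sketch below shows where each of these approaches lives; the particular estimators are illustrative choices only, not the methods used later in this notebook.
- [ ]:
- # In-memory data: plain scikit-learn
- from sklearn.linear_model import LogisticRegression
- # CPU-bound (data fits in RAM, training is slow): scikit-learn estimators driven
- # through joblib's Dask backend (demonstrated later in this notebook)
- import joblib
- # Memory-bound (data larger than RAM): Dask-ML's own estimators
- from dask_ml.linear_model import LogisticRegression as DaskLogisticRegression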
Scikit-Learn in 5 Minutes
Scikit-Learn has a nice, consistent API.
1. You instantiate an Estimator (e.g. LinearRegression, RandomForestClassifier, etc.). All of the model’s hyperparameters (user-specified parameters, not the ones learned by the estimator) are passed to the estimator when it’s created.
2. You call estimator.fit(X, y) to train the estimator.
3. Use estimator to inspect attributes, make predictions, etc.
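As a compact, hypothetical illustration of that pattern, with placeholder data and an arbitrary estimator:
- [ ]:
- # A minimal sketch of the instantiate / fit / inspect pattern
- from sklearn.datasets import make_classification
- from sklearn.linear_model import LogisticRegression
- X_demo, y_demo = make_classification(n_samples=100, random_state=0)
- est = LogisticRegression()               # 1. instantiate, passing hyperparameters
- est.fit(X_demo, y_demo)                  # 2. train
- est.coef_, est.score(X_demo, y_demo)     # 3. inspect learned attributes / evaluate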
Let’s generate some random data.
- [ ]:
- from sklearn.datasets import make_classification
- X, y = make_classification(n_samples=10000, n_features=4, random_state=0)
- X[:8]
- [ ]:
- y[:8]
We’ll fit a Support Vector Classifier (SVC).
- [ ]:
- from sklearn.svm import SVC
Create the estimator and fit it.
- [ ]:
- estimator = SVC(random_state=0)
- estimator.fit(X, y)
Inspect the learned attributes.
- [ ]:
- estimator.support_vectors_[:4]
Check the accuracy.
- [ ]:
- estimator.score(X, y)
Hyperparameters
Most models have hyperparameters. They affect the fit, but are specified up front instead of learned during training.
- [ ]:
- estimator = SVC(C=0.00001, shrinking=False, random_state=0)
- estimator.fit(X, y)
- estimator.support_vectors_[:4]
- [ ]:
- estimator.score(X, y)
Hyperparameter Optimization
There are a few ways to learn the best hyperparameters while training. One is GridSearchCV. As the name implies, this does a brute-force search over a grid of hyperparameter combinations.
- [ ]:
- from sklearn.model_selection import GridSearchCV
- [ ]:
- %%time
- estimator = SVC(gamma='auto', random_state=0, probability=True)
- param_grid = {
- 'C': [0.001, 10.0],
- 'kernel': ['rbf', 'poly'],
- }
- grid_search = GridSearchCV(estimator, param_grid, verbose=2, cv=2)
- grid_search.fit(X, y)
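Once the search finishes, the per-combination results can be inspected. This follow-up cell is an optional addition (it assumes pandas is installed):
- [ ]:
- # Collect the cross-validation results into a DataFrame for easier inspection
- import pandas as pd
- pd.DataFrame(grid_search.cv_results_)[['param_C', 'param_kernel', 'mean_test_score']]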
Single-machine parallelism with scikit-learn
Scikit-Learn has nice single-machine parallelism via Joblib. Any scikit-learn estimator that can operate in parallel exposes an n_jobs keyword, which controls the number of CPU cores that will be used.
- [ ]:
- %%time
- grid_search = GridSearchCV(estimator, param_grid, verbose=2, cv=2, n_jobs=-1)
- grid_search.fit(X, y)
Multi-machine parallelism with Dask
Dask can talk to scikit-learn (via joblib) so that your cluster is used to train a model.
If you run this on a laptop, it will take quite some time, but the CPU usage will be satisfyingly near 100% for the duration. To run faster, you would need a distributed cluster. That would mean passing a scheduler address to the call to Client, something like
- c = Client('tcp://my.scheduler.address:8786')
Details on the many ways to create a cluster can be found here.
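For example, one way to stand up a local cluster explicitly is shown below, as an alternative to the bare Client() used in the next cell; the worker counts here are arbitrary choices for illustration.
- [ ]:
- # A LocalCluster with an explicit worker layout; Client() with no arguments does
- # roughly the same thing with defaults chosen for your machine
- from dask.distributed import LocalCluster, Client
- cluster = LocalCluster(n_workers=4, threads_per_worker=1)
- client = Client(cluster)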
Let’s try it on a larger problem (more hyperparameters).
- [ ]:
- import joblib
- import dask.distributed
- c = dask.distributed.Client()
- [ ]:
- param_grid = {
- 'C': [0.001, 0.1, 1.0, 2.5, 5, 10.0],
- # Uncomment this for larger Grid searches on a cluster
- # 'kernel': ['rbf', 'poly', 'linear'],
- # 'shrinking': [True, False],
- }
- grid_search = GridSearchCV(estimator, param_grid, verbose=2, cv=5, n_jobs=-1)
- [ ]:
- %%time
- with joblib.parallel_backend("dask", scatter=[X, y]):
- grid_search.fit(X, y)
- [ ]:
- grid_search.best_params_, grid_search.best_score_
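As an optional follow-up, the refit best estimator can be used directly; GridSearchCV refits the winning combination on the full data by default:
- [ ]:
- # The refit model is exposed as best_estimator_ and can predict immediately
- grid_search.best_estimator_.predict(X[:5])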
Training on Large Datasets
Sometimes you’ll want to train on a larger-than-memory dataset. dask-ml has implemented estimators that work well on dask arrays and dataframes that may be larger than your machine’s RAM.
- [ ]:
- import dask.array as da
- import dask.delayed
- from sklearn.datasets import make_blobs
- import numpy as np
We’ll make a small (random) dataset locally using scikit-learn.
- [ ]:
- n_centers = 12
- n_features = 20
- X_small, y_small = make_blobs(n_samples=1000, centers=n_centers, n_features=n_features, random_state=0)
- centers = np.zeros((n_centers, n_features))
- for i in range(n_centers):
- centers[i] = X_small[y_small == i].mean(0)
- centers[:4]
The small dataset will be the template for our large random dataset. We’ll use dask.delayed to adapt sklearn.datasets.make_blobs, so that the actual dataset is being generated on our workers.
- [ ]:
- n_samples_per_block = 200000
- n_blocks = 500
- delayeds = [dask.delayed(make_blobs)(n_samples=n_samples_per_block,
- centers=centers,
- n_features=n_features,
- random_state=i)[0]
- for i in range(n_blocks)]
- arrays = [da.from_delayed(obj, shape=(n_samples_per_block, n_features), dtype=X_small.dtype)
- for obj in delayeds]
- X = da.concatenate(arrays)
- X
- [ ]:
- X = X.persist() # Only run this on the cluster.
The algorithms implemented in Dask-ML are scalable. They handle larger-than-memory datasets just fine.
They follow the scikit-learn API, so if you’re familiar with scikit-learn, you’ll feel at home with Dask-ML.
- [ ]:
- from dask_ml.cluster import KMeans
- [ ]:
- clf = KMeans(init_max_iter=3, oversampling_factor=10)
- [ ]:
- %time clf.fit(X)
- [ ]:
- clf.labels_
- [ ]:
- clf.labels_[:10].compute()
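As an optional follow-up (not part of the original walkthrough), the fitted cluster centers can be compared with the true centers computed earlier:
- [ ]:
- # The learned centers should be close to a permutation of the `centers` template
- clf.cluster_centers_[:4]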