`dask-glm` is a library for fitting generalized linear models on large datasets.
The heart of the project is the set of optimization routines that work on either NumPy or dask arrays.
See these two blogposts describing how dask-glm works internally.

This notebook shows an example of the higher-level, scikit-learn-style API built on top of these optimization routines.

In [1]:
```
import os
import s3fs
import pandas as pd
import dask.array as da
import dask.dataframe as dd
from distributed import Client
from dask import persist, compute
from dask_glm.estimators import LogisticRegression
```

We'll start a `distributed.Client` locally. In the real world you could connect to a cluster of dask-workers.

In [2]:
```
client = Client()
```

In [3]:
```
# download one month of NYC taxi data, if we don't have it already
if not os.path.exists('trip.csv'):
    s3 = s3fs.S3FileSystem(anon=True)
    s3.get("dask-data/nyc-taxi/2015/yellow_tripdata_2015-01.csv", "trip.csv")
```

In [4]:
```
ddf = dd.read_csv("trip.csv")
ddf = ddf.repartition(npartitions=8)
```

In [5]:
```
# these filter out less than 1% of the observations
ddf = ddf[(ddf.trip_distance < 20) &
          (ddf.fare_amount < 150)]
ddf = ddf.repartition(npartitions=8)
```

In [6]:
```
df_train, df_test = ddf.random_split([0.8, 0.2], random_state=2)
columns = ['VendorID', 'passenger_count', 'trip_distance', 'payment_type', 'fare_amount']
X_train, y_train = df_train[columns], df_train['tip_amount'] > 0
X_test, y_test = df_test[columns], df_test['tip_amount'] > 0
X_train, y_train, X_test, y_test = persist(
    X_train, y_train, X_test, y_test
)
```

In [7]:
```
X_train.head()
```

In [8]:
```
y_train.head()
```

In [9]:
```
print(f"{len(X_train):,d} observations")
```

With the data prepared, we can fit the model. The API will feel familiar to anyone who has used `scikit-learn`.

In [10]:
```
%%time
# this is a *dask-glm* LogisticRegression, not scikit-learn
lm = LogisticRegression(fit_intercept=False)
lm.fit(X_train.values, y_train.values)
```

The fitted estimator has a `.score` method. For `LogisticRegression` this is the mean accuracy score (the percentage of predictions that matched the actual values).
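Roughly speaking, and assuming a `.predict` method that returns class labels (it isn't shown in this post), that score amounts to:

```
# hypothetical by-hand version of the mean accuracy score
acc = (lm.predict(X_train.values) == y_train.values).mean().compute()
```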

In [11]:
```
%%time
lm.score(X_train.values, y_train.values).compute()
```

and on the test dataset:

In [12]:
```
%%time
lm.score(X_test.values, y_test.values).compute()
```

The bulk of my time "doing data science" is data cleaning and pre-processing. Actually fitting an estimator or making predictions is a relatively small proportion of the work.

You could manually do all your data-processing tasks as a sequence of function calls starting with the raw data.
Or, you could use scikit-learn's `Pipeline` to accomplish this and then some. `Pipeline`s offer a few advantages over the manual solution.

First, your entire modeling process from raw data to final output is in a self-contained object. No more wondering "did I remember to scale this version of my model?" It's there in the `Pipeline` for you to check.

Second, `Pipeline`s combine well with scikit-learn's model selection utilities, specifically `GridSearchCV` and `RandomizedSearchCV`. You're able to search over hyperparameters of the pipeline stages, just like you would for an estimator.

Third, `Pipeline`s help prevent leaking information from your test and validation sets into your training set.
A common mistake is to compute some pre-processing statistic on the *entire* dataset (before the train-test split) rather than just the training set. For example, you might normalize a column by the average of all the observations.
These types of errors can lead you to overestimate the performance of your model on new observations.
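Schematically (not code from this notebook):

```
# leaky: the normalizing statistic sees the test rows too
mean = ddf['fare_amount'].mean()

# safe: compute the statistic on the training split only,
# then reuse it to transform both train and test
mean = df_train['fare_amount'].mean()
```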

Since dask-glm follows the scikit-learn API, we can reuse scikit-learn's `Pipeline` machinery, *with a few caveats.*

First, many of the transformers built into scikit-learn will validate their inputs. As part of this, array-like things are cast to NumPy arrays. Since dask arrays are array-like, they are converted and things "work", but this might not be ideal when your dataset doesn't fit in memory.
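Here's a small sketch of that caveat: scikit-learn's input validation does (roughly) the equivalent of `np.asarray`, which pulls the whole dask array into memory:

```
import numpy as np
import dask.array as da

x = da.ones((10, 2), chunks=(5, 2))  # lazy dask array
np.asarray(x)                        # now a concrete NumPy array, fully in memory
```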

Second, some things are just fundamentally hard to do on large datasets. For example, naively dummy-encoding a dataset requires a full scan of the data to determine the set of unique values per categorical column. When your dataset fits in memory, this isn't a huge deal. But when it's scattered across a cluster, this could become a bottleneck.

If you know the set of possible values *ahead* of time, you can do much better.
You can encode the categorical columns as pandas `Categorical`s and then convert them with `get_dummies`, without an expensive full scan just to compute the set of unique values.
We'll do that for the `VendorID` and `payment_type` columns.
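
As a small illustration of the idea (a standalone pandas sketch, with made-up values):

```
import pandas as pd

# the full set of payment types is declared up front...
s = pd.Series([1, 2, 1, 3], name='payment_type')
s = s.astype('category').cat.set_categories([1, 2, 3, 4, 5])

# ...so get_dummies can build one indicator column per category
# without ever scanning the data for unique values
pd.get_dummies(s)
```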

In [13]:
```
from sklearn.base import TransformerMixin, BaseEstimator
from sklearn.pipeline import make_pipeline
```

First let's write a little transformer to convert columns to `Categorical`s.
If you aren't familiar with scikit-learn transformers, the basic idea is that a transformer must implement two methods: `.fit` and `.transform`.

`.fit` is called during training. It learns something about the data and records it on `self`.

Then `.transform` uses what was learned during `.fit` to transform the feature matrix somehow.

A `Pipeline` is simply a chain of transformers: each one is `fit` on some data and passes the output of its `.transform` on to the next step. The final step is an `Estimator`, like `LogisticRegression`.

In [14]:
```
class CategoricalEncoder(BaseEstimator, TransformerMixin):
    """Encode `categories` as pandas `Categorical`

    Parameters
    ----------
    categories : Dict[str, list]
        Mapping from column name to list of possible values
    """
    def __init__(self, categories):
        self.categories = categories

    def fit(self, X, y=None):
        # "stateless" transformer. Don't have anything to learn here
        return self

    def transform(self, X, y=None):
        X = X.copy()
        for column, categories in self.categories.items():
            X[column] = X[column].astype('category').cat.set_categories(categories)
        return X
```

Next, we'll write a simplified version of `StandardScaler` that won't eagerly convert a dask array to a NumPy array (N.B. the scikit-learn version has more features and error handling, but this will work for now).

In [15]:
```
class StandardScaler(BaseEstimator, TransformerMixin):
    def __init__(self, columns=None, with_mean=True, with_std=True):
        self.columns = columns
        self.with_mean = with_mean
        self.with_std = with_std

    def fit(self, X, y=None):
        if self.columns is None:
            self.columns_ = X.columns
        else:
            self.columns_ = self.columns
        if self.with_mean:
            self.mean_ = X[self.columns_].mean(0)
        if self.with_std:
            self.scale_ = X[self.columns_].std(0)
        return self

    def transform(self, X, y=None):
        X = X.copy()
        if self.with_mean:
            X[self.columns_] = X[self.columns_] - self.mean_
        if self.with_std:
            X[self.columns_] = X[self.columns_] / self.scale_
        return X.values
```

In [16]:
```
from dummy_encoder import DummyEncoder
```
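
The `dummy_encoder` module isn't shown in this post. As a rough sketch of what such a transformer could look like (an assumption on my part, not the actual implementation), dask.dataframe's `get_dummies` can do the heavy lifting once the categories are known:

```
import dask.dataframe as dd
from sklearn.base import BaseEstimator, TransformerMixin

class DummyEncoder(BaseEstimator, TransformerMixin):
    """Dummy-encode the known-categorical columns of a dask DataFrame."""
    def fit(self, X, y=None):
        # stateless: the categories were fixed earlier by CategoricalEncoder
        return self

    def transform(self, X, y=None):
        # with the categories known, no scan of the data is needed
        return dd.get_dummies(X)
```
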
In [17]:
```
pipe = make_pipeline(
    CategoricalEncoder({"VendorID": [1, 2],
                        "payment_type": [1, 2, 3, 4, 5]}),
    DummyEncoder(),
    StandardScaler(columns=['passenger_count', 'trip_distance', 'fare_amount']),
    LogisticRegression(fit_intercept=False)
)
```

So that's our pipeline. We can go ahead and fit it just like before, passing in the raw data.

In [18]:
```
%%time
pipe.fit(X_train, y_train.values)
```

The `Pipeline` ensures that all of the necessary transformations take place before calling the estimator's `score` method.

In [19]:
```
pipe.score(X_train, y_train.values).compute()
```

In [20]:
```
pipe.score(X_test, y_test.values).compute()
```

As explained earlier, Pipelines and grid search go hand-in-hand. Let's run a quick example with dask-searchcv.

In [21]:
```
from sklearn.model_selection import GridSearchCV
import dask_searchcv as dcv
```

We'll search over two hyperparameters:

- Whether or not to standardize the variance of each column in `StandardScaler`
- The strength of the regularization in `LogisticRegression`

This involves fitting many models, one for each combination of parameters. dask-searchcv is smart enough to know that early stages in the pipeline (like the categorical and dummy encoding) are shared among all the combinations, and so it fits them only once.

In [22]:
```
param_grid = {
    'standardscaler__with_std': [True, False],
    'logisticregression__lamduh': [.001, .01, .1, 1],
}
pipe = make_pipeline(
    CategoricalEncoder({"VendorID": [1, 2],
                        "payment_type": [1, 2, 3, 4, 5]}),
    DummyEncoder(),
    StandardScaler(columns=['passenger_count', 'trip_distance', 'fare_amount']),
    LogisticRegression(fit_intercept=False)
)
gs = dcv.GridSearchCV(pipe, param_grid)
```

In [23]:
```
%%time
gs.fit(X_train, y_train.values)
```

Now we have access to the usual attributes, like `cv_results_`, learned by the grid search object:

In [31]:
```
pd.DataFrame(gs.cv_results_)
```

And we can do our usual checks on model fit for the training set:

In [25]:
```
gs.score(X_train, y_train.values).compute()
```

And the test set:

In [26]:
```
gs.score(X_test, y_test.values).compute()
```