How to use Custom Sklearn Classes and Pipelines
How to use Scikit-Learn's deeper features to build custom models and transformers.
Introduction
Scikit-learn (or sklearn) is the machine learning tool of choice for exploratory analysis by data scientists. It has over 45k stars on GitHub and was downloaded over 7 million times in the last month (March 2021). Its fit / transform / predict API is now ubiquitous in the Python machine learning ecosystem, with many other open source projects choosing to be compatible with it.
Motivation
In order to leverage the deeper features of the sklearn platform, it is useful to build custom data transformation pipelines using the provided classes. In this blog post, we will focus on using custom Transformers and Pipelines, which are essential to delivering replicable results.
When dealing with real-world data, it is often difficult to convert data manipulations into repeatable steps: this is where custom transformers come into play. Additionally, you might then want to apply these custom transformations in sequence, which is where Pipelines become useful. Putting these two tools together, you have a powerful playbook to handle all the messy data the real world throws at you.
This post assumes some familiarity with the sklearn API and concepts such as fit / transform / predict and so on. If some of that is new to you, please check out this fantastic page in the `sklearn` docs (there is also a brief refresher sketch just after the imports below).
# Install Required packages
# Python 3.8
!pip install --upgrade pip
!pip install kaggle==1.5.12
!pip install scikit-learn==0.24.1
!pip install pandas==1.2.3
!pip install python-dotenv==0.17.0
!pip install fastcore==1.3.19
from pathlib import Path
from pprint import pprint
from typing import Any, Union
import numpy as np
import pandas as pd
from fastcore.basics import patch
from pandas import json_normalize
from sklearn.base import BaseEstimator, ClassifierMixin, TransformerMixin, clone
from sklearn.feature_extraction import DictVectorizer
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import FunctionTransformer, OrdinalEncoder
from sklearn.utils.validation import check_array, check_is_fitted
np.random.seed(42)
SklearnInput = Union[list, np.ndarray, pd.DataFrame, pd.Series]
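As promised, here is a minimal refresher sketch of the fit / transform / predict convention. The data and variable names below are made up purely for illustration and are not part of the PBP analysis.
# Refresher sketch of the fit / transform / predict convention (made-up data)
from sklearn.preprocessing import StandardScaler

X_demo = np.random.normal(size=(20, 3))
y_demo = np.array([0, 1] * 10)

scaler = StandardScaler().fit(X_demo)       # transformers learn parameters with fit
X_demo_scaled = scaler.transform(X_demo)    # and apply them with transform

clf = LogisticRegression().fit(X_demo_scaled, y_demo)  # estimators learn with fit
clf.predict(X_demo_scaled)                             # and produce outputs with predict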
The Dataset
In this blog post, we'll be using the motivating example of the Prescription Based Prediction (PBP) dataset. The PBP dataset combines the frequencies of prescribed medication under Medicare Part D with practitioner data from the National Provider Identifier dataset. The authors of this aggregate dataset had a corresponding blog post explaining the dataset in a bit more detail. It is no longer live, but it is still available on The Wayback Machine.
from dotenv import load_dotenv
load_dotenv()
import kaggle
kaggle.api.authenticate()
kaggle.api.dataset_download_files(
"roamresearch/prescriptionbasedprediction", unzip=True
)
The data itself is in the somewhat uncommon .jsonl format, a newline-delimited format where each line contains a json object. We can use pandas to read this dataset into a pd.DataFrame.
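For illustration, a .jsonl file looks something like the following sketch. The drug names and values here are invented; only the two top-level keys mirror the real dataset.
# Hypothetical .jsonl content: one complete json object per line
import json

example_jsonl = (
    '{"provider_variables": {"region": "South"}, "cms_prescription_counts": {"DRUG A": 10}}\n'
    '{"provider_variables": {"region": "West"}, "cms_prescription_counts": {"DRUG B": 7}}\n'
)
# each line can be parsed independently
[json.loads(line) for line in example_jsonl.splitlines()]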
FPATH = Path("/work/roam_prescription_based_prediction.jsonl")
df = pd.read_json(FPATH, lines=True, orient="columns")
df.head(3)
To understand this data a bit better, let's explode both of the json variables in the first row. We can use the pandas built-in function json_normalize, which converts a json dictionary into a flat table.
json_normalize(df.iloc[0].cms_prescription_counts)
json_normalize(df.iloc[0].provider_variables)
Cool, let's now see what the distribution of those variables looks like.
json_normalize(df["provider_variables"]).settlement_type.value_counts()
json_normalize(df["provider_variables"]).region.value_counts()
It looks like the cms_prescription_counts column contains a json dictionary with the names of each drug followed by the count. The provider_variables column seems to have a json dictionary that contains information about the provider who prescribed the drugs.
It might be interesting to build a few models that predict some of these variables given the prescribed drugs and counts. In this blog post, we'll focus on estimating the settlement_type and the region.
Let's set the feature and response variables to their appropriate columns and convert them to native Python lists. Let's also split the data into a train and validation set.
TEST_PCT = 0.2
X = df.cms_prescription_counts.tolist()
y = df.provider_variables.tolist()
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=TEST_PCT)
Custom Transformers
Developing sklearn API compatible transformers will allow us to make full use of other aspects of the library. In this example, we'll implement two custom transformers that leverage sklearn base classes to transform the response variable in the dataset.
JSONTargetSelector
First, let's build a way to quickly extract a desired response variable from y. We start by defining a class that inherits from TransformerMixin, which gives us the fit_transform method for free once we define the fit and transform methods.
Next up, we define the constructor, which requires two arguments: the key to extract from each json row and whether the output should be two-dimensional. We also include an is_fit_ attribute, which lets sklearn helper functions know that this transformer has already been "fitted" (i.e. it doesn't need to be fit).
class JSONTargetSelector(TransformerMixin):
    """From a list of json blobs select a particular key and compose a new list of those values.

    Args:
        target_name: The json key to extract in each row
        two_d_out: Whether the output transformation should be 2d
    """

    def __init__(self, target_name: str, two_d_out: bool = False):
        self.target_name = target_name
        self.two_d_out = two_d_out
        self.is_fit_ = True
Now, let's add the fit method. It is empty because we already have all the information we need to transform input data from the constructor alone. We return self (the object instance) as per convention.
Note that we use the fastcore @patch decorator to add instance methods to the class so that we don't have to have all of the code in a single Jupyter cell.
@patch
def fit(self: JSONTargetSelector, X: SklearnInput, y: SklearnInput = None):
    return self
Now let's add the transform functionality. We break this up into two methods. The _extract_target method pulls a particular key out of an input dictionary while also validating that the inputs are as we expect them. The transform method does the meat of the data conversion: it turns a list of json objects into a flat list of selected values.
@patch
def _extract_target(self: JSONTargetSelector, index: int, json_dict: dict):
    """Get the target key from a dictionary.

    Args:
        index: the index of the current row
        json_dict: potentially a json dictionary which has `target_name` as a key

    Returns:
        The value of the `target_name` key.

    Raises:
        ValueError: If `json_dict` is not a dictionary
        ValueError: If `target_name` is not in the `json_dict`
    """
    if not isinstance(json_dict, dict):
        raise ValueError(f"item at index {index} is not a dictionary.")
    if self.target_name not in json_dict:
        raise ValueError(f"index {index} is missing target {self.target_name}.")
    return json_dict.get(self.target_name)
@patch
def transform(self: JSONTargetSelector, X: SklearnInput, y: SklearnInput = None):
    X = check_array(
        X, accept_sparse=False, force_all_finite=True, ensure_2d=False, dtype="object"
    )
    out = np.array([self._extract_target(i, Xi) for i, Xi in enumerate(np.squeeze(X))])
    if self.two_d_out:
        out = out.reshape((-1, 1))
    return out
Lastly, let's add the identity function as the inverse transform. There is no meaningful inverse since we lose data during the transform, but later sections will require this method to have an implementation.
@patch
def inverse_transform(self: JSONTargetSelector, X: SklearnInput):
    return X
We can test that the above implementation works with some fake data. Given we select "c" as the desired key, we should see [2 4 8] in the output, which is exactly what we observe.
d = {k: i for i, k in enumerate(list("abcd"))}
list_of_dict = [d, {k: v**2 for k, v in d.items()}, {k: v**3 for k, v in d.items()}]
print("List of Dicts")
pprint(list_of_dict)
print()
selected_key = "c"
jts = JSONTargetSelector(selected_key)
jts_trans = jts.fit_transform(list_of_dict)
print("Transformed Data")
print(jts_trans)
TransformedTargetClassifier
Phew, alright, now that we've got that class completed, we need to build a wrapper class that will allow us to actually use it in downstream applications.
Why, you may ask? In our case, we want a simple way to both transform the response variable and estimate on the response variable. Building a composite meta estimator is the way to go here given the broader API constraints. To be honest, it's a bit of a quirk in the way the sklearn API was originally designed. By that, I mean that the original API design does not really take into account the possibility of transforming the response variable (specifically when using Pipelines, which we'll see in just a bit).
sklearn has already addressed that flaw with a built-in meta estimator called TransformedTargetRegressor, but unfortunately, it only works for regression estimators (e.g. applying a log transform to your response variable). In our case, we are looking at a classification problem, so we'll need to roll our own.
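For reference, here is a minimal sketch of the built-in regression version using a log transform on the target. The data here is generated purely for illustration and is unrelated to the PBP dataset.
# Sketch of sklearn's built-in TransformedTargetRegressor (regression only)
from sklearn.compose import TransformedTargetRegressor
from sklearn.linear_model import LinearRegression

X_reg = np.random.normal(size=(50, 2))
y_reg = np.exp(X_reg @ np.array([0.5, -0.25]) + np.random.normal(scale=0.1, size=50))

ttr = TransformedTargetRegressor(
    regressor=LinearRegression(),
    func=np.log1p,          # applied to y before fitting
    inverse_func=np.expm1,  # applied to predictions on the way out
)
ttr.fit(X_reg, y_reg)
ttr.predict(X_reg[:3])  # predictions come back on the original scale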
First, let's define a constructor which takes in an instance of a classifier and a transformer.
class TransformedTargetClassifier(ClassifierMixin, BaseEstimator):
    """A meta estimator that both transforms the response variable and fits a classifier on it.

    Args:
        classifier: An instance of a class that inherits from BaseEstimator
        transformer: An instance of a class that inherits from TransformerMixin
    """

    def __init__(
        self, classifier: BaseEstimator, transformer: Union[TransformerMixin, Pipeline]
    ):
        self.classifier = classifier
        self.transformer = transformer
Now, when fitting this meta estimator, we need to fit both the classifier and the transformer. But first, we need to quickly validate the input y array. Stealing a bit of code from TransformedTargetRegressor, we can validate both the datatypes and the shape of the inputs.
We use check_array, a validation function from sklearn that converts inputs into np.ndarray form and validates a bunch of other things for us. We follow this with a bit of code to make sure that the dimensions of the response variable are correct, so that we can handle the user passing either a 1-d array or a column vector.
@patch
def fit(  # noqa: F811
    self: TransformedTargetClassifier,
    X: SklearnInput,
    y: SklearnInput,
    **fit_params: Any,
):
    y = check_array(
        y, accept_sparse=False, force_all_finite=True, ensure_2d=False, dtype="object"
    )
    # store the original dimensions of y
    self._training_dim = y.ndim
    if y.ndim == 1:
        y_2d = y.reshape(-1, 1)
    else:
        y_2d = y
    self.transformer.fit(y_2d)
    y_trans = self.transformer.transform(y_2d)
    self.classifier.fit(X, y_trans, **fit_params)
    self.is_fit_ = True
    return self
With the fit method in place, we turn our attention to predict, which is where the magic happens.
We use the check_is_fitted function to make sure that the particular transformer is fit. sklearn has an internal convention of checking for the existence of an attribute with a trailing underscore (_) to know whether an instance has been fit. Since we don't know the exact classifier or transformer, we cannot get any more specific than that. Unfortunately, Pipelines do not set the conventional attributes at fit time, so we have no way of knowing whether one has been fitted.
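As a quick illustration of that convention, here is a small sketch (separate from our classes) showing check_is_fitted raising on an estimator that has no trailing-underscore attributes yet:
# Illustrative sketch: check_is_fitted relies on trailing-underscore attributes
from sklearn.exceptions import NotFittedError

unfit_clf = LogisticRegression()
try:
    check_is_fitted(unfit_clf)  # no fitted attributes (e.g. coef_) exist yet
except NotFittedError as err:
    print(err)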
Next, we run the predict step of the classifier and run an inverse transformation on its output (running the transformation steps in reverse). This is why we made sure to define the inverse_transform method in our previous custom transformer. After that, we make sure to handle data dimension issues before returning the output.
@patch
def predict(self: TransformedTargetClassifier, X: SklearnInput):
    check_is_fitted(self.classifier)
    if not isinstance(self.transformer, Pipeline):
        check_is_fitted(self.transformer)
    pred = self.classifier.predict(X)
    if pred.ndim == 1:
        pred = pred.reshape(-1, 1)
    pred_trans = self.transformer.inverse_transform(pred)
    if self._training_dim == 1 and pred_trans.ndim == 2 and pred_trans.shape[1] == 1:
        pred_trans = pred_trans.squeeze(axis=1)
    return pred_trans
Lastly, we have to add an updated score method. We cannot simply use the inherited scoring method since we need to transform the ground truth variable before scoring. We use accuracy, just the same as the superclass.
@patch
def score(
    self: TransformedTargetClassifier,
    X: SklearnInput,
    y: SklearnInput,
    sample_weight: Union[list, np.ndarray] = None,
):
    y = check_array(
        y, accept_sparse=False, force_all_finite=True, ensure_2d=False, dtype=None
    )
    if y.ndim == 1:
        y = y.reshape((-1, 1))
    y = self.transformer.transform(y)
    return accuracy_score(y, self.classifier.predict(X), sample_weight=sample_weight)
Finally, we're ready to see if this class works!
Let's generate some random data with 100 samples to test in a Pipeline. We'll cover exactly how Pipelines work in the next section, so don't worry about the details here. We just want to make sure that we see strings instead of numbers as the output of a prediction, which we do indeed see.
_y = np.array(["a", "b"] * 50)
_X = np.random.normal(size=(100, 3))
_X_test = np.random.normal(size=(3, 3))
oe = Pipeline(
[
("oe", OrdinalEncoder()),
("reshaper", FunctionTransformer(lambda x: np.squeeze(x))),
]
)
lr = LogisticRegression()
ttc = TransformedTargetClassifier(lr, oe)
ttc.fit(_X, _y)
ttc.predict(_X_test)
Pipelines
Pipelines allow you to run multiple operations on an input dataset in succession before an estimation is performed. The Pipeline was originally designed as a linear, step-by-step transformation template, but there are now additional tools in the Pipeline toolkit that allow for horizontal joining of "columns". This is out of scope for this blog post, but you can check out the docs for that here.
The main use case for this approach becomes apparent when we look to run experiments on a particular machine learning problem. Pipelines make it straightforward to perform cross-validation and grid searching across both the feature engineering and estimator hyperparameters. They also make it very hard to accidentally get your data into an improper state, which often happens when the same input variable is modified across multiple Jupyter notebook cells.
In the next two cells we define a Pipeline which is applied to our PBP dataset. Since the last step in the pipeline inherits from BaseEstimator, the pipeline behaves exactly like an estimator (enabling methods like predict and score). When we call fit, each step fits its transformer and transforms the data before passing it to the next step.
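Before we look at the individual steps, here is a rough, simplified sketch of that chaining behaviour. This is illustrative only, not sklearn's actual implementation; it ignores fit_params, caching, and "passthrough" steps.
# Rough sketch of what Pipeline.fit and Pipeline.predict do with their steps
def pipeline_fit_sketch(steps, X, y=None):
    *transform_steps, (_, estimator) = steps
    Xt = X
    for _, transformer in transform_steps:
        Xt = transformer.fit_transform(Xt, y)  # fit then transform each intermediate step
    estimator.fit(Xt, y)                       # the final step is only fit


def pipeline_predict_sketch(steps, X):
    *transform_steps, (_, estimator) = steps
    Xt = X
    for _, transformer in transform_steps:
        Xt = transformer.transform(Xt)         # already-fitted transforms
    return estimator.predict(Xt)
So by the time the final TransformedTargetClassifier step sees the data, it has already been through the DictVectorizer and TfidfTransformer transforms.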
The following are descriptions of each of the steps across both estimators:
- Feature Transformation
  - DictVectorizer: Converts the input column of dictionary values into a sparse n x m matrix, where n is the number of rows and m is the number of unique prescriptions. The value in each cell is the frequency with which the drug was prescribed by a practitioner.
  - TfidfTransformer: We can think of the above dictionary almost like a "document" with frequency counts of terms. Hence, we can use term frequency-inverse document frequency (TF-IDF) to process those terms into a form that is more meaningful for a classification model.
- Response Transformation (TransformedTargetClassifier)
  - Transformer (also a Pipeline)
    - JSONTargetSelector: We supply this step a chosen response variable.
    - OrdinalEncoder: This step converts a list of strings into integer codes, incrementing one by one.
    - FunctionTransformer: This step is a convenience that allows us to apply a lambda function to the entire dataset. We use it to squeeze the encoded values back into a 1-d array, as most sklearn estimators expect for the response variable.
  - Classifier
    - LogisticRegression: We choose logistic regression because it is the simplest classification model.
Here we also clone settlement_pipe, which duplicates its structure and parameters without keeping its state. We then swap out the JSONTargetSelector with an instance that has the right target for the second pipeline.
settlement_pipe = Pipeline(
    [
        ("dv", DictVectorizer()),
        ("tfidf", TfidfTransformer()),
        (
            "ttc",
            TransformedTargetClassifier(
                classifier=LogisticRegression(max_iter=1000),
                transformer=Pipeline(
                    [
                        # a list rather than a tuple so this step can be swapped
                        # out by index assignment for the region pipeline below
                        ["jts", JSONTargetSelector("settlement_type", two_d_out=True)],
                        ("enc", OrdinalEncoder()),
                        ("ft", FunctionTransformer(lambda x: np.squeeze(x))),
                    ]
                ),
            ),
        ),
    ]
)
region_pipe = clone(settlement_pipe)
region_pipe.steps[2][1].transformer.steps[0][1] = JSONTargetSelector(
"region", two_d_out=True
)
settlement_pipe.fit(X_train, y_train);
region_pipe.fit(X_train, y_train);
Let's do some quick evaluations to see what the output looks like. Carefully selecting some specific rows which do not have too many prescriptions, we find the following output. Looks like 100% accuracy on this cherry-picked dataset... time to call it a day?
N = slice(1, 4)
sample_X = json_normalize(X[N])
preds_df = pd.DataFrame(
zip(
json_normalize(y[N]).settlement_type,
settlement_pipe.predict(X[N]),
json_normalize(y[N]).region,
region_pipe.predict(X[N]),
),
columns=["settlement_type", "settlement_type_hat", "region", "region_hat"],
)
rslts_df = pd.concat([sample_X, preds_df], axis=1)
rslts_df
Anyways, when we actually score the models, we find the following train and test accuracies. Not too bad for no hyperparameter tuning.
s_train_score = settlement_pipe.score(X_train, y_train)
r_train_score = region_pipe.score(X_train, y_train)
s_test_score = settlement_pipe.score(X_test, y_test)
r_test_score = region_pipe.score(X_test, y_test)
scores_df = pd.DataFrame(
{
"settlement_type": [f"{s_train_score:.3f}", f"{s_test_score:.3f}"],
"region": [f"{r_train_score:.3f}", f"{r_test_score:.3f}"],
},
index=["train", "test"],
)
scores_df
Further Resources
Future Work
- One interesting improvement here could be to add an extra custom transformer before the DictVectorizer. One that removes frequent and infrequent medications may improve performance. You should have all the tools to add this step to the pipeline.
- Another improvement to deepen your understanding of pipelines would be to try and see how you could use GridSearchCV to find a more optimal model (see the sketch after this list). It would be especially useful to see how tuning interplays with your custom transformer from the previous step.
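Here is a hedged sketch of what that grid search might look like. The parameter values are arbitrary, and the nested ttc__classifier__C name relies on TransformedTargetClassifier exposing its classifier via get_params (which it does by inheriting from BaseEstimator).
# Hypothetical grid search over a feature-engineering step and the wrapped classifier.
# Parameter names follow sklearn's <step>__<param> convention; values are arbitrary.
from sklearn.model_selection import GridSearchCV, KFold

param_grid = {
    "tfidf__use_idf": [True, False],         # feature engineering hyperparameter
    "ttc__classifier__C": [0.1, 1.0, 10.0],  # regularization of the wrapped LogisticRegression
}
# Plain KFold because our y is still a list of json dicts at this point
search = GridSearchCV(settlement_pipe, param_grid, cv=KFold(n_splits=3))
# search.fit(X_train, y_train)  # uncomment to run; this refits the pipeline many times
# search.best_params_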
Other Links
The best place to learn more about these advanced sklearn features is actually their documentation. They have a user guide which is quite good, and their API is well documented as well. Additionally, I picked up most of the transformer tricks by reading the source code of the feature extraction transformers. If you have access to the PyCharm debugger, it is also really useful to step through the code as Pipeline.fit runs, since a lot of functionality is abstracted away.
from nbformat import read
# Default path for this notebook
with open("/work/notebook.ipynb", "r", encoding="utf-8") as f:
    nb = read(f, 4)
# Count the number of words in markdown or heading cells
word_count = sum(
[
len(cell["source"].replace("#", "").lstrip().split(" "))
for cell in nb["cells"]
if cell.cell_type in ["markdown", "heading"]
]
)
# Count number of lines in the notebook and subtract the number of
# lines in this cell
line_count = (
sum(
[
# Filter out cells that are comments or are empty
len(
list(
filter(
lambda line: not (line.lstrip().startswith("#")),
cell["source"].split("\n"),
)
)
)
for cell in nb["cells"]
if cell.cell_type == "code"
]
)
- 27
)
print(f"Word Count: {word_count:,}")
print(f"Line Count: {line_count:,}")