Incorporating custom functions into an sklearn pipeline

I was working on a tabular dataset problem on Kaggle, and I wrote a function to do some preprocessing. The function removed columns that had a correlation above a threshold. I wanted to experiment with different thresholds, and rather than tune it by hand, I wanted to be able to call it with RandomizedSearchCV.

I found this excellent article that went into a lot of depth on how to do this. Rather than reproduce that work on how to implement this, I am going to talk about what I was trying to do, and why.

The Problem

This month's Kaggle Tabular Challenge is to take 5,000 separate files that each represent a set of predictions and combine them into an ensemble that produces a new prediction. The most straightforward way to do this would be soft voting: taking the mean of the individual predictions for each row.
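
As a quick sketch of that baseline (assuming the 5,000 prediction files have already been loaded into a single dataframe, preds_df, with one column per model), soft voting is a one-liner:

# preds_df: one row per id, one column per model's prediction (loaded elsewhere)
soft_vote = preds_df.mean(axis=1)   # average the 5,000 predictions row by row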

The idea behind ensembles is that combining several good models can produce better results than any of the individual models by itself. This works better the more different the models are from each other. So before we vote, we should remove any redundant models. The challenge doesn't provide any information about how the predictions were produced; all we have in each file is a series of ids and predictions, which are decimal numbers between 0 and 1.

Correlation

We don't know how the predictions were produced, but we can see how correlated each prediction is with each other. Removing columns with high correlations to leave only columns that are independent of each other should make for a more reliable ensemble.

Pandas provides a correlation method on both its DataFrame and its Series. The dataframe here has 5,000 columns and 20,000 rows, so computing the full correlation matrix in a single call would take a long time. I decided not to wait for that calculation and instead work column by column.
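
For reference, the two forms look like this (col_a and col_b are placeholder column names, not columns from the challenge); the column-by-column version is what the function below is built on:

corr_matrix = df.corr()                      # full 5,000 x 5,000 correlation matrix in one call
single_corr = df["col_a"].corr(df["col_b"])  # correlation between just two columns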

def find_correlated_columns(df, threshold):
    correlated_cols = []

    num_cols = len(df.columns)
    for i in range(num_cols):
        col_name = df.columns[i]

        if col_name in correlated_cols:
            continue

        source_col = df[col_name]

        for j in range(i + 1, num_cols):
            comparison_col = df.columns[j]
            if comparison_col not in correlated_cols:
                corr = source_col.corr(df[comparison_col])
                if corr > threshold:
                    correlated_cols.append(comparison_col)

    return correlated_cols

This goes column by column (the outer for loop), and compares each subsequent column to see if the correlation is above the passed-in threshold. After we have compiled the list, we drop those columns from both the training and test dataframes. Calculating the correlation for each pair of columns is expensive, so we first check whether the column we are examining is already in the list of columns to be removed, and if it is we skip the comparison.

Making it tunable

I wanted to be able to run my transformation as part of a pipeline so that I could use sklearn's RandomizedSearchCV to find the best correlation threshold in conjunction with other hyperparameters, like this:

from sklearn.pipeline import Pipeline
from sklearn.model_selection import RandomizedSearchCV
import xgboost as xgb

my_pipeline = Pipeline([
    ("correlations", CorrelationRemover()),
    ("xgboost", xgb.XGBClassifier())
])

param_distribs = {'xgboost__max_depth': [2, 3, 4, 5],
                  'correlations__threshold': [0.8, 0.9, 1.0]}

rnd_search = RandomizedSearchCV(my_pipeline, param_distribs, cv=3, n_iter=2,
                                scoring="neg_log_loss")

This required putting my function into a class that implements fit and transform methods, as described in the article I mentioned earlier. The class inherits from BaseEstimator and TransformerMixin.

    def fit(self, X, y=None): 
        print("Finding correlations at : ", self.threshold)
        self.correlated_cols = self.find_correlated_columns(X, self.threshold)
        return self

    def transform(self, X, y=None):
        transformed = X.drop(self.correlated_cols, axis=1)
        return transformed

With that done, I can call fit_transform, and get my smaller dataset.

CT = CorrelationRemover(0.5)
X_example = CT.fit_transform(X_train, y_train)
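
A quick sanity check (with X_train being the 5,000-column training dataframe) is to compare shapes before and after:

print(X_train.shape)            # original shape
print(X_example.shape)          # shape after dropping the correlated columns
print(len(CT.correlated_cols))  # how many columns were identified for removal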

Optimization

The function I initially wrote did a lot of extra work during the actual random search. Once we have calculated the columns to drop for a given threshold, there is no reason to calculate them again, but the random search uses 3-fold cross validation, which means that for each combination of hyperparameters it tries, it fits the correlation remover three times with the same threshold. I decided to memoize the results. Python's standard library does provide memoization (functools.lru_cache, for example), but I decided to implement my own: I created an empty dictionary outside my correlation function, then read from it at the top of the function and write to it at the bottom.

    def find_correlated_columns(self, df, threshold):
        correlated_cols = []
        key = round(threshold * 1000)

        prev_result = correlation_memo.get(key)
        if prev_result is not None:  # an empty list is still a valid cached result
            return prev_result

        # ... same code as before to find the correlations

        correlation_memo[key] = correlated_cols
        return correlated_cols

I am always wary of equality comparisons with floating point numbers, so I convert the thresholds to integers to use as my keys. Because I multiply the threshold by 1000 before rounding, I can only specify thresholds at a granularity of 0.001, but that seemed like more than enough for this application.
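
For example, two thresholds that differ only by floating point noise map to the same integer key:

key = round(0.825 * 1000)      # 825
key = round(0.8249999 * 1000)  # also 825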

Skipping work saves a lot of time. Computing a threshold of 0.5 drops a lot of columns really quickly, whereas 0.99 means we have to run a lot more comparisons. Calculating a threshold of 1.0 would be the slowest of all, and it is completely unnecessary because in that case we won't be dropping any columns.

I also modified the test to check the absolute value of the correlation because a correlation of -0.99 is just as redundant as a correlation of 0.99, but that didn't seem to have any effect on this dataset. I left it in, even after I determined it wouldn't affect the data, because if I ever look back at my notebook for reference I wanted to have it done properly. Preventing a future bug is worth the small performance penalty it may have.

Pulling it all together

After defining my class and adding the optimizations I discussed, this is what the finished version looks like:

from sklearn.base import BaseEstimator, TransformerMixin

correlation_memo = {}

class CorrelationRemover(BaseEstimator, TransformerMixin):
    def __init__(self, threshold=1.0):
        self.threshold = threshold
        self.correlated_cols = []

    def find_correlated_columns(self, df, threshold):
        correlated_cols = []
        key = round(threshold * 1000)
        if key >= 1000:
            return []

        prev_result = correlation_memo.get(key)
        if prev_result is not None:  # an empty list is still a valid cached result
            return prev_result

        num_cols = len(df.columns)
        for i in range(num_cols):
            col_name = df.columns[i]

            if col_name in correlated_cols:
                continue

            source_col = df[col_name]

            for j in range(i+1, num_cols):
                comparison_col = df.columns[j]
                if comparison_col not in correlated_cols:
                    corr = source_col.corr(df[comparison_col])
                    if abs(corr) > threshold:
                        correlated_cols.append(comparison_col)

        correlation_memo[key] = correlated_cols
        return correlated_cols


    def fit(self, X, y=None): 
        print("Finding correlations at : ", self.threshold)
        self.correlated_cols = self.find_correlated_columns(X, self.threshold)
        return self

    def transform(self, X, y=None):
        transformed = X.drop(self.correlated_cols, axis=1)
        return transformed
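
With the class in place, the random search from earlier can be run end to end (X_train and y_train being the training predictions and labels):

rnd_search.fit(X_train, y_train)
print(rnd_search.best_params_)   # the winning threshold / max_depth combination
print(rnd_search.best_score_)    # best mean neg_log_loss across the 3 folds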

Next steps

Memoizing the list of correlated columns for the various thresholds sped things up considerably, but it is hard to avoid the conclusion that storing the correlations themselves would save even more work. With 5,000 features, the full correlation matrix has 25 million entries, which sounds like a lot, but even stored as doubles that is still only 200 MB.

So was all of this effort wasted? Not really. I was able to explore these ideas with low correlation thresholds, which means that my iterations were fairly quick. Then, when I was happy with the state of the code, I could change parameters to things that I thought would perform better and submit the code to Kaggle. Kaggle runs all of the calculations, and I can go have a sandwich. The optimizations were still worthwhile because I could get the results of my experiments faster, and because I could try a larger number of iterations of my tuning job without hitting Kaggle's timeout.

For my next experiment I will call corr on the whole dataframe once, and write a function that finds correlated columns from the precalculated data rather than redoing the calculations for each new threshold. I will do my experiments on a dataset that has 50 features, and when I am satisfied that it works properly, I will modify my notebook to load all 5,000 features and submit that to Kaggle.
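
A rough sketch of that idea (not the code from this notebook): the matrix is computed once, and each threshold becomes a cheap pass over its upper triangle. Note that this simple version does not skip columns that are already marked for removal, so it can flag slightly more columns than the greedy loop above.

import numpy as np

def find_correlated_columns_precomputed(corr_matrix, threshold):
    # keep only the strict upper triangle so each pair is considered once
    mask = np.triu(np.ones(corr_matrix.shape, dtype=bool), k=1)
    upper = corr_matrix.where(mask)
    return [col for col in upper.columns
            if (upper[col].abs() > threshold).any()]

corr_matrix = df.corr()   # the expensive part, done exactly once
to_drop = find_correlated_columns_precomputed(corr_matrix, 0.9)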