diff --git a/pyBreakDown/explainer.py b/pyBreakDown/explainer.py
index df1123e..b44fde2 100644
--- a/pyBreakDown/explainer.py
+++ b/pyBreakDown/explainer.py
@@ -2,6 +2,8 @@
 from collections import deque
 from blist import blist
 from pyBreakDown import explanation as e
+from tqdm import tqdm
+
 
 class Explainer:
     """
@@ -17,17 +19,19 @@ class Explainer:
         Dataset feature names.
     """
     def __init__(self, clf, data, colnames):
-        assert len(colnames) == data.shape[1] #otherwise it wouldnt make any sense
+        assert len(colnames) == data.shape[1]  # otherwise it wouldn't make any sense
        self.clf = clf
         self.data = data
         self.colnames = colnames
 
-    def _transform_observation (self, observation):
+    @staticmethod
+    def _transform_observation(observation):
         if observation.ndim < 2:
             observation = np.expand_dims(observation, axis=0)
         return observation
 
-    def _get_initial_dataset(self, observation, data):
+    @staticmethod
+    def _get_initial_dataset(observation, data):
         assert observation.ndim == 2 and observation.shape[0] == 1
         return np.repeat(observation,repeats=data.shape[0], axis=0)
 
@@ -55,16 +59,19 @@ def explain (self, observation, direction, useIntercept = False, baseline=0):
         """
         data = np.copy(self.data)
 
-        assert direction in ["up","down"]
-        observation = self._transform_observation(observation) #expand dims from 1D to 2D if necessary
+        assert direction in ["up", "down"]
+        observation = self._transform_observation(observation)  # expand dims from 1D to 2D if necessary
         assert len(self.colnames) == observation.shape[1]
 
-        if direction=="up":
-            exp = self._explain_up(observation, baseline, data)
-        if direction=="down":
-            exp = self._explain_down(observation, baseline, data)
+        if direction == "up":
+            exp = self._explain_up(observation, data)
+        else:
+            exp = self._explain_down(observation, data)
 
-        mean_prediction = np.mean(self.clf.predict(data))
+        if hasattr(self.clf, 'predict_proba'):
+            mean_prediction = np.mean(self.clf.predict_proba(data)[:, 0])
+        else:
+            mean_prediction = np.mean(self.clf.predict(data))
 
         if useIntercept:
             baseline = mean_prediction
@@ -77,63 +84,110 @@ def explain (self, observation, direction, useIntercept = False, baseline=0):
         exp.make_final_prediction()
 
         return exp
 
-
-    def _explain_up (self, observation, baseline, data):
+    def _explain_up(self, observation, data):
         new_data = self._get_initial_dataset(observation, data)
 
-        baseline_yhat = np.mean(self.clf.predict(data))
-
-        open_variables = blist(range(0,data.shape[1]))
-        important_variables = deque()
-        important_yhats = {}
-
-        for i in range(0, data.shape[1]):
-            yhats = {}
-            yhats_diff = np.repeat(-float('inf'), data.shape[1])
-
-            for variable in open_variables:
-                tmp_data = np.copy(data)
-                tmp_data[:,variable] = new_data[:,variable]
-                yhats[variable] = self.clf.predict(tmp_data)
-                yhats_diff[variable] = abs(baseline_yhat - np.mean(yhats[variable]))
-
-            amax = np.argmax(yhats_diff)
-            important_variables.append(amax)
-            important_yhats[i] = yhats[amax]
-            data[:,amax] = new_data[:,amax]
-            open_variables.remove(amax)
+        if hasattr(self.clf, 'predict_proba'):
+            baseline_yhat = np.mean(self.clf.predict_proba(data)[:, 0])
+            open_variables = blist(range(0, data.shape[1]))
+            important_variables = deque()
+            important_yhats = {}
+            temp = np.copy(data)
+
+            for i in tqdm(range(0, data.shape[1])):
+                yhats = {}
+                yhats_diff = np.repeat([-float('inf')], data.shape[1])
+
+                for variable in open_variables:
+                    data[:, variable] = new_data[:, variable]
+                    yhats[variable] = self.clf.predict_proba(data)[:, 0]
+                    yhats_diff[variable] = abs(baseline_yhat - np.mean(yhats[variable]))
+                    data[:, variable] = temp[:, variable]
+
+                amax = np.argmax(yhats_diff)
+                important_variables.append(amax)
+                important_yhats[i] = yhats[amax]
+                data[:, amax] = new_data[:, amax]
+                open_variables.remove(amax)
+        else:
+            baseline_yhat = np.mean(self.clf.predict(data))
+            open_variables = blist(range(0, data.shape[1]))
+            important_variables = deque()
+            important_yhats = {}
+            temp = np.copy(data)
+
+            for i in tqdm(range(0, data.shape[1])):
+                yhats = {}
+                yhats_diff = np.repeat([-float('inf')], data.shape[1])
+
+                for variable in open_variables:
+                    data[:, variable] = new_data[:, variable]
+                    yhats[variable] = self.clf.predict(data)
+                    yhats_diff[variable] = abs(baseline_yhat - np.mean(yhats[variable]))
+                    data[:, variable] = temp[:, variable]
+
+                amax = np.argmax(yhats_diff)
+                important_variables.append(amax)
+                important_yhats[i] = yhats[amax]
+                data[:, amax] = new_data[:, amax]
+                open_variables.remove(amax)
 
         var_names = np.array(self.colnames)[important_variables]
-        var_values = observation[0,important_variables]
+        var_values = observation[0, important_variables]
         means = self._get_means_from_yhats(important_yhats)
         means.appendleft(baseline_yhat)
         contributions = np.diff(means)
 
         return e.Explanation(var_names, var_values, contributions, e.ExplainerDirection.Up)
 
-    def _explain_down (self, observation, baseline, data):
+    def _explain_down(self, observation, data):
         new_data = self._get_initial_dataset(observation, data)
 
-        target_yhat = self.clf.predict(observation)
+        if hasattr(self.clf, 'predict_proba'):
+            target_yhat = self.clf.predict_proba(observation)[:, 0]
+
+            open_variables = blist(range(0, data.shape[1]))
+            important_variables = deque()
+            important_yhats = {}
+            temp = np.copy(new_data)
+
+            for i in range(0, data.shape[1]):
+                yhats = {}
+                yhats_diff = np.repeat([float('inf')], data.shape[1])
+
+                for variable in open_variables:
+                    new_data[:, variable] = data[:, variable]
+                    yhats[variable] = self.clf.predict_proba(new_data)[:, 0]
+                    yhats_diff[variable] = abs(target_yhat - np.mean(yhats[variable]))
+                    new_data[:, variable] = temp[:, variable]
+
+                amin = np.argmin(yhats_diff)
+                important_variables.append(amin)
+                important_yhats[i] = yhats[amin]
+                new_data[:, amin] = data[:, amin]
+                open_variables.remove(amin)
+        else:
+            target_yhat = self.clf.predict(observation)
+
+            open_variables = blist(range(0, data.shape[1]))
+            important_variables = deque()
+            important_yhats = {}
+            temp = np.copy(new_data)
 
-        open_variables = blist(range(0,data.shape[1]))
-        important_variables = deque()
-        important_yhats = {}
+            for i in range(0, data.shape[1]):
+                yhats = {}
+                yhats_diff = np.repeat([float('inf')], data.shape[1])
 
-        for i in range(0, data.shape[1]):
-            yhats = {}
-            yhats_diff = np.repeat(float('inf'), data.shape[1])
-
-            for variable in open_variables:
-                tmp_data = np.copy(new_data)
-                tmp_data[:,variable] = data[:,variable]
-                yhats[variable] = self.clf.predict(tmp_data)
-                yhats_diff[variable] = abs(target_yhat - np.mean(yhats[variable]))
+                for variable in open_variables:
+                    new_data[:, variable] = data[:, variable]
+                    yhats[variable] = self.clf.predict(new_data)
+                    yhats_diff[variable] = abs(target_yhat - np.mean(yhats[variable]))
+                    new_data[:, variable] = temp[:, variable]
 
-            amin = np.argmin(yhats_diff)
-            important_variables.append(amin)
-            important_yhats[i] = yhats[amin]
-            new_data[:,amin] = data[:,amin]
-            open_variables.remove(amin)
+                amin = np.argmin(yhats_diff)
+                important_variables.append(amin)
+                important_yhats[i] = yhats[amin]
+                new_data[:, amin] = data[:, amin]
+                open_variables.remove(amin)
 
         important_variables.reverse()
         var_names = np.array(self.colnames)[important_variables]
@@ -145,5 +199,6 @@ def _explain_down (self, observation, baseline, data):
 
         return e.Explanation(var_names, var_values, contributions, e.ExplainerDirection.Down)
 
-    def _get_means_from_yhats (self, important_yhats):
-        return deque([np.array(v).mean() for k,v in important_yhats.items()])
+    @staticmethod
+    def _get_means_from_yhats(important_yhats):
+        return deque([np.array(v).mean() for k, v in important_yhats.items()])
diff --git a/pyBreakDown/explanation.py b/pyBreakDown/explanation.py
index 700a1d7..bf05c6b 100644
--- a/pyBreakDown/explanation.py
+++ b/pyBreakDown/explanation.py
@@ -1,15 +1,16 @@
-import numpy as np
 from collections import deque
 from enum import Enum
 from recordclass import recordclass
 import matplotlib.patches as patches
 from matplotlib import pyplot as plt
 
-AttrInfo = recordclass("AttrInfo",["name","value","contribution","cumulative"])
+AttrInfo = recordclass("AttrInfo", ["name", "value", "contribution", "cumulative"])
+
+
+class ExplainerDirection(Enum):
+    Up = 1
+    Down = 2
 
-class ExplainerDirection (Enum):
-    Up=1
-    Down=2
 
 class Explanation:
     """
@@ -18,19 +19,21 @@ class Explanation:
     _INTERCEPT_NAME = "Intercept"
     _INTERCEPT_VALUE = 1
 
-    def __init__ (self, variable_names, variable_values, contributions, direction):
+    def __init__(self, variable_names, variable_values, contributions, direction):
         self._direction = direction
         self._attributes = deque()
         csum = 0
         for (name, value, contribution) in zip(variable_names, variable_values, contributions):
-            csum+=contribution
+            csum += contribution
             self._attributes.append(
                 AttrInfo(name=name, value=value, contribution=contribution, cumulative=csum)
-                )
-        self._has_intercept=False
-        self._has_final_prognosis=False
-
-    def text (self, fwidth=25, contwidth=20, cumulwidth = 20, digits=2):
+            )
+        self._has_intercept = False
+        self._has_final_prognosis = False
+        self._final_prediction = 0
+        self._baseline = 0
+
+    def text(self, fwidth=25, contwidth=20, cumulwidth=20, digits=2):
         """
         Get user-friendly text from of explanation
 
@@ -49,24 +52,24 @@ def text (self, fwidth=25, contwidth=20, cumulwidth = 20, digits=2):
             return
 
         lines = [''.join(
-                [
-                ' = '.join([attr.name, str(attr.value)]).ljust(fwidth),
-                str(round(attr.contribution,digits)).ljust(contwidth),
+            [
+                ' = '.join([attr.name, str(attr.value)]).ljust(fwidth),
+                str(round(attr.contribution, digits)).ljust(contwidth),
                 str(round(attr.cumulative, digits)).ljust(cumulwidth)
-                ]
-            ) for attr in self._attributes]
+            ]
+        ) for attr in self._attributes]
 
-        print (''.join(
-            ["Feature".ljust(fwidth),
-            "Contribution".ljust(contwidth),
-            "Cumulative".ljust(cumulwidth)]))
+        print(''.join(
+            ["Feature".ljust(fwidth),
+             "Contribution".ljust(contwidth),
+             "Cumulative".ljust(cumulwidth)]))
 
         print('\n'.join(lines))
 
         print(''.join(
-            ['Final prediction'.ljust(fwidth+contwidth),
-            str(round(self._final_prediction, digits)).ljust(cumulwidth)]))
+            ['Final prediction'.ljust(fwidth + contwidth),
+             str(round(self._final_prediction, digits)).ljust(cumulwidth)]))
 
         print(' = '.join(["Baseline", str(round(self._baseline, digits))]))
 
-    def visualize(self, figsize=(7,6), filename=None, dpi=90,fontsize=14):
+    def visualize(self, figsize=(7, 6), filename=None, dpi=90, fontsize=14):
         """
         Get user friendly visualization of explanation
 
@@ -86,86 +89,86 @@ def visualize(self, figsize=(7,6), filename=None, dpi=90,fontsize=14):
         fig = plt.figure(figsize=figsize)
         ax = plt.axes()
 
-        positions = list(range(len(self._attributes)+2))
+        positions = list(range(len(self._attributes) + 2))
 
         previous_value = self._baseline
         for (attr_info, position) in zip(self._attributes, positions[1:]):
-            cumulative = attr_info.cumulative+self._baseline
-            height=1
+            cumulative = attr_info.cumulative + self._baseline
+            height = 1
             left = previous_value if attr_info.contribution > 0 else cumulative
             width = abs(attr_info.contribution)
             color = "blue" if attr_info.contribution > 0 else "orange"
             rect = patches.Rectangle(
-                xy=(left, position-0.5),width=width,height=height,alpha=0.8,color=color)
+                xy=(left, position - 0.5), width=width, height=height, alpha=0.8, color=color)
             ax.add_patch(rect)
             plt.errorbar(x=left, y=position, yerr=0.5, color="black")
-            plt.errorbar(x=left+width, y=position, yerr=0.5, color="black")
-            plt.text(left+width+0.15, y=position-0.2, size=fontsize,
-                s = self._get_prefix(attr_info.contribution) + str(round(attr_info.contribution,2)))
+            plt.errorbar(x=left + width, y=position, yerr=0.5, color="black")
+            plt.text(left + width + 0.15, y=position - 0.2, size=fontsize,
+                     s=self._get_prefix(attr_info.contribution) + str(round(attr_info.contribution, 2)))
             previous_value = cumulative
-
-        #add final prediction bar
+
+        # add final prediction bar
         rectf = patches.Rectangle(
-            xy=(self._baseline,positions[len(positions)-1]-0.5),
-            width=self._final_prediction,
+            xy=(self._baseline, positions[len(positions) - 1] - 0.5),
+            width=self._final_prediction,
             height=1,
             color="grey",
             alpha=0.8
         )
         ax.add_patch(rectf)
-        ax.axvline(x=self._baseline,mew=3,color="black",alpha=1)
-        plt.errorbar(x=self._baseline, y=len(positions)-1, yerr=0.5, color="black")
-        plt.errorbar(x=self._baseline+self._final_prediction, y=len(positions)-1, yerr=0.5, color="black")
+        ax.axvline(x=self._baseline, mew=3, color="black", alpha=1)
+        plt.errorbar(x=self._baseline, y=len(positions) - 1, yerr=0.5, color="black")
+        plt.errorbar(x=self._baseline + self._final_prediction, y=len(positions) - 1, yerr=0.5, color="black")
         plt.text(
-            x=self._baseline+self._final_prediction+0.15,
-            y=positions[len(positions)-1]-0.2,
-            s=str(round(self._final_prediction+self._baseline,2)),size=fontsize,weight="bold")
+            x=self._baseline + self._final_prediction + 0.15,
+            y=positions[len(positions) - 1] - 0.2,
+            s=str(round(self._final_prediction + self._baseline, 2)), size=fontsize, weight="bold")
 
         ax.set_yticks(positions[1:])
-        ax.grid(color="gray",alpha=0.5)
-        sign = "+" if self._direction==ExplainerDirection.Up else "-"
-        labels=[sign + "=".join([attr.name,str(attr.value)]) for attr in self._attributes]+["Final Prognosis"]
-        ax.set_yticklabels(labels,size=fontsize)
-
+        ax.grid(color="gray", alpha=0.5)
+        sign = "+" if self._direction == ExplainerDirection.Up else "-"
+        labels = [sign + "=".join([attr.name, str(attr.value)]) for attr in self._attributes] + ["Final Prognosis"]
+        ax.set_yticklabels(labels, size=fontsize)
+
         all_cumulative = [attr.cumulative for attr in self._attributes]
         leftbound = min([min(all_cumulative), 0]) + self._baseline
-        rightbound= max(max(all_cumulative)+self._baseline,self._baseline)
-        plt.text(x=self._baseline+0.15, y=positions[0]-0.2, s="Baseline = "+str(round(self._baseline,2)),
-            size=fontsize,color="red")
+        rightbound = max(max(all_cumulative) + self._baseline, self._baseline)
+        plt.text(x=self._baseline + 0.15, y=positions[0] - 0.2, s="Baseline = " + str(round(self._baseline, 2)),
+                 size=fontsize, color="red")
 
-        ax.set_xlim(leftbound-1, rightbound+1)
-        ax.set_ylim(-1,len(self._attributes)+2)
+        ax.set_xlim(leftbound - 1, rightbound + 1)
+        ax.set_ylim(-1, len(self._attributes) + 2)
 
         ax.spines['right'].set_visible(False)
         ax.spines['top'].set_visible(False)
 
-        approach = "\"up\"" if self._direction==ExplainerDirection.Up else "\"down\""
-        plt.title("Prediction explanation for "+approach+" approach")
+        approach = "\"up\"" if self._direction == ExplainerDirection.Up else "\"down\""
+        plt.title("Prediction explanation for " + approach + " approach")
 
-        #fig.tight_layout(pad=0, w_pad=0, h_pad=0.0)
-        #fig.subplots_adjust(hspace=0, wspace=0.1)
+        # fig.tight_layout(pad=0, w_pad=0, h_pad=0.0)
+        # fig.subplots_adjust(hspace=0, wspace=0.1)
 
         if filename is None:
             plt.show()
         else:
-            fig.savefig(filename,dpi=dpi)
+            fig.savefig(filename, dpi=dpi)
 
-    def add_intercept (self, intercept_contribution):
+    def add_intercept(self, intercept_contribution):
         self._attributes.appendleft(AttrInfo(
-            name=self._INTERCEPT_NAME,
-            value=self._INTERCEPT_VALUE,
-            contribution=intercept_contribution,
+            name=self._INTERCEPT_NAME,
+            value=self._INTERCEPT_VALUE,
+            contribution=intercept_contribution,
             cumulative=0)
-            )
+        )
         self._correct_cumulatives()
         self._has_intercept = True
 
-    def make_final_prediction (self):
+    def make_final_prediction(self):
         self._final_prediction = sum(attr.contribution for attr in self._attributes)
         self._has_final_prognosis = True
 
-    def add_baseline (self, baseline):
+    def add_baseline(self, baseline):
         self._baseline = baseline
 
     def _correct_cumulatives(self):
         csum = 0
         for attribute in self._attributes:
-            csum+=attribute.contribution
+            csum += attribute.contribution
             attribute.cumulative = csum
 
     def _get_prefix(self, val):
-        return "+" if val>=0 else ""
\ No newline at end of file
+        return "+" if val >= 0 else ""
diff --git a/setup.py b/setup.py
index f66717a..6754e51 100644
--- a/setup.py
+++ b/setup.py
@@ -7,13 +7,14 @@
     author='Jakub Bondyra',
     author_email='jb10193@gmail.com',
     license='GPL-2',
-    packages= find_packages(exclude=['tests']),
+    packages=find_packages(exclude=['tests']),
     install_requires=[
-        'numpy==1.14.2',
-        'scikit-learn==0.19.1',
-        'scipy==1.0.0',
-        'blist==1.3.6',
-        'sphinx-bootstrap-theme==0.6.5',
-        'matplotlib==2.1.2',
-        'recordclass==0.5'])
-
+        'numpy==1.14.2',
+        'scikit-learn==0.19.1',
+        'scipy==1.0.0',
+        'blist==1.3.6',
+        'sphinx-bootstrap-theme==0.6.5',
+        'matplotlib==2.1.2',
+        'recordclass==0.5',
+        'tqdm'
+    ])
diff --git a/tests/test_classification_and_regression.py b/tests/test_classification_and_regression.py
new file mode 100644
index 0000000..f357348
--- /dev/null
+++ b/tests/test_classification_and_regression.py
@@ -0,0 +1,75 @@
+import unittest
+from pyBreakDown.explainer import Explainer
+from sklearn import model_selection
+import numpy as np
+from sklearn import tree
+from sklearn import linear_model
+
+
+class TestBreakdown(unittest.TestCase):
+    def setUp(self):
+        np.random.seed(11231231)  # Must not be changed: test 2 often returns a 0 prediction, which breaks percentage_diff
+        data = np.random.rand(10, 4)
+        target = np.reshape([1, 0, 1, 0, 1, 0, 1, 1, 1, 0], (10, 1))
+        self.train, self.test, self.labels_train, self.labels_test = model_selection.train_test_split(
+            data, target, train_size=0.80)
+
+    def test_explainer_returns_sensible_output_for_classifier_up(self):
+        # arrange
+        dtree = tree.DecisionTreeClassifier()
+        dtree.fit(self.train, self.labels_train)
+        expected_proba = 1.625
+
+        # act
+        bd_exp = Explainer(clf=dtree, data=self.train, colnames=['foo', 'bar', 'baz', 'qux'])
+        explanation = bd_exp.explain(self.test[0], direction="up", useIntercept=False, baseline=0)
+        percentage_diff = abs(explanation._attributes[-1].cumulative / expected_proba - 1)
+
+        # assert
+        self.assertLessEqual(percentage_diff, 0.2)
+        self.assertEqual(len(explanation._attributes) - 1, np.shape(self.train)[1])
+
+    def test_explainer_returns_sensible_output_for_classifier_down(self):
+        # arrange
+        dtree = tree.DecisionTreeClassifier()
+        dtree.fit(self.train, self.labels_train)
+        expected_proba = 1
+
+        # act
+        bd_exp = Explainer(clf=dtree, data=self.train, colnames=['foo', 'bar', 'baz', 'qux'])
+        explanation = bd_exp.explain(self.test[0], direction="down", useIntercept=False, baseline=0)
+        percentage_diff = abs(explanation._attributes[-1].cumulative / expected_proba - 1)
+
+        # assert
+        self.assertLessEqual(percentage_diff, 0.2)
+        self.assertEqual(len(explanation._attributes) - 1, np.shape(self.train)[1])
+
+    def test_explainer_returns_sensible_output_for_regressor_up(self):
+        # arrange
+        lregressor = linear_model.LinearRegression()
+        lregressor.fit(self.train, self.labels_train)
+        expected_val = -1.6407540804833103
+
+        # act
+        bd_exp = Explainer(clf=lregressor, data=self.train, colnames=['foo', 'bar', 'baz', 'qux'])
+        explanation = bd_exp.explain(self.test[0], direction="up", useIntercept=False, baseline=0)
+        percentage_diff = abs(explanation._attributes[-1].cumulative / expected_val - 1)
+
+        # assert
+        self.assertLessEqual(percentage_diff, 0.2)
+        self.assertEqual(len(explanation._attributes) - 1, np.shape(self.train)[1])
+
+    def test_explainer_returns_sensible_output_for_regressor_down(self):
+        # arrange
+        lregressor = linear_model.LinearRegression()
+        lregressor.fit(self.train, self.labels_train)
+        expected_val = -0.576140993143524
+
+        # act
+        bd_exp = Explainer(clf=lregressor, data=self.train, colnames=['foo', 'bar', 'baz', 'qux'])
+        explanation = bd_exp.explain(self.test[0], direction="down", useIntercept=False, baseline=0)
+        percentage_diff = abs(explanation._attributes[-1].cumulative / expected_val - 1)
+
+        # assert
+        self.assertLessEqual(percentage_diff, 0.2)
+        self.assertEqual(len(explanation._attributes) - 1, np.shape(self.train)[1])
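
Note for reviewers (outside the patch): a minimal usage sketch of the Explainer API after this change. It is illustrative only; the toy dataset, labels, and column names below are made up and are not taken from the test suite. The point is that classifiers and regressors now share one entry point: explain() probes the model with hasattr(clf, 'predict_proba') and explains the class-0 probability for classifiers, falling back to predict() for regressors.

# usage_sketch.py -- illustrative only; 'train', 'labels', and colnames are synthetic stand-ins
import numpy as np
from sklearn import tree

from pyBreakDown.explainer import Explainer

rng = np.random.RandomState(0)
train = rng.rand(100, 4)                   # toy feature matrix
labels = (train[:, 0] > 0.5).astype(int)   # toy binary target

clf = tree.DecisionTreeClassifier().fit(train, labels)

# For classifiers, the predict_proba branch is selected automatically;
# a regressor passed here would take the plain predict() path instead.
explainer = Explainer(clf=clf, data=train, colnames=['foo', 'bar', 'baz', 'qux'])
explanation = explainer.explain(train[0], direction="up", useIntercept=False, baseline=0)

explanation.text()        # per-feature contributions as a table
explanation.visualize()   # waterfall-style plot (calls plt.show())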