diff --git a/machine_learning/xgboost_regressor.py b/machine_learning/xgboost_regressor.py
index ffe76132c525..6cb862a55fcb 100644
--- a/machine_learning/xgboost_regressor.py
+++ b/machine_learning/xgboost_regressor.py
@@ -1,29 +1,28 @@
 import numpy as np
 import pandas as pd
-from collections import defaultdict
-import math


-class XGBoostModel():
-    ''' XGBoost regressor.
+class XGBoostRegressor:
+    '''Custom implementation of XGBoost regressor.

     This implementation includes a simplified version of the XGBoost algorithm
-    for regression tasks. It includes gradient boosting with decision trees as base learners.
+    for regression tasks. It employs gradient boosting with decision trees as base learners.
     '''

     def __init__(self, params=None, random_seed=None):
-        '''Initialize XGBoostModel.
+        '''Initialize XGBoostRegressor.

         Parameters:
             params (dict): Hyperparameters for the XGBoost model.
             random_seed (int): Seed for random number generation.
         '''
         # Set hyperparameters with defaults
-        self.params = defaultdict(lambda: None, params)
-        self.subsample = self.params['subsample'] or 1.0
-        self.learning_rate = self.params['learning_rate'] or 0.3
-        self.base_prediction = self.params['base_score'] or 0.5
-        self.max_depth = self.params['max_depth'] or 5
-        self.rng = np.random.default_rng(seed=random_seed)
+        self.params = params or {}
+        self.subsample = self.params.get('subsample', 1.0)
+        self.learning_rate = self.params.get('learning_rate', 0.3)
+        self.base_prediction = self.params.get('base_score', 0.5)
+        self.max_depth = self.params.get('max_depth', 5)
+        self.rng = np.random.default_rng(random_seed)
+        self.random_seed = random_seed
         self.boosters = []

     def fit(self, X, y, objective, num_boost_round, verbose=False):
@@ -37,16 +35,16 @@ def fit(self, X, y, objective, num_boost_round, verbose=False):
             verbose (bool): Whether to print training progress.
         '''
         # Initialize predictions with base score
-        current_predictions = self.base_prediction * np.ones(shape=len(y))
+        current_predictions = np.full(len(y), self.base_prediction, dtype=float)
         for i in range(num_boost_round):
             # Compute negative gradient and hessian
             gradients = objective.gradient(y, current_predictions)
             hessians = objective.hessian(y, current_predictions)
             # Apply subsampling if required
-            sample_idxs = None if self.subsample == 1.0 else self.rng.choice(
-                len(y), size=math.floor(self.subsample*len(y)), replace=False)
-            booster = TreeBooster(X, gradients, hessians, self.params,
-                self.max_depth, sample_idxs)
+            sample_idxs = np.arange(len(y)) if self.subsample == 1.0 else self.rng.choice(
+                len(y), size=int(self.subsample * len(y)), replace=False)
+            booster = TreeBooster(np.asarray(X)[sample_idxs], np.asarray(gradients)[sample_idxs],
+                                  np.asarray(hessians)[sample_idxs], self.params, self.max_depth, self.random_seed)
             # Update predictions using learning rate and booster predictions
             current_predictions += self.learning_rate * booster.predict(X)
             self.boosters.append(booster)
@@ -67,10 +65,10 @@ def predict(self, X):
             np.sum([booster.predict(X) for booster in self.boosters], axis=0))


-class TreeBooster():
+class TreeBooster:
     '''Decision tree booster for XGBoost regressor.'''

-    def __init__(self, X, g, h, params, max_depth, idxs=None):
+    def __init__(self, X, g, h, params, max_depth, random_seed=None):
         '''Initialize a decision tree booster.

         Parameters:
@@ -79,26 +77,26 @@ def __init__(self, X, g, h, params, max_depth, idxs=None):
             h (np.ndarray): Hessian values.
             params (dict): Hyperparameters for the booster.
             max_depth (int): Maximum depth of the tree.
-            idxs (np.ndarray): Indices of the samples used in this booster.
+            random_seed (int): Seed for random number generation.
         '''
         # Set hyperparameters
         self.params = params
         self.max_depth = max_depth
         assert self.max_depth >= 0, 'max_depth must be nonnegative'
-        self.min_child_weight = params['min_child_weight'] or 1.0
-        self.reg_lambda = params['reg_lambda'] or 1.0
-        self.gamma = params['gamma'] or 0.0
-        self.colsample_bynode = params['colsample_bynode'] or 1.0
+        self.min_child_weight = params.get('min_child_weight', 1.0)
+        self.reg_lambda = params.get('reg_lambda', 1.0)
+        self.gamma = params.get('gamma', 0.0)
+        self.colsample_bynode = params.get('colsample_bynode', 1.0)
+        self.random_seed = random_seed
+        self.rng = np.random.default_rng(random_seed)
         # Set data and indices
-        if isinstance(g, pd.Series): g = g.values
-        if isinstance(h, pd.Series): h = h.values
-        if idxs is None: idxs = np.arange(len(g))
-        self.X, self.g, self.h, self.idxs = X, g, h, idxs
-        self.n, self.c = len(idxs), X.shape[1]
+        self.X, self.g, self.h = np.asarray(X), np.asarray(g), np.asarray(h)
+        self.n, self.c = self.X.shape
+        self.idxs = np.arange(self.n)
         # Initialize node value
-        self.value = -g[idxs].sum() / (h[idxs].sum() + self.reg_lambda)
+        self.value = -np.sum(self.g) / (np.sum(self.h) + self.reg_lambda)
         self.best_score_so_far = 0.
         # Recursively build the tree

@@ -117,26 +115,25 @@ def _maybe_insert_child_nodes(self):
         if self.is_leaf:
             return
         # Split the data based on the best feature and threshold
-        x = self.X.values[self.idxs, self.split_feature_idx]
+        x = self.X[self.idxs, self.split_feature_idx]
         left_idx = np.nonzero(x <= self.threshold)[0]
         right_idx = np.nonzero(x > self.threshold)[0]
         # Recur for left and right subtrees
-        self.left = TreeBooster(self.X, self.g, self.h, self.params,
-            self.max_depth - 1, self.idxs[left_idx])
-        self.right = TreeBooster(self.X, self.g, self.h, self.params,
-            self.max_depth - 1, self.idxs[right_idx])
+        self.left = TreeBooster(self.X[left_idx], self.g[left_idx], self.h[left_idx], self.params,
+                                self.max_depth - 1, self.random_seed)
+        self.right = TreeBooster(self.X[right_idx], self.g[right_idx], self.h[right_idx], self.params,
+                                 self.max_depth - 1, self.random_seed)

     def _find_better_split(self, feature_idx):
         '''Find the best split for a feature.'''
-        x = self.X.values[self.idxs, feature_idx]
-        g, h = self.g[self.idxs], self.h[self.idxs]
+        x = self.X[self.idxs, feature_idx]
         sort_idx = np.argsort(x)
-        sort_g, sort_h, sort_x = g[sort_idx], h[sort_idx], x[sort_idx]
-        sum_g, sum_h = g.sum(), h.sum()
+        sort_g, sort_h, sort_x = self.g[self.idxs][sort_idx], self.h[self.idxs][sort_idx], x[sort_idx]
+        sum_g, sum_h = np.sum(sort_g), np.sum(sort_h)
         sum_g_right, sum_h_right = sum_g, sum_h
         sum_g_left, sum_h_left = 0., 0.
-        for i in range(0, self.n - 1):
+        for i in range(self.n - 1):
             g_i, h_i, x_i, x_i_next = sort_g[i], sort_h[i], sort_x[i], sort_x[i + 1]
             sum_g_left += g_i
             sum_g_right -= g_i
@@ -150,7 +147,7 @@ def _find_better_split(self, feature_idx):
                 gain = 0.5 * ((sum_g_left**2 / (sum_h_left + self.reg_lambda))
                               + (sum_g_right**2 / (sum_h_right + self.reg_lambda))
                               - (sum_g**2 / (sum_h + self.reg_lambda))
-                              ) - self.gamma/2 # Eq(7) in the xgboost paper
+                              ) - self.gamma/2  # Eq(7) in the xgboost paper
                 if gain > self.best_score_so_far:
                     self.split_feature_idx = feature_idx
                     self.best_score_so_far = gain
@@ -158,12 +155,11 @@ def _find_better_split(self, feature_idx):

     def predict(self, X):
         '''Make predictions using the trained booster.'''
-        return np.array([self._predict_row(row) for _, row in X.iterrows()])
+        return np.array([self._predict_row(row) for row in np.asarray(X)])

     def _predict_row(self, row):
         '''Recursively predict a single data point.'''
         if self.is_leaf:
             return self.value
-        child = self.left if row[self.split_feature_idx] <= self.threshold \
-            else self.right
+        child = self.left if row[self.split_feature_idx] <= self.threshold else self.right
         return child._predict_row(row)
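A minimal usage sketch of the refactored class, under stated assumptions: `fit` only requires an objective object exposing `gradient(y, predictions)` and `hessian(y, predictions)`, so the `SquaredErrorObjective` helper and the synthetic data below are illustrative assumptions rather than part of the patch; only the `XGBoostRegressor` name and the module path come from the diff.

    import numpy as np
    import pandas as pd

    from machine_learning.xgboost_regressor import XGBoostRegressor


    class SquaredErrorObjective:
        '''Hypothetical squared-error objective: gradient = residual, hessian = 1.'''

        def gradient(self, y, predictions):
            return np.asarray(predictions) - np.asarray(y)

        def hessian(self, y, predictions):
            return np.ones(len(y))


    # Illustrative synthetic regression data
    rng = np.random.default_rng(0)
    X = pd.DataFrame(rng.normal(size=(200, 3)), columns=['f0', 'f1', 'f2'])
    y = 2.0 * X['f0'] - X['f1'] + rng.normal(scale=0.1, size=200)

    model = XGBoostRegressor(params={'max_depth': 3, 'subsample': 0.8}, random_seed=0)
    model.fit(X, y, objective=SquaredErrorObjective(), num_boost_round=20)
    print(np.mean((model.predict(X) - y) ** 2))  # training MSE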