issues TheAlgorithms#8067 Implemented XGBoostRegressor from scratch
YugantGotmare committed Oct 5, 2023
1 parent 3b082d4 commit 8b4342d
Showing 1 changed file with 40 additions and 44 deletions.
84 changes: 40 additions & 44 deletions machine_learning/xgboost_regressor.py
@@ -1,29 +1,27 @@
 import numpy as np
 import pandas as pd
-from collections import defaultdict
-import math

-class XGBoostModel():
-    ''' XGBoost regressor.
+class XGBoostRegressor:
+    '''Custom implementation of XGBoost regressor.
     This implementation includes a simplified version of the XGBoost algorithm
-    for regression tasks. It includes gradient boosting with decision trees as base learners.
+    for regression tasks. It employs gradient boosting with decision trees as base learners.
     '''

     def __init__(self, params=None, random_seed=None):
-        '''Initialize XGBoostModel.
+        '''Initialize XGBoostRegressor.
         Parameters:
         params (dict): Hyperparameters for the XGBoost model.
         random_seed (int): Seed for random number generation.
         '''
         # Set hyperparameters with defaults
-        self.params = defaultdict(lambda: None, params)
-        self.subsample = self.params['subsample'] or 1.0
-        self.learning_rate = self.params['learning_rate'] or 0.3
-        self.base_prediction = self.params['base_score'] or 0.5
-        self.max_depth = self.params['max_depth'] or 5
-        self.rng = np.random.default_rng(seed=random_seed)
+        self.params = params or {}
+        self.subsample = self.params.get('subsample', 1.0)
+        self.learning_rate = self.params.get('learning_rate', 0.3)
+        self.base_prediction = self.params.get('base_score', 0.5)
+        self.max_depth = self.params.get('max_depth', 5)
+        self.random_seed = random_seed
         self.boosters = []

     def fit(self, X, y, objective, num_boost_round, verbose=False):
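Note on the new defaults: because __init__ now reads hyperparameters with dict.get(), any key omitted from params silently falls back to its default, and a falsy-but-valid value such as subsample=0.0 is no longer overwritten the way the old `or`-based lookups overwrote it. A minimal construction sketch (the parameter values here are illustrative, not part of the commit):

    model = XGBoostRegressor(
        params={'learning_rate': 0.1, 'max_depth': 4, 'subsample': 0.8},
        random_seed=42,
    )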
@@ -37,16 +35,16 @@ def fit(self, X, y, objective, num_boost_round, verbose=False):
         verbose (bool): Whether to print training progress.
         '''
         # Initialize predictions with base score
-        current_predictions = self.base_prediction * np.ones(shape=len(y))
+        current_predictions = np.full_like(y, self.base_prediction)
         for i in range(num_boost_round):
             # Compute negative gradient and hessian
             gradients = objective.gradient(y, current_predictions)
             hessians = objective.hessian(y, current_predictions)
             # Apply subsampling if required
-            sample_idxs = None if self.subsample == 1.0 else self.rng.choice(
-                len(y), size=math.floor(self.subsample*len(y)), replace=False)
-            booster = TreeBooster(X, gradients, hessians, self.params,
-                                  self.max_depth, sample_idxs)
+            if self.subsample < 1.0:
+                sample_idxs = np.random.choice(len(y), size=int(self.subsample * len(y)), replace=False)
+                gradients, hessians = gradients[sample_idxs], hessians[sample_idxs]
+            booster = TreeBooster(X, gradients, hessians, self.params, self.max_depth, self.random_seed)
             # Update predictions using learning rate and booster predictions
             current_predictions += self.learning_rate * booster.predict(X)
             self.boosters.append(booster)
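fit() defines no objective of its own; it only requires an object exposing gradient(y, predictions) and hessian(y, predictions). A minimal squared-error objective satisfying that interface could look like the following sketch (SquaredErrorObjective is a hypothetical helper, not part of this commit):

    import numpy as np

    class SquaredErrorObjective:
        '''Least-squares loss L = 0.5 * (y - pred)**2.'''

        def gradient(self, y, pred):
            # dL/dpred = pred - y
            return pred - y

        def hessian(self, y, pred):
            # d2L/dpred2 = 1 for every sample
            return np.ones(len(y))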
@@ -67,10 +65,10 @@ def predict(self, X):
             np.sum([booster.predict(X) for booster in self.boosters], axis=0))


-class TreeBooster():
+class TreeBooster:
     '''Decision tree booster for XGBoost regressor.'''

-    def __init__(self, X, g, h, params, max_depth, idxs=None):
+    def __init__(self, X, g, h, params, max_depth, random_seed=None):
         '''Initialize a decision tree booster.
         Parameters:
@@ -79,26 +77,26 @@ def __init__(self, X, g, h, params, max_depth, random_seed=None):
         h (np.ndarray): Hessian values.
         params (dict): Hyperparameters for the booster.
         max_depth (int): Maximum depth of the tree.
-        idxs (np.ndarray): Indices of the samples used in this booster.
+        random_seed (int): Seed for random number generation.
         '''
         # Set hyperparameters
         self.params = params
         self.max_depth = max_depth
         assert self.max_depth >= 0, 'max_depth must be nonnegative'
-        self.min_child_weight = params['min_child_weight'] or 1.0
-        self.reg_lambda = params['reg_lambda'] or 1.0
-        self.gamma = params['gamma'] or 0.0
-        self.colsample_bynode = params['colsample_bynode'] or 1.0
+        self.min_child_weight = params.get('min_child_weight', 1.0)
+        self.reg_lambda = params.get('reg_lambda', 1.0)
+        self.gamma = params.get('gamma', 0.0)
+        self.colsample_bynode = params.get('colsample_bynode', 1.0)
+        self.random_seed = random_seed
+        np.random.seed(self.random_seed)

         # Set data and indices
         if isinstance(g, pd.Series): g = g.values
         if isinstance(h, pd.Series): h = h.values
-        if idxs is None: idxs = np.arange(len(g))
-        self.X, self.g, self.h, self.idxs = X, g, h, idxs
-        self.n, self.c = len(idxs), X.shape[1]
+        self.X, self.g, self.h = X.values, g, h
+        self.n, self.c = X.shape[0], X.shape[1]
+        self.idxs = np.arange(self.n)

         # Initialize node value
-        self.value = -g[idxs].sum() / (h[idxs].sum() + self.reg_lambda)
+        self.value = -np.sum(g[self.idxs]) / (np.sum(h[self.idxs]) + self.reg_lambda)
         self.best_score_so_far = 0.

         # Recursively build the tree
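The node value set above is the closed-form optimal leaf weight from the XGBoost paper (Eq. 5 in Chen & Guestrin, 2016): for the gradients g_i and hessians h_i reaching this node,

    w^* = -\frac{\sum_i g_i}{\sum_i h_i + \lambda}

where \lambda is reg_lambda; the assignment to self.value is a direct transcription of this formula.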
@@ -117,26 +115,25 @@ def _maybe_insert_child_nodes(self):
         if self.is_leaf:
             return
         # Split the data based on the best feature and threshold
-        x = self.X.values[self.idxs, self.split_feature_idx]
+        x = self.X[self.idxs, self.split_feature_idx]
         left_idx = np.nonzero(x <= self.threshold)[0]
         right_idx = np.nonzero(x > self.threshold)[0]
         # Recur for left and right subtrees
-        self.left = TreeBooster(self.X, self.g, self.h, self.params,
-                                self.max_depth - 1, self.idxs[left_idx])
-        self.right = TreeBooster(self.X, self.g, self.h, self.params,
-                                 self.max_depth - 1, self.idxs[right_idx])
+        self.left = TreeBooster(self.X[left_idx], self.g[left_idx], self.h[left_idx], self.params,
+                                self.max_depth - 1, self.random_seed)
+        self.right = TreeBooster(self.X[right_idx], self.g[right_idx], self.h[right_idx], self.params,
+                                 self.max_depth - 1, self.random_seed)

     def _find_better_split(self, feature_idx):
         '''Find the best split for a feature.'''
-        x = self.X.values[self.idxs, feature_idx]
-        g, h = self.g[self.idxs], self.h[self.idxs]
+        x = self.X[self.idxs, feature_idx]
         sort_idx = np.argsort(x)
-        sort_g, sort_h, sort_x = g[sort_idx], h[sort_idx], x[sort_idx]
-        sum_g, sum_h = g.sum(), h.sum()
+        sort_g, sort_h, sort_x = self.g[self.idxs][sort_idx], self.h[self.idxs][sort_idx], x[sort_idx]
+        sum_g, sum_h = np.sum(sort_g), np.sum(sort_h)
         sum_g_right, sum_h_right = sum_g, sum_h
         sum_g_left, sum_h_left = 0., 0.

-        for i in range(0, self.n - 1):
+        for i in range(self.n - 1):
             g_i, h_i, x_i, x_i_next = sort_g[i], sort_h[i], sort_x[i], sort_x[i + 1]
             sum_g_left += g_i
             sum_g_right -= g_i
@@ -150,20 +147,19 @@ def _find_better_split(self, feature_idx):
             gain = 0.5 * ((sum_g_left**2 / (sum_h_left + self.reg_lambda))
                           + (sum_g_right**2 / (sum_h_right + self.reg_lambda))
                           - (sum_g**2 / (sum_h + self.reg_lambda))
-                          ) - self.gamma/2 # Eq(7) in the xgboost paper
+                          ) - self.gamma/2  # Eq(7) in the xgboost paper
             if gain > self.best_score_so_far:
                 self.split_feature_idx = feature_idx
                 self.best_score_so_far = gain
                 self.threshold = (x_i + x_i_next) / 2

     def predict(self, X):
         '''Make predictions using the trained booster.'''
-        return np.array([self._predict_row(row) for _, row in X.iterrows()])
+        return np.array([self._predict_row(row) for row in X])

     def _predict_row(self, row):
         '''Recursively predict a single data point.'''
         if self.is_leaf:
             return self.value
-        child = self.left if row[self.split_feature_idx] <= self.threshold \
-            else self.right
+        child = self.left if row[self.split_feature_idx] <= self.threshold else self.right
         return child._predict_row(row)
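For reference, the gain expression in _find_better_split is the split-gain formula from the XGBoost paper (Eq. 7 in Chen & Guestrin, 2016). Writing G_L, H_L for the gradient and hessian sums routed left, G_R, H_R for those routed right, and G = G_L + G_R, H = H_L + H_R, the code evaluates

    \text{gain} = \frac{1}{2}\left(\frac{G_L^2}{H_L + \lambda} + \frac{G_R^2}{H_R + \lambda} - \frac{G^2}{H + \lambda}\right) - \frac{\gamma}{2}

with \lambda = reg_lambda and \gamma = gamma (note the code subtracts \gamma/2 where the paper's statement subtracts \gamma), and a split is recorded only when this gain exceeds best_score_so_far.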
