Yugant first contribution #9780

Closed
214 changes: 158 additions & 56 deletions machine_learning/xgboost_regressor.py
@@ -1,66 +1,168 @@
# XGBoost Regressor Example
from collections import defaultdict

import numpy as np
from sklearn.datasets import fetch_california_housing
from sklearn.metrics import mean_absolute_error, mean_squared_error
from sklearn.model_selection import train_test_split
from xgboost import XGBRegressor


class XGBoostRegressor:
    '''Implementation of XGBoost regressor.

    This implementation includes a simplified version of the XGBoost
    algorithm for regression tasks. It uses gradient boosting with
    decision trees as base learners.
    '''

    def __init__(self, params=None, random_seed=None):
        '''Initialize XGBoostRegressor.

        Parameters:
            params (dict): Hyperparameters for the XGBoost model.
            random_seed (int): Seed for random number generation.
        '''
        # Set hyperparameters with defaults (missing keys map to None)
        self.params = defaultdict(lambda: None, params or {})
        self.subsample = self.params['subsample'] or 1.0
        self.learning_rate = self.params['learning_rate'] or 0.3
        self.base_prediction = self.params['base_score'] or 0.5
        self.max_depth = self.params['max_depth'] or 5
        self.random_seed = random_seed
        self.boosters = []

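    # An example hyperparameter dict accepted by __init__, consolidating the
    # keys read here and in DecisionTreeBooster (values illustrative only,
    # not prescribed by this diff):
    #     params = {'subsample': 0.8, 'learning_rate': 0.1, 'base_score': 0.5,
    #               'max_depth': 3, 'min_child_weight': 1.0, 'reg_lambda': 1.0,
    #               'gamma': 0.0, 'colsample_bynode': 1.0}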
    def fit(self, x, y, objective, num_boost_round, verbose=False):
        '''Train the XGBoost model.

        Parameters:
            x (pd.DataFrame): Feature matrix.
            y (pd.Series): Target values.
            objective (ObjectiveFunction): Objective function for regression.
            num_boost_round (int): Number of boosting rounds.
            verbose (bool): Whether to print training progress.
        '''
        # Initialize predictions with the constant base score
        current_predictions = np.full_like(y, self.base_prediction, dtype=np.float64)
        for i in range(num_boost_round):
            # Compute gradient and hessian of the loss at current predictions
            gradients = objective.gradient(y, current_predictions)
            hessians = objective.hessian(y, current_predictions)
            # Apply row subsampling if required; the booster receives the full
            # data together with the sampled row indices
            sample_idxs = None
            if self.subsample < 1.0:
                sample_idxs = np.random.choice(
                    len(y), size=int(self.subsample * len(y)), replace=False
                )
            booster = DecisionTreeBooster(
                x, gradients, hessians, self.params, self.max_depth,
                random_seed=self.random_seed, idxs=sample_idxs,
            )
            # Update predictions using learning rate and booster predictions
            current_predictions += self.learning_rate * booster.predict(x)
            self.boosters.append(booster)
            if verbose:
                print(f'[{i}] train loss = {objective.loss(y, current_predictions)}')

    def predict(self, x):
        '''Make predictions using the trained model.

        Parameters:
            x (pd.DataFrame): Feature matrix for prediction.

        Returns:
            np.ndarray: Predicted values.
        '''
        # Base score plus the sum of scaled contributions from all boosters
        return self.base_prediction + self.learning_rate * np.sum(
            [booster.predict(x) for booster in self.boosters], axis=0
        )

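# NOTE: `fit` expects an `objective` object exposing gradient, hessian and
# loss methods, but no such class is defined anywhere in this diff. The class
# below is a minimal squared-error sketch added for illustration (an
# assumption, not part of the original PR).
class SquaredErrorObjective:
    '''Squared-error objective: per-sample loss 0.5 * (pred - y)^2.'''

    def loss(self, y, pred):
        '''Mean squared error, reported during training.'''
        return np.mean((y - pred) ** 2)

    def gradient(self, y, pred):
        '''First derivative of the loss with respect to pred.'''
        return pred - y

    def hessian(self, y, pred):
        '''Second derivative of the loss with respect to pred (constant 1).'''
        return np.ones(len(y))
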
def data_handling(data: dict) -> tuple:
    # Split dataset into features and target. Data is features.
    """
    >>> data_handling((
    ...  {'data':'[ 8.3252 41. 6.9841269 1.02380952 322. 2.55555556 37.88 -122.23 ]'
    ...  ,'target':([4.526])}))
    ('[ 8.3252 41. 6.9841269 1.02380952 322. 2.55555556 37.88 -122.23 ]', [4.526])
    """
    return (data["data"], data["target"])

class DecisionTreeBooster:
    '''Decision tree booster for XGBoost regressor.'''

    def __init__(self, x, g, h, params, max_depth, random_seed=None, idxs=None):
        '''Initialize a decision tree booster.

        Parameters:
            x (pd.DataFrame): Feature matrix.
            g (np.ndarray): Gradient values.
            h (np.ndarray): Hessian values.
            params (dict): Hyperparameters for the booster.
            max_depth (int): Maximum depth of the tree.
            random_seed (int): Seed for random number generation.
            idxs (np.ndarray): Row indices this node is responsible for
                (defaults to all rows).
        '''
        # Set hyperparameters
        self.params = params
        self.max_depth = max_depth
        assert self.max_depth >= 0, 'max_depth must be nonnegative'
        self.min_child_weight = params.get('min_child_weight', 1.0)
        self.reg_lambda = params.get('reg_lambda', 1.0)
        self.gamma = params.get('gamma', 0.0)
        self.colsample_bynode = params.get('colsample_bynode', 1.0)
        self.random_seed = random_seed
        np.random.seed(self.random_seed)

        # Set data and indices
        self.x, self.g, self.h = x, g, h
        self.idxs = idxs if idxs is not None else np.arange(x.shape[0])
        self.n, self.c = len(self.idxs), x.shape[1]

        # Initialize node value: optimal leaf weight for this node's rows
        self.value = -np.sum(g[self.idxs]) / (np.sum(h[self.idxs]) + self.reg_lambda)
        self.best_score_so_far = 0.0

        # Recursively build the tree
        if self.max_depth > 0:
            self._maybe_insert_child_nodes()

    @property
    def is_leaf(self):
        '''Check if the node is a leaf.'''
        return self.best_score_so_far == 0.0

    def _maybe_insert_child_nodes(self):
        '''Recursively insert child nodes to build the tree.'''
        # Consider a random subset of columns when colsample_bynode < 1.0
        n_cols = round(self.colsample_bynode * self.c)
        for i in np.random.permutation(self.c)[:n_cols]:
            self._find_better_split(i)
        if self.is_leaf:
            return
        # Split the data based on the best feature and threshold
        x = self.x.values[self.idxs, self.split_feature_idx]
        left_idx = np.nonzero(x <= self.threshold)[0]
        right_idx = np.nonzero(x > self.threshold)[0]
        # Recur for left and right subtrees
        self.left = DecisionTreeBooster(
            self.x, self.g, self.h, self.params, self.max_depth - 1,
            random_seed=self.random_seed, idxs=self.idxs[left_idx],
        )
        self.right = DecisionTreeBooster(
            self.x, self.g, self.h, self.params, self.max_depth - 1,
            random_seed=self.random_seed, idxs=self.idxs[right_idx],
        )

    def _find_better_split(self, feature_idx):
        '''Find the best split for a feature.'''
        x = self.x.values[self.idxs, feature_idx]
        g, h = self.g[self.idxs], self.h[self.idxs]
        sort_idx = np.argsort(x)
        sort_g, sort_h, sort_x = g[sort_idx], h[sort_idx], x[sort_idx]
        sum_g, sum_h = g.sum(), h.sum()
        sum_g_right, sum_h_right = sum_g, sum_h
        sum_g_left, sum_h_left = 0.0, 0.0

        # Scan candidate split points left to right, one sample at a time
        for i in range(self.n - 1):
            g_i, h_i, x_i, x_i_next = sort_g[i], sort_h[i], sort_x[i], sort_x[i + 1]
            sum_g_left += g_i
            sum_g_right -= g_i
            sum_h_left += h_i
            sum_h_right -= h_i
            if sum_h_left < self.min_child_weight or x_i == x_i_next:
                continue
            if sum_h_right < self.min_child_weight:
                break

            # Split gain, Eq. (7) in the XGBoost paper:
            # 0.5*[G_L^2/(H_L+lambda) + G_R^2/(H_R+lambda) - G^2/(H+lambda)] - gamma
            gain = 0.5 * (
                (sum_g_left**2 / (sum_h_left + self.reg_lambda))
                + (sum_g_right**2 / (sum_h_right + self.reg_lambda))
                - (sum_g**2 / (sum_h + self.reg_lambda))
            ) - self.gamma / 2
            if gain > self.best_score_so_far:
                self.split_feature_idx = feature_idx
                self.best_score_so_far = gain
                self.threshold = (x_i + x_i_next) / 2

    def predict(self, x):
        '''Make predictions using the trained booster.'''
        return np.array([self._predict_row(row) for _, row in x.iterrows()])

    def _predict_row(self, row):
        '''Recursively predict a single data point.'''
        if self.is_leaf:
            return self.value
        child = (
            self.left if row[self.split_feature_idx] <= self.threshold else self.right
        )
        return child._predict_row(row)


def xgboost(
    features: np.ndarray, target: np.ndarray, test_features: np.ndarray
) -> np.ndarray:
    """
    >>> xgboost(np.array([[ 2.3571 , 52. , 6.00813008, 1.06775068,
    ... 907. , 2.45799458, 40.58 , -124.26]]),np.array([1.114]),
    ... np.array([[1.97840000e+00, 3.70000000e+01, 4.98858447e+00, 1.03881279e+00,
    ... 1.14300000e+03, 2.60958904e+00, 3.67800000e+01, -1.19780000e+02]]))
    array([[1.1139996]], dtype=float32)
    """
    xgb = XGBRegressor(
        verbosity=0, random_state=42, tree_method="exact", base_score=0.5
    )
    xgb.fit(features, target)
    # Predict target for test data
    predictions = xgb.predict(test_features)
    predictions = predictions.reshape(len(predictions), 1)
    return predictions


def main() -> None:
    """
    >>> main()
    Mean Absolute Error : 0.30957163379906033
    Mean Square Error : 0.22611560196662744

    The URL for this algorithm
    https://xgboost.readthedocs.io/en/stable/
    California house price dataset is used to demonstrate the algorithm.
    """
    # Load California house price dataset
    california = fetch_california_housing()
    data, target = data_handling(california)
    x_train, x_test, y_train, y_test = train_test_split(
        data, target, test_size=0.25, random_state=1
    )
    predictions = xgboost(x_train, y_train, x_test)
    # Error printing
    print(f"Mean Absolute Error : {mean_absolute_error(y_test, predictions)}")
    print(f"Mean Square Error : {mean_squared_error(y_test, predictions)}")


if __name__ == "__main__":
    import doctest

    doctest.testmod(verbose=True)
    main()
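# Example usage of the from-scratch XGBoostRegressor (an illustrative sketch
# assuming the hypothetical SquaredErrorObjective defined above; kept in
# comments so it does not run on import):
#
#     import pandas as pd
#     california = fetch_california_housing()
#     x = pd.DataFrame(california.data, columns=california.feature_names)
#     y = california.target
#     model = XGBoostRegressor(
#         params={'max_depth': 3, 'subsample': 0.8}, random_seed=42
#     )
#     model.fit(x, y, SquaredErrorObjective(), num_boost_round=20, verbose=True)
#     preds = model.predict(x)
#     print(mean_squared_error(y, preds))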