diff --git a/flaml/automl/__init__.py b/flaml/automl/__init__.py index b93cd5b14643..809f64f08e2b 100644 --- a/flaml/automl/__init__.py +++ b/flaml/automl/__init__.py @@ -1,3 +1,5 @@ -from flaml.automl.automl import AutoML, AutoMLState, SearchState, logger_formatter, size +from flaml.automl.automl import AutoML, size +from flaml.automl.logger import logger_formatter +from flaml.automl.state import SearchState, AutoMLState __all__ = ["AutoML", "AutoMLState", "SearchState", "logger_formatter", "size"] diff --git a/flaml/automl/automl.py b/flaml/automl/automl.py index 5d16ec9504cd..8bab84e0cd67 100644 --- a/flaml/automl/automl.py +++ b/flaml/automl/automl.py @@ -6,30 +6,20 @@ import time import os import sys -from typing import Callable, Optional, List, Union, Any +from typing import Callable, List, Union, Optional import inspect from functools import partial import numpy as np -from scipy.sparse import issparse -from sklearn.model_selection import ( - train_test_split, - RepeatedStratifiedKFold, - RepeatedKFold, - GroupKFold, - TimeSeriesSplit, - GroupShuffleSplit, - StratifiedGroupKFold, -) -from sklearn.utils import shuffle from sklearn.base import BaseEstimator import pandas as pd import logging import json + +from flaml.automl.state import SearchState, AutoMLState from flaml.automl.ml import ( compute_estimator, train_estimator, get_estimator_class, - get_classification_objective, ) from flaml.config import ( MIN_SAMPLE_TRAIN, @@ -41,29 +31,18 @@ N_SPLITS, SAMPLE_MULTIPLY_FACTOR, ) -from flaml.automl.data import ( - concat, - CLASSIFICATION, - TOKENCLASSIFICATION, - TS_FORECAST, - TS_FORECASTREGRESSION, - TS_FORECASTPANEL, - TS_TIMESTAMP_COL, - REGRESSION, - _is_nlp_task, - NLG_TASKS, -) +from flaml.automl.data import concat + +# TODO check to see when we can remove these +from flaml.automl.task.task import CLASSIFICATION, TS_FORECAST, Task +from flaml.automl.task.factory import task_factory from flaml import tune +from flaml.automl.logger import logger, 
logger_formatter from flaml.automl.training_log import training_log_reader, training_log_writer from flaml.default import suggest_learner from flaml.version import __version__ as flaml_version from flaml.tune.spark.utils import check_spark, get_broadcast_data -logger = logging.getLogger(__name__) -logger_formatter = logging.Formatter( - "[%(name)s: %(asctime)s] {%(lineno)d} %(levelname)s - %(message)s", "%m-%d %H:%M:%S" -) -logger.propagate = False try: import mlflow @@ -80,424 +59,6 @@ ray_available = False -class SearchState: - @property - def search_space(self): - return self._search_space_domain - - @property - def estimated_cost4improvement(self): - return max( - self.time_best_found - self.time_best_found_old, - self.total_time_used - self.time_best_found, - ) - - def valid_starting_point_one_dim(self, value_one_dim, domain_one_dim): - from flaml.tune.space import sample - - """ - For each hp in the starting point, check the following 3 conditions: - (1) If the type of the starting point does not match the required type in search space, return false - (2) If the starting point is not in the required search space, return false - (3) If the search space is a value instead of domain, and the value is not equal to the starting point - Notice (2) include the case starting point not in user specified search space custom_hp - """ - if isinstance(domain_one_dim, sample.Domain): - renamed_type = list( - inspect.signature(domain_one_dim.is_valid).parameters.values() - )[0].annotation - type_match = ( - renamed_type == Any - or isinstance(value_one_dim, renamed_type) - or isinstance(value_one_dim, int) - and renamed_type is float - ) - if not (type_match and domain_one_dim.is_valid(value_one_dim)): - return False - elif value_one_dim != domain_one_dim: - return False - return True - - def valid_starting_point(self, starting_point, search_space): - return all( - self.valid_starting_point_one_dim(value, search_space[name].get("domain")) - for name, value in 
starting_point.items() - if name != "FLAML_sample_size" - ) - - def __init__( - self, - learner_class, - data_size, - task, - starting_point=None, - period=None, - custom_hp=None, - max_iter=None, - budget=None, - ): - self.init_eci = learner_class.cost_relative2lgbm() if budget >= 0 else 1 - self._search_space_domain = {} - self.init_config = None - self.low_cost_partial_config = {} - self.cat_hp_cost = {} - self.data_size = data_size - self.ls_ever_converged = False - self.learner_class = learner_class - self._budget = budget - if task in TS_FORECAST: - search_space = learner_class.search_space( - data_size=data_size, task=task, pred_horizon=period - ) - else: - search_space = learner_class.search_space(data_size=data_size, task=task) - - if custom_hp is not None: - search_space.update(custom_hp) - - if isinstance(starting_point, dict): - starting_point = AutoMLState.sanitize(starting_point) - if max_iter > 1 and not self.valid_starting_point( - starting_point, search_space - ): - # If the number of iterations is larger than 1, remove invalid point - logger.warning( - "Starting point {} removed because it is outside of the search space".format( - starting_point - ) - ) - starting_point = None - elif isinstance(starting_point, list): - starting_point = [AutoMLState.sanitize(x) for x in starting_point] - if max_iter > len(starting_point): - # If the number of starting points is no smaller than max iter, avoid the checking - starting_point_len = len(starting_point) - starting_point = [ - x - for x in starting_point - if self.valid_starting_point(x, search_space) - ] - if starting_point_len > len(starting_point): - logger.warning( - "Starting points outside of the search space are removed. 
" - f"Remaining starting points for {learner_class}: {starting_point}" - ) - starting_point = starting_point or None - - for name, space in search_space.items(): - assert ( - "domain" in space - ), f"{name}'s domain is missing in the search space spec {space}" - if space["domain"] is None: - # don't search this hp - continue - self._search_space_domain[name] = space["domain"] - - if "low_cost_init_value" in space: - self.low_cost_partial_config[name] = space["low_cost_init_value"] - if "cat_hp_cost" in space: - self.cat_hp_cost[name] = space["cat_hp_cost"] - # if a starting point is provided, set the init config to be - # the starting point provided - if ( - isinstance(starting_point, dict) - and starting_point.get(name) is not None - ): - if self.init_config is None: - self.init_config = {} - self.init_config[name] = starting_point[name] - elif ( - not isinstance(starting_point, list) - and "init_value" in space - and self.valid_starting_point_one_dim( - space["init_value"], space["domain"] - ) - ): - if self.init_config is None: - self.init_config = {} - self.init_config[name] = space["init_value"] - - if isinstance(starting_point, list): - self.init_config = starting_point - else: - self.init_config = [] if self.init_config is None else [self.init_config] - - self._hp_names = list(self._search_space_domain.keys()) - self.search_alg = None - self.best_config = None - self.best_result = None - self.best_loss = self.best_loss_old = np.inf - self.total_time_used = 0 - self.total_iter = 0 - self.base_eci = None - self.time_best_found = self.time_best_found_old = 0 - self.time2eval_best = 0 - self.time2eval_best_old = 0 - self.trained_estimator = None - self.sample_size = None - self.trial_time = 0 - - def update(self, result, time_used): - if result: - config = result["config"] - if config and "FLAML_sample_size" in config: - self.sample_size = config["FLAML_sample_size"] - else: - self.sample_size = self.data_size[0] - obj = result["val_loss"] - metric_for_logging = 
result["metric_for_logging"] - time2eval = result["time_total_s"] - trained_estimator = result["trained_estimator"] - del result["trained_estimator"] # free up RAM - n_iter = ( - trained_estimator - and hasattr(trained_estimator, "ITER_HP") - and trained_estimator.params.get(trained_estimator.ITER_HP) - ) - if n_iter: - if "ml" in config: - config["ml"][trained_estimator.ITER_HP] = n_iter - else: - config[trained_estimator.ITER_HP] = n_iter - else: - obj, time2eval, trained_estimator = np.inf, 0.0, None - metric_for_logging = config = None - self.trial_time = time2eval - self.total_time_used += time_used if self._budget >= 0 else 1 - self.total_iter += 1 - - if self.base_eci is None: - self.base_eci = time_used - if (obj is not None) and (obj < self.best_loss): - self.best_loss_old = self.best_loss if self.best_loss < np.inf else 2 * obj - self.best_loss = obj - self.best_result = result - self.time_best_found_old = self.time_best_found - self.time_best_found = self.total_time_used - self.iter_best_found = self.total_iter - self.best_config = config - self.best_config_sample_size = self.sample_size - self.best_config_train_time = time_used - if time2eval: - self.time2eval_best_old = self.time2eval_best - self.time2eval_best = time2eval - if ( - self.trained_estimator - and trained_estimator - and self.trained_estimator != trained_estimator - ): - self.trained_estimator.cleanup() - if trained_estimator: - self.trained_estimator = trained_estimator - elif trained_estimator: - trained_estimator.cleanup() - self.metric_for_logging = metric_for_logging - self.val_loss, self.config = obj, config - - def get_hist_config_sig(self, sample_size, config): - config_values = tuple([config[k] for k in self._hp_names if k in config]) - config_sig = str(sample_size) + "_" + str(config_values) - return config_sig - - def est_retrain_time(self, retrain_sample_size): - assert ( - self.best_config_sample_size is not None - ), "need to first get best_config_sample_size" - return 
self.time2eval_best * retrain_sample_size / self.best_config_sample_size - - -class AutoMLState: - def _prepare_sample_train_data(self, sample_size: int): - sampled_weight = groups = None - if sample_size <= self.data_size[0]: - if isinstance(self.X_train, pd.DataFrame): - sampled_X_train = self.X_train.iloc[:sample_size] - else: - sampled_X_train = self.X_train[:sample_size] - if isinstance(self.y_train, pd.Series): - sampled_y_train = self.y_train.iloc[:sample_size] - else: - sampled_y_train = self.y_train[:sample_size] - weight = self.fit_kwargs.get( - "sample_weight" - ) # NOTE: _prepare_sample_train_data is before kwargs is updated to fit_kwargs_by_estimator - if weight is not None: - sampled_weight = ( - weight.iloc[:sample_size] - if isinstance(weight, pd.Series) - else weight[:sample_size] - ) - if self.groups is not None: - groups = ( - self.groups.iloc[:sample_size] - if isinstance(self.groups, pd.Series) - else self.groups[:sample_size] - ) - else: - sampled_X_train = self.X_train_all - sampled_y_train = self.y_train_all - if ( - "sample_weight" in self.fit_kwargs - ): # NOTE: _prepare_sample_train_data is before kwargs is updated to fit_kwargs_by_estimator - sampled_weight = self.sample_weight_all - if self.groups is not None: - groups = self.groups_all - return sampled_X_train, sampled_y_train, sampled_weight, groups - - @staticmethod - def _compute_with_config_base( - config_w_resource: dict, - state: AutoMLState, - estimator: str, - is_report: bool = True, - ) -> dict: - if "FLAML_sample_size" in config_w_resource: - sample_size = int(config_w_resource["FLAML_sample_size"]) - else: - sample_size = state.data_size[0] - - this_estimator_kwargs = state.fit_kwargs_by_estimator.get( - estimator - ).copy() # NOTE: _compute_with_config_base is after kwargs is updated to fit_kwargs_by_estimator - ( - sampled_X_train, - sampled_y_train, - sampled_weight, - groups, - ) = state._prepare_sample_train_data(sample_size) - if sampled_weight is not None: - weight = 
this_estimator_kwargs["sample_weight"] - this_estimator_kwargs["sample_weight"] = sampled_weight - if groups is not None: - this_estimator_kwargs["groups"] = groups - config = config_w_resource.copy() - if "FLAML_sample_size" in config: - del config["FLAML_sample_size"] - budget = ( - None - if state.time_budget < 0 - else state.time_budget - state.time_from_start - if sample_size == state.data_size[0] - else (state.time_budget - state.time_from_start) - / 2 - * sample_size - / state.data_size[0] - ) - - ( - trained_estimator, - val_loss, - metric_for_logging, - _, - pred_time, - ) = compute_estimator( - sampled_X_train, - sampled_y_train, - state.X_val, - state.y_val, - state.weight_val, - state.groups_val, - state.train_time_limit - if budget is None - else min(budget, state.train_time_limit or np.inf), - state.kf, - config, - state.task, - estimator, - state.eval_method, - state.metric, - state.best_loss, - state.n_jobs, - state.learner_classes.get(estimator), - state.cv_score_agg_func, - state.log_training_metric, - this_estimator_kwargs, - state.free_mem_ratio, - ) - if state.retrain_final and not state.model_history: - trained_estimator.cleanup() - - result = { - "pred_time": pred_time, - "wall_clock_time": time.time() - state._start_time_flag, - "metric_for_logging": metric_for_logging, - "val_loss": val_loss, - "trained_estimator": trained_estimator, - } - if sampled_weight is not None: - this_estimator_kwargs["sample_weight"] = weight - if is_report is True: - tune.report(**result) - return result - - @classmethod - def sanitize(cls, config: dict) -> dict: - """Make a config ready for passing to estimator.""" - config = config.get("ml", config).copy() - config.pop("FLAML_sample_size", None) - config.pop("learner", None) - config.pop("_choice_", None) - return config - - def _train_with_config( - self, - estimator: str, - config_w_resource: dict, - sample_size: Optional[int] = None, - ): - if not sample_size: - sample_size = config_w_resource.get( - 
"FLAML_sample_size", len(self.y_train_all) - ) - config = AutoMLState.sanitize(config_w_resource) - - this_estimator_kwargs = self.fit_kwargs_by_estimator.get( - estimator - ).copy() # NOTE: _train_with_config is after kwargs is updated to fit_kwargs_by_estimator - ( - sampled_X_train, - sampled_y_train, - sampled_weight, - groups, - ) = self._prepare_sample_train_data(sample_size) - if sampled_weight is not None: - weight = this_estimator_kwargs[ - "sample_weight" - ] # NOTE: _train_with_config is after kwargs is updated to fit_kwargs_by_estimator - this_estimator_kwargs[ - "sample_weight" - ] = sampled_weight # NOTE: _train_with_config is after kwargs is updated to fit_kwargs_by_estimator - if groups is not None: - this_estimator_kwargs[ - "groups" - ] = groups # NOTE: _train_with_config is after kwargs is updated to fit_kwargs_by_estimator - - budget = ( - None if self.time_budget < 0 else self.time_budget - self.time_from_start - ) - - estimator, train_time = train_estimator( - X_train=sampled_X_train, - y_train=sampled_y_train, - config_dic=config, - task=self.task, - estimator_name=estimator, - n_jobs=self.n_jobs, - estimator_class=self.learner_classes.get(estimator), - budget=budget, - fit_kwargs=this_estimator_kwargs, # NOTE: _train_with_config is after kwargs is updated to fit_kwargs_by_estimator - eval_metric=self.metric if hasattr(self, "metric") else "train_time", - free_mem_ratio=self.free_mem_ratio, - ) - - if sampled_weight is not None: - this_estimator_kwargs[ - "sample_weight" - ] = weight # NOTE: _train_with_config is after kwargs is updated to fit_kwargs_by_estimator - - return estimator, train_time - - def size(learner_classes: dict, config: dict) -> float: """Size function. @@ -579,7 +140,8 @@ def custom_metric( ``` task: A string of the task type, e.g., 'classification', 'regression', 'ts_forecast', 'rank', - 'seq-classification', 'seq-regression', 'summarization'. 
+ 'seq-classification', 'seq-regression', 'summarization', + or an instance of the Task class. n_jobs: An integer of the number of threads for training | default=-1. Use all available resources when n_jobs == -1. log_file_name: A string of the log file name | default="". To disable logging, @@ -956,7 +518,7 @@ def score(self, X: pd.DataFrame, y: pd.Series, **kwargs): "No estimator is trained. Please run fit with enough budget." ) return None - X = self._preprocess(X) + X = self._state.task.preprocess(X, self._transformer) if self._label_transformer: y = self._label_transformer.transform(y) return estimator.score(X, y, **kwargs) @@ -999,7 +561,7 @@ def predict( "No estimator is trained. Please run fit with enough budget." ) return None - X = self._preprocess(X) + X = self._state.task.preprocess(X, self._transformer) y_pred = estimator.predict(X, **pred_kwargs) if ( isinstance(y_pred, np.ndarray) @@ -1033,560 +595,10 @@ def predict_proba(self, X, **pred_kwargs): "No estimator is trained. Please run fit with enough budget." 
) return None - X = self._preprocess(X) + X = self._state.task.preprocess(X, self._transformer) proba = self._trained_estimator.predict_proba(X, **pred_kwargs) return proba - def _preprocess(self, X): - if isinstance(X, List): - try: - if isinstance(X[0], List): - X = [x for x in zip(*X)] - X = pd.DataFrame( - dict( - [ - (self._transformer._str_columns[idx], X[idx]) - if isinstance(X[0], List) - else (self._transformer._str_columns[idx], [X[idx]]) - for idx in range(len(X)) - ] - ) - ) - except IndexError: - raise IndexError( - "Test data contains more columns than training data, exiting" - ) - elif isinstance(X, int): - return X - elif issparse(X): - X = X.tocsr() - if self._state.task in TS_FORECAST: - X = pd.DataFrame(X) - if self._transformer: - X = self._transformer.transform(X) - return X - - def _validate_ts_data( - self, - dataframe, - y_train_all=None, - ): - assert ( - dataframe[dataframe.columns[0]].dtype.name == "datetime64[ns]" - ), f"For '{TS_FORECAST}' task, the first column must contain timestamp values." - if y_train_all is not None: - y_df = ( - pd.DataFrame(y_train_all) - if isinstance(y_train_all, pd.Series) - else pd.DataFrame(y_train_all, columns=["labels"]) - ) - dataframe = dataframe.join(y_df) - duplicates = dataframe.duplicated() - if any(duplicates): - logger.warning( - "Duplicate timestamp values found in timestamp column. " - f"\n{dataframe.loc[duplicates, dataframe][dataframe.columns[0]]}" - ) - dataframe = dataframe.drop_duplicates() - logger.warning("Removed duplicate rows based on all columns") - assert ( - dataframe[[dataframe.columns[0]]].duplicated() is None - ), "Duplicate timestamp values with different values for other columns." - ts_series = pd.to_datetime(dataframe[dataframe.columns[0]]) - inferred_freq = pd.infer_freq(ts_series) - if inferred_freq is None: - logger.warning( - "Missing timestamps detected. To avoid error with estimators, set estimator list to ['prophet']. 
" - ) - if y_train_all is not None: - return dataframe.iloc[:, :-1], dataframe.iloc[:, -1] - return dataframe - - def _validate_data( - self, - X_train_all, - y_train_all, - dataframe, - label, - X_val=None, - y_val=None, - groups_val=None, - groups=None, - ): - if X_train_all is not None and y_train_all is not None: - assert ( - isinstance(X_train_all, np.ndarray) - or issparse(X_train_all) - or isinstance(X_train_all, pd.DataFrame) - ), ( - "X_train_all must be a numpy array, a pandas dataframe, " - "or Scipy sparse matrix." - ) - assert isinstance(y_train_all, np.ndarray) or isinstance( - y_train_all, pd.Series - ), "y_train_all must be a numpy array or a pandas series." - assert ( - X_train_all.size != 0 and y_train_all.size != 0 - ), "Input data must not be empty." - if isinstance(X_train_all, np.ndarray) and len(X_train_all.shape) == 1: - X_train_all = np.reshape(X_train_all, (X_train_all.size, 1)) - if isinstance(y_train_all, np.ndarray): - y_train_all = y_train_all.flatten() - assert ( - X_train_all.shape[0] == y_train_all.shape[0] - ), "# rows in X_train must match length of y_train." 
- self._df = isinstance(X_train_all, pd.DataFrame) - self._nrow, self._ndim = X_train_all.shape - if self._state.task in TS_FORECAST: - X_train_all = pd.DataFrame(X_train_all) - X_train_all, y_train_all = self._validate_ts_data( - X_train_all, y_train_all - ) - X, y = X_train_all, y_train_all - elif dataframe is not None and label is not None: - assert isinstance( - dataframe, pd.DataFrame - ), "dataframe must be a pandas DataFrame" - assert label in dataframe.columns, "label must a column name in dataframe" - self._df = True - if self._state.task in TS_FORECAST: - dataframe = self._validate_ts_data(dataframe) - X = dataframe.drop(columns=label) - self._nrow, self._ndim = X.shape - y = dataframe[label] - else: - raise ValueError("either X_train+y_train or dataframe+label are required") - - # check the validity of input dimensions for NLP tasks, so need to check _is_nlp_task not estimator - if _is_nlp_task(self._state.task): - from .nlp.utils import is_a_list_of_str - - is_all_str = True - is_all_list = True - for column in X.columns: - assert X[column].dtype.name in ( - "object", - "string", - ), "If the task is an NLP task, X can only contain text columns" - for each_cell in X[column]: - if each_cell is not None: - is_str = isinstance(each_cell, str) - is_list_of_int = isinstance(each_cell, list) and all( - isinstance(x, int) for x in each_cell - ) - is_list_of_str = is_a_list_of_str(each_cell) - if self._state.task == TOKENCLASSIFICATION: - assert is_list_of_str, ( - "For the token-classification task, the input column needs to be a list of string," - "instead of string, e.g., ['EU', 'rejects','German', 'call','to','boycott','British','lamb','.',].", - "For more examples, please refer to test/nlp/test_autohf_tokenclassification.py", - ) - else: - assert is_str or is_list_of_int, ( - "Each column of the input must either be str (untokenized) " - "or a list of integers (tokenized)" - ) - is_all_str &= is_str - is_all_list &= is_list_of_int or is_list_of_str - 
assert is_all_str or is_all_list, ( - "Currently FLAML only supports two modes for NLP: either all columns of X are string (non-tokenized), " - "or all columns of X are integer ids (tokenized)" - ) - - if issparse(X_train_all) or self._skip_transform: - self._transformer = self._label_transformer = False - self._X_train_all, self._y_train_all = X, y - else: - from .data import DataTransformer - - self._transformer = DataTransformer() - - self._X_train_all, self._y_train_all = self._transformer.fit_transform( - X, y, self._state.task - ) - self._label_transformer = self._transformer.label_transformer - if self._state.task == TOKENCLASSIFICATION: - if hasattr(self._label_transformer, "label_list"): - self._state.fit_kwargs.update( - {"label_list": self._label_transformer.label_list} - ) - elif "label_list" not in self._state.fit_kwargs: - for each_fit_kwargs in self._state.fit_kwargs_by_estimator.values(): - assert ( - "label_list" in each_fit_kwargs - ), "For the token-classification task, you must either (1) pass token labels; or (2) pass id labels and the label list. " - "Please refer to the documentation for more details: https://microsoft.github.io/FLAML/docs/Examples/AutoML-NLP#a-simple-token-classification-example" - self._feature_names_in_ = ( - self._X_train_all.columns.to_list() - if hasattr(self._X_train_all, "columns") - else None - ) - - self._sample_weight_full = self._state.fit_kwargs.get( - "sample_weight" - ) # NOTE: _validate_data is before kwargs is updated to fit_kwargs_by_estimator - if X_val is not None and y_val is not None: - assert ( - isinstance(X_val, np.ndarray) - or issparse(X_val) - or isinstance(X_val, pd.DataFrame) - ), ( - "X_val must be None, a numpy array, a pandas dataframe, " - "or Scipy sparse matrix." - ) - assert isinstance(y_val, np.ndarray) or isinstance( - y_val, pd.Series - ), "y_val must be None, a numpy array or a pandas series." 
- assert X_val.size != 0 and y_val.size != 0, ( - "Validation data are expected to be nonempty. " - "Use None for X_val and y_val if no validation data." - ) - if isinstance(y_val, np.ndarray): - y_val = y_val.flatten() - assert ( - X_val.shape[0] == y_val.shape[0] - ), "# rows in X_val must match length of y_val." - if self._transformer: - self._state.X_val = self._transformer.transform(X_val) - else: - self._state.X_val = X_val - # If it's NLG_TASKS, y_val is a pandas series containing the output sequence tokens, - # so we cannot use label_transformer.transform to process it - if self._label_transformer: - self._state.y_val = self._label_transformer.transform(y_val) - else: - self._state.y_val = y_val - else: - self._state.X_val = self._state.y_val = None - if groups is not None and len(groups) != self._nrow: - # groups is given as group counts - self._state.groups = np.concatenate([[i] * c for i, c in enumerate(groups)]) - assert ( - len(self._state.groups) == self._nrow - ), "the sum of group counts must match the number of examples" - self._state.groups_val = ( - np.concatenate([[i] * c for i, c in enumerate(groups_val)]) - if groups_val is not None - else None - ) - else: - self._state.groups_val = groups_val - self._state.groups = groups - - def _prepare_data(self, eval_method, split_ratio, n_splits): - X_val, y_val = self._state.X_val, self._state.y_val - if issparse(X_val): - X_val = X_val.tocsr() - X_train_all, y_train_all = self._X_train_all, self._y_train_all - if issparse(X_train_all): - X_train_all = X_train_all.tocsr() - if ( - self._state.task in CLASSIFICATION - and self._auto_augment - and self._state.fit_kwargs.get("sample_weight") - is None # NOTE: _prepare_data is before kwargs is updated to fit_kwargs_by_estimator - and self._split_type in ["stratified", "uniform"] - and self._state.task != TOKENCLASSIFICATION - ): - # logger.info(f"label {pd.unique(y_train_all)}") - label_set, counts = np.unique(y_train_all, return_counts=True) - # augment 
rare classes - rare_threshld = 20 - rare = counts < rare_threshld - rare_label, rare_counts = label_set[rare], counts[rare] - for i, label in enumerate(rare_label): - count = rare_count = rare_counts[i] - rare_index = y_train_all == label - n = len(y_train_all) - while count < rare_threshld: - if self._df: - X_train_all = concat( - X_train_all, X_train_all.iloc[:n].loc[rare_index] - ) - else: - X_train_all = concat( - X_train_all, X_train_all[:n][rare_index, :] - ) - if isinstance(y_train_all, pd.Series): - y_train_all = concat( - y_train_all, y_train_all.iloc[:n].loc[rare_index] - ) - else: - y_train_all = np.concatenate( - [y_train_all, y_train_all[:n][rare_index]] - ) - count += rare_count - logger.info(f"class {label} augmented from {rare_count} to {count}") - SHUFFLE_SPLIT_TYPES = ["uniform", "stratified"] - if self._split_type in SHUFFLE_SPLIT_TYPES: - if self._sample_weight_full is not None: - X_train_all, y_train_all, self._state.sample_weight_all = shuffle( - X_train_all, - y_train_all, - self._sample_weight_full, - random_state=RANDOM_SEED, - ) - self._state.fit_kwargs[ - "sample_weight" - ] = ( - self._state.sample_weight_all - ) # NOTE: _prepare_data is before kwargs is updated to fit_kwargs_by_estimator - if isinstance(self._state.sample_weight_all, pd.Series): - self._state.sample_weight_all.reset_index(drop=True, inplace=True) - else: - X_train_all, y_train_all = shuffle( - X_train_all, y_train_all, random_state=RANDOM_SEED - ) - if self._df: - X_train_all.reset_index(drop=True, inplace=True) - if isinstance(y_train_all, pd.Series): - y_train_all.reset_index(drop=True, inplace=True) - - X_train, y_train = X_train_all, y_train_all - self._state.groups_all = self._state.groups - if X_val is None and eval_method == "holdout": - # if eval_method = holdout, make holdout data - if self._split_type == "time": - if self._state.task in TS_FORECAST: - period = self._state.fit_kwargs[ - "period" - ] # NOTE: _prepare_data is before kwargs is updated to 
fit_kwargs_by_estimator - if self._state.task == TS_FORECASTPANEL: - X_train_all["time_idx"] -= X_train_all["time_idx"].min() - X_train_all["time_idx"] = X_train_all["time_idx"].astype("int") - ids = self._state.fit_kwargs["group_ids"].copy() - ids.append(TS_TIMESTAMP_COL) - ids.append("time_idx") - y_train_all = pd.DataFrame(y_train_all) - y_train_all[ids] = X_train_all[ids] - X_train_all = X_train_all.sort_values(ids) - y_train_all = y_train_all.sort_values(ids) - training_cutoff = X_train_all["time_idx"].max() - period - X_train = X_train_all[lambda x: x.time_idx <= training_cutoff] - y_train = y_train_all[ - lambda x: x.time_idx <= training_cutoff - ].drop(columns=ids) - X_val = X_train_all[lambda x: x.time_idx > training_cutoff] - y_val = y_train_all[ - lambda x: x.time_idx > training_cutoff - ].drop(columns=ids) - else: - num_samples = X_train_all.shape[0] - assert ( - period < num_samples - ), f"period={period}>#examples={num_samples}" - split_idx = num_samples - period - X_train = X_train_all[:split_idx] - y_train = y_train_all[:split_idx] - X_val = X_train_all[split_idx:] - y_val = y_train_all[split_idx:] - else: - if ( - "sample_weight" in self._state.fit_kwargs - ): # NOTE: _prepare_data is before kwargs is updated to fit_kwargs_by_estimator - ( - X_train, - X_val, - y_train, - y_val, - self._state.fit_kwargs[ - "sample_weight" - ], # NOTE: _prepare_data is before kwargs is updated to fit_kwargs_by_estimator - self._state.weight_val, - ) = train_test_split( - X_train_all, - y_train_all, - self._state.fit_kwargs[ - "sample_weight" - ], # NOTE: _prepare_data is before kwargs is updated to fit_kwargs_by_estimator - test_size=split_ratio, - shuffle=False, - ) - else: - X_train, X_val, y_train, y_val = train_test_split( - X_train_all, - y_train_all, - test_size=split_ratio, - shuffle=False, - ) - elif self._split_type == "group": - gss = GroupShuffleSplit( - n_splits=1, test_size=split_ratio, random_state=RANDOM_SEED - ) - for train_idx, val_idx in gss.split( 
- X_train_all, y_train_all, self._state.groups_all - ): - if self._df: - X_train = X_train_all.iloc[train_idx] - X_val = X_train_all.iloc[val_idx] - else: - X_train, X_val = X_train_all[train_idx], X_train_all[val_idx] - y_train, y_val = y_train_all[train_idx], y_train_all[val_idx] - self._state.groups = self._state.groups_all[train_idx] - self._state.groups_val = self._state.groups_all[val_idx] - elif self._state.task in CLASSIFICATION: - # for classification, make sure the labels are complete in both - # training and validation data - label_set, first = np.unique(y_train_all, return_index=True) - rest = [] - last = 0 - first.sort() - for i in range(len(first)): - rest.extend(range(last, first[i])) - last = first[i] + 1 - rest.extend(range(last, len(y_train_all))) - X_first = X_train_all.iloc[first] if self._df else X_train_all[first] - X_rest = X_train_all.iloc[rest] if self._df else X_train_all[rest] - y_rest = y_train_all[rest] - stratify = y_rest if self._split_type == "stratified" else None - if ( - "sample_weight" in self._state.fit_kwargs - ): # NOTE: _prepare_data is before kwargs is updated to fit_kwargs_by_estimator - ( - X_train, - X_val, - y_train, - y_val, - weight_train, - weight_val, - ) = train_test_split( - X_rest, - y_rest, - self._state.fit_kwargs["sample_weight"][ - rest - ], # NOTE: _prepare_data is before kwargs is updated to fit_kwargs_by_estimator - test_size=split_ratio, - stratify=stratify, - random_state=RANDOM_SEED, - ) - weight1 = self._state.fit_kwargs["sample_weight"][ - first - ] # NOTE: _prepare_data is before kwargs is updated to fit_kwargs_by_estimator - self._state.weight_val = concat(weight1, weight_val) - self._state.fit_kwargs[ - "sample_weight" - ] = concat( # NOTE: _prepare_data is before kwargs is updated to fit_kwargs_by_estimator - weight1, weight_train - ) - else: - X_train, X_val, y_train, y_val = train_test_split( - X_rest, - y_rest, - test_size=split_ratio, - stratify=stratify, - random_state=RANDOM_SEED, - ) - 
X_train = concat(X_first, X_train) - y_train = ( - concat(label_set, y_train) - if self._df - else np.concatenate([label_set, y_train]) - ) - X_val = concat(X_first, X_val) - y_val = ( - concat(label_set, y_val) - if self._df - else np.concatenate([label_set, y_val]) - ) - elif self._state.task in REGRESSION: - if ( - "sample_weight" in self._state.fit_kwargs - ): # NOTE: _prepare_data is before kwargs is updated to fit_kwargs_by_estimator - ( - X_train, - X_val, - y_train, - y_val, - self._state.fit_kwargs[ - "sample_weight" - ], # NOTE: _prepare_data is before kwargs is updated to fit_kwargs_by_estimator - self._state.weight_val, - ) = train_test_split( - X_train_all, - y_train_all, - self._state.fit_kwargs[ - "sample_weight" - ], # NOTE: _prepare_data is before kwargs is updated to fit_kwargs_by_estimator - test_size=split_ratio, - random_state=RANDOM_SEED, - ) - else: - X_train, X_val, y_train, y_val = train_test_split( - X_train_all, - y_train_all, - test_size=split_ratio, - random_state=RANDOM_SEED, - ) - self._state.data_size = X_train.shape - self.data_size_full = len(y_train_all) - self._state.X_train, self._state.y_train = X_train, y_train - self._state.X_val, self._state.y_val = X_val, y_val - self._state.X_train_all = X_train_all - self._state.y_train_all = y_train_all - if eval_method == "holdout": - self._state.kf = None - return - if self._split_type == "group": - # logger.info("Using GroupKFold") - assert ( - len(self._state.groups_all) == y_train_all.size - ), "the length of groups must match the number of examples" - assert ( - len(np.unique(self._state.groups_all)) >= n_splits - ), "the number of groups must be equal or larger than n_splits" - self._state.kf = GroupKFold(n_splits) - elif self._split_type == "stratified": - # logger.info("Using StratifiedKFold") - assert y_train_all.size >= n_splits, ( - f"{n_splits}-fold cross validation" - f" requires input data with at least {n_splits} examples." 
- ) - assert y_train_all.size >= 2 * n_splits, ( - f"{n_splits}-fold cross validation with metric=r2 " - f"requires input data with at least {n_splits*2} examples." - ) - self._state.kf = RepeatedStratifiedKFold( - n_splits=n_splits, n_repeats=1, random_state=RANDOM_SEED - ) - elif self._split_type == "time": - # logger.info("Using TimeSeriesSplit") - if ( - self._state.task in TS_FORECAST - and self._state.task is not TS_FORECASTPANEL - ): - period = self._state.fit_kwargs[ - "period" - ] # NOTE: _prepare_data is before kwargs is updated to fit_kwargs_by_estimator - if period * (n_splits + 1) > y_train_all.size: - n_splits = int(y_train_all.size / period - 1) - assert n_splits >= 2, ( - f"cross validation for forecasting period={period}" - f" requires input data with at least {3 * period} examples." - ) - logger.info(f"Using nsplits={n_splits} due to data size limit.") - self._state.kf = TimeSeriesSplit(n_splits=n_splits, test_size=period) - elif self._state.task is TS_FORECASTPANEL: - n_groups = X_train.groupby( - self._state.fit_kwargs.get("group_ids") - ).ngroups - period = self._state.fit_kwargs.get("period") - self._state.kf = TimeSeriesSplit( - n_splits=n_splits, test_size=period * n_groups - ) - else: - self._state.kf = TimeSeriesSplit(n_splits=n_splits) - elif isinstance(self._split_type, str): - # logger.info("Using RepeatedKFold") - self._state.kf = RepeatedKFold( - n_splits=n_splits, n_repeats=1, random_state=RANDOM_SEED - ) - else: - # logger.info("Using splitter object") - self._state.kf = self._split_type - if isinstance(self._state.kf, (GroupKFold, StratifiedGroupKFold)): - # self._split_type is either "group", a GroupKFold object, or a StratifiedGroupKFold object - self._state.kf.groups = self._state.groups_all - def add_learner(self, learner_name, learner_class): """Add a customized learner. 
@@ -1596,7 +608,9 @@ def add_learner(self, learner_name, learner_class): """ self._state.learner_classes[learner_name] = learner_class - def get_estimator_from_log(self, log_file_name: str, record_id: int, task: str): + def get_estimator_from_log( + self, log_file_name: str, record_id: int, task: Union[str, Task] + ): """Get the estimator from log file. Args: @@ -1604,7 +618,8 @@ def get_estimator_from_log(self, log_file_name: str, record_id: int, task: str): record_id: An integer of the record ID in the file, 0 corresponds to the first trial. task: A string of the task type, - 'binary', 'multiclass', 'regression', 'ts_forecast', 'rank'. + 'binary', 'multiclass', 'regression', 'ts_forecast', 'rank', + or an instance of the Task class. Returns: An estimator object for the given configuration. @@ -1615,6 +630,9 @@ def get_estimator_from_log(self, log_file_name: str, record_id: int, task: str): estimator = record.learner config = AutoMLState.sanitize(record.config) + if isinstance(task, str): + task = task_factory(task) + estimator, _ = train_estimator( X_train=None, y_train=None, @@ -1634,7 +652,7 @@ def retrain_from_log( dataframe=None, label=None, time_budget=np.inf, - task=None, + task: Optional[Union[str, Task]] = None, eval_method=None, split_ratio=None, n_splits=None, @@ -1661,10 +679,7 @@ def retrain_from_log( Args: log_file_name: A string of the log file name. X_train: A numpy array or dataframe of training data in shape n*m. - For time series forecast tasks, the first column of X_train - must be the timestamp column (datetime type). Other - columns in the dataframe are assumed to be exogenous - variables (categorical or numeric). + For time series forecast tasks, the first column of X_train must be the timestamp column (datetime type). Other columns in the dataframe are assumed to be exogenous variables (categorical or numeric). y_train: A numpy array or series of labels in shape n*1. dataframe: A dataframe of training data including label column. 
For time series forecast tasks, dataframe must be specified and should @@ -1679,7 +694,8 @@ def retrain_from_log( time_budget: A float number of the time budget in seconds. task: A string of the task type, e.g., 'classification', 'regression', 'ts_forecast', 'rank', - 'seq-classification', 'seq-regression', 'summarization'. + 'seq-classification', 'seq-regression', 'summarization', + or an instance of Task class. eval_method: A string of resampling strategy, one of ['auto', 'cv', 'holdout']. split_ratio: A float of the validation data percentage for holdout. @@ -1765,6 +781,9 @@ def retrain_from_log( used by TemporalFusionTransformerEstimator. """ task = task or self._settings.get("task") + if isinstance(task, str): + task = task_factory(task) + eval_method = eval_method or self._settings.get("eval_method") split_ratio = split_ratio or self._settings.get("split_ratio") n_splits = n_splits or self._settings.get("n_splits") @@ -1773,7 +792,7 @@ def retrain_from_log( self._settings.get("auto_augment") if auto_augment is None else auto_augment ) self._state.task = task - self._estimator_type = "classifier" if task in CLASSIFICATION else "regressor" + self._estimator_type = "classifier" if task.is_classification() else "regressor" self._state.fit_kwargs = fit_kwargs self._state.custom_hp = custom_hp or self._settings.get("custom_hp") @@ -1790,7 +809,9 @@ def retrain_from_log( if preserve_checkpoint is None else preserve_checkpoint ) - self._validate_data(X_train, y_train, dataframe, label, groups=groups) + task.validate_data( + self, self._state, X_train, y_train, dataframe, label, groups=groups + ) logger.info("log file name {}".format(log_file_name)) @@ -1854,7 +875,12 @@ def retrain_from_log( ) # Partially copied from fit() function # Initilize some attributes required for retrain_from_log - self._decide_split_type(split_type) + self._split_type = task.decide_split_type( + split_type, + self._y_train_all, + self._state.fit_kwargs, + self._state.groups, + ) 
eval_method = self._decide_eval_method(eval_method, time_budget) self.modelcount = 0 self._auto_augment = auto_augment @@ -1880,51 +906,6 @@ def retrain_from_log( logger.info("retrain from log succeeded") return training_duration - def _decide_split_type(self, split_type): - if self._state.task == "classification": - self._state.task = get_classification_objective( - len(np.unique(self._y_train_all)) - ) - if not isinstance(split_type, str): - assert hasattr(split_type, "split") and hasattr( - split_type, "get_n_splits" - ), "split_type must be a string or a splitter object with split and get_n_splits methods." - assert ( - not isinstance(split_type, GroupKFold) or self._state.groups is not None - ), "GroupKFold requires groups to be provided." - self._split_type = split_type - elif self._state.task in CLASSIFICATION: - assert split_type in ["auto", "stratified", "uniform", "time", "group"] - self._split_type = ( - split_type - if split_type != "auto" - else self._state.groups is None and "stratified" or "group" - ) - elif self._state.task in REGRESSION: - assert split_type in ["auto", "uniform", "time", "group"] - self._split_type = split_type if split_type != "auto" else "uniform" - elif self._state.task in TS_FORECAST: - assert split_type in ["auto", "time"] - self._split_type = "time" - assert isinstance( - self._state.fit_kwargs.get("period"), - int, # NOTE: _decide_split_type is before kwargs is updated to fit_kwargs_by_estimator - ), f"missing a required integer 'period' for '{TS_FORECAST}' task." - if self._state.fit_kwargs.get("group_ids"): - self._state.task == TS_FORECASTPANEL - assert isinstance( - self._state.fit_kwargs.get("group_ids"), list - ), f"missing a required List[str] 'group_ids' for '{TS_FORECASTPANEL}' task." - elif self._state.task == "rank": - assert ( - self._state.groups is not None - ), "groups must be specified for ranking task." 
- assert split_type in ["auto", "group"] - self._split_type = "group" - elif self._state.task in NLG_TASKS: - assert split_type in ["auto", "uniform", "time", "group"] - self._split_type = split_type if split_type != "auto" else "uniform" - def _decide_eval_method(self, eval_method, time_budget): if not isinstance(self._split_type, str): assert eval_method in [ @@ -2172,6 +1153,21 @@ def metric_constraints(self) -> list: """ return self._metric_constraints + def _prepare_data(self, eval_method, split_ratio, n_splits): + self._state.task.prepare_data( + self._state, + self._X_train_all, + self._y_train_all, + self._auto_augment, + eval_method, + self._split_type, + split_ratio, + n_splits, + self._df, + self._sample_weight_full, + ) + self.data_size_full = len(self._state.y_train_all) + def fit( self, X_train=None, @@ -2179,7 +1175,7 @@ def fit( dataframe=None, label=None, metric=None, - task=None, + task: Optional[Union[str, Task]] = None, n_jobs=None, # gpu_per_trial=0, log_file_name=None, @@ -2289,7 +1285,7 @@ def custom_metric( task: A string of the task type, e.g., 'classification', 'regression', 'ts_forecast_regression', 'ts_forecast_classification', 'rank', 'seq-classification', - 'seq-regression', 'summarization'. + 'seq-regression', 'summarization', or an instance of Task class n_jobs: An integer of the number of threads for training | default=-1. Use all available resources when n_jobs == -1. log_file_name: A string of the log file name | default="". 
To disable logging, @@ -2539,7 +1535,10 @@ def cv_score_agg_func(val_loss_folds, log_metrics_folds): self._state._start_time_flag = self._start_time_flag = time.time() task = task or self._settings.get("task") - self._estimator_type = "classifier" if task in CLASSIFICATION else "regressor" + if isinstance(task, str): + task = task_factory(task, X_train, y_train) + self._state.task = task + self._estimator_type = "classifier" if task.is_classification() else "regressor" time_budget = time_budget or self._settings.get("time_budget") n_jobs = n_jobs or self._settings.get("n_jobs") gpu_per_trial = fit_kwargs.get("gpu_per_trial", 0) @@ -2719,15 +1718,29 @@ def cv_score_agg_func(val_loss_folds, log_metrics_folds): ) # shallow copy of fit_kwargs_by_estimator self._state.weight_val = sample_weight_val - self._validate_data( - X_train, y_train, dataframe, label, X_val, y_val, groups_val, groups + task.validate_data( + self, + self._state, + X_train, + y_train, + dataframe, + label, + X_val, + y_val, + groups_val, + groups, ) self._search_states = {} # key: estimator name; value: SearchState self._random = np.random.RandomState(RANDOM_SEED) self._seed = seed if seed is not None else 20 self._learner_selector = learner_selector logger.info(f"task = {task}") - self._decide_split_type(split_type) + self._split_type = self._state.task.decide_split_type( + split_type, + self._y_train_all, + self._state.fit_kwargs, + self._state.groups, + ) logger.info(f"Data split method: {self._split_type}") eval_method = self._decide_eval_method(eval_method, time_budget) self._state.eval_method = eval_method @@ -2778,12 +1791,13 @@ def cv_score_agg_func(val_loss_folds, log_metrics_folds): self._min_sample_size_input = min_sample_size self._prepare_data(eval_method, split_ratio, n_splits) + # TODO pull this to task as decide_sample_size if isinstance(self._min_sample_size, dict): self._sample = { ( k, sample - and task != "rank" + and not task.is_rank() and eval_method != "cv" and ( 
self._min_sample_size[k] * SAMPLE_MULTIPLY_FACTOR @@ -2795,33 +1809,18 @@ def cv_score_agg_func(val_loss_folds, log_metrics_folds): else: self._sample = ( sample - and task != "rank" + and not task.is_rank() and eval_method != "cv" and ( self._min_sample_size * SAMPLE_MULTIPLY_FACTOR < self._state.data_size[0] ) ) - if "auto" == metric: - if _is_nlp_task(self._state.task): - from flaml.automl.nlp.utils import ( - load_default_huggingface_metric_for_task, - ) - - metric = load_default_huggingface_metric_for_task(self._state.task) - elif "binary" in self._state.task: - metric = "roc_auc" - elif "multiclass" in self._state.task: - metric = "log_loss" - elif self._state.task in TS_FORECAST: - metric = "mape" - elif self._state.task == "rank": - metric = "ndcg" - else: - metric = "r2" + metric = task.default_metric(metric) self._state.metric = metric + # TODO pull this to task def is_to_reverse_metric(metric, task): if metric.startswith("ndcg"): return True, f"1-{metric}" @@ -2840,7 +1839,7 @@ def is_to_reverse_metric(metric, task): "macro_f1", ]: return True, f"1-{metric}" - if _is_nlp_task(task): + if task.is_nlp(): from flaml.automl.ml import huggingface_metric_to_mode if ( @@ -2860,46 +1859,8 @@ def is_to_reverse_metric(metric, task): error_metric = "customized metric" logger.info(f"Minimizing error metric: {error_metric}") - if "auto" == estimator_list: - if self._state.task == "rank": - estimator_list = ["lgbm", "xgboost", "xgb_limitdepth"] - elif _is_nlp_task(self._state.task): - estimator_list = ["transformer"] - elif self._state.task == TS_FORECASTPANEL: - estimator_list = ["tft"] - else: - try: - import catboost - - estimator_list = [ - "lgbm", - "rf", - "catboost", - "xgboost", - "extra_tree", - "xgb_limitdepth", - ] - except ImportError: - estimator_list = [ - "lgbm", - "rf", - "xgboost", - "extra_tree", - "xgb_limitdepth", - ] - if self._state.task in TS_FORECAST: - # catboost is removed because it has a `name` parameter, making it incompatible with 
hcrystalball - if "catboost" in estimator_list: - estimator_list.remove("catboost") - if self._state.task in TS_FORECASTREGRESSION: - try: - import prophet - - estimator_list += ["prophet", "arima", "sarimax"] - except ImportError: - estimator_list += ["arima", "sarimax"] - elif "regression" != self._state.task: - estimator_list += ["lrl1"] + estimator_list = task.default_estimator_list(estimator_list) + # When no search budget is specified if no_budget: max_iter = len(estimator_list) @@ -3673,7 +2634,7 @@ def _search(self): [(estimator[0], estimator[1].params) for estimator in estimators] ) if len(estimators) > 1: - if self._state.task in CLASSIFICATION: + if self._state.task.is_classification(): from sklearn.ensemble import StackingClassifier as Stacker else: from sklearn.ensemble import StackingRegressor as Stacker diff --git a/flaml/automl/data.py b/flaml/automl/data.py index 01f27d14e6cb..bfb5bbd5f55f 100644 --- a/flaml/automl/data.py +++ b/flaml/automl/data.py @@ -10,51 +10,13 @@ from flaml.automl.training_log import training_log_reader from datetime import datetime -from typing import Union - -# TODO: if your task is not specified in here, define your task as an all-capitalized word -SEQCLASSIFICATION = "seq-classification" -MULTICHOICECLASSIFICATION = "multichoice-classification" -TOKENCLASSIFICATION = "token-classification" -CLASSIFICATION = ( - "binary", - "multiclass", - "classification", - SEQCLASSIFICATION, - MULTICHOICECLASSIFICATION, - TOKENCLASSIFICATION, -) -SEQREGRESSION = "seq-regression" -REGRESSION = ("regression", SEQREGRESSION) -TS_FORECASTREGRESSION = ( - "forecast", - "ts_forecast", - "ts_forecast_regression", -) -TS_FORECASTCLASSIFICATION = "ts_forecast_classification" -TS_FORECASTPANEL = "ts_forecast_panel" -TS_FORECAST = ( - *TS_FORECASTREGRESSION, - TS_FORECASTCLASSIFICATION, - TS_FORECASTPANEL, -) +from typing import TYPE_CHECKING, Union + +if TYPE_CHECKING: + from flaml.automl.task import Task + TS_TIMESTAMP_COL = "ds" TS_VALUE_COL = 
"y" -SUMMARIZATION = "summarization" -NLG_TASKS = (SUMMARIZATION,) -NLU_TASKS = ( - SEQREGRESSION, - SEQCLASSIFICATION, - MULTICHOICECLASSIFICATION, - TOKENCLASSIFICATION, -) - - -def _is_nlp_task(task): - if task in NLU_TASKS or task in NLG_TASKS: - return True - else: - return False def load_openml_dataset( @@ -273,20 +235,26 @@ def add_time_idx_col(X): class DataTransformer: """Transform input training data.""" - def fit_transform(self, X: Union[DataFrame, np.array], y, task): + def fit_transform( + self, X: Union[DataFrame, np.ndarray], y, task: Union[str, "Task"] + ): """Fit transformer and process the input training data according to the task type. Args: X: A numpy array or a pandas dataframe of training data. y: A numpy array or a pandas series of labels. - task: A string of the task type, e.g., - 'classification', 'regression', 'ts_forecast', 'rank'. + task: An instance of type Task, or a str such as 'classification', 'regression'. Returns: X: Processed numpy array or pandas dataframe of training data. y: Processed numpy array or pandas series of labels. """ - if _is_nlp_task(task): + if isinstance(task, str): + from flaml.automl.task.factory import task_factory + + task = task_factory(task, X, y) + + if task.is_nlp(): # if the mode is NLP, check the type of input, each column must be either string or # ids (input ids, token type id, attention mask, etc.) 
str_columns = [] @@ -301,9 +269,9 @@ def fit_transform(self, X: Union[DataFrame, np.array], y, task): n = X.shape[0] cat_columns, num_columns, datetime_columns = [], [], [] drop = False - if task in TS_FORECAST: + if task.is_ts_forecast(): X = X.rename(columns={X.columns[0]: TS_TIMESTAMP_COL}) - if task is TS_FORECASTPANEL: + if task.is_ts_forecastpanel(): if "time_idx" not in X: X = add_time_idx_col(X) ds_col = X.pop(TS_TIMESTAMP_COL) @@ -361,7 +329,7 @@ def fit_transform(self, X: Union[DataFrame, np.array], y, task): X[column] = X[column].fillna(np.nan) num_columns.append(column) X = X[cat_columns + num_columns] - if task in TS_FORECAST: + if task.is_ts_forecast(): X.insert(0, TS_TIMESTAMP_COL, ds_col) if cat_columns: X[cat_columns] = X[cat_columns].astype("category") @@ -396,11 +364,11 @@ def fit_transform(self, X: Union[DataFrame, np.array], y, task): ) self._drop = drop if ( - task in CLASSIFICATION + task.is_classification() or not pd.api.types.is_numeric_dtype(y) - and task not in NLG_TASKS + and not task.is_nlg() ): - if task != TOKENCLASSIFICATION: + if not task.is_token_classification(): from sklearn.preprocessing import LabelEncoder self.label_transformer = LabelEncoder() @@ -409,7 +377,6 @@ def fit_transform(self, X: Union[DataFrame, np.array], y, task): self.label_transformer = LabelEncoderforTokenClassification() y = self.label_transformer.fit_transform(y) - else: self.label_transformer = None self._task = task @@ -426,7 +393,7 @@ def transform(self, X: Union[DataFrame, np.array]): """ X = X.copy() - if _is_nlp_task(self._task): + if self._task.is_nlp(): # if the mode is NLP, check the type of input, each column must be either string or # ids (input ids, token type id, attention mask, etc.) 
if len(self._str_columns) > 0: @@ -437,7 +404,7 @@ def transform(self, X: Union[DataFrame, np.array]): self._num_columns, self._datetime_columns, ) - if self._task in TS_FORECAST: + if self._task.is_ts_forecast(): X = X.rename(columns={X.columns[0]: TS_TIMESTAMP_COL}) ds_col = X.pop(TS_TIMESTAMP_COL) for column in datetime_columns: @@ -459,7 +426,7 @@ def transform(self, X: Union[DataFrame, np.array]): X[column] = X[column].map(datetime.toordinal) del tmp_dt X = X[cat_columns + num_columns].copy() - if self._task in TS_FORECAST: + if self._task.is_ts_forecast(): X.insert(0, TS_TIMESTAMP_COL, ds_col) for column in cat_columns: if X[column].dtype.name == "object": diff --git a/flaml/automl/logger.py b/flaml/automl/logger.py new file mode 100644 index 000000000000..1085b5aae1e3 --- /dev/null +++ b/flaml/automl/logger.py @@ -0,0 +1,7 @@ +import logging + +logger = logging.getLogger(__name__) +logger_formatter = logging.Formatter( + "[%(name)s: %(asctime)s] {%(lineno)d} %(levelname)s - %(message)s", "%m-%d %H:%M:%S" +) +logger.propagate = False diff --git a/flaml/automl/ml.py b/flaml/automl/ml.py index 55fcd932ec4c..ae3573a17b4c 100644 --- a/flaml/automl/ml.py +++ b/flaml/automl/ml.py @@ -47,8 +47,10 @@ TemporalFusionTransformerEstimator, TransformersEstimatorModelSelection, ) -from flaml.automl.data import CLASSIFICATION, group_counts, TS_FORECAST +from flaml.automl.data import group_counts +from flaml.automl.task.task import TS_FORECAST, Task from flaml.automl.model import BaseEstimator + import logging logger = logging.getLogger(__name__) @@ -354,8 +356,8 @@ def sklearn_metric_loss_score( return score -def get_y_pred(estimator, X, eval_metric, obj): - if eval_metric in ["roc_auc", "ap", "roc_auc_weighted"] and "binary" in obj: +def get_y_pred(estimator, X, eval_metric, task: Task): + if eval_metric in ["roc_auc", "ap", "roc_auc_weighted"] and task.is_binary(): y_pred_classes = estimator.predict_proba(X) y_pred = y_pred_classes[:, 1] if y_pred_classes.ndim > 1 else 
y_pred_classes elif eval_metric in [ @@ -382,7 +384,7 @@ def _eval_estimator( weight_val, groups_val, eval_metric: Union[str, Callable], - obj, + task, labels=None, log_training_metric=False, fit_kwargs: Optional[dict] = None, @@ -391,7 +393,7 @@ def _eval_estimator( fit_kwargs = {} if isinstance(eval_metric, str): pred_start = time.time() - val_pred_y = get_y_pred(estimator, X_val, eval_metric, obj) + val_pred_y = get_y_pred(estimator, X_val, eval_metric, task) pred_time = (time.time() - pred_start) / X_val.shape[0] val_loss = metric_loss_score( @@ -404,7 +406,7 @@ def _eval_estimator( ) metric_for_logging = {"pred_time": pred_time} if log_training_metric: - train_pred_y = get_y_pred(estimator, X_train, eval_metric, obj) + train_pred_y = get_y_pred(estimator, X_train, eval_metric, task) metric_for_logging["train_loss"] = metric_loss_score( eval_metric, train_pred_y, @@ -499,120 +501,6 @@ def default_cv_score_agg_func(val_loss_folds, log_metrics_folds): return metric_to_minimize, metrics_to_log -def evaluate_model_CV( - config: dict, - estimator: EstimatorSubclass, - X_train_all, - y_train_all, - budget, - kf, - task: str, - eval_metric, - best_val_loss, - cv_score_agg_func=None, - log_training_metric=False, - fit_kwargs: Optional[dict] = None, - free_mem_ratio=0, -): - if fit_kwargs is None: - fit_kwargs = {} - if cv_score_agg_func is None: - cv_score_agg_func = default_cv_score_agg_func - start_time = time.time() - val_loss_folds = [] - log_metric_folds = [] - metric = None - train_time = pred_time = 0 - total_fold_num = 0 - n = kf.get_n_splits() - X_train_split, y_train_split = X_train_all, y_train_all - if task in CLASSIFICATION: - labels = np.unique(y_train_all) - else: - labels = fit_kwargs.get( - "label_list" - ) # pass the label list on to compute the evaluation metric - groups = None - shuffle = getattr(kf, "shuffle", task not in TS_FORECAST) - if isinstance(kf, RepeatedStratifiedKFold): - kf = kf.split(X_train_split, y_train_split) - elif isinstance(kf, 
(GroupKFold, StratifiedGroupKFold)): - groups = kf.groups - kf = kf.split(X_train_split, y_train_split, groups) - shuffle = False - elif isinstance(kf, TimeSeriesSplit): - kf = kf.split(X_train_split, y_train_split) - else: - kf = kf.split(X_train_split) - rng = np.random.RandomState(2020) - budget_per_train = budget and budget / n - if "sample_weight" in fit_kwargs: - weight = fit_kwargs["sample_weight"] - weight_val = None - else: - weight = weight_val = None - for train_index, val_index in kf: - if shuffle: - train_index = rng.permutation(train_index) - if isinstance(X_train_all, pd.DataFrame): - X_train = X_train_split.iloc[train_index] - X_val = X_train_split.iloc[val_index] - else: - X_train, X_val = X_train_split[train_index], X_train_split[val_index] - y_train, y_val = y_train_split[train_index], y_train_split[val_index] - estimator.cleanup() - if weight is not None: - fit_kwargs["sample_weight"], weight_val = ( - weight[train_index], - weight[val_index], - ) - if groups is not None: - fit_kwargs["groups"] = ( - groups[train_index] - if isinstance(groups, np.ndarray) - else groups.iloc[train_index] - ) - groups_val = ( - groups[val_index] - if isinstance(groups, np.ndarray) - else groups.iloc[val_index] - ) - else: - groups_val = None - val_loss_i, metric_i, train_time_i, pred_time_i = get_val_loss( - config, - estimator, - X_train, - y_train, - X_val, - y_val, - weight_val, - groups_val, - eval_metric, - task, - labels, - budget_per_train, - log_training_metric=log_training_metric, - fit_kwargs=fit_kwargs, - free_mem_ratio=free_mem_ratio, - ) - if isinstance(metric_i, dict) and "intermediate_results" in metric_i.keys(): - del metric_i["intermediate_results"] - if weight is not None: - fit_kwargs["sample_weight"] = weight - total_fold_num += 1 - val_loss_folds.append(val_loss_i) - log_metric_folds.append(metric_i) - train_time += train_time_i - pred_time += pred_time_i - if budget and time.time() - start_time >= budget: - break - val_loss, metric = 
cv_score_agg_func(val_loss_folds, log_metric_folds) - n = total_fold_num - pred_time /= n - return val_loss, metric, train_time, pred_time - - def compute_estimator( X_train, y_train, @@ -674,14 +562,13 @@ def compute_estimator( free_mem_ratio=0, ) else: - val_loss, metric_for_logging, train_time, pred_time = evaluate_model_CV( + val_loss, metric_for_logging, train_time, pred_time = task.evaluate_model_CV( config_dic, estimator, X_train, y_train, budget, kf, - task, eval_metric, best_val_loss, cv_score_agg_func, @@ -734,14 +621,6 @@ def train_estimator( return estimator, train_time -def get_classification_objective(num_labels: int) -> str: - if num_labels == 2: - objective_name = "binary" - else: - objective_name = "multiclass" - return objective_name - - def norm_confusion_matrix( y_true: Union[np.array, pd.Series], y_pred: Union[np.array, pd.Series] ): diff --git a/flaml/automl/model.py b/flaml/automl/model.py index 0d8b32c7d7e8..019aee347d20 100644 --- a/flaml/automl/model.py +++ b/flaml/automl/model.py @@ -22,11 +22,13 @@ from flaml import tune from flaml.automl.data import ( group_counts, - CLASSIFICATION, add_time_idx_col, - TS_FORECASTREGRESSION, TS_TIMESTAMP_COL, TS_VALUE_COL, +) +from flaml.automl.task.task import ( + CLASSIFICATION, + TS_FORECASTREGRESSION, SEQCLASSIFICATION, SEQREGRESSION, TOKENCLASSIFICATION, @@ -567,9 +569,14 @@ def tokenizer(self): @property def data_collator(self): - from .nlp.huggingface.data_collator import task_to_datacollator_class + from flaml.automl.task.task import Task + from flaml.automl.nlp.huggingface.data_collator import ( + task_to_datacollator_class, + ) - data_collator_class = task_to_datacollator_class.get(self._task) + data_collator_class = task_to_datacollator_class.get( + self._task.name if isinstance(self._task, Task) else self._task + ) if data_collator_class: kwargs = { @@ -1486,8 +1493,12 @@ def cost_relative2lgbm(cls): return 1.9 def __init__(self, task="binary", **params): + if isinstance(task, str): + from 
flaml.automl.task.factory import task_factory + + task = task_factory(task) super().__init__(task, **params) - if "regression" in task: + if task.is_regression(): self.estimator_class = ExtraTreesRegressor else: self.estimator_class = ExtraTreesClassifier @@ -2122,11 +2133,7 @@ def predict(self, X, **kwargs): X.iloc[:i, :] ) preds.append(self._model[i - 1].predict(X_pred, **kwargs)[-1]) - forecast = DataFrame( - data=np.asarray(preds).reshape(-1, 1), - columns=[self.hcrystaball_model.name], - index=X.index, - ) + forecast = Series(preds) else: ( X_pred, diff --git a/flaml/automl/nlp/huggingface/data_collator.py b/flaml/automl/nlp/huggingface/data_collator.py index 484e6a346c52..51cfda109e7a 100644 --- a/flaml/automl/nlp/huggingface/data_collator.py +++ b/flaml/automl/nlp/huggingface/data_collator.py @@ -6,7 +6,7 @@ ) from collections import OrderedDict -from flaml.automl.data import ( +from flaml.automl.task.task import ( TOKENCLASSIFICATION, MULTICHOICECLASSIFICATION, SUMMARIZATION, diff --git a/flaml/automl/nlp/huggingface/training_args.py b/flaml/automl/nlp/huggingface/training_args.py index 7461b1caa980..9fed71e1dc23 100644 --- a/flaml/automl/nlp/huggingface/training_args.py +++ b/flaml/automl/nlp/huggingface/training_args.py @@ -1,9 +1,7 @@ import argparse from dataclasses import dataclass, field -from flaml.automl.data import ( - NLG_TASKS, -) +from flaml.automl.task.task import NLG_TASKS from typing import Optional, List try: diff --git a/flaml/automl/nlp/huggingface/utils.py b/flaml/automl/nlp/huggingface/utils.py index afea65c105cb..d9cc0244c0a5 100644 --- a/flaml/automl/nlp/huggingface/utils.py +++ b/flaml/automl/nlp/huggingface/utils.py @@ -2,7 +2,7 @@ from itertools import chain import numpy as np -from flaml.automl.data import ( +from flaml.automl.task.task import ( SUMMARIZATION, SEQREGRESSION, SEQCLASSIFICATION, @@ -402,7 +402,11 @@ def load_model(checkpoint_path, task, num_labels=None): transformers.logging.set_verbosity_error() from transformers 
def valid_starting_point_one_dim(self, value_one_dim, domain_one_dim):
    """Check one hyperparameter of a starting point against the search space.

    The value is rejected (``False`` is returned) when any of these holds:

    1. The type of the starting value does not match the type required by
       the search-space domain.
    2. The starting value is not inside the required search space. This
       includes a user-specified search space passed via ``custom_hp``.
    3. The search space entry is a plain value instead of a domain, and
       that value is not equal to the starting value.

    Args:
        value_one_dim: The starting-point value of a single hyperparameter.
        domain_one_dim: The search-space entry of that hyperparameter;
            either a ``flaml.tune`` ``Domain`` or a constant.

    Returns:
        bool: ``True`` when the starting value is acceptable.
    """
    # Imported lazily, as in the original implementation.
    from flaml.tune.space import sample

    if isinstance(domain_one_dim, sample.Domain):
        # The annotation of the first parameter of ``is_valid`` is used as
        # the type the domain expects.
        renamed_type = list(
            inspect.signature(domain_one_dim.is_valid).parameters.values()
        )[0].annotation
        type_match = (
            renamed_type == Any
            or isinstance(value_one_dim, renamed_type)
            # An int starting value is accepted for a float domain.
            or (isinstance(value_one_dim, int) and renamed_type is float)
        )
        if not (type_match and domain_one_dim.is_valid(value_one_dim)):
            return False
    elif value_one_dim != domain_one_dim:
        return False
    return True
is no smaller than max iter, avoid the checking + starting_point_len = len(starting_point) + starting_point = [ + x + for x in starting_point + if self.valid_starting_point(x, search_space) + ] + if starting_point_len > len(starting_point): + logger.warning( + "Starting points outside of the search space are removed. " + f"Remaining starting points for {learner_class}: {starting_point}" + ) + starting_point = starting_point or None + + for name, space in search_space.items(): + assert ( + "domain" in space + ), f"{name}'s domain is missing in the search space spec {space}" + if space["domain"] is None: + # don't search this hp + continue + self._search_space_domain[name] = space["domain"] + + if "low_cost_init_value" in space: + self.low_cost_partial_config[name] = space["low_cost_init_value"] + if "cat_hp_cost" in space: + self.cat_hp_cost[name] = space["cat_hp_cost"] + # if a starting point is provided, set the init config to be + # the starting point provided + if ( + isinstance(starting_point, dict) + and starting_point.get(name) is not None + ): + if self.init_config is None: + self.init_config = {} + self.init_config[name] = starting_point[name] + elif ( + not isinstance(starting_point, list) + and "init_value" in space + and self.valid_starting_point_one_dim( + space["init_value"], space["domain"] + ) + ): + if self.init_config is None: + self.init_config = {} + self.init_config[name] = space["init_value"] + + if isinstance(starting_point, list): + self.init_config = starting_point + else: + self.init_config = [] if self.init_config is None else [self.init_config] + + self._hp_names = list(self._search_space_domain.keys()) + self.search_alg = None + self.best_config = None + self.best_result = None + self.best_loss = self.best_loss_old = np.inf + self.total_time_used = 0 + self.total_iter = 0 + self.base_eci = None + self.time_best_found = self.time_best_found_old = 0 + self.time2eval_best = 0 + self.time2eval_best_old = 0 + self.trained_estimator = None + 
def update(self, result, time_used):
    """Fold the outcome of one trial into the running search statistics.

    Args:
        result: The trial result dict (keys read here: ``config``,
            ``val_loss``, ``metric_for_logging``, ``time_total_s`` and
            ``trained_estimator``), or a falsy value for a trial that
            produced nothing.
        time_used: Time charged for this trial.

    The ``trained_estimator`` entry is removed from ``result`` so that the
    dict no longer keeps the model alive.
    """
    if not result:
        # Nothing came back: record an infinitely bad, zero-cost trial.
        config = None
        metric_for_logging = None
        trained_estimator = None
        obj = np.inf
        time2eval = 0.0
    else:
        config = result["config"]
        has_explicit_size = config and "FLAML_sample_size" in config
        self.sample_size = (
            config["FLAML_sample_size"] if has_explicit_size else self.data_size[0]
        )
        obj = result["val_loss"]
        metric_for_logging = result["metric_for_logging"]
        time2eval = result["time_total_s"]
        trained_estimator = result["trained_estimator"]
        del result["trained_estimator"]  # free up RAM
        n_iter = None
        if trained_estimator and hasattr(trained_estimator, "ITER_HP"):
            n_iter = trained_estimator.params.get(trained_estimator.ITER_HP)
        if n_iter:
            # Write the iteration count found by the estimator back into
            # the config (nested under "ml" when that key exists).
            destination = config["ml"] if "ml" in config else config
            destination[trained_estimator.ITER_HP] = n_iter

    self.trial_time = time2eval
    # With a negative budget every trial is counted as one unit.
    charged = time_used if self._budget >= 0 else 1
    self.total_time_used += charged
    self.total_iter += 1
    if self.base_eci is None:
        self.base_eci = time_used

    is_new_best = (obj is not None) and (obj < self.best_loss)
    if is_new_best:
        previous_best = self.best_loss
        self.best_loss_old = previous_best if previous_best < np.inf else 2 * obj
        self.best_loss = obj
        self.best_result = result
        self.time_best_found_old = self.time_best_found
        self.time_best_found = self.total_time_used
        self.iter_best_found = self.total_iter
        self.best_config = config
        self.best_config_sample_size = self.sample_size
        self.best_config_train_time = time_used
        if time2eval:
            self.time2eval_best_old = self.time2eval_best
            self.time2eval_best = time2eval
        kept = self.trained_estimator
        if kept and trained_estimator and kept != trained_estimator:
            # The previously kept model is superseded.
            kept.cleanup()
        if trained_estimator:
            self.trained_estimator = trained_estimator
    elif trained_estimator:
        # Not an improvement: the model of this trial is not kept.
        trained_estimator.cleanup()

    self.metric_for_logging = metric_for_logging
    self.val_loss = obj
    self.config = config
def _prepare_sample_train_data(self, sample_size: int):
    """Return the training data, weights and groups for a given sample size.

    When ``sample_size`` exceeds the number of rows in the training split
    (``self.data_size[0]``), the complete ``*_all`` data is returned.
    Otherwise the first ``sample_size`` entries of the training split are
    returned.

    Returns:
        tuple: ``(X, y, sample_weight, groups)``; the last two are ``None``
        when no sample weight / groups are configured.
    """
    # NOTE: _prepare_sample_train_data is before kwargs is updated to fit_kwargs_by_estimator
    if sample_size > self.data_size[0]:
        full_weight = (
            self.sample_weight_all if "sample_weight" in self.fit_kwargs else None
        )
        full_groups = self.groups_all if self.groups is not None else None
        return self.X_train_all, self.y_train_all, full_weight, full_groups

    def _head(data, pandas_type):
        # pandas containers are cut positionally, everything else by slicing.
        if isinstance(data, pandas_type):
            return data.iloc[:sample_size]
        return data[:sample_size]

    X_part = _head(self.X_train, pd.DataFrame)
    y_part = _head(self.y_train, pd.Series)

    weight_part = None
    configured_weight = self.fit_kwargs.get("sample_weight")
    if configured_weight is not None:
        weight_part = _head(configured_weight, pd.Series)

    groups_part = None
    if self.groups is not None:
        groups_part = _head(self.groups, pd.Series)

    return X_part, y_part, weight_part, groups_part
@classmethod
def sanitize(cls, config: dict) -> dict:
    """Make a config ready for passing to estimator.

    The hyperparameters are taken from the nested ``"ml"`` entry when it
    exists, otherwise from ``config`` itself. A copy is returned with the
    bookkeeping keys removed; the input is not modified.
    """
    cleaned = config.get("ml", config).copy()
    for bookkeeping_key in ("FLAML_sample_size", "learner", "_choice_"):
        cleaned.pop(bookkeeping_key, None)
    return cleaned
def task_factory(
    task_name: str,
    X_train: Optional[Union[np.ndarray, pd.DataFrame]] = None,
    y_train: Optional[Union[np.ndarray, pd.DataFrame, pd.Series]] = None,
) -> Task:
    """Build the task object for ``task_name``.

    Every task name is currently served by ``GenericTask``; the training
    data, when given, is forwarded to its constructor unchanged.
    """
    task = GenericTask(task_name, X_train, y_train)
    return task
XGBoostSklearnEstimator, + "xgb_limitdepth": XGBoostLimitDepthEstimator, + "rf": RandomForestEstimator, + "lgbm": LGBMEstimator, + "lrl1": LRL1Classifier, + "lrl2": LRL2Classifier, + "catboost": CatBoostEstimator, + "extra_tree": ExtraTreesEstimator, + "kneighbor": KNeighborsEstimator, + "transformer": TransformersEstimator, + "transformer_ms": TransformersEstimatorModelSelection, + } + + def validate_data( + self, + automl, + state, + X_train_all, + y_train_all, + dataframe, + label, + X_val=None, + y_val=None, + groups_val=None, + groups=None, + ): + if X_train_all is not None and y_train_all is not None: + assert ( + isinstance(X_train_all, np.ndarray) + or issparse(X_train_all) + or isinstance(X_train_all, pd.DataFrame) + ), ( + "X_train_all must be a numpy array, a pandas dataframe, " + "or Scipy sparse matrix." + ) + assert isinstance(y_train_all, np.ndarray) or isinstance( + y_train_all, pd.Series + ), "y_train_all must be a numpy array or a pandas series." + assert ( + X_train_all.size != 0 and y_train_all.size != 0 + ), "Input data must not be empty." + if isinstance(X_train_all, np.ndarray) and len(X_train_all.shape) == 1: + X_train_all = np.reshape(X_train_all, (X_train_all.size, 1)) + if isinstance(y_train_all, np.ndarray): + y_train_all = y_train_all.flatten() + assert ( + X_train_all.shape[0] == y_train_all.shape[0] + ), "# rows in X_train must match length of y_train." 
+ automl._df = isinstance(X_train_all, pd.DataFrame) + automl._nrow, automl._ndim = X_train_all.shape + if self.is_ts_forecast(): + X_train_all = pd.DataFrame(X_train_all) + X_train_all, y_train_all = self._validate_ts_data( + X_train_all, y_train_all + ) + X, y = X_train_all, y_train_all + elif dataframe is not None and label is not None: + assert isinstance( + dataframe, pd.DataFrame + ), "dataframe must be a pandas DataFrame" + assert label in dataframe.columns, "label must a column name in dataframe" + automl._df = True + if self.is_ts_forecast(): + dataframe = self._validate_ts_data(dataframe) + X = dataframe.drop(columns=label) + automl._nrow, automl._ndim = X.shape + y = dataframe[label] + else: + raise ValueError("either X_train+y_train or dataframe+label are required") + + # check the validity of input dimensions for NLP tasks, so need to check _is_nlp_task not estimator + if self.is_nlp(): + from flaml.automl.nlp.utils import is_a_list_of_str + + is_all_str = True + is_all_list = True + for column in X.columns: + assert X[column].dtype.name in ( + "object", + "string", + ), "If the task is an NLP task, X can only contain text columns" + for each_cell in X[column]: + if each_cell is not None: + is_str = isinstance(each_cell, str) + is_list_of_int = isinstance(each_cell, list) and all( + isinstance(x, int) for x in each_cell + ) + is_list_of_str = is_a_list_of_str(each_cell) + if self.is_token_classification(): + assert is_list_of_str, ( + "For the token-classification task, the input column needs to be a list of string," + "instead of string, e.g., ['EU', 'rejects','German', 'call','to','boycott','British','lamb','.',].", + "For more examples, please refer to test/nlp/test_autohf_tokenclassification.py", + ) + else: + assert is_str or is_list_of_int, ( + "Each column of the input must either be str (untokenized) " + "or a list of integers (tokenized)" + ) + is_all_str &= is_str + is_all_list &= is_list_of_int or is_list_of_str + assert is_all_str or 
is_all_list, ( + "Currently FLAML only supports two modes for NLP: either all columns of X are string (non-tokenized), " + "or all columns of X are integer ids (tokenized)" + ) + + if issparse(X_train_all) or automl._skip_transform: + automl._transformer = automl._label_transformer = False + automl._X_train_all, automl._y_train_all = X, y + else: + from flaml.automl.data import DataTransformer + + automl._transformer = DataTransformer() + + ( + automl._X_train_all, + automl._y_train_all, + ) = automl._transformer.fit_transform(X, y, self) + automl._label_transformer = automl._transformer.label_transformer + if self.is_token_classification(): + if hasattr(automl._label_transformer, "label_list"): + state.fit_kwargs.update( + {"label_list": automl._label_transformer.label_list} + ) + elif "label_list" not in state.fit_kwargs: + for each_fit_kwargs in state.fit_kwargs_by_estimator.values(): + assert ( + "label_list" in each_fit_kwargs + ), "For the token-classification task, you must either (1) pass token labels; or (2) pass id labels and the label list. " + "Please refer to the documentation for more details: https://microsoft.github.io/FLAML/docs/Examples/AutoML-NLP#a-simple-token-classification-example" + automl._feature_names_in_ = ( + automl._X_train_all.columns.to_list() + if hasattr(automl._X_train_all, "columns") + else None + ) + + automl._sample_weight_full = state.fit_kwargs.get( + "sample_weight" + ) # NOTE: _validate_data is before kwargs is updated to fit_kwargs_by_estimator + if X_val is not None and y_val is not None: + assert ( + isinstance(X_val, np.ndarray) + or issparse(X_val) + or isinstance(X_val, pd.DataFrame) + ), ( + "X_val must be None, a numpy array, a pandas dataframe, " + "or Scipy sparse matrix." + ) + assert isinstance(y_val, np.ndarray) or isinstance( + y_val, pd.Series + ), "y_val must be None, a numpy array or a pandas series." + assert X_val.size != 0 and y_val.size != 0, ( + "Validation data are expected to be nonempty. 
@staticmethod
def _validate_ts_data(
    dataframe,
    y_train_all=None,
):
    """Validate (and de-duplicate) time-series training data.

    Args:
        dataframe: A pandas DataFrame whose first column holds the
            timestamps (dtype ``datetime64[ns]``).
        y_train_all: Optional labels (pandas Series or array-like). When
            given, they are joined to ``dataframe`` on the index so that
            the duplicate handling covers features and label together.

    Returns:
        The validated DataFrame when ``y_train_all`` is None, otherwise a
        ``(X, y)`` tuple where ``y`` is the last column of the joined frame.

    Raises:
        AssertionError: If the first column is not ``datetime64[ns]``, or
            if, after dropping exact duplicate rows, the same timestamp
            still appears more than once.
    """
    timestamp_col = dataframe.columns[0]
    assert (
        dataframe[timestamp_col].dtype.name == "datetime64[ns]"
    ), f"For '{TS_FORECAST}' task, the first column must contain timestamp values."
    if y_train_all is not None:
        y_df = (
            pd.DataFrame(y_train_all)
            if isinstance(y_train_all, pd.Series)
            else pd.DataFrame(y_train_all, columns=["labels"])
        )
        dataframe = dataframe.join(y_df)
    duplicates = dataframe.duplicated()
    if duplicates.any():
        # Select the timestamps of the duplicated rows. The frame itself
        # must not be used as the column indexer.
        logger.warning(
            "Duplicate timestamp values found in timestamp column. "
            f"\n{dataframe.loc[duplicates, timestamp_col]}"
        )
        dataframe = dataframe.drop_duplicates()
        logger.warning("Removed duplicate rows based on all columns")
        # ``duplicated()`` returns a boolean Series (never None), so the
        # remaining timestamps have to be checked with ``any()``.
        assert (
            not dataframe[timestamp_col].duplicated().any()
        ), "Duplicate timestamp values with different values for other columns."
    ts_series = pd.to_datetime(dataframe[timestamp_col])
    inferred_freq = pd.infer_freq(ts_series)
    if inferred_freq is None:
        logger.warning(
            "Missing timestamps detected. To avoid error with estimators, set estimator list to ['prophet']. "
        )
    if y_train_all is not None:
        # The joined label is the last column; split it off again.
        return dataframe.iloc[:, :-1], dataframe.iloc[:, -1]
    return dataframe
y_train_all.iloc[:n].loc[rare_index] + ) + else: + y_train_all = np.concatenate( + [y_train_all, y_train_all[:n][rare_index]] + ) + count += rare_count + logger.info(f"class {label} augmented from {rare_count} to {count}") + SHUFFLE_SPLIT_TYPES = ["uniform", "stratified"] + if split_type in SHUFFLE_SPLIT_TYPES: + if sample_weight_full is not None: + X_train_all, y_train_all, state.sample_weight_all = shuffle( + X_train_all, + y_train_all, + sample_weight_full, + random_state=RANDOM_SEED, + ) + state.fit_kwargs[ + "sample_weight" + ] = ( + state.sample_weight_all + ) # NOTE: _prepare_data is before kwargs is updated to fit_kwargs_by_estimator + if isinstance(state.sample_weight_all, pd.Series): + state.sample_weight_all.reset_index(drop=True, inplace=True) + else: + X_train_all, y_train_all = shuffle( + X_train_all, y_train_all, random_state=RANDOM_SEED + ) + if data_is_df: + X_train_all.reset_index(drop=True, inplace=True) + if isinstance(y_train_all, pd.Series): + y_train_all.reset_index(drop=True, inplace=True) + + X_train, y_train = X_train_all, y_train_all + state.groups_all = state.groups + if X_val is None and eval_method == "holdout": + # if eval_method = holdout, make holdout data + if split_type == "time": + if self.is_ts_forecast(): + period = state.fit_kwargs[ + "period" + ] # NOTE: _prepare_data is before kwargs is updated to fit_kwargs_by_estimator + if self.is_ts_forecastpanel(): + X_train_all["time_idx"] -= X_train_all["time_idx"].min() + X_train_all["time_idx"] = X_train_all["time_idx"].astype("int") + ids = state.fit_kwargs["group_ids"].copy() + ids.append(TS_TIMESTAMP_COL) + ids.append("time_idx") + y_train_all = pd.DataFrame(y_train_all) + y_train_all[ids] = X_train_all[ids] + X_train_all = X_train_all.sort_values(ids) + y_train_all = y_train_all.sort_values(ids) + training_cutoff = X_train_all["time_idx"].max() - period + X_train = X_train_all[lambda x: x.time_idx <= training_cutoff] + y_train = y_train_all[ + lambda x: x.time_idx <= 
training_cutoff + ].drop(columns=ids) + X_val = X_train_all[lambda x: x.time_idx > training_cutoff] + y_val = y_train_all[ + lambda x: x.time_idx > training_cutoff + ].drop(columns=ids) + else: + num_samples = X_train_all.shape[0] + assert ( + period < num_samples + ), f"period={period}>#examples={num_samples}" + split_idx = num_samples - period + X_train = X_train_all[:split_idx] + y_train = y_train_all[:split_idx] + X_val = X_train_all[split_idx:] + y_val = y_train_all[split_idx:] + else: + if ( + "sample_weight" in state.fit_kwargs + ): # NOTE: _prepare_data is before kwargs is updated to fit_kwargs_by_estimator + ( + X_train, + X_val, + y_train, + y_val, + state.fit_kwargs[ + "sample_weight" + ], # NOTE: _prepare_data is before kwargs is updated to fit_kwargs_by_estimator + state.weight_val, + ) = train_test_split( + X_train_all, + y_train_all, + state.fit_kwargs[ + "sample_weight" + ], # NOTE: _prepare_data is before kwargs is updated to fit_kwargs_by_estimator + test_size=split_ratio, + shuffle=False, + ) + else: + X_train, X_val, y_train, y_val = train_test_split( + X_train_all, + y_train_all, + test_size=split_ratio, + shuffle=False, + ) + elif split_type == "group": + gss = GroupShuffleSplit( + n_splits=1, test_size=split_ratio, random_state=RANDOM_SEED + ) + for train_idx, val_idx in gss.split( + X_train_all, y_train_all, state.groups_all + ): + if data_is_df: + X_train = X_train_all.iloc[train_idx] + X_val = X_train_all.iloc[val_idx] + else: + X_train, X_val = X_train_all[train_idx], X_train_all[val_idx] + y_train, y_val = y_train_all[train_idx], y_train_all[val_idx] + state.groups = state.groups_all[train_idx] + state.groups_val = state.groups_all[val_idx] + elif self.is_classification(): + # for classification, make sure the labels are complete in both + # training and validation data + label_set, first = np.unique(y_train_all, return_index=True) + rest = [] + last = 0 + first.sort() + for i in range(len(first)): + rest.extend(range(last, first[i])) + 
last = first[i] + 1 + rest.extend(range(last, len(y_train_all))) + X_first = X_train_all.iloc[first] if data_is_df else X_train_all[first] + X_rest = X_train_all.iloc[rest] if data_is_df else X_train_all[rest] + y_rest = y_train_all[rest] + stratify = y_rest if split_type == "stratified" else None + if ( + "sample_weight" in state.fit_kwargs + ): # NOTE: _prepare_data is before kwargs is updated to fit_kwargs_by_estimator + ( + X_train, + X_val, + y_train, + y_val, + weight_train, + weight_val, + ) = train_test_split( + X_rest, + y_rest, + state.fit_kwargs["sample_weight"][ + rest + ], # NOTE: _prepare_data is before kwargs is updated to fit_kwargs_by_estimator + test_size=split_ratio, + stratify=stratify, + random_state=RANDOM_SEED, + ) + weight1 = state.fit_kwargs["sample_weight"][ + first + ] # NOTE: _prepare_data is before kwargs is updated to fit_kwargs_by_estimator + state.weight_val = concat(weight1, weight_val) + state.fit_kwargs[ + "sample_weight" + ] = concat( # NOTE: _prepare_data is before kwargs is updated to fit_kwargs_by_estimator + weight1, weight_train + ) + else: + X_train, X_val, y_train, y_val = train_test_split( + X_rest, + y_rest, + test_size=split_ratio, + stratify=stratify, + random_state=RANDOM_SEED, + ) + X_train = concat(X_first, X_train) + y_train = ( + concat(label_set, y_train) + if data_is_df + else np.concatenate([label_set, y_train]) + ) + X_val = concat(X_first, X_val) + y_val = ( + concat(label_set, y_val) + if data_is_df + else np.concatenate([label_set, y_val]) + ) + elif self.is_regression(): + if ( + "sample_weight" in state.fit_kwargs + ): # NOTE: _prepare_data is before kwargs is updated to fit_kwargs_by_estimator + ( + X_train, + X_val, + y_train, + y_val, + state.fit_kwargs[ + "sample_weight" + ], # NOTE: _prepare_data is before kwargs is updated to fit_kwargs_by_estimator + state.weight_val, + ) = train_test_split( + X_train_all, + y_train_all, + state.fit_kwargs[ + "sample_weight" + ], # NOTE: _prepare_data is before 
kwargs is updated to fit_kwargs_by_estimator + test_size=split_ratio, + random_state=RANDOM_SEED, + ) + else: + X_train, X_val, y_train, y_val = train_test_split( + X_train_all, + y_train_all, + test_size=split_ratio, + random_state=RANDOM_SEED, + ) + state.data_size = X_train.shape + state.X_train, state.y_train = X_train, y_train + state.X_val, state.y_val = X_val, y_val + state.X_train_all = X_train_all + state.y_train_all = y_train_all + if eval_method == "holdout": + state.kf = None + return + if split_type == "group": + # logger.info("Using GroupKFold") + assert ( + len(state.groups_all) == y_train_all.size + ), "the length of groups must match the number of examples" + assert ( + len(np.unique(state.groups_all)) >= n_splits + ), "the number of groups must be equal or larger than n_splits" + state.kf = GroupKFold(n_splits) + elif split_type == "stratified": + # logger.info("Using StratifiedKFold") + assert y_train_all.size >= n_splits, ( + f"{n_splits}-fold cross validation" + f" requires input data with at least {n_splits} examples." + ) + assert y_train_all.size >= 2 * n_splits, ( + f"{n_splits}-fold cross validation with metric=r2 " + f"requires input data with at least {n_splits*2} examples." + ) + state.kf = RepeatedStratifiedKFold( + n_splits=n_splits, n_repeats=1, random_state=RANDOM_SEED + ) + elif split_type == "time": + # logger.info("Using TimeSeriesSplit") + if self.is_ts_forecast() and not self.is_ts_forecastpanel(): + period = state.fit_kwargs[ + "period" + ] # NOTE: _prepare_data is before kwargs is updated to fit_kwargs_by_estimator + if period * (n_splits + 1) > y_train_all.size: + n_splits = int(y_train_all.size / period - 1) + assert n_splits >= 2, ( + f"cross validation for forecasting period={period}" + f" requires input data with at least {3 * period} examples." 
def decide_split_type(
    self,
    split_type,
    y_train_all,
    fit_kwargs,
    groups=None,
) -> str:
    """Resolve and validate the data split type for this task.

    Side effects on ``self.name``:
      * a generic ``"classification"`` task is refined to the name returned by
        ``get_classification_objective`` for the number of distinct labels;
      * a time-series forecast task whose ``fit_kwargs`` carry a non-empty
        ``group_ids`` list is renamed to ``TS_FORECASTPANEL``.

    Args:
        split_type: ``"auto"``, an explicit split name, or a splitter object
            exposing ``split`` and ``get_n_splits``.
        y_train_all: The complete set of targets; only read when the task
            name is the generic ``"classification"``.
        fit_kwargs: Fit-time keyword arguments. Forecast tasks require an
            integer ``"period"``; ``"group_ids"`` (a list) is optional.
        groups: Optional group labels for the training data.

    Returns:
        The resolved split type. A non-string ``split_type`` (splitter object)
        is returned unchanged after validation. If none of the task predicates
        matches, the function falls through and returns ``None``.

    Raises:
        AssertionError: The requested split type or the supplied
            ``fit_kwargs``/``groups`` are invalid for this task.
    """
    if self.name == "classification":
        self.name = get_classification_objective(len(np.unique(y_train_all)))

    # A user-supplied splitter object is validated and passed through as is.
    if not isinstance(split_type, str):
        assert hasattr(split_type, "split") and hasattr(
            split_type, "get_n_splits"
        ), "split_type must be a string or a splitter object with split and get_n_splits methods."
        assert (
            not isinstance(split_type, GroupKFold) or groups is not None
        ), "GroupKFold requires groups to be provided."
        return split_type

    if self.is_ts_forecast():
        assert split_type in ["auto", "time"]
        assert isinstance(
            fit_kwargs.get("period"),
            int,  # NOTE: _decide_split_type is before kwargs is updated to fit_kwargs_by_estimator
        ), f"missing a required integer 'period' for '{TS_FORECAST}' task."
        group_ids = fit_kwargs.get("group_ids")
        if group_ids:
            # Validate before renaming so that a failed check does not leave
            # the task half-switched to the panel type.
            assert isinstance(
                group_ids, list
            ), f"missing a required List[str] 'group_ids' for '{TS_FORECASTPANEL}' task."
            # TODO (MARK) This will likely not play well with the task class
            self.name = TS_FORECASTPANEL
        return "time"

    if self.is_classification():
        assert split_type in ["auto", "stratified", "uniform", "time", "group"]
        if split_type != "auto":
            return split_type
        # "auto": stratify unless group labels were supplied.
        return "stratified" if groups is None else "group"

    if self.is_regression():
        assert split_type in ["auto", "uniform", "time", "group"]
        return split_type if split_type != "auto" else "uniform"

    if self.is_rank():
        assert groups is not None, "groups must be specified for ranking task."
        assert split_type in ["auto", "group"]
        return "group"

    if self.is_nlg():
        assert split_type in ["auto", "uniform", "time", "group"]
        return split_type if split_type != "auto" else "uniform"
labels = fit_kwargs.get( + "label_list" + ) # pass the label list on to compute the evaluation metric + groups = None + shuffle = getattr(kf, "shuffle", not self.is_ts_forecast()) + if isinstance(kf, RepeatedStratifiedKFold): + kf = kf.split(X_train_split, y_train_split) + elif isinstance(kf, (GroupKFold, StratifiedGroupKFold)): + groups = kf.groups + kf = kf.split(X_train_split, y_train_split, groups) + shuffle = False + elif isinstance(kf, TimeSeriesSplit): + kf = kf.split(X_train_split, y_train_split) + else: + kf = kf.split(X_train_split) + rng = np.random.RandomState(2020) + budget_per_train = budget and budget / n + if "sample_weight" in fit_kwargs: + weight = fit_kwargs["sample_weight"] + weight_val = None + else: + weight = weight_val = None + for train_index, val_index in kf: + if shuffle: + train_index = rng.permutation(train_index) + if isinstance(X_train_all, pd.DataFrame): + X_train = X_train_split.iloc[train_index] + X_val = X_train_split.iloc[val_index] + else: + X_train, X_val = X_train_split[train_index], X_train_split[val_index] + y_train, y_val = y_train_split[train_index], y_train_split[val_index] + estimator.cleanup() + if weight is not None: + fit_kwargs["sample_weight"], weight_val = ( + weight[train_index], + weight[val_index], + ) + if groups is not None: + fit_kwargs["groups"] = ( + groups[train_index] + if isinstance(groups, np.ndarray) + else groups.iloc[train_index] + ) + groups_val = ( + groups[val_index] + if isinstance(groups, np.ndarray) + else groups.iloc[val_index] + ) + else: + groups_val = None + val_loss_i, metric_i, train_time_i, pred_time_i = get_val_loss( + config, + estimator, + X_train, + y_train, + X_val, + y_val, + weight_val, + groups_val, + eval_metric, + self, + labels, + budget_per_train, + log_training_metric=log_training_metric, + fit_kwargs=fit_kwargs, + free_mem_ratio=free_mem_ratio, + ) + if isinstance(metric_i, dict) and "intermediate_results" in metric_i.keys(): + del metric_i["intermediate_results"] + if 
def default_estimator_list(self, estimator_list: List[str]) -> List[str]:
    """Return the estimator names to search for this task.

    Anything other than the literal ``"auto"`` is handed back untouched.
    For ``"auto"`` the list is chosen from the task predicates and from
    which optional packages (catboost, prophet) can be imported.

    Args:
        estimator_list: ``"auto"`` or an explicit list of estimator names.

    Returns:
        The list of estimator names to use.
    """
    if "auto" != estimator_list:
        return estimator_list

    # Tasks with a fixed learner set.
    if self.is_rank():
        return ["lgbm", "xgboost", "xgb_limitdepth"]
    if self.is_nlp():
        return ["transformer"]
    if self.is_ts_forecastpanel():
        return ["tft"]

    # catboost is optional; it is only listed when it can be imported.
    try:
        import catboost
    except ImportError:
        catboost_available = False
    else:
        catboost_available = True

    learners = ["lgbm", "rf"]
    if catboost_available:
        learners.append("catboost")
    learners.extend(["xgboost", "extra_tree", "xgb_limitdepth"])

    if self.is_ts_forecast():
        # catboost is removed because it has a `name` parameter, making it incompatible with hcrystalball
        if "catboost" in learners:
            learners.remove("catboost")
        if self.is_ts_forecastregression():
            try:
                import prophet
            except ImportError:
                learners.extend(["arima", "sarimax"])
            else:
                learners.extend(["prophet", "arima", "sarimax"])
    elif not self.is_regression():
        learners.append("lrl1")

    return learners
class Task(ABC):
    """
    Abstract base class for a machine learning task.

    Class definitions should implement abstract methods and provide a non-empty dictionary of estimator classes.
    A Task can be suitable to be used for multiple machine-learning tasks (e.g. classification or regression) or be
    implemented specifically for a single one depending on the generality of data validation and model evaluation methods
    implemented. The implementation of a Task may optionally use the training data and labels to determine data and task
    specific details, such as in determining if a problem is single-label or multi-label.

    FLAML evaluates at runtime how to behave exactly, relying on the task instance to provide implementations of
    operations which vary between tasks.
    """

    # Registry mapping estimator names to estimator classes; looked up by
    # `estimator_class_from_str`. Subclasses are expected to override it.
    estimators = {}

    def __init__(
        self,
        task_name: str,
        X_train: Optional[Union[np.ndarray, pd.DataFrame]] = None,
        y_train: Optional[Union[np.ndarray, pd.DataFrame, pd.Series]] = None,
    ):
        """Constructor.

        The base implementation only stores ``task_name``; ``X_train`` and ``y_train`` are accepted so that
        subclasses can inspect the data.

        Args:
            task_name: String name for this type of task. Used when the Task can be generic and implement a number of
                types of sub-task.
            X_train: Optional. Some Task types may use the data shape or features to determine details of their usage,
                such as in binary vs multilabel classification.
            y_train: Optional. Some Task types may use the data shape or features to determine details of their usage,
                such as in binary vs multilabel classification.
        """
        self.name = task_name

    def __str__(self) -> str:
        """Name of this task type."""
        return self.name

    @abstractmethod
    def evaluate_model_CV(
        self,
        config: dict,
        estimator: "flaml.automl.ml.BaseEstimator",
        X_train_all: Union[np.ndarray, pd.DataFrame],
        y_train_all: Union[np.ndarray, pd.DataFrame, pd.Series],
        budget: int,
        kf,
        eval_metric: str,
        best_val_loss: float,
        log_training_metric: bool = False,
        fit_kwargs: Optional[dict] = None,
    ) -> Tuple[float, float, float, float]:
        """Evaluate the model using cross-validation.

        Args:
            config: configuration used in the evaluation of the metric.
            estimator: Estimator class of the model.
            X_train_all: Complete training feature data.
            y_train_all: Complete training target data.
            budget: Training time budget.
            kf: Cross-validation index generator.
            eval_metric: Metric name to be used for evaluation.
            best_val_loss: Best current validation-set loss.
            log_training_metric: Bool defaults False. Enables logging of the training metric.
            fit_kwargs: Additional kwargs passed to the estimator's fit method. Defaults to None (rather than a
                shared mutable ``{}``); implementations should treat None as "no extra arguments".

        Returns:
            validation loss, metric value, train time, prediction time
        """

    @abstractmethod
    def validate_data(
        self,
        automl: "flaml.automl.automl.AutoML",
        state: "flaml.automl.state.AutoMLState",
        X_train_all: Union[np.ndarray, pd.DataFrame, None],
        y_train_all: Union[np.ndarray, pd.DataFrame, pd.Series, None],
        dataframe: Union[pd.DataFrame, None],
        label: str,
        X_val: Optional[Union[np.ndarray, pd.DataFrame]] = None,
        y_val: Optional[Union[np.ndarray, pd.DataFrame, pd.Series]] = None,
        groups_val: Optional[List[str]] = None,
        groups: Optional[List[str]] = None,
    ):
        """Validate that the data is suitable for this task type.

        Args:
            automl: The AutoML instance from which this task has been constructed.
            state: The AutoMLState instance for this run.
            X_train_all: The complete data set or None if dataframe is supplied.
            y_train_all: The complete target set or None if dataframe is supplied.
            dataframe: A dataframe containing the complete data set with targets.
            label: The name of the target column in dataframe.
            X_val: Optional. A data set for validation.
            y_val: Optional. A target vector corresponding to X_val for validation.
            groups_val: Group labels (with matching length to y_val) or group counts (with sum equal to length of y_val)
                for validation data. Need to be consistent with groups.
            groups: Group labels (with matching length to y_train) or groups counts (with sum equal to length of y_train)
                for training data.

        Raises:
            AssertionError: The data provided is invalid for this task type and configuration.
        """

    @abstractmethod
    def prepare_data(
        self,
        state: "flaml.automl.state.AutoMLState",
        X_train_all: Union[np.ndarray, pd.DataFrame],
        y_train_all: Union[np.ndarray, pd.DataFrame, pd.Series, None],
        auto_augment: bool,
        eval_method: str,
        split_type: str,
        split_ratio: float,
        n_splits: int,
        data_is_df: bool,
        sample_weight_full: Optional[List[float]] = None,
    ):
        """Prepare the data for fitting or inference.

        Args:
            state: The AutoMLState instance for this run.
            X_train_all: The complete data set or None if dataframe is supplied. Must
                contain the target if y_train_all is None
            y_train_all: The complete target set or None if supplied in X_train_all.
            auto_augment: If true, task-specific data augmentations will be applied.
            eval_method: A string of resampling strategy, one of ['auto', 'cv', 'holdout'].
            split_type: str or splitter object, default="auto" | the data split type.
                * A valid splitter object is an instance of a derived class of scikit-learn
                [KFold](https://scikit-learn.org/stable/modules/generated/sklearn.model_selection.KFold.html#sklearn.model_selection.KFold)
                and have ``split`` and ``get_n_splits`` methods with the same signatures.
                Set eval_method to "cv" to use the splitter object.
                * Valid str options depend on different tasks.
                For classification tasks, valid choices are
                    ["auto", 'stratified', 'uniform', 'time', 'group']. "auto" -> stratified.
                For regression tasks, valid choices are ["auto", 'uniform', 'time'].
                    "auto" -> uniform.
                For time series forecast tasks, must be "auto" or 'time'.
                For ranking task, must be "auto" or 'group'.
            split_ratio: A float of the validation data percentage for holdout.
            n_splits: An integer of the number of folds for cross-validation.
            data_is_df: True if the data was provided as a pd.DataFrame else False.
            sample_weight_full: A 1d arraylike of the sample weight.

        Raises:
            AssertionError: The configuration provided is invalid for this task type and data.
        """

    @abstractmethod
    def decide_split_type(
        self,
        split_type: str,
        y_train_all: Union[np.ndarray, pd.DataFrame, pd.Series, None],
        fit_kwargs: dict,
        groups: Optional[List[str]] = None,
    ) -> str:
        """Choose an appropriate data split type for this data and task.

        If split_type is 'auto' then this is determined based on the task type and data.
        If a specific split_type is requested then the choice is validated to be appropriate.

        Args:
            split_type: Either 'auto' or a task appropriate split type.
            y_train_all: The complete set of targets.
            fit_kwargs: Additional kwargs passed to the estimator's fit method.
            groups: Optional. Group labels (with matching length to y_train) or groups counts (with sum equal to length
                of y_train) for training data.

        Returns:
            The determined appropriate split type.

        Raises:
            AssertionError: The requested split_type is invalid for this task, configuration and data.
        """

    @abstractmethod
    def preprocess(
        self,
        X: Union[np.ndarray, pd.DataFrame],
        transformer: Optional["flaml.automl.data.DataTransformer"] = None,
    ) -> Union[np.ndarray, pd.DataFrame]:
        """Preprocess the data ready for fitting or inference with this task type.

        Args:
            X: The data set to process.
            transformer: A DataTransformer instance to be used in processing.

        Returns:
            The preprocessed data set having the same type as the input.
        """

    @abstractmethod
    def default_estimator_list(
        self, estimator_list: Union[List[str], str] = "auto"
    ) -> List[str]:
        """Return the list of default estimators registered for this task type.

        If 'auto' is provided then the default list is returned, else the provided list will be validated given this task
        type.

        Args:
            estimator_list: Either 'auto' or a list of estimator names to be validated.

        Returns:
            A list of valid estimator names for this task type.
        """

    @abstractmethod
    def default_metric(self, metric: str) -> str:
        """Return the default metric for this task type.

        If 'auto' is provided then the default metric for this task will be returned. Otherwise, the provided metric name
        is validated for this task type.

        Args:
            metric: The name of a metric to be used in evaluation of models during fitting or validation.

        Returns:
            The default metric, or the provided metric if it is valid for this task type.
        """

    # The predicates below classify the task purely by its ``name``, using the
    # module-level task-name constants.

    def is_ts_forecast(self) -> bool:
        return self.name in TS_FORECAST

    def is_ts_forecastpanel(self) -> bool:
        return self.name == TS_FORECASTPANEL

    def is_ts_forecastregression(self) -> bool:
        return self.name in TS_FORECASTREGRESSION

    def is_nlp(self) -> bool:
        return self.name in NLP_TASKS

    def is_nlg(self) -> bool:
        return self.name in NLG_TASKS

    def is_classification(self) -> bool:
        return self.name in CLASSIFICATION

    def is_rank(self) -> bool:
        return self.name in RANK

    def is_binary(self) -> bool:
        return self.name == "binary"

    def is_seq_regression(self) -> bool:
        return self.name == SEQREGRESSION

    def is_seq_classification(self) -> bool:
        return self.name == SEQCLASSIFICATION

    def is_token_classification(self) -> bool:
        return self.name == TOKENCLASSIFICATION

    def is_summarization(self) -> bool:
        return self.name == SUMMARIZATION

    def is_multiclass(self) -> bool:
        # Substring test (not equality), kept as in the original code.
        return "multiclass" in self.name

    def is_regression(self) -> bool:
        return self.name in REGRESSION

    def __eq__(self, other: object) -> bool:
        """For backward compatibility with all the string comparisons to task.

        A Task compares equal to a string equal to its name, and to another
        Task carrying the same name.
        """
        other_name = other.name if isinstance(other, Task) else other
        return self.name == other_name

    # Defining __eq__ already makes instances unhashable; this states it
    # explicitly. `name` is a plain mutable attribute, so a name-based hash
    # could go stale if the name is reassigned after construction.
    __hash__ = None

    @classmethod
    def estimator_class_from_str(
        cls, estimator_name: str
    ) -> "flaml.automl.ml.BaseEstimator":
        """Determine the estimator class corresponding to the provided name.

        Args:
            estimator_name: Name of the desired estimator.

        Returns:
            The estimator class corresponding to the provided name.

        Raises:
            ValueError: The provided estimator_name has not been registered for this task type.
        """
        if estimator_name not in cls.estimators:
            raise ValueError(
                f"{estimator_name} is not a built-in learner for this task type, "
                f"only {list(cls.estimators)} are supported. "
                "Please use AutoML.add_learner() to add a customized learner."
            )
        return cls.estimators[estimator_name]
`Python>=3.7`. To run this notebook example, please install flaml with the `notebook` option:\n", "```bash\n", - "pip install flaml[notebook]\n", + "pip install flaml[notebook]==1.1.2\n", "```" ] }, @@ -39,7 +39,7 @@ "metadata": {}, "outputs": [], "source": [ - "# %pip install flaml[notebook]" + "%pip install flaml[notebook]==1.1.2" ] }, { diff --git a/notebook/automl_time_series_forecast.ipynb b/notebook/automl_time_series_forecast.ipynb index 945d919ca9f5..793f089630d7 100644 --- a/notebook/automl_time_series_forecast.ipynb +++ b/notebook/automl_time_series_forecast.ipynb @@ -156,7 +156,7 @@ } ], "source": [ - "%pip install flaml[notebook,ts_forecast]\n", + "%pip install flaml[notebook,ts_forecast]==1.1.2\n", "# avoid version 1.0.2 to 1.0.5 for this notebook due to a bug for arima and sarimax's init config" ] }, diff --git a/notebook/automl_xgboost.ipynb b/notebook/automl_xgboost.ipynb index 9c99ca01d25d..c2429fa8f81a 100644 --- a/notebook/automl_xgboost.ipynb +++ b/notebook/automl_xgboost.ipynb @@ -29,7 +29,7 @@ "\n", "FLAML requires `Python>=3.7`. 
To run this notebook example, please install flaml with the `notebook` option:\n", "```bash\n", - "pip install flaml[notebook]\n", + "pip install flaml[notebook]==1.1.2\n", "```" ] }, @@ -39,7 +39,7 @@ "metadata": {}, "outputs": [], "source": [ - "%pip install flaml[notebook]==1.0.8" + "%pip install flaml[notebook]==1.1.2" ] }, { diff --git a/notebook/autovw.ipynb b/notebook/autovw.ipynb index 9292ad666337..cc642d6ffad3 100644 --- a/notebook/autovw.ipynb +++ b/notebook/autovw.ipynb @@ -31,7 +31,7 @@ "metadata": {}, "outputs": [], "source": [ - "%pip install flaml[notebook,vw]" + "%pip install flaml[notebook,vw]==1.1.2" ] }, { diff --git a/test/automl/test_multiclass.py b/test/automl/test_multiclass.py index 3b6f70732938..cb82eb02bd43 100644 --- a/test/automl/test_multiclass.py +++ b/test/automl/test_multiclass.py @@ -3,7 +3,7 @@ import scipy.sparse from sklearn.datasets import load_iris, load_wine from flaml import AutoML -from flaml.automl.data import CLASSIFICATION, get_output_from_log +from flaml.automl.data import get_output_from_log from flaml.automl.model import LGBMEstimator, XGBoostSklearnEstimator, SKLearnEstimator from flaml import tune from flaml.automl.training_log import training_log_reader @@ -13,7 +13,12 @@ class MyRegularizedGreedyForest(SKLearnEstimator): def __init__(self, task="binary", **config): super().__init__(task, **config) - if task in CLASSIFICATION: + if isinstance(task, str): + from flaml.automl.task.factory import task_factory + + task = task_factory(task) + + if task.is_classification(): from rgf.sklearn import RGFClassifier self.estimator_class = RGFClassifier diff --git a/test/spark/custom_mylearner.py b/test/spark/custom_mylearner.py index 66426827e4e6..210e91c547bd 100644 --- a/test/spark/custom_mylearner.py +++ b/test/spark/custom_mylearner.py @@ -4,14 +4,20 @@ from flaml import tune import time from flaml.automl.model import LGBMEstimator, XGBoostSklearnEstimator, SKLearnEstimator -from flaml.automl.data import CLASSIFICATION, 
get_output_from_log +from flaml.automl.data import get_output_from_log +from flaml.automl.task.task import CLASSIFICATION class MyRegularizedGreedyForest(SKLearnEstimator): def __init__(self, task="binary", **config): super().__init__(task, **config) - if task in CLASSIFICATION: + if isinstance(task, str): + from flaml.automl.task.factory import task_factory + + task = task_factory(task) + + if task.is_classification(): from rgf.sklearn import RGFClassifier self.estimator_class = RGFClassifier diff --git a/test/spark/test_multiclass.py b/test/spark/test_multiclass.py index 9a2a3950a01a..e0384e23f269 100644 --- a/test/spark/test_multiclass.py +++ b/test/spark/test_multiclass.py @@ -3,7 +3,7 @@ import scipy.sparse from sklearn.datasets import load_iris, load_wine from flaml import AutoML -from flaml.automl.data import CLASSIFICATION, get_output_from_log +from flaml.automl.data import get_output_from_log from flaml.automl.training_log import training_log_reader from flaml.tune.spark.utils import check_spark import os