From 65e3dc6687a4f0f49ee5934954e77700a6d93006 Mon Sep 17 00:00:00 2001 From: Michael Hu Date: Wed, 4 May 2022 16:56:04 -0400 Subject: [PATCH] feat: add seq2seq forecasting training job --- google/cloud/aiplatform/schema.py | 1 + google/cloud/aiplatform/training_jobs.py | 1110 ++++++++++++++--- .../test_automl_forecasting_training_jobs.py | 157 ++- 3 files changed, 1055 insertions(+), 213 deletions(-) diff --git a/google/cloud/aiplatform/schema.py b/google/cloud/aiplatform/schema.py index a1da75d9e65..8c8e7f32f3a 100644 --- a/google/cloud/aiplatform/schema.py +++ b/google/cloud/aiplatform/schema.py @@ -23,6 +23,7 @@ class definition: custom_task = "gs://google-cloud-aiplatform/schema/trainingjob/definition/custom_task_1.0.0.yaml" automl_tabular = "gs://google-cloud-aiplatform/schema/trainingjob/definition/automl_tabular_1.0.0.yaml" automl_forecasting = "gs://google-cloud-aiplatform/schema/trainingjob/definition/automl_time_series_forecasting_1.0.0.yaml" + seq2seq_forecasting = "gs://google-cloud-aiplatform/schema/trainingjob/definition/seq2seq_plus_time_series_forecasting_1.0.0.yaml" automl_image_classification = "gs://google-cloud-aiplatform/schema/trainingjob/definition/automl_image_classification_1.0.0.yaml" automl_image_object_detection = "gs://google-cloud-aiplatform/schema/trainingjob/definition/automl_image_object_detection_1.0.0.yaml" automl_text_classification = "gs://google-cloud-aiplatform/schema/trainingjob/definition/automl_text_classification_1.0.0.yaml" diff --git a/google/cloud/aiplatform/training_jobs.py b/google/cloud/aiplatform/training_jobs.py index 2dbd130555c..1fab16e1707 100644 --- a/google/cloud/aiplatform/training_jobs.py +++ b/google/cloud/aiplatform/training_jobs.py @@ -1561,6 +1561,715 @@ def _model_upload_fail_string(self) -> str: ) +class _ForecastingTrainingJob(_TrainingJob): + """ABC for Forecasting Training Pipelines..""" + + def __init__( + self, + model_type: str, + training_task_definition: str, + display_name: Optional[str] = None, + optimization_objective: Optional[str] = None, + column_specs: Optional[Dict[str, str]] = None, + column_transformations: Optional[List[Dict[str, Dict[str, str]]]] = None, + project: Optional[str] = None, + location: Optional[str] = None, + credentials: Optional[auth_credentials.Credentials] = None, + labels: Optional[Dict[str, str]] = None, + training_encryption_spec_key_name: Optional[str] = None, + model_encryption_spec_key_name: Optional[str] = None, + ): + """Constructs a Forecasting Training Job. + + Args: + model_type (str): The type of forecasting model. + training_task_definition (str): + Required. A Google Cloud Storage path to the + YAML file that defines the training task which + is responsible for producing the model artifact, + and may also include additional auxiliary work. + The definition files that can be used here are + found in gs://google-cloud- + aiplatform/schema/trainingjob/definition/. + display_name (str): + Optional. The user-defined name of this TrainingPipeline. + optimization_objective (str): + Optional. Objective function the model is to be optimized towards. + The training process creates a Model that optimizes the value of the objective + function over the validation set. The supported optimization objectives: + "minimize-rmse" (default) - Minimize root-mean-squared error (RMSE). + "minimize-mae" - Minimize mean-absolute error (MAE). + "minimize-rmsle" - Minimize root-mean-squared log error (RMSLE). + "minimize-rmspe" - Minimize root-mean-squared percentage error (RMSPE). 
+ "minimize-wape-mae" - Minimize the combination of weighted absolute percentage error (WAPE) + and mean-absolute-error (MAE). + "minimize-quantile-loss" - Minimize the quantile loss at the defined quantiles. + (Set this objective to build quantile forecasts.) + column_specs (Dict[str, str]): + Optional. Alternative to column_transformations where the keys of the dict + are column names and their respective values are one of + AutoMLTabularTrainingJob.column_data_types. + When creating transformation for BigQuery Struct column, the column + should be flattened using "." as the delimiter. Only columns with no child + should have a transformation. + If an input column has no transformations on it, such a column is + ignored by the training, except for the targetColumn, which should have + no transformations defined on. + Only one of column_transformations or column_specs should be passed. + column_transformations (List[Dict[str, Dict[str, str]]]): + Optional. Transformations to apply to the input columns (i.e. columns other + than the targetColumn). Each transformation may produce multiple + result values from the column's value, and all are used for training. + When creating transformation for BigQuery Struct column, the column + should be flattened using "." as the delimiter. Only columns with no child + should have a transformation. + If an input column has no transformations on it, such a column is + ignored by the training, except for the targetColumn, which should have + no transformations defined on. + Only one of column_transformations or column_specs should be passed. + Consider using column_specs as column_transformations will be deprecated eventually. + project (str): + Optional. Project to run training in. Overrides project set in aiplatform.init. + location (str): + Optional. Location to run training in. Overrides location set in aiplatform.init. + credentials (auth_credentials.Credentials): + Optional. Custom credentials to use to run call training service. Overrides + credentials set in aiplatform.init. + labels (Dict[str, str]): + Optional. The labels with user-defined metadata to + organize TrainingPipelines. + Label keys and values can be no longer than 64 + characters (Unicode codepoints), can only + contain lowercase letters, numeric characters, + underscores and dashes. International characters + are allowed. + See https://goo.gl/xmQnxf for more information + and examples of labels. + training_encryption_spec_key_name (Optional[str]): + Optional. The Cloud KMS resource identifier of the customer + managed encryption key used to protect the training pipeline. Has the + form: + ``projects/my-project/locations/my-region/keyRings/my-kr/cryptoKeys/my-key``. + The key needs to be in the same region as where the compute + resource is created. + + If set, this TrainingPipeline will be secured by this key. + + Note: Model trained by this TrainingPipeline is also secured + by this key if ``model_to_upload`` is not set separately. + + Overrides encryption_spec_key_name set in aiplatform.init. + model_encryption_spec_key_name (Optional[str]): + Optional. The Cloud KMS resource identifier of the customer + managed encryption key used to protect the model. Has the + form: + ``projects/my-project/locations/my-region/keyRings/my-kr/cryptoKeys/my-key``. + The key needs to be in the same region as where the compute + resource is created. + + If set, the trained Model will be secured by this key. + + Overrides encryption_spec_key_name set in aiplatform.init. 
+ + Raises: + ValueError: If both column_transformations and column_specs were provided. + """ + super().__init__( + display_name=display_name, + project=project, + location=location, + credentials=credentials, + labels=labels, + training_encryption_spec_key_name=training_encryption_spec_key_name, + model_encryption_spec_key_name=model_encryption_spec_key_name, + ) + + self._column_transformations = ( + column_transformations_utils.validate_and_get_column_transformations( + column_specs, + column_transformations, + ) + ) + + self._optimization_objective = optimization_objective + self._additional_experiments = [] + self._model_type = model_type + self._training_task_definition = training_task_definition + + def run( + self, + dataset: datasets.TimeSeriesDataset, + target_column: str, + time_column: str, + time_series_identifier_column: str, + unavailable_at_forecast_columns: List[str], + available_at_forecast_columns: List[str], + forecast_horizon: int, + data_granularity_unit: str, + data_granularity_count: int, + training_fraction_split: Optional[float] = None, + validation_fraction_split: Optional[float] = None, + test_fraction_split: Optional[float] = None, + predefined_split_column_name: Optional[str] = None, + timestamp_split_column_name: Optional[str] = None, + weight_column: Optional[str] = None, + time_series_attribute_columns: Optional[List[str]] = None, + context_window: Optional[int] = None, + export_evaluated_data_items: bool = False, + export_evaluated_data_items_bigquery_destination_uri: Optional[str] = None, + export_evaluated_data_items_override_destination: bool = False, + quantiles: Optional[List[float]] = None, + validation_options: Optional[str] = None, + budget_milli_node_hours: int = 1000, + model_display_name: Optional[str] = None, + model_labels: Optional[Dict[str, str]] = None, + additional_experiments: Optional[List[str]] = None, + sync: bool = True, + create_request_timeout: Optional[float] = None, + ) -> models.Model: + """Runs the training job and returns a model. + + If training on a Vertex AI dataset, you can use one of the following split configurations: + Data fraction splits: + Any of ``training_fraction_split``, ``validation_fraction_split`` and + ``test_fraction_split`` may optionally be provided, they must sum to up to 1. If + the provided ones sum to less than 1, the remainder is assigned to sets as + decided by Vertex AI. If none of the fractions are set, by default roughly 80% + of data will be used for training, 10% for validation, and 10% for test. + + Predefined splits: + Assigns input data to training, validation, and test sets based on the value of a provided key. + If using predefined splits, ``predefined_split_column_name`` must be provided. + Supported only for tabular Datasets. + + Timestamp splits: + Assigns input data to training, validation, and test sets + based on a provided timestamps. The youngest data pieces are + assigned to training set, next to validation set, and the oldest + to the test set. + Supported only for tabular Datasets. + + Args: + dataset (datasets.TimeSeriesDataset): + Required. The dataset within the same Project from which data will be used to train the Model. The + Dataset must use schema compatible with Model being trained, + and what is compatible should be described in the used + TrainingPipeline's [training_task_definition] + [google.cloud.aiplatform.v1beta1.TrainingPipeline.training_task_definition]. + For time series Datasets, all their data is exported to + training, to pick and choose from. 
+ target_column (str): + Required. Name of the column that the Model is to predict values for. This + column must be unavailable at forecast. + time_column (str): + Required. Name of the column that identifies time order in the time series. + This column must be available at forecast. + time_series_identifier_column (str): + Required. Name of the column that identifies the time series. + unavailable_at_forecast_columns (List[str]): + Required. Column names of columns that are unavailable at forecast. + Each column contains information for the given entity (identified by the + [time_series_identifier_column]) that is unknown before the forecast + (e.g. population of a city in a given year, or weather on a given day). + available_at_forecast_columns (List[str]): + Required. Column names of columns that are available at forecast. + Each column contains information for the given entity (identified by the + [time_series_identifier_column]) that is known at forecast. + forecast_horizon: (int): + Required. The amount of time into the future for which forecasted values for the target are + returned. Expressed in number of units defined by the [data_granularity_unit] and + [data_granularity_count] field. Inclusive. + data_granularity_unit (str): + Required. The data granularity unit. Accepted values are ``minute``, + ``hour``, ``day``, ``week``, ``month``, ``year``. + data_granularity_count (int): + Required. The number of data granularity units between data points in the training + data. If [data_granularity_unit] is `minute`, can be 1, 5, 10, 15, or 30. For all other + values of [data_granularity_unit], must be 1. + predefined_split_column_name (str): + Optional. The key is a name of one of the Dataset's data + columns. The value of the key (either the label's value or + value in the column) must be one of {``TRAIN``, + ``VALIDATE``, ``TEST``}, and it defines to which set the + given piece of data is assigned. If for a piece of data the + key is not present or has an invalid value, that piece is + ignored by the pipeline. + + Supported only for tabular and time series Datasets. + timestamp_split_column_name (str): + Optional. The key is a name of one of the Dataset's data + columns. The value of the key values of the key (the values in + the column) must be in RFC 3339 `date-time` format, where + `time-offset` = `"Z"` (e.g. 1985-04-12T23:20:50.52Z). If for a + piece of data the key is not present or has an invalid value, + that piece is ignored by the pipeline. + Supported only for tabular and time series Datasets. + This parameter must be used with training_fraction_split, + validation_fraction_split, and test_fraction_split. + weight_column (str): + Optional. Name of the column that should be used as the weight column. + Higher values in this column give more importance to the row + during Model training. The column must have numeric values between 0 and + 10000 inclusively, and 0 value means that the row is ignored. + If the weight column field is not set, then all rows are assumed to have + equal weight of 1. This column must be available at forecast. + time_series_attribute_columns (List[str]): + Optional. Column names that should be used as attribute columns. + Each column is constant within a time series. + context_window (int): + Optional. The amount of time into the past training and prediction data is used for + model training and prediction respectively. Expressed in number of units defined by the + [data_granularity_unit] and [data_granularity_count] fields. 
When not provided uses the + default value of 0 which means the model sets each series context window to be 0 (also + known as "cold start"). Inclusive. + export_evaluated_data_items (bool): + Whether to export the test set predictions to a BigQuery table. + If False, then the export is not performed. + export_evaluated_data_items_bigquery_destination_uri (string): + Optional. URI of desired destination BigQuery table for exported test set predictions. + + Expected format: + ``bq://::`` + + If not specified, then results are exported to the following auto-created BigQuery + table: + ``:export_evaluated_examples__.evaluated_examples`` + + Applies only if [export_evaluated_data_items] is True. + export_evaluated_data_items_override_destination (bool): + Whether to override the contents of [export_evaluated_data_items_bigquery_destination_uri], + if the table exists, for exported test set predictions. If False, and the + table exists, then the training job will fail. + + Applies only if [export_evaluated_data_items] is True and + [export_evaluated_data_items_bigquery_destination_uri] is specified. + quantiles (List[float]): + Quantiles to use for the `minimize-quantile-loss` + [AutoMLForecastingTrainingJob.optimization_objective]. This argument is required in + this case. + + Accepts up to 5 quantiles in the form of a double from 0 to 1, exclusive. + Each quantile must be unique. + validation_options (str): + Validation options for the data validation component. The available options are: + "fail-pipeline" - (default), will validate against the validation and fail the pipeline + if it fails. + "ignore-validation" - ignore the results of the validation and continue the pipeline + budget_milli_node_hours (int): + Optional. The train budget of creating this Model, expressed in milli node + hours i.e. 1,000 value in this field means 1 node hour. + The training cost of the model will not exceed this budget. The final + cost will be attempted to be close to the budget, though may end up + being (even) noticeably smaller - at the backend's discretion. This + especially may happen when further model training ceases to provide + any improvements. + If the budget is set to a value known to be insufficient to train a + Model for the given training set, the training won't be attempted and + will error. + The minimum value is 1000 and the maximum is 72000. + model_display_name (str): + Optional. If the script produces a managed Vertex AI Model. The display name of + the Model. The name can be up to 128 characters long and can be consist + of any UTF-8 characters. + + If not provided upon creation, the job's display_name is used. + model_labels (Dict[str, str]): + Optional. The labels with user-defined metadata to + organize your Models. + Label keys and values can be no longer than 64 + characters (Unicode codepoints), can only + contain lowercase letters, numeric characters, + underscores and dashes. International characters + are allowed. + See https://goo.gl/xmQnxf for more information + and examples of labels. + additional_experiments (List[str]): + Optional. Additional experiment flags for the time series forcasting training. + create_request_timeout (float): + Optional. The timeout for the create request in seconds. + sync (bool): + Whether to execute this method synchronously. If False, this method + will be executed in concurrent Future and any downstream object will + be immediately returned and synced when the Future has completed. 
+ Returns: + model: The trained Vertex AI Model resource or None if training did not + produce a Vertex AI Model. + + Raises: + RuntimeError: If Training job has already been run or is waiting to run. + """ + + if model_display_name: + utils.validate_display_name(model_display_name) + if model_labels: + utils.validate_labels(model_labels) + + if self._is_waiting_to_run(): + raise RuntimeError( + f"{self._model_type} Forecasting Training is already scheduled " + "to run." + ) + + if self._has_run: + raise RuntimeError( + f"{self._model_type} Forecasting Training has " "already run." + ) + + if additional_experiments: + self._add_additional_experiments(additional_experiments) + + return self._run( + dataset=dataset, + target_column=target_column, + time_column=time_column, + time_series_identifier_column=time_series_identifier_column, + unavailable_at_forecast_columns=unavailable_at_forecast_columns, + available_at_forecast_columns=available_at_forecast_columns, + forecast_horizon=forecast_horizon, + data_granularity_unit=data_granularity_unit, + data_granularity_count=data_granularity_count, + training_fraction_split=training_fraction_split, + validation_fraction_split=validation_fraction_split, + test_fraction_split=test_fraction_split, + predefined_split_column_name=predefined_split_column_name, + timestamp_split_column_name=timestamp_split_column_name, + weight_column=weight_column, + time_series_attribute_columns=time_series_attribute_columns, + context_window=context_window, + budget_milli_node_hours=budget_milli_node_hours, + export_evaluated_data_items=export_evaluated_data_items, + export_evaluated_data_items_bigquery_destination_uri=export_evaluated_data_items_bigquery_destination_uri, + export_evaluated_data_items_override_destination=export_evaluated_data_items_override_destination, + quantiles=quantiles, + validation_options=validation_options, + model_display_name=model_display_name, + model_labels=model_labels, + sync=sync, + create_request_timeout=create_request_timeout, + ) + + @base.optional_sync() + def _run( + self, + dataset: datasets.TimeSeriesDataset, + target_column: str, + time_column: str, + time_series_identifier_column: str, + unavailable_at_forecast_columns: List[str], + available_at_forecast_columns: List[str], + forecast_horizon: int, + data_granularity_unit: str, + data_granularity_count: int, + training_fraction_split: Optional[float] = None, + validation_fraction_split: Optional[float] = None, + test_fraction_split: Optional[float] = None, + predefined_split_column_name: Optional[str] = None, + timestamp_split_column_name: Optional[str] = None, + weight_column: Optional[str] = None, + time_series_attribute_columns: Optional[List[str]] = None, + context_window: Optional[int] = None, + export_evaluated_data_items: bool = False, + export_evaluated_data_items_bigquery_destination_uri: Optional[str] = None, + export_evaluated_data_items_override_destination: bool = False, + quantiles: Optional[List[float]] = None, + validation_options: Optional[str] = None, + budget_milli_node_hours: int = 1000, + model_display_name: Optional[str] = None, + model_labels: Optional[Dict[str, str]] = None, + sync: bool = True, + create_request_timeout: Optional[float] = None, + ) -> models.Model: + """Runs the training job and returns a model. 
+ + If training on a Vertex AI dataset, you can use one of the following split configurations: + Data fraction splits: + Any of ``training_fraction_split``, ``validation_fraction_split`` and + ``test_fraction_split`` may optionally be provided, they must sum to up to 1. If + the provided ones sum to less than 1, the remainder is assigned to sets as + decided by Vertex AI. If none of the fractions are set, by default roughly 80% + of data will be used for training, 10% for validation, and 10% for test. + + Predefined splits: + Assigns input data to training, validation, and test sets based on the value of a provided key. + If using predefined splits, ``predefined_split_column_name`` must be provided. + Supported only for tabular Datasets. + + Timestamp splits: + Assigns input data to training, validation, and test sets + based on a provided timestamps. The youngest data pieces are + assigned to training set, next to validation set, and the oldest + to the test set. + Supported only for tabular Datasets. + + Args: + dataset (datasets.TimeSeriesDataset): + Required. The dataset within the same Project from which data will be used to train the Model. The + Dataset must use schema compatible with Model being trained, + and what is compatible should be described in the used + TrainingPipeline's [training_task_definition] + [google.cloud.aiplatform.v1beta1.TrainingPipeline.training_task_definition]. + For time series Datasets, all their data is exported to + training, to pick and choose from. + target_column (str): + Required. Name of the column that the Model is to predict values for. This + column must be unavailable at forecast. + time_column (str): + Required. Name of the column that identifies time order in the time series. + This column must be available at forecast. + time_series_identifier_column (str): + Required. Name of the column that identifies the time series. + unavailable_at_forecast_columns (List[str]): + Required. Column names of columns that are unavailable at forecast. + Each column contains information for the given entity (identified by the + [time_series_identifier_column]) that is unknown before the forecast + (e.g. population of a city in a given year, or weather on a given day). + available_at_forecast_columns (List[str]): + Required. Column names of columns that are available at forecast. + Each column contains information for the given entity (identified by the + [time_series_identifier_column]) that is known at forecast. + forecast_horizon: (int): + Required. The amount of time into the future for which forecasted values for the target are + returned. Expressed in number of units defined by the [data_granularity_unit] and + [data_granularity_count] field. Inclusive. + data_granularity_unit (str): + Required. The data granularity unit. Accepted values are ``minute``, + ``hour``, ``day``, ``week``, ``month``, ``year``. + data_granularity_count (int): + Required. The number of data granularity units between data points in the training + data. If [data_granularity_unit] is `minute`, can be 1, 5, 10, 15, or 30. For all other + values of [data_granularity_unit], must be 1. + training_fraction_split (float): + Optional. The fraction of the input data that is to be used to train + the Model. This is ignored if Dataset is not provided. + validation_fraction_split (float): + Optional. The fraction of the input data that is to be used to validate + the Model. This is ignored if Dataset is not provided. + test_fraction_split (float): + Optional. 
The fraction of the input data that is to be used to evaluate + the Model. This is ignored if Dataset is not provided. + predefined_split_column_name (str): + Optional. The key is a name of one of the Dataset's data + columns. The value of the key (either the label's value or + value in the column) must be one of {``training``, + ``validation``, ``test``}, and it defines to which set the + given piece of data is assigned. If for a piece of data the + key is not present or has an invalid value, that piece is + ignored by the pipeline. + + Supported only for tabular and time series Datasets. + timestamp_split_column_name (str): + Optional. The key is a name of one of the Dataset's data + columns. The value of the key values of the key (the values in + the column) must be in RFC 3339 `date-time` format, where + `time-offset` = `"Z"` (e.g. 1985-04-12T23:20:50.52Z). If for a + piece of data the key is not present or has an invalid value, + that piece is ignored by the pipeline. + Supported only for tabular and time series Datasets. + This parameter must be used with training_fraction_split, + validation_fraction_split, and test_fraction_split. + weight_column (str): + Optional. Name of the column that should be used as the weight column. + Higher values in this column give more importance to the row + during Model training. The column must have numeric values between 0 and + 10000 inclusively, and 0 value means that the row is ignored. + If the weight column field is not set, then all rows are assumed to have + equal weight of 1. This column must be available at forecast. + time_series_attribute_columns (List[str]): + Optional. Column names that should be used as attribute columns. + Each column is constant within a time series. + context_window (int): + Optional. The number of periods offset into the past to restrict past sequence, where each + period is one unit of granularity as defined by [period]. When not provided uses the + default value of 0 which means the model sets each series historical window to be 0 (also + known as "cold start"). Inclusive. + export_evaluated_data_items (bool): + Whether to export the test set predictions to a BigQuery table. + If False, then the export is not performed. + export_evaluated_data_items_bigquery_destination_uri (string): + Optional. URI of desired destination BigQuery table for exported test set predictions. + + Expected format: + ``bq://::
`` + + If not specified, then results are exported to the following auto-created BigQuery + table: + ``:export_evaluated_examples__.evaluated_examples`` + + Applies only if [export_evaluated_data_items] is True. + export_evaluated_data_items_override_destination (bool): + Whether to override the contents of [export_evaluated_data_items_bigquery_destination_uri], + if the table exists, for exported test set predictions. If False, and the + table exists, then the training job will fail. + + Applies only if [export_evaluated_data_items] is True and + [export_evaluated_data_items_bigquery_destination_uri] is specified. + quantiles (List[float]): + Quantiles to use for the `minimize-quantile-loss` + [AutoMLForecastingTrainingJob.optimization_objective]. This argument is required in + this case. + + Accepts up to 5 quantiles in the form of a double from 0 to 1, exclusive. + Each quantile must be unique. + validation_options (str): + Validation options for the data validation component. The available options are: + "fail-pipeline" - (default), will validate against the validation and fail the pipeline + if it fails. + "ignore-validation" - ignore the results of the validation and continue the pipeline + budget_milli_node_hours (int): + Optional. The train budget of creating this Model, expressed in milli node + hours i.e. 1,000 value in this field means 1 node hour. + The training cost of the model will not exceed this budget. The final + cost will be attempted to be close to the budget, though may end up + being (even) noticeably smaller - at the backend's discretion. This + especially may happen when further model training ceases to provide + any improvements. + If the budget is set to a value known to be insufficient to train a + Model for the given training set, the training won't be attempted and + will error. + The minimum value is 1000 and the maximum is 72000. + model_display_name (str): + Optional. If the script produces a managed Vertex AI Model. The display name of + the Model. The name can be up to 128 characters long and can be consist + of any UTF-8 characters. + + If not provided upon creation, the job's display_name is used. + model_labels (Dict[str, str]): + Optional. The labels with user-defined metadata to + organize your Models. + Label keys and values can be no longer than 64 + characters (Unicode codepoints), can only + contain lowercase letters, numeric characters, + underscores and dashes. International characters + are allowed. + See https://goo.gl/xmQnxf for more information + and examples of labels. + sync (bool): + Whether to execute this method synchronously. If False, this method + will be executed in concurrent Future and any downstream object will + be immediately returned and synced when the Future has completed. + create_request_timeout (float): + Optional. The timeout for the create request in seconds. + Returns: + model: The trained Vertex AI Model resource or None if training did not + produce a Vertex AI Model. + """ + # auto-populate transformations + if self._column_transformations is None: + _LOGGER.info( + "No column transformations provided, so now retrieving columns from dataset in order to set default column transformations." + ) + + ( + self._column_transformations, + column_names, + ) = dataset._get_default_column_transformations(target_column) + + _LOGGER.info( + "The column transformation of type 'auto' was set for the following columns: %s." 
+ % column_names + ) + + training_task_inputs_dict = { + # required inputs + "targetColumn": target_column, + "timeColumn": time_column, + "timeSeriesIdentifierColumn": time_series_identifier_column, + "timeSeriesAttributeColumns": time_series_attribute_columns, + "unavailableAtForecastColumns": unavailable_at_forecast_columns, + "availableAtForecastColumns": available_at_forecast_columns, + "forecastHorizon": forecast_horizon, + "dataGranularity": { + "unit": data_granularity_unit, + "quantity": data_granularity_count, + }, + "transformations": self._column_transformations, + "trainBudgetMilliNodeHours": budget_milli_node_hours, + # optional inputs + "weightColumn": weight_column, + "contextWindow": context_window, + "quantiles": quantiles, + "validationOptions": validation_options, + "optimizationObjective": self._optimization_objective, + } + + final_export_eval_bq_uri = export_evaluated_data_items_bigquery_destination_uri + if final_export_eval_bq_uri and not final_export_eval_bq_uri.startswith( + "bq://" + ): + final_export_eval_bq_uri = f"bq://{final_export_eval_bq_uri}" + + if export_evaluated_data_items: + training_task_inputs_dict["exportEvaluatedDataItemsConfig"] = { + "destinationBigqueryUri": final_export_eval_bq_uri, + "overrideExistingTable": export_evaluated_data_items_override_destination, + } + + if self._additional_experiments: + training_task_inputs_dict[ + "additionalExperiments" + ] = self._additional_experiments + + model = gca_model.Model( + display_name=model_display_name or self._display_name, + labels=model_labels or self._labels, + encryption_spec=self._model_encryption_spec, + ) + + new_model = self._run_job( + training_task_definition=self._training_task_definition, + training_task_inputs=training_task_inputs_dict, + dataset=dataset, + training_fraction_split=training_fraction_split, + validation_fraction_split=validation_fraction_split, + test_fraction_split=test_fraction_split, + predefined_split_column_name=predefined_split_column_name, + timestamp_split_column_name=timestamp_split_column_name, + model=model, + create_request_timeout=create_request_timeout, + ) + + if export_evaluated_data_items: + _LOGGER.info( + "Exported examples available at:\n%s" + % self.evaluated_data_items_bigquery_uri + ) + + return new_model + + @property + def _model_upload_fail_string(self) -> str: + """Helper property for model upload failure.""" + return ( + f"Training Pipeline {self.resource_name} is not configured to upload a " + "Model." + ) + + @property + def evaluated_data_items_bigquery_uri(self) -> Optional[str]: + """BigQuery location of exported evaluated examples from the Training Job + Returns: + str: BigQuery uri for the exported evaluated examples if the export + feature is enabled for training. + None: If the export feature was not enabled for training. + """ + + self._assert_gca_resource_is_available() + + metadata = self._gca_resource.training_task_metadata + if metadata and "evaluatedDataItemsBigqueryUri" in metadata: + return metadata["evaluatedDataItemsBigqueryUri"] + + return None + + def _add_additional_experiments(self, additional_experiments: List[str]): + """Add experiment flags to the training job. + Args: + additional_experiments (List[str]): + Experiment flags that can enable some experimental training features. 
+ """ + self._additional_experiments.extend(additional_experiments) + + # TODO(b/172368325) add scheduling, custom_job.Scheduling class CustomTrainingJob(_CustomTrainingJob): """Class to launch a Custom Training Job in Vertex AI using a script. @@ -3886,7 +4595,7 @@ class column_data_types: REPEATED_TEXT = "repeated_text" -class AutoMLForecastingTrainingJob(_TrainingJob): +class AutoMLForecastingTrainingJob(_ForecastingTrainingJob): _supported_training_schemas = (schema.training_job.definition.automl_forecasting,) def __init__( @@ -3988,10 +4697,15 @@ def __init__( Raises: ValueError: If both column_transformations and column_specs were provided. """ - if not display_name: - display_name = self.__class__._generate_display_name() super().__init__( + model_type="AutoML", + training_task_definition=( + schema.training_job.definition.automl_forecasting + ), display_name=display_name, + optimization_objective=optimization_objective, + column_specs=column_specs, + column_transformations=column_transformations, project=project, location=location, credentials=credentials, @@ -4000,15 +4714,6 @@ def __init__( model_encryption_spec_key_name=model_encryption_spec_key_name, ) - self._column_transformations = ( - column_transformations_utils.validate_and_get_column_transformations( - column_specs, column_transformations - ) - ) - - self._optimization_objective = optimization_objective - self._additional_experiments = [] - def run( self, dataset: datasets.TimeSeriesDataset, @@ -4188,7 +4893,138 @@ def run( If not provided upon creation, the job's display_name is used. model_labels (Dict[str, str]): Optional. The labels with user-defined metadata to - organize your Models. + organize your Models. + Label keys and values can be no longer than 64 + characters (Unicode codepoints), can only + contain lowercase letters, numeric characters, + underscores and dashes. International characters + are allowed. + See https://goo.gl/xmQnxf for more information + and examples of labels. + additional_experiments (List[str]): + Optional. Additional experiment flags for the time series forcasting training. + create_request_timeout (float): + Optional. The timeout for the create request in seconds. + sync (bool): + Whether to execute this method synchronously. If False, this method + will be executed in concurrent Future and any downstream object will + be immediately returned and synced when the Future has completed. + Returns: + model: The trained Vertex AI Model resource or None if training did not + produce a Vertex AI Model. + + Raises: + RuntimeError: If Training job has already been run or is waiting to run. 
+ """ + + return super().run( + dataset=dataset, + target_column=target_column, + time_column=time_column, + time_series_identifier_column=time_series_identifier_column, + unavailable_at_forecast_columns=unavailable_at_forecast_columns, + available_at_forecast_columns=available_at_forecast_columns, + forecast_horizon=forecast_horizon, + data_granularity_unit=data_granularity_unit, + data_granularity_count=data_granularity_count, + training_fraction_split=training_fraction_split, + validation_fraction_split=validation_fraction_split, + test_fraction_split=test_fraction_split, + predefined_split_column_name=predefined_split_column_name, + timestamp_split_column_name=timestamp_split_column_name, + weight_column=weight_column, + time_series_attribute_columns=time_series_attribute_columns, + context_window=context_window, + budget_milli_node_hours=budget_milli_node_hours, + export_evaluated_data_items=export_evaluated_data_items, + export_evaluated_data_items_bigquery_destination_uri=export_evaluated_data_items_bigquery_destination_uri, + export_evaluated_data_items_override_destination=export_evaluated_data_items_override_destination, + quantiles=quantiles, + validation_options=validation_options, + model_display_name=model_display_name, + model_labels=model_labels, + additional_experiments=additional_experiments, + sync=sync, + create_request_timeout=create_request_timeout, + ) + + @property + def evaluated_data_items_bigquery_uri(self) -> Optional[str]: + """BigQuery location of exported evaluated examples from the Training Job + Returns: + str: BigQuery uri for the exported evaluated examples if the export + feature is enabled for training. + None: If the export feature was not enabled for training. + """ + return super().evaluated_data_items_bigquery_uri + + +class SequenceToSequenceForecastingTrainingJob(_ForecastingTrainingJob): + _supported_training_schemas = (schema.training_job.definition.seq2seq_forecasting,) + + def __init__( + self, + display_name: Optional[str] = None, + optimization_objective: Optional[str] = None, + column_specs: Optional[Dict[str, str]] = None, + column_transformations: Optional[List[Dict[str, Dict[str, str]]]] = None, + project: Optional[str] = None, + location: Optional[str] = None, + credentials: Optional[auth_credentials.Credentials] = None, + labels: Optional[Dict[str, str]] = None, + training_encryption_spec_key_name: Optional[str] = None, + model_encryption_spec_key_name: Optional[str] = None, + ): + """Constructs a Seq2Seq Forecasting Training Job. + + Args: + display_name (str): + Optional. The user-defined name of this TrainingPipeline. + optimization_objective (str): + Optional. Objective function the model is to be optimized towards. + The training process creates a Model that optimizes the value of the objective + function over the validation set. The supported optimization objectives: + "minimize-rmse" (default) - Minimize root-mean-squared error (RMSE). + "minimize-mae" - Minimize mean-absolute error (MAE). + "minimize-rmsle" - Minimize root-mean-squared log error (RMSLE). + "minimize-rmspe" - Minimize root-mean-squared percentage error (RMSPE). + "minimize-wape-mae" - Minimize the combination of weighted absolute percentage error (WAPE) + and mean-absolute-error (MAE). + "minimize-quantile-loss" - Minimize the quantile loss at the defined quantiles. + (Set this objective to build quantile forecasts.) + column_specs (Dict[str, str]): + Optional. 
Alternative to column_transformations where the keys of the dict + are column names and their respective values are one of + AutoMLTabularTrainingJob.column_data_types. + When creating transformation for BigQuery Struct column, the column + should be flattened using "." as the delimiter. Only columns with no child + should have a transformation. + If an input column has no transformations on it, such a column is + ignored by the training, except for the targetColumn, which should have + no transformations defined on. + Only one of column_transformations or column_specs should be passed. + column_transformations (List[Dict[str, Dict[str, str]]]): + Optional. Transformations to apply to the input columns (i.e. columns other + than the targetColumn). Each transformation may produce multiple + result values from the column's value, and all are used for training. + When creating transformation for BigQuery Struct column, the column + should be flattened using "." as the delimiter. Only columns with no child + should have a transformation. + If an input column has no transformations on it, such a column is + ignored by the training, except for the targetColumn, which should have + no transformations defined on. + Only one of column_transformations or column_specs should be passed. + Consider using column_specs as column_transformations will be deprecated eventually. + project (str): + Optional. Project to run training in. Overrides project set in aiplatform.init. + location (str): + Optional. Location to run training in. Overrides location set in aiplatform.init. + credentials (auth_credentials.Credentials): + Optional. Custom credentials to use to run call training service. Overrides + credentials set in aiplatform.init. + labels (Dict[str, str]): + Optional. The labels with user-defined metadata to + organize TrainingPipelines. Label keys and values can be no longer than 64 characters (Unicode codepoints), can only contain lowercase letters, numeric characters, @@ -4196,70 +5032,53 @@ def run( are allowed. See https://goo.gl/xmQnxf for more information and examples of labels. - additional_experiments (List[str]): - Optional. Additional experiment flags for the time series forcasting training. - create_request_timeout (float): - Optional. The timeout for the create request in seconds. - sync (bool): - Whether to execute this method synchronously. If False, this method - will be executed in concurrent Future and any downstream object will - be immediately returned and synced when the Future has completed. - Returns: - model: The trained Vertex AI Model resource or None if training did not - produce a Vertex AI Model. + training_encryption_spec_key_name (Optional[str]): + Optional. The Cloud KMS resource identifier of the customer + managed encryption key used to protect the training pipeline. Has the + form: + ``projects/my-project/locations/my-region/keyRings/my-kr/cryptoKeys/my-key``. + The key needs to be in the same region as where the compute + resource is created. - Raises: - RuntimeError: If Training job has already been run or is waiting to run. - """ + If set, this TrainingPipeline will be secured by this key. - if model_display_name: - utils.validate_display_name(model_display_name) - if model_labels: - utils.validate_labels(model_labels) + Note: Model trained by this TrainingPipeline is also secured + by this key if ``model_to_upload`` is not set separately. - if self._is_waiting_to_run(): - raise RuntimeError( - "AutoML Forecasting Training is already scheduled to run." 
- ) + Overrides encryption_spec_key_name set in aiplatform.init. + model_encryption_spec_key_name (Optional[str]): + Optional. The Cloud KMS resource identifier of the customer + managed encryption key used to protect the model. Has the + form: + ``projects/my-project/locations/my-region/keyRings/my-kr/cryptoKeys/my-key``. + The key needs to be in the same region as where the compute + resource is created. - if self._has_run: - raise RuntimeError("AutoML Forecasting Training has already run.") + If set, the trained Model will be secured by this key. - if additional_experiments: - self._add_additional_experiments(additional_experiments) + Overrides encryption_spec_key_name set in aiplatform.init. - return self._run( - dataset=dataset, - target_column=target_column, - time_column=time_column, - time_series_identifier_column=time_series_identifier_column, - unavailable_at_forecast_columns=unavailable_at_forecast_columns, - available_at_forecast_columns=available_at_forecast_columns, - forecast_horizon=forecast_horizon, - data_granularity_unit=data_granularity_unit, - data_granularity_count=data_granularity_count, - training_fraction_split=training_fraction_split, - validation_fraction_split=validation_fraction_split, - test_fraction_split=test_fraction_split, - predefined_split_column_name=predefined_split_column_name, - timestamp_split_column_name=timestamp_split_column_name, - weight_column=weight_column, - time_series_attribute_columns=time_series_attribute_columns, - context_window=context_window, - budget_milli_node_hours=budget_milli_node_hours, - export_evaluated_data_items=export_evaluated_data_items, - export_evaluated_data_items_bigquery_destination_uri=export_evaluated_data_items_bigquery_destination_uri, - export_evaluated_data_items_override_destination=export_evaluated_data_items_override_destination, - quantiles=quantiles, - validation_options=validation_options, - model_display_name=model_display_name, - model_labels=model_labels, - sync=sync, - create_request_timeout=create_request_timeout, + Raises: + ValueError: If both column_transformations and column_specs were provided. + """ + super().__init__( + model_type="Seq2Seq", + training_task_definition=( + schema.training_job.definition.seq2seq_forecasting + ), + display_name=display_name, + optimization_objective=optimization_objective, + column_specs=column_specs, + column_transformations=column_transformations, + project=project, + location=location, + credentials=credentials, + labels=labels, + training_encryption_spec_key_name=training_encryption_spec_key_name, + model_encryption_spec_key_name=model_encryption_spec_key_name, ) - @base.optional_sync() - def _run( + def run( self, dataset: datasets.TimeSeriesDataset, target_column: str, @@ -4286,6 +5105,7 @@ def _run( budget_milli_node_hours: int = 1000, model_display_name: Optional[str] = None, model_labels: Optional[Dict[str, str]] = None, + additional_experiments: Optional[List[str]] = None, sync: bool = True, create_request_timeout: Optional[float] = None, ) -> models.Model: @@ -4348,20 +5168,11 @@ def _run( Required. The number of data granularity units between data points in the training data. If [data_granularity_unit] is `minute`, can be 1, 5, 10, 15, or 30. For all other values of [data_granularity_unit], must be 1. - training_fraction_split (float): - Optional. The fraction of the input data that is to be used to train - the Model. This is ignored if Dataset is not provided. - validation_fraction_split (float): - Optional. 
The fraction of the input data that is to be used to validate - the Model. This is ignored if Dataset is not provided. - test_fraction_split (float): - Optional. The fraction of the input data that is to be used to evaluate - the Model. This is ignored if Dataset is not provided. predefined_split_column_name (str): Optional. The key is a name of one of the Dataset's data columns. The value of the key (either the label's value or - value in the column) must be one of {``training``, - ``validation``, ``test``}, and it defines to which set the + value in the column) must be one of {``TRAIN``, + ``VALIDATE``, ``TEST``}, and it defines to which set the given piece of data is assigned. If for a piece of data the key is not present or has an invalid value, that piece is ignored by the pipeline. @@ -4388,9 +5199,10 @@ def _run( Optional. Column names that should be used as attribute columns. Each column is constant within a time series. context_window (int): - Optional. The number of periods offset into the past to restrict past sequence, where each - period is one unit of granularity as defined by [period]. When not provided uses the - default value of 0 which means the model sets each series historical window to be 0 (also + Optional. The amount of time into the past training and prediction data is used for + model training and prediction respectively. Expressed in number of units defined by the + [data_granularity_unit] and [data_granularity_count] fields. When not provided uses the + default value of 0 which means the model sets each series context window to be 0 (also known as "cold start"). Inclusive. export_evaluated_data_items (bool): Whether to export the test set predictions to a BigQuery table. @@ -4453,110 +5265,53 @@ def _run( are allowed. See https://goo.gl/xmQnxf for more information and examples of labels. + additional_experiments (List[str]): + Optional. Additional experiment flags for the time series forcasting training. + create_request_timeout (float): + Optional. The timeout for the create request in seconds. sync (bool): Whether to execute this method synchronously. If False, this method will be executed in concurrent Future and any downstream object will be immediately returned and synced when the Future has completed. - create_request_timeout (float): - Optional. The timeout for the create request in seconds. Returns: model: The trained Vertex AI Model resource or None if training did not produce a Vertex AI Model. - """ - - training_task_definition = schema.training_job.definition.automl_forecasting - - # auto-populate transformations - if self._column_transformations is None: - _LOGGER.info( - "No column transformations provided, so now retrieving columns from dataset in order to set default column transformations." - ) - - ( - self._column_transformations, - column_names, - ) = dataset._get_default_column_transformations(target_column) - - _LOGGER.info( - "The column transformation of type 'auto' was set for the following columns: %s." 
- % column_names - ) - - training_task_inputs_dict = { - # required inputs - "targetColumn": target_column, - "timeColumn": time_column, - "timeSeriesIdentifierColumn": time_series_identifier_column, - "timeSeriesAttributeColumns": time_series_attribute_columns, - "unavailableAtForecastColumns": unavailable_at_forecast_columns, - "availableAtForecastColumns": available_at_forecast_columns, - "forecastHorizon": forecast_horizon, - "dataGranularity": { - "unit": data_granularity_unit, - "quantity": data_granularity_count, - }, - "transformations": self._column_transformations, - "trainBudgetMilliNodeHours": budget_milli_node_hours, - # optional inputs - "weightColumn": weight_column, - "contextWindow": context_window, - "quantiles": quantiles, - "validationOptions": validation_options, - "optimizationObjective": self._optimization_objective, - } - - final_export_eval_bq_uri = export_evaluated_data_items_bigquery_destination_uri - if final_export_eval_bq_uri and not final_export_eval_bq_uri.startswith( - "bq://" - ): - final_export_eval_bq_uri = f"bq://{final_export_eval_bq_uri}" - - if export_evaluated_data_items: - training_task_inputs_dict["exportEvaluatedDataItemsConfig"] = { - "destinationBigqueryUri": final_export_eval_bq_uri, - "overrideExistingTable": export_evaluated_data_items_override_destination, - } - - if self._additional_experiments: - training_task_inputs_dict[ - "additionalExperiments" - ] = self._additional_experiments - model = gca_model.Model( - display_name=model_display_name or self._display_name, - labels=model_labels or self._labels, - encryption_spec=self._model_encryption_spec, - ) + Raises: + RuntimeError: If Training job has already been run or is waiting to run. + """ - new_model = self._run_job( - training_task_definition=training_task_definition, - training_task_inputs=training_task_inputs_dict, + return super().run( dataset=dataset, + target_column=target_column, + time_column=time_column, + time_series_identifier_column=time_series_identifier_column, + unavailable_at_forecast_columns=unavailable_at_forecast_columns, + available_at_forecast_columns=available_at_forecast_columns, + forecast_horizon=forecast_horizon, + data_granularity_unit=data_granularity_unit, + data_granularity_count=data_granularity_count, training_fraction_split=training_fraction_split, validation_fraction_split=validation_fraction_split, test_fraction_split=test_fraction_split, predefined_split_column_name=predefined_split_column_name, timestamp_split_column_name=timestamp_split_column_name, - model=model, + weight_column=weight_column, + time_series_attribute_columns=time_series_attribute_columns, + context_window=context_window, + budget_milli_node_hours=budget_milli_node_hours, + export_evaluated_data_items=export_evaluated_data_items, + export_evaluated_data_items_bigquery_destination_uri=export_evaluated_data_items_bigquery_destination_uri, + export_evaluated_data_items_override_destination=export_evaluated_data_items_override_destination, + quantiles=quantiles, + validation_options=validation_options, + model_display_name=model_display_name, + model_labels=model_labels, + additional_experiments=additional_experiments, + sync=sync, create_request_timeout=create_request_timeout, ) - if export_evaluated_data_items: - _LOGGER.info( - "Exported examples available at:\n%s" - % self.evaluated_data_items_bigquery_uri - ) - - return new_model - - @property - def _model_upload_fail_string(self) -> str: - """Helper property for model upload failure.""" - return ( - f"Training Pipeline 
{self.resource_name} is not configured to upload a "
-            "Model."
-        )
-
     @property
     def evaluated_data_items_bigquery_uri(self) -> Optional[str]:
         """BigQuery location of exported evaluated examples from the Training Job
@@ -4565,22 +5320,7 @@ def evaluated_data_items_bigquery_uri(self) -> Optional[str]:
             feature is enabled for training.
             None: If the export feature was not enabled for training.
         """
-
-        self._assert_gca_resource_is_available()
-
-        metadata = self._gca_resource.training_task_metadata
-        if metadata and "evaluatedDataItemsBigqueryUri" in metadata:
-            return metadata["evaluatedDataItemsBigqueryUri"]
-
-        return None
-
-    def _add_additional_experiments(self, additional_experiments: List[str]):
-        """Add experiment flags to the training job.
-        Args:
-            additional_experiments (List[str]):
-                Experiment flags that can enable some experimental training features.
-        """
-        self._additional_experiments.extend(additional_experiments)
+        return super().evaluated_data_items_bigquery_uri


 class AutoMLImageTrainingJob(_TrainingJob):
diff --git a/tests/unit/aiplatform/test_automl_forecasting_training_jobs.py b/tests/unit/aiplatform/test_automl_forecasting_training_jobs.py
index 6a96d656e88..1f6e0ce724c 100644
--- a/tests/unit/aiplatform/test_automl_forecasting_training_jobs.py
+++ b/tests/unit/aiplatform/test_automl_forecasting_training_jobs.py
@@ -24,7 +24,7 @@
 from google.cloud.aiplatform import datasets
 from google.cloud.aiplatform import initializer
 from google.cloud.aiplatform import schema
-from google.cloud.aiplatform.training_jobs import AutoMLForecastingTrainingJob
+from google.cloud.aiplatform import training_jobs

 from google.cloud.aiplatform_v1.services.model_service import (
     client as model_service_client,
@@ -248,7 +248,7 @@ def mock_dataset_nontimeseries():
     return ds


-class TestAutoMLForecastingTrainingJob:
+class TestForecastingTrainingJob:
     def setup_method(self):
         importlib.reload(initializer)
         importlib.reload(aiplatform)
@@ -256,7 +256,15 @@ def setup_method(self):
     def teardown_method(self):
         initializer.global_pool.shutdown(wait=True)

     @pytest.mark.parametrize("sync", [True, False])
+    @pytest.mark.parametrize(
+        "training_job",
+        [
+            training_jobs.AutoMLForecastingTrainingJob,
+            training_jobs.SequenceToSequenceForecastingTrainingJob,
+        ],
+    )
     def test_run_call_pipeline_service_create(
         self,
         mock_pipeline_service_create,
@@ -264,10 +272,11 @@ def test_run_call_pipeline_service_create(
         mock_dataset_time_series,
         mock_model_service_get,
         sync,
+        training_job,
     ):
         aiplatform.init(project=_TEST_PROJECT, staging_bucket=_TEST_BUCKET_NAME)

-        job = AutoMLForecastingTrainingJob(
+        job = training_job(
             display_name=_TEST_DISPLAY_NAME,
             optimization_objective=_TEST_TRAINING_OPTIMIZATION_OBJECTIVE_NAME,
             column_transformations=_TEST_TRAINING_COLUMN_TRANSFORMATIONS,
@@ -344,7 +353,15 @@ def test_run_call_pipeline_service_create(

         assert job.state == gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED

     @pytest.mark.parametrize("sync", [True, False])
+    @pytest.mark.parametrize(
+        "training_job",
+        [
+            training_jobs.AutoMLForecastingTrainingJob,
+            training_jobs.SequenceToSequenceForecastingTrainingJob,
+        ],
+    )
     def test_run_call_pipeline_service_create_with_timeout(
         self,
         mock_pipeline_service_create,
@@ -352,10 +369,11 @@ def test_run_call_pipeline_service_create_with_timeout(
         mock_dataset_time_series,
         mock_model_service_get,
         sync,
+        training_job,
     ):
         aiplatform.init(project=_TEST_PROJECT, staging_bucket=_TEST_BUCKET_NAME)

-        job = AutoMLForecastingTrainingJob(
+        job = training_job(
             display_name=_TEST_DISPLAY_NAME,
             optimization_objective=_TEST_TRAINING_OPTIMIZATION_OBJECTIVE_NAME,
             column_transformations=_TEST_TRAINING_COLUMN_TRANSFORMATIONS,
@@ -419,17 +437,26 @@ def test_run_call_pipeline_service_create_with_timeout(
         )

     @pytest.mark.usefixtures("mock_pipeline_service_get")
     @pytest.mark.parametrize("sync", [True, False])
+    @pytest.mark.parametrize(
+        "training_job",
+        [
+            training_jobs.AutoMLForecastingTrainingJob,
+            training_jobs.SequenceToSequenceForecastingTrainingJob,
+        ],
+    )
     def test_run_call_pipeline_if_no_model_display_name_nor_model_labels(
         self,
         mock_pipeline_service_create,
         mock_dataset_time_series,
         mock_model_service_get,
         sync,
+        training_job,
     ):
         aiplatform.init(project=_TEST_PROJECT, staging_bucket=_TEST_BUCKET_NAME)

-        job = AutoMLForecastingTrainingJob(
+        job = training_job(
             display_name=_TEST_DISPLAY_NAME,
             optimization_objective=_TEST_TRAINING_OPTIMIZATION_OBJECTIVE_NAME,
             column_transformations=_TEST_TRAINING_COLUMN_TRANSFORMATIONS,
@@ -488,17 +515,26 @@ def test_run_call_pipeline_if_no_model_display_name_nor_model_labels(
         )

     @pytest.mark.usefixtures("mock_pipeline_service_get")
     @pytest.mark.parametrize("sync", [True, False])
+    @pytest.mark.parametrize(
+        "training_job",
+        [
+            training_jobs.AutoMLForecastingTrainingJob,
+            training_jobs.SequenceToSequenceForecastingTrainingJob,
+        ],
+    )
     def test_run_call_pipeline_if_set_additional_experiments(
         self,
         mock_pipeline_service_create,
         mock_dataset_time_series,
         mock_model_service_get,
         sync,
+        training_job,
     ):
         aiplatform.init(project=_TEST_PROJECT, staging_bucket=_TEST_BUCKET_NAME)

-        job = AutoMLForecastingTrainingJob(
+        job = training_job(
             display_name=_TEST_DISPLAY_NAME,
             optimization_objective=_TEST_TRAINING_OPTIMIZATION_OBJECTIVE_NAME,
             column_transformations=_TEST_TRAINING_COLUMN_TRANSFORMATIONS,
@@ -558,15 +594,24 @@ def test_run_call_pipeline_if_set_additional_experiments(
         "mock_pipeline_service_get",
         "mock_model_service_get",
     )
     @pytest.mark.parametrize("sync", [True, False])
+    @pytest.mark.parametrize(
+        "training_job",
+        [
+            training_jobs.AutoMLForecastingTrainingJob,
+            training_jobs.SequenceToSequenceForecastingTrainingJob,
+        ],
+    )
     def test_run_called_twice_raises(
         self,
         mock_dataset_time_series,
         sync,
+        training_job,
     ):
         aiplatform.init(project=_TEST_PROJECT, staging_bucket=_TEST_BUCKET_NAME)

-        job = AutoMLForecastingTrainingJob(
+        job = training_job(
             display_name=_TEST_DISPLAY_NAME,
             optimization_objective=_TEST_TRAINING_OPTIMIZATION_OBJECTIVE_NAME,
             column_transformations=_TEST_TRAINING_COLUMN_TRANSFORMATIONS,
@@ -619,17 +664,26 @@ def test_run_called_twice_raises(
             sync=sync,
         )

     @pytest.mark.parametrize("sync", [True, False])
+    @pytest.mark.parametrize(
+        "training_job",
+        [
+            training_jobs.AutoMLForecastingTrainingJob,
+            training_jobs.SequenceToSequenceForecastingTrainingJob,
+        ],
+    )
     def test_run_raises_if_pipeline_fails(
         self,
         mock_pipeline_service_create_and_get_with_fail,
         mock_dataset_time_series,
         sync,
+        training_job,
     ):
         aiplatform.init(project=_TEST_PROJECT, staging_bucket=_TEST_BUCKET_NAME)

-        job = AutoMLForecastingTrainingJob(
+        job = training_job(
             display_name=_TEST_DISPLAY_NAME,
             optimization_objective=_TEST_TRAINING_OPTIMIZATION_OBJECTIVE_NAME,
             column_transformations=_TEST_TRAINING_COLUMN_TRANSFORMATIONS,
@@ -665,10 +719,21 @@ def test_run_raises_if_pipeline_fails(
         with pytest.raises(RuntimeError):
             job.get_model()

-    def test_raises_before_run_is_called(self, mock_pipeline_service_create):
+    @pytest.mark.parametrize(
+    @pytest.mark.parametrize(
+        "training_job",
+        [
+            training_jobs.AutoMLForecastingTrainingJob,
+            training_jobs.Seq2SeqForecastingTrainingJob,
+        ],
+    )
+    def test_raises_before_run_is_called(
+        self,
+        mock_pipeline_service_create,
+        training_job,
+    ):
         aiplatform.init(project=_TEST_PROJECT, staging_bucket=_TEST_BUCKET_NAME)
-        job = AutoMLForecastingTrainingJob(
+        job = training_job(
             display_name=_TEST_DISPLAY_NAME,
             optimization_objective=_TEST_TRAINING_OPTIMIZATION_OBJECTIVE_NAME,
             column_transformations=_TEST_TRAINING_COLUMN_TRANSFORMATIONS,
@@ -683,7 +748,15 @@ def test_raises_before_run_is_called(self, mock_pipeline_service_create):
         with pytest.raises(RuntimeError):
             job.state
     @pytest.mark.parametrize("sync", [True, False])
+    @pytest.mark.parametrize(
+        "training_job",
+        [
+            training_jobs.AutoMLForecastingTrainingJob,
+            training_jobs.Seq2SeqForecastingTrainingJob,
+        ],
+    )
     def test_splits_fraction(
         self,
         mock_pipeline_service_create,
@@ -691,10 +764,11 @@ def test_splits_fraction(
         mock_dataset_time_series,
         mock_model_service_get,
         sync,
+        training_job,
     ):
         """
         Initiate aiplatform with encryption key name.
-        Create and run an AutoML Video Classification training job, verify calls and return value
+        Create and run a Forecasting training job, verify calls and return value
         """
         aiplatform.init(
@@ -702,7 +776,7 @@ def test_splits_fraction(
             encryption_spec_key_name=_TEST_DEFAULT_ENCRYPTION_KEY_NAME,
         )
-        job = AutoMLForecastingTrainingJob(
+        job = training_job(
             display_name=_TEST_DISPLAY_NAME,
             optimization_objective=_TEST_TRAINING_OPTIMIZATION_OBJECTIVE_NAME,
             column_transformations=_TEST_TRAINING_COLUMN_TRANSFORMATIONS,
@@ -769,7 +843,15 @@ def test_splits_fraction(
             timeout=None,
         )
     @pytest.mark.parametrize("sync", [True, False])
+    @pytest.mark.parametrize(
+        "training_job",
+        [
+            training_jobs.AutoMLForecastingTrainingJob,
+            training_jobs.Seq2SeqForecastingTrainingJob,
+        ],
+    )
     def test_splits_timestamp(
         self,
         mock_pipeline_service_create,
@@ -777,10 +859,11 @@ def test_splits_timestamp(
         mock_dataset_time_series,
         mock_model_service_get,
         sync,
+        training_job,
     ):
         """Initiate aiplatform with encryption key name.
-        Create and run an AutoML Forecasting training job, verify calls and
+        Create and run a Forecasting training job, verify calls and
         return value
         """
         aiplatform.init(
@@ -789,7 +872,7 @@ def test_splits_timestamp(
             encryption_spec_key_name=_TEST_DEFAULT_ENCRYPTION_KEY_NAME,
         )
-        job = AutoMLForecastingTrainingJob(
+        job = training_job(
             display_name=_TEST_DISPLAY_NAME,
             optimization_objective=_TEST_TRAINING_OPTIMIZATION_OBJECTIVE_NAME,
             column_transformations=_TEST_TRAINING_COLUMN_TRANSFORMATIONS,
@@ -859,7 +942,15 @@ def test_splits_timestamp(
             timeout=None,
         )
     @pytest.mark.parametrize("sync", [True, False])
+    @pytest.mark.parametrize(
+        "training_job",
+        [
+            training_jobs.AutoMLForecastingTrainingJob,
+            training_jobs.Seq2SeqForecastingTrainingJob,
+        ],
+    )
     def test_splits_predefined(
         self,
         mock_pipeline_service_create,
@@ -867,10 +958,11 @@ def test_splits_predefined(
         mock_dataset_time_series,
         mock_model_service_get,
         sync,
+        training_job,
     ):
         """
         Initiate aiplatform with encryption key name.
-        Create and run an AutoML Video Classification training job, verify calls and return value
+        Create and run a Forecasting training job, verify calls and return value
         """
         aiplatform.init(
@@ -878,7 +970,7 @@ def test_splits_predefined(
             encryption_spec_key_name=_TEST_DEFAULT_ENCRYPTION_KEY_NAME,
         )
-        job = AutoMLForecastingTrainingJob(
+        job = training_job(
             display_name=_TEST_DISPLAY_NAME,
             optimization_objective=_TEST_TRAINING_OPTIMIZATION_OBJECTIVE_NAME,
             column_transformations=_TEST_TRAINING_COLUMN_TRANSFORMATIONS,
@@ -941,7 +1033,15 @@ def test_splits_predefined(
             timeout=None,
         )
     @pytest.mark.parametrize("sync", [True, False])
+    @pytest.mark.parametrize(
+        "training_job",
+        [
+            training_jobs.AutoMLForecastingTrainingJob,
+            training_jobs.Seq2SeqForecastingTrainingJob,
+        ],
+    )
     def test_splits_default(
         self,
         mock_pipeline_service_create,
@@ -949,10 +1049,11 @@ def test_splits_default(
         mock_dataset_time_series,
         mock_model_service_get,
         sync,
+        training_job,
     ):
         """
         Initiate aiplatform with encryption key name.
-        Create and run an AutoML Video Classification training job, verify calls and return value
+        Create and run a Forecasting training job, verify calls and return value
         """
         aiplatform.init(
@@ -960,7 +1061,7 @@ def test_splits_default(
             encryption_spec_key_name=_TEST_DEFAULT_ENCRYPTION_KEY_NAME,
         )
-        job = AutoMLForecastingTrainingJob(
+        job = training_job(
             display_name=_TEST_DISPLAY_NAME,
             optimization_objective=_TEST_TRAINING_OPTIMIZATION_OBJECTIVE_NAME,
             column_transformations=_TEST_TRAINING_COLUMN_TRANSFORMATIONS,