diff --git a/google/cloud/aiplatform/training_jobs.py b/google/cloud/aiplatform/training_jobs.py
index 441f91ca394..f828509803f 100644
--- a/google/cloud/aiplatform/training_jobs.py
+++ b/google/cloud/aiplatform/training_jobs.py
@@ -1806,6 +1806,7 @@ def run(
         service_account: Optional[str] = None,
         bigquery_destination: Optional[str] = None,
         args: Optional[List[Union[str, float, int]]] = None,
+        environment_variables: Optional[Dict[str, str]] = None,
         replica_count: int = 0,
         machine_type: str = "n1-standard-4",
         accelerator_type: str = "ACCELERATOR_TYPE_UNSPECIFIED",
@@ -1881,6 +1882,13 @@ def run(
             base_output_dir (str):
                 GCS output directory of job. If not provided a
                 timestamped directory in the staging directory will be used.
+
+                AI Platform sets the following environment variables when it runs your training code:
+
+                -  AIP_MODEL_DIR: a Cloud Storage URI of a directory intended for saving model artifacts, i.e. <base_output_dir>/model/
+                -  AIP_CHECKPOINT_DIR: a Cloud Storage URI of a directory intended for saving checkpoints, i.e. <base_output_dir>/checkpoints/
+                -  AIP_TENSORBOARD_LOG_DIR: a Cloud Storage URI of a directory intended for saving TensorBoard logs, i.e. <base_output_dir>/logs/
+
             service_account (str):
                 Specifies the service account for workload run-as account.
                 Users submitting jobs must have act-as permission on this run-as account.
@@ -1901,6 +1909,16 @@ def run(
                 -  AIP_TEST_DATA_URI = "bigquery_destination.dataset_*.test"
             args (List[Unions[str, int, float]]):
                 Command line arguments to be passed to the Python script.
+            environment_variables (Dict[str, str]):
+                Environment variables to be passed to the container.
+                Should be a dictionary where keys are environment variable names
+                and values are environment variable values for those names.
+                At most 10 environment variables can be specified.
+                The name of the environment variable must be unique.
+
+                environment_variables = {
+                    'MY_KEY': 'MY_VALUE'
+                }
             replica_count (int):
                 The number of worker replicas. If replica count = 1 then one chief
                 replica will be provisioned. If replica_count > 1 the remainder will be
@@ -1961,6 +1979,7 @@ def run(
             worker_pool_specs=worker_pool_specs,
             managed_model=managed_model,
             args=args,
+            environment_variables=environment_variables,
             base_output_dir=base_output_dir,
             service_account=service_account,
             bigquery_destination=bigquery_destination,
@@ -1987,6 +2006,7 @@ def _run(
         worker_pool_specs: _DistributedTrainingSpec,
         managed_model: Optional[gca_model.Model] = None,
         args: Optional[List[Union[str, float, int]]] = None,
+        environment_variables: Optional[Dict[str, str]] = None,
         base_output_dir: Optional[str] = None,
         service_account: Optional[str] = None,
         bigquery_destination: Optional[str] = None,
@@ -2019,9 +2039,26 @@ def _run(
                 Model proto if this script produces a Managed Model.
             args (List[Unions[str, int, float]]):
                 Command line arguments to be passed to the Python script.
+            environment_variables (Dict[str, str]):
+                Environment variables to be passed to the container.
+                Should be a dictionary where keys are environment variable names
+                and values are environment variable values for those names.
+                At most 10 environment variables can be specified.
+                The name of the environment variable must be unique.
+
+                environment_variables = {
+                    'MY_KEY': 'MY_VALUE'
+                }
             base_output_dir (str):
                 GCS output directory of job. If not provided a
                 timestamped directory in the staging directory will be used.
+
+                AI Platform sets the following environment variables when it runs your training code:
+
+                -  AIP_MODEL_DIR: a Cloud Storage URI of a directory intended for saving model artifacts, i.e. <base_output_dir>/model/
+                -  AIP_CHECKPOINT_DIR: a Cloud Storage URI of a directory intended for saving checkpoints, i.e. <base_output_dir>/checkpoints/
+                -  AIP_TENSORBOARD_LOG_DIR: a Cloud Storage URI of a directory intended for saving TensorBoard logs, i.e. <base_output_dir>/logs/
+
             service_account (str):
                 Specifies the service account for workload run-as account.
                 Users submitting jobs must have act-as permission on this run-as account.
@@ -2084,6 +2121,13 @@ def _run(
         if args:
             spec["pythonPackageSpec"]["args"] = args
 
+        if environment_variables:
+            env = [
+                gca_env_var.EnvVar(name=str(key), value=str(value))
+                for key, value in environment_variables.items()
+            ]
+            spec["pythonPackageSpec"]["env"] = env
+
         (
             training_task_inputs,
             base_output_dir,
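For orientation: the variables supplied through environment_variables surface as ordinary process environment variables inside the training container, alongside the AIP_* variables AI Platform sets itself. A minimal sketch of a training script consuming them, reusing the MY_KEY example from the docstring above; the script name and fallback path are illustrative, not part of this change:

    # task.py -- hypothetical training entry point
    import os

    # Custom variable injected via environment_variables={"MY_KEY": "MY_VALUE"}.
    my_value = os.environ["MY_KEY"]

    # Directory AI Platform provides for saving model artifacts,
    # a Cloud Storage URI under base_output_dir.
    model_dir = os.environ.get("AIP_MODEL_DIR", "/tmp/model")

    print(f"MY_KEY={my_value}; writing model artifacts to {model_dir}")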
@@ -2335,6 +2379,7 @@ def run(
         service_account: Optional[str] = None,
         bigquery_destination: Optional[str] = None,
         args: Optional[List[Union[str, float, int]]] = None,
+        environment_variables: Optional[Dict[str, str]] = None,
         replica_count: int = 0,
         machine_type: str = "n1-standard-4",
         accelerator_type: str = "ACCELERATOR_TYPE_UNSPECIFIED",
@@ -2403,6 +2448,13 @@ def run(
             base_output_dir (str):
                 GCS output directory of job. If not provided a
                 timestamped directory in the staging directory will be used.
+
+                AI Platform sets the following environment variables when it runs your training code:
+
+                -  AIP_MODEL_DIR: a Cloud Storage URI of a directory intended for saving model artifacts, i.e. <base_output_dir>/model/
+                -  AIP_CHECKPOINT_DIR: a Cloud Storage URI of a directory intended for saving checkpoints, i.e. <base_output_dir>/checkpoints/
+                -  AIP_TENSORBOARD_LOG_DIR: a Cloud Storage URI of a directory intended for saving TensorBoard logs, i.e. <base_output_dir>/logs/
+
             service_account (str):
                 Specifies the service account for workload run-as account.
                 Users submitting jobs must have act-as permission on this run-as account.
@@ -2423,6 +2475,16 @@ def run(
                 -  AIP_TEST_DATA_URI = "bigquery_destination.dataset_*.test"
             args (List[Unions[str, int, float]]):
                 Command line arguments to be passed to the Python script.
+            environment_variables (Dict[str, str]):
+                Environment variables to be passed to the container.
+                Should be a dictionary where keys are environment variable names
+                and values are environment variable values for those names.
+                At most 10 environment variables can be specified.
+                The name of the environment variable must be unique.
+
+                environment_variables = {
+                    'MY_KEY': 'MY_VALUE'
+                }
             replica_count (int):
                 The number of worker replicas. If replica count = 1 then one chief
                 replica will be provisioned. If replica_count > 1 the remainder will be
@@ -2482,6 +2544,7 @@ def run(
             worker_pool_specs=worker_pool_specs,
             managed_model=managed_model,
             args=args,
+            environment_variables=environment_variables,
             base_output_dir=base_output_dir,
             service_account=service_account,
             bigquery_destination=bigquery_destination,
@@ -2507,6 +2570,7 @@ def _run(
         worker_pool_specs: _DistributedTrainingSpec,
         managed_model: Optional[gca_model.Model] = None,
         args: Optional[List[Union[str, float, int]]] = None,
+        environment_variables: Optional[Dict[str, str]] = None,
         base_output_dir: Optional[str] = None,
         service_account: Optional[str] = None,
         bigquery_destination: Optional[str] = None,
@@ -2536,9 +2600,26 @@ def _run(
                 Model proto if this script produces a Managed Model.
             args (List[Unions[str, int, float]]):
                 Command line arguments to be passed to the Python script.
+            environment_variables (Dict[str, str]):
+                Environment variables to be passed to the container.
+                Should be a dictionary where keys are environment variable names
+                and values are environment variable values for those names.
+                At most 10 environment variables can be specified.
+                The name of the environment variable must be unique.
+
+                environment_variables = {
+                    'MY_KEY': 'MY_VALUE'
+                }
             base_output_dir (str):
                 GCS output directory of job. If not provided a
                 timestamped directory in the staging directory will be used.
+
+                AI Platform sets the following environment variables when it runs your training code:
+
+                -  AIP_MODEL_DIR: a Cloud Storage URI of a directory intended for saving model artifacts, i.e. <base_output_dir>/model/
+                -  AIP_CHECKPOINT_DIR: a Cloud Storage URI of a directory intended for saving checkpoints, i.e. <base_output_dir>/checkpoints/
+                -  AIP_TENSORBOARD_LOG_DIR: a Cloud Storage URI of a directory intended for saving TensorBoard logs, i.e. <base_output_dir>/logs/
+
             service_account (str):
                 Specifies the service account for workload run-as account.
                 Users submitting jobs must have act-as permission on this run-as account.
@@ -2594,6 +2675,13 @@ def _run(
         if args:
             spec["containerSpec"]["args"] = args
 
+        if environment_variables:
+            env = [
+                gca_env_var.EnvVar(name=str(key), value=str(value))
+                for key, value in environment_variables.items()
+            ]
+            spec["containerSpec"]["env"] = env
+
         (
             training_task_inputs,
             base_output_dir,
@@ -3605,6 +3693,7 @@ def run(
         service_account: Optional[str] = None,
         bigquery_destination: Optional[str] = None,
         args: Optional[List[Union[str, float, int]]] = None,
+        environment_variables: Optional[Dict[str, str]] = None,
         replica_count: int = 0,
         machine_type: str = "n1-standard-4",
         accelerator_type: str = "ACCELERATOR_TYPE_UNSPECIFIED",
@@ -3673,6 +3762,13 @@ def run(
             base_output_dir (str):
                 GCS output directory of job. If not provided a
                 timestamped directory in the staging directory will be used.
+
+                AI Platform sets the following environment variables when it runs your training code:
+
+                -  AIP_MODEL_DIR: a Cloud Storage URI of a directory intended for saving model artifacts, i.e. <base_output_dir>/model/
+                -  AIP_CHECKPOINT_DIR: a Cloud Storage URI of a directory intended for saving checkpoints, i.e. <base_output_dir>/checkpoints/
+                -  AIP_TENSORBOARD_LOG_DIR: a Cloud Storage URI of a directory intended for saving TensorBoard logs, i.e. <base_output_dir>/logs/
+
             service_account (str):
                 Specifies the service account for workload run-as account.
                 Users submitting jobs must have act-as permission on this run-as account.
@@ -3693,6 +3789,16 @@ def run(
                 -  AIP_TEST_DATA_URI = "bigquery_destination.dataset_*.test"
             args (List[Unions[str, int, float]]):
                 Command line arguments to be passed to the Python script.
+            environment_variables (Dict[str, str]):
+                Environment variables to be passed to the container.
+                Should be a dictionary where keys are environment variable names
+                and values are environment variable values for those names.
+                At most 10 environment variables can be specified.
+                The name of the environment variable must be unique.
+
+                environment_variables = {
+                    'MY_KEY': 'MY_VALUE'
+                }
             replica_count (int):
                 The number of worker replicas. If replica count = 1 then one chief
                 replica will be provisioned. If replica_count > 1 the remainder will be
@@ -3747,6 +3853,7 @@ def run(
             worker_pool_specs=worker_pool_specs,
             managed_model=managed_model,
             args=args,
+            environment_variables=environment_variables,
             base_output_dir=base_output_dir,
             service_account=service_account,
             training_fraction_split=training_fraction_split,
@@ -3772,6 +3879,7 @@ def _run(
         worker_pool_specs: _DistributedTrainingSpec,
         managed_model: Optional[gca_model.Model] = None,
         args: Optional[List[Union[str, float, int]]] = None,
+        environment_variables: Optional[Dict[str, str]] = None,
         base_output_dir: Optional[str] = None,
         service_account: Optional[str] = None,
         training_fraction_split: float = 0.8,
@@ -3802,9 +3910,26 @@ def _run(
                 Model proto if this script produces a Managed Model.
             args (List[Unions[str, int, float]]):
                 Command line arguments to be passed to the Python script.
+            environment_variables (Dict[str, str]):
+                Environment variables to be passed to the container.
+                Should be a dictionary where keys are environment variable names
+                and values are environment variable values for those names.
+                At most 10 environment variables can be specified.
+                The name of the environment variable must be unique.
+
+                environment_variables = {
+                    'MY_KEY': 'MY_VALUE'
+                }
             base_output_dir (str):
                 GCS output directory of job. If not provided a
                 timestamped directory in the staging directory will be used.
+
+                AI Platform sets the following environment variables when it runs your training code:
+
+                -  AIP_MODEL_DIR: a Cloud Storage URI of a directory intended for saving model artifacts, i.e. <base_output_dir>/model/
+                -  AIP_CHECKPOINT_DIR: a Cloud Storage URI of a directory intended for saving checkpoints, i.e. <base_output_dir>/checkpoints/
+                -  AIP_TENSORBOARD_LOG_DIR: a Cloud Storage URI of a directory intended for saving TensorBoard logs, i.e. <base_output_dir>/logs/
+
             service_account (str):
                 Specifies the service account for workload run-as account.
                 Users submitting jobs must have act-as permission on this run-as account.
@@ -3846,6 +3971,13 @@ def _run(
         if args:
             spec["pythonPackageSpec"]["args"] = args
 
+        if environment_variables:
+            env = [
+                gca_env_var.EnvVar(name=str(key), value=str(value))
+                for key, value in environment_variables.items()
+            ]
+            spec["pythonPackageSpec"]["env"] = env
+
         (
             training_task_inputs,
             base_output_dir,
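Taken together, all three classes touched above (CustomTrainingJob, CustomContainerTrainingJob, CustomPythonPackageTrainingJob) thread the new parameter through to their worker pool specs in the same way. A sketch of how a caller might use it; the project, bucket, script, and container names are illustrative, not part of this diff:

    from google.cloud import aiplatform

    aiplatform.init(project="my-project", staging_bucket="gs://my-bucket")

    job = aiplatform.CustomTrainingJob(
        display_name="env-var-example",
        script_path="task.py",
        container_uri="gcr.io/cloud-aiplatform/training/tf-cpu.2-4:latest",
    )

    # At most 10 variables; names must be unique.
    job.run(
        args=["--epochs", "2"],
        environment_variables={"MY_KEY": "MY_VALUE"},
        replica_count=1,
    )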
diff --git a/tests/unit/aiplatform/test_training_jobs.py b/tests/unit/aiplatform/test_training_jobs.py
index 1a614694440..4bce07ba9dc 100644
--- a/tests/unit/aiplatform/test_training_jobs.py
+++ b/tests/unit/aiplatform/test_training_jobs.py
@@ -119,6 +119,9 @@
     "learning_rate": 0.01,
     "loss_fn": "mse",
 }
+_TEST_ENVIRONMENT_VARIABLES = {
+    "MY_PATH": "/path/to/my_path",
+}
 _TEST_MODEL_SERVING_CONTAINER_PORTS = [8888, 10000]
 _TEST_MODEL_DESCRIPTION = "test description"
 
@@ -596,6 +599,7 @@ def test_run_call_pipeline_service_create_with_tabular_dataset(
             base_output_dir=_TEST_BASE_OUTPUT_DIR,
             service_account=_TEST_SERVICE_ACCOUNT,
             args=_TEST_RUN_ARGS,
+            environment_variables=_TEST_ENVIRONMENT_VARIABLES,
             replica_count=1,
             machine_type=_TEST_MACHINE_TYPE,
             accelerator_type=_TEST_ACCELERATOR_TYPE,
@@ -618,6 +622,10 @@ def test_run_call_pipeline_service_create_with_tabular_dataset(
         )
 
         true_args = _TEST_RUN_ARGS
+        true_env = [
+            gca_env_var.EnvVar(name=str(key), value=str(value))
+            for key, value in _TEST_ENVIRONMENT_VARIABLES.items()
+        ]
 
         true_worker_pool_spec = {
             "replicaCount": _TEST_REPLICA_COUNT,
@@ -631,6 +639,7 @@
                 "pythonModule": training_jobs._TrainingScriptPythonPackager.module_name,
                 "packageUris": [_TEST_OUTPUT_PYTHON_PACKAGE_PATH],
                 "args": true_args,
+                "env": true_env,
             },
         }
 
@@ -754,6 +763,7 @@ def test_run_call_pipeline_service_create_with_bigquery_destination(
             base_output_dir=_TEST_BASE_OUTPUT_DIR,
             bigquery_destination=_TEST_BIGQUERY_DESTINATION,
             args=_TEST_RUN_ARGS,
+            environment_variables=_TEST_ENVIRONMENT_VARIABLES,
             replica_count=1,
             machine_type=_TEST_MACHINE_TYPE,
             accelerator_type=_TEST_ACCELERATOR_TYPE,
@@ -770,6 +780,10 @@ def test_run_call_pipeline_service_create_with_bigquery_destination(
         model_from_job.wait()
 
         true_args = _TEST_RUN_ARGS
+        true_env = [
+            gca_env_var.EnvVar(name=str(key), value=str(value))
+            for key, value in _TEST_ENVIRONMENT_VARIABLES.items()
+        ]
 
         true_worker_pool_spec = {
             "replicaCount": _TEST_REPLICA_COUNT,
@@ -783,6 +797,7 @@
                 "pythonModule": training_jobs._TrainingScriptPythonPackager.module_name,
                 "packageUris": [_TEST_OUTPUT_PYTHON_PACKAGE_PATH],
                 "args": true_args,
+                "env": true_env,
             },
         }
 
@@ -1018,6 +1033,7 @@ def test_run_call_pipeline_service_create_with_no_dataset(
         model_from_job = job.run(
             base_output_dir=_TEST_BASE_OUTPUT_DIR,
             args=_TEST_RUN_ARGS,
+            environment_variables=_TEST_ENVIRONMENT_VARIABLES,
             replica_count=1,
             machine_type=_TEST_MACHINE_TYPE,
             accelerator_type=_TEST_ACCELERATOR_TYPE,
@@ -1039,6 +1055,10 @@ def test_run_call_pipeline_service_create_with_no_dataset(
         )
 
         true_args = _TEST_RUN_ARGS
+        true_env = [
+            gca_env_var.EnvVar(name=str(key), value=str(value))
+            for key, value in _TEST_ENVIRONMENT_VARIABLES.items()
+        ]
 
         true_worker_pool_spec = {
             "replicaCount": _TEST_REPLICA_COUNT,
@@ -1052,6 +1072,7 @@
                 "pythonModule": training_jobs._TrainingScriptPythonPackager.module_name,
                 "packageUris": [_TEST_OUTPUT_PYTHON_PACKAGE_PATH],
                 "args": true_args,
+                "env": true_env,
             },
         }
 
@@ -1263,6 +1284,7 @@ def test_run_call_pipeline_service_create_distributed_training(
             dataset=mock_tabular_dataset,
             base_output_dir=_TEST_BASE_OUTPUT_DIR,
             args=_TEST_RUN_ARGS,
+            environment_variables=_TEST_ENVIRONMENT_VARIABLES,
             replica_count=10,
             machine_type=_TEST_MACHINE_TYPE,
             accelerator_type=_TEST_ACCELERATOR_TYPE,
@@ -1284,6 +1306,10 @@ def test_run_call_pipeline_service_create_distributed_training(
         )
 
         true_args = _TEST_RUN_ARGS
+        true_env = [
+            gca_env_var.EnvVar(name=str(key), value=str(value))
+            for key, value in _TEST_ENVIRONMENT_VARIABLES.items()
+        ]
 
         true_worker_pool_spec = [
             {
@@ -1298,6 +1324,7 @@
                     "pythonModule": training_jobs._TrainingScriptPythonPackager.module_name,
                     "packageUris": [_TEST_OUTPUT_PYTHON_PACKAGE_PATH],
                     "args": true_args,
+                    "env": true_env,
                 },
             },
             {
@@ -1312,6 +1339,7 @@
                    "pythonModule": training_jobs._TrainingScriptPythonPackager.module_name,
                     "packageUris": [_TEST_OUTPUT_PYTHON_PACKAGE_PATH],
                     "args": true_args,
+                    "env": true_env,
                 },
             },
         ]
@@ -1730,6 +1758,7 @@ def test_run_call_pipeline_service_create_with_tabular_dataset(
             dataset=mock_tabular_dataset,
             base_output_dir=_TEST_BASE_OUTPUT_DIR,
             args=_TEST_RUN_ARGS,
+            environment_variables=_TEST_ENVIRONMENT_VARIABLES,
             replica_count=1,
             machine_type=_TEST_MACHINE_TYPE,
             accelerator_type=_TEST_ACCELERATOR_TYPE,
@@ -1746,6 +1775,10 @@ def test_run_call_pipeline_service_create_with_tabular_dataset(
         model_from_job.wait()
 
         true_args = _TEST_RUN_ARGS
+        true_env = [
+            gca_env_var.EnvVar(name=str(key), value=str(value))
+            for key, value in _TEST_ENVIRONMENT_VARIABLES.items()
+        ]
 
         true_worker_pool_spec = {
             "replicaCount": _TEST_REPLICA_COUNT,
@@ -1758,6 +1791,7 @@
                 "imageUri": _TEST_TRAINING_CONTAINER_IMAGE,
                 "command": _TEST_TRAINING_CONTAINER_CMD,
                 "args": true_args,
+                "env": true_env,
             },
         }
 
@@ -2937,6 +2971,7 @@ def test_run_call_pipeline_service_create_with_tabular_dataset(
             base_output_dir=_TEST_BASE_OUTPUT_DIR,
             service_account=_TEST_SERVICE_ACCOUNT,
             args=_TEST_RUN_ARGS,
+            environment_variables=_TEST_ENVIRONMENT_VARIABLES,
             replica_count=1,
             machine_type=_TEST_MACHINE_TYPE,
             accelerator_type=_TEST_ACCELERATOR_TYPE,
@@ -2952,6 +2987,10 @@
         model_from_job.wait()
 
         true_args = _TEST_RUN_ARGS
+        true_env = [
+            gca_env_var.EnvVar(name=str(key), value=str(value))
+            for key, value in _TEST_ENVIRONMENT_VARIABLES.items()
+        ]
 
         true_worker_pool_spec = {
             "replicaCount": _TEST_REPLICA_COUNT,
@@ -2965,6 +3004,7 @@
                 "pythonModule": _TEST_PYTHON_MODULE_NAME,
                 "packageUris": [_TEST_OUTPUT_PYTHON_PACKAGE_PATH],
                 "args": true_args,
+                "env": true_env,
             },
         }
 
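One detail the tests pin down: both keys and values pass through str() before being wrapped in EnvVar messages, so non-string values are stringified rather than rejected. A standalone sketch of that transformation; the import path behind the gca_env_var alias is assumed from the SDK's v1beta1 types module, and the EPOCHS entry is added here only to illustrate the coercion:

    from google.cloud.aiplatform_v1beta1.types import env_var as gca_env_var

    environment_variables = {"MY_PATH": "/path/to/my_path", "EPOCHS": 10}

    env = [
        gca_env_var.EnvVar(name=str(key), value=str(value))
        for key, value in environment_variables.items()
    ]
    # EPOCHS becomes the string "10"; note the 10-variable limit from the
    # docstring is not enforced by this client-side snippet.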