diff --git a/google/cloud/aiplatform/explain/lit.py b/google/cloud/aiplatform/explain/lit.py index a34578abade..50320558012 100644 --- a/google/cloud/aiplatform/explain/lit.py +++ b/google/cloud/aiplatform/explain/lit.py @@ -17,6 +17,7 @@ import logging import os +from google.cloud import aiplatform from typing import Dict, List, Optional, Tuple, Union try: @@ -61,11 +62,11 @@ def __init__( ): """Construct a VertexLitDataset. Args: - dataset: - Required. A Pandas DataFrame that includes feature column names and data. - column_types: - Required. An OrderedDict of string names matching the columns of the dataset - as the key, and the associated LitType of the column. + dataset: + Required. A Pandas DataFrame that includes feature column names and data. + column_types: + Required. An OrderedDict of string names matching the columns of the dataset + as the key, and the associated LitType of the column. """ self._examples = dataset.to_dict(orient="records") self._column_types = column_types @@ -75,8 +76,109 @@ def spec(self): return dict(self._column_types) -class _VertexLitModel(lit_model.Model): - """LIT model class for the Vertex LIT integration. +class _EndpointLitModel(lit_model.Model): + """LIT model class for the Vertex LIT integration with a model deployed to an endpoint. + + This is used in the create_lit_model function. + """ + + def __init__( + self, + endpoint: Union[str, aiplatform.Endpoint], + input_types: "OrderedDict[str, lit_types.LitType]", # noqa: F821 + output_types: "OrderedDict[str, lit_types.LitType]", # noqa: F821 + model_id: Optional[str] = None, + ): + """Construct a VertexLitModel. + Args: + model: + Required. The name of the Endpoint resource. Format: + ``projects/{project}/locations/{location}/endpoints/{endpoint}`` + input_types: + Required. An OrderedDict of string names matching the features of the model + as the key, and the associated LitType of the feature. + output_types: + Required. 
An OrderedDict of string names matching the labels of the model + as the key, and the associated LitType of the label. + model_id: + Optional. A string of the specific model in the endpoint to create the + LIT model from. If this is not set, any usable model in the endpoint is + used to create the LIT model. + Raises: + ValueError if the model_id was not found in the endpoint. + """ + if isinstance(endpoint, str): + self._endpoint = aiplatform.Endpoint(endpoint) + else: + self._endpoint = endpoint + self._model_id = model_id + self._input_types = input_types + self._output_types = output_types + # Check if the model with the model ID has explanation enabled + if model_id: + deployed_model = next( + filter( + lambda model: model.id == model_id, self._endpoint.list_models() + ), + None, + ) + if not deployed_model: + raise ValueError( + "A model with id {model_id} was not found in the endpoint {endpoint}.".format( + model_id=model_id, endpoint=endpoint + ) + ) + self._explanation_enabled = bool(deployed_model.explanation_spec) + # Check if all models in the endpoint have explanation enabled + else: + self._explanation_enabled = all( + model.explanation_spec for model in self._endpoint.list_models() + ) + + def predict_minibatch( + self, inputs: List[lit_types.JsonDict] + ) -> List[lit_types.JsonDict]: + """Return predictions based on a batch of inputs. + Args: + inputs: Required. A list of instances to predict on based on the input spec. + Returns: + A list of predictions based on the output spec. 
+ """ + instances = [] + for input in inputs: + instance = [input[feature] for feature in self._input_types] + instances.append(instance) + if self._explanation_enabled: + prediction_object = self._endpoint.explain(instances) + else: + prediction_object = self._endpoint.predict(instances) + outputs = [] + for prediction in prediction_object.predictions: + outputs.append({key: prediction[key] for key in self._output_types}) + if self._explanation_enabled: + for i, explanation in enumerate(prediction_object.explanations): + attributions = explanation.attributions + outputs[i]["feature_attribution"] = lit_dtypes.FeatureSalience( + attributions + ) + return outputs + + def input_spec(self) -> lit_types.Spec: + """Return a spec describing model inputs.""" + return dict(self._input_types) + + def output_spec(self) -> lit_types.Spec: + """Return a spec describing model outputs.""" + output_spec_dict = dict(self._output_types) + if self._explanation_enabled: + output_spec_dict["feature_attribution"] = lit_types.FeatureSalience( + signed=True + ) + return output_spec_dict + + +class _TensorFlowLitModel(lit_model.Model): + """LIT model class for the Vertex LIT integration with a TensorFlow saved model. This is used in the create_lit_model function. """ @@ -90,19 +192,19 @@ def __init__( ): """Construct a VertexLitModel. Args: - model: - Required. A string reference to a local TensorFlow saved model directory. - The model must have at most one input and one output tensor. - input_types: - Required. An OrderedDict of string names matching the features of the model - as the key, and the associated LitType of the feature. - output_types: - Required. An OrderedDict of string names matching the labels of the model - as the key, and the associated LitType of the label. - attribution_method: - Optional. A string to choose what attribution configuration to - set up the explainer with. Valid options are 'sampled_shapley' - or 'integrated_gradients'. + model: + Required. 
A string reference to a local TensorFlow saved model directory. + The model must have at most one input and one output tensor. + input_types: + Required. An OrderedDict of string names matching the features of the model + as the key, and the associated LitType of the feature. + output_types: + Required. An OrderedDict of string names matching the labels of the model + as the key, and the associated LitType of the label. + attribution_method: + Optional. A string to choose what attribution configuration to + set up the explainer with. Valid options are 'sampled_shapley' + or 'integrated_gradients'. """ self._load_model(model) self._input_types = input_types @@ -120,6 +222,12 @@ def attribution_explainer(self,) -> Optional["AttributionExplainer"]: # noqa: F def predict_minibatch( self, inputs: List[lit_types.JsonDict] ) -> List[lit_types.JsonDict]: + """Return predictions based on a batch of inputs. + Args: + inputs: Required. A list of instances to predict on based on the input spec. + Returns: + A list of predictions based on the output spec. + """ instances = [] for input in inputs: instance = [input[feature] for feature in self._input_types] @@ -166,7 +274,7 @@ def output_spec(self) -> lit_types.Spec: def _load_model(self, model: str): """Loads a TensorFlow saved model and populates the input and output signature attributes of the class. Args: - model: Required. A string reference to a TensorFlow saved model directory. + model: Required. A string reference to a TensorFlow saved model directory. Raises: ValueError if the model has more than one input tensor or more than one output tensor. """ @@ -188,11 +296,11 @@ def _set_up_attribution_explainer( ): """Populates the attribution explainer attribute of the class. Args: - model: Required. A string reference to a TensorFlow saved model directory. attribution_method: - Optional. 
A string to choose what attribution configuration to - set up the explainer with. Valid options are 'sampled_shapley' - or 'integrated_gradients'. + Optional. A string to choose what attribution configuration to + set up the explainer with. Valid options are 'sampled_shapley' + or 'integrated_gradients'. """ try: import explainable_ai_sdk @@ -228,17 +336,44 @@ def create_lit_dataset( ) -> lit_dataset.Dataset: """Creates a LIT Dataset object. Args: - dataset: - Required. A Pandas DataFrame that includes feature column names and data. - column_types: - Required. An OrderedDict of string names matching the columns of the dataset - as the key, and the associated LitType of the column. + dataset: + Required. A Pandas DataFrame that includes feature column names and data. + column_types: + Required. An OrderedDict of string names matching the columns of the dataset + as the key, and the associated LitType of the column. Returns: A LIT Dataset object that has the data from the dataset provided. """ return _VertexLitDataset(dataset, column_types) +def create_lit_model_from_endpoint( + endpoint: Union[str, aiplatform.Endpoint], + input_types: "OrderedDict[str, lit_types.LitType]", # noqa: F821 + output_types: "OrderedDict[str, lit_types.LitType]", # noqa: F821 + model_id: Optional[str] = None, +) -> lit_model.Model: + """Creates a LIT Model object. + Args: + endpoint: + Required. The name of the Endpoint resource or an Endpoint instance. + Endpoint name format: ``projects/{project}/locations/{location}/endpoints/{endpoint}`` + input_types: + Required. An OrderedDict of string names matching the features of the model + as the key, and the associated LitType of the feature. + output_types: + Required. An OrderedDict of string names matching the labels of the model + as the key, and the associated LitType of the label. + model_id: + Optional. A string of the specific model in the endpoint to create the + LIT model from. 
If this is not set, any usable model in the endpoint is + used to create the LIT model. + Returns: + A LIT Model object that has the same functionality as the model provided. + """ + return _EndpointLitModel(endpoint, input_types, output_types, model_id) + + def create_lit_model( model: str, input_types: "OrderedDict[str, lit_types.LitType]", # noqa: F821 @@ -247,23 +382,23 @@ def create_lit_model( ) -> lit_model.Model: """Creates a LIT Model object. Args: - model: - Required. A string reference to a local TensorFlow saved model directory. - The model must have at most one input and one output tensor. - input_types: - Required. An OrderedDict of string names matching the features of the model - as the key, and the associated LitType of the feature. - output_types: - Required. An OrderedDict of string names matching the labels of the model - as the key, and the associated LitType of the label. - attribution_method: - Optional. A string to choose what attribution configuration to - set up the explainer with. Valid options are 'sampled_shapley' - or 'integrated_gradients'. + model: + Required. A string reference to a local TensorFlow saved model directory. + The model must have at most one input and one output tensor. + input_types: + Required. An OrderedDict of string names matching the features of the model + as the key, and the associated LitType of the feature. + output_types: + Required. An OrderedDict of string names matching the labels of the model + as the key, and the associated LitType of the label. + attribution_method: + Optional. A string to choose what attribution configuration to + set up the explainer with. Valid options are 'sampled_shapley' + or 'integrated_gradients'. Returns: A LIT Model object that has the same functionality as the model provided. 
""" - return _VertexLitModel(model, input_types, output_types, attribution_method) + return _TensorFlowLitModel(model, input_types, output_types, attribution_method) def open_lit( @@ -273,12 +408,12 @@ def open_lit( ): """Open LIT from the provided models and datasets. Args: - models: - Required. A list of LIT models to open LIT with. - input_types: - Required. A lit of LIT datasets to open LIT with. - open_in_new_tab: - Optional. A boolean to choose if LIT open in a new tab or not. + models: + Required. A list of LIT models to open LIT with. + input_types: + Required. A lit of LIT datasets to open LIT with. + open_in_new_tab: + Optional. A boolean to choose if LIT open in a new tab or not. Raises: ImportError if LIT is not installed. """ @@ -297,26 +432,26 @@ def set_up_and_open_lit( ) -> Tuple[lit_dataset.Dataset, lit_model.Model]: """Creates a LIT dataset and model and opens LIT. Args: - dataset: - Required. A Pandas DataFrame that includes feature column names and data. - column_types: - Required. An OrderedDict of string names matching the columns of the dataset - as the key, and the associated LitType of the column. - model: - Required. A string reference to a TensorFlow saved model directory. - The model must have at most one input and one output tensor. - input_types: - Required. An OrderedDict of string names matching the features of the model - as the key, and the associated LitType of the feature. - output_types: - Required. An OrderedDict of string names matching the labels of the model - as the key, and the associated LitType of the label. - attribution_method: - Optional. A string to choose what attribution configuration to - set up the explainer with. Valid options are 'sampled_shapley' - or 'integrated_gradients'. - open_in_new_tab: - Optional. A boolean to choose if LIT open in a new tab or not. + dataset: + Required. A Pandas DataFrame that includes feature column names and data. + column_types: + Required. 
An OrderedDict of string names matching the columns of the dataset + as the key, and the associated LitType of the column. + model: + Required. A string reference to a TensorFlow saved model directory. + The model must have at most one input and one output tensor. + input_types: + Required. An OrderedDict of string names matching the features of the model + as the key, and the associated LitType of the feature. + output_types: + Required. An OrderedDict of string names matching the labels of the model + as the key, and the associated LitType of the label. + attribution_method: + Optional. A string to choose what attribution configuration to + set up the explainer with. Valid options are 'sampled_shapley' + or 'integrated_gradients'. + open_in_new_tab: + Optional. A boolean to choose if LIT open in a new tab or not. Returns: A Tuple of the LIT dataset and model created. Raises: diff --git a/google/cloud/aiplatform/featurestore/entity_type.py b/google/cloud/aiplatform/featurestore/entity_type.py index 8a85b1aa7ad..274f89d2aa0 100644 --- a/google/cloud/aiplatform/featurestore/entity_type.py +++ b/google/cloud/aiplatform/featurestore/entity_type.py @@ -123,9 +123,8 @@ def __init__( location=self.location, credentials=credentials, ) - @property - def featurestore_name(self) -> str: - """Full qualified resource name of the managed featurestore in which this EntityType is.""" + def _get_featurestore_name(self) -> str: + """Gets full qualified resource name of the managed featurestore in which this EntityType is.""" entity_type_name_components = self._parse_resource_name(self.resource_name) return featurestore.Featurestore._format_resource_name( project=entity_type_name_components["project"], @@ -133,6 +132,12 @@ def featurestore_name(self) -> str: featurestore=entity_type_name_components["featurestore"], ) + @property + def featurestore_name(self) -> str: + """Full qualified resource name of the managed featurestore in which this EntityType is.""" + self.wait() + return 
self._get_featurestore_name() + def get_featurestore(self) -> "featurestore.Featurestore": """Retrieves the managed featurestore in which this EntityType is. @@ -141,7 +146,7 @@ def get_featurestore(self) -> "featurestore.Featurestore": """ return featurestore.Featurestore(self.featurestore_name) - def get_feature(self, feature_id: str) -> "featurestore.Feature": + def _get_feature(self, feature_id: str) -> "featurestore.Feature": """Retrieves an existing managed feature in this EntityType. Args: @@ -151,7 +156,6 @@ def get_feature(self, feature_id: str) -> "featurestore.Feature": featurestore.Feature - The managed feature resource object. """ entity_type_name_components = self._parse_resource_name(self.resource_name) - return featurestore.Feature( feature_name=featurestore.Feature._format_resource_name( project=entity_type_name_components["project"], @@ -162,6 +166,18 @@ def get_feature(self, feature_id: str) -> "featurestore.Feature": ) ) + def get_feature(self, feature_id: str) -> "featurestore.Feature": + """Retrieves an existing managed feature in this EntityType. + + Args: + feature_id (str): + Required. The managed feature resource ID in this EntityType. + Returns: + featurestore.Feature - The managed feature resource object. + """ + self.wait() + return self._get_feature(feature_id=feature_id) + def update( self, description: Optional[str] = None, @@ -202,6 +218,7 @@ def update( Returns: EntityType - The updated entityType resource object. """ + self.wait() update_mask = list() if description: @@ -380,6 +397,7 @@ def list_features( Returns: List[featurestore.Feature] - A list of managed feature resource objects. 
""" + self.wait() return featurestore.Feature.list( entity_type_name=self.resource_name, filter=filter, order_by=order_by, ) @@ -399,7 +417,7 @@ def delete_features(self, feature_ids: List[str], sync: bool = True,) -> None: """ features = [] for feature_id in feature_ids: - feature = self.get_feature(feature_id=feature_id) + feature = self._get_feature(feature_id=feature_id) feature.delete(sync=False) features.append(feature) @@ -626,6 +644,7 @@ def create_feature( featurestore.Feature - feature resource object """ + self.wait() return featurestore.Feature.create( feature_id=feature_id, value_type=value_type, @@ -761,8 +780,9 @@ def batch_create_features( return self + @staticmethod def _validate_and_get_import_feature_values_request( - self, + entity_type_name: str, feature_ids: List[str], feature_time: Union[str, datetime.datetime], data_source: Union[gca_io.AvroSource, gca_io.BigQuerySource, gca_io.CsvSource], @@ -773,6 +793,8 @@ def _validate_and_get_import_feature_values_request( ) -> gca_featurestore_service.ImportFeatureValuesRequest: """Validates and get import feature values request. Args: + entity_type_name (str): + Required. A fully-qualified entityType resource name. feature_ids (List[str]): Required. IDs of the Feature to import values of. 
The Features must exist in the target @@ -840,7 +862,7 @@ def _validate_and_get_import_feature_values_request( ] import_feature_values_request = gca_featurestore_service.ImportFeatureValuesRequest( - entity_type=self.resource_name, + entity_type=entity_type_name, feature_specs=feature_specs, entity_id_field=entity_id_field, disable_online_serving=disable_online_serving, @@ -992,6 +1014,7 @@ def ingest_from_bq( bigquery_source = gca_io.BigQuerySource(input_uri=bq_source_uri) import_feature_values_request = self._validate_and_get_import_feature_values_request( + entity_type_name=self.resource_name, feature_ids=feature_ids, feature_time=feature_time, data_source=bigquery_source, @@ -1114,6 +1137,7 @@ def ingest_from_gcs( data_source = gca_io.AvroSource(gcs_source=gcs_source) import_feature_values_request = self._validate_and_get_import_feature_values_request( + entity_type_name=self.resource_name, feature_ids=feature_ids, feature_time=feature_time, data_source=data_source, @@ -1213,6 +1237,7 @@ def ingest_from_df( project=self.project, credentials=self.credentials ) + self.wait() entity_type_name_components = self._parse_resource_name(self.resource_name) featurestore_id, entity_type_id = ( entity_type_name_components["featurestore"], @@ -1222,6 +1247,8 @@ def ingest_from_df( temp_bq_dataset_name = f"temp_{featurestore_id}_{uuid.uuid4()}".replace( "-", "_" ) + + # TODO(b/216497263): Add support for resource project does not match initializer.global_config.project temp_bq_dataset_id = f"{initializer.global_config.project}.{temp_bq_dataset_name}"[ :1024 ] @@ -1297,7 +1324,7 @@ def read( Returns: pd.DataFrame: entities' feature values in DataFrame """ - + self.wait() if isinstance(feature_ids, str): feature_ids = [feature_ids] @@ -1339,7 +1366,7 @@ def read( feature_descriptor.id for feature_descriptor in header.feature_descriptors ] - return EntityType._construct_dataframe( + return self._construct_dataframe( feature_ids=feature_ids, entity_views=entity_views, ) diff 
--git a/google/cloud/aiplatform/featurestore/feature.py b/google/cloud/aiplatform/featurestore/feature.py index 08c91cdb1aa..1564abfd62a 100644 --- a/google/cloud/aiplatform/featurestore/feature.py +++ b/google/cloud/aiplatform/featurestore/feature.py @@ -122,17 +122,21 @@ def __init__( else featurestore_id, ) - @property - def featurestore_name(self) -> str: - """Full qualified resource name of the managed featurestore in which this Feature is.""" + def _get_featurestore_name(self) -> str: + """Gets full qualified resource name of the managed featurestore in which this Feature is.""" feature_path_components = self._parse_resource_name(self.resource_name) - return featurestore.Featurestore._format_resource_name( project=feature_path_components["project"], location=feature_path_components["location"], featurestore=feature_path_components["featurestore"], ) + @property + def featurestore_name(self) -> str: + """Full qualified resource name of the managed featurestore in which this Feature is.""" + self.wait() + return self._get_featurestore_name() + def get_featurestore(self) -> "featurestore.Featurestore": """Retrieves the managed featurestore in which this Feature is. 
@@ -141,11 +145,9 @@ def get_featurestore(self) -> "featurestore.Featurestore": """ return featurestore.Featurestore(featurestore_name=self.featurestore_name) - @property - def entity_type_name(self) -> str: - """Full qualified resource name of the managed entityType in which this Feature is.""" + def _get_entity_type_name(self) -> str: + """Gets full qualified resource name of the managed entityType in which this Feature is.""" feature_path_components = self._parse_resource_name(self.resource_name) - return featurestore.EntityType._format_resource_name( project=feature_path_components["project"], location=feature_path_components["location"], @@ -153,6 +155,12 @@ def entity_type_name(self) -> str: entity_type=feature_path_components["entity_type"], ) + @property + def entity_type_name(self) -> str: + """Full qualified resource name of the managed entityType in which this Feature is.""" + self.wait() + return self._get_entity_type_name() + def get_entity_type(self) -> "featurestore.EntityType": """Retrieves the managed entityType in which this Feature is. @@ -203,6 +211,7 @@ def update( Returns: Feature - The updated feature resource object. 
""" + self.wait() update_mask = list() if description: diff --git a/google/cloud/aiplatform/featurestore/featurestore.py b/google/cloud/aiplatform/featurestore/featurestore.py index 6d02bb9f76f..648dd7ad2b7 100644 --- a/google/cloud/aiplatform/featurestore/featurestore.py +++ b/google/cloud/aiplatform/featurestore/featurestore.py @@ -16,6 +16,7 @@ # from typing import Dict, List, Optional, Sequence, Tuple, Union +import uuid from google.auth import credentials as auth_credentials from google.protobuf import field_mask_pb2 @@ -32,6 +33,8 @@ from google.cloud.aiplatform import utils from google.cloud.aiplatform.utils import featurestore_utils +from google.cloud import bigquery + _LOGGER = base.Logger(__name__) @@ -109,8 +112,19 @@ def get_entity_type(self, entity_type_id: str) -> "featurestore.EntityType": Returns: featurestore.EntityType - The managed entityType resource object. """ - featurestore_name_components = self._parse_resource_name(self.resource_name) + self.wait() + return self._get_entity_type(entity_type_id=entity_type_id) + + def _get_entity_type(self, entity_type_id: str) -> "featurestore.EntityType": + """Retrieves an existing managed entityType in this Featurestore. + Args: + entity_type_id (str): + Required. The managed entityType resource ID in this Featurestore. + Returns: + featurestore.EntityType - The managed entityType resource object. + """ + featurestore_name_components = self._parse_resource_name(self.resource_name) return featurestore.EntityType( entity_type_name=featurestore.EntityType._format_resource_name( project=featurestore_name_components["project"], @@ -221,6 +235,7 @@ def _update( Returns: Featurestore - The updated featurestore resource object. """ + self.wait() update_mask = list() if labels: @@ -310,6 +325,7 @@ def list_entity_types( Returns: List[featurestore.EntityType] - A list of managed entityType resource objects. 
""" + self.wait() return featurestore.EntityType.list( featurestore_name=self.resource_name, filter=filter, order_by=order_by, ) @@ -334,7 +350,7 @@ def delete_entity_types( """ entity_types = [] for entity_type_id in entity_type_ids: - entity_type = self.get_entity_type(entity_type_id=entity_type_id) + entity_type = self._get_entity_type(entity_type_id=entity_type_id) entity_type.delete(force=force, sync=False) entity_types.append(entity_type) @@ -547,6 +563,7 @@ def create_entity_type( featurestore.EntityType - EntityType resource object """ + self.wait() return featurestore.EntityType.create( entity_type_id=entity_type_id, featurestore_name=self.resource_name, @@ -591,21 +608,57 @@ def _batch_read_feature_values( return self + @staticmethod + def _validate_and_get_read_instances( + read_instances_uri: str, + ) -> Union[gca_io.BigQuerySource, gca_io.CsvSource]: + """Gets read_instances + + Args: + read_instances_uri (str): + Required. Read_instances_uri can be either BigQuery URI to an input table, + or Google Cloud Storage URI to a csv file. + + Returns: + Union[gca_io.BigQuerySource, gca_io.CsvSource]: + BigQuery source or Csv source for read instances. The Csv source contains exactly 1 URI. + + Raises: + ValueError if read_instances_uri does not start with 'bq://' or 'gs://'. + """ + if not ( + read_instances_uri.startswith("bq://") + or read_instances_uri.startswith("gs://") + ): + raise ValueError( + "The read_instances_uri should be a single uri starts with either 'bq://' or 'gs://'." 
+ ) + + if read_instances_uri.startswith("bq://"): + return gca_io.BigQuerySource(input_uri=read_instances_uri) + if read_instances_uri.startswith("gs://"): + return gca_io.CsvSource( + gcs_source=gca_io.GcsSource(uris=[read_instances_uri]) + ) + def _validate_and_get_batch_read_feature_values_request( self, + featurestore_name: str, serving_feature_ids: Dict[str, List[str]], destination: Union[ gca_io.BigQueryDestination, gca_io.CsvDestination, gca_io.TFRecordDestination, ], - feature_destination_fields: Optional[Dict[str, str]] = None, - read_instances: Optional[Union[gca_io.BigQuerySource, gca_io.CsvSource]] = None, + read_instances: Union[gca_io.BigQuerySource, gca_io.CsvSource], pass_through_fields: Optional[List[str]] = None, + feature_destination_fields: Optional[Dict[str, str]] = None, ) -> gca_featurestore_service.BatchReadFeatureValuesRequest: """Validates and gets batch_read_feature_values_request Args: + featurestore_name (str): + Required. A fully-qualified featurestore resource name. serving_feature_ids (Dict[str, List[str]]): Required. A user defined dictionary to define the entity_types and their features for batch serve/read. The keys of the dictionary are the serving entity_type ids and @@ -619,7 +672,17 @@ def _validate_and_get_batch_read_feature_values_request( destination (Union[gca_io.BigQueryDestination, gca_io.CsvDestination, gca_io.TFRecordDestination]): Required. BigQuery destination, Csv destination or TFRecord destination. + read_instances (Union[gca_io.BigQuerySource, gca_io.CsvSource]): + Required. BigQuery source or Csv source for read instances. + The Csv source must contain exactly 1 URI. + pass_through_fields (List[str]): + Optional. When not empty, the specified fields in the + read_instances source will be joined as-is in the output, + in addition to those fields from the Featurestore Entity. + For BigQuery source, the type of the pass-through values + will be automatically inferred. 
For CSV source, the + pass-through values will be passed as opaque bytes. feature_destination_fields (Dict[str, str]): Optional. A user defined dictionary to map a feature's fully qualified resource name to its destination field name. If the destination field name is not defined, @@ -631,22 +694,10 @@ def _validate_and_get_batch_read_feature_values_request( 'projects/123/locations/us-central1/featurestores/fs_id/entityTypes/et_id2/features/f_id22': 'bar', } - read_instances (Union[gca_io.BigQuerySource, gca_io.CsvSource]): - Optional. BigQuery source or Csv source for read instances. - pass_through_fields (List[str]): - Optional. When not empty, the specified fields in the - read_instances source will be joined as-is in the output, - in addition to those fields from the Featurestore Entity. - - For BigQuery source, the type of the pass-through values - will be automatically inferred. For CSV source, the - pass-through values will be passed as opaque bytes. - Returns: gca_featurestore_service.BatchReadFeatureValuesRequest: batch read feature values request """ - - featurestore_name_components = self._parse_resource_name(self.resource_name) + featurestore_name_components = self._parse_resource_name(featurestore_name) feature_destination_fields = feature_destination_fields or {} @@ -684,7 +735,7 @@ def _validate_and_get_batch_read_feature_values_request( entity_type_specs.append(entity_type_spec) batch_read_feature_values_request = gca_featurestore_service.BatchReadFeatureValuesRequest( - featurestore=self.resource_name, entity_type_specs=entity_type_specs, + featurestore=featurestore_name, entity_type_specs=entity_type_specs, ) if isinstance(destination, gca_io.BigQueryDestination): @@ -715,59 +766,14 @@ def _validate_and_get_batch_read_feature_values_request( return batch_read_feature_values_request - def _get_read_instances( - self, read_instances: Union[str, List[str]], - ) -> Union[gca_io.BigQuerySource, gca_io.CsvSource]: - """Gets read_instances - - Args: - 
read_instances (Union[str, List[str]]): - Required. Read_instances can be either BigQuery URI to the input table, - or Google Cloud Storage URI(-s) to the csv file(s). - - Returns: - Union[gca_io.BigQuerySource, gca_io.CsvSource]: BigQuery source or Csv source for read instances. - - Raises: - TypeError if read_instances is not a string or a list of strings. - ValueError if read_instances uri does not start with 'bq://' or 'gs://'. - ValueError if uris in read_instances do not start with 'gs://'. - """ - if isinstance(read_instances, str): - if not ( - read_instances.startswith("bq://") or read_instances.startswith("gs://") - ): - raise ValueError( - "The read_instances accepts a single uri starts with 'bq://' or 'gs://'." - ) - elif isinstance(read_instances, list) and all( - [isinstance(e, str) for e in read_instances] - ): - if not all([e.startswith("gs://") for e in read_instances]): - raise ValueError( - "The read_instances accepts a list of uris start with 'gs://' only." - ) - else: - raise TypeError( - "The read_instances type should to be either a str or a List[str]." 
- ) - - if isinstance(read_instances, str): - if read_instances.startswith("bq://"): - return gca_io.BigQuerySource(input_uri=read_instances) - else: - read_instances = [read_instances] - - return gca_io.CsvSource(gcs_source=gca_io.GcsSource(uris=read_instances)) - @base.optional_sync(return_input_arg="self") def batch_serve_to_bq( self, bq_destination_output_uri: str, serving_feature_ids: Dict[str, List[str]], - feature_destination_fields: Optional[Dict[str, str]] = None, - read_instances: Optional[Union[str, List[str]]] = None, + read_instances_uri: str, pass_through_fields: Optional[List[str]] = None, + feature_destination_fields: Optional[Dict[str, str]] = None, request_metadata: Optional[Sequence[Tuple[str, str]]] = (), sync: bool = True, ) -> "Featurestore": @@ -793,29 +799,16 @@ def batch_serve_to_bq( 'my_entity_type_id_2': ['feature_id_2_1', 'feature_id_2_2'], } - feature_destination_fields (Dict[str, str]): - Optional. A user defined dictionary to map a feature's fully qualified resource name to - its destination field name. If the destination field name is not defined, - the feature ID will be used as its destination field name. + read_instances_uri (str): + Required. Read_instances_uri can be either BigQuery URI to an input table, + or Google Cloud Storage URI to a csv file. - Example: - feature_destination_fields = { - 'projects/123/locations/us-central1/featurestores/fs_id/entityTypes/et_id1/features/f_id11': 'foo', - 'projects/123/locations/us-central1/featurestores/fs_id/entityTypes/et_id2/features/f_id22': 'bar', - } - - read_instances (Union[str, List[str]]): - Optional. Read_instances can be either BigQuery URI to the input table, - or Google Cloud Storage URI(-s) to the - csv file(s). May contain wildcards. For more - information on wildcards, see - https://cloud.google.com/storage/docs/gsutil/addlhelp/WildcardNames. 
Example: 'bq://project.dataset.table_name' or - ["gs://my_bucket/my_file_1.csv", "gs://my_bucket/my_file_2.csv"] + "gs://my_bucket/my_file.csv" - Each read instance consists of exactly one read timestamp + Each read instance should consist of exactly one read timestamp and one or more entity IDs identifying entities of the corresponding EntityTypes whose Features are requested. @@ -844,6 +837,17 @@ def batch_serve_to_bq( will be automatically inferred. For CSV source, the pass-through values will be passed as opaque bytes. + feature_destination_fields (Dict[str, str]): + Optional. A user defined dictionary to map a feature's fully qualified resource name to + its destination field name. If the destination field name is not defined, + the feature ID will be used as its destination field name. + + Example: + feature_destination_fields = { + 'projects/123/locations/us-central1/featurestores/fs_id/entityTypes/et_id1/features/f_id11': 'foo', + 'projects/123/locations/us-central1/featurestores/fs_id/entityTypes/et_id2/features/f_id22': 'bar', + } + Returns: Featurestore: The featurestore resource object batch read feature values from. @@ -851,15 +855,16 @@ def batch_serve_to_bq( NotFound: if the BigQuery destination Dataset does not exist. FailedPrecondition: if the BigQuery destination Dataset/Table is in a different project. 
""" + read_instances = self._validate_and_get_read_instances(read_instances_uri) + batch_read_feature_values_request = self._validate_and_get_batch_read_feature_values_request( + featurestore_name=self.resource_name, serving_feature_ids=serving_feature_ids, destination=gca_io.BigQueryDestination( output_uri=bq_destination_output_uri ), feature_destination_fields=feature_destination_fields, - read_instances=read_instances - if read_instances is None - else self._get_read_instances(read_instances), + read_instances=read_instances, pass_through_fields=pass_through_fields, ) @@ -874,9 +879,9 @@ def batch_serve_to_gcs( gcs_destination_output_uri_prefix: str, gcs_destination_type: str, serving_feature_ids: Dict[str, List[str]], - feature_destination_fields: Optional[Dict[str, str]] = None, - read_instances: Optional[Union[str, List[str]]] = None, + read_instances_uri: str, pass_through_fields: Optional[List[str]] = None, + feature_destination_fields: Optional[Dict[str, str]] = None, request_metadata: Optional[Sequence[Tuple[str, str]]] = (), sync: bool = True, ) -> "Featurestore": @@ -923,29 +928,16 @@ def batch_serve_to_gcs( 'my_entity_type_id_2': ['feature_id_2_1', 'feature_id_2_2'], } - feature_destination_fields (Dict[str, str]): - Optional. A user defined dictionary to map a feature's fully qualified resource name to - its destination field name. If the destination field name is not defined, - the feature ID will be used as its destination field name. - - Example: - feature_destination_fields = { - 'projects/123/locations/us-central1/featurestores/fs_id/entityTypes/et_id1/features/f_id11': 'foo', - 'projects/123/locations/us-central1/featurestores/fs_id/entityTypes/et_id2/features/f_id22': 'bar', - } + read_instances_uri (str): + Required. Read_instances_uri can be either BigQuery URI to an input table, + or Google Cloud Storage URI to a csv file. - read_instances (Union[str, List[str]]): - Optional. 
Read_instances can be either BigQuery URI to the input table, - or Google Cloud Storage URI(-s) to the - csv file(s). May contain wildcards. For more - information on wildcards, see - https://cloud.google.com/storage/docs/gsutil/addlhelp/WildcardNames. Example: 'bq://project.dataset.table_name' or - ["gs://my_bucket/my_file_1.csv", "gs://my_bucket/my_file_2.csv"] + "gs://my_bucket/my_file.csv" - Each read instance consists of exactly one read timestamp + Each read instance should consist of exactly one read timestamp and one or more entity IDs identifying entities of the corresponding EntityTypes whose Features are requested. @@ -974,6 +966,17 @@ def batch_serve_to_gcs( will be automatically inferred. For CSV source, the pass-through values will be passed as opaque bytes. + feature_destination_fields (Dict[str, str]): + Optional. A user defined dictionary to map a feature's fully qualified resource name to + its destination field name. If the destination field name is not defined, + the feature ID will be used as its destination field name. + + Example: + feature_destination_fields = { + 'projects/123/locations/us-central1/featurestores/fs_id/entityTypes/et_id1/features/f_id11': 'foo', + 'projects/123/locations/us-central1/featurestores/fs_id/entityTypes/et_id2/features/f_id22': 'bar', + } + Returns: Featurestore: The featurestore resource object batch read feature values from. 
@@ -999,13 +1002,14 @@ def batch_serve_to_gcs( if gcs_destination_type == "tfrecord": destination = gca_io.TFRecordDestination(gcs_destination=gcs_destination) + read_instances = self._validate_and_get_read_instances(read_instances_uri) + batch_read_feature_values_request = self._validate_and_get_batch_read_feature_values_request( + featurestore_name=self.resource_name, serving_feature_ids=serving_feature_ids, destination=destination, feature_destination_fields=feature_destination_fields, - read_instances=read_instances - if read_instances is None - else self._get_read_instances(read_instances), + read_instances=read_instances, pass_through_fields=pass_through_fields, ) @@ -1013,3 +1017,193 @@ def batch_serve_to_gcs( batch_read_feature_values_request=batch_read_feature_values_request, request_metadata=request_metadata, ) + + def batch_serve_to_df( + self, + serving_feature_ids: Dict[str, List[str]], + read_instances_df: "pd.DataFrame", # noqa: F821 - skip check for undefined name 'pd' + pass_through_fields: Optional[List[str]] = None, + feature_destination_fields: Optional[Dict[str, str]] = None, + request_metadata: Optional[Sequence[Tuple[str, str]]] = (), + ) -> "pd.DataFrame": # noqa: F821 - skip check for undefined name 'pd' + """ Batch serves feature values to pandas DataFrame + + Note: + Calling this method will automatically create and delete a temporary + bigquery dataset in the same GCP project, which will be used + as the intermediary storage for batch serve feature values + from featurestore to dataframe. + + Args: + serving_feature_ids (Dict[str, List[str]]): + Required. A user defined dictionary to define the entity_types and their features for batch serve/read. + The keys of the dictionary are the serving entity_type ids and + the values are lists of serving feature ids in each entity_type. 
+ + Example: + serving_feature_ids = { + 'my_entity_type_id_1': ['feature_id_1_1', 'feature_id_1_2'], + 'my_entity_type_id_2': ['feature_id_2_1', 'feature_id_2_2'], + } + + read_instances_df (pd.DataFrame): + Required. Read_instances_df is a pandas DataFrame containing the read instances. + + Each read instance should consist of exactly one read timestamp + and one or more entity IDs identifying entities of the + corresponding EntityTypes whose Features are requested. + + Each output instance contains Feature values of requested + entities concatenated together as of the read time. + + An example read_instances_df may be + pd.DataFrame( + data=[ + { + "my_entity_type_id_1": "my_entity_type_id_1_entity_1", + "my_entity_type_id_2": "my_entity_type_id_2_entity_1", + "timestamp": "2020-01-01T10:00:00.123Z" + ], + ) + + An example batch_serve_output_df may be + pd.DataFrame( + data=[ + { + "my_entity_type_id_1": "my_entity_type_id_1_entity_1", + "my_entity_type_id_2": "my_entity_type_id_2_entity_1", + "foo": "feature_id_1_1_feature_value", + "feature_id_1_2": "feature_id_1_2_feature_value", + "feature_id_2_1": "feature_id_2_1_feature_value", + "bar": "feature_id_2_2_feature_value", + "timestamp": "2020-01-01T10:00:00.123Z" + ], + ) + + Timestamp in each read instance must be millisecond-aligned. + + The columns can be in any order. + + Values in the timestamp column must use the RFC 3339 format, + e.g. ``2012-07-30T10:43:17.123Z``. + + pass_through_fields (List[str]): + Optional. When not empty, the specified fields in the + read_instances source will be joined as-is in the output, + in addition to those fields from the Featurestore Entity. + + For BigQuery source, the type of the pass-through values + will be automatically inferred. For CSV source, the + pass-through values will be passed as opaque bytes. + + feature_destination_fields (Dict[str, str]): + Optional. 
A user defined dictionary to map a feature's fully qualified resource name to + its destination field name. If the destination field name is not defined, + the feature ID will be used as its destination field name. + + Example: + feature_destination_fields = { + 'projects/123/locations/us-central1/featurestores/fs_id/entityTypes/et_id1/features/f_id11': 'foo', + 'projects/123/locations/us-central1/featurestores/fs_id/entityTypes/et_id2/features/f_id22': 'bar', + } + + Returns: + pd.DataFrame: The pandas DataFrame containing feature values from batch serving. + + """ + try: + from google.cloud import bigquery_storage + except ImportError: + raise ImportError( + f"Google-Cloud-Bigquery-Storage is not installed. Please install google-cloud-bigquery-storage to use " + f"{self.batch_serve_to_df.__name__}" + ) + + try: + import pyarrow # noqa: F401 - skip check for 'pyarrow' which is required when using 'google.cloud.bigquery' + except ImportError: + raise ImportError( + f"Pyarrow is not installed. Please install pyarrow to use " + f"{self.batch_serve_to_df.__name__}" + ) + + try: + import pandas as pd + except ImportError: + raise ImportError( + f"Pandas is not installed. 
Please install pandas to use " + f"{self.batch_serve_to_df.__name__}" + ) + + bigquery_client = bigquery.Client( + project=self.project, credentials=self.credentials + ) + + self.wait() + featurestore_name_components = self._parse_resource_name(self.resource_name) + featurestore_id = featurestore_name_components["featurestore"] + + temp_bq_dataset_name = f"temp_{featurestore_id}_{uuid.uuid4()}".replace( + "-", "_" + ) + + # TODO(b/216497263): Add support for resource project does not match initializer.global_config.project + temp_bq_dataset_id = f"{initializer.global_config.project}.{temp_bq_dataset_name}"[ + :1024 + ] + temp_bq_dataset = bigquery.Dataset(dataset_ref=temp_bq_dataset_id) + temp_bq_dataset.location = self.location + temp_bq_dataset = bigquery_client.create_dataset(temp_bq_dataset) + + temp_bq_batch_serve_table_name = "batch_serve" + temp_bq_read_instances_table_name = "read_instances" + temp_bq_batch_serve_table_id = ( + f"{temp_bq_dataset_id}.{temp_bq_batch_serve_table_name}" + ) + temp_bq_read_instances_table_id = ( + f"{temp_bq_dataset_id}.{temp_bq_read_instances_table_name}" + ) + + try: + + job = bigquery_client.load_table_from_dataframe( + dataframe=read_instances_df, destination=temp_bq_read_instances_table_id + ) + job.result() + + self.batch_serve_to_bq( + bq_destination_output_uri=f"bq://{temp_bq_batch_serve_table_id}", + serving_feature_ids=serving_feature_ids, + read_instances_uri=f"bq://{temp_bq_read_instances_table_id}", + pass_through_fields=pass_through_fields, + feature_destination_fields=feature_destination_fields, + request_metadata=request_metadata, + ) + + bigquery_storage_read_client = bigquery_storage.BigQueryReadClient( + credentials=self.credentials + ) + read_session_proto = bigquery_storage_read_client.create_read_session( + parent=f"projects/{self.project}", + read_session=bigquery_storage.types.ReadSession( + table="projects/{project}/datasets/{dataset}/tables/{table}".format( + project=self.project, + 
dataset=temp_bq_dataset_name, + table=temp_bq_batch_serve_table_name, + ), + data_format=bigquery_storage.types.DataFormat.ARROW, + ), + ) + + frames = [] + for stream in read_session_proto.streams: + reader = bigquery_storage_read_client.read_rows(stream.name) + for message in reader.rows().pages: + frames.append(message.to_dataframe()) + + finally: + bigquery_client.delete_dataset( + dataset=temp_bq_dataset.dataset_id, delete_contents=True, + ) + + return pd.concat(frames, ignore_index=True) if frames else pd.DataFrame(frames) diff --git a/google/cloud/aiplatform/models.py b/google/cloud/aiplatform/models.py index 1ab2986f077..e4e2947bc63 100644 --- a/google/cloud/aiplatform/models.py +++ b/google/cloud/aiplatform/models.py @@ -2534,7 +2534,7 @@ def export_model( def upload_xgboost_model_file( cls, model_file_path: str, - xgboost_version: str = "1.4", + xgboost_version: Optional[str] = None, display_name: str = "XGBoost model", description: Optional[str] = None, instance_schema_uri: Optional[str] = None, @@ -2674,7 +2674,7 @@ def upload_xgboost_model_file( container_image_uri = aiplatform.helpers.get_prebuilt_prediction_container_uri( region=location, framework="xgboost", - framework_version=xgboost_version, + framework_version=xgboost_version or "1.4", accelerator="cpu", ) @@ -2729,7 +2729,7 @@ def upload_xgboost_model_file( def upload_scikit_learn_model_file( cls, model_file_path: str, - sklearn_version: str = "1.0", + sklearn_version: Optional[str] = None, display_name: str = "Scikit-learn model", description: Optional[str] = None, instance_schema_uri: Optional[str] = None, @@ -2869,7 +2869,7 @@ def upload_scikit_learn_model_file( container_image_uri = aiplatform.helpers.get_prebuilt_prediction_container_uri( region=location, framework="sklearn", - framework_version=sklearn_version, + framework_version=sklearn_version or "1.0", accelerator="cpu", ) @@ -2923,7 +2923,7 @@ def upload_scikit_learn_model_file( def upload_tensorflow_saved_model( cls, 
saved_model_dir: str, - tensorflow_version: str = "2.7", + tensorflow_version: Optional[str] = None, use_gpu: bool = False, display_name: str = "Tensorflow model", description: Optional[str] = None, @@ -3061,7 +3061,7 @@ def upload_tensorflow_saved_model( container_image_uri = aiplatform.helpers.get_prebuilt_prediction_container_uri( region=location, framework="tensorflow", - framework_version=tensorflow_version, + framework_version=tensorflow_version or "2.7", accelerator="gpu" if use_gpu else "cpu", ) diff --git a/google/cloud/aiplatform_v1beta1/services/vizier_service/async_client.py b/google/cloud/aiplatform_v1beta1/services/vizier_service/async_client.py index 4f4ff19fb18..c55267c36f1 100644 --- a/google/cloud/aiplatform_v1beta1/services/vizier_service/async_client.py +++ b/google/cloud/aiplatform_v1beta1/services/vizier_service/async_client.py @@ -255,9 +255,7 @@ async def create_study( Returns: google.cloud.aiplatform_v1beta1.types.Study: - LINT.IfChange A message representing a Study. - """ # Create or coerce a protobuf request object. # Quick check: If we got a request object, we should *not* have @@ -328,9 +326,7 @@ async def get_study( Returns: google.cloud.aiplatform_v1beta1.types.Study: - LINT.IfChange A message representing a Study. - """ # Create or coerce a protobuf request object. # Quick check: If we got a request object, we should *not* have @@ -548,9 +544,7 @@ async def lookup_study( Returns: google.cloud.aiplatform_v1beta1.types.Study: - LINT.IfChange A message representing a Study. - """ # Create or coerce a protobuf request object. 
# Quick check: If we got a request object, we should *not* have diff --git a/google/cloud/aiplatform_v1beta1/services/vizier_service/client.py b/google/cloud/aiplatform_v1beta1/services/vizier_service/client.py index f4b0dc15046..0d8d1b9700e 100644 --- a/google/cloud/aiplatform_v1beta1/services/vizier_service/client.py +++ b/google/cloud/aiplatform_v1beta1/services/vizier_service/client.py @@ -478,9 +478,7 @@ def create_study( Returns: google.cloud.aiplatform_v1beta1.types.Study: - LINT.IfChange A message representing a Study. - """ # Create or coerce a protobuf request object. # Quick check: If we got a request object, we should *not* have @@ -551,9 +549,7 @@ def get_study( Returns: google.cloud.aiplatform_v1beta1.types.Study: - LINT.IfChange A message representing a Study. - """ # Create or coerce a protobuf request object. # Quick check: If we got a request object, we should *not* have @@ -771,9 +767,7 @@ def lookup_study( Returns: google.cloud.aiplatform_v1beta1.types.Study: - LINT.IfChange A message representing a Study. - """ # Create or coerce a protobuf request object. # Quick check: If we got a request object, we should *not* have diff --git a/google/cloud/aiplatform_v1beta1/types/custom_job.py b/google/cloud/aiplatform_v1beta1/types/custom_job.py index 7ec2a92c038..7d7edd9e37e 100644 --- a/google/cloud/aiplatform_v1beta1/types/custom_job.py +++ b/google/cloud/aiplatform_v1beta1/types/custom_job.py @@ -120,6 +120,7 @@ class CustomJob(proto.Message): class CustomJobSpec(proto.Message): r"""Represents the spec of a CustomJob. 
+ Next Id: 14 Attributes: worker_pool_specs (Sequence[google.cloud.aiplatform_v1beta1.types.WorkerPoolSpec]): diff --git a/google/cloud/aiplatform_v1beta1/types/featurestore.py b/google/cloud/aiplatform_v1beta1/types/featurestore.py index 5d19c1dee71..f28059c6e9b 100644 --- a/google/cloud/aiplatform_v1beta1/types/featurestore.py +++ b/google/cloud/aiplatform_v1beta1/types/featurestore.py @@ -81,13 +81,39 @@ class OnlineServingConfig(proto.Message): Attributes: fixed_node_count (int): - The number of nodes for each cluster. The - number of nodes will not scale automatically but - can be scaled manually by providing different - values when updating. + The number of nodes for each cluster. The number of nodes + will not scale automatically but can be scaled manually by + providing different values when updating. Only one of + ``fixed_node_count`` and ``scaling`` can be set. Setting one + will reset the other. + scaling (google.cloud.aiplatform_v1beta1.types.Featurestore.OnlineServingConfig.Scaling): + Online serving scaling configuration. Only one of + ``fixed_node_count`` and ``scaling`` can be set. Setting one + will reset the other. """ + class Scaling(proto.Message): + r"""Online serving scaling configuration. If min_node_count and + max_node_count are set to the same value, the cluster will be + configured with the fixed number of node (no auto-scaling). + + Attributes: + min_node_count (int): + Required. The minimum number of nodes to + scale down to. Must be greater than or equal to + 1. + max_node_count (int): + The maximum number of nodes to scale up to. Must be greater + or equal to min_node_count. 
+ """ + + min_node_count = proto.Field(proto.INT32, number=1,) + max_node_count = proto.Field(proto.INT32, number=2,) + fixed_node_count = proto.Field(proto.INT32, number=2,) + scaling = proto.Field( + proto.MESSAGE, number=4, message="Featurestore.OnlineServingConfig.Scaling", + ) name = proto.Field(proto.STRING, number=1,) create_time = proto.Field(proto.MESSAGE, number=3, message=timestamp_pb2.Timestamp,) diff --git a/google/cloud/aiplatform_v1beta1/types/index_endpoint.py b/google/cloud/aiplatform_v1beta1/types/index_endpoint.py index c4239573754..4bb17029db9 100644 --- a/google/cloud/aiplatform_v1beta1/types/index_endpoint.py +++ b/google/cloud/aiplatform_v1beta1/types/index_endpoint.py @@ -171,6 +171,15 @@ class DeployedIndex(proto.Message): don't provide SLA when min_replica_count=1). If max_replica_count is not set, the default value is min_replica_count. The max allowed replica count is 1000. + dedicated_resources (google.cloud.aiplatform_v1beta1.types.DedicatedResources): + Optional. A description of resources that are dedicated to + the DeployedIndex, and that need a higher degree of manual + configuration. If min_replica_count is not set, the default + value is 2 (we don't provide SLA when min_replica_count=1). + If max_replica_count is not set, the default value is + min_replica_count. The max allowed replica count is 1000. + + Available machine types: n1-standard-16 n1-standard-32 enable_access_logging (bool): Optional. If true, private endpoint's access logs are sent to StackDriver Logging. 
@@ -227,6 +236,9 @@ class DeployedIndex(proto.Message): automatic_resources = proto.Field( proto.MESSAGE, number=7, message=machine_resources.AutomaticResources, ) + dedicated_resources = proto.Field( + proto.MESSAGE, number=16, message=machine_resources.DedicatedResources, + ) enable_access_logging = proto.Field(proto.BOOL, number=8,) deployed_index_auth_config = proto.Field( proto.MESSAGE, number=9, message="DeployedIndexAuthConfig", diff --git a/google/cloud/aiplatform_v1beta1/types/model.py b/google/cloud/aiplatform_v1beta1/types/model.py index 4120486a5e2..f40e74a3b28 100644 --- a/google/cloud/aiplatform_v1beta1/types/model.py +++ b/google/cloud/aiplatform_v1beta1/types/model.py @@ -397,7 +397,7 @@ class ModelContainerSpec(proto.Message): r"""Specification of a container for serving predictions. Some fields in this message correspond to fields in the `Kubernetes Container v1 core - specification `__. + specification `__. Attributes: image_uri (str): @@ -463,7 +463,7 @@ class ModelContainerSpec(proto.Message): this syntax with ``$$``; for example: $$(VARIABLE_NAME) This field corresponds to the ``command`` field of the Kubernetes Containers `v1 core - API `__. + API `__. args (Sequence[str]): Immutable. Specifies arguments for the command that runs when the container starts. This overrides the container's @@ -502,7 +502,7 @@ class ModelContainerSpec(proto.Message): this syntax with ``$$``; for example: $$(VARIABLE_NAME) This field corresponds to the ``args`` field of the Kubernetes Containers `v1 core - API `__. + API `__. env (Sequence[google.cloud.aiplatform_v1beta1.types.EnvVar]): Immutable. List of environment variables to set in the container. After the container starts running, code running @@ -535,7 +535,7 @@ class ModelContainerSpec(proto.Message): This field corresponds to the ``env`` field of the Kubernetes Containers `v1 core - API `__. + API `__. ports (Sequence[google.cloud.aiplatform_v1beta1.types.Port]): Immutable. 
List of ports to expose from the container. Vertex AI sends any prediction requests that it receives to @@ -558,7 +558,7 @@ class ModelContainerSpec(proto.Message): Vertex AI does not use ports other than the first one listed. This field corresponds to the ``ports`` field of the Kubernetes Containers `v1 core - API `__. + API `__. predict_route (str): Immutable. HTTP path on the container to send prediction requests to. Vertex AI forwards requests sent using diff --git a/google/cloud/aiplatform_v1beta1/types/model_deployment_monitoring_job.py b/google/cloud/aiplatform_v1beta1/types/model_deployment_monitoring_job.py index 146fba7fd63..7fa47a9c66d 100644 --- a/google/cloud/aiplatform_v1beta1/types/model_deployment_monitoring_job.py +++ b/google/cloud/aiplatform_v1beta1/types/model_deployment_monitoring_job.py @@ -284,9 +284,10 @@ class ModelDeploymentMonitoringScheduleConfig(proto.Message): Attributes: monitor_interval (google.protobuf.duration_pb2.Duration): - Required. The model monitoring job running + Required. The model monitoring job scheduling interval. It will be rounded up to next full - hour. + hour. This defines how often the monitoring jobs + are triggered. """ monitor_interval = proto.Field( diff --git a/google/cloud/aiplatform_v1beta1/types/study.py b/google/cloud/aiplatform_v1beta1/types/study.py index 77032803f90..beccba62e96 100644 --- a/google/cloud/aiplatform_v1beta1/types/study.py +++ b/google/cloud/aiplatform_v1beta1/types/study.py @@ -27,8 +27,7 @@ class Study(proto.Message): - r"""LINT.IfChange - A message representing a Study. + r"""A message representing a Study. 
Attributes: name (str): diff --git a/samples/snippets/pipeline_service/create_training_pipeline_tabular_forecasting_sample.py b/samples/snippets/pipeline_service/create_training_pipeline_tabular_forecasting_sample.py index 7d69cc12652..aceb81196fc 100644 --- a/samples/snippets/pipeline_service/create_training_pipeline_tabular_forecasting_sample.py +++ b/samples/snippets/pipeline_service/create_training_pipeline_tabular_forecasting_sample.py @@ -26,10 +26,10 @@ def create_training_pipeline_tabular_forecasting_sample( target_column: str, time_series_identifier_column: str, time_column: str, - static_columns: str, - time_variant_past_only_columns: str, - time_variant_past_and_future_columns: str, - forecast_window_end: int, + time_series_attribute_columns: str, + unavailable_at_forecast: str, + available_at_forecast: str, + forecast_horizon: int, location: str = "us-central1", api_endpoint: str = "us-central1-aiplatform.googleapis.com", ): @@ -47,7 +47,7 @@ def create_training_pipeline_tabular_forecasting_sample( {"auto": {"column_name": "deaths"}}, ] - period = {"unit": "day", "quantity": 1} + data_granularity = {"unit": "day", "quantity": 1} # the inputs should be formatted according to the training_task_definition yaml file training_task_inputs_dict = { @@ -56,13 +56,13 @@ def create_training_pipeline_tabular_forecasting_sample( "timeSeriesIdentifierColumn": time_series_identifier_column, "timeColumn": time_column, "transformations": transformations, - "period": period, + "dataGranularity": data_granularity, "optimizationObjective": "minimize-rmse", "trainBudgetMilliNodeHours": 8000, - "staticColumns": static_columns, - "timeVariantPastOnlyColumns": time_variant_past_only_columns, - "timeVariantPastAndFutureColumns": time_variant_past_and_future_columns, - "forecastWindowEnd": forecast_window_end, + "timeSeriesAttributeColumns": time_series_attribute_columns, + "unavailableAtForecast": unavailable_at_forecast, + "availableAtForecast": available_at_forecast, + 
"forecastHorizon": forecast_horizon, } training_task_inputs = json_format.ParseDict(training_task_inputs_dict, Value()) diff --git a/samples/snippets/pipeline_service/create_training_pipeline_tabular_forecasting_sample_test.py b/samples/snippets/pipeline_service/create_training_pipeline_tabular_forecasting_sample_test.py index d58b68f8fee..fde61c4e43c 100644 --- a/samples/snippets/pipeline_service/create_training_pipeline_tabular_forecasting_sample_test.py +++ b/samples/snippets/pipeline_service/create_training_pipeline_tabular_forecasting_sample_test.py @@ -77,10 +77,10 @@ def test_ucaip_generated_create_training_pipeline_sample(capsys, shared_state): target_column=TARGET_COLUMN, time_series_identifier_column="county", time_column="date", - static_columns=["state_name"], - time_variant_past_only_columns=["deaths"], - time_variant_past_and_future_columns=["date"], - forecast_window_end=10, + time_series_attribute_columns=["state_name"], + unavailable_at_forecast=["deaths"], + available_at_forecast=["date"], + forecast_horizon=10, ) out, _ = capsys.readouterr() diff --git a/setup.py b/setup.py index 011da8aea8e..8c1872c300f 100644 --- a/setup.py +++ b/setup.py @@ -47,7 +47,11 @@ "werkzeug >= 2.0.0", "tensorflow >=2.4.0", ] -featurestore_extra_require = ["pandas >= 1.0.0", "pyarrow >= 6.0.1"] +featurestore_extra_require = [ + "google-cloud-bigquery-storage", + "pandas >= 1.0.0", + "pyarrow >= 6.0.1", +] full_extra_require = list( set( diff --git a/tests/system/aiplatform/test_featurestore.py b/tests/system/aiplatform/test_featurestore.py index 03070eee3c9..cbbfd82efb2 100644 --- a/tests/system/aiplatform/test_featurestore.py +++ b/tests/system/aiplatform/test_featurestore.py @@ -411,6 +411,82 @@ def test_search_features(self, shared_state): len(list_searched_features) - shared_state["base_list_searched_features"] ) == 6 + def test_batch_serve_to_df(self, shared_state, caplog): + + assert shared_state["featurestore"] + assert 
shared_state["user_age_feature_resource_name"] + assert shared_state["user_gender_feature_resource_name"] + assert shared_state["user_liked_genres_feature_resource_name"] + + featurestore = shared_state["featurestore"] + + user_age_feature_resource_name = shared_state["user_age_feature_resource_name"] + user_gender_feature_resource_name = shared_state[ + "user_gender_feature_resource_name" + ] + user_liked_genres_feature_resource_name = shared_state[ + "user_liked_genres_feature_resource_name" + ] + + aiplatform.init( + project=e2e_base._PROJECT, location=e2e_base._LOCATION, + ) + + caplog.set_level(logging.INFO) + + read_instances_df = pd.DataFrame( + data=[ + ["alice", "movie_01", "2021-09-15T08:28:14Z"], + ["bob", "movie_02", "2021-09-15T08:28:14Z"], + ["dav", "movie_03", "2021-09-15T08:28:14Z"], + ["eve", "movie_04", "2021-09-15T08:28:14Z"], + ["alice", "movie_03", "2021-09-14T09:35:15Z"], + ["bob", "movie_04", "2020-02-14T09:35:15Z"], + ], + columns=["users", "movies", "timestamp"], + ) + read_instances_df = read_instances_df.astype({"timestamp": "datetime64"}) + + df = featurestore.batch_serve_to_df( + serving_feature_ids={ + _TEST_USER_ENTITY_TYPE_ID: [ + _TEST_USER_AGE_FEATURE_ID, + _TEST_USER_GENDER_FEATURE_ID, + _TEST_USER_LIKED_GENRES_FEATURE_ID, + ], + _TEST_MOVIE_ENTITY_TYPE_ID: [ + _TEST_MOVIE_TITLE_FEATURE_ID, + _TEST_MOVIE_GENRES_FEATURE_ID, + _TEST_MOVIE_AVERAGE_RATING_FEATURE_ID, + ], + }, + read_instances_df=read_instances_df, + feature_destination_fields={ + user_age_feature_resource_name: "user_age_dest", + user_gender_feature_resource_name: "user_gender_dest", + user_liked_genres_feature_resource_name: "user_liked_genres_dest", + }, + ) + + expected_df_columns = [ + "timestamp", + "entity_type_users", + "user_age_dest", + "user_gender_dest", + "user_liked_genres_dest", + "entity_type_movies", + "title", + "genres", + "average_rating", + ] + + assert type(df) == pd.DataFrame + assert list(df.columns) == expected_df_columns + assert df.size == 
54 + assert "Featurestore feature values served." in caplog.text + + caplog.clear() + def test_batch_serve_to_gcs(self, shared_state, caplog): assert shared_state["featurestore"] @@ -448,12 +524,12 @@ def test_batch_serve_to_gcs(self, shared_state, caplog): _TEST_MOVIE_AVERAGE_RATING_FEATURE_ID, ], }, + read_instances_uri=_TEST_READ_INSTANCE_SRC, feature_destination_fields={ user_age_feature_resource_name: "user_age_dest", user_gender_feature_resource_name: "user_gender_dest", user_liked_genres_feature_resource_name: "user_liked_genres_dest", }, - read_instances=_TEST_READ_INSTANCE_SRC, gcs_destination_output_uri_prefix=f"gs://{bucket_name}/featurestore_test/tfrecord", gcs_destination_type="tfrecord", ) @@ -498,12 +574,12 @@ def test_batch_serve_to_bq(self, shared_state, caplog): _TEST_MOVIE_AVERAGE_RATING_FEATURE_ID, ], }, + read_instances_uri=_TEST_READ_INSTANCE_SRC, feature_destination_fields={ user_age_feature_resource_name: "user_age_dest", user_gender_feature_resource_name: "user_gender_dest", user_liked_genres_feature_resource_name: "user_liked_genres_dest", }, - read_instances=_TEST_READ_INSTANCE_SRC, bq_destination_output_uri=f"bq://{bigquery_dataset_id}.test_table", ) @@ -525,3 +601,9 @@ def test_online_reads(self, shared_state): feature_ids=[_TEST_MOVIE_TITLE_FEATURE_ID, _TEST_MOVIE_GENRES_FEATURE_ID], ) assert type(movie_entity_views) == pd.DataFrame + + movie_entity_views = movie_entity_type.read( + entity_ids="movie_01", + feature_ids=[_TEST_MOVIE_TITLE_FEATURE_ID, _TEST_MOVIE_GENRES_FEATURE_ID], + ) + assert type(movie_entity_views) == pd.DataFrame diff --git a/tests/unit/aiplatform/test_explain_lit.py b/tests/unit/aiplatform/test_explain_lit.py index 8f10193c7b1..c8092b17424 100644 --- a/tests/unit/aiplatform/test_explain_lit.py +++ b/tests/unit/aiplatform/test_explain_lit.py @@ -21,16 +21,104 @@ import pytest import tensorflow as tf +from google.auth import credentials as auth_credentials +from google.cloud import aiplatform +from 
google.cloud.aiplatform import initializer +from google.cloud.aiplatform.compat.types import ( + endpoint as gca_endpoint, + prediction_service as gca_prediction_service, + explanation as gca_explanation, +) from google.cloud.aiplatform.explain.lit import ( create_lit_dataset, create_lit_model, + create_lit_model_from_endpoint, open_lit, set_up_and_open_lit, ) +from google.cloud.aiplatform_v1.services.endpoint_service import ( + client as endpoint_service_client, +) +from google.cloud.aiplatform_v1.services.prediction_service import ( + client as prediction_service_client, +) +from importlib import reload from lit_nlp.api import types as lit_types from lit_nlp import notebook from unittest import mock +_TEST_PROJECT = "test-project" +_TEST_LOCATION = "us-central1" +_TEST_ID = "1028944691210842416" +_TEST_ID_2 = "4366591682456584192" +_TEST_ID_3 = "5820582938582924817" +_TEST_DISPLAY_NAME = "test-display-name" +_TEST_DISPLAY_NAME_2 = "test-display-name-2" +_TEST_DISPLAY_NAME_3 = "test-display-name-3" +_TEST_ENDPOINT_NAME = ( + f"projects/{_TEST_PROJECT}/locations/{_TEST_LOCATION}/endpoints/{_TEST_ID}" +) +_TEST_CREDENTIALS = mock.Mock(spec=auth_credentials.AnonymousCredentials()) +_TEST_EXPLANATION_METADATA = aiplatform.explain.ExplanationMetadata( + inputs={ + "features": { + "input_tensor_name": "dense_input", + "encoding": "BAG_OF_FEATURES", + "modality": "numeric", + "index_feature_mapping": ["abc", "def", "ghj"], + } + }, + outputs={"medv": {"output_tensor_name": "dense_2"}}, +) +_TEST_EXPLANATION_PARAMETERS = aiplatform.explain.ExplanationParameters( + {"sampled_shapley_attribution": {"path_count": 10}} +) +_TEST_DEPLOYED_MODELS = [ + gca_endpoint.DeployedModel(id=_TEST_ID, display_name=_TEST_DISPLAY_NAME), + gca_endpoint.DeployedModel(id=_TEST_ID_2, display_name=_TEST_DISPLAY_NAME_2), + gca_endpoint.DeployedModel(id=_TEST_ID_3, display_name=_TEST_DISPLAY_NAME_3), +] +_TEST_DEPLOYED_MODELS_WITH_EXPLANATION = [ + gca_endpoint.DeployedModel( + id=_TEST_ID, + 
display_name=_TEST_DISPLAY_NAME, + explanation_spec=gca_explanation.ExplanationSpec( + metadata=_TEST_EXPLANATION_METADATA, + parameters=_TEST_EXPLANATION_PARAMETERS, + ), + ), + gca_endpoint.DeployedModel( + id=_TEST_ID_2, + display_name=_TEST_DISPLAY_NAME_2, + explanation_spec=gca_explanation.ExplanationSpec( + metadata=_TEST_EXPLANATION_METADATA, + parameters=_TEST_EXPLANATION_PARAMETERS, + ), + ), + gca_endpoint.DeployedModel( + id=_TEST_ID_3, + display_name=_TEST_DISPLAY_NAME_3, + explanation_spec=gca_explanation.ExplanationSpec( + metadata=_TEST_EXPLANATION_METADATA, + parameters=_TEST_EXPLANATION_PARAMETERS, + ), + ), +] +_TEST_TRAFFIC_SPLIT = {_TEST_ID: 0, _TEST_ID_2: 100, _TEST_ID_3: 0} +_TEST_PREDICTION = [{"label": 1.0}] +_TEST_EXPLANATIONS = [gca_prediction_service.explanation.Explanation(attributions=[])] +_TEST_ATTRIBUTIONS = [ + gca_prediction_service.explanation.Attribution( + baseline_output_value=1.0, + instance_output_value=2.0, + feature_attributions={"feature_1": 3.0, "feature_2": 2.0}, + output_index=[1, 2, 3], + output_display_name="abc", + approximation_error=6.0, + output_name="xyz", + ) +] + @pytest.fixture def widget_render_mock(): @@ -57,16 +145,25 @@ def load_model_from_local_path_mock(): "feature_1": 0.01, "feature_2": 0.1, } - model_mock.explain.return_value = [ - explanation_mock - # , explanation_mock - ] + model_mock.explain.return_value = [explanation_mock] explainer_mock.return_value = model_mock yield explainer_mock @pytest.fixture -def set_up_sequential(tmpdir): +def feature_types(): + yield collections.OrderedDict( + [("feature_1", lit_types.Scalar()), ("feature_2", lit_types.Scalar())] + ) + + +@pytest.fixture +def label_types(): + yield collections.OrderedDict([("label", lit_types.RegressionScore())]) + + +@pytest.fixture +def set_up_sequential(tmpdir, feature_types, label_types): # Set up a sequential model seq_model = tf.keras.models.Sequential() seq_model.add(tf.keras.layers.Dense(32, activation="relu", input_shape=(2,))) 
@@ -74,10 +171,6 @@ def set_up_sequential(tmpdir): seq_model.add(tf.keras.layers.Dense(1, activation="sigmoid")) saved_model_path = str(tmpdir.mkdir("tmp")) tf.saved_model.save(seq_model, saved_model_path) - feature_types = collections.OrderedDict( - [("feature_1", lit_types.Scalar()), ("feature_2", lit_types.Scalar())] - ) - label_types = collections.OrderedDict([("label", lit_types.RegressionScore())]) yield feature_types, label_types, saved_model_path @@ -96,130 +189,302 @@ def set_up_pandas_dataframe_and_columns(): yield dataframe, columns -def test_create_lit_dataset_from_pandas_returns_dataset( - set_up_pandas_dataframe_and_columns, -): - pd_dataset, lit_columns = set_up_pandas_dataframe_and_columns - lit_dataset = create_lit_dataset(pd_dataset, lit_columns) - expected_examples = [ - {"feature_1": 1.0, "feature_2": 3.0, "label": 1.0}, - ] +@pytest.fixture +def get_endpoint_with_models_mock(): + with mock.patch.object( + endpoint_service_client.EndpointServiceClient, "get_endpoint" + ) as get_endpoint_mock: + get_endpoint_mock.return_value = gca_endpoint.Endpoint( + display_name=_TEST_DISPLAY_NAME, + name=_TEST_ENDPOINT_NAME, + deployed_models=_TEST_DEPLOYED_MODELS, + traffic_split=_TEST_TRAFFIC_SPLIT, + ) + yield get_endpoint_mock - assert lit_dataset.spec() == dict(lit_columns) - assert expected_examples == lit_dataset._examples +@pytest.fixture +def get_endpoint_with_models_with_explanation_mock(): + with mock.patch.object( + endpoint_service_client.EndpointServiceClient, "get_endpoint" + ) as get_endpoint_mock: + get_endpoint_mock.return_value = gca_endpoint.Endpoint( + display_name=_TEST_DISPLAY_NAME, + name=_TEST_ENDPOINT_NAME, + deployed_models=_TEST_DEPLOYED_MODELS_WITH_EXPLANATION, + traffic_split=_TEST_TRAFFIC_SPLIT, + ) + yield get_endpoint_mock -def test_create_lit_model_from_tensorflow_returns_model(set_up_sequential): - feature_types, label_types, saved_model_path = set_up_sequential - lit_model = create_lit_model(saved_model_path, feature_types, 
label_types) - test_inputs = [ - {"feature_1": 1.0, "feature_2": 2.0}, - ] - outputs = lit_model.predict_minibatch(test_inputs) - assert lit_model.input_spec() == dict(feature_types) - assert lit_model.output_spec() == dict(label_types) - assert len(outputs) == 1 - for item in outputs: - assert item.keys() == {"label"} - assert len(item.values()) == 1 +@pytest.fixture +def predict_client_predict_mock(): + with mock.patch.object( + prediction_service_client.PredictionServiceClient, "predict" + ) as predict_mock: + predict_mock.return_value = gca_prediction_service.PredictResponse( + deployed_model_id=_TEST_ID + ) + predict_mock.return_value.predictions.extend(_TEST_PREDICTION) + yield predict_mock -@mock.patch.dict(os.environ, {"LIT_PROXY_URL": "auto"}) -@pytest.mark.usefixtures( - "sampled_shapley_explainer_mock", "load_model_from_local_path_mock" -) -def test_create_lit_model_from_tensorflow_with_xai_returns_model(set_up_sequential): - feature_types, label_types, saved_model_path = set_up_sequential - lit_model = create_lit_model(saved_model_path, feature_types, label_types) - test_inputs = [ - {"feature_1": 1.0, "feature_2": 2.0}, - ] - outputs = lit_model.predict_minibatch(test_inputs) - - assert lit_model.input_spec() == dict(feature_types) - assert lit_model.output_spec() == dict( - {**label_types, "feature_attribution": lit_types.FeatureSalience(signed=True)} - ) - assert len(outputs) == 1 - for item in outputs: - assert item.keys() == {"label", "feature_attribution"} - assert len(item.values()) == 2 - - -def test_open_lit( - set_up_sequential, set_up_pandas_dataframe_and_columns, widget_render_mock -): - pd_dataset, lit_columns = set_up_pandas_dataframe_and_columns - lit_dataset = create_lit_dataset(pd_dataset, lit_columns) - feature_types, label_types, saved_model_path = set_up_sequential - lit_model = create_lit_model(saved_model_path, feature_types, label_types) - - open_lit({"model": lit_model}, {"dataset": lit_dataset}) - 
widget_render_mock.assert_called_once() - - -def test_set_up_and_open_lit( - set_up_sequential, set_up_pandas_dataframe_and_columns, widget_render_mock -): - pd_dataset, lit_columns = set_up_pandas_dataframe_and_columns - feature_types, label_types, saved_model_path = set_up_sequential - lit_dataset, lit_model = set_up_and_open_lit( - pd_dataset, lit_columns, saved_model_path, feature_types, label_types - ) +@pytest.fixture +def predict_client_explain_mock(): + with mock.patch.object( + prediction_service_client.PredictionServiceClient, "explain" + ) as predict_mock: + predict_mock.return_value = gca_prediction_service.ExplainResponse( + deployed_model_id=_TEST_ID, + ) + predict_mock.return_value.predictions.extend(_TEST_PREDICTION) + predict_mock.return_value.explanations.extend(_TEST_EXPLANATIONS) + predict_mock.return_value.explanations[0].attributions.extend( + _TEST_ATTRIBUTIONS + ) + yield predict_mock + + +class TestExplainLit: + def setup_method(self): + reload(initializer) + reload(aiplatform) + aiplatform.init( + project=_TEST_PROJECT, + location=_TEST_LOCATION, + credentials=_TEST_CREDENTIALS, + ) + + def teardown_method(self): + initializer.global_pool.shutdown(wait=True) + + def test_create_lit_dataset_from_pandas_returns_dataset( + self, set_up_pandas_dataframe_and_columns, + ): + pd_dataset, lit_columns = set_up_pandas_dataframe_and_columns + lit_dataset = create_lit_dataset(pd_dataset, lit_columns) + expected_examples = [ + {"feature_1": 1.0, "feature_2": 3.0, "label": 1.0}, + ] - expected_examples = [ - {"feature_1": 1.0, "feature_2": 3.0, "label": 1.0}, - ] - test_inputs = [ - {"feature_1": 1.0, "feature_2": 2.0}, - ] - outputs = lit_model.predict_minibatch(test_inputs) + assert lit_dataset.spec() == dict(lit_columns) + assert expected_examples == lit_dataset._examples - assert lit_dataset.spec() == dict(lit_columns) - assert expected_examples == lit_dataset._examples + def test_create_lit_model_from_tensorflow_returns_model(self, 
set_up_sequential): + feature_types, label_types, saved_model_path = set_up_sequential + lit_model = create_lit_model(saved_model_path, feature_types, label_types) + test_inputs = [ + {"feature_1": 1.0, "feature_2": 2.0}, + ] + outputs = lit_model.predict_minibatch(test_inputs) + + assert lit_model.input_spec() == dict(feature_types) + assert lit_model.output_spec() == dict(label_types) + assert len(outputs) == 1 + for item in outputs: + assert item.keys() == {"label"} + assert len(item.values()) == 1 + + @mock.patch.dict(os.environ, {"LIT_PROXY_URL": "auto"}) + @pytest.mark.usefixtures( + "sampled_shapley_explainer_mock", "load_model_from_local_path_mock" + ) + def test_create_lit_model_from_tensorflow_with_xai_returns_model( + self, set_up_sequential + ): + feature_types, label_types, saved_model_path = set_up_sequential + lit_model = create_lit_model(saved_model_path, feature_types, label_types) + test_inputs = [ + {"feature_1": 1.0, "feature_2": 2.0}, + ] + outputs = lit_model.predict_minibatch(test_inputs) + + assert lit_model.input_spec() == dict(feature_types) + assert lit_model.output_spec() == dict( + { + **label_types, + "feature_attribution": lit_types.FeatureSalience(signed=True), + } + ) + assert len(outputs) == 1 + for item in outputs: + assert item.keys() == {"label", "feature_attribution"} + assert len(item.values()) == 2 + + @pytest.mark.usefixtures( + "predict_client_predict_mock", "get_endpoint_with_models_mock" + ) + @pytest.mark.parametrize("model_id", [None, _TEST_ID]) + def test_create_lit_model_from_endpoint_returns_model( + self, feature_types, label_types, model_id + ): + endpoint = aiplatform.Endpoint(_TEST_ENDPOINT_NAME) + lit_model = create_lit_model_from_endpoint( + endpoint, feature_types, label_types, model_id + ) + test_inputs = [ + {"feature_1": 1.0, "feature_2": 2.0}, + ] + outputs = lit_model.predict_minibatch(test_inputs) - assert lit_model.input_spec() == dict(feature_types) - assert lit_model.output_spec() == dict(label_types) 
- assert len(outputs) == 1 - for item in outputs: - assert item.keys() == {"label"} - assert len(item.values()) == 1 + assert lit_model.input_spec() == dict(feature_types) + assert lit_model.output_spec() == dict(label_types) + assert len(outputs) == 1 + for item in outputs: + assert item.keys() == {"label"} + assert len(item.values()) == 1 - widget_render_mock.assert_called_once() + @pytest.mark.usefixtures( + "predict_client_explain_mock", "get_endpoint_with_models_with_explanation_mock" + ) + @pytest.mark.parametrize("model_id", [None, _TEST_ID]) + def test_create_lit_model_from_endpoint_with_xai_returns_model( + self, feature_types, label_types, model_id + ): + endpoint = aiplatform.Endpoint(_TEST_ENDPOINT_NAME) + lit_model = create_lit_model_from_endpoint( + endpoint, feature_types, label_types, model_id + ) + test_inputs = [ + {"feature_1": 1.0, "feature_2": 2.0}, + ] + outputs = lit_model.predict_minibatch(test_inputs) + + assert lit_model.input_spec() == dict(feature_types) + assert lit_model.output_spec() == dict( + { + **label_types, + "feature_attribution": lit_types.FeatureSalience(signed=True), + } + ) + assert len(outputs) == 1 + for item in outputs: + assert item.keys() == {"label", "feature_attribution"} + assert len(item.values()) == 2 + + @pytest.mark.usefixtures( + "predict_client_predict_mock", "get_endpoint_with_models_mock" + ) + @pytest.mark.parametrize("model_id", [None, _TEST_ID]) + def test_create_lit_model_from_endpoint_name_returns_model( + self, feature_types, label_types, model_id + ): + lit_model = create_lit_model_from_endpoint( + _TEST_ENDPOINT_NAME, feature_types, label_types, model_id + ) + test_inputs = [ + {"feature_1": 1.0, "feature_2": 2.0}, + ] + outputs = lit_model.predict_minibatch(test_inputs) + assert lit_model.input_spec() == dict(feature_types) + assert lit_model.output_spec() == dict(label_types) + assert len(outputs) == 1 + for item in outputs: + assert item.keys() == {"label"} + assert len(item.values()) == 1 
-@mock.patch.dict(os.environ, {"LIT_PROXY_URL": "auto"}) -@pytest.mark.usefixtures( - "sampled_shapley_explainer_mock", "load_model_from_local_path_mock" -) -def test_set_up_and_open_lit_with_xai( - set_up_sequential, set_up_pandas_dataframe_and_columns, widget_render_mock -): - pd_dataset, lit_columns = set_up_pandas_dataframe_and_columns - feature_types, label_types, saved_model_path = set_up_sequential - lit_dataset, lit_model = set_up_and_open_lit( - pd_dataset, lit_columns, saved_model_path, feature_types, label_types + @pytest.mark.usefixtures( + "predict_client_explain_mock", "get_endpoint_with_models_with_explanation_mock" ) + @pytest.mark.parametrize("model_id", [None, _TEST_ID]) + def test_create_lit_model_from_endpoint_name_with_xai_returns_model( + self, feature_types, label_types, model_id + ): + lit_model = create_lit_model_from_endpoint( + _TEST_ENDPOINT_NAME, feature_types, label_types, model_id + ) + test_inputs = [ + {"feature_1": 1.0, "feature_2": 2.0}, + ] + outputs = lit_model.predict_minibatch(test_inputs) + + assert lit_model.input_spec() == dict(feature_types) + assert lit_model.output_spec() == dict( + { + **label_types, + "feature_attribution": lit_types.FeatureSalience(signed=True), + } + ) + assert len(outputs) == 1 + for item in outputs: + assert item.keys() == {"label", "feature_attribution"} + assert len(item.values()) == 2 + + def test_open_lit( + self, set_up_sequential, set_up_pandas_dataframe_and_columns, widget_render_mock + ): + pd_dataset, lit_columns = set_up_pandas_dataframe_and_columns + lit_dataset = create_lit_dataset(pd_dataset, lit_columns) + feature_types, label_types, saved_model_path = set_up_sequential + lit_model = create_lit_model(saved_model_path, feature_types, label_types) + + open_lit({"model": lit_model}, {"dataset": lit_dataset}) + widget_render_mock.assert_called_once() + + def test_set_up_and_open_lit( + self, set_up_sequential, set_up_pandas_dataframe_and_columns, widget_render_mock + ): + pd_dataset, 
lit_columns = set_up_pandas_dataframe_and_columns + feature_types, label_types, saved_model_path = set_up_sequential + lit_dataset, lit_model = set_up_and_open_lit( + pd_dataset, lit_columns, saved_model_path, feature_types, label_types + ) + + expected_examples = [ + {"feature_1": 1.0, "feature_2": 3.0, "label": 1.0}, + ] + test_inputs = [ + {"feature_1": 1.0, "feature_2": 2.0}, + ] + outputs = lit_model.predict_minibatch(test_inputs) - expected_examples = [ - {"feature_1": 1.0, "feature_2": 3.0, "label": 1.0}, - ] - test_inputs = [ - {"feature_1": 1.0, "feature_2": 2.0}, - ] - outputs = lit_model.predict_minibatch(test_inputs) + assert lit_dataset.spec() == dict(lit_columns) + assert expected_examples == lit_dataset._examples - assert lit_dataset.spec() == dict(lit_columns) - assert expected_examples == lit_dataset._examples + assert lit_model.input_spec() == dict(feature_types) + assert lit_model.output_spec() == dict(label_types) + assert len(outputs) == 1 + for item in outputs: + assert item.keys() == {"label"} + assert len(item.values()) == 1 - assert lit_model.input_spec() == dict(feature_types) - assert lit_model.output_spec() == dict( - {**label_types, "feature_attribution": lit_types.FeatureSalience(signed=True)} - ) - assert len(outputs) == 1 - for item in outputs: - assert item.keys() == {"label", "feature_attribution"} - assert len(item.values()) == 2 + widget_render_mock.assert_called_once() - widget_render_mock.assert_called_once() + @mock.patch.dict(os.environ, {"LIT_PROXY_URL": "auto"}) + @pytest.mark.usefixtures( + "sampled_shapley_explainer_mock", "load_model_from_local_path_mock" + ) + def test_set_up_and_open_lit_with_xai( + self, set_up_sequential, set_up_pandas_dataframe_and_columns, widget_render_mock + ): + pd_dataset, lit_columns = set_up_pandas_dataframe_and_columns + feature_types, label_types, saved_model_path = set_up_sequential + lit_dataset, lit_model = set_up_and_open_lit( + pd_dataset, lit_columns, saved_model_path, feature_types, 
label_types + ) + + expected_examples = [ + {"feature_1": 1.0, "feature_2": 3.0, "label": 1.0}, + ] + test_inputs = [ + {"feature_1": 1.0, "feature_2": 2.0}, + ] + outputs = lit_model.predict_minibatch(test_inputs) + + assert lit_dataset.spec() == dict(lit_columns) + assert expected_examples == lit_dataset._examples + + assert lit_model.input_spec() == dict(feature_types) + assert lit_model.output_spec() == dict( + { + **label_types, + "feature_attribution": lit_types.FeatureSalience(signed=True), + } + ) + assert len(outputs) == 1 + for item in outputs: + assert item.keys() == {"label", "feature_attribution"} + assert len(item.values()) == 2 + + widget_render_mock.assert_called_once() diff --git a/tests/unit/aiplatform/test_featurestores.py b/tests/unit/aiplatform/test_featurestores.py index 449d0348c31..df7d544d95d 100644 --- a/tests/unit/aiplatform/test_featurestores.py +++ b/tests/unit/aiplatform/test_featurestores.py @@ -53,6 +53,9 @@ ) from google.cloud import bigquery +from google.cloud import bigquery_storage + +from google.cloud.bigquery_storage_v1.types import stream as gcbqs_stream # project _TEST_PROJECT = "test-project" @@ -379,6 +382,30 @@ def bq_delete_dataset_mock(bq_init_client_mock): yield bq_delete_dataset_mock +@pytest.fixture +def bqs_client_mock(): + mock = MagicMock(bigquery_storage.BigQueryReadClient) + yield mock + + +@pytest.fixture +def bqs_init_client_mock(bqs_client_mock): + with patch.object(bigquery_storage, "BigQueryReadClient") as bqs_init_client_mock: + bqs_init_client_mock.return_value = bqs_client_mock + yield bqs_init_client_mock + + +@pytest.fixture +def bqs_create_read_session(bqs_init_client_mock): + with patch.object( + bigquery_storage.BigQueryReadClient, "create_read_session" + ) as bqs_create_read_session: + read_session_proto = gcbqs_stream.ReadSession() + read_session_proto.streams = [gcbqs_stream.ReadStream()] + bqs_create_read_session.return_value = read_session_proto + yield bqs_create_read_session + + # All 
Featurestore Mocks @pytest.fixture def get_featurestore_mock(): @@ -1070,41 +1097,7 @@ def test_validate_and_get_batch_read_feature_values_request( featurestore_name=_TEST_FEATURESTORE_NAME ) expected_batch_read_feature_values_request = gca_featurestore_service.BatchReadFeatureValuesRequest( - featurestore=my_featurestore.resource_name, - destination=gca_featurestore_service.FeatureValueDestination( - bigquery_destination=_TEST_BQ_DESTINATION, - ), - entity_type_specs=expected_entity_type_specs, - ) - assert ( - expected_batch_read_feature_values_request - == my_featurestore._validate_and_get_batch_read_feature_values_request( - serving_feature_ids=serving_feature_ids, - destination=_TEST_BQ_DESTINATION, - feature_destination_fields=feature_destination_fields, - ) - ) - - @pytest.mark.usefixtures("get_featurestore_mock") - def test_validate_and_get_batch_read_feature_values_request_with_read_instances( - self, - ): - aiplatform.init(project=_TEST_PROJECT) - my_featurestore = aiplatform.Featurestore( - featurestore_name=_TEST_FEATURESTORE_NAME - ) - expected_entity_type_specs = [ - _get_entity_type_spec_proto_with_feature_ids( - entity_type_id="my_entity_type_id_1", - feature_ids=["my_feature_id_1_1", "my_feature_id_1_2"], - ), - _get_entity_type_spec_proto_with_feature_ids( - entity_type_id="my_entity_type_id_2", - feature_ids=["my_feature_id_2_1", "my_feature_id_2_2"], - ), - ] - expected_batch_read_feature_values_request = gca_featurestore_service.BatchReadFeatureValuesRequest( - featurestore=my_featurestore.resource_name, + featurestore=_TEST_FEATURESTORE_NAME, destination=gca_featurestore_service.FeatureValueDestination( bigquery_destination=_TEST_BQ_DESTINATION, ), @@ -1114,60 +1107,54 @@ def test_validate_and_get_batch_read_feature_values_request_with_read_instances( assert ( expected_batch_read_feature_values_request == my_featurestore._validate_and_get_batch_read_feature_values_request( - serving_feature_ids=_TEST_SERVING_FEATURE_IDS, + 
featurestore_name=my_featurestore.resource_name, + serving_feature_ids=serving_feature_ids, destination=_TEST_BQ_DESTINATION, read_instances=_TEST_BQ_SOURCE, + feature_destination_fields=feature_destination_fields, ) ) @pytest.mark.usefixtures("get_featurestore_mock") @pytest.mark.parametrize( - "read_instances, expected", + "read_instances_uri, expected_read_instances", [ (_TEST_BQ_SOURCE_URI, _TEST_BQ_SOURCE), - (_TEST_GCS_CSV_SOURCE_URIS, _TEST_CSV_SOURCE), (_TEST_GCS_CSV_SOURCE_URI, _TEST_CSV_SOURCE), ], ) - def test_get_read_instances(self, read_instances, expected): + def test_validate_and_get_read_instances( + self, read_instances_uri, expected_read_instances + ): aiplatform.init(project=_TEST_PROJECT) my_featurestore = aiplatform.Featurestore( featurestore_name=_TEST_FEATURESTORE_NAME ) - assert expected == my_featurestore._get_read_instances( - read_instances=read_instances - ) - - @pytest.mark.usefixtures("get_featurestore_mock") - @pytest.mark.parametrize( - "read_instances", - [[1, 2, 3, 4, 5], 1, (_TEST_GCS_CSV_SOURCE_URI, _TEST_GCS_CSV_SOURCE_URI)], - ) - def test_get_read_instances_with_raise_typeerror(self, read_instances): - aiplatform.init(project=_TEST_PROJECT) - my_featurestore = aiplatform.Featurestore( - featurestore_name=_TEST_FEATURESTORE_NAME + assert ( + expected_read_instances + == my_featurestore._validate_and_get_read_instances( + read_instances_uri=read_instances_uri + ) ) - with pytest.raises(TypeError): - my_featurestore._get_read_instances(read_instances=read_instances) @pytest.mark.usefixtures("get_featurestore_mock") @pytest.mark.parametrize( - "read_instances", + "read_instances_uri", [ "gcs://my_bucket/my_file_1.csv", - "bigquery://my_bucket/my_file_1.csv", + "bigquery://project.dataset.table_name", "my_bucket/my_file_1.csv", - [_TEST_BQ_SOURCE_URI], ], ) - def test_get_read_instances_with_raise_valueerror(self, read_instances): + def test_validate_and_get_read_instances_with_raise(self, read_instances_uri): 
aiplatform.init(project=_TEST_PROJECT) my_featurestore = aiplatform.Featurestore( featurestore_name=_TEST_FEATURESTORE_NAME ) with pytest.raises(ValueError): - my_featurestore._get_read_instances(read_instances=read_instances) + my_featurestore._validate_and_get_read_instances( + read_instances_uri=read_instances_uri + ) @pytest.mark.parametrize("sync", [True, False]) @pytest.mark.usefixtures("get_featurestore_mock") @@ -1194,11 +1181,13 @@ def test_batch_serve_to_bq(self, batch_read_feature_values_mock, sync): bigquery_destination=_TEST_BQ_DESTINATION, ), entity_type_specs=expected_entity_type_specs, + bigquery_read_instances=_TEST_BQ_SOURCE, ) my_featurestore.batch_serve_to_bq( bq_destination_output_uri=_TEST_BQ_DESTINATION_URI, serving_feature_ids=_TEST_SERVING_FEATURE_IDS, + read_instances_uri=_TEST_BQ_SOURCE_URI, sync=sync, ) @@ -1235,12 +1224,14 @@ def test_batch_serve_to_gcs(self, batch_read_feature_values_mock, sync): tfrecord_destination=_TEST_TFRECORD_DESTINATION, ), entity_type_specs=expected_entity_type_specs, + csv_read_instances=_TEST_CSV_SOURCE, ) my_featurestore.batch_serve_to_gcs( gcs_destination_output_uri_prefix=_TEST_GCS_OUTPUT_URI_PREFIX, gcs_destination_type=_TEST_GCS_DESTINATION_TYPE_TFRECORD, serving_feature_ids=_TEST_SERVING_FEATURE_IDS, + read_instances_uri=_TEST_GCS_CSV_SOURCE_URI, sync=sync, ) @@ -1265,8 +1256,75 @@ def test_batch_serve_to_gcs_with_invalid_gcs_destination_type(self): gcs_destination_output_uri_prefix=_TEST_GCS_OUTPUT_URI_PREFIX, gcs_destination_type=_TEST_GCS_DESTINATION_TYPE_INVALID, serving_feature_ids=_TEST_SERVING_FEATURE_IDS, + read_instances_uri=_TEST_GCS_CSV_SOURCE_URI, ) + @pytest.mark.usefixtures( + "get_featurestore_mock", + "bq_init_client_mock", + "bq_init_dataset_mock", + "bq_create_dataset_mock", + "bq_load_table_from_dataframe_mock", + "bq_delete_dataset_mock", + "bqs_init_client_mock", + "bqs_create_read_session", + ) + @patch("uuid.uuid4", uuid_mock) + def test_batch_serve_to_df(self, 
batch_read_feature_values_mock): + aiplatform.init(project=_TEST_PROJECT) + my_featurestore = aiplatform.Featurestore( + featurestore_name=_TEST_FEATURESTORE_NAME + ) + + read_instances_df = pd.DataFrame() + + expected_temp_bq_dataset_name = f"temp_{_TEST_FEATURESTORE_ID}_{uuid.uuid4()}".replace( + "-", "_" + ) + expecte_temp_bq_dataset_id = f"{initializer.global_config.project}.{expected_temp_bq_dataset_name}"[ + :1024 + ] + expected_temp_bq_read_instances_table_id = ( + f"{expecte_temp_bq_dataset_id}.read_instances" + ) + expected_temp_bq_batch_serve_table_id = ( + f"{expecte_temp_bq_dataset_id}.batch_serve" + ) + + expected_entity_type_specs = [ + _get_entity_type_spec_proto_with_feature_ids( + entity_type_id="my_entity_type_id_1", + feature_ids=["my_feature_id_1_1", "my_feature_id_1_2"], + ), + _get_entity_type_spec_proto_with_feature_ids( + entity_type_id="my_entity_type_id_2", + feature_ids=["my_feature_id_2_1", "my_feature_id_2_2"], + ), + ] + + expected_batch_read_feature_values_request = gca_featurestore_service.BatchReadFeatureValuesRequest( + featurestore=my_featurestore.resource_name, + destination=gca_featurestore_service.FeatureValueDestination( + bigquery_destination=gca_io.BigQueryDestination( + output_uri=f"bq://{expected_temp_bq_batch_serve_table_id}" + ), + ), + entity_type_specs=expected_entity_type_specs, + bigquery_read_instances=gca_io.BigQuerySource( + input_uri=f"bq://{expected_temp_bq_read_instances_table_id}" + ), + ) + + my_featurestore.batch_serve_to_df( + serving_feature_ids=_TEST_SERVING_FEATURE_IDS, + read_instances_df=read_instances_df, + ) + + batch_read_feature_values_mock.assert_called_once_with( + request=expected_batch_read_feature_values_request, + metadata=_TEST_REQUEST_METADATA, + ) + class TestEntityType: def setup_method(self): @@ -1503,6 +1561,7 @@ def test_validate_and_get_import_feature_values_request_with_source_fields(self) assert ( true_import_feature_values_request == 
my_entity_type._validate_and_get_import_feature_values_request( + entity_type_name=my_entity_type.resource_name, feature_ids=_TEST_IMPORTING_FEATURE_IDS, feature_time=_TEST_FEATURE_TIME_FIELD, data_source=_TEST_BQ_SOURCE, @@ -1529,6 +1588,7 @@ def test_validate_and_get_import_feature_values_request_without_source_fields(se assert ( true_import_feature_values_request == my_entity_type._validate_and_get_import_feature_values_request( + entity_type_name=my_entity_type.resource_name, feature_ids=_TEST_IMPORTING_FEATURE_IDS, feature_time=_TEST_FEATURE_TIME, data_source=_TEST_CSV_SOURCE, diff --git a/tests/unit/gapic/aiplatform_v1beta1/test_index_endpoint_service.py b/tests/unit/gapic/aiplatform_v1beta1/test_index_endpoint_service.py index b8abbab31d1..c90d804f646 100644 --- a/tests/unit/gapic/aiplatform_v1beta1/test_index_endpoint_service.py +++ b/tests/unit/gapic/aiplatform_v1beta1/test_index_endpoint_service.py @@ -43,6 +43,7 @@ ) from google.cloud.aiplatform_v1beta1.services.index_endpoint_service import pagers from google.cloud.aiplatform_v1beta1.services.index_endpoint_service import transports +from google.cloud.aiplatform_v1beta1.types import accelerator_type from google.cloud.aiplatform_v1beta1.types import index_endpoint from google.cloud.aiplatform_v1beta1.types import index_endpoint as gca_index_endpoint from google.cloud.aiplatform_v1beta1.types import index_endpoint_service