Commit e7c6861

Black formatting
1 parent e228b89 commit e7c6861
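
This commit is a pure reformatting pass. A rough sketch of reproducing the same kind of rewrapping with Black's Python API is shown below; the 100-character line length is inferred from the wrapping visible in the hunks and is an assumption, not a value read from the repository configuration.

import black

# Example input: an over-long signature like the one in the AthenaQuery.run hunk below.
source = (
    "def run(self, query_string: str, output_location: str,"
    " kms_key: str = None, workgroup: str = None) -> str:\n"
    "    pass\n"
)

# format_str applies the same rules the `black` command applies to files on disk.
formatted = black.format_str(source, mode=black.Mode(line_length=100))
print(formatted)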

File tree: 5 files changed (+883, -741 lines)


src/sagemaker/feature_store/feature_group.py

Lines changed: 125 additions & 97 deletions
@@ -99,7 +99,7 @@ class AthenaQuery:
     _result_file_prefix: str = attr.ib(init=False, default=None)
 
     def run(
-        self, query_string: str, output_location: str, kms_key: str = None, workgroup: str = None
+        self, query_string: str, output_location: str, kms_key: str = None, workgroup: str = None
     ) -> str:
         """Execute a SQL query given a query string, output location and kms key.
 
@@ -213,14 +213,14 @@ class IngestionManagerPandas:
 
     @staticmethod
     def _ingest_single_batch(
-        data_frame: DataFrame,
-        feature_group_name: str,
-        feature_definitions: Dict[str, Dict[Any, Any]],
-        client_config: Config,
-        start_index: int,
-        end_index: int,
-        target_stores: Sequence[TargetStoreEnum] = None,
-        profile_name: str = None,
+        data_frame: DataFrame,
+        feature_group_name: str,
+        feature_definitions: Dict[str, Dict[Any, Any]],
+        client_config: Config,
+        start_index: int,
+        end_index: int,
+        target_stores: Sequence[TargetStoreEnum] = None,
+        profile_name: str = None,
     ) -> List[int]:
         """Ingest a single batch of DataFrame rows into FeatureStore.
 
@@ -304,13 +304,13 @@ def wait(self, timeout=None):
 
     @staticmethod
     def _ingest_row(
-        data_frame: DataFrame,
-        row: Iterable[tuple[Any, ...]],
-        feature_group_name: str,
-        feature_definitions: Dict[str, Dict[Any, Any]],
-        sagemaker_fs_runtime_client: Session,
-        failed_rows: List[int],
-        target_stores: Sequence[TargetStoreEnum] = None,
+        data_frame: DataFrame,
+        row: Iterable[tuple[Any, ...]],
+        feature_group_name: str,
+        feature_definitions: Dict[str, Dict[Any, Any]],
+        sagemaker_fs_runtime_client: Session,
+        failed_rows: List[int],
+        target_stores: Sequence[TargetStoreEnum] = None,
     ):
         """Ingest a single Dataframe row into FeatureStore.
 
@@ -333,30 +333,39 @@ def _ingest_row(
             record = [
                 FeatureValue(
                     feature_name=data_frame.columns[index - 1],
-                    value_as_string_list=IngestionManagerPandas._covert_feature_value_to_string_list(row[index]),
-                ) if IngestionManagerPandas._is_feature_collection_type(
-                    feature_name=data_frame.columns[index - 1], feature_definitions=feature_definitions)
-                else FeatureValue(
+                    value_as_string_list=IngestionManagerPandas._covert_feature_value_to_string_list(
+                        row[index]
+                    ),
+                )
+                if IngestionManagerPandas._is_feature_collection_type(
                     feature_name=data_frame.columns[index - 1],
-                    value_as_string=str(row[index]))
+                    feature_definitions=feature_definitions,
+                )
+                else FeatureValue(
+                    feature_name=data_frame.columns[index - 1], value_as_string=str(row[index])
+                )
                 for index in range(1, len(row))
                 if IngestionManagerPandas._feature_value_is_not_none(feature_value=row[index])
             ]
 
             put_record_params = {
-                'FeatureGroupName': feature_group_name,
-                'Record': [value.to_dict() for value in record],
+                "FeatureGroupName": feature_group_name,
+                "Record": [value.to_dict() for value in record],
             }
             if target_stores:
-                put_record_params['TargetStores'] = [target_store.value for target_store in target_stores]
+                put_record_params["TargetStores"] = [
+                    target_store.value for target_store in target_stores
+                ]
 
             sagemaker_fs_runtime_client.put_record(**put_record_params)
         except Exception as e:  # pylint: disable=broad-except
             logger.error("Failed to ingest row %d: %s", row[0], e)
             failed_rows.append(row[0])
 
     @staticmethod
-    def _is_feature_collection_type(feature_name: str, feature_definitions: Dict[str, Dict[Any, Any]]):
+    def _is_feature_collection_type(
+        feature_name: str, feature_definitions: Dict[str, Dict[Any, Any]]
+    ):
         """Check if the feature is a collection type.
 
         Args:
@@ -372,11 +381,11 @@ def _is_feature_collection_type(feature_name: str, feature_definitions: Dict[str
         """
         feature_definition = feature_definitions.get(feature_name)
         if feature_definition is not None:
-            return feature_definition.get('CollectionType') is not None
+            return feature_definition.get("CollectionType") is not None
 
     @staticmethod
     def _feature_value_is_not_none(
-        feature_value: Any,
+        feature_value: Any,
     ):
         """Check if the feature value is not None.
 
@@ -411,14 +420,15 @@ def _covert_feature_value_to_string_list(feature_value: List[Any]):
             List[str]: list of strings.
         """
         if not is_list_like(feature_value):
-            raise ValueError(f"Invalid feature value, feature value: {feature_value} for a collection type feature"
-                             f" must be an Array, but instead was {type(feature_value)}")
-        return [
-            str(value) if value is not None else None
-            for value in feature_value
-        ]
+            raise ValueError(
+                f"Invalid feature value, feature value: {feature_value} for a collection type feature"
+                f" must be an Array, but instead was {type(feature_value)}"
+            )
+        return [str(value) if value is not None else None for value in feature_value]
 
-    def _run_single_process_single_thread(self, data_frame: DataFrame, target_stores: Sequence[TargetStoreEnum] = None):
+    def _run_single_process_single_thread(
+        self, data_frame: DataFrame, target_stores: Sequence[TargetStoreEnum] = None
+    ):
         """Ingest utilizing a single process and a single thread.
 
         Args:
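
The reformatted conversion helper above changes layout only. For reference, a standalone sketch of the same behavior follows; the function here is a local illustration rather than an import from the SDK, and is_list_like is assumed to come from pandas.

from typing import Any, List, Optional

from pandas.api.types import is_list_like


def to_string_list(feature_value: List[Any]) -> List[Optional[str]]:
    # Collection-type feature values must be array-like; anything else is rejected.
    if not is_list_like(feature_value):
        raise ValueError(
            f"Invalid feature value, feature value: {feature_value} for a collection type feature"
            f" must be an Array, but instead was {type(feature_value)}"
        )
    # None elements are kept as None; all other elements are stringified.
    return [str(value) if value is not None else None for value in feature_value]


print(to_string_list([1, None, 2.5]))  # ['1', None, '2.5']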
@@ -448,11 +458,11 @@ def _run_single_process_single_thread(self, data_frame: DataFrame, target_stores
         )
 
     def _run_multi_process(
-        self,
-        data_frame: DataFrame,
-        target_stores: Sequence[TargetStoreEnum] = None,
-        wait=True,
-        timeout=None
+        self,
+        data_frame: DataFrame,
+        target_stores: Sequence[TargetStoreEnum] = None,
+        wait=True,
+        timeout=None,
     ):
         """Start the ingestion process with the specified number of processes.
 
@@ -501,15 +511,15 @@ def init_worker():
 
     @staticmethod
     def _run_multi_threaded(
-        max_workers: int,
-        feature_group_name: str,
-        feature_definitions: Dict[str, Dict[Any, Any]],
-        sagemaker_fs_runtime_client_config: Config,
-        data_frame: DataFrame,
-        target_stores: Sequence[TargetStoreEnum] = None,
-        row_offset=0,
-        timeout=None,
-        profile_name=None,
+        max_workers: int,
+        feature_group_name: str,
+        feature_definitions: Dict[str, Dict[Any, Any]],
+        sagemaker_fs_runtime_client_config: Config,
+        data_frame: DataFrame,
+        target_stores: Sequence[TargetStoreEnum] = None,
+        row_offset=0,
+        timeout=None,
+        profile_name=None,
     ) -> List[int]:
         """Start the ingestion process.
 
@@ -563,7 +573,13 @@ def _run_multi_threaded(
 
         return failed_indices
 
-    def run(self, data_frame: DataFrame, target_stores: Sequence[TargetStoreEnum] = None, wait=True, timeout=None):
+    def run(
+        self,
+        data_frame: DataFrame,
+        target_stores: Sequence[TargetStoreEnum] = None,
+        wait=True,
+        timeout=None,
+    ):
         """Start the ingestion process.
 
         Args:
@@ -575,9 +591,13 @@ def run(self, data_frame: DataFrame, target_stores: Sequence[TargetStoreEnum] =
                 if timeout is reached.
         """
         if self.max_workers == 1 and self.max_processes == 1 and self.profile_name is None:
-            self._run_single_process_single_thread(data_frame=data_frame, target_stores=target_stores)
+            self._run_single_process_single_thread(
+                data_frame=data_frame, target_stores=target_stores
+            )
         else:
-            self._run_multi_process(data_frame=data_frame, target_stores=target_stores, wait=wait, timeout=timeout)
+            self._run_multi_process(
+                data_frame=data_frame, target_stores=target_stores, wait=wait, timeout=timeout
+            )
 
 
 class IngestionError(Exception):
@@ -815,11 +835,11 @@ def update(
         )
 
     def update_feature_metadata(
-        self,
-        feature_name: str,
-        description: str = None,
-        parameter_additions: Sequence[FeatureParameter] = None,
-        parameter_removals: Sequence[str] = None,
+        self,
+        feature_name: str,
+        description: str = None,
+        parameter_additions: Sequence[FeatureParameter] = None,
+        parameter_removals: Sequence[str] = None,
     ) -> Dict[str, Any]:
         """Update a feature metadata and add/remove metadata.
 
@@ -904,22 +924,28 @@ def _determine_collection_list_type(series: Series) -> FeatureTypeEnum | None:
                 feature type.
         """
 
-        if (series.apply(lambda lst:
-                         all(isinstance(x, int) or pd.isna(x) for x in lst) if is_list_like(lst) else True)
-                .all()):
+        if series.apply(
+            lambda lst: all(isinstance(x, int) or pd.isna(x) for x in lst)
+            if is_list_like(lst)
+            else True
+        ).all():
             return FeatureTypeEnum.INTEGRAL
-        if (series.apply(lambda lst:
-                         all(isinstance(x, (float, int)) or pd.isna(x) for x in lst) if is_list_like(lst) else True)
-                .all()):
+        if series.apply(
+            lambda lst: all(isinstance(x, (float, int)) or pd.isna(x) for x in lst)
+            if is_list_like(lst)
+            else True
+        ).all():
             return FeatureTypeEnum.FRACTIONAL
-        if (series.apply(lambda lst:
-                         all(isinstance(x, str) or pd.isna(x) for x in lst) if is_list_like(lst) else True)
-                .all()):
+        if series.apply(
+            lambda lst: all(isinstance(x, str) or pd.isna(x) for x in lst)
+            if is_list_like(lst)
+            else True
+        ).all():
             return FeatureTypeEnum.STRING
         return None
 
     def _generate_feature_definition(
-        self, series: Series, online_storage_type: OnlineStoreStorageTypeEnum
+        self, series: Series, online_storage_type: OnlineStoreStorageTypeEnum
    ) -> FeatureDefinition:
         """Generate feature definition from the Panda Series.
 
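
The three reformatted checks above scan every list in the Series and fall through from the narrowest element type to the widest. A rough standalone sketch of that inference follows, using plain strings in place of FeatureTypeEnum as an assumption for illustration.

from typing import Callable, Optional

import pandas as pd
from pandas.api.types import is_list_like


def infer_list_type(series: pd.Series) -> Optional[str]:
    # Mirrors the lambdas above: non-list entries count as compatible ("else True"),
    # and NaN/None elements inside a list are ignored.
    def all_elements_match(check: Callable) -> bool:
        return series.apply(
            lambda lst: all(check(x) or pd.isna(x) for x in lst) if is_list_like(lst) else True
        ).all()

    if all_elements_match(lambda x: isinstance(x, int)):
        return "Integral"
    if all_elements_match(lambda x: isinstance(x, (float, int))):
        return "Fractional"
    if all_elements_match(lambda x: isinstance(x, str)):
        return "String"
    return None


print(infer_list_type(pd.Series([[1, 2], [3, None]])))   # Integral
print(infer_list_type(pd.Series([[1.5], [2.0, 3]])))     # Fractional
print(infer_list_type(pd.Series([["a"], ["b", None]])))  # String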
@@ -933,11 +959,11 @@ def _generate_feature_definition(
 
         dtype = str(series.dtype).lower()
         if (
-            online_storage_type
-            and online_storage_type == OnlineStoreStorageTypeEnum.IN_MEMORY
-            and dtype == "object"
-            and pd.notna(series.head(1000)).any()
-            and series.head(1000).apply(FeatureGroup._check_list_type).all()
+            online_storage_type
+            and online_storage_type == OnlineStoreStorageTypeEnum.IN_MEMORY
+            and dtype == "object"
+            and pd.notna(series.head(1000)).any()
+            and series.head(1000).apply(FeatureGroup._check_list_type).all()
         ):
             params["collection_type"] = ListCollectionType()
             params["feature_type"] = FeatureGroup._determine_collection_list_type(series.head(1000))
@@ -946,18 +972,15 @@ def _generate_feature_definition(
 
         if params["feature_type"] is None:
             raise ValueError(
-                f"Failed to infer Feature type based on dtype {dtype} "
-                f"for column {series.name}."
+                f"Failed to infer Feature type based on dtype {dtype} " f"for column {series.name}."
             )
 
         feature_definition = FeatureDefinition(**params)
 
         return feature_definition
 
     def load_feature_definitions(
-        self,
-        data_frame: DataFrame,
-        online_storage_type: OnlineStoreStorageTypeEnum = None
+        self, data_frame: DataFrame, online_storage_type: OnlineStoreStorageTypeEnum = None
     ) -> Sequence[FeatureDefinition]:
         """Load feature definitions from a Pandas DataFrame.
 
@@ -990,15 +1013,17 @@ def load_feature_definitions(
         """
         feature_definitions = []
         for column in data_frame:
-            feature_definition = self._generate_feature_definition(data_frame[column], online_storage_type)
+            feature_definition = self._generate_feature_definition(
+                data_frame[column], online_storage_type
+            )
             feature_definitions.append(feature_definition)
         self.feature_definitions = feature_definitions
         return self.feature_definitions
 
     def get_record(
-        self,
-        record_identifier_value_as_string: str,
-        feature_names: Sequence[str] = None,
+        self,
+        record_identifier_value_as_string: str,
+        feature_names: Sequence[str] = None,
     ) -> Sequence[Dict[str, str]]:
         """Get a single record in a FeatureGroup
 
@@ -1015,10 +1040,11 @@ def get_record(
         ).get("Record")
 
     def put_record(
-        self,
-        record: Sequence[FeatureValue],
-        target_stores: Sequence[TargetStoreEnum] = None,
-        ttl_duration: TtlDuration = None):
+        self,
+        record: Sequence[FeatureValue],
+        target_stores: Sequence[TargetStoreEnum] = None,
+        ttl_duration: TtlDuration = None,
+    ):
         """Put a single record in the FeatureGroup.
 
         Args:
@@ -1030,15 +1056,17 @@ def put_record(
         return self.sagemaker_session.put_record(
             feature_group_name=self.name,
             record=[value.to_dict() for value in record],
-            target_stores=[target_store.value for target_store in target_stores] if target_stores else None,
+            target_stores=[target_store.value for target_store in target_stores]
+            if target_stores
+            else None,
             ttl_duration=ttl_duration.to_dict() if ttl_duration is not None else None,
         )
 
     def delete_record(
-        self,
-        record_identifier_value_as_string: str,
-        event_time: str,
-        deletion_mode: DeletionModeEnum = DeletionModeEnum.SOFT_DELETE,
+        self,
+        record_identifier_value_as_string: str,
+        event_time: str,
+        deletion_mode: DeletionModeEnum = DeletionModeEnum.SOFT_DELETE,
     ):
         """Delete a single record from a FeatureGroup.
 
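
For orientation, a hedged usage sketch of the reformatted put_record path follows. The feature group name, record values, import paths, and the TtlDuration and TargetStoreEnum constructor details are assumptions for illustration, not facts established by this diff.

from sagemaker.feature_store.feature_group import FeatureGroup
from sagemaker.feature_store.inputs import FeatureValue, TargetStoreEnum, TtlDuration
from sagemaker.session import Session

# Hypothetical, pre-existing feature group; "my-feature-group" is a placeholder.
feature_group = FeatureGroup(name="my-feature-group", sagemaker_session=Session())

# Write a single record to the online store only, with a 7-day time-to-live.
feature_group.put_record(
    record=[
        FeatureValue(feature_name="record_id", value_as_string="42"),
        FeatureValue(feature_name="event_time", value_as_string="2024-01-01T00:00:00Z"),
    ],
    target_stores=[TargetStoreEnum.ONLINE_STORE],
    ttl_duration=TtlDuration(unit="Days", value=7),
)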
@@ -1059,14 +1087,14 @@ def delete_record(
         )
 
     def ingest(
-        self,
-        data_frame: DataFrame,
-        target_stores: Sequence[TargetStoreEnum] = None,
-        max_workers: int = 1,
-        max_processes: int = 1,
-        wait: bool = True,
-        timeout: Union[int, float] = None,
-        profile_name: str = None,
+        self,
+        data_frame: DataFrame,
+        target_stores: Sequence[TargetStoreEnum] = None,
+        max_workers: int = 1,
+        max_processes: int = 1,
+        wait: bool = True,
+        timeout: Union[int, float] = None,
+        profile_name: str = None,
     ) -> IngestionManagerPandas:
         """Ingest the content of a pandas DataFrame to feature store.
 
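Finally, a minimal sketch of calling the ingest signature shown above. The feature group name and DataFrame contents are placeholders, and the import locations are assumed from the file path in this commit.

import time

import pandas as pd

from sagemaker.feature_store.feature_group import FeatureGroup
from sagemaker.feature_store.inputs import TargetStoreEnum
from sagemaker.session import Session

feature_group = FeatureGroup(name="my-feature-group", sagemaker_session=Session())

df = pd.DataFrame(
    {
        "record_id": [1, 2, 3],
        "feature_a": [0.1, 0.2, 0.3],
        "event_time": [time.time()] * 3,
    }
)

# Multi-threaded ingestion into the online store only; blocks until finished or the timeout hits.
manager = feature_group.ingest(
    data_frame=df,
    target_stores=[TargetStoreEnum.ONLINE_STORE],
    max_workers=4,
    max_processes=1,
    wait=True,
    timeout=300,
)
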
0 commit comments