diff --git a/deltacat/compute/converter/converter_session.py b/deltacat/compute/converter/converter_session.py
index 02ee70bbc..80303fba0 100644
--- a/deltacat/compute/converter/converter_session.py
+++ b/deltacat/compute/converter/converter_session.py
@@ -49,7 +49,9 @@ def converter_session(params: ConverterSessionParams, **kwargs):
     iceberg_warehouse_bucket_name = params.iceberg_warehouse_bucket_name
     iceberg_namespace = params.iceberg_namespace
     merge_keys = params.merge_keys
-    compact_small_files = params.compact_small_files
+    compact_previous_position_delete_files = (
+        params.compact_previous_position_delete_files
+    )
     task_max_parallelism = params.task_max_parallelism
     s3_client_kwargs = params.s3_client_kwargs
     s3_file_system = params.s3_file_system
@@ -103,7 +105,7 @@ def convert_input_provider(index, item):
             convert_task_index=index,
             iceberg_table_warehouse_prefix=iceberg_table_warehouse_prefix,
             identifier_fields=identifier_fields,
-            compact_small_files=compact_small_files,
+            compact_previous_position_delete_files=compact_previous_position_delete_files,
             table_io=iceberg_table.io,
             table_metadata=iceberg_table.metadata,
             enforce_primary_key_uniqueness=enforce_primary_key_uniqueness,
@@ -114,7 +116,7 @@ def convert_input_provider(index, item):
         )
     }

-    logger.info((f"Getting remote convert tasks..."))
+    logger.info("Getting remote convert tasks...")
     # Ray remote task: convert
     # TODO: Add split mechanism to split large buckets
     convert_tasks_pending = invoke_parallel(
@@ -131,12 +133,36 @@ def convert_input_provider(index, item):
     convert_results = ray.get(convert_tasks_pending)
     logger.info(f"Got {len(convert_tasks_pending)} convert tasks.")

+    total_position_delete_record_count = sum(
+        convert_result.position_delete_record_count
+        for convert_result in convert_results
+    )
+    total_input_data_file_record_count = sum(
+        convert_result.input_data_files_record_count
+        for convert_result in convert_results
+    )
+    total_data_file_hash_columns_in_memory_sizes = sum(
+        convert_result.input_data_files_hash_columns_in_memory_sizes
+        for convert_result in convert_results
+    )
+    total_position_delete_file_in_memory_sizes = sum(
+        convert_result.position_delete_in_memory_sizes
+        for convert_result in convert_results
+    )
+    total_position_delete_on_disk_sizes = sum(
+        convert_result.position_delete_on_disk_sizes
+        for convert_result in convert_results
+    )
+
     to_be_added_files_list = []
     for convert_result in convert_results:
-        to_be_deleted_files_list.extend(convert_result[0].values())
-        to_be_added_files_list.extend(convert_result[1])
+        to_be_added_files = convert_result.to_be_added_files
+        to_be_deleted_files = convert_result.to_be_deleted_files
+
+        to_be_deleted_files_list.extend(to_be_deleted_files.values())
+        to_be_added_files_list.extend(to_be_added_files)

-    if not to_be_deleted_files_list:
+    if not to_be_deleted_files_list and to_be_added_files_list:
         commit_append_snapshot(
             iceberg_table=iceberg_table,
             new_position_delete_files=to_be_added_files_list,
@@ -147,4 +173,13 @@ def convert_input_provider(index, item):
             to_be_deleted_files_list=to_be_deleted_files_list,
             new_position_delete_files=to_be_added_files_list,
         )
+    logger.info(
+        f"Aggregated stats for {table_name}: "
+        f"total position delete record count: {total_position_delete_record_count}, "
+        f"total input data file record count: {total_input_data_file_record_count}, "
+        f"total data file hash columns in memory sizes: {total_data_file_hash_columns_in_memory_sizes}, "
+        f"total position delete file in memory sizes: {total_position_delete_file_in_memory_sizes}, "
+        f"total position delete file on disk sizes: {total_position_delete_on_disk_sizes}."
+    )
+    logger.info("Committed new Iceberg snapshot.")
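Reviewer note: a minimal caller sketch for the renamed flag, not part of this patch. Only the three keys given defaults in `ConverterSessionParams.of()` are confirmed by this diff; the remaining key names (warehouse bucket, namespace, merge keys) are assumptions inferred from the properties read in `converter_session()`, and other required parameters (e.g. a catalog handle) may be omitted here.

```python
from deltacat.compute.converter.converter_session import converter_session
from deltacat.compute.converter.model.converter_session_params import (
    ConverterSessionParams,
)

# Key names below are partly assumptions; only the three boolean flags are
# confirmed defaults in ConverterSessionParams.of().
params = ConverterSessionParams.of(
    {
        "iceberg_warehouse_bucket_name": "s3://my-warehouse-bucket",  # assumed key
        "iceberg_namespace": "my_namespace",  # assumed key
        "merge_keys": ["primary_key"],  # assumed key
        "enforce_primary_key_uniqueness": True,
        # Renamed from compact_small_files; still defaults to False, and
        # convert() raises NotImplementedError when it is True.
        "compact_previous_position_delete_files": False,
        "position_delete_for_multiple_data_files": False,
    }
)
converter_session(params=params)
```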
diff --git a/deltacat/compute/converter/model/convert_input.py b/deltacat/compute/converter/model/convert_input.py
index 8b2e0e3d0..3c2b790af 100644
--- a/deltacat/compute/converter/model/convert_input.py
+++ b/deltacat/compute/converter/model/convert_input.py
@@ -12,7 +12,7 @@ def of(
         identifier_fields,
         table_io,
         table_metadata,
-        compact_small_files,
+        compact_previous_position_delete_files,
         enforce_primary_key_uniqueness,
         position_delete_for_multiple_data_files,
         max_parallel_data_file_download,
@@ -27,7 +27,9 @@ def of(
         result["iceberg_table_warehouse_prefix"] = iceberg_table_warehouse_prefix
         result["table_io"] = table_io
         result["table_metadata"] = table_metadata
-        result["compact_small_files"] = compact_small_files
+        result[
+            "compact_previous_position_delete_files"
+        ] = compact_previous_position_delete_files
         result["enforce_primary_key_uniqueness"] = enforce_primary_key_uniqueness
         result[
             "position_delete_for_multiple_data_files"
@@ -63,8 +65,8 @@ def table_metadata(self):
         return self["table_metadata"]

     @property
-    def compact_small_files(self) -> bool:
-        return self["compact_small_files"]
+    def compact_previous_position_delete_files(self) -> bool:
+        return self["compact_previous_position_delete_files"]

     @property
     def enforce_primary_key_uniqueness(self) -> bool:
diff --git a/deltacat/compute/converter/model/convert_result.py b/deltacat/compute/converter/model/convert_result.py
new file mode 100644
index 000000000..611e6e1e0
--- /dev/null
+++ b/deltacat/compute/converter/model/convert_result.py
@@ -0,0 +1,61 @@
+from __future__ import annotations
+from typing import Dict
+
+
+class ConvertResult(Dict):
+    @staticmethod
+    def of(
+        convert_task_index,
+        to_be_added_files,
+        to_be_deleted_files,
+        position_delete_record_count,
+        input_data_files_record_count,
+        input_data_files_hash_columns_in_memory_sizes,
+        position_delete_in_memory_sizes,
+        position_delete_on_disk_sizes,
+    ) -> ConvertResult:
+
+        result = ConvertResult()
+        result["convert_task_index"] = convert_task_index
+        result["to_be_added_files"] = to_be_added_files
+        result["to_be_deleted_files"] = to_be_deleted_files
+        result["position_delete_record_count"] = position_delete_record_count
+        result["input_data_files_record_count"] = input_data_files_record_count
+        result[
+            "input_data_files_hash_columns_in_memory_sizes"
+        ] = input_data_files_hash_columns_in_memory_sizes
+        result["position_delete_in_memory_sizes"] = position_delete_in_memory_sizes
+        result["position_delete_on_disk_sizes"] = position_delete_on_disk_sizes
+        return result
+
+    @property
+    def convert_task_index(self) -> int:
+        return self["convert_task_index"]
+
+    @property
+    def to_be_added_files(self):
+        return self["to_be_added_files"]
+
+    @property
+    def to_be_deleted_files(self):
+        return self["to_be_deleted_files"]
+
+    @property
+    def position_delete_record_count(self):
+        return self["position_delete_record_count"]
+
+    @property
+    def input_data_files_record_count(self):
+        return self["input_data_files_record_count"]
+
+    @property
+    def input_data_files_hash_columns_in_memory_sizes(self):
+        return self["input_data_files_hash_columns_in_memory_sizes"]
+
+    @property
+    def position_delete_in_memory_sizes(self):
+        return self["position_delete_in_memory_sizes"]
+
+    @property
+    def position_delete_on_disk_sizes(self):
+        return self["position_delete_on_disk_sizes"]
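Reviewer note: `ConvertResult` is a plain `Dict` subclass, so it stays cheap to serialize across Ray task boundaries while giving the driver named accessors instead of tuple indices. A self-contained usage sketch with made-up values:

```python
from deltacat.compute.converter.model.convert_result import ConvertResult

# Mirrors what convert() returns and what converter_session() aggregates.
result = ConvertResult.of(
    convert_task_index=0,
    to_be_added_files=[],    # Iceberg position delete data files to commit
    to_be_deleted_files={},  # partition_value -> equality delete files to drop
    position_delete_record_count=42,
    input_data_files_record_count=1000,
    input_data_files_hash_columns_in_memory_sizes=8192,
    position_delete_in_memory_sizes=512,
    position_delete_on_disk_sizes=256,
)

# Property access and dict access are interchangeable.
assert result.position_delete_record_count == result["position_delete_record_count"]

# Driver-side aggregation, as done in converter_session():
results = [result]
total = sum(r.position_delete_record_count for r in results)
```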
diff --git a/deltacat/compute/converter/model/converter_session_params.py b/deltacat/compute/converter/model/converter_session_params.py
index 6ee2de4b8..f52938312 100644
--- a/deltacat/compute/converter/model/converter_session_params.py
+++ b/deltacat/compute/converter/model/converter_session_params.py
@@ -28,7 +28,9 @@ def of(params: Optional[Dict]) -> ConverterSessionParams:
         result.enforce_primary_key_uniqueness = params.get(
             "enforce_primary_key_uniqueness", False
         )
-        result.compact_small_files = params.get("compact_small_files", False)
+        result.compact_previous_position_delete_files = params.get(
+            "compact_previous_position_delete_files", False
+        )

         # For Iceberg v3 spec, option to produce delete vector that can establish 1:1 mapping with data files.
         result.position_delete_for_multiple_data_files = params.get(
@@ -73,12 +75,16 @@ def enforce_primary_key_uniqueness(self, enforce_primary_key_uniqueness) -> None
         self["enforce_primary_key_uniqueness"] = enforce_primary_key_uniqueness

     @property
-    def compact_small_files(self) -> bool:
-        return self["compact_small_files"]
+    def compact_previous_position_delete_files(self) -> bool:
+        return self["compact_previous_position_delete_files"]

-    @compact_small_files.setter
-    def compact_small_files(self, compact_small_files) -> None:
-        self["compact_small_files"] = compact_small_files
+    @compact_previous_position_delete_files.setter
+    def compact_previous_position_delete_files(
+        self, compact_previous_position_delete_files
+    ) -> None:
+        self[
+            "compact_previous_position_delete_files"
+        ] = compact_previous_position_delete_files

     @property
     def position_delete_for_multiple_data_files(self) -> bool:
diff --git a/deltacat/compute/converter/pyiceberg/overrides.py b/deltacat/compute/converter/pyiceberg/overrides.py
index e98432143..e6c097f01 100644
--- a/deltacat/compute/converter/pyiceberg/overrides.py
+++ b/deltacat/compute/converter/pyiceberg/overrides.py
@@ -161,6 +161,7 @@ def parquet_files_dict_to_iceberg_data_files(io, table_metadata, files_dict):
     schema = table_metadata.schema()
     for partition_value, file_paths in files_dict.items():
         for file_path in file_paths:
+            logger.debug(f"Converting parquet file to Iceberg data file: {file_path}")
             input_file = io.new_input(file_path)
             with input_file.open() as input_stream:
                 parquet_metadata = pq.read_metadata(input_stream)
@@ -239,9 +240,10 @@ def fetch_all_bucket_files(table):
             file_sequence_number = manifest_entry.sequence_number
             data_file_tuple = (file_sequence_number, data_file)
             partition_value = data_file.partition
+
             if data_file.content == DataFileContent.DATA:
                 data_entries[partition_value].append(data_file_tuple)
-            if data_file.content == DataFileContent.POSITION_DELETES:
+            elif data_file.content == DataFileContent.POSITION_DELETES:
                 positional_delete_entries[partition_value].append(data_file_tuple)
             elif data_file.content == DataFileContent.EQUALITY_DELETES:
                 equality_data_entries[partition_value].append(data_file_tuple)
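Reviewer note: the `if` → `elif` change in `fetch_all_bucket_files` makes the three content types mutually exclusive, so each manifest entry is classified exactly once and a `DATA` match no longer falls through to a second standalone `if` test. A condensed sketch of the resulting classification, assuming the `DataFileContent` enum from `pyiceberg.manifest`:

```python
from collections import defaultdict
from pyiceberg.manifest import DataFileContent

data_entries = defaultdict(list)
positional_delete_entries = defaultdict(list)
equality_data_entries = defaultdict(list)

def classify(partition_value, file_sequence_number, data_file):
    # Each entry lands in exactly one bucket.
    data_file_tuple = (file_sequence_number, data_file)
    if data_file.content == DataFileContent.DATA:
        data_entries[partition_value].append(data_file_tuple)
    elif data_file.content == DataFileContent.POSITION_DELETES:
        positional_delete_entries[partition_value].append(data_file_tuple)
    elif data_file.content == DataFileContent.EQUALITY_DELETES:
        equality_data_entries[partition_value].append(data_file_tuple)
```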
diff --git a/deltacat/compute/converter/steps/convert.py b/deltacat/compute/converter/steps/convert.py
index ab60ee9dc..df258a532 100644
--- a/deltacat/compute/converter/steps/convert.py
+++ b/deltacat/compute/converter/steps/convert.py
@@ -18,6 +18,7 @@
 from deltacat.compute.converter.pyiceberg.overrides import (
     parquet_files_dict_to_iceberg_data_files,
 )
+from deltacat.compute.converter.model.convert_result import ConvertResult
 from deltacat import logs

 logger = logs.configure_deltacat_logger(logging.getLogger(__name__))
@@ -31,7 +32,9 @@ def convert(convert_input: ConvertInput):
     identifier_fields = convert_input.identifier_fields
     table_io = convert_input.table_io
     table_metadata = convert_input.table_metadata
-    compact_small_files = convert_input.compact_small_files
+    compact_previous_position_delete_files = (
+        convert_input.compact_previous_position_delete_files
+    )
     position_delete_for_multiple_data_files = (
         convert_input.position_delete_for_multiple_data_files
     )
@@ -42,7 +45,7 @@ def convert(convert_input: ConvertInput):
         raise NotImplementedError(
             f"Distributed file level position delete compute is not supported yet"
         )
-    if compact_small_files:
+    if compact_previous_position_delete_files:
         raise NotImplementedError(f"Compact previous position delete not supported yet")

     logger.info(f"Starting convert task index: {convert_task_index}")
@@ -57,6 +60,7 @@ def convert(convert_input: ConvertInput):
         convert_input_files.partition_value
     )
     partition_value = convert_input_files.partition_value
+
     if partition_value_str:
         iceberg_table_warehouse_prefix_with_partition = (
             f"{iceberg_table_warehouse_prefix}/{partition_value_str}"
@@ -75,6 +79,7 @@ def convert(convert_input: ConvertInput):
             identifier_columns=identifier_fields,
             equality_delete_files_list=applicable_equality_delete_files,
             iceberg_table_warehouse_prefix_with_partition=iceberg_table_warehouse_prefix_with_partition,
+            convert_task_index=convert_task_index,
             max_parallel_data_file_download=max_parallel_data_file_download,
             s3_file_system=s3_file_system,
             s3_client_kwargs=s3_client_kwargs,
@@ -87,36 +92,50 @@ def convert(convert_input: ConvertInput):
         all_data_files=all_data_files_for_this_bucket,
         data_files_downloaded=applicable_data_files,
     )
-    logger.info(f"Got {len(data_files_to_dedupe)} files to dedupe.")
-    pos_delete_after_dedupe = dedupe_data_files(
+    logger.info(
+        f"[Convert task {convert_task_index}]: Got {len(data_files_to_dedupe)} files to dedupe."
+    )
+    (
+        pos_delete_after_dedupe,
+        data_file_to_dedupe_record_count,
+        data_file_to_dedupe_size,
+    ) = dedupe_data_files(
         data_file_to_dedupe=data_files_to_dedupe,
         identifier_columns=identifier_fields,
         merge_sort_column=sc._ORDERED_RECORD_IDX_COLUMN_NAME,
         s3_client_kwargs=s3_client_kwargs,
     )
+    logger.info(
+        f"[Convert task {convert_task_index}]: Dedupe produced {len(pos_delete_after_dedupe)} position delete records."
+    )
     total_pos_delete_table.append(pos_delete_after_dedupe)
     total_pos_delete = pa.concat_tables(total_pos_delete_table)
-    logger.info(f"Total position delete produced:{len(total_pos_delete)}")
-    to_be_added_files_list_parquet = upload_table_with_retry(
-        table=total_pos_delete,
-        s3_url_prefix=iceberg_table_warehouse_prefix_with_partition,
-        s3_table_writer_kwargs={},
-        s3_file_system=s3_file_system,
+    logger.info(
+        f"[Convert task {convert_task_index}]: Total position deletes produced: {len(total_pos_delete)}"
     )
-    to_be_added_files_dict = defaultdict()
-    to_be_added_files_dict[partition_value] = to_be_added_files_list_parquet

+    to_be_added_files_list = []
+    if total_pos_delete:
+        to_be_added_files_list_parquet = upload_table_with_retry(
+            table=total_pos_delete,
+            s3_url_prefix=iceberg_table_warehouse_prefix_with_partition,
+            s3_table_writer_kwargs={},
+            s3_file_system=s3_file_system,
+        )
-    logger.info(
-        f"Produced {len(to_be_added_files_list_parquet)} position delete files."
-    )
-    to_be_added_files_list = parquet_files_dict_to_iceberg_data_files(
-        io=table_io,
-        table_metadata=table_metadata,
-        files_dict=to_be_added_files_dict,
-    )
+        to_be_added_files_dict = defaultdict()
+        to_be_added_files_dict[partition_value] = to_be_added_files_list_parquet
+
+        logger.info(
+            f"[Convert task {convert_task_index}]: Produced {len(to_be_added_files_list_parquet)} position delete files."
+        )
+        to_be_added_files_list = parquet_files_dict_to_iceberg_data_files(
+            io=table_io,
+            table_metadata=table_metadata,
+            files_dict=to_be_added_files_dict,
+        )

     to_be_delete_files_dict = defaultdict()
     if applicable_equality_delete_files:
@@ -125,7 +144,19 @@ def convert(convert_input: ConvertInput):
             for equality_delete_file in applicable_equality_delete_files
         ]

-    return (to_be_delete_files_dict, to_be_added_files_list)
+    convert_res = ConvertResult.of(
+        convert_task_index=convert_task_index,
+        to_be_added_files=to_be_added_files_list,
+        to_be_deleted_files=to_be_delete_files_dict,
+        position_delete_record_count=len(total_pos_delete),
+        input_data_files_record_count=data_file_to_dedupe_record_count,
+        input_data_files_hash_columns_in_memory_sizes=data_file_to_dedupe_size,
+        position_delete_in_memory_sizes=int(total_pos_delete.nbytes),
+        position_delete_on_disk_sizes=sum(
+            file.file_size_in_bytes for file in to_be_added_files_list
+        ),
+    )
+    return convert_res


 def get_additional_applicable_data_files(all_data_files, data_files_downloaded):
@@ -145,9 +176,6 @@ def filter_rows_to_be_deleted(
         equality_delete_table[identifier_column],
     )
     position_delete_table = data_file_table.filter(equality_deletes)
-    logger.info(
-        f"length_pos_delete_table, {len(position_delete_table)}, length_data_table:{len(data_file_table)}"
-    )
     return position_delete_table


@@ -172,23 +200,12 @@ def compute_pos_delete_converting_equality_deletes(
     return new_position_delete_table


-def download_bucketed_table(data_files, equality_delete_files):
-    from deltacat.utils.pyarrow import s3_file_to_table
-
-    compacted_table = s3_file_to_table(
-        [data_file.file_path for data_file in data_files]
-    )
-    equality_delete_table = s3_file_to_table(
-        [eq_file.file_path for eq_file in equality_delete_files]
-    )
-    return compacted_table, equality_delete_table
-
-
 def compute_pos_delete_with_limited_parallelism(
     data_files_list,
     identifier_columns,
     equality_delete_files_list,
     iceberg_table_warehouse_prefix_with_partition,
+    convert_task_index,
     max_parallel_data_file_download,
     s3_file_system,
     s3_client_kwargs,
@@ -229,10 +246,14 @@ def compute_pos_delete_with_limited_parallelism(
         s3_file_system=s3_file_system,
         s3_client_kwargs=s3_client_kwargs,
     )
-    if not new_pos_delete_table:
-        logger.info("No records deleted based on equality delete convertion")
     logger.info(
-        f"Number of records to delete based on equality delete convertion:{len(new_pos_delete_table)}"
+        f"[Convert task {convert_task_index}]: Find-deletes received {len(data_table_total)} data table records "
+        f"and {len(equality_delete_table_total)} equality delete records as input, "
+        f"and produced {len(new_pos_delete_table)} position deletes."
     )
+
+    if not new_pos_delete_table:
+        logger.info("No records deleted based on equality delete conversion")
+
     return new_pos_delete_table
diff --git a/deltacat/compute/converter/steps/dedupe.py b/deltacat/compute/converter/steps/dedupe.py
index 64433f9d4..577280874 100644
--- a/deltacat/compute/converter/steps/dedupe.py
+++ b/deltacat/compute/converter/steps/dedupe.py
@@ -18,6 +18,7 @@ def dedupe_data_files(
 ):
     data_file_table = []

+    downloaded_data_file_record_count = 0
     # Sort data files by file sequence number first
     data_file_to_dedupe = sorted(data_file_to_dedupe, key=lambda f: f[0])
     for file_tuple in data_file_to_dedupe:
@@ -33,10 +34,16 @@ def dedupe_data_files(
             sequence_number=sequence_number,
             s3_client_kwargs=s3_client_kwargs,
         )
+        downloaded_data_file_record_count += len(data_file_to_dedupe_table)
         data_file_table.append(data_file_to_dedupe_table)

     final_data_to_dedupe = pa.concat_tables(data_file_table)

+    assert len(final_data_to_dedupe) == downloaded_data_file_record_count, (
+        f"Record count mismatch while concatenating tables: got {len(final_data_to_dedupe)} records in the final table, "
+        f"but {downloaded_data_file_record_count} records were downloaded."
+    )
+
     logger.info(f"Length of pyarrow table to dedupe:{len(final_data_to_dedupe)}")

     record_idx_iterator = iter(range(len(final_data_to_dedupe)))
@@ -64,4 +71,11 @@ def dedupe_data_files(
     final_data_table_to_delete = final_data_table_to_delete.drop(
         [sc._IDENTIFIER_COLUMNS_HASH_COLUMN_NAME, sc._GLOBAL_RECORD_IDX_COLUMN_NAME]
     )
-    return final_data_table_to_delete
+    logger.info(
+        f"Deduped {len(final_data_table_to_delete)} records based on identifier columns."
+    )
+    return (
+        final_data_table_to_delete,
+        len(final_data_to_dedupe),
+        int(final_data_to_dedupe.nbytes),
+    )
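Reviewer note: `dedupe_data_files` now returns a 3-tuple of (position delete table, input record count, input hash-column bytes), which feeds the per-task stats aggregated in `converter_session()`. The real implementation keys off the `_IDENTIFIER_COLUMNS_HASH` and `_GLOBAL_RECORD_IDX` schema columns; the sketch below is a simplified stand-alone illustration of the same keep-newest idea, not the patch's actual code:

```python
import pyarrow as pa
import pyarrow.compute as pc

def rows_to_position_delete(table: pa.Table, hash_col: str) -> pa.Table:
    # Rows are assumed to be ordered by file sequence number, so for each
    # identifier hash the row with the highest global index is the newest
    # record; every older duplicate becomes a position delete candidate.
    idx = pa.array(range(len(table)), type=pa.int64())
    table = table.append_column("_global_record_idx", idx)
    winners = table.group_by(hash_col).aggregate([("_global_record_idx", "max")])
    is_winner = pc.is_in(
        table["_global_record_idx"],
        value_set=winners["_global_record_idx_max"],
    )
    return table.filter(pc.invert(is_winner)).drop(["_global_record_idx"])

t = pa.table({"pk_hash": ["a", "b", "a"], "pos": [0, 1, 2]})
print(rows_to_position_delete(t, "pk_hash")["pos"])  # [0]: the older "a" row
```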
diff --git a/deltacat/compute/converter/utils/io.py b/deltacat/compute/converter/utils/io.py
index f4132986f..36f33d756 100644
--- a/deltacat/compute/converter/utils/io.py
+++ b/deltacat/compute/converter/utils/io.py
@@ -53,6 +53,7 @@ def download_parquet_with_daft_hash_applied(
         io_config=io_config,
         coerce_int96_timestamp_unit=coerce_int96_timestamp_unit,
     )
+
     hash_column = concatenate_hashed_identifier_columns(
         df=df, identifier_columns=identifier_columns
     )
@@ -75,17 +76,39 @@ def daft_read_parquet(path, io_config, coerce_int96_timestamp_unit):

 def concatenate_hashed_identifier_columns(df, identifier_columns):
     pk_hash_columns = []
-
+    previous_hash_column_length = None
     for i in range(len(identifier_columns)):
         pk_hash_column = df.select(daft.col(identifier_columns[i]).hash())
         pk_hash_column_arrow = pk_hash_column.to_arrow()
-        # Converted int16 to string here
-        pk_hash_columns.append(
-            sliced_string_cast(pk_hash_column_arrow[identifier_columns[i]])
+
+        # Assert that each downloaded hash column has the same length to ensure we don't create a mismatch between columns.
+        if not previous_hash_column_length:
+            previous_hash_column_length = len(pk_hash_column_arrow)
+        else:
+            assert previous_hash_column_length == len(pk_hash_column_arrow), (
+                f"Identifier column length mismatch: {identifier_columns[i]} has length {len(pk_hash_column_arrow)} "
+                f"but expected {previous_hash_column_length}."
+            )
+            previous_hash_column_length = len(pk_hash_column_arrow)
+
+        # Convert identifiers of different datatypes to strings here
+        pk_hash_column_str = sliced_string_cast(
+            pk_hash_column_arrow[identifier_columns[i]]
+        )
+        assert len(pk_hash_column_str) == previous_hash_column_length, (
+            f"Cast column length mismatch: {identifier_columns[i]} has length {len(pk_hash_column_str)} after casting, "
+            f"but its length before casting was {previous_hash_column_length}."
         )
+
+        pk_hash_columns.append(pk_hash_column_str)
+
     pk_hash_columns.append(IDENTIFIER_FIELD_DELIMITER)

     pk_hash_columns_concatenated = pc.binary_join_element_wise(
         *pk_hash_columns, null_handling="replace"
     )
+    assert len(pk_hash_columns_concatenated) == previous_hash_column_length, (
+        f"Concatenated column length mismatch: the final concatenated identifier column has length "
+        f"{len(pk_hash_columns_concatenated)}, but its length before concatenating was {previous_hash_column_length}."
+    )
     return pk_hash_columns_concatenated
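Reviewer note: the concatenation works because `pyarrow.compute.binary_join_element_wise` treats its last positional argument as the element-wise separator, which is why `IDENTIFIER_FIELD_DELIMITER` is appended once at the end of the column list rather than interleaved. A tiny demonstration with made-up values:

```python
import pyarrow as pa
import pyarrow.compute as pc

a = pa.array(["10", "20", None], type=pa.string())
b = pa.array(["xx", None, "zz"], type=pa.string())

# The last argument ("|") is the separator; null_handling="replace"
# substitutes nulls with "" (the default null_replacement) instead of
# emitting null for the whole joined row.
joined = pc.binary_join_element_wise(a, b, "|", null_handling="replace")
print(joined)  # ["10|xx", "20|", "|zz"]
```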
diff --git a/deltacat/tests/compute/converter/test_convert_session.py b/deltacat/tests/compute/converter/test_convert_session.py
index c65ea659c..6827d9ee1 100644
--- a/deltacat/tests/compute/converter/test_convert_session.py
+++ b/deltacat/tests/compute/converter/test_convert_session.py
@@ -245,7 +245,7 @@ def test_converter_drop_duplicates_success(
         identifier_fields=["primary_key"],
         table_io=tbl.io,
         table_metadata=tbl.metadata,
-        compact_small_files=False,
+        compact_previous_position_delete_files=False,
         enforce_primary_key_uniqueness=True,
         position_delete_for_multiple_data_files=True,
         max_parallel_data_file_download=10,
@@ -291,10 +291,10 @@ def test_converter_drop_duplicates_success(
     to_be_added_files_list = []

     # Check if there're files to delete
-    if convert_result[0]:
-        to_be_deleted_files_list.extend(convert_result[0].values())
-    if convert_result[1]:
-        to_be_added_files_list.extend(convert_result[1])
+    if convert_result.to_be_deleted_files:
+        to_be_deleted_files_list.extend(convert_result.to_be_deleted_files.values())
+    if convert_result.to_be_added_files:
+        to_be_added_files_list.extend(convert_result.to_be_added_files)

     commit_append_snapshot(
         iceberg_table=tbl,
@@ -410,7 +410,7 @@ def test_converter_pos_delete_read_by_spark_success(
         identifier_fields=["primary_key"],
         table_io=tbl.io,
         table_metadata=tbl.metadata,
-        compact_small_files=False,
+        compact_previous_position_delete_files=False,
         enforce_primary_key_uniqueness=True,
         position_delete_for_multiple_data_files=True,
         max_parallel_data_file_download=10,
@@ -445,10 +445,10 @@ def test_converter_pos_delete_read_by_spark_success(
     to_be_added_files_list = []
     convert_result = ray.get(convert_ref)

-    if convert_result[0]:
-        to_be_deleted_files_list.extend(convert_result[0].values())
-    if convert_result[1]:
-        to_be_added_files_list.extend(convert_result[1])
+    if convert_result.to_be_deleted_files:
+        to_be_deleted_files_list.extend(convert_result.to_be_deleted_files.values())
+    if convert_result.to_be_added_files:
+        to_be_added_files_list.extend(convert_result.to_be_added_files)

     # 4. Commit position delete, delete equality deletes from table
     commit_append_snapshot(
@@ -564,7 +564,7 @@ def test_converter_pos_delete_multiple_identifier_fields_success(
         identifier_fields=["primary_key1", "primary_key2"],
         table_io=tbl.io,
         table_metadata=tbl.metadata,
-        compact_small_files=False,
+        compact_previous_position_delete_files=False,
         enforce_primary_key_uniqueness=True,
         position_delete_for_multiple_data_files=True,
         max_parallel_data_file_download=10,
@@ -607,10 +607,10 @@ def test_converter_pos_delete_multiple_identifier_fields_success(
     to_be_added_files_list = []
     convert_result = ray.get(convert_ref)

-    if convert_result[0]:
-        to_be_deleted_files_list.extend(convert_result[0].values())
-    if convert_result[1]:
-        to_be_added_files_list.extend(convert_result[1])
+    if convert_result.to_be_deleted_files:
+        to_be_deleted_files_list.extend(convert_result.to_be_deleted_files.values())
+    if convert_result.to_be_added_files:
+        to_be_added_files_list.extend(convert_result.to_be_added_files)

     # 4. Commit position delete, delete equality deletes from table
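Reviewer note: the new per-task stats fields are not asserted anywhere in these tests yet. A hypothetical follow-up, reusing the `convert_result` handle from the tests above; the specific invariants are assumptions, not guarantees made by this patch:

```python
# Hypothetical sanity checks on the new ConvertResult stats fields.
assert convert_result.convert_task_index == 0
assert convert_result.position_delete_record_count >= 0
# Every position delete points at a downloaded input record.
assert (
    convert_result.input_data_files_record_count
    >= convert_result.position_delete_record_count
)
# On-disk size should be non-zero only when position delete files were written.
if convert_result.to_be_added_files:
    assert convert_result.position_delete_on_disk_sizes > 0
else:
    assert convert_result.position_delete_on_disk_sizes == 0
```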