diff --git a/deltacat/compute/converter/constants.py b/deltacat/compute/converter/constants.py
index 77e4db2be..a78912b29 100644
--- a/deltacat/compute/converter/constants.py
+++ b/deltacat/compute/converter/constants.py
@@ -2,3 +2,5 @@
 
 # Safe limit ONLY considering CPU limit, typically 32 for a 8x-large worker
 DEFAULT_MAX_PARALLEL_DATA_FILE_DOWNLOAD = 30
+
+DEFAULT_ICEBERG_NAMESPACE = "default"
diff --git a/deltacat/compute/converter/converter_session.py b/deltacat/compute/converter/converter_session.py
index 93486356f..236a2aaa9 100644
--- a/deltacat/compute/converter/converter_session.py
+++ b/deltacat/compute/converter/converter_session.py
@@ -1,4 +1,3 @@
-# from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
 from deltacat.utils.ray_utils.concurrency import (
     invoke_parallel,
     task_resource_options_provider,
@@ -48,6 +47,18 @@ def converter_session(params: ConverterSessionParams, **kwargs):
     table_name = params.iceberg_table_name
     iceberg_table = load_table(catalog, table_name)
     enforce_primary_key_uniqueness = params.enforce_primary_key_uniqueness
+    iceberg_warehouse_bucket_name = params.iceberg_warehouse_bucket_name
+    iceberg_namespace = params.iceberg_namespace
+    merge_keys = params.merge_keys
+    compact_small_files = params.compact_small_files
+    task_max_parallelism = params.task_max_parallelism
+    s3_client_kwargs = params.s3_client_kwargs
+    s3_file_system = params.s3_file_system
+    location_provider_prefix_override = params.location_provider_prefix_override
+    position_delete_for_multiple_data_files = (
+        params.position_delete_for_multiple_data_files
+    )
+
     data_file_dict, equality_delete_dict, pos_delete_dict = fetch_all_bucket_files(
         iceberg_table
     )
@@ -56,14 +67,16 @@ def converter_session(params: ConverterSessionParams, **kwargs):
         equality_delete_dict=equality_delete_dict,
         pos_delete_dict=pos_delete_dict,
     )
-    iceberg_warehouse_bucket_name = params.iceberg_warehouse_bucket_name
-    iceberg_namespace = params.iceberg_namespace
-    iceberg_table_warehouse_prefix = construct_iceberg_table_prefix(
-        iceberg_warehouse_bucket_name=iceberg_warehouse_bucket_name,
-        table_name=table_name,
-        iceberg_namespace=iceberg_namespace,
-    )
-    merge_keys = params.merge_keys
+
+    if not location_provider_prefix_override:
+        iceberg_table_warehouse_prefix = construct_iceberg_table_prefix(
+            iceberg_warehouse_bucket_name=iceberg_warehouse_bucket_name,
+            table_name=table_name,
+            iceberg_namespace=iceberg_namespace,
+        )
+    else:
+        iceberg_table_warehouse_prefix = location_provider_prefix_override
+
     # Using table identifier fields as merge keys if merge keys not provided
     if not merge_keys:
         identifier_fields_set = iceberg_table.schema().identifier_field_names()
@@ -86,16 +99,10 @@ def converter_session(params: ConverterSessionParams, **kwargs):
     # Note that approach 2 will ideally require shared object store to avoid download equality delete files * number of child tasks times.
     max_parallel_data_file_download = DEFAULT_MAX_PARALLEL_DATA_FILE_DOWNLOAD
-    compact_small_files = params.compact_small_files
-    position_delete_for_multiple_data_files = (
-        params.position_delete_for_multiple_data_files
-    )
-    task_max_parallelism = params.task_max_parallelism
-
 
     def convert_input_provider(index, item):
         return {
             "convert_input": ConvertInput.of(
-                files_for_each_bucket=item,
+                convert_input_files=item,
                 convert_task_index=index,
                 iceberg_table_warehouse_prefix=iceberg_table_warehouse_prefix,
                 identifier_fields=identifier_fields,
@@ -103,6 +110,8 @@ def convert_input_provider(index, item):
                 enforce_primary_key_uniqueness=enforce_primary_key_uniqueness,
                 position_delete_for_multiple_data_files=position_delete_for_multiple_data_files,
                 max_parallel_data_file_download=max_parallel_data_file_download,
+                s3_client_kwargs=s3_client_kwargs,
+                s3_file_system=s3_file_system,
             )
         }
 
@@ -110,7 +119,7 @@ def convert_input_provider(index, item):
     # Assuming that memory consume by each bucket doesn't exceed one node's memory limit.
     # TODO: Add split mechanism to split large buckets
     convert_tasks_pending = invoke_parallel(
-        items=convert_input_files_for_all_buckets.items(),
+        items=convert_input_files_for_all_buckets,
         ray_task=convert,
         max_parallelism=task_max_parallelism,
         options_provider=convert_options_provider,
diff --git a/deltacat/compute/converter/model/convert_input.py b/deltacat/compute/converter/model/convert_input.py
index ec3d91307..957ba2fbb 100644
--- a/deltacat/compute/converter/model/convert_input.py
+++ b/deltacat/compute/converter/model/convert_input.py
@@ -15,6 +15,7 @@ def of(
         position_delete_for_multiple_data_files,
         max_parallel_data_file_download,
         s3_file_system,
+        s3_client_kwargs,
     ) -> ConvertInput:
 
         result = ConvertInput()
@@ -29,6 +30,7 @@ def of(
         ] = position_delete_for_multiple_data_files
         result["max_parallel_data_file_download"] = max_parallel_data_file_download
         result["s3_file_system"] = s3_file_system
+        result["s3_client_kwargs"] = s3_client_kwargs
         return result
 
@@ -67,3 +69,7 @@ def max_parallel_data_file_download(self) -> int:
     @property
     def s3_file_system(self):
         return self["s3_file_system"]
+
+    @property
+    def s3_client_kwargs(self):
+        return self["s3_client_kwargs"]
diff --git a/deltacat/compute/converter/model/converter_session_params.py b/deltacat/compute/converter/model/converter_session_params.py
index 0cc2e2269..3a6c790cf 100644
--- a/deltacat/compute/converter/model/converter_session_params.py
+++ b/deltacat/compute/converter/model/converter_session_params.py
@@ -1,6 +1,10 @@
 from __future__ import annotations
 from typing import Optional, Dict
-from deltacat.compute.converter.constants import DEFAULT_CONVERTER_TASK_MAX_PARALLELISM
+from deltacat.compute.converter.constants import (
+    DEFAULT_CONVERTER_TASK_MAX_PARALLELISM,
+    DEFAULT_ICEBERG_NAMESPACE,
+)
+from fsspec import AbstractFileSystem
 
 
 class ConverterSessionParams(dict):
@@ -18,11 +22,11 @@ def of(params: Optional[Dict]) -> ConverterSessionParams:
         assert (
             params.get("iceberg_warehouse_bucket_name") is not None
         ), "iceberg_warehouse_bucket_name is a required arg"
-        assert (
-            params.get("iceberg_namespace") is not None
-        ), "iceberg_namespace is a required arg"
 
         result = ConverterSessionParams(params)
+        result.iceberg_namespace = params.get(
+            "iceberg_namespace", DEFAULT_ICEBERG_NAMESPACE
+        )
         result.enforce_primary_key_uniqueness = params.get(
             "enforce_primary_key_uniqueness", False
         )
@@ -36,6 +40,10 @@ def of(params: Optional[Dict]) -> ConverterSessionParams:
             "task_max_parallelism", DEFAULT_CONVERTER_TASK_MAX_PARALLELISM
         )
         result.merge_keys = params.get("merge_keys", None)
+        result.s3_client_kwargs = params.get("s3_client_kwargs", {})
+        result.s3_file_system = params.get("s3_file_system", None)
+        result.location_provider_prefix_override = params.get("location_provider_prefix_override", None)
+
         return result
 
     @property
@@ -54,6 +62,10 @@ def iceberg_warehouse_bucket_name(self) -> str:
     def iceberg_namespace(self) -> str:
         return self["iceberg_namespace"]
 
+    @iceberg_namespace.setter
+    def iceberg_namespace(self, iceberg_namespace) -> None:
+        self["iceberg_namespace"] = iceberg_namespace
+
     @property
     def enforce_primary_key_uniqueness(self) -> bool:
         return self["enforce_primary_key_uniqueness"]
@@ -97,3 +109,29 @@ def merge_keys(self) -> str:
     @merge_keys.setter
     def merge_keys(self, merge_keys) -> None:
         self["merge_keys"] = merge_keys
+
+    @property
+    def s3_client_kwargs(self) -> Dict:
+        return self["s3_client_kwargs"]
+
+    @s3_client_kwargs.setter
+    def s3_client_kwargs(self, s3_client_kwargs) -> None:
+        self["s3_client_kwargs"] = s3_client_kwargs
+
+    @property
+    def s3_file_system(self) -> AbstractFileSystem:
+        return self["s3_file_system"]
+
+    @s3_file_system.setter
+    def s3_file_system(self, s3_file_system) -> None:
+        self["s3_file_system"] = s3_file_system
+
+    @property
+    def location_provider_prefix_override(self) -> str:
+        return self["location_provider_prefix_override"]
+
+    @location_provider_prefix_override.setter
+    def location_provider_prefix_override(
+        self, location_provider_prefix_override
+    ) -> None:
+        self["location_provider_prefix_override"] = location_provider_prefix_override
diff --git a/deltacat/compute/converter/pyiceberg/overrides.py b/deltacat/compute/converter/pyiceberg/overrides.py
index 8b3aae1b2..243f127da 100644
--- a/deltacat/compute/converter/pyiceberg/overrides.py
+++ b/deltacat/compute/converter/pyiceberg/overrides.py
@@ -1,24 +1,165 @@
 from collections import defaultdict
 import logging
 from deltacat import logs
+import pyarrow
 import pyarrow.parquet as pq
+from pyiceberg.io.pyarrow import (
+    parquet_path_to_id_mapping,
+    StatisticsCollector,
+    MetricModeTypes,
+    DataFileStatistics,
+    MetricsMode,
+    StatsAggregator,
+)
+from typing import Dict, List, Set
+from deltacat.compute.converter.utils.iceberg_columns import (
+    ICEBERG_RESERVED_FIELD_ID_FOR_FILE_PATH_COLUMN,
+    ICEBERG_RESERVED_FIELD_ID_FOR_POS_COLUMN,
+)
+from pyiceberg.io.pyarrow import (
+    _check_pyarrow_schema_compatible,
+    compute_statistics_plan,
+)
+from pyiceberg.manifest import (
+    DataFile,
+    DataFileContent,
+    FileFormat,
+)
+from pyiceberg.expressions.visitors import _InclusiveMetricsEvaluator
+from pyiceberg.types import (
+    strtobool,
+)
+from pyiceberg.table import _min_sequence_number, _open_manifest
+from pyiceberg.utils.concurrent import ExecutorFactory
+from itertools import chain
+from pyiceberg.typedef import (
+    KeyDefaultDict,
+)
 
 
 logger = logs.configure_deltacat_logger(logging.getLogger(__name__))
 
 
-def parquet_files_dict_to_iceberg_data_files(io, table_metadata, files_dict_list):
-    from pyiceberg.io.pyarrow import (
-        _check_pyarrow_schema_compatible,
-        data_file_statistics_from_parquet_metadata,
-        compute_statistics_plan,
-        parquet_path_to_id_mapping,
-    )
-    from pyiceberg.manifest import (
-        DataFile,
-        DataFileContent,
-        FileFormat,
+def parquet_path_to_id_mapping_override(schema):
+    res = parquet_path_to_id_mapping(schema)
+    # Override here to insert position delete reserved column field IDs
+    res["file_path"] = ICEBERG_RESERVED_FIELD_ID_FOR_FILE_PATH_COLUMN
+    res["pos"] = ICEBERG_RESERVED_FIELD_ID_FOR_POS_COLUMN
+    return res
+
+
+def data_file_statistics_from_parquet_metadata(
+    parquet_metadata: pq.FileMetaData,
+    stats_columns: Dict[int, StatisticsCollector],
+    parquet_column_mapping: Dict[str, int],
+) -> DataFileStatistics:
+    """
+    Overrides original PyIceberg function: Compute and return DataFileStatistics that includes the following.
+
+    - record_count
+    - column_sizes
+    - value_counts
+    - null_value_counts
+    - nan_value_counts
+    - column_aggregates
+    - split_offsets
+
+    Args:
+        parquet_metadata (pyarrow.parquet.FileMetaData): A pyarrow metadata object.
+        stats_columns (Dict[int, StatisticsCollector]): The statistics gathering plan. It is required to
+            set the mode for column metrics collection
+        parquet_column_mapping (Dict[str, int]): The mapping of the parquet file name to the field ID
+    """
+    column_sizes: Dict[int, int] = {}
+    value_counts: Dict[int, int] = {}
+    split_offsets: List[int] = []
+
+    null_value_counts: Dict[int, int] = {}
+    nan_value_counts: Dict[int, int] = {}
+
+    col_aggs = {}
+
+    invalidate_col: Set[int] = set()
+    for r in range(parquet_metadata.num_row_groups):
+        # References:
+        # https://github.com/apache/iceberg/blob/fc381a81a1fdb8f51a0637ca27cd30673bd7aad3/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java#L232
+        # https://github.com/apache/parquet-mr/blob/ac29db4611f86a07cc6877b416aa4b183e09b353/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java#L184
+
+        row_group = parquet_metadata.row_group(r)
+
+        data_offset = row_group.column(0).data_page_offset
+        dictionary_offset = row_group.column(0).dictionary_page_offset
+
+        if row_group.column(0).has_dictionary_page and dictionary_offset < data_offset:
+            split_offsets.append(dictionary_offset)
+        else:
+            split_offsets.append(data_offset)
+
+        for pos in range(parquet_metadata.num_columns):
+            column = row_group.column(pos)
+            field_id = parquet_column_mapping[column.path_in_schema]
+            if field_id in stats_columns:
+                stats_col = stats_columns[field_id]
+
+                column_sizes.setdefault(field_id, 0)
+                column_sizes[field_id] += column.total_compressed_size
+
+                if stats_col.mode == MetricsMode(MetricModeTypes.NONE):
+                    continue
+
+                value_counts[field_id] = (
+                    value_counts.get(field_id, 0) + column.num_values
+                )
+
+                if column.is_stats_set:
+                    try:
+                        statistics = column.statistics
+
+                        if statistics.has_null_count:
+                            null_value_counts[field_id] = (
+                                null_value_counts.get(field_id, 0)
+                                + statistics.null_count
+                            )
+
+                        if stats_col.mode == MetricsMode(MetricModeTypes.COUNTS):
+                            continue
+
+                        if field_id not in col_aggs:
+                            col_aggs[field_id] = StatsAggregator(
+                                stats_col.iceberg_type,
+                                statistics.physical_type,
+                                stats_col.mode.length,
+                            )
+
+                        col_aggs[field_id].update_min(statistics.min)
+                        col_aggs[field_id].update_max(statistics.max)
+
+                    except pyarrow.lib.ArrowNotImplementedError as e:
+                        invalidate_col.add(field_id)
+                        logger.warning(e)
+                else:
+                    # Note: Removed the original logic here that added columns without stats to the invalid column set
+                    logger.warning(
+                        "PyArrow statistics missing for column %d when writing file", pos
+                    )
+
+    split_offsets.sort()
+
+    for field_id in invalidate_col:
+        del col_aggs[field_id]
+        del null_value_counts[field_id]
+
+    return DataFileStatistics(
+        record_count=parquet_metadata.num_rows,
+        column_sizes=column_sizes,
+        value_counts=value_counts,
+        null_value_counts=null_value_counts,
+        nan_value_counts=nan_value_counts,
+        column_aggregates=col_aggs,
+        split_offsets=split_offsets,
     )
+
+
+def parquet_files_dict_to_iceberg_data_files(io, table_metadata, files_dict_list):
     data_file_content_type = DataFileContent.POSITION_DELETES
     iceberg_files = []
     schema = table_metadata.schema()
@@ -37,7 +178,7 @@ def parquet_files_dict_to_iceberg_data_files(io, table_metadata, files_dict_list
                 stats_columns=compute_statistics_plan(
                     schema, table_metadata.properties
                 ),
-                parquet_column_mapping=parquet_path_to_id_mapping(schema),
+                parquet_column_mapping=parquet_path_to_id_mapping_override(schema),
             )
 
             data_file = DataFile(
@@ -45,7 +186,6 @@ def parquet_files_dict_to_iceberg_data_files(io, table_metadata, files_dict_list
                 file_path=file_path,
                 file_format=FileFormat.PARQUET,
                 partition=partition_value,
-                # partition=Record(**{"pk": "111", "bucket": 2}),
                 file_size_in_bytes=len(input_file),
                 sort_order_id=None,
                 spec_id=table_metadata.default_spec_id,
@@ -60,9 +200,6 @@ def parquet_files_dict_to_iceberg_data_files(io, table_metadata, files_dict_list
 
 def fetch_all_bucket_files(table):
     # step 1: filter manifests using partition summaries
     # the filter depends on the partition spec used to write the manifest file, so create a cache of filters for each spec id
-    from pyiceberg.typedef import (
-        KeyDefaultDict,
-    )
     data_scan = table.scan()
     snapshot = data_scan.snapshot()
@@ -78,15 +215,6 @@ def fetch_all_bucket_files(table):
 
     # step 2: filter the data files in each manifest
     # this filter depends on the partition spec used to write the manifest file
-    from pyiceberg.expressions.visitors import _InclusiveMetricsEvaluator
-    from pyiceberg.types import (
-        strtobool,
-    )
-    from pyiceberg.table import _min_sequence_number, _open_manifest
-    from pyiceberg.utils.concurrent import ExecutorFactory
-    from itertools import chain
-    from pyiceberg.manifest import DataFileContent
-
     partition_evaluators = KeyDefaultDict(data_scan._build_partition_evaluator)
     metrics_evaluator = _InclusiveMetricsEvaluator(
         data_scan.table_metadata.schema(),
diff --git a/deltacat/compute/converter/steps/convert.py b/deltacat/compute/converter/steps/convert.py
index ef6a6509a..a2ad6036e 100644
--- a/deltacat/compute/converter/steps/convert.py
+++ b/deltacat/compute/converter/steps/convert.py
@@ -33,6 +33,7 @@ def convert(convert_input: ConvertInput):
     )
     max_parallel_data_file_download = convert_input.max_parallel_data_file_download
     s3_file_system = convert_input.s3_file_system
+    s3_client_kwargs = convert_input.s3_client_kwargs
     if not position_delete_for_multiple_data_files:
         raise NotImplementedError(
             f"Distributed file level position delete compute is not supported yet"
@@ -52,9 +53,14 @@ def convert(convert_input: ConvertInput):
         convert_input_files.partition_value
     )
     partition_value = convert_input_files.partition_value
-    iceberg_table_warehouse_prefix_with_partition = (
-        f"{iceberg_table_warehouse_prefix}/{partition_value_str}"
-    )
+    if partition_value_str:
+        iceberg_table_warehouse_prefix_with_partition = (
+            f"{iceberg_table_warehouse_prefix}/{partition_value_str}"
+        )
+    else:
+        iceberg_table_warehouse_prefix_with_partition = (
+            f"{iceberg_table_warehouse_prefix}"
+        )
     enforce_primary_key_uniqueness = convert_input.enforce_primary_key_uniqueness
     total_pos_delete_table = []
     if applicable_equality_delete_files:
@@ -67,6 +73,7 @@ def convert(convert_input: ConvertInput):
             iceberg_table_warehouse_prefix_with_partition=iceberg_table_warehouse_prefix_with_partition,
             max_parallel_data_file_download=max_parallel_data_file_download,
             s3_file_system=s3_file_system,
+            s3_client_kwargs=s3_client_kwargs,
         )
         if pos_delete_after_converting_equality_delete:
             total_pos_delete_table.append(pos_delete_after_converting_equality_delete)
@@ -81,10 +88,12 @@ def convert(convert_input: ConvertInput):
             identify_column_name_concatenated=identifier_fields[0],
             identifier_columns=identifier_fields,
             merge_sort_column=sc._ORDERED_RECORD_IDX_COLUMN_NAME,
+            s3_client_kwargs=s3_client_kwargs,
         )
         total_pos_delete_table.append(pos_delete_after_dedupe)
 
     total_pos_delete = pa.concat_tables(total_pos_delete_table)
+
     to_be_added_files_list = upload_table_with_retry(
         table=total_pos_delete,
         s3_url_prefix=iceberg_table_warehouse_prefix_with_partition,
@@ -168,6 +177,7 @@ def compute_pos_delete_with_limited_parallelism(
     iceberg_table_warehouse_prefix_with_partition,
     max_parallel_data_file_download,
     s3_file_system,
+    s3_client_kwargs,
 ):
     for data_files, equality_delete_files in zip(
         data_files_list, equality_delete_files_list
@@ -182,6 +192,7 @@ def compute_pos_delete_with_limited_parallelism(
                 sc._ORDERED_RECORD_IDX_COLUMN_NAME,
             ],
             sequence_number=data_file[0],
+            s3_client_kwargs=s3_client_kwargs,
         )
         data_table_total.append(data_table)
     data_table_total = pa.concat_tables(data_table_total)
@@ -191,6 +202,7 @@ def compute_pos_delete_with_limited_parallelism(
         equality_delete_table = download_data_table_and_append_iceberg_columns(
             data_files=equality_delete[1],
             columns_to_download=identifier_columns,
+            s3_client_kwargs=s3_client_kwargs,
         )
         equality_delete_table_total.append(equality_delete_table)
     equality_delete_table_total = pa.concat_tables(equality_delete_table_total)
@@ -201,6 +213,7 @@ def compute_pos_delete_with_limited_parallelism(
         iceberg_table_warehouse_prefix_with_partition=iceberg_table_warehouse_prefix_with_partition,
         identifier_columns=identifier_columns,
         s3_file_system=s3_file_system,
+        s3_client_kwargs=s3_client_kwargs,
     )
     if not new_pos_delete_table:
         logger.info("No records deleted based on equality delete converstion")
diff --git a/deltacat/compute/converter/steps/dedupe.py b/deltacat/compute/converter/steps/dedupe.py
index 9de96df3e..d631fe1bc 100644
--- a/deltacat/compute/converter/steps/dedupe.py
+++ b/deltacat/compute/converter/steps/dedupe.py
@@ -11,6 +11,7 @@ def dedupe_data_files(
     identify_column_name_concatenated,
     identifier_columns,
     merge_sort_column,
+    s3_client_kwargs,
 ):
     data_file_table = []
 
@@ -27,6 +28,7 @@ def dedupe_data_files(
                 sc._ORDERED_RECORD_IDX_COLUMN_NAME,
             ],
             sequence_number=sequence_number,
+            s3_client_kwargs=s3_client_kwargs,
         )
         data_file_table.append(data_file_to_dedupe_table)
 
diff --git a/deltacat/compute/converter/utils/convert_task_options.py b/deltacat/compute/converter/utils/convert_task_options.py
index 1de1c7c51..a3c7e3ea9 100644
--- a/deltacat/compute/converter/utils/convert_task_options.py
+++ b/deltacat/compute/converter/utils/convert_task_options.py
@@ -4,7 +4,10 @@
 AVERAGE_FILE_PATH_COLUMN_SIZE_BYTES = 80
 AVERAGE_POS_COLUMN_SIZE_BYTES = 4
 XXHASH_BYTE_PER_RECORD = 8
-MEMORY_BUFFER_RATE = 1.2
+MEMORY_BUFFER_RATE = 2
+# TODO: Add audit info to check this number in practice
+# Worst case 2 as no duplicates exist across all pks
+PYARROW_AGGREGATE_MEMORY_MULTIPLIER = 2
 
 
 def estimate_fixed_hash_columns(hash_value_size_bytes_per_record, total_record_count):
@@ -13,8 +16,8 @@ def estimate_fixed_hash_columns(hash_value_size_bytes_per_record, total_record_c
 
 
 def get_total_record_from_iceberg_files(iceberg_files_list):
     total_record_count = 0
-    for iceberg_files in iceberg_files_list:
-        total_record_count += sum(file.record_count for file in iceberg_files)
+    # files are in the form of tuples (sequence_number, DataFile)
+    total_record_count += sum(file[1].record_count for file in iceberg_files_list)
     return total_record_count
 
@@ -76,13 +79,38 @@ def _get_task_options(
     return task_opts
 
 
-def convert_resource_options_provider(index, files_for_each_bucket):
-    (
-        data_files_list,
-        equality_delete_files_list,
-        position_delete_files_list,
-    ) = files_for_each_bucket[1]
-    memory_requirement = estimate_convert_remote_option_resources(
-        data_files_list, equality_delete_files_list
+def estimate_dedupe_memory(all_data_files_for_dedupe):
+    dedupe_record_count = get_total_record_from_iceberg_files(all_data_files_for_dedupe)
+    produced_pos_memory_required = estimate_iceberg_pos_delete_additional_columns(
+        ["file_path", "pos"], dedupe_record_count
     )
-    return _get_task_options(memory=memory_requirement)
+    download_pk_memory_required = estimate_fixed_hash_columns(
+        XXHASH_BYTE_PER_RECORD, dedupe_record_count
+    )
+    memory_required_by_dedupe = (
+        produced_pos_memory_required + download_pk_memory_required
+    ) * PYARROW_AGGREGATE_MEMORY_MULTIPLIER
+    memory_with_buffer = memory_required_by_dedupe * MEMORY_BUFFER_RATE
+    return memory_with_buffer
+
+
+def convert_resource_options_provider(index, convert_input_files):
+    applicable_data_files = convert_input_files.applicable_data_files
+    applicable_equality_delete_files = (
+        convert_input_files.applicable_equality_delete_files
+    )
+    all_data_files_for_dedupe = convert_input_files.all_data_files_for_dedupe
+    total_memory_required = 0
+    if applicable_data_files and applicable_equality_delete_files:
+        memory_requirement_for_convert_equality_deletes = (
+            estimate_convert_remote_option_resources(
+                applicable_data_files, applicable_equality_delete_files
+            )
+        )
+        total_memory_required += memory_requirement_for_convert_equality_deletes
+    if all_data_files_for_dedupe:
+        memory_requirement_for_dedupe = estimate_dedupe_memory(
+            all_data_files_for_dedupe
+        )
+        total_memory_required += memory_requirement_for_dedupe
+    return _get_task_options(memory=total_memory_required)
diff --git a/deltacat/compute/converter/utils/io.py b/deltacat/compute/converter/utils/io.py
index c3b98edce..e15bc4e41 100644
--- a/deltacat/compute/converter/utils/io.py
+++ b/deltacat/compute/converter/utils/io.py
@@ -1,13 +1,20 @@
 import deltacat.compute.converter.utils.iceberg_columns as sc
 import daft
+from deltacat.utils.daft import _get_s3_io_config
+from daft import TimeUnit
 
 
 def download_data_table_and_append_iceberg_columns(
-    file, columns_to_download, additional_columns_to_append, sequence_number
+    file,
+    columns_to_download,
+    additional_columns_to_append,
+    sequence_number,
+    s3_client_kwargs,
 ):
-    # TODO; add S3 client kwargs
     table = download_parquet_with_daft_hash_applied(
-        identify_columns=columns_to_download, file=file, s3_client_kwargs={}
+        identify_columns=columns_to_download,
+        file=file,
+        s3_client_kwargs=s3_client_kwargs,
     )
     if sc._FILE_PATH_COLUMN_NAME in additional_columns_to_append:
         table = sc.append_file_path_column(table, file.file_path)
@@ -20,7 +27,6 @@ def download_data_table_and_append_iceberg_columns(
 def download_parquet_with_daft_hash_applied(
     identify_columns, file, s3_client_kwargs, **kwargs
 ):
-    from daft import TimeUnit
 
     # TODO: Add correct read kwargs as in:
     # https://github.com/ray-project/deltacat/blob/383855a4044e4dfe03cf36d7738359d512a517b4/deltacat/utils/daft.py#L97
@@ -29,8 +35,6 @@ def download_parquet_with_daft_hash_applied(
         kwargs.get("coerce_int96_timestamp_unit", "ms")
     )
 
-    from deltacat.utils.daft import _get_s3_io_config
-
     # TODO: Use Daft SHA1 hash instead to minimize probably of data corruption
     io_config = _get_s3_io_config(s3_client_kwargs=s3_client_kwargs)
     df = daft.read_parquet(
diff --git a/deltacat/compute/converter/utils/s3u.py b/deltacat/compute/converter/utils/s3u.py
index 1c53a9a85..e40b97943 100644
--- a/deltacat/compute/converter/utils/s3u.py
+++ b/deltacat/compute/converter/utils/s3u.py
@@ -77,7 +77,7 @@ def upload_table_with_retry(
     s3_file_system = get_s3_file_system(content_type=content_type)
     capture_object = CapturedBlockWritePaths()
     block_write_path_provider = UuidBlockWritePathProvider(
-        capture_object=capture_object
+        capture_object=capture_object, base_path=s3_url_prefix
     )
     s3_table_writer_func = get_table_writer(table)
     table_record_count = get_table_length(table)
diff --git a/deltacat/tests/compute/converter/test_convert_session.py b/deltacat/tests/compute/converter/test_convert_session.py
index ce0041581..821f2c6b3 100644
--- a/deltacat/tests/compute/converter/test_convert_session.py
+++ b/deltacat/tests/compute/converter/test_convert_session.py
@@ -249,6 +249,7 @@ def test_converter_drop_duplicates_success(
         position_delete_for_multiple_data_files=True,
         max_parallel_data_file_download=10,
         s3_file_system=s3_file_system,
+        s3_client_kwargs={},
     )
 
     number_partitioned_array_1 = pa.array([0, 0, 0], type=pa.int32())
@@ -418,6 +419,7 @@ def test_converter_pos_delete_read_by_spark_success(
         position_delete_for_multiple_data_files=True,
         max_parallel_data_file_download=10,
         s3_file_system=s3_file_system,
+        s3_client_kwargs={},
     )
 
     primary_key_array_1 = pa.array(["pk1", "pk2", "pk3"])
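
Usage note (reviewer sketch, not part of the diff): the snippet below illustrates how a caller might exercise the optional session parameters introduced above (s3_client_kwargs, s3_file_system, location_provider_prefix_override, and the now-defaulted iceberg_namespace). The catalog, table, bucket, and region values are placeholders, and the required keys are assumed to follow the existing ConverterSessionParams contract.

from deltacat.compute.converter.converter_session import converter_session
from deltacat.compute.converter.model.converter_session_params import (
    ConverterSessionParams,
)

# "catalog" is assumed to be an already-loaded pyiceberg catalog; all values are placeholders.
params = ConverterSessionParams.of(
    {
        "catalog": catalog,
        "iceberg_table_name": "default.sample_table",
        "iceberg_warehouse_bucket_name": "s3://example-warehouse-bucket",
        # "iceberg_namespace" may now be omitted; it falls back to DEFAULT_ICEBERG_NAMESPACE ("default").
        "enforce_primary_key_uniqueness": True,
        # New in this change: forwarded to the Daft/S3 readers used during conversion.
        "s3_client_kwargs": {"region_name": "us-east-1"},
        # New in this change: bypasses construct_iceberg_table_prefix() when set.
        "location_provider_prefix_override": "s3://example-warehouse-bucket/custom/prefix",
    }
)
converter_session(params=params)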