Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 41 additions & 6 deletions deltacat/compute/converter/converter_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ def converter_session(params: ConverterSessionParams, **kwargs):
iceberg_warehouse_bucket_name = params.iceberg_warehouse_bucket_name
iceberg_namespace = params.iceberg_namespace
merge_keys = params.merge_keys
compact_small_files = params.compact_small_files
compact_previous_position_delete_files = (
params.compact_previous_position_delete_files
)
task_max_parallelism = params.task_max_parallelism
s3_client_kwargs = params.s3_client_kwargs
s3_file_system = params.s3_file_system
Expand Down Expand Up @@ -103,7 +105,7 @@ def convert_input_provider(index, item):
convert_task_index=index,
iceberg_table_warehouse_prefix=iceberg_table_warehouse_prefix,
identifier_fields=identifier_fields,
compact_small_files=compact_small_files,
compact_previous_position_delete_files=compact_previous_position_delete_files,
table_io=iceberg_table.io,
table_metadata=iceberg_table.metadata,
enforce_primary_key_uniqueness=enforce_primary_key_uniqueness,
Expand All @@ -114,7 +116,7 @@ def convert_input_provider(index, item):
)
}

logger.info((f"Getting remote convert tasks..."))
logger.info(f"Getting remote convert tasks...")
# Ray remote task: convert
# TODO: Add split mechanism to split large buckets
convert_tasks_pending = invoke_parallel(
Expand All @@ -131,12 +133,36 @@ def convert_input_provider(index, item):
convert_results = ray.get(convert_tasks_pending)
logger.info(f"Got {len(convert_tasks_pending)} convert tasks.")

total_position_delete_record_count = sum(
convert_result.position_delete_record_count
for convert_result in convert_results
)
total_input_data_file_record_count = sum(
convert_result.input_data_files_record_count
for convert_result in convert_results
)
total_data_file_hash_columns_in_memory_sizes = sum(
convert_result.input_data_files_hash_columns_in_memory_sizes
for convert_result in convert_results
)
total_position_delete_file_in_memory_sizes = sum(
convert_result.position_delete_in_memory_sizes
for convert_result in convert_results
)
total_position_delete_on_disk_sizes = sum(
convert_result.position_delete_on_disk_sizes
for convert_result in convert_results
)

to_be_added_files_list = []
for convert_result in convert_results:
to_be_deleted_files_list.extend(convert_result[0].values())
to_be_added_files_list.extend(convert_result[1])
to_be_added_files = convert_result.to_be_added_files
to_be_deleted_files = convert_result.to_be_deleted_files

to_be_deleted_files_list.extend(to_be_deleted_files.values())
to_be_added_files_list.extend(to_be_added_files)

if not to_be_deleted_files_list:
if not to_be_deleted_files_list and to_be_added_files_list:
commit_append_snapshot(
iceberg_table=iceberg_table,
new_position_delete_files=to_be_added_files_list,
Expand All @@ -147,4 +173,13 @@ def convert_input_provider(index, item):
to_be_deleted_files_list=to_be_deleted_files_list,
new_position_delete_files=to_be_added_files_list,
)
logger.info(
f"Aggregated stats for {table_name}: "
f"total position delete record count: {total_position_delete_record_count}, "
f"total input data file record_count: {total_input_data_file_record_count}, "
f"total data file hash columns in memory sizes: {total_data_file_hash_columns_in_memory_sizes}, "
f"total position delete file in memory sizes: {total_position_delete_file_in_memory_sizes}, "
f"total position delete file on disk sizes: {total_position_delete_on_disk_sizes}."
)

logger.info(f"Committed new Iceberg snapshot.")
10 changes: 6 additions & 4 deletions deltacat/compute/converter/model/convert_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def of(
identifier_fields,
table_io,
table_metadata,
compact_small_files,
compact_previous_position_delete_files,
enforce_primary_key_uniqueness,
position_delete_for_multiple_data_files,
max_parallel_data_file_download,
Expand All @@ -27,7 +27,9 @@ def of(
result["iceberg_table_warehouse_prefix"] = iceberg_table_warehouse_prefix
result["table_io"] = table_io
result["table_metadata"] = table_metadata
result["compact_small_files"] = compact_small_files
result[
"compact_previous_position_delete_files"
] = compact_previous_position_delete_files
result["enforce_primary_key_uniqueness"] = enforce_primary_key_uniqueness
result[
"position_delete_for_multiple_data_files"
Expand Down Expand Up @@ -63,8 +65,8 @@ def table_metadata(self):
return self["table_metadata"]

@property
def compact_small_files(self) -> bool:
return self["compact_small_files"]
def compact_previous_position_delete_files(self) -> bool:
return self["compact_previous_position_delete_files"]

@property
def enforce_primary_key_uniqueness(self) -> bool:
Expand Down
61 changes: 61 additions & 0 deletions deltacat/compute/converter/model/convert_result.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
from __future__ import annotations
from typing import Dict


class ConvertResult(Dict):
    """Dict-backed record describing the outcome of one convert task.

    Bundles the position-delete files produced (to add) and the files they
    supersede (to delete) together with per-task record counts and
    in-memory / on-disk size statistics used for aggregate logging.
    """

    @staticmethod
    def of(
        convert_task_index,
        to_be_added_files,
        to_be_deleted_files,
        position_delete_record_count,
        input_data_files_record_count,
        input_data_files_hash_columns_in_memory_sizes,
        position_delete_in_memory_sizes,
        position_delete_on_disk_sizes,
    ) -> ConvertResult:
        """Build a ConvertResult carrying the given task outputs and stats."""
        instance = ConvertResult()
        instance.update(
            convert_task_index=convert_task_index,
            to_be_added_files=to_be_added_files,
            to_be_deleted_files=to_be_deleted_files,
            position_delete_record_count=position_delete_record_count,
            input_data_files_record_count=input_data_files_record_count,
            input_data_files_hash_columns_in_memory_sizes=(
                input_data_files_hash_columns_in_memory_sizes
            ),
            position_delete_in_memory_sizes=position_delete_in_memory_sizes,
            position_delete_on_disk_sizes=position_delete_on_disk_sizes,
        )
        return instance

    @property
    def convert_task_index(self) -> int:
        """Index of the parallel convert task that produced this result."""
        return self["convert_task_index"]

    @property
    def to_be_added_files(self):
        """New position-delete files to append in the next snapshot."""
        return self["to_be_added_files"]

    @property
    def to_be_deleted_files(self):
        """Files superseded by this task's output, keyed for later lookup."""
        return self["to_be_deleted_files"]

    @property
    def position_delete_record_count(self):
        """Number of position-delete records written by this task."""
        return self["position_delete_record_count"]

    @property
    def input_data_files_record_count(self):
        """Total record count across the task's input data files."""
        return self["input_data_files_record_count"]

    @property
    def input_data_files_hash_columns_in_memory_sizes(self):
        """In-memory byte size of the hash columns read from input files."""
        return self["input_data_files_hash_columns_in_memory_sizes"]

    @property
    def position_delete_in_memory_sizes(self):
        """In-memory byte size of the produced position-delete data."""
        return self["position_delete_in_memory_sizes"]

    @property
    def position_delete_on_disk_sizes(self):
        """On-disk byte size of the produced position-delete files."""
        return self["position_delete_on_disk_sizes"]
18 changes: 12 additions & 6 deletions deltacat/compute/converter/model/converter_session_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ def of(params: Optional[Dict]) -> ConverterSessionParams:
result.enforce_primary_key_uniqueness = params.get(
"enforce_primary_key_uniqueness", False
)
result.compact_small_files = params.get("compact_small_files", False)
result.compact_previous_position_delete_files = params.get(
"compact_previous_position_delete_files", False
)

# For Iceberg v3 spec, option to produce delete vector that can establish 1:1 mapping with data files.
result.position_delete_for_multiple_data_files = params.get(
Expand Down Expand Up @@ -73,12 +75,16 @@ def enforce_primary_key_uniqueness(self, enforce_primary_key_uniqueness) -> None
self["enforce_primary_key_uniqueness"] = enforce_primary_key_uniqueness

@property
def compact_small_files(self) -> bool:
return self["compact_small_files"]
def compact_previous_position_delete_files(self) -> bool:
return self["compact_previous_position_delete_files"]

@compact_small_files.setter
def compact_small_files(self, compact_small_files) -> None:
self["compact_small_files"] = compact_small_files
@compact_previous_position_delete_files.setter
def compact_previous_position_delete_files(
self, compact_previous_position_delete_files
) -> None:
self[
"compact_previous_position_delete_files"
] = compact_previous_position_delete_files

@property
def position_delete_for_multiple_data_files(self) -> bool:
Expand Down
4 changes: 3 additions & 1 deletion deltacat/compute/converter/pyiceberg/overrides.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ def parquet_files_dict_to_iceberg_data_files(io, table_metadata, files_dict):
schema = table_metadata.schema()
for partition_value, file_paths in files_dict.items():
for file_path in file_paths:
logger.info(f"DEBUG_file_path:{file_path}")
input_file = io.new_input(file_path)
with input_file.open() as input_stream:
parquet_metadata = pq.read_metadata(input_stream)
Expand Down Expand Up @@ -239,9 +240,10 @@ def fetch_all_bucket_files(table):
file_sequence_number = manifest_entry.sequence_number
data_file_tuple = (file_sequence_number, data_file)
partition_value = data_file.partition

if data_file.content == DataFileContent.DATA:
data_entries[partition_value].append(data_file_tuple)
if data_file.content == DataFileContent.POSITION_DELETES:
elif data_file.content == DataFileContent.POSITION_DELETES:
positional_delete_entries[partition_value].append(data_file_tuple)
elif data_file.content == DataFileContent.EQUALITY_DELETES:
equality_data_entries[partition_value].append(data_file_tuple)
Expand Down
Loading