Merged
2 changes: 2 additions & 0 deletions deltacat/compute/converter/constants.py
@@ -2,3 +2,5 @@

# Safe limit ONLY considering CPU limit, typically 32 for an 8x-large worker
DEFAULT_MAX_PARALLEL_DATA_FILE_DOWNLOAD = 30

DEFAULT_ICEBERG_NAMESPACE = "default"
Member:

What do you think about recycling the existing DEFAULT_NAMESPACE in deltacat/constants.py and changing its current value from "DEFAULT" to "default"? (I have the same change already staged in my workspace, to standardize on existing lowercase conventions for default names.)

@Zyiqin-Miranda (Member, Author), May 4, 2025:

Reusing the same constant sounds good; merging this PR first and addressing it in follow-up PR #542.
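
For reference, a minimal sketch of the consolidation discussed above, assuming the follow-up keeps the converter-side name as an alias of the shared constant (the actual layout in #542 may differ):

```python
# deltacat/constants.py (proposed): standardize on lowercase default names.
DEFAULT_NAMESPACE = "default"

# deltacat/compute/converter/constants.py could then alias the shared
# constant instead of defining its own value:
from deltacat.constants import DEFAULT_NAMESPACE as DEFAULT_ICEBERG_NAMESPACE
```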

43 changes: 26 additions & 17 deletions deltacat/compute/converter/converter_session.py
@@ -1,4 +1,3 @@
# from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
from deltacat.utils.ray_utils.concurrency import (
invoke_parallel,
task_resource_options_provider,
@@ -48,6 +47,18 @@ def converter_session(params: ConverterSessionParams, **kwargs):
table_name = params.iceberg_table_name
iceberg_table = load_table(catalog, table_name)
enforce_primary_key_uniqueness = params.enforce_primary_key_uniqueness
iceberg_warehouse_bucket_name = params.iceberg_warehouse_bucket_name
iceberg_namespace = params.iceberg_namespace
merge_keys = params.merge_keys
compact_small_files = params.compact_small_files
task_max_parallelism = params.task_max_parallelism
s3_client_kwargs = params.s3_client_kwargs
s3_file_system = params.s3_file_system
location_provider_prefix_override = params.location_provider_prefix_override
position_delete_for_multiple_data_files = (
params.position_delete_for_multiple_data_files
)

data_file_dict, equality_delete_dict, pos_delete_dict = fetch_all_bucket_files(
iceberg_table
)
@@ -56,14 +67,16 @@ def converter_session(params: ConverterSessionParams, **kwargs):
equality_delete_dict=equality_delete_dict,
pos_delete_dict=pos_delete_dict,
)
iceberg_warehouse_bucket_name = params.iceberg_warehouse_bucket_name
iceberg_namespace = params.iceberg_namespace
iceberg_table_warehouse_prefix = construct_iceberg_table_prefix(
iceberg_warehouse_bucket_name=iceberg_warehouse_bucket_name,
table_name=table_name,
iceberg_namespace=iceberg_namespace,
)
merge_keys = params.merge_keys

if not location_provider_prefix_override:
iceberg_table_warehouse_prefix = construct_iceberg_table_prefix(
iceberg_warehouse_bucket_name=iceberg_warehouse_bucket_name,
table_name=table_name,
iceberg_namespace=iceberg_namespace,
)
else:
iceberg_table_warehouse_prefix = location_provider_prefix_override

# Using table identifier fields as merge keys if merge keys not provided
if not merge_keys:
identifier_fields_set = iceberg_table.schema().identifier_field_names()
@@ -86,31 +99,27 @@ def converter_session(params: ConverterSessionParams, **kwargs):
# Note that approach 2 would ideally require a shared object store to avoid downloading equality delete files (number of child tasks) times.
max_parallel_data_file_download = DEFAULT_MAX_PARALLEL_DATA_FILE_DOWNLOAD

compact_small_files = params.compact_small_files
position_delete_for_multiple_data_files = (
params.position_delete_for_multiple_data_files
)
task_max_parallelism = params.task_max_parallelism

def convert_input_provider(index, item):
return {
"convert_input": ConvertInput.of(
files_for_each_bucket=item,
convert_input_files=item,
convert_task_index=index,
iceberg_table_warehouse_prefix=iceberg_table_warehouse_prefix,
identifier_fields=identifier_fields,
compact_small_files=compact_small_files,
enforce_primary_key_uniqueness=enforce_primary_key_uniqueness,
position_delete_for_multiple_data_files=position_delete_for_multiple_data_files,
max_parallel_data_file_download=max_parallel_data_file_download,
s3_client_kwargs=s3_client_kwargs,
s3_file_system=s3_file_system,
)
}

# Ray remote task: convert
# Assuming that memory consumed by each bucket doesn't exceed one node's memory limit.
# TODO: Add split mechanism to split large buckets
convert_tasks_pending = invoke_parallel(
items=convert_input_files_for_all_buckets.items(),
items=convert_input_files_for_all_buckets,
ray_task=convert,
max_parallelism=task_max_parallelism,
options_provider=convert_options_provider,
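
Taken together, a hedged usage sketch of the updated entry point; the catalog configuration, bucket, and table names below are placeholders, and the keys follow the parameters read at the top of converter_session:

```python
from pyiceberg.catalog import load_catalog

from deltacat.compute.converter.converter_session import converter_session
from deltacat.compute.converter.model.converter_session_params import (
    ConverterSessionParams,
)

catalog = load_catalog("demo")  # placeholder catalog configuration

params = ConverterSessionParams.of(
    {
        "catalog": catalog,
        "iceberg_table_name": "my_table",  # placeholder
        "iceberg_warehouse_bucket_name": "my-warehouse-bucket",  # placeholder
        # "iceberg_namespace" may now be omitted; it falls back to "default".
        "enforce_primary_key_uniqueness": True,
        "s3_client_kwargs": {"region_name": "us-east-1"},
        # Optional: bypass construct_iceberg_table_prefix entirely.
        "location_provider_prefix_override": "my-warehouse-bucket/custom/prefix",
    }
)
converter_session(params=params)
```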
6 changes: 6 additions & 0 deletions deltacat/compute/converter/model/convert_input.py
@@ -15,6 +15,7 @@ def of(
position_delete_for_multiple_data_files,
max_parallel_data_file_download,
s3_file_system,
s3_client_kwargs,
) -> ConvertInput:

result = ConvertInput()
@@ -29,6 +30,7 @@ def of(
] = position_delete_for_multiple_data_files
result["max_parallel_data_file_download"] = max_parallel_data_file_download
result["s3_file_system"] = s3_file_system
result["s3_client_kwargs"] = s3_client_kwargs

return result

@@ -67,3 +69,7 @@ def max_parallel_data_file_download(self) -> int:
@property
def s3_file_system(self):
return self["s3_file_system"]

@property
def s3_client_kwargs(self):
return self["s3_client_kwargs"]
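
A minimal sketch of constructing the extended ConvertInput, mirroring convert_input_provider above; all values are placeholders:

```python
from deltacat.compute.converter.model.convert_input import ConvertInput

# Placeholder: one bucket's files, as produced by fetch_all_bucket_files.
files_for_bucket = ...

convert_input = ConvertInput.of(
    convert_input_files=files_for_bucket,
    convert_task_index=0,
    iceberg_table_warehouse_prefix="my-warehouse-bucket/default/my_table",  # placeholder
    identifier_fields=["pk"],  # placeholder merge key
    compact_small_files=False,
    enforce_primary_key_uniqueness=True,
    position_delete_for_multiple_data_files=True,
    max_parallel_data_file_download=30,
    s3_client_kwargs={"region_name": "us-east-1"},
    s3_file_system=None,  # or an fsspec-compatible filesystem
)
assert convert_input.s3_client_kwargs == {"region_name": "us-east-1"}
```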
46 changes: 42 additions & 4 deletions deltacat/compute/converter/model/converter_session_params.py
@@ -1,6 +1,10 @@
from __future__ import annotations
from typing import Optional, Dict
from deltacat.compute.converter.constants import DEFAULT_CONVERTER_TASK_MAX_PARALLELISM
from deltacat.compute.converter.constants import (
DEFAULT_CONVERTER_TASK_MAX_PARALLELISM,
DEFAULT_ICEBERG_NAMESPACE,
)
from fsspec import AbstractFileSystem


class ConverterSessionParams(dict):
@@ -18,11 +22,11 @@ def of(params: Optional[Dict]) -> ConverterSessionParams:
assert (
params.get("iceberg_warehouse_bucket_name") is not None
), "iceberg_warehouse_bucket_name is a required arg"
assert (
params.get("iceberg_namespace") is not None
), "iceberg_namespace is a required arg"
result = ConverterSessionParams(params)

result.iceberg_namespace = params.get(
"iceberg_namespace", DEFAULT_ICEBERG_NAMESPACE
)
result.enforce_primary_key_uniqueness = params.get(
"enforce_primary_key_uniqueness", False
)
@@ -36,6 +40,10 @@ def of(params: Optional[Dict]) -> ConverterSessionParams:
"task_max_parallelism", DEFAULT_CONVERTER_TASK_MAX_PARALLELISM
)
result.merge_keys = params.get("merge_keys", None)
result.s3_client_kwargs = params.get("s3_client_kwargs", {})
result.s3_file_system = params.get("s3_file_system", None)
result.location_provider_prefix_override = params.get("location_provider_prefix_override", None)

return result

@property
Expand All @@ -54,6 +62,10 @@ def iceberg_warehouse_bucket_name(self) -> str:
def iceberg_namespace(self) -> str:
return self["iceberg_namespace"]

@iceberg_namespace.setter
def iceberg_namespace(self, iceberg_namespace) -> None:
self["iceberg_namespace"] = iceberg_namespace

@property
def enforce_primary_key_uniqueness(self) -> bool:
return self["enforce_primary_key_uniqueness"]
@@ -97,3 +109,29 @@ def merge_keys(self) -> str:
@merge_keys.setter
def merge_keys(self, merge_keys) -> None:
self["merge_keys"] = merge_keys

@property
def s3_client_kwargs(self) -> Dict:
return self["s3_client_kwargs"]

@s3_client_kwargs.setter
def s3_client_kwargs(self, s3_client_kwargs) -> None:
self["s3_client_kwargs"] = s3_client_kwargs

@property
def s3_file_system(self) -> AbstractFileSystem:
return self["s3_file_system"]

@s3_file_system.setter
def s3_file_system(self, s3_file_system) -> None:
self["s3_file_system"] = s3_file_system

@property
def location_provider_prefix_override(self) -> str:
return self["location_provider_prefix_override"]

@location_provider_prefix_override.setter
def location_provider_prefix_override(
self, location_provider_prefix_override
) -> None:
self["location_provider_prefix_override"] = location_provider_prefix_override
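
Because ConverterSessionParams is a dict subclass whose properties read and write dict keys, the new fields behave like the existing ones. A short sketch, with placeholder values and assuming catalog and iceberg_table_name are required as in the session setup:

```python
params = ConverterSessionParams.of(
    {
        "catalog": catalog,  # a loaded pyiceberg catalog, assumed in scope
        "iceberg_table_name": "my_table",  # placeholder
        "iceberg_warehouse_bucket_name": "my-warehouse-bucket",  # placeholder
    }
)

# iceberg_namespace now falls back to DEFAULT_ICEBERG_NAMESPACE ("default").
assert params.iceberg_namespace == "default"

# Property setters write through to the underlying dict.
params.s3_client_kwargs = {"region_name": "us-east-1"}
assert params["s3_client_kwargs"] == {"region_name": "us-east-1"}
```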