Empty file.
554 changes: 554 additions & 0 deletions deltacat/compute/converter/_dev/example_single_merge_key_converter.py

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions deltacat/compute/converter/constants.py
@@ -0,0 +1,4 @@
DEFAULT_CONVERTER_TASK_MAX_PARALLELISM = 4096

# Safe limit considering ONLY the CPU limit; typically 32 for an 8x-large worker
DEFAULT_MAX_PARALLEL_DATA_FILE_DOWNLOAD = 30
143 changes: 143 additions & 0 deletions deltacat/compute/converter/converter_session.py
@@ -0,0 +1,143 @@
# from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
from deltacat.utils.ray_utils.concurrency import (
invoke_parallel,
task_resource_options_provider,
)
import ray
import functools
from deltacat.compute.converter.utils.convert_task_options import (
convert_resource_options_provider,
)
import logging
from deltacat import logs
from collections import defaultdict
from deltacat.compute.converter.model.converter_session_params import (
ConverterSessionParams,
)
from deltacat.compute.converter.constants import DEFAULT_MAX_PARALLEL_DATA_FILE_DOWNLOAD
from deltacat.compute.converter.steps.convert import convert
from deltacat.compute.converter.model.convert_input import ConvertInput
from deltacat.compute.converter.pyiceberg.overrides import (
fetch_all_bucket_files,
parquet_files_dict_to_iceberg_data_files,
)
from deltacat.compute.converter.utils.converter_session_utils import (
check_data_files_sequence_number,
)
from deltacat.compute.converter.pyiceberg.replace_snapshot import (
commit_overwrite_snapshot,
)
from deltacat.compute.converter.pyiceberg.catalog import load_table

logger = logs.configure_deltacat_logger(logging.getLogger(__name__))


def converter_session(params: ConverterSessionParams, **kwargs):
"""
Convert equality deletes to position deletes.
Compute- and memory-heavy work, such as downloading equality delete tables and computing
position deletes, is executed on Ray remote tasks.
"""

catalog = params.catalog
table_name = params.iceberg_table_name
iceberg_table = load_table(catalog, table_name)
data_file_dict, equality_delete_dict, pos_delete_dict = fetch_all_bucket_files(
iceberg_table
)

# files_for_each_bucket maps each partition value to a tuple of file lists:
# {partition_value: (data_files_list, equality_delete_files_list, pos_delete_files_list)}
files_for_each_bucket = defaultdict(tuple)
for k, v in data_file_dict.items():
logger.info(f"data_file: k, v:{k, v}")
for k, v in equality_delete_dict.items():
logger.info(f"equality_delete_file: k, v:{k, v}")
for partition_value, equality_delete_file_list in equality_delete_dict.items():
(
result_equality_delete_file,
result_data_file,
) = check_data_files_sequence_number(
data_files_list=data_file_dict[partition_value],
equality_delete_files_list=equality_delete_dict[partition_value],
)
logger.info(f"result_data_file:{result_data_file}")
logger.info(f"result_equality_delete_file:{result_equality_delete_file}")
files_for_each_bucket[partition_value] = (
result_data_file,
result_equality_delete_file,
[],
)

iceberg_warehouse_bucket_name = params.iceberg_warehouse_bucket_name
print(f"iceberg_warehouse_bucket_name:{iceberg_warehouse_bucket_name}")
merge_keys = params.merge_keys
# Use table identifier fields as merge keys if merge keys are not provided
if not merge_keys:
identifier_fields_set = iceberg_table.schema().identifier_field_names()
identifier_fields = list(identifier_fields_set)
else:
identifier_fields = merge_keys
if len(identifier_fields) > 1:
raise NotImplementedError(
f"Multiple identifier fields lookup not supported yet."
)
convert_options_provider = functools.partial(
task_resource_options_provider,
resource_amount_provider=convert_resource_options_provider,
)

# TODO (zyiqin): max_parallel_data_file_download should be determined by the memory requirement of each bucket.
# Specifically, when the memory required by one bucket's files exceeds a single worker node's memory limit, WITHOUT rebasing to a larger hash bucket count, we can either:
# 1. Control the number of files downloaded in parallel by adjusting max_parallel_data_file_download, or
# 2. Implement two-layer converter tasks, where convert tasks spin up child convert tasks.
# Note that approach 2 ideally requires a shared object store to avoid downloading the equality delete files once per child task.
max_parallel_data_file_download = DEFAULT_MAX_PARALLEL_DATA_FILE_DOWNLOAD

compact_small_files = params.compact_small_files
position_delete_for_multiple_data_files = (
params.position_delete_for_multiple_data_files
)
task_max_parallelism = params.task_max_parallelism

def convert_input_provider(index, item):
return {
"convert_input": ConvertInput.of(
files_for_each_bucket=item,
convert_task_index=index,
iceberg_warehouse_bucket_name=iceberg_warehouse_bucket_name,
identifier_fields=identifier_fields,
compact_small_files=compact_small_files,
position_delete_for_multiple_data_files=position_delete_for_multiple_data_files,
max_parallel_data_file_download=max_parallel_data_file_download,
)
}

# Ray remote task: convert
# Assumes that the memory consumed by each bucket doesn't exceed a single node's memory limit.
# TODO: Add a mechanism to split large buckets
convert_tasks_pending = invoke_parallel(
items=files_for_each_bucket.items(),
ray_task=convert,
max_parallelism=task_max_parallelism,
options_provider=convert_options_provider,
kwargs_provider=convert_input_provider,
)
to_be_deleted_files_list = []
to_be_added_files_dict_list = []
convert_results = ray.get(convert_tasks_pending)
for convert_result in convert_results:
to_be_deleted_files_list.extend(convert_result[0].values())
to_be_added_files_dict_list.append(convert_result[1])

new_position_delete_files = parquet_files_dict_to_iceberg_data_files(
io=iceberg_table.io,
table_metadata=iceberg_table.metadata,
files_dict_list=to_be_added_files_dict_list,
)
commit_overwrite_snapshot(
iceberg_table=iceberg_table,
# equality delete files, plus data files whose rows have all been deleted
to_be_deleted_files_list=to_be_deleted_files_list[0],
new_position_delete_files=new_position_delete_files,
)
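
A minimal end-to-end invocation sketch for converter_session; the catalog, table identifier, and bucket name below are illustrative assumptions, not values from this change:

import ray
from deltacat.compute.converter.converter_session import converter_session
from deltacat.compute.converter.model.converter_session_params import (
ConverterSessionParams,
)
from deltacat.compute.converter.pyiceberg.catalog import get_glue_catalog

ray.init()

# Hypothetical catalog/table/bucket values for illustration only.
params = ConverterSessionParams.of(
{
"catalog": get_glue_catalog(),
"iceberg_table_name": "my_db.my_table",
"iceberg_warehouse_bucket_name": "my-warehouse-bucket",
"merge_keys": ["id"],  # optional; falls back to the table's identifier fields
}
)
converter_session(params=params)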
Empty file.
55 changes: 55 additions & 0 deletions deltacat/compute/converter/model/convert_input.py
@@ -0,0 +1,55 @@
from __future__ import annotations
from typing import Dict, List


class ConvertInput(Dict):
@staticmethod
def of(
files_for_each_bucket,
convert_task_index,
iceberg_warehouse_bucket_name,
identifier_fields,
compact_small_files,
position_delete_for_multiple_data_files,
max_parallel_data_file_download,
) -> ConvertInput:

result = ConvertInput()
result["files_for_each_bucket"] = files_for_each_bucket
result["convert_task_index"] = convert_task_index
result["identifier_fields"] = identifier_fields
result["iceberg_warehouse_bucket_name"] = iceberg_warehouse_bucket_name
result["compact_small_files"] = compact_small_files
result[
"position_delete_for_multiple_data_files"
] = position_delete_for_multiple_data_files
result["max_parallel_data_file_download"] = max_parallel_data_file_download
return result

@property
def files_for_each_bucket(self) -> tuple:
return self["files_for_each_bucket"]

@property
def identifier_fields(self) -> List[str]:
return self["identifier_fields"]

@property
def convert_task_index(self) -> int:
return self["convert_task_index"]

@property
def iceberg_warehouse_bucket_name(self) -> str:
return self["iceberg_warehouse_bucket_name"]

@property
def compact_small_files(self) -> bool:
return self["compact_small_files"]

@property
def position_delete_for_multiple_data_files(self) -> bool:
return self["position_delete_for_multiple_data_files"]

@property
def max_parallel_data_file_download(self) -> int:
return self["max_parallel_data_file_download"]
81 changes: 81 additions & 0 deletions deltacat/compute/converter/model/converter_session_params.py
@@ -0,0 +1,81 @@
from __future__ import annotations
from typing import Optional, Dict, List
from deltacat.compute.converter.constants import DEFAULT_CONVERTER_TASK_MAX_PARALLELISM


class ConverterSessionParams(dict):
"""
This class represents the parameters passed to converter_session (deltacat/compute/converter/converter_session.py)
"""

@staticmethod
def of(params: Optional[Dict]) -> ConverterSessionParams:
params = {} if params is None else params
assert params.get("catalog") is not None, "catalog is a required arg"
assert (
params.get("iceberg_table_name") is not None
), "iceberg_table_name is a required arg"
assert (
params.get("iceberg_warehouse_bucket_name") is not None
), "iceberg_warehouse_bucket_name is a required arg"
result = ConverterSessionParams(params)

result.compact_small_files = params.get("compact_small_files", False)

# For the Iceberg v3 spec, option to produce delete vectors that establish a 1:1 mapping with data files.
result.position_delete_for_multiple_data_files = params.get(
"position_delete_for_multiple_data_files", True
)
result.task_max_parallelism = params.get(
"task_max_parallelism", DEFAULT_CONVERTER_TASK_MAX_PARALLELISM
)
result.merge_keys = params.get("merge_keys", None)
return result

@property
def catalog(self):
return self["catalog"]

@property
def iceberg_table_name(self) -> str:
return self["iceberg_table_name"]

@property
def iceberg_warehouse_bucket_name(self) -> str:
return self["iceberg_warehouse_bucket_name"]

@property
def compact_small_files(self) -> bool:
return self["compact_small_files"]

@compact_small_files.setter
def compact_small_files(self, compact_small_files) -> None:
self["compact_small_files"] = compact_small_files

@property
def position_delete_for_multiple_data_files(self) -> bool:
return self["position_delete_for_multiple_data_files"]

@position_delete_for_multiple_data_files.setter
def position_delete_for_multiple_data_files(
self, position_delete_for_multiple_data_files
) -> None:
self[
"position_delete_for_multiple_data_files"
] = position_delete_for_multiple_data_files

@property
def task_max_parallelism(self) -> int:
return self["task_max_parallelism"]

@task_max_parallelism.setter
def task_max_parallelism(self, task_max_parallelism) -> None:
self["task_max_parallelism"] = task_max_parallelism

@property
def merge_keys(self) -> Optional[List[str]]:
return self["merge_keys"]

@merge_keys.setter
def merge_keys(self, merge_keys) -> None:
self["merge_keys"] = merge_keys
Empty file.
75 changes: 75 additions & 0 deletions deltacat/compute/converter/pyiceberg/catalog.py
@@ -0,0 +1,75 @@
from typing import Optional


def load_catalog(iceberg_catalog_name, iceberg_catalog_properties):
# Import under an alias so this wrapper doesn't recursively call itself.
from pyiceberg.catalog import load_catalog as pyiceberg_load_catalog

catalog = pyiceberg_load_catalog(
name=iceberg_catalog_name,
**iceberg_catalog_properties,
)
return catalog


def get_s3_path(
bucket_name: str,
database_name: Optional[str] = None,
table_name: Optional[str] = None,
) -> str:
result_path = f"s3://{bucket_name}"
if database_name is not None:
result_path += f"/{database_name}.db"

if table_name is not None:
result_path += f"/{table_name}"
return result_path


def get_bucket_name():
return "metadata-py4j-zyiqin1"


def get_s3_prefix():
return get_s3_path(get_bucket_name())


def get_credential():
import boto3

boto3_session = boto3.Session()
credentials = boto3_session.get_credentials()
return credentials


def get_glue_catalog():
from pyiceberg.catalog import load_catalog

credential = get_credential()
# Credentials are refreshable, so accessing your access key / secret key
# separately can lead to a race condition. Use this to get an actual matched
# set.
credential = credential.get_frozen_credentials()
access_key_id = credential.access_key
secret_access_key = credential.secret_key
session_token = credential.token
s3_path = get_s3_prefix()
glue_catalog = load_catalog(
"glue",
**{
"warehouse": s3_path,
"type": "glue",
"aws_access_key_id": access_key_id,
"aws_secret_access_key": secret_access_key,
"aws_session_token": session_token,
"region_name": "us-east-1",
"s3.access-key-id": access_key_id,
"s3.secret-access-key": secret_access_key,
"s3.session-token": session_token,
"s3.region": "us-east-1",
},
)

return glue_catalog


def load_table(catalog, table_name):
loaded_table = catalog.load_table(table_name)
return loaded_table
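
A hedged usage sketch tying these helpers together; it assumes valid AWS credentials in the environment and an existing Glue table, and the table identifier is a placeholder:

from deltacat.compute.converter.pyiceberg.catalog import get_glue_catalog, load_table

catalog = get_glue_catalog()
# Glue tables are addressed as "<database>.<table>"; this identifier is hypothetical.
table = load_table(catalog, "my_db.my_table")
print(table.schema())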