8 changes: 5 additions & 3 deletions python/iceberg/api/partition_field.py
@@ -18,8 +18,9 @@

class PartitionField(object):

def __init__(self, source_id, name, transform):
def __init__(self, source_id, field_id, name, transform):
self.source_id = source_id
self.field_id = field_id
self.name = name
self.transform = transform

@@ -29,7 +30,8 @@ def __eq__(self, other):
elif other is None or not isinstance(other, PartitionField):
return False

return self.source_id == other.source_id and self.name == other.name and self.transform == other.transform
return self.source_id == other.source_id and self.field_id == other.field_id and \
self.name == other.name and self.transform == other.transform

def __ne__(self, other):
return not self.__eq__(other)
@@ -38,4 +40,4 @@ def __hash__(self):
return hash(self.__key())

def __key(self):
return PartitionField.__class__, self.source_id, self.name, self.transform
return PartitionField.__class__, self.source_id, self.field_id, self.name, self.transform
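With field_id now part of __eq__ and __hash__, two otherwise-identical partition fields with different ids no longer compare equal. A minimal sketch of the new behavior (PartitionField's module path is taken from this diff; the Transforms import path is an assumption):

from iceberg.api.partition_field import PartitionField
from iceberg.api.transforms import Transforms  # import path assumed
from iceberg.api.types import StringType

transform = Transforms.identity(StringType.get())
a = PartitionField(2, 1000, "data", transform)
b = PartitionField(2, 1001, "data", transform)

# Same source-id, name, and transform, but different field ids:
assert a != b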
37 changes: 28 additions & 9 deletions python/iceberg/api/partition_spec.py
@@ -32,13 +32,13 @@ class PartitionSpec(object):

@staticmethod
def UNPARTITIONED_SPEC():
return PartitionSpec(Schema(), 0, [])
return PartitionSpec(Schema(), 0, [], PartitionSpec.PARTITION_DATA_ID_START - 1)

@staticmethod
def unpartitioned():
return PartitionSpec.UNPARTITIONED_SPEC()

def __init__(self, schema, spec_id, fields):
def __init__(self, schema, spec_id, fields, last_assigned_field_id):
self.fields_by_source_id = None
self.fields_by_name = None
self.__java_classes = None
@@ -49,6 +49,7 @@ def __init__(self, schema, spec_id, fields):
self.__fields = list()
for field in fields:
self.__fields.append(field)
self.last_assigned_field_id = last_assigned_field_id

@property
def fields(self):
@@ -70,10 +71,10 @@ def get_field_by_source_id(self, field_id):

def partition_type(self):
struct_fields = list()
for i, field in enumerate(self.__fields):
for _i, field in enumerate(self.__fields):
source_type = self.schema.find_type(field.source_id)
result_type = field.transform.get_result_type(source_type)
struct_fields.append(NestedField.optional(PartitionSpec.PARTITION_DATA_ID_START + i,
struct_fields.append(NestedField.optional(field.field_id,
field.name,
result_type))

@@ -170,9 +171,10 @@ def __repr__(self):
sb = ["["]

for field in self.__fields:
sb.append("\n {name}: {transform}({source_id})".format(name=field.name,
transform=str(field.transform),
source_id=field.source_id))
sb.append("\n {field_id}: {name}: {transform}({source_id})".format(field_id=field.field_id,
name=field.name,
transform=str(field.transform),
source_id=field.source_id))

if len(self.__fields) > 0:
sb.append("\n")
@@ -201,6 +203,11 @@ def __init__(self, schema):
self.fields = list()
self.partition_names = set()
self.spec_id = 0
self.last_assigned_field_id = PartitionSpec.PARTITION_DATA_ID_START - 1

def __next_field_id(self):
self.last_assigned_field_id = self.last_assigned_field_id + 1
return self.last_assigned_field_id

def with_spec_id(self, spec_id):
self.spec_id = spec_id
@@ -226,6 +233,7 @@ def identity(self, source_name):
self.check_and_add_partition_name(source_name)
source_column = self.find_source_column(source_name)
self.fields.append(PartitionField(source_column.field_id,
self.__next_field_id(),
source_name,
Transforms.identity(source_column.type)))
return self
@@ -235,6 +243,7 @@ def year(self, source_name):
self.check_and_add_partition_name(name)
source_column = self.find_source_column(source_name)
self.fields.append(PartitionField(source_column.field_id,
self.__next_field_id(),
name,
Transforms.year(source_column.type)))
return self
@@ -244,6 +253,7 @@ def month(self, source_name):
self.check_and_add_partition_name(name)
source_column = self.find_source_column(source_name)
self.fields.append(PartitionField(source_column.field_id,
self.__next_field_id(),
name,
Transforms.month(source_column.type)))
return self
@@ -253,6 +263,7 @@ def day(self, source_name):
self.check_and_add_partition_name(name)
source_column = self.find_source_column(source_name)
self.fields.append(PartitionField(source_column.field_id,
self.__next_field_id(),
name,
Transforms.day(source_column.type)))
return self
@@ -262,6 +273,7 @@ def hour(self, source_name):
self.check_and_add_partition_name(name)
source_column = self.find_source_column(source_name)
self.fields.append(PartitionField(source_column.field_id,
self.__next_field_id(),
name,
Transforms.hour(source_column.type)))
return self
@@ -271,6 +283,7 @@ def bucket(self, source_name, num_buckets):
self.check_and_add_partition_name(name)
source_column = self.find_source_column(source_name)
self.fields.append(PartitionField(source_column.field_id,
self.__next_field_id(),
name,
Transforms.bucket(source_column.type, num_buckets)))
return self
@@ -280,24 +293,30 @@ def truncate(self, source_name, width):
self.check_and_add_partition_name(name)
source_column = self.find_source_column(source_name)
self.fields.append(PartitionField(source_column.field_id,
self.__next_field_id(),
name,
Transforms.truncate(source_column.type, width)))
return self

def add(self, source_id, name, transform):
def add_without_field_id(self, source_id, name, transform):
return self.add(source_id, self.__next_field_id(), name, transform)

def add(self, source_id, field_id, name, transform):
self.check_and_add_partition_name(name)
column = self.schema.find_field(source_id)
if column is None:
raise RuntimeError("Cannot find source column: %s" % source_id)

transform_obj = Transforms.from_string(column.type, transform)
field = PartitionField(source_id,
field_id,
name,
transform_obj)
self.fields.append(field)
self.last_assigned_field_id = max(self.last_assigned_field_id, field_id)
return self

def build(self):
spec = PartitionSpec(self.schema, self.spec_id, self.fields)
spec = PartitionSpec(self.schema, self.spec_id, self.fields, self.last_assigned_field_id)
PartitionSpec.check_compatibility(spec, self.schema)
return spec
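The builder now tracks last_assigned_field_id: auto-assigned ids start at PARTITION_DATA_ID_START (1000, per the tests below), and an explicit id passed to add() bumps the counter so later fields continue past it. A usage sketch of that behavior, with imports matching the test file:

from iceberg.api import PartitionSpec, Schema
from iceberg.api.types import IntegerType, NestedField, StringType

schema = Schema(NestedField.required(1, "id", IntegerType.get()),
                NestedField.required(2, "data", StringType.get()))

builder = PartitionSpec.builder_for(schema)
builder.identity("id")                      # auto-assigned field id 1000
builder.add(2, 1010, "data2", "bucket[8]")  # explicit field id 1010
builder.bucket("data", 16)                  # resumes past the max seen: 1011
spec = builder.build()

assert spec.last_assigned_field_id == 1011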
2 changes: 1 addition & 1 deletion python/iceberg/api/transforms/transforms.py
@@ -47,7 +47,7 @@ def from_string(type, transform):

if match is not None:
name = match.group(1)
w = match.group(2)
w = int(match.group(2))
if name.lower() == "truncate":
return Truncate.get(type, w)
elif name.lower() == "bucket":
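The cast matters because the regex captures the bracketed width as a string, while the bucket and truncate transforms expect an int. A quick sketch (import path assumed):

from iceberg.api.transforms import Transforms  # import path assumed
from iceberg.api.types import StringType

# "bucket[16]" now parses to a bucket transform with an integer width of 16
transform = Transforms.from_string(StringType.get(), "bucket[16]")
assert str(transform) == "bucket[16]"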
49 changes: 29 additions & 20 deletions python/iceberg/core/partition_spec_parser.py
@@ -25,6 +25,7 @@ class PartitionSpecParser(object):
SPEC_ID = "spec-id"
FIELDS = "fields"
SOURCE_ID = "source-id"
FIELD_ID = "field-id"
TRANSFORM = "transform"
NAME = "name"

@@ -41,7 +42,8 @@ def to_json_fields(spec):
def to_json_fields(spec):
return [{PartitionSpecParser.NAME: field.name,
PartitionSpecParser.TRANSFORM: str(field.transform),
PartitionSpecParser.SOURCE_ID: field.source_id}
PartitionSpecParser.SOURCE_ID: field.source_id,
PartitionSpecParser.FIELD_ID: field.field_id}
for field in spec.fields]

@staticmethod
@@ -56,18 +58,8 @@ def from_json(schema, json_obj):

builder = PartitionSpec.builder_for(schema).with_spec_id(spec_id)
fields = json_obj.get(PartitionSpecParser.FIELDS)
if not isinstance(fields, (list, tuple)):
raise RuntimeError("Cannot parse partition spec fields, not an array: %s" % fields)

for element in fields:
if not isinstance(element, dict):
raise RuntimeError("Cannot parse partition field, not an object: %s" % element)

builder.add(element.get(PartitionSpecParser.SOURCE_ID),
element.get(PartitionSpecParser.NAME),
element.get(PartitionSpecParser.TRANSFORM))

return builder.build()
return PartitionSpecParser.__build_from_json_fields(builder, fields)

@staticmethod
def from_json_fields(schema, spec_id, json_obj):
@@ -76,14 +68,31 @@ def from_json_fields(schema, spec_id, json_obj):
if isinstance(json_obj, str):
json_obj = json.loads(json_obj)

if not isinstance(json_obj, list):
raise RuntimeError("Cannot parse partition spec fields, not an array: %s" % json_obj)
return PartitionSpecParser.__build_from_json_fields(builder, json_obj)

@staticmethod
def __build_from_json_fields(builder, json_fields):
if not isinstance(json_fields, (list, tuple)):
raise RuntimeError("Cannot parse partition spec fields, not an array: %s" % json_fields)

field_id_count = 0
for element in json_fields:
if not isinstance(element, dict):
raise RuntimeError("Cannot parse partition field, not an object: %s" % element)

for item in json_obj:
if not isinstance(item, dict):
raise RuntimeError("Cannot parse partition field, not an object: %s" % json_obj)
builder.add(item.get(PartitionSpecParser.SOURCE_ID),
item.get(PartitionSpecParser.NAME),
item.get(PartitionSpecParser.TRANSFORM))
if element.get(PartitionSpecParser.FIELD_ID) is not None:
builder.add(element.get(PartitionSpecParser.SOURCE_ID),
element.get(PartitionSpecParser.FIELD_ID),
element.get(PartitionSpecParser.NAME),
element.get(PartitionSpecParser.TRANSFORM))
field_id_count = field_id_count + 1
else:
builder.add_without_field_id(element.get(PartitionSpecParser.SOURCE_ID),
element.get(PartitionSpecParser.NAME),
element.get(PartitionSpecParser.TRANSFORM))

if field_id_count > 0 and field_id_count != len(json_fields):
raise RuntimeError("Cannot parse spec with missing field IDs: %s missing of %s fields." %
(len(json_fields) - field_id_count, len(json_fields)))

return builder.build()
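Both entry points now funnel into __build_from_json_fields: either every field carries "field-id" (forwarded to add()) or none does (ids assigned via add_without_field_id()); a mix raises RuntimeError. A round-trip sketch, assuming the modules from this PR are importable and that PartitionSpec.fields returns the field list:

from iceberg.api import Schema
from iceberg.api.types import IntegerType, NestedField
from iceberg.core import PartitionSpecParser

schema = Schema(NestedField.required(1, "id", IntegerType.get()))

# Explicit ids are preserved.
spec = PartitionSpecParser.from_json(
    schema,
    '{"spec-id": 0, "fields": ['
    '{"name": "id", "transform": "identity", "source-id": 1, "field-id": 1005}]}')
assert spec.fields[0].field_id == 1005

# Missing ids are assigned sequentially starting at 1000.
spec = PartitionSpecParser.from_json(
    schema,
    '{"spec-id": 0, "fields": ['
    '{"name": "id", "transform": "identity", "source-id": 1}]}')
assert spec.fields[0].field_id == 1000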
88 changes: 81 additions & 7 deletions python/tests/core/test_partition_spec_parser.py
@@ -16,21 +16,95 @@
# under the License.

from iceberg.api import PartitionSpec, Schema
from iceberg.api.types import IntegerType, NestedField, StringType
from iceberg.api.types import DecimalType, IntegerType, NestedField, StringType
from iceberg.core import PartitionSpecParser
import pytest


def test_to_json_conversion():
spec_schema = Schema(NestedField.required(1, "id", IntegerType.get()),
NestedField.required(2, "data", StringType.get()))
NestedField.required(2, "data", StringType.get()),
NestedField.required(3, "num", DecimalType.of(9, 2)))

spec = PartitionSpec\
spec = PartitionSpec \
.builder_for(spec_schema) \
.identity("id")\
.bucket("data", 16)\
.identity("id") \
.bucket("data", 16) \
.add_without_field_id(2, "data1", "bucket[16]") \
.add(2, 1010, "data2", "bucket[8]") \
.bucket("num", 8) \
.build()

expected = '{"spec-id": 0, "fields": [' \
'{"name": "id", "transform": "identity", "source-id": 1}, ' \
'{"name": "data_bucket", "transform": "bucket[16]", "source-id": 2}]}'
'{"name": "id", "transform": "identity", "source-id": 1, "field-id": 1000}, ' \
'{"name": "data_bucket", "transform": "bucket[16]", "source-id": 2, "field-id": 1001}, ' \
'{"name": "data1", "transform": "bucket[16]", "source-id": 2, "field-id": 1002}, ' \
'{"name": "data2", "transform": "bucket[8]", "source-id": 2, "field-id": 1010}, ' \
'{"name": "num_bucket", "transform": "bucket[8]", "source-id": 3, "field-id": 1011}]}'
assert expected == PartitionSpecParser.to_json(spec)


def test_from_json_conversion_with_field_ids():
spec_schema = Schema(NestedField.required(1, "id", IntegerType.get()),
NestedField.required(2, "data", StringType.get()),
NestedField.required(3, "num", DecimalType.of(9, 2)))

spec_string = '{"spec-id": 0, "fields": [' \
'{"name": "id", "transform": "identity", "source-id": 1, "field-id": 1000}, ' \
'{"name": "data_bucket", "transform": "bucket[16]", "source-id": 2, "field-id": 1001}, ' \
'{"name": "data1", "transform": "bucket[16]", "source-id": 2, "field-id": 1002}, ' \
'{"name": "data2", "transform": "bucket[8]", "source-id": 2, "field-id": 1010}, ' \
'{"name": "num_bucket", "transform": "bucket[8]", "source-id": 3, "field-id": 1011}]}'

spec = PartitionSpecParser.from_json(spec_schema, spec_string)

expected_spec = PartitionSpec \
.builder_for(spec_schema) \
.identity("id") \
.bucket("data", 16) \
.add_without_field_id(2, "data1", "bucket[16]") \
.add(2, 1010, "data2", "bucket[8]") \
.bucket("num", 8) \
.build()
assert expected_spec == spec


def test_from_json_conversion_without_field_ids():
spec_schema = Schema(NestedField.required(1, "id", IntegerType.get()),
NestedField.required(2, "data", StringType.get()),
NestedField.required(3, "num", DecimalType.of(9, 2)))

spec_string = '{"spec-id": 0, "fields": [' \
'{"name": "id", "transform": "identity", "source-id": 1}, ' \
'{"name": "data_bucket", "transform": "bucket[16]", "source-id": 2}, ' \
'{"name": "data1", "transform": "bucket[16]", "source-id": 2}, ' \
'{"name": "data2", "transform": "bucket[8]", "source-id": 2}, ' \
'{"name": "num_bucket", "transform": "bucket[8]", "source-id": 3}]}'

spec = PartitionSpecParser.from_json(spec_schema, spec_string)

expected_spec = PartitionSpec \
.builder_for(spec_schema) \
.identity("id") \
.bucket("data", 16) \
.add_without_field_id(2, "data1", "bucket[16]") \
.add(2, 1003, "data2", "bucket[8]") \
.bucket("num", 8) \
.build()
assert expected_spec == spec


def test_raise_exception_with_invalid_json():
spec_schema = Schema(NestedField.required(1, "id", IntegerType.get()),
NestedField.required(2, "data", StringType.get()),
NestedField.required(3, "num", DecimalType.of(9, 2)))

spec_string = '{"spec-id": 0, "fields": [' \
'{"name": "id", "transform": "identity", "source-id": 1, "field-id": 1000}, ' \
'{"name": "data_bucket", "transform": "bucket[16]", "source-id": 2, "field-id": 1001}, ' \
'{"name": "data1", "transform": "bucket[16]", "source-id": 2}, ' \
'{"name": "data2", "transform": "bucket[8]", "source-id": 2}, ' \
'{"name": "num_bucket", "transform": "bucket[8]", "source-id": 3}]}'

with pytest.raises(RuntimeError):
PartitionSpecParser.from_json(spec_schema, spec_string)