diff --git a/python/iceberg/api/partition_field.py b/python/iceberg/api/partition_field.py index adaca2894370..8e789a10897b 100644 --- a/python/iceberg/api/partition_field.py +++ b/python/iceberg/api/partition_field.py @@ -18,8 +18,9 @@ class PartitionField(object): - def __init__(self, source_id, name, transform): + def __init__(self, source_id, field_id, name, transform): self.source_id = source_id + self.field_id = field_id self.name = name self.transform = transform @@ -29,7 +30,8 @@ def __eq__(self, other): elif other is None or not isinstance(other, PartitionField): return False - return self.source_id == other.source_id and self.name == other.name and self.transform == other.transform + return self.source_id == other.source_id and self.field_id == other.field_id and \ + self.name == other.name and self.transform == other.transform def __ne__(self, other): return not self.__eq__(other) @@ -38,4 +40,4 @@ def __hash__(self): return hash(self.__key()) def __key(self): - return PartitionField.__class__, self.source_id, self.name, self.transform + return PartitionField.__class__, self.source_id, self.field_id, self.name, self.transform diff --git a/python/iceberg/api/partition_spec.py b/python/iceberg/api/partition_spec.py index 95cce0e6a679..e9e24c9b385a 100644 --- a/python/iceberg/api/partition_spec.py +++ b/python/iceberg/api/partition_spec.py @@ -32,13 +32,13 @@ class PartitionSpec(object): @staticmethod def UNPARTITIONED_SPEC(): - return PartitionSpec(Schema(), 0, []) + return PartitionSpec(Schema(), 0, [], PartitionSpec.PARTITION_DATA_ID_START - 1) @staticmethod def unpartitioned(): return PartitionSpec.UNPARTITIONED_SPEC() - def __init__(self, schema, spec_id, fields): + def __init__(self, schema, spec_id, fields, last_assigned_field_id): self.fields_by_source_id = None self.fields_by_name = None self.__java_classes = None @@ -49,6 +49,7 @@ def __init__(self, schema, spec_id, fields): self.__fields = list() for field in fields: self.__fields.append(field) + self.last_assigned_field_id = last_assigned_field_id @property def fields(self): @@ -70,10 +71,10 @@ def get_field_by_source_id(self, field_id): def partition_type(self): struct_fields = list() - for i, field in enumerate(self.__fields): + for _i, field in enumerate(self.__fields): source_type = self.schema.find_type(field.source_id) result_type = field.transform.get_result_type(source_type) - struct_fields.append(NestedField.optional(PartitionSpec.PARTITION_DATA_ID_START + i, + struct_fields.append(NestedField.optional(field.field_id, field.name, result_type)) @@ -170,9 +171,10 @@ def __repr__(self): sb = ["["] for field in self.__fields: - sb.append("\n {name}: {transform}({source_id})".format(name=field.name, - transform=str(field.transform), - source_id=field.source_id)) + sb.append("\n {field_id}: {name}: {transform}({source_id})".format(field_id=field.field_id, + name=field.name, + transform=str(field.transform), + source_id=field.source_id)) if len(self.__fields) > 0: sb.append("\n") @@ -201,6 +203,11 @@ def __init__(self, schema): self.fields = list() self.partition_names = set() self.spec_id = 0 + self.last_assigned_field_id = PartitionSpec.PARTITION_DATA_ID_START - 1 + + def __next_field_id(self): + self.last_assigned_field_id = self.last_assigned_field_id + 1 + return self.last_assigned_field_id def with_spec_id(self, spec_id): self.spec_id = spec_id @@ -226,6 +233,7 @@ def identity(self, source_name): self.check_and_add_partition_name(source_name) source_column = self.find_source_column(source_name) self.fields.append(PartitionField(source_column.field_id, + self.__next_field_id(), source_name, Transforms.identity(source_column.type))) return self @@ -235,6 +243,7 @@ def year(self, source_name): self.check_and_add_partition_name(name) source_column = self.find_source_column(source_name) self.fields.append(PartitionField(source_column.field_id, + self.__next_field_id(), name, Transforms.year(source_column.types))) return self @@ -244,6 +253,7 @@ def month(self, source_name): self.check_and_add_partition_name(name) source_column = self.find_source_column(source_name) self.fields.append(PartitionField(source_column.field_id, + self.__next_field_id(), name, Transforms.month(source_column.types))) return self @@ -253,6 +263,7 @@ def day(self, source_name): self.check_and_add_partition_name(name) source_column = self.find_source_column(source_name) self.fields.append(PartitionField(source_column.field_id, + self.__next_field_id(), name, Transforms.day(source_column.types))) return self @@ -262,6 +273,7 @@ def hour(self, source_name): self.check_and_add_partition_name(name) source_column = self.find_source_column(source_name) self.fields.append(PartitionField(source_column.field_id, + self.__next_field_id(), name, Transforms.hour(source_column.type))) return self @@ -271,6 +283,7 @@ def bucket(self, source_name, num_buckets): self.check_and_add_partition_name(name) source_column = self.find_source_column(source_name) self.fields.append(PartitionField(source_column.field_id, + self.__next_field_id(), name, Transforms.bucket(source_column.type, num_buckets))) return self @@ -280,11 +293,15 @@ def truncate(self, source_name, width): self.check_and_add_partition_name(name) source_column = self.find_source_column(source_name) self.fields.append(PartitionField(source_column.field_id, + self.__next_field_id(), name, Transforms.truncate(source_column.types, width))) return self - def add(self, source_id, name, transform): + def add_without_field_id(self, source_id, name, transform): + return self.add(source_id, self.__next_field_id(), name, transform) + + def add(self, source_id, field_id, name, transform): self.check_and_add_partition_name(name) column = self.schema.find_field(source_id) if column is None: @@ -292,12 +309,14 @@ def add(self, source_id, name, transform): transform_obj = Transforms.from_string(column.type, transform) field = PartitionField(source_id, + field_id, name, transform_obj) self.fields.append(field) + self.last_assigned_field_id = max(self.last_assigned_field_id, field_id) return self def build(self): - spec = PartitionSpec(self.schema, self.spec_id, self.fields) + spec = PartitionSpec(self.schema, self.spec_id, self.fields, self.last_assigned_field_id) PartitionSpec.check_compatibility(spec, self.schema) return spec diff --git a/python/iceberg/api/transforms/transforms.py b/python/iceberg/api/transforms/transforms.py index c14d84930f6b..53e4a1523b9a 100644 --- a/python/iceberg/api/transforms/transforms.py +++ b/python/iceberg/api/transforms/transforms.py @@ -47,7 +47,7 @@ def from_string(type, transform): if match is not None: name = match.group(1) - w = match.group(2) + w = int(match.group(2)) if name.lower() == "truncate": return Truncate.get(type, w) elif name.lower() == "bucket": diff --git a/python/iceberg/core/partition_spec_parser.py b/python/iceberg/core/partition_spec_parser.py index 600904d53af1..ab57079c55b1 100644 --- a/python/iceberg/core/partition_spec_parser.py +++ b/python/iceberg/core/partition_spec_parser.py @@ -25,6 +25,7 @@ class PartitionSpecParser(object): SPEC_ID = "spec-id" FIELDS = "fields" SOURCE_ID = "source-id" + FIELD_ID = "field-id" TRANSFORM = "transform" NAME = "name" @@ -41,7 +42,8 @@ def to_dict(spec): def to_json_fields(spec): return [{PartitionSpecParser.NAME: field.name, PartitionSpecParser.TRANSFORM: str(field.transform), - PartitionSpecParser.SOURCE_ID: field.source_id} + PartitionSpecParser.SOURCE_ID: field.source_id, + PartitionSpecParser.FIELD_ID: field.field_id} for field in spec.fields] @staticmethod @@ -56,18 +58,8 @@ def from_json(schema, json_obj): builder = PartitionSpec.builder_for(schema).with_spec_id(spec_id) fields = json_obj.get(PartitionSpecParser.FIELDS) - if not isinstance(fields, (list, tuple)): - raise RuntimeError("Cannot parse partition spec fields, not an array: %s" % fields) - for element in fields: - if not isinstance(element, dict): - raise RuntimeError("Cannot parse partition field, not an object: %s" % element) - - builder.add(element.get(PartitionSpecParser.SOURCE_ID), - element.get(PartitionSpecParser.NAME), - element.get(PartitionSpecParser.TRANSFORM)) - - return builder.build() + return PartitionSpecParser.__build_from_json_fields(builder, fields) @staticmethod def from_json_fields(schema, spec_id, json_obj): @@ -76,14 +68,31 @@ def from_json_fields(schema, spec_id, json_obj): if isinstance(json_obj, str): json_obj = json.loads(json_obj) - if not isinstance(json_obj, list): - raise RuntimeError("Cannot parse partition spec fields, not an array: %s" % json_obj) + return PartitionSpecParser.__build_from_json_fields(builder, json_obj) + + @staticmethod + def __build_from_json_fields(builder, json_fields): + if not isinstance(json_fields, (list, tuple)): + raise RuntimeError("Cannot parse partition spec fields, not an array: %s" % json_fields) + + field_id_count = 0 + for element in json_fields: + if not isinstance(element, dict): + raise RuntimeError("Cannot parse partition field, not an object: %s" % element) - for item in json_obj: - if not isinstance(item, dict): - raise RuntimeError("Cannot parse partition field, not an object: %s" % json_obj) - builder.add(item.get(PartitionSpecParser.SOURCE_ID), - item.get(PartitionSpecParser.NAME), - item.get(PartitionSpecParser.TRANSFORM)) + if element.get(PartitionSpecParser.FIELD_ID) is not None: + builder.add(element.get(PartitionSpecParser.SOURCE_ID), + element.get(PartitionSpecParser.FIELD_ID), + element.get(PartitionSpecParser.NAME), + element.get(PartitionSpecParser.TRANSFORM)) + field_id_count = field_id_count + 1 + else: + builder.add_without_field_id(element.get(PartitionSpecParser.SOURCE_ID), + element.get(PartitionSpecParser.NAME), + element.get(PartitionSpecParser.TRANSFORM)) + + if field_id_count > 0 and field_id_count != len(json_fields): + raise RuntimeError("Cannot parse spec with missing field IDs: %s missing of %s fields." % + (len(json_fields) - field_id_count, len(json_fields))) return builder.build() diff --git a/python/tests/core/test_partition_spec_parser.py b/python/tests/core/test_partition_spec_parser.py index 53285362f6cd..256186cb84c2 100644 --- a/python/tests/core/test_partition_spec_parser.py +++ b/python/tests/core/test_partition_spec_parser.py @@ -16,21 +16,95 @@ # under the License. from iceberg.api import PartitionSpec, Schema -from iceberg.api.types import IntegerType, NestedField, StringType +from iceberg.api.types import DecimalType, IntegerType, NestedField, StringType from iceberg.core import PartitionSpecParser +import pytest def test_to_json_conversion(): spec_schema = Schema(NestedField.required(1, "id", IntegerType.get()), - NestedField.required(2, "data", StringType.get())) + NestedField.required(2, "data", StringType.get()), + NestedField.required(3, "num", DecimalType.of(9, 2))) - spec = PartitionSpec\ + spec = PartitionSpec \ .builder_for(spec_schema) \ - .identity("id")\ - .bucket("data", 16)\ + .identity("id") \ + .bucket("data", 16) \ + .add_without_field_id(2, "data1", "bucket[16]") \ + .add(2, 1010, "data2", "bucket[8]") \ + .bucket("num", 8) \ .build() expected = '{"spec-id": 0, "fields": [' \ - '{"name": "id", "transform": "identity", "source-id": 1}, ' \ - '{"name": "data_bucket", "transform": "bucket[16]", "source-id": 2}]}' + '{"name": "id", "transform": "identity", "source-id": 1, "field-id": 1000}, ' \ + '{"name": "data_bucket", "transform": "bucket[16]", "source-id": 2, "field-id": 1001}, ' \ + '{"name": "data1", "transform": "bucket[16]", "source-id": 2, "field-id": 1002}, ' \ + '{"name": "data2", "transform": "bucket[8]", "source-id": 2, "field-id": 1010}, ' \ + '{"name": "num_bucket", "transform": "bucket[8]", "source-id": 3, "field-id": 1011}]}' assert expected == PartitionSpecParser.to_json(spec) + + +def test_from_json_conversion_with_field_ids(): + spec_schema = Schema(NestedField.required(1, "id", IntegerType.get()), + NestedField.required(2, "data", StringType.get()), + NestedField.required(3, "num", DecimalType.of(9, 2))) + + spec_string = '{"spec-id": 0, "fields": [' \ + '{"name": "id", "transform": "identity", "source-id": 1, "field-id": 1000}, ' \ + '{"name": "data_bucket", "transform": "bucket[16]", "source-id": 2, "field-id": 1001}, ' \ + '{"name": "data1", "transform": "bucket[16]", "source-id": 2, "field-id": 1002}, ' \ + '{"name": "data2", "transform": "bucket[8]", "source-id": 2, "field-id": 1010}, ' \ + '{"name": "num_bucket", "transform": "bucket[8]", "source-id": 3, "field-id": 1011}]}' + + spec = PartitionSpecParser.from_json(spec_schema, spec_string) + + expected_spec = PartitionSpec \ + .builder_for(spec_schema) \ + .identity("id") \ + .bucket("data", 16) \ + .add_without_field_id(2, "data1", "bucket[16]") \ + .add(2, 1010, "data2", "bucket[8]") \ + .bucket("num", 8) \ + .build() + assert expected_spec == spec + + +def test_from_json_conversion_without_field_ids(): + spec_schema = Schema(NestedField.required(1, "id", IntegerType.get()), + NestedField.required(2, "data", StringType.get()), + NestedField.required(3, "num", DecimalType.of(9, 2))) + + spec_string = '{"spec-id": 0, "fields": [' \ + '{"name": "id", "transform": "identity", "source-id": 1}, ' \ + '{"name": "data_bucket", "transform": "bucket[16]", "source-id": 2}, ' \ + '{"name": "data1", "transform": "bucket[16]", "source-id": 2}, ' \ + '{"name": "data2", "transform": "bucket[8]", "source-id": 2}, ' \ + '{"name": "num_bucket", "transform": "bucket[8]", "source-id": 3}]}' + + spec = PartitionSpecParser.from_json(spec_schema, spec_string) + + expected_spec = PartitionSpec \ + .builder_for(spec_schema) \ + .identity("id") \ + .bucket("data", 16) \ + .add_without_field_id(2, "data1", "bucket[16]") \ + .add(2, 1003, "data2", "bucket[8]") \ + .bucket("num", 8) \ + .build() + assert expected_spec == spec + + +def test_raise_exception_with_invalid_json(): + spec_schema = Schema(NestedField.required(1, "id", IntegerType.get()), + NestedField.required(2, "data", StringType.get()), + NestedField.required(3, "num", DecimalType.of(9, 2))) + + spec_string = '{"spec-id": 0, "fields": [' \ + '{"name": "id", "transform": "identity", "source-id": 1, "field-id": 1000}, ' \ + '{"name": "data_bucket", "transform": "bucket[16]", "source-id": 2, "field-id": 1001}, ' \ + '{"name": "data1", "transform": "bucket[16]", "source-id": 2}, ' \ + '{"name": "data2", "transform": "bucket[8]", "source-id": 2}, ' \ + '{"name": "num_bucket", "transform": "bucket[8]", "source-id": 3}]}' + + with pytest.raises(RuntimeError): + PartitionSpecParser.from_json(spec_schema, spec_string)