diff --git a/.gitignore b/.gitignore index 13767d622f82..f1e459c8cb0a 100644 --- a/.gitignore +++ b/.gitignore @@ -38,3 +38,6 @@ bin/ # Hive/metastore files metastore_db/ + +# Python stuff +python/.mypy_cache/ diff --git a/build.gradle b/build.gradle index 67ea9f5a6d08..1bd68a574309 100644 --- a/build.gradle +++ b/build.gradle @@ -99,17 +99,11 @@ subprojects { } } -apply from: 'baseline.gradle' -apply from: 'deploy.gradle' -apply from: 'tasks.gradle' -apply from: 'jmh.gradle' - project(':iceberg-bundled-guava') { apply plugin: 'com.github.johnrengelman.shadow' - - tasks.assemble.dependsOn tasks.shadowJar - tasks.install.dependsOn tasks.shadowJar - + + tasks.jar.dependsOn tasks.shadowJar + dependencies { compileOnly('com.google.guava:guava') { exclude group: 'com.google.code.findbugs' @@ -118,13 +112,9 @@ project(':iceberg-bundled-guava') { exclude group: 'com.google.j2objc' } } - - jar { - enabled = false - } - + shadowJar { - classifier = null + classifier null configurations = [project.configurations.compileOnly] zip64 true @@ -144,6 +134,10 @@ project(':iceberg-bundled-guava') { minimize() } + + jar { + classifier 'empty' + } } project(':iceberg-api') { @@ -428,12 +422,11 @@ project(':iceberg-pig') { project(':iceberg-spark-runtime') { apply plugin: 'com.github.johnrengelman.shadow' - tasks.assemble.dependsOn tasks.shadowJar - tasks.install.dependsOn tasks.shadowJar - tasks.javadocJar.dependsOn tasks.shadowJar + tasks.jar.dependsOn tasks.shadowJar configurations { - compileOnly { + compile { + exclude group: 'org.apache.spark' // included in Spark exclude group: 'org.slf4j' exclude group: 'org.apache.commons' @@ -446,12 +439,12 @@ project(':iceberg-spark-runtime') { } dependencies { - compileOnly project(':iceberg-spark') + compile project(':iceberg-spark') + compile 'org.apache.spark:spark-hive_2.11' } shadowJar { - // shade compileOnly dependencies to avoid including in transitive dependencies - configurations = [project.configurations.compileOnly] + configurations = [project.configurations.compile] zip64 true @@ -481,7 +474,11 @@ project(':iceberg-spark-runtime') { relocate 'org.apache.arrow', 'org.apache.iceberg.shaded.org.apache.arrow' relocate 'com.carrotsearch', 'org.apache.iceberg.shaded.com.carrotsearch' - archiveName = "iceberg-spark-runtime-${version}.${extension}" + classifier null + } + + jar { + classifier = 'empty' } } @@ -519,3 +516,9 @@ String getJavadocVersion() { throw new Exception("Neither version.txt nor git version exists") } } + +apply from: 'baseline.gradle' +apply from: 'deploy.gradle' +apply from: 'tasks.gradle' +apply from: 'jmh.gradle' + diff --git a/bundled-guava/LICENSE b/bundled-guava/LICENSE index 13464a315a90..597de019cdd7 100644 --- a/bundled-guava/LICENSE +++ b/bundled-guava/LICENSE @@ -209,4 +209,3 @@ Copyright: 2006-2019 The Guava Authors Home page: https://github.com/google/guava License: http://www.apache.org/licenses/LICENSE-2.0 --------------------------------------------------------------------------------- diff --git a/bundled-guava/NOTICE b/bundled-guava/NOTICE index cefcafb2c4c1..0f0fa79e5924 100644 --- a/bundled-guava/NOTICE +++ b/bundled-guava/NOTICE @@ -5,8 +5,3 @@ Copyright 2017-2020 The Apache Software Foundation This product includes software developed at The Apache Software Foundation (http://www.apache.org/). 
--------------------------------------------------------------------------------- -| This product includes software from Google Guava (Apache 2.0) -| * Copyright (C) 2007 The Guava Authors -| * https://github.com/google/guava --------------------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java index 4dae4155bb8b..e2c73782bf12 100644 --- a/core/src/test/java/org/apache/iceberg/TableTestBase.java +++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java @@ -24,6 +24,7 @@ import java.util.Iterator; import java.util.List; import java.util.Set; +import java.util.stream.LongStream; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; @@ -87,9 +88,15 @@ public class TableTestBase { public TestTables.TestTable table = null; protected final int formatVersion; + @SuppressWarnings("checkstyle:MemberName") + protected final Assertions V1Assert; + @SuppressWarnings("checkstyle:MemberName") + protected final Assertions V2Assert; public TableTestBase(int formatVersion) { this.formatVersion = formatVersion; + this.V1Assert = new Assertions(1, formatVersion); + this.V2Assert = new Assertions(2, formatVersion); } @Before @@ -209,6 +216,14 @@ ManifestEntry manifestEntry(ManifestEntry.Status status, Long snapshotId, DataFi } void validateSnapshot(Snapshot old, Snapshot snap, DataFile... newFiles) { + validateSnapshot(old, snap, null, newFiles); + } + + void validateSnapshot(Snapshot old, Snapshot snap, long sequenceNumber, DataFile... newFiles) { + validateSnapshot(old, snap, (Long) sequenceNumber, newFiles); + } + + void validateSnapshot(Snapshot old, Snapshot snap, Long sequenceNumber, DataFile... newFiles) { List oldManifests = old != null ? old.manifests() : ImmutableList.of(); // copy the manifests to a modifiable list and remove the existing manifests @@ -227,6 +242,10 @@ void validateSnapshot(Snapshot old, Snapshot snap, DataFile... newFiles) { for (ManifestEntry entry : ManifestFiles.read(manifest, FILE_IO).entries()) { DataFile file = entry.file(); + if (sequenceNumber != null) { + V1Assert.assertEquals("Sequence number should default to 0", 0, entry.sequenceNumber().longValue()); + V2Assert.assertEquals("Sequence number should match expected", sequenceNumber, entry.sequenceNumber()); + } Assert.assertEquals("Path should match expected", newPaths.next(), file.path().toString()); Assert.assertEquals("File's snapshot ID should match", id, (long) entry.snapshotId()); } @@ -254,12 +273,23 @@ List paths(DataFile... 
dataFiles) { return paths; } - static void validateManifest(ManifestFile manifest, - Iterator ids, - Iterator expectedFiles) { + void validateManifest(ManifestFile manifest, + Iterator ids, + Iterator expectedFiles) { + validateManifest(manifest, null, ids, expectedFiles); + } + + void validateManifest(ManifestFile manifest, + Iterator seqs, + Iterator ids, + Iterator expectedFiles) { for (ManifestEntry entry : ManifestFiles.read(manifest, FILE_IO).entries()) { DataFile file = entry.file(); DataFile expected = expectedFiles.next(); + if (seqs != null) { + V1Assert.assertEquals("Sequence number should default to 0", 0, entry.sequenceNumber().longValue()); + V2Assert.assertEquals("Sequence number should match expected", seqs.next(), entry.sequenceNumber()); + } Assert.assertEquals("Path should match expected", expected.path().toString(), file.path().toString()); Assert.assertEquals("Snapshot ID should match expected ID", @@ -292,6 +322,10 @@ static Iterator statuses(ManifestEntry.Status... statuses) return Iterators.forArray(statuses); } + static Iterator seqs(long... seqs) { + return LongStream.of(seqs).iterator(); + } + static Iterator ids(Long... ids) { return Iterators.forArray(ids); } @@ -303,4 +337,33 @@ static Iterator files(DataFile... files) { static Iterator files(ManifestFile manifest) { return ManifestFiles.read(manifest, FILE_IO).iterator(); } + + /** + * Used for assertions that only apply if the table version is v2. + */ + protected static class Assertions { + private final boolean enabled; + + private Assertions(int validForVersion, int formatVersion) { + this.enabled = validForVersion == formatVersion; + } + + void assertEquals(String context, int expected, int actual) { + if (enabled) { + Assert.assertEquals(context, expected, actual); + } + } + + void assertEquals(String context, long expected, long actual) { + if (enabled) { + Assert.assertEquals(context, expected, actual); + } + } + + void assertEquals(String context, Object expected, Object actual) { + if (enabled) { + Assert.assertEquals(context, expected, actual); + } + } + } } diff --git a/core/src/test/java/org/apache/iceberg/TestFastAppend.java b/core/src/test/java/org/apache/iceberg/TestFastAppend.java index 2bda21dccd29..1cc478d11ffe 100644 --- a/core/src/test/java/org/apache/iceberg/TestFastAppend.java +++ b/core/src/test/java/org/apache/iceberg/TestFastAppend.java @@ -52,13 +52,21 @@ public void testEmptyTableAppend() { TableMetadata base = readMetadata(); Assert.assertNull("Should not have a current snapshot", base.currentSnapshot()); + Assert.assertEquals("Table should start with last-sequence-number 0", 0, base.lastSequenceNumber()); - Snapshot pending = table.newFastAppend() + table.newFastAppend() .appendFile(FILE_A) .appendFile(FILE_B) - .apply(); + .commit(); + + Snapshot snap = table.currentSnapshot(); + + validateSnapshot(base.currentSnapshot(), snap, 1, FILE_A, FILE_B); - validateSnapshot(base.currentSnapshot(), pending, FILE_A, FILE_B); + V2Assert.assertEquals("Snapshot sequence number should be 1", 1, snap.sequenceNumber()); + V2Assert.assertEquals("Last sequence number should be 1", 1, readMetadata().lastSequenceNumber()); + + V1Assert.assertEquals("Table should end with last-sequence-number 0", 0, base.lastSequenceNumber()); } @Test @@ -67,17 +75,25 @@ public void testEmptyTableAppendManifest() throws IOException { TableMetadata base = readMetadata(); Assert.assertNull("Should not have a current snapshot", base.currentSnapshot()); + Assert.assertEquals("Table should start with last-sequence-number 
0", 0, base.lastSequenceNumber()); ManifestFile manifest = writeManifest(FILE_A, FILE_B); - Snapshot pending = table.newFastAppend() + table.newFastAppend() .appendManifest(manifest) - .apply(); + .commit(); - validateSnapshot(base.currentSnapshot(), pending, FILE_A, FILE_B); + Snapshot snap = table.currentSnapshot(); + + validateSnapshot(base.currentSnapshot(), snap, 1, FILE_A, FILE_B); // validate that the metadata summary is correct when using appendManifest Assert.assertEquals("Summary metadata should include 2 added files", - "2", pending.summary().get("added-data-files")); + "2", snap.summary().get("added-data-files")); + + V2Assert.assertEquals("Snapshot sequence number should be 1", 1, snap.sequenceNumber()); + V2Assert.assertEquals("Last sequence number should be 1", 1, readMetadata().lastSequenceNumber()); + + V1Assert.assertEquals("Table should end with last-sequence-number 0", 0, base.lastSequenceNumber()); } @Test @@ -86,22 +102,32 @@ public void testEmptyTableAppendFilesAndManifest() throws IOException { TableMetadata base = readMetadata(); Assert.assertNull("Should not have a current snapshot", base.currentSnapshot()); + Assert.assertEquals("Table should start with last-sequence-number 0", 0, base.lastSequenceNumber()); ManifestFile manifest = writeManifest(FILE_A, FILE_B); - Snapshot pending = table.newFastAppend() + table.newFastAppend() .appendFile(FILE_C) .appendFile(FILE_D) .appendManifest(manifest) - .apply(); + .commit(); + + Snapshot snap = table.currentSnapshot(); - long pendingId = pending.snapshotId(); + long commitId = snap.snapshotId(); - validateManifest(pending.manifests().get(0), - ids(pendingId, pendingId), + validateManifest(snap.manifests().get(0), + seqs(1, 1), + ids(commitId, commitId), files(FILE_C, FILE_D)); - validateManifest(pending.manifests().get(1), - ids(pendingId, pendingId), + validateManifest(snap.manifests().get(1), + seqs(1, 1), + ids(commitId, commitId), files(FILE_A, FILE_B)); + + V2Assert.assertEquals("Snapshot sequence number should be 1", 1, snap.sequenceNumber()); + V2Assert.assertEquals("Last sequence number should be 1", 1, readMetadata().lastSequenceNumber()); + + V1Assert.assertEquals("Table should end with last-sequence-number 0", 0, base.lastSequenceNumber()); } @Test diff --git a/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java b/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java index 2f6f3b07933d..07419d57df8d 100644 --- a/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java +++ b/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java @@ -245,6 +245,8 @@ public ParquetValueReader primitive(org.apache.iceberg.types.Type.PrimitiveTy } case TIME_MICROS: return new TimeReader(desc); + case TIME_MILLIS: + return new TimeMillisReader(desc); case DECIMAL: DecimalMetadata decimal = primitive.getDecimalMetadata(); switch (primitive.getPrimitiveTypeName()) { @@ -356,6 +358,17 @@ public OffsetDateTime read(OffsetDateTime reuse) { } } + private static class TimeMillisReader extends PrimitiveReader { + private TimeMillisReader(ColumnDescriptor desc) { + super(desc); + } + + @Override + public LocalTime read(LocalTime reuse) { + return LocalTime.ofNanoOfDay(column.nextLong() * 1000000L); + } + } + private static class TimeReader extends PrimitiveReader { private TimeReader(ColumnDescriptor desc) { super(desc); diff --git a/deploy.gradle b/deploy.gradle index 29e5a1f21231..82d91c26f865 100644 --- a/deploy.gradle +++ b/deploy.gradle @@ 
-57,7 +57,11 @@ subprojects { publishing { publications { apache(MavenPublication) { - from components.java + if (tasks.matching({task -> task.name == 'shadowJar'}).isEmpty()) { + from components.java + } else { + project.shadow.component(it) + } artifact sourceJar artifact javadocJar diff --git a/python/iceberg/api/partition_field.py b/python/iceberg/api/partition_field.py index adaca2894370..8e789a10897b 100644 --- a/python/iceberg/api/partition_field.py +++ b/python/iceberg/api/partition_field.py @@ -18,8 +18,9 @@ class PartitionField(object): - def __init__(self, source_id, name, transform): + def __init__(self, source_id, field_id, name, transform): self.source_id = source_id + self.field_id = field_id self.name = name self.transform = transform @@ -29,7 +30,8 @@ def __eq__(self, other): elif other is None or not isinstance(other, PartitionField): return False - return self.source_id == other.source_id and self.name == other.name and self.transform == other.transform + return self.source_id == other.source_id and self.field_id == other.field_id and \ + self.name == other.name and self.transform == other.transform def __ne__(self, other): return not self.__eq__(other) @@ -38,4 +40,4 @@ def __hash__(self): return hash(self.__key()) def __key(self): - return PartitionField.__class__, self.source_id, self.name, self.transform + return PartitionField.__class__, self.source_id, self.field_id, self.name, self.transform diff --git a/python/iceberg/api/partition_spec.py b/python/iceberg/api/partition_spec.py index 95cce0e6a679..1451fbfe617b 100644 --- a/python/iceberg/api/partition_spec.py +++ b/python/iceberg/api/partition_spec.py @@ -32,13 +32,13 @@ class PartitionSpec(object): @staticmethod def UNPARTITIONED_SPEC(): - return PartitionSpec(Schema(), 0, []) + return PartitionSpec(Schema(), 0, [], PartitionSpec.PARTITION_DATA_ID_START - 1) @staticmethod def unpartitioned(): return PartitionSpec.UNPARTITIONED_SPEC() - def __init__(self, schema, spec_id, fields): + def __init__(self, schema, spec_id, fields, last_assigned_field_id): self.fields_by_source_id = None self.fields_by_name = None self.__java_classes = None @@ -49,6 +49,7 @@ def __init__(self, schema, spec_id, fields): self.__fields = list() for field in fields: self.__fields.append(field) + self.last_assigned_field_id = last_assigned_field_id @property def fields(self): @@ -70,10 +71,10 @@ def get_field_by_source_id(self, field_id): def partition_type(self): struct_fields = list() - for i, field in enumerate(self.__fields): + for _i, field in enumerate(self.__fields): source_type = self.schema.find_type(field.source_id) result_type = field.transform.get_result_type(source_type) - struct_fields.append(NestedField.optional(PartitionSpec.PARTITION_DATA_ID_START + i, + struct_fields.append(NestedField.optional(field.field_id, field.name, result_type)) @@ -170,9 +171,10 @@ def __repr__(self): sb = ["["] for field in self.__fields: - sb.append("\n {name}: {transform}({source_id})".format(name=field.name, - transform=str(field.transform), - source_id=field.source_id)) + sb.append("\n {field_id}: {name}: {transform}({source_id})".format(field_id=field.field_id, + name=field.name, + transform=str(field.transform), + source_id=field.source_id)) if len(self.__fields) > 0: sb.append("\n") @@ -201,6 +203,11 @@ def __init__(self, schema): self.fields = list() self.partition_names = set() self.spec_id = 0 + self.last_assigned_field_id = PartitionSpec.PARTITION_DATA_ID_START - 1 + + def __next_field_id(self): + self.last_assigned_field_id = 
self.last_assigned_field_id + 1 + return self.last_assigned_field_id def with_spec_id(self, spec_id): self.spec_id = spec_id @@ -226,6 +233,7 @@ def identity(self, source_name): self.check_and_add_partition_name(source_name) source_column = self.find_source_column(source_name) self.fields.append(PartitionField(source_column.field_id, + self.__next_field_id(), source_name, Transforms.identity(source_column.type))) return self @@ -235,8 +243,9 @@ def year(self, source_name): self.check_and_add_partition_name(name) source_column = self.find_source_column(source_name) self.fields.append(PartitionField(source_column.field_id, + self.__next_field_id(), name, - Transforms.year(source_column.types))) + Transforms.year(source_column.type))) return self def month(self, source_name): @@ -244,8 +253,9 @@ def month(self, source_name): self.check_and_add_partition_name(name) source_column = self.find_source_column(source_name) self.fields.append(PartitionField(source_column.field_id, + self.__next_field_id(), name, - Transforms.month(source_column.types))) + Transforms.month(source_column.type))) return self def day(self, source_name): @@ -253,8 +263,9 @@ def day(self, source_name): self.check_and_add_partition_name(name) source_column = self.find_source_column(source_name) self.fields.append(PartitionField(source_column.field_id, + self.__next_field_id(), name, - Transforms.day(source_column.types))) + Transforms.day(source_column.type))) return self def hour(self, source_name): @@ -262,6 +273,7 @@ def hour(self, source_name): self.check_and_add_partition_name(name) source_column = self.find_source_column(source_name) self.fields.append(PartitionField(source_column.field_id, + self.__next_field_id(), name, Transforms.hour(source_column.type))) return self @@ -271,6 +283,7 @@ def bucket(self, source_name, num_buckets): self.check_and_add_partition_name(name) source_column = self.find_source_column(source_name) self.fields.append(PartitionField(source_column.field_id, + self.__next_field_id(), name, Transforms.bucket(source_column.type, num_buckets))) return self @@ -280,11 +293,15 @@ def truncate(self, source_name, width): self.check_and_add_partition_name(name) source_column = self.find_source_column(source_name) self.fields.append(PartitionField(source_column.field_id, + self.__next_field_id(), name, - Transforms.truncate(source_column.types, width))) + Transforms.truncate(source_column.type, width))) return self - def add(self, source_id, name, transform): + def add_without_field_id(self, source_id, name, transform): + return self.add(source_id, self.__next_field_id(), name, transform) + + def add(self, source_id, field_id, name, transform): self.check_and_add_partition_name(name) column = self.schema.find_field(source_id) if column is None: @@ -292,12 +309,14 @@ def add(self, source_id, name, transform): transform_obj = Transforms.from_string(column.type, transform) field = PartitionField(source_id, + field_id, name, transform_obj) self.fields.append(field) + self.last_assigned_field_id = max(self.last_assigned_field_id, field_id) return self def build(self): - spec = PartitionSpec(self.schema, self.spec_id, self.fields) + spec = PartitionSpec(self.schema, self.spec_id, self.fields, self.last_assigned_field_id) PartitionSpec.check_compatibility(spec, self.schema) return spec diff --git a/python/iceberg/api/transforms/bucket.py b/python/iceberg/api/transforms/bucket.py index c1672da4a4eb..64db7122089e 100644 --- a/python/iceberg/api/transforms/bucket.py +++ b/python/iceberg/api/transforms/bucket.py 
@@ -101,7 +101,7 @@ def hash(self, value): return Bucket.MURMUR3.hash(struct.pack("q", value)) def can_transform(self, type_var): - return type_var.type_id() in [TypeID.INTEGER, TypeID.DATE] + return type_var.type_id in [TypeID.INTEGER, TypeID.DATE] class BucketLong(Bucket): @@ -112,9 +112,9 @@ def hash(self, value): return Bucket.MURMUR3.hash(struct.pack("q", value)) def can_transform(self, type_var): - return type_var.type_id() in [TypeID.LONG, - TypeID.TIME, - TypeID.TIMESTAMP] + return type_var.type_id in [TypeID.LONG, + TypeID.TIME, + TypeID.TIMESTAMP] class BucketFloat(Bucket): @@ -125,7 +125,7 @@ def hash(self, value): return Bucket.MURMUR3.hash(struct.pack("d", value)) def can_transform(self, type_var): - return type_var.type_id() == TypeID.FLOAT + return type_var.type_id == TypeID.FLOAT class BucketDouble(Bucket): @@ -136,7 +136,7 @@ def hash(self, value): return Bucket.MURMUR3.hash(struct.pack("d", value)) def can_transform(self, type_var): - return type_var.type_id() == TypeID.DOUBLE + return type_var.type_id == TypeID.DOUBLE class BucketDecimal(Bucket): diff --git a/python/iceberg/api/transforms/dates.py b/python/iceberg/api/transforms/dates.py index 628281b20b81..474b986f696d 100644 --- a/python/iceberg/api/transforms/dates.py +++ b/python/iceberg/api/transforms/dates.py @@ -52,7 +52,7 @@ def apply(self, days): return apply_func(datetime.datetime.utcfromtimestamp(days * Dates.SECONDS_IN_DAY), Dates.EPOCH) def can_transform(self, type): - return type.type_id() == TypeID.DATE + return type.type_id == TypeID.DATE def get_result_type(self, source_type): return IntegerType.get() @@ -73,4 +73,4 @@ def to_human_string(self, value): return Dates.HUMAN_FUNCS[self.granularity](value) def __str__(self): - return "%s" % self + return self.name diff --git a/python/iceberg/api/transforms/timestamps.py b/python/iceberg/api/transforms/timestamps.py index 697cec67e594..25c4439bc179 100644 --- a/python/iceberg/api/transforms/timestamps.py +++ b/python/iceberg/api/transforms/timestamps.py @@ -50,7 +50,7 @@ def apply(self, value): return apply_func(datetime.datetime.utcfromtimestamp(value / 1000000), Timestamps.EPOCH) def can_transform(self, type_var): - return type_var == TypeID.TIMESTAMP + return type_var.type_id == TypeID.TIMESTAMP def get_result_type(self, source_type): return IntegerType.get() diff --git a/python/iceberg/api/transforms/transforms.py b/python/iceberg/api/transforms/transforms.py index c14d84930f6b..0cf243e0f912 100644 --- a/python/iceberg/api/transforms/transforms.py +++ b/python/iceberg/api/transforms/transforms.py @@ -42,22 +42,22 @@ def __init__(self): pass @staticmethod - def from_string(type, transform): + def from_string(type_var, transform): match = Transforms.HAS_WIDTH.match(transform) if match is not None: name = match.group(1) - w = match.group(2) + w = int(match.group(2)) if name.lower() == "truncate": - return Truncate.get(type, w) + return Truncate.get(type_var, w) elif name.lower() == "bucket": - return Bucket.get(type, w) + return Bucket.get(type_var, w) if transform.lower() == "identity": - return Identity.get(type) - elif type.type_id() == TypeID.TIMESTAMP: + return Identity.get(type_var) + elif type_var.type_id == TypeID.TIMESTAMP: return Timestamps(transform.lower(), transform.lower()) - elif type.type_id() == TypeID.DATE: + elif type_var.type_id == TypeID.DATE: return Dates(transform.lower(), transform.lower()) raise RuntimeError("Unknown transform: %s" % transform) @@ -108,4 +108,4 @@ def bucket(type_var, num_buckets): @staticmethod def truncate(type_var, 
width): - return Truncate.get(type, width) + return Truncate.get(type_var, width) diff --git a/python/iceberg/api/transforms/truncate.py b/python/iceberg/api/transforms/truncate.py index cd001ad44eac..b37e6f0b06dd 100644 --- a/python/iceberg/api/transforms/truncate.py +++ b/python/iceberg/api/transforms/truncate.py @@ -32,7 +32,7 @@ def get(type_var, width): if type_var.type_id == TypeID.INTEGER: return TruncateInteger(width) elif type_var.type_id == TypeID.LONG: - return TruncateInteger(width) + return TruncateLong(width) elif type_var.type_id == TypeID.DECIMAL: return TruncateDecimal(width) elif type_var.type_id == TypeID.STRING: diff --git a/python/iceberg/api/types/type_util.py b/python/iceberg/api/types/type_util.py index ca52a7db9c6f..68f0f595acf3 100644 --- a/python/iceberg/api/types/type_util.py +++ b/python/iceberg/api/types/type_util.py @@ -16,6 +16,7 @@ # under the License. import math +from typing import List from .type import (Type, TypeID) @@ -238,7 +239,6 @@ def get(self): return self.visitor.field(self.field, VisitFuture(self.field.type, self.visitor).get) -@staticmethod def decimal_required_bytes(precision): if precision < 0 or precision > 40: raise RuntimeError("Unsupported decimal precision: %s" % precision) @@ -451,11 +451,11 @@ def write_compatibility_errors(read_schema, write_schema): def read_compatibility_errors(read_schema, write_schema): visit(write_schema, CheckCompatibility(read_schema, False)) - NO_ERRORS = [] + NO_ERRORS: List[str] = [] def __init__(self, schema, check_ordering): self.schema = schema - self.check_ordering + self.check_ordering = check_ordering self.current_type = None def schema(self, schema, struct_result): @@ -498,10 +498,10 @@ def struct(self, struct, field_results): return errors - def field(self, field, field_result): + def field(self, field, field_result) -> List[str]: struct = self.current_type.as_struct_type() curr_field = struct.field(field.field_id) - errors = list() + errors = [] if curr_field is None: if not field.is_optional: diff --git a/python/iceberg/core/base_table_scan.py b/python/iceberg/core/base_table_scan.py index d205364aad15..e64038d3136a 100644 --- a/python/iceberg/core/base_table_scan.py +++ b/python/iceberg/core/base_table_scan.py @@ -34,9 +34,9 @@ class BaseTableScan(CloseableGroup, TableScan): DATE_FORMAT = "%Y-%m-%d %H:%M:%S.%f" - SNAPSHOT_COLUMNS = ["snapshot_id", "file_path", "file_ordinal", "file_format", "block_size_in_bytes", + SNAPSHOT_COLUMNS = ("snapshot_id", "file_path", "file_ordinal", "file_format", "block_size_in_bytes", "file_size_in_bytes", "record_count", "partition", "value_counts", "null_value_counts", - "lower_bounds", "upper_bounds"] + "lower_bounds", "upper_bounds") def new_refined_scan(self, ops, table, schema, snapshot_id, row_filter, case_sensitive, selected_columns, options, minused_cols): diff --git a/python/iceberg/core/manifest_reader.py b/python/iceberg/core/manifest_reader.py index 6a4581293a6f..e648684f29fe 100644 --- a/python/iceberg/core/manifest_reader.py +++ b/python/iceberg/core/manifest_reader.py @@ -33,8 +33,8 @@ class ManifestReader(CloseableGroup, Filterable): - ALL_COLUMNS = ["*"] - CHANGE_COLUMNS = ["file_path", "file_format", "partition", "record_count", "file_size_in_bytes"] + ALL_COLUMNS = ("*",) + CHANGE_COLUMNS = ("file_path", "file_format", "partition", "record_count", "file_size_in_bytes") @staticmethod def read(file, spec_lookup=None): diff --git a/python/iceberg/core/partition_spec_parser.py b/python/iceberg/core/partition_spec_parser.py index 
600904d53af1..ab57079c55b1 100644 --- a/python/iceberg/core/partition_spec_parser.py +++ b/python/iceberg/core/partition_spec_parser.py @@ -25,6 +25,7 @@ class PartitionSpecParser(object): SPEC_ID = "spec-id" FIELDS = "fields" SOURCE_ID = "source-id" + FIELD_ID = "field-id" TRANSFORM = "transform" NAME = "name" @@ -41,7 +42,8 @@ def to_dict(spec): def to_json_fields(spec): return [{PartitionSpecParser.NAME: field.name, PartitionSpecParser.TRANSFORM: str(field.transform), - PartitionSpecParser.SOURCE_ID: field.source_id} + PartitionSpecParser.SOURCE_ID: field.source_id, + PartitionSpecParser.FIELD_ID: field.field_id} for field in spec.fields] @staticmethod @@ -56,18 +58,8 @@ def from_json(schema, json_obj): builder = PartitionSpec.builder_for(schema).with_spec_id(spec_id) fields = json_obj.get(PartitionSpecParser.FIELDS) - if not isinstance(fields, (list, tuple)): - raise RuntimeError("Cannot parse partition spec fields, not an array: %s" % fields) - for element in fields: - if not isinstance(element, dict): - raise RuntimeError("Cannot parse partition field, not an object: %s" % element) - - builder.add(element.get(PartitionSpecParser.SOURCE_ID), - element.get(PartitionSpecParser.NAME), - element.get(PartitionSpecParser.TRANSFORM)) - - return builder.build() + return PartitionSpecParser.__build_from_json_fields(builder, fields) @staticmethod def from_json_fields(schema, spec_id, json_obj): @@ -76,14 +68,31 @@ def from_json_fields(schema, spec_id, json_obj): if isinstance(json_obj, str): json_obj = json.loads(json_obj) - if not isinstance(json_obj, list): - raise RuntimeError("Cannot parse partition spec fields, not an array: %s" % json_obj) + return PartitionSpecParser.__build_from_json_fields(builder, json_obj) + + @staticmethod + def __build_from_json_fields(builder, json_fields): + if not isinstance(json_fields, (list, tuple)): + raise RuntimeError("Cannot parse partition spec fields, not an array: %s" % json_fields) + + field_id_count = 0 + for element in json_fields: + if not isinstance(element, dict): + raise RuntimeError("Cannot parse partition field, not an object: %s" % element) - for item in json_obj: - if not isinstance(item, dict): - raise RuntimeError("Cannot parse partition field, not an object: %s" % json_obj) - builder.add(item.get(PartitionSpecParser.SOURCE_ID), - item.get(PartitionSpecParser.NAME), - item.get(PartitionSpecParser.TRANSFORM)) + if element.get(PartitionSpecParser.FIELD_ID) is not None: + builder.add(element.get(PartitionSpecParser.SOURCE_ID), + element.get(PartitionSpecParser.FIELD_ID), + element.get(PartitionSpecParser.NAME), + element.get(PartitionSpecParser.TRANSFORM)) + field_id_count = field_id_count + 1 + else: + builder.add_without_field_id(element.get(PartitionSpecParser.SOURCE_ID), + element.get(PartitionSpecParser.NAME), + element.get(PartitionSpecParser.TRANSFORM)) + + if field_id_count > 0 and field_id_count != len(json_fields): + raise RuntimeError("Cannot parse spec with missing field IDs: %s missing of %s fields." % + (len(json_fields) - field_id_count, len(json_fields))) return builder.build() diff --git a/python/tests/core/test_partition_spec.py b/python/tests/core/test_partition_spec.py new file mode 100644 index 000000000000..f0f3b9b01a43 --- /dev/null +++ b/python/tests/core/test_partition_spec.py @@ -0,0 +1,111 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from iceberg.api import PartitionSpec, Schema +from iceberg.api.types import (BinaryType, + DateType, + DecimalType, + FixedType, + IntegerType, + LongType, + NestedField, + StringType, + TimestampType, + TimeType, + UUIDType) + + +def test_to_json_conversion(): + spec_schema = Schema(NestedField.required(1, "i", IntegerType.get()), + NestedField.required(2, "l", LongType.get()), + NestedField.required(3, "d", DateType.get()), + NestedField.required(4, "t", TimeType.get()), + NestedField.required(5, "ts", TimestampType.without_timezone()), + NestedField.required(6, "dec", DecimalType.of(9, 2)), + NestedField.required(7, "s", StringType.get()), + NestedField.required(8, "u", UUIDType.get()), + NestedField.required(9, "f", FixedType.of_length(3)), + NestedField.required(10, "b", BinaryType.get())) + + specs = [ + PartitionSpec.builder_for(spec_schema).identity("i").build(), + PartitionSpec.builder_for(spec_schema).identity("l").build(), + PartitionSpec.builder_for(spec_schema).identity("d").build(), + PartitionSpec.builder_for(spec_schema).identity("t").build(), + PartitionSpec.builder_for(spec_schema).identity("ts").build(), + PartitionSpec.builder_for(spec_schema).identity("dec").build(), + PartitionSpec.builder_for(spec_schema).identity("s").build(), + PartitionSpec.builder_for(spec_schema).identity("u").build(), + PartitionSpec.builder_for(spec_schema).identity("f").build(), + PartitionSpec.builder_for(spec_schema).identity("b").build(), + PartitionSpec.builder_for(spec_schema).bucket("i", 128).build(), + PartitionSpec.builder_for(spec_schema).bucket("l", 128).build(), + PartitionSpec.builder_for(spec_schema).bucket("d", 128).build(), + PartitionSpec.builder_for(spec_schema).bucket("t", 128).build(), + PartitionSpec.builder_for(spec_schema).bucket("ts", 128).build(), + PartitionSpec.builder_for(spec_schema).bucket("dec", 128).build(), + PartitionSpec.builder_for(spec_schema).bucket("s", 128).build(), + PartitionSpec.builder_for(spec_schema).year("d").build(), + PartitionSpec.builder_for(spec_schema).month("d").build(), + PartitionSpec.builder_for(spec_schema).day("d").build(), + PartitionSpec.builder_for(spec_schema).year("ts").build(), + PartitionSpec.builder_for(spec_schema).month("ts").build(), + PartitionSpec.builder_for(spec_schema).day("ts").build(), + PartitionSpec.builder_for(spec_schema).hour("ts").build(), + PartitionSpec.builder_for(spec_schema).truncate("i", 10).build(), + PartitionSpec.builder_for(spec_schema).truncate("l", 10).build(), + PartitionSpec.builder_for(spec_schema).truncate("dec", 10).build(), + PartitionSpec.builder_for(spec_schema).truncate("s", 10).build(), + PartitionSpec.builder_for(spec_schema).add_without_field_id(6, "dec_bucket", "bucket[16]").build(), + PartitionSpec.builder_for(spec_schema).add(6, 1011, "dec_bucket", "bucket[16]").build(), + ] + + expected_spec_strs = [ + "[\n 1000: i: identity(1)\n]", + "[\n 1000: l: identity(2)\n]", + "[\n 1000: d: identity(3)\n]", + "[\n 1000: t: 
identity(4)\n]", + "[\n 1000: ts: identity(5)\n]", + "[\n 1000: dec: identity(6)\n]", + "[\n 1000: s: identity(7)\n]", + "[\n 1000: u: identity(8)\n]", + "[\n 1000: f: identity(9)\n]", + "[\n 1000: b: identity(10)\n]", + "[\n 1000: i_bucket: bucket[128](1)\n]", + "[\n 1000: l_bucket: bucket[128](2)\n]", + "[\n 1000: d_bucket: bucket[128](3)\n]", + "[\n 1000: t_bucket: bucket[128](4)\n]", + "[\n 1000: ts_bucket: bucket[128](5)\n]", + "[\n 1000: dec_bucket: bucket[128](6)\n]", + "[\n 1000: s_bucket: bucket[128](7)\n]", + "[\n 1000: d_year: year(3)\n]", + "[\n 1000: d_month: month(3)\n]", + "[\n 1000: d_day: day(3)\n]", + "[\n 1000: ts_year: year(5)\n]", + "[\n 1000: ts_month: month(5)\n]", + "[\n 1000: ts_day: day(5)\n]", + "[\n 1000: ts_hour: hour(5)\n]", + "[\n 1000: i_truncate: truncate[10](1)\n]", + "[\n 1000: l_truncate: truncate[10](2)\n]", + "[\n 1000: dec_truncate: truncate[10](6)\n]", + "[\n 1000: s_truncate: truncate[10](7)\n]", + "[\n 1000: dec_bucket: bucket[16](6)\n]", + "[\n 1011: dec_bucket: bucket[16](6)\n]", + ] + + for (spec, expected_spec_str) in zip(specs, expected_spec_strs): + assert str(spec) == expected_spec_str diff --git a/python/tests/core/test_partition_spec_parser.py b/python/tests/core/test_partition_spec_parser.py index 53285362f6cd..256186cb84c2 100644 --- a/python/tests/core/test_partition_spec_parser.py +++ b/python/tests/core/test_partition_spec_parser.py @@ -16,21 +16,95 @@ # under the License. from iceberg.api import PartitionSpec, Schema -from iceberg.api.types import IntegerType, NestedField, StringType +from iceberg.api.types import DecimalType, IntegerType, NestedField, StringType from iceberg.core import PartitionSpecParser +import pytest def test_to_json_conversion(): spec_schema = Schema(NestedField.required(1, "id", IntegerType.get()), - NestedField.required(2, "data", StringType.get())) + NestedField.required(2, "data", StringType.get()), + NestedField.required(3, "num", DecimalType.of(9, 2))) - spec = PartitionSpec\ + spec = PartitionSpec \ .builder_for(spec_schema) \ - .identity("id")\ - .bucket("data", 16)\ + .identity("id") \ + .bucket("data", 16) \ + .add_without_field_id(2, "data1", "bucket[16]") \ + .add(2, 1010, "data2", "bucket[8]") \ + .bucket("num", 8) \ .build() expected = '{"spec-id": 0, "fields": [' \ - '{"name": "id", "transform": "identity", "source-id": 1}, ' \ - '{"name": "data_bucket", "transform": "bucket[16]", "source-id": 2}]}' + '{"name": "id", "transform": "identity", "source-id": 1, "field-id": 1000}, ' \ + '{"name": "data_bucket", "transform": "bucket[16]", "source-id": 2, "field-id": 1001}, ' \ + '{"name": "data1", "transform": "bucket[16]", "source-id": 2, "field-id": 1002}, ' \ + '{"name": "data2", "transform": "bucket[8]", "source-id": 2, "field-id": 1010}, ' \ + '{"name": "num_bucket", "transform": "bucket[8]", "source-id": 3, "field-id": 1011}]}' assert expected == PartitionSpecParser.to_json(spec) + + +def test_from_json_conversion_with_field_ids(): + spec_schema = Schema(NestedField.required(1, "id", IntegerType.get()), + NestedField.required(2, "data", StringType.get()), + NestedField.required(3, "num", DecimalType.of(9, 2))) + + spec_string = '{"spec-id": 0, "fields": [' \ + '{"name": "id", "transform": "identity", "source-id": 1, "field-id": 1000}, ' \ + '{"name": "data_bucket", "transform": "bucket[16]", "source-id": 2, "field-id": 1001}, ' \ + '{"name": "data1", "transform": "bucket[16]", "source-id": 2, "field-id": 1002}, ' \ + '{"name": "data2", "transform": "bucket[8]", "source-id": 2, "field-id": 1010}, 
' \ + '{"name": "num_bucket", "transform": "bucket[8]", "source-id": 3, "field-id": 1011}]}' + + spec = PartitionSpecParser.from_json(spec_schema, spec_string) + + expected_spec = PartitionSpec \ + .builder_for(spec_schema) \ + .identity("id") \ + .bucket("data", 16) \ + .add_without_field_id(2, "data1", "bucket[16]") \ + .add(2, 1010, "data2", "bucket[8]") \ + .bucket("num", 8) \ + .build() + assert expected_spec == spec + + +def test_from_json_conversion_without_field_ids(): + spec_schema = Schema(NestedField.required(1, "id", IntegerType.get()), + NestedField.required(2, "data", StringType.get()), + NestedField.required(3, "num", DecimalType.of(9, 2))) + + spec_string = '{"spec-id": 0, "fields": [' \ + '{"name": "id", "transform": "identity", "source-id": 1}, ' \ + '{"name": "data_bucket", "transform": "bucket[16]", "source-id": 2}, ' \ + '{"name": "data1", "transform": "bucket[16]", "source-id": 2}, ' \ + '{"name": "data2", "transform": "bucket[8]", "source-id": 2}, ' \ + '{"name": "num_bucket", "transform": "bucket[8]", "source-id": 3}]}' + + spec = PartitionSpecParser.from_json(spec_schema, spec_string) + + expected_spec = PartitionSpec \ + .builder_for(spec_schema) \ + .identity("id") \ + .bucket("data", 16) \ + .add_without_field_id(2, "data1", "bucket[16]") \ + .add(2, 1003, "data2", "bucket[8]") \ + .bucket("num", 8) \ + .build() + assert expected_spec == spec + + +def test_raise_exception_with_invalid_json(): + spec_schema = Schema(NestedField.required(1, "id", IntegerType.get()), + NestedField.required(2, "data", StringType.get()), + NestedField.required(3, "num", DecimalType.of(9, 2))) + + spec_string = '{"spec-id": 0, "fields": [' \ + '{"name": "id", "transform": "identity", "source-id": 1, "field-id": 1000}, ' \ + '{"name": "data_bucket", "transform": "bucket[16]", "source-id": 2, "field-id": 1001}, ' \ + '{"name": "data1", "transform": "bucket[16]", "source-id": 2}, ' \ + '{"name": "data2", "transform": "bucket[8]", "source-id": 2}, ' \ + '{"name": "num_bucket", "transform": "bucket[8]", "source-id": 3}]}' + + with pytest.raises(RuntimeError): + PartitionSpecParser.from_json(spec_schema, spec_string) diff --git a/python/tox.ini b/python/tox.ini index 9c49b0bd33cc..b011511130b2 100644 --- a/python/tox.ini +++ b/python/tox.ini @@ -39,9 +39,11 @@ deps = . 
{[testenv:flake8]deps} {[testenv:bandit]deps} + {[testenv:mypy]deps} commands = {[testenv:flake8]commands} {[testenv:bandit]commands} + {[testenv:mypy]commands} [testenv:flake8] basepython = python3 @@ -53,6 +55,14 @@ deps = commands = flake8 iceberg setup.py tests +[testenv:mypy] +basepython = python3 +skip_install = true +deps = + mypy +commands = + mypy --ignore-missing-imports iceberg/ + [testenv:bandit] basepython = python3 skip_install = true @@ -77,9 +87,6 @@ commands = # commands = # python -m http.server {posargs} -[bandit] -skips = B104 - [flake8] ignore = E501,W503 exclude = diff --git a/spark/src/main/java/org/apache/iceberg/actions/RemoveOrphanFilesAction.java b/spark/src/main/java/org/apache/iceberg/actions/RemoveOrphanFilesAction.java index 7909c4302b46..e32636414f56 100644 --- a/spark/src/main/java/org/apache/iceberg/actions/RemoveOrphanFilesAction.java +++ b/spark/src/main/java/org/apache/iceberg/actions/RemoveOrphanFilesAction.java @@ -19,6 +19,7 @@ package org.apache.iceberg.actions; +import java.io.File; import java.io.IOException; import java.util.Iterator; import java.util.List; @@ -48,6 +49,9 @@ import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.expressions.UserDefinedFunction; +import org.apache.spark.sql.functions; +import org.apache.spark.sql.types.DataTypes; import org.apache.spark.util.SerializableConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,6 +73,14 @@ public class RemoveOrphanFilesAction extends BaseAction<List<String>> { private static final Logger LOG = LoggerFactory.getLogger(RemoveOrphanFilesAction.class); + private static final UserDefinedFunction filename = functions.udf((String path) -> { + int lastIndex = path.lastIndexOf(File.separator); + if (lastIndex == -1) { + return path; + } else { + return path.substring(lastIndex + 1); + } + }, DataTypes.StringType); private final SparkSession spark; private final JavaSparkContext sparkContext; @@ -141,7 +153,10 @@ public List<String> execute() { Dataset<Row> validFileDF = validDataFileDF.union(validMetadataFileDF); Dataset<Row> actualFileDF = buildActualFileDF(); - Column joinCond = validFileDF.col("file_path").equalTo(actualFileDF.col("file_path")); + Column nameEqual = filename.apply(actualFileDF.col("file_path")) + .equalTo(filename.apply(validFileDF.col("file_path"))); + Column actualContains = actualFileDF.col("file_path").contains(validFileDF.col("file_path")); + Column joinCond = nameEqual.and(actualContains); List<String> orphanFiles = actualFileDF.join(validFileDF, joinCond, "leftanti") .as(Encoders.STRING()) .collectAsList(); diff --git a/spark/src/test/java/org/apache/iceberg/actions/TestRemoveOrphanFilesAction.java b/spark/src/test/java/org/apache/iceberg/actions/TestRemoveOrphanFilesAction.java index b28ff145efa9..8e71b0e287f6 100644 --- a/spark/src/test/java/org/apache/iceberg/actions/TestRemoveOrphanFilesAction.java +++ b/spark/src/test/java/org/apache/iceberg/actions/TestRemoveOrphanFilesAction.java @@ -85,11 +85,12 @@ public static void stopSpark() { @Rule public TemporaryFolder temp = new TemporaryFolder(); + private File tableDir = null; private String tableLocation = null; @Before public void setupTableLocation() throws Exception { - File tableDir = temp.newFolder(); + this.tableDir = temp.newFolder(); this.tableLocation = tableDir.toURI().toString(); } @@ -491,4 +492,54 @@ private List<String> snapshotFiles(long snapshotId) { .as(Encoders.STRING()) .collectAsList(); } + + @Test + public void 
testRemoveOrphanFilesWithRelativeFilePath() throws IOException, InterruptedException { + Table table = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), tableDir.getAbsolutePath()); + + List<ThreeColumnRecord> records = Lists.newArrayList( + new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA") + ); + + Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1); + + df.select("c1", "c2", "c3") + .write() + .format("iceberg") + .mode("append") + .save(tableDir.getAbsolutePath()); + + List<String> validFiles = spark.read().format("iceberg") + .load(tableLocation + "#files") + .select("file_path") + .as(Encoders.STRING()) + .collectAsList(); + Assert.assertEquals("Should be 1 valid file", 1, validFiles.size()); + String validFile = validFiles.get(0); + + df.write().mode("append").parquet(tableLocation + "/data"); + + Path dataPath = new Path(tableLocation + "/data"); + FileSystem fs = dataPath.getFileSystem(spark.sessionState().newHadoopConf()); + List<String> allFiles = Arrays.stream(fs.listStatus(dataPath, HiddenPathFilter.get())) + .filter(FileStatus::isFile) + .map(file -> file.getPath().toString()) + .collect(Collectors.toList()); + Assert.assertEquals("Should be 2 files", 2, allFiles.size()); + + List<String> invalidFiles = Lists.newArrayList(allFiles); + invalidFiles.removeIf(file -> file.contains(validFile)); + Assert.assertEquals("Should be 1 invalid file", 1, invalidFiles.size()); + + // sleep for 1 second to ensure files will be old enough + Thread.sleep(1000); + + Actions actions = Actions.forTable(table); + List<String> result = actions.removeOrphanFiles() + .olderThan(System.currentTimeMillis()) + .deleteWith(s -> { }) + .execute(); + Assert.assertEquals("Action should find 1 file", invalidFiles, result); + Assert.assertTrue("Invalid file should be present", fs.exists(new Path(invalidFiles.get(0)))); + } }
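
Note on the RemoveOrphanFilesAction change above: the new join condition treats a listed file as tracked when some table-tracked path has the same file name and is contained in the listed path, so metadata that stores relative paths still matches files listed with absolute paths. A minimal standalone sketch of that matching rule (illustrative only, not part of this patch; the paths and helper name are hypothetical):

    import os

    def is_tracked(listed_path, valid_paths):
        # same rule as the new join condition: equal file names AND the
        # tracked path is contained in the listed (absolute) path
        return any(
            os.path.basename(listed_path) == os.path.basename(valid)
            and valid in listed_path
            for valid in valid_paths
        )

    valid_paths = ["data/00000-0-abc.parquet"]                # relative path from table metadata
    listed = ["/warehouse/tbl/data/00000-0-abc.parquet",      # matches -> not an orphan
              "/warehouse/tbl/data/part-00000-xyz.parquet"]   # no match -> orphan
    orphans = [p for p in listed if not is_tracked(p, valid_paths)]

Files left in orphans correspond to the rows kept by the "leftanti" join in the action.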
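
Note on the Python PartitionSpec changes above: partition fields now carry their own field IDs. When no ID is supplied, the builder assigns IDs counting up from PARTITION_DATA_ID_START (1000); an explicitly supplied ID raises last_assigned_field_id so later defaults stay unique. A small standalone sketch of that assignment rule (illustrative only, not part of this patch; the class name is hypothetical):

    PARTITION_DATA_ID_START = 1000

    class FieldIdAssigner:
        # mirrors the builder's last_assigned_field_id bookkeeping
        def __init__(self):
            self.last_assigned_field_id = PARTITION_DATA_ID_START - 1

        def next_field_id(self):
            self.last_assigned_field_id += 1
            return self.last_assigned_field_id

        def use_explicit(self, field_id):
            self.last_assigned_field_id = max(self.last_assigned_field_id, field_id)
            return field_id

    a = FieldIdAssigner()
    assert a.next_field_id() == 1000      # e.g. identity("id")
    assert a.next_field_id() == 1001      # e.g. bucket("data", 16)
    assert a.use_explicit(1010) == 1010   # e.g. add(2, 1010, "data2", "bucket[8]")
    assert a.next_field_id() == 1011      # next default continues after 1010

These IDs match the field IDs expected in test_partition_spec_parser.py (1000, 1001, 1010, 1011).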