From abca85dbfa6ef525286189e3e9b8bd2169d542c1 Mon Sep 17 00:00:00 2001
From: ajantha-bhat
Date: Tue, 14 Mar 2023 21:35:38 +0530
Subject: [PATCH 1/4] Docs: Add partition stats spec

Address new comments

Remove trailing whitespaces
---
 format/spec.md | 40 ++++++++++++++++++++++++++++++++++++++++
 1 file changed, 40 insertions(+)

diff --git a/format/spec.md b/format/spec.md
index 60c0f99c3f90..ffc66510b308 100644
--- a/format/spec.md
+++ b/format/spec.md
@@ -671,6 +671,7 @@ Table metadata consists of the following fields:
 | _optional_ | _required_ | **`default-sort-order-id`**| Default sort order id of the table. Note that this could be used by writers, but is not used when reading because reads use the specs stored in manifest files. |
 | | _optional_ | **`refs`** | A map of snapshot references. The map keys are the unique snapshot reference names in the table, and the map values are snapshot reference objects. There is always a `main` branch reference pointing to the `current-snapshot-id` even if the `refs` map is null. |
 | _optional_ | _optional_ | **`statistics`** | A list (optional) of [table statistics](#table-statistics). |
+| _optional_ | _optional_ | **`partition-statistics`** | A list (optional) of [partition statistics](#partition-statistics). |

 For serialization details, see Appendix C.

@@ -702,6 +703,45 @@ Blob metadata is a struct with the following fields:
 | _optional_ | _optional_ | **`properties`** | `map` | Additional properties associated with the statistic. Subset of Blob properties in the Puffin file. |

+#### Partition statistics
+
+Partition statistics files are based on the [Partition Statistics file spec](#partition-statistics-file). Partition statistics are informational: a reader can choose to
+ignore them, and partition statistics support is not required to read the table correctly. A table can contain
+many partition statistics files associated with different table snapshots.
+A writer can optionally write a partition statistics file during each write operation. If a statistics file is written for a specific snapshot,
+it must be accurate and must be registered in the table metadata file to be considered a valid statistics file by readers.
+
+Partition statistics file metadata within the `partition-statistics` table metadata field is a struct with the following fields:
+
+| v1 | v2 | Field name | Type | Description |
+|----|----|------------|------|-------------|
+| _required_ | _required_ | **`snapshot-id`** | `long` | ID of the Iceberg table's snapshot the partition statistics file is associated with. |
+| _required_ | _required_ | **`statistics-file-path`** | `string` | Path of the partition statistics file. See [Partition Statistics file](#partition-statistics-file). |
+| _required_ | _required_ | **`max-data-sequence-number`** | `long` | Maximum data sequence number of the Iceberg table's snapshot the partition statistics were computed from. |
+
+#### Partition Statistics file
+
+Statistics for every partition tuple are stored as a row in the **table's default data file format**.
+The rows are sorted in ascending order, with nulls first, on the first partition column of `partition`
+to optimize filtering rows while scanning.
+Each unique partition tuple must have exactly one corresponding row, so that all partition tuples are present.
+
+Partition statistics files store the statistics as a struct with the following fields:
+
+| v1 | v2 | Field id, name | Type | Description |
+|----|----|----------------|------|-------------|
+| _required_ | _required_ | **`1 partition`** | `struct<..>` | Partition data tuple, schema based on the partition spec output using partition field ids for the struct field ids |
+| _required_ | _required_ | **`2 spec_id`** | `int` | Partition spec id |
+| _required_ | _required_ | **`3 data_record_count`** | `long` | Count of records in data files |
+| _required_ | _required_ | **`4 data_file_count`** | `int` | Count of data files |
+| _required_ | _required_ | **`5 total_data_file_size_in_bytes`** | `long` | Total size of data files in bytes |
+| _optional_ | _optional_ | **`6 position_delete_record_count`** | `long` | Count of records in position delete files |
+| _optional_ | _optional_ | **`7 position_delete_file_count`** | `int` | Count of position delete files |
+| _optional_ | _optional_ | **`8 equality_delete_record_count`** | `long` | Count of records in equality delete files |
+| _optional_ | _optional_ | **`9 equality_delete_file_count`** | `int` | Count of equality delete files |
+| _optional_ | _optional_ | **`10 last_updated_at`** | `timestamptz` | Commit time of snapshot that last updated this partition |
+| _optional_ | _optional_ | **`11 last_updated_snapshot_id`** | `long` | ID of snapshot that last updated this partition |
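As a rough illustration of the table above, the per-row schema of a partition statistics file could be expressed with Iceberg's Java type API as in the sketch below. The single identity partition field `dt` (field id 1000) is a hypothetical placeholder; the actual `partition` struct is derived from the table's partition specs.

```java
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class PartitionStatsSchemaSketch {
  // Per-row schema of a partition statistics file, assuming a table partitioned
  // by a single hypothetical identity field "dt" (partition field id 1000).
  public static Schema schema() {
    Types.StructType partitionType =
        Types.StructType.of(Types.NestedField.optional(1000, "dt", Types.StringType.get()));
    return new Schema(
        Types.NestedField.required(1, "partition", partitionType),
        Types.NestedField.required(2, "spec_id", Types.IntegerType.get()),
        Types.NestedField.required(3, "data_record_count", Types.LongType.get()),
        Types.NestedField.required(4, "data_file_count", Types.IntegerType.get()),
        Types.NestedField.required(5, "total_data_file_size_in_bytes", Types.LongType.get()),
        Types.NestedField.optional(6, "position_delete_record_count", Types.LongType.get()),
        Types.NestedField.optional(7, "position_delete_file_count", Types.IntegerType.get()),
        Types.NestedField.optional(8, "equality_delete_record_count", Types.LongType.get()),
        Types.NestedField.optional(9, "equality_delete_file_count", Types.IntegerType.get()),
        Types.NestedField.optional(10, "last_updated_at", Types.TimestampType.withZone()),
        Types.NestedField.optional(11, "last_updated_snapshot_id", Types.LongType.get()));
  }
}
```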

 #### Commit Conflict Resolution and Retry

 When two commits happen at the same time and are based on the same version, only one commit will succeed. In most cases, the failed commit can be applied to the new current version of table metadata and retried. Updates verify the conditions under which they can be applied to a new version and retry if those conditions are met.

From 835f10187910d8f9fbfa904995a689afff1b113a Mon Sep 17 00:00:00 2001
From: ajantha-bhat
Date: Tue, 18 Jul 2023 18:54:32 +0530
Subject: [PATCH 2/4] Parquet: Move iceberg-parquet files to iceberg-core

- Since the core module needs to write stats in Parquet format, move all the files
  from the iceberg-parquet module to iceberg-core to avoid a circular dependency.
- `TestParquetReadProjection` used to duplicate the test code of the iceberg-api
  module's `TestReadProjection`. Removed the duplicate class and instead directly
  extended the original class from the iceberg-api module.
- Update `TestParquetReadProjection` to skip the empty-struct test cases, as only
  Avro readers support them. The test cases are now common to both the Avro and
  Parquet readers.
--- CONTRIBUTING.md | 1 - README.md | 1 - .../java/org/apache/iceberg/TestHelpers.java | 13 + build.gradle | 39 +- .../data/parquet/BaseParquetReaders.java | 0 .../data/parquet/BaseParquetWriter.java | 0 .../data/parquet/GenericParquetReaders.java | 0 .../data/parquet/GenericParquetWriter.java | 0 .../iceberg/parquet/ApplyNameMapping.java | 0 .../iceberg/parquet/BaseColumnIterator.java | 0 .../iceberg/parquet/BasePageIterator.java | 0 .../iceberg/parquet/ColumnIterator.java | 0 .../apache/iceberg/parquet/ColumnWriter.java | 0 .../iceberg/parquet/MessageTypeToType.java | 0 .../apache/iceberg/parquet/PageIterator.java | 0 .../org/apache/iceberg/parquet/Parquet.java | 0 .../apache/iceberg/parquet/ParquetAvro.java | 0 .../iceberg/parquet/ParquetAvroReader.java | 0 .../parquet/ParquetAvroValueReaders.java | 0 .../iceberg/parquet/ParquetAvroWriter.java | 0 .../parquet/ParquetBloomRowGroupFilter.java | 0 .../iceberg/parquet/ParquetCodecFactory.java | 0 .../iceberg/parquet/ParquetConversions.java | 0 .../ParquetDictionaryRowGroupFilter.java | 0 .../iceberg/parquet/ParquetFilters.java | 0 .../org/apache/iceberg/parquet/ParquetIO.java | 0 .../iceberg/parquet/ParquetIterable.java | 0 .../parquet/ParquetMetricsRowGroupFilter.java | 0 .../iceberg/parquet/ParquetReadSupport.java | 0 .../apache/iceberg/parquet/ParquetReader.java | 0 .../iceberg/parquet/ParquetSchemaUtil.java | 0 .../iceberg/parquet/ParquetTypeVisitor.java | 0 .../apache/iceberg/parquet/ParquetUtil.java | 0 .../iceberg/parquet/ParquetValueReader.java | 0 .../iceberg/parquet/ParquetValueReaders.java | 0 .../iceberg/parquet/ParquetValueWriter.java | 0 .../iceberg/parquet/ParquetValueWriters.java | 0 .../iceberg/parquet/ParquetWriteAdapter.java | 0 .../iceberg/parquet/ParquetWriteSupport.java | 0 .../apache/iceberg/parquet/ParquetWriter.java | 0 .../apache/iceberg/parquet/PruneColumns.java | 0 .../org/apache/iceberg/parquet/ReadConf.java | 0 .../org/apache/iceberg/parquet/RemoveIds.java | 0 .../iceberg/parquet/TripleIterator.java | 0 .../apache/iceberg/parquet/TripleWriter.java | 0 .../iceberg/parquet/TypeToMessageType.java | 0 .../parquet/TypeWithSchemaVisitor.java | 0 .../iceberg/parquet/ValuesAsBytesReader.java | 0 .../parquet/VectorizedParquetReader.java | 0 .../iceberg/parquet/VectorizedReader.java | 0 .../avro/TestParquetReadProjection.java | 18 +- .../parquet/ParquetWritingTestUtils.java | 0 .../parquet/TestBloomRowGroupFilter.java | 0 .../parquet/TestCDHParquetStatistics.java | 0 .../parquet/TestDictionaryRowGroupFilter.java | 0 .../apache/iceberg/parquet/TestParquet.java | 0 .../parquet/TestParquetDataWriter.java | 0 .../parquet/TestParquetDeleteWriters.java | 0 .../parquet/TestParquetEncryption.java | 0 .../parquet/TestParquetSchemaUtil.java | 0 .../iceberg/parquet/TestPruneColumns.java | 0 docs/java-api.md | 1 - flink/v1.15/build.gradle | 1 - flink/v1.16/build.gradle | 1 - flink/v1.17/build.gradle | 1 - hive3/build.gradle | 1 - mr/build.gradle | 1 - .../java/org/apache/iceberg/TestHelpers.java | 85 --- .../iceberg/avro/TestReadProjection.java | 549 ------------------ settings.gradle | 2 - spark/v3.1/build.gradle | 1 - spark/v3.2/build.gradle | 2 - spark/v3.3/build.gradle | 2 - spark/v3.4/build.gradle | 2 - 74 files changed, 38 insertions(+), 683 deletions(-) rename {parquet => core}/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java (100%) rename {parquet => core}/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java (100%) rename {parquet => 
core}/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java (100%) rename {parquet => core}/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java (100%) rename {parquet => core}/src/main/java/org/apache/iceberg/parquet/ApplyNameMapping.java (100%) rename {parquet => core}/src/main/java/org/apache/iceberg/parquet/BaseColumnIterator.java (100%) rename {parquet => core}/src/main/java/org/apache/iceberg/parquet/BasePageIterator.java (100%) rename {parquet => core}/src/main/java/org/apache/iceberg/parquet/ColumnIterator.java (100%) rename {parquet => core}/src/main/java/org/apache/iceberg/parquet/ColumnWriter.java (100%) rename {parquet => core}/src/main/java/org/apache/iceberg/parquet/MessageTypeToType.java (100%) rename {parquet => core}/src/main/java/org/apache/iceberg/parquet/PageIterator.java (100%) rename {parquet => core}/src/main/java/org/apache/iceberg/parquet/Parquet.java (100%) rename {parquet => core}/src/main/java/org/apache/iceberg/parquet/ParquetAvro.java (100%) rename {parquet => core}/src/main/java/org/apache/iceberg/parquet/ParquetAvroReader.java (100%) rename {parquet => core}/src/main/java/org/apache/iceberg/parquet/ParquetAvroValueReaders.java (100%) rename {parquet => core}/src/main/java/org/apache/iceberg/parquet/ParquetAvroWriter.java (100%) rename {parquet => core}/src/main/java/org/apache/iceberg/parquet/ParquetBloomRowGroupFilter.java (100%) rename {parquet => core}/src/main/java/org/apache/iceberg/parquet/ParquetCodecFactory.java (100%) rename {parquet => core}/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java (100%) rename {parquet => core}/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java (100%) rename {parquet => core}/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java (100%) rename {parquet => core}/src/main/java/org/apache/iceberg/parquet/ParquetIO.java (100%) rename {parquet => core}/src/main/java/org/apache/iceberg/parquet/ParquetIterable.java (100%) rename {parquet => core}/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java (100%) rename {parquet => core}/src/main/java/org/apache/iceberg/parquet/ParquetReadSupport.java (100%) rename {parquet => core}/src/main/java/org/apache/iceberg/parquet/ParquetReader.java (100%) rename {parquet => core}/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java (100%) rename {parquet => core}/src/main/java/org/apache/iceberg/parquet/ParquetTypeVisitor.java (100%) rename {parquet => core}/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java (100%) rename {parquet => core}/src/main/java/org/apache/iceberg/parquet/ParquetValueReader.java (100%) rename {parquet => core}/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java (100%) rename {parquet => core}/src/main/java/org/apache/iceberg/parquet/ParquetValueWriter.java (100%) rename {parquet => core}/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java (100%) rename {parquet => core}/src/main/java/org/apache/iceberg/parquet/ParquetWriteAdapter.java (100%) rename {parquet => core}/src/main/java/org/apache/iceberg/parquet/ParquetWriteSupport.java (100%) rename {parquet => core}/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java (100%) rename {parquet => core}/src/main/java/org/apache/iceberg/parquet/PruneColumns.java (100%) rename {parquet => core}/src/main/java/org/apache/iceberg/parquet/ReadConf.java (100%) rename {parquet => core}/src/main/java/org/apache/iceberg/parquet/RemoveIds.java (100%) rename {parquet => 
core}/src/main/java/org/apache/iceberg/parquet/TripleIterator.java (100%) rename {parquet => core}/src/main/java/org/apache/iceberg/parquet/TripleWriter.java (100%) rename {parquet => core}/src/main/java/org/apache/iceberg/parquet/TypeToMessageType.java (100%) rename {parquet => core}/src/main/java/org/apache/iceberg/parquet/TypeWithSchemaVisitor.java (100%) rename {parquet => core}/src/main/java/org/apache/iceberg/parquet/ValuesAsBytesReader.java (100%) rename {parquet => core}/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java (100%) rename {parquet => core}/src/main/java/org/apache/iceberg/parquet/VectorizedReader.java (100%) rename {parquet => core}/src/test/java/org/apache/iceberg/avro/TestParquetReadProjection.java (77%) rename {parquet => core}/src/test/java/org/apache/iceberg/parquet/ParquetWritingTestUtils.java (100%) rename {parquet => core}/src/test/java/org/apache/iceberg/parquet/TestBloomRowGroupFilter.java (100%) rename {parquet => core}/src/test/java/org/apache/iceberg/parquet/TestCDHParquetStatistics.java (100%) rename {parquet => core}/src/test/java/org/apache/iceberg/parquet/TestDictionaryRowGroupFilter.java (100%) rename {parquet => core}/src/test/java/org/apache/iceberg/parquet/TestParquet.java (100%) rename {parquet => core}/src/test/java/org/apache/iceberg/parquet/TestParquetDataWriter.java (100%) rename {parquet => core}/src/test/java/org/apache/iceberg/parquet/TestParquetDeleteWriters.java (100%) rename {parquet => core}/src/test/java/org/apache/iceberg/parquet/TestParquetEncryption.java (100%) rename {parquet => core}/src/test/java/org/apache/iceberg/parquet/TestParquetSchemaUtil.java (100%) rename {parquet => core}/src/test/java/org/apache/iceberg/parquet/TestPruneColumns.java (100%) delete mode 100644 parquet/src/test/java/org/apache/iceberg/TestHelpers.java delete mode 100644 parquet/src/test/java/org/apache/iceberg/avro/TestReadProjection.java diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index e160aaec17c0..bb6a41234926 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -72,7 +72,6 @@ __Modules__ `iceberg-core` `iceberg-data` `iceberg-orc` -`iceberg-parquet` Changes to public interfaces and classes in the subprojects listed above require a deprecation cycle of one minor release. These projects contain common and internal code used by other projects and can evolve within a major release. 
diff --git a/README.md b/README.md index a0b7b00208c0..325e2110d4f9 100644 --- a/README.md +++ b/README.md @@ -65,7 +65,6 @@ Iceberg table support is organized in library modules: * `iceberg-common` contains utility classes used in other modules * `iceberg-api` contains the public Iceberg API * `iceberg-core` contains implementations of the Iceberg API and support for Avro data files, **this is what processing engines should depend on** -* `iceberg-parquet` is an optional module for working with tables backed by Parquet files * `iceberg-arrow` is an optional module for reading Parquet into Arrow memory * `iceberg-orc` is an optional module for working with tables backed by ORC files * `iceberg-hive-metastore` is an implementation of Iceberg tables backed by the Hive metastore Thrift client diff --git a/api/src/test/java/org/apache/iceberg/TestHelpers.java b/api/src/test/java/org/apache/iceberg/TestHelpers.java index 153e2de7ea9a..38c65a7a0f8e 100644 --- a/api/src/test/java/org/apache/iceberg/TestHelpers.java +++ b/api/src/test/java/org/apache/iceberg/TestHelpers.java @@ -46,6 +46,7 @@ import org.apache.iceberg.expressions.UnboundPredicate; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.util.ByteBuffers; +import org.assertj.core.api.AbstractThrowableAssert; import org.assertj.core.api.Assertions; import org.objenesis.strategy.StdInstantiatorStrategy; @@ -265,6 +266,18 @@ public static void serialize(final Serializable obj, final OutputStream outputSt } } + public static void assertThrows( + String message, + Class expected, + String containedInMessage, + Runnable runnable) { + AbstractThrowableAssert check = + Assertions.assertThatThrownBy(runnable::run).as(message).isInstanceOf(expected); + if (null != containedInMessage) { + check.hasMessageContaining(containedInMessage); + } + } + public static class KryoHelpers { private KryoHelpers() {} diff --git a/build.gradle b/build.gradle index 298d690e6c9d..a7ec406243ae 100644 --- a/build.gradle +++ b/build.gradle @@ -114,7 +114,7 @@ if (file("${rootDir}/iceberg-build.properties").exists()) { } def projectVersion = getProjectVersion() -final REVAPI_PROJECTS = ["iceberg-api", "iceberg-core", "iceberg-parquet", "iceberg-orc", "iceberg-common", "iceberg-data"] +final REVAPI_PROJECTS = ["iceberg-api", "iceberg-core", "iceberg-orc", "iceberg-common", "iceberg-data"] allprojects { group = "org.apache.iceberg" @@ -349,6 +349,13 @@ project(':iceberg-core') { exclude group: 'org.slf4j', module: 'slf4j-log4j12' } + implementation(libs.parquet.avro) { + exclude group: 'org.apache.avro', module: 'avro' + // already shaded by Parquet + exclude group: 'it.unimi.dsi' + exclude group: 'org.codehaus.jackson' + } + testImplementation libs.jetty.servlet testImplementation libs.jetty.server testImplementation libs.mockserver.netty @@ -366,7 +373,6 @@ project(':iceberg-data') { implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow') api project(':iceberg-api') implementation project(':iceberg-core') - compileOnly project(':iceberg-parquet') compileOnly project(':iceberg-orc') compileOnly(libs.hadoop2.common) { exclude group: 'commons-beanutils' @@ -555,7 +561,6 @@ project(':iceberg-delta-lake') { api project(':iceberg-api') implementation project(':iceberg-common') implementation project(':iceberg-core') - implementation project(':iceberg-parquet') implementation platform(libs.jackson.bom) implementation "com.fasterxml.jackson.core:jackson-databind" annotationProcessor libs.immutables.value 
@@ -761,32 +766,6 @@ project(':iceberg-orc') { } } -project(':iceberg-parquet') { - test { - useJUnitPlatform() - } - dependencies { - implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow') - api project(':iceberg-api') - implementation project(':iceberg-core') - implementation project(':iceberg-common') - - implementation(libs.parquet.avro) { - exclude group: 'org.apache.avro', module: 'avro' - // already shaded by Parquet - exclude group: 'it.unimi.dsi' - exclude group: 'org.codehaus.jackson' - } - - compileOnly libs.avro.avro - compileOnly(libs.hadoop2.client) { - exclude group: 'org.apache.avro', module: 'avro' - } - - testImplementation project(path: ':iceberg-core', configuration: 'testArtifacts') - } -} - project(':iceberg-arrow') { test { useJUnitPlatform() @@ -795,7 +774,6 @@ project(':iceberg-arrow') { implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow') api project(':iceberg-api') implementation project(':iceberg-core') - implementation project(':iceberg-parquet') implementation(libs.arrow.vector) { exclude group: 'io.netty', module: 'netty-buffer' @@ -837,7 +815,6 @@ project(':iceberg-pig') { api project(':iceberg-api') implementation project(':iceberg-common') implementation project(':iceberg-core') - implementation project(':iceberg-parquet') implementation(libs.parquet.avro) { exclude group: 'org.apache.avro', module: 'avro' diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java b/core/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java similarity index 100% rename from parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java rename to core/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java b/core/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java similarity index 100% rename from parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java rename to core/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java b/core/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java similarity index 100% rename from parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java rename to core/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java b/core/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java similarity index 100% rename from parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java rename to core/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ApplyNameMapping.java b/core/src/main/java/org/apache/iceberg/parquet/ApplyNameMapping.java similarity index 100% rename from parquet/src/main/java/org/apache/iceberg/parquet/ApplyNameMapping.java rename to core/src/main/java/org/apache/iceberg/parquet/ApplyNameMapping.java diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/BaseColumnIterator.java b/core/src/main/java/org/apache/iceberg/parquet/BaseColumnIterator.java similarity index 100% rename from parquet/src/main/java/org/apache/iceberg/parquet/BaseColumnIterator.java rename to 
core/src/main/java/org/apache/iceberg/parquet/BaseColumnIterator.java diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/BasePageIterator.java b/core/src/main/java/org/apache/iceberg/parquet/BasePageIterator.java similarity index 100% rename from parquet/src/main/java/org/apache/iceberg/parquet/BasePageIterator.java rename to core/src/main/java/org/apache/iceberg/parquet/BasePageIterator.java diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ColumnIterator.java b/core/src/main/java/org/apache/iceberg/parquet/ColumnIterator.java similarity index 100% rename from parquet/src/main/java/org/apache/iceberg/parquet/ColumnIterator.java rename to core/src/main/java/org/apache/iceberg/parquet/ColumnIterator.java diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ColumnWriter.java b/core/src/main/java/org/apache/iceberg/parquet/ColumnWriter.java similarity index 100% rename from parquet/src/main/java/org/apache/iceberg/parquet/ColumnWriter.java rename to core/src/main/java/org/apache/iceberg/parquet/ColumnWriter.java diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/MessageTypeToType.java b/core/src/main/java/org/apache/iceberg/parquet/MessageTypeToType.java similarity index 100% rename from parquet/src/main/java/org/apache/iceberg/parquet/MessageTypeToType.java rename to core/src/main/java/org/apache/iceberg/parquet/MessageTypeToType.java diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/PageIterator.java b/core/src/main/java/org/apache/iceberg/parquet/PageIterator.java similarity index 100% rename from parquet/src/main/java/org/apache/iceberg/parquet/PageIterator.java rename to core/src/main/java/org/apache/iceberg/parquet/PageIterator.java diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/core/src/main/java/org/apache/iceberg/parquet/Parquet.java similarity index 100% rename from parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java rename to core/src/main/java/org/apache/iceberg/parquet/Parquet.java diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetAvro.java b/core/src/main/java/org/apache/iceberg/parquet/ParquetAvro.java similarity index 100% rename from parquet/src/main/java/org/apache/iceberg/parquet/ParquetAvro.java rename to core/src/main/java/org/apache/iceberg/parquet/ParquetAvro.java diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetAvroReader.java b/core/src/main/java/org/apache/iceberg/parquet/ParquetAvroReader.java similarity index 100% rename from parquet/src/main/java/org/apache/iceberg/parquet/ParquetAvroReader.java rename to core/src/main/java/org/apache/iceberg/parquet/ParquetAvroReader.java diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetAvroValueReaders.java b/core/src/main/java/org/apache/iceberg/parquet/ParquetAvroValueReaders.java similarity index 100% rename from parquet/src/main/java/org/apache/iceberg/parquet/ParquetAvroValueReaders.java rename to core/src/main/java/org/apache/iceberg/parquet/ParquetAvroValueReaders.java diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetAvroWriter.java b/core/src/main/java/org/apache/iceberg/parquet/ParquetAvroWriter.java similarity index 100% rename from parquet/src/main/java/org/apache/iceberg/parquet/ParquetAvroWriter.java rename to core/src/main/java/org/apache/iceberg/parquet/ParquetAvroWriter.java diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetBloomRowGroupFilter.java 
b/core/src/main/java/org/apache/iceberg/parquet/ParquetBloomRowGroupFilter.java similarity index 100% rename from parquet/src/main/java/org/apache/iceberg/parquet/ParquetBloomRowGroupFilter.java rename to core/src/main/java/org/apache/iceberg/parquet/ParquetBloomRowGroupFilter.java diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetCodecFactory.java b/core/src/main/java/org/apache/iceberg/parquet/ParquetCodecFactory.java similarity index 100% rename from parquet/src/main/java/org/apache/iceberg/parquet/ParquetCodecFactory.java rename to core/src/main/java/org/apache/iceberg/parquet/ParquetCodecFactory.java diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java b/core/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java similarity index 100% rename from parquet/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java rename to core/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java b/core/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java similarity index 100% rename from parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java rename to core/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java b/core/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java similarity index 100% rename from parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java rename to core/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIO.java b/core/src/main/java/org/apache/iceberg/parquet/ParquetIO.java similarity index 100% rename from parquet/src/main/java/org/apache/iceberg/parquet/ParquetIO.java rename to core/src/main/java/org/apache/iceberg/parquet/ParquetIO.java diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIterable.java b/core/src/main/java/org/apache/iceberg/parquet/ParquetIterable.java similarity index 100% rename from parquet/src/main/java/org/apache/iceberg/parquet/ParquetIterable.java rename to core/src/main/java/org/apache/iceberg/parquet/ParquetIterable.java diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java b/core/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java similarity index 100% rename from parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java rename to core/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReadSupport.java b/core/src/main/java/org/apache/iceberg/parquet/ParquetReadSupport.java similarity index 100% rename from parquet/src/main/java/org/apache/iceberg/parquet/ParquetReadSupport.java rename to core/src/main/java/org/apache/iceberg/parquet/ParquetReadSupport.java diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java b/core/src/main/java/org/apache/iceberg/parquet/ParquetReader.java similarity index 100% rename from parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java rename to core/src/main/java/org/apache/iceberg/parquet/ParquetReader.java diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java 
b/core/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java similarity index 100% rename from parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java rename to core/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetTypeVisitor.java b/core/src/main/java/org/apache/iceberg/parquet/ParquetTypeVisitor.java similarity index 100% rename from parquet/src/main/java/org/apache/iceberg/parquet/ParquetTypeVisitor.java rename to core/src/main/java/org/apache/iceberg/parquet/ParquetTypeVisitor.java diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java b/core/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java similarity index 100% rename from parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java rename to core/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReader.java b/core/src/main/java/org/apache/iceberg/parquet/ParquetValueReader.java similarity index 100% rename from parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReader.java rename to core/src/main/java/org/apache/iceberg/parquet/ParquetValueReader.java diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java b/core/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java similarity index 100% rename from parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java rename to core/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriter.java b/core/src/main/java/org/apache/iceberg/parquet/ParquetValueWriter.java similarity index 100% rename from parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriter.java rename to core/src/main/java/org/apache/iceberg/parquet/ParquetValueWriter.java diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java b/core/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java similarity index 100% rename from parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java rename to core/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriteAdapter.java b/core/src/main/java/org/apache/iceberg/parquet/ParquetWriteAdapter.java similarity index 100% rename from parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriteAdapter.java rename to core/src/main/java/org/apache/iceberg/parquet/ParquetWriteAdapter.java diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriteSupport.java b/core/src/main/java/org/apache/iceberg/parquet/ParquetWriteSupport.java similarity index 100% rename from parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriteSupport.java rename to core/src/main/java/org/apache/iceberg/parquet/ParquetWriteSupport.java diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java b/core/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java similarity index 100% rename from parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java rename to core/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/PruneColumns.java b/core/src/main/java/org/apache/iceberg/parquet/PruneColumns.java similarity index 100% rename from 
parquet/src/main/java/org/apache/iceberg/parquet/PruneColumns.java rename to core/src/main/java/org/apache/iceberg/parquet/PruneColumns.java diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java b/core/src/main/java/org/apache/iceberg/parquet/ReadConf.java similarity index 100% rename from parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java rename to core/src/main/java/org/apache/iceberg/parquet/ReadConf.java diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/RemoveIds.java b/core/src/main/java/org/apache/iceberg/parquet/RemoveIds.java similarity index 100% rename from parquet/src/main/java/org/apache/iceberg/parquet/RemoveIds.java rename to core/src/main/java/org/apache/iceberg/parquet/RemoveIds.java diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/TripleIterator.java b/core/src/main/java/org/apache/iceberg/parquet/TripleIterator.java similarity index 100% rename from parquet/src/main/java/org/apache/iceberg/parquet/TripleIterator.java rename to core/src/main/java/org/apache/iceberg/parquet/TripleIterator.java diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/TripleWriter.java b/core/src/main/java/org/apache/iceberg/parquet/TripleWriter.java similarity index 100% rename from parquet/src/main/java/org/apache/iceberg/parquet/TripleWriter.java rename to core/src/main/java/org/apache/iceberg/parquet/TripleWriter.java diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/TypeToMessageType.java b/core/src/main/java/org/apache/iceberg/parquet/TypeToMessageType.java similarity index 100% rename from parquet/src/main/java/org/apache/iceberg/parquet/TypeToMessageType.java rename to core/src/main/java/org/apache/iceberg/parquet/TypeToMessageType.java diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/TypeWithSchemaVisitor.java b/core/src/main/java/org/apache/iceberg/parquet/TypeWithSchemaVisitor.java similarity index 100% rename from parquet/src/main/java/org/apache/iceberg/parquet/TypeWithSchemaVisitor.java rename to core/src/main/java/org/apache/iceberg/parquet/TypeWithSchemaVisitor.java diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ValuesAsBytesReader.java b/core/src/main/java/org/apache/iceberg/parquet/ValuesAsBytesReader.java similarity index 100% rename from parquet/src/main/java/org/apache/iceberg/parquet/ValuesAsBytesReader.java rename to core/src/main/java/org/apache/iceberg/parquet/ValuesAsBytesReader.java diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java b/core/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java similarity index 100% rename from parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java rename to core/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedReader.java b/core/src/main/java/org/apache/iceberg/parquet/VectorizedReader.java similarity index 100% rename from parquet/src/main/java/org/apache/iceberg/parquet/VectorizedReader.java rename to core/src/main/java/org/apache/iceberg/parquet/VectorizedReader.java diff --git a/parquet/src/test/java/org/apache/iceberg/avro/TestParquetReadProjection.java b/core/src/test/java/org/apache/iceberg/avro/TestParquetReadProjection.java similarity index 77% rename from parquet/src/test/java/org/apache/iceberg/avro/TestParquetReadProjection.java rename to core/src/test/java/org/apache/iceberg/avro/TestParquetReadProjection.java index 2df806b1fb9a..1b20e2610ae6 100644 
--- a/parquet/src/test/java/org/apache/iceberg/avro/TestParquetReadProjection.java +++ b/core/src/test/java/org/apache/iceberg/avro/TestParquetReadProjection.java @@ -32,7 +32,7 @@ public class TestParquetReadProjection extends TestReadProjection { protected GenericData.Record writeAndRead( String desc, Schema writeSchema, Schema readSchema, GenericData.Record record) throws IOException { - File file = temp.resolve(desc + ".parquet").toFile(); + File file = temp.newFile(desc + ".parquet"); file.delete(); try (FileAppender appender = @@ -45,4 +45,20 @@ protected GenericData.Record writeAndRead( return Iterables.getOnlyElement(records); } + + // Empty struct read is not supported for Parquet + @Override + public void testEmptyStructProjection() throws Exception {} + + @Override + public void testEmptyStructRequiredProjection() throws Exception {} + + @Override + public void testRequiredEmptyStructInRequiredStruct() throws Exception {} + + @Override + public void testEmptyNestedStructProjection() throws Exception {} + + @Override + public void testEmptyNestedStructRequiredProjection() throws Exception {} } diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/ParquetWritingTestUtils.java b/core/src/test/java/org/apache/iceberg/parquet/ParquetWritingTestUtils.java similarity index 100% rename from parquet/src/test/java/org/apache/iceberg/parquet/ParquetWritingTestUtils.java rename to core/src/test/java/org/apache/iceberg/parquet/ParquetWritingTestUtils.java diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestBloomRowGroupFilter.java b/core/src/test/java/org/apache/iceberg/parquet/TestBloomRowGroupFilter.java similarity index 100% rename from parquet/src/test/java/org/apache/iceberg/parquet/TestBloomRowGroupFilter.java rename to core/src/test/java/org/apache/iceberg/parquet/TestBloomRowGroupFilter.java diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestCDHParquetStatistics.java b/core/src/test/java/org/apache/iceberg/parquet/TestCDHParquetStatistics.java similarity index 100% rename from parquet/src/test/java/org/apache/iceberg/parquet/TestCDHParquetStatistics.java rename to core/src/test/java/org/apache/iceberg/parquet/TestCDHParquetStatistics.java diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestDictionaryRowGroupFilter.java b/core/src/test/java/org/apache/iceberg/parquet/TestDictionaryRowGroupFilter.java similarity index 100% rename from parquet/src/test/java/org/apache/iceberg/parquet/TestDictionaryRowGroupFilter.java rename to core/src/test/java/org/apache/iceberg/parquet/TestDictionaryRowGroupFilter.java diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java b/core/src/test/java/org/apache/iceberg/parquet/TestParquet.java similarity index 100% rename from parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java rename to core/src/test/java/org/apache/iceberg/parquet/TestParquet.java diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetDataWriter.java b/core/src/test/java/org/apache/iceberg/parquet/TestParquetDataWriter.java similarity index 100% rename from parquet/src/test/java/org/apache/iceberg/parquet/TestParquetDataWriter.java rename to core/src/test/java/org/apache/iceberg/parquet/TestParquetDataWriter.java diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetDeleteWriters.java b/core/src/test/java/org/apache/iceberg/parquet/TestParquetDeleteWriters.java similarity index 100% rename from 
parquet/src/test/java/org/apache/iceberg/parquet/TestParquetDeleteWriters.java rename to core/src/test/java/org/apache/iceberg/parquet/TestParquetDeleteWriters.java diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetEncryption.java b/core/src/test/java/org/apache/iceberg/parquet/TestParquetEncryption.java similarity index 100% rename from parquet/src/test/java/org/apache/iceberg/parquet/TestParquetEncryption.java rename to core/src/test/java/org/apache/iceberg/parquet/TestParquetEncryption.java diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetSchemaUtil.java b/core/src/test/java/org/apache/iceberg/parquet/TestParquetSchemaUtil.java similarity index 100% rename from parquet/src/test/java/org/apache/iceberg/parquet/TestParquetSchemaUtil.java rename to core/src/test/java/org/apache/iceberg/parquet/TestParquetSchemaUtil.java diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestPruneColumns.java b/core/src/test/java/org/apache/iceberg/parquet/TestPruneColumns.java similarity index 100% rename from parquet/src/test/java/org/apache/iceberg/parquet/TestPruneColumns.java rename to core/src/test/java/org/apache/iceberg/parquet/TestPruneColumns.java diff --git a/docs/java-api.md b/docs/java-api.md index 62b51e096fb2..48b02a6b5515 100644 --- a/docs/java-api.md +++ b/docs/java-api.md @@ -247,7 +247,6 @@ Iceberg table support is organized in library modules: * `iceberg-arrow` is an implementation of the Iceberg type system for reading and writing data stored in Iceberg tables using Apache Arrow as the in-memory data format * `iceberg-aws` contains implementations of the Iceberg API to be used with tables stored on AWS S3 and/or for tables defined using the AWS Glue data catalog * `iceberg-core` contains implementations of the Iceberg API and support for Avro data files, **this is what processing engines should depend on** -* `iceberg-parquet` is an optional module for working with tables backed by Parquet files * `iceberg-orc` is an optional module for working with tables backed by ORC files (*experimental*) * `iceberg-hive-metastore` is an implementation of Iceberg tables backed by the Hive metastore Thrift client diff --git a/flink/v1.15/build.gradle b/flink/v1.15/build.gradle index ff8a8e8fe34a..f8daaa9fc464 100644 --- a/flink/v1.15/build.gradle +++ b/flink/v1.15/build.gradle @@ -29,7 +29,6 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") { implementation project(':iceberg-core') api project(':iceberg-data') implementation project(':iceberg-orc') - implementation project(':iceberg-parquet') implementation project(':iceberg-hive-metastore') compileOnly libs.flink115.avro diff --git a/flink/v1.16/build.gradle b/flink/v1.16/build.gradle index 260e7c5bd12b..7f98300dcda8 100644 --- a/flink/v1.16/build.gradle +++ b/flink/v1.16/build.gradle @@ -29,7 +29,6 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") { implementation project(':iceberg-core') api project(':iceberg-data') implementation project(':iceberg-orc') - implementation project(':iceberg-parquet') implementation project(':iceberg-hive-metastore') compileOnly libs.flink116.avro diff --git a/flink/v1.17/build.gradle b/flink/v1.17/build.gradle index b597a1bcb0b9..80c412ca830a 100644 --- a/flink/v1.17/build.gradle +++ b/flink/v1.17/build.gradle @@ -29,7 +29,6 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") { implementation project(':iceberg-core') api project(':iceberg-data') implementation project(':iceberg-orc') - implementation 
project(':iceberg-parquet') implementation project(':iceberg-hive-metastore') compileOnly libs.flink117.avro diff --git a/hive3/build.gradle b/hive3/build.gradle index 67fe1090c5b0..55910749baa9 100644 --- a/hive3/build.gradle +++ b/hive3/build.gradle @@ -40,7 +40,6 @@ project(':iceberg-hive3') { compileOnly project(':iceberg-core') compileOnly project(':iceberg-common') compileOnly project(':iceberg-hive-metastore') - compileOnly project(':iceberg-parquet') compileOnly project(':iceberg-hive3-orc-bundle') compileOnly project(':iceberg-mr') compileOnly project(':iceberg-data') diff --git a/mr/build.gradle b/mr/build.gradle index a7ff1b73f869..7bd906453a91 100644 --- a/mr/build.gradle +++ b/mr/build.gradle @@ -32,7 +32,6 @@ project(':iceberg-mr') { api project(':iceberg-data') implementation project(':iceberg-hive-metastore') implementation project(':iceberg-orc') - implementation project(':iceberg-parquet') compileOnly(libs.hadoop2.client) { exclude group: 'org.apache.avro', module: 'avro' diff --git a/parquet/src/test/java/org/apache/iceberg/TestHelpers.java b/parquet/src/test/java/org/apache/iceberg/TestHelpers.java deleted file mode 100644 index 0e7627cab1f5..000000000000 --- a/parquet/src/test/java/org/apache/iceberg/TestHelpers.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg; - -import static org.assertj.core.api.Assertions.assertThatThrownBy; - -import java.util.concurrent.Callable; -import org.apache.avro.AvroRuntimeException; -import org.apache.avro.generic.GenericRecord; -import org.assertj.core.api.AbstractThrowableAssert; - -public class TestHelpers { - - private TestHelpers() {} - - /** - * A convenience method to avoid a large number of @Test(expected=...) tests - * - * @param message A String message to describe this assertion - * @param expected An Exception class that the Runnable should throw - * @param containedInMessage A String that should be contained by the thrown exception's message - * @param callable A Callable that is expected to throw the exception - */ - public static void assertThrows( - String message, - Class expected, - String containedInMessage, - Callable callable) { - AbstractThrowableAssert check = - assertThatThrownBy(callable::call).as(message).isInstanceOf(expected); - if (null != containedInMessage) { - check.hasMessageContaining(containedInMessage); - } - } - - /** - * A convenience method to avoid a large number of @Test(expected=...) 
tests - * - * @param message A String message to describe this assertion - * @param expected An Exception class that the Runnable should throw - * @param containedInMessage A String that should be contained by the thrown exception's message - * @param runnable A Runnable that is expected to throw the runtime exception - */ - public static void assertThrows( - String message, - Class expected, - String containedInMessage, - Runnable runnable) { - AbstractThrowableAssert check = - assertThatThrownBy(runnable::run).as(message).isInstanceOf(expected); - if (null != containedInMessage) { - check.hasMessageContaining(containedInMessage); - } - } - - /** - * A convenience method to assert if an Avro field is empty - * - * @param record The record to read from - * @param field The name of the field - */ - public static void assertEmptyAvroField(GenericRecord record, String field) { - TestHelpers.assertThrows( - "Not a valid schema field: " + field, - AvroRuntimeException.class, - "Not a valid schema field: " + field, - () -> record.get(field)); - } -} diff --git a/parquet/src/test/java/org/apache/iceberg/avro/TestReadProjection.java b/parquet/src/test/java/org/apache/iceberg/avro/TestReadProjection.java deleted file mode 100644 index 93c6ad05379e..000000000000 --- a/parquet/src/test/java/org/apache/iceberg/avro/TestReadProjection.java +++ /dev/null @@ -1,549 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.iceberg.avro; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.assertj.core.api.Assertions.within; - -import java.io.IOException; -import java.nio.file.Path; -import java.util.List; -import java.util.Map; -import org.apache.avro.generic.GenericData.Record; -import org.apache.iceberg.Schema; -import org.apache.iceberg.TestHelpers; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.types.Types; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; - -public abstract class TestReadProjection { - protected abstract Record writeAndRead( - String desc, Schema writeSchema, Schema readSchema, Record record) throws IOException; - - @TempDir protected Path temp; - - @Test - public void testFullProjection() throws Exception { - Schema schema = - new Schema( - Types.NestedField.required(0, "id", Types.LongType.get()), - Types.NestedField.optional(1, "data", Types.StringType.get())); - - Record record = new Record(AvroSchemaUtil.convert(schema, "table")); - record.put("id", 34L); - record.put("data", "test"); - - Record projected = writeAndRead("full_projection", schema, schema, record); - - assertThat((long) projected.get("id")).as("Should contain the correct id value").isEqualTo(34L); - - assertThat(projected.get("data").toString()) - .as("Should contain the correct data value") - .isEqualTo("test"); - } - - @Test - public void testReorderedFullProjection() throws Exception { - Schema schema = - new Schema( - Types.NestedField.required(0, "id", Types.LongType.get()), - Types.NestedField.optional(1, "data", Types.StringType.get())); - - Record record = new Record(AvroSchemaUtil.convert(schema, "table")); - record.put("id", 34L); - record.put("data", "test"); - - Schema reordered = - new Schema( - Types.NestedField.optional(1, "data", Types.StringType.get()), - Types.NestedField.required(0, "id", Types.LongType.get())); - - Record projected = writeAndRead("full_projection", schema, reordered, record); - - assertThat(projected.get(0).toString()) - .as("Should contain the correct 0 value") - .isEqualTo("test"); - assertThat(projected.get(1)).as("Should contain the correct 1 value").isEqualTo(34L); - } - - @Test - public void testReorderedProjection() throws Exception { - Schema schema = - new Schema( - Types.NestedField.required(0, "id", Types.LongType.get()), - Types.NestedField.optional(1, "data", Types.StringType.get())); - - Record record = new Record(AvroSchemaUtil.convert(schema, "table")); - record.put("id", 34L); - record.put("data", "test"); - - Schema reordered = - new Schema( - Types.NestedField.optional(2, "missing_1", Types.StringType.get()), - Types.NestedField.optional(1, "data", Types.StringType.get()), - Types.NestedField.optional(3, "missing_2", Types.LongType.get())); - - Record projected = writeAndRead("full_projection", schema, reordered, record); - - assertThat(projected.get(0)).as("Should contain the correct 0 value").isNull(); - assertThat(projected.get(1).toString()) - .as("Should contain the correct 1 value") - .isEqualTo("test"); - assertThat(projected.get(2)).as("Should contain the correct 2 value").isNull(); - } - - @Test - public void testEmptyProjection() throws Exception { - Schema schema = - new Schema( - 
Types.NestedField.required(0, "id", Types.LongType.get()), - Types.NestedField.optional(1, "data", Types.StringType.get())); - - Record record = new Record(AvroSchemaUtil.convert(schema, "table")); - record.put("id", 34L); - record.put("data", "test"); - - Record projected = writeAndRead("empty_projection", schema, schema.select(), record); - - assertThat(projected).as("Should read a non-null record").isNotNull(); - // this is expected because there are no values - assertThatThrownBy(() -> projected.get(0)).isInstanceOf(ArrayIndexOutOfBoundsException.class); - } - - @Test - public void testBasicProjection() throws Exception { - Schema writeSchema = - new Schema( - Types.NestedField.required(0, "id", Types.LongType.get()), - Types.NestedField.optional(1, "data", Types.StringType.get())); - - Record record = new Record(AvroSchemaUtil.convert(writeSchema, "table")); - record.put("id", 34L); - record.put("data", "test"); - - Schema idOnly = new Schema(Types.NestedField.required(0, "id", Types.LongType.get())); - - Record projected = writeAndRead("basic_projection_id", writeSchema, idOnly, record); - TestHelpers.assertEmptyAvroField(projected, "data"); - assertThat((long) projected.get("id")).as("Should contain the correct id value").isEqualTo(34L); - - Schema dataOnly = new Schema(Types.NestedField.optional(1, "data", Types.StringType.get())); - - projected = writeAndRead("basic_projection_data", writeSchema, dataOnly, record); - - TestHelpers.assertEmptyAvroField(projected, "id"); - assertThat(projected.get("data").toString()) - .as("Should contain the correct data value") - .isEqualTo("test"); - } - - @Test - public void testRename() throws Exception { - Schema writeSchema = - new Schema( - Types.NestedField.required(0, "id", Types.LongType.get()), - Types.NestedField.optional(1, "data", Types.StringType.get())); - - Record record = new Record(AvroSchemaUtil.convert(writeSchema, "table")); - record.put("id", 34L); - record.put("data", "test"); - - Schema readSchema = - new Schema( - Types.NestedField.required(0, "id", Types.LongType.get()), - Types.NestedField.optional(1, "renamed", Types.StringType.get())); - - Record projected = writeAndRead("project_and_rename", writeSchema, readSchema, record); - - assertThat((long) projected.get("id")).as("Should contain the correct id value").isEqualTo(34L); - - assertThat(projected.get("renamed").toString()) - .as("Should contain the correct data/renamed value") - .isEqualTo("test"); - } - - @Test - public void testNestedStructProjection() throws Exception { - Schema writeSchema = - new Schema( - Types.NestedField.required(0, "id", Types.LongType.get()), - Types.NestedField.optional( - 3, - "location", - Types.StructType.of( - Types.NestedField.required(1, "lat", Types.FloatType.get()), - Types.NestedField.required(2, "long", Types.FloatType.get())))); - - Record record = new Record(AvroSchemaUtil.convert(writeSchema, "table")); - record.put("id", 34L); - Record location = - new Record(AvroSchemaUtil.fromOption(record.getSchema().getField("location").schema())); - location.put("lat", 52.995143f); - location.put("long", -1.539054f); - record.put("location", location); - - Schema idOnly = new Schema(Types.NestedField.required(0, "id", Types.LongType.get())); - - Record projected = writeAndRead("id_only", writeSchema, idOnly, record); - TestHelpers.assertEmptyAvroField(projected, "location"); - assertThat((long) projected.get("id")).as("Should contain the correct id value").isEqualTo(34L); - - Schema latOnly = - new Schema( - Types.NestedField.optional( 
- 3, - "location", - Types.StructType.of(Types.NestedField.required(1, "lat", Types.FloatType.get())))); - - projected = writeAndRead("latitude_only", writeSchema, latOnly, record); - Record projectedLocation = (Record) projected.get("location"); - TestHelpers.assertEmptyAvroField(projected, "id"); - assertThat(projected.get("location")).as("Should project location").isNotNull(); - TestHelpers.assertEmptyAvroField(projectedLocation, "long"); - assertThat((float) projectedLocation.get("lat")) - .as("Should project latitude") - .isCloseTo(52.995143f, within(0.000001f)); - - Schema longOnly = - new Schema( - Types.NestedField.optional( - 3, - "location", - Types.StructType.of(Types.NestedField.required(2, "long", Types.FloatType.get())))); - - projected = writeAndRead("longitude_only", writeSchema, longOnly, record); - projectedLocation = (Record) projected.get("location"); - TestHelpers.assertEmptyAvroField(projected, "id"); - assertThat(projected.get("location")).as("Should project location").isNotNull(); - TestHelpers.assertEmptyAvroField(projectedLocation, "lat"); - assertThat((float) projectedLocation.get("long")) - .as("Should project longitude") - .isCloseTo(-1.539054f, within(0.000001f)); - - Schema locationOnly = writeSchema.select("location"); - projected = writeAndRead("location_only", writeSchema, locationOnly, record); - projectedLocation = (Record) projected.get("location"); - TestHelpers.assertEmptyAvroField(projected, "id"); - assertThat(projected.get("location")).as("Should project location").isNotNull(); - assertThat((float) projectedLocation.get("lat")) - .as("Should project latitude") - .isCloseTo(52.995143f, within(0.000001f)); - - assertThat((float) projectedLocation.get("long")) - .as("Should project longitude") - .isCloseTo(-1.539054f, within(0.000001f)); - } - - @Test - public void testMapProjection() throws IOException { - Schema writeSchema = - new Schema( - Types.NestedField.required(0, "id", Types.LongType.get()), - Types.NestedField.optional( - 5, - "properties", - Types.MapType.ofOptional(6, 7, Types.StringType.get(), Types.StringType.get()))); - - Map properties = ImmutableMap.of("a", "A", "b", "B"); - - Record record = new Record(AvroSchemaUtil.convert(writeSchema, "table")); - record.put("id", 34L); - record.put("properties", properties); - - Schema idOnly = new Schema(Types.NestedField.required(0, "id", Types.LongType.get())); - - Record projected = writeAndRead("id_only", writeSchema, idOnly, record); - assertThat((long) projected.get("id")).as("Should contain the correct id value").isEqualTo(34L); - TestHelpers.assertEmptyAvroField(projected, "properties"); - - Schema keyOnly = writeSchema.select("properties.key"); - projected = writeAndRead("key_only", writeSchema, keyOnly, record); - TestHelpers.assertEmptyAvroField(projected, "id"); - assertThat(toStringMap((Map) projected.get("properties"))) - .as("Should project entire map") - .isEqualTo(properties); - - Schema valueOnly = writeSchema.select("properties.value"); - projected = writeAndRead("value_only", writeSchema, valueOnly, record); - TestHelpers.assertEmptyAvroField(projected, "id"); - assertThat(toStringMap((Map) projected.get("properties"))) - .as("Should project entire map") - .isEqualTo(properties); - - Schema mapOnly = writeSchema.select("properties"); - projected = writeAndRead("map_only", writeSchema, mapOnly, record); - TestHelpers.assertEmptyAvroField(projected, "id"); - assertThat(toStringMap((Map) projected.get("properties"))) - .as("Should project entire map") - .isEqualTo(properties); 
- } - - private Map toStringMap(Map map) { - Map stringMap = Maps.newHashMap(); - for (Map.Entry entry : map.entrySet()) { - if (entry.getValue() instanceof CharSequence) { - stringMap.put(entry.getKey().toString(), entry.getValue().toString()); - } else { - stringMap.put(entry.getKey().toString(), entry.getValue()); - } - } - return stringMap; - } - - @Test - public void testMapOfStructsProjection() throws IOException { - Schema writeSchema = - new Schema( - Types.NestedField.required(0, "id", Types.LongType.get()), - Types.NestedField.optional( - 5, - "locations", - Types.MapType.ofOptional( - 6, - 7, - Types.StringType.get(), - Types.StructType.of( - Types.NestedField.required(1, "lat", Types.FloatType.get()), - Types.NestedField.required(2, "long", Types.FloatType.get()))))); - - Record record = new Record(AvroSchemaUtil.convert(writeSchema, "table")); - record.put("id", 34L); - Record l1 = - new Record( - AvroSchemaUtil.fromOption( - AvroSchemaUtil.fromOption(record.getSchema().getField("locations").schema()) - .getValueType())); - l1.put("lat", 53.992811f); - l1.put("long", -1.542616f); - Record l2 = new Record(l1.getSchema()); - l2.put("lat", 52.995143f); - l2.put("long", -1.539054f); - record.put("locations", ImmutableMap.of("L1", l1, "L2", l2)); - - Schema idOnly = new Schema(Types.NestedField.required(0, "id", Types.LongType.get())); - - Record projected = writeAndRead("id_only", writeSchema, idOnly, record); - assertThat((long) projected.get("id")).as("Should contain the correct id value").isEqualTo(34L); - TestHelpers.assertEmptyAvroField(projected, "locations"); - - projected = writeAndRead("all_locations", writeSchema, writeSchema.select("locations"), record); - TestHelpers.assertEmptyAvroField(projected, "id"); - assertThat(toStringMap((Map) projected.get("locations"))) - .as("Should project locations map") - .isEqualTo(record.get("locations")); - - projected = writeAndRead("lat_only", writeSchema, writeSchema.select("locations.lat"), record); - TestHelpers.assertEmptyAvroField(projected, "id"); - Map locations = toStringMap((Map) projected.get("locations")); - assertThat(locations).as("Should project locations map").isNotNull(); - assertThat(locations.keySet()).as("Should contain L1 and L2").containsExactly("L1", "L2"); - Record projectedL1 = (Record) locations.get("L1"); - assertThat(projectedL1).as("L1 should not be null").isNotNull(); - assertThat((float) projectedL1.get("lat")) - .as("L1 should contain lat") - .isCloseTo(53.992811f, within(0.000001f)); - - TestHelpers.assertEmptyAvroField(projectedL1, "long"); - Record projectedL2 = (Record) locations.get("L2"); - assertThat(projectedL2).as("L2 should not be null").isNotNull(); - assertThat((float) projectedL2.get("lat")) - .as("L2 should contain lat") - .isCloseTo(52.995143f, within(0.000001f)); - - TestHelpers.assertEmptyAvroField(projectedL2, "long"); - - projected = - writeAndRead("long_only", writeSchema, writeSchema.select("locations.long"), record); - TestHelpers.assertEmptyAvroField(projected, "id"); - locations = toStringMap((Map) projected.get("locations")); - assertThat(locations).as("Should project locations map").isNotNull(); - assertThat(locations.keySet()).as("Should contain L1 and L2").containsExactly("L1", "L2"); - projectedL1 = (Record) locations.get("L1"); - assertThat(projectedL1).as("L1 should not be null").isNotNull(); - TestHelpers.assertEmptyAvroField(projectedL1, "lat"); - assertThat((float) projectedL1.get("long")) - .as("L1 should contain long") - .isCloseTo(-1.542616f, within(0.000001f)); - - 
projectedL2 = (Record) locations.get("L2"); - assertThat(projectedL2).as("L2 should not be null").isNotNull(); - TestHelpers.assertEmptyAvroField(projectedL2, "lat"); - assertThat((float) projectedL2.get("long")) - .as("L2 should contain long") - .isCloseTo(-1.539054f, within(0.000001f)); - - Schema latitiudeRenamed = - new Schema( - Types.NestedField.optional( - 5, - "locations", - Types.MapType.ofOptional( - 6, - 7, - Types.StringType.get(), - Types.StructType.of( - Types.NestedField.required(1, "latitude", Types.FloatType.get()))))); - - projected = writeAndRead("latitude_renamed", writeSchema, latitiudeRenamed, record); - TestHelpers.assertEmptyAvroField(projected, "id"); - locations = toStringMap((Map) projected.get("locations")); - assertThat(locations).as("Should project locations map").isNotNull(); - assertThat(locations.keySet()).as("Should contain L1 and L2").containsExactly("L1", "L2"); - projectedL1 = (Record) locations.get("L1"); - assertThat(projectedL1).as("L1 should not be null").isNotNull(); - assertThat((float) projectedL1.get("latitude")) - .as("L1 should contain latitude") - .isCloseTo(53.992811f, within(0.000001f)); - TestHelpers.assertEmptyAvroField(projectedL1, "lat"); - TestHelpers.assertEmptyAvroField(projectedL1, "long"); - projectedL2 = (Record) locations.get("L2"); - assertThat(projectedL2).as("L2 should not be null").isNotNull(); - assertThat((float) projectedL2.get("latitude")) - .as("L2 should contain latitude") - .isCloseTo(52.995143f, within(0.000001f)); - - TestHelpers.assertEmptyAvroField(projectedL2, "lat"); - TestHelpers.assertEmptyAvroField(projectedL2, "long"); - } - - @Test - public void testListProjection() throws IOException { - Schema writeSchema = - new Schema( - Types.NestedField.required(0, "id", Types.LongType.get()), - Types.NestedField.optional( - 10, "values", Types.ListType.ofOptional(11, Types.LongType.get()))); - - List values = ImmutableList.of(56L, 57L, 58L); - - Record record = new Record(AvroSchemaUtil.convert(writeSchema, "table")); - record.put("id", 34L); - record.put("values", values); - - Schema idOnly = new Schema(Types.NestedField.required(0, "id", Types.LongType.get())); - - Record projected = writeAndRead("id_only", writeSchema, idOnly, record); - assertThat((long) projected.get("id")).as("Should contain the correct id value").isEqualTo(34L); - TestHelpers.assertEmptyAvroField(projected, "values"); - - Schema elementOnly = writeSchema.select("values.element"); - projected = writeAndRead("element_only", writeSchema, elementOnly, record); - TestHelpers.assertEmptyAvroField(projected, "id"); - assertThat(projected.get("values")).as("Should project entire list").isEqualTo(values); - - Schema listOnly = writeSchema.select("values"); - projected = writeAndRead("list_only", writeSchema, listOnly, record); - TestHelpers.assertEmptyAvroField(projected, "id"); - assertThat(projected.get("values")).as("Should project entire list").isEqualTo(values); - } - - @Test - @SuppressWarnings("unchecked") - public void testListOfStructsProjection() throws IOException { - Schema writeSchema = - new Schema( - Types.NestedField.required(0, "id", Types.LongType.get()), - Types.NestedField.optional( - 22, - "points", - Types.ListType.ofOptional( - 21, - Types.StructType.of( - Types.NestedField.required(19, "x", Types.IntegerType.get()), - Types.NestedField.optional(18, "y", Types.IntegerType.get()))))); - - Record record = new Record(AvroSchemaUtil.convert(writeSchema, "table")); - record.put("id", 34L); - Record p1 = - new Record( - 
AvroSchemaUtil.fromOption( - AvroSchemaUtil.fromOption(record.getSchema().getField("points").schema()) - .getElementType())); - p1.put("x", 1); - p1.put("y", 2); - Record p2 = new Record(p1.getSchema()); - p2.put("x", 3); - p2.put("y", null); - record.put("points", ImmutableList.of(p1, p2)); - - Schema idOnly = new Schema(Types.NestedField.required(0, "id", Types.LongType.get())); - - Record projected = writeAndRead("id_only", writeSchema, idOnly, record); - assertThat((long) projected.get("id")).as("Should contain the correct id value").isEqualTo(34L); - TestHelpers.assertEmptyAvroField(projected, "points"); - - projected = writeAndRead("all_points", writeSchema, writeSchema.select("points"), record); - TestHelpers.assertEmptyAvroField(projected, "id"); - assertThat(projected.get("points")) - .as("Should project points list") - .isEqualTo(record.get("points")); - - projected = writeAndRead("x_only", writeSchema, writeSchema.select("points.x"), record); - TestHelpers.assertEmptyAvroField(projected, "id"); - assertThat(projected.get("points")).as("Should project points list").isNotNull(); - List points = (List) projected.get("points"); - assertThat(points).as("Should read 2 points").hasSize(2); - Record projectedP1 = points.get(0); - assertThat((int) projectedP1.get("x")).as("Should project x").isEqualTo(1); - TestHelpers.assertEmptyAvroField(projectedP1, "y"); - Record projectedP2 = points.get(1); - assertThat((int) projectedP2.get("x")).as("Should project x").isEqualTo(3); - TestHelpers.assertEmptyAvroField(projectedP2, "y"); - - projected = writeAndRead("y_only", writeSchema, writeSchema.select("points.y"), record); - TestHelpers.assertEmptyAvroField(projected, "id"); - assertThat(projected.get("points")).as("Should project points list").isNotNull(); - points = (List) projected.get("points"); - assertThat(points).as("Should read 2 points").hasSize(2); - projectedP1 = points.get(0); - TestHelpers.assertEmptyAvroField(projectedP1, "x"); - assertThat((int) projectedP1.get("y")).as("Should project y").isEqualTo(2); - projectedP2 = points.get(1); - TestHelpers.assertEmptyAvroField(projectedP2, "x"); - assertThat(projectedP2.get("y")).as("Should project null y").isNull(); - - Schema yRenamed = - new Schema( - Types.NestedField.optional( - 22, - "points", - Types.ListType.ofOptional( - 21, - Types.StructType.of( - Types.NestedField.optional(18, "z", Types.IntegerType.get()))))); - - projected = writeAndRead("y_renamed", writeSchema, yRenamed, record); - TestHelpers.assertEmptyAvroField(projected, "id"); - assertThat(projected.get("points")).as("Should project points list").isNotNull(); - points = (List) projected.get("points"); - assertThat(points).as("Should read 2 points").hasSize(2); - projectedP1 = points.get(0); - TestHelpers.assertEmptyAvroField(projectedP1, "x"); - TestHelpers.assertEmptyAvroField(projectedP1, "y"); - assertThat((int) projectedP1.get("z")).as("Should project z").isEqualTo(2); - projectedP2 = points.get(1); - TestHelpers.assertEmptyAvroField(projectedP2, "x"); - TestHelpers.assertEmptyAvroField(projectedP2, "y"); - assertThat(projectedP2.get("z")).as("Should project null z").isNull(); - } -} diff --git a/settings.gradle b/settings.gradle index f557712cf72b..46141732783e 100644 --- a/settings.gradle +++ b/settings.gradle @@ -29,7 +29,6 @@ include 'azure' include 'azure-bundle' include 'orc' include 'arrow' -include 'parquet' include 'bundled-guava' include 'spark' include 'pig' @@ -52,7 +51,6 @@ project(':azure').name = 'iceberg-azure' project(':azure-bundle').name = 
'iceberg-azure-bundle' project(':orc').name = 'iceberg-orc' project(':arrow').name = 'iceberg-arrow' -project(':parquet').name = 'iceberg-parquet' project(':bundled-guava').name = 'iceberg-bundled-guava' project(':spark').name = 'iceberg-spark' project(':pig').name = 'iceberg-pig' diff --git a/spark/v3.1/build.gradle b/spark/v3.1/build.gradle index 23ba2d02df74..4d5e32a2b1c0 100644 --- a/spark/v3.1/build.gradle +++ b/spark/v3.1/build.gradle @@ -57,7 +57,6 @@ project(':iceberg-spark:iceberg-spark-3.1_2.12') { implementation project(':iceberg-core') implementation project(':iceberg-data') implementation project(':iceberg-orc') - implementation project(':iceberg-parquet') implementation project(':iceberg-arrow') compileOnly libs.errorprone.annotations diff --git a/spark/v3.2/build.gradle b/spark/v3.2/build.gradle index c4df5015458c..fa6d7ed1b1cd 100644 --- a/spark/v3.2/build.gradle +++ b/spark/v3.2/build.gradle @@ -57,7 +57,6 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") { implementation project(':iceberg-core') implementation project(':iceberg-data') implementation project(':iceberg-orc') - implementation project(':iceberg-parquet') implementation project(':iceberg-arrow') implementation "org.scala-lang.modules:scala-collection-compat_${scalaVersion}:${libs.versions.scala.collection.compat.get()}" @@ -152,7 +151,6 @@ project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVer } testImplementation project(path: ':iceberg-data') - testImplementation project(path: ':iceberg-parquet') testImplementation project(path: ':iceberg-hive-metastore') testImplementation project(path: ':iceberg-api', configuration: 'testArtifacts') testImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts') diff --git a/spark/v3.3/build.gradle b/spark/v3.3/build.gradle index abd614923325..e9d86e5e35b9 100644 --- a/spark/v3.3/build.gradle +++ b/spark/v3.3/build.gradle @@ -56,7 +56,6 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") { implementation project(':iceberg-core') implementation project(':iceberg-data') implementation project(':iceberg-orc') - implementation project(':iceberg-parquet') implementation project(':iceberg-arrow') implementation("org.scala-lang.modules:scala-collection-compat_${scalaVersion}:${libs.versions.scala.collection.compat.get()}") @@ -158,7 +157,6 @@ project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVer compileOnly libs.errorprone.annotations testImplementation project(path: ':iceberg-data') - testImplementation project(path: ':iceberg-parquet') testImplementation project(path: ':iceberg-hive-metastore') testImplementation project(path: ':iceberg-api', configuration: 'testArtifacts') testImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts') diff --git a/spark/v3.4/build.gradle b/spark/v3.4/build.gradle index 3ec10c3b6b31..7612728d3cf0 100644 --- a/spark/v3.4/build.gradle +++ b/spark/v3.4/build.gradle @@ -56,7 +56,6 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") { implementation project(':iceberg-core') implementation project(':iceberg-data') implementation project(':iceberg-orc') - implementation project(':iceberg-parquet') implementation project(':iceberg-arrow') implementation("org.scala-lang.modules:scala-collection-compat_${scalaVersion}:${libs.versions.scala.collection.compat.get()}") @@ -157,7 +156,6 @@ 
project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVer compileOnly libs.errorprone.annotations testImplementation project(path: ':iceberg-data') - testImplementation project(path: ':iceberg-parquet') testImplementation project(path: ':iceberg-hive-metastore') testImplementation project(path: ':iceberg-api', configuration: 'testArtifacts') testImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts') From 44c2d7ede69eab11f82913d3abbab88cd9f68656 Mon Sep 17 00:00:00 2001 From: ajantha-bhat Date: Wed, 2 Aug 2023 17:53:09 +0530 Subject: [PATCH 3/4] Core: Enhance PartitionsTable.Partition PartitionsTable.Partition will be used between Partitions metadata table and partition stats reader-writer. Hence, move it to a separate class and extend it with Avro's IndexedRecord (for partition stats writing). --- .../java/org/apache/iceberg/Partition.java | 392 ++++++++++++++++++ .../org/apache/iceberg/PartitionsTable.java | 110 +---- .../org/apache/iceberg/TestPartition.java | 74 ++++ 3 files changed, 486 insertions(+), 90 deletions(-) create mode 100644 core/src/main/java/org/apache/iceberg/Partition.java create mode 100644 core/src/test/java/org/apache/iceberg/TestPartition.java diff --git a/core/src/main/java/org/apache/iceberg/Partition.java b/core/src/main/java/org/apache/iceberg/Partition.java new file mode 100644 index 000000000000..cba011ad9792 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/Partition.java @@ -0,0 +1,392 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import java.util.Objects; +import org.apache.avro.Schema; +import org.apache.avro.generic.IndexedRecord; +import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.types.Types; + +public class Partition implements IndexedRecord { + private PartitionData partitionData; + private int specId; + private long dataRecordCount; + private int dataFileCount; + private long dataFileSizeInBytes; + // optional fields are kept as objects instead of primitive. 
+ private Long posDeleteRecordCount; + private Integer posDeleteFileCount; + private Long eqDeleteRecordCount; + private Integer eqDeleteFileCount; + // Commit time of snapshot that last updated this partition + private Long lastUpdatedAt; + // ID of snapshot that last updated this partition + private Long lastUpdatedSnapshotId; + + public enum Column { + PARTITION_DATA, + SPEC_ID, + DATA_RECORD_COUNT, + DATA_FILE_COUNT, + DATA_FILE_SIZE_IN_BYTES, + POSITION_DELETE_RECORD_COUNT, + POSITION_DELETE_FILE_COUNT, + EQUALITY_DELETE_RECORD_COUNT, + EQUALITY_DELETE_FILE_COUNT, + LAST_UPDATED_AT, + LAST_UPDATED_SNAPSHOT_ID + } + + public Partition() {} + + public Partition(StructLike key, Types.StructType keyType) { + this.partitionData = toPartitionData(key, keyType); + this.specId = 0; + this.dataRecordCount = 0L; + this.dataFileCount = 0; + this.dataFileSizeInBytes = 0L; + this.posDeleteRecordCount = 0L; + this.posDeleteFileCount = 0; + this.eqDeleteRecordCount = 0L; + this.eqDeleteFileCount = 0; + } + + public static Builder builder() { + return new Builder(); + } + + public PartitionData partitionData() { + return partitionData; + } + + public int specId() { + return specId; + } + + public long dataRecordCount() { + return dataRecordCount; + } + + public int dataFileCount() { + return dataFileCount; + } + + public long dataFileSizeInBytes() { + return dataFileSizeInBytes; + } + + public Long posDeleteRecordCount() { + return posDeleteRecordCount; + } + + public Integer posDeleteFileCount() { + return posDeleteFileCount; + } + + public Long eqDeleteRecordCount() { + return eqDeleteRecordCount; + } + + public Integer eqDeleteFileCount() { + return eqDeleteFileCount; + } + + public Long lastUpdatedAt() { + return lastUpdatedAt; + } + + public Long lastUpdatedSnapshotId() { + return lastUpdatedSnapshotId; + } + + synchronized void update(ContentFile file, Snapshot snapshot) { + if (snapshot != null) { + long snapshotCommitTime = snapshot.timestampMillis() * 1000; + if (this.lastUpdatedAt == null || snapshotCommitTime > this.lastUpdatedAt) { + this.lastUpdatedAt = snapshotCommitTime; + this.lastUpdatedSnapshotId = snapshot.snapshotId(); + } + } + + switch (file.content()) { + case DATA: + this.dataRecordCount += file.recordCount(); + this.dataFileCount += 1; + this.specId = file.specId(); + this.dataFileSizeInBytes += file.fileSizeInBytes(); + break; + case POSITION_DELETES: + this.posDeleteRecordCount = file.recordCount(); + this.posDeleteFileCount += 1; + this.specId = file.specId(); + break; + case EQUALITY_DELETES: + this.eqDeleteRecordCount = file.recordCount(); + this.eqDeleteFileCount += 1; + this.specId = file.specId(); + break; + default: + throw new UnsupportedOperationException("Unsupported file content type: " + file.content()); + } + } + + /** Needed because StructProjection is not serializable */ + private static PartitionData toPartitionData(StructLike key, Types.StructType keyType) { + PartitionData data = new PartitionData(keyType); + for (int i = 0; i < keyType.fields().size(); i++) { + Object val = key.get(i, keyType.fields().get(i).type().typeId().javaClass()); + if (val != null) { + data.set(i, val); + } + } + + return data; + } + + @Override + public void put(int i, Object v) { + switch (i) { + case 0: + this.partitionData = (PartitionData) v; + return; + case 1: + this.specId = (int) v; + return; + case 2: + this.dataRecordCount = (long) v; + return; + case 3: + this.dataFileCount = (int) v; + return; + case 4: + this.dataFileSizeInBytes = (long) v; + return; + case 5: + 
this.posDeleteRecordCount = (v == null) ? null : (Long) v; + return; + case 6: + this.posDeleteFileCount = (v == null) ? null : (Integer) v; + return; + case 7: + this.eqDeleteRecordCount = (v == null) ? null : (Long) v; + return; + case 8: + this.eqDeleteFileCount = (v == null) ? null : (Integer) v; + return; + case 9: + this.lastUpdatedAt = (v == null) ? null : (Long) v; + return; + case 10: + this.lastUpdatedSnapshotId = (v == null) ? null : (Long) v; + return; + default: + throw new UnsupportedOperationException("Unknown field ordinal: " + i); + } + } + + @Override + public Object get(int i) { + switch (i) { + case 0: + return partitionData; + case 1: + return specId; + case 2: + return dataRecordCount; + case 3: + return dataFileCount; + case 4: + return dataFileSizeInBytes; + case 5: + return posDeleteRecordCount; + case 6: + return posDeleteFileCount; + case 7: + return eqDeleteRecordCount; + case 8: + return eqDeleteFileCount; + case 9: + return lastUpdatedAt; + case 10: + return lastUpdatedSnapshotId; + default: + throw new UnsupportedOperationException("Unknown field ordinal: " + i); + } + } + + @Override + public Schema getSchema() { + return prepareAvroSchema(partitionData.getPartitionType()); + } + + @Override + @SuppressWarnings("checkstyle:CyclomaticComplexity") + public boolean equals(Object o) { + if (this == o) { + return true; + } else if (!(o instanceof Partition)) { + return false; + } + + Partition that = (Partition) o; + if (!(partitionData.equals(that.partitionData) + && specId == that.specId + && dataRecordCount == that.dataRecordCount + && dataFileCount == that.dataFileCount + && dataFileSizeInBytes == that.dataFileSizeInBytes)) { + return false; + } + + return Objects.equals(posDeleteRecordCount, that.posDeleteRecordCount) + && Objects.equals(posDeleteFileCount, that.posDeleteFileCount) + && Objects.equals(eqDeleteRecordCount, that.eqDeleteRecordCount) + && Objects.equals(eqDeleteFileCount, that.eqDeleteFileCount) + && Objects.equals(lastUpdatedAt, that.lastUpdatedAt) + && Objects.equals(lastUpdatedSnapshotId, that.lastUpdatedSnapshotId); + } + + @Override + public int hashCode() { + return Objects.hash( + partitionData, + specId, + dataRecordCount, + dataFileCount, + dataFileSizeInBytes, + posDeleteRecordCount, + posDeleteFileCount, + eqDeleteRecordCount, + eqDeleteFileCount, + lastUpdatedAt, + lastUpdatedSnapshotId); + } + + public static org.apache.iceberg.Schema icebergSchema(Types.StructType partitionType) { + if (partitionType.fields().isEmpty()) { + throw new IllegalArgumentException("getting schema for an unpartitioned table"); + } + + return new org.apache.iceberg.Schema( + Types.NestedField.required(1, Column.PARTITION_DATA.name(), partitionType), + Types.NestedField.required(2, Column.SPEC_ID.name(), Types.IntegerType.get()), + Types.NestedField.required(3, Column.DATA_RECORD_COUNT.name(), Types.LongType.get()), + Types.NestedField.required(4, Column.DATA_FILE_COUNT.name(), Types.IntegerType.get()), + Types.NestedField.required(5, Column.DATA_FILE_SIZE_IN_BYTES.name(), Types.LongType.get()), + Types.NestedField.optional( + 6, Column.POSITION_DELETE_RECORD_COUNT.name(), Types.LongType.get()), + Types.NestedField.optional( + 7, Column.POSITION_DELETE_FILE_COUNT.name(), Types.IntegerType.get()), + Types.NestedField.optional( + 8, Column.EQUALITY_DELETE_RECORD_COUNT.name(), Types.LongType.get()), + Types.NestedField.optional( + 9, Column.EQUALITY_DELETE_FILE_COUNT.name(), Types.IntegerType.get()), + Types.NestedField.optional(10, 
Column.LAST_UPDATED_AT.name(), Types.LongType.get()), + Types.NestedField.optional( + 11, Column.LAST_UPDATED_SNAPSHOT_ID.name(), Types.LongType.get())); + } + + private static Schema prepareAvroSchema(Types.StructType partitionType) { + return AvroSchemaUtil.convert(icebergSchema(partitionType), "partitionEntry"); + } + + public static class Builder { + private PartitionData partitionData; + private int specId; + private long dataRecordCount; + private int dataFileCount; + private long dataFileSizeInBytes; + private Long posDeleteRecordCount; + private Integer posDeleteFileCount; + private Long eqDeleteRecordCount; + private Integer eqDeleteFileCount; + private Long lastUpdatedAt; + private Long lastUpdatedSnapshotId; + + public Builder withPartitionData(PartitionData newPartitionData) { + this.partitionData = newPartitionData; + return this; + } + + public Builder withSpecId(int newSpecId) { + this.specId = newSpecId; + return this; + } + + public Builder withDataRecordCount(long newDataRecordCount) { + this.dataRecordCount = newDataRecordCount; + return this; + } + + public Builder withDataFileCount(int newDataFileCount) { + this.dataFileCount = newDataFileCount; + return this; + } + + public Builder withDataFileSizeInBytes(long newDataFileSizeInBytes) { + this.dataFileSizeInBytes = newDataFileSizeInBytes; + return this; + } + + public Builder withPosDeleteRecordCount(Long newPosDeleteRecordCount) { + this.posDeleteRecordCount = newPosDeleteRecordCount; + return this; + } + + public Builder withPosDeleteFileCount(Integer newPosDeleteFileCount) { + this.posDeleteFileCount = newPosDeleteFileCount; + return this; + } + + public Builder withEqDeleteRecordCount(Long newEqDeleteRecordCount) { + this.eqDeleteRecordCount = newEqDeleteRecordCount; + return this; + } + + public Builder withEqDeleteFileCount(Integer newEqDeleteFileCount) { + this.eqDeleteFileCount = newEqDeleteFileCount; + return this; + } + + public Builder withLastUpdatedAt(Long newLastUpdatedAt) { + this.lastUpdatedAt = newLastUpdatedAt; + return this; + } + + public Builder withLastUpdatedSnapshotId(Long newLastUpdatedSnapshotId) { + this.lastUpdatedSnapshotId = newLastUpdatedSnapshotId; + return this; + } + + public Partition build() { + Partition partition = new Partition(partitionData, partitionData.getPartitionType()); + partition.specId = specId; + partition.dataRecordCount = dataRecordCount; + partition.dataFileCount = dataFileCount; + partition.dataFileSizeInBytes = dataFileSizeInBytes; + partition.posDeleteRecordCount = posDeleteRecordCount; + partition.posDeleteFileCount = posDeleteFileCount; + partition.eqDeleteRecordCount = eqDeleteRecordCount; + partition.eqDeleteFileCount = eqDeleteFileCount; + partition.lastUpdatedAt = lastUpdatedAt; + partition.lastUpdatedSnapshotId = lastUpdatedSnapshotId; + return partition; + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/PartitionsTable.java b/core/src/main/java/org/apache/iceberg/PartitionsTable.java index d93200c7cfca..60ad28da0d42 100644 --- a/core/src/main/java/org/apache/iceberg/PartitionsTable.java +++ b/core/src/main/java/org/apache/iceberg/PartitionsTable.java @@ -129,15 +129,15 @@ private DataTask task(StaticTableScan scan) { partitions, root -> StaticDataTask.Row.of( - root.dataRecordCount, - root.dataFileCount, - root.dataFileSizeInBytes, - root.posDeleteRecordCount, - root.posDeleteFileCount, - root.eqDeleteRecordCount, - root.eqDeleteFileCount, - root.lastUpdatedAt, - root.lastUpdatedSnapshotId)); + root.dataRecordCount(), + root.dataFileCount(), + 
root.dataFileSizeInBytes(), + root.posDeleteRecordCount(), + root.posDeleteFileCount(), + root.eqDeleteRecordCount(), + root.eqDeleteFileCount(), + root.lastUpdatedAt(), + root.lastUpdatedSnapshotId())); } else { return StaticDataTask.of( io().newInputFile(table().operations().current().metadataFileLocation()), @@ -150,17 +150,17 @@ private DataTask task(StaticTableScan scan) { private static StaticDataTask.Row convertPartition(Partition partition) { return StaticDataTask.Row.of( - partition.partitionData, - partition.specId, - partition.dataRecordCount, - partition.dataFileCount, - partition.dataFileSizeInBytes, - partition.posDeleteRecordCount, - partition.posDeleteFileCount, - partition.eqDeleteRecordCount, - partition.eqDeleteFileCount, - partition.lastUpdatedAt, - partition.lastUpdatedSnapshotId); + partition.partitionData(), + partition.specId(), + partition.dataRecordCount(), + partition.dataFileCount(), + partition.dataFileSizeInBytes(), + partition.posDeleteRecordCount(), + partition.posDeleteFileCount(), + partition.eqDeleteRecordCount(), + partition.eqDeleteFileCount(), + partition.lastUpdatedAt(), + partition.lastUpdatedSnapshotId()); } private static Iterable partitions(Table table, StaticTableScan scan) { @@ -271,74 +271,4 @@ Iterable all() { return partitions.values(); } } - - static class Partition { - private final PartitionData partitionData; - private int specId; - private long dataRecordCount; - private int dataFileCount; - private long dataFileSizeInBytes; - private long posDeleteRecordCount; - private int posDeleteFileCount; - private long eqDeleteRecordCount; - private int eqDeleteFileCount; - private Long lastUpdatedAt; - private Long lastUpdatedSnapshotId; - - Partition(StructLike key, Types.StructType keyType) { - this.partitionData = toPartitionData(key, keyType); - this.specId = 0; - this.dataRecordCount = 0L; - this.dataFileCount = 0; - this.dataFileSizeInBytes = 0L; - this.posDeleteRecordCount = 0L; - this.posDeleteFileCount = 0; - this.eqDeleteRecordCount = 0L; - this.eqDeleteFileCount = 0; - } - - void update(ContentFile file, Snapshot snapshot) { - if (snapshot != null) { - long snapshotCommitTime = snapshot.timestampMillis() * 1000; - if (this.lastUpdatedAt == null || snapshotCommitTime > this.lastUpdatedAt) { - this.lastUpdatedAt = snapshotCommitTime; - this.lastUpdatedSnapshotId = snapshot.snapshotId(); - } - } - - switch (file.content()) { - case DATA: - this.dataRecordCount += file.recordCount(); - this.dataFileCount += 1; - this.specId = file.specId(); - this.dataFileSizeInBytes += file.fileSizeInBytes(); - break; - case POSITION_DELETES: - this.posDeleteRecordCount = file.recordCount(); - this.posDeleteFileCount += 1; - this.specId = file.specId(); - break; - case EQUALITY_DELETES: - this.eqDeleteRecordCount = file.recordCount(); - this.eqDeleteFileCount += 1; - this.specId = file.specId(); - break; - default: - throw new UnsupportedOperationException( - "Unsupported file content type: " + file.content()); - } - } - - /** Needed because StructProjection is not serializable */ - private PartitionData toPartitionData(StructLike key, Types.StructType keyType) { - PartitionData data = new PartitionData(keyType); - for (int i = 0; i < keyType.fields().size(); i++) { - Object val = key.get(i, keyType.fields().get(i).type().typeId().javaClass()); - if (val != null) { - data.set(i, val); - } - } - return data; - } - } } diff --git a/core/src/test/java/org/apache/iceberg/TestPartition.java b/core/src/test/java/org/apache/iceberg/TestPartition.java new file 
mode 100644 index 000000000000..4043ddd85e95 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/TestPartition.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class TestPartition { + + @Test + public void testPartitionBuilder() { + Types.StructType partitionType = + Types.StructType.of( + Types.NestedField.required(1000, "field1", Types.StringType.get()), + Types.NestedField.required(1001, "field2", Types.IntegerType.get())); + PartitionData partitionData = new PartitionData(partitionType); + partitionData.set(0, "value1"); + partitionData.set(1, 42); + + Partition partition = + Partition.builder() + .withPartitionData(partitionData) + .withSpecId(123) + .withDataRecordCount(1000L) + .withDataFileCount(5) + .withDataFileSizeInBytes(1024L * 1024L) + .withPosDeleteRecordCount(50L) + .withPosDeleteFileCount(2) + .withEqDeleteRecordCount(20L) + .withEqDeleteFileCount(1) + .withLastUpdatedAt(1627900200L) + .withLastUpdatedSnapshotId(456789L) + .build(); + + // Verify the get method + Assertions.assertEquals(partitionData, partition.get(0)); + Assertions.assertEquals(123, partition.get(1)); + Assertions.assertEquals(1000L, partition.get(2)); + Assertions.assertEquals(5, partition.get(3)); + Assertions.assertEquals(1024L * 1024L, partition.get(4)); + Assertions.assertEquals(50L, partition.get(5)); + Assertions.assertEquals(2, partition.get(6)); + Assertions.assertEquals(20L, partition.get(7)); + Assertions.assertEquals(1, partition.get(8)); + Assertions.assertEquals(1627900200L, partition.get(9)); + Assertions.assertEquals(456789L, partition.get(10)); + + // Verify the put method + Partition newPartition = new Partition(); + int size = partition.getSchema().getFields().size(); + for (int i = 0; i < size; i++) { + newPartition.put(i, partition.get(i)); + } + + Assertions.assertEquals(newPartition, partition); + } +} From 2ba244540bf9fd574ece909f4cb178fdf12defa8 Mon Sep 17 00:00:00 2001 From: ajantha-bhat Date: Wed, 2 Aug 2023 20:22:28 +0530 Subject: [PATCH 4/4] Core: Add a util to read and write partition stats --- .../java/org/apache/iceberg/Partitioning.java | 5 +- .../partition/stats/PartitionStatsUtil.java | 132 ++++++++++++++++++ .../stats/TestPartitionStatsUtil.java | 129 +++++++++++++++++ 3 files changed, 265 insertions(+), 1 deletion(-) create mode 100644 core/src/main/java/org/apache/iceberg/partition/stats/PartitionStatsUtil.java create mode 100644 core/src/test/java/org/apache/iceberg/partition/stats/TestPartitionStatsUtil.java diff --git a/core/src/main/java/org/apache/iceberg/Partitioning.java 
b/core/src/main/java/org/apache/iceberg/Partitioning.java index 7e4fcae333d8..872eb3bb09af 100644 --- a/core/src/main/java/org/apache/iceberg/Partitioning.java +++ b/core/src/main/java/org/apache/iceberg/Partitioning.java @@ -238,7 +238,10 @@ public static StructType groupingKeyType(Schema schema, Collection specs = table.specs().values(); + return partitionType(table.specs().values()); + } + + public static StructType partitionType(Collection specs) { return buildPartitionProjectionType("table partition", specs, allFieldIds(specs)); } diff --git a/core/src/main/java/org/apache/iceberg/partition/stats/PartitionStatsUtil.java b/core/src/main/java/org/apache/iceberg/partition/stats/PartitionStatsUtil.java new file mode 100644 index 000000000000..9a37c276ea07 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/partition/stats/PartitionStatsUtil.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.partition.stats; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Collection; +import java.util.Iterator; +import org.apache.avro.generic.GenericData; +import org.apache.iceberg.Partition; +import org.apache.iceberg.PartitionData; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Partitioning; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.parquet.ParquetAvroValueReaders; +import org.apache.iceberg.parquet.ParquetAvroWriter; +import org.apache.iceberg.types.Types; + +public final class PartitionStatsUtil { + + private PartitionStatsUtil() {} + + private static final String PARQUET_SUFFIX = ".parquet"; + + public static void writePartitionStatsFile( + Iterator partitions, OutputFile outputFile, Collection specs) { + validateFormat(outputFile.location()); + writeAsParquetFile( + Partition.icebergSchema(Partitioning.partitionType(specs)), partitions, outputFile); + } + + private static void validateFormat(String filePath) { + if (!filePath.toLowerCase().endsWith(PARQUET_SUFFIX)) { + throw new UnsupportedOperationException("Unsupported format : " + filePath); + } + } + + public static CloseableIterable readPartitionStatsFile( + Schema schema, InputFile inputFile) { + validateFormat(inputFile.location()); + // schema of partition column during read could be different from + // what is used for writing due to partition evolution. + // While reading, ParquetAvroValueReaders fills the data as per latest schema. 
+ CloseableIterable records = + Parquet.read(inputFile) + .project(schema) + .createReaderFunc(fileSchema -> ParquetAvroValueReaders.buildReader(schema, fileSchema)) + .build(); + return CloseableIterable.transform(records, record -> toPartition(schema, record)); + } + + private static Partition toPartition(Schema schema, GenericData.Record record) { + Partition partition = new Partition(); + partition.put( + Partition.Column.PARTITION_DATA.ordinal(), extractPartitionDataFromRecord(schema, record)); + + int recordCount = record.getSchema().getFields().size(); + for (int columnIndex = 1; columnIndex < recordCount; columnIndex++) { + partition.put(columnIndex, record.get(columnIndex)); + } + + return partition; + } + + private static PartitionData extractPartitionDataFromRecord( + Schema schema, GenericData.Record record) { + int partitionDataCount = + record + .getSchema() + .getField(Partition.Column.PARTITION_DATA.name()) + .schema() + .getFields() + .size(); + PartitionData partitionData = + new PartitionData( + (Types.StructType) schema.findField(Partition.Column.PARTITION_DATA.name()).type()); + for (int partitionColIndex = 0; partitionColIndex < partitionDataCount; partitionColIndex++) { + partitionData.set( + partitionColIndex, + ((GenericData.Record) record.get(Partition.Column.PARTITION_DATA.ordinal())) + .get(partitionColIndex)); + } + + return partitionData; + } + + private static void writeAsParquetFile( + Schema schema, Iterator records, OutputFile outputFile) { + String firstPartitionCol = + ((Types.StructType) schema.asStruct().fields().get(0).type()).fields().get(0).name(); + // sorting based on the first partition column + SortOrder sortOrder = + SortOrder.builderFor(schema) + .asc(Partition.Column.PARTITION_DATA.name() + "." + firstPartitionCol) + .build(); + + try (DataWriter dataWriter = + Parquet.writeData(outputFile) + .schema(schema) + .createWriterFunc(ParquetAvroWriter::buildWriter) + .overwrite() + .withSpec(PartitionSpec.unpartitioned()) + .withSortOrder(sortOrder) + .build()) { + records.forEachRemaining(dataWriter::write); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } +} diff --git a/core/src/test/java/org/apache/iceberg/partition/stats/TestPartitionStatsUtil.java b/core/src/test/java/org/apache/iceberg/partition/stats/TestPartitionStatsUtil.java new file mode 100644 index 000000000000..83695ae36bf7 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/partition/stats/TestPartitionStatsUtil.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.partition.stats; + +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; +import java.util.Random; +import java.util.UUID; +import org.apache.iceberg.Files; +import org.apache.iceberg.Partition; +import org.apache.iceberg.PartitionData; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Partitioning; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.TestTables; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestPartitionStatsUtil { + private static final Logger LOG = LoggerFactory.getLogger(TestPartitionStatsUtil.class); + + private static final Schema SCHEMA = + new Schema( + Types.NestedField.required(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "data", Types.StringType.get()), + Types.NestedField.optional(3, "binary", Types.BinaryType.get())); + + @TempDir protected Path temp; + + @Test + public void testPartitionStats() throws Exception { + PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("id").build(); + Table testTable = + TestTables.create( + temp.resolve("test_partition_stats").toFile(), + "test_partition_stats", + SCHEMA, + spec, + SortOrder.unsorted(), + 2); + + Schema schema = Partition.icebergSchema(Partitioning.partitionType(testTable.specs().values())); + + ImmutableList.Builder partitionListBuilder = ImmutableList.builder(); + + long seed = System.currentTimeMillis(); + LOG.info("Seed used for random generator is {}", seed); + Random random = new Random(seed); + + for (int i = 0; i < 42; i++) { + PartitionData partitionData = + new PartitionData( + schema.findField(Partition.Column.PARTITION_DATA.name()).type().asStructType()); + partitionData.set(0, random.nextLong()); + + Partition partition = + Partition.builder() + .withPartitionData(partitionData) + .withSpecId(random.nextInt(10)) + .withDataRecordCount(random.nextLong()) + .withDataFileCount(random.nextInt()) + .withDataFileSizeInBytes(1024L * random.nextInt(20)) + .withPosDeleteRecordCount(random.nextLong()) + .withPosDeleteFileCount(random.nextInt()) + .withEqDeleteRecordCount(random.nextLong()) + .withEqDeleteFileCount(random.nextInt()) + .withLastUpdatedAt(random.nextLong()) + .withLastUpdatedSnapshotId(random.nextLong()) + .build(); + + partitionListBuilder.add(partition); + } + List records = partitionListBuilder.build(); + + String dataFileFormatName = + testTable + .properties() + .getOrDefault( + TableProperties.DEFAULT_FILE_FORMAT, TableProperties.DEFAULT_FILE_FORMAT_DEFAULT); + + OutputFile outputFile = + Files.localOutput( + testTable.location() + "/metadata/" + UUID.randomUUID() + "." 
+ dataFileFormatName); + PartitionStatsUtil.writePartitionStatsFile( + records.iterator(), outputFile, testTable.specs().values()); + + Assertions.assertThat(Paths.get(outputFile.location())).exists(); + + List rows; + try (CloseableIterable recordIterator = + PartitionStatsUtil.readPartitionStatsFile( + schema, Files.localInput(outputFile.location()))) { + rows = Lists.newArrayList(recordIterator); + } + + Assertions.assertThat(rows).hasSize(records.size()); + for (int i = 0; i < records.size(); i++) { + Assertions.assertThat(rows.get(i)).isEqualTo(records.get(i)); + } + } +}
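
For anyone trying out the new util, a minimal usage sketch follows (illustrative only, not part of the patch series). The class and method names below are hypothetical; the sketch assumes an already-loaded Table and a list of Partition rows built via Partition.builder(), and simply mirrors the write/read round trip exercised in TestPartitionStatsUtil above.

    import java.util.List;
    import java.util.UUID;
    import org.apache.iceberg.Files;
    import org.apache.iceberg.Partition;
    import org.apache.iceberg.Partitioning;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.io.CloseableIterable;
    import org.apache.iceberg.io.OutputFile;
    import org.apache.iceberg.partition.stats.PartitionStatsUtil;

    public class PartitionStatsRoundTrip {

      // Writes the given partition stats rows next to the table metadata and reads them back.
      static void roundTrip(Table table, List<Partition> partitions) throws Exception {
        // Row schema for this table's partition stats, derived from all known partition specs.
        Schema schema =
            Partition.icebergSchema(Partitioning.partitionType(table.specs().values()));

        // The util only accepts Parquet output, so the file name must end with ".parquet".
        OutputFile outputFile =
            Files.localOutput(table.location() + "/metadata/" + UUID.randomUUID() + ".parquet");
        PartitionStatsUtil.writePartitionStatsFile(
            partitions.iterator(), outputFile, table.specs().values());

        // Read the rows back using the same schema.
        try (CloseableIterable<Partition> rows =
            PartitionStatsUtil.readPartitionStatsFile(
                schema, Files.localInput(outputFile.location()))) {
          for (Partition row : rows) {
            System.out.println(row.partitionData() + " -> " + row.dataRecordCount());
          }
        }
      }
    }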