diff --git a/core/src/main/java/org/apache/iceberg/BaseFilesTable.java b/core/src/main/java/org/apache/iceberg/BaseFilesTable.java index 62e6f8acf7a5..e6a3d75b1ec8 100644 --- a/core/src/main/java/org/apache/iceberg/BaseFilesTable.java +++ b/core/src/main/java/org/apache/iceberg/BaseFilesTable.java @@ -21,8 +21,6 @@ import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.LoadingCache; import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.expressions.ManifestEvaluator; @@ -35,6 +33,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types; +import org.apache.iceberg.types.Types.NestedField; import org.apache.iceberg.types.Types.StructType; /** Base class logic for files metadata tables */ @@ -46,15 +45,37 @@ abstract class BaseFilesTable extends BaseMetadataTable { @Override public Schema schema() { - StructType partitionType = Partitioning.partitionType(table()); + // avoid returning an empty struct, which is not always supported. + // instead, drop the partition field + boolean dropPartitionColumnForUnpartitioned = true; + return schemaInternal(table(), dropPartitionColumnForUnpartitioned); + } + + private static Schema schemaInternal(Table table, boolean dropPartitionColumnForUnpartitioned) { + StructType partitionType = Partitioning.partitionType(table); Schema schema = new Schema(DataFile.getType(partitionType).fields()); - if (partitionType.fields().size() < 1) { - // avoid returning an empty struct, which is not always supported. - // instead, drop the partition field + if (dropPartitionColumnForUnpartitioned && partitionType.fields().isEmpty()) { schema = TypeUtil.selectNot(schema, Sets.newHashSet(DataFile.PARTITION_ID)); } - return TypeUtil.join(schema, MetricsUtil.readableMetricsSchema(table().schema(), schema)); + return withDerivedColumns(table.schema(), schema); + } + + private static Schema withDerivedColumns(Schema baseTableSchema, Schema metaTableSchema) { + Schema metadataTableSchema = + TypeUtil.join(metaTableSchema, dataSequenceNumberSchema(metaTableSchema)); + + return TypeUtil.join( + metadataTableSchema, + MetricsUtil.readableMetricsSchema(baseTableSchema, metadataTableSchema)); + } + + private static Schema dataSequenceNumberSchema(Schema schema) { + return new Schema( + NestedField.optional( + schema.highestFieldId() + 1, + MetadataTableUtils.DATA_SEQUENCE_NUMBER, + Types.LongType.get())); } private static CloseableIterable<FileScanTask> planFiles( @@ -85,11 +106,20 @@ private static CloseableIterable<FileScanTask> planFiles( Expression filter = ignoreResiduals ?
Expressions.alwaysTrue() : rowFilter; ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter); + // the metadata schema represents the files schema and is the same whether or not the table is partitioned + Schema metadataSchema = schemaInternal(table, false); + return CloseableIterable.transform( filteredManifests, manifest -> new ManifestReadTask( - table, manifest, projectedSchema, schemaString, specString, residuals)); + table, + manifest, + metadataSchema, + projectedSchema, + schemaString, + specString, + residuals)); } abstract static class BaseFilesTableScan extends BaseMetadataTableScan { @@ -138,16 +168,19 @@ static class ManifestReadTask extends BaseFileScanTask implements DataTask { private final Map<Integer, PartitionSpec> specsById; private final ManifestFile manifest; private final Schema dataTableSchema; + private final Schema metadataSchema; private final Schema projection; ManifestReadTask( Table table, ManifestFile manifest, + Schema metadataSchema, Schema projection, String schemaString, String specString, ResidualEvaluator residuals) { super(DataFiles.fromManifest(manifest), null, schemaString, specString, residuals); + this.metadataSchema = metadataSchema; this.io = table.io(); this.specsById = Maps.newHashMap(table.specs()); this.manifest = manifest; @@ -157,47 +190,52 @@ static class ManifestReadTask extends BaseFileScanTask implements DataTask { @Override public CloseableIterable<StructLike> rows() { - Types.NestedField readableMetricsField = projection.findField(MetricsUtil.READABLE_METRICS); - - if (readableMetricsField == null) { - return CloseableIterable.transform(files(projection), file -> (StructLike) file); + if (projectionWithComputedColumns(projection)) { + return CloseableIterable.transform(files(), this::withComputedColumns); } else { - - Schema actualProjection = projectionForReadableMetrics(projection, readableMetricsField); - return CloseableIterable.transform( - files(actualProjection), f -> withReadableMetrics(f, readableMetricsField)); + return CloseableIterable.transform(files(projection), file -> (StructLike) file); } } + private boolean projectionWithComputedColumns(Schema requestedProjection) { + Types.NestedField readableMetricsField = + requestedProjection.findField(MetricsUtil.READABLE_METRICS); + Types.NestedField dataSequenceNumberField = + requestedProjection.findField(MetadataTableUtils.DATA_SEQUENCE_NUMBER); + return readableMetricsField != null || dataSequenceNumberField != null; + } + private CloseableIterable<? extends ContentFile<?>> files(Schema fileProjection) { - switch (manifest.content()) { - case DATA: - return ManifestFiles.read(manifest, io, specsById).project(fileProjection); - case DELETES: - return ManifestFiles.readDeleteManifest(manifest, io, specsById).project(fileProjection); - default: - throw new IllegalArgumentException( - "Unsupported manifest content type:" + manifest.content()); - } + return ManifestFiles.open(manifest, io, specsById).project(fileProjection); + } + + private CloseableIterable<? extends ContentFile<?>> files() { + return ManifestFiles.open(manifest, io, specsById); } /** - * Given content file metadata, append a 'readable_metrics' column that return the file's - * metrics in human-readable form. + * Given content file metadata, append computed columns + * + * <ul>
+     *   <li>readable_metrics: file's metrics in human-readable form
+     *   <li>data_sequence_number: data sequence number assigned to file on commit
+     * </ul>
* * @param file content file metadata - * @param readableMetricsField projected "readable_metrics" field - * @return struct representing content file, with appended readable_metrics field + * @return result content with appended computed columns */ - private StructLike withReadableMetrics( - ContentFile file, Types.NestedField readableMetricsField) { - int structSize = projection.columns().size(); - MetricsUtil.ReadableMetricsStruct readableMetrics = - readableMetrics(file, readableMetricsField); - int metricsPosition = projection.columns().indexOf(readableMetricsField); - - return new MetricsUtil.StructWithReadableMetrics( - (StructLike) file, structSize, readableMetrics, metricsPosition); + private StructLike withComputedColumns(ContentFile file) { + Types.NestedField readableMetricsField = projection.findField(MetricsUtil.READABLE_METRICS); + StructLike readAbleMetricsStruct = + readableMetricsField == null + ? EmptyStructLike.get() + : readableMetrics(file, readableMetricsField); + return new MetadataTableUtils.StructWithComputedColumns( + metadataSchema, + projection, + (StructLike) file, + file.dataSequenceNumber(), + readAbleMetricsStruct); } private MetricsUtil.ReadableMetricsStruct readableMetrics( @@ -206,28 +244,6 @@ private MetricsUtil.ReadableMetricsStruct readableMetrics( return MetricsUtil.readableMetricsStruct(dataTableSchema, file, projectedMetricType); } - /** - * Create a projection on content files metadata by removing virtual 'readable_column' and - * ensuring that the underlying metrics used to create that column are part of the final - * projection. - * - * @param requestedProjection requested projection - * @param readableMetricsField readable_metrics field - * @return actual projection to be used - */ - private Schema projectionForReadableMetrics( - Schema requestedProjection, Types.NestedField readableMetricsField) { - Set readableMetricsIds = TypeUtil.getProjectedIds(readableMetricsField.type()); - Schema realProjection = TypeUtil.selectNot(requestedProjection, readableMetricsIds); - - Schema requiredMetricsColumns = - new Schema( - MetricsUtil.READABLE_METRIC_COLS.stream() - .map(MetricsUtil.ReadableMetricColDefinition::originalCol) - .collect(Collectors.toList())); - return TypeUtil.join(realProjection, requiredMetricsColumns); - } - @Override public Iterable split(long splitSize) { return ImmutableList.of(this); // don't split diff --git a/core/src/main/java/org/apache/iceberg/MetadataTableUtils.java b/core/src/main/java/org/apache/iceberg/MetadataTableUtils.java index adb0f18ba1ad..20c7e021dbfd 100644 --- a/core/src/main/java/org/apache/iceberg/MetadataTableUtils.java +++ b/core/src/main/java/org/apache/iceberg/MetadataTableUtils.java @@ -19,10 +19,19 @@ package org.apache.iceberg; import java.util.Locale; +import java.util.Set; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Types; public class MetadataTableUtils { + static final String DATA_SEQUENCE_NUMBER = "data_sequence_number"; + + public static final Set DERIVED_FIELDS = + Sets.newHashSet(MetricsUtil.READABLE_METRICS, DATA_SEQUENCE_NUMBER); + private MetadataTableUtils() {} public static boolean hasMetadataTableName(TableIdentifier identifier) { @@ -109,4 +118,63 @@ public static Table createMetadataTableInstance( private static String metadataTableName(String 
tableName, MetadataTableType type) { return tableName + (tableName.contains("/") ? "#" : ".") + type.name().toLowerCase(Locale.ROOT); } + + static class StructWithComputedColumns implements StructLike { + private final StructLike struct; + private final int projectionColumnCount; + private final int dataSequenceNumberPosition; + private final Long dataSequenceNumber; + private final int metricsPosition; + private final StructLike readableMetricsStruct; + + private final int[] positionMap; + + StructWithComputedColumns( + Schema base, + Schema projection, + StructLike struct, + Long dataSequenceNumber, + StructLike readableMetrics) { + this.projectionColumnCount = projection.columns().size(); + this.positionMap = new int[this.projectionColumnCount]; + // build projection map + for (Types.NestedField field : projection.asStruct().fields()) { + int projectPosition = projection.columns().indexOf(field); + int basePosition = base.columns().indexOf(base.findField(field.fieldId())); + Preconditions.checkArgument( + projectPosition >= 0, "Cannot find %s in projection", field.name()); + Preconditions.checkArgument(basePosition >= 0, "Cannot find %s in base", field.name()); + positionMap[projectPosition] = basePosition; + } + this.struct = struct; + this.dataSequenceNumberPosition = + projection.columns().indexOf(projection.findField(DATA_SEQUENCE_NUMBER)); + this.dataSequenceNumber = dataSequenceNumber; + this.metricsPosition = + projection.columns().indexOf(projection.findField(MetricsUtil.READABLE_METRICS)); + this.readableMetricsStruct = readableMetrics; + } + + @Override + public int size() { + return projectionColumnCount; + } + + @Override + public T get(int pos, Class javaClass) { + if (pos == dataSequenceNumberPosition) { + return javaClass.cast(dataSequenceNumber); + } else if (pos == metricsPosition) { + return javaClass.cast(readableMetricsStruct); + } else { + int structPosition = positionMap[pos]; + return struct.get(structPosition, javaClass); + } + } + + @Override + public void set(int pos, T value) { + throw new UnsupportedOperationException("StructWithComputedColumns is read only"); + } + } } diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java index f58cc87c6a29..73889f565c15 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java @@ -45,7 +45,6 @@ import org.apache.iceberg.ManifestFile; import org.apache.iceberg.MetadataTableType; import org.apache.iceberg.MetadataTableUtils; -import org.apache.iceberg.MetricsUtil; import org.apache.iceberg.Parameter; import org.apache.iceberg.Parameters; import org.apache.iceberg.Schema; @@ -285,7 +284,7 @@ public void testUnPartitionedTable() throws IOException { List deleteColumns = deleteFilesTableSchema.columns().stream() .map(Types.NestedField::name) - .filter(c -> !c.equals(MetricsUtil.READABLE_METRICS)) + .filter(c -> !MetadataTableUtils.DERIVED_FIELDS.contains(c)) .collect(Collectors.toList()); String deleteNames = deleteColumns.stream().map(n -> "`" + n + "`").collect(Collectors.joining(",")); @@ -311,7 +310,7 @@ public void testUnPartitionedTable() throws IOException { List columns = filesTableSchema.columns().stream() .map(Types.NestedField::name) - .filter(c -> !c.equals(MetricsUtil.READABLE_METRICS)) + .filter(c -> 
!MetadataTableUtils.DERIVED_FIELDS.contains(c)) .collect(Collectors.toList()); String names = columns.stream().map(n -> "`" + n + "`").collect(Collectors.joining(",")); @@ -384,7 +383,7 @@ public void testPartitionedTable() throws Exception { List columns = filesTableSchema.columns().stream() .map(Types.NestedField::name) - .filter(c -> !c.equals(MetricsUtil.READABLE_METRICS)) + .filter(c -> !MetadataTableUtils.DERIVED_FIELDS.contains(c)) .collect(Collectors.toList()); String names = columns.stream().map(n -> "`" + n + "`").collect(Collectors.joining(",")); @@ -470,7 +469,7 @@ public void testAllFilesUnpartitioned() throws Exception { List columns = filesTableSchema.columns().stream() .map(Types.NestedField::name) - .filter(c -> !c.equals(MetricsUtil.READABLE_METRICS)) + .filter(c -> !MetadataTableUtils.DERIVED_FIELDS.contains(c)) .collect(Collectors.toList()); String names = columns.stream().map(n -> "`" + n + "`").collect(Collectors.joining(",")); @@ -553,7 +552,7 @@ public void testAllFilesPartitioned() throws Exception { List columns = filesTableSchema.columns().stream() .map(Types.NestedField::name) - .filter(c -> !c.equals(MetricsUtil.READABLE_METRICS)) + .filter(c -> !MetadataTableUtils.DERIVED_FIELDS.contains(c)) .collect(Collectors.toList()); String names = columns.stream().map(n -> "`" + n + "`").collect(Collectors.joining(",")); diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java index f58cc87c6a29..73889f565c15 100644 --- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java @@ -45,7 +45,6 @@ import org.apache.iceberg.ManifestFile; import org.apache.iceberg.MetadataTableType; import org.apache.iceberg.MetadataTableUtils; -import org.apache.iceberg.MetricsUtil; import org.apache.iceberg.Parameter; import org.apache.iceberg.Parameters; import org.apache.iceberg.Schema; @@ -285,7 +284,7 @@ public void testUnPartitionedTable() throws IOException { List deleteColumns = deleteFilesTableSchema.columns().stream() .map(Types.NestedField::name) - .filter(c -> !c.equals(MetricsUtil.READABLE_METRICS)) + .filter(c -> !MetadataTableUtils.DERIVED_FIELDS.contains(c)) .collect(Collectors.toList()); String deleteNames = deleteColumns.stream().map(n -> "`" + n + "`").collect(Collectors.joining(",")); @@ -311,7 +310,7 @@ public void testUnPartitionedTable() throws IOException { List columns = filesTableSchema.columns().stream() .map(Types.NestedField::name) - .filter(c -> !c.equals(MetricsUtil.READABLE_METRICS)) + .filter(c -> !MetadataTableUtils.DERIVED_FIELDS.contains(c)) .collect(Collectors.toList()); String names = columns.stream().map(n -> "`" + n + "`").collect(Collectors.joining(",")); @@ -384,7 +383,7 @@ public void testPartitionedTable() throws Exception { List columns = filesTableSchema.columns().stream() .map(Types.NestedField::name) - .filter(c -> !c.equals(MetricsUtil.READABLE_METRICS)) + .filter(c -> !MetadataTableUtils.DERIVED_FIELDS.contains(c)) .collect(Collectors.toList()); String names = columns.stream().map(n -> "`" + n + "`").collect(Collectors.joining(",")); @@ -470,7 +469,7 @@ public void testAllFilesUnpartitioned() throws Exception { List columns = filesTableSchema.columns().stream() .map(Types.NestedField::name) - .filter(c -> !c.equals(MetricsUtil.READABLE_METRICS)) + .filter(c -> 
!MetadataTableUtils.DERIVED_FIELDS.contains(c)) .collect(Collectors.toList()); String names = columns.stream().map(n -> "`" + n + "`").collect(Collectors.joining(",")); @@ -553,7 +552,7 @@ public void testAllFilesPartitioned() throws Exception { List columns = filesTableSchema.columns().stream() .map(Types.NestedField::name) - .filter(c -> !c.equals(MetricsUtil.READABLE_METRICS)) + .filter(c -> !MetadataTableUtils.DERIVED_FIELDS.contains(c)) .collect(Collectors.toList()); String names = columns.stream().map(n -> "`" + n + "`").collect(Collectors.joining(",")); diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java index f58cc87c6a29..73889f565c15 100644 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java @@ -45,7 +45,6 @@ import org.apache.iceberg.ManifestFile; import org.apache.iceberg.MetadataTableType; import org.apache.iceberg.MetadataTableUtils; -import org.apache.iceberg.MetricsUtil; import org.apache.iceberg.Parameter; import org.apache.iceberg.Parameters; import org.apache.iceberg.Schema; @@ -285,7 +284,7 @@ public void testUnPartitionedTable() throws IOException { List deleteColumns = deleteFilesTableSchema.columns().stream() .map(Types.NestedField::name) - .filter(c -> !c.equals(MetricsUtil.READABLE_METRICS)) + .filter(c -> !MetadataTableUtils.DERIVED_FIELDS.contains(c)) .collect(Collectors.toList()); String deleteNames = deleteColumns.stream().map(n -> "`" + n + "`").collect(Collectors.joining(",")); @@ -311,7 +310,7 @@ public void testUnPartitionedTable() throws IOException { List columns = filesTableSchema.columns().stream() .map(Types.NestedField::name) - .filter(c -> !c.equals(MetricsUtil.READABLE_METRICS)) + .filter(c -> !MetadataTableUtils.DERIVED_FIELDS.contains(c)) .collect(Collectors.toList()); String names = columns.stream().map(n -> "`" + n + "`").collect(Collectors.joining(",")); @@ -384,7 +383,7 @@ public void testPartitionedTable() throws Exception { List columns = filesTableSchema.columns().stream() .map(Types.NestedField::name) - .filter(c -> !c.equals(MetricsUtil.READABLE_METRICS)) + .filter(c -> !MetadataTableUtils.DERIVED_FIELDS.contains(c)) .collect(Collectors.toList()); String names = columns.stream().map(n -> "`" + n + "`").collect(Collectors.joining(",")); @@ -470,7 +469,7 @@ public void testAllFilesUnpartitioned() throws Exception { List columns = filesTableSchema.columns().stream() .map(Types.NestedField::name) - .filter(c -> !c.equals(MetricsUtil.READABLE_METRICS)) + .filter(c -> !MetadataTableUtils.DERIVED_FIELDS.contains(c)) .collect(Collectors.toList()); String names = columns.stream().map(n -> "`" + n + "`").collect(Collectors.joining(",")); @@ -553,7 +552,7 @@ public void testAllFilesPartitioned() throws Exception { List columns = filesTableSchema.columns().stream() .map(Types.NestedField::name) - .filter(c -> !c.equals(MetricsUtil.READABLE_METRICS)) + .filter(c -> !MetadataTableUtils.DERIVED_FIELDS.contains(c)) .collect(Collectors.toList()); String names = columns.stream().map(n -> "`" + n + "`").collect(Collectors.joining(",")); diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java index f2633fc67640..46d6decf37c1 100644 --- 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java @@ -47,6 +47,7 @@ import org.apache.iceberg.FileContent; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.MetadataTableUtils; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.TableScan; @@ -828,7 +829,7 @@ public static Dataset selectNonDerived(Dataset metadataTable) { StructField[] fields = metadataTable.schema().fields(); return metadataTable.select( Stream.of(fields) - .filter(f -> !f.name().equals("readable_metrics")) // derived field + .filter(f -> !MetadataTableUtils.DERIVED_FIELDS.contains(f.name())) // derived field .map(f -> new Column(f.name())) .toArray(Column[]::new)); } diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java index 8e6b576ddffb..b0f98693b6b3 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java @@ -48,6 +48,7 @@ import org.apache.iceberg.FileContent; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.MetadataTableUtils; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.TableScan; @@ -844,7 +845,7 @@ public static Dataset selectNonDerived(Dataset metadataTable) { StructField[] fields = metadataTable.schema().fields(); return metadataTable.select( Stream.of(fields) - .filter(f -> !f.name().equals("readable_metrics")) // derived field + .filter(f -> !MetadataTableUtils.DERIVED_FIELDS.contains(f.name())) // derived field .map(f -> new Column(f.name())) .toArray(Column[]::new)); } diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java index 50376589b671..8febffe556da 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java @@ -276,8 +276,6 @@ public void testAllFilesUnpartitioned() throws Exception { Assert.assertEquals("Table should be cleared", 0, results.size()); Schema entriesTableSchema = Spark3Util.loadIcebergTable(spark, tableName + ".entries").schema(); - Schema filesTableSchema = - Spark3Util.loadIcebergTable(spark, tableName + ".all_data_files").schema(); // Check all data files table Dataset actualDataFilesDs = spark.sql("SELECT * FROM " + tableName + ".all_data_files"); diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java index c73ef630ac48..3c49830cc8d8 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java @@ -49,6 +49,7 @@ import org.apache.iceberg.FileContent; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.MetadataTableUtils; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.TableScan; @@ -817,7 +818,7 
@@ public static Dataset selectNonDerived(Dataset metadataTable) { StructField[] fields = metadataTable.schema().fields(); return metadataTable.select( Stream.of(fields) - .filter(f -> !f.name().equals("readable_metrics")) // derived field + .filter(f -> !MetadataTableUtils.DERIVED_FIELDS.contains(f.name())) // derived field .map(f -> new Column(f.name())) .toArray(Column[]::new)); } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTableReadDerivedColumns.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTableReadDerivedColumns.java new file mode 100644 index 000000000000..fd033512c0d5 --- /dev/null +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTableReadDerivedColumns.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source; + +import static org.apache.iceberg.types.Types.NestedField.required; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.Collections; +import java.util.List; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.Files; +import org.apache.iceberg.Parameters; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.TestHelpers; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.FileHelpers; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.SparkCatalogConfig; +import org.apache.iceberg.spark.TestBaseWithCatalog; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.io.TempDir; + +public class TestMetadataTableReadDerivedColumns extends TestBaseWithCatalog { + + @TempDir private Path temp; + + private Long appendSnapshotId0; + private Long appendSnapshotId1; + private Long eqDeleteSnapshotId; + private Long posDeleteSnapshotId; + + private static final Schema SCHEMA = + new Schema( + required(1, "booleanCol", Types.BooleanType.get()), + required(2, "intCol", Types.IntegerType.get())); + + private static final PartitionSpec SPEC = + PartitionSpec.builderFor(SCHEMA).identity("intCol").build(); + + @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}") + protected static Object[][] parameters() { + 
return new Object[][] { + { + // only SparkCatalog supports metadata table sql queries + SparkCatalogConfig.HIVE.catalogName(), + SparkCatalogConfig.HIVE.implementation(), + SparkCatalogConfig.HIVE.properties() + }, + }; + } + + protected String tableName() { + return tableName.split("\\.")[2]; + } + + protected String database() { + return tableName.split("\\.")[1]; + } + + private Table createPrimitiveTable() throws IOException { + Table table = + catalog.createTable( + TableIdentifier.of(Namespace.of(database()), tableName()), + SCHEMA, + SPEC, + Collections.singletonMap(TableProperties.FORMAT_VERSION, "2")); + + // write 2 data files + List records0 = Lists.newArrayList(createRecord(true, 0), createRecord(false, 0)); + List records1 = Lists.newArrayList(createRecord(true, 1), createRecord(false, 1)); + DataFile dataFile0 = + FileHelpers.writeDataFile( + table, Files.localOutput(temp.toFile()), TestHelpers.Row.of(0), records0); + DataFile dataFile1 = + FileHelpers.writeDataFile( + table, Files.localOutput(temp.toFile()), TestHelpers.Row.of(1), records1); + table.newAppend().appendFile(dataFile0).commit(); + table.refresh(); + appendSnapshotId0 = table.currentSnapshot().snapshotId(); + table.newAppend().appendFile(dataFile1).commit(); + table.refresh(); + appendSnapshotId1 = table.currentSnapshot().snapshotId(); + + // write equality deletes + List eqDeletes = Lists.newArrayList(); + Schema deleteRowSchema = SCHEMA.select("intCol"); + Record delete = GenericRecord.create(deleteRowSchema); + eqDeletes.add(delete.copy("intCol", 1)); + DeleteFile equalityDeletes = + FileHelpers.writeDeleteFile( + table, + Files.localOutput(temp.toFile()), + TestHelpers.Row.of(1), + eqDeletes, + deleteRowSchema); + table.newRowDelta().addDeletes(equalityDeletes).commit(); + table.refresh(); + eqDeleteSnapshotId = table.currentSnapshot().snapshotId(); + + // write position deletes + List> posDeletes = + Lists.newArrayList(positionDelete(table.schema(), dataFile0.path(), 0L, true, 0)); + + DeleteFile posDeleteFiles = + FileHelpers.writePosDeleteFile( + table, Files.localOutput(temp.toFile()), TestHelpers.Row.of(0), posDeletes); + table.newRowDelta().addDeletes(posDeleteFiles).commit(); + table.refresh(); + posDeleteSnapshotId = table.currentSnapshot().snapshotId(); + + return table; + } + + @AfterEach + public void dropTable() { + sql("DROP TABLE %s", tableName); + } + + protected GenericRecord createRecord(boolean booleanCol, int intCol) { + GenericRecord record = GenericRecord.create(SCHEMA); + record.set(0, booleanCol); + record.set(1, intCol); + return record; + } + + private PositionDelete positionDelete( + Schema tableSchema, CharSequence path, Long position, boolean boolValue, int intValue) { + PositionDelete posDelete = PositionDelete.create(); + GenericRecord nested = GenericRecord.create(tableSchema); + nested.set(0, boolValue); + nested.set(1, intValue); + posDelete.set(path, position, nested); + return posDelete; + } + + @TestTemplate + public void testDataSequenceNumberOnDeleteFiles() throws Exception { + Table table = createPrimitiveTable(); + DeleteFile equalityDelete = + table.snapshot(eqDeleteSnapshotId).addedDeleteFiles(table.io()).iterator().next(); + DeleteFile positionDelete = + table.snapshot(posDeleteSnapshotId).addedDeleteFiles(table.io()).iterator().next(); + + List expected = + ImmutableList.of( + row(equalityDelete.dataSequenceNumber()), row(positionDelete.dataSequenceNumber())); + String deleteSql = "SELECT data_sequence_number FROM %s.%s order by data_sequence_number"; + List 
actual = sql(String.format(deleteSql, tableName, "delete_files")); + assertEquals( + "Select of data sequence number only should match records for the delete_files table", + expected, + actual); + } + + @TestTemplate + public void testMixedColumnsOnFiles() throws Exception { + Table table = createPrimitiveTable(); + DataFile dataFileFirst = + table.snapshot(appendSnapshotId0).addedDataFiles(table.io()).iterator().next(); + DataFile dataFileSecond = + table.snapshot(appendSnapshotId1).addedDataFiles(table.io()).iterator().next(); + DeleteFile equalityDelete = + table.snapshot(eqDeleteSnapshotId).addedDeleteFiles(table.io()).iterator().next(); + DeleteFile positionDelete = + table.snapshot(posDeleteSnapshotId).addedDeleteFiles(table.io()).iterator().next(); + + List<Object[]> expected = + ImmutableList.of( + row(dataFileFirst.path(), dataFileFirst.dataSequenceNumber()), + row(dataFileSecond.path(), dataFileSecond.dataSequenceNumber()), + row(equalityDelete.path(), equalityDelete.dataSequenceNumber()), + row(positionDelete.path(), positionDelete.dataSequenceNumber())); + String sql = "SELECT file_path, data_sequence_number FROM %s.%s order by data_sequence_number"; + List<Object[]> actual = sql(String.format(sql, tableName, "files")); + assertEquals( + "Select of derived and non-derived columns should match records for the files table", + expected, + actual); + } + + @TestTemplate + public void testVirtualColumnsOnDataFiles() throws Exception { + Table table = createPrimitiveTable(); + DataFile dataFileFirst = + table.snapshot(appendSnapshotId0).addedDataFiles(table.io()).iterator().next(); + DataFile dataFileSecond = + table.snapshot(appendSnapshotId1).addedDataFiles(table.io()).iterator().next(); + + List<Object[]> expected = + ImmutableList.of( + row(dataFileFirst.dataSequenceNumber(), 0, true, dataFileFirst.recordCount()), + row(dataFileSecond.dataSequenceNumber(), 1, true, dataFileSecond.recordCount())); + + String dataFileSql = + "SELECT data_sequence_number," + + "readable_metrics.intCol.lower_bound, readable_metrics.booleanCol.upper_bound," + + "record_count FROM %s.%s order by data_sequence_number"; + List<Object[]> result = sql(String.format(dataFileSql, tableName, "data_files")); + assertEquals( + "Select of data sequence number, readable metrics, and record count should match records for the data_files table", + expected, + result); + } +}
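For reference, a minimal sketch of how the new derived column is consumed from Spark SQL, mirroring the queries exercised in TestMetadataTableReadDerivedColumns. The catalog/table name db.tbl and the standalone SparkSession setup are illustrative assumptions, not part of this change.

// Illustrative sketch only (not part of this diff): selecting the derived
// data_sequence_number column from the files metadata table via Spark SQL.
// The table name "db.tbl" and the session setup are assumptions.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class FilesMetadataExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("files-metadata-example").getOrCreate();

    // data_sequence_number is computed at read time from the manifest entry, alongside
    // readable_metrics, so it can be selected like any stored column of the files table.
    Dataset<Row> files =
        spark.sql(
            "SELECT file_path, data_sequence_number "
                + "FROM db.tbl.files "
                + "ORDER BY data_sequence_number");
    files.show(false);
  }
}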