From 480c1be57ec8bcc175ca3671a701a5651bf3d973 Mon Sep 17 00:00:00 2001 From: Anton Okolnychyi Date: Fri, 13 Aug 2021 19:51:56 -0700 Subject: [PATCH 1/2] Spark: Support spec ID and partition metadata columns --- .../apache/iceberg/util/StructProjection.java | 34 +++- .../org/apache/iceberg/MetadataColumns.java | 35 +++- .../apache/iceberg/util/PartitionUtil.java | 28 ++- .../iceberg/spark/source/BaseDataReader.java | 47 ++++- .../iceberg/spark/source/BatchDataReader.java | 5 +- .../spark/source/EqualityDeleteRowReader.java | 3 +- .../iceberg/spark/source/RowDataReader.java | 5 +- .../spark/source/TestSparkBaseDataReader.java | 23 +-- .../spark/source/SparkScanBuilder.java | 2 +- .../iceberg/spark/source/SparkTestTable.java | 59 ++++++ .../spark/source/TestSparkCatalog.java | 12 +- .../source/TestSparkMetadataColumns.java | 192 ++++++++++++++++++ 12 files changed, 403 insertions(+), 42 deletions(-) create mode 100644 spark3/src/test/java/org/apache/iceberg/spark/source/SparkTestTable.java create mode 100644 spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java diff --git a/api/src/main/java/org/apache/iceberg/util/StructProjection.java b/api/src/main/java/org/apache/iceberg/util/StructProjection.java index be05b0fe2db5..e8236d138ceb 100644 --- a/api/src/main/java/org/apache/iceberg/util/StructProjection.java +++ b/api/src/main/java/org/apache/iceberg/util/StructProjection.java @@ -58,12 +58,32 @@ public static StructProjection create(Schema dataSchema, Schema projectedSchema) return new StructProjection(dataSchema.asStruct(), projectedSchema.asStruct()); } + /** + * Creates a projecting wrapper for {@link StructLike} rows. + *
<p>
+ * This projection does not work with repeated types like lists and maps. + * + * @param structType type of rows wrapped by this projection + * @param projectedStructType result type of the projected rows + * @param projectMissingFieldsAsNulls a flag whether to project missing fields as nulls + * @return a wrapper to project rows + */ + public static StructProjection create(StructType structType, StructType projectedStructType, + boolean projectMissingFieldsAsNulls) { + return new StructProjection(structType, projectedStructType, projectMissingFieldsAsNulls); + } + private final StructType type; private final int[] positionMap; private final StructProjection[] nestedProjections; private StructLike struct; private StructProjection(StructType structType, StructType projection) { + this(structType, projection, false); + } + + @SuppressWarnings("checkstyle:CyclomaticComplexity") + private StructProjection(StructType structType, StructType projection, boolean projectMissingFieldsAsNulls) { this.type = projection; this.positionMap = new int[projection.fields().size()]; this.nestedProjections = new StructProjection[projection.fields().size()]; @@ -116,7 +136,9 @@ private StructProjection(StructType structType, StructType projection) { } } - if (!found) { + if (!found && projectedField.isOptional() && projectMissingFieldsAsNulls) { + positionMap[pos] = -1; + } else if (!found) { throw new IllegalArgumentException(String.format("Cannot find field %s in %s", projectedField, structType)); } } @@ -134,11 +156,17 @@ public int size() { @Override public T get(int pos, Class javaClass) { + int structPos = positionMap[pos]; + if (nestedProjections[pos] != null) { - return javaClass.cast(nestedProjections[pos].wrap(struct.get(positionMap[pos], StructLike.class))); + return javaClass.cast(nestedProjections[pos].wrap(struct.get(structPos, StructLike.class))); } - return struct.get(positionMap[pos], javaClass); + if (structPos != -1) { + return struct.get(structPos, javaClass); + } else { + return null; + } } @Override diff --git a/core/src/main/java/org/apache/iceberg/MetadataColumns.java b/core/src/main/java/org/apache/iceberg/MetadataColumns.java index e1cf096cd003..395bf2d0eb75 100644 --- a/core/src/main/java/org/apache/iceberg/MetadataColumns.java +++ b/core/src/main/java/org/apache/iceberg/MetadataColumns.java @@ -38,6 +38,12 @@ private MetadataColumns() { Integer.MAX_VALUE - 2, "_pos", Types.LongType.get(), "Ordinal position of a row in the source data file"); public static final NestedField IS_DELETED = NestedField.required( Integer.MAX_VALUE - 3, "_deleted", Types.BooleanType.get(), "Whether the row has been deleted"); + public static final NestedField SPEC_ID = NestedField.required( + Integer.MAX_VALUE - 4, "_spec_id", Types.IntegerType.get(), "Spec ID to which a row belongs to"); + // the partition column type is not static and depends on all specs in the table + public static final int PARTITION_COLUMN_ID = Integer.MAX_VALUE - 5; + public static final String PARTITION_COLUMN_NAME = "_partition"; + public static final String PARTITION_COLUMN_DOC = "Partition to which a row belongs to"; // IDs Integer.MAX_VALUE - (101-200) are used for reserved columns public static final NestedField DELETE_FILE_PATH = NestedField.required( @@ -51,24 +57,39 @@ private MetadataColumns() { private static final Map META_COLUMNS = ImmutableMap.of( FILE_PATH.name(), FILE_PATH, ROW_POSITION.name(), ROW_POSITION, - IS_DELETED.name(), IS_DELETED); + IS_DELETED.name(), IS_DELETED, + SPEC_ID.name(), SPEC_ID + ); - private 
static final Set META_IDS = META_COLUMNS.values().stream().map(NestedField::fieldId) - .collect(ImmutableSet.toImmutableSet()); + private static final Set META_IDS = ImmutableSet.of( + FILE_PATH.fieldId(), + ROW_POSITION.fieldId(), + IS_DELETED.fieldId(), + SPEC_ID.fieldId(), + PARTITION_COLUMN_ID + ); public static Set metadataFieldIds() { return META_IDS; } - public static NestedField get(String name) { - return META_COLUMNS.get(name); + public static NestedField metadataColumn(Table table, String name) { + if (name.equals(PARTITION_COLUMN_NAME)) { + return Types.NestedField.optional( + PARTITION_COLUMN_ID, + PARTITION_COLUMN_NAME, + Partitioning.partitionType(table), + PARTITION_COLUMN_DOC); + } else { + return META_COLUMNS.get(name); + } } public static boolean isMetadataColumn(String name) { - return META_COLUMNS.containsKey(name); + return name.equals(PARTITION_COLUMN_NAME) || META_COLUMNS.containsKey(name); } public static boolean nonMetadataColumn(String name) { - return !META_COLUMNS.containsKey(name); + return !isMetadataColumn(name); } } diff --git a/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java b/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java index 929f77af4e78..b2c3a733f32b 100644 --- a/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java +++ b/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java @@ -36,10 +36,15 @@ private PartitionUtil() { } public static Map constantsMap(FileScanTask task) { - return constantsMap(task, (type, constant) -> constant); + return constantsMap(task, null, (type, constant) -> constant); } public static Map constantsMap(FileScanTask task, BiFunction convertConstant) { + return constantsMap(task, null, convertConstant); + } + + public static Map constantsMap(FileScanTask task, Types.StructType partitionType, + BiFunction convertConstant) { PartitionSpec spec = task.spec(); StructLike partitionData = task.file().partition(); @@ -51,6 +56,20 @@ private PartitionUtil() { MetadataColumns.FILE_PATH.fieldId(), convertConstant.apply(Types.StringType.get(), task.file().path())); + // add _spec_id + idToConstant.put( + MetadataColumns.SPEC_ID.fieldId(), + convertConstant.apply(Types.IntegerType.get(), task.file().specId())); + + // add _partition + if (partitionType != null && partitionType.fields().size() > 0) { + StructLike coercedPartition = coercePartition(partitionType, spec, partitionData); + idToConstant.put(MetadataColumns.PARTITION_COLUMN_ID, convertConstant.apply(partitionType, coercedPartition)); + } else if (partitionType != null) { + // use null as some query engines may not be able to handle empty structs + idToConstant.put(MetadataColumns.PARTITION_COLUMN_ID, null); + } + List partitionFields = spec.partitionType().fields(); List fields = spec.fields(); for (int pos = 0; pos < fields.size(); pos += 1) { @@ -63,4 +82,11 @@ private PartitionUtil() { return idToConstant; } + + // adapts the provided partition data to match the table partition type + private static StructLike coercePartition(Types.StructType partitionType, PartitionSpec spec, StructLike partition) { + StructProjection projection = StructProjection.create(spec.partitionType(), partitionType, true); + projection.wrap(partition); + return projection; + } } diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java index c8b33dd2f706..b58745c7a00d 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java +++ 
b/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java @@ -24,24 +24,32 @@ import java.math.BigDecimal; import java.nio.ByteBuffer; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.stream.Stream; import org.apache.avro.generic.GenericData; import org.apache.avro.util.Utf8; import org.apache.iceberg.CombinedScanTask; import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.Partitioning; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; import org.apache.iceberg.encryption.EncryptedFiles; import org.apache.iceberg.encryption.EncryptedInputFile; -import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.io.CloseableIterator; -import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types.NestedField; +import org.apache.iceberg.types.Types.StructType; import org.apache.iceberg.util.ByteBuffers; +import org.apache.iceberg.util.PartitionUtil; import org.apache.spark.rdd.InputFileBlockHolder; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; import org.apache.spark.sql.types.Decimal; import org.apache.spark.unsafe.types.UTF8String; import org.slf4j.Logger; @@ -55,6 +63,7 @@ abstract class BaseDataReader implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(BaseDataReader.class); + private final Table table; private final Iterator tasks; private final Map inputFiles; @@ -62,17 +71,18 @@ abstract class BaseDataReader implements Closeable { private T current = null; private FileScanTask currentTask = null; - BaseDataReader(CombinedScanTask task, FileIO io, EncryptionManager encryptionManager) { + BaseDataReader(Table table, CombinedScanTask task) { + this.table = table; this.tasks = task.files().iterator(); Map keyMetadata = Maps.newHashMap(); task.files().stream() .flatMap(fileScanTask -> Stream.concat(Stream.of(fileScanTask.file()), fileScanTask.deletes().stream())) .forEach(file -> keyMetadata.put(file.path().toString(), file.keyMetadata())); Stream encrypted = keyMetadata.entrySet().stream() - .map(entry -> EncryptedFiles.encryptedInput(io.newInputFile(entry.getKey()), entry.getValue())); + .map(entry -> EncryptedFiles.encryptedInput(table.io().newInputFile(entry.getKey()), entry.getValue())); // decrypt with the batch call to avoid multiple RPCs to a key server, if possible - Iterable decryptedFiles = encryptionManager.decrypt(encrypted::iterator); + Iterable decryptedFiles = table.encryption().decrypt(encrypted::iterator); Map files = Maps.newHashMapWithExpectedSize(task.files().size()); decryptedFiles.forEach(decrypted -> files.putIfAbsent(decrypted.location(), decrypted)); @@ -132,6 +142,15 @@ protected InputFile getInputFile(String location) { return inputFiles.get(location); } + protected Map constantsMap(FileScanTask task, Schema readSchema) { + if (readSchema.findField(MetadataColumns.PARTITION_COLUMN_ID) != null) { + StructType partitionType = Partitioning.partitionType(table); + return PartitionUtil.constantsMap(task, partitionType, BaseDataReader::convertConstant); + } else { + return PartitionUtil.constantsMap(task, 
BaseDataReader::convertConstant); + } + } + protected static Object convertConstant(Type type, Object value) { if (value == null) { return null; @@ -155,6 +174,24 @@ protected static Object convertConstant(Type type, Object value) { return ByteBuffers.toByteArray((ByteBuffer) value); case BINARY: return ByteBuffers.toByteArray((ByteBuffer) value); + case STRUCT: + StructType structType = (StructType) type; + + if (structType.fields().isEmpty()) { + return new GenericInternalRow(); + } + + List fields = structType.fields(); + Object[] values = new Object[fields.size()]; + StructLike struct = (StructLike) value; + + for (int index = 0; index < fields.size(); index++) { + NestedField field = fields.get(index); + Type fieldType = field.type(); + values[index] = convertConstant(fieldType, struct.get(index, fieldType.typeId().javaClass())); + } + + return new GenericInternalRow(values); default: } return value; diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java index 8cfe46b598fc..e4bd3ceba6ce 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java +++ b/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java @@ -41,7 +41,6 @@ import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders; import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders; import org.apache.iceberg.types.TypeUtil; -import org.apache.iceberg.util.PartitionUtil; import org.apache.spark.rdd.InputFileBlockHolder; import org.apache.spark.sql.vectorized.ColumnarBatch; @@ -52,7 +51,7 @@ class BatchDataReader extends BaseDataReader { private final int batchSize; BatchDataReader(CombinedScanTask task, Table table, Schema expectedSchema, boolean caseSensitive, int size) { - super(task, table.io(), table.encryption()); + super(table, task); this.expectedSchema = expectedSchema; this.nameMapping = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING); this.caseSensitive = caseSensitive; @@ -66,7 +65,7 @@ CloseableIterator open(FileScanTask task) { // update the current file for Spark's filename() function InputFileBlockHolder.set(file.path().toString(), task.start(), task.length()); - Map idToConstant = PartitionUtil.constantsMap(task, BatchDataReader::convertConstant); + Map idToConstant = constantsMap(task, expectedSchema); CloseableIterable iter; InputFile location = getInputFile(task); diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java index d4328addc759..ce2226f4f75e 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java +++ b/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java @@ -26,7 +26,6 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.io.CloseableIterator; -import org.apache.iceberg.util.PartitionUtil; import org.apache.spark.rdd.InputFileBlockHolder; import org.apache.spark.sql.catalyst.InternalRow; @@ -44,7 +43,7 @@ CloseableIterator open(FileScanTask task) { // schema or rows returned by readers Schema requiredSchema = matches.requiredSchema(); - Map idToConstant = PartitionUtil.constantsMap(task, RowDataReader::convertConstant); + Map idToConstant = constantsMap(task, expectedSchema); DataFile file = task.file(); // update the current file for Spark's filename() function diff --git 
a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java index 391d4a053490..8770e17aa015 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java +++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java @@ -44,7 +44,6 @@ import org.apache.iceberg.spark.data.SparkOrcReader; import org.apache.iceberg.spark.data.SparkParquetReaders; import org.apache.iceberg.types.TypeUtil; -import org.apache.iceberg.util.PartitionUtil; import org.apache.spark.rdd.InputFileBlockHolder; import org.apache.spark.sql.catalyst.InternalRow; @@ -56,7 +55,7 @@ class RowDataReader extends BaseDataReader { private final boolean caseSensitive; RowDataReader(CombinedScanTask task, Table table, Schema expectedSchema, boolean caseSensitive) { - super(task, table.io(), table.encryption()); + super(table, task); this.tableSchema = table.schema(); this.expectedSchema = expectedSchema; this.nameMapping = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING); @@ -69,7 +68,7 @@ CloseableIterator open(FileScanTask task) { // schema or rows returned by readers Schema requiredSchema = deletes.requiredSchema(); - Map idToConstant = PartitionUtil.constantsMap(task, RowDataReader::convertConstant); + Map idToConstant = constantsMap(task, expectedSchema); DataFile file = task.file(); // update the current file for Spark's filename() function diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkBaseDataReader.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkBaseDataReader.java index 51b47cbd972d..8bae666c0475 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkBaseDataReader.java +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkBaseDataReader.java @@ -30,7 +30,6 @@ import java.util.stream.IntStream; import java.util.stream.StreamSupport; import org.apache.avro.generic.GenericData; -import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.AppendFiles; import org.apache.iceberg.BaseCombinedScanTask; import org.apache.iceberg.DataFile; @@ -39,8 +38,6 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; -import org.apache.iceberg.encryption.PlaintextEncryptionManager; -import org.apache.iceberg.hadoop.HadoopFileIO; import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.parquet.Parquet; @@ -59,7 +56,7 @@ public abstract class TestSparkBaseDataReader { @Rule public TemporaryFolder temp = new TemporaryFolder(); - private static final Configuration CONFD = new Configuration(); + private Table table; // Simulates the closeable iterator of data to be read private static class CloseableIntegerRange implements CloseableIterator { @@ -92,10 +89,8 @@ public Integer next() { private static class ClosureTrackingReader extends BaseDataReader { private Map tracker = new HashMap<>(); - ClosureTrackingReader(List tasks) { - super(new BaseCombinedScanTask(tasks), - new HadoopFileIO(CONFD), - new PlaintextEncryptionManager()); + ClosureTrackingReader(Table table, List tasks) { + super(table, new BaseCombinedScanTask(tasks)); } @Override @@ -124,7 +119,7 @@ public void testClosureOnDataExhaustion() throws IOException { Integer recordPerTask = 10; List tasks = createFileScanTasks(totalTasks, recordPerTask); - ClosureTrackingReader reader = new ClosureTrackingReader(tasks); + ClosureTrackingReader reader = new 
ClosureTrackingReader(table, tasks); int countRecords = 0; while (reader.next()) { @@ -151,7 +146,7 @@ public void testClosureDuringIteration() throws IOException { FileScanTask firstTask = tasks.get(0); FileScanTask secondTask = tasks.get(1); - ClosureTrackingReader reader = new ClosureTrackingReader(tasks); + ClosureTrackingReader reader = new ClosureTrackingReader(table, tasks); // Total of 2 elements Assert.assertTrue(reader.next()); @@ -175,7 +170,7 @@ public void testClosureWithoutAnyRead() throws IOException { Integer recordPerTask = 10; List tasks = createFileScanTasks(totalTasks, recordPerTask); - ClosureTrackingReader reader = new ClosureTrackingReader(tasks); + ClosureTrackingReader reader = new ClosureTrackingReader(table, tasks); reader.close(); @@ -191,7 +186,7 @@ public void testExplicitClosure() throws IOException { Integer recordPerTask = 10; List tasks = createFileScanTasks(totalTasks, recordPerTask); - ClosureTrackingReader reader = new ClosureTrackingReader(tasks); + ClosureTrackingReader reader = new ClosureTrackingReader(table, tasks); Integer halfDataSize = (totalTasks * recordPerTask) / 2; for (int i = 0; i < halfDataSize; i++) { @@ -217,7 +212,7 @@ public void testIdempotentExplicitClosure() throws IOException { Integer recordPerTask = 10; List tasks = createFileScanTasks(totalTasks, recordPerTask); - ClosureTrackingReader reader = new ClosureTrackingReader(tasks); + ClosureTrackingReader reader = new ClosureTrackingReader(table, tasks); // Total 100 elements, only 5 iterators have been created for (int i = 0; i < 45; i++) { @@ -250,7 +245,7 @@ private List createFileScanTasks(Integer totalTasks, Integer recor ); try { - Table table = TestTables.create(location, desc, schema, PartitionSpec.unpartitioned()); + this.table = TestTables.create(location, desc, schema, PartitionSpec.unpartitioned()); // Important: use the table's schema for the rest of the test // When tables are created, the column ids are reassigned. Schema tableSchema = table.schema(); diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java index 633a17143f52..483dbcfc5b2f 100644 --- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java +++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java @@ -149,7 +149,7 @@ private Schema schemaWithMetadataColumns() { // metadata columns List fields = metaColumns.stream() .distinct() - .map(MetadataColumns::get) + .map(name -> MetadataColumns.metadataColumn(table, name)) .collect(Collectors.toList()); Schema meta = new Schema(fields); diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/SparkTestTable.java b/spark3/src/test/java/org/apache/iceberg/spark/source/SparkTestTable.java new file mode 100644 index 000000000000..078246222411 --- /dev/null +++ b/spark3/src/test/java/org/apache/iceberg/spark/source/SparkTestTable.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.source; + +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.Table; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.types.Types; +import org.apache.spark.sql.connector.read.ScanBuilder; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; + +public class SparkTestTable extends SparkTable { + + private final String[] metadataColumnNames; + + public SparkTestTable(Table icebergTable, String[] metadataColumnNames, boolean refreshEagerly) { + super(icebergTable, refreshEagerly); + this.metadataColumnNames = metadataColumnNames; + } + + @Override + public StructType schema() { + StructType schema = super.schema(); + if (metadataColumnNames != null) { + for (String columnName : metadataColumnNames) { + Types.NestedField metadataColumn = MetadataColumns.metadataColumn(table(), columnName); + schema = schema.add(columnName, SparkSchemaUtil.convert(metadataColumn.type())); + } + } + return schema; + } + + @Override + public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) { + SparkScanBuilder scanBuilder = (SparkScanBuilder) super.newScanBuilder(options); + if (metadataColumnNames != null) { + scanBuilder.withMetadataColumns(metadataColumnNames); + } + return scanBuilder; + } +} diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalog.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalog.java index 92013d396c1a..027c88cd4df6 100644 --- a/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalog.java +++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalog.java @@ -19,7 +19,6 @@ package org.apache.iceberg.spark.source; -import org.apache.iceberg.spark.Spark3Util; import org.apache.iceberg.spark.SparkSessionCatalog; import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; import org.apache.spark.sql.connector.catalog.Identifier; @@ -31,7 +30,14 @@ public class TestSparkCatalog exten @Override public Table loadTable(Identifier ident) throws NoSuchTableException { - TestTables.TestTable table = TestTables.load(Spark3Util.identifierToTableIdentifier(ident).toString()); - return new SparkTable(table, false); + String[] parts = ident.name().split("\\$", 2); + if (parts.length == 2) { + TestTables.TestTable table = TestTables.load(parts[0]); + String[] metadataColumns = parts[1].split(","); + return new SparkTestTable(table, metadataColumns, false); + } else { + TestTables.TestTable table = TestTables.load(ident.name()); + return new SparkTestTable(table, null, false); + } } } diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java new file mode 100644 index 000000000000..8047e56ab343 --- /dev/null +++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.source; + +import java.io.IOException; +import java.util.List; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.HasTableOperations; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.PartitionSpecParser; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.UpdateProperties; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.spark.SparkTestBase; +import org.apache.iceberg.types.Types; +import org.junit.After; +import org.junit.Assume; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; +import static org.apache.iceberg.TableProperties.FORMAT_VERSION; +import static org.apache.iceberg.TableProperties.ORC_VECTORIZATION_ENABLED; +import static org.apache.iceberg.TableProperties.PARQUET_VECTORIZATION_ENABLED; + +@RunWith(Parameterized.class) +public class TestSparkMetadataColumns extends SparkTestBase { + + private static final String TABLE_NAME = "test_table"; + private static final Schema SCHEMA = new Schema( + Types.NestedField.required(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "category", Types.StringType.get()), + Types.NestedField.optional(3, "data", Types.StringType.get()) + ); + private static final PartitionSpec UNKNOWN_SPEC = PartitionSpecParser.fromJson(SCHEMA, + "{ \"spec-id\": 1, \"fields\": [ { \"name\": \"id_zero\", \"transform\": \"zero\", \"source-id\": 1 } ] }"); + + @Parameterized.Parameters(name = "fileFormat = {0}, vectorized = {1}, formatVersion = {2}") + public static Object[][] parameters() { + return new Object[][] { + { FileFormat.PARQUET, false, 1}, + { FileFormat.PARQUET, true, 1}, + { FileFormat.PARQUET, false, 2}, + { FileFormat.PARQUET, true, 2}, + { FileFormat.AVRO, false, 1}, + { FileFormat.AVRO, false, 2}, + { FileFormat.ORC, false, 1}, + { FileFormat.ORC, true, 1}, + { FileFormat.ORC, false, 2}, + { FileFormat.ORC, true, 2}, + }; + } + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + private final FileFormat fileFormat; + private final boolean vectorized; + private final int formatVersion; + + private Table table = null; + + public 
TestSparkMetadataColumns(FileFormat fileFormat, boolean vectorized, int formatVersion) { + this.fileFormat = fileFormat; + this.vectorized = vectorized; + this.formatVersion = formatVersion; + } + + @BeforeClass + public static void setupSpark() { + ImmutableMap config = ImmutableMap.of( + "type", "hive", + "default-namespace", "default", + "cache-enabled", "true" + ); + spark.conf().set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.source.TestSparkCatalog"); + config.forEach((key, value) -> spark.conf().set("spark.sql.catalog.spark_catalog." + key, value)); + } + + @Before + public void setupTable() throws IOException { + createAndInitTable(); + } + + @After + public void dropTable() { + TestTables.clearTables(); + } + + @Test + public void testSpecAndPartitionMetadataColumns() { + // TODO: support metadata structs in vectorized ORC reads + Assume.assumeFalse(fileFormat == FileFormat.ORC && vectorized); + + sql("INSERT INTO TABLE %s VALUES (1, 'a1', 'b1')", TABLE_NAME); + + table.refresh(); + table.updateSpec() + .addField("data") + .commit(); + sql("INSERT INTO TABLE %s VALUES (1, 'a1', 'b1')", TABLE_NAME); + + table.refresh(); + table.updateSpec() + .addField(Expressions.bucket("category", 8)) + .commit(); + sql("INSERT INTO TABLE %s VALUES (1, 'a1', 'b1')", TABLE_NAME); + + table.refresh(); + table.updateSpec() + .removeField("data") + .commit(); + sql("INSERT INTO TABLE %s VALUES (1, 'a1', 'b1')", TABLE_NAME); + + table.refresh(); + table.updateSpec() + .renameField("category_bucket_8", "category_bucket_8_another_name") + .commit(); + + List expected = ImmutableList.of( + row(0, row(null, null)), + row(1, row("b1", null)), + row(2, row("b1", 2)), + row(3, row(null, 2)) + ); + assertEquals("Rows must match", expected, + sql("SELECT _spec_id, _partition FROM `%s$_spec_id,_partition` ORDER BY _spec_id", TABLE_NAME)); + } + + @Test + public void testPartitionMetadataColumnWithUnknownTransforms() { + // replace the table spec to include an unknown transform + TableOperations ops = ((HasTableOperations) table).operations(); + TableMetadata base = ops.current(); + ops.commit(base, base.updatePartitionSpec(UNKNOWN_SPEC)); + + AssertHelpers.assertThrows("Should fail to query the partition metadata column", + ValidationException.class, "Cannot build table partition type, unknown transforms", + () -> sql("SELECT _partition FROM `%s$_partition`", TABLE_NAME)); + } + + private void createAndInitTable() throws IOException { + this.table = TestTables.create(temp.newFolder(), TABLE_NAME, SCHEMA, PartitionSpec.unpartitioned()); + + UpdateProperties updateProperties = table.updateProperties(); + updateProperties.set(FORMAT_VERSION, String.valueOf(formatVersion)); + updateProperties.set(DEFAULT_FILE_FORMAT, fileFormat.name()); + + switch (fileFormat) { + case PARQUET: + updateProperties.set(PARQUET_VECTORIZATION_ENABLED, String.valueOf(vectorized)); + break; + case ORC: + updateProperties.set(ORC_VECTORIZATION_ENABLED, String.valueOf(vectorized)); + break; + default: + Preconditions.checkState(!vectorized, "File format %s does not support vectorized reads", fileFormat); + } + + updateProperties.commit(); + } +} From ac34d21ad89682390f866f45d430a0ea68f4653c Mon Sep 17 00:00:00 2001 From: Anton Okolnychyi Date: Mon, 27 Sep 2021 09:36:13 -0700 Subject: [PATCH 2/2] Review comments --- .../apache/iceberg/util/StructProjection.java | 13 ++++++------- .../java/org/apache/iceberg/MetadataColumns.java | 2 +- .../org/apache/iceberg/util/PartitionUtil.java | 16 +++++++++------- 
.../iceberg/spark/source/SparkTestTable.java | 1 + .../spark/source/TestSparkMetadataColumns.java | 2 ++ 5 files changed, 19 insertions(+), 15 deletions(-) diff --git a/api/src/main/java/org/apache/iceberg/util/StructProjection.java b/api/src/main/java/org/apache/iceberg/util/StructProjection.java index e8236d138ceb..c18f69f729f7 100644 --- a/api/src/main/java/org/apache/iceberg/util/StructProjection.java +++ b/api/src/main/java/org/apache/iceberg/util/StructProjection.java @@ -61,16 +61,14 @@ public static StructProjection create(Schema dataSchema, Schema projectedSchema) /** * Creates a projecting wrapper for {@link StructLike} rows. *
<p>
- * This projection does not work with repeated types like lists and maps. + * This projection allows missing fields and does not work with repeated types like lists and maps. * * @param structType type of rows wrapped by this projection * @param projectedStructType result type of the projected rows - * @param projectMissingFieldsAsNulls a flag whether to project missing fields as nulls * @return a wrapper to project rows */ - public static StructProjection create(StructType structType, StructType projectedStructType, - boolean projectMissingFieldsAsNulls) { - return new StructProjection(structType, projectedStructType, projectMissingFieldsAsNulls); + public static StructProjection createAllowMissing(StructType structType, StructType projectedStructType) { + return new StructProjection(structType, projectedStructType, true); } private final StructType type; @@ -83,7 +81,7 @@ private StructProjection(StructType structType, StructType projection) { } @SuppressWarnings("checkstyle:CyclomaticComplexity") - private StructProjection(StructType structType, StructType projection, boolean projectMissingFieldsAsNulls) { + private StructProjection(StructType structType, StructType projection, boolean allowMissing) { this.type = projection; this.positionMap = new int[projection.fields().size()]; this.nestedProjections = new StructProjection[projection.fields().size()]; @@ -136,8 +134,9 @@ private StructProjection(StructType structType, StructType projection, boolean p } } - if (!found && projectedField.isOptional() && projectMissingFieldsAsNulls) { + if (!found && projectedField.isOptional() && allowMissing) { positionMap[pos] = -1; + nestedProjections[pos] = null; } else if (!found) { throw new IllegalArgumentException(String.format("Cannot find field %s in %s", projectedField, structType)); } diff --git a/core/src/main/java/org/apache/iceberg/MetadataColumns.java b/core/src/main/java/org/apache/iceberg/MetadataColumns.java index 395bf2d0eb75..af7b655b2bfe 100644 --- a/core/src/main/java/org/apache/iceberg/MetadataColumns.java +++ b/core/src/main/java/org/apache/iceberg/MetadataColumns.java @@ -39,7 +39,7 @@ private MetadataColumns() { public static final NestedField IS_DELETED = NestedField.required( Integer.MAX_VALUE - 3, "_deleted", Types.BooleanType.get(), "Whether the row has been deleted"); public static final NestedField SPEC_ID = NestedField.required( - Integer.MAX_VALUE - 4, "_spec_id", Types.IntegerType.get(), "Spec ID to which a row belongs to"); + Integer.MAX_VALUE - 4, "_spec_id", Types.IntegerType.get(), "Spec ID used to track the file containing a row"); // the partition column type is not static and depends on all specs in the table public static final int PARTITION_COLUMN_ID = Integer.MAX_VALUE - 5; public static final String PARTITION_COLUMN_NAME = "_partition"; diff --git a/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java b/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java index b2c3a733f32b..02c8b302dad5 100644 --- a/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java +++ b/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java @@ -62,12 +62,14 @@ private PartitionUtil() { convertConstant.apply(Types.IntegerType.get(), task.file().specId())); // add _partition - if (partitionType != null && partitionType.fields().size() > 0) { - StructLike coercedPartition = coercePartition(partitionType, spec, partitionData); - idToConstant.put(MetadataColumns.PARTITION_COLUMN_ID, convertConstant.apply(partitionType, coercedPartition)); - } else if 
(partitionType != null) { - // use null as some query engines may not be able to handle empty structs - idToConstant.put(MetadataColumns.PARTITION_COLUMN_ID, null); + if (partitionType != null) { + if (partitionType.fields().size() > 0) { + StructLike coercedPartition = coercePartition(partitionType, spec, partitionData); + idToConstant.put(MetadataColumns.PARTITION_COLUMN_ID, convertConstant.apply(partitionType, coercedPartition)); + } else { + // use null as some query engines may not be able to handle empty structs + idToConstant.put(MetadataColumns.PARTITION_COLUMN_ID, null); + } } List partitionFields = spec.partitionType().fields(); @@ -85,7 +87,7 @@ private PartitionUtil() { // adapts the provided partition data to match the table partition type private static StructLike coercePartition(Types.StructType partitionType, PartitionSpec spec, StructLike partition) { - StructProjection projection = StructProjection.create(spec.partitionType(), partitionType, true); + StructProjection projection = StructProjection.createAllowMissing(spec.partitionType(), partitionType); projection.wrap(partition); return projection; } diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/SparkTestTable.java b/spark3/src/test/java/org/apache/iceberg/spark/source/SparkTestTable.java index 078246222411..afb1136f4fa5 100644 --- a/spark3/src/test/java/org/apache/iceberg/spark/source/SparkTestTable.java +++ b/spark3/src/test/java/org/apache/iceberg/spark/source/SparkTestTable.java @@ -27,6 +27,7 @@ import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.util.CaseInsensitiveStringMap; +// TODO: remove this class once we compile against Spark 3.2 public class SparkTestTable extends SparkTable { private final String[] metadataColumnNames; diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java index 8047e56ab343..b29d281863cb 100644 --- a/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java +++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java @@ -117,6 +117,8 @@ public void dropTable() { TestTables.clearTables(); } + // TODO: remove testing workarounds once we compile against Spark 3.2 + @Test public void testSpecAndPartitionMetadataColumns() { // TODO: support metadata structs in vectorized ORC reads
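
A minimal, illustrative sketch (not part of the patch) of how the new metadata columns are meant to be consumed, modeled on TestSparkMetadataColumns above. It assumes a SparkSession with an Iceberg catalog already configured and an existing table named db.sample with id/data columns; resolving _spec_id and _partition directly in SQL relies either on an engine that supports DSv2 metadata columns (Spark 3.2+) or on the table$_spec_id,_partition test-only workaround that SparkTestTable and TestSparkCatalog add in this patch.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class MetadataColumnsExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("iceberg-metadata-columns-example")
        .getOrCreate();

    // _spec_id identifies the partition spec that wrote a row's data file.
    // _partition is that file's partition tuple coerced to the table-wide partition
    // type (Partitioning.partitionType), so fields absent from older specs read as null.
    Dataset<Row> rows = spark.sql(
        "SELECT _spec_id, _partition, id, data FROM db.sample ORDER BY _spec_id");

    rows.show(false);
  }
}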