diff --git a/api/src/main/java/org/apache/iceberg/util/StructProjection.java b/api/src/main/java/org/apache/iceberg/util/StructProjection.java
index be05b0fe2db5..c18f69f729f7 100644
--- a/api/src/main/java/org/apache/iceberg/util/StructProjection.java
+++ b/api/src/main/java/org/apache/iceberg/util/StructProjection.java
@@ -58,12 +58,30 @@ public static StructProjection create(Schema dataSchema, Schema projectedSchema)
return new StructProjection(dataSchema.asStruct(), projectedSchema.asStruct());
}
+ /**
+ * Creates a projecting wrapper for {@link StructLike} rows.
+ *
+ * This projection allows missing fields and does not work with repeated types like lists and maps.
+ *
+ * @param structType type of rows wrapped by this projection
+ * @param projectedStructType result type of the projected rows
+ * @return a wrapper to project rows
+ */
+ public static StructProjection createAllowMissing(StructType structType, StructType projectedStructType) {
+ return new StructProjection(structType, projectedStructType, true);
+ }
+
private final StructType type;
private final int[] positionMap;
private final StructProjection[] nestedProjections;
private StructLike struct;
private StructProjection(StructType structType, StructType projection) {
+ this(structType, projection, false);
+ }
+
+ @SuppressWarnings("checkstyle:CyclomaticComplexity")
+ private StructProjection(StructType structType, StructType projection, boolean allowMissing) {
this.type = projection;
this.positionMap = new int[projection.fields().size()];
this.nestedProjections = new StructProjection[projection.fields().size()];
@@ -116,7 +134,10 @@ private StructProjection(StructType structType, StructType projection) {
}
}
- if (!found) {
+ if (!found && projectedField.isOptional() && allowMissing) {
+ positionMap[pos] = -1;
+ nestedProjections[pos] = null;
+ } else if (!found) {
throw new IllegalArgumentException(String.format("Cannot find field %s in %s", projectedField, structType));
}
}
@@ -134,11 +155,17 @@ public int size() {
@Override
public <T> T get(int pos, Class<T> javaClass) {
+ int structPos = positionMap[pos];
+
if (nestedProjections[pos] != null) {
- return javaClass.cast(nestedProjections[pos].wrap(struct.get(positionMap[pos], StructLike.class)));
+ return javaClass.cast(nestedProjections[pos].wrap(struct.get(structPos, StructLike.class)));
}
- return struct.get(positionMap[pos], javaClass);
+ if (structPos != -1) {
+ return struct.get(structPos, javaClass);
+ } else {
+ return null;
+ }
}
@Override
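Note on the new factory: the sketch below is not part of the diff; it illustrates the intended behavior of StructProjection.createAllowMissing. It assumes GenericRecord from the iceberg-data module as a convenient StructLike implementation and uses illustrative field names; the point is that a projected optional field absent from the data type reads back as null instead of throwing.

import org.apache.iceberg.StructLike;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructProjection;

public class StructProjectionSketch {
  public static void main(String[] args) {
    // data rows only carry an "id" field
    Types.StructType dataType = Types.StructType.of(
        Types.NestedField.required(1, "id", Types.IntegerType.get()));

    // the projection also asks for an optional "category" field the data type lacks
    Types.StructType projectedType = Types.StructType.of(
        Types.NestedField.required(1, "id", Types.IntegerType.get()),
        Types.NestedField.optional(2, "category", Types.StringType.get()));

    GenericRecord row = GenericRecord.create(dataType);
    row.set(0, 42);

    StructProjection projection = StructProjection.createAllowMissing(dataType, projectedType);
    StructLike projected = projection.wrap(row);

    System.out.println(projected.get(0, Integer.class)); // 42
    System.out.println(projected.get(1, String.class));  // null for the missing optional field
  }
}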
diff --git a/core/src/main/java/org/apache/iceberg/MetadataColumns.java b/core/src/main/java/org/apache/iceberg/MetadataColumns.java
index e1cf096cd003..af7b655b2bfe 100644
--- a/core/src/main/java/org/apache/iceberg/MetadataColumns.java
+++ b/core/src/main/java/org/apache/iceberg/MetadataColumns.java
@@ -38,6 +38,12 @@ private MetadataColumns() {
Integer.MAX_VALUE - 2, "_pos", Types.LongType.get(), "Ordinal position of a row in the source data file");
public static final NestedField IS_DELETED = NestedField.required(
Integer.MAX_VALUE - 3, "_deleted", Types.BooleanType.get(), "Whether the row has been deleted");
+ public static final NestedField SPEC_ID = NestedField.required(
+ Integer.MAX_VALUE - 4, "_spec_id", Types.IntegerType.get(), "Spec ID used to track the file containing a row");
+ // the partition column type is not static and depends on all specs in the table
+ public static final int PARTITION_COLUMN_ID = Integer.MAX_VALUE - 5;
+ public static final String PARTITION_COLUMN_NAME = "_partition";
+ public static final String PARTITION_COLUMN_DOC = "Partition to which a row belongs";
// IDs Integer.MAX_VALUE - (101-200) are used for reserved columns
public static final NestedField DELETE_FILE_PATH = NestedField.required(
@@ -51,24 +57,39 @@ private MetadataColumns() {
private static final Map<String, NestedField> META_COLUMNS = ImmutableMap.of(
FILE_PATH.name(), FILE_PATH,
ROW_POSITION.name(), ROW_POSITION,
- IS_DELETED.name(), IS_DELETED);
+ IS_DELETED.name(), IS_DELETED,
+ SPEC_ID.name(), SPEC_ID
+ );
- private static final Set<Integer> META_IDS = META_COLUMNS.values().stream().map(NestedField::fieldId)
- .collect(ImmutableSet.toImmutableSet());
+ private static final Set<Integer> META_IDS = ImmutableSet.of(
+ FILE_PATH.fieldId(),
+ ROW_POSITION.fieldId(),
+ IS_DELETED.fieldId(),
+ SPEC_ID.fieldId(),
+ PARTITION_COLUMN_ID
+ );
public static Set<Integer> metadataFieldIds() {
return META_IDS;
}
- public static NestedField get(String name) {
- return META_COLUMNS.get(name);
+ public static NestedField metadataColumn(Table table, String name) {
+ if (name.equals(PARTITION_COLUMN_NAME)) {
+ return Types.NestedField.optional(
+ PARTITION_COLUMN_ID,
+ PARTITION_COLUMN_NAME,
+ Partitioning.partitionType(table),
+ PARTITION_COLUMN_DOC);
+ } else {
+ return META_COLUMNS.get(name);
+ }
}
public static boolean isMetadataColumn(String name) {
- return META_COLUMNS.containsKey(name);
+ return name.equals(PARTITION_COLUMN_NAME) || META_COLUMNS.containsKey(name);
}
public static boolean nonMetadataColumn(String name) {
- return !META_COLUMNS.containsKey(name);
+ return !isMetadataColumn(name);
}
}
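Since the _partition column's type now depends on the table (it is the combined partition type across all specs, via Partitioning.partitionType), column lookup takes a Table argument. A hedged sketch of how a caller might resolve metadata columns under the new API; the helper name and the error message are illustrative only:

import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Table;
import org.apache.iceberg.types.Types;

public class MetadataColumnLookupSketch {
  // resolves a metadata column by name; "_partition" is rebuilt per table because its
  // struct type changes whenever a new partition spec is added
  static Types.NestedField resolve(Table table, String name) {
    if (MetadataColumns.nonMetadataColumn(name)) {
      throw new IllegalArgumentException("Not a metadata column: " + name);
    }
    return MetadataColumns.metadataColumn(table, name);
  }
}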
diff --git a/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java b/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
index 929f77af4e78..02c8b302dad5 100644
--- a/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
@@ -36,10 +36,15 @@ private PartitionUtil() {
}
public static Map<Integer, ?> constantsMap(FileScanTask task) {
- return constantsMap(task, (type, constant) -> constant);
+ return constantsMap(task, null, (type, constant) -> constant);
}
public static Map<Integer, ?> constantsMap(FileScanTask task, BiFunction<Type, Object, Object> convertConstant) {
+ return constantsMap(task, null, convertConstant);
+ }
+
+ public static Map<Integer, ?> constantsMap(FileScanTask task, Types.StructType partitionType,
+ BiFunction<Type, Object, Object> convertConstant) {
PartitionSpec spec = task.spec();
StructLike partitionData = task.file().partition();
@@ -51,6 +56,22 @@ private PartitionUtil() {
MetadataColumns.FILE_PATH.fieldId(),
convertConstant.apply(Types.StringType.get(), task.file().path()));
+ // add _spec_id
+ idToConstant.put(
+ MetadataColumns.SPEC_ID.fieldId(),
+ convertConstant.apply(Types.IntegerType.get(), task.file().specId()));
+
+ // add _partition
+ if (partitionType != null) {
+ if (partitionType.fields().size() > 0) {
+ StructLike coercedPartition = coercePartition(partitionType, spec, partitionData);
+ idToConstant.put(MetadataColumns.PARTITION_COLUMN_ID, convertConstant.apply(partitionType, coercedPartition));
+ } else {
+ // use null as some query engines may not be able to handle empty structs
+ idToConstant.put(MetadataColumns.PARTITION_COLUMN_ID, null);
+ }
+ }
+
List<Types.NestedField> partitionFields = spec.partitionType().fields();
List<PartitionField> fields = spec.fields();
for (int pos = 0; pos < fields.size(); pos += 1) {
@@ -63,4 +84,11 @@ private PartitionUtil() {
return idToConstant;
}
+
+ // adapts the provided partition data to match the table partition type
+ private static StructLike coercePartition(Types.StructType partitionType, PartitionSpec spec, StructLike partition) {
+ StructProjection projection = StructProjection.createAllowMissing(spec.partitionType(), partitionType);
+ projection.wrap(partition);
+ return projection;
+ }
}
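A short sketch of how an engine-side reader could build the id-to-constant map so that _spec_id and _partition are served as per-file constants. This is not part of the diff; it assumes a Table and a FileScanTask are already in hand, and the class and method names are illustrative:

import java.util.Map;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Partitioning;
import org.apache.iceberg.Table;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.PartitionUtil;

public class ConstantsMapSketch {
  static Map<Integer, ?> constantsFor(Table table, FileScanTask task) {
    // combined partition type across every spec the table has used
    Types.StructType partitionType = Partitioning.partitionType(table);
    // identity conversion; engines pass a converter to their internal representations
    return PartitionUtil.constantsMap(task, partitionType, (type, value) -> value);
  }
}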
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
index c8b33dd2f706..b58745c7a00d 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
@@ -24,24 +24,32 @@
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import org.apache.avro.generic.GenericData;
import org.apache.avro.util.Utf8;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.encryption.EncryptedInputFile;
-import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.CloseableIterator;
-import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.types.Types.StructType;
import org.apache.iceberg.util.ByteBuffers;
+import org.apache.iceberg.util.PartitionUtil;
import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.unsafe.types.UTF8String;
import org.slf4j.Logger;
@@ -55,6 +63,7 @@
abstract class BaseDataReader<T> implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(BaseDataReader.class);
+ private final Table table;
private final Iterator<FileScanTask> tasks;
private final Map<String, InputFile> inputFiles;
@@ -62,17 +71,18 @@ abstract class BaseDataReader<T> implements Closeable {
private T current = null;
private FileScanTask currentTask = null;
- BaseDataReader(CombinedScanTask task, FileIO io, EncryptionManager encryptionManager) {
+ BaseDataReader(Table table, CombinedScanTask task) {
+ this.table = table;
this.tasks = task.files().iterator();
Map<String, ByteBuffer> keyMetadata = Maps.newHashMap();
task.files().stream()
.flatMap(fileScanTask -> Stream.concat(Stream.of(fileScanTask.file()), fileScanTask.deletes().stream()))
.forEach(file -> keyMetadata.put(file.path().toString(), file.keyMetadata()));
Stream<EncryptedInputFile> encrypted = keyMetadata.entrySet().stream()
- .map(entry -> EncryptedFiles.encryptedInput(io.newInputFile(entry.getKey()), entry.getValue()));
+ .map(entry -> EncryptedFiles.encryptedInput(table.io().newInputFile(entry.getKey()), entry.getValue()));
// decrypt with the batch call to avoid multiple RPCs to a key server, if possible
- Iterable<InputFile> decryptedFiles = encryptionManager.decrypt(encrypted::iterator);
+ Iterable<InputFile> decryptedFiles = table.encryption().decrypt(encrypted::iterator);
Map<String, InputFile> files = Maps.newHashMapWithExpectedSize(task.files().size());
decryptedFiles.forEach(decrypted -> files.putIfAbsent(decrypted.location(), decrypted));
@@ -132,6 +142,15 @@ protected InputFile getInputFile(String location) {
return inputFiles.get(location);
}
+ protected Map<Integer, ?> constantsMap(FileScanTask task, Schema readSchema) {
+ if (readSchema.findField(MetadataColumns.PARTITION_COLUMN_ID) != null) {
+ StructType partitionType = Partitioning.partitionType(table);
+ return PartitionUtil.constantsMap(task, partitionType, BaseDataReader::convertConstant);
+ } else {
+ return PartitionUtil.constantsMap(task, BaseDataReader::convertConstant);
+ }
+ }
+
protected static Object convertConstant(Type type, Object value) {
if (value == null) {
return null;
@@ -155,6 +174,24 @@ protected static Object convertConstant(Type type, Object value) {
return ByteBuffers.toByteArray((ByteBuffer) value);
case BINARY:
return ByteBuffers.toByteArray((ByteBuffer) value);
+ case STRUCT:
+ StructType structType = (StructType) type;
+
+ if (structType.fields().isEmpty()) {
+ return new GenericInternalRow();
+ }
+
+ List<NestedField> fields = structType.fields();
+ Object[] values = new Object[fields.size()];
+ StructLike struct = (StructLike) value;
+
+ for (int index = 0; index < fields.size(); index++) {
+ NestedField field = fields.get(index);
+ Type fieldType = field.type();
+ values[index] = convertConstant(fieldType, struct.get(index, fieldType.typeId().javaClass()));
+ }
+
+ return new GenericInternalRow(values);
default:
}
return value;
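The new STRUCT branch above is what lets the coerced _partition tuple surface as a constant Spark row. Below is a simplified standalone sketch of that conversion, not the reader's API: the class name is illustrative, and unlike the real convertConstant it leaves primitive values as-is rather than converting strings, decimals, and binary to Spark's internal representations.

import java.util.List;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;

public class StructConstantSketch {
  static GenericInternalRow toInternalRow(Types.StructType structType, StructLike struct) {
    List<Types.NestedField> fields = structType.fields();
    Object[] values = new Object[fields.size()];
    for (int i = 0; i < fields.size(); i++) {
      Type fieldType = fields.get(i).type();
      // read each value with the Java class that matches its Iceberg type
      values[i] = struct.get(i, fieldType.typeId().javaClass());
    }
    return new GenericInternalRow(values);
  }
}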
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
index 8cfe46b598fc..e4bd3ceba6ce 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
@@ -41,7 +41,6 @@
import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
import org.apache.iceberg.types.TypeUtil;
-import org.apache.iceberg.util.PartitionUtil;
import org.apache.spark.rdd.InputFileBlockHolder;
import org.apache.spark.sql.vectorized.ColumnarBatch;
@@ -52,7 +51,7 @@ class BatchDataReader extends BaseDataReader<ColumnarBatch> {
private final int batchSize;
BatchDataReader(CombinedScanTask task, Table table, Schema expectedSchema, boolean caseSensitive, int size) {
- super(task, table.io(), table.encryption());
+ super(table, task);
this.expectedSchema = expectedSchema;
this.nameMapping = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
this.caseSensitive = caseSensitive;
@@ -66,7 +65,7 @@ CloseableIterator<ColumnarBatch> open(FileScanTask task) {
// update the current file for Spark's filename() function
InputFileBlockHolder.set(file.path().toString(), task.start(), task.length());
- Map<Integer, ?> idToConstant = PartitionUtil.constantsMap(task, BatchDataReader::convertConstant);
+ Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema);
CloseableIterable<ColumnarBatch> iter;
InputFile location = getInputFile(task);
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
index d4328addc759..ce2226f4f75e 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
@@ -26,7 +26,6 @@
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterator;
-import org.apache.iceberg.util.PartitionUtil;
import org.apache.spark.rdd.InputFileBlockHolder;
import org.apache.spark.sql.catalyst.InternalRow;
@@ -44,7 +43,7 @@ CloseableIterator<InternalRow> open(FileScanTask task) {
// schema or rows returned by readers
Schema requiredSchema = matches.requiredSchema();
- Map<Integer, ?> idToConstant = PartitionUtil.constantsMap(task, RowDataReader::convertConstant);
+ Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema);
DataFile file = task.file();
// update the current file for Spark's filename() function
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
index 391d4a053490..8770e17aa015 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
@@ -44,7 +44,6 @@
import org.apache.iceberg.spark.data.SparkOrcReader;
import org.apache.iceberg.spark.data.SparkParquetReaders;
import org.apache.iceberg.types.TypeUtil;
-import org.apache.iceberg.util.PartitionUtil;
import org.apache.spark.rdd.InputFileBlockHolder;
import org.apache.spark.sql.catalyst.InternalRow;
@@ -56,7 +55,7 @@ class RowDataReader extends BaseDataReader<InternalRow> {
private final boolean caseSensitive;
RowDataReader(CombinedScanTask task, Table table, Schema expectedSchema, boolean caseSensitive) {
- super(task, table.io(), table.encryption());
+ super(table, task);
this.tableSchema = table.schema();
this.expectedSchema = expectedSchema;
this.nameMapping = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
@@ -69,7 +68,7 @@ CloseableIterator<InternalRow> open(FileScanTask task) {
// schema or rows returned by readers
Schema requiredSchema = deletes.requiredSchema();
- Map<Integer, ?> idToConstant = PartitionUtil.constantsMap(task, RowDataReader::convertConstant);
+ Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema);
DataFile file = task.file();
// update the current file for Spark's filename() function
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkBaseDataReader.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkBaseDataReader.java
index 51b47cbd972d..8bae666c0475 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkBaseDataReader.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkBaseDataReader.java
@@ -30,7 +30,6 @@
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;
import org.apache.avro.generic.GenericData;
-import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.BaseCombinedScanTask;
import org.apache.iceberg.DataFile;
@@ -39,8 +38,6 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
-import org.apache.iceberg.encryption.PlaintextEncryptionManager;
-import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.parquet.Parquet;
@@ -59,7 +56,7 @@ public abstract class TestSparkBaseDataReader {
@Rule
public TemporaryFolder temp = new TemporaryFolder();
- private static final Configuration CONFD = new Configuration();
+ private Table table;
// Simulates the closeable iterator of data to be read
private static class CloseableIntegerRange implements CloseableIterator<Integer> {
@@ -92,10 +89,8 @@ public Integer next() {
private static class ClosureTrackingReader extends BaseDataReader<Integer> {
private Map<String, CloseableIntegerRange> tracker = new HashMap<>();
- ClosureTrackingReader(List<FileScanTask> tasks) {
- super(new BaseCombinedScanTask(tasks),
- new HadoopFileIO(CONFD),
- new PlaintextEncryptionManager());
+ ClosureTrackingReader(Table table, List<FileScanTask> tasks) {
+ super(table, new BaseCombinedScanTask(tasks));
}
@Override
@@ -124,7 +119,7 @@ public void testClosureOnDataExhaustion() throws IOException {
Integer recordPerTask = 10;
List<FileScanTask> tasks = createFileScanTasks(totalTasks, recordPerTask);
- ClosureTrackingReader reader = new ClosureTrackingReader(tasks);
+ ClosureTrackingReader reader = new ClosureTrackingReader(table, tasks);
int countRecords = 0;
while (reader.next()) {
@@ -151,7 +146,7 @@ public void testClosureDuringIteration() throws IOException {
FileScanTask firstTask = tasks.get(0);
FileScanTask secondTask = tasks.get(1);
- ClosureTrackingReader reader = new ClosureTrackingReader(tasks);
+ ClosureTrackingReader reader = new ClosureTrackingReader(table, tasks);
// Total of 2 elements
Assert.assertTrue(reader.next());
@@ -175,7 +170,7 @@ public void testClosureWithoutAnyRead() throws IOException {
Integer recordPerTask = 10;
List<FileScanTask> tasks = createFileScanTasks(totalTasks, recordPerTask);
- ClosureTrackingReader reader = new ClosureTrackingReader(tasks);
+ ClosureTrackingReader reader = new ClosureTrackingReader(table, tasks);
reader.close();
@@ -191,7 +186,7 @@ public void testExplicitClosure() throws IOException {
Integer recordPerTask = 10;
List<FileScanTask> tasks = createFileScanTasks(totalTasks, recordPerTask);
- ClosureTrackingReader reader = new ClosureTrackingReader(tasks);
+ ClosureTrackingReader reader = new ClosureTrackingReader(table, tasks);
Integer halfDataSize = (totalTasks * recordPerTask) / 2;
for (int i = 0; i < halfDataSize; i++) {
@@ -217,7 +212,7 @@ public void testIdempotentExplicitClosure() throws IOException {
Integer recordPerTask = 10;
List<FileScanTask> tasks = createFileScanTasks(totalTasks, recordPerTask);
- ClosureTrackingReader reader = new ClosureTrackingReader(tasks);
+ ClosureTrackingReader reader = new ClosureTrackingReader(table, tasks);
// Total 100 elements, only 5 iterators have been created
for (int i = 0; i < 45; i++) {
@@ -250,7 +245,7 @@ private List<FileScanTask> createFileScanTasks(Integer totalTasks, Integer recor
);
try {
- Table table = TestTables.create(location, desc, schema, PartitionSpec.unpartitioned());
+ this.table = TestTables.create(location, desc, schema, PartitionSpec.unpartitioned());
// Important: use the table's schema for the rest of the test
// When tables are created, the column ids are reassigned.
Schema tableSchema = table.schema();
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index 633a17143f52..483dbcfc5b2f 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -149,7 +149,7 @@ private Schema schemaWithMetadataColumns() {
// metadata columns
List<Types.NestedField> fields = metaColumns.stream()
.distinct()
- .map(MetadataColumns::get)
+ .map(name -> MetadataColumns.metadataColumn(table, name))
.collect(Collectors.toList());
Schema meta = new Schema(fields);
diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/SparkTestTable.java b/spark3/src/test/java/org/apache/iceberg/spark/source/SparkTestTable.java
new file mode 100644
index 000000000000..afb1136f4fa5
--- /dev/null
+++ b/spark3/src/test/java/org/apache/iceberg/spark/source/SparkTestTable.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+// TODO: remove this class once we compile against Spark 3.2
+public class SparkTestTable extends SparkTable {
+
+ private final String[] metadataColumnNames;
+
+ public SparkTestTable(Table icebergTable, String[] metadataColumnNames, boolean refreshEagerly) {
+ super(icebergTable, refreshEagerly);
+ this.metadataColumnNames = metadataColumnNames;
+ }
+
+ @Override
+ public StructType schema() {
+ StructType schema = super.schema();
+ if (metadataColumnNames != null) {
+ for (String columnName : metadataColumnNames) {
+ Types.NestedField metadataColumn = MetadataColumns.metadataColumn(table(), columnName);
+ schema = schema.add(columnName, SparkSchemaUtil.convert(metadataColumn.type()));
+ }
+ }
+ return schema;
+ }
+
+ @Override
+ public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
+ SparkScanBuilder scanBuilder = (SparkScanBuilder) super.newScanBuilder(options);
+ if (metadataColumnNames != null) {
+ scanBuilder.withMetadataColumns(metadataColumnNames);
+ }
+ return scanBuilder;
+ }
+}
diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalog.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalog.java
index 92013d396c1a..027c88cd4df6 100644
--- a/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalog.java
+++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalog.java
@@ -19,7 +19,6 @@
package org.apache.iceberg.spark.source;
-import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkSessionCatalog;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.connector.catalog.Identifier;
@@ -31,7 +30,14 @@ public class TestSparkCatalog<T extends TableCatalog & SupportsNamespaces> extends SparkSessionCatalog<T> {
@Override
public Table loadTable(Identifier ident) throws NoSuchTableException {
- TestTables.TestTable table = TestTables.load(Spark3Util.identifierToTableIdentifier(ident).toString());
- return new SparkTable(table, false);
+ String[] parts = ident.name().split("\\$", 2);
+ if (parts.length == 2) {
+ TestTables.TestTable table = TestTables.load(parts[0]);
+ String[] metadataColumns = parts[1].split(",");
+ return new SparkTestTable(table, metadataColumns, false);
+ } else {
+ TestTables.TestTable table = TestTables.load(ident.name());
+ return new SparkTestTable(table, null, false);
+ }
}
}
diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java
new file mode 100644
index 000000000000..b29d281863cb
--- /dev/null
+++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PartitionSpecParser;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.UpdateProperties;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.SparkTestBase;
+import org.apache.iceberg.types.Types;
+import org.junit.After;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
+import static org.apache.iceberg.TableProperties.ORC_VECTORIZATION_ENABLED;
+import static org.apache.iceberg.TableProperties.PARQUET_VECTORIZATION_ENABLED;
+
+@RunWith(Parameterized.class)
+public class TestSparkMetadataColumns extends SparkTestBase {
+
+ private static final String TABLE_NAME = "test_table";
+ private static final Schema SCHEMA = new Schema(
+ Types.NestedField.required(1, "id", Types.LongType.get()),
+ Types.NestedField.optional(2, "category", Types.StringType.get()),
+ Types.NestedField.optional(3, "data", Types.StringType.get())
+ );
+ private static final PartitionSpec UNKNOWN_SPEC = PartitionSpecParser.fromJson(SCHEMA,
+ "{ \"spec-id\": 1, \"fields\": [ { \"name\": \"id_zero\", \"transform\": \"zero\", \"source-id\": 1 } ] }");
+
+ @Parameterized.Parameters(name = "fileFormat = {0}, vectorized = {1}, formatVersion = {2}")
+ public static Object[][] parameters() {
+ return new Object[][] {
+ { FileFormat.PARQUET, false, 1},
+ { FileFormat.PARQUET, true, 1},
+ { FileFormat.PARQUET, false, 2},
+ { FileFormat.PARQUET, true, 2},
+ { FileFormat.AVRO, false, 1},
+ { FileFormat.AVRO, false, 2},
+ { FileFormat.ORC, false, 1},
+ { FileFormat.ORC, true, 1},
+ { FileFormat.ORC, false, 2},
+ { FileFormat.ORC, true, 2},
+ };
+ }
+
+ @Rule
+ public TemporaryFolder temp = new TemporaryFolder();
+
+ private final FileFormat fileFormat;
+ private final boolean vectorized;
+ private final int formatVersion;
+
+ private Table table = null;
+
+ public TestSparkMetadataColumns(FileFormat fileFormat, boolean vectorized, int formatVersion) {
+ this.fileFormat = fileFormat;
+ this.vectorized = vectorized;
+ this.formatVersion = formatVersion;
+ }
+
+ @BeforeClass
+ public static void setupSpark() {
+ ImmutableMap<String, String> config = ImmutableMap.of(
+ "type", "hive",
+ "default-namespace", "default",
+ "cache-enabled", "true"
+ );
+ spark.conf().set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.source.TestSparkCatalog");
+ config.forEach((key, value) -> spark.conf().set("spark.sql.catalog.spark_catalog." + key, value));
+ }
+
+ @Before
+ public void setupTable() throws IOException {
+ createAndInitTable();
+ }
+
+ @After
+ public void dropTable() {
+ TestTables.clearTables();
+ }
+
+ // TODO: remove testing workarounds once we compile against Spark 3.2
+
+ @Test
+ public void testSpecAndPartitionMetadataColumns() {
+ // TODO: support metadata structs in vectorized ORC reads
+ Assume.assumeFalse(fileFormat == FileFormat.ORC && vectorized);
+
+ sql("INSERT INTO TABLE %s VALUES (1, 'a1', 'b1')", TABLE_NAME);
+
+ table.refresh();
+ table.updateSpec()
+ .addField("data")
+ .commit();
+ sql("INSERT INTO TABLE %s VALUES (1, 'a1', 'b1')", TABLE_NAME);
+
+ table.refresh();
+ table.updateSpec()
+ .addField(Expressions.bucket("category", 8))
+ .commit();
+ sql("INSERT INTO TABLE %s VALUES (1, 'a1', 'b1')", TABLE_NAME);
+
+ table.refresh();
+ table.updateSpec()
+ .removeField("data")
+ .commit();
+ sql("INSERT INTO TABLE %s VALUES (1, 'a1', 'b1')", TABLE_NAME);
+
+ table.refresh();
+ table.updateSpec()
+ .renameField("category_bucket_8", "category_bucket_8_another_name")
+ .commit();
+
+ List