33 changes: 30 additions & 3 deletions api/src/main/java/org/apache/iceberg/util/StructProjection.java
@@ -58,12 +58,30 @@ public static StructProjection create(Schema dataSchema, Schema projectedSchema)
return new StructProjection(dataSchema.asStruct(), projectedSchema.asStruct());
}

/**
* Creates a projecting wrapper for {@link StructLike} rows.
* <p>
* This projection allows missing fields and does not work with repeated types like lists and maps.
*
* @param structType type of rows wrapped by this projection
* @param projectedStructType result type of the projected rows
* @return a wrapper to project rows
*/
public static StructProjection createAllowMissing(StructType structType, StructType projectedStructType) {
return new StructProjection(structType, projectedStructType, true);
}

private final StructType type;
private final int[] positionMap;
private final StructProjection[] nestedProjections;
private StructLike struct;

private StructProjection(StructType structType, StructType projection) {
this(structType, projection, false);
}

@SuppressWarnings("checkstyle:CyclomaticComplexity")
private StructProjection(StructType structType, StructType projection, boolean allowMissing) {
this.type = projection;
this.positionMap = new int[projection.fields().size()];
this.nestedProjections = new StructProjection[projection.fields().size()];
@@ -116,7 +134,10 @@ private StructProjection(StructType structType, StructType projection) {
}
}

if (!found) {
if (!found && projectedField.isOptional() && allowMissing) {
positionMap[pos] = -1;
nestedProjections[pos] = null;
} else if (!found) {
throw new IllegalArgumentException(String.format("Cannot find field %s in %s", projectedField, structType));
}
}
@@ -134,11 +155,17 @@ public int size() {

@Override
public <T> T get(int pos, Class<T> javaClass) {
int structPos = positionMap[pos];

if (nestedProjections[pos] != null) {
return javaClass.cast(nestedProjections[pos].wrap(struct.get(positionMap[pos], StructLike.class)));
return javaClass.cast(nestedProjections[pos].wrap(struct.get(structPos, StructLike.class)));
}

return struct.get(positionMap[pos], javaClass);
if (structPos != -1) {
return struct.get(structPos, javaClass);
} else {
return null;
Review comment (Contributor): Since this relies on a null for nestedProjections[pos], I think it should set nestedProjections[pos] to null in the constructor where positionMap[pos] is set to -1.

Reply (Author): I think it is actually always null as long as the field was not found. I'll add an explicit call, though.

Reply (Author): Added.

}
}

@Override
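As a rough illustration of the new createAllowMissing factory above (not part of this diff; the helper class, field IDs, and names are made up), projecting partition data written under an older spec to a wider, table-level partition type yields null for the fields that spec never wrote:

```java
import org.apache.iceberg.StructLike;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.StructType;
import org.apache.iceberg.util.StructProjection;

class AllowMissingProjectionSketch {
  // partitionData is assumed to be partition data written under the old spec
  static String categoryOrNull(StructLike partitionData) {
    // partition type written by an older spec: only id_bucket
    StructType oldSpecType = StructType.of(
        Types.NestedField.optional(1000, "id_bucket", Types.IntegerType.get()));

    // table-wide partition type after spec evolution: id_bucket plus category
    StructType tableWideType = StructType.of(
        Types.NestedField.optional(1000, "id_bucket", Types.IntegerType.get()),
        Types.NestedField.optional(1001, "category", Types.StringType.get()));

    // createAllowMissing tolerates projected optional fields that the source struct lacks
    StructProjection projection = StructProjection.createAllowMissing(oldSpecType, tableWideType);
    projection.wrap(partitionData);

    Integer bucket = projection.get(0, Integer.class); // read from the wrapped struct
    return projection.get(1, String.class);            // null: the old spec never wrote this field
  }
}
```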
35 changes: 28 additions & 7 deletions core/src/main/java/org/apache/iceberg/MetadataColumns.java
@@ -38,6 +38,12 @@ private MetadataColumns() {
Integer.MAX_VALUE - 2, "_pos", Types.LongType.get(), "Ordinal position of a row in the source data file");
public static final NestedField IS_DELETED = NestedField.required(
Integer.MAX_VALUE - 3, "_deleted", Types.BooleanType.get(), "Whether the row has been deleted");
public static final NestedField SPEC_ID = NestedField.required(
Integer.MAX_VALUE - 4, "_spec_id", Types.IntegerType.get(), "Spec ID used to track the file containing a row");
// the partition column type is not static and depends on all specs in the table
public static final int PARTITION_COLUMN_ID = Integer.MAX_VALUE - 5;
Review comment (Contributor): I think that new reserved columns need to be added to the spec. We should also make sure the spec notes the ranges that are reserved. I'm not sure if we did that or just added specific IDs.

Reply (Author): I'll do that in a follow-up as we are missing _deleted too.

public static final String PARTITION_COLUMN_NAME = "_partition";
public static final String PARTITION_COLUMN_DOC = "Partition to which a row belongs to";

// IDs Integer.MAX_VALUE - (101-200) are used for reserved columns
public static final NestedField DELETE_FILE_PATH = NestedField.required(
@@ -51,24 +57,39 @@ private MetadataColumns() {
private static final Map<String, NestedField> META_COLUMNS = ImmutableMap.of(
FILE_PATH.name(), FILE_PATH,
ROW_POSITION.name(), ROW_POSITION,
IS_DELETED.name(), IS_DELETED);
IS_DELETED.name(), IS_DELETED,
SPEC_ID.name(), SPEC_ID
);

private static final Set<Integer> META_IDS = META_COLUMNS.values().stream().map(NestedField::fieldId)
Review comment (Author): Had to change this as the partition type is not static and is handled specially.

.collect(ImmutableSet.toImmutableSet());
private static final Set<Integer> META_IDS = ImmutableSet.of(
FILE_PATH.fieldId(),
ROW_POSITION.fieldId(),
IS_DELETED.fieldId(),
SPEC_ID.fieldId(),
PARTITION_COLUMN_ID
);

public static Set<Integer> metadataFieldIds() {
return META_IDS;
}

public static NestedField get(String name) {
return META_COLUMNS.get(name);
public static NestedField metadataColumn(Table table, String name) {
if (name.equals(PARTITION_COLUMN_NAME)) {
Review comment (Author): I kept the logic case sensitive as before but we maybe should reconsider it at some point.

return Types.NestedField.optional(
PARTITION_COLUMN_ID,
PARTITION_COLUMN_NAME,
Partitioning.partitionType(table),
PARTITION_COLUMN_DOC);
} else {
return META_COLUMNS.get(name);
}
}

public static boolean isMetadataColumn(String name) {
return META_COLUMNS.containsKey(name);
return name.equals(PARTITION_COLUMN_NAME) || META_COLUMNS.containsKey(name);
Review comment (Contributor): I am starting to wonder if it is better to just have a dummy column type mapping for _partition in META_COLUMNS and only separate the logic when necessary, which can avoid changes like this in quite a few places.

Reply (Author): I did that first but META_COLUMNS is an immutable map that does not allow null keys. I am reluctant to switch to a mutable map so I added this condition here. I hope we will be able to use metadataColumn(table, name) in other places so this workaround will be only part of MetadataColumns. It looks ugly, though, I agree.

}

public static boolean nonMetadataColumn(String name) {
return !META_COLUMNS.containsKey(name);
return !isMetadataColumn(name);
}
}
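A short usage sketch of the reworked lookups above (illustrative only; the helper class is not part of this diff, and table is assumed to be any loaded Table):

```java
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Table;
import org.apache.iceberg.types.Types.NestedField;

class MetadataColumnLookupSketch {
  static void lookups(Table table) {
    // static metadata columns still come straight from the immutable map
    NestedField specId = MetadataColumns.metadataColumn(table, "_spec_id");

    // _partition is built on demand because its type depends on all specs in the table;
    // partition.type() is Partitioning.partitionType(table)
    NestedField partition = MetadataColumns.metadataColumn(table, "_partition");

    // membership checks cover both the map entries and the special _partition name
    boolean isMeta = MetadataColumns.isMetadataColumn("_partition");    // true
    boolean nonMeta = MetadataColumns.nonMetadataColumn("_partition");  // false
  }
}
```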
30 changes: 29 additions & 1 deletion core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
@@ -36,10 +36,15 @@ private PartitionUtil() {
}

public static Map<Integer, ?> constantsMap(FileScanTask task) {
return constantsMap(task, (type, constant) -> constant);
return constantsMap(task, null, (type, constant) -> constant);
}

public static Map<Integer, ?> constantsMap(FileScanTask task, BiFunction<Type, Object, Object> convertConstant) {
return constantsMap(task, null, convertConstant);
}

public static Map<Integer, ?> constantsMap(FileScanTask task, Types.StructType partitionType,
BiFunction<Type, Object, Object> convertConstant) {
PartitionSpec spec = task.spec();
StructLike partitionData = task.file().partition();

@@ -51,6 +56,22 @@ private PartitionUtil() {
MetadataColumns.FILE_PATH.fieldId(),
convertConstant.apply(Types.StringType.get(), task.file().path()));

// add _spec_id
idToConstant.put(
MetadataColumns.SPEC_ID.fieldId(),
convertConstant.apply(Types.IntegerType.get(), task.file().specId()));

// add _partition
if (partitionType != null) {
if (partitionType.fields().size() > 0) {
StructLike coercedPartition = coercePartition(partitionType, spec, partitionData);
idToConstant.put(MetadataColumns.PARTITION_COLUMN_ID, convertConstant.apply(partitionType, coercedPartition));
} else {
// use null as some query engines may not be able to handle empty structs
idToConstant.put(MetadataColumns.PARTITION_COLUMN_ID, null);
}
}

List<Types.NestedField> partitionFields = spec.partitionType().fields();
List<PartitionField> fields = spec.fields();
for (int pos = 0; pos < fields.size(); pos += 1) {
@@ -63,4 +84,11 @@ private PartitionUtil() {

return idToConstant;
}

// adapts the provided partition data to match the table partition type
private static StructLike coercePartition(Types.StructType partitionType, PartitionSpec spec, StructLike partition) {
StructProjection projection = StructProjection.createAllowMissing(spec.partitionType(), partitionType);
projection.wrap(partition);
return projection;
}
}
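Tying the PartitionUtil changes together, a sketch of how a reader might request the _partition constant through the new three-argument overload (the helper class and the identity convert function are illustrative):

```java
import java.util.Map;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Partitioning;
import org.apache.iceberg.Table;
import org.apache.iceberg.types.Types.StructType;
import org.apache.iceberg.util.PartitionUtil;

class PartitionConstantsSketch {
  static Map<Integer, ?> constants(Table table, FileScanTask task) {
    // common partition type across all specs of the table
    StructType partitionType = Partitioning.partitionType(table);

    // the task's partition data is coerced from its spec's partition type to the
    // table-wide type; fields the file's spec does not define are projected as null
    Map<Integer, ?> idToConstant =
        PartitionUtil.constantsMap(task, partitionType, (type, constant) -> constant);

    Object partitionValue = idToConstant.get(MetadataColumns.PARTITION_COLUMN_ID);
    Object specId = idToConstant.get(MetadataColumns.SPEC_ID.fieldId());
    return idToConstant;
  }
}
```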
@@ -24,24 +24,32 @@
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import org.apache.avro.generic.GenericData;
import org.apache.avro.util.Utf8;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Partitioning;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.encryption.EncryptedInputFile;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.types.Types.StructType;
import org.apache.iceberg.util.ByteBuffers;
import org.apache.iceberg.util.PartitionUtil;
import org.apache.spark.rdd.InputFileBlockHolder;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.unsafe.types.UTF8String;
import org.slf4j.Logger;
@@ -55,24 +63,26 @@
abstract class BaseDataReader<T> implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(BaseDataReader.class);

private final Table table;
private final Iterator<FileScanTask> tasks;
private final Map<String, InputFile> inputFiles;

private CloseableIterator<T> currentIterator;
private T current = null;
private FileScanTask currentTask = null;

BaseDataReader(CombinedScanTask task, FileIO io, EncryptionManager encryptionManager) {
BaseDataReader(Table table, CombinedScanTask task) {
this.table = table;
this.tasks = task.files().iterator();
Map<String, ByteBuffer> keyMetadata = Maps.newHashMap();
task.files().stream()
.flatMap(fileScanTask -> Stream.concat(Stream.of(fileScanTask.file()), fileScanTask.deletes().stream()))
.forEach(file -> keyMetadata.put(file.path().toString(), file.keyMetadata()));
Stream<EncryptedInputFile> encrypted = keyMetadata.entrySet().stream()
.map(entry -> EncryptedFiles.encryptedInput(io.newInputFile(entry.getKey()), entry.getValue()));
.map(entry -> EncryptedFiles.encryptedInput(table.io().newInputFile(entry.getKey()), entry.getValue()));

// decrypt with the batch call to avoid multiple RPCs to a key server, if possible
Iterable<InputFile> decryptedFiles = encryptionManager.decrypt(encrypted::iterator);
Iterable<InputFile> decryptedFiles = table.encryption().decrypt(encrypted::iterator);

Map<String, InputFile> files = Maps.newHashMapWithExpectedSize(task.files().size());
decryptedFiles.forEach(decrypted -> files.putIfAbsent(decrypted.location(), decrypted));
@@ -132,6 +142,15 @@ protected InputFile getInputFile(String location) {
return inputFiles.get(location);
}

protected Map<Integer, ?> constantsMap(FileScanTask task, Schema readSchema) {
if (readSchema.findField(MetadataColumns.PARTITION_COLUMN_ID) != null) {
StructType partitionType = Partitioning.partitionType(table);
return PartitionUtil.constantsMap(task, partitionType, BaseDataReader::convertConstant);
Review comment (Contributor): I'd prefer just to call the method rather than trying to optimize by not adding the partition entry.

Reply (Author): I'd prefer calling this all the time too. However, this would mean we can no longer query tables with unknown transforms. We need to know all transforms to build a common partition type. That's why I don't set the partition column if it is not requested. Any thoughts on this, @rdblue?

Reply (Contributor): Can we leave unknown transforms out of the partition type instead? We can just ignore them if they're unknown?

Reply (Author): I'd probably consider persisting the partition type in the metadata instead. It might be confusing to silently ignore a partition column.

Reply (Contributor): I think it would be fine for unknown partitions right now. It would unblock this without much risk.

} else {
return PartitionUtil.constantsMap(task, BaseDataReader::convertConstant);
}
}

protected static Object convertConstant(Type type, Object value) {
if (value == null) {
return null;
@@ -155,6 +174,24 @@ protected static Object convertConstant(Type type, Object value) {
return ByteBuffers.toByteArray((ByteBuffer) value);
case BINARY:
return ByteBuffers.toByteArray((ByteBuffer) value);
case STRUCT:
StructType structType = (StructType) type;

if (structType.fields().isEmpty()) {
return new GenericInternalRow();
Review comment (Author): We won't hit this clause with _partition as the passed value will be null.

}

List<NestedField> fields = structType.fields();
Object[] values = new Object[fields.size()];
StructLike struct = (StructLike) value;

for (int index = 0; index < fields.size(); index++) {
NestedField field = fields.get(index);
Type fieldType = field.type();
values[index] = convertConstant(fieldType, struct.get(index, fieldType.typeId().javaClass()));
}

return new GenericInternalRow(values);
default:
}
return value;
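For the Spark readers, the new STRUCT branch above is what turns the coerced partition StructLike into a Spark row; a minimal sketch, assuming same-package access to the protected helper (the class, field IDs, and names are illustrative):

```java
package org.apache.iceberg.spark.source;

import org.apache.iceberg.StructLike;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.StructType;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;

class PartitionRowSketch {
  static GenericInternalRow toSparkRow(StructLike coercedPartition) {
    StructType partitionType = StructType.of(
        Types.NestedField.optional(1000, "id_bucket", Types.IntegerType.get()),
        Types.NestedField.optional(1001, "category", Types.StringType.get()));

    // each field of the struct is converted recursively, so the result is a
    // GenericInternalRow that can be handed out as the _partition column value
    return (GenericInternalRow) BaseDataReader.convertConstant(partitionType, coercedPartition);
  }
}
```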
@@ -41,7 +41,6 @@
import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.PartitionUtil;
import org.apache.spark.rdd.InputFileBlockHolder;
import org.apache.spark.sql.vectorized.ColumnarBatch;

Expand All @@ -52,7 +51,7 @@ class BatchDataReader extends BaseDataReader<ColumnarBatch> {
private final int batchSize;

BatchDataReader(CombinedScanTask task, Table table, Schema expectedSchema, boolean caseSensitive, int size) {
super(task, table.io(), table.encryption());
super(table, task);
this.expectedSchema = expectedSchema;
this.nameMapping = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
this.caseSensitive = caseSensitive;
@@ -66,7 +65,7 @@ CloseableIterator<ColumnarBatch> open(FileScanTask task) {
// update the current file for Spark's filename() function
InputFileBlockHolder.set(file.path().toString(), task.start(), task.length());

Map<Integer, ?> idToConstant = PartitionUtil.constantsMap(task, BatchDataReader::convertConstant);
Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema);

CloseableIterable<ColumnarBatch> iter;
InputFile location = getInputFile(task);
@@ -26,7 +26,6 @@
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.util.PartitionUtil;
import org.apache.spark.rdd.InputFileBlockHolder;
import org.apache.spark.sql.catalyst.InternalRow;

@@ -44,7 +43,7 @@ CloseableIterator<InternalRow> open(FileScanTask task) {

// schema or rows returned by readers
Schema requiredSchema = matches.requiredSchema();
Map<Integer, ?> idToConstant = PartitionUtil.constantsMap(task, RowDataReader::convertConstant);
Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema);
DataFile file = task.file();

// update the current file for Spark's filename() function
@@ -44,7 +44,6 @@
import org.apache.iceberg.spark.data.SparkOrcReader;
import org.apache.iceberg.spark.data.SparkParquetReaders;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.PartitionUtil;
import org.apache.spark.rdd.InputFileBlockHolder;
import org.apache.spark.sql.catalyst.InternalRow;

@@ -56,7 +55,7 @@ class RowDataReader extends BaseDataReader<InternalRow> {
private final boolean caseSensitive;

RowDataReader(CombinedScanTask task, Table table, Schema expectedSchema, boolean caseSensitive) {
super(task, table.io(), table.encryption());
super(table, task);
this.tableSchema = table.schema();
this.expectedSchema = expectedSchema;
this.nameMapping = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
@@ -69,7 +68,7 @@ CloseableIterator<InternalRow> open(FileScanTask task) {

// schema or rows returned by readers
Schema requiredSchema = deletes.requiredSchema();
Map<Integer, ?> idToConstant = PartitionUtil.constantsMap(task, RowDataReader::convertConstant);
Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema);
DataFile file = task.file();

// update the current file for Spark's filename() function