28 changes: 27 additions & 1 deletion api/src/main/java/org/apache/iceberg/util/StructProjection.java
@@ -23,8 +23,11 @@
import java.util.Set;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.ListType;
import org.apache.iceberg.types.Types.MapType;
import org.apache.iceberg.types.Types.StructType;

public class StructProjection implements StructLike {
@@ -82,8 +85,31 @@ private StructProjection(StructType structType, StructType projection) {
dataField.type().asStructType(), projectedField.type().asStructType());
break;
case MAP:
MapType projectedMap = projectedField.type().asMapType();
MapType originalMap = dataField.type().asMapType();

boolean keyProjectable = !projectedMap.keyType().isNestedType() ||
projectedMap.keyType().equals(originalMap.keyType());
boolean valueProjectable = !projectedMap.valueType().isNestedType() ||
projectedMap.valueType().equals(originalMap.valueType());
Preconditions.checkArgument(keyProjectable && valueProjectable,
"Cannot project a partial map key or value struct. Trying to project %s out of %s",
projectedField, dataField);

nestedProjections[pos] = null;
break;
case LIST:
throw new IllegalArgumentException(String.format("Cannot project list or map field: %s", projectedField));
ListType projectedList = projectedField.type().asListType();
ListType originalList = dataField.type().asListType();

boolean elementProjectable = !projectedList.elementType().isNestedType() ||
projectedList.elementType().equals(originalList.elementType());
Preconditions.checkArgument(elementProjectable,
"Cannot project a partial list element struct. Trying to project %s out of %s",
projectedField, dataField);

nestedProjections[pos] = null;
break;
default:
nestedProjections[pos] = null;
}
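Note: with this change, StructProjection no longer rejects every map or list field. It only rejects projections that would have to prune a struct nested inside a map key/value or a list element; maps and lists whose nested types are primitive (or fully projected) now pass through untouched. A minimal sketch of the new behavior against a hypothetical schema (not part of this PR):

import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructProjection;

public class MapProjectionSketch {
  public static void main(String[] args) {
    // Hypothetical schema: a long id plus a map of primitive string -> string properties.
    Schema data = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.optional(2, "props", Types.MapType.ofOptional(
            3, 4, Types.StringType.get(), Types.StringType.get())));

    // Projecting the whole map is now accepted: primitive key and value types need no nested
    // projection, so the field is passed through as-is (nestedProjections[pos] stays null).
    StructProjection projection = StructProjection.create(data, data.select("props"));
    System.out.println(projection.size());  // 1

    // Projecting only part of a struct stored as a map key/value (or list element) still fails
    // the new Preconditions check: "Cannot project a partial map key or value struct."
  }
}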
5 changes: 1 addition & 4 deletions core/src/main/java/org/apache/iceberg/AllEntriesTable.java
@@ -28,7 +28,6 @@
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.ParallelIterable;
import org.apache.iceberg.util.ThreadPools;
@@ -102,15 +101,13 @@ protected CloseableIterable<FileScanTask> planFiles(
TableOperations ops, Snapshot snapshot, Expression rowFilter,
boolean ignoreResiduals, boolean caseSensitive, boolean colStats) {
CloseableIterable<ManifestFile> manifests = allManifestFiles(ops.current().snapshots());
Type fileProjection = schema().findType("data_file");
Schema fileSchema = fileProjection != null ? new Schema(fileProjection.asStructType().fields()) : new Schema();
String schemaString = SchemaParser.toJson(schema());
String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned());
Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);

return CloseableIterable.transform(manifests, manifest -> new ManifestEntriesTable.ManifestReadTask(
ops.io(), manifest, fileSchema, schemaString, specString, residuals, ops.current().specsById()));
ops.io(), manifest, schema(), schemaString, specString, residuals, ops.current().specsById()));
}
}

18 changes: 12 additions & 6 deletions core/src/main/java/org/apache/iceberg/AllManifestsTable.java
@@ -31,6 +31,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructProjection;

/**
* A {@link Table} implementation that exposes a table's valid manifest files as rows.
@@ -124,7 +125,6 @@ protected CloseableIterable<FileScanTask> planFiles(
String schemaString = SchemaParser.toJson(schema());
String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned());

// Data tasks produce the table schema, not the projection schema and projection is done by processing engines.
return CloseableIterable.withNoopClose(Iterables.transform(ops.current().snapshots(), snap -> {
if (snap.manifestListLocation() != null) {
Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
@@ -134,26 +134,29 @@ protected CloseableIterable<FileScanTask> planFiles(
.withRecordCount(1)
.withFormat(FileFormat.AVRO)
.build();
return new ManifestListReadTask(ops.io(), table().spec(), new BaseFileScanTask(
return new ManifestListReadTask(ops.io(), schema(), table().spec(), new BaseFileScanTask(
manifestListAsDataFile, null,
schemaString, specString, residuals));
} else {
return StaticDataTask.of(
ops.io().newInputFile(ops.current().metadataFileLocation()),
snap.allManifests(),
manifest -> ManifestsTable.manifestFileToRow(table().spec(), manifest));
MANIFEST_FILE_SCHEMA, schema(), snap.allManifests(),
manifest -> ManifestsTable.manifestFileToRow(table().spec(), manifest)
);
}
}));
}
}

static class ManifestListReadTask implements DataTask {
private final FileIO io;
private final Schema schema;
private final PartitionSpec spec;
private final FileScanTask manifestListTask;

ManifestListReadTask(FileIO io, PartitionSpec spec, FileScanTask manifestListTask) {
ManifestListReadTask(FileIO io, Schema schema, PartitionSpec spec, FileScanTask manifestListTask) {
this.io = io;
this.schema = schema;
this.spec = spec;
this.manifestListTask = manifestListTask;
}
@@ -175,9 +178,12 @@ public CloseableIterable<StructLike> rows() {
.reuseContainers(false)
.build()) {

return CloseableIterable.transform(manifests,
CloseableIterable<StructLike> rowIterable = CloseableIterable.transform(manifests,
manifest -> ManifestsTable.manifestFileToRow(spec, manifest));

StructProjection projection = StructProjection.create(MANIFEST_FILE_SCHEMA, schema);
return CloseableIterable.transform(rowIterable, projection::wrap);

} catch (IOException e) {
throw new RuntimeIOException(e, "Cannot read manifest list file: %s", manifestListTask.file().path());
}
6 changes: 1 addition & 5 deletions core/src/main/java/org/apache/iceberg/DataFilesTable.java
@@ -109,12 +109,8 @@ protected CloseableIterable<FileScanTask> planFiles(
Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);

// Data tasks produce the table schema, not the projection schema and projection is done by processing engines.
// This data task needs to use the table schema, which may not include a partition schema to avoid having an
// empty struct in the schema for unpartitioned tables. Some engines, like Spark, can't handle empty structs in
// all cases.
return CloseableIterable.transform(manifests, manifest ->
new ManifestReadTask(ops.io(), manifest, fileSchema, schemaString, specString, residuals));
new ManifestReadTask(ops.io(), manifest, schema(), schemaString, specString, residuals));
}
}

5 changes: 3 additions & 2 deletions core/src/main/java/org/apache/iceberg/HistoryTable.java
@@ -68,8 +68,9 @@ private DataTask task(TableScan scan) {
TableOperations ops = operations();
return StaticDataTask.of(
ops.io().newInputFile(ops.current().metadataFileLocation()),
ops.current().snapshotLog(),
convertHistoryEntryFunc(table()));
schema(), scan.schema(), ops.current().snapshotLog(),
convertHistoryEntryFunc(table())
);
}

private class HistoryScan extends StaticTableScan {
24 changes: 17 additions & 7 deletions core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
@@ -29,6 +29,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.StructProjection;

/**
* A {@link Table} implementation that exposes a table's manifest entries as rows, for both delete and data files.
@@ -107,44 +108,53 @@ protected CloseableIterable<FileScanTask> planFiles(
boolean ignoreResiduals, boolean caseSensitive, boolean colStats) {
// return entries from both data and delete manifests
CloseableIterable<ManifestFile> manifests = CloseableIterable.withNoopClose(snapshot.allManifests());
Type fileProjection = schema().findType("data_file");
Schema fileSchema = fileProjection != null ? new Schema(fileProjection.asStructType().fields()) : new Schema();
String schemaString = SchemaParser.toJson(schema());
String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned());
Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);

return CloseableIterable.transform(manifests, manifest ->
new ManifestReadTask(ops.io(), manifest, fileSchema, schemaString, specString, residuals,
new ManifestReadTask(ops.io(), manifest, schema(), schemaString, specString, residuals,
ops.current().specsById()));
}
}

static class ManifestReadTask extends BaseFileScanTask implements DataTask {
private final Schema schema;
private final Schema fileSchema;
private final FileIO io;
private final ManifestFile manifest;
private final Map<Integer, PartitionSpec> specsById;

ManifestReadTask(FileIO io, ManifestFile manifest, Schema fileSchema, String schemaString,
ManifestReadTask(FileIO io, ManifestFile manifest, Schema schema, String schemaString,
String specString, ResidualEvaluator residuals, Map<Integer, PartitionSpec> specsById) {
super(DataFiles.fromManifest(manifest), null, schemaString, specString, residuals);
this.fileSchema = fileSchema;
this.schema = schema;
this.io = io;
this.manifest = manifest;
this.specsById = specsById;

Type fileProjection = schema.findType("data_file");
this.fileSchema = fileProjection != null ? new Schema(fileProjection.asStructType().fields()) : new Schema();
}

@Override
public CloseableIterable<StructLike> rows() {
// Project data-file fields
CloseableIterable<StructLike> prunedRows;
if (manifest.content() == ManifestContent.DATA) {
return CloseableIterable.transform(ManifestFiles.read(manifest, io).project(fileSchema).entries(),
prunedRows = CloseableIterable.transform(ManifestFiles.read(manifest, io).project(fileSchema).entries(),
file -> (GenericManifestEntry<DataFile>) file);
} else {
return CloseableIterable.transform(ManifestFiles.readDeleteManifest(manifest, io, specsById)
prunedRows = CloseableIterable.transform(ManifestFiles.readDeleteManifest(manifest, io, specsById)
.project(fileSchema).entries(),
file -> (GenericManifestEntry<DeleteFile>) file);
}

// Project non-readable fields
Schema readSchema = ManifestEntry.wrapFileSchema(fileSchema.asStruct());
StructProjection projection = StructProjection.create(readSchema, schema);
return CloseableIterable.transform(prunedRows, projection::wrap);
}

@Override
5 changes: 3 additions & 2 deletions core/src/main/java/org/apache/iceberg/ManifestsTable.java
@@ -75,8 +75,9 @@ protected DataTask task(TableScan scan) {
String location = scan.snapshot().manifestListLocation();
return StaticDataTask.of(
ops.io().newInputFile(location != null ? location : ops.current().metadataFileLocation()),
scan.snapshot().allManifests(),
manifest -> ManifestsTable.manifestFileToRow(spec, manifest));
schema(), scan.schema(), scan.snapshot().allManifests(),
manifest -> ManifestsTable.manifestFileToRow(spec, manifest)
);
}

private class ManifestsTableScan extends StaticTableScan {
14 changes: 10 additions & 4 deletions core/src/main/java/org/apache/iceberg/PartitionsTable.java
@@ -76,11 +76,17 @@ private DataTask task(StaticTableScan scan) {
Iterable<Partition> partitions = partitions(scan);
if (table().spec().fields().size() < 1) {
// the table is unpartitioned, partitions contains only the root partition
return StaticDataTask.of(io().newInputFile(ops.current().metadataFileLocation()), partitions,
root -> StaticDataTask.Row.of(root.recordCount, root.fileCount));
return StaticDataTask.of(
io().newInputFile(ops.current().metadataFileLocation()),
schema(), scan.schema(), partitions,
root -> StaticDataTask.Row.of(root.recordCount, root.fileCount)
);
} else {
return StaticDataTask.of(io().newInputFile(ops.current().metadataFileLocation()), partitions,
PartitionsTable::convertPartition);
return StaticDataTask.of(
io().newInputFile(ops.current().metadataFileLocation()),
schema(), scan.schema(), partitions,
PartitionsTable::convertPartition
);
}
}

5 changes: 3 additions & 2 deletions core/src/main/java/org/apache/iceberg/SnapshotsTable.java
@@ -60,8 +60,9 @@ private DataTask task(BaseTableScan scan) {
TableOperations ops = operations();
return StaticDataTask.of(
ops.io().newInputFile(ops.current().metadataFileLocation()),
ops.current().snapshots(),
SnapshotsTable::snapshotToRow);
schema(), scan.schema(), ops.current().snapshots(),
SnapshotsTable::snapshotToRow
);
}

@Override
16 changes: 13 additions & 3 deletions core/src/main/java/org/apache/iceberg/StaticDataTask.java
@@ -30,18 +30,26 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.StructProjection;

class StaticDataTask implements DataTask {

static <T> DataTask of(InputFile metadata, Iterable<T> values, Function<T, Row> transform) {
static <T> DataTask of(InputFile metadata, Schema tableSchema, Schema projectedSchema, Iterable<T> values,
Function<T, Row> transform) {
return new StaticDataTask(metadata,
tableSchema,
projectedSchema,
Lists.newArrayList(Iterables.transform(values, transform::apply)).toArray(new Row[0]));
}

private final DataFile metadataFile;
private final StructLike[] rows;
private final Schema tableSchema;
private final Schema projectedSchema;

private StaticDataTask(InputFile metadata, StructLike[] rows) {
private StaticDataTask(InputFile metadata, Schema tableSchema, Schema projectedSchema, StructLike[] rows) {
this.tableSchema = tableSchema;
this.projectedSchema = projectedSchema;
this.metadataFile = DataFiles.builder(PartitionSpec.unpartitioned())
.withInputFile(metadata)
.withRecordCount(rows.length)
@@ -57,7 +65,9 @@ public List<DeleteFile> deletes() {

@Override
public CloseableIterable<StructLike> rows() {
return CloseableIterable.withNoopClose(Arrays.asList(rows));
StructProjection projection = StructProjection.create(tableSchema, projectedSchema);
Iterable<StructLike> projectedRows = Iterables.transform(Arrays.asList(rows), projection::wrap);
return CloseableIterable.withNoopClose(projectedRows);
}

@Override
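Note: StaticDataTask now carries both the table schema and the projected schema and applies the projection itself in rows(), so engines receive rows that already match the scan schema. A self-contained sketch of the same wrap-per-row pattern; GenericRecord from iceberg-data is used purely for illustration and is not part of this PR:

import java.util.List;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructProjection;

public class StaticRowProjectionSketch {
  public static void main(String[] args) {
    // Hypothetical metadata-table schema and a narrower scan projection.
    Schema tableSchema = new Schema(
        Types.NestedField.required(1, "snapshot_id", Types.LongType.get()),
        Types.NestedField.required(2, "operation", Types.StringType.get()));
    Schema projectedSchema = tableSchema.select("operation");

    GenericRecord row = GenericRecord.create(tableSchema);
    row.setField("snapshot_id", 42L);
    row.setField("operation", "append");
    List<StructLike> rows = ImmutableList.of(row);

    // The wrap-per-row pattern rows() now uses: each stored row is viewed through the
    // projected schema instead of being re-projected by the processing engine.
    StructProjection projection = StructProjection.create(tableSchema, projectedSchema);
    Iterable<StructLike> projectedRows = Iterables.transform(rows, projection::wrap);

    for (StructLike projected : projectedRows) {
      System.out.println(projected.get(0, String.class));  // prints "append"
    }
  }
}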
org/apache/iceberg/spark/source/RowDataReader.java
@@ -19,7 +19,6 @@

package org.apache.iceberg.spark.source;

import java.util.List;
import java.util.Map;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.DataFile;
@@ -31,7 +30,6 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.common.DynMethods;
import org.apache.iceberg.data.DeleteFilter;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
@@ -40,28 +38,17 @@
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.data.SparkAvroReader;
import org.apache.iceberg.spark.data.SparkOrcReader;
import org.apache.iceberg.spark.data.SparkParquetReaders;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.PartitionUtil;
import org.apache.spark.rdd.InputFileBlockHolder;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.catalyst.expressions.AttributeReference;
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
import org.apache.spark.sql.types.StructType;
import scala.collection.JavaConverters;

class RowDataReader extends BaseDataReader<InternalRow> {
// for some reason, the apply method can't be called from Java without reflection
private static final DynMethods.UnboundMethod APPLY_PROJECTION = DynMethods.builder("apply")
.impl(UnsafeProjection.class, InternalRow.class)
.build();

private final Schema tableSchema;
private final Schema expectedSchema;
@@ -186,33 +173,10 @@ private CloseableIterable<InternalRow> newOrcIterable(
}

private CloseableIterable<InternalRow> newDataIterable(DataTask task, Schema readSchema) {
StructInternalRow row = new StructInternalRow(tableSchema.asStruct());
StructInternalRow row = new StructInternalRow(readSchema.asStruct());
CloseableIterable<InternalRow> asSparkRows = CloseableIterable.transform(
task.asDataTask().rows(), row::setStruct);
return CloseableIterable.transform(
asSparkRows, APPLY_PROJECTION.bind(projection(readSchema, tableSchema))::invoke);
}

private static UnsafeProjection projection(Schema finalSchema, Schema readSchema) {
StructType struct = SparkSchemaUtil.convert(readSchema);

List<AttributeReference> refs = JavaConverters.seqAsJavaListConverter(struct.toAttributes()).asJava();
List<Attribute> attrs = Lists.newArrayListWithExpectedSize(struct.fields().length);
List<org.apache.spark.sql.catalyst.expressions.Expression> exprs =
Lists.newArrayListWithExpectedSize(struct.fields().length);

for (AttributeReference ref : refs) {
attrs.add(ref.toAttribute());
}

for (Types.NestedField field : finalSchema.columns()) {
int indexInReadSchema = struct.fieldIndex(field.name());
exprs.add(refs.get(indexInReadSchema));
}

return UnsafeProjection.create(
JavaConverters.asScalaBufferConverter(exprs).asScala().toSeq(),
JavaConverters.asScalaBufferConverter(attrs).asScala().toSeq());
return asSparkRows;
}

protected class SparkDeleteFilter extends DeleteFilter<InternalRow> {