@@ -83,7 +83,7 @@ private StructProjection(StructType structType, StructType projection) {
break;
case MAP:
case LIST:
- throw new IllegalArgumentException(String.format("Cannot project list or map field: %s", projectedField));
+ // TODO Figure this out
Contributor:
What about allowing the projection if the fields are primitives or if the entire struct is projected? That would cover the cases that are currently supported and avoid introducing a new pruning bug to replace the one you're fixing (where nested structs don't match the requested struct schema).

default:
nestedProjections[pos] = null;
}
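A minimal sketch of the check the reviewer describes: reject partial projections of lists and maps, but let a collection field through when it is requested in its entirety. The helper name is hypothetical, and Preconditions is Iceberg's relocated Guava class:

```java
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.types.Types;

class CollectionProjections {
  // Hypothetical check: a list or map field passes through only when the
  // projection requests the entire field, so no nested pruning is needed.
  static void validateCollectionProjection(Types.NestedField field, Types.NestedField projectedField) {
    Preconditions.checkArgument(projectedField.type().equals(field.type()),
        "Cannot project a partial list or map field: %s", projectedField);
  }
}
```

Under that rule the MAP and LIST cases could fall through to `nestedProjections[pos] = null`, like the default branch above, instead of leaving a TODO.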
4 changes: 1 addition & 3 deletions core/src/main/java/org/apache/iceberg/AllEntriesTable.java
@@ -102,15 +102,13 @@ protected CloseableIterable<FileScanTask> planFiles(
TableOperations ops, Snapshot snapshot, Expression rowFilter,
boolean ignoreResiduals, boolean caseSensitive, boolean colStats) {
CloseableIterable<ManifestFile> manifests = allManifestFiles(ops.current().snapshots());
- Type fileProjection = schema().findType("data_file");
- Schema fileSchema = fileProjection != null ? new Schema(fileProjection.asStructType().fields()) : new Schema();
String schemaString = SchemaParser.toJson(schema());
String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned());
Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);

return CloseableIterable.transform(manifests, manifest -> new ManifestEntriesTable.ManifestReadTask(
- ops.io(), manifest, fileSchema, schemaString, specString, residuals, ops.current().specsById()));
+ ops.io(), manifest, schema(), schemaString, specString, residuals, ops.current().specsById()));
}
}

17 changes: 12 additions & 5 deletions core/src/main/java/org/apache/iceberg/AllManifestsTable.java
@@ -31,6 +31,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.types.Types;
+ import org.apache.iceberg.util.StructProjection;

/**
* A {@link Table} implementation that exposes a table's valid manifest files as rows.
@@ -124,7 +125,6 @@ protected CloseableIterable<FileScanTask> planFiles(
String schemaString = SchemaParser.toJson(schema());
String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned());

- // Data tasks produce the table schema, not the projection schema and projection is done by processing engines.
return CloseableIterable.withNoopClose(Iterables.transform(ops.current().snapshots(), snap -> {
if (snap.manifestListLocation() != null) {
Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
@@ -134,26 +134,30 @@ protected CloseableIterable<FileScanTask> planFiles(
.withRecordCount(1)
.withFormat(FileFormat.AVRO)
.build();
- return new ManifestListReadTask(ops.io(), table().spec(), new BaseFileScanTask(
+ return new ManifestListReadTask(ops.io(), schema(), table().spec(), new BaseFileScanTask(
manifestListAsDataFile, null,
schemaString, specString, residuals));
} else {
return StaticDataTask.of(
ops.io().newInputFile(ops.current().metadataFileLocation()),
snap.allManifests(),
- manifest -> ManifestsTable.manifestFileToRow(table().spec(), manifest));
+ manifest -> ManifestsTable.manifestFileToRow(table().spec(), manifest),
+ MANIFEST_FILE_SCHEMA,
+ schema());
}
}));
}
}

static class ManifestListReadTask implements DataTask {
private final FileIO io;
+ private final Schema schema;
private final PartitionSpec spec;
private final FileScanTask manifestListTask;

- ManifestListReadTask(FileIO io, PartitionSpec spec, FileScanTask manifestListTask) {
+ ManifestListReadTask(FileIO io, Schema schema, PartitionSpec spec, FileScanTask manifestListTask) {
this.io = io;
+ this.schema = schema;
this.spec = spec;
this.manifestListTask = manifestListTask;
}
@@ -175,9 +179,12 @@ public CloseableIterable<StructLike> rows() {
.reuseContainers(false)
.build()) {

- return CloseableIterable.transform(manifests,
+ CloseableIterable<StructLike> rowIterable = CloseableIterable.transform(manifests,
manifest -> ManifestsTable.manifestFileToRow(spec, manifest));

+ StructProjection projection = StructProjection.create(MANIFEST_FILE_SCHEMA, schema);
+ return CloseableIterable.transform(rowIterable, projection::wrap);

} catch (IOException e) {
throw new RuntimeIOException(e, "Cannot read manifest list file: %s", manifestListTask.file().path());
}
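The shape of this change recurs in every file in this PR: materialize rows in the task's full schema, then lazily wrap each one in a StructProjection. A standalone sketch of the pattern, with illustrative names:

```java
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.util.StructProjection;

class ProjectionPattern {
  // Narrows each full-schema row to the projected columns. Note that wrap()
  // reuses the single StructProjection instance, so each row must be consumed
  // before the iterator advances, as with other reused containers in Iceberg.
  static CloseableIterable<StructLike> project(
      CloseableIterable<StructLike> rows, Schema fullSchema, Schema projectedSchema) {
    StructProjection projection = StructProjection.create(fullSchema, projectedSchema);
    return CloseableIterable.transform(rows, projection::wrap);
  }
}
```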
6 changes: 1 addition & 5 deletions core/src/main/java/org/apache/iceberg/DataFilesTable.java
@@ -109,12 +109,8 @@ protected CloseableIterable<FileScanTask> planFiles(
Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);

- // Data tasks produce the table schema, not the projection schema and projection is done by processing engines.
- // This data task needs to use the table schema, which may not include a partition schema to avoid having an
- // empty struct in the schema for unpartitioned tables. Some engines, like Spark, can't handle empty structs in
- // all cases.
return CloseableIterable.transform(manifests, manifest ->
- new ManifestReadTask(ops.io(), manifest, fileSchema, schemaString, specString, residuals));
+ new ManifestReadTask(ops.io(), manifest, schema(), schemaString, specString, residuals));
}
}

4 changes: 3 additions & 1 deletion core/src/main/java/org/apache/iceberg/HistoryTable.java
@@ -69,7 +69,9 @@ private DataTask task(TableScan scan) {
return StaticDataTask.of(
ops.io().newInputFile(ops.current().metadataFileLocation()),
ops.current().snapshotLog(),
- convertHistoryEntryFunc(table()));
+ convertHistoryEntryFunc(table()),
+ schema(),
+ scan.schema());
}

private class HistoryScan extends StaticTableScan {
25 changes: 18 additions & 7 deletions core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
@@ -29,6 +29,8 @@
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
+ import org.apache.iceberg.types.Types.StructType;
+ import org.apache.iceberg.util.StructProjection;

/**
* A {@link Table} implementation that exposes a table's manifest entries as rows, for both delete and data files.
@@ -107,44 +109,53 @@ protected CloseableIterable<FileScanTask> planFiles(
boolean ignoreResiduals, boolean caseSensitive, boolean colStats) {
// return entries from both data and delete manifests
CloseableIterable<ManifestFile> manifests = CloseableIterable.withNoopClose(snapshot.allManifests());
- Type fileProjection = schema().findType("data_file");
- Schema fileSchema = fileProjection != null ? new Schema(fileProjection.asStructType().fields()) : new Schema();
String schemaString = SchemaParser.toJson(schema());
String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned());
Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);

return CloseableIterable.transform(manifests, manifest ->
- new ManifestReadTask(ops.io(), manifest, fileSchema, schemaString, specString, residuals,
+ new ManifestReadTask(ops.io(), manifest, schema(), schemaString, specString, residuals,
ops.current().specsById()));
}
}

static class ManifestReadTask extends BaseFileScanTask implements DataTask {
+ private final Schema schema;
private final Schema fileSchema;
private final FileIO io;
private final ManifestFile manifest;
private final Map<Integer, PartitionSpec> specsById;

- ManifestReadTask(FileIO io, ManifestFile manifest, Schema fileSchema, String schemaString,
+ ManifestReadTask(FileIO io, ManifestFile manifest, Schema schema, String schemaString,
String specString, ResidualEvaluator residuals, Map<Integer, PartitionSpec> specsById) {
super(DataFiles.fromManifest(manifest), null, schemaString, specString, residuals);
- this.fileSchema = fileSchema;
+ this.schema = schema;
this.io = io;
this.manifest = manifest;
this.specsById = specsById;

+ Type fileProjection = schema.findType("data_file");
+ this.fileSchema = fileProjection != null ? new Schema(fileProjection.asStructType().fields()) : new Schema();
}

@Override
public CloseableIterable<StructLike> rows() {
+ // Project data-file fields
+ CloseableIterable<StructLike> prunedRows;
if (manifest.content() == ManifestContent.DATA) {
- return CloseableIterable.transform(ManifestFiles.read(manifest, io).project(fileSchema).entries(),
+ prunedRows = CloseableIterable.transform(ManifestFiles.read(manifest, io).project(fileSchema).entries(),
file -> (GenericManifestEntry<DataFile>) file);
} else {
- return CloseableIterable.transform(ManifestFiles.readDeleteManifest(manifest, io, specsById)
+ prunedRows = CloseableIterable.transform(ManifestFiles.readDeleteManifest(manifest, io, specsById)
.project(fileSchema).entries(),
file -> (GenericManifestEntry<DeleteFile>) file);
}

+ // Project non-readable fields
+ Schema readSchema = ManifestEntry.wrapFileSchema(fileSchema.asStruct());
+ StructProjection projection = StructProjection.create(readSchema, schema);
+ return CloseableIterable.transform(prunedRows, projection::wrap);
}

@Override
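The rows() method above projects in two stages: columns under data_file are pruned by the manifest reader itself, and the remaining entry-level fields are handled by a StructProjection built over ManifestEntry.wrapFileSchema. The constructor's schema derivation, isolated as a sketch (it mirrors the diff; the helper name is made up):

```java
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Type;

class FileSchemas {
  // Extracts the data_file sub-schema from the projected entries schema; an
  // empty Schema means the projection did not request data_file at all.
  static Schema fileSchema(Schema projectedEntriesSchema) {
    Type fileProjection = projectedEntriesSchema.findType("data_file");
    return fileProjection != null
        ? new Schema(fileProjection.asStructType().fields())
        : new Schema();
  }
}
```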
4 changes: 3 additions & 1 deletion core/src/main/java/org/apache/iceberg/ManifestsTable.java
@@ -76,7 +76,9 @@ protected DataTask task(TableScan scan) {
return StaticDataTask.of(
ops.io().newInputFile(location != null ? location : ops.current().metadataFileLocation()),
scan.snapshot().allManifests(),
- manifest -> ManifestsTable.manifestFileToRow(spec, manifest));
+ manifest -> ManifestsTable.manifestFileToRow(spec, manifest),
+ schema(),
+ scan.schema());
}

private class ManifestsTableScan extends StaticTableScan {
16 changes: 12 additions & 4 deletions core/src/main/java/org/apache/iceberg/PartitionsTable.java
@@ -76,11 +76,19 @@ private DataTask task(StaticTableScan scan) {
Iterable<Partition> partitions = partitions(scan);
if (table().spec().fields().size() < 1) {
// the table is unpartitioned, partitions contains only the root partition
- return StaticDataTask.of(io().newInputFile(ops.current().metadataFileLocation()), partitions,
- root -> StaticDataTask.Row.of(root.recordCount, root.fileCount));
+ return StaticDataTask.of(
+ io().newInputFile(ops.current().metadataFileLocation()),
+ partitions,
+ root -> StaticDataTask.Row.of(root.recordCount, root.fileCount),
+ schema(),
+ scan.schema());
} else {
- return StaticDataTask.of(io().newInputFile(ops.current().metadataFileLocation()), partitions,
- PartitionsTable::convertPartition);
+ return StaticDataTask.of(
+ io().newInputFile(ops.current().metadataFileLocation()),
+ partitions,
+ PartitionsTable::convertPartition,
+ schema(),
+ scan.schema());
}
}

4 changes: 3 additions & 1 deletion core/src/main/java/org/apache/iceberg/SnapshotsTable.java
@@ -61,7 +61,9 @@ private DataTask task(BaseTableScan scan) {
return StaticDataTask.of(
ops.io().newInputFile(ops.current().metadataFileLocation()),
ops.current().snapshots(),
- SnapshotsTable::snapshotToRow);
+ SnapshotsTable::snapshotToRow,
+ schema(),
+ scan.schema());
}

@Override
18 changes: 14 additions & 4 deletions core/src/main/java/org/apache/iceberg/StaticDataTask.java
@@ -30,18 +30,26 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+ import org.apache.iceberg.util.StructProjection;

class StaticDataTask implements DataTask {

- static <T> DataTask of(InputFile metadata, Iterable<T> values, Function<T, Row> transform) {
+ static <T> DataTask of(InputFile metadata, Iterable<T> values, Function<T, Row> transform,
+ Schema original, Schema projected) {
Contributor:
Nit: I'm not sure if it's just me, but I'd normally place lambda function arguments at the end of the list. Since this is internal, we can move these just after InputFile.

Also, is original always the table schema? If so, maybe we should use tableSchema instead?
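A signature-only sketch of that suggestion (hypothetical, and assuming the method stays in package org.apache.iceberg where DataTask, Schema, and StaticDataTask.Row are visible):

```java
import java.util.function.Function;
import org.apache.iceberg.io.InputFile;

interface StaticDataTasks {
  // Hypothetical reordering: schemas directly after the InputFile, the lambda
  // last, and tableSchema in place of "original".
  <T> DataTask of(InputFile metadata, Schema tableSchema, Schema projectedSchema,
      Iterable<T> values, Function<T, StaticDataTask.Row> transform);
}
```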

return new StaticDataTask(metadata,
- Lists.newArrayList(Iterables.transform(values, transform::apply)).toArray(new Row[0]));
+ Lists.newArrayList(Iterables.transform(values, transform::apply)).toArray(new Row[0]),
+ original,
+ projected);
}

private final DataFile metadataFile;
private final StructLike[] rows;
+ private final Schema original;
+ private final Schema projectedSchema;

- private StaticDataTask(InputFile metadata, StructLike[] rows) {
+ private StaticDataTask(InputFile metadata, StructLike[] rows, Schema original, Schema projectedSchema) {
+ this.original = original;
+ this.projectedSchema = projectedSchema;
this.metadataFile = DataFiles.builder(PartitionSpec.unpartitioned())
.withInputFile(metadata)
.withRecordCount(rows.length)
@@ -57,7 +65,9 @@ public List<DeleteFile> deletes() {

@Override
public CloseableIterable<StructLike> rows() {
- return CloseableIterable.withNoopClose(Arrays.asList(rows));
+ StructProjection projection = StructProjection.create(original, projectedSchema);
+ Iterable<StructLike> projectedRows = Iterables.transform(Arrays.asList(rows), projection::wrap);
+ return CloseableIterable.withNoopClose(projectedRows);
}

@Override
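For illustration, a hypothetical caller of the new of(...) overload that stores two columns but projects down to one. The schemas, values, and class are made up, and the code only compiles inside package org.apache.iceberg since StaticDataTask is package-private:

```java
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.types.Types;

// Lives in package org.apache.iceberg, where StaticDataTask and Row are visible.
class StaticDataTaskExample {
  static DataTask lengthsTask(InputFile metadataFile) {
    Schema full = new Schema(
        Types.NestedField.required(1, "name", Types.StringType.get()),
        Types.NestedField.required(2, "length", Types.LongType.get()));

    return StaticDataTask.of(
        metadataFile,
        ImmutableList.of("a", "bcd"),                      // source values
        s -> StaticDataTask.Row.of(s, (long) s.length()),  // value -> row
        full,                                              // full task schema
        full.select("length"));                            // scan projection
  }
}
```

Each row is stored in the full shape; rows() narrows it to the single length column only on iteration.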
@@ -186,28 +186,27 @@ private CloseableIterable<InternalRow> newOrcIterable(
}

private CloseableIterable<InternalRow> newDataIterable(DataTask task, Schema readSchema) {
- StructInternalRow row = new StructInternalRow(tableSchema.asStruct());
+ StructInternalRow row = new StructInternalRow(readSchema.asStruct());
CloseableIterable<InternalRow> asSparkRows = CloseableIterable.transform(
task.asDataTask().rows(), row::setStruct);
- return CloseableIterable.transform(
- asSparkRows, APPLY_PROJECTION.bind(projection(readSchema, tableSchema))::invoke);
+ return asSparkRows;
}

private static UnsafeProjection projection(Schema finalSchema, Schema readSchema) {
- StructType struct = SparkSchemaUtil.convert(readSchema);
+ StructType readStruct = SparkSchemaUtil.convert(readSchema);
Member (author):
Renamed these variables because too many things were called "struct" or "ref" and I was getting confused which was which.

Contributor:
Are these changes still needed?


- List<AttributeReference> refs = JavaConverters.seqAsJavaListConverter(struct.toAttributes()).asJava();
- List<Attribute> attrs = Lists.newArrayListWithExpectedSize(struct.fields().length);
+ List<AttributeReference> readReferences = JavaConverters.seqAsJavaListConverter(readStruct.toAttributes()).asJava();
+ List<Attribute> attrs = Lists.newArrayListWithExpectedSize(readStruct.fields().length);
List<org.apache.spark.sql.catalyst.expressions.Expression> exprs =
- Lists.newArrayListWithExpectedSize(struct.fields().length);
+ Lists.newArrayListWithExpectedSize(readStruct.fields().length);

- for (AttributeReference ref : refs) {
+ for (AttributeReference ref : readReferences) {
attrs.add(ref.toAttribute());
}

for (Types.NestedField field : finalSchema.columns()) {
- int indexInReadSchema = struct.fieldIndex(field.name());
- exprs.add(refs.get(indexInReadSchema));
+ int indexInReadSchema = readStruct.fieldIndex(field.name());
+ exprs.add(readReferences.get(indexInReadSchema));
}

return UnsafeProjection.create(
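Since data tasks now emit rows directly in readSchema, the Spark-side projection built here only matters when the requested field order differs from the read order. A dependency-free toy of the index mapping that projection() encodes as Spark expressions (all names are illustrative):

```java
import java.util.Arrays;
import java.util.List;

class FieldReorder {
  // For each field of the final schema, finds its position in the read schema,
  // the same lookup projection() performs via readStruct.fieldIndex(...).
  static int[] mapping(List<String> finalFields, List<String> readFields) {
    int[] indexes = new int[finalFields.size()];
    for (int i = 0; i < finalFields.size(); i++) {
      indexes[i] = readFields.indexOf(finalFields.get(i));
    }
    return indexes;
  }

  public static void main(String[] args) {
    // Requested order (b, a) over read order (a, b) yields [1, 0].
    System.out.println(Arrays.toString(
        mapping(Arrays.asList("b", "a"), Arrays.asList("a", "b"))));
  }
}
```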