Merged
11 changes: 10 additions & 1 deletion spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -39,6 +39,7 @@
 import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.hadoop.Util;
 import org.apache.iceberg.io.CloseableIterable;
@@ -144,14 +145,22 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
   private Schema lazySchema() {
     if (schema == null) {
       if (requestedSchema != null) {
-        this.schema = SparkSchemaUtil.prune(table.schema(), requestedSchema);
+        // the projection should include all columns that will be returned, including those only used in filters
+        this.schema = SparkSchemaUtil.prune(table.schema(), requestedSchema, filterExpression(), caseSensitive);
       } else {
         this.schema = table.schema();
       }
     }
     return schema;
   }
 
+  private Expression filterExpression() {
+    if (filterExpressions != null) {
+      return filterExpressions.stream().reduce(Expressions.alwaysTrue(), Expressions::and);
+    }
+    return Expressions.alwaysTrue();
+  }
+
   private StructType lazyType() {
     if (type == null) {
       this.type = SparkSchemaUtil.convert(lazySchema());
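
For context, a minimal sketch of the pruning behavior this change relies on (this is not the PR's code; the table schema, column names, and filter values below are hypothetical): the pushed-down filters are folded into a single Expression with Expressions::and, and passing that combined filter to SparkSchemaUtil.prune is expected to keep columns referenced only by filters in the projected schema.

import java.util.Arrays;
import java.util.List;

import org.apache.iceberg.Schema;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.types.StructType;

public class PruneWithFilterSketch {
  public static void main(String[] args) {
    // hypothetical table schema
    Schema tableSchema = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.optional(2, "data", Types.StringType.get()),
        Types.NestedField.optional(3, "ts", Types.LongType.get()));

    // Spark requested only "data"
    StructType requestedSchema = SparkSchemaUtil.convert(
        new Schema(Types.NestedField.optional(2, "data", Types.StringType.get())));

    // pushed-down filters folded into one expression, mirroring filterExpression() above
    List<Expression> filterExpressions = Arrays.asList(
        Expressions.greaterThan("ts", 0L),
        Expressions.notNull("id"));
    Expression filter = filterExpressions.stream().reduce(Expressions.alwaysTrue(), Expressions::and);

    // the pruned projection should now contain "data" plus the filter-only columns "ts" and "id"
    Schema projected = SparkSchemaUtil.prune(tableSchema, requestedSchema, filter, true);
    System.out.println(projected);
  }
}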
@@ -95,51 +95,33 @@ Iterator<InternalRow> open(FileScanTask task) {
     InputFileBlockHolder.set(file.path().toString(), task.start(), task.length());
 
     // schema or rows returned by readers
-    Schema finalSchema = expectedSchema;
     PartitionSpec spec = task.spec();
     Set<Integer> idColumns = spec.identitySourceIds();
+    Schema partitionSchema = TypeUtil.select(expectedSchema, idColumns);
+    boolean projectsIdentityPartitionColumns = !partitionSchema.columns().isEmpty();
 
-    // schema needed for the projection and filtering
-    StructType sparkType = SparkSchemaUtil.convert(finalSchema);
-    Schema requiredSchema = SparkSchemaUtil.prune(tableSchema, sparkType, task.residual(), caseSensitive);
-    boolean hasJoinedPartitionColumns = !idColumns.isEmpty();
-    boolean hasExtraFilterColumns = requiredSchema.columns().size() != finalSchema.columns().size();
-
-    Schema iterSchema;
-    Iterator<InternalRow> iter;
-
-    if (hasJoinedPartitionColumns) {
+    if (projectsIdentityPartitionColumns) {
       if (SUPPORTS_CONSTANTS.contains(file.format())) {
-        iterSchema = requiredSchema;
-        iter = open(task, requiredSchema, PartitionUtil.constantsMap(task, RowDataReader::convertConstant));
-      } else {
-        // schema used to read data files
-        Schema readSchema = TypeUtil.selectNot(requiredSchema, idColumns);
-        Schema partitionSchema = TypeUtil.select(requiredSchema, idColumns);
-        PartitionRowConverter convertToRow = new PartitionRowConverter(partitionSchema, spec);
-        JoinedRow joined = new JoinedRow();
+        return open(task, expectedSchema, PartitionUtil.constantsMap(task, RowDataReader::convertConstant));
+      }
 
-        InternalRow partition = convertToRow.apply(file.partition());
-        joined.withRight(partition);
+      // schema used to read data files
+      Schema readSchema = TypeUtil.selectNot(expectedSchema, idColumns);
+      PartitionRowConverter convertToRow = new PartitionRowConverter(partitionSchema, spec);
+      JoinedRow joined = new JoinedRow();
 
-        // create joined rows and project from the joined schema to the final schema
-        iterSchema = TypeUtil.join(readSchema, partitionSchema);
-        iter = Iterators.transform(open(task, readSchema, ImmutableMap.of()), joined::withLeft);
-      }
-    } else if (hasExtraFilterColumns) {
-      // add projection to the final schema
-      iterSchema = requiredSchema;
-      iter = open(task, requiredSchema, ImmutableMap.of());
-    } else {
-      // return the base iterator
-      iterSchema = finalSchema;
-      iter = open(task, finalSchema, ImmutableMap.of());
+      // create joined rows and project from the joined schema to the final schema
+      Schema joinedSchema = TypeUtil.join(readSchema, partitionSchema);
+      InternalRow partition = convertToRow.apply(file.partition());
+      joined.withRight(partition);
+
+      return Iterators.transform(
+          Iterators.transform(open(task, readSchema, ImmutableMap.of()), joined::withLeft),
+          APPLY_PROJECTION.bind(projection(expectedSchema, joinedSchema))::invoke);
     }
 
-    // TODO: remove the projection by reporting the iterator's schema back to Spark
-    return Iterators.transform(
-        iter,
-        APPLY_PROJECTION.bind(projection(finalSchema, iterSchema))::invoke);
+    // return the base iterator
+    return open(task, expectedSchema, ImmutableMap.of());
   }
 
   private Iterator<InternalRow> open(FileScanTask task, Schema readSchema, Map<Integer, ?> idToConstant) {
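
Similarly, a minimal sketch of how the reader splits the expected schema for the joined-row path above (the field ids, column names, and the partition source id are hypothetical): identity partition columns are selected out with TypeUtil.select, the remaining columns are read from data files with TypeUtil.selectNot, and the two schemas are joined back together before projecting to the final schema.

import java.util.Collections;
import java.util.Set;

import org.apache.iceberg.Schema;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;

public class SchemaSplitSketch {
  public static void main(String[] args) {
    // hypothetical expected schema; "dt" is identity-partitioned (source id 3)
    Schema expected = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.optional(2, "data", Types.StringType.get()),
        Types.NestedField.optional(3, "dt", Types.StringType.get()));
    Set<Integer> idColumns = Collections.singleton(3);

    Schema partitionSchema = TypeUtil.select(expected, idColumns);     // only "dt"
    Schema readSchema = TypeUtil.selectNot(expected, idColumns);       // "id", "data"
    Schema joinedSchema = TypeUtil.join(readSchema, partitionSchema);  // "id", "data", "dt"

    System.out.println(joinedSchema.asStruct());
  }
}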