136 changes: 76 additions & 60 deletions core/src/main/java/org/apache/iceberg/BaseFilesTable.java
@@ -21,8 +21,6 @@
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.ManifestEvaluator;
@@ -35,6 +33,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.types.Types.StructType;

/** Base class logic for files metadata tables */
Expand All @@ -46,15 +45,37 @@ abstract class BaseFilesTable extends BaseMetadataTable {

@Override
public Schema schema() {
StructType partitionType = Partitioning.partitionType(table());
// avoid returning an empty struct, which is not always supported.
Reviewer comment (Contributor): I am a bit lost with all the schemas here. Let me see if we can simplify this block.

// instead, drop the partition field
boolean dropPartitionColumnForUnpartitioned = true;
return schemaInternal(table(), dropPartitionColumnForUnpartitioned);
}

private static Schema schemaInternal(Table table, boolean dropPartitionColumnForUnpartitioned) {
StructType partitionType = Partitioning.partitionType(table);
Schema schema = new Schema(DataFile.getType(partitionType).fields());
if (partitionType.fields().size() < 1) {
// avoid returning an empty struct, which is not always supported.
// instead, drop the partition field
if (dropPartitionColumnForUnpartitioned && partitionType.fields().isEmpty()) {
schema = TypeUtil.selectNot(schema, Sets.newHashSet(DataFile.PARTITION_ID));
}

return TypeUtil.join(schema, MetricsUtil.readableMetricsSchema(table().schema(), schema));
return withDerivedColumns(table.schema(), schema);
}

private static Schema withDerivedColumns(Schema baseTableSchema, Schema metaTableSchema) {
Schema metadataTableSchema =
TypeUtil.join(metaTableSchema, dataSequenceNumberSchema(metaTableSchema));

return TypeUtil.join(
metadataTableSchema,
MetricsUtil.readableMetricsSchema(baseTableSchema, metadataTableSchema));
}

private static Schema dataSequenceNumberSchema(Schema schema) {
return new Schema(
NestedField.optional(
schema.highestFieldId() + 1,
MetadataTableUtils.DATA_SEQUENCE_NUMBER,
Types.LongType.get()));
}

private static CloseableIterable<FileScanTask> planFiles(
@@ -85,11 +106,20 @@ private static CloseableIterable<FileScanTask> planFiles(
Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);

// the metadata schema represents the files schema, regardless of whether the table is partitioned
Schema metadataSchema = schemaInternal(table, false);

return CloseableIterable.transform(
filteredManifests,
manifest ->
new ManifestReadTask(
table, manifest, projectedSchema, schemaString, specString, residuals));
table,
manifest,
metadataSchema,
projectedSchema,
schemaString,
specString,
residuals));
}

abstract static class BaseFilesTableScan extends BaseMetadataTableScan {
@@ -138,16 +168,19 @@ static class ManifestReadTask extends BaseFileScanTask implements DataTask {
private final Map<Integer, PartitionSpec> specsById;
private final ManifestFile manifest;
private final Schema dataTableSchema;
private final Schema metadataSchema;
private final Schema projection;

ManifestReadTask(
Table table,
ManifestFile manifest,
Schema metadataSchema,
Schema projection,
String schemaString,
String specString,
ResidualEvaluator residuals) {
super(DataFiles.fromManifest(manifest), null, schemaString, specString, residuals);
this.metadataSchema = metadataSchema;
this.io = table.io();
this.specsById = Maps.newHashMap(table.specs());
this.manifest = manifest;
@@ -157,47 +190,52 @@ static class ManifestReadTask extends BaseFileScanTask implements DataTask {

@Override
public CloseableIterable<StructLike> rows() {
Types.NestedField readableMetricsField = projection.findField(MetricsUtil.READABLE_METRICS);

if (readableMetricsField == null) {
return CloseableIterable.transform(files(projection), file -> (StructLike) file);
if (projectionWithComputedColumns(projection)) {
return CloseableIterable.transform(files(), this::withComputedColumns);
} else {

Schema actualProjection = projectionForReadableMetrics(projection, readableMetricsField);
return CloseableIterable.transform(
files(actualProjection), f -> withReadableMetrics(f, readableMetricsField));
return CloseableIterable.transform(files(projection), file -> (StructLike) file);
}
}

private boolean projectionWithComputedColumns(Schema requestedProjection) {
Types.NestedField readableMetricsField =
requestedProjection.findField(MetricsUtil.READABLE_METRICS);
Types.NestedField dataSequenceNumberField =
requestedProjection.findField(MetadataTableUtils.DATA_SEQUENCE_NUMBER);
return readableMetricsField != null || dataSequenceNumberField != null;
}

private CloseableIterable<? extends ContentFile<?>> files(Schema fileProjection) {
switch (manifest.content()) {
case DATA:
return ManifestFiles.read(manifest, io, specsById).project(fileProjection);
case DELETES:
return ManifestFiles.readDeleteManifest(manifest, io, specsById).project(fileProjection);
default:
throw new IllegalArgumentException(
"Unsupported manifest content type:" + manifest.content());
}
return ManifestFiles.open(manifest, io, specsById).project(fileProjection);
}

private CloseableIterable<? extends ContentFile<?>> files() {
return ManifestFiles.open(manifest, io, specsById);
}

/**
* Given content file metadata, append a 'readable_metrics' column that return the file's
* metrics in human-readable form.
* Given content file metadata, append computed columns:
*
* <ul>
* <li>readable_metrics: file's metrics in human-readable form
* <li>data_sequence_number: data sequence number assigned to file on commit
* </ul>
*
* @param file content file metadata
* @param readableMetricsField projected "readable_metrics" field
* @return struct representing content file, with appended readable_metrics field
* @return result content with appended computed columns
*/
private StructLike withReadableMetrics(
ContentFile<?> file, Types.NestedField readableMetricsField) {
int structSize = projection.columns().size();
MetricsUtil.ReadableMetricsStruct readableMetrics =
readableMetrics(file, readableMetricsField);
int metricsPosition = projection.columns().indexOf(readableMetricsField);

return new MetricsUtil.StructWithReadableMetrics(
(StructLike) file, structSize, readableMetrics, metricsPosition);
private StructLike withComputedColumns(ContentFile<?> file) {
Types.NestedField readableMetricsField = projection.findField(MetricsUtil.READABLE_METRICS);
StructLike readableMetricsStruct =
readableMetricsField == null
? EmptyStructLike.get()
: readableMetrics(file, readableMetricsField);
return new MetadataTableUtils.StructWithComputedColumns(
metadataSchema,
projection,
(StructLike) file,
file.dataSequenceNumber(),
readableMetricsStruct);
}

private MetricsUtil.ReadableMetricsStruct readableMetrics(
@@ -206,28 +244,6 @@ private MetricsUtil.ReadableMetricsStruct readableMetrics(
return MetricsUtil.readableMetricsStruct(dataTableSchema, file, projectedMetricType);
}

/**
* Create a projection on content files metadata by removing virtual 'readable_column' and
* ensuring that the underlying metrics used to create that column are part of the final
* projection.
*
* @param requestedProjection requested projection
* @param readableMetricsField readable_metrics field
* @return actual projection to be used
*/
private Schema projectionForReadableMetrics(
Schema requestedProjection, Types.NestedField readableMetricsField) {
Set<Integer> readableMetricsIds = TypeUtil.getProjectedIds(readableMetricsField.type());
Schema realProjection = TypeUtil.selectNot(requestedProjection, readableMetricsIds);

Schema requiredMetricsColumns =
new Schema(
MetricsUtil.READABLE_METRIC_COLS.stream()
.map(MetricsUtil.ReadableMetricColDefinition::originalCol)
.collect(Collectors.toList()));
return TypeUtil.join(realProjection, requiredMetricsColumns);
}

@Override
public Iterable<FileScanTask> split(long splitSize) {
return ImmutableList.of(this); // don't split
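To make the new derived column concrete, the following is a small usage sketch, not part of this diff, showing how a caller could read data_sequence_number from the files metadata table through the core Java API once this change lands. The `table` variable, the use of the string literal for the column name, and the exception handling are assumptions for illustration.

// Hedged sketch (not part of the change): read the derived data_sequence_number
// column from the files metadata table. Assumes `table` is an already-loaded
// org.apache.iceberg.Table; imports from org.apache.iceberg, org.apache.iceberg.io
// (CloseableIterable), and java.io are assumed.
Table filesTable =
    MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.FILES);

// resolve the projected position of the derived column
Schema filesSchema = filesTable.schema();
int seqPos = filesSchema.columns().indexOf(filesSchema.findField("data_sequence_number"));

try (CloseableIterable<FileScanTask> tasks = filesTable.newScan().planFiles()) {
  for (FileScanTask task : tasks) {
    for (StructLike row : ((DataTask) task).rows()) {
      Long dataSequenceNumber = row.get(seqPos, Long.class);
      // e.g. group file entries by the commit that added them
    }
  }
} catch (IOException e) {
  throw new UncheckedIOException(e);
}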
68 changes: 68 additions & 0 deletions core/src/main/java/org/apache/iceberg/MetadataTableUtils.java
@@ -19,10 +19,19 @@
package org.apache.iceberg;

import java.util.Locale;
import java.util.Set;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;

public class MetadataTableUtils {
static final String DATA_SEQUENCE_NUMBER = "data_sequence_number";

public static final Set<String> DERIVED_FIELDS =
Sets.newHashSet(MetricsUtil.READABLE_METRICS, DATA_SEQUENCE_NUMBER);

private MetadataTableUtils() {}

public static boolean hasMetadataTableName(TableIdentifier identifier) {
@@ -109,4 +118,63 @@ public static Table createMetadataTableInstance(
private static String metadataTableName(String tableName, MetadataTableType type) {
return tableName + (tableName.contains("/") ? "#" : ".") + type.name().toLowerCase(Locale.ROOT);
}

static class StructWithComputedColumns implements StructLike {
private final StructLike struct;
private final int projectionColumnCount;
private final int dataSequenceNumberPosition;
private final Long dataSequenceNumber;
private final int metricsPosition;
private final StructLike readableMetricsStruct;

private final int[] positionMap;

StructWithComputedColumns(
Schema base,
Schema projection,
StructLike struct,
Long dataSequenceNumber,
StructLike readableMetrics) {
this.projectionColumnCount = projection.columns().size();
this.positionMap = new int[this.projectionColumnCount];
// build projection map
for (Types.NestedField field : projection.asStruct().fields()) {
int projectPosition = projection.columns().indexOf(field);
int basePosition = base.columns().indexOf(base.findField(field.fieldId()));
Preconditions.checkArgument(
projectPosition >= 0, "Cannot find %s in projection", field.name());
Preconditions.checkArgument(basePosition >= 0, "Cannot find %s in base", field.name());
positionMap[projectPosition] = basePosition;
}
this.struct = struct;
this.dataSequenceNumberPosition =
projection.columns().indexOf(projection.findField(DATA_SEQUENCE_NUMBER));
this.dataSequenceNumber = dataSequenceNumber;
this.metricsPosition =
projection.columns().indexOf(projection.findField(MetricsUtil.READABLE_METRICS));
this.readableMetricsStruct = readableMetrics;
}

@Override
public int size() {
return projectionColumnCount;
}

@Override
public <T> T get(int pos, Class<T> javaClass) {
if (pos == dataSequenceNumberPosition) {
return javaClass.cast(dataSequenceNumber);
} else if (pos == metricsPosition) {
return javaClass.cast(readableMetricsStruct);
} else {
int structPosition = positionMap[pos];
return struct.get(structPosition, javaClass);
}
}

@Override
public <T> void set(int pos, T value) {
throw new UnsupportedOperationException("StructWithComputedColumns is read only");
}
}
}
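As a side note on the design above: StructWithComputedColumns answers the computed positions (data_sequence_number, readable_metrics) directly and maps every other projected position back to the wrapped struct's position in the full metadata schema. A minimal standalone sketch of that mapping follows; the class and field names are illustrative only and are not part of the diff.

// Simplified position-mapping sketch (not the real class): projected position ->
// base position, with one computed column intercepted. Assumes every projected
// name other than the computed one also exists in the base schema.
import java.util.List;

class ProjectedRowSketch {
  private final Object[] baseRow;         // values laid out in base-schema order
  private final int[] positionMap;        // projected position -> base position
  private final int seqPos;               // projected position of data_sequence_number
  private final Long dataSequenceNumber;  // computed value, never read from baseRow

  ProjectedRowSketch(List<String> base, List<String> projection, Object[] baseRow, Long seq) {
    this.baseRow = baseRow;
    this.positionMap = projection.stream().mapToInt(base::indexOf).toArray();
    this.seqPos = projection.indexOf("data_sequence_number");
    this.dataSequenceNumber = seq;
  }

  Object get(int pos) {
    // the computed column is served directly; everything else delegates to the base row
    return pos == seqPos ? dataSequenceNumber : baseRow[positionMap[pos]];
  }
}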
@@ -45,7 +45,6 @@
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.MetadataTableUtils;
import org.apache.iceberg.MetricsUtil;
import org.apache.iceberg.Parameter;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.Schema;
@@ -285,7 +284,7 @@ public void testUnPartitionedTable() throws IOException {
List<String> deleteColumns =
deleteFilesTableSchema.columns().stream()
.map(Types.NestedField::name)
.filter(c -> !c.equals(MetricsUtil.READABLE_METRICS))
.filter(c -> !MetadataTableUtils.DERIVED_FIELDS.contains(c))
.collect(Collectors.toList());
String deleteNames =
deleteColumns.stream().map(n -> "`" + n + "`").collect(Collectors.joining(","));
@@ -311,7 +310,7 @@ public void testUnPartitionedTable() throws IOException {
List<String> columns =
filesTableSchema.columns().stream()
.map(Types.NestedField::name)
.filter(c -> !c.equals(MetricsUtil.READABLE_METRICS))
.filter(c -> !MetadataTableUtils.DERIVED_FIELDS.contains(c))
.collect(Collectors.toList());
String names = columns.stream().map(n -> "`" + n + "`").collect(Collectors.joining(","));

@@ -384,7 +383,7 @@ public void testPartitionedTable() throws Exception {
List<String> columns =
filesTableSchema.columns().stream()
.map(Types.NestedField::name)
.filter(c -> !c.equals(MetricsUtil.READABLE_METRICS))
.filter(c -> !MetadataTableUtils.DERIVED_FIELDS.contains(c))
.collect(Collectors.toList());
String names = columns.stream().map(n -> "`" + n + "`").collect(Collectors.joining(","));

@@ -470,7 +469,7 @@ public void testAllFilesUnpartitioned() throws Exception {
List<String> columns =
filesTableSchema.columns().stream()
.map(Types.NestedField::name)
.filter(c -> !c.equals(MetricsUtil.READABLE_METRICS))
.filter(c -> !MetadataTableUtils.DERIVED_FIELDS.contains(c))
.collect(Collectors.toList());
String names = columns.stream().map(n -> "`" + n + "`").collect(Collectors.joining(","));

@@ -553,7 +552,7 @@ public void testAllFilesPartitioned() throws Exception {
List<String> columns =
filesTableSchema.columns().stream()
.map(Types.NestedField::name)
.filter(c -> !c.equals(MetricsUtil.READABLE_METRICS))
.filter(c -> !MetadataTableUtils.DERIVED_FIELDS.contains(c))
.collect(Collectors.toList());
String names = columns.stream().map(n -> "`" + n + "`").collect(Collectors.joining(","));
