Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions core/src/main/java/org/apache/iceberg/AllManifestsTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.expressions.Expression;
Expand All @@ -30,6 +31,7 @@
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructProjection;

Expand Down Expand Up @@ -118,6 +120,7 @@ protected CloseableIterable<FileScanTask> planFiles(
boolean ignoreResiduals, boolean caseSensitive, boolean colStats) {
String schemaString = SchemaParser.toJson(schema());
String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned());
Map<Integer, PartitionSpec> specs = Maps.newHashMap(table().specs());

return CloseableIterable.withNoopClose(Iterables.transform(ops.current().snapshots(), snap -> {
if (snap.manifestListLocation() != null) {
Expand All @@ -128,14 +131,14 @@ protected CloseableIterable<FileScanTask> planFiles(
.withRecordCount(1)
.withFormat(FileFormat.AVRO)
.build();
return new ManifestListReadTask(ops.io(), schema(), table().spec(), new BaseFileScanTask(
return new ManifestListReadTask(ops.io(), schema(), specs, new BaseFileScanTask(
manifestListAsDataFile, null,
schemaString, specString, residuals));
} else {
return StaticDataTask.of(
ops.io().newInputFile(ops.current().metadataFileLocation()),
MANIFEST_FILE_SCHEMA, schema(), snap.allManifests(),
manifest -> ManifestsTable.manifestFileToRow(table().spec(), manifest)
manifest -> ManifestsTable.manifestFileToRow(specs.get(manifest.partitionSpecId()), manifest)
);
}
}));
Expand All @@ -145,13 +148,13 @@ MANIFEST_FILE_SCHEMA, schema(), snap.allManifests(),
static class ManifestListReadTask implements DataTask {
private final FileIO io;
private final Schema schema;
private final PartitionSpec spec;
private final Map<Integer, PartitionSpec> specs;
private final FileScanTask manifestListTask;

ManifestListReadTask(FileIO io, Schema schema, PartitionSpec spec, FileScanTask manifestListTask) {
ManifestListReadTask(FileIO io, Schema schema, Map<Integer, PartitionSpec> specs, FileScanTask manifestListTask) {
this.io = io;
this.schema = schema;
this.spec = spec;
this.specs = specs;
this.manifestListTask = manifestListTask;
}

Expand All @@ -173,7 +176,7 @@ public CloseableIterable<StructLike> rows() {
.build()) {

CloseableIterable<StructLike> rowIterable = CloseableIterable.transform(manifests,
manifest -> ManifestsTable.manifestFileToRow(spec, manifest));
manifest -> ManifestsTable.manifestFileToRow(specs.get(manifest.partitionSpecId()), manifest));

StructProjection projection = StructProjection.create(MANIFEST_FILE_SCHEMA, schema);
return CloseableIterable.transform(rowIterable, projection::wrap);
Expand Down
12 changes: 8 additions & 4 deletions core/src/main/java/org/apache/iceberg/ManifestsTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
package org.apache.iceberg;

import java.util.List;
import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Types;

Expand All @@ -44,15 +46,12 @@ public class ManifestsTable extends BaseMetadataTable {
)))
);

private final PartitionSpec spec;

ManifestsTable(TableOperations ops, Table table) {
this(ops, table, table.name() + ".manifests");
}

ManifestsTable(TableOperations ops, Table table, String name) {
super(ops, table, name);
this.spec = table.spec();
}

@Override
Expand All @@ -73,10 +72,15 @@ MetadataTableType metadataTableType() {
protected DataTask task(TableScan scan) {
TableOperations ops = operations();
String location = scan.snapshot().manifestListLocation();
Map<Integer, PartitionSpec> specs = Maps.newHashMap(table().specs());

return StaticDataTask.of(
ops.io().newInputFile(location != null ? location : ops.current().metadataFileLocation()),
schema(), scan.schema(), scan.snapshot().allManifests(),
manifest -> ManifestsTable.manifestFileToRow(spec, manifest)
manifest -> {
PartitionSpec spec = specs.get(manifest.partitionSpecId());
return ManifestsTable.manifestFileToRow(spec, manifest);
}
);
}

Expand Down
25 changes: 20 additions & 5 deletions core/src/main/java/org/apache/iceberg/Partitioning.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.iceberg.transforms.Transform;
import org.apache.iceberg.transforms.Transforms;
import org.apache.iceberg.transforms.UnknownTransform;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.types.Types.StructType;

Expand Down Expand Up @@ -210,7 +211,8 @@ public static StructType partitionType(Table table) {
}

Map<Integer, PartitionField> fieldMap = Maps.newHashMap();
List<NestedField> structFields = Lists.newArrayList();
Map<Integer, Type> typeMap = Maps.newHashMap();
Map<Integer, String> nameMap = Maps.newHashMap();

// sort the spec IDs in descending order to pick up the most recent field names
List<Integer> specIds = table.specs().keySet().stream()
Expand All @@ -222,27 +224,40 @@ public static StructType partitionType(Table table) {

for (PartitionField field : spec.fields()) {
int fieldId = field.fieldId();
NestedField structField = spec.partitionType().field(fieldId);
PartitionField existingField = fieldMap.get(fieldId);

if (existingField == null) {
fieldMap.put(fieldId, field);
NestedField structField = spec.partitionType().field(fieldId);
structFields.add(structField);
typeMap.put(fieldId, structField.type());
nameMap.put(fieldId, structField.name());

} else {
// verify the fields are compatible as they may conflict in v1 tables
ValidationException.check(equivalentIgnoringNames(field, existingField),
"Conflicting partition fields: ['%s', '%s']",
field, existingField);

// use the correct type for dropped partitions in v1 tables
if (isVoidTransform(existingField) && !isVoidTransform(field)) {
fieldMap.put(fieldId, field);
typeMap.put(fieldId, structField.type());
}
}
}
}

List<NestedField> sortedStructFields = structFields.stream()
.sorted(Comparator.comparingInt(NestedField::fieldId))
List<NestedField> sortedStructFields = fieldMap.keySet().stream()
.sorted(Comparator.naturalOrder())
.map(fieldId -> NestedField.optional(fieldId, nameMap.get(fieldId), typeMap.get(fieldId)))
.collect(Collectors.toList());
return StructType.of(sortedStructFields);
}

private static boolean isVoidTransform(PartitionField field) {
return field.transform().equals(Transforms.alwaysNull());
}

private static List<Transform<?, ?>> collectUnknownTransforms(Table table) {
List<Transform<?, ?>> unknownTransforms = Lists.newArrayList();

Expand Down
Loading