15 changes: 9 additions & 6 deletions core/src/main/java/org/apache/iceberg/AllManifestsTable.java
@@ -21,6 +21,7 @@

import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.expressions.Expression;
@@ -30,6 +31,7 @@
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructProjection;

@@ -118,6 +120,7 @@ protected CloseableIterable<FileScanTask> planFiles(
boolean ignoreResiduals, boolean caseSensitive, boolean colStats) {
String schemaString = SchemaParser.toJson(schema());
String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned());
Map<Integer, PartitionSpec> specs = Maps.newHashMap(table().specs());

return CloseableIterable.withNoopClose(Iterables.transform(ops.current().snapshots(), snap -> {
if (snap.manifestListLocation() != null) {
@@ -128,14 +131,14 @@ protected CloseableIterable<FileScanTask> planFiles(
.withRecordCount(1)
.withFormat(FileFormat.AVRO)
.build();
return new ManifestListReadTask(ops.io(), schema(), table().spec(), new BaseFileScanTask(
return new ManifestListReadTask(ops.io(), schema(), specs, new BaseFileScanTask(
manifestListAsDataFile, null,
schemaString, specString, residuals));
} else {
return StaticDataTask.of(
ops.io().newInputFile(ops.current().metadataFileLocation()),
MANIFEST_FILE_SCHEMA, schema(), snap.allManifests(),
manifest -> ManifestsTable.manifestFileToRow(table().spec(), manifest)
manifest -> ManifestsTable.manifestFileToRow(specs.get(manifest.partitionSpecId()), manifest)
);
}
}));
@@ -145,13 +148,13 @@ MANIFEST_FILE_SCHEMA, schema(), snap.allManifests(),
static class ManifestListReadTask implements DataTask {
private final FileIO io;
private final Schema schema;
private final PartitionSpec spec;
private final Map<Integer, PartitionSpec> specs;
private final FileScanTask manifestListTask;

ManifestListReadTask(FileIO io, Schema schema, PartitionSpec spec, FileScanTask manifestListTask) {
ManifestListReadTask(FileIO io, Schema schema, Map<Integer, PartitionSpec> specs, FileScanTask manifestListTask) {
this.io = io;
this.schema = schema;
this.spec = spec;
this.specs = specs;
this.manifestListTask = manifestListTask;
}

@@ -173,7 +176,7 @@ public CloseableIterable<StructLike> rows() {
.build()) {

CloseableIterable<StructLike> rowIterable = CloseableIterable.transform(manifests,
manifest -> ManifestsTable.manifestFileToRow(spec, manifest));
manifest -> ManifestsTable.manifestFileToRow(specs.get(manifest.partitionSpecId()), manifest));

StructProjection projection = StructProjection.create(MANIFEST_FILE_SCHEMA, schema);
return CloseableIterable.transform(rowIterable, projection::wrap);
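The pattern both changed call sites in this file rely on: a manifest records the ID of the partition spec it was written with, so each row conversion has to resolve that spec from the table's spec map instead of assuming the table's current spec. A minimal sketch of the lookup, with hypothetical class and method names:

import java.util.Map;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;

class SpecLookup {
  // resolve the spec this manifest was written with; returns null only if the
  // table metadata no longer contains that spec ID
  static PartitionSpec specFor(Table table, ManifestFile manifest) {
    Map<Integer, PartitionSpec> specs = table.specs();
    return specs.get(manifest.partitionSpecId());
  }
}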
12 changes: 8 additions & 4 deletions core/src/main/java/org/apache/iceberg/ManifestsTable.java
@@ -20,7 +20,9 @@
package org.apache.iceberg;

import java.util.List;
import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Types;

@@ -44,15 +46,12 @@ public class ManifestsTable extends BaseMetadataTable {
)))
);

private final PartitionSpec spec;

ManifestsTable(TableOperations ops, Table table) {
this(ops, table, table.name() + ".manifests");
}

ManifestsTable(TableOperations ops, Table table, String name) {
super(ops, table, name);
this.spec = table.spec();
}

@Override
@@ -73,10 +72,15 @@ MetadataTableType metadataTableType() {
protected DataTask task(TableScan scan) {
TableOperations ops = operations();
String location = scan.snapshot().manifestListLocation();
Map<Integer, PartitionSpec> specs = Maps.newHashMap(table().specs());

return StaticDataTask.of(
ops.io().newInputFile(location != null ? location : ops.current().metadataFileLocation()),
schema(), scan.schema(), scan.snapshot().allManifests(),
manifest -> ManifestsTable.manifestFileToRow(spec, manifest)
manifest -> {
PartitionSpec spec = specs.get(manifest.partitionSpecId());
return ManifestsTable.manifestFileToRow(spec, manifest);
}
);
}

25 changes: 20 additions & 5 deletions core/src/main/java/org/apache/iceberg/Partitioning.java
@@ -32,6 +32,7 @@
import org.apache.iceberg.transforms.Transform;
import org.apache.iceberg.transforms.Transforms;
import org.apache.iceberg.transforms.UnknownTransform;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.types.Types.StructType;

@@ -210,7 +211,8 @@ public static StructType partitionType(Table table) {
}

Map<Integer, PartitionField> fieldMap = Maps.newHashMap();
List<NestedField> structFields = Lists.newArrayList();
Map<Integer, Type> typeMap = Maps.newHashMap();
Map<Integer, String> nameMap = Maps.newHashMap();

@szlta (Contributor) commented on Mar 8, 2022:
I have run into similar issues, and I think this will help resolve the issue with type changes of columns in older specs.
Another thing I have seen, which is probably still a problem, is that this method may return the same column name multiple times. Consider the following:
Table schema: a int, b date, c date
spec0: year(b), a
spec1: a
spec2: year(b), year(c)
The result is then something like: 1000: b_year int, 1001: a int, 1002: b_year int, 1003: c_year int
Further down the line, when we construct a Schema object, we will get a failure because the name b_year is present in two fields (1000, 1002).
How should this case be handled? Maybe appending _r [fieldId] to each column name?
cc: @RussellSpitzer, @aokolnychyi
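
A sketch of the failure mode described in this comment (the year transform yields int ordinals, so the field types below match the example): constructing a Schema over the combined struct cannot work, because b_year maps to two field IDs.

import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

// combined partition type collected across spec0..spec2; "b_year" appears twice
Schema combined = new Schema(
    Types.NestedField.optional(1000, "b_year", Types.IntegerType.get()),
    Types.NestedField.optional(1001, "a", Types.IntegerType.get()),
    Types.NestedField.optional(1002, "b_year", Types.IntegerType.get()),
    Types.NestedField.optional(1003, "c_year", Types.IntegerType.get()));
// indexing this schema by name is expected to fail: two fields named "b_year"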

Contributor:
@szlta, Iceberg should rename older columns when it updates the spec to avoid conflicts. That's why we have to take the latest column names. I think we should have some tests in TestPartitioning. Could you check, @szlta?

Contributor:
@aokolnychyi I think you're referring to what happens in V1 tables. For those, the spec is ever-growing: partition fields/transforms are never removed, only converted to void.
The rename logic is there: https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java#L179
but I think it is not used for V2 tables, as per https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java#L261-L268

Since V2 specs don't retain old, deleted partition fields, this rename is not required for normal operations.

The problem I'm describing only affects metadata table queries: for V2, due to the lack of the renames above, Partitioning.partitionType() collects all partition fields from all previous specs too. This can leave the same field name present multiple times, causing construction of the PartitionsTable's (or DataFilesTable's) schema to fail.
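
A hedged sketch of the v1/v2 difference described here, with spec contents shown as comments (the exact v1 rename behavior is the BaseUpdatePartitionSpec logic linked above):

import org.apache.iceberg.Table;

class SpecEvolutionSketch {
  // starting spec in both formats: [1000: data (identity)]
  static void evolve(Table table) {
    table.updateSpec().removeField("data").commit();
    // v1: the slot survives as a void transform -> [1000: data (void)]
    // v2: the field is dropped entirely         -> []

    table.updateSpec().addField("data").commit();
    // v1: the conflicting void field is renamed, so names stay unique
    // v2: a new field ID is allocated -> [1001: data (identity)], while
    //     Partitioning.partitionType() still collects 1000: data from the
    //     old spec, hence the duplicate name
  }
}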

@aokolnychyi (Contributor) commented on Mar 9, 2022:
I guess you are right, @szlta. Here is an example to reproduce.

PartitionSpec initialSpec = PartitionSpec.builderFor(SCHEMA)
    .identity("data")
    .build();
TestTables.TestTable table = TestTables.create(tableDir, "test", SCHEMA, initialSpec, V2_FORMAT_VERSION);

table.updateSpec()
    .removeField("data")
    .commit();

table.updateSpec()
    .addField("data")
    .addField("id")
    .commit();

struct<1000: data: optional string, 1001: data: optional string, 1002: id: optional int>

While it would be great to update the logic that evolves the spec, I think we have to adapt the method that builds a common representation too. Otherwise, existing tables may be broken. Maintaining a set of used names and appending a suffix of the field ID sounds like a reasonable approach.

Thoughts, @rdblue @RussellSpitzer @szehon-ho @flyrain?

Contributor:
Looks like we are using field IDs while projecting values from the common representation in Spark.
At least, that part is working.

protected Map<Integer, StructProjection> buildPartitionProjections(Types.StructType partitionType,
                                                                   Map<Integer, PartitionSpec> specs) {
  Map<Integer, StructProjection> partitionProjections = Maps.newHashMap();
  specs.forEach((specID, spec) ->
      partitionProjections.put(specID, StructProjection.create(partitionType, spec.partitionType()))
  );
  return partitionProjections;
}

Contributor:
I think there are two cases to this problem. The combined partition struct type is used both in metadata tables and when we need to project the _partition field for certain queries. For queries that use the _partition field, I think adding a suffix to make the names unique is the right approach. That's internal, so it doesn't really matter what we rename to, as long as we get the projections that produce the partition tuple for a given spec right.

For metadata tables, we expect the field names to match the partition names. A simple example is identity partitioning, which uses the original column name, so it would be weird to partition by category and need to query category_r1003 in the metadata table. I think the right thing to do for metadata tables is to have a separate way to produce the combined struct, one that produces only one partition column per name.

In @aokolnychyi's example, 1001: data should be present and 1000: data should not be used for metadata tables.

This is also an area where we may want to bring back compatible partition columns. There's no reason why Anton's example couldn't detect that 1000: data was in an old spec and reuse the ID to avoid this problem.
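
One possible shape of the suffix-based rename discussed in this thread, a sketch only and not what this PR implements: while building the combined struct (specs iterated newest first, as partitionType already does), track the used names and disambiguate older fields with their field ID.

import java.util.List;
import java.util.Set;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.types.Types.StructType;

class PartitionNameDedup {
  // fields must arrive newest-spec-first so the latest spec keeps plain names
  static StructType dedupNames(List<NestedField> fields) {
    Set<String> usedNames = Sets.newHashSet();
    List<NestedField> unique = Lists.newArrayList();
    for (NestedField field : fields) {
      String name = field.name();
      if (!usedNames.add(name)) {
        // older field whose name clashes: suffix its ID, e.g. data -> data_1000
        name = name + "_" + field.fieldId();
        usedNames.add(name);
      }
      unique.add(NestedField.optional(field.fieldId(), name, field.type()));
    }
    return StructType.of(unique);
  }
}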

Contributor:
What about partition transforms where the transform itself changes but the field name remains the same?
E.g. bucket(data, 10) would be named data_bucket; after dropping it from the spec and re-adding it in a new spec as bucket(data, 8), id, we would have 1000: data_bucket, 1001: data_bucket, 1002: id, where the two data_bucket fields describe different things. I guess this is similar for truncate.

A partition in the old spec where data_bucket = 0 would be different from a partition in the new spec where data_bucket = 0.

I agree with @rdblue that it's weird to have category_r1003 in queries, but if we want to avoid it, I see two ways of proceeding from a metadata query perspective. A metadata table should either:

  1. give back partition information as per the latest spec only, or
  2. combine the data returned for similarly named partition fields into just one field, but extend such tables with spec_id information in these cases.

So as per the example use case above, the partitions table would look like this for the two cases (with one partition in each spec):

1:
+---------------------------------+--------------------+------------------+
|         test.partition          | test.record_count  | test.file_count  |
+---------------------------------+--------------------+------------------+
| {data_bucket : 0, id : 1}       |          1         |        1         |
+---------------------------------+--------------------+------------------+

2:
+---------------------------------+--------------------+------------------+--------------+
|         test.partition          | test.record_count  | test.file_count  | test.spec_id |
+---------------------------------+--------------------+------------------+--------------+
| {data_bucket : 0, id : null}    |          1         |        1         |       0      |
| {data_bucket : 0, id : 1}       |          1         |        1         |       1      |
+---------------------------------+--------------------+------------------+--------------+

Note that this is how it would look for V2 tables. For V1, due to the renaming we're already doing, the renamed fields would be present too (unless we aim to change that as well):

1:
+---------------------------------+--------------------+------------------+
|         test.partition          | test.record_count  | test.file_count  |
+---------------------------------+--------------------+------------------+
| {data_bucket : 0, id : 1}       |          1         |        1         |
+---------------------------------+--------------------+------------------+

2:
+----------------------------------------------------------+--------------------+------------------+--------------+
|         test.partition                                   | test.record_count  | test.file_count  | test.spec_id |
+----------------------------------------------------------+--------------------+------------------+--------------+
| {data_bucket_1000 : 0,    data_bucket : null, id : null} |          1         |        1         |       0      |
| {data_bucket_1000 : null, data_bucket: 0,     id : 1}    |          1         |        1         |       1      |
+----------------------------------------------------------+--------------------+------------------+--------------+

I'm personally more for solution 2, where we don't have to omit the old partitions but still get a nice, coherent partition info result. It's in league with what I'm proposing in issue #4292 (we could also continue the discussion of this problem there; I didn't mean to hijack @ConeyLiu's PR like this).

Contributor:
I'm also up for solution 2, where we rename just the columns from older specs. That sounds like a reasonable solution.

Contributor:
I'll catch up today. Sorry for the delay!

Contributor:
I think the fixes in this PR look good, other than a couple of minor updates that are needed. We should definitely follow up with a PR that fixes the names as suggested by @szlta.


// sort the spec IDs in descending order to pick up the most recent field names
List<Integer> specIds = table.specs().keySet().stream()
@@ -222,27 +224,40 @@ public static StructType partitionType(Table table) {

for (PartitionField field : spec.fields()) {
int fieldId = field.fieldId();
NestedField structField = spec.partitionType().field(fieldId);
PartitionField existingField = fieldMap.get(fieldId);

if (existingField == null) {
fieldMap.put(fieldId, field);
NestedField structField = spec.partitionType().field(fieldId);
structFields.add(structField);
typeMap.put(fieldId, structField.type());
nameMap.put(fieldId, structField.name());

} else {
// verify the fields are compatible as they may conflict in v1 tables
ValidationException.check(equivalentIgnoringNames(field, existingField),
"Conflicting partition fields: ['%s', '%s']",
field, existingField);

// use the correct type for dropped partitions in v1 tables
if (isVoidTransform(existingField) && !isVoidTransform(field)) {
fieldMap.put(fieldId, field);
typeMap.put(fieldId, structField.type());
}
}
}
}

List<NestedField> sortedStructFields = structFields.stream()
.sorted(Comparator.comparingInt(NestedField::fieldId))
List<NestedField> sortedStructFields = fieldMap.keySet().stream()
.sorted(Comparator.naturalOrder())
.map(fieldId -> NestedField.optional(fieldId, nameMap.get(fieldId), typeMap.get(fieldId)))

Contributor:
I think the changes in this file are correct and fix the first issue, where the type may be incorrect for void transform fields in v1 tables.

.collect(Collectors.toList());
return StructType.of(sortedStructFields);
}

private static boolean isVoidTransform(PartitionField field) {
return field.transform().equals(Transforms.alwaysNull());
}

private static List<Transform<?, ?>> collectUnknownTransforms(Table table) {
List<Transform<?, ?>> unknownTransforms = Lists.newArrayList();
