Merged
30 changes: 19 additions & 11 deletions api/src/main/java/org/apache/iceberg/PartitionSpec.java
@@ -555,17 +555,25 @@ PartitionSpec buildUnchecked() {
  static void checkCompatibility(PartitionSpec spec, Schema schema) {
    for (PartitionField field : spec.fields) {
      Type sourceType = schema.findType(field.sourceId());
-     ValidationException.check(
-         sourceType != null, "Cannot find source column for partition field: %s", field);
-     ValidationException.check(
-         sourceType.isPrimitiveType(),
-         "Cannot partition by non-primitive source field: %s",
-         sourceType);
-     ValidationException.check(
-         field.transform().canTransform(sourceType),
-         "Invalid source type %s for transform: %s",
-         sourceType,
-         field.transform());
+     Transform<?, ?> transform = field.transform();
+     // When a partition field is removed from a Version 1 partition spec,
+     // it is replaced with a void transform, see:
+     // https://iceberg.apache.org/spec/#partition-transforms
+     // A VoidTransform is always compatible regardless of the source type,
+     // so we skip the checks
+     if (!transform.equals(Transforms.alwaysNull())) {
+       ValidationException.check(
+           sourceType != null, "Cannot find source column for partition field: %s", field);
+       ValidationException.check(
+           sourceType.isPrimitiveType(),
+           "Cannot partition by non-primitive source field: %s",
+           sourceType);
+       ValidationException.check(
+           transform.canTransform(sourceType),
+           "Invalid source type %s for transform: %s",
+           sourceType,
+           transform);
+     }
    }
  }
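For illustration, here is a minimal caller-side sketch of the same "skip void transforms" idea (not part of the PR; it only relies on the public PartitionSpec, PartitionField, Schema, and Transforms APIs, and the class and method names are made up):

import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.transforms.Transforms;

class VoidAwareSpecInspector {
  // Mirrors the updated checkCompatibility behaviour: void transforms are the
  // v1 placeholder for dropped partition fields and are treated as always compatible.
  static boolean allActiveFieldsResolve(PartitionSpec spec) {
    for (PartitionField field : spec.fields()) {
      if (field.transform().equals(Transforms.alwaysNull())) {
        continue; // dropped in a v1 spec; nothing left to validate
      }
      if (spec.schema().findType(field.sourceId()) == null) {
        return false; // a non-void field points at a column that no longer exists
      }
    }
    return true;
  }
}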

@@ -387,7 +387,12 @@ static TableMetadata fromJson(FileIO io, String metadataLocation, JsonNode node)
      // parse the spec array
      ImmutableList.Builder<PartitionSpec> builder = ImmutableList.builder();
      for (JsonNode spec : specArray) {
-       builder.add(PartitionSpecParser.fromJson(schema, spec));
+       UnboundPartitionSpec unboundSpec = PartitionSpecParser.fromJson(spec);
+       if (unboundSpec.specId() == defaultSpecId) {
+         builder.add(unboundSpec.bind(schema));
Comment on lines +391 to +392
Contributor (@singhpk234):

[doubt] As per my understanding, I think this might not work with a v1 partition spec, considering we still have a void transform for dropped partitions, which would be holding a reference to the dropped source field.

Should we update PartitionSpec#checkCompatibility to handle void transforms? Thoughts?

Contributor:

Also, should we add a unit test to TableMetadataParserTest?

Contributor Author (@Fokko):

@singhpk234 Can you elaborate a bit more? I'm not sure that I understand the issue.

For V1, which uses spec instead of specs, nothing changes, since it takes the other branch:

Preconditions.checkArgument(
formatVersion == 1, "%s must exist in format v%s", PARTITION_SPECS, formatVersion);
// partition spec is required for older readers, but is always set to the default if the spec
// array is set. it is only used to default the spec map is missing, indicating that the
// table metadata was written by an older writer.
defaultSpecId = TableMetadata.INITIAL_SPEC_ID;
specs =
ImmutableList.of(
PartitionSpecParser.fromJsonFields(
schema, TableMetadata.INITIAL_SPEC_ID, JsonUtil.get(PARTITION_SPEC, node)));

Contributor (@singhpk234), Sep 6, 2022:

Apologies for not being clear. What I meant was that mapping the current schema to the current spec might still cause an issue for tables in the V1 format. Considering the UT in this PR (last DDL), the current spec will have a void transform for day_of_ts, since day_of_ts was dropped from partitioning (in the V1 format we replace the transform with a void transform). Now, when we attempt to bind the current partition spec to the current schema, the current schema will not have day_of_ts, but PartitionSpec#checkCompatibility will still call schema.findType(field.sourceId()) against the current schema for the void-transform field of day_of_ts, and it will fail because day_of_ts has been removed from the current schema.

A sample UT for repro (modified from this PR):

  @Test
  public void testDropColumnOfOldPartitionField() {
    // default table created in v1 format
    sql(
        "CREATE TABLE %s (id bigint NOT NULL, ts timestamp, day_of_ts date) USING iceberg PARTITIONED BY (day_of_ts)",
        tableName);

    sql("ALTER TABLE %s REPLACE PARTITION FIELD day_of_ts WITH days(ts)", tableName);
    sql("ALTER TABLE %s DROP COLUMN day_of_ts", tableName);
  }

Contributor Author (@Fokko), Sep 7, 2022:

@singhpk234 Thanks for the thorough explanation. I wasn't aware of the replacement by a void transform, and it indeed has a similar issue:

[screenshot of the resulting failure]

Thanks for raising this, and let me think of a solution. Do you know the historical reason to replace it with a void transform?

Contributor (@singhpk234):

@Fokko, this explanation from @rdblue nicely explains the motivation behind introducing void transforms in the v1 format.

Contributor Author (@Fokko):

Thanks for linking, @singhpk234; that is indeed a very clear explanation and it makes a lot of sense.

I've added a condition to skip the validation when we encounter a VoidTransform. Since it is almost always compatible, I think we should be okay with that, but I'm curious to hear what others think.

+       } else {
+         builder.add(unboundSpec.bindUnchecked(schema));
+       }
      }
      specs = builder.build();
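Because the review thread above splits the hunk, here is the same branching pulled together as a compact sketch (not the actual parser code; it assumes the UnboundPartitionSpec API shown in the diff): only the default spec is validated against the current schema, while historical specs are bound without the compatibility checks.

import java.util.ArrayList;
import java.util.List;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.UnboundPartitionSpec;

class SpecBindingSketch {
  static List<PartitionSpec> bindSpecs(
      List<UnboundPartitionSpec> unboundSpecs, Schema schema, int defaultSpecId) {
    List<PartitionSpec> specs = new ArrayList<>();
    for (UnboundPartitionSpec unbound : unboundSpecs) {
      if (unbound.specId() == defaultSpecId) {
        // the active spec must stay compatible with the current schema
        specs.add(unbound.bind(schema));
      } else {
        // older specs may reference dropped columns; skip checkCompatibility
        specs.add(unbound.bindUnchecked(schema));
      }
    }
    return specs;
  }
}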

@@ -421,6 +421,31 @@ public void testSparkTableAddDropPartitions() throws Exception {
"spark table partition should be empty", 0, sparkTable().partitioning().length);
}

@Test
public void testDropColumnOfOldPartitionFieldV1() {
Contributor:

I don't think this is the right place to test this. There are partition spec evolution tests in core, which is a much better place than here.

// default table created in v1 format
sql(
"CREATE TABLE %s (id bigint NOT NULL, ts timestamp, day_of_ts date) USING iceberg PARTITIONED BY (day_of_ts)",
Collaborator:

Shall we explicitly set format-version=1, in case the default changes in the future?

Contributor Author (@Fokko):

Great suggestion, added! 👍🏻

Contributor:

@Fokko, it doesn't look like this change was pushed?

Contributor Author (@Fokko):

I was working on a follow-up to fix the other issue in the read path. I waited with pushing until I had a proper fix for this, but it turns out that it is a bit more complicated than I originally anticipated. I've created a new PR #5907.

tableName);

sql("ALTER TABLE %s REPLACE PARTITION FIELD day_of_ts WITH days(ts)", tableName);

sql("ALTER TABLE %s DROP COLUMN day_of_ts", tableName);
Collaborator (@marton-bod):

Can we also include a SQL statement to read back the data from the table after the drop? We have noticed the same issue in Trino, but for us the drop column still succeeds; it's the subsequent read operations that start failing.

Contributor Author (@Fokko):

Thanks @marton-bod, the ALTER TABLE threw an exception before, but you are right that subsequent reads are also failing. Digging into this.

Member:

Hmm, interesting. This is what I assumed would happen, which is why I went with a spec change (a disruptive change).

@marton-bod: I have tagged you in one of the Slack discussions where I proposed a spec change to handle this. If Fokko doesn't find an easy way to fix this, we can discuss my approach after his experiment.

}

@Test
public void testDropColumnOfOldPartitionFieldV2() {
sql(
"CREATE TABLE %s (id bigint NOT NULL, ts timestamp, day_of_ts date) USING iceberg PARTITIONED BY (day_of_ts)",
tableName);

sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version' = '2');", tableName);

sql("ALTER TABLE %s REPLACE PARTITION FIELD day_of_ts WITH days(ts)", tableName);

sql("ALTER TABLE %s DROP COLUMN day_of_ts", tableName);
}

private void assertPartitioningEquals(SparkTable table, int len, String transform) {
Assert.assertEquals("spark table partition should be " + len, len, table.partitioning().length);
Assert.assertEquals(
@@ -421,6 +421,31 @@ public void testSparkTableAddDropPartitions() throws Exception {
"spark table partition should be empty", 0, sparkTable().partitioning().length);
}

@Test
public void testDropColumnOfOldPartitionFieldV1() {
// default table created in v1 format
sql(
"CREATE TABLE %s (id bigint NOT NULL, ts timestamp, day_of_ts date) USING iceberg PARTITIONED BY (day_of_ts)",
tableName);

sql("ALTER TABLE %s REPLACE PARTITION FIELD day_of_ts WITH days(ts)", tableName);

sql("ALTER TABLE %s DROP COLUMN day_of_ts", tableName);
}

@Test
public void testDropColumnOfOldPartitionFieldV2() {
sql(
"CREATE TABLE %s (id bigint NOT NULL, ts timestamp, day_of_ts date) USING iceberg PARTITIONED BY (day_of_ts)",
tableName);

sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version' = '2');", tableName);

sql("ALTER TABLE %s REPLACE PARTITION FIELD day_of_ts WITH days(ts)", tableName);

sql("ALTER TABLE %s DROP COLUMN day_of_ts", tableName);
}

private void assertPartitioningEquals(SparkTable table, int len, String transform) {
Assert.assertEquals("spark table partition should be " + len, len, table.partitioning().length);
Assert.assertEquals(