4 changes: 3 additions & 1 deletion api/src/main/java/org/apache/iceberg/DataFile.java
@@ -62,18 +62,20 @@ public interface DataFile extends ContentFile<DataFile> {
Types.NestedField EQUALITY_IDS = optional(135, "equality_ids", ListType.ofRequired(136, IntegerType.get()),
"Equality comparison field IDs");
Types.NestedField SORT_ORDER_ID = optional(140, "sort_order_id", IntegerType.get(), "Sort order ID");
Types.NestedField SPEC_ID = optional(141, "spec_id", IntegerType.get(), "Partition spec ID");
Contributor: I think we need to update the spec since we are assigning a new field ID here. We should at least note that it is reserved, even though we don't write it into data files. We can do that in a follow-up.


int PARTITION_ID = 102;
String PARTITION_NAME = "partition";
String PARTITION_DOC = "Partition data tuple, schema based on the partition spec";
// NEXT ID TO ASSIGN: 141
// NEXT ID TO ASSIGN: 142

static StructType getType(StructType partitionType) {
// IDs start at 100 to leave room for changes to ManifestEntry
return StructType.of(
CONTENT,
FILE_PATH,
FILE_FORMAT,
SPEC_ID,
required(PARTITION_ID, PARTITION_NAME, partitionType, PARTITION_DOC),
RECORD_COUNT,
FILE_SIZE,
58 changes: 32 additions & 26 deletions core/src/main/java/org/apache/iceberg/BaseFile.java
@@ -235,45 +235,48 @@ public void put(int i, Object value) {
this.format = FileFormat.valueOf(value.toString());
return;
case 3:
this.partitionData = (PartitionData) value;
this.partitionSpecId = (value != null) ? (Integer) value : -1;
Contributor: @szehon-ho @RussellSpitzer @rdblue @openinx FYI, shifting the field order is a breaking change that caused Flink to fail to restore from a checkpoint. It is not a big deal for us this time, as we are still in the testing phase, but I want to call out that we need to be more careful in the future.

java.lang.ClassCastException: class org.apache.iceberg.PartitionData cannot be cast to class java.lang.Integer (org.apache.iceberg.PartitionData is in unnamed module of loader org.apache.flink.util.ChildFirstClassLoader @3e063fd4; java.lang.Integer is in module java.base of loader 'bootstrap')
	at org.apache.iceberg.BaseFile.put(BaseFile.java:238)
	at org.apache.iceberg.avro.ValueReaders$IndexedRecordReader.set(ValueReaders.java:746)
	at org.apache.iceberg.avro.ValueReaders$IndexedRecordReader.set(ValueReaders.java:715)
	at org.apache.iceberg.avro.ValueReaders$StructReader.read(ValueReaders.java:669)
	at org.apache.iceberg.avro.ValueReaders$StructReader.read(ValueReaders.java:669)
	at org.apache.iceberg.data.avro.DecoderResolver.resolveAndRead(DecoderResolver.java:48)
	at org.apache.iceberg.avro.GenericAvroReader.read(GenericAvroReader.java:69)
	at org.apache.iceberg.avro.ProjectionDatumReader.read(ProjectionDatumReader.java:74)
	at org.apache.avro.file.DataFileStream.next(DataFileStream.java:250)
	at org.apache.iceberg.avro.AvroIterable$AvroReuseIterator.next(AvroIterable.java:202)
	at org.apache.iceberg.io.CloseableIterable$4$1.next(CloseableIterable.java:113)
	at org.apache.iceberg.io.FilterIterator.advance(FilterIterator.java:66)
	at org.apache.iceberg.io.FilterIterator.hasNext(FilterIterator.java:50)
	at org.apache.iceberg.io.CloseableIterable$4$1.hasNext(CloseableIterable.java:108)
	at org.apache.iceberg.relocated.com.google.common.collect.Iterators.addAll(Iterators.java:355)
	at org.apache.iceberg.relocated.com.google.common.collect.Lists.newArrayList(Lists.java:143)
	at org.apache.iceberg.relocated.com.google.common.collect.Lists.newArrayList(Lists.java:130)
	at org.apache.iceberg.flink.sink.FlinkManifestUtil.readDataFiles(FlinkManifestUtil.java:60)
	at org.apache.iceberg.flink.sink.FlinkManifestUtil.readCompletedFiles(FlinkManifestUtil.java:105)
	at org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitUpToCheckpoint(IcebergFilesCommitter.java:212)
	at org.apache.iceberg.flink.sink.IcebergFilesCommitter.initializeState(IcebergFilesCommitter.java:156)
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:118)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:441)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:585)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:565)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
	at java.base/java.lang.Thread.run(Thread.java:829)

Member: Well I did warn against it :)

Contributor: @stevenzwu, it's good that we caught this. How did it happen? Was Flink relying on a specific position?

Contributor: The Flink Iceberg sink checkpoints the manifest file. After upgrading to the latest Iceberg master branch, the Flink job can't restore the checkpoint due to this Avro schema position change.

Contributor: @stevenzwu, I understood the failure scenario. I'm wondering where Flink is relying on position. That sounds like a Flink bug and we should make sure we fix it.

Contributor: @rdblue FlinkManifestUtil calls ManifestReader (from the core module) to read the manifest Avro file. Now the BaseFile (also an Avro IndexedRecord) changed the order of its fields, which breaks the Avro read path. I am not sure this is a Flink bug.

Member Author (@szehon-ho, Nov 29, 2021): Hi guys, sorry, I'm out of town and may reply slowly. @stevenzwu, I am curious: did you find a fix? I'm not sure I understand the complete problem, but this change should not modify the serialized form of the metadata file if it is saved in a Flink checkpoint, since specId is a derived field (if that is the concern). See VXMetadata.java, which controls the serialization format and is not changed.

Contributor: I think this is definitely a Flink bug. Avro can handle schema evolution; you just need to keep track of the write schema and the read schema. My guess is that it is not correctly tracking the write schema that was used, so you get incorrect results at read time. Where is the write schema tracked for Flink state?
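
For illustration, a minimal sketch of the schema-resolution behavior described above (not part of this PR; the record name, field names, and values below are hypothetical). When the reader is constructed with both the write schema and the read schema, Avro matches fields by name and fills in defaults, so inserting a field such as spec_id does not shift the remaining values:

```java
import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class AvroEvolutionSketch {
  public static void main(String[] args) throws Exception {
    // Write schema: the old layout, without a spec_id field.
    Schema writeSchema = SchemaBuilder.record("data_file").fields()
        .requiredString("file_path")
        .requiredLong("record_count")
        .endRecord();

    // Read schema: the new layout, with spec_id inserted before record_count.
    Schema readSchema = SchemaBuilder.record("data_file").fields()
        .requiredString("file_path")
        .name("spec_id").type().intType().intDefault(0)
        .requiredLong("record_count")
        .endRecord();

    // Serialize a record using the old layout.
    GenericRecord written = new GenericData.Record(writeSchema);
    written.put("file_path", "s3://bucket/data/file.parquet");
    written.put("record_count", 42L);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(writeSchema).write(written, encoder);
    encoder.flush();

    // Deserialize with both schemas: fields are resolved by name, not position,
    // and the missing spec_id falls back to its default instead of shifting values.
    GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(writeSchema, readSchema);
    GenericRecord read = reader.read(null, DecoderFactory.get().binaryDecoder(out.toByteArray(), null));
    System.out.println(read); // {"file_path": "...", "spec_id": 0, "record_count": 42}
  }
}
```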

return;
case 4:
this.recordCount = (Long) value;
this.partitionData = (PartitionData) value;
return;
case 5:
this.fileSizeInBytes = (Long) value;
this.recordCount = (Long) value;
return;
case 6:
this.columnSizes = (Map<Integer, Long>) value;
this.fileSizeInBytes = (Long) value;
return;
case 7:
this.valueCounts = (Map<Integer, Long>) value;
this.columnSizes = (Map<Integer, Long>) value;
return;
case 8:
this.nullValueCounts = (Map<Integer, Long>) value;
this.valueCounts = (Map<Integer, Long>) value;
return;
case 9:
this.nanValueCounts = (Map<Integer, Long>) value;
this.nullValueCounts = (Map<Integer, Long>) value;
return;
case 10:
this.lowerBounds = SerializableByteBufferMap.wrap((Map<Integer, ByteBuffer>) value);
this.nanValueCounts = (Map<Integer, Long>) value;
return;
case 11:
this.upperBounds = SerializableByteBufferMap.wrap((Map<Integer, ByteBuffer>) value);
this.lowerBounds = SerializableByteBufferMap.wrap((Map<Integer, ByteBuffer>) value);
return;
case 12:
this.keyMetadata = ByteBuffers.toByteArray((ByteBuffer) value);
this.upperBounds = SerializableByteBufferMap.wrap((Map<Integer, ByteBuffer>) value);
return;
case 13:
this.splitOffsets = ArrayUtil.toLongArray((List<Long>) value);
this.keyMetadata = ByteBuffers.toByteArray((ByteBuffer) value);
return;
case 14:
this.equalityIds = ArrayUtil.toIntArray((List<Integer>) value);
this.splitOffsets = ArrayUtil.toLongArray((List<Long>) value);
return;
case 15:
this.sortOrderId = (Integer) value;
this.equalityIds = ArrayUtil.toIntArray((List<Integer>) value);
return;
case 16:
this.sortOrderId = (Integer) value;
return;
case 17:
this.fileOrdinal = (long) value;
return;
default:
@@ -301,32 +304,34 @@ public Object get(int i) {
case 2:
return format != null ? format.toString() : null;
case 3:
return partitionData;
return partitionSpecId;
case 4:
return recordCount;
return partitionData;
case 5:
return fileSizeInBytes;
return recordCount;
case 6:
return columnSizes;
return fileSizeInBytes;
case 7:
return valueCounts;
return columnSizes;
case 8:
return nullValueCounts;
return valueCounts;
case 9:
return nanValueCounts;
return nullValueCounts;
case 10:
return lowerBounds;
return nanValueCounts;
case 11:
return upperBounds;
return lowerBounds;
case 12:
return keyMetadata();
return upperBounds;
case 13:
return splitOffsets();
return keyMetadata();
case 14:
return equalityFieldIds();
return splitOffsets();
case 15:
return sortOrderId;
return equalityFieldIds();
case 16:
return sortOrderId;
case 17:
Contributor: It looks like this is a bug from https://github.com/apache/iceberg/pull/1723/files. I think this should return fileOrdinal and not pos!

Member Author: Yeah, you are right. I can file an issue and take a look at it after this change is in.

return pos;
default:
throw new UnsupportedOperationException("Unknown field ordinal: " + pos);
@@ -442,6 +447,7 @@ public String toString() {
.add("content", content.toString().toLowerCase(Locale.ROOT))
.add("file_path", filePath)
.add("file_format", format)
.add("spec_id", specId())
.add("partition", partitionData)
.add("record_count", recordCount)
.add("file_size_in_bytes", fileSizeInBytes)
@@ -153,7 +153,7 @@ public void testEntriesTable() throws Exception {
rows.forEach(row -> {
row.put(2, 0L);
GenericData.Record file = (GenericData.Record) row.get("data_file");
file.put(0, FileContent.DATA.id());
asMetadataRecord(file);
expected.add(row);
});
}
@@ -311,7 +311,7 @@ public void testAllEntriesTable() throws Exception {
rows.forEach(row -> {
row.put(2, 0L);
GenericData.Record file = (GenericData.Record) row.get("data_file");
file.put(0, FileContent.DATA.id());
asMetadataRecord(file);
expected.add(row);
});
}
@@ -386,7 +386,7 @@ public void testFilesTable() throws Exception {
for (GenericData.Record record : rows) {
if ((Integer) record.get("status") < 2 /* added or existing */) {
GenericData.Record file = (GenericData.Record) record.get("data_file");
file.put(0, FileContent.DATA.id());
asMetadataRecord(file);
expected.add(file);
}
}
@@ -442,7 +442,7 @@ public void testFilesTableWithSnapshotIdInheritance() throws Exception {
try (CloseableIterable<GenericData.Record> rows = Avro.read(in).project(entriesTable.schema()).build()) {
for (GenericData.Record record : rows) {
GenericData.Record file = (GenericData.Record) record.get("data_file");
file.put(0, FileContent.DATA.id());
asMetadataRecord(file);
expected.add(file);
}
}
@@ -549,7 +549,7 @@ public void testFilesUnpartitionedTable() throws Exception {
for (GenericData.Record record : rows) {
if ((Integer) record.get("status") < 2 /* added or existing */) {
GenericData.Record file = (GenericData.Record) record.get("data_file");
file.put(0, FileContent.DATA.id());
asMetadataRecord(file);
expected.add(file);
}
}
@@ -646,7 +646,7 @@ public void testAllDataFilesTable() throws Exception {
for (GenericData.Record record : rows) {
if ((Integer) record.get("status") < 2 /* added or existing */) {
GenericData.Record file = (GenericData.Record) record.get("data_file");
file.put(0, FileContent.DATA.id());
asMetadataRecord(file);
expected.add(file);
}
}
@@ -1411,4 +1411,9 @@ public void testRemoveOrphanFilesActionSupport() throws InterruptedException {

Assert.assertEquals("Rows must match", records, actualRecords);
}

private void asMetadataRecord(GenericData.Record file) {
file.put(0, FileContent.DATA.id());
file.put(3, 0); // specId
}
}
@@ -342,7 +342,7 @@ private Set<String> extractFilePathsMatchingConditionOnPartition(List<Row> files
// idx 1: file_path, idx 4: partition
return files.stream()
.filter(r -> {
Row partition = r.getStruct(3);
Row partition = r.getStruct(4);
return condition.test(partition);
}).map(r -> CountOpenLocalFileSystem.stripScheme(r.getString(1)))
.collect(Collectors.toSet());
@@ -21,6 +21,7 @@

import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.iceberg.AssertHelpers;
@@ -140,7 +141,7 @@ public void testEntriesTable() throws Exception {
rows.forEach(row -> {
row.put(2, 0L);
GenericData.Record file = (GenericData.Record) row.get("data_file");
file.put(0, FileContent.DATA.id());
asMetadataRecord(file);
expected.add(row);
});
}
@@ -298,7 +299,7 @@ public void testAllEntriesTable() throws Exception {
rows.forEach(row -> {
row.put(2, 0L);
GenericData.Record file = (GenericData.Record) row.get("data_file");
file.put(0, FileContent.DATA.id());
asMetadataRecord(file);
expected.add(row);
});
}
@@ -373,7 +374,7 @@ public void testFilesTable() throws Exception {
for (GenericData.Record record : rows) {
if ((Integer) record.get("status") < 2 /* added or existing */) {
GenericData.Record file = (GenericData.Record) record.get("data_file");
file.put(0, FileContent.DATA.id());
asMetadataRecord(file);
expected.add(file);
}
}
@@ -429,7 +430,7 @@ public void testFilesTableWithSnapshotIdInheritance() throws Exception {
try (CloseableIterable<GenericData.Record> rows = Avro.read(in).project(entriesTable.schema()).build()) {
for (GenericData.Record record : rows) {
GenericData.Record file = (GenericData.Record) record.get("data_file");
file.put(0, FileContent.DATA.id());
asMetadataRecord(file);
expected.add(file);
}
}
@@ -536,7 +537,7 @@ public void testFilesUnpartitionedTable() throws Exception {
for (GenericData.Record record : rows) {
if ((Integer) record.get("status") < 2 /* added or existing */) {
GenericData.Record file = (GenericData.Record) record.get("data_file");
file.put(0, FileContent.DATA.id());
asMetadataRecord(file);
expected.add(file);
}
}
@@ -633,7 +634,7 @@ public void testAllDataFilesTable() throws Exception {
for (GenericData.Record record : rows) {
if ((Integer) record.get("status") < 2 /* added or existing */) {
GenericData.Record file = (GenericData.Record) record.get("data_file");
file.put(0, FileContent.DATA.id());
asMetadataRecord(file);
expected.add(file);
}
}
@@ -1181,4 +1182,44 @@ public void testRemoveOrphanFilesActionSupport() throws InterruptedException {

Assert.assertEquals("Rows must match", records, actualRecords);
}

@Test
public void testFilesTablePartitionId() throws Exception {
TableIdentifier tableIdentifier = TableIdentifier.of("db", "files_test");
Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("id").build());
int spec0 = table.spec().specId();

Dataset<Row> df1 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
Dataset<Row> df2 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "b")), SimpleRecord.class);

df1.select("id", "data").write()
.format("iceberg")
.mode("append")
.save(loadLocation(tableIdentifier));

// change partition spec
table.refresh();
table.updateSpec().removeField("id").commit();
int spec1 = table.spec().specId();

// add a second file
df2.select("id", "data").write()
.format("iceberg")
.mode("append")
.save(loadLocation(tableIdentifier));

List<Integer> actual = spark.read()
.format("iceberg")
.load(loadLocation(tableIdentifier, "files"))
.sort(DataFile.SPEC_ID.name())
.collectAsList()
.stream().map(r -> (Integer) r.getAs(DataFile.SPEC_ID.name())).collect(Collectors.toList());

Assert.assertEquals("Should have two partition specs", ImmutableList.of(spec0, spec1), actual);
}

private void asMetadataRecord(GenericData.Record file) {
file.put(0, FileContent.DATA.id());
file.put(3, 0); // specId
}
}
@@ -342,7 +342,7 @@ private Set<String> extractFilePathsMatchingConditionOnPartition(List<Row> files
// idx 1: file_path, idx 4: partition
return files.stream()
.filter(r -> {
Row partition = r.getStruct(3);
Row partition = r.getStruct(4);
return condition.test(partition);
}).map(r -> CountOpenLocalFileSystem.stripScheme(r.getString(1)))
.collect(Collectors.toSet());