
Conversation

@szehon-ho (Member) commented Aug 24, 2021

@RussellSpitzer (Member):

@rymurr I think you were also interested in this.

        return sortOrderId;
      case 16:
        return partitionSpecId;
      case 17:
Contributor:

It looks like this is a bug from https://github.com/apache/iceberg/pull/1723/files. I think this should return fileOrdinal, not pos!
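
For illustration, a hedged sketch of the suggested fix (the getter shape and default case are assumptions; only the field names and case numbers come from the snippet and comment above):

    @Override
    public Object get(int i) {
      switch (i) {
        // ... earlier cases elided ...
        case 16:
          return partitionSpecId;
        case 17:
          return fileOrdinal;  // the fix: this previously returned pos
        default:
          throw new UnsupportedOperationException("Unknown field ordinal: " + i);
      }
    }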

szehon-ho (Member Author):

Yeah, you are right. I can file an issue and take a look at it after this change is in.

@rdblue (Contributor) commented Sep 9, 2021

This looks good to me. My only concern is that this adds the column at the end of the record, rather than just before the partition field. It would be nice to co-locate those two.

@szehon-ho (Member Author):

@rdblue, thanks for taking a look. I moved the field to be before the partition data, if you want to take another look.

Also note that I had to fix a test that depended on getting a specific index from the files table, so users with this use case will break as well, but I suppose we can add it to the release notes.

@rdblue (Contributor) commented Oct 21, 2021

Looks like there's a checkstyle failure:

[ERROR] /home/runner/work/iceberg/iceberg/spark/v3.0/spark3/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java:26:1: Extra separation in import group before 'org.apache.avro.generic.GenericData' [ImportOrder]

@szehon-ho (Member Author):

Yeah, you beat me to it; just fixed :)

@szehon-ho (Member Author):

Rebased the patch. @RussellSpitzer @rdblue, can you check whether this patch is OK?

There's a break in the schema order, per the review comment to co-locate the new field with the partition field, but on the other hand the metadata tables are not yet documented (ref: #3159).

    if ((Integer) record.get("status") < 2 /* added or existing */) {
      GenericData.Record file = (GenericData.Record) record.get("data_file");
      file.put(0, FileContent.DATA.id());
      asMetadataRecord(file);
Member:
This is a really good improvement to the readability of these tests

@RussellSpitzer (Member):

I would have kept the column at the end just so we don't break anyone's position-based indexing (even if we didn't document it), but it sounds like @rdblue would rather have it in the middle. I'm good to merge if we are sure on that.

@rdblue (Contributor) commented Oct 25, 2021

@RussellSpitzer what do you mean about breaking position-based indexing? I wouldn't expect anyone to be doing that... or they could continue to request a projection that matches what they used before. Is there a case where you think this is a risk? I think you should always expect the positions to match the schema that you requested, and there is no guarantee that the table schema won't change.
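
For anyone relying on positions, a minimal sketch of a name-based projection in Spark (the table name is illustrative): selecting metadata-table columns by name keeps a query stable if the column order changes.

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class FilesTableByName {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().getOrCreate();
        // Project columns by name rather than by position, so a reorder in the
        // files metadata table does not change what the query returns.
        Dataset<Row> files = spark.read().format("iceberg").load("db.sample.files");
        files.select("file_path", "spec_id", "partition").show();
      }
    }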

@RussellSpitzer (Member):

> @RussellSpitzer what do you mean about breaking position-based indexing? I wouldn't expect anyone to be doing that... or they could continue to request a projection that matches what they used before. Is there a case where you think this is a risk? I think you should always expect the positions to match the schema that you requested, and there is no guarantee that the table schema won't change.

Yep, I just know sometimes people do silly things. I think logically there is no issue with changing the internal ordering.

  Types.NestedField EQUALITY_IDS = optional(135, "equality_ids", ListType.ofRequired(136, IntegerType.get()),
      "Equality comparison field IDs");
  Types.NestedField SORT_ORDER_ID = optional(140, "sort_order_id", IntegerType.get(), "Sort order ID");
  Types.NestedField SPEC_ID = optional(141, "spec_id", IntegerType.get(), "Partition spec ID");
Contributor:

I think we need to update the spec since we are assigning a new field ID here. We should at least note that it is reserved, even though we don't write it into data files. We can do that in a follow-up.

@rdblue (Contributor) commented Oct 25, 2021

Looks good to me. @szehon-ho, can you rebase this? Also, should we do this for just one Spark version and port to the others afterward? That seems like a simpler way to manage multiple versions.

@szehon-ho (Member Author):

@rdblue done. Not sure if it's exactly what you meant, but I fixed the tests asserting the old position in the new subfolders (2.4 and 3.2).

@rdblue (Contributor) commented Oct 27, 2021

Thanks, @szehon-ho!

rdblue merged commit a3eadf6 into apache:master on Oct 27, 2021
        return;
      case 3:
        // position 3 previously held the partition data and now stores the
        // integer spec ID; the checkpoint replay below trips over that shift
        this.partitionData = (PartitionData) value;
        this.partitionSpecId = (value != null) ? (Integer) value : -1;
stevenzwu (Contributor):

@szehon-ho @RussellSpitzer @rdblue @openinx FYI, shifting the order is a breaking change that caused Flink to fail to restore from a checkpoint. It is not a big deal for us this time, as we are still in the testing phase; I just want to call out that we need to be more careful in the future.

java.lang.ClassCastException: class org.apache.iceberg.PartitionData cannot be cast to class java.lang.Integer (org.apache.iceberg.PartitionData is in unnamed module of loader org.apache.flink.util.ChildFirstClassLoader @3e063fd4; java.lang.Integer is in module java.base of loader 'bootstrap')
	at org.apache.iceberg.BaseFile.put(BaseFile.java:238)
	at org.apache.iceberg.avro.ValueReaders$IndexedRecordReader.set(ValueReaders.java:746)
	at org.apache.iceberg.avro.ValueReaders$IndexedRecordReader.set(ValueReaders.java:715)
	at org.apache.iceberg.avro.ValueReaders$StructReader.read(ValueReaders.java:669)
	at org.apache.iceberg.avro.ValueReaders$StructReader.read(ValueReaders.java:669)
	at org.apache.iceberg.data.avro.DecoderResolver.resolveAndRead(DecoderResolver.java:48)
	at org.apache.iceberg.avro.GenericAvroReader.read(GenericAvroReader.java:69)
	at org.apache.iceberg.avro.ProjectionDatumReader.read(ProjectionDatumReader.java:74)
	at org.apache.avro.file.DataFileStream.next(DataFileStream.java:250)
	at org.apache.iceberg.avro.AvroIterable$AvroReuseIterator.next(AvroIterable.java:202)
	at org.apache.iceberg.io.CloseableIterable$4$1.next(CloseableIterable.java:113)
	at org.apache.iceberg.io.FilterIterator.advance(FilterIterator.java:66)
	at org.apache.iceberg.io.FilterIterator.hasNext(FilterIterator.java:50)
	at org.apache.iceberg.io.CloseableIterable$4$1.hasNext(CloseableIterable.java:108)
	at org.apache.iceberg.relocated.com.google.common.collect.Iterators.addAll(Iterators.java:355)
	at org.apache.iceberg.relocated.com.google.common.collect.Lists.newArrayList(Lists.java:143)
	at org.apache.iceberg.relocated.com.google.common.collect.Lists.newArrayList(Lists.java:130)
	at org.apache.iceberg.flink.sink.FlinkManifestUtil.readDataFiles(FlinkManifestUtil.java:60)
	at org.apache.iceberg.flink.sink.FlinkManifestUtil.readCompletedFiles(FlinkManifestUtil.java:105)
	at org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitUpToCheckpoint(IcebergFilesCommitter.java:212)
	at org.apache.iceberg.flink.sink.IcebergFilesCommitter.initializeState(IcebergFilesCommitter.java:156)
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:118)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:441)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:585)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:565)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
	at java.base/java.lang.Thread.run(Thread.java:829)

@RussellSpitzer (Member):

Well I did warn against it :)

@rdblue (Contributor):

@stevenzwu, it's good that we caught this. How did it happen? Was Flink relying on a specific position?

stevenzwu (Contributor):

The Flink Iceberg sink checkpoints the manifest file. After upgrading to the latest Iceberg master branch, the Flink job can't restore from the checkpoint due to this Avro schema position change.

@rdblue (Contributor):

@stevenzwu, I understood the failure scenario. I'm wondering where Flink is relying on position. That sounds like a Flink bug and we should make sure we fix it.

stevenzwu (Contributor):

@rdblue FlinkManifestUtil calls ManifestReader (from the core module) to read the manifest Avro file. Now BaseFile (also an Avro IndexedRecord) has changed its field order, which breaks the Avro read path. I am not sure this is a Flink bug.

@szehon-ho (Member Author) commented Nov 29, 2021

Hi guys, sorry, I'm out of town and may reply slowly. @stevenzwu, I am curious: did you find a fix? I'm not sure I understand the complete problem, but this change should not modify the serialized form of the metadata file saved in a Flink checkpoint, as specId is a derived field (if that is the concern). See VXMetadata.java, which controls the serialization format and is not changed.

@rdblue (Contributor):

I think this is definitely a Flink bug. Avro can handle schema evolution. You just need to keep track of the write schema and the read schema. My guess is that it is not correctly tracking the write schema used and so you get incorrect results at read time. Where is the write schema tracked for Flink state?
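
As a minimal sketch of that approach (these record schemas are illustrative, not the actual DataFile schemas): constructing the Avro reader with both the write schema and the read schema lets Avro match fields by name and resolve the reorder.

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;

    public class SchemaEvolutionSketch {
      public static GenericDatumReader<GenericRecord> reader() {
        // Write schema: the field order the checkpointed bytes were written with.
        Schema writeSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"f\",\"fields\":["
                + "{\"name\":\"partition\",\"type\":\"long\"},"
                + "{\"name\":\"spec_id\",\"type\":\"int\"}]}");
        // Read schema: the current field order (spec_id now before partition).
        Schema readSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"f\",\"fields\":["
                + "{\"name\":\"spec_id\",\"type\":\"int\"},"
                + "{\"name\":\"partition\",\"type\":\"long\"}]}");
        // Avro resolves fields by name, so values land in the right slots even
        // though the positions changed between writer and reader.
        return new GenericDatumReader<>(writeSchema, readSchema);
      }
    }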
