Flink: Read parquet BINARY column as String for expected #8808
Conversation
@fengjiajie thanks for working on this. Could you please add a test that reproduces the issue?
@nastra Thank you for taking the time to review my code.
The code changes LGTM, but I wonder whether this issue isn't something that should be fixed in Impala itself?
Hi @nastra, thank you for the feedback. |
```java
if (expected.typeId() == Types.StringType.get().typeId()) {
  return new StringReader(desc);
} else {
  return new ParquetValueReaders.ByteArrayReader(desc);
}
```
I am concerned about the backward compatibility of this change. Someone might already depend on reading these columns as binary, and this change would break their use case.
This modification only applies to cases where the Iceberg column is defined as 'string' and the Parquet column is 'binary'. Previously, such cases hit the following exception (the unit test reproduces it):
```
java.lang.ClassCastException: [B cannot be cast to org.apache.flink.table.data.StringData
	at org.apache.flink.table.data.GenericRowData.getString(GenericRowData.java:169)
	at org.apache.iceberg.flink.data.TestFlinkParquetReader.testReadBinaryFieldAsString(TestFlinkParquetReader.java:137)
```
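To make the fix concrete, here is a minimal sketch of the idea behind the diff above (the class and helper names are illustrative, not the PR's actual code): the expected Iceberg type, not the Parquet physical type, selects how the raw bytes are materialized.

```java
import org.apache.flink.table.data.StringData;
import org.apache.iceberg.types.Type;

class BinaryColumnMaterializer {
  // Sketch only: the expected Iceberg type drives the decision, mirroring the
  // StringReader/ByteArrayReader branch in the diff above.
  static Object materialize(Type.TypeID expected, byte[] rawParquetBytes) {
    if (expected == Type.TypeID.STRING) {
      // Wrap the raw bytes as UTF-8 string data, which is what Flink's
      // StringData represents internally.
      return StringData.fromBytes(rawParquetBytes);
    }
    // Any other expected type keeps the raw byte[] untouched.
    return rawParquetBytes;
  }
}
```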
```java
assertThat(rows).as("Should not have more than one row").isExhausted();
}
```

```java
// read as byte[]
```
Should this be a separate test case?
This unit test checks that the expected values can be read from the same file and column both when the Iceberg column is defined as string and when it is defined as binary.
This is a small change, so it might not be too hard to keep the different Flink versions in sync, but usually we introduce changes on the latest Flink version and then create a separate backport PR for the older Flink versions. This is better for the reviewer, as the change is smaller and it is easier to focus on the issues, and better for the contributor, since requested changes do not have to be continuously merged to the other branches. And when backport time comes, it is easier to check the backport-specific changes.
Thank you for your review suggestions. I have made the changes to only include Flink 1.17.
@fengjiajie: Checked the code path for the Spark readers and I have 2 questions:
@pvary The issue is that the type of the read result should be determined by the column type definition in Iceberg, rather than the data type within the Parquet file.
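To make the mismatch concrete, here is a hypothetical pair of Parquet schemas built with parquet-mr's MessageTypeParser (the field name is illustrative):

```java
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

class SchemaMismatchDemo {
  public static void main(String[] args) {
    // What older Impala wrote: a raw BINARY column with no logical-type annotation.
    MessageType fromImpala = MessageTypeParser.parseMessageType(
        "message t { required binary name; }");
    // What a correctly annotating writer produces for the same string column.
    MessageType annotated = MessageTypeParser.parseMessageType(
        "message t { required binary name (UTF8); }");
    // The Iceberg table metadata, meanwhile, declares `name` as `string`,
    // so the reader is asked to decode the raw binary as UTF-8 either way.
    System.out.println(fromImpala);
    System.out.println(annotated);
  }
}
```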
Thanks for the explanation!
How hard would it be to incorporate this into the Spark reader as well?
I think this is a bigger nut to crack. Probably worth another PR in Flink to fix this.
I made modifications on Spark 3.5. Before the changes, the following exception would occur:
@RussellSpitzer: Could you please take a look at the Spark change?
I'm also a little nervous about this change: how are we guaranteed that the binary is parsable as UTF-8 bytes? It seems like we should just be fixing the type annotations rather than changing our readers to read files that have been written incorrectly.
@RussellSpitzer Thank you for participating in the review. The data reading type should be determined by the column type definition in the Iceberg metadata, rather than the column type definition in the Parquet file. An imperfect analogy would be reading a CSV file, where the column type is determined by the table's metadata at read time, rather than by any type defined in the CSV file itself.
My thoughts were more like: if we have a column defined as "double" we may allow "float" in the file definition, but we wouldn't allow binary. So how is this different?
@RussellSpitzer I understand your point. This pull request can be seen as:
@RussellSpitzer Hi, can you please tell us whether this issue can be moved forward? We have a lot of Hive tables that contain such Parquet files, and we are trying to convert these Hive tables into Iceberg tables; the Parquet files cannot be rewritten (because of the large number of historical files). We can guarantee the data can be parsed as UTF-8 because it was originally defined as a string in Hive. If it wasn't a string before, there's no reason to define it as a string when defining the Iceberg table, and doing so would simply make it fail to parse.
You can only guarantee this is safe for your data; for any other user this could be unsafe. That's the underlying issue with this PR: we are essentially allowing a cast from binary to string.
@RussellSpitzer Thanks for the reply, but I still don't get it.
Anyway, the conversion is based on the fact that the user defines the column as string and wants to use it as a string. If you think there is an inappropriate scenario, could you give an example?
A user sets the column as a string but the data is not UTF-8 encoded. Or worse, some files do have UTF-8 encoded binary and others do not.
I think the logic here is straightforward for our users: if your data is not a string, then you should not annotate it as a string, right? If users define it as a string, it is the users' duty to make sure the binary is a string. As for the 'unsafe' problem: without this PR, if users declare binary data as string, the reader just returns a byte array and the upper layer crashes because of the type mismatch. With this PR, if the data is UTF-8 encoded, we are happy, as the code now works; if the data is not UTF-8 encoded, we will crash, which is the same result as before this PR. So in general, this PR does not add new crashing scenarios, right? Or, at least, maybe we could introduce a fallback option here: if binary data is annotated as string but no encoding information is provided, we use the fallback encoding to decode it? WDYT @RussellSpitzer Thanks.
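If a stricter behavior were wanted, one option is a strict UTF-8 decode that rejects invalid bytes up front rather than silently substituting replacement characters; a minimal sketch using only the JDK (the helper name is illustrative):

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;

class StrictUtf8 {
  // Fails fast on non-UTF-8 input, so bad data surfaces at read time
  // instead of flowing downstream as a corrupted string.
  static String decode(byte[] bytes) throws CharacterCodingException {
    return StandardCharsets.UTF_8
        .newDecoder()
        .onMalformedInput(CodingErrorAction.REPORT)
        .onUnmappableCharacter(CodingErrorAction.REPORT)
        .decode(ByteBuffer.wrap(bytes))
        .toString();
  }
}
```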
It still feels like we are adding a special workaround to Iceberg for something which shouldn't be happening in the first place. Can you explain the use case again? Why can't the upstream file producer write correctly annotated binary columns?
Impala used to write data like this, and there is a bunch of data already written this way, so we need a workaround that does not require rewriting all the Parquet data. New data will be written 'correctly', of course, but it is impossible for users to stop their business for a week (or even more) when upgrading to Iceberg... And I do not think the solution here breaks anything? Thanks.
Fokko left a comment:
This is essentially a schema promotion from binary to string: the data is stored as binary, but read as a string, something I proposed in #5151 a while ago.
I agree with Russell that this is fixing a bug in another system (Impala), and that makes it a bit of a slippery slope. However, if we want to add this at some point, then we can bring this in.
Spark adds a configuration to control whether to treat binary as string in the Parquet reader; see: https://github.com/apache/spark/blob/c8137960a0ba725d1633795a057c68f2bbef414b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L981 For cases like this, a similar approach should be adopted, rather than promoting binary to string unconditionally. It would be safer, and it's up to the users to decide. WDYT @RussellSpitzer
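For reference, that Spark option is a plain session config; a minimal sketch of the opt-in (the app name and input path are placeholders):

```java
import org.apache.spark.sql.SparkSession;

class BinaryAsStringDemo {
  public static void main(String[] args) {
    // Spark's built-in Parquet reader treats un-annotated BINARY columns as
    // strings only when spark.sql.parquet.binaryAsString is enabled
    // (it defaults to false), leaving the decision to the user.
    SparkSession spark = SparkSession.builder()
        .appName("binary-as-string")
        .master("local[*]")
        .config("spark.sql.parquet.binaryAsString", "true")
        .getOrCreate();
    spark.read().parquet("/path/to/legacy-impala-data").printSchema();
    spark.stop();
  }
}
```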
@advancedxy Thanks for the context from Spark. There was a mailing-list thread discussing this issue, and we agreed that providing an option to promote binary to string is the correct approach. I think we are on the same page: https://lists.apache.org/thread/r48tswg4f3gvzscgz7bzq7y10bpdnhz4
I met this problem with Spark 3.5, so how can we solve this?
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that's incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.
This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time. |
Some systems, such as older versions of Impala, do not annotate string columns with the UTF-8 logical type in Parquet files. When such Parquet files are imported into Iceberg, reading these BINARY columns runs into type errors.