
Conversation

@SinghAsDev (Contributor) commented Dec 13, 2021

Parquet files written by Parquet writers that use names other than `list` and `element` for the repeated group and the list element, respectively, are read incorrectly. Parquet files generated by Hive fall into this category as well.

This PR fixes Iceberg's Parquet reader incorrectly returning nulls for such Parquet files, and updates the migration doc to inform users of this issue.

@SinghAsDev (Contributor Author):

@RussellSpitzer @rdblue what do you think about this, while I work on adding backwards compatibility support?

@kbendick (Contributor) left a comment:

Thanks for reporting this @SinghAsDev.

I noticed that the Parquet file you submitted is using what looks like a custom Parquet version.

$ parquet-tools meta spark/v3.2/spark/src/test/resources/twoLevelList.pq
creator:         parquet-mr version 1.10.1-pinterest-0.0.75 (build ${buildNumber})

file schema:     hive_schema
--------------------------------------------------------------------------------
key:             OPTIONAL BINARY L:STRING R:0 D:1
val:             OPTIONAL F:1
.bag:            REPEATED F:1
..array_element: OPTIONAL F:2
...a1:           OPTIONAL BINARY L:STRING R:1 D:4
...a2:           OPTIONAL BINARY L:STRING R:1 D:4

row group 1:     RC:1 TS:153 OFFSET:4
--------------------------------------------------------------------------------
key:              BINARY ZSTD DO:0 FPO:4 SZ:58/49/0.84 VC:1 ENC:BIT_PACKED,PLAIN,RLE ST:[min: k1, max: k1, num_nulls: 0]
val:
.bag:
..array_element:
...a1:            BINARY ZSTD DO:0 FPO:62 SZ:61/52/0.85 VC:1 ENC:PLAIN,RLE ST:[min: a, max: a, num_nulls: 0]
...a2:            BINARY ZSTD DO:0 FPO:123 SZ:61/52/0.85 VC:1 ENC:PLAIN,RLE ST:[min: b, max: b, num_nulls: 0]

Does this also happen with OSS Iceberg? I will try to check. It would be great if, instead of using a checked-in file, Spark were used to generate the data (although I know that doesn't capture every case).

Comment on lines 54 to 70
/* Using a static file rather than generating test data in the test, as Parquet writers in Iceberg only support
 * three-level lists. twoLevelList.pq is a Parquet file that contains the following Parquet schema:
 * message hive_schema {
 *   optional binary key (STRING);
 *   optional group val (LIST) {
 *     repeated group bag {
 *       optional group array_element {
 *         optional binary a1 (STRING);
 *         optional binary a2 (STRING);
 *       }
 *     }
 *   }
 * }
 *
 * It contains only one row. Below is the JSON dump of the file:
 * {"key":"k1","val":{"bag":[{"array_element":{"a1":"a","a2":"b"}}]}}
 */
Contributor:

I think your schema is off vs your JSON element:

It looks like val should be a list, but val in the JSON is a struct.

I might have that wrong though. I'm still working on a reproduction.

Contributor:

When I read the element from the file using Spark, it doesn't have the same shape as the JSON you showed.

scala> val df = spark.read.parquet("./spark/v3.2/spark/src/test/resources/twoLevelList.pq")
df: org.apache.spark.sql.DataFrame = [key: string, val: array<struct<a1:string,a2:string>>]

scala> df.printSchema
root
 |-- key: string (nullable = true)
 |-- val: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a1: string (nullable = true)
 |    |    |-- a2: string (nullable = true)


scala> df.toJSON.show(false)
+----------------------------------------+
|value                                   |
+----------------------------------------+
|{"key":"k1","val":[{"a1":"a","a2":"b"}]}|
+----------------------------------------+

Contributor Author:

Yeah, it depends on how you get the JSON dump. If you use parquet-tools with `cat --json`, you get the JSON I pasted here.
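For illustration only (this is not Iceberg or parquet-tools code): the two dumps differ because `parquet-tools cat --json` prints the physical structure, including the repeated-group wrapper ("bag") and element wrapper ("array_element"), while Spark prints the logical value with those wrappers stripped. A minimal sketch of that relationship, with a hypothetical `unwrap_list` helper:

```python
def unwrap_list(physical_list):
    """Strip the repeated-group and element wrappers from a physical
    list value, regardless of what the wrappers are named."""
    # physical form: {"<repeated-name>": [{"<element-name>": value}, ...]}
    (_, rows), = physical_list.items()
    elements = []
    for row in rows:
        (_, value), = row.items()
        elements.append(value)
    return elements

# The parquet-tools style dump from above...
physical = {"key": "k1",
            "val": {"bag": [{"array_element": {"a1": "a", "a2": "b"}}]}}
# ...reduces to the Spark-style logical value.
logical = {"key": physical["key"], "val": unwrap_list(physical["val"])}
# logical == {"key": "k1", "val": [{"a1": "a", "a2": "b"}]}
```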

Contributor:

Yeah, that is correct. I did get the JSON you posted here.

@SinghAsDev (Contributor Author) commented Dec 13, 2021

@kbendick thanks for the quick review.

> Does this also happen with OSS Iceberg?

Yes.

> It would be great if instead of using a checked in file, Spark was used to generate the data instead (although I know that doesn't capture every case).

Yeah, the issue is that this only happens for Parquet files generated by old, non-Iceberg Parquet writers. I have also noted this in a comment in the test file.

@RussellSpitzer (Member) commented Dec 13, 2021

I think it's worthwhile to add backwards compatibility. For reference, we discussed this on Slack, and I noted the one case where we hit this internally in #2167, but there we ended up just writing our source files without the old structure. We haven't hit it since then, even with various other older Hadoop distributions lying around.

That said, if other folks are hitting this, we should fix it.

It would be great to know the exact versions of Hive/Parquet that are generating this structure.

@kbendick (Contributor) left a comment:

Ok, I have a reproduction that I will push to a separate branch.

I think it's because your schema has the word bag in it. I think this is one of those cases, like what happened with the Parquet map type when it changed between 1.11.0 and 1.11.1, where we're relying on the Parquet structure name.

When I run the same test with a file I generated with this schema (same data), I do not get null.

message spark_schema {
  required binary key (STRING);
  required group val (LIST) {
    repeated group list {
      optional group element {
        optional binary a1 (STRING);
        optional binary a2 (STRING);
      }
    }
  }
}

My file:

$ parquet-tools head spark/v3.2/spark/src/test/resources/parquet_test.parquet
key = k1
val:
.list:
..element:
...a1 = a
...a2 = b

Your file

$ parquet-tools head spark/v3.2/spark/src/test/resources/twoLevelList.pq
key = k1
val:
.bag:
..array_element:
...a1 = a
...a2 = b

Again, my element as JSON vs the element in the provided file as JSON:

{"key":"k1","val":{"list":[{"element":{"a1":"a","a2":"b"}}]}}

The one you provided

{"key":"k1","val":{"bag":[{"array_element":{"a1":"a","a2":"b"}}]}}

The difference is that yours is using the non-standard name bag. We applied a fix before where we can read any name (and don't rely on the Parquet names for the field), so we can look into that tomorrow. I'll also check the discussion on Slack.

But I'm wondering, how did your Parquet file get generated with bag instead of list for the array element name? The Parquet version appears to be parquet-mr version 1.10.1-pinterest-0.0.75 (build ${buildNumber}) as reported by the file, but what version (roughly speaking) of MR are you using?

@SinghAsDev (Contributor Author) commented Dec 13, 2021

`bag` is added by Hive's schema converter (defined here). We use this serde from Hive 1.2.1. However, even the latest versions still use `bag` here.

Another thing to note here is that even if we use `list` as the name for the repeated group of a list, I think Iceberg returns just an empty list. I am working on a fix for this, along with backward-compatibility support. Would love to get your reviews on that as well.

@kbendick (Contributor):

> "bag" is added by hive's parquet serde, ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java. https://github.com/apache/hive/blob/release-1.2.1/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java#L53. We use this serde from hive's 1.2.1 version. However, even latest versions still use "bag", https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java.

> Another thing to note here is that even if we use list as name for repeated group of a list, I think Iceberg returns just empty list. I am working on a fix for this, along with backward compatibility support. Would love to get your reviews on that as well.

I'd be happy to take a look when you have a fix. We applied a fix for Iceberg 0.12.1 where we stopped relying on the name provided in the Parquet schema for map fields.

I think something like that would be the best route forward so that we aren’t imposing any restrictions on the parquet provided names as we’ve observed these change across systems and parquet versions.

@SinghAsDev SinghAsDev changed the title Demonstrate issue where Iceberg's parquet reader silently returns nulls Fix Iceberg's parquet writer returning nulls incorrectly for parquet files written by writers that don't use list and element as names. Dec 13, 2021
@SinghAsDev (Contributor Author):

@kbendick @RussellSpitzer I updated the PR with a fix for this issue. However, the fix is only for 3-level lists; I will work on compatibility support in a separate diff.


**Note** Parquet files written with Parquet writers that use names other than `list` and `element` for repeated group
and element of the list respectively are **read incorrectly** by Iceberg versions up to 0.12.1. Parquet files
generated by Hive fall in this category.
Member:

Can we try to get a version bound here? Ideally the OSS Hive version where this starts and stops being an issue.

@kbendick (Contributor), Dec 13, 2021:

I checked 3.x and it's still called bag.

Contributor Author:

Yeah, some details here.

Contributor:

Use `!!! Note` for the note.

Also, "Parquet files generated by Hive" sounds a bit vague. I think we can specify that example cases are Parquet files written using (1) org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe in Hive, or (2) spark.sql.parquet.writeLegacyFormat=true in Spark.
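To make the two cases concrete, a hedged sketch (table and column names here are hypothetical, not from the PR; verify against your Hive/Spark versions):

```sql
-- (1) Hive: a plain Parquet table is written through ParquetHiveSerDe,
--     which uses non-standard wrapper names for lists ("bag"/"array_element").
CREATE TABLE legacy_lists (vals ARRAY<STRUCT<a1:STRING>>)
STORED AS PARQUET;

-- (2) Spark: the legacy writer mode also emits non-standard wrapper names.
SET spark.sql.parquet.writeLegacyFormat=true;
```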

@SinghAsDev (Contributor Author) commented Dec 13, 2021 via email

@kbendick (Contributor):

> Hive still writes with the same names, so I would say it is a problem for any Parquet file generated by writing to a Hive table with ParquetHiveSerDe.

I suggest we consider going with something similar to the map projection element rename fix that Ryan applied for maps from Parquet 1.11.0 to 1.11.1, where we don't depend on the name at all.

PR for context: #3309

It might not work, because that was for projection, but it looks like you're updating similar areas of the code.

If we can get something like that, where we don't assume names at all from Parquet, just element structure and types, that would ideally allow for N levels of nesting etc. and further remove us from individual engines' choices of element names for Parquet (or changes within the Parquet spec itself). cc @SinghAsDev
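A minimal sketch of what name-independent handling could look like, in plain Python over a toy schema model (this is not the parquet-mr API, and it glosses over the full backward-compatibility rules in the Parquet spec): classify the list element purely from repetition and structure, never from the names list/element/bag.

```python
from collections import namedtuple

# Toy schema node: an empty children list means a primitive field.
Node = namedtuple("Node", ["name", "repetition", "children"])

def list_element(list_group):
    """Return the element of a group annotated as LIST, by structure
    alone: the single repeated child is the wrapper, and a wrapper with
    exactly one child holds the element (3-level form)."""
    repeated = [c for c in list_group.children if c.repetition == "repeated"]
    assert len(repeated) == 1, "a LIST group has exactly one repeated child"
    wrapper = repeated[0]
    if len(wrapper.children) == 1:
        return wrapper.children[0]  # 3-level list
    return wrapper                  # 2-level list: the wrapper is the element

# Hive-style names resolve exactly like the standard ones.
hive_style = Node("val", "optional", [
    Node("bag", "repeated", [
        Node("array_element", "optional", [
            Node("a1", "optional", []),
            Node("a2", "optional", []),
        ]),
    ]),
])
standard = Node("val", "optional", [
    Node("list", "repeated", [
        Node("element", "optional", [
            Node("a1", "optional", []),
            Node("a2", "optional", []),
        ]),
    ]),
])
```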

Comment on lines 109 to 115
  private Type makeElement(Type element) {
    // List's element in 3-level lists can be named differently across different parquet writers.
    // For example, hive names it "array_element", whereas new parquet writers names it as "element".
    if (element.getName().equals("element") || element.isPrimitive()) {
      return element;
    }
Contributor:

If it's possible to make this more generic and remove the dependency on the name entirely, that would be great.

Also, could you try to make a test that uses the logic in that file to construct the schema, instead of a checked-in binary file? I know that might be difficult, but if you wouldn't mind giving it a try, that would be preferred over checking in a binary file.

As mentioned, here's a similar PR where you can find good test logic for making arbitrarily named Parquet message types: #3309

Contributor Author:

I think we can make it generic in the sense that we don't have magic keywords like "element"; however, I am not sure the projection change you are referring to would work here, but do let me know if you think otherwise. Here, we are making this change so that the constructed Parquet schema has correct IDs but still has the same schema as the one in the Parquet file. I will at least make this part generic; will update the diff shortly.

Contributor Author:

I tried a bit to remove the usage of "element"; however, there isn't a clean way to do so. The only way I could come up with is to build a dummy list and then get the element out of it. I am not sure that is any cleaner than the current approach. Below is the change I am referring to.

  private Type makeElement(Type element) {
    // List's element in 3-level lists can be named differently across different parquet writers.
    // For example, hive names it "array_element", whereas new parquet writers names it as "element".
    if (element.isPrimitive()) {
      return element;
    }

    Types.BaseListBuilder.GroupElementBuilder<GroupType, Types.ListBuilder<GroupType>> dummyBuilder =
        Types.list(Type.Repetition.OPTIONAL)
            .groupElement(element.getRepetition())
            .addFields(element.asGroupType().getFields().toArray(new Type[0]));
    if (element.getId() != null) {
      dummyBuilder.id(element.getId().intValue());
    }
    return dummyBuilder.named("dummy").getType(0).asGroupType().getType(0);
  }

As such, I don't have a strong preference. @kbendick, let me know if you like this one better and I will make the change.

Contributor Author:

@kbendick I also updated the test to dynamically generate a Parquet file in the required format. Mind giving it another pass?

Contributor:

Yeah I'll absolutely give this another pass. Sorry I was head down in something else today.

I'll also look for somebody to start the tests for you unless you'd like to wait. 😄

Contributor:

And yeah, understood; the projection-related changes, while similar in concept, don't necessarily apply here, unfortunately.

Contributor Author:

Thanks @kbendick, getting the tests started will definitely be helpful.

Contributor:

The updated tests look great @SinghAsDev! Thank you!

@kbendick (Contributor) left a comment:

@SinghAsDev Two questions.

Are you using Hive on MapReduce or Hive on Tez?

And which Hive version did you say you were running? (I know that file still uses bag in later versions but just for my own knowledge so I can repro properly if need be).

@SinghAsDev (Contributor Author):

@kbendick we were using Hive on MR with Hive 1.2.1, and so we still have tons of files written by it. However, as we discussed, this is not specific to old versions.

@SinghAsDev (Contributor Author):

Hey @kbendick @RussellSpitzer would you be able to kick-start the builds again? I had to fix a couple of Checkstyle errors.

@SinghAsDev SinghAsDev changed the title Fix Iceberg's parquet writer returning nulls incorrectly for parquet files written by writers that don't use list and element as names. Fix Iceberg's parquet reader returning nulls incorrectly for parquet files written by writers that don't use list and element as names. Dec 15, 2021
@jackye1995 (Contributor):

> "bag" is added by hive's parquet serde, ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java

So it sounds like the issue will only come up when users load a table through Hive and import it to Iceberg through the add-files procedure. Technically, users could have a way out by using the migrate-table procedure to rewrite the entire table using the Iceberg writer instead.

But this seems to impact quite a lot of users doing fast Hive-to-Iceberg migrations, so I will add this to the 0.13 milestone for now; please let me know if anyone objects.
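For reference, the two paths being compared look roughly like this in Spark SQL (catalog and table names are hypothetical, and the exact procedure signatures should be checked against the Iceberg version in use):

```sql
-- Fast path: keep the existing Hive-written Parquet files
-- (these are the files affected by the read bug discussed here).
CALL my_catalog.system.add_files(table => 'db.iceberg_tbl',
                                 source_table => 'db.hive_tbl');

-- Way out: rewrite the data with Iceberg's own writer, e.g. via CTAS.
CREATE TABLE my_catalog.db.iceberg_tbl USING iceberg
AS SELECT * FROM db.hive_tbl;
```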


    // generate parquet file with required schema
    List<String> testData = Collections.singletonList("{\"col1\": [{\"col2\": 1}]}");
    spark.read().schema(sparkSchema).json(
Member:

Could we just run a SQL Insert statement here?

Contributor Author:

Sure

  @Test
  public void testHiveStyleThreeLevelList() throws IOException {
    File location = new File(temp.getRoot(), "parquetReaderTest");
    StructType sparkSchema =
Member:

So this is a different schema than would be created by

CREATE HIVEFORMAT TABLE x ( col_1 : List<struct<col2: int>>)

and

CREATE TABLE x ( col_1 : List<struct<col2: int>>) ?

Contributor Author:

I think the reason I explicitly defined the schemas here was to demonstrate the issue a bit more clearly. However, now that we are all on the same page about the issue, I can make the tests a bit easier to follow. Will follow up with an update shortly.

    Assert.assertEquals(expectedParquetSchema, schema.toString());

    // read from Iceberg's parquet reader and ensure data is read correctly into Spark's internal row
    Schema icebergSchema = SparkSchemaUtil.convert(sparkSchema);
@RussellSpitzer (Member), Dec 16, 2021:

I think we should add a test to make sure imported files are read correctly: instead of using the internal methods, do an import of files into an Iceberg table and show that the rows are correct. Perhaps as part of the add_files or migrate tests?

Member:

We should also have the low level tests in TestParquetSchemaUtil.java

Contributor Author:

Good idea, will update shortly.

@SinghAsDev (Contributor Author):

@RussellSpitzer mind giving it another pass? Thanks.

  @Test
  public void testSchemaConversionForHiveStyleLists() {
    String parquetSchemaString =
        "message spark_schema {\n" +
Member:

This schema cannot be constructed using the Parquet API directly? I was hoping this would look similar to the tests above that create the schema using Parquet directly.

Contributor Author:

Unfortunately no; the moment we call `list`, it will use a 3-level list with the `list` and `element` names.

@SinghAsDev (Contributor Author):

@RussellSpitzer @jackye1995 thanks for your reviews, the diff is updated to address your suggestions.

!!! Note
    Parquet files written with Parquet writers that use names other than `list` and `element` for repeated group
    and element of the list respectively are **read incorrectly as nulls** by Iceberg versions up to 0.12.1.
Most commonly such files are written by follow writers.
Contributor:

nit: the following writers:

btw, have you tried mkdocs serve? Not sure if the indentation would be displayed correctly; please verify that.

Contributor Author:

Hey @jackye1995, to ease the cherry-picks we decided to extract the format change into a separate PR. Will follow up on this right after we get this PR in.

Contributor:

sure, makes sense

Contributor Author:

Updated the docs to take care of the nit comment, and made sure it looks as expected with mkdocs serve as well.

@RussellSpitzer (Member) left a comment:

LGTM, +1. Will merge when the tests finish in a second.

@RussellSpitzer RussellSpitzer merged commit 9b0aba7 into apache:master Dec 17, 2021
@RussellSpitzer (Member):

Thanks so much to @SinghAsDev for the PR and @jackye1995 and @kbendick for the review

@SinghAsDev (Contributor Author):

Thanks for reviews @RussellSpitzer @jackye1995 @kbendick !

@SinghAsDev SinghAsDev deleted the pq_test branch December 17, 2021 22:38
RussellSpitzer pushed a commit that referenced this pull request Dec 18, 2021
…ns. (#3767)

* Add spark's legacy mode write tests to spark3.0 and 3.1 versions backport of tests from #3723
rdblue added a commit to rdblue/iceberg that referenced this pull request Dec 19, 2021
rdblue added a commit to rdblue/iceberg that referenced this pull request Dec 19, 2021
rdblue added a commit that referenced this pull request Dec 21, 2021
jackye1995 pushed a commit to jackye1995/iceberg-docs that referenced this pull request Feb 8, 2022