🐛 fix Flink Read support for parquet int96 timestamps #3987
Changes from 11 commits
@@ -22,9 +22,14 @@
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Files;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.DataTest;
import org.apache.iceberg.data.RandomGenericData;

@@ -34,8 +39,26 @@
import org.apache.iceberg.flink.TestHelpers;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.parquet.ParquetWriteAdapter;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.RandomUtil;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.hadoop.util.HadoopOutputFile;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.junit.Assert;
import org.junit.Test;

import static org.apache.iceberg.types.Types.NestedField.required;

public class TestFlinkParquetReader extends DataTest {
  private static final int NUM_RECORDS = 100;

@@ -72,4 +95,86 @@ protected void writeAndValidate(Schema schema) throws IOException {
    writeAndValidate(RandomGenericData.generateDictionaryEncodableRecords(schema, NUM_RECORDS, 21124), schema);
    writeAndValidate(RandomGenericData.generateFallbackRecords(schema, NUM_RECORDS, 21124, NUM_RECORDS / 20), schema);
  }

  protected List<RowData> rowDatasFromFile(InputFile inputFile, Schema schema) throws IOException {
    try (CloseableIterable<RowData> reader =
        Parquet.read(inputFile)
            .project(schema)
            .createReaderFunc(type -> FlinkParquetReaders.buildReader(schema, type))
            .build()) {
      return Lists.newArrayList(reader);
    }
  }

  @Test
  public void testInt96TimestampProducedBySparkIsReadCorrectly() throws IOException {
    String outputFilePath = String.format("%s/%s", temp.getRoot().getAbsolutePath(), "parquet_int96.parquet");
    HadoopOutputFile outputFile =
        HadoopOutputFile.fromPath(
            new org.apache.hadoop.fs.Path(outputFilePath), new Configuration());
    Schema schema = new Schema(required(1, "ts", Types.TimestampType.withZone()));
    StructType sparkSchema =
        new StructType(
            new StructField[] {
                new StructField("ts", DataTypes.TimestampType, true, Metadata.empty())
            });

    final Random random = new Random(0L);
    List<InternalRow> rows = Lists.newArrayList();
    for (int i = 0; i < 10; i++) {
      rows.add(new GenericInternalRow(new Object[] {
          RandomUtil.generatePrimitive(schema.asStruct().fieldType("ts").asPrimitiveType(), random)}));
    }

    try (FileAppender<InternalRow> writer =
        new ParquetWriteAdapter<>(
            new NativeSparkWriterBuilder(outputFile)
                .set("org.apache.spark.sql.parquet.row.attributes", sparkSchema.json())
                .set("spark.sql.parquet.writeLegacyFormat", "false")
                .set("spark.sql.parquet.outputTimestampType", "INT96")
                .build(),
            MetricsConfig.getDefault())) {
      writer.addAll(rows);
    }

    InputFile parquetInputFile = Files.localInput(outputFilePath);
    List<RowData> readDataRows = rowDatasFromFile(parquetInputFile, schema);
    Assert.assertEquals(rows.size(), readDataRows.size());
    for (int i = 0; i < rows.size(); i += 1) {
      Assert.assertEquals(rows.get(i).getLong(0), readDataRows.get(i).getLong(0));
    }
  }

  /**
   * Native Spark ParquetWriter.Builder implementation so that we can write timestamps using Spark's
   * native ParquetWriteSupport.
   *
   * <p>Thanks to https://github.com/apache/iceberg/pull/1184 by @gustavoatt for the original implementation.
   */
  private static class NativeSparkWriterBuilder
      extends ParquetWriter.Builder<InternalRow, NativeSparkWriterBuilder> {
    private final Map<String, String> config = Maps.newHashMap();

    NativeSparkWriterBuilder(org.apache.parquet.io.OutputFile path) {
      super(path);
    }

    public NativeSparkWriterBuilder set(String property, String value) {
      this.config.put(property, value);
      return self();
    }

    @Override
    protected NativeSparkWriterBuilder self() {
      return this;
    }

    @Override
    protected WriteSupport<InternalRow> getWriteSupport(Configuration configuration) {
      for (Map.Entry<String, String> entry : config.entrySet()) {
        configuration.set(entry.getKey(), entry.getValue());
      }

      return new org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport();
    }
  }
}
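The test above writes INT96 timestamps with Spark's ParquetWriteSupport and asserts that Flink's reader returns matching values. As background (this is not code from the PR), Parquet INT96 timestamps follow the legacy Impala/Spark layout: 12 little-endian bytes, the first 8 holding the nanoseconds within the day and the last 4 the Julian day number. A minimal decoding sketch:

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Background sketch (not the PR's reader code): converting a 12-byte Parquet INT96
// value into microseconds since the Unix epoch, per the legacy Impala/Spark layout.
public class Int96ToMicros {
  private static final long UNIX_EPOCH_JULIAN_DAY = 2_440_588L; // Julian day of 1970-01-01
  private static final long MICROS_PER_DAY = 86_400L * 1_000_000L;

  static long toEpochMicros(ByteBuffer int96Value) {
    ByteBuffer buf = int96Value.duplicate().order(ByteOrder.LITTLE_ENDIAN);
    long nanosOfDay = buf.getLong(); // first 8 bytes: nanoseconds within the day
    long julianDay = buf.getInt();   // last 4 bytes: Julian day number
    return (julianDay - UNIX_EPOCH_JULIAN_DAY) * MICROS_PER_DAY + nanosOfDay / 1_000L;
  }
}
```

The reader changes in this PR (not shown in this excerpt) need an equivalent conversion to surface INT96 values as microsecond timestamps.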
@@ -67,6 +67,7 @@ project(':iceberg-flink:iceberg-flink-1.14') {
    exclude group: 'org.apache.hive', module: 'hive-storage-api'
  }

  testImplementation "org.apache.spark:spark-sql_2.12:3.2.0"
Member
Seems like this dependency was added for writing the timestamp as INT96 in the unit test, but in fact Apache Flink's ParquetRowDataWriter supports writing a timestamp_with_local_time_zone as INT96. So I would suggest using the Flink parquet writer rather than the Spark parquet writer (it seems strange to me to introduce a Spark module into the Flink module). See the sketch after this diff for what that could look like.
Contributor
I actually prefer using the Spark module, unless Flink natively supports writing INT96 timestamps to Parquet. The benefit of using the Spark module is that its INT96 support has been around for a long time and is relatively trusted to produce correct timestamp values.
  testImplementation "org.apache.flink:flink-core:${flinkVersion}"
  testImplementation "org.apache.flink:flink-runtime:${flinkVersion}"
  testImplementation ("org.apache.flink:flink-test-utils-junit:${flinkVersion}") {
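If the Flink-native route suggested in the comment above were taken, the test could avoid this spark-sql dependency altogether. The following is only a rough sketch of that alternative, assuming the flink-parquet module's ParquetRowDataBuilder is available and, per the reviewer's note, its row writer encodes timestamps (including timestamp_with_local_time_zone) as INT96; the output path and values are illustrative, and this is not code from the PR.

```java
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.row.ParquetRowDataBuilder;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;

// Sketch: write a handful of timestamp rows with Flink's own parquet writer
// instead of going through Spark's ParquetWriteSupport.
public class FlinkNativeParquetWriteSketch {
  public static void main(String[] args) throws Exception {
    RowType rowType = RowType.of(
        new LogicalType[] {new LocalZonedTimestampType(6)},
        new String[] {"ts"});

    // flink-parquet's row writer handles the INT96 encoding internally.
    BulkWriter.Factory<RowData> factory =
        ParquetRowDataBuilder.createWriterFactory(rowType, new Configuration(), true /* utcTimestamp */);

    Path outputPath = new Path("/tmp/parquet_int96_flink.parquet"); // illustrative location
    try (FSDataOutputStream out =
        FileSystem.getLocalFileSystem().create(outputPath, FileSystem.WriteMode.OVERWRITE)) {
      BulkWriter<RowData> writer = factory.create(out);
      for (int i = 0; i < 10; i++) {
        writer.addElement(GenericRowData.of(TimestampData.fromEpochMillis(1_000L * i)));
      }
      writer.finish();
    }
  }
}
```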
@@ -25,13 +25,17 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.Files;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.Schema;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.data.DataTest;

@@ -42,14 +46,28 @@
import org.apache.iceberg.flink.TestHelpers;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.parquet.ParquetWriteAdapter;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.RandomUtil;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.hadoop.util.HadoopOutputFile;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.junit.Assert;
import org.junit.Test;

import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;

public class TestFlinkParquetReader extends DataTest {
  private static final int NUM_RECORDS = 100;

@@ -129,4 +147,86 @@ protected void writeAndValidate(Schema schema) throws IOException {
    writeAndValidate(RandomGenericData.generateDictionaryEncodableRecords(schema, NUM_RECORDS, 21124), schema);
    writeAndValidate(RandomGenericData.generateFallbackRecords(schema, NUM_RECORDS, 21124, NUM_RECORDS / 20), schema);
  }

  protected List<RowData> rowDatasFromFile(InputFile inputFile, Schema schema) throws IOException {
    try (CloseableIterable<RowData> reader =
        Parquet.read(inputFile)
            .project(schema)
            .createReaderFunc(type -> FlinkParquetReaders.buildReader(schema, type))
            .build()) {
      return Lists.newArrayList(reader);
    }
  }

  @Test
  public void testInt96TimestampProducedBySparkIsReadCorrectly() throws IOException {
Member
I would suggest writing a few rows using the Flink native writers, and then using the following readers to assert their results (one possible cross-check is sketched after this diff):
    String outputFilePath = String.format("%s/%s", temp.getRoot().getAbsolutePath(), "parquet_int96.parquet");
    HadoopOutputFile outputFile =
        HadoopOutputFile.fromPath(
            new org.apache.hadoop.fs.Path(outputFilePath), new Configuration());
    Schema schema = new Schema(required(1, "ts", Types.TimestampType.withZone()));
    StructType sparkSchema =
        new StructType(
            new StructField[] {
                new StructField("ts", DataTypes.TimestampType, true, Metadata.empty())
            });

    final Random random = new Random(0L);
    List<InternalRow> rows = Lists.newArrayList();
    for (int i = 0; i < 10; i++) {
      rows.add(new GenericInternalRow(new Object[] {
          RandomUtil.generatePrimitive(schema.asStruct().fieldType("ts").asPrimitiveType(), random)}));
    }

    try (FileAppender<InternalRow> writer =
        new ParquetWriteAdapter<>(
            new NativeSparkWriterBuilder(outputFile)
                .set("org.apache.spark.sql.parquet.row.attributes", sparkSchema.json())
                .set("spark.sql.parquet.writeLegacyFormat", "false")
                .set("spark.sql.parquet.outputTimestampType", "INT96")
                .build(),
            MetricsConfig.getDefault())) {
      writer.addAll(rows);
    }

    InputFile parquetInputFile = Files.localInput(outputFilePath);
    List<RowData> readDataRows = rowDatasFromFile(parquetInputFile, schema);
    Assert.assertEquals(rows.size(), readDataRows.size());
    for (int i = 0; i < rows.size(); i += 1) {
      Assert.assertEquals(rows.get(i).getLong(0), readDataRows.get(i).getLong(0));
    }
  }

  /**
   * Native Spark ParquetWriter.Builder implementation so that we can write timestamps using Spark's
   * native ParquetWriteSupport.
   *
   * <p>Thanks to https://github.com/apache/iceberg/pull/1184 by @gustavoatt for the original implementation.
   */
  private static class NativeSparkWriterBuilder
      extends ParquetWriter.Builder<InternalRow, NativeSparkWriterBuilder> {
    private final Map<String, String> config = Maps.newHashMap();

    NativeSparkWriterBuilder(org.apache.parquet.io.OutputFile path) {
      super(path);
    }

    public NativeSparkWriterBuilder set(String property, String value) {
      this.config.put(property, value);
      return self();
    }

    @Override
    protected NativeSparkWriterBuilder self() {
      return this;
    }

    @Override
    protected WriteSupport<InternalRow> getWriteSupport(Configuration configuration) {
      for (Map.Entry<String, String> entry : config.entrySet()) {
        configuration.set(entry.getKey(), entry.getValue());
      }

      return new org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport();
    }
  }
}
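On the reviewer's suggestion above to assert the results with additional readers (the concrete list of readers is not captured in this excerpt), one hypothetical cross-check is to read the same file back with Iceberg's generic Parquet reader and compare against the Flink RowData results. The helper below is an illustration only, not code from the PR.

```java
import java.io.IOException;
import java.util.List;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.junit.Assert;

// Hypothetical cross-check: the generic Iceberg reader and the Flink reader should
// agree on how many rows the INT96 file contains (a value-level comparison would
// additionally map generic Record timestamps to RowData timestamps).
public class Int96CrossCheck {
  static void assertSameRowCount(InputFile inputFile, Schema schema, List<RowData> flinkRows)
      throws IOException {
    try (CloseableIterable<Record> reader =
        Parquet.read(inputFile)
            .project(schema)
            .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema))
            .build()) {
      List<Record> genericRows = Lists.newArrayList(reader);
      Assert.assertEquals(genericRows.size(), flinkRows.size());
    }
  }
}
```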