diff --git a/flink/v1.14/build.gradle b/flink/v1.14/build.gradle
index 66e3708ce500..e25e0492b24f 100644
--- a/flink/v1.14/build.gradle
+++ b/flink/v1.14/build.gradle
@@ -63,6 +63,7 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
       exclude group: 'org.apache.hive', module: 'hive-storage-api'
     }

+    testImplementation "org.apache.spark:spark-sql_2.12:3.2.0"
     testImplementation "org.apache.flink:flink-connector-test-utils:${flinkVersion}"
     testImplementation "org.apache.flink:flink-core:${flinkVersion}"
     testImplementation "org.apache.flink:flink-runtime:${flinkVersion}"
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
index ab7b1174c9f3..897edb3a0332 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
@@ -21,11 +21,13 @@
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.time.Instant;
 import java.time.ZoneOffset;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import org.apache.flink.table.data.ArrayData;
 import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.GenericArrayData;
@@ -279,6 +281,10 @@ public ParquetValueReader<?> primitive(
       case INT64:
       case DOUBLE:
         return new ParquetValueReaders.UnboxedReader<>(desc);
+      case INT96:
+        // Impala & Spark used to write timestamps as INT96 without a logical type. For backwards
+        // compatibility we try to read INT96 as timestamps.
+        return new TimestampInt96Reader(desc);
       default:
         throw new UnsupportedOperationException("Unsupported type: " + primitive);
     }
@@ -338,6 +344,31 @@ public DecimalData read(DecimalData ignored) {
     }
   }

+  private static class TimestampInt96Reader extends ParquetValueReaders.UnboxedReader<TimestampData> {
+    private static final long UNIX_EPOCH_JULIAN = 2_440_588L;
+
+    TimestampInt96Reader(ColumnDescriptor desc) {
+      super(desc);
+    }
+
+    @Override
+    public TimestampData read(TimestampData ignored) {
+      ByteBuffer byteBuffer = readBinary().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
+      long timeOfDayNanos = byteBuffer.getLong();
+      int julianDay = byteBuffer.getInt();
+      return TimestampData.fromLocalDateTime(Instant
+          .ofEpochMilli(TimeUnit.DAYS.toMillis(julianDay - UNIX_EPOCH_JULIAN))
+          .plusNanos(timeOfDayNanos).atOffset(ZoneOffset.UTC).toLocalDateTime());
+    }
+
+    @Override
+    public Binary readBinary() {
+      return column.nextBinary();
+    }
+  }
+
   private static class MicrosToTimestampTzReader extends ParquetValueReaders.UnboxedReader<TimestampData> {
     MicrosToTimestampTzReader(ColumnDescriptor desc) {
       super(desc);
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
index 30a2a7bb51ce..ae4438f2af52 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
@@ -23,33 +23,57 @@
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.time.OffsetDateTime;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.Random;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.Files;
+import org.apache.iceberg.MetricsConfig;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.avro.AvroSchemaUtil;
 import org.apache.iceberg.data.DataTest;
 import org.apache.iceberg.data.RandomGenericData;
 import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
 import org.apache.iceberg.data.parquet.GenericParquetWriter;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.TestHelpers;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.parquet.ParquetWriteAdapter;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.RandomUtil;
 import org.apache.parquet.avro.AvroParquetWriter;
 import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
 import org.junit.Assert;
 import org.junit.Test;

+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
 public class TestFlinkParquetReader extends DataTest {
   private static final int NUM_RECORDS = 100;

@@ -135,4 +159,106 @@ protected void writeAndValidate(Schema schema) throws IOException {
     writeAndValidate(
         RandomGenericData.generateFallbackRecords(schema, NUM_RECORDS, 21124, NUM_RECORDS / 20), schema);
   }
+
+  protected List<RowData> flinkReadRowDataFromFile(InputFile inputFile, Schema schema) throws IOException {
+    try (CloseableIterable<RowData> reader =
+        Parquet.read(inputFile)
+            .project(schema)
+            .createReaderFunc(type -> FlinkParquetReaders.buildReader(schema, type))
+            .build()) {
+      return Lists.newArrayList(reader);
+    }
+  }
+
+  protected List<Record> genericReadRowDataFromFile(InputFile inputFile,
+                                                    Schema schema) throws IOException {
+    try (CloseableIterable<Record> reader =
+        Parquet.read(inputFile)
+            .project(schema)
+            .createReaderFunc(type -> GenericParquetReaders.buildReader(schema, type))
+            .build()) {
+      return Lists.newArrayList(reader);
+    }
+  }
+
+  @Test
+  public void testInt96TimestampProducedBySparkIsReadCorrectly() throws IOException {
+    String outputFilePath = String.format("%s/%s", temp.getRoot().getAbsolutePath(), "parquet_int96.parquet");
+    HadoopOutputFile outputFile =
+        HadoopOutputFile.fromPath(
+            new org.apache.hadoop.fs.Path(outputFilePath), new Configuration());
+    Schema schema = new Schema(required(1, "ts", Types.TimestampType.withoutZone()));
+    StructType sparkSchema =
+        new StructType(
+            new StructField[] {
+                new StructField("ts", DataTypes.TimestampType, true, Metadata.empty())
+            });
+
+    final Random random = new Random(0L);
+    List<InternalRow> rows = Lists.newArrayList();
+    for (int i = 0; i < 10; i++) {
+      rows.add(new GenericInternalRow(new Object[] {
+          RandomUtil.generatePrimitive(schema.asStruct().fieldType("ts").asPrimitiveType(), random)}));
+    }
+
+    try (FileAppender<InternalRow> writer =
+        new ParquetWriteAdapter<>(
+            new NativeSparkWriterBuilder(outputFile)
+                .set("org.apache.spark.sql.parquet.row.attributes", sparkSchema.json())
+                .set("spark.sql.parquet.writeLegacyFormat", "false")
+                .set("spark.sql.parquet.outputTimestampType", "INT96")
+                .build(),
+            MetricsConfig.getDefault())) {
+      writer.addAll(rows);
+    }
+
+    InputFile parquetInputFile = Files.localInput(outputFilePath);
+    List<RowData> flinkReadDataRows = flinkReadRowDataFromFile(parquetInputFile, schema);
+    List<Record> genericReadDataRows = genericReadRowDataFromFile(parquetInputFile,
+        schema);
+
+    Assert.assertEquals(rows.size(), flinkReadDataRows.size());
+    Assert.assertEquals(rows.size(), genericReadDataRows.size());
+    for (int i = 0; i < rows.size(); i += 1) {
+      TimestampData actual = ((TimestampData) ((GenericRowData) flinkReadDataRows.get(i)).getField(0));
+      Assert.assertEquals(
+          rows.get(i).getLong(0),
+          actual.getMillisecond() * 1000 + actual.getNanoOfMillisecond() / 1000);
+
+      OffsetDateTime expected = ((OffsetDateTime) genericReadDataRows.get(i).getField("ts"));
+      Assert.assertEquals(expected.toLocalDateTime(), actual.toLocalDateTime());
+    }
+  }
+
+  /**
+   * Native Spark ParquetWriter.Builder implementation so that we can write timestamps using Spark's native
+   * ParquetWriteSupport.
+   */
+  private static class NativeSparkWriterBuilder
+      extends ParquetWriter.Builder<InternalRow, NativeSparkWriterBuilder> {
+    private final Map<String, String> config = Maps.newHashMap();
+
+    NativeSparkWriterBuilder(org.apache.parquet.io.OutputFile path) {
+      super(path);
+    }
+
+    public NativeSparkWriterBuilder set(String property, String value) {
+      this.config.put(property, value);
+      return self();
+    }
+
+    @Override
+    protected NativeSparkWriterBuilder self() {
+      return this;
+    }
+
+    @Override
+    protected WriteSupport<InternalRow> getWriteSupport(Configuration configuration) {
+      for (Map.Entry<String, String> entry : config.entrySet()) {
+        configuration.set(entry.getKey(), entry.getValue());
+      }
+
+      return new org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport();
+    }
+  }
 }
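
For reference, a minimal standalone sketch of the INT96 decoding that TimestampInt96Reader performs in the patch above, assuming the 12-byte little-endian layout written by Spark and Impala (8 bytes of nanoseconds within the day followed by a 4-byte Julian day number). The Int96Example class and decodeInt96 method are illustrative names only, not part of the patch; it uses only the JDK, so it can be run on its own:

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.time.Instant;
import java.util.concurrent.TimeUnit;

public class Int96Example {
  // Julian day number of 1970-01-01, the Unix epoch (same constant as in TimestampInt96Reader).
  private static final long UNIX_EPOCH_JULIAN = 2_440_588L;

  // Decodes a 12-byte INT96 value (little-endian: 8 bytes nanos-of-day, then 4 bytes Julian day)
  // into an Instant, mirroring the reader's read() method but without Flink/Parquet types.
  static Instant decodeInt96(byte[] int96) {
    ByteBuffer buf = ByteBuffer.wrap(int96).order(ByteOrder.LITTLE_ENDIAN);
    long timeOfDayNanos = buf.getLong();
    int julianDay = buf.getInt();
    return Instant.ofEpochMilli(TimeUnit.DAYS.toMillis(julianDay - UNIX_EPOCH_JULIAN))
        .plusNanos(timeOfDayNanos);
  }

  public static void main(String[] args) {
    // Julian day 2_440_589 = 1970-01-02, with 1 nanosecond past midnight.
    ByteBuffer buf = ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN);
    buf.putLong(1L).putInt(2_440_589);
    // Prints 1970-01-02T00:00:00.000000001Z: one day plus one nanosecond after the epoch.
    System.out.println(decodeInt96(buf.array()));
  }
}

The same arithmetic (subtract the epoch's Julian day, convert days to milliseconds, then add the nanos-of-day) is what the reader wraps in TimestampData.fromLocalDateTime for Flink.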