-
Notifications
You must be signed in to change notification settings - Fork 3k
Read support for parquet int96 timestamps #1184
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
ca3e955
cea839f
944c325
0c94f88
b35027b
7e48187
68dc4c5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,21 +21,32 @@ | |
|
|
||
| import java.io.File; | ||
| import java.io.IOException; | ||
| import java.nio.file.Path; | ||
| import java.time.Instant; | ||
| import java.util.Iterator; | ||
| import java.util.List; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.stream.Collectors; | ||
| import org.apache.avro.generic.GenericData; | ||
| import org.apache.iceberg.Files; | ||
| import org.apache.iceberg.Schema; | ||
| import org.apache.iceberg.io.CloseableIterable; | ||
| import org.apache.iceberg.io.FileAppender; | ||
| import org.apache.iceberg.io.InputFile; | ||
| import org.apache.iceberg.parquet.Parquet; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.Lists; | ||
| import org.apache.iceberg.types.TypeUtil; | ||
| import org.apache.iceberg.types.Types; | ||
| import org.apache.spark.sql.Encoders; | ||
| import org.apache.spark.sql.SparkSession; | ||
| import org.apache.spark.sql.catalyst.InternalRow; | ||
| import org.junit.Assert; | ||
| import org.junit.Assume; | ||
| import org.junit.Test; | ||
|
|
||
| import static org.apache.iceberg.spark.data.TestHelpers.assertEqualsUnsafe; | ||
| import static org.apache.iceberg.types.Types.NestedField.optional; | ||
|
|
||
| public class TestSparkParquetReader extends AvroDataTest { | ||
| @Override | ||
|
|
@@ -67,4 +78,49 @@ protected void writeAndValidate(Schema schema) throws IOException { | |
| Assert.assertFalse("Should not have extra rows", rows.hasNext()); | ||
| } | ||
| } | ||
|
|
||
| protected List<InternalRow> rowsFromFile(InputFile inputFile, Schema schema) throws IOException { | ||
| try (CloseableIterable<InternalRow> reader = | ||
| Parquet.read(inputFile) | ||
| .project(schema) | ||
| .createReaderFunc(type -> SparkParquetReaders.buildReader(schema, type)) | ||
| .build()) { | ||
| return Lists.newArrayList(reader); | ||
| } | ||
| } | ||
|
|
||
| @Test | ||
| public void testInt96TimestampProducedBySparkIsReadCorrectly() throws IOException { | ||
| final SparkSession spark = | ||
| SparkSession.builder() | ||
| .master("local[2]") | ||
| .config("spark.sql.parquet.int96AsTimestamp", "false") | ||
| .getOrCreate(); | ||
|
||
|
|
||
| final String parquetPath = temp.getRoot().getAbsolutePath() + "/parquet_int96"; | ||
| final java.sql.Timestamp ts = java.sql.Timestamp.valueOf("2014-01-01 23:00:01"); | ||
| spark.createDataset(ImmutableList.of(ts), Encoders.TIMESTAMP()).write().parquet(parquetPath); | ||
|
||
| spark.stop(); | ||
|
|
||
| // Get the single parquet file produced by spark. | ||
| List<Path> parquetOutputs = | ||
| java.nio.file.Files.find( | ||
| java.nio.file.Paths.get(parquetPath), | ||
| 1, | ||
| (path, basicFileAttributes) -> path.toString().endsWith(".parquet")) | ||
| .collect(Collectors.toList()); | ||
| Assert.assertEquals(1, parquetOutputs.size()); | ||
|
|
||
| List<InternalRow> rows = | ||
| rowsFromFile( | ||
| Files.localInput(parquetOutputs.get(0).toFile()), | ||
| new Schema(optional(1, "timestamp", Types.TimestampType.withoutZone()))); | ||
| Assert.assertEquals(1, rows.size()); | ||
| Assert.assertEquals(1, rows.get(0).numFields()); | ||
|
|
||
| // Spark represents Timestamps as epoch micros and are stored as longs. | ||
| Assert.assertEquals( | ||
| ts.toInstant(), | ||
| Instant.ofEpochMilli(TimeUnit.MICROSECONDS.toMillis(rows.get(0).getLong(0)))); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note for reviewers (and future me):
`toByteBuffer` returns a duplicate of the internal buffer, so it is safe for users of it to modify the buffer's position with methods like `getLong`.