From ab0c7e4ca6906c5c187707ea50cd41d9c0478281 Mon Sep 17 00:00:00 2001 From: openinx Date: Mon, 8 Jun 2020 20:12:01 +0800 Subject: [PATCH 1/8] Implement the parquet value reader & writer for apache flink --- build.gradle | 2 + .../data/parquet/GenericParquetReaders.java | 52 ++++-- .../data/parquet/GenericParquetWriter.java | 18 ++- .../flink/data/FlinkParquetReaders.java | 75 +++++++++ .../flink/data/FlinkParquetWriters.java | 49 ++++++ .../apache/iceberg/flink/data/RandomData.java | 148 ++++++++++++++++++ .../data/TestFlinkParquetReaderWriter.java | 100 ++++++++++++ 7 files changed, 424 insertions(+), 20 deletions(-) create mode 100644 flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java create mode 100644 flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java create mode 100644 flink/src/test/java/org/apache/iceberg/flink/data/RandomData.java create mode 100644 flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReaderWriter.java diff --git a/build.gradle b/build.gradle index 8086bce5023a..28400840a762 100644 --- a/build.gradle +++ b/build.gradle @@ -261,6 +261,8 @@ project(':iceberg-flink') { testCompile("org.apache.flink:flink-test-utils_2.12") { exclude group: "org.apache.curator", module: 'curator-test' } + + testCompile project(path: ':iceberg-api', configuration: 'testArtifacts') } } diff --git a/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java b/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java index 07419d57df8d..b9da8ddb0662 100644 --- a/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java +++ b/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java @@ -30,7 +30,6 @@ import java.util.Map; import org.apache.iceberg.Schema; import org.apache.iceberg.data.GenericRecord; -import org.apache.iceberg.data.Record; import org.apache.iceberg.parquet.ParquetSchemaUtil; import org.apache.iceberg.parquet.ParquetValueReader; import org.apache.iceberg.parquet.ParquetValueReaders; @@ -67,26 +66,34 @@ private GenericParquetReaders() { public static ParquetValueReader buildReader(Schema expectedSchema, MessageType fileSchema) { - return buildReader(expectedSchema, fileSchema, ImmutableMap.of()); + return buildReader(expectedSchema, fileSchema, ImmutableMap.of(), RecordReader::new); } - @SuppressWarnings("unchecked") + public static ParquetValueReader buildReader(Schema expectedSchema, MessageType fileSchema, Map idToConstant) { + return buildReader(expectedSchema, fileSchema, idToConstant, RecordReader::new); + } + + @SuppressWarnings("unchecked") + public static ParquetValueReader buildReader(Schema expectedSchema, + MessageType fileSchema, + Map idToConstant, + StructReaderFactory structReaderFactory) { if (ParquetSchemaUtil.hasIds(fileSchema)) { - return (ParquetValueReader) + return (ParquetValueReader) TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema, - new ReadBuilder(fileSchema, idToConstant)); + new ReadBuilder(fileSchema, idToConstant, structReaderFactory)); } else { - return (ParquetValueReader) + return (ParquetValueReader) TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema, - new FallbackReadBuilder(fileSchema, idToConstant)); + new FallbackReadBuilder(fileSchema, idToConstant, structReaderFactory)); } } private static class FallbackReadBuilder extends ReadBuilder { - FallbackReadBuilder(MessageType type, Map idToConstant) { - super(type, idToConstant); + FallbackReadBuilder(MessageType type, 
Map idToConstant, StructReaderFactory structReaderFactory) { + super(type, idToConstant, structReaderFactory); } @Override @@ -111,17 +118,19 @@ public ParquetValueReader struct(StructType expected, GroupType struct, types.add(fieldType); } - return new RecordReader(types, newFields, expected); + return structReaderFactory().create(types, newFields, expected); } } private static class ReadBuilder extends TypeWithSchemaVisitor> { private final MessageType type; private final Map idToConstant; + private final StructReaderFactory structReaderFactory; - ReadBuilder(MessageType type, Map idToConstant) { + ReadBuilder(MessageType type, Map idToConstant, StructReaderFactory structReaderFactory) { this.type = type; this.idToConstant = idToConstant; + this.structReaderFactory = structReaderFactory; } @Override @@ -168,7 +177,7 @@ public ParquetValueReader struct(StructType expected, GroupType struct, } } - return new RecordReader(types, reorderedFields, expected); + return structReaderFactory.create(types, reorderedFields, expected); } @Override @@ -298,6 +307,10 @@ public ParquetValueReader primitive(org.apache.iceberg.types.Type.PrimitiveTy MessageType type() { return type; } + + StructReaderFactory structReaderFactory() { + return structReaderFactory; + } } private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC); @@ -396,7 +409,12 @@ public byte[] read(byte[] reuse) { } } - static class RecordReader extends StructReader { + public interface StructReaderFactory { + + StructReader create(List types, List> readers, StructType struct); + } + + static class RecordReader extends StructReader { private final StructType structType; RecordReader(List types, @@ -407,7 +425,7 @@ static class RecordReader extends StructReader { } @Override - protected Record newStructData(Record reuse) { + protected GenericRecord newStructData(GenericRecord reuse) { if (reuse != null) { return reuse; } else { @@ -417,17 +435,17 @@ protected Record newStructData(Record reuse) { @Override @SuppressWarnings("unchecked") - protected Object getField(Record intermediate, int pos) { + protected Object getField(GenericRecord intermediate, int pos) { return intermediate.get(pos); } @Override - protected Record buildStruct(Record struct) { + protected GenericRecord buildStruct(GenericRecord struct) { return struct; } @Override - protected void set(Record struct, int pos, Object value) { + protected void set(GenericRecord struct, int pos, Object value) { struct.set(pos, value); } } diff --git a/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java b/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java index 0c998eaa4e70..fe016b49eb4a 100644 --- a/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java +++ b/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java @@ -51,14 +51,21 @@ private GenericParquetWriter() { @SuppressWarnings("unchecked") public static ParquetValueWriter buildWriter(MessageType type) { - return (ParquetValueWriter) ParquetTypeVisitor.visit(type, new WriteBuilder(type)); + return buildWriter(type, RecordWriter::new); + } + + @SuppressWarnings("unchecked") + public static ParquetValueWriter buildWriter(MessageType type, StructWriterFactory structWriterFactory) { + return (ParquetValueWriter) ParquetTypeVisitor.visit(type, new WriteBuilder(type, structWriterFactory)); } private static class WriteBuilder extends ParquetTypeVisitor> { private final MessageType type; + private final 
StructWriterFactory structWriterFactory; - WriteBuilder(MessageType type) { + WriteBuilder(MessageType type, StructWriterFactory structWriterFactory) { this.type = type; + this.structWriterFactory = structWriterFactory; } @Override @@ -78,7 +85,7 @@ public ParquetValueWriter struct(GroupType struct, writers.add(ParquetValueWriters.option(fieldType, fieldD, fieldWriters.get(i))); } - return new RecordWriter(writers); + return structWriterFactory.create(writers); } @Override @@ -284,6 +291,11 @@ public void write(int repetitionLevel, byte[] value) { } } + public interface StructWriterFactory { + + StructWriter create(List> writers); + } + private static class RecordWriter extends StructWriter { private RecordWriter(List> writers) { super(writers); diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java new file mode 100644 index 000000000000..c1862cdf8984 --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.flink.data; + +import java.util.List; +import org.apache.flink.types.Row; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.parquet.GenericParquetReaders; +import org.apache.iceberg.parquet.ParquetValueReader; +import org.apache.iceberg.parquet.ParquetValueReaders; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.types.Types; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Type; + +public class FlinkParquetReaders { + private FlinkParquetReaders() { + + } + + public static ParquetValueReader buildReader(Schema expectedSchema, + MessageType fileSchema) { + return GenericParquetReaders.buildReader(expectedSchema, fileSchema, ImmutableMap.of(), RowReader::new); + } + + static class RowReader extends ParquetValueReaders.StructReader { + private final Types.StructType structType; + + RowReader(List types, List> readers, Types.StructType struct) { + super(types, readers); + this.structType = struct; + } + + @Override + protected Row newStructData(Row reuse) { + if (reuse != null) { + return reuse; + } else { + return new Row(structType.fields().size()); + } + } + + @Override + protected Object getField(Row row, int pos) { + return row.getField(pos); + } + + @Override + protected Row buildStruct(Row row) { + return row; + } + + @Override + protected void set(Row row, int pos, Object value) { + row.setField(pos, value); + } + } +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java new file mode 100644 index 000000000000..5cd3a5325861 --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.flink.data; + +import java.util.List; +import org.apache.flink.types.Row; +import org.apache.iceberg.data.parquet.GenericParquetWriter; +import org.apache.iceberg.parquet.ParquetValueWriter; +import org.apache.iceberg.parquet.ParquetValueWriters; +import org.apache.parquet.schema.MessageType; + +public class FlinkParquetWriters { + private FlinkParquetWriters() { + } + + @SuppressWarnings("unchecked") + public static ParquetValueWriter buildWriter(MessageType type) { + return GenericParquetWriter.buildWriter(type, RowWriter::new); + } + + private static class RowWriter extends ParquetValueWriters.StructWriter { + + private RowWriter(List> writers) { + super(writers); + } + + @Override + protected Object get(Row row, int index) { + return row.getField(index); + } + } +} diff --git a/flink/src/test/java/org/apache/iceberg/flink/data/RandomData.java b/flink/src/test/java/org/apache/iceberg/flink/data/RandomData.java new file mode 100644 index 000000000000..1a8ce9d0dd17 --- /dev/null +++ b/flink/src/test/java/org/apache/iceberg/flink/data/RandomData.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.flink.data; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.UUID; +import java.util.function.Supplier; +import org.apache.flink.types.Row; +import org.apache.iceberg.Schema; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.RandomUtil; + +public class RandomData { + private RandomData() {} + + public static List generate(Schema schema, int numRecords, long seed) { + RandomDataGenerator generator = new RandomDataGenerator(seed); + List rows = Lists.newArrayListWithExpectedSize(numRecords); + for (int i = 0; i < numRecords; i += 1) { + rows.add((Row) TypeUtil.visit(schema, generator)); + } + + return rows; + } + + private static class RandomDataGenerator extends TypeUtil.CustomOrderSchemaVisitor { + private final Random random; + + private RandomDataGenerator(long seed) { + this.random = new Random(seed); + } + + @Override + public Row schema(Schema schema, Supplier structResult) { + return (Row) structResult.get(); + } + + @Override + public Row struct(Types.StructType struct, Iterable fieldResults) { + Row row = new Row(struct.fields().size()); + + List values = Lists.newArrayList(fieldResults); + for (int i = 0; i < values.size(); i += 1) { + row.setField(i, values.get(i)); + } + + return row; + } + + @Override + public Object field(Types.NestedField field, Supplier fieldResult) { + // return null 5% of the time when the value is optional + if (field.isOptional() && random.nextInt(20) == 1) { + return null; + } + return fieldResult.get(); + } + + @Override + public Object list(Types.ListType list, Supplier elementResult) { + int numElements = random.nextInt(20); + + List result = Lists.newArrayListWithExpectedSize(numElements); + for (int i = 0; i < numElements; i += 1) { + // return null 5% of the time when the value is optional + if (list.isElementOptional() && random.nextInt(20) == 1) { + result.add(null); + } else { + result.add(elementResult.get()); + } + } + + return result; + } + + @Override + public Object map(Types.MapType map, Supplier keyResult, Supplier valueResult) { + int numEntries = random.nextInt(20); + + Map result = Maps.newLinkedHashMap(); + Supplier keyFunc; + if (map.keyType() == Types.StringType.get()) { + keyFunc = () -> keyResult.get().toString(); + } else { + keyFunc = keyResult; + } + + Set keySet = Sets.newHashSet(); + for (int i = 0; i < numEntries; i += 1) { + Object key = keyFunc.get(); + // ensure no collisions + while (keySet.contains(key)) { + key = keyFunc.get(); + } + + keySet.add(key); + + // return null 5% of the time when the value is optional + if (map.isValueOptional() && random.nextInt(20) == 1) { + result.put(key, null); + } else { + result.put(key, valueResult.get()); + } + } + + return result; + } + + @Override + public Object primitive(Type.PrimitiveType primitive) { + Object result = RandomUtil.generatePrimitive(primitive, random); + switch (primitive.typeId()) { + case BINARY: + return ByteBuffer.wrap((byte[]) result); + case UUID: + return UUID.nameUUIDFromBytes((byte[]) result); + default: + return result; + } + } + } +} diff --git 
a/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReaderWriter.java b/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReaderWriter.java new file mode 100644 index 000000000000..0a1918a0618a --- /dev/null +++ b/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReaderWriter.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.data; + +import java.io.File; +import java.io.IOException; +import java.util.Iterator; +import org.apache.flink.types.Row; +import org.apache.iceberg.Files; +import org.apache.iceberg.Schema; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.types.Types; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; + +public class TestFlinkParquetReaderWriter { + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + private static final Schema COMPLEX_SCHEMA = new Schema( + required(1, "roots", Types.LongType.get()), + optional(3, "lime", Types.ListType.ofRequired(4, Types.DoubleType.get())), + required(5, "strict", Types.StructType.of( + required(9, "tangerine", Types.StringType.get()), + optional(6, "hopeful", Types.StructType.of( + required(7, "steel", Types.FloatType.get()), + required(8, "lantern", Types.DateType.get()) + )), + optional(10, "vehement", Types.LongType.get()) + )), + optional(11, "metamorphosis", Types.MapType.ofRequired(12, 13, + Types.StringType.get(), Types.TimestampType.withZone())), + required(14, "winter", Types.ListType.ofOptional(15, Types.StructType.of( + optional(16, "beet", Types.DoubleType.get()), + required(17, "stamp", Types.FloatType.get()), + optional(18, "wheeze", Types.StringType.get()) + ))), + optional(19, "renovate", Types.MapType.ofRequired(20, 21, + Types.StringType.get(), Types.StructType.of( + optional(22, "jumpy", Types.DoubleType.get()), + required(23, "koala", Types.IntegerType.get()), + required(24, "couch rope", Types.IntegerType.get()) + ))), + optional(2, "slide", Types.StringType.get()) + ); + + @Test + public void testCorrectness() throws IOException { + int numRows = 2500; + Iterable records = RandomData.generate(COMPLEX_SCHEMA, numRows, 19981); + + File testFile = temp.newFile(); + Assert.assertTrue("Delete should succeed", testFile.delete()); + + try (FileAppender writer = Parquet.write(Files.localOutput(testFile)) + .schema(COMPLEX_SCHEMA) + .createWriterFunc(FlinkParquetWriters::buildWriter) + .build()) { + writer.addAll(records); + } + + 
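// Read the file back with the Flink reader and check each row against the generated records. +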
try (CloseableIterable reader = Parquet.read(Files.localInput(testFile)) + .project(COMPLEX_SCHEMA) + .createReaderFunc(type -> FlinkParquetReaders.buildReader(COMPLEX_SCHEMA, type)) + .build()) { + Iterator expected = records.iterator(); + Iterator rows = reader.iterator(); + for (int i = 0; i < numRows; i += 1) { + Assert.assertTrue("Should have expected number of rows", rows.hasNext()); + Assert.assertEquals(expected.next(), rows.next()); + } + Assert.assertFalse("Should not have extra rows", rows.hasNext()); + } + } +} From e33d2cdeb44b4be7e5c8a7c4a6110e32672c938a Mon Sep 17 00:00:00 2001 From: openinx Date: Thu, 18 Jun 2020 19:53:56 +0800 Subject: [PATCH 2/8] Fix the failing unit tests. --- .../apache/iceberg/flink/data/RandomData.java | 24 ++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/flink/src/test/java/org/apache/iceberg/flink/data/RandomData.java b/flink/src/test/java/org/apache/iceberg/flink/data/RandomData.java index 1a8ce9d0dd17..8b66126086cf 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/data/RandomData.java +++ b/flink/src/test/java/org/apache/iceberg/flink/data/RandomData.java @@ -20,6 +20,11 @@ package org.apache.iceberg.flink.data; import java.nio.ByteBuffer; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; import java.util.List; import java.util.Map; import java.util.Random; @@ -36,8 +41,11 @@ import org.apache.iceberg.types.Types; import org.apache.iceberg.util.RandomUtil; +import static java.time.temporal.ChronoUnit.MICROS; + public class RandomData { - private RandomData() {} + private RandomData() { + } public static List generate(Schema schema, int numRecords, long seed) { RandomDataGenerator generator = new RandomDataGenerator(seed); @@ -140,9 +148,23 @@ public Object primitive(Type.PrimitiveType primitive) { return ByteBuffer.wrap((byte[]) result); case UUID: return UUID.nameUUIDFromBytes((byte[]) result); + case DATE: + return EPOCH_DAY.plusDays((Integer) result); + case TIME: + return LocalTime.ofNanoOfDay((long) result * 1000); + case TIMESTAMP: + Types.TimestampType ts = (Types.TimestampType) primitive; + if (ts.shouldAdjustToUTC()) { + return EPOCH.plus((long) result, MICROS); + } else { + return EPOCH.plus((long) result, MICROS).toLocalDateTime(); + } default: return result; } } } + + private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC); + private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate(); } From 1c37f6def03ed67d4b717f5347909ca4b8763897 Mon Sep 17 00:00:00 2001 From: openinx Date: Wed, 24 Jun 2020 10:59:58 +0800 Subject: [PATCH 3/8] Remove the StructWriterFactory & StructReaderFactory.
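Instead of threading a StructReaderFactory / StructWriterFactory through the generic builders, expose ReadBuilder and WriteBuilder and give them protected createStructReader / createStructWriter hooks that engine integrations override to materialize their own struct type. Roughly, the Flink extension point looks like this (a minimal sketch; the exact generics differ slightly across revisions 3 and 4 of this series, and the full change is in the diff below):

    private static class WriteBuilder extends GenericParquetWriter.WriteBuilder {
      @Override
      protected ParquetValueWriters.StructWriter<Row> createStructWriter(List<ParquetValueWriter<?>> writers) {
        // Produce Flink Row structs instead of the default generic RecordWriter.
        return new RowWriter(writers);
      }
    }

This keeps the shared iceberg-data module free of per-engine factory plumbing while letting iceberg-flink plug in Row-based readers and writers.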
--- build.gradle | 1 + .../data/parquet/GenericParquetReaders.java | 62 ++++------ .../data/parquet/GenericParquetWriter.java | 24 ++-- .../iceberg/data/RandomGenericData.java | 24 +++- .../flink/data/FlinkParquetReaders.java | 42 ++++++- .../flink/data/FlinkParquetWriters.java | 15 ++- .../apache/iceberg/flink/data/RandomData.java | 117 ++---------------- 7 files changed, 114 insertions(+), 171 deletions(-) diff --git a/build.gradle b/build.gradle index 28400840a762..83a790485282 100644 --- a/build.gradle +++ b/build.gradle @@ -263,6 +263,7 @@ project(':iceberg-flink') { } testCompile project(path: ':iceberg-api', configuration: 'testArtifacts') + testCompile project(path: ':iceberg-data', configuration: 'testArtifacts') } } diff --git a/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java b/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java index b9da8ddb0662..0900c7027fdc 100644 --- a/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java +++ b/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java @@ -30,6 +30,7 @@ import java.util.Map; import org.apache.iceberg.Schema; import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; import org.apache.iceberg.parquet.ParquetSchemaUtil; import org.apache.iceberg.parquet.ParquetValueReader; import org.apache.iceberg.parquet.ParquetValueReaders; @@ -64,36 +65,29 @@ public class GenericParquetReaders { private GenericParquetReaders() { } - public static ParquetValueReader buildReader(Schema expectedSchema, - MessageType fileSchema) { - return buildReader(expectedSchema, fileSchema, ImmutableMap.of(), RecordReader::new); - } - - public static ParquetValueReader buildReader(Schema expectedSchema, - MessageType fileSchema, - Map idToConstant) { - return buildReader(expectedSchema, fileSchema, idToConstant, RecordReader::new); + public static ParquetValueReader buildReader(Schema expectedSchema, + MessageType fileSchema) { + return buildReader(expectedSchema, fileSchema, ImmutableMap.of()); } @SuppressWarnings("unchecked") public static ParquetValueReader buildReader(Schema expectedSchema, MessageType fileSchema, - Map idToConstant, - StructReaderFactory structReaderFactory) { + Map idToConstant) { if (ParquetSchemaUtil.hasIds(fileSchema)) { return (ParquetValueReader) TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema, - new ReadBuilder(fileSchema, idToConstant, structReaderFactory)); + new ReadBuilder(fileSchema, idToConstant)); } else { return (ParquetValueReader) TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema, - new FallbackReadBuilder(fileSchema, idToConstant, structReaderFactory)); + new FallbackReadBuilder(fileSchema, idToConstant)); } } - private static class FallbackReadBuilder extends ReadBuilder { - FallbackReadBuilder(MessageType type, Map idToConstant, StructReaderFactory structReaderFactory) { - super(type, idToConstant, structReaderFactory); + public static class FallbackReadBuilder extends ReadBuilder { + protected FallbackReadBuilder(MessageType type, Map idToConstant) { + super(type, idToConstant); } @Override @@ -118,19 +112,17 @@ public ParquetValueReader struct(StructType expected, GroupType struct, types.add(fieldType); } - return structReaderFactory().create(types, newFields, expected); + return createStructReader(types, newFields, expected); } } - private static class ReadBuilder extends TypeWithSchemaVisitor> { + public static class ReadBuilder extends 
TypeWithSchemaVisitor> { private final MessageType type; private final Map idToConstant; - private final StructReaderFactory structReaderFactory; - ReadBuilder(MessageType type, Map idToConstant, StructReaderFactory structReaderFactory) { + protected ReadBuilder(MessageType type, Map idToConstant) { this.type = type; this.idToConstant = idToConstant; - this.structReaderFactory = structReaderFactory; } @Override @@ -139,6 +131,12 @@ public ParquetValueReader message(StructType expected, MessageType message, return struct(expected, message.asGroupType(), fieldReaders); } + protected StructReader createStructReader(List types, + List> readers, + StructType struct) { + return new RecordReader(types, readers, struct); + } + @Override public ParquetValueReader struct(StructType expected, GroupType struct, List> fieldReaders) { @@ -177,7 +175,7 @@ public ParquetValueReader struct(StructType expected, GroupType struct, } } - return structReaderFactory.create(types, reorderedFields, expected); + return createStructReader(types, reorderedFields, expected); } @Override @@ -307,10 +305,6 @@ public ParquetValueReader primitive(org.apache.iceberg.types.Type.PrimitiveTy MessageType type() { return type; } - - StructReaderFactory structReaderFactory() { - return structReaderFactory; - } } private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC); @@ -409,12 +403,7 @@ public byte[] read(byte[] reuse) { } } - public interface StructReaderFactory { - - StructReader create(List types, List> readers, StructType struct); - } - - static class RecordReader extends StructReader { + static class RecordReader extends StructReader { private final StructType structType; RecordReader(List types, @@ -425,7 +414,7 @@ static class RecordReader extends StructReader { } @Override - protected GenericRecord newStructData(GenericRecord reuse) { + protected Record newStructData(Record reuse) { if (reuse != null) { return reuse; } else { @@ -434,18 +423,17 @@ protected GenericRecord newStructData(GenericRecord reuse) { } @Override - @SuppressWarnings("unchecked") - protected Object getField(GenericRecord intermediate, int pos) { + protected Object getField(Record intermediate, int pos) { return intermediate.get(pos); } @Override - protected GenericRecord buildStruct(GenericRecord struct) { + protected Record buildStruct(Record struct) { return struct; } @Override - protected void set(GenericRecord struct, int pos, Object value) { + protected void set(Record struct, int pos, Object value) { struct.set(pos, value); } } diff --git a/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java b/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java index fe016b49eb4a..e8c36fb76efd 100644 --- a/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java +++ b/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java @@ -51,21 +51,14 @@ private GenericParquetWriter() { @SuppressWarnings("unchecked") public static ParquetValueWriter buildWriter(MessageType type) { - return buildWriter(type, RecordWriter::new); + return (ParquetValueWriter) ParquetTypeVisitor.visit(type, new WriteBuilder(type)); } - @SuppressWarnings("unchecked") - public static ParquetValueWriter buildWriter(MessageType type, StructWriterFactory structWriterFactory) { - return (ParquetValueWriter) ParquetTypeVisitor.visit(type, new WriteBuilder(type, structWriterFactory)); - } - - private static class WriteBuilder extends ParquetTypeVisitor> { + public static 
class WriteBuilder extends ParquetTypeVisitor> { private final MessageType type; - private final StructWriterFactory structWriterFactory; - WriteBuilder(MessageType type, StructWriterFactory structWriterFactory) { + protected WriteBuilder(MessageType type) { this.type = type; - this.structWriterFactory = structWriterFactory; } @Override @@ -74,6 +67,10 @@ public ParquetValueWriter message(MessageType message, return struct(message.asGroupType(), fieldWriters); } + protected StructWriter createStructWriter(List> writers) { + return new RecordWriter(writers); + } + @Override public ParquetValueWriter struct(GroupType struct, List> fieldWriters) { @@ -85,7 +82,7 @@ public ParquetValueWriter struct(GroupType struct, writers.add(ParquetValueWriters.option(fieldType, fieldD, fieldWriters.get(i))); } - return structWriterFactory.create(writers); + return createStructWriter(writers); } @Override @@ -291,11 +288,6 @@ public void write(int repetitionLevel, byte[] value) { } } - public interface StructWriterFactory { - - StructWriter create(List> writers); - } - private static class RecordWriter extends StructWriter { private RecordWriter(List> writers) { super(writers); diff --git a/data/src/test/java/org/apache/iceberg/data/RandomGenericData.java b/data/src/test/java/org/apache/iceberg/data/RandomGenericData.java index e97656ac8fc0..c8ad61426be5 100644 --- a/data/src/test/java/org/apache/iceberg/data/RandomGenericData.java +++ b/data/src/test/java/org/apache/iceberg/data/RandomGenericData.java @@ -46,7 +46,7 @@ public class RandomGenericData { private RandomGenericData() {} public static List generate(Schema schema, int numRecords, long seed) { - RandomDataGenerator generator = new RandomDataGenerator(seed); + RandomRecordDataGenerator generator = new RandomRecordDataGenerator(seed); List records = Lists.newArrayListWithExpectedSize(numRecords); for (int i = 0; i < numRecords; i += 1) { records.add((Record) TypeUtil.visit(schema, generator)); @@ -55,11 +55,9 @@ public static List generate(Schema schema, int numRecords, long seed) { return records; } - private static class RandomDataGenerator extends TypeUtil.CustomOrderSchemaVisitor { - private final Random random; - - private RandomDataGenerator(long seed) { - this.random = new Random(seed); + private static class RandomRecordDataGenerator extends RandomDataGenerator { + private RandomRecordDataGenerator(long seed) { + super(seed); } @Override @@ -78,6 +76,20 @@ public Record struct(Types.StructType struct, Iterable fieldResults) { return rec; } + } + + public abstract static class RandomDataGenerator extends TypeUtil.CustomOrderSchemaVisitor { + private final Random random; + + protected RandomDataGenerator(long seed) { + this.random = new Random(seed); + } + + @Override + public abstract T schema(Schema schema, Supplier structResult); + + @Override + public abstract T struct(Types.StructType struct, Iterable fieldResults); @Override public Object field(Types.NestedField field, Supplier fieldResult) { diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java index c1862cdf8984..898909a26f02 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java +++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java @@ -20,11 +20,14 @@ package org.apache.iceberg.flink.data; import java.util.List; +import java.util.Map; import org.apache.flink.types.Row; import org.apache.iceberg.Schema; import 
org.apache.iceberg.data.parquet.GenericParquetReaders; +import org.apache.iceberg.parquet.ParquetSchemaUtil; import org.apache.iceberg.parquet.ParquetValueReader; import org.apache.iceberg.parquet.ParquetValueReaders; +import org.apache.iceberg.parquet.TypeWithSchemaVisitor; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.types.Types; import org.apache.parquet.schema.MessageType; @@ -35,9 +38,46 @@ private FlinkParquetReaders() { } + @SuppressWarnings("unchecked") public static ParquetValueReader buildReader(Schema expectedSchema, MessageType fileSchema) { - return GenericParquetReaders.buildReader(expectedSchema, fileSchema, ImmutableMap.of(), RowReader::new); + if (ParquetSchemaUtil.hasIds(fileSchema)) { + return (ParquetValueReader) + TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema, + new ReadBuilder(fileSchema, ImmutableMap.of())); + } else { + return (ParquetValueReader) + TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema, + new FallbackReadBuilder(fileSchema, ImmutableMap.of())); + } + } + + private static class FallbackReadBuilder extends GenericParquetReaders.FallbackReadBuilder { + + private FallbackReadBuilder(MessageType type, Map idToConstant) { + super(type, idToConstant); + } + + @Override + protected ParquetValueReaders.StructReader createStructReader(List types, + List> readers, + Types.StructType struct) { + return new RowReader(types, readers, struct); + } + } + + private static class ReadBuilder extends GenericParquetReaders.ReadBuilder { + + private ReadBuilder(MessageType type, Map idToConstant) { + super(type, idToConstant); + } + + @Override + protected ParquetValueReaders.StructReader createStructReader(List types, + List> readers, + Types.StructType struct) { + return new RowReader(types, readers, struct); + } } static class RowReader extends ParquetValueReaders.StructReader { diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java index 5cd3a5325861..190092ff9c98 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java +++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java @@ -22,6 +22,7 @@ import java.util.List; import org.apache.flink.types.Row; import org.apache.iceberg.data.parquet.GenericParquetWriter; +import org.apache.iceberg.parquet.ParquetTypeVisitor; import org.apache.iceberg.parquet.ParquetValueWriter; import org.apache.iceberg.parquet.ParquetValueWriters; import org.apache.parquet.schema.MessageType; @@ -32,7 +33,19 @@ private FlinkParquetWriters() { @SuppressWarnings("unchecked") public static ParquetValueWriter buildWriter(MessageType type) { - return GenericParquetWriter.buildWriter(type, RowWriter::new); + return (ParquetValueWriter) ParquetTypeVisitor.visit(type, new WriteBuilder(type)); + } + + private static class WriteBuilder extends GenericParquetWriter.WriteBuilder { + + private WriteBuilder(MessageType type) { + super(type); + } + + @Override + protected ParquetValueWriters.StructWriter createStructWriter(List> writers) { + return new RowWriter(writers); + } } private static class RowWriter extends ParquetValueWriters.StructWriter { diff --git a/flink/src/test/java/org/apache/iceberg/flink/data/RandomData.java b/flink/src/test/java/org/apache/iceberg/flink/data/RandomData.java index 8b66126086cf..5d300ce2fd13 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/data/RandomData.java +++ 
b/flink/src/test/java/org/apache/iceberg/flink/data/RandomData.java @@ -19,49 +19,32 @@ package org.apache.iceberg.flink.data; -import java.nio.ByteBuffer; -import java.time.Instant; -import java.time.LocalDate; -import java.time.LocalTime; -import java.time.OffsetDateTime; -import java.time.ZoneOffset; import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.Set; -import java.util.UUID; import java.util.function.Supplier; import org.apache.flink.types.Row; import org.apache.iceberg.Schema; +import org.apache.iceberg.data.RandomGenericData; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; -import org.apache.iceberg.types.Type; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types; -import org.apache.iceberg.util.RandomUtil; -import static java.time.temporal.ChronoUnit.MICROS; - -public class RandomData { +class RandomData { private RandomData() { } - public static List generate(Schema schema, int numRecords, long seed) { - RandomDataGenerator generator = new RandomDataGenerator(seed); + static List generate(Schema schema, int numRecords, long seed) { + RandomRowDataGenerator generator = new RandomRowDataGenerator(seed); List rows = Lists.newArrayListWithExpectedSize(numRecords); for (int i = 0; i < numRecords; i += 1) { rows.add((Row) TypeUtil.visit(schema, generator)); } - return rows; } - private static class RandomDataGenerator extends TypeUtil.CustomOrderSchemaVisitor { - private final Random random; + private static class RandomRowDataGenerator extends RandomGenericData.RandomDataGenerator { - private RandomDataGenerator(long seed) { - this.random = new Random(seed); + private RandomRowDataGenerator(long seed) { + super(seed); } @Override @@ -80,91 +63,5 @@ public Row struct(Types.StructType struct, Iterable fieldResults) { return row; } - - @Override - public Object field(Types.NestedField field, Supplier fieldResult) { - // return null 5% of the time when the value is optional - if (field.isOptional() && random.nextInt(20) == 1) { - return null; - } - return fieldResult.get(); - } - - @Override - public Object list(Types.ListType list, Supplier elementResult) { - int numElements = random.nextInt(20); - - List result = Lists.newArrayListWithExpectedSize(numElements); - for (int i = 0; i < numElements; i += 1) { - // return null 5% of the time when the value is optional - if (list.isElementOptional() && random.nextInt(20) == 1) { - result.add(null); - } else { - result.add(elementResult.get()); - } - } - - return result; - } - - @Override - public Object map(Types.MapType map, Supplier keyResult, Supplier valueResult) { - int numEntries = random.nextInt(20); - - Map result = Maps.newLinkedHashMap(); - Supplier keyFunc; - if (map.keyType() == Types.StringType.get()) { - keyFunc = () -> keyResult.get().toString(); - } else { - keyFunc = keyResult; - } - - Set keySet = Sets.newHashSet(); - for (int i = 0; i < numEntries; i += 1) { - Object key = keyFunc.get(); - // ensure no collisions - while (keySet.contains(key)) { - key = keyFunc.get(); - } - - keySet.add(key); - - // return null 5% of the time when the value is optional - if (map.isValueOptional() && random.nextInt(20) == 1) { - result.put(key, null); - } else { - result.put(key, valueResult.get()); - } - } - - return result; - } - - @Override - public Object primitive(Type.PrimitiveType primitive) { - Object result = 
RandomUtil.generatePrimitive(primitive, random); - switch (primitive.typeId()) { - case BINARY: - return ByteBuffer.wrap((byte[]) result); - case UUID: - return UUID.nameUUIDFromBytes((byte[]) result); - case DATE: - return EPOCH_DAY.plusDays((Integer) result); - case TIME: - return LocalTime.ofNanoOfDay((long) result * 1000); - case TIMESTAMP: - Types.TimestampType ts = (Types.TimestampType) primitive; - if (ts.shouldAdjustToUTC()) { - return EPOCH.plus((long) result, MICROS); - } else { - return EPOCH.plus((long) result, MICROS).toLocalDateTime(); - } - default: - return result; - } - } } - - private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC); - private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate(); } From f4799a0000639f662c3df823240c2e27d0734b10 Mon Sep 17 00:00:00 2001 From: openinx Date: Wed, 24 Jun 2020 11:53:49 +0800 Subject: [PATCH 4/8] Revert the generic type in GenericParquetReaders#buildReader --- .../data/parquet/GenericParquetReaders.java | 14 +++++++------- .../iceberg/flink/data/FlinkParquetReaders.java | 6 +++--- .../iceberg/flink/data/FlinkParquetWriters.java | 2 +- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java b/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java index 0900c7027fdc..1910743b1dd8 100644 --- a/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java +++ b/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java @@ -65,21 +65,21 @@ public class GenericParquetReaders { private GenericParquetReaders() { } - public static ParquetValueReader buildReader(Schema expectedSchema, - MessageType fileSchema) { + public static ParquetValueReader buildReader(Schema expectedSchema, + MessageType fileSchema) { return buildReader(expectedSchema, fileSchema, ImmutableMap.of()); } @SuppressWarnings("unchecked") - public static ParquetValueReader buildReader(Schema expectedSchema, - MessageType fileSchema, - Map idToConstant) { + public static ParquetValueReader buildReader(Schema expectedSchema, + MessageType fileSchema, + Map idToConstant) { if (ParquetSchemaUtil.hasIds(fileSchema)) { - return (ParquetValueReader) + return (ParquetValueReader) TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema, new ReadBuilder(fileSchema, idToConstant)); } else { - return (ParquetValueReader) + return (ParquetValueReader) TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema, new FallbackReadBuilder(fileSchema, idToConstant)); } diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java index 898909a26f02..749da9100eca 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java +++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java @@ -73,9 +73,9 @@ private ReadBuilder(MessageType type, Map idToConstant) { } @Override - protected ParquetValueReaders.StructReader createStructReader(List types, - List> readers, - Types.StructType struct) { + protected ParquetValueReaders.StructReader createStructReader(List types, + List> readers, + Types.StructType struct) { return new RowReader(types, readers, struct); } } diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java index 
190092ff9c98..2d6e469b614d 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java +++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java @@ -43,7 +43,7 @@ private WriteBuilder(MessageType type) { } @Override - protected ParquetValueWriters.StructWriter createStructWriter(List> writers) { + protected ParquetValueWriters.StructWriter createStructWriter(List> writers) { return new RowWriter(writers); } } From 0350b9b9ce9c717bccbae03256b0ab792bbb710e Mon Sep 17 00:00:00 2001 From: openinx Date: Wed, 24 Jun 2020 15:12:07 +0800 Subject: [PATCH 5/8] Add test suites for DictionaryEncodedData and FallbackData --- .../org/apache/iceberg/util/RandomUtil.java | 37 ++++++++ .../iceberg/data/RandomGenericData.java | 15 +++- .../apache/iceberg/flink/data/RandomData.java | 85 +++++++++++++++++-- .../data/TestFlinkParquetReaderWriter.java | 36 +++++--- .../apache/iceberg/spark/data/RandomData.java | 42 +--------- 5 files changed, 153 insertions(+), 62 deletions(-) diff --git a/api/src/test/java/org/apache/iceberg/util/RandomUtil.java b/api/src/test/java/org/apache/iceberg/util/RandomUtil.java index d621c6bdb0b4..a9254cfefe7b 100644 --- a/api/src/test/java/org/apache/iceberg/util/RandomUtil.java +++ b/api/src/test/java/org/apache/iceberg/util/RandomUtil.java @@ -21,6 +21,7 @@ import java.math.BigDecimal; import java.math.BigInteger; +import java.util.Arrays; import java.util.Random; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; @@ -147,6 +148,42 @@ public static Object generatePrimitive(Type.PrimitiveType primitive, } } + public static Object generateDictionaryEncodablePrimitive(Type.PrimitiveType primitive, Random random) { + int value = random.nextInt(3); + switch (primitive.typeId()) { + case BOOLEAN: + return true; // doesn't really matter for booleans since they are not dictionary encoded + case INTEGER: + case DATE: + return value; + case FLOAT: + return (float) value; + case DOUBLE: + return (double) value; + case LONG: + case TIME: + case TIMESTAMP: + return (long) value; + case STRING: + return String.valueOf(value); + case FIXED: + byte[] fixed = new byte[((Types.FixedType) primitive).length()]; + Arrays.fill(fixed, (byte) value); + return fixed; + case BINARY: + byte[] binary = new byte[value + 1]; + Arrays.fill(binary, (byte) value); + return binary; + case DECIMAL: + Types.DecimalType type = (Types.DecimalType) primitive; + BigInteger unscaled = new BigInteger(String.valueOf(value + 1)); + return new BigDecimal(unscaled, type.scale()); + default: + throw new IllegalArgumentException( + "Cannot generate random value for unknown type: " + primitive); + } + } + private static final long FIFTY_YEARS_IN_MICROS = (50L * (365 * 3 + 366) * 24 * 60 * 60 * 1_000_000) / 4; private static final int ABOUT_380_YEARS_IN_DAYS = 380 * 365; diff --git a/data/src/test/java/org/apache/iceberg/data/RandomGenericData.java b/data/src/test/java/org/apache/iceberg/data/RandomGenericData.java index c8ad61426be5..095e0c204e28 100644 --- a/data/src/test/java/org/apache/iceberg/data/RandomGenericData.java +++ b/data/src/test/java/org/apache/iceberg/data/RandomGenericData.java @@ -80,11 +80,16 @@ public Record struct(Types.StructType struct, Iterable fieldResults) { public abstract static class RandomDataGenerator extends TypeUtil.CustomOrderSchemaVisitor { private final Random random; + private static final int MAX_ENTRIES = 20; protected RandomDataGenerator(long seed) { this.random = new Random(seed); } + protected int getMaxEntries()
{ + return MAX_ENTRIES; + } + @Override public abstract T schema(Schema schema, Supplier structResult); @@ -102,7 +107,7 @@ public Object field(Types.NestedField field, Supplier fieldResult) { @Override public Object list(Types.ListType list, Supplier elementResult) { - int numElements = random.nextInt(20); + int numElements = random.nextInt(getMaxEntries()); List result = Lists.newArrayListWithExpectedSize(numElements); for (int i = 0; i < numElements; i += 1) { @@ -119,7 +124,7 @@ public Object list(Types.ListType list, Supplier elementResult) { @Override public Object map(Types.MapType map, Supplier keyResult, Supplier valueResult) { - int numEntries = random.nextInt(20); + int numEntries = random.nextInt(getMaxEntries()); Map result = Maps.newLinkedHashMap(); Supplier keyFunc; @@ -152,7 +157,7 @@ public Object map(Types.MapType map, Supplier keyResult, Supplier generate(Schema schema, int numRecords, long seed) { - RandomRowDataGenerator generator = new RandomRowDataGenerator(seed); - List rows = Lists.newArrayListWithExpectedSize(numRecords); - for (int i = 0; i < numRecords; i += 1) { - rows.add((Row) TypeUtil.visit(schema, generator)); - } - return rows; + private static Iterable generateData(Schema schema, int numRecords, Supplier supplier) { + return () -> new Iterator() { + private final RandomRowDataGenerator generator = supplier.get(); + private int count = 0; + + @Override + public boolean hasNext() { + return count < numRecords; + } + + @Override + public Row next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + ++count; + return (Row) TypeUtil.visit(schema, generator); + } + }; + } + + static Iterable generate(Schema schema, int numRecords, long seed) { + return generateData(schema, numRecords, () -> new RandomRowDataGenerator(seed)); + } + + static Iterable generateFallbackData(Schema schema, int numRecords, long seed, long numDictRows) { + return generateData(schema, numRecords, () -> new FallbackDataGenerator(seed, numDictRows)); + } + + static Iterable generateDictionaryEncodableData(Schema schema, int numRecords, long seed) { + return generateData(schema, numRecords, () -> new DictionaryEncodedDataGenerator(seed)); } private static class RandomRowDataGenerator extends RandomGenericData.RandomDataGenerator { - private RandomRowDataGenerator(long seed) { + RandomRowDataGenerator(long seed) { super(seed); } @@ -64,4 +93,44 @@ public Row struct(Types.StructType struct, Iterable fieldResults) { return row; } } + + private static class DictionaryEncodedDataGenerator extends RandomRowDataGenerator { + DictionaryEncodedDataGenerator(long seed) { + super(seed); + } + + @Override + protected int getMaxEntries() { + // Here we limited the max entries in LIST or MAP to be 3, because we have the mechanism to duplicate + // the keys in RandomDataGenerator#map while the dictionary encoder will generate a string with + // limited values("0","1","2"). It's impossible for us to request the generator to generate more than 3 keys, + // otherwise we will get in a infinite loop in RandomDataGenerator#map. 
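+ // That is: map keys must be unique, but this generator draws from at most three distinct values per type, + // so asking for more than three unique keys would never terminate.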
+ return 3; + } + + @Override + protected Object randomValue(Type.PrimitiveType primitive, Random random) { + return RandomUtil.generateDictionaryEncodablePrimitive(primitive, random); + } + } + + private static class FallbackDataGenerator extends RandomRowDataGenerator { + private final long dictionaryEncodedRows; + private long rowCount = 0; + + FallbackDataGenerator(long seed, long numDictionaryEncoded) { + super(seed); + this.dictionaryEncodedRows = numDictionaryEncoded; + } + + @Override + protected Object randomValue(Type.PrimitiveType primitive, Random rand) { + this.rowCount += 1; + if (rowCount > dictionaryEncodedRows) { + return RandomUtil.generatePrimitive(primitive, rand); + } else { + return RandomUtil.generateDictionaryEncodablePrimitive(primitive, rand); + } + } + } } diff --git a/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReaderWriter.java b/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReaderWriter.java index 0a1918a0618a..31b419f27914 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReaderWriter.java +++ b/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReaderWriter.java @@ -38,6 +38,7 @@ import static org.apache.iceberg.types.Types.NestedField.required; public class TestFlinkParquetReaderWriter { + private static final int NUM_RECORDS = 1_000_000; @Rule public TemporaryFolder temp = new TemporaryFolder(); @@ -69,32 +70,45 @@ public class TestFlinkParquetReaderWriter { optional(2, "slide", Types.StringType.get()) ); - @Test - public void testCorrectness() throws IOException { - int numRows = 2500; - Iterable records = RandomData.generate(COMPLEX_SCHEMA, numRows, 19981); - + private void testCorrectness(Schema schema, int numRecords, Iterable iterable) throws IOException { File testFile = temp.newFile(); Assert.assertTrue("Delete should succeed", testFile.delete()); try (FileAppender writer = Parquet.write(Files.localOutput(testFile)) - .schema(COMPLEX_SCHEMA) + .schema(schema) .createWriterFunc(FlinkParquetWriters::buildWriter) .build()) { - writer.addAll(records); + writer.addAll(iterable); } try (CloseableIterable reader = Parquet.read(Files.localInput(testFile)) - .project(COMPLEX_SCHEMA) - .createReaderFunc(type -> FlinkParquetReaders.buildReader(COMPLEX_SCHEMA, type)) + .project(schema) + .createReaderFunc(type -> FlinkParquetReaders.buildReader(schema, type)) .build()) { - Iterator expected = records.iterator(); + Iterator expected = iterable.iterator(); Iterator rows = reader.iterator(); - for (int i = 0; i < numRows; i += 1) { + for (int i = 0; i < numRecords; i += 1) { Assert.assertTrue("Should have expected number of rows", rows.hasNext()); Assert.assertEquals(expected.next(), rows.next()); } Assert.assertFalse("Should not have extra rows", rows.hasNext()); } } + + @Test + public void testNormalRowData() throws IOException { + testCorrectness(COMPLEX_SCHEMA, NUM_RECORDS, RandomData.generate(COMPLEX_SCHEMA, NUM_RECORDS, 19981)); + } + + @Test + public void testDictionaryEncodedData() throws IOException { + testCorrectness(COMPLEX_SCHEMA, NUM_RECORDS, + RandomData.generateDictionaryEncodableData(COMPLEX_SCHEMA, NUM_RECORDS, 21124)); + } + + @Test + public void testFallbackData() throws IOException { + testCorrectness(COMPLEX_SCHEMA, NUM_RECORDS, + RandomData.generateFallbackData(COMPLEX_SCHEMA, NUM_RECORDS, 21124, NUM_RECORDS / 20)); + } } diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/RandomData.java 
b/spark/src/test/java/org/apache/iceberg/spark/data/RandomData.java index f99c0fccb89c..0336759ca7e4 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/data/RandomData.java +++ b/spark/src/test/java/org/apache/iceberg/spark/data/RandomData.java @@ -20,9 +20,7 @@ package org.apache.iceberg.spark.data; import java.math.BigDecimal; -import java.math.BigInteger; import java.nio.ByteBuffer; -import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -331,42 +329,6 @@ public Object primitive(Type.PrimitiveType primitive) { } } - private static Object generateDictionaryEncodablePrimitive(Type.PrimitiveType primitive, Random random) { - int value = random.nextInt(3); - switch (primitive.typeId()) { - case BOOLEAN: - return true; // doesn't really matter for booleans since they are not dictionary encoded - case INTEGER: - case DATE: - return value; - case FLOAT: - return (float) value; - case DOUBLE: - return (double) value; - case LONG: - case TIME: - case TIMESTAMP: - return (long) value; - case STRING: - return String.valueOf(value); - case FIXED: - byte[] fixed = new byte[((Types.FixedType) primitive).length()]; - Arrays.fill(fixed, (byte) value); - return fixed; - case BINARY: - byte[] binary = new byte[value + 1]; - Arrays.fill(binary, (byte) value); - return binary; - case DECIMAL: - Types.DecimalType type = (Types.DecimalType) primitive; - BigInteger unscaled = new BigInteger(String.valueOf(value + 1)); - return new BigDecimal(unscaled, type.scale()); - default: - throw new IllegalArgumentException( - "Cannot generate random value for unknown type: " + primitive); - } - } - private static class DictionaryEncodedDataGenerator extends RandomDataGenerator { private DictionaryEncodedDataGenerator(Schema schema, long seed, float nullPercentage) { super(schema, seed, nullPercentage); } @@ -374,7 +336,7 @@ private DictionaryEncodedDataGenerator(Schema schema, long seed, float nullPerce @Override protected Object randomValue(Type.PrimitiveType primitive, Random random) { - return generateDictionaryEncodablePrimitive(primitive, random); + return RandomUtil.generateDictionaryEncodablePrimitive(primitive, random); } } @@ -393,7 +355,7 @@ protected Object randomValue(Type.PrimitiveType primitive, Random rand) { if (rowCount > dictionaryEncodedRows) { return RandomUtil.generatePrimitive(primitive, rand); } else { - return generateDictionaryEncodablePrimitive(primitive, rand); + return RandomUtil.generateDictionaryEncodablePrimitive(primitive, rand); } } } From 17845b17870ae19d260ba93410206748fc8c6eb7 Mon Sep 17 00:00:00 2001 From: openinx Date: Thu, 25 Jun 2020 22:31:16 +0800 Subject: [PATCH 6/8] Revert to use the StructWriterFactory & StructReaderFactory --- .../data/parquet/GenericParquetReaders.java | 60 +++++++++------- .../data/parquet/GenericParquetWriter.java | 25 +++++--- .../flink/data/FlinkParquetReaders.java | 42 +-------- .../flink/data/FlinkParquetWriters.java | 16 +----- 4 files changed, 53 insertions(+), 90 deletions(-) diff --git a/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java b/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java index 1910743b1dd8..31622248a63c 100644 --- a/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java +++ b/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java @@ -30,7 +30,6 @@ import java.util.Map; import org.apache.iceberg.Schema; import org.apache.iceberg.data.GenericRecord; -import
From 17845b17870ae19d260ba93410206748fc8c6eb7 Mon Sep 17 00:00:00 2001
From: openinx
Date: Thu, 25 Jun 2020 22:31:16 +0800
Subject: [PATCH 6/8] Revert to using the StructWriterFactory &
 StructReaderFactory

---
 .../data/parquet/GenericParquetReaders.java | 60 +++++++++++--------
 .../data/parquet/GenericParquetWriter.java  | 25 +++++---
 .../flink/data/FlinkParquetReaders.java     | 42 +------------
 .../flink/data/FlinkParquetWriters.java     | 16 +----
 4 files changed, 53 insertions(+), 90 deletions(-)

diff --git a/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java b/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java
index 1910743b1dd8..31622248a63c 100644
--- a/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java
+++ b/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java
@@ -30,7 +30,6 @@
 import java.util.Map;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.data.GenericRecord;
-import org.apache.iceberg.data.Record;
 import org.apache.iceberg.parquet.ParquetSchemaUtil;
 import org.apache.iceberg.parquet.ParquetValueReader;
 import org.apache.iceberg.parquet.ParquetValueReaders;
@@ -67,27 +66,34 @@ private GenericParquetReaders() {
 
   public static ParquetValueReader buildReader(Schema expectedSchema,
                                                MessageType fileSchema) {
-    return buildReader(expectedSchema, fileSchema, ImmutableMap.of());
+    return buildReader(expectedSchema, fileSchema, ImmutableMap.of(), RecordReader::new);
   }
 
-  @SuppressWarnings("unchecked")
   public static ParquetValueReader buildReader(Schema expectedSchema,
                                                MessageType fileSchema,
                                                Map idToConstant) {
+    return buildReader(expectedSchema, fileSchema, idToConstant, RecordReader::new);
+  }
+
+  @SuppressWarnings("unchecked")
+  public static ParquetValueReader buildReader(Schema expectedSchema,
+                                               MessageType fileSchema,
+                                               Map idToConstant,
+                                               StructReaderFactory structReaderFactory) {
     if (ParquetSchemaUtil.hasIds(fileSchema)) {
-      return (ParquetValueReader)
+      return (ParquetValueReader)
           TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
-              new ReadBuilder(fileSchema, idToConstant));
+              new ReadBuilder(fileSchema, idToConstant, structReaderFactory));
     } else {
-      return (ParquetValueReader)
+      return (ParquetValueReader)
          TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
-              new FallbackReadBuilder(fileSchema, idToConstant));
+              new FallbackReadBuilder(fileSchema, idToConstant, structReaderFactory));
     }
   }
 
-  public static class FallbackReadBuilder extends ReadBuilder {
-    protected FallbackReadBuilder(MessageType type, Map idToConstant) {
-      super(type, idToConstant);
+  private static class FallbackReadBuilder extends ReadBuilder {
+    FallbackReadBuilder(MessageType type, Map idToConstant, StructReaderFactory structReaderFactory) {
+      super(type, idToConstant, structReaderFactory);
     }
 
     @Override
@@ -112,17 +118,19 @@ public ParquetValueReader struct(StructType expected, GroupType struct,
       types.add(fieldType);
     }
 
-    return createStructReader(types, newFields, expected);
+    return structReaderFactory().create(types, newFields, expected);
   }
 }
 
-  public static class ReadBuilder extends TypeWithSchemaVisitor> {
+  private static class ReadBuilder extends TypeWithSchemaVisitor> {
     private final MessageType type;
     private final Map idToConstant;
+    private final StructReaderFactory structReaderFactory;
 
-    protected ReadBuilder(MessageType type, Map idToConstant) {
+    ReadBuilder(MessageType type, Map idToConstant, StructReaderFactory structReaderFactory) {
       this.type = type;
       this.idToConstant = idToConstant;
+      this.structReaderFactory = structReaderFactory;
     }
 
     @Override
@@ -131,12 +139,6 @@ public ParquetValueReader message(StructType expected, MessageType message,
       return struct(expected, message.asGroupType(), fieldReaders);
     }
 
-    protected StructReader createStructReader(List types,
-                                              List> readers,
-                                              StructType struct) {
-      return new RecordReader(types, readers, struct);
-    }
-
     @Override
     public ParquetValueReader struct(StructType expected, GroupType struct,
                                      List> fieldReaders) {
@@ -175,7 +177,7 @@ public ParquetValueReader struct(StructType expected, GroupType struct,
       }
     }
 
-    return createStructReader(types, reorderedFields, expected);
+    return structReaderFactory.create(types, reorderedFields, expected);
   }
 
   @Override
@@ -305,6 +307,10 @@ public ParquetValueReader primitive(org.apache.iceberg.types.Type.PrimitiveTy
     MessageType type() {
       return type;
     }
+
+    StructReaderFactory structReaderFactory() {
+      return structReaderFactory;
+    }
   }
 
   private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
@@ -403,7 +409,11 @@ public byte[] read(byte[] reuse) {
     }
   }
 
-  static class RecordReader extends StructReader {
+  public interface StructReaderFactory {
+    StructReader create(List types, List> readers, StructType struct);
+  }
+
+  static class RecordReader extends StructReader {
     private final StructType structType;
 
     RecordReader(List types,
@@ -414,7 +424,7 @@ static class RecordReader extends StructReader {
     }
 
     @Override
-    protected Record newStructData(Record reuse) {
+    protected GenericRecord newStructData(GenericRecord reuse) {
       if (reuse != null) {
         return reuse;
       } else {
@@ -423,17 +433,17 @@ protected Record newStructData(Record reuse) {
     }
 
     @Override
-    protected Object getField(Record intermediate, int pos) {
+    protected Object getField(GenericRecord intermediate, int pos) {
       return intermediate.get(pos);
     }
 
     @Override
-    protected Record buildStruct(Record struct) {
+    protected GenericRecord buildStruct(GenericRecord struct) {
       return struct;
     }
 
     @Override
-    protected void set(Record struct, int pos, Object value) {
+    protected void set(GenericRecord struct, int pos, Object value) {
       struct.set(pos, value);
     }
   }
diff --git a/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java b/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java
index e8c36fb76efd..9a3e5b43c2fc 100644
--- a/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java
+++ b/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java
@@ -49,16 +49,22 @@ public class GenericParquetWriter {
   private GenericParquetWriter() {
   }
 
-  @SuppressWarnings("unchecked")
   public static ParquetValueWriter buildWriter(MessageType type) {
-    return (ParquetValueWriter) ParquetTypeVisitor.visit(type, new WriteBuilder(type));
+    return buildWriter(type, RecordWriter::new);
   }
 
-  public static class WriteBuilder extends ParquetTypeVisitor> {
+  @SuppressWarnings("unchecked")
+  public static ParquetValueWriter buildWriter(MessageType type, StructWriterFactory structWriterFactory) {
+    return (ParquetValueWriter) ParquetTypeVisitor.visit(type, new WriteBuilder(type, structWriterFactory));
+  }
+
+  private static class WriteBuilder extends ParquetTypeVisitor> {
     private final MessageType type;
+    private final StructWriterFactory structWriterFactory;
 
-    protected WriteBuilder(MessageType type) {
+    WriteBuilder(MessageType type, StructWriterFactory structWriterFactory) {
       this.type = type;
+      this.structWriterFactory = structWriterFactory;
     }
 
     @Override
@@ -67,10 +73,6 @@ public ParquetValueWriter message(MessageType message,
       return struct(message.asGroupType(), fieldWriters);
     }
 
-    protected StructWriter createStructWriter(List> writers) {
-      return new RecordWriter(writers);
-    }
-
     @Override
     public ParquetValueWriter struct(GroupType struct,
                                      List> fieldWriters) {
@@ -82,7 +84,7 @@ public ParquetValueWriter struct(GroupType struct,
        writers.add(ParquetValueWriters.option(fieldType, fieldD, fieldWriters.get(i)));
      }
 
-      return createStructWriter(writers);
+      return structWriterFactory.create(writers);
     }
 
     @Override
@@ -288,6 +290,11 @@ public void write(int repetitionLevel, byte[] value) {
     }
   }
 
+  public interface StructWriterFactory {
+
+    StructWriter create(List> writers);
+  }
+
   private static class RecordWriter extends StructWriter {
     private RecordWriter(List> writers) {
       super(writers);
diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
index 749da9100eca..c1862cdf8984 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
@@ -20,14 +20,11 @@
 package org.apache.iceberg.flink.data;
 
 import java.util.List;
-import java.util.Map;
 import org.apache.flink.types.Row;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.data.parquet.GenericParquetReaders;
-import org.apache.iceberg.parquet.ParquetSchemaUtil;
 import org.apache.iceberg.parquet.ParquetValueReader;
 import org.apache.iceberg.parquet.ParquetValueReaders;
-import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.types.Types;
 import org.apache.parquet.schema.MessageType;
@@ -38,46 +35,9 @@
   private FlinkParquetReaders() {
   }
 
-  @SuppressWarnings("unchecked")
   public static ParquetValueReader buildReader(Schema expectedSchema,
                                                MessageType fileSchema) {
-    if (ParquetSchemaUtil.hasIds(fileSchema)) {
-      return (ParquetValueReader)
-          TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
-              new ReadBuilder(fileSchema, ImmutableMap.of()));
-    } else {
-      return (ParquetValueReader)
-          TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
-              new FallbackReadBuilder(fileSchema, ImmutableMap.of()));
-    }
-  }
-
-  private static class FallbackReadBuilder extends GenericParquetReaders.FallbackReadBuilder {
-
-    private FallbackReadBuilder(MessageType type, Map idToConstant) {
-      super(type, idToConstant);
-    }
-
-    @Override
-    protected ParquetValueReaders.StructReader createStructReader(List types,
-                                                                  List> readers,
-                                                                  Types.StructType struct) {
-      return new RowReader(types, readers, struct);
-    }
-  }
-
-  private static class ReadBuilder extends GenericParquetReaders.ReadBuilder {
-
-    private ReadBuilder(MessageType type, Map idToConstant) {
-      super(type, idToConstant);
-    }
-
-    @Override
-    protected ParquetValueReaders.StructReader createStructReader(List types,
-                                                                  List> readers,
-                                                                  Types.StructType struct) {
-      return new RowReader(types, readers, struct);
-    }
+    return GenericParquetReaders.buildReader(expectedSchema, fileSchema, ImmutableMap.of(), RowReader::new);
   }
 
   static class RowReader extends ParquetValueReaders.StructReader {
diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
index 2d6e469b614d..86b46e36a386 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
@@ -22,7 +22,6 @@
 import java.util.List;
 import org.apache.flink.types.Row;
 import org.apache.iceberg.data.parquet.GenericParquetWriter;
-import org.apache.iceberg.parquet.ParquetTypeVisitor;
 import org.apache.iceberg.parquet.ParquetValueWriter;
 import org.apache.iceberg.parquet.ParquetValueWriters;
 import org.apache.parquet.schema.MessageType;
@@ -31,21 +30,8 @@ public class FlinkParquetWriters {
   private FlinkParquetWriters() {
   }
 
-  @SuppressWarnings("unchecked")
   public static ParquetValueWriter buildWriter(MessageType type) {
-    return (ParquetValueWriter) ParquetTypeVisitor.visit(type, new WriteBuilder(type));
-  }
-
-  private static class WriteBuilder extends GenericParquetWriter.WriteBuilder {
-
-    private WriteBuilder(MessageType type) {
-      super(type);
-    }
-
-    @Override
-    protected ParquetValueWriters.StructWriter createStructWriter(List> writers) {
-      return new RowWriter(writers);
-    }
+    return GenericParquetWriter.buildWriter(type, RowWriter::new);
   }
 
   private static class RowWriter extends ParquetValueWriters.StructWriter {
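The net effect of this patch: instead of exposing ReadBuilder/WriteBuilder for
subclassing, the generic builders accept a StructReaderFactory or
StructWriterFactory, and each engine hands in a constructor reference
(RecordReader::new for generic records, RowReader::new and RowWriter::new for
Flink). A minimal sketch of that injection pattern with simplified stand-in
types (none of these names are from the patch):

    import java.util.List;

    public class FactoryInjectionSketch {
      // Stand-in for StructReaderFactory: the only engine-specific hook.
      interface StructFactory<T> {
        T create(List<String> fieldNames);
      }

      record GenericRow(List<String> fieldNames) { }  // stand-in for RecordReader's output
      record FlinkRow(List<String> fieldNames) { }    // stand-in for RowReader's output

      // Stand-in for buildReader: all of the schema-walking logic stays shared.
      static <T> T build(List<String> fieldNames, StructFactory<T> factory) {
        return factory.create(fieldNames);
      }

      public static void main(String[] args) {
        System.out.println(build(List.of("id", "data"), GenericRow::new));
        System.out.println(build(List.of("id", "data"), FlinkRow::new));
      }
    }

The same builder produces different struct representations purely through the
injected constructor reference, which is why the Flink-side FallbackReadBuilder
and ReadBuilder subclasses above could be deleted.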
ParquetValueWriters.StructWriter createStructWriter(List> writers) { - return new RowWriter(writers); - } + return GenericParquetWriter.buildWriter(type, RowWriter::new); } private static class RowWriter extends ParquetValueWriters.StructWriter { From 95e197a8544dd32597da0f4bd0b84fa41f6664c6 Mon Sep 17 00:00:00 2001 From: openinx Date: Thu, 25 Jun 2020 22:56:40 +0800 Subject: [PATCH 7/8] Use the Record instead of GenericRecord --- .../data/parquet/GenericParquetReaders.java | 23 ++++++++++--------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java b/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java index 31622248a63c..12e422bd511e 100644 --- a/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java +++ b/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java @@ -30,6 +30,7 @@ import java.util.Map; import org.apache.iceberg.Schema; import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; import org.apache.iceberg.parquet.ParquetSchemaUtil; import org.apache.iceberg.parquet.ParquetValueReader; import org.apache.iceberg.parquet.ParquetValueReaders; @@ -64,14 +65,14 @@ public class GenericParquetReaders { private GenericParquetReaders() { } - public static ParquetValueReader buildReader(Schema expectedSchema, - MessageType fileSchema) { + public static ParquetValueReader buildReader(Schema expectedSchema, + MessageType fileSchema) { return buildReader(expectedSchema, fileSchema, ImmutableMap.of(), RecordReader::new); } - public static ParquetValueReader buildReader(Schema expectedSchema, - MessageType fileSchema, - Map idToConstant) { + public static ParquetValueReader buildReader(Schema expectedSchema, + MessageType fileSchema, + Map idToConstant) { return buildReader(expectedSchema, fileSchema, idToConstant, RecordReader::new); } @@ -92,7 +93,7 @@ public static ParquetValueReader buildReader(Schema expectedSchema, } private static class FallbackReadBuilder extends ReadBuilder { - FallbackReadBuilder(MessageType type, Map idToConstant, StructReaderFactory structReaderFactory) { + FallbackReadBuilder(MessageType type, Map idToConstant, StructReaderFactory structReaderFactory) { super(type, idToConstant, structReaderFactory); } @@ -413,7 +414,7 @@ public interface StructReaderFactory { StructReader create(List types, List> readers, StructType struct); } - static class RecordReader extends StructReader { + static class RecordReader extends StructReader { private final StructType structType; RecordReader(List types, @@ -424,7 +425,7 @@ static class RecordReader extends StructReader { } @Override - protected GenericRecord newStructData(GenericRecord reuse) { + protected Record newStructData(Record reuse) { if (reuse != null) { return reuse; } else { @@ -433,17 +434,17 @@ protected GenericRecord newStructData(GenericRecord reuse) { } @Override - protected Object getField(GenericRecord intermediate, int pos) { + protected Object getField(Record intermediate, int pos) { return intermediate.get(pos); } @Override - protected GenericRecord buildStruct(GenericRecord struct) { + protected Record buildStruct(Record struct) { return struct; } @Override - protected void set(GenericRecord struct, int pos, Object value) { + protected void set(Record struct, int pos, Object value) { struct.set(pos, value); } } From 5bffceda9758076f67236011d4ba4e560b92590c Mon Sep 17 00:00:00 2001 From: openinx Date: Sat, 27 Jun 2020 
From 5bffceda9758076f67236011d4ba4e560b92590c Mon Sep 17 00:00:00 2001
From: openinx
Date: Sat, 27 Jun 2020 10:35:47 +0800
Subject: [PATCH 8/8] Make the flink parquet reader/writer inherit the generic
 parquet reader/writer.

---
 .../data/parquet/GenericParquetReaders.java   | 51 ++++++++-----------
 .../data/parquet/GenericParquetWriter.java    | 27 ++++------
 .../iceberg/data/RandomGenericData.java       |  6 +--
 .../flink/data/FlinkParquetReaders.java       | 47 +++++++++++++++--
 .../flink/data/FlinkParquetWriters.java       | 20 ++++++--
 .../apache/iceberg/flink/data/RandomData.java | 22 ++++----
 .../data/TestFlinkParquetReaderWriter.java    |  6 +--
 7 files changed, 107 insertions(+), 72 deletions(-)

diff --git a/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java b/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java
index 12e422bd511e..3e4e7c779597 100644
--- a/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java
+++ b/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java
@@ -62,39 +62,32 @@ import org.apache.parquet.schema.Type;
 
 public class GenericParquetReaders {
-  private GenericParquetReaders() {
+  protected GenericParquetReaders() {
   }
 
   public static ParquetValueReader buildReader(Schema expectedSchema,
                                                MessageType fileSchema) {
-    return buildReader(expectedSchema, fileSchema, ImmutableMap.of(), RecordReader::new);
+    return buildReader(expectedSchema, fileSchema, ImmutableMap.of());
   }
 
+  @SuppressWarnings("unchecked")
   public static ParquetValueReader buildReader(Schema expectedSchema,
                                                MessageType fileSchema,
                                                Map idToConstant) {
-    return buildReader(expectedSchema, fileSchema, idToConstant, RecordReader::new);
-  }
-
-  @SuppressWarnings("unchecked")
-  public static ParquetValueReader buildReader(Schema expectedSchema,
-                                               MessageType fileSchema,
-                                               Map idToConstant,
-                                               StructReaderFactory structReaderFactory) {
     if (ParquetSchemaUtil.hasIds(fileSchema)) {
-      return (ParquetValueReader)
+      return (ParquetValueReader)
           TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
-              new ReadBuilder(fileSchema, idToConstant, structReaderFactory));
+              new ReadBuilder(fileSchema, idToConstant));
     } else {
-      return (ParquetValueReader)
+      return (ParquetValueReader)
          TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
-              new FallbackReadBuilder(fileSchema, idToConstant, structReaderFactory));
+              new FallbackReadBuilder(fileSchema, idToConstant));
     }
   }
 
-  private static class FallbackReadBuilder extends ReadBuilder {
-    FallbackReadBuilder(MessageType type, Map idToConstant, StructReaderFactory structReaderFactory) {
-      super(type, idToConstant, structReaderFactory);
+  protected static class FallbackReadBuilder extends ReadBuilder {
+    protected FallbackReadBuilder(MessageType type, Map idToConstant) {
+      super(type, idToConstant);
    }
 
     @Override
@@ -119,19 +112,17 @@ public ParquetValueReader struct(StructType expected, GroupType struct,
       types.add(fieldType);
     }
 
-    return structReaderFactory().create(types, newFields, expected);
+    return createStructReader(types, newFields, expected);
   }
 }
 
-  private static class ReadBuilder extends TypeWithSchemaVisitor> {
+  protected static class ReadBuilder extends TypeWithSchemaVisitor> {
     private final MessageType type;
     private final Map idToConstant;
-    private final StructReaderFactory structReaderFactory;
 
-    ReadBuilder(MessageType type, Map idToConstant, StructReaderFactory structReaderFactory) {
+    protected ReadBuilder(MessageType type, Map idToConstant) {
       this.type = type;
       this.idToConstant = idToConstant;
-      this.structReaderFactory = structReaderFactory;
     }
 
     @Override
@@ -140,6 +131,12 @@ public ParquetValueReader message(StructType expected, MessageType message,
       return struct(expected, message.asGroupType(), fieldReaders);
     }
 
+    protected StructReader createStructReader(List types,
+                                              List> readers,
+                                              StructType struct) {
+      return new RecordReader(types, readers, struct);
+    }
+
     @Override
     public ParquetValueReader struct(StructType expected, GroupType struct,
                                      List> fieldReaders) {
@@ -178,7 +175,7 @@ public ParquetValueReader struct(StructType expected, GroupType struct,
       }
     }
 
-    return structReaderFactory.create(types, reorderedFields, expected);
+    return createStructReader(types, reorderedFields, expected);
   }
 
   @Override
@@ -308,10 +305,6 @@ public ParquetValueReader primitive(org.apache.iceberg.types.Type.PrimitiveTy
     MessageType type() {
       return type;
     }
-
-    StructReaderFactory structReaderFactory() {
-      return structReaderFactory;
-    }
   }
 
   private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
@@ -410,10 +403,6 @@ public byte[] read(byte[] reuse) {
     }
   }
 
-  public interface StructReaderFactory {
-    StructReader create(List types, List> readers, StructType struct);
-  }
-
   static class RecordReader extends StructReader {
     private final StructType structType;
 
diff --git a/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java b/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java
index 9a3e5b43c2fc..4cdb2dee1d21 100644
--- a/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java
+++ b/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java
@@ -46,25 +46,19 @@ import org.apache.parquet.schema.Type;
 
 public class GenericParquetWriter {
-  private GenericParquetWriter() {
-  }
-
-  public static ParquetValueWriter buildWriter(MessageType type) {
-    return buildWriter(type, RecordWriter::new);
+  protected GenericParquetWriter() {
   }
 
   @SuppressWarnings("unchecked")
-  public static ParquetValueWriter buildWriter(MessageType type, StructWriterFactory structWriterFactory) {
-    return (ParquetValueWriter) ParquetTypeVisitor.visit(type, new WriteBuilder(type, structWriterFactory));
+  public static ParquetValueWriter buildWriter(MessageType type) {
+    return (ParquetValueWriter) ParquetTypeVisitor.visit(type, new WriteBuilder(type));
   }
 
-  private static class WriteBuilder extends ParquetTypeVisitor> {
+  protected static class WriteBuilder extends ParquetTypeVisitor> {
     private final MessageType type;
-    private final StructWriterFactory structWriterFactory;
 
-    WriteBuilder(MessageType type, StructWriterFactory structWriterFactory) {
+    protected WriteBuilder(MessageType type) {
       this.type = type;
-      this.structWriterFactory = structWriterFactory;
     }
 
     @Override
@@ -73,6 +67,10 @@ public ParquetValueWriter message(MessageType message,
       return struct(message.asGroupType(), fieldWriters);
     }
 
+    protected StructWriter createStructWriter(List> writers) {
+      return new RecordWriter(writers);
+    }
+
     @Override
     public ParquetValueWriter struct(GroupType struct,
                                      List> fieldWriters) {
@@ -84,7 +82,7 @@ public ParquetValueWriter struct(GroupType struct,
        writers.add(ParquetValueWriters.option(fieldType, fieldD, fieldWriters.get(i)));
      }
 
-      return structWriterFactory.create(writers);
+      return createStructWriter(writers);
     }
 
     @Override
@@ -290,11 +288,6 @@ public void write(int repetitionLevel, byte[] value) {
     }
   }
 
-  public interface StructWriterFactory {
-
-    StructWriter create(List> writers);
-  }
-
   private static class RecordWriter extends StructWriter {
     private RecordWriter(List> writers) {
       super(writers);
diff --git a/data/src/test/java/org/apache/iceberg/data/RandomGenericData.java b/data/src/test/java/org/apache/iceberg/data/RandomGenericData.java
index 095e0c204e28..19179c0b1de4 100644
--- a/data/src/test/java/org/apache/iceberg/data/RandomGenericData.java
+++ b/data/src/test/java/org/apache/iceberg/data/RandomGenericData.java
@@ -46,7 +46,7 @@ public class RandomGenericData {
   private RandomGenericData() {}
 
   public static List generate(Schema schema, int numRecords, long seed) {
-    RandomRecordDataGenerator generator = new RandomRecordDataGenerator(seed);
+    RandomRecordGenerator generator = new RandomRecordGenerator(seed);
     List records = Lists.newArrayListWithExpectedSize(numRecords);
     for (int i = 0; i < numRecords; i += 1) {
       records.add((Record) TypeUtil.visit(schema, generator));
@@ -55,8 +55,8 @@ public static List generate(Schema schema, int numRecords, long seed) {
     return records;
   }
 
-  private static class RandomRecordDataGenerator extends RandomDataGenerator {
-    private RandomRecordDataGenerator(long seed) {
+  private static class RandomRecordGenerator extends RandomDataGenerator {
+    private RandomRecordGenerator(long seed) {
       super(seed);
     }
 
diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
index c1862cdf8984..e9d9bc08b5ef 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
@@ -20,24 +20,63 @@
 package org.apache.iceberg.flink.data;
 
 import java.util.List;
+import java.util.Map;
 import org.apache.flink.types.Row;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.parquet.ParquetSchemaUtil;
 import org.apache.iceberg.parquet.ParquetValueReader;
 import org.apache.iceberg.parquet.ParquetValueReaders;
+import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.types.Types;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.Type;
 
-public class FlinkParquetReaders {
+public class FlinkParquetReaders extends GenericParquetReaders {
   private FlinkParquetReaders() {
+  }
+
+  @SuppressWarnings("unchecked")
+  public static ParquetValueReader buildRowReader(Schema expectedSchema,
+                                                  MessageType fileSchema) {
+    if (ParquetSchemaUtil.hasIds(fileSchema)) {
+      return (ParquetValueReader)
+          TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
+              new ReadBuilder(fileSchema, ImmutableMap.of()));
+    } else {
+      return (ParquetValueReader)
+          TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
+              new FallbackReadBuilder(fileSchema, ImmutableMap.of()));
+    }
+  }
+
+  private static class FallbackReadBuilder extends GenericParquetReaders.FallbackReadBuilder {
+
+    private FallbackReadBuilder(MessageType type, Map idToConstant) {
+      super(type, idToConstant);
+    }
+
+    @Override
+    protected ParquetValueReaders.StructReader createStructReader(List types,
+                                                                  List> readers,
+                                                                  Types.StructType struct) {
+      return new RowReader(types, readers, struct);
+    }
   }
 
-  public static ParquetValueReader buildReader(Schema expectedSchema,
-                                               MessageType fileSchema) {
-    return GenericParquetReaders.buildReader(expectedSchema, fileSchema, ImmutableMap.of(), RowReader::new);
+  private static class ReadBuilder extends GenericParquetReaders.ReadBuilder {
+
+    private ReadBuilder(MessageType type, Map idToConstant) {
+      super(type, idToConstant);
+    }
+
+    @Override
+    protected ParquetValueReaders.StructReader createStructReader(List types,
+                                                                  List> readers,
+                                                                  Types.StructType struct) {
+      return new RowReader(types, readers, struct);
+    }
   }
 
   static class RowReader extends ParquetValueReaders.StructReader {
diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
index 86b46e36a386..2e36b4ac624f 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
@@ -22,16 +22,30 @@
 import java.util.List;
 import org.apache.flink.types.Row;
 import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.parquet.ParquetTypeVisitor;
 import org.apache.iceberg.parquet.ParquetValueWriter;
 import org.apache.iceberg.parquet.ParquetValueWriters;
 import org.apache.parquet.schema.MessageType;
 
-public class FlinkParquetWriters {
+public class FlinkParquetWriters extends GenericParquetWriter {
   private FlinkParquetWriters() {
   }
 
-  public static ParquetValueWriter buildWriter(MessageType type) {
-    return GenericParquetWriter.buildWriter(type, RowWriter::new);
+  @SuppressWarnings("unchecked")
+  public static ParquetValueWriter buildRowWriter(MessageType type) {
+    return (ParquetValueWriter) ParquetTypeVisitor.visit(type, new WriteBuilder(type));
+  }
+
+  private static class WriteBuilder extends GenericParquetWriter.WriteBuilder {
+
+    private WriteBuilder(MessageType type) {
+      super(type);
+    }
+
+    @Override
+    protected ParquetValueWriters.StructWriter createStructWriter(List> writers) {
+      return new RowWriter(writers);
+    }
   }
 
   private static class RowWriter extends ParquetValueWriters.StructWriter {
diff --git a/flink/src/test/java/org/apache/iceberg/flink/data/RandomData.java b/flink/src/test/java/org/apache/iceberg/flink/data/RandomData.java
index a44162d2f5cb..cdf667bdf61b 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/data/RandomData.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/data/RandomData.java
@@ -37,9 +37,9 @@ class RandomData {
   private RandomData() {
   }
 
-  private static Iterable generateData(Schema schema, int numRecords, Supplier supplier) {
+  private static Iterable generateData(Schema schema, int numRecords, Supplier supplier) {
     return () -> new Iterator() {
-      private final RandomRowDataGenerator generator = supplier.get();
+      private final RandomRowGenerator generator = supplier.get();
       private int count = 0;
 
       @Override
@@ -59,20 +59,20 @@ public Row next() {
   }
 
   static Iterable generate(Schema schema, int numRecords, long seed) {
-    return generateData(schema, numRecords, () -> new RandomRowDataGenerator(seed));
+    return generateData(schema, numRecords, () -> new RandomRowGenerator(seed));
   }
 
   static Iterable generateFallbackData(Schema schema, int numRecords, long seed, long numDictRows) {
-    return generateData(schema, numRecords, () -> new FallbackDataGenerator(seed, numDictRows));
+    return generateData(schema, numRecords, () -> new FallbackGenerator(seed, numDictRows));
   }
 
   static Iterable generateDictionaryEncodableData(Schema schema, int numRecords, long seed) {
-    return generateData(schema, numRecords, () -> new DictionaryEncodedDataGenerator(seed));
+    return generateData(schema, numRecords, () -> new DictionaryEncodedGenerator(seed));
   }
 
-  private static class RandomRowDataGenerator extends RandomGenericData.RandomDataGenerator {
+  private static class RandomRowGenerator extends RandomGenericData.RandomDataGenerator {
 
-    RandomRowDataGenerator(long seed) {
+    RandomRowGenerator(long seed) {
       super(seed);
     }
 
@@ -94,8 +94,8 @@ public Row struct(Types.StructType struct, Iterable fieldResults) {
     }
   }
 
-  private static class DictionaryEncodedDataGenerator extends RandomRowDataGenerator {
-    DictionaryEncodedDataGenerator(long seed) {
+  private static class DictionaryEncodedGenerator extends RandomRowGenerator {
+    DictionaryEncodedGenerator(long seed) {
       super(seed);
     }
 
@@ -114,11 +114,11 @@ protected Object randomValue(Type.PrimitiveType primitive, Random random) {
     }
   }
 
-  private static class FallbackDataGenerator extends RandomRowDataGenerator {
+  private static class FallbackGenerator extends RandomRowGenerator {
     private final long dictionaryEncodedRows;
     private long rowCount = 0;
 
-    FallbackDataGenerator(long seed, long numDictionaryEncoded) {
+    FallbackGenerator(long seed, long numDictionaryEncoded) {
       super(seed);
       this.dictionaryEncodedRows = numDictionaryEncoded;
     }
 
diff --git a/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReaderWriter.java b/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReaderWriter.java
index 31b419f27914..f8bf6a53e514 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReaderWriter.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReaderWriter.java
@@ -38,7 +38,7 @@ import static org.apache.iceberg.types.Types.NestedField.required;
 
 public class TestFlinkParquetReaderWriter {
-  private static final int NUM_RECORDS = 1_000_000;
+  private static final int NUM_RECORDS = 20_000;
 
   @Rule
   public TemporaryFolder temp = new TemporaryFolder();
@@ -76,14 +76,14 @@ private void testCorrectness(Schema schema, int numRecords, Iterable iterab
 
     try (FileAppender writer = Parquet.write(Files.localOutput(testFile))
         .schema(schema)
-        .createWriterFunc(FlinkParquetWriters::buildWriter)
+        .createWriterFunc(FlinkParquetWriters::buildRowWriter)
         .build()) {
       writer.addAll(iterable);
     }
 
     try (CloseableIterable reader = Parquet.read(Files.localInput(testFile))
         .project(schema)
-        .createReaderFunc(type -> FlinkParquetReaders.buildReader(schema, type))
+        .createReaderFunc(type -> FlinkParquetReaders.buildRowReader(schema, type))
         .build()) {
       Iterator expected = iterable.iterator();
       Iterator rows = reader.iterator();
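End state of the series, as exercised by the test above: rows are written
through FlinkParquetWriters::buildRowWriter and read back through
FlinkParquetReaders.buildRowReader. A condensed round-trip sketch using the
same Iceberg APIs that appear in the test (the roundTrip helper itself is
hypothetical; generic type arguments are assumed, since the patch text lost
them in transit):

    import java.io.File;
    import java.io.IOException;
    import java.util.List;
    import org.apache.flink.types.Row;
    import org.apache.iceberg.Files;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.io.CloseableIterable;
    import org.apache.iceberg.io.FileAppender;
    import org.apache.iceberg.parquet.Parquet;
    import org.apache.iceberg.relocated.com.google.common.collect.Lists;

    public class RoundTripSketch {
      // Write the rows to a Parquet file, then read them back with a projection.
      static List<Row> roundTrip(Schema schema, Iterable<Row> rows, File file) throws IOException {
        try (FileAppender<Row> writer = Parquet.write(Files.localOutput(file))
            .schema(schema)
            .createWriterFunc(FlinkParquetWriters::buildRowWriter)
            .build()) {
          writer.addAll(rows);
        }

        try (CloseableIterable<Row> reader = Parquet.read(Files.localInput(file))
            .project(schema)
            .createReaderFunc(fileType -> FlinkParquetReaders.buildRowReader(schema, fileType))
            .build()) {
          return Lists.newArrayList(reader);
        }
      }
    }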