diff --git a/api/src/test/java/org/apache/iceberg/util/RandomUtil.java b/api/src/test/java/org/apache/iceberg/util/RandomUtil.java
index d621c6bdb0b4..a9254cfefe7b 100644
--- a/api/src/test/java/org/apache/iceberg/util/RandomUtil.java
+++ b/api/src/test/java/org/apache/iceberg/util/RandomUtil.java
@@ -21,6 +21,7 @@
 
 import java.math.BigDecimal;
 import java.math.BigInteger;
+import java.util.Arrays;
 import java.util.Random;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
@@ -147,6 +148,42 @@ public static Object generatePrimitive(Type.PrimitiveType primitive,
     }
   }
 
+  public static Object generateDictionaryEncodablePrimitive(Type.PrimitiveType primitive, Random random) {
+    int value = random.nextInt(3);
+    switch (primitive.typeId()) {
+      case BOOLEAN:
+        return true; // doesn't really matter for booleans since they are not dictionary encoded
+      case INTEGER:
+      case DATE:
+        return value;
+      case FLOAT:
+        return (float) value;
+      case DOUBLE:
+        return (double) value;
+      case LONG:
+      case TIME:
+      case TIMESTAMP:
+        return (long) value;
+      case STRING:
+        return String.valueOf(value);
+      case FIXED:
+        byte[] fixed = new byte[((Types.FixedType) primitive).length()];
+        Arrays.fill(fixed, (byte) value);
+        return fixed;
+      case BINARY:
+        byte[] binary = new byte[value + 1];
+        Arrays.fill(binary, (byte) value);
+        return binary;
+      case DECIMAL:
+        Types.DecimalType type = (Types.DecimalType) primitive;
+        BigInteger unscaled = new BigInteger(String.valueOf(value + 1));
+        return new BigDecimal(unscaled, type.scale());
+      default:
+        throw new IllegalArgumentException(
+            "Cannot generate random value for unknown type: " + primitive);
+    }
+  }
+
   private static final long FIFTY_YEARS_IN_MICROS =
       (50L * (365 * 3 + 366) * 24 * 60 * 60 * 1_000_000) / 4;
   private static final int ABOUT_380_YEARS_IN_DAYS = 380 * 365;
diff --git a/build.gradle b/build.gradle
index 8086bce5023a..83a790485282 100644
--- a/build.gradle
+++ b/build.gradle
@@ -261,6 +261,9 @@ project(':iceberg-flink') {
 
     testCompile("org.apache.flink:flink-test-utils_2.12") {
       exclude group: "org.apache.curator", module: 'curator-test'
     }
+
+    testCompile project(path: ':iceberg-api', configuration: 'testArtifacts')
+    testCompile project(path: ':iceberg-data', configuration: 'testArtifacts')
   }
 }
diff --git a/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java b/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java
index 07419d57df8d..3e4e7c779597 100644
--- a/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java
+++ b/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java
@@ -62,30 +62,31 @@ import org.apache.parquet.schema.Type;
 
 public class GenericParquetReaders {
-  private GenericParquetReaders() {
+  protected GenericParquetReaders() {
   }
 
-  public static ParquetValueReader<Record> buildReader(Schema expectedSchema,
-                                                       MessageType fileSchema) {
+  public static ParquetValueReader<Record> buildReader(Schema expectedSchema,
+                                                       MessageType fileSchema) {
     return buildReader(expectedSchema, fileSchema, ImmutableMap.of());
   }
 
-  public static ParquetValueReader<Record> buildReader(Schema expectedSchema,
-                                                       MessageType fileSchema,
-                                                       Map<Integer, ?> idToConstant) {
+  @SuppressWarnings("unchecked")
+  public static ParquetValueReader<Record> buildReader(Schema expectedSchema,
+                                                       MessageType fileSchema,
+                                                       Map<Integer, ?> idToConstant) {
     if (ParquetSchemaUtil.hasIds(fileSchema)) {
-      return (ParquetValueReader<Record>)
+      return (ParquetValueReader<Record>)
           TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
               new ReadBuilder(fileSchema, idToConstant));
     } else {
-      return (ParquetValueReader<Record>)
+      return (ParquetValueReader<Record>)
           TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
               new FallbackReadBuilder(fileSchema, idToConstant));
     }
   }
 
-  private static class FallbackReadBuilder extends ReadBuilder {
-    FallbackReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
+  protected static class FallbackReadBuilder extends ReadBuilder {
+    protected FallbackReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
       super(type, idToConstant);
     }
 
@@ -111,15 +112,15 @@ public ParquetValueReader<?> struct(StructType expected, GroupType struct,
         types.add(fieldType);
       }
 
-      return new RecordReader(types, newFields, expected);
+      return createStructReader(types, newFields, expected);
     }
   }
 
-  private static class ReadBuilder extends TypeWithSchemaVisitor<ParquetValueReader<?>> {
+  protected static class ReadBuilder extends TypeWithSchemaVisitor<ParquetValueReader<?>> {
     private final MessageType type;
     private final Map<Integer, ?> idToConstant;
 
-    ReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
+    protected ReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
       this.type = type;
       this.idToConstant = idToConstant;
     }
 
@@ -130,6 +131,12 @@ public ParquetValueReader<?> message(StructType expected, MessageType message,
       return struct(expected, message.asGroupType(), fieldReaders);
     }
 
+    protected StructReader<?, ?> createStructReader(List<Type> types,
+                                                    List<ParquetValueReader<?>> readers,
+                                                    StructType struct) {
+      return new RecordReader(types, readers, struct);
+    }
+
     @Override
     public ParquetValueReader<?> struct(StructType expected, GroupType struct,
                                         List<ParquetValueReader<?>> fieldReaders) {
@@ -168,7 +175,7 @@ public ParquetValueReader<?> struct(StructType expected, GroupType struct,
         }
       }
 
-      return new RecordReader(types, reorderedFields, expected);
+      return createStructReader(types, reorderedFields, expected);
     }
 
     @Override
@@ -416,7 +423,6 @@ protected Record newStructData(Record reuse) {
     }
 
     @Override
-    @SuppressWarnings("unchecked")
     protected Object getField(Record intermediate, int pos) {
       return intermediate.get(pos);
     }
diff --git a/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java b/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java
index 0c998eaa4e70..4cdb2dee1d21 100644
--- a/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java
+++ b/data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java
@@ -46,18 +46,18 @@ import org.apache.parquet.schema.Type;
 
 public class GenericParquetWriter {
-  private GenericParquetWriter() {
+  protected GenericParquetWriter() {
   }
 
   @SuppressWarnings("unchecked")
-  public static ParquetValueWriter<Record> buildWriter(MessageType type) {
-    return (ParquetValueWriter<Record>) ParquetTypeVisitor.visit(type, new WriteBuilder(type));
+  public static ParquetValueWriter<Record> buildWriter(MessageType type) {
+    return (ParquetValueWriter<Record>) ParquetTypeVisitor.visit(type, new WriteBuilder(type));
   }
 
-  private static class WriteBuilder extends ParquetTypeVisitor<ParquetValueWriter<?>> {
+  protected static class WriteBuilder extends ParquetTypeVisitor<ParquetValueWriter<?>> {
     private final MessageType type;
 
-    WriteBuilder(MessageType type) {
+    protected WriteBuilder(MessageType type) {
       this.type = type;
     }
 
@@ -67,6 +67,10 @@ public ParquetValueWriter<?> message(MessageType message,
       return struct(message.asGroupType(), fieldWriters);
     }
 
+    protected StructWriter<?> createStructWriter(List<ParquetValueWriter<?>> writers) {
+      return new RecordWriter(writers);
+    }
+
     @Override
     public ParquetValueWriter<?> struct(GroupType struct,
                                         List<ParquetValueWriter<?>> fieldWriters) {
@@ -78,7 +82,7 @@ public ParquetValueWriter<?> struct(GroupType struct,
         writers.add(ParquetValueWriters.option(fieldType, fieldD, fieldWriters.get(i)));
       }
 
-      return new RecordWriter(writers);
+      return createStructWriter(writers);
     }
 
     @Override
diff --git a/data/src/test/java/org/apache/iceberg/data/RandomGenericData.java b/data/src/test/java/org/apache/iceberg/data/RandomGenericData.java
index e97656ac8fc0..19179c0b1de4 100644
--- a/data/src/test/java/org/apache/iceberg/data/RandomGenericData.java
+++ b/data/src/test/java/org/apache/iceberg/data/RandomGenericData.java
@@ -46,7 +46,7 @@ public class RandomGenericData {
   private RandomGenericData() {}
 
   public static List<Record> generate(Schema schema, int numRecords, long seed) {
-    RandomDataGenerator generator = new RandomDataGenerator(seed);
+    RandomRecordGenerator generator = new RandomRecordGenerator(seed);
     List<Record> records = Lists.newArrayListWithExpectedSize(numRecords);
     for (int i = 0; i < numRecords; i += 1) {
       records.add((Record) TypeUtil.visit(schema, generator));
@@ -55,11 +55,9 @@ public static List<Record> generate(Schema schema, int numRecords, long seed) {
     return records;
   }
 
-  private static class RandomDataGenerator extends TypeUtil.CustomOrderSchemaVisitor<Object> {
-    private final Random random;
-
-    private RandomDataGenerator(long seed) {
-      this.random = new Random(seed);
+  private static class RandomRecordGenerator extends RandomDataGenerator<Record> {
+    private RandomRecordGenerator(long seed) {
+      super(seed);
     }
 
     @Override
@@ -78,6 +76,25 @@ public Record struct(Types.StructType struct, Iterable<Object> fieldResults) {
 
       return rec;
     }
+  }
+
+  public abstract static class RandomDataGenerator<T> extends TypeUtil.CustomOrderSchemaVisitor<Object> {
+    private final Random random;
+    private static final int MAX_ENTRIES = 20;
+
+    protected RandomDataGenerator(long seed) {
+      this.random = new Random(seed);
+    }
+
+    protected int getMaxEntries() {
+      return MAX_ENTRIES;
+    }
+
+    @Override
+    public abstract T schema(Schema schema, Supplier<Object> structResult);
+
+    @Override
+    public abstract T struct(Types.StructType struct, Iterable<Object> fieldResults);
 
     @Override
     public Object field(Types.NestedField field, Supplier<Object> fieldResult) {
@@ -90,7 +107,7 @@ public Object field(Types.NestedField field, Supplier<Object> fieldResult) {
 
     @Override
     public Object list(Types.ListType list, Supplier<Object> elementResult) {
-      int numElements = random.nextInt(20);
+      int numElements = random.nextInt(getMaxEntries());
       List<Object> result = Lists.newArrayListWithExpectedSize(numElements);
 
       for (int i = 0; i < numElements; i += 1) {
@@ -107,7 +124,7 @@ public Object list(Types.ListType list, Supplier<Object> elementResult) {
 
     @Override
     public Object map(Types.MapType map, Supplier<Object> keyResult, Supplier<Object> valueResult) {
-      int numEntries = random.nextInt(20);
+      int numEntries = random.nextInt(getMaxEntries());
       Map<Object, Object> result = Maps.newLinkedHashMap();
 
       Supplier<Object> keyFunc;
@@ -140,7 +157,7 @@ public Object map(Types.MapType map, Supplier<Object> keyResult, Supplier<Objec
       }
     }
 
-    private Object randomValue(Type.PrimitiveType primitive, Random random) {
+    protected Object randomValue(Type.PrimitiveType primitive, Random random) {
       return RandomUtil.generatePrimitive(primitive, random);
     }
   }
diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
new file mode 100644
--- /dev/null
+++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.data;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.parquet.ParquetSchemaUtil;
+import org.apache.iceberg.parquet.ParquetValueReader;
+import org.apache.iceberg.parquet.ParquetValueReaders;
+import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+
+public class FlinkParquetReaders extends GenericParquetReaders {
+  private FlinkParquetReaders() {
+  }
+
+  @SuppressWarnings("unchecked")
+  public static ParquetValueReader<Row> buildRowReader(Schema expectedSchema,
+                                                       MessageType fileSchema) {
+    if (ParquetSchemaUtil.hasIds(fileSchema)) {
+      return (ParquetValueReader<Row>)
+          TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
+              new ReadBuilder(fileSchema, ImmutableMap.of()));
+    } else {
+      return (ParquetValueReader<Row>)
+          TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
+              new FallbackReadBuilder(fileSchema, ImmutableMap.of()));
+    }
+  }
+
+  private static class FallbackReadBuilder extends GenericParquetReaders.FallbackReadBuilder {
+
+    private FallbackReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
+      super(type, idToConstant);
+    }
+
+    @Override
+    protected ParquetValueReaders.StructReader<?, ?> createStructReader(List<Type> types,
+                                                                        List<ParquetValueReader<?>> readers,
+                                                                        Types.StructType struct) {
+      return new RowReader(types, readers, struct);
+    }
+  }
+
+  private static class ReadBuilder extends GenericParquetReaders.ReadBuilder {
+
+    private ReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
+      super(type, idToConstant);
+    }
+
+    @Override
+    protected ParquetValueReaders.StructReader<?, ?> createStructReader(List<Type> types,
+                                                                        List<ParquetValueReader<?>> readers,
+                                                                        Types.StructType struct) {
+      return new RowReader(types, readers, struct);
+    }
+  }
+
+  static class RowReader extends ParquetValueReaders.StructReader<Row, Row> {
+    private final Types.StructType structType;
+
+    RowReader(List<Type> types, List<ParquetValueReader<?>> readers, Types.StructType struct) {
+      super(types, readers);
+      this.structType = struct;
+    }
+
+    @Override
+    protected Row newStructData(Row reuse) {
+      if (reuse != null) {
+        return reuse;
+      } else {
+        return new Row(structType.fields().size());
+      }
+    }
+
+    @Override
+    protected Object getField(Row row, int pos) {
+      return row.getField(pos);
+    }
+
+    @Override
+    protected Row buildStruct(Row row) {
+      return row;
+    }
+
+    @Override
+    protected void set(Row row, int pos, Object value) {
+      row.setField(pos, value);
+    }
+  }
+}
diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
new file mode 100644
index 000000000000..2e36b4ac624f
--- /dev/null
+++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.data;
+
+import java.util.List;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.parquet.ParquetTypeVisitor;
+import org.apache.iceberg.parquet.ParquetValueWriter;
+import org.apache.iceberg.parquet.ParquetValueWriters;
+import org.apache.parquet.schema.MessageType;
+
+public class FlinkParquetWriters extends GenericParquetWriter {
+  private FlinkParquetWriters() {
+  }
+
+  @SuppressWarnings("unchecked")
+  public static ParquetValueWriter<Row> buildRowWriter(MessageType type) {
+    return (ParquetValueWriter<Row>) ParquetTypeVisitor.visit(type, new WriteBuilder(type));
+  }
+
+  private static class WriteBuilder extends GenericParquetWriter.WriteBuilder {
+
+    private WriteBuilder(MessageType type) {
+      super(type);
+    }
+
+    @Override
+    protected ParquetValueWriters.StructWriter<?> createStructWriter(List<ParquetValueWriter<?>> writers) {
+      return new RowWriter(writers);
+    }
+  }
+
+  private static class RowWriter extends ParquetValueWriters.StructWriter<Row> {
+
+    private RowWriter(List<ParquetValueWriter<?>> writers) {
+      super(writers);
+    }
+
+    @Override
+    protected Object get(Row row, int index) {
+      return row.getField(index);
+    }
+  }
+}
diff --git a/flink/src/test/java/org/apache/iceberg/flink/data/RandomData.java b/flink/src/test/java/org/apache/iceberg/flink/data/RandomData.java
new file mode 100644
index 000000000000..cdf667bdf61b
--- /dev/null
+++ b/flink/src/test/java/org/apache/iceberg/flink/data/RandomData.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.data;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Random;
+import java.util.function.Supplier;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.RandomUtil;
+
+class RandomData {
+  private RandomData() {
+  }
+
+  private static Iterable<Row> generateData(Schema schema, int numRecords, Supplier<RandomRowGenerator> supplier) {
+    return () -> new Iterator<Row>() {
+      private final RandomRowGenerator generator = supplier.get();
+      private int count = 0;
+
+      @Override
+      public boolean hasNext() {
+        return count < numRecords;
+      }
+
+      @Override
+      public Row next() {
+        if (!hasNext()) {
+          throw new NoSuchElementException();
+        }
+        ++count;
+        return (Row) TypeUtil.visit(schema, generator);
+      }
+    };
+  }
+
+  static Iterable<Row> generate(Schema schema, int numRecords, long seed) {
+    return generateData(schema, numRecords, () -> new RandomRowGenerator(seed));
+  }
+
+  static Iterable<Row> generateFallbackData(Schema schema, int numRecords, long seed, long numDictRows) {
+    return generateData(schema, numRecords, () -> new FallbackGenerator(seed, numDictRows));
+  }
+
+  static Iterable<Row> generateDictionaryEncodableData(Schema schema, int numRecords, long seed) {
+    return generateData(schema, numRecords, () -> new DictionaryEncodedGenerator(seed));
+  }
+
+  private static class RandomRowGenerator extends RandomGenericData.RandomDataGenerator<Row> {
+
+    RandomRowGenerator(long seed) {
+      super(seed);
+    }
+
+    @Override
+    public Row schema(Schema schema, Supplier<Object> structResult) {
+      return (Row) structResult.get();
+    }
+
+    @Override
+    public Row struct(Types.StructType struct, Iterable<Object> fieldResults) {
+      Row row = new Row(struct.fields().size());
+
+      List<Object> values = Lists.newArrayList(fieldResults);
+      for (int i = 0; i < values.size(); i += 1) {
+        row.setField(i, values.get(i));
+      }
+
+      return row;
+    }
+  }
+
+  private static class DictionaryEncodedGenerator extends RandomRowGenerator {
+    DictionaryEncodedGenerator(long seed) {
+      super(seed);
+    }
+
+    @Override
+    protected int getMaxEntries() {
+      // Limit the max entries of a LIST or MAP to 3: RandomDataGenerator#map retries until its keys
+      // are distinct, but the dictionary-encodable generator can only produce three distinct values
+      // ("0", "1", "2"). Requesting more than 3 unique keys would put RandomDataGenerator#map into
+      // an infinite loop.
+      return 3;
+    }
+
+    @Override
+    protected Object randomValue(Type.PrimitiveType primitive, Random random) {
+      return RandomUtil.generateDictionaryEncodablePrimitive(primitive, random);
+    }
+  }
+
+  private static class FallbackGenerator extends RandomRowGenerator {
+    private final long dictionaryEncodedRows;
+    private long rowCount = 0;
+
+    FallbackGenerator(long seed, long numDictionaryEncoded) {
+      super(seed);
+      this.dictionaryEncodedRows = numDictionaryEncoded;
+    }
+
+    @Override
+    protected Object randomValue(Type.PrimitiveType primitive, Random rand) {
+      this.rowCount += 1;
+      if (rowCount > dictionaryEncodedRows) {
+        return RandomUtil.generatePrimitive(primitive, rand);
+      } else {
+        return RandomUtil.generateDictionaryEncodablePrimitive(primitive, rand);
+      }
+    }
+  }
+}
diff --git a/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReaderWriter.java b/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReaderWriter.java
new file mode 100644
index 000000000000..f8bf6a53e514
--- /dev/null
+++ b/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReaderWriter.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.data;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+public class TestFlinkParquetReaderWriter {
+  private static final int NUM_RECORDS = 20_000;
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  private static final Schema COMPLEX_SCHEMA = new Schema(
+      required(1, "roots", Types.LongType.get()),
+      optional(3, "lime", Types.ListType.ofRequired(4, Types.DoubleType.get())),
+      required(5, "strict", Types.StructType.of(
+          required(9, "tangerine", Types.StringType.get()),
+          optional(6, "hopeful", Types.StructType.of(
+              required(7, "steel", Types.FloatType.get()),
+              required(8, "lantern", Types.DateType.get())
+          )),
+          optional(10, "vehement", Types.LongType.get())
+      )),
+      optional(11, "metamorphosis", Types.MapType.ofRequired(12, 13,
+          Types.StringType.get(), Types.TimestampType.withZone())),
+      required(14, "winter", Types.ListType.ofOptional(15, Types.StructType.of(
+          optional(16, "beet", Types.DoubleType.get()),
+          required(17, "stamp", Types.FloatType.get()),
+          optional(18, "wheeze", Types.StringType.get())
+      ))),
+      optional(19, "renovate", Types.MapType.ofRequired(20, 21,
+          Types.StringType.get(), Types.StructType.of(
+              optional(22, "jumpy", Types.DoubleType.get()),
+              required(23, "koala", Types.IntegerType.get()),
+              required(24, "couch rope", Types.IntegerType.get())
+          ))),
+      optional(2, "slide", Types.StringType.get())
+  );
+
+  private void testCorrectness(Schema schema, int numRecords, Iterable<Row> iterable) throws IOException {
+    File testFile = temp.newFile();
+    Assert.assertTrue("Delete should succeed", testFile.delete());
+
+    try (FileAppender<Row> writer = Parquet.write(Files.localOutput(testFile))
+        .schema(schema)
+        .createWriterFunc(FlinkParquetWriters::buildRowWriter)
+        .build()) {
+      writer.addAll(iterable);
+    }
+
+    try (CloseableIterable<Row> reader = Parquet.read(Files.localInput(testFile))
+        .project(schema)
+        .createReaderFunc(type -> FlinkParquetReaders.buildRowReader(schema, type))
+        .build()) {
+      Iterator<Row> expected = iterable.iterator();
+      Iterator<Row> rows = reader.iterator();
+      for (int i = 0; i < numRecords; i += 1) {
+        Assert.assertTrue("Should have expected number of rows", rows.hasNext());
+        Assert.assertEquals(expected.next(), rows.next());
+      }
+      Assert.assertFalse("Should not have extra rows", rows.hasNext());
+    }
+  }
+
+  @Test
+  public void testNormalRowData() throws IOException {
+    testCorrectness(COMPLEX_SCHEMA, NUM_RECORDS, RandomData.generate(COMPLEX_SCHEMA, NUM_RECORDS, 19981));
+  }
+
+  @Test
+  public void testDictionaryEncodedData() throws IOException {
+    testCorrectness(COMPLEX_SCHEMA, NUM_RECORDS,
+        RandomData.generateDictionaryEncodableData(COMPLEX_SCHEMA, NUM_RECORDS, 21124));
+  }
+
+  @Test
+  public void testFallbackData() throws IOException {
+    testCorrectness(COMPLEX_SCHEMA, NUM_RECORDS,
+        RandomData.generateFallbackData(COMPLEX_SCHEMA, NUM_RECORDS, 21124, NUM_RECORDS / 20));
+  }
+}
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/RandomData.java b/spark/src/test/java/org/apache/iceberg/spark/data/RandomData.java
index f99c0fccb89c..0336759ca7e4 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/RandomData.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/RandomData.java
@@ -20,9 +20,7 @@
 package org.apache.iceberg.spark.data;
 
 import java.math.BigDecimal;
-import java.math.BigInteger;
 import java.nio.ByteBuffer;
-import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -331,42 +329,6 @@ public Object primitive(Type.PrimitiveType primitive) {
     }
   }
 
-  private static Object generateDictionaryEncodablePrimitive(Type.PrimitiveType primitive, Random random) {
-    int value = random.nextInt(3);
-    switch (primitive.typeId()) {
-      case BOOLEAN:
-        return true; // doesn't really matter for booleans since they are not dictionary encoded
-      case INTEGER:
-      case DATE:
-        return value;
-      case FLOAT:
-        return (float) value;
-      case DOUBLE:
-        return (double) value;
-      case LONG:
-      case TIME:
-      case TIMESTAMP:
-        return (long) value;
-      case STRING:
-        return String.valueOf(value);
-      case FIXED:
-        byte[] fixed = new byte[((Types.FixedType) primitive).length()];
-        Arrays.fill(fixed, (byte) value);
-        return fixed;
-      case BINARY:
-        byte[] binary = new byte[value + 1];
-        Arrays.fill(binary, (byte) value);
-        return binary;
-      case DECIMAL:
-        Types.DecimalType type = (Types.DecimalType) primitive;
-        BigInteger unscaled = new BigInteger(String.valueOf(value + 1));
-        return new BigDecimal(unscaled, type.scale());
-      default:
-        throw new IllegalArgumentException(
-            "Cannot generate random value for unknown type: " + primitive);
-    }
-  }
-
   private static class DictionaryEncodedDataGenerator extends RandomDataGenerator {
     private DictionaryEncodedDataGenerator(Schema schema, long seed, float nullPercentage) {
       super(schema, seed, nullPercentage);
@@ -374,7 +336,7 @@ private DictionaryEncodedDataGenerator(Schema schema, long seed, float nullPerce
 
     @Override
    protected Object randomValue(Type.PrimitiveType primitive, Random random) {
-      return generateDictionaryEncodablePrimitive(primitive, random);
+      return RandomUtil.generateDictionaryEncodablePrimitive(primitive, random);
     }
   }
 
@@ -393,7 +355,7 @@ protected Object randomValue(Type.PrimitiveType primitive, Random rand) {
       if (rowCount > dictionaryEncodedRows) {
         return RandomUtil.generatePrimitive(primitive, rand);
       } else {
-        return generateDictionaryEncodablePrimitive(primitive, rand);
+        return RandomUtil.generateDictionaryEncodablePrimitive(primitive, rand);
       }
     }
   }
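
For orientation, a minimal usage sketch of the two entry points this patch adds, modeled on testCorrectness in TestFlinkParquetReaderWriter above. The two-field schema, the temp-file handling, and the sample row built with Row.of are illustrative assumptions, not part of the patch:

    // Hypothetical round trip: write Flink Rows into an Iceberg Parquet file, read them back.
    Schema schema = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.optional(2, "data", Types.StringType.get()));
    File testFile = File.createTempFile("flink-rows", ".parquet");
    Assert.assertTrue(testFile.delete()); // the writer expects the target file not to exist yet

    try (FileAppender<Row> writer = Parquet.write(Files.localOutput(testFile))
        .schema(schema)
        .createWriterFunc(FlinkParquetWriters::buildRowWriter) // Row -> Parquet
        .build()) {
      writer.add(Row.of(1L, "a"));
    }

    try (CloseableIterable<Row> rows = Parquet.read(Files.localInput(testFile))
        .project(schema)
        .createReaderFunc(fileType -> FlinkParquetReaders.buildRowReader(schema, fileType)) // Parquet -> Row
        .build()) {
      rows.forEach(System.out::println);
    }

The reader picks ReadBuilder or FallbackReadBuilder depending on whether the file schema carries Iceberg field IDs, which is why buildRowReader needs both the expected schema and the file's MessageType.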