37 changes: 37 additions & 0 deletions api/src/test/java/org/apache/iceberg/util/RandomUtil.java
@@ -21,6 +21,7 @@

import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Arrays;
import java.util.Random;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
@@ -147,6 +148,42 @@ public static Object generatePrimitive(Type.PrimitiveType primitive,
}
}

public static Object generateDictionaryEncodablePrimitive(Type.PrimitiveType primitive, Random random) {
int value = random.nextInt(3);
switch (primitive.typeId()) {
case BOOLEAN:
return true; // doesn't really matter for booleans since they are not dictionary encoded
case INTEGER:
case DATE:
return value;
case FLOAT:
return (float) value;
case DOUBLE:
return (double) value;
case LONG:
case TIME:
case TIMESTAMP:
return (long) value;
case STRING:
return String.valueOf(value);
case FIXED:
byte[] fixed = new byte[((Types.FixedType) primitive).length()];
Arrays.fill(fixed, (byte) value);
return fixed;
case BINARY:
byte[] binary = new byte[value + 1];
Arrays.fill(binary, (byte) value);
return binary;
case DECIMAL:
Types.DecimalType type = (Types.DecimalType) primitive;
BigInteger unscaled = new BigInteger(String.valueOf(value + 1));
return new BigDecimal(unscaled, type.scale());
default:
throw new IllegalArgumentException(
"Cannot generate random value for unknown type: " + primitive);
}
}

private static final long FIFTY_YEARS_IN_MICROS =
(50L * (365 * 3 + 366) * 24 * 60 * 60 * 1_000_000) / 4;
private static final int ABOUT_380_YEARS_IN_DAYS = 380 * 365;
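For context, a minimal sketch of how the new helper behaves (DictionaryEncodableDemo is a hypothetical demo class, not part of this PR): every generated value is drawn from a three-element space, so columns built from it stay low-cardinality and Parquet will typically dictionary-encode them.

import java.util.Random;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.RandomUtil;

public class DictionaryEncodableDemo {
  public static void main(String[] args) {
    Random random = new Random(42L);
    for (int i = 0; i < 5; i += 1) {
      // prints strings from {"0", "1", "2"}; at most three distinct values per column
      System.out.println(
          RandomUtil.generateDictionaryEncodablePrimitive(Types.StringType.get(), random));
    }
  }
}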
3 changes: 3 additions & 0 deletions build.gradle
@@ -261,6 +261,9 @@ project(':iceberg-flink') {
testCompile("org.apache.flink:flink-test-utils_2.12") {
exclude group: "org.apache.curator", module: 'curator-test'
}

testCompile project(path: ':iceberg-api', configuration: 'testArtifacts')
testCompile project(path: ':iceberg-data', configuration: 'testArtifacts')
}
}

data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java
@@ -62,30 +62,31 @@
import org.apache.parquet.schema.Type;

public class GenericParquetReaders {
- private GenericParquetReaders() {
+ protected GenericParquetReaders() {
}

- public static ParquetValueReader<GenericRecord> buildReader(Schema expectedSchema,
- MessageType fileSchema) {
+ public static ParquetValueReader<Record> buildReader(Schema expectedSchema,
+ MessageType fileSchema) {
return buildReader(expectedSchema, fileSchema, ImmutableMap.of());
}

@SuppressWarnings("unchecked")
- public static ParquetValueReader<GenericRecord> buildReader(Schema expectedSchema,
- MessageType fileSchema,
- Map<Integer, ?> idToConstant) {
+ public static ParquetValueReader<Record> buildReader(Schema expectedSchema,
+ MessageType fileSchema,
+ Map<Integer, ?> idToConstant) {
if (ParquetSchemaUtil.hasIds(fileSchema)) {
- return (ParquetValueReader<GenericRecord>)
+ return (ParquetValueReader<Record>)
TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
new ReadBuilder(fileSchema, idToConstant));
} else {
- return (ParquetValueReader<GenericRecord>)
+ return (ParquetValueReader<Record>)
TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
new FallbackReadBuilder(fileSchema, idToConstant));
}
}

- private static class FallbackReadBuilder extends ReadBuilder {
- FallbackReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
+ protected static class FallbackReadBuilder extends ReadBuilder {
+ protected FallbackReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
super(type, idToConstant);
}

@@ -111,15 +112,15 @@ public ParquetValueReader<?> struct(StructType expected, GroupType struct,
types.add(fieldType);
}

- return new RecordReader(types, newFields, expected);
+ return createStructReader(types, newFields, expected);
}
}

- private static class ReadBuilder extends TypeWithSchemaVisitor<ParquetValueReader<?>> {
+ protected static class ReadBuilder extends TypeWithSchemaVisitor<ParquetValueReader<?>> {
private final MessageType type;
private final Map<Integer, ?> idToConstant;

- ReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
+ protected ReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
this.type = type;
this.idToConstant = idToConstant;
}
@@ -130,6 +131,12 @@ public ParquetValueReader<?> message(StructType expected, MessageType message,
return struct(expected, message.asGroupType(), fieldReaders);
}

+ protected StructReader<?, ?> createStructReader(List<Type> types,
+ List<ParquetValueReader<?>> readers,
+ StructType struct) {
+ return new RecordReader(types, readers, struct);
+ }

@Override
public ParquetValueReader<?> struct(StructType expected, GroupType struct,
List<ParquetValueReader<?>> fieldReaders) {
@@ -168,7 +175,7 @@ public ParquetValueReader<?> struct(StructType expected, GroupType struct,
}
}

- return new RecordReader(types, reorderedFields, expected);
+ return createStructReader(types, reorderedFields, expected);
}

@Override
@@ -416,7 +423,6 @@ protected Record newStructData(Record reuse) {
}

@Override
- @SuppressWarnings("unchecked")
protected Object getField(Record intermediate, int pos) {
return intermediate.get(pos);
}
data/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java
@@ -46,18 +46,18 @@
import org.apache.parquet.schema.Type;

public class GenericParquetWriter {
- private GenericParquetWriter() {
+ protected GenericParquetWriter() {
}

@SuppressWarnings("unchecked")
- public static <T> ParquetValueWriter<T> buildWriter(MessageType type) {
- return (ParquetValueWriter<T>) ParquetTypeVisitor.visit(type, new WriteBuilder(type));
+ public static ParquetValueWriter<Record> buildWriter(MessageType type) {
+ return (ParquetValueWriter<Record>) ParquetTypeVisitor.visit(type, new WriteBuilder(type));
}

- private static class WriteBuilder extends ParquetTypeVisitor<ParquetValueWriter<?>> {
+ protected static class WriteBuilder extends ParquetTypeVisitor<ParquetValueWriter<?>> {
private final MessageType type;

- WriteBuilder(MessageType type) {
+ protected WriteBuilder(MessageType type) {
this.type = type;
}

@@ -67,6 +67,10 @@ public ParquetValueWriter<?> message(MessageType message,
return struct(message.asGroupType(), fieldWriters);
}

+ protected StructWriter<?> createStructWriter(List<ParquetValueWriter<?>> writers) {
+ return new RecordWriter(writers);
+ }

@Override
public ParquetValueWriter<?> struct(GroupType struct,
List<ParquetValueWriter<?>> fieldWriters) {
@@ -78,7 +82,7 @@ public ParquetValueWriter<?> struct(GroupType struct,
writers.add(ParquetValueWriters.option(fieldType, fieldD, fieldWriters.get(i)));
}

- return new RecordWriter(writers);
+ return createStructWriter(writers);
}

@Override
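By analogy with the reader changes above, a subclass can now swap in its own struct writer through the new createStructWriter hook. A minimal sketch, assuming the classes in this diff (CustomParquetWriter is hypothetical; a Flink counterpart would return a Row-based writer here instead of delegating):

import java.util.List;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.parquet.ParquetValueWriter;
import org.apache.iceberg.parquet.ParquetValueWriters.StructWriter;
import org.apache.parquet.schema.MessageType;

public class CustomParquetWriter extends GenericParquetWriter {
  private static class CustomWriteBuilder extends WriteBuilder {
    CustomWriteBuilder(MessageType type) {
      super(type);
    }

    @Override
    protected StructWriter<?> createStructWriter(List<ParquetValueWriter<?>> writers) {
      // substitute a writer for another in-memory row format here; delegating
      // to the superclass keeps the generic RecordWriter behavior
      return super.createStructWriter(writers);
    }
  }
}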
39 changes: 30 additions & 9 deletions data/src/test/java/org/apache/iceberg/data/RandomGenericData.java
@@ -46,7 +46,7 @@ public class RandomGenericData {
private RandomGenericData() {}

public static List<Record> generate(Schema schema, int numRecords, long seed) {
- RandomDataGenerator generator = new RandomDataGenerator(seed);
+ RandomRecordGenerator generator = new RandomRecordGenerator(seed);
List<Record> records = Lists.newArrayListWithExpectedSize(numRecords);
for (int i = 0; i < numRecords; i += 1) {
records.add((Record) TypeUtil.visit(schema, generator));
@@ -55,11 +55,9 @@ public static List<Record> generate(Schema schema, int numRecords, long seed) {
return records;
}

- private static class RandomDataGenerator extends TypeUtil.CustomOrderSchemaVisitor<Object> {
- private final Random random;

- private RandomDataGenerator(long seed) {
- this.random = new Random(seed);
+ private static class RandomRecordGenerator extends RandomDataGenerator<Record> {
+ private RandomRecordGenerator(long seed) {
+ super(seed);
}

@Override
@@ -78,6 +76,25 @@ public Record struct(Types.StructType struct, Iterable<Object> fieldResults) {

return rec;
}
}

+ public abstract static class RandomDataGenerator<T> extends TypeUtil.CustomOrderSchemaVisitor<Object> {
+ private final Random random;
+ private static final int MAX_ENTRIES = 20;

+ protected RandomDataGenerator(long seed) {
+ this.random = new Random(seed);
+ }

+ protected int getMaxEntries() {
+ return MAX_ENTRIES;
+ }

+ @Override
+ public abstract T schema(Schema schema, Supplier<Object> structResult);

+ @Override
+ public abstract T struct(Types.StructType struct, Iterable<Object> fieldResults);

@Override
public Object field(Types.NestedField field, Supplier<Object> fieldResult) {
@@ -90,7 +107,7 @@ public Object field(Types.NestedField field, Supplier<Object> fieldResult) {

@Override
public Object list(Types.ListType list, Supplier<Object> elementResult) {
- int numElements = random.nextInt(20);
+ int numElements = random.nextInt(getMaxEntries());

List<Object> result = Lists.newArrayListWithExpectedSize(numElements);
for (int i = 0; i < numElements; i += 1) {
@@ -107,7 +124,7 @@ public Object list(Types.ListType list, Supplier<Object> elementResult) {

@Override
public Object map(Types.MapType map, Supplier<Object> keyResult, Supplier<Object> valueResult) {
- int numEntries = random.nextInt(20);
+ int numEntries = random.nextInt(getMaxEntries());

Map<Object, Object> result = Maps.newLinkedHashMap();
Supplier<Object> keyFunc;
@@ -140,7 +157,7 @@ public Object map(Types.MapType map, Supplier<Object> keyResult, Supplier<Object

@Override
public Object primitive(Type.PrimitiveType primitive) {
- Object result = RandomUtil.generatePrimitive(primitive, random);
+ Object result = randomValue(primitive, random);
switch (primitive.typeId()) {
case BINARY:
return ByteBuffer.wrap((byte[]) result);
@@ -161,6 +178,10 @@ public Object primitive(Type.PrimitiveType primitive) {
return result;
}
}

+ protected Object randomValue(Type.PrimitiveType primitive, Random rand) {
+ return RandomUtil.generatePrimitive(primitive, rand);
+ }
}

private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
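With RandomDataGenerator now public and abstract, other row models can reuse the traversal and override the randomValue hook. A minimal sketch, assuming the classes above (RandomRowGenerator is hypothetical, a Flink-flavored generator wired to the new RandomUtil helper):

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.function.Supplier;
import org.apache.flink.types.Row;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.RandomUtil;

class RandomRowGenerator extends RandomGenericData.RandomDataGenerator<Row> {
  RandomRowGenerator(long seed) {
    super(seed);
  }

  @Override
  public Row schema(Schema schema, Supplier<Object> structResult) {
    return (Row) structResult.get();
  }

  @Override
  public Row struct(Types.StructType struct, Iterable<Object> fieldResults) {
    // collect generated field values into a Flink Row instead of a generic Record
    List<Object> values = new ArrayList<>();
    fieldResults.forEach(values::add);
    Row row = new Row(values.size());
    for (int i = 0; i < values.size(); i += 1) {
      row.setField(i, values.get(i));
    }
    return row;
  }

  @Override
  protected Object randomValue(Type.PrimitiveType primitive, Random rand) {
    // draw from the small dictionary-encodable value space instead of the full range
    return RandomUtil.generateDictionaryEncodablePrimitive(primitive, rand);
  }
}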
114 changes: 114 additions & 0 deletions flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.flink.data;

import java.util.List;
import java.util.Map;
import org.apache.flink.types.Row;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.iceberg.parquet.ParquetValueReader;
import org.apache.iceberg.parquet.ParquetValueReaders;
import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Types;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;

public class FlinkParquetReaders extends GenericParquetReaders {
private FlinkParquetReaders() {
}

@SuppressWarnings("unchecked")
public static ParquetValueReader<Row> buildRowReader(Schema expectedSchema,
Member Author: I changed buildReader to buildRowReader because the parent buildReader returns a ParquetValueReader<Record>, which clashes with the ParquetValueReader<Row> that a buildReader of the same signature in FlinkParquetReaders would return. (A minimal illustration of this clash follows the method below.)
MessageType fileSchema) {
if (ParquetSchemaUtil.hasIds(fileSchema)) {
return (ParquetValueReader<Row>)
TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
new ReadBuilder(fileSchema, ImmutableMap.of()));
} else {
return (ParquetValueReader<Row>)
TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
new FallbackReadBuilder(fileSchema, ImmutableMap.of()));
}
}
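The rename sidesteps Java's method-hiding rule: a static method may not hide a superclass method of the same signature with an incompatible return type. A minimal standalone sketch (hypothetical Parent/Child names, not from this PR):

import java.util.Collections;
import java.util.List;

class Parent {
  static List<Integer> build(String name) {
    return Collections.emptyList();
  }
}

class Child extends Parent {
  // Uncommenting this does not compile: "build(String) in Child clashes with
  // build(String) in Parent; attempting to use incompatible return type".
  // Renaming the method, as with buildRowReader above, avoids the clash.
  // static List<String> build(String name) {
  //   return Collections.emptyList();
  // }
}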

private static class FallbackReadBuilder extends GenericParquetReaders.FallbackReadBuilder {

private FallbackReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
super(type, idToConstant);
}

@Override
protected ParquetValueReaders.StructReader<?, ?> createStructReader(List<Type> types,
List<ParquetValueReader<?>> readers,
Types.StructType struct) {
return new RowReader(types, readers, struct);
}
}

private static class ReadBuilder extends GenericParquetReaders.ReadBuilder {

private ReadBuilder(MessageType type, Map<Integer, ?> idToConstant) {
super(type, idToConstant);
}

@Override
protected ParquetValueReaders.StructReader<Row, Row> createStructReader(List<Type> types,
List<ParquetValueReader<?>> readers,
Types.StructType struct) {
return new RowReader(types, readers, struct);
}
}

static class RowReader extends ParquetValueReaders.StructReader<Row, Row> {
private final Types.StructType structType;

RowReader(List<Type> types, List<ParquetValueReader<?>> readers, Types.StructType struct) {
super(types, readers);
this.structType = struct;
}

@Override
protected Row newStructData(Row reuse) {
if (reuse != null) {
return reuse;
} else {
return new Row(structType.fields().size());
}
}

@Override
protected Object getField(Row row, int pos) {
return row.getField(pos);
}

@Override
protected Row buildStruct(Row row) {
return row;
}

@Override
protected void set(Row row, int pos, Object value) {
row.setField(pos, value);
}
}
}
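For context, a minimal sketch of wiring the new reader into Iceberg's Parquet read path (the input file and projection schema are hypothetical; Parquet.read with createReaderFunc is the usual iceberg-parquet entry point):

import org.apache.flink.types.Row;
import org.apache.iceberg.Schema;
import org.apache.iceberg.flink.data.FlinkParquetReaders;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.parquet.Parquet;

public class FlinkParquetReadDemo {
  public static CloseableIterable<Row> readRows(InputFile file, Schema projection) {
    return Parquet.read(file)
        .project(projection)
        // buildRowReader adapts the file schema to the projected Iceberg schema
        .createReaderFunc(fileSchema -> FlinkParquetReaders.buildRowReader(projection, fileSchema))
        .build();
  }
}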