diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java index 75104cacd9a5..1639a44b2917 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java +++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java @@ -51,7 +51,7 @@ import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Type; -public class FlinkParquetWriters { +class FlinkParquetWriters { private FlinkParquetWriters() { } @@ -122,7 +122,7 @@ private ParquetValueWriter newOption(org.apache.parquet.schema.Type fieldType } @Override - public ParquetValueWriter primitive(LogicalType sType, PrimitiveType primitive) { + public ParquetValueWriter primitive(LogicalType fType, PrimitiveType primitive) { ColumnDescriptor desc = type.getColumnDescription(currentPath()); if (primitive.getOriginalType() != null) { @@ -135,7 +135,7 @@ public ParquetValueWriter primitive(LogicalType sType, PrimitiveType primitiv case INT_8: case INT_16: case INT_32: - return ints(sType, desc); + return ints(fType, desc); case INT_64: return ParquetValueWriters.longs(desc); case TIME_MICROS: @@ -171,7 +171,7 @@ public ParquetValueWriter primitive(LogicalType sType, PrimitiveType primitiv case BOOLEAN: return ParquetValueWriters.booleans(desc); case INT32: - return ints(sType, desc); + return ints(fType, desc); case INT64: return ParquetValueWriters.longs(desc); case FLOAT: @@ -203,10 +203,14 @@ private static ParquetValueWriters.PrimitiveWriter timeMicros(ColumnDes private static ParquetValueWriters.PrimitiveWriter decimalAsInteger(ColumnDescriptor desc, int precision, int scale) { + Preconditions.checkArgument(precision <= 9, "Cannot write decimal value as integer with precision larger than 9," + + " wrong precision %s", precision); return new IntegerDecimalWriter(desc, precision, scale); } private static ParquetValueWriters.PrimitiveWriter decimalAsLong(ColumnDescriptor desc, int precision, int scale) { + Preconditions.checkArgument(precision <= 18, "Cannot write decimal value as long with precision larger than 18, " + + " wrong precision %s", precision); return new LongDecimalWriter(desc, precision, scale); } @@ -241,7 +245,7 @@ private TimeMicrosWriter(ColumnDescriptor desc) { @Override public void write(int repetitionLevel, Integer value) { - long micros = Long.valueOf(value) * 1000; + long micros = value.longValue() * 1000; column.writeLong(repetitionLevel, micros); } } @@ -346,11 +350,13 @@ protected Iterator elements(ArrayData list) { private class ElementIterator implements Iterator { private final int size; private final ArrayData list; + private final ArrayData.ElementGetter getter; private int index; private ElementIterator(ArrayData list) { this.list = list; size = list.size(); + getter = ArrayData.createElementGetter(elementType); index = 0; } @@ -366,13 +372,7 @@ public E next() { throw new NoSuchElementException(); } - E element; - if (list.isNullAt(index)) { - element = null; - } else { - element = (E) ArrayData.createElementGetter(elementType).getElementOrNull(list, index); - } - + E element = (E) getter.getElementOrNull(list, index); index += 1; return element; @@ -402,6 +402,8 @@ private class EntryIterator implements Iterator> { private final ArrayData keys; private final ArrayData values; private final ParquetValueReaders.ReusableEntry entry; + private final ArrayData.ElementGetter keyGetter; + private final ArrayData.ElementGetter valueGetter; private int index; private EntryIterator(MapData map) { @@ -409,6 +411,8 @@ private EntryIterator(MapData map) { keys = map.keyArray(); values = map.valueArray(); entry = new ParquetValueReaders.ReusableEntry<>(); + keyGetter = ArrayData.createElementGetter(keyType); + valueGetter = ArrayData.createElementGetter(valueType); index = 0; } @@ -424,13 +428,7 @@ public Map.Entry next() { throw new NoSuchElementException(); } - if (values.isNullAt(index)) { - entry.set((K) ArrayData.createElementGetter(keyType).getElementOrNull(keys, index), null); - } else { - entry.set((K) ArrayData.createElementGetter(keyType).getElementOrNull(keys, index), - (V) ArrayData.createElementGetter(valueType).getElementOrNull(values, index)); - } - + entry.set((K) keyGetter.getElementOrNull(keys, index), (V) valueGetter.getElementOrNull(values, index)); index += 1; return entry; @@ -439,16 +437,19 @@ public Map.Entry next() { } private static class RowDataWriter extends ParquetValueWriters.StructWriter { - private final List types; + private final RowData.FieldGetter[] fieldGetter; RowDataWriter(List> writers, List types) { super(writers); - this.types = types; + fieldGetter = new RowData.FieldGetter[types.size()]; + for (int i = 0; i < types.size(); i += 1) { + fieldGetter[i] = RowData.createFieldGetter(types.get(i), i); + } } @Override protected Object get(RowData struct, int index) { - return RowData.createFieldGetter(types.get(index), index).getFieldOrNull(struct); + return fieldGetter[index].getFieldOrNull(struct); } } }