Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;

public class FlinkParquetWriters {
class FlinkParquetWriters {
private FlinkParquetWriters() {
}

Expand Down Expand Up @@ -122,7 +122,7 @@ private ParquetValueWriter<?> newOption(org.apache.parquet.schema.Type fieldType
}

@Override
public ParquetValueWriter<?> primitive(LogicalType sType, PrimitiveType primitive) {
public ParquetValueWriter<?> primitive(LogicalType fType, PrimitiveType primitive) {
ColumnDescriptor desc = type.getColumnDescription(currentPath());

if (primitive.getOriginalType() != null) {
Expand All @@ -135,7 +135,7 @@ public ParquetValueWriter<?> primitive(LogicalType sType, PrimitiveType primitiv
case INT_8:
case INT_16:
case INT_32:
return ints(sType, desc);
return ints(fType, desc);
case INT_64:
return ParquetValueWriters.longs(desc);
case TIME_MICROS:
Expand Down Expand Up @@ -171,7 +171,7 @@ public ParquetValueWriter<?> primitive(LogicalType sType, PrimitiveType primitiv
case BOOLEAN:
return ParquetValueWriters.booleans(desc);
case INT32:
return ints(sType, desc);
return ints(fType, desc);
case INT64:
return ParquetValueWriters.longs(desc);
case FLOAT:
Expand Down Expand Up @@ -203,10 +203,14 @@ private static ParquetValueWriters.PrimitiveWriter<Integer> timeMicros(ColumnDes

private static ParquetValueWriters.PrimitiveWriter<DecimalData> decimalAsInteger(ColumnDescriptor desc,
int precision, int scale) {
Preconditions.checkArgument(precision <= 9, "Cannot write decimal value as integer with precision larger than 9," +
" wrong precision %s", precision);
return new IntegerDecimalWriter(desc, precision, scale);
}
private static ParquetValueWriters.PrimitiveWriter<DecimalData> decimalAsLong(ColumnDescriptor desc,
int precision, int scale) {
Preconditions.checkArgument(precision <= 18, "Cannot write decimal value as long with precision larger than 18, " +
" wrong precision %s", precision);
return new LongDecimalWriter(desc, precision, scale);
}

Expand Down Expand Up @@ -241,7 +245,7 @@ private TimeMicrosWriter(ColumnDescriptor desc) {

@Override
public void write(int repetitionLevel, Integer value) {
long micros = Long.valueOf(value) * 1000;
long micros = value.longValue() * 1000;
column.writeLong(repetitionLevel, micros);
}
}
Expand Down Expand Up @@ -346,11 +350,13 @@ protected Iterator<E> elements(ArrayData list) {
private class ElementIterator<E> implements Iterator<E> {
private final int size;
private final ArrayData list;
private final ArrayData.ElementGetter getter;
private int index;

private ElementIterator(ArrayData list) {
this.list = list;
size = list.size();
getter = ArrayData.createElementGetter(elementType);
index = 0;
}

Expand All @@ -366,13 +372,7 @@ public E next() {
throw new NoSuchElementException();
}

E element;
if (list.isNullAt(index)) {
element = null;
} else {
element = (E) ArrayData.createElementGetter(elementType).getElementOrNull(list, index);
}

E element = (E) getter.getElementOrNull(list, index);
index += 1;

return element;
Expand Down Expand Up @@ -402,13 +402,17 @@ private class EntryIterator<K, V> implements Iterator<Map.Entry<K, V>> {
private final ArrayData keys;
private final ArrayData values;
private final ParquetValueReaders.ReusableEntry<K, V> entry;
private final ArrayData.ElementGetter keyGetter;
private final ArrayData.ElementGetter valueGetter;
private int index;

private EntryIterator(MapData map) {
size = map.size();
keys = map.keyArray();
values = map.valueArray();
entry = new ParquetValueReaders.ReusableEntry<>();
keyGetter = ArrayData.createElementGetter(keyType);
valueGetter = ArrayData.createElementGetter(valueType);
index = 0;
}

Expand All @@ -424,13 +428,7 @@ public Map.Entry<K, V> next() {
throw new NoSuchElementException();
}

if (values.isNullAt(index)) {
entry.set((K) ArrayData.createElementGetter(keyType).getElementOrNull(keys, index), null);
} else {
entry.set((K) ArrayData.createElementGetter(keyType).getElementOrNull(keys, index),
(V) ArrayData.createElementGetter(valueType).getElementOrNull(values, index));
}

entry.set((K) keyGetter.getElementOrNull(keys, index), (V) valueGetter.getElementOrNull(values, index));
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ArrayData interface only defines getElementOrNull.

index += 1;

return entry;
Expand All @@ -439,16 +437,19 @@ public Map.Entry<K, V> next() {
}

private static class RowDataWriter extends ParquetValueWriters.StructWriter<RowData> {
private final List<LogicalType> types;
private final RowData.FieldGetter[] fieldGetter;

RowDataWriter(List<ParquetValueWriter<?>> writers, List<LogicalType> types) {
super(writers);
this.types = types;
fieldGetter = new RowData.FieldGetter[types.size()];
for (int i = 0; i < types.size(); i += 1) {
fieldGetter[i] = RowData.createFieldGetter(types.get(i), i);
}
}

@Override
protected Object get(RowData struct, int index) {
return RowData.createFieldGetter(types.get(index), index).getFieldOrNull(struct);
return fieldGetter[index].getFieldOrNull(struct);
}
}
}