@@ -19,11 +19,13 @@
package org.apache.hudi.table.format.cow;

import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.table.format.cow.vector.HeapArrayGroupColumnVector;
import org.apache.hudi.table.format.cow.vector.HeapArrayVector;
import org.apache.hudi.table.format.cow.vector.HeapDecimalVector;
import org.apache.hudi.table.format.cow.vector.HeapMapColumnVector;
import org.apache.hudi.table.format.cow.vector.HeapRowColumnVector;
import org.apache.hudi.table.format.cow.vector.reader.ArrayColumnReader;
import org.apache.hudi.table.format.cow.vector.reader.ArrayGroupReader;
import org.apache.hudi.table.format.cow.vector.reader.EmptyColumnReader;
import org.apache.hudi.table.format.cow.vector.reader.FixedLenBytesColumnReader;
import org.apache.hudi.table.format.cow.vector.reader.Int64TimestampColumnReader;
@@ -62,6 +64,8 @@
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeFamily;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
@@ -282,12 +286,23 @@ private static ColumnVector createVectorFromConstant(
}
return tv;
case ARRAY:
HeapArrayVector arrayVector = new HeapArrayVector(batchSize);
if (value == null) {
arrayVector.fillWithNulls();
return arrayVector;
ArrayType arrayType = (ArrayType) type;
if (arrayType.getElementType().isAnyOf(LogicalTypeFamily.CONSTRUCTED)) {
HeapArrayGroupColumnVector arrayGroup = new HeapArrayGroupColumnVector(batchSize);
if (value == null) {
arrayGroup.fillWithNulls();
return arrayGroup;
} else {
throw new UnsupportedOperationException("Unsupported create array with default value.");
}
} else {
throw new UnsupportedOperationException("Unsupported create array with default value.");
HeapArrayVector arrayVector = new HeapArrayVector(batchSize);
if (value == null) {
arrayVector.fillWithNulls();
return arrayVector;
} else {
throw new UnsupportedOperationException("Unsupported create array with default value.");
}
}
case MAP:
HeapMapColumnVector mapVector = new HeapMapColumnVector(batchSize, null, null);
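For reference, the new ARRAY branch (and the matching reader and vector factories further down) routes on Flink's LogicalTypeFamily.CONSTRUCTED: only arrays whose element type is itself constructed (array, multiset, map or row) take the group path. A minimal standalone sketch of that check, assuming the Flink table-types API is on the classpath; the class name and the concrete element types below are illustrative, not part of the patch:

import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalTypeFamily;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;

public class ConstructedFamilyCheck {
  public static void main(String[] args) {
    // ARRAY<INT>: the element is atomic, so the existing HeapArrayVector path still applies.
    ArrayType arrayOfInts = new ArrayType(new IntType());
    System.out.println(arrayOfInts.getElementType().isAnyOf(LogicalTypeFamily.CONSTRUCTED)); // false

    // ARRAY<ROW<INT, STRING>>: the element is a constructed type, so the new
    // HeapArrayGroupColumnVector / ArrayGroupReader path is taken instead.
    ArrayType arrayOfRows = new ArrayType(
        RowType.of(new IntType(), new VarCharType(VarCharType.MAX_LENGTH)));
    System.out.println(arrayOfRows.getElementType().isAnyOf(LogicalTypeFamily.CONSTRUCTED)); // true
  }
}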
@@ -394,12 +409,23 @@ private static ColumnReader createColumnReader(
throw new AssertionError();
}
case ARRAY:
return new ArrayColumnReader(
descriptor,
pageReader,
utcTimestamp,
descriptor.getPrimitiveType(),
fieldType);
ArrayType arrayType = (ArrayType) fieldType;
if (arrayType.getElementType().isAnyOf(LogicalTypeFamily.CONSTRUCTED)) {
return new ArrayGroupReader(createColumnReader(
utcTimestamp,
arrayType.getElementType(),
physicalType.asGroupType().getType(0),
descriptors,
pages,
depth + 1));
} else {
return new ArrayColumnReader(
descriptor,
pageReader,
utcTimestamp,
descriptor.getPrimitiveType(),
fieldType);
}
case MAP:
MapType mapType = (MapType) fieldType;
ArrayColumnReader keyReader =
@@ -409,14 +435,24 @@
utcTimestamp,
descriptor.getPrimitiveType(),
new ArrayType(mapType.getKeyType()));
ArrayColumnReader valueReader =
new ArrayColumnReader(
descriptors.get(1),
pages.getPageReader(descriptors.get(1)),
utcTimestamp,
descriptors.get(1).getPrimitiveType(),
new ArrayType(mapType.getValueType()));
return new MapColumnReader(keyReader, valueReader, fieldType);
ColumnReader<WritableColumnVector> valueReader;
if (mapType.getValueType().isAnyOf(LogicalTypeFamily.CONSTRUCTED)) {
valueReader = new ArrayGroupReader(createColumnReader(
utcTimestamp,
mapType.getValueType(),
physicalType.asGroupType().getType(0).asGroupType().getType(1), // Get the value physical type
descriptors.subList(1, descriptors.size()), // remove the key descriptor
pages,
depth + 2)); // increase the depth by 2, because there's a key_value entry in the path
} else {
valueReader = new ArrayColumnReader(
descriptors.get(1),
pages.getPageReader(descriptors.get(1)),
utcTimestamp,
descriptors.get(1).getPrimitiveType(),
new ArrayType(mapType.getValueType()));
}
return new MapColumnReader(keyReader, valueReader);
case ROW:
RowType rowType = (RowType) fieldType;
GroupType groupType = physicalType.asGroupType();
@@ -427,14 +463,32 @@ private static ColumnReader createColumnReader(
if (fieldIndex < 0) {
fieldReaders.add(new EmptyColumnReader());
} else {
fieldReaders.add(
createColumnReader(
utcTimestamp,
rowType.getTypeAt(i),
groupType.getType(fieldIndex),
descriptors,
pages,
depth + 1));
// Check for a row nested inside an array whose field type is atomic.

// This matches Parquet's field layout, which pushes repetition and structure down to the individual leaf fields:
// in Parquet, an array of rows is stored as a separate array per row field.

// Limitation: this does not work for multiple levels of nested arrays and maps,
// mainly because the Flink vector classes and interfaces do not follow that pattern.
if (descriptors.get(fieldIndex).getMaxRepetitionLevel() > 0 && !rowType.getTypeAt(i).is(LogicalTypeRoot.ARRAY)) {
fieldReaders.add(
createColumnReader(
utcTimestamp,
new ArrayType(rowType.getTypeAt(i).isNullable(), rowType.getTypeAt(i)),
groupType.getType(fieldIndex),
descriptors,
pages,
depth + 1));
} else {
fieldReaders.add(
createColumnReader(
utcTimestamp,
rowType.getTypeAt(i),
groupType.getType(fieldIndex),
descriptors,
pages,
depth + 1));
}
}
}
return new RowColumnReader(fieldReaders);
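The ROW branch above leans on how parquet-mr lays out an array of rows: the repetition of the surrounding list is pushed down to every leaf field, so a field that is not declared as an array in Flink can still report a max repetition level greater than zero. A small sketch of that layout using parquet-mr's schema parser; the schema text and class name are illustrative assumptions, not taken from the patch:

import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class ArrayOfRowsLayout {
  public static void main(String[] args) {
    // Hypothetical Parquet schema for a Flink field declared as ARRAY<ROW<id INT, name STRING>>.
    MessageType schema = MessageTypeParser.parseMessageType(
        "message example {"
            + "  optional group items (LIST) {"
            + "    repeated group list {"
            + "      optional group element {"
            + "        optional int32 id;"
            + "        optional binary name (UTF8);"
            + "      }"
            + "    }"
            + "  }"
            + "}");
    // Each row field becomes its own leaf column, and both leaves report a max
    // repetition level of 1: the list's repetition has been pushed down to them.
    for (ColumnDescriptor column : schema.getColumns()) {
      System.out.println(String.join(".", column.getPath())
          + " -> maxRepetitionLevel=" + column.getMaxRepetitionLevel());
    }
  }
}

That is the condition the branch tests with descriptors.get(fieldIndex).getMaxRepetitionLevel() > 0 before wrapping the field's logical type in an ArrayType.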
@@ -501,43 +555,65 @@ private static WritableColumnVector createWritableColumnVector(
case TIMESTAMP_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
checkArgument(primitiveType.getOriginalType() != OriginalType.TIME_MICROS,
getOriginalTypeCheckFailureMessage(primitiveType.getOriginalType(), fieldType));
getOriginalTypeCheckFailureMessage(primitiveType.getOriginalType(), fieldType));
return new HeapTimestampVector(batchSize);
case DECIMAL:
checkArgument(
(typeName == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
|| typeName == PrimitiveType.PrimitiveTypeName.BINARY)
&& primitiveType.getOriginalType() == OriginalType.DECIMAL,
getPrimitiveTypeCheckFailureMessage(typeName, fieldType));
getPrimitiveTypeCheckFailureMessage(typeName, fieldType));
return new HeapDecimalVector(batchSize);
case ARRAY:
ArrayType arrayType = (ArrayType) fieldType;
return new HeapArrayVector(
batchSize,
createWritableColumnVector(
batchSize,
arrayType.getElementType(),
physicalType,
descriptors,
depth));
if (arrayType.getElementType().isAnyOf(LogicalTypeFamily.CONSTRUCTED)) {
return new HeapArrayGroupColumnVector(
batchSize,
createWritableColumnVector(
batchSize,
arrayType.getElementType(),
physicalType.asGroupType().getType(0),
descriptors,
depth + 1));
} else {
return new HeapArrayVector(
batchSize,
createWritableColumnVector(
batchSize,
arrayType.getElementType(),
physicalType,
descriptors,
depth));
}
case MAP:
MapType mapType = (MapType) fieldType;
GroupType repeatedType = physicalType.asGroupType().getType(0).asGroupType();
// the map column has three level paths.
return new HeapMapColumnVector(
WritableColumnVector keyColumnVector = createWritableColumnVector(
batchSize,
createWritableColumnVector(
batchSize,
mapType.getKeyType(),
repeatedType.getType(0),
descriptors,
depth + 2),
createWritableColumnVector(
batchSize,
mapType.getValueType(),
repeatedType.getType(1),
descriptors,
depth + 2));
new ArrayType(mapType.getKeyType().isNullable(), mapType.getKeyType()),
repeatedType.getType(0),
descriptors,
depth + 2);
WritableColumnVector valueColumnVector;
if (mapType.getValueType().isAnyOf(LogicalTypeFamily.CONSTRUCTED)) {
valueColumnVector = new HeapArrayGroupColumnVector(
batchSize,
createWritableColumnVector(
batchSize,
mapType.getValueType(),
repeatedType.getType(1).asGroupType(),
descriptors,
depth + 2));
} else {
valueColumnVector = createWritableColumnVector(
batchSize,
new ArrayType(mapType.getValueType().isNullable(), mapType.getValueType()),
repeatedType.getType(1),
descriptors,
depth + 2);
}
return new HeapMapColumnVector(batchSize, keyColumnVector, valueColumnVector);
case ROW:
RowType rowType = (RowType) fieldType;
GroupType groupType = physicalType.asGroupType();
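Likewise, the MAP handling above (descriptors.subList(1, ...) for the value reader and depth + 2 for both children) follows Parquet's standard map layout, where the field group wraps a repeated key_value group whose first column is the key. A sketch under the same illustrative assumptions:

import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class MapLayout {
  public static void main(String[] args) {
    // Hypothetical Parquet schema for a Flink field declared as MAP<STRING, ROW<id INT>>.
    MessageType schema = MessageTypeParser.parseMessageType(
        "message example {"
            + "  optional group scores (MAP) {"
            + "    repeated group key_value {"
            + "      required binary key (UTF8);"
            + "      optional group value {"
            + "        optional int32 id;"
            + "      }"
            + "    }"
            + "  }"
            + "}");
    // Prints scores.key_value.key first, then the value columns: the key column comes
    // first among this field's descriptors, and the two extra group levels
    // (the field group plus key_value) are what the depth + 2 step accounts for.
    for (ColumnDescriptor column : schema.getColumns()) {
      System.out.println(String.join(".", column.getPath()));
    }
  }
}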
@@ -546,15 +622,44 @@ private static WritableColumnVector createWritableColumnVector(
// schema evolution: read the file with a new extended field name.
int fieldIndex = getFieldIndexInPhysicalType(rowType.getFields().get(i).getName(), groupType);
if (fieldIndex < 0) {
columnVectors[i] = (WritableColumnVector) createVectorFromConstant(rowType.getTypeAt(i), null, batchSize);
// Check for a row nested inside an array whose field type is atomic.

// This matches Parquet's field layout, which pushes repetition and structure down to the individual leaf fields:
// in Parquet, an array of rows is stored as a separate array per row field.

// Limitation: this does not work for multiple levels of nested arrays and maps,
// mainly because the Flink vector classes and interfaces do not follow that pattern.
if (groupType.getRepetition().equals(Type.Repetition.REPEATED) && !rowType.getTypeAt(i).is(LogicalTypeRoot.ARRAY)) {
columnVectors[i] = (WritableColumnVector) createVectorFromConstant(
new ArrayType(rowType.getTypeAt(i).isNullable(), rowType.getTypeAt(i)), null, batchSize);
} else {
columnVectors[i] = (WritableColumnVector) createVectorFromConstant(rowType.getTypeAt(i), null, batchSize);
}
} else {
columnVectors[i] =
createWritableColumnVector(
batchSize,
rowType.getTypeAt(i),
groupType.getType(fieldIndex),
descriptors,
depth + 1);
// Check for a row nested inside an array whose field type is atomic.

// This matches Parquet's field layout, which pushes repetition and structure down to the individual leaf fields:
// in Parquet, an array of rows is stored as a separate array per row field.

// Limitation: this does not work for multiple levels of nested arrays and maps,
// mainly because the Flink vector classes and interfaces do not follow that pattern.
if (descriptors.get(fieldIndex).getMaxRepetitionLevel() > 0 && !rowType.getTypeAt(i).is(LogicalTypeRoot.ARRAY)) {
columnVectors[i] =
createWritableColumnVector(
batchSize,
new ArrayType(rowType.getTypeAt(i).isNullable(), rowType.getTypeAt(i)),
groupType.getType(fieldIndex),
descriptors,
depth + 1);
} else {
columnVectors[i] =
createWritableColumnVector(
batchSize,
rowType.getTypeAt(i),
groupType.getType(fieldIndex),
descriptors,
depth + 1);
}
}
}
return new HeapRowColumnVector(batchSize, columnVectors);
Expand All @@ -575,8 +680,9 @@ private static int getFieldIndexInPhysicalType(String fieldName, GroupType group

/**
* Construct the error message when primitive type mismatches.
*
* @param primitiveType Primitive type
* @param fieldType Logical field type
* @param fieldType Logical field type
* @return The error message
*/
private static String getPrimitiveTypeCheckFailureMessage(PrimitiveType.PrimitiveTypeName primitiveType, LogicalType fieldType) {
Expand All @@ -585,8 +691,9 @@ private static String getPrimitiveTypeCheckFailureMessage(PrimitiveType.Primitiv

/**
* Construct the error message when original type mismatches.
*
* @param originalType Original type
* @param fieldType Logical field type
* @param fieldType Logical field type
* @return The error message
*/
private static String getOriginalTypeCheckFailureMessage(OriginalType originalType, LogicalType fieldType) {