Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,10 @@
*/
package io.trino.plugin.bigquery;

import com.google.common.collect.ImmutableList;
import io.airlift.slice.Slice;
import io.trino.spi.PageBuilder;
import io.trino.spi.TrinoException;
import io.trino.spi.block.ArrayBlockBuilder;
import io.trino.spi.block.Block;
import io.trino.spi.block.BlockBuilder;
import io.trino.spi.block.RowBlockBuilder;
import io.trino.spi.type.ArrayType;
Expand All @@ -28,7 +26,6 @@
import io.trino.spi.type.LongTimestampWithTimeZone;
import io.trino.spi.type.RowType;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeSignatureParameter;
import io.trino.spi.type.VarbinaryType;
import io.trino.spi.type.VarcharType;
import org.apache.arrow.memory.ArrowBuf;
Expand Down Expand Up @@ -57,10 +54,8 @@
import java.util.List;
import java.util.function.Consumer;

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Iterables.getOnlyElement;
import static io.airlift.slice.Slices.wrappedBuffer;
import static io.trino.plugin.bigquery.BigQueryUtil.toBigQueryColumnName;
import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
Expand All @@ -83,8 +78,6 @@
import static org.apache.arrow.compression.CommonsCompressionFactory.INSTANCE;
import static org.apache.arrow.vector.complex.BaseRepeatedValueVector.OFFSET_WIDTH;
import static org.apache.arrow.vector.types.Types.MinorType.DECIMAL256;
import static org.apache.arrow.vector.types.Types.MinorType.LIST;
import static org.apache.arrow.vector.types.Types.MinorType.STRUCT;

public class BigQueryArrowToPageConverter
implements AutoCloseable
Expand Down Expand Up @@ -170,8 +163,11 @@ else if (javaType == Slice.class) {
else if (javaType == LongTimestampWithTimeZone.class) {
writeVectorValues(output, vector, index -> writeObjectTimestampWithTimezone(output, type, vector, index), offset, length);
}
else if (javaType == Block.class) {
writeVectorValues(output, vector, index -> writeBlock(output, type, vector, index), offset, length);
else if (type instanceof ArrayType arrayType) {
writeVectorValues(output, vector, index -> writeArrayBlock(output, arrayType, vector, index), offset, length);
}
else if (type instanceof RowType rowType) {
writeVectorValues(output, vector, index -> writeRowBlock(output, rowType, vector, index), offset, length);
}
else {
throw new TrinoException(GENERIC_INTERNAL_ERROR, format("Unhandled type for %s: %s", javaType.getSimpleName(), type));
Expand Down Expand Up @@ -237,24 +233,10 @@ private void writeObjectTimestampWithTimezone(BlockBuilder output, Type type, Fi
type.writeObject(output, fromEpochMillisAndFraction(floorDiv(epochMicros, MICROSECONDS_PER_MILLISECOND), picosOfMillis, UTC_KEY));
}

private void writeBlock(BlockBuilder output, Type type, FieldVector vector, int index)
{
if (type instanceof ArrayType && vector.getMinorType() == LIST) {
writeArrayBlock(output, type, vector, index);
return;
}
if (type instanceof RowType && vector.getMinorType() == STRUCT) {
writeRowBlock(output, type, vector, index);
return;
}
throw new TrinoException(GENERIC_INTERNAL_ERROR, "Unhandled type for Block: " + type.getTypeSignature());
}

private void writeArrayBlock(BlockBuilder output, Type type, FieldVector vector, int index)
private void writeArrayBlock(BlockBuilder output, ArrayType arrayType, FieldVector vector, int index)
{
Type elementType = arrayType.getElementType();
((ArrayBlockBuilder) output).buildEntry(elementBuilder -> {
Type elementType = getOnlyElement(type.getTypeParameters());

ArrowBuf offsetBuffer = vector.getOffsetBuffer();

int start = offsetBuffer.getInt((long) index * OFFSET_WIDTH);
Expand All @@ -270,20 +252,14 @@ private void writeArrayBlock(BlockBuilder output, Type type, FieldVector vector,
});
}

private void writeRowBlock(BlockBuilder output, Type type, FieldVector vector, int index)
private void writeRowBlock(BlockBuilder output, RowType rowType, FieldVector vector, int index)
{
List<RowType.Field> fields = rowType.getFields();
((RowBlockBuilder) output).buildEntry(fieldBuilders -> {
ImmutableList.Builder<String> fieldNamesBuilder = ImmutableList.builder();
for (int i = 0; i < type.getTypeSignature().getParameters().size(); i++) {
TypeSignatureParameter parameter = type.getTypeSignature().getParameters().get(i);
fieldNamesBuilder.add(parameter.getNamedTypeSignature().getName().orElse("field" + i));
}
List<String> fieldNames = fieldNamesBuilder.build();
checkState(fieldNames.size() == type.getTypeParameters().size(), "fieldNames size differs from type %s type parameters size", type);

for (int i = 0; i < type.getTypeParameters().size(); i++) {
FieldVector innerVector = ((StructVector) vector).getChild(fieldNames.get(i));
convertType(fieldBuilders.get(i), type.getTypeParameters().get(i), innerVector, index, 1);
for (int i = 0; i < fields.size(); i++) {
RowType.Field field = fields.get(i);
FieldVector innerVector = ((StructVector) vector).getChild(field.getName().orElse("field" + i));
convertType(fieldBuilders.get(i), field.getType(), innerVector, index, 1);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import io.trino.spi.PageBuilder;
import io.trino.spi.TrinoException;
import io.trino.spi.block.ArrayBlockBuilder;
import io.trino.spi.block.Block;
import io.trino.spi.block.BlockBuilder;
import io.trino.spi.block.RowBlockBuilder;
import io.trino.spi.connector.ConnectorPageSource;
Expand Down Expand Up @@ -208,8 +207,22 @@ else if (type.getJavaType() == Int128.class) {
else if (javaType == Slice.class) {
writeSlice(output, type, value);
}
else if (javaType == Block.class) {
writeBlock(output, type, value);
else if (type instanceof ArrayType arrayType) {
((ArrayBlockBuilder) output).buildEntry(elementBuilder -> {
Type elementType = arrayType.getElementType();
for (FieldValue element : value.getRepeatedValue()) {
appendTo(elementType, element, elementBuilder);
}
});
}
else if (type instanceof RowType rowType) {
FieldValueList record = value.getRecordValue();
List<RowType.Field> fields = rowType.getFields();
((RowBlockBuilder) output).buildEntry(fieldBuilders -> {
for (int index = 0; index < fields.size(); index++) {
appendTo(fields.get(index).getType(), record.get(index), fieldBuilders.get(index));
}
});
}
else {
throw new TrinoException(GENERIC_INTERNAL_ERROR, format("Unhandled type for %s: %s", javaType.getSimpleName(), type));
Expand All @@ -233,28 +246,6 @@ else if (type instanceof VarbinaryType) {
}
}

private void writeBlock(BlockBuilder output, Type type, FieldValue value)
{
if (type instanceof ArrayType) {
((ArrayBlockBuilder) output).buildEntry(elementBuilder -> {
for (FieldValue element : value.getRepeatedValue()) {
appendTo(type.getTypeParameters().get(0), element, elementBuilder);
}
});
return;
}
if (type instanceof RowType) {
FieldValueList record = value.getRecordValue();
((RowBlockBuilder) output).buildEntry(fieldBuilders -> {
for (int index = 0; index < type.getTypeParameters().size(); index++) {
appendTo(type.getTypeParameters().get(index), record.get(index), fieldBuilders.get(index));
}
});
return;
}
throw new TrinoException(GENERIC_INTERNAL_ERROR, "Unhandled type for Block: " + type.getTypeSignature());
}

@Override
public void close() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import io.trino.spi.PageBuilder;
import io.trino.spi.TrinoException;
import io.trino.spi.block.ArrayBlockBuilder;
import io.trino.spi.block.Block;
import io.trino.spi.block.BlockBuilder;
import io.trino.spi.block.RowBlockBuilder;
import io.trino.spi.connector.ConnectorPageSource;
Expand All @@ -32,8 +31,8 @@
import io.trino.spi.type.Int128;
import io.trino.spi.type.LongTimestampWithTimeZone;
import io.trino.spi.type.RowType;
import io.trino.spi.type.RowType.Field;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeSignatureParameter;
import io.trino.spi.type.VarbinaryType;
import io.trino.spi.type.VarcharType;
import org.apache.avro.Conversions.DecimalConversion;
Expand All @@ -48,7 +47,6 @@
import java.io.UncheckedIOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -206,8 +204,11 @@ else if (javaType == LongTimestampWithTimeZone.class) {
int picosOfMillis = toIntExact(floorMod(epochMicros, MICROSECONDS_PER_MILLISECOND)) * PICOSECONDS_PER_MICROSECOND;
type.writeObject(output, fromEpochMillisAndFraction(floorDiv(epochMicros, MICROSECONDS_PER_MILLISECOND), picosOfMillis, UTC_KEY));
}
else if (javaType == Block.class) {
writeBlock(output, type, value);
else if (type instanceof ArrayType arrayType) {
writeArray((ArrayBlockBuilder) output, (List<?>) value, arrayType);
}
else if (type instanceof RowType rowType) {
writeRow((RowBlockBuilder) output, rowType, (GenericRecord) value);
}
else {
throw new TrinoException(GENERIC_INTERNAL_ERROR, format("Unhandled type for %s: %s", javaType.getSimpleName(), type));
Expand Down Expand Up @@ -249,31 +250,25 @@ private static void writeObject(BlockBuilder output, Type type, Object value)
}
}

private void writeBlock(BlockBuilder output, Type type, Object value)
private void writeArray(ArrayBlockBuilder output, List<?> value, ArrayType arrayType)
{
if (type instanceof ArrayType && value instanceof List<?>) {
((ArrayBlockBuilder) output).buildEntry(elementBuilder -> {
for (Object element : (List<?>) value) {
appendTo(type.getTypeParameters().get(0), element, elementBuilder);
}
});
return;
}
if (type instanceof RowType && value instanceof GenericRecord record) {
((RowBlockBuilder) output).buildEntry(fieldBuilders -> {
List<String> fieldNames = new ArrayList<>();
for (int i = 0; i < type.getTypeSignature().getParameters().size(); i++) {
TypeSignatureParameter parameter = type.getTypeSignature().getParameters().get(i);
fieldNames.add(parameter.getNamedTypeSignature().getName().orElse("field" + i));
}
checkState(fieldNames.size() == type.getTypeParameters().size(), "fieldName doesn't match with type size : %s", type);
for (int index = 0; index < type.getTypeParameters().size(); index++) {
appendTo(type.getTypeParameters().get(index), record.get(fieldNames.get(index)), fieldBuilders.get(index));
}
});
return;
}
throw new TrinoException(GENERIC_INTERNAL_ERROR, "Unhandled type for Block: " + type.getTypeSignature());
Type elementType = arrayType.getElementType();
output.buildEntry(elementBuilder -> {
for (Object element : value) {
appendTo(elementType, element, elementBuilder);
}
});
}

private void writeRow(RowBlockBuilder output, RowType rowType, GenericRecord record)
{
List<Field> fields = rowType.getFields();
output.buildEntry(fieldBuilders -> {
for (int index = 0; index < fields.size(); index++) {
Field field = fields.get(index);
appendTo(field.getType(), record.get(field.getName().orElse("field" + index)), fieldBuilders.get(index));
}
});
}

@Override
Expand Down