From 4324764cb9b47300adf1c9a6ad5a2c27f6e893b8 Mon Sep 17 00:00:00 2001 From: Dain Sundstrom Date: Thu, 12 Oct 2023 12:30:08 -0700 Subject: [PATCH] Update BigQueryStorageAvroPageSource to SqlRow --- .../BigQueryArrowToPageConverter.java | 50 +++++------------ .../bigquery/BigQueryQueryPageSource.java | 41 ++++++-------- .../BigQueryStorageAvroPageSource.java | 53 +++++++++---------- 3 files changed, 53 insertions(+), 91 deletions(-) diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryArrowToPageConverter.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryArrowToPageConverter.java index c80724efdb75..fbfd8ff664d3 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryArrowToPageConverter.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryArrowToPageConverter.java @@ -13,12 +13,10 @@ */ package io.trino.plugin.bigquery; -import com.google.common.collect.ImmutableList; import io.airlift.slice.Slice; import io.trino.spi.PageBuilder; import io.trino.spi.TrinoException; import io.trino.spi.block.ArrayBlockBuilder; -import io.trino.spi.block.Block; import io.trino.spi.block.BlockBuilder; import io.trino.spi.block.RowBlockBuilder; import io.trino.spi.type.ArrayType; @@ -28,7 +26,6 @@ import io.trino.spi.type.LongTimestampWithTimeZone; import io.trino.spi.type.RowType; import io.trino.spi.type.Type; -import io.trino.spi.type.TypeSignatureParameter; import io.trino.spi.type.VarbinaryType; import io.trino.spi.type.VarcharType; import org.apache.arrow.memory.ArrowBuf; @@ -57,10 +54,8 @@ import java.util.List; import java.util.function.Consumer; -import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; -import static com.google.common.collect.Iterables.getOnlyElement; import static io.airlift.slice.Slices.wrappedBuffer; import static io.trino.plugin.bigquery.BigQueryUtil.toBigQueryColumnName; import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; @@ -83,8 +78,6 @@ import static org.apache.arrow.compression.CommonsCompressionFactory.INSTANCE; import static org.apache.arrow.vector.complex.BaseRepeatedValueVector.OFFSET_WIDTH; import static org.apache.arrow.vector.types.Types.MinorType.DECIMAL256; -import static org.apache.arrow.vector.types.Types.MinorType.LIST; -import static org.apache.arrow.vector.types.Types.MinorType.STRUCT; public class BigQueryArrowToPageConverter implements AutoCloseable @@ -170,8 +163,11 @@ else if (javaType == Slice.class) { else if (javaType == LongTimestampWithTimeZone.class) { writeVectorValues(output, vector, index -> writeObjectTimestampWithTimezone(output, type, vector, index), offset, length); } - else if (javaType == Block.class) { - writeVectorValues(output, vector, index -> writeBlock(output, type, vector, index), offset, length); + else if (type instanceof ArrayType arrayType) { + writeVectorValues(output, vector, index -> writeArrayBlock(output, arrayType, vector, index), offset, length); + } + else if (type instanceof RowType rowType) { + writeVectorValues(output, vector, index -> writeRowBlock(output, rowType, vector, index), offset, length); } else { throw new TrinoException(GENERIC_INTERNAL_ERROR, format("Unhandled type for %s: %s", javaType.getSimpleName(), type)); @@ -237,24 +233,10 @@ private void writeObjectTimestampWithTimezone(BlockBuilder output, Type type, Fi type.writeObject(output, fromEpochMillisAndFraction(floorDiv(epochMicros, MICROSECONDS_PER_MILLISECOND), picosOfMillis, UTC_KEY)); } - private void writeBlock(BlockBuilder output, Type type, FieldVector vector, int index) - { - if (type instanceof ArrayType && vector.getMinorType() == LIST) { - writeArrayBlock(output, type, vector, index); - return; - } - if (type instanceof RowType && vector.getMinorType() == STRUCT) { - writeRowBlock(output, type, vector, index); - return; - } - throw new TrinoException(GENERIC_INTERNAL_ERROR, "Unhandled type for Block: " + type.getTypeSignature()); - } - - private void writeArrayBlock(BlockBuilder output, Type type, FieldVector vector, int index) + private void writeArrayBlock(BlockBuilder output, ArrayType arrayType, FieldVector vector, int index) { + Type elementType = arrayType.getElementType(); ((ArrayBlockBuilder) output).buildEntry(elementBuilder -> { - Type elementType = getOnlyElement(type.getTypeParameters()); - ArrowBuf offsetBuffer = vector.getOffsetBuffer(); int start = offsetBuffer.getInt((long) index * OFFSET_WIDTH); @@ -270,20 +252,14 @@ private void writeArrayBlock(BlockBuilder output, Type type, FieldVector vector, }); } - private void writeRowBlock(BlockBuilder output, Type type, FieldVector vector, int index) + private void writeRowBlock(BlockBuilder output, RowType rowType, FieldVector vector, int index) { + List fields = rowType.getFields(); ((RowBlockBuilder) output).buildEntry(fieldBuilders -> { - ImmutableList.Builder fieldNamesBuilder = ImmutableList.builder(); - for (int i = 0; i < type.getTypeSignature().getParameters().size(); i++) { - TypeSignatureParameter parameter = type.getTypeSignature().getParameters().get(i); - fieldNamesBuilder.add(parameter.getNamedTypeSignature().getName().orElse("field" + i)); - } - List fieldNames = fieldNamesBuilder.build(); - checkState(fieldNames.size() == type.getTypeParameters().size(), "fieldNames size differs from type %s type parameters size", type); - - for (int i = 0; i < type.getTypeParameters().size(); i++) { - FieldVector innerVector = ((StructVector) vector).getChild(fieldNames.get(i)); - convertType(fieldBuilders.get(i), type.getTypeParameters().get(i), innerVector, index, 1); + for (int i = 0; i < fields.size(); i++) { + RowType.Field field = fields.get(i); + FieldVector innerVector = ((StructVector) vector).getChild(field.getName().orElse("field" + i)); + convertType(fieldBuilders.get(i), field.getType(), innerVector, index, 1); } }); } diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryQueryPageSource.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryQueryPageSource.java index 1636eeff28e3..7330328f5653 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryQueryPageSource.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryQueryPageSource.java @@ -24,7 +24,6 @@ import io.trino.spi.PageBuilder; import io.trino.spi.TrinoException; import io.trino.spi.block.ArrayBlockBuilder; -import io.trino.spi.block.Block; import io.trino.spi.block.BlockBuilder; import io.trino.spi.block.RowBlockBuilder; import io.trino.spi.connector.ConnectorPageSource; @@ -208,8 +207,22 @@ else if (type.getJavaType() == Int128.class) { else if (javaType == Slice.class) { writeSlice(output, type, value); } - else if (javaType == Block.class) { - writeBlock(output, type, value); + else if (type instanceof ArrayType arrayType) { + ((ArrayBlockBuilder) output).buildEntry(elementBuilder -> { + Type elementType = arrayType.getElementType(); + for (FieldValue element : value.getRepeatedValue()) { + appendTo(elementType, element, elementBuilder); + } + }); + } + else if (type instanceof RowType rowType) { + FieldValueList record = value.getRecordValue(); + List fields = rowType.getFields(); + ((RowBlockBuilder) output).buildEntry(fieldBuilders -> { + for (int index = 0; index < fields.size(); index++) { + appendTo(fields.get(index).getType(), record.get(index), fieldBuilders.get(index)); + } + }); } else { throw new TrinoException(GENERIC_INTERNAL_ERROR, format("Unhandled type for %s: %s", javaType.getSimpleName(), type)); @@ -233,28 +246,6 @@ else if (type instanceof VarbinaryType) { } } - private void writeBlock(BlockBuilder output, Type type, FieldValue value) - { - if (type instanceof ArrayType) { - ((ArrayBlockBuilder) output).buildEntry(elementBuilder -> { - for (FieldValue element : value.getRepeatedValue()) { - appendTo(type.getTypeParameters().get(0), element, elementBuilder); - } - }); - return; - } - if (type instanceof RowType) { - FieldValueList record = value.getRecordValue(); - ((RowBlockBuilder) output).buildEntry(fieldBuilders -> { - for (int index = 0; index < type.getTypeParameters().size(); index++) { - appendTo(type.getTypeParameters().get(index), record.get(index), fieldBuilders.get(index)); - } - }); - return; - } - throw new TrinoException(GENERIC_INTERNAL_ERROR, "Unhandled type for Block: " + type.getTypeSignature()); - } - @Override public void close() {} } diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryStorageAvroPageSource.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryStorageAvroPageSource.java index 87bedcafd8ce..f4bede11819d 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryStorageAvroPageSource.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryStorageAvroPageSource.java @@ -22,7 +22,6 @@ import io.trino.spi.PageBuilder; import io.trino.spi.TrinoException; import io.trino.spi.block.ArrayBlockBuilder; -import io.trino.spi.block.Block; import io.trino.spi.block.BlockBuilder; import io.trino.spi.block.RowBlockBuilder; import io.trino.spi.connector.ConnectorPageSource; @@ -32,8 +31,8 @@ import io.trino.spi.type.Int128; import io.trino.spi.type.LongTimestampWithTimeZone; import io.trino.spi.type.RowType; +import io.trino.spi.type.RowType.Field; import io.trino.spi.type.Type; -import io.trino.spi.type.TypeSignatureParameter; import io.trino.spi.type.VarbinaryType; import io.trino.spi.type.VarcharType; import org.apache.avro.Conversions.DecimalConversion; @@ -48,7 +47,6 @@ import java.io.UncheckedIOException; import java.math.BigDecimal; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.concurrent.atomic.AtomicLong; @@ -206,8 +204,11 @@ else if (javaType == LongTimestampWithTimeZone.class) { int picosOfMillis = toIntExact(floorMod(epochMicros, MICROSECONDS_PER_MILLISECOND)) * PICOSECONDS_PER_MICROSECOND; type.writeObject(output, fromEpochMillisAndFraction(floorDiv(epochMicros, MICROSECONDS_PER_MILLISECOND), picosOfMillis, UTC_KEY)); } - else if (javaType == Block.class) { - writeBlock(output, type, value); + else if (type instanceof ArrayType arrayType) { + writeArray((ArrayBlockBuilder) output, (List) value, arrayType); + } + else if (type instanceof RowType rowType) { + writeRow((RowBlockBuilder) output, rowType, (GenericRecord) value); } else { throw new TrinoException(GENERIC_INTERNAL_ERROR, format("Unhandled type for %s: %s", javaType.getSimpleName(), type)); @@ -249,31 +250,25 @@ private static void writeObject(BlockBuilder output, Type type, Object value) } } - private void writeBlock(BlockBuilder output, Type type, Object value) + private void writeArray(ArrayBlockBuilder output, List value, ArrayType arrayType) { - if (type instanceof ArrayType && value instanceof List) { - ((ArrayBlockBuilder) output).buildEntry(elementBuilder -> { - for (Object element : (List) value) { - appendTo(type.getTypeParameters().get(0), element, elementBuilder); - } - }); - return; - } - if (type instanceof RowType && value instanceof GenericRecord record) { - ((RowBlockBuilder) output).buildEntry(fieldBuilders -> { - List fieldNames = new ArrayList<>(); - for (int i = 0; i < type.getTypeSignature().getParameters().size(); i++) { - TypeSignatureParameter parameter = type.getTypeSignature().getParameters().get(i); - fieldNames.add(parameter.getNamedTypeSignature().getName().orElse("field" + i)); - } - checkState(fieldNames.size() == type.getTypeParameters().size(), "fieldName doesn't match with type size : %s", type); - for (int index = 0; index < type.getTypeParameters().size(); index++) { - appendTo(type.getTypeParameters().get(index), record.get(fieldNames.get(index)), fieldBuilders.get(index)); - } - }); - return; - } - throw new TrinoException(GENERIC_INTERNAL_ERROR, "Unhandled type for Block: " + type.getTypeSignature()); + Type elementType = arrayType.getElementType(); + output.buildEntry(elementBuilder -> { + for (Object element : value) { + appendTo(elementType, element, elementBuilder); + } + }); + } + + private void writeRow(RowBlockBuilder output, RowType rowType, GenericRecord record) + { + List fields = rowType.getFields(); + output.buildEntry(fieldBuilders -> { + for (int index = 0; index < fields.size(); index++) { + Field field = fields.get(index); + appendTo(field.getType(), record.get(field.getName().orElse("field" + index)), fieldBuilders.get(index)); + } + }); } @Override