diff --git a/docs/src/main/sphinx/connector/bigquery.rst b/docs/src/main/sphinx/connector/bigquery.rst index 65c00af962ab..afc43bf9e228 100644 --- a/docs/src/main/sphinx/connector/bigquery.rst +++ b/docs/src/main/sphinx/connector/bigquery.rst @@ -129,6 +129,8 @@ Property Description ``bigquery.view-expire-duration`` Expire duration for the materialized view. ``24h`` ``bigquery.view-materialization-project`` The project where the materialized view is going to be created The view's project ``bigquery.view-materialization-dataset`` The dataset where the materialized view is going to be created The view's dataset +``bigquery.skip-view-materialization`` Use REST API to access views instead of Storage API. BigQuery + ``BIGNUMERIC`` and ``TIMESTAMP`` types are unsupported. ``false`` ``bigquery.views-cache-ttl`` Duration for which the materialization of a view will be ``15m`` cached and reused. Set to ``0ms`` to disable the cache. ``bigquery.max-read-rows-retries`` The number of retries in case of retryable server issues ``3`` diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryClient.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryClient.java index 8be942d68cf1..f8f126e2db0b 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryClient.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryClient.java @@ -219,6 +219,35 @@ public TableResult query(String sql) } } + public TableResult query(TableId table, List requiredColumns, Optional filter) + { + String sql = selectSql(table, requiredColumns, filter); + log.debug("Execute query: %s", sql); + try { + return bigQuery.query(QueryJobConfiguration.of(sql)); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new BigQueryException(BaseHttpServiceException.UNKNOWN_CODE, format("Failed to run the query [%s]", sql), e); + } + } + + private String selectSql(TableId table, List requiredColumns, Optional filter) + { + String columns = requiredColumns.stream().map(column -> format("`%s`", column)).collect(joining(",")); + return selectSql(table, columns, filter); + } + + private String selectSql(TableId table, String formattedColumns, Optional filter) + { + String tableName = fullTableName(table); + String query = format("SELECT %s FROM `%s`", formattedColumns, tableName); + if (filter.isEmpty()) { + return query; + } + return query + " WHERE " + filter.get(); + } + private String selectSql(TableInfo remoteTable, List requiredColumns) { String columns = requiredColumns.isEmpty() ? "*" : diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConfig.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConfig.java index 143f3853e8f8..44ffe49b4e9f 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConfig.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConfig.java @@ -41,6 +41,7 @@ public class BigQueryConfig private Optional parallelism = Optional.empty(); private boolean viewsEnabled; private Duration viewExpireDuration = new Duration(24, HOURS); + private boolean skipViewMaterialization; private Optional viewMaterializationProject = Optional.empty(); private Optional viewMaterializationDataset = Optional.empty(); private int maxReadRowsRetries = DEFAULT_MAX_READ_ROWS_RETRIES; @@ -114,6 +115,19 @@ public BigQueryConfig setViewExpireDuration(Duration viewExpireDuration) return this; } + public boolean isSkipViewMaterialization() + { + return skipViewMaterialization; + } + + @Config("bigquery.skip-view-materialization") + @ConfigDescription("Skip materializing views") + public BigQueryConfig setSkipViewMaterialization(boolean skipViewMaterialization) + { + this.skipViewMaterialization = skipViewMaterialization; + return this; + } + public Optional getViewMaterializationProject() { return viewMaterializationProject; @@ -202,5 +216,9 @@ public BigQueryConfig setServiceCacheTtl(Duration serviceCacheTtl) public void validate() { checkState(viewExpireDuration.toMillis() > viewsCacheTtl.toMillis(), "View expiration duration must be longer than view cache TTL"); + + if (skipViewMaterialization) { + checkState(viewsEnabled, "%s config property must be enabled when skipping view materialization", VIEWS_ENABLED); + } } } diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSourceProvider.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSourceProvider.java index 6ce9ec8f6579..eb17b3575785 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSourceProvider.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSourceProvider.java @@ -26,6 +26,7 @@ import javax.inject.Inject; import java.util.List; +import java.util.Optional; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.ImmutableList.toImmutableList; @@ -36,12 +37,14 @@ public class BigQueryPageSourceProvider { private static final Logger log = Logger.get(BigQueryPageSourceProvider.class); + private final BigQueryClientFactory bigQueryClientFactory; private final BigQueryReadClientFactory bigQueryReadClientFactory; private final int maxReadRowsRetries; @Inject - public BigQueryPageSourceProvider(BigQueryReadClientFactory bigQueryReadClientFactory, BigQueryConfig config) + public BigQueryPageSourceProvider(BigQueryClientFactory bigQueryClientFactory, BigQueryReadClientFactory bigQueryReadClientFactory, BigQueryConfig config) { + this.bigQueryClientFactory = requireNonNull(bigQueryClientFactory, "bigQueryClientFactory is null"); this.bigQueryReadClientFactory = requireNonNull(bigQueryReadClientFactory, "bigQueryReadClientFactory is null"); this.maxReadRowsRetries = requireNonNull(config, "config is null").getMaxReadRowsRetries(); } @@ -71,6 +74,36 @@ public ConnectorPageSource createPageSource( .map(BigQueryColumnHandle.class::cast) .collect(toImmutableList()); - return new BigQueryResultPageSource(bigQueryReadClientFactory.create(session), maxReadRowsRetries, bigQuerySplit, bigQueryColumnHandles); + return createPageSource(session, (BigQueryTableHandle) table, bigQuerySplit, bigQueryColumnHandles); + } + + private ConnectorPageSource createPageSource( + ConnectorSession session, + BigQueryTableHandle table, + BigQuerySplit split, + List columnHandles) + { + switch (split.getMode()) { + case STORAGE: + return createStoragePageSource(session, split, columnHandles); + case QUERY: + return createQueryPageSource(session, table, columnHandles, split.getFilter()); + } + throw new UnsupportedOperationException("Unsupported mode: " + split.getMode()); + } + + private ConnectorPageSource createStoragePageSource(ConnectorSession session, BigQuerySplit split, List columnHandles) + { + return new BigQueryStoragePageSource(bigQueryReadClientFactory.create(session), maxReadRowsRetries, split, columnHandles); + } + + private ConnectorPageSource createQueryPageSource(ConnectorSession session, BigQueryTableHandle table, List columnHandles, Optional filter) + { + return new BigQueryQueryPageSource( + bigQueryClientFactory.create(session), + table, + columnHandles.stream().map(BigQueryColumnHandle::getName).collect(toImmutableList()), + columnHandles.stream().map(BigQueryColumnHandle::getTrinoType).collect(toImmutableList()), + filter); } } diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryQueryPageSource.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryQueryPageSource.java new file mode 100644 index 000000000000..9574ac2f6c13 --- /dev/null +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryQueryPageSource.java @@ -0,0 +1,247 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.bigquery; + +import com.google.cloud.bigquery.FieldValue; +import com.google.cloud.bigquery.FieldValueList; +import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.TableResult; +import com.google.common.collect.ImmutableList; +import io.airlift.slice.Slice; +import io.airlift.slice.Slices; +import io.trino.spi.Page; +import io.trino.spi.PageBuilder; +import io.trino.spi.TrinoException; +import io.trino.spi.block.Block; +import io.trino.spi.block.BlockBuilder; +import io.trino.spi.connector.ConnectorPageSource; +import io.trino.spi.type.ArrayType; +import io.trino.spi.type.DecimalType; +import io.trino.spi.type.Decimals; +import io.trino.spi.type.Int128; +import io.trino.spi.type.RowType; +import io.trino.spi.type.Type; +import io.trino.spi.type.VarbinaryType; +import io.trino.spi.type.VarcharType; + +import java.math.BigDecimal; +import java.time.LocalDate; +import java.time.LocalTime; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeFormatterBuilder; +import java.util.List; +import java.util.Optional; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Verify.verify; +import static io.airlift.slice.Slices.utf8Slice; +import static io.trino.plugin.bigquery.BigQueryType.toTrinoTimestamp; +import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; +import static io.trino.spi.type.BigintType.BIGINT; +import static io.trino.spi.type.DateType.DATE; +import static io.trino.spi.type.Decimals.isLongDecimal; +import static io.trino.spi.type.IntegerType.INTEGER; +import static io.trino.spi.type.TimeType.TIME_MICROS; +import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS; +import static io.trino.spi.type.Timestamps.NANOSECONDS_PER_DAY; +import static io.trino.spi.type.Timestamps.PICOSECONDS_PER_DAY; +import static io.trino.spi.type.Timestamps.PICOSECONDS_PER_NANOSECOND; +import static io.trino.spi.type.Timestamps.round; +import static java.lang.String.format; +import static java.time.temporal.ChronoField.NANO_OF_SECOND; +import static java.util.Objects.requireNonNull; + +public class BigQueryQueryPageSource + implements ConnectorPageSource +{ + private static final DateTimeFormatter TIME_FORMATTER = new DateTimeFormatterBuilder() + .appendPattern("HH:mm:ss") + .optionalStart() + .appendFraction(NANO_OF_SECOND, 0, 6, true) + .optionalEnd() + .toFormatter(); + + private final List columnTypes; + private final PageBuilder pageBuilder; + private final TableResult tableResult; + + private boolean finished; + + public BigQueryQueryPageSource( + BigQueryClient client, + BigQueryTableHandle table, + List columnNames, + List columnTypes, + Optional filter) + { + requireNonNull(client, "client is null"); + requireNonNull(table, "table is null"); + requireNonNull(columnNames, "columnNames is null"); + requireNonNull(columnTypes, "columnTypes is null"); + requireNonNull(filter, "filter is null"); + checkArgument(columnNames.size() == columnTypes.size(), "columnNames and columnTypes sizes don't match"); + this.columnTypes = ImmutableList.copyOf(columnTypes); + this.pageBuilder = new PageBuilder(columnTypes); + TableId tableId = TableId.of(client.getProjectId(), table.getRemoteTableName().getDatasetName(), table.getRemoteTableName().getTableName()); + this.tableResult = client.query(tableId, ImmutableList.copyOf(columnNames), filter); + } + + @Override + public long getCompletedBytes() + { + return 0; + } + + @Override + public long getReadTimeNanos() + { + return 0; + } + + @Override + public boolean isFinished() + { + return finished; + } + + @Override + public long getMemoryUsage() + { + return 0; + } + + @Override + public Page getNextPage() + { + verify(pageBuilder.isEmpty()); + for (FieldValueList record : tableResult.iterateAll()) { + pageBuilder.declarePosition(); + for (int column = 0; column < columnTypes.size(); column++) { + BlockBuilder output = pageBuilder.getBlockBuilder(column); + appendTo(columnTypes.get(column), record.get(column), output); + } + } + finished = true; + + Page page = pageBuilder.build(); + pageBuilder.reset(); + return page; + } + + private void appendTo(Type type, FieldValue value, BlockBuilder output) + { + // TODO (https://github.com/trinodb/trino/issues/12346) Add support for bignumeric and timestamp with time zone types + if (value == null || value.isNull()) { + output.appendNull(); + return; + } + + Class javaType = type.getJavaType(); + try { + if (javaType == boolean.class) { + type.writeBoolean(output, value.getBooleanValue()); + } + else if (javaType == long.class) { + if (type.equals(BIGINT)) { + type.writeLong(output, value.getLongValue()); + } + else if (type.equals(INTEGER)) { + type.writeLong(output, value.getLongValue()); + } + else if (type.equals(DATE)) { + type.writeLong(output, LocalDate.parse(value.getStringValue()).toEpochDay()); + } + else if (type.equals(TIME_MICROS)) { + LocalTime time = LocalTime.parse(value.getStringValue(), TIME_FORMATTER); + long nanosOfDay = time.toNanoOfDay(); + verify(nanosOfDay < NANOSECONDS_PER_DAY, "Invalid value of nanosOfDay: %s", nanosOfDay); + long picosOfDay = nanosOfDay * PICOSECONDS_PER_NANOSECOND; + long rounded = round(picosOfDay, 12 - TIME_MICROS.getPrecision()); + if (rounded == PICOSECONDS_PER_DAY) { + rounded = 0; + } + type.writeLong(output, rounded); + } + else if (type.equals(TIMESTAMP_MICROS)) { + type.writeLong(output, toTrinoTimestamp((value.getStringValue()))); + } + else { + throw new TrinoException(GENERIC_INTERNAL_ERROR, format("Unhandled type for %s: %s", javaType.getSimpleName(), type)); + } + } + else if (javaType == double.class) { + type.writeDouble(output, value.getDoubleValue()); + } + else if (type.getJavaType() == Int128.class) { + verify(isLongDecimal(type), "The type should be long decimal"); + DecimalType decimalType = (DecimalType) type; + BigDecimal decimal = value.getNumericValue(); + type.writeObject(output, Decimals.encodeScaledValue(decimal, decimalType.getScale())); + } + else if (javaType == Slice.class) { + writeSlice(output, type, value); + } + else if (javaType == Block.class) { + writeBlock(output, type, value); + } + else { + throw new TrinoException(GENERIC_INTERNAL_ERROR, format("Unhandled type for %s: %s", javaType.getSimpleName(), type)); + } + } + catch (ClassCastException e) { + throw new TrinoException(GENERIC_INTERNAL_ERROR, format("Unhandled type for %s: %s", javaType.getSimpleName(), type), e); + } + } + + private static void writeSlice(BlockBuilder output, Type type, FieldValue value) + { + if (type instanceof VarcharType) { + type.writeSlice(output, utf8Slice(value.getStringValue())); + } + else if (type instanceof VarbinaryType) { + type.writeSlice(output, Slices.wrappedBuffer(value.getBytesValue())); + } + else { + throw new TrinoException(GENERIC_INTERNAL_ERROR, "Unhandled type for Slice: " + type.getTypeSignature()); + } + } + + private void writeBlock(BlockBuilder output, Type type, FieldValue value) + { + if (type instanceof ArrayType) { + BlockBuilder builder = output.beginBlockEntry(); + + for (FieldValue element : value.getRepeatedValue()) { + appendTo(type.getTypeParameters().get(0), element, builder); + } + + output.closeEntry(); + return; + } + if (type instanceof RowType) { + FieldValueList record = value.getRecordValue(); + BlockBuilder builder = output.beginBlockEntry(); + + for (int index = 0; index < type.getTypeParameters().size(); index++) { + appendTo(type.getTypeParameters().get(index), record.get(index), builder); + } + output.closeEntry(); + return; + } + throw new TrinoException(GENERIC_INTERNAL_ERROR, "Unhandled type for Block: " + type.getTypeSignature()); + } + + @Override + public void close() {} +} diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySplit.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySplit.java index 79f16a46b4bf..3973fb4dcb57 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySplit.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySplit.java @@ -23,9 +23,12 @@ import java.util.List; import java.util.Objects; +import java.util.Optional; import static com.google.common.base.MoreObjects.toStringHelper; import static io.airlift.slice.SizeOf.estimatedSizeOf; +import static io.trino.plugin.bigquery.BigQuerySplit.Mode.QUERY; +import static io.trino.plugin.bigquery.BigQuerySplit.Mode.STORAGE; import static java.util.Objects.requireNonNull; public class BigQuerySplit @@ -35,33 +38,50 @@ public class BigQuerySplit private static final int NO_ROWS_TO_GENERATE = -1; + private final Mode mode; private final String streamName; private final String avroSchema; private final List columns; private final long emptyRowsToGenerate; + private final Optional filter; // do not use directly, it is public only for Jackson @JsonCreator public BigQuerySplit( + @JsonProperty("mode") Mode mode, @JsonProperty("streamName") String streamName, @JsonProperty("avroSchema") String avroSchema, @JsonProperty("columns") List columns, - @JsonProperty("emptyRowsToGenerate") long emptyRowsToGenerate) + @JsonProperty("emptyRowsToGenerate") long emptyRowsToGenerate, + @JsonProperty("filter") Optional filter) { + this.mode = requireNonNull(mode, "mode is null"); this.streamName = requireNonNull(streamName, "streamName cannot be null"); this.avroSchema = requireNonNull(avroSchema, "avroSchema cannot be null"); this.columns = ImmutableList.copyOf(requireNonNull(columns, "columns cannot be null")); this.emptyRowsToGenerate = emptyRowsToGenerate; + this.filter = requireNonNull(filter, "filter is null"); } static BigQuerySplit forStream(String streamName, String avroSchema, List columns) { - return new BigQuerySplit(streamName, avroSchema, columns, NO_ROWS_TO_GENERATE); + return new BigQuerySplit(STORAGE, streamName, avroSchema, columns, NO_ROWS_TO_GENERATE, Optional.empty()); + } + + static BigQuerySplit forViewStream(List columns, Optional filter) + { + return new BigQuerySplit(QUERY, "", "", columns, NO_ROWS_TO_GENERATE, filter); } static BigQuerySplit emptyProjection(long numberOfRows) { - return new BigQuerySplit("", "", ImmutableList.of(), numberOfRows); + return new BigQuerySplit(STORAGE, "", "", ImmutableList.of(), numberOfRows, Optional.empty()); + } + + @JsonProperty + public Mode getMode() + { + return mode; } @JsonProperty @@ -88,6 +108,12 @@ public long getEmptyRowsToGenerate() return emptyRowsToGenerate; } + @JsonProperty + public Optional getFilter() + { + return filter; + } + @Override public boolean isRemotelyAccessible() { @@ -125,7 +151,8 @@ public boolean equals(Object o) return false; } BigQuerySplit that = (BigQuerySplit) o; - return Objects.equals(streamName, that.streamName) && + return Objects.equals(mode, that.mode) && + Objects.equals(streamName, that.streamName) && Objects.equals(avroSchema, that.avroSchema) && Objects.equals(columns, that.columns) && Objects.equals(emptyRowsToGenerate, that.emptyRowsToGenerate); @@ -134,13 +161,14 @@ public boolean equals(Object o) @Override public int hashCode() { - return Objects.hash(streamName, avroSchema, columns, emptyRowsToGenerate); + return Objects.hash(mode, streamName, avroSchema, columns, emptyRowsToGenerate); } @Override public String toString() { return toStringHelper(this) + .add("mode", mode) .add("streamName", streamName) .add("avroSchema", avroSchema) .add("columns", columns) @@ -152,4 +180,11 @@ boolean representsEmptyProjection() { return emptyRowsToGenerate != NO_ROWS_TO_GENERATE; } + + public enum Mode + { + STORAGE, + QUERY, + /**/; + } } diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySplitManager.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySplitManager.java index a61c1a279bb7..5b8df2b584c9 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySplitManager.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySplitManager.java @@ -14,6 +14,7 @@ package io.trino.plugin.bigquery; import com.google.cloud.bigquery.BigQueryException; +import com.google.cloud.bigquery.TableDefinition; import com.google.cloud.bigquery.TableId; import com.google.cloud.bigquery.TableInfo; import com.google.cloud.bigquery.TableResult; @@ -59,6 +60,7 @@ public class BigQuerySplitManager private final Optional parallelism; private final boolean viewEnabled; private final Duration viewExpiration; + private final boolean skipViewMaterialization; private final NodeManager nodeManager; @Inject @@ -75,6 +77,7 @@ public BigQuerySplitManager( this.parallelism = config.getParallelism(); this.viewEnabled = config.isViewsEnabled(); this.viewExpiration = config.getViewExpireDuration(); + this.skipViewMaterialization = config.isSkipViewMaterialization(); this.nodeManager = requireNonNull(nodeManager, "nodeManager cannot be null"); } @@ -95,7 +98,7 @@ public ConnectorSplitSource getSplits( Optional filter = BigQueryFilterQueryBuilder.buildFilter(constraint); List splits = emptyProjectionIsRequired(bigQueryTableHandle.getProjectedColumns()) ? createEmptyProjection(session, remoteTableId, actualParallelism, filter) : - readFromBigQuery(session, remoteTableId, bigQueryTableHandle.getProjectedColumns(), actualParallelism, filter); + readFromBigQuery(session, TableDefinition.Type.valueOf(bigQueryTableHandle.getType()), remoteTableId, bigQueryTableHandle.getProjectedColumns(), actualParallelism, filter); return new FixedSplitSource(splits); } @@ -104,7 +107,7 @@ private static boolean emptyProjectionIsRequired(Optional> pr return projectedColumns.isPresent() && projectedColumns.get().isEmpty(); } - private List readFromBigQuery(ConnectorSession session, TableId remoteTableId, Optional> projectedColumns, int actualParallelism, Optional filter) + private List readFromBigQuery(ConnectorSession session, TableDefinition.Type type, TableId remoteTableId, Optional> projectedColumns, int actualParallelism, Optional filter) { log.debug("readFromBigQuery(tableId=%s, projectedColumns=%s, actualParallelism=%s, filter=[%s])", remoteTableId, projectedColumns, actualParallelism, filter); List columns = projectedColumns.orElse(ImmutableList.of()); @@ -112,6 +115,9 @@ private List readFromBigQuery(ConnectorSession session, TableId r .map(column -> ((BigQueryColumnHandle) column).getName()) .collect(toImmutableList()); + if (skipViewMaterialization && type == VIEW) { + return ImmutableList.of(BigQuerySplit.forViewStream(columns, filter)); + } ReadSession readSession = new ReadSessionCreator(bigQueryClientFactory, bigQueryReadClientFactory, viewEnabled, viewExpiration) .create(session, remoteTableId, projectedColumnsNames, filter, actualParallelism); diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryResultPageSource.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryStoragePageSource.java similarity index 98% rename from plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryResultPageSource.java rename to plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryStoragePageSource.java index 9af7ac985b61..3bc420383932 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryResultPageSource.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryStoragePageSource.java @@ -77,10 +77,10 @@ import static java.lang.String.format; import static java.util.Objects.requireNonNull; -public class BigQueryResultPageSource +public class BigQueryStoragePageSource implements ConnectorPageSource { - private static final Logger log = Logger.get(BigQueryResultPageSource.class); + private static final Logger log = Logger.get(BigQueryStoragePageSource.class); private static final AvroDecimalConverter DECIMAL_CONVERTER = new AvroDecimalConverter(); @@ -92,7 +92,7 @@ public class BigQueryResultPageSource private final PageBuilder pageBuilder; private final Iterator responses; - public BigQueryResultPageSource( + public BigQueryStoragePageSource( BigQueryReadClient bigQueryReadClient, int maxReadRowsRetries, BigQuerySplit split, diff --git a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BigQueryTestView.java b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BigQueryTestView.java new file mode 100644 index 000000000000..eef2577741a5 --- /dev/null +++ b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BigQueryTestView.java @@ -0,0 +1,59 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.bigquery; + +import io.trino.testing.sql.SqlExecutor; +import io.trino.testing.sql.TestTable; + +import java.util.List; + +import static java.lang.String.format; +import static java.util.Objects.requireNonNull; + +public class BigQueryTestView + extends TestTable +{ + private final SqlExecutor sqlExecutor; + private final TestTable table; + private final String viewName; + + public BigQueryTestView(SqlExecutor sqlExecutor, TestTable table) + { + super(sqlExecutor, table.getName(), null); + this.sqlExecutor = requireNonNull(sqlExecutor, "sqlExecutor is null"); + this.table = requireNonNull(table, "table is null"); + this.viewName = table.getName() + "_view"; + } + + @Override + public void createAndInsert(List rowsToInsert) {} + + public void createView() + { + sqlExecutor.execute(format("CREATE VIEW %s AS SELECT * FROM %s", viewName, table.getName())); + } + + @Override + public String getName() + { + return viewName; + } + + @Override + public void close() + { + sqlExecutor.execute("DROP TABLE " + table.getName()); + sqlExecutor.execute("DROP VIEW " + viewName); + } +} diff --git a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BigQueryViewCreateAndInsertDataSetup.java b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BigQueryViewCreateAndInsertDataSetup.java new file mode 100644 index 000000000000..b537aceb4250 --- /dev/null +++ b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BigQueryViewCreateAndInsertDataSetup.java @@ -0,0 +1,44 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.bigquery; + +import io.trino.testing.datatype.ColumnSetup; +import io.trino.testing.datatype.CreateAndInsertDataSetup; +import io.trino.testing.sql.SqlExecutor; +import io.trino.testing.sql.TestTable; + +import java.util.List; + +import static java.util.Objects.requireNonNull; + +public class BigQueryViewCreateAndInsertDataSetup + extends CreateAndInsertDataSetup +{ + private final SqlExecutor sqlExecutor; + + public BigQueryViewCreateAndInsertDataSetup(SqlExecutor sqlExecutor, String tableNamePrefix) + { + super(sqlExecutor, tableNamePrefix); + this.sqlExecutor = requireNonNull(sqlExecutor, "sqlExecutor is null"); + } + + @Override + public TestTable setupTestTable(List inputs) + { + TestTable table = super.setupTestTable(inputs); + BigQueryTestView view = new BigQueryTestView(sqlExecutor, table); + view.createView(); + return view; + } +} diff --git a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryConfig.java b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryConfig.java index 9e7d3116d035..432449d23d66 100644 --- a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryConfig.java +++ b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryConfig.java @@ -37,6 +37,7 @@ public void testDefaults() .setParentProjectId(null) .setParallelism(null) .setViewExpireDuration(new Duration(24, HOURS)) + .setSkipViewMaterialization(false) .setViewMaterializationProject(null) .setViewMaterializationDataset(null) .setMaxReadRowsRetries(3) @@ -55,6 +56,7 @@ public void testExplicitPropertyMappingsWithCredentialsKey() .put("bigquery.parallelism", "20") .put("bigquery.views-enabled", "true") .put("bigquery.view-expire-duration", "30m") + .put("bigquery.skip-view-materialization", "true") .put("bigquery.view-materialization-project", "vmproject") .put("bigquery.view-materialization-dataset", "vmdataset") .put("bigquery.max-read-rows-retries", "10") @@ -69,6 +71,7 @@ public void testExplicitPropertyMappingsWithCredentialsKey() .setParallelism(20) .setViewsEnabled(true) .setViewExpireDuration(new Duration(30, MINUTES)) + .setSkipViewMaterialization(true) .setViewMaterializationProject("vmproject") .setViewMaterializationDataset("vmdataset") .setMaxReadRowsRetries(10) @@ -88,5 +91,12 @@ public void testInvalidViewSetting() .validate()) .isInstanceOf(IllegalStateException.class) .hasMessage("View expiration duration must be longer than view cache TTL"); + + assertThatThrownBy(() -> new BigQueryConfig() + .setSkipViewMaterialization(true) + .setViewsEnabled(false) + .validate()) + .isInstanceOf(IllegalStateException.class) + .hasMessage("bigquery.views-enabled config property must be enabled when skipping view materialization"); } } diff --git a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryTypeMapping.java b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryTypeMapping.java index 6a31cf3c6343..4226e9bb54df 100644 --- a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryTypeMapping.java +++ b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryTypeMapping.java @@ -41,7 +41,9 @@ import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MICROS; import static io.trino.spi.type.VarbinaryType.VARBINARY; import static io.trino.spi.type.VarcharType.VARCHAR; +import static io.trino.testing.sql.TestTable.randomTableSuffix; import static java.lang.String.format; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** * @see BigQuery data types @@ -63,7 +65,7 @@ protected QueryRunner createQueryRunner() { return BigQueryQueryRunner.createQueryRunner( ImmutableMap.of(), - ImmutableMap.of()); + ImmutableMap.of("bigquery.skip-view-materialization", "true")); } @Test @@ -73,7 +75,8 @@ public void testBoolean() .addRoundTrip("boolean", "true", BOOLEAN, "true") .addRoundTrip("boolean", "false", BOOLEAN, "false") .addRoundTrip("boolean", "NULL", BOOLEAN, "CAST(NULL AS BOOLEAN)") - .execute(getQueryRunner(), bigqueryCreateAndInsert("test.boolean")); + .execute(getQueryRunner(), bigqueryCreateAndInsert("test.boolean")) + .execute(getQueryRunner(), bigqueryViewCreateAndInsert("test.boolean")); } @Test @@ -89,7 +92,8 @@ public void testBytes() .addRoundTrip("bytes", "from_hex('000000000000')", VARBINARY, "X'000000000000'") .addRoundTrip("bytes(10)", "from_hex('68656C6C6F')", VARBINARY, "to_utf8('hello')") .addRoundTrip("bytes(4001)", "from_hex('68656C6C6F')", VARBINARY, "to_utf8('hello')") - .execute(getQueryRunner(), bigqueryCreateAndInsert("test.bytes")); + .execute(getQueryRunner(), bigqueryCreateAndInsert("test.bytes")) + .execute(getQueryRunner(), bigqueryViewCreateAndInsert("test.bytes")); } @Test(dataProvider = "bigqueryIntegerTypeProvider") @@ -100,7 +104,8 @@ public void testInteger(String inputType) .addRoundTrip(inputType, "9223372036854775807", BIGINT, "9223372036854775807") .addRoundTrip(inputType, "0", BIGINT, "CAST(0 AS BIGINT)") .addRoundTrip(inputType, "NULL", BIGINT, "CAST(NULL AS BIGINT)") - .execute(getQueryRunner(), bigqueryCreateAndInsert("test.integer")); + .execute(getQueryRunner(), bigqueryCreateAndInsert("test.integer")) + .execute(getQueryRunner(), bigqueryViewCreateAndInsert("test.integer")); } @DataProvider @@ -127,7 +132,8 @@ public void testFloat() .addRoundTrip("float64", "CAST('NaN' AS float64)", DOUBLE, "nan()") .addRoundTrip("float64", "CAST('Infinity' AS float64)", DOUBLE, "+infinity()") .addRoundTrip("float64", "CAST('-Infinity' AS float64)", DOUBLE, "-infinity()") - .execute(getQueryRunner(), bigqueryCreateAndInsert("test.float")); + .execute(getQueryRunner(), bigqueryCreateAndInsert("test.float")) + .execute(getQueryRunner(), bigqueryViewCreateAndInsert("test.float")); } @Test @@ -161,6 +167,47 @@ public void testNumericMapping() .execute(getQueryRunner(), bigqueryCreateAndInsert("test.numeric")); } + @Test + public void testNumericMappingView() + { + // BigQuery views always return DECIMAL(38, 9) + SqlDataTypeTest.create() + .addRoundTrip("NUMERIC(3, 0)", "NUMERIC '193'", createDecimalType(38, 9), "CAST(193 AS DECIMAL(38, 9))") + .addRoundTrip("NUMERIC(3, 0)", "NUMERIC '19'", createDecimalType(38, 9), "CAST(19 AS DECIMAL(38, 9))") + .addRoundTrip("NUMERIC(3, 0)", "NUMERIC '-193'", createDecimalType(38, 9), "CAST(-193 AS DECIMAL(38, 9))") + .addRoundTrip("NUMERIC(3, 1)", "NUMERIC '10.0'", createDecimalType(38, 9), "CAST(10.0 AS DECIMAL(38, 9))") + .addRoundTrip("NUMERIC(3, 1)", "NUMERIC '10.1'", createDecimalType(38, 9), "CAST(10.1 AS DECIMAL(38, 9))") + .addRoundTrip("NUMERIC(3, 1)", "NUMERIC '-10.1'", createDecimalType(38, 9), "CAST(-10.1 AS DECIMAL(38, 9))") + .addRoundTrip("NUMERIC(4, 2)", "NUMERIC '2'", createDecimalType(38, 9), "CAST(2 AS DECIMAL(38, 9))") + .addRoundTrip("NUMERIC(4, 2)", "NUMERIC '2.3'", createDecimalType(38, 9), "CAST(2.3 AS DECIMAL(38, 9))") + .addRoundTrip("NUMERIC(24, 2)", "NUMERIC '2'", createDecimalType(38, 9), "CAST(2 AS DECIMAL(38, 9))") + .addRoundTrip("NUMERIC(24, 2)", "NUMERIC '2.3'", createDecimalType(38, 9), "CAST(2.3 AS DECIMAL(38, 9))") + .addRoundTrip("NUMERIC(24, 2)", "NUMERIC '123456789.3'", createDecimalType(38, 9), "CAST(123456789.3 AS DECIMAL(38, 9))") + .addRoundTrip("NUMERIC(24, 4)", "NUMERIC '12345678901234567890.31'", createDecimalType(38, 9), "CAST(12345678901234567890.31 AS DECIMAL(38, 9))") + .addRoundTrip("NUMERIC(29, 0)", "NUMERIC '27182818284590452353602874713'", createDecimalType(38, 9), "CAST('27182818284590452353602874713' AS DECIMAL(38, 9))") + .addRoundTrip("NUMERIC(29, 0)", "NUMERIC '-27182818284590452353602874713'", createDecimalType(38, 9), "CAST('-27182818284590452353602874713' AS DECIMAL(38, 9))") + .addRoundTrip("NUMERIC(30, 5)", "NUMERIC '3141592653589793238462643.38327'", createDecimalType(38, 9), "CAST(3141592653589793238462643.38327 AS DECIMAL(38, 9))") + .addRoundTrip("NUMERIC(30, 5)", "NUMERIC '-3141592653589793238462643.38327'", createDecimalType(38, 9), "CAST(-3141592653589793238462643.38327 AS DECIMAL(38, 9))") + .addRoundTrip("NUMERIC(38, 9)", "NUMERIC '100000000020000000001234567.123456789'", createDecimalType(38, 9), "CAST(100000000020000000001234567.123456789 AS DECIMAL(38, 9))") + .addRoundTrip("NUMERIC(38, 9)", "NUMERIC '-100000000020000000001234567.123456789'", createDecimalType(38, 9), "CAST(-100000000020000000001234567.123456789 AS DECIMAL(38, 9))") + .addRoundTrip("NUMERIC(10, 3)", "CAST(NULL AS NUMERIC)", createDecimalType(38, 9), "CAST(NULL AS DECIMAL(38, 9))") + .addRoundTrip("NUMERIC(38, 9)", "CAST(NULL AS NUMERIC)", createDecimalType(38, 9), "CAST(NULL AS DECIMAL(38, 9))") + .execute(getQueryRunner(), bigqueryViewCreateAndInsert("test.numeric")); + } + + @Test + public void testInvalidNumericScaleType() + { + String tableName = "test.invalid_numeric_scale_" + randomTableSuffix(); + try { + assertThatThrownBy(() -> getBigQuerySqlExecutor().execute(format("CREATE TABLE %s (invalid_type NUMERIC(38, 10))", tableName))) + .hasMessageContaining("In NUMERIC(P, S), S must be between 0 and 9"); + } + finally { + assertUpdate("DROP TABLE IF EXISTS " + tableName); + } + } + @Test public void testBigNumericMapping() { @@ -190,6 +237,16 @@ public void testBigNumericMapping() .addRoundTrip("BIGNUMERIC(38)", "BIGNUMERIC '10000000002000000000300000000012345678'", createDecimalType(38, 0), "CAST('10000000002000000000300000000012345678' AS DECIMAL(38, 0))") .addRoundTrip("BIGNUMERIC(38)", "BIGNUMERIC '-10000000002000000000300000000012345678'", createDecimalType(38, 0), "CAST('-10000000002000000000300000000012345678' AS DECIMAL(38, 0))") .execute(getQueryRunner(), bigqueryCreateAndInsert("test.bignumeric")); + // TODO (https://github.com/trinodb/trino/pull/12210) Add support for bigquery type in views + } + + @Test + public void testUnsupportedBigNumericMappingView() + { + assertThatThrownBy(() -> SqlDataTypeTest.create() + .addRoundTrip("BIGNUMERIC(3, 0)", "BIGNUMERIC '193'", createDecimalType(3, 0), "CAST(193 AS DECIMAL(3, 0))") + .execute(getQueryRunner(), bigqueryViewCreateAndInsert("test.bignumeric"))) + .hasMessageContaining("SELECT * not allowed from relation that has no columns"); } @Test(dataProvider = "bigqueryUnsupportedBigNumericTypeProvider") @@ -232,7 +289,8 @@ public void testDate() .addRoundTrip("date", "DATE '2017-07-01'", DATE, "DATE '2017-07-01'") .addRoundTrip("date", "DATE '2017-01-01'", DATE, "DATE '2017-01-01'") .addRoundTrip("date", "DATE '9999-12-31'", DATE, "DATE '9999-12-31'") // max value in BigQuery - .execute(getQueryRunner(), bigqueryCreateAndInsert("test.date")); + .execute(getQueryRunner(), bigqueryCreateAndInsert("test.date")) + .execute(getQueryRunner(), bigqueryViewCreateAndInsert("test.date")); } @Test @@ -283,7 +341,8 @@ public void testDatetime() // max value in BigQuery .addRoundTrip("datetime", "datetime '9999-12-31 23:59:59.999999'", createTimestampType(6), "TIMESTAMP '9999-12-31 23:59:59.999999'") - .execute(getQueryRunner(), bigqueryCreateAndInsert("test.datetime")); + .execute(getQueryRunner(), bigqueryCreateAndInsert("test.datetime")) + .execute(getQueryRunner(), bigqueryViewCreateAndInsert("test.datetime")); } @Test @@ -305,7 +364,8 @@ public void testTime() .addRoundTrip("time", "'23:59:59.99999'", createTimeType(6), "TIME '23:59:59.999990'") .addRoundTrip("time", "'23:59:59.999999'", createTimeType(6), "TIME '23:59:59.999999'") - .execute(getQueryRunner(), bigqueryCreateAndInsert("test.time")); + .execute(getQueryRunner(), bigqueryCreateAndInsert("test.time")) + .execute(getQueryRunner(), bigqueryViewCreateAndInsert("test.time")); } @Test @@ -345,6 +405,7 @@ public void testTimestampWithTimeZone() .addRoundTrip("TIMESTAMP", "TIMESTAMP '9999-12-31 23:59:59.999999-00:00'", TIMESTAMP_TZ_MICROS, "TIMESTAMP '9999-12-31 23:59:59.999999 UTC'") .execute(getQueryRunner(), bigqueryCreateAndInsert("test.timestamp_tz")); + // TODO (https://github.com/trinodb/trino/pull/12210) Add support for timestamp with time zone type in views } @Test @@ -358,7 +419,8 @@ public void testString() .addRoundTrip("STRING", "'Ну, погоди!'", VARCHAR, "VARCHAR 'Ну, погоди!'") .addRoundTrip("STRING(255)", "'text_b'", VARCHAR, "VARCHAR 'text_b'") .addRoundTrip("STRING(4001)", "'text_c'", VARCHAR, "VARCHAR 'text_c'") - .execute(getQueryRunner(), bigqueryCreateAndInsert("test.string")); + .execute(getQueryRunner(), bigqueryCreateAndInsert("test.string")) + .execute(getQueryRunner(), bigqueryViewCreateAndInsert("test.string")); } @Test @@ -368,7 +430,8 @@ public void testGeography() .addRoundTrip("GEOGRAPHY", "ST_GeogPoint(0, 0)", VARCHAR, "VARCHAR 'POINT(0 0)'") .addRoundTrip("GEOGRAPHY", "ST_GeogPoint(90, -90)", VARCHAR, "VARCHAR 'POINT(90 -90)'") .addRoundTrip("GEOGRAPHY", "NULL", VARCHAR, "CAST(NULL AS VARCHAR)") - .execute(getQueryRunner(), bigqueryCreateAndInsert("test.geography")); + .execute(getQueryRunner(), bigqueryCreateAndInsert("test.geography")) + .execute(getQueryRunner(), bigqueryViewCreateAndInsert("test.geography")); } @Test @@ -384,7 +447,8 @@ public void testArray() "ARRAY[CAST(ROW(1, 'string') AS ROW(x BIGINT, y VARCHAR))]") .addRoundTrip("ARRAY", "[]", new ArrayType(BOOLEAN), "CAST(ARRAY[] AS ARRAY)") .addRoundTrip("ARRAY", "NULL", new ArrayType(BOOLEAN), "CAST(ARRAY[] AS ARRAY)") - .execute(getQueryRunner(), bigqueryCreateAndInsert("test.array")); + .execute(getQueryRunner(), bigqueryCreateAndInsert("test.array")) + .execute(getQueryRunner(), bigqueryViewCreateAndInsert("test.array")); } @Test @@ -407,7 +471,8 @@ public void testStruct() "NULL", RowType.from(ImmutableList.of(new Field(Optional.of("x"), BIGINT))), "CAST(NULL AS ROW(x BIGINT))") - .execute(getQueryRunner(), bigqueryCreateAndInsert("test.struct")); + .execute(getQueryRunner(), bigqueryCreateAndInsert("test.struct")) + .execute(getQueryRunner(), bigqueryViewCreateAndInsert("test.struct")); } private DataSetup bigqueryCreateAndInsert(String tableNamePrefix) @@ -415,6 +480,11 @@ private DataSetup bigqueryCreateAndInsert(String tableNamePrefix) return new CreateAndInsertDataSetup(getBigQuerySqlExecutor(), tableNamePrefix); } + private DataSetup bigqueryViewCreateAndInsert(String tableNamePrefix) + { + return new BigQueryViewCreateAndInsertDataSetup(getBigQuerySqlExecutor(), tableNamePrefix); + } + private SqlExecutor getBigQuerySqlExecutor() { return sql -> bigQuerySqlExecutor.execute(sql);