Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/src/main/sphinx/connector/bigquery.rst
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ Property Description
``bigquery.view-expire-duration`` Expire duration for the materialized view. ``24h``
``bigquery.view-materialization-project`` The project where the materialized view is going to be created The view's project
``bigquery.view-materialization-dataset`` The dataset where the materialized view is going to be created The view's dataset
``bigquery.skip-view-materialization`` Use REST API to access views instead of Storage API. BigQuery
``BIGNUMERIC`` and ``TIMESTAMP`` types are unsupported. ``false``
``bigquery.views-cache-ttl`` Duration for which the materialization of a view will be ``15m``
cached and reused. Set to ``0ms`` to disable the cache.
``bigquery.max-read-rows-retries`` The number of retries in case of retryable server issues ``3``
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,35 @@ public TableResult query(String sql)
}
}

// Executes a SELECT over the given table through the BigQuery REST query API
// (QueryJobConfiguration) rather than the Storage Read API.
// requiredColumns become the projection list; filter, when present, becomes the WHERE clause.
public TableResult query(TableId table, List<String> requiredColumns, Optional<String> filter)
{
String sql = selectSql(table, requiredColumns, filter);
log.debug("Execute query: %s", sql);
try {
return bigQuery.query(QueryJobConfiguration.of(sql));
}
catch (InterruptedException e) {
// Restore the interrupt flag before translating to a BigQueryException
Thread.currentThread().interrupt();
throw new BigQueryException(BaseHttpServiceException.UNKNOWN_CODE, format("Failed to run the query [%s]", sql), e);
}
}

// Builds the SELECT statement projecting requiredColumns from the given table.
// Column names are backtick-quoted so reserved words survive.
// An empty column list falls back to "*" — consistent with the TableInfo-based
// selectSql overload — instead of generating invalid "SELECT  FROM ..." SQL.
private String selectSql(TableId table, List<String> requiredColumns, Optional<String> filter)
{
String columns = requiredColumns.isEmpty() ? "*" :
requiredColumns.stream().map(column -> format("`%s`", column)).collect(joining(","));
return selectSql(table, columns, filter);
}

// Assembles the final SELECT from pre-formatted columns and an optional WHERE filter.
private String selectSql(TableId table, String formattedColumns, Optional<String> filter)
{
String baseQuery = format("SELECT %s FROM `%s`", formattedColumns, fullTableName(table));
return filter
.map(whereClause -> baseQuery + " WHERE " + whereClause)
.orElse(baseQuery);
}

private String selectSql(TableInfo remoteTable, List<String> requiredColumns)
{
String columns = requiredColumns.isEmpty() ? "*" :
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class BigQueryConfig
private Optional<Integer> parallelism = Optional.empty();
// Whether reading BigQuery logical views is allowed at all
private boolean viewsEnabled;
private Duration viewExpireDuration = new Duration(24, HOURS);
// When true, views are read through the REST query API instead of being
// materialized into a temporary table for the Storage Read API
private boolean skipViewMaterialization;
private Optional<String> viewMaterializationProject = Optional.empty();
private Optional<String> viewMaterializationDataset = Optional.empty();
private int maxReadRowsRetries = DEFAULT_MAX_READ_ROWS_RETRIES;
Expand Down Expand Up @@ -114,6 +115,19 @@ public BigQueryConfig setViewExpireDuration(Duration viewExpireDuration)
return this;
}

// Whether view materialization is skipped (views read via the REST query API instead)
public boolean isSkipViewMaterialization()
{
return skipViewMaterialization;
}

@Config("bigquery.skip-view-materialization")
@ConfigDescription("Skip materializing views")
public BigQueryConfig setSkipViewMaterialization(boolean skipViewMaterialization)
{
// Only meaningful when views are enabled; the combination is enforced in validate()
this.skipViewMaterialization = skipViewMaterialization;
return this;
}

public Optional<String> getViewMaterializationProject()
{
return viewMaterializationProject;
Expand Down Expand Up @@ -202,5 +216,9 @@ public BigQueryConfig setServiceCacheTtl(Duration serviceCacheTtl)
// Cross-property validation, run after all setters have been applied.
public void validate()
{
checkState(viewExpireDuration.toMillis() > viewsCacheTtl.toMillis(), "View expiration duration must be longer than view cache TTL");
// Skipping materialization only makes sense when view support is enabled at all
checkState(!skipViewMaterialization || viewsEnabled, "%s config property must be enabled when skipping view materialization", VIEWS_ENABLED);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import javax.inject.Inject;

import java.util.List;
import java.util.Optional;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
Expand All @@ -36,12 +37,14 @@ public class BigQueryPageSourceProvider
{
private static final Logger log = Logger.get(BigQueryPageSourceProvider.class);

private final BigQueryClientFactory bigQueryClientFactory;
private final BigQueryReadClientFactory bigQueryReadClientFactory;
private final int maxReadRowsRetries;

@Inject
public BigQueryPageSourceProvider(BigQueryReadClientFactory bigQueryReadClientFactory, BigQueryConfig config)
public BigQueryPageSourceProvider(BigQueryClientFactory bigQueryClientFactory, BigQueryReadClientFactory bigQueryReadClientFactory, BigQueryConfig config)
{
this.bigQueryClientFactory = requireNonNull(bigQueryClientFactory, "bigQueryClientFactory is null");
this.bigQueryReadClientFactory = requireNonNull(bigQueryReadClientFactory, "bigQueryReadClientFactory is null");
this.maxReadRowsRetries = requireNonNull(config, "config is null").getMaxReadRowsRetries();
}
Expand Down Expand Up @@ -71,6 +74,36 @@ public ConnectorPageSource createPageSource(
.map(BigQueryColumnHandle.class::cast)
.collect(toImmutableList());

return new BigQueryResultPageSource(bigQueryReadClientFactory.create(session), maxReadRowsRetries, bigQuerySplit, bigQueryColumnHandles);
return createPageSource(session, (BigQueryTableHandle) table, bigQuerySplit, bigQueryColumnHandles);
}

// Dispatches to the page source matching the split's read mode:
// STORAGE uses the BigQuery Storage Read API, QUERY executes SQL through the
// REST API (the path taken when view materialization is skipped).
private ConnectorPageSource createPageSource(
ConnectorSession session,
BigQueryTableHandle table,
BigQuerySplit split,
List<BigQueryColumnHandle> columnHandles)
{
switch (split.getMode()) {
case STORAGE:
return createStoragePageSource(session, split, columnHandles);
case QUERY:
return createQueryPageSource(session, table, columnHandles, split.getFilter());
}
// Defensive: reached only if a new mode is added without updating this switch
throw new UnsupportedOperationException("Unsupported mode: " + split.getMode());
}

// Reads the split through the Storage Read API with the configured retry budget.
private ConnectorPageSource createStoragePageSource(ConnectorSession session, BigQuerySplit split, List<BigQueryColumnHandle> columnHandles)
{
return new BigQueryStoragePageSource(bigQueryReadClientFactory.create(session), maxReadRowsRetries, split, columnHandles);
}

// Reads the split by executing a SELECT through the REST API; the optional
// filter becomes the WHERE clause of the generated query.
private ConnectorPageSource createQueryPageSource(ConnectorSession session, BigQueryTableHandle table, List<BigQueryColumnHandle> columnHandles, Optional<String> filter)
{
return new BigQueryQueryPageSource(
bigQueryClientFactory.create(session),
table,
columnHandles.stream().map(BigQueryColumnHandle::getName).collect(toImmutableList()),
columnHandles.stream().map(BigQueryColumnHandle::getTrinoType).collect(toImmutableList()),
filter);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.bigquery;

import com.google.cloud.bigquery.FieldValue;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableResult;
import com.google.common.collect.ImmutableList;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import io.trino.spi.Page;
import io.trino.spi.PageBuilder;
import io.trino.spi.TrinoException;
import io.trino.spi.block.Block;
import io.trino.spi.block.BlockBuilder;
import io.trino.spi.connector.ConnectorPageSource;
import io.trino.spi.type.ArrayType;
import io.trino.spi.type.DecimalType;
import io.trino.spi.type.Decimals;
import io.trino.spi.type.Int128;
import io.trino.spi.type.RowType;
import io.trino.spi.type.Type;
import io.trino.spi.type.VarbinaryType;
import io.trino.spi.type.VarcharType;

import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.util.List;
import java.util.Optional;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Verify.verify;
import static io.airlift.slice.Slices.utf8Slice;
import static io.trino.plugin.bigquery.BigQueryType.toTrinoTimestamp;
import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.DateType.DATE;
import static io.trino.spi.type.Decimals.isLongDecimal;
import static io.trino.spi.type.IntegerType.INTEGER;
import static io.trino.spi.type.TimeType.TIME_MICROS;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS;
import static io.trino.spi.type.Timestamps.NANOSECONDS_PER_DAY;
import static io.trino.spi.type.Timestamps.PICOSECONDS_PER_DAY;
import static io.trino.spi.type.Timestamps.PICOSECONDS_PER_NANOSECOND;
import static io.trino.spi.type.Timestamps.round;
import static java.lang.String.format;
import static java.time.temporal.ChronoField.NANO_OF_SECOND;
import static java.util.Objects.requireNonNull;

public class BigQueryQueryPageSource
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very nice implementation having different page sources for different use-cases. This also means that at some point we can have some logic to decide when to use storage API vs query API.

implements ConnectorPageSource
{
// Parses BigQuery TIME string values such as "12:34:56.123456"
// (the fractional part is optional, up to 6 digits)
private static final DateTimeFormatter TIME_FORMATTER = new DateTimeFormatterBuilder()
.appendPattern("HH:mm:ss")
.optionalStart()
.appendFraction(NANO_OF_SECOND, 0, 6, true)
.optionalEnd()
.toFormatter();

private final List<Type> columnTypes;
private final PageBuilder pageBuilder;
// Full query result, fetched eagerly in the constructor
private final TableResult tableResult;

// Set once getNextPage has drained the result
private boolean finished;

// Eagerly executes the SELECT for this table; getNextPage later drains the result.
// columnNames and columnTypes are parallel lists describing the projection.
public BigQueryQueryPageSource(
BigQueryClient client,
BigQueryTableHandle table,
List<String> columnNames,
List<Type> columnTypes,
Optional<String> filter)
{
requireNonNull(client, "client is null");
requireNonNull(table, "table is null");
requireNonNull(columnNames, "columnNames is null");
requireNonNull(columnTypes, "columnTypes is null");
requireNonNull(filter, "filter is null");
checkArgument(columnNames.size() == columnTypes.size(), "columnNames and columnTypes sizes don't match");
this.columnTypes = ImmutableList.copyOf(columnTypes);
this.pageBuilder = new PageBuilder(columnTypes);
// Resolve the remote table within the client's project and run the query now
TableId tableId = TableId.of(client.getProjectId(), table.getRemoteTableName().getDatasetName(), table.getRemoteTableName().getTableName());
this.tableResult = client.query(tableId, ImmutableList.copyOf(columnNames), filter);
}

@Override
public long getCompletedBytes()
{
// Byte-level progress is not tracked for REST query results
return 0;
}

@Override
public long getReadTimeNanos()
{
// Read time is not tracked
return 0;
}

@Override
public boolean isFinished()
{
return finished;
}

@Override
public long getMemoryUsage()
{
// Memory usage is not reported
return 0;
}

@Override
public Page getNextPage()
{
verify(pageBuilder.isEmpty());
// Drain the entire buffered result into a single page
for (FieldValueList row : tableResult.iterateAll()) {
pageBuilder.declarePosition();
int channel = 0;
for (Type type : columnTypes) {
appendTo(type, row.get(channel), pageBuilder.getBlockBuilder(channel));
channel++;
}
}
finished = true;

Page page = pageBuilder.build();
pageBuilder.reset();
return page;
}

private void appendTo(Type type, FieldValue value, BlockBuilder output)
Comment thread
hashhar marked this conversation as resolved.
{
// TODO (https://github.com/trinodb/trino/issues/12346) Add support for bignumeric and timestamp with time zone types
if (value == null || value.isNull()) {
output.appendNull();
return;
}

Class<?> javaType = type.getJavaType();
try {
if (javaType == boolean.class) {
type.writeBoolean(output, value.getBooleanValue());
}
else if (javaType == long.class) {
if (type.equals(BIGINT)) {
type.writeLong(output, value.getLongValue());
}
else if (type.equals(INTEGER)) {
type.writeLong(output, value.getLongValue());
}
else if (type.equals(DATE)) {
type.writeLong(output, LocalDate.parse(value.getStringValue()).toEpochDay());
}
else if (type.equals(TIME_MICROS)) {
LocalTime time = LocalTime.parse(value.getStringValue(), TIME_FORMATTER);
long nanosOfDay = time.toNanoOfDay();
verify(nanosOfDay < NANOSECONDS_PER_DAY, "Invalid value of nanosOfDay: %s", nanosOfDay);
long picosOfDay = nanosOfDay * PICOSECONDS_PER_NANOSECOND;
long rounded = round(picosOfDay, 12 - TIME_MICROS.getPrecision());
if (rounded == PICOSECONDS_PER_DAY) {
rounded = 0;
}
type.writeLong(output, rounded);
}
else if (type.equals(TIMESTAMP_MICROS)) {
type.writeLong(output, toTrinoTimestamp((value.getStringValue())));
}
else {
throw new TrinoException(GENERIC_INTERNAL_ERROR, format("Unhandled type for %s: %s", javaType.getSimpleName(), type));
}
}
else if (javaType == double.class) {
type.writeDouble(output, value.getDoubleValue());
}
else if (type.getJavaType() == Int128.class) {
verify(isLongDecimal(type), "The type should be long decimal");
DecimalType decimalType = (DecimalType) type;
BigDecimal decimal = value.getNumericValue();
type.writeObject(output, Decimals.encodeScaledValue(decimal, decimalType.getScale()));
}
else if (javaType == Slice.class) {
writeSlice(output, type, value);
}
else if (javaType == Block.class) {
writeBlock(output, type, value);
}
else {
throw new TrinoException(GENERIC_INTERNAL_ERROR, format("Unhandled type for %s: %s", javaType.getSimpleName(), type));
}
}
catch (ClassCastException e) {
throw new TrinoException(GENERIC_INTERNAL_ERROR, format("Unhandled type for %s: %s", javaType.getSimpleName(), type), e);
}
}

// Writes a Slice-backed value: VARCHAR from the string form, VARBINARY from the raw bytes.
private static void writeSlice(BlockBuilder output, Type type, FieldValue value)
{
    if (type instanceof VarbinaryType) {
        type.writeSlice(output, Slices.wrappedBuffer(value.getBytesValue()));
        return;
    }
    if (type instanceof VarcharType) {
        type.writeSlice(output, utf8Slice(value.getStringValue()));
        return;
    }
    throw new TrinoException(GENERIC_INTERNAL_ERROR, "Unhandled type for Slice: " + type.getTypeSignature());
}

// Recursively writes ARRAY and ROW values into a nested block entry,
// delegating each element/field back to appendTo.
private void writeBlock(BlockBuilder output, Type type, FieldValue value)
{
    if (type instanceof ArrayType) {
        BlockBuilder entryBuilder = output.beginBlockEntry();
        Type elementType = type.getTypeParameters().get(0);
        for (FieldValue element : value.getRepeatedValue()) {
            appendTo(elementType, element, entryBuilder);
        }
        output.closeEntry();
    }
    else if (type instanceof RowType) {
        BlockBuilder entryBuilder = output.beginBlockEntry();
        FieldValueList fields = value.getRecordValue();
        List<Type> fieldTypes = type.getTypeParameters();
        for (int i = 0; i < fieldTypes.size(); i++) {
            appendTo(fieldTypes.get(i), fields.get(i), entryBuilder);
        }
        output.closeEntry();
    }
    else {
        throw new TrinoException(GENERIC_INTERNAL_ERROR, "Unhandled type for Block: " + type.getTypeSignature());
    }
}

@Override
// No-op: this page source holds no open streams or clients to release
public void close() {}
}
Loading