Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/src/main/sphinx/connector/bigquery.rst
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ Property Description
``BIGNUMERIC`` and ``TIMESTAMP`` types are unsupported. ``false``
``bigquery.views-cache-ttl`` Duration for which the materialization of a view will be ``15m``
cached and reused. Set to ``0ms`` to disable the cache.
``bigquery.metadata.cache-ttl`` Duration for which metadata retrieved from BigQuery ``0ms``
is cached and reused. Set to ``0ms`` to disable the cache.
``bigquery.max-read-rows-retries`` The number of retries in case of retryable server issues ``3``
``bigquery.credentials-key`` The base64 encoded credentials key None. See the `requirements <#requirements>`_ section.
``bigquery.credentials-file`` The path to the JSON credentials file None. See the `requirements <#requirements>`_ section.
Expand Down
2 changes: 2 additions & 0 deletions plugin/trino-bigquery/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@
<excludes>
<!-- If you are adding entry here also add an entry to cloud-tests or cloud-tests-case-insensitive-mapping profile below -->
<exclude>**/TestBigQueryConnectorTest.java</exclude>
<exclude>**/TestBigQueryMetadataCaching.java</exclude>
<exclude>**/TestBigQueryTypeMapping.java</exclude>
<exclude>**/TestBigQueryMetadata.java</exclude>
<exclude>**/TestBigQueryInstanceCleaner.java</exclude>
Expand Down Expand Up @@ -415,6 +416,7 @@
<configuration>
<includes>
<include>**/TestBigQueryConnectorTest.java</include>
<include>**/TestBigQueryMetadataCaching.java</include>
<include>**/TestBigQueryTypeMapping.java</include>
<include>**/TestBigQueryMetadata.java</include>
<include>**/TestBigQueryInstanceCleaner.java</include>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,12 @@
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.TableResult;
import com.google.cloud.http.BaseHttpServiceException;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableSet;
import io.airlift.log.Logger;
import io.airlift.units.Duration;
import io.trino.collect.cache.EvictableCacheBuilder;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.TableNotFoundException;

Expand All @@ -45,6 +48,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;

import static com.google.cloud.bigquery.JobStatistics.QueryStatistics.StatementType.SELECT;
Expand All @@ -58,9 +62,11 @@
import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_AMBIGUOUS_OBJECT_NAME;
import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_FAILED_TO_EXECUTE_QUERY;
import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_INVALID_STATEMENT;
import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_LISTING_DATASET_ERROR;
import static java.lang.String.format;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.stream.Collectors.joining;

public class BigQueryClient
Expand All @@ -70,12 +76,17 @@ public class BigQueryClient
private final BigQuery bigQuery;
private final ViewMaterializationCache materializationCache;
private final boolean caseInsensitiveNameMatching;
private final LoadingCache<String, List<Dataset>> remoteDatasetCache;

public BigQueryClient(BigQuery bigQuery, boolean caseInsensitiveNameMatching, ViewMaterializationCache materializationCache)
/**
 * Creates a client around the given BigQuery service handle and sets up the
 * dataset listing cache.
 *
 * @param bigQuery BigQuery service handle used for remote calls
 * @param caseInsensitiveNameMatching flag stored as-is on the client
 * @param materializationCache view materialization cache stored on the client
 * @param metadataCacheTtl expire-after-write interval applied to the dataset listing cache
 * @throws NullPointerException if {@code bigQuery}, {@code materializationCache} or
 *         {@code metadataCacheTtl} is null
 */
public BigQueryClient(BigQuery bigQuery, boolean caseInsensitiveNameMatching, ViewMaterializationCache materializationCache, Duration metadataCacheTtl)
{
    this.bigQuery = requireNonNull(bigQuery, "bigQuery is null");
    this.materializationCache = requireNonNull(materializationCache, "materializationCache is null");
    this.caseInsensitiveNameMatching = caseInsensitiveNameMatching;
    // Validate explicitly so a missing TTL fails with a descriptive message, like the other arguments.
    requireNonNull(metadataCacheTtl, "metadataCacheTtl is null");
    // Entries are loaded on demand through listDatasetsFromBigQuery.
    this.remoteDatasetCache = EvictableCacheBuilder.newBuilder()
            .expireAfterWrite(metadataCacheTtl.toMillis(), MILLISECONDS)
            .shareNothingWhenDisabled()
            .build(CacheLoader.from(this::listDatasetsFromBigQuery));
}

public Optional<RemoteDatabaseObject> toRemoteDataset(String projectId, String datasetName)
Expand Down Expand Up @@ -175,7 +186,18 @@ public String getProjectId()

/**
 * Returns the datasets of the given project.
 *
 * NOTE(review): this listing appears to be a rendered diff without +/- markers. The direct
 * {@code bigQuery.listDatasets(projectId).iterateAll()} return looks like the removed
 * pre-change line, and the try/catch that reads from {@code remoteDatasetCache} looks like
 * its replacement — confirm against the merged file.
 *
 * @param projectId project whose datasets are listed; also the cache key
 * @return the datasets for {@code projectId}
 * @throws TrinoException with {@code BIGQUERY_LISTING_DATASET_ERROR} when the cache lookup
 *         fails with an {@code ExecutionException}
 */
public Iterable<Dataset> listDatasets(String projectId)
{
return bigQuery.listDatasets(projectId).iterateAll();
try {
return remoteDatasetCache.get(projectId);
}
catch (ExecutionException e) {
// NOTE(review): Guava's LoadingCache.get reports unchecked loader failures as
// UncheckedExecutionException, which this catch would not intercept. Confirm whether
// caches built by EvictableCacheBuilder behave the same way and whether BigQuery
// failures should also be translated here.
throw new TrinoException(BIGQUERY_LISTING_DATASET_ERROR, "Failed to retrieve datasets from BigQuery", e);
}
}

/**
 * Fetches every dataset of the given project from BigQuery and copies the
 * result into an immutable list.
 *
 * @param projectId project whose datasets are requested
 * @return immutable list of the datasets returned by BigQuery
 */
private List<Dataset> listDatasetsFromBigQuery(String projectId)
{
    Iterable<Dataset> remoteDatasets = bigQuery.listDatasets(projectId).iterateAll();
    return stream(remoteDatasets).collect(toImmutableList());
}

public Iterable<Table> listTables(DatasetId remoteDatasetId, TableDefinition.Type... types)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.common.cache.CacheBuilder;
import io.airlift.units.Duration;
import io.trino.collect.cache.NonEvictableCache;
import io.trino.spi.connector.ConnectorSession;

Expand All @@ -40,6 +41,7 @@ public class BigQueryClientFactory
private final ViewMaterializationCache materializationCache;
private final HeaderProvider headerProvider;
private final NonEvictableCache<IdentityCacheMapping.IdentityCacheKey, BigQueryClient> clientCache;
private final Duration metadataCacheTtl;

@Inject
public BigQueryClientFactory(
Expand All @@ -56,6 +58,7 @@ public BigQueryClientFactory(
this.caseInsensitiveNameMatching = bigQueryConfig.isCaseInsensitiveNameMatching();
this.materializationCache = requireNonNull(materializationCache, "materializationCache is null");
this.headerProvider = requireNonNull(headerProvider, "headerProvider is null");
this.metadataCacheTtl = bigQueryConfig.getMetadataCacheTtl();

CacheBuilder<Object, Object> cacheBuilder = CacheBuilder.newBuilder()
.expireAfterWrite(bigQueryConfig.getServiceCacheTtl().toMillis(), MILLISECONDS);
Expand All @@ -72,7 +75,7 @@ public BigQueryClient create(ConnectorSession session)

/**
 * Builds a {@code BigQueryClient} for the session from {@code createBigQuery(session)}
 * and the settings stored on this factory.
 *
 * NOTE(review): this listing appears to be a rendered diff without +/- markers. The
 * three-argument constructor call looks like the removed line, and the four-argument call,
 * which also passes {@code metadataCacheTtl}, looks like its replacement — confirm against
 * the merged file.
 *
 * @param session connector session passed to {@code createBigQuery}
 * @return a newly constructed client
 */
protected BigQueryClient createBigQueryClient(ConnectorSession session)
{
return new BigQueryClient(createBigQuery(session), caseInsensitiveNameMatching, materializationCache);
return new BigQueryClient(createBigQuery(session), caseInsensitiveNameMatching, materializationCache, metadataCacheTtl);
}

protected BigQuery createBigQuery(ConnectorSession session)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import static com.google.common.base.Preconditions.checkState;
import static java.util.concurrent.TimeUnit.HOURS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;

@DefunctConfig("bigquery.case-insensitive-name-matching.cache-ttl")
Expand All @@ -51,6 +52,7 @@ public class BigQueryConfig
private boolean caseInsensitiveNameMatching;
private Duration viewsCacheTtl = new Duration(15, MINUTES);
private Duration serviceCacheTtl = new Duration(3, MINUTES);
private Duration metadataCacheTtl = new Duration(0, MILLISECONDS);
private boolean queryResultsCacheEnabled;

private int rpcInitialChannelCount = 1;
Expand Down Expand Up @@ -222,6 +224,21 @@ public BigQueryConfig setServiceCacheTtl(Duration serviceCacheTtl)
return this;
}

/**
 * Returns the metadata cache TTL currently held by this config.
 * The annotations require a non-null value of at least {@code 0ms}.
 */
@NotNull
@MinDuration("0ms")
public Duration getMetadataCacheTtl()
{
    return this.metadataCacheTtl;
}

/**
 * Sets the metadata cache TTL; bound to the {@code bigquery.metadata.cache-ttl} property.
 *
 * @param metadataCacheTtl new TTL value
 * @return this config instance, so calls can be chained
 */
@Config("bigquery.metadata.cache-ttl")
@ConfigDescription("Duration for which BigQuery client metadata is cached after listing")
public BigQueryConfig setMetadataCacheTtl(Duration metadataCacheTtl)
{
    this.metadataCacheTtl = metadataCacheTtl;
    return this;
}

public boolean isQueryResultsCacheEnabled()
{
return queryResultsCacheEnabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults;
import static java.util.concurrent.TimeUnit.DAYS;
import static java.util.concurrent.TimeUnit.HOURS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

Expand All @@ -44,6 +45,7 @@ public void testDefaults()
.setCaseInsensitiveNameMatching(false)
.setViewsCacheTtl(new Duration(15, MINUTES))
.setServiceCacheTtl(new Duration(3, MINUTES))
.setMetadataCacheTtl(new Duration(0, MILLISECONDS))
.setViewsEnabled(false)
.setQueryResultsCacheEnabled(false)
.setRpcInitialChannelCount(1)
Expand All @@ -69,6 +71,7 @@ public void testExplicitPropertyMappingsWithCredentialsKey()
.put("bigquery.case-insensitive-name-matching", "true")
.put("bigquery.views-cache-ttl", "1m")
.put("bigquery.service-cache-ttl", "10d")
.put("bigquery.metadata.cache-ttl", "5d")
.put("bigquery.query-results-cache.enabled", "true")
.put("bigquery.channel-pool.initial-size", "11")
.put("bigquery.channel-pool.min-size", "12")
Expand All @@ -90,6 +93,7 @@ public void testExplicitPropertyMappingsWithCredentialsKey()
.setCaseInsensitiveNameMatching(true)
.setViewsCacheTtl(new Duration(1, MINUTES))
.setServiceCacheTtl(new Duration(10, DAYS))
.setMetadataCacheTtl(new Duration(5, DAYS))
.setQueryResultsCacheEnabled(true)
.setRpcInitialChannelCount(11)
.setRpcMinChannelCount(12)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.bigquery;

import com.google.common.collect.ImmutableMap;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.QueryRunner;
import org.testng.annotations.Test;

import static io.trino.plugin.bigquery.BigQueryQueryRunner.BigQuerySqlExecutor;
import static io.trino.testing.sql.TestTable.randomTableSuffix;
import static org.testng.Assert.assertEquals;

/**
 * Runs against a query runner configured with {@code bigquery.metadata.cache-ttl=5m} and
 * checks what Trino reports after a schema is dropped directly in BigQuery.
 */
public class TestBigQueryMetadataCaching
        extends AbstractTestQueryFramework
{
    protected BigQuerySqlExecutor bigQuerySqlExecutor;

    @Override
    protected QueryRunner createQueryRunner()
            throws Exception
    {
        // The executor talks to BigQuery directly, without going through Trino.
        bigQuerySqlExecutor = new BigQuerySqlExecutor();

        ImmutableMap<String, String> extraProperties = ImmutableMap.of();
        ImmutableMap<String, String> connectorProperties = ImmutableMap.of("bigquery.metadata.cache-ttl", "5m");
        return BigQueryQueryRunner.createQueryRunner(extraProperties, connectorProperties);
    }

    @Test
    public void testMetadataCaching()
    {
        String schema = "test_metadata_caching_" + randomTableSuffix();
        String schemaTableName = schema + ".test_metadata_caching";
        try {
            getQueryRunner().execute("CREATE SCHEMA " + schema);
            assertEquals(showSchemaLike(schema), schema);

            getQueryRunner().execute("CREATE TABLE " + schemaTableName + " AS SELECT * FROM tpch.tiny.region");
            assertEquals(getQueryRunner().execute("SELECT * FROM " + schemaTableName).getRowCount(), 5);

            // Drop the schema through the direct executor; the listing is expected to still show it.
            bigQuerySqlExecutor.execute("DROP SCHEMA " + schema + " CASCADE");
            assertEquals(showSchemaLike(schema), schema);

            // Reading the table must fail even though the schema is still listed.
            assertQueryFails("SELECT * FROM " + schemaTableName, ".*Schema '.+' does not exist.*");
        }
        finally {
            bigQuerySqlExecutor.execute("DROP SCHEMA IF EXISTS " + schema + " CASCADE");
        }
    }

    // Returns the single value produced by SHOW SCHEMAS for the given LIKE pattern.
    private Object showSchemaLike(String schema)
    {
        return getQueryRunner().execute("SHOW SCHEMAS IN bigquery LIKE '" + schema + "'").getOnlyValue();
    }
}