Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.UncheckedExecutionException;
import io.airlift.log.Logger;
import io.airlift.units.Duration;
import io.trino.cache.EvictableCacheBuilder;
Expand All @@ -49,7 +50,6 @@
import io.trino.spi.connector.TableNotFoundException;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -107,8 +107,8 @@ public class BigQueryClient
private final ViewMaterializationCache materializationCache;
private final boolean caseInsensitiveNameMatching;
private final LoadingCache<String, List<DatasetId>> remoteDatasetIdCache;
private final Cache<String, Map<String, RemoteDatabaseObject>> remoteDatasetCaseInsentiveCache;
private final Cache<DatasetId, Map<TableId, RemoteDatabaseObject>> remoteTableCaseInsentiveCache;
private final Cache<DatasetId, RemoteDatabaseObject> remoteDatasetCaseInsensitiveCache;
private final Cache<TableId, RemoteDatabaseObject> remoteTableCaseInsensitiveCache;
private final Optional<String> configProjectId;

public BigQueryClient(
Expand All @@ -130,8 +130,8 @@ public BigQueryClient(
.expireAfterWrite(metadataCacheTtl.toMillis(), MILLISECONDS)
.shareNothingWhenDisabled()
.build(CacheLoader.from(this::listDatasetIdsFromBigQuery));
this.remoteDatasetCaseInsentiveCache = buildCache(caseInsensitiveNameMatchingCacheTtl);
this.remoteTableCaseInsentiveCache = buildCache(caseInsensitiveNameMatchingCacheTtl);
this.remoteDatasetCaseInsensitiveCache = buildCache(caseInsensitiveNameMatchingCacheTtl);
this.remoteTableCaseInsensitiveCache = buildCache(caseInsensitiveNameMatchingCacheTtl);
this.configProjectId = requireNonNull(configProjectId, "projectId is null");
}

Expand Down Expand Up @@ -162,23 +162,20 @@ public Optional<RemoteDatabaseObject> toRemoteDataset(String projectId, String d
return Optional.of(RemoteDatabaseObject.of(datasetName));
}

try {
Map<String, RemoteDatabaseObject> datasetMap = remoteDatasetCaseInsentiveCache.get(projectId, () -> {
Map<String, RemoteDatabaseObject> mapping = new HashMap<>();
for (DatasetId dataset : datasetIds.get()) {
mapping.merge(
dataset.getDataset().toLowerCase(ENGLISH),
RemoteDatabaseObject.of(dataset.getDataset()),
(currentValue, collision) -> currentValue.registerCollision(collision.getOnlyRemoteName()));
}
return mapping;
});

return Optional.ofNullable(datasetMap.get(datasetName));
DatasetId cacheKey = DatasetId.of(projectId, datasetName);

Optional<RemoteDatabaseObject> remoteDataSetFromCache = Optional.ofNullable(remoteDatasetCaseInsensitiveCache.getIfPresent(cacheKey));
if (remoteDataSetFromCache.isPresent()) {
return remoteDataSetFromCache;
}
catch (ExecutionException e) {
return Optional.empty();

// Get all information from BigQuery and update cache from all fetched information
for (DatasetId datasetId : datasetIds.get()) {
DatasetId newCacheKey = datasetIdToLowerCase(datasetId);
RemoteDatabaseObject newValue = RemoteDatabaseObject.of(datasetId.getDataset());
updateCache(remoteDatasetCaseInsensitiveCache, newCacheKey, newValue);
}
return Optional.ofNullable(remoteDatasetCaseInsensitiveCache.getIfPresent(cacheKey));
}

public Optional<RemoteDatabaseObject> toRemoteTable(ConnectorSession session, String projectId, String remoteDatasetName, String tableName)
Expand All @@ -202,26 +199,49 @@ private Optional<RemoteDatabaseObject> toRemoteTable(String projectId, String re
}

TableId cacheKey = TableId.of(projectId, remoteDatasetName, tableName);
DatasetId datasetId = DatasetId.of(projectId, remoteDatasetName);

Optional<RemoteDatabaseObject> remoteTableFromCache = Optional.ofNullable(remoteTableCaseInsensitiveCache.getIfPresent(cacheKey));
if (remoteTableFromCache.isPresent()) {
return remoteTableFromCache;
}

// Get all information from BigQuery and update cache from all fetched information
for (TableId table : tableIds.get()) {
TableId newCacheKey = tableIdToLowerCase(table);
RemoteDatabaseObject newValue = RemoteDatabaseObject.of(table.getTable());
updateCache(remoteTableCaseInsensitiveCache, newCacheKey, newValue);
}

return Optional.ofNullable(remoteTableCaseInsensitiveCache.getIfPresent(cacheKey));
}

/**
 * Records {@code newValue} under {@code newCacheKey} in the given case-insensitive name cache.
 * <p>
 * If the key has no entry yet, {@code newValue} is stored as-is. If an entry exists but does not yet
 * list the remote name carried by {@code newValue}, that name is registered on the existing entry as a
 * collision and the merged object replaces it. If the remote name is already listed, the cache is left
 * untouched.
 *
 * @param caseInsensitiveCache cache to update
 * @param newCacheKey key to store the value under
 * @param newValue object holding the single remote name to add
 * @throws UncheckedExecutionException if the cache reports an {@link ExecutionException} while storing the value
 */
private static <T> void updateCache(Cache<T, RemoteDatabaseObject> caseInsensitiveCache, T newCacheKey, RemoteDatabaseObject newValue)
{
    try {
        RemoteDatabaseObject currentCacheValue = caseInsensitiveCache.getIfPresent(newCacheKey);
        if (currentCacheValue == null) {
            // First remote name seen for this key: store the precomputed value through the loader callback
            caseInsensitiveCache.get(newCacheKey, () -> newValue);
        }
        else if (!currentCacheValue.remoteNames.contains(newValue.getOnlyRemoteName())) {
            // Cache already has key, check if new value is already registered and update with collision if it's not
            RemoteDatabaseObject mergedValue = currentCacheValue.registerCollision(newValue.getOnlyRemoteName());
            // Replacement is done as two separate cache calls: drop the old entry, then load the merged one
            caseInsensitiveCache.invalidate(newCacheKey);
            caseInsensitiveCache.get(newCacheKey, () -> mergedValue);
        }
    }
    catch (ExecutionException e) {
        // Loading cache value should never throw as it's only storing precomputed value
        throw new UncheckedExecutionException(e);
    }
}

/**
 * Returns a copy of {@code datasetId} with the same project and the dataset name
 * lower-cased using the {@code ENGLISH} locale.
 */
private static DatasetId datasetIdToLowerCase(DatasetId datasetId)
{
    String project = datasetId.getProject();
    String lowerCasedDataset = datasetId.getDataset().toLowerCase(ENGLISH);
    return DatasetId.of(project, lowerCasedDataset);
}

private static TableId tableIdToLowerCase(TableId tableId)
{
return TableId.of(
Expand Down Expand Up @@ -350,7 +370,7 @@ public void createSchema(DatasetInfo datasetInfo)
{
bigQuery.create(datasetInfo);
remoteDatasetIdCache.invalidate(datasetInfo.getDatasetId().getProject());
remoteDatasetCaseInsentiveCache.invalidate(datasetInfo.getDatasetId().getProject());
remoteDatasetCaseInsensitiveCache.invalidate(datasetIdToLowerCase(datasetInfo.getDatasetId()));
}

public void dropSchema(DatasetId datasetId, boolean cascade)
Expand All @@ -362,19 +382,19 @@ public void dropSchema(DatasetId datasetId, boolean cascade)
bigQuery.delete(datasetId);
}
remoteDatasetIdCache.invalidate(datasetId.getProject());
remoteDatasetCaseInsentiveCache.invalidate(datasetId.getProject());
remoteDatasetCaseInsensitiveCache.invalidate(datasetIdToLowerCase(datasetId));
}

/**
 * Creates the table described by {@code tableInfo} in BigQuery and then invalidates the entry for
 * its lower-cased table id in the case-insensitive table name cache.
 *
 * @param tableInfo definition of the table to create; its table id determines the invalidated cache key
 */
public void createTable(TableInfo tableInfo)
{
    bigQuery.create(tableInfo);
    // Drop any cached name mapping stored under this table's lower-cased id
    remoteTableCaseInsensitiveCache.invalidate(tableIdToLowerCase(tableInfo.getTableId()));
}

/**
 * Deletes the table identified by {@code tableId} from BigQuery and then invalidates the entry for
 * its lower-cased table id in the case-insensitive table name cache.
 *
 * @param tableId id of the table to delete
 */
public void dropTable(TableId tableId)
{
    bigQuery.delete(tableId);
    // Drop any cached name mapping stored under this table's lower-cased id
    remoteTableCaseInsensitiveCache.invalidate(tableIdToLowerCase(tableId));
}

Job create(JobInfo jobInfo)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ protected QueryRunner createQueryRunner()
.setConnectorProperties(ImmutableMap.<String, String>builder()
.put("bigquery.case-insensitive-name-matching", "true")
.put("bigquery.case-insensitive-name-matching.cache-ttl", "1m")
.put("bigquery.service-cache-ttl", "0ms") // Disable service cache to focus on metadata cache
.buildOrThrow())
.build();
}
Expand Down