Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@
import java.util.function.Supplier;

import static com.google.cloud.bigquery.JobStatistics.QueryStatistics.StatementType.SELECT;
import static com.google.cloud.bigquery.TableDefinition.Type.EXTERNAL;
import static com.google.cloud.bigquery.TableDefinition.Type.MATERIALIZED_VIEW;
import static com.google.cloud.bigquery.TableDefinition.Type.SNAPSHOT;
import static com.google.cloud.bigquery.TableDefinition.Type.TABLE;
import static com.google.cloud.bigquery.TableDefinition.Type.VIEW;
import static com.google.common.base.Preconditions.checkArgument;
Expand All @@ -75,6 +78,8 @@ public class BigQueryClient
{
private static final Logger log = Logger.get(BigQueryClient.class);

static final Set<TableDefinition.Type> TABLE_TYPES = ImmutableSet.of(TABLE, VIEW, MATERIALIZED_VIEW, EXTERNAL, SNAPSHOT);

private final BigQuery bigQuery;
private final ViewMaterializationCache materializationCache;
private final boolean caseInsensitiveNameMatching;
Expand Down Expand Up @@ -126,7 +131,7 @@ public Optional<RemoteDatabaseObject> toRemoteDataset(String projectId, String d

/**
 * Resolves the remote (case-exact) table object for {@code tableName} in the given dataset,
 * lazily listing the dataset's tables when needed.
 *
 * @param projectId GCP project owning the dataset
 * @param remoteDatasetName remote (case-exact) dataset name
 * @param tableName table name to resolve (lowercased by callers for case-insensitive matching)
 * @return the remote table object, or empty if no matching table exists
 */
public Optional<RemoteDatabaseObject> toRemoteTable(String projectId, String remoteDatasetName, String tableName)
{
    // listTables() now filters to all supported table types internally; the duplicated
    // pre-change call passing (TABLE, VIEW) explicitly was unmerged diff residue and is dropped.
    return toRemoteTable(projectId, remoteDatasetName, tableName, () -> listTables(DatasetId.of(projectId, remoteDatasetName)));
}

public Optional<RemoteDatabaseObject> toRemoteTable(String projectId, String remoteDatasetName, String tableName, Iterable<Table> tables)
Expand Down Expand Up @@ -223,12 +228,11 @@ private List<Dataset> listDatasetsFromBigQuery(String projectId)
.collect(toImmutableList());
}

public Iterable<Table> listTables(DatasetId remoteDatasetId, TableDefinition.Type... types)
public Iterable<Table> listTables(DatasetId remoteDatasetId)
{
Set<TableDefinition.Type> allowedTypes = ImmutableSet.copyOf(types);
Iterable<Table> allTables = bigQuery.listTables(remoteDatasetId).iterateAll();
return stream(allTables)
.filter(table -> allowedTypes.contains(table.getDefinition().getType()))
.filter(table -> TABLE_TYPES.contains(table.getDefinition().getType()))
.collect(toImmutableList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,6 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static com.google.cloud.bigquery.TableDefinition.Type.EXTERNAL;
import static com.google.cloud.bigquery.TableDefinition.Type.MATERIALIZED_VIEW;
import static com.google.cloud.bigquery.TableDefinition.Type.TABLE;
import static com.google.cloud.bigquery.TableDefinition.Type.VIEW;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;
Expand Down Expand Up @@ -193,7 +189,7 @@ public List<SchemaTableName> listTables(ConnectorSession session, Optional<Strin
ImmutableList.Builder<SchemaTableName> tableNames = ImmutableList.builder();
for (String remoteSchemaName : remoteSchemaNames) {
try {
Iterable<Table> tables = client.listTables(DatasetId.of(projectId, remoteSchemaName), TABLE, VIEW, MATERIALIZED_VIEW, EXTERNAL);
Iterable<Table> tables = client.listTables(DatasetId.of(projectId, remoteSchemaName));
for (Table table : tables) {
// filter ambiguous tables
client.toRemoteTable(projectId, remoteSchemaName, table.getTableId().getTable().toLowerCase(ENGLISH), tables)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@

import static com.google.cloud.bigquery.TableDefinition.Type.EXTERNAL;
import static com.google.cloud.bigquery.TableDefinition.Type.MATERIALIZED_VIEW;
import static com.google.cloud.bigquery.TableDefinition.Type.TABLE;
import static com.google.cloud.bigquery.TableDefinition.Type.VIEW;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.plugin.bigquery.BigQueryClient.TABLE_TYPES;
import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_FAILED_TO_EXECUTE_QUERY;
import static io.trino.plugin.bigquery.BigQuerySessionProperties.createDisposition;
import static io.trino.plugin.bigquery.BigQuerySessionProperties.isQueryResultsCacheEnabled;
Expand Down Expand Up @@ -168,7 +168,7 @@ private List<BigQuerySplit> createEmptyProjection(ConnectorSession session, Tabl
.orElseThrow(() -> new TableNotFoundException(new SchemaTableName(remoteTableId.getDataset(), remoteTableId.getTable())));
// Note that we cannot use row count from TableInfo because for writes via insertAll/streaming API the number is incorrect until the streaming buffer is flushed
// (and there's no mechanism to trigger an on-demand flush). This can lead to incorrect results for queries with empty projections.
if (tableInfo.getDefinition().getType() == TABLE || tableInfo.getDefinition().getType() == VIEW) {
if (TABLE_TYPES.contains(tableInfo.getDefinition().getType())) {
String sql = client.selectSql(remoteTableId, "COUNT(*)");
TableResult result = client.query(sql, isQueryResultsCacheEnabled(session), createDisposition(session));
numberOfRows = result.iterateAll().iterator().next().get(0).getLongValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ private TableInfo getActualTable(
// get it from the view
return client.getCachedTable(viewExpiration, remoteTable, requiredColumns);
}
// not regular table or a view
// Storage API doesn't support reading other table types (materialized views, external)
throw new TrinoException(NOT_SUPPORTED, format("Table type '%s' of table '%s.%s' is not supported",
tableType, remoteTable.getTableId().getDataset(), remoteTable.getTableId().getTable()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,31 +13,43 @@
*/
package io.trino.plugin.bigquery;

import com.google.cloud.bigquery.TableDefinition;
import io.airlift.units.Duration;
import io.trino.Session;
import io.trino.testing.BaseConnectorTest;
import io.trino.testing.MaterializedResult;
import io.trino.testing.TestingConnectorBehavior;
import io.trino.testing.sql.TestTable;
import io.trino.testing.sql.TestView;
import org.intellij.lang.annotations.Language;
import org.testng.SkipException;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Parameters;
import org.testng.annotations.Test;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;

import static com.google.cloud.bigquery.TableDefinition.Type.EXTERNAL;
import static com.google.cloud.bigquery.TableDefinition.Type.MATERIALIZED_VIEW;
import static com.google.cloud.bigquery.TableDefinition.Type.SNAPSHOT;
import static com.google.cloud.bigquery.TableDefinition.Type.TABLE;
import static com.google.cloud.bigquery.TableDefinition.Type.VIEW;
import static com.google.common.base.Strings.nullToEmpty;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.trino.plugin.bigquery.BigQueryClient.TABLE_TYPES;
import static io.trino.plugin.bigquery.BigQueryQueryRunner.BigQuerySqlExecutor;
import static io.trino.plugin.bigquery.BigQueryQueryRunner.TEST_SCHEMA;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static io.trino.testing.MaterializedResult.resultBuilder;
import static io.trino.testing.TestingNames.randomNameSuffix;
import static io.trino.testing.assertions.Assert.assertEventually;
import static java.lang.String.format;
import static java.util.Locale.ENGLISH;
import static java.util.concurrent.TimeUnit.MINUTES;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
Expand All @@ -48,11 +60,15 @@ public abstract class BaseBigQueryConnectorTest
extends BaseConnectorTest
{
protected BigQuerySqlExecutor bigQuerySqlExecutor;
private String gcpStorageBucket;

@BeforeClass(alwaysRun = true)
public void initBigQueryExecutor()
@Parameters("testing.gcp-storage-bucket")
public void initBigQueryExecutor(String gcpStorageBucket)
{
this.bigQuerySqlExecutor = new BigQuerySqlExecutor();
// Prerequisite: upload region.csv in resources directory to gs://{testing.gcp-storage-bucket}/tpch/tiny/region.csv
this.gcpStorageBucket = gcpStorageBucket;
}

@SuppressWarnings("DuplicateBranchesInSwitch")
Expand Down Expand Up @@ -191,18 +207,40 @@ public void testCreateTableIfNotExists()
}
}

@Test
public void testEmptyProjection()
@Test(dataProvider = "emptyProjectionSetupDataProvider")
public void testEmptyProjection(TableDefinition.Type tableType, String createSql, String dropSql)
{
// Regression test for https://github.com/trinodb/trino/issues/14981
try (TestTable table = new TestTable(
getQueryRunner()::execute,
"test_emtpy_projection",
" AS SELECT * FROM region")) {
assertQuery("SELECT count(*) FROM " + table.getName(), "VALUES 5");
// Regression test for https://github.com/trinodb/trino/issues/14981, https://github.com/trinodb/trino/issues/5635 and https://github.com/trinodb/trino/issues/6696
String name = TEST_SCHEMA + ".test_empty_projection_" + tableType.name().toLowerCase(ENGLISH) + randomNameSuffix();
onBigQuery(createSql.formatted(name));
try {
assertQuery("SELECT count(*) FROM " + name, "VALUES 5");
assertQuery("SELECT count(*) FROM " + name, "VALUES 5"); // repeated query to cover https://github.com/trinodb/trino/issues/6696
assertQuery("SELECT count(*) FROM " + name + " WHERE regionkey = 1", "VALUES 1");
assertQuery("SELECT count(name) FROM " + name + " WHERE regionkey = 1", "VALUES 1");
}
finally {
onBigQuery(dropSql.formatted(name));
}
}

@DataProvider
public Object[][] emptyProjectionSetupDataProvider()
{
// One row per supported BigQuery table type: {type, CREATE statement template, DROP statement template}.
// The %s placeholders are filled with the generated table name by the test method.
Object[][] testCases = new Object[][] {
{TABLE, "CREATE TABLE %s AS SELECT * FROM tpch.region", "DROP TABLE %s"},
{VIEW, "CREATE VIEW %s AS SELECT * FROM tpch.region", "DROP VIEW %s"},
{MATERIALIZED_VIEW, "CREATE MATERIALIZED VIEW %s AS SELECT * FROM tpch.region", "DROP MATERIALIZED VIEW %s"},
{EXTERNAL, "CREATE EXTERNAL TABLE %s OPTIONS (format = 'CSV', uris = ['gs://" + gcpStorageBucket + "/tpch/tiny/region.csv'])", "DROP EXTERNAL TABLE %s"},
{SNAPSHOT, "CREATE SNAPSHOT TABLE %s CLONE tpch.region", "DROP SNAPSHOT TABLE %s"},
};
// Guard: fail fast if a type is added to TABLE_TYPES without a corresponding test case here.
Set<TableDefinition.Type> testedTableTypes = Arrays.stream(testCases)
.map(array -> (TableDefinition.Type) array[0])
.collect(toImmutableSet());
verify(testedTableTypes.containsAll(TABLE_TYPES));
return testCases;
}

@Override
protected Optional<DataMappingTestSetup> filterDataMappingSmokeTestData(DataMappingTestSetup dataMappingTestSetup)
{
Expand Down Expand Up @@ -385,33 +423,6 @@ public void testPredicatePushdownPrunnedColumns()
}
}

/**
 * Verifies that count aggregations over a BigQuery view return correct results,
 * both unfiltered and with predicates (https://github.com/trinodb/trino/issues/5635).
 */
@Test(description = "regression test for https://github.com/trinodb/trino/issues/5635")
public void testCountAggregationView()
{
    try (TestTable table = new TestTable(
            bigQuerySqlExecutor,
            "test.count_aggregation_table",
            "(a INT64, b INT64, c INT64)",
            List.of("1, 2, 3", "4, 5, 6"));
            TestView view = new TestView(bigQuerySqlExecutor, "test.count_aggregation_view", "SELECT * FROM " + table.getName())) {
        String viewName = view.getName();
        assertQuery("SELECT count(*) FROM " + viewName, "VALUES (2)");
        assertQuery("SELECT count(*) FROM " + viewName + " WHERE a = 1", "VALUES (1)");
        assertQuery("SELECT count(a) FROM " + viewName + " WHERE b = 2", "VALUES (1)");
    }
}

/**
 * Verifies that running the identical count query twice against a view succeeds and
 * returns the same result both times (https://github.com/trinodb/trino/issues/6696).
 */
@Test
public void testRepeatCountAggregationView()
{
    try (TestView view = new TestView(bigQuerySqlExecutor, "test.repeat_count_aggregation_view", "SELECT 1 AS col1")) {
        String countSql = "SELECT count(*) FROM " + view.getName();
        // Issue the same query twice, matching the original regression scenario
        assertQuery(countSql, "VALUES (1)");
        assertQuery(countSql, "VALUES (1)");
    }
}

/**
* https://github.com/trinodb/trino/issues/8183
*/
Expand Down Expand Up @@ -569,10 +580,8 @@ public void testBigQuerySnapshotTable()
}

@Test
@Parameters("testing.gcp-storage-bucket")
public void testBigQueryExternalTable(String gcpStorageBucket)
public void testBigQueryExternalTable()
{
// Prerequisite: upload region.csv in resources directory to gs://{testing.gcp-storage-bucket}/tpch/tiny/region.csv
String externalTable = "test_external" + randomNameSuffix();
try {
onBigQuery("CREATE EXTERNAL TABLE test." + externalTable + " OPTIONS (format = 'CSV', uris = ['gs://" + gcpStorageBucket + "/tpch/tiny/region.csv'])");
Expand Down