diff --git a/.github/labeler.yml b/.github/labeler.yml index d11c68264cb5..3eea37ca4488 100644 --- a/.github/labeler.yml +++ b/.github/labeler.yml @@ -159,7 +159,9 @@ GCP: - changed-files: - any-glob-to-any-file: [ 'gcp/**/*', - 'gcp-bundle/**/*' + 'gcp-bundle/**/*', + 'bigquery/**/*', + 'bigquery-bundle/**/*' ] DELL: diff --git a/bigquery/src/main/java/org/apache/iceberg/gcp/bigquery/BigQueryClient.java b/bigquery/src/main/java/org/apache/iceberg/gcp/bigquery/BigQueryClient.java new file mode 100644 index 000000000000..310293d0ed18 --- /dev/null +++ b/bigquery/src/main/java/org/apache/iceberg/gcp/bigquery/BigQueryClient.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.gcp.bigquery; + +import com.google.api.services.bigquery.model.Dataset; +import com.google.api.services.bigquery.model.DatasetList.Datasets; +import com.google.api.services.bigquery.model.DatasetReference; +import com.google.api.services.bigquery.model.Table; +import com.google.api.services.bigquery.model.TableList.Tables; +import com.google.api.services.bigquery.model.TableReference; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * A client of Google BigQuery Metastore functions over the BigQuery service. Uses the Google + * BigQuery API. + */ +public interface BigQueryClient { + + /** + * Creates and returns a new dataset. + * + * @param dataset the dataset to create + */ + Dataset createDataset(Dataset dataset); + + /** + * Returns a dataset. + * + * @param datasetReference full dataset reference + */ + Dataset getDataset(DatasetReference datasetReference); + + /** + * Deletes a dataset. + * + * @param datasetReference full dataset reference + */ + void deleteDataset(DatasetReference datasetReference); + + /** + * Updates parameters of a dataset, adding any that did not exist and leaving already-existing + * ones intact. + * + * @param datasetReference full dataset reference + * @param parameters metadata parameters to add + * @return dataset after patch + */ + Dataset setDatasetParameters(DatasetReference datasetReference, Map<String, String> parameters); + + /** + * Removes the given set of parameter keys from a dataset. Keys that do not exist are ignored. + * + * @param datasetReference full dataset reference + * @param parameters metadata parameter keys to remove + * @return dataset after patch + */ + Dataset removeDatasetParameters(DatasetReference datasetReference, Set<String> parameters); + + /** + * Lists datasets under a given project. + * + * @param projectId the identifier of the project to list datasets under + */ + List<Datasets> listDatasets(String projectId); + + /** + * Creates and returns a new table. + * + * @param table body of the table to create + */ + Table createTable(Table table); + + /** + * Returns a table. + * + * @param tableReference full table reference + */ + Table getTable(TableReference tableReference); + + /** + * Updates the catalog table options of an Iceberg table and returns the updated table. + * + * @param tableReference full table reference + * @param table to patch + */ + Table patchTable(TableReference tableReference, Table table); + + /** + * Renames a table. + * + * @param tableToRename full table reference + * @param newTableId new table identifier + */ + Table renameTable(TableReference tableToRename, String newTableId); + + /** + * Deletes a table. + * + * @param tableReference full table reference + */ + void deleteTable(TableReference tableReference); + + /** + * Returns all tables in a database. + * + * @param datasetReference full dataset reference + * @param filterUnsupportedTables if true, fetches every item on the list to verify it is + * supported, in order to filter out unsupported Iceberg tables + */ + List<Tables> listTables(DatasetReference datasetReference, boolean filterUnsupportedTables); +} diff --git a/bigquery/src/main/java/org/apache/iceberg/gcp/bigquery/BigQueryClientImpl.java b/bigquery/src/main/java/org/apache/iceberg/gcp/bigquery/BigQueryClientImpl.java new file mode 100644 index 000000000000..b35a7aaaa6b4 --- /dev/null +++ b/bigquery/src/main/java/org/apache/iceberg/gcp/bigquery/BigQueryClientImpl.java @@ -0,0 +1,492 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.iceberg.gcp.bigquery; + +import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport; +import com.google.api.client.http.HttpHeaders; +import com.google.api.client.http.HttpResponse; +import com.google.api.client.http.HttpResponseException; +import com.google.api.client.http.HttpStatusCodes; +import com.google.api.client.json.gson.GsonFactory; +import com.google.api.client.util.Data; +import com.google.api.services.bigquery.Bigquery; +import com.google.api.services.bigquery.BigqueryScopes; +import com.google.api.services.bigquery.model.Dataset; +import com.google.api.services.bigquery.model.DatasetList; +import com.google.api.services.bigquery.model.DatasetList.Datasets; +import com.google.api.services.bigquery.model.DatasetReference; +import com.google.api.services.bigquery.model.ExternalCatalogDatasetOptions; +import com.google.api.services.bigquery.model.ExternalCatalogTableOptions; +import com.google.api.services.bigquery.model.Table; +import com.google.api.services.bigquery.model.TableList; +import com.google.api.services.bigquery.model.TableList.Tables; +import com.google.api.services.bigquery.model.TableReference; +import com.google.api.services.bigquery.model.TableSchema; +import com.google.auth.http.HttpCredentialsAdapter; +import com.google.auth.oauth2.GoogleCredentials; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.security.GeneralSecurityException; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.iceberg.BaseMetastoreTableOperations; +import org.apache.iceberg.exceptions.BadRequestException; +import org.apache.iceberg.exceptions.ForbiddenException; +import org.apache.iceberg.exceptions.NoSuchIcebergTableException; +import org.apache.iceberg.exceptions.NoSuchNamespaceException; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.exceptions.NotAuthorizedException; +import org.apache.iceberg.exceptions.NotFoundException; +import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.exceptions.ServiceFailureException; +import org.apache.iceberg.exceptions.ServiceUnavailableException; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; + +/** A client of Google Bigquery Metastore functions over the BigQuery service. */ +public final class BigQueryClientImpl implements BigQueryClient { + + public static final String NOT_AUTHORIZED_ERROR_MESSAGE = + "Not authorized to call the BigQuery API or access this resource"; + private final Bigquery client; + + /** Constructs a client of the Google BigQuery service. */ + public BigQueryClientImpl() throws IOException, GeneralSecurityException { + // Initialize client that will be used to send requests. 
This client only needs to be created + // once, and can be reused for multiple requests. + HttpCredentialsAdapter httpCredentialsAdapter = + new HttpCredentialsAdapter( + GoogleCredentials.getApplicationDefault().createScoped(BigqueryScopes.all())); + this.client = + new Bigquery.Builder( + GoogleNetHttpTransport.newTrustedTransport(), + GsonFactory.getDefaultInstance(), + httpRequest -> { + httpCredentialsAdapter.initialize(httpRequest); + httpRequest.setThrowExceptionOnExecuteError( + false); // Less catching of exceptions, more analysis of the same HttpResponse + // object, inspecting its status code + }) + .setApplicationName("BigQuery Iceberg Catalog Plugin") + .build(); + } + + @VisibleForTesting + BigQueryClientImpl(Bigquery client) { + this.client = client; + } + + @Override + public Dataset createDataset(Dataset dataset) { + try { + HttpResponse response = + client + .datasets() + .insert(dataset.getDatasetReference().getProjectId(), dataset) + .executeUnparsed(); + return convertExceptionIfUnsuccessful(response).parseAs(Dataset.class); + } catch (IOException e) { + throw new RuntimeIOException(e); + } + } + + @Override + @SuppressWarnings("FormatStringAnnotation") + public Dataset getDataset(DatasetReference datasetReference) { + try { + HttpResponse response = + client + .datasets() + .get(datasetReference.getProjectId(), datasetReference.getDatasetId()) + .executeUnparsed(); + if (response.getStatusCode() == HttpStatusCodes.STATUS_CODE_NOT_FOUND) { + throw new NoSuchNamespaceException(response.getStatusMessage()); + } + + return convertExceptionIfUnsuccessful(response).parseAs(Dataset.class); + } catch (IOException e) { + throw new RuntimeIOException(e); + } + } + + @Override + @SuppressWarnings("FormatStringAnnotation") + public void deleteDataset(DatasetReference datasetReference) { + try { + HttpResponse response = + client + .datasets() + .delete(datasetReference.getProjectId(), datasetReference.getDatasetId()) + .executeUnparsed(); + if (response.getStatusCode() == HttpStatusCodes.STATUS_CODE_NOT_FOUND) { + throw new NoSuchNamespaceException(response.getStatusMessage()); + } + + convertExceptionIfUnsuccessful(response); + } catch (IOException e) { + throw new RuntimeIOException(e); + } + } + + @Override + public Dataset setDatasetParameters( + DatasetReference datasetReference, Map<String, String> parameters) { + Dataset dataset = getDataset(datasetReference); + ExternalCatalogDatasetOptions externalCatalogDatasetOptions = + dataset.getExternalCatalogDatasetOptions() == null + ? new ExternalCatalogDatasetOptions() + : dataset.getExternalCatalogDatasetOptions(); + Map<String, String> finalParameters = + externalCatalogDatasetOptions.getParameters() == null + ? Maps.newHashMap() + : externalCatalogDatasetOptions.getParameters(); + finalParameters.putAll(parameters); + + dataset.setExternalCatalogDatasetOptions( + externalCatalogDatasetOptions.setParameters(finalParameters)); + + return updateDataset(dataset); + } + + @Override + public Dataset removeDatasetParameters( + DatasetReference datasetReference, Set<String> parameters) { + Dataset dataset = getDataset(datasetReference); + ExternalCatalogDatasetOptions externalCatalogDatasetOptions = + dataset.getExternalCatalogDatasetOptions() == null + ? new ExternalCatalogDatasetOptions() + : dataset.getExternalCatalogDatasetOptions(); + Map<String, String> finalParameters = + externalCatalogDatasetOptions.getParameters() == null + ? Maps.newHashMap() + : externalCatalogDatasetOptions.getParameters(); + parameters.forEach(finalParameters::remove); + + dataset.setExternalCatalogDatasetOptions( + externalCatalogDatasetOptions.setParameters(finalParameters)); + + return updateDataset(dataset); + } + + @Override + public List<Datasets> listDatasets(String projectId) { + try { + String nextPageToken = null; + List<Datasets> datasets = Lists.newArrayList(); + do { + HttpResponse pageResponse = + client.datasets().list(projectId).setPageToken(nextPageToken).executeUnparsed(); + DatasetList result = + convertExceptionIfUnsuccessful(pageResponse).parseAs(DatasetList.class); + nextPageToken = result.getNextPageToken(); + datasets.addAll(result.getDatasets()); + } while (nextPageToken != null && !nextPageToken.isEmpty()); + return datasets; + } catch (IOException e) { + throw new RuntimeIOException(e); + } + } + + @Override + public Table createTable(Table table) { + try { + validateTable(table); + HttpResponse response = + client + .tables() + .insert( + Preconditions.checkNotNull(table.getTableReference()).getProjectId(), + Preconditions.checkNotNull(table.getTableReference()).getDatasetId(), + table) + .executeUnparsed(); + return convertExceptionIfUnsuccessful(response).parseAs(Table.class); + } catch (IOException e) { + throw new RuntimeIOException(e); + } + } + + @Override + @SuppressWarnings("FormatStringAnnotation") + public Table getTable(TableReference tableReference) { + try { + HttpResponse response = + client + .tables() + .get( + tableReference.getProjectId(), + tableReference.getDatasetId(), + tableReference.getTableId()) + .executeUnparsed(); + if (response.getStatusCode() == HttpStatusCodes.STATUS_CODE_NOT_FOUND) { + throw new NoSuchTableException(response.getStatusMessage()); + } + + return validateTable(convertExceptionIfUnsuccessful(response).parseAs(Table.class)); + } catch (IOException e) { + throw new RuntimeIOException(e); + } + } + + @Override + @SuppressWarnings("FormatStringAnnotation") + public Table patchTable(TableReference tableReference, Table table) { + validateTable(table); // Ensure it is an Iceberg table supported by the catalog + + ExternalCatalogTableOptions newExternalCatalogTableOptions = + new ExternalCatalogTableOptions() + .setStorageDescriptor(table.getExternalCatalogTableOptions().getStorageDescriptor()) + .setConnectionId(table.getExternalCatalogTableOptions().getConnectionId()) + .setParameters(table.getExternalCatalogTableOptions().getParameters()); + Table patch = + new Table() + .setExternalCatalogTableOptions(newExternalCatalogTableOptions) + // TODO(b/341933455): Update this once the server side accepts schema. + // Must set the schema to null to use schema auto-detect + .setSchema(Data.nullOf(TableSchema.class)); + + try { + HttpResponse response = + client + .tables() + .patch( + tableReference.getProjectId(), + tableReference.getDatasetId(), + tableReference.getTableId(), + patch) + .setRequestHeaders(new HttpHeaders().setIfMatch(table.getEtag())) + .executeUnparsed(); + + if (response.getStatusCode() == HttpStatusCodes.STATUS_CODE_NOT_FOUND) { + String responseString = response.parseAsString(); + if (responseString.toLowerCase(Locale.ROOT).contains("not found: connection")) { + throw new BadRequestException(responseString); + } + + throw new NoSuchTableException(response.getStatusMessage()); + } + + return convertExceptionIfUnsuccessful(response).parseAs(Table.class); + } catch (IOException e) { + throw new RuntimeIOException(e); + } + } + + @Override + @SuppressWarnings("FormatStringAnnotation") + public Table renameTable(TableReference tableToRename, String newTableId) { + Table table = getTable(tableToRename); // Verify table first + Table patch = + new Table() + .setTableReference( + new TableReference() + .setProjectId(table.getTableReference().getProjectId()) + .setDatasetId(table.getTableReference().getDatasetId()) + .setTableId(newTableId)); + + try { + HttpResponse response = + client + .tables() + .patch( + tableToRename.getProjectId(), + tableToRename.getDatasetId(), + tableToRename.getTableId(), + patch) + .setRequestHeaders(new HttpHeaders().setIfMatch(table.getEtag())) + .executeUnparsed(); + + if (response.getStatusCode() == HttpStatusCodes.STATUS_CODE_NOT_FOUND) { + throw new NoSuchTableException(response.getStatusMessage()); + } + + return convertExceptionIfUnsuccessful(response).parseAs(Table.class); + } catch (IOException e) { + throw new RuntimeIOException(e); + } + } + + @Override + @SuppressWarnings("FormatStringAnnotation") + public void deleteTable(TableReference tableReference) { + try { + getTable(tableReference); // Fetching it to validate it is a BigQuery Metastore table first + + HttpResponse response = + client + .tables() + .delete( + tableReference.getProjectId(), + tableReference.getDatasetId(), + tableReference.getTableId()) + .executeUnparsed(); + + if (response.getStatusCode() == HttpStatusCodes.STATUS_CODE_NOT_FOUND) { + throw new NoSuchTableException(response.getStatusMessage()); + } + + convertExceptionIfUnsuccessful(response); + } catch (IOException e) { + throw new RuntimeIOException(e); + } + } + + @Override + @SuppressWarnings("FormatStringAnnotation") + public List<Tables> listTables( + DatasetReference datasetReference, boolean filterUnsupportedTables) { + try { + String nextPageToken = null; + Stream<Tables> tablesStream = Stream.empty(); + do { + HttpResponse pageResponse = + client + .tables() + .list(datasetReference.getProjectId(), datasetReference.getDatasetId()) + .setPageToken(nextPageToken) + .executeUnparsed(); + if (pageResponse.getStatusCode() == HttpStatusCodes.STATUS_CODE_NOT_FOUND) { + throw new NoSuchNamespaceException(pageResponse.getStatusMessage()); + } + + TableList result = convertExceptionIfUnsuccessful(pageResponse).parseAs(TableList.class); + nextPageToken = result.getNextPageToken(); + Stream<Tables> tablesPageStream = result.getTables().stream(); + tablesStream = Stream.concat(tablesStream, tablesPageStream); + } while (nextPageToken != null && !nextPageToken.isEmpty()); + + // TODO(b/345839927): The server should return more metadata here to distinguish Iceberg + // BQMS tables for us to filter out those results since invoking `getTable` on
them would + // correctly raise a `NoSuchIcebergTableException` for being inoperable by this plugin. + if (filterUnsupportedTables) { + tablesStream = + tablesStream + .parallel() + .filter( + table -> { + try { + getTable(table.getTableReference()); + } catch (NoSuchTableException e) { + return false; + } + return true; + }); + } + + return tablesStream.collect(Collectors.toList()); + } catch (IOException e) { + throw new RuntimeIOException(e); + } + } + + @SuppressWarnings("FormatStringAnnotation") + private Dataset updateDataset(Dataset dataset) { + try { + HttpResponse response = + client + .datasets() + .update( + Preconditions.checkNotNull(dataset.getDatasetReference()).getProjectId(), + Preconditions.checkNotNull(dataset.getDatasetReference().getDatasetId()), + dataset) + .setRequestHeaders(new HttpHeaders().setIfMatch(dataset.getEtag())) + .executeUnparsed(); + if (response.getStatusCode() == HttpStatusCodes.STATUS_CODE_NOT_FOUND) { + throw new NoSuchNamespaceException(response.getStatusMessage()); + } + + return convertExceptionIfUnsuccessful(response).parseAs(Dataset.class); + } catch (IOException e) { + throw new RuntimeIOException(e); + } + } + + /** + * Returns true when it is a BigQuery Metastore Iceberg table, defined by having the + * ExternalCatalogTableOptions object and a parameter of METADATA_LOCATION_PROP as part of its + * parameters map. + * + * @param table to check + */ + private boolean isValidIcebergTable(Table table) { + return table.getExternalCatalogTableOptions() != null + && !table.getExternalCatalogTableOptions().isEmpty() + && table.getExternalCatalogTableOptions().getParameters() != null + && table + .getExternalCatalogTableOptions() + .getParameters() + .containsKey(BaseMetastoreTableOperations.METADATA_LOCATION_PROP) + && BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE.equalsIgnoreCase( + table + .getExternalCatalogTableOptions() + .getParameters() + .get(BaseMetastoreTableOperations.TABLE_TYPE_PROP)); + } + + private Table validateTable(Table table) { + if (!isValidIcebergTable(table)) { + throw new NoSuchIcebergTableException("This table is not a valid Iceberg table: %s", table); + } + + return table; + } + + /** + * Converts BigQuery generic API errors to Iceberg exceptions, *without* handling the + * resource-specific exceptions like NoSuchTableException, NoSuchNamespaceException, etc. + */ + @SuppressWarnings("FormatStringAnnotation") + private HttpResponse convertExceptionIfUnsuccessful(HttpResponse response) throws IOException { + if (response.isSuccessStatusCode()) { + return response; + } + + String errorMessage = + String.format( + "%s\n%s", + response.getStatusMessage(), + response.getContent() != null + ? 
new String(response.getContent().readAllBytes(), StandardCharsets.UTF_8) + : ""); + switch (response.getStatusCode()) { + case HttpStatusCodes.STATUS_CODE_UNAUTHORIZED: + throw new NotAuthorizedException(errorMessage, NOT_AUTHORIZED_ERROR_MESSAGE); + case HttpStatusCodes.STATUS_CODE_BAD_REQUEST: + throw new BadRequestException(errorMessage); + case HttpStatusCodes.STATUS_CODE_FORBIDDEN: + throw new ForbiddenException(errorMessage); + case HttpStatusCodes.STATUS_CODE_PRECONDITION_FAILED: + throw new ValidationException(errorMessage); + case HttpStatusCodes.STATUS_CODE_NOT_FOUND: + throw new NotFoundException(errorMessage); + case HttpStatusCodes.STATUS_CODE_SERVER_ERROR: + throw new ServiceFailureException(errorMessage); + case HttpStatusCodes.STATUS_CODE_SERVICE_UNAVAILABLE: + throw new ServiceUnavailableException(errorMessage); + default: + throw new HttpResponseException(response); + } + } +} diff --git a/bigquery/src/main/java/org/apache/iceberg/gcp/bigquery/metastore/BigQueryMetastoreCatalog.java b/bigquery/src/main/java/org/apache/iceberg/gcp/bigquery/metastore/BigQueryMetastoreCatalog.java new file mode 100644 index 000000000000..d3b497ec87d5 --- /dev/null +++ b/bigquery/src/main/java/org/apache/iceberg/gcp/bigquery/metastore/BigQueryMetastoreCatalog.java @@ -0,0 +1,328 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.gcp.bigquery.metastore; + +import com.google.api.services.bigquery.model.Dataset; +import com.google.api.services.bigquery.model.DatasetList.Datasets; +import com.google.api.services.bigquery.model.DatasetReference; +import com.google.api.services.bigquery.model.ExternalCatalogDatasetOptions; +import com.google.api.services.bigquery.model.TableReference; +import java.io.IOException; +import java.security.GeneralSecurityException; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.iceberg.BaseMetastoreCatalog; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SupportsNamespaces; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.exceptions.ServiceFailureException; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.gcp.GCPProperties; +import org.apache.iceberg.gcp.bigquery.BigQueryClient; +import org.apache.iceberg.gcp.bigquery.BigQueryClientImpl; +import org.apache.iceberg.hadoop.Configurable; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.base.Strings; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.LocationUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Iceberg Bigquery Metastore (BQMS) Catalog implementation. */ +public final class BigQueryMetastoreCatalog extends BaseMetastoreCatalog + implements SupportsNamespaces, Configurable { + + /** + * A user-provided property to filter tables that are inoperable by the BigQuery Metastore - + * Iceberg integration. 
+ */ + public static final String PROPERTIES_KEY_FILTER_UNSUPPORTED_TABLES = "filter_unsupported_tables"; + + private static final Logger LOG = LoggerFactory.getLogger(BigQueryMetastoreCatalog.class); + + private static final String DEFAULT_GCP_LOCATION = "us"; + + private String catalogPluginName; + private Map<String, String> catalogProperties; + private FileIO fileIO; + private Object conf; + private String projectId; + private String location; + private String defaultWarehouseLocation; + private BigQueryClient client; + private boolean filterUnsupportedTables; + + // Must have a no-arg constructor to be dynamically loaded + // initialize(String name, Map<String, String> properties) will be called to complete + // initialization + public BigQueryMetastoreCatalog() {} + + @Override + public void initialize(String inputName, Map<String, String> properties) { + if (!properties.containsKey(GCPProperties.PROJECT_ID)) { + throw new ValidationException("GCP project must be specified"); + } + + this.projectId = properties.get(GCPProperties.PROJECT_ID); + this.location = properties.getOrDefault(GCPProperties.BIGQUERY_LOCATION, DEFAULT_GCP_LOCATION); + BigQueryClient bigQueryClient; + try { + bigQueryClient = new BigQueryClientImpl(); + } catch (IOException e) { + throw new ServiceFailureException(e, "Creating BigQuery client failed"); + } catch (GeneralSecurityException e) { + throw new ValidationException(e, "Creating BigQuery client failed due to a security issue"); + } + + initialize(inputName, properties, projectId, location, bigQueryClient); + } + + @VisibleForTesting + void initialize( + String inputName, + Map<String, String> properties, + String gcpProjectId, + String gcpLocation, + BigQueryClient bigQueryClient) { + this.catalogPluginName = inputName; + this.catalogProperties = ImmutableMap.copyOf(properties); + this.projectId = gcpProjectId; + this.location = gcpLocation; + this.client = + Preconditions.checkNotNull(bigQueryClient, "Failed to initialize BigQuery Client"); + + LOG.info("Using BigQuery Metastore Iceberg Catalog: {}", inputName); + + if (properties.containsKey(CatalogProperties.WAREHOUSE_LOCATION)) { + // Iceberg always removes the trailing slash to avoid paths like "//data" in file systems + // like s3. + this.defaultWarehouseLocation = + LocationUtil.stripTrailingSlash(properties.get(CatalogProperties.WAREHOUSE_LOCATION)); + } + + String fileIoImpl = + properties.getOrDefault( + CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.hadoop.HadoopFileIO"); + this.fileIO = CatalogUtil.loadFileIO(fileIoImpl, properties, conf); + + this.filterUnsupportedTables = + Boolean.parseBoolean( + properties.getOrDefault(PROPERTIES_KEY_FILTER_UNSUPPORTED_TABLES, "false")); + } + + @Override + protected TableOperations newTableOps(TableIdentifier identifier) { + return new BigQueryTableOperations( + client, + fileIO, + projectId, + // Sometimes extensions have the namespace contain the table name too, so we are forced to + // allow an invalid namespace and just take the first part here, like other catalog + // implementations do. + identifier.namespace().level(0), + identifier.name()); + } + + @Override + protected String defaultWarehouseLocation(TableIdentifier identifier) { + String locationUri = null; + DatasetReference datasetReference = toDatasetReference(identifier.namespace()); + Dataset dataset = client.getDataset(datasetReference); + if (dataset != null && dataset.getExternalCatalogDatasetOptions() != null) { + locationUri = dataset.getExternalCatalogDatasetOptions().getDefaultStorageLocationUri(); + } + + return String.format( + "%s/%s", + Strings.isNullOrEmpty(locationUri) + ? getDefaultStorageLocationUri(datasetReference.getDatasetId()) + : locationUri, + identifier.name()); + } + + @Override + public List<TableIdentifier> listTables(Namespace namespace) { + validateNamespace(namespace); + + return client.listTables(toDatasetReference(namespace), filterUnsupportedTables).stream() + .map( + table -> TableIdentifier.of(namespace.level(0), table.getTableReference().getTableId())) + .collect(ImmutableList.toImmutableList()); + } + + @Override + public boolean dropTable(TableIdentifier identifier, boolean purge) { + try { + TableOperations ops = newTableOps(identifier); + TableMetadata lastMetadata = ops.current(); + + client.deleteTable(toBqTableReference(identifier)); + + if (purge && lastMetadata != null) { + CatalogUtil.dropTableData(ops.io(), lastMetadata); + } + + } catch (NoSuchTableException e) { // Not catching a NoSuchIcebergTableException on purpose + return false; // The documentation says to just return false in this case + } + + return true; + } + + @Override + public void renameTable(TableIdentifier from, TableIdentifier to) { + if (!from.namespace().equals(to.namespace())) { + throw new ValidationException("New table name must be in the same namespace"); + } + + // TODO(b/354981675): Enable once supported by the API. + throw new ServiceFailureException( + "Table rename operation is unsupported. Try the SQL operation directly on BigQuery: \"ALTER TABLE %s RENAME TO %s;\"", + from.name(), to.name()); + } + + @Override + public void createNamespace(Namespace namespace, Map<String, String> metadata) { + Dataset builder = new Dataset(); + DatasetReference datasetReference = toDatasetReference(namespace); + builder.setLocation(this.location); + builder.setDatasetReference(datasetReference); + builder.setExternalCatalogDatasetOptions( + BigQueryMetastoreUtils.createExternalCatalogDatasetOptions( + getDefaultStorageLocationUri(datasetReference.getDatasetId()), metadata)); + + client.createDataset(builder); + } + + /** + * Since this catalog only supports one-level namespaces, it always returns an empty list unless + * passed an empty namespace to list all namespaces within the catalog. + */ + @Override + public List<Namespace> listNamespaces(Namespace namespace) { + if (namespace.levels().length != 0) { + // BQMS does not support namespaces under database or tables, returns empty. + // It is called when dropping a namespace to make sure it's empty (listTables is called as + // well), returns empty to unblock deletion. + return ImmutableList.of(); + } + + return client.listDatasets(projectId).stream() + .map(BigQueryMetastoreCatalog::getNamespace) + .collect(ImmutableList.toImmutableList()); + } + + @Override + public boolean dropNamespace(Namespace namespace) { + client.deleteDataset(toDatasetReference(namespace)); + /* We don't delete the data folder for safety, which aligns with Hive Metastore's default * behavior. * We can support database or catalog level config controlling file deletion in the future. + */ + return true; + } + + @Override + public boolean setProperties(Namespace namespace, Map<String, String> properties) { + client.setDatasetParameters(toDatasetReference(namespace), properties); + return true; + } + + @Override + public boolean removeProperties(Namespace namespace, Set<String> properties) { + client.removeDatasetParameters(toDatasetReference(namespace), properties); + return true; + } + + @Override + public Map<String, String> loadNamespaceMetadata(Namespace namespace) { + return getMetadata(client.getDataset(toDatasetReference(namespace))); + } + + @Override + public String name() { + return catalogPluginName; + } + + @Override + protected Map<String, String> properties() { + return catalogProperties == null ? ImmutableMap.of() : catalogProperties; + } + + @Override + public void setConf(Object conf) { + this.conf = conf; + } + + private String getDefaultStorageLocationUri(String dbId) { + Preconditions.checkNotNull(defaultWarehouseLocation, "Data warehouse location is not set"); + return String.format( + "%s/%s.db", LocationUtil.stripTrailingSlash(defaultWarehouseLocation), dbId); + } + + private static Namespace getNamespace(Datasets datasets) { + return Namespace.of(datasets.getDatasetReference().getDatasetId()); + } + + private DatasetReference toDatasetReference(Namespace namespace) { + validateNamespace(namespace); + return new DatasetReference().setProjectId(projectId).setDatasetId(namespace.level(0)); + } + + private TableReference toBqTableReference(TableIdentifier tableIdentifier) { + DatasetReference datasetReference = toDatasetReference(tableIdentifier.namespace()); + return new TableReference() + .setProjectId(datasetReference.getProjectId()) + .setDatasetId(datasetReference.getDatasetId()) + .setTableId(tableIdentifier.name()); + } + + private static Map<String, String> getMetadata(Dataset dataset) { + ExternalCatalogDatasetOptions options = dataset.getExternalCatalogDatasetOptions(); + Map<String, String> metadata = Maps.newHashMap(); + if (options != null) { + metadata.putAll(options.getParameters()); + if (!Strings.isNullOrEmpty(options.getDefaultStorageLocationUri())) { + metadata.put("location", options.getDefaultStorageLocationUri()); + } + } + return metadata; + } + + private static void validateNamespace(Namespace namespace) { + Preconditions.checkArgument(namespace.levels().length == 1, invalidNamespaceMessage(namespace)); + } + + private static String invalidNamespaceMessage(Namespace namespace) { + return String.format( + "BigQuery Metastore only supports single level namespaces. Invalid namespace: \"%s\" has %d" + + " levels", + namespace, namespace.levels().length); + } +} diff --git a/bigquery/src/main/java/org/apache/iceberg/gcp/bigquery/metastore/BigQueryMetastoreUtils.java b/bigquery/src/main/java/org/apache/iceberg/gcp/bigquery/metastore/BigQueryMetastoreUtils.java new file mode 100644 index 000000000000..63e80f1b1ad2 --- /dev/null +++ b/bigquery/src/main/java/org/apache/iceberg/gcp/bigquery/metastore/BigQueryMetastoreUtils.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.gcp.bigquery.metastore; + +import com.google.api.services.bigquery.model.ExternalCatalogDatasetOptions; +import com.google.api.services.bigquery.model.ExternalCatalogTableOptions; +import com.google.api.services.bigquery.model.SerDeInfo; +import com.google.api.services.bigquery.model.StorageDescriptor; +import java.util.Map; + +/** Shared utilities for BigQuery Metastore specific functions and constants. */ +public final class BigQueryMetastoreUtils { + + private BigQueryMetastoreUtils() {} + + // TODO: Consider using "org.apache.iceberg.mr.hive.HiveIcebergSerDe" when + // TableProperties.ENGINE_HIVE_ENABLED is set. + public static final String SERIALIZATION_LIBRARY = + "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"; + public static final String FILE_INPUT_FORMAT = "org.apache.hadoop.mapred.FileInputFormat"; + public static final String FILE_OUTPUT_FORMAT = "org.apache.hadoop.mapred.FileOutputFormat"; + + /** + * Creates a new ExternalCatalogTableOptions object populated with the supported library constants + * and the parameters given. + * + * @param locationUri storage location uri + * @param parameters table metadata parameters + */ + public static ExternalCatalogTableOptions createExternalCatalogTableOptions( + String locationUri, Map<String, String> parameters) { + return new ExternalCatalogTableOptions() + .setStorageDescriptor(createStorageDescriptor(locationUri)) + .setParameters(parameters); + } + + /** + * Creates a new ExternalCatalogDatasetOptions object populated with the supported library + * constants and the parameters given. + * + * @param defaultStorageLocationUri dataset's default location uri + * @param metadataParameters metadata parameters for the dataset + */ + public static ExternalCatalogDatasetOptions createExternalCatalogDatasetOptions( + String defaultStorageLocationUri, Map<String, String> metadataParameters) { + return new ExternalCatalogDatasetOptions() + .setDefaultStorageLocationUri(defaultStorageLocationUri) + .setParameters(metadataParameters); + } + + private static StorageDescriptor createStorageDescriptor(String locationUri) { + SerDeInfo serDeInfo = new SerDeInfo().setSerializationLibrary(SERIALIZATION_LIBRARY); + + return new StorageDescriptor() + .setLocationUri(locationUri) + .setInputFormat(FILE_INPUT_FORMAT) + .setOutputFormat(FILE_OUTPUT_FORMAT) + .setSerdeInfo(serDeInfo); + } +} diff --git a/bigquery/src/main/java/org/apache/iceberg/gcp/bigquery/metastore/BigQueryTableOperations.java b/bigquery/src/main/java/org/apache/iceberg/gcp/bigquery/metastore/BigQueryTableOperations.java new file mode 100644 index 000000000000..4360259a4c27 --- /dev/null +++ b/bigquery/src/main/java/org/apache/iceberg/gcp/bigquery/metastore/BigQueryTableOperations.java @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.gcp.bigquery.metastore; + +import com.google.api.client.util.Maps; +import com.google.api.services.bigquery.model.ExternalCatalogTableOptions; +import com.google.api.services.bigquery.model.Table; +import com.google.api.services.bigquery.model.TableReference; +import java.util.Locale; +import java.util.Map; +import org.apache.hadoop.hive.common.StatsSetupConst; +import org.apache.iceberg.BaseMetastoreTableOperations; +import org.apache.iceberg.SnapshotSummary; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.exceptions.CommitStateUnknownException; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.gcp.bigquery.BigQueryClient; +import org.apache.iceberg.io.FileIO; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Handles BigQuery metastore table operations. */ +public final class BigQueryTableOperations extends BaseMetastoreTableOperations { + + private static final Logger LOG = LoggerFactory.getLogger(BigQueryTableOperations.class); + + public static final String TABLE_PROPERTIES_BQ_CONNECTION = "bq_connection"; + + private final BigQueryClient client; + private final FileIO fileIO; + private final TableReference tableReference; + + BigQueryTableOperations( + BigQueryClient client, FileIO fileIO, String project, String dataset, String table) { + this.client = client; + this.fileIO = fileIO; + this.tableReference = + new TableReference().setProjectId(project).setDatasetId(dataset).setTableId(table); + } + + // The doRefresh method should provide implementation on how to get the metadata location. + @Override + public void doRefresh() { + // Must default to null. + String metadataLocation = null; + try { + metadataLocation = + getMetadataLocationOrThrow( + client.getTable(this.tableReference).getExternalCatalogTableOptions()); + } catch (NoSuchTableException e) { + if (currentMetadataLocation() != null) { + // Re-throws the exception because the table must exist in this case. + throw e; + } + } + + refreshFromMetadataLocation(metadataLocation); + } + + // The doCommit method should provide implementation on how to update with metadata location + // atomically + @Override + public void doCommit(TableMetadata base, TableMetadata metadata) { + String newMetadataLocation = + base == null && metadata.metadataFileLocation() != null + ? 
metadata.metadataFileLocation() + : writeNewMetadata(metadata, currentVersion() + 1); + + CommitStatus commitStatus = CommitStatus.FAILURE; + try { + if (base == null) { + createTable(newMetadataLocation, metadata); + } else { + updateTable(base.metadataFileLocation(), newMetadataLocation, metadata); + } + + commitStatus = CommitStatus.SUCCESS; + } catch (CommitFailedException | CommitStateUnknownException e) { + throw e; + } catch (Throwable e) { + LOG.error("Exception thrown on commit: ", e); + commitStatus = checkCommitStatus(newMetadataLocation, metadata); + if (commitStatus == CommitStatus.FAILURE) { + throw new CommitFailedException(e, "Failed to commit"); + } + + if (commitStatus == CommitStatus.UNKNOWN) { + throw new CommitStateUnknownException(e); + } + } finally { + try { + if (commitStatus == CommitStatus.FAILURE) { + LOG.warn("Failed to commit updates to table {}", tableName()); + } + } catch (RuntimeException e) { + LOG.error("Failed to cleanup metadata file at {} for table", newMetadataLocation, e); + } + } + } + + @Override + public String tableName() { + return String.format("%s.%s", tableReference.getDatasetId(), tableReference.getTableId()); + } + + @Override + public FileIO io() { + return fileIO; + } + + private void createTable(String newMetadataLocation, TableMetadata metadata) { + LOG.debug("Creating a new Iceberg table: {}", tableName()); + Table tableBuilder = makeNewTable(metadata, newMetadataLocation); + tableBuilder.setTableReference(this.tableReference); + addConnectionIfProvided(tableBuilder, metadata.properties()); + + client.createTable(tableBuilder); + } + + private void addConnectionIfProvided(Table tableBuilder, Map<String, String> metadataProperties) { + if (metadataProperties.containsKey(TABLE_PROPERTIES_BQ_CONNECTION)) { + tableBuilder + .getExternalCatalogTableOptions() + .setConnectionId(metadataProperties.get(TABLE_PROPERTIES_BQ_CONNECTION)); + } + } + + /** Updates table properties with concurrent update detection using etag. */ + private void updateTable( + String oldMetadataLocation, String newMetadataLocation, TableMetadata metadata) { + Table table = client.getTable(this.tableReference); + if (table.getEtag().isEmpty()) { + throw new ValidationException( + "Etag of legacy table %s is empty, manually update the table via the BigQuery API or" + + " recreate and retry", + tableName()); + } + + ExternalCatalogTableOptions options = table.getExternalCatalogTableOptions(); + addConnectionIfProvided(table, metadata.properties()); + + // If `metadataLocationFromMetastore` is different from the metadata location of base, it means + // someone has updated the metadata location in the metastore, which is a conflicting update. + String metadataLocationFromMetastore = + options.getParameters().getOrDefault(METADATA_LOCATION_PROP, ""); + if (!metadataLocationFromMetastore.isEmpty() + && !metadataLocationFromMetastore.equals(oldMetadataLocation)) { + throw new CommitFailedException( + "Base metadata location '%s' is not the same as the current table metadata location '%s' for" + + " %s.%s", + oldMetadataLocation, + metadataLocationFromMetastore, + tableReference.getDatasetId(), + tableReference.getTableId()); + } + + options.setParameters(buildTableParameters(newMetadataLocation, metadata)); + try { + client.patchTable(tableReference, table); + } catch (ValidationException e) { + if (e.getMessage().toLowerCase(Locale.ROOT).contains("etag mismatch")) { + throw new CommitFailedException( + "Updating table failed due to conflicting updates (etag mismatch). Retry the update"); + } + + throw e; + } + } + + // To make the table queryable from Hive, the user would likely be setting the HIVE_ENGINE_ENABLED + // parameter. + // + // TODO(b/318693532): Decide on whether and how to make the table queryable from the Hive engine. + // (could be a server side change or a client side change - that's TBD). + private Table makeNewTable(TableMetadata metadata, String metadataFileLocation) { + return new Table() + .setExternalCatalogTableOptions( + BigQueryMetastoreUtils.createExternalCatalogTableOptions( + metadata.location(), buildTableParameters(metadataFileLocation, metadata))); + } + + // Follow Iceberg's HiveTableOperations to populate more table parameters for HMS compatibility. + private Map<String, String> buildTableParameters( + String metadataFileLocation, TableMetadata metadata) { + Map<String, String> parameters = Maps.newHashMap(); + parameters.putAll(metadata.properties()); + if (metadata.uuid() != null) { + parameters.put(TableProperties.UUID, metadata.uuid()); + } + + if (currentMetadataLocation() != null && !currentMetadataLocation().isEmpty()) { + parameters.put(PREVIOUS_METADATA_LOCATION_PROP, currentMetadataLocation()); + } + + parameters.put(METADATA_LOCATION_PROP, metadataFileLocation); + parameters.put(TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE); + // Follow HMS to use the EXTERNAL type. + parameters.put("EXTERNAL", "TRUE"); + + // Hive style basic statistics. + updateParametersWithSnapshotMetadata(metadata, parameters); + // More Iceberg metadata can be exposed, e.g., statistics, schema, partition spec, as HMS does. + // But we should be careful: this metadata could be huge and make the metadata API responses + // less readable (e.g., list tables). Users can always inspect this metadata in Spark, so it is + // not set for now. + return parameters; + } + + /** Adds Hive-style basic statistics from snapshot metadata if it exists. */ + private static void updateParametersWithSnapshotMetadata( + TableMetadata metadata, Map<String, String> parameters) { + if (metadata.currentSnapshot() == null) { + return; + } + + Map<String, String> summary = metadata.currentSnapshot().summary(); + if (summary.get(SnapshotSummary.TOTAL_DATA_FILES_PROP) != null) { + parameters.put(StatsSetupConst.NUM_FILES, summary.get(SnapshotSummary.TOTAL_DATA_FILES_PROP)); + } + + if (summary.get(SnapshotSummary.TOTAL_RECORDS_PROP) != null) { + parameters.put(StatsSetupConst.ROW_COUNT, summary.get(SnapshotSummary.TOTAL_RECORDS_PROP)); + } + + if (summary.get(SnapshotSummary.TOTAL_FILE_SIZE_PROP) != null) { + parameters.put(StatsSetupConst.TOTAL_SIZE, summary.get(SnapshotSummary.TOTAL_FILE_SIZE_PROP)); + } + } + + private String getMetadataLocationOrThrow(ExternalCatalogTableOptions tableOptions) { + if (tableOptions == null || !tableOptions.getParameters().containsKey(METADATA_LOCATION_PROP)) { + throw new ValidationException( + "Table %s is not a valid BigQuery Metastore Iceberg table, metadata location not found", + tableName()); + } + + return tableOptions.getParameters().get(METADATA_LOCATION_PROP); + } +} diff --git a/bigquery/src/test/java/org/apache/iceberg/gcp/bigquery/BigQueryClientImplTest.java b/bigquery/src/test/java/org/apache/iceberg/gcp/bigquery/BigQueryClientImplTest.java new file mode 100644 index 000000000000..3d9c430981b8 --- /dev/null +++ b/bigquery/src/test/java/org/apache/iceberg/gcp/bigquery/BigQueryClientImplTest.java @@ -0,0 +1,770 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.gcp.bigquery; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.api.client.http.HttpHeaders; +import com.google.api.client.http.HttpResponse; +import com.google.api.client.http.HttpStatusCodes; +import com.google.api.client.http.HttpTransport; +import com.google.api.client.http.LowLevelHttpRequest; +import com.google.api.client.http.LowLevelHttpResponse; +import com.google.api.client.json.JsonFactory; +import com.google.api.client.json.gson.GsonFactory; +import com.google.api.client.testing.http.HttpTesting; +import com.google.api.client.testing.http.MockHttpContent; +import com.google.api.client.testing.http.MockHttpTransport; +import com.google.api.client.testing.http.MockLowLevelHttpRequest; +import com.google.api.client.testing.http.MockLowLevelHttpResponse; +import com.google.api.client.util.Data; +import com.google.api.services.bigquery.Bigquery; +import com.google.api.services.bigquery.Bigquery.Datasets; +import com.google.api.services.bigquery.Bigquery.Tables; +import com.google.api.services.bigquery.BigqueryRequest; +import com.google.api.services.bigquery.model.Dataset; +import com.google.api.services.bigquery.model.DatasetList; +import com.google.api.services.bigquery.model.DatasetReference; +import com.google.api.services.bigquery.model.ExternalCatalogDatasetOptions; +import com.google.api.services.bigquery.model.ExternalCatalogTableOptions; +import com.google.api.services.bigquery.model.SerDeInfo; +import com.google.api.services.bigquery.model.StorageDescriptor; +import com.google.api.services.bigquery.model.Table; +import com.google.api.services.bigquery.model.TableList; +import com.google.api.services.bigquery.model.TableReference; +import com.google.api.services.bigquery.model.TableSchema; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.iceberg.BaseMetastoreTableOperations; +import org.apache.iceberg.exceptions.BadRequestException; +import org.apache.iceberg.exceptions.ForbiddenException; +import org.apache.iceberg.exceptions.NoSuchIcebergTableException; +import org.apache.iceberg.exceptions.NoSuchNamespaceException; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.exceptions.NotAuthorizedException; +import org.apache.iceberg.exceptions.NotFoundException; +import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.exceptions.ServiceFailureException; +import org.apache.iceberg.exceptions.ServiceUnavailableException; +import 
org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.gcp.bigquery.metastore.BigQueryMetastoreUtils; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.function.Executable; + +public class BigQueryClientImplTest { + + public static final JsonFactory JSON_FACTORY = GsonFactory.getDefaultInstance(); + + private static final String GCP_PROJECT = "my-project"; + private static final String GCP_REGION = "europe-west4"; + private static final String DATASET_ID = "db"; + private static final String TABLE_ID = "tbl"; + private static final DatasetReference DATASET_REFERENCE = + new DatasetReference().setProjectId(GCP_PROJECT).setDatasetId(DATASET_ID); + private static final TableReference TABLE_REFERENCE = + new TableReference().setProjectId(GCP_PROJECT).setDatasetId(DATASET_ID).setTableId(TABLE_ID); + private final Dataset dataset = + new Dataset() + .setEtag("datasetEtag") + .setDatasetReference(DATASET_REFERENCE) + .setLocation(GCP_REGION) + .setExternalCatalogDatasetOptions( + new ExternalCatalogDatasetOptions() + .setParameters(Maps.newHashMap(Map.of("initialDatasetKey", "initialDatasetVal"))) + .setDefaultStorageLocationUri("someDefaultStorageLocationUri")); + private final Table table = + new Table() + .setEtag("tableEtag") + .setTableReference(TABLE_REFERENCE) + .setExternalCatalogTableOptions( + new ExternalCatalogTableOptions() + .setParameters( + Map.of( + "initialTableKey", + "initialTableVal", + BaseMetastoreTableOperations.METADATA_LOCATION_PROP, + "location", + BaseMetastoreTableOperations.TABLE_TYPE_PROP, + BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE)) + .setConnectionId("someConnectionId") + .setStorageDescriptor( + new StorageDescriptor() + .setLocationUri("someLocationUri") + .setInputFormat(BigQueryMetastoreUtils.FILE_INPUT_FORMAT) + .setOutputFormat(BigQueryMetastoreUtils.FILE_OUTPUT_FORMAT) + .setSerdeInfo( + new SerDeInfo() + .setSerializationLibrary( + BigQueryMetastoreUtils.SERIALIZATION_LIBRARY)))); + + private final Map<Integer, Class<? extends RuntimeException>> errorCodeToIcebergException = + Map.of( + HttpStatusCodes.STATUS_CODE_UNAUTHORIZED, + NotAuthorizedException.class, + HttpStatusCodes.STATUS_CODE_BAD_REQUEST, + BadRequestException.class, + HttpStatusCodes.STATUS_CODE_FORBIDDEN, + ForbiddenException.class, + HttpStatusCodes.STATUS_CODE_PRECONDITION_FAILED, + ValidationException.class, + HttpStatusCodes.STATUS_CODE_NOT_FOUND, + NotFoundException.class, + HttpStatusCodes.STATUS_CODE_SERVER_ERROR, + ServiceFailureException.class, + HttpStatusCodes.STATUS_CODE_SERVICE_UNAVAILABLE, + ServiceUnavailableException.class, + 12345, // This lucky number is here to represent a random unsupported HTTP status code + RuntimeIOException.class); + + private final Bigquery bigqueryMock = mock(Bigquery.class); + + private final Datasets datasetsMock = mock(Datasets.class); + private final Datasets.Insert datasetInsertMock = mock(Datasets.Insert.class); + private final Datasets.Get datasetGetMock = mock(Datasets.Get.class); + private final Datasets.Delete datasetDeleteMock = mock(Datasets.Delete.class); + private final Datasets.Update datasetUpdateMock = mock(Datasets.Update.class); + private final Datasets.List datasetListMock = mock(Datasets.List.class); + + private final Tables tablesMock = mock(Tables.class); + private final Tables.Insert tableInsertMock = mock(Tables.Insert.class); + private final Tables.Get tableGetMock = mock(Tables.Get.class); + private final
Tables.Get unsupportedTableGetMock = mock(Tables.Get.class); + private final Tables.Patch tablePatchMock = mock(Tables.Patch.class); + private final Tables.Delete tableDeleteMock = mock(Tables.Delete.class); + private final Tables.List tableListMock = mock(Tables.List.class); + + private BigQueryClient client; + + @BeforeEach + public void before() throws Exception { + prepareMocks(); + setJsonFactory(); + this.client = new BigQueryClientImpl(bigqueryMock); + } + + private void prepareMocks() throws IOException { + when(bigqueryMock.datasets()).thenReturn(datasetsMock); + when(bigqueryMock.tables()).thenReturn(tablesMock); + + when(datasetsMock.insert(GCP_PROJECT, dataset)).thenReturn(datasetInsertMock); + when(datasetsMock.get(GCP_PROJECT, DATASET_ID)).thenReturn(datasetGetMock); + when(datasetsMock.delete(GCP_PROJECT, DATASET_ID)).thenReturn(datasetDeleteMock); + when(datasetsMock.update(GCP_PROJECT, DATASET_ID, dataset)).thenReturn(datasetUpdateMock); + when(datasetsMock.list(GCP_PROJECT)).thenReturn(datasetListMock); + + when(tablesMock.insert(GCP_PROJECT, DATASET_ID, table)).thenReturn(tableInsertMock); + when(tablesMock.get(GCP_PROJECT, DATASET_ID, TABLE_ID)).thenReturn(tableGetMock); + when(tablesMock.delete(GCP_PROJECT, DATASET_ID, TABLE_ID)).thenReturn(tableDeleteMock); + when(tablesMock.list(GCP_PROJECT, DATASET_ID)).thenReturn(tableListMock); + } + + private void setJsonFactory() { + this.dataset.setFactory(JSON_FACTORY); + this.table.setFactory(JSON_FACTORY); + } + + @Test + public void testCreateDataset_success() throws Exception { + when(datasetInsertMock.executeUnparsed()).thenReturn(buildResponse(dataset.toPrettyString())); + + assertEquals(client.createDataset(dataset), dataset); + } + + @Test + public void testCreateDataset_throwsCorrectGeneralIcebergExceptions() throws Exception { + testThrowsCommonExceptions(datasetInsertMock, Set.of(), () -> client.createDataset(dataset)); + } + + @Test + public void testGetDataset_success() throws Exception { + when(datasetGetMock.executeUnparsed()).thenReturn(buildResponse(dataset.toPrettyString())); + + assertEquals(client.getDataset(DATASET_REFERENCE), dataset); + } + + @Test + public void testGetDataset_throwsCorrectGeneralIcebergExceptions() throws Exception { + testThrowsCommonExceptions( + datasetGetMock, + Set.of(HttpStatusCodes.STATUS_CODE_NOT_FOUND), + () -> client.getDataset(DATASET_REFERENCE)); + } + + @Test + public void testGetDataset_throwsNoSuchNamespaceExceptionWhenNotFound() throws Exception { + when(datasetGetMock.executeUnparsed()) + .thenReturn(buildResponseWithStatus(HttpStatusCodes.STATUS_CODE_NOT_FOUND)); + + NoSuchNamespaceException exception = + assertThrows(NoSuchNamespaceException.class, () -> client.getDataset(DATASET_REFERENCE)); + assertEquals("Got status code: 404", exception.getMessage()); + } + + @Test + public void testDeleteDataset_success() throws Exception { + when(datasetDeleteMock.executeUnparsed()) + .thenReturn(buildResponseWithStatus(HttpStatusCodes.STATUS_CODE_OK)); + + client.deleteDataset(DATASET_REFERENCE); + + verify(datasetDeleteMock, times(1)).executeUnparsed(); + } + + @Test + public void testDeleteDataset_throwsCorrectGeneralIcebergExceptions() throws Exception { + testThrowsCommonExceptions( + datasetDeleteMock, + Set.of(HttpStatusCodes.STATUS_CODE_NOT_FOUND), + () -> client.deleteDataset(DATASET_REFERENCE)); + } + + @Test + public void testDeleteDataset_throwsNoSuchNamespaceExceptionWhenNotFound() throws Exception { + when(datasetDeleteMock.executeUnparsed()) + 
.thenReturn(buildResponseWithStatus(HttpStatusCodes.STATUS_CODE_NOT_FOUND)); + + NoSuchNamespaceException exception = + assertThrows(NoSuchNamespaceException.class, () -> client.deleteDataset(DATASET_REFERENCE)); + assertEquals("Got status code: 404", exception.getMessage()); + } + + @Test + public void testSetDatasetParameters_success() throws Exception { + when(datasetGetMock.executeUnparsed()).thenReturn(buildResponse(dataset.toPrettyString())); + dataset.getExternalCatalogDatasetOptions().getParameters().put("newKey", "newVal"); + when(datasetUpdateMock.setRequestHeaders(new HttpHeaders().setIfMatch(dataset.getEtag()))) + .thenReturn(datasetUpdateMock); + when(datasetUpdateMock.executeUnparsed()).thenReturn(buildResponse(dataset.toPrettyString())); + + assertEquals( + dataset, client.setDatasetParameters(DATASET_REFERENCE, Map.of("newKey", "newVal"))); + } + + @Test + public void testSetDatasetParameters_throwsCorrectGeneralIcebergExceptions() throws Exception { + String initialDatasetStringState = dataset.toPrettyString(); + dataset.getExternalCatalogDatasetOptions().getParameters().put("newKey", "newVal"); + + testThrowsCommonExceptions( + datasetUpdateMock, + Set.of(HttpStatusCodes.STATUS_CODE_NOT_FOUND), + () -> { + when(datasetGetMock.executeUnparsed()) + .thenReturn(buildResponse(initialDatasetStringState)); + when(datasetUpdateMock.setRequestHeaders(new HttpHeaders().setIfMatch(dataset.getEtag()))) + .thenReturn(datasetUpdateMock); + client.setDatasetParameters(DATASET_REFERENCE, Map.of("newKey", "newVal")); + }); + } + + @Test + public void testSetDatasetParameters_throwsNoSuchNamespaceExceptionWhenNotFound() + throws Exception { + when(datasetGetMock.executeUnparsed()).thenReturn(buildResponse(dataset.toPrettyString())); + dataset.getExternalCatalogDatasetOptions().getParameters().put("newKey", "newVal"); + when(datasetUpdateMock.setRequestHeaders(new HttpHeaders().setIfMatch(dataset.getEtag()))) + .thenReturn(datasetUpdateMock); + when(datasetUpdateMock.executeUnparsed()) + .thenReturn(buildResponseWithStatus(HttpStatusCodes.STATUS_CODE_NOT_FOUND)); + + NoSuchNamespaceException exception = + assertThrows( + NoSuchNamespaceException.class, + () -> client.setDatasetParameters(DATASET_REFERENCE, Map.of("newKey", "newVal"))); + assertEquals("Got status code: 404", exception.getMessage()); + } + + @Test + public void testRemoveDatasetParameters_success() throws Exception { + when(datasetGetMock.executeUnparsed()).thenReturn(buildResponse(dataset.toPrettyString())); + dataset.getExternalCatalogDatasetOptions().getParameters().remove("initialDatasetKey"); + when(datasetUpdateMock.setRequestHeaders(new HttpHeaders().setIfMatch(dataset.getEtag()))) + .thenReturn(datasetUpdateMock); + when(datasetUpdateMock.executeUnparsed()).thenReturn(buildResponse(dataset.toPrettyString())); + + assertEquals( + dataset, client.removeDatasetParameters(DATASET_REFERENCE, Set.of("initialDatasetKey"))); + } + + @Test + public void testRemoveDatasetParameters_throwsCorrectGeneralIcebergExceptions() throws Exception { + String initialDatasetStringState = dataset.toPrettyString(); + dataset.getExternalCatalogDatasetOptions().getParameters().remove("initialDatasetKey"); + + testThrowsCommonExceptions( + datasetUpdateMock, + Set.of(HttpStatusCodes.STATUS_CODE_NOT_FOUND), + () -> { + when(datasetGetMock.executeUnparsed()) + .thenReturn(buildResponse(initialDatasetStringState)); + when(datasetUpdateMock.setRequestHeaders(new HttpHeaders().setIfMatch(dataset.getEtag()))) + .thenReturn(datasetUpdateMock); + 
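+              // testThrowsCommonExceptions re-runs this executable once per status code, so the get/update stubs above are refreshed before each client call.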
client.removeDatasetParameters(DATASET_REFERENCE, Set.of("initialDatasetKey")); + }); + } + + @Test + public void testRemoveDatasetParameters_throwsNoSuchNamespaceExceptionWhenNotFound() + throws Exception { + when(datasetGetMock.executeUnparsed()).thenReturn(buildResponse(dataset.toPrettyString())); + dataset.getExternalCatalogDatasetOptions().getParameters().remove("initialDatasetKey"); + when(datasetUpdateMock.setRequestHeaders(new HttpHeaders().setIfMatch(dataset.getEtag()))) + .thenReturn(datasetUpdateMock); + when(datasetUpdateMock.executeUnparsed()) + .thenReturn(buildResponseWithStatus(HttpStatusCodes.STATUS_CODE_NOT_FOUND)); + + NoSuchNamespaceException exception = + assertThrows( + NoSuchNamespaceException.class, + () -> client.removeDatasetParameters(DATASET_REFERENCE, Set.of("initialDatasetKey"))); + assertEquals("Got status code: 404", exception.getMessage()); + } + + @Test + public void testListDatasets_success() throws Exception { + DatasetList.Datasets dataset1 = generateDatasetListItem("dataset1"); + DatasetList.Datasets dataset2 = generateDatasetListItem("dataset2"); + DatasetList firstPage = + new DatasetList() + .setDatasets(List.of(dataset1, dataset2)) + .setNextPageToken("firstPageToken"); + firstPage.setFactory(JSON_FACTORY); + + DatasetList.Datasets dataset3 = generateDatasetListItem("dataset3"); + DatasetList secondPage = new DatasetList().setDatasets(List.of(dataset3)); + secondPage.setFactory(JSON_FACTORY); + + when(datasetListMock.setPageToken(null)) + .thenReturn(datasetListMock); // For getting the first page + when(datasetListMock.setPageToken("firstPageToken")) + .thenReturn(datasetListMock); // For getting the second page + // Two invocations for the two pages. + when(datasetListMock.executeUnparsed()) + .thenReturn( + buildResponse(firstPage.toPrettyString()), buildResponse(secondPage.toPrettyString())); + + assertEquals(List.of(dataset1, dataset2, dataset3), client.listDatasets(GCP_PROJECT)); + } + + @Test + public void testListDatasets_throwsCorrectGeneralIcebergExceptions() throws Exception { + when(datasetListMock.setPageToken(null)) + .thenReturn(datasetListMock); // For constructing the request to get the first page + testThrowsCommonExceptions(datasetListMock, Set.of(), () -> client.listDatasets(GCP_PROJECT)); + } + + @Test + public void testCreateTable_success() throws Exception { + when(tableInsertMock.executeUnparsed()).thenReturn(buildResponse(table.toPrettyString())); + + assertEquals(table, client.createTable(table)); + } + + @Test + public void testCreateTable_throwsWhenNotBqmsTable() { + table.setExternalCatalogTableOptions(null); + + assertThrows(NoSuchIcebergTableException.class, () -> client.createTable(table)); + } + + @Test + public void testCreateTable_throwsCorrectGeneralIcebergExceptions() throws Exception { + testThrowsCommonExceptions(tableInsertMock, Set.of(), () -> client.createTable(table)); + } + + @Test + public void testGetTable_success() throws Exception { + when(tableGetMock.executeUnparsed()).thenReturn(buildResponse(table.toPrettyString())); + + assertEquals(table, client.getTable(TABLE_REFERENCE)); + } + + @Test + public void testGetTable_throwsWhenNotBqmsTable() throws Exception { + table.setExternalCatalogTableOptions(null); + when(tableGetMock.executeUnparsed()).thenReturn(buildResponse(table.toPrettyString())); + + assertThrows(NoSuchIcebergTableException.class, () -> client.getTable(TABLE_REFERENCE)); + } + + @Test + public void testGetTable_throwsCorrectGeneralIcebergExceptions() throws Exception { + 
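+    // 404 is excluded here: getTable maps it to NoSuchTableException, which is covered by the dedicated test below.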
testThrowsCommonExceptions( + tableGetMock, + Set.of(HttpStatusCodes.STATUS_CODE_NOT_FOUND), + () -> client.getTable(TABLE_REFERENCE)); + } + + @Test + public void testGetTable_throwsNoSuchTableExceptionWhenNotFound() throws Exception { + when(tableGetMock.executeUnparsed()) + .thenReturn(buildResponseWithStatus(HttpStatusCodes.STATUS_CODE_NOT_FOUND)); + + assertThrows(NoSuchTableException.class, () -> client.getTable(TABLE_REFERENCE)); + } + + @Test + public void testPatchTable_success() throws Exception { + ExternalCatalogTableOptions newExternalCatalogTableOptions = + table.getExternalCatalogTableOptions(); + Table patch = + new Table() + .setSchema(Data.nullOf(TableSchema.class)) + .setExternalCatalogTableOptions(newExternalCatalogTableOptions); + + when(tablesMock.patch(GCP_PROJECT, DATASET_ID, TABLE_ID, patch)).thenReturn(tablePatchMock); + when(tablePatchMock.setRequestHeaders(new HttpHeaders().setIfMatch(table.getEtag()))) + .thenReturn(tablePatchMock); + when(tablePatchMock.executeUnparsed()).thenReturn(buildResponse(table.toPrettyString())); + + assertEquals( + newExternalCatalogTableOptions, + client.patchTable(TABLE_REFERENCE, table).getExternalCatalogTableOptions()); + } + + @Test + public void testPatchTable_clearsSchema() throws Exception { + table.setSchema(Data.nullOf(TableSchema.class)); + String tableStateAfterPatch = table.toPrettyString(); + table.setSchema(new TableSchema()); + + Table patch = + new Table() + .setSchema(Data.nullOf(TableSchema.class)) + .setExternalCatalogTableOptions(table.getExternalCatalogTableOptions()); + + when(tablesMock.patch(GCP_PROJECT, DATASET_ID, TABLE_ID, patch)).thenReturn(tablePatchMock); + when(tablePatchMock.setRequestHeaders(new HttpHeaders().setIfMatch(table.getEtag()))) + .thenReturn(tablePatchMock); + when(tablePatchMock.executeUnparsed()).thenReturn(buildResponse(tableStateAfterPatch)); + + assertEquals( + Data.nullOf(TableSchema.class), client.patchTable(TABLE_REFERENCE, table).getSchema()); + } + + @Test + public void testPatchTable_throwsCorrectGeneralIcebergExceptions() throws Exception { + ExternalCatalogTableOptions newExternalCatalogTableOptions = + table.getExternalCatalogTableOptions(); + Table patch = + new Table() + .setSchema(Data.nullOf(TableSchema.class)) + .setExternalCatalogTableOptions(newExternalCatalogTableOptions); + + testThrowsCommonExceptions( + tablePatchMock, + Set.of(HttpStatusCodes.STATUS_CODE_NOT_FOUND), + () -> { + when(tablePatchMock.setRequestHeaders(new HttpHeaders().setIfMatch(table.getEtag()))) + .thenReturn(tablePatchMock); + when(tablesMock.patch(GCP_PROJECT, DATASET_ID, TABLE_ID, patch)) + .thenReturn(tablePatchMock); + client.patchTable(TABLE_REFERENCE, table); + }); + } + + @Test + public void testPatchTableParameters_throwsNoSuchTableExceptionWhenNotFound() throws Exception { + Table patch = + new Table() + .setSchema(Data.nullOf(TableSchema.class)) + .setExternalCatalogTableOptions(table.getExternalCatalogTableOptions()); + when(tablePatchMock.setRequestHeaders(new HttpHeaders().setIfMatch(table.getEtag()))) + .thenReturn(tablePatchMock); + when(tablesMock.patch(GCP_PROJECT, DATASET_ID, TABLE_ID, patch)).thenReturn(tablePatchMock); + when(tablePatchMock.executeUnparsed()) + .thenReturn(buildResponseWithStatus(HttpStatusCodes.STATUS_CODE_NOT_FOUND)); + + assertThrows(NoSuchTableException.class, () -> client.patchTable(TABLE_REFERENCE, table)); + } + + @Test + public void testPatchTableParameters_throwsBadRequestExceptionWhenConnectionNotFound() + throws Exception { + 
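+    // BigQuery reports a missing connection as a 404 whose reason names the connection; the client should translate that into BadRequestException rather than a missing-table error.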
table.getExternalCatalogTableOptions().setConnectionId("A-connection-that-does-not-exist"); + Table patch = + new Table() + .setSchema(Data.nullOf(TableSchema.class)) + .setExternalCatalogTableOptions(table.getExternalCatalogTableOptions()); + when(tablePatchMock.setRequestHeaders(new HttpHeaders().setIfMatch(table.getEtag()))) + .thenReturn(tablePatchMock); + when(tablesMock.patch(GCP_PROJECT, DATASET_ID, TABLE_ID, patch)).thenReturn(tablePatchMock); + when(tablePatchMock.executeUnparsed()) + .thenReturn( + buildResponseWithStatus( + HttpStatusCodes.STATUS_CODE_NOT_FOUND, + "Not found: Connection A-connection-that-does-not-exist")); + + assertThrows(BadRequestException.class, () -> client.patchTable(TABLE_REFERENCE, table)); + } + + @Test + public void testRenameTable_success() throws Exception { + when(tableGetMock.executeUnparsed()).thenReturn(buildResponse(table.toPrettyString())); + String newTableId = "newTableId"; + TableReference newTableReference = + new TableReference() + .setProjectId(GCP_PROJECT) + .setDatasetId(DATASET_ID) + .setTableId(newTableId); + table.setTableReference(newTableReference); + Table patch = new Table().setTableReference(newTableReference); + when(tablesMock.patch(GCP_PROJECT, DATASET_ID, TABLE_ID, patch)).thenReturn(tablePatchMock); + when(tablePatchMock.setRequestHeaders(new HttpHeaders().setIfMatch(table.getEtag()))) + .thenReturn(tablePatchMock); + when(tablePatchMock.executeUnparsed()).thenReturn(buildResponse(table.toPrettyString())); + + assertEquals(table, client.renameTable(TABLE_REFERENCE, newTableId)); + } + + @Test + public void testRenameTable_throwsCorrectGeneralIcebergExceptions() throws Exception { + String initialTableStringState = table.toPrettyString(); + String newTableId = "newTableId"; + TableReference newTableReference = + new TableReference() + .setProjectId(GCP_PROJECT) + .setDatasetId(DATASET_ID) + .setTableId(newTableId); + table.setTableReference(newTableReference); + Table patch = new Table().setTableReference(newTableReference); + + testThrowsCommonExceptions( + tablePatchMock, + Set.of(HttpStatusCodes.STATUS_CODE_NOT_FOUND), + () -> { + when(tableGetMock.executeUnparsed()).thenReturn(buildResponse(initialTableStringState)); + when(tablePatchMock.setRequestHeaders(new HttpHeaders().setIfMatch(table.getEtag()))) + .thenReturn(tablePatchMock); + when(tablesMock.patch(GCP_PROJECT, DATASET_ID, TABLE_ID, patch)) + .thenReturn(tablePatchMock); + client.renameTable(TABLE_REFERENCE, newTableId); + }); + } + + @Test + public void testRenameTable_throwsNoSuchTableExceptionWhenNotFound() throws Exception { + when(tableGetMock.executeUnparsed()) + .thenReturn(buildResponseWithStatus(HttpStatusCodes.STATUS_CODE_NOT_FOUND)); + + assertThrows( + NoSuchTableException.class, () -> client.renameTable(TABLE_REFERENCE, "someNewTableId")); + } + + @Test + public void testDeleteTable_success() throws Exception { + when(tableGetMock.executeUnparsed()).thenReturn(buildResponse(table.toPrettyString())); + when(tableDeleteMock.executeUnparsed()) + .thenReturn(buildResponseWithStatus(HttpStatusCodes.STATUS_CODE_OK)); + + client.deleteTable(TABLE_REFERENCE); + + verify(tableDeleteMock, times(1)).executeUnparsed(); + } + + @Test + public void testDeleteTable_throwsCorrectGeneralIcebergExceptions() throws Exception { + testThrowsCommonExceptions( + tableDeleteMock, + Set.of(HttpStatusCodes.STATUS_CODE_NOT_FOUND), + () -> { + when(tableGetMock.executeUnparsed()).thenReturn(buildResponse(table.toPrettyString())); + client.deleteTable(TABLE_REFERENCE); + }); + 
} + + @Test + public void testDeleteTable_throwsNoSuchTableExceptionWhenNotFoundWhileDeleting() + throws Exception { + when(tableGetMock.executeUnparsed()).thenReturn(buildResponse(table.toPrettyString())); + when(tableDeleteMock.executeUnparsed()) + .thenReturn(buildResponseWithStatus(HttpStatusCodes.STATUS_CODE_NOT_FOUND)); + + NoSuchTableException exception = + assertThrows(NoSuchTableException.class, () -> client.deleteTable(TABLE_REFERENCE)); + assertEquals("Got status code: 404", exception.getMessage()); + } + + @Test + public void testDeleteTable_throwsNoSuchTableExceptionWhenNotFoundInitially() throws Exception { + when(tableGetMock.executeUnparsed()) + .thenReturn(buildResponseWithStatus(HttpStatusCodes.STATUS_CODE_NOT_FOUND)); + when(tableDeleteMock.executeUnparsed()) + .thenThrow(new IllegalStateException("Why are you even calling this, test?")); + + NoSuchTableException exception = + assertThrows(NoSuchTableException.class, () -> client.deleteTable(TABLE_REFERENCE)); + assertEquals("Got status code: 404", exception.getMessage()); + } + + @Test + public void testListTables_success() throws Exception { + when(tableGetMock.executeUnparsed()) + .thenReturn(buildResponse(table.toPrettyString())) // Response for table1 + .thenReturn(buildResponse(table.toPrettyString())); // Response for table3 + when(unsupportedTableGetMock.executeUnparsed()) + .thenThrow( + new NoSuchIcebergTableException( + "We don't support that")); // Response for table2ThatIsNotSupported + + TableList.Tables table1 = generateTableListItem("table1"); + when(tablesMock.get( + table1.getTableReference().getProjectId(), + table1.getTableReference().getDatasetId(), + table1.getTableReference().getTableId())) + .thenReturn(tableGetMock); + TableList.Tables table2ThatIsNotSupported = generateTableListItem("table2ThatIsNotSupported"); + when(tablesMock.get( + table2ThatIsNotSupported.getTableReference().getProjectId(), + table2ThatIsNotSupported.getTableReference().getDatasetId(), + table2ThatIsNotSupported.getTableReference().getTableId())) + .thenReturn(unsupportedTableGetMock); + TableList firstPage = + new TableList() + .setTables(List.of(table1, table2ThatIsNotSupported)) + .setNextPageToken("firstPageToken"); + firstPage.setFactory(JSON_FACTORY); + + TableList.Tables table3 = generateTableListItem("table3"); + when(tablesMock.get( + table3.getTableReference().getProjectId(), + table3.getTableReference().getDatasetId(), + table3.getTableReference().getTableId())) + .thenReturn(tableGetMock); + TableList secondPage = new TableList().setTables(List.of(table3)); + secondPage.setFactory(JSON_FACTORY); + + when(tableListMock.setPageToken(null)).thenReturn(tableListMock); // For getting the first page + when(tableListMock.setPageToken("firstPageToken")) + .thenReturn(tableListMock); // For getting the second page + // Two invocations for the two pages. + when(tableListMock.executeUnparsed()) + .thenReturn( + buildResponse(firstPage.toPrettyString()), buildResponse(secondPage.toPrettyString())); + + // Notice how table2ThatIsNotSupported is intentionally not included. 
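+    // Results from both pages are stitched into a single list.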
+ assertEquals( + List.of(table1, table3), + client.listTables(DATASET_REFERENCE, /* filterUnsupportedTables= */ true)); + } + + @Test + public void testListTables_throwsCorrectGeneralIcebergExceptions() throws Exception { + when(tableListMock.setPageToken(null)) + .thenReturn(tableListMock); // For constructing the request to get the first page + testThrowsCommonExceptions( + tableListMock, + Set.of(HttpStatusCodes.STATUS_CODE_NOT_FOUND), + () -> client.listTables(DATASET_REFERENCE, true)); + } + + @Test + public void testListTables_throwsNoSuchNamespaceExceptionWhenDatasetNotFound() throws Exception { + when(tableListMock.setPageToken(null)) + .thenReturn(tableListMock); // For constructing the request to get the first page + when(tableListMock.executeUnparsed()) + .thenReturn(buildResponseWithStatus(HttpStatusCodes.STATUS_CODE_NOT_FOUND)); + + assertThrows(NoSuchNamespaceException.class, () -> client.listTables(DATASET_REFERENCE, false)); + } + + private DatasetList.Datasets generateDatasetListItem(String datasetId) { + return new DatasetList.Datasets() + .setDatasetReference( + new DatasetReference().setProjectId(GCP_PROJECT).setDatasetId(datasetId)); + } + + private TableList.Tables generateTableListItem(String tableId) { + return new TableList.Tables() + .setTableReference(new TableReference().setProjectId(GCP_PROJECT).setTableId(tableId)); + } + + private void testThrowsCommonExceptions( + @SuppressWarnings("rawtypes") BigqueryRequest requestMocker, // Intended to be generic + Set<Integer> excludedStatusCodes, + Executable executable) + throws IOException { + for (Map.Entry<Integer, Class<? extends RuntimeException>> codeToException : + errorCodeToIcebergException.entrySet()) { + int statusCode = codeToException.getKey(); + if (excludedStatusCodes.contains(statusCode)) { + continue; + } + + when(requestMocker.executeUnparsed()) + .thenReturn(buildResponseWithStatus(codeToException.getKey())); + assertThrows(codeToException.getValue(), executable); + } + } + + private HttpResponse buildResponseWithStatus(int statusCode) throws IOException { + return buildResponseWithStatus(statusCode, null); + } + + private HttpResponse buildResponseWithStatus(int statusCode, String content) throws IOException { + return buildResponse(statusCode, content); + } + + private HttpResponse buildResponse(String content) throws IOException { + return buildResponse(HttpStatusCodes.STATUS_CODE_OK, content); + } + + private HttpResponse buildResponse(int statusCode, String content) throws IOException { + HttpTransport mockHttpTransport = + new MockHttpTransport() { + @Override + public LowLevelHttpRequest buildRequest(String method, String url) { + return new MockLowLevelHttpRequest() { + @Override + public LowLevelHttpResponse execute() { + MockLowLevelHttpResponse result = new MockLowLevelHttpResponse(); + if (content != null) { + result.setContent(content); + } + result.setStatusCode(statusCode); + if (statusCode != HttpStatusCodes.STATUS_CODE_OK) { + result.setReasonPhrase("Got status code: " + statusCode); + } + result.setContentEncoding("UTF-8"); + result.setContentType("application/json"); + result.setHeaderNames(List.of("header1", "header2")); + result.setHeaderValues(List.of("header1", "header2")); + return result; + } + }; + } + }; + + return mockHttpTransport + .createRequestFactory() + .buildPostRequest(HttpTesting.SIMPLE_GENERIC_URL, new MockHttpContent()) + .setThrowExceptionOnExecuteError(false) + .setParser(JSON_FACTORY.createJsonObjectParser()) + .execute(); + } +} diff --git
a/bigquery/src/test/java/org/apache/iceberg/gcp/bigquery/metastore/BigQueryMetastoreCatalogTest.java b/bigquery/src/test/java/org/apache/iceberg/gcp/bigquery/metastore/BigQueryMetastoreCatalogTest.java new file mode 100644 index 000000000000..13630cd22c44 --- /dev/null +++ b/bigquery/src/test/java/org/apache/iceberg/gcp/bigquery/metastore/BigQueryMetastoreCatalogTest.java @@ -0,0 +1,538 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.gcp.bigquery.metastore; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.api.services.bigquery.model.Dataset; +import com.google.api.services.bigquery.model.DatasetList.Datasets; +import com.google.api.services.bigquery.model.DatasetReference; +import com.google.api.services.bigquery.model.ExternalCatalogDatasetOptions; +import com.google.api.services.bigquery.model.Table; +import com.google.api.services.bigquery.model.TableList.Tables; +import com.google.api.services.bigquery.model.TableReference; +import java.io.File; +import java.util.List; +import java.util.Map; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SchemaParser; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.exceptions.ServiceFailureException; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.gcp.GCPProperties; +import org.apache.iceberg.gcp.bigquery.BigQueryClient; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +public class BigQueryMetastoreCatalogTest { + + @TempDir private File tempFolder; + + private static final String GCP_PROJECT = "my-project"; + private static final String GCP_REGION = "us"; + 
private static final String CATALOG_ID = "bqms"; + private static final String DATASET_ID = "db"; + private static final String TABLE_ID = "tbl"; + private static final DatasetReference DATASET_REFERENCE = + new DatasetReference().setProjectId(GCP_PROJECT).setDatasetId(DATASET_ID); + private static final TableReference TABLE_REFERENCE = + new TableReference().setProjectId(GCP_PROJECT).setDatasetId(DATASET_ID).setTableId(TABLE_ID); + + private final BigQueryClient bigQueryClient = mock(BigQueryClient.class); + + private BigQueryMetastoreCatalog bigQueryMetastoreCatalog; + private String warehouseLocation; + + @BeforeEach + public void before() { + this.bigQueryMetastoreCatalog = new BigQueryMetastoreCatalog(); + this.bigQueryMetastoreCatalog.setConf(new Configuration()); + this.warehouseLocation = tempFolder.toPath().resolve("hive-warehouse").toString(); + + bigQueryMetastoreCatalog.initialize( + CATALOG_ID, + /* properties= */ ImmutableMap.of( + GCPProperties.PROJECT_ID, + GCP_PROJECT, + CatalogProperties.WAREHOUSE_LOCATION, + warehouseLocation), + GCP_PROJECT, + GCP_REGION, + bigQueryClient); + } + + @Test + public void testDefaultWarehouseWithDatabaseLocation_asExpected() { + when(bigQueryClient.getDataset(DATASET_REFERENCE)) + .thenReturn( + new Dataset() + .setExternalCatalogDatasetOptions( + new ExternalCatalogDatasetOptions() + .setDefaultStorageLocationUri("build/db_folder"))); + + assertEquals( + "build/db_folder/table", + bigQueryMetastoreCatalog.defaultWarehouseLocation(TableIdentifier.of(DATASET_ID, "table"))); + } + + @Test + public void testDefaultWarehouseWithoutDatabaseLocation_asExpected() { + when(bigQueryClient.getDataset(DATASET_REFERENCE)) + .thenReturn( + new Dataset().setExternalCatalogDatasetOptions(new ExternalCatalogDatasetOptions())); + + assertEquals( + warehouseLocation + "/db.db/table", + bigQueryMetastoreCatalog.defaultWarehouseLocation(TableIdentifier.of(DATASET_ID, "table"))); + } + + @Test + public void testCreateTable_succeedWhenNotExist() throws Exception { + // The table to create does not exist. + TableIdentifier tableIdentifier = TableIdentifier.of(DATASET_ID, TABLE_ID); + Schema schema = BigQueryMetastoreTestUtils.getTestSchema(); + + when(bigQueryClient.getTable(TABLE_REFERENCE)) + .thenThrow(new NoSuchTableException("error message getTable")); + Table createdTable = + BigQueryMetastoreTestUtils.createTestTable( + tempFolder, bigQueryMetastoreCatalog, TABLE_REFERENCE); + reset(bigQueryClient); + when(bigQueryClient.getTable(TABLE_REFERENCE)).thenReturn(createdTable, createdTable); + + org.apache.iceberg.Table loadedTable = bigQueryMetastoreCatalog.loadTable(tableIdentifier); + assertEquals(SchemaParser.toJson(schema), SchemaParser.toJson(loadedTable.schema())); + + // Creates a table that already exists. 
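+    // The second create of the same identifier must fail with AlreadyExistsException.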
+ Exception exception = + assertThrows( + AlreadyExistsException.class, + () -> + bigQueryMetastoreCatalog + .buildTable(tableIdentifier, schema) + .withLocation(tempFolder.toPath().resolve("new_tbl").toString()) + .createTransaction() + .commitTransaction()); + assertTrue(exception.getMessage().contains("already exist")); + } + + @Test + public void testListTables_asExpected() { + when(bigQueryClient.listTables(DATASET_REFERENCE, false)) + .thenReturn( + ImmutableList.of( + new Tables() + .setTableReference( + new TableReference() + .setProjectId(GCP_PROJECT) + .setDatasetId(DATASET_ID) + .setTableId("tbl0")), + new Tables() + .setTableReference( + new TableReference() + .setProjectId(GCP_PROJECT) + .setDatasetId(DATASET_ID) + .setTableId("tbl1")))); + + List<TableIdentifier> result = bigQueryMetastoreCatalog.listTables(Namespace.of(DATASET_ID)); + assertEquals(2, result.size()); + assertEquals(TableIdentifier.of(DATASET_ID, "tbl0"), result.get(0)); + assertEquals(TableIdentifier.of(DATASET_ID, "tbl1"), result.get(1)); + } + + @Test + public void testListTables_throwsOnInvalidNamespace_emptyNamespace() { + Exception exception = + assertThrows( + IllegalArgumentException.class, + () -> bigQueryMetastoreCatalog.listTables(Namespace.of())); + assertEquals( + "BigQuery Metastore only supports single level namespaces. Invalid namespace: \"\" has 0" + + " levels", + exception.getMessage()); + } + + @Test + public void testListTables_throwsOnInvalidNamespace_nestedNamespace() { + Exception exception = + assertThrows( + IllegalArgumentException.class, + () -> bigQueryMetastoreCatalog.listTables(Namespace.of("n0", "n1"))); + assertEquals( + "BigQuery Metastore only supports single level namespaces. Invalid namespace: \"n0.n1\" has" + + " 2 levels", + exception.getMessage()); + } + + @Test + public void testDropTable_failsWhenTableNotFound() { + when(bigQueryClient.getTable(TABLE_REFERENCE)) + .thenThrow(new NoSuchTableException("error message getTable")); + doThrow(new NoSuchTableException("error message deleteTable")) + .when(bigQueryClient) + .deleteTable(TABLE_REFERENCE); + + assertFalse( + bigQueryMetastoreCatalog.dropTable( + TableIdentifier.of(DATASET_ID, TABLE_ID), /* purge= */ false)); + } + + @Test + public void testDropTable_succeedsWhenTableExists_deleteFiles() throws Exception { + TableIdentifier tableIdentifier = TableIdentifier.of(DATASET_ID, TABLE_ID); + + when(bigQueryClient.getTable(TABLE_REFERENCE)) + .thenThrow(new NoSuchTableException("error message getTable")); + Table createdTable = + BigQueryMetastoreTestUtils.createTestTable( + tempFolder, bigQueryMetastoreCatalog, TABLE_REFERENCE); + String tableDir = + createdTable.getExternalCatalogTableOptions().getStorageDescriptor().getLocationUri(); + assertTrue(BigQueryMetastoreTestUtils.getIcebergMetadataFilePath(tableDir).isPresent()); + + reset(bigQueryClient); + when(bigQueryClient.getTable(TABLE_REFERENCE)).thenReturn(createdTable, createdTable); + + bigQueryMetastoreCatalog.dropTable(tableIdentifier, /* purge= */ false); + assertTrue(BigQueryMetastoreTestUtils.getIcebergMetadataFilePath(tableDir).isPresent()); + + bigQueryMetastoreCatalog.dropTable(tableIdentifier, /* purge= */ true); + assertFalse(BigQueryMetastoreTestUtils.getIcebergMetadataFilePath(tableDir).isPresent()); + } + + @Test + public void testRenameTable_sameDatabase_unsupported() { + when(bigQueryClient.renameTable(TABLE_REFERENCE, "newTableId")) + .thenThrow( + new AssertionError("You should not make this API call, it's unsupported anyway!")); + + ServiceFailureException
exception = + assertThrows( + ServiceFailureException.class, + () -> + bigQueryMetastoreCatalog.renameTable( + TableIdentifier.of(DATASET_ID, TABLE_ID), + TableIdentifier.of(DATASET_ID, "newTableId"))); + assertEquals( + "Table rename operation is unsupported. " + + "Try the SQL operation directly on BigQuery: " + + "\"ALTER TABLE tbl RENAME TO newTableId;\"", + exception.getMessage()); + } + + @Test + public void testRenameTable_differentDatabase_fail() { + when(bigQueryClient.renameTable(TABLE_REFERENCE, "newTableId")) + .thenReturn( + new Table() + .setTableReference( + new TableReference() + .setProjectId(TABLE_REFERENCE.getProjectId()) + .setDatasetId(TABLE_REFERENCE.getDatasetId()) + .setTableId("newTableId"))); + + Exception exception = + assertThrows( + ValidationException.class, + () -> + bigQueryMetastoreCatalog.renameTable( + TableIdentifier.of(DATASET_ID, TABLE_ID), + TableIdentifier.of("A different DATASET_ID", "newTableId"))); + assertEquals("New table name must be in the same namespace", exception.getMessage()); + } + + @Test + public void testCreateNamespace_createDatabase() { + Map<String, String> metadata = ImmutableMap.of(); + String dbDir = warehouseLocation + String.format("/%s.db", DATASET_ID); + Dataset dataset = + new Dataset() + .setDatasetReference(DATASET_REFERENCE) + .setLocation(GCP_REGION) + .setExternalCatalogDatasetOptions( + new ExternalCatalogDatasetOptions() + .setParameters(metadata) + .setDefaultStorageLocationUri(dbDir)); + when(bigQueryClient.createDataset(dataset)).thenReturn(dataset); + + bigQueryMetastoreCatalog.createNamespace(Namespace.of(DATASET_ID), metadata); + + verify(bigQueryClient, times(1)).createDataset(dataset); + } + + @Test + public void testCreateNamespace_failsWhenInvalid_tooLong() { + Exception exception = + assertThrows( + IllegalArgumentException.class, + () -> + bigQueryMetastoreCatalog.createNamespace( + Namespace.of("n0", "n1"), ImmutableMap.of())); + assertEquals( + "BigQuery Metastore only supports single level namespaces. Invalid namespace: \"n0.n1\" has" + + " 2 levels", + exception.getMessage()); + } + + @Test + public void testCreateNamespace_failsWhenInvalid_emptyNamespace() { + IllegalArgumentException exception = + assertThrows( + IllegalArgumentException.class, + () -> bigQueryMetastoreCatalog.createNamespace(Namespace.of(), ImmutableMap.of())); + assertEquals( + "BigQuery Metastore only supports single level namespaces.
Invalid namespace: \"\" has 0" + + " levels", + exception.getMessage()); + } + + @Test + public void testListNamespaces_asExpected() { + when(bigQueryClient.listDatasets(GCP_PROJECT)) + .thenReturn( + ImmutableList.of( + new Datasets() + .setDatasetReference( + new DatasetReference().setProjectId(GCP_PROJECT).setDatasetId("db0")), + new Datasets() + .setDatasetReference( + new DatasetReference().setProjectId(GCP_PROJECT).setDatasetId("db1")))); + + List result = bigQueryMetastoreCatalog.listNamespaces(Namespace.of()); + + assertEquals(2, result.size()); + assertEquals(Namespace.of("db0"), result.get(0)); + assertEquals(Namespace.of("db1"), result.get(1)); + } + + @Test + public void testListNamespaces_emptyWhenInvalid() { + assertTrue(bigQueryMetastoreCatalog.listNamespaces(Namespace.of(DATASET_ID)).isEmpty()); + } + + @Test + public void testDropNamespace_deleteDataset_success() { + doNothing().when(bigQueryClient).deleteDataset(DATASET_REFERENCE); + + bigQueryMetastoreCatalog.dropNamespace(Namespace.of(DATASET_ID)); + } + + @Test + public void testDropNamespace_throwsOnInvalidNamespace_emptyNamespace() { + IllegalArgumentException exception = + assertThrows( + IllegalArgumentException.class, + () -> bigQueryMetastoreCatalog.dropNamespace(Namespace.of())); + assertEquals( + "BigQuery Metastore only supports single level namespaces. Invalid namespace: \"\" has 0" + + " levels", + exception.getMessage()); + } + + @Test + public void testSetProperties_throwsOnInvalidNamespace_emptyNamespace() { + Exception exception = + assertThrows( + IllegalArgumentException.class, + () -> bigQueryMetastoreCatalog.setProperties(Namespace.of(), ImmutableMap.of())); + assertEquals( + "BigQuery Metastore only supports single level namespaces. Invalid namespace: \"\" has 0" + + " levels", + exception.getMessage()); + } + + @Test + public void testDropNamespace_throwsOnInvalidNamespace_nestedNamespace() { + Exception exception = + assertThrows( + IllegalArgumentException.class, + () -> bigQueryMetastoreCatalog.dropNamespace(Namespace.of("n0", "n1"))); + assertEquals( + "BigQuery Metastore only supports single level namespaces. Invalid namespace: \"n0.n1\" has" + + " 2 levels", + exception.getMessage()); + } + + @Test + public void testSetProperties_throwsOnInvalidNamespace_nestedNamespace() { + Exception exception = + assertThrows( + IllegalArgumentException.class, + () -> + bigQueryMetastoreCatalog.setProperties( + Namespace.of("n0", "n1"), ImmutableMap.of())); + assertEquals( + "BigQuery Metastore only supports single level namespaces. 
Invalid namespace: \"n0.n1\" has" + + " 2 levels", + exception.getMessage()); + } + + @Test + public void testSetProperties_succeedForDataset() { + when(bigQueryClient.getDataset(DATASET_REFERENCE)) + .thenReturn( + new Dataset() + .setExternalCatalogDatasetOptions( + new ExternalCatalogDatasetOptions() + .setParameters(ImmutableMap.of("key1", "value1", "key2", "value2")))); + when(bigQueryClient.setDatasetParameters(DATASET_REFERENCE, ImmutableMap.of("key3", "value3"))) + .thenReturn( + new Dataset() + .setExternalCatalogDatasetOptions( + new ExternalCatalogDatasetOptions() + .setParameters( + ImmutableMap.of( + "key1", "value1", "key2", "value2", "key3", "value3")))); + + assertTrue( + bigQueryMetastoreCatalog.setProperties( + Namespace.of(DATASET_ID), + ImmutableMap.of("key1", "value1", "key2", "value2", "key3", "value3"))); + } + + @Test + public void testRemoveProperties_throwsOnInvalidNamespace_emptyNamespace() { + Exception exception = + assertThrows( + IllegalArgumentException.class, + () -> bigQueryMetastoreCatalog.setProperties(Namespace.of(), ImmutableMap.of())); + assertEquals( + "BigQuery Metastore only supports single level namespaces. Invalid namespace: \"\" has 0" + + " levels", + exception.getMessage()); + } + + @Test + public void testRemoveProperties_throwsOnInvalidNamespace_nestedNamespace() { + Exception exception = + assertThrows( + IllegalArgumentException.class, + () -> + bigQueryMetastoreCatalog.removeProperties( + Namespace.of("n0", "n1"), ImmutableSet.of())); + assertEquals( + "BigQuery Metastore only supports single level namespaces. Invalid namespace: \"n0.n1\" has" + + " 2 levels", + exception.getMessage()); + } + + @Test + public void testRemoveProperties_succeedForDatabase() { + when(bigQueryClient.getDataset(DATASET_REFERENCE)) + .thenReturn( + new Dataset() + .setExternalCatalogDatasetOptions( + new ExternalCatalogDatasetOptions() + .setParameters(ImmutableMap.of("key1", "value1", "key2", "value2")))); + when(bigQueryClient.removeDatasetParameters(DATASET_REFERENCE, ImmutableSet.of("key1"))) + .thenReturn( + new Dataset() + .setExternalCatalogDatasetOptions( + new ExternalCatalogDatasetOptions() + .setParameters(ImmutableMap.of("key1", "value1")))); + + assertTrue( + bigQueryMetastoreCatalog.removeProperties( + Namespace.of(DATASET_ID), ImmutableSet.of("key1", "key3"))); + } + + @Test + public void testLoadNamespaceMetadata_catalogAsExpected() { + when(bigQueryClient.getDataset(DATASET_REFERENCE)).thenReturn(new Dataset()); + + assertTrue(bigQueryMetastoreCatalog.loadNamespaceMetadata(Namespace.of(DATASET_ID)).isEmpty()); + } + + @Test + public void testLoadNamespaceMetadata_datasetAsExpected() { + Map metadata = ImmutableMap.of("key1", "value1", "key2", "value2"); + String dbDir = warehouseLocation + String.format("/%s.db", DATASET_ID); + Dataset dataset = + new Dataset() + .setDatasetReference(DATASET_REFERENCE) + .setType("BIGQUERY_METASTORE") + .setExternalCatalogDatasetOptions( + new ExternalCatalogDatasetOptions() + .setParameters(metadata) + .setDefaultStorageLocationUri(dbDir)); + when(bigQueryClient.createDataset(dataset)).thenReturn(dataset); + + when(bigQueryClient.getDataset(DATASET_REFERENCE)).thenReturn(dataset); + + assertEquals( + ImmutableMap.of("location", dbDir, "key1", "value1", "key2", "value2"), + bigQueryMetastoreCatalog.loadNamespaceMetadata(Namespace.of(DATASET_ID))); + } + + @Test + public void testLoadNamespaceMetadata_failWhenInvalid() { + Exception exception = + assertThrows( + IllegalArgumentException.class, + () -> 
bigQueryMetastoreCatalog.loadNamespaceMetadata(Namespace.of("n0", "n1"))); + assertEquals( + "BigQuery Metastore only supports single level namespaces. Invalid namespace: \"n0.n1\" has" + + " 2 levels", + exception.getMessage()); + } + + @Test + public void testName_asExpected() { + assertEquals("bqms", bigQueryMetastoreCatalog.name()); + } + + @Test + public void testProperties_asExpected() { + assertEquals( + ImmutableMap.of("gcp_project", GCP_PROJECT, "warehouse", warehouseLocation), + bigQueryMetastoreCatalog.properties()); + } + + @Test + public void testNewTableOps_asExpected() { + assertNotNull(bigQueryMetastoreCatalog.newTableOps(TableIdentifier.of(DATASET_ID, TABLE_ID))); + } + + @Test + public void testNewTableOps_doesNotFailForInvalidNamespace() { + assertNotNull( + bigQueryMetastoreCatalog.newTableOps( + TableIdentifier.of(Namespace.of("n0", TABLE_ID), TABLE_ID))); + } +} diff --git a/bigquery/src/test/java/org/apache/iceberg/gcp/bigquery/metastore/BigQueryMetastoreTestUtils.java b/bigquery/src/test/java/org/apache/iceberg/gcp/bigquery/metastore/BigQueryMetastoreTestUtils.java new file mode 100644 index 000000000000..24323fb443e4 --- /dev/null +++ b/bigquery/src/test/java/org/apache/iceberg/gcp/bigquery/metastore/BigQueryMetastoreTestUtils.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.gcp.bigquery.metastore; + +import static org.apache.iceberg.types.Types.NestedField.required; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.google.api.services.bigquery.model.ExternalCatalogTableOptions; +import com.google.api.services.bigquery.model.StorageDescriptor; +import com.google.api.services.bigquery.model.Table; +import com.google.api.services.bigquery.model.TableReference; +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.Optional; +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.filefilter.TrueFileFilter; +import org.apache.iceberg.Schema; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.types.Types; + +/** Test utility methods for BigQuery Metastore Iceberg catalog. 
*/ +public final class BigQueryMetastoreTestUtils { + + /** A utility class that is not to be instantiated */ + private BigQueryMetastoreTestUtils() {} + + public static final String METADATA_LOCATION_PROP = "metadata_location"; + + public static Schema getTestSchema() { + return new Schema( + required(1, "id", Types.IntegerType.get(), "unique ID"), + required(2, "data", Types.StringType.get())); + } + + public static Table createTestTable( + File tempFolder, + BigQueryMetastoreCatalog bigQueryMetastoreCatalog, + TableReference tableReference) + throws IOException { + Schema schema = getTestSchema(); + TableIdentifier tableIdentifier = + TableIdentifier.of(tableReference.getDatasetId(), tableReference.getTableId()); + String tableDir = tempFolder.toPath().resolve(tableReference.getTableId()).toString(); + + bigQueryMetastoreCatalog + .buildTable(tableIdentifier, schema) + .withLocation(tableDir) + .createTransaction() + .commitTransaction(); + + Optional<String> metadataLocation = getIcebergMetadataFilePath(tableDir); + assertTrue(metadataLocation.isPresent()); + return new Table() + .setTableReference(tableReference) + .setExternalCatalogTableOptions( + new ExternalCatalogTableOptions() + .setStorageDescriptor(new StorageDescriptor().setLocationUri(tableDir)) + .setParameters( + Collections.singletonMap(METADATA_LOCATION_PROP, metadataLocation.get()))); + } + + public static Optional<String> getIcebergMetadataFilePath(String tableDir) throws IOException { + for (File file : + FileUtils.listFiles(new File(tableDir), TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE)) { + if (file.getCanonicalPath().endsWith(".json")) { + return Optional.of(file.getCanonicalPath()); + } + } + + return Optional.empty(); + } +} diff --git a/bigquery/src/test/java/org/apache/iceberg/gcp/bigquery/metastore/BigQueryTableOperationsTest.java b/bigquery/src/test/java/org/apache/iceberg/gcp/bigquery/metastore/BigQueryTableOperationsTest.java new file mode 100644 index 000000000000..d0f49fca48c2 --- /dev/null +++ b/bigquery/src/test/java/org/apache/iceberg/gcp/bigquery/metastore/BigQueryTableOperationsTest.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.iceberg.gcp.bigquery.metastore; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.api.services.bigquery.model.Dataset; +import com.google.api.services.bigquery.model.DatasetReference; +import com.google.api.services.bigquery.model.ExternalCatalogDatasetOptions; +import com.google.api.services.bigquery.model.ExternalCatalogTableOptions; +import com.google.api.services.bigquery.model.Table; +import com.google.api.services.bigquery.model.TableReference; +import java.io.File; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.gcp.GCPProperties; +import org.apache.iceberg.gcp.bigquery.BigQueryClient; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.mockito.ArgumentCaptor; + +public class BigQueryTableOperationsTest { + + @TempDir private File tempFolder; + + private static final String GCP_PROJECT = "my-project"; + private static final String GCP_REGION = "us"; + private static final String DATASET_ID = "db"; + private static final String TABLE_ID = "tbl"; + private static final TableIdentifier SPARK_TABLE_ID = TableIdentifier.of(DATASET_ID, TABLE_ID); + + private static final TableReference TABLE_REFERENCE = + new TableReference().setProjectId(GCP_PROJECT).setDatasetId(DATASET_ID).setTableId(TABLE_ID); + + private final BigQueryClient bigQueryClient = mock(BigQueryClient.class); + + private BigQueryMetastoreCatalog bigQueryMetastoreCatalog; + private BigQueryTableOperations tableOps; + + @BeforeEach + public void before() { + this.bigQueryMetastoreCatalog = new BigQueryMetastoreCatalog(); + this.bigQueryMetastoreCatalog.setConf(new Configuration()); + String warehouseLocation = tempFolder.toPath().resolve("hive-warehouse").toString(); + + bigQueryMetastoreCatalog.initialize( + "CATALOG_ID", + /* properties= */ ImmutableMap.of( + GCPProperties.PROJECT_ID, + GCP_PROJECT, + CatalogProperties.WAREHOUSE_LOCATION, + warehouseLocation), + GCP_PROJECT, + GCP_REGION, + bigQueryClient); + this.tableOps = (BigQueryTableOperations) bigQueryMetastoreCatalog.newTableOps(SPARK_TABLE_ID); + } + + @Test + public void testDoRefresh_fetchLatestMetadataFromBigQuery() throws Exception { + Table createdTable = createTestTable(); + reset(bigQueryClient); + when(bigQueryClient.getTable(TABLE_REFERENCE)).thenReturn(createdTable); + + tableOps.refresh(); + assertEquals( + createdTable + .getExternalCatalogTableOptions() + .getParameters() + .getOrDefault(BigQueryMetastoreTestUtils.METADATA_LOCATION_PROP, ""), + tableOps.currentMetadataLocation()); + + reset(bigQueryClient);
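+    // After the successful refresh above, make the table lookup fail.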
+ when(bigQueryClient.getTable(TABLE_REFERENCE)) + .thenThrow(new NoSuchTableException("error message getTable")); + // Refresh fails when the table is not found but metadata is already present. + assertThrows(NoSuchTableException.class, () -> tableOps.refresh()); + } + + @Test + public void testDoRefresh_failForNonIcebergTable() { + when(bigQueryClient.getTable(TABLE_REFERENCE)) + .thenReturn(new Table().setTableReference(TABLE_REFERENCE)); + + Exception exception = assertThrows(ValidationException.class, () -> tableOps.refresh()); + assertTrue(exception.getMessage().contains("metadata location not found")); + } + + @Test + public void testDoRefresh_noOpWhenMetadataAndTableNotFound() { + when(bigQueryClient.getTable(TABLE_REFERENCE)) + .thenThrow(new NoSuchTableException("error message getTable")); + // Table not found won't cause errors when the metadata is null. + assertNull(tableOps.currentMetadataLocation()); + tableOps.refresh(); + } + + @Test + public void testTableName_asExpected() { + assertEquals("db.tbl", tableOps.tableName()); + } + + @Test + public void testDoCommit_useEtagForUpdateTable() throws Exception { + Table tableWithEtag = createTestTable().setEtag("etag"); + reset(bigQueryClient); + when(bigQueryClient.getTable(TABLE_REFERENCE)).thenReturn(tableWithEtag, tableWithEtag); + + org.apache.iceberg.Table loadedTable = bigQueryMetastoreCatalog.loadTable(SPARK_TABLE_ID); + + when(bigQueryClient.patchTable(any(), any())).thenReturn(tableWithEtag); + loadedTable.updateSchema().addColumn("n", Types.IntegerType.get()).commit(); + + ArgumentCaptor<TableReference> tableReferenceArgumentCaptor = + ArgumentCaptor.forClass(TableReference.class); + ArgumentCaptor<Table> tableArgumentCaptor = ArgumentCaptor.forClass(Table.class); + verify(bigQueryClient, times(1)) + .patchTable(tableReferenceArgumentCaptor.capture(), tableArgumentCaptor.capture()); + assertEquals(TABLE_REFERENCE, tableReferenceArgumentCaptor.getValue()); + assertEquals("etag", tableArgumentCaptor.getValue().getEtag()); + } + + @Test + public void testDoCommit_failWhenEtagMismatch() throws Exception { + Table tableWithEtag = createTestTable().setEtag("etag"); + reset(bigQueryClient); + when(bigQueryClient.getTable(TABLE_REFERENCE)).thenReturn(tableWithEtag, tableWithEtag); + + org.apache.iceberg.Table loadedTable = bigQueryMetastoreCatalog.loadTable(SPARK_TABLE_ID); + + when(bigQueryClient.patchTable(any(), any())) + .thenThrow(new ValidationException("error message etag mismatch")); + assertThrows( + CommitFailedException.class, + () -> loadedTable.updateSchema().addColumn("n", Types.IntegerType.get()).commit()); + } + + @Test + public void testDoCommit_failWhenMetadataLocationDiff() throws Exception { + Table tableWithEtag = createTestTable().setEtag("etag"); + Table tableWithNewMetadata = + new Table() + .setEtag("etag") + .setExternalCatalogTableOptions( + new ExternalCatalogTableOptions() + .setParameters( + ImmutableMap.of( + BigQueryMetastoreTestUtils.METADATA_LOCATION_PROP, "a/new/location"))); + + reset(bigQueryClient); + // Two invocations, for loadTable and commit.
+ when(bigQueryClient.getTable(TABLE_REFERENCE)).thenReturn(tableWithEtag, tableWithNewMetadata); + + org.apache.iceberg.Table loadedTable = bigQueryMetastoreCatalog.loadTable(SPARK_TABLE_ID); + + when(bigQueryClient.patchTable(any(), any())).thenReturn(tableWithEtag); + assertThrows( + CommitFailedException.class, + () -> loadedTable.updateSchema().addColumn("n", Types.IntegerType.get()).commit()); + } + + @Test + public void testCreateTable_doCommitSucceeds() throws Exception { + Table testTable = createTestTable(); + when(bigQueryClient.createTable(any())).thenReturn(testTable); + when(bigQueryClient.getDataset( + new DatasetReference().setProjectId(GCP_PROJECT).setDatasetId(DATASET_ID))) + .thenReturn( + new Dataset() + .setExternalCatalogDatasetOptions( + new ExternalCatalogDatasetOptions() + .setDefaultStorageLocationUri("build/db_folder"))); + + Schema schema = BigQueryMetastoreTestUtils.getTestSchema(); + bigQueryMetastoreCatalog.createTable(SPARK_TABLE_ID, schema, PartitionSpec.unpartitioned()); + } + + /** Creates a test table to have Iceberg metadata files in place. */ + private Table createTestTable() throws Exception { + when(bigQueryClient.getTable(TABLE_REFERENCE)) + .thenThrow(new NoSuchTableException("error message getTable")); + return BigQueryMetastoreTestUtils.createTestTable( + tempFolder, bigQueryMetastoreCatalog, TABLE_REFERENCE); + } +} diff --git a/build.gradle b/build.gradle index 7a11943cf8be..b1f3e4f8e3c8 100644 --- a/build.gradle +++ b/build.gradle @@ -157,7 +157,7 @@ subprojects { rootTask.finalizedBy showDeprecationRulesOnRevApiFailure } } - + tasks.named("revapiAnalyze").configure { dependsOn(":iceberg-common:jar") } @@ -636,6 +636,45 @@ project(':iceberg-delta-lake') { } } +project(':iceberg-bigquery') { + test { + useJUnitPlatform() + } + + dependencies { + implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow') + implementation project(':iceberg-gcp') + + api project(':iceberg-api') + implementation project(':iceberg-common') + implementation project(':iceberg-core') + + implementation("com.google.apis:google-api-services-bigquery:v2-rev20240602-2.0.0") + + compileOnly('org.apache.hive:hive-metastore:4.0.0') { + exclude group: 'org.apache.avro', module: 'avro' + exclude group: 'org.slf4j', module: 'slf4j-log4j12' + exclude group: 'org.pentaho' // missing dependency + exclude group: 'org.apache.hbase' + exclude group: 'org.apache.logging.log4j' + exclude group: 'co.cask.tephra' + exclude group: 'com.google.code.findbugs', module: 'jsr305' + exclude group: 'org.eclipse.jetty.aggregate', module: 'jetty-all' + exclude group: 'org.eclipse.jetty.orbit', module: 'javax.servlet' + exclude group: 'org.apache.parquet', module: 'parquet-hadoop-bundle' + exclude group: 'com.tdunning', module: 'json' + exclude group: 'javax.transaction', module: 'transaction-api' + exclude group: 'com.zaxxer', module: 'HikariCP' + exclude group: 'com.google.code.gson', module: 'gson' + } + + testImplementation project(path: ':iceberg-core', configuration: 'testArtifacts') + + testImplementation 'org.apache.hadoop:hadoop-common:3.4.0' + testImplementation 'org.mockito:mockito-core:5.12.0' + } +} + project(':iceberg-gcp') { test { useJUnitPlatform() diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/GCPProperties.java b/gcp/src/main/java/org/apache/iceberg/gcp/GCPProperties.java index 4465ee29012a..a62a38c0f472 100644 --- a/gcp/src/main/java/org/apache/iceberg/gcp/GCPProperties.java +++ b/gcp/src/main/java/org/apache/iceberg/gcp/GCPProperties.java @@ -26,6 +26,9
@@ import org.apache.iceberg.util.PropertyUtil; public class GCPProperties implements Serializable { + /** The GCP project ID. Required. */ + public static final String PROJECT_ID = "gcp_project"; + // Service Options public static final String GCS_PROJECT_ID = "gcs.project-id"; public static final String GCS_CLIENT_LIB_TOKEN = "gcs.client-lib-token"; @@ -52,6 +55,9 @@ public class GCPProperties implements Serializable { */ public static final int GCS_DELETE_BATCH_SIZE_DEFAULT = 50; + /** The BigQuery GCP location (https://cloud.google.com/bigquery/docs/locations). Required. */ + public static final String BIGQUERY_LOCATION = "gcp_location"; + private String projectId; private String clientLibToken; private String serviceHost; diff --git a/settings.gradle b/settings.gradle index 1e6d92bf1e1f..5f9dd9b79117 100644 --- a/settings.gradle +++ b/settings.gradle @@ -38,6 +38,7 @@ include 'hive-metastore' include 'nessie' include 'gcp' include 'gcp-bundle' +include 'bigquery' include 'dell' include 'snowflake' include 'delta-lake' @@ -64,6 +65,7 @@ project(':hive-metastore').name = 'iceberg-hive-metastore' project(':nessie').name = 'iceberg-nessie' project(':gcp').name = 'iceberg-gcp' project(':gcp-bundle').name = 'iceberg-gcp-bundle' +project(':bigquery').name = 'iceberg-bigquery' project(':dell').name = 'iceberg-dell' project(':snowflake').name = 'iceberg-snowflake' project(':delta-lake').name = 'iceberg-delta-lake'