diff --git a/.github/labeler.yml b/.github/labeler.yml index 55fd64b479f5..7ac78abeb06c 100644 --- a/.github/labeler.yml +++ b/.github/labeler.yml @@ -82,4 +82,6 @@ ALIYUN: GCP: - gcp/**/* DELL: - - dell/**/* \ No newline at end of file + - dell/**/* +SNOWFLAKE: + - snowflake/**/* diff --git a/build.gradle b/build.gradle index 026bc3ddd0af..dd8980590128 100644 --- a/build.gradle +++ b/build.gradle @@ -682,6 +682,22 @@ project(':iceberg-dell') { } } +project(':iceberg-snowflake') { + dependencies { + implementation project(':iceberg-core') + implementation project(':iceberg-common') + implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow') + implementation project(':iceberg-aws') + implementation "com.fasterxml.jackson.core:jackson-databind" + implementation "com.fasterxml.jackson.core:jackson-core" + implementation "commons-dbutils:commons-dbutils:1.7" + + runtimeOnly("net.snowflake:snowflake-jdbc:3.13.22") + + testImplementation project(path: ':iceberg-core', configuration: 'testArtifacts') + } +} + @Memoized boolean versionFileExists() { return file('version.txt').exists() diff --git a/core/src/main/java/org/apache/iceberg/jdbc/JdbcClientPool.java b/core/src/main/java/org/apache/iceberg/jdbc/JdbcClientPool.java index daa04908f41e..60e5eb49a4f8 100644 --- a/core/src/main/java/org/apache/iceberg/jdbc/JdbcClientPool.java +++ b/core/src/main/java/org/apache/iceberg/jdbc/JdbcClientPool.java @@ -27,12 +27,12 @@ import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.ClientPoolImpl; -class JdbcClientPool extends ClientPoolImpl { +public class JdbcClientPool extends ClientPoolImpl { private final String dbUrl; private final Map properties; - JdbcClientPool(String dbUrl, Map props) { + public JdbcClientPool(String dbUrl, Map props) { this( Integer.parseInt( props.getOrDefault( @@ -42,7 +42,7 @@ class JdbcClientPool extends ClientPoolImpl { props); } - JdbcClientPool(int poolSize, String dbUrl, Map props) { + public JdbcClientPool(int 
poolSize, String dbUrl, Map props) { super(poolSize, SQLNonTransientConnectionException.class, true); properties = props; this.dbUrl = dbUrl; diff --git a/settings.gradle b/settings.gradle index abc8f4c001fc..314fcc7942d7 100644 --- a/settings.gradle +++ b/settings.gradle @@ -34,6 +34,7 @@ include 'hive-metastore' include 'nessie' include 'gcp' include 'dell' +include 'snowflake' project(':api').name = 'iceberg-api' project(':common').name = 'iceberg-common' @@ -51,6 +52,7 @@ project(':hive-metastore').name = 'iceberg-hive-metastore' project(':nessie').name = 'iceberg-nessie' project(':gcp').name = 'iceberg-gcp' project(':dell').name = 'iceberg-dell' +project(':snowflake').name = 'iceberg-snowflake' List knownFlinkVersions = System.getProperty("knownFlinkVersions").split(",") String flinkVersionsString = System.getProperty("flinkVersions") != null ? System.getProperty("flinkVersions") : System.getProperty("defaultFlinkVersions") diff --git a/snowflake/src/main/java/org/apache/iceberg/snowflake/JdbcSnowflakeClient.java b/snowflake/src/main/java/org/apache/iceberg/snowflake/JdbcSnowflakeClient.java new file mode 100644 index 000000000000..9730a5f3724b --- /dev/null +++ b/snowflake/src/main/java/org/apache/iceberg/snowflake/JdbcSnowflakeClient.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.snowflake; + +import java.sql.SQLException; +import java.util.List; +import org.apache.commons.dbutils.QueryRunner; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.jdbc.JdbcClientPool; +import org.apache.iceberg.jdbc.UncheckedInterruptedException; +import org.apache.iceberg.jdbc.UncheckedSQLException; +import org.apache.iceberg.snowflake.entities.SnowflakeSchema; +import org.apache.iceberg.snowflake.entities.SnowflakeTable; +import org.apache.iceberg.snowflake.entities.SnowflakeTableMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This implementation of SnowflakeClient builds on top of Snowflake's JDBC driver to interact with + * Snowflake's Iceberg-aware resource model. Despite using JDBC libraries, the resource model is + * derived from Snowflake's own first-class support for Iceberg tables as opposed to using an opaque + * JDBC layer to store Iceberg metadata itself in an Iceberg-agnostic database. + * + *

This thus differs from the JdbcCatalog in that Snowflake's service provides the source of + * truth of Iceberg metadata, rather than serving as a storage layer for a client-defined Iceberg + * resource model. + */ +public class JdbcSnowflakeClient implements SnowflakeClient { + public static final String EXPECTED_JDBC_IMPL = "net.snowflake.client.jdbc.SnowflakeDriver"; + + private static final Logger LOG = LoggerFactory.getLogger(JdbcSnowflakeClient.class); + private final JdbcClientPool connectionPool; + + JdbcSnowflakeClient(JdbcClientPool conn) { + connectionPool = conn; + } + + @Override + public List listSchemas(Namespace namespace) { + StringBuilder baseQuery = new StringBuilder("SHOW SCHEMAS"); + Object[] queryParams = null; + if (namespace == null || namespace.isEmpty()) { + // for empty or null namespace search for all schemas at account level where the user + // has access to list. + baseQuery.append(" IN ACCOUNT"); + } else { + // otherwise restrict listing of schema within the database. + baseQuery.append(" IN DATABASE IDENTIFIER(?)"); + queryParams = new Object[] {namespace.level(SnowflakeResources.NAMESPACE_DB_LEVEL - 1)}; + } + + final String finalQuery = baseQuery.toString(); + final Object[] finalQueryParams = queryParams; + QueryRunner run = new QueryRunner(true); + List schemas; + try { + schemas = + connectionPool.run( + conn -> + run.query(conn, finalQuery, SnowflakeSchema.createHandler(), finalQueryParams)); + } catch (SQLException e) { + throw new UncheckedSQLException( + e, + "Failed to list schemas for namespace %s", + namespace != null ? 
namespace.toString() : ""); + } catch (InterruptedException e) { + throw new UncheckedInterruptedException(e, "Interrupted while listing schemas"); + } + return schemas; + } + + @Override + public List listIcebergTables(Namespace namespace) { + StringBuilder baseQuery = new StringBuilder("SHOW ICEBERG TABLES"); + Object[] queryParams = null; + if (namespace.length() == SnowflakeResources.MAX_NAMESPACE_DEPTH) { + // For two level namespace, search for iceberg tables within the given schema. + baseQuery.append(" IN SCHEMA IDENTIFIER(?)"); + queryParams = + new Object[] { + String.format( + "%s.%s", + namespace.level(SnowflakeResources.NAMESPACE_DB_LEVEL - 1), + namespace.level(SnowflakeResources.NAMESPACE_SCHEMA_LEVEL - 1)) + }; + } else if (namespace.length() == SnowflakeResources.NAMESPACE_DB_LEVEL) { + // For one level namespace, search for iceberg tables within the given database. + baseQuery.append(" IN DATABASE IDENTIFIER(?)"); + queryParams = new Object[] {namespace.level(SnowflakeResources.NAMESPACE_DB_LEVEL - 1)}; + } else { + // For empty or db level namespace, search at account level. + baseQuery.append(" IN ACCOUNT"); + } + + final String finalQuery = baseQuery.toString(); + final Object[] finalQueryParams = queryParams; + QueryRunner run = new QueryRunner(true); + List tables; + try { + tables = + connectionPool.run( + conn -> + run.query(conn, finalQuery, SnowflakeTable.createHandler(), finalQueryParams)); + } catch (SQLException e) { + throw new UncheckedSQLException( + e, "Failed to list tables for namespace %s", namespace.toString()); + } catch (InterruptedException e) { + throw new UncheckedInterruptedException(e, "Interrupted while listing tables"); + } + return tables; + } + + @Override + public SnowflakeTableMetadata getTableMetadata(TableIdentifier tableIdentifier) { + QueryRunner run = new QueryRunner(true); + + SnowflakeTableMetadata tableMeta; + try { + final String finalQuery = "SELECT SYSTEM$GET_ICEBERG_TABLE_INFORMATION(?) 
AS METADATA"; + tableMeta = + connectionPool.run( + conn -> + run.query( + conn, + finalQuery, + SnowflakeTableMetadata.createHandler(), + tableIdentifier.toString())); + } catch (SQLException e) { + throw new UncheckedSQLException( + e, "Failed to get table metadata for %s", tableIdentifier.toString()); + } catch (InterruptedException e) { + throw new UncheckedInterruptedException(e, "Interrupted while getting table metadata"); + } + return tableMeta; + } + + @Override + public void close() { + connectionPool.close(); + } +} diff --git a/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeCatalog.java b/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeCatalog.java new file mode 100644 index 000000000000..40259b54ff21 --- /dev/null +++ b/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeCatalog.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.snowflake; + +import java.io.Closeable; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.iceberg.BaseMetastoreCatalog; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SupportsNamespaces; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.NoSuchNamespaceException; +import org.apache.iceberg.hadoop.Configurable; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.jdbc.JdbcClientPool; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.snowflake.entities.SnowflakeSchema; +import org.apache.iceberg.snowflake.entities.SnowflakeTable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SnowflakeCatalog extends BaseMetastoreCatalog + implements Closeable, SupportsNamespaces, Configurable { + + private static final Logger LOG = LoggerFactory.getLogger(SnowflakeCatalog.class); + + private Object conf; + private String catalogName = SnowflakeResources.DEFAULT_CATALOG_NAME; + private Map catalogProperties = null; + private FileIO fileIO; + private SnowflakeClient snowflakeClient; + + public SnowflakeCatalog() {} + + @VisibleForTesting + void setSnowflakeClient(SnowflakeClient snowflakeClient) { + this.snowflakeClient = snowflakeClient; + } + + @VisibleForTesting + void setFileIO(FileIO fileIO) { + this.fileIO = fileIO; + } + + @Override + public List listTables(Namespace namespace) { + LOG.debug("listTables with namespace: {}", namespace); + Preconditions.checkArgument( + namespace.length() <= SnowflakeResources.MAX_NAMESPACE_DEPTH, 
+ "Snowflake doesn't support more than %s levels of namespace, got %s", + SnowflakeResources.MAX_NAMESPACE_DEPTH, + namespace); + + List sfTables = snowflakeClient.listIcebergTables(namespace); + + return sfTables.stream() + .map( + table -> + TableIdentifier.of(table.getDatabase(), table.getSchemaName(), table.getName())) + .collect(Collectors.toList()); + } + + @Override + public boolean dropTable(TableIdentifier identifier, boolean purge) { + throw new UnsupportedOperationException( + String.format("dropTable not supported; attempted for table '%s'", identifier)); + } + + @Override + public void renameTable(TableIdentifier from, TableIdentifier to) { + throw new UnsupportedOperationException( + String.format("renameTable not supported; attempted from '%s' to '%s'", from, to)); + } + + @Override + public void initialize(String name, Map properties) { + catalogProperties = properties; + + if (name != null) { + this.catalogName = name; + } + + if (snowflakeClient == null) { + String uri = properties.get(CatalogProperties.URI); + Preconditions.checkNotNull(uri, "JDBC connection URI is required"); + + try { + // We'll ensure the expected JDBC driver implementation class is initialized through + // reflection + // regardless of which classloader ends up using this JdbcSnowflakeClient, but we'll only + // warn if the expected driver fails to load, since users may use repackaged or custom + // JDBC drivers for Snowflake communication. 
+ Class.forName(JdbcSnowflakeClient.EXPECTED_JDBC_IMPL); + } catch (ClassNotFoundException cnfe) { + LOG.warn( + "Failed to load expected JDBC SnowflakeDriver - if queries fail by failing" + + " to find a suitable driver for jdbc:snowflake:// URIs, you must add the Snowflake " + + " JDBC driver to your jars/packages", + cnfe); + } + + JdbcClientPool connectionPool = new JdbcClientPool(uri, properties); + snowflakeClient = new JdbcSnowflakeClient(connectionPool); + } + + if (fileIO == null) { + String fileIOImpl = SnowflakeResources.DEFAULT_FILE_IO_IMPL; + + if (catalogProperties.containsKey(CatalogProperties.FILE_IO_IMPL)) { + fileIOImpl = catalogProperties.get(CatalogProperties.FILE_IO_IMPL); + } + + fileIO = CatalogUtil.loadFileIO(fileIOImpl, catalogProperties, conf); + } + } + + @Override + public void close() { + snowflakeClient.close(); + } + + @Override + public void createNamespace(Namespace namespace, Map metadata) { + throw new UnsupportedOperationException( + String.format("createNamespace not supported; attempted for namespace '%s'", namespace)); + } + + @Override + public List listNamespaces(Namespace namespace) { + LOG.debug("listNamespaces with namespace: {}", namespace); + Preconditions.checkArgument( + namespace.length() <= SnowflakeResources.MAX_NAMESPACE_DEPTH - 1, + "Snowflake doesn't support more than %s levels of namespace, tried to list under %s", + SnowflakeResources.MAX_NAMESPACE_DEPTH, + namespace); + List sfSchemas = snowflakeClient.listSchemas(namespace); + + List namespaceList = + sfSchemas.stream() + .map(schema -> Namespace.of(schema.getDatabase(), schema.getName())) + .collect(Collectors.toList()); + return namespaceList; + } + + @Override + public Map loadNamespaceMetadata(Namespace namespace) + throws NoSuchNamespaceException { + LOG.debug("loadNamespaceMetadata with namespace: {}", namespace); + Map nameSpaceMetadata = Maps.newHashMap(); + nameSpaceMetadata.put("name", namespace.toString()); + return nameSpaceMetadata; + } + + 
@Override + public boolean dropNamespace(Namespace namespace) { + throw new UnsupportedOperationException( + String.format("dropNamespace not supported; attempted for namespace '%s'", namespace)); + } + + @Override + public boolean setProperties(Namespace namespace, Map properties) { + throw new UnsupportedOperationException( + String.format("setProperties not supported; attempted for namespace '%s'", namespace)); + } + + @Override + public boolean removeProperties(Namespace namespace, Set properties) { + throw new UnsupportedOperationException( + String.format("removeProperties not supported; attempted for namespace '%s'", namespace)); + } + + @Override + protected TableOperations newTableOps(TableIdentifier tableIdentifier) { + Preconditions.checkArgument( + tableIdentifier.namespace().length() <= SnowflakeResources.MAX_NAMESPACE_DEPTH, + "Snowflake doesn't support more than %s levels of namespace, got %s", + SnowflakeResources.MAX_NAMESPACE_DEPTH, + tableIdentifier); + + return new SnowflakeTableOperations( + snowflakeClient, fileIO, catalogProperties, catalogName, tableIdentifier); + } + + @Override + protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) { + return null; + } + + @Override + public void setConf(Object conf) { + this.conf = conf; + } + + public Object getConf() { + return conf; + } +} diff --git a/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeClient.java b/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeClient.java new file mode 100644 index 000000000000..560acedbd45e --- /dev/null +++ b/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeClient.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.snowflake; + +import java.io.Closeable; +import java.util.List; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.snowflake.entities.SnowflakeSchema; +import org.apache.iceberg.snowflake.entities.SnowflakeTable; +import org.apache.iceberg.snowflake.entities.SnowflakeTableMetadata; + +/** + * This interface abstracts out the underlying communication protocols for contacting Snowflake to + * obtain the various resource representations defined under "entities". Classes using this + * interface should minimize assumptions about whether an underlying client uses e.g. REST, JDBC or + * other underlying libraries/protocols. 
+ */ +public interface SnowflakeClient extends Closeable { + List listSchemas(Namespace namespace); + + List listIcebergTables(Namespace namespace); + + SnowflakeTableMetadata getTableMetadata(TableIdentifier tableIdentifier); + + @Override + void close(); +} diff --git a/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeResources.java b/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeResources.java new file mode 100644 index 000000000000..16669253939b --- /dev/null +++ b/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeResources.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.snowflake; + +final class SnowflakeResources { + static final String DEFAULT_CATALOG_NAME = "snowlog"; + static final String DEFAULT_FILE_IO_IMPL = "org.apache.iceberg.hadoop.HadoopFileIO"; + static final int MAX_NAMESPACE_DEPTH = 2; + static final int NAMESPACE_DB_LEVEL = 1; + static final int NAMESPACE_SCHEMA_LEVEL = 2; + + private SnowflakeResources() {} +} diff --git a/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeTableOperations.java b/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeTableOperations.java new file mode 100644 index 000000000000..0da132331a61 --- /dev/null +++ b/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeTableOperations.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.snowflake; + +import java.util.Map; +import org.apache.iceberg.BaseMetastoreTableOperations; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.snowflake.entities.SnowflakeTableMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class SnowflakeTableOperations extends BaseMetastoreTableOperations { + + private static final Logger LOG = LoggerFactory.getLogger(SnowflakeTableOperations.class); + private final String catalogName; + + private final FileIO fileIO; + private final TableIdentifier tableIdentifier; + + private final SnowflakeClient snowflakeClient; + + private final Map catalogProperties; + + protected SnowflakeTableOperations( + SnowflakeClient snowflakeClient, + FileIO fileIO, + Map properties, + String catalogName, + TableIdentifier tableIdentifier) { + this.snowflakeClient = snowflakeClient; + this.fileIO = fileIO; + this.catalogProperties = properties; + this.catalogName = catalogName; + this.tableIdentifier = tableIdentifier; + } + + @Override + public void doRefresh() { + LOG.debug("Getting metadata location for table {}", tableIdentifier); + String location = getTableMetadataLocation(); + Preconditions.checkState( + location != null && !location.isEmpty(), + "Got null or empty location %s for table %s", + location, + tableIdentifier); + refreshFromMetadataLocation(location); + } + + @Override + public FileIO io() { + return fileIO; + } + + @Override + protected String tableName() { + return tableIdentifier.toString(); + } + + private String getTableMetadataLocation() { + SnowflakeTableMetadata metadata = snowflakeClient.getTableMetadata(tableIdentifier); + + if (metadata == null) { + throw new NoSuchTableException("Cannot find table %s", tableIdentifier); + } + if 
(!metadata.getStatus().equals("success")) { + LOG.warn( + "Got non-successful table metadata: {} with metadataLocation {} for table {}", + metadata.getStatus(), + metadata.getIcebergMetadataLocation(), + tableIdentifier); + } + return metadata.getIcebergMetadataLocation(); + } +} diff --git a/snowflake/src/main/java/org/apache/iceberg/snowflake/entities/SnowflakeSchema.java b/snowflake/src/main/java/org/apache/iceberg/snowflake/entities/SnowflakeSchema.java new file mode 100644 index 000000000000..50410555ad48 --- /dev/null +++ b/snowflake/src/main/java/org/apache/iceberg/snowflake/entities/SnowflakeSchema.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.snowflake.entities; + +import java.util.List; +import org.apache.commons.dbutils.ResultSetHandler; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +public class SnowflakeSchema { + private String name; + private String databaseName; + + public SnowflakeSchema(String databaseName, String name) { + this.databaseName = databaseName; + this.name = name; + } + + public String getName() { + return name; + } + + public String getDatabase() { + return databaseName; + } + + public static ResultSetHandler> createHandler() { + return rs -> { + List schemas = Lists.newArrayList(); + while (rs.next()) { + String databaseName = rs.getString("database_name"); + String name = rs.getString("name"); + schemas.add(new SnowflakeSchema(databaseName, name)); + } + return schemas; + }; + } +} diff --git a/snowflake/src/main/java/org/apache/iceberg/snowflake/entities/SnowflakeTable.java b/snowflake/src/main/java/org/apache/iceberg/snowflake/entities/SnowflakeTable.java new file mode 100644 index 000000000000..f619ed0ca7fa --- /dev/null +++ b/snowflake/src/main/java/org/apache/iceberg/snowflake/entities/SnowflakeTable.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.snowflake.entities; + +import java.util.List; +import org.apache.commons.dbutils.ResultSetHandler; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +public class SnowflakeTable { + private String databaseName; + private String schemaName; + private String name; + + public SnowflakeTable(String databaseName, String schemaName, String name) { + this.databaseName = databaseName; + this.schemaName = schemaName; + this.name = name; + } + + public String getName() { + return name; + } + + public String getDatabase() { + return databaseName; + } + + public String getSchemaName() { + return schemaName; + } + + public static ResultSetHandler> createHandler() { + return rs -> { + List tables = Lists.newArrayList(); + while (rs.next()) { + String databaseName = rs.getString("database_name"); + String schemaName = rs.getString("schema_name"); + String name = rs.getString("name"); + tables.add(new SnowflakeTable(databaseName, schemaName, name)); + } + return tables; + }; + } +} diff --git a/snowflake/src/main/java/org/apache/iceberg/snowflake/entities/SnowflakeTableMetadata.java b/snowflake/src/main/java/org/apache/iceberg/snowflake/entities/SnowflakeTableMetadata.java new file mode 100644 index 000000000000..554b7db3bab4 --- /dev/null +++ b/snowflake/src/main/java/org/apache/iceberg/snowflake/entities/SnowflakeTableMetadata.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.snowflake.entities; + +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.commons.dbutils.ResultSetHandler; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.util.JsonUtil; + +public class SnowflakeTableMetadata { + public static final Pattern SNOWFLAKE_AZURE_PATTERN = + Pattern.compile("azure://([^/]+)/([^/]+)/(.*)"); + + private String snowflakeMetadataLocation; + private String status; + private String icebergMetadataLocation; + + private String rawJsonVal; + + public SnowflakeTableMetadata( + String snowflakeMetadataLocation, + String icebergMetadataLocation, + String status, + String rawJsonVal) { + this.snowflakeMetadataLocation = snowflakeMetadataLocation; + this.icebergMetadataLocation = icebergMetadataLocation; + this.status = status; + this.rawJsonVal = rawJsonVal; + } + + /** Storage location of table metadata in Snowflake's path syntax. */ + public String getSnowflakeMetadataLocation() { + return snowflakeMetadataLocation; + } + + /** Storage location of table metadata in Iceberg's path syntax. */ + public String getIcebergMetadataLocation() { + return icebergMetadataLocation; + } + + public String getStatus() { + return status; + } + + /** + * Translates from Snowflake's path syntax to Iceberg's path syntax for paths matching known + * non-compatible Snowflake paths. 
Throws IllegalArgumentException if the prefix of the + * snowflakeLocation is a known non-compatible path syntax but fails to match the expected path + * components for a successful translation. + */ + public static String getIcebergLocationFromSnowflakeLocation(String snowflakeLocation) { + if (snowflakeLocation.startsWith("azure://")) { + // Convert from expected path of the form: + // azure://account.blob.core.windows.net/container/volumepath + // to: + // wasbs://container@account.blob.core.windows.net/volumepath + Matcher matcher = SNOWFLAKE_AZURE_PATTERN.matcher(snowflakeLocation); + Preconditions.checkArgument( + matcher.matches(), + "Location '%s' failed to match pattern '%s'", + snowflakeLocation, + SNOWFLAKE_AZURE_PATTERN); + return String.format( + "wasbs://%s@%s/%s", matcher.group(2), matcher.group(1), matcher.group(3)); + } else if (snowflakeLocation.startsWith("gcs://")) { + // Convert from expected path of the form: + // gcs://bucket/path + // to: + // gs://bucket/path + return "gs" + snowflakeLocation.substring(3); + } + return snowflakeLocation; + } + + /** + * Factory method for parsing a JSON string containing expected Snowflake table metadata into a + * SnowflakeTableMetadata object. 
+ */ + public static SnowflakeTableMetadata parseJson(String json) { + JsonNode parsedVal; + try { + parsedVal = JsonUtil.mapper().readValue(json, JsonNode.class); + } catch (IOException ioe) { + throw new IllegalArgumentException(String.format("Malformed JSON: %s", json), ioe); + } + + String snowflakeMetadataLocation = JsonUtil.getString("metadataLocation", parsedVal); + String status = JsonUtil.getStringOrNull("status", parsedVal); + + String icebergMetadataLocation = + getIcebergLocationFromSnowflakeLocation(snowflakeMetadataLocation); + + return new SnowflakeTableMetadata( + snowflakeMetadataLocation, icebergMetadataLocation, status, json); + } + + public static ResultSetHandler createHandler() { + return rs -> { + if (!rs.next()) { + return null; + } + + String rawJsonVal = rs.getString("METADATA"); + return SnowflakeTableMetadata.parseJson(rawJsonVal); + }; + } +} diff --git a/snowflake/src/test/java/org/apache/iceberg/snowflake/FakeSnowflakeClient.java b/snowflake/src/test/java/org/apache/iceberg/snowflake/FakeSnowflakeClient.java new file mode 100644 index 000000000000..1e5b723061e1 --- /dev/null +++ b/snowflake/src/test/java/org/apache/iceberg/snowflake/FakeSnowflakeClient.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.snowflake; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.jdbc.UncheckedSQLException; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.snowflake.entities.SnowflakeSchema; +import org.apache.iceberg.snowflake.entities.SnowflakeTable; +import org.apache.iceberg.snowflake.entities.SnowflakeTableMetadata; + +public class FakeSnowflakeClient implements SnowflakeClient { + // In-memory lookup by database/schema/tableName to table metadata. + private Map>> databases = + Maps.newTreeMap(); + + public FakeSnowflakeClient() {} + + /** + * Also adds parent database/schema if they don't already exist. If the tableName already exists + * under the given database/schema, the value is replaced with the provided metadata. + */ + public void addTable( + String database, String schema, String tableName, SnowflakeTableMetadata metadata) { + if (!databases.containsKey(database)) { + databases.put(database, Maps.newTreeMap()); + } + Map> schemas = databases.get(database); + if (!schemas.containsKey(schema)) { + schemas.put(schema, Maps.newTreeMap()); + } + Map tables = schemas.get(schema); + tables.put(tableName, metadata); + } + + @Override + public List listSchemas(Namespace namespace) { + Preconditions.checkArgument( + namespace.length() <= SnowflakeResources.MAX_NAMESPACE_DEPTH, + "Namespace {} must have namespace of length <= {}", + namespace, + SnowflakeResources.MAX_NAMESPACE_DEPTH); + List schemas = Lists.newArrayList(); + if (namespace.length() == 0) { + // "account-level" listing. 
+ for (Map.Entry>> db : + databases.entrySet()) { + for (String schema : db.getValue().keySet()) { + schemas.add(new SnowflakeSchema(db.getKey(), schema)); + } + } + } else if (namespace.length() == SnowflakeResources.NAMESPACE_DB_LEVEL) { + String dbName = namespace.level(SnowflakeResources.NAMESPACE_DB_LEVEL - 1); + if (databases.containsKey(dbName)) { + for (String schema : databases.get(dbName).keySet()) { + schemas.add(new SnowflakeSchema(dbName, schema)); + } + } else { + throw new UncheckedSQLException("Nonexistent database: '%s'", dbName); + } + } else { + throw new IllegalArgumentException( + String.format( + "Tried to listSchemas using a namespace with too many levels: '%s'", namespace)); + } + return schemas; + } + + @Override + public List listIcebergTables(Namespace namespace) { + Preconditions.checkArgument( + namespace.length() <= SnowflakeResources.MAX_NAMESPACE_DEPTH, + "Namespace {} must have namespace of length <= {}", + namespace, + SnowflakeResources.MAX_NAMESPACE_DEPTH); + List tables = Lists.newArrayList(); + if (namespace.length() == 0) { + // "account-level" listing. 
+ for (Map.Entry>> db : + databases.entrySet()) { + for (Map.Entry> schema : + db.getValue().entrySet()) { + for (String tableName : schema.getValue().keySet()) { + tables.add(new SnowflakeTable(db.getKey(), schema.getKey(), tableName)); + } + } + } + } else if (namespace.length() == SnowflakeResources.NAMESPACE_DB_LEVEL) { + String dbName = namespace.level(SnowflakeResources.NAMESPACE_DB_LEVEL - 1); + if (databases.containsKey(dbName)) { + for (Map.Entry> schema : + databases.get(dbName).entrySet()) { + for (String tableName : schema.getValue().keySet()) { + tables.add(new SnowflakeTable(dbName, schema.getKey(), tableName)); + } + } + } else { + throw new UncheckedSQLException("Nonexistent database: '%s'", dbName); + } + } else { + String dbName = namespace.level(SnowflakeResources.NAMESPACE_DB_LEVEL - 1); + if (databases.containsKey(dbName)) { + String schemaName = namespace.level(SnowflakeResources.NAMESPACE_SCHEMA_LEVEL - 1); + if (databases.get(dbName).containsKey(schemaName)) { + for (String tableName : databases.get(dbName).get(schemaName).keySet()) { + tables.add(new SnowflakeTable(dbName, schemaName, tableName)); + } + } else { + throw new UncheckedSQLException( + "Nonexistent datbase.schema: '%s.%s'", dbName, schemaName); + } + } else { + throw new UncheckedSQLException("Nonexistent database: '%s'", dbName); + } + } + return tables; + } + + @Override + public SnowflakeTableMetadata getTableMetadata(TableIdentifier tableIdentifier) { + Namespace ns = tableIdentifier.namespace(); + Preconditions.checkArgument( + ns.length() == SnowflakeResources.MAX_NAMESPACE_DEPTH, + "TableIdentifier {} must have namespace of length {}", + tableIdentifier, + SnowflakeResources.MAX_NAMESPACE_DEPTH); + String dbName = ns.level(SnowflakeResources.NAMESPACE_DB_LEVEL - 1); + String schemaName = ns.level(SnowflakeResources.NAMESPACE_SCHEMA_LEVEL - 1); + if (!databases.containsKey(dbName) + || !databases.get(dbName).containsKey(schemaName) + || 
!databases.get(dbName).get(schemaName).containsKey(tableIdentifier.name())) { + throw new UncheckedSQLException("Nonexistent object: '%s'", tableIdentifier); + } + return databases.get(dbName).get(schemaName).get(tableIdentifier.name()); + } + + @Override + public void close() {} +} diff --git a/snowflake/src/test/java/org/apache/iceberg/snowflake/InMemoryFileIO.java b/snowflake/src/test/java/org/apache/iceberg/snowflake/InMemoryFileIO.java new file mode 100644 index 000000000000..3873375f8e89 --- /dev/null +++ b/snowflake/src/test/java/org/apache/iceberg/snowflake/InMemoryFileIO.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.snowflake; + +import java.util.Map; +import org.apache.iceberg.exceptions.NotFoundException; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.InMemoryInputFile; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; + +public class InMemoryFileIO implements FileIO { + + private Map inMemoryFiles = Maps.newHashMap(); + + public void addFile(String path, byte[] contents) { + inMemoryFiles.put(path, new InMemoryInputFile(path, contents)); + } + + @Override + public InputFile newInputFile(String path) { + if (!inMemoryFiles.containsKey(path)) { + throw new NotFoundException("No in-memory file found for path: %s", path); + } + return inMemoryFiles.get(path); + } + + @Override + public OutputFile newOutputFile(String path) { + return null; + } + + @Override + public void deleteFile(String path) {} +} diff --git a/snowflake/src/test/java/org/apache/iceberg/snowflake/SnowflakeCatalogTest.java b/snowflake/src/test/java/org/apache/iceberg/snowflake/SnowflakeCatalogTest.java new file mode 100644 index 000000000000..f8f82efb60eb --- /dev/null +++ b/snowflake/src/test/java/org/apache/iceberg/snowflake/SnowflakeCatalogTest.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.snowflake; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableMetadataParser; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.snowflake.entities.SnowflakeTableMetadata; +import org.apache.iceberg.types.Types; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class SnowflakeCatalogTest { + + static final String TEST_CATALOG_NAME = "slushLog"; + private SnowflakeCatalog catalog; + + @Before + public void before() { + catalog = new SnowflakeCatalog(); + + FakeSnowflakeClient client = new FakeSnowflakeClient(); + client.addTable( + "DB_1", + "SCHEMA_1", + "TAB_1", + SnowflakeTableMetadata.parseJson( + "{\"metadataLocation\":\"s3://tab1/metadata/v3.metadata.json\",\"status\":\"success\"}")); + client.addTable( + "DB_1", + "SCHEMA_1", + "TAB_2", + SnowflakeTableMetadata.parseJson( + "{\"metadataLocation\":\"s3://tab2/metadata/v1.metadata.json\",\"status\":\"success\"}")); + client.addTable( + "DB_2", + "SCHEMA_2", + "TAB_3", + SnowflakeTableMetadata.parseJson( + 
"{\"metadataLocation\":\"azure://myaccount.blob.core.windows.net/mycontainer/tab3/metadata/v334.metadata.json\",\"status\":\"success\"}")); + client.addTable( + "DB_2", + "SCHEMA_2", + "TAB_4", + SnowflakeTableMetadata.parseJson( + "{\"metadataLocation\":\"azure://myaccount.blob.core.windows.net/mycontainer/tab4/metadata/v323.metadata.json\",\"status\":\"success\"}")); + client.addTable( + "DB_3", + "SCHEMA_3", + "TAB_5", + SnowflakeTableMetadata.parseJson( + "{\"metadataLocation\":\"gcs://tab5/metadata/v793.metadata.json\",\"status\":\"success\"}")); + client.addTable( + "DB_3", + "SCHEMA_4", + "TAB_6", + SnowflakeTableMetadata.parseJson( + "{\"metadataLocation\":\"gcs://tab6/metadata/v123.metadata.json\",\"status\":\"success\"}")); + + catalog.setSnowflakeClient(client); + + InMemoryFileIO fakeFileIO = new InMemoryFileIO(); + + Schema schema = + new Schema( + Types.NestedField.required(1, "x", Types.StringType.get(), "comment1"), + Types.NestedField.required(2, "y", Types.StringType.get(), "comment2")); + PartitionSpec partitionSpec = + PartitionSpec.builderFor(schema).identity("x").withSpecId(1000).build(); + fakeFileIO.addFile( + "s3://tab1/metadata/v3.metadata.json", + TableMetadataParser.toJson( + TableMetadata.newTableMetadata( + schema, partitionSpec, "s3://tab1/", ImmutableMap.of())) + .getBytes()); + fakeFileIO.addFile( + "wasbs://mycontainer@myaccount.blob.core.windows.net/tab3/metadata/v334.metadata.json", + TableMetadataParser.toJson( + TableMetadata.newTableMetadata( + schema, + partitionSpec, + "wasbs://mycontainer@myaccount.blob.core.windows.net/tab1/", + ImmutableMap.of())) + .getBytes()); + fakeFileIO.addFile( + "gs://tab5/metadata/v793.metadata.json", + TableMetadataParser.toJson( + TableMetadata.newTableMetadata( + schema, partitionSpec, "gs://tab5/", ImmutableMap.of())) + .getBytes()); + + catalog.setFileIO(fakeFileIO); + + Map properties = Maps.newHashMap(); + catalog.initialize(TEST_CATALOG_NAME, properties); + } + + @Test + public void 
testListNamespace() { + List namespaces = catalog.listNamespaces(); + Assert.assertEquals( + Lists.newArrayList( + Namespace.of("DB_1", "SCHEMA_1"), + Namespace.of("DB_2", "SCHEMA_2"), + Namespace.of("DB_3", "SCHEMA_3"), + Namespace.of("DB_3", "SCHEMA_4")), + namespaces); + } + + @Test + public void testListNamespaceWithinDB() { + String dbName = "DB_1"; + List namespaces = catalog.listNamespaces(Namespace.of(dbName)); + Assert.assertEquals(Lists.newArrayList(Namespace.of(dbName, "SCHEMA_1")), namespaces); + } + + @Test + public void testListNamespaceWithinNonExistentDB() { + // Existence check for nonexistent parent namespaces is optional in the SupportsNamespaces + // interface. + String dbName = "NONEXISTENT_DB"; + Assert.assertThrows(RuntimeException.class, () -> catalog.listNamespaces(Namespace.of(dbName))); + } + + @Test + public void testListNamespaceWithinSchema() { + // No "sub-namespaces" beyond database.schema; invalid to try to list namespaces given + // a database.schema. + String dbName = "DB_3"; + String schemaName = "SCHEMA_4"; + Assert.assertThrows( + IllegalArgumentException.class, + () -> catalog.listNamespaces(Namespace.of(dbName, schemaName))); + } + + @Test + public void testListTables() { + List tables = catalog.listTables(Namespace.empty()); + Assert.assertEquals( + Lists.newArrayList( + TableIdentifier.of("DB_1", "SCHEMA_1", "TAB_1"), + TableIdentifier.of("DB_1", "SCHEMA_1", "TAB_2"), + TableIdentifier.of("DB_2", "SCHEMA_2", "TAB_3"), + TableIdentifier.of("DB_2", "SCHEMA_2", "TAB_4"), + TableIdentifier.of("DB_3", "SCHEMA_3", "TAB_5"), + TableIdentifier.of("DB_3", "SCHEMA_4", "TAB_6")), + tables); + } + + @Test + public void testListTablesWithinDB() { + String dbName = "DB_1"; + List tables = catalog.listTables(Namespace.of(dbName)); + Assert.assertEquals( + Lists.newArrayList( + TableIdentifier.of("DB_1", "SCHEMA_1", "TAB_1"), + TableIdentifier.of("DB_1", "SCHEMA_1", "TAB_2")), + tables); + } + + @Test + public void 
testListTablesWithinNonexistentDB() { + String dbName = "NONEXISTENT_DB"; + Assert.assertThrows(RuntimeException.class, () -> catalog.listTables(Namespace.of(dbName))); + } + + @Test + public void testListTablesWithinSchema() { + String dbName = "DB_2"; + String schemaName = "SCHEMA_2"; + List tables = catalog.listTables(Namespace.of(dbName, schemaName)); + Assert.assertEquals( + Lists.newArrayList( + TableIdentifier.of("DB_2", "SCHEMA_2", "TAB_3"), + TableIdentifier.of("DB_2", "SCHEMA_2", "TAB_4")), + tables); + } + + @Test + public void testListTablesWithinNonexistentSchema() { + String dbName = "DB_2"; + String schemaName = "NONEXISTENT_DB"; + Assert.assertThrows( + RuntimeException.class, () -> catalog.listTables(Namespace.of(dbName, schemaName))); + } + + @Test + public void testLoadS3Table() { + Table table = catalog.loadTable(TableIdentifier.of(Namespace.of("DB_1", "SCHEMA_1"), "TAB_1")); + Assert.assertEquals(table.location(), "s3://tab1/"); + } + + @Test + public void testLoadAzureTable() { + Table table = catalog.loadTable(TableIdentifier.of(Namespace.of("DB_2", "SCHEMA_2"), "TAB_3")); + Assert.assertEquals( + table.location(), "wasbs://mycontainer@myaccount.blob.core.windows.net/tab1/"); + } + + @Test + public void testLoadGcsTable() { + Table table = catalog.loadTable(TableIdentifier.of(Namespace.of("DB_3", "SCHEMA_3"), "TAB_5")); + Assert.assertEquals(table.location(), "gs://tab5/"); + } +} diff --git a/spark/v3.3/build.gradle b/spark/v3.3/build.gradle index 577700787e69..30f3eb02bb5f 100644 --- a/spark/v3.3/build.gradle +++ b/spark/v3.3/build.gradle @@ -221,6 +221,9 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersio implementation(project(':iceberg-nessie')) { exclude group: 'com.google.code.findbugs', module: 'jsr305' } + implementation (project(':iceberg-snowflake')) { + exclude group: 'net.snowflake' , module: 'snowflake-jdbc' + } integrationImplementation 
"org.scala-lang.modules:scala-collection-compat_${scalaVersion}" integrationImplementation "org.apache.spark:spark-hive_${scalaVersion}:${sparkVersion}"