diff --git a/.github/labeler.yml b/.github/labeler.yml index 521e1a42aaae..c623fbc6dd4c 100644 --- a/.github/labeler.yml +++ b/.github/labeler.yml @@ -81,4 +81,6 @@ ALIYUN: GCP: - gcp/**/* DELL: - - dell/**/* \ No newline at end of file + - dell/**/* +SNOWFLAKE: + - snowflake/**/* diff --git a/build.gradle b/build.gradle index 0fba8cf1a901..7b14f3b73163 100644 --- a/build.gradle +++ b/build.gradle @@ -697,6 +697,24 @@ project(':iceberg-dell') { } } +project(':iceberg-snowflake') { + test { + useJUnitPlatform() + } + + dependencies { + implementation project(':iceberg-core') + implementation project(':iceberg-common') + implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow') + implementation "com.fasterxml.jackson.core:jackson-databind" + implementation "com.fasterxml.jackson.core:jackson-core" + + runtimeOnly("net.snowflake:snowflake-jdbc") + + testImplementation project(path: ':iceberg-core', configuration: 'testArtifacts') + } +} + @Memoized boolean versionFileExists() { return file('version.txt').exists() diff --git a/core/src/main/java/org/apache/iceberg/jdbc/JdbcClientPool.java b/core/src/main/java/org/apache/iceberg/jdbc/JdbcClientPool.java index daa04908f41e..60e5eb49a4f8 100644 --- a/core/src/main/java/org/apache/iceberg/jdbc/JdbcClientPool.java +++ b/core/src/main/java/org/apache/iceberg/jdbc/JdbcClientPool.java @@ -27,12 +27,12 @@ import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.ClientPoolImpl; -class JdbcClientPool extends ClientPoolImpl { +public class JdbcClientPool extends ClientPoolImpl { private final String dbUrl; private final Map properties; - JdbcClientPool(String dbUrl, Map props) { + public JdbcClientPool(String dbUrl, Map props) { this( Integer.parseInt( props.getOrDefault( @@ -42,7 +42,7 @@ class JdbcClientPool extends ClientPoolImpl { props); } - JdbcClientPool(int poolSize, String dbUrl, Map props) { + public JdbcClientPool(int poolSize, String dbUrl, Map props) { super(poolSize, SQLNonTransientConnectionException.class, true); properties = props; this.dbUrl = dbUrl; diff --git a/settings.gradle b/settings.gradle index d1a14abe5b0f..c5ac07e080c2 100644 --- a/settings.gradle +++ b/settings.gradle @@ -34,6 +34,7 @@ include 'hive-metastore' include 'nessie' include 'gcp' include 'dell' +include 'snowflake' project(':api').name = 'iceberg-api' project(':common').name = 'iceberg-common' @@ -51,6 +52,7 @@ project(':hive-metastore').name = 'iceberg-hive-metastore' project(':nessie').name = 'iceberg-nessie' project(':gcp').name = 'iceberg-gcp' project(':dell').name = 'iceberg-dell' +project(':snowflake').name = 'iceberg-snowflake' if (null != System.getProperty("allVersions")) { System.setProperty("flinkVersions", System.getProperty("knownFlinkVersions")) diff --git a/snowflake/src/main/java/org/apache/iceberg/snowflake/JdbcSnowflakeClient.java b/snowflake/src/main/java/org/apache/iceberg/snowflake/JdbcSnowflakeClient.java new file mode 100644 index 000000000000..1618f76c10e5 --- /dev/null +++ b/snowflake/src/main/java/org/apache/iceberg/snowflake/JdbcSnowflakeClient.java @@ -0,0 +1,378 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.snowflake; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.List; +import org.apache.iceberg.jdbc.JdbcClientPool; +import org.apache.iceberg.jdbc.UncheckedInterruptedException; +import org.apache.iceberg.jdbc.UncheckedSQLException; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +/** + * This implementation of SnowflakeClient builds on top of Snowflake's JDBC driver to interact with + * Snowflake's Iceberg-aware resource model. + */ +class JdbcSnowflakeClient implements SnowflakeClient { + static final String EXPECTED_JDBC_IMPL = "net.snowflake.client.jdbc.SnowflakeDriver"; + + @FunctionalInterface + interface ResultSetParser { + T parse(ResultSet rs) throws SQLException; + } + + /** + * This class wraps the basic boilerplate of setting up PreparedStatements and applying a + * ResultSetParser to translate a ResultSet into parsed objects. Allows easily injecting + * subclasses for debugging/testing purposes. + */ + static class QueryHarness { + public T query(Connection conn, String sql, ResultSetParser parser, String... args) + throws SQLException { + try (PreparedStatement statement = conn.prepareStatement(sql)) { + if (args != null) { + for (int i = 0; i < args.length; ++i) { + statement.setString(i + 1, args[i]); + } + } + + try (ResultSet rs = statement.executeQuery()) { + return parser.parse(rs); + } + } + } + } + + /** + * Expects to handle ResultSets representing fully-qualified Snowflake Database identifiers, + * containing "name" (representing databaseName). + */ + public static final ResultSetParser> DATABASE_RESULT_SET_HANDLER = + rs -> { + List databases = Lists.newArrayList(); + while (rs.next()) { + String databaseName = rs.getString("name"); + databases.add(SnowflakeIdentifier.ofDatabase(databaseName)); + } + return databases; + }; + + /** + * Expects to handle ResultSets representing fully-qualified Snowflake Schema identifiers, + * containing "database_name" and "name" (representing schemaName). + */ + public static final ResultSetParser> SCHEMA_RESULT_SET_HANDLER = + rs -> { + List schemas = Lists.newArrayList(); + while (rs.next()) { + String databaseName = rs.getString("database_name"); + String schemaName = rs.getString("name"); + schemas.add(SnowflakeIdentifier.ofSchema(databaseName, schemaName)); + } + return schemas; + }; + + /** + * Expects to handle ResultSets representing fully-qualified Snowflake Table identifiers, + * containing "database_name", "schema_name", and "name" (representing tableName). 
+ */ + public static final ResultSetParser> TABLE_RESULT_SET_HANDLER = + rs -> { + List tables = Lists.newArrayList(); + while (rs.next()) { + String databaseName = rs.getString("database_name"); + String schemaName = rs.getString("schema_name"); + String tableName = rs.getString("name"); + tables.add(SnowflakeIdentifier.ofTable(databaseName, schemaName, tableName)); + } + return tables; + }; + + /** + * Expects to handle ResultSets representing a single record holding Snowflake Iceberg metadata. + */ + public static final ResultSetParser TABLE_METADATA_RESULT_SET_HANDLER = + rs -> { + if (!rs.next()) { + return null; + } + + String rawJsonVal = rs.getString("METADATA"); + return SnowflakeTableMetadata.parseJson(rawJsonVal); + }; + + private final JdbcClientPool connectionPool; + private QueryHarness queryHarness; + + JdbcSnowflakeClient(JdbcClientPool conn) { + Preconditions.checkArgument(null != conn, "JdbcClientPool must be non-null"); + connectionPool = conn; + queryHarness = new QueryHarness(); + } + + @VisibleForTesting + void setQueryHarness(QueryHarness queryHarness) { + this.queryHarness = queryHarness; + } + + /** + * For rare cases where PreparedStatements aren't supported for user-supplied identifiers intended + * for use in special LIKE clauses, we can sanitize by "broadening" the identifier with + * single-character wildcards and manually post-filter client-side. + * + *

Note: This sanitization approach intentionally "broadens" the scope of matching results;
+   * callers must be able to handle this method returning an all-wildcard expression; i.e. the
+   * caller must treat the usage of the LIKE clause as only an optional optimization, and should
+   * post-filter for correctness as if the LIKE clause wasn't present in the query at all.
+   */
+  @VisibleForTesting
+  String sanitizeIdentifierWithWildcardForLikeClause(String identifier) {
+    // Restrict identifiers to the "Unquoted object identifiers" syntax documented at
+    // https://docs.snowflake.com/en/sql-reference/identifiers-syntax.html
+    //
+    // Use a strict allowlist of characters, replace everything *not* matching the character set
+    // with "_", which is used as a single-character wildcard in Snowflake.
+    String sanitized = identifier.replaceAll("[^a-zA-Z0-9_$]", "_");
+    if (sanitized.startsWith("$")) {
+      sanitized = "_" + sanitized.substring(1);
+    }
+    return sanitized;
+  }
+
+  @Override
+  public boolean databaseExists(SnowflakeIdentifier database) {
+    Preconditions.checkArgument(
+        database.type() == SnowflakeIdentifier.Type.DATABASE,
+        "databaseExists requires a DATABASE identifier, got '%s'",
+        database);
+
+    // Due to current limitations in PreparedStatement parameters for the LIKE clause in
+    // SHOW DATABASES queries, we'll use a fairly limited allowlist for identifier characters,
+    // using wildcards for non-allowed characters, and post-filter for matching.
+    final String finalQuery =
+        String.format(
+            "SHOW DATABASES LIKE '%s' IN ACCOUNT",
+            sanitizeIdentifierWithWildcardForLikeClause(database.databaseName()));
+    List databases;
+    try {
+      databases =
+          connectionPool.run(
+              conn -> queryHarness.query(conn, finalQuery, DATABASE_RESULT_SET_HANDLER));
+    } catch (SQLException e) {
+      throw new UncheckedSQLException(e, "Failed to check if database '%s' exists", database);
+    } catch (InterruptedException e) {
+      throw new UncheckedInterruptedException(
+          e, "Interrupted while checking if database '%s' exists", database);
+    }
+
+    // Filter to handle the edge case of '_' appearing as a wildcard that can't be remapped the way
+    // it can for predicates in SELECT statements.
+    databases.removeIf(db -> !database.databaseName().equalsIgnoreCase(db.databaseName()));
+    return !databases.isEmpty();
+  }
+
+  @Override
+  public boolean schemaExists(SnowflakeIdentifier schema) {
+    Preconditions.checkArgument(
+        schema.type() == SnowflakeIdentifier.Type.SCHEMA,
+        "schemaExists requires a SCHEMA identifier, got '%s'",
+        schema);
+
+    if (!databaseExists(SnowflakeIdentifier.ofDatabase(schema.databaseName()))) {
+      return false;
+    }
+
+    // Due to current limitations in PreparedStatement parameters for the LIKE clause in
+    // SHOW SCHEMAS queries, we'll use a fairly limited allowlist for identifier characters,
+    // using wildcards for non-allowed characters, and post-filter for matching.
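+    // Illustrative example: sanitizeIdentifierWithWildcardForLikeClause("$SCHEMA_1$.'!@#%^&*")
+    // yields "_SCHEMA_1$_________" - every disallowed character (and a leading '$') becomes the
+    // '_' single-character wildcard, so the LIKE pattern can only broaden the match, and the
+    // equalsIgnoreCase post-filter below restores exactness.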
+ final String finalQuery = + String.format( + "SHOW SCHEMAS LIKE '%s' IN DATABASE IDENTIFIER(?)", + sanitizeIdentifierWithWildcardForLikeClause(schema.schemaName())); + List schemas; + try { + schemas = + connectionPool.run( + conn -> + queryHarness.query( + conn, finalQuery, SCHEMA_RESULT_SET_HANDLER, schema.databaseName())); + } catch (SQLException e) { + throw new UncheckedSQLException(e, "Failed to check if schema '%s' exists", schema); + } catch (InterruptedException e) { + throw new UncheckedInterruptedException( + e, "Interrupted while checking if schema '%s' exists", schema); + } + + // Filter to handle the edge case of '_' appearing as a wildcard that can't be remapped the way + // it can for predicates in SELECT statements. + schemas.removeIf(sc -> !schema.schemaName().equalsIgnoreCase(sc.schemaName())); + return !schemas.isEmpty(); + } + + @Override + public List listDatabases() { + List databases; + try { + databases = + connectionPool.run( + conn -> + queryHarness.query( + conn, "SHOW DATABASES IN ACCOUNT", DATABASE_RESULT_SET_HANDLER)); + } catch (SQLException e) { + throw new UncheckedSQLException(e, "Failed to list databases"); + } catch (InterruptedException e) { + throw new UncheckedInterruptedException(e, "Interrupted while listing databases"); + } + databases.forEach( + db -> + Preconditions.checkState( + db.type() == SnowflakeIdentifier.Type.DATABASE, + "Expected DATABASE, got identifier '%s'", + db)); + return databases; + } + + @Override + public List listSchemas(SnowflakeIdentifier scope) { + StringBuilder baseQuery = new StringBuilder("SHOW SCHEMAS"); + String[] queryParams = null; + switch (scope.type()) { + case ROOT: + // account-level listing + baseQuery.append(" IN ACCOUNT"); + break; + case DATABASE: + // database-level listing + baseQuery.append(" IN DATABASE IDENTIFIER(?)"); + queryParams = new String[] {scope.toIdentifierString()}; + break; + default: + throw new IllegalArgumentException( + String.format("Unsupported scope type for listSchemas: %s", scope)); + } + + final String finalQuery = baseQuery.toString(); + final String[] finalQueryParams = queryParams; + List schemas; + try { + schemas = + connectionPool.run( + conn -> + queryHarness.query( + conn, finalQuery, SCHEMA_RESULT_SET_HANDLER, finalQueryParams)); + } catch (SQLException e) { + throw new UncheckedSQLException(e, "Failed to list schemas for scope '%s'", scope); + } catch (InterruptedException e) { + throw new UncheckedInterruptedException( + e, "Interrupted while listing schemas for scope '%s'", scope); + } + schemas.forEach( + schema -> + Preconditions.checkState( + schema.type() == SnowflakeIdentifier.Type.SCHEMA, + "Expected SCHEMA, got identifier '%s' for scope '%s'", + schema, + scope)); + return schemas; + } + + @Override + public List listIcebergTables(SnowflakeIdentifier scope) { + StringBuilder baseQuery = new StringBuilder("SHOW ICEBERG TABLES"); + String[] queryParams = null; + switch (scope.type()) { + case ROOT: + // account-level listing + baseQuery.append(" IN ACCOUNT"); + break; + case DATABASE: + // database-level listing + baseQuery.append(" IN DATABASE IDENTIFIER(?)"); + queryParams = new String[] {scope.toIdentifierString()}; + break; + case SCHEMA: + // schema-level listing + baseQuery.append(" IN SCHEMA IDENTIFIER(?)"); + queryParams = new String[] {scope.toIdentifierString()}; + break; + default: + throw new IllegalArgumentException( + String.format("Unsupported scope type for listIcebergTables: %s", scope)); + } + + final String finalQuery = baseQuery.toString(); + 
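+    // Copy into an effectively-final local so the connectionPool.run lambda below is allowed to
+    // capture it (queryParams itself is reassigned in the switch above).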
final String[] finalQueryParams = queryParams; + List tables; + try { + tables = + connectionPool.run( + conn -> + queryHarness.query(conn, finalQuery, TABLE_RESULT_SET_HANDLER, finalQueryParams)); + } catch (SQLException e) { + throw new UncheckedSQLException(e, "Failed to list tables for scope '%s'", scope); + } catch (InterruptedException e) { + throw new UncheckedInterruptedException( + e, "Interrupted while listing tables for scope '%s'", scope); + } + tables.forEach( + table -> + Preconditions.checkState( + table.type() == SnowflakeIdentifier.Type.TABLE, + "Expected TABLE, got identifier '%s' for scope '%s'", + table, + scope)); + return tables; + } + + @Override + public SnowflakeTableMetadata loadTableMetadata(SnowflakeIdentifier tableIdentifier) { + Preconditions.checkArgument( + tableIdentifier.type() == SnowflakeIdentifier.Type.TABLE, + "loadTableMetadata requires a TABLE identifier, got '%s'", + tableIdentifier); + SnowflakeTableMetadata tableMeta; + try { + final String finalQuery = "SELECT SYSTEM$GET_ICEBERG_TABLE_INFORMATION(?) AS METADATA"; + tableMeta = + connectionPool.run( + conn -> + queryHarness.query( + conn, + finalQuery, + TABLE_METADATA_RESULT_SET_HANDLER, + tableIdentifier.toIdentifierString())); + } catch (SQLException e) { + throw new UncheckedSQLException(e, "Failed to get table metadata for '%s'", tableIdentifier); + } catch (InterruptedException e) { + throw new UncheckedInterruptedException( + e, "Interrupted while getting table metadata for '%s'", tableIdentifier); + } + return tableMeta; + } + + @Override + public void close() { + connectionPool.close(); + } +} diff --git a/snowflake/src/main/java/org/apache/iceberg/snowflake/NamespaceHelpers.java b/snowflake/src/main/java/org/apache/iceberg/snowflake/NamespaceHelpers.java new file mode 100644 index 000000000000..28dacbca9817 --- /dev/null +++ b/snowflake/src/main/java/org/apache/iceberg/snowflake/NamespaceHelpers.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.snowflake; + +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +class NamespaceHelpers { + private static final int MAX_NAMESPACE_DEPTH = 2; + private static final int NAMESPACE_ROOT_LEVEL = 0; + private static final int NAMESPACE_DB_LEVEL = 1; + private static final int NAMESPACE_SCHEMA_LEVEL = 2; + + private NamespaceHelpers() {} + + /** + * Converts a Namespace into a SnowflakeIdentifier representing ROOT, a DATABASE, or a SCHEMA. + * + * @throws IllegalArgumentException if the namespace is not a supported depth. 
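+   *     For example (names illustrative), an empty namespace maps to ROOT, ["DB1"] maps to a
+   *     DATABASE identifier, ["DB1", "SCHEMA1"] maps to a SCHEMA identifier, and any deeper
+   *     namespace is rejected.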
+ */ + public static SnowflakeIdentifier toSnowflakeIdentifier(Namespace namespace) { + switch (namespace.length()) { + case NAMESPACE_ROOT_LEVEL: + return SnowflakeIdentifier.ofRoot(); + case NAMESPACE_DB_LEVEL: + return SnowflakeIdentifier.ofDatabase(namespace.level(NAMESPACE_DB_LEVEL - 1)); + case NAMESPACE_SCHEMA_LEVEL: + return SnowflakeIdentifier.ofSchema( + namespace.level(NAMESPACE_DB_LEVEL - 1), namespace.level(NAMESPACE_SCHEMA_LEVEL - 1)); + default: + throw new IllegalArgumentException( + String.format( + "Snowflake max namespace level is %d, got namespace '%s'", + MAX_NAMESPACE_DEPTH, namespace)); + } + } + + /** + * Converts a TableIdentifier into a SnowflakeIdentifier of type TABLE; the identifier must have + * exactly the right namespace depth to represent a fully-qualified Snowflake table identifier. + */ + public static SnowflakeIdentifier toSnowflakeIdentifier(TableIdentifier identifier) { + SnowflakeIdentifier namespaceScope = toSnowflakeIdentifier(identifier.namespace()); + Preconditions.checkArgument( + namespaceScope.type() == SnowflakeIdentifier.Type.SCHEMA, + "Namespace portion of '%s' must be at the SCHEMA level, got namespaceScope '%s'", + identifier, + namespaceScope); + return SnowflakeIdentifier.ofTable( + namespaceScope.databaseName(), namespaceScope.schemaName(), identifier.name()); + } + + /** + * Converts a SnowflakeIdentifier of type ROOT, DATABASE, or SCHEMA into an equivalent Iceberg + * Namespace; throws IllegalArgumentException if not an appropriate type. + */ + public static Namespace toIcebergNamespace(SnowflakeIdentifier identifier) { + switch (identifier.type()) { + case ROOT: + return Namespace.empty(); + case DATABASE: + return Namespace.of(identifier.databaseName()); + case SCHEMA: + return Namespace.of(identifier.databaseName(), identifier.schemaName()); + default: + throw new IllegalArgumentException( + String.format("Cannot convert identifier '%s' to Namespace", identifier)); + } + } + + /** + * Converts a SnowflakeIdentifier to an equivalent Iceberg TableIdentifier; the identifier must be + * of type TABLE. + */ + public static TableIdentifier toIcebergTableIdentifier(SnowflakeIdentifier identifier) { + Preconditions.checkArgument( + identifier.type() == SnowflakeIdentifier.Type.TABLE, + "SnowflakeIdentifier must be type TABLE, got '%s'", + identifier); + return TableIdentifier.of( + identifier.databaseName(), identifier.schemaName(), identifier.tableName()); + } +} diff --git a/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeCatalog.java b/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeCatalog.java new file mode 100644 index 000000000000..19302d578497 --- /dev/null +++ b/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeCatalog.java @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.snowflake; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.iceberg.BaseMetastoreCatalog; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SupportsNamespaces; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.NoSuchNamespaceException; +import org.apache.iceberg.hadoop.Configurable; +import org.apache.iceberg.io.CloseableGroup; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.jdbc.JdbcClientPool; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SnowflakeCatalog extends BaseMetastoreCatalog + implements Closeable, SupportsNamespaces, Configurable { + private static final String DEFAULT_CATALOG_NAME = "snowflake_catalog"; + private static final String DEFAULT_FILE_IO_IMPL = "org.apache.iceberg.io.ResolvingFileIO"; + + // Injectable factory for testing purposes. + static class FileIOFactory { + public FileIO newFileIO(String impl, Map properties, Object hadoopConf) { + return CatalogUtil.loadFileIO(impl, properties, hadoopConf); + } + } + + private static final Logger LOG = LoggerFactory.getLogger(SnowflakeCatalog.class); + + private CloseableGroup closeableGroup; + private Object conf; + private String catalogName; + private Map catalogProperties; + private FileIOFactory fileIOFactory; + private SnowflakeClient snowflakeClient; + + public SnowflakeCatalog() {} + + @Override + public List listTables(Namespace namespace) { + SnowflakeIdentifier scope = NamespaceHelpers.toSnowflakeIdentifier(namespace); + Preconditions.checkArgument( + scope.type() == SnowflakeIdentifier.Type.SCHEMA, + "listTables must be at SCHEMA level; got %s from namespace %s", + scope, + namespace); + + List sfTables = snowflakeClient.listIcebergTables(scope); + + return sfTables.stream() + .map(NamespaceHelpers::toIcebergTableIdentifier) + .collect(Collectors.toList()); + } + + @Override + public boolean dropTable(TableIdentifier identifier, boolean purge) { + throw new UnsupportedOperationException( + "SnowflakeCatalog does not currently support dropTable"); + } + + @Override + public void renameTable(TableIdentifier from, TableIdentifier to) { + throw new UnsupportedOperationException( + "SnowflakeCatalog does not currently support renameTable"); + } + + @Override + public void initialize(String name, Map properties) { + String uri = properties.get(CatalogProperties.URI); + Preconditions.checkArgument(null != uri, "JDBC connection URI is required"); + try { + // We'll ensure the expected JDBC driver implementation class is initialized through + // reflection regardless of which classloader ends up using this JdbcSnowflakeClient, but + // we'll only warn if the expected driver fails to load, since users may use repackaged or + // custom JDBC drivers for Snowflake communication. 
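+      // The driver is expected to handle URIs of the form (account name hypothetical):
+      //   jdbc:snowflake://my_account.snowflakecomputing.com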
+      Class.forName(JdbcSnowflakeClient.EXPECTED_JDBC_IMPL);
+    } catch (ClassNotFoundException cnfe) {
+      LOG.warn(
+          "Failed to load expected JDBC SnowflakeDriver - if queries fail by failing"
+              + " to find a suitable driver for jdbc:snowflake:// URIs, you must add the Snowflake"
+              + " JDBC driver to your jars/packages",
+          cnfe);
+    }
+    JdbcClientPool connectionPool = new JdbcClientPool(uri, properties);
+
+    initialize(name, new JdbcSnowflakeClient(connectionPool), new FileIOFactory(), properties);
+  }
+
+  /**
+   * Initialize using caller-supplied SnowflakeClient and FileIO.
+   *
+   * @param name The name of the catalog, defaults to "snowflake_catalog"
+   * @param snowflakeClient The client encapsulating network communication with Snowflake
+   * @param fileIOFactory The {@link FileIOFactory} to use to instantiate a new FileIO for each new
+   *     table operation
+   * @param properties The catalog options to use and propagate to dependencies
+   */
+  @SuppressWarnings("checkstyle:HiddenField")
+  void initialize(
+      String name,
+      SnowflakeClient snowflakeClient,
+      FileIOFactory fileIOFactory,
+      Map properties) {
+    Preconditions.checkArgument(null != snowflakeClient, "snowflakeClient must be non-null");
+    Preconditions.checkArgument(null != fileIOFactory, "fileIOFactory must be non-null");
+    this.catalogName = name == null ? DEFAULT_CATALOG_NAME : name;
+    this.snowflakeClient = snowflakeClient;
+    this.fileIOFactory = fileIOFactory;
+    this.catalogProperties = properties;
+    this.closeableGroup = new CloseableGroup();
+    closeableGroup.addCloseable(snowflakeClient);
+    closeableGroup.setSuppressCloseFailure(true);
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (null != closeableGroup) {
+      closeableGroup.close();
+    }
+  }
+
+  @Override
+  public void createNamespace(Namespace namespace, Map metadata) {
+    throw new UnsupportedOperationException(
+        "SnowflakeCatalog does not currently support createNamespace");
+  }
+
+  @Override
+  public List listNamespaces(Namespace namespace) {
+    SnowflakeIdentifier scope = NamespaceHelpers.toSnowflakeIdentifier(namespace);
+    List results = null;
+    switch (scope.type()) {
+      case ROOT:
+        results = snowflakeClient.listDatabases();
+        break;
+      case DATABASE:
+        results = snowflakeClient.listSchemas(scope);
+        break;
+      default:
+        throw new IllegalArgumentException(
+            String.format(
+                "listNamespaces must be at either ROOT or DATABASE level; got %s from namespace %s",
+                scope, namespace));
+    }
+
+    return results.stream().map(NamespaceHelpers::toIcebergNamespace).collect(Collectors.toList());
+  }
+
+  @Override
+  public Map loadNamespaceMetadata(Namespace namespace)
+      throws NoSuchNamespaceException {
+    SnowflakeIdentifier id = NamespaceHelpers.toSnowflakeIdentifier(namespace);
+    boolean namespaceExists;
+    switch (id.type()) {
+      case DATABASE:
+        namespaceExists = snowflakeClient.databaseExists(id);
+        break;
+      case SCHEMA:
+        namespaceExists = snowflakeClient.schemaExists(id);
+        break;
+      default:
+        throw new IllegalArgumentException(
+            String.format(
+                "loadNamespaceMetadata must be at either DATABASE or SCHEMA level; got %s from namespace %s",
+                id, namespace));
+    }
+    if (namespaceExists) {
+      return ImmutableMap.of();
+    } else {
+      throw new NoSuchNamespaceException(
+          "Namespace '%s' with snowflake identifier '%s' doesn't exist", namespace, id);
+    }
+  }
+
+  @Override
+  public boolean dropNamespace(Namespace namespace) {
+    throw new UnsupportedOperationException(
+        "SnowflakeCatalog does not currently support dropNamespace");
+  }
+
+  @Override
+  public boolean
setProperties(Namespace namespace, Map properties) { + throw new UnsupportedOperationException( + "SnowflakeCatalog does not currently support setProperties"); + } + + @Override + public boolean removeProperties(Namespace namespace, Set properties) { + throw new UnsupportedOperationException( + "SnowflakeCatalog does not currently support removeProperties"); + } + + @Override + protected TableOperations newTableOps(TableIdentifier tableIdentifier) { + String fileIOImpl = DEFAULT_FILE_IO_IMPL; + if (catalogProperties.containsKey(CatalogProperties.FILE_IO_IMPL)) { + fileIOImpl = catalogProperties.get(CatalogProperties.FILE_IO_IMPL); + } + + // Initialize a fresh FileIO for each TableOperations created, because some FileIO + // implementations such as S3FileIO can become bound to a single S3 bucket. Additionally, + // FileIO implementations often support only a finite set of one or more URI schemes (i.e. + // S3FileIO only supports s3/s3a/s3n, and even ResolvingFileIO only supports the combination + // of schemes registered for S3FileIO and HadoopFileIO). Individual catalogs may need to + // support tables across different cloud/storage providers with disjoint FileIO implementations. + FileIO fileIO = fileIOFactory.newFileIO(fileIOImpl, catalogProperties, conf); + closeableGroup.addCloseable(fileIO); + return new SnowflakeTableOperations(snowflakeClient, fileIO, catalogName, tableIdentifier); + } + + @Override + protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) { + throw new UnsupportedOperationException( + "SnowflakeCatalog does not currently support defaultWarehouseLocation"); + } + + @Override + public void setConf(Object conf) { + this.conf = conf; + } +} diff --git a/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeClient.java b/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeClient.java new file mode 100644 index 000000000000..2dfadb9a65b4 --- /dev/null +++ b/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeClient.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.snowflake; + +import java.io.Closeable; +import java.util.List; + +/** + * This interface abstracts out the underlying communication protocols for contacting Snowflake to + * obtain the various resource representations defined under "entities". Classes using this + * interface should minimize assumptions about whether an underlying client uses e.g. REST, JDBC or + * other underlying libraries/protocols. + */ +interface SnowflakeClient extends Closeable { + + /** Returns true if the database exists, false otherwise. */ + boolean databaseExists(SnowflakeIdentifier database); + + /** Returns true if the schema and its parent database exists, false otherwise. 
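+   * A schema cannot exist without its parent database, so implementations may short-circuit by
+   * checking the parent database first.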
*/ + boolean schemaExists(SnowflakeIdentifier schema); + + /** Lists all Snowflake databases within the currently configured account. */ + List listDatabases(); + + /** + * Lists all Snowflake schemas within a given scope. Returned SnowflakeIdentifiers must have + * type() == SnowflakeIdentifier.Type.SCHEMA. + * + * @param scope The scope in which to list, which may be ROOT or a single DATABASE. + */ + List listSchemas(SnowflakeIdentifier scope); + + /** + * Lists all Snowflake Iceberg tables within a given scope. Returned SnowflakeIdentifiers must + * have type() == SnowflakeIdentifier.Type.TABLE. + * + * @param scope The scope in which to list, which may be ROOT, a DATABASE, or a SCHEMA. + */ + List listIcebergTables(SnowflakeIdentifier scope); + + /** + * Returns Snowflake-level metadata containing locations to more detailed metadata. + * + * @param tableIdentifier The fully-qualified identifier that must be of type + * SnowflakeIdentifier.Type.TABLE. + */ + SnowflakeTableMetadata loadTableMetadata(SnowflakeIdentifier tableIdentifier); +} diff --git a/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeIdentifier.java b/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeIdentifier.java new file mode 100644 index 000000000000..3082b1d8e58a --- /dev/null +++ b/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeIdentifier.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.snowflake; + +import org.apache.iceberg.relocated.com.google.common.base.Objects; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +/** + * Since the SnowflakeCatalog supports exactly two levels of Iceberg Namespaces, corresponding + * directly to the "database" and "schema" portions of Snowflake's resource model, this class + * represents a pre-validated and structured representation of a fully-qualified Snowflake resource + * identifier. Snowflake-specific helper libraries should operate on this representation instead of + * directly operating on TableIdentifiers or Namespaces wherever possible to avoid duplication of + * parsing/validation logic for Iceberg TableIdentifier/Namespace levels. 
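+ * <p>For example (names illustrative), a table TAB_1 in schema SCHEMA_1 of database DB_1 is
+ * carried as a TABLE-type identifier equivalent to ofTable("DB_1", "SCHEMA_1", "TAB_1").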
+ */ +class SnowflakeIdentifier { + public enum Type { + ROOT, + DATABASE, + SCHEMA, + TABLE + } + + private final String databaseName; + private final String schemaName; + private final String tableName; + private final Type type; + + private SnowflakeIdentifier(String databaseName, String schemaName, String tableName, Type type) { + this.databaseName = databaseName; + this.schemaName = schemaName; + this.tableName = tableName; + this.type = type; + } + + public static SnowflakeIdentifier ofRoot() { + return new SnowflakeIdentifier(null, null, null, Type.ROOT); + } + + public static SnowflakeIdentifier ofDatabase(String databaseName) { + Preconditions.checkArgument(null != databaseName, "databaseName must be non-null"); + return new SnowflakeIdentifier(databaseName, null, null, Type.DATABASE); + } + + public static SnowflakeIdentifier ofSchema(String databaseName, String schemaName) { + Preconditions.checkArgument(null != databaseName, "databaseName must be non-null"); + Preconditions.checkArgument(null != schemaName, "schemaName must be non-null"); + return new SnowflakeIdentifier(databaseName, schemaName, null, Type.SCHEMA); + } + + public static SnowflakeIdentifier ofTable( + String databaseName, String schemaName, String tableName) { + Preconditions.checkArgument(null != databaseName, "databaseName must be non-null"); + Preconditions.checkArgument(null != schemaName, "schemaName must be non-null"); + Preconditions.checkArgument(null != tableName, "tableName must be non-null"); + return new SnowflakeIdentifier(databaseName, schemaName, tableName, Type.TABLE); + } + + /** + * If type is TABLE, expect non-null databaseName, schemaName, and tableName. If type is SCHEMA, + * expect non-null databaseName and schemaName. If type is DATABASE, expect non-null databaseName. + * If type is ROOT, expect all of databaseName, schemaName, and tableName to be null. + */ + public Type type() { + return type; + } + + public String tableName() { + return tableName; + } + + public String databaseName() { + return databaseName; + } + + public String schemaName() { + return schemaName; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } else if (!(o instanceof SnowflakeIdentifier)) { + return false; + } + + SnowflakeIdentifier that = (SnowflakeIdentifier) o; + return Objects.equal(this.databaseName, that.databaseName) + && Objects.equal(this.schemaName, that.schemaName) + && Objects.equal(this.tableName, that.tableName); + } + + @Override + public int hashCode() { + return Objects.hashCode(databaseName, schemaName, tableName); + } + + /** Returns this identifier as a String suitable for use in a Snowflake IDENTIFIER param. 
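+   * For example, ofTable("DB_1", "SCHEMA_1", "TAB_1") renders as "DB_1.SCHEMA_1.TAB_1" and
+   * ofSchema("DB_1", "SCHEMA_1") renders as "DB_1.SCHEMA_1".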
*/ + public String toIdentifierString() { + switch (type()) { + case TABLE: + return String.format("%s.%s.%s", databaseName, schemaName, tableName); + case SCHEMA: + return String.format("%s.%s", databaseName, schemaName); + case DATABASE: + return databaseName; + default: + return ""; + } + } + + @Override + public String toString() { + return String.format("%s: '%s'", type(), toIdentifierString()); + } +} diff --git a/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeTableMetadata.java b/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeTableMetadata.java new file mode 100644 index 000000000000..c550b3e13a3a --- /dev/null +++ b/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeTableMetadata.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.snowflake; + +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.iceberg.relocated.com.google.common.base.Objects; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.util.JsonUtil; + +class SnowflakeTableMetadata { + public static final Pattern SNOWFLAKE_AZURE_PATTERN = + Pattern.compile("azure://([^/]+)/([^/]+)/(.*)"); + + private final String snowflakeMetadataLocation; + private final String icebergMetadataLocation; + private final String status; + + // Note: Since not all sources will necessarily come from a raw JSON representation, this raw + // JSON should only be considered a convenient debugging field. Equality of two + // SnowflakeTableMetadata instances should not depend on equality of this field. + private final String rawJsonVal; + + SnowflakeTableMetadata( + String snowflakeMetadataLocation, + String icebergMetadataLocation, + String status, + String rawJsonVal) { + this.snowflakeMetadataLocation = snowflakeMetadataLocation; + this.icebergMetadataLocation = icebergMetadataLocation; + this.status = status; + this.rawJsonVal = rawJsonVal; + } + + /** Storage location of table metadata in Snowflake's path syntax. */ + public String snowflakeMetadataLocation() { + return snowflakeMetadataLocation; + } + + /** Storage location of table metadata in Iceberg's path syntax. */ + public String icebergMetadataLocation() { + return icebergMetadataLocation; + } + + public String getStatus() { + return status; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } else if (!(o instanceof SnowflakeTableMetadata)) { + return false; + } + + // Only consider parsed fields, not the raw JSON that may or may not be the original source of + // this instance. 
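+    // hashCode below hashes the same three parsed fields, keeping equals/hashCode consistent.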
+ SnowflakeTableMetadata that = (SnowflakeTableMetadata) o; + return Objects.equal(this.snowflakeMetadataLocation, that.snowflakeMetadataLocation) + && Objects.equal(this.icebergMetadataLocation, that.icebergMetadataLocation) + && Objects.equal(this.status, that.status); + } + + @Override + public int hashCode() { + return Objects.hashCode(snowflakeMetadataLocation, icebergMetadataLocation, status); + } + + @Override + public String toString() { + return String.format( + "snowflakeMetadataLocation: '%s', icebergMetadataLocation: '%s', status: '%s'", + snowflakeMetadataLocation, icebergMetadataLocation, status); + } + + public String toDebugString() { + return String.format("%s, rawJsonVal: %s", toString(), rawJsonVal); + } + + /** + * Translates from Snowflake's path syntax to Iceberg's path syntax for paths matching known + * non-compatible Snowflake paths. Throws IllegalArgumentException if the prefix of the + * snowflakeLocation is a known non-compatible path syntax but fails to match the expected path + * components for a successful translation. + */ + public static String snowflakeLocationToIcebergLocation(String snowflakeLocation) { + if (snowflakeLocation.startsWith("azure://")) { + // Convert from expected path of the form: + // azure://account.blob.core.windows.net/container/volumepath + // to: + // wasbs://container@account.blob.core.windows.net/volumepath + Matcher matcher = SNOWFLAKE_AZURE_PATTERN.matcher(snowflakeLocation); + Preconditions.checkArgument( + matcher.matches(), + "Location '%s' failed to match pattern '%s'", + snowflakeLocation, + SNOWFLAKE_AZURE_PATTERN); + return String.format( + "wasbs://%s@%s/%s", matcher.group(2), matcher.group(1), matcher.group(3)); + } else if (snowflakeLocation.startsWith("gcs://")) { + // Convert from expected path of the form: + // gcs://bucket/path + // to: + // gs://bucket/path + return "gs" + snowflakeLocation.substring(3); + } + + return snowflakeLocation; + } + + /** + * Factory method for parsing a JSON string containing expected Snowflake table metadata into a + * SnowflakeTableMetadata object. + */ + public static SnowflakeTableMetadata parseJson(String json) { + JsonNode parsedVal; + try { + parsedVal = JsonUtil.mapper().readValue(json, JsonNode.class); + } catch (IOException ioe) { + throw new IllegalArgumentException(String.format("Malformed JSON: %s", json), ioe); + } + + String snowflakeMetadataLocation = JsonUtil.getString("metadataLocation", parsedVal); + String status = JsonUtil.getStringOrNull("status", parsedVal); + + String icebergMetadataLocation = snowflakeLocationToIcebergLocation(snowflakeMetadataLocation); + + return new SnowflakeTableMetadata( + snowflakeMetadataLocation, icebergMetadataLocation, status, json); + } +} diff --git a/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeTableOperations.java b/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeTableOperations.java new file mode 100644 index 000000000000..1fe90d7eff42 --- /dev/null +++ b/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeTableOperations.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.snowflake; + +import org.apache.iceberg.BaseMetastoreTableOperations; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class SnowflakeTableOperations extends BaseMetastoreTableOperations { + + private static final Logger LOG = LoggerFactory.getLogger(SnowflakeTableOperations.class); + + private final FileIO fileIO; + private final TableIdentifier tableIdentifier; + private final SnowflakeIdentifier snowflakeIdentifierForTable; + private final String fullTableName; + + private final SnowflakeClient snowflakeClient; + + protected SnowflakeTableOperations( + SnowflakeClient snowflakeClient, + FileIO fileIO, + String catalogName, + TableIdentifier tableIdentifier) { + this.snowflakeClient = snowflakeClient; + this.fileIO = fileIO; + this.tableIdentifier = tableIdentifier; + this.snowflakeIdentifierForTable = NamespaceHelpers.toSnowflakeIdentifier(tableIdentifier); + this.fullTableName = String.format("%s.%s", catalogName, tableIdentifier); + } + + @Override + public void doRefresh() { + LOG.debug("Getting metadata location for table {}", tableIdentifier); + String location = loadTableMetadataLocation(); + Preconditions.checkState( + location != null && !location.isEmpty(), + "Got null or empty location %s for table %s", + location, + tableIdentifier); + refreshFromMetadataLocation(location); + } + + @Override + public FileIO io() { + return fileIO; + } + + @Override + protected String tableName() { + return fullTableName; + } + + @VisibleForTesting + String fullTableName() { + return tableName(); + } + + private String loadTableMetadataLocation() { + SnowflakeTableMetadata metadata = + snowflakeClient.loadTableMetadata(snowflakeIdentifierForTable); + + if (metadata == null) { + throw new NoSuchTableException("Cannot find table %s", snowflakeIdentifierForTable); + } + + if (!metadata.getStatus().equals("success")) { + LOG.warn( + "Got non-successful table metadata: {} with metadataLocation {} for table {}", + metadata.getStatus(), + metadata.icebergMetadataLocation(), + snowflakeIdentifierForTable); + } + + return metadata.icebergMetadataLocation(); + } +} diff --git a/snowflake/src/test/java/org/apache/iceberg/snowflake/FakeSnowflakeClient.java b/snowflake/src/test/java/org/apache/iceberg/snowflake/FakeSnowflakeClient.java new file mode 100644 index 000000000000..834dc3c6c4db --- /dev/null +++ b/snowflake/src/test/java/org/apache/iceberg/snowflake/FakeSnowflakeClient.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.snowflake; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.jdbc.UncheckedSQLException; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; + +public class FakeSnowflakeClient implements SnowflakeClient { + // In-memory lookup by database/schema/tableName to table metadata. + private final Map>> databases = + Maps.newTreeMap(); + private boolean closed = false; + + public FakeSnowflakeClient() {} + + /** + * Also adds parent database/schema if they don't already exist. If the tableName already exists + * under the given database/schema, the value is replaced with the provided metadata. + */ + public void addTable(SnowflakeIdentifier tableIdentifier, SnowflakeTableMetadata metadata) { + Preconditions.checkState(!closed, "Cannot call addTable after calling close()"); + if (!databases.containsKey(tableIdentifier.databaseName())) { + databases.put(tableIdentifier.databaseName(), Maps.newTreeMap()); + } + Map> schemas = + databases.get(tableIdentifier.databaseName()); + if (!schemas.containsKey(tableIdentifier.schemaName())) { + schemas.put(tableIdentifier.schemaName(), Maps.newTreeMap()); + } + Map tables = schemas.get(tableIdentifier.schemaName()); + tables.put(tableIdentifier.tableName(), metadata); + } + + @Override + public boolean databaseExists(SnowflakeIdentifier database) { + return databases.containsKey(database.databaseName()); + } + + @Override + public boolean schemaExists(SnowflakeIdentifier schema) { + return databases.containsKey(schema.databaseName()) + && databases.get(schema.databaseName()).containsKey(schema.schemaName()); + } + + @Override + public List listDatabases() { + Preconditions.checkState(!closed, "Cannot call listDatabases after calling close()"); + List databaseIdentifiers = Lists.newArrayList(); + for (String databaseName : databases.keySet()) { + databaseIdentifiers.add(SnowflakeIdentifier.ofDatabase(databaseName)); + } + return databaseIdentifiers; + } + + @Override + public List listSchemas(SnowflakeIdentifier scope) { + Preconditions.checkState(!closed, "Cannot call listSchemas after calling close()"); + List schemas = Lists.newArrayList(); + switch (scope.type()) { + case ROOT: + // "account-level" listing. 
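+        // Walk every database and emit each (database, schema) pair as a SCHEMA identifier.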
+ for (Map.Entry>> db : + databases.entrySet()) { + for (String schema : db.getValue().keySet()) { + schemas.add(SnowflakeIdentifier.ofSchema(db.getKey(), schema)); + } + } + break; + case DATABASE: + String dbName = scope.databaseName(); + if (databases.containsKey(dbName)) { + for (String schema : databases.get(dbName).keySet()) { + schemas.add(SnowflakeIdentifier.ofSchema(dbName, schema)); + } + } else { + throw new UncheckedSQLException("Object does not exist: database: '%s'", dbName); + } + break; + default: + throw new IllegalArgumentException( + String.format("Unsupported scope type for listSchemas: '%s'", scope)); + } + return schemas; + } + + @Override + public List listIcebergTables(SnowflakeIdentifier scope) { + Preconditions.checkState(!closed, "Cannot call listIcebergTables after calling close()"); + List tables = Lists.newArrayList(); + switch (scope.type()) { + case ROOT: + { + // "account-level" listing. + for (Map.Entry>> db : + databases.entrySet()) { + for (Map.Entry> schema : + db.getValue().entrySet()) { + for (String tableName : schema.getValue().keySet()) { + tables.add(SnowflakeIdentifier.ofTable(db.getKey(), schema.getKey(), tableName)); + } + } + } + break; + } + case DATABASE: + { + String dbName = scope.databaseName(); + if (databases.containsKey(dbName)) { + for (Map.Entry> schema : + databases.get(dbName).entrySet()) { + for (String tableName : schema.getValue().keySet()) { + tables.add(SnowflakeIdentifier.ofTable(dbName, schema.getKey(), tableName)); + } + } + } else { + throw new UncheckedSQLException("Object does not exist: database: '%s'", dbName); + } + break; + } + case SCHEMA: + { + String dbName = scope.databaseName(); + if (databases.containsKey(dbName)) { + String schemaName = scope.schemaName(); + if (databases.get(dbName).containsKey(schemaName)) { + for (String tableName : databases.get(dbName).get(schemaName).keySet()) { + tables.add(SnowflakeIdentifier.ofTable(dbName, schemaName, tableName)); + } + } else { + throw new UncheckedSQLException( + "Object does not exist: database.schema: '%s.%s'", dbName, schemaName); + } + } else { + throw new UncheckedSQLException("Object does not exist: database: '%s'", dbName); + } + break; + } + default: + throw new IllegalArgumentException( + String.format("Unsupported scope type for listing tables: %s", scope)); + } + return tables; + } + + @Override + public SnowflakeTableMetadata loadTableMetadata(SnowflakeIdentifier tableIdentifier) { + Preconditions.checkState(!closed, "Cannot call getTableMetadata after calling close()"); + + Preconditions.checkArgument( + tableIdentifier.type() == SnowflakeIdentifier.Type.TABLE, + "tableIdentifier must be type TABLE, get: %s", + tableIdentifier); + String dbName = tableIdentifier.databaseName(); + String schemaName = tableIdentifier.schemaName(); + if (!databases.containsKey(dbName) + || !databases.get(dbName).containsKey(schemaName) + || !databases.get(dbName).get(schemaName).containsKey(tableIdentifier.tableName())) { + throw new UncheckedSQLException("Object does not exist: object: '%s'", tableIdentifier); + } + return databases.get(dbName).get(schemaName).get(tableIdentifier.tableName()); + } + + public boolean isClosed() { + return closed; + } + + @Override + public void close() { + closed = true; + } +} diff --git a/snowflake/src/test/java/org/apache/iceberg/snowflake/JdbcSnowflakeClientTest.java b/snowflake/src/test/java/org/apache/iceberg/snowflake/JdbcSnowflakeClientTest.java new file mode 100644 index 000000000000..1374ad8ac283 --- /dev/null +++ 
b/snowflake/src/test/java/org/apache/iceberg/snowflake/JdbcSnowflakeClientTest.java @@ -0,0 +1,603 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.snowflake; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.List; +import org.apache.iceberg.ClientPool; +import org.apache.iceberg.jdbc.JdbcClientPool; +import org.apache.iceberg.jdbc.UncheckedInterruptedException; +import org.apache.iceberg.jdbc.UncheckedSQLException; +import org.assertj.core.api.Assertions; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentMatchers; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class JdbcSnowflakeClientTest { + @Mock private Connection mockConnection; + @Mock private JdbcClientPool mockClientPool; + @Mock private JdbcSnowflakeClient.QueryHarness mockQueryHarness; + @Mock private ResultSet mockResultSet; + + private JdbcSnowflakeClient snowflakeClient; + + @Before + public void before() throws SQLException, InterruptedException { + snowflakeClient = new JdbcSnowflakeClient(mockClientPool); + snowflakeClient.setQueryHarness(mockQueryHarness); + + doAnswer(invocation -> ((ClientPool.Action) invocation.getArguments()[0]).run(mockConnection)) + .when(mockClientPool) + .run(any(ClientPool.Action.class)); + doAnswer( + invocation -> + ((JdbcSnowflakeClient.ResultSetParser) invocation.getArguments()[2]) + .parse(mockResultSet)) + .when(mockQueryHarness) + .query( + any(Connection.class), + any(String.class), + any(JdbcSnowflakeClient.ResultSetParser.class), + ArgumentMatchers.any()); + } + + @Test + public void testNullClientPoolInConstructor() { + Assertions.assertThatExceptionOfType(IllegalArgumentException.class) + .isThrownBy(() -> new JdbcSnowflakeClient(null)) + .withMessageContaining("JdbcClientPool must be non-null"); + } + + @Test + public void testDatabaseExists() throws SQLException { + when(mockResultSet.next()).thenReturn(true).thenReturn(false); + when(mockResultSet.getString("name")).thenReturn("DB_1"); + + Assertions.assertThat(snowflakeClient.databaseExists(SnowflakeIdentifier.ofDatabase("DB_1"))) + .isTrue(); + + verify(mockQueryHarness) + .query( + eq(mockConnection), + eq("SHOW DATABASES LIKE 'DB_1' IN ACCOUNT"), + any(JdbcSnowflakeClient.ResultSetParser.class)); + } + + @Test + public void testDatabaseExistsSpecialCharacters() throws SQLException { + 
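+    // Every disallowed character in the name should surface as a '_' wildcard in the LIKE
+    // pattern issued by the client; the exact pattern is asserted below.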
when(mockResultSet.next()).thenReturn(true).thenReturn(false); + when(mockResultSet.getString("name")).thenReturn("$DB_1$.'!@#%^&*"); + + Assertions.assertThat( + snowflakeClient.databaseExists(SnowflakeIdentifier.ofDatabase("$DB_1$.'!@#%^&*"))) + .isTrue(); + + verify(mockQueryHarness) + .query( + eq(mockConnection), + eq("SHOW DATABASES LIKE '_DB_1$_________' IN ACCOUNT"), + any(JdbcSnowflakeClient.ResultSetParser.class)); + } + + @Test + public void testDatabaseDoesntExistNoResults() throws SQLException { + when(mockResultSet.next()).thenReturn(false); + + Assertions.assertThat(snowflakeClient.databaseExists(SnowflakeIdentifier.ofDatabase("DB_1"))) + .isFalse(); + } + + @Test + public void testDatabaseDoesntExistMismatchedResults() throws SQLException { + when(mockResultSet.next()).thenReturn(true).thenReturn(false); + when(mockResultSet.getString("name")).thenReturn("DBZ1"); + + Assertions.assertThat(snowflakeClient.databaseExists(SnowflakeIdentifier.ofDatabase("DB_1"))) + .isFalse(); + } + + @Test + public void testSchemaExists() throws SQLException { + when(mockResultSet.next()) + .thenReturn(true) + .thenReturn(false) + .thenReturn(true) + .thenReturn(false); + when(mockResultSet.getString("name")).thenReturn("DB_1").thenReturn("SCHEMA_1"); + when(mockResultSet.getString("database_name")).thenReturn("DB_1"); + + Assertions.assertThat( + snowflakeClient.schemaExists(SnowflakeIdentifier.ofSchema("DB_1", "SCHEMA_1"))) + .isTrue(); + + verify(mockQueryHarness) + .query( + eq(mockConnection), + eq("SHOW DATABASES LIKE 'DB_1' IN ACCOUNT"), + any(JdbcSnowflakeClient.ResultSetParser.class)); + verify(mockQueryHarness) + .query( + eq(mockConnection), + eq("SHOW SCHEMAS LIKE 'SCHEMA_1' IN DATABASE IDENTIFIER(?)"), + any(JdbcSnowflakeClient.ResultSetParser.class), + eq("DB_1")); + } + + @Test + public void testSchemaExistsSpecialCharacters() throws SQLException { + when(mockResultSet.next()) + .thenReturn(true) + .thenReturn(false) + .thenReturn(true) + .thenReturn(false); + when(mockResultSet.getString("name")).thenReturn("DB_1").thenReturn("$SCHEMA_1$.'!@#%^&*"); + when(mockResultSet.getString("database_name")).thenReturn("DB_1"); + + Assertions.assertThat( + snowflakeClient.schemaExists( + SnowflakeIdentifier.ofSchema("DB_1", "$SCHEMA_1$.'!@#%^&*"))) + .isTrue(); + + verify(mockQueryHarness) + .query( + eq(mockConnection), + eq("SHOW DATABASES LIKE 'DB_1' IN ACCOUNT"), + any(JdbcSnowflakeClient.ResultSetParser.class)); + verify(mockQueryHarness) + .query( + eq(mockConnection), + eq("SHOW SCHEMAS LIKE '_SCHEMA_1$_________' IN DATABASE IDENTIFIER(?)"), + any(JdbcSnowflakeClient.ResultSetParser.class), + eq("DB_1")); + } + + @Test + public void testSchemaDoesntExistMismatchDatabase() throws SQLException { + when(mockResultSet.next()).thenReturn(true).thenReturn(false); + when(mockResultSet.getString("name")).thenReturn("DBZ1"); + + Assertions.assertThat( + snowflakeClient.schemaExists(SnowflakeIdentifier.ofSchema("DB_1", "SCHEMA_1"))) + .isFalse(); + } + + @Test + public void testSchemaDoesntExistNoSchemaFound() throws SQLException { + when(mockResultSet.next()).thenReturn(true).thenReturn(false).thenReturn(false); + when(mockResultSet.getString("name")).thenReturn("DB_1"); + + Assertions.assertThat( + snowflakeClient.schemaExists(SnowflakeIdentifier.ofSchema("DB_1", "SCHEMA_1"))) + .isFalse(); + } + + @Test + public void testSchemaDoesntExistSchemaMismatch() throws SQLException { + when(mockResultSet.next()) + .thenReturn(true) + .thenReturn(false) + .thenReturn(true) + .thenReturn(false); + 
+    when(mockResultSet.getString("name")).thenReturn("DB_1").thenReturn("SCHEMAZ1");
+    when(mockResultSet.getString("database_name")).thenReturn("DB_1");
+
+    Assertions.assertThat(
+            snowflakeClient.schemaExists(SnowflakeIdentifier.ofSchema("DB_1", "SCHEMA_1")))
+        .isFalse();
+  }
+
+  @Test
+  public void testListDatabasesInAccount() throws SQLException {
+    when(mockResultSet.next()).thenReturn(true).thenReturn(true).thenReturn(true).thenReturn(false);
+    when(mockResultSet.getString("name")).thenReturn("DB_1").thenReturn("DB_2").thenReturn("DB_3");
+
+    List<SnowflakeIdentifier> actualList = snowflakeClient.listDatabases();
+
+    verify(mockQueryHarness)
+        .query(
+            eq(mockConnection),
+            eq("SHOW DATABASES IN ACCOUNT"),
+            any(JdbcSnowflakeClient.ResultSetParser.class));
+
+    Assertions.assertThat(actualList)
+        .containsExactly(
+            SnowflakeIdentifier.ofDatabase("DB_1"),
+            SnowflakeIdentifier.ofDatabase("DB_2"),
+            SnowflakeIdentifier.ofDatabase("DB_3"));
+  }
+
+  /**
+   * For the root scope, expect an underlying query to list schemas at the ACCOUNT level with no
+   * query parameters.
+   */
+  @Test
+  public void testListSchemasInAccount() throws SQLException {
+    when(mockResultSet.next()).thenReturn(true).thenReturn(true).thenReturn(true).thenReturn(false);
+    when(mockResultSet.getString("database_name"))
+        .thenReturn("DB_1")
+        .thenReturn("DB_1")
+        .thenReturn("DB_2");
+    when(mockResultSet.getString("name"))
+        .thenReturn("SCHEMA_1")
+        .thenReturn("SCHEMA_2")
+        .thenReturn("SCHEMA_3");
+
+    List<SnowflakeIdentifier> actualList =
+        snowflakeClient.listSchemas(SnowflakeIdentifier.ofRoot());
+
+    verify(mockQueryHarness)
+        .query(
+            eq(mockConnection),
+            eq("SHOW SCHEMAS IN ACCOUNT"),
+            any(JdbcSnowflakeClient.ResultSetParser.class),
+            eq(null));
+
+    Assertions.assertThat(actualList)
+        .containsExactly(
+            SnowflakeIdentifier.ofSchema("DB_1", "SCHEMA_1"),
+            SnowflakeIdentifier.ofSchema("DB_1", "SCHEMA_2"),
+            SnowflakeIdentifier.ofSchema("DB_2", "SCHEMA_3"));
+  }
+
+  /**
+   * For a DATABASE scope, expect an underlying query to list schemas at the DATABASE level and
+   * supply the database as a query param in an IDENTIFIER.
+   */
+  @Test
+  public void testListSchemasInDatabase() throws SQLException {
+    when(mockResultSet.next()).thenReturn(true).thenReturn(true).thenReturn(false);
+    when(mockResultSet.getString("database_name")).thenReturn("DB_1").thenReturn("DB_1");
+    when(mockResultSet.getString("name")).thenReturn("SCHEMA_1").thenReturn("SCHEMA_2");
+
+    List<SnowflakeIdentifier> actualList =
+        snowflakeClient.listSchemas(SnowflakeIdentifier.ofDatabase("DB_1"));
+
+    verify(mockQueryHarness)
+        .query(
+            eq(mockConnection),
+            eq("SHOW SCHEMAS IN DATABASE IDENTIFIER(?)"),
+            any(JdbcSnowflakeClient.ResultSetParser.class),
+            eq("DB_1"));
+
+    Assertions.assertThat(actualList)
+        .containsExactly(
+            SnowflakeIdentifier.ofSchema("DB_1", "SCHEMA_1"),
+            SnowflakeIdentifier.ofSchema("DB_1", "SCHEMA_2"));
+  }
+
+  /**
+   * Any unexpected SQLException from the underlying connection will propagate out as an
+   * UncheckedSQLException when listing schemas.
+   */
+  @Test
+  public void testListSchemasSQLException() throws SQLException, InterruptedException {
+    when(mockClientPool.run(any(ClientPool.Action.class)))
+        .thenThrow(new SQLException("Fake SQL exception"));
+    Assertions.assertThatExceptionOfType(UncheckedSQLException.class)
+        .isThrownBy(() -> snowflakeClient.listSchemas(SnowflakeIdentifier.ofDatabase("DB_1")))
+        .withStackTraceContaining("Fake SQL exception");
+  }
+
+  /**
+   * Any unexpected InterruptedException from the underlying connection will propagate out as an
+   * UncheckedInterruptedException when listing schemas.
+   */
+  @Test
+  public void testListSchemasInterruptedException() throws SQLException, InterruptedException {
+    when(mockClientPool.run(any(ClientPool.Action.class)))
+        .thenThrow(new InterruptedException("Fake interrupted exception"));
+    Assertions.assertThatExceptionOfType(UncheckedInterruptedException.class)
+        .isThrownBy(() -> snowflakeClient.listSchemas(SnowflakeIdentifier.ofDatabase("DB_1")))
+        .withStackTraceContaining("Fake interrupted exception");
+  }
+
+  /**
+   * For the root/empty scope, expect an underlying query to list tables at the ACCOUNT level with
+   * no query parameters.
+   */
+  @Test
+  public void testListIcebergTablesInAccount() throws SQLException {
+    when(mockResultSet.next())
+        .thenReturn(true)
+        .thenReturn(true)
+        .thenReturn(true)
+        .thenReturn(true)
+        .thenReturn(false);
+    when(mockResultSet.getString("database_name"))
+        .thenReturn("DB_1")
+        .thenReturn("DB_1")
+        .thenReturn("DB_1")
+        .thenReturn("DB_2");
+    when(mockResultSet.getString("schema_name"))
+        .thenReturn("SCHEMA_1")
+        .thenReturn("SCHEMA_1")
+        .thenReturn("SCHEMA_2")
+        .thenReturn("SCHEMA_3");
+    when(mockResultSet.getString("name"))
+        .thenReturn("TABLE_1")
+        .thenReturn("TABLE_2")
+        .thenReturn("TABLE_3")
+        .thenReturn("TABLE_4");
+
+    List<SnowflakeIdentifier> actualList =
+        snowflakeClient.listIcebergTables(SnowflakeIdentifier.ofRoot());
+
+    verify(mockQueryHarness)
+        .query(
+            eq(mockConnection),
+            eq("SHOW ICEBERG TABLES IN ACCOUNT"),
+            any(JdbcSnowflakeClient.ResultSetParser.class),
+            eq(null));
+
+    Assertions.assertThat(actualList)
+        .containsExactly(
+            SnowflakeIdentifier.ofTable("DB_1", "SCHEMA_1", "TABLE_1"),
+            SnowflakeIdentifier.ofTable("DB_1", "SCHEMA_1", "TABLE_2"),
+            SnowflakeIdentifier.ofTable("DB_1", "SCHEMA_2", "TABLE_3"),
+            SnowflakeIdentifier.ofTable("DB_2", "SCHEMA_3", "TABLE_4"));
+  }
+
+  /**
+   * For a DATABASE scope, expect an underlying query to list tables at the DATABASE level and
+   * supply the database as a query param in an IDENTIFIER.
+   */
+  @Test
+  public void testListIcebergTablesInDatabase() throws SQLException {
+    when(mockResultSet.next()).thenReturn(true).thenReturn(true).thenReturn(true).thenReturn(false);
+    when(mockResultSet.getString("database_name"))
+        .thenReturn("DB_1")
+        .thenReturn("DB_1")
+        .thenReturn("DB_1");
+    when(mockResultSet.getString("schema_name"))
+        .thenReturn("SCHEMA_1")
+        .thenReturn("SCHEMA_1")
+        .thenReturn("SCHEMA_2");
+    when(mockResultSet.getString("name"))
+        .thenReturn("TABLE_1")
+        .thenReturn("TABLE_2")
+        .thenReturn("TABLE_3");
+
+    List<SnowflakeIdentifier> actualList =
+        snowflakeClient.listIcebergTables(SnowflakeIdentifier.ofDatabase("DB_1"));
+
+    verify(mockQueryHarness)
+        .query(
+            eq(mockConnection),
+            eq("SHOW ICEBERG TABLES IN DATABASE IDENTIFIER(?)"),
+            any(JdbcSnowflakeClient.ResultSetParser.class),
+            eq("DB_1"));
+
+    Assertions.assertThat(actualList)
+        .containsExactly(
+            SnowflakeIdentifier.ofTable("DB_1", "SCHEMA_1", "TABLE_1"),
+            SnowflakeIdentifier.ofTable("DB_1", "SCHEMA_1", "TABLE_2"),
+            SnowflakeIdentifier.ofTable("DB_1", "SCHEMA_2", "TABLE_3"));
+  }
+
+  /**
+   * For a SCHEMA scope, expect an underlying query to list tables at the SCHEMA level and supply
+   * the schema as a query param in an IDENTIFIER.
+   */
+  @Test
+  public void testListIcebergTablesInSchema() throws SQLException {
+    when(mockResultSet.next()).thenReturn(true).thenReturn(true).thenReturn(false);
+    when(mockResultSet.getString("database_name")).thenReturn("DB_1").thenReturn("DB_1");
+    when(mockResultSet.getString("schema_name")).thenReturn("SCHEMA_1").thenReturn("SCHEMA_1");
+    when(mockResultSet.getString("name")).thenReturn("TABLE_1").thenReturn("TABLE_2");
+
+    List<SnowflakeIdentifier> actualList =
+        snowflakeClient.listIcebergTables(SnowflakeIdentifier.ofSchema("DB_1", "SCHEMA_1"));
+
+    verify(mockQueryHarness)
+        .query(
+            eq(mockConnection),
+            eq("SHOW ICEBERG TABLES IN SCHEMA IDENTIFIER(?)"),
+            any(JdbcSnowflakeClient.ResultSetParser.class),
+            eq("DB_1.SCHEMA_1"));
+
+    Assertions.assertThat(actualList)
+        .containsExactly(
+            SnowflakeIdentifier.ofTable("DB_1", "SCHEMA_1", "TABLE_1"),
+            SnowflakeIdentifier.ofTable("DB_1", "SCHEMA_1", "TABLE_2"));
+  }
+
+  /**
+   * Any unexpected SQLException from the underlying connection will propagate out as an
+   * UncheckedSQLException when listing tables.
+   */
+  @Test
+  public void testListIcebergTablesSQLException() throws SQLException, InterruptedException {
+    when(mockClientPool.run(any(ClientPool.Action.class)))
+        .thenThrow(new SQLException("Fake SQL exception"));
+    Assertions.assertThatExceptionOfType(UncheckedSQLException.class)
+        .isThrownBy(() -> snowflakeClient.listIcebergTables(SnowflakeIdentifier.ofDatabase("DB_1")))
+        .withStackTraceContaining("Fake SQL exception");
+  }
+
+  /**
+   * Any unexpected InterruptedException from the underlying connection will propagate out as an
+   * UncheckedInterruptedException when listing tables.
+   */
+  @Test
+  public void testListIcebergTablesInterruptedException()
+      throws SQLException, InterruptedException {
+    when(mockClientPool.run(any(ClientPool.Action.class)))
+        .thenThrow(new InterruptedException("Fake interrupted exception"));
+    Assertions.assertThatExceptionOfType(UncheckedInterruptedException.class)
+        .isThrownBy(() -> snowflakeClient.listIcebergTables(SnowflakeIdentifier.ofDatabase("DB_1")))
+        .withStackTraceContaining("Fake interrupted exception");
+  }
+
+  /**
+   * Test parsing of table metadata JSON from a GET_ICEBERG_TABLE_INFORMATION call, with the S3 path
+   * unaltered between snowflake/iceberg path representations.
+   */
+  @Test
+  public void testGetS3TableMetadata() throws SQLException {
+    when(mockResultSet.next()).thenReturn(true);
+    when(mockResultSet.getString("METADATA"))
+        .thenReturn(
+            "{\"metadataLocation\":\"s3://tab1/metadata/v3.metadata.json\",\"status\":\"success\"}");
+
+    SnowflakeTableMetadata actualMetadata =
+        snowflakeClient.loadTableMetadata(
+            SnowflakeIdentifier.ofTable("DB_1", "SCHEMA_1", "TABLE_1"));
+
+    verify(mockQueryHarness)
+        .query(
+            eq(mockConnection),
+            eq("SELECT SYSTEM$GET_ICEBERG_TABLE_INFORMATION(?) AS METADATA"),
+            any(JdbcSnowflakeClient.ResultSetParser.class),
+            eq("DB_1.SCHEMA_1.TABLE_1"));
+
+    SnowflakeTableMetadata expectedMetadata =
+        new SnowflakeTableMetadata(
+            "s3://tab1/metadata/v3.metadata.json",
+            "s3://tab1/metadata/v3.metadata.json",
+            "success",
+            null);
+    Assertions.assertThat(actualMetadata).isEqualTo(expectedMetadata);
+  }
+
+  /**
+   * Test parsing of table metadata JSON from a GET_ICEBERG_TABLE_INFORMATION call, with the Azure
+   * path translated from an azure:// format to a wasbs:// format.
+   */
+  @Test
+  public void testGetAzureTableMetadata() throws SQLException {
+    when(mockResultSet.next()).thenReturn(true);
+    when(mockResultSet.getString("METADATA"))
+        .thenReturn(
+            "{\"metadataLocation\":\"azure://myaccount.blob.core.windows.net/mycontainer/tab3/metadata/v334.metadata.json\",\"status\":\"success\"}");
+
+    SnowflakeTableMetadata actualMetadata =
+        snowflakeClient.loadTableMetadata(
+            SnowflakeIdentifier.ofTable("DB_1", "SCHEMA_1", "TABLE_1"));
+
+    verify(mockQueryHarness)
+        .query(
+            eq(mockConnection),
+            eq("SELECT SYSTEM$GET_ICEBERG_TABLE_INFORMATION(?) AS METADATA"),
+            any(JdbcSnowflakeClient.ResultSetParser.class),
+            eq("DB_1.SCHEMA_1.TABLE_1"));
+
+    SnowflakeTableMetadata expectedMetadata =
+        new SnowflakeTableMetadata(
+            "azure://myaccount.blob.core.windows.net/mycontainer/tab3/metadata/v334.metadata.json",
+            "wasbs://mycontainer@myaccount.blob.core.windows.net/tab3/metadata/v334.metadata.json",
+            "success",
+            null);
+    Assertions.assertThat(actualMetadata).isEqualTo(expectedMetadata);
+  }
+
+  /**
+   * Test parsing of table metadata JSON from a GET_ICEBERG_TABLE_INFORMATION call, with the GCS
+   * path translated from a gcs:// format to a gs:// format.
+   */
+  @Test
+  public void testGetGcsTableMetadata() throws SQLException {
+    when(mockResultSet.next()).thenReturn(true);
+    when(mockResultSet.getString("METADATA"))
+        .thenReturn(
+            "{\"metadataLocation\":\"gcs://tab5/metadata/v793.metadata.json\",\"status\":\"success\"}");
+
+    SnowflakeTableMetadata actualMetadata =
+        snowflakeClient.loadTableMetadata(
+            SnowflakeIdentifier.ofTable("DB_1", "SCHEMA_1", "TABLE_1"));
+
+    verify(mockQueryHarness)
+        .query(
+            eq(mockConnection),
+            eq("SELECT SYSTEM$GET_ICEBERG_TABLE_INFORMATION(?) AS METADATA"),
+            any(JdbcSnowflakeClient.ResultSetParser.class),
+            eq("DB_1.SCHEMA_1.TABLE_1"));
+
+    SnowflakeTableMetadata expectedMetadata =
+        new SnowflakeTableMetadata(
+            "gcs://tab5/metadata/v793.metadata.json",
+            "gs://tab5/metadata/v793.metadata.json",
+            "success",
+            null);
+    Assertions.assertThat(actualMetadata).isEqualTo(expectedMetadata);
+  }
+
+  /** Malformed JSON from a ResultSet should propagate as an IllegalArgumentException. */
+  @Test
+  public void testGetTableMetadataMalformedJson() throws SQLException {
+    when(mockResultSet.next()).thenReturn(true);
+    when(mockResultSet.getString("METADATA")).thenReturn("{\"malformed_no_closing_bracket");
+    Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+        .isThrownBy(
+            () ->
+                snowflakeClient.loadTableMetadata(
+                    SnowflakeIdentifier.ofTable("DB_1", "SCHEMA_1", "TABLE_1")))
+        .withMessageContaining("{\"malformed_no_closing_bracket");
+  }
+
+  /**
+   * Any unexpected SQLException from the underlying connection will propagate out as an
+   * UncheckedSQLException when getting table metadata.
+   */
+  @Test
+  public void testGetTableMetadataSQLException() throws SQLException, InterruptedException {
+    when(mockClientPool.run(any(ClientPool.Action.class)))
+        .thenThrow(new SQLException("Fake SQL exception"));
+    Assertions.assertThatExceptionOfType(UncheckedSQLException.class)
+        .isThrownBy(
+            () ->
+                snowflakeClient.loadTableMetadata(
+                    SnowflakeIdentifier.ofTable("DB_1", "SCHEMA_1", "TABLE_1")))
+        .withStackTraceContaining("Fake SQL exception");
+  }
+
+  /**
+   * Any unexpected InterruptedException from the underlying connection will propagate out as an
+   * UncheckedInterruptedException when getting table metadata.
+   */
+  @Test
+  public void testGetTableMetadataInterruptedException() throws SQLException, InterruptedException {
+    when(mockClientPool.run(any(ClientPool.Action.class)))
+        .thenThrow(new InterruptedException("Fake interrupted exception"));
+    Assertions.assertThatExceptionOfType(UncheckedInterruptedException.class)
+        .isThrownBy(
+            () ->
+                snowflakeClient.loadTableMetadata(
+                    SnowflakeIdentifier.ofTable("DB_1", "SCHEMA_1", "TABLE_1")))
+        .withStackTraceContaining("Fake interrupted exception");
+  }
+
+  /** Calling close() propagates to the underlying client pool. */
+  @Test
+  public void testClose() {
+    snowflakeClient.close();
+    verify(mockClientPool).close();
+  }
+}
diff --git a/snowflake/src/test/java/org/apache/iceberg/snowflake/NamespaceHelpersTest.java b/snowflake/src/test/java/org/apache/iceberg/snowflake/NamespaceHelpersTest.java
new file mode 100644
index 000000000000..2dd7fb6ec9af
--- /dev/null
+++ b/snowflake/src/test/java/org/apache/iceberg/snowflake/NamespaceHelpersTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.snowflake;
+
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+public class NamespaceHelpersTest {
+  @Test
+  public void testRoundTripRoot() {
+    Namespace icebergNamespace = Namespace.empty();
+    SnowflakeIdentifier snowflakeIdentifier =
+        NamespaceHelpers.toSnowflakeIdentifier(icebergNamespace);
+    Assertions.assertThat(snowflakeIdentifier).isEqualTo(SnowflakeIdentifier.ofRoot());
+    Assertions.assertThat(NamespaceHelpers.toIcebergNamespace(snowflakeIdentifier))
+        .isEqualTo(icebergNamespace);
+  }
+
+  @Test
+  public void testRoundTripDatabase() {
+    Namespace icebergNamespace = Namespace.of("DB1");
+    SnowflakeIdentifier snowflakeIdentifier =
+        NamespaceHelpers.toSnowflakeIdentifier(icebergNamespace);
+    Assertions.assertThat(snowflakeIdentifier).isEqualTo(SnowflakeIdentifier.ofDatabase("DB1"));
+    Assertions.assertThat(NamespaceHelpers.toIcebergNamespace(snowflakeIdentifier))
+        .isEqualTo(icebergNamespace);
+  }
+
+  @Test
+  public void testRoundTripSchema() {
+    Namespace icebergNamespace = Namespace.of("DB1", "SCHEMA1");
+    SnowflakeIdentifier snowflakeIdentifier =
+        NamespaceHelpers.toSnowflakeIdentifier(icebergNamespace);
+    Assertions.assertThat(snowflakeIdentifier)
+        .isEqualTo(SnowflakeIdentifier.ofSchema("DB1", "SCHEMA1"));
+    Assertions.assertThat(NamespaceHelpers.toIcebergNamespace(snowflakeIdentifier))
+        .isEqualTo(icebergNamespace);
+  }
+
+  @Test
+  public void testRoundTripTable() {
+    TableIdentifier icebergTable = TableIdentifier.of("DB1", "SCHEMA1", "TABLE1");
+    SnowflakeIdentifier snowflakeIdentifier = NamespaceHelpers.toSnowflakeIdentifier(icebergTable);
+    Assertions.assertThat(snowflakeIdentifier)
+        .isEqualTo(SnowflakeIdentifier.ofTable("DB1", "SCHEMA1", "TABLE1"));
+    Assertions.assertThat(NamespaceHelpers.toIcebergTableIdentifier(snowflakeIdentifier))
+        .isEqualTo(icebergTable);
+  }
+
+  @Test
+  public void testToSnowflakeIdentifierMaxNamespaceLevel() {
+    Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+        .isThrownBy(
+            () ->
+                NamespaceHelpers.toSnowflakeIdentifier(
+                    Namespace.of("DB1", "SCHEMA1", "THIRD_NS_LVL")))
+        .withMessageContaining("max namespace level");
+  }
+
+  @Test
+  public void testToSnowflakeIdentifierTableBadNamespace() {
+    Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+        .isThrownBy(
+            () ->
+                NamespaceHelpers.toSnowflakeIdentifier(
+                    TableIdentifier.of(Namespace.of("DB1_WITHOUT_SCHEMA"), "TABLE1")))
+        .withMessageContaining("must be at the SCHEMA level");
+  }
+
+  @Test
+  public void testToIcebergNamespaceTableFails() {
+    Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+        .isThrownBy(
+            () ->
+                NamespaceHelpers.toIcebergNamespace(
+                    SnowflakeIdentifier.ofTable("DB1", "SCHEMA1", "TABLE1")))
+        .withMessageContaining("Cannot convert identifier");
+  }
+
+  @Test
+  public void testToIcebergTableIdentifier() {
+    Assertions.assertThat(
+            NamespaceHelpers.toIcebergTableIdentifier(
+                SnowflakeIdentifier.ofTable("DB1", "SCHEMA1", "TABLE1")))
+        .isEqualTo(TableIdentifier.of("DB1", "SCHEMA1", "TABLE1"));
+  }
+
+  @Test
+  public void testToIcebergTableIdentifierWrongType() {
+    Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+        .isThrownBy(
+            () ->
+                NamespaceHelpers.toIcebergTableIdentifier(
+                    SnowflakeIdentifier.ofSchema("DB1", "SCHEMA1")))
+        .withMessageContaining("must be type TABLE");
+  }
+}
diff --git a/snowflake/src/test/java/org/apache/iceberg/snowflake/SnowflakeCatalogTest.java b/snowflake/src/test/java/org/apache/iceberg/snowflake/SnowflakeCatalogTest.java
new file mode 100644
index 000000000000..9f66f352e8c1
--- /dev/null
+++ b/snowflake/src/test/java/org/apache/iceberg/snowflake/SnowflakeCatalogTest.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.snowflake;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableMetadataParser;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InMemoryFileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SnowflakeCatalogTest {
+
+  private static final String TEST_CATALOG_NAME = "slushLog";
+  private SnowflakeCatalog catalog;
+  private FakeSnowflakeClient fakeClient;
+  private InMemoryFileIO fakeFileIO;
+  private SnowflakeCatalog.FileIOFactory fakeFileIOFactory;
+  private Map<String, String> properties;
+
+  @Before
+  public void before() {
+    catalog = new SnowflakeCatalog();
+
+    fakeClient = new FakeSnowflakeClient();
+    fakeClient.addTable(
+        SnowflakeIdentifier.ofTable("DB_1", "SCHEMA_1", "TAB_1"),
+        SnowflakeTableMetadata.parseJson(
+            "{\"metadataLocation\":\"s3://tab1/metadata/v3.metadata.json\",\"status\":\"success\"}"));
+    fakeClient.addTable(
+        SnowflakeIdentifier.ofTable("DB_1", "SCHEMA_1", "TAB_2"),
+        SnowflakeTableMetadata.parseJson(
+            "{\"metadataLocation\":\"s3://tab2/metadata/v1.metadata.json\",\"status\":\"success\"}"));
+    fakeClient.addTable(
+        SnowflakeIdentifier.ofTable("DB_2", "SCHEMA_2", "TAB_3"),
+        SnowflakeTableMetadata.parseJson(
+            "{\"metadataLocation\":\"azure://myaccount.blob.core.windows.net/mycontainer/tab3/metadata/v334.metadata.json\",\"status\":\"success\"}"));
+    fakeClient.addTable(
+        SnowflakeIdentifier.ofTable("DB_2", "SCHEMA_2", "TAB_4"),
+        SnowflakeTableMetadata.parseJson(
+            "{\"metadataLocation\":\"azure://myaccount.blob.core.windows.net/mycontainer/tab4/metadata/v323.metadata.json\",\"status\":\"success\"}"));
+    fakeClient.addTable(
+        SnowflakeIdentifier.ofTable("DB_3", "SCHEMA_3", "TAB_5"),
+        SnowflakeTableMetadata.parseJson(
+            "{\"metadataLocation\":\"gcs://tab5/metadata/v793.metadata.json\",\"status\":\"success\"}"));
+    fakeClient.addTable(
+        SnowflakeIdentifier.ofTable("DB_3",
"SCHEMA_4", "TAB_6"), + SnowflakeTableMetadata.parseJson( + "{\"metadataLocation\":\"gcs://tab6/metadata/v123.metadata.json\",\"status\":\"success\"}")); + + fakeFileIO = new InMemoryFileIO(); + + Schema schema = + new Schema( + Types.NestedField.required(1, "x", Types.StringType.get(), "comment1"), + Types.NestedField.required(2, "y", Types.StringType.get(), "comment2")); + PartitionSpec partitionSpec = + PartitionSpec.builderFor(schema).identity("x").withSpecId(1000).build(); + fakeFileIO.addFile( + "s3://tab1/metadata/v3.metadata.json", + TableMetadataParser.toJson( + TableMetadata.newTableMetadata( + schema, partitionSpec, "s3://tab1/", ImmutableMap.of())) + .getBytes()); + fakeFileIO.addFile( + "wasbs://mycontainer@myaccount.blob.core.windows.net/tab3/metadata/v334.metadata.json", + TableMetadataParser.toJson( + TableMetadata.newTableMetadata( + schema, + partitionSpec, + "wasbs://mycontainer@myaccount.blob.core.windows.net/tab1/", + ImmutableMap.of())) + .getBytes()); + fakeFileIO.addFile( + "gs://tab5/metadata/v793.metadata.json", + TableMetadataParser.toJson( + TableMetadata.newTableMetadata( + schema, partitionSpec, "gs://tab5/", ImmutableMap.of())) + .getBytes()); + + fakeFileIOFactory = + new SnowflakeCatalog.FileIOFactory() { + @Override + public FileIO newFileIO(String impl, Map prop, Object hadoopConf) { + return fakeFileIO; + } + }; + + properties = Maps.newHashMap(); + catalog.initialize(TEST_CATALOG_NAME, fakeClient, fakeFileIOFactory, properties); + } + + @Test + public void testInitializeNullClient() { + Assertions.assertThatExceptionOfType(IllegalArgumentException.class) + .isThrownBy( + () -> catalog.initialize(TEST_CATALOG_NAME, null, fakeFileIOFactory, properties)) + .withMessageContaining("snowflakeClient must be non-null"); + } + + @Test + public void testInitializeNullFileIO() { + Assertions.assertThatExceptionOfType(IllegalArgumentException.class) + .isThrownBy(() -> catalog.initialize(TEST_CATALOG_NAME, fakeClient, null, properties)) + .withMessageContaining("fileIOFactory must be non-null"); + } + + @Test + public void testListNamespaceInRoot() { + Assertions.assertThat(catalog.listNamespaces()) + .containsExactly(Namespace.of("DB_1"), Namespace.of("DB_2"), Namespace.of("DB_3")); + } + + @Test + public void testListNamespaceWithinDB() { + String dbName = "DB_1"; + Assertions.assertThat(catalog.listNamespaces(Namespace.of(dbName))) + .containsExactly(Namespace.of(dbName, "SCHEMA_1")); + } + + @Test + public void testListNamespaceWithinNonExistentDB() { + // Existence check for nonexistent parent namespaces is optional in the SupportsNamespaces + // interface. + String dbName = "NONEXISTENT_DB"; + Assertions.assertThatExceptionOfType(RuntimeException.class) + .isThrownBy(() -> catalog.listNamespaces(Namespace.of(dbName))) + .withMessageContaining("does not exist") + .withMessageContaining(dbName); + } + + @Test + public void testListNamespaceWithinSchema() { + // No "sub-namespaces" beyond database.schema; invalid to try to list namespaces given + // a database.schema. 
+ String dbName = "DB_3"; + String schemaName = "SCHEMA_4"; + Assertions.assertThatExceptionOfType(IllegalArgumentException.class) + .isThrownBy(() -> catalog.listNamespaces(Namespace.of(dbName, schemaName))) + .withMessageContaining("level") + .withMessageContaining("DB_3.SCHEMA_4"); + } + + @Test + public void testListTables() { + Assertions.assertThatExceptionOfType(IllegalArgumentException.class) + .isThrownBy(() -> catalog.listTables(Namespace.empty())) + .withMessageContaining("listTables must be at SCHEMA level"); + } + + @Test + public void testListTablesWithinDB() { + String dbName = "DB_1"; + Assertions.assertThatExceptionOfType(IllegalArgumentException.class) + .isThrownBy(() -> catalog.listTables(Namespace.of(dbName))) + .withMessageContaining("listTables must be at SCHEMA level"); + } + + @Test + public void testListTablesWithinNonexistentDB() { + String dbName = "NONEXISTENT_DB"; + String schemaName = "NONEXISTENT_SCHEMA"; + Assertions.assertThatExceptionOfType(RuntimeException.class) + .isThrownBy(() -> catalog.listTables(Namespace.of(dbName, schemaName))) + .withMessageContaining("does not exist") + .withMessageContaining(dbName); + } + + @Test + public void testListTablesWithinSchema() { + String dbName = "DB_2"; + String schemaName = "SCHEMA_2"; + Assertions.assertThat(catalog.listTables(Namespace.of(dbName, schemaName))) + .containsExactly( + TableIdentifier.of("DB_2", "SCHEMA_2", "TAB_3"), + TableIdentifier.of("DB_2", "SCHEMA_2", "TAB_4")); + } + + @Test + public void testListTablesWithinNonexistentSchema() { + String dbName = "DB_2"; + String schemaName = "NONEXISTENT_SCHEMA"; + Assertions.assertThatExceptionOfType(RuntimeException.class) + .isThrownBy(() -> catalog.listTables(Namespace.of(dbName, schemaName))) + .withMessageContaining("does not exist") + .withMessageContaining("DB_2.NONEXISTENT_SCHEMA"); + } + + @Test + public void testLoadS3Table() { + Table table = catalog.loadTable(TableIdentifier.of(Namespace.of("DB_1", "SCHEMA_1"), "TAB_1")); + Assertions.assertThat(table.location()).isEqualTo("s3://tab1/"); + } + + @Test + public void testLoadAzureTable() { + Table table = catalog.loadTable(TableIdentifier.of(Namespace.of("DB_2", "SCHEMA_2"), "TAB_3")); + Assertions.assertThat(table.location()) + .isEqualTo("wasbs://mycontainer@myaccount.blob.core.windows.net/tab1/"); + } + + @Test + public void testLoadGcsTable() { + Table table = catalog.loadTable(TableIdentifier.of(Namespace.of("DB_3", "SCHEMA_3"), "TAB_5")); + Assertions.assertThat(table.location()).isEqualTo("gs://tab5/"); + } + + @Test + public void testLoadTableWithMalformedTableIdentifier() { + Assertions.assertThatExceptionOfType(IllegalArgumentException.class) + .isThrownBy( + () -> + catalog.loadTable( + TableIdentifier.of(Namespace.of("DB_1", "SCHEMA_1", "BAD_NS_LEVEL"), "TAB_1"))) + .withMessageContaining("level") + .withMessageContaining("DB_1.SCHEMA_1.BAD_NS_LEVEL"); + Assertions.assertThatExceptionOfType(IllegalArgumentException.class) + .isThrownBy( + () -> catalog.loadTable(TableIdentifier.of(Namespace.of("DB_WITHOUT_SCHEMA"), "TAB_1"))) + .withMessageContaining("level") + .withMessageContaining("DB_WITHOUT_SCHEMA.TAB_1"); + } + + @Test + public void testCloseBeforeInitializeDoesntThrow() throws IOException { + catalog = new SnowflakeCatalog(); + + // Make sure no exception is thrown if we call close() before initialize(), in case callers + // add a catalog to auto-close() helpers but end up never using/initializing a catalog. 
+    catalog.close();
+
+    Assertions.assertThat(fakeClient.isClosed())
+        .overridingErrorMessage("expected not to have called close() on snowflakeClient")
+        .isFalse();
+  }
+
+  @Test
+  public void testClose() throws IOException {
+    catalog.newTableOps(TableIdentifier.of("DB_1", "SCHEMA_1", "TAB_1"));
+    catalog.close();
+    Assertions.assertThat(fakeClient.isClosed())
+        .overridingErrorMessage("expected close() to propagate to snowflakeClient")
+        .isTrue();
+    Assertions.assertThat(fakeFileIO.isClosed())
+        .overridingErrorMessage("expected close() to propagate to fileIO")
+        .isTrue();
+  }
+
+  @Test
+  public void testTableNameFromTableOperations() {
+    SnowflakeTableOperations castedTableOps =
+        (SnowflakeTableOperations)
+            catalog.newTableOps(TableIdentifier.of("DB_1", "SCHEMA_1", "TAB_1"));
+    Assertions.assertThat(castedTableOps.fullTableName()).isEqualTo("slushLog.DB_1.SCHEMA_1.TAB_1");
+  }
+
+  @Test
+  public void testDatabaseExists() {
+    Assertions.assertThat(catalog.namespaceExists(Namespace.of("DB_1"))).isTrue();
+    Assertions.assertThat(catalog.namespaceExists(Namespace.of("NONEXISTENT_DB"))).isFalse();
+  }
+
+  @Test
+  public void testSchemaExists() {
+    Assertions.assertThat(catalog.namespaceExists(Namespace.of("DB_1", "SCHEMA_1"))).isTrue();
+    Assertions.assertThat(catalog.namespaceExists(Namespace.of("DB_1", "NONEXISTENT_SCHEMA")))
+        .isFalse();
+    Assertions.assertThat(catalog.namespaceExists(Namespace.of("NONEXISTENT_DB", "SCHEMA_1")))
+        .isFalse();
+  }
+}
diff --git a/spark/v3.1/build.gradle b/spark/v3.1/build.gradle
index eca34afcbd02..c7861d36e555 100644
--- a/spark/v3.1/build.gradle
+++ b/spark/v3.1/build.gradle
@@ -213,6 +213,9 @@ project(':iceberg-spark:iceberg-spark-runtime-3.1_2.12') {
   implementation(project(':iceberg-nessie')) {
     exclude group: 'com.google.code.findbugs', module: 'jsr305'
   }
+  implementation(project(':iceberg-snowflake')) {
+    exclude group: 'net.snowflake', module: 'snowflake-jdbc'
+  }
 
   integrationImplementation "org.apache.spark:spark-hive_2.12:${sparkVersion}"
   integrationImplementation 'org.junit.vintage:junit-vintage-engine'
diff --git a/spark/v3.2/build.gradle b/spark/v3.2/build.gradle
index 62a51ae494c8..c7bef9928da3 100644
--- a/spark/v3.2/build.gradle
+++ b/spark/v3.2/build.gradle
@@ -224,6 +224,9 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersion}") {
   implementation(project(':iceberg-nessie')) {
     exclude group: 'com.google.code.findbugs', module: 'jsr305'
   }
+  implementation(project(':iceberg-snowflake')) {
+    exclude group: 'net.snowflake', module: 'snowflake-jdbc'
+  }
 
   integrationImplementation "org.scala-lang.modules:scala-collection-compat_${scalaVersion}"
   integrationImplementation "org.apache.spark:spark-hive_${scalaVersion}:${sparkVersion}"
diff --git a/spark/v3.3/build.gradle b/spark/v3.3/build.gradle
index 56a6fe7f51ea..3f7cf71b7482 100644
--- a/spark/v3.3/build.gradle
+++ b/spark/v3.3/build.gradle
@@ -216,6 +216,9 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersion}") {
   implementation(project(':iceberg-nessie')) {
     exclude group: 'com.google.code.findbugs', module: 'jsr305'
   }
+  implementation(project(':iceberg-snowflake')) {
+    exclude group: 'net.snowflake', module: 'snowflake-jdbc'
+  }
 
   integrationImplementation "org.scala-lang.modules:scala-collection-compat_${scalaVersion}"
   integrationImplementation "org.apache.spark:spark-hive_${scalaVersion}:${sparkVersion}"
diff --git a/versions.props b/versions.props
index d007ff43ac5a..670487b9974a 100644
--- a/versions.props
+++ b/versions.props
@@ -28,6 +28,7 @@ org.scala-lang.modules:scala-collection-compat_2.12 = 2.6.0
 org.scala-lang.modules:scala-collection-compat_2.13 = 2.6.0
 com.emc.ecs:object-client-bundle = 3.3.2
 org.immutables:value = 2.9.2
+net.snowflake:snowflake-jdbc = 3.13.22
 # test deps
 org.junit.vintage:junit-vintage-engine = 5.8.2
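
Reviewer note: below is a minimal, hypothetical usage sketch stitched together from the public
calls exercised in SnowflakeCatalogTest above, not a snippet from this PR. The
initialize(String, Map) entry point is the standard org.apache.iceberg.catalog.Catalog interface;
the "uri" property key (CatalogProperties.URI) and the JDBC URL shape are assumptions, and
<account> is a placeholder for a real Snowflake account locator.

    import java.util.Map;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.catalog.Namespace;
    import org.apache.iceberg.catalog.TableIdentifier;
    import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
    import org.apache.iceberg.snowflake.SnowflakeCatalog;

    public class SnowflakeCatalogUsageSketch {
      public static void main(String[] args) {
        // Assumed wiring: the "uri" property carries a Snowflake JDBC URL that the catalog
        // uses to build its JdbcClientPool; <account> is a placeholder.
        Map<String, String> props =
            ImmutableMap.of("uri", "jdbc:snowflake://<account>.snowflakecomputing.com");

        SnowflakeCatalog catalog = new SnowflakeCatalog();
        catalog.initialize("snowflake_catalog", props);

        // Namespaces map onto Snowflake DATABASE and DATABASE.SCHEMA levels,
        // as round-tripped in NamespaceHelpersTest.
        for (Namespace db : catalog.listNamespaces()) {
          System.out.println("database namespace: " + db);
        }

        // Table metadata locations are resolved via SYSTEM$GET_ICEBERG_TABLE_INFORMATION
        // (see JdbcSnowflakeClientTest); the identifier here mirrors the test fixtures.
        Table table = catalog.loadTable(TableIdentifier.of("DB_1", "SCHEMA_1", "TAB_1"));
        System.out.println("table location: " + table.location());
      }
    }

For Spark, the build.gradle changes above bundle iceberg-snowflake into the runtime jars while
excluding the JDBC driver, so the catalog could presumably be selected with the usual
spark.sql.catalog.<name>.catalog-impl=org.apache.iceberg.snowflake.SnowflakeCatalog convention,
with snowflake-jdbc supplied on the classpath separately; that wiring follows general
Iceberg-Spark configuration conventions and is not demonstrated by this diff.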