Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/labeler.yml
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,6 @@ ALIYUN:
GCP:
- gcp/**/*
DELL:
- dell/**/*
- dell/**/*
SNOWFLAKE:
- snowflake/**/*
17 changes: 17 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,23 @@ project(':iceberg-dell') {
}
}

project(':iceberg-snowflake') {
dependencies {
implementation project(':iceberg-core')
implementation project(':iceberg-common')
implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
implementation project(':iceberg-aws')
implementation "com.fasterxml.jackson.core:jackson-databind"
implementation "com.fasterxml.jackson.core:jackson-core"
implementation "commons-dbutils:commons-dbutils:1.7"

runtimeOnly("net.snowflake:snowflake-jdbc:3.13.22")

testImplementation "org.xerial:sqlite-jdbc"
testImplementation project(path: ':iceberg-core', configuration: 'testArtifacts')
}
}

@Memoized
boolean versionFileExists() {
return file('version.txt').exists()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.ClientPoolImpl;

class JdbcClientPool extends ClientPoolImpl<Connection, SQLException> {
public class JdbcClientPool extends ClientPoolImpl<Connection, SQLException> {

private final String dbUrl;
private final Map<String, String> properties;

JdbcClientPool(String dbUrl, Map<String, String> props) {
public JdbcClientPool(String dbUrl, Map<String, String> props) {
this(
Integer.parseInt(
props.getOrDefault(
Expand All @@ -42,7 +42,7 @@ class JdbcClientPool extends ClientPoolImpl<Connection, SQLException> {
props);
}

JdbcClientPool(int poolSize, String dbUrl, Map<String, String> props) {
public JdbcClientPool(int poolSize, String dbUrl, Map<String, String> props) {
super(poolSize, SQLNonTransientConnectionException.class, true);
properties = props;
this.dbUrl = dbUrl;
Expand Down
2 changes: 2 additions & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ include 'hive-metastore'
include 'nessie'
include 'gcp'
include 'dell'
include 'snowflake'

project(':api').name = 'iceberg-api'
project(':common').name = 'iceberg-common'
Expand All @@ -51,6 +52,7 @@ project(':hive-metastore').name = 'iceberg-hive-metastore'
project(':nessie').name = 'iceberg-nessie'
project(':gcp').name = 'iceberg-gcp'
project(':dell').name = 'iceberg-dell'
project(':snowflake').name = 'iceberg-snowflake'

List<String> knownFlinkVersions = System.getProperty("knownFlinkVersions").split(",")
String flinkVersionsString = System.getProperty("flinkVersions") != null ? System.getProperty("flinkVersions") : System.getProperty("defaultFlinkVersions")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.snowflake;

import java.io.Closeable;
import java.util.List;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.snowflake.entities.SnowflakeSchema;
import org.apache.iceberg.snowflake.entities.SnowflakeTable;
import org.apache.iceberg.snowflake.entities.SnowflakeTableMetadata;

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need javadoc comments on all the classes.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added for classes that can benefit from explanation

public interface QueryFactory extends Closeable {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Naming-wise, I'd normally expected a QueryFactory to produce queries, rather than execute queries. Maybe better to name this class SnowflakeClient or something like that if the intent is to wrap JDBC and/or other API clients under a more constrained/usable interface.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

List<SnowflakeSchema> listSchemas(Namespace namespace);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we're going to use this as an abstraction layer allowing us to easily swap out the bottom connectivity layer into Snowflake APIs, we should make the input argument also use Snowflake concepts, so that the business logic of translating an Iceberg Namespace into a Snowflake Database/Schema is common across implementations, while the way a list of tables is fetched given a db.schema can vary based on underlying implementations.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since there's actually a fair amount of JDBC-specific logic in translating the namespace into the right predicate or identifier in the SQL statement, I'll wait until we add another lower-level client impl before extracting the namespace-conversion out, and for now just focus on refactoring within JdbcSnowflakeClient to reduce duplication of boilerplate.


List<SnowflakeTable> listIcebergTables(Namespace namespace);

SnowflakeTableMetadata getTableMetadata(TableIdentifier tableIdentifier);

void close();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.snowflake;

import java.io.Closeable;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.BaseMetastoreCatalog;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.hadoop.Configurable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.jdbc.JdbcClientPool;
import org.apache.iceberg.jdbc.UncheckedInterruptedException;
import org.apache.iceberg.jdbc.UncheckedSQLException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.snowflake.entities.SnowflakeSchema;
import org.apache.iceberg.snowflake.entities.SnowflakeTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SnowflakeCatalog extends BaseMetastoreCatalog
implements Closeable, SupportsNamespaces, Configurable<Object> {

private static final Logger LOG = LoggerFactory.getLogger(SnowflakeCatalog.class);

private Object conf;
private String catalogName = SnowflakeResources.DEFAULT_CATALOG_NAME;
private Map<String, String> catalogProperties = null;
private FileIO fileIO;
private QueryFactory queryFactory;

public SnowflakeCatalog() {}

/**
 * Injects the {@link QueryFactory} used to talk to Snowflake; primarily intended to let
 * tests supply a mock implementation before {@code initialize} is called.
 */
public void setQueryFactory(QueryFactory queryFactory) {
  this.queryFactory = queryFactory;
}

@Override
public List<TableIdentifier> listTables(Namespace namespace) {
Preconditions.checkArgument(
namespace.length() <= SnowflakeResources.MAX_NAMESPACE_DEPTH,
"Snowflake doesn't supports more than 2 levels of namespace");

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Snowflake doesn't support more than {} namespace levels", SnowflakeResources.MAX_NAMESPACE_DEPTH

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


List<SnowflakeTable> sfTables = queryFactory.listIcebergTables(namespace);

return sfTables.stream()
.map(
table ->
TableIdentifier.of(table.getDatabase(), table.getSchemaName(), table.getName()))
.collect(Collectors.toList());
}

@Override
public boolean dropTable(TableIdentifier identifier, boolean purge) {
  // This catalog surfaces Snowflake-managed Iceberg tables; table lifecycle operations
  // are not supported through it. Throwing (rather than silently returning false) makes
  // the limitation explicit to callers, consistent with the other unimplemented methods.
  throw new UnsupportedOperationException(
      String.format("dropTable not supported; attempted for table '%s'", identifier));
}

@Override
public void renameTable(TableIdentifier from, TableIdentifier to) {
  // Table mutation is not supported by this catalog; fail loudly instead of silently
  // doing nothing, consistent with the other unimplemented methods.
  throw new UnsupportedOperationException(
      String.format("renameTable not supported; attempted from '%s' to '%s'", from, to));
}

@Override
public void initialize(String name, Map<String, String> properties) {
catalogProperties = properties;

String uri = properties.get(CatalogProperties.URI);
Preconditions.checkNotNull(uri, "JDBC connection URI is required");

if (name != null) {
this.catalogName = name;
}

LOG.debug("Connecting to JDBC database {}", properties.get(CatalogProperties.URI));
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to check on best practices for scrubbing sensitive secrets from log statements in case the URI is used with inline user/password strings.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where do these logs go? Should we even be logging?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed


JdbcClientPool connectionPool = new JdbcClientPool(uri, properties);

if (queryFactory == null) {
queryFactory = new SnowflakeQueryFactory(connectionPool);
}

String fileIOImpl = SnowflakeResources.DEFAULT_FILE_IO_IMPL;

if (null != catalogProperties.get(CatalogProperties.FILE_IO_IMPL)) {
fileIOImpl = catalogProperties.get(CatalogProperties.FILE_IO_IMPL);
}

fileIO = CatalogUtil.loadFileIO(fileIOImpl, catalogProperties, conf);
}

@Override
public void close() {
  // queryFactory is only assigned in initialize() (or via setQueryFactory), so guard
  // against close() being invoked on a never-initialized catalog to avoid an NPE.
  if (queryFactory != null) {
    queryFactory.close();
  }
}

@Override
public void createNamespace(Namespace namespace, Map<String, String> metadata) {
  // Namespace creation is not supported by this catalog; fail loudly instead of
  // silently doing nothing, consistent with the other unimplemented methods.
  throw new UnsupportedOperationException(
      String.format("createNamespace not supported; attempted for namespace '%s'", namespace));
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this throw?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed, added throw new UnsupportedOperationException (which is in-line with the base catalog impl) here and other unimplemented methods.


@Override
public List<Namespace> listNamespaces(Namespace namespace) {
Preconditions.checkArgument(
namespace.length() <= SnowflakeResources.MAX_NAMESPACE_DEPTH,
"Snowflake doesn't supports more than 2 levels of namespace");

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please update message per the comment above.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

List<Namespace> namespaceList = Lists.newArrayList();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any advantage to using Lists.newArrayList() over just using a new statement?

Copy link
Collaborator Author

@sfc-gh-dhuo sfc-gh-dhuo Dec 9, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC it started mostly as a pre-Java9 convention where you'd have to duplicate the generic type like "new ArrayList<Namespace>()", and also to avoid having to import both java.util.List and java.util.ArrayList; in this case I guess we can just follow whatever the rest of the Iceberg project is doing for consistency.

[dhuo@SDP_DevVM-dhuo ~/Snowflake-Labs/iceberg]$ grep -I -R Lists.newArrayList * | wc -l
3050
[dhuo@SDP_DevVM-dhuo ~/Snowflake-Labs/iceberg]$ grep -I -R "new ArrayList" * | wc -l
22
[dhuo@SDP_DevVM-dhuo ~/Snowflake-Labs/iceberg]$ grep -I -R "new ArrayList<>" * | wc -l
10

Apparently they also helped make it explicit by encoding it in their checkstyle rules, so that's reassuring that we'll have some compiler guidance on project style-preferences:

https://github.com/apache/iceberg/blob/86156757653112b1a89d7e7f08903ef746fd918b/.baseline/checkstyle/checkstyle.xml#L60

<module name="RegexpSingleline">
    <property name="format" value="new ArrayList&lt;&gt;\(.*\)"/>
    <property name="message" value="Prefer using Lists.newArrayList() instead."/>
</module>

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, thanks.

try {
List<SnowflakeSchema> sfSchemas = queryFactory.listSchemas(namespace);
namespaceList =
sfSchemas.stream()
.map(schema -> Namespace.of(schema.getDatabase(), schema.getName()))
.collect(Collectors.toList());
} catch (UncheckedSQLException | UncheckedInterruptedException ex) {
LOG.error("{}", ex.getMessage(), ex);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What unchecked exceptions do we think we're going to get here? Do we need to catch these? What happens to the exception? We don't end up showing anything to the customer in this case?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed this try/catch; at this level in the code, unchecked exceptions should probably just propagate all the way up.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, thanks.

}

if (namespace.length() == SnowflakeResources.MAX_NAMESPACE_DEPTH) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if namespace.length() < SnowflakeResources.MAX_NAMESPACE_DEPTH? Are we returning something that makes sense?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like the logic here is actually for an edge case where we're trying to list schemas but already specified the exact schema we want. As in:

listNamespaces("foo_database.bar_schema");

and the logic in the JDBC layer currently just lists all schemas under "foo_database" and then this filter makes it so:

  1. If the schema doesn't exist, we return empty list
  2. If it does exist, we filter it down to a size-1 list "equal" to the input

Thinking about it more though, I'm not sure listing schemas in a schema should behave like an existence-check.

When listing namespaces under the "database" level, the 1-level database namespace isn't returned as one of the namespaces, even though the 1-level namespace is allowed in Spark's "in" clause:

scala> spark.sql("show namespaces in iot_data").show(false);
+---------------------------+
|namespace                  |
+---------------------------+
|IOT_DATA.ICE_SCHEMA        |
|IOT_DATA.INFORMATION_SCHEMA|
|IOT_DATA.PUBLIC            |
|IOT_DATA.TPCDS10TB         |
+---------------------------+

So that implies to me that listing "in" a two-level namespace is supposed to only return 3-level namespaces, which aren't supported, and thus we should just throw IllegalArgumentException in that case.

Fixed the Precondition to require level <= MAX - 1

if (namespaceList.stream()
.anyMatch(n -> n.toString().equalsIgnoreCase(namespace.toString()))) {
return Arrays.asList(namespace);
} else {
return Lists.newArrayList();
}
}

return namespaceList;
}

@Override
public Map<String, String> loadNamespaceMetadata(Namespace namespace)
    throws NoSuchNamespaceException {
  // listNamespaces doubles as an existence check here: it returns a non-empty list only
  // when the requested namespace resolves to something in Snowflake.
  List<Namespace> allNamespaces = listNamespaces(namespace);

  if (allNamespaces.isEmpty()) {
    throw new NoSuchNamespaceException("Could not find namespace %s", namespace);
  }

  // Snowflake exposes no additional namespace properties through this client, so the
  // metadata map carries only the namespace's own name.
  Map<String, String> namespaceMetadata = Maps.newHashMap();
  namespaceMetadata.put("name", namespace.toString());
  return namespaceMetadata;
}

@Override
public boolean dropNamespace(Namespace namespace) {
  // Namespace mutation is not supported by this catalog; fail loudly instead of
  // silently returning false, consistent with the other unimplemented methods.
  throw new UnsupportedOperationException(
      String.format("dropNamespace not supported; attempted for namespace '%s'", namespace));
}

@Override
public boolean setProperties(Namespace namespace, Map<String, String> properties) {
  // Namespace property mutation is not supported by this catalog; fail loudly instead
  // of silently returning false, consistent with the other unimplemented methods.
  throw new UnsupportedOperationException(
      String.format("setProperties not supported; attempted for namespace '%s'", namespace));
}

@Override
public boolean removeProperties(Namespace namespace, Set<String> properties) {
  // Namespace property mutation is not supported by this catalog; fail loudly instead
  // of silently returning false, consistent with the other unimplemented methods.
  throw new UnsupportedOperationException(
      String.format("removeProperties not supported; attempted for namespace '%s'", namespace));
}

@Override
protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
Preconditions.checkArgument(
tableIdentifier.namespace().length() <= SnowflakeResources.MAX_NAMESPACE_DEPTH,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please update per above.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

"Snowflake doesn't supports more than 2 levels of namespace");

return new SnowflakeTableOperations(
queryFactory, fileIO, catalogProperties, catalogName, tableIdentifier);
}

@Override
protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
  // NOTE(review): returning null suggests this catalog never creates tables and so has
  // no default warehouse location — confirm that table creation paths are unreachable.
  return null;
}

// Stores the Hadoop (or other) configuration object; it is passed through untyped to
// CatalogUtil.loadFileIO during initialize().
@Override
public void setConf(Object conf) {
  this.conf = conf;
}

// Returns the configuration object previously supplied via setConf (may be null if
// never set).
public Object getConf() {
  return conf;
}
}
Loading