Skip to content
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
5ad32b0
Initial read-only Snowflake Catalog implementation by @sfc-gh-mparmar…
sfc-gh-dhuo Dec 13, 2022
930a3f0
Add JdbcSnowflakeClientTest using mocks (#2)
dennishuo Dec 14, 2022
86b3d11
Merge branch 'main' into snowflake-catalog-initial
dennishuo Dec 14, 2022
076a14a
Add test { useJUnitPlatform() } tuple to iceberg-snowflake for
dennishuo Dec 16, 2022
a7b5aa7
Extract versions into versions.props per PR review
dennishuo Dec 17, 2022
dd5255c
Misc test-related refactors per review suggestions
dennishuo Dec 17, 2022
500b36b
Fix unsupported behaviors of loadNamedpaceMetadata and defaultWarehou…
dennishuo Dec 17, 2022
ad2c55f
Move TableIdentifier checks out of newTableOps into the
dennishuo Dec 17, 2022
7f13674
Refactor out any Namespace-related business logic from the lower
dennishuo Dec 17, 2022
58d258e
Finish migrating JdbcSnowflakeClientTest off any usage of org.junit.A…
dennishuo Dec 17, 2022
0183129
Style refactorings from review comments, expanded and moved InMemoryF…
dennishuo Dec 20, 2022
ca6deab
Fix behavior of getNamespaceMetadata to throw when the namespace doesn't
dennishuo Dec 21, 2022
b3a2842
Move private constructor to top, add assertion to test case.
dennishuo Dec 21, 2022
676d024
Define minimal ResultSetParser/QueryHarness classes to fully replace
dennishuo Dec 21, 2022
cc493d0
Update snowflake/src/main/java/org/apache/iceberg/snowflake/Snowflake…
dennishuo Dec 23, 2022
ce7e28c
Refactor style suggestions; remove debug-level logging, arguments in …
dennishuo Dec 23, 2022
2729e64
Fix precondition messages, remove getConf()
dennishuo Jan 6, 2023
4c1e79f
Clean up varargs.
dennishuo Jan 7, 2023
bc0c6ee
Make data members final, include rawJsonVal in toString for debuggabi…
dennishuo Jan 7, 2023
8240d9e
Merge branch 'main' into snowflake-catalog-initial
dennishuo Jan 9, 2023
ebe5dd6
Combine some small test cases into roundtrip test cases, misc cleanup
dennishuo Jan 10, 2023
9e9b9e6
Add comment for why a factory class is exposed for testing purposes.
dennishuo Jan 13, 2023
e8fee31
Merge remote-tracking branch 'origin/main' into snowflake-catalog-ini…
dennishuo Jan 14, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/labeler.yml
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,6 @@ ALIYUN:
GCP:
- gcp/**/*
DELL:
- dell/**/*
- dell/**/*
SNOWFLAKE:
- snowflake/**/*
18 changes: 18 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -696,6 +696,24 @@ project(':iceberg-dell') {
}
}

project(':iceberg-snowflake') {
test {
useJUnitPlatform()
}

dependencies {
implementation project(':iceberg-core')
implementation project(':iceberg-common')
implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
implementation "com.fasterxml.jackson.core:jackson-databind"
implementation "com.fasterxml.jackson.core:jackson-core"

runtimeOnly("net.snowflake:snowflake-jdbc")

testImplementation project(path: ':iceberg-core', configuration: 'testArtifacts')
}
}

@Memoized
boolean versionFileExists() {
return file('version.txt').exists()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.ClientPoolImpl;

class JdbcClientPool extends ClientPoolImpl<Connection, SQLException> {
public class JdbcClientPool extends ClientPoolImpl<Connection, SQLException> {

private final String dbUrl;
private final Map<String, String> properties;

JdbcClientPool(String dbUrl, Map<String, String> props) {
public JdbcClientPool(String dbUrl, Map<String, String> props) {
this(
Integer.parseInt(
props.getOrDefault(
Expand All @@ -42,7 +42,7 @@ class JdbcClientPool extends ClientPoolImpl<Connection, SQLException> {
props);
}

JdbcClientPool(int poolSize, String dbUrl, Map<String, String> props) {
public JdbcClientPool(int poolSize, String dbUrl, Map<String, String> props) {
super(poolSize, SQLNonTransientConnectionException.class, true);
properties = props;
this.dbUrl = dbUrl;
Expand Down
72 changes: 72 additions & 0 deletions core/src/test/java/org/apache/iceberg/io/InMemoryFileIO.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.io;

import java.util.Map;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

public class InMemoryFileIO implements FileIO {

private Map<String, byte[]> inMemoryFiles = Maps.newHashMap();
private boolean closed = false;

public void addFile(String path, byte[] contents) {
Preconditions.checkState(!closed, "Cannot call addFile after calling close()");
inMemoryFiles.put(path, contents);
}

public boolean fileExists(String path) {
return inMemoryFiles.containsKey(path);
}

@Override
public InputFile newInputFile(String path) {
Preconditions.checkState(!closed, "Cannot call newInputFile after calling close()");
if (!inMemoryFiles.containsKey(path)) {
throw new NotFoundException("No in-memory file found for path: %s", path);
}
return new InMemoryInputFile(path, inMemoryFiles.get(path));
}

@Override
public OutputFile newOutputFile(String path) {
Preconditions.checkState(!closed, "Cannot call newOutputFile after calling close()");
return new InMemoryOutputFile(path, this);
}

@Override
public void deleteFile(String path) {
Preconditions.checkState(!closed, "Cannot call deleteFile after calling close()");
if (!inMemoryFiles.containsKey(path)) {
throw new NotFoundException("No in-memory file found for path: %s", path);
}
inMemoryFiles.remove(path);
}

public boolean isClosed() {
return closed;
}

@Override
public void close() {
closed = true;
}
}
26 changes: 24 additions & 2 deletions core/src/test/java/org/apache/iceberg/io/InMemoryOutputFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,38 @@ public class InMemoryOutputFile implements OutputFile {

private boolean exists = false;
private ByteArrayOutputStream contents;
private InMemoryFileIO parentFileIO;

public InMemoryOutputFile() {
this("memory:" + UUID.randomUUID());
}

public InMemoryOutputFile(String location) {
this(location, null);
}

/**
* If the optional parentFileIO is provided, file-existence behaves similarly to S3FileIO;
* existence checks are performed up-front if creating without overwrite, but files only exist in
* the parentFileIO if close() has been called on the associated output streams (or pre-existing
* files are populated into the parentFileIO through other means).
*
* @param location the location returned by location() of this OutputFile, the InputFile obtained
* from calling toInputFile(), and the location for looking up the associated InputFile from a
* parentFileIO, if non-null.
* @param parentFileIO if non-null, commits an associated InMemoryInputFile on close() into the
* parentFileIO, and uses the parentFileIO for "already exists" checks if creating without
* overwriting.
*/
public InMemoryOutputFile(String location, InMemoryFileIO parentFileIO) {
Preconditions.checkNotNull(location, "location is null");
this.location = location;
this.parentFileIO = parentFileIO;
}

@Override
public PositionOutputStream create() {
if (exists) {
if (exists || (parentFileIO != null && parentFileIO.fileExists(location))) {
throw new AlreadyExistsException("Already exists");
}
return createOrOverwrite();
Expand Down Expand Up @@ -70,7 +89,7 @@ public byte[] toByteArray() {
return contents.toByteArray();
}

private static class InMemoryPositionOutputStream extends PositionOutputStream {
private class InMemoryPositionOutputStream extends PositionOutputStream {
private final ByteArrayOutputStream delegate;
private boolean closed = false;

Expand Down Expand Up @@ -112,6 +131,9 @@ public void flush() throws IOException {
public void close() throws IOException {
delegate.close();
closed = true;
if (parentFileIO != null) {
parentFileIO.addFile(location(), toByteArray());
}
}

private void checkOpen() {
Expand Down
111 changes: 111 additions & 0 deletions core/src/test/java/org/apache/iceberg/io/TestInMemoryFileIO.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.io;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NotFoundException;
import org.assertj.core.api.Assertions;
import org.junit.Test;

public class TestInMemoryFileIO {
String location = "s3://foo/bar.txt";

@Test
public void testBasicEndToEnd() throws IOException {
InMemoryFileIO fileIO = new InMemoryFileIO();
Assertions.assertThat(fileIO.fileExists(location)).isFalse();

OutputStream outputStream = fileIO.newOutputFile(location).create();
byte[] data = "hello world".getBytes();
outputStream.write(data);
outputStream.close();
Assertions.assertThat(fileIO.fileExists(location)).isTrue();

InputStream inputStream = fileIO.newInputFile(location).newStream();
byte[] buf = new byte[data.length];
inputStream.read(buf);
inputStream.close();
Assertions.assertThat(new String(buf)).isEqualTo("hello world");

fileIO.deleteFile(location);
Assertions.assertThat(fileIO.fileExists(location)).isFalse();
}

@Test
public void testNewInputFileNotFound() throws IOException {
InMemoryFileIO fileIO = new InMemoryFileIO();
Assertions.assertThatExceptionOfType(NotFoundException.class)
.isThrownBy(() -> fileIO.newInputFile("s3://nonexistent/file"));
}

@Test
public void testDeleteFileNotFound() throws IOException {
InMemoryFileIO fileIO = new InMemoryFileIO();
Assertions.assertThatExceptionOfType(NotFoundException.class)
.isThrownBy(() -> fileIO.deleteFile("s3://nonexistent/file"));
}

@Test
public void testCreateNoOverwrite() throws IOException {
InMemoryFileIO fileIO = new InMemoryFileIO();
fileIO.addFile(location, "hello world".getBytes());
Assertions.assertThatExceptionOfType(AlreadyExistsException.class)
.isThrownBy(() -> fileIO.newOutputFile(location).create());
}

@Test
public void testOverwriteBeforeAndAfterClose() throws IOException {
byte[] oldData = "old data".getBytes();
byte[] newData = "new data".getBytes();

InMemoryFileIO fileIO = new InMemoryFileIO();
OutputStream outputStream = fileIO.newOutputFile(location).create();
outputStream.write(oldData);

// Even though we've called create() and started writing data, this file won't yet exist
// in the parentFileIO before we've closed it.
Assertions.assertThat(fileIO.fileExists(location)).isFalse();

// File appears after closing it.
outputStream.close();
Assertions.assertThat(fileIO.fileExists(location)).isTrue();

// Start a new OutputFile and write new data but don't close() it yet.
outputStream = fileIO.newOutputFile(location).createOrOverwrite();
outputStream.write(newData);

// We'll still read old data.
InputStream inputStream = fileIO.newInputFile(location).newStream();
byte[] buf = new byte[oldData.length];
inputStream.read(buf);
inputStream.close();
Assertions.assertThat(new String(buf)).isEqualTo("old data");

// Finally, close the new output stream; data should be overwritten with new data now.
outputStream.close();
inputStream = fileIO.newInputFile(location).newStream();
buf = new byte[newData.length];
inputStream.read(buf);
inputStream.close();
Assertions.assertThat(new String(buf)).isEqualTo("new data");
}
}
2 changes: 2 additions & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ include 'hive-metastore'
include 'nessie'
include 'gcp'
include 'dell'
include 'snowflake'

project(':api').name = 'iceberg-api'
project(':common').name = 'iceberg-common'
Expand All @@ -51,6 +52,7 @@ project(':hive-metastore').name = 'iceberg-hive-metastore'
project(':nessie').name = 'iceberg-nessie'
project(':gcp').name = 'iceberg-gcp'
project(':dell').name = 'iceberg-dell'
project(':snowflake').name = 'iceberg-snowflake'

if (null != System.getProperty("allVersions")) {
System.setProperty("flinkVersions", System.getProperty("knownFlinkVersions"))
Expand Down
Loading