1 change: 1 addition & 0 deletions api/src/main/java/org/apache/iceberg/io/OutputFile.java
@@ -48,6 +48,7 @@ public interface OutputFile {
*
* @return an output stream that can report its position
* @throws RuntimeIOException If the implementation throws an {@link IOException}
* @throws SecurityException If staging directory creation fails due to a missing JVM-level permission
*/
PositionOutputStream createOrOverwrite();
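A caller sketch for context; this is not part of the patch, and the outputFile and data names are placeholders for any OutputFile implementation and payload:

// Hypothetical caller of OutputFile.createOrOverwrite(); implementations that
// stage locally (e.g. S3) may now surface SecurityException from directory creation.
void writeTo(OutputFile outputFile, byte[] data) throws IOException {
  try (PositionOutputStream out = outputFile.createOrOverwrite()) {
    out.write(data);
  } catch (SecurityException e) {
    // the JVM's security policy denied creating the local staging directory
    throw new IOException("Cannot stage write for " + outputFile.location(), e);
  }
}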

@@ -283,6 +283,18 @@ public void testConcurrentCommits() throws Exception {
Assert.assertEquals(2, table.schema().columns().size());
}

@Test
public void testDropNamespace() {
Namespace namespace = Namespace.of(genRandomName());
catalog.createNamespace(namespace);
catalog.dropNamespace(namespace);
GetItemResponse response = dynamo.getItem(GetItemRequest.builder()
.tableName(catalogTableName)
.key(DynamoDbCatalog.namespacePrimaryKey(namespace))
.build());
Assert.assertFalse("namespace must not exist", response.hasItem());
}

private static String genRandomName() {
return UUID.randomUUID().toString().replace("-", "");
}
@@ -248,7 +248,7 @@ public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyException {
dynamo.deleteItem(DeleteItemRequest.builder()
.tableName(awsProperties.dynamoDbTableName())
.key(namespacePrimaryKey(namespace))
-        .conditionExpression("attribute_exists(" + namespace + ")")
+        .conditionExpression("attribute_exists(" + COL_NAMESPACE + ")")
.build());
return true;
} catch (ConditionalCheckFailedException e) {
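Worth a remark, since the fix is a single identifier: the old expression interpolated the namespace value rather than the attribute name, so the condition referenced an attribute that never exists and the conditional delete could never match. An illustration of the rendered expressions, assuming COL_NAMESPACE holds the catalog's stored attribute name:

// before, with namespace = Namespace.of("db1"):
//   "attribute_exists(" + namespace + ")"      ->  "attribute_exists(db1)"        // checks a nonexistent attribute
// after:
//   "attribute_exists(" + COL_NAMESPACE + ")"  ->  e.g. "attribute_exists(namespace)"  // checks the real key attribute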
21 changes: 21 additions & 0 deletions aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
@@ -175,6 +175,7 @@ private void newStream() throws IOException {
stream.close();
}

createStagingDirectoryIfNotExists();
currentStagingFile = File.createTempFile("s3fileio-", ".tmp", stagingDirectory);
currentStagingFile.deleteOnExit();
stagingFiles.add(currentStagingFile);
@@ -328,6 +329,26 @@ private static InputStream uncheckedInputStream(File file) {
}
}

  private void createStagingDirectoryIfNotExists() throws IOException, SecurityException {
    if (!stagingDirectory.exists()) {
      LOG.info("Staging directory does not exist, trying to create one: {}",
          stagingDirectory.getAbsolutePath());
      boolean createdStagingDirectory = stagingDirectory.mkdirs();
      if (createdStagingDirectory) {
        LOG.info("Successfully created staging directory: {}", stagingDirectory.getAbsolutePath());
      } else if (stagingDirectory.exists()) {
        // mkdirs() lost the race: another process created the directory first
        LOG.info("Staging directory was created concurrently by another process: {}",
            stagingDirectory.getAbsolutePath());
      } else {
        throw new IOException(
            "Failed to create staging directory for an unknown reason: " + stagingDirectory.getAbsolutePath());
      }
    }
  }
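For comparison only, not what the patch does: java.nio's Files.createDirectories collapses the exists/race/failure handling above into one call, since it succeeds when the directory already exists, including when another process creates it concurrently. A minimal sketch assuming the same stagingDirectory field:

  // Race-safe alternative to File.mkdirs(); throws IOException only if
  // creation ultimately fails (or the path exists as a regular file).
  private void createStagingDirectoryIfNotExists() throws IOException {
    java.nio.file.Files.createDirectories(stagingDirectory.toPath());
  }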

@SuppressWarnings("checkstyle:NoFinalizer")
@Override
protected void finalize() throws Throwable {
@@ -20,6 +20,7 @@
package org.apache.iceberg.aws.s3;

import com.adobe.testing.s3mock.junit4.S3MockRule;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
@@ -29,6 +30,7 @@
import java.util.stream.Stream;
import org.apache.iceberg.aws.AwsProperties;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
@@ -72,6 +74,7 @@ public class S3OutputStreamTest {
private final S3Client s3mock = mock(S3Client.class, delegatesTo(s3));
private final Random random = new Random(1);
private final Path tmpDir = Files.createTempDirectory("s3fileio-test-");
private final String newTmpDirectory = "/tmp/newStagingDirectory";

private final AwsProperties properties = new AwsProperties(ImmutableMap.of(
AwsProperties.S3FILEIO_MULTIPART_SIZE, Integer.toString(5 * 1024 * 1024),
@@ -85,6 +88,14 @@ public void before() {
s3.createBucket(CreateBucketRequest.builder().bucket(BUCKET).build());
}

@After
public void after() {
File newStagingDirectory = new File(newTmpDirectory);
if (newStagingDirectory.exists()) {
newStagingDirectory.delete();
}
}

@Test
public void testWrite() {
// Run tests for both byte and array write paths
@@ -140,6 +151,14 @@ public void testMultipleClose() throws IOException {
stream.close();
}

@Test
public void testStagingDirectoryCreation() throws IOException {
AwsProperties newStagingDirectoryAwsProperties = new AwsProperties(ImmutableMap.of(
AwsProperties.S3FILEIO_STAGING_DIRECTORY, newTmpDirectory));
S3OutputStream stream = new S3OutputStream(s3, randomURI(), newStagingDirectoryAwsProperties);
stream.close();
}
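This test exercises the new path end to end: it points S3FILEIO_STAGING_DIRECTORY at a directory that does not yet exist and lets the stream's constructor trigger createStagingDirectoryIfNotExists(). As written it passes as long as no exception escapes; a stricter variant could also assert the directory now exists (a suggestion, not part of the patch):

Assert.assertTrue("staging directory should have been created",
    new File(newTmpDirectory).exists());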

private void writeAndVerify(S3Client client, S3URI uri, byte [] data, boolean arrayWrite) {
try (S3OutputStream stream = new S3OutputStream(client, uri, properties)) {
if (arrayWrite) {
2 changes: 2 additions & 0 deletions build.gradle
@@ -736,6 +736,8 @@ project(':iceberg-hive-runtime') {
relocate 'org.apache.orc', 'org.apache.iceberg.shaded.org.apache.orc'
relocate 'io.airlift', 'org.apache.iceberg.shaded.io.airlift'
relocate 'org.threeten.extra', 'org.apache.iceberg.shaded.org.threeten.extra'
// relocate OrcSplit to avoid conflicts with Hive's own OrcSplit
relocate 'org.apache.hadoop.hive.ql.io.orc.OrcSplit', 'org.apache.iceberg.shaded.org.apache.hadoop.hive.ql.io.orc.OrcSplit'

classifier null
}
@@ -50,7 +50,7 @@ class PropertiesUpdate implements UpdateProperties {
@Override
public UpdateProperties set(String key, String value) {
Preconditions.checkNotNull(key, "Key cannot be null");
-    Preconditions.checkNotNull(key, "Value cannot be null");
+    Preconditions.checkNotNull(value, "Value cannot be null");
Preconditions.checkArgument(!removals.contains(key),
"Cannot remove and update the same key: %s", key);

@@ -47,8 +47,7 @@ class JdbcClientPool extends ClientPoolImpl<Connection, SQLException> {
@Override
protected Connection newClient() {
try {
-      Properties dbProps = new Properties();
-      properties.forEach((key, value) -> dbProps.put(key.replace(JdbcCatalog.PROPERTY_PREFIX, ""), value));
+      Properties dbProps = JdbcUtil.filterAndRemovePrefix(properties, JdbcCatalog.PROPERTY_PREFIX);
return DriverManager.getConnection(dbUrl, dbProps);
} catch (SQLException e) {
throw new UncheckedSQLException(e, "Failed to connect: %s", dbUrl);
13 changes: 13 additions & 0 deletions core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java
@@ -19,6 +19,8 @@

package org.apache.iceberg.jdbc;

import java.util.Map;
import java.util.Properties;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
@@ -86,4 +88,15 @@ public static TableIdentifier stringToTableIdentifier(String tableNamespace, String tableName) {
return TableIdentifier.of(JdbcUtil.stringToNamespace(tableNamespace), tableName);
}

public static Properties filterAndRemovePrefix(Map<String, String> properties, String prefix) {
Properties result = new Properties();
properties.forEach((key, value) -> {
if (key.startsWith(prefix)) {
result.put(key.substring(prefix.length()), value);
}
});

return result;
}
}
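A quick usage sketch of the new helper, mirroring the JdbcClientPool change above; property values and the dbUrl variable are invented for illustration, and imports (Map, Properties, DriverManager, ImmutableMap) are elided:

// Keys without the "jdbc." prefix are dropped; the prefix is stripped exactly once,
// so "jdbc.jdbc.abcxyz" becomes "jdbc.abcxyz" (as the test below verifies).
Map<String, String> catalogProps = ImmutableMap.of(
    "warehouse", "/tmp/warehouse",   // filtered out: no "jdbc." prefix
    "jdbc.user", "iceberg",
    "jdbc.password", "secret");
Properties dbProps = JdbcUtil.filterAndRemovePrefix(catalogProps, "jdbc.");
// dbProps now holds {user=iceberg, password=secret}, ready for the driver:
Connection conn = DriverManager.getConnection(dbUrl, dbProps);  // dbUrl is a placeholder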
48 changes: 48 additions & 0 deletions core/src/test/java/org/apache/iceberg/jdbc/TestJdbcUtil.java
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.jdbc;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.assertj.core.api.Assertions;
import org.junit.Test;

public class TestJdbcUtil {

@Test
public void testFilterAndRemovePrefix() {
Map<String, String> input = new HashMap<>();
input.put("warehouse", "/tmp/warehouse");
input.put("user", "foo");
input.put("jdbc.user", "bar");
input.put("jdbc.pass", "secret");
input.put("jdbc.jdbc.abcxyz", "abcxyz");

Properties expected = new Properties();
expected.put("user", "bar");
expected.put("pass", "secret");
expected.put("jdbc.abcxyz", "abcxyz");

Properties actual = JdbcUtil.filterAndRemovePrefix(input, "jdbc.");

Assertions.assertThat(actual).isEqualTo(expected);
}
}
9 changes: 7 additions & 2 deletions data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
@@ -134,9 +134,14 @@ private List<Predicate<T>> applyEqDeletes() {

Iterable<CloseableIterable<Record>> deleteRecords = Iterables.transform(deletes,
delete -> openDeletes(delete, deleteSchema));

+    // copy the delete records because they will be held in a set
+    CloseableIterable<Record> records = CloseableIterable.transform(
+        CloseableIterable.concat(deleteRecords), Record::copy);
+
     StructLikeSet deleteSet = Deletes.toEqualitySet(
-        // copy the delete records because they will be held in a set
-        CloseableIterable.transform(CloseableIterable.concat(deleteRecords), Record::copy),
+        CloseableIterable.transform(
+            records, record -> new InternalRecordWrapper(deleteSchema.asStruct()).wrap(record)),
         deleteSchema.asStruct());

Predicate<T> isInDeleteSet = record -> deleteSet.contains(projectRow.wrap(asStructLike(record)));
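The substance of this change: delete records now enter the equality set through an InternalRecordWrapper, so set membership is computed on Iceberg's internal value representations rather than on raw generic-record Java types. A minimal sketch of the idea, where deleteRecord is a placeholder for one copied delete record:

// InternalRecordWrapper normalizes Java values (e.g. date/time objects to their
// internal ordinal/micros encoding) so hashing and equality in StructLikeSet
// line up with the data rows probed via projectRow.wrap(asStructLike(record)).
InternalRecordWrapper wrapper = new InternalRecordWrapper(deleteSchema.asStruct());
StructLike normalized = wrapper.wrap(deleteRecord);  // a wrapping view, not a copy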