18 commits
595942b
Core: Fix null value check for table properties (#3052)
rawataaryan9 Aug 30, 2021
8a7aa53
AWS: Fix DynamoDbCatalog.dropNamespace attr check (#3035)
bijanhoule Sep 13, 2021
0796365
Core: Fix JDBC properties, only keep keys with jdbc. prefix (#3078)
dungdm93 Sep 26, 2021
dcafafb
Data: Fix equality deletes with date/time types (#3135)
xloya Sep 28, 2021
1a7534d
Core: Optimize check for referenced data files in BaseRowDelta (#3071)
aokolnychyi Sep 13, 2021
04cd655
Core: Support committing delete files with multiple specs (#2985)
aokolnychyi Sep 21, 2021
80117c2
Core: Validate concurrently added delete files in RowDelta (#3195)
aokolnychyi Sep 28, 2021
ab88000
AWS: Add check to create staging directory if not exists for S3Output…
rajarshisarkar Sep 28, 2021
ba25668
Core: Validate concurrently added delete files in OverwriteFiles (#3199)
aokolnychyi Oct 1, 2021
473ff6e
Hive: Fix NoSuchMethodError of OrcTail with Hive3.x and Vectorized OR…
omarsmak Oct 12, 2021
c8cafb7
Flink: Fix CDC validation errors (#3258)
rdblue Oct 19, 2021
1525b94
Parquet: Fix map projection after map to key_value rename (#3309)
rdblue Oct 19, 2021
59667eb
Hotfix: Fix Flink test imports. (#3319)
rdblue Oct 19, 2021
8476aa1
Build: Fix ErrorProne NewHashMapInt warnings (#3260)
kbendick Oct 10, 2021
fa75f7e
Core: Fail if both Catalog type and catalog-impl are configured (#3162)
omarsmak Oct 12, 2021
64cc9a4
Hive: Fix Catalogs.hiveCatalog method for default catalogs (#3338)
Oct 26, 2021
2f0154c
Spark: Fix ClassCastException when using bucket UDF (#3368)
izchen Oct 26, 2021
d4ae00e
HOTFIX - Remove additional parens until #3386 is merged in
kbendick Oct 27, 2021
4 changes: 2 additions & 2 deletions api/src/main/java/org/apache/iceberg/Metrics.java
@@ -24,8 +24,8 @@
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.ByteBuffers;

/**
@@ -230,7 +230,7 @@ private static Map<Integer, ByteBuffer> readByteBufferMap(ObjectInputStream in)
return null;

} else {
Map<Integer, ByteBuffer> result = new HashMap<>(size);
Map<Integer, ByteBuffer> result = Maps.newHashMapWithExpectedSize(size);

for (int i = 0; i < size; ++i) {
Integer key = (Integer) in.readObject();
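For context, a minimal sketch (not part of this change) of why the ErrorProne NewHashMapInt warning is addressed with `Maps.newHashMapWithExpectedSize`: the `HashMap` int constructor takes an initial capacity rather than an expected size, so a map built with `new HashMap<>(size)` can still rehash while being filled. The class and method names below are illustrative only.

```java
import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

class ExpectedSizeExample {
  // Hypothetical helper: builds a map meant to hold `expectedSize` entries.
  static Map<Integer, String> build(int expectedSize) {
    // new HashMap<>(expectedSize) sets the capacity, not the size; with the default
    // load factor of 0.75 the map resizes before reaching expectedSize entries.
    // newHashMapWithExpectedSize chooses a capacity large enough to avoid that.
    return Maps.newHashMapWithExpectedSize(expectedSize);
  }
}
```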
63 changes: 58 additions & 5 deletions api/src/main/java/org/apache/iceberg/OverwriteFiles.java
@@ -107,23 +107,28 @@ public interface OverwriteFiles extends SnapshotUpdate<OverwriteFiles> {
OverwriteFiles caseSensitive(boolean caseSensitive);

/**
* Enables validation that files added concurrently do not conflict with this commit's operation.
* Enables validation that data files added concurrently do not conflict with this commit's operation.
* <p>
* This method should be called when the table is queried to determine which files to delete/append.
* This method should be called while committing non-idempotent overwrite operations.
* If a concurrent operation commits a new file after the data was read and that file might
* contain rows matching the specified conflict detection filter, the overwrite operation
* will detect this during retries and fail.
* will detect this and fail.
* <p>
* Calling this method with a correct conflict detection filter is required to maintain
* serializable isolation for eager update/delete operations. Otherwise, the isolation level
* serializable isolation for overwrite operations. Otherwise, the isolation level
* will be snapshot isolation.
* <p>
* Validation applies to files added to the table since the snapshot passed to {@link #validateFromSnapshot(long)}.
*
* @param conflictDetectionFilter an expression on rows in the table
* @return this for method chaining
* @deprecated since 0.13.0, will be removed in 0.14.0; use {@link #conflictDetectionFilter(Expression)} and
* {@link #validateNoConflictingData()} instead.
*/
OverwriteFiles validateNoConflictingAppends(Expression conflictDetectionFilter);
@Deprecated
default OverwriteFiles validateNoConflictingAppends(Expression conflictDetectionFilter) {
return conflictDetectionFilter(conflictDetectionFilter).validateNoConflictingData();
}

/**
* Enables validation that files added concurrently do not conflict with this commit's operation.
@@ -145,4 +150,52 @@ public interface OverwriteFiles extends SnapshotUpdate<OverwriteFiles> {
*/
@Deprecated
OverwriteFiles validateNoConflictingAppends(Long readSnapshotId, Expression conflictDetectionFilter);

/**
* Sets a conflict detection filter used to validate concurrently added data and delete files.
*
* @param conflictDetectionFilter an expression on rows in the table
* @return this for method chaining
*/
OverwriteFiles conflictDetectionFilter(Expression conflictDetectionFilter);

/**
* Enables validation that data added concurrently does not conflict with this commit's operation.
* <p>
* This method should be called while committing non-idempotent overwrite operations.
* If a concurrent operation commits a new file after the data was read and that file might
* contain rows matching the specified conflict detection filter, the overwrite operation
* will detect this and fail.
* <p>
* Calling this method with a correct conflict detection filter is required to maintain
* isolation for non-idempotent overwrite operations.
* <p>
* Validation uses the conflict detection filter passed to {@link #conflictDetectionFilter(Expression)} and
* applies to operations that happened after the snapshot passed to {@link #validateFromSnapshot(long)}.
* If the conflict detection filter is not set, any new data added concurrently will fail this
* overwrite operation.
*
* @return this for method chaining
*/
OverwriteFiles validateNoConflictingData();

/**
* Enables validation that deletes that happened concurrently do not conflict with this commit's operation.
* <p>
* Validating concurrent deletes is required during non-idempotent overwrite operations.
* If a concurrent operation deletes data in one of the files being overwritten, the overwrite
* operation must be aborted as it may undelete rows that were removed concurrently.
* <p>
* Calling this method with a correct conflict detection filter is required to maintain
* isolation for non-idempotent overwrite operations.
* <p>
* Validation uses the conflict detection filter passed to {@link #conflictDetectionFilter(Expression)} and
* applies to operations that happened after the snapshot passed to {@link #validateFromSnapshot(long)}.
* If the conflict detection filter is not set, this operation will use the row filter provided
* in {@link #overwriteByRowFilter(Expression)} to check for new delete files and will ensure
* there are no conflicting deletes for data files removed via {@link #deleteFile(DataFile)}.
*
* @return this for method chaining
*/
OverwriteFiles validateNoConflictingDeletes();
}
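For context, a minimal usage sketch of the new OverwriteFiles validation API described above; it is not part of the diff, and the `table`, `startSnapshotId`, `removedFile`, and `addedFile` inputs (as well as the `date` column in the filter) are assumed for illustration.

```java
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expressions;

class OverwriteValidationExample {
  // Overwrites `removedFile` with `addedFile` while validating that no conflicting
  // data or delete files were committed after `startSnapshotId`, the snapshot the
  // overwritten data was read from.
  static void overwritePartition(Table table, long startSnapshotId,
                                 DataFile removedFile, DataFile addedFile) {
    table.newOverwrite()
        .deleteFile(removedFile)
        .addFile(addedFile)
        .validateFromSnapshot(startSnapshotId)
        .conflictDetectionFilter(Expressions.equal("date", "2021-10-01"))
        .validateNoConflictingData()      // fail on concurrently added matching data files
        .validateNoConflictingDeletes()   // fail on concurrent deletes of overwritten rows
        .commit();
  }
}
```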
52 changes: 50 additions & 2 deletions api/src/main/java/org/apache/iceberg/RowDelta.java
@@ -94,7 +94,7 @@ public interface RowDelta extends SnapshotUpdate<RowDelta> {
RowDelta validateDeletedFiles();

/**
* Enables validation that files added concurrently do not conflict with this commit's operation.
* Enables validation that data files added concurrently do not conflict with this commit's operation.
* <p>
* This method should be called when the table is queried to determine which files to delete/append.
* If a concurrent operation commits a new file after the data was read and that file might
@@ -109,6 +109,54 @@ public interface RowDelta extends SnapshotUpdate<RowDelta> {
*
* @param conflictDetectionFilter an expression on rows in the table
* @return this for method chaining
* @deprecated since 0.13.0, will be removed in 0.14.0; use {@link #conflictDetectionFilter(Expression)} and
* {@link #validateNoConflictingDataFiles()} instead.
*/
RowDelta validateNoConflictingAppends(Expression conflictDetectionFilter);
@Deprecated
default RowDelta validateNoConflictingAppends(Expression conflictDetectionFilter) {
conflictDetectionFilter(conflictDetectionFilter);
return validateNoConflictingDataFiles();
}

/**
* Sets a conflict detection filter used to validate concurrently added data and delete files.
* <p>
* If not called, a true literal will be used as the conflict detection filter.
*
* @param conflictDetectionFilter an expression on rows in the table
* @return this for method chaining
*/
RowDelta conflictDetectionFilter(Expression conflictDetectionFilter);

/**
* Enables validation that data files added concurrently do not conflict with this commit's operation.
* <p>
* This method should be called when the table is queried to determine which files to delete/append.
* If a concurrent operation commits a new file after the data was read and that file might
* contain rows matching the specified conflict detection filter, this operation
* will detect this during retries and fail.
* <p>
* Calling this method is required to maintain serializable isolation for update/delete operations.
* Otherwise, the isolation level will be snapshot isolation.
* <p>
* Validation uses the conflict detection filter passed to {@link #conflictDetectionFilter(Expression)} and
* applies to operations that happened after the snapshot passed to {@link #validateFromSnapshot(long)}.
*
* @return this for method chaining
*/
RowDelta validateNoConflictingDataFiles();

/**
* Enables validation that delete files added concurrently do not conflict with this commit's operation.
* <p>
* This method must be called when the table is queried to produce a row delta for UPDATE and
* MERGE operations independently of the isolation level. Calling this method isn't required
* for DELETE operations as it is OK to delete a record that is also deleted concurrently.
* <p>
* Validation uses the conflict detection filter passed to {@link #conflictDetectionFilter(Expression)} and
* applies to operations that happened after the snapshot passed to {@link #validateFromSnapshot(long)}.
*
* @return this for method chaining
*/
RowDelta validateNoConflictingDeleteFiles();
}
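Similarly, a minimal sketch (not part of the diff) of how a MERGE-style writer might use the new RowDelta validation methods; `table`, `startSnapshotId`, `dataFile`, `deleteFile`, and the `id` column in the filter are assumed inputs for illustration.

```java
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expressions;

class RowDeltaValidationExample {
  // Commits a MERGE-style row delta: new data rows plus delete files, validated
  // against operations committed after `startSnapshotId`.
  static void commitMerge(Table table, long startSnapshotId,
                          DataFile dataFile, DeleteFile deleteFile) {
    table.newRowDelta()
        .addRows(dataFile)
        .addDeletes(deleteFile)
        .validateFromSnapshot(startSnapshotId)
        .conflictDetectionFilter(Expressions.equal("id", 5))
        .validateNoConflictingDataFiles()     // required for serializable isolation
        .validateNoConflictingDeleteFiles()   // required for UPDATE/MERGE at any isolation level
        .commit();
  }
}
```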
1 change: 1 addition & 0 deletions api/src/main/java/org/apache/iceberg/io/OutputFile.java
@@ -48,6 +48,7 @@ public interface OutputFile {
*
* @return an output stream that can report its position
* @throws RuntimeIOException If the implementation throws an {@link IOException}
* @throws SecurityException If staging directory creation fails due to a missing JVM-level permission
*/
PositionOutputStream createOrOverwrite();

@@ -283,6 +283,18 @@ public void testConcurrentCommits() throws Exception {
Assert.assertEquals(2, table.schema().columns().size());
}

@Test
public void testDropNamespace() {
Namespace namespace = Namespace.of(genRandomName());
catalog.createNamespace(namespace);
catalog.dropNamespace(namespace);
GetItemResponse response = dynamo.getItem(GetItemRequest.builder()
.tableName(catalogTableName)
.key(DynamoDbCatalog.namespacePrimaryKey(namespace))
.build());
Assert.assertFalse("namespace must not exist", response.hasItem());
}

private static String genRandomName() {
return UUID.randomUUID().toString().replace("-", "");
}
@@ -248,7 +248,7 @@ public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyExcept
dynamo.deleteItem(DeleteItemRequest.builder()
.tableName(awsProperties.dynamoDbTableName())
.key(namespacePrimaryKey(namespace))
.conditionExpression("attribute_exists(" + namespace + ")")
.conditionExpression("attribute_exists(" + COL_NAMESPACE + ")")
.build());
return true;
} catch (ConditionalCheckFailedException e) {
21 changes: 21 additions & 0 deletions aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
@@ -175,6 +175,7 @@ private void newStream() throws IOException {
stream.close();
}

createStagingDirectoryIfNotExists();
currentStagingFile = File.createTempFile("s3fileio-", ".tmp", stagingDirectory);
currentStagingFile.deleteOnExit();
stagingFiles.add(currentStagingFile);
@@ -328,6 +329,26 @@ private static InputStream uncheckedInputStream(File file) {
}
}

private void createStagingDirectoryIfNotExists() throws IOException, SecurityException {
if (!stagingDirectory.exists()) {
LOG.info("Staging directory does not exist, trying to create one: {}",
stagingDirectory.getAbsolutePath());
boolean createdStagingDirectory = stagingDirectory.mkdirs();
if (createdStagingDirectory) {
LOG.info("Successfully created staging directory: {}", stagingDirectory.getAbsolutePath());
} else {
if (stagingDirectory.exists()) {
LOG.info("Successfully created staging directory by another process: {}",
stagingDirectory.getAbsolutePath());
} else {
throw new IOException(
"Failed to create staging directory due to some unknown reason: " + stagingDirectory
.getAbsolutePath());
}
}
}
}

@SuppressWarnings("checkstyle:NoFinalizer")
@Override
protected void finalize() throws Throwable {
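A minimal sketch (not part of this change) of configuring the staging directory that S3OutputStream now creates on demand. It assumes the standard FileIO initialization path, an existing target bucket, and valid AWS credentials; the path and object key below are placeholders.

```java
import org.apache.iceberg.aws.AwsProperties;
import org.apache.iceberg.aws.s3.S3FileIO;
import org.apache.iceberg.io.PositionOutputStream;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

class StagingDirectoryExample {
  static void writeWithCustomStagingDir() throws Exception {
    S3FileIO io = new S3FileIO();
    // Point multipart-upload staging at a directory that may not exist yet;
    // with this change it is created on first use instead of failing.
    io.initialize(ImmutableMap.of(
        AwsProperties.S3FILEIO_STAGING_DIRECTORY, "/tmp/iceberg-s3-staging"));
    try (PositionOutputStream out =
             io.newOutputFile("s3://bucket/warehouse/db/table/data/file.parquet")
               .createOrOverwrite()) {
      out.write(new byte[] {1, 2, 3});  // staged locally, uploaded on close
    }
  }
}
```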
Expand Up @@ -20,6 +20,7 @@
package org.apache.iceberg.aws.s3;

import com.adobe.testing.s3mock.junit4.S3MockRule;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
@@ -29,6 +30,7 @@
import java.util.stream.Stream;
import org.apache.iceberg.aws.AwsProperties;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
@@ -72,6 +74,7 @@ public class S3OutputStreamTest {
private final S3Client s3mock = mock(S3Client.class, delegatesTo(s3));
private final Random random = new Random(1);
private final Path tmpDir = Files.createTempDirectory("s3fileio-test-");
private final String newTmpDirectory = "/tmp/newStagingDirectory";

private final AwsProperties properties = new AwsProperties(ImmutableMap.of(
AwsProperties.S3FILEIO_MULTIPART_SIZE, Integer.toString(5 * 1024 * 1024),
@@ -85,6 +88,14 @@ public void before() {
s3.createBucket(CreateBucketRequest.builder().bucket(BUCKET).build());
}

@After
public void after() {
File newStagingDirectory = new File(newTmpDirectory);
if (newStagingDirectory.exists()) {
newStagingDirectory.delete();
}
}

@Test
public void testWrite() {
// Run tests for both byte and array write paths
@@ -140,6 +151,14 @@ public void testMultipleClose() throws IOException {
stream.close();
}

@Test
public void testStagingDirectoryCreation() throws IOException {
AwsProperties newStagingDirectoryAwsProperties = new AwsProperties(ImmutableMap.of(
AwsProperties.S3FILEIO_STAGING_DIRECTORY, newTmpDirectory));
S3OutputStream stream = new S3OutputStream(s3, randomURI(), newStagingDirectoryAwsProperties);
stream.close();
}

private void writeAndVerify(S3Client client, S3URI uri, byte [] data, boolean arrayWrite) {
try (S3OutputStream stream = new S3OutputStream(client, uri, properties)) {
if (arrayWrite) {
2 changes: 2 additions & 0 deletions build.gradle
@@ -736,6 +736,8 @@ project(':iceberg-hive-runtime') {
relocate 'org.apache.orc', 'org.apache.iceberg.shaded.org.apache.orc'
relocate 'io.airlift', 'org.apache.iceberg.shaded.io.airlift'
relocate 'org.threeten.extra', 'org.apache.iceberg.shaded.org.threeten.extra'
// relocate OrcSplit in order to avoid the conflict from Hive's OrcSplit
relocate 'org.apache.hadoop.hive.ql.io.orc.OrcSplit', 'org.apache.iceberg.shaded.org.apache.hadoop.hive.ql.io.orc.OrcSplit'

classifier null
}