diff --git a/api/src/main/java/org/apache/iceberg/Metrics.java b/api/src/main/java/org/apache/iceberg/Metrics.java index d5367c448175..8a213ad8f839 100644 --- a/api/src/main/java/org/apache/iceberg/Metrics.java +++ b/api/src/main/java/org/apache/iceberg/Metrics.java @@ -24,8 +24,8 @@ import java.io.ObjectOutputStream; import java.io.Serializable; import java.nio.ByteBuffer; -import java.util.HashMap; import java.util.Map; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.util.ByteBuffers; /** @@ -230,7 +230,7 @@ private static Map readByteBufferMap(ObjectInputStream in) return null; } else { - Map result = new HashMap<>(size); + Map result = Maps.newHashMapWithExpectedSize(size); for (int i = 0; i < size; ++i) { Integer key = (Integer) in.readObject(); diff --git a/api/src/main/java/org/apache/iceberg/OverwriteFiles.java b/api/src/main/java/org/apache/iceberg/OverwriteFiles.java index e2f3d62a24e3..e85545aac197 100644 --- a/api/src/main/java/org/apache/iceberg/OverwriteFiles.java +++ b/api/src/main/java/org/apache/iceberg/OverwriteFiles.java @@ -107,23 +107,28 @@ public interface OverwriteFiles extends SnapshotUpdate { OverwriteFiles caseSensitive(boolean caseSensitive); /** - * Enables validation that files added concurrently do not conflict with this commit's operation. + * Enables validation that data files added concurrently do not conflict with this commit's operation. *
- * This method should be called when the table is queried to determine which files to delete/append. + * This method should be called while committing non-idempotent overwrite operations. * If a concurrent operation commits a new file after the data was read and that file might * contain rows matching the specified conflict detection filter, the overwrite operation - * will detect this during retries and fail. + * will detect this and fail. *
* Calling this method with a correct conflict detection filter is required to maintain - * serializable isolation for eager update/delete operations. Otherwise, the isolation level + * serializable isolation for overwrite operations. Otherwise, the isolation level * will be snapshot isolation. *
* Validation applies to files added to the table since the snapshot passed to {@link #validateFromSnapshot(long)}. * * @param conflictDetectionFilter an expression on rows in the table * @return this for method chaining + * @deprecated since 0.13.0, will be removed in 0.14.0; use {@link #conflictDetectionFilter(Expression)} and + * {@link #validateNoConflictingData()} instead. */ - OverwriteFiles validateNoConflictingAppends(Expression conflictDetectionFilter); + @Deprecated + default OverwriteFiles validateNoConflictingAppends(Expression conflictDetectionFilter) { + return conflictDetectionFilter(conflictDetectionFilter).validateNoConflictingData(); + } /** * Enables validation that files added concurrently do not conflict with this commit's operation. @@ -145,4 +150,52 @@ public interface OverwriteFiles extends SnapshotUpdate { */ @Deprecated OverwriteFiles validateNoConflictingAppends(Long readSnapshotId, Expression conflictDetectionFilter); + + /** + * Sets a conflict detection filter used to validate concurrently added data and delete files. + * + * @param conflictDetectionFilter an expression on rows in the table + * @return this for method chaining + */ + OverwriteFiles conflictDetectionFilter(Expression conflictDetectionFilter); + + /** + * Enables validation that data added concurrently does not conflict with this commit's operation. + *
+ * This method should be called while committing non-idempotent overwrite operations. + * If a concurrent operation commits a new file after the data was read and that file might + * contain rows matching the specified conflict detection filter, the overwrite operation + * will detect this and fail. + *
+ * Calling this method with a correct conflict detection filter is required to maintain + * isolation for non-idempotent overwrite operations. + *
+ * Validation uses the conflict detection filter passed to {@link #conflictDetectionFilter(Expression)} and + * applies to operations that happened after the snapshot passed to {@link #validateFromSnapshot(long)}. + * If the conflict detection filter is not set, any new data added concurrently will fail this + * overwrite operation. + * + * @return this for method chaining + */ + OverwriteFiles validateNoConflictingData(); + + /** + * Enables validation that deletes that happened concurrently do not conflict with this commit's operation. + *
+ * Validating concurrent deletes is required during non-idempotent overwrite operations. + * If a concurrent operation deletes data in one of the files being overwritten, the overwrite + * operation must be aborted as it may undelete rows that were removed concurrently. + *
+ * Calling this method with a correct conflict detection filter is required to maintain + * isolation for non-idempotent overwrite operations. + *
+ * Validation uses the conflict detection filter passed to {@link #conflictDetectionFilter(Expression)} and + * applies to operations that happened after the snapshot passed to {@link #validateFromSnapshot(long)}. + * If the conflict detection filter is not set, this operation will use the row filter provided + * in {@link #overwriteByRowFilter(Expression)} to check for new delete files and will ensure + * there are no conflicting deletes for data files removed via {@link #deleteFile(DataFile)}. + * + * @return this for method chaining + */ + OverwriteFiles validateNoConflictingDeletes(); } diff --git a/api/src/main/java/org/apache/iceberg/RowDelta.java b/api/src/main/java/org/apache/iceberg/RowDelta.java index 765145d804cb..dcf250aff12f 100644 --- a/api/src/main/java/org/apache/iceberg/RowDelta.java +++ b/api/src/main/java/org/apache/iceberg/RowDelta.java @@ -94,7 +94,7 @@ public interface RowDelta extends SnapshotUpdate { RowDelta validateDeletedFiles(); /** - * Enables validation that files added concurrently do not conflict with this commit's operation. + * Enables validation that data files added concurrently do not conflict with this commit's operation. *
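A minimal usage sketch of the split OverwriteFiles validation API above (not part of the patch): it replaces the deprecated validateNoConflictingAppends(Expression) call with conflictDetectionFilter(...) plus the explicit validate calls. The table handle, snapshot id, data files, and filter value below are assumed for illustration only.

import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;

class OverwriteValidationSketch {
  // Hypothetical helper: commits a non-idempotent overwrite with the new validations enabled.
  static void commitOverwrite(Table table, long startSnapshotId, DataFile stale, DataFile rewritten) {
    // The filter value is made up; it should describe the rows touched by this overwrite.
    Expression conflictFilter = Expressions.equal("date", "2018-06-09");
    table.newOverwrite()
        .deleteFile(stale)
        .addFile(rewritten)
        .validateFromSnapshot(startSnapshotId)
        .conflictDetectionFilter(conflictFilter)  // scopes both validations below
        .validateNoConflictingData()              // fail on concurrently added matching data files
        .validateNoConflictingDeletes()           // fail on concurrently added matching delete files
        .commit();
  }
}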
* This method should be called when the table is queried to determine which files to delete/append. * If a concurrent operation commits a new file after the data was read and that file might @@ -109,6 +109,54 @@ public interface RowDelta extends SnapshotUpdate { * * @param conflictDetectionFilter an expression on rows in the table * @return this for method chaining + * @deprecated since 0.13.0, will be removed in 0.14.0; use {@link #conflictDetectionFilter(Expression)} and + * {@link #validateNoConflictingDataFiles()} instead. */ - RowDelta validateNoConflictingAppends(Expression conflictDetectionFilter); + @Deprecated + default RowDelta validateNoConflictingAppends(Expression conflictDetectionFilter) { + conflictDetectionFilter(conflictDetectionFilter); + return validateNoConflictingDataFiles(); + } + + /** + * Sets a conflict detection filter used to validate concurrently added data and delete files. + *
+ * If not called, a true literal will be used as the conflict detection filter. + * + * @param conflictDetectionFilter an expression on rows in the table + * @return this for method chaining + */ + RowDelta conflictDetectionFilter(Expression conflictDetectionFilter); + + /** + * Enables validation that data files added concurrently do not conflict with this commit's operation. + *
+ * This method should be called when the table is queried to determine which files to delete/append. + * If a concurrent operation commits a new file after the data was read and that file might + * contain rows matching the specified conflict detection filter, this operation + * will detect this during retries and fail. + *
+ * Calling this method is required to maintain serializable isolation for update/delete operations. + * Otherwise, the isolation level will be snapshot isolation. + *
+ * Validation uses the conflict detection filter passed to {@link #conflictDetectionFilter(Expression)} and + * applies to operations that happened after the snapshot passed to {@link #validateFromSnapshot(long)}. + * + * @return this for method chaining + */ + RowDelta validateNoConflictingDataFiles(); + + /** + * Enables validation that delete files added concurrently do not conflict with this commit's operation. + *
+ * This method must be called when the table is queried to produce a row delta for UPDATE and + * MERGE operations independently of the isolation level. Calling this method isn't required + * for DELETE operations as it is OK to delete a record that is also deleted concurrently. + *
+ * Validation uses the conflict detection filter passed to {@link #conflictDetectionFilter(Expression)} and + * applies to operations that happened after the snapshot passed to {@link #validateFromSnapshot(long)}. + * + * @return this for method chaining + */ + RowDelta validateNoConflictingDeleteFiles(); } diff --git a/api/src/main/java/org/apache/iceberg/io/OutputFile.java b/api/src/main/java/org/apache/iceberg/io/OutputFile.java index 34b4e54abf62..67195c46c448 100644 --- a/api/src/main/java/org/apache/iceberg/io/OutputFile.java +++ b/api/src/main/java/org/apache/iceberg/io/OutputFile.java @@ -48,6 +48,7 @@ public interface OutputFile { * * @return an output stream that can report its position * @throws RuntimeIOException If the implementation throws an {@link IOException} + * @throws SecurityException If staging directory creation fails due to missing JVM level permission */ PositionOutputStream createOrOverwrite(); diff --git a/aws/src/integration/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalogTest.java b/aws/src/integration/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalogTest.java index 8d4273c1625f..9074d289b4c7 100644 --- a/aws/src/integration/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalogTest.java +++ b/aws/src/integration/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalogTest.java @@ -283,6 +283,18 @@ public void testConcurrentCommits() throws Exception { Assert.assertEquals(2, table.schema().columns().size()); } + @Test + public void testDropNamespace() { + Namespace namespace = Namespace.of(genRandomName()); + catalog.createNamespace(namespace); + catalog.dropNamespace(namespace); + GetItemResponse response = dynamo.getItem(GetItemRequest.builder() + .tableName(catalogTableName) + .key(DynamoDbCatalog.namespacePrimaryKey(namespace)) + .build()); + Assert.assertFalse("namespace must not exist", response.hasItem()); + } + private static String genRandomName() { return UUID.randomUUID().toString().replace("-", ""); } diff --git a/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java b/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java index f9bef1514604..ebf4964aea85 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java +++ b/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java @@ -248,7 +248,7 @@ public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyExcept dynamo.deleteItem(DeleteItemRequest.builder() .tableName(awsProperties.dynamoDbTableName()) .key(namespacePrimaryKey(namespace)) - .conditionExpression("attribute_exists(" + namespace + ")") + .conditionExpression("attribute_exists(" + COL_NAMESPACE + ")") .build()); return true; } catch (ConditionalCheckFailedException e) { diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java index 690dc9aea255..f66a2967d05f 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java @@ -175,6 +175,7 @@ private void newStream() throws IOException { stream.close(); } + createStagingDirectoryIfNotExists(); currentStagingFile = File.createTempFile("s3fileio-", ".tmp", stagingDirectory); currentStagingFile.deleteOnExit(); stagingFiles.add(currentStagingFile); @@ -328,6 +329,26 @@ private static InputStream uncheckedInputStream(File file) { } } + private void createStagingDirectoryIfNotExists() throws IOException, SecurityException { + if 
(!stagingDirectory.exists()) { + LOG.info("Staging directory does not exist, trying to create one: {}", + stagingDirectory.getAbsolutePath()); + boolean createdStagingDirectory = stagingDirectory.mkdirs(); + if (createdStagingDirectory) { + LOG.info("Successfully created staging directory: {}", stagingDirectory.getAbsolutePath()); + } else { + if (stagingDirectory.exists()) { + LOG.info("Successfully created staging directory by another process: {}", + stagingDirectory.getAbsolutePath()); + } else { + throw new IOException( + "Failed to create staging directory due to some unknown reason: " + stagingDirectory + .getAbsolutePath()); + } + } + } + } + @SuppressWarnings("checkstyle:NoFinalizer") @Override protected void finalize() throws Throwable { diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/S3OutputStreamTest.java b/aws/src/test/java/org/apache/iceberg/aws/s3/S3OutputStreamTest.java index b4dc1ecf0b5e..b0f8b7384513 100644 --- a/aws/src/test/java/org/apache/iceberg/aws/s3/S3OutputStreamTest.java +++ b/aws/src/test/java/org/apache/iceberg/aws/s3/S3OutputStreamTest.java @@ -20,6 +20,7 @@ package org.apache.iceberg.aws.s3; import com.adobe.testing.s3mock.junit4.S3MockRule; +import java.io.File; import java.io.IOException; import java.io.UncheckedIOException; import java.nio.file.Files; @@ -29,6 +30,7 @@ import java.util.stream.Stream; import org.apache.iceberg.aws.AwsProperties; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.junit.After; import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; @@ -72,6 +74,7 @@ public class S3OutputStreamTest { private final S3Client s3mock = mock(S3Client.class, delegatesTo(s3)); private final Random random = new Random(1); private final Path tmpDir = Files.createTempDirectory("s3fileio-test-"); + private final String newTmpDirectory = "/tmp/newStagingDirectory"; private final AwsProperties properties = new AwsProperties(ImmutableMap.of( AwsProperties.S3FILEIO_MULTIPART_SIZE, Integer.toString(5 * 1024 * 1024), @@ -85,6 +88,14 @@ public void before() { s3.createBucket(CreateBucketRequest.builder().bucket(BUCKET).build()); } + @After + public void after() { + File newStagingDirectory = new File(newTmpDirectory); + if (newStagingDirectory.exists()) { + newStagingDirectory.delete(); + } + } + @Test public void testWrite() { // Run tests for both byte and array write paths @@ -140,6 +151,14 @@ public void testMultipleClose() throws IOException { stream.close(); } + @Test + public void testStagingDirectoryCreation() throws IOException { + AwsProperties newStagingDirectoryAwsProperties = new AwsProperties(ImmutableMap.of( + AwsProperties.S3FILEIO_STAGING_DIRECTORY, newTmpDirectory)); + S3OutputStream stream = new S3OutputStream(s3, randomURI(), newStagingDirectoryAwsProperties); + stream.close(); + } + private void writeAndVerify(S3Client client, S3URI uri, byte [] data, boolean arrayWrite) { try (S3OutputStream stream = new S3OutputStream(client, uri, properties)) { if (arrayWrite) { diff --git a/build.gradle b/build.gradle index dd99804b3aef..8a8812ae3ec8 100644 --- a/build.gradle +++ b/build.gradle @@ -736,6 +736,8 @@ project(':iceberg-hive-runtime') { relocate 'org.apache.orc', 'org.apache.iceberg.shaded.org.apache.orc' relocate 'io.airlift', 'org.apache.iceberg.shaded.io.airlift' relocate 'org.threeten.extra', 'org.apache.iceberg.shaded.org.threeten.extra' + // relocate OrcSplit in order to avoid the conflict from Hive's OrcSplit + relocate 'org.apache.hadoop.hive.ql.io.orc.OrcSplit', 
'org.apache.iceberg.shaded.org.apache.hadoop.hive.ql.io.orc.OrcSplit' classifier null } diff --git a/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java b/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java index e1fbb0f942c5..fa38ebfa43a0 100644 --- a/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java +++ b/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java @@ -19,17 +19,23 @@ package org.apache.iceberg; +import java.util.Set; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.expressions.Evaluator; import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.expressions.Projections; import org.apache.iceberg.expressions.StrictMetricsEvaluator; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; public class BaseOverwriteFiles extends MergingSnapshotProducer implements OverwriteFiles { + private final Set deletedDataFiles = Sets.newHashSet(); private boolean validateAddedFilesMatchOverwriteFilter = false; private Long startingSnapshotId = null; private Expression conflictDetectionFilter = null; + private boolean validateNewDataFiles = false; + private boolean validateNewDeleteFiles = false; private boolean caseSensitive = true; protected BaseOverwriteFiles(String tableName, TableOperations ops) { @@ -60,6 +66,7 @@ public OverwriteFiles addFile(DataFile file) { @Override public OverwriteFiles deleteFile(DataFile file) { + deletedDataFiles.add(file); delete(file); return this; } @@ -93,9 +100,22 @@ public OverwriteFiles caseSensitive(boolean isCaseSensitive) { } @Override - public OverwriteFiles validateNoConflictingAppends(Expression newConflictDetectionFilter) { + public OverwriteFiles conflictDetectionFilter(Expression newConflictDetectionFilter) { Preconditions.checkArgument(newConflictDetectionFilter != null, "Conflict detection filter cannot be null"); this.conflictDetectionFilter = newConflictDetectionFilter; + return this; + } + + @Override + public OverwriteFiles validateNoConflictingData() { + this.validateNewDataFiles = true; + failMissingDeletePaths(); + return this; + } + + @Override + public OverwriteFiles validateNoConflictingDeletes() { + this.validateNewDeleteFiles = true; failMissingDeletePaths(); return this; } @@ -103,7 +123,7 @@ public OverwriteFiles validateNoConflictingAppends(Expression newConflictDetecti @Override protected void validate(TableMetadata base) { if (validateAddedFilesMatchOverwriteFilter) { - PartitionSpec spec = writeSpec(); + PartitionSpec spec = dataSpec(); Expression rowFilter = rowFilter(); Expression inclusiveExpr = Projections.inclusive(spec).project(rowFilter); @@ -127,8 +147,32 @@ protected void validate(TableMetadata base) { } } - if (conflictDetectionFilter != null && base.currentSnapshot() != null) { - validateAddedDataFiles(base, startingSnapshotId, conflictDetectionFilter, caseSensitive); + + if (validateNewDataFiles) { + validateAddedDataFiles(base, startingSnapshotId, dataConflictDetectionFilter(), caseSensitive); + } + + if (validateNewDeleteFiles) { + if (rowFilter() != Expressions.alwaysFalse()) { + Expression filter = conflictDetectionFilter != null ? 
conflictDetectionFilter : rowFilter(); + validateNoNewDeleteFiles(base, startingSnapshotId, filter, caseSensitive); + } + + if (deletedDataFiles.size() > 0) { + validateNoNewDeletesForDataFiles( + base, startingSnapshotId, conflictDetectionFilter, + deletedDataFiles, caseSensitive); + } + } + } + + private Expression dataConflictDetectionFilter() { + if (conflictDetectionFilter != null) { + return conflictDetectionFilter; + } else if (rowFilter() != Expressions.alwaysFalse() && deletedDataFiles.isEmpty()) { + return rowFilter(); + } else { + return Expressions.alwaysTrue(); } } } diff --git a/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java b/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java index 7f57195edc25..826bc2ae94ec 100644 --- a/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java +++ b/core/src/main/java/org/apache/iceberg/BaseReplacePartitions.java @@ -55,7 +55,7 @@ public ReplacePartitions validateAppendOnly() { @Override public List apply(TableMetadata base) { - if (writeSpec().fields().size() <= 0) { + if (dataSpec().fields().size() <= 0) { // replace all data in an unpartitioned table deleteByRowFilter(Expressions.alwaysTrue()); } diff --git a/core/src/main/java/org/apache/iceberg/BaseRowDelta.java b/core/src/main/java/org/apache/iceberg/BaseRowDelta.java index 8a1371311b30..ab71b554c405 100644 --- a/core/src/main/java/org/apache/iceberg/BaseRowDelta.java +++ b/core/src/main/java/org/apache/iceberg/BaseRowDelta.java @@ -20,6 +20,7 @@ package org.apache.iceberg; import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.util.CharSequenceSet; @@ -27,7 +28,9 @@ class BaseRowDelta extends MergingSnapshotProducer implements RowDelta private Long startingSnapshotId = null; // check all versions by default private final CharSequenceSet referencedDataFiles = CharSequenceSet.empty(); private boolean validateDeletes = false; - private Expression conflictDetectionFilter = null; + private Expression conflictDetectionFilter = Expressions.alwaysTrue(); + private boolean validateNewDataFiles = false; + private boolean validateNewDeleteFiles = false; private boolean caseSensitive = true; BaseRowDelta(String tableName, TableOperations ops) { @@ -81,23 +84,39 @@ public RowDelta validateDataFilesExist(Iterable referenc } @Override - public RowDelta validateNoConflictingAppends(Expression newConflictDetectionFilter) { + public RowDelta conflictDetectionFilter(Expression newConflictDetectionFilter) { Preconditions.checkArgument(newConflictDetectionFilter != null, "Conflict detection filter cannot be null"); this.conflictDetectionFilter = newConflictDetectionFilter; return this; } + @Override + public RowDelta validateNoConflictingDataFiles() { + this.validateNewDataFiles = true; + return this; + } + + @Override + public RowDelta validateNoConflictingDeleteFiles() { + this.validateNewDeleteFiles = true; + return this; + } + @Override protected void validate(TableMetadata base) { if (base.currentSnapshot() != null) { if (!referencedDataFiles.isEmpty()) { - validateDataFilesExist(base, startingSnapshotId, referencedDataFiles, !validateDeletes); + validateDataFilesExist( + base, startingSnapshotId, referencedDataFiles, !validateDeletes, conflictDetectionFilter); } - // TODO: does this need to check new delete files? 
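A minimal sketch (not part of the patch) of how an UPDATE/MERGE writer might drive the RowDelta validations that BaseRowDelta wires up in this hunk: set the conflict detection filter once, then enable the data-file and delete-file checks performed in validate(TableMetadata). The table handle, files, referenced path, and filter are assumed for illustration only.

import java.util.Collections;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;

class RowDeltaValidationSketch {
  // Hypothetical helper: commits a MERGE-style row delta with all concurrent-write checks enabled.
  static void commitMergeDelta(Table table, long startSnapshotId, Expression scanFilter,
                               DataFile newData, DeleteFile newDeletes, CharSequence referencedDataFile) {
    table.newRowDelta()
        .addRows(newData)
        .addDeletes(newDeletes)
        .validateFromSnapshot(startSnapshotId)
        .conflictDetectionFilter(scanFilter)                                    // used by the checks below
        .validateDataFilesExist(Collections.singletonList(referencedDataFile))  // referenced files must still exist
        .validateDeletedFiles()                                                 // also fail on physically deleted files
        .validateNoConflictingDataFiles()                                       // serializable isolation for update/delete
        .validateNoConflictingDeleteFiles()                                     // required for UPDATE/MERGE row deltas
        .commit();
  }
}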
- if (conflictDetectionFilter != null) { + if (validateNewDataFiles) { validateAddedDataFiles(base, startingSnapshotId, conflictDetectionFilter, caseSensitive); } + + if (validateNewDeleteFiles) { + validateNoNewDeleteFiles(base, startingSnapshotId, conflictDetectionFilter, caseSensitive); + } } } } diff --git a/core/src/main/java/org/apache/iceberg/CatalogUtil.java b/core/src/main/java/org/apache/iceberg/CatalogUtil.java index 4e477b873339..bada6fd5bb52 100644 --- a/core/src/main/java/org/apache/iceberg/CatalogUtil.java +++ b/core/src/main/java/org/apache/iceberg/CatalogUtil.java @@ -220,6 +220,11 @@ public static Catalog buildIcebergCatalog(String name, Map optio default: throw new UnsupportedOperationException("Unknown catalog type: " + catalogType); } + } else { + String catalogType = options.get(ICEBERG_CATALOG_TYPE); + Preconditions.checkArgument(catalogType == null, + "Cannot create catalog %s, both type and catalog-impl are set: type=%s, catalog-impl=%s", + name, catalogType, catalogImpl); } return CatalogUtil.loadCatalog(catalogImpl, name, options, conf); diff --git a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java index 1c3ce122bedd..79c42f7bc8ae 100644 --- a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java +++ b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Map; @@ -86,6 +87,20 @@ public boolean isEmpty() { return (globalDeletes == null || globalDeletes.length == 0) && sortedDeletesByPartition.isEmpty(); } + public Iterable referencedDeleteFiles() { + Iterable deleteFiles = Collections.emptyList(); + + if (globalDeletes != null) { + deleteFiles = Iterables.concat(deleteFiles, Arrays.asList(globalDeletes)); + } + + for (Pair partitionDeletes : sortedDeletesByPartition.values()) { + deleteFiles = Iterables.concat(deleteFiles, Arrays.asList(partitionDeletes.second())); + } + + return deleteFiles; + } + private StructLikeWrapper newWrapper(int specId) { return StructLikeWrapper.forType(partitionTypeById.get(specId)); } diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java index f26412b27bbd..f325edf98582 100644 --- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.util.List; +import java.util.ListIterator; import java.util.Map; import java.util.Set; import org.apache.iceberg.events.CreateSnapshotEvent; @@ -39,6 +40,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Iterators; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.util.CharSequenceSet; import org.apache.iceberg.util.Pair; @@ -64,7 +66,7 @@ abstract class MergingSnapshotProducer extends SnapshotProducer { private static final Set VALIDATE_DATA_FILES_EXIST_SKIP_DELETE_OPERATIONS = ImmutableSet.of(DataOperations.OVERWRITE, DataOperations.REPLACE); // delete files can be added in "overwrite" or 
"delete" operations - private static final Set VALIDATE_REPLACED_DATA_FILES_OPERATIONS = + private static final Set VALIDATE_ADDED_DELETE_FILES_OPERATIONS = ImmutableSet.of(DataOperations.OVERWRITE, DataOperations.DELETE); private final String tableName; @@ -78,25 +80,27 @@ abstract class MergingSnapshotProducer extends SnapshotProducer { // update data private final List newFiles = Lists.newArrayList(); - private final List newDeleteFiles = Lists.newArrayList(); + private final Map> newDeleteFilesBySpec = Maps.newHashMap(); private final List appendManifests = Lists.newArrayList(); private final List rewrittenAppendManifests = Lists.newArrayList(); private final SnapshotSummary.Builder addedFilesSummary = SnapshotSummary.builder(); private final SnapshotSummary.Builder appendedManifestsSummary = SnapshotSummary.builder(); private Expression deleteExpression = Expressions.alwaysFalse(); - private PartitionSpec spec; + private PartitionSpec dataSpec; // cache new manifests after writing private ManifestFile cachedNewManifest = null; private boolean hasNewFiles = false; - private ManifestFile cachedNewDeleteManifest = null; + + // cache new manifests for delete files + private final List cachedNewDeleteManifests = Lists.newLinkedList(); private boolean hasNewDeleteFiles = false; MergingSnapshotProducer(String tableName, TableOperations ops) { super(ops); this.tableName = tableName; this.ops = ops; - this.spec = null; + this.dataSpec = null; long targetSizeBytes = ops.current() .propertyAsLong(MANIFEST_TARGET_SIZE_BYTES, MANIFEST_TARGET_SIZE_BYTES_DEFAULT); int minCountToMerge = ops.current() @@ -117,11 +121,10 @@ public ThisT set(String property, String value) { return self(); } - protected PartitionSpec writeSpec() { - Preconditions.checkState(spec != null, - "Cannot determine partition spec: no data or delete files have been added"); + protected PartitionSpec dataSpec() { + Preconditions.checkState(dataSpec != null, "Cannot determine partition spec: no data files have been added"); // the spec is set when the write is started - return spec; + return dataSpec; } protected Expression rowFilter() { @@ -190,8 +193,9 @@ protected void delete(CharSequence path) { * Add a data file to the new snapshot. */ protected void add(DataFile file) { - setWriteSpec(file); - addedFilesSummary.addedFile(writeSpec(), file); + Preconditions.checkNotNull(file, "Invalid data file: null"); + setDataSpec(file); + addedFilesSummary.addedFile(dataSpec(), file); hasNewFiles = true; newFiles.add(file); } @@ -200,21 +204,21 @@ protected void add(DataFile file) { * Add a delete file to the new snapshot. 
*/ protected void add(DeleteFile file) { - setWriteSpec(file); - addedFilesSummary.addedFile(writeSpec(), file); + Preconditions.checkNotNull(file, "Invalid delete file: null"); + PartitionSpec fileSpec = ops.current().spec(file.specId()); + List deleteFiles = newDeleteFilesBySpec.computeIfAbsent(file.specId(), specId -> Lists.newArrayList()); + deleteFiles.add(file); + addedFilesSummary.addedFile(fileSpec, file); hasNewDeleteFiles = true; - newDeleteFiles.add(file); } - private void setWriteSpec(ContentFile file) { - Preconditions.checkNotNull(file, "Invalid content file: null"); - PartitionSpec writeSpec = ops.current().spec(file.specId()); - Preconditions.checkNotNull(writeSpec, - "Cannot find partition spec for file: %s", file.path()); - if (spec == null) { - spec = writeSpec; - } else if (spec.specId() != file.specId()) { - throw new ValidationException("Invalid file, expected spec id: %d", spec.specId()); + private void setDataSpec(DataFile file) { + PartitionSpec fileSpec = ops.current().spec(file.specId()); + Preconditions.checkNotNull(fileSpec, "Cannot find partition spec for data file: %s", file.path()); + if (dataSpec == null) { + dataSpec = fileSpec; + } else if (dataSpec.specId() != file.specId()) { + throw new ValidationException("Invalid data file, expected spec id: %d", dataSpec.specId()); } } @@ -293,20 +297,33 @@ protected void validateAddedDataFiles(TableMetadata base, Long startingSnapshotI */ protected void validateNoNewDeletesForDataFiles(TableMetadata base, Long startingSnapshotId, Iterable dataFiles) { + validateNoNewDeletesForDataFiles(base, startingSnapshotId, null, dataFiles, true); + } + + /** + * Validates that no new delete files that must be applied to the given data files have been added to the table since + * a starting snapshot. + * + * @param base table metadata to validate + * @param startingSnapshotId id of the snapshot current at the start of the operation + * @param dataFilter a data filter + * @param dataFiles data files to validate have no new row deletes + * @param caseSensitive whether expression binding should be case-sensitive + */ + protected void validateNoNewDeletesForDataFiles(TableMetadata base, Long startingSnapshotId, + Expression dataFilter, Iterable dataFiles, + boolean caseSensitive) { // if there is no current table state, no files have been added - if (base.currentSnapshot() == null) { + if (base.currentSnapshot() == null || base.formatVersion() < 2) { return; } Pair, Set> history = - validationHistory(base, startingSnapshotId, VALIDATE_REPLACED_DATA_FILES_OPERATIONS, ManifestContent.DELETES); + validationHistory(base, startingSnapshotId, VALIDATE_ADDED_DELETE_FILES_OPERATIONS, ManifestContent.DELETES); List deleteManifests = history.first(); - long startingSequenceNumber = startingSnapshotId == null ? 
0 : base.snapshot(startingSnapshotId).sequenceNumber(); - DeleteFileIndex deletes = DeleteFileIndex.builderFor(ops.io(), deleteManifests) - .afterSequenceNumber(startingSequenceNumber) - .specsById(ops.current().specsById()) - .build(); + long startingSequenceNumber = startingSequenceNumber(base, startingSnapshotId); + DeleteFileIndex deletes = buildDeleteFileIndex(deleteManifests, startingSequenceNumber, dataFilter, caseSensitive); for (DataFile dataFile : dataFiles) { // if any delete is found that applies to files written in or before the starting snapshot, fail @@ -316,9 +333,60 @@ protected void validateNoNewDeletesForDataFiles(TableMetadata base, Long startin } } + /** + * Validates that no delete files matching a filter have been added to the table since a starting snapshot. + * + * @param base table metadata to validate + * @param startingSnapshotId id of the snapshot current at the start of the operation + * @param dataFilter an expression used to find new conflicting delete files + * @param caseSensitive whether expression evaluation should be case-sensitive + */ + protected void validateNoNewDeleteFiles(TableMetadata base, Long startingSnapshotId, + Expression dataFilter, boolean caseSensitive) { + // if there is no current table state, no files have been added + if (base.currentSnapshot() == null || base.formatVersion() < 2) { + return; + } + + Pair, Set> history = + validationHistory(base, startingSnapshotId, VALIDATE_ADDED_DELETE_FILES_OPERATIONS, ManifestContent.DELETES); + List deleteManifests = history.first(); + + long startingSequenceNumber = startingSequenceNumber(base, startingSnapshotId); + DeleteFileIndex deletes = buildDeleteFileIndex(deleteManifests, startingSequenceNumber, dataFilter, caseSensitive); + + ValidationException.check(deletes.isEmpty(), + "Found new conflicting delete files that can apply to records matching %s: %s", + dataFilter, Iterables.transform(deletes.referencedDeleteFiles(), ContentFile::path)); + } + + private long startingSequenceNumber(TableMetadata metadata, Long staringSnapshotId) { + if (staringSnapshotId != null && metadata.snapshot(staringSnapshotId) != null) { + Snapshot startingSnapshot = metadata.snapshot(staringSnapshotId); + return startingSnapshot.sequenceNumber(); + } else { + return TableMetadata.INITIAL_SEQUENCE_NUMBER; + } + } + + private DeleteFileIndex buildDeleteFileIndex(List deleteManifests, long startingSequenceNumber, + Expression dataFilter, boolean caseSensitive) { + DeleteFileIndex.Builder builder = DeleteFileIndex.builderFor(ops.io(), deleteManifests) + .afterSequenceNumber(startingSequenceNumber) + .caseSensitive(caseSensitive) + .specsById(ops.current().specsById()); + + if (dataFilter != null) { + builder.filterData(dataFilter); + } + + return builder.build(); + } + @SuppressWarnings("CollectionUndefinedEquality") protected void validateDataFilesExist(TableMetadata base, Long startingSnapshotId, - CharSequenceSet requiredDataFiles, boolean skipDeletes) { + CharSequenceSet requiredDataFiles, boolean skipDeletes, + Expression conflictDetectionFilter) { // if there is no current table state, no files have been removed if (base.currentSnapshot() == null) { return; @@ -339,6 +407,10 @@ protected void validateDataFilesExist(TableMetadata base, Long startingSnapshotI .specsById(base.specsById()) .ignoreExisting(); + if (conflictDetectionFilter != null) { + matchingDeletesGroup.filterData(conflictDetectionFilter); + } + try (CloseableIterator> deletes = matchingDeletesGroup.entries().iterator()) { if (deletes.hasNext()) 
{ throw new ValidationException("Cannot commit, missing data files: %s", @@ -458,9 +530,13 @@ private void cleanUncommittedAppends(Set committed) { this.cachedNewManifest = null; } - if (cachedNewDeleteManifest != null && !committed.contains(cachedNewDeleteManifest)) { - deleteFile(cachedNewDeleteManifest.path()); - this.cachedNewDeleteManifest = null; + ListIterator deleteManifestsIterator = cachedNewDeleteManifests.listIterator(); + while (deleteManifestsIterator.hasNext()) { + ManifestFile deleteManifest = deleteManifestsIterator.next(); + if (!committed.contains(deleteManifest)) { + deleteFile(deleteManifest.path()); + deleteManifestsIterator.remove(); + } } // rewritten manifests are always owned by the table @@ -513,7 +589,7 @@ private ManifestFile newFilesAsManifest() { if (cachedNewManifest == null) { try { - ManifestWriter writer = newManifestWriter(writeSpec()); + ManifestWriter writer = newManifestWriter(dataSpec()); try { writer.addAll(newFiles); } finally { @@ -531,36 +607,43 @@ private ManifestFile newFilesAsManifest() { } private Iterable prepareDeleteManifests() { - if (newDeleteFiles.isEmpty()) { + if (newDeleteFilesBySpec.isEmpty()) { return ImmutableList.of(); } - return ImmutableList.of(newDeleteFilesAsManifest()); + return newDeleteFilesAsManifests(); } - private ManifestFile newDeleteFilesAsManifest() { - if (hasNewDeleteFiles && cachedNewDeleteManifest != null) { - deleteFile(cachedNewDeleteManifest.path()); - cachedNewDeleteManifest = null; + private List newDeleteFilesAsManifests() { + if (hasNewDeleteFiles && cachedNewDeleteManifests.size() > 0) { + for (ManifestFile cachedNewDeleteManifest : cachedNewDeleteManifests) { + deleteFile(cachedNewDeleteManifest.path()); + } + // this triggers a rewrite of all delete manifests even if there is only one new delete file + // if there is a relevant use case in the future, the behavior can be optimized + cachedNewDeleteManifests.clear(); } - if (cachedNewDeleteManifest == null) { - try { - ManifestWriter writer = newDeleteManifestWriter(writeSpec()); + if (cachedNewDeleteManifests.isEmpty()) { + newDeleteFilesBySpec.forEach((specId, deleteFiles) -> { + PartitionSpec spec = ops.current().spec(specId); try { - writer.addAll(newDeleteFiles); - } finally { - writer.close(); + ManifestWriter writer = newDeleteManifestWriter(spec); + try { + writer.addAll(deleteFiles); + } finally { + writer.close(); + } + cachedNewDeleteManifests.add(writer.toManifestFile()); + } catch (IOException e) { + throw new RuntimeIOException(e, "Failed to close manifest writer"); } + }); - this.cachedNewDeleteManifest = writer.toManifestFile(); - this.hasNewDeleteFiles = false; - } catch (IOException e) { - throw new RuntimeIOException(e, "Failed to close manifest writer"); - } + this.hasNewDeleteFiles = false; } - return cachedNewDeleteManifest; + return cachedNewDeleteManifests; } private class DataFileFilterManager extends ManifestFilterManager { diff --git a/core/src/main/java/org/apache/iceberg/PropertiesUpdate.java b/core/src/main/java/org/apache/iceberg/PropertiesUpdate.java index 56c1cda7cad9..fe20302278a1 100644 --- a/core/src/main/java/org/apache/iceberg/PropertiesUpdate.java +++ b/core/src/main/java/org/apache/iceberg/PropertiesUpdate.java @@ -50,7 +50,7 @@ class PropertiesUpdate implements UpdateProperties { @Override public UpdateProperties set(String key, String value) { Preconditions.checkNotNull(key, "Key cannot be null"); - Preconditions.checkNotNull(key, "Value cannot be null"); + Preconditions.checkNotNull(value, "Value cannot be 
null"); Preconditions.checkArgument(!removals.contains(key), "Cannot remove and update the same key: %s", key); diff --git a/core/src/main/java/org/apache/iceberg/jdbc/JdbcClientPool.java b/core/src/main/java/org/apache/iceberg/jdbc/JdbcClientPool.java index 5b73fe8d30ed..8219585a9aef 100644 --- a/core/src/main/java/org/apache/iceberg/jdbc/JdbcClientPool.java +++ b/core/src/main/java/org/apache/iceberg/jdbc/JdbcClientPool.java @@ -47,8 +47,7 @@ class JdbcClientPool extends ClientPoolImpl { @Override protected Connection newClient() { try { - Properties dbProps = new Properties(); - properties.forEach((key, value) -> dbProps.put(key.replace(JdbcCatalog.PROPERTY_PREFIX, ""), value)); + Properties dbProps = JdbcUtil.filterAndRemovePrefix(properties, JdbcCatalog.PROPERTY_PREFIX); return DriverManager.getConnection(dbUrl, dbProps); } catch (SQLException e) { throw new UncheckedSQLException(e, "Failed to connect: %s", dbUrl); diff --git a/core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java b/core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java index b1eacbf59652..0b558395a105 100644 --- a/core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java +++ b/core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java @@ -19,6 +19,8 @@ package org.apache.iceberg.jdbc; +import java.util.Map; +import java.util.Properties; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.relocated.com.google.common.base.Joiner; @@ -86,4 +88,15 @@ public static TableIdentifier stringToTableIdentifier(String tableNamespace, Str return TableIdentifier.of(JdbcUtil.stringToNamespace(tableNamespace), tableName); } + public static Properties filterAndRemovePrefix(Map properties, + String prefix) { + Properties result = new Properties(); + properties.forEach((key, value) -> { + if (key.startsWith(prefix)) { + result.put(key.substring(prefix.length()), value); + } + }); + + return result; + } } diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java index 16a6bfd7aee5..d584e6dd7a44 100644 --- a/core/src/test/java/org/apache/iceberg/TableTestBase.java +++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java @@ -24,6 +24,7 @@ import java.util.Iterator; import java.util.List; import java.util.Set; +import java.util.UUID; import java.util.stream.LongStream; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.OutputFile; @@ -77,7 +78,7 @@ public class TableTestBase { .build(); // Equality delete files. static final DeleteFile FILE_A2_DELETES = FileMetadata.deleteFileBuilder(SPEC) - .ofEqualityDeletes(3) + .ofEqualityDeletes(1) .withPath("/path/to/data-a2-deletes.parquet") .withFileSizeInBytes(10) .withPartitionPath("data_bucket=0") @@ -364,6 +365,20 @@ void validateTableFiles(Table tbl, DataFile... expectedFiles) { Assert.assertEquals("Files should match", expectedFilePaths, actualFilePaths); } + void validateTableDeleteFiles(Table tbl, DeleteFile... expectedFiles) { + Set expectedFilePaths = Sets.newHashSet(); + for (DeleteFile file : expectedFiles) { + expectedFilePaths.add(file.path()); + } + Set actualFilePaths = Sets.newHashSet(); + for (FileScanTask task : tbl.newScan().planFiles()) { + for (DeleteFile file : task.deletes()) { + actualFilePaths.add(file.path()); + } + } + Assert.assertEquals("Delete files should match", expectedFilePaths, actualFilePaths); + } + List paths(DataFile... 
dataFiles) { List paths = Lists.newArrayListWithExpectedSize(dataFiles.length); for (DataFile file : dataFiles) { @@ -433,6 +448,26 @@ void validateDeleteManifest(ManifestFile manifest, Assert.assertFalse("Should find all files in the manifest", expectedFiles.hasNext()); } + protected DataFile newDataFile(String partitionPath) { + return DataFiles.builder(table.spec()) + .withPath("/path/to/data-" + UUID.randomUUID() + ".parquet") + .withFileSizeInBytes(10) + .withPartitionPath(partitionPath) + .withRecordCount(1) + .build(); + } + + protected DeleteFile newDeleteFile(int specId, String partitionPath) { + PartitionSpec spec = table.specs().get(specId); + return FileMetadata.deleteFileBuilder(spec) + .ofPositionDeletes() + .withPath("/path/to/delete-" + UUID.randomUUID() + ".parquet") + .withFileSizeInBytes(10) + .withPartitionPath(partitionPath) + .withRecordCount(1) + .build(); + } + static void validateManifestEntries(ManifestFile manifest, Iterator ids, Iterator expectedFiles, diff --git a/core/src/test/java/org/apache/iceberg/TestCatalogUtil.java b/core/src/test/java/org/apache/iceberg/TestCatalogUtil.java index a008f9ba7df4..eb393cf33589 100644 --- a/core/src/test/java/org/apache/iceberg/TestCatalogUtil.java +++ b/core/src/test/java/org/apache/iceberg/TestCatalogUtil.java @@ -159,6 +159,18 @@ public void loadCustomFileIO_badClass() { () -> CatalogUtil.loadFileIO(TestFileIONotImpl.class.getName(), Maps.newHashMap(), null)); } + @Test + public void buildCustomCatalog_withTypeSet() { + Map options = new HashMap<>(); + options.put(CatalogProperties.CATALOG_IMPL, "CustomCatalog"); + options.put(CatalogUtil.ICEBERG_CATALOG_TYPE, "hive"); + Configuration hadoopConf = new Configuration(); + String name = "custom"; + + AssertHelpers.assertThrows("Should complain about both configs being set", IllegalArgumentException.class, + "both type and catalog-impl are set", () -> CatalogUtil.buildIcebergCatalog(name, options, hadoopConf)); + } + public static class TestCatalog extends BaseMetastoreCatalog { private String catalogName; diff --git a/core/src/test/java/org/apache/iceberg/TestOverwriteWithValidation.java b/core/src/test/java/org/apache/iceberg/TestOverwriteWithValidation.java index 26358864a76a..3d2018fd6034 100644 --- a/core/src/test/java/org/apache/iceberg/TestOverwriteWithValidation.java +++ b/core/src/test/java/org/apache/iceberg/TestOverwriteWithValidation.java @@ -29,6 +29,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.types.Types; import org.junit.Assert; +import org.junit.Assume; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -71,6 +72,14 @@ public class TestOverwriteWithValidation extends TableTestBase { )) .build(); + private static final DeleteFile FILE_DAY_1_POS_DELETES = FileMetadata.deleteFileBuilder(PARTITION_SPEC) + .ofPositionDeletes() + .withPath("/path/to/data-1-deletes.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("date=2018-06-08") + .withRecordCount(1) + .build(); + private static final DataFile FILE_DAY_2 = DataFiles .builder(PARTITION_SPEC) .withPath("/path/to/data-2.parquet") @@ -85,6 +94,22 @@ public class TestOverwriteWithValidation extends TableTestBase { )) .build(); + private static final DeleteFile FILE_DAY_2_EQ_DELETES = FileMetadata.deleteFileBuilder(PARTITION_SPEC) + .ofEqualityDeletes() + .withPath("/path/to/data-2-eq-deletes.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("date=2018-06-09") + .withRecordCount(1) + .build(); + + private 
static final DeleteFile FILE_DAY_2_POS_DELETES = FileMetadata.deleteFileBuilder(PARTITION_SPEC) + .ofPositionDeletes() + .withPath("/path/to/data-2-deletes.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("date=2018-06-09") + .withRecordCount(1) + .build(); + private static final DataFile FILE_DAY_2_MODIFIED = DataFiles .builder(PARTITION_SPEC) .withPath("/path/to/data-3.parquet") @@ -113,6 +138,21 @@ public class TestOverwriteWithValidation extends TableTestBase { )) .build(); + private static final DeleteFile FILE_DAY_2_ANOTHER_RANGE_EQ_DELETES = FileMetadata.deleteFileBuilder(PARTITION_SPEC) + .ofEqualityDeletes() + .withPath("/path/to/data-3-eq-deletes.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("date=2018-06-09") + .withRecordCount(1) + .withMetrics(new Metrics(1L, + null, // no column sizes + ImmutableMap.of(1, 1L, 2, 1L), // value count + ImmutableMap.of(1, 0L, 2, 0L), // null count + ImmutableMap.of(1, longToBuffer(10L)), // lower bounds + ImmutableMap.of(1, longToBuffer(10L)) // upper bounds + )) + .build(); + private static final Expression EXPRESSION_DAY_2 = equal("date", "2018-06-09"); private static final Expression EXPRESSION_DAY_2_ID_RANGE = and( @@ -611,4 +651,218 @@ public void testTransactionIncompatibleAdditionValidated() { Assert.assertEquals("Should not create a new snapshot", committedSnapshotId, table.currentSnapshot().snapshotId()); } + + @Test + public void testConcurrentConflictingPositionDeletes() { + Assume.assumeTrue(formatVersion == 2); + + Assert.assertNull("Should be empty table", table.currentSnapshot()); + + table.newAppend() + .appendFile(FILE_DAY_1) + .appendFile(FILE_DAY_2) + .commit(); + + Snapshot firstSnapshot = table.currentSnapshot(); + + OverwriteFiles overwrite = table.newOverwrite() + .deleteFile(FILE_DAY_2) + .addFile(FILE_DAY_2_MODIFIED) + .validateFromSnapshot(firstSnapshot.snapshotId()) + .conflictDetectionFilter(EXPRESSION_DAY_2) + .validateNoConflictingData() + .validateNoConflictingDeletes(); + + table.newRowDelta() + .addDeletes(FILE_DAY_2_POS_DELETES) + .commit(); + + AssertHelpers.assertThrows("Should reject commit", + ValidationException.class, "found new delete", + overwrite::commit); + } + + @Test + public void testConcurrentConflictingPositionDeletesOverwriteByFilter() { + Assume.assumeTrue(formatVersion == 2); + + Assert.assertNull("Should be empty table", table.currentSnapshot()); + + table.newAppend() + .appendFile(FILE_DAY_1) + .appendFile(FILE_DAY_2) + .commit(); + + Snapshot firstSnapshot = table.currentSnapshot(); + + OverwriteFiles overwrite = table.newOverwrite() + .overwriteByRowFilter(EXPRESSION_DAY_2) + .addFile(FILE_DAY_2_MODIFIED) + .validateFromSnapshot(firstSnapshot.snapshotId()) + .conflictDetectionFilter(EXPRESSION_DAY_2) + .validateNoConflictingData() + .validateNoConflictingDeletes(); + + table.newRowDelta() + .addDeletes(FILE_DAY_2_POS_DELETES) + .commit(); + + AssertHelpers.assertThrows("Should reject commit", + ValidationException.class, "Found new conflicting delete", + overwrite::commit); + } + + @Test + public void testConcurrentNonConflictingPositionDeletes() { + Assume.assumeTrue(formatVersion == 2); + + Assert.assertNull("Should be empty table", table.currentSnapshot()); + + table.newAppend() + .appendFile(FILE_DAY_1) + .appendFile(FILE_DAY_2) + .commit(); + + Snapshot firstSnapshot = table.currentSnapshot(); + + OverwriteFiles overwrite = table.newOverwrite() + .deleteFile(FILE_DAY_2) + .addFile(FILE_DAY_2_MODIFIED) + .validateFromSnapshot(firstSnapshot.snapshotId()) + 
.conflictDetectionFilter(EXPRESSION_DAY_2) + .validateNoConflictingData() + .validateNoConflictingDeletes(); + + table.newRowDelta() + .addDeletes(FILE_DAY_1_POS_DELETES) + .commit(); + + overwrite.commit(); + + validateTableFiles(table, FILE_DAY_1, FILE_DAY_2_MODIFIED); + validateTableDeleteFiles(table, FILE_DAY_1_POS_DELETES); + } + + @Test + public void testConcurrentNonConflictingPositionDeletesOverwriteByFilter() { + Assume.assumeTrue(formatVersion == 2); + + Assert.assertNull("Should be empty table", table.currentSnapshot()); + + table.newAppend() + .appendFile(FILE_DAY_1) + .appendFile(FILE_DAY_2) + .commit(); + + Snapshot firstSnapshot = table.currentSnapshot(); + + OverwriteFiles overwrite = table.newOverwrite() + .overwriteByRowFilter(EXPRESSION_DAY_2) + .addFile(FILE_DAY_2_MODIFIED) + .validateFromSnapshot(firstSnapshot.snapshotId()) + .conflictDetectionFilter(EXPRESSION_DAY_2) + .validateNoConflictingData() + .validateNoConflictingDeletes(); + + table.newRowDelta() + .addDeletes(FILE_DAY_1_POS_DELETES) + .commit(); + + overwrite.commit(); + + validateTableFiles(table, FILE_DAY_1, FILE_DAY_2_MODIFIED); + validateTableDeleteFiles(table, FILE_DAY_1_POS_DELETES); + } + + @Test + public void testConcurrentConflictingEqualityDeletes() { + Assume.assumeTrue(formatVersion == 2); + + Assert.assertNull("Should be empty table", table.currentSnapshot()); + + table.newAppend() + .appendFile(FILE_DAY_1) + .appendFile(FILE_DAY_2) + .commit(); + + Snapshot firstSnapshot = table.currentSnapshot(); + + OverwriteFiles overwrite = table.newOverwrite() + .deleteFile(FILE_DAY_2) + .addFile(FILE_DAY_2_MODIFIED) + .validateFromSnapshot(firstSnapshot.snapshotId()) + .conflictDetectionFilter(EXPRESSION_DAY_2) + .validateNoConflictingData() + .validateNoConflictingDeletes(); + + table.newRowDelta() + .addDeletes(FILE_DAY_2_EQ_DELETES) + .commit(); + + AssertHelpers.assertThrows("Should reject commit", + ValidationException.class, "found new delete", + overwrite::commit); + } + + @Test + public void testConcurrentNonConflictingEqualityDeletes() { + Assume.assumeTrue(formatVersion == 2); + + Assert.assertNull("Should be empty table", table.currentSnapshot()); + + table.newAppend() + .appendFile(FILE_DAY_2) + .appendFile(FILE_DAY_2_ANOTHER_RANGE) + .commit(); + + Snapshot firstSnapshot = table.currentSnapshot(); + + OverwriteFiles overwrite = table.newOverwrite() + .deleteFile(FILE_DAY_2) + .addFile(FILE_DAY_2_MODIFIED) + .validateFromSnapshot(firstSnapshot.snapshotId()) + .conflictDetectionFilter(EXPRESSION_DAY_2_ID_RANGE) + .validateNoConflictingData() + .validateNoConflictingDeletes(); + + table.newRowDelta() + .addDeletes(FILE_DAY_2_ANOTHER_RANGE_EQ_DELETES) + .commit(); + + overwrite.commit(); + + validateTableFiles(table, FILE_DAY_2_ANOTHER_RANGE, FILE_DAY_2_MODIFIED); + validateTableDeleteFiles(table, FILE_DAY_2_ANOTHER_RANGE_EQ_DELETES); + } + + @Test + public void testOverwriteByFilterInheritsConflictDetectionFilter() { + Assume.assumeTrue(formatVersion == 2); + + Assert.assertNull("Should be empty table", table.currentSnapshot()); + + table.newAppend() + .appendFile(FILE_DAY_1) + .appendFile(FILE_DAY_2) + .commit(); + + Snapshot firstSnapshot = table.currentSnapshot(); + + OverwriteFiles overwrite = table.newOverwrite() + .overwriteByRowFilter(EXPRESSION_DAY_2) + .validateAddedFilesMatchOverwriteFilter() + .addFile(FILE_DAY_2_MODIFIED) + .validateFromSnapshot(firstSnapshot.snapshotId()) + .validateNoConflictingData() + .validateNoConflictingDeletes(); + + table.newRowDelta() + 
.addDeletes(FILE_DAY_1_POS_DELETES) + .commit(); + + overwrite.commit(); + + validateTableFiles(table, FILE_DAY_1, FILE_DAY_2_MODIFIED); + validateTableDeleteFiles(table, FILE_DAY_1_POS_DELETES); + } } diff --git a/core/src/test/java/org/apache/iceberg/TestRowDelta.java b/core/src/test/java/org/apache/iceberg/TestRowDelta.java index 6a5c43cd25ee..11b745eef137 100644 --- a/core/src/test/java/org/apache/iceberg/TestRowDelta.java +++ b/core/src/test/java/org/apache/iceberg/TestRowDelta.java @@ -19,14 +19,26 @@ package org.apache.iceberg; +import java.util.Map; +import java.util.Set; import org.apache.iceberg.ManifestEntry.Status; import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.junit.Assert; import org.junit.Test; +import static org.apache.iceberg.SnapshotSummary.ADDED_DELETE_FILES_PROP; +import static org.apache.iceberg.SnapshotSummary.ADDED_FILES_PROP; +import static org.apache.iceberg.SnapshotSummary.ADDED_POS_DELETES_PROP; +import static org.apache.iceberg.SnapshotSummary.CHANGED_PARTITION_COUNT_PROP; +import static org.apache.iceberg.SnapshotSummary.CHANGED_PARTITION_PREFIX; +import static org.apache.iceberg.SnapshotSummary.TOTAL_DATA_FILES_PROP; +import static org.apache.iceberg.SnapshotSummary.TOTAL_DELETE_FILES_PROP; +import static org.apache.iceberg.SnapshotSummary.TOTAL_POS_DELETES_PROP; + public class TestRowDelta extends V2TableTestBase { @Test public void testAddDeleteFile() { @@ -629,4 +641,497 @@ public void testFastAppendDoesNotRemoveStaleDeleteFiles() { files(FILE_A_DELETES), statuses(Status.ADDED)); } + + @Test + public void testValidateDataFilesExistWithConflictDetectionFilter() { + // change the spec to be partitioned by data + table.updateSpec() + .removeField(Expressions.bucket("data", 16)) + .addField(Expressions.ref("data")) + .commit(); + + // add a data file to partition A + DataFile dataFile1 = DataFiles.builder(table.spec()) + .withPath("/path/to/data-a.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("data=a") + .withRecordCount(1) + .build(); + + table.newAppend() + .appendFile(dataFile1) + .commit(); + + // add a data file to partition B + DataFile dataFile2 = DataFiles.builder(table.spec()) + .withPath("/path/to/data-b.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("data=b") + .withRecordCount(1) + .build(); + + table.newAppend() + .appendFile(dataFile2) + .commit(); + + // use this snapshot as the starting snapshot in rowDelta + Snapshot baseSnapshot = table.currentSnapshot(); + + // add a delete file for partition A + DeleteFile deleteFile = FileMetadata.deleteFileBuilder(table.spec()) + .ofPositionDeletes() + .withPath("/path/to/data-a-deletes.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("data=a") + .withRecordCount(1) + .build(); + + Expression conflictDetectionFilter = Expressions.equal("data", "a"); + RowDelta rowDelta = table.newRowDelta() + .addDeletes(deleteFile) + .validateDataFilesExist(ImmutableList.of(dataFile1.path())) + .validateDeletedFiles() + .validateFromSnapshot(baseSnapshot.snapshotId()) + .validateNoConflictingAppends(conflictDetectionFilter); + + // concurrently delete the file for partition B + table.newDelete() + .deleteFile(dataFile2) + .commit(); + + // commit the delta for partition A + rowDelta.commit(); + + Assert.assertEquals("Table 
should have one new delete manifest", + 1, table.currentSnapshot().deleteManifests().size()); + ManifestFile deletes = table.currentSnapshot().deleteManifests().get(0); + validateDeleteManifest(deletes, + seqs(4), + ids(table.currentSnapshot().snapshotId()), + files(deleteFile), + statuses(Status.ADDED)); + } + + @Test + public void testValidateDataFilesDoNotExistWithConflictDetectionFilter() { + // change the spec to be partitioned by data + table.updateSpec() + .removeField(Expressions.bucket("data", 16)) + .addField(Expressions.ref("data")) + .commit(); + + // add a data file to partition A + DataFile dataFile1 = DataFiles.builder(table.spec()) + .withPath("/path/to/data-a.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("data=a") + .withRecordCount(1) + .build(); + + table.newAppend() + .appendFile(dataFile1) + .commit(); + + // use this snapshot as the starting snapshot in rowDelta + Snapshot baseSnapshot = table.currentSnapshot(); + + // add a delete file for partition A + DeleteFile deleteFile = FileMetadata.deleteFileBuilder(table.spec()) + .ofPositionDeletes() + .withPath("/path/to/data-a-deletes.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("data=a") + .withRecordCount(1) + .build(); + + Expression conflictDetectionFilter = Expressions.equal("data", "a"); + RowDelta rowDelta = table.newRowDelta() + .addDeletes(deleteFile) + .validateDataFilesExist(ImmutableList.of(dataFile1.path())) + .validateDeletedFiles() + .validateFromSnapshot(baseSnapshot.snapshotId()) + .validateNoConflictingAppends(conflictDetectionFilter); + + // concurrently delete the file for partition A + table.newDelete() + .deleteFile(dataFile1) + .commit(); + + AssertHelpers.assertThrows("Should fail to add deletes because data file is missing", + ValidationException.class, "Cannot commit, missing data files", + rowDelta::commit); + } + + @Test + public void testAddDeleteFilesMultipleSpecs() { + // enable partition summaries + table.updateProperties() + .set(TableProperties.WRITE_PARTITION_SUMMARY_LIMIT, "10") + .commit(); + + // append a partitioned data file + DataFile firstSnapshotDataFile = newDataFile("data_bucket=0"); + table.newAppend() + .appendFile(firstSnapshotDataFile) + .commit(); + + // remove the only partition field to make the spec unpartitioned + table.updateSpec() + .removeField(Expressions.bucket("data", 16)) + .commit(); + + Assert.assertTrue("Spec must be unpartitioned", table.spec().isUnpartitioned()); + + // append an unpartitioned data file + DataFile secondSnapshotDataFile = newDataFile(""); + table.newAppend() + .appendFile(secondSnapshotDataFile) + .commit(); + + // evolve the spec and add a new partition field + table.updateSpec() + .addField("data") + .commit(); + + // append a data file with the new spec + DataFile thirdSnapshotDataFile = newDataFile("data=abc"); + table.newAppend() + .appendFile(thirdSnapshotDataFile) + .commit(); + + Assert.assertEquals("Should have 3 specs", 3, table.specs().size()); + + // commit a row delta with 1 data file and 3 delete files where delete files have different specs + DataFile dataFile = newDataFile("data=xyz"); + DeleteFile firstDeleteFile = newDeleteFile(firstSnapshotDataFile.specId(), "data_bucket=0"); + DeleteFile secondDeleteFile = newDeleteFile(secondSnapshotDataFile.specId(), ""); + DeleteFile thirdDeleteFile = newDeleteFile(thirdSnapshotDataFile.specId(), "data=abc"); + + table.newRowDelta() + .addRows(dataFile) + .addDeletes(firstDeleteFile) + .addDeletes(secondDeleteFile) + .addDeletes(thirdDeleteFile) + .commit(); + 
+ Snapshot snapshot = table.currentSnapshot(); + Assert.assertEquals("Commit should produce sequence number 4", 4, snapshot.sequenceNumber()); + Assert.assertEquals("Last sequence number should be 4", 4, table.ops().current().lastSequenceNumber()); + Assert.assertEquals("Delta commit should be 'overwrite'", DataOperations.OVERWRITE, snapshot.operation()); + + Map summary = snapshot.summary(); + + Assert.assertEquals("Should change 4 partitions", "4", summary.get(CHANGED_PARTITION_COUNT_PROP)); + Assert.assertEquals("Should add 1 data file", "1", summary.get(ADDED_FILES_PROP)); + Assert.assertEquals("Should have 4 data files", "4", summary.get(TOTAL_DATA_FILES_PROP)); + Assert.assertEquals("Should add 3 delete files", "3", summary.get(ADDED_DELETE_FILES_PROP)); + Assert.assertEquals("Should have 3 delete files", "3", summary.get(TOTAL_DELETE_FILES_PROP)); + Assert.assertEquals("Should add 3 position deletes", "3", summary.get(ADDED_POS_DELETES_PROP)); + Assert.assertEquals("Should have 3 position deletes", "3", summary.get(TOTAL_POS_DELETES_PROP)); + + Assert.assertTrue("Partition metrics must be correct", + summary.get(CHANGED_PARTITION_PREFIX).contains(ADDED_DELETE_FILES_PROP + "=1")); + Assert.assertTrue("Partition metrics must be correct", + summary.get(CHANGED_PARTITION_PREFIX + "data_bucket=0").contains(ADDED_DELETE_FILES_PROP + "=1")); + Assert.assertTrue("Partition metrics must be correct", + summary.get(CHANGED_PARTITION_PREFIX + "data=abc").contains(ADDED_DELETE_FILES_PROP + "=1")); + Assert.assertTrue("Partition metrics must be correct", + summary.get(CHANGED_PARTITION_PREFIX + "data=xyz").contains(ADDED_FILES_PROP + "=1")); + + // 3 appends + 1 row delta + Assert.assertEquals("Should have 4 data manifest", 4, snapshot.dataManifests().size()); + validateManifest( + snapshot.dataManifests().get(0), + seqs(4), + ids(snapshot.snapshotId()), + files(dataFile), + statuses(Status.ADDED)); + + // each delete file goes into a separate manifest as the specs are different + Assert.assertEquals("Should produce 3 delete manifest", 3, snapshot.deleteManifests().size()); + + ManifestFile firstDeleteManifest = snapshot.deleteManifests().get(2); + Assert.assertEquals("Spec must match", firstSnapshotDataFile.specId(), firstDeleteManifest.partitionSpecId()); + validateDeleteManifest( + firstDeleteManifest, + seqs(4), + ids(snapshot.snapshotId()), + files(firstDeleteFile), + statuses(Status.ADDED)); + + ManifestFile secondDeleteManifest = snapshot.deleteManifests().get(1); + Assert.assertEquals("Spec must match", secondSnapshotDataFile.specId(), secondDeleteManifest.partitionSpecId()); + validateDeleteManifest( + secondDeleteManifest, + seqs(4), + ids(snapshot.snapshotId()), + files(secondDeleteFile), + statuses(Status.ADDED)); + + ManifestFile thirdDeleteManifest = snapshot.deleteManifests().get(0); + Assert.assertEquals("Spec must match", thirdSnapshotDataFile.specId(), thirdDeleteManifest.partitionSpecId()); + validateDeleteManifest( + thirdDeleteManifest, + seqs(4), + ids(snapshot.snapshotId()), + files(thirdDeleteFile), + statuses(Status.ADDED)); + } + + @Test + public void testManifestMergingMultipleSpecs() { + // make sure we enable manifest merging + table.updateProperties() + .set(TableProperties.MANIFEST_MERGE_ENABLED, "true") + .set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "2") + .commit(); + + // append a partitioned data file + DataFile firstSnapshotDataFile = newDataFile("data_bucket=0"); + table.newAppend() + .appendFile(firstSnapshotDataFile) + .commit(); + + // remove the only 
partition field to make the spec unpartitioned + table.updateSpec() + .removeField(Expressions.bucket("data", 16)) + .commit(); + + Assert.assertTrue("Spec must be unpartitioned", table.spec().isUnpartitioned()); + + // append an unpartitioned data file + DataFile secondSnapshotDataFile = newDataFile(""); + table.newAppend() + .appendFile(secondSnapshotDataFile) + .commit(); + + // commit two delete files to two specs in a single operation + DeleteFile firstDeleteFile = newDeleteFile(firstSnapshotDataFile.specId(), "data_bucket=0"); + DeleteFile secondDeleteFile = newDeleteFile(secondSnapshotDataFile.specId(), ""); + + table.newRowDelta() + .addDeletes(firstDeleteFile) + .addDeletes(secondDeleteFile) + .commit(); + + Snapshot thirdSnapshot = table.currentSnapshot(); + + // 2 appends and 1 row delta where delete files belong to different specs + Assert.assertEquals("Should have 2 data manifest", 2, thirdSnapshot.dataManifests().size()); + Assert.assertEquals("Should have 2 delete manifest", 2, thirdSnapshot.deleteManifests().size()); + + // commit two more delete files to the same specs to trigger merging + DeleteFile thirdDeleteFile = newDeleteFile(firstSnapshotDataFile.specId(), "data_bucket=0"); + DeleteFile fourthDeleteFile = newDeleteFile(secondSnapshotDataFile.specId(), ""); + + table.newRowDelta() + .addDeletes(thirdDeleteFile) + .addDeletes(fourthDeleteFile) + .commit(); + + Snapshot fourthSnapshot = table.currentSnapshot(); + + // make sure merging respects spec boundaries + Assert.assertEquals("Should have 2 data manifest", 2, fourthSnapshot.dataManifests().size()); + Assert.assertEquals("Should have 2 delete manifest", 2, fourthSnapshot.deleteManifests().size()); + + ManifestFile firstDeleteManifest = fourthSnapshot.deleteManifests().get(1); + Assert.assertEquals("Spec must match", firstSnapshotDataFile.specId(), firstDeleteManifest.partitionSpecId()); + validateDeleteManifest( + firstDeleteManifest, + seqs(4, 3), + ids(fourthSnapshot.snapshotId(), thirdSnapshot.snapshotId()), + files(thirdDeleteFile, firstDeleteFile), + statuses(Status.ADDED, Status.EXISTING)); + + ManifestFile secondDeleteManifest = fourthSnapshot.deleteManifests().get(0); + Assert.assertEquals("Spec must match", secondSnapshotDataFile.specId(), secondDeleteManifest.partitionSpecId()); + validateDeleteManifest( + secondDeleteManifest, + seqs(4, 3), + ids(fourthSnapshot.snapshotId(), thirdSnapshot.snapshotId()), + files(fourthDeleteFile, secondDeleteFile), + statuses(Status.ADDED, Status.EXISTING)); + } + + @Test + public void testAbortMultipleSpecs() { + // append a partitioned data file + DataFile firstSnapshotDataFile = newDataFile("data_bucket=0"); + table.newAppend() + .appendFile(firstSnapshotDataFile) + .commit(); + + // remove the only partition field to make the spec unpartitioned + table.updateSpec() + .removeField(Expressions.bucket("data", 16)) + .commit(); + + Assert.assertTrue("Spec must be unpartitioned", table.spec().isUnpartitioned()); + + // append an unpartitioned data file + DataFile secondSnapshotDataFile = newDataFile(""); + table.newAppend() + .appendFile(secondSnapshotDataFile) + .commit(); + + // prepare two delete files that belong to different specs + DeleteFile firstDeleteFile = newDeleteFile(firstSnapshotDataFile.specId(), "data_bucket=0"); + DeleteFile secondDeleteFile = newDeleteFile(secondSnapshotDataFile.specId(), ""); + + // capture all deletes + Set deletedFiles = Sets.newHashSet(); + + RowDelta rowDelta = table.newRowDelta() + .addDeletes(firstDeleteFile) + 
.addDeletes(secondDeleteFile) + .deleteWith(deletedFiles::add) + .validateDeletedFiles() + .validateDataFilesExist(ImmutableList.of(firstSnapshotDataFile.path())); + + rowDelta.apply(); + + // perform a conflicting concurrent operation + table.newDelete() + .deleteFile(firstSnapshotDataFile) + .commit(); + + AssertHelpers.assertThrows("Should fail to commit row delta", + ValidationException.class, "Cannot commit, missing data files", + rowDelta::commit); + + // we should clean up 1 manifest list and 2 delete manifests + Assert.assertEquals("Should delete 3 files", 3, deletedFiles.size()); + } + + @Test + public void testConcurrentConflictingRowDelta() { + table.newAppend() + .appendFile(FILE_A) + .commit(); + + Snapshot firstSnapshot = table.currentSnapshot(); + + Expression conflictDetectionFilter = Expressions.alwaysTrue(); + + // mock a MERGE operation with serializable isolation + RowDelta rowDelta = table.newRowDelta() + .addRows(FILE_B) + .addDeletes(FILE_A_DELETES) + .validateFromSnapshot(firstSnapshot.snapshotId()) + .conflictDetectionFilter(conflictDetectionFilter) + .validateNoConflictingDataFiles() + .validateNoConflictingDeleteFiles(); + + table.newRowDelta() + .addDeletes(FILE_A_DELETES) + .validateFromSnapshot(firstSnapshot.snapshotId()) + .validateNoConflictingAppends(conflictDetectionFilter) + .commit(); + + AssertHelpers.assertThrows("Should reject commit", + ValidationException.class, "Found new conflicting delete files", + rowDelta::commit); + } + + @Test + public void testConcurrentConflictingRowDeltaWithoutAppendValidation() { + table.newAppend() + .appendFile(FILE_A) + .commit(); + + Snapshot firstSnapshot = table.currentSnapshot(); + + Expression conflictDetectionFilter = Expressions.alwaysTrue(); + + // mock a MERGE operation with snapshot isolation (i.e. 
no append validation) + RowDelta rowDelta = table.newRowDelta() + .addDeletes(FILE_A_DELETES) + .validateFromSnapshot(firstSnapshot.snapshotId()) + .conflictDetectionFilter(conflictDetectionFilter) + .validateNoConflictingDeleteFiles(); + + table.newRowDelta() + .addDeletes(FILE_A_DELETES) + .validateFromSnapshot(firstSnapshot.snapshotId()) + .conflictDetectionFilter(conflictDetectionFilter) + .validateNoConflictingDataFiles() + .commit(); + + AssertHelpers.assertThrows("Should reject commit", + ValidationException.class, "Found new conflicting delete files", + rowDelta::commit); + } + + @Test + public void testConcurrentNonConflictingRowDelta() { + // change the spec to be partitioned by data + table.updateSpec() + .removeField(Expressions.bucket("data", 16)) + .addField(Expressions.ref("data")) + .commit(); + + // add a data file to partition A + DataFile dataFile1 = DataFiles.builder(table.spec()) + .withPath("/path/to/data-a.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("data=a") + .withRecordCount(1) + .build(); + + table.newAppend() + .appendFile(dataFile1) + .commit(); + + // add a data file to partition B + DataFile dataFile2 = DataFiles.builder(table.spec()) + .withPath("/path/to/data-b.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("data=b") + .withRecordCount(1) + .build(); + + table.newAppend() + .appendFile(dataFile2) + .commit(); + + Snapshot baseSnapshot = table.currentSnapshot(); + + Expression conflictDetectionFilter = Expressions.equal("data", "a"); + + // add a delete file for partition A + DeleteFile deleteFile1 = FileMetadata.deleteFileBuilder(table.spec()) + .ofPositionDeletes() + .withPath("/path/to/data-a-deletes.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("data=a") + .withRecordCount(1) + .build(); + + // mock a DELETE operation with serializable isolation + RowDelta rowDelta = table.newRowDelta() + .addDeletes(deleteFile1) + .validateFromSnapshot(baseSnapshot.snapshotId()) + .conflictDetectionFilter(conflictDetectionFilter) + .validateNoConflictingDataFiles() + .validateNoConflictingDeleteFiles(); + + // add a delete file for partition B + DeleteFile deleteFile2 = FileMetadata.deleteFileBuilder(table.spec()) + .ofPositionDeletes() + .withPath("/path/to/data-b-deletes.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("data=b") + .withRecordCount(1) + .build(); + + table.newRowDelta() + .addDeletes(deleteFile2) + .validateFromSnapshot(baseSnapshot.snapshotId()) + .commit(); + + rowDelta.commit(); + + validateTableDeleteFiles(table, deleteFile1, deleteFile2); + } } diff --git a/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcUtil.java b/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcUtil.java new file mode 100644 index 000000000000..92fa90eb02d8 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcUtil.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.jdbc; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import org.assertj.core.api.Assertions; +import org.junit.Test; + +public class TestJdbcUtil { + + @Test + public void testFilterAndRemovePrefix() { + Map input = new HashMap<>(); + input.put("warehouse", "/tmp/warehouse"); + input.put("user", "foo"); + input.put("jdbc.user", "bar"); + input.put("jdbc.pass", "secret"); + input.put("jdbc.jdbc.abcxyz", "abcxyz"); + + Properties expected = new Properties(); + expected.put("user", "bar"); + expected.put("pass", "secret"); + expected.put("jdbc.abcxyz", "abcxyz"); + + Properties actual = JdbcUtil.filterAndRemovePrefix(input, "jdbc."); + + Assertions.assertThat(expected).isEqualTo(actual); + } +} diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java index a8eb13cdfa68..148f9d90c035 100644 --- a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java +++ b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java @@ -134,9 +134,14 @@ private List> applyEqDeletes() { Iterable> deleteRecords = Iterables.transform(deletes, delete -> openDeletes(delete, deleteSchema)); + + // copy the delete records because they will be held in a set + CloseableIterable records = CloseableIterable.transform( + CloseableIterable.concat(deleteRecords), Record::copy); + StructLikeSet deleteSet = Deletes.toEqualitySet( - // copy the delete records because they will be held in a set - CloseableIterable.transform(CloseableIterable.concat(deleteRecords), Record::copy), + CloseableIterable.transform( + records, record -> new InternalRecordWrapper(deleteSchema.asStruct()).wrap(record)), deleteSchema.asStruct()); Predicate isInDeleteSet = record -> deleteSet.contains(projectRow.wrap(asStructLike(record))); diff --git a/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java b/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java index 70ac77473c5d..69b0a572ad73 100644 --- a/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java +++ b/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java @@ -20,6 +20,7 @@ package org.apache.iceberg.data; import java.io.IOException; +import java.time.LocalDate; import java.util.List; import java.util.Set; import org.apache.iceberg.DataFile; @@ -34,6 +35,7 @@ import org.apache.iceberg.types.Types; import org.apache.iceberg.util.ArrayUtil; import org.apache.iceberg.util.CharSequenceSet; +import org.apache.iceberg.util.DateTimeUtil; import org.apache.iceberg.util.Pair; import org.apache.iceberg.util.StructLikeSet; import org.apache.iceberg.util.StructProjection; @@ -51,17 +53,30 @@ public abstract class DeleteReadTests { Types.NestedField.required(2, "data", Types.StringType.get()) ); + public static final Schema DATE_SCHEMA = new Schema( + Types.NestedField.required(1, "dt", Types.DateType.get()), + Types.NestedField.required(2, "data", Types.StringType.get()), + Types.NestedField.required(3, "id", Types.IntegerType.get()) + ); + // Partition spec used to create tables public static 
final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA) .bucket("data", 16) .build(); + public static final PartitionSpec DATE_SPEC = PartitionSpec.builderFor(DATE_SCHEMA) + .day("dt") + .build(); + @Rule public TemporaryFolder temp = new TemporaryFolder(); - private String tableName = null; + protected String tableName = null; + protected String dateTableName = null; protected Table table = null; + protected Table dateTable = null; private List records = null; + private List dateRecords = null; private DataFile dataFile = null; @Before @@ -90,6 +105,46 @@ public void writeTestDataFile() throws IOException { @After public void cleanup() throws IOException { dropTable("test"); + dropTable("test2"); + } + + private void initDateTable() throws IOException { + dropTable("test2"); + this.dateTableName = "test2"; + this.dateTable = createTable(dateTableName, DATE_SCHEMA, DATE_SPEC); + + GenericRecord record = GenericRecord.create(dateTable.schema()); + + this.dateRecords = Lists.newArrayList( + record.copy("dt", LocalDate.parse("2021-09-01"), "data", "a", "id", 1), + record.copy("dt", LocalDate.parse("2021-09-02"), "data", "b", "id", 2), + record.copy("dt", LocalDate.parse("2021-09-03"), "data", "c", "id", 3), + record.copy("dt", LocalDate.parse("2021-09-04"), "data", "d", "id", 4), + record.copy("dt", LocalDate.parse("2021-09-05"), "data", "e", "id", 5)); + + DataFile dataFile1 = FileHelpers.writeDataFile( + dateTable, Files.localOutput(temp.newFile()), + Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-01"))), dateRecords.subList(0, 1)); + DataFile dataFile2 = FileHelpers.writeDataFile( + dateTable, Files.localOutput(temp.newFile()), + Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-02"))), dateRecords.subList(1, 2)); + DataFile dataFile3 = FileHelpers.writeDataFile( + dateTable, Files.localOutput(temp.newFile()), + Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-03"))), dateRecords.subList(2, 3)); + DataFile dataFile4 = FileHelpers.writeDataFile( + dateTable, Files.localOutput(temp.newFile()), + Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-04"))), dateRecords.subList(3, 4)); + DataFile dataFile5 = FileHelpers.writeDataFile( + dateTable, Files.localOutput(temp.newFile()), + Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-05"))), dateRecords.subList(4, 5)); + + dateTable.newAppend() + .appendFile(dataFile1) + .appendFile(dataFile2) + .appendFile(dataFile3) + .appendFile(dataFile4) + .appendFile(dataFile5) + .commit(); } protected abstract Table createTable(String name, Schema schema, PartitionSpec spec) throws IOException; @@ -119,12 +174,47 @@ public void testEqualityDeletes() throws IOException { .addDeletes(eqDeletes) .commit(); - StructLikeSet expected = rowSetWithoutIds(29, 89, 122); + StructLikeSet expected = rowSetWithoutIds(table, records, 29, 89, 122); StructLikeSet actual = rowSet(tableName, table, "*"); Assert.assertEquals("Table should contain expected rows", expected, actual); } + @Test + public void testEqualityDateDeletes() throws IOException { + initDateTable(); + + Schema deleteRowSchema = dateTable.schema().select("*"); + Record dataDelete = GenericRecord.create(deleteRowSchema); + List dataDeletes = Lists.newArrayList( + dataDelete.copy("dt", LocalDate.parse("2021-09-01"), "data", "a", "id", 1), + dataDelete.copy("dt", LocalDate.parse("2021-09-02"), "data", "b", "id", 2), + dataDelete.copy("dt", LocalDate.parse("2021-09-03"), "data", "c", "id", 3) + ); + + DeleteFile eqDeletes1 = FileHelpers.writeDeleteFile( + dateTable, 
Files.localOutput(temp.newFile()), + Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-01"))), dataDeletes.subList(0, 1), deleteRowSchema); + DeleteFile eqDeletes2 = FileHelpers.writeDeleteFile( + dateTable, Files.localOutput(temp.newFile()), + Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-02"))), dataDeletes.subList(1, 2), deleteRowSchema); + DeleteFile eqDeletes3 = FileHelpers.writeDeleteFile( + dateTable, Files.localOutput(temp.newFile()), + Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-03"))), dataDeletes.subList(2, 3), deleteRowSchema); + + dateTable.newRowDelta() + .addDeletes(eqDeletes1) + .addDeletes(eqDeletes2) + .addDeletes(eqDeletes3) + .commit(); + + StructLikeSet expected = rowSetWithoutIds(dateTable, dateRecords, 1, 2, 3); + + StructLikeSet actual = rowSet(dateTableName, dateTable, "*"); + + Assert.assertEquals("Table should contain expected rows", expected, actual); + } + @Test public void testEqualityDeletesWithRequiredEqColumn() throws IOException { Schema deleteRowSchema = table.schema().select("data"); @@ -142,7 +232,7 @@ public void testEqualityDeletesWithRequiredEqColumn() throws IOException { .addDeletes(eqDeletes) .commit(); - StructLikeSet expected = selectColumns(rowSetWithoutIds(29, 89, 122), "id"); + StructLikeSet expected = selectColumns(rowSetWithoutIds(table, records, 29, 89, 122), "id"); StructLikeSet actual = rowSet(tableName, table, "id"); if (expectPruned()) { @@ -180,7 +270,7 @@ public void testEqualityDeletesSpanningMultipleDataFiles() throws IOException { .addDeletes(eqDeletes) .commit(); - StructLikeSet expected = rowSetWithoutIds(29, 89, 122, 144); + StructLikeSet expected = rowSetWithoutIds(table, records, 29, 89, 122, 144); StructLikeSet actual = rowSet(tableName, table, "*"); Assert.assertEquals("Table should contain expected rows", expected, actual); @@ -202,7 +292,7 @@ public void testPositionDeletes() throws IOException { .validateDataFilesExist(posDeletes.second()) .commit(); - StructLikeSet expected = rowSetWithoutIds(29, 89, 122); + StructLikeSet expected = rowSetWithoutIds(table, records, 29, 89, 122); StructLikeSet actual = rowSet(tableName, table, "*"); Assert.assertEquals("Table should contain expected rows", expected, actual); @@ -235,7 +325,7 @@ public void testMixedPositionAndEqualityDeletes() throws IOException { .validateDataFilesExist(posDeletes.second()) .commit(); - StructLikeSet expected = rowSetWithoutIds(29, 89, 121, 122); + StructLikeSet expected = rowSetWithoutIds(table, records, 29, 89, 121, 122); StructLikeSet actual = rowSet(tableName, table, "*"); Assert.assertEquals("Table should contain expected rows", expected, actual); @@ -269,7 +359,7 @@ public void testMultipleEqualityDeleteSchemas() throws IOException { .addDeletes(idEqDeletes) .commit(); - StructLikeSet expected = rowSetWithoutIds(29, 89, 121, 122); + StructLikeSet expected = rowSetWithoutIds(table, records, 29, 89, 121, 122); StructLikeSet actual = rowSet(tableName, table, "*"); Assert.assertEquals("Table should contain expected rows", expected, actual); @@ -306,7 +396,7 @@ public void testEqualityDeleteByNull() throws IOException { .addDeletes(eqDeletes) .commit(); - StructLikeSet expected = rowSetWithoutIds(131); + StructLikeSet expected = rowSetWithoutIds(table, records, 131); StructLikeSet actual = rowSet(tableName, table, "*"); Assert.assertEquals("Table should contain expected rows", expected, actual); @@ -321,11 +411,12 @@ private StructLikeSet selectColumns(StructLikeSet rows, String... 
columns) { return set; } - private StructLikeSet rowSetWithoutIds(int... idsToRemove) { + private static StructLikeSet rowSetWithoutIds(Table table, List recordList, int... idsToRemove) { Set deletedIds = Sets.newHashSet(ArrayUtil.toIntList(idsToRemove)); StructLikeSet set = StructLikeSet.create(table.schema().asStruct()); - records.stream() + recordList.stream() .filter(row -> !deletedIds.contains(row.getField("id"))) + .map(record -> new InternalRecordWrapper(table.schema().asStruct()).wrap(record)) .forEach(set::add); return set; } diff --git a/data/src/test/java/org/apache/iceberg/data/TestGenericReaderDeletes.java b/data/src/test/java/org/apache/iceberg/data/TestGenericReaderDeletes.java index 72d48831de06..e92c0daec385 100644 --- a/data/src/test/java/org/apache/iceberg/data/TestGenericReaderDeletes.java +++ b/data/src/test/java/org/apache/iceberg/data/TestGenericReaderDeletes.java @@ -26,6 +26,7 @@ import org.apache.iceberg.Table; import org.apache.iceberg.TestTables; import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.util.StructLikeSet; import org.junit.Assert; @@ -48,7 +49,8 @@ protected void dropTable(String name) { public StructLikeSet rowSet(String name, Table table, String... columns) throws IOException { StructLikeSet set = StructLikeSet.create(table.schema().asStruct()); try (CloseableIterable reader = IcebergGenerics.read(table).select(columns).build()) { - reader.forEach(set::add); + Iterables.addAll(set, CloseableIterable.transform( + reader, record -> new InternalRecordWrapper(table.schema().asStruct()).wrap(record))); } return set; } diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java index ff9174a84399..010df8cf5da2 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java +++ b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java @@ -50,7 +50,6 @@ import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.base.Strings; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.types.Comparators; @@ -282,9 +281,14 @@ private void commitDeltaTxn(NavigableMap pendingResults, Stri // txn2, the equality-delete files of txn2 are required to be applied to data files from txn1. Committing the // merged one will lead to the incorrect delete semantic. WriteResult result = e.getValue(); - RowDelta rowDelta = table.newRowDelta() - .validateDataFilesExist(ImmutableList.copyOf(result.referencedDataFiles())) - .validateDeletedFiles(); + + // Row delta validations are not needed for streaming changes that write equality deletes. Equality deletes + // are applied to data in all previous sequence numbers, so retries may push deletes further in the future, + // but do not affect correctness. Position deletes committed to the table in this path are used only to delete + // rows from data files that are being added in this commit. 
There is no way for data files added along with + // the delete files to be concurrently removed, so there is no need to validate the files referenced by the + // position delete files that are being committed. + RowDelta rowDelta = table.newRowDelta(); int numDataFiles = result.dataFiles().length; Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows); diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java index 34785cfb6a34..e16940fc4875 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java +++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java @@ -40,7 +40,6 @@ import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.table.data.RowData; import org.apache.hadoop.conf.Configuration; -import org.apache.iceberg.AssertHelpers; import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; @@ -49,7 +48,6 @@ import org.apache.iceberg.ManifestFile; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.TableTestBase; -import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.SimpleDataUtil; import org.apache.iceberg.flink.TestHelpers; @@ -679,66 +677,6 @@ public void testDeleteFiles() throws Exception { } } - @Test - public void testValidateDataFileExist() throws Exception { - Assume.assumeFalse("Only support equality-delete in format v2.", formatVersion < 2); - long timestamp = 0; - long checkpoint = 10; - JobID jobId = new JobID(); - FileAppenderFactory appenderFactory = createDeletableAppenderFactory(); - - RowData insert1 = SimpleDataUtil.createInsert(1, "aaa"); - DataFile dataFile1 = writeDataFile("data-file-1", ImmutableList.of(insert1)); - - try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { - harness.setup(); - harness.open(); - - // Txn#1: insert the row <1, 'aaa'> - harness.processElement(WriteResult.builder() - .addDataFiles(dataFile1) - .build(), - ++timestamp); - harness.snapshot(checkpoint, ++timestamp); - harness.notifyOfCompletedCheckpoint(checkpoint); - - // Txn#2: Overwrite the committed data-file-1 - RowData insert2 = SimpleDataUtil.createInsert(2, "bbb"); - DataFile dataFile2 = writeDataFile("data-file-2", ImmutableList.of(insert2)); - new TestTableLoader(tablePath) - .loadTable() - .newOverwrite() - .addFile(dataFile2) - .deleteFile(dataFile1) - .commit(); - } - - try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { - harness.setup(); - harness.open(); - - // Txn#3: position-delete the <1, 'aaa'> (NOT committed). - DeleteFile deleteFile1 = writePosDeleteFile(appenderFactory, - "pos-delete-file-1", - ImmutableList.of(Pair.of(dataFile1.path(), 0L))); - harness.processElement(WriteResult.builder() - .addDeleteFiles(deleteFile1) - .addReferencedDataFiles(dataFile1.path()) - .build(), - ++timestamp); - harness.snapshot(++checkpoint, ++timestamp); - - // Txn#3: validate will be failure when committing. 
- final long currentCheckpointId = checkpoint; - AssertHelpers.assertThrows("Validation should be failure because of non-exist data files.", - ValidationException.class, "Cannot commit, missing data files", - () -> { - harness.notifyOfCompletedCheckpoint(currentCheckpointId); - return null; - }); - } - } - @Test public void testCommitTwoCheckpointsInSingleTxn() throws Exception { Assume.assumeFalse("Only support equality-delete in format v2.", formatVersion < 2); diff --git a/hive3/src/main/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java b/hive3/src/main/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java new file mode 100644 index 000000000000..00f389eee792 --- /dev/null +++ b/hive3/src/main/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io.orc; + +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.ql.io.AcidInputFormat; +import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.io.ColumnarSplit; +import org.apache.hadoop.hive.ql.io.LlapAwareSplit; +import org.apache.hadoop.hive.ql.io.SyntheticFileId; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.orc.OrcProto; +import org.apache.orc.impl.OrcTail; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * In order to fix some compatibility issues with ORC support with Hive 3.x and the shaded ORC libraries, + * this class has been copied from Hive 3.x source code. However, this class should be removed once + * Hive 4 is out. 
+ */ +public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit { + private static final Logger LOG = LoggerFactory.getLogger(OrcSplit.class); + private OrcTail orcTail; + private boolean hasFooter; + /** + * This means {@link AcidUtils.AcidBaseFileType#ORIGINAL_BASE} + */ + private boolean isOriginal; + private boolean hasBase; + // partition root + private Path rootDir; + private final List deltas = new ArrayList<>(); + private long projColsUncompressedSize; + private transient Object fileKey; + private long fileLen; + + static final int HAS_SYNTHETIC_FILEID_FLAG = 16; + static final int HAS_LONG_FILEID_FLAG = 8; + static final int BASE_FLAG = 4; + static final int ORIGINAL_FLAG = 2; + static final int FOOTER_FLAG = 1; + + protected OrcSplit() { + // The FileSplit() constructor in hadoop 0.20 and 1.x is package private so can't use it. + // This constructor is used to create the object and then call readFields() + // so just pass nulls to this super constructor. + super(null, 0, 0, (String[]) null); + } + + public OrcSplit(Path path, Object fileId, long offset, long length, String[] hosts, + OrcTail orcTail, boolean isOriginal, boolean hasBase, + List deltas, long projectedDataSize, long fileLen, Path rootDir) { + super(path, offset, length, hosts); + // For HDFS, we could avoid serializing file ID and just replace the path with inode-based + // path. However, that breaks bunch of stuff because Hive later looks up things by split path. + this.fileKey = fileId; + this.orcTail = orcTail; + hasFooter = this.orcTail != null; + this.isOriginal = isOriginal; + this.hasBase = hasBase; + this.rootDir = rootDir; + this.deltas.addAll(deltas); + this.projColsUncompressedSize = projectedDataSize <= 0 ? length : projectedDataSize; + // setting file length to Long.MAX_VALUE will let orc reader read file length from file system + this.fileLen = fileLen <= 0 ? Long.MAX_VALUE : fileLen; + } + + @Override + public void write(DataOutput out) throws IOException { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(bos); + // serialize path, offset, length using FileSplit + super.write(dos); + int required = bos.size(); + + // write addition payload required for orc + writeAdditionalPayload(dos); + int additional = bos.size() - required; + + out.write(bos.toByteArray()); + if (LOG.isTraceEnabled()) { + LOG.trace("Writing additional {} bytes to OrcSplit as payload. Required {} bytes.", + additional, required); + } + } + + private void writeAdditionalPayload(final DataOutputStream out) throws IOException { + boolean isFileIdLong = fileKey instanceof Long; + boolean isFileIdWritable = fileKey instanceof Writable; + int flags = (hasBase ? BASE_FLAG : 0) | + (isOriginal ? ORIGINAL_FLAG : 0) | + (hasFooter ? FOOTER_FLAG : 0) | + (isFileIdLong ? HAS_LONG_FILEID_FLAG : 0) | + (isFileIdWritable ? 
HAS_SYNTHETIC_FILEID_FLAG : 0); + out.writeByte(flags); + out.writeInt(deltas.size()); + for (AcidInputFormat.DeltaMetaData delta : deltas) { + delta.write(out); + } + if (hasFooter) { + OrcProto.FileTail fileTail = orcTail.getMinimalFileTail(); + byte[] tailBuffer = fileTail.toByteArray(); + int tailLen = tailBuffer.length; + WritableUtils.writeVInt(out, tailLen); + out.write(tailBuffer); + } + if (isFileIdLong) { + out.writeLong(((Long) fileKey).longValue()); + } else if (isFileIdWritable) { + ((Writable) fileKey).write(out); + } + out.writeLong(fileLen); + out.writeUTF(rootDir.toString()); + } + + @Override + public void readFields(DataInput in) throws IOException { + // deserialize path, offset, length using FileSplit + super.readFields(in); + + byte flags = in.readByte(); + hasFooter = (FOOTER_FLAG & flags) != 0; + isOriginal = (ORIGINAL_FLAG & flags) != 0; + hasBase = (BASE_FLAG & flags) != 0; + boolean hasLongFileId = (HAS_LONG_FILEID_FLAG & flags) != 0; + boolean hasWritableFileId = (HAS_SYNTHETIC_FILEID_FLAG & flags) != 0; + if (hasLongFileId && hasWritableFileId) { + throw new IOException("Invalid split - both file ID types present"); + } + + deltas.clear(); + int numDeltas = in.readInt(); + for (int i = 0; i < numDeltas; i++) { + AcidInputFormat.DeltaMetaData dmd = new AcidInputFormat.DeltaMetaData(); + dmd.readFields(in); + deltas.add(dmd); + } + if (hasFooter) { + int tailLen = WritableUtils.readVInt(in); + byte[] tailBuffer = new byte[tailLen]; + in.readFully(tailBuffer); + OrcProto.FileTail fileTail = OrcProto.FileTail.parseFrom(tailBuffer); + orcTail = new OrcTail(fileTail, null); + } + if (hasLongFileId) { + fileKey = in.readLong(); + } else if (hasWritableFileId) { + SyntheticFileId fileId = new SyntheticFileId(); + fileId.readFields(in); + this.fileKey = fileId; + } + fileLen = in.readLong(); + rootDir = new Path(in.readUTF()); + } + + public OrcTail getOrcTail() { + return orcTail; + } + + public boolean hasFooter() { + return hasFooter; + } + + /** + * @return {@code true} if file schema doesn't have Acid metadata columns + * Such file may be in a delta_x_y/ or base_x due to being added via + * "load data" command. It could be at partition|table root due to table having + * been converted from non-acid to acid table. It could even be something like + * "warehouse/t/HIVE_UNION_SUBDIR_15/000000_0" if it was written by an + * "insert into t select ... from A union all select ... from B" + */ + public boolean isOriginal() { + return isOriginal; + } + + public boolean hasBase() { + return hasBase; + } + + public Path getRootDir() { + return rootDir; + } + + public List getDeltas() { + return deltas; + } + + public long getFileLength() { + return fileLen; + } + + /** + * If this method returns true, then for sure it is ACID. + * However, if it returns false.. it could be ACID or non-ACID. 
+ * + * @return true if is ACID + */ + public boolean isAcid() { + return hasBase || deltas.size() > 0; + } + + public long getProjectedColumnsUncompressedSize() { + return projColsUncompressedSize; + } + + public Object getFileKey() { + return fileKey; + } + + @Override + public long getColumnarProjectionSize() { + return projColsUncompressedSize; + } + + @Override + public boolean canUseLlapIo(Configuration conf) { + final boolean hasDelta = deltas != null && !deltas.isEmpty(); + final boolean isAcidRead = AcidUtils.isFullAcidScan(conf); + final boolean isVectorized = HiveConf.getBoolVar(conf, ConfVars.HIVE_VECTORIZATION_ENABLED); + Boolean isSplitUpdate = null; + if (isAcidRead) { + final AcidUtils.AcidOperationalProperties acidOperationalProperties + = AcidUtils.getAcidOperationalProperties(conf); + isSplitUpdate = acidOperationalProperties.isSplitUpdate(); + } + + if (isOriginal) { + if (!isAcidRead && !hasDelta) { + // Original scan only + return true; + } + } else { + boolean isAcidEnabled = HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_ACID_ENABLED); + if (isAcidEnabled && isAcidRead && hasBase && isVectorized) { + if (hasDelta) { + if (isSplitUpdate) { // Base with delete deltas + return true; + } + } else { + // Base scan only + return true; + } + } + } + return false; + } + + @Override + public String toString() { + return "OrcSplit [" + getPath() + ", start=" + getStart() + ", length=" + getLength() + + ", isOriginal=" + isOriginal + ", fileLength=" + fileLen + ", hasFooter=" + hasFooter + + ", hasBase=" + hasBase + ", deltas=" + (deltas == null ? 0 : deltas.size()) + "]"; + } +} diff --git a/mr/src/main/java/org/apache/iceberg/mr/Catalogs.java b/mr/src/main/java/org/apache/iceberg/mr/Catalogs.java index b5cc63b42955..0bf04731124b 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/Catalogs.java +++ b/mr/src/main/java/org/apache/iceberg/mr/Catalogs.java @@ -19,7 +19,6 @@ package org.apache.iceberg.mr; -import java.util.HashMap; import java.util.Map; import java.util.Optional; import java.util.Properties; @@ -39,6 +38,7 @@ import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Streams; /** @@ -150,7 +150,7 @@ public static Table createTable(Configuration conf, Properties props) { String catalogName = props.getProperty(InputFormatConfig.CATALOG_NAME); // Create a table property map without the controlling properties - Map map = new HashMap<>(props.size()); + Map map = Maps.newHashMapWithExpectedSize(props.size()); for (Object key : props.keySet()) { if (!PROPERTIES_TO_REMOVE.contains(key)) { map.put(key.toString(), props.get(key).toString()); @@ -202,7 +202,15 @@ public static boolean dropTable(Configuration conf, Properties props) { */ public static boolean hiveCatalog(Configuration conf, Properties props) { String catalogName = props.getProperty(InputFormatConfig.CATALOG_NAME); - return CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE.equalsIgnoreCase(getCatalogType(conf, catalogName)); + String catalogType = getCatalogType(conf, catalogName); + if (catalogType != null) { + return CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE.equalsIgnoreCase(catalogType); + } + catalogType = getCatalogType(conf, ICEBERG_DEFAULT_CATALOG_NAME); + if (catalogType != null) { + return 
CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE.equalsIgnoreCase(catalogType); + } + return getCatalogProperties(conf, catalogName, catalogType).get(CatalogProperties.CATALOG_IMPL) == null; } @VisibleForTesting @@ -279,9 +287,7 @@ private static String getCatalogType(Configuration conf, String catalogName) { } } else { String catalogType = conf.get(InputFormatConfig.CATALOG); - if (catalogType == null) { - return CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE; - } else if (catalogType.equals(LOCATION)) { + if (catalogType != null && catalogType.equals(LOCATION)) { return NO_CATALOG_TYPE; } else { return catalogType; diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/Deserializer.java b/mr/src/main/java/org/apache/iceberg/mr/hive/Deserializer.java index 458affdd7c60..47e9f3e0537d 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/hive/Deserializer.java +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/Deserializer.java @@ -33,6 +33,7 @@ import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; import org.apache.iceberg.mr.hive.serde.objectinspector.WriteObjectInspector; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.schema.SchemaWithPartnerVisitor; import org.apache.iceberg.types.Type.PrimitiveType; import org.apache.iceberg.types.Types.ListType; @@ -232,7 +233,7 @@ private static class FixNameMappingObjectInspectorPair extends ObjectInspectorPa FixNameMappingObjectInspectorPair(Schema schema, ObjectInspectorPair pair) { super(pair.writerInspector(), pair.sourceInspector()); - this.sourceNameMap = new HashMap<>(schema.columns().size()); + this.sourceNameMap = Maps.newHashMapWithExpectedSize(schema.columns().size()); List fields = ((StructObjectInspector) sourceInspector()).getAllStructFieldRefs(); for (int i = 0; i < schema.columns().size(); ++i) { diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java index 707db6f808ec..ce425c6d10a4 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java @@ -22,7 +22,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; @@ -46,6 +45,7 @@ import org.apache.iceberg.mr.InputFormatConfig; import org.apache.iceberg.mr.hive.serde.objectinspector.IcebergObjectInspector; import org.apache.iceberg.mr.mapred.Container; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,7 +55,7 @@ public class HiveIcebergSerDe extends AbstractSerDe { private ObjectInspector inspector; private Schema tableSchema; - private Map deserializers = new HashMap<>(1); + private Map deserializers = Maps.newHashMapWithExpectedSize(1); private Container row = new Container<>(); @Override diff --git a/mr/src/test/java/org/apache/iceberg/mr/TestCatalogs.java b/mr/src/test/java/org/apache/iceberg/mr/TestCatalogs.java index 0d5b1d63bb23..4d4a87f03c54 100644 --- a/mr/src/test/java/org/apache/iceberg/mr/TestCatalogs.java +++ b/mr/src/test/java/org/apache/iceberg/mr/TestCatalogs.java @@ -197,6 +197,7 @@ public void testLegacyLoadCatalogDefault() { Optional defaultCatalog = Catalogs.loadCatalog(conf, null); Assert.assertTrue(defaultCatalog.isPresent()); Assertions.assertThat(defaultCatalog.get()).isInstanceOf(HiveCatalog.class); + 
Assert.assertTrue(Catalogs.hiveCatalog(conf, new Properties())); } @Test @@ -205,6 +206,7 @@ public void testLegacyLoadCatalogHive() { Optional hiveCatalog = Catalogs.loadCatalog(conf, null); Assert.assertTrue(hiveCatalog.isPresent()); Assertions.assertThat(hiveCatalog.get()).isInstanceOf(HiveCatalog.class); + Assert.assertTrue(Catalogs.hiveCatalog(conf, new Properties())); } @Test @@ -214,6 +216,7 @@ public void testLegacyLoadCatalogHadoop() { Optional hadoopCatalog = Catalogs.loadCatalog(conf, null); Assert.assertTrue(hadoopCatalog.isPresent()); Assertions.assertThat(hadoopCatalog.get()).isInstanceOf(HadoopCatalog.class); + Assert.assertFalse(Catalogs.hiveCatalog(conf, new Properties())); } @Test @@ -223,12 +226,14 @@ public void testLegacyLoadCatalogCustom() { Optional customHadoopCatalog = Catalogs.loadCatalog(conf, null); Assert.assertTrue(customHadoopCatalog.isPresent()); Assertions.assertThat(customHadoopCatalog.get()).isInstanceOf(CustomHadoopCatalog.class); + Assert.assertFalse(Catalogs.hiveCatalog(conf, new Properties())); } @Test public void testLegacyLoadCatalogLocation() { conf.set(InputFormatConfig.CATALOG, Catalogs.LOCATION); Assert.assertFalse(Catalogs.loadCatalog(conf, null).isPresent()); + Assert.assertFalse(Catalogs.hiveCatalog(conf, new Properties())); } @Test @@ -241,9 +246,13 @@ public void testLegacyLoadCatalogUnknown() { @Test public void testLoadCatalogDefault() { - Optional defaultCatalog = Catalogs.loadCatalog(conf, "barCatalog"); + String catalogName = "barCatalog"; + Optional defaultCatalog = Catalogs.loadCatalog(conf, catalogName); Assert.assertTrue(defaultCatalog.isPresent()); Assertions.assertThat(defaultCatalog.get()).isInstanceOf(HiveCatalog.class); + Properties properties = new Properties(); + properties.put(InputFormatConfig.CATALOG_NAME, catalogName); + Assert.assertTrue(Catalogs.hiveCatalog(conf, properties)); } @Test @@ -254,6 +263,20 @@ public void testLoadCatalogHive() { Optional hiveCatalog = Catalogs.loadCatalog(conf, catalogName); Assert.assertTrue(hiveCatalog.isPresent()); Assertions.assertThat(hiveCatalog.get()).isInstanceOf(HiveCatalog.class); + Properties properties = new Properties(); + properties.put(InputFormatConfig.CATALOG_NAME, catalogName); + Assert.assertTrue(Catalogs.hiveCatalog(conf, properties)); + } + + @Test + public void testLegacyLoadCustomCatalogWithHiveCatalogTypeSet() { + String catalogName = "barCatalog"; + conf.set(InputFormatConfig.catalogPropertyConfigKey(catalogName, CatalogUtil.ICEBERG_CATALOG_TYPE), + CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE); + conf.set(InputFormatConfig.CATALOG_LOADER_CLASS, CustomHadoopCatalog.class.getName()); + conf.set(InputFormatConfig.HADOOP_CATALOG_WAREHOUSE_LOCATION, "/tmp/mylocation"); + AssertHelpers.assertThrows("Should complain about both configs being set", IllegalArgumentException.class, + "both type and catalog-impl are set", () -> Catalogs.loadCatalog(conf, catalogName)); } @Test @@ -267,6 +290,9 @@ public void testLoadCatalogHadoop() { Assert.assertTrue(hadoopCatalog.isPresent()); Assertions.assertThat(hadoopCatalog.get()).isInstanceOf(HadoopCatalog.class); Assert.assertEquals("HadoopCatalog{name=barCatalog, location=/tmp/mylocation}", hadoopCatalog.get().toString()); + Properties properties = new Properties(); + properties.put(InputFormatConfig.CATALOG_NAME, catalogName); + Assert.assertFalse(Catalogs.hiveCatalog(conf, properties)); } @Test @@ -279,6 +305,9 @@ public void testLoadCatalogHadoopWithLegacyWarehouseLocation() { Assert.assertTrue(hadoopCatalog.isPresent()); 
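These assertions exercise the reworked `Catalogs.hiveCatalog` resolution: it first consults the named catalog's `type`, then the default catalog, and only then treats a missing `catalog-impl` as Hive. A minimal sketch of probing that behavior follows; the catalog name and configuration keys mirror the calls in the tests above, and `"sales"` is purely illustrative.

```java
// Minimal sketch, assuming the same Hadoop configuration keys the tests above use;
// "sales" is an illustrative catalog name, not one defined by the project.
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.mr.InputFormatConfig;

class HiveCatalogProbe {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // registers the catalog type under iceberg.catalog.sales.type
    conf.set(InputFormatConfig.catalogPropertyConfigKey("sales", CatalogUtil.ICEBERG_CATALOG_TYPE),
        CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);

    Properties props = new Properties();
    props.put(InputFormatConfig.CATALOG_NAME, "sales");

    // prints false: the named catalog's type wins before any default-catalog
    // or "no catalog-impl configured" fallback kicks in
    System.out.println(Catalogs.hiveCatalog(conf, props));
  }
}
```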
Assertions.assertThat(hadoopCatalog.get()).isInstanceOf(HadoopCatalog.class); Assert.assertEquals("HadoopCatalog{name=barCatalog, location=/tmp/mylocation}", hadoopCatalog.get().toString()); + Properties properties = new Properties(); + properties.put(InputFormatConfig.CATALOG_NAME, catalogName); + Assert.assertFalse(Catalogs.hiveCatalog(conf, properties)); } @Test @@ -291,6 +320,9 @@ public void testLoadCatalogCustom() { Optional customHadoopCatalog = Catalogs.loadCatalog(conf, catalogName); Assert.assertTrue(customHadoopCatalog.isPresent()); Assertions.assertThat(customHadoopCatalog.get()).isInstanceOf(CustomHadoopCatalog.class); + Properties properties = new Properties(); + properties.put(InputFormatConfig.CATALOG_NAME, catalogName); + Assert.assertFalse(Catalogs.hiveCatalog(conf, properties)); } @Test diff --git a/mr/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes.java b/mr/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes.java index ef964f8fb92e..2ba4e50e8aa1 100644 --- a/mr/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes.java +++ b/mr/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes.java @@ -32,6 +32,7 @@ import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableOperations; import org.apache.iceberg.data.DeleteReadTests; +import org.apache.iceberg.data.InternalRecordWrapper; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.util.StructLikeSet; import org.junit.Assert; @@ -104,6 +105,7 @@ public StructLikeSet rowSet(String name, Table table, String... columns) { .filter(recordFactory -> recordFactory.name().equals(inputFormat)) .map(recordFactory -> recordFactory.create(builder.project(projected).conf()).getRecords()) .flatMap(List::stream) + .map(record -> new InternalRecordWrapper(projected.asStruct()).wrap(record)) .collect(Collectors.toList()) ); diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/PruneColumns.java b/parquet/src/main/java/org/apache/iceberg/parquet/PruneColumns.java index acdda78c680b..f181875ad3ac 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/PruneColumns.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/PruneColumns.java @@ -104,12 +104,7 @@ public Type list(GroupType list, Type element) { return list; } else if (element != null) { if (!Objects.equal(element, originalElement)) { - Integer listId = getId(list); - // the element type was projected - Type listType = Types.list(list.getRepetition()) - .element(element) - .named(list.getName()); - return listId == null ? listType : listType.withId(listId); + return list.withNewFields(repeated.withNewFields(element)); } return list; } @@ -129,14 +124,8 @@ public Type map(GroupType map, Type key, Type value) { if ((keyId != null && selectedIds.contains(keyId)) || (valueId != null && selectedIds.contains(valueId))) { return map; } else if (value != null) { - Integer mapId = getId(map); if (!Objects.equal(value, originalValue)) { - Type mapType = Types.map(map.getRepetition()) - .key(originalKey) - .value(value) - .named(map.getName()); - - return mapId == null ? 
mapType : mapType.withId(mapId); + return map.withNewFields(repeated.withNewFields(originalKey, value)); } return map; } diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestPruneColumns.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestPruneColumns.java new file mode 100644 index 000000000000..dfa7e64a4758 --- /dev/null +++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestPruneColumns.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.parquet; + +import org.apache.iceberg.Schema; +import org.apache.iceberg.types.Types.DoubleType; +import org.apache.iceberg.types.Types.ListType; +import org.apache.iceberg.types.Types.MapType; +import org.apache.iceberg.types.Types.NestedField; +import org.apache.iceberg.types.Types.StringType; +import org.apache.iceberg.types.Types.StructType; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.Types; +import org.junit.Assert; +import org.junit.Test; + +public class TestPruneColumns { + @Test + public void testMapKeyValueName() { + MessageType fileSchema = Types.buildMessage() + .addField(Types.buildGroup(Type.Repetition.OPTIONAL) + .addField(Types.buildGroup(Type.Repetition.REPEATED) + .addField(Types.primitive(PrimitiveTypeName.BINARY, Type.Repetition.REQUIRED) + .as(LogicalTypeAnnotation.stringType()) + .id(2) + .named("key")) + .addField(Types.buildGroup(Type.Repetition.OPTIONAL) + .addField(Types.primitive(PrimitiveTypeName.DOUBLE, Type.Repetition.REQUIRED).id(4).named("x")) + .addField(Types.primitive(PrimitiveTypeName.DOUBLE, Type.Repetition.REQUIRED).id(5).named("y")) + .addField(Types.primitive(PrimitiveTypeName.DOUBLE, Type.Repetition.REQUIRED).id(6).named("z")) + .id(3) + .named("value")) + .named("custom_key_value_name")) + .as(LogicalTypeAnnotation.mapType()) + .id(1) + .named("m")) + .named("table"); + + // project map.value.x and map.value.y + Schema projection = new Schema( + NestedField.optional(1, "m", MapType.ofOptional(2, 3, + StringType.get(), + StructType.of( + NestedField.required(4, "x", DoubleType.get()), + NestedField.required(5, "y", DoubleType.get()) + ) + )) + ); + + MessageType expected = Types.buildMessage() + .addField(Types.buildGroup(Type.Repetition.OPTIONAL) + .addField(Types.buildGroup(Type.Repetition.REPEATED) + .addField(Types.primitive(PrimitiveTypeName.BINARY, Type.Repetition.REQUIRED) + .as(LogicalTypeAnnotation.stringType()) + .id(2) + .named("key")) + .addField(Types.buildGroup(Type.Repetition.OPTIONAL) + .addField(Types.primitive(PrimitiveTypeName.DOUBLE, 
Type.Repetition.REQUIRED).id(4).named("x")) + .addField(Types.primitive(PrimitiveTypeName.DOUBLE, Type.Repetition.REQUIRED).id(5).named("y")) + .id(3) + .named("value")) + .named("custom_key_value_name")) + .as(LogicalTypeAnnotation.mapType()) + .id(1) + .named("m")) + .named("table"); + + MessageType actual = ParquetSchemaUtil.pruneColumns(fileSchema, projection); + Assert.assertEquals("Pruned schema should not rename repeated struct", expected, actual); + } + + @Test + public void testListElementName() { + MessageType fileSchema = Types.buildMessage() + .addField(Types.buildGroup(Type.Repetition.OPTIONAL) + .addField(Types.buildGroup(Type.Repetition.REPEATED) + .addField(Types.buildGroup(Type.Repetition.OPTIONAL) + .addField(Types.primitive(PrimitiveTypeName.DOUBLE, Type.Repetition.REQUIRED).id(4).named("x")) + .addField(Types.primitive(PrimitiveTypeName.DOUBLE, Type.Repetition.REQUIRED).id(5).named("y")) + .addField(Types.primitive(PrimitiveTypeName.DOUBLE, Type.Repetition.REQUIRED).id(6).named("z")) + .id(3) + .named("custom_element_name")) + .named("custom_repeated_name")) + .as(LogicalTypeAnnotation.listType()) + .id(1) + .named("m")) + .named("table"); + + // project map.value.x and map.value.y + Schema projection = new Schema( + NestedField.optional(1, "m", ListType.ofOptional(3, + StructType.of( + NestedField.required(4, "x", DoubleType.get()), + NestedField.required(5, "y", DoubleType.get()) + ) + )) + ); + + MessageType expected = Types.buildMessage() + .addField(Types.buildGroup(Type.Repetition.OPTIONAL) + .addField(Types.buildGroup(Type.Repetition.REPEATED) + .addField(Types.buildGroup(Type.Repetition.OPTIONAL) + .addField(Types.primitive(PrimitiveTypeName.DOUBLE, Type.Repetition.REQUIRED).id(4).named("x")) + .addField(Types.primitive(PrimitiveTypeName.DOUBLE, Type.Repetition.REQUIRED).id(5).named("y")) + .id(3) + .named("custom_element_name")) + .named("custom_repeated_name")) + .as(LogicalTypeAnnotation.listType()) + .id(1) + .named("m")) + .named("table"); + + MessageType actual = ParquetSchemaUtil.pruneColumns(fileSchema, projection); + Assert.assertEquals("Pruned schema should not rename repeated struct", expected, actual); + } +} diff --git a/site/docs/hive.md b/site/docs/hive.md index b189e347a993..8b59900c5976 100644 --- a/site/docs/hive.md +++ b/site/docs/hive.md @@ -98,8 +98,8 @@ To globally register different catalogs, set the following Hadoop configurations | Config Key | Description | | --------------------------------------------- | ------------------------------------------------------ | -| iceberg.catalog..type | type of catalog: `hive` or `hadoop` | -| iceberg.catalog..catalog-impl | catalog implementation, must not be null if type is null | +| iceberg.catalog..type | type of catalog: `hive`, `hadoop`, or left unset if using a custom catalog | +| iceberg.catalog..catalog-impl | catalog implementation, must not be null if type is empty | | iceberg.catalog.. | any config key and value pairs for the catalog | Here are some examples using Hive CLI: diff --git a/site/docs/spark-configuration.md b/site/docs/spark-configuration.md index 93a87597b38a..67befeab574e 100644 --- a/site/docs/spark-configuration.md +++ b/site/docs/spark-configuration.md @@ -54,8 +54,8 @@ Both catalogs are configured using properties nested under the catalog name. 
diff --git a/site/docs/spark-configuration.md b/site/docs/spark-configuration.md
index 93a87597b38a..67befeab574e 100644
--- a/site/docs/spark-configuration.md
+++ b/site/docs/spark-configuration.md
@@ -54,8 +54,8 @@ Both catalogs are configured using properties nested under the catalog name. Com
 
 | Property                                            | Values                        | Description                                                            |
 | -------------------------------------------------- | ----------------------------- | -------------------------------------------------------------------- |
-| spark.sql.catalog._catalog-name_.type               | `hive` or `hadoop`            | The underlying Iceberg catalog implementation, `HiveCatalog` or `HadoopCatalog` |
-| spark.sql.catalog._catalog-name_.catalog-impl       |                               | The underlying Iceberg catalog implementation. When set, the value of `type` property is ignored |
+| spark.sql.catalog._catalog-name_.type               | `hive` or `hadoop`            | The underlying Iceberg catalog implementation, `HiveCatalog`, `HadoopCatalog` or left unset if using a custom catalog |
+| spark.sql.catalog._catalog-name_.catalog-impl       |                               | The underlying Iceberg catalog implementation.|
 | spark.sql.catalog._catalog-name_.default-namespace  | default                       | The default current namespace for the catalog                          |
 | spark.sql.catalog._catalog-name_.uri                | thrift://host:port            | Metastore connect URI; default from `hive-site.xml`                    |
 | spark.sql.catalog._catalog-name_.warehouse          | hdfs://nn:8020/warehouse/path | Base path for the warehouse directory                                  |
@@ -104,8 +104,7 @@ spark.sql.catalog.hadoop_prod.hadoop.fs.s3a.endpoint = http://aws-local:9000
 
 ### Loading a custom catalog
 
-Spark supports loading a custom Iceberg `Catalog` implementation by specifying the `catalog-impl` property.
-When `catalog-impl` is set, the value of `type` is ignored. Here is an example:
+Spark supports loading a custom Iceberg `Catalog` implementation by specifying the `catalog-impl` property. Here is an example:
 
 ```plain
 spark.sql.catalog.custom_prod = org.apache.iceberg.spark.SparkCatalog
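With `type` now simply left unset for custom catalogs, the configuration shown in the docs can also be supplied when building the Spark session. A minimal sketch, assuming a hypothetical implementation class `com.example.MyCatalog` (not part of this patch):

```java
import org.apache.spark.sql.SparkSession;

public class CustomCatalogSession {
  public static void main(String[] args) {
    // Sketch only: configure the "custom_prod" catalog from the docs via session properties.
    // com.example.MyCatalog is a placeholder for a real Catalog implementation.
    SparkSession spark = SparkSession.builder()
        .master("local[*]")
        .appName("iceberg-custom-catalog")
        .config("spark.sql.catalog.custom_prod", "org.apache.iceberg.spark.SparkCatalog")
        .config("spark.sql.catalog.custom_prod.catalog-impl", "com.example.MyCatalog")
        // no spark.sql.catalog.custom_prod.type property is needed
        .getOrCreate();

    spark.sql("SHOW NAMESPACES IN custom_prod").show();
    spark.stop();
  }
}
```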
diff --git a/spark/src/main/java/org/apache/iceberg/spark/IcebergSpark.java b/spark/src/main/java/org/apache/iceberg/spark/IcebergSpark.java
index ac659f6c7b13..862626d0cd6d 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/IcebergSpark.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/IcebergSpark.java
@@ -34,6 +34,7 @@ public static void registerBucketUDF(SparkSession session, String funcName, Data
     SparkTypeToType typeConverter = new SparkTypeToType();
     Type sourceIcebergType = typeConverter.atomic(sourceType);
     Transform<Object, Integer> bucket = Transforms.bucket(sourceIcebergType, numBuckets);
-    session.udf().register(funcName, bucket::apply, DataTypes.IntegerType);
+    session.udf().register(funcName,
+        value -> bucket.apply(SparkValueConverter.convert(sourceIcebergType, value)), DataTypes.IntegerType);
   }
 }
diff --git a/spark/src/main/java/org/apache/iceberg/spark/SparkValueConverter.java b/spark/src/main/java/org/apache/iceberg/spark/SparkValueConverter.java
index 92c812a9b979..ef453c0cef2b 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/SparkValueConverter.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/SparkValueConverter.java
@@ -79,8 +79,9 @@ public static Object convert(Type type, Object object) {
         return DateTimeUtils.fromJavaTimestamp((Timestamp) object);
       case BINARY:
         return ByteBuffer.wrap((byte[]) object);
-      case BOOLEAN:
       case INTEGER:
+        return ((Number) object).intValue();
+      case BOOLEAN:
       case LONG:
       case FLOAT:
       case DOUBLE:
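The new `INTEGER` branch in `SparkValueConverter` is what lets the bucket UDF accept Spark `SHORT` and `BYTE` inputs: Spark hands the UDF a boxed `Number`, and the converter widens it to the `Integer` representation that Iceberg's integer bucket transform expects, as the tests below exercise. A minimal standalone sketch of that flow (not taken from the patch):

```java
import org.apache.iceberg.spark.SparkValueConverter;
import org.apache.iceberg.transforms.Transform;
import org.apache.iceberg.transforms.Transforms;
import org.apache.iceberg.types.Types;

public class BucketConversionSketch {
  public static void main(String[] args) {
    // Sketch only: what the registered UDF does for a Spark SHORT column value.
    Transform<Object, Integer> bucket = Transforms.bucket(Types.IntegerType.get(), 16);

    // Spark hands the UDF a boxed Short; the converter widens it to an Integer.
    Object icebergValue = SparkValueConverter.convert(Types.IntegerType.get(), (short) 1);

    // Same bucket as Transforms.bucket(Types.IntegerType.get(), 16).apply(1)
    System.out.println(bucket.apply(icebergValue));
  }
}
```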
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSpark.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSpark.java
index 14785d7f27ca..f8ebe21b58a4 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSpark.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSpark.java
@@ -19,13 +19,22 @@ package org.apache.iceberg.spark.source;
 
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.Timestamp;
 import java.util.List;
 import org.apache.iceberg.spark.IcebergSpark;
 import org.apache.iceberg.transforms.Transforms;
 import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.util.DateTimeUtils;
+import org.apache.spark.sql.types.CharType;
 import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.VarcharType;
+import org.assertj.core.api.Assertions;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -48,23 +57,132 @@ public static void stopSpark() {
   }
 
   @Test
-  public void testRegisterBucketUDF() {
+  public void testRegisterIntegerBucketUDF() {
     IcebergSpark.registerBucketUDF(spark, "iceberg_bucket_int_16", DataTypes.IntegerType, 16);
     List<Row> results = spark.sql("SELECT iceberg_bucket_int_16(1)").collectAsList();
     Assert.assertEquals(1, results.size());
     Assert.assertEquals((int) Transforms.bucket(Types.IntegerType.get(), 16).apply(1), results.get(0).getInt(0));
+  }
+
+  @Test
+  public void testRegisterShortBucketUDF() {
+    IcebergSpark.registerBucketUDF(spark, "iceberg_bucket_short_16", DataTypes.ShortType, 16);
+    List<Row> results = spark.sql("SELECT iceberg_bucket_short_16(1S)").collectAsList();
+    Assert.assertEquals(1, results.size());
+    Assert.assertEquals((int) Transforms.bucket(Types.IntegerType.get(), 16).apply(1),
+        results.get(0).getInt(0));
+  }
+
+  @Test
+  public void testRegisterByteBucketUDF() {
+    IcebergSpark.registerBucketUDF(spark, "iceberg_bucket_byte_16", DataTypes.ByteType, 16);
+    List<Row> results = spark.sql("SELECT iceberg_bucket_byte_16(1Y)").collectAsList();
+    Assert.assertEquals(1, results.size());
+    Assert.assertEquals((int) Transforms.bucket(Types.IntegerType.get(), 16).apply(1),
+        results.get(0).getInt(0));
+  }
 
+  @Test
+  public void testRegisterLongBucketUDF() {
     IcebergSpark.registerBucketUDF(spark, "iceberg_bucket_long_16", DataTypes.LongType, 16);
-    List<Row> results2 = spark.sql("SELECT iceberg_bucket_long_16(1L)").collectAsList();
-    Assert.assertEquals(1, results2.size());
+    List<Row> results = spark.sql("SELECT iceberg_bucket_long_16(1L)").collectAsList();
+    Assert.assertEquals(1, results.size());
     Assert.assertEquals((int) Transforms.bucket(Types.LongType.get(), 16).apply(1L),
-        results2.get(0).getInt(0));
+        results.get(0).getInt(0));
+  }
 
+  @Test
+  public void testRegisterStringBucketUDF() {
     IcebergSpark.registerBucketUDF(spark, "iceberg_bucket_string_16", DataTypes.StringType, 16);
-    List<Row> results3 = spark.sql("SELECT iceberg_bucket_string_16('hello')").collectAsList();
-    Assert.assertEquals(1, results3.size());
+    List<Row> results = spark.sql("SELECT iceberg_bucket_string_16('hello')").collectAsList();
+    Assert.assertEquals(1, results.size());
+    Assert.assertEquals((int) Transforms.bucket(Types.StringType.get(), 16).apply("hello"),
+        results.get(0).getInt(0));
+  }
+
+  @Test
+  public void testRegisterCharBucketUDF() {
+    IcebergSpark.registerBucketUDF(spark, "iceberg_bucket_char_16", new CharType(5), 16);
+    List<Row> results = spark.sql("SELECT iceberg_bucket_char_16('hello')").collectAsList();
+    Assert.assertEquals(1, results.size());
+    Assert.assertEquals((int) Transforms.bucket(Types.StringType.get(), 16).apply("hello"),
+        results.get(0).getInt(0));
+  }
+
+  @Test
+  public void testRegisterVarCharBucketUDF() {
+    IcebergSpark.registerBucketUDF(spark, "iceberg_bucket_varchar_16", new VarcharType(5), 16);
+    List<Row> results = spark.sql("SELECT iceberg_bucket_varchar_16('hello')").collectAsList();
+    Assert.assertEquals(1, results.size());
     Assert.assertEquals((int) Transforms.bucket(Types.StringType.get(), 16).apply("hello"),
-        results3.get(0).getInt(0));
+        results.get(0).getInt(0));
+  }
+
+  @Test
+  public void testRegisterDateBucketUDF() {
+    IcebergSpark.registerBucketUDF(spark, "iceberg_bucket_date_16", DataTypes.DateType, 16);
+    List<Row> results = spark.sql("SELECT iceberg_bucket_date_16(DATE '2021-06-30')").collectAsList();
+    Assert.assertEquals(1, results.size());
+    Assert.assertEquals((int) Transforms.bucket(Types.DateType.get(), 16)
+            .apply(DateTimeUtils.fromJavaDate(Date.valueOf("2021-06-30"))),
+        results.get(0).getInt(0));
+  }
+
+  @Test
+  public void testRegisterTimestampBucketUDF() {
+    IcebergSpark.registerBucketUDF(spark, "iceberg_bucket_timestamp_16", DataTypes.TimestampType, 16);
+    List<Row> results =
+        spark.sql("SELECT iceberg_bucket_timestamp_16(TIMESTAMP '2021-06-30 00:00:00.000')").collectAsList();
+    Assert.assertEquals(1, results.size());
+    Assert.assertEquals((int) Transforms.bucket(Types.TimestampType.withZone(), 16)
+            .apply(DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2021-06-30 00:00:00.000"))),
+        results.get(0).getInt(0));
+  }
+
+  @Test
+  public void testRegisterBinaryBucketUDF() {
+    IcebergSpark.registerBucketUDF(spark, "iceberg_bucket_binary_16", DataTypes.BinaryType, 16);
+    List<Row> results =
+        spark.sql("SELECT iceberg_bucket_binary_16(X'0020001F')").collectAsList();
+    Assert.assertEquals(1, results.size());
+    Assert.assertEquals((int) Transforms.bucket(Types.BinaryType.get(), 16)
+            .apply(ByteBuffer.wrap(new byte[]{0x00, 0x20, 0x00, 0x1F})),
+        results.get(0).getInt(0));
+  }
+
+  @Test
+  public void testRegisterDecimalBucketUDF() {
+    IcebergSpark.registerBucketUDF(spark, "iceberg_bucket_decimal_16", new DecimalType(4, 2), 16);
+    List<Row> results =
+        spark.sql("SELECT iceberg_bucket_decimal_16(11.11)").collectAsList();
+    Assert.assertEquals(1, results.size());
+    Assert.assertEquals((int) Transforms.bucket(Types.DecimalType.of(4, 2), 16)
+            .apply(new BigDecimal("11.11")),
+        results.get(0).getInt(0));
+  }
+
+  @Test
+  public void testRegisterBooleanBucketUDF() {
+    Assertions.assertThatThrownBy(() ->
+        IcebergSpark.registerBucketUDF(spark, "iceberg_bucket_boolean_16", DataTypes.BooleanType, 16))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Cannot bucket by type: boolean");
+  }
+
+  @Test
+  public void testRegisterDoubleBucketUDF() {
+    Assertions.assertThatThrownBy(() ->
+        IcebergSpark.registerBucketUDF(spark, "iceberg_bucket_double_16", DataTypes.DoubleType, 16))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Cannot bucket by type: double");
+  }
+
+  @Test
+  public void testRegisterFloatBucketUDF() {
+    Assertions.assertThatThrownBy(() ->
+        IcebergSpark.registerBucketUDF(spark, "iceberg_bucket_float_16", DataTypes.FloatType, 16))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Cannot bucket by type: float");
   }
 }
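The SparkWrite changes below replace the deprecated `validateNoConflictingAppends` call with the split validation API: the serializable-isolation path validates both concurrent data and concurrent deletes, while the snapshot-isolation path now validates deletes only. A minimal sketch of how these calls compose on a table's overwrite operation, with placeholder names that are not taken from the patch:

```java
import org.apache.iceberg.OverwriteFiles;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;

public class OverwriteValidationSketch {
  // Sketch only: mirrors the call sequence commitWithSerializableIsolation uses below.
  // The table handle, snapshot id, and filter column are placeholders.
  static void overwriteWithValidation(Table table, long scanSnapshotId) {
    Expression conflictDetectionFilter = Expressions.equal("day", "2021-06-30");

    OverwriteFiles overwrite = table.newOverwrite();
    overwrite.validateFromSnapshot(scanSnapshotId);             // only check changes committed after the scan
    overwrite.conflictDetectionFilter(conflictDetectionFilter); // rows that would conflict with this overwrite
    overwrite.validateNoConflictingData();                      // used by the serializable-isolation path
    overwrite.validateNoConflictingDeletes();                   // also used by the snapshot-isolation path
    // ... add and delete data files here, then:
    overwrite.commit();
  }
}
```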
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index a3da366768de..1ab63ab4d517 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -357,7 +357,9 @@ private void commitWithSerializableIsolation(OverwriteFiles overwriteFiles,
     }
 
     Expression conflictDetectionFilter = conflictDetectionFilter();
-    overwriteFiles.validateNoConflictingAppends(conflictDetectionFilter);
+    overwriteFiles.conflictDetectionFilter(conflictDetectionFilter);
+    overwriteFiles.validateNoConflictingData();
+    overwriteFiles.validateNoConflictingDeletes();
 
     String commitMsg = String.format(
         "overwrite of %d data files with %d new data files, scanSnapshotId: %d, conflictDetectionFilter: %s",
@@ -368,6 +370,15 @@ private void commitWithSnapshotIsolation(OverwriteFiles overwriteFiles,
                                           int numOverwrittenFiles, int numAddedFiles) {
+    Long scanSnapshotId = scan.snapshotId();
+    if (scanSnapshotId != null) {
+      overwriteFiles.validateFromSnapshot(scanSnapshotId);
+    }
+
+    Expression conflictDetectionFilter = conflictDetectionFilter();
+    overwriteFiles.conflictDetectionFilter(conflictDetectionFilter);
+    overwriteFiles.validateNoConflictingDeletes();
+
     String commitMsg = String.format(
         "overwrite of %d data files with %d new data files",
         numOverwrittenFiles, numAddedFiles);