diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestConflictValidation.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestConflictValidation.java
index 6be70bf6a3ae..572217896041 100644
--- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestConflictValidation.java
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestConflictValidation.java
@@ -31,6 +31,7 @@
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.functions;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -56,6 +57,202 @@ public void removeTables() {
     sql("DROP TABLE IF EXISTS %s", tableName);
   }
 
+  @Test
+  public void testOverwriteFilterSerializableIsolation() throws Exception {
+    Table table = validationCatalog.loadTable(tableIdent);
+    long snapshotId = table.currentSnapshot().snapshotId();
+
+    List<SimpleRecord> records = Lists.newArrayList(new SimpleRecord(1, "a"));
+    spark.createDataFrame(records, SimpleRecord.class).writeTo(tableName).append();
+
+    // Validating from previous snapshot finds conflicts
+    Dataset<Row> conflictingDf = spark.createDataFrame(records, SimpleRecord.class);
+    AssertHelpers.assertThrowsCause("Conflicting new data files should throw exception",
+        ValidationException.class,
+        "Found conflicting files that can contain records matching ref(name=\"id\") == 1:",
+        () -> {
+          try {
+            conflictingDf.writeTo(tableName)
+                .option(SparkWriteOptions.VALIDATE_FROM_SNAPSHOT_ID, String.valueOf(snapshotId))
+                .option(SparkWriteOptions.ISOLATION_LEVEL, IsolationLevel.SERIALIZABLE.toString())
+                .overwrite(functions.col("id").equalTo(1));
+          } catch (NoSuchTableException e) {
+            throw new RuntimeException(e);
+          }
+        });
+
+    // Validating from latest snapshot should succeed
+    table.refresh();
+    long newSnapshotId = table.currentSnapshot().snapshotId();
+    conflictingDf.writeTo(tableName)
+        .option(SparkWriteOptions.VALIDATE_FROM_SNAPSHOT_ID, String.valueOf(newSnapshotId))
+        .option(SparkWriteOptions.ISOLATION_LEVEL, IsolationLevel.SERIALIZABLE.toString())
+        .overwrite(functions.col("id").equalTo(1));
+  }
+
+  @Test
+  public void testOverwriteFilterSerializableIsolation2() throws Exception {
+    List<SimpleRecord> records = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(1, "b"));
+    spark.createDataFrame(records, SimpleRecord.class).coalesce(1).writeTo(tableName).append();
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    long snapshotId = table.currentSnapshot().snapshotId();
+
+    // This should add a delete file
+    sql("DELETE FROM %s WHERE id='1' and data='b'", tableName);
+    table.refresh();
+
+    // Validating from previous snapshot finds conflicts
+    List<SimpleRecord> conflictingRecords = Lists.newArrayList(new SimpleRecord(1, "a"));
+    Dataset<Row> conflictingDf = spark.createDataFrame(conflictingRecords, SimpleRecord.class);
+    AssertHelpers.assertThrowsCause("Conflicting new delete files should throw exception",
+        ValidationException.class,
+        "Found new conflicting delete files that can apply to records matching ref(name=\"id\") == 1:",
+        () -> {
+          try {
+            conflictingDf.writeTo(tableName)
+                .option(SparkWriteOptions.VALIDATE_FROM_SNAPSHOT_ID, String.valueOf(snapshotId))
+                .option(SparkWriteOptions.ISOLATION_LEVEL, IsolationLevel.SERIALIZABLE.toString())
+                .overwrite(functions.col("id").equalTo(1));
+          } catch (NoSuchTableException e) {
+            throw new RuntimeException(e);
+          }
+        });
+
+    // Validating from latest snapshot should succeed
+    table.refresh();
+    long newSnapshotId = table.currentSnapshot().snapshotId();
+    conflictingDf.writeTo(tableName)
+        .option(SparkWriteOptions.VALIDATE_FROM_SNAPSHOT_ID, String.valueOf(newSnapshotId))
+        .option(SparkWriteOptions.ISOLATION_LEVEL, IsolationLevel.SERIALIZABLE.toString())
+        .overwrite(functions.col("id").equalTo(1));
+  }
+
+  @Test
+  public void testOverwriteFilterSerializableIsolation3() throws Exception {
+    Table table = validationCatalog.loadTable(tableIdent);
+    long snapshotId = table.currentSnapshot().snapshotId();
+
+    // This should delete a data file
+    sql("DELETE FROM %s WHERE id='1'", tableName);
+    table.refresh();
+
+    // Validating from previous snapshot finds conflicts
+    List<SimpleRecord> conflictingRecords = Lists.newArrayList(new SimpleRecord(1, "a"));
+    Dataset<Row> conflictingDf = spark.createDataFrame(conflictingRecords, SimpleRecord.class);
+    AssertHelpers.assertThrowsCause("Conflicting deleted data files should throw exception",
+        ValidationException.class,
+        "Found conflicting deleted files that can contain records matching ref(name=\"id\") == 1:",
+        () -> {
+          try {
+            conflictingDf.writeTo(tableName)
+                .option(SparkWriteOptions.VALIDATE_FROM_SNAPSHOT_ID, String.valueOf(snapshotId))
+                .option(SparkWriteOptions.ISOLATION_LEVEL, IsolationLevel.SERIALIZABLE.toString())
+                .overwrite(functions.col("id").equalTo(1));
+          } catch (NoSuchTableException e) {
+            throw new RuntimeException(e);
+          }
+        });
+
+    // Validating from latest snapshot should succeed
+    table.refresh();
+    long newSnapshotId = table.currentSnapshot().snapshotId();
+    conflictingDf.writeTo(tableName)
+        .option(SparkWriteOptions.VALIDATE_FROM_SNAPSHOT_ID, String.valueOf(newSnapshotId))
+        .option(SparkWriteOptions.ISOLATION_LEVEL, IsolationLevel.SERIALIZABLE.toString())
+        .overwrite(functions.col("id").equalTo(1));
+  }
+
+  @Test
+  public void testOverwriteFilterNoSnapshotIdValidation() throws Exception {
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    List<SimpleRecord> records = Lists.newArrayList(new SimpleRecord(1, "a"));
+    spark.createDataFrame(records, SimpleRecord.class).writeTo(tableName).append();
+
+    // Validating from no snapshot id defaults to beginning snapshot id and finds conflicts
+    Dataset<Row> conflictingDf = spark.createDataFrame(records, SimpleRecord.class);
+    AssertHelpers.assertThrowsCause("Conflicting new data files should throw exception",
+        ValidationException.class,
+        "Found conflicting files that can contain records matching ref(name=\"id\") == 1:",
+        () -> {
+          try {
+            conflictingDf.writeTo(tableName)
+                .option(SparkWriteOptions.ISOLATION_LEVEL, IsolationLevel.SERIALIZABLE.toString())
+                .overwrite(functions.col("id").equalTo(1));
+          } catch (NoSuchTableException e) {
+            throw new RuntimeException(e);
+          }
+        });
+
+    // Validating from latest snapshot should succeed
+    table.refresh();
+    long newSnapshotId = table.currentSnapshot().snapshotId();
+    conflictingDf.writeTo(tableName)
+        .option(SparkWriteOptions.VALIDATE_FROM_SNAPSHOT_ID, String.valueOf(newSnapshotId))
+        .option(SparkWriteOptions.ISOLATION_LEVEL, IsolationLevel.SERIALIZABLE.toString())
+        .overwrite(functions.col("id").equalTo(1));
+  }
+
+  @Test
+  public void testOverwriteFilterSnapshotIsolation() throws Exception {
+    List<SimpleRecord> records = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(1, "b"));
+    spark.createDataFrame(records, SimpleRecord.class).coalesce(1).writeTo(tableName).append();
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    long snapshotId = table.currentSnapshot().snapshotId();
+
+    // This should add a delete file
+    sql("DELETE FROM %s WHERE id='1' and data='b'", tableName);
+    table.refresh();
+
+    // Validating from previous snapshot finds conflicts
+    List<SimpleRecord> conflictingRecords = Lists.newArrayList(new SimpleRecord(1, "a"));
+    Dataset<Row> conflictingDf = spark.createDataFrame(conflictingRecords, SimpleRecord.class);
+    AssertHelpers.assertThrowsCause("Conflicting new delete files should throw exception",
+        ValidationException.class,
+        "Found new conflicting delete files that can apply to records matching ref(name=\"id\") == 1:",
+        () -> {
+          try {
+            conflictingDf.writeTo(tableName)
+                .option(SparkWriteOptions.VALIDATE_FROM_SNAPSHOT_ID, String.valueOf(snapshotId))
+                .option(SparkWriteOptions.ISOLATION_LEVEL, IsolationLevel.SNAPSHOT.toString())
+                .overwrite(functions.col("id").equalTo(1));
+          } catch (NoSuchTableException e) {
+            throw new RuntimeException(e);
+          }
+        });
+
+    // Validating from latest snapshot should succeed
+    table.refresh();
+    long newSnapshotId = table.currentSnapshot().snapshotId();
+    conflictingDf.writeTo(tableName)
+        .option(SparkWriteOptions.VALIDATE_FROM_SNAPSHOT_ID, String.valueOf(newSnapshotId))
+        .option(SparkWriteOptions.ISOLATION_LEVEL, IsolationLevel.SNAPSHOT.toString())
+        .overwrite(functions.col("id").equalTo(1));
+  }
+
+  @Test
+  public void testOverwriteFilterSnapshotIsolation2() throws Exception {
+    Table table = validationCatalog.loadTable(tableIdent);
+    long snapshotId = table.currentSnapshot().snapshotId();
+
+    List<SimpleRecord> records = Lists.newArrayList(
+        new SimpleRecord(1, "a"));
+    spark.createDataFrame(records, SimpleRecord.class).writeTo(tableName).append();
+
+    // Validation should not fail due to conflicting data file in snapshot isolation mode
+    Dataset<Row> conflictingDf = spark.createDataFrame(records, SimpleRecord.class);
+    conflictingDf.writeTo(tableName)
+        .option(SparkWriteOptions.VALIDATE_FROM_SNAPSHOT_ID, String.valueOf(snapshotId))
+        .option(SparkWriteOptions.ISOLATION_LEVEL, IsolationLevel.SNAPSHOT.toString())
+        .overwrite(functions.col("id").equalTo(1));
+  }
+
   @Test
   public void testOverwritePartitionSerializableIsolation() throws Exception {
     Table table = validationCatalog.loadTable(tableIdent);
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index 4400a61dfaaa..d084a6f66838 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -319,6 +319,21 @@ public void commit(WriterCommitMessage[] messages) {
         overwriteFiles.addFile(file);
       }
+      IsolationLevel isolationLevel = writeConf.isolationLevel();
+      Long validateFromSnapshotId = writeConf.validateFromSnapshotId();
+
+      if (isolationLevel != null && validateFromSnapshotId != null) {
+        overwriteFiles.validateFromSnapshot(validateFromSnapshotId);
+      }
+
+      if (isolationLevel == SERIALIZABLE) {
+        overwriteFiles.validateNoConflictingDeletes();
+        overwriteFiles.validateNoConflictingData();
+
+      } else if (isolationLevel == SNAPSHOT) {
+        overwriteFiles.validateNoConflictingDeletes();
+      }
+
       String commitMsg = String.format("overwrite by filter %s with %d new data files",
           overwriteExpr, numFiles);
       commitOperation(overwriteFiles, commitMsg);
     }
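
Usage note (not part of the change): a minimal sketch of how a job would opt into the validation added above, mirroring the options exercised by the tests. The table name "db.tbl" and the id filter are illustrative; the option keys and isolation levels come from the diff itself.

// Sketch only: assumes an existing Iceberg table registered as "db.tbl".
import org.apache.iceberg.IsolationLevel;
import org.apache.iceberg.Table;
import org.apache.iceberg.spark.SparkWriteOptions;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.functions;

public class OverwriteWithValidation {
  public static void overwriteValidated(Table table, Dataset<Row> df) throws NoSuchTableException {
    // Conflicts are detected against everything committed after this snapshot
    long snapshotId = table.currentSnapshot().snapshotId();

    df.writeTo("db.tbl")
        // start conflict detection at the snapshot this job read, rather than
        // the table's beginning snapshot (the default when the option is absent)
        .option(SparkWriteOptions.VALIDATE_FROM_SNAPSHOT_ID, String.valueOf(snapshotId))
        // SERIALIZABLE rejects both conflicting data files and delete files;
        // SNAPSHOT rejects only conflicting delete files
        .option(SparkWriteOptions.ISOLATION_LEVEL, IsolationLevel.SERIALIZABLE.toString())
        .overwrite(functions.col("id").equalTo(1));
  }
}

If VALIDATE_FROM_SNAPSHOT_ID is omitted, validation falls back to the table's beginning snapshot, which is the behavior testOverwriteFilterNoSnapshotIdValidation exercises.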