@@ -31,6 +31,7 @@
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.functions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -56,6 +57,202 @@ public void removeTables() {
sql("DROP TABLE IF EXISTS %s", tableName);
}

@Test
public void testOverwriteFilterSerializableIsolation() throws Exception {
Table table = validationCatalog.loadTable(tableIdent);
long snapshotId = table.currentSnapshot().snapshotId();

List<SimpleRecord> records = Lists.newArrayList(new SimpleRecord(1, "a"));
spark.createDataFrame(records, SimpleRecord.class).writeTo(tableName).append();

// Validating from previous snapshot finds conflicts
Dataset<Row> conflictingDf = spark.createDataFrame(records, SimpleRecord.class);
AssertHelpers.assertThrowsCause("Conflicting new data files should throw exception",
ValidationException.class,
"Found conflicting files that can contain records matching ref(name=\"id\") == 1:",
() -> {
try {
conflictingDf.writeTo(tableName)
.option(SparkWriteOptions.VALIDATE_FROM_SNAPSHOT_ID, String.valueOf(snapshotId))
.option(SparkWriteOptions.ISOLATION_LEVEL, IsolationLevel.SERIALIZABLE.toString())
.overwrite(functions.col("id").equalTo(1));
} catch (NoSuchTableException e) {
throw new RuntimeException(e);
}
});

// Validating from latest snapshot should succeed
table.refresh();
long newSnapshotId = table.currentSnapshot().snapshotId();
conflictingDf.writeTo(tableName)
.option(SparkWriteOptions.VALIDATE_FROM_SNAPSHOT_ID, String.valueOf(newSnapshotId))
.option(SparkWriteOptions.ISOLATION_LEVEL, IsolationLevel.SERIALIZABLE.toString())
.overwrite(functions.col("id").equalTo(1));
}

@Test
public void testOverwriteFilterSerializableIsolation2() throws Exception {
List<SimpleRecord> records = Lists.newArrayList(
new SimpleRecord(1, "a"),
new SimpleRecord(1, "b"));
spark.createDataFrame(records, SimpleRecord.class).coalesce(1).writeTo(tableName).append();

Table table = validationCatalog.loadTable(tableIdent);
long snapshotId = table.currentSnapshot().snapshotId();

// This should add a delete file
sql("DELETE FROM %s WHERE id='1' and data='b'", tableName);
table.refresh();

// Validating from previous snapshot finds conflicts
List<SimpleRecord> conflictingRecords = Lists.newArrayList(new SimpleRecord(1, "a"));
Dataset<Row> conflictingDf = spark.createDataFrame(conflictingRecords, SimpleRecord.class);
AssertHelpers.assertThrowsCause("Conflicting new delete files should throw exception",
ValidationException.class,
"Found new conflicting delete files that can apply to records matching ref(name=\"id\") == 1:",
() -> {
try {
conflictingDf.writeTo(tableName)
.option(SparkWriteOptions.VALIDATE_FROM_SNAPSHOT_ID, String.valueOf(snapshotId))
.option(SparkWriteOptions.ISOLATION_LEVEL, IsolationLevel.SERIALIZABLE.toString())
.overwrite(functions.col("id").equalTo(1));
} catch (NoSuchTableException e) {
throw new RuntimeException(e);
}
});

// Validating from latest snapshot should succeed
table.refresh();
long newSnapshotId = table.currentSnapshot().snapshotId();
conflictingDf.writeTo(tableName)
.option(SparkWriteOptions.VALIDATE_FROM_SNAPSHOT_ID, String.valueOf(newSnapshotId))
.option(SparkWriteOptions.ISOLATION_LEVEL, IsolationLevel.SERIALIZABLE.toString())
.overwrite(functions.col("id").equalTo(1));
}

@Test
public void testOverwriteFilterSerializableIsolation3() throws Exception {
Table table = validationCatalog.loadTable(tableIdent);
long snapshotId = table.currentSnapshot().snapshotId();

// This should delete a data file
sql("DELETE FROM %s WHERE id='1'", tableName);
table.refresh();

// Validating from previous snapshot finds conflicts
List<SimpleRecord> conflictingRecords = Lists.newArrayList(new SimpleRecord(1, "a"));
Dataset<Row> conflictingDf = spark.createDataFrame(conflictingRecords, SimpleRecord.class);
AssertHelpers.assertThrowsCause("Conflicting deleted data files should throw exception",
ValidationException.class,
"Found conflicting deleted files that can contain records matching ref(name=\"id\") == 1:",
() -> {
try {
conflictingDf.writeTo(tableName)
.option(SparkWriteOptions.VALIDATE_FROM_SNAPSHOT_ID, String.valueOf(snapshotId))
.option(SparkWriteOptions.ISOLATION_LEVEL, IsolationLevel.SERIALIZABLE.toString())
.overwrite(functions.col("id").equalTo(1));
} catch (NoSuchTableException e) {
throw new RuntimeException(e);
}
});

// Validating from latest snapshot should succeed
table.refresh();
long newSnapshotId = table.currentSnapshot().snapshotId();
conflictingDf.writeTo(tableName)
.option(SparkWriteOptions.VALIDATE_FROM_SNAPSHOT_ID, String.valueOf(newSnapshotId))
.option(SparkWriteOptions.ISOLATION_LEVEL, IsolationLevel.SERIALIZABLE.toString())
.overwrite(functions.col("id").equalTo(1));
}

@Test
public void testOverwriteFilterNoSnapshotIdValidation() throws Exception {
Table table = validationCatalog.loadTable(tableIdent);

List<SimpleRecord> records = Lists.newArrayList(new SimpleRecord(1, "a"));
spark.createDataFrame(records, SimpleRecord.class).writeTo(tableName).append();

// When no snapshot id is supplied, validation defaults to the beginning of table history and finds conflicts
Dataset<Row> conflictingDf = spark.createDataFrame(records, SimpleRecord.class);
AssertHelpers.assertThrowsCause("Conflicting new data files should throw exception",
ValidationException.class,
"Found conflicting files that can contain records matching ref(name=\"id\") == 1:",
() -> {
try {
conflictingDf.writeTo(tableName)
.option(SparkWriteOptions.ISOLATION_LEVEL, IsolationLevel.SERIALIZABLE.toString())
.overwrite(functions.col("id").equalTo(1));
} catch (NoSuchTableException e) {
throw new RuntimeException(e);
}
});

// Validating from latest snapshot should succeed
table.refresh();
long newSnapshotId = table.currentSnapshot().snapshotId();
conflictingDf.writeTo(tableName)
.option(SparkWriteOptions.VALIDATE_FROM_SNAPSHOT_ID, String.valueOf(newSnapshotId))
.option(SparkWriteOptions.ISOLATION_LEVEL, IsolationLevel.SERIALIZABLE.toString())
.overwrite(functions.col("id").equalTo(1));
}

@Test
public void testOverwriteFilterSnapshotIsolation() throws Exception {
List<SimpleRecord> records = Lists.newArrayList(
new SimpleRecord(1, "a"),
new SimpleRecord(1, "b"));
spark.createDataFrame(records, SimpleRecord.class).coalesce(1).writeTo(tableName).append();

Table table = validationCatalog.loadTable(tableIdent);
long snapshotId = table.currentSnapshot().snapshotId();

// This should add a delete file
sql("DELETE FROM %s WHERE id='1' and data='b'", tableName);
table.refresh();

// Validating from previous snapshot finds conflicts
List<SimpleRecord> conflictingRecords = Lists.newArrayList(new SimpleRecord(1, "a"));
Dataset<Row> conflictingDf = spark.createDataFrame(conflictingRecords, SimpleRecord.class);
AssertHelpers.assertThrowsCause("Conflicting new delete files should throw exception",
ValidationException.class,
"Found new conflicting delete files that can apply to records matching ref(name=\"id\") == 1:",
() -> {
try {
conflictingDf.writeTo(tableName)
.option(SparkWriteOptions.VALIDATE_FROM_SNAPSHOT_ID, String.valueOf(snapshotId))
.option(SparkWriteOptions.ISOLATION_LEVEL, IsolationLevel.SNAPSHOT.toString())
.overwrite(functions.col("id").equalTo(1));
} catch (NoSuchTableException e) {
throw new RuntimeException(e);
}
});

// Validating from latest snapshot should succeed
table.refresh();
long newSnapshotId = table.currentSnapshot().snapshotId();
conflictingDf.writeTo(tableName)
.option(SparkWriteOptions.VALIDATE_FROM_SNAPSHOT_ID, String.valueOf(newSnapshotId))
.option(SparkWriteOptions.ISOLATION_LEVEL, IsolationLevel.SNAPSHOT.toString())
.overwrite(functions.col("id").equalTo(1));
}

@Test
public void testOverwriteFilterSnapshotIsolation2() throws Exception {
Table table = validationCatalog.loadTable(tableIdent);
long snapshotId = table.currentSnapshot().snapshotId();

List<SimpleRecord> records = Lists.newArrayList(
new SimpleRecord(1, "a"));
spark.createDataFrame(records, SimpleRecord.class).writeTo(tableName).append();

// Validation should not fail due to conflicting data file in snapshot isolation mode
Dataset<Row> conflictingDf = spark.createDataFrame(records, SimpleRecord.class);
conflictingDf.writeTo(tableName)
.option(SparkWriteOptions.VALIDATE_FROM_SNAPSHOT_ID, String.valueOf(snapshotId))
.option(SparkWriteOptions.ISOLATION_LEVEL, IsolationLevel.SNAPSHOT.toString())
.overwrite(functions.col("id").equalTo(1));
}

@Test
public void testOverwritePartitionSerializableIsolation() throws Exception {
Table table = validationCatalog.loadTable(tableIdent);
@@ -319,6 +319,21 @@ public void commit(WriterCommitMessage[] messages) {
overwriteFiles.addFile(file);
}

IsolationLevel isolationLevel = writeConf.isolationLevel();
Long validateFromSnapshotId = writeConf.validateFromSnapshotId();

if (isolationLevel != null && validateFromSnapshotId != null) {
overwriteFiles.validateFromSnapshot(validateFromSnapshotId);
}

// SERIALIZABLE rejects both concurrent data files and concurrent delete files
// that could match the overwrite filter; SNAPSHOT tolerates concurrent appends
// but still rejects concurrent deletes.
if (isolationLevel == SERIALIZABLE) {
overwriteFiles.validateNoConflictingDeletes();
overwriteFiles.validateNoConflictingData();

} else if (isolationLevel == SNAPSHOT) {
overwriteFiles.validateNoConflictingDeletes();
}

String commitMsg = String.format("overwrite by filter %s with %d new data files", overwriteExpr, numFiles);
commitOperation(overwriteFiles, commitMsg);
}