Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ public void commit(WriterCommitMessage[] messages) {

protected void commitOperation(SnapshotUpdate<?> operation, int numFiles, String description) {
LOG.info("Committing {} with {} files to table {}", description, numFiles, table);
if (numFiles == 0) {
return;
}

if (applicationId != null) {
operation.set("spark.app.id", applicationId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void testDeleteFromEmptyTable() {
sql("DELETE FROM %s WHERE dep = 'hr'", tableName);

Table table = validationCatalog.loadTable(tableIdent);
Assert.assertEquals("Should have 2 snapshots", 2, Iterables.size(table.snapshots()));
Assert.assertEquals("Should have 0 snapshots", 0, Iterables.size(table.snapshots()));

assertEquals("Should have expected rows",
ImmutableList.of(),
Expand Down Expand Up @@ -150,10 +150,10 @@ public void testDeleteNonExistingRecords() {
sql("DELETE FROM %s AS t WHERE t.id > 10", tableName);

Table table = validationCatalog.loadTable(tableIdent);
Assert.assertEquals("Should have 2 snapshots", 2, Iterables.size(table.snapshots()));
Assert.assertEquals("Should have 1 snapshots", 1, Iterables.size(table.snapshots()));

Snapshot currentSnapshot = table.currentSnapshot();
validateSnapshot(currentSnapshot, "overwrite", "0", null, null);
validateSnapshot(currentSnapshot, "append", "2", null, "3");

assertEquals("Should have expected rows",
ImmutableList.of(row(1, "hr"), row(2, "hardware"), row(null, "hr")),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public void testUpdateEmptyTable() {
sql("UPDATE %s SET id = -1 WHERE dep = 'hr'", tableName);

Table table = validationCatalog.loadTable(tableIdent);
Assert.assertEquals("Should have 2 snapshots", 2, Iterables.size(table.snapshots()));
Assert.assertEquals("Should have 0 snapshots", 0, Iterables.size(table.snapshots()));

assertEquals("Should have expected rows",
ImmutableList.of(),
Expand Down Expand Up @@ -188,10 +188,10 @@ public void testUpdateNonExistingRecords() {
sql("UPDATE %s SET id = -1 WHERE id > 10", tableName);

Table table = validationCatalog.loadTable(tableIdent);
Assert.assertEquals("Should have 2 snapshots", 2, Iterables.size(table.snapshots()));
Assert.assertEquals("Should have 1 snapshots", 1, Iterables.size(table.snapshots()));

Snapshot currentSnapshot = table.currentSnapshot();
validateSnapshot(currentSnapshot, "overwrite", "0", null, null);
validateSnapshot(currentSnapshot, "append", "2", null, "3");

assertEquals("Should have expected rows",
ImmutableList.of(row(1, "hr"), row(2, "hardware"), row(null, "hr")),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,12 @@ private WriterFactory createWriterFactory() {
return new WriterFactory(tableBroadcast, format, targetFileSize, writeSchema, dsSchema, partitionedFanoutEnabled);
}

private void commitOperation(SnapshotUpdate<?> operation, String description) {
private void commitOperation(SnapshotUpdate<?> operation, int numFiles, String description) {
LOG.info("Committing {} to table {}", description, table);
if (numFiles == 0) {
return;
}

if (applicationId != null) {
operation.set("spark.app.id", applicationId);
}
Expand Down Expand Up @@ -234,7 +238,7 @@ public void commit(WriterCommitMessage[] messages) {
append.appendFile(file);
}

commitOperation(append, String.format("append with %d new data files", numFiles));
commitOperation(append, numFiles, String.format("append with %d new data files", numFiles));
}
}

Expand All @@ -256,7 +260,8 @@ public void commit(WriterCommitMessage[] messages) {
dynamicOverwrite.addFile(file);
}

commitOperation(dynamicOverwrite, String.format("dynamic partition overwrite with %d new data files", numFiles));
commitOperation(dynamicOverwrite, numFiles,
String.format("dynamic partition overwrite with %d new data files", numFiles));
}
}

Expand All @@ -279,7 +284,7 @@ public void commit(WriterCommitMessage[] messages) {
}

String commitMsg = String.format("overwrite by filter %s with %d new data files", overwriteExpr, numFiles);
commitOperation(overwriteFiles, commitMsg);
commitOperation(overwriteFiles, numFiles, commitMsg);
}
}

Expand Down Expand Up @@ -350,7 +355,7 @@ private void commitWithSerializableIsolation(OverwriteFiles overwriteFiles,
String commitMsg = String.format(
"overwrite of %d data files with %d new data files, scanSnapshotId: %d, conflictDetectionFilter: %s",
numOverwrittenFiles, numAddedFiles, scanSnapshotId, conflictDetectionFilter);
commitOperation(overwriteFiles, commitMsg);
commitOperation(overwriteFiles, numOverwrittenFiles + numAddedFiles, commitMsg);
}

private void commitWithSnapshotIsolation(OverwriteFiles overwriteFiles,
Expand All @@ -368,7 +373,7 @@ private void commitWithSnapshotIsolation(OverwriteFiles overwriteFiles,
String commitMsg = String.format(
"overwrite of %d data files with %d new data files",
numOverwrittenFiles, numAddedFiles);
commitOperation(overwriteFiles, commitMsg);
commitOperation(overwriteFiles, numOverwrittenFiles + numAddedFiles, commitMsg);
}
}

Expand Down Expand Up @@ -420,10 +425,10 @@ public final void commit(long epochId, WriterCommitMessage[] messages) {

protected abstract void doCommit(long epochId, WriterCommitMessage[] messages);

protected <T> void commit(SnapshotUpdate<T> snapshotUpdate, long epochId, String description) {
protected <T> void commit(SnapshotUpdate<T> snapshotUpdate, long epochId, int numFiles, String description) {
snapshotUpdate.set(QUERY_ID_PROPERTY, queryId);
snapshotUpdate.set(EPOCH_ID_PROPERTY, Long.toString(epochId));
commitOperation(snapshotUpdate, description);
commitOperation(snapshotUpdate, numFiles, description);
}

private Long findLastCommittedEpochId() {
Expand Down Expand Up @@ -467,7 +472,7 @@ protected void doCommit(long epochId, WriterCommitMessage[] messages) {
append.appendFile(file);
numFiles++;
}
commit(append, epochId, String.format("streaming append with %d new data files", numFiles));
commit(append, epochId, numFiles, String.format("streaming append with %d new data files", numFiles));
}
}

Expand All @@ -486,7 +491,8 @@ public void doCommit(long epochId, WriterCommitMessage[] messages) {
overwriteFiles.addFile(file);
numFiles++;
}
commit(overwriteFiles, epochId, String.format("streaming complete overwrite with %d new data files", numFiles));
commit(overwriteFiles, epochId, numFiles,
String.format("streaming complete overwrite with %d new data files", numFiles));
}
}

Expand Down