Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,71 @@ void testTableBranchAtomicCommitWithFailures() throws Exception {
.build());
}

@Test
void testCommitDeltaTxnWithAppendFiles() throws Exception {
Table table = catalog.loadTable(TableIdentifier.of(TABLE1));
assertThat(table.snapshots()).isEmpty();

DynamicWriteResultAggregator aggregator =
new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(), cacheMaximumSize);
OneInputStreamOperatorTestHarness aggregatorHarness =
new OneInputStreamOperatorTestHarness(aggregator);
aggregatorHarness.open();

WriteTarget writeTarget1 =
new WriteTarget(TABLE1, "branch1", 42, 0, true, Sets.newHashSet(1, 2));
WriteTarget writeTarget2 = new WriteTarget(TABLE1, "branch1", 23, 0, true, Sets.newHashSet());

WriteResult writeResult1 = WriteResult.builder().addDataFiles(DATA_FILE).build();
WriteResult writeResult2 = WriteResult.builder().addDataFiles(DATA_FILE_2).build();

final String jobId = JobID.generate().toHexString();
final String operatorId = new OperatorID().toHexString();
final int checkpointId = 1;

byte[] deltaManifest1 =
aggregator.writeToManifest(
writeTarget1,
Sets.newHashSet(new DynamicWriteResult(writeTarget1, writeResult1)),
checkpointId);

CommitRequest<DynamicCommittable> commitRequest1 =
new MockCommitRequest<>(
new DynamicCommittable(writeTarget1, deltaManifest1, jobId, operatorId, checkpointId));

byte[] deltaManifest2 =
aggregator.writeToManifest(
writeTarget2,
Sets.newHashSet(new DynamicWriteResult(writeTarget2, writeResult2)),
checkpointId);

CommitRequest<DynamicCommittable> commitRequest2 =
new MockCommitRequest<>(
new DynamicCommittable(writeTarget2, deltaManifest2, jobId, operatorId, checkpointId));

boolean overwriteMode = false;
int workerPoolSize = 1;
String sinkId = "sinkId";
UnregisteredMetricsGroup metricGroup = new UnregisteredMetricsGroup();
DynamicCommitterMetrics committerMetrics = new DynamicCommitterMetrics(metricGroup);
DynamicCommitter dynamicCommitter =
new DynamicCommitter(
CATALOG_EXTENSION.catalog(),
Maps.newHashMap(),
overwriteMode,
workerPoolSize,
sinkId,
committerMetrics);

dynamicCommitter.commit(Sets.newHashSet(commitRequest1, commitRequest2));

table.refresh();
assertThat(table.snapshots()).hasSize(1);

Snapshot snapshot = Iterables.getFirst(table.snapshots(), null);
assertThat(snapshot.operation()).isEqualTo("append");
Comment on lines +654 to +656
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we already have that test case. @bezdomniy Could you revert the change in main, only keeping this test?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds good - done.

}

@Test
void testReplacePartitions() throws Exception {
Table table1 = catalog.loadTable(TableIdentifier.of(TABLE1));
Expand Down