Skip to content

Commit

Permalink
Merge pull request #12 from fetch-rewards/bugfix/init-sync-skip-no-of…
Browse files Browse the repository at this point in the history
…fset

handle no offset init.sync.skip
  • Loading branch information
ryanjclark authored Mar 27, 2023
2 parents 3f4c270 + 8b646ac commit fb3411f
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,11 @@ private void setStateFromOffset() {
} else {
LOGGER.debug("No stored offset found for table: {}", tableDesc.getTableName());
sourceInfo = new SourceInfo(tableDesc.getTableName(), clock);
sourceInfo.startInitSync();
if (initSyncSkip) {
sourceInfo.skipInitSync();
} else {
sourceInfo.startInitSync();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -875,6 +875,25 @@ public void onCommitIgnoreRecordsWithoutSequenceNumber() throws InterruptedExcep
assertEquals("", shardRegister.get("shard1").getLastCommittedRecordSeqNo());
}

@Test
public void initSyncIsSkippedWithNoOffsetOnStart() throws InterruptedException {
configs.put("init.sync.skip", "true");
// Arrange
DynamoDBSourceTask task = new SourceTaskBuilder()
.withOffset(null)
.buildTask();

// Act
task.start(configs);

// Assert
SourceInfo sourceInfo = task.getSourceInfo();
assertEquals(tableName, sourceInfo.tableName);
assertEquals(InitSyncStatus.SKIPPED, sourceInfo.initSyncStatus);
assertEquals(Instant.parse("1970-01-01T00:00:00Z"), sourceInfo.lastInitSyncStart);
assertEquals(Instant.parse("1970-01-01T00:00:00Z"), sourceInfo.lastInitSyncEnd);
}

@Test
public void sourceInfoOfSkippedInitSyncIsLoadedFromOffsetOnStart() throws InterruptedException {
configs.put("init.sync.skip", "true");
Expand Down

0 comments on commit fb3411f

Please sign in to comment.