Skip to content

Commit

Permalink
CXP-2956: Remove the callback when processing table changes
Browse files Browse the repository at this point in the history
  • Loading branch information
ramanenka committed Jul 5, 2024
1 parent 4ba244a commit 25eac40
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -345,45 +345,39 @@ public Lsn getMinLsn(String databaseName, String changeTableName) throws SQLExce
* Provides all changes recorder by the SQL Server CDC capture process for a set of tables.
*
* @param databaseName - the name of the database to query
* @param changeTables - the requested tables to obtain changes for
* @param changeTable - the requested table to obtain changes for
* @param intervalFromLsn - closed lower bound of interval of changes to be provided
* @param intervalToLsn - closed upper bound of interval of changes to be provided
* @param consumer - the change processor
* @throws SQLException
*/
public void getChangesForTables(String databaseName, SqlServerChangeTable[] changeTables, Lsn intervalFromLsn,
Lsn intervalToLsn, BlockingMultiResultSetConsumer consumer)
throws SQLException, InterruptedException {
final String[] queries = new String[changeTables.length];
final StatementPreparer[] preparers = new StatementPreparer[changeTables.length];

int idx = 0;
for (SqlServerChangeTable changeTable : changeTables) {
String capturedColumns = changeTable.getCapturedColumns().stream().map(c -> "[" + c + "]")
.collect(Collectors.joining(", "));
String source = changeTable.getCaptureInstance();
if (config.getDataQueryMode() == SqlServerConnectorConfig.DataQueryMode.DIRECT) {
source = changeTable.getChangeTableId().table();
public ResultSet getChangesForTable(String databaseName, SqlServerChangeTable changeTable, Lsn intervalFromLsn,
Lsn intervalToLsn) throws SQLException, InterruptedException {
String capturedColumns = changeTable.getCapturedColumns().stream().map(c -> "[" + c + "]")
.collect(Collectors.joining(", "));
String source = changeTable.getCaptureInstance();
if (config.getDataQueryMode() == SqlServerConnectorConfig.DataQueryMode.DIRECT) {
source = changeTable.getChangeTableId().table();
}
final String query = replaceDatabaseNamePlaceholder(getAllChangesForTable, databaseName)
.replaceFirst(STATEMENTS_PLACEHOLDER, Matcher.quoteReplacement(capturedColumns))
.replace(TABLE_NAME_PLACEHOLDER, source);
// If the table was added in the middle of queried buffer we need
// to adjust from to the first LSN available
final Lsn fromLsn = getFromLsn(databaseName, changeTable, intervalFromLsn);
LOGGER.trace("Getting changes for table {} in range[{}, {}]", changeTable, fromLsn, intervalToLsn);

PreparedStatement statement = connection().prepareStatement(query);
if (!statement.isCloseOnCompletion()) {
throw new RuntimeException("isCloseOnCompletion is false!!!");
}

if (queryFetchSize > 0) {
statement.setFetchSize(queryFetchSize);
}
final String query = replaceDatabaseNamePlaceholder(getAllChangesForTable, databaseName)
.replaceFirst(STATEMENTS_PLACEHOLDER, Matcher.quoteReplacement(capturedColumns))
.replace(TABLE_NAME_PLACEHOLDER, source);
queries[idx] = query;
// If the table was added in the middle of queried buffer we need
// to adjust from to the first LSN available
final Lsn fromLsn = getFromLsn(databaseName, changeTable, intervalFromLsn);
LOGGER.trace("Getting changes for table {} in range[{}, {}]", changeTable, fromLsn, intervalToLsn);
preparers[idx] = statement -> {
if (queryFetchSize > 0) {
statement.setFetchSize(queryFetchSize);
}
statement.setBytes(1, fromLsn.getBinary());
statement.setBytes(2, intervalToLsn.getBinary());
};
statement.setBytes(1, fromLsn.getBinary());
statement.setBytes(2, intervalToLsn.getBinary());

idx++;
}
prepareQuery(queries, preparers, consumer);
return statement.executeQuery();
}

private Lsn getFromLsn(String databaseName, SqlServerChangeTable changeTable, Lsn intervalFromLsn) throws SQLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,16 +229,17 @@ else if (!checkAgent) {
tablesSlot.set(getChangeTablesToQuery(partition, offsetContext, toLsn));
collectChangeTablesWithKnownStopLsn(partition, tablesSlot.get());
}
try {
dataConnection.getChangesForTables(databaseName, tablesSlot.get(), fromLsn, toLsn, resultSets -> {

SqlServerChangeTablePointer[] changeTables = new SqlServerChangeTablePointer[]{};
try {
{
long eventSerialNoInInitialTx = 1;
final int tableCount = resultSets.length;
final SqlServerChangeTablePointer[] changeTables = new SqlServerChangeTablePointer[tableCount];
final SqlServerChangeTable[] tables = tablesSlot.get();
changeTables = new SqlServerChangeTablePointer[tables.length];

for (int i = 0; i < tableCount; i++) {
changeTables[i] = new SqlServerChangeTablePointer(tables[i], resultSets[i]);
for (int i = 0; i < tables.length; i++) {
ResultSet resultSet = dataConnection.getChangesForTable(databaseName, tables[i], fromLsn, toLsn);
changeTables[i] = new SqlServerChangeTablePointer(tables[i], resultSet);
changeTables[i].next();
}

Expand Down Expand Up @@ -342,14 +343,19 @@ else if (!checkAgent) {
connectorConfig));
tableWithSmallestLsn.next();
}
});
}
streamingExecutionContext.setLastProcessedPosition(TxLogPosition.valueOf(toLsn));
// Terminate the transaction otherwise CDC could not be disabled for tables
dataConnection.rollback();
}
catch (SQLException e) {
tablesSlot.set(processErrorFromChangeTableQuery(databaseName, e, tablesSlot.get()));
}
finally {
for (SqlServerChangeTablePointer t : changeTables) {
t.close();
}
}
}
}
catch (Exception e) {
Expand Down
Loading

0 comments on commit 25eac40

Please sign in to comment.