Skip to content

Commit

Permalink
fix: fix a bug in RocksGroupCommitService; remove RocksDBConsumeQueue…
Browse files Browse the repository at this point in the history
…Store#findConsumeQueueMap override

Signed-off-by: Li Zhanhui <[email protected]>
  • Loading branch information
lizhanhui committed Nov 14, 2024
1 parent 4fedc06 commit 461e8de
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -529,16 +529,6 @@ public ConsumeQueueInterface findOrCreateConsumeQueue(String topic, int queueId)
return oldLogic != null ? oldLogic : newLogic;
}

@Override
public ConcurrentMap<Integer, ConsumeQueueInterface> findConsumeQueueMap(String topic) {
if (MixAll.isLmq(topic)) {
ConcurrentMap<Integer, ConsumeQueueInterface> result = new ConcurrentHashMap<>(1);
result.put(MixAll.LMQ_QUEUE_ID, findOrCreateConsumeQueue(topic, MixAll.LMQ_QUEUE_ID));
return result;
}
return super.findConsumeQueueMap(topic);
}

@Override
public long rollNextFile(ConsumeQueueInterface consumeQueue, long offset) {
return 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,29 +69,35 @@ public void putRequest(final DispatchRequest request) throws InterruptedExceptio
}

private void doCommit() {
boolean interrupted = false;
while (!buffer.isEmpty() && !interrupted) {
DispatchRequest dispatchRequest = buffer.poll();
if (null != dispatchRequest) {
requests.add(dispatchRequest);
}
while (!buffer.isEmpty()) {
while (true) {
DispatchRequest dispatchRequest = buffer.poll();
if (null != dispatchRequest) {
requests.add(dispatchRequest);
}

if (requests.isEmpty()) {
break;
}
if (requests.isEmpty()) {
// buffer has been drained
break;
}

if (null == dispatchRequest || requests.size() >= PREFERRED_DISPATCH_REQUEST_COUNT) {
while (!store.isStopped()) {
try {
// putMessagePosition will clear requests after consume queue building completion
store.putMessagePosition(requests);
break;
} catch (RocksDBException e) {
log.error("Failed to build consume queue in RocksDB", e);
}
if (null == dispatchRequest || requests.size() >= PREFERRED_DISPATCH_REQUEST_COUNT) {
groupCommit();
}
}
}
}

private void groupCommit() {
while (!store.isStopped()) {
try {
// putMessagePosition will clear requests after consume queue building completion
store.putMessagePosition(requests);
break;
} catch (RocksDBException e) {
log.error("Failed to build consume queue in RocksDB", e);
}
}
}

}

0 comments on commit 461e8de

Please sign in to comment.