Skip to content

Commit

Permalink
Choose random thread for consumerFlow
Browse files Browse the repository at this point in the history
  • Loading branch information
AnonHxy committed Jun 7, 2023
1 parent 50b9a93 commit 725d1b9
Showing 1 changed file with 8 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher

private final AtomicBoolean isRescheduleReadInProgress = new AtomicBoolean(false);
protected final PersistentTopic topic;
protected final Executor topicExecutor;
protected final Executor executor;

protected final String name;
private Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
Expand All @@ -79,7 +79,7 @@ public PersistentDispatcherSingleActiveConsumer(ManagedCursor cursor, SubType su
super(subscriptionType, partitionIndex, topic.getName(), subscription,
topic.getBrokerService().pulsar().getConfiguration(), cursor);
this.topic = topic;
this.topicExecutor = topic.getBrokerService().getTopicOrderedExecutor().chooseThread(topicName);
this.executor = topic.getBrokerService().getTopicOrderedExecutor().chooseThread();
this.name = topic.getName() + " / " + (cursor.getName() != null ? Codec.decode(cursor.getName())
: ""/* NonDurableCursor doesn't have name */);
this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
Expand Down Expand Up @@ -148,7 +148,7 @@ protected void cancelPendingRead() {

@Override
public void readEntriesComplete(final List<Entry> entries, Object obj) {
topicExecutor.execute(() -> internalReadEntriesComplete(entries, obj));
executor.execute(() -> internalReadEntriesComplete(entries, obj));
}

public synchronized void internalReadEntriesComplete(final List<Entry> entries, Object obj) {
Expand Down Expand Up @@ -226,7 +226,7 @@ protected void dispatchEntriesToConsumer(Consumer currentConsumer, List<Entry> e
sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes());

// Schedule a new read batch operation only after the previous batch has been written to the socket.
topicExecutor.execute(() -> {
executor.execute(() -> {
synchronized (PersistentDispatcherSingleActiveConsumer.this) {
Consumer newConsumer = getActiveConsumer();
readMoreEntries(newConsumer);
Expand All @@ -238,7 +238,7 @@ protected void dispatchEntriesToConsumer(Consumer currentConsumer, List<Entry> e

@Override
public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {
topicExecutor.execute(() -> internalConsumerFlow(consumer));
executor.execute(() -> internalConsumerFlow(consumer));
}

private synchronized void internalConsumerFlow(Consumer consumer) {
Expand Down Expand Up @@ -267,7 +267,7 @@ private synchronized void internalConsumerFlow(Consumer consumer) {

@Override
public void redeliverUnacknowledgedMessages(Consumer consumer, long consumerEpoch) {
topicExecutor.execute(() -> internalRedeliverUnacknowledgedMessages(consumer, consumerEpoch));
executor.execute(() -> internalRedeliverUnacknowledgedMessages(consumer, consumerEpoch));
}

private synchronized void internalRedeliverUnacknowledgedMessages(Consumer consumer, long consumerEpoch) {
Expand Down Expand Up @@ -459,7 +459,7 @@ protected Pair<Integer, Long> calculateToRead(Consumer consumer) {

@Override
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
topicExecutor.execute(() -> internalReadEntriesFailed(exception, ctx));
executor.execute(() -> internalReadEntriesFailed(exception, ctx));
}

private synchronized void internalReadEntriesFailed(ManagedLedgerException exception, Object ctx) {
Expand Down Expand Up @@ -507,7 +507,7 @@ private synchronized void internalReadEntriesFailed(ManagedLedgerException excep
topic.getBrokerService().executor().schedule(() -> {

// Jump again into dispatcher dedicated thread
topicExecutor.execute(() -> {
executor.execute(() -> {
synchronized (PersistentDispatcherSingleActiveConsumer.this) {
Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
// we should retry the read if we have an active consumer and there is no pending read
Expand Down

0 comments on commit 725d1b9

Please sign in to comment.