Skip to content

Commit

Permalink
[fix][broker] Check cursor state before adding it to the `waitingCurs…
Browse files Browse the repository at this point in the history
…ors` (apache#22191)
  • Loading branch information
Technoboy- authored Mar 27, 2024
1 parent f77fe5f commit b702d44
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -992,7 +992,7 @@ private void checkForNewEntries(OpReadEntry op, ReadEntriesCallback callback, Ob
name);
}
// Let the managed ledger know we want to be notified whenever a new entry is published
ledger.waitingCursors.add(this);
ledger.addWaitingCursor(this);
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Skip notification registering since we do have entries available",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3813,6 +3813,16 @@ public void removeWaitingCursor(ManagedCursor cursor) {
this.waitingCursors.remove(cursor);
}

public void addWaitingCursor(ManagedCursorImpl cursor) {
if (cursor instanceof NonDurableCursorImpl) {
if (cursor.isActive()) {
this.waitingCursors.add(cursor);
}
} else {
this.waitingCursors.add(cursor);
}
}

public boolean isCursorActive(ManagedCursor cursor) {
return activeCursors.get(cursor.getName()) != null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,15 @@
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.TopicPoliciesService;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
import org.apache.pulsar.client.admin.PulsarAdminException;
Expand All @@ -75,6 +80,7 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
Expand Down Expand Up @@ -662,4 +668,44 @@ public void testDynamicConfigurationAutoSkipNonRecoverableData() throws Exceptio
subscribe.close();
admin.topics().delete(topicName);
}

@Test
public void testAddWaitingCursorsForNonDurable() throws Exception {
final String ns = "prop/ns-test";
admin.namespaces().createNamespace(ns, 2);
final String topicName = "persistent://prop/ns-test/testAddWaitingCursors";
admin.topics().createNonPartitionedTopic(topicName);
final Optional<Topic> topic = pulsar.getBrokerService().getTopic(topicName, false).join();
assertNotNull(topic.get());
PersistentTopic persistentTopic = (PersistentTopic) topic.get();
ManagedLedgerImpl ledger = (ManagedLedgerImpl)persistentTopic.getManagedLedger();
final ManagedCursor spyCursor= spy(ledger.newNonDurableCursor(PositionImpl.LATEST, "sub-2"));
doAnswer((invocation) -> {
Thread.sleep(10_000);
invocation.callRealMethod();
return null;
}).when(spyCursor).asyncReadEntriesOrWait(any(int.class), any(long.class),
any(AsyncCallbacks.ReadEntriesCallback.class), any(Object.class), any(PositionImpl.class));
Field cursorField = ManagedLedgerImpl.class.getDeclaredField("cursors");
cursorField.setAccessible(true);
ManagedCursorContainer container = (ManagedCursorContainer) cursorField.get(ledger);
container.removeCursor("sub-2");
container.add(spyCursor, null);
final Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName)
.subscriptionMode(SubscriptionMode.NonDurable)
.subscriptionType(SubscriptionType.Exclusive)
.subscriptionName("sub-2").subscribe();
final Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
producer.send("test");
producer.close();
final Message<String> receive = consumer.receive();
assertEquals("test", receive.getValue());
consumer.close();
Awaitility.await()
.pollDelay(5, TimeUnit.SECONDS)
.pollInterval(1, TimeUnit.SECONDS)
.untilAsserted(() -> {
assertEquals(ledger.getWaitingCursorsCount(), 0);
});
}
}

0 comments on commit b702d44

Please sign in to comment.