Skip to content

Commit

Permalink
[fix][broker]A failed consumer/producer future in ServerCnx can never…
Browse files Browse the repository at this point in the history
… be removed (#23123)

(cherry picked from commit 114880b)
  • Loading branch information
poorbarcode committed Aug 6, 2024
1 parent 57fd904 commit 9c45082
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3604,19 +3604,39 @@ public String clientSourceAddressAndPort() {

@Override
public CompletableFuture<Optional<Boolean>> checkConnectionLiveness() {
if (!isActive()) {
return CompletableFuture.completedFuture(Optional.of(false));
}
if (connectionLivenessCheckTimeoutMillis > 0) {
return NettyFutureUtil.toCompletableFuture(ctx.executor().submit(() -> {
if (!isActive()) {
return CompletableFuture.completedFuture(Optional.of(false));
}
if (connectionCheckInProgress != null) {
return connectionCheckInProgress;
} else {
final CompletableFuture<Optional<Boolean>> finalConnectionCheckInProgress =
new CompletableFuture<>();
connectionCheckInProgress = finalConnectionCheckInProgress;
ctx.executor().schedule(() -> {
if (finalConnectionCheckInProgress == connectionCheckInProgress
&& !finalConnectionCheckInProgress.isDone()) {
if (!isActive()) {
finalConnectionCheckInProgress.complete(Optional.of(false));
return;
}
if (finalConnectionCheckInProgress.isDone()) {
return;
}
if (finalConnectionCheckInProgress == connectionCheckInProgress) {
/**
* {@link #connectionCheckInProgress} will be completed when
* {@link #channelInactive(ChannelHandlerContext)} event occurs, so skip set it here.
*/
log.warn("[{}] Connection check timed out. Closing connection.", this.toString());
ctx.close();
} else {
log.error("[{}] Reached unexpected code block. Completing connection check.",
this.toString());
finalConnectionCheckInProgress.complete(Optional.of(true));
}
}, connectionLivenessCheckTimeoutMillis, TimeUnit.MILLISECONDS);
sendPing();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pulsar.broker.service;

import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.Schema;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Slf4j
@Test(groups = "broker")
public class ServerCnxNonInjectionTest extends ProducerConsumerBase {

@BeforeClass
@Override
protected void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
}

@AfterClass
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}

@Test(timeOut = 60 * 1000)
public void testCheckConnectionLivenessAfterClosed() throws Exception {
// Create a ServerCnx
final String tp = BrokerTestUtil.newUniqueName("public/default/tp");
Producer<String> p = pulsarClient.newProducer(Schema.STRING).topic(tp).create();
ServerCnx serverCnx = (ServerCnx) pulsar.getBrokerService().getTopic(tp, false).join().get()
.getProducers().values().iterator().next().getCnx();
// Call "CheckConnectionLiveness" after serverCnx is closed. The resulted future should be done eventually.
p.close();
serverCnx.close();
Thread.sleep(1000);
serverCnx.checkConnectionLiveness().join();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,11 @@
import org.apache.bookkeeper.client.PulsarMockBookKeeper;
import org.apache.bookkeeper.client.PulsarMockLedgerHandle;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.namespace.OwnershipCache;
import org.apache.pulsar.broker.resources.BaseResources;
import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
Expand Down Expand Up @@ -1008,28 +1008,36 @@ public void testActiveConsumerCleanup() throws Exception {

int numMessages = 100;
final CountDownLatch latch = new CountDownLatch(numMessages);
String topic = "persistent://my-property/my-ns/closed-cnx-topic";
String topic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/closed-cnx-topic");
admin.topics().createNonPartitionedTopic(topic);
String sub = "my-subscriber-name";
@Cleanup
PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
pulsarClient.newConsumer().topic(topic).subscriptionName(sub).messageListener((c1, msg) -> {
Assert.assertNotNull(msg, "Message cannot be null");
String receivedMessage = new String(msg.getData());
log.debug("Received message [{}] in the listener", receivedMessage);
c1.acknowledgeAsync(msg);
latch.countDown();
}).subscribe();

ConsumerImpl c =
(ConsumerImpl) pulsarClient.newConsumer().topic(topic).subscriptionName(sub).messageListener((c1, msg) -> {
Assert.assertNotNull(msg, "Message cannot be null");
String receivedMessage = new String(msg.getData());
log.debug("Received message [{}] in the listener", receivedMessage);
c1.acknowledgeAsync(msg);
latch.countDown();
}).subscribe();
PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get();

AbstractDispatcherSingleActiveConsumer dispatcher = (AbstractDispatcherSingleActiveConsumer) topicRef
.getSubscription(sub).getDispatcher();
ServerCnx cnx = (ServerCnx) dispatcher.getActiveConsumer().cnx();
Field field = ServerCnx.class.getDeclaredField("isActive");
field.setAccessible(true);
field.set(cnx, false);

assertNotNull(dispatcher.getActiveConsumer());

// Inject an blocker to make the "ping & pong" does not work.
CountDownLatch countDownLatch = new CountDownLatch(1);
ConnectionHandler connectionHandler = c.getConnectionHandler();
ClientCnx clientCnx = connectionHandler.cnx();
clientCnx.ctx().executor().submit(() -> {
try {
countDownLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});

@Cleanup
PulsarClient pulsarClient2 = newPulsarClient(lookupUrl.toString(), 0);
Consumer<byte[]> consumer = null;
Expand All @@ -1042,15 +1050,19 @@ public void testActiveConsumerCleanup() throws Exception {
c1.acknowledgeAsync(msg);
latch.countDown();
}).subscribe();
if (i == 0) {
fail("Should failed with ConsumerBusyException!");
}
} catch (PulsarClientException.ConsumerBusyException ignore) {
// It's ok.
}
}
assertNotNull(consumer);
log.info("-- Exiting {} test --", methodName);

// cleanup.
countDownLatch.countDown();
consumer.close();
pulsarClient.close();
pulsarClient2.close();
admin.topics().delete(topic, false);
}

@Test
Expand Down

0 comments on commit 9c45082

Please sign in to comment.