diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 5df276e8f3dd5..2f9e9b2a1ac2d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -3605,8 +3605,14 @@ public String clientSourceAddressAndPort() { @Override public CompletableFuture> checkConnectionLiveness() { + if (!isActive()) { + return CompletableFuture.completedFuture(Optional.of(false)); + } if (connectionLivenessCheckTimeoutMillis > 0) { return NettyFutureUtil.toCompletableFuture(ctx.executor().submit(() -> { + if (!isActive()) { + return CompletableFuture.completedFuture(Optional.of(false)); + } if (connectionCheckInProgress != null) { return connectionCheckInProgress; } else { @@ -3614,10 +3620,24 @@ public CompletableFuture> checkConnectionLiveness() { new CompletableFuture<>(); connectionCheckInProgress = finalConnectionCheckInProgress; ctx.executor().schedule(() -> { - if (finalConnectionCheckInProgress == connectionCheckInProgress - && !finalConnectionCheckInProgress.isDone()) { + if (!isActive()) { + finalConnectionCheckInProgress.complete(Optional.of(false)); + return; + } + if (finalConnectionCheckInProgress.isDone()) { + return; + } + if (finalConnectionCheckInProgress == connectionCheckInProgress) { + /** + * {@link #connectionCheckInProgress} will be completed when + * {@link #channelInactive(ChannelHandlerContext)} event occurs, so skip set it here. + */ log.warn("[{}] Connection check timed out. Closing connection.", this.toString()); ctx.close(); + } else { + log.error("[{}] Reached unexpected code block. Completing connection check.", + this.toString()); + finalConnectionCheckInProgress.complete(Optional.of(true)); } }, connectionLivenessCheckTimeoutMillis, TimeUnit.MILLISECONDS); sendPing(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxNonInjectionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxNonInjectionTest.java new file mode 100644 index 0000000000000..3acc941a2c8c2 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxNonInjectionTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.broker.service; + +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.Schema; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker") +public class ServerCnxNonInjectionTest extends ProducerConsumerBase { + + @BeforeClass + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterClass + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test(timeOut = 60 * 1000) + public void testCheckConnectionLivenessAfterClosed() throws Exception { + // Create a ServerCnx + final String tp = BrokerTestUtil.newUniqueName("public/default/tp"); + Producer p = pulsarClient.newProducer(Schema.STRING).topic(tp).create(); + ServerCnx serverCnx = (ServerCnx) pulsar.getBrokerService().getTopic(tp, false).join().get() + .getProducers().values().iterator().next().getCnx(); + // Call "CheckConnectionLiveness" after serverCnx is closed. The resulted future should be done eventually. + p.close(); + serverCnx.close(); + Thread.sleep(1000); + serverCnx.checkConnectionLiveness().join(); + } + +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java index c2715de986ad8..06c6069ebae71 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java @@ -67,11 +67,11 @@ import org.apache.bookkeeper.client.PulsarMockBookKeeper; import org.apache.bookkeeper.client.PulsarMockLedgerHandle; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.namespace.OwnershipCache; import org.apache.pulsar.broker.resources.BaseResources; import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer; -import org.apache.pulsar.broker.service.ServerCnx; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdminException; @@ -1008,28 +1008,36 @@ public void testActiveConsumerCleanup() throws Exception { int numMessages = 100; final CountDownLatch latch = new CountDownLatch(numMessages); - String topic = "persistent://my-property/my-ns/closed-cnx-topic"; + String topic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/closed-cnx-topic"); + admin.topics().createNonPartitionedTopic(topic); String sub = "my-subscriber-name"; @Cleanup PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0); - pulsarClient.newConsumer().topic(topic).subscriptionName(sub).messageListener((c1, msg) -> { - Assert.assertNotNull(msg, "Message cannot be null"); - String receivedMessage = new String(msg.getData()); - log.debug("Received message [{}] in the listener", receivedMessage); - c1.acknowledgeAsync(msg); - latch.countDown(); - }).subscribe(); - + ConsumerImpl c = + (ConsumerImpl) pulsarClient.newConsumer().topic(topic).subscriptionName(sub).messageListener((c1, msg) -> { + Assert.assertNotNull(msg, "Message cannot be null"); + String receivedMessage = new String(msg.getData()); + log.debug("Received message [{}] in the listener", receivedMessage); + c1.acknowledgeAsync(msg); + latch.countDown(); + }).subscribe(); PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get(); - AbstractDispatcherSingleActiveConsumer dispatcher = (AbstractDispatcherSingleActiveConsumer) topicRef .getSubscription(sub).getDispatcher(); - ServerCnx cnx = (ServerCnx) dispatcher.getActiveConsumer().cnx(); - Field field = ServerCnx.class.getDeclaredField("isActive"); - field.setAccessible(true); - field.set(cnx, false); - assertNotNull(dispatcher.getActiveConsumer()); + + // Inject an blocker to make the "ping & pong" does not work. + CountDownLatch countDownLatch = new CountDownLatch(1); + ConnectionHandler connectionHandler = c.getConnectionHandler(); + ClientCnx clientCnx = connectionHandler.cnx(); + clientCnx.ctx().executor().submit(() -> { + try { + countDownLatch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + @Cleanup PulsarClient pulsarClient2 = newPulsarClient(lookupUrl.toString(), 0); Consumer consumer = null; @@ -1042,15 +1050,19 @@ public void testActiveConsumerCleanup() throws Exception { c1.acknowledgeAsync(msg); latch.countDown(); }).subscribe(); - if (i == 0) { - fail("Should failed with ConsumerBusyException!"); - } } catch (PulsarClientException.ConsumerBusyException ignore) { // It's ok. } } assertNotNull(consumer); log.info("-- Exiting {} test --", methodName); + + // cleanup. + countDownLatch.countDown(); + consumer.close(); + pulsarClient.close(); + pulsarClient2.close(); + admin.topics().delete(topic, false); } @Test