From bdeab6cd6a382e1d0c2056fefb3fb09ffd025a5c Mon Sep 17 00:00:00 2001 From: Rajan Dhabalia Date: Mon, 11 Nov 2024 12:26:06 -0800 Subject: [PATCH] [fix][broker] Broker is failing to create non-durable sub if topic is fenced (#23579) (cherry picked from commit 7822dca1ffe45324d7af7ef830c617f6881a5431) --- .../mledger/impl/ManagedLedgerImpl.java | 3 ++- .../service/persistent/PersistentTopic.java | 5 ++++ .../api/SimpleProducerConsumerTest.java | 27 +++++++++++++++++++ 3 files changed, 34 insertions(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 1e0f28dff180c..4b78dacb033a0 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -3928,7 +3928,8 @@ private void checkManagedLedgerIsOpen() throws ManagedLedgerException { } } - synchronized void setFenced() { + @VisibleForTesting + public synchronized void setFenced() { log.info("{} Moving to Fenced state", name); STATE_UPDATER.set(this, State.Fenced); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 14a5e9f46b265..337fad5e6b600 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1035,6 +1035,11 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St } else if (ex.getCause() instanceof BrokerServiceException.SubscriptionFencedException && isCompactionSubscription(subscriptionName)) { log.warn("[{}] Failed to create compaction subscription: {}", topic, ex.getMessage()); + } else if (ex.getCause() instanceof ManagedLedgerFencedException) { + // If the topic has been fenced, we cannot continue using it. We need to close and reopen + log.warn("[{}][{}] has been fenced. closing the topic {}", topic, subscriptionName, + ex.getMessage()); + close(); } else { log.error("[{}] Failed to create subscription: {}", topic, subscriptionName, ex); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index d8e981220f89e..6edfb634eef52 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -4875,4 +4875,31 @@ private int compareMessageIds(MessageIdImpl messageId1, MessageIdImpl messageId2 return 0; } } + + @Test + public void testFencedLedger() throws Exception { + log.info("-- Starting {} test --", methodName); + + final String topic = "persistent://my-property/my-ns/fencedLedger"; + + @Cleanup + PulsarClient newPulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).build(); + + @Cleanup + Producer producer = newPulsarClient.newProducer().topic(topic).enableBatching(false).create(); + + final int numMessages = 5; + for (int i = 0; i < numMessages; i++) { + producer.newMessage().value(("value-" + i).getBytes(UTF_8)).eventTime((i + 1) * 100L).sendAsync(); + } + producer.flush(); + + PersistentTopic pTopic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get(); + ManagedLedgerImpl ml = (ManagedLedgerImpl) pTopic.getManagedLedger(); + ml.setFenced(); + + Reader reader = newPulsarClient.newReader().topic(topic).startMessageId(MessageId.earliest) + .createAsync().get(5, TimeUnit.SECONDS); + assertNotNull(reader); + } }