From 48f75063045af9e765f0d67ad09d5d2ab19b6d2a Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 31 Dec 2024 12:51:23 +0800 Subject: [PATCH 1/6] [fix] [broker] Msg delivery is stuck due to concurrency modifying the un-thread-safety collection recentlyJoinedConsumers --- ...PersistentStickyKeyDispatcherMultipleConsumersClassic.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java index c227bf5b435bc..0380b85e40297 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java @@ -560,8 +560,8 @@ public boolean hasSameKeySharedPolicy(KeySharedMeta ksm) { && ksm.isAllowOutOfOrderDelivery() == this.allowOutOfOrderDelivery); } - public LinkedHashMap getRecentlyJoinedConsumers() { - return recentlyJoinedConsumers; + public synchronized LinkedHashMap getRecentlyJoinedConsumers() { + return new LinkedHashMap<>(recentlyJoinedConsumers); } public Map> getConsumerKeyHashRanges() { From ead3a51cca68ce113818a2d3b7219d34ea3ecafb Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 31 Dec 2024 16:40:21 +0800 Subject: [PATCH 2/6] [fix] [broker] Msg delivery is stuck due to concurrency modifying the un-thread-safety collection recentlyJoinedConsumers --- .../PersistentStickyKeyDispatcherMultipleConsumersClassic.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java index 0380b85e40297..f5208f0b2ec35 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java @@ -436,8 +436,6 @@ private boolean removeConsumersFromRecentJoinedConsumers() { if (entry.getValue().compareTo(nextPositionOfTheMarkDeletePosition) <= 0) { itr.remove(); hasConsumerRemovedFromTheRecentJoinedConsumers = true; - } else { - break; } } } From 46f2d797eb6884abe17a8c53515252c06b9971ef Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 31 Dec 2024 17:55:46 +0800 Subject: [PATCH 3/6] [fix] [broker] Msg delivery is stuck due to concurrency modifying the un-thread-safety collection recentlyJoinedConsumers --- ...KeyDispatcherMultipleConsumersClassic.java | 34 +++++++++ ...ispatcherMultipleConsumersClassicTest.java | 70 +++++++++++++++++++ 2 files changed, 104 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java index f5208f0b2ec35..0ea0fe7c12221 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java @@ -146,11 +146,43 @@ public synchronized CompletableFuture addConsumer(Consumer consumer) { && consumerList.size() > 1 && cursor.getNumberOfEntriesSinceFirstNotAckedMessage() > 1) { recentlyJoinedConsumers.put(consumer, readPositionWhenJoining); + sortRecentlyJoinedConsumersIfNeeded(); } } }); } + private void sortRecentlyJoinedConsumersIfNeeded() { + if (recentlyJoinedConsumers.size() == 1) { + return; + } + boolean sortNeeded = false; + Position posPre = null; + Position posAfter = null; + for(Map.Entry entry : recentlyJoinedConsumers.entrySet()) { + if (posPre == null) { + posPre = entry.getValue(); + } else { + posAfter = entry.getValue(); + } + if (posPre != null && posAfter != null) { + if (posPre.compareTo(posAfter) > 0) { + sortNeeded = true; + break; + } + } + } + + if (sortNeeded) { + List> sortedList = new ArrayList<>(recentlyJoinedConsumers.entrySet()); + Collections.sort(sortedList, Map.Entry.comparingByValue()); + recentlyJoinedConsumers.clear(); + for(Map.Entry entry : sortedList) { + recentlyJoinedConsumers.put(entry.getKey(), entry.getValue()); + } + } + } + @Override public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException { // The consumer must be removed from the selector before calling the superclass removeConsumer method. @@ -436,6 +468,8 @@ private boolean removeConsumersFromRecentJoinedConsumers() { if (entry.getValue().compareTo(nextPositionOfTheMarkDeletePosition) <= 0) { itr.remove(); hasConsumerRemovedFromTheRecentJoinedConsumers = true; + } else { + break; } } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassicTest.java index 1f40fd46aa344..0e7c212b8a4a9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassicTest.java @@ -45,7 +45,9 @@ import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Collections; +import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; @@ -182,6 +184,74 @@ public void testAddConsumerWhenClosed() throws Exception { assertTrue(persistentDispatcher.getSelector().getConsumerKeyHashRanges().isEmpty()); } + @Test + public void testSortRecentlyJoinedConsumersIfNeeded() throws Exception { + PersistentStickyKeyDispatcherMultipleConsumersClassic persistentDispatcher = + new PersistentStickyKeyDispatcherMultipleConsumersClassic( + topicMock, cursorMock, subscriptionMock, configMock, + new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT)); + + Consumer consumer0 = mock(Consumer.class); + when(consumer0.consumerName()).thenReturn("c0"); + Consumer consumer1 = mock(Consumer.class); + when(consumer1.consumerName()).thenReturn("c1"); + Consumer consumer2 = mock(Consumer.class); + when(consumer2.consumerName()).thenReturn("c2"); + Consumer consumer3 = mock(Consumer.class); + when(consumer3.consumerName()).thenReturn("c3"); + Consumer consumer4 = mock(Consumer.class); + when(consumer4.consumerName()).thenReturn("c4"); + Consumer consumer5 = mock(Consumer.class); + when(consumer5.consumerName()).thenReturn("c5"); + + when(cursorMock.getNumberOfEntriesSinceFirstNotAckedMessage()).thenReturn(100L); + when(cursorMock.getMarkDeletedPosition()).thenReturn(PositionFactory.create(-1, -1)); + + when(cursorMock.getReadPosition()).thenReturn(PositionFactory.create(0, 0)); + persistentDispatcher.addConsumer(consumer0).join(); + + when(cursorMock.getReadPosition()).thenReturn(PositionFactory.create(5, 2)); + persistentDispatcher.addConsumer(consumer1).join(); + + when(cursorMock.getReadPosition()).thenReturn(PositionFactory.create(5, 1)); + persistentDispatcher.addConsumer(consumer2).join(); + + when(cursorMock.getReadPosition()).thenReturn(PositionFactory.create(5, 3)); + persistentDispatcher.addConsumer(consumer3).join(); + + when(cursorMock.getReadPosition()).thenReturn(PositionFactory.create(4, 1)); + persistentDispatcher.addConsumer(consumer4).join(); + + when(cursorMock.getReadPosition()).thenReturn(PositionFactory.create(6, 1)); + persistentDispatcher.addConsumer(consumer5).join(); + + Iterator> itr + = persistentDispatcher.getRecentlyJoinedConsumers().entrySet().iterator(); + + Map.Entry entry1 = itr.next(); + assertEquals(entry1.getKey(), consumer4); + assertEquals(entry1.getValue(), PositionFactory.create(4, 1)); + + Map.Entry entry2 = itr.next(); + assertEquals(entry2.getKey(), consumer2); + assertEquals(entry2.getValue(), PositionFactory.create(5, 1)); + + Map.Entry entry3 = itr.next(); + assertEquals(entry3.getKey(), consumer1); + assertEquals(entry3.getValue(), PositionFactory.create(5, 2)); + + Map.Entry entry4 = itr.next(); + assertEquals(entry4.getKey(), consumer3); + assertEquals(entry4.getValue(), PositionFactory.create(5, 3)); + + Map.Entry entry5 = itr.next(); + assertEquals(entry5.getKey(), consumer5); + assertEquals(entry5.getValue(), PositionFactory.create(6, 1)); + + // cleanup. + persistentDispatcher.close(); + } + @Test public void testSendMarkerMessage() { try { From dfbb11f747d3fda55f4e6b00d1b24db6ab11e7a0 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 31 Dec 2024 18:45:45 +0800 Subject: [PATCH 4/6] checkstyle --- ...ersistentStickyKeyDispatcherMultipleConsumersClassic.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java index 0ea0fe7c12221..94467eb16d783 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java @@ -159,7 +159,7 @@ private void sortRecentlyJoinedConsumersIfNeeded() { boolean sortNeeded = false; Position posPre = null; Position posAfter = null; - for(Map.Entry entry : recentlyJoinedConsumers.entrySet()) { + for (Map.Entry entry : recentlyJoinedConsumers.entrySet()) { if (posPre == null) { posPre = entry.getValue(); } else { @@ -177,7 +177,7 @@ private void sortRecentlyJoinedConsumersIfNeeded() { List> sortedList = new ArrayList<>(recentlyJoinedConsumers.entrySet()); Collections.sort(sortedList, Map.Entry.comparingByValue()); recentlyJoinedConsumers.clear(); - for(Map.Entry entry : sortedList) { + for (Map.Entry entry : sortedList) { recentlyJoinedConsumers.put(entry.getKey(), entry.getValue()); } } @@ -195,6 +195,7 @@ public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceE super.removeConsumer(consumer); if (recentlyJoinedConsumers != null) { recentlyJoinedConsumers.remove(consumer); + sortRecentlyJoinedConsumersIfNeeded(); if (consumerList.size() == 1) { recentlyJoinedConsumers.clear(); } From 6ef7d25bd134a4e5fc1cbc12cca3cd970aea0680 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 1 Jan 2025 20:47:33 +0800 Subject: [PATCH 5/6] address comment --- ...KeyDispatcherMultipleConsumersClassic.java | 2 +- ...ispatcherMultipleConsumersClassicTest.java | 41 ++++++++++++------- 2 files changed, 27 insertions(+), 16 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java index 94467eb16d783..451eb234a6aef 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java @@ -170,6 +170,7 @@ private void sortRecentlyJoinedConsumersIfNeeded() { sortNeeded = true; break; } + posPre = posAfter; } } @@ -195,7 +196,6 @@ public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceE super.removeConsumer(consumer); if (recentlyJoinedConsumers != null) { recentlyJoinedConsumers.remove(consumer); - sortRecentlyJoinedConsumersIfNeeded(); if (consumerList.size() == 1) { recentlyJoinedConsumers.clear(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassicTest.java index 0e7c212b8a4a9..af42fc3dca402 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassicTest.java @@ -192,7 +192,7 @@ public void testSortRecentlyJoinedConsumersIfNeeded() throws Exception { new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT)); Consumer consumer0 = mock(Consumer.class); - when(consumer0.consumerName()).thenReturn("c0"); + when(consumer0.consumerName()).thenReturn("c0-1"); Consumer consumer1 = mock(Consumer.class); when(consumer1.consumerName()).thenReturn("c1"); Consumer consumer2 = mock(Consumer.class); @@ -203,6 +203,8 @@ public void testSortRecentlyJoinedConsumersIfNeeded() throws Exception { when(consumer4.consumerName()).thenReturn("c4"); Consumer consumer5 = mock(Consumer.class); when(consumer5.consumerName()).thenReturn("c5"); + Consumer consumer6 = mock(Consumer.class); + when(consumer6.consumerName()).thenReturn("c6"); when(cursorMock.getNumberOfEntriesSinceFirstNotAckedMessage()).thenReturn(100L); when(cursorMock.getMarkDeletedPosition()).thenReturn(PositionFactory.create(-1, -1)); @@ -210,43 +212,52 @@ public void testSortRecentlyJoinedConsumersIfNeeded() throws Exception { when(cursorMock.getReadPosition()).thenReturn(PositionFactory.create(0, 0)); persistentDispatcher.addConsumer(consumer0).join(); - when(cursorMock.getReadPosition()).thenReturn(PositionFactory.create(5, 2)); + when(cursorMock.getReadPosition()).thenReturn(PositionFactory.create(4, 1)); persistentDispatcher.addConsumer(consumer1).join(); - when(cursorMock.getReadPosition()).thenReturn(PositionFactory.create(5, 1)); + when(cursorMock.getReadPosition()).thenReturn(PositionFactory.create(5, 2)); persistentDispatcher.addConsumer(consumer2).join(); - when(cursorMock.getReadPosition()).thenReturn(PositionFactory.create(5, 3)); + when(cursorMock.getReadPosition()).thenReturn(PositionFactory.create(5, 1)); persistentDispatcher.addConsumer(consumer3).join(); - when(cursorMock.getReadPosition()).thenReturn(PositionFactory.create(4, 1)); + when(cursorMock.getReadPosition()).thenReturn(PositionFactory.create(5, 3)); persistentDispatcher.addConsumer(consumer4).join(); - when(cursorMock.getReadPosition()).thenReturn(PositionFactory.create(6, 1)); + when(cursorMock.getReadPosition()).thenReturn(PositionFactory.create(4, 2)); persistentDispatcher.addConsumer(consumer5).join(); + when(cursorMock.getReadPosition()).thenReturn(PositionFactory.create(6, 1)); + persistentDispatcher.addConsumer(consumer6).join(); + + assertEquals(persistentDispatcher.getRecentlyJoinedConsumers().size(), 6); + Iterator> itr = persistentDispatcher.getRecentlyJoinedConsumers().entrySet().iterator(); Map.Entry entry1 = itr.next(); - assertEquals(entry1.getKey(), consumer4); assertEquals(entry1.getValue(), PositionFactory.create(4, 1)); + assertEquals(entry1.getKey(), consumer1); Map.Entry entry2 = itr.next(); - assertEquals(entry2.getKey(), consumer2); - assertEquals(entry2.getValue(), PositionFactory.create(5, 1)); + assertEquals(entry2.getValue(), PositionFactory.create(4, 2)); + assertEquals(entry2.getKey(), consumer5); Map.Entry entry3 = itr.next(); - assertEquals(entry3.getKey(), consumer1); - assertEquals(entry3.getValue(), PositionFactory.create(5, 2)); + assertEquals(entry3.getValue(), PositionFactory.create(5, 1)); + assertEquals(entry3.getKey(), consumer3); Map.Entry entry4 = itr.next(); - assertEquals(entry4.getKey(), consumer3); - assertEquals(entry4.getValue(), PositionFactory.create(5, 3)); + assertEquals(entry4.getValue(), PositionFactory.create(5, 2)); + assertEquals(entry4.getKey(), consumer2); Map.Entry entry5 = itr.next(); - assertEquals(entry5.getKey(), consumer5); - assertEquals(entry5.getValue(), PositionFactory.create(6, 1)); + assertEquals(entry5.getValue(), PositionFactory.create(5, 3)); + assertEquals(entry5.getKey(), consumer4); + + Map.Entry entry6 = itr.next(); + assertEquals(entry6.getValue(), PositionFactory.create(6, 1)); + assertEquals(entry6.getKey(), consumer6); // cleanup. persistentDispatcher.close(); From 5a190f42d1156d0145c90fc1f87aba25a283cb6c Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 1 Jan 2025 22:55:16 +0800 Subject: [PATCH 6/6] fix tests --- .../PersistentStickyKeyDispatcherMultipleConsumersClassic.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java index 451eb234a6aef..71f37c5939d6a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java @@ -594,6 +594,9 @@ public boolean hasSameKeySharedPolicy(KeySharedMeta ksm) { } public synchronized LinkedHashMap getRecentlyJoinedConsumers() { + if (recentlyJoinedConsumers == null) { + return null; + } return new LinkedHashMap<>(recentlyJoinedConsumers); }