From 98741f6d0ff7a89e7ed997eed47f623b973f75b6 Mon Sep 17 00:00:00 2001 From: Yang Yang Date: Wed, 5 Feb 2020 16:05:08 +0800 Subject: [PATCH 1/6] Skips compaction phases if the topic is empty. --- .../java/org/apache/pulsar/client/api/RawReader.java | 7 +++++++ .../org/apache/pulsar/client/impl/RawReaderImpl.java | 5 +++++ .../apache/pulsar/compaction/TwoPhaseCompactor.java | 12 ++++++++++-- 3 files changed, 22 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java index f9d297f483d2f..caf44ee95653d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java @@ -44,6 +44,13 @@ public static CompletableFuture create(PulsarClient client, String to */ String getTopic(); + /** + * Check if there is any message available to read. + * + * @return a completable future which will return whether there is any message available to read. + */ + CompletableFuture hasMessageAvailableAsync(); + /** * Seek to a location in the topic. After the seek, the first message read will be the one with * with the specified message ID. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java index a6db7c64739e7..de397e2fd2ae4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java @@ -67,6 +67,11 @@ public String getTopic() { .findFirst().orElse(null); } + @Override + public CompletableFuture hasMessageAvailableAsync() { + return consumer.hasMessageAvailableAsync(); + } + @Override public CompletableFuture seekAsync(MessageId messageId) { return consumer.seekAsync(messageId); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java index 62503ffffdd12..95715cf960210 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java @@ -72,8 +72,16 @@ public TwoPhaseCompactor(ServiceConfiguration conf, @Override protected CompletableFuture doCompaction(RawReader reader, BookKeeper bk) { - return phaseOne(reader).thenCompose( - (r) -> phaseTwo(reader, r.from, r.to, r.lastReadId, r.latestForKey, bk)); + return reader.hasMessageAvailableAsync() + .thenCompose(available -> { + if (available) { + return phaseOne(reader).thenCompose( + (r) -> phaseTwo(reader, r.from, r.to, r.lastReadId, r.latestForKey, bk)); + } else { + log.info("Skip compaction of the empty topic {}", reader.getTopic()); + return CompletableFuture.completedFuture(-1L); + } + }); } private CompletableFuture phaseOne(RawReader reader) { From 3112c7cd7d367540e09a0e8553e54da206f3b334 Mon Sep 17 00:00:00 2001 From: Yang Yang Date: Wed, 5 Feb 2020 18:32:14 +0800 Subject: [PATCH 2/6] End the phase two loop of TwoPhaseCompactor even if the last message is a deletion. --- .../pulsar/compaction/TwoPhaseCompactor.java | 50 +++++++++++-------- 1 file changed, 30 insertions(+), 20 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java index 95715cf960210..c4e74281db0d5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java @@ -249,28 +249,38 @@ private void phaseTwoLoop(RawReader reader, MessageId to, Map } } - messageToAdd.ifPresent((toAdd) -> { - try { - outstanding.acquire(); - CompletableFuture addFuture = addToCompactedLedger(lh, toAdd) + if (messageToAdd.isPresent()) { + try { + outstanding.acquire(); + CompletableFuture addFuture = addToCompactedLedger(lh, messageToAdd.get()) .whenComplete((res, exception2) -> { - outstanding.release(); - if (exception2 != null) { - promise.completeExceptionally(exception2); - } - }); - if (to.equals(id)) { - addFuture.whenComplete((res, exception2) -> { - if (exception2 == null) { - promise.complete(null); - } - }); - } - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - promise.completeExceptionally(ie); + outstanding.release(); + if (exception2 != null) { + promise.completeExceptionally(exception2); + } + }); + if (to.equals(id)) { + addFuture.whenComplete((res, exception2) -> { + if (exception2 == null) { + promise.complete(null); + } + }); } - }); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + promise.completeExceptionally(ie); + } + } else if (to.equals(id)) { + try { + // make sure all inflight writes have finished + outstanding.acquire(MAX_OUTSTANDING); + promise.complete(null); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + promise.completeExceptionally(e); + } + return; + } phaseTwoLoop(reader, to, latestForKey, lh, outstanding, promise); }, scheduler); } From cd653e78d674c0d7b56c3ed85f56db390132ba6f Mon Sep 17 00:00:00 2001 From: Yang Yang Date: Thu, 6 Feb 2020 10:32:22 +0800 Subject: [PATCH 3/6] Fixes flow control for RawReader --- .../apache/pulsar/client/impl/RawReaderImpl.java | 13 ++++++++++++- .../org/apache/pulsar/client/impl/ConsumerImpl.java | 2 +- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java index de397e2fd2ae4..6a4d682d2b8ed 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java @@ -33,9 +33,11 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; +import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -139,6 +141,15 @@ void tryCompletePending() { if (future == null) { assert(messageAndCnx == null); } else { + int numMsg; + try { + MessageMetadata msgMetadata = Commands.parseMessageMetadata(messageAndCnx.msg.getHeadersAndPayload()); + numMsg = msgMetadata.getNumMessagesInBatch(); + msgMetadata.recycle(); + } catch (Throwable t) { + // TODO message validation + numMsg = 1; + } if (!future.complete(messageAndCnx.msg)) { messageAndCnx.msg.close(); closeAsync(); @@ -146,7 +157,7 @@ void tryCompletePending() { ClientCnx currentCnx = cnx(); if (currentCnx == messageAndCnx.cnx) { - increaseAvailablePermits(currentCnx); + increaseAvailablePermits(currentCnx, numMsg); } } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 33f05192aee33..919a280238676 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -1142,7 +1142,7 @@ void increaseAvailablePermits(ClientCnx currentCnx) { increaseAvailablePermits(currentCnx, 1); } - private void increaseAvailablePermits(ClientCnx currentCnx, int delta) { + protected void increaseAvailablePermits(ClientCnx currentCnx, int delta) { int available = AVAILABLE_PERMITS_UPDATER.addAndGet(this, delta); while (available >= receiverQueueRefillThreshold && !paused) { From 4bfdae2c8a6622f1dac4f367cea5ce048920ff59 Mon Sep 17 00:00:00 2001 From: Yang Yang Date: Thu, 6 Feb 2020 15:38:09 +0800 Subject: [PATCH 4/6] Fixes tests for compaction. --- .../pulsar/compaction/TwoPhaseCompactor.java | 7 ++----- .../pulsar/compaction/CompactionTest.java | 20 ++++++++++++------- .../pulsar/compaction/CompactorTest.java | 2 +- 3 files changed, 16 insertions(+), 13 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java index c4e74281db0d5..06afe9310433f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java @@ -241,11 +241,6 @@ private void phaseTwoLoop(RawReader reader, MessageId to, Map messageToAdd = Optional.of(m); } else { m.close(); - // Reached to last-id and phase-one found it deleted-message while iterating on ledger so, not - // present under latestForKey. Complete the compaction. - if (to.equals(id)) { - promise.complete(null); - } } } @@ -271,6 +266,8 @@ private void phaseTwoLoop(RawReader reader, MessageId to, Map promise.completeExceptionally(ie); } } else if (to.equals(id)) { + // Reached to last-id and phase-one found it deleted-message while iterating on ledger so, not + // present under latestForKey. Complete the compaction. try { // make sure all inflight writes have finished outstanding.acquire(MAX_OUTSTANDING); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java index 81d063ec28a98..95a234ecb5249 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java @@ -64,6 +64,7 @@ import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; public class CompactionTest extends MockedPulsarServiceBaseTest { @@ -1250,11 +1251,16 @@ public void testEmptyPayloadDeletesWhenEncrypted() throws Exception { } } - @Test(timeOut = 20000) - public void testCompactionWithLastDeletedKey() throws Exception { + @DataProvider(name = "lastDeletedBatching") + public static Object[][] lastDeletedBatching() { + return new Object[][] {{true}, {false}}; + } + + @Test(timeOut = 20000, dataProvider = "lastDeletedBatching") + public void testCompactionWithLastDeletedKey(boolean batching) throws Exception { String topic = "persistent://my-property/use/my-ns/my-topic1"; - Producer producer = pulsarClient.newProducer().topic(topic).enableBatching(false) + Producer producer = pulsarClient.newProducer().topic(topic).enableBatching(batching) .messageRoutingMode(MessageRoutingMode.SinglePartition).create(); pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close(); @@ -1277,11 +1283,11 @@ public void testCompactionWithLastDeletedKey() throws Exception { } } - @Test(timeOut = 20000) - public void testEmptyCompactionLedger() throws Exception { + @Test(timeOut = 20000, dataProvider = "lastDeletedBatching") + public void testEmptyCompactionLedger(boolean batching) throws Exception { String topic = "persistent://my-property/use/my-ns/my-topic1"; - Producer producer = pulsarClient.newProducer().topic(topic).enableBatching(false) + Producer producer = pulsarClient.newProducer().topic(topic).enableBatching(batching) .messageRoutingMode(MessageRoutingMode.SinglePartition).create(); pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close(); @@ -1292,7 +1298,7 @@ public void testEmptyCompactionLedger() throws Exception { producer.newMessage().key("2").value("".getBytes()).send(); Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); - compactor.compact(topic).get(); + Long r = compactor.compact(topic).get(); // consumer with readCompacted enabled only get compacted entries try (Consumer consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java index f418bc56d1ea6..130793d78b173 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java @@ -207,7 +207,7 @@ public void testCompactedInOrder() throws Exception { Assert.assertEquals(keyOrder, Lists.newArrayList("c", "b", "a")); } - @Test(expectedExceptions = ExecutionException.class) + @Test public void testCompactEmptyTopic() throws Exception { String topic = "persistent://my-property/use/my-ns/my-topic1"; From 9a9ec877473472ac5e8a941c4eb2a68b84ec92f6 Mon Sep 17 00:00:00 2001 From: Yang Yang Date: Thu, 6 Feb 2020 16:17:27 +0800 Subject: [PATCH 5/6] Refines compaction tests. --- .../test/java/org/apache/pulsar/compaction/CompactionTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java index 95a234ecb5249..233246ca7ad4e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java @@ -1298,7 +1298,7 @@ public void testEmptyCompactionLedger(boolean batching) throws Exception { producer.newMessage().key("2").value("".getBytes()).send(); Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); - Long r = compactor.compact(topic).get(); + compactor.compact(topic).get(); // consumer with readCompacted enabled only get compacted entries try (Consumer consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") From 04345d5637f6b3947e5c3844649a4db9fe9ccc87 Mon Sep 17 00:00:00 2001 From: Yang Yang Date: Thu, 6 Feb 2020 19:10:01 +0800 Subject: [PATCH 6/6] Adds a test for batch message flow control of RawReader. --- .../pulsar/client/impl/RawReaderTest.java | 36 ++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java index e57e88d72e9a1..b0c7cd1830055 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java @@ -74,10 +74,16 @@ public void cleanup() throws Exception { } private Set publishMessages(String topic, int count) throws Exception { + return publishMessages(topic, count, false); + } + + private Set publishMessages(String topic, int count, boolean batching) throws Exception { Set keys = new HashSet<>(); try (Producer producer = pulsarClient.newProducer() - .enableBatching(false) + .enableBatching(batching) + // easier to create enough batches with a small batch size + .batchingMaxMessages(10) .messageRoutingMode(MessageRoutingMode.SinglePartition) .maxPendingMessages(count) .topic(topic) @@ -233,6 +239,34 @@ public void testFlowControl() throws Exception { Assert.assertEquals(keys.size(), numMessages); } + @Test + public void testFlowControlBatch() throws Exception { + int numMessages = RawReaderImpl.DEFAULT_RECEIVER_QUEUE_SIZE * 5; + String topic = "persistent://my-property/my-ns/my-raw-topic"; + + publishMessages(topic, numMessages, true); + + RawReader reader = RawReader.create(pulsarClient, topic, subscription).get(); + Set keys = new HashSet<>(); + + while (true) { + try (RawMessage m = reader.readNextAsync().get(1, TimeUnit.SECONDS)) { + Assert.assertTrue(RawBatchConverter.isReadableBatch(m)); + List> batchKeys = RawBatchConverter.extractIdsAndKeys(m); + // Assert each key is unique + for (ImmutablePair pair : batchKeys) { + String key = pair.right; + Assert.assertTrue( + keys.add(key), + "Received duplicated key '" + key + "' : already received keys = " + keys); + } + } catch (TimeoutException te) { + break; + } + } + Assert.assertEquals(keys.size(), numMessages); + } + @Test public void testBatchingExtractKeysAndIds() throws Exception { String topic = "persistent://my-property/my-ns/my-raw-topic";