Skip to content

Commit

Permalink
[Issue 6173][compaction] Fix log compaction for flow control/empty to…
Browse files Browse the repository at this point in the history
…pic/last deletion (#6237)

Fixes #6173

### Motivation

Fixes problems for log compaction found in issue #6173 :

1. Compaction fails for an empty topic. 
2. Compaction never ends if the value of the last message is an empty batch message when the compaction is triggered. 
3. Compaction fails for a topic with batch messages because RawReader flow control doesn't handle batch messages properly.

### Modifications

1. Check if any message is available before compaction phases, and finish the compaction immediately if there is no messages to read to avoid timeout exception.
2. Add missing check for empty batch message for the condition to end the phase 2 loop.
3. Increase correct number of available permits in RawConsumer for batch messages.

### Verifying this change

Producing messages in both batch and not-batch mode in corresponding tests.
  • Loading branch information
fantapsody authored Feb 10, 2020
1 parent 5fe096d commit d3f6c55
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ public static CompletableFuture<RawReader> create(PulsarClient client, String to
*/
String getTopic();

/**
* Check if there is any message available to read.
*
* @return a completable future which will return whether there is any message available to read.
*/
CompletableFuture<Boolean> hasMessageAvailableAsync();

/**
* Seek to a location in the topic. After the seek, the first message read will be the one with
* with the specified message ID.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -67,6 +69,11 @@ public String getTopic() {
.findFirst().orElse(null);
}

@Override
public CompletableFuture<Boolean> hasMessageAvailableAsync() {
return consumer.hasMessageAvailableAsync();
}

@Override
public CompletableFuture<Void> seekAsync(MessageId messageId) {
return consumer.seekAsync(messageId);
Expand Down Expand Up @@ -134,14 +141,23 @@ void tryCompletePending() {
if (future == null) {
assert(messageAndCnx == null);
} else {
int numMsg;
try {
MessageMetadata msgMetadata = Commands.parseMessageMetadata(messageAndCnx.msg.getHeadersAndPayload());
numMsg = msgMetadata.getNumMessagesInBatch();
msgMetadata.recycle();
} catch (Throwable t) {
// TODO message validation
numMsg = 1;
}
if (!future.complete(messageAndCnx.msg)) {
messageAndCnx.msg.close();
closeAsync();
}

ClientCnx currentCnx = cnx();
if (currentCnx == messageAndCnx.cnx) {
increaseAvailablePermits(currentCnx);
increaseAvailablePermits(currentCnx, numMsg);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,16 @@ public TwoPhaseCompactor(ServiceConfiguration conf,

@Override
protected CompletableFuture<Long> doCompaction(RawReader reader, BookKeeper bk) {
return phaseOne(reader).thenCompose(
(r) -> phaseTwo(reader, r.from, r.to, r.lastReadId, r.latestForKey, bk));
return reader.hasMessageAvailableAsync()
.thenCompose(available -> {
if (available) {
return phaseOne(reader).thenCompose(
(r) -> phaseTwo(reader, r.from, r.to, r.lastReadId, r.latestForKey, bk));
} else {
log.info("Skip compaction of the empty topic {}", reader.getTopic());
return CompletableFuture.completedFuture(-1L);
}
});
}

private CompletableFuture<PhaseOneResult> phaseOne(RawReader reader) {
Expand Down Expand Up @@ -233,36 +241,43 @@ private void phaseTwoLoop(RawReader reader, MessageId to, Map<String, MessageId>
messageToAdd = Optional.of(m);
} else {
m.close();
// Reached to last-id and phase-one found it deleted-message while iterating on ledger so, not
// present under latestForKey. Complete the compaction.
if (to.equals(id)) {
promise.complete(null);
}
}
}

messageToAdd.ifPresent((toAdd) -> {
try {
outstanding.acquire();
CompletableFuture<Void> addFuture = addToCompactedLedger(lh, toAdd)
if (messageToAdd.isPresent()) {
try {
outstanding.acquire();
CompletableFuture<Void> addFuture = addToCompactedLedger(lh, messageToAdd.get())
.whenComplete((res, exception2) -> {
outstanding.release();
if (exception2 != null) {
promise.completeExceptionally(exception2);
}
});
if (to.equals(id)) {
addFuture.whenComplete((res, exception2) -> {
if (exception2 == null) {
promise.complete(null);
}
});
}
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
promise.completeExceptionally(ie);
outstanding.release();
if (exception2 != null) {
promise.completeExceptionally(exception2);
}
});
if (to.equals(id)) {
addFuture.whenComplete((res, exception2) -> {
if (exception2 == null) {
promise.complete(null);
}
});
}
});
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
promise.completeExceptionally(ie);
}
} else if (to.equals(id)) {
// Reached to last-id and phase-one found it deleted-message while iterating on ledger so, not
// present under latestForKey. Complete the compaction.
try {
// make sure all inflight writes have finished
outstanding.acquire(MAX_OUTSTANDING);
promise.complete(null);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
promise.completeExceptionally(e);
}
return;
}
phaseTwoLoop(reader, to, latestForKey, lh, outstanding, promise);
}, scheduler);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,16 @@ public void cleanup() throws Exception {
}

private Set<String> publishMessages(String topic, int count) throws Exception {
return publishMessages(topic, count, false);
}

private Set<String> publishMessages(String topic, int count, boolean batching) throws Exception {
Set<String> keys = new HashSet<>();

try (Producer<byte[]> producer = pulsarClient.newProducer()
.enableBatching(false)
.enableBatching(batching)
// easier to create enough batches with a small batch size
.batchingMaxMessages(10)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.maxPendingMessages(count)
.topic(topic)
Expand Down Expand Up @@ -233,6 +239,34 @@ public void testFlowControl() throws Exception {
Assert.assertEquals(keys.size(), numMessages);
}

@Test
public void testFlowControlBatch() throws Exception {
int numMessages = RawReaderImpl.DEFAULT_RECEIVER_QUEUE_SIZE * 5;
String topic = "persistent://my-property/my-ns/my-raw-topic";

publishMessages(topic, numMessages, true);

RawReader reader = RawReader.create(pulsarClient, topic, subscription).get();
Set<String> keys = new HashSet<>();

while (true) {
try (RawMessage m = reader.readNextAsync().get(1, TimeUnit.SECONDS)) {
Assert.assertTrue(RawBatchConverter.isReadableBatch(m));
List<ImmutablePair<MessageId, String>> batchKeys = RawBatchConverter.extractIdsAndKeys(m);
// Assert each key is unique
for (ImmutablePair<MessageId, String> pair : batchKeys) {
String key = pair.right;
Assert.assertTrue(
keys.add(key),
"Received duplicated key '" + key + "' : already received keys = " + keys);
}
} catch (TimeoutException te) {
break;
}
}
Assert.assertEquals(keys.size(), numMessages);
}

@Test
public void testBatchingExtractKeysAndIds() throws Exception {
String topic = "persistent://my-property/my-ns/my-raw-topic";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

public class CompactionTest extends MockedPulsarServiceBaseTest {
Expand Down Expand Up @@ -1250,11 +1251,16 @@ public void testEmptyPayloadDeletesWhenEncrypted() throws Exception {
}
}

@Test(timeOut = 20000)
public void testCompactionWithLastDeletedKey() throws Exception {
@DataProvider(name = "lastDeletedBatching")
public static Object[][] lastDeletedBatching() {
return new Object[][] {{true}, {false}};
}

@Test(timeOut = 20000, dataProvider = "lastDeletedBatching")
public void testCompactionWithLastDeletedKey(boolean batching) throws Exception {
String topic = "persistent://my-property/use/my-ns/my-topic1";

Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).enableBatching(false)
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).enableBatching(batching)
.messageRoutingMode(MessageRoutingMode.SinglePartition).create();

pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();
Expand All @@ -1277,11 +1283,11 @@ public void testCompactionWithLastDeletedKey() throws Exception {
}
}

@Test(timeOut = 20000)
public void testEmptyCompactionLedger() throws Exception {
@Test(timeOut = 20000, dataProvider = "lastDeletedBatching")
public void testEmptyCompactionLedger(boolean batching) throws Exception {
String topic = "persistent://my-property/use/my-ns/my-topic1";

Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).enableBatching(false)
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).enableBatching(batching)
.messageRoutingMode(MessageRoutingMode.SinglePartition).create();

pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ public void testCompactedInOrder() throws Exception {
Assert.assertEquals(keyOrder, Lists.newArrayList("c", "b", "a"));
}

@Test(expectedExceptions = ExecutionException.class)
@Test
public void testCompactEmptyTopic() throws Exception {
String topic = "persistent://my-property/use/my-ns/my-topic1";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1142,7 +1142,7 @@ void increaseAvailablePermits(ClientCnx currentCnx) {
increaseAvailablePermits(currentCnx, 1);
}

private void increaseAvailablePermits(ClientCnx currentCnx, int delta) {
protected void increaseAvailablePermits(ClientCnx currentCnx, int delta) {
int available = AVAILABLE_PERMITS_UPDATER.addAndGet(this, delta);

while (available >= receiverQueueRefillThreshold && !paused) {
Expand Down

0 comments on commit d3f6c55

Please sign in to comment.