[Issue 6173][compaction] Fix log compaction for flow control/empty topic/last deletion #6237

Merged: 7 commits, Feb 10, 2020
@@ -44,6 +44,13 @@ public static CompletableFuture<RawReader> create(PulsarClient client, String to
*/
String getTopic();

/**
* Check if there is any message available to read.
*
* @return a completable future which will return whether there is any message available to read.
*/
CompletableFuture<Boolean> hasMessageAvailableAsync();

/**
* Seek to a location in the topic. After the seek, the first message read will be the one
* with the specified message ID.
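This new method is what the compactor uses further down to detect an empty topic before starting phase one. A minimal caller-side sketch, assuming an already-built PulsarClient; the helper class and method names are hypothetical, while RawReader.create, hasMessageAvailableAsync and closeAsync are the APIs shown in this diff:

import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.RawReader;

class RawReaderGuardExample {
    // Hypothetical helper: open a raw reader, ask whether anything can be read,
    // then close the reader again and complete with the answer.
    static CompletableFuture<Boolean> topicHasData(PulsarClient client, String topic, String subscription) {
        return RawReader.create(client, topic, subscription)
                .thenCompose(reader -> reader.hasMessageAvailableAsync()
                        .thenCompose(hasMessage ->
                                reader.closeAsync().thenApply(ignore -> hasMessage)));
    }
}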
Expand Up @@ -33,9 +33,11 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,6 +69,11 @@ public String getTopic() {
.findFirst().orElse(null);
}

@Override
public CompletableFuture<Boolean> hasMessageAvailableAsync() {
return consumer.hasMessageAvailableAsync();
}

@Override
public CompletableFuture<Void> seekAsync(MessageId messageId) {
return consumer.seekAsync(messageId);
@@ -134,14 +141,23 @@ void tryCompletePending() {
if (future == null) {
assert(messageAndCnx == null);
} else {
int numMsg;
try {
MessageMetadata msgMetadata = Commands.parseMessageMetadata(messageAndCnx.msg.getHeadersAndPayload());
numMsg = msgMetadata.getNumMessagesInBatch();
msgMetadata.recycle();
} catch (Throwable t) {
// TODO message validation
numMsg = 1;
}
if (!future.complete(messageAndCnx.msg)) {
messageAndCnx.msg.close();
closeAsync();
}

ClientCnx currentCnx = cnx();
if (currentCnx == messageAndCnx.cnx) {
increaseAvailablePermits(currentCnx);
increaseAvailablePermits(currentCnx, numMsg);
}
}
}
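The hunk above fixes the raw reader's flow control for batched entries: the reader used to refund a single permit per entry, even when the entry's metadata reported many messages in the batch, so it now parses numMessagesInBatch and refunds that many permits. A toy model, not Pulsar code, with illustrative constants and the assumption that the broker charges one permit per individual message inside a batch, shows why the old per-entry refund stalls delivery:

// Toy permit model: the reader refunds one permit per entry while the broker
// charges one permit per message in the batch, so the broker-side count drains
// before the consumer-side credit ever reaches the refill threshold.
class PermitStallModel {
    public static void main(String[] args) {
        final int queueSize = 1000;            // illustrative receiver queue size
        final int refillThreshold = queueSize / 2;
        final int batchSize = 10;              // messages per batched entry

        int brokerPermits = queueSize;         // initial FLOW sent on subscribe
        int consumerCredit = 0;                // permits accumulated on the consumer side

        while (brokerPermits >= batchSize) {
            brokerPermits -= batchSize;        // broker dispatches one batch entry
            consumerCredit += 1;               // buggy refund: one permit per entry
            if (consumerCredit >= refillThreshold) {
                brokerPermits += consumerCredit;   // FLOW request tops the broker back up
                consumerCredit = 0;
            }
        }
        System.out.println("stalled: brokerPermits=" + brokerPermits
                + ", consumerCredit=" + consumerCredit);
    }
}

Refunding numMessagesInBatch permits per entry, as the patch does, keeps both counters in step.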
@@ -72,8 +72,16 @@ public TwoPhaseCompactor(ServiceConfiguration conf,

@Override
protected CompletableFuture<Long> doCompaction(RawReader reader, BookKeeper bk) {
return phaseOne(reader).thenCompose(
(r) -> phaseTwo(reader, r.from, r.to, r.lastReadId, r.latestForKey, bk));
return reader.hasMessageAvailableAsync()
.thenCompose(available -> {
if (available) {
return phaseOne(reader).thenCompose(
(r) -> phaseTwo(reader, r.from, r.to, r.lastReadId, r.latestForKey, bk));
} else {
log.info("Skip compaction of the empty topic {}", reader.getTopic());
return CompletableFuture.completedFuture(-1L);
}
});
}

private CompletableFuture<PhaseOneResult> phaseOne(RawReader reader) {
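Before this guard, compacting a topic with no messages failed (the old testCompactEmptyTopic below expected an ExecutionException); now phase one is never started and the compaction future completes with a -1L sentinel. A hedged sketch of caller-side handling of that sentinel; the class and method names are hypothetical, and the successful path is assumed to carry the id of the newly written compacted ledger:

import java.util.concurrent.CompletableFuture;

class CompactionResultHandling {
    // Hypothetical caller: -1L means compaction was skipped, anything else is a ledger id.
    static CompletableFuture<Void> report(CompletableFuture<Long> compactionFuture) {
        return compactionFuture.thenAccept(ledgerId -> {
            if (ledgerId == -1L) {
                System.out.println("compaction skipped: topic was empty");
            } else {
                System.out.println("compacted into ledger " + ledgerId);
            }
        });
    }
}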
@@ -233,36 +241,43 @@ private void phaseTwoLoop(RawReader reader, MessageId to, Map<String, MessageId>
messageToAdd = Optional.of(m);
} else {
m.close();
// Reached the last message id, and phase one saw it as a deleted message while iterating the ledger,
// so it is not present in latestForKey. Complete the compaction.
if (to.equals(id)) {
promise.complete(null);
}
}
}

messageToAdd.ifPresent((toAdd) -> {
try {
outstanding.acquire();
CompletableFuture<Void> addFuture = addToCompactedLedger(lh, toAdd)
.whenComplete((res, exception2) -> {
outstanding.release();
if (exception2 != null) {
promise.completeExceptionally(exception2);
}
});
if (to.equals(id)) {
addFuture.whenComplete((res, exception2) -> {
if (exception2 == null) {
promise.complete(null);
}
});
}
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
promise.completeExceptionally(ie);
}
});
if (messageToAdd.isPresent()) {
try {
outstanding.acquire();
CompletableFuture<Void> addFuture = addToCompactedLedger(lh, messageToAdd.get())
.whenComplete((res, exception2) -> {
outstanding.release();
if (exception2 != null) {
promise.completeExceptionally(exception2);
}
});
if (to.equals(id)) {
addFuture.whenComplete((res, exception2) -> {
if (exception2 == null) {
promise.complete(null);
}
});
}
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
promise.completeExceptionally(ie);
}
} else if (to.equals(id)) {
// Reached the last message id, and phase one saw it as a deleted message while iterating the ledger,
// so it is not present in latestForKey. Complete the compaction.
try {
// make sure all inflight writes have finished
outstanding.acquire(MAX_OUTSTANDING);
promise.complete(null);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
promise.completeExceptionally(e);
}
return;
}
phaseTwoLoop(reader, to, latestForKey, lh, outstanding, promise);
}, scheduler);
}
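The new else-if branch handles the case where the very last message's key was deleted: there is nothing to append for it, but the compaction must not complete until every in-flight addToCompactedLedger write has finished, so the patch drains the write semaphore before completing the promise. A standalone sketch of that pattern; the class name and the MAX_OUTSTANDING value here are illustrative, not the compactor's own:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;

class SemaphoreDrainExample {
    static final int MAX_OUTSTANDING = 500;   // illustrative bound on concurrent writes

    // Drain pattern: each async write holds one permit, so acquiring all permits
    // guarantees no write is still in flight before the promise completes.
    static void completeWhenWritesFinish(Semaphore outstanding, CompletableFuture<Long> promise, long result) {
        try {
            outstanding.acquire(MAX_OUTSTANDING);   // blocks until all writers have released
            try {
                promise.complete(result);
            } finally {
                outstanding.release(MAX_OUTSTANDING);   // hand the permits back afterwards
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            promise.completeExceptionally(e);
        }
    }
}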
@@ -74,10 +74,16 @@ public void cleanup() throws Exception {
}

private Set<String> publishMessages(String topic, int count) throws Exception {
return publishMessages(topic, count, false);
}

private Set<String> publishMessages(String topic, int count, boolean batching) throws Exception {
Set<String> keys = new HashSet<>();

try (Producer<byte[]> producer = pulsarClient.newProducer()
.enableBatching(false)
.enableBatching(batching)
// easier to create enough batches with a small batch size
.batchingMaxMessages(10)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.maxPendingMessages(count)
.topic(topic)
@@ -233,6 +239,34 @@ public void testFlowControl() throws Exception {
Assert.assertEquals(keys.size(), numMessages);
}

@Test
public void testFlowControlBatch() throws Exception {
int numMessages = RawReaderImpl.DEFAULT_RECEIVER_QUEUE_SIZE * 5;
String topic = "persistent://my-property/my-ns/my-raw-topic";

publishMessages(topic, numMessages, true);

RawReader reader = RawReader.create(pulsarClient, topic, subscription).get();
Set<String> keys = new HashSet<>();

while (true) {
try (RawMessage m = reader.readNextAsync().get(1, TimeUnit.SECONDS)) {
Assert.assertTrue(RawBatchConverter.isReadableBatch(m));
List<ImmutablePair<MessageId, String>> batchKeys = RawBatchConverter.extractIdsAndKeys(m);
// Assert each key is unique
for (ImmutablePair<MessageId, String> pair : batchKeys) {
String key = pair.right;
Assert.assertTrue(
keys.add(key),
"Received duplicated key '" + key + "' : already received keys = " + keys);
}
} catch (TimeoutException te) {
break;
}
}
Assert.assertEquals(keys.size(), numMessages);
}

@Test
public void testBatchingExtractKeysAndIds() throws Exception {
String topic = "persistent://my-property/my-ns/my-raw-topic";
@@ -64,6 +64,7 @@
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

public class CompactionTest extends MockedPulsarServiceBaseTest {
@@ -1250,11 +1251,16 @@ public void testEmptyPayloadDeletesWhenEncrypted() throws Exception {
}
}

@Test(timeOut = 20000)
public void testCompactionWithLastDeletedKey() throws Exception {
@DataProvider(name = "lastDeletedBatching")
public static Object[][] lastDeletedBatching() {
return new Object[][] {{true}, {false}};
}

@Test(timeOut = 20000, dataProvider = "lastDeletedBatching")
public void testCompactionWithLastDeletedKey(boolean batching) throws Exception {
String topic = "persistent://my-property/use/my-ns/my-topic1";

Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).enableBatching(false)
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).enableBatching(batching)
.messageRoutingMode(MessageRoutingMode.SinglePartition).create();

pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();
@@ -1277,11 +1283,11 @@ public void testCompactionWithLastDeletedKey() throws Exception {
}
}

@Test(timeOut = 20000)
public void testEmptyCompactionLedger() throws Exception {
@Test(timeOut = 20000, dataProvider = "lastDeletedBatching")
public void testEmptyCompactionLedger(boolean batching) throws Exception {
String topic = "persistent://my-property/use/my-ns/my-topic1";

Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).enableBatching(false)
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).enableBatching(batching)
.messageRoutingMode(MessageRoutingMode.SinglePartition).create();

pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();
@@ -207,7 +207,7 @@ public void testCompactedInOrder() throws Exception {
Assert.assertEquals(keyOrder, Lists.newArrayList("c", "b", "a"));
}

@Test(expectedExceptions = ExecutionException.class)
@Test
public void testCompactEmptyTopic() throws Exception {
String topic = "persistent://my-property/use/my-ns/my-topic1";

@@ -1142,7 +1142,7 @@ void increaseAvailablePermits(ClientCnx currentCnx) {
increaseAvailablePermits(currentCnx, 1);
}

private void increaseAvailablePermits(ClientCnx currentCnx, int delta) {
protected void increaseAvailablePermits(ClientCnx currentCnx, int delta) {
int available = AVAILABLE_PERMITS_UPDATER.addAndGet(this, delta);

while (available >= receiverQueueRefillThreshold && !paused) {