Skip to content

Commit

Permalink
[fix] [txn] Get previous position by managed ledger. (#22024)
Browse files Browse the repository at this point in the history
  • Loading branch information
thetumbled authored and Technoboy- committed Feb 22, 2024
1 parent 30697bd commit dade5e0
Show file tree
Hide file tree
Showing 2 changed files with 197 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -282,8 +282,8 @@ private void handleTransactionMessage(TxnID txnId, Position position) {
.checkAbortedTransaction(txnId)) {
ongoingTxns.put(txnId, (PositionImpl) position);
PositionImpl firstPosition = ongoingTxns.get(ongoingTxns.firstKey());
//max read position is less than first ongoing transaction message position, so entryId -1
maxReadPosition = PositionImpl.get(firstPosition.getLedgerId(), firstPosition.getEntryId() - 1);
// max read position is less than first ongoing transaction message position
maxReadPosition = ((ManagedLedgerImpl) topic.getManagedLedger()).getPreviousPosition(firstPosition);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
*/
package org.apache.pulsar.broker.transaction.buffer;

import lombok.Cleanup;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.BrokerService;
Expand All @@ -31,7 +33,14 @@
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.TopicMessageId;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
Expand All @@ -46,13 +55,21 @@

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertTrue;
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.fail;

public class TopicTransactionBufferTest extends TransactionTestBase {


Expand Down Expand Up @@ -130,15 +147,15 @@ public void testCheckDeduplicationFailedWhenCreatePersistentTopic() throws Excep
}
})
.when(brokerService)
.newTopic(Mockito.eq(topic), Mockito.any(), Mockito.eq(brokerService),
.newTopic(Mockito.eq(topic), any(), Mockito.eq(brokerService),
Mockito.eq(PersistentTopic.class));

brokerService.createPersistentTopic0(topic, true, new CompletableFuture<>(), Collections.emptyMap());

Awaitility.waitAtMost(1, TimeUnit.MINUTES).until(() -> reference.get() != null);
PersistentTopic persistentTopic = reference.get();
TransactionBuffer buffer = persistentTopic.getTransactionBuffer();
Assert.assertTrue(buffer instanceof TopicTransactionBuffer);
assertTrue(buffer instanceof TopicTransactionBuffer);
TopicTransactionBuffer ttb = (TopicTransactionBuffer) buffer;
TopicTransactionBufferState.State expectState = TopicTransactionBufferState.State.Close;
Assert.assertEquals(ttb.getState(), expectState);
Expand All @@ -163,7 +180,7 @@ public void testCloseTransactionBufferWhenTimeout() throws Exception {
return persistentTopic;
})
.when(brokerService)
.newTopic(Mockito.eq(topic), Mockito.any(), Mockito.eq(brokerService),
.newTopic(Mockito.eq(topic), any(), Mockito.eq(brokerService),
Mockito.eq(PersistentTopic.class));

CompletableFuture<Optional<Topic>> f = brokerService.getTopic(topic, true);
Expand All @@ -172,11 +189,184 @@ public void testCloseTransactionBufferWhenTimeout() throws Exception {
.pollInterval(Duration.ofSeconds(2)).until(() -> reference.get() != null);
PersistentTopic persistentTopic = reference.get();
TransactionBuffer buffer = persistentTopic.getTransactionBuffer();
Assert.assertTrue(buffer instanceof TopicTransactionBuffer);
assertTrue(buffer instanceof TopicTransactionBuffer);
TopicTransactionBuffer ttb = (TopicTransactionBuffer) buffer;
TopicTransactionBufferState.State expectState = TopicTransactionBufferState.State.Close;
Assert.assertEquals(ttb.getState(), expectState);
Assert.assertTrue(f.isCompletedExceptionally());
assertTrue(f.isCompletedExceptionally());
}

/**
* This test mainly test the following two point:
* 1. `getLastMessageIds` will get max read position.
* Send two message |1:0|1:1|; mock max read position as |1:0|; `getLastMessageIds` will get |1:0|.
* 2. `getLastMessageIds` will wait Transaction buffer recover completely.
* Mock `checkIfTBRecoverCompletely` return an exception, `getLastMessageIds` will fail too.
* Mock `checkIfTBRecoverCompletely` return null, `getLastMessageIds` will get correct result.
*/
@Test
public void testGetMaxPositionAfterTBReady() throws Exception {
// 1. Prepare test environment.
String topic = "persistent://" + NAMESPACE1 + "/testGetMaxReadyPositionAfterTBReady";
// 1.1 Mock component.
TransactionBuffer transactionBuffer = Mockito.spy(TransactionBuffer.class);
when(transactionBuffer.checkIfTBRecoverCompletely(anyBoolean()))
// Handle producer will check transaction buffer recover completely.
.thenReturn(CompletableFuture.completedFuture(null))
// If the Transaction buffer failed to recover, we can not get the correct last max read id.
.thenReturn(CompletableFuture.failedFuture(new Throwable("Mock fail")))
// If the transaction buffer recover successfully, the max read position can be acquired successfully.
.thenReturn(CompletableFuture.completedFuture(null));
TransactionBufferProvider transactionBufferProvider = Mockito.spy(TransactionBufferProvider.class);
Mockito.doReturn(transactionBuffer).when(transactionBufferProvider).newTransactionBuffer(any());
TransactionBufferProvider originalTBProvider = getPulsarServiceList().get(0).getTransactionBufferProvider();
Mockito.doReturn(transactionBufferProvider).when(getPulsarServiceList().get(0)).getTransactionBufferProvider();
// 2. Building producer and consumer.
admin.topics().createNonPartitionedTopic(topic);
@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("sub")
.subscribe();
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.create();
// 3. Send message and test the exception can be handled as expected.
MessageIdImpl messageId = (MessageIdImpl) producer.newMessage().send();
producer.newMessage().send();
Mockito.doReturn(new PositionImpl(messageId.getLedgerId(), messageId.getEntryId()))
.when(transactionBuffer).getMaxReadPosition();
try {
consumer.getLastMessageIds();
fail();
} catch (PulsarClientException exception) {
assertTrue(exception.getMessage().contains("Failed to recover Transaction Buffer."));
}
List<TopicMessageId> messageIdList = consumer.getLastMessageIds();
assertEquals(messageIdList.size(), 1);
TopicMessageIdImpl actualMessageID = (TopicMessageIdImpl) messageIdList.get(0);
assertEquals(messageId.getLedgerId(), actualMessageID.getLedgerId());
assertEquals(messageId.getEntryId(), actualMessageID.getEntryId());
// 4. Clean resource
Mockito.doReturn(originalTBProvider).when(getPulsarServiceList().get(0)).getTransactionBufferProvider();
}

/**
* Add a E2E test for the get last message ID. It tests 4 cases.
* <p>
* 1. Only normal messages in the topic.
* 2. There are ongoing transactions, last message ID will not be updated until transaction end.
* 3. Aborted transaction will make the last message ID be updated as expected.
* 4. Committed transaction will make the last message ID be updated as expected.
* </p>
*/
@Test
public void testGetLastMessageIdsWithOngoingTransactions() throws Exception {
// 1. Prepare environment
String topic = "persistent://" + NAMESPACE1 + "/testGetLastMessageIdsWithOngoingTransactions";
String subName = "my-subscription";
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.create();
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName(subName)
.subscribe();

// 2. Test last max read position can be required correctly.
// 2.1 Case1: send 3 original messages. |1:0|1:1|1:2|
MessageIdImpl expectedLastMessageID = null;
for (int i = 0; i < 3; i++) {
expectedLastMessageID = (MessageIdImpl) producer.newMessage().send();
}
assertMessageId(consumer, expectedLastMessageID, 0);
// 2.2 Case2: send 2 ongoing transactional messages and 2 original messages.
// |1:0|1:1|1:2|txn1->1:3|1:4|txn2->1:5|1:6|.
Transaction txn1 = pulsarClient.newTransaction()
.withTransactionTimeout(5, TimeUnit.HOURS)
.build()
.get();
Transaction txn2 = pulsarClient.newTransaction()
.withTransactionTimeout(5, TimeUnit.HOURS)
.build()
.get();
producer.newMessage(txn1).send();
MessageIdImpl expectedLastMessageID1 = (MessageIdImpl) producer.newMessage().send();
producer.newMessage(txn2).send();
MessageIdImpl expectedLastMessageID2 = (MessageIdImpl) producer.newMessage().send();
// 2.2.1 Last message ID will not change when txn1 and txn2 do not end.
assertMessageId(consumer, expectedLastMessageID, 0);
// 2.2.2 Last message ID will update to 1:4 when txn1 committed.
txn1.commit().get(5, TimeUnit.SECONDS);
assertMessageId(consumer, expectedLastMessageID1, 0);
// 2.2.3 Last message ID will update to 1:6 when txn2 aborted.
txn2.abort().get(5, TimeUnit.SECONDS);
// Todo: We can not ignore the marker's position in this fix.
assertMessageId(consumer, expectedLastMessageID2, 2);
}

/**
* produce 3 messages and then trigger a ledger switch,
* then create a transaction and send a transactional message.
* As there are messages in the new ledger, the reader should be able to read the messages.
* But reader.hasMessageAvailable() returns false if the entry id of max read position is -1.
* @throws Exception
*/
@Test
public void testGetLastMessageIdsWithOpenTransactionAtLedgerHead() throws Exception {
String topic = "persistent://" + NAMESPACE1 + "/testGetLastMessageIdsWithOpenTransactionAtLedgerHead";
String subName = "my-subscription";
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.create();
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName(subName)
.subscribe();
MessageId expectedLastMessageID = null;
for (int i = 0; i < 3; i++) {
expectedLastMessageID = producer.newMessage().value(String.valueOf(i).getBytes()).send();
System.out.println("expectedLastMessageID: " + expectedLastMessageID);
}
triggerLedgerSwitch(topic);
Transaction txn = pulsarClient.newTransaction()
.withTransactionTimeout(5, TimeUnit.HOURS)
.build()
.get();
producer.newMessage(txn).send();

Reader<byte[]> reader = pulsarClient.newReader()
.topic(topic)
.startMessageId(MessageId.earliest)
.create();
assertTrue(reader.hasMessageAvailable());
}

private void triggerLedgerSwitch(String topicName) throws Exception{
admin.topics().unload(topicName);
Awaitility.await().until(() -> {
CompletableFuture<Optional<Topic>> topicFuture =
getPulsarServiceList().get(0).getBrokerService().getTopic(topicName, false);
if (!topicFuture.isDone() || topicFuture.isCompletedExceptionally()){
return false;
}
Optional<Topic> topicOptional = topicFuture.join();
if (!topicOptional.isPresent()){
return false;
}
PersistentTopic persistentTopic = (PersistentTopic) topicOptional.get();
ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
return managedLedger.getState() == ManagedLedgerImpl.State.LedgerOpened;
});
}

private void assertMessageId(Consumer<?> consumer, MessageIdImpl expected, int entryOffset) throws Exception {
TopicMessageIdImpl actual = (TopicMessageIdImpl) consumer.getLastMessageIds().get(0);
assertEquals(expected.getEntryId(), actual.getEntryId() - entryOffset);
assertEquals(expected.getLedgerId(), actual.getLedgerId());
}

}

0 comments on commit dade5e0

Please sign in to comment.