From 5ab05129514c1e71a09ec3f28b2b2dda9ce3e47f Mon Sep 17 00:00:00 2001 From: Wenzhi Feng <52550727+thetumbled@users.noreply.github.com> Date: Wed, 8 May 2024 19:34:00 +0800 Subject: [PATCH] [fix] [broker] rename to changeMaxReadPositionCount (#22656) --- .../buffer/impl/TopicTransactionBuffer.java | 16 ++++++++-------- .../broker/transaction/TransactionTest.java | 12 ++++++------ 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java index a36216bd6258b..81c9ecfc728e9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java @@ -76,8 +76,8 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen */ private final LinkedMap ongoingTxns = new LinkedMap<>(); - // when add abort or change max read position, the count will +1. Take snapshot will set 0 into it. - private final AtomicLong changeMaxReadPositionAndAddAbortTimes = new AtomicLong(); + // when change max read position, the count will +1. Take snapshot will reset the count. + private final AtomicLong changeMaxReadPositionCount = new AtomicLong(); private final LongAdder txnCommittedCounter = new LongAdder(); @@ -429,15 +429,15 @@ private void handleLowWaterMark(TxnID txnID, long lowWaterMark) { } private void takeSnapshotByChangeTimes() { - if (changeMaxReadPositionAndAddAbortTimes.get() >= takeSnapshotIntervalNumber) { - this.changeMaxReadPositionAndAddAbortTimes.set(0); + if (changeMaxReadPositionCount.get() >= takeSnapshotIntervalNumber) { + this.changeMaxReadPositionCount.set(0); this.snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(this.maxReadPosition); } } private void takeSnapshotByTimeout() { - if (changeMaxReadPositionAndAddAbortTimes.get() > 0) { - this.changeMaxReadPositionAndAddAbortTimes.set(0); + if (changeMaxReadPositionCount.get() > 0) { + this.changeMaxReadPositionCount.set(0); this.snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(this.maxReadPosition); } this.timer.newTimeout(TopicTransactionBuffer.this, @@ -454,7 +454,7 @@ void updateMaxReadPosition(TxnID txnID) { maxReadPosition = (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry(); } if (preMaxReadPosition.compareTo(this.maxReadPosition) != 0) { - this.changeMaxReadPositionAndAddAbortTimes.getAndIncrement(); + this.changeMaxReadPositionCount.getAndIncrement(); } } @@ -489,7 +489,7 @@ public void syncMaxReadPositionForNormalPublish(PositionImpl position) { } else if (checkIfReady()) { if (ongoingTxns.isEmpty()) { maxReadPosition = position; - changeMaxReadPositionAndAddAbortTimes.incrementAndGet(); + changeMaxReadPositionCount.incrementAndGet(); } } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index e8c15d193a22d..5e806bb9ceee2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -1095,10 +1095,10 @@ public void testCancelTxnTimeout() throws Exception{ } @Test - public void testNotChangeMaxReadPositionAndAddAbortTimesWhenCheckIfNoSnapshot() throws Exception { + public void testNotChangeMaxReadPositionCountWhenCheckIfNoSnapshot() throws Exception { PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0) .getBrokerService() - .getTopic(NAMESPACE1 + "/changeMaxReadPositionAndAddAbortTimes" + UUID.randomUUID(), true) + .getTopic(NAMESPACE1 + "/changeMaxReadPositionCount" + UUID.randomUUID(), true) .get().get(); TransactionBuffer buffer = persistentTopic.getTransactionBuffer(); Field processorField = TopicTransactionBuffer.class.getDeclaredField("snapshotAbortedTxnProcessor"); @@ -1106,9 +1106,9 @@ public void testNotChangeMaxReadPositionAndAddAbortTimesWhenCheckIfNoSnapshot() AbortedTxnProcessor abortedTxnProcessor = (AbortedTxnProcessor) processorField.get(buffer); Field changeTimeField = TopicTransactionBuffer - .class.getDeclaredField("changeMaxReadPositionAndAddAbortTimes"); + .class.getDeclaredField("changeMaxReadPositionCount"); changeTimeField.setAccessible(true); - AtomicLong changeMaxReadPositionAndAddAbortTimes = (AtomicLong) changeTimeField.get(buffer); + AtomicLong changeMaxReadPositionCount = (AtomicLong) changeTimeField.get(buffer); Field field1 = TopicTransactionBufferState.class.getDeclaredField("state"); field1.setAccessible(true); @@ -1117,10 +1117,10 @@ public void testNotChangeMaxReadPositionAndAddAbortTimesWhenCheckIfNoSnapshot() TopicTransactionBufferState.State state = (TopicTransactionBufferState.State) field1.get(buffer); Assert.assertEquals(state, TopicTransactionBufferState.State.NoSnapshot); }); - Assert.assertEquals(changeMaxReadPositionAndAddAbortTimes.get(), 0L); + Assert.assertEquals(changeMaxReadPositionCount.get(), 0L); buffer.syncMaxReadPositionForNormalPublish(new PositionImpl(1, 1)); - Assert.assertEquals(changeMaxReadPositionAndAddAbortTimes.get(), 0L); + Assert.assertEquals(changeMaxReadPositionCount.get(), 0L); }