From 1d78d7af87856f5c14085e8bb8c7e82473985433 Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Mon, 6 May 2024 20:21:43 +0800 Subject: [PATCH 1/3] rename to changeMaxReadPositionTimes. --- .../buffer/impl/TopicTransactionBuffer.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java index a36216bd6258b..fd535780c0f4e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java @@ -76,8 +76,8 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen */ private final LinkedMap ongoingTxns = new LinkedMap<>(); - // when add abort or change max read position, the count will +1. Take snapshot will set 0 into it. - private final AtomicLong changeMaxReadPositionAndAddAbortTimes = new AtomicLong(); + // when change max read position, the count will +1. Take snapshot will reset the count. + private final AtomicLong changeMaxReadPositionTimes = new AtomicLong(); private final LongAdder txnCommittedCounter = new LongAdder(); @@ -429,15 +429,15 @@ private void handleLowWaterMark(TxnID txnID, long lowWaterMark) { } private void takeSnapshotByChangeTimes() { - if (changeMaxReadPositionAndAddAbortTimes.get() >= takeSnapshotIntervalNumber) { - this.changeMaxReadPositionAndAddAbortTimes.set(0); + if (changeMaxReadPositionTimes.get() >= takeSnapshotIntervalNumber) { + this.changeMaxReadPositionTimes.set(0); this.snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(this.maxReadPosition); } } private void takeSnapshotByTimeout() { - if (changeMaxReadPositionAndAddAbortTimes.get() > 0) { - this.changeMaxReadPositionAndAddAbortTimes.set(0); + if (changeMaxReadPositionTimes.get() > 0) { + this.changeMaxReadPositionTimes.set(0); this.snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(this.maxReadPosition); } this.timer.newTimeout(TopicTransactionBuffer.this, @@ -454,7 +454,7 @@ void updateMaxReadPosition(TxnID txnID) { maxReadPosition = (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry(); } if (preMaxReadPosition.compareTo(this.maxReadPosition) != 0) { - this.changeMaxReadPositionAndAddAbortTimes.getAndIncrement(); + this.changeMaxReadPositionTimes.getAndIncrement(); } } @@ -489,7 +489,7 @@ public void syncMaxReadPositionForNormalPublish(PositionImpl position) { } else if (checkIfReady()) { if (ongoingTxns.isEmpty()) { maxReadPosition = position; - changeMaxReadPositionAndAddAbortTimes.incrementAndGet(); + changeMaxReadPositionTimes.incrementAndGet(); } } } From 845122dc5024789e3a9db545158c97bba955856d Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Wed, 8 May 2024 10:10:10 +0800 Subject: [PATCH 2/3] rename. --- .../buffer/impl/TopicTransactionBuffer.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java index fd535780c0f4e..81c9ecfc728e9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java @@ -77,7 +77,7 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen private final LinkedMap ongoingTxns = new LinkedMap<>(); // when change max read position, the count will +1. Take snapshot will reset the count. - private final AtomicLong changeMaxReadPositionTimes = new AtomicLong(); + private final AtomicLong changeMaxReadPositionCount = new AtomicLong(); private final LongAdder txnCommittedCounter = new LongAdder(); @@ -429,15 +429,15 @@ private void handleLowWaterMark(TxnID txnID, long lowWaterMark) { } private void takeSnapshotByChangeTimes() { - if (changeMaxReadPositionTimes.get() >= takeSnapshotIntervalNumber) { - this.changeMaxReadPositionTimes.set(0); + if (changeMaxReadPositionCount.get() >= takeSnapshotIntervalNumber) { + this.changeMaxReadPositionCount.set(0); this.snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(this.maxReadPosition); } } private void takeSnapshotByTimeout() { - if (changeMaxReadPositionTimes.get() > 0) { - this.changeMaxReadPositionTimes.set(0); + if (changeMaxReadPositionCount.get() > 0) { + this.changeMaxReadPositionCount.set(0); this.snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(this.maxReadPosition); } this.timer.newTimeout(TopicTransactionBuffer.this, @@ -454,7 +454,7 @@ void updateMaxReadPosition(TxnID txnID) { maxReadPosition = (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry(); } if (preMaxReadPosition.compareTo(this.maxReadPosition) != 0) { - this.changeMaxReadPositionTimes.getAndIncrement(); + this.changeMaxReadPositionCount.getAndIncrement(); } } @@ -489,7 +489,7 @@ public void syncMaxReadPositionForNormalPublish(PositionImpl position) { } else if (checkIfReady()) { if (ongoingTxns.isEmpty()) { maxReadPosition = position; - changeMaxReadPositionTimes.incrementAndGet(); + changeMaxReadPositionCount.incrementAndGet(); } } } From 3e2e3a40186441b3eeec6fadbc0f8db4d268cb52 Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Wed, 8 May 2024 16:13:16 +0800 Subject: [PATCH 3/3] fix test code --- .../pulsar/broker/transaction/TransactionTest.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index ed1b74c46e0f0..504482a43f383 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -1095,10 +1095,10 @@ public void testCancelTxnTimeout() throws Exception{ } @Test - public void testNotChangeMaxReadPositionAndAddAbortTimesWhenCheckIfNoSnapshot() throws Exception { + public void testNotChangeMaxReadPositionCountWhenCheckIfNoSnapshot() throws Exception { PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0) .getBrokerService() - .getTopic(NAMESPACE1 + "/changeMaxReadPositionAndAddAbortTimes" + UUID.randomUUID(), true) + .getTopic(NAMESPACE1 + "/changeMaxReadPositionCount" + UUID.randomUUID(), true) .get().get(); TransactionBuffer buffer = persistentTopic.getTransactionBuffer(); Field processorField = TopicTransactionBuffer.class.getDeclaredField("snapshotAbortedTxnProcessor"); @@ -1106,9 +1106,9 @@ public void testNotChangeMaxReadPositionAndAddAbortTimesWhenCheckIfNoSnapshot() AbortedTxnProcessor abortedTxnProcessor = (AbortedTxnProcessor) processorField.get(buffer); Field changeTimeField = TopicTransactionBuffer - .class.getDeclaredField("changeMaxReadPositionAndAddAbortTimes"); + .class.getDeclaredField("changeMaxReadPositionCount"); changeTimeField.setAccessible(true); - AtomicLong changeMaxReadPositionAndAddAbortTimes = (AtomicLong) changeTimeField.get(buffer); + AtomicLong changeMaxReadPositionCount = (AtomicLong) changeTimeField.get(buffer); Field field1 = TopicTransactionBufferState.class.getDeclaredField("state"); field1.setAccessible(true); @@ -1117,10 +1117,10 @@ public void testNotChangeMaxReadPositionAndAddAbortTimesWhenCheckIfNoSnapshot() TopicTransactionBufferState.State state = (TopicTransactionBufferState.State) field1.get(buffer); Assert.assertEquals(state, TopicTransactionBufferState.State.NoSnapshot); }); - Assert.assertEquals(changeMaxReadPositionAndAddAbortTimes.get(), 0L); + Assert.assertEquals(changeMaxReadPositionCount.get(), 0L); buffer.syncMaxReadPositionForNormalPublish(new PositionImpl(1, 1)); - Assert.assertEquals(changeMaxReadPositionAndAddAbortTimes.get(), 0L); + Assert.assertEquals(changeMaxReadPositionCount.get(), 0L); }