Skip to content

Commit f48bce5

Browse files
poorbarcodesrinath-ctds
authored andcommitted
[fix] [broker] Fix config replicationStartAt does not work when set it to earliest (apache#23719)
Co-authored-by: Lari Hotari <[email protected]> (cherry picked from commit 39f4ccd) (cherry picked from commit ab69c3d)
1 parent d5dfffb commit f48bce5

File tree

2 files changed

+8
-4
lines changed

2 files changed

+8
-4
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -1988,9 +1988,13 @@ CompletableFuture<Void> startReplicator(String remoteCluster) {
19881988
final CompletableFuture<Void> future = new CompletableFuture<>();
19891989

19901990
String name = PersistentReplicator.getReplicatorName(replicatorPrefix, remoteCluster);
1991+
String replicationStartAt = getBrokerService().getPulsar().getConfiguration().getReplicationStartAt();
19911992
final InitialPosition initialPosition;
1992-
if (MessageId.earliest.toString()
1993-
.equalsIgnoreCase(getBrokerService().getPulsar().getConfiguration().getReplicationStartAt())) {
1993+
// "MessageId.earliest.toString()" is "-1:-1:-1", which is not suggested, just guarantee compatibility with the
1994+
// previous version.
1995+
// "InitialPosition.Earliest.name()" is "Earliest", which is suggested.
1996+
if (MessageId.earliest.toString().equalsIgnoreCase(replicationStartAt)
1997+
|| InitialPosition.Earliest.name().equalsIgnoreCase(replicationStartAt)) {
19941998
initialPosition = InitialPosition.Earliest;
19951999
} else {
19962000
initialPosition = InitialPosition.Latest;

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -993,9 +993,9 @@ public void testConfigReplicationStartAt() throws Exception {
993993
disableReplication(topic1);
994994

995995
// 2.Update config: start at "earliest".
996-
admin1.brokers().updateDynamicConfiguration("replicationStartAt", MessageId.earliest.toString());
996+
admin1.brokers().updateDynamicConfiguration("replicationStartAt", "earliest");
997997
Awaitility.await().untilAsserted(() -> {
998-
pulsar1.getConfiguration().getReplicationStartAt().equalsIgnoreCase("earliest");
998+
assertEquals(pulsar1.getConfiguration().getReplicationStartAt(), "earliest");
999999
});
10001000

10011001
final String topic2 = BrokerTestUtil.newUniqueName("persistent://" + ns1 + "/tp_");

0 commit comments

Comments
 (0)