Skip to content

Commit 34f88c0

Browse files
poorbarcodenikhil-ctds
authored andcommitted
[improve] [broker] PIP-356 Support Geo-Replication starts at earliest position (apache#22856)
(cherry picked from commit 5fc0eaf) (cherry picked from commit ab8dba3)
1 parent e02f0ef commit 34f88c0

File tree

4 files changed

+167
-3
lines changed

4 files changed

+167
-3
lines changed

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java

+6
Original file line numberDiff line numberDiff line change
@@ -1332,6 +1332,12 @@ The delayed message index time step(in seconds) in per bucket snapshot segment,
13321332
doc = "Max number of snapshot to be cached per subscription.")
13331333
private int replicatedSubscriptionsSnapshotMaxCachedPerSubscription = 10;
13341334

1335+
@FieldContext(
1336+
category = CATEGORY_SERVER,
1337+
dynamic = true,
1338+
doc = "The position that replication task start at, it can be set to earliest or latest (default).")
1339+
private String replicationStartAt = "latest";
1340+
13351341
@FieldContext(
13361342
category = CATEGORY_SERVER,
13371343
dynamic = true,

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -1919,7 +1919,14 @@ CompletableFuture<Void> startReplicator(String remoteCluster) {
19191919
final CompletableFuture<Void> future = new CompletableFuture<>();
19201920

19211921
String name = PersistentReplicator.getReplicatorName(replicatorPrefix, remoteCluster);
1922-
ledger.asyncOpenCursor(name, new OpenCursorCallback() {
1922+
final InitialPosition initialPosition;
1923+
if (MessageId.earliest.toString()
1924+
.equalsIgnoreCase(getBrokerService().getPulsar().getConfiguration().getReplicationStartAt())) {
1925+
initialPosition = InitialPosition.Earliest;
1926+
} else {
1927+
initialPosition = InitialPosition.Latest;
1928+
}
1929+
ledger.asyncOpenCursor(name, initialPosition, new OpenCursorCallback() {
19231930
@Override
19241931
public void openCursorComplete(ManagedCursor cursor, Object ctx) {
19251932
String localCluster = brokerService.pulsar().getConfiguration().getClusterName();

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java

+101-2
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import static org.testng.Assert.assertEquals;
2626
import static org.testng.Assert.assertFalse;
2727
import static org.testng.Assert.assertNotEquals;
28+
import static org.testng.Assert.assertNotNull;
2829
import static org.testng.Assert.assertNull;
2930
import static org.testng.Assert.assertTrue;
3031
import static org.testng.Assert.fail;
@@ -34,6 +35,7 @@
3435
import java.lang.reflect.Method;
3536
import java.time.Duration;
3637
import java.util.Arrays;
38+
import java.util.Collections;
3739
import java.util.Optional;
3840
import java.util.UUID;
3941
import java.util.concurrent.CompletableFuture;
@@ -68,11 +70,12 @@
6870
import org.apache.pulsar.client.impl.ProducerImpl;
6971
import org.apache.pulsar.client.impl.PulsarClientImpl;
7072
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
73+
import org.apache.pulsar.common.policies.data.RetentionPolicies;
74+
import org.apache.pulsar.common.policies.data.TopicStats;
75+
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
7176
import org.apache.pulsar.common.naming.TopicName;
7277
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
73-
import org.apache.pulsar.common.policies.data.TopicStats;
7478
import org.apache.pulsar.common.util.FutureUtil;
75-
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
7679
import org.awaitility.Awaitility;
7780
import org.awaitility.reflect.WhiteboxImpl;
7881
import org.mockito.Mockito;
@@ -903,4 +906,100 @@ public void testReloadWithTopicLevelGeoReplication(ReplicationLevel replicationL
903906
});
904907
}
905908
}
909+
910+
protected void enableReplication(String topic) throws Exception {
911+
admin1.topics().setReplicationClusters(topic, Arrays.asList(cluster1, cluster2));
912+
}
913+
914+
protected void disableReplication(String topic) throws Exception {
915+
admin1.topics().setReplicationClusters(topic, Arrays.asList(cluster1, cluster2));
916+
}
917+
918+
@Test
919+
public void testConfigReplicationStartAt() throws Exception {
920+
// Initialize.
921+
String ns1 = defaultTenant + "/ns_" + UUID.randomUUID().toString().replace("-", "");
922+
String subscription1 = "s1";
923+
admin1.namespaces().createNamespace(ns1);
924+
if (!usingGlobalZK) {
925+
admin2.namespaces().createNamespace(ns1);
926+
}
927+
928+
RetentionPolicies retentionPolicies = new RetentionPolicies(60 * 24, 1024);
929+
admin1.namespaces().setRetention(ns1, retentionPolicies);
930+
admin2.namespaces().setRetention(ns1, retentionPolicies);
931+
932+
// 1. default config.
933+
// Enable replication for topic1.
934+
final String topic1 = BrokerTestUtil.newUniqueName("persistent://" + ns1 + "/tp_");
935+
admin1.topics().createNonPartitionedTopicAsync(topic1);
936+
admin1.topics().createSubscription(topic1, subscription1, MessageId.earliest);
937+
Producer<String> p1 = client1.newProducer(Schema.STRING).topic(topic1).create();
938+
p1.send("msg-1");
939+
p1.close();
940+
enableReplication(topic1);
941+
// Verify: since the replication was started at latest, there is no message to consume.
942+
Consumer<String> c1 = client2.newConsumer(Schema.STRING).topic(topic1).subscriptionName(subscription1)
943+
.subscribe();
944+
Message<String> msg1 = c1.receive(2, TimeUnit.SECONDS);
945+
assertNull(msg1);
946+
c1.close();
947+
disableReplication(topic1);
948+
949+
// 2.Update config: start at "earliest".
950+
admin1.brokers().updateDynamicConfiguration("replicationStartAt", MessageId.earliest.toString());
951+
Awaitility.await().untilAsserted(() -> {
952+
pulsar1.getConfiguration().getReplicationStartAt().equalsIgnoreCase("earliest");
953+
});
954+
955+
final String topic2 = BrokerTestUtil.newUniqueName("persistent://" + ns1 + "/tp_");
956+
admin1.topics().createNonPartitionedTopicAsync(topic2);
957+
admin1.topics().createSubscription(topic2, subscription1, MessageId.earliest);
958+
Producer<String> p2 = client1.newProducer(Schema.STRING).topic(topic2).create();
959+
p2.send("msg-1");
960+
p2.close();
961+
enableReplication(topic2);
962+
// Verify: since the replication was started at earliest, there is one message to consume.
963+
Consumer<String> c2 = client2.newConsumer(Schema.STRING).topic(topic2).subscriptionName(subscription1)
964+
.subscribe();
965+
Message<String> msg2 = c2.receive(2, TimeUnit.SECONDS);
966+
assertNotNull(msg2);
967+
assertEquals(msg2.getValue(), "msg-1");
968+
c2.close();
969+
disableReplication(topic2);
970+
971+
// 2.Update config: start at "latest".
972+
admin1.brokers().updateDynamicConfiguration("replicationStartAt", MessageId.latest.toString());
973+
Awaitility.await().untilAsserted(() -> {
974+
pulsar1.getConfiguration().getReplicationStartAt().equalsIgnoreCase("latest");
975+
});
976+
977+
final String topic3 = BrokerTestUtil.newUniqueName("persistent://" + ns1 + "/tp_");
978+
admin1.topics().createNonPartitionedTopicAsync(topic3);
979+
admin1.topics().createSubscription(topic3, subscription1, MessageId.earliest);
980+
Producer<String> p3 = client1.newProducer(Schema.STRING).topic(topic3).create();
981+
p3.send("msg-1");
982+
p3.close();
983+
enableReplication(topic3);
984+
// Verify: since the replication was started at latest, there is no message to consume.
985+
Consumer<String> c3 = client2.newConsumer(Schema.STRING).topic(topic3).subscriptionName(subscription1)
986+
.subscribe();
987+
Message<String> msg3 = c3.receive(2, TimeUnit.SECONDS);
988+
assertNull(msg3);
989+
c3.close();
990+
disableReplication(topic3);
991+
992+
// cleanup.
993+
// There is no good way to delete topics when using global ZK, skip cleanup.
994+
admin1.namespaces().setNamespaceReplicationClusters(ns1, Collections.singleton(cluster1));
995+
admin1.namespaces().unload(ns1);
996+
admin2.namespaces().setNamespaceReplicationClusters(ns1, Collections.singleton(cluster2));
997+
admin2.namespaces().unload(ns1);
998+
admin1.topics().delete(topic1, false);
999+
admin2.topics().delete(topic1, false);
1000+
admin1.topics().delete(topic2, false);
1001+
admin2.topics().delete(topic2, false);
1002+
admin1.topics().delete(topic3, false);
1003+
admin2.topics().delete(topic3, false);
1004+
}
9061005
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java

+52
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,19 @@
1818
*/
1919
package org.apache.pulsar.broker.service;
2020

21+
import static org.testng.Assert.assertEquals;
22+
import static org.testng.Assert.assertNotNull;
23+
import java.util.Arrays;
24+
import java.util.HashSet;
25+
import java.util.UUID;
26+
import java.util.concurrent.TimeUnit;
2127
import lombok.extern.slf4j.Slf4j;
28+
import org.apache.pulsar.broker.BrokerTestUtil;
29+
import org.apache.pulsar.client.api.Message;
30+
import org.apache.pulsar.client.api.MessageId;
31+
import org.apache.pulsar.client.api.Schema;
32+
import org.apache.pulsar.common.policies.data.RetentionPolicies;
33+
import org.awaitility.Awaitility;
2234
import org.testng.annotations.AfterClass;
2335
import org.testng.annotations.BeforeClass;
2436
import org.testng.annotations.Test;
@@ -109,4 +121,44 @@ public void testExpandTopicPartitionsOnNamespaceLevelReplication() throws Except
109121
public void testReloadWithTopicLevelGeoReplication(ReplicationLevel replicationLevel) throws Exception {
110122
super.testReloadWithTopicLevelGeoReplication(replicationLevel);
111123
}
124+
125+
@Test
126+
@Override
127+
public void testConfigReplicationStartAt() throws Exception {
128+
// Initialize.
129+
String ns1 = defaultTenant + "/ns_" + UUID.randomUUID().toString().replace("-", "");
130+
String subscription1 = "s1";
131+
admin1.namespaces().createNamespace(ns1);
132+
RetentionPolicies retentionPolicies = new RetentionPolicies(60 * 24, 1024);
133+
admin1.namespaces().setRetention(ns1, retentionPolicies);
134+
admin2.namespaces().setRetention(ns1, retentionPolicies);
135+
136+
// Update config: start at "earliest".
137+
admin1.brokers().updateDynamicConfiguration("replicationStartAt", MessageId.earliest.toString());
138+
Awaitility.await().untilAsserted(() -> {
139+
pulsar1.getConfiguration().getReplicationStartAt().equalsIgnoreCase("earliest");
140+
});
141+
142+
// Verify: since the replication was started at earliest, there is one message to consume.
143+
final String topic1 = BrokerTestUtil.newUniqueName("persistent://" + ns1 + "/tp_");
144+
admin1.topics().createNonPartitionedTopicAsync(topic1);
145+
admin1.topics().createSubscription(topic1, subscription1, MessageId.earliest);
146+
org.apache.pulsar.client.api.Producer<String> p1 = client1.newProducer(Schema.STRING).topic(topic1).create();
147+
p1.send("msg-1");
148+
p1.close();
149+
150+
admin1.namespaces().setNamespaceReplicationClusters(ns1, new HashSet<>(Arrays.asList(cluster1, cluster2)));
151+
org.apache.pulsar.client.api.Consumer<String> c1 = client2.newConsumer(Schema.STRING).topic(topic1)
152+
.subscriptionName(subscription1).subscribe();
153+
Message<String> msg2 = c1.receive(2, TimeUnit.SECONDS);
154+
assertNotNull(msg2);
155+
assertEquals(msg2.getValue(), "msg-1");
156+
c1.close();
157+
158+
// cleanup.
159+
admin1.brokers().updateDynamicConfiguration("replicationStartAt", MessageId.latest.toString());
160+
Awaitility.await().untilAsserted(() -> {
161+
pulsar1.getConfiguration().getReplicationStartAt().equalsIgnoreCase("latest");
162+
});
163+
}
112164
}

0 commit comments

Comments
 (0)