Skip to content

Commit

Permalink
[fix][broker] Fix setReplicatedSubscriptionStatus incorrect behavior (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
liudezhi2098 authored Nov 12, 2023
1 parent ea1fc0f commit b949187
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,12 @@ public boolean setReplicated(boolean replicated) {

if (this.cursor != null) {
if (replicated) {
return this.cursor.putProperty(REPLICATED_SUBSCRIPTION_PROPERTY, 1L);
if (!config.isEnableReplicatedSubscriptions()) {
log.warn("[{}][{}] Failed set replicated subscription status to {}, please enable the "
+ "configuration enableReplicatedSubscriptions", topicName, subName, replicated);
} else {
return this.cursor.putProperty(REPLICATED_SUBSCRIPTION_PROPERTY, 1L);
}
} else {
return this.cursor.removeProperty(REPLICATED_SUBSCRIPTION_PROPERTY);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
Expand All @@ -50,11 +51,13 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -621,6 +624,27 @@ public void testReplicatedSubscriptionRestApi2() throws Exception {
String.format("numReceivedMessages2 (%d) should be less than %d", numReceivedMessages2, numMessages));
}

@Test(timeOut = 30000)
public void testReplicatedSubscriptionRestApi3() throws Exception {
final String namespace = BrokerTestUtil.newUniqueName("geo/replicatedsubscription");
final String topicName = "persistent://" + namespace + "/topic-rest-api3";
final String subName = "sub";
admin4.tenants().createTenant("geo",
new TenantInfoImpl(Sets.newHashSet("appid1", "appid4"), Sets.newHashSet(cluster1, cluster4)));
admin4.namespaces().createNamespace(namespace);
admin4.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet(cluster1, cluster4));
admin4.topics().createPartitionedTopic(topicName, 2);

@Cleanup
final PulsarClient client4 = PulsarClient.builder().serviceUrl(url4.toString())
.statsInterval(0, TimeUnit.SECONDS).build();

Consumer<byte[]> consumer4 = client4.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
Assert.expectThrows(PulsarAdminException.class, () ->
admin4.topics().setReplicatedSubscriptionStatus(topicName, subName, true));
consumer4.close();
}

/**
* Tests replicated subscriptions when replicator producer is closed
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ public abstract class ReplicatorTestBase extends TestRetrySupport {
PulsarAdmin admin3;
LocalBookkeeperEnsemble bkEnsemble3;

URL url4;
URL urlTls4;
ServiceConfiguration config4 = new ServiceConfiguration();
PulsarService pulsar4;
PulsarAdmin admin4;
LocalBookkeeperEnsemble bkEnsemble4;

ZookeeperServerTest globalZkS;

ExecutorService executor;
Expand Down Expand Up @@ -111,6 +118,7 @@ public abstract class ReplicatorTestBase extends TestRetrySupport {
protected final String cluster1 = "r1";
protected final String cluster2 = "r2";
protected final String cluster3 = "r3";
protected final String cluster4 = "r4";

// Default frequency
public int getBrokerServicePurgeInactiveFrequency() {
Expand Down Expand Up @@ -178,6 +186,21 @@ protected void setup() throws Exception {
urlTls3 = new URL(pulsar3.getWebServiceAddressTls());
admin3 = PulsarAdmin.builder().serviceHttpUrl(url3.toString()).build();

// Start region 4

// Start zk & bks
bkEnsemble4 = new LocalBookkeeperEnsemble(3, 0, () -> 0);
bkEnsemble4.start();

setConfig4DefaultValue();
pulsar4 = new PulsarService(config4);
pulsar4.start();

url4 = new URL(pulsar4.getWebServiceAddress());
urlTls4 = new URL(pulsar4.getWebServiceAddressTls());
admin4 = PulsarAdmin.builder().serviceHttpUrl(url4.toString()).build();


// Provision the global namespace
admin1.clusters().createCluster(cluster1, ClusterData.builder()
.serviceUrl(url1.toString())
Expand Down Expand Up @@ -230,6 +253,23 @@ protected void setup() throws Exception {
.brokerClientTlsTrustStorePassword(keyStorePassword)
.brokerClientTlsTrustStoreType(keyStoreType)
.build());
admin4.clusters().createCluster(cluster4, ClusterData.builder()
.serviceUrl(url4.toString())
.serviceUrlTls(urlTls4.toString())
.brokerServiceUrl(pulsar4.getBrokerServiceUrl())
.brokerServiceUrlTls(pulsar4.getBrokerServiceUrlTls())
.brokerClientTlsEnabled(true)
.brokerClientCertificateFilePath(clientCertFilePath)
.brokerClientKeyFilePath(clientKeyFilePath)
.brokerClientTrustCertsFilePath(caCertFilePath)
.brokerClientTlsEnabledWithKeyStore(tlsWithKeyStore)
.brokerClientTlsKeyStore(clientKeyStorePath)
.brokerClientTlsKeyStorePassword(keyStorePassword)
.brokerClientTlsKeyStoreType(keyStoreType)
.brokerClientTlsTrustStore(clientTrustStorePath)
.brokerClientTlsTrustStorePassword(keyStorePassword)
.brokerClientTlsTrustStoreType(keyStoreType)
.build());

admin1.tenants().createTenant("pulsar",
new TenantInfoImpl(Sets.newHashSet("appid1", "appid2", "appid3"), Sets.newHashSet("r1", "r2", "r3")));
Expand Down Expand Up @@ -257,7 +297,7 @@ protected void setup() throws Exception {
}

public void setConfig3DefaultValue() {
setConfigDefaults(config3, "r3", bkEnsemble3);
setConfigDefaults(config3, cluster3, bkEnsemble3);
config3.setTlsEnabled(true);
}

Expand All @@ -269,6 +309,11 @@ public void setConfig2DefaultValue() {
setConfigDefaults(config2, cluster2, bkEnsemble2);
}

public void setConfig4DefaultValue() {
setConfigDefaults(config4, cluster4, bkEnsemble4);
config4.setEnableReplicatedSubscriptions(false);
}

private void setConfigDefaults(ServiceConfiguration config, String clusterName,
LocalBookkeeperEnsemble bookkeeperEnsemble) {
config.setClusterName(clusterName);
Expand Down Expand Up @@ -316,6 +361,11 @@ public void resetConfig3() {
setConfig3DefaultValue();
}

public void resetConfig4() {
config4 = new ServiceConfiguration();
setConfig4DefaultValue();
}

private int inSec(int time, TimeUnit unit) {
return (int) TimeUnit.SECONDS.convert(time, unit);
}
Expand All @@ -332,7 +382,11 @@ protected void cleanup() throws Exception {
admin1.close();
admin2.close();
admin3.close();
admin4.close();

if (pulsar4 != null) {
pulsar4.close();
}
if (pulsar3 != null) {
pulsar3.close();
}
Expand All @@ -346,11 +400,13 @@ protected void cleanup() throws Exception {
bkEnsemble1.stop();
bkEnsemble2.stop();
bkEnsemble3.stop();
bkEnsemble4.stop();
globalZkS.stop();

resetConfig1();
resetConfig2();
resetConfig3();
resetConfig4();
}

static class MessageProducer implements AutoCloseable {
Expand Down

0 comments on commit b949187

Please sign in to comment.