Skip to content

Commit

Permalink
[fix] [proxy] Fix pattern consumer does not work when using Proxy (#2…
Browse files Browse the repository at this point in the history
…3489)

(cherry picked from commit a0beab0)
  • Loading branch information
poorbarcode authored and lhotari committed Oct 21, 2024
1 parent 986a4db commit 2bdf677
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,8 @@ private void performGetTopicsOfNamespace(long clientRequestId,
Commands.newError(clientRequestId, getServerError(t), t.getMessage()));
} else {
writeAndFlush(
Commands.newGetTopicsOfNamespaceResponse(r.getTopics(), r.getTopicsHash(), r.isFiltered(),
Commands.newGetTopicsOfNamespaceResponse(r.getNonPartitionedOrPartitionTopics(),
r.getTopicsHash(), r.isFiltered(),
r.isChanged(), clientRequestId));
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@
import static java.util.Objects.requireNonNull;
import static org.mockito.Mockito.doReturn;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand All @@ -36,6 +39,7 @@
import lombok.ToString;
import org.apache.avro.reflect.Nullable;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.client.api.Authentication;
Expand All @@ -56,6 +60,9 @@
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.schema.SchemaInfo;
Expand All @@ -67,6 +74,7 @@
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

public class ProxyTest extends MockedPulsarServiceBaseTest {
Expand Down Expand Up @@ -102,6 +110,13 @@ protected void setup() throws Exception {
.createConfigurationMetadataStore();

proxyService.start();

// create default resources.
admin.clusters().createCluster(conf.getClusterName(),
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
TenantInfo tenantInfo = new TenantInfoImpl(Collections.emptySet(), Collections.singleton(conf.getClusterName()));
admin.tenants().createTenant("public", tenantInfo);
admin.namespaces().createNamespace("public/default");
}

protected void initializeProxyConfig() throws Exception {
Expand Down Expand Up @@ -275,30 +290,65 @@ public void testRegexSubscription() throws Exception {
}
}

@Test(timeOut = 60_000)
public void testRegexSubscriptionWithTopicDiscovery() throws Exception {
@Cleanup
PulsarClient client = PulsarClient.builder().serviceUrl(proxyService.getServiceUrl()).build();
String subName = "regex-proxy-test-" + System.currentTimeMillis();
String regexSubscriptionPattern = "persistent://sample/test/local/.*";
try (Consumer<byte[]> consumer = client.newConsumer()
@DataProvider(name = "topicTypes")
public Object[][] topicTypes() {
return new Object[][]{
{TopicType.PARTITIONED},
{TopicType.NON_PARTITIONED}
};
}

@Test(timeOut = 60_000, dataProvider = "topicTypes", invocationCount = 100)
public void testRegexSubscriptionWithTopicDiscovery(TopicType topicType) throws Exception {
final PulsarClient client = PulsarClient.builder().serviceUrl(proxyService.getServiceUrl()).build();
final int topics = 10;
final String subName = "s1";
final String namespace = "public/default";
final String topicPrefix = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/tp");
final String regexSubscriptionPattern = topicPrefix + ".*";
// Set retention policy to avoid flaky.
RetentionPolicies retentionPolicies = new RetentionPolicies(3600, 1024);
admin.namespaces().setRetention(namespace, retentionPolicies);
final List<String> topicList = new ArrayList<>();
// create topics
for (int i = 0; i < topics; i++) {
String topic = topicPrefix + i;
topicList.add(topic);
if (TopicType.PARTITIONED.equals(topicType)) {
admin.topics().createPartitionedTopic(topic, 2);
} else {
admin.topics().createNonPartitionedTopic(topic);
}
}
// Create consumer.
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topicsPattern(regexSubscriptionPattern)
.subscriptionName(subName)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.patternAutoDiscoveryPeriod(10, TimeUnit.MINUTES)
.subscribe()) {
final int topics = 10;
final String topicPrefix = "persistent://sample/test/local/regex-topic-";
for (int i = 0; i < topics; i++) {
Producer<byte[]> producer = client.newProducer(Schema.BYTES)
.topic(topicPrefix + i)
.create();
producer.send(("" + i).getBytes(UTF_8));
producer.close();
}
for (int i = 0; i < topics; i++) {
Message<byte[]> msg = consumer.receive();
assertEquals(topicPrefix + new String(msg.getValue(), UTF_8), msg.getTopicName());
.isAckReceiptEnabled(true)
.subscribe();
// Pub & Sub -> Verify
for (String topic : topicList) {
String msgPublished = topic + " -> msg";
Producer<String> producer = client.newProducer(Schema.STRING)
.topic(topic)
.create();
producer.send(msgPublished);
producer.close();
// Verify.
Message<String> msg = consumer.receive(10, TimeUnit.SECONDS);
assertNotNull(msg);
assertEquals(msg.getValue(), msgPublished);
consumer.acknowledge(msg);
}
// cleanup.
consumer.close();
for (String topic : topicList) {
if (TopicType.PARTITIONED.equals(topicType)) {
admin.topics().deletePartitionedTopic(topic);
} else {
admin.topics().delete(topic);
}
}
}
Expand Down

0 comments on commit 2bdf677

Please sign in to comment.