Skip to content

Commit a66ff17

Browse files
[improve][broker] Close protocol handlers before unloading namespace bundles (apache#22728)
1 parent ae9616b commit a66ff17

File tree

5 files changed

+258
-11
lines changed

5 files changed

+258
-11
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java

+7-5
Original file line numberDiff line numberDiff line change
@@ -444,13 +444,20 @@ public void close() throws PulsarServerException {
444444
public CompletableFuture<Void> closeAsync() {
445445
mutex.lock();
446446
try {
447+
// Close protocol handler before unloading namespace bundles because protocol handlers might maintain
448+
// Pulsar clients that could send lookup requests that affect unloading.
449+
if (protocolHandlers != null) {
450+
protocolHandlers.close();
451+
protocolHandlers = null;
452+
}
447453
if (closeFuture != null) {
448454
return closeFuture;
449455
}
450456
LOG.info("Closing PulsarService");
451457
if (brokerService != null) {
452458
brokerService.unloadNamespaceBundlesGracefully();
453459
}
460+
// This state change only tells Pulsar clients that the service is no longer ready to serve lookup requests
454461
state = State.Closing;
455462

456463
// close the services in the reverse order of that in which they were started
@@ -512,11 +519,6 @@ public CompletableFuture<Void> closeAsync() {
512519
(long) (GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT
513520
* getConfiguration()
514521
.getBrokerShutdownTimeoutMs())));
515-
// close protocol handler before closing broker service
516-
if (protocolHandlers != null) {
517-
protocolHandlers.close();
518-
protocolHandlers = null;
519-
}
520522

521523
// cancel loadShedding task and shutdown the loadManager executor before shutting down the broker
522524
cancelLoadBalancerTasks();

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
114114
public static final CompressionType MSG_COMPRESSION_TYPE = CompressionType.ZSTD;
115115
private static final int OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS = 5000;
116116
private static final int OWNERSHIP_CLEAN_UP_WAIT_RETRY_DELAY_IN_MILLIS = 100;
117-
private static final int OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS = 3000;
117+
public static final int OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS = 3000;
118118
public static final long VERSION_ID_INIT = 1; // initial versionId
119119
public static final long MAX_CLEAN_UP_DELAY_TIME_IN_SECS = 3 * 60; // 3 mins
120120
private static final long MIN_CLEAN_UP_DELAY_TIME_IN_SECS = 0; // 0 secs to clean immediately
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.protocol;
20+
21+
import io.netty.channel.ChannelInitializer;
22+
import io.netty.channel.socket.SocketChannel;
23+
import java.net.InetSocketAddress;
24+
import java.util.ArrayList;
25+
import java.util.List;
26+
import java.util.Map;
27+
import java.util.Set;
28+
import java.util.concurrent.ExecutorService;
29+
import java.util.concurrent.Executors;
30+
import java.util.concurrent.TimeUnit;
31+
import lombok.Cleanup;
32+
import org.apache.pulsar.broker.ServiceConfiguration;
33+
import org.apache.pulsar.broker.service.BrokerService;
34+
import org.apache.pulsar.client.admin.PulsarAdmin;
35+
import org.apache.pulsar.client.admin.PulsarAdminException;
36+
import org.apache.pulsar.client.api.MessageId;
37+
import org.apache.pulsar.client.api.PulsarClient;
38+
import org.apache.pulsar.client.api.PulsarClientException;
39+
import org.apache.pulsar.client.api.Reader;
40+
import org.apache.pulsar.common.naming.TopicName;
41+
import org.apache.pulsar.common.policies.data.ClusterData;
42+
import org.apache.pulsar.common.policies.data.TenantInfo;
43+
44+
public class PulsarClientBasedHandler implements ProtocolHandler {
45+
46+
static final String PROTOCOL = "test";
47+
48+
private String topic;
49+
private int partitions;
50+
private String cluster;
51+
private PulsarClient client;
52+
private List<Reader<byte[]>> readers;
53+
private ExecutorService executor;
54+
private volatile boolean running = false;
55+
volatile long closeTimeMs;
56+
57+
@Override
58+
public String protocolName() {
59+
return PROTOCOL;
60+
}
61+
62+
@Override
63+
public boolean accept(String protocol) {
64+
return protocol.equals(PROTOCOL);
65+
}
66+
67+
@Override
68+
public void initialize(ServiceConfiguration conf) throws Exception {
69+
final var properties = conf.getProperties();
70+
topic = (String) properties.getOrDefault("metadata.topic", "metadata-topic");
71+
partitions = (Integer) properties.getOrDefault("metadata.partitions", 1);
72+
cluster = conf.getClusterName();
73+
}
74+
75+
@Override
76+
public String getProtocolDataToAdvertise() {
77+
return "";
78+
}
79+
80+
@Override
81+
public void start(BrokerService service) {
82+
try {
83+
final var port = service.getPulsar().getListenPortHTTP().orElseThrow();
84+
@Cleanup final var admin = PulsarAdmin.builder().serviceHttpUrl("http://localhost:" + port).build();
85+
try {
86+
admin.clusters().createCluster(cluster, ClusterData.builder()
87+
.serviceUrl(service.getPulsar().getWebServiceAddress())
88+
.serviceUrlTls(service.getPulsar().getWebServiceAddressTls())
89+
.brokerServiceUrl(service.getPulsar().getBrokerServiceUrl())
90+
.brokerServiceUrlTls(service.getPulsar().getBrokerServiceUrlTls())
91+
.build());
92+
} catch (PulsarAdminException ignored) {
93+
}
94+
try {
95+
admin.tenants().createTenant("public", TenantInfo.builder().allowedClusters(Set.of(cluster)).build());
96+
} catch (PulsarAdminException ignored) {
97+
}
98+
try {
99+
admin.namespaces().createNamespace("public/default");
100+
} catch (PulsarAdminException ignored) {
101+
}
102+
} catch (PulsarClientException e) {
103+
throw new RuntimeException(e);
104+
}
105+
try {
106+
final var port = service.getListenPort().orElseThrow();
107+
client = PulsarClient.builder().serviceUrl("pulsar://localhost:" + port).build();
108+
readers = new ArrayList<>();
109+
for (int i = 0; i < partitions; i++) {
110+
readers.add(client.newReader().topic(topic + TopicName.PARTITIONED_TOPIC_SUFFIX + i)
111+
.startMessageId(MessageId.earliest).create());
112+
}
113+
running = true;
114+
executor = Executors.newSingleThreadExecutor();
115+
executor.execute(() -> {
116+
while (running) {
117+
readers.forEach(reader -> {
118+
try {
119+
reader.readNext(1, TimeUnit.MILLISECONDS);
120+
} catch (PulsarClientException ignored) {
121+
}
122+
});
123+
}
124+
});
125+
} catch (PulsarClientException e) {
126+
throw new RuntimeException(e);
127+
}
128+
}
129+
130+
@Override
131+
public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelInitializers() {
132+
return Map.of();
133+
}
134+
135+
@Override
136+
public void close() {
137+
final var start = System.currentTimeMillis();
138+
running = false;
139+
if (client != null) {
140+
try {
141+
client.close();
142+
} catch (PulsarClientException ignored) {
143+
}
144+
client = null;
145+
}
146+
if (executor != null) {
147+
executor.shutdown();
148+
executor = null;
149+
}
150+
closeTimeMs = System.currentTimeMillis() - start;
151+
}
152+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.protocol;
20+
21+
import java.io.File;
22+
import java.util.Optional;
23+
import lombok.extern.slf4j.Slf4j;
24+
import org.apache.bookkeeper.util.PortManager;
25+
import org.apache.commons.io.FileUtils;
26+
import org.apache.pulsar.broker.PulsarServerException;
27+
import org.apache.pulsar.broker.PulsarService;
28+
import org.apache.pulsar.broker.ServiceConfiguration;
29+
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
30+
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
31+
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
32+
import org.testng.Assert;
33+
import org.testng.annotations.AfterClass;
34+
import org.testng.annotations.BeforeClass;
35+
import org.testng.annotations.Test;
36+
37+
@Slf4j
38+
public class PulsarClientBasedHandlerTest {
39+
40+
private final static String clusterName = "cluster";
41+
private final static int shutdownTimeoutMs = 100;
42+
private final int zkPort = PortManager.nextFreePort();
43+
private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2, zkPort, PortManager::nextFreePort);
44+
private File tempDirectory;
45+
private PulsarService pulsar;
46+
47+
@BeforeClass
48+
public void setup() throws Exception {
49+
bk.start();
50+
final var config = new ServiceConfiguration();
51+
config.setClusterName(clusterName);
52+
config.setAdvertisedAddress("localhost");
53+
config.setBrokerServicePort(Optional.of(0));
54+
config.setWebServicePort(Optional.of(0));
55+
config.setMetadataStoreUrl("zk:127.0.0.1:" + zkPort);
56+
57+
tempDirectory = SimpleProtocolHandlerTestsBase.configureProtocolHandler(config,
58+
PulsarClientBasedHandler.class.getName(), true);
59+
60+
config.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
61+
config.setLoadBalancerDebugModeEnabled(true);
62+
config.setBrokerShutdownTimeoutMs(shutdownTimeoutMs);
63+
64+
pulsar = new PulsarService(config);
65+
pulsar.start();
66+
}
67+
68+
@Test(timeOut = 30000)
69+
public void testStopBroker() throws PulsarServerException {
70+
final var beforeStop = System.currentTimeMillis();
71+
final var handler = (PulsarClientBasedHandler) pulsar.getProtocolHandlers()
72+
.protocol(PulsarClientBasedHandler.PROTOCOL);
73+
pulsar.close();
74+
final var elapsedMs = System.currentTimeMillis() - beforeStop;
75+
log.info("It spends {} ms to stop the broker ({} for protocol handler)", elapsedMs, handler.closeTimeMs);
76+
Assert.assertTrue(elapsedMs < ServiceUnitStateChannelImpl.OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS
77+
+ handler.closeTimeMs + shutdownTimeoutMs + 1000); // tolerate 1 more second for other processes
78+
}
79+
80+
@AfterClass(alwaysRun = true)
81+
public void cleanup() throws Exception {
82+
bk.stop();
83+
if (tempDirectory != null) {
84+
FileUtils.deleteDirectory(tempDirectory);
85+
}
86+
}
87+
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java

+11-5
Original file line numberDiff line numberDiff line change
@@ -127,12 +127,18 @@ public SimpleProtocolHandlerTestsBase(boolean useSeparateThreadPool) {
127127
@BeforeClass
128128
@Override
129129
protected void setup() throws Exception {
130-
tempDirectory = Files.createTempDirectory("SimpleProtocolHandlerTest").toFile();
130+
tempDirectory = configureProtocolHandler(conf, MyProtocolHandler.class.getName(), useSeparateThreadPool);
131+
super.baseSetup();
132+
}
133+
134+
static File configureProtocolHandler(ServiceConfiguration conf, String className, boolean useSeparateThreadPool)
135+
throws Exception {
136+
final var tempDirectory = Files.createTempDirectory("SimpleProtocolHandlerTest").toFile();
131137
conf.setUseSeparateThreadPoolForProtocolHandlers(useSeparateThreadPool);
132138
conf.setProtocolHandlerDirectory(tempDirectory.getAbsolutePath());
133139
conf.setMessagingProtocols(Collections.singleton("test"));
134-
buildMockNarFile(tempDirectory);
135-
super.baseSetup();
140+
buildMockNarFile(tempDirectory, className);
141+
return tempDirectory;
136142
}
137143

138144
@Test
@@ -163,7 +169,7 @@ protected void cleanup() throws Exception {
163169
}
164170
}
165171

166-
private static void buildMockNarFile(File tempDirectory) throws Exception {
172+
private static void buildMockNarFile(File tempDirectory, String className) throws Exception {
167173
File file = new File(tempDirectory, "temp.nar");
168174
try (ZipOutputStream zipfile = new ZipOutputStream(new FileOutputStream(file))) {
169175

@@ -176,7 +182,7 @@ private static void buildMockNarFile(File tempDirectory) throws Exception {
176182
zipfile.putNextEntry(manifest);
177183
String yaml = "name: test\n" +
178184
"description: this is a test\n" +
179-
"handlerClass: " + MyProtocolHandler.class.getName() + "\n";
185+
"handlerClass: " + className + "\n";
180186
zipfile.write(yaml.getBytes(StandardCharsets.UTF_8));
181187
zipfile.closeEntry();
182188
}

0 commit comments

Comments
 (0)