From 0e59678194c36ef1a4f443ac18dba08ae8a1c372 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Mon, 29 Jul 2024 23:06:45 +0800 Subject: [PATCH] [fix] [broker] Internal reader of __change_events can not started after metadata store session rebuilt (#23018) (cherry picked from commit b955c6520d8db948048a1b2dc548a001ee396076) --- .../extensions/ExtensibleLoadManagerImpl.java | 3 +- .../impl/ModularLoadManagerImpl.java | 3 +- .../pulsar/broker/service/Ipv4Proxy.java | 197 ++++++++++++ .../broker/service/NetworkErrorTestBase.java | 297 ++++++++++++++++++ .../broker/service/ZkSessionExpireTest.java | 184 +++++++++++ .../metadata/impl/AbstractMetadataStore.java | 10 + 6 files changed, 692 insertions(+), 2 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/service/Ipv4Proxy.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NetworkErrorTestBase.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ZkSessionExpireTest.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index dc57a923c7adc..e0ed8c9198d57 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -125,7 +125,8 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager { private static final Set INTERNAL_TOPICS = Set.of(BROKER_LOAD_DATA_STORE_TOPIC, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TOPIC); - private PulsarService pulsar; + @VisibleForTesting + protected PulsarService pulsar; private ServiceConfiguration conf; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index 990c918699386..b5c4de9e5a22d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -169,8 +169,9 @@ public class ModularLoadManagerImpl implements ModularLoadManager { // Policies used to determine which brokers are available for particular namespaces. private SimpleResourceAllocationPolicies policies; + @VisibleForTesting // Pulsar service used to initialize this. - private PulsarService pulsar; + protected PulsarService pulsar; // Executor service used to update broker data. private final ExecutorService executors; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/Ipv4Proxy.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/Ipv4Proxy.java new file mode 100644 index 0000000000000..a84dab4d17dff --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/Ipv4Proxy.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import lombok.Getter; + +public class Ipv4Proxy { + @Getter + private final int localPort; + private final String backendServerHost; + private final int backendServerPort; + private final EventLoopGroup serverGroup = new NioEventLoopGroup(1); + private final EventLoopGroup workerGroup = new NioEventLoopGroup(); + private ChannelFuture localServerChannel; + private ServerBootstrap serverBootstrap = new ServerBootstrap(); + private List frontChannels = Collections.synchronizedList(new ArrayList<>()); + private AtomicBoolean rejectAllConnections = new AtomicBoolean(); + + public Ipv4Proxy(int localPort, String backendServerHost, int backendServerPort) { + this.localPort = localPort; + this.backendServerHost = backendServerHost; + this.backendServerPort = backendServerPort; + } + + public synchronized void startup() throws InterruptedException { + localServerChannel = serverBootstrap.group(serverGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .handler(new LoggingHandler(LogLevel.INFO)) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) { + ch.pipeline().addLast(new FrontendHandler()); + } + }).childOption(ChannelOption.AUTO_READ, false) + .bind(localPort).sync(); + } + + public synchronized void stop() throws InterruptedException{ + localServerChannel.channel().close().sync(); + serverGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + } + + private static void closeOnFlush(Channel ch) { + if (ch.isActive()) { + ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); + } + } + + public void disconnectFrontChannels() throws InterruptedException { + for (Channel channel : frontChannels) { + channel.close(); + } + } + + public void rejectAllConnections() throws InterruptedException { + rejectAllConnections.set(true); + } + + public void unRejectAllConnections() throws InterruptedException { + rejectAllConnections.set(false); + } + + private class FrontendHandler extends ChannelInboundHandlerAdapter { + + private Channel backendChannel; + + @Override + public void channelActive(ChannelHandlerContext ctx) { + if (rejectAllConnections.get()) { + ctx.close(); + return; + } + final Channel frontendChannel = ctx.channel(); + frontChannels.add(frontendChannel); + Bootstrap backendBootstrap = new Bootstrap(); + backendBootstrap.group(frontendChannel.eventLoop()) + .channel(ctx.channel().getClass()) + .handler(new BackendHandler(frontendChannel)) + .option(ChannelOption.AUTO_READ, false); + ChannelFuture backendChannelFuture = + backendBootstrap.connect(Ipv4Proxy.this.backendServerHost, Ipv4Proxy.this.backendServerPort); + backendChannel = backendChannelFuture.channel(); + backendChannelFuture.addListener((ChannelFutureListener) future -> { + if (future.isSuccess()) { + frontendChannel.read(); + } else { + frontChannels.remove(frontendChannel); + frontendChannel.close(); + } + }); + } + + @Override + public void channelRead(final ChannelHandlerContext ctx, Object msg) { + if (backendChannel.isActive()) { + backendChannel.writeAndFlush(msg).addListener((ChannelFutureListener) future -> { + if (future.isSuccess()) { + ctx.channel().read(); + } else { + future.channel().close(); + } + }); + } + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) { + frontChannels.remove(ctx.channel()); + if (backendChannel != null) { + closeOnFlush(backendChannel); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + cause.printStackTrace(); + closeOnFlush(ctx.channel()); + } + } + + private class BackendHandler extends ChannelInboundHandlerAdapter { + + private final Channel frontendChannel; + + public BackendHandler(Channel inboundChannel) { + this.frontendChannel = inboundChannel; + } + + @Override + public void channelActive(ChannelHandlerContext ctx) { + if (!frontendChannel.isActive()) { + closeOnFlush(ctx.channel()); + } else { + ctx.read(); + } + } + + @Override + public void channelRead(final ChannelHandlerContext ctx, Object msg) { + frontendChannel.writeAndFlush(msg).addListener((ChannelFutureListener) future -> { + if (future.isSuccess()) { + ctx.channel().read(); + } else { + future.channel().close(); + } + }); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) { + closeOnFlush(frontendChannel); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + cause.printStackTrace(); + closeOnFlush(ctx.channel()); + } + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NetworkErrorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NetworkErrorTestBase.java new file mode 100644 index 0000000000000..36f8cb4761248 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NetworkErrorTestBase.java @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import com.google.common.collect.Sets; +import java.io.IOException; +import java.net.ServerSocket; +import java.net.URL; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReference; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl; +import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl; +import org.apache.pulsar.broker.namespace.LookupOptions; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.common.naming.ServiceUnitId; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.apache.pulsar.common.policies.data.TopicType; +import org.apache.pulsar.tests.TestRetrySupport; +import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; +import org.apache.pulsar.zookeeper.ZookeeperServerTest; +import org.awaitility.reflect.WhiteboxImpl; + +@Slf4j +public abstract class NetworkErrorTestBase extends TestRetrySupport { + + protected final String defaultTenant = "public"; + protected final String defaultNamespace = defaultTenant + "/default"; + protected final String cluster1 = "r1"; + protected URL url1; + protected URL urlTls1; + protected URL url2; + protected URL urlTls2; + protected ServiceConfiguration config1 = new ServiceConfiguration(); + protected ServiceConfiguration config2 = new ServiceConfiguration(); + protected ZookeeperServerTest brokerConfigZk1; + protected Ipv4Proxy metadataZKProxy; + protected LocalBookkeeperEnsemble bkEnsemble1; + protected PulsarService pulsar1; + protected PulsarService pulsar2; + protected BrokerService broker1; + protected BrokerService broker2; + protected PulsarAdmin admin1; + protected PulsarAdmin admin2; + protected PulsarClient client1; + protected PulsarClient client2; + + private final static AtomicReference preferBroker = new AtomicReference<>(); + + protected void startZKAndBK() throws Exception { + // Start ZK & BK. + bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble1.start(); + + metadataZKProxy = new Ipv4Proxy(getOneFreePort(), "127.0.0.1", bkEnsemble1.getZookeeperPort()); + metadataZKProxy.startup(); + } + + protected void startBrokers() throws Exception { + // Start brokers. + setConfigDefaults(config1, cluster1, metadataZKProxy.getLocalPort()); + pulsar1 = new PulsarService(config1); + pulsar1.start(); + broker1 = pulsar1.getBrokerService(); + url1 = new URL(pulsar1.getWebServiceAddress()); + urlTls1 = new URL(pulsar1.getWebServiceAddressTls()); + + setConfigDefaults(config2, cluster1, bkEnsemble1.getZookeeperPort()); + pulsar2 = new PulsarService(config2); + pulsar2.start(); + broker2 = pulsar2.getBrokerService(); + url2 = new URL(pulsar2.getWebServiceAddress()); + urlTls2 = new URL(pulsar2.getWebServiceAddressTls()); + + log.info("broker-1: {}, broker-2: {}", broker1.getListenPort(), broker2.getListenPort()); + } + + protected int getOneFreePort() throws IOException { + ServerSocket serverSocket = new ServerSocket(0); + int port = serverSocket.getLocalPort(); + serverSocket.close(); + return port; + } + + protected void startAdminClient() throws Exception { + admin1 = PulsarAdmin.builder().serviceHttpUrl(url1.toString()).build(); + admin2 = PulsarAdmin.builder().serviceHttpUrl(url2.toString()).build(); + } + + protected void startPulsarClient() throws Exception{ + ClientBuilder clientBuilder1 = PulsarClient.builder().serviceUrl(url1.toString()); + client1 = initClient(clientBuilder1); + ClientBuilder clientBuilder2 = PulsarClient.builder().serviceUrl(url2.toString()); + client2 = initClient(clientBuilder2); + } + + protected void createDefaultTenantsAndClustersAndNamespace() throws Exception { + admin1.clusters().createCluster(cluster1, ClusterData.builder() + .serviceUrl(url1.toString()) + .serviceUrlTls(urlTls1.toString()) + .brokerServiceUrl(pulsar1.getBrokerServiceUrl()) + .brokerServiceUrlTls(pulsar1.getBrokerServiceUrlTls()) + .brokerClientTlsEnabled(false) + .build()); + admin1.tenants().createTenant(defaultTenant, new TenantInfoImpl(Collections.emptySet(), + Sets.newHashSet(cluster1))); + admin1.namespaces().createNamespace(defaultNamespace, Sets.newHashSet(cluster1)); + } + + @Override + protected void setup() throws Exception { + incrementSetupNumber(); + + log.info("--- Starting OneWayReplicatorTestBase::setup ---"); + + startZKAndBK(); + + startBrokers(); + + startAdminClient(); + + createDefaultTenantsAndClustersAndNamespace(); + + startPulsarClient(); + + Thread.sleep(100); + log.info("--- OneWayReplicatorTestBase::setup completed ---"); + } + + protected void setConfigDefaults(ServiceConfiguration config, String clusterName, int zkPort) { + config.setClusterName(clusterName); + config.setAdvertisedAddress("localhost"); + config.setWebServicePort(Optional.of(0)); + config.setWebServicePortTls(Optional.of(0)); + config.setMetadataStoreUrl("zk:127.0.0.1:" + zkPort); + config.setConfigurationMetadataStoreUrl("zk:127.0.0.1:" + zkPort + "/config_meta"); + config.setBrokerDeleteInactiveTopicsEnabled(false); + config.setBrokerDeleteInactiveTopicsFrequencySeconds(60); + config.setBrokerShutdownTimeoutMs(0L); + config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); + config.setBrokerServicePort(Optional.of(0)); + config.setBrokerServicePortTls(Optional.of(0)); + config.setBacklogQuotaCheckIntervalInSeconds(5); + config.setDefaultNumberOfNamespaceBundles(1); + config.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED); + config.setEnableReplicatedSubscriptions(true); + config.setReplicatedSubscriptionsSnapshotFrequencyMillis(1000); + config.setLoadBalancerSheddingEnabled(false); + config.setForceDeleteNamespaceAllowed(true); + config.setLoadManagerClassName(PreferBrokerModularLoadManager.class.getName()); + config.setMetadataStoreSessionTimeoutMillis(5000); + } + + @Override + protected void cleanup() throws Exception { + // shutdown. + markCurrentSetupNumberCleaned(); + log.info("--- Shutting down ---"); + + // Stop brokers. + if (client1 != null) { + client1.close(); + client1 = null; + } + if (admin1 != null) { + admin1.close(); + admin1 = null; + } + if (client2 != null) { + client2.close(); + client2 = null; + } + if (admin2 != null) { + admin2.close(); + admin2 = null; + } + if (pulsar1 != null) { + pulsar1.close(); + pulsar1 = null; + } + if (pulsar2 != null) { + pulsar2.close(); + pulsar2 = null; + } + + // Stop ZK and BK. + if (bkEnsemble1 != null) { + bkEnsemble1.stop(); + bkEnsemble1 = null; + } + if (metadataZKProxy != null) { + metadataZKProxy.stop(); + } + if (brokerConfigZk1 != null) { + brokerConfigZk1.stop(); + brokerConfigZk1 = null; + } + + // Reset configs. + config1 = new ServiceConfiguration(); + preferBroker.set(null); + } + + protected PulsarClient initClient(ClientBuilder clientBuilder) throws Exception { + return clientBuilder.build(); + } + + protected static class PreferBrokerModularLoadManager extends ModularLoadManagerImpl { + + @Override + public String setNamespaceBundleAffinity(String bundle, String broker) { + if (StringUtils.isNotBlank(broker)) { + return broker; + } + Set availableBrokers = NetworkErrorTestBase.getAvailableBrokers(super.pulsar); + String prefer = preferBroker.get(); + if (availableBrokers.contains(prefer)) { + return prefer; + } else { + return null; + } + } + } + + protected static class PreferExtensibleLoadManager extends ExtensibleLoadManagerImpl { + + @Override + public CompletableFuture> selectAsync(ServiceUnitId bundle, + Set excludeBrokerSet, + LookupOptions options) { + Set availableBrokers = NetworkErrorTestBase.getAvailableBrokers(super.pulsar); + String prefer = preferBroker.get(); + if (availableBrokers.contains(prefer)) { + return CompletableFuture.completedFuture(Optional.of(prefer)); + } else { + return super.selectAsync(bundle, excludeBrokerSet, options); + } + } + } + + public void setPreferBroker(PulsarService target) { + for (PulsarService pulsar : Arrays.asList(pulsar1, pulsar2)) { + for (String broker : getAvailableBrokers(pulsar)) { + if (broker.endsWith(target.getBrokerListenPort().orElse(-1) + "") + || broker.endsWith(target.getListenPortHTTPS().orElse(-1) + "") + || broker.endsWith(target.getListenPortHTTP().orElse(-1) + "") + || broker.endsWith(target.getBrokerListenPortTls().orElse(-1) + "")) { + preferBroker.set(broker); + } + } + } + } + + public static Set getAvailableBrokers(PulsarService pulsar) { + Object loadManagerWrapper = pulsar.getLoadManager().get(); + Object loadManager = WhiteboxImpl.getInternalState(loadManagerWrapper, "loadManager"); + if (loadManager instanceof ModularLoadManagerImpl) { + return ((ModularLoadManagerImpl) loadManager).getAvailableBrokers(); + } else if (loadManager instanceof ExtensibleLoadManagerImpl) { + return new HashSet<>(((ExtensibleLoadManagerImpl) loadManager).getBrokerRegistry() + .getAvailableBrokersAsync().join()); + } else { + throw new RuntimeException("Not support for the load manager: " + loadManager.getClass().getName()); + } + } + + public void clearPreferBroker() { + preferBroker.set(null); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ZkSessionExpireTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ZkSessionExpireTest.java new file mode 100644 index 0000000000000..143557b008b23 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ZkSessionExpireTest.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; +import java.time.Duration; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.ProducerImpl; +import org.awaitility.Awaitility; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker") +public class ZkSessionExpireTest extends NetworkErrorTestBase { + + private java.util.function.Consumer settings; + + @AfterMethod(alwaysRun = true) + @Override + public void cleanup() throws Exception { + super.cleanup(); + } + + private void setupWithSettings(java.util.function.Consumer settings) throws Exception { + this.settings = settings; + super.setup(); + } + + protected void setConfigDefaults(ServiceConfiguration config, String clusterName, int zkPort) { + super.setConfigDefaults(config, clusterName, zkPort); + settings.accept(config); + } + + @DataProvider(name = "settings") + public Object[][] settings() { + return new Object[][]{ + {false, NetworkErrorTestBase.PreferBrokerModularLoadManager.class}, + {true, NetworkErrorTestBase.PreferBrokerModularLoadManager.class} + // Create a separate PR to add this test case. + // {true, NetworkErrorTestBase.PreferExtensibleLoadManager.class}. + }; + } + + @Test(timeOut = 600 * 1000, dataProvider = "settings") + public void testTopicUnloadAfterSessionRebuild(boolean enableSystemTopic, Class loadManager) throws Exception { + // Setup. + setupWithSettings(config -> { + config.setManagedLedgerMaxEntriesPerLedger(1); + config.setSystemTopicEnabled(enableSystemTopic); + config.setTopicLevelPoliciesEnabled(enableSystemTopic); + if (loadManager != null) { + config.setLoadManagerClassName(loadManager.getName()); + } + }); + + // Init topic. + final String topicName = BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp"); + admin1.topics().createNonPartitionedTopic(topicName); + admin1.topics().createSubscription(topicName, "s1", MessageId.earliest); + + // Inject a prefer mechanism, so that all topics will be assigned to broker1, which can be injected a ZK + // session expire error. + setPreferBroker(pulsar1); + admin1.namespaces().unload(defaultNamespace); + admin2.namespaces().unload(defaultNamespace); + + // Confirm all brokers registered. + Awaitility.await().untilAsserted(() -> { + assertEquals(getAvailableBrokers(pulsar1).size(), 2); + assertEquals(getAvailableBrokers(pulsar2).size(), 2); + }); + + // Load up a topic, and it will be assigned to broker1. + ProducerImpl p1 = (ProducerImpl) client1.newProducer(Schema.STRING).topic(topicName) + .sendTimeout(10, TimeUnit.SECONDS).create(); + Topic broker1Topic1 = pulsar1.getBrokerService().getTopic(topicName, false).join().get(); + assertNotNull(broker1Topic1); + clearPreferBroker(); + + // Inject a ZK session expire error, and wait for broker1 to offline. + metadataZKProxy.rejectAllConnections(); + metadataZKProxy.disconnectFrontChannels(); + Awaitility.await().untilAsserted(() -> { + assertEquals(getAvailableBrokers(pulsar2).size(), 1); + }); + + // Send messages continuously. + // Verify: the topic was transferred to broker2. + CompletableFuture broker1Send1 = p1.sendAsync("broker1_msg1"); + Producer p2 = client2.newProducer(Schema.STRING).topic(topicName) + .sendTimeout(10, TimeUnit.SECONDS).create(); + CompletableFuture broker2Send1 = p2.sendAsync("broker2_msg1"); + Awaitility.await().untilAsserted(() -> { + CompletableFuture> future = pulsar2.getBrokerService().getTopic(topicName, false); + assertNotNull(future); + assertTrue(future.isDone() && !future.isCompletedExceptionally()); + Optional optional = future.join(); + assertTrue(optional != null && !optional.isEmpty()); + }); + + // Both two brokers assumed they are the owner of the topic. + Topic broker1Topic2 = pulsar1.getBrokerService().getTopic(topicName, false).join().get(); + Topic broker2Topic2 = pulsar2.getBrokerService().getTopic(topicName, false).join().get(); + assertNotNull(broker1Topic2); + assertNotNull(broker2Topic2); + + // Send messages continuously. + // Publishing to broker-1 will fail. + // Publishing to broker-2 will success. + CompletableFuture broker1Send2 = p1.sendAsync("broker1_msg2"); + CompletableFuture broker2Send2 = p2.sendAsync("broker2_msg2"); + try { + broker1Send1.join(); + broker1Send2.join(); + p1.getClientCnx(); + fail("expected a publish error"); + } catch (Exception ex) { + // Expected. + } + broker2Send1.join(); + broker2Send2.join(); + + // Broker rebuild ZK session. + metadataZKProxy.unRejectAllConnections(); + Awaitility.await().untilAsserted(() -> { + assertEquals(getAvailableBrokers(pulsar1).size(), 2); + assertEquals(getAvailableBrokers(pulsar2).size(), 2); + }); + + // Verify: the topic on broker-1 will be unloaded. + // Verify: the topic on broker-2 is fine. + Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> { + CompletableFuture> future = pulsar1.getBrokerService().getTopic(topicName, false); + assertTrue(future == null || future.isCompletedExceptionally()); + }); + Topic broker2Topic3 = pulsar2.getBrokerService().getTopic(topicName, false).join().get(); + assertNotNull(broker2Topic3); + + // Send messages continuously. + // Verify: p1.send will success(it will connect to broker-2). + // Verify: p2.send will success. + CompletableFuture broker1Send3 = p1.sendAsync("broker1_msg3"); + CompletableFuture broker2Send3 = p2.sendAsync("broker2_msg3"); + broker1Send3.join(); + broker2Send3.join(); + + long msgBacklog = admin2.topics().getStats(topicName).getSubscriptions().get("s1").getMsgBacklog(); + log.info("msgBacklog: {}", msgBacklog); + + // cleanup. + p1.close(); + p2.close(); + admin2.topics().delete(topicName, false); + } +} diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java index d099d79d05c4d..8d7743dd791f1 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java @@ -487,6 +487,16 @@ public void registerSessionListener(Consumer listener) { protected void receivedSessionEvent(SessionEvent event) { isConnected = event.isConnected(); + + // Clear cache after session expired. + if (event == SessionEvent.SessionReestablished || event == SessionEvent.Reconnected) { + for (MetadataCacheImpl metadataCache : metadataCaches) { + metadataCache.invalidateAll(); + } + invalidateAll(); + } + + // Notice listeners. try { executor.execute(() -> { sessionListeners.forEach(l -> {