From 446c066d5f4ab654ad4871faf5459a84a643faa2 Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Mon, 23 Sep 2024 09:11:15 +0800 Subject: [PATCH] Add unit test --- .../ExtensibleLoadManagerImplTest.java | 42 +++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index 4f6a006918318..bb161a0844f0e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -78,6 +78,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import lombok.Cleanup; +import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.mutable.MutableInt; import org.apache.commons.lang3.reflect.FieldUtils; @@ -101,6 +102,7 @@ import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter; import org.apache.pulsar.broker.loadbalance.extensions.reporter.BrokerLoadDataReporter; import org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder; +import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreFactory; import org.apache.pulsar.broker.loadbalance.extensions.store.TableViewLoadDataStoreImpl; import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl; import org.apache.pulsar.broker.lookup.LookupResult; @@ -110,6 +112,7 @@ import org.apache.pulsar.broker.namespace.NamespaceEphemeralData; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.BrokerServiceException; +import org.apache.pulsar.broker.service.nonpersistent.NonPersistentSystemTopic; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; @@ -130,12 +133,14 @@ import org.apache.pulsar.common.policies.data.BrokerAssignment; import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus; +import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.stats.Metrics; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats; import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage; import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage; import org.awaitility.Awaitility; +import org.testng.Assert; import org.testng.AssertJUnit; import org.testng.annotations.DataProvider; import org.testng.annotations.Factory; @@ -2106,6 +2111,43 @@ public void compactionScheduleTest() { }); } + @Test + public void testSystemNonPersistentTopicSchemaCompatibility() throws Exception { + String topicName = ExtensibleLoadManagerImpl.BROKER_LOAD_DATA_STORE_TOPIC; + NonPersistentSystemTopic topic = new NonPersistentSystemTopic(topicName, pulsar.getBrokerService()); + Assert.assertEquals(SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE, topic.getSchemaCompatibilityStrategy()); + + var brokerLoadDataStore = LoadDataStoreFactory.create(pulsar, topicName, BrokerLoadDataV1.class); + brokerLoadDataStore.init(); + brokerLoadDataStore.pushAsync("key", new BrokerLoadDataV1()).get(); + Awaitility.await().until(() -> { + var data = brokerLoadDataStore.get("key"); + return data.isPresent(); + }); + brokerLoadDataStore.pushAsync("key", null).get(); + brokerLoadDataStore.close(); + } + + @Data + private static class BrokerLoadDataV1 { + private ResourceUsage cpu; + private ResourceUsage memory; + private ResourceUsage directMemory; + private ResourceUsage bandwidthIn; + private ResourceUsage bandwidthOut; + private double msgThroughputIn; + private double msgThroughputOut; + private double msgRateIn; + private double msgRateOut; + private int bundleCount; + private int topics; + private double maxResourceUsage; + private double weightedMaxEMA; + private double msgThroughputEMA; + private long updatedAt; + private long reportedAt; + } + private static abstract class MockBrokerFilter implements BrokerFilter { @Override