Skip to content

Commit

Permalink
Add unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
Demogorgon314 committed Sep 23, 2024
1 parent 6c5a4ef commit 446c066
Showing 1 changed file with 42 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import lombok.Cleanup;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.commons.lang3.reflect.FieldUtils;
Expand All @@ -101,6 +102,7 @@
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter;
import org.apache.pulsar.broker.loadbalance.extensions.reporter.BrokerLoadDataReporter;
import org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreFactory;
import org.apache.pulsar.broker.loadbalance.extensions.store.TableViewLoadDataStoreImpl;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import org.apache.pulsar.broker.lookup.LookupResult;
Expand All @@ -110,6 +112,7 @@
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentSystemTopic;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
Expand All @@ -130,12 +133,14 @@
import org.apache.pulsar.common.policies.data.BrokerAssignment;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.AssertJUnit;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Factory;
Expand Down Expand Up @@ -2106,6 +2111,43 @@ public void compactionScheduleTest() {
});
}

@Test
public void testSystemNonPersistentTopicSchemaCompatibility() throws Exception {
String topicName = ExtensibleLoadManagerImpl.BROKER_LOAD_DATA_STORE_TOPIC;
NonPersistentSystemTopic topic = new NonPersistentSystemTopic(topicName, pulsar.getBrokerService());
Assert.assertEquals(SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE, topic.getSchemaCompatibilityStrategy());

var brokerLoadDataStore = LoadDataStoreFactory.create(pulsar, topicName, BrokerLoadDataV1.class);
brokerLoadDataStore.init();
brokerLoadDataStore.pushAsync("key", new BrokerLoadDataV1()).get();
Awaitility.await().until(() -> {
var data = brokerLoadDataStore.get("key");
return data.isPresent();
});
brokerLoadDataStore.pushAsync("key", null).get();
brokerLoadDataStore.close();
}

@Data
private static class BrokerLoadDataV1 {
private ResourceUsage cpu;
private ResourceUsage memory;
private ResourceUsage directMemory;
private ResourceUsage bandwidthIn;
private ResourceUsage bandwidthOut;
private double msgThroughputIn;
private double msgThroughputOut;
private double msgRateIn;
private double msgRateOut;
private int bundleCount;
private int topics;
private double maxResourceUsage;
private double weightedMaxEMA;
private double msgThroughputEMA;
private long updatedAt;
private long reportedAt;
}

private static abstract class MockBrokerFilter implements BrokerFilter {

@Override
Expand Down

0 comments on commit 446c066

Please sign in to comment.