Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] PIP-307: Add monitoring metrics for graceful closure of producers/consumers #21854

Merged
merged 40 commits into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
f032d14
Add ignoredAckCount metric during topic transfer
dragosvictor Dec 20, 2023
e3cbcbd
Merge remote-tracking branch 'origin/master' into pip-307-metrics
dragosvictor Dec 22, 2023
57dbc1d
Merge remote-tracking branch 'origin/master' into pip-307-metrics
dragosvictor Dec 26, 2023
143df2b
Refactor metrics
dragosvictor Dec 26, 2023
cf31c68
Add latency metrics
dragosvictor Dec 27, 2023
6a81f4b
Update metrics
dragosvictor Dec 27, 2023
7bd3933
Merge remote-tracking branch 'origin/master' into pip-307-metrics
dragosvictor Dec 28, 2023
e1e426c
Merge remote-tracking branch 'origin/master' into pip-307-metrics
dragosvictor Jan 2, 2024
04453eb
Move latency metrics to UnloadManager class
dragosvictor Jan 2, 2024
4060c61
Fix UnloadManagerTest fails
dragosvictor Jan 3, 2024
2bcb65c
Refactor common parts
dragosvictor Jan 3, 2024
528afdc
Cosmetic fix
dragosvictor Jan 3, 2024
dabbd4c
Merge remote-tracking branch 'origin/master' into pip-307-metrics
dragosvictor Jan 3, 2024
3fd7f7c
Measure RELEASE latency on exception
dragosvictor Jan 3, 2024
fd6c155
Notify state change listeners on ASSIGN for source brokers
dragosvictor Jan 3, 2024
2f97300
Notify destination broker listeners of RELEASE state
dragosvictor Jan 4, 2024
42bc848
Revert cosmetic fix
dragosvictor Jan 4, 2024
a792327
Add broker label to latency stats
dragosvictor Jan 4, 2024
f67e92a
Rename toMetrics -> getIgnoredCommandMetrics in ExtensibleLoadManager…
dragosvictor Jan 4, 2024
6359b85
Add units to latency metric
dragosvictor Jan 4, 2024
0956974
Add label metric bundleUnloading to latency metrics
dragosvictor Jan 4, 2024
6caa874
Move ignored commands counter to UnloadCounter class
dragosvictor Jan 4, 2024
9c0215a
Use histogram for latency metrics
dragosvictor Jan 5, 2024
d3ce1cf
Update histogram test
dragosvictor Jan 5, 2024
ec3a0a9
Revert to static fields to initialize latency metrics
dragosvictor Jan 5, 2024
febd46b
Add notify on arrival listener callback
dragosvictor Jan 5, 2024
3ecaf78
Fix label name for latency metrics
dragosvictor Jan 5, 2024
656feb7
Revert "Move ignored commands counter to UnloadCounter class"
dragosvictor Jan 5, 2024
89b31d1
Rename metric label bundleReleasing -> bundleUnloading for ignored co…
dragosvictor Jan 5, 2024
ebe573c
Fix notifyOnArrival callers
dragosvictor Jan 5, 2024
ef0f55f
Fix typo in brk_lb_disconnect_latency name
dragosvictor Jan 5, 2024
ebcdfb8
Change metric target for assign and disconnect latencies
dragosvictor Jan 5, 2024
0ce8e0d
Checkstyle fixes
dragosvictor Jan 5, 2024
4b7ad1f
Remove ASSIGN callbacks for source brokers
dragosvictor Jan 5, 2024
b99ebcd
Remove RELEASE callbacks for destination brokers
dragosvictor Jan 5, 2024
3865d60
Make notifyOnArrival private
dragosvictor Jan 5, 2024
f7ac553
Fix org.apache.pulsar.broker.stats.PrometheusMetricsTest#testBundlesM…
dragosvictor Jan 5, 2024
949710c
Merge remote-tracking branch 'origin/master' into pip-307-metrics
dragosvictor Jan 8, 2024
2b62397
Directly pass lookupServiceAddress parameter to UnloadManager
dragosvictor Jan 8, 2024
f9ce88b
Add comment regarding ExtensibleLoadManager enablement
dragosvictor Jan 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,9 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {

// Record the ignored send msg count during unloading
@Getter
private final AtomicLong ignoredSendMsgCounter = new AtomicLong();
private final AtomicLong ignoredSendMsgCount = new AtomicLong();
@Getter
private final AtomicLong ignoredAckCount = new AtomicLong();

// record unload metrics
private final AtomicReference<List<Metrics>> unloadMetrics = new AtomicReference<>();
Expand Down Expand Up @@ -361,7 +363,7 @@ public void start() throws PulsarServerException {
this.serviceUnitStateChannel = new ServiceUnitStateChannelImpl(pulsar);
this.brokerRegistry.start();
this.splitManager = new SplitManager(splitCounter);
this.unloadManager = new UnloadManager(unloadCounter);
this.unloadManager = new UnloadManager(unloadCounter, pulsar.getLookupServiceAddress());
this.serviceUnitStateChannel.listen(unloadManager);
this.serviceUnitStateChannel.listen(splitManager);
this.leaderElectionService.start();
Expand Down Expand Up @@ -874,12 +876,20 @@ public List<Metrics> getMetrics() {
}

metricsCollection.addAll(this.assignCounter.toMetrics(pulsar.getAdvertisedAddress()));

metricsCollection.addAll(this.serviceUnitStateChannel.getMetrics());
metricsCollection.addAll(getIgnoredCommandMetrics(pulsar.getAdvertisedAddress()));

return metricsCollection;
}

private List<Metrics> getIgnoredCommandMetrics(String advertisedBrokerAddress) {
var dimensions = Map.of("broker", advertisedBrokerAddress, "metric", "bundleUnloading");
var metric = Metrics.create(dimensions);
metric.put("brk_lb_ignored_ack_total", ignoredAckCount.get());
metric.put("brk_lb_ignored_send_total", ignoredSendMsgCount.get());
return List.of(metric);
}

private void monitor() {
try {
initWaiter.await();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@ public enum EventType {
Split,
Unload,
Override

}

@Getter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,21 @@ public void close() {
public <T> CompletableFuture<T> notifyOnCompletion(CompletableFuture<T> future,
String serviceUnit,
ServiceUnitStateData data) {
return future.whenComplete((r, ex) -> notify(serviceUnit, data, ex));
return notifyOnArrival(serviceUnit, data).
thenCombine(future, (unused, t) -> t).
whenComplete((r, ex) -> notify(serviceUnit, data, ex));
}

private CompletableFuture<Void> notifyOnArrival(String serviceUnit, ServiceUnitStateData data) {
stateChangeListeners.forEach(listener -> {
try {
listener.beforeEvent(serviceUnit, data);
} catch (Throwable ex) {
log.error("StateChangeListener: {} exception while notifying arrival event {} for service unit {}",
listener, data, serviceUnit, ex);
}
});
return CompletableFuture.completedFuture(null);
}

public void notify(String serviceUnit, ServiceUnitStateData data, Throwable t) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,15 @@
public interface StateChangeListener {

/**
* Handle the service unit state change.
* Called before the state change is handled.
*
* @param serviceUnit - Service Unit(Namespace bundle).
* @param data - Service unit state data.
*/
default void beforeEvent(String serviceUnit, ServiceUnitStateData data) { }

/**
* Called after the service unit state change has been handled.
*
* @param serviceUnit - Service Unit(Namespace bundle).
* @param data - Service unit state data.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@

import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Label.Failure;
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Unknown;
import com.google.common.annotations.VisibleForTesting;
import io.prometheus.client.Histogram;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
Expand All @@ -38,13 +41,77 @@ public class UnloadManager implements StateChangeListener {

private final UnloadCounter counter;
private final Map<String, CompletableFuture<Void>> inFlightUnloadRequest;
private final String lookupServiceAddress;

public UnloadManager(UnloadCounter counter) {
@VisibleForTesting
public enum LatencyMetric {
UNLOAD(buildHistogram(
"brk_lb_unload_latency", "Total time duration of unload operations on source brokers"), true, false),
ASSIGN(buildHistogram(
"brk_lb_assign_latency", "Time spent in the load balancing ASSIGN state on destination brokers"),
false, true),
RELEASE(buildHistogram(
"brk_lb_release_latency", "Time spent in the load balancing RELEASE state on source brokers"), true, false),
DISCONNECT(buildHistogram(
"brk_lb_disconnect_latency", "Time spent in the load balancing disconnected state on source brokers"),
true, false);

private static Histogram buildHistogram(String name, String help) {
return Histogram.build(name, help).unit("ms").labelNames("broker", "metric").
buckets(new double[] {1.0, 10.0, 100.0, 200.0, 1000.0}).register();
}
private static final long OP_TIMEOUT_NS = TimeUnit.HOURS.toNanos(1);

private final Histogram histogram;
private final Map<String, CompletableFuture<Void>> futures = new ConcurrentHashMap<>();
private final boolean isSourceBrokerMetric;
private final boolean isDestinationBrokerMetric;

LatencyMetric(Histogram histogram, boolean isSourceBrokerMetric, boolean isDestinationBrokerMetric) {
this.histogram = histogram;
this.isSourceBrokerMetric = isSourceBrokerMetric;
this.isDestinationBrokerMetric = isDestinationBrokerMetric;
}

public void beginMeasurement(String serviceUnit, String lookupServiceAddress, ServiceUnitStateData data) {
if ((isSourceBrokerMetric && lookupServiceAddress.equals(data.sourceBroker()))
|| (isDestinationBrokerMetric && lookupServiceAddress.equals(data.dstBroker()))) {
var startTimeNs = System.nanoTime();
futures.computeIfAbsent(serviceUnit, ignore -> {
var future = new CompletableFuture<Void>();
future.completeOnTimeout(null, OP_TIMEOUT_NS, TimeUnit.NANOSECONDS).
thenAccept(__ -> {
var durationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs);
log.info("Operation {} for service unit {} took {} ms", this, serviceUnit, durationMs);
histogram.labels(lookupServiceAddress, "bundleUnloading").observe(durationMs);
}).whenComplete((__, throwable) -> futures.remove(serviceUnit, future));
return future;
});
}
}

public void endMeasurement(String serviceUnit) {
var future = futures.get(serviceUnit);
if (future != null) {
future.complete(null);
}
}
}

public UnloadManager(UnloadCounter counter, String lookupServiceAddress) {
this.counter = counter;
this.inFlightUnloadRequest = new ConcurrentHashMap<>();
this.lookupServiceAddress = Objects.requireNonNull(lookupServiceAddress);
inFlightUnloadRequest = new ConcurrentHashMap<>();
}

private void complete(String serviceUnit, Throwable ex) {
LatencyMetric.UNLOAD.endMeasurement(serviceUnit);
LatencyMetric.DISCONNECT.endMeasurement(serviceUnit);
if (ex != null) {
LatencyMetric.RELEASE.endMeasurement(serviceUnit);
LatencyMetric.ASSIGN.endMeasurement(serviceUnit);
}

inFlightUnloadRequest.computeIfPresent(serviceUnit, (__, future) -> {
if (!future.isDone()) {
if (ex != null) {
Expand All @@ -62,7 +129,6 @@ public CompletableFuture<Void> waitAsync(CompletableFuture<Void> eventPubFuture,
UnloadDecision decision,
long timeout,
TimeUnit timeoutUnit) {

return eventPubFuture.thenCompose(__ -> inFlightUnloadRequest.computeIfAbsent(bundle, ignore -> {
if (log.isDebugEnabled()) {
log.debug("Handle unload bundle: {}, timeout: {} {}", bundle, timeout, timeoutUnit);
Expand All @@ -86,23 +152,40 @@ public CompletableFuture<Void> waitAsync(CompletableFuture<Void> eventPubFuture,
});
}

@Override
public void beforeEvent(String serviceUnit, ServiceUnitStateData data) {
if (log.isDebugEnabled()) {
log.debug("Handling arrival of {} for service unit {}", data, serviceUnit);
}
ServiceUnitState state = ServiceUnitStateData.state(data);
switch (state) {
case Free, Owned -> LatencyMetric.DISCONNECT.beginMeasurement(serviceUnit, lookupServiceAddress, data);
case Releasing -> {
LatencyMetric.RELEASE.beginMeasurement(serviceUnit, lookupServiceAddress, data);
LatencyMetric.UNLOAD.beginMeasurement(serviceUnit, lookupServiceAddress, data);
}
case Assigning -> LatencyMetric.ASSIGN.beginMeasurement(serviceUnit, lookupServiceAddress, data);
}
}

@Override
public void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable t) {
if (t != null && inFlightUnloadRequest.containsKey(serviceUnit)) {
if (t != null) {
if (log.isDebugEnabled()) {
log.debug("Handling {} for service unit {} with exception.", data, serviceUnit, t);
}
this.complete(serviceUnit, t);
complete(serviceUnit, t);
return;
}

if (log.isDebugEnabled()) {
log.debug("Handling {} for service unit {}", data, serviceUnit);
}
ServiceUnitState state = ServiceUnitStateData.state(data);
switch (state) {
case Free, Owned -> this.complete(serviceUnit, t);
default -> {
if (log.isDebugEnabled()) {
log.debug("Handling {} for service unit {}", data, serviceUnit);
}
}
case Free, Owned -> complete(serviceUnit, t);
case Releasing -> LatencyMetric.RELEASE.endMeasurement(serviceUnit);
case Assigning -> LatencyMetric.ASSIGN.endMeasurement(serviceUnit);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ public void updateUnloadBrokerCount(int unloadBrokerCount) {
}

public List<Metrics> toMetrics(String advertisedBrokerAddress) {

var metrics = new ArrayList<Metrics>();
var dimensions = new HashMap<String, String>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1788,15 +1788,18 @@ protected void handleSend(CommandSend send, ByteBuf headersAndPayload) {
printSendCommandDebug(send, headersAndPayload);
}

PulsarService pulsar = getBrokerService().pulsar();
// if the topic is transferring, we ignore send msg.
// New messages are silently ignored during topic transfer. Note that the transferring flag is only set when the
// Extensible Load Manager is enabled.
if (producer.getTopic().isTransferring()) {
long ignoredMsgCount = ExtensibleLoadManagerImpl.get(pulsar)
.getIgnoredSendMsgCounter().addAndGet(send.getNumMessages());
var pulsar = getBrokerService().pulsar();
var ignoredMsgCount = send.getNumMessages();
var ignoredSendMsgTotalCount = ExtensibleLoadManagerImpl.get(pulsar).getIgnoredSendMsgCount().
addAndGet(ignoredMsgCount);
if (log.isDebugEnabled()) {
log.debug("Ignored send msg from:{}:{} to fenced topic:{} while transferring."
+ " Ignored message count:{}.",
remoteAddress, send.getProducerId(), producer.getTopic().getName(), ignoredMsgCount);
log.debug("Ignoring {} messages from:{}:{} to fenced topic:{} while transferring."
+ " Total ignored message count: {}.",
ignoredMsgCount, remoteAddress, send.getProducerId(), producer.getTopic().getName(),
ignoredSendMsgTotalCount);
}
return;
}
Expand Down Expand Up @@ -1869,11 +1872,16 @@ protected void handleAck(CommandAck ack) {
if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
Consumer consumer = consumerFuture.getNow(null);
Subscription subscription = consumer.getSubscription();
// Message acks are silently ignored during topic transfer. Note that the transferring flag is only set when
// the Extensible Load Manager is enabled.
if (subscription.getTopic().isTransferring()) {
// Message acks are silently ignored during topic transfer.
var pulsar = getBrokerService().getPulsar();
var ignoredAckCount = ack.getMessageIdsCount();
var ignoredAckTotalCount = ExtensibleLoadManagerImpl.get(pulsar).getIgnoredAckCount().
addAndGet(ignoredAckCount);
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Ignoring message acknowledgment during topic transfer, ack count: {}",
subscription, consumerId, ack.getMessageIdsCount());
log.debug("[{}] [{}] Ignoring {} message acks during topic transfer. Total ignored ack count: {}",
subscription, consumerId, ignoredAckCount, ignoredAckTotalCount);
}
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public Summary create() {
}
}

static class Child {
public static class Child {
private final DataSketchesSummaryLogger logger;
private final List<Double> quantiles;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1241,6 +1241,12 @@ SplitDecision.Reason.Unknown, new AtomicLong(6))
FieldUtils.writeDeclaredField(channel1, "handlerCounters", handlerCounters, true);
}

primaryLoadManager.getIgnoredSendMsgCount().incrementAndGet();
primaryLoadManager.getIgnoredSendMsgCount().incrementAndGet();
primaryLoadManager.getIgnoredAckCount().incrementAndGet();
primaryLoadManager.getIgnoredAckCount().incrementAndGet();
primaryLoadManager.getIgnoredAckCount().incrementAndGet();

var expected = Set.of(
"""
dimensions=[{broker=localhost, metric=loadBalancing}], metrics=[{brk_lb_bandwidth_in_usage=3.0, brk_lb_bandwidth_out_usage=4.0, brk_lb_cpu_usage=1.0, brk_lb_directMemory_usage=2.0, brk_lb_memory_usage=400.0}]
Expand Down Expand Up @@ -1311,8 +1317,9 @@ SplitDecision.Reason.Unknown, new AtomicLong(6))
dimensions=[{broker=localhost, metric=sunitStateChn, result=Schedule}], metrics=[{brk_sunit_state_chn_inactive_broker_cleanup_ops_total=5}]
dimensions=[{broker=localhost, metric=sunitStateChn, result=Success}], metrics=[{brk_sunit_state_chn_inactive_broker_cleanup_ops_total=1}]
dimensions=[{broker=localhost, metric=sunitStateChn}], metrics=[{brk_sunit_state_chn_orphan_su_cleanup_ops_total=3, brk_sunit_state_chn_owned_su_total=10, brk_sunit_state_chn_su_tombstone_cleanup_ops_total=2}]
dimensions=[{broker=localhost, metric=bundleUnloading}], metrics=[{brk_lb_ignored_ack_total=3, brk_lb_ignored_send_total=2}]
""".split("\n"));
var actual = primaryLoadManager.getMetrics().stream().map(m -> m.toString()).collect(Collectors.toSet());
var actual = primaryLoadManager.getMetrics().stream().map(Metrics::toString).collect(Collectors.toSet());
assertEquals(actual, expected);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
Expand All @@ -49,7 +48,7 @@ public class UnloadManagerTest {
@Test
public void testEventPubFutureHasException() {
UnloadCounter counter = new UnloadCounter();
UnloadManager manager = new UnloadManager(counter);
UnloadManager manager = new UnloadManager(counter, "mockLookupServiceAddress");
var unloadDecision =
new UnloadDecision(new Unload("broker-1", "bundle-1"), Success, Admin);
CompletableFuture<Void> future =
Expand All @@ -69,7 +68,7 @@ public void testEventPubFutureHasException() {
@Test
public void testTimeout() throws IllegalAccessException {
UnloadCounter counter = new UnloadCounter();
UnloadManager manager = new UnloadManager(counter);
UnloadManager manager = new UnloadManager(counter, "mockLookupServiceAddress");
var unloadDecision =
new UnloadDecision(new Unload("broker-1", "bundle-1"), Success, Admin);
CompletableFuture<Void> future =
Expand All @@ -93,7 +92,7 @@ public void testTimeout() throws IllegalAccessException {
@Test
public void testSuccess() throws IllegalAccessException, ExecutionException, InterruptedException {
UnloadCounter counter = new UnloadCounter();
UnloadManager manager = new UnloadManager(counter);
UnloadManager manager = new UnloadManager(counter, "mockLookupServiceAddress");
var unloadDecision =
new UnloadDecision(new Unload("broker-1", "bundle-1"), Success, Admin);
CompletableFuture<Void> future =
Expand Down Expand Up @@ -147,7 +146,7 @@ public void testSuccess() throws IllegalAccessException, ExecutionException, Int
@Test
public void testFailedStage() throws IllegalAccessException {
UnloadCounter counter = new UnloadCounter();
UnloadManager manager = new UnloadManager(counter);
UnloadManager manager = new UnloadManager(counter, "mockLookupServiceAddress");
var unloadDecision =
new UnloadDecision(new Unload("broker-1", "bundle-1"), Success, Admin);
CompletableFuture<Void> future =
Expand Down Expand Up @@ -176,7 +175,7 @@ public void testFailedStage() throws IllegalAccessException {
@Test
public void testClose() throws IllegalAccessException {
UnloadCounter counter = new UnloadCounter();
UnloadManager manager = new UnloadManager(counter);
UnloadManager manager = new UnloadManager(counter, "mockLookupServiceAddress");
var unloadDecision =
new UnloadDecision(new Unload("broker-1", "bundle-1"), Success, Admin);
CompletableFuture<Void> future =
Expand Down
Loading