Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat][misc] PIP-264: Implement topic lookup metrics using OpenTelemetry #22058

Merged
merged 45 commits into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
a6cef35
Add Topic lookup Metrics
dragosvictor Feb 13, 2024
4d2aca1
Merge remote-tracking branch 'origin/master' into pip-264-metric-conv…
dragosvictor Feb 13, 2024
0011515
Fix conflict
dragosvictor Feb 13, 2024
5ea2aee
Add javadoc
dragosvictor Feb 13, 2024
1bb795c
Add test infra
dragosvictor Feb 13, 2024
291c7a0
Add TopicLookupMetricsTest
dragosvictor Feb 14, 2024
c065850
Add more tests
dragosvictor Feb 14, 2024
20c41c4
Update tests
dragosvictor Feb 14, 2024
03fd6af
Move some tests to BrokerServiceLookupTest
dragosvictor Feb 15, 2024
1f62a39
Validate metric pulsar.broker.lookup.answer
dragosvictor Feb 15, 2024
a954255
Validate metric pulsar.broker.lookup.latency
dragosvictor Feb 15, 2024
deb39b1
Validate metric pulsar.broker.lookup.pending.request
dragosvictor Feb 15, 2024
17f98da
Validate metric pulsar.broker.topic.load.pending.request
dragosvictor Feb 15, 2024
b0b0dc3
Remove redundant TopicLookupMetricsTest
dragosvictor Feb 15, 2024
94d27fd
Use seconds unit for latency histograms
dragosvictor Feb 15, 2024
118034c
Revert redundant test changes in BrokerServiceThrottlingTest
dragosvictor Feb 15, 2024
02522d3
Cosmetic fixes
dragosvictor Feb 15, 2024
8eef496
Add PulsarDeprecatedMetric annotation
dragosvictor Feb 16, 2024
301603f
Add metric utility class
dragosvictor Feb 16, 2024
a92d77f
Add limit metrics for throttling semaphores
dragosvictor Feb 17, 2024
1e9bc12
Test fixes
dragosvictor Feb 17, 2024
1b1be93
Merge remote-tracking branch 'origin/master' into pip-264-topic-looku…
dragosvictor Feb 20, 2024
44933ce
Clarify decision to implement MetricsUtil.convertToSeconds
dragosvictor Feb 20, 2024
3cedfc0
Update metric types
dragosvictor Feb 20, 2024
102a230
Improve test readability
dragosvictor Feb 20, 2024
517335c
Use LongUpDownCounter for pending ops usage counters
dragosvictor Feb 20, 2024
29f42f6
Rename metrics pulsar.broker.topic.load.operation.pending.[usage,limit]
dragosvictor Feb 21, 2024
3141eb6
Add unit to pulsar.broker.topic.load.operation.pending.*
dragosvictor Feb 21, 2024
91f9ec8
Use duration histogram for pulsar broker lookup response metrics
dragosvictor Feb 21, 2024
4cd1c42
Cleanup debug statement
dragosvictor Feb 21, 2024
7ec86cb
Fix test build
dragosvictor Feb 21, 2024
c9eb651
Refactor BrokerServiceLookupTest#testMultipleBrokerLookup
dragosvictor Feb 21, 2024
4d9fc00
Refactor PulsarService constructor
dragosvictor Feb 21, 2024
c0bc9c5
Merge remote-tracking branch 'origin/master' into pip-264-topic-looku…
dragosvictor Feb 22, 2024
a99b819
Update topic lookup pending ops metric name
dragosvictor Feb 22, 2024
34dd00d
Rename pendingTopicLoadOperations fields to match metric names
dragosvictor Feb 22, 2024
8810f7f
Merge remote-tracking branch 'origin/master' into pip-264-topic-looku…
dragosvictor Feb 27, 2024
7ae864c
Move MetricsUtil class to pulsar-common
dragosvictor Feb 27, 2024
301f4f9
Use UpDownCounter for limit metrics
dragosvictor Feb 29, 2024
73b67dd
Rename metrics
dragosvictor Mar 5, 2024
a014fb2
Merge remote-tracking branch 'origin/master' into pip-264-topic-looku…
dragosvictor Mar 5, 2024
229a7d2
Fix build
dragosvictor Mar 5, 2024
05cdde0
Rename metrics
dragosvictor Mar 5, 2024
0016a6d
Use failure response attribute
dragosvictor Mar 5, 2024
4ef6988
Rename attribute to pulsar.lookup.response
dragosvictor Mar 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.hash.Hashing;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
Expand Down Expand Up @@ -106,7 +105,6 @@
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
import org.apache.pulsar.opentelemetry.annotations.PulsarDeprecatedMetric;
import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
Expand Down Expand Up @@ -153,35 +151,37 @@ public class NamespaceService implements AutoCloseable {

private final RedirectManager redirectManager;

@PulsarDeprecatedMetric(newMetricName = "pulsar.broker.lookup.request.duration")
public static final String LOOKUP_REQUEST_DURATION_METRIC_NAME = "pulsar.broker.request.topic.lookup.duration";

private static final AttributeKey<String> PULSAR_LOOKUP_RESPONSE_TYPE =
AttributeKey.stringKey("pulsar.lookup.response.type");
public static final Attributes PULSAR_LOOKUP_RESPONSE_BROKER_ATTRIBUTES = Attributes.builder()
.put(PULSAR_LOOKUP_RESPONSE_TYPE, "broker")
.build();
public static final Attributes PULSAR_LOOKUP_RESPONSE_REDIRECT_ATTRIBUTES = Attributes.builder()
.put(PULSAR_LOOKUP_RESPONSE_TYPE, "redirect")
.build();
public static final Attributes PULSAR_LOOKUP_RESPONSE_FAILURE_ATTRIBUTES = Attributes.builder()
.put(PULSAR_LOOKUP_RESPONSE_TYPE, "failure")
.build();

@PulsarDeprecatedMetric(newMetricName = LOOKUP_REQUEST_DURATION_METRIC_NAME)
private static final Counter lookupRedirects = Counter.build("pulsar_broker_lookup_redirects", "-").register();

@PulsarDeprecatedMetric(newMetricName = "pulsar.broker.lookup.request.duration")
@PulsarDeprecatedMetric(newMetricName = LOOKUP_REQUEST_DURATION_METRIC_NAME)
private static final Counter lookupFailures = Counter.build("pulsar_broker_lookup_failures", "-").register();

@PulsarDeprecatedMetric(newMetricName = "pulsar.broker.lookup.request.duration")
@PulsarDeprecatedMetric(newMetricName = LOOKUP_REQUEST_DURATION_METRIC_NAME)
private static final Counter lookupAnswers = Counter.build("pulsar_broker_lookup_answers", "-").register();

@PulsarDeprecatedMetric(newMetricName = "pulsar.broker.lookup.request.duration")
@PulsarDeprecatedMetric(newMetricName = LOOKUP_REQUEST_DURATION_METRIC_NAME)
private static final Summary lookupLatency = Summary.build("pulsar_broker_lookup", "-")
.quantile(0.50)
.quantile(0.99)
.quantile(0.999)
.quantile(1.0)
.register();
private final DoubleHistogram lookupLatencyHistogram;
private static final AttributeKey<String> PULSAR_LOOKUP_RESPONSE_TYPE =
AttributeKey.stringKey("pulsar.lookup.response.type");
@VisibleForTesting
public static final Attributes PULSAR_LOOKUP_RESPONSE_BROKER_ATTRIBUTES = Attributes.builder()
.putAll(OpenTelemetryAttributes.PULSAR_RESPONSE_STATUS_SUCCESS)
.put(PULSAR_LOOKUP_RESPONSE_TYPE, "broker")
.build();
@VisibleForTesting
public static final Attributes PULSAR_LOOKUP_RESPONSE_REDIRECT_ATTRIBUTES = Attributes.builder()
.putAll(OpenTelemetryAttributes.PULSAR_RESPONSE_STATUS_SUCCESS)
.put(PULSAR_LOOKUP_RESPONSE_TYPE, "redirect")
.build();

/**
* Default constructor.
Expand All @@ -201,7 +201,7 @@ public NamespaceService(PulsarService pulsar) {
this.redirectManager = new RedirectManager(pulsar);

this.lookupLatencyHistogram = pulsar.getOpenTelemetry().getMeter()
.histogramBuilder("pulsar.broker.lookup.request.duration")
.histogramBuilder(LOOKUP_REQUEST_DURATION_METRIC_NAME)
.setDescription("Lookup request duration")
.setUnit("s")
.build();
Expand Down Expand Up @@ -249,11 +249,11 @@ public CompletableFuture<Optional<LookupResult>> getBrokerServiceUrlAsync(TopicN
}
} else {
// No lookup result, default to reporting as failure.
attributes = OpenTelemetryAttributes.PULSAR_RESPONSE_STATUS_FAILURE;
attributes = PULSAR_LOOKUP_RESPONSE_FAILURE_ATTRIBUTES;
}
} else {
lookupFailures.inc();
attributes = OpenTelemetryAttributes.PULSAR_RESPONSE_STATUS_FAILURE;
attributes = PULSAR_LOOKUP_RESPONSE_FAILURE_ATTRIBUTES;
}
lookupLatencyHistogram.record(MetricsUtil.convertToSeconds(latencyNs, TimeUnit.NANOSECONDS), attributes);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.ssl.SslContext;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.opentelemetry.api.metrics.ObservableLongGauge;
import io.opentelemetry.api.metrics.ObservableLongUpDownCounter;
import io.prometheus.client.Histogram;
import java.io.Closeable;
Expand Down Expand Up @@ -244,15 +243,19 @@ public class BrokerService implements Closeable {
protected final AtomicReference<Semaphore> lookupRequestSemaphore;
protected final AtomicReference<Semaphore> topicLoadRequestSemaphore;

@PulsarDeprecatedMetric(newMetricName = "pulsar.broker.topic.lookup.operation.pending.usage")
public static final String TOPIC_LOOKUP_USAGE_METRIC_NAME = "pulsar.broker.request.topic.lookup.concurrent.usage";
public static final String TOPIC_LOOKUP_LIMIT_METRIC_NAME = "pulsar.broker.request.topic.lookup.concurrent.limit";
@PulsarDeprecatedMetric(newMetricName = TOPIC_LOOKUP_USAGE_METRIC_NAME)
private final ObserverGauge pendingLookupRequests;
private final ObservableLongUpDownCounter pendingLookupOperationsCounter;
private final ObservableLongGauge pendingLookupOperationsLimitGauge;
private final ObservableLongUpDownCounter pendingLookupOperationsLimitCounter;

@PulsarDeprecatedMetric(newMetricName = "pulsar.broker.topic.load.operation.pending.usage")
public static final String TOPIC_LOAD_USAGE_METRIC_NAME = "pulsar.broker.topic.load.concurrent.usage";
public static final String TOPIC_LOAD_LIMIT_METRIC_NAME = "pulsar.broker.topic.load.concurrent.limit";
@PulsarDeprecatedMetric(newMetricName = TOPIC_LOAD_USAGE_METRIC_NAME)
private final ObserverGauge pendingTopicLoadRequests;
private final ObservableLongUpDownCounter pendingTopicLoadOperationsCounter;
private final ObservableLongGauge pendingTopicLoadOperationsLimitGauge;
private final ObservableLongUpDownCounter pendingTopicLoadOperationsLimitCounter;

private final ScheduledExecutorService inactivityMonitor;
private final ScheduledExecutorService messageExpiryMonitor;
Expand Down Expand Up @@ -415,15 +418,14 @@ public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws
.supplier(this::getPendingLookupRequest)
.register();
this.pendingLookupOperationsCounter = pulsar.getOpenTelemetry().getMeter()
.upDownCounterBuilder("pulsar.broker.topic.lookup.operation.pending.usage")
.upDownCounterBuilder(TOPIC_LOOKUP_USAGE_METRIC_NAME)
.setDescription("The number of pending lookup operations in the broker. "
+ "When it reaches threshold \"maxConcurrentLookupRequest\" defined in broker.conf, "
+ "new requests are rejected.")
.setUnit("{operation}")
.buildWithCallback(measurement -> measurement.record(getPendingLookupRequest()));
this.pendingLookupOperationsLimitGauge = pulsar.getOpenTelemetry().getMeter()
.gaugeBuilder("pulsar.broker.topic.lookup.operation.pending.limit")
.ofLongs()
this.pendingLookupOperationsLimitCounter = pulsar.getOpenTelemetry().getMeter()
.upDownCounterBuilder(TOPIC_LOOKUP_LIMIT_METRIC_NAME)
.setDescription("The maximum number of pending lookup operations in the broker. "
+ "Equal to \"maxConcurrentLookupRequest\" defined in broker.conf.")
.setUnit("{operation}")
Expand All @@ -435,15 +437,14 @@ public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws
.supplier(this::getPendingTopicLoadRequests)
.register();
this.pendingTopicLoadOperationsCounter = pulsar.getOpenTelemetry().getMeter()
.upDownCounterBuilder("pulsar.broker.topic.load.operation.pending.usage")
.upDownCounterBuilder(TOPIC_LOAD_USAGE_METRIC_NAME)
.setDescription("The number of pending topic load operations in the broker. "
+ "When it reaches threshold \"maxConcurrentTopicLoadRequest\" defined in broker.conf, "
+ "new requests are rejected.")
.setUnit("{operation}")
.buildWithCallback(measurement -> measurement.record(getPendingTopicLoadRequests()));
this.pendingTopicLoadOperationsLimitGauge = pulsar.getOpenTelemetry().getMeter()
.gaugeBuilder("pulsar.broker.topic.load.operation.pending.limit")
.ofLongs()
this.pendingTopicLoadOperationsLimitCounter = pulsar.getOpenTelemetry().getMeter()
.upDownCounterBuilder(TOPIC_LOAD_LIMIT_METRIC_NAME)
.setDescription("The maximum number of pending topic load operations in the broker. "
+ "Equal to \"maxConcurrentTopicLoadRequest\" defined in broker.conf.")
.setUnit("{operation}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,14 @@ protected void customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder b
public void testThrottlingLookupRequestSemaphore() throws Exception {
var lookupRequestSemaphore = pulsar.getBrokerService().lookupRequestSemaphore;
var configName = "maxConcurrentLookupRequest";
var metricName = "pulsar.broker.topic.lookup.operation.pending.limit";
var metricName = BrokerService.TOPIC_LOOKUP_LIMIT_METRIC_NAME;
// Validate that the configuration has not been overridden.
assertThat(admin.brokers().getAllDynamicConfigurations()).doesNotContainKey(configName);
assertLongGaugeValue(metricName, 50_000);
assertLongSumValue(metricName, 50_000);
assertThat(lookupRequestSemaphore.get().availablePermits()).isNotEqualTo(0);
admin.brokers().updateDynamicConfiguration(configName, Integer.toString(0));
waitAtMost(1, TimeUnit.SECONDS).until(() -> lookupRequestSemaphore.get().availablePermits() == 0);
assertLongGaugeValue(metricName, 0);
assertLongSumValue(metricName, 0);
}

/**
Expand All @@ -100,22 +100,22 @@ public void testThrottlingLookupRequestSemaphore() throws Exception {
public void testThrottlingTopicLoadRequestSemaphore() throws Exception {
var topicLoadRequestSemaphore = pulsar.getBrokerService().topicLoadRequestSemaphore;
var configName = "maxConcurrentTopicLoadRequest";
var metricName = "pulsar.broker.topic.load.operation.pending.limit";
var metricName = BrokerService.TOPIC_LOAD_LIMIT_METRIC_NAME;
// Validate that the configuration has not been overridden.
assertThat(admin.brokers().getAllDynamicConfigurations()).doesNotContainKey(configName);
assertLongGaugeValue(metricName, 5_000);
assertLongSumValue(metricName, 5_000);
assertThat(topicLoadRequestSemaphore.get().availablePermits()).isNotEqualTo(0);
admin.brokers().updateDynamicConfiguration(configName, Integer.toString(0));
waitAtMost(1, TimeUnit.SECONDS).until(() -> topicLoadRequestSemaphore.get().availablePermits() == 0);
assertLongGaugeValue(metricName, 0);
assertLongSumValue(metricName, 0);
}

private void assertLongGaugeValue(String metricName, int value) {
private void assertLongSumValue(String metricName, int value) {
assertThat(pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics())
.anySatisfy(metric -> assertThat(metric)
.hasName(metricName)
.hasLongGaugeSatisfying(
gauge -> gauge.hasPointsSatisfying(point -> point.hasValue(value))));
.hasLongSumSatisfying(
sum -> sum.hasPointsSatisfying(point -> point.hasValue(value))));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.client.api;

import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
import static org.apache.pulsar.broker.namespace.NamespaceService.LOOKUP_REQUEST_DURATION_METRIC_NAME;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
Expand Down Expand Up @@ -94,7 +95,6 @@
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.SecurityUtility;
import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.zookeeper.KeeperException;
Expand Down Expand Up @@ -223,9 +223,8 @@ public void testMultipleBrokerLookup() throws Exception {
topicLoadRequestSemaphoreField.set(pulsar2.getBrokerService(),
new AtomicReference<>(topicLoadRequestSemaphoreSpy));

var metricName = "pulsar.broker.lookup.request.duration";
assertThat(pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics())
.noneSatisfy(metric -> assertThat(metric).hasName("pulsar.broker.lookup.request.duration"));
.noneSatisfy(metric -> assertThat(metric).hasName(LOOKUP_REQUEST_DURATION_METRIC_NAME));

/**** started broker-2 ****/
@Cleanup
Expand All @@ -237,15 +236,15 @@ public void testMultipleBrokerLookup() throws Exception {
cdlAfterLookupSemaphoreAcquire.await();
assertThat(metricReader.collectAllMetrics())
.anySatisfy(metric -> assertThat(metric)
.hasName("pulsar.broker.topic.lookup.operation.pending.usage")
.hasName(BrokerService.TOPIC_LOOKUP_USAGE_METRIC_NAME)
.hasLongSumSatisfying(
sum -> sum.hasPointsSatisfying(point -> point.hasValue(1))));
cdlLookupSemaphoreVerification.countDown();

cdlAfterTopicLoadSemaphoreAcquire.await();
assertThat(pulsarTestContext2.getOpenTelemetryMetricReader().collectAllMetrics())
.anySatisfy(metric -> assertThat(metric)
.hasName("pulsar.broker.topic.load.operation.pending.usage")
.hasName(BrokerService.TOPIC_LOAD_USAGE_METRIC_NAME)
.hasLongSumSatisfying(
sum -> sum.hasPointsSatisfying(point -> point.hasValue(1))));
cdlTopicLoadSemaphoreVerification.countDown();
Expand Down Expand Up @@ -274,7 +273,7 @@ public void testMultipleBrokerLookup() throws Exception {
var metrics = metricReader.collectAllMetrics();
assertThat(metrics)
.anySatisfy(metric -> assertThat(metric)
.hasName(metricName)
.hasName(LOOKUP_REQUEST_DURATION_METRIC_NAME)
.hasHistogramSatisfying(histogram -> histogram.hasPointsSatisfying(
point -> point
.hasAttributes(NamespaceService.PULSAR_LOOKUP_RESPONSE_REDIRECT_ATTRIBUTES)
Expand Down Expand Up @@ -1201,14 +1200,13 @@ public void testLookupConnectionNotCloseIfGetUnloadingExOrMetadataEx() throws Ex
assertTrue(lookupService instanceof BinaryProtoLookupService);
ClientCnx lookupConnection = pulsarClientImpl.getCnxPool().getConnection(lookupService.resolveHost()).join();

var metricName = "pulsar.broker.lookup.request.duration";
var metricReader = pulsarTestContext.getOpenTelemetryMetricReader();
assertThat(metricReader.collectAllMetrics())
.noneSatisfy(metric -> assertThat(metric)
.hasName(metricName)
.hasName(LOOKUP_REQUEST_DURATION_METRIC_NAME)
.hasHistogramSatisfying(histogram -> histogram.hasPointsSatisfying(
point -> point
.hasAttributes(OpenTelemetryAttributes.PULSAR_RESPONSE_STATUS_FAILURE),
.hasAttributes(NamespaceService.PULSAR_LOOKUP_RESPONSE_FAILURE_ATTRIBUTES),
point -> point
.hasAttributes(NamespaceService.PULSAR_LOOKUP_RESPONSE_BROKER_ATTRIBUTES))));

Expand All @@ -1224,10 +1222,10 @@ public void testLookupConnectionNotCloseIfGetUnloadingExOrMetadataEx() throws Ex

assertThat(metricReader.collectAllMetrics())
.anySatisfy(metric -> assertThat(metric)
.hasName(metricName)
.hasName(LOOKUP_REQUEST_DURATION_METRIC_NAME)
.hasHistogramSatisfying(histogram -> histogram.hasPointsSatisfying(
point -> point
.hasAttributes(OpenTelemetryAttributes.PULSAR_RESPONSE_STATUS_FAILURE)
.hasAttributes(NamespaceService.PULSAR_LOOKUP_RESPONSE_FAILURE_ATTRIBUTES)
.hasCount(1),
point -> point
.hasAttributes(NamespaceService.PULSAR_LOOKUP_RESPONSE_BROKER_ATTRIBUTES))));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.pulsar.opentelemetry;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;

/**
* Common OpenTelemetry attributes to be used by Pulsar components.
Expand All @@ -30,8 +29,4 @@ public interface OpenTelemetryAttributes {
* {@link OpenTelemetryService}.
*/
AttributeKey<String> PULSAR_CLUSTER = AttributeKey.stringKey("pulsar.cluster");

AttributeKey<String> PULSAR_RESPONSE_STATUS = AttributeKey.stringKey("pulsar.response.status");
Attributes PULSAR_RESPONSE_STATUS_SUCCESS = Attributes.of(PULSAR_RESPONSE_STATUS, "success");
Attributes PULSAR_RESPONSE_STATUS_FAILURE = Attributes.of(PULSAR_RESPONSE_STATUS, "failure");
}
Loading