Skip to content

Commit

Permalink
[improve][broker] Add topic name to emitted error messages. (apache#2…
Browse files Browse the repository at this point in the history
  • Loading branch information
dao-jun authored and hanmz committed Feb 12, 2025
1 parent bd3a8d3 commit f62afd0
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -867,7 +867,7 @@ protected CompletableFuture<Optional<Long>> incrementTopicEpochIfNeeded(Producer
}

} catch (Exception e) {
log.error("Encountered unexpected error during exclusive producer creation", e);
log.error("[{}] Encountered unexpected error during exclusive producer creation", topic, e);
return FutureUtil.failedFuture(new BrokerServiceException(e));
} finally {
lock.writeLock().unlock();
Expand Down Expand Up @@ -941,14 +941,14 @@ protected void checkTopicFenced() throws BrokerServiceException {
protected CompletableFuture<Void> internalAddProducer(Producer producer) {
if (isProducersExceeded(producer)) {
log.warn("[{}] Attempting to add producer to topic which reached max producers limit", topic);
return CompletableFuture.failedFuture(
new BrokerServiceException.ProducerBusyException("Topic reached max producers limit"));
return CompletableFuture.failedFuture(new BrokerServiceException.ProducerBusyException(
"Topic '" + topic + "' reached max producers limit"));
}

if (isSameAddressProducersExceeded(producer)) {
log.warn("[{}] Attempting to add producer to topic which reached max same address producers limit", topic);
return CompletableFuture.failedFuture(
new BrokerServiceException.ProducerBusyException("Topic reached max same address producers limit"));
return CompletableFuture.failedFuture(new BrokerServiceException.ProducerBusyException(
"Topic '" + topic + "' reached max same address producers limit"));
}

if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -983,7 +983,7 @@ private CompletableFuture<Void> tryOverwriteOldProducer(Producer oldProducer, Pr
if (previousIsActive.isEmpty() || previousIsActive.get()) {
return CompletableFuture.failedFuture(new BrokerServiceException.NamingException(
"Producer with name '" + newProducer.getProducerName()
+ "' is already connected to topic"));
+ "' is already connected to topic '" + topic + "'"));
} else {
// If the connection of the previous producer is not active, the method
// "cnx().checkConnectionLiveness()" will trigger the close for it and kick off the previous
Expand All @@ -996,7 +996,8 @@ private CompletableFuture<Void> tryOverwriteOldProducer(Producer oldProducer, Pr
});
}
return CompletableFuture.failedFuture(new BrokerServiceException.NamingException(
"Producer with name '" + newProducer.getProducerName() + "' is already connected to topic"));
"Producer with name '" + newProducer.getProducerName() + "' is already connected to topic '"
+ topic + "'"));
}
}

Expand Down Expand Up @@ -1346,7 +1347,7 @@ public static Optional<ClusterUrl> getMigratedClusterUrl(PulsarService pulsar, S
return getMigratedClusterUrlAsync(pulsar, topic)
.get(pulsar.getPulsarResources().getClusterResources().getOperationTimeoutSec(), TimeUnit.SECONDS);
} catch (Exception e) {
log.warn("Failed to get migration cluster URL", e);
log.warn("[{}] Failed to get migration cluster URL", topic, e);
}
return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2888,7 +2888,8 @@ public void testMaxProducersPerTopicUnlimited() throws Exception {
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
fail("should fail");
} catch (PulsarClientException e) {
assertTrue(e.getMessage().contains("Topic reached max producers limit"));
String expectMsg = "Topic '" + topic + "' reached max producers limit";
assertTrue(e.getMessage().contains(expectMsg));
}
//set the limit to 3
admin.namespaces().setMaxProducersPerTopic(myNamespace, 3);
Expand All @@ -2902,7 +2903,8 @@ public void testMaxProducersPerTopicUnlimited() throws Exception {
Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topic).create();
fail("should fail");
} catch (PulsarClientException e) {
assertTrue(e.getMessage().contains("Topic reached max producers limit"));
String expectMsg = "Topic '" + topic + "' reached max producers limit";
assertTrue(e.getMessage().contains(expectMsg));
}

//clean up
Expand Down

0 comments on commit f62afd0

Please sign in to comment.