diff --git a/server/consumer.go b/server/consumer.go index 07fd4a079da..40239902aea 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -1588,8 +1588,23 @@ func (o *consumer) unsubscribe(sub *subscription) { // We need to make sure we protect access to the outq. // Do all advisory sends here. -func (o *consumer) sendAdvisory(subj string, msg []byte) { - o.outq.sendMsg(subj, msg) +func (o *consumer) sendAdvisory(subject string, e any) { + if o.acc == nil { + return + } + + // If there is no one listening for this advisory then save ourselves the effort + // and don't bother encoding the JSON or sending it. + if sl := o.acc.sl; (sl != nil && !sl.HasInterest(subject)) && !o.srv.hasGatewayInterest(o.acc.Name, subject) { + return + } + + j, err := json.Marshal(e) + if err != nil { + return + } + + o.outq.sendMsg(subject, j) } func (o *consumer) sendDeleteAdvisoryLocked() { @@ -1605,13 +1620,8 @@ func (o *consumer) sendDeleteAdvisoryLocked() { Domain: o.srv.getOpts().JetStreamDomain, } - j, err := json.Marshal(e) - if err != nil { - return - } - subj := JSAdvisoryConsumerDeletedPre + "." + o.stream + "." + o.name - o.sendAdvisory(subj, j) + o.sendAdvisory(subj, e) } func (o *consumer) sendPinnedAdvisoryLocked(group string) { @@ -1629,13 +1639,8 @@ func (o *consumer) sendPinnedAdvisoryLocked(group string) { Group: group, } - j, err := json.Marshal(e) - if err != nil { - return - } - subj := JSAdvisoryConsumerPinnedPre + "." + o.stream + "." + o.name - o.sendAdvisory(subj, j) + o.sendAdvisory(subj, e) } func (o *consumer) sendUnpinnedAdvisoryLocked(group string, reason string) { @@ -1653,13 +1658,8 @@ func (o *consumer) sendUnpinnedAdvisoryLocked(group string, reason string) { Reason: reason, } - j, err := json.Marshal(e) - if err != nil { - return - } - subj := JSAdvisoryConsumerUnpinnedPre + "." + o.stream + "." + o.name - o.sendAdvisory(subj, j) + o.sendAdvisory(subj, e) } @@ -1679,13 +1679,8 @@ func (o *consumer) sendCreateAdvisory() { Domain: o.srv.getOpts().JetStreamDomain, } - j, err := json.Marshal(e) - if err != nil { - return - } - subj := JSAdvisoryConsumerCreatedPre + "." + o.stream + "." + o.name - o.sendAdvisory(subj, j) + o.sendAdvisory(subj, e) } func (o *consumer) sendPauseAdvisoryLocked(cfg *ConsumerConfig) { @@ -1705,13 +1700,8 @@ func (o *consumer) sendPauseAdvisoryLocked(cfg *ConsumerConfig) { e.Paused = time.Now().Before(e.PauseUntil) } - j, err := json.Marshal(e) - if err != nil { - return - } - subj := JSAdvisoryConsumerPausePre + "." + o.stream + "." + o.name - o.sendAdvisory(subj, j) + o.sendAdvisory(subj, e) } // Created returns created time. @@ -2652,12 +2642,7 @@ func (o *consumer) processNak(sseq, dseq, dc uint64, nak []byte) { Domain: o.srv.getOpts().JetStreamDomain, } - j, err := json.Marshal(e) - if err != nil { - return - } - - o.sendAdvisory(o.nakEventT, j) + o.sendAdvisory(o.nakEventT, e) // Check to see if we have delays attached. if len(nak) > len(AckNak) { @@ -2732,15 +2717,8 @@ func (o *consumer) processTerm(sseq, dseq, dc uint64, reason, reply string) bool Domain: o.srv.getOpts().JetStreamDomain, } - j, err := json.Marshal(e) - if err != nil { - // We had an error during the marshal, so we can't send the advisory, - // but we still need to tell the caller that the ack was processed. - return ackedInPlace - } - subj := JSAdvisoryConsumerMsgTerminatedPre + "." + o.stream + "." + o.name - o.sendAdvisory(subj, j) + o.sendAdvisory(subj, e) return ackedInPlace } @@ -3052,12 +3030,7 @@ func (o *consumer) sampleAck(sseq, dseq, dc uint64) { Domain: o.srv.getOpts().JetStreamDomain, } - j, err := json.Marshal(e) - if err != nil { - return - } - - o.sendAdvisory(o.ackEventT, j) + o.sendAdvisory(o.ackEventT, e) } // Process an ACK. @@ -3946,12 +3919,7 @@ func (o *consumer) notifyDeliveryExceeded(sseq, dc uint64) { Domain: o.srv.getOpts().JetStreamDomain, } - j, err := json.Marshal(e) - if err != nil { - return - } - - o.sendAdvisory(o.deliveryExcEventT, j) + o.sendAdvisory(o.deliveryExcEventT, e) } // Check if the candidate subject matches a filter if its present. diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index 2e3e63a80f1..e60ad6f6780 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -5229,3 +5229,34 @@ func TestJetStreamClusterMetaStepdownPreferred(t *testing.T) { require_Equal(t, ErrorIdentifier(apiresp.Error.ErrCode), JSClusterNoPeersErrF) }) } + +func TestJetStreamClusterOnlyPublishAdvisoriesWhenInterest(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + subj := "$JS.ADVISORY.TEST" + s1 := c.servers[0] + s2 := c.servers[1] + + // On the first server, see if we think the advisory will be published. + require_False(t, s1.publishAdvisory(s1.GlobalAccount(), subj, "test")) + + // On the second server, subscribe to the advisory subject. + nc, _ := jsClientConnect(t, s2) + defer nc.Close() + + _, err := nc.Subscribe(subj, func(_ *nats.Msg) {}) + require_NoError(t, err) + + // Wait for the interest to propagate to the first server. + checkFor(t, time.Second, 25*time.Millisecond, func() error { + if !s1.GlobalAccount().sl.HasInterest(subj) { + return fmt.Errorf("expected interest in %q, not yet found", subj) + } + return nil + }) + + // On the first server, try and publish the advisory again. THis time + // it should succeed. + require_True(t, s1.publishAdvisory(s1.GlobalAccount(), subj, "test")) +} diff --git a/server/jetstream_events.go b/server/jetstream_events.go index 8242430b5c0..01d4324a2fd 100644 --- a/server/jetstream_events.go +++ b/server/jetstream_events.go @@ -18,13 +18,22 @@ import ( "time" ) -func (s *Server) publishAdvisory(acc *Account, subject string, adv any) { +// publishAdvisory sends the given advisory into the account. Returns true if +// it was sent, false if not (i.e. due to lack of interest or a marshal error). +func (s *Server) publishAdvisory(acc *Account, subject string, adv any) bool { if acc == nil { acc = s.SystemAccount() if acc == nil { - return + return false } } + + // If there is no one listening for this advisory then save ourselves the effort + // and don't bother encoding the JSON or sending it. + if sl := acc.sl; (sl != nil && !sl.HasInterest(subject)) && !s.hasGatewayInterest(acc.Name, subject) { + return false + } + ej, err := json.Marshal(adv) if err == nil { err = s.sendInternalAccountMsg(acc, subject, ej) @@ -34,6 +43,7 @@ func (s *Server) publishAdvisory(acc *Account, subject string, adv any) { } else { s.Warnf("Advisory could not be serialized for account %q: %v", acc.Name, err) } + return err == nil } // JSAPIAudit is an advisory about administrative actions taken on JetStream