From 1ba4e9ee610df6b0e32fc8edc1198fd5434328cd Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Wed, 7 Feb 2024 08:27:19 -0800 Subject: [PATCH 1/3] fix: cleanup the RMS consumer name --- server/mqtt.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/server/mqtt.go b/server/mqtt.go index f64c160ee58..ff55ed82a3a 100644 --- a/server/mqtt.go +++ b/server/mqtt.go @@ -1458,9 +1458,8 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc // // So we use a durable consumer, and create a new one each time we start. // The old one should expire and get deleted due to inactivity. The name for - // the durable is $MQTT_rmsgs_{uuid}_{server-name}, the server name is just - // for readability. - rmDurName := mqttRetainedMsgsStreamName + "_" + nuid.Next() + "_" + s.String() + // the durable is $MQTT_rmsgs_{uuid}. + rmDurName := mqttRetainedMsgsStreamName + "_" + nuid.Next() ccfg := &CreateConsumerRequest{ Stream: mqttRetainedMsgsStreamName, From df61ccee9a39e8ff2e40d22ca80d33e4227ec674 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Wed, 7 Feb 2024 08:33:42 -0800 Subject: [PATCH 2/3] change: do not use a durable consumer for RMS --- server/mqtt.go | 37 +++++++++++++++++++------------------ 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/server/mqtt.go b/server/mqtt.go index ff55ed82a3a..b7e222587d2 100644 --- a/server/mqtt.go +++ b/server/mqtt.go @@ -1451,20 +1451,12 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc rmLegacyDurName := mqttRetainedMsgsStreamName + "_" + jsa.id jsa.deleteConsumer(mqttRetainedMsgsStreamName, rmLegacyDurName) - // Using ephemeral consumer is too risky because if this server were to be - // disconnected from the rest for few seconds, then the leader would remove - // the consumer, so even after a reconnect, we would no longer receive - // retained messages. - // - // So we use a durable consumer, and create a new one each time we start. - // The old one should expire and get deleted due to inactivity. The name for - // the durable is $MQTT_rmsgs_{uuid}. - rmDurName := mqttRetainedMsgsStreamName + "_" + nuid.Next() - + // Create a new, uniquely names consumer for retained messages for this + // server. The prior one will expire eventually. ccfg := &CreateConsumerRequest{ Stream: mqttRetainedMsgsStreamName, Config: ConsumerConfig{ - Durable: rmDurName, + Name: mqttRetainedMsgsStreamName + "_" + nuid.Next(), FilterSubject: mqttRetainedMsgsStreamSubject + ">", DeliverSubject: rmsubj, ReplayPolicy: ReplayInstant, @@ -1657,12 +1649,21 @@ func (jsa *mqttJSA) createConsumer(cfg *CreateConsumerRequest) (*JSApiConsumerCr if err != nil { return nil, err } - var subj string - if cfg.Config.Durable != _EMPTY_ { - subj = fmt.Sprintf(JSApiDurableCreateT, cfg.Stream, cfg.Config.Durable) - } else { - subj = fmt.Sprintf(JSApiConsumerCreateT, cfg.Stream) + subj := fmt.Sprintf(JSApiConsumerCreateT, cfg.Stream) + ccri, err := jsa.newRequest(mqttJSAConsumerCreate, subj, 0, cfgb) + if err != nil { + return nil, err + } + ccr := ccri.(*JSApiConsumerCreateResponse) + return ccr, ccr.ToError() +} + +func (jsa *mqttJSA) createDurableConsumer(cfg *CreateConsumerRequest) (*JSApiConsumerCreateResponse, error) { + cfgb, err := json.Marshal(cfg) + if err != nil { + return nil, err } + subj := fmt.Sprintf(JSApiDurableCreateT, cfg.Stream, cfg.Config.Durable) ccri, err := jsa.newRequest(mqttJSAConsumerCreate, subj, 0, cfgb) if err != nil { return nil, err @@ -4940,7 +4941,7 @@ func (sess *mqttSession) ensurePubRelConsumerSubscription(c *client) error { if opts.MQTT.ConsumerInactiveThreshold > 0 { ccr.Config.InactiveThreshold = opts.MQTT.ConsumerInactiveThreshold } - if _, err := sess.jsa.createConsumer(ccr); err != nil { + if _, err := sess.jsa.createDurableConsumer(ccr); err != nil { c.Errorf("Unable to add JetStream consumer for PUBREL for client %q: err=%v", id, err) return err } @@ -5046,7 +5047,7 @@ func (sess *mqttSession) processJSConsumer(c *client, subject, sid string, if opts.MQTT.ConsumerInactiveThreshold > 0 { ccr.Config.InactiveThreshold = opts.MQTT.ConsumerInactiveThreshold } - if _, err := sess.jsa.createConsumer(ccr); err != nil { + if _, err := sess.jsa.createDurableConsumer(ccr); err != nil { c.Errorf("Unable to add JetStream consumer for subscription on %q: err=%v", subject, err) return nil, nil, err } From ac44715036a07916a6818a3d2c5930f80f7b53bd Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Wed, 7 Feb 2024 08:57:56 -0800 Subject: [PATCH 3/3] nit: Intention-revealing function names --- server/mqtt.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/mqtt.go b/server/mqtt.go index b7e222587d2..a1abcb3ef7f 100644 --- a/server/mqtt.go +++ b/server/mqtt.go @@ -1464,7 +1464,7 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc InactiveThreshold: 5 * time.Minute, }, } - if _, err := jsa.createConsumer(ccfg); err != nil { + if _, err := jsa.createEphemeralConsumer(ccfg); err != nil { return nil, fmt.Errorf("create retained messages consumer for account %q: %v", accName, err) } @@ -1644,7 +1644,7 @@ func (jsa *mqttJSA) sendAck(ackSubject string) { jsa.sendq.push(&mqttJSPubMsg{subj: ackSubject, hdr: -1}) } -func (jsa *mqttJSA) createConsumer(cfg *CreateConsumerRequest) (*JSApiConsumerCreateResponse, error) { +func (jsa *mqttJSA) createEphemeralConsumer(cfg *CreateConsumerRequest) (*JSApiConsumerCreateResponse, error) { cfgb, err := json.Marshal(cfg) if err != nil { return nil, err