-
-
Notifications
You must be signed in to change notification settings - Fork 1.8k
FIXED: MQTT retained message consumer creation #5048
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1451,29 +1451,20 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc | |
| rmLegacyDurName := mqttRetainedMsgsStreamName + "_" + jsa.id | ||
| jsa.deleteConsumer(mqttRetainedMsgsStreamName, rmLegacyDurName) | ||
|
|
||
| // Using ephemeral consumer is too risky because if this server were to be | ||
| // disconnected from the rest for few seconds, then the leader would remove | ||
| // the consumer, so even after a reconnect, we would no longer receive | ||
| // retained messages. | ||
| // | ||
| // So we use a durable consumer, and create a new one each time we start. | ||
| // The old one should expire and get deleted due to inactivity. The name for | ||
| // the durable is $MQTT_rmsgs_{uuid}_{server-name}, the server name is just | ||
| // for readability. | ||
| rmDurName := mqttRetainedMsgsStreamName + "_" + nuid.Next() + "_" + s.String() | ||
|
|
||
| // Create a new, uniquely names consumer for retained messages for this | ||
| // server. The prior one will expire eventually. | ||
| ccfg := &CreateConsumerRequest{ | ||
| Stream: mqttRetainedMsgsStreamName, | ||
| Config: ConsumerConfig{ | ||
| Durable: rmDurName, | ||
| Name: mqttRetainedMsgsStreamName + "_" + nuid.Next(), | ||
| FilterSubject: mqttRetainedMsgsStreamSubject + ">", | ||
| DeliverSubject: rmsubj, | ||
| ReplayPolicy: ReplayInstant, | ||
| AckPolicy: AckNone, | ||
| InactiveThreshold: 5 * time.Minute, | ||
| }, | ||
| } | ||
| if _, err := jsa.createConsumer(ccfg); err != nil { | ||
| if _, err := jsa.createEphemeralConsumer(ccfg); err != nil { | ||
| return nil, fmt.Errorf("create retained messages consumer for account %q: %v", accName, err) | ||
| } | ||
|
|
||
|
|
@@ -1653,17 +1644,26 @@ func (jsa *mqttJSA) sendAck(ackSubject string) { | |
| jsa.sendq.push(&mqttJSPubMsg{subj: ackSubject, hdr: -1}) | ||
| } | ||
|
|
||
| func (jsa *mqttJSA) createConsumer(cfg *CreateConsumerRequest) (*JSApiConsumerCreateResponse, error) { | ||
| func (jsa *mqttJSA) createEphemeralConsumer(cfg *CreateConsumerRequest) (*JSApiConsumerCreateResponse, error) { | ||
| cfgb, err := json.Marshal(cfg) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| var subj string | ||
| if cfg.Config.Durable != _EMPTY_ { | ||
| subj = fmt.Sprintf(JSApiDurableCreateT, cfg.Stream, cfg.Config.Durable) | ||
| } else { | ||
| subj = fmt.Sprintf(JSApiConsumerCreateT, cfg.Stream) | ||
| subj := fmt.Sprintf(JSApiConsumerCreateT, cfg.Stream) | ||
| ccri, err := jsa.newRequest(mqttJSAConsumerCreate, subj, 0, cfgb) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| ccr := ccri.(*JSApiConsumerCreateResponse) | ||
| return ccr, ccr.ToError() | ||
| } | ||
|
|
||
| func (jsa *mqttJSA) createDurableConsumer(cfg *CreateConsumerRequest) (*JSApiConsumerCreateResponse, error) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How is this materially different from createConsumer above? |
||
| cfgb, err := json.Marshal(cfg) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| subj := fmt.Sprintf(JSApiDurableCreateT, cfg.Stream, cfg.Config.Durable) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why is this durable and one above is not?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I broke out the function into 2 to make it more explicit. Do you prefer to keep it exactly as before, 1 function triggering which API to use based on |
||
| ccri, err := jsa.newRequest(mqttJSAConsumerCreate, subj, 0, cfgb) | ||
| if err != nil { | ||
| return nil, err | ||
|
|
@@ -4941,7 +4941,7 @@ func (sess *mqttSession) ensurePubRelConsumerSubscription(c *client) error { | |
| if opts.MQTT.ConsumerInactiveThreshold > 0 { | ||
| ccr.Config.InactiveThreshold = opts.MQTT.ConsumerInactiveThreshold | ||
| } | ||
| if _, err := sess.jsa.createConsumer(ccr); err != nil { | ||
| if _, err := sess.jsa.createDurableConsumer(ccr); err != nil { | ||
| c.Errorf("Unable to add JetStream consumer for PUBREL for client %q: err=%v", id, err) | ||
| return err | ||
| } | ||
|
|
@@ -5047,7 +5047,7 @@ func (sess *mqttSession) processJSConsumer(c *client, subject, sid string, | |
| if opts.MQTT.ConsumerInactiveThreshold > 0 { | ||
| ccr.Config.InactiveThreshold = opts.MQTT.ConsumerInactiveThreshold | ||
| } | ||
| if _, err := sess.jsa.createConsumer(ccr); err != nil { | ||
| if _, err := sess.jsa.createDurableConsumer(ccr); err != nil { | ||
| c.Errorf("Unable to add JetStream consumer for subscription on %q: err=%v", subject, err) | ||
| return nil, nil, err | ||
| } | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.