Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"fmt"
"math"
"math/rand"
"os"
"path/filepath"
"reflect"
"regexp"
"slices"
Expand Down Expand Up @@ -71,6 +73,13 @@ type ConsumerInfo struct {
PriorityGroups []PriorityGroupState `json:"priority_groups,omitempty"`
}

// consumerInfoClusterResponse is a response used in a cluster to communicate the consumer info
// back to the meta leader as part of a consumer list request.
type consumerInfoClusterResponse struct {
ConsumerInfo
OfflineReason string `json:"offline_reason,omitempty"` // Reporting when a consumer is offline.
}

type PriorityGroupState struct {
Group string `json:"group"`
PinnedClientID string `json:"pinned_client_id,omitempty"`
Expand Down Expand Up @@ -510,6 +519,10 @@ type consumer struct {
/// pinnedTtl is the remaining time before the current PinId expires.
pinnedTtl *time.Timer
pinnedTS time.Time

// If standalone/single-server, the offline reason needs to be stored directly in the consumer.
// Otherwise, if clustered it will be part of the consumer assignment.
offlineReason string
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any way to merge these so code is the same between single server and clustered?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Think this is not easily or at all possible due to the differences between single server and clustered.

We use streamAssignment and consumerAssignment to host this data and set up an info sub when clustered. We don't use those structs at all as single server (and don't use that info sub), so need to keep them somewhere else.

}

// A single subject filter.
Expand Down Expand Up @@ -5059,7 +5072,7 @@ func (o *consumer) setMaxPendingBytes(limit int) {
// This does some quick sanity checks to see if we should re-calculate num pending.
// Lock should be held.
func (o *consumer) checkNumPending() uint64 {
if o.mset != nil {
if o.mset != nil && o.mset.store != nil {
var state StreamState
o.mset.store.FastState(&state)
npc := o.numPending()
Expand Down Expand Up @@ -6103,6 +6116,14 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error {
} else {
err = store.Stop()
}
} else if dflag {
// If there's no store (for example, when it's offline), manually delete the directories.
o.mu.RLock()
stream, consumer := o.stream, o.name
o.mu.RUnlock()
accDir := filepath.Join(js.config.StoreDir, a.GetName())
consumersDir := filepath.Join(accDir, streamsDir, stream, consumerDir)
os.RemoveAll(filepath.Join(consumersDir, consumer))
Comment thread
derekcollison marked this conversation as resolved.
}

return err
Expand Down
20 changes: 20 additions & 0 deletions server/errors.json
Original file line number Diff line number Diff line change
Expand Up @@ -1918,5 +1918,25 @@
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSStreamOfflineReasonErrF",
"code": 500,
"error_code": 10194,
"description": "stream is offline: {err}",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSConsumerOfflineReasonErrF",
"code": 500,
"error_code": 10195,
"description": "consumer is offline: {err}",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
}
]
110 changes: 105 additions & 5 deletions server/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package server

import (
"bytes"
"crypto/hmac"
"crypto/sha256"
"encoding/binary"
Expand Down Expand Up @@ -1333,8 +1334,54 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro
}

var cfg FileStreamInfo
if err := json.Unmarshal(buf, &cfg); err != nil {
s.Warnf(" Error unmarshalling stream metafile %q: %v", metafile, err)
decoder := json.NewDecoder(bytes.NewReader(buf))
decoder.DisallowUnknownFields()
strictErr := decoder.Decode(&cfg)
if strictErr != nil {
cfg = FileStreamInfo{}
if err := json.Unmarshal(buf, &cfg); err != nil {
s.Warnf(" Error unmarshalling stream metafile %q: %v", metafile, err)
continue
}
}
if supported := supportsRequiredApiLevel(cfg.Metadata); !supported || strictErr != nil {
var offlineReason string
if !supported {
apiLevel := getRequiredApiLevel(cfg.Metadata)
offlineReason = fmt.Sprintf("unsupported - required API level: %s, current API level: %d", apiLevel, JSApiLevel)
s.Warnf(" Detected unsupported stream '%s > %s', delete the stream or upgrade the server to API level %s", a.Name, cfg.StreamConfig.Name, apiLevel)
} else {
offlineReason = fmt.Sprintf("decoding error: %v", strictErr)
s.Warnf(" Error unmarshalling stream metafile %q: %v", metafile, strictErr)
}
singleServerMode := !s.JetStreamIsClustered() && s.standAloneMode()
if singleServerMode {
// Fake a stream, so we can respond to API requests as single-server.
mset := &stream{
acc: a,
jsa: jsa,
cfg: cfg.StreamConfig,
js: js,
srv: s,
stype: cfg.Storage,
consumers: make(map[string]*consumer),
active: false,
created: time.Now().UTC(),
offlineReason: offlineReason,
}
if !cfg.Created.IsZero() {
mset.created = cfg.Created
}
mset.closed.Store(true)

jsa.mu.Lock()
jsa.streams[cfg.Name] = mset
jsa.mu.Unlock()

// Now do the consumers.
odir := filepath.Join(sdir, fi.Name(), consumerDir)
consumers = append(consumers, &ce{mset, odir})
}
continue
}

Expand Down Expand Up @@ -1510,13 +1557,66 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro
}

var cfg FileConsumerInfo
if err := json.Unmarshal(buf, &cfg); err != nil {
s.Warnf(" Error unmarshalling consumer metafile %q: %v", metafile, err)
decoder := json.NewDecoder(bytes.NewReader(buf))
decoder.DisallowUnknownFields()
strictErr := decoder.Decode(&cfg)
if strictErr != nil {
cfg = FileConsumerInfo{}
if err := json.Unmarshal(buf, &cfg); err != nil {
s.Warnf(" Error unmarshalling consumer metafile %q: %v", metafile, err)
continue
}
}
if supported := supportsRequiredApiLevel(cfg.Metadata); !supported || strictErr != nil {
var offlineReason string
if !supported {
apiLevel := getRequiredApiLevel(cfg.Metadata)
offlineReason = fmt.Sprintf("unsupported - required API level: %s, current API level: %d", apiLevel, JSApiLevel)
s.Warnf(" Detected unsupported consumer '%s > %s > %s', delete the consumer or upgrade the server to API level %s", a.Name, e.mset.name(), cfg.Name, apiLevel)
} else {
offlineReason = fmt.Sprintf("decoding error: %v", strictErr)
s.Warnf(" Error unmarshalling consumer metafile %q: %v", metafile, strictErr)
}
singleServerMode := !s.JetStreamIsClustered() && s.standAloneMode()
if singleServerMode {
if !e.mset.closed.Load() {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this condition really happen?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we specifically want to stop the stream if it wasn't already.

The case is where the stream's config is fully supported. But we then encounter a consumer here that is NOT supported. That means we need to stop the stream and put it into the offline "stopped" mode.

So, this condition makes sure we stop the stream if it wasn't already, and put it into unsupported/offline/stopped mode. Stopping the stream as part of e.mset.stop below makes sure the stream will be mset.closed.Load() as well.

s.Warnf(" Stopping unsupported stream '%s > %s'", a.Name, e.mset.name())
e.mset.mu.Lock()
e.mset.offlineReason = "stopped"
e.mset.mu.Unlock()
e.mset.stop(false, false)
}

// Fake a consumer, so we can respond to API requests as single-server.
o := &consumer{
mset: e.mset,
js: s.getJetStream(),
acc: a,
srv: s,
cfg: cfg.ConsumerConfig,
active: false,
stream: e.mset.name(),
name: cfg.Name,
dseq: 1,
sseq: 1,
created: time.Now().UTC(),
closed: true,
offlineReason: offlineReason,
}
if !cfg.Created.IsZero() {
o.created = cfg.Created
}

e.mset.mu.Lock()
e.mset.setConsumer(o)
e.mset.mu.Unlock()
}
continue
}

isEphemeral := !isDurableConsumer(&cfg.ConsumerConfig)
if isEphemeral {
// This is an ephermal consumer and this could fail on restart until
// This is an ephemeral consumer and this could fail on restart until
// the consumer can reconnect. We will create it as a durable and switch it.
cfg.ConsumerConfig.Durable = ofi.Name()
}
Expand Down
Loading