From 402890e9efb4abe564e3ace109a4cc280e269f7c Mon Sep 17 00:00:00 2001 From: Pieter Loubser Date: Tue, 12 Aug 2025 12:45:48 +0100 Subject: [PATCH] (#691) Add --offline to stream and consumer find commands Signed-off-by: Pieter Loubser --- cli/consumer_command.go | 16 ++++++++++++++++ cli/stream_command.go | 20 ++++++++++++++++++-- nats/tests/consumer_command_test.go | 21 +++++++++++++++++++++ nats/tests/stream_command_test.go | 16 ++++++++++++++++ 4 files changed, 71 insertions(+), 2 deletions(-) diff --git a/cli/consumer_command.go b/cli/consumer_command.go index 1dfa81e4..9892f998 100644 --- a/cli/consumer_command.go +++ b/cli/consumer_command.go @@ -127,6 +127,7 @@ type consumerCmd struct { fPinned bool placementPreferred string apiLevel int + offline bool } func configureConsumerCommand(app commandHost) { @@ -231,6 +232,7 @@ func configureConsumerCommand(app commandHost) { consFind.Flag("invert", "Invert the check - before becomes after, with becomes without").BoolVar(&c.fInvert) consFind.Flag("expression", "Match consumers using an expression language").StringVar(&c.fExpression) consFind.Flag("api-level", "Match consumers that support at least the given api level").IntVar(&c.apiLevel) + consFind.Flag("offline", "Match consumers that are offline").BoolVar(&c.offline) consInfo := cons.Command("info", "Consumer information").Alias("nfo").Action(c.infoAction) consInfo.Arg("stream", "Stream name").StringVar(&c.stream) @@ -446,6 +448,20 @@ func (c *consumerCmd) findAction(_ *fisk.ParseContext) error { return err } + if c.offline { + filtered := found[:0] + for _, s := range found { + offline, _, err := s.IsOffline() + if err != nil { + return err + } + if offline { + filtered = append(filtered, s) + } + } + found = filtered + } + for _, c := range found { fmt.Println(c.Name()) } diff --git a/cli/stream_command.go b/cli/stream_command.go index 51aa929d..b7cc2b74 100644 --- a/cli/stream_command.go +++ b/cli/stream_command.go @@ -168,6 +168,7 @@ type streamCmd struct { subjectDeleteMarkerTTLSet bool subjectDeleteMarkerTTL time.Duration apiLevel int + offline bool } type streamStat struct { @@ -323,6 +324,7 @@ Finding streams with certain subjects configured: strFind.Flag("invert", "Invert the check - before becomes after, with becomes without").BoolVar(&c.fInvert) strFind.Flag("expression", "Match streams using an expression language").StringVar(&c.fExpression) strFind.Flag("api-level", "Match streams that support at least the given api level").IntVar(&c.apiLevel) + strFind.Flag("offline", "Match streams that are offline").BoolVar(&c.offline) strInfo := str.Command("info", "Stream information").Alias("nfo").Alias("i").Action(c.infoAction) strInfo.Arg("stream", "Stream to retrieve information for").StringVar(&c.stream) @@ -865,6 +867,20 @@ func (c *streamCmd) findAction(_ *fisk.ParseContext) (err error) { return err } + if c.offline { + filtered := found[:0] + for _, con := range found { + offline, _, err := con.IsOffline() + if err != nil { + return err + } + if offline { + filtered = append(filtered, con) + } + } + found = filtered + } + out := "" switch { case c.listNames: @@ -3362,10 +3378,10 @@ func (c *streamCmd) renderStreamsAsTable(streams []*jsm.Stream, missing []string table = iu.NewTableWriter(opts(), fmt.Sprintf("Streams matching %s", c.filterSubject)) } - table.AddHeaders("Name", "Description", "Created", "Messages", "Size", "Last Message") + table.AddHeaders("Name", "Description", "Created", "Messages", "Size", "Last Message", "Offline", "Offline Reason") for _, s := range streams { nfo, _ := s.LatestInformation() - table.AddRow(s.Name(), s.Description(), f(nfo.Created.Local()), f(nfo.State.Msgs), humanize.IBytes(nfo.State.Bytes), f(sinceRefOrNow(nfo.TimeStamp, nfo.State.LastTime))) + table.AddRow(s.Name(), s.Description(), f(nfo.Created.Local()), f(nfo.State.Msgs), humanize.IBytes(nfo.State.Bytes), f(sinceRefOrNow(nfo.TimeStamp, nfo.State.LastTime)), nfo.Offline, nfo.OfflineReason) } fmt.Fprintln(&out, table.Render()) diff --git a/nats/tests/consumer_command_test.go b/nats/tests/consumer_command_test.go index c64a5372..e23a1db1 100644 --- a/nats/tests/consumer_command_test.go +++ b/nats/tests/consumer_command_test.go @@ -16,7 +16,9 @@ package main import ( "encoding/json" "fmt" + "math" "math/rand" + "strconv" "strings" "testing" "time" @@ -187,6 +189,25 @@ func TestConsumerFind(t *testing.T) { return nil }) }) + t.Run("--offline", func(t *testing.T) { + withJSServer(t, func(t *testing.T, srv *server.Server, nc *nats.Conn, mgr *jsm.Manager) error { + _, err := mgr.NewStream("T_OFFLINE") + if err != nil { + t.Fatalf("unable to create stream: %s", err) + } + + _, err = mgr.NewConsumer("T_OFFLINE", jsm.DurableName("C1"), jsm.ConsumerMetadata(map[string]string{"_nats.req.level": strconv.Itoa(math.MaxInt - 1)})) + if err != nil { + t.Fatalf("unable to create consumer: %s", err) + } + + output := string(runNatsCli(t, fmt.Sprintf("--server='%s' consumer find T_OFFLINE --offline", srv.ClientURL()))) + if !expectMatchLine(t, output, "C1") { + t.Errorf("unexpected output. expected 1 streams: %s", output) + } + return nil + }) + }) } func TestConsumerInfo(t *testing.T) { diff --git a/nats/tests/stream_command_test.go b/nats/tests/stream_command_test.go index 8b548a4b..66d18f5e 100644 --- a/nats/tests/stream_command_test.go +++ b/nats/tests/stream_command_test.go @@ -15,7 +15,9 @@ package main import ( "fmt" + "math" "math/rand" + "strconv" "testing" "time" @@ -104,6 +106,20 @@ func TestStreamFind(t *testing.T) { }) }) + t.Run("--offline", func(t *testing.T) { + withJSServer(t, func(t *testing.T, srv *server.Server, nc *nats.Conn, mgr *jsm.Manager) error { + _, err := mgr.NewStream("T2", jsm.AllowMsgTTL(), jsm.StreamMetadata(map[string]string{"_nats.req.level": strconv.Itoa(math.MaxInt - 1)})) + if err != nil { + t.Fatalf("unable to create stream: %s", err) + } + output := string(runNatsCli(t, fmt.Sprintf("--server='%s' stream find --offline", srv.ClientURL()))) + if !expectMatchLine(t, output, "T_OFFLINE") { + t.Errorf("unexpected output. expected 1 streams: %s", output) + } + return nil + }) + }) + } func TestStreamInfo(t *testing.T) {