Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions cli/consumer_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ type consumerCmd struct {
fPinned bool
placementPreferred string
apiLevel int
offline bool
}

func configureConsumerCommand(app commandHost) {
Expand Down Expand Up @@ -231,6 +232,7 @@ func configureConsumerCommand(app commandHost) {
consFind.Flag("invert", "Invert the check - before becomes after, with becomes without").BoolVar(&c.fInvert)
consFind.Flag("expression", "Match consumers using an expression language").StringVar(&c.fExpression)
consFind.Flag("api-level", "Match consumers that support at least the given api level").IntVar(&c.apiLevel)
consFind.Flag("offline", "Match consumers that are offline").BoolVar(&c.offline)

consInfo := cons.Command("info", "Consumer information").Alias("nfo").Action(c.infoAction)
consInfo.Arg("stream", "Stream name").StringVar(&c.stream)
Expand Down Expand Up @@ -446,6 +448,20 @@ func (c *consumerCmd) findAction(_ *fisk.ParseContext) error {
return err
}

if c.offline {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nah I think this should become a option into the query package like the others above?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't think the option is defined in JSM. I'll look at adding it.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, add it there pls

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

filtered := found[:0]
for _, s := range found {
offline, _, err := s.IsOffline()
if err != nil {
return err
}
if offline {
filtered = append(filtered, s)
}
}
found = filtered
}

for _, c := range found {
fmt.Println(c.Name())
}
Expand Down
20 changes: 18 additions & 2 deletions cli/stream_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ type streamCmd struct {
subjectDeleteMarkerTTLSet bool
subjectDeleteMarkerTTL time.Duration
apiLevel int
offline bool
}

type streamStat struct {
Expand Down Expand Up @@ -323,6 +324,7 @@ Finding streams with certain subjects configured:
strFind.Flag("invert", "Invert the check - before becomes after, with becomes without").BoolVar(&c.fInvert)
strFind.Flag("expression", "Match streams using an expression language").StringVar(&c.fExpression)
strFind.Flag("api-level", "Match streams that support at least the given api level").IntVar(&c.apiLevel)
strFind.Flag("offline", "Match streams that are offline").BoolVar(&c.offline)

strInfo := str.Command("info", "Stream information").Alias("nfo").Alias("i").Action(c.infoAction)
strInfo.Arg("stream", "Stream to retrieve information for").StringVar(&c.stream)
Expand Down Expand Up @@ -865,6 +867,20 @@ func (c *streamCmd) findAction(_ *fisk.ParseContext) (err error) {
return err
}

if c.offline {
filtered := found[:0]
for _, con := range found {
offline, _, err := con.IsOffline()
if err != nil {
return err
}
if offline {
filtered = append(filtered, con)
}
}
found = filtered
}

out := ""
switch {
case c.listNames:
Expand Down Expand Up @@ -3362,10 +3378,10 @@ func (c *streamCmd) renderStreamsAsTable(streams []*jsm.Stream, missing []string
table = iu.NewTableWriter(opts(), fmt.Sprintf("Streams matching %s", c.filterSubject))
}

table.AddHeaders("Name", "Description", "Created", "Messages", "Size", "Last Message")
table.AddHeaders("Name", "Description", "Created", "Messages", "Size", "Last Message", "Offline", "Offline Reason")
for _, s := range streams {
nfo, _ := s.LatestInformation()
table.AddRow(s.Name(), s.Description(), f(nfo.Created.Local()), f(nfo.State.Msgs), humanize.IBytes(nfo.State.Bytes), f(sinceRefOrNow(nfo.TimeStamp, nfo.State.LastTime)))
table.AddRow(s.Name(), s.Description(), f(nfo.Created.Local()), f(nfo.State.Msgs), humanize.IBytes(nfo.State.Bytes), f(sinceRefOrNow(nfo.TimeStamp, nfo.State.LastTime)), nfo.Offline, nfo.OfflineReason)
}

fmt.Fprintln(&out, table.Render())
Expand Down
21 changes: 21 additions & 0 deletions nats/tests/consumer_command_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ package main
import (
"encoding/json"
"fmt"
"math"
"math/rand"
"strconv"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -187,6 +189,25 @@ func TestConsumerFind(t *testing.T) {
return nil
})
})
t.Run("--offline", func(t *testing.T) {
withJSServer(t, func(t *testing.T, srv *server.Server, nc *nats.Conn, mgr *jsm.Manager) error {
_, err := mgr.NewStream("T_OFFLINE")
if err != nil {
t.Fatalf("unable to create stream: %s", err)
}

_, err = mgr.NewConsumer("T_OFFLINE", jsm.DurableName("C1"), jsm.ConsumerMetadata(map[string]string{"_nats.req.level": strconv.Itoa(math.MaxInt - 1)}))
if err != nil {
t.Fatalf("unable to create consumer: %s", err)
}

output := string(runNatsCli(t, fmt.Sprintf("--server='%s' consumer find T_OFFLINE --offline", srv.ClientURL())))
if !expectMatchLine(t, output, "C1") {
t.Errorf("unexpected output. expected 1 streams: %s", output)
}
return nil
})
})
}

func TestConsumerInfo(t *testing.T) {
Expand Down
16 changes: 16 additions & 0 deletions nats/tests/stream_command_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ package main

import (
"fmt"
"math"
"math/rand"
"strconv"
"testing"
"time"

Expand Down Expand Up @@ -104,6 +106,20 @@ func TestStreamFind(t *testing.T) {
})
})

t.Run("--offline", func(t *testing.T) {
withJSServer(t, func(t *testing.T, srv *server.Server, nc *nats.Conn, mgr *jsm.Manager) error {
_, err := mgr.NewStream("T2", jsm.AllowMsgTTL(), jsm.StreamMetadata(map[string]string{"_nats.req.level": strconv.Itoa(math.MaxInt - 1)}))
if err != nil {
t.Fatalf("unable to create stream: %s", err)
}
output := string(runNatsCli(t, fmt.Sprintf("--server='%s' stream find --offline", srv.ClientURL())))
if !expectMatchLine(t, output, "T_OFFLINE") {
t.Errorf("unexpected output. expected 1 streams: %s", output)
}
return nil
})
})

}

func TestStreamInfo(t *testing.T) {
Expand Down
Loading