diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index 77a59c7150d..cefc1182297 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -2976,20 +2976,27 @@ func TestJetStreamClusterAPILimitAdvisory(t *testing.T) { // pushing to and draining the queue, so make sure we've sent enough of them // to reliably trigger a drain and advisory. inbox := nc.NewRespInbox() - for i := 0; i < runtime.GOMAXPROCS(-1)*2; i++ { + total := 100 + for range total { require_NoError(t, nc.PublishMsg(&nats.Msg{ Subject: fmt.Sprintf(JSApiConsumerListT, "TEST"), Reply: inbox, })) } - // Wait for the advisory to come in. - msg, err := sub.NextMsg(time.Second * 5) - require_NoError(t, err) - var advisory JSAPILimitReachedAdvisory - require_NoError(t, json.Unmarshal(msg.Data, &advisory)) - require_Equal(t, advisory.Domain, _EMPTY_) // No JetStream domain was set. - require_True(t, advisory.Dropped >= 1) // We dropped at least something. + for range total { + // Wait for the advisory to come in. + msg, err := sub.NextMsg(time.Second * 5) + require_NoError(t, err) + var advisory JSAPILimitReachedAdvisory + require_NoError(t, json.Unmarshal(msg.Data, &advisory)) + require_Equal(t, advisory.Domain, _EMPTY_) // No JetStream domain was set. + if advisory.Dropped >= 1 { + // We are done! + return + } + } + t.Fatal("Did not get any advisory with dropped > 0") } func TestJetStreamClusterPendingRequestsInJsz(t *testing.T) { diff --git a/server/opts.go b/server/opts.go index 3cf8f23aa5e..e0c1d6a4ecc 100644 --- a/server/opts.go +++ b/server/opts.go @@ -5369,7 +5369,10 @@ func MergeOptions(fileOpts, flagOpts *Options) *Options { mergeRoutes(&opts, flagOpts) } if flagOpts.JetStream { - fileOpts.JetStream = flagOpts.JetStream + opts.JetStream = flagOpts.JetStream + } + if flagOpts.StoreDir != _EMPTY_ { + opts.StoreDir = flagOpts.StoreDir } return &opts } @@ -5834,6 +5837,8 @@ func ConfigureOptions(fs *flag.FlagSet, args []string, printVersion, printHelp, trackExplicitVal(&FlagSnapshot.inCmdLine, "Syslog", FlagSnapshot.Syslog) case "no_advertise": trackExplicitVal(&FlagSnapshot.inCmdLine, "Cluster.NoAdvertise", FlagSnapshot.Cluster.NoAdvertise) + case "js": + trackExplicitVal(&FlagSnapshot.inCmdLine, "JetStream", FlagSnapshot.JetStream) } }) diff --git a/server/opts_test.go b/server/opts_test.go index 5eca8de2e3c..8a4bf62d9dd 100644 --- a/server/opts_test.go +++ b/server/opts_test.go @@ -288,6 +288,8 @@ func TestMergeOverrides(t *testing.T) { LameDuckDuration: 4 * time.Minute, ConnectErrorReports: 86400, ReconnectErrorReports: 5, + JetStream: true, + StoreDir: "/store/dir", authBlockDefined: true, configDigest: "sha256:314adbd9997c1183f028f5b620362daa45893da76bac746136bfb48b2fd14996", } @@ -308,6 +310,8 @@ func TestMergeOverrides(t *testing.T) { NoAdvertise: true, ConnectRetries: 2, }, + JetStream: true, + StoreDir: "/store/dir", } merged := MergeOptions(fopts, opts) diff --git a/server/reload_test.go b/server/reload_test.go index bc20f40641c..a44e6e0f0d7 100644 --- a/server/reload_test.go +++ b/server/reload_test.go @@ -3808,9 +3808,53 @@ func TestConfigReloadBoolFlags(t *testing.T) { true, func() bool { return opts.TraceVerbose }, }, + // --js override + { + "jetstream_not_in_config_no_override", + "", + nil, + false, + func() bool { return opts.JetStream }, + }, + { + "jetstream_not_in_config_override_true", + "", + []string{"--js"}, + true, + func() bool { return opts.JetStream }, + }, + { + "jetstream_false_in_config_no_override", + "jetstream: false", + nil, + false, + func() bool { return opts.JetStream }, + }, + { + "jetstream_false_in_config_override_true", + "jetstream: false", + []string{"--js"}, + true, + func() bool { return opts.JetStream }, + }, + { + "jetstream_true_in_config_no_override", + "jetstream: true", + nil, + true, + func() bool { return opts.JetStream }, + }, + { + "jetstream_true_in_config_override_false", + "jetstream: true", + []string{"--js=false"}, + false, + func() bool { return opts.JetStream }, + }, } { t.Run(test.name, func(t *testing.T) { - conf := createConfFile(t, []byte(fmt.Sprintf(template, logfile, test.content))) + content := fmt.Sprintf(template, logfile, test.content) + conf := createConfFile(t, []byte(content)) fs := flag.NewFlagSet("test", flag.ContinueOnError) var args []string @@ -3829,9 +3873,15 @@ func TestConfigReloadBoolFlags(t *testing.T) { if test.val() != test.expected { t.Fatalf("Expected to be set to %v, got %v", test.expected, test.val()) } - if err := s.Reload(); err != nil { - t.Fatalf("Error on reload: %v", err) - } + // Do a config reload with a modified config file so that s.Reload() + // actually does something (otherwise it would not because config + // digest would not have changed). We could alternatively change + // s.opts.configDigest to the empty string. + reloadUpdateConfig(t, s, conf, content+` + max_connections: 1000 + `) + // Have `opts` now point to the new options after the Reload() + opts = s.getOpts() if test.val() != test.expected { t.Fatalf("Expected to be set to %v, got %v", test.expected, test.val()) }