From 2ca25cb8e241601a8009036637b7a9fbbe843bc8 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Mon, 25 Nov 2024 10:19:54 +0100 Subject: [PATCH 01/10] NRG: empty snapshots dir if memory WAL Signed-off-by: Maurice van Veen --- server/raft.go | 12 +++---- server/raft_helpers_test.go | 14 +++++--- server/raft_test.go | 68 +++++++++++++++++++++++++++++++++++++ 3 files changed, 84 insertions(+), 10 deletions(-) diff --git a/server/raft.go b/server/raft.go index aaf8a8d14ff..563af0d11dc 100644 --- a/server/raft.go +++ b/server/raft.go @@ -419,20 +419,20 @@ func (s *Server) initRaftNode(accName string, cfg *RaftConfig, labels pprofLabel n.vote = vote } - // Make sure that the snapshots directory exists. - if err := os.MkdirAll(filepath.Join(n.sd, snapshotsDir), defaultDirPerms); err != nil { - return nil, fmt.Errorf("could not create snapshots directory - %v", err) - } - // Can't recover snapshots if memory based since wal will be reset. // We will inherit from the current leader. if _, ok := n.wal.(*memStore); ok { - os.Remove(filepath.Join(n.sd, snapshotsDir, "*")) + _ = os.RemoveAll(filepath.Join(n.sd, snapshotsDir)) } else { // See if we have any snapshots and if so load and process on startup. n.setupLastSnapshot() } + // Make sure that the snapshots directory exists. + if err := os.MkdirAll(filepath.Join(n.sd, snapshotsDir), defaultDirPerms); err != nil { + return nil, fmt.Errorf("could not create snapshots directory - %v", err) + } + truncateAndErr := func(index uint64) { if err := n.wal.Truncate(index); err != nil { n.setWriteErr(err) diff --git a/server/raft_helpers_test.go b/server/raft_helpers_test.go index 837b434a7c8..218258a88db 100644 --- a/server/raft_helpers_test.go +++ b/server/raft_helpers_test.go @@ -319,6 +319,15 @@ func newStateAdder(s *Server, cfg *RaftConfig, n RaftNode) stateMachine { } func initSingleMemRaftNode(t *testing.T) (*raft, func()) { + t.Helper() + n, c := initSingleMemRaftNodeWithCluster(t) + cleanup := func() { + c.shutdown() + } + return n, cleanup +} + +func initSingleMemRaftNodeWithCluster(t *testing.T) (*raft, *cluster) { t.Helper() c := createJetStreamClusterExplicit(t, "R3S", 3) s := c.servers[0] // RunBasicJetStreamServer not available @@ -332,10 +341,7 @@ func initSingleMemRaftNode(t *testing.T) (*raft, func()) { n, err := s.initRaftNode(globalAccountName, cfg, pprofLabels{}) require_NoError(t, err) - cleanup := func() { - c.shutdown() - } - return n, cleanup + return n, c } // Encode an AppendEntry. diff --git a/server/raft_test.go b/server/raft_test.go index 7573ae482b3..29d50e57db9 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -15,6 +15,7 @@ package server import ( "bytes" + "encoding/binary" "errors" "fmt" "math" @@ -1631,3 +1632,70 @@ func TestNRGTruncateDownToCommitted(t *testing.T) { n.processAppendEntry(aeHeartbeat, n.aesub) require_Equal(t, n.commit, 2) } + +func TestNRGForwardProposalResponse(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, _ := jsClientConnect(t, c.leader(), nats.UserInfo("admin", "s3cr3t!")) + defer nc.Close() + + rg := c.createRaftGroup("TEST", 3, newStateAdder) + rg.waitOnLeader() + + n := rg.nonLeader().node().(*raft) + psubj := n.psubj + + data := make([]byte, binary.MaxVarintLen64) + dn := binary.PutVarint(data, int64(123)) + + _, err := nc.Request(psubj, data[:dn], time.Second*5) + require_NoError(t, err) + + rg.waitOnTotal(t, 123) +} + +func TestNRGMemoryWALEmptiesSnapshotsDir(t *testing.T) { + n, c := initSingleMemRaftNodeWithCluster(t) + defer c.shutdown() + + // Create a sample entry, the content doesn't matter, just that it's stored. + esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) + entries := []*Entry{newEntry(EntryNormal, esm)} + + nats0 := "S1Nunr6R" // "nats-0" + + // Timeline + aeMsg := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries}) + aeHeartbeat := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: nil}) + + // Simply receive first message. + n.processAppendEntry(aeMsg, n.aesub) + require_Equal(t, n.pindex, 1) + require_Equal(t, n.commit, 0) + + // Heartbeat moves commit up. + n.processAppendEntry(aeHeartbeat, n.aesub) + require_Equal(t, n.commit, 1) + + // Manually call back down to applied, and then snapshot. + n.Applied(1) + err := n.InstallSnapshot(nil) + require_NoError(t, err) + + // Stop current node and restart it. + n.Stop() + n.WaitForStop() + + s := c.servers[0] + ms, err := newMemStore(&StreamConfig{Name: "TEST", Storage: MemoryStorage}) + require_NoError(t, err) + cfg := &RaftConfig{Name: "TEST", Store: n.sd, Log: ms} + n, err = s.initRaftNode(globalAccountName, cfg, pprofLabels{}) + require_NoError(t, err) + + // Since the WAL is in-memory, the snapshots dir should've been emptied upon restart. + files, err := os.ReadDir(filepath.Join(n.sd, snapshotsDir)) + require_NoError(t, err) + require_Len(t, len(files), 0) +} From 5fbea2b7c8422d17d7b4acfb88e869cd56fac78d Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sat, 23 Nov 2024 16:55:41 -0600 Subject: [PATCH 02/10] Improve very large meta snapshots by moving to goccy json pkg and utilize s2.Encode vs EncodeBetter wich is faster and compresses better on our meta snapshots. Signed-off-by: Derek Collison --- go.mod | 1 + go.sum | 2 + server/accounts_test.go | 4 +- server/auth_callout_test.go | 4 +- server/auth_test.go | 2 +- server/certidp/certidp.go | 2 +- server/client.go | 2 +- server/client_test.go | 7 ++-- server/consumer.go | 2 +- server/events.go | 3 +- server/events_test.go | 2 +- server/filestore.go | 2 +- server/filestore_test.go | 4 +- server/gateway.go | 3 +- server/gateway_test.go | 5 ++- server/jetstream.go | 2 +- server/jetstream_api.go | 4 +- server/jetstream_cluster.go | 4 +- server/jetstream_cluster_1_test.go | 2 +- server/jetstream_cluster_2_test.go | 2 +- server/jetstream_cluster_3_test.go | 2 +- server/jetstream_cluster_4_test.go | 2 +- server/jetstream_consumer_test.go | 4 +- server/jetstream_events.go | 5 ++- server/jetstream_helpers_test.go | 2 +- server/jetstream_jwt_test.go | 7 ++-- server/jetstream_super_cluster_test.go | 2 +- server/jetstream_test.go | 2 +- server/jwt_test.go | 4 +- server/leafnode.go | 2 +- server/monitor.go | 2 +- server/monitor_test.go | 4 +- server/mqtt.go | 4 +- server/mqtt_ex_test_test.go | 2 +- server/mqtt_test.go | 4 +- server/nkey_test.go | 9 +++-- server/norace_test.go | 56 +++++++++++++++++++++++++- server/ocsp_responsecache.go | 4 +- server/opts_test.go | 4 +- server/reload_test.go | 4 +- server/route.go | 2 +- server/routes_test.go | 4 +- server/server.go | 17 ++++---- server/server_test.go | 2 +- server/stream.go | 2 +- server/util.go | 5 ++- server/websocket_test.go | 5 +-- test/auth_test.go | 4 +- test/gateway_test.go | 4 +- test/leafnode_test.go | 2 +- test/monitor_test.go | 4 +- test/new_routes_test.go | 4 +- test/norace_test.go | 2 +- test/ocsp_peer_test.go | 8 ++-- test/ports_test.go | 4 +- test/proto_test.go | 4 +- test/route_discovery_test.go | 4 +- test/routes_test.go | 4 +- test/service_latency_test.go | 4 +- test/test.go | 4 +- 60 files changed, 165 insertions(+), 108 deletions(-) diff --git a/go.mod b/go.mod index a9fa41d8be1..bf7ed9d760a 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/nats-io/nats-server/v2 go 1.21.0 require ( + github.com/goccy/go-json v0.10.3 github.com/klauspost/compress v1.17.11 github.com/minio/highwayhash v1.0.3 github.com/nats-io/jwt/v2 v2.5.8 diff --git a/go.sum b/go.sum index afb9d291c01..3dcb4412ee9 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA= +github.com/goccy/go-json v0.10.3/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q= diff --git a/server/accounts_test.go b/server/accounts_test.go index 92778c66629..8f593cd4a0a 100644 --- a/server/accounts_test.go +++ b/server/accounts_test.go @@ -1,4 +1,4 @@ -// Copyright 2018-2023 The NATS Authors +// Copyright 2018-2024 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -15,7 +15,6 @@ package server import ( "encoding/base64" - "encoding/json" "fmt" "net/http" "strconv" @@ -25,6 +24,7 @@ import ( "testing" "time" + "github.com/goccy/go-json" "github.com/nats-io/jwt/v2" "github.com/nats-io/nats.go" "github.com/nats-io/nkeys" diff --git a/server/auth_callout_test.go b/server/auth_callout_test.go index 29b7d83b049..b4f39045468 100644 --- a/server/auth_callout_test.go +++ b/server/auth_callout_test.go @@ -1,4 +1,4 @@ -// Copyright 2022-2023 The NATS Authors +// Copyright 2022-2024 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -16,7 +16,6 @@ package server import ( "bytes" "crypto/x509" - "encoding/json" "encoding/pem" "errors" "fmt" @@ -29,6 +28,7 @@ import ( "testing" "time" + "github.com/goccy/go-json" "github.com/nats-io/jwt/v2" "github.com/nats-io/nats.go" "github.com/nats-io/nkeys" diff --git a/server/auth_test.go b/server/auth_test.go index 05c3402f7d6..4ec9030dc81 100644 --- a/server/auth_test.go +++ b/server/auth_test.go @@ -15,7 +15,6 @@ package server import ( "context" - "encoding/json" "fmt" "net" "net/url" @@ -25,6 +24,7 @@ import ( "testing" "time" + "github.com/goccy/go-json" "github.com/nats-io/jwt/v2" "github.com/nats-io/nats.go" ) diff --git a/server/certidp/certidp.go b/server/certidp/certidp.go index a26618577be..e92aebabf8e 100644 --- a/server/certidp/certidp.go +++ b/server/certidp/certidp.go @@ -17,12 +17,12 @@ import ( "crypto/sha256" "crypto/x509" "encoding/base64" - "encoding/json" "fmt" "net/url" "strings" "time" + "github.com/goccy/go-json" "golang.org/x/crypto/ocsp" ) diff --git a/server/client.go b/server/client.go index fef19dd5285..472ec77e7a4 100644 --- a/server/client.go +++ b/server/client.go @@ -17,7 +17,6 @@ import ( "bytes" "crypto/tls" "crypto/x509" - "encoding/json" "errors" "fmt" "io" @@ -33,6 +32,7 @@ import ( "sync/atomic" "time" + "github.com/goccy/go-json" "github.com/klauspost/compress/s2" "github.com/nats-io/jwt/v2" "github.com/nats-io/nats-server/v2/internal/fastrand" diff --git a/server/client_test.go b/server/client_test.go index aa5daf2a71f..fa03bd48073 100644 --- a/server/client_test.go +++ b/server/client_test.go @@ -1,4 +1,4 @@ -// Copyright 2012-2022 The NATS Authors +// Copyright 2012-2024 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -16,7 +16,7 @@ package server import ( "bufio" "bytes" - "encoding/json" + "crypto/tls" "fmt" "io" "math" @@ -31,8 +31,7 @@ import ( "testing" "time" - "crypto/tls" - + "github.com/goccy/go-json" "github.com/nats-io/jwt/v2" "github.com/nats-io/nats.go" "github.com/nats-io/nkeys" diff --git a/server/consumer.go b/server/consumer.go index a91c1925441..c569f636616 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -16,7 +16,6 @@ package server import ( "bytes" "encoding/binary" - "encoding/json" "errors" "fmt" "math/rand" @@ -28,6 +27,7 @@ import ( "sync/atomic" "time" + "github.com/goccy/go-json" "github.com/nats-io/nats-server/v2/server/avl" "github.com/nats-io/nuid" "golang.org/x/time/rate" diff --git a/server/events.go b/server/events.go index 3f8ef050148..b21a734ae66 100644 --- a/server/events.go +++ b/server/events.go @@ -18,7 +18,6 @@ import ( "compress/gzip" "crypto/sha256" "crypto/x509" - "encoding/json" "errors" "fmt" "math/rand" @@ -30,8 +29,8 @@ import ( "sync/atomic" "time" + "github.com/goccy/go-json" "github.com/klauspost/compress/s2" - "github.com/nats-io/jwt/v2" "github.com/nats-io/nats-server/v2/server/certidp" "github.com/nats-io/nats-server/v2/server/pse" diff --git a/server/events_test.go b/server/events_test.go index 6545db66753..ed87bfe4e02 100644 --- a/server/events_test.go +++ b/server/events_test.go @@ -16,7 +16,6 @@ package server import ( "bytes" "crypto/sha256" - "encoding/json" "errors" "fmt" "math/rand" @@ -30,6 +29,7 @@ import ( "testing" "time" + "github.com/goccy/go-json" "github.com/nats-io/jwt/v2" "github.com/nats-io/nats.go" "github.com/nats-io/nkeys" diff --git a/server/filestore.go b/server/filestore.go index 236ef62be40..1e6748b5ba2 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -22,7 +22,6 @@ import ( "crypto/sha256" "encoding/binary" "encoding/hex" - "encoding/json" "errors" "fmt" "hash" @@ -40,6 +39,7 @@ import ( "sync/atomic" "time" + "github.com/goccy/go-json" "github.com/klauspost/compress/s2" "github.com/minio/highwayhash" "github.com/nats-io/nats-server/v2/server/avl" diff --git a/server/filestore_test.go b/server/filestore_test.go index 1be968f8b11..cf0061410c2 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -24,7 +24,6 @@ import ( "crypto/sha256" "encoding/binary" "encoding/hex" - "encoding/json" "errors" "fmt" "io" @@ -39,6 +38,7 @@ import ( "testing" "time" + "github.com/goccy/go-json" "github.com/klauspost/compress/s2" "github.com/nats-io/nuid" ) @@ -1483,8 +1483,6 @@ func TestFileStoreMeta(t *testing.T) { if err := json.Unmarshal(buf, &oconfig2); err != nil { t.Fatalf("Error unmarshalling: %v", err) } - // Since we set name we will get that back now. - oconfig.Name = oname if !reflect.DeepEqual(oconfig2, oconfig) { t.Fatalf("Consumer configs not equal, got %+v vs %+v", oconfig2, oconfig) } diff --git a/server/gateway.go b/server/gateway.go index 46dd7260ec7..0c265c6dc95 100644 --- a/server/gateway.go +++ b/server/gateway.go @@ -18,7 +18,6 @@ import ( "cmp" "crypto/sha256" "crypto/tls" - "encoding/json" "fmt" "math/rand" "net" @@ -28,6 +27,8 @@ import ( "sync" "sync/atomic" "time" + + "github.com/goccy/go-json" ) const ( diff --git a/server/gateway_test.go b/server/gateway_test.go index b397bfedc10..665c9684520 100644 --- a/server/gateway_test.go +++ b/server/gateway_test.go @@ -18,7 +18,6 @@ import ( "bytes" "context" "crypto/tls" - "encoding/json" "fmt" "net" "net/url" @@ -31,10 +30,12 @@ import ( "testing" "time" - . "github.com/nats-io/nats-server/v2/internal/ocsp" + "github.com/goccy/go-json" "github.com/nats-io/nats-server/v2/logger" "github.com/nats-io/nats.go" "golang.org/x/crypto/ocsp" + + . "github.com/nats-io/nats-server/v2/internal/ocsp" ) func init() { diff --git a/server/jetstream.go b/server/jetstream.go index e3f073fa95f..e0b17996014 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -18,7 +18,6 @@ import ( "crypto/sha256" "encoding/binary" "encoding/hex" - "encoding/json" "fmt" "math" "os" @@ -30,6 +29,7 @@ import ( "sync/atomic" "time" + "github.com/goccy/go-json" "github.com/minio/highwayhash" "github.com/nats-io/nats-server/v2/server/sysmem" "github.com/nats-io/nkeys" diff --git a/server/jetstream_api.go b/server/jetstream_api.go index de014e74b72..6ad152e471b 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -1,4 +1,4 @@ -// Copyright 2020-2023 The NATS Authors +// Copyright 2020-2024 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -16,7 +16,6 @@ package server import ( "bytes" "cmp" - "encoding/json" "errors" "fmt" "math/rand" @@ -30,6 +29,7 @@ import ( "time" "unicode" + "github.com/goccy/go-json" "github.com/nats-io/nuid" ) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index e8456c09487..8470873eccb 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -18,7 +18,6 @@ import ( "cmp" crand "crypto/rand" "encoding/binary" - "encoding/json" "errors" "fmt" "math" @@ -32,6 +31,7 @@ import ( "sync/atomic" "time" + "github.com/goccy/go-json" "github.com/klauspost/compress/s2" "github.com/minio/highwayhash" "github.com/nats-io/nuid" @@ -1572,7 +1572,7 @@ func (js *jetStream) metaSnapshot() []byte { b, _ := json.Marshal(streams) js.mu.RUnlock() - return s2.EncodeBetter(nil, b) + return s2.Encode(nil, b) } func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecovering bool) error { diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index 2c8c1ca5566..3621089eeec 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -20,7 +20,6 @@ import ( "bytes" "context" crand "crypto/rand" - "encoding/json" "errors" "fmt" "math/rand" @@ -33,6 +32,7 @@ import ( "testing" "time" + "github.com/goccy/go-json" "github.com/nats-io/jwt/v2" "github.com/nats-io/nats.go" ) diff --git a/server/jetstream_cluster_2_test.go b/server/jetstream_cluster_2_test.go index d24351a18a8..473e6324c14 100644 --- a/server/jetstream_cluster_2_test.go +++ b/server/jetstream_cluster_2_test.go @@ -22,7 +22,6 @@ import ( crand "crypto/rand" "encoding/binary" "encoding/hex" - "encoding/json" "errors" "fmt" "math/rand" @@ -37,6 +36,7 @@ import ( "testing" "time" + "github.com/goccy/go-json" "github.com/nats-io/nats.go" ) diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index a5b32c0e7b8..f2010b4180e 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -19,7 +19,6 @@ package server import ( "bytes" "context" - "encoding/json" "errors" "fmt" "math/rand" @@ -33,6 +32,7 @@ import ( "testing" "time" + "github.com/goccy/go-json" "github.com/nats-io/jwt/v2" "github.com/nats-io/nats.go" ) diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index fe587d1d05b..54c93755ea9 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -18,7 +18,6 @@ package server import ( "context" - "encoding/json" "errors" "fmt" "io" @@ -36,6 +35,7 @@ import ( "testing" "time" + "github.com/goccy/go-json" "github.com/nats-io/nats.go" "github.com/nats-io/nuid" ) diff --git a/server/jetstream_consumer_test.go b/server/jetstream_consumer_test.go index 8e62dbb12a0..5d40c0a621b 100644 --- a/server/jetstream_consumer_test.go +++ b/server/jetstream_consumer_test.go @@ -1,4 +1,4 @@ -// Copyright 2022-2023 The NATS Authors +// Copyright 2022-2024 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -17,7 +17,6 @@ package server import ( - "encoding/json" "errors" "fmt" "math/rand" @@ -29,6 +28,7 @@ import ( "testing" "time" + "github.com/goccy/go-json" "github.com/nats-io/nats.go" "github.com/nats-io/nuid" ) diff --git a/server/jetstream_events.go b/server/jetstream_events.go index 8302fcc4048..adb2a396e3d 100644 --- a/server/jetstream_events.go +++ b/server/jetstream_events.go @@ -1,4 +1,4 @@ -// Copyright 2020-2021 The NATS Authors +// Copyright 2020-2024 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -14,8 +14,9 @@ package server import ( - "encoding/json" "time" + + "github.com/goccy/go-json" ) func (s *Server) publishAdvisory(acc *Account, subject string, adv any) { diff --git a/server/jetstream_helpers_test.go b/server/jetstream_helpers_test.go index eb21057a04c..f5cbe05b5e7 100644 --- a/server/jetstream_helpers_test.go +++ b/server/jetstream_helpers_test.go @@ -18,7 +18,6 @@ package server import ( "context" - "encoding/json" "errors" "fmt" "io" @@ -33,6 +32,7 @@ import ( "testing" "time" + "github.com/goccy/go-json" "github.com/nats-io/nats.go" "golang.org/x/time/rate" ) diff --git a/server/jetstream_jwt_test.go b/server/jetstream_jwt_test.go index f8f6466f69e..aca5e63a88a 100644 --- a/server/jetstream_jwt_test.go +++ b/server/jetstream_jwt_test.go @@ -1,4 +1,4 @@ -// Copyright 2020-2023 The NATS Authors +// Copyright 2020-2024 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -17,7 +17,6 @@ package server import ( - "encoding/json" "errors" "fmt" "net/http" @@ -27,9 +26,11 @@ import ( "testing" "time" - jwt "github.com/nats-io/jwt/v2" + "github.com/goccy/go-json" "github.com/nats-io/nats.go" "github.com/nats-io/nkeys" + + jwt "github.com/nats-io/jwt/v2" ) func TestJetStreamJWTLimits(t *testing.T) { diff --git a/server/jetstream_super_cluster_test.go b/server/jetstream_super_cluster_test.go index 80fd87a6202..84b74688808 100644 --- a/server/jetstream_super_cluster_test.go +++ b/server/jetstream_super_cluster_test.go @@ -17,7 +17,6 @@ package server import ( - "encoding/json" "errors" "fmt" "math/rand" @@ -30,6 +29,7 @@ import ( "testing" "time" + "github.com/goccy/go-json" "github.com/nats-io/jwt/v2" "github.com/nats-io/nats.go" "github.com/nats-io/nkeys" diff --git a/server/jetstream_test.go b/server/jetstream_test.go index a8b10e7fc7c..0bc18107329 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -21,7 +21,6 @@ import ( "context" crand "crypto/rand" "encoding/base64" - "encoding/json" "errors" "fmt" "io" @@ -42,6 +41,7 @@ import ( "testing" "time" + "github.com/goccy/go-json" "github.com/nats-io/jwt/v2" "github.com/nats-io/nats-server/v2/server/sysmem" "github.com/nats-io/nats.go" diff --git a/server/jwt_test.go b/server/jwt_test.go index f4235a2b973..c6c9f1d1446 100644 --- a/server/jwt_test.go +++ b/server/jwt_test.go @@ -1,4 +1,4 @@ -// Copyright 2018-2020 The NATS Authors +// Copyright 2018-2024 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -17,7 +17,6 @@ import ( "bufio" "context" "encoding/base64" - "encoding/json" "errors" "fmt" "io" @@ -31,6 +30,7 @@ import ( "testing" "time" + "github.com/goccy/go-json" "github.com/nats-io/jwt/v2" "github.com/nats-io/nats.go" "github.com/nats-io/nkeys" diff --git a/server/leafnode.go b/server/leafnode.go index e163a6a2797..b2dd38ab363 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -18,7 +18,6 @@ import ( "bytes" "crypto/tls" "encoding/base64" - "encoding/json" "fmt" "math/rand" "net" @@ -36,6 +35,7 @@ import ( "time" "unicode" + "github.com/goccy/go-json" "github.com/klauspost/compress/s2" "github.com/nats-io/jwt/v2" "github.com/nats-io/nkeys" diff --git a/server/monitor.go b/server/monitor.go index 2bd25f9a7be..86aeab00b96 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -20,7 +20,6 @@ import ( "crypto/tls" "crypto/x509" "encoding/hex" - "encoding/json" "expvar" "fmt" "net" @@ -37,6 +36,7 @@ import ( "sync/atomic" "time" + "github.com/goccy/go-json" "github.com/nats-io/jwt/v2" "github.com/nats-io/nats-server/v2/server/pse" ) diff --git a/server/monitor_test.go b/server/monitor_test.go index 7003de12316..f7c214c3609 100644 --- a/server/monitor_test.go +++ b/server/monitor_test.go @@ -1,4 +1,4 @@ -// Copyright 2013-2023 The NATS Authors +// Copyright 2013-2024 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -16,7 +16,6 @@ package server import ( "bytes" "crypto/tls" - "encoding/json" "errors" "fmt" "io" @@ -34,6 +33,7 @@ import ( "time" "unicode" + "github.com/goccy/go-json" "github.com/nats-io/jwt/v2" "github.com/nats-io/nats.go" "github.com/nats-io/nkeys" diff --git a/server/mqtt.go b/server/mqtt.go index 35c18ba154d..0cad975bb18 100644 --- a/server/mqtt.go +++ b/server/mqtt.go @@ -1,4 +1,4 @@ -// Copyright 2020-2023 The NATS Authors +// Copyright 2020-2024 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -18,7 +18,6 @@ import ( "cmp" "crypto/tls" "encoding/binary" - "encoding/json" "errors" "fmt" "io" @@ -31,6 +30,7 @@ import ( "time" "unicode/utf8" + "github.com/goccy/go-json" "github.com/nats-io/nuid" ) diff --git a/server/mqtt_ex_test_test.go b/server/mqtt_ex_test_test.go index 9acad558779..4280b714f99 100644 --- a/server/mqtt_ex_test_test.go +++ b/server/mqtt_ex_test_test.go @@ -18,7 +18,6 @@ package server import ( "bytes" - "encoding/json" "fmt" "io" "os" @@ -27,6 +26,7 @@ import ( "strings" "testing" + "github.com/goccy/go-json" "github.com/nats-io/nuid" ) diff --git a/server/mqtt_test.go b/server/mqtt_test.go index 09b372b910a..84bedafbc1e 100644 --- a/server/mqtt_test.go +++ b/server/mqtt_test.go @@ -1,4 +1,4 @@ -// Copyright 2020-2023 The NATS Authors +// Copyright 2020-2024 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -20,7 +20,6 @@ import ( "bufio" "bytes" "crypto/tls" - "encoding/json" "errors" "fmt" "io" @@ -33,6 +32,7 @@ import ( "testing" "time" + "github.com/goccy/go-json" "github.com/nats-io/jwt/v2" "github.com/nats-io/nats.go" "github.com/nats-io/nkeys" diff --git a/server/nkey_test.go b/server/nkey_test.go index f593a2f7548..174f7067100 100644 --- a/server/nkey_test.go +++ b/server/nkey_test.go @@ -1,4 +1,4 @@ -// Copyright 2018 The NATS Authors +// Copyright 2018-2024 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -15,15 +15,16 @@ package server import ( "bufio" - crand "crypto/rand" "encoding/base64" - "encoding/json" "fmt" - mrand "math/rand" "strings" "testing" "time" + crand "crypto/rand" + mrand "math/rand" + + "github.com/goccy/go-json" "github.com/nats-io/nkeys" ) diff --git a/server/norace_test.go b/server/norace_test.go index a0bfac991d5..b0d08d5482e 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -22,7 +22,6 @@ import ( "compress/gzip" "context" "encoding/binary" - "encoding/json" "errors" "fmt" "io" @@ -48,6 +47,7 @@ import ( crand "crypto/rand" "crypto/sha256" + "github.com/goccy/go-json" "github.com/klauspost/compress/s2" "github.com/nats-io/jwt/v2" "github.com/nats-io/nats-server/v2/server/avl" @@ -11187,3 +11187,57 @@ func TestNoRaceJetStreamClusterCheckInterestStatePerformanceInterest(t *testing. mset.checkInterestState() require_True(t, time.Since(start) < elapsed/100) } + +func TestNoRaceJetStreamClusterLargeMetaSnapshotTiming(t *testing.T) { + // This test was to show improvements in speed for marshaling the meta layer with lots of assets. + // Move to S2.Encode vs EncodeBetter which is 2x faster and actually better compression. + // Also moved to goccy json which is faster then the default and in my tests now always matches + // the default encoder byte for byte which last time I checked it did not. + t.Skip() + + c := createJetStreamClusterExplicit(t, "R3F", 3) + defer c.shutdown() + + // Create 200 streams, each with 500 consumers. + numStreams := 200 + numConsumers := 500 + wg := sync.WaitGroup{} + wg.Add(numStreams) + for i := 0; i < numStreams; i++ { + go func() { + defer wg.Done() + s := c.randomServer() + nc, js := jsClientConnect(t, s) + defer nc.Close() + sname := fmt.Sprintf("TEST-SNAPSHOT-%d", i) + subj := fmt.Sprintf("foo.%d", i) + _, err := js.AddStream(&nats.StreamConfig{ + Name: sname, + Subjects: []string{subj}, + Replicas: 3, + }) + require_NoError(t, err) + + // Now consumers. + for c := 0; c < numConsumers; c++ { + _, err = js.AddConsumer(sname, &nats.ConsumerConfig{ + Durable: fmt.Sprintf("C-%d", c), + FilterSubject: subj, + AckPolicy: nats.AckExplicitPolicy, + Replicas: 1, + }) + require_NoError(t, err) + } + }() + } + wg.Wait() + + s := c.leader() + js := s.getJetStream() + n := js.getMetaGroup() + // Now let's see how long it takes to create a meta snapshot and how big it is. + start := time.Now() + snap := js.metaSnapshot() + require_NoError(t, n.InstallSnapshot(snap)) + t.Logf("Took %v to snap meta with size of %v\n", time.Since(start), friendlyBytes(len(snap))) +} diff --git a/server/ocsp_responsecache.go b/server/ocsp_responsecache.go index 455fdd3a270..b62166f25d4 100644 --- a/server/ocsp_responsecache.go +++ b/server/ocsp_responsecache.go @@ -1,4 +1,4 @@ -// Copyright 2023 The NATS Authors +// Copyright 2023-2024 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -15,7 +15,6 @@ package server import ( "bytes" - "encoding/json" "errors" "fmt" "io" @@ -27,6 +26,7 @@ import ( "sync/atomic" "time" + "github.com/goccy/go-json" "github.com/klauspost/compress/s2" "golang.org/x/crypto/ocsp" diff --git a/server/opts_test.go b/server/opts_test.go index 0755c7d02dd..443ce1f67a5 100644 --- a/server/opts_test.go +++ b/server/opts_test.go @@ -1,4 +1,4 @@ -// Copyright 2012-2020 The NATS Authors +// Copyright 2012-2024 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -16,7 +16,6 @@ package server import ( "bytes" "crypto/tls" - "encoding/json" "flag" "fmt" "net/url" @@ -29,6 +28,7 @@ import ( "testing" "time" + "github.com/goccy/go-json" "github.com/nats-io/jwt/v2" "github.com/nats-io/nats.go" "github.com/nats-io/nkeys" diff --git a/server/reload_test.go b/server/reload_test.go index 7a4c9d3281c..37de82601b8 100644 --- a/server/reload_test.go +++ b/server/reload_test.go @@ -1,4 +1,4 @@ -// Copyright 2017-2022 The NATS Authors +// Copyright 2017-2024 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -17,7 +17,6 @@ import ( "bytes" "crypto/tls" "encoding/base64" - "encoding/json" "flag" "fmt" "io" @@ -35,6 +34,7 @@ import ( "testing" "time" + "github.com/goccy/go-json" "github.com/nats-io/jwt/v2" "github.com/nats-io/nats.go" "github.com/nats-io/nkeys" diff --git a/server/route.go b/server/route.go index 0c455547c98..b0f56681748 100644 --- a/server/route.go +++ b/server/route.go @@ -16,7 +16,6 @@ package server import ( "bytes" "crypto/tls" - "encoding/json" "fmt" "math/rand" "net" @@ -28,6 +27,7 @@ import ( "sync/atomic" "time" + "github.com/goccy/go-json" "github.com/klauspost/compress/s2" ) diff --git a/server/routes_test.go b/server/routes_test.go index 6696fb7b7bd..0d7b24d1951 100644 --- a/server/routes_test.go +++ b/server/routes_test.go @@ -1,4 +1,4 @@ -// Copyright 2013-2023 The NATS Authors +// Copyright 2013-2024 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -18,7 +18,6 @@ import ( "bytes" "context" "crypto/tls" - "encoding/json" "fmt" "math/rand" "net" @@ -35,6 +34,7 @@ import ( "testing" "time" + "github.com/goccy/go-json" "github.com/nats-io/jwt/v2" "github.com/nats-io/nats.go" "github.com/nats-io/nkeys" diff --git a/server/server.go b/server/server.go index e6f6c728d42..200ba03767d 100644 --- a/server/server.go +++ b/server/server.go @@ -17,7 +17,6 @@ import ( "bytes" "context" "crypto/tls" - "encoding/json" "errors" "flag" "fmt" @@ -28,28 +27,28 @@ import ( "net" "net/http" "net/url" - "regexp" - "runtime/pprof" - "unicode" - - // Allow dynamic profiling. - _ "net/http/pprof" "os" "path" "path/filepath" + "regexp" "runtime" + "runtime/pprof" "strconv" "strings" "sync" "sync/atomic" "time" + "unicode" + + // Allow dynamic profiling. + _ "net/http/pprof" + "github.com/goccy/go-json" "github.com/klauspost/compress/s2" "github.com/nats-io/jwt/v2" + "github.com/nats-io/nats-server/v2/logger" "github.com/nats-io/nkeys" "github.com/nats-io/nuid" - - "github.com/nats-io/nats-server/v2/logger" ) const ( diff --git a/server/server_test.go b/server/server_test.go index 03cdc71baf3..23a752724e1 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -18,7 +18,6 @@ import ( "bytes" "context" "crypto/tls" - "encoding/json" "errors" "flag" "fmt" @@ -35,6 +34,7 @@ import ( "testing" "time" + "github.com/goccy/go-json" "github.com/nats-io/nats.go" ) diff --git a/server/stream.go b/server/stream.go index 1c0824cdb4d..0ddf9bb448a 100644 --- a/server/stream.go +++ b/server/stream.go @@ -17,7 +17,6 @@ import ( "archive/tar" "bytes" "encoding/binary" - "encoding/json" "errors" "fmt" "io" @@ -32,6 +31,7 @@ import ( "sync/atomic" "time" + "github.com/goccy/go-json" "github.com/klauspost/compress/s2" "github.com/nats-io/nuid" ) diff --git a/server/util.go b/server/util.go index aea3dcf17e2..243dcb42b41 100644 --- a/server/util.go +++ b/server/util.go @@ -1,4 +1,4 @@ -// Copyright 2012-2019 The NATS Authors +// Copyright 2012-2024 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -16,7 +16,6 @@ package server import ( "bytes" "context" - "encoding/json" "errors" "fmt" "math" @@ -27,6 +26,8 @@ import ( "strconv" "strings" "time" + + "github.com/goccy/go-json" ) // This map is used to store URLs string as the key with a reference count as diff --git a/server/websocket_test.go b/server/websocket_test.go index 368a9fe8d6f..31d3cd28026 100644 --- a/server/websocket_test.go +++ b/server/websocket_test.go @@ -19,7 +19,6 @@ import ( "crypto/tls" "encoding/base64" "encoding/binary" - "encoding/json" "errors" "fmt" "io" @@ -34,11 +33,11 @@ import ( "testing" "time" + "github.com/goccy/go-json" + "github.com/klauspost/compress/flate" "github.com/nats-io/jwt/v2" "github.com/nats-io/nats.go" "github.com/nats-io/nkeys" - - "github.com/klauspost/compress/flate" ) type testReader struct { diff --git a/test/auth_test.go b/test/auth_test.go index 232ae717769..4d634a51c24 100644 --- a/test/auth_test.go +++ b/test/auth_test.go @@ -1,4 +1,4 @@ -// Copyright 2012-2019 The NATS Authors +// Copyright 2012-2024 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -14,12 +14,12 @@ package test import ( - "encoding/json" "fmt" "net" "testing" "time" + "github.com/goccy/go-json" "github.com/nats-io/nats-server/v2/server" ) diff --git a/test/gateway_test.go b/test/gateway_test.go index 49e7274c6c7..bbaad336089 100644 --- a/test/gateway_test.go +++ b/test/gateway_test.go @@ -1,4 +1,4 @@ -// Copyright 2018-2019 The NATS Authors +// Copyright 2018-2024 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -17,7 +17,6 @@ import ( "bufio" "bytes" "crypto/tls" - "encoding/json" "fmt" "net" "net/url" @@ -25,6 +24,7 @@ import ( "testing" "time" + "github.com/goccy/go-json" "github.com/nats-io/nats-server/v2/server" "github.com/nats-io/nats.go" ) diff --git a/test/leafnode_test.go b/test/leafnode_test.go index a9451732818..a6103882c27 100644 --- a/test/leafnode_test.go +++ b/test/leafnode_test.go @@ -17,7 +17,6 @@ import ( "bytes" "crypto/tls" "crypto/x509" - "encoding/json" "fmt" "math/rand" "net" @@ -30,6 +29,7 @@ import ( "testing" "time" + "github.com/goccy/go-json" "github.com/nats-io/jwt/v2" "github.com/nats-io/nats-server/v2/logger" "github.com/nats-io/nats-server/v2/server" diff --git a/test/monitor_test.go b/test/monitor_test.go index 91d2f4f07db..661d8e475d7 100644 --- a/test/monitor_test.go +++ b/test/monitor_test.go @@ -1,4 +1,4 @@ -// Copyright 2012-2020 The NATS Authors +// Copyright 2012-2024 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -16,7 +16,6 @@ package test import ( "crypto/tls" "crypto/x509" - "encoding/json" "fmt" "io" "net" @@ -28,6 +27,7 @@ import ( "testing" "time" + "github.com/goccy/go-json" "github.com/nats-io/nats-server/v2/server" "github.com/nats-io/nats.go" ) diff --git a/test/new_routes_test.go b/test/new_routes_test.go index 5810fe4eaf7..01155cf1814 100644 --- a/test/new_routes_test.go +++ b/test/new_routes_test.go @@ -1,4 +1,4 @@ -// Copyright 2018-2022 The NATS Authors +// Copyright 2018-2024 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -14,12 +14,12 @@ package test import ( - "encoding/json" "fmt" "net" "testing" "time" + "github.com/goccy/go-json" "github.com/nats-io/nats-server/v2/logger" "github.com/nats-io/nats-server/v2/server" "github.com/nats-io/nats.go" diff --git a/test/norace_test.go b/test/norace_test.go index eb5252e2a63..d78766efe87 100644 --- a/test/norace_test.go +++ b/test/norace_test.go @@ -19,7 +19,6 @@ package test import ( "context" crand "crypto/rand" - "encoding/json" "fmt" "net" "net/url" @@ -31,6 +30,7 @@ import ( "testing" "time" + "github.com/goccy/go-json" "github.com/nats-io/nats-server/v2/server" "github.com/nats-io/nats.go" "github.com/nats-io/nuid" diff --git a/test/ocsp_peer_test.go b/test/ocsp_peer_test.go index bc66e37dbf2..e97fa9a7e61 100644 --- a/test/ocsp_peer_test.go +++ b/test/ocsp_peer_test.go @@ -1,4 +1,4 @@ -// Copyright 2023 The NATS Authors +// Copyright 2023-2024 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -17,7 +17,6 @@ import ( "context" "crypto/tls" "crypto/x509" - "encoding/json" "errors" "fmt" "io" @@ -27,6 +26,7 @@ import ( "testing" "time" + "github.com/goccy/go-json" . "github.com/nats-io/nats-server/v2/internal/ocsp" "github.com/nats-io/nats-server/v2/server" "github.com/nats-io/nats.go" @@ -3013,12 +3013,12 @@ func TestOCSPMonitoringPort(t *testing.T) { net: 127.0.0.1 port: -1 https: -1 - ocsp { + ocsp { mode = always url = http://127.0.0.1:18888 } store_dir = %s - + tls: { cert_file: "configs/certs/ocsp_peer/mini-ca/server1/TestServer1_bundle.pem" key_file: "configs/certs/ocsp_peer/mini-ca/server1/private/TestServer1_keypair.pem" diff --git a/test/ports_test.go b/test/ports_test.go index 68b05bbf4c7..2959c00af29 100644 --- a/test/ports_test.go +++ b/test/ports_test.go @@ -1,4 +1,4 @@ -// Copyright 2018-2019 The NATS Authors +// Copyright 2018-2024 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -14,7 +14,6 @@ package test import ( - "encoding/json" "errors" "fmt" "os" @@ -23,6 +22,7 @@ import ( "testing" "time" + "github.com/goccy/go-json" "github.com/nats-io/nats-server/v2/server" ) diff --git a/test/proto_test.go b/test/proto_test.go index a03a4446ebc..1ab2df6e538 100644 --- a/test/proto_test.go +++ b/test/proto_test.go @@ -1,4 +1,4 @@ -// Copyright 2012-2020 The NATS Authors +// Copyright 2012-2024 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -14,10 +14,10 @@ package test import ( - "encoding/json" "testing" "time" + "github.com/goccy/go-json" "github.com/nats-io/nats-server/v2/server" ) diff --git a/test/route_discovery_test.go b/test/route_discovery_test.go index 3c7a87da259..9c77790e431 100644 --- a/test/route_discovery_test.go +++ b/test/route_discovery_test.go @@ -1,4 +1,4 @@ -// Copyright 2015-2019 The NATS Authors +// Copyright 2015-2024 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -15,7 +15,6 @@ package test import ( "bufio" - "encoding/json" "fmt" "io" "net" @@ -26,6 +25,7 @@ import ( "testing" "time" + "github.com/goccy/go-json" "github.com/nats-io/nats-server/v2/server" ) diff --git a/test/routes_test.go b/test/routes_test.go index 16254f06c83..76180a63ffd 100644 --- a/test/routes_test.go +++ b/test/routes_test.go @@ -1,4 +1,4 @@ -// Copyright 2012-2019 The NATS Authors +// Copyright 2012-2024 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -14,7 +14,6 @@ package test import ( - "encoding/json" "fmt" "io" "net" @@ -25,6 +24,7 @@ import ( "testing" "time" + "github.com/goccy/go-json" "github.com/nats-io/nats-server/v2/internal/testhelper" "github.com/nats-io/nats-server/v2/server" "github.com/nats-io/nats.go" diff --git a/test/service_latency_test.go b/test/service_latency_test.go index ca9c35ede67..6c7e167a7f8 100644 --- a/test/service_latency_test.go +++ b/test/service_latency_test.go @@ -1,4 +1,4 @@ -// Copyright 2019-2021 The NATS Authors +// Copyright 2019-2024 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -14,7 +14,6 @@ package test import ( - "encoding/json" "fmt" "math/rand" "net/http" @@ -26,6 +25,7 @@ import ( "testing" "time" + "github.com/goccy/go-json" "github.com/nats-io/jwt/v2" "github.com/nats-io/nats-server/v2/server" "github.com/nats-io/nats.go" diff --git a/test/test.go b/test/test.go index 072b8ca95cd..5404bee830b 100644 --- a/test/test.go +++ b/test/test.go @@ -1,4 +1,4 @@ -// Copyright 2012-2019 The NATS Authors +// Copyright 2012-2024 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -16,7 +16,6 @@ package test import ( "crypto/rand" "encoding/hex" - "encoding/json" "fmt" "io" "net" @@ -28,6 +27,7 @@ import ( "testing" "time" + "github.com/goccy/go-json" "github.com/nats-io/nats-server/v2/server" ) From af02887346d6f02902140294c02aed7f541c34d3 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sat, 23 Nov 2024 18:45:31 -0600 Subject: [PATCH 03/10] Fix for test that depended on stdlib json decode to wrong type to work. Signed-off-by: Derek Collison --- server/norace_test.go | 38 +++++++++++++++++++++++++++----------- 1 file changed, 27 insertions(+), 11 deletions(-) diff --git a/server/norace_test.go b/server/norace_test.go index b0d08d5482e..198bfcccb08 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -5371,7 +5371,7 @@ func TestNoRaceJetStreamClusterStreamNamesAndInfosMoreThanAPILimit(t *testing.T) createStream(name) } - // Not using the JS API here beacause we want to make sure that the + // Not using the JS API here because we want to make sure that the // server returns the proper Total count, but also that it does not // send more than when the API limit is in one go. check := func(subj string, limit int) { @@ -5379,19 +5379,35 @@ func TestNoRaceJetStreamClusterStreamNamesAndInfosMoreThanAPILimit(t *testing.T) nreq := JSApiStreamNamesRequest{} b, _ := json.Marshal(nreq) + msg, err := nc.Request(subj, b, 2*time.Second) require_NoError(t, err) - nresp := JSApiStreamNamesResponse{} - json.Unmarshal(msg.Data, &nresp) - if n := nresp.ApiPaged.Total; n != max { - t.Fatalf("Expected total to be %v, got %v", max, n) - } - if n := nresp.ApiPaged.Limit; n != limit { - t.Fatalf("Expected limit to be %v, got %v", limit, n) - } - if n := len(nresp.Streams); n != limit { - t.Fatalf("Expected number of streams to be %v, got %v", limit, n) + switch subj { + case JSApiStreams: + nresp := JSApiStreamNamesResponse{} + json.Unmarshal(msg.Data, &nresp) + if n := nresp.ApiPaged.Total; n != max { + t.Fatalf("Expected total to be %v, got %v", max, n) + } + if n := nresp.ApiPaged.Limit; n != limit { + t.Fatalf("Expected limit to be %v, got %v", limit, n) + } + if n := len(nresp.Streams); n != limit { + t.Fatalf("Expected number of streams to be %v, got %v", limit, n) + } + case JSApiStreamList: + nresp := JSApiStreamListResponse{} + json.Unmarshal(msg.Data, &nresp) + if n := nresp.ApiPaged.Total; n != max { + t.Fatalf("Expected total to be %v, got %v", max, n) + } + if n := nresp.ApiPaged.Limit; n != limit { + t.Fatalf("Expected limit to be %v, got %v", limit, n) + } + if n := len(nresp.Streams); n != limit { + t.Fatalf("Expected number of streams to be %v, got %v", limit, n) + } } } From 53564d7ef960fe383291169901e15e47bff5eea0 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sun, 24 Nov 2024 10:07:24 -0600 Subject: [PATCH 04/10] Increase minimum interval for meta snapshots and do not pre-empt on consumer removals to avoind excesive snapshotting. Signed-off-by: Derek Collison --- server/jetstream_cluster.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 8470873eccb..476841cd75a 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1320,7 +1320,7 @@ func (js *jetStream) monitorCluster() { isLeader bool lastSnapTime time.Time compactSizeMin = uint64(8 * 1024 * 1024) // 8MB - minSnapDelta = 10 * time.Second + minSnapDelta = 30 * time.Second ) // Highwayhash key for generating hashes. @@ -1410,7 +1410,7 @@ func (js *jetStream) monitorCluster() { go checkHealth() continue } - if didSnap, didStreamRemoval, didConsumerRemoval, err := js.applyMetaEntries(ce.Entries, ru); err == nil { + if didSnap, didStreamRemoval, _, err := js.applyMetaEntries(ce.Entries, ru); err == nil { var nb uint64 // Some entries can fail without an error when shutting down, don't move applied forward. if !js.isShuttingDown() { @@ -1418,8 +1418,6 @@ func (js *jetStream) monitorCluster() { } if js.hasPeerEntries(ce.Entries) || didStreamRemoval || (didSnap && !isLeader) { doSnapshot() - } else if didConsumerRemoval && time.Since(lastSnapTime) > minSnapDelta/2 { - doSnapshot() } else if nb > compactSizeMin && time.Since(lastSnapTime) > minSnapDelta { doSnapshot() } From 85fb627cb58901a451f6f98c1a2a1812b1c9cc96 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Mon, 25 Nov 2024 15:31:28 +0100 Subject: [PATCH 05/10] [FIXED] Hold JS lock when accessing stream assignment Signed-off-by: Maurice van Veen --- server/jetstream_cluster.go | 3 ++- server/jetstream_cluster_2_test.go | 2 ++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 476841cd75a..240a47e8611 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -4235,8 +4235,10 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) { return } + js.mu.Lock() sa := js.streamAssignment(accName, stream) if sa == nil { + js.mu.Unlock() s.Debugf("Consumer create failed, could not locate stream '%s > %s'", accName, stream) return } @@ -4248,7 +4250,6 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) { var wasExisting bool // Check if we have an existing consumer assignment. - js.mu.Lock() if sa.consumers == nil { sa.consumers = make(map[string]*consumerAssignment) } else if oca := sa.consumers[ca.Name]; oca != nil { diff --git a/server/jetstream_cluster_2_test.go b/server/jetstream_cluster_2_test.go index 473e6324c14..7bb5db1bea0 100644 --- a/server/jetstream_cluster_2_test.go +++ b/server/jetstream_cluster_2_test.go @@ -2075,11 +2075,13 @@ func TestJetStreamClusterMaxConsumersMultipleConcurrentRequests(t *testing.T) { metaLeader := c.leader() mjs := metaLeader.getJetStream() + mjs.mu.RLock() sa := mjs.streamAssignment(globalAccountName, "MAXCC") require_NotNil(t, sa) for _, ca := range sa.consumers { require_False(t, ca.pending) } + mjs.mu.RUnlock() } func TestJetStreamClusterAccountMaxStreamsAndConsumersMultipleConcurrentRequests(t *testing.T) { From 95050f6183e56b3b1b8b022fb524cfe03075b9f5 Mon Sep 17 00:00:00 2001 From: Alberto Ricart Date: Mon, 25 Nov 2024 20:03:52 -0500 Subject: [PATCH 06/10] fix(leafnode): credentials parsing for leafnode connections doesn't handle CRLFs correctly (#6175) Leaf node credential parsing was using a regular expression that didn't handle CRLFs correctly Fix #6167 Signed-off-by: Your Name --------- Signed-off-by: Alberto Ricart --- server/leafnode.go | 2 +- server/leafnode_test.go | 97 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 98 insertions(+), 1 deletion(-) diff --git a/server/leafnode.go b/server/leafnode.go index b2dd38ab363..bf69b0a3f04 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -774,7 +774,7 @@ func (s *Server) startLeafNodeAcceptLoop() { } // RegEx to match a creds file with user JWT and Seed. -var credsRe = regexp.MustCompile(`\s*(?:(?:[-]{3,}[^\n]*[-]{3,}\n)(.+)(?:\n\s*[-]{3,}[^\n]*[-]{3,}\n))`) +var credsRe = regexp.MustCompile(`\s*(?:(?:[-]{3,}.*[-]{3,}\r?\n)([\w\-.=]+)(?:\r?\n[-]{3,}.*[-]{3,}(\r?\n|\z)))`) // clusterName is provided as argument to avoid lock ordering issues with the locked client c // Lock should be held entering here. diff --git a/server/leafnode_test.go b/server/leafnode_test.go index 9c8bb729006..9fb8743d7b9 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -24,6 +24,7 @@ import ( "net/http" "net/http/httptest" "net/url" + "os" "reflect" "strings" "sync" @@ -8831,3 +8832,99 @@ func TestLeafNodeServerKickClient(t *testing.T) { require_True(t, lid != ln.Cid) require_True(t, ln.Start.After(disconnectTime)) } + +func TestLeafCredFormatting(t *testing.T) { + //create the operator/sys/account tree + oKP, err := nkeys.CreateOperator() + require_NoError(t, err) + oPK, err := oKP.PublicKey() + require_NoError(t, err) + + oc := jwt.NewOperatorClaims(oPK) + oc.Name = "O" + oJWT, err := oc.Encode(oKP) + require_NoError(t, err) + + sysKP, err := nkeys.CreateAccount() + require_NoError(t, err) + sysPK, err := sysKP.PublicKey() + require_NoError(t, err) + + sys := jwt.NewAccountClaims(sysPK) + sys.Name = "SYS" + sysJWT, err := sys.Encode(oKP) + require_NoError(t, err) + + aKP, err := nkeys.CreateAccount() + require_NoError(t, err) + aPK, err := aKP.PublicKey() + require_NoError(t, err) + + ac := jwt.NewAccountClaims(aPK) + ac.Name = "A" + aJWT, err := ac.Encode(oKP) + require_NoError(t, err) + + uKP, err := nkeys.CreateUser() + require_NoError(t, err) + uSeed, err := uKP.Seed() + require_NoError(t, err) + uPK, err := uKP.PublicKey() + require_NoError(t, err) + + // build the config + stmpl := fmt.Sprintf(` + listen: 127.0.0.1:-1 + operator: %s + system_account: %s + resolver: MEM + resolver_preload: { + %s: %s + %s: %s + } + leaf { listen: 127.0.0.1:-1 } + `, oJWT, sysPK, sysPK, sysJWT, aPK, aJWT) + conf := createConfFile(t, []byte(stmpl)) + s, o := RunServerWithConfig(conf) + defer s.Shutdown() + + // create the leaf node + // generate the user credentials + uc := jwt.NewUserClaims(uPK) + uc.Name = "U" + uc.Limits.Data = -1 + uc.Limits.Payload = -1 + uc.Permissions.Pub.Allow.Add(">") + uc.Permissions.Sub.Allow.Add(">") + uJWT, err := uc.Encode(aKP) + require_NoError(t, err) + + runLeaf := func(t *testing.T, creds []byte) { + file, err := os.CreateTemp("", "tmp-*.creds") + require_NoError(t, err) + _, err = file.Write(creds) + require_NoError(t, err) + require_NoError(t, file.Close()) + + template := fmt.Sprintf(` + listen: 127.0.0.1:-1 + leaf { remotes: [ + { + urls: [ nats-leaf://127.0.0.1:%d ] + credentials: "%s" + } + ] }`, o.LeafNode.Port, file.Name()) + + conf := createConfFile(t, []byte(template)) + leaf, _ := RunServerWithConfig(conf) + defer leaf.Shutdown() + defer os.Remove(file.Name()) + checkLeafNodeConnected(t, leaf) + } + + creds, err := jwt.FormatUserConfig(uJWT, uSeed) + require_NoError(t, err) + + runLeaf(t, creds) + runLeaf(t, bytes.ReplaceAll(creds, []byte{'\n'}, []byte{'\r', '\n'})) +} From a999f6a66c1d3e97de929429c57b6540545d28ed Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 25 Nov 2024 15:57:35 -0600 Subject: [PATCH 07/10] Move jetstream clustered and sys account to atomics. Add in lookup for both stream and consumer assignments at the same time. Signed-off-by: Derek Collison --- server/jetstream.go | 4 ++++ server/jetstream_cluster.go | 14 +++++++++----- server/server.go | 14 +++++++------- 3 files changed, 20 insertions(+), 12 deletions(-) diff --git a/server/jetstream.go b/server/jetstream.go index e0b17996014..6e43842ce4d 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -461,6 +461,8 @@ func (s *Server) enableJetStream(cfg JetStreamConfig) error { if err := s.enableJetStreamClustering(); err != nil { return err } + // Set our atomic bool to clustered. + s.jsClustered.Store(true) } // Mark when we are up and running. @@ -965,6 +967,8 @@ func (s *Server) shutdownJetStream() { cc.c = nil } cc.meta = nil + // Set our atomic bool to false. + s.jsClustered.Store(false) } js.mu.Unlock() diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 240a47e8611..222ae8d744b 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -224,11 +224,7 @@ func (s *Server) getJetStreamCluster() (*jetStream, *jetStreamCluster) { } func (s *Server) JetStreamIsClustered() bool { - js := s.getJetStream() - if js == nil { - return false - } - return js.isClustered() + return s.jsClustered.Load() } func (s *Server) JetStreamIsLeader() bool { @@ -4720,6 +4716,14 @@ func (js *jetStream) consumerAssignment(account, stream, consumer string) *consu return nil } +// Return both the stream and consumer assignments. +func (js *jetStream) assignments(account, stream, consumer string) (*streamAssignment, *consumerAssignment) { + if sa := js.streamAssignment(account, stream); sa != nil { + return sa, sa.consumers[consumer] + } + return nil, nil +} + // consumerAssigned informs us if this server has this consumer assigned. func (jsa *jsAccount) consumerAssigned(stream, consumer string) bool { jsa.mu.RLock() diff --git a/server/server.go b/server/server.go index 200ba03767d..7f39e67110e 100644 --- a/server/server.go +++ b/server/server.go @@ -140,8 +140,10 @@ type Server struct { listenerErr error gacc *Account sys *internal + sysAcc atomic.Pointer[Account] js atomic.Pointer[jetStream] isMetaLeader atomic.Bool + jsClustered atomic.Bool accounts sync.Map tmpAccounts sync.Map // Temporarily stores accounts that are being built activeAccounts int32 @@ -1280,6 +1282,7 @@ func (s *Server) configureAccounts(reloading bool) (map[string]struct{}, error) if err == nil && s.sys != nil && acc != s.sys.account { // sys.account.clients (including internal client)/respmap/etc... are transferred separately s.sys.account = acc + s.sysAcc.Store(acc) } if err != nil { return awcsti, fmt.Errorf("error resolving system account: %v", err) @@ -1635,13 +1638,7 @@ func (s *Server) SetSystemAccount(accName string) error { // SystemAccount returns the system account if set. func (s *Server) SystemAccount() *Account { - var sacc *Account - s.mu.RLock() - if s.sys != nil { - sacc = s.sys.account - } - s.mu.RUnlock() - return sacc + return s.sysAcc.Load() } // GlobalAccount returns the global account. @@ -1713,6 +1710,9 @@ func (s *Server) setSystemAccount(acc *Account) error { s.sys.wg.Add(1) s.mu.Unlock() + // Store in atomic for fast lookup. + s.sysAcc.Store(acc) + // Register with the account. s.sys.client.registerWithAccount(acc) From 77a5ba595b7f523a85bbe2d6077f591642a4010d Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 25 Nov 2024 16:00:30 -0600 Subject: [PATCH 08/10] Bypass meta-layer if a consumerInfo request is for a missing consumer or stream. Processing a missing consumer was a heavier operation on the meta-layer and could lead to API queue buildup. This processes that condition at point of entry to the system vs at the meta-layer. If the consumer assignment exists we do as normal for now. Signed-off-by: Derek Collison --- server/client.go | 20 ++++++++-- server/jetstream_api.go | 63 +++++++++++++++++++++++++++++- server/jetstream_cluster.go | 1 + server/jetstream_cluster_1_test.go | 15 ++++--- server/norace_test.go | 57 +++++++++++++++++++++++++++ 5 files changed, 145 insertions(+), 11 deletions(-) diff --git a/server/client.go b/server/client.go index 472ec77e7a4..c4e6ba27b47 100644 --- a/server/client.go +++ b/server/client.go @@ -4160,8 +4160,9 @@ func getHeader(key string, hdr []byte) []byte { // For bytes.HasPrefix below. var ( - jsRequestNextPreB = []byte(jsRequestNextPre) - jsDirectGetPreB = []byte(jsDirectGetPre) + jsRequestNextPreB = []byte(jsRequestNextPre) + jsDirectGetPreB = []byte(jsDirectGetPre) + jsConsumerInfoPreB = []byte(JSApiConsumerInfoPre) ) // processServiceImport is an internal callback when a subscription matches an imported service @@ -4181,12 +4182,16 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt } } + var checkJS, checkConsumerInfo bool + acc.mu.RLock() - var checkJS bool shouldReturn := si.invalid || acc.sl == nil if !shouldReturn && !isResponse && si.to == jsAllAPI { if bytes.HasPrefix(c.pa.subject, jsDirectGetPreB) || bytes.HasPrefix(c.pa.subject, jsRequestNextPreB) { checkJS = true + } else if len(c.pa.psi) == 0 && bytes.HasPrefix(c.pa.subject, jsConsumerInfoPreB) { + // Only check if we are clustered and expecting a reply. + checkConsumerInfo = len(c.pa.reply) > 0 && c.srv.JetStreamIsClustered() } } siAcc := si.acc @@ -4200,6 +4205,15 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt return } + // Here we will do a fast check for consumer info only to check if it does not exists. This will spread the + // load to all servers with connected clients since service imports are processed at point of entry. + // Only call for clustered setups. + if checkConsumerInfo && si.se != nil && si.se.acc == c.srv.SystemAccount() { + if c.srv.jsConsumerProcessMissing(c, acc) { + return + } + } + var nrr []byte var rsi *serviceImport diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 6ad152e471b..d60c93283ec 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -158,8 +158,9 @@ const ( // JSApiConsumerInfo is for obtaining general information about a consumer. // Will return JSON response. - JSApiConsumerInfo = "$JS.API.CONSUMER.INFO.*.*" - JSApiConsumerInfoT = "$JS.API.CONSUMER.INFO.%s.%s" + JSApiConsumerInfoPre = "$JS.API.CONSUMER.INFO." + JSApiConsumerInfo = "$JS.API.CONSUMER.INFO.*.*" + JSApiConsumerInfoT = "$JS.API.CONSUMER.INFO.%s.%s" // JSApiConsumerDelete is the endpoint to delete consumers. // Will return JSON response. @@ -972,6 +973,15 @@ func (s *Server) sendAPIErrResponse(ci *ClientInfo, acc *Account, subject, reply s.sendJetStreamAPIAuditAdvisory(ci, acc, subject, request, response) } +// Use the account acc to send actual result from non-system account. +func (s *Server) sendAPIErrResponseFromAccount(ci *ClientInfo, acc *Account, subject, reply, request, response string) { + acc.trackAPIErr() + if reply != _EMPTY_ { + s.sendInternalAccountMsg(acc, reply, response) + } + s.sendJetStreamAPIAuditAdvisory(ci, acc, subject, request, response) +} + const errRespDelay = 500 * time.Millisecond func (s *Server) sendDelayedAPIErrResponse(ci *ClientInfo, acc *Account, subject, reply, request, response string, rg *raftGroup) { @@ -4233,6 +4243,55 @@ func (s *Server) jsConsumerListRequest(sub *subscription, c *client, _ *Account, s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp)) } +// This will be a quick check on point of entry for a consumer that does +// not exist. If that is the case we will return the response and return +// true which will shortcut the service import to alleviate pressure on +// the JS API queues. +func (s *Server) jsConsumerProcessMissing(c *client, acc *Account) bool { + subject := bytesToString(c.pa.subject) + streamName, consumerName := streamNameFromSubject(subject), consumerNameFromSubject(subject) + + // Check to make sure the consumer is assigned. + // All JS servers will have the meta information. + js, cc := s.getJetStreamCluster() + if js == nil || cc == nil { + return false + } + js.mu.RLock() + sa, ca := js.assignments(acc.Name, streamName, consumerName) + js.mu.RUnlock() + + // If we have a consumer assignment return false here and let normally processing takeover. + if ca != nil { + return false + } + + // We can't find the consumer, so mimic what would be the errors below. + var resp = JSApiConsumerInfoResponse{ApiResponse: ApiResponse{Type: JSApiConsumerInfoResponseType}} + + // Need to make subject and reply real here for queued response processing. + subject = string(c.pa.subject) + reply := string(c.pa.reply) + + ci := c.getClientInfo(true) + + if hasJS, doErr := acc.checkJetStream(); !hasJS { + if doErr { + resp.Error = NewJSNotEnabledForAccountError() + s.sendAPIErrResponseFromAccount(ci, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp)) + } + } else if sa == nil { + resp.Error = NewJSStreamNotFoundError() + s.sendAPIErrResponseFromAccount(ci, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp)) + } else { + // If we are here the consumer is not present. + resp.Error = NewJSConsumerNotFoundError() + s.sendAPIErrResponseFromAccount(ci, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp)) + } + + return true +} + // Request for information about an consumer. func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) { if c == nil || !s.JetStreamEnabled() { diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 222ae8d744b..3cda1f77990 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -4717,6 +4717,7 @@ func (js *jetStream) consumerAssignment(account, stream, consumer string) *consu } // Return both the stream and consumer assignments. +// Lock should be held. func (js *jetStream) assignments(account, stream, consumer string) (*streamAssignment, *consumerAssignment) { if sa := js.streamAssignment(account, stream); sa != nil { return sa, sa.consumers[consumer] diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index 3621089eeec..562bbfa5915 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -3412,7 +3412,6 @@ func TestJetStreamClusterExtendedAccountInfo(t *testing.T) { // Go client will lag so use direct for now. getAccountInfo := func() *nats.AccountInfo { t.Helper() - info, err := js.AccountInfo() if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -3437,10 +3436,13 @@ func TestJetStreamClusterExtendedAccountInfo(t *testing.T) { js.ConsumerInfo("TEST-2", "NO-CONSUMER") js.ConsumerInfo("TEST-3", "NO-CONSUMER") - ai = getAccountInfo() - if ai.API.Errors != 4 { - t.Fatalf("Expected 4 API calls to be errors, got %d", ai.API.Errors) - } + checkFor(t, 2*time.Second, 250*time.Millisecond, func() error { + ai = getAccountInfo() + if ai.API.Errors != 4 { + return fmt.Errorf("Expected 4 API calls to be errors, got %d", ai.API.Errors) + } + return nil + }) } func TestJetStreamClusterPeerRemovalAPI(t *testing.T) { @@ -4319,7 +4321,8 @@ func TestJetStreamClusterNoQuorumStepdown(t *testing.T) { if err := js.DeleteConsumer("NO-Q", "dlc"); !notAvailableErr(err) { t.Fatalf("Expected an 'unavailable' error, got %v", err) } - if _, err := js.ConsumerInfo("NO-Q", "dlc"); !notAvailableErr(err) { + // Since we did not create the consumer our bypass will respond from the local server. + if _, err := js.ConsumerInfo("NO-Q", "dlc"); err != nats.ErrConsumerNotFound { t.Fatalf("Expected an 'unavailable' error, got %v", err) } // Listers diff --git a/server/norace_test.go b/server/norace_test.go index 198bfcccb08..2d4e94e612f 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -11257,3 +11257,60 @@ func TestNoRaceJetStreamClusterLargeMetaSnapshotTiming(t *testing.T) { require_NoError(t, n.InstallSnapshot(snap)) t.Logf("Took %v to snap meta with size of %v\n", time.Since(start), friendlyBytes(len(snap))) } + +func TestNoRaceJetStreamClusterInfoOnMissingConsumers(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3F", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + // Create a stream just so the consumer info processing misses on the consumer only. + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 3, + }) + require_NoError(t, err) + + done := make(chan bool) + pending := make(chan int, 1) + + // Check to make sure we never have any pending on the API queue. + go func() { + ml := c.leader() + for { + select { + case <-done: + return + case <-time.After(100 * time.Millisecond): + qlen := ml.jsAPIRoutedReqs.len() + int(ml.jsAPIRoutedReqs.inProgress()) + if qlen > 0 { + pending <- qlen + return + } + } + } + }() + + wg := sync.WaitGroup{} + wg.Add(500) + for i := 0; i < 500; i++ { + go func() { + defer wg.Done() + s := c.randomServer() + nc, js := jsClientConnect(t, s) + defer nc.Close() + // Check for non-existent consumers. + for c := 0; c < 1000; c++ { + _, err := js.ConsumerInfo("TEST", fmt.Sprintf("C-%d", c)) + require_Error(t, err) + } + }() + } + wg.Wait() + close(done) + if len(pending) > 0 { + t.Fatalf("Saw API pending of %d, expected always 0", <-pending) + } +} From 6f3ab2f100963e6c01a00b88c564893d9255048f Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Tue, 26 Nov 2024 06:54:36 -0800 Subject: [PATCH 09/10] Add logging for large metalayer snapshots duration Signed-off-by: Waldemar Quevedo --- server/jetstream_cluster.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 3cda1f77990..d3bb8f58f8b 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1529,9 +1529,12 @@ func (js *jetStream) clusterStreamConfig(accName, streamName string) (StreamConf } func (js *jetStream) metaSnapshot() []byte { + start := time.Now() js.mu.RLock() + s := js.srv cc := js.cluster nsa := 0 + nca := 0 for _, asa := range cc.streams { nsa += len(asa) } @@ -1553,6 +1556,7 @@ func (js *jetStream) metaSnapshot() []byte { continue } wsa.Consumers = append(wsa.Consumers, ca) + nca++ } streams = append(streams, wsa) } @@ -1563,10 +1567,15 @@ func (js *jetStream) metaSnapshot() []byte { return nil } + mstart := time.Now() b, _ := json.Marshal(streams) js.mu.RUnlock() - - return s2.Encode(nil, b) + snap := s2.Encode(nil, b) + if took := time.Since(start); took > time.Second { + s.rateLimitFormatWarnf("Metalayer snapshot took %.3fs (streams: %d, consumers: %d, marshal: %.3fs, uncompressed: %d, compressed: %d)", + took.Seconds(), nsa, nca, time.Since(mstart).Seconds(), len(b), len(snap)) + } + return snap } func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecovering bool) error { From 97a4010fdb5978ad81b2fc99f0de74c19ad84446 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Tue, 26 Nov 2024 16:41:05 +0000 Subject: [PATCH 10/10] Improve granuality of meta snapshot measurements Signed-off-by: Neil Twigg --- server/jetstream_cluster.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index d3bb8f58f8b..680909d5b90 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1567,13 +1567,21 @@ func (js *jetStream) metaSnapshot() []byte { return nil } + // Track how long it took to marshal the JSON mstart := time.Now() b, _ := json.Marshal(streams) + mend := time.Since(mstart) + js.mu.RUnlock() + + // Track how long it took to compress the JSON + cstart := time.Now() snap := s2.Encode(nil, b) + cend := time.Since(cstart) + if took := time.Since(start); took > time.Second { - s.rateLimitFormatWarnf("Metalayer snapshot took %.3fs (streams: %d, consumers: %d, marshal: %.3fs, uncompressed: %d, compressed: %d)", - took.Seconds(), nsa, nca, time.Since(mstart).Seconds(), len(b), len(snap)) + s.rateLimitFormatWarnf("Metalayer snapshot took %.3fs (streams: %d, consumers: %d, marshal: %.3fs, s2: %.3fs, uncompressed: %d, compressed: %d)", + took.Seconds(), nsa, nca, mend.Seconds(), cend.Seconds(), len(b), len(snap)) } return snap }