diff --git a/.travis.yml b/.travis.yml index 1c62c7788be..cb32ceb1e0f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -8,8 +8,8 @@ language: go go: # This should be quoted or use .x, but should not be unquoted. # Remember that a YAML bare float drops trailing zeroes. - - "1.22.5" - - "1.21.12" + - "1.22.6" + - "1.21.13" go_import_path: github.com/nats-io/nats-server @@ -47,7 +47,7 @@ jobs: - name: "Run all tests from all other packages" env: TEST_SUITE=non_srv_pkg_tests - name: "Compile with older Go release" - go: "1.20" + go: "1.21.x" env: TEST_SUITE=build_only script: ./scripts/runTestsOnTravis.sh $TEST_SUITE diff --git a/go.mod b/go.mod index dd1f4cb832d..0e6c6627f21 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/nats-io/nats-server/v2 -go 1.21 +go 1.21.0 require ( github.com/klauspost/compress v1.17.9 @@ -11,6 +11,6 @@ require ( github.com/nats-io/nuid v1.0.1 go.uber.org/automaxprocs v1.5.3 golang.org/x/crypto v0.25.0 - golang.org/x/sys v0.22.0 - golang.org/x/time v0.5.0 + golang.org/x/sys v0.23.0 + golang.org/x/time v0.6.0 ) diff --git a/go.sum b/go.sum index 273dd845241..c7e25f5acf1 100644 --- a/go.sum +++ b/go.sum @@ -25,7 +25,11 @@ golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/ golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.23.0 h1:YfKFowiIMvtgl1UERQoTPPToxltDeZfbj4H7dVUCwmM= +golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +golang.org/x/time v0.6.0 h1:eTDhh4ZXt5Qf0augr54TN6suAUudPcawVZeIAPU7D4U= +golang.org/x/time v0.6.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/server/accounts.go b/server/accounts.go index 4b24903a0d1..5e2609c158a 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -1478,11 +1478,13 @@ func (a *Account) addServiceImportWithClaim(destination *Account, from, to strin } // Check if this introduces a cycle before proceeding. - if err := a.serviceImportFormsCycle(destination, from); err != nil { - return err + // From will be the mapped subject. + // If the 'to' has a wildcard make sure we pre-transform the 'from' before we check for cycles, e.g. '$1' + fromT := from + if subjectHasWildcard(to) { + fromT, _ = transformUntokenize(from) } - - if err := a.serviceImportFormsCycle(destination, to); err != nil { + if err := a.serviceImportFormsCycle(destination, fromT); err != nil { return err } @@ -1807,7 +1809,7 @@ func (a *Account) _checkForReverseEntry(reply string, si *serviceImport, checkIn // Note that if we are here reply has to be a literal subject. if checkInterest { // If interest still exists we can not clean these up yet. - if rr := a.sl.Match(reply); len(rr.psubs)+len(rr.qsubs) > 0 { + if a.sl.HasInterest(reply) { a.mu.RUnlock() return } @@ -1925,6 +1927,7 @@ func (a *Account) addServiceImport(dest *Account, from, to string, claim *jwt.Im tr *subjectTransform err error ) + if subjectHasWildcard(to) { // If to and from match, then we use the published subject. if to == from { @@ -3774,7 +3777,7 @@ func fetchAccount(res AccountResolver, name string) (string, error) { if !nkeys.IsValidPublicAccountKey(name) { return _EMPTY_, fmt.Errorf("will only fetch valid account keys") } - return res.Fetch(name) + return res.Fetch(copyString(name)) } // AccountResolver interface. This is to fetch Account JWTs by public nkeys diff --git a/server/auth.go b/server/auth.go index 716ecbfb4db..bdaf73350cf 100644 --- a/server/auth.go +++ b/server/auth.go @@ -978,12 +978,12 @@ func (s *Server) processClientOrLeafAuthentication(c *client, opts *Options) (au deniedSub := []string{} for _, sub := range denyAllJs { if c.perms.pub.deny != nil { - if r := c.perms.pub.deny.Match(sub); len(r.psubs)+len(r.qsubs) > 0 { + if c.perms.pub.deny.HasInterest(sub) { deniedPub = append(deniedPub, sub) } } if c.perms.sub.deny != nil { - if r := c.perms.sub.deny.Match(sub); len(r.psubs)+len(r.qsubs) > 0 { + if c.perms.sub.deny.HasInterest(sub) { deniedSub = append(deniedSub, sub) } } diff --git a/server/client.go b/server/client.go index e68d70fa947..3c48a0ae9e2 100644 --- a/server/client.go +++ b/server/client.go @@ -312,6 +312,8 @@ type outbound struct { cw *s2.Writer } +const nbMaxVectorSize = 1024 // == IOV_MAX on Linux/Darwin and most other Unices (except Solaris/AIX) + const nbPoolSizeSmall = 512 // Underlying array size of small buffer const nbPoolSizeMedium = 4096 // Underlying array size of medium buffer const nbPoolSizeLarge = 65536 // Underlying array size of large buffer @@ -1611,7 +1613,7 @@ func (c *client) flushOutbound() bool { // referenced in c.out.nb (which can be modified in queueOutboud() while // the lock is released). c.out.wnb = append(c.out.wnb, collapsed...) - var _orig [1024][]byte + var _orig [nbMaxVectorSize][]byte orig := append(_orig[:0], c.out.wnb...) // Since WriteTo is lopping things off the beginning, we need to remember @@ -1622,13 +1624,31 @@ func (c *client) flushOutbound() bool { // flush here start := time.Now() - // FIXME(dlc) - writev will do multiple IOs past 1024 on - // most platforms, need to account for that with deadline? - nc.SetWriteDeadline(start.Add(wdl)) - - // Actual write to the socket. - n, err := c.out.wnb.WriteTo(nc) - nc.SetWriteDeadline(time.Time{}) + var n int64 // Total bytes written + var wn int64 // Bytes written per loop + var err error // Error from last write, if any + for len(c.out.wnb) > 0 { + // Limit the number of vectors to no more than nbMaxVectorSize, + // which if 1024, will mean a maximum of 64MB in one go. + wnb := c.out.wnb + if len(wnb) > nbMaxVectorSize { + wnb = wnb[:nbMaxVectorSize] + } + consumed := len(wnb) + + // Actual write to the socket. + nc.SetWriteDeadline(start.Add(wdl)) + wn, err = wnb.WriteTo(nc) + nc.SetWriteDeadline(time.Time{}) + + // Update accounting, move wnb slice onwards if needed, or stop + // if a write error was reported that wasn't a short write. + n += wn + c.out.wnb = c.out.wnb[consumed-len(wnb):] + if err != nil && err != io.ErrShortWrite { + break + } + } lft := time.Since(start) @@ -1810,7 +1830,9 @@ func (c *client) markConnAsClosed(reason ClosedState) { if nc := c.nc; nc != nil && c.srv != nil { // TODO: May want to send events to single go routine instead // of creating a new go routine for each save. - go c.srv.saveClosedClient(c, nc, reason) + // Pass the c.subs as a reference. It may be set to nil in + // closeConnection. + go c.srv.saveClosedClient(c, nc, c.subs, reason) } } // If writeLoop exists, let it do the final flush, close and teardown. @@ -3964,7 +3986,7 @@ func (c *client) subForReply(reply []byte) *subscription { func (c *client) handleGWReplyMap(msg []byte) bool { // Check for leaf nodes if c.srv.gwLeafSubs.Count() > 0 { - if r := c.srv.gwLeafSubs.Match(string(c.pa.subject)); len(r.psubs) > 0 { + if r := c.srv.gwLeafSubs.MatchBytes(c.pa.subject); len(r.psubs) > 0 { c.processMsgResults(c.acc, r, msg, c.pa.deliver, c.pa.subject, c.pa.reply, pmrNoFlag) } } @@ -5284,6 +5306,14 @@ func (c *client) closeConnection(reason ClosedState) { } } + // Now that we are done with subscriptions, clear the field so that the + // connection can be released and gc'ed. + if kind == CLIENT || kind == LEAF { + c.mu.Lock() + c.subs = nil + c.mu.Unlock() + } + // Don't reconnect connections that have been marked with // the no reconnect flag. if noReconnect { @@ -5441,14 +5471,14 @@ func (c *client) getAccAndResultFromCache() (*Account, *SublistResult) { } } else { // Match correct account and sublist. - if acc, _ = c.srv.LookupAccount(string(c.pa.account)); acc == nil { + if acc, _ = c.srv.LookupAccount(bytesToString(c.pa.account)); acc == nil { return nil, nil } } sl := acc.sl // Match against the account sublist. - r = sl.Match(string(c.pa.subject)) + r = sl.MatchBytes(c.pa.subject) // Check if we need to prune. if len(c.in.pacache) >= maxPerAccountCacheSize { diff --git a/server/client_test.go b/server/client_test.go index ed42b9568cc..59e394f35fa 100644 --- a/server/client_test.go +++ b/server/client_test.go @@ -2137,7 +2137,7 @@ type testConnWritePartial struct { func (c *testConnWritePartial) Write(p []byte) (int, error) { n := len(p) if c.partial { - n = 15 + n = n/2 + 1 } return c.buf.Write(p[:n]) } @@ -2962,3 +2962,71 @@ func TestRemoveHeaderIfPrefixPresent(t *testing.T) { t.Fatalf("Expected headers to be stripped, got %q", hdr) } } + +func TestClientFlushOutboundNoSlowConsumer(t *testing.T) { + opts := DefaultOptions() + opts.MaxPending = 1024 * 1024 * 140 // 140MB + opts.MaxPayload = 1024 * 1024 * 16 // 16MB + opts.WriteDeadline = time.Second * 30 + s := RunServer(opts) + defer s.Shutdown() + + nc := natsConnect(t, fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)) + defer nc.Close() + + proxy := newNetProxy(0, 1024*1024*8, 1024*1024*8, s.ClientURL()) // 8MB/s + defer proxy.stop() + + wait := make(chan error) + + nca, err := nats.Connect(proxy.clientURL()) + require_NoError(t, err) + nca.SetDisconnectErrHandler(func(c *nats.Conn, err error) { + wait <- err + close(wait) + }) + + ncb, err := nats.Connect(s.ClientURL()) + require_NoError(t, err) + + _, err = nca.Subscribe("test", func(msg *nats.Msg) { + wait <- nil + }) + require_NoError(t, err) + + // Publish 128MB of data onto the test subject. This will + // mean that the outbound queue for nca has more than 64MB, + // which is the max we will send into a single writev call. + payload := make([]byte, 1024*1024*16) // 16MB + for i := 0; i < 8; i++ { + require_NoError(t, ncb.Publish("test", payload)) + } + + // Get the client ID for nca. + cid, err := nca.GetClientID() + require_NoError(t, err) + + // Check that the client queue has more than 64MB queued + // up in it. + s.mu.RLock() + ca := s.clients[cid] + s.mu.RUnlock() + ca.mu.Lock() + pba := ca.out.pb + ca.mu.Unlock() + require_True(t, pba > 1024*1024*64) + + // Wait for our messages to be delivered. This will take + // a few seconds as the client is limited to 8MB/s, so it + // can't deliver messages to us as quickly as the other + // client can publish them. + var msgs int + for err := range wait { + require_NoError(t, err) + msgs++ + if msgs == 8 { + break + } + } + require_Equal(t, msgs, 8) +} diff --git a/server/consumer.go b/server/consumer.go index 6eb0b4745ba..a2693470cf2 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -1012,8 +1012,8 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri // Check if we are running only 1 replica and that the delivery subject has interest. // Check in place here for interest. Will setup properly in setLeader. if config.replicas(&mset.cfg) == 1 { - r := o.acc.sl.Match(o.cfg.DeliverSubject) - if !o.hasDeliveryInterest(len(r.psubs)+len(r.qsubs) > 0) { + interest := o.acc.sl.HasInterest(o.cfg.DeliverSubject) + if !o.hasDeliveryInterest(interest) { // Let the interest come to us eventually, but setup delete timer. o.updateDeliveryInterest(false) } @@ -1508,7 +1508,7 @@ func (s *Server) hasGatewayInterest(account, subject string) bool { gw.RLock() defer gw.RUnlock() for _, gwc := range gw.outo { - psi, qr := gwc.gatewayInterest(account, subject) + psi, qr := gwc.gatewayInterest(account, stringToBytes(subject)) if psi || qr != nil { return true } @@ -1802,8 +1802,7 @@ func (acc *Account) checkNewConsumerConfig(cfg, ncfg *ConsumerConfig) error { if ncfg.DeliverSubject == _EMPTY_ { return errors.New("can not update push consumer to pull based") } - rr := acc.sl.Match(cfg.DeliverSubject) - if len(rr.psubs)+len(rr.qsubs) != 0 { + if acc.sl.HasInterest(cfg.DeliverSubject) { return NewJSConsumerNameExistError() } } @@ -3221,8 +3220,7 @@ func (o *consumer) nextWaiting(sz int) *waitingRequest { } if wr.expires.IsZero() || time.Now().Before(wr.expires) { - rr := wr.acc.sl.Match(wr.interest) - if len(rr.psubs)+len(rr.qsubs) > 0 { + if wr.acc.sl.HasInterest(wr.interest) { return o.waiting.pop() } else if time.Since(wr.received) < defaultGatewayRecentSubExpiration && (o.srv.leafNodeEnabled || o.srv.gateway.enabled) { return o.waiting.pop() @@ -3649,8 +3647,7 @@ func (o *consumer) processWaiting(eos bool) (int, int, int, time.Time) { continue } // Now check interest. - rr := wr.acc.sl.Match(wr.interest) - interest := len(rr.psubs)+len(rr.qsubs) > 0 + interest := wr.acc.sl.HasInterest(wr.interest) if !interest && (s.leafNodeEnabled || s.gateway.enabled) { // If we are here check on gateways and leaf nodes (as they can mask gateways on the other end). // If we have interest or the request is too young break and do not expire. @@ -4954,9 +4951,9 @@ func (o *consumer) isActive() bool { // hasNoLocalInterest return true if we have no local interest. func (o *consumer) hasNoLocalInterest() bool { o.mu.RLock() - rr := o.acc.sl.Match(o.cfg.DeliverSubject) + interest := o.acc.sl.HasInterest(o.cfg.DeliverSubject) o.mu.RUnlock() - return len(rr.psubs)+len(rr.qsubs) == 0 + return !interest } // This is when the underlying stream has been purged. @@ -5320,13 +5317,13 @@ func (o *consumer) switchToEphemeral() { o.mu.Lock() o.cfg.Durable = _EMPTY_ store, ok := o.store.(*consumerFileStore) - rr := o.acc.sl.Match(o.cfg.DeliverSubject) + interest := o.acc.sl.HasInterest(o.cfg.DeliverSubject) // Setup dthresh. o.updateInactiveThreshold(&o.cfg) o.mu.Unlock() // Update interest - o.updateDeliveryInterest(len(rr.psubs)+len(rr.qsubs) > 0) + o.updateDeliveryInterest(interest) // Write out new config if ok { store.updateConfig(o.cfg) diff --git a/server/errors.go b/server/errors.go index 8efa7ac02ea..c096bbef92b 100644 --- a/server/errors.go +++ b/server/errors.go @@ -180,6 +180,9 @@ var ( // ErrClusterNameRemoteConflict signals that a remote server has a different cluster name. ErrClusterNameRemoteConflict = errors.New("cluster name from remote server conflicts") + // ErrClusterNameHasSpaces signals that the cluster name contains spaces, which is not allowed. + ErrClusterNameHasSpaces = errors.New("cluster name cannot contain spaces or new lines") + // ErrMalformedSubject is returned when a subscription is made with a subject that does not conform to subject rules. ErrMalformedSubject = errors.New("malformed subject") diff --git a/server/filestore.go b/server/filestore.go index 975e312ed89..a1a54fb3e0b 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -27,6 +27,7 @@ import ( "fmt" "hash" "io" + "io/fs" "math" "net" "os" @@ -765,9 +766,7 @@ func (fs *fileStore) setupAEK() error { if _, err := os.Stat(keyFile); err != nil && !os.IsNotExist(err) { return err } - <-dios - err = os.WriteFile(keyFile, encrypted, defaultFilePerms) - dios <- struct{}{} + err = fs.writeFileWithOptionalSync(keyFile, encrypted, defaultFilePerms) if err != nil { return err } @@ -803,9 +802,7 @@ func (fs *fileStore) writeStreamMeta() error { b = fs.aek.Seal(nonce, nonce, b, nil) } - <-dios - err = os.WriteFile(meta, b, defaultFilePerms) - dios <- struct{}{} + err = fs.writeFileWithOptionalSync(meta, b, defaultFilePerms) if err != nil { return err } @@ -813,9 +810,7 @@ func (fs *fileStore) writeStreamMeta() error { fs.hh.Write(b) checksum := hex.EncodeToString(fs.hh.Sum(nil)) sum := filepath.Join(fs.fcfg.StoreDir, JetStreamMetaFileSum) - <-dios - err = os.WriteFile(sum, []byte(checksum), defaultFilePerms) - dios <- struct{}{} + err = fs.writeFileWithOptionalSync(sum, []byte(checksum), defaultFilePerms) if err != nil { return err } @@ -1206,9 +1201,7 @@ func (mb *msgBlock) convertCipher() error { // the old keyfile back. if err := fs.genEncryptionKeysForBlock(mb); err != nil { keyFile := filepath.Join(mdir, fmt.Sprintf(keyScan, mb.index)) - <-dios - os.WriteFile(keyFile, ekey, defaultFilePerms) - dios <- struct{}{} + fs.writeFileWithOptionalSync(keyFile, ekey, defaultFilePerms) return err } mb.bek.XORKeyStream(buf, buf) @@ -3288,9 +3281,7 @@ func (fs *fileStore) genEncryptionKeysForBlock(mb *msgBlock) error { if _, err := os.Stat(keyFile); err != nil && !os.IsNotExist(err) { return err } - <-dios - err = os.WriteFile(keyFile, encrypted, defaultFilePerms) - dios <- struct{}{} + err = fs.writeFileWithOptionalSync(keyFile, encrypted, defaultFilePerms) if err != nil { return err } @@ -8582,9 +8573,7 @@ func (fs *fileStore) ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerSt if err != nil { return nil, err } - <-dios - err = os.WriteFile(o.ifn, state, defaultFilePerms) - dios <- struct{}{} + err = fs.writeFileWithOptionalSync(o.ifn, state, defaultFilePerms) if err != nil { if didCreate { os.RemoveAll(odir) @@ -9058,9 +9047,7 @@ func (o *consumerFileStore) writeState(buf []byte) error { o.mu.Unlock() // Lock not held here but we do limit number of outstanding calls that could block OS threads. - <-dios - err := os.WriteFile(ifn, buf, defaultFilePerms) - dios <- struct{}{} + err := o.fs.writeFileWithOptionalSync(ifn, buf, defaultFilePerms) o.mu.Lock() if err != nil { @@ -9099,9 +9086,7 @@ func (cfs *consumerFileStore) writeConsumerMeta() error { if _, err := os.Stat(keyFile); err != nil && !os.IsNotExist(err) { return err } - <-dios - err = os.WriteFile(keyFile, encrypted, defaultFilePerms) - dios <- struct{}{} + err = cfs.fs.writeFileWithOptionalSync(keyFile, encrypted, defaultFilePerms) if err != nil { return err } @@ -9122,9 +9107,7 @@ func (cfs *consumerFileStore) writeConsumerMeta() error { b = cfs.aek.Seal(nonce, nonce, b, nil) } - <-dios - err = os.WriteFile(meta, b, defaultFilePerms) - dios <- struct{}{} + err = cfs.fs.writeFileWithOptionalSync(meta, b, defaultFilePerms) if err != nil { return err } @@ -9133,9 +9116,7 @@ func (cfs *consumerFileStore) writeConsumerMeta() error { checksum := hex.EncodeToString(cfs.hh.Sum(nil)) sum := filepath.Join(cfs.odir, JetStreamMetaFileSum) - <-dios - err = os.WriteFile(sum, []byte(checksum), defaultFilePerms) - dios <- struct{}{} + err = cfs.fs.writeFileWithOptionalSync(sum, []byte(checksum), defaultFilePerms) if err != nil { return err } @@ -9430,9 +9411,7 @@ func (o *consumerFileStore) Stop() error { if len(buf) > 0 { o.waitOnFlusher() - <-dios - err = os.WriteFile(ifn, buf, defaultFilePerms) - dios <- struct{}{} + err = o.fs.writeFileWithOptionalSync(ifn, buf, defaultFilePerms) } return err } @@ -9666,3 +9645,26 @@ func (alg StoreCompression) Decompress(buf []byte) ([]byte, error) { return output, reader.Close() } + +// writeFileWithOptionalSync is equivalent to os.WriteFile() but optionally +// sets O_SYNC on the open file if SyncAlways is set. The dios semaphore is +// handled automatically by this function, so don't wrap calls to it in dios. +func (fs *fileStore) writeFileWithOptionalSync(name string, data []byte, perm fs.FileMode) error { + <-dios + defer func() { + dios <- struct{}{} + }() + flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC + if fs.fcfg.SyncAlways { + flags |= os.O_SYNC + } + f, err := os.OpenFile(name, flags, perm) + if err != nil { + return err + } + if _, err = f.Write(data); err != nil { + _ = f.Close() + return err + } + return f.Close() +} diff --git a/server/filestore_test.go b/server/filestore_test.go index a03d63e00ab..7e74368abe8 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -7653,3 +7653,29 @@ func Benchmark_FileStoreLoadNextMsgVerySparseMsgsLargeTail(b *testing.B) { require_Error(b, err, ErrStoreEOF) } } + +func Benchmark_FileStoreCreateConsumerStores(b *testing.B) { + for _, syncAlways := range []bool{true, false} { + b.Run(fmt.Sprintf("%v", syncAlways), func(b *testing.B) { + fs, err := newFileStore( + FileStoreConfig{StoreDir: b.TempDir(), SyncAlways: syncAlways}, + StreamConfig{Name: "zzz", Subjects: []string{"foo.*.*"}, Storage: FileStorage}) + require_NoError(b, err) + defer fs.Stop() + + oconfig := ConsumerConfig{ + DeliverSubject: "d", + FilterSubject: "foo", + AckPolicy: AckAll, + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + oname := fmt.Sprintf("obs22_%d", i) + ofs, err := fs.ConsumerStore(oname, &oconfig) + require_NoError(b, err) + require_NoError(b, ofs.Stop()) + } + }) + } +} diff --git a/server/gateway.go b/server/gateway.go index 6c46eea889c..1f462175504 100644 --- a/server/gateway.go +++ b/server/gateway.go @@ -2131,7 +2131,7 @@ func (c *client) processGatewayRSub(arg []byte) error { // for queue subscriptions. // -func (c *client) gatewayInterest(acc, subj string) (bool, *SublistResult) { +func (c *client) gatewayInterest(acc string, subj []byte) (bool, *SublistResult) { ei, accountInMap := c.gw.outsim.Load(acc) // If there is an entry for this account and ei is nil, // it means that the remote is not interested at all in @@ -2152,14 +2152,14 @@ func (c *client) gatewayInterest(acc, subj string) (bool, *SublistResult) { // but until e.ni is nil, use it to know if we // should suppress interest or not. if !c.gw.interestOnlyMode && e.ni != nil { - if _, inMap := e.ni[subj]; !inMap { + if _, inMap := e.ni[string(subj)]; !inMap { psi = true } } // If we are in modeInterestOnly (e.ni will be nil) // or if we have queue subs, we also need to check sl.Match. if e.ni == nil || e.qsubs > 0 { - r = e.sl.Match(subj) + r = e.sl.MatchBytes(subj) if len(r.psubs) > 0 { psi = true } @@ -2482,7 +2482,7 @@ func (g *srvGateway) shouldMapReplyForGatewaySend(acc *Account, reply []byte) bo } sl := sli.(*Sublist) if sl.Count() > 0 { - if r := sl.Match(string(reply)); len(r.psubs)+len(r.qsubs) > 0 { + if sl.HasInterest(string(reply)) { return true } } @@ -2568,7 +2568,7 @@ func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgr } } else { // Plain sub interest and queue sub results for this account/subject - psi, qr := gwc.gatewayInterest(accName, string(subject)) + psi, qr := gwc.gatewayInterest(accName, subject) if !psi && qr == nil { continue } diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index 92c88bf12c9..5f5e726c676 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -2003,3 +2003,360 @@ func TestJetStreamClusterWQRoundRobinSubjectRetention(t *testing.T) { require_Equal(t, si.State.NumDeleted, 20) require_Equal(t, si.State.NumSubjects, 4) } + +type captureLeafClusterSpacesLogger struct { + DummyLogger + ch chan string + warnCh chan string +} + +func (l *captureLeafClusterSpacesLogger) Warnf(format string, args ...any) { + msg := fmt.Sprintf(format, args...) + if strings.Contains(msg, `Server name has spaces and used as the cluster name, leaf remotes may not connect properly`) { + select { + case l.warnCh <- msg: + default: + } + } +} + +func (l *captureLeafClusterSpacesLogger) Errorf(format string, args ...any) { + msg := fmt.Sprintf(format, args...) + if strings.Contains(msg, `Leafnode Error 'cluster name cannot contain spaces or new lines'`) { + select { + case l.ch <- msg: + default: + } + } +} + +func TestJetStreamClusterAndNamesWithSpaces(t *testing.T) { + gwConf := ` + listen: 127.0.0.1:-1 + http: 127.0.0.1:-1 + server_name: 'SRV %s' + jetstream: { + store_dir: '%s', + } + cluster { + name: '%s' + listen: 127.0.0.1:%d + routes = [%s] + } + server_tags: ["test"] + system_account: sys + no_auth_user: js + + leafnodes { + host: "127.0.0.1" + port: -1 + } + + accounts { + sys { + users = [ { user: sys, pass: sys } ] } + js { + jetstream: enabled + users = [ { user: js, pass: js } ] + } + } + ` + c := createJetStreamClusterAndModHook(t, gwConf, "S P A C E 1", "GW_1_", 3, 15022, false, + func(serverName, clusterName, storeDir, conf string) string { + conf += ` + gateway { + name: "S P A C E 1" + listen: 127.0.0.1:-1 + } + ` + return conf + }) + defer c.shutdown() + + c2 := createJetStreamClusterAndModHook(t, gwConf, "S P A C E 2", "GW_2_", 3, 16022, false, + func(serverName, clusterName, storeDir, conf string) string { + conf += fmt.Sprintf(` + gateway { + name: "S P A C E 2" + listen: 127.0.0.1:-1 + gateways: [{ + name: "S P A C E 1" + url: "nats://127.0.0.1:%d" + }] + } + `, c.servers[0].opts.Gateway.Port) + return conf + }) + defer c2.shutdown() + + c3 := createJetStreamClusterAndModHook(t, gwConf, "S P A C E 3", "GW_3_", 3, 17022, false, + func(serverName, clusterName, storeDir, conf string) string { + conf += fmt.Sprintf(` + gateway { + name: "S P A C E 3" + listen: 127.0.0.1:-1 + gateways: [{ + name: "S P A C E 1" + url: "nats://127.0.0.1:%d" + }] + } + `, c.servers[0].opts.Gateway.Port) + return conf + }) + defer c3.shutdown() + + for _, s := range c2.servers { + waitForOutboundGateways(t, s, 2, 2*time.Second) + } + for _, s := range c3.servers { + waitForOutboundGateways(t, s, 2, 2*time.Second) + } + + // Leaf with spaces in name which becomes its cluster name as well. + leafConfA := ` + host: "127.0.0.1" + port: -1 + + server_name: "L E A F S P A C E" + + leafnodes { + host: "127.0.0.1" + port: -1 + advertise: "127.0.0.1" + remotes: [ { + url: "nats://127.0.0.1:%d" + } ] + } + ` + leafConfA = fmt.Sprintf(leafConfA, c.servers[0].opts.LeafNode.Port) + sconfA := createConfFile(t, []byte(leafConfA)) + oA := LoadConfig(sconfA) + leafA, err := NewServer(oA) + require_NoError(t, err) + lA := &captureLeafClusterSpacesLogger{ch: make(chan string, 10), warnCh: make(chan string, 10)} + leafA.SetLogger(lA, false, false) + leafA.Start() + defer leafA.Shutdown() + + // Leaf with spaces in name but with a valid cluster name is able to connect. + leafConfB := ` + host: "127.0.0.1" + port: -1 + http: 127.0.0.1:-1 + + server_name: "L E A F 2" + cluster { name: "LEAF" } + + leafnodes { + host: "127.0.0.1" + port: -1 + advertise: "127.0.0.1" + remotes: [ { + url: "nats://127.0.0.1:%d" + } ] + } + ` + leafConfB = fmt.Sprintf(leafConfB, c.servers[0].opts.LeafNode.Port) + sconfB := createConfFile(t, []byte(leafConfB)) + leafB, _ := RunServerWithConfig(sconfB) + lB := &captureLeafClusterSpacesLogger{ch: make(chan string, 10)} + leafB.SetLogger(lB, false, false) + defer leafB.Shutdown() + + // Leaf with valid server name but cluster name with spaces. + leafConfC := ` + host: "127.0.0.1" + port: -1 + http: 127.0.0.1:-1 + + server_name: "LEAF3" + cluster { name: "L E A F 3" } + + leafnodes { + host: "127.0.0.1" + port: -1 + advertise: "127.0.0.1" + remotes: [ { + url: "nats://127.0.0.1:%d" + } ] + } + ` + leafConfC = fmt.Sprintf(leafConfC, c.servers[0].opts.LeafNode.Port) + sconfC := createConfFile(t, []byte(leafConfC)) + leafC, _ := RunServerWithConfig(sconfC) + lC := &captureLeafClusterSpacesLogger{ch: make(chan string, 10)} + leafC.SetLogger(lC, false, false) + defer leafC.Shutdown() + + // Leafs with valid server name but using protocol special characters in cluster name. + leafConfD := ` + host: "127.0.0.1" + port: -1 + http: 127.0.0.1:-1 + + server_name: "LEAF4" + cluster { name: "LEAF +4" } + + leafnodes { + host: "127.0.0.1" + port: -1 + advertise: "127.0.0.1" + remotes: [ { + url: "nats://127.0.0.1:%d" + } ] + } + ` + leafConfD = fmt.Sprintf(leafConfD, c.servers[0].opts.LeafNode.Port) + sconfD := createConfFile(t, []byte(leafConfD)) + leafD, _ := RunServerWithConfig(sconfD) + lD := &captureLeafClusterSpacesLogger{ch: make(chan string, 10)} + leafD.SetLogger(lD, false, false) + defer leafD.Shutdown() + + leafConfD2 := ` + host: "127.0.0.1" + port: -1 + http: 127.0.0.1:-1 + + server_name: "LEAF42" + cluster { name: "LEAF4\r2" } + + leafnodes { + host: "127.0.0.1" + port: -1 + advertise: "127.0.0.1" + remotes: [ { + url: "nats://127.0.0.1:%d" + } ] + } + ` + leafConfD2 = fmt.Sprintf(leafConfD2, c.servers[0].opts.LeafNode.Port) + sconfD2 := createConfFile(t, []byte(leafConfD2)) + leafD2, _ := RunServerWithConfig(sconfD2) + lD2 := &captureLeafClusterSpacesLogger{ch: make(chan string, 10)} + leafD2.SetLogger(lD2, false, false) + defer leafD2.Shutdown() + + leafConfD3 := ` + host: "127.0.0.1" + port: -1 + http: 127.0.0.1:-1 + + server_name: "LEAF43" + cluster { name: "LEAF4\t3" } + + leafnodes { + host: "127.0.0.1" + port: -1 + advertise: "127.0.0.1" + remotes: [ { + url: "nats://127.0.0.1:%d" + } ] + } + ` + leafConfD3 = fmt.Sprintf(leafConfD3, c.servers[0].opts.LeafNode.Port) + sconfD3 := createConfFile(t, []byte(leafConfD3)) + leafD3, _ := RunServerWithConfig(sconfD3) + lD3 := &captureLeafClusterSpacesLogger{ch: make(chan string, 10)} + leafD3.SetLogger(lD3, false, false) + defer leafD3.Shutdown() + + // Leaf with valid configuration should be able to connect to GW cluster with spaces in names. + leafConfE := ` + host: "127.0.0.1" + port: -1 + http: 127.0.0.1:-1 + + server_name: "LEAF5" + cluster { name: "LEAF5" } + + leafnodes { + host: "127.0.0.1" + port: -1 + advertise: "127.0.0.1" + remotes: [ { + url: "nats://127.0.0.1:%d" + } ] + } + ` + leafConfE = fmt.Sprintf(leafConfE, c.servers[0].opts.LeafNode.Port) + sconfE := createConfFile(t, []byte(leafConfE)) + leafE, _ := RunServerWithConfig(sconfE) + lE := &captureLeafClusterSpacesLogger{ch: make(chan string, 10)} + leafE.SetLogger(lE, false, false) + defer leafE.Shutdown() + + // Finally do a smoke test of connectivity among gateways and that JS is working + // when using clusters with spaces still. + nc1, js1 := jsClientConnect(t, c.servers[1]) + defer nc1.Close() + + _, err = js1.AddStream(&nats.StreamConfig{ + Name: "foo", + Subjects: []string{"foo"}, + }) + require_NoError(t, err) + c.waitOnStreamLeader("js", "foo") + + sub1, err := nc1.SubscribeSync("foo") + require_NoError(t, err) + nc1.Flush() + + // Check that invalid configs got the errors. + select { + case <-lA.ch: + case <-time.After(5 * time.Second): + t.Errorf("Timed out waiting for error") + } + select { + case <-lC.ch: + case <-time.After(5 * time.Second): + t.Errorf("Timed out waiting for error") + } + select { + case <-lD.ch: + case <-time.After(5 * time.Second): + t.Errorf("Timed out waiting for error") + } + select { + case <-lD2.ch: + case <-time.After(5 * time.Second): + t.Errorf("Timed out waiting for error") + } + select { + case <-lD3.ch: + case <-time.After(5 * time.Second): + t.Errorf("Timed out waiting for error") + } + + // Check that we got a warning about the server name being reused + // for the cluster name. + select { + case <-lA.warnCh: + case <-time.After(5 * time.Second): + t.Errorf("Timed out waiting for warning") + } + + // Check that valid configs were ok still. + select { + case <-lB.ch: + t.Errorf("Unexpected error from valid leafnode config") + case <-lE.ch: + t.Errorf("Unexpected error from valid leafnode config") + case <-time.After(2 * time.Second): + } + + nc2, js2 := jsClientConnect(t, c2.servers[1]) + defer nc2.Close() + nc2.Publish("foo", []byte("test")) + nc2.Flush() + time.Sleep(250 * time.Millisecond) + + msg, err := sub1.NextMsg(1 * time.Second) + require_NoError(t, err) + require_Equal(t, "test", string(msg.Data)) + sinfo, err := js2.StreamInfo("foo") + require_NoError(t, err) + require_Equal(t, sinfo.State.Msgs, 1) +} diff --git a/server/leafnode.go b/server/leafnode.go index 3c20cbdf437..3e66909c9c5 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -34,6 +34,7 @@ import ( "sync" "sync/atomic" "time" + "unicode" "github.com/klauspost/compress/s2" "github.com/nats-io/jwt/v2" @@ -1764,6 +1765,13 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro return err } + // Reject a cluster that contains spaces or line breaks. + if proto.Cluster != _EMPTY_ && strings.ContainsFunc(proto.Cluster, unicode.IsSpace) { + c.sendErrAndErr(ErrClusterNameHasSpaces.Error()) + c.closeConnection(ProtocolViolation) + return ErrClusterNameHasSpaces + } + // Check for cluster name collisions. if cn := s.cachedClusterName(); cn != _EMPTY_ && proto.Cluster != _EMPTY_ && proto.Cluster == cn { c.sendErrAndErr(ErrLeafNodeHasSameClusterName.Error()) diff --git a/server/monitor.go b/server/monitor.go index e3d020e0f46..8396555d1b1 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -2750,8 +2750,9 @@ type HealthzOptions struct { // ProfilezOptions are options passed to Profilez type ProfilezOptions struct { - Name string `json:"name"` - Debug int `json:"debug"` + Name string `json:"name"` + Debug int `json:"debug"` + Duration time.Duration `json:"duration,omitempty"` } // StreamDetail shows information about the stream state and its consumers. @@ -3735,21 +3736,36 @@ type ProfilezStatus struct { } func (s *Server) profilez(opts *ProfilezOptions) *ProfilezStatus { - if opts.Name == _EMPTY_ { + var buffer bytes.Buffer + switch opts.Name { + case _EMPTY_: return &ProfilezStatus{ Error: "Profile name not specified", } - } - profile := pprof.Lookup(opts.Name) - if profile == nil { - return &ProfilezStatus{ - Error: fmt.Sprintf("Profile %q not found", opts.Name), + case "cpu": + if opts.Duration <= 0 || opts.Duration > 15*time.Second { + return &ProfilezStatus{ + Error: fmt.Sprintf("Duration %s should be between 0s and 15s", opts.Duration), + } } - } - var buffer bytes.Buffer - if err := profile.WriteTo(&buffer, opts.Debug); err != nil { - return &ProfilezStatus{ - Error: fmt.Sprintf("Profile %q error: %s", opts.Name, err), + if err := pprof.StartCPUProfile(&buffer); err != nil { + return &ProfilezStatus{ + Error: fmt.Sprintf("Failed to start CPU profile: %s", err), + } + } + time.Sleep(opts.Duration) + pprof.StopCPUProfile() + default: + profile := pprof.Lookup(opts.Name) + if profile == nil { + return &ProfilezStatus{ + Error: fmt.Sprintf("Profile %q not found", opts.Name), + } + } + if err := profile.WriteTo(&buffer, opts.Debug); err != nil { + return &ProfilezStatus{ + Error: fmt.Sprintf("Profile %q error: %s", opts.Name, err), + } } } return &ProfilezStatus{ diff --git a/server/norace_test.go b/server/norace_test.go index d91caec773b..e4d118261d4 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -10016,6 +10016,7 @@ func TestNoRaceConnectionObjectReleased(t *testing.T) { // Start an independent MQTT server to check MQTT client connection. mo := testMQTTDefaultOptions() + mo.ServerName = "MQTTServer" sm := testMQTTRunServer(t, mo) defer testMQTTShutdownServer(sm) @@ -10028,6 +10029,7 @@ func TestNoRaceConnectionObjectReleased(t *testing.T) { cid, err := nc.GetClientID() require_NoError(t, err) natsSubSync(t, nc, "foo") + natsFlush(t, nc) ncWS := natsConnect(t, fmt.Sprintf("ws://a:pwd@127.0.0.1:%d", oa.Websocket.Port)) defer ncWS.Close() diff --git a/server/routes_test.go b/server/routes_test.go index acb626cd469..95338fabdca 100644 --- a/server/routes_test.go +++ b/server/routes_test.go @@ -4160,6 +4160,110 @@ func TestRouteNoLeakOnSlowConsumer(t *testing.T) { } } +func TestRouteNoAppSubLeakOnSlowConsumer(t *testing.T) { + o1 := DefaultOptions() + o1.MaxPending = 1024 + o1.MaxPayload = int32(o1.MaxPending) + s1 := RunServer(o1) + defer s1.Shutdown() + + o2 := DefaultOptions() + o2.MaxPending = 1024 + o2.MaxPayload = int32(o2.MaxPending) + o2.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", o1.Cluster.Port)) + s2 := RunServer(o2) + defer s2.Shutdown() + + checkClusterFormed(t, s1, s2) + + checkSub := func(expected bool) { + t.Helper() + checkFor(t, 2*time.Second, 50*time.Millisecond, func() error { + subsz, err := s1.Subsz(&SubszOptions{Subscriptions: true, Account: globalAccountName}) + require_NoError(t, err) + for _, sub := range subsz.Subs { + if sub.Subject == "foo" { + if expected { + return nil + } + return fmt.Errorf("Subscription should not have been found: %+v", sub) + } + } + if expected { + return fmt.Errorf("Subscription on `foo` not found") + } + return nil + }) + } + + checkRoutedSub := func(expected bool) { + t.Helper() + checkFor(t, 2*time.Second, 50*time.Millisecond, func() error { + routez, err := s2.Routez(&RoutezOptions{Subscriptions: true}) + require_NoError(t, err) + for _, route := range routez.Routes { + if route.Account != _EMPTY_ { + continue + } + if len(route.Subs) == 1 && route.Subs[0] == "foo" { + if expected { + return nil + } + return fmt.Errorf("Subscription should not have been found: %+v", route.Subs) + } + } + if expected { + return fmt.Errorf("Did not find `foo` subscription") + } + return nil + }) + } + + checkClosed := func(cid uint64) { + t.Helper() + checkFor(t, 2*time.Second, 50*time.Millisecond, func() error { + connz, err := s1.Connz(&ConnzOptions{State: ConnClosed, CID: cid, Subscriptions: true}) + if err != nil { + return err + } + require_Len(t, len(connz.Conns), 1) + conn := connz.Conns[0] + require_Equal(t, conn.Reason, SlowConsumerPendingBytes.String()) + subs := conn.Subs + require_Len(t, len(subs), 1) + require_Equal[string](t, subs[0], "foo") + return nil + }) + } + + for i := 0; i < 5; i++ { + nc := natsConnect(t, s1.ClientURL()) + defer nc.Close() + + natsSubSync(t, nc, "foo") + natsFlush(t, nc) + + checkSub(true) + checkRoutedSub(true) + + cid, err := nc.GetClientID() + require_NoError(t, err) + c := s1.getClient(cid) + payload := make([]byte, 2048) + c.mu.Lock() + c.queueOutbound([]byte(fmt.Sprintf("MSG foo 1 2048\r\n%s\r\n", payload))) + closed := c.isClosed() + c.mu.Unlock() + + require_True(t, closed) + checkSub(false) + checkRoutedSub(false) + checkClosed(cid) + + nc.Close() + } +} + func TestRouteNoLeakOnAuthTimeout(t *testing.T) { opts := DefaultOptions() opts.Cluster.Username = "foo" diff --git a/server/server.go b/server/server.go index cc3130ebe5a..58be465a5c9 100644 --- a/server/server.go +++ b/server/server.go @@ -27,8 +27,10 @@ import ( "math/rand" "net" "net/http" + "net/url" "regexp" "runtime/pprof" + "unicode" // Allow dynamic profiling. _ "net/http/pprof" @@ -953,11 +955,13 @@ func (s *Server) serverName() string { func (s *Server) ClientURL() string { // FIXME(dlc) - should we add in user and pass if defined single? opts := s.getOpts() - scheme := "nats://" + var u url.URL + u.Scheme = "nats" if opts.TLSConfig != nil { - scheme = "tls://" + u.Scheme = "tls" } - return fmt.Sprintf("%s%s:%d", scheme, opts.Host, opts.Port) + u.Host = net.JoinHostPort(opts.Host, fmt.Sprintf("%d", opts.Port)) + return u.String() } func validateCluster(o *Options) error { @@ -2348,6 +2352,9 @@ func (s *Server) Start() { // Solicit remote servers for leaf node connections. if len(opts.LeafNode.Remotes) > 0 { s.solicitLeafNodeRemotes(opts.LeafNode.Remotes) + if opts.Cluster.Name == opts.ServerName && strings.ContainsFunc(opts.Cluster.Name, unicode.IsSpace) { + s.Warnf("Server name has spaces and used as the cluster name, leaf remotes may not connect properly") + } } // TODO (ik): I wanted to refactor this by starting the client @@ -3294,7 +3301,7 @@ func (s *Server) createClientEx(conn net.Conn, inProcess bool) *client { // This will save off a closed client in a ring buffer such that // /connz can inspect. Useful for debugging, etc. -func (s *Server) saveClosedClient(c *client, nc net.Conn, reason ClosedState) { +func (s *Server) saveClosedClient(c *client, nc net.Conn, subs map[string]*subscription, reason ClosedState) { now := time.Now() s.accountDisconnectEvent(c, now, reason.String()) @@ -3303,17 +3310,18 @@ func (s *Server) saveClosedClient(c *client, nc net.Conn, reason ClosedState) { cc := &closedClient{} cc.fill(c, nc, now, false) + // Note that cc.fill is using len(c.subs), which may have been set to nil by now, + // so replace cc.NumSubs with len(subs). + cc.NumSubs = uint32(len(subs)) cc.Stop = &now cc.Reason = reason.String() // Do subs, do not place by default in main ConnInfo - if len(c.subs) > 0 { - cc.subs = make([]SubDetail, 0, len(c.subs)) - for _, sub := range c.subs { + if len(subs) > 0 { + cc.subs = make([]SubDetail, 0, len(subs)) + for _, sub := range subs { cc.subs = append(cc.subs, newSubDetail(sub)) } - // Now set this to nil to allow connection to be released. - c.subs = nil } // Hold user as well. cc.user = c.getRawAuthUser() diff --git a/server/server_test.go b/server/server_test.go index 425932cf6e4..3eb91eea8f8 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -2136,3 +2136,18 @@ func TestServerConfigLastLineComments(t *testing.T) { require_NoError(t, err) defer nc.Close() } + +func TestServerClientURL(t *testing.T) { + for host, expected := range map[string]string{ + "host.com": "nats://host.com:12345", + "1.2.3.4": "nats://1.2.3.4:12345", + "2000::1": "nats://[2000::1]:12345", + } { + o := DefaultOptions() + o.Host = host + o.Port = 12345 + s, err := NewServer(o) + require_NoError(t, err) + require_Equal(t, s.ClientURL(), expected) + } +} diff --git a/server/store.go b/server/store.go index 235452a30ff..abbc06f8998 100644 --- a/server/store.go +++ b/server/store.go @@ -765,3 +765,11 @@ func stringToBytes(s string) []byte { b := unsafe.Slice(p, len(s)) return b } + +// Forces a copy of a string, for use in the case that you might have been passed a value when bytesToString was used, +// but now you need a separate copy of it to store for longer-term use. +func copyString(s string) string { + b := make([]byte, len(s)) + copy(b, s) + return bytesToString(b) +} diff --git a/server/stree/stree.go b/server/stree/stree.go index d0835bf5d1b..d9167d94f7c 100644 --- a/server/stree/stree.go +++ b/server/stree/stree.go @@ -15,7 +15,7 @@ package stree import ( "bytes" - "sort" + "slices" ) // SubjectTree is an adaptive radix trie (ART) for storing subject information on literal subjects. @@ -382,7 +382,7 @@ func (t *SubjectTree[T]) iter(n node, pre []byte, cb func(subject []byte, val *T } } // Now sort. - sort.SliceStable(nodes, func(i, j int) bool { return bytes.Compare(nodes[i].path(), nodes[j].path()) < 0 }) + slices.SortStableFunc(nodes, func(a, b node) int { return bytes.Compare(a.path(), b.path()) }) // Now walk the nodes in order and call into next iter. for i := range nodes { if !t.iter(nodes[i], pre, cb) { diff --git a/server/stree/stree_test.go b/server/stree/stree_test.go index 7177cd6d94b..bbe26199198 100644 --- a/server/stree/stree_test.go +++ b/server/stree/stree_test.go @@ -669,6 +669,26 @@ func TestSubjectTreeMatchAllPerf(t *testing.T) { } } +func TestSubjectTreeIterPerf(t *testing.T) { + if !*runResults { + t.Skip() + } + st := NewSubjectTree[int]() + + for i := 0; i < 1_000_000; i++ { + subj := fmt.Sprintf("subj.%d.%d", rand.Intn(100)+1, i) + st.Insert(b(subj), 22) + } + + start := time.Now() + count := 0 + st.Iter(func(_ []byte, _ *int) bool { + count++ + return true + }) + t.Logf("Iter took %s and matched %d entries", time.Since(start), count) +} + func TestSubjectTreeNode48(t *testing.T) { var a, b, c leaf[int] var n node48 diff --git a/server/sublist.go b/server/sublist.go index 0000ad9f9a9..67eb88ae079 100644 --- a/server/sublist.go +++ b/server/sublist.go @@ -527,7 +527,13 @@ var emptyResult = &SublistResult{} // Match will match all entries to the literal subject. // It will return a set of results for both normal and queue subscribers. func (s *Sublist) Match(subject string) *SublistResult { - return s.match(subject, true) + return s.match(subject, true, false) +} + +// MatchBytes will match all entries to the literal subject. +// It will return a set of results for both normal and queue subscribers. +func (s *Sublist) MatchBytes(subject []byte) *SublistResult { + return s.match(bytesToString(subject), true, true) } // HasInterest will return whether or not there is any interest in the subject. @@ -537,10 +543,10 @@ func (s *Sublist) HasInterest(subject string) bool { } func (s *Sublist) matchNoLock(subject string) *SublistResult { - return s.match(subject, false) + return s.match(subject, false, false) } -func (s *Sublist) match(subject string, doLock bool) *SublistResult { +func (s *Sublist) match(subject string, doLock bool, doCopyOnCache bool) *SublistResult { atomic.AddUint64(&s.matches, 1) // Check cache first. @@ -595,6 +601,9 @@ func (s *Sublist) match(subject string, doLock bool) *SublistResult { result = emptyResult } if cacheEnabled { + if doCopyOnCache { + subject = copyString(subject) + } s.cache[subject] = result n = len(s.cache) } diff --git a/test/accounts_cycles_test.go b/test/accounts_cycles_test.go index 0b3bac30ed7..9a192bb446b 100644 --- a/test/accounts_cycles_test.go +++ b/test/accounts_cycles_test.go @@ -524,6 +524,25 @@ func TestAccountCycleWithRenaming(t *testing.T) { } } +// https://github.com/nats-io/nats-server/issues/5752 +func TestAccountCycleFalsePositiveSubjectMapping(t *testing.T) { + conf := createConfFile(t, []byte(` + accounts { + A { + exports [ { service: "a.*" } ] + imports [ { service { subject: "a.*", account: B }, to: "b.*" } ] + } + B { + exports [ { service: "a.*" } ] + imports [ { service { subject: "a.foo", account: A }, to: "c.foo" } ] + } + } + `)) + if _, err := server.ProcessConfigFile(conf); err != nil { + t.Fatalf("Expected no errors on cycle service import, got error") + } +} + func clientConnectToServer(t *testing.T, s *server.Server) *nats.Conn { t.Helper() nc, err := nats.Connect(s.ClientURL(),