Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
92f5f6a
Fix go directive version in go.mod
mauri870 Jul 9, 2024
f88e84f
Drop Go 1.20 from CI
wallyqs Jul 23, 2024
29d3db5
Ensure `ClientURL()` properly formats IPv6 literal addresses
neilalexander Jul 31, 2024
ec969e8
Use `O_SYNC` for filestore meta files when `SyncAlways` is enabled
neilalexander Jul 31, 2024
b09f243
Also use `O_SYNC` for writing consumer state as well as newly generat…
neilalexander Jul 31, 2024
c177d5e
stree: Make `Iter` faster by using generic sorting function
neilalexander Aug 1, 2024
cabb556
Use `HasInterest` in place of `Match` where `SublistResult` not needed
neilalexander Aug 1, 2024
4d48123
Optimise sublist matching for gateways, client replies
neilalexander Aug 1, 2024
dd1ca9d
Add support for collecting CPU profiles via `profilez`
neilalexander Aug 2, 2024
74e0036
Optimise `getAccAndResultFromCache` by reducing allocations
neilalexander Aug 2, 2024
bb9e05c
Dependency updates for x/sys and x/time
derekcollison Aug 5, 2024
f12eeaf
When checking for import cycles, only need to check 'from', not the '…
derekcollison Aug 5, 2024
c713e93
[FIXED] Subscription interest could remain after slow consumer error
kozlovic Aug 5, 2024
1e3fc7e
Pass c.subs as a reference to saveClosedConnection
kozlovic Aug 5, 2024
0c6517e
Reject leafnode cluster names with spaces (#5732)
wallyqs Aug 6, 2024
55e54b8
Limit vector count in client outbound flushes
neilalexander Aug 5, 2024
c9b0f22
Add `TestClientFlushOutboundNoSlowConsumer` unit test
neilalexander Aug 6, 2024
8c04528
Update Go versions
wallyqs Aug 6, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ language: go
go:
# This should be quoted or use .x, but should not be unquoted.
# Remember that a YAML bare float drops trailing zeroes.
- "1.22.5"
- "1.21.12"
- "1.22.6"
- "1.21.13"

go_import_path: github.com/nats-io/nats-server

Expand Down Expand Up @@ -47,7 +47,7 @@ jobs:
- name: "Run all tests from all other packages"
env: TEST_SUITE=non_srv_pkg_tests
- name: "Compile with older Go release"
go: "1.20"
go: "1.21.x"
env: TEST_SUITE=build_only

script: ./scripts/runTestsOnTravis.sh $TEST_SUITE
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/nats-io/nats-server/v2

go 1.21
go 1.21.0

require (
github.com/klauspost/compress v1.17.9
Expand All @@ -11,6 +11,6 @@ require (
github.com/nats-io/nuid v1.0.1
go.uber.org/automaxprocs v1.5.3
golang.org/x/crypto v0.25.0
golang.org/x/sys v0.22.0
golang.org/x/time v0.5.0
golang.org/x/sys v0.23.0
golang.org/x/time v0.6.0
)
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@ golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI=
golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.23.0 h1:YfKFowiIMvtgl1UERQoTPPToxltDeZfbj4H7dVUCwmM=
golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/time v0.6.0 h1:eTDhh4ZXt5Qf0augr54TN6suAUudPcawVZeIAPU7D4U=
golang.org/x/time v0.6.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
15 changes: 9 additions & 6 deletions server/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -1478,11 +1478,13 @@ func (a *Account) addServiceImportWithClaim(destination *Account, from, to strin
}

// Check if this introduces a cycle before proceeding.
if err := a.serviceImportFormsCycle(destination, from); err != nil {
return err
// From will be the mapped subject.
// If the 'to' has a wildcard make sure we pre-transform the 'from' before we check for cycles, e.g. '$1'
fromT := from
if subjectHasWildcard(to) {
fromT, _ = transformUntokenize(from)
}

if err := a.serviceImportFormsCycle(destination, to); err != nil {
if err := a.serviceImportFormsCycle(destination, fromT); err != nil {
return err
}

Expand Down Expand Up @@ -1807,7 +1809,7 @@ func (a *Account) _checkForReverseEntry(reply string, si *serviceImport, checkIn
// Note that if we are here reply has to be a literal subject.
if checkInterest {
// If interest still exists we can not clean these up yet.
if rr := a.sl.Match(reply); len(rr.psubs)+len(rr.qsubs) > 0 {
if a.sl.HasInterest(reply) {
a.mu.RUnlock()
return
}
Expand Down Expand Up @@ -1925,6 +1927,7 @@ func (a *Account) addServiceImport(dest *Account, from, to string, claim *jwt.Im
tr *subjectTransform
err error
)

if subjectHasWildcard(to) {
// If to and from match, then we use the published subject.
if to == from {
Expand Down Expand Up @@ -3774,7 +3777,7 @@ func fetchAccount(res AccountResolver, name string) (string, error) {
if !nkeys.IsValidPublicAccountKey(name) {
return _EMPTY_, fmt.Errorf("will only fetch valid account keys")
}
return res.Fetch(name)
return res.Fetch(copyString(name))
}

// AccountResolver interface. This is to fetch Account JWTs by public nkeys
Expand Down
4 changes: 2 additions & 2 deletions server/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -978,12 +978,12 @@ func (s *Server) processClientOrLeafAuthentication(c *client, opts *Options) (au
deniedSub := []string{}
for _, sub := range denyAllJs {
if c.perms.pub.deny != nil {
if r := c.perms.pub.deny.Match(sub); len(r.psubs)+len(r.qsubs) > 0 {
if c.perms.pub.deny.HasInterest(sub) {
deniedPub = append(deniedPub, sub)
}
}
if c.perms.sub.deny != nil {
if r := c.perms.sub.deny.Match(sub); len(r.psubs)+len(r.qsubs) > 0 {
if c.perms.sub.deny.HasInterest(sub) {
deniedSub = append(deniedSub, sub)
}
}
Expand Down
54 changes: 42 additions & 12 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,8 @@ type outbound struct {
cw *s2.Writer
}

const nbMaxVectorSize = 1024 // == IOV_MAX on Linux/Darwin and most other Unices (except Solaris/AIX)

const nbPoolSizeSmall = 512 // Underlying array size of small buffer
const nbPoolSizeMedium = 4096 // Underlying array size of medium buffer
const nbPoolSizeLarge = 65536 // Underlying array size of large buffer
Expand Down Expand Up @@ -1611,7 +1613,7 @@ func (c *client) flushOutbound() bool {
// referenced in c.out.nb (which can be modified in queueOutbound() while
// the lock is released).
c.out.wnb = append(c.out.wnb, collapsed...)
var _orig [1024][]byte
var _orig [nbMaxVectorSize][]byte
orig := append(_orig[:0], c.out.wnb...)

// Since WriteTo is lopping things off the beginning, we need to remember
Expand All @@ -1622,13 +1624,31 @@ func (c *client) flushOutbound() bool {
// flush here
start := time.Now()

// FIXME(dlc) - writev will do multiple IOs past 1024 on
// most platforms, need to account for that with deadline?
nc.SetWriteDeadline(start.Add(wdl))

// Actual write to the socket.
n, err := c.out.wnb.WriteTo(nc)
nc.SetWriteDeadline(time.Time{})
var n int64 // Total bytes written
var wn int64 // Bytes written per loop
var err error // Error from last write, if any
for len(c.out.wnb) > 0 {
// Limit the number of vectors to no more than nbMaxVectorSize,
// which if 1024, will mean a maximum of 64MB in one go.
wnb := c.out.wnb
if len(wnb) > nbMaxVectorSize {
wnb = wnb[:nbMaxVectorSize]
}
consumed := len(wnb)

// Actual write to the socket.
nc.SetWriteDeadline(start.Add(wdl))
wn, err = wnb.WriteTo(nc)
nc.SetWriteDeadline(time.Time{})

// Update accounting, move wnb slice onwards if needed, or stop
// if a write error was reported that wasn't a short write.
n += wn
c.out.wnb = c.out.wnb[consumed-len(wnb):]
if err != nil && err != io.ErrShortWrite {
break
}
}

lft := time.Since(start)

Expand Down Expand Up @@ -1810,7 +1830,9 @@ func (c *client) markConnAsClosed(reason ClosedState) {
if nc := c.nc; nc != nil && c.srv != nil {
// TODO: May want to send events to single go routine instead
// of creating a new go routine for each save.
go c.srv.saveClosedClient(c, nc, reason)
// Pass the c.subs as a reference. It may be set to nil in
// closeConnection.
go c.srv.saveClosedClient(c, nc, c.subs, reason)
}
}
// If writeLoop exists, let it do the final flush, close and teardown.
Expand Down Expand Up @@ -3964,7 +3986,7 @@ func (c *client) subForReply(reply []byte) *subscription {
func (c *client) handleGWReplyMap(msg []byte) bool {
// Check for leaf nodes
if c.srv.gwLeafSubs.Count() > 0 {
if r := c.srv.gwLeafSubs.Match(string(c.pa.subject)); len(r.psubs) > 0 {
if r := c.srv.gwLeafSubs.MatchBytes(c.pa.subject); len(r.psubs) > 0 {
c.processMsgResults(c.acc, r, msg, c.pa.deliver, c.pa.subject, c.pa.reply, pmrNoFlag)
}
}
Expand Down Expand Up @@ -5284,6 +5306,14 @@ func (c *client) closeConnection(reason ClosedState) {
}
}

// Now that we are done with subscriptions, clear the field so that the
// connection can be released and gc'ed.
if kind == CLIENT || kind == LEAF {
c.mu.Lock()
c.subs = nil
c.mu.Unlock()
}

// Don't reconnect connections that have been marked with
// the no reconnect flag.
if noReconnect {
Expand Down Expand Up @@ -5441,14 +5471,14 @@ func (c *client) getAccAndResultFromCache() (*Account, *SublistResult) {
}
} else {
// Match correct account and sublist.
if acc, _ = c.srv.LookupAccount(string(c.pa.account)); acc == nil {
if acc, _ = c.srv.LookupAccount(bytesToString(c.pa.account)); acc == nil {
return nil, nil
}
}
sl := acc.sl

// Match against the account sublist.
r = sl.Match(string(c.pa.subject))
r = sl.MatchBytes(c.pa.subject)

// Check if we need to prune.
if len(c.in.pacache) >= maxPerAccountCacheSize {
Expand Down
70 changes: 69 additions & 1 deletion server/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2137,7 +2137,7 @@ type testConnWritePartial struct {
func (c *testConnWritePartial) Write(p []byte) (int, error) {
n := len(p)
if c.partial {
n = 15
n = n/2 + 1
}
return c.buf.Write(p[:n])
}
Expand Down Expand Up @@ -2962,3 +2962,71 @@ func TestRemoveHeaderIfPrefixPresent(t *testing.T) {
t.Fatalf("Expected headers to be stripped, got %q", hdr)
}
}

// TestClientFlushOutboundNoSlowConsumer publishes 128MB of data to a
// subscriber that is connected through a proxy limited to 8MB/s, checks that
// more than 64MB is pending in the subscriber's outbound queue on the server,
// and then expects every message to be delivered without the subscriber
// connection reporting a disconnect error.
func TestClientFlushOutboundNoSlowConsumer(t *testing.T) {
	// Number of 16MB messages to publish (128MB in total).
	const numMsgs = 8

	opts := DefaultOptions()
	opts.MaxPending = 1024 * 1024 * 140 // 140MB
	opts.MaxPayload = 1024 * 1024 * 16  // 16MB
	opts.WriteDeadline = time.Second * 30
	s := RunServer(opts)
	defer s.Shutdown()

	nc := natsConnect(t, fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port))
	defer nc.Close()

	proxy := newNetProxy(0, 1024*1024*8, 1024*1024*8, s.ClientURL()) // 8MB/s
	defer proxy.stop()

	// Buffered so that neither the message callbacks nor the disconnect
	// handler can block forever once the test has stopped receiving.
	wait := make(chan error, numMsgs+1)

	nca, err := nats.Connect(proxy.clientURL())
	require_NoError(t, err)
	defer nca.Close()
	nca.SetDisconnectErrHandler(func(c *nats.Conn, err error) {
		wait <- err
		close(wait)
	})

	ncb, err := nats.Connect(s.ClientURL())
	require_NoError(t, err)
	defer ncb.Close()

	_, err = nca.Subscribe("test", func(msg *nats.Msg) {
		wait <- nil
	})
	require_NoError(t, err)
	// The subscription travels through the proxy while the publisher is
	// connected directly, so make sure the server has processed the
	// subscription before anything is published.
	require_NoError(t, nca.Flush())

	// Publish 128MB of data onto the test subject. This will
	// mean that the outbound queue for nca has more than 64MB,
	// which is the max we will send into a single writev call.
	payload := make([]byte, 1024*1024*16) // 16MB
	for i := 0; i < numMsgs; i++ {
		require_NoError(t, ncb.Publish("test", payload))
	}

	// Get the client ID for nca.
	cid, err := nca.GetClientID()
	require_NoError(t, err)

	// Check that the client queue has more than 64MB queued
	// up in it.
	s.mu.RLock()
	ca := s.clients[cid]
	s.mu.RUnlock()
	// Fail cleanly instead of panicking if the client is not registered.
	require_True(t, ca != nil)
	ca.mu.Lock()
	pba := ca.out.pb
	ca.mu.Unlock()
	require_True(t, pba > 1024*1024*64)

	// Wait for our messages to be delivered. This will take
	// a few seconds as the client is limited to 8MB/s, so it
	// can't deliver messages to us as quickly as the other
	// client can publish them.
	var msgs int
	for err := range wait {
		require_NoError(t, err)
		msgs++
		if msgs == numMsgs {
			break
		}
	}
	require_Equal(t, msgs, numMsgs)
}
23 changes: 10 additions & 13 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1012,8 +1012,8 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
// Check if we are running only 1 replica and that the delivery subject has interest.
// Check in place here for interest. Will setup properly in setLeader.
if config.replicas(&mset.cfg) == 1 {
r := o.acc.sl.Match(o.cfg.DeliverSubject)
if !o.hasDeliveryInterest(len(r.psubs)+len(r.qsubs) > 0) {
interest := o.acc.sl.HasInterest(o.cfg.DeliverSubject)
if !o.hasDeliveryInterest(interest) {
// Let the interest come to us eventually, but setup delete timer.
o.updateDeliveryInterest(false)
}
Expand Down Expand Up @@ -1508,7 +1508,7 @@ func (s *Server) hasGatewayInterest(account, subject string) bool {
gw.RLock()
defer gw.RUnlock()
for _, gwc := range gw.outo {
psi, qr := gwc.gatewayInterest(account, subject)
psi, qr := gwc.gatewayInterest(account, stringToBytes(subject))
if psi || qr != nil {
return true
}
Expand Down Expand Up @@ -1802,8 +1802,7 @@ func (acc *Account) checkNewConsumerConfig(cfg, ncfg *ConsumerConfig) error {
if ncfg.DeliverSubject == _EMPTY_ {
return errors.New("can not update push consumer to pull based")
}
rr := acc.sl.Match(cfg.DeliverSubject)
if len(rr.psubs)+len(rr.qsubs) != 0 {
if acc.sl.HasInterest(cfg.DeliverSubject) {
return NewJSConsumerNameExistError()
}
}
Expand Down Expand Up @@ -3221,8 +3220,7 @@ func (o *consumer) nextWaiting(sz int) *waitingRequest {
}

if wr.expires.IsZero() || time.Now().Before(wr.expires) {
rr := wr.acc.sl.Match(wr.interest)
if len(rr.psubs)+len(rr.qsubs) > 0 {
if wr.acc.sl.HasInterest(wr.interest) {
return o.waiting.pop()
} else if time.Since(wr.received) < defaultGatewayRecentSubExpiration && (o.srv.leafNodeEnabled || o.srv.gateway.enabled) {
return o.waiting.pop()
Expand Down Expand Up @@ -3649,8 +3647,7 @@ func (o *consumer) processWaiting(eos bool) (int, int, int, time.Time) {
continue
}
// Now check interest.
rr := wr.acc.sl.Match(wr.interest)
interest := len(rr.psubs)+len(rr.qsubs) > 0
interest := wr.acc.sl.HasInterest(wr.interest)
if !interest && (s.leafNodeEnabled || s.gateway.enabled) {
// If we are here check on gateways and leaf nodes (as they can mask gateways on the other end).
// If we have interest or the request is too young break and do not expire.
Expand Down Expand Up @@ -4954,9 +4951,9 @@ func (o *consumer) isActive() bool {
// hasNoLocalInterest return true if we have no local interest.
func (o *consumer) hasNoLocalInterest() bool {
o.mu.RLock()
rr := o.acc.sl.Match(o.cfg.DeliverSubject)
interest := o.acc.sl.HasInterest(o.cfg.DeliverSubject)
o.mu.RUnlock()
return len(rr.psubs)+len(rr.qsubs) == 0
return !interest
}

// This is when the underlying stream has been purged.
Expand Down Expand Up @@ -5320,13 +5317,13 @@ func (o *consumer) switchToEphemeral() {
o.mu.Lock()
o.cfg.Durable = _EMPTY_
store, ok := o.store.(*consumerFileStore)
rr := o.acc.sl.Match(o.cfg.DeliverSubject)
interest := o.acc.sl.HasInterest(o.cfg.DeliverSubject)
// Setup dthresh.
o.updateInactiveThreshold(&o.cfg)
o.mu.Unlock()

// Update interest
o.updateDeliveryInterest(len(rr.psubs)+len(rr.qsubs) > 0)
o.updateDeliveryInterest(interest)
// Write out new config
if ok {
store.updateConfig(o.cfg)
Expand Down
3 changes: 3 additions & 0 deletions server/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,9 @@ var (
// ErrClusterNameRemoteConflict signals that a remote server has a different cluster name.
ErrClusterNameRemoteConflict = errors.New("cluster name from remote server conflicts")

// ErrClusterNameHasSpaces signals that the cluster name contains spaces, which is not allowed.
ErrClusterNameHasSpaces = errors.New("cluster name cannot contain spaces or new lines")

// ErrMalformedSubject is returned when a subscription is made with a subject that does not conform to subject rules.
ErrMalformedSubject = errors.New("malformed subject")

Expand Down
Loading