Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
1661e30
Use dios gate on readfull
wallyqs Feb 2, 2024
e6f28e5
[FIX] [authcallout] when using authcallout with operator mode, the au…
aricart Feb 5, 2024
e2fc859
test: update certs for TestTLSClientCertificateCNBasedAuth
wallyqs Feb 3, 2024
0447a6f
test: update certs for TestTLSRoutesCertificateCNBasedAuth
wallyqs Feb 3, 2024
2fb30ed
[ADDED] MQTT: test/bench using external client
levb Feb 4, 2024
71c1ca0
Add in a JetStream domain elected advisory for when we have a new met…
derekcollison Feb 5, 2024
a636f64
Add in cluster name to advisory
derekcollison Feb 7, 2024
244f22d
FIXED: MQTT retained message consumer creation (#5048)
levb Feb 7, 2024
10e0a01
[FIXED] KeyValue not found after server restarts (#5054)
derekcollison Feb 9, 2024
584312b
Add test to check FS State Messages drifting after applying limits
wallyqs Feb 9, 2024
94fe77d
Make jsa limit checks consistent and check both pre proposal for clus…
derekcollison Feb 10, 2024
23b54aa
Add workqueue test using discard new policy
wallyqs Feb 10, 2024
88de9c3
When a stream is replicated and interest policy and has max msgs or m…
derekcollison Feb 11, 2024
649c9a1
Update dependencies.
derekcollison Feb 12, 2024
4cddcac
Support building for linux/ppc64le arch
bruth Feb 11, 2024
c29cb5b
When the server is run with GOMEMLIMIT, make sure to honor that for d…
derekcollison Feb 13, 2024
e0ebe8e
Fix cfg access data race
derekcollison Feb 13, 2024
a635526
Fix c.acc access data race
derekcollison Feb 13, 2024
a2d3055
Make sure to properly handle filtered purge with a consumer with a wi…
derekcollison Feb 13, 2024
fcf39ab
Bump Go version to v1.21.7
wallyqs Feb 13, 2024
9157fe1
Fixed flapping TestMQTTDecodeRetainedMessage
levb Feb 14, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 44 additions & 14 deletions .github/workflows/MQTT_test.yaml
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
name: MQTT Compliance
name: MQTTEx
on: [push, pull_request]

permissions:
pull-requests: write # to comment on PRs
contents: write # to comment on commits (to upload artifacts?)

jobs:
test:
strategy:
matrix:
go: ["1.21"]
env:
GOPATH: /home/runner/work/nats-server
GO111MODULE: "on"

runs-on: ubuntu-latest
steps:
- name: Checkout code
Expand All @@ -20,18 +20,48 @@ jobs:
- name: Setup Go
uses: actions/setup-go@v5
with:
go-version: ${{matrix.go}}
go-version-file: src/github.com/nats-io/nats-server/go.mod
cache-dependency-path: src/github.com/nats-io/nats-server/go.sum

- name: Install deps
shell: bash --noprofile --norc -x -eo pipefail {0}
- name: Set up testing tools and environment
shell: bash --noprofile --norc -eo pipefail {0}
id: setup
run: |
wget https://github.com/hivemq/mqtt-cli/releases/download/v4.20.0/mqtt-cli-4.20.0.deb
sudo apt install ./mqtt-cli-4.20.0.deb
go install github.com/ConnectEverything/mqtt-test@latest

# - name: Download benchmark result for ${{ github.base_ref || github.ref_name }}
# uses: dawidd6/action-download-artifact@v2
# continue-on-error: true
# with:
# path: src/github.com/nats-io/nats-server/bench
# name: bench-output-${{ runner.os }}
# branch: ${{ github.base_ref || github.ref }}

- name: Run tests
shell: bash --noprofile --norc -x -eo pipefail {0}
- name: Run tests and benchmarks
shell: bash --noprofile --norc -eo pipefail {0}
run: |
set -e
cd src/github.com/nats-io/nats-server/server
go test -v -vet=off --run=TestMQTTCLICompliance
set +e
cd src/github.com/nats-io/nats-server
go test -v --run='MQTTEx' ./server
# go test --run='-' --count=10 --bench 'MQTT_' ./server | tee output.txt
# go test --run='-' --count=10 --bench 'MQTTEx' --timeout=20m --benchtime=100x ./server | tee -a output.txt
go test --run='-' --count=3 --bench 'MQTTEx' --benchtime=100x ./server

# - name: Compare benchmarks
# uses: benchmark-action/github-action-benchmark@v1
# with:
# tool: go
# output-file-path: src/github.com/nats-io/nats-server/output.txt
# github-token: ${{ secrets.GITHUB_TOKEN }}
# alert-threshold: 140%
# comment-on-alert: true
# # fail-on-alert: true
# external-data-json-path: src/github.com/nats-io/nats-server/bench/benchmark-data.json

# - name: Store benchmark result for ${{ github.ref_name }}
# if: always()
# uses: actions/upload-artifact@v3
# with:
# path: src/github.com/nats-io/nats-server/bench
# name: bench-output-${{ runner.os }}
1 change: 1 addition & 0 deletions .goreleaser.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ builds:
- 386
- mips64le
- s390x
- ppc64le
goarm:
- 6
- 7
Expand Down
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ language: go
go:
# This should be quoted or use .x, but should not be unquoted.
# Remember that a YAML bare float drops trailing zeroes.
- "1.21.6"
- "1.21.7"
- "1.20.13"

go_import_path: github.com/nats-io/nats-server
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ module github.com/nats-io/nats-server/v2
go 1.20

require (
github.com/klauspost/compress v1.17.5
github.com/klauspost/compress v1.17.6
github.com/minio/highwayhash v1.0.2
github.com/nats-io/jwt/v2 v2.5.3
github.com/nats-io/nats.go v1.32.0
github.com/nats-io/nkeys v0.4.7
github.com/nats-io/nuid v1.0.1
go.uber.org/automaxprocs v1.5.3
golang.org/x/crypto v0.18.0
golang.org/x/sys v0.16.0
golang.org/x/crypto v0.19.0
golang.org/x/sys v0.17.0
golang.org/x/time v0.5.0
)
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/klauspost/compress v1.17.5 h1:d4vBd+7CHydUqpFBgUEKkSdtSugf9YFmSkvUYPquI5E=
github.com/klauspost/compress v1.17.5/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
github.com/klauspost/compress v1.17.6 h1:60eq2E/jlfwQXtvZEeBUYADs+BwKBWURIY+Gj2eRGjI=
github.com/klauspost/compress v1.17.6/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.5.3 h1:/9SWvzc6hTfamcgXJ3uYRpgj+QuY2aLNqRiqrKcrpEo=
Expand All @@ -18,9 +20,13 @@ go.uber.org/automaxprocs v1.5.3 h1:kWazyxZUrS3Gs4qUpbwo5kEIMGe/DAvi5Z4tl2NW4j8=
go.uber.org/automaxprocs v1.5.3/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0=
golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc=
golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg=
golang.org/x/crypto v0.19.0 h1:ENy+Az/9Y1vSrlrvBSyna3PITt4tiZLf7sgCjZBX7Wo=
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
6 changes: 6 additions & 0 deletions server/auth_callout.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,30 +241,35 @@ func (s *Server) processClientOrLeafCallout(c *client, opts *Options) (authorize

arc, err := decodeResponse(rc, rmsg, racc)
if err != nil {
c.authViolation()
respCh <- titleCase(err.Error())
return
}
vr := jwt.CreateValidationResults()
arc.Validate(vr)
if len(vr.Issues) > 0 {
c.authViolation()
respCh <- fmt.Sprintf("Error validating user JWT: %v", vr.Issues[0])
return
}

// Make sure that the user is what we requested.
if arc.Subject != pub {
c.authViolation()
respCh <- fmt.Sprintf("Expected authorized user of %q but got %q on account %q", pub, arc.Subject, racc.Name)
return
}

expiration, allowedConnTypes, err := getExpirationAndAllowedConnections(arc, racc.Name)
if err != nil {
c.authViolation()
respCh <- titleCase(err.Error())
return
}

targetAcc, err := assignAccountAndPermissions(arc, racc.Name)
if err != nil {
c.authViolation()
respCh <- titleCase(err.Error())
return
}
Expand All @@ -280,6 +285,7 @@ func (s *Server) processClientOrLeafCallout(c *client, opts *Options) (authorize
// Build internal user and bind to the targeted account.
nkuser := buildInternalNkeyUser(arc, allowedConnTypes, targetAcc)
if err := c.RegisterNkeyUser(nkuser); err != nil {
c.authViolation()
respCh <- fmt.Sprintf("Could not register auth callout user: %v", err)
return
}
Expand Down
116 changes: 115 additions & 1 deletion server/auth_callout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -820,7 +820,6 @@ func testAuthCalloutScopedUser(t *testing.T, allowAnyAccount bool) {
// Send the signing key token. This should switch us to the test account, but the user
// is signed with the account signing key
nc := ac.Connect(nats.UserCredentials(creds), nats.Token(scopedToken))
require_NoError(t, err)

resp, err = nc.Request(userDirectInfoSubj, nil, time.Second)
require_NoError(t, err)
Expand Down Expand Up @@ -1713,3 +1712,118 @@ func TestAuthCalloutWSClientTLSCerts(t *testing.T) {
require_Equal(t, userInfo.UserID, "dlc")
require_Equal(t, userInfo.Account, "FOO")
}

func testConfClientClose(t *testing.T, respondNil bool) {
conf := `
listen: "127.0.0.1:-1"
server_name: ZZ
accounts {
AUTH { users [ {user: "auth", password: "pwd"} ] }
}
authorization {
timeout: 1s
auth_callout {
# Needs to be a public account nkey, will work for both server config and operator mode.
issuer: "ABJHLOVMPA4CI6R5KLNGOB4GSLNIY7IOUPAJC4YFNDLQVIOBYQGUWVLA"
account: AUTH
auth_users: [ auth ]
}
}
`
handler := func(m *nats.Msg) {
user, si, _, _, _ := decodeAuthRequest(t, m.Data)
if respondNil {
m.Respond(nil)
} else {
m.Respond(serviceResponse(t, user, si.ID, "", "not today", 0))
}
}

at := NewAuthTest(t, conf, handler, nats.UserInfo("auth", "pwd"))
defer at.Cleanup()

// This one will use callout since not defined in server config.
_, err := at.NewClient(nats.UserInfo("a", "x"))
require_Error(t, err)
require_True(t, strings.Contains(strings.ToLower(err.Error()), nats.AUTHORIZATION_ERR))
}

func TestAuthCallout_ClientAuthErrorConf(t *testing.T) {
testConfClientClose(t, true)
testConfClientClose(t, false)
}

func testAuthCall_ClientAuthErrorOperatorMode(t *testing.T, respondNil bool) {
_, spub := createKey(t)
sysClaim := jwt.NewAccountClaims(spub)
sysClaim.Name = "$SYS"
sysJwt, err := sysClaim.Encode(oKp)
require_NoError(t, err)

// AUTH service account.
akp, err := nkeys.FromSeed([]byte(authCalloutIssuerSeed))
require_NoError(t, err)

apub, err := akp.PublicKey()
require_NoError(t, err)

// The authorized user for the service.
upub, creds := createAuthServiceUser(t, akp)
defer removeFile(t, creds)

authClaim := jwt.NewAccountClaims(apub)
authClaim.Name = "AUTH"
authClaim.EnableExternalAuthorization(upub)
authClaim.Authorization.AllowedAccounts.Add("*")

// the scope for the bearer token which has no permissions
sentinelScope, authKP := newScopedRole(t, "sentinel", nil, nil, false)
sentinelScope.Template.Sub.Deny.Add(">")
sentinelScope.Template.Pub.Deny.Add(">")
sentinelScope.Template.Limits.Subs = 0
sentinelScope.Template.Payload = 0
authClaim.SigningKeys.AddScopedSigner(sentinelScope)

authJwt, err := authClaim.Encode(oKp)
require_NoError(t, err)

conf := fmt.Sprintf(`
listen: 127.0.0.1:-1
operator: %s
system_account: %s
resolver: MEM
resolver_preload: {
%s: %s
%s: %s
}
`, ojwt, spub, apub, authJwt, spub, sysJwt)

handler := func(m *nats.Msg) {
user, si, _, _, _ := decodeAuthRequest(t, m.Data)
if respondNil {
m.Respond(nil)
} else {
m.Respond(serviceResponse(t, user, si.ID, "", "not today", 0))
}
}

ac := NewAuthTest(t, conf, handler, nats.UserCredentials(creds))
defer ac.Cleanup()

// Bearer token - this has no permissions see sentinelScope
// This is used by all users, and the customization will be in other connect args.
// This needs to also be bound to the authorization account.
creds = createScopedUser(t, akp, authKP)
defer removeFile(t, creds)

// Send the signing key token. This should switch us to the test account, but the user
// is signed with the account signing key
_, err = ac.NewClient(nats.UserCredentials(creds))
require_Error(t, err)
require_True(t, strings.Contains(strings.ToLower(err.Error()), nats.AUTHORIZATION_ERR))
}

func TestAuthCallout_ClientAuthErrorOperatorMode(t *testing.T) {
testAuthCall_ClientAuthErrorOperatorMode(t, true)
testAuthCall_ClientAuthErrorOperatorMode(t, false)
}
4 changes: 4 additions & 0 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -854,8 +854,12 @@ func (c *client) applyAccountLimits() {
}
}
}

c.acc.mu.RLock()
minLimit(&c.mpay, c.acc.mpay)
minLimit(&c.msubs, c.acc.msubs)
c.acc.mu.RUnlock()

s := c.srv
opts := s.getOpts()
mPay := opts.MaxPayload
Expand Down
32 changes: 28 additions & 4 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4901,17 +4901,26 @@ func (o *consumer) hasNoLocalInterest() bool {

// This is when the underlying stream has been purged.
// sseq is the new first seq for the stream after purge.
// Lock should be held.
func (o *consumer) purge(sseq uint64, slseq uint64) {
// Lock should NOT be held.
func (o *consumer) purge(sseq uint64, slseq uint64, isWider bool) {
// Do not update our state unless we know we are the leader.
if !o.isLeader() {
return
}
// Signals all have been purged for this consumer.
if sseq == 0 {
if sseq == 0 && !isWider {
sseq = slseq + 1
}

var store StreamStore
if isWider {
o.mu.RLock()
if o.mset != nil {
store = o.mset.store
}
o.mu.RUnlock()
}

o.mu.Lock()
// Do not go backwards
if o.sseq < sseq {
Expand All @@ -4920,7 +4929,6 @@ func (o *consumer) purge(sseq uint64, slseq uint64) {

if o.asflr < sseq {
o.asflr = sseq - 1

// We need to remove those no longer relevant from pending.
for seq, p := range o.pending {
if seq <= o.asflr {
Expand All @@ -4934,8 +4942,24 @@ func (o *consumer) purge(sseq uint64, slseq uint64) {
delete(o.rdc, seq)
// rdq handled below.
}
if isWider && store != nil {
// Our filtered subject, which could be all, is wider than the underlying purge.
// We need to check if the pending items left are still valid.
var smv StoreMsg
if _, err := store.LoadMsg(seq, &smv); err == errDeletedMsg || err == ErrStoreMsgNotFound {
if p.Sequence > o.adflr {
o.adflr = p.Sequence
if o.adflr > o.dseq {
o.dseq = o.adflr
}
}
delete(o.pending, seq)
delete(o.rdc, seq)
}
}
}
}

// This means we can reset everything at this point.
if len(o.pending) == 0 {
o.pending, o.rdc = nil, nil
Expand Down
2 changes: 2 additions & 0 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -5553,7 +5553,9 @@ func (mb *msgBlock) loadBlock(buf []byte) ([]byte, error) {
buf = buf[:sz]
}

<-dios
n, err := io.ReadFull(f, buf)
dios <- struct{}{}
// On success capture raw bytes size.
if err == nil {
mb.rbytes = uint64(n)
Expand Down
Loading