diff --git a/.github/workflows/cov.yaml b/.github/workflows/cov.yaml index 080496e4daf..2e359b7583e 100644 --- a/.github/workflows/cov.yaml +++ b/.github/workflows/cov.yaml @@ -5,6 +5,9 @@ on: schedule: - cron: "40 4 * * *" +permissions: + contents: read + jobs: nightly_coverage: runs-on: ubuntu-latest diff --git a/.github/workflows/long-tests.yaml b/.github/workflows/long-tests.yaml index 50e62fc41c2..b5ed18534c7 100644 --- a/.github/workflows/long-tests.yaml +++ b/.github/workflows/long-tests.yaml @@ -7,6 +7,9 @@ on: schedule: - cron: "30 12 * * *" +permissions: + contents: read + concurrency: # At most one of these workflow per ref running group: ${{ github.workflow }}-${{ github.ref }} diff --git a/.github/workflows/mqtt-test.yaml b/.github/workflows/mqtt-test.yaml index 4267967b222..b693cc62fb7 100644 --- a/.github/workflows/mqtt-test.yaml +++ b/.github/workflows/mqtt-test.yaml @@ -1,6 +1,9 @@ name: MQTT External Tests on: [pull_request] +permissions: + contents: read + jobs: test: env: diff --git a/.github/workflows/nightly.yaml b/.github/workflows/nightly.yaml index c2c74d2801f..f93a6094b04 100644 --- a/.github/workflows/nightly.yaml +++ b/.github/workflows/nightly.yaml @@ -10,9 +10,14 @@ on: schedule: - cron: "40 4 * * *" +permissions: + contents: read + jobs: run: runs-on: ${{ vars.GHA_WORKER_RELEASE || 'ubuntu-latest' }} + permissions: + contents: write steps: - name: Checkout code uses: actions/checkout@v4 diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index 30f569ee3dc..eef8928c6ec 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -5,12 +5,14 @@ on: - v* permissions: - contents: write + contents: read jobs: run: name: GitHub Release runs-on: ${{ vars.GHA_WORKER_RELEASE || 'ubuntu-latest' }} + permissions: + contents: write steps: - name: Checkout code uses: actions/checkout@v4 diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 41ce9ecb3be..a9d6a574d67 100644 --- 
a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -5,6 +5,9 @@ on: pull_request: workflow_dispatch: +permissions: + contents: read + env: RACE: ${{ (github.ref != 'refs/heads/main' && !startsWith(github.ref, 'refs/heads/release/') && github.event_name != 'pull_request') && '-race' || '' }} @@ -51,11 +54,13 @@ jobs: - name: Check PR description is signed off if: github.event_name == 'pull_request' + env: + PR_DESC: ${{ github.event.pull_request.body }} run: | - if ! echo "${{ github.event.pull_request.body }}" | grep -Pq '^Signed-off-by:\s*(?!Your Name|.*)'; then - echo "::error ::Pull request has not been signed off in the PR description with a \`Signed-off-by:\` line" - exit 1 - fi + grep -Pq '^Signed-off-by:\s*(?!Your Name|.*)' <<<"$PR_DESC" || { + echo "::error ::Pull request has not been signed off in the PR description with a \`Signed-off-by:\` line" + exit 1 + } lint: name: Lint diff --git a/.goreleaser.yml b/.goreleaser.yml index 60ef70a82a3..bf3d17278d7 100644 --- a/.goreleaser.yml +++ b/.goreleaser.yml @@ -21,7 +21,7 @@ builds: env: # This is the toolchain version we use for releases. 
To override, set the env var, e.g.: # GORELEASER_TOOLCHAIN="go1.22.8" TARGET='linux_amd64' goreleaser build --snapshot --clean --single-target - - GOTOOLCHAIN={{ envOrDefault "GORELEASER_TOOLCHAIN" "go1.24.4" }} + - GOTOOLCHAIN={{ envOrDefault "GORELEASER_TOOLCHAIN" "go1.24.5" }} - GO111MODULE=on - CGO_ENABLED=0 goos: @@ -34,6 +34,7 @@ builds: - arm - arm64 - 386 + - loong64 - mips64le - s390x - ppc64le diff --git a/go.mod b/go.mod index f30b4e7e778..53b5c6a4cf4 100644 --- a/go.mod +++ b/go.mod @@ -2,7 +2,7 @@ module github.com/nats-io/nats-server/v2 go 1.23.0 -toolchain go1.23.10 +toolchain go1.23.11 require ( github.com/antithesishq/antithesis-sdk-go v0.4.3-default-no-op @@ -14,7 +14,7 @@ require ( github.com/nats-io/nkeys v0.4.11 github.com/nats-io/nuid v1.0.1 go.uber.org/automaxprocs v1.6.0 - golang.org/x/crypto v0.39.0 - golang.org/x/sys v0.33.0 + golang.org/x/crypto v0.40.0 + golang.org/x/sys v0.34.0 golang.org/x/time v0.12.0 ) diff --git a/go.sum b/go.sum index 932b7a55073..d4caee25f0a 100644 --- a/go.sum +++ b/go.sum @@ -24,11 +24,11 @@ github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMT github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs= go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8= -golang.org/x/crypto v0.39.0 h1:SHs+kF4LP+f+p14esP5jAoDpHU8Gu/v9lFRK6IT5imM= -golang.org/x/crypto v0.39.0/go.mod h1:L+Xg3Wf6HoL4Bn4238Z6ft6KfEpN0tJGo53AAPC632U= +golang.org/x/crypto v0.40.0 h1:r4x+VvoG5Fm+eJcxMaY8CQM7Lb0l1lsmjGBQ6s8BfKM= +golang.org/x/crypto v0.40.0/go.mod h1:Qr1vMER5WyS2dfPHAlsOj01wgLbsyWtFn/aY+5+ZdxY= golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw= -golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/sys v0.34.0 
h1:H5Y5sJ2L2JRdyv7ROF1he/lPdvFsd0mJHFw2ThKHxLA= +golang.org/x/sys v0.34.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/time v0.12.0 h1:ScB/8o8olJvc+CQPWrK3fPZNfh7qgwCrY0zJmoEQLSE= golang.org/x/time v0.12.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/server/auth_callout_test.go b/server/auth_callout_test.go index ad6af9f2256..7021d7c5b29 100644 --- a/server/auth_callout_test.go +++ b/server/auth_callout_test.go @@ -231,10 +231,14 @@ func TestAuthCalloutBasics(t *testing.T) { require_True(t, si.Name == "A") require_True(t, ci.Host == "127.0.0.1") // Allow dlc user. - if opts.Username == "dlc" && opts.Password == "zzz" { + if (opts.Username == "dlc" && opts.Password == "zzz") || opts.Token == "SECRET_TOKEN" { var j jwt.UserPermissionLimits j.Pub.Allow.Add("$SYS.>") j.Payload = 1024 + if opts.Token == "SECRET_TOKEN" { + // Token MUST NOT be exposed in user info. + require_Equal(t, ci.User, "[REDACTED]") + } ujwt := createAuthUser(t, user, _EMPTY_, globalAccountName, "", nil, 10*time.Minute, &j) m.Respond(serviceResponse(t, user, si.ID, ujwt, "", 0)) } else { @@ -279,6 +283,39 @@ func TestAuthCalloutBasics(t *testing.T) { if expires > 10*time.Minute || expires < (10*time.Minute-5*time.Second) { t.Fatalf("Expected expires of ~%v, got %v", 10*time.Minute, expires) } + + // Callout with a token should also work, regardless of it being redacted in the user info. + nc.Close() + nc = at.Connect(nats.Token("SECRET_TOKEN")) + defer nc.Close() + + resp, err = nc.Request(userDirectInfoSubj, nil, time.Second) + require_NoError(t, err) + response = ServerAPIResponse{Data: &UserInfo{}} + err = json.Unmarshal(resp.Data, &response) + require_NoError(t, err) + + userInfo = response.Data.(*UserInfo) + dlc = &UserInfo{ + // Token MUST NOT be exposed in user info. 
+ UserID: "[REDACTED]", + Account: globalAccountName, + Permissions: &Permissions{ + Publish: &SubjectPermission{ + Allow: []string{"$SYS.>"}, + Deny: []string{AuthCalloutSubject}, // Will be auto-added since in auth account. + }, + Subscribe: &SubjectPermission{}, + }, + } + expires = userInfo.Expires + userInfo.Expires = 0 + if !reflect.DeepEqual(dlc, userInfo) { + t.Fatalf("User info for %q did not match", "dlc") + } + if expires > 10*time.Minute || expires < (10*time.Minute-5*time.Second) { + t.Fatalf("Expected expires of ~%v, got %v", 10*time.Minute, expires) + } } func TestAuthCalloutMultiAccounts(t *testing.T) { diff --git a/server/client.go b/server/client.go index caf3f91532c..8accdb27429 100644 --- a/server/client.go +++ b/server/client.go @@ -4360,7 +4360,7 @@ func sliceHeader(key string, hdr []byte) []byte { if len(hdr) == 0 { return nil } - index := bytes.Index(hdr, stringToBytes(key)) + index := bytes.Index(hdr, stringToBytes(key+":")) hdrLen := len(hdr) // Check that we have enough characters, this will handle the -1 case of the key not // being found and will also handle not having enough characters for trailing CRLF. diff --git a/server/client_test.go b/server/client_test.go index eb59248fedc..8bd2e550167 100644 --- a/server/client_test.go +++ b/server/client_test.go @@ -3040,6 +3040,26 @@ func TestSliceHeader(t *testing.T) { require_True(t, bytes.Equal(sliced, copied)) } +func TestSliceHeaderOrdering(t *testing.T) { + hdr := []byte("NATS/1.0\r\n\r\n") + + // These headers share the same prefix, the longer subject + // must not invalidate the existence of the shorter one. 
+ hdr = genHeader(hdr, JSExpectedLastSubjSeqSubj, "foo") + hdr = genHeader(hdr, JSExpectedLastSubjSeq, "24") + + sliced := sliceHeader(JSExpectedLastSubjSeq, hdr) + copied := getHeader(JSExpectedLastSubjSeq, hdr) + + require_NotNil(t, sliced) + require_Equal(t, cap(sliced), 2) + + require_NotNil(t, copied) + require_Equal(t, cap(copied), len(copied)) + + require_True(t, bytes.Equal(sliced, copied)) +} + func TestInProcessAllowedConnectionType(t *testing.T) { tmpl := ` listen: "127.0.0.1:-1" diff --git a/server/configs/reload/defaultsentinel_1.conf b/server/configs/reload/defaultsentinel_1.conf deleted file mode 100644 index 63549ebf034..00000000000 --- a/server/configs/reload/defaultsentinel_1.conf +++ /dev/null @@ -1,16 +0,0 @@ -listen: 127.0.0.1:-1 - -// Operator "O" -operator: eyJ0eXAiOiJKV1QiLCJhbGciOiJlZDI1NTE5LW5rZXkifQ.eyJqdGkiOiJDUUsyV1c3SkhGVVhNR0g1TlFEWUlJT0UyWTRRWlZMVlFTS1ZKTVZITzZZRFBMWVg2Q1hRIiwiaWF0IjoxNzQ0NzI2MzA4LCJpc3MiOiJPRDQzNElRTVdGQTNTNzVBS1kyMlQ3UkdLRkdSSkRBNFFaU0xQRFFCRVZQVktONktBUTRTWVVMNiIsIm5hbWUiOiJPIiwic3ViIjoiT0Q0MzRJUU1XRkEzUzc1QUtZMjJUN1JHS0ZHUkpEQTRRWlNMUERRQkVWUFZLTjZLQVE0U1lVTDYiLCJuYXRzIjp7InR5cGUiOiJvcGVyYXRvciIsInZlcnNpb24iOjJ9fQ.DKuOHqWHgooC6x_vFaY66hLY5ZxoDKk2VVq71-gl_FDn2o_Amu_gQAxYvqLsNZvtlggWF-vKq944xcZmp0N1Dg - -resolver: MEMORY - -resolver_preload: { - // Account "SYS" - AB3GSO5OVHXQJYNNEMX4SGZCS3SPMNMDX4YA65M3SQX7XXHI4NFM2TF6: 
eyJ0eXAiOiJKV1QiLCJhbGciOiJlZDI1NTE5LW5rZXkifQ.eyJqdGkiOiJJSEtXN0RLTk9MRVY2T1hVUUZGQlRCN1pWM0NHNzc2M0RYVFdGNEsyU0NUVkNWUDNOSEFBIiwiaWF0IjoxNzQ0NzMwMDcyLCJpc3MiOiJPRDQzNElRTVdGQTNTNzVBS1kyMlQ3UkdLRkdSSkRBNFFaU0xQRFFCRVZQVktONktBUTRTWVVMNiIsIm5hbWUiOiJTWVMiLCJzdWIiOiJBQjNHU081T1ZIWFFKWU5ORU1YNFNHWkNTM1NQTU5NRFg0WUE2NU0zU1FYN1hYSEk0TkZNMlRGNiIsIm5hdHMiOnsibGltaXRzIjp7InN1YnMiOi0xLCJkYXRhIjotMSwicGF5bG9hZCI6LTEsImltcG9ydHMiOi0xLCJleHBvcnRzIjotMSwid2lsZGNhcmRzIjp0cnVlLCJjb25uIjotMSwibGVhZiI6LTF9LCJkZWZhdWx0X3Blcm1pc3Npb25zIjp7InB1YiI6e30sInN1YiI6e319LCJhdXRob3JpemF0aW9uIjp7fSwidHlwZSI6ImFjY291bnQiLCJ2ZXJzaW9uIjoyfX0.cteeeg1_aw8FNGp6EMS1sGmggC165ZJ0QRynaHlg1j43ZrjJ5qZ43trW5BxbthY5EcplqmnvcCC7cmO7INLQDg - - // Account "A" - ACX3MVLHNZ4SJZQFR72E2RQ7BUIDTQAAFOUAZLDQ7TK44RS55OSVSWU5: eyJ0eXAiOiJKV1QiLCJhbGciOiJlZDI1NTE5LW5rZXkifQ.eyJqdGkiOiIyV1NPTDIzQzNISllLTTc2TkJBTzU0Tlc3QldNVU81QVo3Rk5ORE9XUVlRVVhORkxNTldBIiwiaWF0IjoxNzQ0NzI2MzEzLCJpc3MiOiJPRDQzNElRTVdGQTNTNzVBS1kyMlQ3UkdLRkdSSkRBNFFaU0xQRFFCRVZQVktONktBUTRTWVVMNiIsIm5hbWUiOiJBIiwic3ViIjoiQUNYM01WTEhOWjRTSlpRRlI3MkUyUlE3QlVJRFRRQUFGT1VBWkxEUTdUSzQ0UlM1NU9TVlNXVTUiLCJuYXRzIjp7ImxpbWl0cyI6eyJzdWJzIjotMSwiZGF0YSI6LTEsInBheWxvYWQiOi0xLCJpbXBvcnRzIjotMSwiZXhwb3J0cyI6LTEsIndpbGRjYXJkcyI6dHJ1ZSwiY29ubiI6LTEsImxlYWYiOi0xfSwiZGVmYXVsdF9wZXJtaXNzaW9ucyI6eyJwdWIiOnt9LCJzdWIiOnt9fSwiYXV0aG9yaXphdGlvbiI6e30sInR5cGUiOiJhY2NvdW50IiwidmVyc2lvbiI6Mn19.AFbpHhn0eht07x-z_QdJOGF2mpmRyEm2MUZR5ZhTZ6m5uPAsOJh1coNqyLUDdrA7n1sLHKpwb26tzbZm4Ws7BA -} - -default_sentinel one \ No newline at end of file diff --git a/server/configs/reload/defaultsentinel_2.conf b/server/configs/reload/defaultsentinel_2.conf deleted file mode 100644 index 4f70f9e2fe0..00000000000 --- a/server/configs/reload/defaultsentinel_2.conf +++ /dev/null @@ -1,16 +0,0 @@ -listen: 127.0.0.1:-1 - -// Operator "O" -operator: 
eyJ0eXAiOiJKV1QiLCJhbGciOiJlZDI1NTE5LW5rZXkifQ.eyJqdGkiOiJDUUsyV1c3SkhGVVhNR0g1TlFEWUlJT0UyWTRRWlZMVlFTS1ZKTVZITzZZRFBMWVg2Q1hRIiwiaWF0IjoxNzQ0NzI2MzA4LCJpc3MiOiJPRDQzNElRTVdGQTNTNzVBS1kyMlQ3UkdLRkdSSkRBNFFaU0xQRFFCRVZQVktONktBUTRTWVVMNiIsIm5hbWUiOiJPIiwic3ViIjoiT0Q0MzRJUU1XRkEzUzc1QUtZMjJUN1JHS0ZHUkpEQTRRWlNMUERRQkVWUFZLTjZLQVE0U1lVTDYiLCJuYXRzIjp7InR5cGUiOiJvcGVyYXRvciIsInZlcnNpb24iOjJ9fQ.DKuOHqWHgooC6x_vFaY66hLY5ZxoDKk2VVq71-gl_FDn2o_Amu_gQAxYvqLsNZvtlggWF-vKq944xcZmp0N1Dg - -resolver: MEMORY - -resolver_preload: { - // Account "SYS" - AB3GSO5OVHXQJYNNEMX4SGZCS3SPMNMDX4YA65M3SQX7XXHI4NFM2TF6: eyJ0eXAiOiJKV1QiLCJhbGciOiJlZDI1NTE5LW5rZXkifQ.eyJqdGkiOiJJSEtXN0RLTk9MRVY2T1hVUUZGQlRCN1pWM0NHNzc2M0RYVFdGNEsyU0NUVkNWUDNOSEFBIiwiaWF0IjoxNzQ0NzMwMDcyLCJpc3MiOiJPRDQzNElRTVdGQTNTNzVBS1kyMlQ3UkdLRkdSSkRBNFFaU0xQRFFCRVZQVktONktBUTRTWVVMNiIsIm5hbWUiOiJTWVMiLCJzdWIiOiJBQjNHU081T1ZIWFFKWU5ORU1YNFNHWkNTM1NQTU5NRFg0WUE2NU0zU1FYN1hYSEk0TkZNMlRGNiIsIm5hdHMiOnsibGltaXRzIjp7InN1YnMiOi0xLCJkYXRhIjotMSwicGF5bG9hZCI6LTEsImltcG9ydHMiOi0xLCJleHBvcnRzIjotMSwid2lsZGNhcmRzIjp0cnVlLCJjb25uIjotMSwibGVhZiI6LTF9LCJkZWZhdWx0X3Blcm1pc3Npb25zIjp7InB1YiI6e30sInN1YiI6e319LCJhdXRob3JpemF0aW9uIjp7fSwidHlwZSI6ImFjY291bnQiLCJ2ZXJzaW9uIjoyfX0.cteeeg1_aw8FNGp6EMS1sGmggC165ZJ0QRynaHlg1j43ZrjJ5qZ43trW5BxbthY5EcplqmnvcCC7cmO7INLQDg - - // Account "A" - ACX3MVLHNZ4SJZQFR72E2RQ7BUIDTQAAFOUAZLDQ7TK44RS55OSVSWU5: 
eyJ0eXAiOiJKV1QiLCJhbGciOiJlZDI1NTE5LW5rZXkifQ.eyJqdGkiOiIyV1NPTDIzQzNISllLTTc2TkJBTzU0Tlc3QldNVU81QVo3Rk5ORE9XUVlRVVhORkxNTldBIiwiaWF0IjoxNzQ0NzI2MzEzLCJpc3MiOiJPRDQzNElRTVdGQTNTNzVBS1kyMlQ3UkdLRkdSSkRBNFFaU0xQRFFCRVZQVktONktBUTRTWVVMNiIsIm5hbWUiOiJBIiwic3ViIjoiQUNYM01WTEhOWjRTSlpRRlI3MkUyUlE3QlVJRFRRQUFGT1VBWkxEUTdUSzQ0UlM1NU9TVlNXVTUiLCJuYXRzIjp7ImxpbWl0cyI6eyJzdWJzIjotMSwiZGF0YSI6LTEsInBheWxvYWQiOi0xLCJpbXBvcnRzIjotMSwiZXhwb3J0cyI6LTEsIndpbGRjYXJkcyI6dHJ1ZSwiY29ubiI6LTEsImxlYWYiOi0xfSwiZGVmYXVsdF9wZXJtaXNzaW9ucyI6eyJwdWIiOnt9LCJzdWIiOnt9fSwiYXV0aG9yaXphdGlvbiI6e30sInR5cGUiOiJhY2NvdW50IiwidmVyc2lvbiI6Mn19.AFbpHhn0eht07x-z_QdJOGF2mpmRyEm2MUZR5ZhTZ6m5uPAsOJh1coNqyLUDdrA7n1sLHKpwb26tzbZm4Ws7BA -} - -default_sentinel two \ No newline at end of file diff --git a/server/consumer.go b/server/consumer.go index 54504495846..1e1fd35310e 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -437,7 +437,8 @@ type consumer struct { rdqi avl.SequenceSet rdc map[uint64]uint64 replies map[uint64]string - pendingDeliveries map[uint64]*jsPubMsg // Messages that can be delivered after achieving quorum. + pendingDeliveries map[uint64]*jsPubMsg // Messages that can be delivered after achieving quorum. + waitingDeliveries map[string]*waitingDelivery // (Optional) request timeout messages that need to wait for replicated deliveries first. maxdc uint64 waiting *waitQueue cfg ConsumerConfig @@ -819,6 +820,9 @@ func checkConsumerCfg( } if config.PriorityPolicy != PriorityNone { + if config.DeliverSubject != "" { + return NewJSConsumerPushWithPriorityGroupError() + } if len(config.PriorityGroups) == 0 { return NewJSConsumerPriorityPolicyWithoutGroupError() } @@ -1846,7 +1850,7 @@ func (o *consumer) deleteNotActive() { } else { // Pull mode. elapsed := time.Since(o.waiting.last) - if elapsed <= o.cfg.InactiveThreshold { + if elapsed < o.dthresh { // These need to keep firing so reset but use delta. 
if o.dtmr != nil { o.dtmr.Reset(o.dthresh - elapsed) @@ -1866,6 +1870,43 @@ func (o *consumer) deleteNotActive() { o.mu.Unlock() return } + + // We now know we have no waiting requests, and our last request was long ago. + // However, based on AckWait the consumer could still be actively processing, + // even if we haven't been informed if there were no acks in the meantime. + // We must wait for the message that expires last and start counting down the + // inactive threshold from there. + now := time.Now().UnixNano() + l := len(o.cfg.BackOff) + var delay time.Duration + var ackWait time.Duration + for _, p := range o.pending { + if l == 0 { + ackWait = o.ackWait(0) + } else { + bi := int(o.rdc[p.Sequence]) + if bi < 0 { + bi = 0 + } else if bi >= l { + bi = l - 1 + } + ackWait = o.ackWait(o.cfg.BackOff[bi]) + } + if ts := p.Timestamp + ackWait.Nanoseconds() + o.dthresh.Nanoseconds(); ts > now { + delay = max(delay, time.Duration(ts-now)) + } + } + // We'll wait for the latest time we expect an ack, plus the inactive threshold. + // Acknowledging a message will reset this back down to just the inactive threshold. + if delay > 0 { + if o.dtmr != nil { + o.dtmr.Reset(delay) + } else { + o.dtmr = time.AfterFunc(delay, o.deleteNotActive) + } + o.mu.Unlock() + return + } } s, js := o.mset.srv, o.srv.js.Load() @@ -2540,11 +2581,23 @@ func (o *consumer) addAckReply(sseq uint64, reply string) { // Used to remember messages that need to be sent for a replicated consumer, after delivered quorum. // Lock should be held. func (o *consumer) addReplicatedQueuedMsg(pmsg *jsPubMsg) { - // Is not explicitly limited in size, but will at maximum hold maximum ack pending. + // Is not explicitly limited in size, but will at most hold maximum ack pending. if o.pendingDeliveries == nil { o.pendingDeliveries = make(map[uint64]*jsPubMsg) } o.pendingDeliveries[pmsg.seq] = pmsg + + // Is not explicitly limited in size, but will at most hold maximum waiting requests. 
+ if o.waitingDeliveries == nil { + o.waitingDeliveries = make(map[string]*waitingDelivery) + } + if wd, ok := o.waitingDeliveries[pmsg.dsubj]; ok { + wd.seq = pmsg.seq + } else { + wd := wdPool.Get().(*waitingDelivery) + wd.seq = pmsg.seq + o.waitingDeliveries[pmsg.dsubj] = wd + } } // Lock should be held. @@ -3446,6 +3499,28 @@ func (wr *waitingRequest) recycle() { } } +// Represents an (optional) request timeout that's sent after waiting for replicated deliveries. +type waitingDelivery struct { + seq uint64 + pn int // Pending messages. + pb int // Pending bytes. +} + +// sync.Pool for waiting deliveries. +var wdPool = sync.Pool{ + New: func() any { + return new(waitingDelivery) + }, +} + +// Force a recycle. +func (wd *waitingDelivery) recycle() { + if wd != nil { + wd.seq, wd.pn, wd.pb = 0, 0, 0 + wdPool.Put(wd) + } +} + // waiting queue for requests that are waiting for new messages to arrive. type waitQueue struct { n, max int @@ -3721,8 +3796,19 @@ func (o *consumer) nextWaiting(sz int) *waitingRequest { } } else { // We do check for expiration in `processWaiting`, but it is possible to hit the expiry here, and not there. - hdr := fmt.Appendf(nil, "NATS/1.0 408 Request Timeout\r\n%s: %d\r\n%s: %d\r\n\r\n", JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b) - o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0)) + rdWait := o.replicateDeliveries() + if rdWait { + // Check if we need to send the timeout after pending replicated deliveries, or can do so immediately. 
+ if wd, ok := o.waitingDeliveries[wr.reply]; !ok { + rdWait = false + } else { + wd.pn, wd.pb = wr.n, wr.b + } + } + if !rdWait { + hdr := fmt.Appendf(nil, "NATS/1.0 408 Request Timeout\r\n%s: %d\r\n%s: %d\r\n\r\n", JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b) + o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0)) + } o.waiting.removeCurrent() if o.node != nil { o.removeClusterPendingRequest(wr.reply) @@ -4187,8 +4273,19 @@ func (o *consumer) processWaiting(eos bool) (int, int, int, time.Time) { for wr := wq.head; wr != nil; { // Check expiration. if (eos && wr.noWait && wr.d > 0) || (!wr.expires.IsZero() && now.After(wr.expires)) { - hdr := fmt.Appendf(nil, "NATS/1.0 408 Request Timeout\r\n%s: %d\r\n%s: %d\r\n\r\n", JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b) - o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0)) + rdWait := o.replicateDeliveries() + if rdWait { + // Check if we need to send the timeout after pending replicated deliveries, or can do so immediately. + if wd, ok := o.waitingDeliveries[wr.reply]; !ok { + rdWait = false + } else { + wd.pn, wd.pb = wr.n, wr.b + } + } + if !rdWait { + hdr := fmt.Appendf(nil, "NATS/1.0 408 Request Timeout\r\n%s: %d\r\n%s: %d\r\n\r\n", JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b) + o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0)) + } wr = remove(pre, wr) continue } @@ -4425,8 +4522,11 @@ func (o *consumer) suppressDeletion() { // if dtmr is not nil we have started the countdown, simply reset to threshold. o.dtmr.Reset(o.dthresh) } else if o.isPullMode() && o.waiting != nil { - // Pull mode always has timer running, just update last on waiting queue. + // Pull mode always has timer running, update last on waiting queue. 
o.waiting.last = time.Now() + if o.dtmr != nil { + o.dtmr.Reset(o.dthresh) + } } } @@ -4485,7 +4585,6 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) { delay time.Duration sz int wrn, wrb int - wrNoWait bool ) o.mu.Lock() @@ -4564,7 +4663,7 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) { if o.isPushMode() { dsubj = o.dsubj } else if wr := o.nextWaiting(sz); wr != nil { - wrn, wrb, wrNoWait = wr.n, wr.b, wr.noWait + wrn, wrb = wr.n, wr.b dsubj = wr.reply if o.cfg.PriorityPolicy == PriorityPinnedClient { // FIXME(jrm): Can we make this prettier? @@ -4639,7 +4738,7 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) { } // Do actual delivery. - o.deliverMsg(dsubj, ackReply, pmsg, dc, rp, wrNoWait) + o.deliverMsg(dsubj, ackReply, pmsg, dc, rp) // If given request fulfilled batch size, but there are still pending bytes, send information about it. if wrn <= 0 && wrb > 0 { @@ -4838,7 +4937,7 @@ func convertToHeadersOnly(pmsg *jsPubMsg) { // Deliver a msg to the consumer. // Lock should be held and o.mset validated to be non-nil. -func (o *consumer) deliverMsg(dsubj, ackReply string, pmsg *jsPubMsg, dc uint64, rp RetentionPolicy, wrNoWait bool) { +func (o *consumer) deliverMsg(dsubj, ackReply string, pmsg *jsPubMsg, dc uint64, rp RetentionPolicy) { if o.mset == nil { pmsg.returnToPool() return @@ -4871,15 +4970,10 @@ func (o *consumer) deliverMsg(dsubj, ackReply string, pmsg *jsPubMsg, dc uint64, } // Send message. - // If we're replicated we MUST only send the message AFTER we've got quorum for updating - // delivered state. Otherwise, we could be in an invalid state after a leader change. - // We can send immediately if not replicated, not using acks, or using flow control (incompatible). - // TODO(mvv): If NoWait we also bypass replicating first. - // Ideally we'd only send the NoWait request timeout after replication and delivery. 
- if o.node == nil || ap == AckNone || o.cfg.FlowControl || wrNoWait { - o.outq.send(pmsg) - } else { + if o.replicateDeliveries() { o.addReplicatedQueuedMsg(pmsg) + } else { + o.outq.send(pmsg) } // Flow control. @@ -4902,6 +4996,15 @@ func (o *consumer) deliverMsg(dsubj, ackReply string, pmsg *jsPubMsg, dc uint64, } } +// replicateDeliveries returns whether deliveries should be replicated before sending them. +// If we're replicated we MUST only send the message AFTER we've got quorum for updating +// delivered state. Otherwise, we could be in an invalid state after a leader change. +// We can send immediately if not replicated, not using acks, or using flow control (incompatible). +// Lock should be held. +func (o *consumer) replicateDeliveries() bool { + return o.node != nil && o.cfg.AckPolicy != AckNone && !o.cfg.FlowControl +} + func (o *consumer) needFlowControl(sz int) bool { if o.maxpb == 0 { return false @@ -6148,4 +6251,8 @@ func (o *consumer) resetPendingDeliveries() { pmsg.returnToPool() } o.pendingDeliveries = nil + for _, wd := range o.waitingDeliveries { + wd.recycle() + } + o.waitingDeliveries = nil } diff --git a/server/errors.json b/server/errors.json index 7b90366a6ed..3a80cc4d642 100644 --- a/server/errors.json +++ b/server/errors.json @@ -1658,5 +1658,15 @@ "help": "", "url": "", "deprecates": "" + }, + { + "constant": "JSConsumerPushWithPriorityGroupErr", + "code": 400, + "error_code": 10178, + "description": "priority groups can not be used with push consumers", + "comment": "", + "help": "", + "url": "", + "deprecates": "" } ] diff --git a/server/events.go b/server/events.go index d579618bb18..3d44340b7b2 100644 --- a/server/events.go +++ b/server/events.go @@ -1401,10 +1401,9 @@ func (s *Server) initEventTracking() { } } - // User info. - // TODO(dlc) - Can be internal and not forwarded since bound server for the client connection - // is only one that will answer. This breaks tests since we still forward on remote server connect. 
- if _, err := s.sysSubscribe(fmt.Sprintf(userDirectReqSubj, "*"), s.userInfoReq); err != nil { + // User info. Do not propagate interest so that we know the local server to the connection + // is the only one that will answer the requests. + if _, err := s.sysSubscribeInternal(fmt.Sprintf(userDirectReqSubj, "*"), s.userInfoReq); err != nil { s.Errorf("Error setting up internal tracking: %v", err) return } diff --git a/server/filestore.go b/server/filestore.go index 5a8b22cb61f..cb7a1d8cd2b 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -1271,10 +1271,13 @@ func (mb *msgBlock) convertCipher() error { buf, _ := mb.loadBlock(nil) bek.XORKeyStream(buf, buf) - // Make sure we can parse with old cipher and key file. - if err = mb.indexCacheBuf(buf); err != nil { + // Check for compression, and make sure we can parse with old cipher and key file. + if nbuf, err := mb.decompressIfNeeded(buf); err != nil { + return err + } else if err = mb.indexCacheBuf(nbuf); err != nil { return err } + // Reset the cache since we just read everything in. mb.cache = nil @@ -1917,31 +1920,34 @@ func (fs *fileStore) recoverTTLState() error { defer fs.resetAgeChk(0) if fs.state.Msgs > 0 && ttlseq <= fs.state.LastSeq { fs.warn("TTL state is outdated; attempting to recover using linear scan (seq %d to %d)", ttlseq, fs.state.LastSeq) - var sm StoreMsg - mb := fs.selectMsgBlock(ttlseq) - if mb == nil { - return nil - } - mblseq := atomic.LoadUint64(&mb.last.seq) + var ( + mb *msgBlock + sm StoreMsg + mblseq uint64 + ) for seq := ttlseq; seq <= fs.state.LastSeq; seq++ { retry: + if mb == nil { + if mb = fs.selectMsgBlock(seq); mb == nil { + // Selecting the message block should return a block that contains this sequence, + // or a later block if it can't be found. + // It's an error if we can't find any block within the bounds of first and last seq. 
+ fs.warn("Error loading msg block with seq %d for recovering TTL", seq) + continue + } + seq = atomic.LoadUint64(&mb.first.seq) + mblseq = atomic.LoadUint64(&mb.last.seq) + } if mb.ttls == 0 { // None of the messages in the block have message TTLs so don't // bother doing anything further with this block, skip to the end. seq = atomic.LoadUint64(&mb.last.seq) + 1 } if seq > mblseq { - // We've reached the end of the loaded block, see if we can continue - // by loading the next one. + // We've reached the end of the loaded block, so let's go back to the + // beginning and process the next block. mb.tryForceExpireCache() - if mb = fs.selectMsgBlock(seq); mb == nil { - // TODO(nat): Deal with gaps properly. Right now this will be - // probably expensive on CPU. - continue - } - mblseq = atomic.LoadUint64(&mb.last.seq) - // At this point we've loaded another block, so let's go back to the - // beginning and see if we need to skip this one too. + mb = nil goto retry } msg, _, err := mb.fetchMsgNoCopy(seq, &sm) @@ -5649,7 +5655,7 @@ func (fs *fileStore) expireMsgs() { // if it was the last message of that particular subject that we just deleted. if sdmEnabled { if last, ok := fs.shouldProcessSdm(seq, sm.subj); ok { - sdm := last && len(getHeader(JSMarkerReason, sm.hdr)) == 0 + sdm := last && isSubjectDeleteMarker(sm.hdr) fs.handleRemovalOrSdm(seq, sm.subj, sdm, sdmTTL) } } else { @@ -5682,7 +5688,7 @@ if ttlSdm == nil { ttlSdm = make(map[string][]SDMBySubj, 1) } - ttlSdm[sm.subj] = append(ttlSdm[sm.subj], SDMBySubj{seq, len(getHeader(JSMarkerReason, sm.hdr)) != 0}) + ttlSdm[sm.subj] = append(ttlSdm[sm.subj], SDMBySubj{seq, !isSubjectDeleteMarker(sm.hdr)}) } else { // Collect sequences to remove. Don't remove messages inline here, // as that releases the lock and THW is not thread-safe.
diff --git a/server/filestore_test.go b/server/filestore_test.go index 112437c02cc..bdcf4660d1a 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -9726,3 +9726,42 @@ func TestFileStoreFirstMatchingMultiExpiry(t *testing.T) { require_True(t, didLoad) // last message, should expire }) } + +func TestFileStoreNoPanicOnRecoverTTLWithCorruptBlocks(t *testing.T) { + testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) { + fs, err := newFileStoreWithCreated(fcfg, StreamConfig{Name: "zzz", Storage: FileStorage, AllowMsgTTL: true}, time.Now(), prf(&fcfg), nil) + require_NoError(t, err) + defer fs.Stop() + + hdr := genHeader(nil, JSMessageTTL, "1") + for i := range 3 { + if i > 0 { + _, err = fs.newMsgBlockForWrite() + require_NoError(t, err) + } + _, _, err = fs.StoreMsg("foo", hdr, []byte("A"), 1) + require_NoError(t, err) + } + + fs.mu.Lock() + if blks := len(fs.blks); blks != 3 { + fs.mu.Unlock() + t.Fatalf("Expected 3 blocks, got %d", blks) + } + + // Manually corrupt the blocks by removing the second and changing the + // sequence range for the last to that of the first. + fmb := fs.blks[0] + smb := fs.blks[1] + lmb := fs.lmb + fseq, lseq := atomic.LoadUint64(&fmb.first.seq), atomic.LoadUint64(&fmb.last.seq) + smb.mu.Lock() + fs.removeMsgBlock(smb) + smb.mu.Unlock() + fs.mu.Unlock() + atomic.StoreUint64(&lmb.first.seq, fseq) + atomic.StoreUint64(&lmb.last.seq, lseq) + + require_NoError(t, fs.recoverTTLState()) + }) +} diff --git a/server/jetstream.go b/server/jetstream.go index d5e3c9f11c6..b9ececa88f7 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -40,15 +40,15 @@ import ( // JetStreamConfig determines this server's configuration. // MaxMemory and MaxStore are in bytes. 
type JetStreamConfig struct { - MaxMemory int64 `json:"max_memory"` - MaxStore int64 `json:"max_storage"` - StoreDir string `json:"store_dir,omitempty"` - SyncInterval time.Duration `json:"sync_interval,omitempty"` - SyncAlways bool `json:"sync_always,omitempty"` - Domain string `json:"domain,omitempty"` - CompressOK bool `json:"compress_ok,omitempty"` - UniqueTag string `json:"unique_tag,omitempty"` - Strict bool `json:"strict,omitempty"` + MaxMemory int64 `json:"max_memory"` // MaxMemory is the maximum size of memory type streams + MaxStore int64 `json:"max_storage"` // MaxStore is the maximum size of file store type streams + StoreDir string `json:"store_dir,omitempty"` // StoreDir is where storage files are stored + SyncInterval time.Duration `json:"sync_interval,omitempty"` // SyncInterval is how frequently we sync to disk in the background by calling fsync + SyncAlways bool `json:"sync_always,omitempty"` // SyncAlways indicates flushes are done after every write + Domain string `json:"domain,omitempty"` // Domain is the JetStream domain + CompressOK bool `json:"compress_ok,omitempty"` // CompressOK indicates if compression is supported + UniqueTag string `json:"unique_tag,omitempty"` // UniqueTag is the unique tag assigned to this instance + Strict bool `json:"strict,omitempty"` // Strict indicates if strict JSON parsing is performed } // Statistics about JetStream for this server. diff --git a/server/jetstream_benchmark_test.go b/server/jetstream_benchmark_test.go index 1381d7fa645..68cd59b44a6 100644 --- a/server/jetstream_benchmark_test.go +++ b/server/jetstream_benchmark_test.go @@ -403,6 +403,92 @@ func BenchmarkJetStreamConsume(b *testing.B) { } } +// BenchmarkJetStreamConsumeFilteredContiguous verifies the fix in +// https://github.com/nats-io/nats-server/pull/7015 and should +// capture future regressions. 
+func BenchmarkJetStreamConsumeFilteredContiguous(b *testing.B) { + clusterSizeCases := []struct { + clusterSize int // Single node or cluster + replicas int // Stream replicas + storage nats.StorageType // Stream storage + filters int // How many subject filters? + }{ + {1, 1, nats.MemoryStorage, 1}, + {1, 1, nats.MemoryStorage, 2}, + {3, 3, nats.MemoryStorage, 1}, + {3, 3, nats.MemoryStorage, 2}, + {1, 1, nats.FileStorage, 1}, + {1, 1, nats.FileStorage, 2}, + {3, 3, nats.FileStorage, 1}, + {3, 3, nats.FileStorage, 2}, + } + + for _, cs := range clusterSizeCases { + name := fmt.Sprintf( + "N=%d,R=%d,storage=%s", + cs.clusterSize, + cs.replicas, + cs.storage.String(), + ) + if cs.filters != 2 { // historical default is 2 + name = name + ",SF" + } + b.Run(name, func(b *testing.B) { + _, _, shutdown, nc, js := startJSClusterAndConnect(b, cs.clusterSize) + defer shutdown() + defer nc.Close() + + var msgs = b.N + payload := make([]byte, 1024) + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "test", + Subjects: []string{"foo"}, + Retention: nats.LimitsPolicy, + Storage: cs.storage, + Replicas: cs.replicas, + }) + require_NoError(b, err) + + for range msgs { + _, err = js.Publish("foo", payload) + require_NoError(b, err) + } + + // Subject filters deliberately vary from the stream, ensures that we hit + // the right paths in the filestore, rather than detecting 1:1 overlap. 
+ ocfg := &nats.ConsumerConfig{ + Name: "test_consumer", + DeliverPolicy: nats.DeliverAllPolicy, + AckPolicy: nats.AckNonePolicy, + Replicas: cs.replicas, + MemoryStorage: true, + } + switch cs.filters { + case 1: + ocfg.FilterSubject = "foo" + case 2: + ocfg.FilterSubjects = []string{"foo", "bar"} + } + _, err = js.AddConsumer("test", ocfg) + require_NoError(b, err) + + ps, err := js.PullSubscribe("foo", _EMPTY_, nats.Bind("test", "test_consumer")) + require_NoError(b, err) + + b.SetBytes(int64(len(payload))) + b.ReportAllocs() + b.ResetTimer() + for range msgs { + msgs, err := ps.Fetch(1) + require_NoError(b, err) + require_Len(b, len(msgs), 1) + } + b.StopTimer() + }) + } +} + func BenchmarkJetStreamConsumeWithFilters(b *testing.B) { const ( verbose = false diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 84f3675dff0..edb54368bdb 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1225,6 +1225,8 @@ func (js *jetStream) monitorCluster() { doSnapshot() return case <-rqch: + // Clean signal from shutdown routine so do best effort attempt to snapshot meta layer. + doSnapshot() return case <-qch: // Clean signal from shutdown routine so do best effort attempt to snapshot meta layer. @@ -1280,10 +1282,10 @@ func (js *jetStream) monitorCluster() { } else if nb > compactSizeMin && time.Since(lastSnapTime) > minSnapDelta { doSnapshot() } - ce.ReturnToPool() } else { s.Warnf("Error applying JetStream cluster entries: %v", err) } + ce.ReturnToPool() } aq.recycle(&ces) @@ -2455,6 +2457,8 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps ne, nb = n.Applied(ce.Index) ce.ReturnToPool() } else { + // Make sure to clean up. + ce.ReturnToPool() // Our stream was closed out from underneath of us, simply return here. 
if err == errStreamClosed || err == errCatchupStreamStopped || err == ErrServerNotRunning { aq.recycle(&ces) @@ -4862,13 +4866,14 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { if n.NeedSnapshot() { doSnapshot(true) } - } else if err := js.applyConsumerEntries(o, ce, isLeader); err == nil { + continue + } + if err := js.applyConsumerEntries(o, ce, isLeader); err == nil { var ne, nb uint64 // We can't guarantee writes are flushed while we're shutting down. Just rely on replay during recovery. if !js.isShuttingDown() { ne, nb = n.Applied(ce.Index) } - ce.ReturnToPool() // If we have at least min entries to compact, go ahead and snapshot/compact. if nb > 0 && ne >= compactNumMin || nb > compactSizeMin { doSnapshot(false) @@ -4876,6 +4881,7 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { } else if err != errConsumerClosed { s.Warnf("Error applying consumer entries to '%s > %s'", ca.Client.serviceAccount(), ca.Name) } + ce.ReturnToPool() } aq.recycle(&ces) @@ -5058,8 +5064,20 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea o.ldt = time.Now() // Need to send message to the client, since we have quorum to do so now. if pmsg, ok := o.pendingDeliveries[sseq]; ok { + // Copy delivery subject and sequence first, as the send returns it to the pool and clears it. + dsubj, seq := pmsg.dsubj, pmsg.seq o.outq.send(pmsg) delete(o.pendingDeliveries, sseq) + + // Might need to send a request timeout after sending the last replicated delivery. 
+ if wd, ok := o.waitingDeliveries[dsubj]; ok && wd.seq == seq { + if wd.pn > 0 || wd.pb > 0 { + hdr := fmt.Appendf(nil, "NATS/1.0 408 Request Timeout\r\n%s: %d\r\n%s: %d\r\n\r\n", JSPullRequestPendingMsgs, wd.pn, JSPullRequestPendingBytes, wd.pb) + o.outq.send(newJSPubMsg(dsubj, _EMPTY_, _EMPTY_, hdr, nil, nil, 0)) + } + wd.recycle() + delete(o.waitingDeliveries, dsubj) + } } o.mu.Unlock() if err != nil { diff --git a/server/jetstream_cluster_2_test.go b/server/jetstream_cluster_2_test.go index 2a04e46bcf1..da758af19f5 100644 --- a/server/jetstream_cluster_2_test.go +++ b/server/jetstream_cluster_2_test.go @@ -8097,6 +8097,70 @@ func TestJetStreamClusterSubjectDeleteMarkersTimingWithMaxAge(t *testing.T) { } } +func TestJetStreamClusterDesyncAfterFailedScaleUp(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + cfg := &nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 1, + } + _, err := js.AddStream(cfg) + require_NoError(t, err) + + // Set up some initial state for the stream. + pubAck, err := js.Publish("foo", nil) + require_NoError(t, err) + require_Equal(t, pubAck.Sequence, 1) + + // Scale up the stream. + cfg.Replicas = 3 + _, err = js.UpdateStream(cfg) + require_NoError(t, err) + checkFor(t, 2*time.Second, 200*time.Millisecond, func() error { + return checkState(t, c, globalAccountName, "TEST") + }) + + // Stop stream leader, and clear stream state on the followers. + sl := c.streamLeader(globalAccountName, "TEST") + sl.Shutdown() + for _, s := range c.servers { + if s == sl { + continue + } + // Install snapshot. 
+ mset, err := s.globalAccount().lookupStream("TEST") + require_NoError(t, err) + require_NoError(t, mset.raftNode().InstallSnapshot(mset.stateSnapshot())) + sd := s.StoreDir() + s.Shutdown() + require_NoError(t, os.RemoveAll(filepath.Join(sd, globalAccountName, streamsDir, "TEST"))) + } + + // Restart all servers except the leader. + for _, s := range c.servers { + if s == sl { + continue + } + c.restartServer(s) + } + + // Allow some time for the restarted servers to try and become leader. + time.Sleep(2 * time.Second) + require_True(t, c.streamLeader(globalAccountName, "TEST") == nil) + + // Restart the old leader, now the state should converge. + c.restartServer(sl) + c.waitOnStreamLeader(globalAccountName, "TEST") + checkFor(t, 10*time.Second, 200*time.Millisecond, func() error { + return checkState(t, c, globalAccountName, "TEST") + }) +} + // // DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times) // Add at the end of jetstream_cluster__test.go, with being the highest value. diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 1b070fabdfc..a9851482f7f 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -1425,7 +1425,7 @@ func TestJetStreamClusterPullConsumerAcksExtendInactivityThreshold(t *testing.T) } // Pull Consumer - sub, err := js.PullSubscribe("foo", "d", nats.InactiveThreshold(time.Second)) + sub, err := js.PullSubscribe("foo", "d", nats.InactiveThreshold(time.Second), nats.AckWait(time.Second)) require_NoError(t, err) fetchMsgs(t, sub, n/2, time.Second) @@ -1453,7 +1453,7 @@ func TestJetStreamClusterPullConsumerAcksExtendInactivityThreshold(t *testing.T) require_NoError(t, err) // Make sure it gets cleaned up. 
- time.Sleep(2 * time.Second) + time.Sleep(3500 * time.Millisecond) _, err = js.ConsumerInfo("TEST", "d") require_Error(t, err, nats.ErrConsumerNotFound) } diff --git a/server/jetstream_consumer_test.go b/server/jetstream_consumer_test.go index 629f9b9c19d..7ce6ff23288 100644 --- a/server/jetstream_consumer_test.go +++ b/server/jetstream_consumer_test.go @@ -2170,27 +2170,29 @@ func TestJetStreamConsumerWithPriorityGroups(t *testing.T) { cluster.waitOnStreamLeader("$G", "TEST") for _, test := range []struct { - name string - nc *nats.Conn - stream string - consumer string - groups []string - mode PriorityPolicy - err *ApiError + name string + nc *nats.Conn + stream string + consumer string + groups []string + mode PriorityPolicy + deliverSubject string + err *ApiError }{ - {"Pinned Consumer with Priority Group", nc, "TEST", "PINNED", []string{"A"}, PriorityPinnedClient, nil}, - {"Pinned Consumer with Priority Group, clustered", cnc, "TEST", "PINNED", []string{"A"}, PriorityPinnedClient, nil}, - {"Overflow Consumer with Priority Group", nc, "TEST", "OVERFLOW", []string{"A"}, PriorityOverflow, nil}, - {"Overflow Consumer with Priority Group, clustered", cnc, "TEST", "OVERFLOW", []string{"A"}, PriorityOverflow, nil}, - {"Pinned Consumer without Priority Group", nc, "TEST", "PINNED_NO_GROUP", nil, PriorityPinnedClient, &ApiError{ErrCode: uint16(JSConsumerPriorityPolicyWithoutGroup)}}, - {"Pinned Consumer without Priority Group, clustered", cnc, "TEST", "PINNED_NO_GROUP", nil, PriorityPinnedClient, &ApiError{ErrCode: uint16(JSConsumerPriorityPolicyWithoutGroup)}}, - {"Overflow Consumer without Priority Group", nc, "TEST", "PINNED_NO_GROUP", nil, PriorityOverflow, &ApiError{ErrCode: uint16(JSConsumerPriorityPolicyWithoutGroup)}}, - {"Overflow Consumer without Priority Group, clustered", cnc, "TEST", "PINNED_NO_GROUP", nil, PriorityOverflow, &ApiError{ErrCode: uint16(JSConsumerPriorityPolicyWithoutGroup)}}, - {"Pinned Consumer with empty Priority Group", nc, "TEST", 
"PINNED_NO_GROUP", []string{""}, PriorityPinnedClient, &ApiError{ErrCode: uint16(JSConsumerEmptyGroupName)}}, - {"Pinned Consumer with empty Priority Group, clustered", cnc, "TEST", "PINNED_NO_GROUP", []string{""}, PriorityPinnedClient, &ApiError{ErrCode: uint16(JSConsumerEmptyGroupName)}}, - {"Pinned Consumer with empty Priority Group", nc, "TEST", "PINNED_NO_GROUP", []string{""}, PriorityOverflow, &ApiError{ErrCode: uint16(JSConsumerEmptyGroupName)}}, - {"Pinned Consumer with empty Priority Group, clustered", cnc, "TEST", "PINNED_NO_GROUP", []string{""}, PriorityOverflow, &ApiError{ErrCode: uint16(JSConsumerEmptyGroupName)}}, - {"Consumer with `none` policy priority", nc, "TEST", "NONE", []string{"A"}, PriorityNone, nil}, + {"Pinned Consumer with Priority Group", nc, "TEST", "PINNED", []string{"A"}, PriorityPinnedClient, "", nil}, + {"Pinned Consumer with Priority Group, clustered", cnc, "TEST", "PINNED", []string{"A"}, PriorityPinnedClient, "", nil}, + {"Overflow Consumer with Priority Group", nc, "TEST", "OVERFLOW", []string{"A"}, PriorityOverflow, "", nil}, + {"Overflow Consumer with Priority Group, clustered", cnc, "TEST", "OVERFLOW", []string{"A"}, PriorityOverflow, "", nil}, + {"Pinned Consumer without Priority Group", nc, "TEST", "PINNED_NO_GROUP", nil, PriorityPinnedClient, "", &ApiError{ErrCode: uint16(JSConsumerPriorityPolicyWithoutGroup)}}, + {"Pinned Consumer without Priority Group, clustered", cnc, "TEST", "PINNED_NO_GROUP", nil, PriorityPinnedClient, "", &ApiError{ErrCode: uint16(JSConsumerPriorityPolicyWithoutGroup)}}, + {"Overflow Consumer without Priority Group", nc, "TEST", "PINNED_NO_GROUP", nil, PriorityOverflow, "", &ApiError{ErrCode: uint16(JSConsumerPriorityPolicyWithoutGroup)}}, + {"Overflow Consumer without Priority Group, clustered", cnc, "TEST", "PINNED_NO_GROUP", nil, PriorityOverflow, "", &ApiError{ErrCode: uint16(JSConsumerPriorityPolicyWithoutGroup)}}, + {"Pinned Consumer with empty Priority Group", nc, "TEST", "PINNED_NO_GROUP", 
[]string{""}, PriorityPinnedClient, "", &ApiError{ErrCode: uint16(JSConsumerEmptyGroupName)}}, + {"Pinned Consumer with empty Priority Group, clustered", cnc, "TEST", "PINNED_NO_GROUP", []string{""}, PriorityPinnedClient, "", &ApiError{ErrCode: uint16(JSConsumerEmptyGroupName)}}, + {"Pinned Consumer with empty Priority Group", nc, "TEST", "PINNED_NO_GROUP", []string{""}, PriorityOverflow, "", &ApiError{ErrCode: uint16(JSConsumerEmptyGroupName)}}, + {"Pinned Consumer with empty Priority Group, clustered", cnc, "TEST", "PINNED_NO_GROUP", []string{""}, PriorityOverflow, "", &ApiError{ErrCode: uint16(JSConsumerEmptyGroupName)}}, + {"Consumer with `none` policy priority", nc, "TEST", "NONE", []string{"A"}, PriorityNone, "", nil}, + {"Push consumer with Priority Group", nc, "TEST", "PUSH_WITH_POLICY", []string{"A"}, PriorityOverflow, "subject", &ApiError{ErrCode: uint16(JSConsumerPushWithPriorityGroupErr)}}, } { t.Run(test.name, func(t *testing.T) { @@ -2203,6 +2205,7 @@ func TestJetStreamConsumerWithPriorityGroups(t *testing.T) { PriorityGroups: test.groups, PriorityPolicy: test.mode, AckPolicy: AckExplicit, + DeliverSubject: test.deliverSubject, PinnedTTL: 10 * time.Second, }, } @@ -9688,44 +9691,179 @@ func TestJetStreamConsumerStateAlwaysFromStore(t *testing.T) { } func TestJetStreamConsumerPullNoWaitBatchLargerThanPending(t *testing.T) { - c := createJetStreamClusterExplicit(t, "R3S", 3) - defer c.shutdown() + test := func(t *testing.T, replicas int) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() - nc, js := jsClientConnect(t, c.randomServer()) - defer nc.Close() + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() - _, err := js.AddStream(&nats.StreamConfig{ - Name: "TEST", - Subjects: []string{"foo"}, - Replicas: 3, - }) - require_NoError(t, err) + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: replicas, + }) + require_NoError(t, err) - _, err = js.AddConsumer("TEST", 
&nats.ConsumerConfig{ - Durable: "C", - AckPolicy: nats.AckExplicitPolicy, - FilterSubject: "foo", - }) - require_NoError(t, err) + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + Durable: "C", + AckPolicy: nats.AckExplicitPolicy, + FilterSubject: "foo", + Replicas: replicas, + }) + require_NoError(t, err) - req := JSApiConsumerGetNextRequest{Batch: 10, NoWait: true} + req := JSApiConsumerGetNextRequest{Batch: 10, NoWait: true} - for range 5 { - _, err := js.Publish("foo", []byte("OK")) + for range 5 { + _, err := js.Publish("foo", []byte("OK")) + require_NoError(t, err) + } + + sub := sendRequest(t, nc, "rply", req) + defer sub.Unsubscribe() + + // Should get all 5 messages. + for range 5 { + msg, err := sub.NextMsg(time.Second) + require_NoError(t, err) + if len(msg.Data) == 0 && msg.Header != nil { + t.Fatalf("Expected data, got: %s", msg.Header.Get("Description")) + } + } + } + + t.Run("R1", func(t *testing.T) { test(t, 1) }) + t.Run("R3", func(t *testing.T) { test(t, 3) }) +} + +func TestJetStreamConsumerNotInactiveDuringAckWait(t *testing.T) { + test := func(t *testing.T, replicas int) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: replicas, + }) + require_NoError(t, err) + + _, err = js.Publish("foo", nil) + require_NoError(t, err) + + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + Durable: "CONSUMER", + AckPolicy: nats.AckExplicitPolicy, + Replicas: replicas, + InactiveThreshold: 500 * time.Millisecond, // Pull mode adds up to 1 second randomly. 
+ AckWait: time.Minute, + }) + require_NoError(t, err) + + _, err = js.ConsumerInfo("TEST", "CONSUMER") require_NoError(t, err) + + sub, err := js.PullSubscribe(_EMPTY_, "CONSUMER", nats.BindStream("TEST")) + require_NoError(t, err) + defer sub.Drain() + + msgs, err := sub.Fetch(1) + require_NoError(t, err) + require_Len(t, len(msgs), 1) + + _, err = js.ConsumerInfo("TEST", "CONSUMER") + require_NoError(t, err) + + // AckWait is still active, so must not delete the consumer while waiting for an ack. + time.Sleep(1750 * time.Millisecond) + + _, err = js.ConsumerInfo("TEST", "CONSUMER") + require_NoError(t, err) + require_NoError(t, msgs[0].AckSync()) + + // Not waiting on AckWait anymore, consumer is deleted after the inactivity threshold. + time.Sleep(1750 * time.Millisecond) + _, err = js.ConsumerInfo("TEST", "CONSUMER") + require_Error(t, err, nats.ErrConsumerNotFound) } - sub := sendRequest(t, nc, "rply", req) - defer sub.Unsubscribe() + t.Run("R1", func(t *testing.T) { test(t, 1) }) + t.Run("R3", func(t *testing.T) { test(t, 3) }) +} - // Should get all 5 messages. - // TODO(mvv): Currently bypassing replicating first, need to figure out - // how to send NoWait's request timeout after replication. 
- for range 5 { - msg, err := sub.NextMsg(time.Second) +func TestJetStreamConsumerNotInactiveDuringAckWaitBackoff(t *testing.T) { + test := func(t *testing.T, replicas int) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: replicas, + }) require_NoError(t, err) - if len(msg.Data) == 0 && msg.Header != nil { - t.Fatalf("Expected data, got: %s", msg.Header.Get("Description")) - } + + _, err = js.Publish("foo", nil) + require_NoError(t, err) + + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + Durable: "CONSUMER", + AckPolicy: nats.AckExplicitPolicy, + Replicas: replicas, + InactiveThreshold: 500 * time.Millisecond, // Pull mode adds up to 1 second randomly. + BackOff: []time.Duration{ + 2 * time.Second, + 4 * time.Second, + }, + }) + require_NoError(t, err) + + _, err = js.ConsumerInfo("TEST", "CONSUMER") + require_NoError(t, err) + + sub, err := js.PullSubscribe(_EMPTY_, "CONSUMER", nats.BindStream("TEST")) + require_NoError(t, err) + defer sub.Drain() + + msgs, err := sub.Fetch(1) + require_NoError(t, err) + require_Len(t, len(msgs), 1) + + _, err = js.ConsumerInfo("TEST", "CONSUMER") + require_NoError(t, err) + + // AckWait is still active, so must not delete the consumer while waiting for an ack. + time.Sleep(1750 * time.Millisecond) + + _, err = js.ConsumerInfo("TEST", "CONSUMER") + require_NoError(t, err) + require_NoError(t, msgs[0].Nak()) + + msgs, err = sub.Fetch(1) + require_NoError(t, err) + require_Len(t, len(msgs), 1) + + // AckWait is still active, now based on backoff, so must not delete the consumer while waiting for an ack. + // We've confirmed can wait 2s AckWait + InactiveThreshold, now check we can also wait for the backoff. 
+ time.Sleep(3750 * time.Millisecond) + + _, err = js.ConsumerInfo("TEST", "CONSUMER") + require_NoError(t, err) + require_NoError(t, msgs[0].AckSync()) + + // Not waiting on AckWait anymore, consumer is deleted after the inactivity threshold. + time.Sleep(1750 * time.Millisecond) + _, err = js.ConsumerInfo("TEST", "CONSUMER") + require_Error(t, err, nats.ErrConsumerNotFound) } + + t.Run("R1", func(t *testing.T) { test(t, 1) }) + t.Run("R3", func(t *testing.T) { test(t, 3) }) } diff --git a/server/jetstream_errors_generated.go b/server/jetstream_errors_generated.go index 30ed884e0f2..162ca4f0273 100644 --- a/server/jetstream_errors_generated.go +++ b/server/jetstream_errors_generated.go @@ -197,6 +197,9 @@ const ( // JSConsumerPushMaxWaitingErr consumer in push mode can not set max waiting JSConsumerPushMaxWaitingErr ErrorIdentifier = 10080 + // JSConsumerPushWithPriorityGroupErr priority groups can not be used with push consumers + JSConsumerPushWithPriorityGroupErr ErrorIdentifier = 10178 + // JSConsumerReplacementWithDifferentNameErr consumer replacement durable config not the same JSConsumerReplacementWithDifferentNameErr ErrorIdentifier = 10106 @@ -570,6 +573,7 @@ var ( JSConsumerPullRequiresAckErr: {Code: 400, ErrCode: 10084, Description: "consumer in pull mode requires explicit ack policy on workqueue stream"}, JSConsumerPullWithRateLimitErr: {Code: 400, ErrCode: 10086, Description: "consumer in pull mode can not have rate limit set"}, JSConsumerPushMaxWaitingErr: {Code: 400, ErrCode: 10080, Description: "consumer in push mode can not set max waiting"}, + JSConsumerPushWithPriorityGroupErr: {Code: 400, ErrCode: 10178, Description: "priority groups can not be used with push consumers"}, JSConsumerReplacementWithDifferentNameErr: {Code: 400, ErrCode: 10106, Description: "consumer replacement durable config not the same"}, JSConsumerReplicasExceedsStream: {Code: 400, ErrCode: 10126, Description: "consumer config replica count exceeds parent stream"}, 
JSConsumerReplicasShouldMatchStream: {Code: 400, ErrCode: 10134, Description: "consumer config replicas must match interest retention stream's replicas"}, @@ -1397,6 +1401,16 @@ func NewJSConsumerPushMaxWaitingError(opts ...ErrorOption) *ApiError { return ApiErrors[JSConsumerPushMaxWaitingErr] } +// NewJSConsumerPushWithPriorityGroupError creates a new JSConsumerPushWithPriorityGroupErr error: "priority groups can not be used with push consumers" +func NewJSConsumerPushWithPriorityGroupError(opts ...ErrorOption) *ApiError { + eopts := parseOpts(opts) + if ae, ok := eopts.err.(*ApiError); ok { + return ae + } + + return ApiErrors[JSConsumerPushWithPriorityGroupErr] +} + // NewJSConsumerReplacementWithDifferentNameError creates a new JSConsumerReplacementWithDifferentNameErr error: "consumer replacement durable config not the same" func NewJSConsumerReplacementWithDifferentNameError(opts ...ErrorOption) *ApiError { eopts := parseOpts(opts) diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 496f4f30760..a72ee918ed0 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -43,6 +43,7 @@ import ( "github.com/nats-io/jwt/v2" "github.com/nats-io/nats-server/v2/server/sysmem" "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" "github.com/nats-io/nkeys" "github.com/nats-io/nuid" ) @@ -16094,7 +16095,8 @@ func TestJetStreamLastSequenceBySubjectConcurrent(t *testing.T) { func TestJetStreamServerReencryption(t *testing.T) { storeDir := t.TempDir() - for i, algo := range []struct { + var i int + for _, algo := range []struct { from string to string }{ @@ -16103,40 +16105,42 @@ func TestJetStreamServerReencryption(t *testing.T) { {"chacha", "chacha"}, {"chacha", "aes"}, } { - t.Run(fmt.Sprintf("%s_to_%s", algo.from, algo.to), func(t *testing.T) { - streamName := fmt.Sprintf("TEST_%d", i) - subjectName := fmt.Sprintf("foo_%d", i) - expected := 30 - - checkStream := func(js nats.JetStreamContext) { - si, err := 
js.StreamInfo(streamName) - if err != nil { - t.Fatal(err) - } + for _, compression := range []StoreCompression{NoCompression, S2Compression} { + t.Run(fmt.Sprintf("%s_to_%s/%s", algo.from, algo.to, compression), func(t *testing.T) { + i++ + streamName := fmt.Sprintf("TEST_%d", i) + subjectName := fmt.Sprintf("foo_%d", i) + expected := 30 + + checkStream := func(js nats.JetStreamContext) { + si, err := js.StreamInfo(streamName) + if err != nil { + t.Fatal(err) + } - if si.State.Msgs != uint64(expected) { - t.Fatalf("Should be %d messages but got %d messages", expected, si.State.Msgs) - } + if si.State.Msgs != uint64(expected) { + t.Fatalf("Should be %d messages but got %d messages", expected, si.State.Msgs) + } - sub, err := js.PullSubscribe(subjectName, "") - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } + sub, err := js.PullSubscribe(subjectName, "") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } - c := 0 - for _, m := range fetchMsgs(t, sub, expected, 5*time.Second) { - m.AckSync() - c++ - } - if c != expected { - t.Fatalf("Should have read back %d messages but got %d messages", expected, c) + c := 0 + for _, m := range fetchMsgs(t, sub, expected, 5*time.Second) { + m.AckSync() + c++ + } + if c != expected { + t.Fatalf("Should have read back %d messages but got %d messages", expected, c) + } } - } - // First off, we start up using the original encryption key and algorithm. - // We'll create a stream and populate it with some messages. - t.Run("setup", func(t *testing.T) { - conf := createConfFile(t, []byte(fmt.Sprintf(` + // First off, we start up using the original encryption key and algorithm. + // We'll create a stream and populate it with some messages. 
+ t.Run("setup", func(t *testing.T) { + conf := createConfFile(t, []byte(fmt.Sprintf(` server_name: S22 listen: 127.0.0.1:-1 jetstream: { @@ -16146,34 +16150,37 @@ func TestJetStreamServerReencryption(t *testing.T) { } `, "firstencryptionkey", algo.from, storeDir))) - s, _ := RunServerWithConfig(conf) - defer s.Shutdown() + s, _ := RunServerWithConfig(conf) + defer s.Shutdown() - nc, js := jsClientConnect(t, s) - defer nc.Close() + nc, js := jsClientConnect(t, s) + defer nc.Close() - cfg := &nats.StreamConfig{ - Name: streamName, - Subjects: []string{subjectName}, - } - if _, err := js.AddStream(cfg); err != nil { - t.Fatalf("Unexpected error: %v", err) - } + cfg := &StreamConfig{ + Name: streamName, + Subjects: []string{subjectName}, + Storage: FileStorage, + Compression: compression, + } + if _, err := jsStreamCreate(t, nc, cfg); err != nil { + t.Fatalf("Unexpected error: %v", err) + } - for i := 0; i < expected; i++ { - if _, err := js.Publish(subjectName, []byte("ENCRYPTED PAYLOAD!!")); err != nil { - t.Fatalf("Unexpected publish error: %v", err) + payload := strings.Repeat("A", 512*1024) + for i := 0; i < expected; i++ { + if _, err := js.Publish(subjectName, []byte(payload)); err != nil { + t.Fatalf("Unexpected publish error: %v", err) + } } - } - checkStream(js) - }) + checkStream(js) + }) - // Next up, we will restart the server, this time with both the new key - // and algorithm and also the old key. At startup, the server will detect - // the change in encryption key and/or algorithm and re-encrypt the stream. - t.Run("reencrypt", func(t *testing.T) { - conf := createConfFile(t, []byte(fmt.Sprintf(` + // Next up, we will restart the server, this time with both the new key + // and algorithm and also the old key. At startup, the server will detect + // the change in encryption key and/or algorithm and re-encrypt the stream. 
+ t.Run("reencrypt", func(t *testing.T) { + conf := createConfFile(t, []byte(fmt.Sprintf(` server_name: S22 listen: 127.0.0.1:-1 jetstream: { @@ -16184,20 +16191,20 @@ func TestJetStreamServerReencryption(t *testing.T) { } `, "secondencryptionkey", algo.to, "firstencryptionkey", storeDir))) - s, _ := RunServerWithConfig(conf) - defer s.Shutdown() + s, _ := RunServerWithConfig(conf) + defer s.Shutdown() - nc, js := jsClientConnect(t, s) - defer nc.Close() + nc, js := jsClientConnect(t, s) + defer nc.Close() - checkStream(js) - }) + checkStream(js) + }) - // Finally, we'll restart the server using only the new key and algorithm. - // At this point everything should have been re-encrypted, so we should still - // be able to access the stream. - t.Run("restart", func(t *testing.T) { - conf := createConfFile(t, []byte(fmt.Sprintf(` + // Finally, we'll restart the server using only the new key and algorithm. + // At this point everything should have been re-encrypted, so we should still + // be able to access the stream. 
+ t.Run("restart", func(t *testing.T) { + conf := createConfFile(t, []byte(fmt.Sprintf(` server_name: S22 listen: 127.0.0.1:-1 jetstream: { @@ -16207,15 +16214,16 @@ func TestJetStreamServerReencryption(t *testing.T) { } `, "secondencryptionkey", algo.to, storeDir))) - s, _ := RunServerWithConfig(conf) - defer s.Shutdown() + s, _ := RunServerWithConfig(conf) + defer s.Shutdown() - nc, js := jsClientConnect(t, s) - defer nc.Close() + nc, js := jsClientConnect(t, s) + defer nc.Close() - checkStream(js) + checkStream(js) + }) }) - }) + } } } @@ -20420,3 +20428,63 @@ func TestJetStreamMaxMsgsPerSubjectAndDeliverLastPerSubject(t *testing.T) { o.mu.RUnlock() require_Equal(t, sseq, resume+1) } + +func TestJetStreamKVNoSubjectDeleteMarkerOnPurgeMarker(t *testing.T) { + for _, storage := range []jetstream.StorageType{jetstream.FileStorage, jetstream.MemoryStorage} { + t.Run(storage.String(), func(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnectNewAPI(t, s) + defer nc.Close() + + ctx := context.Background() + kv, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{ + Bucket: "bucket", + History: 1, + Storage: storage, + TTL: 2 * time.Second, + LimitMarkerTTL: time.Minute, + }) + require_NoError(t, err) + + stream, err := js.Stream(ctx, "KV_bucket") + require_NoError(t, err) + + // Purge such that the bucket TTL expires this message. + require_NoError(t, kv.Purge(ctx, "key")) + rsm, err := stream.GetMsg(ctx, 1) + require_NoError(t, err) + require_Equal(t, rsm.Header.Get("KV-Operation"), "PURGE") + + // The bucket TTL should have removed the message by now. + time.Sleep(2500 * time.Millisecond) + + // Confirm the purge marker is gone. + _, err = stream.GetMsg(ctx, 1) + require_Error(t, err, jetstream.ErrMsgNotFound) + require_Equal(t, rsm.Header.Get("KV-Operation"), "PURGE") + + // Confirm we don't get a redundant subject delete marker. 
+ _, err = stream.GetMsg(ctx, 2) + require_Error(t, err, jetstream.ErrMsgNotFound) + + // Purge with a TTL so it expires this message. + require_NoError(t, kv.Purge(ctx, "key", jetstream.PurgeTTL(time.Second))) + rsm, err = stream.GetMsg(ctx, 2) + require_NoError(t, err) + require_Equal(t, rsm.Header.Get("KV-Operation"), "PURGE") + + // The purge TTL should have removed the message by now. + time.Sleep(1500 * time.Millisecond) + + // Confirm the purge marker is gone. + _, err = stream.GetMsg(ctx, 2) + require_Error(t, err, jetstream.ErrMsgNotFound) + + // Confirm we don't get a redundant subject delete marker. + _, err = stream.GetMsg(ctx, 3) + require_Error(t, err, jetstream.ErrMsgNotFound) + }) + } +} diff --git a/server/jwt.go b/server/jwt.go index e8da5213cc5..04d7dc60a3e 100644 --- a/server/jwt.go +++ b/server/jwt.go @@ -70,11 +70,20 @@ func wipeSlice(buf []byte) { func validateTrustedOperators(o *Options) error { if len(o.TrustedOperators) == 0 { // if we have no operator, default sentinel shouldn't be set - if o.DefaultSentinel != "" { + if o.DefaultSentinel != _EMPTY_ { return fmt.Errorf("default sentinel requires operators and accounts") } return nil } + if o.DefaultSentinel != _EMPTY_ { + juc, err := jwt.DecodeUserClaims(o.DefaultSentinel) + if err != nil { + return fmt.Errorf("default sentinel JWT not valid") + } + if !juc.BearerToken { + return fmt.Errorf("default sentinel must be a bearer token") + } + } if o.AccountResolver == nil { return fmt.Errorf("operators require an account resolver to be configured") } diff --git a/server/jwt_test.go b/server/jwt_test.go index 331e3117769..92e3450c33d 100644 --- a/server/jwt_test.go +++ b/server/jwt_test.go @@ -7155,7 +7155,7 @@ func TestDefaultSentinelUser(t *testing.T) { uPub, err := uKP.PublicKey() require_NoError(t, err) uc := jwt.NewUserClaims(uPub) - uc.BearerToken = true + uc.BearerToken = false uc.Name = "sentinel" sentinelToken, err := uc.Encode(aKP) require_NoError(t, err) @@ -7168,6 +7168,25 @@ func 
TestDefaultSentinelUser(t *testing.T) { default_sentinel: %s `, ojwt, sysPub, preloadConfig, sentinelToken))) + // test non-bearer sentinel is rejected + opts, err := ProcessConfigFile(conf) + require_NoError(t, err) + _, err = NewServer(opts) + require_Error(t, err, fmt.Errorf("default sentinel must be a bearer token")) + + // correct and start server + uc.BearerToken = true + sentinelToken, err = uc.Encode(aKP) + require_NoError(t, err) + conf = createConfFile(t, []byte(fmt.Sprintf(` + listen: 127.0.0.1:4747 + operator: %s + system_account: %s + resolver: MEM + resolver_preload: %s + default_sentinel: %s +`, ojwt, sysPub, preloadConfig, sentinelToken))) + ns, _ = RunServerWithConfig(conf) defer ns.Shutdown() nc, err := nats.Connect(ns.ClientURL()) @@ -7347,3 +7366,60 @@ func TestJWTJetStreamClientsExcludedForMaxConnsUpdate(t *testing.T) { _, err = js.Publish("foo", nil) require_NoError(t, err) } + +func TestJWTClusterUserInfoContainsPermissions(t *testing.T) { + tmpl := ` + listen: 127.0.0.1:-1 + server_name: %s + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} + cluster { + name: %s + listen: 127.0.0.1:%d + routes = [%s] + } + ` + opFrag := ` + operator: %s + system_account: %s + resolver: { type: MEM } + resolver_preload = { + %s : %s + %s : %s + } + ` + + _, syspub := createKey(t) + sysJwt := encodeClaim(t, jwt.NewAccountClaims(syspub), syspub) + + accKp, aExpPub := createKey(t) + accClaim := jwt.NewAccountClaims(aExpPub) + accClaim.DefaultPermissions.Sub = jwt.Permission{ + Deny: []string{"foo"}, + } + accJwt := encodeClaim(t, accClaim, aExpPub) + accCreds := newUser(t, accKp) + + template := tmpl + fmt.Sprintf(opFrag, ojwt, syspub, syspub, sysJwt, aExpPub, accJwt) + c := createJetStreamClusterWithTemplate(t, template, "R3S", 3) + defer c.shutdown() + + // Since it's a bit of a race whether the local server responds via the + // service import before a remote server does, we need to keep trying. 
+ // In 1000 attempts it is quite easy to reproduce the problem. + test := func() { + nc, _ := jsClientConnect(t, c.randomServer(), nats.UserCredentials(accCreds)) + defer nc.Close() + + resp, err := nc.Request(userDirectInfoSubj, nil, time.Second) + require_NoError(t, err) + + response := ServerAPIResponse{Data: &UserInfo{}} + require_NoError(t, json.Unmarshal(resp.Data, &response)) + + userInfo := response.Data.(*UserInfo) + require_NotNil(t, userInfo.Permissions) + } + for range 1000 { + test() + } +} diff --git a/server/memstore.go b/server/memstore.go index 28b25f97805..4355cd9bf0d 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -1075,7 +1075,7 @@ func (ms *memStore) expireMsgs() { } if sdmEnabled { if last, ok := ms.shouldProcessSdm(seq, sm.subj); ok { - sdm := last && len(getHeader(JSMarkerReason, sm.hdr)) == 0 + sdm := last && isSubjectDeleteMarker(sm.hdr) ms.handleRemovalOrSdm(seq, sm.subj, sdm, sdmTTL) } } else { @@ -1105,7 +1105,7 @@ func (ms *memStore) expireMsgs() { if ttlSdm == nil { ttlSdm = make(map[string][]SDMBySubj, 1) } - ttlSdm[sm.subj] = append(ttlSdm[sm.subj], SDMBySubj{seq, len(getHeader(JSMarkerReason, sm.hdr)) != 0}) + ttlSdm[sm.subj] = append(ttlSdm[sm.subj], SDMBySubj{seq, !isSubjectDeleteMarker(sm.hdr)}) return false } } else { diff --git a/server/monitor.go b/server/monitor.go index 6a0f9c6ac8a..5c7bf7e4d3a 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -1184,184 +1184,185 @@ func (s *Server) HandleIPQueuesz(w http.ResponseWriter, r *http.Request) { // Varz will output server information on the monitoring port at /varz. 
type Varz struct { - ID string `json:"server_id"` - Name string `json:"server_name"` - Version string `json:"version"` - Proto int `json:"proto"` - GitCommit string `json:"git_commit,omitempty"` - GoVersion string `json:"go"` - Host string `json:"host"` - Port int `json:"port"` - AuthRequired bool `json:"auth_required,omitempty"` - TLSRequired bool `json:"tls_required,omitempty"` - TLSVerify bool `json:"tls_verify,omitempty"` - TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"` - IP string `json:"ip,omitempty"` - ClientConnectURLs []string `json:"connect_urls,omitempty"` - WSConnectURLs []string `json:"ws_connect_urls,omitempty"` - MaxConn int `json:"max_connections"` - MaxSubs int `json:"max_subscriptions,omitempty"` - PingInterval time.Duration `json:"ping_interval"` - MaxPingsOut int `json:"ping_max"` - HTTPHost string `json:"http_host"` - HTTPPort int `json:"http_port"` - HTTPBasePath string `json:"http_base_path"` - HTTPSPort int `json:"https_port"` - AuthTimeout float64 `json:"auth_timeout"` - MaxControlLine int32 `json:"max_control_line"` - MaxPayload int `json:"max_payload"` - MaxPending int64 `json:"max_pending"` - Cluster ClusterOptsVarz `json:"cluster,omitempty"` - Gateway GatewayOptsVarz `json:"gateway,omitempty"` - LeafNode LeafNodeOptsVarz `json:"leaf,omitempty"` - MQTT MQTTOptsVarz `json:"mqtt,omitempty"` - Websocket WebsocketOptsVarz `json:"websocket,omitempty"` - JetStream JetStreamVarz `json:"jetstream,omitempty"` - TLSTimeout float64 `json:"tls_timeout"` - WriteDeadline time.Duration `json:"write_deadline"` - Start time.Time `json:"start"` - Now time.Time `json:"now"` - Uptime string `json:"uptime"` - Mem int64 `json:"mem"` - Cores int `json:"cores"` - MaxProcs int `json:"gomaxprocs"` - MemLimit int64 `json:"gomemlimit,omitempty"` - CPU float64 `json:"cpu"` - Connections int `json:"connections"` - TotalConnections uint64 `json:"total_connections"` - Routes int `json:"routes"` - Remotes int `json:"remotes"` - Leafs int 
`json:"leafnodes"` - InMsgs int64 `json:"in_msgs"` - OutMsgs int64 `json:"out_msgs"` - InBytes int64 `json:"in_bytes"` - OutBytes int64 `json:"out_bytes"` - SlowConsumers int64 `json:"slow_consumers"` - Subscriptions uint32 `json:"subscriptions"` - HTTPReqStats map[string]uint64 `json:"http_req_stats"` - ConfigLoadTime time.Time `json:"config_load_time"` - ConfigDigest string `json:"config_digest"` - Tags jwt.TagList `json:"tags,omitempty"` - TrustedOperatorsJwt []string `json:"trusted_operators_jwt,omitempty"` - TrustedOperatorsClaim []*jwt.OperatorClaims `json:"trusted_operators_claim,omitempty"` - SystemAccount string `json:"system_account,omitempty"` - PinnedAccountFail uint64 `json:"pinned_account_fails,omitempty"` - OCSPResponseCache *OCSPResponseCacheVarz `json:"ocsp_peer_cache,omitempty"` - SlowConsumersStats *SlowConsumersStats `json:"slow_consumer_stats"` + ID string `json:"server_id"` // ID is the unique server ID generated at start + Name string `json:"server_name"` // Name is the configured server name, equals ID when not set + Version string `json:"version"` // Version is the version of the running server + Proto int `json:"proto"` // Proto is the protocol version this server supports + GitCommit string `json:"git_commit,omitempty"` // GitCommit is the git repository commit hash that the build corresponds with + GoVersion string `json:"go"` // GoVersion is the version of Go used to build this binary + Host string `json:"host"` // Host is the hostname the server runs on + Port int `json:"port"` // Port is the port the server listens on for client connections + AuthRequired bool `json:"auth_required,omitempty"` // AuthRequired indicates if users are required to authenticate to join the server + TLSRequired bool `json:"tls_required,omitempty"` // TLSRequired indicates if connections must use TLS when connecting to this server + TLSVerify bool `json:"tls_verify,omitempty"` // TLSVerify indicates if full TLS verification will be performed + 
TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"` // TLSOCSPPeerVerify indicates if the OCSP protocol will be used to verify peers + IP string `json:"ip,omitempty"` // IP is the IP address the server listens on if set + ClientConnectURLs []string `json:"connect_urls,omitempty"` // ClientConnectURLs is the list of URLs NATS clients can use to connect to this server + WSConnectURLs []string `json:"ws_connect_urls,omitempty"` // WSConnectURLs is the list of URLs websocket clients can use to connect to this server + MaxConn int `json:"max_connections"` // MaxConn is the maximum amount of connections the server can accept + MaxSubs int `json:"max_subscriptions,omitempty"` // MaxSubs is the maximum amount of subscriptions the server can manage + PingInterval time.Duration `json:"ping_interval"` // PingInterval is the interval the server will send PING messages during periods of inactivity on a connection + MaxPingsOut int `json:"ping_max"` // MaxPingsOut is the number of unanswered PINGs after which the connection will be considered stale + HTTPHost string `json:"http_host"` // HTTPHost is the HTTP host monitoring connections are accepted on + HTTPPort int `json:"http_port"` // HTTPPort is the port monitoring connections are accepted on + HTTPBasePath string `json:"http_base_path"` // HTTPBasePath is the path prefix for access to monitor endpoints + HTTPSPort int `json:"https_port"` // HTTPSPort is the port secure (HTTPS) monitoring connections are accepted on + AuthTimeout float64 `json:"auth_timeout"` // AuthTimeout is the amount of seconds connections have to complete authentication + MaxControlLine int32 `json:"max_control_line"` // MaxControlLine is the amount of bytes a single control message may be + MaxPayload int `json:"max_payload"` // MaxPayload is the maximum amount of bytes a message may have as payload + MaxPending int64 `json:"max_pending"` // MaxPending is the maximum amount of unprocessed bytes a connection may have + Cluster ClusterOptsVarz 
`json:"cluster,omitempty"` // Cluster is the Cluster state + Gateway GatewayOptsVarz `json:"gateway,omitempty"` // Gateway is the Super Cluster state + LeafNode LeafNodeOptsVarz `json:"leaf,omitempty"` // LeafNode is the Leafnode state + MQTT MQTTOptsVarz `json:"mqtt,omitempty"` // MQTT is the MQTT state + Websocket WebsocketOptsVarz `json:"websocket,omitempty"` // Websocket is the Websocket client state + JetStream JetStreamVarz `json:"jetstream,omitempty"` // JetStream is the JetStream state + TLSTimeout float64 `json:"tls_timeout"` // TLSTimeout is how long TLS operations have to complete + WriteDeadline time.Duration `json:"write_deadline"` // WriteDeadline is the maximum time writes to sockets have to complete + Start time.Time `json:"start"` // Start is the time when the server was started + Now time.Time `json:"now"` // Now is the current time of the server + Uptime string `json:"uptime"` // Uptime is how long the server has been running + Mem int64 `json:"mem"` // Mem is the resident memory allocation + Cores int `json:"cores"` // Cores is the number of cores the process has access to + MaxProcs int `json:"gomaxprocs"` // MaxProcs is the configured GOMAXPROCS value + MemLimit int64 `json:"gomemlimit,omitempty"` // MemLimit is the configured GOMEMLIMIT value + CPU float64 `json:"cpu"` // CPU is the current total CPU usage + Connections int `json:"connections"` // Connections is the current connected connections + TotalConnections uint64 `json:"total_connections"` // TotalConnections is the total connections the server has ever handled + Routes int `json:"routes"` // Routes is the number of connected route servers + Remotes int `json:"remotes"` // Remotes is the configured route remote endpoints + Leafs int `json:"leafnodes"` // Leafs is the number of connected leafnode clients + InMsgs int64 `json:"in_msgs"` // InMsgs is the number of messages this server received + OutMsgs int64 `json:"out_msgs"` // OutMsgs is the number of messages this server sent + InBytes 
int64 `json:"in_bytes"` // InBytes is the number of bytes this server received + OutBytes int64 `json:"out_bytes"` // OutBytes is the number of bytes this server sent + SlowConsumers int64 `json:"slow_consumers"` // SlowConsumers is the total count of clients that were disconnected since start due to being slow consumers + Subscriptions uint32 `json:"subscriptions"` // Subscriptions is the count of active subscriptions + HTTPReqStats map[string]uint64 `json:"http_req_stats"` // HTTPReqStats is the number of requests each HTTP endpoint received + ConfigLoadTime time.Time `json:"config_load_time"` // ConfigLoadTime is the time the configuration was loaded or reloaded + ConfigDigest string `json:"config_digest"` // ConfigDigest is a calculated hash of the current configuration + Tags jwt.TagList `json:"tags,omitempty"` // Tags are the tags assigned to the server in configuration + Metadata map[string]string `json:"metadata,omitempty"` // Metadata is the metadata assigned to the server in configuration + TrustedOperatorsJwt []string `json:"trusted_operators_jwt,omitempty"` // TrustedOperatorsJwt are the JWTs for all trusted operators + TrustedOperatorsClaim []*jwt.OperatorClaims `json:"trusted_operators_claim,omitempty"` // TrustedOperatorsClaim is the decoded claims for each trusted operator + SystemAccount string `json:"system_account,omitempty"` // SystemAccount is the name of the System account + PinnedAccountFail uint64 `json:"pinned_account_fails,omitempty"` // PinnedAccountFail is how often user logon fails due to the issuer account not being pinned. 
+ OCSPResponseCache *OCSPResponseCacheVarz `json:"ocsp_peer_cache,omitempty"` // OCSPResponseCache is the state of the OCSP response cache + SlowConsumersStats *SlowConsumersStats `json:"slow_consumer_stats"` // SlowConsumersStats is statistics about all detected Slow Consumers } // JetStreamVarz contains basic runtime information about jetstream type JetStreamVarz struct { - Config *JetStreamConfig `json:"config,omitempty"` - Stats *JetStreamStats `json:"stats,omitempty"` - Meta *MetaClusterInfo `json:"meta,omitempty"` - Limits *JSLimitOpts `json:"limits,omitempty"` + Config *JetStreamConfig `json:"config,omitempty"` // Config is the active JetStream configuration + Stats *JetStreamStats `json:"stats,omitempty"` // Stats is the statistics for the JetStream server + Meta *MetaClusterInfo `json:"meta,omitempty"` // Meta is information about the JetStream metalayer + Limits *JSLimitOpts `json:"limits,omitempty"` // Limits are the configured JetStream limits } // ClusterOptsVarz contains monitoring cluster information type ClusterOptsVarz struct { - Name string `json:"name,omitempty"` - Host string `json:"addr,omitempty"` - Port int `json:"cluster_port,omitempty"` - AuthTimeout float64 `json:"auth_timeout,omitempty"` - URLs []string `json:"urls,omitempty"` - TLSTimeout float64 `json:"tls_timeout,omitempty"` - TLSRequired bool `json:"tls_required,omitempty"` - TLSVerify bool `json:"tls_verify,omitempty"` - PoolSize int `json:"pool_size,omitempty"` + Name string `json:"name,omitempty"` // Name is the configured cluster name + Host string `json:"addr,omitempty"` // Host is the host the cluster listens on for connections + Port int `json:"cluster_port,omitempty"` // Port is the port the cluster listens on for connections + AuthTimeout float64 `json:"auth_timeout,omitempty"` // AuthTimeout is the time cluster connections have to complete authentication + URLs []string `json:"urls,omitempty"` // URLs is the list of cluster URLs + 
TLSTimeout float64 `json:"tls_timeout,omitempty"` // TLSTimeout is how long TLS operations have to complete + TLSRequired bool `json:"tls_required,omitempty"` // TLSRequired indicates if TLS is required for connections + TLSVerify bool `json:"tls_verify,omitempty"` // TLSVerify indicates if full verification of TLS connections is performed + PoolSize int `json:"pool_size,omitempty"` // PoolSize is the configured route connection pool size } // GatewayOptsVarz contains monitoring gateway information type GatewayOptsVarz struct { - Name string `json:"name,omitempty"` - Host string `json:"host,omitempty"` - Port int `json:"port,omitempty"` - AuthTimeout float64 `json:"auth_timeout,omitempty"` - TLSTimeout float64 `json:"tls_timeout,omitempty"` - TLSRequired bool `json:"tls_required,omitempty"` - TLSVerify bool `json:"tls_verify,omitempty"` - Advertise string `json:"advertise,omitempty"` - ConnectRetries int `json:"connect_retries,omitempty"` - Gateways []RemoteGatewayOptsVarz `json:"gateways,omitempty"` - RejectUnknown bool `json:"reject_unknown,omitempty"` // config got renamed to reject_unknown_cluster + Name string `json:"name,omitempty"` // Name is the configured cluster name + Host string `json:"host,omitempty"` // Host is the host the gateway listens on for connections + Port int `json:"port,omitempty"` // Port is the post gateway connections listens on + AuthTimeout float64 `json:"auth_timeout,omitempty"` // AuthTimeout is the time cluster connections have to complete authentication + TLSTimeout float64 `json:"tls_timeout,omitempty"` // TLSTimeout is how long TLS operations have to complete + TLSRequired bool `json:"tls_required,omitempty"` // TLSRequired indicates if TLS is required for connections + TLSVerify bool `json:"tls_verify,omitempty"` // TLSVerify indicates if full verification of TLS connections is performed + Advertise string `json:"advertise,omitempty"` // Advertise is the URL advertised to remote gateway clients + ConnectRetries int 
`json:"connect_retries,omitempty"` // ConnectRetries is how many connection attempts the route will make + Gateways []RemoteGatewayOptsVarz `json:"gateways,omitempty"` // Gateways is state of configured gateway remotes + RejectUnknown bool `json:"reject_unknown,omitempty"` // RejectUnknown indicates if unknown cluster connections will be rejected } // RemoteGatewayOptsVarz contains monitoring remote gateway information type RemoteGatewayOptsVarz struct { - Name string `json:"name"` - TLSTimeout float64 `json:"tls_timeout,omitempty"` - URLs []string `json:"urls,omitempty"` + Name string `json:"name"` // Name is the name of the remote gateway + TLSTimeout float64 `json:"tls_timeout,omitempty"` // TLSTimeout is how long TLS operations have to complete + URLs []string `json:"urls,omitempty"` // URLs is the list of Gateway URLs } // LeafNodeOptsVarz contains monitoring leaf node information type LeafNodeOptsVarz struct { - Host string `json:"host,omitempty"` - Port int `json:"port,omitempty"` - AuthTimeout float64 `json:"auth_timeout,omitempty"` - TLSTimeout float64 `json:"tls_timeout,omitempty"` - TLSRequired bool `json:"tls_required,omitempty"` - TLSVerify bool `json:"tls_verify,omitempty"` - Remotes []RemoteLeafOptsVarz `json:"remotes,omitempty"` - TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"` + Host string `json:"host,omitempty"` // Host is the host the server listens on + Port int `json:"port,omitempty"` // Port is the port the server listens on + AuthTimeout float64 `json:"auth_timeout,omitempty"` // AuthTimeout is the time Leafnode connections have to complete authentication + TLSTimeout float64 `json:"tls_timeout,omitempty"` // TLSTimeout is how long TLS operations have to complete + TLSRequired bool `json:"tls_required,omitempty"` // TLSRequired indicates if TLS is required for connections + TLSVerify bool `json:"tls_verify,omitempty"` // TLSVerify indicates if full verification of TLS connections is performed + Remotes []RemoteLeafOptsVarz 
`json:"remotes,omitempty"` // Remotes is state of configured Leafnode remotes + TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"` // TLSOCSPPeerVerify indicates if OCSP verification will be performed } // DenyRules Contains lists of subjects not allowed to be imported/exported type DenyRules struct { - Exports []string `json:"exports,omitempty"` - Imports []string `json:"imports,omitempty"` + Exports []string `json:"exports,omitempty"` // Exports are denied exports + Imports []string `json:"imports,omitempty"` // Imports are denied imports } // RemoteLeafOptsVarz contains monitoring remote leaf node information type RemoteLeafOptsVarz struct { - LocalAccount string `json:"local_account,omitempty"` - TLSTimeout float64 `json:"tls_timeout,omitempty"` - URLs []string `json:"urls,omitempty"` - Deny *DenyRules `json:"deny,omitempty"` - TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"` + LocalAccount string `json:"local_account,omitempty"` // LocalAccount is the local account this leaf is logged into + TLSTimeout float64 `json:"tls_timeout,omitempty"` // TLSTimeout is how long TLS operations have to complete + URLs []string `json:"urls,omitempty"` // URLs is the list of URLs for the remote Leafnode connection + Deny *DenyRules `json:"deny,omitempty"` // Deny is the configured import and exports that the Leafnode may not access + TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"` // TLSOCSPPeerVerify indicates if OCSP verification will be done } // MQTTOptsVarz contains monitoring MQTT information type MQTTOptsVarz struct { - Host string `json:"host,omitempty"` - Port int `json:"port,omitempty"` - NoAuthUser string `json:"no_auth_user,omitempty"` - AuthTimeout float64 `json:"auth_timeout,omitempty"` - TLSMap bool `json:"tls_map,omitempty"` - TLSTimeout float64 `json:"tls_timeout,omitempty"` - TLSPinnedCerts []string `json:"tls_pinned_certs,omitempty"` - JsDomain string `json:"js_domain,omitempty"` - AckWait time.Duration 
`json:"ack_wait,omitempty"` - MaxAckPending uint16 `json:"max_ack_pending,omitempty"` - TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"` + Host string `json:"host,omitempty"` // Host is the host the server listens on + Port int `json:"port,omitempty"` // Port is the port the server listens on + NoAuthUser string `json:"no_auth_user,omitempty"` // NoAuthUser is the user that will be used for unauthenticated connections + AuthTimeout float64 `json:"auth_timeout,omitempty"` // AuthTimeout is how long authentication has to complete + TLSMap bool `json:"tls_map,omitempty"` // TLSMap indicates if TLS Mapping is enabled + TLSTimeout float64 `json:"tls_timeout,omitempty"` // TLSTimeout is how long TLS operations have to complete + TLSPinnedCerts []string `json:"tls_pinned_certs,omitempty"` // TLSPinnedCerts is the list of certificates pinned to this connection + JsDomain string `json:"js_domain,omitempty"` // JsDomain is the JetStream domain used for MQTT state + AckWait time.Duration `json:"ack_wait,omitempty"` // AckWait is how long the internal JetStream state store will allow acks to complete + MaxAckPending uint16 `json:"max_ack_pending,omitempty"` // MaxAckPending is how many outstanding acks the internal JetStream state store will allow + TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"` // TLSOCSPPeerVerify indicates if OCSP verification will be done } // WebsocketOptsVarz contains monitoring websocket information type WebsocketOptsVarz struct { - Host string `json:"host,omitempty"` - Port int `json:"port,omitempty"` - Advertise string `json:"advertise,omitempty"` - NoAuthUser string `json:"no_auth_user,omitempty"` - JWTCookie string `json:"jwt_cookie,omitempty"` - HandshakeTimeout time.Duration `json:"handshake_timeout,omitempty"` - AuthTimeout float64 `json:"auth_timeout,omitempty"` - NoTLS bool `json:"no_tls,omitempty"` - TLSMap bool `json:"tls_map,omitempty"` - TLSPinnedCerts []string `json:"tls_pinned_certs,omitempty"` - SameOrigin 
bool `json:"same_origin,omitempty"` - AllowedOrigins []string `json:"allowed_origins,omitempty"` - Compression bool `json:"compression,omitempty"` - TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"` + Host string `json:"host,omitempty"` // Host is the host the server listens on + Port int `json:"port,omitempty"` // Port is the port the server listens on + Advertise string `json:"advertise,omitempty"` // Advertise is the connection URL the server advertises + NoAuthUser string `json:"no_auth_user,omitempty"` // NoAuthUser is the user that will be used for unauthenticated connections + JWTCookie string `json:"jwt_cookie,omitempty"` // JWTCookie is the name of a cookie the server will read for the connection JWT + HandshakeTimeout time.Duration `json:"handshake_timeout,omitempty"` // HandshakeTimeout is how long the connection has to complete the websocket setup + AuthTimeout float64 `json:"auth_timeout,omitempty"` // AuthTimeout is how long authentication has to complete + NoTLS bool `json:"no_tls,omitempty"` // NoTLS indicates if TLS is disabled + TLSMap bool `json:"tls_map,omitempty"` // TLSMap indicates if TLS Mapping is enabled + TLSPinnedCerts []string `json:"tls_pinned_certs,omitempty"` // TLSPinnedCerts is the list of certificates pinned to this connection + SameOrigin bool `json:"same_origin,omitempty"` // SameOrigin indicates if same origin connections are allowed + AllowedOrigins []string `json:"allowed_origins,omitempty"` // AllowedOrigins list of configured trusted origins + Compression bool `json:"compression,omitempty"` // Compression indicates if compression is supported + TLSOCSPPeerVerify bool `json:"tls_ocsp_peer_verify,omitempty"` // TLSOCSPPeerVerify indicates if OCSP verification will be done } // OCSPResponseCacheVarz contains OCSP response cache information type OCSPResponseCacheVarz struct { - Type string `json:"cache_type,omitempty"` - Hits int64 `json:"cache_hits,omitempty"` - Misses int64 `json:"cache_misses,omitempty"` - 
Responses int64 `json:"cached_responses,omitempty"` - Revokes int64 `json:"cached_revoked_responses,omitempty"` - Goods int64 `json:"cached_good_responses,omitempty"` - Unknowns int64 `json:"cached_unknown_responses,omitempty"` + Type string `json:"cache_type,omitempty"` // Type is the kind of cache being used + Hits int64 `json:"cache_hits,omitempty"` // Hits is how many times the cache was able to answer a request + Misses int64 `json:"cache_misses,omitempty"` // Misses is how many times the cache failed to answer a request + Responses int64 `json:"cached_responses,omitempty"` // Responses is how many responses are currently stored in the cache + Revokes int64 `json:"cached_revoked_responses,omitempty"` // Revokes is how many of the stored cache entries are revokes + Goods int64 `json:"cached_good_responses,omitempty"` // Goods is how many of the stored cache entries are good responses + Unknowns int64 `json:"cached_unknown_responses,omitempty"` // Unknowns is how many of the stored cache entries are unknown responses } // VarzOptions are the options passed to Varz(). @@ -1370,10 +1371,10 @@ type VarzOptions struct{} // SlowConsumersStats contains information about the slow consumers from different type of connections. 
type SlowConsumersStats struct { - Clients uint64 `json:"clients"` - Routes uint64 `json:"routes"` - Gateways uint64 `json:"gateways"` - Leafs uint64 `json:"leafs"` + Clients uint64 `json:"clients"` // Clients is how many Clients were slow consumers + Routes uint64 `json:"routes"` // Routes is how many Routes were slow consumers + Gateways uint64 `json:"gateways"` // Gateways is how many Gateways were slow consumers + Leafs uint64 `json:"leafs"` // Leafs is how many Leafnodes were slow consumers } func myUptime(d time.Duration) string { @@ -1431,6 +1432,8 @@ func (s *Server) HandleRoot(w http.ResponseWriter, r *http.Request) { a.last { padding-bottom: 16px } a.version { font-size: 14; font-weight: 400; width: 312px; text-align: right; margin-top: -2rem } a.version:hover { color: rgb(22 22 32) } + .endpoint { font-size: 12px; color: #999; font-family: monospace; display: none } + a:hover .endpoint { display: inline } @@ -1441,33 +1444,33 @@ func (s *Server) HandleRoot(w http.ResponseWriter, r *http.Request) {
- General - JetStream - Connections - Accounts - Account Stats - Subscriptions - Routes - LeafNodes - Gateways - Raft Groups - Health Probe + General %s + JetStream %s + Connections %s + Accounts %s + Account Stats %s + Subscriptions %s + Routes %s + LeafNodes %s + Gateways %s + Raft Groups %s + Health Probe %s Help `, srcUrl, VERSION, - s.basePath(VarzPath), - s.basePath(JszPath), - s.basePath(ConnzPath), - s.basePath(AccountzPath), - s.basePath(AccountStatzPath), - s.basePath(SubszPath), - s.basePath(RoutezPath), - s.basePath(LeafzPath), - s.basePath(GatewayzPath), - s.basePath(RaftzPath), - s.basePath(HealthzPath), + s.basePath(VarzPath), VarzPath, + s.basePath(JszPath), JszPath, + s.basePath(ConnzPath), ConnzPath, + s.basePath(AccountzPath), AccountzPath, + s.basePath(AccountStatzPath), AccountStatzPath, + s.basePath(SubszPath), SubszPath, + s.basePath(RoutezPath), RoutezPath, + s.basePath(LeafzPath), LeafzPath, + s.basePath(GatewayzPath), GatewayzPath, + s.basePath(RaftzPath), RaftzPath, + s.basePath(HealthzPath), HealthzPath, ) } @@ -2245,6 +2248,7 @@ type LeafzOptions struct { // LeafInfo has detailed information on each remote leafnode connection. type LeafInfo struct { + ID uint64 `json:"id"` Name string `json:"name"` IsSpoke bool `json:"is_spoke"` Account string `json:"account"` @@ -2287,6 +2291,7 @@ func (s *Server) Leafz(opts *LeafzOptions) (*Leafz, error) { for _, ln := range lconns { ln.mu.Lock() lni := &LeafInfo{ + ID: ln.cid, Name: ln.leaf.remoteServer, IsSpoke: ln.isSpokeLeafNode(), Account: ln.acc.Name, diff --git a/server/parser_fuzz_test.go b/server/parser_fuzz_test.go new file mode 100644 index 00000000000..4893fb55299 --- /dev/null +++ b/server/parser_fuzz_test.go @@ -0,0 +1,95 @@ +// Copyright 2012-2025 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "sync" + "testing" +) + +func dummyFuzzClient(kind int) *client { + var r *route + var gw *gateway + var lf *leaf + + switch kind { + case ROUTER: + r = &route{} + case GATEWAY: + gw = &gateway{outbound: false, connected: true, insim: make(map[string]*insie), outsim: &sync.Map{}} + case LEAF: + lf = &leaf{} + } + + return &client{ + srv: New(&defaultServerOptions), + kind: kind, + msubs: -1, + in: readCache{ + results: make(map[string]*SublistResult), + pacache: make(map[string]*perAccountCache), + }, + mpay: MAX_PAYLOAD_SIZE, + mcl: MAX_CONTROL_LINE_SIZE, + route: r, + gw: gw, + leaf: lf, + } +} + +// FuzzParser performs fuzz testing on the NATS protocol parser implementation. +// It tests the parser's ability to handle various NATS protocol messages, including +// partial (chunked) message delivery scenarios that may occur in real-world usage. 
+func FuzzParser(f *testing.F) { + msgs := []string{ + "PING\r\n", + "PONG\r\n", + "PUB foo 33333\r\n", + "HPUB foo INBOX.22 0 5\r\nHELLO\r", + "HMSG $foo foo 10 8\r\nXXXhello\r", + "MSG $foo foo 5\r\nhello\r", + "SUB foo 1\r\nSUB foo 2\r\n", + "UNSUB 1 5\r\n", + "RMSG $G foo.bar | baz 11\r\nhello world\r", + "CONNECT {\"verbose\":false,\"pedantic\":true,\"tls_required\":false}\r\n", + } + + clientKinds := []int{ + CLIENT, + ROUTER, + GATEWAY, + LEAF, + } + + for _, ck := range clientKinds { + for _, crp := range msgs { + f.Add(ck, crp) + } + } + + f.Fuzz(func(t *testing.T, kind int, orig string) { + c := dummyFuzzClient(kind) + + data := []byte(orig) + half := len(data) / 2 + + if err := c.parse(data[:half]); err != nil { + return + } + + if err := c.parse(data[half:]); err != nil { + return + } + }) +} diff --git a/server/raft.go b/server/raft.go index 084c9456347..d02eaf66910 100644 --- a/server/raft.go +++ b/server/raft.go @@ -1184,7 +1184,7 @@ func (n *raft) InstallSnapshot(data []byte) error { return errNoSnapAvailable } - n.debug("Installing snapshot of %d bytes", len(data)) + n.debug("Installing snapshot of %d bytes [%d:%d]", len(data), term, n.applied) return n.installSnapshot(&snapshot{ lastTerm: term, @@ -1314,8 +1314,9 @@ func (n *raft) setupLastSnapshot() { // Compact the WAL when we're done if needed. n.pindex = snap.lastIndex n.pterm = snap.lastTerm + // Explicitly only set commit, and not applied. + // Applied will move up when the snapshot is actually applied. 
n.commit = snap.lastIndex - n.applied = snap.lastIndex n.apply.push(newCommittedEntry(n.commit, []*Entry{{EntrySnapshot, snap.data}})) if _, err := n.wal.Compact(snap.lastIndex + 1); err != nil { n.setWriteErrLocked(err) @@ -1957,7 +1958,7 @@ func (n *raft) run() { n.apply.push(nil) runner: - for s.isRunning() { + for { switch n.State() { case Follower: n.runAsFollower() @@ -2691,7 +2692,7 @@ func (n *raft) loadFirstEntry() (ae *appendEntry, err error) { func (n *raft) runCatchup(ar *appendEntryResponse, indexUpdatesQ *ipQueue[uint64]) { n.RLock() s, reply := n.s, n.areply - peer, subj, term, last := ar.peer, ar.reply, n.term, n.pindex + peer, subj, term, pterm, last := ar.peer, ar.reply, n.term, n.pterm, n.pindex n.RUnlock() defer s.grWG.Done() @@ -2713,7 +2714,7 @@ func (n *raft) runCatchup(ar *appendEntryResponse, indexUpdatesQ *ipQueue[uint64 indexUpdatesQ.unregister() }() - n.debug("Running catchup for %q", peer) + n.debug("Running catchup for %q [%d:%d] to [%d:%d]", peer, ar.term, ar.index, pterm, last) const maxOutstanding = 2 * 1024 * 1024 // 2MB for now. next, total, om := uint64(0), 0, make(map[uint64]int) @@ -3515,29 +3516,39 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { if ae.pterm != n.pterm || ae.pindex != n.pindex { // Check if this is a lower or equal index than what we were expecting. if ae.pindex <= n.pindex { - n.debug("AppendEntry detected pindex less than/equal to ours: %d:%d vs %d:%d", ae.pterm, ae.pindex, n.pterm, n.pindex) + n.debug("AppendEntry detected pindex less than/equal to ours: [%d:%d] vs [%d:%d]", ae.pterm, ae.pindex, n.pterm, n.pindex) var ar *appendEntryResponse var success bool if ae.pindex < n.commit { // If we have already committed this entry, just mark success. 
success = true + n.debug("AppendEntry pindex %d below commit %d, marking success", ae.pindex, n.commit) + } else if eae, _ := n.loadEntry(ae.pindex); eae == nil { + // If terms are equal, and we are not catching up, we have simply already processed this message. + // So we will ACK back to the leader. This can happen on server restarts based on timings of snapshots. + if ae.pterm == n.pterm && !catchingUp { + success = true + n.debug("AppendEntry pindex %d already processed, marking success", ae.pindex) + } else if ae.pindex == n.pindex { + // Check if only our terms do not match here. + // Make sure pterms match and we take on the leader's. + // This prevents constant spinning. + n.truncateWAL(ae.pterm, ae.pindex) - } else if ae.pindex == n.applied { - // Entry can't be found, this is normal because we have a snapshot at this index. - // Truncate back to where we've created the snapshot. - n.truncateWAL(ae.pterm, ae.pindex) } else { - n.resetWAL() + snap, err := n.loadLastSnapshot() + if err == nil && snap.lastIndex == ae.pindex && snap.lastTerm == ae.pterm { + // Entry can't be found, this is normal because we have a snapshot at this index. + // Truncate back to where we've created the snapshot. + n.truncateWAL(snap.lastTerm, snap.lastIndex) + // Only continue if truncation was successful, and we ended up such that we can safely continue. + if ae.pterm == n.pterm && ae.pindex == n.pindex { + goto CONTINUE + } + } else { + // Otherwise, something has gone very wrong and we need to reset. + n.resetWAL() + } + } + } else if eae.term == ae.pterm { + // If terms match we can delete all entries past this one, and then continue storing the current entry. 
@@ -3594,12 +3605,6 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { n.pterm = ae.pterm n.commit = ae.pindex - if _, err := n.wal.Compact(n.pindex + 1); err != nil { - n.setWriteErrLocked(err) - n.Unlock() - return - } - snap := &snapshot{ lastTerm: n.pterm, lastIndex: n.pindex, @@ -3620,7 +3625,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { } // Setup our state for catching up. - n.debug("AppendEntry did not match %d %d with %d %d", ae.pterm, ae.pindex, n.pterm, n.pindex) + n.debug("AppendEntry did not match [%d:%d] with [%d:%d]", ae.pterm, ae.pindex, n.pterm, n.pindex) inbox := n.createCatchup(ae) ar := newAppendEntryResponse(n.pterm, n.pindex, n.id, false) n.Unlock() diff --git a/server/raft_test.go b/server/raft_test.go index 939ddd64b80..b65d1bcb915 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -2739,6 +2739,166 @@ func TestNRGSnapshotCatchup(t *testing.T) { t.Run("with-restart", func(t *testing.T) { test(t, true) }) } +func TestNRGSnapshotRecovery(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + // Create a sample entry, the content doesn't matter, just that it's stored. + esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) + entries := []*Entry{newEntry(EntryNormal, esm)} + + nats0 := "S1Nunr6R" // "nats-0" + + // Timeline + aeMsg := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 0, pindex: 0, entries: entries}) + + // Store one entry. + n.processAppendEntry(aeMsg, n.aesub) + require_Equal(t, n.pindex, 1) + require_Equal(t, n.commit, 1) + require_Equal(t, n.applied, 0) + + // Apply it. + n.Applied(1) + require_Equal(t, n.applied, 1) + + // Install the snapshot. + require_NoError(t, n.InstallSnapshot(nil)) + + // Restoring the snapshot should not up applied, because the apply queue is async. 
+ n.pindex, n.commit, n.applied = 0, 0, 0 + n.setupLastSnapshot() + require_Equal(t, n.pindex, 1) + require_Equal(t, n.commit, 1) + require_Equal(t, n.applied, 0) +} + +func TestNRGKeepRunningOnServerShutdown(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + n.RLock() + s := n.s + wal := n.wal.(*memStore) + n.RUnlock() + + n.wg.Add(1) + s.startGoRoutine(n.run, nil) + + s.running.Store(false) + time.Sleep(time.Second) + + wal.mu.RLock() + msgs := wal.msgs + wal.mu.RUnlock() + require_NotNil(t, msgs) + + n.Stop() + n.WaitForStop() + + wal.mu.RLock() + msgs = wal.msgs + wal.mu.RUnlock() + require_True(t, msgs == nil) +} + +func TestNRGReplayOnSnapshotSameTerm(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + // Create a sample entry, the content doesn't matter, just that it's stored. + esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) + entries := []*Entry{newEntry(EntryNormal, esm)} + + nats0 := "S1Nunr6R" // "nats-0" + + // Timeline + aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 0, pindex: 0, entries: entries}) + aeMsg2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: entries}) + aeMsg3 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 2, entries: entries}) + + // Process the first append entry. + n.processAppendEntry(aeMsg1, n.aesub) + require_Equal(t, n.pindex, 1) + + // Commit and apply. + require_NoError(t, n.applyCommit(1)) + require_Equal(t, n.commit, 1) + n.Applied(1) + require_Equal(t, n.applied, 1) + + // Install snapshot. + require_NoError(t, n.InstallSnapshot(nil)) + snap, err := n.loadLastSnapshot() + require_NoError(t, err) + require_Equal(t, snap.lastIndex, 1) + + // Process other messages. 
+ n.processAppendEntry(aeMsg2, n.aesub) + require_Equal(t, n.pindex, 2) + n.processAppendEntry(aeMsg3, n.aesub) + require_Equal(t, n.pindex, 3) + + // Replay the append entry that matches our snapshot. + // This can happen as a repeated entry, or a delayed append entry after having already received it in a catchup. + // Should be recognized as a replay with the same term, marked as success, and not truncate. + n.processAppendEntry(aeMsg2, n.aesub) + require_Equal(t, n.pindex, 3) +} + +func TestNRGReplayOnSnapshotDifferentTerm(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + // Create a sample entry, the content doesn't matter, just that it's stored. + esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) + entries := []*Entry{newEntry(EntryNormal, esm)} + + nats0 := "S1Nunr6R" // "nats-0" + + // Timeline + aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 0, pindex: 0, entries: entries, lterm: 2}) + aeMsg2 := encode(t, &appendEntry{leader: nats0, term: 2, commit: 1, pterm: 1, pindex: 1, entries: entries, lterm: 2}) + aeMsg3 := encode(t, &appendEntry{leader: nats0, term: 2, commit: 1, pterm: 2, pindex: 2, entries: entries, lterm: 2}) + + // Process the first append entry. + n.processAppendEntry(aeMsg1, n.aesub) + require_Equal(t, n.pindex, 1) + + // Commit and apply. + require_NoError(t, n.applyCommit(1)) + require_Equal(t, n.commit, 1) + n.Applied(1) + require_Equal(t, n.applied, 1) + + // Install snapshot. + require_NoError(t, n.InstallSnapshot(nil)) + snap, err := n.loadLastSnapshot() + require_NoError(t, err) + require_Equal(t, snap.lastIndex, 1) + + // Reset applied to simulate having received the snapshot from + // another leader, and we didn't apply yet since it's async. + n.applied = 0 + + // Process other messages. 
+ n.processAppendEntry(aeMsg2, n.aesub) + require_Equal(t, n.pindex, 2) + n.processAppendEntry(aeMsg3, n.aesub) + require_Equal(t, n.pindex, 3) + + // Replay the append entry that matches our snapshot. + // This can happen as a repeated entry, or a delayed append entry after having already received it in a catchup. + // Should be recognized as truncating back to the installed snapshot, not reset the WAL fully. + // Since all is aligned after truncation, should also be able to apply the entry. + n.processAppendEntry(aeMsg2, n.aesub) + require_Equal(t, n.pindex, 2) + + // Should now also be able to apply the third entry. + n.processAppendEntry(aeMsg3, n.aesub) + require_Equal(t, n.pindex, 3) +} + // This is a RaftChainOfBlocks test where a block is proposed and then we wait for all replicas to apply it before // proposing the next one. // The test may fail if: diff --git a/server/reload_test.go b/server/reload_test.go index 99bf4991dc8..1b30cdc2d02 100644 --- a/server/reload_test.go +++ b/server/reload_test.go @@ -598,22 +598,53 @@ func TestConfigReloadRotateTLSMultiCert(t *testing.T) { } func TestConfigReloadDefaultSentinel(t *testing.T) { - server, opts, config := runReloadServerWithConfig(t, "./configs/reload/defaultsentinel_1.conf") - defer server.Shutdown() + var err error + preload := make(map[string]string) - if opts.DefaultSentinel != "one" { - t.Fatalf("Expected default sentinel to be 'one', got %s", opts.DefaultSentinel) - } + _, sysPub, sysAC := NewJwtAccountClaim("SYS") + preload[sysPub], err = sysAC.Encode(oKp) + require_NoError(t, err) + + aKP, aPub, aAC := NewJwtAccountClaim("A") + preload[aPub], err = aAC.Encode(oKp) + require_NoError(t, err) + + preloadConfig, err := json.MarshalIndent(preload, "", " ") + require_NoError(t, err) - changeCurrentConfigContent(t, config, "./configs/reload/defaultsentinel_2.conf") + uKP, err := nkeys.CreateUser() + require_NoError(t, err) + uPub, err := uKP.PublicKey() + require_NoError(t, err) + uc := 
jwt.NewUserClaims(uPub) + uc.BearerToken = true + uc.Name = "sentinel" + sentinelToken, err := uc.Encode(aKP) + require_NoError(t, err) + content := func() []byte { + return []byte(fmt.Sprintf(` + listen: 127.0.0.1:4747 + operator: %s + system_account: %s + resolver: MEM + resolver_preload: %s + default_sentinel: %s +`, ojwt, sysPub, preloadConfig, sentinelToken)) + } + + server, opts, config := runReloadServerWithContent(t, content()) + defer server.Shutdown() + require_Equal(t, opts.DefaultSentinel, sentinelToken) + + uc.Name = "sentinel-updated" + sentinelToken, err = uc.Encode(aKP) + require_NoError(t, err) + changeCurrentConfigContentWithNewContent(t, config, content()) if err := server.Reload(); err != nil { t.Fatalf("Error reloading config: %v", err) } - opts = server.getOpts() - if opts.DefaultSentinel != "two" { - t.Fatalf("Expected default sentinel to be 'two', got %s", opts.DefaultSentinel) - } + require_Equal(t, opts.DefaultSentinel, sentinelToken) } // Ensure Reload supports single user authentication config changes. Test this diff --git a/server/sdm.go b/server/sdm.go index 7431479580b..88a1be4e494 100644 --- a/server/sdm.go +++ b/server/sdm.go @@ -13,7 +13,10 @@ package server -import "time" +import ( + "bytes" + "time" +) // SDMMeta holds pending/proposed data for subject delete markers or message removals. type SDMMeta struct { @@ -40,6 +43,12 @@ func newSDMMeta() *SDMMeta { } } +// isSubjectDeleteMarker returns whether the headers indicate this message is a subject delete marker. +// Either it's a usual marker with JSMarkerReason, or it's a KV Purge marker as the KVOperation. +func isSubjectDeleteMarker(hdr []byte) bool { + return len(sliceHeader(JSMarkerReason, hdr)) == 0 && !bytes.Equal(sliceHeader(KVOperation, hdr), KVOperationValuePurge) +} + // empty clears all data. 
func (sdm *SDMMeta) empty() { if sdm == nil { diff --git a/server/server.go b/server/server.go index 671a16d092e..cf539630fbc 100644 --- a/server/server.go +++ b/server/server.go @@ -2704,7 +2704,7 @@ func (s *Server) AcceptLoop(clr chan struct{}) { // Setup state that can enable shutdown s.mu.Lock() hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port)) - l, e := natsListen("tcp", hp) + l, e := s.getServerListener(hp) s.listenerErr = e if e != nil { s.mu.Unlock() @@ -2760,6 +2760,18 @@ func (s *Server) AcceptLoop(clr chan struct{}) { clr = nil } +// getServerListener returns a network listener for the given host-port address. +// If the Server already has an active listener (s.listener), it returns that listener +// along with any previous error (s.listenerErr). Otherwise, it creates and returns +// a new TCP listener on the specified address using natsListen. +func (s *Server) getServerListener(hp string) (net.Listener, error) { + if s.listener != nil { + return s.listener, s.listenerErr + } + + return natsListen("tcp", hp) +} + // InProcessConn returns an in-process connection to the server, // avoiding the need to use a TCP listener for local connectivity // within the same process. This can be used regardless of the diff --git a/server/server_fuzz_test.go b/server/server_fuzz_test.go new file mode 100644 index 00000000000..abd3c6fdd67 --- /dev/null +++ b/server/server_fuzz_test.go @@ -0,0 +1,410 @@ +// Copyright 2012-2025 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "bufio" + "crypto/tls" + "fmt" + "io" + "math" + "net" + "testing" + "time" +) + +const ( + tlsHandshake byte = 22 + tlsClientHello byte = 1 +) + +type ClientHelloInjector struct { + sock io.ReadWriteCloser + tlsVersion uint16 + buf []byte +} + +func NewClientHelloInjector(s io.ReadWriteCloser, tlsVer uint16, b []byte) *ClientHelloInjector { + return &ClientHelloInjector{ + sock: s, + tlsVersion: tlsVer, + buf: b, + } +} + +func (i *ClientHelloInjector) inject(b []byte) []byte { + if !(b != nil && b[0] == tlsHandshake) { + return b + } + + hsLen := (uint16(b[3]) << 8) + uint16(b[4]) + + if !(hsLen > 0 && b[5] == tlsClientHello) { + return b + } + + // fuzz tls version in client hello + b[9] = uint8(i.tlsVersion >> 8) + b[10] = uint8(i.tlsVersion & 0xFF) + + // Go to begin of random opaque + offset := 11 + + randomOpaque := b[offset : offset+32] + + copy(randomOpaque, i.buf) + + offset += 32 + + sessionIDLen := b[offset] + + // Skip session id len + opaque + offset += 1 + int(sessionIDLen) + + cypherSuiteLen := (uint16(b[offset]) << 8) + uint16(b[offset+1]) + + // Skip cypherSuiteLen + offset += 2 + + cupherSuites := b[offset : offset+int(cypherSuiteLen)] + + // Leave unchanged if i.cypherSuites empty + copy(cupherSuites, i.buf) + + // Skip cypherSuites + offset += int(cypherSuiteLen) + + // Skip CompressionMethod + offset += 2 + + extensionsLen := (uint16(b[offset]) << 8) + uint16(b[offset+1]) + + // Skip extensions length + offset += 2 + + // Extensions slice. 
Stub for future use + _ = b[offset : offset+int(extensionsLen)] + + return b +} + +func (i *ClientHelloInjector) Write(b []byte) (int, error) { + return i.sock.Write(i.inject(b)) +} + +func (i *ClientHelloInjector) Read(b []byte) (int, error) { + return i.sock.Read(b) +} + +func (i *ClientHelloInjector) Close() error { + return i.sock.Close() +} + +type FakeSocket struct { + sockName string + buf []byte + data chan []byte + done chan struct{} +} + +func NewFakeSocket(name string, capacity int) *FakeSocket { + return &FakeSocket{ + sockName: name, + data: make(chan []byte, capacity), + done: make(chan struct{}), + } +} + +func (s *FakeSocket) Write(b []byte) (int, error) { + select { + case s.data <- b: + return len(b), nil + case <-s.done: + return 0, net.ErrClosed + } +} + +func (s *FakeSocket) readChunk(b []byte) (int, error) { + n := copy(b, s.buf) + s.buf = s.buf[n:] + return n, nil +} + +func (s *FakeSocket) Read(b []byte) (int, error) { + if len(s.buf) > 0 { + return s.readChunk(b) + } + + select { + case buf, ok := <-s.data: + if !ok { + return 0, nil + } + s.buf = buf + return s.readChunk(b) + case <-s.done: + return 0, nil + } +} + +func (s *FakeSocket) Close() error { + close(s.done) + return nil +} + +type FakeConn struct { + local io.ReadWriteCloser + remote io.ReadWriteCloser +} + +func NewFakeConn(loc io.ReadWriteCloser, rem io.ReadWriteCloser) *FakeConn { + return &FakeConn{ + local: loc, + remote: rem, + } +} + +func (c *FakeConn) Read(b []byte) (int, error) { + return c.local.Read(b) +} + +func (c *FakeConn) Write(b []byte) (int, error) { + return c.remote.Write(b) +} + +func (c *FakeConn) Close() error { + return c.local.Close() +} + +func (c *FakeConn) LocalAddr() net.Addr { + return &net.TCPAddr{IP: net.IP{127, 0, 0, 1}, Port: 4222, Zone: ""} +} + +func (c *FakeConn) RemoteAddr() net.Addr { + return &net.TCPAddr{IP: net.IP{127, 0, 0, 1}, Port: 4222, Zone: ""} +} + +func (c *FakeConn) SetDeadline(t time.Time) error { + return nil +} + +func (c 
*FakeConn) SetReadDeadline(t time.Time) error { + return nil +} + +func (c *FakeConn) SetWriteDeadline(t time.Time) error { + return nil +} + +type FakeListener struct { + ch chan *FakeConn + acceptErr error +} + +func NewFakeListener() *FakeListener { + return &FakeListener{ + ch: make(chan *FakeConn), + acceptErr: nil, + } +} + +func (ln *FakeListener) Accept() (c net.Conn, err error) { + return <-ln.ch, ln.acceptErr +} + +func (ln *FakeListener) Close() error { + ln.acceptErr = io.EOF + close(ln.ch) + return nil +} + +func (ln *FakeListener) Addr() net.Addr { + return &net.TCPAddr{IP: net.IP{127, 0, 0, 1}, Port: 4222, Zone: ""} +} + +func getTlsVersion(useTls13 bool) uint16 { + if useTls13 { + return tls.VersionTLS13 + } + + return tls.VersionTLS12 +} + +func corruptCert(crt []byte, i uint16) []byte { + crt[int(i)%len(crt)] ^= 0xFF + return crt +} + +func runServerWithListener(ln net.Listener, opts *Options) *Server { + if opts == nil { + opts = DefaultOptions() + } + s, err := NewServer(opts) + if err != nil || s == nil { + panic(fmt.Sprintf("No NATS Server object returned: %v", err)) + } + + if !opts.NoLog { + s.ConfigureLogger() + } + + s.listener = ln + s.listenerErr = nil + + // Run server in Go routine. + s.Start() + + // Wait for accept loop(s) to be started + if err := s.readyForConnections(10 * time.Second); err != nil { + panic(err) + } + return s +} + +type MathRandReader byte + +func (m MathRandReader) Read(buf []byte) (int, error) { + for i := range buf { + buf[i] = byte(m) + } + return len(buf), nil +} + +// FuzzServerTLS performs fuzz testing of the NATS server's TLS handshake implementation. 
+// It verifies the server's ability to handle various TLS connection scenarios, including: +// - Different TLS versions (1.2 and 1.3) +// - Malformed/mutated client certificates +// - Corrupted TLS handshake data +// - Edge cases in the TLS negotiation process +// +// Test Setup: +// - Configures a server with mutual TLS authentication using test certificates +// - Creates a client with configurable TLS parameters +// - Uses fake network connections to inject test cases +// +// Fuzzing Parameters: +// - useTls13: Boolean flag to test TLS 1.3 (true) or TLS 1.2 (false) +// - tlsVer: TLS version number to use in ClientHello +// - buf: Additional bytes to inject into ClientHello message +// - corruptCertOffset: Position to corrupt in client certificate (MaxUint16 = no corruption) +// +// Expectations +// The server should either: +// - Successfully complete the TLS handshake and protocol exchange, or +// - Cleanly reject invalid connections without crashing +func FuzzServerTLS(f *testing.F) { + srvTc := &TLSConfigOpts{ + CertFile: "../test/configs/certs/tlsauth/server.pem", + KeyFile: "../test/configs/certs/tlsauth/server-key.pem", + CaFile: "../test/configs/certs/tlsauth/ca.pem", + Insecure: false, + Verify: true, + } + + srvTlsCfg, err := GenTLSConfig(srvTc) + if err != nil { + f.Fatalf("Error generating server tls config: %v", err) + } + + opts := &Options{ + Host: "127.0.0.1", + Port: 4222, + NoLog: true, + NoSigs: true, + Debug: true, + Trace: true, + TLSHandshakeFirst: true, + AllowNonTLS: false, + JetStream: false, + TLSConfig: srvTlsCfg, + CheckConfig: false, + } + + clientCerts, err := tls.LoadX509KeyPair("../test/configs/certs/tlsauth/client.pem", "../test/configs/certs/tlsauth/client-key.pem") + if err != nil { + f.Fatalf("client1 certificate load error: %s", err) + } + + clientTlsCfg := &tls.Config{ + InsecureSkipVerify: true, + Rand: MathRandReader(0), + Time: func() time.Time { return time.Date(2025, 1, 1, 1, 1, 1, 1, nil) }, + } + + tlsVer := 
uint16(0x0303) + + corpuses := []struct { + useTls13 bool + clientHelloTlsVer uint16 + buf []byte + corruptCertOffset uint16 + }{ + {useTls13: false, clientHelloTlsVer: tlsVer, buf: []byte{}, corruptCertOffset: math.MaxUint16}, + {useTls13: true, clientHelloTlsVer: tlsVer, buf: []byte{}, corruptCertOffset: math.MaxUint16}, + } + + for _, crp := range corpuses { + f.Add(crp.useTls13, crp.clientHelloTlsVer, crp.buf, crp.corruptCertOffset) + } + + f.Fuzz(func(t *testing.T, useTls13 bool, tlsVer uint16, buf []byte, corruptCertOffset uint16) { + ln := NewFakeListener() + s := runServerWithListener(ln, opts.Clone()) + defer s.Shutdown() + + clientSocket := NewFakeSocket("CLIENT", 8) + serverSocket := NewClientHelloInjector(NewFakeSocket("SERVER", 8), tlsVer, buf) + + clientConn := NewFakeConn(clientSocket, serverSocket) + serverConn := NewFakeConn(serverSocket, clientSocket) + + // Connect to server + ln.ch <- serverConn + + tlsVersion := getTlsVersion(useTls13) + + tlsCfg := clientTlsCfg.Clone() + tlsCfg.GetClientCertificate = func(info *tls.CertificateRequestInfo) (*tls.Certificate, error) { + if corruptCertOffset == math.MaxUint16 { + t.Log("Leave certificate unchanged") + return &clientCerts, nil + } + + origCert := clientCerts.Certificate[0] + newCert := make([]byte, len(origCert)) + copy(newCert, origCert) + + newTlsCerts := clientCerts + newTlsCerts.Certificate[0] = corruptCert(newCert, corruptCertOffset) + + return &newTlsCerts, nil + } + tlsCfg.MaxVersion = tlsVersion + tlsCfg.MinVersion = tlsVersion + + tlsClientConn := tls.Client(clientConn, tlsCfg) + defer tlsClientConn.Close() + + if err := tlsClientConn.Handshake(); err != nil { + t.Logf("Handshake error: %v", err) + return + } + + br := bufio.NewReaderSize(tlsClientConn, 128) + if _, err := br.ReadString('\n'); err != nil { + t.Logf("Unexpected error reading INFO message: %v", err) + return + } + }) +} diff --git a/server/stream.go b/server/stream.go index cf70ad9142c..b1e4ea1998b 100644 --- 
a/server/stream.go +++ b/server/stream.go @@ -432,6 +432,12 @@ const ( JSMarkerReason = "Nats-Marker-Reason" ) +// Headers for published KV messages. +var ( + KVOperation = "KV-Operation" + KVOperationValuePurge = []byte("PURGE") +) + // Headers for republished messages and direct gets. const ( JSStream = "Nats-Stream" diff --git a/server/subject_fuzz_test.go b/server/subject_fuzz_test.go new file mode 100644 index 00000000000..bbd6628d9bb --- /dev/null +++ b/server/subject_fuzz_test.go @@ -0,0 +1,71 @@ +// Copyright 2016-2025 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import "testing" + +// FuzzSubjectsCollide performs fuzz testing on the NATS subject collision detection logic. +// It verifies the behavior of the SubjectsCollide function which determines if two NATS +// subjects/subscriptions could potentially overlap in the NATS pub-sub system. 
+func FuzzSubjectsCollide(f *testing.F) { + corpuses := []struct { + s1 string + s2 string + }{ + // SubjectsCollide true + {s1: "", s2: ""}, + {s1: "a", s2: "a"}, + {s1: "a.b.c", s2: "a.b.c"}, + {s1: "$JS.b.c", s2: "$JS.b.c"}, + {s1: "a.b.c", s2: "a.*.c"}, + {s1: "a.b.*", s2: "a.*.c"}, + {s1: "aaa.bbb.ccc", s2: "aaa.bbb.ccc"}, + {s1: "aaa.*.ccc", s2: "*.bbb.ccc"}, + {s1: "*", s2: "*"}, + {s1: "**", s2: "*"}, + {s1: "", s2: ">"}, + {s1: ">", s2: ">"}, + {s1: ">>", s2: ">"}, + {s1: "a", s2: ">"}, + {s1: "a.b.c", s2: ">"}, + {s1: "a.b.c.>", s2: "a.b.>"}, + {s1: "a.b.c.d.*", s2: "a.b.c.*.e"}, + {s1: "a.*.*.d.>", s2: "a.bbb.ccc.*.e"}, + + // SubjectsCollide false + {s1: "a", s2: ""}, + {s1: "a.b", s2: "b.a"}, + {s1: "a.bbbbb.*.d", s2: "a.b.>"}, + {s1: "a.b", s2: "a.b.c"}, + {s1: "a.b.c", s2: "a.b"}, + {s1: "a.b", s2: ""}, + {s1: "a.*.*.d.e.>", s2: "a.bbb.ccc.*.e"}, + } + + for _, crp := range corpuses { + f.Add(crp.s1, crp.s2) + } + + f.Fuzz(func(t *testing.T, s1, s2 string) { + if !IsValidSubject(s1) { + return + } + + if !IsValidSubject(s2) { + return + } + + SubjectsCollide(s1, s2) + }) +} diff --git a/server/sublist.go b/server/sublist.go index 54e93239645..078445c9888 100644 --- a/server/sublist.go +++ b/server/sublist.go @@ -1433,6 +1433,12 @@ func tokenizeSubjectIntoSlice(tts []string, subject string) []string { return tts } +// SubjectMatchesFilter returns true if the subject matches the provided +// filter or false otherwise. 
+func SubjectMatchesFilter(subject, filter string) bool { + return subjectIsSubsetMatch(subject, filter) +} + // Calls into the function isSubsetMatch() func subjectIsSubsetMatch(subject, test string) bool { tsa := [32]string{} diff --git a/server/thw/thw.go b/server/thw/thw.go index bc2b271e660..a74265ef013 100644 --- a/server/thw/thw.go +++ b/server/thw/thw.go @@ -152,51 +152,49 @@ func (hw *HashWheel) Update(seq uint64, oldExpires int64, newExpires int64) erro // ExpireTasks processes all expired tasks using a callback, but only expires a task if the callback returns true. func (hw *HashWheel) ExpireTasks(callback func(seq uint64, expires int64) bool) { now := time.Now().UnixNano() + hw.expireTasks(now, callback) +} +func (hw *HashWheel) expireTasks(ts int64, callback func(seq uint64, expires int64) bool) { // Quick return if nothing is expired. - if hw.lowest > now { + if hw.lowest > ts { return } - // Start from the slot containing the lowest expiration. - startPos, exitPos := hw.getPosition(hw.lowest), hw.getPosition(now+tickDuration) - var updateLowest bool - - for offset := int64(0); ; offset++ { - pos := (startPos + offset) & wheelMask - if pos == exitPos { - if updateLowest { - hw.updateLowestExpires() + globalLowest := int64(math.MaxInt64) + for pos, s := range hw.wheel { + // Skip s if nothing to expire. + if s == nil || s.lowest > ts { + if s != nil && s.lowest < globalLowest { + globalLowest = s.lowest } - return - } - // Grab our slot. 
- slot := hw.wheel[pos] - if slot == nil || slot.lowest > now { continue } // Track new lowest while processing expirations - newLowest := int64(math.MaxInt64) - for seq, expires := range slot.entries { - if expires <= now && callback(seq, expires) { - delete(slot.entries, seq) + slotLowest := int64(math.MaxInt64) + for seq, expires := range s.entries { + if expires <= ts && callback(seq, expires) { + delete(s.entries, seq) hw.count-- - updateLowest = true continue } - if expires < newLowest { - newLowest = expires + if expires < slotLowest { + slotLowest = expires } } // Nil out if we are empty. - if len(slot.entries) == 0 { + if len(s.entries) == 0 { hw.wheel[pos] = nil } else { - slot.lowest = newLowest + s.lowest = slotLowest + if slotLowest < globalLowest { + globalLowest = slotLowest + } } } + hw.lowest = globalLowest } // GetNextExpiration returns the earliest expiration time before the given time. diff --git a/server/thw/thw_test.go b/server/thw/thw_test.go index 06fe9924bae..5c0a2b18cc0 100644 --- a/server/thw/thw_test.go +++ b/server/thw/thw_test.go @@ -140,6 +140,34 @@ func TestHashWheelManualExpiration(t *testing.T) { require_Equal(t, hw.count, 0) } +func TestHashWheelExpirationLargerThanWheel(t *testing.T) { + hw := NewHashWheel() + + // Add sequences such that they can be expired immediately. + seqs := map[uint64]int64{ + 1: 0, + 2: int64(time.Second), + } + for seq, expires := range seqs { + require_NoError(t, hw.Add(seq, expires)) + } + require_Equal(t, hw.count, 2) + + // Pick a timestamp such that the expiration needs to wrap around the whole wheel. + now := int64(time.Second) * wheelMask + + // Process expired tasks. + expired := make(map[uint64]bool) + hw.expireTasks(now, func(seq uint64, expires int64) bool { + expired[seq] = true + return true + }) + + // Verify both sequences are expired. 
+ require_Equal(t, len(expired), 2) + require_Equal(t, hw.count, 0) +} + func TestHashWheelNextExpiration(t *testing.T) { hw := NewHashWheel() now := time.Now() diff --git a/test/gateway_test.go b/test/gateway_test.go index dc37d16d536..de9cd48c0fe 100644 --- a/test/gateway_test.go +++ b/test/gateway_test.go @@ -621,6 +621,7 @@ func TestGatewayTLSMixedIPAndDNS(t *testing.T) { ca_file: "./configs/certs/ca.pem" timeout: 2 } + connect_retries: 3 } cluster { listen: "127.0.0.1:-1" @@ -640,6 +641,7 @@ func TestGatewayTLSMixedIPAndDNS(t *testing.T) { ca_file: "./configs/certs/ca.pem" timeout: 2 } + connect_retries: 3 } cluster { listen: "127.0.0.1:-1"