Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/nats-io/nats-server/v2
go 1.21.0

require (
github.com/goccy/go-json v0.10.3
github.com/klauspost/compress v1.17.11
github.com/minio/highwayhash v1.0.3
github.com/nats-io/jwt/v2 v2.5.8
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA=
github.com/goccy/go-json v0.10.3/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M=
github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc=
github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q=
Expand Down
4 changes: 2 additions & 2 deletions server/accounts_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2018-2023 The NATS Authors
// Copyright 2018-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand All @@ -15,7 +15,6 @@ package server

import (
"encoding/base64"
"encoding/json"
"fmt"
"net/http"
"strconv"
Expand All @@ -25,6 +24,7 @@ import (
"testing"
"time"

"github.com/goccy/go-json"
"github.com/nats-io/jwt/v2"
"github.com/nats-io/nats.go"
"github.com/nats-io/nkeys"
Expand Down
4 changes: 2 additions & 2 deletions server/auth_callout_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022-2023 The NATS Authors
// Copyright 2022-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand All @@ -16,7 +16,6 @@ package server
import (
"bytes"
"crypto/x509"
"encoding/json"
"encoding/pem"
"errors"
"fmt"
Expand All @@ -29,6 +28,7 @@ import (
"testing"
"time"

"github.com/goccy/go-json"
"github.com/nats-io/jwt/v2"
"github.com/nats-io/nats.go"
"github.com/nats-io/nkeys"
Expand Down
2 changes: 1 addition & 1 deletion server/auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package server

import (
"context"
"encoding/json"
"fmt"
"net"
"net/url"
Expand All @@ -25,6 +24,7 @@ import (
"testing"
"time"

"github.com/goccy/go-json"
"github.com/nats-io/jwt/v2"
"github.com/nats-io/nats.go"
)
Expand Down
2 changes: 1 addition & 1 deletion server/certidp/certidp.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ import (
"crypto/sha256"
"crypto/x509"
"encoding/base64"
"encoding/json"
"fmt"
"net/url"
"strings"
"time"

"github.com/goccy/go-json"
"golang.org/x/crypto/ocsp"
)

Expand Down
22 changes: 18 additions & 4 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"bytes"
"crypto/tls"
"crypto/x509"
"encoding/json"
"errors"
"fmt"
"io"
Expand All @@ -33,6 +32,7 @@ import (
"sync/atomic"
"time"

"github.com/goccy/go-json"
"github.com/klauspost/compress/s2"
"github.com/nats-io/jwt/v2"
"github.com/nats-io/nats-server/v2/internal/fastrand"
Expand Down Expand Up @@ -4160,8 +4160,9 @@ func getHeader(key string, hdr []byte) []byte {

// For bytes.HasPrefix below.
var (
jsRequestNextPreB = []byte(jsRequestNextPre)
jsDirectGetPreB = []byte(jsDirectGetPre)
jsRequestNextPreB = []byte(jsRequestNextPre)
jsDirectGetPreB = []byte(jsDirectGetPre)
jsConsumerInfoPreB = []byte(JSApiConsumerInfoPre)
)

// processServiceImport is an internal callback when a subscription matches an imported service
Expand All @@ -4181,12 +4182,16 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
}
}

var checkJS, checkConsumerInfo bool

acc.mu.RLock()
var checkJS bool
shouldReturn := si.invalid || acc.sl == nil
if !shouldReturn && !isResponse && si.to == jsAllAPI {
if bytes.HasPrefix(c.pa.subject, jsDirectGetPreB) || bytes.HasPrefix(c.pa.subject, jsRequestNextPreB) {
checkJS = true
} else if len(c.pa.psi) == 0 && bytes.HasPrefix(c.pa.subject, jsConsumerInfoPreB) {
// Only check if we are clustered and expecting a reply.
checkConsumerInfo = len(c.pa.reply) > 0 && c.srv.JetStreamIsClustered()
}
}
siAcc := si.acc
Expand All @@ -4200,6 +4205,15 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
return
}

// Here we will do a fast check for consumer info only to check if it does not exists. This will spread the
// load to all servers with connected clients since service imports are processed at point of entry.
// Only call for clustered setups.
if checkConsumerInfo && si.se != nil && si.se.acc == c.srv.SystemAccount() {
if c.srv.jsConsumerProcessMissing(c, acc) {
return
}
}

var nrr []byte
var rsi *serviceImport

Expand Down
7 changes: 3 additions & 4 deletions server/client_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2012-2022 The NATS Authors
// Copyright 2012-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand All @@ -16,7 +16,7 @@ package server
import (
"bufio"
"bytes"
"encoding/json"
"crypto/tls"
"fmt"
"io"
"math"
Expand All @@ -31,8 +31,7 @@ import (
"testing"
"time"

"crypto/tls"

"github.com/goccy/go-json"
"github.com/nats-io/jwt/v2"
"github.com/nats-io/nats.go"
"github.com/nats-io/nkeys"
Expand Down
2 changes: 1 addition & 1 deletion server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package server
import (
"bytes"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"math/rand"
Expand All @@ -28,6 +27,7 @@ import (
"sync/atomic"
"time"

"github.com/goccy/go-json"
"github.com/nats-io/nats-server/v2/server/avl"
"github.com/nats-io/nuid"
"golang.org/x/time/rate"
Expand Down
3 changes: 1 addition & 2 deletions server/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"compress/gzip"
"crypto/sha256"
"crypto/x509"
"encoding/json"
"errors"
"fmt"
"math/rand"
Expand All @@ -30,8 +29,8 @@ import (
"sync/atomic"
"time"

"github.com/goccy/go-json"
"github.com/klauspost/compress/s2"

"github.com/nats-io/jwt/v2"
"github.com/nats-io/nats-server/v2/server/certidp"
"github.com/nats-io/nats-server/v2/server/pse"
Expand Down
2 changes: 1 addition & 1 deletion server/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package server
import (
"bytes"
"crypto/sha256"
"encoding/json"
"errors"
"fmt"
"math/rand"
Expand All @@ -30,6 +29,7 @@ import (
"testing"
"time"

"github.com/goccy/go-json"
"github.com/nats-io/jwt/v2"
"github.com/nats-io/nats.go"
"github.com/nats-io/nkeys"
Expand Down
2 changes: 1 addition & 1 deletion server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"crypto/sha256"
"encoding/binary"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"hash"
Expand All @@ -40,6 +39,7 @@ import (
"sync/atomic"
"time"

"github.com/goccy/go-json"
"github.com/klauspost/compress/s2"
"github.com/minio/highwayhash"
"github.com/nats-io/nats-server/v2/server/avl"
Expand Down
4 changes: 1 addition & 3 deletions server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"crypto/sha256"
"encoding/binary"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
Expand All @@ -39,6 +38,7 @@ import (
"testing"
"time"

"github.com/goccy/go-json"
"github.com/klauspost/compress/s2"
"github.com/nats-io/nuid"
)
Expand Down Expand Up @@ -1483,8 +1483,6 @@ func TestFileStoreMeta(t *testing.T) {
if err := json.Unmarshal(buf, &oconfig2); err != nil {
t.Fatalf("Error unmarshalling: %v", err)
}
// Since we set name we will get that back now.
oconfig.Name = oname
if !reflect.DeepEqual(oconfig2, oconfig) {
t.Fatalf("Consumer configs not equal, got %+v vs %+v", oconfig2, oconfig)
}
Expand Down
3 changes: 2 additions & 1 deletion server/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"cmp"
"crypto/sha256"
"crypto/tls"
"encoding/json"
"fmt"
"math/rand"
"net"
Expand All @@ -28,6 +27,8 @@ import (
"sync"
"sync/atomic"
"time"

"github.com/goccy/go-json"
)

const (
Expand Down
5 changes: 3 additions & 2 deletions server/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"net"
"net/url"
Expand All @@ -31,10 +30,12 @@ import (
"testing"
"time"

. "github.com/nats-io/nats-server/v2/internal/ocsp"
"github.com/goccy/go-json"
"github.com/nats-io/nats-server/v2/logger"
"github.com/nats-io/nats.go"
"golang.org/x/crypto/ocsp"

. "github.com/nats-io/nats-server/v2/internal/ocsp"
)

func init() {
Expand Down
6 changes: 5 additions & 1 deletion server/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"crypto/sha256"
"encoding/binary"
"encoding/hex"
"encoding/json"
"fmt"
"math"
"os"
Expand All @@ -30,6 +29,7 @@ import (
"sync/atomic"
"time"

"github.com/goccy/go-json"
"github.com/minio/highwayhash"
"github.com/nats-io/nats-server/v2/server/sysmem"
"github.com/nats-io/nkeys"
Expand Down Expand Up @@ -461,6 +461,8 @@ func (s *Server) enableJetStream(cfg JetStreamConfig) error {
if err := s.enableJetStreamClustering(); err != nil {
return err
}
// Set our atomic bool to clustered.
s.jsClustered.Store(true)
}

// Mark when we are up and running.
Expand Down Expand Up @@ -965,6 +967,8 @@ func (s *Server) shutdownJetStream() {
cc.c = nil
}
cc.meta = nil
// Set our atomic bool to false.
s.jsClustered.Store(false)
}
js.mu.Unlock()

Expand Down
Loading