Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ require (
github.com/antithesishq/antithesis-sdk-go v0.4.3-default-no-op
github.com/google/go-tpm v0.9.6
github.com/klauspost/compress v1.18.1
github.com/minio/highwayhash v1.0.3
github.com/nats-io/jwt/v2 v2.8.0
github.com/nats-io/nats.go v1.47.0
github.com/nats-io/nkeys v0.4.11
Expand All @@ -17,4 +16,9 @@ require (
golang.org/x/crypto v0.43.0
golang.org/x/sys v0.38.0
golang.org/x/time v0.14.0

// We don't usually pin non-tagged commits but so far no release has
// been made that includes https://github.com/minio/highwayhash/pull/29.
// This will be updated if a new tag covers this in the future.
github.com/minio/highwayhash v1.0.4-0.20251030100505-070ab1a87a76
)
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ github.com/google/go-tpm v0.9.6 h1:Ku42PT4LmjDu1H5C5ISWLlpI1mj+Zq7sPGKoRw2XROA=
github.com/google/go-tpm v0.9.6/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY=
github.com/klauspost/compress v1.18.1 h1:bcSGx7UbpBqMChDtsF28Lw6v/G94LPrrbMbdC3JH2co=
github.com/klauspost/compress v1.18.1/go.mod h1:ZQFFVG+MdnR0P+l6wpXgIL4NTtwiKIdBnrBd8Nrxr+0=
github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q=
github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ=
github.com/minio/highwayhash v1.0.4-0.20251030100505-070ab1a87a76 h1:KGuD/pM2JpL9FAYvBrnBBeENKZNh6eNtjqytV6TYjnk=
github.com/minio/highwayhash v1.0.4-0.20251030100505-070ab1a87a76/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ=
github.com/nats-io/jwt/v2 v2.8.0 h1:K7uzyz50+yGZDO5o772eRE7atlcSEENpL7P+b74JV1g=
github.com/nats-io/jwt/v2 v2.8.0/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA=
github.com/nats-io/nats.go v1.47.0 h1:YQdADw6J/UfGUd2Oy6tn4Hq6YHxCaJrVKayxxFqYrgM=
Expand Down
42 changes: 23 additions & 19 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"encoding/json"
"errors"
"fmt"
"hash"
"io"
"io/fs"
"math"
Expand Down Expand Up @@ -194,7 +193,7 @@ type fileStore struct {
psim *stree.SubjectTree[psi]
tsl int
adml int
hh hash.Hash64
hh *highwayhash.Digest64
qch chan struct{}
fsld chan struct{}
cmu sync.RWMutex
Expand Down Expand Up @@ -239,7 +238,7 @@ type msgBlock struct {
lrts int64
lsts int64
llseq uint64
hh hash.Hash64
hh *highwayhash.Digest64
ecache elastic.Pointer[cache]
cache *cache
cloads uint64
Expand Down Expand Up @@ -468,7 +467,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim

// Create highway hash for message blocks. Use sha256 of directory as key.
key := sha256.Sum256([]byte(cfg.Name))
fs.hh, err = highwayhash.New64(key[:])
fs.hh, err = highwayhash.NewDigest64(key[:])
if err != nil {
return nil, fmt.Errorf("could not create hash: %v", err)
}
Expand Down Expand Up @@ -939,7 +938,8 @@ func (fs *fileStore) writeStreamMeta() error {
}
fs.hh.Reset()
fs.hh.Write(b)
checksum := hex.EncodeToString(fs.hh.Sum(nil))
var hb [highwayhash.Size64]byte
checksum := hex.EncodeToString(fs.hh.Sum(hb[:0]))
sum := filepath.Join(fs.fcfg.StoreDir, JetStreamMetaFileSum)
err = fs.writeFileWithOptionalSync(sum, []byte(checksum), defaultFilePerms)
if err != nil {
Expand Down Expand Up @@ -1040,7 +1040,7 @@ func (fs *fileStore) initMsgBlock(index uint32) *msgBlock {

if mb.hh == nil {
key := sha256.Sum256(fs.hashKeyForBlock(index))
mb.hh, _ = highwayhash.New64(key[:])
mb.hh, _ = highwayhash.NewDigest64(key[:])
}
return mb
}
Expand Down Expand Up @@ -4251,7 +4251,7 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) {

// Now do local hash.
key := sha256.Sum256(fs.hashKeyForBlock(index))
hh, err := highwayhash.New64(key[:])
hh, err := highwayhash.NewDigest64(key[:])
if err != nil {
return nil, fmt.Errorf("could not create hash: %v", err)
}
Expand Down Expand Up @@ -7665,7 +7665,7 @@ func (mb *msgBlock) cacheLookupEx(seq uint64, sm *StoreMsg, doCopy bool) (*Store
buf := mb.cache.buf[li:]

// We use the high bit to denote we have already checked the checksum.
var hh hash.Hash64
var hh *highwayhash.Digest64
if !hashChecked {
hh = mb.hh // This will force the hash check in msgFromBuf.
}
Expand Down Expand Up @@ -7764,22 +7764,22 @@ func (fs *fileStore) msgForSeqLocked(seq uint64, sm *StoreMsg, needFSLock bool)
// Internal function to return msg parts from a raw buffer.
// Raw buffer will be copied into sm.
// Lock should be held.
func (mb *msgBlock) msgFromBuf(buf []byte, sm *StoreMsg, hh hash.Hash64) (*StoreMsg, error) {
func (mb *msgBlock) msgFromBuf(buf []byte, sm *StoreMsg, hh *highwayhash.Digest64) (*StoreMsg, error) {
return mb.msgFromBufEx(buf, sm, hh, true)
}

// Internal function to return msg parts from a raw buffer.
// Raw buffer will NOT be copied into sm.
// Only use for internal use, any message that is passed to upper layers should use mb.msgFromBuf.
// Lock should be held.
func (mb *msgBlock) msgFromBufNoCopy(buf []byte, sm *StoreMsg, hh hash.Hash64) (*StoreMsg, error) {
func (mb *msgBlock) msgFromBufNoCopy(buf []byte, sm *StoreMsg, hh *highwayhash.Digest64) (*StoreMsg, error) {
return mb.msgFromBufEx(buf, sm, hh, false)
}

// Internal function to return msg parts from a raw buffer.
// copy boolean will determine if we make a copy or not.
// Lock should be held.
func (mb *msgBlock) msgFromBufEx(buf []byte, sm *StoreMsg, hh hash.Hash64, doCopy bool) (*StoreMsg, error) {
func (mb *msgBlock) msgFromBufEx(buf []byte, sm *StoreMsg, hh *highwayhash.Digest64, doCopy bool) (*StoreMsg, error) {
if len(buf) < emptyRecordLen {
return nil, errBadMsg{mb.mfn, "record too short"}
}
Expand Down Expand Up @@ -10325,7 +10325,8 @@ func (fs *fileStore) streamSnapshot(w io.WriteCloser, includeConsumers bool, err
hh := fs.hh
hh.Reset()
hh.Write(meta)
sum := []byte(hex.EncodeToString(fs.hh.Sum(nil)))
var hb [highwayhash.Size64]byte
sum := []byte(hex.EncodeToString(fs.hh.Sum(hb[:0])))
fs.mu.Unlock()

// Meta first.
Expand Down Expand Up @@ -10428,7 +10429,8 @@ func (fs *fileStore) streamSnapshot(w io.WriteCloser, includeConsumers bool, err
}
o.hh.Reset()
o.hh.Write(meta)
sum := []byte(hex.EncodeToString(o.hh.Sum(nil)))
var hb [highwayhash.Size64]byte
sum := []byte(hex.EncodeToString(o.hh.Sum(hb[:0])))

// We can have the running state directly encoded now.
state, err := o.encodeState()
Expand Down Expand Up @@ -10645,7 +10647,7 @@ type consumerFileStore struct {
name string
odir string
ifn string
hh hash.Hash64
hh *highwayhash.Digest64
state ConsumerState
fch chan struct{}
qch chan struct{}
Expand Down Expand Up @@ -10688,7 +10690,7 @@ func (fs *fileStore) ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerSt
ifn: filepath.Join(odir, consumerState),
}
key := sha256.Sum256([]byte(fs.cfg.Name + "/" + name))
hh, err := highwayhash.New64(key[:])
hh, err := highwayhash.NewDigest64(key[:])
if err != nil {
return nil, fmt.Errorf("could not create hash: %v", err)
}
Expand Down Expand Up @@ -11323,7 +11325,8 @@ func (cfs *consumerFileStore) writeConsumerMeta() error {
}
cfs.hh.Reset()
cfs.hh.Write(b)
checksum := hex.EncodeToString(cfs.hh.Sum(nil))
var hb [highwayhash.Size64]byte
checksum := hex.EncodeToString(cfs.hh.Sum(hb[:0]))
sum := filepath.Join(cfs.odir, JetStreamMetaFileSum)

err = cfs.fs.writeFileWithOptionalSync(sum, []byte(checksum), defaultFilePerms)
Expand Down Expand Up @@ -11707,14 +11710,14 @@ func (fs *fileStore) RemoveConsumer(o ConsumerStore) error {
// Deprecated: stream templates are deprecated and will be removed in a future version.
type templateFileStore struct {
dir string
hh hash.Hash64
hh *highwayhash.Digest64
}

// Deprecated: stream templates are deprecated and will be removed in a future version.
func newTemplateFileStore(storeDir string) *templateFileStore {
tdir := filepath.Join(storeDir, tmplsDir)
key := sha256.Sum256([]byte("templates"))
hh, err := highwayhash.New64(key[:])
hh, err := highwayhash.NewDigest64(key[:])
if err != nil {
return nil
}
Expand Down Expand Up @@ -11743,7 +11746,8 @@ func (ts *templateFileStore) Store(t *streamTemplate) error {
// FIXME(dlc) - Do checksum
ts.hh.Reset()
ts.hh.Write(b)
checksum := hex.EncodeToString(ts.hh.Sum(nil))
var hb [highwayhash.Size64]byte
checksum := hex.EncodeToString(ts.hh.Sum(hb[:0]))
sum := filepath.Join(dir, JetStreamMetaFileSum)
if err := os.WriteFile(sum, []byte(checksum), defaultFilePerms); err != nil {
return err
Expand Down
10 changes: 6 additions & 4 deletions server/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -1218,7 +1218,7 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits, tq c
tdir := filepath.Join(jsa.storeDir, tmplsDir)
if stat, err := os.Stat(tdir); err == nil && stat.IsDir() {
key := sha256.Sum256([]byte("templates"))
hh, err := highwayhash.New64(key[:])
hh, err := highwayhash.NewDigest64(key[:])
if err != nil {
return err
}
Expand All @@ -1242,7 +1242,8 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits, tq c
}
hh.Reset()
hh.Write(buf)
checksum := hex.EncodeToString(hh.Sum(nil))
var hb [highwayhash.Size64]byte
checksum := hex.EncodeToString(hh.Sum(hb[:0]))
if checksum != string(sum) {
s.Warnf(" StreamTemplate checksums do not match %q vs %q", sum, checksum)
continue
Expand Down Expand Up @@ -1395,7 +1396,7 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits, tq c
return nil
}
key := sha256.Sum256([]byte(fi.Name()))
hh, err := highwayhash.New64(key[:])
hh, err := highwayhash.NewDigest64(key[:])
if err != nil {
return err
}
Expand All @@ -1420,7 +1421,8 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits, tq c
return nil
}
hh.Write(buf)
checksum := hex.EncodeToString(hh.Sum(nil))
var hb [highwayhash.Size64]byte
checksum := hex.EncodeToString(hh.Sum(hb[:0]))
if checksum != string(sum) {
s.Warnf(" Stream metafile %q: checksums do not match %q vs %q", metafile, sum, checksum)
return nil
Expand Down
11 changes: 6 additions & 5 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"encoding/binary"
"errors"
"fmt"
"hash"
"math"
"math/rand"
"net"
Expand Down Expand Up @@ -153,7 +152,7 @@ type raft struct {
state atomic.Int32 // RaftState
leaderState atomic.Bool // Is in (complete) leader state.
leaderSince atomic.Pointer[time.Time] // How long since becoming leader.
hh hash.Hash64 // Highwayhash, used for snapshots
hh *highwayhash.Digest64 // Highwayhash, used for snapshots
snapfile string // Snapshot filename

csz int // Cluster size
Expand Down Expand Up @@ -447,7 +446,7 @@ func (s *Server) initRaftNode(accName string, cfg *RaftConfig, labels pprofLabel

// Set up the highwayhash for the snapshots.
key := sha256.Sum256([]byte(n.group))
n.hh, _ = highwayhash.New64(key[:])
n.hh, _ = highwayhash.NewDigest64(key[:])

// If we have a term and vote file (tav.idx on the filesystem) then read in
// what we think the term and vote was. It's possible these are out of date
Expand Down Expand Up @@ -1225,7 +1224,8 @@ func (n *raft) encodeSnapshot(snap *snapshot) []byte {
// Now do the hash for the end.
n.hh.Reset()
n.hh.Write(buf[:wi])
checksum := n.hh.Sum(nil)
var hb [highwayhash.Size64]byte
checksum := n.hh.Sum(hb[:0])
copy(buf[wi:], checksum)
wi += len(checksum)
return buf[:wi]
Expand Down Expand Up @@ -1450,7 +1450,8 @@ func (n *raft) loadLastSnapshot() (*snapshot, error) {
lchk := buf[hoff:]
n.hh.Reset()
n.hh.Write(buf[:hoff])
if !bytes.Equal(lchk[:], n.hh.Sum(nil)) {
var hb [highwayhash.Size64]byte
if !bytes.Equal(lchk[:], n.hh.Sum(hb[:0])) {
n.warn("Snapshot corrupt, checksums did not match")
os.Remove(n.snapfile)
n.snapfile = _EMPTY_
Expand Down