diff --git a/go.mod b/go.mod index 7cf5ec40470..08a567e8c08 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,6 @@ require ( github.com/antithesishq/antithesis-sdk-go v0.4.3-default-no-op github.com/google/go-tpm v0.9.6 github.com/klauspost/compress v1.18.1 - github.com/minio/highwayhash v1.0.3 github.com/nats-io/jwt/v2 v2.8.0 github.com/nats-io/nats.go v1.47.0 github.com/nats-io/nkeys v0.4.11 @@ -17,4 +16,9 @@ require ( golang.org/x/crypto v0.43.0 golang.org/x/sys v0.38.0 golang.org/x/time v0.14.0 + + // We don't usually pin non-tagged commits but so far no release has + // been made that includes https://github.com/minio/highwayhash/pull/29. + // This will be updated if a new tag covers this in the future. + github.com/minio/highwayhash v1.0.4-0.20251030100505-070ab1a87a76 ) diff --git a/go.sum b/go.sum index 0febcfe4582..66d5d817667 100644 --- a/go.sum +++ b/go.sum @@ -6,8 +6,8 @@ github.com/google/go-tpm v0.9.6 h1:Ku42PT4LmjDu1H5C5ISWLlpI1mj+Zq7sPGKoRw2XROA= github.com/google/go-tpm v0.9.6/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY= github.com/klauspost/compress v1.18.1 h1:bcSGx7UbpBqMChDtsF28Lw6v/G94LPrrbMbdC3JH2co= github.com/klauspost/compress v1.18.1/go.mod h1:ZQFFVG+MdnR0P+l6wpXgIL4NTtwiKIdBnrBd8Nrxr+0= -github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q= -github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ= +github.com/minio/highwayhash v1.0.4-0.20251030100505-070ab1a87a76 h1:KGuD/pM2JpL9FAYvBrnBBeENKZNh6eNtjqytV6TYjnk= +github.com/minio/highwayhash v1.0.4-0.20251030100505-070ab1a87a76/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ= github.com/nats-io/jwt/v2 v2.8.0 h1:K7uzyz50+yGZDO5o772eRE7atlcSEENpL7P+b74JV1g= github.com/nats-io/jwt/v2 v2.8.0/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA= github.com/nats-io/nats.go v1.47.0 h1:YQdADw6J/UfGUd2Oy6tn4Hq6YHxCaJrVKayxxFqYrgM= diff --git a/server/filestore.go b/server/filestore.go index f2edebbdb72..c6e594e557f 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -25,7 +25,6 @@ import ( "encoding/json" "errors" "fmt" - "hash" "io" "io/fs" "math" @@ -194,7 +193,7 @@ type fileStore struct { psim *stree.SubjectTree[psi] tsl int adml int - hh hash.Hash64 + hh *highwayhash.Digest64 qch chan struct{} fsld chan struct{} cmu sync.RWMutex @@ -239,7 +238,7 @@ type msgBlock struct { lrts int64 lsts int64 llseq uint64 - hh hash.Hash64 + hh *highwayhash.Digest64 ecache elastic.Pointer[cache] cache *cache cloads uint64 @@ -468,7 +467,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim // Create highway hash for message blocks. Use sha256 of directory as key. key := sha256.Sum256([]byte(cfg.Name)) - fs.hh, err = highwayhash.New64(key[:]) + fs.hh, err = highwayhash.NewDigest64(key[:]) if err != nil { return nil, fmt.Errorf("could not create hash: %v", err) } @@ -939,7 +938,8 @@ func (fs *fileStore) writeStreamMeta() error { } fs.hh.Reset() fs.hh.Write(b) - checksum := hex.EncodeToString(fs.hh.Sum(nil)) + var hb [highwayhash.Size64]byte + checksum := hex.EncodeToString(fs.hh.Sum(hb[:0])) sum := filepath.Join(fs.fcfg.StoreDir, JetStreamMetaFileSum) err = fs.writeFileWithOptionalSync(sum, []byte(checksum), defaultFilePerms) if err != nil { @@ -1040,7 +1040,7 @@ func (fs *fileStore) initMsgBlock(index uint32) *msgBlock { if mb.hh == nil { key := sha256.Sum256(fs.hashKeyForBlock(index)) - mb.hh, _ = highwayhash.New64(key[:]) + mb.hh, _ = highwayhash.NewDigest64(key[:]) } return mb } @@ -4251,7 +4251,7 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) { // Now do local hash. key := sha256.Sum256(fs.hashKeyForBlock(index)) - hh, err := highwayhash.New64(key[:]) + hh, err := highwayhash.NewDigest64(key[:]) if err != nil { return nil, fmt.Errorf("could not create hash: %v", err) } @@ -7665,7 +7665,7 @@ func (mb *msgBlock) cacheLookupEx(seq uint64, sm *StoreMsg, doCopy bool) (*Store buf := mb.cache.buf[li:] // We use the high bit to denote we have already checked the checksum. - var hh hash.Hash64 + var hh *highwayhash.Digest64 if !hashChecked { hh = mb.hh // This will force the hash check in msgFromBuf. } @@ -7764,7 +7764,7 @@ func (fs *fileStore) msgForSeqLocked(seq uint64, sm *StoreMsg, needFSLock bool) // Internal function to return msg parts from a raw buffer. // Raw buffer will be copied into sm. // Lock should be held. -func (mb *msgBlock) msgFromBuf(buf []byte, sm *StoreMsg, hh hash.Hash64) (*StoreMsg, error) { +func (mb *msgBlock) msgFromBuf(buf []byte, sm *StoreMsg, hh *highwayhash.Digest64) (*StoreMsg, error) { return mb.msgFromBufEx(buf, sm, hh, true) } @@ -7772,14 +7772,14 @@ func (mb *msgBlock) msgFromBuf(buf []byte, sm *StoreMsg, hh hash.Hash64) (*Store // Raw buffer will NOT be copied into sm. // Only use for internal use, any message that is passed to upper layers should use mb.msgFromBuf. // Lock should be held. -func (mb *msgBlock) msgFromBufNoCopy(buf []byte, sm *StoreMsg, hh hash.Hash64) (*StoreMsg, error) { +func (mb *msgBlock) msgFromBufNoCopy(buf []byte, sm *StoreMsg, hh *highwayhash.Digest64) (*StoreMsg, error) { return mb.msgFromBufEx(buf, sm, hh, false) } // Internal function to return msg parts from a raw buffer. // copy boolean will determine if we make a copy or not. // Lock should be held. -func (mb *msgBlock) msgFromBufEx(buf []byte, sm *StoreMsg, hh hash.Hash64, doCopy bool) (*StoreMsg, error) { +func (mb *msgBlock) msgFromBufEx(buf []byte, sm *StoreMsg, hh *highwayhash.Digest64, doCopy bool) (*StoreMsg, error) { if len(buf) < emptyRecordLen { return nil, errBadMsg{mb.mfn, "record too short"} } @@ -10325,7 +10325,8 @@ func (fs *fileStore) streamSnapshot(w io.WriteCloser, includeConsumers bool, err hh := fs.hh hh.Reset() hh.Write(meta) - sum := []byte(hex.EncodeToString(fs.hh.Sum(nil))) + var hb [highwayhash.Size64]byte + sum := []byte(hex.EncodeToString(fs.hh.Sum(hb[:0]))) fs.mu.Unlock() // Meta first. @@ -10428,7 +10429,8 @@ func (fs *fileStore) streamSnapshot(w io.WriteCloser, includeConsumers bool, err } o.hh.Reset() o.hh.Write(meta) - sum := []byte(hex.EncodeToString(o.hh.Sum(nil))) + var hb [highwayhash.Size64]byte + sum := []byte(hex.EncodeToString(o.hh.Sum(hb[:0]))) // We can have the running state directly encoded now. state, err := o.encodeState() @@ -10645,7 +10647,7 @@ type consumerFileStore struct { name string odir string ifn string - hh hash.Hash64 + hh *highwayhash.Digest64 state ConsumerState fch chan struct{} qch chan struct{} @@ -10688,7 +10690,7 @@ func (fs *fileStore) ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerSt ifn: filepath.Join(odir, consumerState), } key := sha256.Sum256([]byte(fs.cfg.Name + "/" + name)) - hh, err := highwayhash.New64(key[:]) + hh, err := highwayhash.NewDigest64(key[:]) if err != nil { return nil, fmt.Errorf("could not create hash: %v", err) } @@ -11323,7 +11325,8 @@ func (cfs *consumerFileStore) writeConsumerMeta() error { } cfs.hh.Reset() cfs.hh.Write(b) - checksum := hex.EncodeToString(cfs.hh.Sum(nil)) + var hb [highwayhash.Size64]byte + checksum := hex.EncodeToString(cfs.hh.Sum(hb[:0])) sum := filepath.Join(cfs.odir, JetStreamMetaFileSum) err = cfs.fs.writeFileWithOptionalSync(sum, []byte(checksum), defaultFilePerms) @@ -11707,14 +11710,14 @@ func (fs *fileStore) RemoveConsumer(o ConsumerStore) error { // Deprecated: stream templates are deprecated and will be removed in a future version. type templateFileStore struct { dir string - hh hash.Hash64 + hh *highwayhash.Digest64 } // Deprecated: stream templates are deprecated and will be removed in a future version. func newTemplateFileStore(storeDir string) *templateFileStore { tdir := filepath.Join(storeDir, tmplsDir) key := sha256.Sum256([]byte("templates")) - hh, err := highwayhash.New64(key[:]) + hh, err := highwayhash.NewDigest64(key[:]) if err != nil { return nil } @@ -11743,7 +11746,8 @@ func (ts *templateFileStore) Store(t *streamTemplate) error { // FIXME(dlc) - Do checksum ts.hh.Reset() ts.hh.Write(b) - checksum := hex.EncodeToString(ts.hh.Sum(nil)) + var hb [highwayhash.Size64]byte + checksum := hex.EncodeToString(ts.hh.Sum(hb[:0])) sum := filepath.Join(dir, JetStreamMetaFileSum) if err := os.WriteFile(sum, []byte(checksum), defaultFilePerms); err != nil { return err diff --git a/server/jetstream.go b/server/jetstream.go index e9f040939be..050a2a61449 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -1218,7 +1218,7 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits, tq c tdir := filepath.Join(jsa.storeDir, tmplsDir) if stat, err := os.Stat(tdir); err == nil && stat.IsDir() { key := sha256.Sum256([]byte("templates")) - hh, err := highwayhash.New64(key[:]) + hh, err := highwayhash.NewDigest64(key[:]) if err != nil { return err } @@ -1242,7 +1242,8 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits, tq c } hh.Reset() hh.Write(buf) - checksum := hex.EncodeToString(hh.Sum(nil)) + var hb [highwayhash.Size64]byte + checksum := hex.EncodeToString(hh.Sum(hb[:0])) if checksum != string(sum) { s.Warnf(" StreamTemplate checksums do not match %q vs %q", sum, checksum) continue @@ -1395,7 +1396,7 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits, tq c return nil } key := sha256.Sum256([]byte(fi.Name())) - hh, err := highwayhash.New64(key[:]) + hh, err := highwayhash.NewDigest64(key[:]) if err != nil { return err } @@ -1420,7 +1421,8 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits, tq c return nil } hh.Write(buf) - checksum := hex.EncodeToString(hh.Sum(nil)) + var hb [highwayhash.Size64]byte + checksum := hex.EncodeToString(hh.Sum(hb[:0])) if checksum != string(sum) { s.Warnf(" Stream metafile %q: checksums do not match %q vs %q", metafile, sum, checksum) return nil diff --git a/server/raft.go b/server/raft.go index db061f7e94e..2fba5ab3438 100644 --- a/server/raft.go +++ b/server/raft.go @@ -19,7 +19,6 @@ import ( "encoding/binary" "errors" "fmt" - "hash" "math" "math/rand" "net" @@ -153,7 +152,7 @@ type raft struct { state atomic.Int32 // RaftState leaderState atomic.Bool // Is in (complete) leader state. leaderSince atomic.Pointer[time.Time] // How long since becoming leader. - hh hash.Hash64 // Highwayhash, used for snapshots + hh *highwayhash.Digest64 // Highwayhash, used for snapshots snapfile string // Snapshot filename csz int // Cluster size @@ -447,7 +446,7 @@ func (s *Server) initRaftNode(accName string, cfg *RaftConfig, labels pprofLabel // Set up the highwayhash for the snapshots. key := sha256.Sum256([]byte(n.group)) - n.hh, _ = highwayhash.New64(key[:]) + n.hh, _ = highwayhash.NewDigest64(key[:]) // If we have a term and vote file (tav.idx on the filesystem) then read in // what we think the term and vote was. It's possible these are out of date @@ -1225,7 +1224,8 @@ func (n *raft) encodeSnapshot(snap *snapshot) []byte { // Now do the hash for the end. n.hh.Reset() n.hh.Write(buf[:wi]) - checksum := n.hh.Sum(nil) + var hb [highwayhash.Size64]byte + checksum := n.hh.Sum(hb[:0]) copy(buf[wi:], checksum) wi += len(checksum) return buf[:wi] @@ -1450,7 +1450,8 @@ func (n *raft) loadLastSnapshot() (*snapshot, error) { lchk := buf[hoff:] n.hh.Reset() n.hh.Write(buf[:hoff]) - if !bytes.Equal(lchk[:], n.hh.Sum(nil)) { + var hb [highwayhash.Size64]byte + if !bytes.Equal(lchk[:], n.hh.Sum(hb[:0])) { n.warn("Snapshot corrupt, checksums did not match") os.Remove(n.snapfile) n.snapfile = _EMPTY_