diff --git a/network/vpack/dynamic_vpack.go b/network/vpack/dynamic_vpack.go new file mode 100644 index 0000000000..cf4552ee30 --- /dev/null +++ b/network/vpack/dynamic_vpack.go @@ -0,0 +1,617 @@ +// Copyright (C) 2019-2025 Algorand, Inc. +// This file is part of go-algorand +// +// go-algorand is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// go-algorand is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with go-algorand. If not, see . + +package vpack + +import ( + "encoding/binary" + "errors" + "fmt" + "math" + + "github.com/algorand/msgp/msgp" +) + +// The second byte in the header is used by StatefulEncoder and +// StatefulDecoder to signal which values have been replaced +// by references. +// For r.prop, 3 bits are used to encode the reference directly. +// For r.rnd, a 2-bit delta encoding is used. 
+// +// 7 6 5 4 3 2 1 0 +// | | | \___/ \_/-- rnd encoding (00=literal, 01=+1, 10=-1, 11=same as last rnd) +// | | | `-------- prop window reference (000=literal, 001...111=window index) +// | | +------------ snd table reference (0=literal, 1=table) +// | +-------------- (sig.p,sig.p1s) table reference appears (0=literal, 1=table) +// +---------------- (sig.p2, sig.p2s) table reference (0=literal, 1=table) +const ( + // bits 0-1: rnd delta encoding + hdr1RndMask = 0b00000011 + hdr1RndDeltaSame = 0b11 + hdr1RndDeltaPlus1 = 0b01 + hdr1RndDeltaMinus1 = 0b10 + hdr1RndLiteral = 0b00 + + // bits 2-4: proposal-bundle reference (value<<2) + hdr1PropShift = 2 + hdr1PropMask = 0b00011100 + + // bits 5-7: whether snd, pk, pk2 are dynamic table references + hdr1SndRef = 1 << 5 + hdr1PkRef = 1 << 6 + hdr1Pk2Ref = 1 << 7 + + // sizes used below + pfSize = 80 // committee.VrfProof + digestSize = 32 // crypto.Digest (and basics.Address) + sigSize = 64 // crypto.Signature + pkSize = 32 // crypto.PublicKey +) + +// StatefulEncoder compresses votes by using references to previously seen values +// from earlier votes. +type StatefulEncoder struct{ dynamicTableState } + +// StatefulDecoder decompresses votes by using references to previously seen values +// from earlier votes. +type StatefulDecoder struct{ dynamicTableState } + +// NewStatefulEncoder creates a new StatefulEncoder with initialized LRU tables of the specified size +func NewStatefulEncoder(tableSize uint) (*StatefulEncoder, error) { + e := &StatefulEncoder{} + if err := e.initTables(tableSize); err != nil { + return nil, err + } + return e, nil +} + +// NewStatefulDecoder creates a new StatefulDecoder with initialized LRU tables of the specified size +func NewStatefulDecoder(tableSize uint) (*StatefulDecoder, error) { + d := &StatefulDecoder{} + if err := d.initTables(tableSize); err != nil { + return nil, err + } + return d, nil +} + +// dynamicTableState is shared by StatefulEncoder and StatefulDecoder. 
It contains +// the necessary state for tracking references to previously seen values. +type dynamicTableState struct { + // LRU hash tables for snd, p+p1s, and p2+p2s + sndTable *lruTable[addressValue] // 512 * 2 * 32 = 32KB + pkTable *lruTable[pkSigPair] // 512 * 2 * 96 = 96KB + pk2Table *lruTable[pkSigPair] // 512 * 2 * 96 = 96KB + + // 8-slot window of recent proposal values + proposalWindow propWindow + + // last round number seen in previous vote + lastRnd uint64 +} + +// pkSigPair is a 32-byte public key + 64-byte signature +// used for the LRU tables for p+p1s and p2+p2s. +type pkSigPair struct { + pk [pkSize]byte + sig [sigSize]byte +} + +func (p *pkSigPair) hash() uint64 { + // Since pk and sig should already be uniformly distributed, we can use a + // simple XOR of the first 8 bytes of each to get a good hash. + // Any invalid votes intentionally designed to cause collisions will only + // affect the sending peer's own per-peer compression state, and cause + // agreement to disconnect the peer. + return binary.LittleEndian.Uint64(p.pk[:8]) ^ binary.LittleEndian.Uint64(p.sig[:8]) +} + +// addressValue is a 32-byte address used for the LRU table for snd. 
+type addressValue [digestSize]byte + +func (v *addressValue) hash() uint64 { + // addresses are fairly uniformly distributed, so we can use a simple XOR + return binary.LittleEndian.Uint64(v[:8]) ^ binary.LittleEndian.Uint64(v[8:16]) ^ + binary.LittleEndian.Uint64(v[16:24]) ^ binary.LittleEndian.Uint64(v[24:]) +} + +// initTables initializes the LRU tables with the specified size for all tables +func (s *dynamicTableState) initTables(tableSize uint) error { + var err error + if s.sndTable, err = newLRUTable[addressValue](tableSize); err != nil { + return err + } + if s.pkTable, err = newLRUTable[pkSigPair](tableSize); err != nil { + return err + } + if s.pk2Table, err = newLRUTable[pkSigPair](tableSize); err != nil { + return err + } + return nil +} + +// statefulReader helps StatefulEncoder and StatefulDecoder to read from a +// source buffer with bounds checking. +type statefulReader struct { + src []byte + pos int +} + +func (r *statefulReader) readFixed(n int, field string) ([]byte, error) { + if r.pos+n > len(r.src) { + return nil, fmt.Errorf("truncated %s", field) + } + data := r.src[r.pos : r.pos+n] + r.pos += n + return data, nil +} + +func (r *statefulReader) readVaruintBytes(field string) ([]byte, error) { + if r.pos+1 > len(r.src) { + return nil, fmt.Errorf("truncated %s marker", field) + } + more, err := msgpVaruintRemaining(r.src[r.pos]) + if err != nil { + return nil, fmt.Errorf("invalid %s marker: %w", field, err) + } + total := 1 + more + if r.pos+total > len(r.src) { + return nil, fmt.Errorf("truncated %s", field) + } + data := r.src[r.pos : r.pos+total] + r.pos += total + return data, nil +} + +func (r *statefulReader) readVaruint(field string) ([]byte, uint64, error) { + data, err := r.readVaruintBytes(field) + if err != nil { + return nil, 0, err + } + // decode: readVaruintBytes has already validated the marker + var value uint64 + switch len(data) { + case 1: // fixint (values 0-127) + value = uint64(data[0]) + case 2: // uint8 (marker + 
uint8) + value = uint64(data[1]) + case 3: // uint16 (marker + uint16) + value = uint64(binary.BigEndian.Uint16(data[1:])) + case 5: // uint32 (marker + uint32) + value = uint64(binary.BigEndian.Uint32(data[1:])) + case 9: // uint64 (marker + uint64) + value = binary.BigEndian.Uint64(data[1:]) + default: + return nil, 0, fmt.Errorf("readVaruint: %s unexpected length %d", field, len(data)) + } + + return data, value, nil +} + +// readDynamicRef reads an LRU table reference ID from the statefulReader. +func (r *statefulReader) readDynamicRef(field string) (lruTableReferenceID, error) { + if r.pos+2 > len(r.src) { + return 0, fmt.Errorf("truncated %s", field) + } + id := binary.BigEndian.Uint16(r.src[r.pos : r.pos+2]) + r.pos += 2 + return lruTableReferenceID(id), nil +} + +// appendDynamicRef encodes an LRU table reference ID and appends it to dst. +func appendDynamicRef(dst []byte, id lruTableReferenceID) []byte { + return binary.BigEndian.AppendUint16(dst, uint16(id)) +} + +// Compress takes a vote compressed by StatelessEncoder, and additionally +// compresses it using dynamic references to previously seen values. +func (e *StatefulEncoder) Compress(dst, src []byte) ([]byte, error) { + r := statefulReader{src: src, pos: 0} + + // Read header + header, err := r.readFixed(2, "header") + if err != nil { + return nil, errors.New("src too short") + } + hdr0 := header[0] // from StatelessEncoder + var hdr1 byte // StatefulEncoder header + + // prepare output, leave room for 2-byte header + out := dst[:0] + out = append(out, hdr0, 0) // will fill in with hdr1 later + + // cred.pf: pass through + pf, err := r.readFixed(pfSize, "pf") + if err != nil { + return nil, err + } + out = append(out, pf...) + + // r.per: pass through, if present + if (hdr0 & bitPer) != 0 { + perData, err1 := r.readVaruintBytes("r.per") + if err1 != nil { + return nil, err1 + } + out = append(out, perData...) 
+ } + + // r.prop: check LRU window + // copy proposal fields for table lookup + var prop proposalEntry + if (hdr0 & bitDig) != 0 { + dig, err1 := r.readFixed(digestSize, "dig") + if err1 != nil { + return nil, err1 + } + copy(prop.dig[:], dig) + } + if (hdr0 & bitEncDig) != 0 { + encdig, err1 := r.readFixed(digestSize, "encdig") + if err1 != nil { + return nil, err1 + } + copy(prop.encdig[:], encdig) + } + if (hdr0 & bitOper) != 0 { + operData, err1 := r.readVaruintBytes("oper") + if err1 != nil { + return nil, err1 + } + copy(prop.operEnc[:], operData) + prop.operLen = uint8(len(operData)) + } + if (hdr0 & bitOprop) != 0 { + oprop, err1 := r.readFixed(digestSize, "oprop") + if err1 != nil { + return nil, err1 + } + copy(prop.oprop[:], oprop) + } + prop.mask = hdr0 & propFieldsMask + + if idx := e.proposalWindow.lookup(prop); idx != 0 { + hdr1 |= byte(idx) << hdr1PropShift // set 001..111 + } else { + // not found: send literal and add to window (don't touch hdr1) + e.proposalWindow.insertNew(prop) + // write proposal bytes as StatelessEncoder would + if (hdr0 & bitDig) != 0 { + out = append(out, prop.dig[:]...) + } + if (hdr0 & bitEncDig) != 0 { + out = append(out, prop.encdig[:]...) + } + if (hdr0 & bitOper) != 0 { + out = append(out, prop.operEnc[:prop.operLen]...) + } + if (hdr0 & bitOprop) != 0 { + out = append(out, prop.oprop[:]...) + } + } + + // r.rnd: perform delta encoding + rndData, rnd, err := r.readVaruint("rnd") + if err != nil { + return nil, err + } + + switch { // delta encoding + case rnd == e.lastRnd: + hdr1 |= hdr1RndDeltaSame + case rnd == e.lastRnd+1 && e.lastRnd < math.MaxUint64: // avoid overflow + hdr1 |= hdr1RndDeltaPlus1 + case rnd == e.lastRnd-1 && e.lastRnd > 0: // avoid underflow + hdr1 |= hdr1RndDeltaMinus1 + default: + // pass through literal bytes (don't touch hdr1) + out = append(out, rndData...) 
+ } + e.lastRnd = rnd + + // r.snd: check LRU table + sndData, err := r.readFixed(digestSize, "sender") + if err != nil { + return nil, err + } + var snd addressValue + copy(snd[:], sndData) + sndH := snd.hash() + if id, ok := e.sndTable.lookup(snd, sndH); ok { + // found in table, use reference + hdr1 |= hdr1SndRef + out = appendDynamicRef(out, id) + } else { // not found, add to table and use literal + out = append(out, snd[:]...) + e.sndTable.insert(snd, sndH) + } + + // r.step: pass through, if present + if (hdr0 & bitStep) != 0 { + stepData, err1 := r.readVaruintBytes("step") + if err1 != nil { + return nil, err1 + } + out = append(out, stepData...) + } + + // sig.p + sig.p1s: check LRU table + pkBundle, err := r.readFixed(pkSize+sigSize, "pk bundle") + if err != nil { + return nil, err + } + var pk pkSigPair + copy(pk.pk[:], pkBundle[:pkSize]) + copy(pk.sig[:], pkBundle[pkSize:]) + + pkH := pk.hash() + if id, ok := e.pkTable.lookup(pk, pkH); ok { + // found in table, use reference + hdr1 |= hdr1PkRef + out = appendDynamicRef(out, id) + } else { // not found, add to table and use literal + out = append(out, pk.pk[:]...) + out = append(out, pk.sig[:]...) + e.pkTable.insert(pk, pkH) + } + + // sig.p2 + sig.p2s: check LRU table + pk2Bundle, err := r.readFixed(pkSize+sigSize, "pk2 bundle") + if err != nil { + return nil, err + } + var pk2 pkSigPair + copy(pk2.pk[:], pk2Bundle[:pkSize]) + copy(pk2.sig[:], pk2Bundle[pkSize:]) + + pk2H := pk2.hash() + if id, ok := e.pk2Table.lookup(pk2, pk2H); ok { + // found in table, use reference + hdr1 |= hdr1Pk2Ref + out = appendDynamicRef(out, id) + } else { // not found, add to table and use literal + out = append(out, pk2.pk[:]...) + out = append(out, pk2.sig[:]...) + e.pk2Table.insert(pk2, pk2H) + } + + // sig.s: pass through + sigs, err := r.readFixed(sigSize, "sig.s") + if err != nil { + return nil, err + } + out = append(out, sigs...) 
+ + if r.pos != len(src) { + return nil, fmt.Errorf("length mismatch: expected %d, got %d", len(src), r.pos) + } + + // fill in stateful header (hdr0 is unchanged) + out[1] = hdr1 + return out, nil +} + +// Decompress reverses StatefulEncoder, and writes a valid stateless vpack +// format buffer into dst. Caller must then pass it to StatelessDecoder. +func (d *StatefulDecoder) Decompress(dst, src []byte) ([]byte, error) { + r := statefulReader{src: src, pos: 0} + + // Read header + header, err := r.readFixed(2, "header") + if err != nil { + return nil, errors.New("input shorter than header") + } + hdr0 := header[0] // from StatelessEncoder + hdr1 := header[1] // from StatefulEncoder + + // prepare out; stateless size <= original + out := dst[:0] + out = append(out, hdr0, 0) // StatelessDecoder-compatible header + + // cred.pf: pass through + pf, err := r.readFixed(pfSize, "pf") + if err != nil { + return nil, err + } + out = append(out, pf...) + + // r.per: pass through, if present + if (hdr0 & bitPer) != 0 { + perData, err1 := r.readVaruintBytes("per") + if err1 != nil { + return nil, err1 + } + out = append(out, perData...) 
+ } + + // r.prop: check for reference to LRU window + var prop proposalEntry + propRef := (hdr1 & hdr1PropMask) >> hdr1PropShift // index in range [0, 7] + if propRef == 0 { // literal follows + if (hdr0 & bitDig) != 0 { + dig, err1 := r.readFixed(digestSize, "digest") + if err1 != nil { + return nil, err1 + } + copy(prop.dig[:], dig) + } + if (hdr0 & bitEncDig) != 0 { + encdig, err1 := r.readFixed(digestSize, "encdig") + if err1 != nil { + return nil, err1 + } + copy(prop.encdig[:], encdig) + } + if (hdr0 & bitOper) != 0 { + operData, err1 := r.readVaruintBytes("oper") + if err1 != nil { + return nil, err1 + } + copy(prop.operEnc[:], operData) + prop.operLen = uint8(len(operData)) + } + if (hdr0 & bitOprop) != 0 { + oprop, err1 := r.readFixed(digestSize, "oprop") + if err1 != nil { + return nil, err1 + } + copy(prop.oprop[:], oprop) + } + prop.mask = hdr0 & propFieldsMask + // add literal to the proposal window + d.proposalWindow.insertNew(prop) + } else { // reference index 1-7 + var ok bool + prop, ok = d.proposalWindow.byRef(int(propRef)) + if !ok { + return nil, fmt.Errorf("bad proposal ref: %v", propRef) + } + } + + // write proposal bytes (from either literal or reference) + if (prop.mask & bitDig) != 0 { + out = append(out, prop.dig[:]...) + } + if (prop.mask & bitEncDig) != 0 { + out = append(out, prop.encdig[:]...) + } + if (prop.mask & bitOper) != 0 { + out = append(out, prop.operEnc[:prop.operLen]...) + } + if (prop.mask & bitOprop) != 0 { + out = append(out, prop.oprop[:]...) 
+ } + + // r.rnd: perform delta decoding + var rnd uint64 + switch hdr1 & hdr1RndMask { + case hdr1RndDeltaSame: + rnd = d.lastRnd + out = msgp.AppendUint64(out, rnd) + case hdr1RndDeltaPlus1: + if d.lastRnd == math.MaxUint64 { + return nil, fmt.Errorf("round overflow: lastRnd %d", d.lastRnd) + } + rnd = d.lastRnd + 1 + out = msgp.AppendUint64(out, rnd) + case hdr1RndDeltaMinus1: + if d.lastRnd == 0 { + return nil, fmt.Errorf("round underflow: lastRnd %d", d.lastRnd) + } + rnd = d.lastRnd - 1 + out = msgp.AppendUint64(out, rnd) + case hdr1RndLiteral: + rndData, rndVal, err1 := r.readVaruint("rnd") + if err1 != nil { + return nil, err1 + } + rnd = rndVal + out = append(out, rndData...) + } + d.lastRnd = rnd + + // r.snd: check for reference to LRU table + if (hdr1 & hdr1SndRef) != 0 { // reference + id, err1 := r.readDynamicRef("snd ref") + if err1 != nil { + return nil, err1 + } + addr, ok := d.sndTable.fetch(id) + if !ok { + return nil, fmt.Errorf("bad sender ref: %v", id) + } + out = append(out, addr[:]...) + } else { // literal + sndData, err1 := r.readFixed(digestSize, "sender") + if err1 != nil { + return nil, err1 + } + var addr addressValue + copy(addr[:], sndData) + out = append(out, addr[:]...) + d.sndTable.insert(addr, addr.hash()) + } + + // r.step: pass through, if present + if (hdr0 & bitStep) != 0 { + stepData, err1 := r.readVaruintBytes("step") + if err1 != nil { + return nil, err1 + } + out = append(out, stepData...) + } + + // sig.p + p1s: check for reference to LRU table + if (hdr1 & hdr1PkRef) != 0 { // reference + id, err1 := r.readDynamicRef("pk ref") + if err1 != nil { + return nil, err1 + } + pkb, ok := d.pkTable.fetch(id) + if !ok { + return nil, fmt.Errorf("bad pk ref: %v", id) + } + out = append(out, pkb.pk[:]...) + out = append(out, pkb.sig[:]...) 
+ } else { // literal + pkBundle, err1 := r.readFixed(pkSize+sigSize, "pk bundle") + if err1 != nil { + return nil, err1 + } + var pkb pkSigPair + copy(pkb.pk[:], pkBundle[:pkSize]) + copy(pkb.sig[:], pkBundle[pkSize:]) + out = append(out, pkb.pk[:]...) + out = append(out, pkb.sig[:]...) + d.pkTable.insert(pkb, pkb.hash()) + } + + // sig.p2 + p2s: check for reference to LRU table + if (hdr1 & hdr1Pk2Ref) != 0 { // reference + id, err1 := r.readDynamicRef("pk2 ref") + if err1 != nil { + return nil, err1 + } + pk2b, ok := d.pk2Table.fetch(id) + if !ok { + return nil, fmt.Errorf("bad pk2 ref: %v", id) + } + out = append(out, pk2b.pk[:]...) + out = append(out, pk2b.sig[:]...) + } else { // literal + pk2Bundle, err1 := r.readFixed(pkSize+sigSize, "pk2 bundle") + if err1 != nil { + return nil, err1 + } + var pk2b pkSigPair + copy(pk2b.pk[:], pk2Bundle[:pkSize]) + copy(pk2b.sig[:], pk2Bundle[pkSize:]) + out = append(out, pk2b.pk[:]...) + out = append(out, pk2b.sig[:]...) + d.pk2Table.insert(pk2b, pk2b.hash()) + } + + // sig.s: pass through + sigs, err := r.readFixed(sigSize, "sig.s") + if err != nil { + return nil, err + } + out = append(out, sigs...) + + if r.pos != len(src) { + return nil, fmt.Errorf("length mismatch: expected %d, got %d", len(src), r.pos) + } + return out, nil +} diff --git a/network/vpack/dynamic_vpack_test.go b/network/vpack/dynamic_vpack_test.go new file mode 100644 index 0000000000..9f968294e6 --- /dev/null +++ b/network/vpack/dynamic_vpack_test.go @@ -0,0 +1,469 @@ +// Copyright (C) 2019-2025 Algorand, Inc. +// This file is part of go-algorand +// +// go-algorand is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. 
+// +// go-algorand is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with go-algorand. If not, see . + +package vpack + +import ( + "math" + "reflect" + "slices" + "testing" + "unsafe" + + "github.com/algorand/go-algorand/agreement" + "github.com/algorand/go-algorand/data/basics" + "github.com/algorand/go-algorand/protocol" + "github.com/algorand/go-algorand/test/partitiontest" + "github.com/stretchr/testify/require" +) + +// TestStatefulEncoderDecoderSequence verifies that a StatefulEncoder/StatefulDecoder +// pair can be reused across multiple votes while preserving correctness. +func TestStatefulEncoderDecoderSequence(t *testing.T) { + partitiontest.PartitionTest(t) + + const numVotes = 30 + + // Stateless encoder/decoder used as front/back before Stateful layer + stEnc := NewStatelessEncoder() + stDec := NewStatelessDecoder() + + enc, err := NewStatefulEncoder(1024) + require.NoError(t, err) + dec, err := NewStatefulDecoder(1024) + require.NoError(t, err) + + voteGen := generateRandomVote() + + for i := 0; i < numVotes; i++ { + v0 := voteGen.Example(i) + + // Ensure PKSigOld is zero to satisfy encoder expectations + v0.Sig.PKSigOld = [64]byte{} + + // Encode to msgpack and bounds-check size + msgpackBuf := protocol.EncodeMsgp(v0) + require.LessOrEqual(t, len(msgpackBuf), MaxMsgpackVoteSize) + + // First layer: stateless compression + statelessBuf, err := stEnc.CompressVote(nil, msgpackBuf) + require.NoError(t, err) + + // Second layer: stateful compression + encBuf, err := enc.Compress(nil, statelessBuf) + require.NoError(t, err, "Vote %d failed to compress", i) + // size sanity: compressed should not exceed stateless size + require.LessOrEqual(t, len(encBuf), len(statelessBuf)) + + // 
Reverse: stateful decompress → stateless + statelessOut, err := dec.Decompress(nil, encBuf) + require.NoError(t, err, "Vote %d failed to decompress", i) + + // Reverse: stateless decompress → msgpack + msgpackOut, err := stDec.DecompressVote(nil, statelessOut) + require.NoError(t, err) + + // Decode and compare objects for round-trip integrity + var v1 agreement.UnauthenticatedVote + err = protocol.Decode(msgpackOut, &v1) + require.NoError(t, err) + require.Equal(t, *v0, v1, "Vote %d round-trip mismatch", i) + } +} + +// TestStatefulEncoderReuse mirrors TestEncoderReuse in vpack_test.go but targets +// StatefulEncoder to guarantee that buffer reuse does not corrupt internal state. +func TestStatefulEncoderReuse(t *testing.T) { + partitiontest.PartitionTest(t) + + const numVotes = 10 + voteGen := generateRandomVote() + msgpackBufs := make([][]byte, 0, numVotes) + + // Generate and encode votes + for i := 0; i < numVotes; i++ { + buf := protocol.EncodeMsgp(voteGen.Example(i)) + require.LessOrEqual(t, len(buf), MaxMsgpackVoteSize) + msgpackBufs = append(msgpackBufs, buf) + } + + stEnc := NewStatelessEncoder() + stDec := NewStatelessDecoder() + enc, err := NewStatefulEncoder(1024) + require.NoError(t, err) + dec, err := NewStatefulDecoder(1024) + require.NoError(t, err) + + // 1) Compress into new buffers each time + var compressed [][]byte + for i, msgp := range msgpackBufs { + stateless, err := stEnc.CompressVote(nil, msgp) + require.NoError(t, err) + c, err := enc.Compress(nil, stateless) + require.NoError(t, err, "vote %d compress failed", i) + compressed = append(compressed, append([]byte(nil), c...)) + } + + for i, c := range compressed { + statelessOut, err := dec.Decompress(nil, c) + require.NoError(t, err, "vote %d decompress failed", i) + msgpackOut, err := stDec.DecompressVote(nil, statelessOut) + require.NoError(t, err) + var v agreement.UnauthenticatedVote + require.NoError(t, protocol.Decode(msgpackOut, &v)) + var orig agreement.UnauthenticatedVote + 
require.NoError(t, protocol.Decode(msgpackBufs[i], &orig)) + require.Equal(t, orig, v) + } + + // 2) Reuse a single destination slice + compressed = compressed[:0] + reused := make([]byte, 0, 4096) + for i, msgp := range msgpackBufs { + st, err := stEnc.CompressVote(nil, msgp) + require.NoError(t, err) + c, err := enc.Compress(reused[:0], st) + require.NoError(t, err, "vote %d compress failed (reuse)", i) + compressed = append(compressed, append([]byte(nil), c...)) + } + for i, c := range compressed { + stOut, err := dec.Decompress(nil, c) + require.NoError(t, err, "vote %d decompress failed (reuse)", i) + mpOut, err := stDec.DecompressVote(nil, stOut) + require.NoError(t, err) + var v agreement.UnauthenticatedVote + require.NoError(t, protocol.Decode(mpOut, &v)) + var orig agreement.UnauthenticatedVote + require.NoError(t, protocol.Decode(msgpackBufs[i], &orig)) + require.Equal(t, orig, v) + } + + // 3) Reuse a slice that grows over iterations + compressed = compressed[:0] + growing := make([]byte, 0, 8) + for i, msgp := range msgpackBufs { + st, err := stEnc.CompressVote(nil, msgp) + require.NoError(t, err) + c, err := enc.Compress(growing[:0], st) + require.NoError(t, err, "vote %d compress failed (growing)", i) + compressed = append(compressed, append([]byte(nil), c...)) + growing = c + } + for i, c := range compressed { + stOut, err := dec.Decompress(nil, c) + require.NoError(t, err, "vote %d decompress failed (growing)", i) + mpOut, err := stDec.DecompressVote(nil, stOut) + require.NoError(t, err) + var v agreement.UnauthenticatedVote + require.NoError(t, protocol.Decode(mpOut, &v)) + var orig agreement.UnauthenticatedVote + require.NoError(t, protocol.Decode(msgpackBufs[i], &orig)) + require.Equal(t, orig, v) + } +} + +func TestStatefulRndDelta(t *testing.T) { + partitiontest.PartitionTest(t) + + rounds := []uint64{10, 10, 11, 10, 11, 11, 20} + expected := []byte{hdr1RndLiteral, hdr1RndDeltaSame, hdr1RndDeltaPlus1, hdr1RndDeltaMinus1, hdr1RndDeltaPlus1, 
hdr1RndDeltaSame, hdr1RndLiteral} + + enc, err := NewStatefulEncoder(1024) + require.NoError(t, err) + dec, err := NewStatefulDecoder(1024) + require.NoError(t, err) + stEnc := NewStatelessEncoder() + stDec := NewStatelessDecoder() + voteGen := generateRandomVote() + + // Test both encoding and decoding in the same loop + for i, rnd := range rounds { + v := voteGen.Example(i) + v.R.Round = basics.Round(rnd) + + msgp := protocol.EncodeMsgp(v) + statelessBuf, err := stEnc.CompressVote(nil, msgp) + require.NoError(t, err) + + // Compress with stateful encoder + compressedBuf, err := enc.Compress(nil, statelessBuf) + require.NoError(t, err) + require.GreaterOrEqual(t, len(compressedBuf), 2) + + // Verify the round delta encoding in the header matches expectations + got := compressedBuf[1] & hdr1RndMask + require.Equal(t, expected[i], got) + + // Decompress with the stateful decoder + decompressedBuf, err := dec.Decompress(nil, compressedBuf) + require.NoError(t, err) + require.Equal(t, statelessBuf, decompressedBuf) + + // Decompress with the stateless decoder + decompressedStatelessBuf, err := stDec.DecompressVote(nil, statelessBuf) + require.NoError(t, err) + require.Equal(t, msgp, decompressedStatelessBuf) + + } +} + +func TestStatefulEncodeRef(t *testing.T) { + // ensure lruTableReferenceID can fit in uint16 encoding used in appendDynamicRef + partitiontest.PartitionTest(t) + var id lruTableReferenceID + require.Equal(t, uintptr(2), unsafe.Sizeof(id), "lruTableReferenceID should occupy 2 bytes (uint16)") + require.Equal(t, reflect.Uint16, reflect.TypeOf(id).Kind(), "lruTableReferenceID underlying kind should be uint16") + // Maximum table size we support is 2048 (1024 buckets, 2 slots each) + // Last bucket would be 1023, last slot would be 1, so maxID = (1023<<1)|1 = 2047 + maxTableSize := uint32(2048) + maxBucketIndex := (maxTableSize / 2) - 1 + maxID := lruTableReferenceID((maxBucketIndex << 1) | 1) // last bucket, last slot + require.LessOrEqual(t, 
uint32(maxID), uint32(math.MaxUint16)) +} + +func TestStatefulDecoderErrors(t *testing.T) { + partitiontest.PartitionTest(t) + + fullVote := slices.Concat( + // Header with all hdr0 optional bits set, but no hdr1 bits + []byte{byte(bitPer | bitDig | bitStep | bitEncDig | bitOper | bitOprop), 0x00}, + make([]byte, pfSize), // Credential prefix (80 bytes) + []byte{msgpUint32}, // Per field marker + []byte{0x01, 0x02, 0x03, 0x04}, // Per value (4 bytes) + make([]byte, digestSize), // Digest (32 bytes) + make([]byte, digestSize), // EncDig (32 bytes) + []byte{msgpUint32}, // Oper field marker + []byte{0x01, 0x02, 0x03, 0x04}, // Oper value (4 bytes) + make([]byte, digestSize), // Oprop (32 bytes) + []byte{msgpUint32}, // Round marker (msgpack marker) + []byte{0x01, 0x02, 0x03, 0x04}, // Round value (4 bytes) + make([]byte, digestSize), // Sender (32 bytes) + []byte{msgpUint32}, // Step field marker + []byte{0x01, 0x02, 0x03, 0x04}, // Step value (4 bytes) + make([]byte, pkSize+sigSize), // pk + p1s (96 bytes: 32 for pk, 64 for p1s) + make([]byte, pkSize+sigSize), // pk2 + p2s (96 bytes: 32 for pk2, 64 for p2s) + make([]byte, sigSize), // sig.s (64 bytes) + ) + + refVote := slices.Concat( + // Header with all hdr1 reference bits set, but no hdr0 bits + []byte{0x00, byte(hdr1SndRef | hdr1PkRef | hdr1Pk2Ref | hdr1RndLiteral)}, + make([]byte, pfSize), // Credential prefix + []byte{0x07}, // Round literal (fixint 7) + []byte{0x01, 0x02}, // Sender ref ID + []byte{0x03, 0x04}, // pk ref ID + []byte{0x05, 0x06}, // pk2 ref ID + make([]byte, sigSize), // sig.s + ) + + for _, tc := range []struct { + want string + buf []byte + }{ + // Truncation errors + {"input shorter than header", fullVote[:1]}, + {"truncated pf", fullVote[:2]}, + {"truncated per marker", fullVote[:82]}, + {"truncated per", fullVote[:83]}, + {"truncated digest", fullVote[:87]}, + {"truncated encdig", fullVote[:119]}, + {"truncated oper marker", fullVote[:151]}, + {"truncated oper", fullVote[:152]}, + 
{"truncated oprop", fullVote[:160]}, + {"truncated rnd marker", fullVote[:188]}, + {"truncated rnd", fullVote[:189]}, + {"truncated sender", fullVote[:193]}, + {"truncated step marker", fullVote[:225]}, + {"truncated step", fullVote[:226]}, + {"truncated pk bundle", fullVote[:234]}, + {"truncated pk2 bundle", fullVote[:334]}, + {"truncated sig.s", fullVote[:422]}, + // Reference ID decoding errors + {"truncated snd ref", refVote[:84]}, + {"truncated pk ref", refVote[:86]}, + {"truncated pk2 ref", refVote[:88]}, + {"bad sender ref", slices.Concat(refVote[:83], []byte{0xFF, 0xFF})}, + {"bad pk ref", slices.Concat(refVote[:85], []byte{0xFF, 0xFF})}, + {"bad pk2 ref", slices.Concat(refVote[:87], []byte{0xFF, 0xFF})}, + {"bad proposal ref", slices.Concat( + []byte{0x00, byte(3 << hdr1PropShift)}, // proposal reference ID 3 (invalid, StatefulDecoder is empty) + make([]byte, pfSize), // pf + []byte{0x01}, // round (fixint 1) + )}, + {"length mismatch: expected", slices.Concat(fullVote, []byte{0xFF, 0xFF})}, + } { + t.Run(tc.want, func(t *testing.T) { + dec, err := NewStatefulDecoder(1024) + require.NoError(t, err) + _, err = dec.Decompress(nil, tc.buf) + require.ErrorContains(t, err, tc.want) + }) + } +} + +func TestStatefulEncoderErrors(t *testing.T) { + partitiontest.PartitionTest(t) + + enc, err := NewStatefulEncoder(1024) + require.NoError(t, err) + + // Source too short error + _, err = enc.Compress(nil, []byte{0x00}) + require.ErrorContains(t, err, "src too short") + + // Length mismatch error + vote := generateRandomVote().Example(0) + stEnc := NewStatelessEncoder() + statelessBuf, err := stEnc.CompressVote(nil, protocol.EncodeMsgp(vote)) + require.NoError(t, err) + + badBuf := append(statelessBuf, 0xFF) // append spurious byte + _, err = enc.Compress(nil, badBuf) + require.ErrorContains(t, err, "length mismatch") + + // Test nil dst + compressedBuf, err := enc.Compress(nil, statelessBuf) + require.NoError(t, err) + require.Greater(t, len(compressedBuf), 0) + + // 
Test bounds checking errors + testCases := []struct { + name string + buf []byte + want string + }{ + { + name: "truncated pf", + buf: []byte{0x00, 0x00}, // header only, no pf + want: "truncated pf", + }, + { + name: "truncated r.per marker", + buf: append([]byte{byte(bitPer), 0x00}, make([]byte, pfSize)...), // header + pf, no per marker + want: "truncated r.per marker", + }, + { + name: "truncated r.per", + buf: append([]byte{byte(bitPer), 0x00}, append(make([]byte, pfSize), msgpUint32)...), // header + pf + per marker, no per data + want: "truncated r.per", + }, + { + name: "truncated dig", + buf: append([]byte{byte(bitDig), 0x00}, make([]byte, pfSize)...), // header + pf, no dig + want: "truncated dig", + }, + { + name: "truncated encdig", + // When bitDig is not set but bitEncDig is set, we expect encdig directly after pf + buf: append([]byte{byte(bitEncDig), 0x00}, make([]byte, pfSize)...), // header + pf, no encdig + want: "truncated encdig", + }, + { + name: "truncated oper marker", + buf: append([]byte{byte(bitOper), 0x00}, make([]byte, pfSize)...), // header + pf, no oper marker + want: "truncated oper marker", + }, + { + name: "truncated oper", + buf: append([]byte{byte(bitOper), 0x00}, append(make([]byte, pfSize), msgpUint32)...), // header + pf + oper marker, no oper data + want: "truncated oper", + }, + { + name: "truncated oprop", + buf: append([]byte{byte(bitOprop), 0x00}, make([]byte, pfSize)...), // header + pf, no oprop + want: "truncated oprop", + }, + { + name: "truncated rnd marker", + buf: append([]byte{0x00, 0x00}, make([]byte, pfSize)...), // header + pf, no rnd marker + want: "truncated rnd marker", + }, + { + name: "truncated rnd", + buf: append([]byte{0x00, 0x00}, append(make([]byte, pfSize), msgpUint32)...), // header + pf + rnd marker, no rnd data + want: "truncated rnd", + }, + { + name: "truncated sender", + buf: append([]byte{0x00, 0x00}, append(make([]byte, pfSize), 0x07)...), // header + pf + rnd (fixint), no sender + want: 
"truncated sender", + }, + { + name: "truncated step marker", + buf: append([]byte{byte(bitStep), 0x00}, append(make([]byte, pfSize), append([]byte{0x07}, make([]byte, digestSize)...)...)...), // header + pf + rnd + sender, no step marker + want: "truncated step marker", + }, + { + name: "truncated step", + buf: append([]byte{byte(bitStep), 0x00}, append(make([]byte, pfSize), append([]byte{0x07}, append(make([]byte, digestSize), msgpUint32)...)...)...), // header + pf + rnd + sender + step marker, no step data + want: "truncated step", + }, + { + name: "truncated pk bundle", + buf: append([]byte{0x00, 0x00}, append(make([]byte, pfSize), append([]byte{0x07}, make([]byte, digestSize)...)...)...), // header + pf + rnd + sender, no pk bundle + want: "truncated pk bundle", + }, + { + name: "truncated pk2 bundle", + buf: append([]byte{0x00, 0x00}, append(make([]byte, pfSize), append([]byte{0x07}, append(make([]byte, digestSize), make([]byte, pkSize+sigSize)...)...)...)...), // header + pf + rnd + sender + pk bundle, no pk2 bundle + want: "truncated pk2 bundle", + }, + { + name: "truncated sig.s", + buf: append([]byte{0x00, 0x00}, append(make([]byte, pfSize), append([]byte{0x07}, append(make([]byte, digestSize), append(make([]byte, pkSize+sigSize), make([]byte, pkSize+sigSize)...)...)...)...)...), // everything except sig.s + want: "truncated sig.s", + }, + { + name: "invalid r.per marker", + buf: append([]byte{byte(bitPer), 0x00}, append(make([]byte, pfSize), 0xFF)...), // header + pf + invalid per marker + want: "invalid r.per marker", + }, + { + name: "invalid oper marker", + buf: append([]byte{byte(bitOper), 0x00}, append(make([]byte, pfSize), 0xFF)...), // header + pf + invalid oper marker + want: "invalid oper marker", + }, + { + name: "invalid rnd marker", + buf: append([]byte{0x00, 0x00}, append(make([]byte, pfSize), 0xFF)...), // header + pf + invalid rnd marker + want: "invalid rnd marker", + }, + { + name: "invalid step marker", + buf: 
append([]byte{byte(bitStep), 0x00}, append(make([]byte, pfSize), append([]byte{0x07}, append(make([]byte, digestSize), 0xFF)...)...)...), // header + pf + rnd + sender + invalid step marker + want: "invalid step marker", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + _, err := enc.Compress(nil, tc.buf) + require.ErrorContains(t, err, tc.want) + }) + } +} + +func TestStatefulEncoderHeaderBits(t *testing.T) { + partitiontest.PartitionTest(t) + // Ensure that the three bits allocated in hdr1 for proposal references + // match the size of the proposal window. + got := int(hdr1PropMask >> hdr1PropShift) + require.Equal(t, proposalWindowSize, got, + "hdr1PropMask (%d) and proposalWindowSize (%d) must stay in sync", got, proposalWindowSize) + + // Ensure that the header encoding of hdr1RndLiteral is zero, since the + // encoder relies on a zeroed header byte meaning "rnd is a literal" + require.Equal(t, hdr1RndLiteral, 0) +} diff --git a/network/vpack/lru_table.go b/network/vpack/lru_table.go new file mode 100644 index 0000000000..cde90f9f07 --- /dev/null +++ b/network/vpack/lru_table.go @@ -0,0 +1,138 @@ +// Copyright (C) 2019-2025 Algorand, Inc. +// This file is part of go-algorand +// +// go-algorand is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// go-algorand is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with go-algorand. If not, see . + +package vpack + +import ( + "errors" +) + +// lruTable is a fixed-size, 2-way set-associative hash table; a table of N entries has N/2 buckets (e.g. 512 buckets for N=1024). 
// Each bucket contains exactly two entries, with LRU eviction on collision.
// The implementation is O(1) and zero-allocation during lookups and inserts.
//
// Each bucket has a MRU bit that encodes which of the two slots is MRU. The
// bit is set to 0 if the first slot is MRU, and 1 if the second slot is MRU.
//
// Reference IDs are encoded as (bucket << 1) | slot, where bucket is the index
// of the bucket and slot is the index of the slot within the bucket (0 or 1).
type lruTable[K comparable] struct {
	numBuckets uint
	buckets    []twoSlotBucket[K]
	mru        []byte // 1 bit per bucket: 0 => slot 0 is MRU, 1 => slot 1 is MRU
}

// maxLRUTableSize bounds the table so that every (bucket << 1) | slot
// reference fits in a lruTableReferenceID (uint16): a table of N entries
// has N/2 buckets, so the largest possible reference ID is N-1.
const maxLRUTableSize = 1 << 16

// newLRUTable creates a new LRU table with the given size N.
// The size N is the total number of entries in the table.
// The number of buckets is N/2, and each bucket contains 2 slots.
// N must be a power of 2 in [16, 65536]; otherwise an error is returned.
func newLRUTable[K comparable](N uint) (*lruTable[K], error) {
	// enforce size is a power of 2 and at least 16
	if N < 16 || N&(N-1) != 0 {
		return nil, errors.New("lruTable size must be a power of 2 and at least 16")
	}
	// reference IDs are uint16; a larger table would silently truncate IDs in
	// lookup/insert, making encoder and decoder disagree about what an ID means
	if N > maxLRUTableSize {
		return nil, errors.New("lruTable size must be at most 65536")
	}
	numBuckets := N / 2
	return &lruTable[K]{
		numBuckets: numBuckets,
		buckets:    make([]twoSlotBucket[K], numBuckets),
		// numBuckets is a power of 2 >= 8, so numBuckets/8 is a whole byte count >= 1
		mru: make([]byte, numBuckets/8),
	}, nil
}

// twoSlotBucket is a 2-way set-associative bucket that contains two slots.
type twoSlotBucket[K comparable] struct{ slots [2]K }

// lruBucketIndex is the index of a bucket in the LRU table.
type lruBucketIndex uint32

// lruSlotIndex is the index of a slot in a bucket, either 0 or 1.
type lruSlotIndex uint8

// lruTableReferenceID is the reference ID for a key in the LRU table.
type lruTableReferenceID uint16

// mruBitmask returns the byte index and bit mask for the MRU bit of bucket b.
func (t *lruTable[K]) mruBitmask(b lruBucketIndex) (byteIdx uint32, mask byte) {
	byteIdx = uint32(b) >> 3
	bitIdx := b & 7
	mask = 1 << bitIdx
	return byteIdx, mask
}

// getLRUSlot returns the index of the LRU slot in bucket b
func (t *lruTable[K]) getLRUSlot(b lruBucketIndex) lruSlotIndex {
	byteIdx, mask := t.mruBitmask(b)
	if (t.mru[byteIdx] & mask) == 0 {
		return 1 // this bucket's bit is 0, meaning slot 1 is LRU
	}
	return 0 // this bucket's bit is 1, meaning slot 0 is LRU
}

// setMRUSlot marks the given bucket and slot index as MRU
func (t *lruTable[K]) setMRUSlot(b lruBucketIndex, slot lruSlotIndex) {
	byteIdx, mask := t.mruBitmask(b)
	if slot == 0 { // want to set slot 0 to be MRU, so bucket bit should be 0
		t.mru[byteIdx] &^= mask
	} else { // want to set slot 1 to be MRU, so bucket bit should be 1
		t.mru[byteIdx] |= mask
	}
}

func (t *lruTable[K]) hashToBucketIndex(h uint64) lruBucketIndex {
	// Use the lower bits of the hash to determine the bucket index.
	// numBuckets is a power of 2, so the mask keeps exactly log2(numBuckets) bits.
	return lruBucketIndex(h & uint64(t.numBuckets-1))
}

// lookup returns the reference ID of the given key, if it exists. The hash is
// used to determine the bucket, and the key is used to determine the slot.
// A lookup marks the found key as MRU.
//
// NOTE(review): a zero-valued key K can match a never-written slot, since
// slots start at the zero value of K; presumably harmless here because the
// encoder and decoder apply the same operation sequence — confirm.
func (t *lruTable[K]) lookup(k K, h uint64) (id lruTableReferenceID, ok bool) {
	b := t.hashToBucketIndex(h)
	bk := &t.buckets[b]
	if bk.slots[0] == k {
		t.setMRUSlot(b, 0)
		return lruTableReferenceID(b << 1), true
	}
	if bk.slots[1] == k {
		t.setMRUSlot(b, 1)
		return lruTableReferenceID(b<<1 | 1), true
	}
	return 0, false
}

// insert inserts the given key into the table and returns its reference ID.
// The hash is used to determine the bucket, and the LRU slot is used to
// determine the slot. The inserted key is marked as MRU.
+func (t *lruTable[K]) insert(k K, h uint64) lruTableReferenceID { + b := t.hashToBucketIndex(h) + evict := t.getLRUSlot(b) // LRU slot + t.buckets[b].slots[evict] = k + t.setMRUSlot(b, evict) // new key -> MRU + return lruTableReferenceID((lruTableReferenceID(b) << 1) | lruTableReferenceID(evict)) +} + +// fetch returns the key by id and marks it as MRU. If the id is invalid, it +// returns false (leading to a decoder error). The key is marked as MRU. +func (t *lruTable[K]) fetch(id lruTableReferenceID) (K, bool) { + b := lruBucketIndex(id >> 1) + slot := lruSlotIndex(id & 1) + if b >= lruBucketIndex(t.numBuckets) { // invalid id + var zero K + return zero, false + } + // touch MRU bit + t.setMRUSlot(b, slot) + return t.buckets[b].slots[slot], true +} diff --git a/network/vpack/lru_table_test.go b/network/vpack/lru_table_test.go new file mode 100644 index 0000000000..8abf267a3b --- /dev/null +++ b/network/vpack/lru_table_test.go @@ -0,0 +1,364 @@ +// Copyright (C) 2019-2025 Algorand, Inc. +// This file is part of go-algorand +// +// go-algorand is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// go-algorand is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with go-algorand. If not, see . 
+ +package vpack + +import ( + "encoding/binary" + "hash/fnv" + "testing" + "testing/quick" + + "github.com/algorand/go-algorand/crypto" + "github.com/algorand/go-algorand/test/partitiontest" + "github.com/stretchr/testify/require" +) + +func TestLRUTableSizeValidation(t *testing.T) { + partitiontest.PartitionTest(t) + + // Test invalid size (not power of 2) + _, err := NewStatefulEncoder(100) + require.Error(t, err) + require.Contains(t, err.Error(), "must be a power of 2") + + // Test invalid size (too small) + _, err = NewStatefulEncoder(8) + require.Error(t, err) + require.Contains(t, err.Error(), "at least 16") + + // Test valid sizes + for _, size := range []uint{16, 32, 64, 128, 256, 512, 1024, 2048} { + enc, err := NewStatefulEncoder(size) + require.NoError(t, err) + require.NotNil(t, enc) + + dec, err := NewStatefulDecoder(size) + require.NoError(t, err) + require.NotNil(t, dec) + } +} + +// TestLRUTableInvalidID tests the fetch function with an invalid ID +func TestLRUTableInvalidID(t *testing.T) { + partitiontest.PartitionTest(t) + + // Test fetch with invalid ID (greater than table size) + table, err := newLRUTable[pkSigPair](1024) + require.NoError(t, err) + var invalidID lruTableReferenceID = 1024 // greater than numBuckets (512) + result, ok := table.fetch(invalidID) + require.False(t, ok) + require.Equal(t, pkSigPair{}, result) +} + +func TestLRUTableInsertLookupFetch(t *testing.T) { + partitiontest.PartitionTest(t) + tab, err := newLRUTable[int](1024) + require.NoError(t, err) + + const bucketHash = 42 // deterministic hash for test + const baseID = bucketHash << 1 // slot-bit is OR-ed below + + // first insert on empty table sees MRU bit 0, so slot 1 is LRU + id1 := tab.insert(100, bucketHash) + // id1 is baseID | 1 (value was stored in slot 1) + require.EqualValues(t, baseID|1, id1) + // on insert, our slot 1 is now the MRU, so LRU is slot 0 + require.Equal(t, lruSlotIndex(0), tab.getLRUSlot(lruBucketIndex(bucketHash))) + + // lookup for same 
value and bucketHash returns the same ID + id, ok := tab.lookup(100, bucketHash) + require.True(t, ok) + require.EqualValues(t, id1, id) + // MRU/LRU is unchanged + require.Equal(t, lruSlotIndex(0), tab.getLRUSlot(lruBucketIndex(bucketHash))) + + // second insert with new value for same hash sees MRU bit 1, so slot 0 is LRU + id2 := tab.insert(200, bucketHash) + require.EqualValues(t, baseID, id2) + // MRU/LRU is flipped + require.Equal(t, lruSlotIndex(1), tab.getLRUSlot(lruBucketIndex(bucketHash))) + + // old key (100) is still in slot 1 + _, ok = tab.lookup(100, bucketHash) + require.True(t, ok) + // the act of lookup 100 flips the MRU bit to 1 + require.Equal(t, lruSlotIndex(0), tab.getLRUSlot(lruBucketIndex(bucketHash))) + + // lookup for 200 (slot 0) → MRU bit flips to 0 + _, ok = tab.lookup(200, bucketHash) + require.True(t, ok) + require.Equal(t, lruSlotIndex(1), tab.getLRUSlot(lruBucketIndex(bucketHash))) + + // third insert: evicts and replaces slot 1, and now MRU is slot 1 + id3 := tab.insert(300, bucketHash) + require.EqualValues(t, baseID|1, id3) + require.Equal(t, lruSlotIndex(0), tab.getLRUSlot(lruBucketIndex(bucketHash))) + + // fetch(id3) returns the value 300 and keeps the MRU bit at slot 1 + val, ok := tab.fetch(id3) + require.True(t, ok) + require.Equal(t, 300, val) + require.Equal(t, lruSlotIndex(0), tab.getLRUSlot(lruBucketIndex(bucketHash))) + + // after insert for a new value, slot 0 is evicted and assigned + id4 := tab.insert(400, bucketHash) + require.EqualValues(t, baseID, id4) + // now slot 1 is LRU + require.Equal(t, lruSlotIndex(1), tab.getLRUSlot(lruBucketIndex(bucketHash))) + + // fetch of 300 (slot 1) makes it the new MRU + val, ok = tab.fetch(id3) + require.True(t, ok) + require.Equal(t, 300, val) + require.Equal(t, lruSlotIndex(0), tab.getLRUSlot(lruBucketIndex(bucketHash))) + + // fetch of 400 (slot 0) makes it the new MRU + val, ok = tab.fetch(id4) + require.True(t, ok) + require.Equal(t, 400, val) + require.Equal(t, 
lruSlotIndex(1), tab.getLRUSlot(lruBucketIndex(bucketHash))) +} + +// TestLRUEvictionOrder verifies that the LRU table correctly evicts the least recently used item +// when inserting into a full bucket. This test will fail if the lruSlot implementation is incorrect. +func TestLRUEvictionOrder(t *testing.T) { + partitiontest.PartitionTest(t) + tab, err := newLRUTable[int](1024) + require.NoError(t, err) + bucketHash := uint64(42) // Use same hash to ensure both items go into the same bucket + + // Insert first value + id1 := tab.insert(100, bucketHash) + val1, ok := tab.fetch(id1) + require.True(t, ok) + require.Equal(t, 100, val1) + + // Insert second value to the same bucket + id2 := tab.insert(200, bucketHash) + val2, ok := tab.fetch(id2) + require.True(t, ok) + require.Equal(t, 200, val2) + + // Both values should still be accessible + refID, ok := tab.lookup(100, bucketHash) + require.True(t, ok, "First inserted value should still exist") + require.EqualValues(t, id1, refID, "Reference ID for first value should match") + + refID, ok = tab.lookup(200, bucketHash) + require.True(t, ok, "Second inserted value should exist") + require.EqualValues(t, id2, refID, "Reference ID for second value should match") + + // Access the first value to make it MRU + refID, ok = tab.lookup(100, bucketHash) + require.True(t, ok) + require.EqualValues(t, id1, refID) + + // Now the second value (200) should be LRU + // Insert a third value - it should evict the second value (200) + id3 := tab.insert(300, bucketHash) + val3, ok := tab.fetch(id3) + require.True(t, ok) + require.Equal(t, 300, val3) + + // First value should still be accessible + refID, ok = tab.lookup(100, bucketHash) + require.True(t, ok, "First value should still exist after third insert") + require.EqualValues(t, id1, refID) + + // Second value should have been evicted + refID, ok = tab.lookup(200, bucketHash) + require.False(t, ok, "Second value should be evicted as it was LRU") + require.EqualValues(t, 0, refID) 
+ + // But the third value should be accessible + refID, ok = tab.lookup(300, bucketHash) + require.True(t, ok, "Third value should exist") + require.EqualValues(t, id3, refID) + + // Now make the third value MRU + refID, ok = tab.lookup(300, bucketHash) + require.True(t, ok) + require.EqualValues(t, id3, refID) + + // Insert a fourth value - it should evict the first value (100) + id4 := tab.insert(400, bucketHash) + val4, ok := tab.fetch(id4) + require.True(t, ok) + require.Equal(t, 400, val4) + + // First value should now be evicted + refID, ok = tab.lookup(100, bucketHash) + require.False(t, ok, "First value should now be evicted as it became LRU") + require.EqualValues(t, 0, refID) + + // Third and fourth values should be accessible + refID, ok = tab.lookup(300, bucketHash) + require.True(t, ok, "Third value should still exist") + require.EqualValues(t, id3, refID) + refID, ok = tab.lookup(400, bucketHash) + require.True(t, ok, "Fourth value should exist") + require.EqualValues(t, id4, refID) +} + +// TestLRURefIDConsistency verifies that reference IDs remain consistent +// and that fetch/lookup operations correctly mark items as MRU +func TestLRURefIDConsistency(t *testing.T) { + partitiontest.PartitionTest(t) + tab, err := newLRUTable[int](1024) + require.NoError(t, err) + bucketHash := uint64(42) + + // Insert and get reference ID + id1 := tab.insert(100, bucketHash) + + // Lookup should return the same reference ID + ref, ok := tab.lookup(100, bucketHash) + require.True(t, ok) + require.Equal(t, id1, ref, "Reference ID from lookup should match insert") + + // Fetch using the ID should return the correct value + val, ok := tab.fetch(id1) + require.True(t, ok) + require.Equal(t, 100, val, "Fetch should return the correct value") + + // Insert another value with same hash (same bucket) + id2 := tab.insert(200, bucketHash) + require.NotEqual(t, id1, id2, "Different values should have different reference IDs") + + // Both values should be accessible via their 
reference IDs + val1, ok1 := tab.fetch(id1) + val2, ok2 := tab.fetch(id2) + require.True(t, ok1) + require.True(t, ok2) + require.Equal(t, 100, val1) + require.Equal(t, 200, val2) +} + +// TestLRUErrorPaths tests the error paths in fetch operations to ensure 100% coverage +func TestLRUErrorPaths(t *testing.T) { + partitiontest.PartitionTest(t) + + // The decoder below is created with table size 1024, i.e. 512 buckets, so we need an ID + // where the bucket index (id >> 1) exceeds this value + // If bucket index >= 512, fetch should return false + invalidBucketID := lruTableReferenceID(1024 << 1) // (bucket index 1024 is >= 512) + + // Create a decoder with an empty LRU table + dec, err := NewStatefulDecoder(1024) + require.NoError(t, err) + + // Attempt to access references with invalid bucket IDs + _, ok := dec.sndTable.fetch(invalidBucketID) + require.False(t, ok) + _, ok = dec.pkTable.fetch(invalidBucketID) + require.False(t, ok) + _, ok = dec.pk2Table.fetch(invalidBucketID) + require.False(t, ok) + + // Attempt to access an invalid proposal reference by looking up a proposal that doesn't exist + prop := proposalEntry{dig: crypto.Digest{1}, encdig: crypto.Digest{2}} + index := dec.proposalWindow.lookup(prop) + require.Equal(t, 0, index) +} + +func TestLRUTableQuick(t *testing.T) { + partitiontest.PartitionTest(t) + cfg := &quick.Config{MaxCount: 50000} + + hashfn := func(v uint32) uint64 { + // use FNV-1 for hashing test values + h64 := fnv.New64() + h64.Write([]byte(binary.LittleEndian.AppendUint32(nil, v))) + return h64.Sum64() + } + + // Property: when a third distinct value is inserted into a bucket, the + // previously least-recently-used (LRU) value must be evicted, while the + // previously most-recently-used (MRU) value survives. + prop := func(seq []uint32) bool { + tab, err := newLRUTable[uint32](1024) + require.NoError(t, err) + + // Per-bucket ordered list of values, index 0 == MRU, len<=2. 
+ type order []uint32 + expectedState := make(map[lruBucketIndex]order) + + for _, v := range seq { + h := hashfn(v) + b := tab.hashToBucketIndex(h) + expectedBucket := expectedState[b] + + // First, try lookup. + if id, ok := tab.lookup(v, h); ok { + // Move found value to MRU position in state. + if len(expectedBucket) == 2 { + if expectedBucket[0] != v { + expectedBucket[0], expectedBucket[1] = v, expectedBucket[0] + } + } else if len(expectedBucket) == 1 { + expectedBucket[0] = v // already MRU + } + + // Round-trip fetch check. + fetched, okF := tab.fetch(id) + if !okF || fetched != v { + return false + } + expectedState[b] = expectedBucket + continue + } + + // Insert new distinct value. + _ = tab.insert(v, h) + // Update expected state. + switch len(expectedBucket) { + case 0: // Bucket was empty + expectedState[b] = order{v} + continue + case 1: // Bucket had one value + expectedState[b] = order{v, expectedBucket[0]} + continue + case 2: // Bucket was full, expect eviction of state[1] + lruVal := expectedBucket[1] + + // After insert: MRU is v, survivor should be previous MRU (state[0]) + expectedState[b] = order{v, expectedBucket[0]} + + // Check LRU really went away + if _, ok := tab.lookup(lruVal, h); ok { + return false + } + // The previous MRU MUST still be present + if _, ok := tab.lookup(expectedBucket[0], h); !ok { + return false + } + // The newly inserted value must be present + if _, ok := tab.lookup(v, h); !ok { + return false + } + default: // Should not happen + return false + } + } + return true + } + + if err := quick.Check(prop, cfg); err != nil { + t.Fatalf("quick-check failed: %v", err) + } +} diff --git a/network/vpack/msgp.go b/network/vpack/msgp.go index 0346eae866..b515d1fa3c 100644 --- a/network/vpack/msgp.go +++ b/network/vpack/msgp.go @@ -60,6 +60,26 @@ func isMsgpFixint(b byte) bool { return b>>7 == 0 } +// msgpVaruintRemaining looks at the first byte of a msgpack-encoded variable-length unsigned integer, +// and returns the 
number of bytes remaining in the encoded value (not including the first byte). +func msgpVaruintRemaining(first byte) (int, error) { + switch first { + case msgpUint8: + return 1, nil + case msgpUint16: + return 2, nil + case msgpUint32: + return 4, nil + case msgpUint64: + return 8, nil + default: + if !isMsgpFixint(first) { + return 0, fmt.Errorf("msgpVaruintRemaining: expected fixint or varuint tag, got 0x%02x", first) + } + return 0, nil + } +} + // msgpVoteParser provides a zero-allocation msgpVoteParser for vote messages. type msgpVoteParser struct { data []byte @@ -171,24 +191,14 @@ func (p *msgpVoteParser) readUintBytes() ([]byte, error) { if err != nil { return nil, err } + dataSize, err := msgpVaruintRemaining(b) + if err != nil { + return nil, err + } // fixint is a single byte containing marker and value - if isMsgpFixint(b) { + if dataSize == 0 { return p.data[startPos : startPos+1], nil } - // otherwise, we expect a tag byte followed by the value - var dataSize int - switch b { - case msgpUint8: - dataSize = 1 - case msgpUint16: - dataSize = 2 - case msgpUint32: - dataSize = 4 - case msgpUint64: - dataSize = 8 - default: - return nil, fmt.Errorf("expected uint tag, got 0x%02x", b) - } if err := p.ensureBytes(dataSize); err != nil { return nil, err } diff --git a/network/vpack/proposal_window.go b/network/vpack/proposal_window.go new file mode 100644 index 0000000000..b4004c4c24 --- /dev/null +++ b/network/vpack/proposal_window.go @@ -0,0 +1,96 @@ +// Copyright (C) 2019-2025 Algorand, Inc. +// This file is part of go-algorand +// +// go-algorand is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. 
+// +// go-algorand is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with go-algorand. If not, see . + +package vpack + +// proposalEntry contains all the values inside the r.prop map in a vote. +// Some fields may be omitted, so a mask is used to indicate which fields +// are present (bitDig, bitEncDig, bitOper, bitOprop). +type proposalEntry struct { + dig, encdig, oprop [digestSize]byte + operEnc [maxMsgpVaruintSize]byte // msgp varuint encoding of oper + operLen uint8 // length of operEnc + mask uint8 // which fields were present +} + +// proposalWindowSize is fixed because hdr[1] holds only 3 bits for the reference code +// (0 = literal, 1-7 = index). +const proposalWindowSize = 7 + +// propWindow implements a small sliding window for vote proposal bundles. +// It behaves like the dynamic table defined in RFC 7541 (HPACK), but is limited +// to 7 entries, encoded using 3 bits in the header byte. This is enough to +// provide effective compression, since usually almost all the votes in a round +// are for the same proposal value. +type propWindow struct { + entries [proposalWindowSize]proposalEntry // circular buffer + head int // slot of the oldest entry + size int // number of live entries (0 ... windowSize) +} + +// lookup returns the 1-based HPACK index of pv. It walks from the oldest entry +// to the newest; worst-case is seven comparisons, which is fine for such a +// small table. Returns 0 if not found. +func (w *propWindow) lookup(pv proposalEntry) int { + for i := range w.size { + slot := (w.head + i) % proposalWindowSize // oldest first + if w.entries[slot] == pv { + // Convert position to HPACK index. 
+ // Example: size == 7 + // i == 0 (oldest) -> index 7 + // i == 1 -> index 6 + // i == 2 -> index 5 + // ... + // i == 6 (newest) -> index 1 + return w.size - i + } + } + return 0 +} + +// byRef returns the proposalEntry stored at HPACK index idx (1 ... w.size). +// ok == false if idx is out of range. +func (w *propWindow) byRef(idx int) (prop proposalEntry, ok bool) { + if idx < 1 || idx > w.size { + return proposalEntry{}, false + } + // convert HPACK index (1 == newest, w.size == oldest) to physical slot + // newest slot is (head + size - 1) % windowSize + // logical slot idx is (idx - 1) positions from newest + physical := (w.head + w.size - idx) % proposalWindowSize + // Example: size == 7, head == 2 + // logical idx == 1 (newest) -> slot (2 + 7 - 1) % 7 == slot 1 + // logical idx == 2 -> slot (2 + 7 - 2) % 7 == slot 0 + // logical idx == 3 -> slot (2 + 7 - 3) % 7 == slot 6 + // ... + // logical idx == 7 (oldest) -> slot (2 + 7 - 7) % 7 == slot 2 + return w.entries[physical], true +} + +// insertNew puts pv into the table as the newest entry (HPACK index 1). +// When the table is full, the oldest one is overwritten. +func (w *propWindow) insertNew(pv proposalEntry) { + if w.size == proposalWindowSize { + // Evict the oldest element at w.head, then advance head. + w.entries[w.head] = pv + w.head = (w.head + 1) % proposalWindowSize + } else { + // Store at the slot just after the current newest. + pos := (w.head + w.size) % proposalWindowSize + w.entries[pos] = pv + w.size++ + } +} diff --git a/network/vpack/proposal_window_test.go b/network/vpack/proposal_window_test.go new file mode 100644 index 0000000000..1ec9a9dab5 --- /dev/null +++ b/network/vpack/proposal_window_test.go @@ -0,0 +1,79 @@ +// Copyright (C) 2019-2025 Algorand, Inc. 
+// This file is part of go-algorand +// +// go-algorand is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// go-algorand is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with go-algorand. If not, see . + +package vpack + +import ( + "testing" + + "github.com/algorand/go-algorand/test/partitiontest" + "github.com/stretchr/testify/require" +) + +func makeTestPropBundle(seed byte) proposalEntry { + var p proposalEntry + for i := range p.dig { + p.dig[i] = seed + } + p.operLen = 1 + p.operEnc[0] = seed + p.mask = bitDig | bitOper + return p +} + +func TestPropWindowHPACK(t *testing.T) { + partitiontest.PartitionTest(t) + var w propWindow + + // 1. Insert seven unique entries (fills the window). + for i := 0; i < proposalWindowSize; i++ { + pb := makeTestPropBundle(byte(i)) + w.insertNew(pb) + require.Equal(t, i+1, w.size, "size incorrect after insertNew") + // Newly inserted entry should always be HPACK index 1 (MRU). + require.Equal(t, 1, w.lookup(pb), "lookup did not return 1") + } + + // 2. Verify byRef/lookup mapping for current content. + for idx := 1; idx <= proposalWindowSize; idx++ { + prop, ok := w.byRef(idx) + require.True(t, ok) + expectedSeed := byte(proposalWindowSize - idx) // newest (idx==1) == seed 6, oldest (idx==7) == seed 0 + want := makeTestPropBundle(expectedSeed) + require.Equal(t, want, prop) + } + + // 3. Insert an eighth entry – should evict the oldest (seed 0). 
+ evicted := makeTestPropBundle(0) + newEntry := makeTestPropBundle(7) + w.insertNew(newEntry) + require.Equal(t, proposalWindowSize, w.size, "size after eviction incorrect") + + // Oldest should now be former seed 1, and evicted one should not be found. + require.Equal(t, 0, w.lookup(evicted), "evicted entry still found") + + // New entry must be at HPACK index 1. + require.Equal(t, 1, w.lookup(newEntry), "newest entry lookup not 1") + + // Verify byRef again: idx 1 == seed 7, idx 7 == seed 1 + prop, ok := w.byRef(1) + require.True(t, ok) + require.Equal(t, newEntry, prop) + + prop, ok = w.byRef(proposalWindowSize) + require.True(t, ok) + require.Equal(t, makeTestPropBundle(1), prop) +} diff --git a/network/vpack/vpack.go b/network/vpack/vpack.go index 05e2c5861d..18b552ed91 100644 --- a/network/vpack/vpack.go +++ b/network/vpack/vpack.go @@ -39,7 +39,7 @@ const ( ) const ( - headerSize = 2 // 1 byte for mask, 1 byte for future use + headerSize = 2 // 1 byte for StatelessEncoder, 1 byte for StatefulEncoder maxMsgpVaruintSize = 9 // max size of a varuint is 8 bytes + 1 byte for the marker msgpBin8Len32Size = len(msgpBin8Len32) + 32 @@ -376,21 +376,9 @@ func (d *StatelessDecoder) varuint(fieldName string) error { return fmt.Errorf("not enough data to read varuint marker for field %s", fieldName) } marker := d.src[d.pos] // read msgpack varuint marker - moreBytes := 0 - switch marker { - case msgpUint8: - moreBytes = 1 - case msgpUint16: - moreBytes = 2 - case msgpUint32: - moreBytes = 4 - case msgpUint64: - moreBytes = 8 - default: // fixint uses a single byte for marker+value - if !isMsgpFixint(marker) { - return fmt.Errorf("not a fixint for field %s, got %d", fieldName, marker) - } - moreBytes = 0 + moreBytes, err := msgpVaruintRemaining(marker) + if err != nil { + return fmt.Errorf("invalid varuint marker %d for field %s: %w", marker, fieldName, err) } if d.pos+1+moreBytes > len(d.src) {