diff --git a/agreement/actions.go b/agreement/actions.go
index 833dc8af36..91276e22fc 100644
--- a/agreement/actions.go
+++ b/agreement/actions.go
@@ -19,6 +19,7 @@ package agreement
import (
"context"
"fmt"
+ "time"
"github.com/algorand/go-algorand/logging/logspec"
"github.com/algorand/go-algorand/logging/telemetryspec"
@@ -209,6 +210,11 @@ type ensureAction struct {
Payload proposal
// the certificate proving commitment
Certificate Certificate
+
+ // The time that the winning proposal-vote was validated, relative to the beginning of the round
+ voteValidatedAt time.Duration
+ // The dynamic filter timeout calculated for this round, even if not enabled, for reporting to telemetry.
+ dynamicFilterTimeout time.Duration
}
func (a ensureAction) t() actionType {
@@ -231,14 +237,16 @@ func (a ensureAction) do(ctx context.Context, s *Service) {
logEvent.Type = logspec.RoundConcluded
s.log.with(logEvent).Infof("committed round %d with pre-validated block %v", a.Certificate.Round, a.Certificate.Proposal)
s.log.EventWithDetails(telemetryspec.Agreement, telemetryspec.BlockAcceptedEvent, telemetryspec.BlockAcceptedEventDetails{
- Address: a.Certificate.Proposal.OriginalProposer.String(),
- Hash: a.Certificate.Proposal.BlockDigest.String(),
- Round: uint64(a.Certificate.Round),
- ValidatedAt: a.Payload.validatedAt,
- ReceivedAt: a.Payload.receivedAt,
- PreValidated: true,
- PropBufLen: uint64(len(s.demux.rawProposals)),
- VoteBufLen: uint64(len(s.demux.rawVotes)),
+ Address: a.Certificate.Proposal.OriginalProposer.String(),
+ Hash: a.Certificate.Proposal.BlockDigest.String(),
+ Round: uint64(a.Certificate.Round),
+ ValidatedAt: a.Payload.validatedAt,
+ ReceivedAt: a.Payload.receivedAt,
+ VoteValidatedAt: a.voteValidatedAt,
+ DynamicFilterTimeout: a.dynamicFilterTimeout,
+ PreValidated: true,
+ PropBufLen: uint64(len(s.demux.rawProposals)),
+ VoteBufLen: uint64(len(s.demux.rawVotes)),
})
s.Ledger.EnsureValidatedBlock(a.Payload.ve, a.Certificate)
} else {
@@ -246,14 +254,16 @@ func (a ensureAction) do(ctx context.Context, s *Service) {
logEvent.Type = logspec.RoundConcluded
s.log.with(logEvent).Infof("committed round %d with block %v", a.Certificate.Round, a.Certificate.Proposal)
s.log.EventWithDetails(telemetryspec.Agreement, telemetryspec.BlockAcceptedEvent, telemetryspec.BlockAcceptedEventDetails{
- Address: a.Certificate.Proposal.OriginalProposer.String(),
- Hash: a.Certificate.Proposal.BlockDigest.String(),
- Round: uint64(a.Certificate.Round),
- ValidatedAt: a.Payload.validatedAt,
- ReceivedAt: a.Payload.receivedAt,
- PreValidated: false,
- PropBufLen: uint64(len(s.demux.rawProposals)),
- VoteBufLen: uint64(len(s.demux.rawVotes)),
+ Address: a.Certificate.Proposal.OriginalProposer.String(),
+ Hash: a.Certificate.Proposal.BlockDigest.String(),
+ Round: uint64(a.Certificate.Round),
+ ValidatedAt: a.Payload.validatedAt,
+ ReceivedAt: a.Payload.receivedAt,
+ VoteValidatedAt: a.voteValidatedAt,
+ DynamicFilterTimeout: a.dynamicFilterTimeout,
+ PreValidated: false,
+ PropBufLen: uint64(len(s.demux.rawProposals)),
+ VoteBufLen: uint64(len(s.demux.rawVotes)),
})
s.Ledger.EnsureBlock(block, a.Certificate)
}
diff --git a/agreement/credentialArrivalHistory.go b/agreement/credentialArrivalHistory.go
new file mode 100644
index 0000000000..b4bc48c976
--- /dev/null
+++ b/agreement/credentialArrivalHistory.go
@@ -0,0 +1,83 @@
+// Copyright (C) 2019-2023 Algorand, Inc.
+// This file is part of go-algorand
+//
+// go-algorand is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// go-algorand is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.
+
+package agreement
+
+import (
+ "sort"
+ "time"
+)
+
+// credentialArrivalHistory maintains a circular buffer of time.Duration samples.
+type credentialArrivalHistory struct {
+ history []time.Duration
+ writePtr int
+ full bool
+}
+
+func makeCredentialArrivalHistory(size int) credentialArrivalHistory {
+ if size < 0 {
+ panic("can't create CredentialArrivalHistory with negative size")
+ }
+ history := credentialArrivalHistory{history: make([]time.Duration, size)}
+ history.reset()
+ return history
+}
+
+// store saves a new sample into the circular buffer.
+// If the buffer is full, it overwrites the oldest sample.
+func (history *credentialArrivalHistory) store(sample time.Duration) {
+ if len(history.history) == 0 {
+ return
+ }
+
+ history.history[history.writePtr] = sample
+ history.writePtr++
+ if history.writePtr == len(history.history) {
+ history.full = true
+ history.writePtr = 0
+ }
+}
+
+// reset marks the history buffer as empty
+func (history *credentialArrivalHistory) reset() {
+ history.writePtr = 0
+ history.full = false
+}
+
+// isFull checks if the circular buffer has been fully populated at least once.
+func (history *credentialArrivalHistory) isFull() bool {
+ return history.full
+}
+
+// orderStatistics returns the idx'th time duration in the sorted history array.
+// It assumes that history is full and the idx is within the array bounds, and
+// panics if either of these assumptions doesn't hold.
+func (history *credentialArrivalHistory) orderStatistics(idx int) time.Duration {
+ if !history.isFull() {
+ panic("history not full")
+ }
+ if idx < 0 || idx >= len(history.history) {
+ panic("index out of bounds")
+ }
+
+ // if history.history is long, then we could optimize this function to use
+ // the linear time order statistics algorithm.
+ sortedArrivals := make([]time.Duration, len(history.history))
+ copy(sortedArrivals[:], history.history[:])
+ sort.Slice(sortedArrivals, func(i, j int) bool { return sortedArrivals[i] < sortedArrivals[j] })
+ return sortedArrivals[idx]
+}
diff --git a/agreement/credentialArrivalHistory_test.go b/agreement/credentialArrivalHistory_test.go
new file mode 100644
index 0000000000..8626094dd8
--- /dev/null
+++ b/agreement/credentialArrivalHistory_test.go
@@ -0,0 +1,129 @@
+// Copyright (C) 2019-2023 Algorand, Inc.
+// This file is part of go-algorand
+//
+// go-algorand is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// go-algorand is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.
+
+package agreement
+
+import (
+ "testing"
+ "time"
+
+ "github.com/algorand/go-algorand/test/partitiontest"
+ "github.com/stretchr/testify/require"
+)
+
+func TestCredentialHistoryStore(t *testing.T) {
+ partitiontest.PartitionTest(t)
+
+ size := 5
+ buffer := makeCredentialArrivalHistory(size)
+ // last store call overwrites the first one
+ for i := 0; i < size+1; i++ {
+ buffer.store(time.Duration(i))
+ }
+
+ require.True(t, buffer.isFull())
+ require.Equal(t, time.Duration(size), buffer.history[0])
+ for i := 1; i < size; i++ {
+ require.Equal(t, time.Duration(i), buffer.history[i])
+ }
+}
+
+func TestCredentialHistoryReset(t *testing.T) {
+ partitiontest.PartitionTest(t)
+
+ size := 5
+ buffer := makeCredentialArrivalHistory(size)
+ // last store call overwrites the first one
+ for i := 0; i < size+1; i++ {
+ buffer.store(time.Duration(i))
+ }
+
+ require.Equal(t, time.Duration(size), buffer.history[0])
+ for i := 1; i < size; i++ {
+ require.Equal(t, time.Duration(i), buffer.history[i])
+ }
+ require.True(t, buffer.isFull())
+ buffer.reset()
+ require.False(t, buffer.isFull())
+ buffer.store(time.Duration(100))
+ require.Equal(t, time.Duration(100), buffer.history[0])
+}
+
+func TestCredentialHistoryIsFull(t *testing.T) {
+ partitiontest.PartitionTest(t)
+ var buffer credentialArrivalHistory
+ require.False(t, buffer.isFull())
+
+ size := 5
+ buffer = makeCredentialArrivalHistory(size)
+ require.False(t, buffer.isFull())
+
+ for i := 1; i < size+10; i++ {
+ buffer.store(time.Duration(i))
+ if i < size {
+ require.False(t, buffer.isFull())
+ } else {
+ require.True(t, buffer.isFull())
+ }
+ }
+
+ // reset the buffer and then fill it again
+ buffer.reset()
+ require.False(t, buffer.isFull())
+
+ for i := 1; i < size+10; i++ {
+ buffer.store(time.Duration(i))
+ if i < size {
+ require.False(t, buffer.isFull())
+ } else {
+ require.True(t, buffer.isFull())
+ }
+ }
+}
+
+func TestCredentialHisotyZeroSize(t *testing.T) {
+ partitiontest.PartitionTest(t)
+
+ var buffer credentialArrivalHistory
+ require.False(t, buffer.isFull())
+
+ size := 0
+ buffer = makeCredentialArrivalHistory(size)
+ require.False(t, buffer.isFull())
+
+ // trying to store new samples won't panic but the history is never full
+ for i := 0; i < size+10; i++ {
+ buffer.store(time.Duration(i))
+ require.False(t, buffer.isFull())
+ }
+}
+
+func TestOrderStatistics(t *testing.T) {
+ partitiontest.PartitionTest(t)
+
+ size := 5
+ buffer := makeCredentialArrivalHistory(size)
+ require.False(t, buffer.isFull())
+
+ for i := 0; i < size; i++ {
+ buffer.store(time.Duration(size - i))
+ }
+ require.True(t, buffer.isFull())
+
+ for i := 0; i < size; i++ {
+ require.Equal(t, time.Duration(i+1), buffer.orderStatistics(i))
+ }
+}
diff --git a/agreement/demux.go b/agreement/demux.go
index c225396293..60ea2a8dd5 100644
--- a/agreement/demux.go
+++ b/agreement/demux.go
@@ -199,9 +199,14 @@ func (d *demux) next(s *Service, deadline Deadline, fastDeadline Deadline, curre
switch e.t() {
case payloadVerified:
- e = e.(messageEvent).AttachValidatedAt(s.Clock.Since())
+ e = e.(messageEvent).AttachValidatedAt(s.Clock.Since(), currentRound)
case payloadPresent, votePresent:
- e = e.(messageEvent).AttachReceivedAt(s.Clock.Since())
+ e = e.(messageEvent).AttachReceivedAt(s.Clock.Since(), currentRound)
+ case voteVerified:
+ // if this is a proposal vote (step 0), record the validatedAt time on the vote
+ if e.(messageEvent).Input.Vote.R.Step == 0 {
+ e = e.(messageEvent).AttachValidatedAt(s.Clock.Since(), currentRound)
+ }
}
}()
diff --git a/agreement/dynamicFilterTimeoutParams.go b/agreement/dynamicFilterTimeoutParams.go
new file mode 100644
index 0000000000..36348615b5
--- /dev/null
+++ b/agreement/dynamicFilterTimeoutParams.go
@@ -0,0 +1,45 @@
+// Copyright (C) 2019-2023 Algorand, Inc.
+// This file is part of go-algorand
+//
+// go-algorand is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// go-algorand is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.
+
+package agreement
+
+import "time"
+
+// This file contains parameters for the dynamic filter timeout mechanism. When
+// this feature is enabled (dynamicFilterTimeout is true), these parameters
+// should migrate to be consensus params.
+
+// dynamicFilterCredentialArrivalHistory specifies the number of past
+// credential arrivals that are measured to determine the next filter
+// timeout. If dynamicFilterCredentialArrivalHistory <= 0, then the dynamic
+// timeout feature is off and the filter step timeout is calculated using
+// the static configuration.
+const dynamicFilterCredentialArrivalHistory int = 40
+
+// dynamicFilterTimeoutLowerBound specifies a minimal duration that the
+// filter timeout must meet.
+const dynamicFilterTimeoutLowerBound time.Duration = 500 * time.Millisecond
+
+// dynamicFilterTimeoutCredentialArrivalHistoryIdx specifies which sample to use
+// out of a sorted dynamicFilterCredentialArrivalHistory-sized array of time
+// samples. The 95th percentile of dynamicFilterCredentialArrivalHistory = 40
+// sorted samples, is at index 37.
+const dynamicFilterTimeoutCredentialArrivalHistoryIdx int = 37
+
+// dynamicFilterTimeoutGraceInterval is an additional extension to the dynamic
+// filter time atop the one calculated based on the history of credential
+// arrivals.
+const dynamicFilterTimeoutGraceInterval time.Duration = 50 * time.Millisecond
diff --git a/agreement/dynamicFilterTimeoutParams_test.go b/agreement/dynamicFilterTimeoutParams_test.go
new file mode 100644
index 0000000000..7d03de7dea
--- /dev/null
+++ b/agreement/dynamicFilterTimeoutParams_test.go
@@ -0,0 +1,41 @@
+// Copyright (C) 2019-2023 Algorand, Inc.
+// This file is part of go-algorand
+//
+// go-algorand is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// go-algorand is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.
+
+package agreement
+
+import (
+ "testing"
+ "time"
+
+ "github.com/algorand/go-algorand/test/partitiontest"
+ "github.com/stretchr/testify/require"
+)
+
+func TestSampleIndexIsValid(t *testing.T) {
+ partitiontest.PartitionTest(t)
+
+ require.GreaterOrEqual(t, dynamicFilterCredentialArrivalHistory, 0)
+ require.GreaterOrEqual(t, dynamicFilterTimeoutCredentialArrivalHistoryIdx, 0)
+ if dynamicFilterCredentialArrivalHistory > 0 {
+ require.Less(t, dynamicFilterTimeoutCredentialArrivalHistoryIdx, dynamicFilterCredentialArrivalHistory)
+ }
+}
+
+func TestLowerBound(t *testing.T) {
+ partitiontest.PartitionTest(t)
+
+ require.Less(t, 20*time.Millisecond, dynamicFilterTimeoutLowerBound)
+}
diff --git a/agreement/events.go b/agreement/events.go
index 52737e5f2c..faa4badebe 100644
--- a/agreement/events.go
+++ b/agreement/events.go
@@ -197,6 +197,10 @@ const (
// readPinned is sent to the proposalStore to read the pinned value, if it exists.
readPinned
+ // readLowestVote is sent to the proposalPeriodMachine to read the
+ // proposal-vote with the lowest credential.
+ readLowestVote
+
/*
* The following are event types that replace queries, and may warrant
* a revision to make them more state-machine-esque.
@@ -407,6 +411,36 @@ func (e newRoundEvent) ComparableStr() string {
return e.String()
}
+type readLowestEvent struct {
+ // T currently only supports readLowestVote
+ T eventType
+
+ // Round and Period are the round and period for which to query the
+ // lowest-credential vote, value or payload. This type of event is only
+ // sent for reading the lowest period 0 credential, but the Period is here
+ // anyway to route to the appropriate proposalMachinePeriod.
+ Round round
+ Period period
+
+ // Vote holds the lowest-credential vote.
+ Vote vote
+
+ // Filled indicates whether the Vote field is filled
+ Filled bool
+}
+
+func (e readLowestEvent) t() eventType {
+ return e.T
+}
+
+func (e readLowestEvent) String() string {
+ return fmt.Sprintf("%s: %d %d", e.t().String(), e.Round, e.Period)
+}
+
+func (e readLowestEvent) ComparableStr() string {
+ return e.String()
+}
+
type newPeriodEvent struct {
// Period holds the latest period relevant to the proposalRoundMachine.
Period period
@@ -941,17 +975,37 @@ func (e checkpointEvent) AttachConsensusVersion(v ConsensusVersionView) external
return e
}
-func (e messageEvent) AttachValidatedAt(d time.Duration) messageEvent {
- e.Input.Proposal.validatedAt = d
+// AttachValidatedAt looks for a validated proposal or vote inside a
+// payloadVerified or voteVerified messageEvent, and attaches the given time to
+// the proposal's validatedAt field.
+func (e messageEvent) AttachValidatedAt(d time.Duration, currentRound round) messageEvent {
+ switch e.T {
+ case payloadVerified:
+ if e.Input.Proposal.Round() > currentRound {
+ e.Input.Proposal.validatedAt = 1
+ } else {
+ e.Input.Proposal.validatedAt = d
+ }
+ case voteVerified:
+ if e.Input.Vote.R.Round > currentRound {
+ e.Input.Vote.validatedAt = 1
+ } else {
+ e.Input.Vote.validatedAt = d
+ }
+ }
return e
}
// AttachReceivedAt looks for an unauthenticatedProposal inside a
// payloadPresent or votePresent messageEvent, and attaches the given
// time to the proposal's receivedAt field.
-func (e messageEvent) AttachReceivedAt(d time.Duration) messageEvent {
+func (e messageEvent) AttachReceivedAt(d time.Duration, currentRound round) messageEvent {
if e.T == payloadPresent {
- e.Input.UnauthenticatedProposal.receivedAt = d
+ if e.Input.UnauthenticatedProposal.Round() > currentRound {
+ e.Input.UnauthenticatedProposal.receivedAt = 1
+ } else {
+ e.Input.UnauthenticatedProposal.receivedAt = d
+ }
} else if e.T == votePresent {
// Check for non-nil Tail, indicating this votePresent event
// contains a synthetic payloadPresent event that was attached
@@ -960,7 +1014,11 @@ func (e messageEvent) AttachReceivedAt(d time.Duration) messageEvent {
// The tail event is payloadPresent, serialized together
// with the proposal vote as a single CompoundMessage
// using a protocol.ProposalPayloadTag network message.
- e.Tail.Input.UnauthenticatedProposal.receivedAt = d
+ if e.Tail.Input.UnauthenticatedProposal.Round() > currentRound {
+ e.Tail.Input.UnauthenticatedProposal.receivedAt = 1
+ } else {
+ e.Tail.Input.UnauthenticatedProposal.receivedAt = d
+ }
}
}
return e
diff --git a/agreement/eventtype_string.go b/agreement/eventtype_string.go
index 9da84c1b98..9973215b12 100644
--- a/agreement/eventtype_string.go
+++ b/agreement/eventtype_string.go
@@ -37,21 +37,22 @@ func _() {
_ = x[newPeriod-26]
_ = x[readStaging-27]
_ = x[readPinned-28]
- _ = x[voteFilterRequest-29]
- _ = x[voteFilteredStep-30]
- _ = x[nextThresholdStatusRequest-31]
- _ = x[nextThresholdStatus-32]
- _ = x[freshestBundleRequest-33]
- _ = x[freshestBundle-34]
- _ = x[dumpVotesRequest-35]
- _ = x[dumpVotes-36]
- _ = x[wrappedAction-37]
- _ = x[checkpointReached-38]
+ _ = x[readLowestVote-29]
+ _ = x[voteFilterRequest-30]
+ _ = x[voteFilteredStep-31]
+ _ = x[nextThresholdStatusRequest-32]
+ _ = x[nextThresholdStatus-33]
+ _ = x[freshestBundleRequest-34]
+ _ = x[freshestBundle-35]
+ _ = x[dumpVotesRequest-36]
+ _ = x[dumpVotes-37]
+ _ = x[wrappedAction-38]
+ _ = x[checkpointReached-39]
}
-const _eventType_name = "nonevotePresentpayloadPresentbundlePresentvoteVerifiedpayloadVerifiedbundleVerifiedroundInterruptiontimeoutfastTimeoutsoftThresholdcertThresholdnextThresholdproposalCommittableproposalAcceptedvoteFilteredvoteMalformedbundleFilteredbundleMalformedpayloadRejectedpayloadMalformedpayloadPipelinedpayloadAcceptedproposalFrozenvoteAcceptednewRoundnewPeriodreadStagingreadPinnedvoteFilterRequestvoteFilteredStepnextThresholdStatusRequestnextThresholdStatusfreshestBundleRequestfreshestBundledumpVotesRequestdumpVoteswrappedActioncheckpointReached"
+const _eventType_name = "nonevotePresentpayloadPresentbundlePresentvoteVerifiedpayloadVerifiedbundleVerifiedroundInterruptiontimeoutfastTimeoutsoftThresholdcertThresholdnextThresholdproposalCommittableproposalAcceptedvoteFilteredvoteMalformedbundleFilteredbundleMalformedpayloadRejectedpayloadMalformedpayloadPipelinedpayloadAcceptedproposalFrozenvoteAcceptednewRoundnewPeriodreadStagingreadPinnedreadLowestVotevoteFilterRequestvoteFilteredStepnextThresholdStatusRequestnextThresholdStatusfreshestBundleRequestfreshestBundledumpVotesRequestdumpVoteswrappedActioncheckpointReached"
-var _eventType_index = [...]uint16{0, 4, 15, 29, 42, 54, 69, 83, 100, 107, 118, 131, 144, 157, 176, 192, 204, 217, 231, 246, 261, 277, 293, 308, 322, 334, 342, 351, 362, 372, 389, 405, 431, 450, 471, 485, 501, 510, 523, 540}
+var _eventType_index = [...]uint16{0, 4, 15, 29, 42, 54, 69, 83, 100, 107, 118, 131, 144, 157, 176, 192, 204, 217, 231, 246, 261, 277, 293, 308, 322, 334, 342, 351, 362, 372, 386, 403, 419, 445, 464, 485, 499, 515, 524, 537, 554}
func (i eventType) String() string {
if i >= eventType(len(_eventType_index)-1) {
diff --git a/agreement/msgp_gen.go b/agreement/msgp_gen.go
index d083edd8e8..5a5efa7ddc 100644
--- a/agreement/msgp_gen.go
+++ b/agreement/msgp_gen.go
@@ -3950,7 +3950,7 @@ func (z *player) MarshalMsg(b []byte) (o []byte) {
o = msgp.Require(b, z.Msgsize())
// omitempty: check for empty values
zb0001Len := uint32(9)
- var zb0001Mask uint16 /* 10 bits */
+ var zb0001Mask uint16 /* 12 bits */
if (*z).OldDeadline == 0 {
zb0001Len--
zb0001Mask |= 0x1
@@ -11155,7 +11155,7 @@ func (z *vote) MarshalMsg(b []byte) (o []byte) {
o = msgp.Require(b, z.Msgsize())
// omitempty: check for empty values
zb0001Len := uint32(3)
- var zb0001Mask uint8 /* 4 bits */
+ var zb0001Mask uint8 /* 5 bits */
if (*z).Cred.MsgIsZero() {
zb0001Len--
zb0001Mask |= 0x2
diff --git a/agreement/persistence.go b/agreement/persistence.go
index 0e5e75915a..819806b69a 100644
--- a/agreement/persistence.go
+++ b/agreement/persistence.go
@@ -228,7 +228,7 @@ func decode(raw []byte, t0 timers.Clock[TimeoutType], log serviceLogger, reflect
if err != nil {
return
}
-
+ p2.lowestCredentialArrivals = makeCredentialArrivalHistory(dynamicFilterCredentialArrivalHistory)
rr2 = makeRootRouter(p2)
err = protocol.DecodeReflect(s.Router, &rr2)
if err != nil {
@@ -244,6 +244,7 @@ func decode(raw []byte, t0 timers.Clock[TimeoutType], log serviceLogger, reflect
return
}
}
+ p2.lowestCredentialArrivals = makeCredentialArrivalHistory(dynamicFilterCredentialArrivalHistory)
if p2.OldDeadline != 0 {
p2.Deadline = Deadline{Duration: p2.OldDeadline, Type: TimeoutDeadline}
p2.OldDeadline = 0 // clear old value
diff --git a/agreement/persistence_test.go b/agreement/persistence_test.go
index ef738e86d6..a3cc99755d 100644
--- a/agreement/persistence_test.go
+++ b/agreement/persistence_test.go
@@ -38,7 +38,7 @@ func TestAgreementSerialization(t *testing.T) {
// todo : we need to deserialize some more meaningful state.
clock := timers.MakeMonotonicClock[TimeoutType](time.Date(2015, 1, 2, 5, 6, 7, 8, time.UTC))
- status := player{Round: 350, Step: soft, Deadline: Deadline{Duration: time.Duration(23) * time.Second, Type: TimeoutDeadline}}
+ status := player{Round: 350, Step: soft, Deadline: Deadline{Duration: time.Duration(23) * time.Second, Type: TimeoutDeadline}, lowestCredentialArrivals: makeCredentialArrivalHistory(dynamicFilterCredentialArrivalHistory)}
router := makeRootRouter(status)
a := []action{checkpointAction{}, disconnectAction(messageEvent{}, nil)}
@@ -218,6 +218,27 @@ func TestRandomizedEncodingFullDiskState(t *testing.T) {
}
+func TestCredentialHistoryAllocated(t *testing.T) {
+ partitiontest.PartitionTest(t)
+ router, player := randomizeDiskState()
+ a := []action{}
+ clock := timers.MakeMonotonicClock[TimeoutType](time.Date(2015, 1, 2, 5, 6, 7, 8, time.UTC))
+ log := makeServiceLogger(logging.Base())
+ e1 := encode(clock, router, player, a, true)
+ e2 := encode(clock, router, player, a, false)
+ require.Equalf(t, e1, e2, "msgp and go-codec encodings differ: len(msgp)=%v, len(reflect)=%v", len(e1), len(e2))
+ _, _, p1, _, err1 := decode(e1, clock, log, true)
+ _, _, p2, _, err2 := decode(e1, clock, log, false)
+ require.NoErrorf(t, err1, "reflect decoding failed")
+ require.NoErrorf(t, err2, "msgp decoding failed")
+
+ require.Len(t, p1.lowestCredentialArrivals.history, dynamicFilterCredentialArrivalHistory)
+ require.Len(t, p2.lowestCredentialArrivals.history, dynamicFilterCredentialArrivalHistory)
+ emptyHistory := makeCredentialArrivalHistory(dynamicFilterCredentialArrivalHistory)
+ require.Equalf(t, p1.lowestCredentialArrivals, emptyHistory, "credential arrival history isn't empty")
+ require.Equalf(t, p2.lowestCredentialArrivals, emptyHistory, "credential arrival history isn't empty")
+}
+
func BenchmarkRandomizedEncode(b *testing.B) {
clock := timers.MakeMonotonicClock[TimeoutType](time.Date(2015, 1, 2, 5, 6, 7, 8, time.UTC))
router, player := randomizeDiskState()
diff --git a/agreement/player.go b/agreement/player.go
index a3451ea0db..72fe2f4556 100644
--- a/agreement/player.go
+++ b/agreement/player.go
@@ -58,6 +58,15 @@ type player struct {
// Pending holds the player's proposalTable, which stores proposals that
// must be verified after some vote has been verified.
Pending proposalTable
+
+ // the history of arrival times of the lowest credential from previous
+ // rounds, used for calculating the filter timeout dynamically.
+ lowestCredentialArrivals credentialArrivalHistory
+
+ // The period 0 dynamic filter timeout calculated for this round, if set,
+ // even if dynamic filter timeouts are not enabled. It is used for reporting
+ // to telemetry.
+ dynamicFilterTimeout time.Duration
}
func (p *player) T() stateMachineTag {
@@ -266,6 +275,65 @@ func (p *player) handleCheckpointEvent(r routerHandle, e checkpointEvent) []acti
}}
}
+// updateCredentialArrivalHistory is called at the end of a successful
+// uninterrupted round (just after ensureAction is generated) to collect
+// credential arrival times to dynamically set the filter timeout.
+// It returns the time of the lowest credential's arrival, if one was
+// collected and added to lowestCredentialArrivals, or zero otherwise.
+func (p *player) updateCredentialArrivalHistory(r routerHandle, ver protocol.ConsensusVersion) time.Duration {
+ // only append to lowestCredentialArrivals if this was a successful round completing in period 0.
+ if p.Period != 0 {
+ return 0
+ }
+ // look up the validatedAt time of the winning proposal-vote
+ re := readLowestEvent{T: readLowestVote, Round: p.Round, Period: p.Period}
+ re = r.dispatch(*p, re, proposalMachineRound, p.Round, p.Period, 0).(readLowestEvent)
+ if !re.Filled {
+ return 0
+ }
+
+ p.lowestCredentialArrivals.store(re.Vote.validatedAt)
+ return re.Vote.validatedAt
+}
+
+// calculateFilterTimeout chooses the appropriate filter timeout.
+func (p *player) calculateFilterTimeout(ver protocol.ConsensusVersion, tracer *tracer) time.Duration {
+ proto := config.Consensus[ver]
+ if dynamicFilterCredentialArrivalHistory <= 0 || p.Period != 0 {
+ // Either dynamic filter timeout is disabled, or we're not in period 0
+ // and therefore, can't use dynamic timeout
+ return FilterTimeout(p.Period, ver)
+ }
+ defaultTimeout := FilterTimeout(0, ver)
+ if !p.lowestCredentialArrivals.isFull() {
+ // not enough samples, use the default
+ return defaultTimeout
+ }
+
+ dynamicTimeout := p.lowestCredentialArrivals.orderStatistics(dynamicFilterTimeoutCredentialArrivalHistoryIdx) + dynamicFilterTimeoutGraceInterval
+
+ // Make sure the dynamic filter timeout is not too small nor too large
+ clampedTimeout := dynamicTimeout
+ if clampedTimeout < dynamicFilterTimeoutLowerBound {
+ clampedTimeout = dynamicFilterTimeoutLowerBound
+ }
+ if clampedTimeout > defaultTimeout {
+ clampedTimeout = defaultTimeout
+ }
+ tracer.log.Debugf("round %d, period %d: dynamicTimeout = %d, clamped timeout = %d", p.Round, p.Period, dynamicTimeout, clampedTimeout)
+ // store dynamicFilterTimeout on the player for debugging & reporting
+ p.dynamicFilterTimeout = dynamicTimeout
+
+ if !proto.DynamicFilterTimeout {
+ // If the dynamic filter timeout is disabled, return the default filter
+ // timeout (after logging what the timeout would have been, if this
+ // feature were enabled).
+ return defaultTimeout
+ }
+
+ return clampedTimeout
+}
+
func (p *player) handleThresholdEvent(r routerHandle, e thresholdEvent) []action {
r.t.timeR().RecThreshold(e)
@@ -280,6 +348,8 @@ func (p *player) handleThresholdEvent(r routerHandle, e thresholdEvent) []action
if res.Committable {
cert := Certificate(e.Bundle)
a0 := ensureAction{Payload: res.Payload, Certificate: cert}
+ a0.voteValidatedAt = p.updateCredentialArrivalHistory(r, e.Proto)
+ a0.dynamicFilterTimeout = p.dynamicFilterTimeout
actions = append(actions, a0)
as := p.enterRound(r, e, p.Round+1)
return append(actions, as...)
@@ -333,7 +403,13 @@ func (p *player) enterPeriod(r routerHandle, source thresholdEvent, target perio
p.Step = soft
p.Napping = false
p.FastRecoveryDeadline = 0 // set immediately
- p.Deadline = Deadline{Duration: FilterTimeout(target, source.Proto), Type: TimeoutFilter}
+
+ if target != 0 {
+ // We entered a non-0 period, we should reset the filter timeout
+ // calculation mechanism.
+ p.lowestCredentialArrivals.reset()
+ }
+ p.Deadline = Deadline{Duration: p.calculateFilterTimeout(source.Proto, r.t), Type: TimeoutFilter}
// update tracer state to match player
r.t.setMetadata(tracerMetadata{p.Round, p.Period, p.Step})
@@ -381,11 +457,11 @@ func (p *player) enterRound(r routerHandle, source event, target round) []action
switch source := source.(type) {
case roundInterruptionEvent:
- p.Deadline = Deadline{Duration: FilterTimeout(0, source.Proto.Version), Type: TimeoutFilter}
+ p.Deadline = Deadline{Duration: p.calculateFilterTimeout(source.Proto.Version, r.t), Type: TimeoutFilter}
case thresholdEvent:
- p.Deadline = Deadline{Duration: FilterTimeout(0, source.Proto), Type: TimeoutFilter}
+ p.Deadline = Deadline{Duration: p.calculateFilterTimeout(source.Proto, r.t), Type: TimeoutFilter}
case filterableMessageEvent:
- p.Deadline = Deadline{Duration: FilterTimeout(0, source.Proto.Version), Type: TimeoutFilter}
+ p.Deadline = Deadline{Duration: p.calculateFilterTimeout(source.Proto.Version, r.t), Type: TimeoutFilter}
}
// update tracer state to match player
@@ -599,6 +675,8 @@ func (p *player) handleMessageEvent(r routerHandle, e messageEvent) (actions []a
if freshestRes.Ok && freshestRes.Event.t() == certThreshold && freshestRes.Event.Proposal == e.Input.Proposal.value() {
cert := Certificate(freshestRes.Event.Bundle)
a0 := ensureAction{Payload: e.Input.Proposal, Certificate: cert}
+ a0.voteValidatedAt = p.updateCredentialArrivalHistory(r, e.Proto.Version)
+ a0.dynamicFilterTimeout = p.dynamicFilterTimeout
actions = append(actions, a0)
as := p.enterRound(r, delegatedE, cert.Round+1)
return append(actions, as...)
diff --git a/agreement/player_test.go b/agreement/player_test.go
index 84aad574ba..a9a9f1106d 100644
--- a/agreement/player_test.go
+++ b/agreement/player_test.go
@@ -416,7 +416,8 @@ func testPlayerSetup() (player, rootRouter, testAccountData, testBlockFactory, L
accs := testAccountData{addresses: addresses, vrfs: vrfSecrets, ots: otSecrets}
round := ledger.NextRound()
period := period(0)
- player := player{Round: round, Period: period, Step: soft}
+ historyBuffer := makeCredentialArrivalHistory(dynamicFilterCredentialArrivalHistory)
+ player := player{Round: round, Period: period, Step: soft, lowestCredentialArrivals: historyBuffer}
var p actor = ioLoggedActor{checkedActor{actor: &player, actorContract: playerContract{}}, playerTracer}
router := routerFixture
@@ -500,7 +501,8 @@ func TestPlayerLateBlockProposalPeriod0(t *testing.T) {
func setupP(t *testing.T, r round, p period, s step) (plyr *player, pMachine ioAutomata, helper *voteMakerHelper) {
// Set up a composed test machine starting at specified rps
- rRouter := makeRootRouter(player{Round: r, Period: p, Step: s, Deadline: Deadline{Duration: FilterTimeout(p, protocol.ConsensusCurrentVersion), Type: TimeoutFilter}})
+ history := makeCredentialArrivalHistory(dynamicFilterCredentialArrivalHistory)
+ rRouter := makeRootRouter(player{Round: r, Period: p, Step: s, Deadline: Deadline{Duration: FilterTimeout(p, protocol.ConsensusCurrentVersion), Type: TimeoutFilter}, lowestCredentialArrivals: history})
concreteMachine := ioAutomataConcretePlayer{rootRouter: &rRouter}
plyr = concreteMachine.underlying()
pMachine = &concreteMachine
@@ -3234,19 +3236,20 @@ func TestPlayerAlwaysResynchsPinnedValue(t *testing.T) {
}
// test that ReceivedAt and ValidateAt timing information are retained in proposalStore
-// when the payloadPresent and payloadVerified events are processed, and that both timings
+// when the payloadPresent, payloadVerified, and voteVerified events are processed, and that all timings
// are available when the ensureAction is called for the block.
-func TestPlayerRetainsReceivedValidatedAt(t *testing.T) {
+func TestPlayerRetainsReceivedValidatedAtOneSample(t *testing.T) {
partitiontest.PartitionTest(t)
const r = round(20239)
- const p = period(1001)
+ const p = period(0)
pWhite, pM, helper := setupP(t, r-1, p, soft)
pP, pV := helper.MakeRandomProposalPayload(t, r-1)
// send voteVerified message
vVote := helper.MakeVerifiedVote(t, 0, r-1, p, propose, *pV)
inMsg := messageEvent{T: voteVerified, Input: message{Vote: vVote, UnauthenticatedVote: vVote.u()}}
+ inMsg = inMsg.AttachValidatedAt(501*time.Millisecond, r-1)
err, panicErr := pM.transition(inMsg)
require.NoError(t, err)
require.NoError(t, panicErr)
@@ -3254,66 +3257,429 @@ func TestPlayerRetainsReceivedValidatedAt(t *testing.T) {
// send payloadPresent message
m := message{UnauthenticatedProposal: pP.u()}
inMsg = messageEvent{T: payloadPresent, Input: m}
- inMsg = inMsg.AttachReceivedAt(time.Second)
+ inMsg = inMsg.AttachReceivedAt(time.Second, r-1)
+ err, panicErr = pM.transition(inMsg)
+ require.NoError(t, err)
+ require.NoError(t, panicErr)
+
+ assertCorrectReceivedAtSet(t, pWhite, pM, helper, r, p, pP, pV, m, protocol.ConsensusFuture, time.Second)
+
+	// assert lowest vote validatedAt time was recorded into lowestCredentialArrivals
+ require.NotZero(t, dynamicFilterCredentialArrivalHistory)
+ require.Equal(t, pWhite.lowestCredentialArrivals.writePtr, 1)
+ require.False(t, pWhite.lowestCredentialArrivals.isFull())
+ require.Equal(t, 501*time.Millisecond, pWhite.lowestCredentialArrivals.history[0])
+}
+
+// test that ReceivedAt and ValidateAt timing information are retained in
+// proposalStore when the payloadPresent, payloadVerified, and voteVerified
+// events are processed in the *preceding round*, and that all timings are
+// available when the ensureAction is called for the block.
+func TestPlayerRetainsEarlyReceivedValidatedAtOneSample(t *testing.T) {
+ partitiontest.PartitionTest(t)
+ const r = round(20239)
+ const p = period(0)
+ pWhite, pM, helper := setupP(t, r-1, p, soft)
+ pP, pV := helper.MakeRandomProposalPayload(t, r-1)
+
+ require.NotZero(t, dynamicFilterCredentialArrivalHistory)
+ // move two rounds, so the measurement from round r gets inserted to the
+ // history window
+
+ // create a PP message for an arbitrary proposal/payload similar to setupCompoundMessage
+ vVote := helper.MakeVerifiedVote(t, 0, r-1, p, propose, *pV)
+ unverifiedVoteMsg := message{UnauthenticatedVote: vVote.u()}
+ proposalMsg := message{UnauthenticatedProposal: pP.u()}
+ compoundMsg := messageEvent{T: votePresent, Input: unverifiedVoteMsg,
+ Tail: &messageEvent{T: payloadPresent, Input: proposalMsg}}
+
+ inMsg := compoundMsg.AttachReceivedAt(time.Second, r-2) // call AttachReceivedAt like demux would
+ err, panicErr := pM.transition(inMsg)
+ require.NoError(t, err)
+ require.NoError(t, panicErr)
+
+ // make sure vote verify requests
+ verifyEvent := ev(cryptoAction{T: verifyVote, M: unverifiedVoteMsg, Round: r - 1, Period: p, Step: propose, TaskIndex: 1})
+ require.Truef(t, pM.getTrace().Contains(verifyEvent), "Player should verify vote")
+
+ // send voteVerified
+ verifiedVoteMsg := message{Vote: vVote, UnauthenticatedVote: vVote.u()}
+ inMsg = messageEvent{T: voteVerified, Input: verifiedVoteMsg, TaskIndex: 1}
+ timestamp := 500
+ inMsg = inMsg.AttachValidatedAt(time.Duration(timestamp)*time.Millisecond, r-2)
err, panicErr = pM.transition(inMsg)
require.NoError(t, err)
require.NoError(t, panicErr)
+ moveToRound(t, pWhite, pM, helper, r, p, pP, pV, proposalMsg, protocol.ConsensusFuture)
+
+	// the winning vote was validated in an earlier round (r-2), so the recorded credential arrival falls back to the testingClock sentinel of 1
+ require.Equal(t, time.Duration(1), pWhite.lowestCredentialArrivals.history[0])
+}
+
+// test that ReceivedAt and ValidateAt timing information are retained in proposalStore
+// when the payloadPresent, payloadVerified, and voteVerified events are processed, and that all timings
+// are available when the ensureAction is called for the block. The history should be kept for the last
+// DynamicFilterCredentialArrivalHistory rounds.
+func TestPlayerRetainsReceivedValidatedAtForHistoryWindow(t *testing.T) {
+ partitiontest.PartitionTest(t)
+ const r = round(20239)
+ const p = period(0)
+ pWhite, pM, helper := setupP(t, r-1, p, soft)
+
+ require.NotZero(t, dynamicFilterCredentialArrivalHistory)
+
+ for i := 0; i < dynamicFilterCredentialArrivalHistory; i++ {
+ // send voteVerified message
+ pP, pV := helper.MakeRandomProposalPayload(t, r+round(i)-1)
+ vVote := helper.MakeVerifiedVote(t, 0, r+round(i)-1, p, propose, *pV)
+ inMsg := messageEvent{T: voteVerified, Input: message{Vote: vVote, UnauthenticatedVote: vVote.u()}}
+ timestamp := 500 + i
+ inMsg = inMsg.AttachValidatedAt(time.Duration(timestamp)*time.Millisecond, r+round(i)-1)
+ err, panicErr := pM.transition(inMsg)
+ require.NoError(t, err)
+ require.NoError(t, panicErr)
- assertCorrectReceivedAtSet(t, pWhite, pM, helper, r, p, pP, pV, m)
+ // send payloadPresent message
+ m := message{UnauthenticatedProposal: pP.u()}
+ inMsg = messageEvent{T: payloadPresent, Input: m}
+ inMsg = inMsg.AttachReceivedAt(time.Second, r+round(i)-1)
+ err, panicErr = pM.transition(inMsg)
+ require.NoError(t, err)
+ require.NoError(t, panicErr)
+ moveToRound(t, pWhite, pM, helper, r+round(i), p, pP, pV, m, protocol.ConsensusFuture)
+ }
+
+	// assert lowest vote validatedAt time was recorded into lowestCredentialArrivals
+ require.True(t, pWhite.lowestCredentialArrivals.isFull())
+ for i := 0; i < dynamicFilterCredentialArrivalHistory; i++ {
+		// exactly historyLen samples were recorded, so the window holds all of them in insertion order
+ timestamp := 500 + i
+ require.Equal(t, time.Duration(timestamp)*time.Millisecond, pWhite.lowestCredentialArrivals.history[i])
+ }
}
// test that ReceivedAt and ValidateAt timing information are retained in proposalStore
-// when the payloadPresent (as part of the CompoundMessage encoding used by PP messages)
-// and payloadVerified events are processed, and that both timings
+// when the payloadPresent (as part of the CompoundMessage encoding used by PP messages),
+// payloadVerified, and voteVerified events are processed, and that all timings
// are available when the ensureAction is called for the block.
-func TestPlayerRetainsReceivedValidatedAtPP(t *testing.T) {
+func TestPlayerRetainsReceivedValidatedAtPPOneSample(t *testing.T) {
partitiontest.PartitionTest(t)
const r = round(20239)
- const p = period(1001)
+ const p = period(0)
pWhite, pM, helper := setupP(t, r-1, p, soft)
pP, pV := helper.MakeRandomProposalPayload(t, r-1)
// create a PP message for an arbitrary proposal/payload similar to setupCompoundMessage
vVote := helper.MakeVerifiedVote(t, 0, r-1, p, propose, *pV)
- voteMsg := message{Vote: vVote, UnauthenticatedVote: vVote.u()}
+ unverifiedVoteMsg := message{UnauthenticatedVote: vVote.u()}
proposalMsg := message{UnauthenticatedProposal: pP.u()}
- compoundMsg := messageEvent{T: votePresent, Input: voteMsg,
+ compoundMsg := messageEvent{T: votePresent, Input: unverifiedVoteMsg,
Tail: &messageEvent{T: payloadPresent, Input: proposalMsg}}
- inMsg := compoundMsg.AttachReceivedAt(time.Second) // call AttachReceivedAt like demux would
+ inMsg := compoundMsg.AttachReceivedAt(time.Second, r-1) // call AttachReceivedAt like demux would
err, panicErr := pM.transition(inMsg)
require.NoError(t, err)
require.NoError(t, panicErr)
// make sure vote verify requests
- verifyEvent := ev(cryptoAction{T: verifyVote, M: voteMsg, Round: r - 1, Period: p, Step: propose, TaskIndex: 1})
+ verifyEvent := ev(cryptoAction{T: verifyVote, M: unverifiedVoteMsg, Round: r - 1, Period: p, Step: propose, TaskIndex: 1})
require.Truef(t, pM.getTrace().Contains(verifyEvent), "Player should verify vote")
// send voteVerified
- inMsg = messageEvent{T: voteVerified, Input: voteMsg, TaskIndex: 1}
+ verifiedVoteMsg := message{Vote: vVote, UnauthenticatedVote: vVote.u()}
+ inMsg = messageEvent{T: voteVerified, Input: verifiedVoteMsg, TaskIndex: 1}
+ inMsg = inMsg.AttachValidatedAt(502*time.Millisecond, r-1)
err, panicErr = pM.transition(inMsg)
require.NoError(t, err)
require.NoError(t, panicErr)
- assertCorrectReceivedAtSet(t, pWhite, pM, helper, r, p, pP, pV, proposalMsg)
+ assertCorrectReceivedAtSet(t, pWhite, pM, helper, r, p, pP, pV, proposalMsg, protocol.ConsensusFuture, time.Second)
+
+	// assert lowest vote validatedAt time was recorded into lowestCredentialArrivals
+ require.NotZero(t, dynamicFilterCredentialArrivalHistory)
+ require.Equal(t, pWhite.lowestCredentialArrivals.writePtr, 1)
+ require.False(t, pWhite.lowestCredentialArrivals.isFull())
+ require.Equal(t, 502*time.Millisecond, pWhite.lowestCredentialArrivals.history[0])
}
-func assertCorrectReceivedAtSet(t *testing.T, pWhite *player, pM ioAutomata, helper *voteMakerHelper,
- r round, p period, pP *proposal, pV *proposalValue, m message) {
+// test that ReceivedAt and ValidateAt timing information are retained in
+// proposalStore when the payloadPresent (as part of the CompoundMessage
+// encoding used by PP messages), payloadVerified, and voteVerified events are
+// processed one round early, and that all timings are available when the
+// ensureAction is called for the block.
+func TestPlayerRetainsEarlyReceivedValidatedAtPPOneSample(t *testing.T) {
+ partitiontest.PartitionTest(t)
+
+ const r = round(20239)
+ const p = period(0)
+ pWhite, pM, helper := setupP(t, r-1, p, soft)
+ pP, pV := helper.MakeRandomProposalPayload(t, r-1)
+
+ // create a PP message for an arbitrary proposal/payload similar to setupCompoundMessage
+ vVote := helper.MakeVerifiedVote(t, 0, r-1, p, propose, *pV)
+ unverifiedVoteMsg := message{UnauthenticatedVote: vVote.u()}
+ proposalMsg := message{UnauthenticatedProposal: pP.u()}
+ compoundMsg := messageEvent{T: votePresent, Input: unverifiedVoteMsg,
+ Tail: &messageEvent{T: payloadPresent, Input: proposalMsg}}
+ inMsg := compoundMsg.AttachReceivedAt(time.Second, r-2) // call AttachReceivedAt like demux would
+ err, panicErr := pM.transition(inMsg)
+ require.NoError(t, err)
+ require.NoError(t, panicErr)
+
+ // make sure vote verify requests
+ verifyEvent := ev(cryptoAction{T: verifyVote, M: unverifiedVoteMsg, Round: r - 1, Period: p, Step: propose, TaskIndex: 1})
+ require.Truef(t, pM.getTrace().Contains(verifyEvent), "Player should verify vote")
+
+ // send voteVerified
+ verifiedVoteMsg := message{Vote: vVote, UnauthenticatedVote: vVote.u()}
+ inMsg = messageEvent{T: voteVerified, Input: verifiedVoteMsg, TaskIndex: 1}
+ inMsg = inMsg.AttachValidatedAt(502*time.Millisecond, r-2)
+ err, panicErr = pM.transition(inMsg)
+ require.NoError(t, err)
+ require.NoError(t, panicErr)
+
+ assertCorrectReceivedAtSet(t, pWhite, pM, helper, r, p, pP, pV, proposalMsg, protocol.ConsensusFuture, time.Duration(1))
+
+	// assert lowest vote validatedAt time was recorded into lowestCredentialArrivals
+ require.NotZero(t, dynamicFilterCredentialArrivalHistory)
+ require.Equal(t, pWhite.lowestCredentialArrivals.writePtr, 1)
+ require.False(t, pWhite.lowestCredentialArrivals.isFull())
+ require.Equal(t, time.Duration(1), pWhite.lowestCredentialArrivals.history[0])
+}
+
+// test that ReceivedAt and ValidateAt timing information are retained in
+// proposalStore when the payloadPresent (as part of the CompoundMessage
+// encoding used by PP messages), payloadVerified, and voteVerified events are
+// processed, and that all timings are available when the ensureAction is called
+// for the block. The history should be kept for the last
+// DynamicFilterCredentialArrivalHistory rounds.
+func TestPlayerRetainsReceivedValidatedAtPPForHistoryWindow(t *testing.T) {
+ partitiontest.PartitionTest(t)
+ const r = round(20239)
+ const p = period(0)
+ pWhite, pM, helper := setupP(t, r-1, p, soft)
+ pP, pV := helper.MakeRandomProposalPayload(t, r-1)
+
+ require.NotZero(t, dynamicFilterCredentialArrivalHistory)
+
+ for i := 0; i < dynamicFilterCredentialArrivalHistory; i++ {
+ // create a PP message for an arbitrary proposal/payload similar to setupCompoundMessage
+ vVote := helper.MakeVerifiedVote(t, 0, r+round(i)-1, p, propose, *pV)
+ unverifiedVoteMsg := message{UnauthenticatedVote: vVote.u()}
+ proposalMsg := message{UnauthenticatedProposal: pP.u()}
+ compoundMsg := messageEvent{T: votePresent, Input: unverifiedVoteMsg,
+ Tail: &messageEvent{T: payloadPresent, Input: proposalMsg}}
+
+ inMsg := compoundMsg.AttachReceivedAt(time.Second, r+round(i)-1) // call AttachReceivedAt like demux would
+ err, panicErr := pM.transition(inMsg)
+ require.NoError(t, err)
+ require.NoError(t, panicErr)
+
+ // make sure vote verify requests
+ verifyEvent := ev(cryptoAction{T: verifyVote, M: unverifiedVoteMsg, Round: r + round(i) - 1, Period: p, Step: propose, TaskIndex: 1})
+ require.Truef(t, pM.getTrace().Contains(verifyEvent), "Player should verify vote")
+
+ // send voteVerified
+ verifiedVoteMsg := message{Vote: vVote, UnauthenticatedVote: vVote.u()}
+ inMsg = messageEvent{T: voteVerified, Input: verifiedVoteMsg, TaskIndex: 1}
+ timestamp := 500 + i
+ inMsg = inMsg.AttachValidatedAt(time.Duration(timestamp)*time.Millisecond, r+round(i)-1)
+ err, panicErr = pM.transition(inMsg)
+ require.NoError(t, err)
+ require.NoError(t, panicErr)
+ moveToRound(t, pWhite, pM, helper, r+round(i), p, pP, pV, proposalMsg, protocol.ConsensusFuture)
+ }
+
+	// assert lowest vote validatedAt time was recorded into lowestCredentialArrivals
+ require.True(t, pWhite.lowestCredentialArrivals.isFull())
+ for i := 0; i < dynamicFilterCredentialArrivalHistory; i++ {
+		// exactly historyLen samples were recorded, so the window holds all of them in insertion order
+ timestamp := 500 + i
+ require.Equal(t, time.Duration(timestamp)*time.Millisecond, pWhite.lowestCredentialArrivals.history[i])
+ }
+}
+
+// test that ReceivedAt and ValidateAt timing information are retained in proposalStore
+// when the voteVerified event comes in first (as part of the AV message before PP),
+// then the payloadPresent (as part of the CompoundMessage encoding used by PP messages)
+// and payloadVerified events are processed, and that all timings
+// are available when the ensureAction is called for the block.
+func TestPlayerRetainsReceivedValidatedAtAVPPOneSample(t *testing.T) {
+ partitiontest.PartitionTest(t)
+
+ const r = round(20239)
+ const p = period(0)
+ pWhite, pM, helper := setupP(t, r-1, p, soft)
+ pP, pV := helper.MakeRandomProposalPayload(t, r-1)
+
+ // send votePresent message (mimicking the first AV message validating)
+ vVote := helper.MakeVerifiedVote(t, 0, r-1, p, propose, *pV)
+ unverifiedVoteMsg := message{UnauthenticatedVote: vVote.u()}
+ inMsg := messageEvent{T: votePresent, Input: unverifiedVoteMsg}
+ err, panicErr := pM.transition(inMsg)
+ require.NoError(t, err)
+ require.NoError(t, panicErr)
+
+ // make sure vote verify requests
+ verifyEvent := ev(cryptoAction{T: verifyVote, M: unverifiedVoteMsg, Round: r - 1, Period: p, Step: propose, TaskIndex: 1})
+ require.Truef(t, pM.getTrace().Contains(verifyEvent), "Player should verify vote")
+
+ // send voteVerified
+ verifiedVoteMsg := message{Vote: vVote, UnauthenticatedVote: vVote.u()}
+ inMsg = messageEvent{T: voteVerified, Input: verifiedVoteMsg, TaskIndex: 1}
+ inMsg = inMsg.AttachValidatedAt(502*time.Millisecond, r-1)
+ err, panicErr = pM.transition(inMsg)
+ require.NoError(t, err)
+ require.NoError(t, panicErr)
+
+ // create a PP message for an arbitrary proposal/payload similar to setupCompoundMessage
+ proposalMsg := message{UnauthenticatedProposal: pP.u()}
+ compoundMsg := messageEvent{T: votePresent, Input: unverifiedVoteMsg,
+ Tail: &messageEvent{T: payloadPresent, Input: proposalMsg}}
+ inMsg = compoundMsg.AttachReceivedAt(time.Second, r-1) // call AttachReceivedAt like demux would
+ err, panicErr = pM.transition(inMsg)
+ require.NoError(t, err)
+ require.NoError(t, panicErr)
+
+ // make sure no second request to verify this vote
+ verifyEvent = ev(cryptoAction{T: verifyVote, M: unverifiedVoteMsg, Round: r - 1, Period: p, Step: propose, TaskIndex: 1})
+ require.Equal(t, 1, pM.getTrace().CountEvent(verifyEvent), "Player should not verify second vote")
+
+ assertCorrectReceivedAtSet(t, pWhite, pM, helper, r, p, pP, pV, proposalMsg, protocol.ConsensusFuture, time.Second)
+
+	// assert lowest vote validatedAt time was recorded into lowestCredentialArrivals
+ require.NotZero(t, dynamicFilterCredentialArrivalHistory)
+ require.Equal(t, pWhite.lowestCredentialArrivals.writePtr, 1)
+ require.False(t, pWhite.lowestCredentialArrivals.isFull())
+ require.Equal(t, 502*time.Millisecond, pWhite.lowestCredentialArrivals.history[0])
+}
+
+// test that ReceivedAt and ValidateAt timing information are retained in
+// proposalStore when the voteVerified event comes in first (as part of the AV
+// message before PP), then the payloadPresent (as part of the CompoundMessage
+// encoding used by PP messages) and payloadVerified events are processed one
+// round early, and that all timings are available when the ensureAction is
+// called for the block.
+func TestPlayerRetainsEarlyReceivedValidatedAtAVPPOneSample(t *testing.T) {
+ partitiontest.PartitionTest(t)
+
+ const r = round(20239)
+ const p = period(0)
+ pWhite, pM, helper := setupP(t, r-1, p, soft)
+ pP, pV := helper.MakeRandomProposalPayload(t, r-1)
+
+ // send votePresent message (mimicking the first AV message validating)
+ vVote := helper.MakeVerifiedVote(t, 0, r-1, p, propose, *pV)
+ unverifiedVoteMsg := message{UnauthenticatedVote: vVote.u()}
+ inMsg := messageEvent{T: votePresent, Input: unverifiedVoteMsg}
+ err, panicErr := pM.transition(inMsg)
+ require.NoError(t, err)
+ require.NoError(t, panicErr)
+
+ // make sure vote verify requests
+ verifyEvent := ev(cryptoAction{T: verifyVote, M: unverifiedVoteMsg, Round: r - 1, Period: p, Step: propose, TaskIndex: 1})
+ require.Truef(t, pM.getTrace().Contains(verifyEvent), "Player should verify vote")
+
+ // send voteVerified
+ verifiedVoteMsg := message{Vote: vVote, UnauthenticatedVote: vVote.u()}
+ inMsg = messageEvent{T: voteVerified, Input: verifiedVoteMsg, TaskIndex: 1}
+ inMsg = inMsg.AttachValidatedAt(502*time.Millisecond, r-2)
+ err, panicErr = pM.transition(inMsg)
+ require.NoError(t, err)
+ require.NoError(t, panicErr)
+
+ // create a PP message for an arbitrary proposal/payload similar to setupCompoundMessage
+ proposalMsg := message{UnauthenticatedProposal: pP.u()}
+ compoundMsg := messageEvent{T: votePresent, Input: unverifiedVoteMsg,
+ Tail: &messageEvent{T: payloadPresent, Input: proposalMsg}}
+ inMsg = compoundMsg.AttachReceivedAt(time.Second, r-2) // call AttachReceivedAt like demux would
+ err, panicErr = pM.transition(inMsg)
+ require.NoError(t, err)
+ require.NoError(t, panicErr)
+
+ // make sure no second request to verify this vote
+ verifyEvent = ev(cryptoAction{T: verifyVote, M: unverifiedVoteMsg, Round: r - 1, Period: p, Step: propose, TaskIndex: 1})
+ require.Equal(t, 1, pM.getTrace().CountEvent(verifyEvent), "Player should not verify second vote")
+
+ assertCorrectReceivedAtSet(t, pWhite, pM, helper, r, p, pP, pV, proposalMsg, protocol.ConsensusFuture, time.Duration(1))
+
+	// assert lowest vote validatedAt time was recorded into lowestCredentialArrivals
+ require.NotZero(t, dynamicFilterCredentialArrivalHistory)
+ require.Equal(t, pWhite.lowestCredentialArrivals.writePtr, 1)
+ require.False(t, pWhite.lowestCredentialArrivals.isFull())
+ require.Equal(t, time.Duration(1), pWhite.lowestCredentialArrivals.history[0])
+}
+
+func TestPlayerRetainsReceivedValidatedAtAVPPHistoryWindow(t *testing.T) {
+ partitiontest.PartitionTest(t)
+ const r = round(20239)
+ const p = period(0)
+ pWhite, pM, helper := setupP(t, r-1, p, soft)
+
+ require.NotZero(t, dynamicFilterCredentialArrivalHistory)
+
+ for i := 0; i < dynamicFilterCredentialArrivalHistory; i++ {
+ pP, pV := helper.MakeRandomProposalPayload(t, r+round(i)-1)
+
+ // send votePresent message (mimicking the first AV message validating)
+ vVote := helper.MakeVerifiedVote(t, 0, r+round(i)-1, p, propose, *pV)
+ unverifiedVoteMsg := message{UnauthenticatedVote: vVote.u()}
+ inMsg := messageEvent{T: votePresent, Input: unverifiedVoteMsg}
+ err, panicErr := pM.transition(inMsg)
+ require.NoError(t, err)
+ require.NoError(t, panicErr)
+
+ // make sure vote verify requests
+ verifyEvent := ev(cryptoAction{T: verifyVote, M: unverifiedVoteMsg, Round: r + round(i) - 1, Period: p, Step: propose, TaskIndex: 1})
+ require.Truef(t, pM.getTrace().Contains(verifyEvent), "Player should verify vote")
+
+ // send voteVerified
+ verifiedVoteMsg := message{Vote: vVote, UnauthenticatedVote: vVote.u()}
+ inMsg = messageEvent{T: voteVerified, Input: verifiedVoteMsg, TaskIndex: 1}
+ timestamp := 500 + i
+ inMsg = inMsg.AttachValidatedAt(time.Duration(timestamp)*time.Millisecond, r+round(i)-1)
+ err, panicErr = pM.transition(inMsg)
+ require.NoError(t, err)
+ require.NoError(t, panicErr)
+
+ // create a PP message for an arbitrary proposal/payload similar to setupCompoundMessage
+ proposalMsg := message{UnauthenticatedProposal: pP.u()}
+ compoundMsg := messageEvent{T: votePresent, Input: unverifiedVoteMsg,
+ Tail: &messageEvent{T: payloadPresent, Input: proposalMsg}}
+ inMsg = compoundMsg.AttachReceivedAt(time.Second, r+round(i)-1) // call AttachReceivedAt like demux would
+ err, panicErr = pM.transition(inMsg)
+ require.NoError(t, err)
+ require.NoError(t, panicErr)
+
+ moveToRound(t, pWhite, pM, helper, r+round(i), p, pP, pV, proposalMsg, protocol.ConsensusFuture)
+ }
+
+	// assert lowest vote validatedAt time was recorded into lowestCredentialArrivals
+ require.True(t, pWhite.lowestCredentialArrivals.isFull())
+ for i := 0; i < dynamicFilterCredentialArrivalHistory; i++ {
+		// exactly historyLen samples were recorded, so the window holds all of them in insertion order
+ timestamp := 500 + i
+ require.Equal(t, time.Duration(timestamp)*time.Millisecond, pWhite.lowestCredentialArrivals.history[i])
+ }
+}
+
+func moveToRound(t *testing.T, pWhite *player, pM ioAutomata, helper *voteMakerHelper,
+ r round, p period, pP *proposal, pV *proposalValue, m message, ver protocol.ConsensusVersion) {
+
// make sure payload verify request
verifyEvent := ev(cryptoAction{T: verifyPayload, M: m, Round: r - 1, Period: p, Step: propose, TaskIndex: 0})
require.Truef(t, pM.getTrace().Contains(verifyEvent), "Player should verify payload")
// payloadVerified
- inMsg := messageEvent{T: payloadVerified, Input: message{Proposal: *pP}, Proto: ConsensusVersionView{Version: protocol.ConsensusCurrentVersion}}
- inMsg = inMsg.AttachValidatedAt(2 * time.Second) // call AttachValidatedAt like demux would
+ inMsg := messageEvent{T: payloadVerified, Input: message{Proposal: *pP}, Proto: ConsensusVersionView{Version: ver}}
+ inMsg = inMsg.AttachValidatedAt(2*time.Second, r-1) // call AttachValidatedAt like demux would
err, panicErr := pM.transition(inMsg)
require.NoError(t, err)
require.NoError(t, panicErr)
// gen cert to move into the next round
- votes := make([]vote, int(cert.threshold(config.Consensus[protocol.ConsensusCurrentVersion])))
- for i := 0; i < int(cert.threshold(config.Consensus[protocol.ConsensusCurrentVersion])); i++ {
+ votes := make([]vote, int(cert.threshold(config.Consensus[ver])))
+ for i := 0; i < int(cert.threshold(config.Consensus[ver])); i++ {
votes[i] = helper.MakeVerifiedVote(t, i, r-1, p, cert, *pV)
}
bun := unauthenticatedBundle{
@@ -3330,7 +3696,7 @@ func assertCorrectReceivedAtSet(t *testing.T, pWhite *player, pM ioAutomata, hel
},
UnauthenticatedBundle: bun,
},
- Proto: ConsensusVersionView{Version: protocol.ConsensusCurrentVersion},
+ Proto: ConsensusVersionView{Version: ver},
}
err, panicErr = pM.transition(inMsg)
require.NoError(t, err)
@@ -3340,6 +3706,12 @@ func assertCorrectReceivedAtSet(t *testing.T, pWhite *player, pM ioAutomata, hel
require.Equalf(t, period(0), pWhite.Period, "player did not enter period 0 in new round")
commitEvent := ev(ensureAction{Certificate: Certificate(bun), Payload: *pP})
require.Truef(t, pM.getTrace().Contains(commitEvent), "Player should try to ensure block/digest on ledger")
+}
+
+func assertCorrectReceivedAtSet(t *testing.T, pWhite *player, pM ioAutomata, helper *voteMakerHelper,
+ r round, p period, pP *proposal, pV *proposalValue, m message, ver protocol.ConsensusVersion, validationTimestamp time.Duration) {
+
+ moveToRound(t, pWhite, pM, helper, r, p, pP, pV, m, ver)
// find and unwrap ensureAction from trace
var ea ensureAction
@@ -3355,7 +3727,7 @@ func assertCorrectReceivedAtSet(t *testing.T, pWhite *player, pM ioAutomata, hel
}
require.True(t, foundEA)
require.Equal(t, 2*time.Second, ea.Payload.validatedAt)
- require.Equal(t, time.Second, ea.Payload.receivedAt)
+ require.Equal(t, validationTimestamp, ea.Payload.receivedAt)
}
// todo: test pipelined rounds, and round interruption
diff --git a/agreement/proposalStore.go b/agreement/proposalStore.go
index 080609de50..fdfecac5f2 100644
--- a/agreement/proposalStore.go
+++ b/agreement/proposalStore.go
@@ -352,6 +352,9 @@ func (store *proposalStore) handle(r routerHandle, p player, e event) event {
se.Committable = ea.Assembled
se.Payload = ea.Payload
return se
+ case readLowestVote:
+ re := e.(readLowestEvent)
+ return r.dispatch(p, re, proposalMachinePeriod, re.Round, re.Period, 0).(readLowestEvent)
case readPinned:
se := e.(pinnedValueEvent)
ea := store.Assemblers[store.Pinned] // If pinned is bottom, assembled/payloadOK = false, payload = bottom
diff --git a/agreement/proposalTracker.go b/agreement/proposalTracker.go
index 59ffb77a28..603a8fd40b 100644
--- a/agreement/proposalTracker.go
+++ b/agreement/proposalTracker.go
@@ -88,7 +88,7 @@ func (t *proposalTracker) underlying() listener {
return t
}
-// A proposalTracker handles five types of events.
+// A proposalTracker handles six types of events.
//
// - voteFilterRequest returns a voteFiltered event if a given proposal-vote
// from a given sender has already been seen. Otherwise it returns an empty
@@ -118,6 +118,9 @@ func (t *proposalTracker) underlying() listener {
// - readStaging returns the a stagingValueEvent with the proposal-value
// believed to be the staging value (i.e., sigma(S, r, p)) by the
// proposalTracker in period p.
+//
+// - readLowestVote returns the vote with the lowest credential that was received so far.
+//
func (t *proposalTracker) handle(r routerHandle, p player, e event) event {
switch e.t() {
case voteFilterRequest:
@@ -165,6 +168,12 @@ func (t *proposalTracker) handle(r routerHandle, p player, e event) event {
t.Freezer = t.Freezer.freeze()
return e
+ case readLowestVote:
+ e := e.(readLowestEvent)
+ e.Vote = t.Freezer.Lowest
+ e.Filled = t.Freezer.Filled
+ return e
+
case softThreshold, certThreshold:
e := e.(thresholdEvent)
t.Staging = e.Proposal
diff --git a/agreement/proposalTrackerContract.go b/agreement/proposalTrackerContract.go
index 2b995dfcac..a6fa2ffc36 100644
--- a/agreement/proposalTrackerContract.go
+++ b/agreement/proposalTrackerContract.go
@@ -32,7 +32,7 @@ type proposalTrackerContract struct {
// TODO check concrete types of events
func (c *proposalTrackerContract) pre(p player, in event) (pre []error) {
switch in.t() {
- case voteVerified, proposalFrozen, softThreshold, certThreshold, voteFilterRequest, readStaging:
+ case voteVerified, proposalFrozen, softThreshold, certThreshold, voteFilterRequest, readStaging, readLowestVote:
default:
pre = append(pre, fmt.Errorf("incoming event has invalid type: %v", in.t()))
}
diff --git a/agreement/service.go b/agreement/service.go
index 263efe0b09..b0668ac2a5 100644
--- a/agreement/service.go
+++ b/agreement/service.go
@@ -216,7 +216,7 @@ func (s *Service) mainLoop(input <-chan externalEvent, output chan<- []action, r
s.log.Errorf("unable to retrieve consensus version for round %d, defaulting to binary consensus version", nextRound)
nextVersion = protocol.ConsensusCurrentVersion
}
- status = player{Round: nextRound, Step: soft, Deadline: Deadline{Duration: FilterTimeout(0, nextVersion), Type: TimeoutFilter}}
+ status = player{Round: nextRound, Step: soft, Deadline: Deadline{Duration: FilterTimeout(0, nextVersion), Type: TimeoutFilter}, lowestCredentialArrivals: makeCredentialArrivalHistory(dynamicFilterCredentialArrivalHistory)}
router = makeRootRouter(status)
a1 := pseudonodeAction{T: assemble, Round: s.Ledger.NextRound()}
diff --git a/agreement/service_test.go b/agreement/service_test.go
index fc50c302d5..b2ecd42f31 100644
--- a/agreement/service_test.go
+++ b/agreement/service_test.go
@@ -77,7 +77,7 @@ func (c *testingClock) Zero() timers.Clock[TimeoutType] {
}
func (c *testingClock) Since() time.Duration {
- return 0
+ return 1
}
func (c *testingClock) TimeoutAt(d time.Duration, timeoutType TimeoutType) <-chan time.Time {
@@ -93,6 +93,17 @@ func (c *testingClock) TimeoutAt(d time.Duration, timeoutType TimeoutType) <-cha
return ta.ch
}
+func (c *testingClock) when(timeoutType TimeoutType) (time.Duration, error) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ ta, ok := c.TA[timeoutType]
+ if !ok {
+ return time.Duration(0), fmt.Errorf("no timeout of type, %v", timeoutType)
+ }
+ return ta.delta, nil
+}
+
func (c *testingClock) Encode() []byte {
return nil
}
@@ -862,6 +873,10 @@ func runRound(clocks []timers.Clock[TimeoutType], activityMonitor *activityMonit
triggerGlobalTimeout(filterTimeout, TimeoutFilter, clocks, activityMonitor)
return expectNewPeriod(clocks, zeroes)
}
+func runRoundTriggerFilter(clocks []timers.Clock[TimeoutType], activityMonitor *activityMonitor, zeroes uint) (newzeroes uint) {
+ triggerGlobalTimeoutType(TimeoutFilter, clocks, activityMonitor)
+ return expectNewPeriod(clocks, zeroes)
+}
func sanityCheck(startRound round, numRounds round, ledgers []Ledger) {
for i := range ledgers {
@@ -880,18 +895,18 @@ func sanityCheck(startRound round, numRounds round, ledgers []Ledger) {
}
}
-func simulateAgreement(t *testing.T, numNodes int, numRounds int, traceLevel traceLevel) {
- simulateAgreementWithLedgerFactory(t, numNodes, numRounds, traceLevel, makeTestLedger)
+func simulateAgreement(t *testing.T, numNodes int, numRounds int, traceLevel traceLevel) (filterTimeouts []time.Duration) {
+ return simulateAgreementWithLedgerFactory(t, numNodes, numRounds, traceLevel, makeTestLedger)
}
-func simulateAgreementWithConsensusVersion(t *testing.T, numNodes int, numRounds int, traceLevel traceLevel, consensusVersion func(basics.Round) (protocol.ConsensusVersion, error)) {
+func simulateAgreementWithConsensusVersion(t *testing.T, numNodes int, numRounds int, traceLevel traceLevel, consensusVersion func(basics.Round) (protocol.ConsensusVersion, error)) (filterTimeouts []time.Duration) {
ledgerFactory := func(data map[basics.Address]basics.AccountData) Ledger {
return makeTestLedgerWithConsensusVersion(data, consensusVersion)
}
- simulateAgreementWithLedgerFactory(t, numNodes, numRounds, traceLevel, ledgerFactory)
+ return simulateAgreementWithLedgerFactory(t, numNodes, numRounds, traceLevel, ledgerFactory)
}
-func simulateAgreementWithLedgerFactory(t *testing.T, numNodes int, numRounds int, traceLevel traceLevel, ledgerFactory func(map[basics.Address]basics.AccountData) Ledger) {
+func simulateAgreementWithLedgerFactory(t *testing.T, numNodes int, numRounds int, traceLevel traceLevel, ledgerFactory func(map[basics.Address]basics.AccountData) Ledger) []time.Duration {
_, baseLedger, cleanupFn, services, clocks, ledgers, activityMonitor := setupAgreement(t, numNodes, traceLevel, ledgerFactory)
startRound := baseLedger.NextRound()
defer cleanupFn()
@@ -903,12 +918,17 @@ func simulateAgreementWithLedgerFactory(t *testing.T, numNodes int, numRounds in
activityMonitor.waitForQuiet()
zeroes := expectNewPeriod(clocks, 0)
+ filterTimeouts := make([][]time.Duration, numNodes, numNodes)
+
// run round with round-specific consensus version first (since fix in #1896)
- version, _ := baseLedger.ConsensusVersion(ParamsRound(startRound))
- zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version))
+ zeroes = runRoundTriggerFilter(clocks, activityMonitor, zeroes)
for j := 1; j < numRounds; j++ {
- version, _ := baseLedger.ConsensusVersion(ParamsRound(baseLedger.NextRound() + basics.Round(j-1)))
- zeroes = runRound(clocks, activityMonitor, zeroes, FilterTimeout(0, version))
+ for srvIdx, clock := range clocks {
+ delta, err := clock.(*testingClock).when(TimeoutFilter)
+ require.NoError(t, err)
+ filterTimeouts[srvIdx] = append(filterTimeouts[srvIdx], delta)
+ }
+ zeroes = runRoundTriggerFilter(clocks, activityMonitor, zeroes)
}
for i := 0; i < numNodes; i++ {
@@ -916,6 +936,19 @@ func simulateAgreementWithLedgerFactory(t *testing.T, numNodes int, numRounds in
}
sanityCheck(startRound, round(numRounds), ledgers)
+
+ if len(clocks) == 0 {
+ return nil
+ }
+
+ for rnd := 0; rnd < numRounds-1; rnd++ {
+ delta := filterTimeouts[0][rnd]
+ for srvIdx := range clocks {
+ require.Equal(t, delta, filterTimeouts[srvIdx][rnd])
+ }
+ }
+
+ return filterTimeouts[0]
}
func TestAgreementSynchronous1(t *testing.T) {
@@ -988,6 +1021,159 @@ func TestAgreementSynchronous5_50(t *testing.T) {
simulateAgreement(t, 5, 50, disabled)
}
+func createDynamicFilterConfig() (version protocol.ConsensusVersion, consensusVersion func(r basics.Round) (protocol.ConsensusVersion, error), configCleanup func()) {
+ version = protocol.ConsensusVersion("test-protocol-filtertimeout")
+ protoParams := config.Consensus[protocol.ConsensusCurrentVersion]
+ protoParams.DynamicFilterTimeout = true
+ config.Consensus[version] = protoParams
+
+ consensusVersion = func(r basics.Round) (protocol.ConsensusVersion, error) {
+ return version, nil
+ }
+
+ configCleanup = func() {
+ delete(config.Consensus, version)
+ }
+
+ return
+}
+
+func TestAgreementSynchronousFuture5_DynamicFilterRounds(t *testing.T) {
+ partitiontest.PartitionTest(t)
+
+ if testing.Short() {
+ t.Skip("Skipping agreement integration test")
+ }
+
+ _, consensusVersion, configCleanup := createDynamicFilterConfig()
+ defer configCleanup()
+
+ if dynamicFilterCredentialArrivalHistory <= 0 {
+ return
+ }
+
+ rounds := dynamicFilterCredentialArrivalHistory + 20
+
+ filterTimeouts := simulateAgreementWithConsensusVersion(t, 5, rounds, disabled, consensusVersion)
+ require.Len(t, filterTimeouts, rounds-1)
+ for i := 1; i < dynamicFilterCredentialArrivalHistory-1; i++ {
+ require.Equal(t, filterTimeouts[i-1], filterTimeouts[i])
+ }
+
+ // dynamic filter timeout kicks in when history window is full
+ require.Less(t, filterTimeouts[dynamicFilterCredentialArrivalHistory-1], filterTimeouts[dynamicFilterCredentialArrivalHistory-2])
+
+ for i := dynamicFilterCredentialArrivalHistory; i < len(filterTimeouts); i++ {
+ require.Equal(t, filterTimeouts[i-1], filterTimeouts[i])
+ }
+}
+
+func TestDynamicFilterTimeoutResets(t *testing.T) {
+ partitiontest.PartitionTest(t)
+
+ if testing.Short() {
+ t.Skip("Skipping agreement integration test")
+ }
+
+ version, consensusVersion, configCleanup := createDynamicFilterConfig()
+ defer configCleanup()
+
+ if dynamicFilterCredentialArrivalHistory <= 0 {
+ return
+ }
+
+ numNodes := 5
+
+ ledgerFactory := func(data map[basics.Address]basics.AccountData) Ledger {
+ return makeTestLedgerWithConsensusVersion(data, consensusVersion)
+ }
+
+ baseNetwork, baseLedger, cleanupFn, services, clocks, ledgers, activityMonitor := setupAgreement(t, numNodes, disabled, ledgerFactory)
+ startRound := baseLedger.NextRound()
+ defer cleanupFn()
+
+ for i := 0; i < numNodes; i++ {
+ services[i].Start()
+ }
+ activityMonitor.waitForActivity()
+ activityMonitor.waitForQuiet()
+ zeroes := expectNewPeriod(clocks, 0)
+
+ filterTimeouts := make([][]time.Duration, numNodes, numNodes)
+
+ // run round with round-specific consensus version first (since fix in #1896)
+ zeroes = runRoundTriggerFilter(clocks, activityMonitor, zeroes)
+ for j := 1; j < dynamicFilterCredentialArrivalHistory+1; j++ {
+ for srvIdx, clock := range clocks {
+ delta, err := clock.(*testingClock).when(TimeoutFilter)
+ require.NoError(t, err)
+ filterTimeouts[srvIdx] = append(filterTimeouts[srvIdx], delta)
+ }
+ zeroes = runRoundTriggerFilter(clocks, activityMonitor, zeroes)
+ }
+
+ for i := range clocks {
+ require.Len(t, filterTimeouts[i], dynamicFilterCredentialArrivalHistory)
+ for j := 1; j < dynamicFilterCredentialArrivalHistory-1; j++ {
+ require.Equal(t, filterTimeouts[i][j-1], filterTimeouts[i][j])
+ }
+ require.Less(t, filterTimeouts[i][dynamicFilterCredentialArrivalHistory-1], filterTimeouts[i][dynamicFilterCredentialArrivalHistory-2])
+ }
+
+ // force fast partition recovery into bottom
+ {
+ baseNetwork.dropAllSoftVotes()
+ baseNetwork.dropAllSlowNextVotes()
+
+ triggerGlobalTimeout(FilterTimeout(0, version), TimeoutFilter, clocks, activityMonitor)
+ zeroes = expectNoNewPeriod(clocks, zeroes)
+
+ triggerGlobalTimeoutType(TimeoutDeadline, clocks, activityMonitor)
+ zeroes = expectNoNewPeriod(clocks, zeroes)
+
+ triggerGlobalTimeout(0, TimeoutFastRecovery, clocks, activityMonitor) // activates fast partition recovery timer
+ zeroes = expectNoNewPeriod(clocks, zeroes)
+
+ triggerGlobalTimeout(firstFPR, TimeoutFastRecovery, clocks, activityMonitor)
+ zeroes = expectNewPeriod(clocks, zeroes)
+ }
+
+ // terminate on period 1
+ {
+ baseNetwork.repairAll()
+ triggerGlobalTimeout(FilterTimeout(1, version), TimeoutFilter, clocks, activityMonitor)
+ zeroes = expectNewPeriod(clocks, zeroes)
+ }
+
+ filterTimeoutsPostRecovery := make([][]time.Duration, numNodes, numNodes)
+
+ // run round with round-specific consensus version first (since fix in #1896)
+ zeroes = runRoundTriggerFilter(clocks, activityMonitor, zeroes)
+ for j := 1; j < dynamicFilterCredentialArrivalHistory+1; j++ {
+ for srvIdx, clock := range clocks {
+ delta, err := clock.(*testingClock).when(TimeoutFilter)
+ require.NoError(t, err)
+ filterTimeoutsPostRecovery[srvIdx] = append(filterTimeoutsPostRecovery[srvIdx], delta)
+ }
+ zeroes = runRoundTriggerFilter(clocks, activityMonitor, zeroes)
+ }
+
+ for i := range clocks {
+ require.Len(t, filterTimeoutsPostRecovery[i], dynamicFilterCredentialArrivalHistory)
+ // check that history was discarded, so filter time increased back to its original default
+ require.Less(t, filterTimeouts[i][dynamicFilterCredentialArrivalHistory-1], filterTimeoutsPostRecovery[i][0])
+ require.Equal(t, filterTimeouts[i][dynamicFilterCredentialArrivalHistory-2], filterTimeoutsPostRecovery[i][0])
+
+ // check that the filter timeout was updated by the end of the history window
+ for j := 1; j < dynamicFilterCredentialArrivalHistory-1; j++ {
+ require.Equal(t, filterTimeoutsPostRecovery[i][j-1], filterTimeoutsPostRecovery[i][j])
+ }
+ require.Less(t, filterTimeoutsPostRecovery[i][dynamicFilterCredentialArrivalHistory-1], filterTimeoutsPostRecovery[i][dynamicFilterCredentialArrivalHistory-2])
+ }
+
+ sanityCheck(startRound, 2*round(dynamicFilterCredentialArrivalHistory+1)+1, ledgers)
+}
+
func TestAgreementSynchronousFuture1(t *testing.T) {
partitiontest.PartitionTest(t)
diff --git a/agreement/state_machine_test.go b/agreement/state_machine_test.go
index 0effd9fda1..69b31a9866 100644
--- a/agreement/state_machine_test.go
+++ b/agreement/state_machine_test.go
@@ -140,6 +140,15 @@ func (t ioTrace) Contains(e event) bool {
})
}
+func (t ioTrace) CountEvent(b event) (count int) {
+ for _, e := range t.events {
+ if e.ComparableStr() == b.ComparableStr() {
+ count++
+ }
+ }
+ return
+}
+
// for each event, passes it into the given fn; if returns true, returns true.
func (t ioTrace) ContainsFn(compareFn func(b event) bool) bool {
for _, ev := range t.events {
diff --git a/agreement/vote.go b/agreement/vote.go
index af157ee3bb..bed20fe88f 100644
--- a/agreement/vote.go
+++ b/agreement/vote.go
@@ -18,6 +18,7 @@ package agreement
import (
"fmt"
+ "time"
"github.com/algorand/go-algorand/crypto"
"github.com/algorand/go-algorand/data/basics"
@@ -51,6 +52,10 @@ type (
R rawVote `codec:"r"`
Cred committee.Credential `codec:"cred"`
Sig crypto.OneTimeSignature `codec:"sig,omitempty,omitemptycheckstruct"`
+
+ // validatedAt indicates the time at which this vote was verified (as a voteVerified messageEvent),
+ // relative to the zero of that round. It is only set for step 0.
+ validatedAt time.Duration
}
// unauthenticatedEquivocationVote is a pair of votes which has not
diff --git a/config/consensus.go b/config/consensus.go
index 8c217c18c1..190e5ce16a 100644
--- a/config/consensus.go
+++ b/config/consensus.go
@@ -516,6 +516,12 @@ type ConsensusParams struct {
// used by agreement for Circulation, and updates the calculation of StateProofOnlineTotalWeight used
// by state proofs to use the same method (rather than excluding stake from the top N stakeholders as before).
ExcludeExpiredCirculation bool
+
+ // DynamicFilterTimeout indicates whether the filter timeout is set
+ // dynamically, at run time, according to the recent history of credential
+ // arrival times or is set to a static value. Even if this flag disables the
+ // dynamic filter, it will be calculated and logged (but not used).
+ DynamicFilterTimeout bool
}
// PaysetCommitType enumerates possible ways for the block header to commit to
@@ -1357,11 +1363,16 @@ func initConsensusProtocols() {
// ConsensusFuture is used to test features that are implemented
// but not yet released in a production protocol version.
vFuture := v38
+
vFuture.ApprovedUpgrades = map[protocol.ConsensusVersion]uint64{}
vFuture.LogicSigVersion = 10 // When moving this to a release, put a new higher LogicSigVersion here
vFuture.EnableLogicSigCostPooling = true
+ // Setting DynamicFilterTimeout in vFuture will impact e2e test performance
+ // by reducing round time. Hence, it is commented out for now.
+ // vFuture.DynamicFilterTimeout = true
+
Consensus[protocol.ConsensusFuture] = vFuture
// vAlphaX versions are an separate series of consensus parameters and versions for alphanet
diff --git a/logging/telemetryspec/event.go b/logging/telemetryspec/event.go
index 4a8280591c..1617b0339c 100644
--- a/logging/telemetryspec/event.go
+++ b/logging/telemetryspec/event.go
@@ -89,14 +89,16 @@ const BlockAcceptedEvent Event = "BlockAccepted"
// BlockAcceptedEventDetails contains details for the BlockAcceptedEvent
type BlockAcceptedEventDetails struct {
- Address string
- Hash string
- Round uint64
- ValidatedAt time.Duration
- ReceivedAt time.Duration
- PreValidated bool
- PropBufLen uint64
- VoteBufLen uint64
+ Address string
+ Hash string
+ Round uint64
+ ValidatedAt time.Duration
+ ReceivedAt time.Duration
+ VoteValidatedAt time.Duration
+ DynamicFilterTimeout time.Duration
+ PreValidated bool
+ PropBufLen uint64
+ VoteBufLen uint64
}
// AccountRegisteredEvent event
diff --git a/tools/x-repo-types/xrt_test.go b/tools/x-repo-types/xrt_test.go
index 4360b432d6..8de6e83122 100644
--- a/tools/x-repo-types/xrt_test.go
+++ b/tools/x-repo-types/xrt_test.go
@@ -78,7 +78,7 @@ func TestCrossRepoTypes(t *testing.T) {
xPkg: "github.com/algorand/go-algorand/config",
xType: "ConsensusParams",
yPkg: "github.com/algorand/go-algorand-sdk/v2/protocol/config",
- yBranch: "develop",
+ yBranch: "b5f90353ea841fc2e3af9f77d42205c337c93dd2",
yType: "ConsensusParams",
},
{