diff --git a/internal/rollup-shared-publisher/x/consensus/README.md b/internal/rollup-shared-publisher/x/consensus/README.md index e69de29bb2..6d6c29ec1e 100644 --- a/internal/rollup-shared-publisher/x/consensus/README.md +++ b/internal/rollup-shared-publisher/x/consensus/README.md @@ -0,0 +1,143 @@ +# Consensus Module + +This package contains the consensus coordinator for the SCP. +It includes: +- `TwoPCState` for holding votes and decision state (NOTE: it, however, doesn't perform the 2pc decision logic, i.e. the external caller is responsible for setting the decision). +- `ProtocolHandler`: for receiving messages and routing them to the appropriate coordinator call. +- `Coordinator`: for routing calls to the callback manager (`startFn`, `voteFn`, `decisionFn`, `blockFn`) and managing the 2PC logic. +- `CallbackManager`: for managing the callback functions and invoking them with timeouts. + +## Documentation + +```mermaid +classDiagram + direction LR + + class ProtocolHandler { + Handle(ctx, from, msg) + } + class Coordinator { + StartTransaction(...) + RecordVote(...) + RecordDecision(...) + RecordCIRCMessage(...) + ConsumeCIRCMessage(...) + OnBlockCommitted(...) + OnL2BlockCommitted(...) + } + class CallbackManager { + startFn + voteFn + decisionFn + blockFn + } + class StateManager { + AddState(...) + GetState(...) + RemoveState(...) + } + class TwoPCState { + XTID + Decision + Votes + CIRCMessages + } + + ProtocolHandler --> Coordinator : invokes fn by msg + Coordinator --> CallbackManager : routes callbacks + Coordinator --> StateManager : manage 2PC logic + StateManager o--> TwoPCState : owns instances + +``` + +### Role +Identifies whether a coordinator instance runs as a `Follower` or `Leader`. + +### DecisionState +`DecisionState` tracks the lifecycle of a 2PC transaction: `StateUndecided`, `StateCommit`, `StateAbort`. + +### MessageType +`MessageType` enumerates the protobuf payloads understood by the protocol handler: `MsgXTRequest`, `MsgVote`, `MsgDecided`, and `MsgCIRCMessage`. + +### Config +`Config` bundles static settings for a coordinator: +- a unique `NodeID` +- consensus `Timeout` +- `Role` + +### TwoPCState +`TwoPCState` carries all data for a single cross-rollup transaction: +- `XTID` +- `Decision` +- participating chains +- collected votes (chain id -> bool) +- optional timeout `Timer` +- start timestamp +- original xt request payload +- queued CIRC messages + +> [!NOTE] +> While the structure stores votes and the decision state, it doesn't implement the 2PC logic, i.e.: +> i) timeout triggers Abort decision +> ii) any vote false triggers Abort decision +> iii) all votes true triggers Commit decision +> Instead, the external caller is responsible for setting the decision via `SetDecision`. + +### ProtocolHandler +`ProtocolHandler` forwards protocol messages to the appropriate coordinator method. + +### Coordinator +`Coordinator` is invoked by the `ProtocolHandler. +It handles transaction start, vote/decision recording, CIRC buffering, block notifications, and lifecycle management. + +It holds a `StateManager` for managing multiple `TwoPCState` instances while managing the 2PC logic, and +routes calls to the callback manager (`startFn`, `voteFn`, `decisionFn`, `blockFn`). + +### StateManager +`StateManager` manages several `TwoPCState`. + +### CallbackManager +`CallbackManager` stores the registered callbacks (`StartFn`, `VoteFn`, `DecisionFn`, and `BlockFn`) +and enforces timeouts when invoking them. + +### Chain key helpers +Normalize chain identifiers into the hex-encoded keys used in state maps and logs. + +## Tests + +For running tests, use +```bash +go test -v ./... +``` + +- coordinator_test.go + - `TestCoordinatorStartTransactionRegistersStateAndCallback` + - `TestCoordinatorRecordVoteCommitTriggersDecision` + - `TestCoordinatorRecordVoteAbort` + - `TestCoordinatorRecordVoteRejectsUnknownParticipant` + - `TestCoordinatorRecordCIRCMessageAndConsume` + - `TestCoordinatorRecordDecisionFollower` + - `TestCoordinatorRecordDecisionRejectedForLeader` +- keys_test.go + - `TestChainKeyBytes` + - `TestChainKeyUint64` + - `TestChainKeyConsistency` + - `TestChainKeyZeroSpecialCase` + - `TestChainKeyRoundTrip` +- protocol_handler_test.go + - `TestProtocolHandlerHandleXTRequest` + - `TestProtocolHandlerHandleVote` + - `TestProtocolHandlerHandleDecided` + - `TestProtocolHandlerHandleCIRCMessage` + - `TestProtocolHandlerHandleUnknownMessage` + - `TestProtocolHandlerCanHandleAndName` +- state_manager_test.go + - `TestStateManagerAddGetRemove` + - `TestStateManagerCleanupRemovesCompletedStates` +- twopc_state_test.go + - `TestNewTwoPCStateInitializesFields` + - `TestTwoPCStateAddVote` + - `TestTwoPCStateDecisionFlow` + - `TestTwoPCStateGetVotesReturnsCopy` + - `TestTwoPCStateGetDurationIncreases` + - `TestTwoPCState2VotesDoesNotMeanComplete` diff --git a/internal/rollup-shared-publisher/x/consensus/coordinator.go b/internal/rollup-shared-publisher/x/consensus/coordinator.go index 581d476c1a..5ed96d6cff 100644 --- a/internal/rollup-shared-publisher/x/consensus/coordinator.go +++ b/internal/rollup-shared-publisher/x/consensus/coordinator.go @@ -141,6 +141,8 @@ func (c *coordinator) StartTransaction(ctx context.Context, from string, xtReq * } // Timeout only for leader; followers rely on the SP decision + // TODO: followers (sequencers) should also have timeouts, with the logic that: + // if a vote hasn't yet been sent, send it with vote = false if c.config.Role == Leader { state.Timer = time.AfterFunc(c.config.Timeout, func() { c.handleTimeout(xtID) @@ -208,6 +210,10 @@ func (c *coordinator) RecordVote(xtID *pb.XtID, chainID string, vote bool) (Deci return c.handleCommit(xtID, state), nil } } else { + // TODO: remove this logic. A sequencer receiving a vote from another sequencer doesn't imply + // that is should broadcast its own vote. + // Or is this function used by the sequencer to ONLY record its own vote? + // Follower broadcasts vote c.callbackMgr.InvokeVote(xtID, vote, voteLatency) } diff --git a/internal/rollup-shared-publisher/x/consensus/coordinator_test.go b/internal/rollup-shared-publisher/x/consensus/coordinator_test.go new file mode 100644 index 0000000000..7921363152 --- /dev/null +++ b/internal/rollup-shared-publisher/x/consensus/coordinator_test.go @@ -0,0 +1,231 @@ +package consensus + +import ( + "context" + "testing" + "time" + + pb "github.com/ethereum/go-ethereum/internal/rollup-shared-publisher/proto/rollup/v1" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestCoordinatorStartTransactionRegistersStateAndCallback(t *testing.T) { + coord := newTestCoordinator(t, Leader) + + chainA := []byte{0x01} + chainB := []byte{0x02} + xtReq := buildXTRequest(chainA, chainB) + + startCalled := make(chan struct{}, 1) + coord.SetStartCallback(func(ctx context.Context, from string, req *pb.XTRequest) error { + startCalled <- struct{}{} + return nil + }) + + require.NoError(t, coord.StartTransaction(context.Background(), "sp", xtReq)) + + select { + case <-startCalled: + case <-time.After(time.Second): + t.Fatal("expected start callback invocation") + } + + xtID, err := xtReq.XtID() + require.NoError(t, err) + + state, ok := coord.GetState(xtID) + require.True(t, ok) + + assert.Equal(t, StateUndecided, state.GetDecision()) + assert.Len(t, state.ParticipatingChains, 2) + assert.NotNil(t, state.Timer, "leader should arm timeout timer") +} + +func TestCoordinatorRecordVoteCommitTriggersDecision(t *testing.T) { + coord := newTestCoordinator(t, Leader) + + chainA := []byte{0x01} + chainB := []byte{0x02} + chainAKey := ChainKeyBytes(chainA) + chainBKey := ChainKeyBytes(chainB) + + xtReq := buildXTRequest(chainA, chainB) + require.NoError(t, coord.StartTransaction(context.Background(), "sp", xtReq)) + + xtID, err := xtReq.XtID() + require.NoError(t, err) + + decisionCh := make(chan bool, 1) + coord.SetDecisionCallback(func(ctx context.Context, id *pb.XtID, decision bool) error { + if id.Hex() == xtID.Hex() { + decisionCh <- decision + } + return nil + }) + + state, ok := coord.GetState(xtID) + require.True(t, ok) + + result, err := coord.RecordVote(xtID, chainAKey, true) + require.NoError(t, err) + assert.Equal(t, StateUndecided, result) + + result, err = coord.RecordVote(xtID, chainBKey, true) + require.NoError(t, err) + assert.Equal(t, StateCommit, result) + + select { + case decision := <-decisionCh: + assert.True(t, decision) + case <-time.After(time.Second): + t.Fatal("expected decision callback for commit") + } + + assert.Equal(t, StateCommit, state.GetDecision()) + assert.Len(t, state.Votes, 2) +} + +func TestCoordinatorRecordVoteAbort(t *testing.T) { + coord := newTestCoordinator(t, Leader) + + chainA := []byte{0x01} + chainB := []byte{0x02} + chainAKey := ChainKeyBytes(chainA) + + xtReq := buildXTRequest(chainA, chainB) + require.NoError(t, coord.StartTransaction(context.Background(), "sp", xtReq)) + + xtID, err := xtReq.XtID() + require.NoError(t, err) + + decisionCh := make(chan bool, 1) + coord.SetDecisionCallback(func(ctx context.Context, id *pb.XtID, decision bool) error { + if id.Hex() == xtID.Hex() { + decisionCh <- decision + } + return nil + }) + + state, ok := coord.GetState(xtID) + require.True(t, ok) + + result, err := coord.RecordVote(xtID, chainAKey, false) + require.NoError(t, err) + assert.Equal(t, StateAbort, result) + + select { + case decision := <-decisionCh: + assert.False(t, decision) + case <-time.After(time.Second): + t.Fatal("expected decision callback for abort") + } + + assert.Equal(t, StateAbort, state.GetDecision()) +} + +func TestCoordinatorRecordVoteRejectsUnknownParticipant(t *testing.T) { + coord := newTestCoordinator(t, Leader) + + chainA := []byte{0x01} + chainB := []byte{0x02} + xtReq := buildXTRequest(chainA, chainB) + require.NoError(t, coord.StartTransaction(context.Background(), "sp", xtReq)) + + xtID, err := xtReq.XtID() + require.NoError(t, err) + + _, err = coord.RecordVote(xtID, "ff", true) + require.Error(t, err) + assert.Contains(t, err.Error(), "not participating") +} + +func TestCoordinatorRecordCIRCMessageAndConsume(t *testing.T) { + coord := newTestCoordinator(t, Leader) + + chainA := []byte{0x01} + chainB := []byte{0x02} + xtReq := buildXTRequest(chainA, chainB) + require.NoError(t, coord.StartTransaction(context.Background(), "sp", xtReq)) + + xtID, err := xtReq.XtID() + require.NoError(t, err) + + circ := &pb.CIRCMessage{ + SourceChain: append([]byte(nil), chainB...), + DestinationChain: append([]byte(nil), chainA...), + XtId: xtID, + Data: [][]byte{[]byte("payload")}, + } + + require.NoError(t, coord.RecordCIRCMessage(circ)) + + msg, err := coord.ConsumeCIRCMessage(xtID, ChainKeyBytes(chainB)) + require.NoError(t, err) + assert.Equal(t, circ, msg) + + _, err = coord.ConsumeCIRCMessage(xtID, ChainKeyBytes(chainB)) + require.Error(t, err) + assert.Contains(t, err.Error(), "no messages available") + + err = coord.RecordCIRCMessage(&pb.CIRCMessage{ + SourceChain: []byte{0xff}, + XtId: xtID, + Data: [][]byte{[]byte("payload")}, + }) + require.Error(t, err) + assert.Contains(t, err.Error(), "not participating") +} + +func TestCoordinatorRecordDecisionFollower(t *testing.T) { + coord := newTestCoordinator(t, Follower) + + chainA := []byte{0x01} + chainB := []byte{0x02} + xtReq := buildXTRequest(chainA, chainB) + require.NoError(t, coord.StartTransaction(context.Background(), "sp", xtReq)) + + xtID, err := xtReq.XtID() + require.NoError(t, err) + + decisionCh := make(chan bool, 1) + coord.SetDecisionCallback(func(ctx context.Context, id *pb.XtID, decision bool) error { + if id.Hex() == xtID.Hex() { + decisionCh <- decision + } + return nil + }) + + require.NoError(t, coord.RecordDecision(xtID, true)) + + select { + case decision := <-decisionCh: + assert.True(t, decision) + case <-time.After(time.Second): + t.Fatal("expected decision callback for follower") + } + + state, ok := coord.GetState(xtID) + require.True(t, ok) + assert.Equal(t, StateCommit, state.GetDecision()) +} + +func TestCoordinatorRecordDecisionRejectedForLeader(t *testing.T) { + coord := newTestCoordinator(t, Leader) + + chainA := []byte{0x01} + chainB := []byte{0x02} + xtReq := buildXTRequest(chainA, chainB) + require.NoError(t, coord.StartTransaction(context.Background(), "sp", xtReq)) + + xtID, err := xtReq.XtID() + require.NoError(t, err) + + err = coord.RecordDecision(xtID, true) + require.Error(t, err) + assert.Contains(t, err.Error(), "only followers can record decisions") + + state, ok := coord.GetState(xtID) + require.True(t, ok) + assert.Equal(t, StateUndecided, state.GetDecision()) +} diff --git a/internal/rollup-shared-publisher/x/consensus/interfaces.go b/internal/rollup-shared-publisher/x/consensus/interfaces.go index 3995d5be01..e9bf604672 100644 --- a/internal/rollup-shared-publisher/x/consensus/interfaces.go +++ b/internal/rollup-shared-publisher/x/consensus/interfaces.go @@ -50,18 +50,16 @@ type BlockFn func(ctx context.Context, block *types.Block, xtIDs []*pb.XtID) err // Config holds coordinator configuration type Config struct { - NodeID string - IsLeader bool - Timeout time.Duration - Role Role + NodeID string + Timeout time.Duration + Role Role } // DefaultConfig returns sensible defaults func DefaultConfig(nodeID string) Config { return Config{ - NodeID: nodeID, - IsLeader: true, - Timeout: time.Minute, - Role: Leader, + NodeID: nodeID, + Timeout: 4 * time.Second, + Role: Leader, } } diff --git a/internal/rollup-shared-publisher/x/consensus/protocol_handler.go b/internal/rollup-shared-publisher/x/consensus/protocol_handler.go index 70ef81688c..9c907a97e1 100644 --- a/internal/rollup-shared-publisher/x/consensus/protocol_handler.go +++ b/internal/rollup-shared-publisher/x/consensus/protocol_handler.go @@ -8,7 +8,6 @@ import ( "github.com/rs/zerolog" ) -// ProtocolHandler defines the interface for SCP protocol message handling // ProtocolHandler defines the interface for SCP protocol message handling type ProtocolHandler interface { // Handle processes SCP protocol messages diff --git a/internal/rollup-shared-publisher/x/consensus/protocol_handler_test.go b/internal/rollup-shared-publisher/x/consensus/protocol_handler_test.go new file mode 100644 index 0000000000..3d2337c96b --- /dev/null +++ b/internal/rollup-shared-publisher/x/consensus/protocol_handler_test.go @@ -0,0 +1,359 @@ +package consensus + +import ( + "context" + "errors" + "io" + "testing" + + "github.com/ethereum/go-ethereum/core/types" + pb "github.com/ethereum/go-ethereum/internal/rollup-shared-publisher/proto/rollup/v1" + "github.com/rs/zerolog" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type contextKey string + +func TestProtocolHandlerHandleXTRequest(t *testing.T) { + t.Parallel() + + t.Run("success", func(t *testing.T) { + t.Parallel() + + stub := newStubCoordinator(t) + handler := NewProtocolHandler(stub, zerolog.New(io.Discard)) + xtReq := buildXTRequest([]byte{0x01}) + ctx := context.WithValue(context.Background(), contextKey("test"), "value") + msg := &pb.Message{ + Payload: &pb.Message_XtRequest{XtRequest: xtReq}, + } + + err := handler.Handle(ctx, "sp", msg) + require.NoError(t, err) + + require.Len(t, stub.startCalls, 1) + call := stub.startCalls[0] + assert.Same(t, ctx, call.ctx) + assert.Equal(t, "sp", call.from) + assert.Equal(t, xtReq, call.req) + }) + + t.Run("propagates error", func(t *testing.T) { + t.Parallel() + + stub := newStubCoordinator(t) + stub.startErr = errors.New("start failed") + handler := NewProtocolHandler(stub, zerolog.New(io.Discard)) + xtReq := buildXTRequest([]byte{0x02}) + msg := &pb.Message{ + Payload: &pb.Message_XtRequest{XtRequest: xtReq}, + } + + err := handler.Handle(context.Background(), "sp", msg) + require.ErrorIs(t, err, stub.startErr) + require.Len(t, stub.startCalls, 1) + }) +} + +func TestProtocolHandlerHandleVote(t *testing.T) { + t.Parallel() + + t.Run("success", func(t *testing.T) { + t.Parallel() + + stub := newStubCoordinator(t) + stub.voteDecision = StateCommit + handler := NewProtocolHandler(stub, zerolog.New(io.Discard)) + + xtID := &pb.XtID{Hash: []byte{0xaa}} + vote := &pb.Vote{ + SenderChainId: []byte{0x0a}, + XtId: xtID, + Vote: true, + } + msg := &pb.Message{ + Payload: &pb.Message_Vote{Vote: vote}, + } + + err := handler.Handle(context.Background(), "peer", msg) + require.NoError(t, err) + + require.Len(t, stub.voteCalls, 1) + call := stub.voteCalls[0] + assert.Equal(t, xtID, call.xtID) + assert.Equal(t, ChainKeyBytes(vote.SenderChainId), call.chainID) + assert.True(t, call.vote) + }) + + t.Run("propagates error", func(t *testing.T) { + t.Parallel() + + stub := newStubCoordinator(t) + stub.voteErr = errors.New("vote failed") + handler := NewProtocolHandler(stub, zerolog.New(io.Discard)) + + msg := &pb.Message{ + Payload: &pb.Message_Vote{ + Vote: &pb.Vote{ + SenderChainId: []byte{0x01}, + XtId: &pb.XtID{Hash: []byte{0x01}}, + Vote: false, + }, + }, + } + + err := handler.Handle(context.Background(), "peer", msg) + require.ErrorIs(t, err, stub.voteErr) + require.Len(t, stub.voteCalls, 1) + }) +} + +func TestProtocolHandlerHandleDecided(t *testing.T) { + t.Parallel() + + t.Run("success", func(t *testing.T) { + t.Parallel() + + stub := newStubCoordinator(t) + handler := NewProtocolHandler(stub, zerolog.New(io.Discard)) + + decided := &pb.Decided{ + XtId: &pb.XtID{Hash: []byte{0x0b}}, + Decision: true, + } + msg := &pb.Message{ + Payload: &pb.Message_Decided{Decided: decided}, + } + + err := handler.Handle(context.Background(), "sp", msg) + require.NoError(t, err) + + require.Len(t, stub.decisionCalls, 1) + call := stub.decisionCalls[0] + assert.Equal(t, decided.XtId, call.xtID) + assert.Equal(t, decided.Decision, call.decision) + }) + + t.Run("propagates error", func(t *testing.T) { + t.Parallel() + + stub := newStubCoordinator(t) + stub.decisionErr = errors.New("decision failed") + handler := NewProtocolHandler(stub, zerolog.New(io.Discard)) + + msg := &pb.Message{ + Payload: &pb.Message_Decided{ + Decided: &pb.Decided{ + XtId: &pb.XtID{Hash: []byte{0x0c}}, + Decision: false, + }, + }, + } + + err := handler.Handle(context.Background(), "sp", msg) + require.ErrorIs(t, err, stub.decisionErr) + require.Len(t, stub.decisionCalls, 1) + }) +} + +func TestProtocolHandlerHandleCIRCMessage(t *testing.T) { + t.Parallel() + + t.Run("success", func(t *testing.T) { + t.Parallel() + + stub := newStubCoordinator(t) + handler := NewProtocolHandler(stub, zerolog.New(io.Discard)) + + circ := &pb.CIRCMessage{ + SourceChain: []byte{0x01}, + DestinationChain: []byte{0x02}, + XtId: &pb.XtID{Hash: []byte{0x0d}}, + Data: [][]byte{[]byte("payload")}, + } + msg := &pb.Message{ + Payload: &pb.Message_CircMessage{CircMessage: circ}, + } + + err := handler.Handle(context.Background(), "peer", msg) + require.NoError(t, err) + + require.Len(t, stub.circCalls, 1) + assert.Equal(t, circ, stub.circCalls[0].message) + }) + + t.Run("propagates error", func(t *testing.T) { + t.Parallel() + + stub := newStubCoordinator(t) + stub.circErr = errors.New("circ failed") + handler := NewProtocolHandler(stub, zerolog.New(io.Discard)) + + msg := &pb.Message{ + Payload: &pb.Message_CircMessage{ + CircMessage: &pb.CIRCMessage{ + SourceChain: []byte{0x01}, + }, + }, + } + + err := handler.Handle(context.Background(), "peer", msg) + require.ErrorIs(t, err, stub.circErr) + require.Len(t, stub.circCalls, 1) + }) +} + +func TestProtocolHandlerHandleUnknownMessage(t *testing.T) { + t.Parallel() + + stub := newStubCoordinator(t) + handler := NewProtocolHandler(stub, zerolog.New(io.Discard)) + err := handler.Handle(context.Background(), "peer", &pb.Message{}) + require.Error(t, err) +} + +func TestProtocolHandlerCanHandleAndName(t *testing.T) { + t.Parallel() + + stub := newStubCoordinator(t) + handler := NewProtocolHandler(stub, zerolog.New(io.Discard)) + + assert.Equal(t, "SCP", handler.GetProtocolName()) + + assert.False(t, handler.CanHandle(nil)) + assert.False(t, handler.CanHandle(&pb.Message{ + Payload: &pb.Message_HandshakeRequest{HandshakeRequest: &pb.HandshakeRequest{}}, + })) + assert.True(t, handler.CanHandle(&pb.Message{ + Payload: &pb.Message_XtRequest{XtRequest: buildXTRequest([]byte{0x01})}, + })) +} + +type ( + startCall struct { + ctx context.Context + from string + req *pb.XTRequest + } + voteCall struct { + xtID *pb.XtID + chainID string + vote bool + } + decisionCall struct { + xtID *pb.XtID + decision bool + } + circCall struct { + message *pb.CIRCMessage + } +) + +type stubCoordinator struct { + t *testing.T + + startCalls []startCall + startErr error + + voteCalls []voteCall + voteDecision DecisionState + voteErr error + + decisionCalls []decisionCall + decisionErr error + + circCalls []circCall + circErr error +} + +func newStubCoordinator(t *testing.T) *stubCoordinator { + t.Helper() + return &stubCoordinator{ + t: t, + voteDecision: StateUndecided, + } +} + +func (s *stubCoordinator) StartTransaction(ctx context.Context, from string, xtReq *pb.XTRequest) error { + s.startCalls = append(s.startCalls, startCall{ctx: ctx, from: from, req: xtReq}) + return s.startErr +} + +func (s *stubCoordinator) RecordVote(xtID *pb.XtID, chainID string, vote bool) (DecisionState, error) { + s.voteCalls = append(s.voteCalls, voteCall{xtID: xtID, chainID: chainID, vote: vote}) + return s.voteDecision, s.voteErr +} + +func (s *stubCoordinator) RecordDecision(xtID *pb.XtID, decision bool) error { + s.decisionCalls = append(s.decisionCalls, decisionCall{xtID: xtID, decision: decision}) + return s.decisionErr +} + +func (s *stubCoordinator) GetTransactionState(*pb.XtID) (DecisionState, error) { + s.unexpected("GetTransactionState") + return StateUndecided, nil +} + +func (s *stubCoordinator) GetActiveTransactions() []*pb.XtID { + s.unexpected("GetActiveTransactions") + return nil +} + +func (s *stubCoordinator) GetState(*pb.XtID) (*TwoPCState, bool) { + s.unexpected("GetState") + return nil, false +} + +func (s *stubCoordinator) RecordCIRCMessage(circMessage *pb.CIRCMessage) error { + s.circCalls = append(s.circCalls, circCall{message: circMessage}) + return s.circErr +} + +func (s *stubCoordinator) ConsumeCIRCMessage(*pb.XtID, string) (*pb.CIRCMessage, error) { + s.unexpected("ConsumeCIRCMessage") + return nil, nil +} + +func (s *stubCoordinator) SetStartCallback(StartFn) { + s.unexpected("SetStartCallback") +} + +func (s *stubCoordinator) SetVoteCallback(VoteFn) { + s.unexpected("SetVoteCallback") +} + +func (s *stubCoordinator) SetDecisionCallback(DecisionFn) { + s.unexpected("SetDecisionCallback") +} + +func (s *stubCoordinator) SetBlockCallback(BlockFn) { + s.unexpected("SetBlockCallback") +} + +func (s *stubCoordinator) OnBlockCommitted(context.Context, *types.Block) error { + s.unexpected("OnBlockCommitted") + return nil +} + +func (s *stubCoordinator) OnL2BlockCommitted(context.Context, *pb.L2Block) error { + s.unexpected("OnL2BlockCommitted") + return nil +} + +func (s *stubCoordinator) Start(context.Context) error { + s.unexpected("Start") + return nil +} + +func (s *stubCoordinator) Stop(context.Context) error { + s.unexpected("Stop") + return nil +} + +func (s *stubCoordinator) unexpected(method string) { + if s.t != nil { + s.t.Helper() + s.t.Fatalf("unexpected call to %s", method) + } +} diff --git a/internal/rollup-shared-publisher/x/consensus/state_manager_test.go b/internal/rollup-shared-publisher/x/consensus/state_manager_test.go new file mode 100644 index 0000000000..e2faffab8a --- /dev/null +++ b/internal/rollup-shared-publisher/x/consensus/state_manager_test.go @@ -0,0 +1,56 @@ +package consensus + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestStateManagerAddGetRemove(t *testing.T) { + sm := NewStateManager() + defer sm.Shutdown() + + xtReq := buildXTRequest([]byte{0x01}) + xtID, err := xtReq.XtID() + require.NoError(t, err) + + state, err := sm.AddState(xtID, xtReq, xtReq.ChainIDs()) + require.NoError(t, err) + require.NotNil(t, state) + + retrieved, ok := sm.GetState(xtID) + require.True(t, ok) + require.Equal(t, state, retrieved) + + _, err = sm.AddState(xtID, xtReq, xtReq.ChainIDs()) + require.Error(t, err) + + active := sm.GetAllActiveIDs() + assert.Len(t, active, 1) + + sm.RemoveState(xtID) + _, ok = sm.GetState(xtID) + assert.False(t, ok) +} + +func TestStateManagerCleanupRemovesCompletedStates(t *testing.T) { + sm := NewStateManager() + defer sm.Shutdown() + + xtReq := buildXTRequest([]byte{0x01}) + xtID, err := xtReq.XtID() + require.NoError(t, err) + + state, err := sm.AddState(xtID, xtReq, xtReq.ChainIDs()) + require.NoError(t, err) + + state.SetDecision(StateCommit) + state.StartTime = time.Now().Add(-11 * time.Minute) + + sm.cleanup() + + _, ok := sm.GetState(xtID) + assert.False(t, ok, "cleanup should remove old committed state") +} diff --git a/internal/rollup-shared-publisher/x/consensus/test_helpers_test.go b/internal/rollup-shared-publisher/x/consensus/test_helpers_test.go new file mode 100644 index 0000000000..adae598100 --- /dev/null +++ b/internal/rollup-shared-publisher/x/consensus/test_helpers_test.go @@ -0,0 +1,51 @@ +package consensus + +import ( + "context" + "io" + "testing" + "time" + + pb "github.com/ethereum/go-ethereum/internal/rollup-shared-publisher/proto/rollup/v1" + "github.com/rs/zerolog" +) + +func newTestCoordinator(t *testing.T, role Role) *coordinator { + t.Helper() + + logger := zerolog.New(io.Discard) + cfg := Config{ + NodeID: "test-node", + Timeout: 50 * time.Millisecond, + Role: role, + } + + coordIface := NewWithMetrics(logger, cfg, NewNoOpMetrics()) + coord := coordIface.(*coordinator) + + t.Cleanup(func() { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + if err := coord.Stop(ctx); err != nil { + t.Fatalf("failed to stop coordinator: %v", err) + } + }) + + return coord +} + +func buildXTRequest(chainIDs ...[]byte) *pb.XTRequest { + txs := make([]*pb.TransactionRequest, 0, len(chainIDs)) + for idx, id := range chainIDs { + if len(id) == 0 { + continue + } + chainCopy := append([]byte(nil), id...) + txs = append(txs, &pb.TransactionRequest{ + ChainId: chainCopy, + Transaction: [][]byte{[]byte{byte(idx)}}, + }) + } + return &pb.XTRequest{Transactions: txs} +} diff --git a/internal/rollup-shared-publisher/x/consensus/twopc_state_test.go b/internal/rollup-shared-publisher/x/consensus/twopc_state_test.go new file mode 100644 index 0000000000..a997af87de --- /dev/null +++ b/internal/rollup-shared-publisher/x/consensus/twopc_state_test.go @@ -0,0 +1,131 @@ +package consensus + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func newTestTwoPCState(t *testing.T) (*TwoPCState, map[string]struct{}) { + t.Helper() + + chainA := []byte{0x01} + chainB := []byte{0x02} + + xtReq := buildXTRequest(chainA, chainB) + xtID, err := xtReq.XtID() + require.NoError(t, err) + + chains := map[string]struct{}{ + ChainKeyBytes(chainA): {}, + ChainKeyBytes(chainB): {}, + } + + state := NewTwoPCState(xtID, xtReq, chains) + return state, chains +} + +func TestNewTwoPCStateInitializesFields(t *testing.T) { + t.Parallel() + + state, chains := newTestTwoPCState(t) + + assert.NotNil(t, state.XTID) + assert.Equal(t, StateUndecided, state.GetDecision()) + assert.Equal(t, len(chains), state.GetParticipantCount()) + assert.Equal(t, 0, state.GetVoteCount()) + assert.NotNil(t, state.XTRequest) + assert.NotNil(t, state.CIRCMessages) + assert.Nil(t, state.Timer) + assert.False(t, state.StartTime.IsZero()) +} + +func TestTwoPCStateAddVote(t *testing.T) { + t.Parallel() + + state, chains := newTestTwoPCState(t) + // Get frist chain key + var chain string + for k := range chains { + chain = k + break + } + + added := state.AddVote(chain, true) + assert.True(t, added) + assert.Equal(t, 1, state.GetVoteCount()) + + readded := state.AddVote(chain, false) + assert.False(t, readded) + + votes := state.GetVotes() + assert.Equal(t, true, votes[chain]) +} + +func TestTwoPCStateDecisionFlow(t *testing.T) { + t.Parallel() + + state, _ := newTestTwoPCState(t) + + assert.False(t, state.IsComplete()) + assert.Equal(t, StateUndecided, state.GetDecision()) + + state.SetDecision(StateCommit) + + assert.True(t, state.IsComplete()) + assert.Equal(t, StateCommit, state.GetDecision()) +} + +func TestTwoPCStateGetVotesReturnsCopy(t *testing.T) { + t.Parallel() + + state, chains := newTestTwoPCState(t) + + for chain := range chains { + require.True(t, state.AddVote(chain, true)) + } + + clone := state.GetVotes() + for k := range clone { + clone[k] = false + } + + original := state.GetVotes() + for chain := range chains { + assert.True(t, original[chain]) + } +} + +func TestTwoPCStateGetDurationIncreases(t *testing.T) { + t.Parallel() + + state, _ := newTestTwoPCState(t) + initial := state.GetDuration() + assert.GreaterOrEqual(t, int64(initial), int64(0)) + + time.Sleep(5 * time.Millisecond) + + later := state.GetDuration() + assert.Greater(t, later, initial) +} + +func TestTwoPCState2VotesDoesNotMeanComplete(t *testing.T) { + t.Parallel() + + state, chains := newTestTwoPCState(t) + + count := 0 + for chain := range chains { + require.True(t, state.AddVote(chain, true)) + count++ + if count < state.GetParticipantCount() { + assert.False(t, state.IsComplete()) + } + } + + assert.Equal(t, state.GetParticipantCount(), state.GetVoteCount()) + assert.False(t, state.IsComplete(), "adding votes should not set completion") + assert.Equal(t, StateUndecided, state.GetDecision()) // decision should remain undecided +} diff --git a/internal/rollup-shared-publisher/x/consensus/types.go b/internal/rollup-shared-publisher/x/consensus/types.go index 49a8782d26..1146b02cd8 100644 --- a/internal/rollup-shared-publisher/x/consensus/types.go +++ b/internal/rollup-shared-publisher/x/consensus/types.go @@ -59,6 +59,9 @@ func (s DecisionState) String() string { } // TwoPCState holds state for a single cross-chain transaction +// Important: it doesn't automatically set the decision state based on votes; +// that logic must be handled externally to allow for custom decision policies. +// TODO: we should add the decision logic here as well, making it a complete 2PC state manager. type TwoPCState struct { mu sync.RWMutex XTID *pb.XtID diff --git a/internal/rollup-shared-publisher/x/superblock/protocol/README.md b/internal/rollup-shared-publisher/x/superblock/protocol/README.md new file mode 100644 index 0000000000..9b35f3818b --- /dev/null +++ b/internal/rollup-shared-publisher/x/superblock/protocol/README.md @@ -0,0 +1,62 @@ +# Superblock Protocol Module + +This package hands the interface for SBCP, with messages with callbacks and validation functions. + +## Documentation + +```mermaid +classDiagram + direction LR + + class handler { + -messageHandler: MessageHandler + -validator: Validator + +Handle(ctx, from, msg) error + +CanHandle(msg) bool + +GetProtocolName() string + } + + class MessageHandler { + <> + +HandleStartSlot(ctx, from, StartSlot) error + +HandleRequestSeal(ctx, from, RequestSeal) error + +HandleL2Block(ctx, from, L2Block) error + +HandleStartSC(ctx, from, StartSC) error + +HandleRollBackAndStartSlot(ctx, from, RollBackAndStartSlot) error + } + + class Validator { + +ValidateStartSlot(StartSlot) error + +ValidateRequestSeal(RequestSeal) error + +ValidateL2Block(L2Block) error + +ValidateStartSC(StartSC) error + +ValidateRollBackAndStartSlot(RollBackAndStartSlot) error + } + + class MessageType { + <> + MsgStartSlot + MsgRequestSeal + MsgL2Block + MsgStartSC + MsgRollBackAndStartSlot + } + + + handler --> MessageHandler : Processes with + handler --> Validator : Validates with + MessageType --> handler : Msgs sent to + +``` + +### MessageType (types.go) +Enum describing of the SBCP message types (start slot, request seal, L2 block submission, start SC, and rollback). + +### Validator (interface and `basicValidator` implementation) +Validation functions of SBCP messages. + +### MessageHandler (interface) +Holds callbacks for each SBCP message. + +### Handler (interface and `handler` implementation) +Holds a validator and a message handler. Receives messages by validating and then processing them. diff --git a/internal/rollup-shared-publisher/x/superblock/sequencer/bootstrap/bootstrap.go b/internal/rollup-shared-publisher/x/superblock/sequencer/bootstrap/bootstrap.go index a006e1d805..3806075328 100644 --- a/internal/rollup-shared-publisher/x/superblock/sequencer/bootstrap/bootstrap.go +++ b/internal/rollup-shared-publisher/x/superblock/sequencer/bootstrap/bootstrap.go @@ -77,7 +77,6 @@ func Setup(ctx context.Context, cfg Config) (*Runtime, error) { nodeID := fmt.Sprintf("sequencer-%d", time.Now().UnixNano()) c := consensus.DefaultConfig(nodeID) c.Role = consensus.Follower - c.IsLeader = false c.Timeout = time.Minute base = consensus.New(log, c) } diff --git a/internal/rollup-shared-publisher/x/superblock/sequencer/coordinator.go b/internal/rollup-shared-publisher/x/superblock/sequencer/coordinator.go index f6e0fe217b..b8743a66a2 100644 --- a/internal/rollup-shared-publisher/x/superblock/sequencer/coordinator.go +++ b/internal/rollup-shared-publisher/x/superblock/sequencer/coordinator.go @@ -363,6 +363,8 @@ func (sc *SequencerCoordinator) handleStartSC( if lastSeq, ok := sc.scpIntegration.GetLastDecidedSequenceNumber(); ok { requiredSeq = lastSeq + 1 } + // TODO startSC.XtSequenceNumber doesn't need to be prev+1 + // check should be: startSC.XtSequenceNumber > next (= last+1) if startSC.XtSequenceNumber != requiredSeq { sc.log.Warn(). Uint64("got_seq", startSC.XtSequenceNumber). @@ -395,6 +397,9 @@ func (sc *SequencerCoordinator) handleStartSC( // Extract our transactions myTxs := sc.extractMyTransactions(startSC.XtRequest) + // TODO: set default vote result to be false + // if len(myTxs) == 0, set to true + // but if sc.callbacks.SimulateAndVote != nil, set to false var voteResult = true if sc.callbacks.SimulateAndVote != nil && len(myTxs) > 0 { diff --git a/internal/rollup-shared-publisher/x/superblock/slot/README.md b/internal/rollup-shared-publisher/x/superblock/slot/README.md new file mode 100644 index 0000000000..03ac9e9b40 --- /dev/null +++ b/internal/rollup-shared-publisher/x/superblock/slot/README.md @@ -0,0 +1,99 @@ +# SBCP State Transition (Slot Module) + +> [!WARNING] +> Probably dead code besides `Config`. + +This package models the SBCP state transitions. + +## Documentation + +```mermaid +classDiagram + direction LR + + class Config { + <> + Duration: time.Duration + SealCutover: float64 + GenesisTime: time.Time + } + + class Manager { + -genesisTime: time.Time + -slotDuration: time.Duration + -sealCutoverFraction: float64 + +GetCurrentSlot() uint64 + +GetSlotStartTime(slot) time.Time + +GetSlotProgress() float64 + +IsSlotSealTime() bool + +WaitForNextSlot(ctx) error + +GetSealTime(slot) time.Time + +GetSlotEndTime(slot) time.Time + } + + class State { + <> + StateStarting + StateFree + StateLocked + StateSealing + } + + class StateMachine { + -currentState: State + -currentSlot: uint64 + -slotManager: Manager + -receivedL2Blocks: map[string]pb.L2Block + -scpInstances: map[string]SCPInstance + -l2BlockRequests: map[string]pb.L2BlockRequest + -lastHeads: map[string]pb.L2Block + -stateChangeCallbacks: map[State][]StateChangeCallback + -transitionHistory: []StateTransition + +BeginSlot(...) + +StartSCP(...) + +ProcessSCPDecision(...) + +RequestSeal(...) + +ReceiveL2Block(...) + +RegisterStateChangeCallback(state, cb) + +TransitionTo(newState, reason) + } + + StateMachine --> State : Upon calls, transitions through + StateMachine --> Manager : Holds, but unused + +``` + +### Config +Defines slot timing parameters: `Duration`, `SealCutover`, and optional `GenesisTime`. Default sets slot to 6 seconds. + +### Manager +Holds slot timings utils. +Namely, it reports the current slot, +time remaining until seal or its end, +provides a function for waiting for the next slot, +and provides other deterministic timestamps based on genesis. + +### State +Enumerates the lifecycle states: `StateStarting`, `StateFree`, `StateLocked`, and `StateSealing` + +### StateChangeCallback +Observers interested in state transitions. +Whenever the state changes to `S`, the callbacks associated to `S`are invoked. + +### StateTransition (state_machine.go) +Historical record describing each transition, including timestamp and human-readable reason. The state machine maintains these entries for diagnostics. + +### StateMachine + +Manages the state progress throughout the slot. Holds funcions such as: +- `BeginSlot`: resetting the state for the next slot. +- `StartSCP`: updating the state with the new instance, if free. +- `ProcessSCPDecision`: switching to the `StateLocked` state. +- `RequestSeal`: sets it to sealing state. +- `ReceiveL2Blocks`: updates local state with new blocks. + +Also holds a general `TransitionTo` function, which triggers the callbacks for that state, for external callers to invoke. + + +### SCPInstance +Metadata about an SCP instanced with XT identifier, slot, participants, vote map, and decision.