From be128d223dd7066eabe826d97688edd4002dbcb5 Mon Sep 17 00:00:00 2001 From: MatheusFranco99 <48058141+MatheusFranco99@users.noreply.github.com> Date: Sun, 19 Oct 2025 15:51:38 +0100 Subject: [PATCH 1/5] add session id to CIRCMessage --- eth/ssv_mailbox_processor.go | 7 +++++++ .../proto/rollup/v1/rollup_messages.pb.go | 18 ++++++++++++++---- .../proto/rollup/v1/rollup_messages.proto | 1 + 3 files changed, 22 insertions(+), 4 deletions(-) diff --git a/eth/ssv_mailbox_processor.go b/eth/ssv_mailbox_processor.go index 9e185217c0..3e19bcff22 100644 --- a/eth/ssv_mailbox_processor.go +++ b/eth/ssv_mailbox_processor.go @@ -474,6 +474,7 @@ func (mp *MailboxProcessor) handleCrossRollupCoordination( dep.Receiver = common.BytesToAddress(circMsg.Receiver[0]) } dep.Data = circMsg.Data[0] + dep.SessionID = new(big.Int).SetUint64(circMsg.SessionId) circDeps = append(circDeps, dep) } @@ -490,6 +491,11 @@ func (mp *MailboxProcessor) handleCrossRollupCoordination( } func (mp *MailboxProcessor) sendCIRCMessage(ctx context.Context, msg *CrossRollupMessage, xtID *rollupv1.XtID) error { + var sessionID uint64 + if msg.SessionID != nil { + sessionID = msg.SessionID.Uint64() + } + // Build CIRC payload circMsg := &rollupv1.CIRCMessage{ SourceChain: new(big.Int).SetUint64(msg.SourceChainID).Bytes(), @@ -499,6 +505,7 @@ func (mp *MailboxProcessor) sendCIRCMessage(ctx context.Context, msg *CrossRollu XtId: xtID, Label: string(msg.Label), Data: [][]byte{msg.Data}, + SessionId: sessionID, } spMsg := &rollupv1.Message{ diff --git a/internal/rollup-shared-publisher/proto/rollup/v1/rollup_messages.pb.go b/internal/rollup-shared-publisher/proto/rollup/v1/rollup_messages.pb.go index cdb2513475..b1171d4ca3 100644 --- a/internal/rollup-shared-publisher/proto/rollup/v1/rollup_messages.pb.go +++ b/internal/rollup-shared-publisher/proto/rollup/v1/rollup_messages.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.36.6 -// protoc v6.32.1 +// protoc v5.29.3 // source: rollup_messages.proto package rollupv1 @@ -574,7 +574,8 @@ type CIRCMessage struct { Receiver [][]byte `protobuf:"bytes,4,rep,name=receiver,proto3" json:"receiver,omitempty"` XtId *XtID `protobuf:"bytes,5,opt,name=xt_id,json=xtId,proto3" json:"xt_id,omitempty"` Label string `protobuf:"bytes,6,opt,name=label,proto3" json:"label,omitempty"` - Data [][]byte `protobuf:"bytes,7,rep,name=data,proto3" json:"data,omitempty"` // ABI encoded data + Data [][]byte `protobuf:"bytes,7,rep,name=data,proto3" json:"data,omitempty"` // ABI encoded data + SessionId uint64 `protobuf:"varint,8,opt,name=session_id,json=sessionId,proto3" json:"session_id,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -658,6 +659,13 @@ func (x *CIRCMessage) GetData() [][]byte { return nil } +func (x *CIRCMessage) GetSessionId() uint64 { + if x != nil { + return x.SessionId + } + return 0 +} + // Auxiliary message for requesting specific L2 blocks type L2BlockRequest struct { state protoimpl.MessageState `protogen:"open.v1"` @@ -1399,7 +1407,7 @@ const file_rollup_messages_proto_rawDesc = "" + "\bchain_id\x18\x01 \x01(\fR\achainId\x12\x1d\n" + "\n" + "block_data\x18\x02 \x01(\fR\tblockData\x127\n" + - "\x0fincluded_xt_ids\x18\x03 \x03(\v2\x0f.rollup.v1.XtIDR\rincludedXtIds\"\xe1\x01\n" + + "\x0fincluded_xt_ids\x18\x03 \x03(\v2\x0f.rollup.v1.XtIDR\rincludedXtIds\"\x80\x02\n" + "\vCIRCMessage\x12!\n" + "\fsource_chain\x18\x01 \x01(\fR\vsourceChain\x12+\n" + "\x11destination_chain\x18\x02 \x01(\fR\x10destinationChain\x12\x16\n" + @@ -1407,7 +1415,9 @@ const file_rollup_messages_proto_rawDesc = "" + "\breceiver\x18\x04 \x03(\fR\breceiver\x12$\n" + "\x05xt_id\x18\x05 \x01(\v2\x0f.rollup.v1.XtIDR\x04xtId\x12\x14\n" + "\x05label\x18\x06 \x01(\tR\x05label\x12\x12\n" + - "\x04data\x18\a \x03(\fR\x04data\"o\n" + + "\x04data\x18\a \x03(\fR\x04data\x12\x1d\n" + + "\n" + + "session_id\x18\b \x01(\x04R\tsessionId\"o\n" + "\x0eL2BlockRequest\x12\x19\n" + "\bchain_id\x18\x01 \x01(\fR\achainId\x12!\n" + "\fblock_number\x18\x02 \x01(\x04R\vblockNumber\x12\x1f\n" + diff --git a/internal/rollup-shared-publisher/proto/rollup/v1/rollup_messages.proto b/internal/rollup-shared-publisher/proto/rollup/v1/rollup_messages.proto index 3ef12ce759..c241f082b4 100644 --- a/internal/rollup-shared-publisher/proto/rollup/v1/rollup_messages.proto +++ b/internal/rollup-shared-publisher/proto/rollup/v1/rollup_messages.proto @@ -73,6 +73,7 @@ message CIRCMessage { XtID xt_id = 5; string label = 6; repeated bytes data = 7; // ABI encoded data + uint64 session_id = 8; } // ============================================================================= From 5a348bdb752995a68f02e545fcfe9098a7649210 Mon Sep 17 00:00:00 2001 From: MatheusFranco99 <48058141+MatheusFranco99@users.noreply.github.com> Date: Mon, 20 Oct 2025 09:35:27 +0100 Subject: [PATCH 2/5] Set CIRCMessage.SessionID to type U256 --- cmd/xbridge/main.go | 2 ++ cmd/xclient/main.go | 2 ++ eth/ssv_mailbox_processor.go | 6 +++--- .../proto/rollup/v1/rollup_messages.pb.go | 10 +++++----- .../proto/rollup/v1/rollup_messages.proto | 2 +- 5 files changed, 13 insertions(+), 9 deletions(-) diff --git a/cmd/xbridge/main.go b/cmd/xbridge/main.go index d8092fe014..31ea8929d6 100644 --- a/cmd/xbridge/main.go +++ b/cmd/xbridge/main.go @@ -375,8 +375,10 @@ func getNonceFor(networkRPCAddr string, address common.Address) (uint64, error) } // generateRandomSessionID returns a random big.Int in the range [0, 2^63-1] +// TODO: use [0, 2^256) func generateRandomSessionID() *big.Int { max := new(big.Int).Lsh(big.NewInt(1), 63) + // max := new(big.Int).Lsh(big.NewInt(1), 256) n, err := rand.Int(rand.Reader, max) if err != nil { log.Fatalf("failed to generate random session ID: %v", err) diff --git a/cmd/xclient/main.go b/cmd/xclient/main.go index 509c7a79c6..b803450d82 100644 --- a/cmd/xclient/main.go +++ b/cmd/xclient/main.go @@ -236,8 +236,10 @@ func getNonceFor(networkRPCAddr string, address common.Address) (uint64, error) } // generateRandomSessionID returns a random big.Int in the range [0, 2^63-1] +// TODO: use [0, 2^256) func generateRandomSessionID() *big.Int { max := new(big.Int).Lsh(big.NewInt(1), 63) // 2^63 + // max := new(big.Int).Lsh(big.NewInt(1), 256) n, err := rand.Int(rand.Reader, max) if err != nil { log.Fatalf("failed to generate random session ID: %v", err) diff --git a/eth/ssv_mailbox_processor.go b/eth/ssv_mailbox_processor.go index 3e19bcff22..38985902cb 100644 --- a/eth/ssv_mailbox_processor.go +++ b/eth/ssv_mailbox_processor.go @@ -474,7 +474,7 @@ func (mp *MailboxProcessor) handleCrossRollupCoordination( dep.Receiver = common.BytesToAddress(circMsg.Receiver[0]) } dep.Data = circMsg.Data[0] - dep.SessionID = new(big.Int).SetUint64(circMsg.SessionId) + dep.SessionID = new(big.Int).SetBytes(circMsg.SessionId) circDeps = append(circDeps, dep) } @@ -491,9 +491,9 @@ func (mp *MailboxProcessor) handleCrossRollupCoordination( } func (mp *MailboxProcessor) sendCIRCMessage(ctx context.Context, msg *CrossRollupMessage, xtID *rollupv1.XtID) error { - var sessionID uint64 + var sessionID []byte if msg.SessionID != nil { - sessionID = msg.SessionID.Uint64() + sessionID = common.LeftPadBytes(msg.SessionID.Bytes(), 32) } // Build CIRC payload diff --git a/internal/rollup-shared-publisher/proto/rollup/v1/rollup_messages.pb.go b/internal/rollup-shared-publisher/proto/rollup/v1/rollup_messages.pb.go index b1171d4ca3..2e15555d2a 100644 --- a/internal/rollup-shared-publisher/proto/rollup/v1/rollup_messages.pb.go +++ b/internal/rollup-shared-publisher/proto/rollup/v1/rollup_messages.pb.go @@ -574,8 +574,8 @@ type CIRCMessage struct { Receiver [][]byte `protobuf:"bytes,4,rep,name=receiver,proto3" json:"receiver,omitempty"` XtId *XtID `protobuf:"bytes,5,opt,name=xt_id,json=xtId,proto3" json:"xt_id,omitempty"` Label string `protobuf:"bytes,6,opt,name=label,proto3" json:"label,omitempty"` - Data [][]byte `protobuf:"bytes,7,rep,name=data,proto3" json:"data,omitempty"` // ABI encoded data - SessionId uint64 `protobuf:"varint,8,opt,name=session_id,json=sessionId,proto3" json:"session_id,omitempty"` + Data [][]byte `protobuf:"bytes,7,rep,name=data,proto3" json:"data,omitempty"` // ABI encoded data + SessionId []byte `protobuf:"bytes,8,opt,name=session_id,json=sessionId,proto3" json:"session_id,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -659,11 +659,11 @@ func (x *CIRCMessage) GetData() [][]byte { return nil } -func (x *CIRCMessage) GetSessionId() uint64 { +func (x *CIRCMessage) GetSessionId() []byte { if x != nil { return x.SessionId } - return 0 + return nil } // Auxiliary message for requesting specific L2 blocks @@ -1417,7 +1417,7 @@ const file_rollup_messages_proto_rawDesc = "" + "\x05label\x18\x06 \x01(\tR\x05label\x12\x12\n" + "\x04data\x18\a \x03(\fR\x04data\x12\x1d\n" + "\n" + - "session_id\x18\b \x01(\x04R\tsessionId\"o\n" + + "session_id\x18\b \x01(\fR\tsessionId\"o\n" + "\x0eL2BlockRequest\x12\x19\n" + "\bchain_id\x18\x01 \x01(\fR\achainId\x12!\n" + "\fblock_number\x18\x02 \x01(\x04R\vblockNumber\x12\x1f\n" + diff --git a/internal/rollup-shared-publisher/proto/rollup/v1/rollup_messages.proto b/internal/rollup-shared-publisher/proto/rollup/v1/rollup_messages.proto index c241f082b4..25cb7892b4 100644 --- a/internal/rollup-shared-publisher/proto/rollup/v1/rollup_messages.proto +++ b/internal/rollup-shared-publisher/proto/rollup/v1/rollup_messages.proto @@ -73,7 +73,7 @@ message CIRCMessage { XtID xt_id = 5; string label = 6; repeated bytes data = 7; // ABI encoded data - uint64 session_id = 8; + bytes session_id = 8; // 32-byte big-endian session identifier (U256) } // ============================================================================= From f271eb6b2e232fdcef4847399e184d5fad94d5b4 Mon Sep 17 00:00:00 2001 From: MatheusFranco99 <48058141+MatheusFranco99@users.noreply.github.com> Date: Mon, 20 Oct 2025 13:13:08 +0100 Subject: [PATCH 3/5] Remove redundant IsLeader field; add TODOs, unit tests, and documentation --- .../x/consensus/README.md | 143 +++++++ .../x/consensus/coordinator.go | 6 + .../x/consensus/coordinator_test.go | 231 +++++++++++ .../x/consensus/interfaces.go | 14 +- .../x/consensus/protocol_handler.go | 1 - .../x/consensus/protocol_handler_test.go | 359 ++++++++++++++++++ .../x/consensus/state_manager_test.go | 56 +++ .../x/consensus/test_helpers_test.go | 51 +++ .../x/consensus/twopc_state_test.go | 131 +++++++ .../x/consensus/types.go | 3 + .../sequencer/bootstrap/bootstrap.go | 1 - .../x/superblock/sequencer/coordinator.go | 5 + 12 files changed, 991 insertions(+), 10 deletions(-) create mode 100644 internal/rollup-shared-publisher/x/consensus/coordinator_test.go create mode 100644 internal/rollup-shared-publisher/x/consensus/protocol_handler_test.go create mode 100644 internal/rollup-shared-publisher/x/consensus/state_manager_test.go create mode 100644 internal/rollup-shared-publisher/x/consensus/test_helpers_test.go create mode 100644 internal/rollup-shared-publisher/x/consensus/twopc_state_test.go diff --git a/internal/rollup-shared-publisher/x/consensus/README.md b/internal/rollup-shared-publisher/x/consensus/README.md index e69de29bb2..6d6c29ec1e 100644 --- a/internal/rollup-shared-publisher/x/consensus/README.md +++ b/internal/rollup-shared-publisher/x/consensus/README.md @@ -0,0 +1,143 @@ +# Consensus Module + +This package contains the consensus coordinator for the SCP. +It includes: +- `TwoPCState` for holding votes and decision state (NOTE: it, however, doesn't perform the 2pc decision logic, i.e. the external caller is responsible for setting the decision). +- `ProtocolHandler`: for receiving messages and routing them to the appropriate coordinator call. +- `Coordinator`: for routing calls to the callback manager (`startFn`, `voteFn`, `decisionFn`, `blockFn`) and managing the 2PC logic. +- `CallbackManager`: for managing the callback functions and invoking them with timeouts. + +## Documentation + +```mermaid +classDiagram + direction LR + + class ProtocolHandler { + Handle(ctx, from, msg) + } + class Coordinator { + StartTransaction(...) + RecordVote(...) + RecordDecision(...) + RecordCIRCMessage(...) + ConsumeCIRCMessage(...) + OnBlockCommitted(...) + OnL2BlockCommitted(...) + } + class CallbackManager { + startFn + voteFn + decisionFn + blockFn + } + class StateManager { + AddState(...) + GetState(...) + RemoveState(...) + } + class TwoPCState { + XTID + Decision + Votes + CIRCMessages + } + + ProtocolHandler --> Coordinator : invokes fn by msg + Coordinator --> CallbackManager : routes callbacks + Coordinator --> StateManager : manage 2PC logic + StateManager o--> TwoPCState : owns instances + +``` + +### Role +Identifies whether a coordinator instance runs as a `Follower` or `Leader`. + +### DecisionState +`DecisionState` tracks the lifecycle of a 2PC transaction: `StateUndecided`, `StateCommit`, `StateAbort`. + +### MessageType +`MessageType` enumerates the protobuf payloads understood by the protocol handler: `MsgXTRequest`, `MsgVote`, `MsgDecided`, and `MsgCIRCMessage`. + +### Config +`Config` bundles static settings for a coordinator: +- a unique `NodeID` +- consensus `Timeout` +- `Role` + +### TwoPCState +`TwoPCState` carries all data for a single cross-rollup transaction: +- `XTID` +- `Decision` +- participating chains +- collected votes (chain id -> bool) +- optional timeout `Timer` +- start timestamp +- original xt request payload +- queued CIRC messages + +> [!NOTE] +> While the structure stores votes and the decision state, it doesn't implement the 2PC logic, i.e.: +> i) timeout triggers Abort decision +> ii) any vote false triggers Abort decision +> iii) all votes true triggers Commit decision +> Instead, the external caller is responsible for setting the decision via `SetDecision`. + +### ProtocolHandler +`ProtocolHandler` forwards protocol messages to the appropriate coordinator method. + +### Coordinator +`Coordinator` is invoked by the `ProtocolHandler. +It handles transaction start, vote/decision recording, CIRC buffering, block notifications, and lifecycle management. + +It holds a `StateManager` for managing multiple `TwoPCState` instances while managing the 2PC logic, and +routes calls to the callback manager (`startFn`, `voteFn`, `decisionFn`, `blockFn`). + +### StateManager +`StateManager` manages several `TwoPCState`. + +### CallbackManager +`CallbackManager` stores the registered callbacks (`StartFn`, `VoteFn`, `DecisionFn`, and `BlockFn`) +and enforces timeouts when invoking them. + +### Chain key helpers +Normalize chain identifiers into the hex-encoded keys used in state maps and logs. + +## Tests + +For running tests, use +```bash +go test -v ./... +``` + +- coordinator_test.go + - `TestCoordinatorStartTransactionRegistersStateAndCallback` + - `TestCoordinatorRecordVoteCommitTriggersDecision` + - `TestCoordinatorRecordVoteAbort` + - `TestCoordinatorRecordVoteRejectsUnknownParticipant` + - `TestCoordinatorRecordCIRCMessageAndConsume` + - `TestCoordinatorRecordDecisionFollower` + - `TestCoordinatorRecordDecisionRejectedForLeader` +- keys_test.go + - `TestChainKeyBytes` + - `TestChainKeyUint64` + - `TestChainKeyConsistency` + - `TestChainKeyZeroSpecialCase` + - `TestChainKeyRoundTrip` +- protocol_handler_test.go + - `TestProtocolHandlerHandleXTRequest` + - `TestProtocolHandlerHandleVote` + - `TestProtocolHandlerHandleDecided` + - `TestProtocolHandlerHandleCIRCMessage` + - `TestProtocolHandlerHandleUnknownMessage` + - `TestProtocolHandlerCanHandleAndName` +- state_manager_test.go + - `TestStateManagerAddGetRemove` + - `TestStateManagerCleanupRemovesCompletedStates` +- twopc_state_test.go + - `TestNewTwoPCStateInitializesFields` + - `TestTwoPCStateAddVote` + - `TestTwoPCStateDecisionFlow` + - `TestTwoPCStateGetVotesReturnsCopy` + - `TestTwoPCStateGetDurationIncreases` + - `TestTwoPCState2VotesDoesNotMeanComplete` diff --git a/internal/rollup-shared-publisher/x/consensus/coordinator.go b/internal/rollup-shared-publisher/x/consensus/coordinator.go index 581d476c1a..5ed96d6cff 100644 --- a/internal/rollup-shared-publisher/x/consensus/coordinator.go +++ b/internal/rollup-shared-publisher/x/consensus/coordinator.go @@ -141,6 +141,8 @@ func (c *coordinator) StartTransaction(ctx context.Context, from string, xtReq * } // Timeout only for leader; followers rely on the SP decision + // TODO: followers (sequencers) should also have timeouts, with the logic that: + // if a vote hasn't yet been sent, send it with vote = false if c.config.Role == Leader { state.Timer = time.AfterFunc(c.config.Timeout, func() { c.handleTimeout(xtID) @@ -208,6 +210,10 @@ func (c *coordinator) RecordVote(xtID *pb.XtID, chainID string, vote bool) (Deci return c.handleCommit(xtID, state), nil } } else { + // TODO: remove this logic. A sequencer receiving a vote from another sequencer doesn't imply + // that is should broadcast its own vote. + // Or is this function used by the sequencer to ONLY record its own vote? + // Follower broadcasts vote c.callbackMgr.InvokeVote(xtID, vote, voteLatency) } diff --git a/internal/rollup-shared-publisher/x/consensus/coordinator_test.go b/internal/rollup-shared-publisher/x/consensus/coordinator_test.go new file mode 100644 index 0000000000..7921363152 --- /dev/null +++ b/internal/rollup-shared-publisher/x/consensus/coordinator_test.go @@ -0,0 +1,231 @@ +package consensus + +import ( + "context" + "testing" + "time" + + pb "github.com/ethereum/go-ethereum/internal/rollup-shared-publisher/proto/rollup/v1" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestCoordinatorStartTransactionRegistersStateAndCallback(t *testing.T) { + coord := newTestCoordinator(t, Leader) + + chainA := []byte{0x01} + chainB := []byte{0x02} + xtReq := buildXTRequest(chainA, chainB) + + startCalled := make(chan struct{}, 1) + coord.SetStartCallback(func(ctx context.Context, from string, req *pb.XTRequest) error { + startCalled <- struct{}{} + return nil + }) + + require.NoError(t, coord.StartTransaction(context.Background(), "sp", xtReq)) + + select { + case <-startCalled: + case <-time.After(time.Second): + t.Fatal("expected start callback invocation") + } + + xtID, err := xtReq.XtID() + require.NoError(t, err) + + state, ok := coord.GetState(xtID) + require.True(t, ok) + + assert.Equal(t, StateUndecided, state.GetDecision()) + assert.Len(t, state.ParticipatingChains, 2) + assert.NotNil(t, state.Timer, "leader should arm timeout timer") +} + +func TestCoordinatorRecordVoteCommitTriggersDecision(t *testing.T) { + coord := newTestCoordinator(t, Leader) + + chainA := []byte{0x01} + chainB := []byte{0x02} + chainAKey := ChainKeyBytes(chainA) + chainBKey := ChainKeyBytes(chainB) + + xtReq := buildXTRequest(chainA, chainB) + require.NoError(t, coord.StartTransaction(context.Background(), "sp", xtReq)) + + xtID, err := xtReq.XtID() + require.NoError(t, err) + + decisionCh := make(chan bool, 1) + coord.SetDecisionCallback(func(ctx context.Context, id *pb.XtID, decision bool) error { + if id.Hex() == xtID.Hex() { + decisionCh <- decision + } + return nil + }) + + state, ok := coord.GetState(xtID) + require.True(t, ok) + + result, err := coord.RecordVote(xtID, chainAKey, true) + require.NoError(t, err) + assert.Equal(t, StateUndecided, result) + + result, err = coord.RecordVote(xtID, chainBKey, true) + require.NoError(t, err) + assert.Equal(t, StateCommit, result) + + select { + case decision := <-decisionCh: + assert.True(t, decision) + case <-time.After(time.Second): + t.Fatal("expected decision callback for commit") + } + + assert.Equal(t, StateCommit, state.GetDecision()) + assert.Len(t, state.Votes, 2) +} + +func TestCoordinatorRecordVoteAbort(t *testing.T) { + coord := newTestCoordinator(t, Leader) + + chainA := []byte{0x01} + chainB := []byte{0x02} + chainAKey := ChainKeyBytes(chainA) + + xtReq := buildXTRequest(chainA, chainB) + require.NoError(t, coord.StartTransaction(context.Background(), "sp", xtReq)) + + xtID, err := xtReq.XtID() + require.NoError(t, err) + + decisionCh := make(chan bool, 1) + coord.SetDecisionCallback(func(ctx context.Context, id *pb.XtID, decision bool) error { + if id.Hex() == xtID.Hex() { + decisionCh <- decision + } + return nil + }) + + state, ok := coord.GetState(xtID) + require.True(t, ok) + + result, err := coord.RecordVote(xtID, chainAKey, false) + require.NoError(t, err) + assert.Equal(t, StateAbort, result) + + select { + case decision := <-decisionCh: + assert.False(t, decision) + case <-time.After(time.Second): + t.Fatal("expected decision callback for abort") + } + + assert.Equal(t, StateAbort, state.GetDecision()) +} + +func TestCoordinatorRecordVoteRejectsUnknownParticipant(t *testing.T) { + coord := newTestCoordinator(t, Leader) + + chainA := []byte{0x01} + chainB := []byte{0x02} + xtReq := buildXTRequest(chainA, chainB) + require.NoError(t, coord.StartTransaction(context.Background(), "sp", xtReq)) + + xtID, err := xtReq.XtID() + require.NoError(t, err) + + _, err = coord.RecordVote(xtID, "ff", true) + require.Error(t, err) + assert.Contains(t, err.Error(), "not participating") +} + +func TestCoordinatorRecordCIRCMessageAndConsume(t *testing.T) { + coord := newTestCoordinator(t, Leader) + + chainA := []byte{0x01} + chainB := []byte{0x02} + xtReq := buildXTRequest(chainA, chainB) + require.NoError(t, coord.StartTransaction(context.Background(), "sp", xtReq)) + + xtID, err := xtReq.XtID() + require.NoError(t, err) + + circ := &pb.CIRCMessage{ + SourceChain: append([]byte(nil), chainB...), + DestinationChain: append([]byte(nil), chainA...), + XtId: xtID, + Data: [][]byte{[]byte("payload")}, + } + + require.NoError(t, coord.RecordCIRCMessage(circ)) + + msg, err := coord.ConsumeCIRCMessage(xtID, ChainKeyBytes(chainB)) + require.NoError(t, err) + assert.Equal(t, circ, msg) + + _, err = coord.ConsumeCIRCMessage(xtID, ChainKeyBytes(chainB)) + require.Error(t, err) + assert.Contains(t, err.Error(), "no messages available") + + err = coord.RecordCIRCMessage(&pb.CIRCMessage{ + SourceChain: []byte{0xff}, + XtId: xtID, + Data: [][]byte{[]byte("payload")}, + }) + require.Error(t, err) + assert.Contains(t, err.Error(), "not participating") +} + +func TestCoordinatorRecordDecisionFollower(t *testing.T) { + coord := newTestCoordinator(t, Follower) + + chainA := []byte{0x01} + chainB := []byte{0x02} + xtReq := buildXTRequest(chainA, chainB) + require.NoError(t, coord.StartTransaction(context.Background(), "sp", xtReq)) + + xtID, err := xtReq.XtID() + require.NoError(t, err) + + decisionCh := make(chan bool, 1) + coord.SetDecisionCallback(func(ctx context.Context, id *pb.XtID, decision bool) error { + if id.Hex() == xtID.Hex() { + decisionCh <- decision + } + return nil + }) + + require.NoError(t, coord.RecordDecision(xtID, true)) + + select { + case decision := <-decisionCh: + assert.True(t, decision) + case <-time.After(time.Second): + t.Fatal("expected decision callback for follower") + } + + state, ok := coord.GetState(xtID) + require.True(t, ok) + assert.Equal(t, StateCommit, state.GetDecision()) +} + +func TestCoordinatorRecordDecisionRejectedForLeader(t *testing.T) { + coord := newTestCoordinator(t, Leader) + + chainA := []byte{0x01} + chainB := []byte{0x02} + xtReq := buildXTRequest(chainA, chainB) + require.NoError(t, coord.StartTransaction(context.Background(), "sp", xtReq)) + + xtID, err := xtReq.XtID() + require.NoError(t, err) + + err = coord.RecordDecision(xtID, true) + require.Error(t, err) + assert.Contains(t, err.Error(), "only followers can record decisions") + + state, ok := coord.GetState(xtID) + require.True(t, ok) + assert.Equal(t, StateUndecided, state.GetDecision()) +} diff --git a/internal/rollup-shared-publisher/x/consensus/interfaces.go b/internal/rollup-shared-publisher/x/consensus/interfaces.go index 3995d5be01..e9bf604672 100644 --- a/internal/rollup-shared-publisher/x/consensus/interfaces.go +++ b/internal/rollup-shared-publisher/x/consensus/interfaces.go @@ -50,18 +50,16 @@ type BlockFn func(ctx context.Context, block *types.Block, xtIDs []*pb.XtID) err // Config holds coordinator configuration type Config struct { - NodeID string - IsLeader bool - Timeout time.Duration - Role Role + NodeID string + Timeout time.Duration + Role Role } // DefaultConfig returns sensible defaults func DefaultConfig(nodeID string) Config { return Config{ - NodeID: nodeID, - IsLeader: true, - Timeout: time.Minute, - Role: Leader, + NodeID: nodeID, + Timeout: 4 * time.Second, + Role: Leader, } } diff --git a/internal/rollup-shared-publisher/x/consensus/protocol_handler.go b/internal/rollup-shared-publisher/x/consensus/protocol_handler.go index 70ef81688c..9c907a97e1 100644 --- a/internal/rollup-shared-publisher/x/consensus/protocol_handler.go +++ b/internal/rollup-shared-publisher/x/consensus/protocol_handler.go @@ -8,7 +8,6 @@ import ( "github.com/rs/zerolog" ) -// ProtocolHandler defines the interface for SCP protocol message handling // ProtocolHandler defines the interface for SCP protocol message handling type ProtocolHandler interface { // Handle processes SCP protocol messages diff --git a/internal/rollup-shared-publisher/x/consensus/protocol_handler_test.go b/internal/rollup-shared-publisher/x/consensus/protocol_handler_test.go new file mode 100644 index 0000000000..3d2337c96b --- /dev/null +++ b/internal/rollup-shared-publisher/x/consensus/protocol_handler_test.go @@ -0,0 +1,359 @@ +package consensus + +import ( + "context" + "errors" + "io" + "testing" + + "github.com/ethereum/go-ethereum/core/types" + pb "github.com/ethereum/go-ethereum/internal/rollup-shared-publisher/proto/rollup/v1" + "github.com/rs/zerolog" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type contextKey string + +func TestProtocolHandlerHandleXTRequest(t *testing.T) { + t.Parallel() + + t.Run("success", func(t *testing.T) { + t.Parallel() + + stub := newStubCoordinator(t) + handler := NewProtocolHandler(stub, zerolog.New(io.Discard)) + xtReq := buildXTRequest([]byte{0x01}) + ctx := context.WithValue(context.Background(), contextKey("test"), "value") + msg := &pb.Message{ + Payload: &pb.Message_XtRequest{XtRequest: xtReq}, + } + + err := handler.Handle(ctx, "sp", msg) + require.NoError(t, err) + + require.Len(t, stub.startCalls, 1) + call := stub.startCalls[0] + assert.Same(t, ctx, call.ctx) + assert.Equal(t, "sp", call.from) + assert.Equal(t, xtReq, call.req) + }) + + t.Run("propagates error", func(t *testing.T) { + t.Parallel() + + stub := newStubCoordinator(t) + stub.startErr = errors.New("start failed") + handler := NewProtocolHandler(stub, zerolog.New(io.Discard)) + xtReq := buildXTRequest([]byte{0x02}) + msg := &pb.Message{ + Payload: &pb.Message_XtRequest{XtRequest: xtReq}, + } + + err := handler.Handle(context.Background(), "sp", msg) + require.ErrorIs(t, err, stub.startErr) + require.Len(t, stub.startCalls, 1) + }) +} + +func TestProtocolHandlerHandleVote(t *testing.T) { + t.Parallel() + + t.Run("success", func(t *testing.T) { + t.Parallel() + + stub := newStubCoordinator(t) + stub.voteDecision = StateCommit + handler := NewProtocolHandler(stub, zerolog.New(io.Discard)) + + xtID := &pb.XtID{Hash: []byte{0xaa}} + vote := &pb.Vote{ + SenderChainId: []byte{0x0a}, + XtId: xtID, + Vote: true, + } + msg := &pb.Message{ + Payload: &pb.Message_Vote{Vote: vote}, + } + + err := handler.Handle(context.Background(), "peer", msg) + require.NoError(t, err) + + require.Len(t, stub.voteCalls, 1) + call := stub.voteCalls[0] + assert.Equal(t, xtID, call.xtID) + assert.Equal(t, ChainKeyBytes(vote.SenderChainId), call.chainID) + assert.True(t, call.vote) + }) + + t.Run("propagates error", func(t *testing.T) { + t.Parallel() + + stub := newStubCoordinator(t) + stub.voteErr = errors.New("vote failed") + handler := NewProtocolHandler(stub, zerolog.New(io.Discard)) + + msg := &pb.Message{ + Payload: &pb.Message_Vote{ + Vote: &pb.Vote{ + SenderChainId: []byte{0x01}, + XtId: &pb.XtID{Hash: []byte{0x01}}, + Vote: false, + }, + }, + } + + err := handler.Handle(context.Background(), "peer", msg) + require.ErrorIs(t, err, stub.voteErr) + require.Len(t, stub.voteCalls, 1) + }) +} + +func TestProtocolHandlerHandleDecided(t *testing.T) { + t.Parallel() + + t.Run("success", func(t *testing.T) { + t.Parallel() + + stub := newStubCoordinator(t) + handler := NewProtocolHandler(stub, zerolog.New(io.Discard)) + + decided := &pb.Decided{ + XtId: &pb.XtID{Hash: []byte{0x0b}}, + Decision: true, + } + msg := &pb.Message{ + Payload: &pb.Message_Decided{Decided: decided}, + } + + err := handler.Handle(context.Background(), "sp", msg) + require.NoError(t, err) + + require.Len(t, stub.decisionCalls, 1) + call := stub.decisionCalls[0] + assert.Equal(t, decided.XtId, call.xtID) + assert.Equal(t, decided.Decision, call.decision) + }) + + t.Run("propagates error", func(t *testing.T) { + t.Parallel() + + stub := newStubCoordinator(t) + stub.decisionErr = errors.New("decision failed") + handler := NewProtocolHandler(stub, zerolog.New(io.Discard)) + + msg := &pb.Message{ + Payload: &pb.Message_Decided{ + Decided: &pb.Decided{ + XtId: &pb.XtID{Hash: []byte{0x0c}}, + Decision: false, + }, + }, + } + + err := handler.Handle(context.Background(), "sp", msg) + require.ErrorIs(t, err, stub.decisionErr) + require.Len(t, stub.decisionCalls, 1) + }) +} + +func TestProtocolHandlerHandleCIRCMessage(t *testing.T) { + t.Parallel() + + t.Run("success", func(t *testing.T) { + t.Parallel() + + stub := newStubCoordinator(t) + handler := NewProtocolHandler(stub, zerolog.New(io.Discard)) + + circ := &pb.CIRCMessage{ + SourceChain: []byte{0x01}, + DestinationChain: []byte{0x02}, + XtId: &pb.XtID{Hash: []byte{0x0d}}, + Data: [][]byte{[]byte("payload")}, + } + msg := &pb.Message{ + Payload: &pb.Message_CircMessage{CircMessage: circ}, + } + + err := handler.Handle(context.Background(), "peer", msg) + require.NoError(t, err) + + require.Len(t, stub.circCalls, 1) + assert.Equal(t, circ, stub.circCalls[0].message) + }) + + t.Run("propagates error", func(t *testing.T) { + t.Parallel() + + stub := newStubCoordinator(t) + stub.circErr = errors.New("circ failed") + handler := NewProtocolHandler(stub, zerolog.New(io.Discard)) + + msg := &pb.Message{ + Payload: &pb.Message_CircMessage{ + CircMessage: &pb.CIRCMessage{ + SourceChain: []byte{0x01}, + }, + }, + } + + err := handler.Handle(context.Background(), "peer", msg) + require.ErrorIs(t, err, stub.circErr) + require.Len(t, stub.circCalls, 1) + }) +} + +func TestProtocolHandlerHandleUnknownMessage(t *testing.T) { + t.Parallel() + + stub := newStubCoordinator(t) + handler := NewProtocolHandler(stub, zerolog.New(io.Discard)) + err := handler.Handle(context.Background(), "peer", &pb.Message{}) + require.Error(t, err) +} + +func TestProtocolHandlerCanHandleAndName(t *testing.T) { + t.Parallel() + + stub := newStubCoordinator(t) + handler := NewProtocolHandler(stub, zerolog.New(io.Discard)) + + assert.Equal(t, "SCP", handler.GetProtocolName()) + + assert.False(t, handler.CanHandle(nil)) + assert.False(t, handler.CanHandle(&pb.Message{ + Payload: &pb.Message_HandshakeRequest{HandshakeRequest: &pb.HandshakeRequest{}}, + })) + assert.True(t, handler.CanHandle(&pb.Message{ + Payload: &pb.Message_XtRequest{XtRequest: buildXTRequest([]byte{0x01})}, + })) +} + +type ( + startCall struct { + ctx context.Context + from string + req *pb.XTRequest + } + voteCall struct { + xtID *pb.XtID + chainID string + vote bool + } + decisionCall struct { + xtID *pb.XtID + decision bool + } + circCall struct { + message *pb.CIRCMessage + } +) + +type stubCoordinator struct { + t *testing.T + + startCalls []startCall + startErr error + + voteCalls []voteCall + voteDecision DecisionState + voteErr error + + decisionCalls []decisionCall + decisionErr error + + circCalls []circCall + circErr error +} + +func newStubCoordinator(t *testing.T) *stubCoordinator { + t.Helper() + return &stubCoordinator{ + t: t, + voteDecision: StateUndecided, + } +} + +func (s *stubCoordinator) StartTransaction(ctx context.Context, from string, xtReq *pb.XTRequest) error { + s.startCalls = append(s.startCalls, startCall{ctx: ctx, from: from, req: xtReq}) + return s.startErr +} + +func (s *stubCoordinator) RecordVote(xtID *pb.XtID, chainID string, vote bool) (DecisionState, error) { + s.voteCalls = append(s.voteCalls, voteCall{xtID: xtID, chainID: chainID, vote: vote}) + return s.voteDecision, s.voteErr +} + +func (s *stubCoordinator) RecordDecision(xtID *pb.XtID, decision bool) error { + s.decisionCalls = append(s.decisionCalls, decisionCall{xtID: xtID, decision: decision}) + return s.decisionErr +} + +func (s *stubCoordinator) GetTransactionState(*pb.XtID) (DecisionState, error) { + s.unexpected("GetTransactionState") + return StateUndecided, nil +} + +func (s *stubCoordinator) GetActiveTransactions() []*pb.XtID { + s.unexpected("GetActiveTransactions") + return nil +} + +func (s *stubCoordinator) GetState(*pb.XtID) (*TwoPCState, bool) { + s.unexpected("GetState") + return nil, false +} + +func (s *stubCoordinator) RecordCIRCMessage(circMessage *pb.CIRCMessage) error { + s.circCalls = append(s.circCalls, circCall{message: circMessage}) + return s.circErr +} + +func (s *stubCoordinator) ConsumeCIRCMessage(*pb.XtID, string) (*pb.CIRCMessage, error) { + s.unexpected("ConsumeCIRCMessage") + return nil, nil +} + +func (s *stubCoordinator) SetStartCallback(StartFn) { + s.unexpected("SetStartCallback") +} + +func (s *stubCoordinator) SetVoteCallback(VoteFn) { + s.unexpected("SetVoteCallback") +} + +func (s *stubCoordinator) SetDecisionCallback(DecisionFn) { + s.unexpected("SetDecisionCallback") +} + +func (s *stubCoordinator) SetBlockCallback(BlockFn) { + s.unexpected("SetBlockCallback") +} + +func (s *stubCoordinator) OnBlockCommitted(context.Context, *types.Block) error { + s.unexpected("OnBlockCommitted") + return nil +} + +func (s *stubCoordinator) OnL2BlockCommitted(context.Context, *pb.L2Block) error { + s.unexpected("OnL2BlockCommitted") + return nil +} + +func (s *stubCoordinator) Start(context.Context) error { + s.unexpected("Start") + return nil +} + +func (s *stubCoordinator) Stop(context.Context) error { + s.unexpected("Stop") + return nil +} + +func (s *stubCoordinator) unexpected(method string) { + if s.t != nil { + s.t.Helper() + s.t.Fatalf("unexpected call to %s", method) + } +} diff --git a/internal/rollup-shared-publisher/x/consensus/state_manager_test.go b/internal/rollup-shared-publisher/x/consensus/state_manager_test.go new file mode 100644 index 0000000000..e2faffab8a --- /dev/null +++ b/internal/rollup-shared-publisher/x/consensus/state_manager_test.go @@ -0,0 +1,56 @@ +package consensus + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestStateManagerAddGetRemove(t *testing.T) { + sm := NewStateManager() + defer sm.Shutdown() + + xtReq := buildXTRequest([]byte{0x01}) + xtID, err := xtReq.XtID() + require.NoError(t, err) + + state, err := sm.AddState(xtID, xtReq, xtReq.ChainIDs()) + require.NoError(t, err) + require.NotNil(t, state) + + retrieved, ok := sm.GetState(xtID) + require.True(t, ok) + require.Equal(t, state, retrieved) + + _, err = sm.AddState(xtID, xtReq, xtReq.ChainIDs()) + require.Error(t, err) + + active := sm.GetAllActiveIDs() + assert.Len(t, active, 1) + + sm.RemoveState(xtID) + _, ok = sm.GetState(xtID) + assert.False(t, ok) +} + +func TestStateManagerCleanupRemovesCompletedStates(t *testing.T) { + sm := NewStateManager() + defer sm.Shutdown() + + xtReq := buildXTRequest([]byte{0x01}) + xtID, err := xtReq.XtID() + require.NoError(t, err) + + state, err := sm.AddState(xtID, xtReq, xtReq.ChainIDs()) + require.NoError(t, err) + + state.SetDecision(StateCommit) + state.StartTime = time.Now().Add(-11 * time.Minute) + + sm.cleanup() + + _, ok := sm.GetState(xtID) + assert.False(t, ok, "cleanup should remove old committed state") +} diff --git a/internal/rollup-shared-publisher/x/consensus/test_helpers_test.go b/internal/rollup-shared-publisher/x/consensus/test_helpers_test.go new file mode 100644 index 0000000000..adae598100 --- /dev/null +++ b/internal/rollup-shared-publisher/x/consensus/test_helpers_test.go @@ -0,0 +1,51 @@ +package consensus + +import ( + "context" + "io" + "testing" + "time" + + pb "github.com/ethereum/go-ethereum/internal/rollup-shared-publisher/proto/rollup/v1" + "github.com/rs/zerolog" +) + +func newTestCoordinator(t *testing.T, role Role) *coordinator { + t.Helper() + + logger := zerolog.New(io.Discard) + cfg := Config{ + NodeID: "test-node", + Timeout: 50 * time.Millisecond, + Role: role, + } + + coordIface := NewWithMetrics(logger, cfg, NewNoOpMetrics()) + coord := coordIface.(*coordinator) + + t.Cleanup(func() { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + if err := coord.Stop(ctx); err != nil { + t.Fatalf("failed to stop coordinator: %v", err) + } + }) + + return coord +} + +func buildXTRequest(chainIDs ...[]byte) *pb.XTRequest { + txs := make([]*pb.TransactionRequest, 0, len(chainIDs)) + for idx, id := range chainIDs { + if len(id) == 0 { + continue + } + chainCopy := append([]byte(nil), id...) + txs = append(txs, &pb.TransactionRequest{ + ChainId: chainCopy, + Transaction: [][]byte{[]byte{byte(idx)}}, + }) + } + return &pb.XTRequest{Transactions: txs} +} diff --git a/internal/rollup-shared-publisher/x/consensus/twopc_state_test.go b/internal/rollup-shared-publisher/x/consensus/twopc_state_test.go new file mode 100644 index 0000000000..a997af87de --- /dev/null +++ b/internal/rollup-shared-publisher/x/consensus/twopc_state_test.go @@ -0,0 +1,131 @@ +package consensus + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func newTestTwoPCState(t *testing.T) (*TwoPCState, map[string]struct{}) { + t.Helper() + + chainA := []byte{0x01} + chainB := []byte{0x02} + + xtReq := buildXTRequest(chainA, chainB) + xtID, err := xtReq.XtID() + require.NoError(t, err) + + chains := map[string]struct{}{ + ChainKeyBytes(chainA): {}, + ChainKeyBytes(chainB): {}, + } + + state := NewTwoPCState(xtID, xtReq, chains) + return state, chains +} + +func TestNewTwoPCStateInitializesFields(t *testing.T) { + t.Parallel() + + state, chains := newTestTwoPCState(t) + + assert.NotNil(t, state.XTID) + assert.Equal(t, StateUndecided, state.GetDecision()) + assert.Equal(t, len(chains), state.GetParticipantCount()) + assert.Equal(t, 0, state.GetVoteCount()) + assert.NotNil(t, state.XTRequest) + assert.NotNil(t, state.CIRCMessages) + assert.Nil(t, state.Timer) + assert.False(t, state.StartTime.IsZero()) +} + +func TestTwoPCStateAddVote(t *testing.T) { + t.Parallel() + + state, chains := newTestTwoPCState(t) + // Get frist chain key + var chain string + for k := range chains { + chain = k + break + } + + added := state.AddVote(chain, true) + assert.True(t, added) + assert.Equal(t, 1, state.GetVoteCount()) + + readded := state.AddVote(chain, false) + assert.False(t, readded) + + votes := state.GetVotes() + assert.Equal(t, true, votes[chain]) +} + +func TestTwoPCStateDecisionFlow(t *testing.T) { + t.Parallel() + + state, _ := newTestTwoPCState(t) + + assert.False(t, state.IsComplete()) + assert.Equal(t, StateUndecided, state.GetDecision()) + + state.SetDecision(StateCommit) + + assert.True(t, state.IsComplete()) + assert.Equal(t, StateCommit, state.GetDecision()) +} + +func TestTwoPCStateGetVotesReturnsCopy(t *testing.T) { + t.Parallel() + + state, chains := newTestTwoPCState(t) + + for chain := range chains { + require.True(t, state.AddVote(chain, true)) + } + + clone := state.GetVotes() + for k := range clone { + clone[k] = false + } + + original := state.GetVotes() + for chain := range chains { + assert.True(t, original[chain]) + } +} + +func TestTwoPCStateGetDurationIncreases(t *testing.T) { + t.Parallel() + + state, _ := newTestTwoPCState(t) + initial := state.GetDuration() + assert.GreaterOrEqual(t, int64(initial), int64(0)) + + time.Sleep(5 * time.Millisecond) + + later := state.GetDuration() + assert.Greater(t, later, initial) +} + +func TestTwoPCState2VotesDoesNotMeanComplete(t *testing.T) { + t.Parallel() + + state, chains := newTestTwoPCState(t) + + count := 0 + for chain := range chains { + require.True(t, state.AddVote(chain, true)) + count++ + if count < state.GetParticipantCount() { + assert.False(t, state.IsComplete()) + } + } + + assert.Equal(t, state.GetParticipantCount(), state.GetVoteCount()) + assert.False(t, state.IsComplete(), "adding votes should not set completion") + assert.Equal(t, StateUndecided, state.GetDecision()) // decision should remain undecided +} diff --git a/internal/rollup-shared-publisher/x/consensus/types.go b/internal/rollup-shared-publisher/x/consensus/types.go index 49a8782d26..1146b02cd8 100644 --- a/internal/rollup-shared-publisher/x/consensus/types.go +++ b/internal/rollup-shared-publisher/x/consensus/types.go @@ -59,6 +59,9 @@ func (s DecisionState) String() string { } // TwoPCState holds state for a single cross-chain transaction +// Important: it doesn't automatically set the decision state based on votes; +// that logic must be handled externally to allow for custom decision policies. +// TODO: we should add the decision logic here as well, making it a complete 2PC state manager. type TwoPCState struct { mu sync.RWMutex XTID *pb.XtID diff --git a/internal/rollup-shared-publisher/x/superblock/sequencer/bootstrap/bootstrap.go b/internal/rollup-shared-publisher/x/superblock/sequencer/bootstrap/bootstrap.go index a006e1d805..3806075328 100644 --- a/internal/rollup-shared-publisher/x/superblock/sequencer/bootstrap/bootstrap.go +++ b/internal/rollup-shared-publisher/x/superblock/sequencer/bootstrap/bootstrap.go @@ -77,7 +77,6 @@ func Setup(ctx context.Context, cfg Config) (*Runtime, error) { nodeID := fmt.Sprintf("sequencer-%d", time.Now().UnixNano()) c := consensus.DefaultConfig(nodeID) c.Role = consensus.Follower - c.IsLeader = false c.Timeout = time.Minute base = consensus.New(log, c) } diff --git a/internal/rollup-shared-publisher/x/superblock/sequencer/coordinator.go b/internal/rollup-shared-publisher/x/superblock/sequencer/coordinator.go index f6e0fe217b..b8743a66a2 100644 --- a/internal/rollup-shared-publisher/x/superblock/sequencer/coordinator.go +++ b/internal/rollup-shared-publisher/x/superblock/sequencer/coordinator.go @@ -363,6 +363,8 @@ func (sc *SequencerCoordinator) handleStartSC( if lastSeq, ok := sc.scpIntegration.GetLastDecidedSequenceNumber(); ok { requiredSeq = lastSeq + 1 } + // TODO startSC.XtSequenceNumber doesn't need to be prev+1 + // check should be: startSC.XtSequenceNumber > next (= last+1) if startSC.XtSequenceNumber != requiredSeq { sc.log.Warn(). Uint64("got_seq", startSC.XtSequenceNumber). @@ -395,6 +397,9 @@ func (sc *SequencerCoordinator) handleStartSC( // Extract our transactions myTxs := sc.extractMyTransactions(startSC.XtRequest) + // TODO: set default vote result to be false + // if len(myTxs) == 0, set to true + // but if sc.callbacks.SimulateAndVote != nil, set to false var voteResult = true if sc.callbacks.SimulateAndVote != nil && len(myTxs) > 0 { From d4e990e1e8f66e480ba3438b84628c52c2e69d2b Mon Sep 17 00:00:00 2001 From: MatheusFranco99 <48058141+MatheusFranco99@users.noreply.github.com> Date: Fri, 24 Oct 2025 08:47:04 +0100 Subject: [PATCH 4/5] Add modules documentation --- .../x/superblock/protocol/README.md | 62 ++++++++++++ .../x/superblock/slot/README.md | 99 +++++++++++++++++++ 2 files changed, 161 insertions(+) create mode 100644 internal/rollup-shared-publisher/x/superblock/protocol/README.md create mode 100644 internal/rollup-shared-publisher/x/superblock/slot/README.md diff --git a/internal/rollup-shared-publisher/x/superblock/protocol/README.md b/internal/rollup-shared-publisher/x/superblock/protocol/README.md new file mode 100644 index 0000000000..9b35f3818b --- /dev/null +++ b/internal/rollup-shared-publisher/x/superblock/protocol/README.md @@ -0,0 +1,62 @@ +# Superblock Protocol Module + +This package hands the interface for SBCP, with messages with callbacks and validation functions. + +## Documentation + +```mermaid +classDiagram + direction LR + + class handler { + -messageHandler: MessageHandler + -validator: Validator + +Handle(ctx, from, msg) error + +CanHandle(msg) bool + +GetProtocolName() string + } + + class MessageHandler { + <> + +HandleStartSlot(ctx, from, StartSlot) error + +HandleRequestSeal(ctx, from, RequestSeal) error + +HandleL2Block(ctx, from, L2Block) error + +HandleStartSC(ctx, from, StartSC) error + +HandleRollBackAndStartSlot(ctx, from, RollBackAndStartSlot) error + } + + class Validator { + +ValidateStartSlot(StartSlot) error + +ValidateRequestSeal(RequestSeal) error + +ValidateL2Block(L2Block) error + +ValidateStartSC(StartSC) error + +ValidateRollBackAndStartSlot(RollBackAndStartSlot) error + } + + class MessageType { + <> + MsgStartSlot + MsgRequestSeal + MsgL2Block + MsgStartSC + MsgRollBackAndStartSlot + } + + + handler --> MessageHandler : Processes with + handler --> Validator : Validates with + MessageType --> handler : Msgs sent to + +``` + +### MessageType (types.go) +Enum describing of the SBCP message types (start slot, request seal, L2 block submission, start SC, and rollback). + +### Validator (interface and `basicValidator` implementation) +Validation functions of SBCP messages. + +### MessageHandler (interface) +Holds callbacks for each SBCP message. + +### Handler (interface and `handler` implementation) +Holds a validator and a message handler. Receives messages by validating and then processing them. diff --git a/internal/rollup-shared-publisher/x/superblock/slot/README.md b/internal/rollup-shared-publisher/x/superblock/slot/README.md new file mode 100644 index 0000000000..03ac9e9b40 --- /dev/null +++ b/internal/rollup-shared-publisher/x/superblock/slot/README.md @@ -0,0 +1,99 @@ +# SBCP State Transition (Slot Module) + +> [!WARNING] +> Probably dead code besides `Config`. + +This package models the SBCP state transitions. + +## Documentation + +```mermaid +classDiagram + direction LR + + class Config { + <> + Duration: time.Duration + SealCutover: float64 + GenesisTime: time.Time + } + + class Manager { + -genesisTime: time.Time + -slotDuration: time.Duration + -sealCutoverFraction: float64 + +GetCurrentSlot() uint64 + +GetSlotStartTime(slot) time.Time + +GetSlotProgress() float64 + +IsSlotSealTime() bool + +WaitForNextSlot(ctx) error + +GetSealTime(slot) time.Time + +GetSlotEndTime(slot) time.Time + } + + class State { + <> + StateStarting + StateFree + StateLocked + StateSealing + } + + class StateMachine { + -currentState: State + -currentSlot: uint64 + -slotManager: Manager + -receivedL2Blocks: map[string]pb.L2Block + -scpInstances: map[string]SCPInstance + -l2BlockRequests: map[string]pb.L2BlockRequest + -lastHeads: map[string]pb.L2Block + -stateChangeCallbacks: map[State][]StateChangeCallback + -transitionHistory: []StateTransition + +BeginSlot(...) + +StartSCP(...) + +ProcessSCPDecision(...) + +RequestSeal(...) + +ReceiveL2Block(...) + +RegisterStateChangeCallback(state, cb) + +TransitionTo(newState, reason) + } + + StateMachine --> State : Upon calls, transitions through + StateMachine --> Manager : Holds, but unused + +``` + +### Config +Defines slot timing parameters: `Duration`, `SealCutover`, and optional `GenesisTime`. Default sets slot to 6 seconds. + +### Manager +Holds slot timings utils. +Namely, it reports the current slot, +time remaining until seal or its end, +provides a function for waiting for the next slot, +and provides other deterministic timestamps based on genesis. + +### State +Enumerates the lifecycle states: `StateStarting`, `StateFree`, `StateLocked`, and `StateSealing` + +### StateChangeCallback +Observers interested in state transitions. +Whenever the state changes to `S`, the callbacks associated to `S`are invoked. + +### StateTransition (state_machine.go) +Historical record describing each transition, including timestamp and human-readable reason. The state machine maintains these entries for diagnostics. + +### StateMachine + +Manages the state progress throughout the slot. Holds funcions such as: +- `BeginSlot`: resetting the state for the next slot. +- `StartSCP`: updating the state with the new instance, if free. +- `ProcessSCPDecision`: switching to the `StateLocked` state. +- `RequestSeal`: sets it to sealing state. +- `ReceiveL2Blocks`: updates local state with new blocks. + +Also holds a general `TransitionTo` function, which triggers the callbacks for that state, for external callers to invoke. + + +### SCPInstance +Metadata about an SCP instanced with XT identifier, slot, participants, vote map, and decision. From d0aad5c288b180183e7b701af2da30184de48119 Mon Sep 17 00:00:00 2001 From: Julien Harbulot Date: Mon, 3 Nov 2025 09:28:36 +0400 Subject: [PATCH 5/5] Revert timeout to default value --- .../rollup-shared-publisher/x/consensus/interfaces.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/internal/rollup-shared-publisher/x/consensus/interfaces.go b/internal/rollup-shared-publisher/x/consensus/interfaces.go index e9bf604672..a77d769db1 100644 --- a/internal/rollup-shared-publisher/x/consensus/interfaces.go +++ b/internal/rollup-shared-publisher/x/consensus/interfaces.go @@ -57,9 +57,9 @@ type Config struct { // DefaultConfig returns sensible defaults func DefaultConfig(nodeID string) Config { - return Config{ - NodeID: nodeID, - Timeout: 4 * time.Second, - Role: Leader, - } + return Config{ + NodeID: nodeID, + Timeout: time.Minute, + Role: Leader, + } }