Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 143 additions & 0 deletions internal/rollup-shared-publisher/x/consensus/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
# Consensus Module

This package contains the consensus coordinator for the SCP.
It includes:
- `TwoPCState` for holding votes and decision state (NOTE: it, however, doesn't perform the 2pc decision logic, i.e. the external caller is responsible for setting the decision).
- `ProtocolHandler`: for receiving messages and routing them to the appropriate coordinator call.
- `Coordinator`: for routing calls to the callback manager (`startFn`, `voteFn`, `decisionFn`, `blockFn`) and managing the 2PC logic.
- `CallbackManager`: for managing the callback functions and invoking them with timeouts.

## Documentation

```mermaid
classDiagram
direction LR

class ProtocolHandler {
Handle(ctx, from, msg)
}
class Coordinator {
StartTransaction(...)
RecordVote(...)
RecordDecision(...)
RecordCIRCMessage(...)
ConsumeCIRCMessage(...)
OnBlockCommitted(...)
OnL2BlockCommitted(...)
}
class CallbackManager {
startFn
voteFn
decisionFn
blockFn
}
class StateManager {
AddState(...)
GetState(...)
RemoveState(...)
}
class TwoPCState {
XTID
Decision
Votes
CIRCMessages
}

ProtocolHandler --> Coordinator : invokes fn by msg
Coordinator --> CallbackManager : routes callbacks
Coordinator --> StateManager : manage 2PC logic
StateManager o--> TwoPCState : owns instances

```

### Role
Identifies whether a coordinator instance runs as a `Follower` or `Leader`.

### DecisionState
`DecisionState` tracks the lifecycle of a 2PC transaction: `StateUndecided`, `StateCommit`, `StateAbort`.

### MessageType
`MessageType` enumerates the protobuf payloads understood by the protocol handler: `MsgXTRequest`, `MsgVote`, `MsgDecided`, and `MsgCIRCMessage`.

### Config
`Config` bundles static settings for a coordinator:
- a unique `NodeID`
- consensus `Timeout`
- `Role`

### TwoPCState
`TwoPCState` carries all data for a single cross-rollup transaction:
- `XTID`
- `Decision`
- participating chains
- collected votes (chain id -> bool)
- optional timeout `Timer`
- start timestamp
- original xt request payload
- queued CIRC messages

> [!NOTE]
> While the structure stores votes and the decision state, it doesn't implement the 2PC logic, i.e.:
> i) timeout triggers Abort decision
> ii) any vote false triggers Abort decision
> iii) all votes true triggers Commit decision
> Instead, the external caller is responsible for setting the decision via `SetDecision`.

### ProtocolHandler
`ProtocolHandler` forwards protocol messages to the appropriate coordinator method.

### Coordinator
`Coordinator` is invoked by the `ProtocolHandler.
It handles transaction start, vote/decision recording, CIRC buffering, block notifications, and lifecycle management.

It holds a `StateManager` for managing multiple `TwoPCState` instances while managing the 2PC logic, and
routes calls to the callback manager (`startFn`, `voteFn`, `decisionFn`, `blockFn`).

### StateManager
`StateManager` manages several `TwoPCState`.

### CallbackManager
`CallbackManager` stores the registered callbacks (`StartFn`, `VoteFn`, `DecisionFn`, and `BlockFn`)
and enforces timeouts when invoking them.

### Chain key helpers
Normalize chain identifiers into the hex-encoded keys used in state maps and logs.

## Tests

For running tests, use
```bash
go test -v ./...
```

- coordinator_test.go
- `TestCoordinatorStartTransactionRegistersStateAndCallback`
- `TestCoordinatorRecordVoteCommitTriggersDecision`
- `TestCoordinatorRecordVoteAbort`
- `TestCoordinatorRecordVoteRejectsUnknownParticipant`
- `TestCoordinatorRecordCIRCMessageAndConsume`
- `TestCoordinatorRecordDecisionFollower`
- `TestCoordinatorRecordDecisionRejectedForLeader`
- keys_test.go
- `TestChainKeyBytes`
- `TestChainKeyUint64`
- `TestChainKeyConsistency`
- `TestChainKeyZeroSpecialCase`
- `TestChainKeyRoundTrip`
- protocol_handler_test.go
- `TestProtocolHandlerHandleXTRequest`
- `TestProtocolHandlerHandleVote`
- `TestProtocolHandlerHandleDecided`
- `TestProtocolHandlerHandleCIRCMessage`
- `TestProtocolHandlerHandleUnknownMessage`
- `TestProtocolHandlerCanHandleAndName`
- state_manager_test.go
- `TestStateManagerAddGetRemove`
- `TestStateManagerCleanupRemovesCompletedStates`
- twopc_state_test.go
- `TestNewTwoPCStateInitializesFields`
- `TestTwoPCStateAddVote`
- `TestTwoPCStateDecisionFlow`
- `TestTwoPCStateGetVotesReturnsCopy`
- `TestTwoPCStateGetDurationIncreases`
- `TestTwoPCState2VotesDoesNotMeanComplete`
6 changes: 6 additions & 0 deletions internal/rollup-shared-publisher/x/consensus/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ func (c *coordinator) StartTransaction(ctx context.Context, from string, xtReq *
}

// Timeout only for leader; followers rely on the SP decision
// TODO: followers (sequencers) should also have timeouts, with the logic that:
// if a vote hasn't yet been sent, send it with vote = false
if c.config.Role == Leader {
state.Timer = time.AfterFunc(c.config.Timeout, func() {
c.handleTimeout(xtID)
Expand Down Expand Up @@ -208,6 +210,10 @@ func (c *coordinator) RecordVote(xtID *pb.XtID, chainID string, vote bool) (Deci
return c.handleCommit(xtID, state), nil
}
} else {
// TODO: remove this logic. A sequencer receiving a vote from another sequencer doesn't imply
// that is should broadcast its own vote.
// Or is this function used by the sequencer to ONLY record its own vote?

// Follower broadcasts vote
c.callbackMgr.InvokeVote(xtID, vote, voteLatency)
}
Expand Down
231 changes: 231 additions & 0 deletions internal/rollup-shared-publisher/x/consensus/coordinator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
package consensus

import (
"context"
"testing"
"time"

pb "github.com/ethereum/go-ethereum/internal/rollup-shared-publisher/proto/rollup/v1"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestCoordinatorStartTransactionRegistersStateAndCallback(t *testing.T) {
coord := newTestCoordinator(t, Leader)

chainA := []byte{0x01}
chainB := []byte{0x02}
xtReq := buildXTRequest(chainA, chainB)

startCalled := make(chan struct{}, 1)
coord.SetStartCallback(func(ctx context.Context, from string, req *pb.XTRequest) error {
startCalled <- struct{}{}
return nil
})

require.NoError(t, coord.StartTransaction(context.Background(), "sp", xtReq))

select {
case <-startCalled:
case <-time.After(time.Second):
t.Fatal("expected start callback invocation")
}

xtID, err := xtReq.XtID()
require.NoError(t, err)

state, ok := coord.GetState(xtID)
require.True(t, ok)

assert.Equal(t, StateUndecided, state.GetDecision())
assert.Len(t, state.ParticipatingChains, 2)
assert.NotNil(t, state.Timer, "leader should arm timeout timer")
}

func TestCoordinatorRecordVoteCommitTriggersDecision(t *testing.T) {
coord := newTestCoordinator(t, Leader)

chainA := []byte{0x01}
chainB := []byte{0x02}
chainAKey := ChainKeyBytes(chainA)
chainBKey := ChainKeyBytes(chainB)

xtReq := buildXTRequest(chainA, chainB)
require.NoError(t, coord.StartTransaction(context.Background(), "sp", xtReq))

xtID, err := xtReq.XtID()
require.NoError(t, err)

decisionCh := make(chan bool, 1)
coord.SetDecisionCallback(func(ctx context.Context, id *pb.XtID, decision bool) error {
if id.Hex() == xtID.Hex() {
decisionCh <- decision
}
return nil
})

state, ok := coord.GetState(xtID)
require.True(t, ok)

result, err := coord.RecordVote(xtID, chainAKey, true)
require.NoError(t, err)
assert.Equal(t, StateUndecided, result)

result, err = coord.RecordVote(xtID, chainBKey, true)
require.NoError(t, err)
assert.Equal(t, StateCommit, result)

select {
case decision := <-decisionCh:
assert.True(t, decision)
case <-time.After(time.Second):
t.Fatal("expected decision callback for commit")
}

assert.Equal(t, StateCommit, state.GetDecision())
assert.Len(t, state.Votes, 2)
}

func TestCoordinatorRecordVoteAbort(t *testing.T) {
coord := newTestCoordinator(t, Leader)

chainA := []byte{0x01}
chainB := []byte{0x02}
chainAKey := ChainKeyBytes(chainA)

xtReq := buildXTRequest(chainA, chainB)
require.NoError(t, coord.StartTransaction(context.Background(), "sp", xtReq))

xtID, err := xtReq.XtID()
require.NoError(t, err)

decisionCh := make(chan bool, 1)
coord.SetDecisionCallback(func(ctx context.Context, id *pb.XtID, decision bool) error {
if id.Hex() == xtID.Hex() {
decisionCh <- decision
}
return nil
})

state, ok := coord.GetState(xtID)
require.True(t, ok)

result, err := coord.RecordVote(xtID, chainAKey, false)
require.NoError(t, err)
assert.Equal(t, StateAbort, result)

select {
case decision := <-decisionCh:
assert.False(t, decision)
case <-time.After(time.Second):
t.Fatal("expected decision callback for abort")
}

assert.Equal(t, StateAbort, state.GetDecision())
}

func TestCoordinatorRecordVoteRejectsUnknownParticipant(t *testing.T) {
coord := newTestCoordinator(t, Leader)

chainA := []byte{0x01}
chainB := []byte{0x02}
xtReq := buildXTRequest(chainA, chainB)
require.NoError(t, coord.StartTransaction(context.Background(), "sp", xtReq))

xtID, err := xtReq.XtID()
require.NoError(t, err)

_, err = coord.RecordVote(xtID, "ff", true)
require.Error(t, err)
assert.Contains(t, err.Error(), "not participating")
}

func TestCoordinatorRecordCIRCMessageAndConsume(t *testing.T) {
coord := newTestCoordinator(t, Leader)

chainA := []byte{0x01}
chainB := []byte{0x02}
xtReq := buildXTRequest(chainA, chainB)
require.NoError(t, coord.StartTransaction(context.Background(), "sp", xtReq))

xtID, err := xtReq.XtID()
require.NoError(t, err)

circ := &pb.CIRCMessage{
SourceChain: append([]byte(nil), chainB...),
DestinationChain: append([]byte(nil), chainA...),
XtId: xtID,
Data: [][]byte{[]byte("payload")},
}

require.NoError(t, coord.RecordCIRCMessage(circ))

msg, err := coord.ConsumeCIRCMessage(xtID, ChainKeyBytes(chainB))
require.NoError(t, err)
assert.Equal(t, circ, msg)

_, err = coord.ConsumeCIRCMessage(xtID, ChainKeyBytes(chainB))
require.Error(t, err)
assert.Contains(t, err.Error(), "no messages available")

err = coord.RecordCIRCMessage(&pb.CIRCMessage{
SourceChain: []byte{0xff},
XtId: xtID,
Data: [][]byte{[]byte("payload")},
})
require.Error(t, err)
assert.Contains(t, err.Error(), "not participating")
}

func TestCoordinatorRecordDecisionFollower(t *testing.T) {
coord := newTestCoordinator(t, Follower)

chainA := []byte{0x01}
chainB := []byte{0x02}
xtReq := buildXTRequest(chainA, chainB)
require.NoError(t, coord.StartTransaction(context.Background(), "sp", xtReq))

xtID, err := xtReq.XtID()
require.NoError(t, err)

decisionCh := make(chan bool, 1)
coord.SetDecisionCallback(func(ctx context.Context, id *pb.XtID, decision bool) error {
if id.Hex() == xtID.Hex() {
decisionCh <- decision
}
return nil
})

require.NoError(t, coord.RecordDecision(xtID, true))

select {
case decision := <-decisionCh:
assert.True(t, decision)
case <-time.After(time.Second):
t.Fatal("expected decision callback for follower")
}

state, ok := coord.GetState(xtID)
require.True(t, ok)
assert.Equal(t, StateCommit, state.GetDecision())
}

func TestCoordinatorRecordDecisionRejectedForLeader(t *testing.T) {
coord := newTestCoordinator(t, Leader)

chainA := []byte{0x01}
chainB := []byte{0x02}
xtReq := buildXTRequest(chainA, chainB)
require.NoError(t, coord.StartTransaction(context.Background(), "sp", xtReq))

xtID, err := xtReq.XtID()
require.NoError(t, err)

err = coord.RecordDecision(xtID, true)
require.Error(t, err)
assert.Contains(t, err.Error(), "only followers can record decisions")

state, ok := coord.GetState(xtID)
require.True(t, ok)
assert.Equal(t, StateUndecided, state.GetDecision())
}
Loading