Skip to content

Commit d9cef3f

Browse files
committed
Review feedback
1 parent 4a3f4f7 commit d9cef3f

File tree

10 files changed

+42
-38
lines changed

10 files changed

+42
-38
lines changed

block/internal/common/raft.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99
// RaftNode interface for raft consensus integration
1010
type RaftNode interface {
1111
IsLeader() bool
12-
GetState() *raft.RaftBlockState
12+
GetState() raft.RaftBlockState
1313

1414
Broadcast(ctx context.Context, state *raft.RaftBlockState) error
1515

block/internal/executing/executor.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ func (e *Executor) initializeState() error {
215215

216216
if e.raftNode != nil {
217217
// ensure node is fully synced before producing any blocks
218-
if raftState := e.raftNode.GetState(); raftState != nil && raftState.Height != state.LastBlockHeight {
218+
if raftState := e.raftNode.GetState(); raftState.Height != 0 && raftState.Height != state.LastBlockHeight {
219219
return fmt.Errorf("invalid state: node is not synced with the chain: raft %d != %d state", raftState.Height, state.LastBlockHeight)
220220
}
221221
}
@@ -591,7 +591,7 @@ func (e *Executor) createBlock(ctx context.Context, height uint64, batchData *Ba
591591
}
592592

593593
for i, tx := range batchData.Transactions {
594-
data.Txs[i] = types.Tx(tx)
594+
data.Txs[i] = tx
595595
}
596596

597597
// Set data hash

node/failover.go

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -200,13 +200,12 @@ func (f *failoverState) Run(ctx context.Context) (multiErr error) {
200200
}
201201
}()
202202

203-
defer func() {
204-
if err := f.bc.Stop(); err != nil && !errors.Is(err, context.Canceled) {
205-
multiErr = errors.Join(multiErr, fmt.Errorf("stopping block components: %w", err))
206-
}
207-
}()
208-
209203
go func() {
204+
defer func() {
205+
if err := f.bc.Stop(); err != nil && !errors.Is(err, context.Canceled) {
206+
multiErr = errors.Join(multiErr, fmt.Errorf("stopping block components: %w", err))
207+
}
208+
}()
210209
if err := f.bc.Start(ctx); err != nil && !errors.Is(err, context.Canceled) {
211210
select {
212211
case errChan <- fmt.Errorf("components started with error: %w", err):
@@ -224,8 +223,8 @@ func (f *failoverState) Run(ctx context.Context) (multiErr error) {
224223
return <-errChan
225224
}
226225

227-
func (f *failoverState) IsSynced(s *raft.RaftBlockState) bool {
228-
if s == nil || s.Height == 0 {
226+
func (f *failoverState) IsSynced(s raft.RaftBlockState) bool {
227+
if s.Height == 0 {
229228
return true
230229
}
231230
if f.bc.Syncer != nil {

node/single_sequencer_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,4 +50,5 @@ func TestStartup(t *testing.T) {
5050

5151
// Run the cleanup function from setupTestNodeWithCleanup
5252
cleanup()
53+
time.Sleep(time.Second) // shutdown takes some time to persist caches
5354
}

pkg/raft/election.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"sync/atomic"
99
"time"
1010

11+
"github.com/hashicorp/raft"
1112
"github.com/rs/zerolog"
1213
)
1314

@@ -18,15 +19,15 @@ var ErrLeadershipLost = fmt.Errorf("leader lock lost")
1819
// IsSynced checks whether the component is synced with the given RaftBlockState.
1920
type Runnable interface {
2021
Run(ctx context.Context) error
21-
IsSynced(*RaftBlockState) bool
22+
IsSynced(RaftBlockState) bool
2223
}
2324

2425
type sourceNode interface {
2526
Config() Config
2627
leaderCh() <-chan bool
2728
leaderID() string
2829
NodeID() string
29-
GetState() *RaftBlockState
30+
GetState() RaftBlockState
3031
leadershipTransfer() error
3132
}
3233

@@ -93,7 +94,7 @@ func (d *DynamicLeaderElection) Run(ctx context.Context) error {
9394
time.Sleep(d.node.Config().SendTimeout)
9495
if !runnable.IsSynced(d.node.GetState()) {
9596
d.logger.Info().Msg("became leader, but not synced. Pass on leadership")
96-
if err := d.node.leadershipTransfer(); err != nil {
97+
if err := d.node.leadershipTransfer(); err != nil && !errors.Is(err, raft.ErrNotLeader) {
9798
return err
9899
}
99100
continue

pkg/raft/election_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -109,11 +109,11 @@ func TestDynamicLeaderElectionRun(t *testing.T) {
109109
leaderCh := make(chan bool, 2)
110110
m.EXPECT().leaderCh().Return((<-chan bool)(leaderCh))
111111
m.EXPECT().Config().Return(testCfg())
112-
m.EXPECT().GetState().Return(&RaftBlockState{Height: 1})
112+
m.EXPECT().GetState().Return(RaftBlockState{Height: 1})
113113
m.EXPECT().leadershipTransfer().Return(nil)
114114

115115
fStarted := make(chan struct{})
116-
follower := &testRunnable{startedCh: fStarted, isSyncedFn: func(*RaftBlockState) bool { return false }}
116+
follower := &testRunnable{startedCh: fStarted, isSyncedFn: func(RaftBlockState) bool { return false }}
117117
leader := &testRunnable{runFn: func(ctx context.Context) error {
118118
t.Fatal("leader should not be running")
119119
return nil
@@ -148,7 +148,7 @@ func TestDynamicLeaderElectionRun(t *testing.T) {
148148
m.EXPECT().leaderCh().Return((<-chan bool)(leaderCh))
149149
// On leadership change to true, election will sleep SendTimeout, then check sync against state
150150
m.EXPECT().Config().Return(testCfg())
151-
m.EXPECT().GetState().Return(&RaftBlockState{Height: 1})
151+
m.EXPECT().GetState().Return(RaftBlockState{Height: 1})
152152

153153
fStarted := make(chan struct{})
154154
lStarted := make(chan struct{})
@@ -211,7 +211,7 @@ func testCfg() Config {
211211
// These channels are used only by tests to observe state.
212212
type testRunnable struct {
213213
runFn func(ctx context.Context) error
214-
isSyncedFn func(*RaftBlockState) bool
214+
isSyncedFn func(RaftBlockState) bool
215215
startedCh chan struct{}
216216
doneCh chan struct{}
217217
}
@@ -238,7 +238,7 @@ func (t *testRunnable) Run(ctx context.Context) error {
238238
return ctx.Err()
239239
}
240240

241-
func (t *testRunnable) IsSynced(s *RaftBlockState) bool {
241+
func (t *testRunnable) IsSynced(s RaftBlockState) bool {
242242
if t.isSyncedFn != nil {
243243
return t.isSyncedFn(s)
244244
}

pkg/raft/node.go

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"path/filepath"
1212
"slices"
1313
"strings"
14+
"sync/atomic"
1415
"time"
1516

1617
"github.com/hashicorp/raft"
@@ -47,7 +48,7 @@ type Config struct {
4748
// FSM implements raft.FSM for block state
4849
type FSM struct {
4950
logger zerolog.Logger
50-
state *RaftBlockState
51+
state *atomic.Pointer[RaftBlockState]
5152
applyCh chan<- RaftApplyMsg
5253
}
5354

@@ -63,9 +64,11 @@ func NewNode(cfg *Config, clusterClient clusterClient, logger zerolog.Logger) (*
6364
raftConfig.HeartbeatTimeout = cfg.HeartbeatTimeout
6465
raftConfig.LeaderLeaseTimeout = cfg.HeartbeatTimeout / 2
6566

67+
startPointer := new(atomic.Pointer[RaftBlockState])
68+
startPointer.Store(&RaftBlockState{})
6669
fsm := &FSM{
6770
logger: logger.With().Str("component", "raft-fsm").Logger(),
68-
state: &RaftBlockState{},
71+
state: startPointer,
6972
}
7073

7174
logStore, err := raftboltdb.NewBoltStore(filepath.Join(cfg.RaftDir, "raft-log.db"))
@@ -131,6 +134,7 @@ func (n *Node) Start(ctx context.Context) error {
131134
if n.GetState().Height != 0 {
132135
return nil
133136
}
137+
time.Sleep(time.Second / 10)
134138
}
135139
}
136140
}
@@ -158,7 +162,6 @@ func (n *Node) Start(ctx context.Context) error {
158162
}
159163
n.logger.Info().Msg("bootstrapped raft cluster")
160164
return nil
161-
162165
}
163166

164167
func (n *Node) awaitToBeClusterMember(ctx context.Context, nodeID raft.ServerID) error {
@@ -246,8 +249,8 @@ func (n *Node) Broadcast(_ context.Context, state *RaftBlockState) error {
246249
}
247250

248251
// GetState returns the current replicated state
249-
func (n *Node) GetState() *RaftBlockState {
250-
return n.fsm.state
252+
func (n *Node) GetState() RaftBlockState {
253+
return *n.fsm.state.Load()
251254
}
252255

253256
// AddPeer adds a peer to the raft cluster
@@ -304,10 +307,10 @@ func (f *FSM) Apply(log *raft.Log) interface{} {
304307
f.logger.Error().Err(err).Msg("unmarshal block state")
305308
return err
306309
}
307-
if err := f.state.assertValid(state); err != nil {
310+
if err := f.state.Load().assertValid(state); err != nil {
308311
return err
309312
}
310-
f.state = &state
313+
f.state.Store(&state)
311314
f.logger.Debug().Uint64("height", state.Height).Msg("received block state")
312315

313316
if f.applyCh != nil {
@@ -323,7 +326,7 @@ func (f *FSM) Apply(log *raft.Log) interface{} {
323326

324327
// Snapshot implements raft.FSM
325328
func (f *FSM) Snapshot() (raft.FSMSnapshot, error) {
326-
return &fsmSnapshot{state: f.state}, nil
329+
return &fsmSnapshot{state: *f.state.Load()}, nil
327330
}
328331

329332
// Restore implements raft.FSM
@@ -335,13 +338,13 @@ func (f *FSM) Restore(rc io.ReadCloser) error {
335338
return fmt.Errorf("decode snapshot: %w", err)
336339
}
337340

338-
f.state = &state
341+
f.state.Store(&state)
339342
f.logger.Info().Uint64("height", state.Height).Msg("restored from snapshot")
340343
return nil
341344
}
342345

343346
type fsmSnapshot struct {
344-
state *RaftBlockState
347+
state RaftBlockState
345348
}
346349

347350
func (s *fsmSnapshot) Persist(sink raft.SnapshotSink) error {

pkg/raft/node_mock.go

Lines changed: 6 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/raft/types.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ type RaftBlockState struct {
1212
Data []byte
1313
}
1414

15+
// assertValid checks basic constraints but does not ensure that no gaps exist or chain continuity
1516
func (s RaftBlockState) assertValid(next RaftBlockState) error {
1617
if s.Height > next.Height {
1718
return fmt.Errorf("invalid height: %d > %d", s.Height, next.Height)

pkg/rpc/client/raft_cluster.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"errors"
88
"fmt"
99
"net/http"
10+
"time"
1011
)
1112

1213
// RaftClusterClient is simple http based client for raft cluster management
@@ -70,6 +71,6 @@ func (r RaftClusterClient) broadcast(ctx context.Context, path string, obj any)
7071
func NewRaftClusterClient(peers ...string) (RaftClusterClient, error) {
7172
return RaftClusterClient{
7273
peers: peers,
73-
httpClient: &http.Client{},
74+
httpClient: &http.Client{Timeout: 3 * time.Second},
7475
}, nil
7576
}

0 commit comments

Comments
 (0)