From a3a73f0507ae3def7e9ed42ed5193bc173ff88ec Mon Sep 17 00:00:00 2001 From: theochap Date: Thu, 28 Aug 2025 11:02:59 -0400 Subject: [PATCH 1/6] feat(node/test): improve connection drop tests --- tests/node/common/engine_test.go | 2 +- tests/node/common/sync_ws_test.go | 10 +- tests/node/common/ws.go | 128 --------------------- tests/node/restart/conn_drop_test.go | 68 +++++++++-- tests/node/restart/restart_test.go | 6 +- tests/node/utils/ws.go | 161 +++++++++++++++++++++++++++ 6 files changed, 224 insertions(+), 151 deletions(-) delete mode 100644 tests/node/common/ws.go create mode 100644 tests/node/utils/ws.go diff --git a/tests/node/common/engine_test.go b/tests/node/common/engine_test.go index 3d4874d9f3..5e109fdc0b 100644 --- a/tests/node/common/engine_test.go +++ b/tests/node/common/engine_test.go @@ -36,7 +36,7 @@ func TestEngine(gt *testing.T) { done <- struct{}{} }() - queue <- GetDevWS(t, node, "engine_queue_size", done) + queue <- node_utils.GetDevWS(t, node, "engine_queue_size", done) }() q := <-queue diff --git a/tests/node/common/sync_ws_test.go b/tests/node/common/sync_ws_test.go index 943f42bf3e..d97c7fb098 100644 --- a/tests/node/common/sync_ws_test.go +++ b/tests/node/common/sync_ws_test.go @@ -34,9 +34,9 @@ func TestSyncUnsafeBecomesSafe(gt *testing.T) { go func(node *dsl.L2CLNode) { defer wg.Done() - unsafeBlocks := GetKonaWs(t, node, "unsafe_head", time.After(SECS_WAIT_FOR_UNSAFE_HEAD*time.Second)) + unsafeBlocks := node_utils.GetKonaWs(t, node, "unsafe_head", time.After(SECS_WAIT_FOR_UNSAFE_HEAD*time.Second)) - safeBlocks := GetKonaWs(t, node, "safe_head", time.After(SECS_WAIT_FOR_SAFE_HEAD*time.Second)) + safeBlocks := node_utils.GetKonaWs(t, node, "safe_head", time.After(SECS_WAIT_FOR_SAFE_HEAD*time.Second)) require.GreaterOrEqual(t, len(unsafeBlocks), 1, "we didn't receive enough unsafe gossip blocks!") require.GreaterOrEqual(t, len(safeBlocks), 1, "we didn't receive enough safe gossip blocks!") @@ -80,7 +80,7 @@ func TestSyncUnsafe(gt *testing.T) { go func(node *dsl.L2CLNode) { defer wg.Done() - output := GetKonaWs(t, node, "unsafe_head", time.After(10*time.Second)) + output := node_utils.GetKonaWs(t, node, "unsafe_head", time.After(10*time.Second)) // For each block, we check that the block is actually in the chain of the other nodes. // That should always be the case unless there is a reorg or a long sync. @@ -125,7 +125,7 @@ func TestSyncSafe(gt *testing.T) { defer wg.Done() clName := node.Escape().ID().Key() - output := GetKonaWs(t, node, "safe_head", time.After(10*time.Second)) + output := node_utils.GetKonaWs(t, node, "safe_head", time.After(10*time.Second)) // For each block, we check that the block is actually in the chain of the other nodes. // That should always be the case unless there is a reorg or a long sync. @@ -170,7 +170,7 @@ func TestSyncFinalized(gt *testing.T) { defer wg.Done() clName := node.Escape().ID().Key() - output := GetKonaWs(t, node, "finalized_head", time.After(4*time.Minute)) + output := node_utils.GetKonaWs(t, node, "finalized_head", time.After(4*time.Minute)) // We should check that we received at least 2 finalized blocks within 4 minutes! require.Greater(t, len(output), 1, "we didn't receive enough finalized gossip blocks!") diff --git a/tests/node/common/ws.go b/tests/node/common/ws.go deleted file mode 100644 index 2e42267acc..0000000000 --- a/tests/node/common/ws.go +++ /dev/null @@ -1,128 +0,0 @@ -package node - -import ( - "encoding/json" - "strings" - "time" - - "github.com/ethereum-optimism/optimism/op-devstack/devtest" - "github.com/ethereum-optimism/optimism/op-devstack/dsl" - "github.com/ethereum-optimism/optimism/op-service/eth" - "github.com/gorilla/websocket" - "github.com/stretchr/testify/require" -) - -// --- Generic RPC request/response types ------------------------------------- -const ( - DEFAULT_TIMEOUT = 10 * time.Second -) - -type rpcRequest struct { - JSONRPC string `json:"jsonrpc"` - Method string `json:"method"` - Params interface{} `json:"params"` - ID uint64 `json:"id"` -} - -type rpcResponse struct { - JSONRPC string `json:"jsonrpc"` - ID uint64 `json:"id"` - Result json.RawMessage `json:"result,omitempty"` - Error *rpcError `json:"error,omitempty"` -} - -type rpcError struct { - Code int `json:"code"` - Message string `json:"message"` -} - -// push { "jsonrpc":"2.0", "method":"time", "params":{ "subscription":"0x…", "result":"…" } } -type push[Out any] struct { - Method string `json:"method"` - Params struct { - SubID uint64 `json:"subscription"` - Result Out `json:"result"` - } `json:"params"` -} - -// --------------------------------------------------------------------------- - -func GetPrefixedWs[T any, Out any](t devtest.T, node *dsl.L2CLNode, prefix string, method string, runUntil <-chan T) []Out { - userRPC := node.Escape().UserRPC() - wsRPC := strings.Replace(userRPC, "http", "ws", 1) - - conn, _, err := websocket.DefaultDialer.DialContext(t.Ctx(), wsRPC, nil) - require.NoError(t, err, "dial: %v", err) - defer conn.Close() - - // 1. send the *_subscribe request - require.NoError(t, conn.WriteJSON(rpcRequest{ - JSONRPC: "2.0", - ID: 1, - Method: prefix + "_" + "subscribe_" + method, - Params: nil, - }), "subscribe: %v", err) - - // 2. read the ack – blocking read just once - var a rpcResponse - require.NoError(t, conn.ReadJSON(&a), "ack: %v", err) - t.Log("subscribed to websocket - id=", string(a.Result)) - - output := make([]Out, 0) - - // Function to handle JSON reading with error channel - readJSON := func(conn *websocket.Conn, msg *json.RawMessage) <-chan error { - errChan := make(chan error, 1) // Buffered channel to avoid goroutine leak - - go func() { - errChan <- conn.ReadJSON(msg) - close(errChan) - }() - - return errChan - } - - var msg json.RawMessage - - // 3. start a goroutine that keeps reading pushes -outer_loop: - for { - select { - case <-runUntil: - // Clean‑up if necessary, then exit - t.Log(method, "subscriber", "stopping: runUntil condition met") - break outer_loop - case <-t.Ctx().Done(): - // Clean‑up if necessary, then exit - t.Log("unsafe head subscriber", "stopping: context cancelled") - break outer_loop - case err := <-readJSON(conn, &msg): - require.NoError(t, err, "read: %v", err) - - var p push[Out] - require.NoError(t, json.Unmarshal(msg, &p), "decode: %v", err) - - t.Log(wsRPC, method, "received websocket message", p.Params.Result) - output = append(output, p.Params.Result) - } - } - - require.NoError(t, conn.WriteJSON(rpcRequest{ - JSONRPC: "2.0", - ID: 2, - Method: prefix + "_unsubscribe_" + method, - Params: []any{a.Result}, - }), "unsubscribe: %v", err) - - t.Log("gracefully closed websocket connection") - - return output -} - -func GetKonaWs[T any](t devtest.T, node *dsl.L2CLNode, method string, runUntil <-chan T) []eth.L2BlockRef { - return GetPrefixedWs[T, eth.L2BlockRef](t, node, "ws", method, runUntil) -} - -func GetDevWS[T any](t devtest.T, node *dsl.L2CLNode, method string, runUntil <-chan T) []uint64 { - return GetPrefixedWs[T, uint64](t, node, "dev", method, runUntil) -} diff --git a/tests/node/restart/conn_drop_test.go b/tests/node/restart/conn_drop_test.go index 8aabed00cb..ec76e25173 100644 --- a/tests/node/restart/conn_drop_test.go +++ b/tests/node/restart/conn_drop_test.go @@ -32,17 +32,53 @@ func TestConnDropSync(gt *testing.T) { node.DisconnectPeer(&sequencer) // Ensure that the node is no longer connected to the sequencer + t.Logf("node %s is disconnected from sequencer %s", clName, sequencer.Escape().ID().Key()) seqPeers := sequencer.Peers() for _, peer := range seqPeers.Peers { t.Require().NotEqual(peer.PeerID, node.PeerInfo().PeerID, "expected node %s to be disconnected from sequencer %s", clName, sequencer.Escape().ID().Key()) } + peers := node.Peers() + for _, peer := range peers.Peers { + t.Require().NotEqual(peer.PeerID, sequencer.PeerInfo().PeerID, "expected node %s to be disconnected from sequencer %s", clName, sequencer.Escape().ID().Key()) + } + + currentUnsafeHead := node.ChainSyncStatus(node.ChainID(), types.LocalUnsafe) + + endSignal := make(chan struct{}) + + safeHeads, _ := node_utils.GetKonaWsAsync(t, &node, "safe_head", endSignal) + unsafeHeads, _ := node_utils.GetKonaWsAsync(t, &node, "unsafe_head", endSignal) + + // Ensures that.... + // - the node's safe head is advancing and eventually catches up with the unsafe head + // - the node's unsafe head is NOT advancing during this time + check := func() error { + defer close(endSignal) + + outer_loop: + for { + select { + case safeHead := <-safeHeads: + t.Logf("node %s safe head is advancing", clName) + if safeHead.Number >= currentUnsafeHead.Number { + t.Logf("node %s safe head caught up with unsafe head", clName) + break outer_loop + } + case unsafeHead := <-unsafeHeads: + return fmt.Errorf("node %s unsafe head is advancing: %d", clName, unsafeHead.Number) + } + } + + return nil + } + // Check that... // - the node's safe head is advancing // - the node's unsafe head is advancing (through consolidation) // - the node's safe head's number is catching up with the unsafe head's number // - the node's unsafe head is strictly lagging behind the sequencer's unsafe head - postDisconnectCheckFuns = append(postDisconnectCheckFuns, node.AdvancedFn(types.LocalSafe, 50, 200), node.AdvancedFn(types.LocalUnsafe, 50, 200), SafeUnsafeMatchedFn(t, node, 50)) + postDisconnectCheckFuns = append(postDisconnectCheckFuns, node.AdvancedFn(types.LocalSafe, 50, 200), node.AdvancedFn(types.LocalUnsafe, 50, 200), check) } postDisconnectCheckFuns = append(postDisconnectCheckFuns, sequencer.AdvancedFn(types.LocalUnsafe, 50, 200)) @@ -72,28 +108,36 @@ func TestConnDropSync(gt *testing.T) { t.Require().True(found, "expected node %s to be connected to reference node %s", clName, sequencer.Escape().ID().Key()) // Check that the node is resyncing with the unsafe head network - postReconnectCheckFuns = append(postReconnectCheckFuns, node.MatchedFn(&sequencer, types.LocalSafe, 50), node.MatchedFn(&sequencer, types.LocalUnsafe, 50)) + postReconnectCheckFuns = append(postReconnectCheckFuns, node.MatchedFn(&sequencer, types.LocalSafe, 50), node.AdvancedFn(types.LocalUnsafe, 50, 100), MatchedWithinRange(t, node, sequencer, 5, types.LocalUnsafe, 100)) } dsl.CheckAll(t, postReconnectCheckFuns...) } -// MatchedFn returns a lambda that checks the baseNode head with given safety level is matched with the refNode chain sync status provider -// Composable with other lambdas to wait in parallel -func SafeUnsafeMatchedFn(t devtest.T, clNode dsl.L2CLNode, attempts int) dsl.CheckFunc { +func MatchedWithinRange(t devtest.T, baseNode, refNode dsl.L2CLNode, delta uint64, lvl types.SafetyLevel, attempts int) dsl.CheckFunc { logger := t.Logger() - chainID := clNode.ChainID() + chainID := baseNode.ChainID() + return func() error { + base := baseNode.ChainSyncStatus(chainID, lvl) + ref := refNode.ChainSyncStatus(chainID, lvl) + logger.Info("Expecting node to match with reference", "base", base.Number, "ref", ref.Number) return retry.Do0(t.Ctx(), attempts, &retry.FixedStrategy{Dur: 2 * time.Second}, func() error { - base := clNode.ChainSyncStatus(chainID, types.LocalSafe) - ref := clNode.ChainSyncStatus(chainID, types.LocalUnsafe) - if ref.Hash == base.Hash && ref.Number == base.Number { - logger.Info("Node safe and unsafe heads matched", "ref", ref.Number, "base", base.Number) + base = baseNode.ChainSyncStatus(chainID, lvl) + ref = refNode.ChainSyncStatus(chainID, lvl) + if ref.Number <= base.Number+delta || ref.Number >= base.Number-delta { + logger.Info("Node matched", "ref", ref.Number, "base", base.Number, "delta", delta) + block, err := refNode.Escape().RollupAPI().OutputAtBlock(t.Ctx(), ref.Number) + if err != nil { + return err + } + t.Require().Equal(block.BlockRef.Hash, base.Hash, "expected block hash to match") + t.Require().Equal(block.BlockRef.Number, base.Number, "expected block number to match") return nil } - logger.Info("Node safe and unsafe heads not matched", "safe", base.Number, "unsafe", ref.Number, "ref", ref.Hash, "base", base.Hash) - return fmt.Errorf("expected safe and unsafe heads to match") + logger.Info("Node sync status", "base", base.Number, "ref", ref.Number) + return fmt.Errorf("expected head to match: %s", lvl) }) } } diff --git a/tests/node/restart/restart_test.go b/tests/node/restart/restart_test.go index 0ee482102e..6cd28c084e 100644 --- a/tests/node/restart/restart_test.go +++ b/tests/node/restart/restart_test.go @@ -24,7 +24,6 @@ func TestRestartSync(gt *testing.T) { sequencer := sequencerNodes[0] - var preStopCheckFuns []dsl.CheckFunc for _, node := range nodes { t.Logf("testing restarts for node %s", node.Escape().ID().Key()) clName := node.Escape().ID().Key() @@ -45,12 +44,9 @@ func TestRestartSync(gt *testing.T) { var out *eth.SyncStatus err := rpc.CallContext(context.Background(), &out, "opp2p_syncStatus") t.Require().Error(err, "expected node %s to be stopped", clName) - - // Ensure that the sequencer's head is advancing. - preStopCheckFuns = append(preStopCheckFuns, sequencer.AdvancedFn(types.LocalUnsafe, 50, 200)) } - dsl.CheckAll(t, preStopCheckFuns...) + sequencer.Advanced(types.LocalUnsafe, 50, 200) var postStartCheckFuns []dsl.CheckFunc for _, node := range nodes { diff --git a/tests/node/utils/ws.go b/tests/node/utils/ws.go new file mode 100644 index 0000000000..c772968b3b --- /dev/null +++ b/tests/node/utils/ws.go @@ -0,0 +1,161 @@ +package node_utils + +import ( + "encoding/json" + "strings" + "sync" + + "github.com/ethereum-optimism/optimism/op-devstack/devtest" + "github.com/ethereum-optimism/optimism/op-devstack/dsl" + "github.com/ethereum-optimism/optimism/op-service/eth" + "github.com/gorilla/websocket" + "github.com/stretchr/testify/require" +) + +// --- Generic RPC request/response types ------------------------------------- + +type rpcRequest struct { + JSONRPC string `json:"jsonrpc"` + Method string `json:"method"` + Params interface{} `json:"params"` + ID uint64 `json:"id"` +} + +type rpcResponse struct { + JSONRPC string `json:"jsonrpc"` + ID uint64 `json:"id"` + Result json.RawMessage `json:"result,omitempty"` + Error *rpcError `json:"error,omitempty"` +} + +type rpcError struct { + Code int `json:"code"` + Message string `json:"message"` +} + +// push { "jsonrpc":"2.0", "method":"time", "params":{ "subscription":"0x…", "result":"…" } } +type push[Out any] struct { + Method string `json:"method"` + Params struct { + SubID uint64 `json:"subscription"` + Result Out `json:"result"` + } `json:"params"` +} + +// --------------------------------------------------------------------------- + +func AsyncGetPrefixedWs[T any, Out any](t devtest.T, node *dsl.L2CLNode, prefix string, method string, runUntil <-chan T) (<-chan Out, *sync.WaitGroup) { + userRPC := node.Escape().UserRPC() + wsRPC := strings.Replace(userRPC, "http", "ws", 1) + + output := make(chan Out) + + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + + conn, _, err := websocket.DefaultDialer.DialContext(t.Ctx(), wsRPC, nil) + require.NoError(t, err, "dial: %v", err) + defer conn.Close() + defer close(output) + + // 1. send the *_subscribe request + require.NoError(t, conn.WriteJSON(rpcRequest{ + JSONRPC: "2.0", + ID: 1, + Method: prefix + "_" + "subscribe_" + method, + Params: nil, + }), "subscribe: %v", err) + + // 2. read the ack – blocking read just once + var a rpcResponse + require.NoError(t, conn.ReadJSON(&a), "ack: %v", err) + t.Log("subscribed to websocket - id=", string(a.Result)) + + // 3. defer the unsubscribe request + defer func() { + require.NoError(t, conn.WriteJSON(rpcRequest{ + JSONRPC: "2.0", + ID: 2, + Method: prefix + "_unsubscribe_" + method, + Params: []any{a.Result}, + }), "unsubscribe: %v", err) + + t.Log("gracefully closed websocket connection") + }() + + // Function to handle JSON reading with error channel + readJSON := func(conn *websocket.Conn, msg *json.RawMessage) <-chan error { + errChan := make(chan error, 1) // Buffered channel to avoid goroutine leak + + go func() { + errChan <- conn.ReadJSON(msg) + close(errChan) + }() + + return errChan + } + + var msg json.RawMessage + + // 4. start a goroutine that keeps reading pushes + outer_loop: + for { + select { + case _, ok := <-runUntil: + // Clean‑up if necessary, then exit + if ok { + t.Log(method, "subscriber", "stopping: runUntil condition met") + } else { + t.Log(method, "subscriber", "stopping: runUntil channel closed") + } + break outer_loop + case <-t.Ctx().Done(): + // Clean‑up if necessary, then exit + t.Log("unsafe head subscriber", "stopping: context cancelled") + break outer_loop + case err := <-readJSON(conn, &msg): + require.NoError(t, err, "read: %v", err) + + var p push[Out] + require.NoError(t, json.Unmarshal(msg, &p), "decode: %v", err) + + t.Log(wsRPC, method, "received websocket message", p.Params.Result) + output <- p.Params.Result + } + } + + }() + + return output, &wg +} + +func GetPrefixedWs[T any, Out any](t devtest.T, node *dsl.L2CLNode, prefix string, method string, runUntil <-chan T) []Out { + output, wg := AsyncGetPrefixedWs[T, Out](t, node, prefix, method, runUntil) + wg.Wait() + + results := make([]Out, 0) + for result := range output { + results = append(results, result) + } + + return results +} + +func GetKonaWs[T any](t devtest.T, node *dsl.L2CLNode, method string, runUntil <-chan T) []eth.L2BlockRef { + return GetPrefixedWs[T, eth.L2BlockRef](t, node, "ws", method, runUntil) +} + +func GetKonaWsAsync[T any](t devtest.T, node *dsl.L2CLNode, method string, runUntil <-chan T) (<-chan eth.L2BlockRef, *sync.WaitGroup) { + return AsyncGetPrefixedWs[T, eth.L2BlockRef](t, node, "ws", method, runUntil) +} + +func GetDevWS[T any](t devtest.T, node *dsl.L2CLNode, method string, runUntil <-chan T) []uint64 { + return GetPrefixedWs[T, uint64](t, node, "dev", method, runUntil) +} + +func GetDevWSAsync[T any](t devtest.T, node *dsl.L2CLNode, method string, runUntil <-chan T) (<-chan uint64, *sync.WaitGroup) { + return AsyncGetPrefixedWs[T, uint64](t, node, "dev", method, runUntil) +} From 1b7d2b2bffcc4be09d451a2d3b43275d8847bbed Mon Sep 17 00:00:00 2001 From: theochap Date: Tue, 2 Sep 2025 16:49:26 +0200 Subject: [PATCH 2/6] fix(node/test): fix connection drop tests --- tests/node/restart/conn_drop_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/node/restart/conn_drop_test.go b/tests/node/restart/conn_drop_test.go index ec76e25173..b79e0f4bc6 100644 --- a/tests/node/restart/conn_drop_test.go +++ b/tests/node/restart/conn_drop_test.go @@ -54,7 +54,9 @@ func TestConnDropSync(gt *testing.T) { // - the node's safe head is advancing and eventually catches up with the unsafe head // - the node's unsafe head is NOT advancing during this time check := func() error { - defer close(endSignal) + defer func() { + endSignal <- struct{}{} + }() outer_loop: for { From 51c5c7081700c6758699d6d48b58efa24ff4e682 Mon Sep 17 00:00:00 2001 From: theochap Date: Tue, 2 Sep 2025 11:42:46 -0400 Subject: [PATCH 3/6] feat(node/test): try to fix restart tests --- tests/node/restart/restart_test.go | 23 +++++++++++++++----- tests/node/restart/sequencer_restart_test.go | 15 ++++++++++--- 2 files changed, 29 insertions(+), 9 deletions(-) diff --git a/tests/node/restart/restart_test.go b/tests/node/restart/restart_test.go index 6cd28c084e..4908cdf706 100644 --- a/tests/node/restart/restart_test.go +++ b/tests/node/restart/restart_test.go @@ -2,11 +2,14 @@ package node_restart import ( "context" + "fmt" "testing" + "time" "github.com/ethereum-optimism/optimism/op-devstack/devtest" "github.com/ethereum-optimism/optimism/op-devstack/dsl" "github.com/ethereum-optimism/optimism/op-service/eth" + "github.com/ethereum-optimism/optimism/op-service/retry" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" node_utils "github.com/op-rs/kona/node/utils" ) @@ -17,7 +20,7 @@ func TestRestartSync(gt *testing.T) { out := node_utils.NewMixedOpKona(t) - nodes := out.L2CLKonaValidatorNodes + nodes := out.L2CLValidatorNodes() sequencerNodes := out.L2CLSequencerNodes() t.Gate().Greater(len(nodes), 0, "expected at least one validator node") t.Gate().Greater(len(sequencerNodes), 0, "expected at least one sequencer node") @@ -33,16 +36,24 @@ func TestRestartSync(gt *testing.T) { node.Stop() // Ensure that the node is no longer connected to the sequencer - seqPeers := sequencer.Peers() - for _, peer := range seqPeers.Peers { - t.Require().NotEqual(peer.PeerID, nodePeerId, "expected node %s to be disconnected from sequencer %s", clName, sequencer.Escape().ID().Key()) - } + // Retry with an exponential backoff because the node may take a few seconds to stop. + _, err := retry.Do(t.Ctx(), 5, &retry.ExponentialStrategy{Max: 10 * time.Second, Min: 1 * time.Second, MaxJitter: 250 * time.Millisecond}, func() (any, error) { + seqPeers := sequencer.Peers() + for _, peer := range seqPeers.Peers { + if peer.PeerID == nodePeerId { + return nil, fmt.Errorf("expected node %s to be disconnected from sequencer %s", clName, sequencer.Escape().ID().Key()) + } + } + return nil, nil + }) + + t.Require().NoError(err) // Ensure that the node is stopped // Check that calling any rpc method returns an error rpc := node_utils.GetNodeRPCEndpoint(&node) var out *eth.SyncStatus - err := rpc.CallContext(context.Background(), &out, "opp2p_syncStatus") + err = rpc.CallContext(context.Background(), &out, "opp2p_syncStatus") t.Require().Error(err, "expected node %s to be stopped", clName) } diff --git a/tests/node/restart/sequencer_restart_test.go b/tests/node/restart/sequencer_restart_test.go index c5b888b604..7772b5e5b0 100644 --- a/tests/node/restart/sequencer_restart_test.go +++ b/tests/node/restart/sequencer_restart_test.go @@ -1,10 +1,13 @@ package node_restart import ( + "fmt" "testing" + "time" "github.com/ethereum-optimism/optimism/op-devstack/devtest" "github.com/ethereum-optimism/optimism/op-devstack/dsl" + "github.com/ethereum-optimism/optimism/op-service/retry" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" node_utils "github.com/op-rs/kona/node/utils" ) @@ -37,9 +40,15 @@ func TestSequencerRestart(gt *testing.T) { for _, node := range nodes { // Ensure that the node is no longer connected to the sequencer nodePeers := node.Peers() - for _, peer := range nodePeers.Peers { - t.Require().NotEqual(peer.PeerID, seqPeerId, "expected node %s to be disconnected from sequencer %s", node.Escape().ID().Key(), sequencer.Escape().ID().Key()) - } + _, err := retry.Do(t.Ctx(), 5, &retry.ExponentialStrategy{Max: 10 * time.Second, Min: 1 * time.Second, MaxJitter: 250 * time.Millisecond}, func() (any, error) { + for _, peer := range nodePeers.Peers { + if peer.PeerID == seqPeerId { + return nil, fmt.Errorf("expected node %s to be disconnected from sequencer %s", node.Escape().ID().Key(), sequencer.Escape().ID().Key()) + } + } + return nil, nil + }) + t.Require().NoError(err) // Ensure that the other nodes are not advancing. // The local safe head may advance (for the next l1 block to be processed), but the unsafe head should not. From bbb9761f833d2dc99e7d41518c63a4e5c0e9ccba Mon Sep 17 00:00:00 2001 From: theochap Date: Wed, 3 Sep 2025 06:54:14 -0400 Subject: [PATCH 4/6] fix(node/tests): fix test files and config --- mise.toml | 2 +- tests/devnets/first-kona-conductor.yaml | 1 + tests/devnets/large-kona-sequencer.yaml | 1 + tests/devnets/large-kona.yaml | 1 + tests/devnets/preinterop-supervisor.yaml | 1 + tests/devnets/simple-kona-conductor.yaml | 1 + tests/devnets/simple-kona-geth.yaml | 1 + tests/devnets/simple-kona-sequencer.yaml | 1 + tests/devnets/simple-kona.yaml | 1 + tests/devnets/simple-supervisor.yaml | 1 + 10 files changed, 10 insertions(+), 1 deletion(-) diff --git a/mise.toml b/mise.toml index 7c8a9a1eb5..b99e487ab3 100644 --- a/mise.toml +++ b/mise.toml @@ -1,3 +1,3 @@ [tools] -"ubi:kurtosis-tech/kurtosis-cli-release-artifacts[exe=kurtosis]" = "1.10.3" \ No newline at end of file +"ubi:kurtosis-tech/kurtosis-cli-release-artifacts[exe=kurtosis]" = "1.8.1" \ No newline at end of file diff --git a/tests/devnets/first-kona-conductor.yaml b/tests/devnets/first-kona-conductor.yaml index ef31b82b70..639e54e88d 100644 --- a/tests/devnets/first-kona-conductor.yaml +++ b/tests/devnets/first-kona-conductor.yaml @@ -56,6 +56,7 @@ ethereum_package: participants: - el_type: geth cl_type: teku + cl_image: consensys/teku:25.7.1 network_params: preset: minimal genesis_delay: 5 diff --git a/tests/devnets/large-kona-sequencer.yaml b/tests/devnets/large-kona-sequencer.yaml index 188826d31c..66139268ef 100644 --- a/tests/devnets/large-kona-sequencer.yaml +++ b/tests/devnets/large-kona-sequencer.yaml @@ -94,6 +94,7 @@ ethereum_package: participants: - el_type: geth cl_type: teku + cl_image: consensys/teku:25.7.1 network_params: preset: minimal genesis_delay: 5 diff --git a/tests/devnets/large-kona.yaml b/tests/devnets/large-kona.yaml index f68cc2b90f..e5e641c8c3 100644 --- a/tests/devnets/large-kona.yaml +++ b/tests/devnets/large-kona.yaml @@ -84,6 +84,7 @@ ethereum_package: participants: - el_type: geth cl_type: teku + cl_image: consensys/teku:25.7.1 network_params: preset: minimal genesis_delay: 5 diff --git a/tests/devnets/preinterop-supervisor.yaml b/tests/devnets/preinterop-supervisor.yaml index ad2754c159..80e62d6fac 100644 --- a/tests/devnets/preinterop-supervisor.yaml +++ b/tests/devnets/preinterop-supervisor.yaml @@ -67,6 +67,7 @@ ethereum_package: participants: - el_type: geth cl_type: teku + cl_image: consensys/teku:25.7.1 network_params: preset: minimal genesis_delay: 5 diff --git a/tests/devnets/simple-kona-conductor.yaml b/tests/devnets/simple-kona-conductor.yaml index f1cc44e13c..79d11ef051 100644 --- a/tests/devnets/simple-kona-conductor.yaml +++ b/tests/devnets/simple-kona-conductor.yaml @@ -43,6 +43,7 @@ ethereum_package: participants: - el_type: geth cl_type: teku + cl_image: consensys/teku:25.7.1 network_params: preset: minimal genesis_delay: 5 diff --git a/tests/devnets/simple-kona-geth.yaml b/tests/devnets/simple-kona-geth.yaml index 092f57cacb..18a73debe1 100644 --- a/tests/devnets/simple-kona-geth.yaml +++ b/tests/devnets/simple-kona-geth.yaml @@ -42,6 +42,7 @@ ethereum_package: participants: - el_type: geth cl_type: teku + cl_image: consensys/teku:25.7.1 network_params: preset: minimal genesis_delay: 5 diff --git a/tests/devnets/simple-kona-sequencer.yaml b/tests/devnets/simple-kona-sequencer.yaml index a7c585484a..a6b275aabd 100644 --- a/tests/devnets/simple-kona-sequencer.yaml +++ b/tests/devnets/simple-kona-sequencer.yaml @@ -38,6 +38,7 @@ ethereum_package: participants: - el_type: geth cl_type: teku + cl_image: consensys/teku:25.7.1 network_params: preset: minimal genesis_delay: 5 diff --git a/tests/devnets/simple-kona.yaml b/tests/devnets/simple-kona.yaml index 018043a01c..c4eff228e8 100644 --- a/tests/devnets/simple-kona.yaml +++ b/tests/devnets/simple-kona.yaml @@ -42,6 +42,7 @@ ethereum_package: participants: - el_type: geth cl_type: teku + cl_image: consensys/teku:25.7.1 network_params: preset: minimal genesis_delay: 5 diff --git a/tests/devnets/simple-supervisor.yaml b/tests/devnets/simple-supervisor.yaml index e7659b49a8..bce6c51ce9 100644 --- a/tests/devnets/simple-supervisor.yaml +++ b/tests/devnets/simple-supervisor.yaml @@ -74,6 +74,7 @@ ethereum_package: participants: - el_type: geth cl_type: teku + cl_image: consensys/teku:25.7.1 network_params: preset: minimal genesis_delay: 5 From a935016ed49853d396e652d62718591a0dfb274d Mon Sep 17 00:00:00 2001 From: theochap Date: Wed, 3 Sep 2025 16:10:54 +0200 Subject: [PATCH 5/6] fix(node/test): fix websocket --- crates/node/rpc/src/ws.rs | 1 + tests/node/restart/conn_drop_test.go | 4 +-- tests/node/utils/ws.go | 52 +++++++++++++--------------- 3 files changed, 28 insertions(+), 29 deletions(-) diff --git a/crates/node/rpc/src/ws.rs b/crates/node/rpc/src/ws.rs index 0330b30f46..7c39857ac3 100644 --- a/crates/node/rpc/src/ws.rs +++ b/crates/node/rpc/src/ws.rs @@ -71,6 +71,7 @@ impl WsServer for WsRPC { .await .map(|state| *state) { + info!(target: "rpc::ws", "Sending safe head update: {:?}", new_state.sync_state.safe_head()); current_safe_head = new_state.sync_state.safe_head(); Self::send_state_update(&sink, current_safe_head).await?; } diff --git a/tests/node/restart/conn_drop_test.go b/tests/node/restart/conn_drop_test.go index b79e0f4bc6..340014a7a5 100644 --- a/tests/node/restart/conn_drop_test.go +++ b/tests/node/restart/conn_drop_test.go @@ -47,8 +47,8 @@ func TestConnDropSync(gt *testing.T) { endSignal := make(chan struct{}) - safeHeads, _ := node_utils.GetKonaWsAsync(t, &node, "safe_head", endSignal) - unsafeHeads, _ := node_utils.GetKonaWsAsync(t, &node, "unsafe_head", endSignal) + safeHeads := node_utils.GetKonaWsAsync(t, &node, "safe_head", endSignal) + unsafeHeads := node_utils.GetKonaWsAsync(t, &node, "unsafe_head", endSignal) // Ensures that.... // - the node's safe head is advancing and eventually catches up with the unsafe head diff --git a/tests/node/utils/ws.go b/tests/node/utils/ws.go index c772968b3b..6b0bee053f 100644 --- a/tests/node/utils/ws.go +++ b/tests/node/utils/ws.go @@ -3,7 +3,6 @@ package node_utils import ( "encoding/json" "strings" - "sync" "github.com/ethereum-optimism/optimism/op-devstack/devtest" "github.com/ethereum-optimism/optimism/op-devstack/dsl" @@ -44,18 +43,13 @@ type push[Out any] struct { // --------------------------------------------------------------------------- -func AsyncGetPrefixedWs[T any, Out any](t devtest.T, node *dsl.L2CLNode, prefix string, method string, runUntil <-chan T) (<-chan Out, *sync.WaitGroup) { +func AsyncGetPrefixedWs[T any, Out any](t devtest.T, node *dsl.L2CLNode, prefix string, method string, runUntil <-chan T) <-chan Out { userRPC := node.Escape().UserRPC() wsRPC := strings.Replace(userRPC, "http", "ws", 1) - output := make(chan Out) + output := make(chan Out, 128) - var wg sync.WaitGroup - - wg.Add(1) go func() { - defer wg.Done() - conn, _, err := websocket.DefaultDialer.DialContext(t.Ctx(), wsRPC, nil) require.NoError(t, err, "dial: %v", err) defer conn.Close() @@ -87,21 +81,23 @@ func AsyncGetPrefixedWs[T any, Out any](t devtest.T, node *dsl.L2CLNode, prefix }() // Function to handle JSON reading with error channel - readJSON := func(conn *websocket.Conn, msg *json.RawMessage) <-chan error { - errChan := make(chan error, 1) // Buffered channel to avoid goroutine leak + msgChan := make(chan json.RawMessage, 1) // Buffered channel to avoid goroutine leak - go func() { - errChan <- conn.ReadJSON(msg) - close(errChan) - }() + go func() { + var msg json.RawMessage + defer close(msgChan) - return errChan - } + for { + if err := conn.ReadJSON(&msg); err != nil { + t.Log("readJSON channel closed") + return + } - var msg json.RawMessage + msgChan <- msg + } + }() // 4. start a goroutine that keeps reading pushes - outer_loop: for { select { case _, ok := <-runUntil: @@ -111,13 +107,16 @@ func AsyncGetPrefixedWs[T any, Out any](t devtest.T, node *dsl.L2CLNode, prefix } else { t.Log(method, "subscriber", "stopping: runUntil channel closed") } - break outer_loop + return case <-t.Ctx().Done(): // Clean‑up if necessary, then exit t.Log("unsafe head subscriber", "stopping: context cancelled") - break outer_loop - case err := <-readJSON(conn, &msg): - require.NoError(t, err, "read: %v", err) + return + case msg, ok := <-msgChan: + if !ok { + t.Log("readJSON channel closed") + return + } var p push[Out] require.NoError(t, json.Unmarshal(msg, &p), "decode: %v", err) @@ -129,12 +128,11 @@ func AsyncGetPrefixedWs[T any, Out any](t devtest.T, node *dsl.L2CLNode, prefix }() - return output, &wg + return output } func GetPrefixedWs[T any, Out any](t devtest.T, node *dsl.L2CLNode, prefix string, method string, runUntil <-chan T) []Out { - output, wg := AsyncGetPrefixedWs[T, Out](t, node, prefix, method, runUntil) - wg.Wait() + output := AsyncGetPrefixedWs[T, Out](t, node, prefix, method, runUntil) results := make([]Out, 0) for result := range output { @@ -148,7 +146,7 @@ func GetKonaWs[T any](t devtest.T, node *dsl.L2CLNode, method string, runUntil < return GetPrefixedWs[T, eth.L2BlockRef](t, node, "ws", method, runUntil) } -func GetKonaWsAsync[T any](t devtest.T, node *dsl.L2CLNode, method string, runUntil <-chan T) (<-chan eth.L2BlockRef, *sync.WaitGroup) { +func GetKonaWsAsync[T any](t devtest.T, node *dsl.L2CLNode, method string, runUntil <-chan T) <-chan eth.L2BlockRef { return AsyncGetPrefixedWs[T, eth.L2BlockRef](t, node, "ws", method, runUntil) } @@ -156,6 +154,6 @@ func GetDevWS[T any](t devtest.T, node *dsl.L2CLNode, method string, runUntil <- return GetPrefixedWs[T, uint64](t, node, "dev", method, runUntil) } -func GetDevWSAsync[T any](t devtest.T, node *dsl.L2CLNode, method string, runUntil <-chan T) (<-chan uint64, *sync.WaitGroup) { +func GetDevWSAsync[T any](t devtest.T, node *dsl.L2CLNode, method string, runUntil <-chan T) <-chan uint64 { return AsyncGetPrefixedWs[T, uint64](t, node, "dev", method, runUntil) } From 2b64d348a751c88102051e004ca89a3c8148cbc8 Mon Sep 17 00:00:00 2001 From: theochap Date: Thu, 4 Sep 2025 10:49:59 +0200 Subject: [PATCH 6/6] fix(node/test): fix restart tests --- tests/node/restart/conn_drop_test.go | 31 ++++++++++++++------ tests/node/restart/restart_test.go | 2 +- tests/node/restart/sequencer_restart_test.go | 2 +- 3 files changed, 24 insertions(+), 11 deletions(-) diff --git a/tests/node/restart/conn_drop_test.go b/tests/node/restart/conn_drop_test.go index 340014a7a5..e350679ea7 100644 --- a/tests/node/restart/conn_drop_test.go +++ b/tests/node/restart/conn_drop_test.go @@ -7,6 +7,7 @@ import ( "github.com/ethereum-optimism/optimism/op-devstack/devtest" "github.com/ethereum-optimism/optimism/op-devstack/dsl" + "github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/retry" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" node_utils "github.com/op-rs/kona/node/utils" @@ -54,10 +55,6 @@ func TestConnDropSync(gt *testing.T) { // - the node's safe head is advancing and eventually catches up with the unsafe head // - the node's unsafe head is NOT advancing during this time check := func() error { - defer func() { - endSignal <- struct{}{} - }() - outer_loop: for { select { @@ -72,6 +69,8 @@ func TestConnDropSync(gt *testing.T) { } } + endSignal <- struct{}{} + return nil } @@ -110,7 +109,7 @@ func TestConnDropSync(gt *testing.T) { t.Require().True(found, "expected node %s to be connected to reference node %s", clName, sequencer.Escape().ID().Key()) // Check that the node is resyncing with the unsafe head network - postReconnectCheckFuns = append(postReconnectCheckFuns, node.MatchedFn(&sequencer, types.LocalSafe, 50), node.AdvancedFn(types.LocalUnsafe, 50, 100), MatchedWithinRange(t, node, sequencer, 5, types.LocalUnsafe, 100)) + postReconnectCheckFuns = append(postReconnectCheckFuns, MatchedWithinRange(t, node, sequencer, 3, types.LocalSafe, 50), node.AdvancedFn(types.LocalUnsafe, 50, 100), MatchedWithinRange(t, node, sequencer, 3, types.LocalUnsafe, 100)) } dsl.CheckAll(t, postReconnectCheckFuns...) @@ -129,13 +128,27 @@ func MatchedWithinRange(t devtest.T, baseNode, refNode dsl.L2CLNode, delta uint6 base = baseNode.ChainSyncStatus(chainID, lvl) ref = refNode.ChainSyncStatus(chainID, lvl) if ref.Number <= base.Number+delta || ref.Number >= base.Number-delta { - logger.Info("Node matched", "ref", ref.Number, "base", base.Number, "delta", delta) - block, err := refNode.Escape().RollupAPI().OutputAtBlock(t.Ctx(), ref.Number) + logger.Info("Node matched", "ref_id", refNode, "base_id", baseNode, "ref", ref.Number, "base", base.Number, "delta", delta) + + // We get the same block from the head and tail node + var headNode dsl.L2CLNode + var tailNode eth.BlockID + if ref.Number > base.Number { + headNode = refNode + tailNode = base + } else { + headNode = baseNode + tailNode = ref + } + + baseBlock, err := headNode.Escape().RollupAPI().OutputAtBlock(t.Ctx(), tailNode.Number) if err != nil { return err } - t.Require().Equal(block.BlockRef.Hash, base.Hash, "expected block hash to match") - t.Require().Equal(block.BlockRef.Number, base.Number, "expected block number to match") + + t.Require().Equal(baseBlock.BlockRef.Number, tailNode.Number, "expected block number to match") + t.Require().Equal(baseBlock.BlockRef.Hash, tailNode.Hash, "expected block hash to match") + return nil } logger.Info("Node sync status", "base", base.Number, "ref", ref.Number) diff --git a/tests/node/restart/restart_test.go b/tests/node/restart/restart_test.go index 4908cdf706..6fe3474968 100644 --- a/tests/node/restart/restart_test.go +++ b/tests/node/restart/restart_test.go @@ -68,7 +68,7 @@ func TestRestartSync(gt *testing.T) { node.ConnectPeer(&sequencer) // Check that the node is resyncing with the network - postStartCheckFuns = append(postStartCheckFuns, node.MatchedFn(&sequencer, types.LocalSafe, 50), node.MatchedFn(&sequencer, types.LocalUnsafe, 50)) + postStartCheckFuns = append(postStartCheckFuns, MatchedWithinRange(t, node, sequencer, 3, types.LocalSafe, 100), MatchedWithinRange(t, node, sequencer, 3, types.LocalUnsafe, 100)) // Check that the node is connected to the reference node peers := node.Peers() diff --git a/tests/node/restart/sequencer_restart_test.go b/tests/node/restart/sequencer_restart_test.go index 7772b5e5b0..ebbcb7172e 100644 --- a/tests/node/restart/sequencer_restart_test.go +++ b/tests/node/restart/sequencer_restart_test.go @@ -28,7 +28,7 @@ func TestSequencerRestart(gt *testing.T) { // Let's ensure that all the nodes are properly advancing. var preCheckFuns []dsl.CheckFunc for _, node := range nodes { - preCheckFuns = append(preCheckFuns, node.LaggedFn(&sequencer, types.CrossUnsafe, 20, true), node.MatchedFn(&sequencer, types.LocalSafe, 20)) + preCheckFuns = append(preCheckFuns, node.LaggedFn(&sequencer, types.CrossUnsafe, 20, true), node.AdvancedFn(types.LocalSafe, 20, 40)) } dsl.CheckAll(t, preCheckFuns...)