diff --git a/crates/node/rpc/src/ws.rs b/crates/node/rpc/src/ws.rs index 0330b30f46..7c39857ac3 100644 --- a/crates/node/rpc/src/ws.rs +++ b/crates/node/rpc/src/ws.rs @@ -71,6 +71,7 @@ impl WsServer for WsRPC { .await .map(|state| *state) { + info!(target: "rpc::ws", "Sending safe head update: {:?}", new_state.sync_state.safe_head()); current_safe_head = new_state.sync_state.safe_head(); Self::send_state_update(&sink, current_safe_head).await?; } diff --git a/mise.toml b/mise.toml index 7c8a9a1eb5..b99e487ab3 100644 --- a/mise.toml +++ b/mise.toml @@ -1,3 +1,3 @@ [tools] -"ubi:kurtosis-tech/kurtosis-cli-release-artifacts[exe=kurtosis]" = "1.10.3" \ No newline at end of file +"ubi:kurtosis-tech/kurtosis-cli-release-artifacts[exe=kurtosis]" = "1.8.1" \ No newline at end of file diff --git a/tests/devnets/first-kona-conductor.yaml b/tests/devnets/first-kona-conductor.yaml index ef31b82b70..639e54e88d 100644 --- a/tests/devnets/first-kona-conductor.yaml +++ b/tests/devnets/first-kona-conductor.yaml @@ -56,6 +56,7 @@ ethereum_package: participants: - el_type: geth cl_type: teku + cl_image: consensys/teku:25.7.1 network_params: preset: minimal genesis_delay: 5 diff --git a/tests/devnets/large-kona-sequencer.yaml b/tests/devnets/large-kona-sequencer.yaml index 188826d31c..66139268ef 100644 --- a/tests/devnets/large-kona-sequencer.yaml +++ b/tests/devnets/large-kona-sequencer.yaml @@ -94,6 +94,7 @@ ethereum_package: participants: - el_type: geth cl_type: teku + cl_image: consensys/teku:25.7.1 network_params: preset: minimal genesis_delay: 5 diff --git a/tests/devnets/large-kona.yaml b/tests/devnets/large-kona.yaml index f68cc2b90f..e5e641c8c3 100644 --- a/tests/devnets/large-kona.yaml +++ b/tests/devnets/large-kona.yaml @@ -84,6 +84,7 @@ ethereum_package: participants: - el_type: geth cl_type: teku + cl_image: consensys/teku:25.7.1 network_params: preset: minimal genesis_delay: 5 diff --git a/tests/devnets/preinterop-supervisor.yaml b/tests/devnets/preinterop-supervisor.yaml index ad2754c159..80e62d6fac 100644 --- a/tests/devnets/preinterop-supervisor.yaml +++ b/tests/devnets/preinterop-supervisor.yaml @@ -67,6 +67,7 @@ ethereum_package: participants: - el_type: geth cl_type: teku + cl_image: consensys/teku:25.7.1 network_params: preset: minimal genesis_delay: 5 diff --git a/tests/devnets/simple-kona-conductor.yaml b/tests/devnets/simple-kona-conductor.yaml index f1cc44e13c..79d11ef051 100644 --- a/tests/devnets/simple-kona-conductor.yaml +++ b/tests/devnets/simple-kona-conductor.yaml @@ -43,6 +43,7 @@ ethereum_package: participants: - el_type: geth cl_type: teku + cl_image: consensys/teku:25.7.1 network_params: preset: minimal genesis_delay: 5 diff --git a/tests/devnets/simple-kona-geth.yaml b/tests/devnets/simple-kona-geth.yaml index 092f57cacb..18a73debe1 100644 --- a/tests/devnets/simple-kona-geth.yaml +++ b/tests/devnets/simple-kona-geth.yaml @@ -42,6 +42,7 @@ ethereum_package: participants: - el_type: geth cl_type: teku + cl_image: consensys/teku:25.7.1 network_params: preset: minimal genesis_delay: 5 diff --git a/tests/devnets/simple-kona-sequencer.yaml b/tests/devnets/simple-kona-sequencer.yaml index a7c585484a..a6b275aabd 100644 --- a/tests/devnets/simple-kona-sequencer.yaml +++ b/tests/devnets/simple-kona-sequencer.yaml @@ -38,6 +38,7 @@ ethereum_package: participants: - el_type: geth cl_type: teku + cl_image: consensys/teku:25.7.1 network_params: preset: minimal genesis_delay: 5 diff --git a/tests/devnets/simple-kona.yaml b/tests/devnets/simple-kona.yaml index 018043a01c..c4eff228e8 100644 --- a/tests/devnets/simple-kona.yaml +++ b/tests/devnets/simple-kona.yaml @@ -42,6 +42,7 @@ ethereum_package: participants: - el_type: geth cl_type: teku + cl_image: consensys/teku:25.7.1 network_params: preset: minimal genesis_delay: 5 diff --git a/tests/devnets/simple-supervisor.yaml b/tests/devnets/simple-supervisor.yaml index e7659b49a8..bce6c51ce9 100644 --- a/tests/devnets/simple-supervisor.yaml +++ b/tests/devnets/simple-supervisor.yaml @@ -74,6 +74,7 @@ ethereum_package: participants: - el_type: geth cl_type: teku + cl_image: consensys/teku:25.7.1 network_params: preset: minimal genesis_delay: 5 diff --git a/tests/node/common/engine_test.go b/tests/node/common/engine_test.go index 3d4874d9f3..5e109fdc0b 100644 --- a/tests/node/common/engine_test.go +++ b/tests/node/common/engine_test.go @@ -36,7 +36,7 @@ func TestEngine(gt *testing.T) { done <- struct{}{} }() - queue <- GetDevWS(t, node, "engine_queue_size", done) + queue <- node_utils.GetDevWS(t, node, "engine_queue_size", done) }() q := <-queue diff --git a/tests/node/common/sync_ws_test.go b/tests/node/common/sync_ws_test.go index 943f42bf3e..d97c7fb098 100644 --- a/tests/node/common/sync_ws_test.go +++ b/tests/node/common/sync_ws_test.go @@ -34,9 +34,9 @@ func TestSyncUnsafeBecomesSafe(gt *testing.T) { go func(node *dsl.L2CLNode) { defer wg.Done() - unsafeBlocks := GetKonaWs(t, node, "unsafe_head", time.After(SECS_WAIT_FOR_UNSAFE_HEAD*time.Second)) + unsafeBlocks := node_utils.GetKonaWs(t, node, "unsafe_head", time.After(SECS_WAIT_FOR_UNSAFE_HEAD*time.Second)) - safeBlocks := GetKonaWs(t, node, "safe_head", time.After(SECS_WAIT_FOR_SAFE_HEAD*time.Second)) + safeBlocks := node_utils.GetKonaWs(t, node, "safe_head", time.After(SECS_WAIT_FOR_SAFE_HEAD*time.Second)) require.GreaterOrEqual(t, len(unsafeBlocks), 1, "we didn't receive enough unsafe gossip blocks!") require.GreaterOrEqual(t, len(safeBlocks), 1, "we didn't receive enough safe gossip blocks!") @@ -80,7 +80,7 @@ func TestSyncUnsafe(gt *testing.T) { go func(node *dsl.L2CLNode) { defer wg.Done() - output := GetKonaWs(t, node, "unsafe_head", time.After(10*time.Second)) + output := node_utils.GetKonaWs(t, node, "unsafe_head", time.After(10*time.Second)) // For each block, we check that the block is actually in the chain of the other nodes. // That should always be the case unless there is a reorg or a long sync. @@ -125,7 +125,7 @@ func TestSyncSafe(gt *testing.T) { defer wg.Done() clName := node.Escape().ID().Key() - output := GetKonaWs(t, node, "safe_head", time.After(10*time.Second)) + output := node_utils.GetKonaWs(t, node, "safe_head", time.After(10*time.Second)) // For each block, we check that the block is actually in the chain of the other nodes. // That should always be the case unless there is a reorg or a long sync. @@ -170,7 +170,7 @@ func TestSyncFinalized(gt *testing.T) { defer wg.Done() clName := node.Escape().ID().Key() - output := GetKonaWs(t, node, "finalized_head", time.After(4*time.Minute)) + output := node_utils.GetKonaWs(t, node, "finalized_head", time.After(4*time.Minute)) // We should check that we received at least 2 finalized blocks within 4 minutes! require.Greater(t, len(output), 1, "we didn't receive enough finalized gossip blocks!") diff --git a/tests/node/common/ws.go b/tests/node/common/ws.go deleted file mode 100644 index 2e42267acc..0000000000 --- a/tests/node/common/ws.go +++ /dev/null @@ -1,128 +0,0 @@ -package node - -import ( - "encoding/json" - "strings" - "time" - - "github.com/ethereum-optimism/optimism/op-devstack/devtest" - "github.com/ethereum-optimism/optimism/op-devstack/dsl" - "github.com/ethereum-optimism/optimism/op-service/eth" - "github.com/gorilla/websocket" - "github.com/stretchr/testify/require" -) - -// --- Generic RPC request/response types ------------------------------------- -const ( - DEFAULT_TIMEOUT = 10 * time.Second -) - -type rpcRequest struct { - JSONRPC string `json:"jsonrpc"` - Method string `json:"method"` - Params interface{} `json:"params"` - ID uint64 `json:"id"` -} - -type rpcResponse struct { - JSONRPC string `json:"jsonrpc"` - ID uint64 `json:"id"` - Result json.RawMessage `json:"result,omitempty"` - Error *rpcError `json:"error,omitempty"` -} - -type rpcError struct { - Code int `json:"code"` - Message string `json:"message"` -} - -// push { "jsonrpc":"2.0", "method":"time", "params":{ "subscription":"0x…", "result":"…" } } -type push[Out any] struct { - Method string `json:"method"` - Params struct { - SubID uint64 `json:"subscription"` - Result Out `json:"result"` - } `json:"params"` -} - -// --------------------------------------------------------------------------- - -func GetPrefixedWs[T any, Out any](t devtest.T, node *dsl.L2CLNode, prefix string, method string, runUntil <-chan T) []Out { - userRPC := node.Escape().UserRPC() - wsRPC := strings.Replace(userRPC, "http", "ws", 1) - - conn, _, err := websocket.DefaultDialer.DialContext(t.Ctx(), wsRPC, nil) - require.NoError(t, err, "dial: %v", err) - defer conn.Close() - - // 1. send the *_subscribe request - require.NoError(t, conn.WriteJSON(rpcRequest{ - JSONRPC: "2.0", - ID: 1, - Method: prefix + "_" + "subscribe_" + method, - Params: nil, - }), "subscribe: %v", err) - - // 2. read the ack – blocking read just once - var a rpcResponse - require.NoError(t, conn.ReadJSON(&a), "ack: %v", err) - t.Log("subscribed to websocket - id=", string(a.Result)) - - output := make([]Out, 0) - - // Function to handle JSON reading with error channel - readJSON := func(conn *websocket.Conn, msg *json.RawMessage) <-chan error { - errChan := make(chan error, 1) // Buffered channel to avoid goroutine leak - - go func() { - errChan <- conn.ReadJSON(msg) - close(errChan) - }() - - return errChan - } - - var msg json.RawMessage - - // 3. start a goroutine that keeps reading pushes -outer_loop: - for { - select { - case <-runUntil: - // Clean‑up if necessary, then exit - t.Log(method, "subscriber", "stopping: runUntil condition met") - break outer_loop - case <-t.Ctx().Done(): - // Clean‑up if necessary, then exit - t.Log("unsafe head subscriber", "stopping: context cancelled") - break outer_loop - case err := <-readJSON(conn, &msg): - require.NoError(t, err, "read: %v", err) - - var p push[Out] - require.NoError(t, json.Unmarshal(msg, &p), "decode: %v", err) - - t.Log(wsRPC, method, "received websocket message", p.Params.Result) - output = append(output, p.Params.Result) - } - } - - require.NoError(t, conn.WriteJSON(rpcRequest{ - JSONRPC: "2.0", - ID: 2, - Method: prefix + "_unsubscribe_" + method, - Params: []any{a.Result}, - }), "unsubscribe: %v", err) - - t.Log("gracefully closed websocket connection") - - return output -} - -func GetKonaWs[T any](t devtest.T, node *dsl.L2CLNode, method string, runUntil <-chan T) []eth.L2BlockRef { - return GetPrefixedWs[T, eth.L2BlockRef](t, node, "ws", method, runUntil) -} - -func GetDevWS[T any](t devtest.T, node *dsl.L2CLNode, method string, runUntil <-chan T) []uint64 { - return GetPrefixedWs[T, uint64](t, node, "dev", method, runUntil) -} diff --git a/tests/node/restart/conn_drop_test.go b/tests/node/restart/conn_drop_test.go index 8aabed00cb..e350679ea7 100644 --- a/tests/node/restart/conn_drop_test.go +++ b/tests/node/restart/conn_drop_test.go @@ -7,6 +7,7 @@ import ( "github.com/ethereum-optimism/optimism/op-devstack/devtest" "github.com/ethereum-optimism/optimism/op-devstack/dsl" + "github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/retry" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" node_utils "github.com/op-rs/kona/node/utils" @@ -32,17 +33,53 @@ func TestConnDropSync(gt *testing.T) { node.DisconnectPeer(&sequencer) // Ensure that the node is no longer connected to the sequencer + t.Logf("node %s is disconnected from sequencer %s", clName, sequencer.Escape().ID().Key()) seqPeers := sequencer.Peers() for _, peer := range seqPeers.Peers { t.Require().NotEqual(peer.PeerID, node.PeerInfo().PeerID, "expected node %s to be disconnected from sequencer %s", clName, sequencer.Escape().ID().Key()) } + peers := node.Peers() + for _, peer := range peers.Peers { + t.Require().NotEqual(peer.PeerID, sequencer.PeerInfo().PeerID, "expected node %s to be disconnected from sequencer %s", clName, sequencer.Escape().ID().Key()) + } + + currentUnsafeHead := node.ChainSyncStatus(node.ChainID(), types.LocalUnsafe) + + endSignal := make(chan struct{}) + + safeHeads := node_utils.GetKonaWsAsync(t, &node, "safe_head", endSignal) + unsafeHeads := node_utils.GetKonaWsAsync(t, &node, "unsafe_head", endSignal) + + // Ensures that.... + // - the node's safe head is advancing and eventually catches up with the unsafe head + // - the node's unsafe head is NOT advancing during this time + check := func() error { + outer_loop: + for { + select { + case safeHead := <-safeHeads: + t.Logf("node %s safe head is advancing", clName) + if safeHead.Number >= currentUnsafeHead.Number { + t.Logf("node %s safe head caught up with unsafe head", clName) + break outer_loop + } + case unsafeHead := <-unsafeHeads: + return fmt.Errorf("node %s unsafe head is advancing: %d", clName, unsafeHead.Number) + } + } + + endSignal <- struct{}{} + + return nil + } + // Check that... // - the node's safe head is advancing // - the node's unsafe head is advancing (through consolidation) // - the node's safe head's number is catching up with the unsafe head's number // - the node's unsafe head is strictly lagging behind the sequencer's unsafe head - postDisconnectCheckFuns = append(postDisconnectCheckFuns, node.AdvancedFn(types.LocalSafe, 50, 200), node.AdvancedFn(types.LocalUnsafe, 50, 200), SafeUnsafeMatchedFn(t, node, 50)) + postDisconnectCheckFuns = append(postDisconnectCheckFuns, node.AdvancedFn(types.LocalSafe, 50, 200), node.AdvancedFn(types.LocalUnsafe, 50, 200), check) } postDisconnectCheckFuns = append(postDisconnectCheckFuns, sequencer.AdvancedFn(types.LocalUnsafe, 50, 200)) @@ -72,28 +109,50 @@ func TestConnDropSync(gt *testing.T) { t.Require().True(found, "expected node %s to be connected to reference node %s", clName, sequencer.Escape().ID().Key()) // Check that the node is resyncing with the unsafe head network - postReconnectCheckFuns = append(postReconnectCheckFuns, node.MatchedFn(&sequencer, types.LocalSafe, 50), node.MatchedFn(&sequencer, types.LocalUnsafe, 50)) + postReconnectCheckFuns = append(postReconnectCheckFuns, MatchedWithinRange(t, node, sequencer, 3, types.LocalSafe, 50), node.AdvancedFn(types.LocalUnsafe, 50, 100), MatchedWithinRange(t, node, sequencer, 3, types.LocalUnsafe, 100)) } dsl.CheckAll(t, postReconnectCheckFuns...) } -// MatchedFn returns a lambda that checks the baseNode head with given safety level is matched with the refNode chain sync status provider -// Composable with other lambdas to wait in parallel -func SafeUnsafeMatchedFn(t devtest.T, clNode dsl.L2CLNode, attempts int) dsl.CheckFunc { +func MatchedWithinRange(t devtest.T, baseNode, refNode dsl.L2CLNode, delta uint64, lvl types.SafetyLevel, attempts int) dsl.CheckFunc { logger := t.Logger() - chainID := clNode.ChainID() + chainID := baseNode.ChainID() + return func() error { + base := baseNode.ChainSyncStatus(chainID, lvl) + ref := refNode.ChainSyncStatus(chainID, lvl) + logger.Info("Expecting node to match with reference", "base", base.Number, "ref", ref.Number) return retry.Do0(t.Ctx(), attempts, &retry.FixedStrategy{Dur: 2 * time.Second}, func() error { - base := clNode.ChainSyncStatus(chainID, types.LocalSafe) - ref := clNode.ChainSyncStatus(chainID, types.LocalUnsafe) - if ref.Hash == base.Hash && ref.Number == base.Number { - logger.Info("Node safe and unsafe heads matched", "ref", ref.Number, "base", base.Number) + base = baseNode.ChainSyncStatus(chainID, lvl) + ref = refNode.ChainSyncStatus(chainID, lvl) + if ref.Number <= base.Number+delta || ref.Number >= base.Number-delta { + logger.Info("Node matched", "ref_id", refNode, "base_id", baseNode, "ref", ref.Number, "base", base.Number, "delta", delta) + + // We get the same block from the head and tail node + var headNode dsl.L2CLNode + var tailNode eth.BlockID + if ref.Number > base.Number { + headNode = refNode + tailNode = base + } else { + headNode = baseNode + tailNode = ref + } + + baseBlock, err := headNode.Escape().RollupAPI().OutputAtBlock(t.Ctx(), tailNode.Number) + if err != nil { + return err + } + + t.Require().Equal(baseBlock.BlockRef.Number, tailNode.Number, "expected block number to match") + t.Require().Equal(baseBlock.BlockRef.Hash, tailNode.Hash, "expected block hash to match") + return nil } - logger.Info("Node safe and unsafe heads not matched", "safe", base.Number, "unsafe", ref.Number, "ref", ref.Hash, "base", base.Hash) - return fmt.Errorf("expected safe and unsafe heads to match") + logger.Info("Node sync status", "base", base.Number, "ref", ref.Number) + return fmt.Errorf("expected head to match: %s", lvl) }) } } diff --git a/tests/node/restart/restart_test.go b/tests/node/restart/restart_test.go index 0ee482102e..6fe3474968 100644 --- a/tests/node/restart/restart_test.go +++ b/tests/node/restart/restart_test.go @@ -2,11 +2,14 @@ package node_restart import ( "context" + "fmt" "testing" + "time" "github.com/ethereum-optimism/optimism/op-devstack/devtest" "github.com/ethereum-optimism/optimism/op-devstack/dsl" "github.com/ethereum-optimism/optimism/op-service/eth" + "github.com/ethereum-optimism/optimism/op-service/retry" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" node_utils "github.com/op-rs/kona/node/utils" ) @@ -17,14 +20,13 @@ func TestRestartSync(gt *testing.T) { out := node_utils.NewMixedOpKona(t) - nodes := out.L2CLKonaValidatorNodes + nodes := out.L2CLValidatorNodes() sequencerNodes := out.L2CLSequencerNodes() t.Gate().Greater(len(nodes), 0, "expected at least one validator node") t.Gate().Greater(len(sequencerNodes), 0, "expected at least one sequencer node") sequencer := sequencerNodes[0] - var preStopCheckFuns []dsl.CheckFunc for _, node := range nodes { t.Logf("testing restarts for node %s", node.Escape().ID().Key()) clName := node.Escape().ID().Key() @@ -34,23 +36,28 @@ func TestRestartSync(gt *testing.T) { node.Stop() // Ensure that the node is no longer connected to the sequencer - seqPeers := sequencer.Peers() - for _, peer := range seqPeers.Peers { - t.Require().NotEqual(peer.PeerID, nodePeerId, "expected node %s to be disconnected from sequencer %s", clName, sequencer.Escape().ID().Key()) - } + // Retry with an exponential backoff because the node may take a few seconds to stop. + _, err := retry.Do(t.Ctx(), 5, &retry.ExponentialStrategy{Max: 10 * time.Second, Min: 1 * time.Second, MaxJitter: 250 * time.Millisecond}, func() (any, error) { + seqPeers := sequencer.Peers() + for _, peer := range seqPeers.Peers { + if peer.PeerID == nodePeerId { + return nil, fmt.Errorf("expected node %s to be disconnected from sequencer %s", clName, sequencer.Escape().ID().Key()) + } + } + return nil, nil + }) + + t.Require().NoError(err) // Ensure that the node is stopped // Check that calling any rpc method returns an error rpc := node_utils.GetNodeRPCEndpoint(&node) var out *eth.SyncStatus - err := rpc.CallContext(context.Background(), &out, "opp2p_syncStatus") + err = rpc.CallContext(context.Background(), &out, "opp2p_syncStatus") t.Require().Error(err, "expected node %s to be stopped", clName) - - // Ensure that the sequencer's head is advancing. - preStopCheckFuns = append(preStopCheckFuns, sequencer.AdvancedFn(types.LocalUnsafe, 50, 200)) } - dsl.CheckAll(t, preStopCheckFuns...) + sequencer.Advanced(types.LocalUnsafe, 50, 200) var postStartCheckFuns []dsl.CheckFunc for _, node := range nodes { @@ -61,7 +68,7 @@ func TestRestartSync(gt *testing.T) { node.ConnectPeer(&sequencer) // Check that the node is resyncing with the network - postStartCheckFuns = append(postStartCheckFuns, node.MatchedFn(&sequencer, types.LocalSafe, 50), node.MatchedFn(&sequencer, types.LocalUnsafe, 50)) + postStartCheckFuns = append(postStartCheckFuns, MatchedWithinRange(t, node, sequencer, 3, types.LocalSafe, 100), MatchedWithinRange(t, node, sequencer, 3, types.LocalUnsafe, 100)) // Check that the node is connected to the reference node peers := node.Peers() diff --git a/tests/node/restart/sequencer_restart_test.go b/tests/node/restart/sequencer_restart_test.go index c5b888b604..ebbcb7172e 100644 --- a/tests/node/restart/sequencer_restart_test.go +++ b/tests/node/restart/sequencer_restart_test.go @@ -1,10 +1,13 @@ package node_restart import ( + "fmt" "testing" + "time" "github.com/ethereum-optimism/optimism/op-devstack/devtest" "github.com/ethereum-optimism/optimism/op-devstack/dsl" + "github.com/ethereum-optimism/optimism/op-service/retry" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" node_utils "github.com/op-rs/kona/node/utils" ) @@ -25,7 +28,7 @@ func TestSequencerRestart(gt *testing.T) { // Let's ensure that all the nodes are properly advancing. var preCheckFuns []dsl.CheckFunc for _, node := range nodes { - preCheckFuns = append(preCheckFuns, node.LaggedFn(&sequencer, types.CrossUnsafe, 20, true), node.MatchedFn(&sequencer, types.LocalSafe, 20)) + preCheckFuns = append(preCheckFuns, node.LaggedFn(&sequencer, types.CrossUnsafe, 20, true), node.AdvancedFn(types.LocalSafe, 20, 40)) } dsl.CheckAll(t, preCheckFuns...) @@ -37,9 +40,15 @@ func TestSequencerRestart(gt *testing.T) { for _, node := range nodes { // Ensure that the node is no longer connected to the sequencer nodePeers := node.Peers() - for _, peer := range nodePeers.Peers { - t.Require().NotEqual(peer.PeerID, seqPeerId, "expected node %s to be disconnected from sequencer %s", node.Escape().ID().Key(), sequencer.Escape().ID().Key()) - } + _, err := retry.Do(t.Ctx(), 5, &retry.ExponentialStrategy{Max: 10 * time.Second, Min: 1 * time.Second, MaxJitter: 250 * time.Millisecond}, func() (any, error) { + for _, peer := range nodePeers.Peers { + if peer.PeerID == seqPeerId { + return nil, fmt.Errorf("expected node %s to be disconnected from sequencer %s", node.Escape().ID().Key(), sequencer.Escape().ID().Key()) + } + } + return nil, nil + }) + t.Require().NoError(err) // Ensure that the other nodes are not advancing. // The local safe head may advance (for the next l1 block to be processed), but the unsafe head should not. diff --git a/tests/node/utils/ws.go b/tests/node/utils/ws.go new file mode 100644 index 0000000000..6b0bee053f --- /dev/null +++ b/tests/node/utils/ws.go @@ -0,0 +1,159 @@ +package node_utils + +import ( + "encoding/json" + "strings" + + "github.com/ethereum-optimism/optimism/op-devstack/devtest" + "github.com/ethereum-optimism/optimism/op-devstack/dsl" + "github.com/ethereum-optimism/optimism/op-service/eth" + "github.com/gorilla/websocket" + "github.com/stretchr/testify/require" +) + +// --- Generic RPC request/response types ------------------------------------- + +type rpcRequest struct { + JSONRPC string `json:"jsonrpc"` + Method string `json:"method"` + Params interface{} `json:"params"` + ID uint64 `json:"id"` +} + +type rpcResponse struct { + JSONRPC string `json:"jsonrpc"` + ID uint64 `json:"id"` + Result json.RawMessage `json:"result,omitempty"` + Error *rpcError `json:"error,omitempty"` +} + +type rpcError struct { + Code int `json:"code"` + Message string `json:"message"` +} + +// push { "jsonrpc":"2.0", "method":"time", "params":{ "subscription":"0x…", "result":"…" } } +type push[Out any] struct { + Method string `json:"method"` + Params struct { + SubID uint64 `json:"subscription"` + Result Out `json:"result"` + } `json:"params"` +} + +// --------------------------------------------------------------------------- + +func AsyncGetPrefixedWs[T any, Out any](t devtest.T, node *dsl.L2CLNode, prefix string, method string, runUntil <-chan T) <-chan Out { + userRPC := node.Escape().UserRPC() + wsRPC := strings.Replace(userRPC, "http", "ws", 1) + + output := make(chan Out, 128) + + go func() { + conn, _, err := websocket.DefaultDialer.DialContext(t.Ctx(), wsRPC, nil) + require.NoError(t, err, "dial: %v", err) + defer conn.Close() + defer close(output) + + // 1. send the *_subscribe request + require.NoError(t, conn.WriteJSON(rpcRequest{ + JSONRPC: "2.0", + ID: 1, + Method: prefix + "_" + "subscribe_" + method, + Params: nil, + }), "subscribe: %v", err) + + // 2. read the ack – blocking read just once + var a rpcResponse + require.NoError(t, conn.ReadJSON(&a), "ack: %v", err) + t.Log("subscribed to websocket - id=", string(a.Result)) + + // 3. defer the unsubscribe request + defer func() { + require.NoError(t, conn.WriteJSON(rpcRequest{ + JSONRPC: "2.0", + ID: 2, + Method: prefix + "_unsubscribe_" + method, + Params: []any{a.Result}, + }), "unsubscribe: %v", err) + + t.Log("gracefully closed websocket connection") + }() + + // Function to handle JSON reading with error channel + msgChan := make(chan json.RawMessage, 1) // Buffered channel to avoid goroutine leak + + go func() { + var msg json.RawMessage + defer close(msgChan) + + for { + if err := conn.ReadJSON(&msg); err != nil { + t.Log("readJSON channel closed") + return + } + + msgChan <- msg + } + }() + + // 4. start a goroutine that keeps reading pushes + for { + select { + case _, ok := <-runUntil: + // Clean‑up if necessary, then exit + if ok { + t.Log(method, "subscriber", "stopping: runUntil condition met") + } else { + t.Log(method, "subscriber", "stopping: runUntil channel closed") + } + return + case <-t.Ctx().Done(): + // Clean‑up if necessary, then exit + t.Log("unsafe head subscriber", "stopping: context cancelled") + return + case msg, ok := <-msgChan: + if !ok { + t.Log("readJSON channel closed") + return + } + + var p push[Out] + require.NoError(t, json.Unmarshal(msg, &p), "decode: %v", err) + + t.Log(wsRPC, method, "received websocket message", p.Params.Result) + output <- p.Params.Result + } + } + + }() + + return output +} + +func GetPrefixedWs[T any, Out any](t devtest.T, node *dsl.L2CLNode, prefix string, method string, runUntil <-chan T) []Out { + output := AsyncGetPrefixedWs[T, Out](t, node, prefix, method, runUntil) + + results := make([]Out, 0) + for result := range output { + results = append(results, result) + } + + return results +} + +func GetKonaWs[T any](t devtest.T, node *dsl.L2CLNode, method string, runUntil <-chan T) []eth.L2BlockRef { + return GetPrefixedWs[T, eth.L2BlockRef](t, node, "ws", method, runUntil) +} + +func GetKonaWsAsync[T any](t devtest.T, node *dsl.L2CLNode, method string, runUntil <-chan T) <-chan eth.L2BlockRef { + return AsyncGetPrefixedWs[T, eth.L2BlockRef](t, node, "ws", method, runUntil) +} + +func GetDevWS[T any](t devtest.T, node *dsl.L2CLNode, method string, runUntil <-chan T) []uint64 { + return GetPrefixedWs[T, uint64](t, node, "dev", method, runUntil) +} + +func GetDevWSAsync[T any](t devtest.T, node *dsl.L2CLNode, method string, runUntil <-chan T) <-chan uint64 { + return AsyncGetPrefixedWs[T, uint64](t, node, "dev", method, runUntil) +}