diff --git a/tests/node/conn_drops_test.go b/tests/node/conn_drops_test.go
new file mode 100644
index 0000000000..96ee051cac
--- /dev/null
+++ b/tests/node/conn_drops_test.go
@@ -0,0 +1,240 @@
+package node
+
+import (
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/ethereum-optimism/optimism/op-devstack/devtest"
+	"github.com/ethereum-optimism/optimism/op-devstack/dsl"
+	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
+	"golang.org/x/sync/errgroup"
+)
+
+// TestConnDrops tests what happens when a node drops its connection to its peers.
+// We simulate this by blacklisting all of the node's peers.
+func TestConnDrops(gt *testing.T) {
+	t := devtest.SerialT(gt)
+
+	out := NewMixedOpKona(t)
+
+	nodes := out.L2CLNodes()
+
+	ref := nodes[0]
+
+	var wg sync.WaitGroup
+	for _, node := range nodes {
+		if node == ref {
+			continue
+		}
+
+		t.Log("testing conn drops for node", node.Escape().ID().Key())
+
+		wg.Add(1)
+		node := node // capture the loop variable for the goroutine
+		go func() {
+			defer wg.Done()
+
+			// Check that both the safe and unsafe chains are in sync with the reference node.
+			dsl.CheckAll(t, node.MatchedFn(&ref, types.LocalSafe, 50), node.MatchedFn(&ref, types.LocalUnsafe, 50))
+
+			// Blacklist all the peers of the node.
+			peers := node.Peers()
+			for _, peer := range peers.Peers {
+				t.Log("blacklisting peer", peer.PeerID)
+				err := node.Escape().P2PAPI().BlockPeer(t.Ctx(), peer.PeerID)
+				t.Require().NoError(err, "failed to block peer %s", peer.PeerID)
+				// Disconnect the peer to ensure that the node is no longer connected to it.
+				err = node.Escape().P2PAPI().DisconnectPeer(t.Ctx(), peer.PeerID)
+				t.Require().NoError(err, "failed to disconnect peer %s", peer.PeerID)
+			}
+
+			check := []dsl.CheckFunc{}
+
+			// Wait for the safe chain to advance. With its peers blocked, the node can only sync from
+			// the L1 chain, so the safe chain should keep advancing; it may diverge from the reference node.
+			check = append(check, node.AdvancedFn(types.LocalSafe, 20, 50))
+
+			// The unsafe chain should also advance (by consolidating the safe chain).
+			check = append(check, node.AdvancedFn(types.LocalUnsafe, 20, 50))
+
+			dsl.CheckAll(t, check...)
+
+			if !isSequencer(&node) {
+				// The unsafe and safe chains should match.
+				syncStatus := node.SyncStatus()
+				t.Require().Equal(syncStatus.UnsafeL2, syncStatus.SafeL2, "expected unsafe and safe chains to be in sync")
+			} else {
+				// The unsafe and safe chains should diverge.
+				syncStatus := node.SyncStatus()
+				t.Require().NotEqual(syncStatus.UnsafeL2, syncStatus.SafeL2, "expected unsafe and safe chains to diverge")
+			}
+
+			// Unblock the peers of the node.
+			for _, peer := range peers.Peers {
+				t.Log("unblocking peer", peer.PeerID)
+				err := node.Escape().P2PAPI().UnblockPeer(t.Ctx(), peer.PeerID)
+				t.Require().NoError(err, "failed to unblock peer %s", peer.PeerID)
+			}
+
+			// Wait for the safe and unsafe chains to advance again. The node should sync both,
+			// and they should be back in sync with the reference node.
+			dsl.CheckAll(t, node.MatchedFn(&ref, types.LocalSafe, 50), node.MatchedFn(&ref, types.LocalUnsafe, 50))
+		}()
+	}
+
+	wg.Wait()
+}
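+
+// Note on the isolation pattern above: blocking a peer does not necessarily tear down an
+// already-established connection, which is why BlockPeer is paired with an explicit
+// DisconnectPeer. A minimal sketch of the pattern, with node and peers as in the test body
+// (illustrative only):
+//
+//	for _, peer := range peers.Peers {
+//		t.Require().NoError(node.Escape().P2PAPI().BlockPeer(t.Ctx(), peer.PeerID))
+//		t.Require().NoError(node.Escape().P2PAPI().DisconnectPeer(t.Ctx(), peer.PeerID))
+//	}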
+
+// TestConnDropsWithSequencer tests what happens when the sequencer node drops its connection to all
+// the other nodes of the network. In that case, the sequencer should still be able to sync both the
+// safe and unsafe chains. The other nodes should be able to sync the L1 chain but will diverge from
+// the sequencer.
+func TestConnDropsWithSequencer(gt *testing.T) {
+	t := devtest.SerialT(gt)
+
+	out := NewMixedOpKona(t)
+
+	nodes := out.L2CLNodes()
+
+	sequencerList := filterSequencer(nodes)
+
+	// Ensure that there is only one sequencer node (otherwise op-conductor might make matters tricky).
+	t.Gate().Equal(len(sequencerList), 1, "expected only one sequencer node")
+
+	sequencer := sequencerList[0]
+
+	// Blacklist all the peers of the sequencer.
+	peers := sequencer.Peers()
+	for _, peer := range peers.Peers {
+		t.Log("blacklisting peer", peer.PeerID)
+		err := sequencer.Escape().P2PAPI().BlockPeer(t.Ctx(), peer.PeerID)
+		t.Require().NoError(err, "failed to block peer %s", peer.PeerID)
+		// Disconnect the peer to ensure that the sequencer is no longer connected to it.
+		err = sequencer.Escape().P2PAPI().DisconnectPeer(t.Ctx(), peer.PeerID)
+		t.Require().NoError(err, "failed to disconnect peer %s", peer.PeerID)
+	}
+
+	// Now:
+	// - The sequencer should be able to sync the L1 chain.
+	// - The other nodes should be able to sync the L1 chain but diverge from the sequencer.
+	// - The sequencer should be able to sync the safe and unsafe chains.
+	// - The other nodes should be able to sync the safe and unsafe chains.
+
+	toCheck := []dsl.CheckFunc{}
+	toCheckErr := []dsl.CheckFunc{}
+
+	toCheck = append(toCheck, sequencer.AdvancedFn(types.LocalSafe, 20, 50), sequencer.AdvancedFn(types.LocalUnsafe, 20, 50))
+
+	for _, node := range nodes {
+		if node == sequencer {
+			continue
+		}
+
+		toCheck = append(toCheck, node.AdvancedFn(types.LocalSafe, 20, 50))
+		toCheck = append(toCheck, node.AdvancedFn(types.LocalUnsafe, 20, 50))
+
+		// The other nodes should _always_ diverge from the sequencer.
+		toCheckErr = append(toCheckErr, node.MatchedFn(&sequencer, types.LocalUnsafe, 50))
+	}
+
+	dsl.CheckAll(t, toCheck...)
+	CheckErr(t, toCheckErr...)
+
+	// Unblock the peers of the sequencer. The network should get back to normal.
+	for _, peer := range peers.Peers {
+		t.Log("unblocking peer", peer.PeerID)
+		err := sequencer.Escape().P2PAPI().UnblockPeer(t.Ctx(), peer.PeerID)
+		t.Require().NoError(err, "failed to unblock peer %s", peer.PeerID)
+		// Reconnect the peer to ensure that the sequencer is connected to it again.
+		err = sequencer.Escape().P2PAPI().ConnectPeer(t.Ctx(), peer.Addresses[0])
+		t.Require().NoError(err, "failed to connect peer %s", peer.PeerID)
+	}
+
+	toCheck = []dsl.CheckFunc{}
+
+	// Wait for the safe and unsafe chains to advance. The sequencer should be able to sync both.
+	toCheck = append(toCheck, sequencer.AdvancedFn(types.LocalSafe, 20, 50), sequencer.AdvancedFn(types.LocalUnsafe, 20, 50))
+
+	for _, node := range nodes {
+		if node == sequencer {
+			continue
+		}
+
+		toCheck = append(toCheck, node.MatchedFn(&sequencer, types.LocalSafe, 50))
+		toCheck = append(toCheck, node.MatchedFn(&sequencer, types.LocalUnsafe, 50))
+	}
+
+	dsl.CheckAll(t, toCheck...)
+}
+
+// CheckErr is like dsl.CheckAll, but expects a failure: errgroup.Wait returns the first
+// non-nil error, so this passes if and only if at least one check errors.
+func CheckErr(t devtest.T, checks ...dsl.CheckFunc) {
+	var g errgroup.Group
+	for _, check := range checks {
+		check := check // capture the loop variable for the goroutine
+		g.Go(func() error {
+			return check()
+		})
+	}
+	t.Require().Error(g.Wait())
+}
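+
+// Example usage (illustrative): assert that a node's unsafe head does NOT converge with
+// the sequencer's within the allotted attempts:
+//
+//	CheckErr(t, node.MatchedFn(&sequencer, types.LocalUnsafe, 50))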
+
+// TestConnDropsEngineTaskCount tests that the engine task count stays correct (at most one
+// pending engine task) when a node drops its connection to its peers.
+func TestConnDropsEngineTaskCount(gt *testing.T) {
+	t := devtest.SerialT(gt)
+
+	out := NewMixedOpKona(t)
+
+	nodes := out.L2CLNodes()
+
+	ref := nodes[0]
+
+	var wg sync.WaitGroup
+	for _, node := range nodes {
+		if node == ref {
+			continue
+		}
+
+		t.Log("testing conn drops for node", node.Escape().ID().Key())
+
+		wg.Add(1)
+		node := node // capture the loop variable for the goroutine
+		go func() {
+			defer wg.Done()
+
+			// Blacklist all the peers of the node.
+			peers := node.Peers()
+			for _, peer := range peers.Peers {
+				t.Log("blacklisting peer", peer.PeerID)
+				err := node.Escape().P2PAPI().BlockPeer(t.Ctx(), peer.PeerID)
+				t.Require().NoError(err, "failed to block peer %s", peer.PeerID)
+			}
+
+			// Stream the engine queue size over the dev websocket RPC.
+			clRPC, err := GetNodeRPCEndpoint(t.Ctx(), &node)
+			t.Require().NoError(err, "failed to get RPC endpoint for node %s", node.Escape().ID().Key())
+			wsRPC := websocketRPC(clRPC)
+
+			const SECS_WAIT_FOR_ENGINE = 10
+			queue := GetDevWS(t, wsRPC, "engine_queue_size", time.After(SECS_WAIT_FOR_ENGINE*time.Second))
+
+			// Check that the engine task count is correct.
+			for _, q := range queue {
+				t.Require().LessOrEqual(q, uint64(1), "expected at most 1 engine task")
+			}
+
+			// Unblock the peers of the node.
+			for _, peer := range peers.Peers {
+				t.Log("unblocking peer", peer.PeerID)
+				err := node.Escape().P2PAPI().UnblockPeer(t.Ctx(), peer.PeerID)
+				t.Require().NoError(err, "failed to unblock peer %s", peer.PeerID)
+			}
+
+			// Wait for the safe and unsafe chains to advance again. The node should sync both,
+			// and they should be back in sync with the reference node.
+			dsl.CheckAll(t, node.MatchedFn(&ref, types.LocalSafe, 50), node.MatchedFn(&ref, types.LocalUnsafe, 50))
+		}()
+	}
+
+	wg.Wait()
+}
diff --git a/tests/node/engine_test.go b/tests/node/engine_test.go
index deab6d1e77..adaa8abd32 100644
--- a/tests/node/engine_test.go
+++ b/tests/node/engine_test.go
@@ -24,7 +24,7 @@ func TestEngine(gt *testing.T) {
 		clRPC, err := GetNodeRPCEndpoint(t.Ctx(), &node)
 
 		// See if the node supports the dev RPC.
-		if !supportsDevRPC(t, clName, clRPC) {
+		if !supportsDevRPC(clRPC) {
 			t.Log("node does not support dev RPC, skipping engine test for", node.Escape().ID().Key())
 			continue
 		}
diff --git a/tests/node/mod.go b/tests/node/mod.go
index 566420cf5a..17d5ffb66e 100644
--- a/tests/node/mod.go
+++ b/tests/node/mod.go
@@ -54,7 +54,7 @@ func websocketRPC(clRPC string) string {
 	return strings.Replace(clRPC, "http", "ws", 1)
 }
 
-func supportsDevRPC(t devtest.T, clName string, clRPC string) bool {
+func supportsDevRPC(clRPC string) bool {
 	// To see if the node supports the dev RPC, we try to send a request to the dev RPC to
 	// get the last engine queue length.
 	engineQueueLength := 0
@@ -255,7 +255,7 @@ outer_loop:
 	return output
 }
 
-func GetKonaWs[T any](t devtest.T, wsRPC string, method string, runUntil <-chan T) []eth.L2BlockRef {
+func GetKonaWS[T any](t devtest.T, wsRPC string, method string, runUntil <-chan T) []eth.L2BlockRef {
 	return GetPrefixedWs[T, eth.L2BlockRef](t, "ws", wsRPC, method, runUntil)
 }
 
diff --git a/tests/node/sync_ws_test.go b/tests/node/sync_ws_test.go
index 0c7f2b3be9..094f32a776 100644
--- a/tests/node/sync_ws_test.go
+++ b/tests/node/sync_ws_test.go
@@ -44,9 +44,9 @@ func TestSyncUnsafeBecomesSafe(gt *testing.T) {
 	wsRPC := websocketRPC(clRPC)
 	t.Log("node supports ws endpoint, continuing sync test", clName, wsRPC)
 
-	unsafeBlocks := GetKonaWs(t, wsRPC, "unsafe_head", time.After(SECS_WAIT_FOR_UNSAFE_HEAD*time.Second))
+	unsafeBlocks := GetKonaWS(t, wsRPC, "unsafe_head", time.After(SECS_WAIT_FOR_UNSAFE_HEAD*time.Second))
 
-	safeBlocks := GetKonaWs(t, wsRPC, "safe_head", time.After(SECS_WAIT_FOR_SAFE_HEAD*time.Second))
+	safeBlocks := GetKonaWS(t, wsRPC, "safe_head", time.After(SECS_WAIT_FOR_SAFE_HEAD*time.Second))
 
 	require.GreaterOrEqual(t, len(unsafeBlocks), 1, "we didn't receive enough unsafe gossip blocks!")
 	require.GreaterOrEqual(t, len(safeBlocks), 1, "we didn't receive enough safe gossip blocks!")
@@ -101,7 +101,7 @@ func TestSyncUnsafe(gt *testing.T) {
 	wsRPC := websocketRPC(clRPC)
 	t.Log("node supports ws endpoint, continuing sync test", clName, wsRPC)
 
-	output := GetKonaWs(t, wsRPC, "unsafe_head", time.After(10*time.Second))
+	output := GetKonaWS(t, wsRPC, "unsafe_head", time.After(10*time.Second))
 
 	// For each block, we check that the block is actually in the chain of the other nodes.
 	// That should always be the case unless there is a reorg or a long sync.
@@ -160,7 +160,7 @@ func TestSyncSafe(gt *testing.T) {
 	wsRPC := websocketRPC(clRPC)
 	t.Log("node supports ws endpoint, continuing sync test", clName, wsRPC)
 
-	output := GetKonaWs(t, wsRPC, "safe_head", time.After(10*time.Second))
+	output := GetKonaWS(t, wsRPC, "safe_head", time.After(10*time.Second))
 
 	// For each block, we check that the block is actually in the chain of the other nodes.
 	// That should always be the case unless there is a reorg or a long sync.
@@ -219,7 +219,7 @@ func TestSyncFinalized(gt *testing.T) {
 	wsRPC := websocketRPC(clRPC)
 	t.Log("node supports ws endpoint, continuing sync test", clName, wsRPC)
 
-	output := GetKonaWs(t, wsRPC, "finalized_head", time.After(4*time.Minute))
+	output := GetKonaWS(t, wsRPC, "finalized_head", time.After(4*time.Minute))
 
 	// We should check that we received at least 2 finalized blocks within 4 minutes!
 	require.Greater(t, len(output), 1, "we didn't receive enough finalized gossip blocks!")