Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3243,7 +3243,7 @@ workflows:
- op-acceptance-tests:
name: memory-all
gate: "" # Empty gate = gateless mode
no_output_timeout: 80m # Keep this less than 90m to avoid CircleCI timeout
no_output_timeout: 120m # Allow longer runs for memory-all gate
context:
- circleci-repo-readonly-authenticated-github-token
- slack
Expand Down
2 changes: 1 addition & 1 deletion op-acceptance-tests/justfile
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ acceptance-test devnet="" gate="base":
"--validators" "./acceptance-tests.yaml"
"--exclude-gates" "flake-shake"
"--allow-skips"
"--timeout" "60m"
"--timeout" "120m"
"--orchestrator" "$ORCHESTRATOR"
"--show-progress"
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
//go:build !ci

// use a tag prefixed with "!". Such tag ensures that the default behaviour of this test would be to be built/run even when the go toolchain (go test) doesn't specify any tag filter.
package conductor

import (
Expand Down
223 changes: 95 additions & 128 deletions op-acceptance-tests/tests/flashblocks/flashblocks_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,18 @@ package flashblocks
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"os"
"strconv"
"testing"
"time"

"github.com/ethereum-optimism/optimism/op-devstack/devtest"
"github.com/ethereum-optimism/optimism/op-devstack/dsl"
"github.com/ethereum-optimism/optimism/op-devstack/presets"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/log/logfilter"
"github.com/ethereum-optimism/optimism/op-service/logmods"
"github.com/ethereum-optimism/optimism/op-test-sequencer/sequencer/seqtypes"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)
Expand All @@ -25,14 +27,20 @@ var (
maxExpectedFlashblocks = 20
)

// TestFlashblocksStream checks we can connect to the flashblocks stream
// TestFlashblocksStream checks we can connect to the flashblocks stream across multiple CL backends.
func TestFlashblocksStream(gt *testing.T) {
t := devtest.SerialT(gt)
sys := presets.NewSimpleFlashblocks(t)
logger := testlog.Logger(t, log.LevelInfo).With("Test", "TestFlashblocksStream")
logger := t.Logger()
sys := presets.NewSingleChainWithFlashblocks(t)
filterHandler, ok := logmods.FindHandler[logfilter.FilterHandler](logger.Handler())
if ok {
filterHandler.Set(logfilter.DefaultMute(
logfilter.Level(slog.LevelError).Show(),
logfilter.Select("kind", "L2CLNode").Show(),
))
}
tracer := t.Tracer()
ctx := t.Ctx()
logger.Info("Started Flashblocks Stream test")

ctx, span := tracer.Start(ctx, "test chains")
defer span.End()
Expand All @@ -51,33 +59,91 @@ func TestFlashblocksStream(gt *testing.T) {
logger.Info("Flashblocks stream rate", "rate", flashblocksStreamRateMs)

// Test all L2 chains in the system
for l2Chain, flashblocksBuilderSet := range sys.FlashblocksBuilderSets {
_, span = tracer.Start(ctx, "test chain")
defer span.End()

networkName := l2Chain.String()
t.Run(fmt.Sprintf("L2_Chain_%s", networkName), func(tt devtest.T) {
if len(flashblocksBuilderSet) == 0 {
tt.Skip("no flashblocks builders for chain", l2Chain.String())
}
oprbuilderNode := sys.L2OPRBuilder
rollupBoostNode := sys.L2RollupBoost
_, span = tracer.Start(ctx, "test chain")
defer span.End()

expectedChainID := sys.L2Chain.ChainID().ToBig()
require.Equal(t, oprbuilderNode.Escape().ChainID().ToBig(), expectedChainID, "flashblocks builder node chain id should match expected chain id")

driveViaTestSequencer(t, sys, 3)

// Test the presence / absence of a flashblocks stream operating at a 250ms rate from a flashblocks-websocket-proxy node.
// Allow a generous window for first flashblocks to appear.
testDuration := time.Duration(int64(flashblocksStreamRateMs*maxExpectedFlashblocks*2)) * time.Millisecond
// Allow up to 15% of expected flashblocks to be missing due to timing variations
failureTolerance := int(0.15 * float64(maxExpectedFlashblocks))
Comment thread
teddyknox marked this conversation as resolved.

logger.Debug("Test duration", "duration", testDuration, "failure tolerance (of flashblocks)", failureTolerance)

// Instrument builder stream separately to confirm flashblocks emission upstream.
builderOutput := make(chan []byte, maxExpectedFlashblocks)
defer close(builderOutput)
builderDone := make(chan struct{})
go func() {
err := oprbuilderNode.FlashblocksClient().ListenFor(ctx, logger.With("stream_source", "op-rbuilder"), testDuration, builderOutput, builderDone)
require.NoError(t, err)
}()
builderMessages := make([]string, 0)

output := make(chan []byte, maxExpectedFlashblocks)
defer close(output)
doneListening := make(chan struct{})
streamedMessages := make([]string, 0)
go func() {
err := rollupBoostNode.FlashblocksClient().ListenFor(ctx, logger.With("stream_source", "rollup-boost"), testDuration, output, doneListening)
require.NoError(t, err)
}()

listening := true
for listening {
select {
case <-doneListening:
doneListening = nil
case <-builderDone:
builderDone = nil
case msg := <-output:
streamedMessages = append(streamedMessages, string(msg))
case msg := <-builderOutput:
builderMessages = append(builderMessages, string(msg))
}

expectedChainID := l2Chain.ChainID().ToBig()
for _, flashblocksBuilderNode := range flashblocksBuilderSet {
require.Equal(t, flashblocksBuilderNode.Escape().ChainID().ToBig(), expectedChainID, "flashblocks builder node chain id should match expected chain id")
if doneListening == nil && builderDone == nil {
listening = false
}
}

mode := FlashblocksStreamMode_Follower
if dsl.NewConductor(flashblocksBuilderNode.Escape().Conductor()).IsLeader() {
mode = FlashblocksStreamMode_Leader
}
logger.Info("Completed WebSocket stream reading", "msg_count", len(streamedMessages), "builder_msg_count", len(builderMessages))

testFlashblocksStreamRbuilder(tt, logger, flashblocksBuilderNode, mode, flashblocksStreamRateMs)
}
if len(builderMessages) > 0 {
logger.Info("Sample builder message", "payload", builderMessages[0])
}

for _, flashblocksWebsocketProxy := range sys.FlashblocksWebsocketProxies[l2Chain] {
testFlashblocksStreamFbWsProxy(tt, logger, flashblocksWebsocketProxy, flashblocksStreamRateMs)
}
})
totalFlashblocksProduced := evaluateFlashblocksStream(t, logger, streamedMessages, failureTolerance)
require.Greater(t, totalFlashblocksProduced, 0, "expected to receive flashblocks from rollup-boost stream")
logger.Info("Flashblocks stream validation completed", "total_flashblocks_produced", totalFlashblocksProduced)
}

// driveViaTestSequencer explicitly builds a few blocks to ensure the builder/rollup-boost
// have payloads to serve before we start listening for flashblocks.
func driveViaTestSequencer(t devtest.T, sys *presets.SingleChainWithFlashblocks, count int) {
Comment thread
teddyknox marked this conversation as resolved.
t.Helper()
ts := sys.TestSequencer.Escape().ControlAPI(sys.L2Chain.ChainID())
ctx := t.Ctx()

head := sys.L2EL.BlockRefByLabel(eth.Unsafe)
for i := 0; i < count; i++ {
require.NoError(t, ts.New(ctx, seqtypes.BuildOpts{Parent: head.Hash}))
require.NoError(t, ts.Next(ctx))
head = sys.L2EL.BlockRefByLabel(eth.Unsafe)
}
// Ensure the sequencer EL has produced at least one unsafe block before subscribing.
sys.L2EL.WaitForBlockNumber(1)

// Log the latest unsafe head and L1 origin to confirm block production before listening.
head = sys.L2EL.BlockRefByLabel(eth.Unsafe)
sys.Log.Info("Pre-listen unsafe head", "unsafe", head)
}

func evaluateFlashblocksStream(t devtest.T, logger log.Logger, streamedMessages []string, failureTolerance int) int {
Expand Down Expand Up @@ -133,102 +199,3 @@ func evaluateFlashblocksStream(t devtest.T, logger log.Logger, streamedMessages

return totalFlashblocksProduced
}

// testFlashblocksStreamRbuilder tests the presence / absence of a flashblocks stream operating at a 250ms (configurable via env var FLASHBLOCKS_STREAM_RATE) rate from an rbuilder node
func testFlashblocksStreamRbuilder(t devtest.T, logger log.Logger, flashblocksBuilderNode *dsl.FlashblocksBuilderNode, mode FlashblocksStreamMode, expectedFlashblocksStreamRateMs int) {
t.Run(fmt.Sprintf("Flashblocks_Stream_Rbuilder_%s_%s", flashblocksBuilderNode.Escape().ID(), mode), func(t devtest.T) {
testDuration := time.Duration(int64(expectedFlashblocksStreamRateMs*maxExpectedFlashblocks)) * time.Millisecond
failureTolerance := int(0.15 * float64(maxExpectedFlashblocks))

logger.Debug("Test duration", "duration", testDuration, "failure tolerance (of flashblocks)", failureTolerance)

require.Contains(t, []FlashblocksStreamMode{FlashblocksStreamMode_Leader, FlashblocksStreamMode_Follower}, mode, "mode should be either leader or follower")
require.NotNil(t, flashblocksBuilderNode, "flashblocksBuilderNode should not be nil")

output := make(chan []byte, maxExpectedFlashblocks)
doneListening := make(chan struct{})
streamedMessages := make([]string, 0)
go flashblocksBuilderNode.ListenFor(logger, testDuration, output, doneListening) //nolint:errcheck

for {
select {
case <-doneListening:
goto done
case msg := <-output:
streamedMessages = append(streamedMessages, string(msg))
}
}
done:

defer close(output)

logger.Info("Completed WebSocket stream reading", "message_count", len(streamedMessages))
if mode == FlashblocksStreamMode_Follower {
require.Equal(t, len(streamedMessages), 0, "follower should not receive any messages")
return
}

totalFlashblocksProduced := evaluateFlashblocksStream(t, logger, streamedMessages, failureTolerance)

minExpectedFlashblocks := maxExpectedFlashblocks - failureTolerance
require.Greater(t,
totalFlashblocksProduced, minExpectedFlashblocks,
fmt.Sprintf("total flashblocks produced should be greater than %d (%d over %s with a %dms rate with a failure tolerance of %d flashblocks)",
minExpectedFlashblocks,
maxExpectedFlashblocks,
testDuration,
expectedFlashblocksStreamRateMs,
failureTolerance,
),
)

logger.Info("Flashblocks stream validation completed", "total_flashblocks_produced", totalFlashblocksProduced)
})
}

// testFlashblocksStreamFbWsProxy tests the presence / absence of a flashblocks stream operating at a 250ms (configurable via env var FLASHBLOCKS_STREAM_RATE) rate from a flashblocks-websocket-proxy node
func testFlashblocksStreamFbWsProxy(t devtest.T, logger log.Logger, flashblocksWebsocketProxy *dsl.FlashblocksWebsocketProxy, expectedFlashblocksStreamRateMs int) {
t.Run(fmt.Sprintf("Flashblocks_Stream_FbWsProxy_%s", flashblocksWebsocketProxy.Escape().ID()), func(t devtest.T) {
testDuration := time.Duration(int64(expectedFlashblocksStreamRateMs*maxExpectedFlashblocks)) * time.Millisecond
failureTolerance := int(0.15 * float64(maxExpectedFlashblocks))

logger.Debug("Test duration", "duration", testDuration, "failure tolerance (of flashblocks)", failureTolerance)

require.NotNil(t, flashblocksWebsocketProxy, "flashblocksWebsocketProxy should not be nil")

output := make(chan []byte, maxExpectedFlashblocks)
doneListening := make(chan struct{})
streamedMessages := make([]string, 0)
go flashblocksWebsocketProxy.ListenFor(logger, testDuration, output, doneListening) //nolint:errcheck

for {
select {
case <-doneListening:
goto done
case msg := <-output:
streamedMessages = append(streamedMessages, string(msg))
}
}
done:

defer close(output)

logger.Info("Completed WebSocket stream reading", "message_count", len(streamedMessages))

totalFlashblocksProduced := evaluateFlashblocksStream(t, logger, streamedMessages, failureTolerance)

minExpectedFlashblocks := maxExpectedFlashblocks - failureTolerance
require.Greater(t,
totalFlashblocksProduced, minExpectedFlashblocks,
fmt.Sprintf("total flashblocks produced should be greater than %d (%d over %s with a %dms rate with a failure tolerance of %d flashblocks)",
minExpectedFlashblocks,
maxExpectedFlashblocks,
testDuration,
expectedFlashblocksStreamRateMs,
failureTolerance,
),
)

logger.Info("Flashblocks stream validation completed", "total_flashblocks_produced", totalFlashblocksProduced)
})
}
Loading