Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions op-e2e/actions/helpers/l2_sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,18 +70,16 @@ func NewL2Sequencer(t Testing, log log.Logger, l1 derive.L1Fetcher, blobSrc deri
asyncGossip := async.NoOpGossiper{}
seq := sequencing.NewSequencer(t.Ctx(), log, cfg, attrBuilder, l1OriginSelector,
seqStateListener, conduc, asyncGossip, metr)
opts := event.DefaultRegisterOpts()
opts.Emitter = event.EmitterOpts{
Limiting: true,
opts := event.WithEmitLimiter(
// TestSyncBatchType/DerivationWithFlakyL1RPC does *a lot* of quick retries
// TestL2BatcherBatchType/ExtendedTimeWithoutL1Batches as well.
Rate: rate.Limit(100_000),
Burst: 100_000,
OnLimited: func() {
rate.Limit(100_000),
100_000,
func() {
log.Warn("Hitting events rate-limit. An events code-path may be hot-looping.")
t.Fatal("Tests must not hot-loop events")
},
}
)
ver.eventSys.Register("sequencer", seq, opts)
ver.eventSys.Register("origin-selector", originSelector, opts)
require.NoError(t, seq.Init(t.Ctx(), true))
Expand Down
12 changes: 5 additions & 7 deletions op-e2e/actions/helpers/l2_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,18 +116,16 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher,
executor := event.NewGlobalSynchronous(ctx)
sys := event.NewSystem(log, executor)
t.Cleanup(sys.Stop)
opts := event.DefaultRegisterOpts()
opts.Emitter = event.EmitterOpts{
Limiting: true,
opts := event.WithEmitLimiter(
// TestSyncBatchType/DerivationWithFlakyL1RPC does *a lot* of quick retries
// TestL2BatcherBatchType/ExtendedTimeWithoutL1Batches as well.
Rate: rate.Limit(100_000),
Burst: 100_000,
OnLimited: func() {
rate.Limit(100_000),
100_000,
func() {
log.Warn("Hitting events rate-limit. An events code-path may be hot-looping.")
t.Fatal("Tests must not hot-loop events")
},
}
)

var interopSys interop.SubSystem
if cfg.InteropTime != nil {
Expand Down
4 changes: 2 additions & 2 deletions op-node/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func (n *OpNode) initEventSystem() {
executor := event.NewGlobalSynchronous(n.resourcesCtx)
sys := event.NewSystem(n.log, executor)
sys.AddTracer(event.NewMetricsTracer(n.metrics))
sys.Register("node", event.DeriverFunc(n.onEvent), event.DefaultRegisterOpts())
sys.Register("node", event.DeriverFunc(n.onEvent))
n.eventSys = sys
n.eventDrain = executor
}
Expand Down Expand Up @@ -411,7 +411,7 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config) error {
managedMode = ok
}
n.interopSys = sys
n.eventSys.Register("interop", n.interopSys, event.DefaultRegisterOpts())
n.eventSys.Register("interop", n.interopSys)
}

var sequencerConductor conductor.SequencerConductor = &conductor.NoOpConductor{}
Expand Down
30 changes: 14 additions & 16 deletions op-node/rollup/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,41 +178,39 @@ func NewDriver(
) *Driver {
driverCtx, driverCancel := context.WithCancel(context.Background())

opts := event.DefaultRegisterOpts()

statusTracker := status.NewStatusTracker(log, metrics)
sys.Register("status", statusTracker, opts)
sys.Register("status", statusTracker)

l1Tracker := status.NewL1Tracker(l1)
sys.Register("l1-blocks", l1Tracker, opts)
sys.Register("l1-blocks", l1Tracker)

l1 = metered.NewMeteredL1Fetcher(l1Tracker, metrics)
verifConfDepth := confdepth.NewConfDepth(driverCfg.VerifierConfDepth, statusTracker.L1Head, l1)

ec := engine.NewEngineController(l2, log, metrics, cfg, syncCfg,
sys.Register("engine-controller", nil, opts))
sys.Register("engine-controller", nil))

sys.Register("engine-reset",
engine.NewEngineResetDeriver(driverCtx, log, cfg, l1, l2, syncCfg), opts)
engine.NewEngineResetDeriver(driverCtx, log, cfg, l1, l2, syncCfg))

clSync := clsync.NewCLSync(log, cfg, metrics) // alt-sync still uses cl-sync state to determine what to sync to
sys.Register("cl-sync", clSync, opts)
sys.Register("cl-sync", clSync)

var finalizer Finalizer
if cfg.AltDAEnabled() {
finalizer = finality.NewAltDAFinalizer(driverCtx, log, cfg, l1, altDA)
} else {
finalizer = finality.NewFinalizer(driverCtx, log, cfg, l1)
}
sys.Register("finalizer", finalizer, opts)
sys.Register("finalizer", finalizer)

sys.Register("attributes-handler",
attributes.NewAttributesHandler(log, cfg, driverCtx, l2), opts)
attributes.NewAttributesHandler(log, cfg, driverCtx, l2))

derivationPipeline := derive.NewDerivationPipeline(log, cfg, verifConfDepth, l1Blobs, altDA, l2, metrics, managedMode)

sys.Register("pipeline",
derive.NewPipelineDeriver(driverCtx, derivationPipeline), opts)
derive.NewPipelineDeriver(driverCtx, derivationPipeline))

syncDeriver := &SyncDeriver{
Derivation: derivationPipeline,
Expand All @@ -228,28 +226,28 @@ func NewDriver(
Drain: drain.Drain,
ManagedMode: managedMode,
}
sys.Register("sync", syncDeriver, opts)
sys.Register("sync", syncDeriver)

sys.Register("engine", engine.NewEngDeriver(log, driverCtx, cfg, metrics, ec), opts)
sys.Register("engine", engine.NewEngDeriver(log, driverCtx, cfg, metrics, ec))

schedDeriv := NewStepSchedulingDeriver(log)
sys.Register("step-scheduler", schedDeriv, opts)
sys.Register("step-scheduler", schedDeriv)

var sequencer sequencing.SequencerIface
if driverCfg.SequencerEnabled {
asyncGossiper := async.NewAsyncGossiper(driverCtx, network, log, metrics)
attrBuilder := derive.NewFetchingAttributesBuilder(cfg, l1, l2)
sequencerConfDepth := confdepth.NewConfDepth(driverCfg.SequencerConfDepth, statusTracker.L1Head, l1)
findL1Origin := sequencing.NewL1OriginSelector(driverCtx, log, cfg, sequencerConfDepth)
sys.Register("origin-selector", findL1Origin, opts)
sys.Register("origin-selector", findL1Origin)
sequencer = sequencing.NewSequencer(driverCtx, log, cfg, attrBuilder, findL1Origin,
sequencerStateListener, sequencerConductor, asyncGossiper, metrics)
sys.Register("sequencer", sequencer, opts)
sys.Register("sequencer", sequencer)
} else {
sequencer = sequencing.DisabledSequencer{}
}

driverEmitter := sys.Register("driver", nil, opts)
driverEmitter := sys.Register("driver", nil)
driver := &Driver{
statusTracker: statusTracker,
SyncDeriver: syncDeriver,
Expand Down
2 changes: 1 addition & 1 deletion op-node/rollup/event/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@ func (fn ExecutableFunc) RunEvent(ev AnnotatedEvent) {
}

type Executor interface {
Add(d Executable, opts *ExecutorOpts) (leaveExecutor func())
Add(d Executable, cfg *ExecutorConfig) (leaveExecutor func())
Enqueue(ev AnnotatedEvent) error
}
147 changes: 120 additions & 27 deletions op-node/rollup/event/executor_global.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io"
"slices"
"sort"
"sync"
"sync/atomic"
)
Expand All @@ -13,10 +14,75 @@ import (
// At some point it's better to drop events and warn something is exploding the number of events.
const sanityEventLimit = 10_000

// eventsList is a FIFO sub-queue holding all queued events of a single priority.
type eventsList struct {
	Events []AnnotatedEvent
}

// prioritizedEvents is a multi-level FIFO queue: events are bucketed per
// priority, and popped highest-priority-first, FIFO within a priority.
// It is not safe for concurrent use; callers must hold their own lock.
type prioritizedEvents struct {
	// keyed by priority. May contain empty lists.
	// Index i corresponds to priority (i + priorityMin); see Add.
	byPriority [priorityCount]*eventsList

	// number of events, summed across all priority buckets
	count uint64

	// Note: there is a very limited number of different priorities, that continue to show up over time.
	// And events with equal priority should stay FIFO.
	// So we don't use a priority-queue, but just statically have a few sub-lists, and never remove keys.
}

// Add appends the event to the sub-list matching its priority, coercing an
// invalid priority to Normal first, and bumps the queued-event counter.
func (a *prioritizedEvents) Add(event AnnotatedEvent) {
	if !event.EmitPriority.Valid() {
		// Don't drop the event; fall back to the default priority instead.
		event.EmitPriority = Normal
	}
	list := a.byPriority[event.EmitPriority-priorityMin]
	list.Events = append(list.Events, event)
	a.count++
}

// Pop removes and returns the queued event with the highest priority
// (FIFO within that priority). Returns a zeroed AnnotatedEvent if the
// queue is empty.
func (a *prioritizedEvents) Pop() AnnotatedEvent {
	// Scan sub-lists from highest priority down to lowest.
	for i := len(a.byPriority) - 1; i >= 0; i-- {
		list := a.byPriority[i]
		if len(list.Events) == 0 {
			continue
		}
		ev := list.Events[0]
		list.Events = list.Events[1:]
		a.count -= 1
		return ev
	}
	return AnnotatedEvent{}
}

// Peek returns the queued event with the highest priority without
// removing it. Returns a zeroed AnnotatedEvent if the queue is empty.
func (a *prioritizedEvents) Peek() AnnotatedEvent {
	// Scan sub-lists from highest priority down to lowest.
	for i := len(a.byPriority) - 1; i >= 0; i-- {
		if list := a.byPriority[i]; len(list.Events) > 0 {
			return list.Events[0]
		}
	}
	return AnnotatedEvent{}
}

// Count returns the number of currently queued events, across all priorities.
func (a *prioritizedEvents) Count() uint64 {
	return a.count
}

type GlobalSyncExec struct {
eventsLock sync.Mutex
events []AnnotatedEvent
events prioritizedEvents // protected by eventsLock

// queued is closed and replaced whenever a new item is enqueued.
// This is used to signal to Await callers when there are events.
// It is nil when no reader is awaiting.
// This is protected by eventsLock.
queued chan struct{}

// sorted by descending priority
handles []*globalHandle
handlesLock sync.RWMutex

Expand All @@ -26,15 +92,31 @@ type GlobalSyncExec struct {
var _ Executor = (*GlobalSyncExec)(nil)

func NewGlobalSynchronous(ctx context.Context) *GlobalSyncExec {
return &GlobalSyncExec{ctx: ctx}
var byPriority [priorityCount]*eventsList
for i := range byPriority {
// pre-allocate with some default capacity
byPriority[i] = &eventsList{make([]AnnotatedEvent, 0, 100)}
}
return &GlobalSyncExec{
ctx: ctx,
events: prioritizedEvents{
byPriority: byPriority,
count: 0,
},
queued: nil,
}
}

func (gs *GlobalSyncExec) Add(d Executable, _ *ExecutorOpts) (leaveExecutor func()) {
func (gs *GlobalSyncExec) Add(d Executable, cfg *ExecutorConfig) (leaveExecutor func()) {
gs.handlesLock.Lock()
defer gs.handlesLock.Unlock()
h := &globalHandle{d: d}
h := &globalHandle{d: d, priority: cfg.Priority}
h.g.Store(gs)
gs.handles = append(gs.handles, h)
// sort by descending priority
sort.Slice(gs.handles, func(i, j int) bool {
return gs.handles[i].priority > gs.handles[j].priority
})
return h.leave
}

Expand All @@ -55,24 +137,15 @@ func (gs *GlobalSyncExec) Enqueue(ev AnnotatedEvent) error {
gs.eventsLock.Lock()
defer gs.eventsLock.Unlock()
// sanity limit, never queue too many events
if len(gs.events) >= sanityEventLimit {
return fmt.Errorf("something is very wrong, queued up too many events! Dropping event %q", ev)
if gs.events.Count() >= sanityEventLimit {
return fmt.Errorf("something is very wrong, queued up too many events! Dropping event %q", ev.Event)
}
gs.events = append(gs.events, ev)
return nil
}

func (gs *GlobalSyncExec) pop() AnnotatedEvent {
gs.eventsLock.Lock()
defer gs.eventsLock.Unlock()

if len(gs.events) == 0 {
return AnnotatedEvent{}
gs.events.Add(ev)
if gs.queued != nil {
close(gs.queued) // To everyone waiting so far: let them know we have an event.
gs.queued = nil // To everyone in the future: they will need to Await for a new event again
}

first := gs.events[0]
gs.events = gs.events[1:]
return first
return nil
}

func (gs *GlobalSyncExec) processEvent(ev AnnotatedEvent) {
Expand All @@ -83,12 +156,32 @@ func (gs *GlobalSyncExec) processEvent(ev AnnotatedEvent) {
}
}

// Await returns a channel that is closed if and when event(s) have been queued up.
// This may be used to await when Drain() can be called for event processing.
// Concurrent callers share the same channel until the next Enqueue closes and
// replaces it.
func (gs *GlobalSyncExec) Await() <-chan struct{} {
	gs.eventsLock.Lock()
	defer gs.eventsLock.Unlock()
	if gs.queued == nil { // If nobody was awaiting already, initialize.
		out := make(chan struct{})
		// If we already have events, close it immediately.
		if gs.events.Peek().Event != nil {
			close(out)
			// gs.queued is already nil: we want to keep the close signal coupled to the enqueuing of events.
			return out
		}
		gs.queued = out
	}
	return gs.queued
}

func (gs *GlobalSyncExec) Drain() error {
for {
if gs.ctx.Err() != nil {
return gs.ctx.Err()
}
ev := gs.pop()
gs.eventsLock.Lock()
ev := gs.events.Pop()
gs.eventsLock.Unlock()
if ev.Event == nil {
return nil
}
Expand All @@ -107,17 +200,16 @@ func (gs *GlobalSyncExec) DrainUntil(fn func(ev Event) bool, excl bool) error {
gs.eventsLock.Lock()
defer gs.eventsLock.Unlock()

if len(gs.events) == 0 {
ev = gs.events.Peek()
if ev.Event == nil {
return AnnotatedEvent{}, false, false
}

ev = gs.events[0]
stop := fn(ev.Event)
if excl && stop {
ev = AnnotatedEvent{}
stopExcl = true
} else {
gs.events = gs.events[1:]
gs.events.Pop()
}
if stop {
stopIncl = true
Expand Down Expand Up @@ -145,8 +237,9 @@ func (gs *GlobalSyncExec) DrainUntil(fn func(ev Event) bool, excl bool) error {
}

type globalHandle struct {
g atomic.Pointer[GlobalSyncExec]
d Executable
g atomic.Pointer[GlobalSyncExec]
d Executable
priority Priority
}

func (gh *globalHandle) onEvent(ev AnnotatedEvent) {
Expand Down
Loading