2 changes: 2 additions & 0 deletions op-node/rollup/driver/config.go
@@ -20,4 +20,6 @@ type Config struct {
	// SequencerMaxSafeLag is the maximum number of L2 blocks for restricting the distance between L2 safe and unsafe.
	// Disabled if 0.
	SequencerMaxSafeLag uint64 `json:"sequencer_max_safe_lag"`

	ParallelEventProcessing bool `json:"parallel_event_processing"`
}
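A hedged sketch of opting in through the driver config; only the ParallelEventProcessing field comes from this diff, and the surrounding construction is illustrative (other Config fields are left at their zero values):

package main

import "github.com/ethereum-optimism/optimism/op-node/rollup/driver"

func main() {
	cfg := &driver.Config{
		ParallelEventProcessing: true, // NewDriver then selects event.NewParallelExec (see driver.go below)
	}
	_ = cfg
}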
6 changes: 4 additions & 2 deletions op-node/rollup/driver/driver.go
@@ -172,8 +172,10 @@ func NewDriver(

	var executor event.Executor
	var drain func() error
	// This instantiation will be one of multiple options: soon there will be a parallel events executor
	{
	if driverCfg.ParallelEventProcessing {
		executor = event.NewParallelExec()
		drain = func() error { return nil } // no-op
	} else {
		s := event.NewGlobalSynchronous(driverCtx)
		executor = s
		drain = s.Drain
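Note the asymmetry above: the synchronous executor queues events and must be flushed explicitly via Drain, while ParallelExec workers consume their own ingress channels on dedicated goroutines, so there is nothing to flush and drain becomes a no-op. A hedged sketch of the resulting calling pattern (drainEvents is a hypothetical caller; drain is the closure selected above):

// drainEvents flushes synchronously-queued events; under
// ParallelEventProcessing the closure returns nil immediately.
func drainEvents(drain func() error) error {
	return drain()
}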
112 changes: 112 additions & 0 deletions op-node/rollup/event/executor_parallel.go
@@ -0,0 +1,112 @@
package event

import (
"context"
"slices"
"sync"
"sync/atomic"
)

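// ParallelExec is an Executor that gives each registered Executable its own
// worker goroutine and buffered ingress channel, fanning every enqueued
// event out to all workers.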
type ParallelExec struct {
	workers []*worker
	mu      sync.RWMutex
}

var _ Executor = (*ParallelExec)(nil)

func NewParallelExec() *ParallelExec {
	return &ParallelExec{}
}

func (p *ParallelExec) Add(d Executable, opts *ExecutorOpts) (leaveExecutor func()) {
	p.mu.Lock()
	defer p.mu.Unlock()
	w := newWorker(p, d, opts)
	p.workers = append(p.workers, w)
	return w.leave
}

func (p *ParallelExec) remove(w *worker) {
	p.mu.Lock()
	defer p.mu.Unlock()
	// Linear search to delete is fine,
	// since we delete much less frequently than we process events with these.
	for i, v := range p.workers {
		if v == w {
			p.workers = slices.Delete(p.workers, i, i+1)
			return
		}
	}
}

func (p *ParallelExec) Enqueue(ev AnnotatedEvent) error {
	p.mu.RLock()
	defer p.mu.RUnlock()
	for _, w := range p.workers {
		w.enqueue(ev) // blocks if the worker's ingress buffer is full (back-pressure for the Enqueue caller); delivery is skipped if the worker has already left
	}
	return nil
}

type worker struct {
	// ctx signals when the worker is exiting.
	// No additional events will be accepted after cancellation.
	ctx    context.Context
	cancel context.CancelFunc

	// closed is closed when the run loop exits
	closed chan struct{}

	// ingress is the buffered channel of events to process
	ingress chan AnnotatedEvent

	// d is the underlying executable to process events on
	d Executable

	// p is a reference to the ParallelExec that owns this worker.
	// The worker removes itself from this upon leaving.
	p atomic.Pointer[ParallelExec]
}

func newWorker(p *ParallelExec, d Executable, opts *ExecutorOpts) *worker {
	ctx, cancel := context.WithCancel(context.Background())
	w := &worker{
		ctx:     ctx,
		cancel:  cancel,
		closed:  make(chan struct{}),
		ingress: make(chan AnnotatedEvent, opts.Capacity),
		d:       d,
	}
	w.p.Store(p)
	go w.run()
	return w
}

func (w *worker) enqueue(ev AnnotatedEvent) {
	select {
	case <-w.ctx.Done():
	case w.ingress <- ev:
	}
}

func (w *worker) leave() {
	w.cancel()
	if old := w.p.Swap(nil); old != nil {
		// remove from worker pool
		old.remove(w)
	}
	// wait for run loop to exit
	<-w.closed
}

func (w *worker) run() {
	for {
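		// Note: select does not prioritize cancellation, so if ctx is canceled
		// while events sit in ingress, some may still be processed before the
		// loop exits; whatever remains buffered is dropped.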
		select {
		case <-w.ctx.Done():
			close(w.closed)
			return
		case ev := <-w.ingress:
			w.d.RunEvent(ev)
		}
	}
}
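To illustrate how the pieces fit together, a hedged usage sketch follows. testEvent and testDeriver are hypothetical stand-ins; it assumes Event is satisfied by a String() string method, that AnnotatedEvent carries the event in an Event field, and that ExecutorOpts exposes the Capacity consumed by newWorker above:

package main

import (
	"fmt"

	"github.com/ethereum-optimism/optimism/op-node/rollup/event"
)

// testEvent is a hypothetical minimal Event implementation.
type testEvent struct{}

func (testEvent) String() string { return "test-event" }

// testDeriver is a hypothetical Executable that logs each event and
// signals when it has run.
type testDeriver struct{ done chan struct{} }

func (d testDeriver) RunEvent(ev event.AnnotatedEvent) {
	fmt.Println("processing", ev.Event)
	close(d.done)
}

func main() {
	p := event.NewParallelExec()
	d := testDeriver{done: make(chan struct{})}
	// Each Add spawns a dedicated worker goroutine with its own buffered ingress channel.
	leave := p.Add(d, &event.ExecutorOpts{Capacity: 16})
	// Enqueue fans the event out to every registered worker.
	_ = p.Enqueue(event.AnnotatedEvent{Event: testEvent{}})
	<-d.done // events are processed asynchronously on the worker goroutine
	leave()  // cancels the worker and blocks until its run loop exits
}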