Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions op-node/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ type Metrics struct {
SequencingErrors *EventMetrics
PublishingErrors *EventMetrics

UnsafePayloadsBufferLen prometheus.Gauge
UnsafePayloadsBufferMemSize prometheus.Gauge

RefsNumber *prometheus.GaugeVec
RefsTime *prometheus.GaugeVec
RefsHash *prometheus.GaugeVec
Expand Down Expand Up @@ -150,6 +153,17 @@ func NewMetrics(procName string) *Metrics {
SequencingErrors: NewEventMetrics(registry, ns, "sequencing_errors", "sequencing errors"),
PublishingErrors: NewEventMetrics(registry, ns, "publishing_errors", "p2p publishing errors"),

UnsafePayloadsBufferLen: promauto.With(registry).NewGauge(prometheus.GaugeOpts{

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice 🎉

Namespace: ns,
Name: "unsafe_payloads_buffer_len",
Help: "Number of buffered L2 unsafe payloads",
}),
UnsafePayloadsBufferMemSize: promauto.With(registry).NewGauge(prometheus.GaugeOpts{
Namespace: ns,
Name: "unsafe_payloads_buffer_mem_size",
Help: "Total estimated memory size of buffered L2 unsafe payloads",
}),

RefsNumber: promauto.With(registry).NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns,
Name: "refs_number",
Expand Down Expand Up @@ -321,6 +335,12 @@ func (m *Metrics) RecordL2Ref(name string, ref eth.L2BlockRef) {
m.RefsSeqNr.WithLabelValues(name).Set(float64(ref.SequenceNumber))
}

// RecordUnsafePayloadsBuffer publishes the current state of the unsafe-payloads buffer.
//
// It records a ref under the "l2" layer with the name "l2_buffer_unsafe", using the
// number and hash of next (the fourth recordRef argument is passed as 0), and then
// updates the buffer gauges:
//   - length is written to UnsafePayloadsBufferLen,
//   - memSize is written to UnsafePayloadsBufferMemSize.
func (m *Metrics) RecordUnsafePayloadsBuffer(length uint64, memSize uint64, next eth.BlockID) {
	m.recordRef("l2", "l2_buffer_unsafe", next.Number, 0, next.Hash)

	// Gauges take float64 values, convert the counters once up front.
	bufferedCount, bufferedBytes := float64(length), float64(memSize)
	m.UnsafePayloadsBufferLen.Set(bufferedCount)
	m.UnsafePayloadsBufferMemSize.Set(bufferedBytes)
}

// CountSequencedTxs adds count to the TransactionsSequencedTotal metric.
func (m *Metrics) CountSequencedTxs(count int) {
	delta := float64(count)
	m.TransactionsSequencedTotal.Add(delta)
}
Expand Down
48 changes: 30 additions & 18 deletions op-node/rollup/derive/engine_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ type Engine interface {
L2BlockRefByHash(ctx context.Context, l2Hash common.Hash) (eth.L2BlockRef, error)
}

// Max number of unsafe payloads that may be queued up for execution
const maxUnsafePayloads = 50
// Max memory used for buffering unsafe payloads
const maxUnsafePayloadsMemory = 500 * 1024 * 1024

// finalityLookback defines the amount of L1<>L2 relations to track for finalization purposes, one per L1 block.
//
Expand Down Expand Up @@ -67,7 +67,7 @@ type EngineQueue struct {
progress Progress

safeAttributes []*eth.PayloadAttributes
unsafePayloads []*eth.ExecutionPayload
unsafePayloads PayloadsQueue // queue of unsafe payloads, ordered by ascending block number, may have gaps

// Tracks which L2 blocks were last derived from which L1 block. At most finalityLookback large.
finalityData []FinalityData
Expand All @@ -87,6 +87,10 @@ func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics M
engine: engine,
metrics: metrics,
finalityData: make([]FinalityData, 0, finalityLookback),
unsafePayloads: PayloadsQueue{
MaxSize: maxUnsafePayloadsMemory,
SizeFn: payloadMemSize,
},
}
}

Expand All @@ -100,12 +104,17 @@ func (eq *EngineQueue) SetUnsafeHead(head eth.L2BlockRef) {
}

func (eq *EngineQueue) AddUnsafePayload(payload *eth.ExecutionPayload) {
if len(eq.unsafePayloads) > maxUnsafePayloads {
eq.log.Debug("Refusing to add unsafe payload", "hash", payload.BlockHash, "number", uint64(payload.BlockNumber))
return // don't DoS ourselves by buffering too many unsafe payloads
if payload == nil {
eq.log.Warn("cannot add nil unsafe payload")
return
}
eq.log.Trace("Adding unsafe payload", "hash", payload.BlockHash, "number", uint64(payload.BlockNumber), "timestamp", uint64(payload.Timestamp))
eq.unsafePayloads = append(eq.unsafePayloads, payload)
if err := eq.unsafePayloads.Push(payload); err != nil {
eq.log.Warn("Could not add unsafe payload", "id", payload.ID(), "timestamp", uint64(payload.Timestamp), "err", err)
return
}
p := eq.unsafePayloads.Peek()
eq.metrics.RecordUnsafePayloadsBuffer(uint64(eq.unsafePayloads.Len()), eq.unsafePayloads.MemSize(), p.ID())
eq.log.Trace("Next unsafe payload to process", "next", p.ID(), "timestamp", uint64(p.Timestamp))
}

func (eq *EngineQueue) AddSafeAttributes(attributes *eth.PayloadAttributes) {
Expand Down Expand Up @@ -144,7 +153,7 @@ func (eq *EngineQueue) Step(ctx context.Context, outer Progress) error {
if len(eq.safeAttributes) > 0 {
return eq.tryNextSafeAttributes(ctx)
}
if len(eq.unsafePayloads) > 0 {
if eq.unsafePayloads.Len() > 0 {
return eq.tryNextUnsafePayload(ctx)
}
return io.EOF
Expand Down Expand Up @@ -200,25 +209,27 @@ func (eq *EngineQueue) logSyncProgress(reason string) {
}

func (eq *EngineQueue) tryNextUnsafePayload(ctx context.Context) error {
first := eq.unsafePayloads[0]
first := eq.unsafePayloads.Peek()

if uint64(first.BlockNumber) <= eq.safeHead.Number {
eq.log.Info("skipping unsafe payload, since it is older than safe head", "safe", eq.safeHead.ID(), "unsafe", first.ID(), "payload", first.ID())
eq.unsafePayloads = eq.unsafePayloads[1:]
eq.unsafePayloads.Pop()
return nil
}

// TODO: once we support snap-sync we can remove this condition, and handle the "SYNCING" status of the execution engine.
if first.ParentHash != eq.unsafeHead.Hash {
eq.log.Info("skipping unsafe payload, since it does not build onto the existing unsafe chain", "safe", eq.safeHead.ID(), "unsafe", first.ID(), "payload", first.ID())
eq.unsafePayloads = eq.unsafePayloads[1:]
return nil
if uint64(first.BlockNumber) == eq.unsafeHead.Number+1 {
eq.log.Info("skipping unsafe payload, since it does not build onto the existing unsafe chain", "safe", eq.safeHead.ID(), "unsafe", first.ID(), "payload", first.ID())
eq.unsafePayloads.Pop()
}
return io.EOF // time to go to next stage if we cannot process the first unsafe payload
}

ref, err := PayloadToBlockRef(first, &eq.cfg.Genesis)
if err != nil {
eq.log.Error("failed to decode L2 block ref from payload", "err", err)
eq.unsafePayloads = eq.unsafePayloads[1:]
eq.unsafePayloads.Pop()
return nil
}

Expand Down Expand Up @@ -246,7 +257,7 @@ func (eq *EngineQueue) tryNextUnsafePayload(ctx context.Context) error {
}
}
if fcRes.PayloadStatus.Status != eth.ExecutionValid {
eq.unsafePayloads = eq.unsafePayloads[1:]
eq.unsafePayloads.Pop()
return NewTemporaryError(fmt.Errorf("cannot prepare unsafe chain for new payload: new - %v; parent: %v; err: %v",
first.ID(), first.ParentID(), eth.ForkchoiceUpdateErr(fcRes.PayloadStatus)))
}
Expand All @@ -255,12 +266,12 @@ func (eq *EngineQueue) tryNextUnsafePayload(ctx context.Context) error {
return NewTemporaryError(fmt.Errorf("failed to update insert payload: %v", err))
}
if status.Status != eth.ExecutionValid {
eq.unsafePayloads = eq.unsafePayloads[1:]
eq.unsafePayloads.Pop()
return NewTemporaryError(fmt.Errorf("cannot process unsafe payload: new - %v; parent: %v; err: %v",
first.ID(), first.ParentID(), eth.ForkchoiceUpdateErr(fcRes.PayloadStatus)))
}
eq.unsafeHead = ref
eq.unsafePayloads = eq.unsafePayloads[1:]
eq.unsafePayloads.Pop()
eq.metrics.RecordL2Ref("l2_unsafe", ref)
eq.log.Trace("Executed unsafe payload", "hash", ref.Hash, "number", ref.Number, "timestamp", ref.Time, "l1Origin", ref.L1Origin)
eq.logSyncProgress("unsafe payload from sequencer")
Expand Down Expand Up @@ -399,6 +410,7 @@ func (eq *EngineQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error
eq.safeHead = safe
eq.finalized = finalized
eq.finalityData = eq.finalityData[:0]
// note: we do not clear the unsafe payloads queue; if the payloads are not applicable anymore the parent hash checks will clear out the old payloads.
eq.progress = Progress{
Origin: l1Origin,
Closed: false,
Expand Down
136 changes: 136 additions & 0 deletions op-node/rollup/derive/payloads_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package derive

import (
"container/heap"
"errors"
"fmt"

"github.com/ethereum-optimism/optimism/op-node/eth"
)

// payloadAndSize is a single queue entry: a payload together with the
// size that was computed for it when it was added to the queue.
type payloadAndSize struct {
	// payload is the buffered execution payload.
	payload *eth.ExecutionPayload
	// size is the memory size accounted for this payload.
	size uint64
}

// payloadsByNumber is the heap storage for buffered payloads, ordered by block number.
// Less compares block numbers, so when the slice is maintained through the heap package
// the entry with the lowest block number is the first to be peeked or popped.
//
// payloadsByNumber implements heap.Interface: modify it only through the container/heap
// functions (heap.Push, heap.Pop), not by calling its Push/Pop/Swap methods directly.
type payloadsByNumber []payloadAndSize

// Compile-time check that *payloadsByNumber satisfies heap.Interface.
var _ heap.Interface = (*payloadsByNumber)(nil)

// Len is a heap.Interface method. It reports the number of entries in the heap.
func (pq payloadsByNumber) Len() int {
	return len(pq)
}

// Less is a heap.Interface method. It reports whether the payload at index i
// has a strictly lower block number than the payload at index j.
func (pq payloadsByNumber) Less(i, j int) bool {
	left, right := pq[i].payload, pq[j].payload
	return left.BlockNumber < right.BlockNumber
}

// Swap is a heap.Interface method that exchanges the entries at indices i and j.
// It is meant to be called by the heap package only; do not use it directly.
func (pq payloadsByNumber) Swap(i, j int) {
	held := pq[i]
	pq[i] = pq[j]
	pq[j] = held
}

// Push is a heap.Interface method that appends x, which must be a payloadAndSize,
// to the end of the slice. It does not restore heap ordering by itself:
// do not use this method directly, use heap.Push instead.
func (pq *payloadsByNumber) Push(x any) {
	entry := x.(payloadAndSize)
	*pq = append(*pq, entry)
}

// Pop is a heap.Interface method that removes and returns the last entry of the slice
// as a payloadAndSize. It does not restore heap ordering by itself:
// do not use this method directly, use heap.Pop instead.
func (pq *payloadsByNumber) Pop() any {
	entries := *pq
	last := len(entries) - 1
	popped := entries[last]
	// Zero the vacated slot so the backing array does not keep the payload alive.
	entries[last] = payloadAndSize{}
	*pq = entries[:last]
	return popped
}

// Cost parameters used when estimating the memory size of a buffered payload.
const (
	// payloadMemFixedCost is the fixed cost charged per payload:
	// ~580 bytes per payload, with some margin for overhead.
	payloadMemFixedCost = uint64(600)
	// payloadTxMemOverhead is the overhead charged per transaction:
	// 24 bytes (size of slice header in memory).
	payloadTxMemOverhead = uint64(24)
)

// payloadMemSize estimates the memory used by payload p.
//
// The estimate is payloadMemFixedCost, plus for every transaction in the payload
// its byte length and payloadTxMemOverhead. A nil payload is charged the fixed cost only.
func payloadMemSize(p *eth.ExecutionPayload) uint64 {
	total := payloadMemFixedCost
	if p != nil {
		for i := range p.Transactions {
			// per-tx overhead plus the raw transaction bytes
			total += payloadTxMemOverhead + uint64(len(p.Transactions[i]))
		}
	}
	return total
}

// PayloadsQueue is a size-bounded buffer of eth.ExecutionPayload entries, prioritized by block number.
//
// It wraps a heap and exposes typed Push/Peek/Pop methods, so callers do not need
// to use heap.Push/heap.Pop themselves.
//
// The queue tracks the size of every added payload (as computed by SizeFn) and keeps the total
// within MaxSize: when the total grows too large, payloads are removed starting with the
// first (lowest block-number) one.
//
// Entries with the same block number, and even full duplicates, are allowed.
// PayloadsQueue is not safe to use concurrently.
type PayloadsQueue struct {
	// pq holds the buffered payloads, together with their sizes, as a heap.
	pq payloadsByNumber
	// currentSize is the sum of the sizes of all payloads currently in pq.
	currentSize uint64
	// MaxSize is the upper bound for currentSize.
	MaxSize uint64
	// SizeFn computes the size to account for a payload when it is pushed.
	SizeFn func(p *eth.ExecutionPayload) uint64
}

// Len returns the number of payloads currently buffered in the queue.
func (upq *PayloadsQueue) Len() int {
	return upq.pq.Len()
}

// MemSize returns the total tracked size of all payloads currently buffered in the queue.
func (upq *PayloadsQueue) MemSize() uint64 {
	tracked := upq.currentSize
	return tracked
}

// Push adds the payload to the queue, in O(log(N)).
//
// It returns an error, and leaves the queue untouched, if the payload is nil
// or if its size alone is larger than MaxSize.
//
// To not DoS ourselves by buffering too many unsafe payloads, the total size is bounded:
// if the queue size after pushing exceeds MaxSize, payloads are popped
// (lowest block number first) until the size fits again.
//
// Higher block numbers are preferred over lower block numbers, since lower block numbers are
// more likely to be conflicts and/or read from L1 sooner. The higher payload block numbers can be
// preserved, and once L1 contents meets these, they can all be processed in order.
func (upq *PayloadsQueue) Push(p *eth.ExecutionPayload) error {
	if p == nil {
		return errors.New("cannot add nil payload")
	}
	entry := payloadAndSize{payload: p, size: upq.SizeFn(p)}
	if entry.size > upq.MaxSize {
		return fmt.Errorf("cannot add payload %s, payload mem size %d is larger than max queue size %d", p.ID(), entry.size, upq.MaxSize)
	}
	heap.Push(&upq.pq, entry)
	upq.currentSize += entry.size
	// Evict from the low end of the queue until the size limit is respected again.
	for upq.currentSize > upq.MaxSize {
		upq.Pop()
	}
	return nil
}

// Peek returns the payload with the lowest block number without removing it from the queue, in O(1).
// It returns nil if the queue is empty.
func (upq *PayloadsQueue) Peek() *eth.ExecutionPayload {
	if upq.Len() > 0 {
		// The root of the heap (index 0) is the highest-priority entry: the lowest block number.
		// The remaining elements are only heap-ordered, not sorted.
		return upq.pq[0].payload
	}
	return nil
}

// Pop removes the payload with the lowest block number from the queue in O(log(N)),
// and may return nil if the queue is empty.
func (upq *PayloadsQueue) Pop() *eth.ExecutionPayload {
if len(upq.pq) == 0 {
return nil
}
ps := heap.Pop(&upq.pq).(payloadAndSize)
Comment thread
mslipper marked this conversation as resolved.
Outdated
upq.currentSize -= ps.size
return ps.payload
}
Loading