diff --git a/op-supervisor/supervisor/backend/db/entrydb/db.go b/op-supervisor/supervisor/backend/db/entrydb/db.go new file mode 100644 index 0000000000000..1247597aaa0e3 --- /dev/null +++ b/op-supervisor/supervisor/backend/db/entrydb/db.go @@ -0,0 +1,305 @@ +package entrydb + +import ( + "errors" + "fmt" + "io" + "sync" + + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/log" +) + +type EntryStore[T EntryType] interface { + Size() int64 + LastEntryIdx() EntryIdx + Read(idx EntryIdx) (Entry[T], error) + Append(entries ...Entry[T]) error + Truncate(idx EntryIdx) error + Close() error +} + +type Metrics interface { + RecordDBEntryCount(count int64) + RecordDBSearchEntriesRead(count int64) +} + +type IndexKey interface { + comparable + String() string +} + +type IndexState[T EntryType, K IndexKey] interface { + NextIndex() EntryIdx + Key() (k K, ok bool) + Incomplete() bool + ApplyEntry(entry Entry[T]) error + + Out() []Entry[T] + ClearOut() +} + +type IndexDriver[T EntryType, K IndexKey, S IndexState[T, K]] interface { + // Less compares the primary key. To allow binary search over the index. + Less(a, b K) bool + // Copy copies an index state. To allow state-snapshots without copy, for conditional iteration. + Copy(src, dst S) + // NewState creates an empty state, with the given index as next target input. + NewState(nextIndex EntryIdx) S + // KeyFromCheckpoint is called to turn an entry at a SearchCheckpointFrequency interval into a primary key. + KeyFromCheckpoint(e Entry[T]) (K, error) + // ValidEnd inspects if we can truncate the DB and leave the given entry as last entry. + ValidEnd(e Entry[T]) bool + // SearchCheckpointFrequency returns a constant, the interval of how far apart the guaranteed checkpoint entries are. 
+ SearchCheckpointFrequency() uint64 +} + +type DB[T EntryType, K IndexKey, S IndexState[T, K], D IndexDriver[T, K, S]] struct { + log log.Logger + m Metrics + store EntryStore[T] + rwLock sync.RWMutex + + HeadState S + + driver D +} + +func (db *DB[T, K, S, D]) LastEntryIdx() EntryIdx { + return db.store.LastEntryIdx() +} + +func (db *DB[T, K, S, D]) Init(trimToLastSealed bool) error { + defer db.updateEntryCountMetric() // Always update the entry count metric after init completes + if trimToLastSealed { + if err := db.trimToLastSealed(); err != nil { + return fmt.Errorf("failed to trim invalid trailing entries: %w", err) + } + } + if db.LastEntryIdx() < 0 { + // Database is empty. + // Make a state that is ready to apply the genesis block on top of as first entry. + // This will infer into a checkpoint (half of the block seal here) + // and is then followed up with canonical-hash entry of genesis. + db.HeadState = db.driver.NewState(0) + return nil + } + // start at the last checkpoint, + // and then apply any remaining changes on top, to hydrate the state. 
+ searchCheckpointFrequency := EntryIdx(db.driver.SearchCheckpointFrequency()) + lastCheckpoint := (db.LastEntryIdx() / searchCheckpointFrequency) * searchCheckpointFrequency + i := db.newIterator(lastCheckpoint) + if err := i.End(); err != nil { + return fmt.Errorf("failed to init from remaining trailing data: %w", err) + } + db.HeadState = i.current + return nil +} + +func (db *DB[T, K, S, D]) trimToLastSealed() error { + i := db.LastEntryIdx() + for ; i >= 0; i-- { + entry, err := db.store.Read(i) + if err != nil { + return fmt.Errorf("failed to read %v to check for trailing entries: %w", i, err) + } + if db.driver.ValidEnd(entry) { + break + } + } + if i < db.LastEntryIdx() { + db.log.Warn("Truncating unexpected trailing entries", "prev", db.LastEntryIdx(), "new", i) + // trim such that the last entry is the canonical-hash we identified + return db.store.Truncate(i) + } + return nil +} + +func (db *DB[T, K, S, D]) updateEntryCountMetric() { + db.m.RecordDBEntryCount(db.store.Size()) +} + +// NewIteratorFor returns an iterator that will have traversed everything that was returned as true by the given lessFn. +// It may return an ErrSkipped if some data is known, but no data is known to be less than the requested key. +// It may return ErrFuture if no data is known at all. 
+func (db *DB[T, K, S, D]) NewIteratorFor(lessFn func(key K) bool) (Iterator[T, K, S], error) {
+	return db.newIteratorFor(lessFn)
+}
+
+// newIteratorExactlyAt returns an iterator positioned exactly at the given key,
+// or ErrFuture if the key is not (yet) in the DB.
+func (db *DB[T, K, S, D]) newIteratorExactlyAt(at K) (*iterator[T, K, S, D], error) {
+	iter, err := db.newIteratorFor(func(key K) bool {
+		return db.driver.Less(key, at) || key == at
+	})
+	if err != nil {
+		return nil, err
+	}
+	k, ok := iter.State().Key()
+	if !ok { // we should have stopped at complete data
+		return nil, ErrDataCorruption
+	}
+	if k != at { // we found data less than the key, but not exactly equal to it
+		return nil, ErrFuture
+	}
+	return iter, nil
+}
+
+func (db *DB[T, K, S, D]) newIteratorFor(lessFn func(key K) bool) (*iterator[T, K, S, D], error) {
+	// Find a checkpoint before (not at) the requested key,
+	// so we can read the value data corresponding to the key into the iterator state.
+	searchCheckpointIndex, err := db.searchCheckpoint(lessFn)
+	if errors.Is(err, io.EOF) {
+		// Did not find a checkpoint to start reading from so the log cannot be present.
+		return nil, ErrFuture
+	} else if err != nil {
+		return nil, err
+	}
+	// The iterator did not consume the checkpoint yet, it's positioned right at it.
+	// So we can call NextBlock() and get the checkpoint itself as first entry.
+	// Note: newIterator cannot fail; the previous stale-err re-check here was dead code.
+	iter := db.newIterator(searchCheckpointIndex)
+	defer func() {
+		db.m.RecordDBSearchEntriesRead(iter.entriesRead)
+	}()
+	err = iter.TraverseConditional(func(state S) error {
+		at, ok := state.Key()
+		if !ok {
+			return errors.New("expected complete state")
+		}
+		if !lessFn(at) {
+			return ErrStop
+		}
+		return nil
+	})
+	if err == nil {
+		panic("expected any error, good or bad, on stop")
+	}
+	if errors.Is(err, ErrStop) {
+		err = nil
+	}
+	if err != nil {
+		return nil, err
+	}
+	return iter, nil
+}
+
+// newIterator creates an iterator at the given index.
+// None of the iterator attributes will be ready for reads, +// but the entry at the given index will be first read when using the iterator. +func (db *DB[T, K, S, D]) newIterator(index EntryIdx) *iterator[T, K, S, D] { + return &iterator[T, K, S, D]{ + db: db, + current: db.driver.NewState(index), + } +} + +// searchCheckpoint performs a binary search of the searchCheckpoint entries +// to find the closest one with an equal or lower derivedFrom block number and equal or lower derived block number. +// Returns the index of the searchCheckpoint to begin reading from or an error. +func (db *DB[T, K, S, D]) searchCheckpoint(lessFn func(key K) bool) (EntryIdx, error) { + if db.HeadState.NextIndex() == 0 { + return 0, ErrFuture // empty DB, everything is in the future + } + searchCheckpointFrequency := EntryIdx(db.driver.SearchCheckpointFrequency()) + n := (db.LastEntryIdx() / searchCheckpointFrequency) + 1 + // Define: x is the array of known checkpoints + // Invariant: x[i] <= target, x[j] > target. + i, j := EntryIdx(0), n + for i+1 < j { // i is inclusive, j is exclusive. + // Get the checkpoint exactly in-between, + // bias towards a higher value if an even number of checkpoints. + // E.g. i=3 and j=4 would not run, since i + 1 < j + // E.g. i=3 and j=5 leaves checkpoints 3, 4, and we pick 4 as pivot + // E.g. 
i=3 and j=6 leaves checkpoints 3, 4, 5, and we pick 4 as pivot + // + // The following holds: i ≤ h < j + h := EntryIdx((uint64(i) + uint64(j)) >> 1) + checkpoint, err := db.readSearchCheckpoint(h * searchCheckpointFrequency) + if err != nil { + return 0, fmt.Errorf("failed to read entry %v: %w", h, err) + } + if lessFn(checkpoint) { + i = h + } else { + j = h + } + } + if i+1 != j { + panic("expected to have 1 checkpoint left") + } + result := i * searchCheckpointFrequency + checkpoint, err := db.readSearchCheckpoint(result) + if err != nil { + return 0, fmt.Errorf("failed to read final search checkpoint result: %w", err) + } + if !lessFn(checkpoint) { + return 0, fmt.Errorf("missing data, earliest search checkpoint is %s, but is not before target: %w", checkpoint, ErrSkipped) + } + return result, nil +} + +// Rewind the database to remove any blocks after headBlockNum +// The block at headBlockNum itself is not removed. +func (db *DB[T, K, S, D]) Rewind(newHead K) error { + db.rwLock.Lock() + defer db.rwLock.Unlock() + // Even if the last fully-processed block matches headBlockNum, + // we might still have trailing log events to get rid of. 
+ iter, err := db.newIteratorExactlyAt(newHead) + if err != nil { + return err + } + // Truncate to contain idx+1 entries, since indices are 0 based, + // this deletes everything after idx + if err := db.store.Truncate(iter.NextIndex()); err != nil { + return fmt.Errorf("failed to truncate to %s: %w", newHead, err) + } + // Use db.init() to find the state for the new latest entry + if err := db.Init(true); err != nil { + return fmt.Errorf("failed to find new last entry context: %w", err) + } + return nil +} + +// debug util to log the last 10 entries of the chain +func (db *DB[T, K, S, D]) debugTip() { + for x := 0; x < 10; x++ { + index := db.LastEntryIdx() - EntryIdx(x) + if index < 0 { + continue + } + e, err := db.store.Read(index) + if err == nil { + db.log.Debug("tip", "index", index, "type", e.Type()) + } + } +} + +func (db *DB[T, K, S, D]) Flush() error { + out := db.HeadState.Out() + nextIndex := db.HeadState.NextIndex() + for i, e := range out { + db.log.Trace("appending entry", "type", e.Type(), "entry", hexutil.Bytes(e[:]), + "next", int(nextIndex)-len(out)+i) + } + if err := db.store.Append(out...); err != nil { + return fmt.Errorf("failed to append entries: %w", err) + } + db.HeadState.ClearOut() + db.updateEntryCountMetric() + return nil +} + +func (db *DB[T, K, S, D]) readSearchCheckpoint(entryIdx EntryIdx) (K, error) { + data, err := db.store.Read(entryIdx) + if err != nil { + var k K + return k, fmt.Errorf("failed to read entry %v: %w", entryIdx, err) + } + return db.driver.KeyFromCheckpoint(data) +} + +func (db *DB[T, K, S, D]) Close() error { + return db.store.Close() +} diff --git a/op-supervisor/supervisor/backend/db/entrydb/iterator.go b/op-supervisor/supervisor/backend/db/entrydb/iterator.go new file mode 100644 index 0000000000000..fef8deb48adb9 --- /dev/null +++ b/op-supervisor/supervisor/backend/db/entrydb/iterator.go @@ -0,0 +1,76 @@ +package entrydb + +import ( + "errors" + "fmt" + "io" +) + +type Iterator[T EntryType, K IndexKey, S 
IndexState[T, K]] interface { + TraverseConditional(fn func(state S) error) error + State() S +} + +type iterator[T EntryType, K IndexKey, S IndexState[T, K], D IndexDriver[T, K, S]] struct { + db *DB[T, K, S, D] + current S + entriesRead int64 +} + +func (i *iterator[T, K, S, D]) State() S { + return i.current +} + +// End traverses the iterator to the end of the DB. +// It does not return io.EOF or ErrFuture. +func (i *iterator[T, K, S, D]) End() error { + for { + err := i.next() + if errors.Is(err, ErrFuture) { + return nil + } else if err != nil { + return err + } + } +} + +func (i *iterator[T, K, S, D]) TraverseConditional(fn func(state S) error) error { + snapshot := i.db.driver.NewState(0) + for { + i.db.driver.Copy(i.current, snapshot) // copy the iterator state, without allocating a new snapshot each iteration + err := i.next() + if err != nil { + i.current = snapshot + return err + } + if i.current.Incomplete() { // skip intermediate states + continue + } + if err := fn(i.current); err != nil { + i.current = snapshot + return err + } + } +} + +// Read and apply the next entry. 
+func (i *iterator[T, K, S, D]) next() error { + index := i.current.NextIndex() + entry, err := i.db.store.Read(index) + if err != nil { + if errors.Is(err, io.EOF) { + return ErrFuture + } + return fmt.Errorf("failed to read entry %d: %w", index, err) + } + if err := i.current.ApplyEntry(entry); err != nil { + return fmt.Errorf("failed to process entry %d to iterator state: %w", index, err) + } + + i.entriesRead++ + return nil +} + +func (i *iterator[T, K, S, D]) NextIndex() EntryIdx { + return i.current.NextIndex() +} diff --git a/op-supervisor/supervisor/backend/db/fromda/db.go b/op-supervisor/supervisor/backend/db/fromda/db.go new file mode 100644 index 0000000000000..772021eb69ba8 --- /dev/null +++ b/op-supervisor/supervisor/backend/db/fromda/db.go @@ -0,0 +1,127 @@ +package fromda + +import ( + "errors" + "fmt" + "sync" + + "github.com/ethereum/go-ethereum/log" + + "github.com/ethereum-optimism/optimism/op-service/eth" + "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb" + "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" +) + +type DB struct { + log log.Logger + inner *entrydb.DB[EntryType, Key, *state, driver] + rwLock sync.RWMutex +} + +func (db *DB) AddDerived(derivedFrom eth.BlockRef, derived eth.BlockRef) error { + db.rwLock.Lock() + defer db.rwLock.Unlock() + + if err := db.inner.HeadState.AddDerived(derivedFrom, derived); err != nil { + return fmt.Errorf("failed to add derived block derivedFrom: %s, derived: %s, err: %w", derivedFrom, derived, err) + } + db.log.Trace("Added derived block", "derivedFrom", derivedFrom, "derived", derived) + return db.inner.Flush() +} + +func (db *DB) Rewind(derivedFrom uint64) error { + return db.inner.Rewind(Key{DerivedFrom: derivedFrom, Derived: 0}) +} + +// LatestDerivedFrom returns the last known primary key (the L1 block) +func (db *DB) LatestDerivedFrom() (ref types.BlockSeal, ok bool) { + db.rwLock.Lock() + defer db.rwLock.Unlock() + state := 
db.inner.HeadState
+	if state.Incomplete() {
+		return types.BlockSeal{}, false
+	}
+	// Return the derived-from (L1) side: this method's contract is the primary key.
+	// Fix: previously returned state.derived (the L2 block) by mistake.
+	return state.derivedFrom, true
+}
+
+// LatestDerived returns the last known value (the L2 block that was derived)
+func (db *DB) LatestDerived() (ref types.BlockSeal, ok bool) {
+	db.rwLock.Lock()
+	defer db.rwLock.Unlock()
+	state := db.inner.HeadState
+	if state.Incomplete() {
+		return types.BlockSeal{}, false
+	}
+	return state.derived, true
+}
+
+// LastDerivedAt returns the last L2 block derived from the given L1 block
+func (db *DB) LastDerivedAt(derivedFrom eth.BlockID) (types.BlockSeal, error) {
+	db.rwLock.Lock()
+	defer db.rwLock.Unlock()
+	// Traverse everything derived from L1 blocks up to AND including the target,
+	// so the iterator stops at the last entry of the target L1 block.
+	// Fix: a strict `<` made the iterator stop before the target block, so the
+	// derivedFrom identity check below could never match.
+	iter, err := db.inner.NewIteratorFor(func(key Key) bool {
+		return key.DerivedFrom <= derivedFrom.Number
+	})
+	if err != nil {
+		return types.BlockSeal{}, err
+	}
+	state := iter.State()
+	if state.Incomplete() {
+		return types.BlockSeal{}, entrydb.ErrDataCorruption
+	}
+	if state.derivedFrom.ID() != derivedFrom { // did not reach derivedFrom yet
+		return types.BlockSeal{}, entrydb.ErrFuture
+	}
+	return state.derived, nil
+}
+
+// TODO do we want to expose an iterator interface?
+//type Iterator interface {
+//	TraverseConditional(fn func(*state) error) error
+//}
+//
+//func (db *DB) IteratorStartingFor() (Iterator, error) {
+//	return db.inner.NewIteratorFor()
+//}
+
+// DerivedFrom determines where a L2 block was derived from.
+func (db *DB) DerivedFrom(derived eth.BlockID) (types.BlockSeal, error) { + // search to the last point before the data + iter, err := db.inner.NewIteratorFor(func(key Key) bool { + return key.Derived < derived.Number + }) + if err != nil { + return types.BlockSeal{}, err + } + // go forward and read the data + err = iter.TraverseConditional(func(state *state) error { + v, ok := state.Derived() + if !ok { + return nil + } + if v.Number > derived.Number { + return entrydb.ErrStop + } + return nil + }) + if errors.Is(err, entrydb.ErrStop) { + err = nil + } + if err != nil { + return types.BlockSeal{}, err + } + state := iter.State() + if state.Incomplete() { + return types.BlockSeal{}, entrydb.ErrDataCorruption + } + if state.derived.ID() != derived { + return types.BlockSeal{}, entrydb.ErrConflict + } + return state.derivedFrom, nil +} diff --git a/op-supervisor/supervisor/backend/db/fromda/driver.go b/op-supervisor/supervisor/backend/db/fromda/driver.go new file mode 100644 index 0000000000000..58b7c6beb76c3 --- /dev/null +++ b/op-supervisor/supervisor/backend/db/fromda/driver.go @@ -0,0 +1,53 @@ +package fromda + +import ( + "errors" + + "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb" + "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" +) + +type driver struct { +} + +func (d driver) Less(a, b Key) bool { + return a.DerivedFrom < b.DerivedFrom || (a.DerivedFrom == b.DerivedFrom && a.Derived < b.Derived) +} + +func (d driver) Copy(src, dst *state) { + *dst = *src // shallow copy is enough + dst.ClearOut() // don't retain output (there shouldn't be any) +} + +func (d driver) NewState(nextIndex entrydb.EntryIdx) *state { + return &state{ + nextEntryIndex: nextIndex, + derivedFrom: types.BlockSeal{}, + derivedUntil: 0, + derivedSince: 0, + derived: types.BlockSeal{}, + need: FlagSearchCheckpoint, + out: nil, + } +} + +func (d driver) KeyFromCheckpoint(e Entry) (Key, error) { + if e.Type() != 
TypeSearchCheckpoint { + return Key{}, errors.New("expected search checkpoint") + } + p, err := newSearchCheckpointFromEntry(e) + if err != nil { + return Key{}, err + } + return Key{DerivedFrom: p.blockNum, Derived: p.derivedUntil + uint64(p.derivedSince)}, nil +} + +func (d driver) ValidEnd(e Entry) bool { + return e.Type() == TypeCanonicalHash +} + +func (d driver) SearchCheckpointFrequency() uint64 { + return searchCheckpointFrequency +} + +var _ entrydb.IndexDriver[EntryType, Key, *state] = (*driver)(nil) diff --git a/op-supervisor/supervisor/backend/db/fromda/entries.go b/op-supervisor/supervisor/backend/db/fromda/entries.go new file mode 100644 index 0000000000000..70ba5db2394b2 --- /dev/null +++ b/op-supervisor/supervisor/backend/db/fromda/entries.go @@ -0,0 +1,124 @@ +package fromda + +import ( + "encoding/binary" + "fmt" + + "github.com/ethereum/go-ethereum/common" + + "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb" +) + +// searchCheckpoint is both a checkpoint for searching, as well as a checkpoint for sealing blocks. 
+type searchCheckpoint struct { + // number of the L1 block that we derived from + blockNum uint64 + // timestamp of the L1 block that was derived from + timestamp uint64 + // number of L2 blocks that were derived after this checkpoint + derivedSince uint32 + // L2 block that we last derived until starting deriving from this L1 block + derivedUntil uint64 +} + +func newSearchCheckpoint(blockNum uint64, timestamp uint64, blocksSince uint32, derivedUntil uint64) searchCheckpoint { + return searchCheckpoint{ + blockNum: blockNum, + timestamp: timestamp, + derivedSince: blocksSince, + derivedUntil: derivedUntil, + } +} + +func newSearchCheckpointFromEntry(data Entry) (searchCheckpoint, error) { + if data.Type() != TypeSearchCheckpoint { + return searchCheckpoint{}, fmt.Errorf("%w: attempting to decode search checkpoint but was type %s", entrydb.ErrDataCorruption, data.Type()) + } + return searchCheckpoint{ + blockNum: binary.LittleEndian.Uint64(data[1:9]), + timestamp: binary.LittleEndian.Uint64(data[9:17]), + derivedSince: binary.LittleEndian.Uint32(data[17:21]), + derivedUntil: binary.LittleEndian.Uint64(data[21:29]), + }, nil +} + +// encode creates a checkpoint entry +// type 0: "search checkpoint" = 29 bytes +func (s searchCheckpoint) encode() Entry { + var data Entry + data[0] = uint8(TypeSearchCheckpoint) + binary.LittleEndian.PutUint64(data[1:9], s.blockNum) + binary.LittleEndian.PutUint64(data[9:17], s.timestamp) + binary.LittleEndian.PutUint32(data[17:21], s.derivedSince) + binary.LittleEndian.PutUint64(data[21:29], s.derivedUntil) + return data +} + +type canonicalHash struct { + hash common.Hash +} + +func newCanonicalHash(hash common.Hash) canonicalHash { + return canonicalHash{hash: hash} +} + +func newCanonicalHashFromEntry(data Entry) (canonicalHash, error) { + if data.Type() != TypeCanonicalHash { + return canonicalHash{}, fmt.Errorf("%w: attempting to decode canonical hash but was type %s", entrydb.ErrDataCorruption, data.Type()) + } + return 
newCanonicalHash(common.Hash(data[1:33])), nil +} + +func (c canonicalHash) encode() Entry { + var entry Entry + entry[0] = uint8(TypeCanonicalHash) + copy(entry[1:33], c.hash[:]) + return entry +} + +type derivedLink struct { + number uint64 + timestamp uint64 + // May contain additional flag value in the future +} + +func newDerivedLink(num uint64, timestamp uint64) derivedLink { + return derivedLink{number: num, timestamp: timestamp} +} + +func newDerivedLinkFromEntry(data Entry) (derivedLink, error) { + if data.Type() != TypeDerivedLink { + return derivedLink{}, fmt.Errorf("%w: attempting to decode derived link but was type %s", entrydb.ErrDataCorruption, data.Type()) + } + return newDerivedLink(binary.LittleEndian.Uint64(data[1:9]), binary.LittleEndian.Uint64(data[9:17])), nil +} + +func (d derivedLink) encode() Entry { + var entry Entry + entry[0] = uint8(TypeDerivedLink) + binary.LittleEndian.PutUint64(entry[1:9], d.number) + binary.LittleEndian.PutUint64(entry[9:17], d.timestamp) + return entry +} + +type derivedCheck struct { + hash common.Hash +} + +func newDerivedCheck(hash common.Hash) derivedCheck { + return derivedCheck{hash: hash} +} + +func newDerivedCheckFromEntry(data Entry) (derivedCheck, error) { + if data.Type() != TypeDerivedCheck { + return derivedCheck{}, fmt.Errorf("%w: attempting to decode derived check but was type %s", entrydb.ErrDataCorruption, data.Type()) + } + return newDerivedCheck(common.Hash(data[1:33])), nil +} + +func (d derivedCheck) encode() Entry { + var entry Entry + entry[0] = uint8(TypeDerivedCheck) + copy(entry[1:33], d.hash[:]) + return entry +} diff --git a/op-supervisor/supervisor/backend/db/fromda/entry.go b/op-supervisor/supervisor/backend/db/fromda/entry.go new file mode 100644 index 0000000000000..70caf7d536cc9 --- /dev/null +++ b/op-supervisor/supervisor/backend/db/fromda/entry.go @@ -0,0 +1,75 @@ +package fromda + +import ( + "fmt" + "strings" + + 
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb" +) + +const searchCheckpointFrequency = 256 + +type EntryObj interface { + encode() Entry +} + +type Entry = entrydb.Entry[EntryType] + +type EntryTypeFlag uint8 + +const ( + FlagSearchCheckpoint EntryTypeFlag = 1 << TypeSearchCheckpoint + FlagCanonicalHash EntryTypeFlag = 1 << TypeCanonicalHash + FlagDerivedLink EntryTypeFlag = 1 << TypeDerivedLink + FlagDerivedCheck EntryTypeFlag = 1 << TypeDerivedCheck + FlagPadding EntryTypeFlag = 1 << TypePadding +) + +func (x EntryTypeFlag) String() string { + var out []string + for i := EntryTypeFlag(1); i != 0; i <<= 1 { // iterate to bitmask + if x.Any(i) { + out = append(out, i.String()) + } + } + return strings.Join(out, "|") +} + +func (x EntryTypeFlag) Any(v EntryTypeFlag) bool { + return x&v != 0 +} + +func (x *EntryTypeFlag) Add(v EntryTypeFlag) { + *x = *x | v +} + +func (x *EntryTypeFlag) Remove(v EntryTypeFlag) { + *x = *x &^ v +} + +type EntryType uint8 + +const ( + TypeSearchCheckpoint EntryType = iota + TypeCanonicalHash + TypeDerivedLink + TypeDerivedCheck + TypePadding +) + +func (x EntryType) String() string { + switch x { + case TypeSearchCheckpoint: + return "searchCheckpoint" + case TypeCanonicalHash: + return "canonicalHash" + case TypeDerivedLink: + return "derivedLink" + case TypeDerivedCheck: + return "derivedCheck" + case TypePadding: + return "padding" + default: + return fmt.Sprintf("unknown-%d", uint8(x)) + } +} diff --git a/op-supervisor/supervisor/backend/db/fromda/key.go b/op-supervisor/supervisor/backend/db/fromda/key.go new file mode 100644 index 0000000000000..ca902752a7acd --- /dev/null +++ b/op-supervisor/supervisor/backend/db/fromda/key.go @@ -0,0 +1,14 @@ +package fromda + +import ( + "fmt" +) + +type Key struct { + DerivedFrom uint64 + Derived uint64 +} + +func (k Key) String() string { + return fmt.Sprintf("derivedFrom: %d, derived: %d", k.DerivedFrom, k.Derived) +} diff --git 
a/op-supervisor/supervisor/backend/db/fromda/state.go b/op-supervisor/supervisor/backend/db/fromda/state.go new file mode 100644 index 0000000000000..22b419b7ef7a7 --- /dev/null +++ b/op-supervisor/supervisor/backend/db/fromda/state.go @@ -0,0 +1,286 @@ +package fromda + +import ( + "fmt" + "io" + "slices" + + "github.com/ethereum/go-ethereum/common" + + "github.com/ethereum-optimism/optimism/op-service/eth" + "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb" + "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" +) + +type state struct { + // next entry index, including the contents of `out` + nextEntryIndex entrydb.EntryIdx + + derivedFrom types.BlockSeal + derivedUntil uint64 // L2 block that we last derived until starting deriving from this L1 block + derivedSince uint32 // amount of blocks derived from derivedFrom thus far + + derived types.BlockSeal // produced using L1 data up to and including that of derivedFrom + + need EntryTypeFlag + + // buffer of entries not yet in the DB. + // This is generated as objects are applied. + // E.g. you can build things on top of the state, + // before flushing the entries to a DB. + // However, no entries can be read from the DB while objects are being applied. 
+ out []Entry +} + +var _ entrydb.IndexState[EntryType, Key] = (*state)(nil) + +func (l *state) Key() (k Key, ok bool) { + return Key{DerivedFrom: l.derivedFrom.Number, Derived: l.derived.Number}, l.need == 0 +} + +func (l *state) Incomplete() bool { + return l.need != 0 +} + +func (l *state) Out() []Entry { + return slices.Clone(l.out) +} + +func (l *state) ClearOut() { + l.out = l.out[:0] +} + +func (l *state) NextIndex() entrydb.EntryIdx { + return l.nextEntryIndex +} + +func (l *state) DerivedFrom() (id types.BlockSeal, ok bool) { + return l.derivedFrom, l.need == 0 +} + +func (l *state) DerivedSince() (count uint32, ok bool) { + return l.derivedSince, l.need == 0 +} + +func (l *state) DerivedUntil() (derivedUntil uint64, ok bool) { + return l.derivedUntil, l.need == 0 +} + +func (l *state) Derived() (id types.BlockSeal, ok bool) { + return l.derived, l.need == 0 +} + +// ApplyEntry applies an entry on top of the current state. +func (l *state) ApplyEntry(entry Entry) error { + // Wrap processEntry to add common useful error message info + err := l.processEntry(entry) + if err != nil { + return fmt.Errorf("failed to process type %s entry at idx %d (%x): %w", entry.Type().String(), l.nextEntryIndex, entry[:], err) + } + return nil +} + +func (l *state) processEntry(entry Entry) error { + if len(l.out) != 0 { + panic("can only apply without appending if the state is still empty") + } + switch entry.Type() { + case TypeSearchCheckpoint: + v, err := newSearchCheckpointFromEntry(entry) + if err != nil { + return err + } + l.derivedFrom = types.BlockSeal{ + Hash: common.Hash{}, + Number: v.blockNum, + Timestamp: v.timestamp, + } + l.derivedSince = v.derivedSince + l.need.Remove(FlagSearchCheckpoint) + l.need.Add(FlagCanonicalHash) + case TypeCanonicalHash: + v, err := newCanonicalHashFromEntry(entry) + if err != nil { + return err + } + l.derivedFrom.Hash = v.hash + l.need.Remove(FlagCanonicalHash) + case TypeDerivedLink: + v, err := newDerivedLinkFromEntry(entry) + 
if err != nil { + return err + } + l.need.Remove(FlagDerivedLink) + l.need.Add(FlagDerivedCheck) + l.derived = types.BlockSeal{ + Hash: common.Hash{}, + Number: v.number, + Timestamp: v.timestamp, + } + case TypeDerivedCheck: + v, err := newDerivedCheckFromEntry(entry) + if err != nil { + return err + } + l.need.Remove(FlagDerivedCheck) + l.derived.Hash = v.hash + // we derived a new block! + l.derivedSince += 1 + case TypePadding: + l.need.Remove(FlagPadding) + default: + return fmt.Errorf("unknown entry type: %s", entry.Type()) + } + return nil +} + +// appendEntry add the entry to the output-buffer, +// and registers it as last processed entry type, and increments the next entry-index. +func (l *state) appendEntry(obj EntryObj) { + entry := obj.encode() + l.out = append(l.out, entry) + l.nextEntryIndex += 1 +} + +// infer advances the logContext in cases where complex entries contain multiple implied entries +// eg. a SearchCheckpoint implies a CannonicalHash will follow +// this also handles inserting the searchCheckpoint at the set frequency, and padding entries +func (l *state) infer() error { + // We force-insert a checkpoint whenever we hit the known fixed interval. 
+ if l.nextEntryIndex%searchCheckpointFrequency == 0 { + l.need.Add(FlagSearchCheckpoint) + } + if l.need.Any(FlagSearchCheckpoint) { + l.appendEntry(newSearchCheckpoint(l.derivedFrom.Number, l.derivedFrom.Timestamp, l.derivedSince, l.derivedUntil)) + l.need.Add(FlagCanonicalHash) // always follow with a canonical hash + l.need.Remove(FlagSearchCheckpoint) + return nil + } + if l.need.Any(FlagCanonicalHash) { + l.appendEntry(newCanonicalHash(l.derivedFrom.Hash)) + l.need.Remove(FlagCanonicalHash) + return nil + } + if l.need.Any(FlagDerivedLink) { + // Add padding if this link/check combination is going to overlap with the checkpoint + switch l.nextEntryIndex % searchCheckpointFrequency { + case searchCheckpointFrequency - 1: + l.need.Add(FlagPadding) + return nil + } + l.appendEntry(newDerivedLink(l.derived.Number, l.derived.Timestamp)) + l.need.Remove(FlagDerivedLink) + l.need.Any(FlagDerivedCheck) + return nil + } + if l.need.Any(FlagDerivedCheck) { + l.appendEntry(newDerivedCheck(l.derived.Hash)) + l.need.Remove(FlagDerivedCheck) + // we derived a new L2 block! + l.derivedSince += 1 + return nil + } + return io.EOF +} + +// inferFull advances the logContext until it cannot infer any more entries. +func (l *state) inferFull() error { + for i := 0; i < 10; i++ { + err := l.infer() + if err == nil { + continue + } + if err == io.EOF { // wrapped io.EOF does not count. + return nil + } else { + return err + } + } + panic("hit sanity limit") +} + +// AddDerived adds a L1<>L2 block derivation link. +// This may repeat the L1 block if there are multiple L2 blocks derived from it, or repeat the L2 block if the L1 block is empty. +func (l *state) AddDerived(derivedFrom eth.BlockRef, derived eth.BlockRef) error { + // If we don't have any entries yet, allow any block to start things off + if l.nextEntryIndex != 0 { + // TODO insert starting point + } + + if l.derived.ID() == derived.ID() && l.derivedFrom.ID() == derivedFrom.ID() { + // Repeat of same information. 
No entries to be written. + // But we can silently ignore and not return an error, as that brings the caller + // in a consistent state, after which it can insert the actual new derived-from information. + return nil + } + + // Check derived relation: the L2 chain has to be sequential without gaps. An L2 block may repeat if the L1 block is empty. + if l.derived.Number == derived.Number { + // Same block height? Then it must be the same block. + // I.e. we encountered an empty L1 block, and the same L2 block continues to be the last block that was derived from it. + if l.derived.Hash != derived.Hash { + // TODO + } + } else if l.derived.Number+1 == derived.Number { + if l.derived.Hash != derived.ParentHash { + return fmt.Errorf("derived block %s (parent %s) does not build on %s: %w", + derived, derived.ParentHash, l.derived, entrydb.ErrConflict) + } + } else if l.derived.Number+1 < derived.Number { + return fmt.Errorf("derived block %s (parent: %s) is too new, expected to build on top of %s: %w", + derived, derived.ParentHash, l.derived, entrydb.ErrOutOfOrder) + } else { + return fmt.Errorf("derived block %s is older than current derived block %s: %w", + derived, l.derived, entrydb.ErrOutOfOrder) + } + + // Check derived-from relation: multiple L2 blocks may be derived from the same L1 block. But everything in sequence. + if l.derivedFrom.Number == derivedFrom.Number { + // Same block height? Then it must be the same block. 
+		if l.derivedFrom.Hash != derivedFrom.Hash {
+			return fmt.Errorf("cannot add block %s as derived from %s, expected to be derived from %s at this block height: %w",
+				derived, derivedFrom, l.derivedFrom, entrydb.ErrConflict)
+		}
+	} else if l.derivedFrom.Number+1 == derivedFrom.Number {
+		// parent hash check
+		if l.derivedFrom.Hash != derivedFrom.ParentHash {
+			return fmt.Errorf("cannot add block %s as derived from %s (parent %s) derived on top of %s: %w",
+				derived, derivedFrom, derivedFrom.ParentHash, l.derivedFrom, entrydb.ErrConflict)
+		}
+	} else if l.derivedFrom.Number+1 < derivedFrom.Number {
+		// adding block that is derived from something too far into the future
+		// Fix: wrap the sentinel with %w (was %s) so errors.Is(err, ErrOutOfOrder) matches.
+		return fmt.Errorf("cannot add block %s as derived from %s, still deriving from %s: %w",
+			derived, derivedFrom, l.derivedFrom, entrydb.ErrOutOfOrder)
+	} else {
+		// adding block that is derived from something too old
+		return fmt.Errorf("cannot add block %s as derived from %s, deriving already at %s: %w",
+			derived, derivedFrom, l.derivedFrom, entrydb.ErrOutOfOrder)
+	}
+
+	if l.derivedFrom.ID() != derivedFrom.ID() {
+		// Sanity check our state
+		if expected := l.derivedUntil + uint64(l.derivedSince); expected != l.derived.Number {
+			panic(fmt.Errorf("expected to have derived up to %d (%d until current L1 block, and %d since then), but have %d",
+				expected, l.derivedUntil, l.derivedSince, l.derived.Number))
+		}
+		l.need.Add(FlagSearchCheckpoint)
+		// Starting a new derived-from (L1) block: everything up to the current
+		// derived (L2) block is now "derived until", and the per-L1 counter resets.
+		// Fix: assignment (was +=) plus derivedSince reset, preserving the
+		// invariant derivedUntil + derivedSince == derived.Number that the
+		// sanity check above enforces.
+		l.derivedUntil = l.derived.Number
+		l.derivedSince = 0
+
+		l.derivedFrom = types.BlockSeal{
+			Hash:      derivedFrom.Hash,
+			Number:    derivedFrom.Number,
+			Timestamp: derivedFrom.Time,
+		}
+	}
+
+	if l.derived.ID() != derived.ID() {
+		l.need.Add(FlagDerivedLink)
+		l.derived = types.BlockSeal{
+			Hash:      derived.Hash,
+			Number:    derived.Number,
+			Timestamp: derived.Time,
+		}
+	}
+
+	return l.inferFull()
+}