Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 47 additions & 27 deletions op-supervisor/supervisor/backend/db/entrydb/entry_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,14 @@ import (
"github.com/ethereum/go-ethereum/log"
)

const EntrySize = 34
type EntryStore[T EntryType, E Entry[T]] interface {
Size() int64
LastEntryIdx() EntryIdx
Read(idx EntryIdx) (E, error)
Append(entries ...E) error
Truncate(idx EntryIdx) error
Close() error
}

type EntryIdx int64

Expand All @@ -18,10 +25,18 @@ type EntryType interface {
~uint8
}

type Entry[T EntryType] [EntrySize]byte
type Entry[T EntryType] interface {
Type() T
comparable
}

func (entry Entry[T]) Type() T {
return T(entry[0])
// Binary is the binary interface to encode/decode/size entries.
// This should be a zero-cost abstraction, and is bundled as interface for the EntryDB
// to have generic access to this functionality without const-generics for array size in Go.
type Binary[T EntryType, E Entry[T]] interface {
Append(dest []byte, e *E) []byte
ReadAt(dest *E, r io.ReaderAt, at int64) (n int, err error)
EntrySize() int
}

// dataAccess defines a minimal API required to manipulate the actual stored data.
Expand All @@ -33,10 +48,12 @@ type dataAccess interface {
Truncate(size int64) error
}

type EntryDB[T EntryType] struct {
type EntryDB[T EntryType, E Entry[T], B Binary[T, E]] struct {
data dataAccess
lastEntryIdx EntryIdx

b B

cleanupFailedWrite bool
}

Expand All @@ -45,7 +62,7 @@ type EntryDB[T EntryType] struct {
// If the file exists it will be used as the existing data.
// Returns ErrRecoveryRequired if the existing file is not a valid entry db. A EntryDB is still returned but all
// operations will return ErrRecoveryRequired until the Recover method is called.
func NewEntryDB[T EntryType](logger log.Logger, path string) (*EntryDB[T], error) {
func NewEntryDB[T EntryType, E Entry[T], B Binary[T, E]](logger log.Logger, path string) (*EntryDB[T, E, B], error) {
logger.Info("Opening entry database", "path", path)
file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0o644)
if err != nil {
Expand All @@ -55,38 +72,41 @@ func NewEntryDB[T EntryType](logger log.Logger, path string) (*EntryDB[T], error
if err != nil {
return nil, fmt.Errorf("failed to stat database at %v: %w", path, err)
}
size := info.Size() / EntrySize
db := &EntryDB[T]{
var b B
size := info.Size() / int64(b.EntrySize())
db := &EntryDB[T, E, B]{
data: file,
lastEntryIdx: EntryIdx(size - 1),
}
if size*EntrySize != info.Size() {
logger.Warn("File size is not a multiple of entry size. Truncating to last complete entry", "fileSize", size, "entrySize", EntrySize)
if size*int64(b.EntrySize()) != info.Size() {
logger.Warn("File size is not a multiple of entry size. Truncating to last complete entry", "fileSize", size, "entrySize", b.EntrySize())
if err := db.recover(); err != nil {
return nil, fmt.Errorf("failed to recover database at %v: %w", path, err)
}
}
return db, nil
}

func (e *EntryDB[T]) Size() int64 {
func (e *EntryDB[T, E, B]) Size() int64 {
return int64(e.lastEntryIdx) + 1
}

func (e *EntryDB[T]) LastEntryIdx() EntryIdx {
// LastEntryIdx returns the index of the last entry in the DB.
// This returns -1 if the DB is empty.
func (e *EntryDB[T, E, B]) LastEntryIdx() EntryIdx {
return e.lastEntryIdx
}

// Read an entry from the database by index. Returns io.EOF iff idx is after the last entry.
func (e *EntryDB[T]) Read(idx EntryIdx) (Entry[T], error) {
func (e *EntryDB[T, E, B]) Read(idx EntryIdx) (E, error) {
var out E
if idx > e.lastEntryIdx {
return Entry[T]{}, io.EOF
return out, io.EOF
}
var out Entry[T]
read, err := e.data.ReadAt(out[:], int64(idx)*EntrySize)
read, err := e.b.ReadAt(&out, e.data, int64(idx)*int64(e.b.EntrySize()))
// Ignore io.EOF if we read the entire last entry as ReadAt may return io.EOF or nil when it reads the last byte
if err != nil && !(errors.Is(err, io.EOF) && read == EntrySize) {
return Entry[T]{}, fmt.Errorf("failed to read entry %v: %w", idx, err)
if err != nil && !(errors.Is(err, io.EOF) && read == e.b.EntrySize()) {
return out, fmt.Errorf("failed to read entry %v: %w", idx, err)
}
return out, nil
}
Expand All @@ -95,16 +115,16 @@ func (e *EntryDB[T]) Read(idx EntryIdx) (Entry[T], error) {
// The entries are combined in memory and passed to a single Write invocation.
// If the write fails, it will attempt to truncate any partially written data.
// Subsequent writes to this instance will fail until partially written data is truncated.
func (e *EntryDB[T]) Append(entries ...Entry[T]) error {
func (e *EntryDB[T, E, B]) Append(entries ...E) error {
if e.cleanupFailedWrite {
// Try to rollback partially written data from a previous Append
if truncateErr := e.Truncate(e.lastEntryIdx); truncateErr != nil {
return fmt.Errorf("failed to recover from previous write error: %w", truncateErr)
}
}
data := make([]byte, 0, len(entries)*EntrySize)
for _, entry := range entries {
data = append(data, entry[:]...)
data := make([]byte, 0, len(entries)*e.b.EntrySize())
for i := range entries {
data = e.b.Append(data, &entries[i])
}
if n, err := e.data.Write(data); err != nil {
if n == 0 {
Expand All @@ -125,8 +145,8 @@ func (e *EntryDB[T]) Append(entries ...Entry[T]) error {
}

// Truncate the database so that the last retained entry is idx. Any entries after idx are deleted.
func (e *EntryDB[T]) Truncate(idx EntryIdx) error {
if err := e.data.Truncate((int64(idx) + 1) * EntrySize); err != nil {
func (e *EntryDB[T, E, B]) Truncate(idx EntryIdx) error {
if err := e.data.Truncate((int64(idx) + 1) * int64(e.b.EntrySize())); err != nil {
return fmt.Errorf("failed to truncate to entry %v: %w", idx, err)
}
// Update the lastEntryIdx cache
Expand All @@ -136,13 +156,13 @@ func (e *EntryDB[T]) Truncate(idx EntryIdx) error {
}

// recover an invalid database by truncating back to the last complete event.
func (e *EntryDB[T]) recover() error {
if err := e.data.Truncate((e.Size()) * EntrySize); err != nil {
func (e *EntryDB[T, E, B]) recover() error {
if err := e.data.Truncate(e.Size() * int64(e.b.EntrySize())); err != nil {
return fmt.Errorf("failed to truncate trailing partial entries: %w", err)
}
return nil
}

func (e *EntryDB[T]) Close() error {
func (e *EntryDB[T, E, B]) Close() error {
return e.data.Close()
}
52 changes: 39 additions & 13 deletions op-supervisor/supervisor/backend/db/entrydb/entry_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ import (
"path/filepath"
"testing"

"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"

"github.com/ethereum/go-ethereum/log"

"github.com/ethereum-optimism/optimism/op-service/testlog"
)

type TestEntryType uint8
Expand All @@ -20,7 +22,31 @@ func (typ TestEntryType) String() string {
return fmt.Sprintf("%d", uint8(typ))
}

type TestEntry = Entry[TestEntryType]
const TestEntrySize = 34

type TestEntry [TestEntrySize]byte

func (t TestEntry) Type() TestEntryType {
return TestEntryType(t[0])
}

type TestEntryBinary struct{}

func (TestEntryBinary) Append(dest []byte, e *TestEntry) []byte {
return append(dest, e[:]...)
}

func (TestEntryBinary) ReadAt(dest *TestEntry, r io.ReaderAt, at int64) (n int, err error) {
return r.ReadAt(dest[:], at)
}

func (TestEntryBinary) EntrySize() int {
return TestEntrySize
}

var _ Binary[TestEntryType, TestEntry] = TestEntryBinary{}

type TestEntryDB = EntryDB[TestEntryType, TestEntry, TestEntryBinary]

func TestReadWrite(t *testing.T) {
t.Run("BasicReadWrite", func(t *testing.T) {
Expand Down Expand Up @@ -120,18 +146,18 @@ func TestTruncateTrailingPartialEntries(t *testing.T) {
entry2 := createEntry(2)
invalidData := make([]byte, len(entry1)+len(entry2)+4)
copy(invalidData, entry1[:])
copy(invalidData[EntrySize:], entry2[:])
copy(invalidData[TestEntrySize:], entry2[:])
invalidData[len(invalidData)-1] = 3 // Some invalid trailing data
require.NoError(t, os.WriteFile(file, invalidData, 0o644))
db, err := NewEntryDB[TestEntryType](logger, file)
db, err := NewEntryDB[TestEntryType, TestEntry, TestEntryBinary](logger, file)
require.NoError(t, err)
defer db.Close()

// Should automatically truncate the file to remove trailing partial entries
require.EqualValues(t, 2, db.Size())
stat, err := os.Stat(file)
require.NoError(t, err)
require.EqualValues(t, 2*EntrySize, stat.Size())
require.EqualValues(t, 2*TestEntrySize, stat.Size())
}

func TestWriteErrors(t *testing.T) {
Expand Down Expand Up @@ -162,7 +188,7 @@ func TestWriteErrors(t *testing.T) {
t.Run("PartialWriteAndTruncateFails", func(t *testing.T) {
db, stubData := createEntryDBWithStubData()
stubData.writeErr = expectedErr
stubData.writeErrAfterBytes = EntrySize + 2
stubData.writeErrAfterBytes = TestEntrySize + 2
stubData.truncateErr = errors.New("boom")
err := db.Append(createEntry(1), createEntry(2))
require.ErrorIs(t, err, expectedErr)
Expand All @@ -186,29 +212,29 @@ func TestWriteErrors(t *testing.T) {
})
}

func requireRead(t *testing.T, db *EntryDB[TestEntryType], idx EntryIdx, expected TestEntry) {
func requireRead(t *testing.T, db *TestEntryDB, idx EntryIdx, expected TestEntry) {
actual, err := db.Read(idx)
require.NoError(t, err)
require.Equal(t, expected, actual)
}

func createEntry(i byte) TestEntry {
return TestEntry(bytes.Repeat([]byte{i}, EntrySize))
return TestEntry(bytes.Repeat([]byte{i}, TestEntrySize))
}

func createEntryDB(t *testing.T) *EntryDB[TestEntryType] {
func createEntryDB(t *testing.T) *TestEntryDB {
logger := testlog.Logger(t, log.LvlInfo)
db, err := NewEntryDB[TestEntryType](logger, filepath.Join(t.TempDir(), "entries.db"))
db, err := NewEntryDB[TestEntryType, TestEntry, TestEntryBinary](logger, filepath.Join(t.TempDir(), "entries.db"))
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, db.Close())
})
return db
}

func createEntryDBWithStubData() (*EntryDB[TestEntryType], *stubDataAccess) {
func createEntryDBWithStubData() (*TestEntryDB, *stubDataAccess) {
stubData := &stubDataAccess{}
db := &EntryDB[TestEntryType]{data: stubData, lastEntryIdx: -1}
db := &EntryDB[TestEntryType, TestEntry, TestEntryBinary]{data: stubData, lastEntryIdx: -1}
return db, stubData
}

Expand Down
39 changes: 39 additions & 0 deletions op-supervisor/supervisor/backend/db/entrydb/memdb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package entrydb

import (
"io"
)

type MemEntryStore[T EntryType, E Entry[T]] struct {
entries []E
}

func (s *MemEntryStore[T, E]) Size() int64 {
return int64(len(s.entries))
}

func (s *MemEntryStore[T, E]) LastEntryIdx() EntryIdx {
return EntryIdx(s.Size() - 1)
}

func (s *MemEntryStore[T, E]) Read(idx EntryIdx) (E, error) {
if idx < EntryIdx(len(s.entries)) {
return s.entries[idx], nil
}
var out E
return out, io.EOF
}

func (s *MemEntryStore[T, E]) Append(entries ...E) error {
s.entries = append(s.entries, entries...)
return nil
}

func (s *MemEntryStore[T, E]) Truncate(idx EntryIdx) error {
s.entries = s.entries[:min(s.Size()-1, int64(idx+1))]
return nil
}

func (s *MemEntryStore[T, E]) Close() error {
return nil
}
Loading