Skip to content

Commit

Permalink
add q4 trimming support
Browse files Browse the repository at this point in the history
  • Loading branch information
walldiss committed Sep 9, 2024
1 parent 1502153 commit 06b6535
Show file tree
Hide file tree
Showing 3 changed files with 245 additions and 87 deletions.
32 changes: 26 additions & 6 deletions store/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ type metrics struct {
putExists metric.Int64Counter
get metric.Float64Histogram
has metric.Float64Histogram
remove metric.Float64Histogram
removeAll metric.Float64Histogram
removeQ4 metric.Float64Histogram
unreg func() error
}

Expand Down Expand Up @@ -53,8 +54,14 @@ func (s *Store) WithMetrics() error {
return err
}

remove, err := meter.Float64Histogram("eds_store_remove_time_histogram",
metric.WithDescription("eds store remove time histogram(s)"))
removeQ4, err := meter.Float64Histogram("eds_store_remove_q4_time_histogram",
metric.WithDescription("eds store remove q4 data time histogram(s)"))
if err != nil {
return err
}

removeAll, err := meter.Float64Histogram("eds_store_remove_all_time_histogram",
metric.WithDescription("eds store remove all data time histogram(s)"))
if err != nil {
return err
}
Expand All @@ -64,7 +71,8 @@ func (s *Store) WithMetrics() error {
putExists: putExists,
get: get,
has: has,
remove: remove,
removeAll: removeAll,
removeQ4: removeQ4,
}
return s.metrics.addCacheMetrics(s.cache)
}
Expand Down Expand Up @@ -130,15 +138,27 @@ func (m *metrics) observeHas(ctx context.Context, dur time.Duration, failed bool
attribute.Bool(failedKey, failed)))
}

func (m *metrics) observeRemove(ctx context.Context, dur time.Duration, failed bool) {
func (m *metrics) observeRemoveAll(ctx context.Context, dur time.Duration, failed bool) {
if m == nil {
return
}
if ctx.Err() != nil {
ctx = context.Background()
}

m.removeAll.Record(ctx, dur.Seconds(), metric.WithAttributes(
attribute.Bool(failedKey, failed)))
}

func (m *metrics) observeRemoveQ4(ctx context.Context, dur time.Duration, failed bool) {
if m == nil {
return
}
if ctx.Err() != nil {
ctx = context.Background()
}

m.remove.Record(ctx, dur.Seconds(), metric.WithAttributes(
m.removeQ4.Record(ctx, dur.Seconds(), metric.WithAttributes(
attribute.Bool(failedKey, failed)))
}

Expand Down
92 changes: 58 additions & 34 deletions store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ const (
var ErrNotFound = errors.New("eds not found in store")

// Store is a storage for EDS files. It persists EDS files on disk in form of Q1Q4 files or ODS
// files. It provides methods to put, get and remove EDS files. It has two caches: recent eds cache
// files. It provides methods to put, get and removeAll EDS files. It has two caches: recent eds cache
// and availability cache. Recent eds cache is used to cache recent blocks. Availability cache is
// used to cache blocks that are accessed by sample requests. Store is thread-safe.
type Store struct {
Expand Down Expand Up @@ -149,23 +149,22 @@ func (s *Store) createFile(
return true, nil
}
if err != nil {
// ensure we don't have partial writes if any operation fails
removeErr := s.removeAll(height, roots.Hash())
return false, errors.Join(
fmt.Errorf("creating ODSQ4 file: %w", err),
// ensure we don't have partial writes
remove(pathODS),
remove(pathQ4),
removeErr,
)
}

// create hard link with height as name
err = s.linkHeight(roots.Hash(), height)
if err != nil {
// ensure we don't have partial writes if any operation fails
removeErr := s.removeAll(height, roots.Hash())
return false, errors.Join(
fmt.Errorf("hardlinking height: %w", err),
remove(pathODS),
remove(pathQ4),
s.removeLink(height),
removeErr,
)
}
return false, nil
Expand Down Expand Up @@ -305,51 +304,76 @@ func (s *Store) hasByHeight(height uint64) (bool, error) {
return exists(pathODS)
}

func (s *Store) Remove(ctx context.Context, height uint64, datahash share.DataHash) error {
func (s *Store) RemoveAll(ctx context.Context, height uint64, datahash share.DataHash) error {
lock := s.stripLock.byHashAndHeight(datahash, height)
lock.lock()
defer lock.unlock()

tNow := time.Now()
err := s.remove(height, datahash)
s.metrics.observeRemove(ctx, time.Since(tNow), err != nil)
err := s.removeAll(height, datahash)
s.metrics.observeRemoveAll(ctx, time.Since(tNow), err != nil)
return err
}

func (s *Store) remove(height uint64, datahash share.DataHash) error {
lock := s.stripLock.byHeight(height)
lock.Lock()
if err := s.removeLink(height); err != nil {
return fmt.Errorf("removing link: %w", err)
func (s *Store) removeAll(height uint64, datahash share.DataHash) error {
if err := s.removeODS(height, datahash); err != nil {
return fmt.Errorf("removing ODS: %w", err)
}
lock.Unlock()

dlock := s.stripLock.byHash(datahash)
dlock.Lock()
defer dlock.Unlock()
if err := s.removeFile(datahash); err != nil {
return fmt.Errorf("removing file: %w", err)
if err := s.removeQ4(height, datahash); err != nil {
return fmt.Errorf("removing Q4: %w", err)
}
return nil
}

func (s *Store) removeLink(height uint64) error {
func (s *Store) removeODS(height uint64, datahash share.DataHash) error {
if err := s.cache.Remove(height); err != nil {
return fmt.Errorf("removing from cache: %w", err)
}

pathODS := s.heightToPath(height, odsFileExt)
return remove(pathODS)
pathLink := s.heightToPath(height, odsFileExt)
if err := remove(pathLink); err != nil {
return fmt.Errorf("removing hardlink: %w", err)
}

// if datahash is empty, we don't need to remove the ODS file, only the hardlink
if datahash.IsEmptyEDS() {
return nil
}

pathODS := s.hashToPath(datahash, odsFileExt)
if err := remove(pathODS); err != nil {
return fmt.Errorf("removing ODS file: %w", err)
}
return nil
}

func (s *Store) removeFile(hash share.DataHash) error {
// we don't need to remove the empty file, it should always be there
if hash.IsEmptyEDS() {
func (s *Store) RemoveQ4(ctx context.Context, height uint64, datahash share.DataHash) error {
lock := s.stripLock.byHashAndHeight(datahash, height)
lock.lock()
defer lock.unlock()

tNow := time.Now()
err := s.removeQ4(height, datahash)
s.metrics.observeRemoveQ4(ctx, time.Since(tNow), err != nil)
return err
}

func (s *Store) removeQ4(height uint64, datahash share.DataHash) error {
// if datahash is empty, we don't need to remove the Q4 file
if datahash.IsEmptyEDS() {
return nil
}

pathODS := s.hashToPath(hash, odsFileExt)
pathQ4 := s.hashToPath(hash, q4FileExt)
return errors.Join(
remove(pathODS),
remove(pathQ4),
)
if err := s.cache.Remove(height); err != nil {
return fmt.Errorf("removing from cache: %w", err)
}

// remove Q4 file
pathQ4File := s.hashToPath(datahash, q4FileExt)
if err := remove(pathQ4File); err != nil {
return fmt.Errorf("removing Q4 file: %w", err)
}
return nil
}

func (s *Store) hashToPath(datahash share.DataHash, ext string) string {
Expand Down
Loading

0 comments on commit 06b6535

Please sign in to comment.