Skip to content

Commit

Permalink
Code review fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
tamirms committed Jun 24, 2024
1 parent 0b148ae commit 512c421
Show file tree
Hide file tree
Showing 8 changed files with 41 additions and 30 deletions.
7 changes: 5 additions & 2 deletions services/horizon/cmd/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"go/types"
"log"
"os"
"os/signal"
"strconv"
"strings"

Expand Down Expand Up @@ -235,11 +236,13 @@ var dbReapCmd = &cobra.Command{
reaper := ingest.NewReaper(
ingest.ReapConfig{
RetentionCount: uint32(globalConfig.HistoryRetentionCount),
ReapBatchSize: uint32(globalConfig.HistoryRetentionReapCount),
BatchSize: uint32(globalConfig.HistoryRetentionReapCount),
},
session,
)
return reaper.DeleteUnretainedHistory(context.Background())
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill)
defer cancel()
return reaper.DeleteUnretainedHistory(ctx)
},
}

Expand Down
2 changes: 1 addition & 1 deletion services/horizon/internal/db2/history/reap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func TestReapLookupTables(t *testing.T) {
reaper := ingest.NewReaper(
ingest.ReapConfig{
RetentionCount: 1,
ReapBatchSize: 50,
BatchSize: 50,
},
db,
)
Expand Down
8 changes: 8 additions & 0 deletions services/horizon/internal/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,14 @@ func Flags() (*Config, support.ConfigOptions) {
"A value of 1 implies history is trimmed after every ledger. " +
"A value of 2 implies history is trimmed on every second ledger.",
UsedInCommands: IngestionCommands,
CustomSetValue: func(opt *support.ConfigOption) error {
val := viper.GetUint(opt.Name)
if val <= 0 {
return fmt.Errorf("flag --reap-frequency must be positive")
}
*(opt.ConfigKey.(*uint)) = val
return nil
},
},
&support.ConfigOption{
Name: "history-stale-threshold",
Expand Down
2 changes: 1 addition & 1 deletion services/horizon/internal/httpt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (ht *HTTPT) ReapHistory(retention uint32) {
reaper := ingest.NewReaper(
ingest.ReapConfig{
RetentionCount: retention,
ReapBatchSize: 50_000,
BatchSize: 50_000,
}, ht.HorizonSession())

ht.Require.NoError(reaper.DeleteUnretainedHistory(context.Background()))
Expand Down
11 changes: 3 additions & 8 deletions services/horizon/internal/ingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,7 @@ type Config struct {
CoreProtocolVersionFn ledgerbackend.CoreProtocolVersionFunc
CoreBuildVersionFn ledgerbackend.CoreBuildVersionFunc

ReapFrequency uint
HistoryRetentionCount uint
HistoryRetentionReapCount uint
ReapConfig ReapConfig
}

const (
Expand Down Expand Up @@ -326,10 +324,7 @@ func NewSystem(config Config) (System, error) {
),
maxLedgerPerFlush: maxLedgersPerFlush,
reaper: NewReaper(
ReapConfig{
RetentionCount: uint32(config.HistoryRetentionCount),
ReapBatchSize: uint32(config.HistoryRetentionReapCount),
},
config.ReapConfig,
config.HistorySession,
),
}
Expand Down Expand Up @@ -710,7 +705,7 @@ func (s *system) runStateMachine(cur stateMachineNode) error {
}

func (s *system) maybeReapHistory(lastIngestedLedger uint32) {
if s.config.ReapFrequency == 0 || lastIngestedLedger%uint32(s.config.ReapFrequency) != 0 {
if s.config.ReapConfig.Frequency == 0 || lastIngestedLedger%uint32(s.config.ReapConfig.Frequency) != 0 {
return
}
s.wg.Add(1)
Expand Down
27 changes: 15 additions & 12 deletions services/horizon/internal/ingest/reap.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"fmt"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/prometheus/client_golang/prometheus"
Expand All @@ -20,20 +20,20 @@ import (
type Reaper struct {
historyQ history.IngestionQ
reapLockQ history.IngestionQ
pending atomic.Bool
config ReapConfig
logger *logpkg.Entry

totalDuration *prometheus.SummaryVec
totalDeleted *prometheus.SummaryVec
deleteBatchDuration prometheus.Summary
rowsInBatchDeleted prometheus.Summary

lock sync.Mutex
}

type ReapConfig struct {
Frequency uint
RetentionCount uint32
ReapBatchSize uint32
BatchSize uint32
}

// NewReaper creates a new Reaper instance
Expand All @@ -53,7 +53,7 @@ func newReaper(config ReapConfig, historyQ, reapLockQ history.IngestionQ) *Reape
}),
rowsInBatchDeleted: prometheus.NewSummary(prometheus.SummaryOpts{
Namespace: "horizon", Subsystem: "reap", Name: "batch_rows_deleted",
Help: "rows deleted during reap batch , sliding window = 10m",
Help: "rows deleted during reap batch, sliding window = 10m",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
}),
totalDuration: prometheus.NewSummaryVec(prometheus.SummaryOpts{
Expand All @@ -77,11 +77,12 @@ func (r *Reaper) DeleteUnretainedHistory(ctx context.Context) error {
return nil
}

if !r.lock.TryLock() {
r.logger.Infof("reap already in progress")
// check if reap is already in progress on this horizon node
if !r.pending.CompareAndSwap(false, true) {
r.logger.Infof("existing reap already in progress, skipping request to start a new one")
return nil
}
defer r.lock.Unlock()
defer r.pending.Store(false)

if err := r.reapLockQ.Begin(ctx); err != nil {
return errors.Wrap(err, "error while starting reaper lock transaction")
Expand All @@ -91,8 +92,9 @@ func (r *Reaper) DeleteUnretainedHistory(ctx context.Context) error {
r.logger.WithField("error", err).Error("failed to release reaper lock")
}
}()
// check if reap is already in progress on another horizon node
if acquired, err := r.reapLockQ.TryReaperLock(ctx); err != nil {
return errors.Wrap(err, "error while acquiring reaper lock")
return errors.Wrap(err, "error while acquiring reaper database lock")
} else if !acquired {
r.logger.Info("reap already in progress on another node")
return nil
Expand Down Expand Up @@ -154,10 +156,11 @@ func (s *Reaper) RegisterMetrics(registry *prometheus.Registry) {
)
}

// Work backwards in 50k (by default, otherwise configurable via the CLI) ledger
// Work in 50k (by default, otherwise configurable via the CLI) ledger
// blocks to prevent using all the CPU.
//
// This runs every hour, so we need to make sure it doesn't run for longer than
// By default, this runs every 720 ledgers (approximately 1 hour), so we
// need to make sure it doesn't run for longer than
// an hour.
//
// Current ledger at 2024-04-04 is 51,092,283, so 50k means 1021 batches. At 1
Expand All @@ -166,7 +169,7 @@ func (s *Reaper) RegisterMetrics(registry *prometheus.Registry) {
var sleep = 1 * time.Second

func (r *Reaper) clearBefore(ctx context.Context, startSeq, endSeq uint32) (int64, error) {
batchSize := r.config.ReapBatchSize
batchSize := r.config.BatchSize
var sum int64
if batchSize <= 0 {
return sum, fmt.Errorf("invalid batch size for reaping (%d)", batchSize)
Expand Down
6 changes: 3 additions & 3 deletions services/horizon/internal/ingest/reap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestDeleteUnretainedHistory(t *testing.T) {

reaper := NewReaper(ReapConfig{
RetentionCount: 0,
ReapBatchSize: 50,
BatchSize: 50,
}, db)

// Disable sleeps for this.
Expand Down Expand Up @@ -68,7 +68,7 @@ type ReaperTestSuite struct {
ctx context.Context
historyQ *mockDBQ
reapLockQ *mockDBQ
reaper *Reaper
reaper Reaper
prevSleep time.Duration
}

Expand All @@ -82,7 +82,7 @@ func (t *ReaperTestSuite) SetupTest() {
t.reapLockQ = &mockDBQ{}
t.reaper = newReaper(ReapConfig{

Check failure on line 83 in services/horizon/internal/ingest/reap_test.go

View workflow job for this annotation

GitHub Actions / test (ubuntu-22.04, 1.22, 12)

cannot use newReaper(ReapConfig{…}, t.historyQ, t.reapLockQ) (value of type *Reaper) as Reaper value in assignment

Check failure on line 83 in services/horizon/internal/ingest/reap_test.go

View workflow job for this annotation

GitHub Actions / test (ubuntu-22.04, 1.22, 16)

cannot use newReaper(ReapConfig{…}, t.historyQ, t.reapLockQ) (value of type *Reaper) as Reaper value in assignment

Check failure on line 83 in services/horizon/internal/ingest/reap_test.go

View workflow job for this annotation

GitHub Actions / test (ubuntu-22.04, 1.21, 12)

cannot use newReaper(ReapConfig{…}, t.historyQ, t.reapLockQ) (value of type *Reaper) as Reaper value in assignment

Check failure on line 83 in services/horizon/internal/ingest/reap_test.go

View workflow job for this annotation

GitHub Actions / test (ubuntu-22.04, 1.21, 16)

cannot use newReaper(ReapConfig{…}, t.historyQ, t.reapLockQ) (value of type *Reaper) as Reaper value in assignment
RetentionCount: 30,
ReapBatchSize: 10,
BatchSize: 10,
}, t.historyQ, t.reapLockQ)
t.prevSleep = sleep
sleep = 0
Expand Down
8 changes: 5 additions & 3 deletions services/horizon/internal/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,11 @@ func initIngester(app *App) {
EnableExtendedLogLedgerStats: app.config.IngestEnableExtendedLogLedgerStats,
RoundingSlippageFilter: app.config.RoundingSlippageFilter,
SkipTxmeta: app.config.SkipTxmeta,
HistoryRetentionCount: app.config.HistoryRetentionCount,
HistoryRetentionReapCount: app.config.HistoryRetentionReapCount,
ReapFrequency: app.config.ReapFrequency,
ReapConfig: ingest.ReapConfig{
Frequency: app.config.ReapFrequency,
RetentionCount: uint32(app.config.HistoryRetentionCount),
BatchSize: uint32(app.config.HistoryRetentionReapCount),
},
})

if err != nil {
Expand Down

0 comments on commit 512c421

Please sign in to comment.