Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions ethdb/pebble/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ type Database struct {
liveIterGauge *metrics.Gauge // Gauge for tracking the number of live database iterators
levelsGauge []*metrics.Gauge // Gauge for tracking the number of tables in levels

// Read and Write Amplification metrics
readAmpGauge *metrics.GaugeFloat64 // Gauge for tracking read amplification
levelWriteAmpGauge []*metrics.GaugeFloat64 // Gauge for tracking write amplification per level
totalWriteAmpGauge *metrics.GaugeFloat64 // Gauge for tracking total write amplification

quitLock sync.RWMutex // Mutex protecting the quit channel and the closed flag
quitChan chan chan error // Quit channel to stop the metrics collection before closing the database
closed bool // keep track of whether we're Closed
Expand Down Expand Up @@ -164,6 +169,26 @@ func (d *Database) onWriteStallEnd() {
d.writeDelayStartTime = time.Time{}
}

// Track SST file operations
func (d *Database) onTableCreated(info pebble.TableCreateInfo) {
metrics.GetOrRegisterMeter(d.namespace+"file/sst/created", nil).Mark(1)
d.log.Debug("SST file created", "reason", info.Reason, "fileNum", info.FileNum)
}

func (d *Database) onTableDeleted(info pebble.TableDeleteInfo) {
metrics.GetOrRegisterMeter(d.namespace+"file/sst/deleted", nil).Mark(1)
}

// Track WAL (.log) file operations
func (d *Database) onWALCreated(info pebble.WALCreateInfo) {
metrics.GetOrRegisterMeter(d.namespace+"file/wal/created", nil).Mark(1)
d.log.Debug("WAL file created", "fileNum", info.FileNum, "recycled", info.RecycledFileNum)
}

func (d *Database) onWALDeleted(info pebble.WALDeleteInfo) {
metrics.GetOrRegisterMeter(d.namespace+"file/wal/deleted", nil).Mark(1)
}

// panicLogger is just a noop logger to disable Pebble's internal logger.
//
// TODO(karalabe): Remove when Pebble sets this as the default.
Expand Down Expand Up @@ -277,6 +302,10 @@ func New(file string, cache int, handles int, namespace string, readonly bool) (
CompactionEnd: db.onCompactionEnd,
WriteStallBegin: db.onWriteStallBegin,
WriteStallEnd: db.onWriteStallEnd,
TableCreated: db.onTableCreated,
TableDeleted: db.onTableDeleted,
WALCreated: db.onWALCreated,
WALDeleted: db.onWALDeleted,
},
Logger: panicLogger{}, // TODO(karalabe): Delete when this is upstreamed in Pebble

Expand Down Expand Up @@ -338,6 +367,10 @@ func New(file string, cache int, handles int, namespace string, readonly bool) (
db.liveCompSizeGauge = metrics.GetOrRegisterGauge(namespace+"compact/live/size", nil)
db.liveIterGauge = metrics.GetOrRegisterGauge(namespace+"iter/count", nil)

// Register read and write amplification metrics
db.readAmpGauge = metrics.GetOrRegisterGaugeFloat64(namespace+"readamp", nil)
db.totalWriteAmpGauge = metrics.GetOrRegisterGaugeFloat64(namespace+"writeamp/total", nil)

// Start up the metrics gathering and return
go db.meter(metricsGatheringInterval, namespace)
return db, nil
Expand Down Expand Up @@ -616,14 +649,29 @@ func (d *Database) meter(refresh time.Duration, namespace string) {
d.filterHitGauge.Update(stats.Filter.Hits)
d.filterMissGauge.Update(stats.Filter.Misses)

// Update read amplification metric
// ReadAmp returns the current read amplification of the database
d.readAmpGauge.Update(float64(stats.ReadAmp()))

// Calculate and update write amplification metrics per level
for i, level := range stats.Levels {
// Append metrics for additional layers
if i >= len(d.levelsGauge) {
d.levelsGauge = append(d.levelsGauge, metrics.GetOrRegisterGauge(namespace+fmt.Sprintf("tables/level%v", i), nil))
d.levelWriteAmpGauge = append(d.levelWriteAmpGauge, metrics.GetOrRegisterGaugeFloat64(namespace+fmt.Sprintf("writeamp/level%v", i), nil))
}
d.levelsGauge[i].Update(level.NumFiles)

// Update write amplification for this level (as float)
writeAmp := level.WriteAmp()
d.levelWriteAmpGauge[i].Update(writeAmp)
}

// Calculate total write amplification (as float)
totalMetrics := stats.Total()
totalWriteAmp := totalMetrics.WriteAmp()
d.totalWriteAmpGauge.Update(totalWriteAmp)

// Sleep a bit, then repeat the stats collection
select {
case errc = <-d.quitChan:
Expand Down
Loading