Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
280 changes: 276 additions & 4 deletions ethdb/pebble/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,40 @@ type Database struct {
liveIterGauge *metrics.Gauge // Gauge for tracking the number of live database iterators
levelsGauge []*metrics.Gauge // Gauge for tracking the number of tables in levels

// Read and Write Amplification metrics
readAmpGauge *metrics.GaugeFloat64 // Gauge for tracking read amplification
levelWriteAmpGauge []*metrics.GaugeFloat64 // Gauge for tracking write amplification per level
totalWriteAmpGauge *metrics.GaugeFloat64 // Gauge for tracking total write amplification

// Detailed I/O tracking metrics
walBytesWrittenMeter *metrics.Meter // Bytes written to WAL
walFileCountGauge *metrics.Gauge // Number of WAL files
sstBytesReadMeter *metrics.Meter // Bytes read from SST files (compaction input)
sstBytesWrittenMeter *metrics.Meter // Bytes written to SST files (compaction output)
flushBytesWrittenMeter *metrics.Meter // Bytes written during memtable flush

// Per-level size tracking
levelSizeGauge []*metrics.Gauge // Size of each level in bytes
levelScoreGauge []*metrics.Gauge // Compaction score per level (>1 means needs compaction)

// Detailed WAL metrics
walSizeGauge *metrics.Gauge // Current WAL size
walPhysicalSizeGauge *metrics.Gauge // Physical WAL size on disk
walObsoleteSizeGauge *metrics.Gauge // Obsolete WAL data size

// Snapshot metrics
snapshotCountGauge *metrics.Gauge // Number of snapshots

// Keys metrics for understanding data distribution
keysCountGauge []*metrics.Gauge // Number of keys per level

// Calculated amplification metrics
calcWriteAmpGauge *metrics.GaugeFloat64 // Calculated write amplification: total physical writes / logical user data
calcReadAmpGauge *metrics.GaugeFloat64 // Calculated read amplification (same as readamp)
calcSpaceAmpGauge *metrics.GaugeFloat64 // Calculated space amplification: disk/size / actual data size
walWriteAmpGauge *metrics.GaugeFloat64 // WAL write amplification: WAL physical / logical data
actualDataSizeGauge *metrics.Gauge // Actual user data size (from live SST files)

quitLock sync.RWMutex // Mutex protecting the quit channel and the closed flag
quitChan chan chan error // Quit channel to stop the metrics collection before closing the database
closed bool // keep track of whether we're Closed
Expand Down Expand Up @@ -164,6 +198,26 @@ func (d *Database) onWriteStallEnd() {
d.writeDelayStartTime = time.Time{}
}

// Track SST file operations
func (d *Database) onTableCreated(info pebble.TableCreateInfo) {
metrics.GetOrRegisterMeter(d.namespace+"file/sst/created", nil).Mark(1)
d.log.Debug("SST file created", "reason", info.Reason, "fileNum", info.FileNum)
}

func (d *Database) onTableDeleted(info pebble.TableDeleteInfo) {
metrics.GetOrRegisterMeter(d.namespace+"file/sst/deleted", nil).Mark(1)
}

// Track WAL (.log) file operations
func (d *Database) onWALCreated(info pebble.WALCreateInfo) {
metrics.GetOrRegisterMeter(d.namespace+"file/wal/created", nil).Mark(1)
d.log.Debug("WAL file created", "fileNum", info.FileNum, "recycled", info.RecycledFileNum)
}

func (d *Database) onWALDeleted(info pebble.WALDeleteInfo) {
metrics.GetOrRegisterMeter(d.namespace+"file/wal/deleted", nil).Mark(1)
}

// panicLogger is just a noop logger to disable Pebble's internal logger.
//
// TODO(karalabe): Remove when Pebble sets this as the default.
Expand Down Expand Up @@ -277,6 +331,10 @@ func New(file string, cache int, handles int, namespace string, readonly bool) (
CompactionEnd: db.onCompactionEnd,
WriteStallBegin: db.onWriteStallBegin,
WriteStallEnd: db.onWriteStallEnd,
TableCreated: db.onTableCreated,
TableDeleted: db.onTableDeleted,
WALCreated: db.onWALCreated,
WALDeleted: db.onWALDeleted,
},
Logger: panicLogger{}, // TODO(karalabe): Delete when this is upstreamed in Pebble

Expand Down Expand Up @@ -338,6 +396,32 @@ func New(file string, cache int, handles int, namespace string, readonly bool) (
db.liveCompSizeGauge = metrics.GetOrRegisterGauge(namespace+"compact/live/size", nil)
db.liveIterGauge = metrics.GetOrRegisterGauge(namespace+"iter/count", nil)

// Register read and write amplification metrics
db.readAmpGauge = metrics.GetOrRegisterGaugeFloat64(namespace+"readamp", nil)
db.totalWriteAmpGauge = metrics.GetOrRegisterGaugeFloat64(namespace+"writeamp/total", nil)

// Register detailed I/O tracking metrics
db.walBytesWrittenMeter = metrics.GetOrRegisterMeter(namespace+"wal/bytes", nil)
db.walFileCountGauge = metrics.GetOrRegisterGauge(namespace+"wal/files", nil)
db.sstBytesReadMeter = metrics.GetOrRegisterMeter(namespace+"sst/read", nil)
db.sstBytesWrittenMeter = metrics.GetOrRegisterMeter(namespace+"sst/written", nil)
db.flushBytesWrittenMeter = metrics.GetOrRegisterMeter(namespace+"flush/bytes", nil)

// WAL size metrics
db.walSizeGauge = metrics.GetOrRegisterGauge(namespace+"wal/size", nil)
db.walPhysicalSizeGauge = metrics.GetOrRegisterGauge(namespace+"wal/physicalsize", nil)
db.walObsoleteSizeGauge = metrics.GetOrRegisterGauge(namespace+"wal/obsoletesize", nil)

// Snapshot metrics
db.snapshotCountGauge = metrics.GetOrRegisterGauge(namespace+"snapshots/count", nil)

// Calculated amplification metrics
db.calcWriteAmpGauge = metrics.GetOrRegisterGaugeFloat64(namespace+"amplification/write/calculated", nil)
db.calcReadAmpGauge = metrics.GetOrRegisterGaugeFloat64(namespace+"amplification/read/calculated", nil)
db.calcSpaceAmpGauge = metrics.GetOrRegisterGaugeFloat64(namespace+"amplification/space/calculated", nil)
db.walWriteAmpGauge = metrics.GetOrRegisterGaugeFloat64(namespace+"amplification/wal", nil)
db.actualDataSizeGauge = metrics.GetOrRegisterGauge(namespace+"disk/actualsize", nil)

// Start up the metrics gathering and return
go db.meter(metricsGatheringInterval, namespace)
return db, nil
Expand Down Expand Up @@ -540,7 +624,9 @@ func (d *Database) meter(refresh time.Duration, namespace string) {
compWrites [2]int64
compReads [2]int64

nWrites [2]int64
nWrites [2]int64
flushBytes [2]int64 // Add tracking for flush bytes
walWrites [2]int64 // Track WAL writes separately

writeDelayTimes [2]int64
writeDelayCounts [2]int64
Expand All @@ -566,18 +652,35 @@ func (d *Database) meter(refresh time.Duration, namespace string) {
writeDelayCounts[i%2] = writeDelayCount
compTimes[i%2] = compTime

var totalFlushBytes int64
for _, levelMetrics := range stats.Levels {
nWrite += int64(levelMetrics.BytesCompacted)
nWrite += int64(levelMetrics.BytesFlushed)
// Don't add to nWrite yet - we'll calculate physical writes separately
compWrite += int64(levelMetrics.BytesCompacted)
compRead += int64(levelMetrics.BytesRead)
totalFlushBytes += int64(levelMetrics.BytesFlushed)
}

// Track both logical and physical WAL metrics
walLogicalWrites := int64(stats.WAL.BytesWritten)
walPhysicalSize := int64(stats.WAL.PhysicalSize)

// Calculate physical writes including WAL overhead
// For nWrite, we need to account for physical WAL overhead
// Use the ratio of physical/logical for current WAL as a multiplier
walOverheadRatio := 1.0
if stats.WAL.BytesWritten > 0 {
walOverheadRatio = float64(walPhysicalSize) / float64(stats.WAL.BytesWritten)
}

nWrite += int64(stats.WAL.BytesWritten)
// Estimate physical writes as: SST writes + (logical WAL * overhead ratio)
// This gives us a better approximation of actual disk I/O
nWrite = compWrite + totalFlushBytes + int64(float64(walLogicalWrites)*walOverheadRatio)

compWrites[i%2] = compWrite
compReads[i%2] = compRead
nWrites[i%2] = nWrite
walWrites[i%2] = walLogicalWrites
flushBytes[i%2] = totalFlushBytes

d.writeDelayNMeter.Mark(writeDelayCounts[i%2] - writeDelayCounts[(i-1)%2])
d.writeDelayMeter.Mark(writeDelayTimes[i%2] - writeDelayTimes[(i-1)%2])
Expand Down Expand Up @@ -616,14 +719,85 @@ func (d *Database) meter(refresh time.Duration, namespace string) {
d.filterHitGauge.Update(stats.Filter.Hits)
d.filterMissGauge.Update(stats.Filter.Misses)

// Update read amplification metric
// ReadAmp returns the current read amplification of the database
d.readAmpGauge.Update(float64(stats.ReadAmp()))

// Track detailed I/O metrics
var (
totalSSTBytesRead int64
totalSSTBytesWritten int64
)

// Calculate and update write amplification metrics per level
for i, level := range stats.Levels {
// Append metrics for additional layers
if i >= len(d.levelsGauge) {
d.levelsGauge = append(d.levelsGauge, metrics.GetOrRegisterGauge(namespace+fmt.Sprintf("tables/level%v", i), nil))
d.levelWriteAmpGauge = append(d.levelWriteAmpGauge, metrics.GetOrRegisterGaugeFloat64(namespace+fmt.Sprintf("writeamp/level%v", i), nil))
d.levelSizeGauge = append(d.levelSizeGauge, metrics.GetOrRegisterGauge(namespace+fmt.Sprintf("size/level%v", i), nil))
d.levelScoreGauge = append(d.levelScoreGauge, metrics.GetOrRegisterGauge(namespace+fmt.Sprintf("score/level%v", i), nil))
d.keysCountGauge = append(d.keysCountGauge, metrics.GetOrRegisterGauge(namespace+fmt.Sprintf("keys/level%v", i), nil))
}
d.levelsGauge[i].Update(level.NumFiles)

// Update write amplification for this level
writeAmp := level.WriteAmp()
d.levelWriteAmpGauge[i].Update(writeAmp)

// Update level size
d.levelSizeGauge[i].Update(level.Size)

// Update compaction score (>1.0 means level needs compaction)
d.levelScoreGauge[i].Update(int64(level.Score * 1000)) // Multiply by 1000 for precision

// Update keys count per level
d.keysCountGauge[i].Update(level.NumFiles) // Approximate by file count

// Accumulate I/O stats (these are cumulative from Pebble)
totalSSTBytesRead += int64(level.BytesRead)
totalSSTBytesWritten += int64(level.BytesCompacted)
}
// Update I/O meters (mark only the delta since last measurement)
if i > 1 {
deltaRead := totalSSTBytesRead - compReads[(i-1)%2]
deltaWrite := totalSSTBytesWritten - compWrites[(i-1)%2]
deltaWAL := walWrites[i%2] - walWrites[(i-1)%2]
deltaFlush := flushBytes[i%2] - flushBytes[(i-1)%2]

// Only mark positive deltas to avoid negative values
if deltaRead > 0 {
d.sstBytesReadMeter.Mark(deltaRead)
}
if deltaWrite > 0 {
d.sstBytesWrittenMeter.Mark(deltaWrite)
}
// Track WAL logical writes (the actual application data)
if deltaWAL > 0 {
d.walBytesWrittenMeter.Mark(deltaWAL)
}
if deltaFlush > 0 {
d.flushBytesWrittenMeter.Mark(deltaFlush)
}
}

// Calculate total write amplification using Pebble's built-in method
totalMetrics := stats.Total()
totalWriteAmp := totalMetrics.WriteAmp()
d.totalWriteAmpGauge.Update(totalWriteAmp)

// Update WAL metrics
d.walFileCountGauge.Update(stats.WAL.Files)
d.walSizeGauge.Update(int64(stats.WAL.Size))
d.walPhysicalSizeGauge.Update(int64(stats.WAL.PhysicalSize))
d.walObsoleteSizeGauge.Update(int64(stats.WAL.ObsoletePhysicalSize))

// Update snapshot count
d.snapshotCountGauge.Update(int64(stats.Snapshots.Count))

// Calculate and update custom amplification metrics
d.updateCalculatedAmplifications(stats)

// Sleep a bit, then repeat the stats collection
select {
case errc = <-d.quitChan:
Expand Down Expand Up @@ -797,3 +971,101 @@ func (iter *pebbleIterator) Release() {
iter.released = true
}
}

// updateCalculatedAmplifications calculates and updates custom amplification metrics
func (d *Database) updateCalculatedAmplifications(stats *pebble.Metrics) {
// Calculate Write Amplification for the database
calcWriteAmp := d.calculateDatabaseWriteAmp(stats)
if calcWriteAmp >= 0 {
d.calcWriteAmpGauge.Update(calcWriteAmp)
}

// Calculate WAL Write Amplification
walWriteAmp := d.calculateWALWriteAmp(stats)
if walWriteAmp >= 0 {
d.walWriteAmpGauge.Update(walWriteAmp)
}

// Calculate Read Amplification (same as Pebble's built-in metric)
// This represents how many levels/sublevels need to be checked for a read
readAmp := float64(stats.ReadAmp())
d.calcReadAmpGauge.Update(readAmp)

// Calculate Space Amplification: Total disk space / Actual user data size
// This represents how much extra space is used compared to the logical data size
diskSpaceUsed := int64(stats.DiskSpaceUsage())

// Calculate actual user data size (sum of all live SST file sizes)
// This excludes obsolete files, WAL files, and internal metadata
// level.Size is the CURRENT live size, which already accounts for deleted files
var actualDataSize int64
for _, level := range stats.Levels {
actualDataSize += level.Size
}

d.actualDataSizeGauge.Update(actualDataSize)

if actualDataSize > 0 {
// Space Amp = Total disk usage / Live data size
// A value of 1.0 means no amplification (ideal)
// A value of 2.0 means using 2x the space of actual data
spaceAmp := float64(diskSpaceUsed) / float64(actualDataSize)
d.calcSpaceAmpGauge.Update(spaceAmp)
}
}

// calculateDatabaseWriteAmp calculates the write amplification for database writes.
func (d *Database) calculateDatabaseWriteAmp(stats *pebble.Metrics) float64 {
var totalBytesIn uint64
for _, level := range stats.Levels {
totalBytesIn += level.BytesIn
}

if totalBytesIn == 0 {
return -1
}

// Calculate SST writes (cumulative)
var totalSSTWrites uint64
for _, level := range stats.Levels {
totalSSTWrites += level.BytesFlushed + level.BytesCompacted
}

// WAL.BytesWritten is the cumulative physical bytes written to .log files
// This already includes:
// - Record headers and checksums
// - Batching overhead
// - fsync/sync overhead
//
// But it does NOT include:
// - Block alignment padding
// - Pre-allocated space
// - Recycled file space
//
// The ratio BytesWritten/BytesIn gives us the WAL encoding overhead
walBytesWritten := stats.WAL.BytesWritten

// Calculate total physical writes
totalPhysicalWrites := walBytesWritten + totalSSTWrites

// Write Amplification = Total physical writes / Logical user input
writeAmp := float64(totalPhysicalWrites) / float64(totalBytesIn)

return writeAmp
}

// calculateWALWriteAmp calculates WAL-specific write amplification
func (d *Database) calculateWALWriteAmp(stats *pebble.Metrics) float64 {
if stats.WAL.BytesIn == 0 {
return -1
}

// WAL write amplification = Physical writes / Logical application writes
// This captures the overhead from:
// - WAL record format (headers, checksums)
// - Batching (multiple logical writes in one physical write)
// - Sync overhead
walWriteAmp := float64(stats.WAL.BytesWritten) / float64(stats.WAL.BytesIn)

return walWriteAmp
}
Loading