Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 195 additions & 3 deletions agent/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,16 @@ const (
dockerTimeoutMs = 2100
// Maximum realistic network speed (5 GB/s) to detect bad deltas
maxNetworkSpeedBps uint64 = 5e9
// Container and health constants
composeProjectLabel = "com.docker.compose.project"
healthStatusNone = "none"
containerStateRunning = "running"
containerStateUnknown = "unknown"
volumeTypeVolume = "volume"
diskOpRead = "read"
diskOpReadCap = "Read"
diskOpWrite = "write"
diskOpWriteCap = "Write"
// Maximum conceivable memory usage of a container (100TB) to detect bad memory stats
maxMemoryUsage uint64 = 100 * 1024 * 1024 * 1024 * 1024
)
Expand All @@ -42,6 +52,8 @@ type dockerManager struct {
buf *bytes.Buffer // Buffer to store and read response bodies
decoder *json.Decoder // Reusable JSON decoder that reads from buf
apiStats *container.ApiStats // Reusable API stats object
volumeSizeCache map[string]float64 // Cached volume sizes (name -> size in MB)
volumeSizeUpdated time.Time // Last time volume sizes were updated

// Cache-time-aware tracking for CPU stats (similar to cpu.go)
// Maps cache time intervals to container-specific CPU usage tracking
Expand All @@ -53,6 +65,11 @@ type dockerManager struct {
// cacheTimeMs -> DeltaTracker for network bytes sent/received
networkSentTrackers map[uint16]*deltatracker.DeltaTracker[string, uint64]
networkRecvTrackers map[uint16]*deltatracker.DeltaTracker[string, uint64]

// Disk I/O delta trackers - one per cache time to avoid interference
// cacheTimeMs -> DeltaTracker for disk bytes read/written
diskReadTrackers map[uint16]*deltatracker.DeltaTracker[string, uint64]
diskWriteTrackers map[uint16]*deltatracker.DeltaTracker[string, uint64]
}

// userAgentRoundTripper is a custom http.RoundTripper that adds a User-Agent header to all requests
Expand Down Expand Up @@ -159,8 +176,9 @@ func (dm *dockerManager) getDockerStats(cacheTimeMs uint16) ([]*container.Stats,
}
}

// prepare network trackers for next interval for this cache time
// prepare network and disk trackers for next interval for this cache time
dm.cycleNetworkDeltasForCacheTime(cacheTimeMs)
dm.cycleDiskDeltasForCacheTime(cacheTimeMs)

return stats, nil
}
Expand Down Expand Up @@ -239,6 +257,32 @@ func (dm *dockerManager) cycleNetworkDeltasForCacheTime(cacheTimeMs uint16) {
}
}

// getDiskTracker looks up the disk I/O DeltaTracker that belongs to the given
// cache time, lazily creating and registering one when none exists yet.
// isRead selects the read-tracker map; otherwise the write-tracker map is used.
func (dm *dockerManager) getDiskTracker(cacheTimeMs uint16, isRead bool) *deltatracker.DeltaTracker[string, uint64] {
	// Default to the write trackers and switch to the read trackers on request.
	byCacheTime := dm.diskWriteTrackers
	if isRead {
		byCacheTime = dm.diskReadTrackers
	}

	// Reuse the existing tracker for this cache time when there is one.
	if existing := byCacheTime[cacheTimeMs]; existing != nil {
		return existing
	}

	// First request for this cache time: create the tracker and remember it.
	created := deltatracker.NewDeltaTracker[string, uint64]()
	byCacheTime[cacheTimeMs] = created
	return created
}

// cycleDiskDeltasForCacheTime calls Cycle on the disk read tracker and then on
// the disk write tracker registered for cacheTimeMs. A tracker that has not
// been created for that cache time is skipped.
func (dm *dockerManager) cycleDiskDeltasForCacheTime(cacheTimeMs uint16) {
	if readTracker := dm.diskReadTrackers[cacheTimeMs]; readTracker != nil {
		readTracker.Cycle()
	}
	if writeTracker := dm.diskWriteTrackers[cacheTimeMs]; writeTracker != nil {
		writeTracker.Cycle()
	}
}

// calculateNetworkStats calculates network sent/receive deltas using DeltaTracker
func (dm *dockerManager) calculateNetworkStats(ctr *container.ApiInfo, apiStats *container.ApiStats, stats *container.Stats, initialized bool, name string, cacheTimeMs uint16) (uint64, uint64) {
var total_sent, total_recv uint64
Expand Down Expand Up @@ -284,6 +328,50 @@ func (dm *dockerManager) calculateNetworkStats(ctr *container.ApiInfo, apiStats
return sent_delta, recv_delta
}

// calculateDiskStats sums the container's block I/O byte counters, feeds the
// totals into the per-cache-time DeltaTrackers and returns the read and write
// rates in bytes per second.
//
// Both rates are zero when initialized is false (no previous sample for the
// container) or when no whole millisecond has elapsed since stats.PrevReadTime.
// As a side effect the cumulative totals are written to stats.PrevDisk.
func (dm *dockerManager) calculateDiskStats(ctr *container.ApiInfo, apiStats *container.ApiStats, stats *container.Stats, initialized bool, cacheTimeMs uint16) (uint64, uint64) {
	// Accumulate cumulative bytes; both spellings of each operation name are accepted.
	var totalRead, totalWrite uint64
	for _, entry := range apiStats.BlkioStats.IoServiceBytesRecursive {
		if op := entry.Op; op == diskOpRead || op == diskOpReadCap {
			totalRead += entry.Value
		} else if op == diskOpWrite || op == diskOpWriteCap {
			totalWrite += entry.Value
		}
	}

	id := ctr.IdShort

	// Record the read total in this cache time's tracker and fetch the bytes since the last measurement.
	readTracker := dm.getDiskTracker(cacheTimeMs, true)
	readTracker.Set(id, totalRead)
	readBytes := readTracker.Delta(id)

	// Same for the write total.
	writeTracker := dm.getDiskTracker(cacheTimeMs, false)
	writeTracker.Set(id, totalWrite)
	writeBytes := writeTracker.Delta(id)

	// Store current disk values for legacy compatibility
	stats.PrevDisk.Read = totalRead
	stats.PrevDisk.Write = totalWrite

	// A rate needs a previous sample to measure against.
	if !initialized {
		return 0, 0
	}
	elapsedMs := uint64(time.Since(stats.PrevReadTime).Milliseconds())
	if elapsedMs == 0 {
		return 0, 0
	}

	// Scale the byte deltas from "per elapsed milliseconds" to "per second".
	return readBytes * 1000 / elapsedMs, writeBytes * 1000 / elapsedMs
}

// validateCpuPercentage checks if CPU percentage is within valid range
func validateCpuPercentage(cpuPct float64, containerName string) error {
if cpuPct > 100 {
Expand All @@ -293,11 +381,13 @@ func validateCpuPercentage(cpuPct float64, containerName string) error {
}

// updateContainerStatsValues updates the final stats values
func updateContainerStatsValues(stats *container.Stats, cpuPct float64, usedMemory uint64, sent_delta, recv_delta uint64, readTime time.Time) {
func updateContainerStatsValues(stats *container.Stats, cpuPct float64, usedMemory uint64, sent_delta, recv_delta, read_delta, write_delta uint64, readTime time.Time) {
	// CPU is passed through twoDecimals; every byte quantity goes through bytesToMegabytes.
	stats.Cpu, stats.Mem = twoDecimals(cpuPct), bytesToMegabytes(float64(usedMemory))

	// Network deltas
	stats.NetworkSent, stats.NetworkRecv = bytesToMegabytes(float64(sent_delta)), bytesToMegabytes(float64(recv_delta))

	// Disk I/O deltas
	stats.DiskRead, stats.DiskWrite = bytesToMegabytes(float64(read_delta)), bytesToMegabytes(float64(write_delta))

	// Remember when this sample was read so the next update can measure elapsed time.
	stats.PrevReadTime = readTime
}

Expand All @@ -320,11 +410,64 @@ func (dm *dockerManager) updateContainerStats(ctr *container.ApiInfo, cacheTimeM
dm.containerStatsMap[ctr.IdShort] = stats
}

// Update name in case it changed
stats.Name = name

// Set container metadata
stats.IdShort = ctr.IdShort
stats.Status = ctr.State
if stats.Status == "" {
stats.Status = containerStateUnknown
}

// Set health status
stats.Health = healthStatusNone
if ctr.Health != "" {
stats.Health = ctr.Health
}

// Set Docker Compose project name
if ctr.Labels != nil {
if projectName, exists := ctr.Labels[composeProjectLabel]; exists {
stats.Project = projectName
}
}

// Calculate uptime for running containers
if ctr.StartedAt > 0 && stats.Status == containerStateRunning {
startedTime := time.Unix(ctr.StartedAt, 0)
stats.Uptime = twoDecimals(time.Since(startedTime).Seconds())
} else {
stats.Uptime = 0
}

// Collect volume information and fetch sizes
volumeCount := 0
for _, mount := range ctr.Mounts {
if mount.Type == volumeTypeVolume && mount.Name != "" {
volumeCount++
}
}
if volumeCount > 0 {
stats.Volumes = make(map[string]float64, volumeCount)
for _, mount := range ctr.Mounts {
if mount.Type == volumeTypeVolume && mount.Name != "" {
// Fetch volume size using Docker system df API
size := dm.getVolumeSize(mount.Name)
stats.Volumes[mount.Name] = size
}
}
} else {
stats.Volumes = nil
}

// reset current stats
stats.Cpu = 0
stats.Mem = 0
stats.NetworkSent = 0
stats.NetworkRecv = 0
stats.DiskRead = 0
stats.DiskWrite = 0

res := dm.apiStats
res.Networks = nil
Expand Down Expand Up @@ -374,8 +517,11 @@ func (dm *dockerManager) updateContainerStats(ctr *container.ApiInfo, cacheTimeM
}
stats.PrevNet.Sent, stats.PrevNet.Recv = total_sent, total_recv

// Calculate disk I/O stats using DeltaTracker
read_delta, write_delta := dm.calculateDiskStats(ctr, res, stats, initialized, cacheTimeMs)

// Update final stats values
updateContainerStatsValues(stats, cpuPct, usedMemory, sent_delta, recv_delta, res.Read)
updateContainerStatsValues(stats, cpuPct, usedMemory, sent_delta, recv_delta, read_delta, write_delta, res.Read)
// store per-cache-time read time for Windows CPU percent calc
dm.lastCpuReadTime[cacheTimeMs][ctr.IdShort] = res.Read

Expand Down Expand Up @@ -460,13 +606,16 @@ func newDockerManager(a *Agent) *dockerManager {
sem: make(chan struct{}, 5),
apiContainerList: []*container.ApiInfo{},
apiStats: &container.ApiStats{},
volumeSizeCache: make(map[string]float64),

// Initialize cache-time-aware tracking structures
lastCpuContainer: make(map[uint16]map[string]uint64),
lastCpuSystem: make(map[uint16]map[string]uint64),
lastCpuReadTime: make(map[uint16]map[string]time.Time),
networkSentTrackers: make(map[uint16]*deltatracker.DeltaTracker[string, uint64]),
networkRecvTrackers: make(map[uint16]*deltatracker.DeltaTracker[string, uint64]),
diskReadTrackers: make(map[uint16]*deltatracker.DeltaTracker[string, uint64]),
diskWriteTrackers: make(map[uint16]*deltatracker.DeltaTracker[string, uint64]),
}

// If using podman, return client
Expand Down Expand Up @@ -521,6 +670,49 @@ func (dm *dockerManager) checkDockerVersion() {
}
}

// getVolumeSize looks up a Docker volume's size in the in-memory cache.
// When the cache timestamp is more than 5 minutes old, refreshVolumeSizes is
// invoked first. The result is the cached size in MB (megabytes), or 0 when
// the cache holds no entry for volumeName.
func (dm *dockerManager) getVolumeSize(volumeName string) float64 {
	const maxCacheAge = 5 * time.Minute

	cacheExpired := time.Since(dm.volumeSizeUpdated) > maxCacheAge
	if cacheExpired {
		dm.refreshVolumeSizes()
	}

	sizeMB := dm.volumeSizeCache[volumeName]
	return sizeMB
}

// refreshVolumeSizes fetches all volume sizes from the Docker system df
// endpoint and rebuilds the cache (volume name -> size in MB, where 1 MB is
// 1,000,000 bytes).
//
// The attempt time is recorded in volumeSizeUpdated before any I/O, so a failed
// or slow request is not repeated on every volume lookup; the next attempt
// happens once the caller's cache window has elapsed again. On any failure
// (transport error, non-200 status, undecodable body) the previous cache
// contents are kept unchanged.
//
// On success the cache is replaced wholesale, which drops entries for volumes
// that no longer exist and also works when the cache map was never allocated.
// Volumes whose reported size is negative are left out, so looking them up
// yields 0 instead of a negative size.
func (dm *dockerManager) refreshVolumeSizes() {
	type volumeInfo struct {
		Name      string
		UsageData struct {
			Size int64
		}
	}
	type systemDfResponse struct {
		Volumes []volumeInfo
	}

	// Stamp the attempt up front so failures below do not cause a retry storm.
	dm.volumeSizeUpdated = time.Now()

	resp, err := dm.client.Get("http://localhost/system/df")
	if err != nil {
		return
	}
	if resp.StatusCode != http.StatusOK {
		// An error payload must not be decoded as volume data.
		resp.Body.Close()
		return
	}

	// NOTE(review): dm.decode is expected to consume and close the response body — confirm.
	var dfData systemDfResponse
	if err := dm.decode(resp, &dfData); err != nil {
		return
	}

	// Build a fresh map so stale volumes disappear from the cache.
	sizes := make(map[string]float64, len(dfData.Volumes))
	for _, vol := range dfData.Volumes {
		// The Docker API uses -1 for "size not available".
		if vol.UsageData.Size < 0 {
			continue
		}
		// Convert bytes to MB (megabytes)
		sizes[vol.Name] = float64(vol.UsageData.Size) / 1_000_000
	}
	dm.volumeSizeCache = sizes
}

// Decodes Docker API JSON response using a reusable buffer and decoder. Not thread safe.
func (dm *dockerManager) decode(resp *http.Response, d any) error {
if dm.buf == nil {
Expand Down
Loading