Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions beszel/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/containrrr/shoutrrr v0.8.0
github.com/gliderlabs/ssh v0.3.8
github.com/goccy/go-json v0.10.5
github.com/luthermonson/go-proxmox v0.2.1
github.com/pocketbase/dbx v1.11.0
github.com/pocketbase/pocketbase v0.25.0
github.com/rhysd/go-github-selfupdate v1.2.3
Expand Down Expand Up @@ -41,8 +42,10 @@ require (
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.14 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.33.14 // indirect
github.com/aws/smithy-go v1.22.2 // indirect
github.com/buger/goterm v1.0.4 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/disintegration/imaging v1.6.2 // indirect
github.com/diskfs/go-diskfs v1.2.0 // indirect
github.com/domodwyer/mailyak/v3 v3.6.2 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/ebitengine/purego v0.8.2 // indirect
Expand All @@ -57,9 +60,12 @@ require (
github.com/google/go-querystring v1.1.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/googleapis/gax-go/v2 v2.14.1 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/inconshreveable/go-update v0.0.0-20160112193335-8152e7eb6ccf // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jinzhu/copier v0.3.4 // indirect
github.com/lufia/plan9stats v0.0.0-20240909124753-873cd0166683 // indirect
github.com/magefile/mage v1.14.0 // indirect
github.com/mattn/go-colorable v0.1.14 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/ncruces/go-strftime v0.1.9 // indirect
Expand All @@ -86,6 +92,7 @@ require (
google.golang.org/genproto/googleapis/rpc v0.0.0-20250204164813-702378808489 // indirect
google.golang.org/grpc v1.70.0 // indirect
google.golang.org/protobuf v1.36.4 // indirect
gopkg.in/djherbis/times.v1 v1.2.0 // indirect
modernc.org/libc v1.55.3 // indirect
modernc.org/mathutil v1.7.1 // indirect
modernc.org/memory v1.8.2 // indirect
Expand Down
49 changes: 49 additions & 0 deletions beszel/go.sum

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions beszel/internal/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type Agent struct {
sensorsWhitelist map[string]struct{} // List of sensors to monitor
systemInfo system.Info // Host system info
gpuManager *GPUManager // Manages GPU data
pveManager *pveManager // Manages Proxmox API requests
}

func NewAgent() *Agent {
Expand Down Expand Up @@ -75,6 +76,7 @@ func NewAgent() *Agent {
agent.initializeDiskInfo()
agent.initializeNetIoStats()
agent.dockerManager = newDockerManager(agent)
agent.pveManager = newPVEManager(agent)

// initialize GPU manager
if gm, err := NewGPUManager(); err != nil {
Expand Down Expand Up @@ -116,6 +118,12 @@ func (a *Agent) gatherStats() system.CombinedData {
} else {
slog.Debug("Error getting docker stats", "err", err)
}
// add pve stats
if pveStats, err := a.pveManager.getPVEStats(); err == nil {
systemData.PveContainers = pveStats
} else {
slog.Error("Error getting pve stats", "err", err)
}
// add extra filesystems
systemData.Stats.ExtraFs = make(map[string]*system.FsStats)
for name, stats := range a.fsStats {
Expand Down
136 changes: 136 additions & 0 deletions beszel/internal/agent/pve.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package agent

import (
"beszel/internal/entities/container"
"context"
"crypto/tls"
"errors"
"fmt"
"net/http"
"time"

"github.com/luthermonson/go-proxmox"
)

type pveManager struct {
client *proxmox.Client // Client to query PVE API
nodeName string // Cluster node name
cpuCount int // CPU count on node
containerStatsMap map[string]*container.Stats // Keeps track of container stats
}

// Returns stats for all running containers
func (pm *pveManager) getPVEStats() ([]*container.Stats, error) {
if pm.client == nil {
return nil, errors.New("PVE client not configured")
}
cluster, err := pm.client.Cluster(context.Background())
if err != nil {
return nil, err
}
resources, err := cluster.Resources(context.Background(), "vm")
if err != nil {
return nil, err
}

containersLength := len(resources)

var containerIds = make(map[string]struct{}, containersLength)

// only include vms and lxcs on selected node
for _, resource := range resources {
if resource.Node == pm.nodeName {
containerIds[resource.ID] = struct{}{}
}
}
// remove invalid container stats
for id := range pm.containerStatsMap {
if _, exists := containerIds[id]; !exists {
delete(pm.containerStatsMap, id)
}
}

// populate stats
stats := make([]*container.Stats, 0, len(containerIds))
for _, resource := range resources {
if _, exists := containerIds[resource.ID]; !exists {
continue
}
resourceStats, initialized := pm.containerStatsMap[resource.ID]
if !initialized {
resourceStats = &container.Stats{}
pm.containerStatsMap[resource.ID] = resourceStats
}
// reset current stats
resourceStats.Name = fmt.Sprintf("%s (%s)", resource.Name, resource.Type)
resourceStats.Cpu = 0
resourceStats.Mem = 0
resourceStats.NetworkSent = 0
resourceStats.NetworkRecv = 0
// prevent first run from sending all prev sent/recv bytes
total_sent := uint64(resource.NetOut)
total_recv := uint64(resource.NetIn)
var sent_delta, recv_delta float64
if initialized {
secondsElapsed := time.Since(resourceStats.PrevNet.Time).Seconds()
sent_delta = float64(total_sent-resourceStats.PrevNet.Sent) / secondsElapsed
recv_delta = float64(total_recv-resourceStats.PrevNet.Recv) / secondsElapsed
}
resourceStats.PrevNet.Sent = total_sent
resourceStats.PrevNet.Recv = total_recv
resourceStats.PrevNet.Time = time.Now()

resourceStats.Cpu = twoDecimals(100.0 * resource.CPU * float64(resource.MaxCPU) / float64(pm.cpuCount))
resourceStats.Mem = bytesToMegabytes(float64(resource.Mem))
resourceStats.NetworkSent = bytesToMegabytes(sent_delta)
resourceStats.NetworkRecv = bytesToMegabytes(recv_delta)

stats = append(stats, resourceStats)
}

return stats, nil
}

// Creates a new PVE client
func newPVEManager(_ *Agent) *pveManager {
url, exists := GetEnv("PROXMOX_URL")
if !exists {
url = "https://localhost:8006/api2/json"
}
nodeName, nodeNameExists := GetEnv("PROXMOX_NODE")
tokenID, tokenIDExists := GetEnv("PROXMOX_TOKENID")
secret, secretExists := GetEnv("PROXMOX_SECRET")
var client *proxmox.Client
if nodeNameExists && tokenIDExists && secretExists {
insecureHTTPClient := http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
},
}
client = proxmox.NewClient(url,
proxmox.WithHTTPClient(&insecureHTTPClient),
proxmox.WithAPIToken(tokenID, secret),
)
} else {
client = nil
}

pveManager := &pveManager{
client: client,
nodeName: nodeName,
containerStatsMap: make(map[string]*container.Stats),
}
// Retrieve node cpu count
if client != nil {
node, err := client.Node(context.Background(), nodeName)
if err != nil {
pveManager.client = nil
} else {
pveManager.cpuCount = node.CPUInfo.CPUs
}
}

return pveManager
}
7 changes: 4 additions & 3 deletions beszel/internal/entities/system/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ type Info struct {

// Final data structure to return to the hub
type CombinedData struct {
Stats Stats `json:"stats"`
Info Info `json:"info"`
Containers []*container.Stats `json:"container"`
Stats Stats `json:"stats"`
Info Info `json:"info"`
Containers []*container.Stats `json:"container"`
PveContainers []*container.Stats `json:"pve_container"`
}
48 changes: 33 additions & 15 deletions beszel/internal/hub/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,15 @@ import (

type Hub struct {
*pocketbase.PocketBase
sshClientConfig *ssh.ClientConfig
pubKey string
am *alerts.AlertManager
um *users.UserManager
rm *records.RecordManager
systemStats *core.Collection
containerStats *core.Collection
appURL string
sshClientConfig *ssh.ClientConfig
pubKey string
am *alerts.AlertManager
um *users.UserManager
rm *records.RecordManager
systemStats *core.Collection
containerStats *core.Collection
pveContainerStats *core.Collection
appURL string
}

// NewHub creates a new Hub instance with default configuration
Expand Down Expand Up @@ -184,8 +185,8 @@ func (h *Hub) Run() {
h.Cron().MustAdd("delete old records", "8 * * * *", h.rm.DeleteOldRecords)
// create longer records every 10 minutes
h.Cron().MustAdd("create longer records", "*/10 * * * *", func() {
if systemStats, containerStats, err := h.getCollections(); err == nil {
h.rm.CreateLongerRecords([]*core.Collection{systemStats, containerStats})
if systemStats, containerStats, pveContainerStats, err := h.getCollections(); err == nil {
h.rm.CreateLongerRecords([]*core.Collection{systemStats, containerStats, pveContainerStats})
}
})
return se.Next()
Expand Down Expand Up @@ -351,7 +352,7 @@ func (h *Hub) updateSystem(record *core.Record) {
h.Logger().Error("Failed to update record: ", "err", err.Error())
}
// add system_stats and container_stats records
if systemStats, containerStats, err := h.getCollections(); err != nil {
if systemStats, containerStats, pveContainerStats, err := h.getCollections(); err != nil {
h.Logger().Error("Failed to get collections: ", "err", err.Error())
} else {
// add new system_stats record
Expand All @@ -372,6 +373,16 @@ func (h *Hub) updateSystem(record *core.Record) {
h.Logger().Error("Failed to save record: ", "err", err.Error())
}
}
// add new pve_container_stats record
if len(systemData.PveContainers) > 0 {
containerStatsRecord := core.NewRecord(pveContainerStats)
containerStatsRecord.Set("system", record.Id)
containerStatsRecord.Set("stats", systemData.PveContainers)
containerStatsRecord.Set("type", "1m")
if err := h.SaveNoValidate(containerStatsRecord); err != nil {
h.Logger().Error("Failed to save record: ", "err", err.Error())
}
}
}

// system info alerts
Expand All @@ -381,22 +392,29 @@ func (h *Hub) updateSystem(record *core.Record) {
}

// return system_stats and container_stats collections
func (h *Hub) getCollections() (*core.Collection, *core.Collection, error) {
func (h *Hub) getCollections() (*core.Collection, *core.Collection, *core.Collection, error) {
if h.systemStats == nil {
systemStats, err := h.FindCollectionByNameOrId("system_stats")
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
h.systemStats = systemStats
}
if h.containerStats == nil {
containerStats, err := h.FindCollectionByNameOrId("container_stats")
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
h.containerStats = containerStats
}
return h.systemStats, h.containerStats, nil
if h.pveContainerStats == nil {
pveContainerStats, err := h.FindCollectionByNameOrId("pve_container_stats")
if err != nil {
return nil, nil, nil, err
}
h.pveContainerStats = pveContainerStats
}
return h.systemStats, h.containerStats, h.pveContainerStats, nil
}

// set system to specified status and save record
Expand Down
4 changes: 3 additions & 1 deletion beszel/internal/records/records.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ func (rm *RecordManager) CreateLongerRecords(collections []*core.Collection) {
longerRecord.Set("stats", rm.AverageSystemStats(stats))
case "container_stats":
longerRecord.Set("stats", rm.AverageContainerStats(stats))
case "pve_container_stats":
longerRecord.Set("stats", rm.AverageContainerStats(stats))
}
if err := txApp.SaveNoValidate(longerRecord); err != nil {
log.Println("failed to save longer record", "err", err.Error())
Expand Down Expand Up @@ -330,7 +332,7 @@ func (rm *RecordManager) AverageContainerStats(records RecordStats) []container.

// Deletes records older than what is displayed in the UI
func (rm *RecordManager) DeleteOldRecords() {
collections := []string{"system_stats", "container_stats"}
collections := []string{"system_stats", "container_stats", "pve_container_stats"}
recordData := []RecordDeletionData{
{
recordType: "1m",
Expand Down
Loading