Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 25 additions & 4 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type Agent struct {
systemInfo system.Info // Host system info
gpuManager *GPUManager // Manages GPU data
cache *systemDataCache // Cache for system stats based on cache time
offlineCache *systemOfflineCache // Cache for offline data storage
connectionManager *ConnectionManager // Channel to signal connection events
handlerRegistry *HandlerRegistry // Registry for routing incoming messages
server *ssh.Server // SSH server
Expand All @@ -53,8 +54,9 @@ type Agent struct {
// If the data directory is not set, it will attempt to find the optimal directory.
func NewAgent(dataDir ...string) (agent *Agent, err error) {
agent = &Agent{
fsStats: make(map[string]*system.FsStats),
cache: NewSystemDataCache(),
fsStats: make(map[string]*system.FsStats),
cache: NewSystemDataCache(),
offlineCache: NewSystemOfflineCache(),
}

// Initialize disk I/O previous counters storage
Expand Down Expand Up @@ -136,6 +138,11 @@ func NewAgent(dataDir ...string) (agent *Agent, err error) {
slog.Debug("Stats", "data", agent.gatherStats(0))
}

// start offline cache filler
if val, exists := GetEnv("OFFLINE_CACHING"); exists && val == "true" {
go agent.fillOfflineCache()
}

return agent, nil
}

Expand All @@ -158,9 +165,11 @@ func (a *Agent) gatherStats(cacheTimeMs uint16) *system.CombinedData {
return data
}

now := time.Now()
*data = system.CombinedData{
Stats: a.getSystemStats(cacheTimeMs),
Info: a.systemInfo,
Stats: a.getSystemStats(cacheTimeMs),
Info: a.systemInfo,
Timestamp: &now,
}
// slog.Info("System data", "data", data, "cacheTimeMs", cacheTimeMs)

Expand Down Expand Up @@ -242,3 +251,15 @@ func (a *Agent) getFingerprint() string {

return fingerprint
}

// fillOfflineCache wakes up once per minute and never returns on its own.
// On each tick it inspects the connection manager's state; only while that
// state is Disconnected does it call gatherStats(0) and store a copy of the
// returned data in the offline cache.
func (a *Agent) fillOfflineCache() {
	minuteTick := time.NewTicker(time.Minute)
	defer minuteTick.Stop()

	for {
		<-minuteTick.C

		// Nothing to buffer unless the agent is currently disconnected.
		if a.connectionManager.State != Disconnected {
			continue
		}

		snapshot := a.gatherStats(0)
		a.offlineCache.Add(*snapshot)
	}
}
37 changes: 37 additions & 0 deletions agent/agent_offline_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package agent

import (
"sync"

"github.com/henrygd/beszel/internal/entities/system"
)

// systemOfflineCache holds combined data snapshots in a slice. The embedded
// mutex is locked by GetAll and Add while they read or modify the slice.
type systemOfflineCache struct {
sync.Mutex
// cache is appended to by Add and emptied by GetAll.
cache []*system.CombinedData
}

// NewSystemOfflineCache builds an offline cache whose snapshot list starts
// out empty (non-nil, zero length).
func NewSystemOfflineCache() *systemOfflineCache {
	offline := new(systemOfflineCache)
	offline.cache = []*system.CombinedData{}
	return offline
}

// GetAll returns every snapshot currently held in the cache, in insertion
// order, and leaves the cache empty. It returns nil when nothing is cached.
//
// Ownership of the backing array is handed to the caller rather than
// truncating it in place: re-slicing to [:0] would keep the drained pointers
// reachable through the old array, pinning those snapshots in memory until
// later Add calls happened to overwrite them.
func (c *systemOfflineCache) GetAll() (result []*system.CombinedData) {
	c.Lock()
	defer c.Unlock()

	if len(c.cache) == 0 {
		return nil
	}

	result = c.cache
	// Drop our reference entirely; Add can append to a nil slice.
	c.cache = nil

	return result
}

// Add stores a pointer to its own copy of data at the end of the cache.
// The append is performed while holding the cache's mutex. No upper bound
// on the number of stored snapshots is enforced here.
func (c *systemOfflineCache) Add(data system.CombinedData) {
	snapshot := &data

	c.Lock()
	c.cache = append(c.cache, snapshot)
	c.Unlock()
}
2 changes: 1 addition & 1 deletion agent/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ func (client *WebSocketClient) sendResponse(data any, requestID *uint32) error {

// Set the appropriate typed field based on data type
switch v := data.(type) {
case *system.CombinedData:
case []*system.CombinedData:
response.SystemData = v
case *common.FingerprintResponse:
response.Fingerprint = v
Expand Down
6 changes: 4 additions & 2 deletions agent/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,10 @@ func (h *GetDataHandler) Handle(hctx *HandlerContext) error {
var options common.DataRequestOptions
_ = cbor.Unmarshal(hctx.Request.Data, &options)

sysStats := hctx.Agent.gatherStats(options.CacheTimeMs)
return hctx.SendResponse(sysStats, hctx.RequestID)
data := hctx.Agent.offlineCache.GetAll()
data = append(data, hctx.Agent.gatherStats(options.CacheTimeMs))

return hctx.SendResponse(data, hctx.RequestID)
}

////////////////////////////////////////////////////////////////////////////
Expand Down
2 changes: 1 addition & 1 deletion agent/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (a *Agent) handleSSHRequest(w io.Writer, req *common.HubRequest[cbor.RawMes
sshResponder := func(data any, requestID *uint32) error {
response := common.AgentResponse{Id: requestID}
switch v := data.(type) {
case *system.CombinedData:
case []*system.CombinedData:
response.SystemData = v
case string:
response.String = &v
Expand Down
16 changes: 8 additions & 8 deletions internal/alerts/alerts_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,29 +108,29 @@ func (am *AlertManager) HandleSystemAlerts(systemRecord *core.Record, data *syst
}

systemStats := []struct {
Stats []byte `db:"stats"`
Created types.DateTime `db:"created"`
Stats []byte `db:"stats"`
Timestamp types.DateTime `db:"timestamp"`
}{}

err = am.hub.DB().
Select("stats", "created").
Select("stats", "timestamp").
From("system_stats").
Where(dbx.NewExp(
"system={:system} AND type='1m' AND created > {:created}",
"system={:system} AND type='1m' AND timestamp > {:timestamp}",
dbx.Params{
"system": systemRecord.Id,
// subtract some time to give us a bit of buffer
"created": oldestTime.Add(-time.Second * 90),
"timestamp": oldestTime.Add(-time.Second * 90),
},
)).
OrderBy("created").
OrderBy("timestamp").
All(&systemStats)
if err != nil || len(systemStats) == 0 {
return err
}

// get oldest record creation time from first record in the slice
oldestRecordTime := systemStats[0].Created.Time()
oldestRecordTime := systemStats[0].Timestamp.Time()
// log.Println("oldestRecordTime", oldestRecordTime.String())

// Filter validAlerts to keep only those with time newer than oldestRecord
Expand All @@ -153,7 +153,7 @@ func (am *AlertManager) HandleSystemAlerts(systemRecord *core.Record, data *syst
for i := range systemStats {
stat := systemStats[i]
// subtract 10 seconds to give a small time buffer
systemStatsCreation := stat.Created.Time().Add(-time.Second * 10)
systemStatsCreation := stat.Timestamp.Time().Add(-time.Second * 10)
if err := json.Unmarshal(stat.Stats, &stats); err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion internal/common/common-ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type HubRequest[T any] struct {
// AgentResponse defines the structure for responses sent from agent to hub.
type AgentResponse struct {
Id *uint32 `cbor:"0,keyasint,omitempty"`
SystemData *system.CombinedData `cbor:"1,keyasint,omitempty,omitzero"`
SystemData []*system.CombinedData `cbor:"1,keyasint,omitempty,omitzero"`
Fingerprint *FingerprintResponse `cbor:"2,keyasint,omitempty,omitzero"`
Error string `cbor:"3,keyasint,omitempty,omitzero"`
String *string `cbor:"4,keyasint,omitempty,omitzero"`
Expand Down
1 change: 1 addition & 0 deletions internal/entities/system/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,4 +156,5 @@ type CombinedData struct {
Info Info `json:"info" cbor:"1,keyasint"`
Containers []*container.Stats `json:"container" cbor:"2,keyasint"`
SystemdServices []*systemd.Service `json:"systemd,omitempty" cbor:"3,keyasint,omitempty"`
Timestamp *time.Time `json:"ts,omitempty" cbor:"4,keyasint,omitempty"`
}
124 changes: 64 additions & 60 deletions internal/hub/systems/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,25 +29,24 @@ import (
)

type System struct {
Id string `db:"id"`
Host string `db:"host"`
Port string `db:"port"`
Status string `db:"status"`
manager *SystemManager // Manager that this system belongs to
client *ssh.Client // SSH client for fetching data
data *system.CombinedData // system data from agent
ctx context.Context // Context for stopping the updater
cancel context.CancelFunc // Stops and removes system from updater
WsConn *ws.WsConn // Handler for agent WebSocket connection
agentVersion semver.Version // Agent version
updateTicker *time.Ticker // Ticker for updating the system
smartOnce sync.Once // Once for fetching and saving smart devices
Id string `db:"id"`
Host string `db:"host"`
Port string `db:"port"`
Status string `db:"status"`
manager *SystemManager // Manager that this system belongs to
client *ssh.Client // SSH client for fetching data
data []*system.CombinedData // system data from agent
ctx context.Context // Context for stopping the updater
cancel context.CancelFunc // Stops and removes system from updater
WsConn *ws.WsConn // Handler for agent WebSocket connection
agentVersion semver.Version // Agent version
updateTicker *time.Ticker // Ticker for updating the system
smartOnce sync.Once // Once for fetching and saving smart devices
}

func (sm *SystemManager) NewSystem(systemId string) *System {
system := &System{
Id: systemId,
data: &system.CombinedData{},
Id: systemId,
}
system.ctx, system.cancel = system.getContext()
return system
Expand Down Expand Up @@ -135,58 +134,66 @@ func (sys *System) handlePaused() {
}

// createRecords updates the system record and adds system_stats and container_stats records
func (sys *System) createRecords(data *system.CombinedData) (*core.Record, error) {
func (sys *System) createRecords(data []*system.CombinedData) (*core.Record, error) {
if len(data) == 0 {
return nil, errors.New("no data to create records")
}

systemRecord, err := sys.getRecord()
if err != nil {
return nil, err
}
hub := sys.manager.hub
err = hub.RunInTransaction(func(txApp core.App) error {
// add system_stats and container_stats records
systemStatsCollection, err := txApp.FindCachedCollectionByNameOrId("system_stats")
if err != nil {
return err
}

systemStatsRecord := core.NewRecord(systemStatsCollection)
systemStatsRecord.Set("system", systemRecord.Id)
systemStatsRecord.Set("stats", data.Stats)
systemStatsRecord.Set("type", "1m")
if err := txApp.SaveNoValidate(systemStatsRecord); err != nil {
return err
}
if len(data.Containers) > 0 {
// add / update containers records
if data.Containers[0].Id != "" {
if err := createContainerRecords(txApp, data.Containers, sys.Id); err != nil {
return err
}
}
// add new container_stats record
containerStatsCollection, err := txApp.FindCachedCollectionByNameOrId("container_stats")
err = hub.RunInTransaction(func(txApp core.App) error {
for _, d := range data { // add system_stats and container_stats records
systemStatsCollection, err := txApp.FindCachedCollectionByNameOrId("system_stats")
if err != nil {
return err
}
containerStatsRecord := core.NewRecord(containerStatsCollection)
containerStatsRecord.Set("system", systemRecord.Id)
containerStatsRecord.Set("stats", data.Containers)
containerStatsRecord.Set("type", "1m")
if err := txApp.SaveNoValidate(containerStatsRecord); err != nil {

systemStatsRecord := core.NewRecord(systemStatsCollection)
systemStatsRecord.Set("timestamp", d.Timestamp)
systemStatsRecord.Set("system", systemRecord.Id)
systemStatsRecord.Set("stats", d.Stats)
systemStatsRecord.Set("type", "1m")
if err := txApp.SaveNoValidate(systemStatsRecord); err != nil {
return err
}
}
if len(d.Containers) > 0 {
// add / update containers records
if d.Containers[0].Id != "" {
if err := createContainerRecords(txApp, d.Containers, sys.Id); err != nil {
return err
}
}
// add new container_stats record
containerStatsCollection, err := txApp.FindCachedCollectionByNameOrId("container_stats")
if err != nil {
return err
}
containerStatsRecord := core.NewRecord(containerStatsCollection)
containerStatsRecord.Set("timestamp", d.Timestamp)
containerStatsRecord.Set("system", systemRecord.Id)
containerStatsRecord.Set("stats", d.Containers)
containerStatsRecord.Set("type", "1m")
if err := txApp.SaveNoValidate(containerStatsRecord); err != nil {
return err
}
}

// add new systemd_stats record
if len(data.SystemdServices) > 0 {
if err := createSystemdStatsRecords(txApp, data.SystemdServices, sys.Id); err != nil {
return err
// add new systemd_stats record
if len(d.SystemdServices) > 0 {
if err := createSystemdStatsRecords(txApp, d.SystemdServices, sys.Id); err != nil {
return err
}
}
}

// update system record (do this last because it triggers alerts and we need above records to be inserted first)
systemRecord.Set("status", up)

systemRecord.Set("info", data.Info)
systemRecord.Set("info", data[0].Info)
if err := txApp.SaveNoValidate(systemRecord); err != nil {
return err
}
Expand Down Expand Up @@ -303,11 +310,7 @@ func (sys *System) getContext() (context.Context, context.CancelFunc) {

// fetchDataFromAgent attempts to fetch data from the agent,
// prioritizing WebSocket if available.
func (sys *System) fetchDataFromAgent(options common.DataRequestOptions) (*system.CombinedData, error) {
if sys.data == nil {
sys.data = &system.CombinedData{}
}

func (sys *System) fetchDataFromAgent(options common.DataRequestOptions) ([]*system.CombinedData, error) {
if sys.WsConn != nil && sys.WsConn.IsConnected() {
wsData, err := sys.fetchDataViaWebSocket(options)
if err == nil {
Expand All @@ -324,11 +327,11 @@ func (sys *System) fetchDataFromAgent(options common.DataRequestOptions) (*syste
return sshData, nil
}

func (sys *System) fetchDataViaWebSocket(options common.DataRequestOptions) (*system.CombinedData, error) {
func (sys *System) fetchDataViaWebSocket(options common.DataRequestOptions) ([]*system.CombinedData, error) {
if sys.WsConn == nil || !sys.WsConn.IsConnected() {
return nil, errors.New("no websocket connection")
}
err := sys.WsConn.RequestSystemData(context.Background(), sys.data, options)
err := sys.WsConn.RequestSystemData(context.Background(), &sys.data, options)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -448,7 +451,7 @@ func makeStableHashId(strings ...string) string {
// fetchDataViaSSH handles fetching data using SSH.
// This function encapsulates the original SSH logic.
// It updates sys.data directly upon successful fetch.
func (sys *System) fetchDataViaSSH(options common.DataRequestOptions) (*system.CombinedData, error) {
func (sys *System) fetchDataViaSSH(options common.DataRequestOptions) ([]*system.CombinedData, error) {
err := sys.runSSHOperation(4*time.Second, 1, func(session *ssh.Session) (bool, error) {
stdout, err := session.StdoutPipe()
if err != nil {
Expand All @@ -459,16 +462,17 @@ func (sys *System) fetchDataViaSSH(options common.DataRequestOptions) (*system.C
return false, err
}

*sys.data = system.CombinedData{}

sys.data = sys.data[:0]
if sys.agentVersion.GTE(beszel.MinVersionAgentResponse) && stdinErr == nil {
req := common.HubRequest[any]{Action: common.GetData, Data: options}
_ = cbor.NewEncoder(stdin).Encode(req)
_ = stdin.Close()

var resp common.AgentResponse
if decErr := cbor.NewDecoder(stdout).Decode(&resp); decErr == nil && resp.SystemData != nil {
*sys.data = *resp.SystemData
if resp.SystemData != nil {
sys.data = append(sys.data, resp.SystemData...)
}
if err := session.Wait(); err != nil {
return false, err
}
Expand Down
Loading