Skip to content
Merged
2 changes: 1 addition & 1 deletion internal/pkg/agent/application/upgrade/rollback.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func Rollback(ctx context.Context, log *logger.Logger, c client.Client, topDirPa
}

// revert active commit
if err := UpdateActiveCommit(log, topDirPath, prevHash); err != nil {
if err := UpdateActiveCommit(log, topDirPath, prevHash, os.WriteFile); err != nil {
return err
}

Expand Down
2 changes: 2 additions & 0 deletions internal/pkg/agent/application/upgrade/rollback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,8 @@ func createUpdateMarker(t *testing.T, log *logger.Logger, topDir, newAgentVersio
hash: oldAgentHash,
versionedHome: oldAgentVersionedHome,
}

markUpgrade := markUpgradeProvider(UpdateActiveCommit, os.WriteFile)
err := markUpgrade(log,
paths.DataFrom(topDir),
newAgentInstall,
Expand Down
77 changes: 41 additions & 36 deletions internal/pkg/agent/application/upgrade/step_mark.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package upgrade

import (
"encoding/json"
goerrors "errors"
"fmt"
"os"
"path/filepath"
Expand Down Expand Up @@ -196,50 +197,54 @@ type agentInstall struct {
versionedHome string
}

// markUpgrade marks update happened so we can handle grace period
func markUpgrade(log *logger.Logger, dataDirPath string, agent, previousAgent agentInstall, action *fleetapi.ActionUpgrade, upgradeDetails *details.Details, desiredOutcome UpgradeOutcome) error {

if len(previousAgent.hash) > hashLen {
previousAgent.hash = previousAgent.hash[:hashLen]
}

marker := &UpdateMarker{
Version: agent.version,
Hash: agent.hash,
VersionedHome: agent.versionedHome,
UpdatedOn: time.Now(),
PrevVersion: previousAgent.version,
PrevHash: previousAgent.hash,
PrevVersionedHome: previousAgent.versionedHome,
Action: action,
Details: upgradeDetails,
DesiredOutcome: desiredOutcome,
}

markerBytes, err := yaml.Marshal(newMarkerSerializer(marker))
if err != nil {
return errors.New(err, errors.TypeConfig, "failed to parse marker file")
}
type updateActiveCommitFunc func(log *logger.Logger, topDirPath, hash string, writeFile writeFileFunc) error

markerPath := markerFilePath(dataDirPath)
log.Infow("Writing upgrade marker file", "file.path", markerPath, "hash", marker.Hash, "prev_hash", marker.PrevHash)
if err := os.WriteFile(markerPath, markerBytes, 0600); err != nil {
return errors.New(err, errors.TypeFilesystem, "failed to create update marker file", errors.M(errors.MetaKeyPath, markerPath))
}
// markUpgrade marks update happened so we can handle grace period
func markUpgradeProvider(updateActiveCommit updateActiveCommitFunc, writeFile writeFileFunc) markUpgradeFunc {
return func(log *logger.Logger, dataDirPath string, agent, previousAgent agentInstall, action *fleetapi.ActionUpgrade, upgradeDetails *details.Details, desiredOutcome UpgradeOutcome) error {

if len(previousAgent.hash) > hashLen {
previousAgent.hash = previousAgent.hash[:hashLen]
}

marker := &UpdateMarker{
Version: agent.version,
Hash: agent.hash,
VersionedHome: agent.versionedHome,
UpdatedOn: time.Now(),
PrevVersion: previousAgent.version,
PrevHash: previousAgent.hash,
PrevVersionedHome: previousAgent.versionedHome,
Action: action,
Details: upgradeDetails,
DesiredOutcome: desiredOutcome,
}

markerBytes, err := yaml.Marshal(newMarkerSerializer(marker))
if err != nil {
return errors.New(err, errors.TypeConfig, "failed to parse marker file")
}

markerPath := markerFilePath(dataDirPath)
log.Infow("Writing upgrade marker file", "file.path", markerPath, "hash", marker.Hash, "prev_hash", marker.PrevHash)
if err := writeFile(markerPath, markerBytes, 0600); err != nil {
return goerrors.Join(err, errors.New(errors.TypeFilesystem, "failed to create update marker file", errors.M(errors.MetaKeyPath, markerPath)))
}

if err := updateActiveCommit(log, paths.Top(), agent.hash, writeFile); err != nil {
return err
}

if err := UpdateActiveCommit(log, paths.Top(), agent.hash); err != nil {
return err
return nil
}

return nil
}

// UpdateActiveCommit updates active.commit file to point to active version.
func UpdateActiveCommit(log *logger.Logger, topDirPath, hash string) error {
func UpdateActiveCommit(log *logger.Logger, topDirPath, hash string, writeFile writeFileFunc) error {
activeCommitPath := filepath.Join(topDirPath, agentCommitFile)
log.Infow("Updating active commit", "file.path", activeCommitPath, "hash", hash)
if err := os.WriteFile(activeCommitPath, []byte(hash), 0600); err != nil {
return errors.New(err, errors.TypeFilesystem, "failed to update active commit", errors.M(errors.MetaKeyPath, activeCommitPath))
if err := writeFile(activeCommitPath, []byte(hash), 0600); err != nil {
return goerrors.Join(err, errors.New(errors.TypeFilesystem, "failed to update active commit", errors.M(errors.MetaKeyPath, activeCommitPath)))
}

return nil
Expand Down
97 changes: 97 additions & 0 deletions internal/pkg/agent/application/upgrade/step_mark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,19 @@
package upgrade

import (
"errors"
"os"
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi"
"github.com/elastic/elastic-agent/pkg/core/logger"
"github.com/elastic/elastic-agent/pkg/core/logger/loggertest"
)

func TestSaveAndLoadMarker_NoLoss(t *testing.T) {
Expand Down Expand Up @@ -260,3 +264,96 @@ desired_outcome: true
})
}
}

func TestMarkUpgrade(t *testing.T) {
log, _ := loggertest.New("test")
agent := agentInstall{
version: "8.5.0",
hash: "abc123",
versionedHome: "home/v8.5.0",
}
previousAgent := agentInstall{
version: "8.4.0",
hash: "xyz789",
versionedHome: "home/v8.4.0",
}
action := &fleetapi.ActionUpgrade{
ActionID: "action-123",
ActionType: "UPGRADE",
Data: fleetapi.ActionUpgradeData{
Version: "8.5.0",
SourceURI: "https://example.com/upgrade",
},
}
upgradeDetails := details.NewDetails("8.5.0", details.StateScheduled, "action-123")
desiredOutcome := OUTCOME_UPGRADE

testError := errors.New("test error")

type testCase struct {
fileName string
expectedError error
markUpgrade markUpgradeFunc
}

testCases := map[string]testCase{
"should return error if it fails updating the active commit file": {
fileName: "commit",
expectedError: testError,
markUpgrade: markUpgradeProvider(func(log *logger.Logger, topDirPath, hash string, writeFile writeFileFunc) error {
return testError
}, func(name string, data []byte, perm os.FileMode) error {
return nil
}),
},
"should return error if it fails writing to marker file": {
fileName: "marker",
expectedError: testError,
markUpgrade: markUpgradeProvider(func(log *logger.Logger, topDirPath, hash string, writeFile writeFileFunc) error {
return nil
}, func(name string, data []byte, perm os.FileMode) error {
return testError
}),
},
}

for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
baseDir := t.TempDir()
paths.SetTop(baseDir)

err := tc.markUpgrade(log, paths.Data(), agent, previousAgent, action, upgradeDetails, desiredOutcome)
require.Error(t, err)
require.ErrorIs(t, err, tc.expectedError)
})
}
}

func TestUpdateActiveCommit(t *testing.T) {
log, _ := loggertest.New("test")
testError := errors.New("test error")
testCases := map[string]struct {
expectedError error
writeFileFunc writeFileFunc
}{
"should return error if it fails writing to file": {
expectedError: testError,
writeFileFunc: func(name string, data []byte, perm os.FileMode) error {
return testError
},
},
"should not return error if it writes to file": {
expectedError: nil,
writeFileFunc: func(name string, data []byte, perm os.FileMode) error {
return nil
},
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
err := UpdateActiveCommit(log, paths.Top(), "hash", tc.writeFileFunc)
require.ErrorIs(t, err, tc.expectedError)
})
}

}
21 changes: 15 additions & 6 deletions internal/pkg/agent/application/upgrade/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ type unpackHandler interface {
type copyActionStoreFunc func(log *logger.Logger, newHome string) error
type copyRunDirectoryFunc func(log *logger.Logger, oldRunPath, newRunPath string) error
type fileDirCopyFunc func(from, to string, opts ...copy.Options) error
type markUpgradeFunc func(log *logger.Logger, dataDirPath string, agent, previousAgent agentInstall, action *fleetapi.ActionUpgrade, upgradeDetails *details.Details, desiredOutcome UpgradeOutcome) error
type changeSymlinkFunc func(log *logger.Logger, topDirPath, symlinkPath, newTarget string) error
type rollbackInstallFunc func(ctx context.Context, log *logger.Logger, topDirPath, versionedHome, oldVersionedHome string) error

// Types used to abstract stdlib functions
type mkdirAllFunc func(name string, perm fs.FileMode) error
Expand All @@ -104,6 +107,9 @@ type Upgrader struct {
extractAgentVersion func(metadata packageMetadata, upgradeVersion string) agentVersion
copyActionStore copyActionStoreFunc
copyRunDirectory copyRunDirectoryFunc
markUpgrade markUpgradeFunc
changeSymlink changeSymlinkFunc
rollbackInstall rollbackInstallFunc
}

// IsUpgradeable when agent is installed and running as a service or flag was provided.
Expand All @@ -127,6 +133,9 @@ func NewUpgrader(log *logger.Logger, settings *artifact.Config, agentInfo info.A
extractAgentVersion: extractAgentVersion,
copyActionStore: copyActionStoreProvider(os.ReadFile, os.WriteFile),
copyRunDirectory: copyRunDirectoryProvider(os.MkdirAll, copy.Copy),
markUpgrade: markUpgradeProvider(UpdateActiveCommit, os.WriteFile),
changeSymlink: changeSymlink,
rollbackInstall: rollbackInstall,
}, nil
}

Expand Down Expand Up @@ -360,9 +369,9 @@ func (u *Upgrader) Upgrade(ctx context.Context, version string, sourceURI string
return nil, fmt.Errorf("calculating home path relative to top, home: %q top: %q : %w", paths.Home(), paths.Top(), err)
}

if err := changeSymlink(u.log, paths.Top(), symlinkPath, newPath); err != nil {
if err := u.changeSymlink(u.log, paths.Top(), symlinkPath, newPath); err != nil {
u.log.Errorw("Rolling back: changing symlink failed", "error.message", err)
rollbackErr := rollbackInstall(ctx, u.log, paths.Top(), hashedDir, currentVersionedHome)
rollbackErr := u.rollbackInstall(ctx, u.log, paths.Top(), hashedDir, currentVersionedHome)
return nil, goerrors.Join(err, rollbackErr)
}

Expand All @@ -385,13 +394,13 @@ func (u *Upgrader) Upgrade(ctx context.Context, version string, sourceURI string
versionedHome: currentVersionedHome,
}

if err := markUpgrade(u.log,
if err := u.markUpgrade(u.log,
paths.Data(), // data dir to place the marker in
current, // new agent version data
previous, // old agent version data
action, det, OUTCOME_UPGRADE); err != nil {
u.log.Errorw("Rolling back: marking upgrade failed", "error.message", err)
rollbackErr := rollbackInstall(ctx, u.log, paths.Top(), hashedDir, currentVersionedHome)
rollbackErr := u.rollbackInstall(ctx, u.log, paths.Top(), hashedDir, currentVersionedHome)
return nil, goerrors.Join(err, rollbackErr)
}

Expand All @@ -400,14 +409,14 @@ func (u *Upgrader) Upgrade(ctx context.Context, version string, sourceURI string
var watcherCmd *exec.Cmd
if watcherCmd, err = InvokeWatcher(u.log, watcherExecutable); err != nil {
u.log.Errorw("Rolling back: starting watcher failed", "error.message", err)
rollbackErr := rollbackInstall(ctx, u.log, paths.Top(), hashedDir, currentVersionedHome)
rollbackErr := u.rollbackInstall(ctx, u.log, paths.Top(), hashedDir, currentVersionedHome)
return nil, goerrors.Join(err, rollbackErr)
}

watcherWaitErr := waitForWatcher(ctx, u.log, markerFilePath(paths.Data()), watcherMaxWaitTime)
if watcherWaitErr != nil {
killWatcherErr := watcherCmd.Process.Kill()
rollbackErr := rollbackInstall(ctx, u.log, paths.Top(), hashedDir, currentVersionedHome)
rollbackErr := u.rollbackInstall(ctx, u.log, paths.Top(), hashedDir, currentVersionedHome)
return nil, goerrors.Join(watcherWaitErr, killWatcherErr, rollbackErr)
}

Expand Down
75 changes: 75 additions & 0 deletions internal/pkg/agent/application/upgrade/upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1438,6 +1438,81 @@ func TestUpgradeErrorHandling(t *testing.T) {
}
},
},
"should return error if changeSymlink fails": {
isDiskSpaceErrorResult: false,
expectedError: testError,
upgraderMocker: func(upgrader *Upgrader) {
upgrader.artifactDownloader = &mockArtifactDownloader{}
upgrader.extractAgentVersion = func(metadata packageMetadata, upgradeVersion string) agentVersion {
return agentVersion{
version: upgradeVersion,
snapshot: false,
hash: metadata.hash,
}
}
upgrader.unpacker = &mockUnpacker{
returnPackageMetadata: packageMetadata{
manifest: &v1.PackageManifest{},
hash: "hash",
},
returnUnpackResult: UnpackResult{
Hash: "hash",
VersionedHome: "versionedHome",
},
}
upgrader.copyActionStore = func(log *logger.Logger, newHome string) error {
return nil
}
upgrader.copyRunDirectory = func(log *logger.Logger, oldRunPath, newRunPath string) error {
return nil
}
upgrader.rollbackInstall = func(ctx context.Context, log *logger.Logger, topDirPath, versionedHome, oldVersionedHome string) error {
return nil
}
upgrader.changeSymlink = func(log *logger.Logger, topDirPath, symlinkPath, newTarget string) error {
return testError
}
},
},
"should return error if markUpgrade fails": {
isDiskSpaceErrorResult: false,
expectedError: testError,
upgraderMocker: func(upgrader *Upgrader) {
upgrader.artifactDownloader = &mockArtifactDownloader{}
upgrader.extractAgentVersion = func(metadata packageMetadata, upgradeVersion string) agentVersion {
return agentVersion{
version: upgradeVersion,
snapshot: false,
hash: metadata.hash,
}
}
upgrader.unpacker = &mockUnpacker{
returnPackageMetadata: packageMetadata{
manifest: &v1.PackageManifest{},
hash: "hash",
},
returnUnpackResult: UnpackResult{
Hash: "hash",
VersionedHome: "versionedHome",
},
}
upgrader.copyActionStore = func(log *logger.Logger, newHome string) error {
return nil
}
upgrader.copyRunDirectory = func(log *logger.Logger, oldRunPath, newRunPath string) error {
return nil
}
upgrader.changeSymlink = func(log *logger.Logger, topDirPath, symlinkPath, newTarget string) error {
return nil
}
upgrader.rollbackInstall = func(ctx context.Context, log *logger.Logger, topDirPath, versionedHome, oldVersionedHome string) error {
return nil
}
upgrader.markUpgrade = func(log *logger.Logger, dataDirPath string, agent, previousAgent agentInstall, action *fleetapi.ActionUpgrade, upgradeDetails *details.Details, desiredOutcome UpgradeOutcome) error {
return testError
}
},
},
"should add disk space error to the error chain if downloadArtifact fails with disk space error": {
isDiskSpaceErrorResult: true,
expectedError: upgradeErrors.ErrInsufficientDiskSpace,
Expand Down
Loading