Skip to content

Commit

Permalink
[8.4](backport elastic#848) Reload downloader client on config change (
Browse files Browse the repository at this point in the history
…elastic#868)

[8.4](backport elastic#848) Reload downloader client on config change (elastic#868)
  • Loading branch information
michalpristas authored Aug 9, 2022
1 parent 76d4199 commit 785c6c7
Show file tree
Hide file tree
Showing 16 changed files with 613 additions and 33 deletions.
10 changes: 9 additions & 1 deletion internal/pkg/agent/application/local_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/elastic/elastic-agent/internal/pkg/agent/configuration"
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
"github.com/elastic/elastic-agent/internal/pkg/agent/operation"
"github.com/elastic/elastic-agent/internal/pkg/artifact"
"github.com/elastic/elastic-agent/internal/pkg/capabilities"
"github.com/elastic/elastic-agent/internal/pkg/composable"
"github.com/elastic/elastic-agent/internal/pkg/config"
Expand Down Expand Up @@ -118,6 +119,11 @@ func newLocal(
return nil, errors.New(err, "failed to initialize composable controller")
}

routerArtifactReloader, ok := router.(emitter.Reloader)
if !ok {
return nil, errors.New("router not capable of artifact reload") // Needed for client reloading
}

discover := discoverer(pathConfigFile, cfg.Settings.Path, externalConfigsGlob())
emit, err := emitter.New(
localApplication.bgContext,
Expand All @@ -131,6 +137,8 @@ func newLocal(
},
caps,
monitor,
artifact.NewReloader(cfg.Settings.DownloadConfig, log),
routerArtifactReloader,
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -203,7 +211,7 @@ func (l *Local) AgentInfo() *info.AgentInfo {
}

func discoverer(patterns ...string) discoverFunc {
var p []string
p := make([]string, 0, len(patterns))
for _, newP := range patterns {
if len(newP) == 0 {
continue
Expand Down
6 changes: 6 additions & 0 deletions internal/pkg/agent/application/managed_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,11 @@ func newManaged(
return nil, errors.New(err, "failed to initialize composable controller")
}

routerArtifactReloader, ok := router.(emitter.Reloader)
if !ok {
return nil, errors.New("router not capable of artifact reload") // Needed for client reloading
}

emit, err := emitter.New(
managedApplication.bgContext,
log,
Expand All @@ -159,6 +164,7 @@ func newManaged(
caps,
monitor,
artifact.NewReloader(cfg.Settings.DownloadConfig, log),
routerArtifactReloader,
)
if err != nil {
return nil, err
Expand Down
6 changes: 3 additions & 3 deletions internal/pkg/agent/application/pipeline/emitter/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"github.com/elastic/elastic-agent/pkg/core/logger"
)

type reloadable interface {
type Reloader interface {
Reload(cfg *config.Config) error
}

Expand All @@ -32,7 +32,7 @@ type Controller struct {
controller composable.Controller
router pipeline.Router
modifiers *pipeline.ConfigModifiers
reloadables []reloadable
reloadables []Reloader
caps capabilities.Capability

// state
Expand All @@ -51,7 +51,7 @@ func NewController(
router pipeline.Router,
modifiers *pipeline.ConfigModifiers,
caps capabilities.Capability,
reloadables ...reloadable,
reloadables ...Reloader,
) *Controller {
init, _ := transpiler.NewVars(map[string]interface{}{}, nil)

Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/agent/application/pipeline/emitter/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
)

// New creates a new emitter function.
func New(ctx context.Context, log *logger.Logger, agentInfo *info.AgentInfo, controller composable.Controller, router pipeline.Router, modifiers *pipeline.ConfigModifiers, caps capabilities.Capability, reloadables ...reloadable) (pipeline.EmitterFunc, error) {
func New(ctx context.Context, log *logger.Logger, agentInfo *info.AgentInfo, controller composable.Controller, router pipeline.Router, modifiers *pipeline.ConfigModifiers, caps capabilities.Capability, reloadables ...Reloader) (pipeline.EmitterFunc, error) {
log.Debugf("Supported programs: %s", strings.Join(program.KnownProgramNames(), ", "))

ctrl := NewController(log, agentInfo, controller, router, modifiers, caps, reloadables...)
Expand Down
23 changes: 23 additions & 0 deletions internal/pkg/agent/application/pipeline/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ import (
"time"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/pipeline"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/pipeline/emitter"
"github.com/elastic/elastic-agent/internal/pkg/agent/configrequest"
"github.com/elastic/elastic-agent/internal/pkg/agent/program"
"github.com/elastic/elastic-agent/internal/pkg/config"
"github.com/elastic/elastic-agent/internal/pkg/sorted"
"github.com/elastic/elastic-agent/pkg/core/logger"
)
Expand All @@ -35,6 +37,27 @@ func New(log *logger.Logger, factory pipeline.StreamFunc) (pipeline.Router, erro
return &router{log: log, streamFactory: factory, routes: sorted.NewSet()}, nil
}

func (r *router) Reload(c *config.Config) error {
keys := r.routes.Keys()
for _, key := range keys {
route, found := r.routes.Get(key)
if !found {
continue
}

routeReloader, ok := route.(emitter.Reloader)
if !ok {
continue
}

if err := routeReloader.Reload(c); err != nil {
return err
}
}

return nil
}

func (r *router) Routes() *sorted.Set {
return r.routes
}
Expand Down
11 changes: 11 additions & 0 deletions internal/pkg/agent/application/pipeline/stream/operator_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import (
"go.elastic.co/apm"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/pipeline"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/pipeline/emitter"
"github.com/elastic/elastic-agent/internal/pkg/agent/configrequest"
"github.com/elastic/elastic-agent/internal/pkg/agent/program"
"github.com/elastic/elastic-agent/internal/pkg/config"
"github.com/elastic/elastic-agent/internal/pkg/core/state"
"github.com/elastic/elastic-agent/pkg/core/logger"
)
Expand All @@ -29,6 +31,15 @@ type specer interface {
Specs() map[string]program.Spec
}

func (b *operatorStream) Reload(c *config.Config) error {
r, ok := b.configHandler.(emitter.Reloader)
if !ok {
return nil
}

return r.Reload(c)
}

func (b *operatorStream) Close() error {
return b.configHandler.Close()
}
Expand Down
49 changes: 45 additions & 4 deletions internal/pkg/agent/operation/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ import (
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
"github.com/elastic/elastic-agent/internal/pkg/agent/program"
"github.com/elastic/elastic-agent/internal/pkg/agent/stateresolver"
"github.com/elastic/elastic-agent/internal/pkg/artifact"
"github.com/elastic/elastic-agent/internal/pkg/artifact/download"
"github.com/elastic/elastic-agent/internal/pkg/artifact/install"
"github.com/elastic/elastic-agent/internal/pkg/artifact/uninstall"
"github.com/elastic/elastic-agent/internal/pkg/config"
"github.com/elastic/elastic-agent/internal/pkg/core/app"
"github.com/elastic/elastic-agent/internal/pkg/core/monitoring"
"github.com/elastic/elastic-agent/internal/pkg/core/monitoring/noop"
Expand Down Expand Up @@ -115,12 +117,51 @@ func NewOperator(

operator.initHandlerMap()

os.MkdirAll(config.DownloadConfig.TargetDirectory, 0755)
os.MkdirAll(config.DownloadConfig.InstallPath, 0755)
if err := os.MkdirAll(config.DownloadConfig.TargetDirectory, 0755); err != nil {
// can already exists from previous runs, not an error
logger.Warnf("failed creating %q: %v", config.DownloadConfig.TargetDirectory, err)
}
if err := os.MkdirAll(config.DownloadConfig.InstallPath, 0755); err != nil {
// can already exists from previous runs, not an error
logger.Warnf("failed creating %q: %v", config.DownloadConfig.InstallPath, err)
}

return operator, nil
}

func (o *Operator) Reload(rawConfig *config.Config) error {
// save some unpacking in downloaders
type reloadConfig struct {
C *artifact.Config `json:"agent.download" config:"agent.download"`
}
tmp := &reloadConfig{
C: artifact.DefaultConfig(),
}
if err := rawConfig.Unpack(&tmp); err != nil {
return errors.New(err, "failed to unpack artifact config")
}

if err := o.reloadComponent(o.downloader, "downloader", tmp.C); err != nil {
return err
}

return o.reloadComponent(o.verifier, "verifier", tmp.C)
}

func (o *Operator) reloadComponent(component interface{}, name string, cfg *artifact.Config) error {
r, ok := component.(artifact.ConfigReloader)
if !ok {
o.logger.Debugf("failed reloading %q: component is not reloadable", name)
return nil // not an error, could be filesystem downloader/verifier
}

if err := r.Reload(cfg); err != nil {
return errors.New(err, fmt.Sprintf("failed reloading %q config", component))
}

return nil
}

// State describes the current state of the system.
// Reports all known applications and theirs states. Whether they are running
// or not, and if they are information about process is also present.
Expand Down Expand Up @@ -238,12 +279,12 @@ func (o *Operator) Shutdown() {
a.Shutdown()
wg.Done()
o.logger.Debugf("took %s to shutdown %s",
time.Now().Sub(started), a.Name())
time.Since(started), a.Name())
}(a)
}
wg.Wait()
o.logger.Debugf("took %s to shutdown %d apps",
time.Now().Sub(started), len(o.apps))
time.Since(started), len(o.apps))
}

// Start starts a new process based on a configuration
Expand Down
106 changes: 97 additions & 9 deletions internal/pkg/artifact/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strings"
"time"

c "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/transport/httpcommon"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
Expand All @@ -24,6 +25,10 @@ const (
defaultSourceURI = "https://artifacts.elastic.co/downloads/"
)

type ConfigReloader interface {
Reload(*Config) error
}

// Config is a configuration used for verifier and downloader
type Config struct {
// OperatingSystem: operating system [linux, windows, darwin]
Expand Down Expand Up @@ -52,18 +57,62 @@ type Config struct {
}

type Reloader struct {
log *logger.Logger
cfg *Config
log *logger.Logger
cfg *Config
reloaders []ConfigReloader
}

func NewReloader(cfg *Config, log *logger.Logger) *Reloader {
func NewReloader(cfg *Config, log *logger.Logger, rr ...ConfigReloader) *Reloader {
return &Reloader{
cfg: cfg,
log: log,
cfg: cfg,
log: log,
reloaders: rr,
}
}

func (r *Reloader) Reload(rawConfig *config.Config) error {
if err := r.reloadConfig(rawConfig); err != nil {
return errors.New(err, "failed to reload config")
}

if err := r.reloadSourceURI(rawConfig); err != nil {
return errors.New(err, "failed to reload source URI")
}

for _, reloader := range r.reloaders {
if err := reloader.Reload(r.cfg); err != nil {
return errors.New(err, "failed reloading config")
}
}

return nil
}

func (r *Reloader) reloadConfig(rawConfig *config.Config) error {
type reloadConfig struct {
C *Config `json:"agent.download" config:"agent.download"`
}
tmp := &reloadConfig{
C: DefaultConfig(),
}
if err := rawConfig.Unpack(&tmp); err != nil {
return err
}

*(r.cfg) = Config{
OperatingSystem: tmp.C.OperatingSystem,
Architecture: tmp.C.Architecture,
SourceURI: tmp.C.SourceURI,
TargetDirectory: tmp.C.TargetDirectory,
InstallPath: tmp.C.InstallPath,
DropPath: tmp.C.DropPath,
HTTPTransportSettings: tmp.C.HTTPTransportSettings,
}

return nil
}

func (r *Reloader) reloadSourceURI(rawConfig *config.Config) error {
type reloadConfig struct {
// SourceURI: source of the artifacts, e.g https://artifacts.elastic.co/downloads/
SourceURI string `json:"agent.download.sourceURI" config:"agent.download.sourceURI"`
Expand All @@ -78,11 +127,11 @@ func (r *Reloader) Reload(rawConfig *config.Config) error {
}

var newSourceURI string
if cfg.FleetSourceURI != "" {
if fleetURI := strings.TrimSpace(cfg.FleetSourceURI); fleetURI != "" {
// fleet configuration takes precedence
newSourceURI = cfg.FleetSourceURI
} else if cfg.SourceURI != "" {
newSourceURI = cfg.SourceURI
newSourceURI = fleetURI
} else if sourceURI := strings.TrimSpace(cfg.SourceURI); sourceURI != "" {
newSourceURI = sourceURI
}

if newSourceURI != "" {
Expand Down Expand Up @@ -148,3 +197,42 @@ func (c *Config) Arch() string {
c.Architecture = arch
return c.Architecture
}

// Unpack reads a config object into the settings.
func (c *Config) Unpack(cfg *c.C) error {
tmp := struct {
OperatingSystem string `json:"-" config:",ignore"`
Architecture string `json:"-" config:",ignore"`
SourceURI string `json:"sourceURI" config:"sourceURI"`
TargetDirectory string `json:"targetDirectory" config:"target_directory"`
InstallPath string `yaml:"installPath" config:"install_path"`
DropPath string `yaml:"dropPath" config:"drop_path"`
}{
OperatingSystem: c.OperatingSystem,
Architecture: c.Architecture,
SourceURI: c.SourceURI,
TargetDirectory: c.TargetDirectory,
InstallPath: c.InstallPath,
DropPath: c.DropPath,
}

if err := cfg.Unpack(&tmp); err != nil {
return err
}

transport := DefaultConfig().HTTPTransportSettings
if err := cfg.Unpack(&transport); err != nil {
return err
}

*c = Config{
OperatingSystem: tmp.OperatingSystem,
Architecture: tmp.Architecture,
SourceURI: tmp.SourceURI,
TargetDirectory: tmp.TargetDirectory,
InstallPath: tmp.InstallPath,
DropPath: tmp.DropPath,
HTTPTransportSettings: transport,
}
return nil
}
Loading

0 comments on commit 785c6c7

Please sign in to comment.