Skip to content

Commit

Permalink
Reload downloader client on config change (#848)
Browse files Browse the repository at this point in the history
Reload downloader client on config change (#848)

(cherry picked from commit 6d830e8)

# Conflicts:
#	internal/pkg/agent/application/local_mode.go
#	internal/pkg/artifact/config.go
  • Loading branch information
michalpristas authored and mergify[bot] committed Aug 5, 2022
1 parent f85ac3b commit f65797d
Show file tree
Hide file tree
Showing 15 changed files with 319 additions and 22 deletions.
10 changes: 10 additions & 0 deletions internal/pkg/agent/application/local_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ func newLocal(
return nil, errors.New(err, "failed to initialize composable controller")
}

routerArtifactReloader, ok := router.(emitter.Reloader)
if !ok {
return nil, errors.New("router not capable of artifact reload") // Needed for client reloading
}

discover := discoverer(pathConfigFile, cfg.Settings.Path, externalConfigsGlob())
emit, err := emitter.New(
localApplication.bgContext,
Expand All @@ -131,6 +136,11 @@ func newLocal(
},
caps,
monitor,
<<<<<<< HEAD
=======
artifact.NewReloader(cfg.Settings.DownloadConfig, log),
routerArtifactReloader,
>>>>>>> 6d830e88d (Reload downloader client on config change (#848))
)
if err != nil {
return nil, err
Expand Down
6 changes: 6 additions & 0 deletions internal/pkg/agent/application/managed_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,11 @@ func newManaged(
return nil, errors.New(err, "failed to initialize composable controller")
}

routerArtifactReloader, ok := router.(emitter.Reloader)
if !ok {
return nil, errors.New("router not capable of artifact reload") // Needed for client reloading
}

emit, err := emitter.New(
managedApplication.bgContext,
log,
Expand All @@ -159,6 +164,7 @@ func newManaged(
caps,
monitor,
artifact.NewReloader(cfg.Settings.DownloadConfig, log),
routerArtifactReloader,
)
if err != nil {
return nil, err
Expand Down
6 changes: 3 additions & 3 deletions internal/pkg/agent/application/pipeline/emitter/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"github.com/elastic/elastic-agent/pkg/core/logger"
)

type reloadable interface {
type Reloader interface {
Reload(cfg *config.Config) error
}

Expand All @@ -32,7 +32,7 @@ type Controller struct {
controller composable.Controller
router pipeline.Router
modifiers *pipeline.ConfigModifiers
reloadables []reloadable
reloadables []Reloader
caps capabilities.Capability

// state
Expand All @@ -51,7 +51,7 @@ func NewController(
router pipeline.Router,
modifiers *pipeline.ConfigModifiers,
caps capabilities.Capability,
reloadables ...reloadable,
reloadables ...Reloader,
) *Controller {
init, _ := transpiler.NewVars(map[string]interface{}{}, nil)

Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/agent/application/pipeline/emitter/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
)

// New creates a new emitter function.
func New(ctx context.Context, log *logger.Logger, agentInfo *info.AgentInfo, controller composable.Controller, router pipeline.Router, modifiers *pipeline.ConfigModifiers, caps capabilities.Capability, reloadables ...reloadable) (pipeline.EmitterFunc, error) {
func New(ctx context.Context, log *logger.Logger, agentInfo *info.AgentInfo, controller composable.Controller, router pipeline.Router, modifiers *pipeline.ConfigModifiers, caps capabilities.Capability, reloadables ...Reloader) (pipeline.EmitterFunc, error) {
log.Debugf("Supported programs: %s", strings.Join(program.KnownProgramNames(), ", "))

ctrl := NewController(log, agentInfo, controller, router, modifiers, caps, reloadables...)
Expand Down
23 changes: 23 additions & 0 deletions internal/pkg/agent/application/pipeline/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ import (
"time"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/pipeline"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/pipeline/emitter"
"github.com/elastic/elastic-agent/internal/pkg/agent/configrequest"
"github.com/elastic/elastic-agent/internal/pkg/agent/program"
"github.com/elastic/elastic-agent/internal/pkg/config"
"github.com/elastic/elastic-agent/internal/pkg/sorted"
"github.com/elastic/elastic-agent/pkg/core/logger"
)
Expand All @@ -35,6 +37,27 @@ func New(log *logger.Logger, factory pipeline.StreamFunc) (pipeline.Router, erro
return &router{log: log, streamFactory: factory, routes: sorted.NewSet()}, nil
}

func (r *router) Reload(c *config.Config) error {
keys := r.routes.Keys()
for _, key := range keys {
route, found := r.routes.Get(key)
if !found {
continue
}

routeReloader, ok := route.(emitter.Reloader)
if !ok {
continue
}

if err := routeReloader.Reload(c); err != nil {
return err
}
}

return nil
}

func (r *router) Routes() *sorted.Set {
return r.routes
}
Expand Down
11 changes: 11 additions & 0 deletions internal/pkg/agent/application/pipeline/stream/operator_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import (
"go.elastic.co/apm"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/pipeline"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/pipeline/emitter"
"github.com/elastic/elastic-agent/internal/pkg/agent/configrequest"
"github.com/elastic/elastic-agent/internal/pkg/agent/program"
"github.com/elastic/elastic-agent/internal/pkg/config"
"github.com/elastic/elastic-agent/internal/pkg/core/state"
"github.com/elastic/elastic-agent/pkg/core/logger"
)
Expand All @@ -29,6 +31,15 @@ type specer interface {
Specs() map[string]program.Spec
}

func (b *operatorStream) Reload(c *config.Config) error {
r, ok := b.configHandler.(emitter.Reloader)
if !ok {
return nil
}

return r.Reload(c)
}

func (b *operatorStream) Close() error {
return b.configHandler.Close()
}
Expand Down
49 changes: 45 additions & 4 deletions internal/pkg/agent/operation/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ import (
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
"github.com/elastic/elastic-agent/internal/pkg/agent/program"
"github.com/elastic/elastic-agent/internal/pkg/agent/stateresolver"
"github.com/elastic/elastic-agent/internal/pkg/artifact"
"github.com/elastic/elastic-agent/internal/pkg/artifact/download"
"github.com/elastic/elastic-agent/internal/pkg/artifact/install"
"github.com/elastic/elastic-agent/internal/pkg/artifact/uninstall"
"github.com/elastic/elastic-agent/internal/pkg/config"
"github.com/elastic/elastic-agent/internal/pkg/core/app"
"github.com/elastic/elastic-agent/internal/pkg/core/monitoring"
"github.com/elastic/elastic-agent/internal/pkg/core/monitoring/noop"
Expand Down Expand Up @@ -115,12 +117,51 @@ func NewOperator(

operator.initHandlerMap()

os.MkdirAll(config.DownloadConfig.TargetDirectory, 0755)
os.MkdirAll(config.DownloadConfig.InstallPath, 0755)
if err := os.MkdirAll(config.DownloadConfig.TargetDirectory, 0755); err != nil {
// can already exists from previous runs, not an error
logger.Warnf("failed creating %q: %v", config.DownloadConfig.TargetDirectory, err)
}
if err := os.MkdirAll(config.DownloadConfig.InstallPath, 0755); err != nil {
// can already exists from previous runs, not an error
logger.Warnf("failed creating %q: %v", config.DownloadConfig.InstallPath, err)
}

return operator, nil
}

func (o *Operator) Reload(rawConfig *config.Config) error {
// save some unpacking in downloaders
type reloadConfig struct {
C *artifact.Config `json:"agent.download" config:"agent.download"`
}
tmp := &reloadConfig{
C: artifact.DefaultConfig(),
}
if err := rawConfig.Unpack(&tmp); err != nil {
return errors.New(err, "failed to unpack artifact config")
}

if err := o.reloadComponent(o.downloader, "downloader", tmp.C); err != nil {
return err
}

return o.reloadComponent(o.verifier, "verifier", tmp.C)
}

func (o *Operator) reloadComponent(component interface{}, name string, cfg *artifact.Config) error {
r, ok := component.(artifact.ConfigReloader)
if !ok {
o.logger.Debugf("failed reloading %q: component is not reloadable", name)
return nil // not an error, could be filesystem downloader/verifier
}

if err := r.Reload(cfg); err != nil {
return errors.New(err, fmt.Sprintf("failed reloading %q config", component))
}

return nil
}

// State describes the current state of the system.
// Reports all known applications and theirs states. Whether they are running
// or not, and if they are information about process is also present.
Expand Down Expand Up @@ -238,12 +279,12 @@ func (o *Operator) Shutdown() {
a.Shutdown()
wg.Done()
o.logger.Debugf("took %s to shutdown %s",
time.Now().Sub(started), a.Name())
time.Since(started), a.Name())
}(a)
}
wg.Wait()
o.logger.Debugf("took %s to shutdown %d apps",
time.Now().Sub(started), len(o.apps))
time.Since(started), len(o.apps))
}

// Start starts a new process based on a configuration
Expand Down
61 changes: 56 additions & 5 deletions internal/pkg/artifact/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ const (
defaultSourceURI = "https://artifacts.elastic.co/downloads/"
)

type ConfigReloader interface {
Reload(*Config) error
}

// Config is a configuration used for verifier and downloader
type Config struct {
// OperatingSystem: operating system [linux, windows, darwin]
Expand Down Expand Up @@ -52,18 +56,65 @@ type Config struct {
}

type Reloader struct {
log *logger.Logger
cfg *Config
log *logger.Logger
cfg *Config
reloaders []ConfigReloader
}

func NewReloader(cfg *Config, log *logger.Logger) *Reloader {
func NewReloader(cfg *Config, log *logger.Logger, rr ...ConfigReloader) *Reloader {
return &Reloader{
cfg: cfg,
log: log,
cfg: cfg,
log: log,
reloaders: rr,
}
}

func (r *Reloader) Reload(rawConfig *config.Config) error {
<<<<<<< HEAD
=======
if err := r.reloadConfig(rawConfig); err != nil {
return errors.New(err, "failed to reload config")
}

if err := r.reloadSourceURI(rawConfig); err != nil {
return errors.New(err, "failed to reload source URI")
}

for _, reloader := range r.reloaders {
if err := reloader.Reload(r.cfg); err != nil {
return errors.New(err, "failed reloading config")
}
}

return nil
}

func (r *Reloader) reloadConfig(rawConfig *config.Config) error {
type reloadConfig struct {
C *Config `json:"agent.download" config:"agent.download"`
}
tmp := &reloadConfig{
C: DefaultConfig(),
}
if err := rawConfig.Unpack(&tmp); err != nil {
return err
}

*(r.cfg) = Config{
OperatingSystem: tmp.C.OperatingSystem,
Architecture: tmp.C.Architecture,
SourceURI: tmp.C.SourceURI,
TargetDirectory: tmp.C.TargetDirectory,
InstallPath: tmp.C.InstallPath,
DropPath: tmp.C.DropPath,
HTTPTransportSettings: tmp.C.HTTPTransportSettings,
}

return nil
}

func (r *Reloader) reloadSourceURI(rawConfig *config.Config) error {
>>>>>>> 6d830e88d (Reload downloader client on config change (#848))
type reloadConfig struct {
// SourceURI: source of the artifacts, e.g https://artifacts.elastic.co/downloads/
SourceURI string `json:"agent.download.sourceURI" config:"agent.download.sourceURI"`
Expand Down
16 changes: 16 additions & 0 deletions internal/pkg/artifact/download/composed/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import (
"github.com/hashicorp/go-multierror"
"go.elastic.co/apm"

"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
"github.com/elastic/elastic-agent/internal/pkg/agent/program"
"github.com/elastic/elastic-agent/internal/pkg/artifact"
"github.com/elastic/elastic-agent/internal/pkg/artifact/download"
)

Expand Down Expand Up @@ -50,3 +52,17 @@ func (e *Downloader) Download(ctx context.Context, spec program.Spec, version st

return "", err
}

func (e *Downloader) Reload(c *artifact.Config) error {
for _, d := range e.dd {
reloadable, ok := d.(download.Reloader)
if !ok {
continue
}

if err := reloadable.Reload(c); err != nil {
return errors.New(err, "failed reloading artifact config for composed downloader")
}
}
return nil
}
18 changes: 16 additions & 2 deletions internal/pkg/artifact/download/composed/verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
package composed

import (
"errors"

"github.com/hashicorp/go-multierror"

"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
"github.com/elastic/elastic-agent/internal/pkg/agent/program"
"github.com/elastic/elastic-agent/internal/pkg/artifact"
"github.com/elastic/elastic-agent/internal/pkg/artifact/download"
)

Expand Down Expand Up @@ -54,3 +54,17 @@ func (e *Verifier) Verify(spec program.Spec, version string) error {

return err
}

func (e *Verifier) Reload(c *artifact.Config) error {
for _, v := range e.vv {
reloadable, ok := v.(download.Reloader)
if !ok {
continue
}

if err := reloadable.Reload(c); err != nil {
return errors.New(err, "failed reloading artifact config for composed verifier")
}
}
return nil
}
Loading

0 comments on commit f65797d

Please sign in to comment.