Skip to content

Commit

Permalink
Reload downloader client on config change (#848)
Browse files Browse the repository at this point in the history
Reload downloader client on config change (#848)
  • Loading branch information
michalpristas authored Aug 5, 2022
1 parent ff8de85 commit 6d830e8
Show file tree
Hide file tree
Showing 15 changed files with 276 additions and 22 deletions.
6 changes: 6 additions & 0 deletions internal/pkg/agent/application/local_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ func newLocal(
return nil, errors.New(err, "failed to initialize composable controller")
}

routerArtifactReloader, ok := router.(emitter.Reloader)
if !ok {
return nil, errors.New("router not capable of artifact reload") // Needed for client reloading
}

discover := discoverer(pathConfigFile, cfg.Settings.Path, externalConfigsGlob())
emit, err := emitter.New(
localApplication.bgContext,
Expand All @@ -133,6 +138,7 @@ func newLocal(
caps,
monitor,
artifact.NewReloader(cfg.Settings.DownloadConfig, log),
routerArtifactReloader,
)
if err != nil {
return nil, err
Expand Down
6 changes: 6 additions & 0 deletions internal/pkg/agent/application/managed_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,11 @@ func newManaged(
return nil, errors.New(err, "failed to initialize composable controller")
}

routerArtifactReloader, ok := router.(emitter.Reloader)
if !ok {
return nil, errors.New("router not capable of artifact reload") // Needed for client reloading
}

emit, err := emitter.New(
managedApplication.bgContext,
log,
Expand All @@ -159,6 +164,7 @@ func newManaged(
caps,
monitor,
artifact.NewReloader(cfg.Settings.DownloadConfig, log),
routerArtifactReloader,
)
if err != nil {
return nil, err
Expand Down
6 changes: 3 additions & 3 deletions internal/pkg/agent/application/pipeline/emitter/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"github.com/elastic/elastic-agent/pkg/core/logger"
)

type reloadable interface {
type Reloader interface {
Reload(cfg *config.Config) error
}

Expand All @@ -32,7 +32,7 @@ type Controller struct {
controller composable.Controller
router pipeline.Router
modifiers *pipeline.ConfigModifiers
reloadables []reloadable
reloadables []Reloader
caps capabilities.Capability

// state
Expand All @@ -51,7 +51,7 @@ func NewController(
router pipeline.Router,
modifiers *pipeline.ConfigModifiers,
caps capabilities.Capability,
reloadables ...reloadable,
reloadables ...Reloader,
) *Controller {
init, _ := transpiler.NewVars(map[string]interface{}{}, nil)

Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/agent/application/pipeline/emitter/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
)

// New creates a new emitter function.
func New(ctx context.Context, log *logger.Logger, agentInfo *info.AgentInfo, controller composable.Controller, router pipeline.Router, modifiers *pipeline.ConfigModifiers, caps capabilities.Capability, reloadables ...reloadable) (pipeline.EmitterFunc, error) {
func New(ctx context.Context, log *logger.Logger, agentInfo *info.AgentInfo, controller composable.Controller, router pipeline.Router, modifiers *pipeline.ConfigModifiers, caps capabilities.Capability, reloadables ...Reloader) (pipeline.EmitterFunc, error) {
log.Debugf("Supported programs: %s", strings.Join(program.KnownProgramNames(), ", "))

ctrl := NewController(log, agentInfo, controller, router, modifiers, caps, reloadables...)
Expand Down
23 changes: 23 additions & 0 deletions internal/pkg/agent/application/pipeline/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ import (
"time"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/pipeline"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/pipeline/emitter"
"github.com/elastic/elastic-agent/internal/pkg/agent/configrequest"
"github.com/elastic/elastic-agent/internal/pkg/agent/program"
"github.com/elastic/elastic-agent/internal/pkg/config"
"github.com/elastic/elastic-agent/internal/pkg/sorted"
"github.com/elastic/elastic-agent/pkg/core/logger"
)
Expand All @@ -35,6 +37,27 @@ func New(log *logger.Logger, factory pipeline.StreamFunc) (pipeline.Router, erro
return &router{log: log, streamFactory: factory, routes: sorted.NewSet()}, nil
}

func (r *router) Reload(c *config.Config) error {
keys := r.routes.Keys()
for _, key := range keys {
route, found := r.routes.Get(key)
if !found {
continue
}

routeReloader, ok := route.(emitter.Reloader)
if !ok {
continue
}

if err := routeReloader.Reload(c); err != nil {
return err
}
}

return nil
}

func (r *router) Routes() *sorted.Set {
return r.routes
}
Expand Down
11 changes: 11 additions & 0 deletions internal/pkg/agent/application/pipeline/stream/operator_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import (
"go.elastic.co/apm"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/pipeline"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/pipeline/emitter"
"github.com/elastic/elastic-agent/internal/pkg/agent/configrequest"
"github.com/elastic/elastic-agent/internal/pkg/agent/program"
"github.com/elastic/elastic-agent/internal/pkg/config"
"github.com/elastic/elastic-agent/internal/pkg/core/state"
"github.com/elastic/elastic-agent/pkg/core/logger"
)
Expand All @@ -29,6 +31,15 @@ type specer interface {
Specs() map[string]program.Spec
}

func (b *operatorStream) Reload(c *config.Config) error {
r, ok := b.configHandler.(emitter.Reloader)
if !ok {
return nil
}

return r.Reload(c)
}

func (b *operatorStream) Close() error {
return b.configHandler.Close()
}
Expand Down
49 changes: 45 additions & 4 deletions internal/pkg/agent/operation/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ import (
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
"github.com/elastic/elastic-agent/internal/pkg/agent/program"
"github.com/elastic/elastic-agent/internal/pkg/agent/stateresolver"
"github.com/elastic/elastic-agent/internal/pkg/artifact"
"github.com/elastic/elastic-agent/internal/pkg/artifact/download"
"github.com/elastic/elastic-agent/internal/pkg/artifact/install"
"github.com/elastic/elastic-agent/internal/pkg/artifact/uninstall"
"github.com/elastic/elastic-agent/internal/pkg/config"
"github.com/elastic/elastic-agent/internal/pkg/core/app"
"github.com/elastic/elastic-agent/internal/pkg/core/monitoring"
"github.com/elastic/elastic-agent/internal/pkg/core/monitoring/noop"
Expand Down Expand Up @@ -115,12 +117,51 @@ func NewOperator(

operator.initHandlerMap()

os.MkdirAll(config.DownloadConfig.TargetDirectory, 0755)
os.MkdirAll(config.DownloadConfig.InstallPath, 0755)
if err := os.MkdirAll(config.DownloadConfig.TargetDirectory, 0755); err != nil {
// can already exists from previous runs, not an error
logger.Warnf("failed creating %q: %v", config.DownloadConfig.TargetDirectory, err)
}
if err := os.MkdirAll(config.DownloadConfig.InstallPath, 0755); err != nil {
// can already exists from previous runs, not an error
logger.Warnf("failed creating %q: %v", config.DownloadConfig.InstallPath, err)
}

return operator, nil
}

func (o *Operator) Reload(rawConfig *config.Config) error {
// save some unpacking in downloaders
type reloadConfig struct {
C *artifact.Config `json:"agent.download" config:"agent.download"`
}
tmp := &reloadConfig{
C: artifact.DefaultConfig(),
}
if err := rawConfig.Unpack(&tmp); err != nil {
return errors.New(err, "failed to unpack artifact config")
}

if err := o.reloadComponent(o.downloader, "downloader", tmp.C); err != nil {
return err
}

return o.reloadComponent(o.verifier, "verifier", tmp.C)
}

func (o *Operator) reloadComponent(component interface{}, name string, cfg *artifact.Config) error {
r, ok := component.(artifact.ConfigReloader)
if !ok {
o.logger.Debugf("failed reloading %q: component is not reloadable", name)
return nil // not an error, could be filesystem downloader/verifier
}

if err := r.Reload(cfg); err != nil {
return errors.New(err, fmt.Sprintf("failed reloading %q config", component))
}

return nil
}

// State describes the current state of the system.
// Reports all known applications and theirs states. Whether they are running
// or not, and if they are information about process is also present.
Expand Down Expand Up @@ -238,12 +279,12 @@ func (o *Operator) Shutdown() {
a.Shutdown()
wg.Done()
o.logger.Debugf("took %s to shutdown %s",
time.Now().Sub(started), a.Name())
time.Since(started), a.Name())
}(a)
}
wg.Wait()
o.logger.Debugf("took %s to shutdown %d apps",
time.Now().Sub(started), len(o.apps))
time.Since(started), len(o.apps))
}

// Start starts a new process based on a configuration
Expand Down
22 changes: 17 additions & 5 deletions internal/pkg/artifact/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ const (
defaultSourceURI = "https://artifacts.elastic.co/downloads/"
)

type ConfigReloader interface {
Reload(*Config) error
}

// Config is a configuration used for verifier and downloader
type Config struct {
// OperatingSystem: operating system [linux, windows, darwin]
Expand Down Expand Up @@ -53,14 +57,16 @@ type Config struct {
}

type Reloader struct {
log *logger.Logger
cfg *Config
log *logger.Logger
cfg *Config
reloaders []ConfigReloader
}

func NewReloader(cfg *Config, log *logger.Logger) *Reloader {
func NewReloader(cfg *Config, log *logger.Logger, rr ...ConfigReloader) *Reloader {
return &Reloader{
cfg: cfg,
log: log,
cfg: cfg,
log: log,
reloaders: rr,
}
}

Expand All @@ -73,6 +79,12 @@ func (r *Reloader) Reload(rawConfig *config.Config) error {
return errors.New(err, "failed to reload source URI")
}

for _, reloader := range r.reloaders {
if err := reloader.Reload(r.cfg); err != nil {
return errors.New(err, "failed reloading config")
}
}

return nil
}

Expand Down
16 changes: 16 additions & 0 deletions internal/pkg/artifact/download/composed/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import (
"github.com/hashicorp/go-multierror"
"go.elastic.co/apm"

"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
"github.com/elastic/elastic-agent/internal/pkg/agent/program"
"github.com/elastic/elastic-agent/internal/pkg/artifact"
"github.com/elastic/elastic-agent/internal/pkg/artifact/download"
)

Expand Down Expand Up @@ -50,3 +52,17 @@ func (e *Downloader) Download(ctx context.Context, spec program.Spec, version st

return "", err
}

func (e *Downloader) Reload(c *artifact.Config) error {
for _, d := range e.dd {
reloadable, ok := d.(download.Reloader)
if !ok {
continue
}

if err := reloadable.Reload(c); err != nil {
return errors.New(err, "failed reloading artifact config for composed downloader")
}
}
return nil
}
18 changes: 16 additions & 2 deletions internal/pkg/artifact/download/composed/verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
package composed

import (
"errors"

"github.com/hashicorp/go-multierror"

"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
"github.com/elastic/elastic-agent/internal/pkg/agent/program"
"github.com/elastic/elastic-agent/internal/pkg/artifact"
"github.com/elastic/elastic-agent/internal/pkg/artifact/download"
)

Expand Down Expand Up @@ -54,3 +54,17 @@ func (e *Verifier) Verify(spec program.Spec, version string) error {

return err
}

func (e *Verifier) Reload(c *artifact.Config) error {
for _, v := range e.vv {
reloadable, ok := v.(download.Reloader)
if !ok {
continue
}

if err := reloadable.Reload(c); err != nil {
return errors.New(err, "failed reloading artifact config for composed verifier")
}
}
return nil
}
31 changes: 26 additions & 5 deletions internal/pkg/artifact/download/http/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (

"github.com/elastic/elastic-agent-libs/atomic"
"github.com/elastic/elastic-agent-libs/transport/httpcommon"

"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
"github.com/elastic/elastic-agent/internal/pkg/agent/program"
"github.com/elastic/elastic-agent/internal/pkg/artifact"
Expand Down Expand Up @@ -73,14 +72,33 @@ func NewDownloaderWithClient(log progressLogger, config *artifact.Config, client
}
}

func (e *Downloader) Reload(c *artifact.Config) error {
// reload client
client, err := c.HTTPTransportSettings.Client(
httpcommon.WithAPMHTTPInstrumentation(),
)
if err != nil {
return errors.New(err, "http.downloader: failed to generate client out of config")
}

client.Transport = withHeaders(client.Transport, headers)

e.client = *client
e.config = c

return nil
}

// Download fetches the package from configured source.
// Returns absolute path to downloaded package and an error.
func (e *Downloader) Download(ctx context.Context, spec program.Spec, version string) (_ string, err error) {
downloadedFiles := make([]string, 0, 2)
defer func() {
if err != nil {
for _, path := range downloadedFiles {
os.Remove(path)
if err := os.Remove(path); err != nil {
e.log.Warnf("failed to cleanup %s: %v", path, err)
}
}
}
}()
Expand Down Expand Up @@ -164,12 +182,14 @@ func (e *Downloader) downloadFile(ctx context.Context, artifactName, filename, f

resp, err := e.client.Do(req.WithContext(ctx))
if err != nil {
return "", errors.New(err, "fetching package failed", errors.TypeNetwork, errors.M(errors.MetaKeyURI, sourceURI))
// return path, file already exists and needs to be cleaned up
return fullPath, errors.New(err, "fetching package failed", errors.TypeNetwork, errors.M(errors.MetaKeyURI, sourceURI))
}
defer resp.Body.Close()

if resp.StatusCode != 200 {
return "", errors.New(fmt.Sprintf("call to '%s' returned unsuccessful status code: %d", sourceURI, resp.StatusCode), errors.TypeNetwork, errors.M(errors.MetaKeyURI, sourceURI))
// return path, file already exists and needs to be cleaned up
return fullPath, errors.New(fmt.Sprintf("call to '%s' returned unsuccessful status code: %d", sourceURI, resp.StatusCode), errors.TypeNetwork, errors.M(errors.MetaKeyURI, sourceURI))
}

fileSize := -1
Expand All @@ -186,7 +206,8 @@ func (e *Downloader) downloadFile(ctx context.Context, artifactName, filename, f
if err != nil {
reportCancel()
dp.ReportFailed(err)
return "", errors.New(err, "fetching package failed", errors.TypeNetwork, errors.M(errors.MetaKeyURI, sourceURI))
// return path, file already exists and needs to be cleaned up
return fullPath, errors.New(err, "copying fetched package failed", errors.TypeNetwork, errors.M(errors.MetaKeyURI, sourceURI))
}
reportCancel()
dp.ReportComplete()
Expand Down
Loading

0 comments on commit 6d830e8

Please sign in to comment.