Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion x-pack/agent/dev-tools/cmd/buildfleetcfg/buildfleetcfg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

64 changes: 35 additions & 29 deletions x-pack/agent/dev-tools/cmd/fakewebapi/action_example.json
Original file line number Diff line number Diff line change
@@ -1,33 +1,39 @@
{
"actions": [
{
"type": "POLICY_CHANGE",
"id": "id-1",
"data": {
"policy": {
"id": "policy-id",
"outputs": {
"default": {
"hosts": "https://localhost:9200"
}
},
"streams": [
{
"id": "string",
"type": "logs",
"path": "/var/log/hello.log",
"output": {
"use_output": "default"
}
}
]
}
"action": "checkin",
"success": true,
"actions": [
{
"type": "POLICY_CHANGE",
"data": {
"policy": {
"id": "default",
"outputs": {
"default": {
"id": "default",
"name": "default",
"type": "elasticsearch",
"hosts": "http://localhost:9200",
"ingest_pipeline": "default"
}
},
{
"type": "WHAT_TO_DO_WITH_IT",
"id": "id2"
},
"streams": [
{
"type": "metric/nginx",
"metricsets": [
"stubstatus"
],
"period": "10s",
"enabled": true,
"hosts": "http://127.0.0.1",
"id": "stubstatus",
"output": {
"use_output": "default"
}
}
]
}
],
"success": true
},
"id": "6d22f55a-d6e8-4e52-bcaa-16dde6091c5c"
}
]
}
104 changes: 104 additions & 0 deletions x-pack/agent/pkg/agent/application/action_dispatch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package application

import (
"fmt"
"reflect"
"strings"

"github.com/pkg/errors"

"github.com/elastic/beats/x-pack/agent/pkg/core/logger"
)

type action interface{}

type actionHandler interface {
Handle(a action) error
}

type actionHandlers map[string]actionHandler

type actionDispatcher struct {
log *logger.Logger
handlers actionHandlers
def actionHandler
}

func newActionDispatcher(log *logger.Logger, def actionHandler) (*actionDispatcher, error) {
var err error
if log == nil {
log, err = logger.New()
if err != nil {
return nil, err
}
}

if def == nil {
return nil, errors.New("missing default handler")
}

return &actionDispatcher{
log: log,
handlers: make(actionHandlers),
def: def,
}, nil
}

func (ad *actionDispatcher) Register(a action, handler actionHandler) error {
k := ad.key(a)
_, ok := ad.handlers[k]
if ok {
return fmt.Errorf("action with type %T is already registered", a)
}
ad.handlers[k] = handler
return nil
}

func (ad *actionDispatcher) MustRegister(a action, handler actionHandler) {
err := ad.Register(a, handler)
if err != nil {
panic("could not register action, error: " + err.Error())
}
}

func (ad *actionDispatcher) key(a action) string {
return reflect.TypeOf(a).String()
}

func (ad *actionDispatcher) Dispatch(actions ...action) error {
ad.log.Debugf(
"Dispatch %d actions of types: %s",
len(actions),
strings.Join(detectTypes(actions), ", "),
)

for _, action := range actions {
if err := ad.dispatchAction(action); err != nil {
ad.log.Debugf("Failed to dispatch action '%+v', error: %+v", action, err)
return err
}
ad.log.Debugf("Succesfully dispatched action: '%+v'", action)
}
return nil
}

func (ad *actionDispatcher) dispatchAction(a action) error {
handler, found := ad.handlers[(ad.key(a))]
if !found {
return ad.def.Handle(a)
}

return handler.Handle(a)
}

func detectTypes(actions []action) []string {
str := make([]string, len(actions))
for idx, action := range actions {
str[idx] = reflect.TypeOf(action).String()
}
return str
}
93 changes: 93 additions & 0 deletions x-pack/agent/pkg/agent/application/action_dispatch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package application

import (
"testing"

"github.com/stretchr/testify/require"
)

type mockHandler struct {
received action
called bool
err error
}

func (h *mockHandler) Handle(a action) error {
h.called = true
h.received = a
return h.err
}

type mockAction struct{}
type mockActionUnknown struct{}
type mockActionOther struct{}

func TestActionDispatcher(t *testing.T) {
t.Run("Success to dispatch multiples events", func(t *testing.T) {
def := &mockHandler{}
d, err := newActionDispatcher(nil, def)
require.NoError(t, err)

success1 := &mockHandler{}
success2 := &mockHandler{}

d.Register(&mockAction{}, success1)
d.Register(&mockActionOther{}, success2)

action1 := &mockAction{}
action2 := &mockActionOther{}

err = d.Dispatch(action1, action2)

require.NoError(t, err)

require.True(t, success1.called)
require.Equal(t, action1, success1.received)

require.True(t, success2.called)
require.Equal(t, action2, success2.received)

require.False(t, def.called)
require.Nil(t, def.received)
})

t.Run("Unknown action are catched by the unknown handler", func(t *testing.T) {
def := &mockHandler{}
d, err := newActionDispatcher(nil, def)
require.NoError(t, err)

success := &mockHandler{}
d.Dispatch(mockAction{}, success)

action := &mockActionUnknown{}
err = d.Dispatch(action)

require.NoError(t, err)
require.False(t, success.called)

require.True(t, def.called)
require.Equal(t, action, def.received)

require.False(t, success.called)
require.Nil(t, success.received)
})

t.Run("Could not register two handlers on the same action", func(t *testing.T) {
success1 := &mockHandler{}
success2 := &mockHandler{}

def := &mockHandler{}
d, err := newActionDispatcher(nil, def)
require.NoError(t, err)

err = d.Register(&mockAction{}, success1)
require.NoError(t, err)

err = d.Register(&mockAction{}, success2)
require.Error(t, err)
})
}
2 changes: 1 addition & 1 deletion x-pack/agent/pkg/agent/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func createApplication(
return newLocal(log, pathConfigFile, c.Management)
case fleetMode:
log.Info("Agent is managed by Fleet")
return newManaged(log, c.Management)
return newManaged(log, config)
default:
return nil, ErrInvalidMgmtMode
}
Expand Down
16 changes: 10 additions & 6 deletions x-pack/agent/pkg/agent/application/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,7 @@ import (
type decoratorFunc = func(string, *transpiler.AST, []program.Program) ([]program.Program, error)

func emitter(log *logger.Logger, router *router, decorators ...decoratorFunc) emitterFunc {
return func(files []string) error {
c, err := config.LoadFiles(files...)
if err != nil {
return errors.Wrap(err, "could not load or merge configuration")
}

return func(c *config.Config) error {
log.Debug("Transforming configuration into a tree")
m, err := c.ToMapStr()
if err != nil {
Expand Down Expand Up @@ -55,3 +50,12 @@ func emitter(log *logger.Logger, router *router, decorators ...decoratorFunc) em
return router.Dispatch(ast.HashStr(), programsToRun)
}
}

func readfiles(files []string, emitter emitterFunc) error {
c, err := config.LoadFiles(files...)
if err != nil {
return errors.Wrap(err, "could not load or merge configuration")
}

return emitter(c)
}
Loading