diff --git a/x-pack/agent/dev-tools/cmd/buildfleetcfg/buildfleetcfg.go b/x-pack/agent/dev-tools/cmd/buildfleetcfg/buildfleetcfg.go index 14a72e760915..6e900ccc67d1 100644 --- a/x-pack/agent/dev-tools/cmd/buildfleetcfg/buildfleetcfg.go +++ b/x-pack/agent/dev-tools/cmd/buildfleetcfg/buildfleetcfg.go @@ -82,7 +82,6 @@ func main() { } ioutil.WriteFile(output, data, 0640) - return } diff --git a/x-pack/agent/dev-tools/cmd/fakewebapi/action_example.json b/x-pack/agent/dev-tools/cmd/fakewebapi/action_example.json index 5ed8c8aa19ca..ab1ff1b79c70 100644 --- a/x-pack/agent/dev-tools/cmd/fakewebapi/action_example.json +++ b/x-pack/agent/dev-tools/cmd/fakewebapi/action_example.json @@ -1,33 +1,39 @@ { - "actions": [ - { - "type": "POLICY_CHANGE", - "id": "id-1", - "data": { - "policy": { - "id": "policy-id", - "outputs": { - "default": { - "hosts": "https://localhost:9200" - } - }, - "streams": [ - { - "id": "string", - "type": "logs", - "path": "/var/log/hello.log", - "output": { - "use_output": "default" - } - } - ] - } + "action": "checkin", + "success": true, + "actions": [ + { + "type": "POLICY_CHANGE", + "data": { + "policy": { + "id": "default", + "outputs": { + "default": { + "id": "default", + "name": "default", + "type": "elasticsearch", + "hosts": "http://localhost:9200", + "ingest_pipeline": "default" } - }, - { - "type": "WHAT_TO_DO_WITH_IT", - "id": "id2" + }, + "streams": [ + { + "type": "metric/nginx", + "metricsets": [ + "stubstatus" + ], + "period": "10s", + "enabled": true, + "hosts": "http://127.0.0.1", + "id": "stubstatus", + "output": { + "use_output": "default" + } + } + ] } - ], - "success": true + }, + "id": "6d22f55a-d6e8-4e52-bcaa-16dde6091c5c" + } + ] } diff --git a/x-pack/agent/pkg/agent/application/action_dispatch.go b/x-pack/agent/pkg/agent/application/action_dispatch.go new file mode 100644 index 000000000000..7727bed4d75e --- /dev/null +++ b/x-pack/agent/pkg/agent/application/action_dispatch.go @@ -0,0 +1,104 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package application + +import ( + "fmt" + "reflect" + "strings" + + "github.com/pkg/errors" + + "github.com/elastic/beats/x-pack/agent/pkg/core/logger" +) + +type action interface{} + +type actionHandler interface { + Handle(a action) error +} + +type actionHandlers map[string]actionHandler + +type actionDispatcher struct { + log *logger.Logger + handlers actionHandlers + def actionHandler +} + +func newActionDispatcher(log *logger.Logger, def actionHandler) (*actionDispatcher, error) { + var err error + if log == nil { + log, err = logger.New() + if err != nil { + return nil, err + } + } + + if def == nil { + return nil, errors.New("missing default handler") + } + + return &actionDispatcher{ + log: log, + handlers: make(actionHandlers), + def: def, + }, nil +} + +func (ad *actionDispatcher) Register(a action, handler actionHandler) error { + k := ad.key(a) + _, ok := ad.handlers[k] + if ok { + return fmt.Errorf("action with type %T is already registered", a) + } + ad.handlers[k] = handler + return nil +} + +func (ad *actionDispatcher) MustRegister(a action, handler actionHandler) { + err := ad.Register(a, handler) + if err != nil { + panic("could not register action, error: " + err.Error()) + } +} + +func (ad *actionDispatcher) key(a action) string { + return reflect.TypeOf(a).String() +} + +func (ad *actionDispatcher) Dispatch(actions ...action) error { + ad.log.Debugf( + "Dispatch %d actions of types: %s", + len(actions), + strings.Join(detectTypes(actions), ", "), + ) + + for _, action := range actions { + if err := ad.dispatchAction(action); err != nil { + ad.log.Debugf("Failed to dispatch action '%+v', error: %+v", action, err) + return err + } + ad.log.Debugf("Succesfully dispatched action: '%+v'", action) + } + return nil +} + +func (ad *actionDispatcher) dispatchAction(a action) error { + handler, found := ad.handlers[(ad.key(a))] + if !found { + return ad.def.Handle(a) + } + + return handler.Handle(a) +} + +func detectTypes(actions []action) []string { + str := make([]string, len(actions)) + for idx, action := range actions { + str[idx] = reflect.TypeOf(action).String() + } + return str +} diff --git a/x-pack/agent/pkg/agent/application/action_dispatch_test.go b/x-pack/agent/pkg/agent/application/action_dispatch_test.go new file mode 100644 index 000000000000..79b59e695939 --- /dev/null +++ b/x-pack/agent/pkg/agent/application/action_dispatch_test.go @@ -0,0 +1,93 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package application + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +type mockHandler struct { + received action + called bool + err error +} + +func (h *mockHandler) Handle(a action) error { + h.called = true + h.received = a + return h.err +} + +type mockAction struct{} +type mockActionUnknown struct{} +type mockActionOther struct{} + +func TestActionDispatcher(t *testing.T) { + t.Run("Success to dispatch multiples events", func(t *testing.T) { + def := &mockHandler{} + d, err := newActionDispatcher(nil, def) + require.NoError(t, err) + + success1 := &mockHandler{} + success2 := &mockHandler{} + + d.Register(&mockAction{}, success1) + d.Register(&mockActionOther{}, success2) + + action1 := &mockAction{} + action2 := &mockActionOther{} + + err = d.Dispatch(action1, action2) + + require.NoError(t, err) + + require.True(t, success1.called) + require.Equal(t, action1, success1.received) + + require.True(t, success2.called) + require.Equal(t, action2, success2.received) + + require.False(t, def.called) + require.Nil(t, def.received) + }) + + t.Run("Unknown action are catched by the unknown handler", func(t *testing.T) { + def := &mockHandler{} + d, err := newActionDispatcher(nil, def) + require.NoError(t, err) + + success := &mockHandler{} + d.Dispatch(mockAction{}, success) + + action := &mockActionUnknown{} + err = d.Dispatch(action) + + require.NoError(t, err) + require.False(t, success.called) + + require.True(t, def.called) + require.Equal(t, action, def.received) + + require.False(t, success.called) + require.Nil(t, success.received) + }) + + t.Run("Could not register two handlers on the same action", func(t *testing.T) { + success1 := &mockHandler{} + success2 := &mockHandler{} + + def := &mockHandler{} + d, err := newActionDispatcher(nil, def) + require.NoError(t, err) + + err = d.Register(&mockAction{}, success1) + require.NoError(t, err) + + err = d.Register(&mockAction{}, success2) + require.Error(t, err) + }) +} diff --git a/x-pack/agent/pkg/agent/application/application.go b/x-pack/agent/pkg/agent/application/application.go index dd7ca228b438..3fe9e17174e8 100644 --- a/x-pack/agent/pkg/agent/application/application.go +++ b/x-pack/agent/pkg/agent/application/application.go @@ -55,7 +55,7 @@ func createApplication( return newLocal(log, pathConfigFile, c.Management) case fleetMode: log.Info("Agent is managed by Fleet") - return newManaged(log, c.Management) + return newManaged(log, config) default: return nil, ErrInvalidMgmtMode } diff --git a/x-pack/agent/pkg/agent/application/emitter.go b/x-pack/agent/pkg/agent/application/emitter.go index fabeec4b694b..f74f3a058bd2 100644 --- a/x-pack/agent/pkg/agent/application/emitter.go +++ b/x-pack/agent/pkg/agent/application/emitter.go @@ -18,12 +18,7 @@ import ( type decoratorFunc = func(string, *transpiler.AST, []program.Program) ([]program.Program, error) func emitter(log *logger.Logger, router *router, decorators ...decoratorFunc) emitterFunc { - return func(files []string) error { - c, err := config.LoadFiles(files...) - if err != nil { - return errors.Wrap(err, "could not load or merge configuration") - } - + return func(c *config.Config) error { log.Debug("Transforming configuration into a tree") m, err := c.ToMapStr() if err != nil { @@ -55,3 +50,12 @@ func emitter(log *logger.Logger, router *router, decorators ...decoratorFunc) em return router.Dispatch(ast.HashStr(), programsToRun) } } + +func readfiles(files []string, emitter emitterFunc) error { + c, err := config.LoadFiles(files...) + if err != nil { + return errors.Wrap(err, "could not load or merge configuration") + } + + return emitter(c) +} diff --git a/x-pack/agent/pkg/agent/application/fleet_gateway.go b/x-pack/agent/pkg/agent/application/fleet_gateway.go new file mode 100644 index 000000000000..197dc6f64932 --- /dev/null +++ b/x-pack/agent/pkg/agent/application/fleet_gateway.go @@ -0,0 +1,118 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package application + +import ( + "time" + + "github.com/elastic/beats/x-pack/agent/pkg/core/logger" + "github.com/elastic/beats/x-pack/agent/pkg/fleetapi" + "github.com/elastic/beats/x-pack/agent/pkg/scheduler" +) + +type dispatcher interface { + Dispatch(...action) error +} + +// fleetGateway is a gateway between the Agent and the Fleet API, it's take cares of all the +// bidirectionnal commmmunication requirements. The gateway aggregates events and will periodically +// call the API to send the events and will receive actions to be executed locally. +// The only supported action for now is a "ActionPolicyChange". +type fleetGateway struct { + log *logger.Logger + dispatcher dispatcher + client clienter + scheduler scheduler.Scheduler + agentID string + done chan struct{} +} + +type fleetGatewaySettings struct { + Duration time.Duration +} + +func newFleetGateway( + log *logger.Logger, + settings *fleetGatewaySettings, + agentID string, + client clienter, + d dispatcher, +) (*fleetGateway, error) { + scheduler := scheduler.NewPeriodic(settings.Duration) + return newFleetGatewayWithScheduler( + log, + settings, + agentID, + client, + d, + scheduler, + ) +} + +func newFleetGatewayWithScheduler( + log *logger.Logger, + settings *fleetGatewaySettings, + agentID string, + client clienter, + d dispatcher, + scheduler scheduler.Scheduler, +) (*fleetGateway, error) { + return &fleetGateway{ + log: log, + dispatcher: d, + client: client, + agentID: agentID, //TODO(ph): this need to be a struct. + scheduler: scheduler, + done: make(chan struct{}), + }, nil +} + +func (f *fleetGateway) worker() { + for { + select { + case <-f.scheduler.WaitTick(): + f.log.Debug("FleetGateway calling Checkin API") + resp, err := f.execute() + if err != nil { + f.log.Error(err) + continue + } + + actions := make([]action, len(resp.Actions)) + for idx, a := range resp.Actions { + actions[idx] = a + } + + if err := f.dispatcher.Dispatch(actions...); err != nil { + f.log.Error(err) + } + + f.log.Debug("FleetGateway sleeping") + case <-f.done: + return + } + } +} + +func (f *fleetGateway) execute() (*fleetapi.CheckinResponse, error) { + // TODO(ph): Aggregates and send events. + cmd := fleetapi.NewCheckinCmd(f.agentID, f.client) + + req := &fleetapi.CheckinRequest{} + resp, err := cmd.Execute(req) + if err != nil { + return nil, err + } + + return resp, nil +} + +func (f *fleetGateway) Start() { + go f.worker() +} + +func (f *fleetGateway) Stop() { + close(f.done) +} diff --git a/x-pack/agent/pkg/agent/application/fleet_gateway_test.go b/x-pack/agent/pkg/agent/application/fleet_gateway_test.go new file mode 100644 index 000000000000..db6e299a5539 --- /dev/null +++ b/x-pack/agent/pkg/agent/application/fleet_gateway_test.go @@ -0,0 +1,250 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package application + +import ( + "bytes" + "fmt" + "io" + "io/ioutil" + "net/http" + "net/url" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/x-pack/agent/pkg/core/logger" + "github.com/elastic/beats/x-pack/agent/pkg/scheduler" +) + +type clientCallbackFunc func(headers http.Header, body io.Reader) (*http.Response, error) + +type testingClient struct { + sync.Mutex + callback clientCallbackFunc + received chan struct{} +} + +func (t *testingClient) Send( + method string, + path string, + params url.Values, + headers http.Header, + body io.Reader, +) (*http.Response, error) { + t.Lock() + defer t.Unlock() + defer func() { t.received <- struct{}{} }() + return t.callback(headers, body) +} + +func (t *testingClient) Answer(fn clientCallbackFunc) <-chan struct{} { + t.Lock() + defer t.Unlock() + t.callback = fn + return t.received +} + +func newTestingClient() *testingClient { + return &testingClient{received: make(chan struct{})} +} + +type testingDispatcherFunc func(...action) error + +type testingDispatcher struct { + sync.Mutex + callback testingDispatcherFunc + received chan struct{} +} + +func (t *testingDispatcher) Dispatch(actions ...action) error { + t.Lock() + defer t.Unlock() + defer func() { t.received <- struct{}{} }() + return t.callback(actions...) +} + +func (t *testingDispatcher) Answer(fn testingDispatcherFunc) <-chan struct{} { + t.Lock() + defer t.Unlock() + t.callback = fn + return t.received +} + +func newTestingDispatcher() *testingDispatcher { + return &testingDispatcher{received: make(chan struct{})} +} + +type withGatewayFunc func(*testing.T, *fleetGateway, *testingClient, *testingDispatcher, *scheduler.Stepper) + +func withGateway(agentID string, fn withGatewayFunc) func(t *testing.T) { + return func(t *testing.T) { + scheduler := scheduler.NewStepper() + client := newTestingClient() + dispatcher := newTestingDispatcher() + + log, _ := logger.New() + + gateway, err := newFleetGatewayWithScheduler( + log, + &fleetGatewaySettings{}, + agentID, + client, + dispatcher, + scheduler, + ) + + go gateway.Start() + defer gateway.Stop() + + require.NoError(t, err) + + fn(t, gateway, client, dispatcher, scheduler) + } +} + +func ackSeq(channels ...<-chan struct{}) <-chan struct{} { + comm := make(chan struct{}) + go func(comm chan struct{}) { + for _, c := range channels { + <-c + } + comm <- struct{}{} + }(comm) + return comm +} + +func wrapStrToResp(code int, body string) *http.Response { + return &http.Response{ + Status: fmt.Sprintf("%d %s", code, http.StatusText(code)), + StatusCode: code, + Proto: "HTTP/1.1", + ProtoMajor: 1, + ProtoMinor: 1, + Body: ioutil.NopCloser(bytes.NewBufferString(body)), + ContentLength: int64(len(body)), + Header: make(http.Header, 0), + } +} + +func TestFleetGateway(t *testing.T) { + agentID := "agent-secret" + t.Run("send no event and receive no action", withGateway(agentID, func( + t *testing.T, + gateway *fleetGateway, + client *testingClient, + dispatcher *testingDispatcher, + scheduler *scheduler.Stepper, + ) { + received := ackSeq( + client.Answer(func(headers http.Header, body io.Reader) (*http.Response, error) { + // TODO: assert no events + resp := wrapStrToResp(http.StatusOK, `{ "actions": [], "success": true }`) + return resp, nil + }), + dispatcher.Answer(func(actions ...action) error { + require.Equal(t, 0, len(actions)) + return nil + }), + ) + + // Synchronize scheduler and acking of calls from the worker go routine. + scheduler.Next() + <-received + })) + + t.Run("Successfully connects and receives a series of actions", withGateway(agentID, func( + t *testing.T, + gateway *fleetGateway, + client *testingClient, + dispatcher *testingDispatcher, + scheduler *scheduler.Stepper, + ) { + received := ackSeq( + client.Answer(func(headers http.Header, body io.Reader) (*http.Response, error) { + // TODO: assert no events + resp := wrapStrToResp(http.StatusOK, ` +{ + "actions": [ + { + "type": "POLICY_CHANGE", + "id": "id1", + "data": { + "policy": { + "id": "policy-id" + } + } + }, + { + "type": "ANOTHER_ACTION", + "id": "id2" + } + ], + "success": true +} +`) + return resp, nil + }), + dispatcher.Answer(func(actions ...action) error { + require.Equal(t, 2, len(actions)) + return nil + }), + ) + + scheduler.Next() + <-received + })) + + // Test the normal time based execution. + t.Run("Periodically communicates with Fleet", func(t *testing.T) { + scheduler := scheduler.NewPeriodic(1 * time.Second) + client := newTestingClient() + dispatcher := newTestingDispatcher() + + log, _ := logger.New() + gateway, err := newFleetGatewayWithScheduler( + log, + &fleetGatewaySettings{}, + agentID, + client, + dispatcher, + scheduler, + ) + + go gateway.Start() + defer gateway.Stop() + + require.NoError(t, err) + + var count int + for { + received := ackSeq( + client.Answer(func(headers http.Header, body io.Reader) (*http.Response, error) { + // TODO: assert no events + resp := wrapStrToResp(http.StatusOK, `{ "actions": [], "success": true }`) + return resp, nil + }), + dispatcher.Answer(func(actions ...action) error { + require.Equal(t, 0, len(actions)) + return nil + }), + ) + + <-received + count++ + if count == 5 { + return + } + } + }) + + t.Run("Successfully connects and sends events back to fleet", skip) +} + +func skip(t *testing.T) { + t.SkipNow() +} diff --git a/x-pack/agent/pkg/agent/application/handler_action_policy_change.go b/x-pack/agent/pkg/agent/application/handler_action_policy_change.go new file mode 100644 index 000000000000..55f12c6f1289 --- /dev/null +++ b/x-pack/agent/pkg/agent/application/handler_action_policy_change.go @@ -0,0 +1,37 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package application + +import ( + "fmt" + + "github.com/pkg/errors" + + "github.com/elastic/beats/x-pack/agent/pkg/config" + "github.com/elastic/beats/x-pack/agent/pkg/core/logger" + "github.com/elastic/beats/x-pack/agent/pkg/fleetapi" +) + +type handlerPolicyChange struct { + log *logger.Logger + emitter emitterFunc +} + +func (h *handlerPolicyChange) Handle(a action) error { + h.log.Debugf("HandlerPolicyChange: action '%+v' received", a) + action, ok := a.(*fleetapi.ActionPolicyChange) + if !ok { + return fmt.Errorf("invalid type, expected ActionPolicyChange and received %T", a) + } + + c, err := config.NewConfigFrom(action.Policy) + if err != nil { + return errors.Wrap(err, "could not parse the configuration from the policy") + } + + h.log.Debug("HandlerPolicyChange: emit configuration") + + return h.emitter(c) +} diff --git a/x-pack/agent/pkg/agent/application/handler_action_policy_change_test.go b/x-pack/agent/pkg/agent/application/handler_action_policy_change_test.go new file mode 100644 index 000000000000..0948c2723ecd --- /dev/null +++ b/x-pack/agent/pkg/agent/application/handler_action_policy_change_test.go @@ -0,0 +1,61 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package application + +import ( + "testing" + + "github.com/pkg/errors" + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/x-pack/agent/pkg/config" + "github.com/elastic/beats/x-pack/agent/pkg/core/logger" + "github.com/elastic/beats/x-pack/agent/pkg/fleetapi" +) + +type mockEmitter struct { + err error + policy *config.Config +} + +func (m *mockEmitter) Emitter(policy *config.Config) error { + m.policy = policy + return m.err +} + +func TestPolicyChange(t *testing.T) { + log, _ := logger.New() + t.Run("Receive a policy change and successfully emits a raw configuration", func(t *testing.T) { + emitter := &mockEmitter{} + + policy := map[string]interface{}{"hello": "world"} + action := &fleetapi.ActionPolicyChange{ + ActionBase: &fleetapi.ActionBase{ActionID: "abc123", ActionType: "POLICY_CHANGE"}, + Policy: policy, + } + + handler := &handlerPolicyChange{log: log, emitter: emitter.Emitter} + + err := handler.Handle(action) + require.NoError(t, err) + require.Equal(t, config.MustNewConfigFrom(policy), emitter.policy) + }) + + t.Run("Receive a policy and fail to emits a raw configuration", func(t *testing.T) { + mockErr := errors.New("error returned") + emitter := &mockEmitter{err: mockErr} + + policy := map[string]interface{}{"hello": "world"} + action := &fleetapi.ActionPolicyChange{ + ActionBase: &fleetapi.ActionBase{ActionID: "abc123", ActionType: "POLICY_CHANGE"}, + Policy: policy, + } + + handler := &handlerPolicyChange{log: log, emitter: emitter.Emitter} + + err := handler.Handle(action) + require.Error(t, err) + }) +} diff --git a/x-pack/agent/pkg/agent/application/handler_default.go b/x-pack/agent/pkg/agent/application/handler_default.go new file mode 100644 index 000000000000..2dbd40b0b573 --- /dev/null +++ b/x-pack/agent/pkg/agent/application/handler_default.go @@ -0,0 +1,16 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package application + +import "github.com/elastic/beats/x-pack/agent/pkg/core/logger" + +type handlerDefault struct { + log *logger.Logger +} + +func (h *handlerDefault) Handle(a action) error { + h.log.Errorf("HandlerDefault: action '%+v' received", a) + return nil +} diff --git a/x-pack/agent/pkg/agent/application/handler_unknown.go b/x-pack/agent/pkg/agent/application/handler_unknown.go new file mode 100644 index 000000000000..e1ac3723225a --- /dev/null +++ b/x-pack/agent/pkg/agent/application/handler_unknown.go @@ -0,0 +1,18 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package application + +import ( + "github.com/elastic/beats/x-pack/agent/pkg/core/logger" +) + +type handlerUnknown struct { + log *logger.Logger +} + +func (h *handlerUnknown) Handle(a action) error { + h.log.Errorf("HandlerUnknown: action '%+v' received", a) + return nil +} diff --git a/x-pack/agent/pkg/agent/application/local_mode.go b/x-pack/agent/pkg/agent/application/local_mode.go index 98fe2315a0ff..cc90c454bd18 100644 --- a/x-pack/agent/pkg/agent/application/local_mode.go +++ b/x-pack/agent/pkg/agent/application/local_mode.go @@ -15,7 +15,7 @@ import ( logreporter "github.com/elastic/beats/x-pack/agent/pkg/reporter/log" ) -type emitterFunc func([]string) error +type emitterFunc func(*config.Config) error // ConfigHandler is capable of handling config and perform actions at it. type ConfigHandler interface { diff --git a/x-pack/agent/pkg/agent/application/managed_mode.go b/x-pack/agent/pkg/agent/application/managed_mode.go index 875735ce2940..e71e9a39f471 100644 --- a/x-pack/agent/pkg/agent/application/managed_mode.go +++ b/x-pack/agent/pkg/agent/application/managed_mode.go @@ -9,18 +9,21 @@ import ( "net/http" "net/url" + "time" + "github.com/pkg/errors" "github.com/elastic/beats/x-pack/agent/pkg/agent/storage" "github.com/elastic/beats/x-pack/agent/pkg/config" "github.com/elastic/beats/x-pack/agent/pkg/core/logger" "github.com/elastic/beats/x-pack/agent/pkg/fleetapi" - reporting "github.com/elastic/beats/x-pack/agent/pkg/reporter" fleetreporter "github.com/elastic/beats/x-pack/agent/pkg/reporter/fleet" logreporter "github.com/elastic/beats/x-pack/agent/pkg/reporter/log" ) +var durationTick = 10 * time.Second + type apiClient interface { Send( method string, @@ -34,9 +37,11 @@ type apiClient interface { // Managed application, when the application is run in managed mode, most of the configuration are // coming from the Fleet App. type Managed struct { - log *logger.Logger - Config FleetAgentConfig - api apiClient + log *logger.Logger + Config FleetAgentConfig + api apiClient + agentID string + gateway *fleetGateway } func newManaged( @@ -60,6 +65,8 @@ func newManaged( return nil, errors.Wrapf(err, "fail to read configuration %s for the agent", path) } + rawConfig.Merge(config) + cfg := defaultFleetAgentConfig() if err := config.Unpack(cfg); err != nil { return nil, errors.Wrapf(err, "fail to unpack configuration from %s", path) @@ -75,27 +82,57 @@ func newManaged( return nil, errors.Wrap(err, "fail to create reporters") } - // TODO(michal, ph) Link router with configuration - _, err = newRouter(log, streamFactory(config, client, reporter)) + router, err := newRouter(log, streamFactory(rawConfig, client, reporter)) if err != nil { return nil, errors.Wrap(err, "fail to initialize pipeline router") } + emit := emitter(log, router) + + actionDispatcher, err := newActionDispatcher(log, &handlerDefault{log: log}) + if err != nil { + return nil, err + } + + actionDispatcher.MustRegister( + &fleetapi.ActionPolicyChange{}, + &handlerPolicyChange{log: log, emitter: emit}, + ) + + actionDispatcher.MustRegister( + &fleetapi.ActionUnknown{}, + &handlerUnknown{log: log}, + ) + + gateway, err := newFleetGateway( + log, + &fleetGatewaySettings{Duration: durationTick}, + agentID, + client, + actionDispatcher, + ) + if err != nil { + return nil, err + } + return &Managed{ - log: log, - api: client, + log: log, + agentID: agentID, + gateway: gateway, }, nil } // Start starts a managed agent. func (m *Managed) Start() error { m.log.Info("Agent is starting") - defer m.log.Info("Agent is stopped") + m.gateway.Start() return nil } // Stop stops a managed agent. func (m *Managed) Stop() error { + defer m.log.Info("Agent is stopped") + m.gateway.Stop() return nil } diff --git a/x-pack/agent/pkg/agent/application/once.go b/x-pack/agent/pkg/agent/application/once.go index c88c942f3b45..70f9bf026ab0 100644 --- a/x-pack/agent/pkg/agent/application/once.go +++ b/x-pack/agent/pkg/agent/application/once.go @@ -30,7 +30,7 @@ func (o *once) Start() error { return ErrNoConfiguration } - return o.emitter(files) + return readfiles(files, o.emitter) } func (o *once) Stop() error { diff --git a/x-pack/agent/pkg/agent/application/periodic.go b/x-pack/agent/pkg/agent/application/periodic.go index ee954672cfcb..13979e932ea8 100644 --- a/x-pack/agent/pkg/agent/application/periodic.go +++ b/x-pack/agent/pkg/agent/application/periodic.go @@ -83,7 +83,7 @@ func (p *periodic) work() error { p.log.Debugf("Unchanged %d files: %s", len(s.Unchanged), strings.Join(s.Updated, ", ")) } - err := p.emitter(files) + err := readfiles(files, p.emitter) if err != nil { // assume something when really wrong and invalidate any cache // so we get a full new config on next tick. diff --git a/x-pack/agent/pkg/agent/application/router.go b/x-pack/agent/pkg/agent/application/router.go index ddb3b6ad7917..603b960e85d4 100644 --- a/x-pack/agent/pkg/agent/application/router.go +++ b/x-pack/agent/pkg/agent/application/router.go @@ -59,7 +59,7 @@ func (r *router) Dispatch(id string, grpProg map[routingKey][]program.Program) e p, ok := r.routes.Get(rk) var err error if !ok { - r.log.Debugf("Creating stream %s", rk) + r.log.Debugf("Creating stream: %s", rk) p, err = r.streamFactory(r.log, rk) if err != nil { return err diff --git a/x-pack/agent/pkg/scheduler/scheduler.go b/x-pack/agent/pkg/scheduler/scheduler.go new file mode 100644 index 000000000000..c73191bafa15 --- /dev/null +++ b/x-pack/agent/pkg/scheduler/scheduler.go @@ -0,0 +1,73 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package scheduler + +import "time" + +// Scheduler simple interface that encapsulate the scheduling logic, this is useful if you want to +// test asynchronous code in a synchronous way. +type Scheduler interface { + WaitTick() <-chan time.Time + Stop() +} + +// Stepper is a scheduler where each Tick is manully triggered, this is useful in scenario +// when you want to test the behavior of asynchronous code in a synchronous way. +type Stepper struct { + C chan time.Time +} + +// Next trigger the WaitTick unblock manually. +func (s *Stepper) Next() { + s.C <- time.Now() +} + +// WaitTick returns a channel to watch for ticks. +func (s *Stepper) WaitTick() <-chan time.Time { + return s.C +} + +// Stop is stopping the scheduler, in the case of the Stepper scheduler nothing is done. +func (s *Stepper) Stop() {} + +// NewStepper returns a new Stepper scheduler where the tick is manually controlled. +func NewStepper() *Stepper { + return &Stepper{ + C: make(chan time.Time), + } +} + +// Periodic wraps a time.Timer as the scheduler. +type Periodic struct { + Ticker *time.Ticker + ran bool +} + +// NewPeriodic returns a Periodic scheduler that will unblock the WaitTick based on a duration. +// The timer will do an initial tick, sleep for the defined period and tick again. +func NewPeriodic(d time.Duration) *Periodic { + return &Periodic{Ticker: time.NewTicker(d)} +} + +// WaitTick wait on the duration to be experied to unblock the channel. +// Note: you should not keep a reference to the channel. +func (p *Periodic) WaitTick() <-chan time.Time { + if p.ran { + return p.Ticker.C + } + + rC := make(chan time.Time, 1) + rC <- time.Now() + p.ran = true + + return rC +} + +// Stop stops the internal Ticker. +// Note this will not close the internal channel is up to the developer to unblock the goroutine +// using another mechanism. +func (p *Periodic) Stop() { + p.Ticker.Stop() +} diff --git a/x-pack/agent/pkg/scheduler/scheduler_test.go b/x-pack/agent/pkg/scheduler/scheduler_test.go new file mode 100644 index 000000000000..291ce75aa5d3 --- /dev/null +++ b/x-pack/agent/pkg/scheduler/scheduler_test.go @@ -0,0 +1,107 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package scheduler + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +type e struct { + count int + at time.Time +} + +type tickRecorder struct { + scheduler Scheduler + count int + done chan struct{} + recorder chan e +} + +func (m *tickRecorder) Start() { + for { + select { + case t := <-m.scheduler.WaitTick(): + m.count = m.count + 1 + m.recorder <- e{count: m.count, at: t} + case <-m.done: + return + } + } +} + +func (m *tickRecorder) Stop() { + close(m.done) +} + +func TestScheduler(t *testing.T) { + t.Run("Step scheduler", testStepScheduler) +} + +func newTickRecorder(scheduler Scheduler) *tickRecorder { + return &tickRecorder{ + scheduler: scheduler, + done: make(chan struct{}), + recorder: make(chan e), + } +} + +func testStepScheduler(t *testing.T) { + t.Run("Trigger the Tick manually", func(t *testing.T) { + scheduler := NewStepper() + defer scheduler.Stop() + + recorder := newTickRecorder(scheduler) + go recorder.Start() + defer recorder.Stop() + + scheduler.Next() + nE := <-recorder.recorder + require.Equal(t, 1, nE.count) + scheduler.Next() + nE = <-recorder.recorder + require.Equal(t, 2, nE.count) + scheduler.Next() + nE = <-recorder.recorder + require.Equal(t, 3, nE.count) + }) +} + +func testPeriodic(t *testing.T) { + t.Run("tick than wait", func(t *testing.T) { + duration := 1 * time.Minute + scheduler := NewPeriodic(duration) + defer scheduler.Stop() + + startedAt := time.Now() + recorder := newTickRecorder(scheduler) + go recorder.Start() + defer recorder.Stop() + + nE := <-recorder.recorder + + require.True(t, nE.at.Sub(startedAt) < duration) + }) + + t.Run("multiple tick", func(t *testing.T) { + duration := 1 * time.Millisecond + scheduler := NewPeriodic(duration) + defer scheduler.Stop() + + recorder := newTickRecorder(scheduler) + go recorder.Start() + defer recorder.Stop() + + nE := <-recorder.recorder + require.Equal(t, 1, nE.count) + nE = <-recorder.recorder + require.Equal(t, 2, nE.count) + nE = <-recorder.recorder + require.Equal(t, 3, nE.count) + }) +}