From d72ffeb9ccbe4955b71b7950f13b37432e6666c6 Mon Sep 17 00:00:00 2001 From: Pier-Hugues Pellerin Date: Mon, 18 Nov 2019 15:52:36 -0500 Subject: [PATCH 1/2] Refactor: Rename pipeline into stream To align the code base with the multiple output strategy we will rename the pipeline concept into the streams. See #14445 --- .../pkg/agent/application/application.go | 2 +- x-pack/agent/pkg/agent/application/router.go | 32 +++++++++---------- .../pkg/agent/application/router_test.go | 24 +++++++------- .../application/{pipeline.go => stream.go} | 12 +++---- 4 files changed, 35 insertions(+), 35 deletions(-) rename x-pack/agent/pkg/agent/application/{pipeline.go => stream.go} (85%) diff --git a/x-pack/agent/pkg/agent/application/application.go b/x-pack/agent/pkg/agent/application/application.go index e2baf4ce81b4..178fe6dc56ba 100644 --- a/x-pack/agent/pkg/agent/application/application.go +++ b/x-pack/agent/pkg/agent/application/application.go @@ -63,7 +63,7 @@ func createApplication( return nil, err } - router, err := newRouter(log, pipelineFactory(config, client, reporter)) + router, err := newRouter(log, streamFactory(config, client, reporter)) if err != nil { return nil, errors.Wrap(err, "initiating application") } diff --git a/x-pack/agent/pkg/agent/application/router.go b/x-pack/agent/pkg/agent/application/router.go index 1114732b2e85..ddb3b6ad7917 100644 --- a/x-pack/agent/pkg/agent/application/router.go +++ b/x-pack/agent/pkg/agent/application/router.go @@ -18,20 +18,20 @@ var defautlRK = "DEFAULT" type routingKey = string -type pipeline interface { +type stream interface { Execute(*configRequest) error Close() error } -type pipelineFunc func(*logger.Logger, routingKey) (pipeline, error) +type streamFunc func(*logger.Logger, routingKey) (stream, error) type router struct { - log *logger.Logger - routes *sorted.Set - pipelineFactory pipelineFunc + log *logger.Logger + routes *sorted.Set + streamFactory streamFunc } -func newRouter(log *logger.Logger, factory pipelineFunc) (*router, error) { +func newRouter(log *logger.Logger, factory streamFunc) (*router, error) { var err error if log == nil { log, err = logger.New() @@ -39,7 +39,7 @@ func newRouter(log *logger.Logger, factory pipelineFunc) (*router, error) { return nil, err } } - return &router{log: log, pipelineFactory: factory, routes: sorted.NewSet()}, nil + return &router{log: log, streamFactory: factory, routes: sorted.NewSet()}, nil } func (r *router) Dispatch(id string, grpProg map[routingKey][]program.Program) error { @@ -54,13 +54,13 @@ func (r *router) Dispatch(id string, grpProg map[routingKey][]program.Program) e for _, rk := range s.Keys() { active[rk] = true - // Are we already runnings this pipeline? - // If we don't we create it otherwise we just forward the config request. + // Are we already runnings this streams? + // When it doesn't exist we just create it, if it already exist we forward the configuration. p, ok := r.routes.Get(rk) var err error if !ok { - r.log.Debugf("Creating pipeline %s", rk) - p, err = r.pipelineFactory(r.log, rk) + r.log.Debugf("Creating stream %s", rk) + p, err = r.streamFactory(r.log, rk) if err != nil { return err } @@ -78,20 +78,20 @@ func (r *router) Dispatch(id string, grpProg map[routingKey][]program.Program) e } r.log.Debugf( - "Pipeline %s need to run config with ID %s and programs: %s", + "Streams %s need to run config with ID %s and programs: %s", rk, req.ShortID(), strings.Join(req.ProgramNames(), ", "), ) - err = p.(pipeline).Execute(req) + err = p.(stream).Execute(req) if err != nil { return err } } - // cleanup inactive pipelines. - // pipelines are shutdown down in alphabetical order. + // cleanup inactive streams. + // streams are shutdown down in alphabetical order. keys := r.routes.Keys() for _, k := range keys { _, ok := active[k] @@ -104,7 +104,7 @@ func (r *router) Dispatch(id string, grpProg map[routingKey][]program.Program) e continue } - p.(pipeline).Close() + p.(stream).Close() r.routes.Remove(k) } diff --git a/x-pack/agent/pkg/agent/application/router_test.go b/x-pack/agent/pkg/agent/application/router_test.go index 9da7d2524959..294ac682d0f7 100644 --- a/x-pack/agent/pkg/agent/application/router_test.go +++ b/x-pack/agent/pkg/agent/application/router_test.go @@ -44,7 +44,7 @@ type notifyFunc func(routingKey, rOp, ...interface{}) func TestRouter(t *testing.T) { programs := []program.Program{program.Program{Spec: program.Supported[1]}} - t.Run("create new and destroy unused pipeline", func(t *testing.T) { + t.Run("create new and destroy unused stream", func(t *testing.T) { recorder := &recorder{} r, err := newRouter(nil, recorder.factory) require.NoError(t, err) @@ -71,7 +71,7 @@ func TestRouter(t *testing.T) { }, recorder.events) }) - t.Run("multiples create new and destroy unused pipeline", func(t *testing.T) { + t.Run("multiples create new and destroy unused stream", func(t *testing.T) { k1 := "KEY_1" k2 := "KEY_2" @@ -112,7 +112,7 @@ func TestRouter(t *testing.T) { }, recorder.events) }) - t.Run("create new and delegate program to existing pipeline", func(t *testing.T) { + t.Run("create new and delegate program to existing stream", func(t *testing.T) { recorder := &recorder{} r, err := newRouter(nil, recorder.factory) require.NoError(t, err) @@ -136,7 +136,7 @@ func TestRouter(t *testing.T) { }, recorder.events) }) - t.Run("when no pipelines are detected we shutdown all the running pipelines", func(t *testing.T) { + t.Run("when no stream are detected we shutdown all the running streams", func(t *testing.T) { k1 := "KEY_1" k2 := "KEY_2" @@ -174,8 +174,8 @@ type recorder struct { events []event } -func (r *recorder) factory(_ *logger.Logger, rk routingKey) (pipeline, error) { - return newMockPipeline(rk, r.notify), nil +func (r *recorder) factory(_ *logger.Logger, rk routingKey) (stream, error) { + return newMockStream(rk, r.notify), nil } func (r *recorder) notify(rk routingKey, op rOp, args ...interface{}) { @@ -186,30 +186,30 @@ func (r *recorder) reset() { r.events = nil } -type mockPipeline struct { +type mockStream struct { rk routingKey notify notifyFunc } -func newMockPipeline(rk routingKey, notify notifyFunc) *mockPipeline { +func newMockStream(rk routingKey, notify notifyFunc) *mockStream { notify(rk, createOp) - return &mockPipeline{ + return &mockStream{ rk: rk, notify: notify, } } -func (m *mockPipeline) Execute(req *configRequest) error { +func (m *mockStream) Execute(req *configRequest) error { m.event(executeOp, req) return nil } -func (m *mockPipeline) Close() error { +func (m *mockStream) Close() error { m.event(closeOp) return nil } -func (m *mockPipeline) event(op rOp, args ...interface{}) { +func (m *mockStream) event(op rOp, args ...interface{}) { m.notify(m.rk, op, args...) } diff --git a/x-pack/agent/pkg/agent/application/pipeline.go b/x-pack/agent/pkg/agent/application/stream.go similarity index 85% rename from x-pack/agent/pkg/agent/application/pipeline.go rename to x-pack/agent/pkg/agent/application/stream.go index ff8239fe7ee0..7b3e0044a697 100644 --- a/x-pack/agent/pkg/agent/application/pipeline.go +++ b/x-pack/agent/pkg/agent/application/stream.go @@ -40,28 +40,28 @@ type sender interface { ) (*http.Response, error) } -type operatorPipeline struct { +type operatorStream struct { configHandler ConfigHandler log *logger.Logger } -func (b *operatorPipeline) Close() error { +func (b *operatorStream) Close() error { return nil } -func (b *operatorPipeline) Execute(cfg *configRequest) error { +func (b *operatorStream) Execute(cfg *configRequest) error { return b.configHandler.HandleConfig(cfg) } -func pipelineFactory(cfg *config.Config, client sender, r reporter) func(*logger.Logger, routingKey) (pipeline, error) { - return func(log *logger.Logger, id routingKey) (pipeline, error) { +func streamFactory(cfg *config.Config, client sender, r reporter) func(*logger.Logger, routingKey) (stream, error) { + return func(log *logger.Logger, id routingKey) (stream, error) { // new operator per pipeline to isolate processes without using tags operator, err := newOperator(log, cfg, r) if err != nil { return nil, err } - return &operatorPipeline{ + return &operatorStream{ log: log, configHandler: operator, }, nil From 3c80ec95cb155026d2291c65350e2b6adc31347c Mon Sep 17 00:00:00 2001 From: Pier-Hugues Pellerin Date: Mon, 25 Nov 2019 12:06:48 -0500 Subject: [PATCH 2/2] fix comment --- x-pack/agent/pkg/agent/application/stream.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/agent/pkg/agent/application/stream.go b/x-pack/agent/pkg/agent/application/stream.go index 7b3e0044a697..37e0cef229e8 100644 --- a/x-pack/agent/pkg/agent/application/stream.go +++ b/x-pack/agent/pkg/agent/application/stream.go @@ -55,7 +55,7 @@ func (b *operatorStream) Execute(cfg *configRequest) error { func streamFactory(cfg *config.Config, client sender, r reporter) func(*logger.Logger, routingKey) (stream, error) { return func(log *logger.Logger, id routingKey) (stream, error) { - // new operator per pipeline to isolate processes without using tags + // new operator per stream to isolate processes without using tags operator, err := newOperator(log, cfg, r) if err != nil { return nil, err