Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion x-pack/agent/pkg/agent/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func createApplication(
return nil, err
}

router, err := newRouter(log, pipelineFactory(config, client, reporter))
router, err := newRouter(log, streamFactory(config, client, reporter))
if err != nil {
return nil, errors.Wrap(err, "initiating application")
}
Expand Down
32 changes: 16 additions & 16 deletions x-pack/agent/pkg/agent/application/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,28 @@ var defautlRK = "DEFAULT"

type routingKey = string

type pipeline interface {
type stream interface {
Execute(*configRequest) error
Close() error
}

type pipelineFunc func(*logger.Logger, routingKey) (pipeline, error)
type streamFunc func(*logger.Logger, routingKey) (stream, error)

type router struct {
log *logger.Logger
routes *sorted.Set
pipelineFactory pipelineFunc
log *logger.Logger
routes *sorted.Set
streamFactory streamFunc
}

func newRouter(log *logger.Logger, factory pipelineFunc) (*router, error) {
func newRouter(log *logger.Logger, factory streamFunc) (*router, error) {
var err error
if log == nil {
log, err = logger.New()
if err != nil {
return nil, err
}
}
return &router{log: log, pipelineFactory: factory, routes: sorted.NewSet()}, nil
return &router{log: log, streamFactory: factory, routes: sorted.NewSet()}, nil
}

func (r *router) Dispatch(id string, grpProg map[routingKey][]program.Program) error {
Expand All @@ -54,13 +54,13 @@ func (r *router) Dispatch(id string, grpProg map[routingKey][]program.Program) e
for _, rk := range s.Keys() {
active[rk] = true

// Are we already runnings this pipeline?
// If we don't we create it otherwise we just forward the config request.
// Are we already runnings this streams?
// When it doesn't exist we just create it, if it already exist we forward the configuration.
p, ok := r.routes.Get(rk)
var err error
if !ok {
r.log.Debugf("Creating pipeline %s", rk)
p, err = r.pipelineFactory(r.log, rk)
r.log.Debugf("Creating stream %s", rk)
p, err = r.streamFactory(r.log, rk)
if err != nil {
return err
}
Expand All @@ -78,20 +78,20 @@ func (r *router) Dispatch(id string, grpProg map[routingKey][]program.Program) e
}

r.log.Debugf(
"Pipeline %s need to run config with ID %s and programs: %s",
"Streams %s need to run config with ID %s and programs: %s",
rk,
req.ShortID(),
strings.Join(req.ProgramNames(), ", "),
)

err = p.(pipeline).Execute(req)
err = p.(stream).Execute(req)
if err != nil {
return err
}
}

// cleanup inactive pipelines.
// pipelines are shutdown down in alphabetical order.
// cleanup inactive streams.
// streams are shutdown down in alphabetical order.
keys := r.routes.Keys()
for _, k := range keys {
_, ok := active[k]
Expand All @@ -104,7 +104,7 @@ func (r *router) Dispatch(id string, grpProg map[routingKey][]program.Program) e
continue
}

p.(pipeline).Close()
p.(stream).Close()
r.routes.Remove(k)
}

Expand Down
24 changes: 12 additions & 12 deletions x-pack/agent/pkg/agent/application/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type notifyFunc func(routingKey, rOp, ...interface{})
func TestRouter(t *testing.T) {
programs := []program.Program{program.Program{Spec: program.Supported[1]}}

t.Run("create new and destroy unused pipeline", func(t *testing.T) {
t.Run("create new and destroy unused stream", func(t *testing.T) {
recorder := &recorder{}
r, err := newRouter(nil, recorder.factory)
require.NoError(t, err)
Expand All @@ -71,7 +71,7 @@ func TestRouter(t *testing.T) {
}, recorder.events)
})

t.Run("multiples create new and destroy unused pipeline", func(t *testing.T) {
t.Run("multiples create new and destroy unused stream", func(t *testing.T) {
k1 := "KEY_1"
k2 := "KEY_2"

Expand Down Expand Up @@ -112,7 +112,7 @@ func TestRouter(t *testing.T) {
}, recorder.events)
})

t.Run("create new and delegate program to existing pipeline", func(t *testing.T) {
t.Run("create new and delegate program to existing stream", func(t *testing.T) {
recorder := &recorder{}
r, err := newRouter(nil, recorder.factory)
require.NoError(t, err)
Expand All @@ -136,7 +136,7 @@ func TestRouter(t *testing.T) {
}, recorder.events)
})

t.Run("when no pipelines are detected we shutdown all the running pipelines", func(t *testing.T) {
t.Run("when no stream are detected we shutdown all the running streams", func(t *testing.T) {
k1 := "KEY_1"
k2 := "KEY_2"

Expand Down Expand Up @@ -174,8 +174,8 @@ type recorder struct {
events []event
}

func (r *recorder) factory(_ *logger.Logger, rk routingKey) (pipeline, error) {
return newMockPipeline(rk, r.notify), nil
func (r *recorder) factory(_ *logger.Logger, rk routingKey) (stream, error) {
return newMockStream(rk, r.notify), nil
}

func (r *recorder) notify(rk routingKey, op rOp, args ...interface{}) {
Expand All @@ -186,30 +186,30 @@ func (r *recorder) reset() {
r.events = nil
}

type mockPipeline struct {
type mockStream struct {
rk routingKey
notify notifyFunc
}

func newMockPipeline(rk routingKey, notify notifyFunc) *mockPipeline {
func newMockStream(rk routingKey, notify notifyFunc) *mockStream {
notify(rk, createOp)
return &mockPipeline{
return &mockStream{
rk: rk,
notify: notify,
}
}

func (m *mockPipeline) Execute(req *configRequest) error {
func (m *mockStream) Execute(req *configRequest) error {
m.event(executeOp, req)
return nil
}

func (m *mockPipeline) Close() error {
func (m *mockStream) Close() error {
m.event(closeOp)
return nil
}

func (m *mockPipeline) event(op rOp, args ...interface{}) {
func (m *mockStream) event(op rOp, args ...interface{}) {
m.notify(m.rk, op, args...)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,28 +40,28 @@ type sender interface {
) (*http.Response, error)
}

type operatorPipeline struct {
type operatorStream struct {
configHandler ConfigHandler
log *logger.Logger
}

func (b *operatorPipeline) Close() error {
func (b *operatorStream) Close() error {
return nil
}

func (b *operatorPipeline) Execute(cfg *configRequest) error {
func (b *operatorStream) Execute(cfg *configRequest) error {
return b.configHandler.HandleConfig(cfg)
}

func pipelineFactory(cfg *config.Config, client sender, r reporter) func(*logger.Logger, routingKey) (pipeline, error) {
return func(log *logger.Logger, id routingKey) (pipeline, error) {
// new operator per pipeline to isolate processes without using tags
func streamFactory(cfg *config.Config, client sender, r reporter) func(*logger.Logger, routingKey) (stream, error) {
return func(log *logger.Logger, id routingKey) (stream, error) {
// new operator per stream to isolate processes without using tags
operator, err := newOperator(log, cfg, r)
if err != nil {
return nil, err
}

return &operatorPipeline{
return &operatorStream{
log: log,
configHandler: operator,
}, nil
Expand Down