Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion x-pack/agent/pkg/agent/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func createApplication(
return nil, err
}

router, err := newRouter(log, pipelineFactory(config, client, reporter))
router, err := newRouter(log, streamFactory(config, client, reporter))
if err != nil {
return nil, errors.Wrap(err, "initiating application")
}
Expand Down
32 changes: 16 additions & 16 deletions x-pack/agent/pkg/agent/application/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,28 @@ var defautlRK = "DEFAULT"

type routingKey = string

type pipeline interface {
type stream interface {
Execute(*configRequest) error
Close() error
}

type pipelineFunc func(*logger.Logger, routingKey) (pipeline, error)
type streamFunc func(*logger.Logger, routingKey) (stream, error)

type router struct {
log *logger.Logger
routes *sorted.Set
pipelineFactory pipelineFunc
log *logger.Logger
routes *sorted.Set
streamFactory streamFunc
}

func newRouter(log *logger.Logger, factory pipelineFunc) (*router, error) {
func newRouter(log *logger.Logger, factory streamFunc) (*router, error) {
var err error
if log == nil {
log, err = logger.New()
if err != nil {
return nil, err
}
}
return &router{log: log, pipelineFactory: factory, routes: sorted.NewSet()}, nil
return &router{log: log, streamFactory: factory, routes: sorted.NewSet()}, nil
}

func (r *router) Dispatch(id string, grpProg map[routingKey][]program.Program) error {
Expand All @@ -54,13 +54,13 @@ func (r *router) Dispatch(id string, grpProg map[routingKey][]program.Program) e
for _, rk := range s.Keys() {
active[rk] = true

// Are we already runnings this pipeline?
// If we don't we create it otherwise we just forward the config request.
// Are we already runnings this streams?
// When it doesn't exist we just create it, if it already exist we forward the configuration.
p, ok := r.routes.Get(rk)
var err error
if !ok {
r.log.Debugf("Creating pipeline %s", rk)
p, err = r.pipelineFactory(r.log, rk)
r.log.Debugf("Creating stream %s", rk)
p, err = r.streamFactory(r.log, rk)
if err != nil {
return err
}
Expand All @@ -78,20 +78,20 @@ func (r *router) Dispatch(id string, grpProg map[routingKey][]program.Program) e
}

r.log.Debugf(
"Pipeline %s need to run config with ID %s and programs: %s",
"Streams %s need to run config with ID %s and programs: %s",
rk,
req.ShortID(),
strings.Join(req.ProgramNames(), ", "),
)

err = p.(pipeline).Execute(req)
err = p.(stream).Execute(req)
if err != nil {
return err
}
}

// cleanup inactive pipelines.
// pipelines are shutdown down in alphabetical order.
// cleanup inactive streams.
// streams are shutdown down in alphabetical order.
keys := r.routes.Keys()
for _, k := range keys {
_, ok := active[k]
Expand All @@ -104,7 +104,7 @@ func (r *router) Dispatch(id string, grpProg map[routingKey][]program.Program) e
continue
}

p.(pipeline).Close()
p.(stream).Close()
r.routes.Remove(k)
}

Expand Down
24 changes: 12 additions & 12 deletions x-pack/agent/pkg/agent/application/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type notifyFunc func(routingKey, rOp, ...interface{})
func TestRouter(t *testing.T) {
programs := []program.Program{program.Program{Spec: program.Supported[1]}}

t.Run("create new and destroy unused pipeline", func(t *testing.T) {
t.Run("create new and destroy unused stream", func(t *testing.T) {
recorder := &recorder{}
r, err := newRouter(nil, recorder.factory)
require.NoError(t, err)
Expand All @@ -71,7 +71,7 @@ func TestRouter(t *testing.T) {
}, recorder.events)
})

t.Run("multiples create new and destroy unused pipeline", func(t *testing.T) {
t.Run("multiples create new and destroy unused stream", func(t *testing.T) {
k1 := "KEY_1"
k2 := "KEY_2"

Expand Down Expand Up @@ -112,7 +112,7 @@ func TestRouter(t *testing.T) {
}, recorder.events)
})

t.Run("create new and delegate program to existing pipeline", func(t *testing.T) {
t.Run("create new and delegate program to existing stream", func(t *testing.T) {
recorder := &recorder{}
r, err := newRouter(nil, recorder.factory)
require.NoError(t, err)
Expand All @@ -136,7 +136,7 @@ func TestRouter(t *testing.T) {
}, recorder.events)
})

t.Run("when no pipelines are detected we shutdown all the running pipelines", func(t *testing.T) {
t.Run("when no stream are detected we shutdown all the running streams", func(t *testing.T) {
k1 := "KEY_1"
k2 := "KEY_2"

Expand Down Expand Up @@ -174,8 +174,8 @@ type recorder struct {
events []event
}

func (r *recorder) factory(_ *logger.Logger, rk routingKey) (pipeline, error) {
return newMockPipeline(rk, r.notify), nil
func (r *recorder) factory(_ *logger.Logger, rk routingKey) (stream, error) {
return newMockStream(rk, r.notify), nil
}

func (r *recorder) notify(rk routingKey, op rOp, args ...interface{}) {
Expand All @@ -186,30 +186,30 @@ func (r *recorder) reset() {
r.events = nil
}

type mockPipeline struct {
type mockStream struct {
rk routingKey
notify notifyFunc
}

func newMockPipeline(rk routingKey, notify notifyFunc) *mockPipeline {
func newMockStream(rk routingKey, notify notifyFunc) *mockStream {
notify(rk, createOp)
return &mockPipeline{
return &mockStream{
rk: rk,
notify: notify,
}
}

func (m *mockPipeline) Execute(req *configRequest) error {
func (m *mockStream) Execute(req *configRequest) error {
m.event(executeOp, req)
return nil
}

func (m *mockPipeline) Close() error {
func (m *mockStream) Close() error {
m.event(closeOp)
return nil
}

func (m *mockPipeline) event(op rOp, args ...interface{}) {
func (m *mockStream) event(op rOp, args ...interface{}) {
m.notify(m.rk, op, args...)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,28 +40,28 @@ type sender interface {
) (*http.Response, error)
}

type operatorPipeline struct {
type operatorStream struct {
configHandler ConfigHandler
log *logger.Logger
}

func (b *operatorPipeline) Close() error {
func (b *operatorStream) Close() error {
return nil
}

func (b *operatorPipeline) Execute(cfg *configRequest) error {
func (b *operatorStream) Execute(cfg *configRequest) error {
return b.configHandler.HandleConfig(cfg)
}

func pipelineFactory(cfg *config.Config, client sender, r reporter) func(*logger.Logger, routingKey) (pipeline, error) {
return func(log *logger.Logger, id routingKey) (pipeline, error) {
func streamFactory(cfg *config.Config, client sender, r reporter) func(*logger.Logger, routingKey) (stream, error) {
return func(log *logger.Logger, id routingKey) (stream, error) {
// new operator per pipeline to isolate processes without using tags
Comment thread
ph marked this conversation as resolved.
Outdated
operator, err := newOperator(log, cfg, r)
if err != nil {
return nil, err
}

return &operatorPipeline{
return &operatorStream{
log: log,
configHandler: operator,
}, nil
Expand Down