Skip to content

Commit

Permalink
[v2] Use the v2 components runtime as the core of the Elastic Agent (#…
Browse files Browse the repository at this point in the history
…753)

* Add runtime for command v2 components.

* Fix imports.

* Add tests for watching checkins.

* Fix lint and move checkin period to a configurable timeout.

* Fix tests now that checkin timeout needs to be defined.

* Fix code review and lint.

* Work on actually running the v2 runtime.

* Work on switching to the v2 runtime.

* More work on switching to v2 runtime.

* Cleanup some imports.

* More import cleanups.

* Add TODO to FleetServerComponentModifier.

* Remove outdated managed_mode_test.go.

* Fixes from code review and lint.
  • Loading branch information
blakerouse authored Jul 26, 2022
1 parent 8812fc9 commit 5acdc40
Show file tree
Hide file tree
Showing 88 changed files with 3,109 additions and 6,524 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ go_env.properties
mage_output_file.go
elastic_agent
fleet.yml
fleet.enc
fleet.enc.lock

# Editor swap files
*.swp
Expand Down
74 changes: 53 additions & 21 deletions control.proto
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,23 @@ package cproto;
option cc_enable_arenas = true;
option go_package = "internal/pkg/agent/control/cproto";

// Status codes for the current state.
enum Status {
// State codes for the current state.
enum State {
STARTING = 0;
CONFIGURING = 1;
HEALTHY = 2;
DEGRADED = 3;
FAILED = 4;
STOPPING = 5;
UPGRADING = 6;
ROLLBACK = 7;
STOPPED = 6;
UPGRADING = 7;
ROLLBACK = 8;
}

// Unit Type running inside a component.
enum UnitType {
INPUT = 0;
OUTPUT = 1;
}

// Action status codes for restart and upgrade response.
Expand Down Expand Up @@ -93,18 +100,43 @@ message UpgradeResponse {
string error = 3;
}

// Current status of the application in Elastic Agent.
message ApplicationStatus {
// Unique application ID.
message ComponentUnitState {
// Type of unit in the component.
UnitType unit_type = 1;
// ID of the unit in the component.
string unit_id = 2;
// Current state.
State state = 3;
// Current state message.
string message = 4;
// Current state payload.
string payload = 5;
}

// Version information reported by the component to Elastic Agent.
message ComponentVersionInfo {
// Name of the component.
string name = 1;
// Version of the component.
string version = 2;
// Extra meta information about the version.
map<string, string> meta = 3;
}

// Current state of a running component by Elastic Agent.
message ComponentState {
// Unique component ID.
string id = 1;
// Application name.
// Component name.
string name = 2;
// Current status.
Status status = 3;
// Current status message.
// Current state.
State state = 3;
// Current state message.
string message = 4;
// Current status payload.
string payload = 5;
// Current units running in the component.
repeated ComponentUnitState units = 5;
// Current version information for the running component.
ComponentVersionInfo version_info = 6;
}

// Current metadata for a running process.
Expand All @@ -126,14 +158,14 @@ message ProcMeta {
string error = 15;
}

// Status is the current status of Elastic Agent.
message StatusResponse {
// Overall status of Elastic Agent.
Status status = 1;
// StateResponse is the current state of Elastic Agent.
message StateResponse {
// Overall state of Elastic Agent.
State state = 1;
// Overall status message of Elastic Agent.
string message = 2;
// Status of each application in Elastic Agent.
repeated ApplicationStatus applications = 3;
// Status of each component in Elastic Agent.
repeated ComponentState components = 3;
}

// ProcMetaResponse is the current running version infomation for all processes.
Expand Down Expand Up @@ -184,8 +216,8 @@ service ElasticAgentControl {
// Fetches the currently running version of the Elastic Agent.
rpc Version(Empty) returns (VersionResponse);

// Fetches the currently status of the Elastic Agent.
rpc Status(Empty) returns (StatusResponse);
// Fetches the currently states of the Elastic Agent.
rpc State(Empty) returns (StateResponse);

// Restart restarts the current running Elastic Agent.
rpc Restart(Empty) returns (RestartResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ package actions
import (
"context"

"github.com/elastic/elastic-agent/internal/pkg/agent/storage/store"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/client"
)

// Handler handles action coming from fleet.
type Handler interface {
Handle(ctx context.Context, a fleetapi.Action, acker store.FleetAcker) error
Handle(ctx context.Context, a fleetapi.Action, acker acker.Acker) error
}

// ClientSetter sets the client for communication.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@ import (
"fmt"
"time"

"github.com/elastic/elastic-agent-client/v7/pkg/client"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
"github.com/elastic/elastic-agent/internal/pkg/agent/storage/store"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker"
"github.com/elastic/elastic-agent/pkg/component"
"github.com/elastic/elastic-agent/pkg/core/logger"
"github.com/elastic/elastic-agent/pkg/core/server"
)

const (
Expand All @@ -25,27 +28,28 @@ var errActionTimeoutInvalid = errors.New("action timeout is invalid")

// AppAction is a handler for application actions.
type AppAction struct {
log *logger.Logger
srv *server.Server
log *logger.Logger
coord *coordinator.Coordinator
}

// NewAppAction creates a new AppAction handler.
func NewAppAction(log *logger.Logger, srv *server.Server) *AppAction {
func NewAppAction(log *logger.Logger, coord *coordinator.Coordinator) *AppAction {
return &AppAction{
log: log,
srv: srv,
log: log,
coord: coord,
}
}

// Handle handles application action.
func (h *AppAction) Handle(ctx context.Context, a fleetapi.Action, acker store.FleetAcker) error {
func (h *AppAction) Handle(ctx context.Context, a fleetapi.Action, acker acker.Acker) error {
h.log.Debugf("handlerAppAction: action '%+v' received", a)
action, ok := a.(*fleetapi.ActionApp)
if !ok {
return fmt.Errorf("invalid type, expected ActionApp and received %T", a)
}

appState, ok := h.srv.FindByInputType(action.InputType)
state := h.coord.State()
unit, ok := findUnitFromInputType(state, action.InputType)
if !ok {
// If the matching action is not found ack the action with the error for action result document
action.StartedAt = time.Now().UTC().Format(time.RFC3339Nano)
Expand All @@ -71,8 +75,10 @@ func (h *AppAction) Handle(ctx context.Context, a fleetapi.Action, acker store.F

var res map[string]interface{}
if err == nil {
h.log.Debugf("handlerAppAction: action '%v' started with timeout: %v", action.InputType, timeout)
res, err = appState.PerformAction(action.InputType, params, timeout)
h.log.Debugf("handlerAppAction: action '%v' started with timeout: %v", action.ActionType, timeout)
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
res, err = h.coord.PerformAction(ctx, unit, action.ActionType, params)
}
end := time.Now().UTC()

Expand Down Expand Up @@ -143,3 +149,17 @@ func readMapString(m map[string]interface{}, key string, def string) string {
}
return def
}

func findUnitFromInputType(state coordinator.State, inputType string) (component.Unit, bool) {
for _, comp := range state.Components {
for _, unit := range comp.Component.Units {
if unit.Type == client.UnitTypeInput {
it, ok := unit.Config["type"]
if ok && it == inputType {
return unit, true
}
}
}
}
return component.Unit{}, false
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
"context"
"fmt"

"github.com/elastic/elastic-agent/internal/pkg/agent/storage/store"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker"
"github.com/elastic/elastic-agent/pkg/core/logger"
)

Expand All @@ -32,7 +32,7 @@ func NewCancel(log *logger.Logger, cancel queueCanceler) *Cancel {
}

// Handle will cancel any actions in the queue that match target_id.
func (h *Cancel) Handle(ctx context.Context, a fleetapi.Action, acker store.FleetAcker) error {
func (h *Cancel) Handle(ctx context.Context, a fleetapi.Action, acker acker.Acker) error {
action, ok := a.(*fleetapi.ActionCancel)
if !ok {
return fmt.Errorf("invalid type, expected ActionCancel and received %T", a)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ import (

"gopkg.in/yaml.v2"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/actions"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/pipeline"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/pipeline/actions"
"github.com/elastic/elastic-agent/internal/pkg/agent/configuration"
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
"github.com/elastic/elastic-agent/internal/pkg/agent/storage"
"github.com/elastic/elastic-agent/internal/pkg/agent/storage/store"
"github.com/elastic/elastic-agent/internal/pkg/config"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/client"
"github.com/elastic/elastic-agent/internal/pkg/remote"
"github.com/elastic/elastic-agent/pkg/core/logger"
Expand All @@ -36,28 +36,28 @@ const (
// PolicyChange is a handler for POLICY_CHANGE action.
type PolicyChange struct {
log *logger.Logger
emitter pipeline.EmitterFunc
agentInfo *info.AgentInfo
config *configuration.Configuration
store storage.Store
ch chan coordinator.ConfigChange
setters []actions.ClientSetter
}

// NewPolicyChange creates a new PolicyChange handler.
func NewPolicyChange(
log *logger.Logger,
emitter pipeline.EmitterFunc,
agentInfo *info.AgentInfo,
config *configuration.Configuration,
store storage.Store,
ch chan coordinator.ConfigChange,
setters ...actions.ClientSetter,
) *PolicyChange {
return &PolicyChange{
log: log,
emitter: emitter,
agentInfo: agentInfo,
config: config,
store: store,
ch: ch,
setters: setters,
}
}
Expand All @@ -72,7 +72,7 @@ func (h *PolicyChange) AddSetter(cs actions.ClientSetter) {
}

// Handle handles policy change action.
func (h *PolicyChange) Handle(ctx context.Context, a fleetapi.Action, acker store.FleetAcker) error {
func (h *PolicyChange) Handle(ctx context.Context, a fleetapi.Action, acker acker.Acker) error {
h.log.Debugf("handlerPolicyChange: action '%+v' received", a)
action, ok := a.(*fleetapi.ActionPolicyChange)
if !ok {
Expand All @@ -89,11 +89,19 @@ func (h *PolicyChange) Handle(ctx context.Context, a fleetapi.Action, acker stor
if err != nil {
return err
}
if err := h.emitter(ctx, c); err != nil {
return err

h.ch <- &policyChange{
ctx: ctx,
cfg: c,
action: a,
acker: acker,
}
return nil
}

return acker.Ack(ctx, action)
// Watch returns the channel for configuration change notifications.
func (h *PolicyChange) Watch() <-chan coordinator.ConfigChange {
return h.ch
}

func (h *PolicyChange) handleFleetServerHosts(ctx context.Context, c *config.Config) (err error) {
Expand Down Expand Up @@ -210,3 +218,33 @@ func fleetToReader(agentInfo *info.AgentInfo, cfg *configuration.Configuration)
}
return bytes.NewReader(data), nil
}

type policyChange struct {
ctx context.Context
cfg *config.Config
action fleetapi.Action
acker acker.Acker
commit bool
}

func (l *policyChange) Config() *config.Config {
return l.cfg
}

func (l *policyChange) Ack() error {
if l.action == nil {
return nil
}
err := l.acker.Ack(l.ctx, l.action)
if err != nil {
return err
}
if l.commit {
return l.acker.Commit(l.ctx)
}
return nil
}

func (l *policyChange) Fail(_ error) {
// do nothing
}
Loading

0 comments on commit 5acdc40

Please sign in to comment.