Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions x-pack/agent/pkg/agent/application/agent_id.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package application

import (
"fmt"

"github.com/gofrs/uuid"
)

func generateAgentID() (string, error) {
id, err := uuid.NewV4()
if err != nil {
return "", fmt.Errorf("error while generating UUID for agent: %v", err)
}

return id.String(), nil
}
28 changes: 28 additions & 0 deletions x-pack/agent/pkg/agent/application/agent_info.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package application

// AgentInfo is a collection of information about agent.
type AgentInfo struct {
Comment thread
michalpristas marked this conversation as resolved.
Comment thread
michalpristas marked this conversation as resolved.
agentID string
}

// NewAgentInfo creates a new agent information.
// Generates new unique identifier for agent.
func NewAgentInfo() (*AgentInfo, error) {
agentID, err := generateAgentID()
if err != nil {
return nil, err
}

return &AgentInfo{
agentID: agentID,
}, nil
}

// AgentID returns an agent identifier.
func (i *AgentInfo) AgentID() string {
return i.agentID
}
6 changes: 1 addition & 5 deletions x-pack/agent/pkg/agent/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
type Application interface {
Start() error
Stop() error
AgentInfo() *AgentInfo
}

// New creates a new Agent and bootstrap the required subsystem.
Expand Down Expand Up @@ -60,8 +61,3 @@ func createApplication(
return nil, ErrInvalidMgmtMode
}
}

func getAgentID() string {
// TODO: implement correct way of acquiring agent ID
return "default"
}
18 changes: 11 additions & 7 deletions x-pack/agent/pkg/agent/application/fleet_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,20 @@ type dispatcher interface {
Dispatch(...action) error
}

type agentInfo interface {
AgentID() string
}

// fleetGateway is a gateway between the Agent and the Fleet API, it's take cares of all the
// bidirectionnal commmmunication requirements. The gateway aggregates events and will periodically
// bidirectional communication requirements. The gateway aggregates events and will periodically
// call the API to send the events and will receive actions to be executed locally.
// The only supported action for now is a "ActionPolicyChange".
type fleetGateway struct {
log *logger.Logger
dispatcher dispatcher
client clienter
scheduler scheduler.Scheduler
agentID string
agentInfo agentInfo
done chan struct{}
}

Expand All @@ -36,15 +40,15 @@ type fleetGatewaySettings struct {
func newFleetGateway(
log *logger.Logger,
settings *fleetGatewaySettings,
agentID string,
agentInfo agentInfo,
client clienter,
d dispatcher,
) (*fleetGateway, error) {
scheduler := scheduler.NewPeriodic(settings.Duration)
return newFleetGatewayWithScheduler(
log,
settings,
agentID,
agentInfo,
client,
d,
scheduler,
Expand All @@ -54,7 +58,7 @@ func newFleetGateway(
func newFleetGatewayWithScheduler(
log *logger.Logger,
settings *fleetGatewaySettings,
agentID string,
agentInfo agentInfo,
client clienter,
d dispatcher,
scheduler scheduler.Scheduler,
Expand All @@ -63,7 +67,7 @@ func newFleetGatewayWithScheduler(
log: log,
dispatcher: d,
client: client,
agentID: agentID, //TODO(ph): this need to be a struct.
agentInfo: agentInfo, //TODO(ph): this need to be a struct.
scheduler: scheduler,
done: make(chan struct{}),
}, nil
Expand Down Expand Up @@ -98,7 +102,7 @@ func (f *fleetGateway) worker() {

func (f *fleetGateway) execute() (*fleetapi.CheckinResponse, error) {
// TODO(ph): Aggregates and send events.
cmd := fleetapi.NewCheckinCmd(f.agentID, f.client)
cmd := fleetapi.NewCheckinCmd(f.agentInfo, f.client)

req := &fleetapi.CheckinRequest{}
resp, err := cmd.Execute(req)
Expand Down
16 changes: 10 additions & 6 deletions x-pack/agent/pkg/agent/application/fleet_gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func newTestingDispatcher() *testingDispatcher {

type withGatewayFunc func(*testing.T, *fleetGateway, *testingClient, *testingDispatcher, *scheduler.Stepper)

func withGateway(agentID string, fn withGatewayFunc) func(t *testing.T) {
func withGateway(agentInfo agentInfo, fn withGatewayFunc) func(t *testing.T) {
return func(t *testing.T) {
scheduler := scheduler.NewStepper()
client := newTestingClient()
Expand All @@ -92,7 +92,7 @@ func withGateway(agentID string, fn withGatewayFunc) func(t *testing.T) {
gateway, err := newFleetGatewayWithScheduler(
log,
&fleetGatewaySettings{},
agentID,
agentInfo,
client,
dispatcher,
scheduler,
Expand Down Expand Up @@ -132,8 +132,8 @@ func wrapStrToResp(code int, body string) *http.Response {
}

func TestFleetGateway(t *testing.T) {
agentID := "agent-secret"
t.Run("send no event and receive no action", withGateway(agentID, func(
agentInfo := &testAgentInfo{}
t.Run("send no event and receive no action", withGateway(agentInfo, func(
t *testing.T,
gateway *fleetGateway,
client *testingClient,
Expand All @@ -157,7 +157,7 @@ func TestFleetGateway(t *testing.T) {
<-received
}))

t.Run("Successfully connects and receives a series of actions", withGateway(agentID, func(
t.Run("Successfully connects and receives a series of actions", withGateway(agentInfo, func(
t *testing.T,
gateway *fleetGateway,
client *testingClient,
Expand Down Expand Up @@ -209,7 +209,7 @@ func TestFleetGateway(t *testing.T) {
gateway, err := newFleetGatewayWithScheduler(
log,
&fleetGatewaySettings{},
agentID,
agentInfo,
client,
dispatcher,
scheduler,
Expand Down Expand Up @@ -248,3 +248,7 @@ func TestFleetGateway(t *testing.T) {
func skip(t *testing.T) {
t.SkipNow()
}

type testAgentInfo struct{}

func (testAgentInfo) AgentID() string { return "agent-secret" }
29 changes: 21 additions & 8 deletions x-pack/agent/pkg/agent/application/local_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ var ErrNoConfiguration = errors.New("no configuration found")
// Local represents a standalone agents, that will read his configuration directly from disk.
// Some part of the configuration can be reloaded.
type Local struct {
log *logger.Logger
source source
log *logger.Logger
source source
agentInfo *AgentInfo
}

type source interface {
Expand All @@ -53,7 +54,10 @@ func newLocal(
}
}

agentID := getAgentID()
agentInfo, err := NewAgentInfo()
if err != nil {
return nil, err
}

c := localConfigDefault()
if err := config.Unpack(c); err != nil {
Expand All @@ -62,7 +66,12 @@ func newLocal(

logR := logreporter.NewReporter(log, c.Reporting)

reporter := reporting.NewReporter(log, agentID, logR)
localApplication := &Local{
log: log,
agentInfo: agentInfo,
}

reporter := reporting.NewReporter(log, localApplication.agentInfo, logR)

router, err := newRouter(log, streamFactory(config, nil, reporter))
if err != nil {
Expand All @@ -81,10 +90,9 @@ func newLocal(
cfgSource = newPeriodic(log, c.Reload.Period, discover, emit)
}

return &Local{
log: log,
source: cfgSource,
}, nil
localApplication.source = cfgSource

return localApplication, nil
}

// Start starts a local agent.
Expand All @@ -104,6 +112,11 @@ func (l *Local) Stop() error {
return l.source.Stop()
}

// AgentInfo retrieves agent information.
func (l *Local) AgentInfo() *AgentInfo {
return l.agentInfo
}

func discoverer(patterns ...string) discoverFunc {
var p []string
for _, newP := range patterns {
Expand Down
42 changes: 26 additions & 16 deletions x-pack/agent/pkg/agent/application/managed_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,22 @@ type apiClient interface {
// Managed application, when the application is run in managed mode, most of the configuration are
// coming from the Fleet App.
type Managed struct {
log *logger.Logger
Config FleetAgentConfig
api apiClient
agentID string
gateway *fleetGateway
log *logger.Logger
Config FleetAgentConfig
api apiClient
agentInfo *AgentInfo
gateway *fleetGateway
}

func newManaged(
log *logger.Logger,
rawConfig *config.Config,
) (*Managed, error) {

agentID := getAgentID()
agentInfo, err := NewAgentInfo()
if agentInfo != nil {
return nil, err
}

path := fleetAgentConfigPath()

Expand Down Expand Up @@ -77,7 +80,12 @@ func newManaged(
return nil, errors.Wrap(err, "fail to create API client")
}

reporter, err := createFleetReporters(log, cfg, agentID, client)
managedApplication := &Managed{
log: log,
agentInfo: agentInfo,
}

reporter, err := createFleetReporters(log, cfg, managedApplication.agentInfo, client)
if err != nil {
return nil, errors.Wrap(err, "fail to create reporters")
}
Expand Down Expand Up @@ -107,19 +115,16 @@ func newManaged(
gateway, err := newFleetGateway(
log,
&fleetGatewaySettings{Duration: durationTick},
agentID,
agentInfo,
client,
actionDispatcher,
)
if err != nil {
return nil, err
}

return &Managed{
log: log,
agentID: agentID,
gateway: gateway,
}, nil
managedApplication.gateway = gateway
return managedApplication, nil
}

// Start starts a managed agent.
Expand All @@ -136,19 +141,24 @@ func (m *Managed) Stop() error {
return nil
}

// AgentInfo retrieves agent information.
func (m *Managed) AgentInfo() *AgentInfo {
return m.agentInfo
}

func createFleetReporters(
log *logger.Logger,
cfg *FleetAgentConfig,
agentID string,
agentInfo *AgentInfo,
client apiClient,
) (reporter, error) {

logR := logreporter.NewReporter(log, cfg.Reporting.Log)

fleetR, err := fleetreporter.NewReporter(agentID, log, cfg.Reporting.Fleet, client)
fleetR, err := fleetreporter.NewReporter(agentInfo, log, cfg.Reporting.Fleet, client)
if err != nil {
return nil, err
}

return reporting.NewReporter(log, agentID, logR, fleetR), nil
return reporting.NewReporter(log, agentInfo, logR, fleetR), nil
}
20 changes: 13 additions & 7 deletions x-pack/agent/pkg/fleetapi/checkin_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"github.com/pkg/errors"
)

const checkingPath = "/api/fleet/agents/%s/checkin"

// CheckinRequest consists of multiple events reported to fleet ui.
type CheckinRequest struct {
Events []SerializableEvent `json:"events"`
Expand Down Expand Up @@ -53,17 +55,20 @@ func (e *CheckinResponse) Validate() error {

// CheckinCmd is a fleet API command.
type CheckinCmd struct {
client clienter
checkinPath string
client clienter
info agentInfo
}

type agentInfo interface {
AgentID() string
}

// NewCheckinCmd creates a new api command.
func NewCheckinCmd(agentID string, client clienter) *CheckinCmd {
const p = "/api/fleet/agents/%s/checkin"
func NewCheckinCmd(info agentInfo, client clienter) *CheckinCmd {

return &CheckinCmd{
client: client,
checkinPath: fmt.Sprintf(p, agentID),
client: client,
info: info,
}
}

Expand All @@ -78,7 +83,8 @@ func (e *CheckinCmd) Execute(r *CheckinRequest) (*CheckinResponse, error) {
return nil, errors.Wrap(err, "fail to encode the checkin request")
}

resp, err := e.client.Send("POST", e.checkinPath, nil, nil, bytes.NewBuffer(b))
cp := fmt.Sprintf(checkingPath, e.info.AgentID())
resp, err := e.client.Send("POST", cp, nil, nil, bytes.NewBuffer(b))
if err != nil {
return nil, errors.Wrap(err, "fail to checkin to fleet")
}
Expand Down
Loading