From 0765a1333213dc3067f392bcef32511646d2d91a Mon Sep 17 00:00:00 2001 From: Pier-Hugues Pellerin Date: Fri, 29 Nov 2019 13:39:04 -0500 Subject: [PATCH 1/4] Implements the require "Checkin" calls This PR implements the required structs and calls to retrieve the configuration from Fleet. This is step one before refactoring the Reporter to split the pushing event and fetching of configuration. --- x-pack/agent/pkg/fleetapi/actions.go | 164 +++++++++++ x-pack/agent/pkg/fleetapi/checkin_cmd.go | 68 ++--- x-pack/agent/pkg/fleetapi/checkin_cmd_test.go | 266 ++++++++++++++++++ x-pack/agent/pkg/fleetapi/client_test.go | 30 -- x-pack/agent/pkg/fleetapi/custom_types.go | 20 ++ .../agent/pkg/fleetapi/custom_types_test.go | 23 ++ x-pack/agent/pkg/fleetapi/helper_test.go | 61 ++++ x-pack/agent/pkg/reporter/fleet/reporter.go | 41 ++- .../agent/pkg/reporter/fleet/reporter_test.go | 12 +- 9 files changed, 597 insertions(+), 88 deletions(-) create mode 100644 x-pack/agent/pkg/fleetapi/actions.go create mode 100644 x-pack/agent/pkg/fleetapi/checkin_cmd_test.go create mode 100644 x-pack/agent/pkg/fleetapi/custom_types.go create mode 100644 x-pack/agent/pkg/fleetapi/custom_types_test.go create mode 100644 x-pack/agent/pkg/fleetapi/helper_test.go diff --git a/x-pack/agent/pkg/fleetapi/actions.go b/x-pack/agent/pkg/fleetapi/actions.go new file mode 100644 index 000000000000..2f02b25335d0 --- /dev/null +++ b/x-pack/agent/pkg/fleetapi/actions.go @@ -0,0 +1,164 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package fleetapi + +import ( + "encoding/json" + "fmt" + "strings" + "time" + + "github.com/pkg/errors" +) + +// Action base interface for all the implemented action from the fleet API. +type Action interface { + fmt.Stringer + Type() string + ID() string +} + +// BaseAction is the base of all actions to be executed. +type BaseAction struct { + ActionID string + ActionType string +} + +// Type returns the action type. +func (a *BaseAction) Type() string { + return a.ActionType +} + +// ID returns the action ID. +func (a *BaseAction) ID() string { + return a.ActionID +} + +// UnknownAction is an action that is not know by the current version of the Agent and we don't want +// to return an error at parsing time but at execution time we can report or ignore. +// +// NOTE: We only keep the original type and the action id, the payload of the event is dropped, we +// do this to make sure we do not leak any unwanted information. +type UnknownAction struct { + *BaseAction + originalType string +} + +// Type returns the type of the Action. +func (a *UnknownAction) Type() string { + return "UNKNOWN" +} + +func (a *UnknownAction) String() string { + var s strings.Builder + s.WriteString("action_id: ") + s.WriteString(a.ID()) + s.WriteString(", type: ") + s.WriteString(a.Type()) + s.WriteString(" (original type: ") + s.WriteString(a.OriginalType()) + s.WriteString(")") + return s.String() +} + +// OriginalType returns the original type of the action as returned by the API. +func (a *UnknownAction) OriginalType() string { + return a.originalType +} + +// PolicyChangeAction is a request to apply a new +type PolicyChangeAction struct { + *BaseAction + Policy map[string]interface{} `json:"policy"` +} + +func (a *PolicyChangeAction) String() string { + var s strings.Builder + s.WriteString("action_id: ") + s.WriteString(a.ID()) + s.WriteString(", type: ") + s.WriteString(a.Type()) + return s.String() +} + +// Actions is a list of Actions to executes and allow to unmarshal heterogenous action type. +type Actions []Action + +// UnmarshalJSON takes every raw representation of an action and try to decode them. +func (a *Actions) UnmarshalJSON(data []byte) error { + type r struct { + ActionType string `json:"type"` + ActionID string `json:"id"` + Data json.RawMessage `json:"data"` + } + + var responses []r + + if err := json.Unmarshal(data, &responses); err != nil { + return errors.Wrap(err, "fail to decode actions") + } + + actions := make([]Action, 0, len(responses)) + var action Action + + for _, response := range responses { + switch response.ActionType { + case "POLICY_CHANGE": + action = &PolicyChangeAction{ + BaseAction: &BaseAction{ + ActionID: response.ActionID, + ActionType: response.ActionType, + }, + } + if err := json.Unmarshal(response.Data, action); err != nil { + return errors.Wrap(err, "fail to decode POLICY_CHANGE action") + } + default: + action = &UnknownAction{ + BaseAction: &BaseAction{ActionID: response.ActionID, ActionType: "UNKNOWN"}, + originalType: response.ActionType, + } + } + actions = append(actions, action) + } + + *a = actions + return nil +} + +// AckedAction represents a event to be send to the next checkin that will Ack an action. +type AckedAction struct { + EventType string `json:"type"` + ActionID string `json:"action_id"` + Ts time.Time `json:"timestamp"` + Msg string `json:"message,omitempty"` +} + +// Type return the type of event. +func (a *AckedAction) Type() string { + return a.EventType +} + +// Timestamp return when the event was created. +func (a *AckedAction) Timestamp() time.Time { + return a.Ts +} + +// Message returns the human readable string describing the event. +func (a *AckedAction) Message() string { + return a.Msg +} + +// Ack returns an event that represent an acked action. +func Ack(action Action) *AckedAction { + const t = "ACTION_ACKNOWLEDGED" + + return &AckedAction{ + EventType: t, + ActionID: action.ID(), + Msg: "Acknowledge action " + action.ID(), + Ts: time.Now(), + } +} diff --git a/x-pack/agent/pkg/fleetapi/checkin_cmd.go b/x-pack/agent/pkg/fleetapi/checkin_cmd.go index 61d4f8696fc7..abaa92d1afa1 100644 --- a/x-pack/agent/pkg/fleetapi/checkin_cmd.go +++ b/x-pack/agent/pkg/fleetapi/checkin_cmd.go @@ -8,67 +8,47 @@ import ( "bytes" "encoding/json" "fmt" + "net/http" + "time" "github.com/pkg/errors" ) // CheckinRequest consists of multiple events reported to fleet ui. -// -// Example: -// POST /api/fleet/agents/a4937110-e53e-11e9-934f-47a8e38a522c/checkin -// { -// "events": [{ -// "type": "STATE", -// "subtype": "STARTING", -// "message": "state changed from STOPPED to STARTING", -// "timestamp": "2019-10-01T13:42:54.323Z", -// "payload": {}, -// "data": "{}" -// }] -// } type CheckinRequest struct { - Events []Event `json:"events"` + Events []SerializableEvent `json:"events"` } -// Event is a single event out of collection of reported events. -type Event struct { - EventType string `json:"type"` - Timestamp string `json:"timestamp"` - SubType string `json:"subtype"` - Message string `json:"message"` - Payload map[string]interface{} `json:"payload,omitempty"` - Data string `json:"data,omitempty"` +// SerializableEvent is a representation of the event to be send to the Fleet API via the checkin +// endpoint, we are liberal into what we accept to be send you only need a type and be able to be +// serialized into JSON. +type SerializableEvent interface { + // Type return the type of the event, this must be included in the serialized document. + Type() string + + // Timestamp is used to keep track when the event was created in the system. + Timestamp() time.Time + + // Message is a human readable string to explain what the event does, this would be displayed in + // the UI as a string of text. + Message() string } // Validate validates the enrollment request before sending it to the API. func (e *CheckinRequest) Validate() error { - if len(e.Events) == 0 { - return errors.New("no events to report") - } - return nil } -// CheckinResponse is a fleets response to checking API request. -// -// Example: -// { -// "action": "checkin", -// "success": true, -// "policy": { -// }, -// "actions": [] -// } +// CheckinResponse is the response send back from the server which contains all the action that +// need to be executed or proxy to running processes. type CheckinResponse struct { - Action string `json:"action"` - Success bool `json:"success"` + Actions Actions `json:"actions"` + Success bool `json:"success"` } // Validate validates the response send from the server. func (e *CheckinResponse) Validate() error { - var err error - - return err + return nil } // CheckinCmd is a fleet API command. @@ -100,10 +80,14 @@ func (e *CheckinCmd) Execute(r *CheckinRequest) (*CheckinResponse, error) { resp, err := e.client.Send("POST", e.checkinPath, nil, nil, bytes.NewBuffer(b)) if err != nil { - return nil, err + return nil, errors.Wrap(err, "fail to checkin to fleet") } defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return nil, extract(resp.Body) + } + checkinResponse := &CheckinResponse{} decoder := json.NewDecoder(resp.Body) if err := decoder.Decode(checkinResponse); err != nil { diff --git a/x-pack/agent/pkg/fleetapi/checkin_cmd_test.go b/x-pack/agent/pkg/fleetapi/checkin_cmd_test.go new file mode 100644 index 000000000000..f7fbb3b98bd9 --- /dev/null +++ b/x-pack/agent/pkg/fleetapi/checkin_cmd_test.go @@ -0,0 +1,266 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package fleetapi + +import ( + "encoding/json" + "fmt" + "net/http" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestCheckin(t *testing.T) { + const agentID = "bob" + const withAPIKey = "secret" + + t.Run("Send back status of actions", withServerWithAuthClient( + func(t *testing.T) *http.ServeMux { + raw := ` +{ + "actions": [], + "success": true +} +` + mux := http.NewServeMux() + path := fmt.Sprintf("/api/fleet/agents/%s/checkin", agentID) + mux.HandleFunc(path, authHandler(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + + type E struct { + ActionID string `json:"action_id"` + Type string `json:"type"` + Message string `json:"message"` + Timestamp time.Time `json:"timestamp"` + } + + responses := struct { + Events []E `json:"events"` + }{} + + decoder := json.NewDecoder(r.Body) + defer r.Body.Close() + + err := decoder.Decode(&responses) + require.NoError(t, err) + + require.Equal(t, 1, len(responses.Events)) + + e := responses.Events[0] + require.Equal(t, "my-id", e.ActionID) + require.Equal(t, "ACTION_ACKNOWLEDGED", e.Type) + require.Equal(t, "Acknowledge action my-id", e.Message) + + fmt.Fprintf(w, raw) + }, withAPIKey)) + return mux + }, withAPIKey, + func(t *testing.T, client clienter) { + action := &PolicyChangeAction{ + BaseAction: &BaseAction{ + ActionID: "my-id", + ActionType: "POLICY_CHANGE", + }, + Policy: map[string]interface{}{ + "id": "policy_id", + }, + } + + cmd := NewCheckinCmd(agentID, client) + + request := CheckinRequest{ + Events: []SerializableEvent{ + Ack(action), + }, + } + + r, err := cmd.Execute(&request) + require.NoError(t, err) + require.True(t, r.Success) + + require.Equal(t, 0, len(r.Actions)) + }, + )) + + t.Run("Propagate any errors from the server", withServerWithAuthClient( + func(t *testing.T) *http.ServeMux { + raw := ` +Something went wrong +} +` + mux := http.NewServeMux() + path := fmt.Sprintf("/api/fleet/agents/%s/checkin", agentID) + mux.HandleFunc(path, authHandler(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + fmt.Fprintf(w, raw) + }, withAPIKey)) + return mux + }, withAPIKey, + func(t *testing.T, client clienter) { + cmd := NewCheckinCmd(agentID, client) + + request := CheckinRequest{} + + _, err := cmd.Execute(&request) + require.Error(t, err) + }, + )) + + t.Run("Checkin receives a PolicyChange", withServerWithAuthClient( + func(t *testing.T) *http.ServeMux { + raw := ` +{ + "actions": [ + { + "type": "POLICY_CHANGE", + "id": "id1", + "data": { + "policy": { + "id": "policy-id", + "outputs": { + "default": { + "hosts": "https://localhost:9200" + } + }, + "streams": [ + { + "id": "string", + "type": "logs", + "path": "/var/log/hello.log", + "output": { + "use_output": "default" + } + } + ] + } + } + } + ], + "success": true +} +` + mux := http.NewServeMux() + path := fmt.Sprintf("/api/fleet/agents/%s/checkin", agentID) + mux.HandleFunc(path, authHandler(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + fmt.Fprintf(w, raw) + }, withAPIKey)) + return mux + }, withAPIKey, + func(t *testing.T, client clienter) { + cmd := NewCheckinCmd(agentID, client) + + request := CheckinRequest{} + + r, err := cmd.Execute(&request) + require.NoError(t, err) + require.True(t, r.Success) + + require.Equal(t, 1, len(r.Actions)) + + // PolicyChangeAction + require.Equal(t, "id1", r.Actions[0].ID()) + require.Equal(t, "POLICY_CHANGE", r.Actions[0].Type()) + }, + )) + + t.Run("Checkin receives known and unknown action type", withServerWithAuthClient( + func(t *testing.T) *http.ServeMux { + raw := ` +{ + "actions": [ + { + "type": "POLICY_CHANGE", + "id": "id1", + "data": { + "policy": { + "id": "policy-id", + "outputs": { + "default": { + "hosts": "https://localhost:9200" + } + }, + "streams": [ + { + "id": "string", + "type": "logs", + "path": "/var/log/hello.log", + "output": { + "use_output": "default" + } + } + ] + } + } + }, + { + "type": "WHAT_TO_DO_WITH_IT", + "id": "id2" + } + ], + "success": true +} +` + mux := http.NewServeMux() + path := fmt.Sprintf("/api/fleet/agents/%s/checkin", agentID) + mux.HandleFunc(path, authHandler(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + fmt.Fprintf(w, raw) + }, withAPIKey)) + return mux + }, withAPIKey, + func(t *testing.T, client clienter) { + cmd := NewCheckinCmd(agentID, client) + + request := CheckinRequest{} + + r, err := cmd.Execute(&request) + require.NoError(t, err) + require.True(t, r.Success) + + require.Equal(t, 2, len(r.Actions)) + + // PolicyChangeAction + require.Equal(t, "id1", r.Actions[0].ID()) + require.Equal(t, "POLICY_CHANGE", r.Actions[0].Type()) + + // UnknownAction + require.Equal(t, "id2", r.Actions[1].ID()) + require.Equal(t, "UNKNOWN", r.Actions[1].Type()) + require.Equal(t, "WHAT_TO_DO_WITH_IT", r.Actions[1].(*UnknownAction).OriginalType()) + }, + )) + + t.Run("When we receive no action", withServerWithAuthClient( + func(t *testing.T) *http.ServeMux { + raw := ` +{ + "actions": [], + "success": true +} +` + mux := http.NewServeMux() + path := fmt.Sprintf("/api/fleet/agents/%s/checkin", agentID) + mux.HandleFunc(path, authHandler(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + fmt.Fprintf(w, raw) + }, withAPIKey)) + return mux + }, withAPIKey, + func(t *testing.T, client clienter) { + cmd := NewCheckinCmd(agentID, client) + + request := CheckinRequest{} + + r, err := cmd.Execute(&request) + require.NoError(t, err) + require.True(t, r.Success) + + require.Equal(t, 0, len(r.Actions)) + }, + )) +} diff --git a/x-pack/agent/pkg/fleetapi/client_test.go b/x-pack/agent/pkg/fleetapi/client_test.go index 28e383392b15..fffc89556640 100644 --- a/x-pack/agent/pkg/fleetapi/client_test.go +++ b/x-pack/agent/pkg/fleetapi/client_test.go @@ -7,9 +7,7 @@ package fleetapi import ( "fmt" "io/ioutil" - "net" "net/http" - "strconv" "strings" "testing" @@ -105,34 +103,6 @@ func TestHTTPClient(t *testing.T) { )) } -func authHandler(handler http.HandlerFunc, apiKey string) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - const key = "Authorization" - const prefix = "ApiKey " - - v := strings.TrimPrefix(r.Header.Get(key), prefix) - if v != apiKey { - http.Error(w, "Unauthorized", http.StatusUnauthorized) - return - } - handler(w, r) - } -} - -func withServer(m func(t *testing.T) *http.ServeMux, test func(t *testing.T, host string)) func(t *testing.T) { - return func(t *testing.T) { - listener, err := net.Listen("tcp", ":0") - require.NoError(t, err) - defer listener.Close() - - port := listener.Addr().(*net.TCPAddr).Port - - go http.Serve(listener, m(t)) - - test(t, "localhost:"+strconv.Itoa(port)) - } -} - // NOTE(ph): Usually I would be agaisnt testing private methods as much as possible but in this // case since we might deal with different format or error I make sense to test this method in // isolation. diff --git a/x-pack/agent/pkg/fleetapi/custom_types.go b/x-pack/agent/pkg/fleetapi/custom_types.go new file mode 100644 index 000000000000..00532d159ab9 --- /dev/null +++ b/x-pack/agent/pkg/fleetapi/custom_types.go @@ -0,0 +1,20 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package fleetapi + +import ( + "encoding/json" + "time" +) + +const timeFormat = time.RFC3339Nano + +// Time is a custom time that impose the serialization format. +type Time time.Time + +// MarshalJSON make sure that all the times are serialized with the RFC3339 format. +func (t Time) MarshalJSON() ([]byte, error) { + return json.Marshal(time.Time(t).Format(timeFormat)) +} diff --git a/x-pack/agent/pkg/fleetapi/custom_types_test.go b/x-pack/agent/pkg/fleetapi/custom_types_test.go new file mode 100644 index 000000000000..db38af6af889 --- /dev/null +++ b/x-pack/agent/pkg/fleetapi/custom_types_test.go @@ -0,0 +1,23 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package fleetapi + +import ( + "encoding/json" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestTimeSerialized(t *testing.T) { + then := time.Date( + 2020, 1, 8, 6, 30, 00, 651387237, time.UTC) + + b, err := json.Marshal(Time(then)) + require.NoError(t, err) + + require.Equal(t, "\"2020-01-08T06:30:00.651387237Z\"", string(b)) +} diff --git a/x-pack/agent/pkg/fleetapi/helper_test.go b/x-pack/agent/pkg/fleetapi/helper_test.go new file mode 100644 index 000000000000..7c56d6e99560 --- /dev/null +++ b/x-pack/agent/pkg/fleetapi/helper_test.go @@ -0,0 +1,61 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package fleetapi + +import ( + "net" + "net/http" + "strconv" + "strings" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/x-pack/agent/pkg/config" +) + +func authHandler(handler http.HandlerFunc, apiKey string) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + const key = "Authorization" + const prefix = "ApiKey " + + v := strings.TrimPrefix(r.Header.Get(key), prefix) + if v != apiKey { + http.Error(w, "Unauthorized", http.StatusUnauthorized) + return + } + handler(w, r) + } +} + +func withServer(m func(t *testing.T) *http.ServeMux, test func(t *testing.T, host string)) func(t *testing.T) { + return func(t *testing.T) { + listener, err := net.Listen("tcp", ":0") + require.NoError(t, err) + defer listener.Close() + + port := listener.Addr().(*net.TCPAddr).Port + + go http.Serve(listener, m(t)) + + test(t, "localhost:"+strconv.Itoa(port)) + } +} + +func withServerWithAuthClient( + m func(t *testing.T) *http.ServeMux, + accessToken string, + test func(t *testing.T, client clienter), +) func(t *testing.T) { + + return withServer(m, func(t *testing.T, host string) { + cfg := config.MustNewConfigFrom(map[string]interface{}{ + "host": host, + }) + client, err := NewAuthWithConfig(nil, cfg, accessToken) + require.NoError(t, err) + test(t, client) + }) +} diff --git a/x-pack/agent/pkg/reporter/fleet/reporter.go b/x-pack/agent/pkg/reporter/fleet/reporter.go index 6043bd90e454..f6a7699f7479 100644 --- a/x-pack/agent/pkg/reporter/fleet/reporter.go +++ b/x-pack/agent/pkg/reporter/fleet/reporter.go @@ -18,9 +18,29 @@ import ( const ( defaultThreshold = 1000 - timeFormat = time.RFC3339 ) +type event struct { + EventType string `json:"type"` + Ts fleetapi.Time `json:"timestamp"` + SubType string `json:"subtype"` + Msg string `json:"message"` + Payload map[string]interface{} `json:"payload,omitempty"` + Data string `json:"data,omitempty"` +} + +func (e *event) Type() string { + return e.EventType +} + +func (e *event) Timestamp() time.Time { + return time.Time(e.Ts) +} + +func (e *event) Message() string { + return e.Msg +} + type checkinExecutor interface { Execute(r *fleetapi.CheckinRequest) (*fleetapi.CheckinResponse, error) } @@ -126,18 +146,19 @@ func (r *Reporter) queueCopy() []reporter.Event { func (r *Reporter) reportBatch(ee []reporter.Event) error { req := &fleetapi.CheckinRequest{ - Events: make([]fleetapi.Event, 0, len(ee)), + Events: make([]fleetapi.SerializableEvent, 0, len(ee)), } for _, e := range ee { - req.Events = append(req.Events, fleetapi.Event{ - EventType: e.Type(), - Timestamp: e.Time().Format(timeFormat), - SubType: e.SubType(), - Message: e.Message(), - Payload: e.Payload(), - Data: e.Data(), - }) + req.Events = append(req.Events, + &event{ + EventType: e.Type(), + Ts: fleetapi.Time(e.Time()), + SubType: e.SubType(), + Msg: e.Message(), + Payload: e.Payload(), + Data: e.Data(), + }) } _, err := r.checkingCmd.Execute(req) diff --git a/x-pack/agent/pkg/reporter/fleet/reporter_test.go b/x-pack/agent/pkg/reporter/fleet/reporter_test.go index 4cda5cea4825..12a1f9c6e5b3 100644 --- a/x-pack/agent/pkg/reporter/fleet/reporter_test.go +++ b/x-pack/agent/pkg/reporter/fleet/reporter_test.go @@ -73,19 +73,19 @@ func TestInfoDrop(t *testing.T) { } // check both are errors - if reportedEvents[0].EventType != reportedEvents[1].EventType || reportedEvents[0].EventType != reporter.EventTypeError { - t.Fatalf("expected ERROR events got [1]: '%v', [2]: '%v'", reportedEvents[0].EventType, reportedEvents[1].EventType) + if reportedEvents[0].Type() != reportedEvents[1].Type() || reportedEvents[0].Type() != reporter.EventTypeError { + t.Fatalf("expected ERROR events got [1]: '%v', [2]: '%v'", reportedEvents[0].Type(), reportedEvents[1].Type()) } } type testClient struct { - reportedEvents []fleetapi.Event + reportedEvents []fleetapi.SerializableEvent lock sync.Mutex } func newTestClient() *testClient { return &testClient{ - reportedEvents: make([]fleetapi.Event, 0), + reportedEvents: make([]fleetapi.SerializableEvent, 0), } } @@ -100,10 +100,10 @@ func (tc *testClient) Execute(r *fleetapi.CheckinRequest) (*fleetapi.CheckinResp func (tc *testClient) reset() { tc.lock.Lock() defer tc.lock.Unlock() - tc.reportedEvents = make([]fleetapi.Event, 0) + tc.reportedEvents = make([]fleetapi.SerializableEvent, 0) } -func (tc *testClient) events() []fleetapi.Event { +func (tc *testClient) events() []fleetapi.SerializableEvent { tc.lock.Lock() defer tc.lock.Unlock() From 5a66e3292b8f96398395da5f9bf9b48dbf2703c8 Mon Sep 17 00:00:00 2001 From: Pier-Hugues Pellerin Date: Tue, 3 Dec 2019 13:37:59 -0500 Subject: [PATCH 2/4] renaming stuff --- .../pkg/fleetapi/{actions.go => action.go} | 68 --------------- .../{custom_types.go => custom_type.go} | 0 ...stom_types_test.go => custom_type_test.go} | 0 x-pack/agent/pkg/fleetapi/reply.go | 82 +++++++++++++++++++ 4 files changed, 82 insertions(+), 68 deletions(-) rename x-pack/agent/pkg/fleetapi/{actions.go => action.go} (55%) rename x-pack/agent/pkg/fleetapi/{custom_types.go => custom_type.go} (100%) rename x-pack/agent/pkg/fleetapi/{custom_types_test.go => custom_type_test.go} (100%) create mode 100644 x-pack/agent/pkg/fleetapi/reply.go diff --git a/x-pack/agent/pkg/fleetapi/actions.go b/x-pack/agent/pkg/fleetapi/action.go similarity index 55% rename from x-pack/agent/pkg/fleetapi/actions.go rename to x-pack/agent/pkg/fleetapi/action.go index 2f02b25335d0..2056dad5bd25 100644 --- a/x-pack/agent/pkg/fleetapi/actions.go +++ b/x-pack/agent/pkg/fleetapi/action.go @@ -8,7 +8,6 @@ import ( "encoding/json" "fmt" "strings" - "time" "github.com/pkg/errors" ) @@ -36,38 +35,6 @@ func (a *BaseAction) ID() string { return a.ActionID } -// UnknownAction is an action that is not know by the current version of the Agent and we don't want -// to return an error at parsing time but at execution time we can report or ignore. -// -// NOTE: We only keep the original type and the action id, the payload of the event is dropped, we -// do this to make sure we do not leak any unwanted information. -type UnknownAction struct { - *BaseAction - originalType string -} - -// Type returns the type of the Action. -func (a *UnknownAction) Type() string { - return "UNKNOWN" -} - -func (a *UnknownAction) String() string { - var s strings.Builder - s.WriteString("action_id: ") - s.WriteString(a.ID()) - s.WriteString(", type: ") - s.WriteString(a.Type()) - s.WriteString(" (original type: ") - s.WriteString(a.OriginalType()) - s.WriteString(")") - return s.String() -} - -// OriginalType returns the original type of the action as returned by the API. -func (a *UnknownAction) OriginalType() string { - return a.originalType -} - // PolicyChangeAction is a request to apply a new type PolicyChangeAction struct { *BaseAction @@ -127,38 +94,3 @@ func (a *Actions) UnmarshalJSON(data []byte) error { *a = actions return nil } - -// AckedAction represents a event to be send to the next checkin that will Ack an action. -type AckedAction struct { - EventType string `json:"type"` - ActionID string `json:"action_id"` - Ts time.Time `json:"timestamp"` - Msg string `json:"message,omitempty"` -} - -// Type return the type of event. -func (a *AckedAction) Type() string { - return a.EventType -} - -// Timestamp return when the event was created. -func (a *AckedAction) Timestamp() time.Time { - return a.Ts -} - -// Message returns the human readable string describing the event. -func (a *AckedAction) Message() string { - return a.Msg -} - -// Ack returns an event that represent an acked action. -func Ack(action Action) *AckedAction { - const t = "ACTION_ACKNOWLEDGED" - - return &AckedAction{ - EventType: t, - ActionID: action.ID(), - Msg: "Acknowledge action " + action.ID(), - Ts: time.Now(), - } -} diff --git a/x-pack/agent/pkg/fleetapi/custom_types.go b/x-pack/agent/pkg/fleetapi/custom_type.go similarity index 100% rename from x-pack/agent/pkg/fleetapi/custom_types.go rename to x-pack/agent/pkg/fleetapi/custom_type.go diff --git a/x-pack/agent/pkg/fleetapi/custom_types_test.go b/x-pack/agent/pkg/fleetapi/custom_type_test.go similarity index 100% rename from x-pack/agent/pkg/fleetapi/custom_types_test.go rename to x-pack/agent/pkg/fleetapi/custom_type_test.go diff --git a/x-pack/agent/pkg/fleetapi/reply.go b/x-pack/agent/pkg/fleetapi/reply.go new file mode 100644 index 000000000000..b4c9a5300206 --- /dev/null +++ b/x-pack/agent/pkg/fleetapi/reply.go @@ -0,0 +1,82 @@ +package fleetapi + +import ( + "strings" + "time" +) + +// UnknownAction is an action that is not know by the current version of the Agent and we don't want +// to return an error at parsing time but at execution time we can report or ignore. +// +// NOTE: We only keep the original type and the action id, the payload of the event is dropped, we +// do this to make sure we do not leak any unwanted information. +type UnknownAction struct { + *BaseAction + originalType string +} + +// Type returns the type of the Action. +func (a *UnknownAction) Type() string { + return "Action" +} + +// SubType returns the subtype of the action. +func (a *UnknownAction) SubType() { + return "UNKNOWN" +} + +func (a *UnknownAction) String() string { + var s strings.Builder + s.WriteString("action_id: ") + s.WriteString(a.ID()) + s.WriteString(", type: ") + s.WriteString(a.Type()) + s.WriteString(", sub type: ") + s.WriteString(a.SubType()) + s.WriteString(" (original type: ") + s.WriteString(a.OriginalType()) + s.WriteString(")") + return s.String() +} + +// OriginalType returns the original type of the action as returned by the API. +func (a *UnknownAction) OriginalType() string { + return a.originalType +} + +// AckedAction represents a event to be send to the next checkin that will Ack an action. +type AckedAction struct { + EventType string `json:"type"` + ActionID string `json:"action_id"` + Ts time.Time `json:"timestamp"` + Msg string `json:"message,omitempty"` +} + +// Type return the type of event. +func (a *AckedAction) Type() string { + return a.EventType +} + +// Timestamp return when the event was created. +func (a *AckedAction) Timestamp() time.Time { + return a.Ts +} + +// Message returns the human readable string describing the event. +func (a *AckedAction) Message() string { + return a.Msg +} + +// Ack returns an event that represent an acked action. +func Ack(action Action) *AckedAction { + const t = "ACTION_ACKNOWLEDGED" + const st = "ACKNOWLEDGED" + + return &AckedAction{ + EventType: t, + EventSubType: st, + ActionID: action.ID(), + Msg: "Acknowledge action " + action.ID(), + Ts: time.Now(), + } +} From f1d4d1c85dd4ecba4477d7037b4b11477d7ad7ae Mon Sep 17 00:00:00 2001 From: Pier-Hugues Pellerin Date: Tue, 3 Dec 2019 15:42:10 -0500 Subject: [PATCH 3/4] clean up the acked --- x-pack/agent/pkg/fleetapi/action.go | 32 ++++++ x-pack/agent/pkg/fleetapi/checkin_cmd_test.go | 4 +- x-pack/agent/pkg/fleetapi/reply.go | 103 +++++++----------- x-pack/agent/pkg/fleetapi/round_trippers.go | 1 - 4 files changed, 77 insertions(+), 63 deletions(-) diff --git a/x-pack/agent/pkg/fleetapi/action.go b/x-pack/agent/pkg/fleetapi/action.go index 2056dad5bd25..6ea1774cad71 100644 --- a/x-pack/agent/pkg/fleetapi/action.go +++ b/x-pack/agent/pkg/fleetapi/action.go @@ -35,6 +35,38 @@ func (a *BaseAction) ID() string { return a.ActionID } +// UnknownAction is an action that is not know by the current version of the Agent and we don't want +// to return an error at parsing time but at execution time we can report or ignore. +// +// NOTE: We only keep the original type and the action id, the payload of the event is dropped, we +// do this to make sure we do not leak any unwanted information. +type UnknownAction struct { + *BaseAction + originalType string +} + +// Type returns the type of the Action. +func (a *UnknownAction) Type() string { + return "UNKNOWN" +} + +func (a *UnknownAction) String() string { + var s strings.Builder + s.WriteString("action_id: ") + s.WriteString(a.ID()) + s.WriteString(", type: ") + s.WriteString(a.Type()) + s.WriteString(" (original type: ") + s.WriteString(a.OriginalType()) + s.WriteString(")") + return s.String() +} + +// OriginalType returns the original type of the action as returned by the API. +func (a *UnknownAction) OriginalType() string { + return a.originalType +} + // PolicyChangeAction is a request to apply a new type PolicyChangeAction struct { *BaseAction diff --git a/x-pack/agent/pkg/fleetapi/checkin_cmd_test.go b/x-pack/agent/pkg/fleetapi/checkin_cmd_test.go index f7fbb3b98bd9..538b26937c25 100644 --- a/x-pack/agent/pkg/fleetapi/checkin_cmd_test.go +++ b/x-pack/agent/pkg/fleetapi/checkin_cmd_test.go @@ -34,6 +34,7 @@ func TestCheckin(t *testing.T) { type E struct { ActionID string `json:"action_id"` Type string `json:"type"` + SubType string `json:"subtype"` Message string `json:"message"` Timestamp time.Time `json:"timestamp"` } @@ -52,7 +53,8 @@ func TestCheckin(t *testing.T) { e := responses.Events[0] require.Equal(t, "my-id", e.ActionID) - require.Equal(t, "ACTION_ACKNOWLEDGED", e.Type) + require.Equal(t, "ACTION", e.Type) + require.Equal(t, "ACKNOWLEDGED", e.SubType) require.Equal(t, "Acknowledge action my-id", e.Message) fmt.Fprintf(w, raw) diff --git a/x-pack/agent/pkg/fleetapi/reply.go b/x-pack/agent/pkg/fleetapi/reply.go index b4c9a5300206..36bd8c15861c 100644 --- a/x-pack/agent/pkg/fleetapi/reply.go +++ b/x-pack/agent/pkg/fleetapi/reply.go @@ -1,82 +1,63 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + package fleetapi import ( - "strings" + "encoding/json" "time" ) -// UnknownAction is an action that is not know by the current version of the Agent and we don't want -// to return an error at parsing time but at execution time we can report or ignore. -// -// NOTE: We only keep the original type and the action id, the payload of the event is dropped, we -// do this to make sure we do not leak any unwanted information. -type UnknownAction struct { - *BaseAction - originalType string -} - -// Type returns the type of the Action. -func (a *UnknownAction) Type() string { - return "Action" -} - -// SubType returns the subtype of the action. -func (a *UnknownAction) SubType() { - return "UNKNOWN" -} - -func (a *UnknownAction) String() string { - var s strings.Builder - s.WriteString("action_id: ") - s.WriteString(a.ID()) - s.WriteString(", type: ") - s.WriteString(a.Type()) - s.WriteString(", sub type: ") - s.WriteString(a.SubType()) - s.WriteString(" (original type: ") - s.WriteString(a.OriginalType()) - s.WriteString(")") - return s.String() +// ReplyAckedAction acks a received action from a checkin call. +type ReplyAckedAction struct { + ActionID string + Ts Time } -// OriginalType returns the original type of the action as returned by the API. -func (a *UnknownAction) OriginalType() string { - return a.originalType -} - -// AckedAction represents a event to be send to the next checkin that will Ack an action. -type AckedAction struct { - EventType string `json:"type"` - ActionID string `json:"action_id"` - Ts time.Time `json:"timestamp"` - Msg string `json:"message,omitempty"` +// Type return the type of event. +func (a *ReplyAckedAction) Type() string { + return "ACTION" } -// Type return the type of event. -func (a *AckedAction) Type() string { - return a.EventType +// SubType returns "ACKNOWLEDGED". +func (a *ReplyAckedAction) SubType() string { + return "ACKNOWLEDGED" } // Timestamp return when the event was created. -func (a *AckedAction) Timestamp() time.Time { - return a.Ts +func (a *ReplyAckedAction) Timestamp() time.Time { + return time.Time(a.Ts) } // Message returns the human readable string describing the event. -func (a *AckedAction) Message() string { - return a.Msg +func (a *ReplyAckedAction) Message() string { + return "Acknowledge action " + a.ActionID +} + +// MarshalJSON custom serialization for an ReplyAckedAction. +func (a *ReplyAckedAction) MarshalJSON() ([]byte, error) { + e := struct { + Type string `json:"type"` + Subtype string `json:"subtype"` + ActionID string `json:"action_id"` + Ts time.Time `json:"timestamp"` + Msg string `json:"message,omitempty"` + }{ + Type: a.Type(), + Subtype: a.SubType(), + ActionID: a.ActionID, + Msg: a.Message(), + Ts: a.Timestamp(), + } + + return json.Marshal(e) } // Ack returns an event that represent an acked action. -func Ack(action Action) *AckedAction { - const t = "ACTION_ACKNOWLEDGED" - const st = "ACKNOWLEDGED" - - return &AckedAction{ - EventType: t, - EventSubType: st, - ActionID: action.ID(), - Msg: "Acknowledge action " + action.ID(), - Ts: time.Now(), +func Ack(action Action) *ReplyAckedAction { + return &ReplyAckedAction{ + ActionID: action.ID(), + Ts: Time(time.Now()), } } diff --git a/x-pack/agent/pkg/fleetapi/round_trippers.go b/x-pack/agent/pkg/fleetapi/round_trippers.go index 0f8a904e2484..8f96c27d5b57 100644 --- a/x-pack/agent/pkg/fleetapi/round_trippers.go +++ b/x-pack/agent/pkg/fleetapi/round_trippers.go @@ -48,7 +48,6 @@ func (r *FleetAuthRoundTripper) RoundTrip(req *http.Request) (*http.Response, er req.Header.Set(key, prefix+r.apiKey) resp, err := r.rt.RoundTrip(req) - if resp.StatusCode == http.StatusUnauthorized { defer resp.Body.Close() return resp, ErrInvalidAPIKey From 221b7045286d3b8e5e04232214e9d22fc6091bbd Mon Sep 17 00:00:00 2001 From: Pier-Hugues Pellerin Date: Tue, 3 Dec 2019 15:46:21 -0500 Subject: [PATCH 4/4] Rename Action* instead of *Action to have a clearer usage of the struct. --- x-pack/agent/pkg/fleetapi/action.go | 36 +++++++++---------- x-pack/agent/pkg/fleetapi/checkin_cmd_test.go | 10 +++--- 2 files changed, 23 insertions(+), 23 deletions(-) diff --git a/x-pack/agent/pkg/fleetapi/action.go b/x-pack/agent/pkg/fleetapi/action.go index 6ea1774cad71..5f8006e7be0e 100644 --- a/x-pack/agent/pkg/fleetapi/action.go +++ b/x-pack/agent/pkg/fleetapi/action.go @@ -19,38 +19,38 @@ type Action interface { ID() string } -// BaseAction is the base of all actions to be executed. -type BaseAction struct { +// ActionBase is the base of all actions to be executed. +type ActionBase struct { ActionID string ActionType string } // Type returns the action type. -func (a *BaseAction) Type() string { +func (a *ActionBase) Type() string { return a.ActionType } // ID returns the action ID. -func (a *BaseAction) ID() string { +func (a *ActionBase) ID() string { return a.ActionID } -// UnknownAction is an action that is not know by the current version of the Agent and we don't want +// ActionUnknown is an action that is not know by the current version of the Agent and we don't want // to return an error at parsing time but at execution time we can report or ignore. // // NOTE: We only keep the original type and the action id, the payload of the event is dropped, we // do this to make sure we do not leak any unwanted information. -type UnknownAction struct { - *BaseAction +type ActionUnknown struct { + *ActionBase originalType string } // Type returns the type of the Action. -func (a *UnknownAction) Type() string { +func (a *ActionUnknown) Type() string { return "UNKNOWN" } -func (a *UnknownAction) String() string { +func (a *ActionUnknown) String() string { var s strings.Builder s.WriteString("action_id: ") s.WriteString(a.ID()) @@ -63,17 +63,17 @@ func (a *UnknownAction) String() string { } // OriginalType returns the original type of the action as returned by the API. -func (a *UnknownAction) OriginalType() string { +func (a *ActionUnknown) OriginalType() string { return a.originalType } -// PolicyChangeAction is a request to apply a new -type PolicyChangeAction struct { - *BaseAction +// ActionPolicyChange is a request to apply a new +type ActionPolicyChange struct { + *ActionBase Policy map[string]interface{} `json:"policy"` } -func (a *PolicyChangeAction) String() string { +func (a *ActionPolicyChange) String() string { var s strings.Builder s.WriteString("action_id: ") s.WriteString(a.ID()) @@ -105,8 +105,8 @@ func (a *Actions) UnmarshalJSON(data []byte) error { for _, response := range responses { switch response.ActionType { case "POLICY_CHANGE": - action = &PolicyChangeAction{ - BaseAction: &BaseAction{ + action = &ActionPolicyChange{ + ActionBase: &ActionBase{ ActionID: response.ActionID, ActionType: response.ActionType, }, @@ -115,8 +115,8 @@ func (a *Actions) UnmarshalJSON(data []byte) error { return errors.Wrap(err, "fail to decode POLICY_CHANGE action") } default: - action = &UnknownAction{ - BaseAction: &BaseAction{ActionID: response.ActionID, ActionType: "UNKNOWN"}, + action = &ActionUnknown{ + ActionBase: &ActionBase{ActionID: response.ActionID, ActionType: "UNKNOWN"}, originalType: response.ActionType, } } diff --git a/x-pack/agent/pkg/fleetapi/checkin_cmd_test.go b/x-pack/agent/pkg/fleetapi/checkin_cmd_test.go index 538b26937c25..be3d59c9100a 100644 --- a/x-pack/agent/pkg/fleetapi/checkin_cmd_test.go +++ b/x-pack/agent/pkg/fleetapi/checkin_cmd_test.go @@ -62,8 +62,8 @@ func TestCheckin(t *testing.T) { return mux }, withAPIKey, func(t *testing.T, client clienter) { - action := &PolicyChangeAction{ - BaseAction: &BaseAction{ + action := &ActionPolicyChange{ + ActionBase: &ActionBase{ ActionID: "my-id", ActionType: "POLICY_CHANGE", }, @@ -164,7 +164,7 @@ Something went wrong require.Equal(t, 1, len(r.Actions)) - // PolicyChangeAction + // ActionPolicyChange require.Equal(t, "id1", r.Actions[0].ID()) require.Equal(t, "POLICY_CHANGE", r.Actions[0].Type()) }, @@ -226,14 +226,14 @@ Something went wrong require.Equal(t, 2, len(r.Actions)) - // PolicyChangeAction + // ActionPolicyChange require.Equal(t, "id1", r.Actions[0].ID()) require.Equal(t, "POLICY_CHANGE", r.Actions[0].Type()) // UnknownAction require.Equal(t, "id2", r.Actions[1].ID()) require.Equal(t, "UNKNOWN", r.Actions[1].Type()) - require.Equal(t, "WHAT_TO_DO_WITH_IT", r.Actions[1].(*UnknownAction).OriginalType()) + require.Equal(t, "WHAT_TO_DO_WITH_IT", r.Actions[1].(*ActionUnknown).OriginalType()) }, ))