Skip to content

feat: add entity id in event submission #145

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Oct 9, 2020
2 changes: 2 additions & 0 deletions internal/agent/event_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ func (s *EventSenderSuite) TestLargeEventBatch(c *C) {
sender.QueueEvent(mapEvent{
"eventType": "TestEvent",
"value": i,
"entityID": 1,
}, "")
}

Expand All @@ -157,6 +158,7 @@ func (s *EventSenderSuite) TestLargeEventBatch(c *C) {
c.Assert(json.Unmarshal(accumulatedBatches[1], &postedBatches), IsNil)
c.Assert(postedBatches, HasLen, 1)
c.Assert(postedBatches[0].Events, HasLen, 10)
c.Assert(postedBatches[0].EntityID, Equals, 1)

c.Assert(accumulatedRequests, HasLen, 2)
c.Assert(accumulatedRequests[0].Header.Get(http2.LicenseHeader), Equals, "license")
Expand Down
48 changes: 37 additions & 11 deletions pkg/integrations/v4/dm/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/newrelic/infrastructure-agent/pkg/integrations/legacy"
"github.com/newrelic/infrastructure-agent/pkg/integrations/v4/protocol"
"github.com/newrelic/infrastructure-agent/pkg/log"
"github.com/sirupsen/logrus"
"github.com/tevino/abool"
)

Expand Down Expand Up @@ -177,7 +178,7 @@ func (e *emitter) processEntityFwRequest(r fwrequest.EntityFwRequest) {

emitInventory(&plugin, r.Definition, r.Integration, r.ID(), r.Data, labels)

emitEvent(&plugin, r.Definition, r.Data, labels)
emitEvent(&plugin, r.Definition, r.Data, labels, r.ID())

metrics := dmProcessor.ProcessMetrics(r.Data.Metrics, r.Data.Common, r.Data.Entity)
if err := e.metricsSender.SendMetricsWithCommonAttributes(r.Data.Common, metrics); err != nil {
Expand Down Expand Up @@ -205,18 +206,43 @@ func emitInventory(
}
}

func emitEvent(
emitter agent.PluginEmitter,
metadata integration.Definition,
dataSet protocol.Dataset,
labels map[string]string) {
func emitEvent(emitter agent.PluginEmitter, metadata integration.Definition, dataSet protocol.Dataset, labels map[string]string, entityID entity.ID) {
builder := make([]func(protocol.EventData), 0)

u := metadata.ExecutorConfig.User
if u != "" {
builder = append(builder, protocol.WithIntegrationUser(u))
}

builder = append(builder, protocol.WithLabels(labels))

integrationUser := metadata.ExecutorConfig.User
for _, event := range dataSet.Events {
normalizedEvent := legacy.
NormalizeEvent(elog, event, labels, integrationUser, dataSet.Entity.Name)
if normalizedEvent != nil {
emitter.EmitEvent(normalizedEvent, entity.Key(dataSet.Entity.Name))
builder = append(builder,
protocol.WithEntity(entity.New(entity.Key(dataSet.Entity.Name), entityID)),
protocol.WithEvents(event))

attributesFromEvent(event, &builder)

e, err := protocol.NewEventData(builder...)

if err != nil {
elog.WithFields(logrus.Fields{
"payload": event,
"error": err,
}).Warn("discarding event, failed building event data.")
continue
}

emitter.EmitEvent(e, entity.Key(dataSet.Entity.Name))
}
}

func attributesFromEvent(event protocol.EventData, builder *[]func(protocol.EventData)) {
if a, ok := event["attributes"]; ok {
switch t := a.(type) {
default:
case map[string]interface{}:
*builder = append(*builder, protocol.WithAttributes(t))
}
}
}
Expand Down
80 changes: 74 additions & 6 deletions pkg/integrations/v4/dm/emitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,14 @@
package dm

import (
"fmt"
"github.com/newrelic/infrastructure-agent/internal/integrations/v4/executor"
"github.com/newrelic/infrastructure-agent/pkg/log"
"github.com/sirupsen/logrus"
logTest "github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/require"
"io/ioutil"
"os"
"sync"
"testing"

Expand Down Expand Up @@ -66,6 +74,9 @@ func TestEmitter_Send_usingIDCache(t *testing.T) {
aCtx := getAgentContext("bob")
aCtx.On("SendData",
agent.PluginOutput{Id: ids.PluginID{Category: "integration", Term: "integration name"}, Entity: entity.New("unique name", eID), Data: agent.PluginInventoryDataset{protocol.InventoryData{"id": "inventory_foo", "value": "bar"}}, NotApplicable: false})

aCtx.On("SendEvent", mock.AnythingOfType("agent.mapEvent"), entity.Key("unique name"))

aCtx.SendDataWg.Add(1)

dmSender := &mockedMetricsSender{
Expand Down Expand Up @@ -102,8 +113,12 @@ func TestEmitter_Send(t *testing.T) {

aCtx := getAgentContext("bob")
aCtx.On("SendData",
agent.PluginOutput{Id: ids.PluginID{Category: "integration", Term: "integration name"}, Entity: entity.New("unique name", eID), Data: agent.PluginInventoryDataset{protocol.InventoryData{"id": "inventory_foo", "value": "bar"}}, NotApplicable: false})
agent.PluginOutput{Id: ids.PluginID{Category: "integration", Term: "integration name"}, Entity: entity.New("unique name", eID), Data: agent.PluginInventoryDataset{protocol.InventoryData{"id": "inventory_foo", "value": "bar"}, protocol.InventoryData{"entityKey": "unique name", "id": "integrationUser", "value": "root"}}, NotApplicable: false})

aCtx.On("SendEvent", mock.Anything, entity.Key("unique name")).Run(assertEventData(t))

aCtx.SendDataWg.Add(1)

aCtx.On("Identity").Return(
entity.Identity{
ID: entity.ID(321), // agent one
Expand All @@ -125,7 +140,7 @@ func TestEmitter_Send(t *testing.T) {
e.registerMaxBatchSize = 1

data := integrationFixture.ProtocolV4.Clone().ParsedV4
em.Send(fwrequest.NewFwRequest(integration.Definition{}, nil, nil, data))
em.Send(fwrequest.NewFwRequest(integration.Definition{ExecutorConfig: executor.Config{User: "root"}}, nil, nil, data))

ms.wg.Wait()
aCtx.SendDataWg.Wait()
Expand All @@ -137,6 +152,62 @@ func TestEmitter_Send(t *testing.T) {
assert.Equal(t, eID.String(), sent[0].Attributes[fwrequest.EntityIdAttribute])
}

func Test_NrEntityIdConst(t *testing.T) {
assert.Equal(t, fwrequest.EntityIdAttribute, "nr.entity.id")
}

func TestEmitEvent_InvalidPayload(t *testing.T) {
log.SetOutput(ioutil.Discard) // discard logs so not to break race tests
defer log.SetOutput(os.Stderr) // return back to default
hook := new(logTest.Hook)
log.AddHook(hook)
log.SetLevel(logrus.WarnLevel)

never := 0
aCtx := getAgentContext("bob")
aCtx.On("SendEvent").Times(never)

d := integration.Definition{}
plugin := agent.NewExternalPluginCommon(d.PluginID("integration.Name"), aCtx, "TestEmitEvent_InvalidPayload")

emitEvent(&plugin, d, protocol.Dataset{Events: []protocol.EventData{{"value": "foo"}}}, nil, entity.ID(0))

entry := hook.LastEntry()
require.NotEmpty(t, hook.Entries)
assert.Equal(t, "DimensionalMetricsEmitter", entry.Data["component"])
assert.Equal(t, "discarding event, failed building event data.", entry.Message)
assert.EqualError(t, entry.Data["error"].(error), "invalid event format: missing required 'summary' field")
assert.Equal(t, logrus.WarnLevel, entry.Level)
}

func assertEventData(t *testing.T) func(args mock.Arguments) {
return func(args mock.Arguments) {
event := args.Get(0)
plainEvent := fmt.Sprint(event)

expectedSummary := "summary:foo"
assert.Contains(t, plainEvent, expectedSummary)

expectedEvent := "format:event"
assert.Contains(t, plainEvent, expectedEvent)

expectedCategory := "category:notifications"
assert.Contains(t, plainEvent, expectedCategory)

expectedType := "eventType:InfrastructureEvent"
assert.Contains(t, plainEvent, expectedType)

expectedEntityID := "entityID:1"
assert.Contains(t, plainEvent, expectedEntityID)

expectedAttribute := "attr.format:attribute"
assert.Contains(t, plainEvent, expectedAttribute)

expectedUser := "integrationUser:root"
assert.Contains(t, plainEvent, expectedUser)
}
}

func getAgentContext(hostname string) *mocks.AgentContext {
agentCtx := &mocks.AgentContext{
SendDataWg: sync.WaitGroup{},
Expand All @@ -146,10 +217,7 @@ func getAgentContext(hostname string) *mocks.AgentContext {
idLookup[sysinfo.HOST_SOURCE_INSTANCE_ID] = hostname
}
agentCtx.On("IDLookup").Return(idLookup)
agentCtx.On("Config").Return(nil)

return agentCtx
}

func Test_NrEntityIdConst(t *testing.T) {
assert.Equal(t, fwrequest.EntityIdAttribute, "nr.entity.id")
}
77 changes: 77 additions & 0 deletions pkg/integrations/v4/protocol/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package protocol

import (
"encoding/json"
"errors"
"fmt"
"time"

Expand All @@ -22,6 +23,8 @@ const (

const millisSinceJanuaryFirst1978 = 252489600000

var acceptedAttribute = []string{"summary", "category", "entity_name", "format", "local_identity", "local_details"}

type DataV4 struct {
PluginProtocolVersion
Integration IntegrationMetadata `json:"integration"`
Expand Down Expand Up @@ -129,6 +132,80 @@ type MetricData map[string]interface{}
// EventData is the data type for single shot events
type EventData map[string]interface{}

// NewEventData create a new event data from builder func
func NewEventData(options ...func(EventData)) (EventData, error) {
e := EventData{
"eventType": "InfrastructureEvent",
"category": "notifications",
}

for _, opt := range options {
opt(e)
}

// Validate required field
if _, ok := e["summary"]; !ok {
return nil, errors.New("invalid event format: missing required 'summary' field")
}

// there are integrations that add the hostname
// and since backed has a attribute limit
// we remove it to avoid potential conflict when submitting events
delete(e, "hostname")

return e, nil
}

// Builder for NewEventData constructor will copy only valid keys
// valid keys: ["summary", "category", "entity_name", "format", "local_identity", "local_details"]
func WithEvents(original EventData) func(EventData) {
return func(copy EventData) {
for _, key := range acceptedAttribute {
if val, ok := original[key]; ok {
copy[key] = val
}
}
}
}

// Builder for NewEventData constructor will add 'integrationUser' key
func WithIntegrationUser(value string) func(EventData) {
return func(copy EventData) {
copy["integrationUser"] = value
}
}

// Builder for NewEventData constructor will add 'entityKey' and 'entityID' keys
func WithEntity(e entity.Entity) func(EventData) {
return func(copy EventData) {
copy["entityKey"] = e.Key.String()
copy["entityID"] = e.ID.String()
}
}

// Builder for NewEventData constructor will add labels with prefix 'label.'
func WithLabels(l map[string]string) func(EventData) {
return func(copy EventData) {
for key, value := range l {
copy[fmt.Sprintf("label.%s", key)] = value
}
}
}

// Builder for NewEventData constructor will add attributes
// if already exist in the eventData will add it with prefix 'attr.'
func WithAttributes(a map[string]interface{}) func(EventData) {
return func(copy EventData) {
for key, value := range a {
if _, ok := copy[key]; ok {
copy[fmt.Sprintf("attr.%s", key)] = value
} else {
copy[key] = value
}
}
}
}

// Minimum information to determine plugin protocol
type PluginProtocolVersion struct {
RawProtocolVersion interface{} `json:"protocol_version"` // Left open-ended for validation purposes
Expand Down
Loading