Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion agentcfg/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package agentcfg

import (
"fmt"
"time"

gocache "github.com/patrickmn/go-cache"
Expand Down Expand Up @@ -55,7 +56,8 @@ func (c *cache) fetch(query Query, fetch func() (Result, error)) (Result, error)
c.gocache.SetDefault(query.id(), result)

if c.logger.IsDebug() {
c.logger.Debugf("Cache size %v. Added ID %v.", c.gocache.ItemCount(), query.id())
c.logger.Debugw(fmt.Sprintf("Cache size %v. Added ID %v.", c.gocache.ItemCount(), query.id()),
"service.name", query.Service.Name)
}
return result, nil
}
6 changes: 1 addition & 5 deletions beater/api/intake/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,7 @@ func Handler(processor *stream.Processor, batchProcessor model.BatchProcessor) r
return
}

metadata := model.Metadata{
UserAgent: model.UserAgent{Original: c.RequestMetadata.UserAgent},
Client: model.Client{IP: c.RequestMetadata.ClientIP},
System: model.System{IP: c.RequestMetadata.SystemIP}}
res := processor.HandleStream(c.Request.Context(), c.RateLimiter, &metadata, reader, batchProcessor)
res := processor.HandleStream(c, reader, batchProcessor)
sendResponse(c, res)
}
}
Expand Down
4 changes: 2 additions & 2 deletions beater/jaeger/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,13 @@ func (s *grpcSampler) GetSamplingStrategy(
if errors.As(err, &verr) {
if err := checkValidationError(verr); err != nil {
// do not return full error details since this is part of an unprotected endpoint response
s.logger.With(logp.Error(err)).Error("Configured Kibana client does not support agent remote configuration")
s.logger.With(logp.Error(err)).With("service.name", params.ServiceName).Error("Configured Kibana client does not support agent remote configuration")
return nil, errors.New("agent remote configuration not supported, check server logs for more details")
}
}

// do not return full error details since this is part of an unprotected endpoint response
s.logger.With(logp.Error(err)).Error("No valid sampling rate fetched from Kibana.")
s.logger.With(logp.Error(err)).With("service.name", params.ServiceName).Error("No valid sampling rate fetched from Kibana.")
return nil, errors.New("no sampling rate available, check server logs for more details")
}

Expand Down
5 changes: 5 additions & 0 deletions beater/middleware/log_middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,12 @@ func LogMiddleware() Middleware {
return
}
h(c)

c.Logger = c.Logger.With("event.duration", time.Since(start))
if c.ServiceName != "" {
c.Logger = c.Logger.With("service.name", c.ServiceName)
}

if c.MultipleWriteAttempts() {
c.Logger.Warn("multiple write attempts")
}
Expand Down
2 changes: 2 additions & 0 deletions beater/request/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type Context struct {
Logger *logp.Logger
RateLimiter *rate.Limiter
AuthResult authorization.Result
ServiceName string
IsRum bool
Result Result
RequestMetadata Metadata
Expand Down Expand Up @@ -75,6 +76,7 @@ func (c *Context) Reset(w http.ResponseWriter, r *http.Request) {
c.RateLimiter = nil
c.AuthResult = authorization.Result{}
c.IsRum = false
c.ServiceName = ""
c.Result.Reset()
c.RequestMetadata.Reset()

Expand Down
2 changes: 1 addition & 1 deletion model/stacktrace.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (st *Stacktrace) transform(ctx context.Context, cfg *transform.Config, rum
return
}
if _, ok := sourcemapErrorSet[errMsg]; !ok {
logger.Debug(errMsg)
logger.Debugw(errMsg, "service.name", service.Name)
sourcemapErrorSet[errMsg] = nil
}
})
Expand Down
2 changes: 1 addition & 1 deletion processor/otel/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (c *Consumer) convertSpan(
metadata model.Metadata,
out *model.Batch,
) {
logger := logp.NewLogger(logs.Otel)
logger := logp.NewLogger(logs.Otel).With("service.name", metadata.Service.Name)

root := otelSpan.ParentSpanID().IsEmpty()

Expand Down
7 changes: 4 additions & 3 deletions processor/stream/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,17 @@ package stream

import (
"bytes"
"context"
"io/ioutil"
"math"
"net/http"
"path/filepath"
"testing"

"github.com/elastic/apm-server/beater/request"

"golang.org/x/time/rate"

"github.com/elastic/apm-server/beater/config"
"github.com/elastic/apm-server/model"
)

func BenchmarkBackendProcessor(b *testing.B) {
Expand Down Expand Up @@ -63,7 +64,7 @@ func benchmarkStreamProcessor(b *testing.B, processor *Processor, files []string
b.StopTimer()
r.Reset(data)
b.StartTimer()
processor.HandleStream(context.Background(), rl, &model.Metadata{}, r, batchProcessor)
processor.HandleStream(&request.Context{Request: &http.Request{}, RateLimiter: rl}, r, batchProcessor)
}
}
}
Expand Down
5 changes: 4 additions & 1 deletion processor/stream/package_tests/intake_test_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@ import (
"errors"
"io"
"io/ioutil"
"net/http"
"time"

"github.com/elastic/apm-server/beater/request"

"github.com/elastic/beats/v7/libbeat/beat"

"github.com/elastic/apm-server/datastreams"
Expand Down Expand Up @@ -134,7 +137,7 @@ func (p *intakeTestProcessor) Process(buf []byte) ([]beat.Event, error) {
return nil
})

result := p.HandleStream(context.TODO(), nil, &model.Metadata{}, bytes.NewBuffer(buf), batchProcessor)
result := p.HandleStream(&request.Context{Request: &http.Request{}}, bytes.NewBuffer(buf), batchProcessor)
for _, event := range events {
// TODO(axw) migrate all of these tests to systemtest,
// so we can use the proper event publishing pipeline.
Expand Down
14 changes: 12 additions & 2 deletions processor/stream/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"sync"
"time"

"github.com/elastic/apm-server/beater/request"

"golang.org/x/time/rate"

"go.elastic.co/apm"
Expand Down Expand Up @@ -296,23 +298,31 @@ func handleDecodeErr(err error, r *streamReader, result *Result) bool {
}

// HandleStream processes a stream of events
func (p *Processor) HandleStream(ctx context.Context, ipRateLimiter *rate.Limiter, meta *model.Metadata, reader io.Reader, processor model.BatchProcessor) *Result {
func (p *Processor) HandleStream(c *request.Context, reader io.Reader, processor model.BatchProcessor) *Result {
res := &Result{}

sr := p.getStreamReader(reader)
defer sr.release()

meta := &model.Metadata{
UserAgent: model.UserAgent{Original: c.RequestMetadata.UserAgent},
Client: model.Client{IP: c.RequestMetadata.ClientIP},
System: model.System{IP: c.RequestMetadata.SystemIP}}

// first item is the metadata object
if err := p.readMetadata(sr, meta); err != nil {
// no point in continuing if we couldn't read the metadata
res.Add(err)
return res
}
c.ServiceName = meta.Service.Name

var allowedServiceNamesProcessor model.BatchProcessor = modelprocessor.Nop{}
if p.allowedServiceNames != nil {
allowedServiceNamesProcessor = modelprocessor.MetadataProcessorFunc(p.restrictAllowedServiceNames)
}

ctx := c.Request.Context()
requestTime := utility.RequestTime(ctx)

sp, ctx := apm.StartSpan(ctx, "Stream", "Reporter")
Expand All @@ -321,7 +331,7 @@ func (p *Processor) HandleStream(ctx context.Context, ipRateLimiter *rate.Limite
var done bool
for !done {
var batch model.Batch
done = p.readBatch(ctx, ipRateLimiter, requestTime, meta, batchSize, &batch, sr, res)
done = p.readBatch(ctx, c.RateLimiter, requestTime, meta, batchSize, &batch, sr, res)
if batch.Len() == 0 {
continue
}
Expand Down
45 changes: 30 additions & 15 deletions processor/stream/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@ import (
"fmt"
"io/ioutil"
"net"
"net/http"
"path/filepath"
"testing"
"testing/iotest"
"time"

"github.com/elastic/apm-server/beater/request"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/time/rate"
Expand Down Expand Up @@ -62,7 +65,7 @@ func TestHandlerReadStreamError(t *testing.T) {
timeoutReader := iotest.TimeoutReader(bytes.NewReader(payload))

sp := BackendProcessor(&config.Config{MaxEventSize: 100 * 1024})
actualResult := sp.HandleStream(context.Background(), nil, &model.Metadata{}, timeoutReader, processor)
actualResult := sp.HandleStream(&request.Context{Request: &http.Request{}}, timeoutReader, processor)
assertApproveResult(t, actualResult, "ReadError")
}

Expand All @@ -87,7 +90,7 @@ func TestHandlerReportingStreamError(t *testing.T) {
},
} {
sp := BackendProcessor(&config.Config{MaxEventSize: 100 * 1024})
actualResult := sp.HandleStream(context.Background(), nil, &model.Metadata{}, bytes.NewReader(payload), test.processor)
actualResult := sp.HandleStream(&request.Context{Request: &http.Request{}}, bytes.NewReader(payload), test.processor)
assertApproveResult(t, actualResult, test.name)
}
}
Expand Down Expand Up @@ -121,10 +124,14 @@ func TestIntegrationESOutput(t *testing.T) {
ctx := utility.ContextWithRequestTime(context.Background(), reqTimestamp)
batchProcessor := makeApproveEventsBatchProcessor(t, name)

reqDecoderMeta := &model.Metadata{System: model.System{IP: net.ParseIP("192.0.0.1")}}

c := &request.Context{
Request: (&http.Request{}).WithContext(ctx),
RequestMetadata: request.Metadata{
SystemIP: net.ParseIP("192.0.0.1"),
},
}
p := BackendProcessor(&config.Config{MaxEventSize: 100 * 1024})
actualResult := p.HandleStream(ctx, nil, reqDecoderMeta, bytes.NewReader(payload), batchProcessor)
actualResult := p.HandleStream(c, bytes.NewReader(payload), batchProcessor)
assertApproveResult(t, actualResult, test.name)
})
}
Expand All @@ -147,12 +154,16 @@ func TestIntegrationRum(t *testing.T) {
ctx := utility.ContextWithRequestTime(context.Background(), reqTimestamp)
batchProcessor := makeApproveEventsBatchProcessor(t, name)

reqDecoderMeta := model.Metadata{
UserAgent: model.UserAgent{Original: "rum-2.0"},
Client: model.Client{IP: net.ParseIP("192.0.0.1")}}
c := &request.Context{
Request: (&http.Request{}).WithContext(ctx),
RequestMetadata: request.Metadata{
UserAgent: "rum-2.0",
SystemIP: net.ParseIP("192.0.0.1"),
},
}

p := RUMV2Processor(&config.Config{MaxEventSize: 100 * 1024, RumConfig: &config.RumConfig{}})
actualResult := p.HandleStream(ctx, nil, &reqDecoderMeta, bytes.NewReader(payload), batchProcessor)
actualResult := p.HandleStream(c, bytes.NewReader(payload), batchProcessor)
assertApproveResult(t, actualResult, test.name)
})
}
Expand All @@ -175,12 +186,16 @@ func TestRUMV3(t *testing.T) {
ctx := utility.ContextWithRequestTime(context.Background(), reqTimestamp)
batchProcessor := makeApproveEventsBatchProcessor(t, name)

reqDecoderMeta := model.Metadata{
UserAgent: model.UserAgent{Original: "rum-2.0"},
Client: model.Client{IP: net.ParseIP("192.0.0.1")}}
c := &request.Context{
Request: (&http.Request{}).WithContext(ctx),
RequestMetadata: request.Metadata{
UserAgent: "rum-2.0",
SystemIP: net.ParseIP("192.0.0.1"),
},
}

p := RUMV3Processor(&config.Config{MaxEventSize: 100 * 1024, RumConfig: &config.RumConfig{}})
actualResult := p.HandleStream(ctx, nil, &reqDecoderMeta, bytes.NewReader(payload), batchProcessor)
actualResult := p.HandleStream(c, bytes.NewReader(payload), batchProcessor)
assertApproveResult(t, actualResult, test.name)
})
}
Expand Down Expand Up @@ -215,7 +230,7 @@ func TestRUMAllowedServiceNames(t *testing.T) {
RumConfig: &config.RumConfig{AllowServiceNames: test.AllowServiceNames},
})

result := p.HandleStream(context.Background(), nil, &model.Metadata{}, bytes.NewReader(payload), modelprocessor.Nop{})
result := p.HandleStream(&request.Context{Request: &http.Request{}}, bytes.NewReader(payload), modelprocessor.Nop{})
assert.Equal(t, test.ExpectedResult, result)
}
}
Expand All @@ -242,7 +257,7 @@ func TestRateLimiting(t *testing.T) {
}

actualResult := BackendProcessor(&config.Config{MaxEventSize: 100 * 1024}).HandleStream(
context.Background(), test.lim, &model.Metadata{}, bytes.NewReader(payload), nopBatchProcessor{})
&request.Context{Request: &http.Request{}, RateLimiter: test.lim}, bytes.NewReader(payload), nopBatchProcessor{})
assertApproveResult(t, actualResult, test.name)
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@
"name": "rum-js",
"version": "0.0.0"
},
"client": {
"ip": "192.0.0.1"
},
"data_stream.dataset": "apm.error.apm_agent_js",
"data_stream.type": "logs",
"error": {
Expand Down Expand Up @@ -95,6 +92,9 @@
"url": "http://localhost:8000/test/e2e/general-usecase/"
}
},
"host": {
"ip": "192.0.0.1"
},
"http": {
"request": {
"referrer": "http://localhost:8000/test/e2e/"
Expand All @@ -108,9 +108,6 @@
"name": "apm-agent-js",
"version": "1.0.1"
},
"source": {
"ip": "192.0.0.1"
},
"timestamp": {
"us": 1512735530291000
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@
"name": "rum-js",
"version": "0.0.0"
},
"client": {
"ip": "192.0.0.1"
},
"data_stream.dataset": "apm.apm_agent_js",
"data_stream.type": "traces",
"event": {
"outcome": "unknown"
},
"host": {
"ip": "192.0.0.1"
},
"http": {
"request": {
"referrer": "http://localhost:8000/test/e2e/"
Expand All @@ -27,9 +27,6 @@
"name": "apm-agent-js",
"version": "1.0.0"
},
"source": {
"ip": "192.0.0.1"
},
"timestamp": {
"us": 1533117600000000
},
Expand Down Expand Up @@ -74,14 +71,14 @@
"name": "rum-js",
"version": "0.0.0"
},
"client": {
"ip": "192.0.0.1"
},
"data_stream.dataset": "apm.apm_agent_js",
"data_stream.type": "traces",
"event": {
"outcome": "unknown"
},
"host": {
"ip": "192.0.0.1"
},
"parent": {
"id": "611f4fa950f04631"
},
Expand Down
Loading