-
Notifications
You must be signed in to change notification settings - Fork 1.5k
/
otlp.go
56 lines (46 loc) · 1.97 KB
/
otlp.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package trace // import "go.opentelemetry.io/collector/receiver/otlpreceiver/internal/trace"
import (
"context"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp"
"go.opentelemetry.io/collector/receiver/otlpreceiver/internal/errors"
"go.opentelemetry.io/collector/receiver/receiverhelper"
)
// dataFormatProtobuf is the data-format label handed to obsreport.EndTracesOp
// when Export finishes processing a request.
const dataFormatProtobuf = "protobuf"
// Receiver is the type used to handle spans from OpenTelemetry exporters.
//
// It forwards the traces carried by each export request to the next consumer
// in the pipeline and records the outcome of every operation via obsreport.
type Receiver struct {
	// NOTE(review): presumably embedded so Receiver satisfies the ptraceotlp
	// gRPC server interface while only overriding Export — confirm against
	// the ptraceotlp package.
	ptraceotlp.UnimplementedGRPCServer
	// nextConsumer receives the traces extracted from each export request.
	nextConsumer consumer.Traces
	// obsreport brackets each consume call with StartTracesOp / EndTracesOp.
	obsreport *receiverhelper.ObsReport
}
// New builds a Receiver that hands incoming traces to nextConsumer and
// reports every export operation through obsreport.
func New(nextConsumer consumer.Traces, obsreport *receiverhelper.ObsReport) *Receiver {
	recv := new(Receiver)
	recv.nextConsumer = nextConsumer
	recv.obsreport = obsreport
	return recv
}
// Export implements the service Export traces func.
//
// A request that carries no spans is acknowledged immediately without
// touching the consumer or obsreport. Otherwise the traces are passed to the
// next consumer, with the call bracketed by obsreport's StartTracesOp and
// EndTracesOp. A consumer failure is returned as a gRPC status error.
func (r *Receiver) Export(ctx context.Context, req ptraceotlp.ExportRequest) (ptraceotlp.ExportResponse, error) {
	traces := req.Traces()
	spanCount := traces.SpanCount()
	if spanCount == 0 {
		// Nothing to consume or report.
		return ptraceotlp.NewExportResponse(), nil
	}

	ctx = r.obsreport.StartTracesOp(ctx)
	consumeErr := r.nextConsumer.ConsumeTraces(ctx, traces)
	r.obsreport.EndTracesOp(ctx, dataFormatProtobuf, spanCount, consumeErr)

	if consumeErr == nil {
		return ptraceotlp.NewExportResponse(), nil
	}

	// Returning the raw error would make the grpc implementation report the
	// status code Unknown, see
	// https://github.com/grpc/grpc-go/blob/v1.59.0/server.go#L1345
	// so the error is first converted to an appropriate grpc status:
	//   - non-permanent errors -> codes.Unavailable (equivalent to HTTP 503)
	//   - permanent errors     -> codes.InvalidArgument (equivalent to HTTP 400)
	return ptraceotlp.NewExportResponse(), errors.GetStatusFromError(consumeErr)
}