diff --git a/receiver/otlpreceiver/internal/errors/errors.go b/receiver/otlpreceiver/internal/errors/errors.go new file mode 100644 index 000000000000..77f70ea5875a --- /dev/null +++ b/receiver/otlpreceiver/internal/errors/errors.go @@ -0,0 +1,46 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package errors // import "go.opentelemetry.io/collector/receiver/otlpreceiver/internal/errors" + +import ( + "net/http" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "go.opentelemetry.io/collector/consumer/consumererror" +) + +func GetStatusFromError(err error) *status.Status { + s, ok := status.FromError(err) + if !ok { + // Default to a retryable error + // https://github.com/open-telemetry/opentelemetry-proto/blob/main/docs/specification.md#failures + code := codes.Unavailable + if consumererror.IsPermanent(err) { + code = codes.InvalidArgument + } + s = status.New(code, err.Error()) + } + return s +} + +func GetHTTPStatusCodeFromStatus(err error) int { + s, ok := status.FromError(err) + if !ok { + return http.StatusInternalServerError + } + // See https://github.com/open-telemetry/opentelemetry-proto/blob/main/docs/specification.md#failures + // to see if a code is retryable. + // See https://github.com/open-telemetry/opentelemetry-proto/blob/main/docs/specification.md#failures-1 + // to see a list of retryable http status codes. + switch s.Code() { + // Retryable + case codes.Canceled, codes.DeadlineExceeded, codes.Aborted, codes.OutOfRange, codes.Unavailable, codes.DataLoss: + return http.StatusServiceUnavailable + // Not Retryable + default: + return http.StatusInternalServerError + } +} diff --git a/receiver/otlpreceiver/internal/errors/errors_test.go b/receiver/otlpreceiver/internal/errors/errors_test.go new file mode 100644 index 000000000000..c511ea96bcb2 --- /dev/null +++ b/receiver/otlpreceiver/internal/errors/errors_test.go @@ -0,0 +1,76 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package errors // import "go.opentelemetry.io/collector/receiver/otlpreceiver/internal/util" + +import ( + "fmt" + "net/http" + "testing" + + "github.com/stretchr/testify/assert" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "go.opentelemetry.io/collector/consumer/consumererror" +) + +func Test_GetStatusFromError(t *testing.T) { + tests := []struct { + name string + input error + expected *status.Status + }{ + { + name: "Status", + input: status.Error(codes.Aborted, "test"), + expected: status.New(codes.Aborted, "test"), + }, + { + name: "Permanent Error", + input: consumererror.NewPermanent(fmt.Errorf("test")), + expected: status.New(codes.InvalidArgument, "Permanent error: test"), + }, + { + name: "Non-Permanent Error", + input: fmt.Errorf("test"), + expected: status.New(codes.Unavailable, "test"), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := GetStatusFromError(tt.input) + assert.Equal(t, tt.expected, result) + }) + } +} + +func Test_GetHTTPStatusCodeFromStatus(t *testing.T) { + tests := []struct { + name string + input error + expected int + }{ + { + name: "Not a Status", + input: fmt.Errorf("not a status error"), + expected: http.StatusInternalServerError, + }, + { + name: "Retryable Status", + input: status.New(codes.Unavailable, "test").Err(), + expected: http.StatusServiceUnavailable, + }, + { + name: "Non-retryable Status", + input: status.New(codes.InvalidArgument, "test").Err(), + expected: http.StatusInternalServerError, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := GetHTTPStatusCodeFromStatus(tt.input) + assert.Equal(t, tt.expected, result) + }) + } +} diff --git a/receiver/otlpreceiver/internal/logs/otlp.go b/receiver/otlpreceiver/internal/logs/otlp.go index ea88814ec6de..875174f2d787 100644 --- a/receiver/otlpreceiver/internal/logs/otlp.go +++ b/receiver/otlpreceiver/internal/logs/otlp.go @@ -6,12 +6,9 @@ package logs // import "go.opentelemetry.io/collector/receiver/otlpreceiver/inte import ( "context" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/pdata/plog/plogotlp" + "go.opentelemetry.io/collector/receiver/otlpreceiver/internal/errors" "go.opentelemetry.io/collector/receiver/receiverhelper" ) @@ -51,15 +48,7 @@ func (r *Receiver) Export(ctx context.Context, req plogotlp.ExportRequest) (plog // NonPermanent errors will be converted to codes.Unavailable (equivalent to HTTP 503) // Permanent errors will be converted to codes.InvalidArgument (equivalent to HTTP 400) if err != nil { - s, ok := status.FromError(err) - if !ok { - code := codes.Unavailable - if consumererror.IsPermanent(err) { - code = codes.InvalidArgument - } - s = status.New(code, err.Error()) - } - return plogotlp.NewExportResponse(), s.Err() + return plogotlp.NewExportResponse(), errors.GetStatusFromError(err).Err() } return plogotlp.NewExportResponse(), nil diff --git a/receiver/otlpreceiver/internal/metrics/otlp.go b/receiver/otlpreceiver/internal/metrics/otlp.go index aa5a737c4acd..3f39520fd990 100644 --- a/receiver/otlpreceiver/internal/metrics/otlp.go +++ b/receiver/otlpreceiver/internal/metrics/otlp.go @@ -6,12 +6,9 @@ package metrics // import "go.opentelemetry.io/collector/receiver/otlpreceiver/i import ( "context" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" + "go.opentelemetry.io/collector/receiver/otlpreceiver/internal/errors" "go.opentelemetry.io/collector/receiver/receiverhelper" ) @@ -51,15 +48,7 @@ func (r *Receiver) Export(ctx context.Context, req pmetricotlp.ExportRequest) (p // NonPermanent errors will be converted to codes.Unavailable (equivalent to HTTP 503) // Permanent errors will be converted to codes.InvalidArgument (equivalent to HTTP 400) if err != nil { - s, ok := status.FromError(err) - if !ok { - code := codes.Unavailable - if consumererror.IsPermanent(err) { - code = codes.InvalidArgument - } - s = status.New(code, err.Error()) - } - return pmetricotlp.NewExportResponse(), s.Err() + return pmetricotlp.NewExportResponse(), errors.GetStatusFromError(err).Err() } return pmetricotlp.NewExportResponse(), nil diff --git a/receiver/otlpreceiver/internal/trace/otlp.go b/receiver/otlpreceiver/internal/trace/otlp.go index f1d890cf1ec7..c79ced54c902 100644 --- a/receiver/otlpreceiver/internal/trace/otlp.go +++ b/receiver/otlpreceiver/internal/trace/otlp.go @@ -6,12 +6,9 @@ package trace // import "go.opentelemetry.io/collector/receiver/otlpreceiver/int import ( "context" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" + "go.opentelemetry.io/collector/receiver/otlpreceiver/internal/errors" "go.opentelemetry.io/collector/receiver/receiverhelper" ) @@ -52,15 +49,7 @@ func (r *Receiver) Export(ctx context.Context, req ptraceotlp.ExportRequest) (pt // NonPermanent errors will be converted to codes.Unavailable (equivalent to HTTP 503) // Permanent errors will be converted to codes.InvalidArgument (equivalent to HTTP 400) if err != nil { - s, ok := status.FromError(err) - if !ok { - code := codes.Unavailable - if consumererror.IsPermanent(err) { - code = codes.InvalidArgument - } - s = status.New(code, err.Error()) - } - return ptraceotlp.NewExportResponse(), s.Err() + return ptraceotlp.NewExportResponse(), errors.GetStatusFromError(err).Err() } return ptraceotlp.NewExportResponse(), nil diff --git a/receiver/otlpreceiver/otlp_test.go b/receiver/otlpreceiver/otlp_test.go index 20e00a38292a..54fe9ad8f120 100644 --- a/receiver/otlpreceiver/otlp_test.go +++ b/receiver/otlpreceiver/otlp_test.go @@ -52,10 +52,12 @@ var otlpReceiverID = component.NewIDWithName("otlp", otlpReceiverName) func TestJsonHttp(t *testing.T) { tests := []struct { - name string - encoding string - contentType string - err error + name string + encoding string + contentType string + err error + expectedStatus *spb.Status + expectedStatusCode int }{ { name: "JSONUncompressed", @@ -83,16 +85,36 @@ func TestJsonHttp(t *testing.T) { contentType: "application/json", }, { - name: "NotGRPCError", - encoding: "", - contentType: "application/json", - err: errors.New("my error"), + name: "Permanent NotGRPCError", + encoding: "", + contentType: "application/json", + err: consumererror.NewPermanent(errors.New("my error")), + expectedStatus: &spb.Status{Code: int32(codes.InvalidArgument), Message: "Permanent error: my error"}, + expectedStatusCode: http.StatusInternalServerError, }, { - name: "GRPCError", - encoding: "", - contentType: "application/json", - err: status.New(codes.Unavailable, "").Err(), + name: "Retryable NotGRPCError", + encoding: "", + contentType: "application/json", + err: errors.New("my error"), + expectedStatus: &spb.Status{Code: int32(codes.Unavailable), Message: "my error"}, + expectedStatusCode: http.StatusServiceUnavailable, + }, + { + name: "Permanent GRPCError", + encoding: "", + contentType: "application/json", + err: status.New(codes.InvalidArgument, "").Err(), + expectedStatus: &spb.Status{Code: int32(codes.InvalidArgument), Message: ""}, + expectedStatusCode: http.StatusInternalServerError, + }, + { + name: "Retryable GRPCError", + encoding: "", + contentType: "application/json", + err: status.New(codes.Unavailable, "").Err(), + expectedStatus: &spb.Status{Code: int32(codes.Unavailable), Message: ""}, + expectedStatusCode: http.StatusServiceUnavailable, }, } addr := testutil.GetAvailableLocalAddress(t) @@ -108,7 +130,7 @@ func TestJsonHttp(t *testing.T) { for _, dr := range generateDataRequests(t) { url := "http://" + addr + dr.path - respBytes := doHTTPRequest(t, url, tt.encoding, tt.contentType, dr.jsonBytes, tt.err != nil) + respBytes := doHTTPRequest(t, url, tt.encoding, tt.contentType, dr.jsonBytes, tt.expectedStatusCode) if tt.err == nil { tr := ptraceotlp.NewExportResponse() assert.NoError(t, tr.UnmarshalJSON(respBytes), "Unable to unmarshal response to Response") @@ -120,7 +142,7 @@ func TestJsonHttp(t *testing.T) { assert.True(t, proto.Equal(errStatus, s.Proto())) } else { fmt.Println(errStatus) - assert.True(t, proto.Equal(errStatus, &spb.Status{Code: int32(codes.Unavailable), Message: "my error"})) + assert.True(t, proto.Equal(errStatus, tt.expectedStatus)) } sink.checkData(t, dr.data, 0) } @@ -302,9 +324,11 @@ func TestHandleInvalidRequests(t *testing.T) { func TestProtoHttp(t *testing.T) { tests := []struct { - name string - encoding string - err error + name string + encoding string + err error + expectedStatus *spb.Status + expectedStatusCode int }{ { name: "ProtoUncompressed", @@ -319,14 +343,32 @@ func TestProtoHttp(t *testing.T) { encoding: "zstd", }, { - name: "NotGRPCError", - encoding: "", - err: errors.New("my error"), + name: "Permanent NotGRPCError", + encoding: "", + err: consumererror.NewPermanent(errors.New("my error")), + expectedStatus: &spb.Status{Code: int32(codes.InvalidArgument), Message: "Permanent error: my error"}, + expectedStatusCode: http.StatusInternalServerError, }, { - name: "GRPCError", - encoding: "", - err: status.New(codes.Unavailable, "").Err(), + name: "Retryable NotGRPCError", + encoding: "", + err: errors.New("my error"), + expectedStatus: &spb.Status{Code: int32(codes.Unavailable), Message: "my error"}, + expectedStatusCode: http.StatusServiceUnavailable, + }, + { + name: "Permanent GRPCError", + encoding: "", + err: status.New(codes.InvalidArgument, "").Err(), + expectedStatus: &spb.Status{Code: int32(codes.InvalidArgument), Message: ""}, + expectedStatusCode: http.StatusInternalServerError, + }, + { + name: "Retryable GRPCError", + encoding: "", + err: status.New(codes.Unavailable, "").Err(), + expectedStatus: &spb.Status{Code: int32(codes.Unavailable), Message: ""}, + expectedStatusCode: http.StatusServiceUnavailable, }, } addr := testutil.GetAvailableLocalAddress(t) @@ -345,7 +387,7 @@ func TestProtoHttp(t *testing.T) { for _, dr := range generateDataRequests(t) { url := "http://" + addr + dr.path - respBytes := doHTTPRequest(t, url, tt.encoding, "application/x-protobuf", dr.protoBytes, tt.err != nil) + respBytes := doHTTPRequest(t, url, tt.encoding, "application/x-protobuf", dr.protoBytes, tt.expectedStatusCode) if tt.err == nil { tr := ptraceotlp.NewExportResponse() assert.NoError(t, tr.UnmarshalProto(respBytes)) @@ -356,7 +398,7 @@ func TestProtoHttp(t *testing.T) { if s, ok := status.FromError(tt.err); ok { assert.True(t, proto.Equal(errStatus, s.Proto())) } else { - assert.True(t, proto.Equal(errStatus, &spb.Status{Code: int32(codes.Unavailable), Message: "my error"})) + assert.True(t, proto.Equal(errStatus, tt.expectedStatus)) } sink.checkData(t, dr.data, 0) } @@ -560,20 +602,30 @@ func TestOTLPReceiverGRPCTracesIngestTest(t *testing.T) { // trace receiver. func TestOTLPReceiverHTTPTracesIngestTest(t *testing.T) { type ingestionStateTest struct { - okToIngest bool - expectedCode codes.Code + okToIngest bool + err error + expectedCode codes.Code + expectedStatusCode int } expectedReceivedBatches := 2 - expectedIngestionBlockedRPCs := 1 + expectedIngestionBlockedRPCs := 2 ingestionStates := []ingestionStateTest{ { okToIngest: true, expectedCode: codes.OK, }, { - okToIngest: false, - expectedCode: codes.Unavailable, + okToIngest: false, + err: consumererror.NewPermanent(errors.New("consumer error")), + expectedCode: codes.InvalidArgument, + expectedStatusCode: http.StatusInternalServerError, + }, + { + okToIngest: false, + err: errors.New("consumer error"), + expectedCode: codes.Unavailable, + expectedStatusCode: http.StatusServiceUnavailable, }, { okToIngest: true, @@ -599,7 +651,7 @@ func TestOTLPReceiverHTTPTracesIngestTest(t *testing.T) { if ingestionState.okToIngest { sink.SetConsumeError(nil) } else { - sink.SetConsumeError(errors.New("consumer error")) + sink.SetConsumeError(ingestionState.err) } pbMarshaler := ptrace.ProtoMarshaler{} @@ -620,7 +672,7 @@ func TestOTLPReceiverHTTPTracesIngestTest(t *testing.T) { } else { errStatus := &spb.Status{} assert.NoError(t, proto.Unmarshal(respBytes, errStatus)) - assert.Equal(t, http.StatusInternalServerError, resp.StatusCode) + assert.Equal(t, ingestionState.expectedStatusCode, resp.StatusCode) assert.Equal(t, ingestionState.expectedCode, codes.Code(errStatus.Code)) } } @@ -853,7 +905,7 @@ func doHTTPRequest( encoding string, contentType string, data []byte, - expectErr bool, + expectStatusCode int, ) []byte { req := createHTTPRequest(t, url, encoding, contentType, data) resp, err := http.DefaultClient.Do(req) @@ -866,10 +918,10 @@ func doHTTPRequest( // For cases like "application/json; charset=utf-8", the response will be only "application/json" require.True(t, strings.HasPrefix(strings.ToLower(contentType), resp.Header.Get("Content-Type"))) - if !expectErr { + if expectStatusCode == 0 { require.Equal(t, http.StatusOK, resp.StatusCode) } else { - require.Equal(t, http.StatusInternalServerError, resp.StatusCode) + require.Equal(t, expectStatusCode, resp.StatusCode) } return respBytes diff --git a/receiver/otlpreceiver/otlphttp.go b/receiver/otlpreceiver/otlphttp.go index dca427370528..099106820165 100644 --- a/receiver/otlpreceiver/otlphttp.go +++ b/receiver/otlpreceiver/otlphttp.go @@ -9,6 +9,7 @@ import ( "mime" "net/http" + "go.opentelemetry.io/collector/receiver/otlpreceiver/internal/errors" spb "google.golang.org/genproto/googleapis/rpc/status" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -42,7 +43,7 @@ func handleTraces(resp http.ResponseWriter, req *http.Request, tracesReceiver *t otlpResp, err := tracesReceiver.Export(req.Context(), otlpReq) if err != nil { - writeError(resp, enc, err, http.StatusInternalServerError) + writeError(resp, enc, err, errors.GetHTTPStatusCodeFromStatus(err)) return } @@ -73,7 +74,7 @@ func handleMetrics(resp http.ResponseWriter, req *http.Request, metricsReceiver otlpResp, err := metricsReceiver.Export(req.Context(), otlpReq) if err != nil { - writeError(resp, enc, err, http.StatusInternalServerError) + writeError(resp, enc, err, errors.GetHTTPStatusCodeFromStatus(err)) return } @@ -104,7 +105,7 @@ func handleLogs(resp http.ResponseWriter, req *http.Request, logsReceiver *logs. otlpResp, err := logsReceiver.Export(req.Context(), otlpReq) if err != nil { - writeError(resp, enc, err, http.StatusInternalServerError) + writeError(resp, enc, err, errors.GetHTTPStatusCodeFromStatus(err)) return }