diff --git a/.chloggen/handle-grpc-status-codes.yaml b/.chloggen/handle-grpc-status-codes.yaml new file mode 100644 index 00000000000..39cf9a61002 --- /dev/null +++ b/.chloggen/handle-grpc-status-codes.yaml @@ -0,0 +1,16 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: otlpreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Ensure OTLP receiver handles consume errors correctly + +# One or more tracking issues or pull requests related to the change +issues: [4335] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: Make sure OTLP receiver returns correct status code and follows the receiver contract (gRPC) diff --git a/receiver/otlpreceiver/internal/logs/otlp.go b/receiver/otlpreceiver/internal/logs/otlp.go index faf92372d51..ea88814ec6d 100644 --- a/receiver/otlpreceiver/internal/logs/otlp.go +++ b/receiver/otlpreceiver/internal/logs/otlp.go @@ -6,7 +6,11 @@ package logs // import "go.opentelemetry.io/collector/receiver/otlpreceiver/inte import ( "context" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/pdata/plog/plogotlp" "go.opentelemetry.io/collector/receiver/receiverhelper" ) @@ -40,5 +44,23 @@ func (r *Receiver) Export(ctx context.Context, req plogotlp.ExportRequest) (plog err := r.nextConsumer.ConsumeLogs(ctx, ld) r.obsreport.EndLogsOp(ctx, dataFormatProtobuf, numSpans, err) - return plogotlp.NewExportResponse(), err + // Use appropriate status codes for permanent/non-permanent errors + // If we return the error straightaway, then the grpc implementation will set status code to Unknown + // Refer: https://github.com/grpc/grpc-go/blob/v1.59.0/server.go#L1345 + // So, convert the error to appropriate grpc status and return the error + // NonPermanent errors will be converted to codes.Unavailable (equivalent to HTTP 503) + // Permanent errors will be converted to codes.InvalidArgument (equivalent to HTTP 400) + if err != nil { + s, ok := status.FromError(err) + if !ok { + code := codes.Unavailable + if consumererror.IsPermanent(err) { + code = codes.InvalidArgument + } + s = status.New(code, err.Error()) + } + return plogotlp.NewExportResponse(), s.Err() + } + + return plogotlp.NewExportResponse(), nil } diff --git a/receiver/otlpreceiver/internal/logs/otlp_test.go b/receiver/otlpreceiver/internal/logs/otlp_test.go index beb64c159e1..b13eab39820 100644 --- a/receiver/otlpreceiver/internal/logs/otlp_test.go +++ b/receiver/otlpreceiver/internal/logs/otlp_test.go @@ -12,10 +12,13 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/status" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/internal/testdata" "go.opentelemetry.io/collector/pdata/plog/plogotlp" @@ -47,13 +50,25 @@ func TestExport_EmptyRequest(t *testing.T) { assert.NotNil(t, resp, "The response is missing") } -func TestExport_ErrorConsumer(t *testing.T) { +func TestExport_NonPermanentErrorConsumer(t *testing.T) { ld := testdata.GenerateLogs(1) req := plogotlp.NewExportRequestFromLogs(ld) logClient := makeLogsServiceClient(t, consumertest.NewErr(errors.New("my error"))) resp, err := logClient.Export(context.Background(), req) - assert.EqualError(t, err, "rpc error: code = Unknown desc = my error") + assert.EqualError(t, err, "rpc error: code = Unavailable desc = my error") + assert.IsType(t, status.Error(codes.Unknown, ""), err) + assert.Equal(t, plogotlp.ExportResponse{}, resp) +} + +func TestExport_PermanentErrorConsumer(t *testing.T) { + ld := testdata.GenerateLogs(1) + req := plogotlp.NewExportRequestFromLogs(ld) + + logClient := makeLogsServiceClient(t, consumertest.NewErr(consumererror.NewPermanent(errors.New("my error")))) + resp, err := logClient.Export(context.Background(), req) + assert.EqualError(t, err, "rpc error: code = InvalidArgument desc = Permanent error: my error") + assert.IsType(t, status.Error(codes.Unknown, ""), err) assert.Equal(t, plogotlp.ExportResponse{}, resp) } diff --git a/receiver/otlpreceiver/internal/metrics/otlp.go b/receiver/otlpreceiver/internal/metrics/otlp.go index 59330dcc318..aa5a737c4ac 100644 --- a/receiver/otlpreceiver/internal/metrics/otlp.go +++ b/receiver/otlpreceiver/internal/metrics/otlp.go @@ -6,7 +6,11 @@ package metrics // import "go.opentelemetry.io/collector/receiver/otlpreceiver/i import ( "context" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" "go.opentelemetry.io/collector/receiver/receiverhelper" ) @@ -40,5 +44,23 @@ func (r *Receiver) Export(ctx context.Context, req pmetricotlp.ExportRequest) (p err := r.nextConsumer.ConsumeMetrics(ctx, md) r.obsreport.EndMetricsOp(ctx, dataFormatProtobuf, dataPointCount, err) - return pmetricotlp.NewExportResponse(), err + // Use appropriate status codes for permanent/non-permanent errors + // If we return the error straightaway, then the grpc implementation will set status code to Unknown + // Refer: https://github.com/grpc/grpc-go/blob/v1.59.0/server.go#L1345 + // So, convert the error to appropriate grpc status and return the error + // NonPermanent errors will be converted to codes.Unavailable (equivalent to HTTP 503) + // Permanent errors will be converted to codes.InvalidArgument (equivalent to HTTP 400) + if err != nil { + s, ok := status.FromError(err) + if !ok { + code := codes.Unavailable + if consumererror.IsPermanent(err) { + code = codes.InvalidArgument + } + s = status.New(code, err.Error()) + } + return pmetricotlp.NewExportResponse(), s.Err() + } + + return pmetricotlp.NewExportResponse(), nil } diff --git a/receiver/otlpreceiver/internal/metrics/otlp_test.go b/receiver/otlpreceiver/internal/metrics/otlp_test.go index d8691daaba1..0700cdcf828 100644 --- a/receiver/otlpreceiver/internal/metrics/otlp_test.go +++ b/receiver/otlpreceiver/internal/metrics/otlp_test.go @@ -12,10 +12,13 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/status" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/internal/testdata" "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" @@ -47,13 +50,25 @@ func TestExport_EmptyRequest(t *testing.T) { require.NotNil(t, resp) } -func TestExport_ErrorConsumer(t *testing.T) { +func TestExport_NonPermanentErrorConsumer(t *testing.T) { md := testdata.GenerateMetrics(1) req := pmetricotlp.NewExportRequestFromMetrics(md) metricsClient := makeMetricsServiceClient(t, consumertest.NewErr(errors.New("my error"))) resp, err := metricsClient.Export(context.Background(), req) - assert.EqualError(t, err, "rpc error: code = Unknown desc = my error") + assert.EqualError(t, err, "rpc error: code = Unavailable desc = my error") + assert.IsType(t, status.Error(codes.Unknown, ""), err) + assert.Equal(t, pmetricotlp.ExportResponse{}, resp) +} + +func TestExport_PermanentErrorConsumer(t *testing.T) { + ld := testdata.GenerateMetrics(1) + req := pmetricotlp.NewExportRequestFromMetrics(ld) + + metricsClient := makeMetricsServiceClient(t, consumertest.NewErr(consumererror.NewPermanent(errors.New("my error")))) + resp, err := metricsClient.Export(context.Background(), req) + assert.EqualError(t, err, "rpc error: code = InvalidArgument desc = Permanent error: my error") + assert.IsType(t, status.Error(codes.Unknown, ""), err) assert.Equal(t, pmetricotlp.ExportResponse{}, resp) } diff --git a/receiver/otlpreceiver/internal/trace/otlp.go b/receiver/otlpreceiver/internal/trace/otlp.go index d14779e712b..f1d890cf1ec 100644 --- a/receiver/otlpreceiver/internal/trace/otlp.go +++ b/receiver/otlpreceiver/internal/trace/otlp.go @@ -6,7 +6,11 @@ package trace // import "go.opentelemetry.io/collector/receiver/otlpreceiver/int import ( "context" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" "go.opentelemetry.io/collector/receiver/receiverhelper" ) @@ -41,5 +45,23 @@ func (r *Receiver) Export(ctx context.Context, req ptraceotlp.ExportRequest) (pt err := r.nextConsumer.ConsumeTraces(ctx, td) r.obsreport.EndTracesOp(ctx, dataFormatProtobuf, numSpans, err) - return ptraceotlp.NewExportResponse(), err + // Use appropriate status codes for permanent/non-permanent errors + // If we return the error straightaway, then the grpc implementation will set status code to Unknown + // Refer: https://github.com/grpc/grpc-go/blob/v1.59.0/server.go#L1345 + // So, convert the error to appropriate grpc status and return the error + // NonPermanent errors will be converted to codes.Unavailable (equivalent to HTTP 503) + // Permanent errors will be converted to codes.InvalidArgument (equivalent to HTTP 400) + if err != nil { + s, ok := status.FromError(err) + if !ok { + code := codes.Unavailable + if consumererror.IsPermanent(err) { + code = codes.InvalidArgument + } + s = status.New(code, err.Error()) + } + return ptraceotlp.NewExportResponse(), s.Err() + } + + return ptraceotlp.NewExportResponse(), nil } diff --git a/receiver/otlpreceiver/internal/trace/otlp_test.go b/receiver/otlpreceiver/internal/trace/otlp_test.go index fcb7611c75f..0bfdbbe1b9f 100644 --- a/receiver/otlpreceiver/internal/trace/otlp_test.go +++ b/receiver/otlpreceiver/internal/trace/otlp_test.go @@ -12,10 +12,13 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/status" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/internal/testdata" "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" @@ -45,13 +48,24 @@ func TestExport_EmptyRequest(t *testing.T) { assert.NotNil(t, resp, "The response is missing") } -func TestExport_ErrorConsumer(t *testing.T) { +func TestExport_NonPermanentErrorConsumer(t *testing.T) { td := testdata.GenerateTraces(1) req := ptraceotlp.NewExportRequestFromTraces(td) traceClient := makeTraceServiceClient(t, consumertest.NewErr(errors.New("my error"))) resp, err := traceClient.Export(context.Background(), req) - assert.EqualError(t, err, "rpc error: code = Unknown desc = my error") + assert.EqualError(t, err, "rpc error: code = Unavailable desc = my error") + assert.IsType(t, status.Error(codes.Unknown, ""), err) + assert.Equal(t, ptraceotlp.ExportResponse{}, resp) +} +func TestExport_PermanentErrorConsumer(t *testing.T) { + ld := testdata.GenerateTraces(1) + req := ptraceotlp.NewExportRequestFromTraces(ld) + + traceClient := makeTraceServiceClient(t, consumertest.NewErr(consumererror.NewPermanent(errors.New("my error")))) + resp, err := traceClient.Export(context.Background(), req) + assert.EqualError(t, err, "rpc error: code = InvalidArgument desc = Permanent error: my error") + assert.IsType(t, status.Error(codes.Unknown, ""), err) assert.Equal(t, ptraceotlp.ExportResponse{}, resp) } diff --git a/receiver/otlpreceiver/otlp_test.go b/receiver/otlpreceiver/otlp_test.go index 1bbded22474..a0ca95c5526 100644 --- a/receiver/otlpreceiver/otlp_test.go +++ b/receiver/otlpreceiver/otlp_test.go @@ -35,6 +35,7 @@ import ( "go.opentelemetry.io/collector/config/confignet" "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/internal/testdata" "go.opentelemetry.io/collector/internal/testutil" @@ -91,7 +92,7 @@ func TestJsonHttp(t *testing.T) { name: "GRPCError", encoding: "", contentType: "application/json", - err: status.New(codes.Internal, "").Err(), + err: status.New(codes.Unavailable, "").Err(), }, } addr := testutil.GetAvailableLocalAddress(t) @@ -122,7 +123,8 @@ func TestJsonHttp(t *testing.T) { if s, ok := status.FromError(tt.err); ok { assert.True(t, proto.Equal(errStatus, s.Proto())) } else { - assert.True(t, proto.Equal(errStatus, &spb.Status{Code: int32(codes.Unknown), Message: "my error"})) + fmt.Println(errStatus) + assert.True(t, proto.Equal(errStatus, &spb.Status{Code: int32(codes.Unavailable), Message: "my error"})) } sink.checkData(t, dr.data, 0) } @@ -332,7 +334,7 @@ func TestProtoHttp(t *testing.T) { { name: "GRPCError", encoding: "", - err: status.New(codes.Internal, "").Err(), + err: status.New(codes.Unavailable, "").Err(), }, } addr := testutil.GetAvailableLocalAddress(t) @@ -366,7 +368,7 @@ func TestProtoHttp(t *testing.T) { if s, ok := status.FromError(tt.err); ok { assert.True(t, proto.Equal(errStatus, s.Proto())) } else { - assert.True(t, proto.Equal(errStatus, &spb.Status{Code: int32(codes.Unknown), Message: "my error"})) + assert.True(t, proto.Equal(errStatus, &spb.Status{Code: int32(codes.Unavailable), Message: "my error"})) } sink.checkData(t, dr.data, 0) } @@ -497,11 +499,12 @@ func TestHTTPNewPortAlreadyUsed(t *testing.T) { func TestOTLPReceiverGRPCTracesIngestTest(t *testing.T) { type ingestionStateTest struct { okToIngest bool + permanent bool expectedCode codes.Code } expectedReceivedBatches := 2 - expectedIngestionBlockedRPCs := 1 + expectedIngestionBlockedRPCs := 2 ingestionStates := []ingestionStateTest{ { okToIngest: true, @@ -509,7 +512,12 @@ func TestOTLPReceiverGRPCTracesIngestTest(t *testing.T) { }, { okToIngest: false, - expectedCode: codes.Unknown, + expectedCode: codes.Unavailable, + }, + { + okToIngest: false, + expectedCode: codes.InvalidArgument, + permanent: true, }, { okToIngest: true, @@ -541,7 +549,11 @@ func TestOTLPReceiverGRPCTracesIngestTest(t *testing.T) { if ingestionState.okToIngest { sink.SetConsumeError(nil) } else { - sink.SetConsumeError(errors.New("consumer error")) + if ingestionState.permanent { + sink.SetConsumeError(consumererror.NewPermanent(errors.New("consumer error"))) + } else { + sink.SetConsumeError(errors.New("consumer error")) + } } _, err = ptraceotlp.NewGRPCClient(cc).Export(context.Background(), ptraceotlp.NewExportRequestFromTraces(td)) @@ -576,7 +588,7 @@ func TestOTLPReceiverHTTPTracesIngestTest(t *testing.T) { }, { okToIngest: false, - expectedCode: codes.Unknown, + expectedCode: codes.Unavailable, }, { okToIngest: true,