Skip to content

Commit

Permalink
[OTLP/GRPC] Ensure OTLP receiver handles consume errors correctly (#8080
Browse files Browse the repository at this point in the history
)

**Description:** Follow the receiver contract and return Unavailable for
non-permanent and InvalidArgument for permanent errors for OTLP/gRPC
receiver.

Leave the "Retry-After" field blank and let the client implement an
exponential backoff strategy.

**Link to tracking Issue:**
#4335

**Testing:** Added relevant test cases.

---------

Co-authored-by: Evan Bradley <[email protected]>
  • Loading branch information
VihasMakwana and evan-bradley authored Jan 17, 2024
1 parent 8356857 commit bb1ae64
Show file tree
Hide file tree
Showing 8 changed files with 155 additions and 17 deletions.
16 changes: 16 additions & 0 deletions .chloggen/handle-grpc-status-codes.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: otlpreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Ensure OTLP receiver handles consume errors correctly

# One or more tracking issues or pull requests related to the change
issues: [4335]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: Make sure OTLP receiver returns correct status code and follows the receiver contract (gRPC)
24 changes: 23 additions & 1 deletion receiver/otlpreceiver/internal/logs/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ package logs // import "go.opentelemetry.io/collector/receiver/otlpreceiver/inte
import (
"context"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/pdata/plog/plogotlp"
"go.opentelemetry.io/collector/receiver/receiverhelper"
)
Expand Down Expand Up @@ -40,5 +44,23 @@ func (r *Receiver) Export(ctx context.Context, req plogotlp.ExportRequest) (plog
err := r.nextConsumer.ConsumeLogs(ctx, ld)
r.obsreport.EndLogsOp(ctx, dataFormatProtobuf, numSpans, err)

return plogotlp.NewExportResponse(), err
// Use appropriate status codes for permanent/non-permanent errors
// If we return the error straightaway, then the grpc implementation will set status code to Unknown
// Refer: https://github.com/grpc/grpc-go/blob/v1.59.0/server.go#L1345
// So, convert the error to appropriate grpc status and return the error
// NonPermanent errors will be converted to codes.Unavailable (equivalent to HTTP 503)
// Permanent errors will be converted to codes.InvalidArgument (equivalent to HTTP 400)
if err != nil {
s, ok := status.FromError(err)
if !ok {
code := codes.Unavailable
if consumererror.IsPermanent(err) {
code = codes.InvalidArgument
}
s = status.New(code, err.Error())
}
return plogotlp.NewExportResponse(), s.Err()
}

return plogotlp.NewExportResponse(), nil
}
19 changes: 17 additions & 2 deletions receiver/otlpreceiver/internal/logs/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/internal/testdata"
"go.opentelemetry.io/collector/pdata/plog/plogotlp"
Expand Down Expand Up @@ -47,13 +50,25 @@ func TestExport_EmptyRequest(t *testing.T) {
assert.NotNil(t, resp, "The response is missing")
}

func TestExport_ErrorConsumer(t *testing.T) {
func TestExport_NonPermanentErrorConsumer(t *testing.T) {
ld := testdata.GenerateLogs(1)
req := plogotlp.NewExportRequestFromLogs(ld)

logClient := makeLogsServiceClient(t, consumertest.NewErr(errors.New("my error")))
resp, err := logClient.Export(context.Background(), req)
assert.EqualError(t, err, "rpc error: code = Unknown desc = my error")
assert.EqualError(t, err, "rpc error: code = Unavailable desc = my error")
assert.IsType(t, status.Error(codes.Unknown, ""), err)
assert.Equal(t, plogotlp.ExportResponse{}, resp)
}

func TestExport_PermanentErrorConsumer(t *testing.T) {
ld := testdata.GenerateLogs(1)
req := plogotlp.NewExportRequestFromLogs(ld)

logClient := makeLogsServiceClient(t, consumertest.NewErr(consumererror.NewPermanent(errors.New("my error"))))
resp, err := logClient.Export(context.Background(), req)
assert.EqualError(t, err, "rpc error: code = InvalidArgument desc = Permanent error: my error")
assert.IsType(t, status.Error(codes.Unknown, ""), err)
assert.Equal(t, plogotlp.ExportResponse{}, resp)
}

Expand Down
24 changes: 23 additions & 1 deletion receiver/otlpreceiver/internal/metrics/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ package metrics // import "go.opentelemetry.io/collector/receiver/otlpreceiver/i
import (
"context"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
"go.opentelemetry.io/collector/receiver/receiverhelper"
)
Expand Down Expand Up @@ -40,5 +44,23 @@ func (r *Receiver) Export(ctx context.Context, req pmetricotlp.ExportRequest) (p
err := r.nextConsumer.ConsumeMetrics(ctx, md)
r.obsreport.EndMetricsOp(ctx, dataFormatProtobuf, dataPointCount, err)

return pmetricotlp.NewExportResponse(), err
// Use appropriate status codes for permanent/non-permanent errors
// If we return the error straightaway, then the grpc implementation will set status code to Unknown
// Refer: https://github.com/grpc/grpc-go/blob/v1.59.0/server.go#L1345
// So, convert the error to appropriate grpc status and return the error
// NonPermanent errors will be converted to codes.Unavailable (equivalent to HTTP 503)
// Permanent errors will be converted to codes.InvalidArgument (equivalent to HTTP 400)
if err != nil {
s, ok := status.FromError(err)
if !ok {
code := codes.Unavailable
if consumererror.IsPermanent(err) {
code = codes.InvalidArgument
}
s = status.New(code, err.Error())
}
return pmetricotlp.NewExportResponse(), s.Err()
}

return pmetricotlp.NewExportResponse(), nil
}
19 changes: 17 additions & 2 deletions receiver/otlpreceiver/internal/metrics/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/internal/testdata"
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
Expand Down Expand Up @@ -47,13 +50,25 @@ func TestExport_EmptyRequest(t *testing.T) {
require.NotNil(t, resp)
}

func TestExport_ErrorConsumer(t *testing.T) {
func TestExport_NonPermanentErrorConsumer(t *testing.T) {
md := testdata.GenerateMetrics(1)
req := pmetricotlp.NewExportRequestFromMetrics(md)

metricsClient := makeMetricsServiceClient(t, consumertest.NewErr(errors.New("my error")))
resp, err := metricsClient.Export(context.Background(), req)
assert.EqualError(t, err, "rpc error: code = Unknown desc = my error")
assert.EqualError(t, err, "rpc error: code = Unavailable desc = my error")
assert.IsType(t, status.Error(codes.Unknown, ""), err)
assert.Equal(t, pmetricotlp.ExportResponse{}, resp)
}

func TestExport_PermanentErrorConsumer(t *testing.T) {
ld := testdata.GenerateMetrics(1)
req := pmetricotlp.NewExportRequestFromMetrics(ld)

metricsClient := makeMetricsServiceClient(t, consumertest.NewErr(consumererror.NewPermanent(errors.New("my error"))))
resp, err := metricsClient.Export(context.Background(), req)
assert.EqualError(t, err, "rpc error: code = InvalidArgument desc = Permanent error: my error")
assert.IsType(t, status.Error(codes.Unknown, ""), err)
assert.Equal(t, pmetricotlp.ExportResponse{}, resp)
}

Expand Down
24 changes: 23 additions & 1 deletion receiver/otlpreceiver/internal/trace/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ package trace // import "go.opentelemetry.io/collector/receiver/otlpreceiver/int
import (
"context"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp"
"go.opentelemetry.io/collector/receiver/receiverhelper"
)
Expand Down Expand Up @@ -41,5 +45,23 @@ func (r *Receiver) Export(ctx context.Context, req ptraceotlp.ExportRequest) (pt
err := r.nextConsumer.ConsumeTraces(ctx, td)
r.obsreport.EndTracesOp(ctx, dataFormatProtobuf, numSpans, err)

return ptraceotlp.NewExportResponse(), err
// Use appropriate status codes for permanent/non-permanent errors
// If we return the error straightaway, then the grpc implementation will set status code to Unknown
// Refer: https://github.com/grpc/grpc-go/blob/v1.59.0/server.go#L1345
// So, convert the error to appropriate grpc status and return the error
// NonPermanent errors will be converted to codes.Unavailable (equivalent to HTTP 503)
// Permanent errors will be converted to codes.InvalidArgument (equivalent to HTTP 400)
if err != nil {
s, ok := status.FromError(err)
if !ok {
code := codes.Unavailable
if consumererror.IsPermanent(err) {
code = codes.InvalidArgument
}
s = status.New(code, err.Error())
}
return ptraceotlp.NewExportResponse(), s.Err()
}

return ptraceotlp.NewExportResponse(), nil
}
18 changes: 16 additions & 2 deletions receiver/otlpreceiver/internal/trace/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/internal/testdata"
"go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp"
Expand Down Expand Up @@ -45,13 +48,24 @@ func TestExport_EmptyRequest(t *testing.T) {
assert.NotNil(t, resp, "The response is missing")
}

func TestExport_ErrorConsumer(t *testing.T) {
func TestExport_NonPermanentErrorConsumer(t *testing.T) {
td := testdata.GenerateTraces(1)
req := ptraceotlp.NewExportRequestFromTraces(td)

traceClient := makeTraceServiceClient(t, consumertest.NewErr(errors.New("my error")))
resp, err := traceClient.Export(context.Background(), req)
assert.EqualError(t, err, "rpc error: code = Unknown desc = my error")
assert.EqualError(t, err, "rpc error: code = Unavailable desc = my error")
assert.IsType(t, status.Error(codes.Unknown, ""), err)
assert.Equal(t, ptraceotlp.ExportResponse{}, resp)
}
func TestExport_PermanentErrorConsumer(t *testing.T) {
ld := testdata.GenerateTraces(1)
req := ptraceotlp.NewExportRequestFromTraces(ld)

traceClient := makeTraceServiceClient(t, consumertest.NewErr(consumererror.NewPermanent(errors.New("my error"))))
resp, err := traceClient.Export(context.Background(), req)
assert.EqualError(t, err, "rpc error: code = InvalidArgument desc = Permanent error: my error")
assert.IsType(t, status.Error(codes.Unknown, ""), err)
assert.Equal(t, ptraceotlp.ExportResponse{}, resp)
}

Expand Down
28 changes: 20 additions & 8 deletions receiver/otlpreceiver/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/internal/testdata"
"go.opentelemetry.io/collector/internal/testutil"
Expand Down Expand Up @@ -91,7 +92,7 @@ func TestJsonHttp(t *testing.T) {
name: "GRPCError",
encoding: "",
contentType: "application/json",
err: status.New(codes.Internal, "").Err(),
err: status.New(codes.Unavailable, "").Err(),
},
}
addr := testutil.GetAvailableLocalAddress(t)
Expand Down Expand Up @@ -122,7 +123,8 @@ func TestJsonHttp(t *testing.T) {
if s, ok := status.FromError(tt.err); ok {
assert.True(t, proto.Equal(errStatus, s.Proto()))
} else {
assert.True(t, proto.Equal(errStatus, &spb.Status{Code: int32(codes.Unknown), Message: "my error"}))
fmt.Println(errStatus)
assert.True(t, proto.Equal(errStatus, &spb.Status{Code: int32(codes.Unavailable), Message: "my error"}))
}
sink.checkData(t, dr.data, 0)
}
Expand Down Expand Up @@ -332,7 +334,7 @@ func TestProtoHttp(t *testing.T) {
{
name: "GRPCError",
encoding: "",
err: status.New(codes.Internal, "").Err(),
err: status.New(codes.Unavailable, "").Err(),
},
}
addr := testutil.GetAvailableLocalAddress(t)
Expand Down Expand Up @@ -366,7 +368,7 @@ func TestProtoHttp(t *testing.T) {
if s, ok := status.FromError(tt.err); ok {
assert.True(t, proto.Equal(errStatus, s.Proto()))
} else {
assert.True(t, proto.Equal(errStatus, &spb.Status{Code: int32(codes.Unknown), Message: "my error"}))
assert.True(t, proto.Equal(errStatus, &spb.Status{Code: int32(codes.Unavailable), Message: "my error"}))
}
sink.checkData(t, dr.data, 0)
}
Expand Down Expand Up @@ -497,19 +499,25 @@ func TestHTTPNewPortAlreadyUsed(t *testing.T) {
func TestOTLPReceiverGRPCTracesIngestTest(t *testing.T) {
type ingestionStateTest struct {
okToIngest bool
permanent bool
expectedCode codes.Code
}

expectedReceivedBatches := 2
expectedIngestionBlockedRPCs := 1
expectedIngestionBlockedRPCs := 2
ingestionStates := []ingestionStateTest{
{
okToIngest: true,
expectedCode: codes.OK,
},
{
okToIngest: false,
expectedCode: codes.Unknown,
expectedCode: codes.Unavailable,
},
{
okToIngest: false,
expectedCode: codes.InvalidArgument,
permanent: true,
},
{
okToIngest: true,
Expand Down Expand Up @@ -541,7 +549,11 @@ func TestOTLPReceiverGRPCTracesIngestTest(t *testing.T) {
if ingestionState.okToIngest {
sink.SetConsumeError(nil)
} else {
sink.SetConsumeError(errors.New("consumer error"))
if ingestionState.permanent {
sink.SetConsumeError(consumererror.NewPermanent(errors.New("consumer error")))
} else {
sink.SetConsumeError(errors.New("consumer error"))
}
}

_, err = ptraceotlp.NewGRPCClient(cc).Export(context.Background(), ptraceotlp.NewExportRequestFromTraces(td))
Expand Down Expand Up @@ -576,7 +588,7 @@ func TestOTLPReceiverHTTPTracesIngestTest(t *testing.T) {
},
{
okToIngest: false,
expectedCode: codes.Unknown,
expectedCode: codes.Unavailable,
},
{
okToIngest: true,
Expand Down

0 comments on commit bb1ae64

Please sign in to comment.