Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[OTLP/GRPC] Ensure OTLP receiver handles consume errors correctly #8080

Merged
merged 17 commits into from
Jan 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .chloggen/handle-grpc-status-codes.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: otlpreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Ensure OTLP receiver handles consume errors correctly

# One or more tracking issues or pull requests related to the change
issues: [4335]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: Make sure OTLP receiver returns correct status code and follows the receiver contract (gRPC)
24 changes: 23 additions & 1 deletion receiver/otlpreceiver/internal/logs/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ package logs // import "go.opentelemetry.io/collector/receiver/otlpreceiver/inte
import (
"context"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/pdata/plog/plogotlp"
"go.opentelemetry.io/collector/receiver/receiverhelper"
)
Expand Down Expand Up @@ -40,5 +44,23 @@ func (r *Receiver) Export(ctx context.Context, req plogotlp.ExportRequest) (plog
err := r.nextConsumer.ConsumeLogs(ctx, ld)
r.obsreport.EndLogsOp(ctx, dataFormatProtobuf, numSpans, err)

return plogotlp.NewExportResponse(), err
// Use appropriate status codes for permanent/non-permanent errors
atoulme marked this conversation as resolved.
Show resolved Hide resolved
// If we return the error straightaway, then the grpc implementation will set status code to Unknown
// Refer: https://github.com/grpc/grpc-go/blob/v1.59.0/server.go#L1345
// So, convert the error to appropriate grpc status and return the error
// NonPermanent errors will be converted to codes.Unavailable (equivalent to HTTP 503)
// Permanent errors will be converted to codes.InvalidArgument (equivalent to HTTP 400)
if err != nil {
s, ok := status.FromError(err)
if !ok {
code := codes.Unavailable
if consumererror.IsPermanent(err) {
code = codes.InvalidArgument
}
s = status.New(code, err.Error())
}
return plogotlp.NewExportResponse(), s.Err()
}

return plogotlp.NewExportResponse(), nil
}
19 changes: 17 additions & 2 deletions receiver/otlpreceiver/internal/logs/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/internal/testdata"
"go.opentelemetry.io/collector/pdata/plog/plogotlp"
Expand Down Expand Up @@ -47,13 +50,25 @@ func TestExport_EmptyRequest(t *testing.T) {
assert.NotNil(t, resp, "The response is missing")
}

func TestExport_ErrorConsumer(t *testing.T) {
func TestExport_NonPermanentErrorConsumer(t *testing.T) {
ld := testdata.GenerateLogs(1)
req := plogotlp.NewExportRequestFromLogs(ld)

logClient := makeLogsServiceClient(t, consumertest.NewErr(errors.New("my error")))
resp, err := logClient.Export(context.Background(), req)
assert.EqualError(t, err, "rpc error: code = Unknown desc = my error")
assert.EqualError(t, err, "rpc error: code = Unavailable desc = my error")
VihasMakwana marked this conversation as resolved.
Show resolved Hide resolved
assert.IsType(t, status.Error(codes.Unknown, ""), err)
assert.Equal(t, plogotlp.ExportResponse{}, resp)
}

func TestExport_PermanentErrorConsumer(t *testing.T) {
ld := testdata.GenerateLogs(1)
req := plogotlp.NewExportRequestFromLogs(ld)

logClient := makeLogsServiceClient(t, consumertest.NewErr(consumererror.NewPermanent(errors.New("my error"))))
resp, err := logClient.Export(context.Background(), req)
assert.EqualError(t, err, "rpc error: code = InvalidArgument desc = Permanent error: my error")
assert.IsType(t, status.Error(codes.Unknown, ""), err)
assert.Equal(t, plogotlp.ExportResponse{}, resp)
}

Expand Down
24 changes: 23 additions & 1 deletion receiver/otlpreceiver/internal/metrics/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ package metrics // import "go.opentelemetry.io/collector/receiver/otlpreceiver/i
import (
"context"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
"go.opentelemetry.io/collector/receiver/receiverhelper"
)
Expand Down Expand Up @@ -40,5 +44,23 @@ func (r *Receiver) Export(ctx context.Context, req pmetricotlp.ExportRequest) (p
err := r.nextConsumer.ConsumeMetrics(ctx, md)
r.obsreport.EndMetricsOp(ctx, dataFormatProtobuf, dataPointCount, err)

return pmetricotlp.NewExportResponse(), err
// Use appropriate status codes for permanent/non-permanent errors
atoulme marked this conversation as resolved.
Show resolved Hide resolved
// If we return the error straightaway, then the grpc implementation will set status code to Unknown
// Refer: https://github.com/grpc/grpc-go/blob/v1.59.0/server.go#L1345
// So, convert the error to appropriate grpc status and return the error
// NonPermanent errors will be converted to codes.Unavailable (equivalent to HTTP 503)
// Permanent errors will be converted to codes.InvalidArgument (equivalent to HTTP 400)
if err != nil {
s, ok := status.FromError(err)
VihasMakwana marked this conversation as resolved.
Show resolved Hide resolved
if !ok {
code := codes.Unavailable
if consumererror.IsPermanent(err) {
code = codes.InvalidArgument
}
s = status.New(code, err.Error())
}
return pmetricotlp.NewExportResponse(), s.Err()
}

return pmetricotlp.NewExportResponse(), nil
}
19 changes: 17 additions & 2 deletions receiver/otlpreceiver/internal/metrics/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/internal/testdata"
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
Expand Down Expand Up @@ -47,13 +50,25 @@ func TestExport_EmptyRequest(t *testing.T) {
require.NotNil(t, resp)
}

func TestExport_ErrorConsumer(t *testing.T) {
func TestExport_NonPermanentErrorConsumer(t *testing.T) {
md := testdata.GenerateMetrics(1)
req := pmetricotlp.NewExportRequestFromMetrics(md)

metricsClient := makeMetricsServiceClient(t, consumertest.NewErr(errors.New("my error")))
resp, err := metricsClient.Export(context.Background(), req)
assert.EqualError(t, err, "rpc error: code = Unknown desc = my error")
assert.EqualError(t, err, "rpc error: code = Unavailable desc = my error")
assert.IsType(t, status.Error(codes.Unknown, ""), err)
assert.Equal(t, pmetricotlp.ExportResponse{}, resp)
}

func TestExport_PermanentErrorConsumer(t *testing.T) {
ld := testdata.GenerateMetrics(1)
req := pmetricotlp.NewExportRequestFromMetrics(ld)

metricsClient := makeMetricsServiceClient(t, consumertest.NewErr(consumererror.NewPermanent(errors.New("my error"))))
resp, err := metricsClient.Export(context.Background(), req)
assert.EqualError(t, err, "rpc error: code = InvalidArgument desc = Permanent error: my error")
assert.IsType(t, status.Error(codes.Unknown, ""), err)
assert.Equal(t, pmetricotlp.ExportResponse{}, resp)
}

Expand Down
24 changes: 23 additions & 1 deletion receiver/otlpreceiver/internal/trace/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ package trace // import "go.opentelemetry.io/collector/receiver/otlpreceiver/int
import (
"context"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp"
"go.opentelemetry.io/collector/receiver/receiverhelper"
)
Expand Down Expand Up @@ -41,5 +45,23 @@ func (r *Receiver) Export(ctx context.Context, req ptraceotlp.ExportRequest) (pt
err := r.nextConsumer.ConsumeTraces(ctx, td)
r.obsreport.EndTracesOp(ctx, dataFormatProtobuf, numSpans, err)

return ptraceotlp.NewExportResponse(), err
// Use appropriate status codes for permanent/non-permanent errors
// If we return the error straightaway, then the grpc implementation will set status code to Unknown
// Refer: https://github.com/grpc/grpc-go/blob/v1.59.0/server.go#L1345
// So, convert the error to appropriate grpc status and return the error
// NonPermanent errors will be converted to codes.Unavailable (equivalent to HTTP 503)
// Permanent errors will be converted to codes.InvalidArgument (equivalent to HTTP 400)
if err != nil {
s, ok := status.FromError(err)
if !ok {
code := codes.Unavailable
if consumererror.IsPermanent(err) {
code = codes.InvalidArgument
}
s = status.New(code, err.Error())
}
Comment on lines +55 to +62
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this error handling is the same for all signals I do wonder if it could be extracted and re-used, but that is an optimization we could handle later.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Filed #9300 for this

return ptraceotlp.NewExportResponse(), s.Err()
}

return ptraceotlp.NewExportResponse(), nil
}
18 changes: 16 additions & 2 deletions receiver/otlpreceiver/internal/trace/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/internal/testdata"
"go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp"
Expand Down Expand Up @@ -45,13 +48,24 @@ func TestExport_EmptyRequest(t *testing.T) {
assert.NotNil(t, resp, "The response is missing")
}

func TestExport_ErrorConsumer(t *testing.T) {
func TestExport_NonPermanentErrorConsumer(t *testing.T) {
td := testdata.GenerateTraces(1)
req := ptraceotlp.NewExportRequestFromTraces(td)

traceClient := makeTraceServiceClient(t, consumertest.NewErr(errors.New("my error")))
resp, err := traceClient.Export(context.Background(), req)
assert.EqualError(t, err, "rpc error: code = Unknown desc = my error")
assert.EqualError(t, err, "rpc error: code = Unavailable desc = my error")
assert.IsType(t, status.Error(codes.Unknown, ""), err)
assert.Equal(t, ptraceotlp.ExportResponse{}, resp)
}
func TestExport_PermanentErrorConsumer(t *testing.T) {
ld := testdata.GenerateTraces(1)
req := ptraceotlp.NewExportRequestFromTraces(ld)

traceClient := makeTraceServiceClient(t, consumertest.NewErr(consumererror.NewPermanent(errors.New("my error"))))
resp, err := traceClient.Export(context.Background(), req)
assert.EqualError(t, err, "rpc error: code = InvalidArgument desc = Permanent error: my error")
assert.IsType(t, status.Error(codes.Unknown, ""), err)
assert.Equal(t, ptraceotlp.ExportResponse{}, resp)
}

Expand Down
28 changes: 20 additions & 8 deletions receiver/otlpreceiver/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/internal/testdata"
"go.opentelemetry.io/collector/internal/testutil"
Expand Down Expand Up @@ -92,7 +93,7 @@ func TestJsonHttp(t *testing.T) {
name: "GRPCError",
encoding: "",
contentType: "application/json",
err: status.New(codes.Internal, "").Err(),
err: status.New(codes.Unavailable, "").Err(),
},
}
addr := testutil.GetAvailableLocalAddress(t)
Expand Down Expand Up @@ -123,7 +124,8 @@ func TestJsonHttp(t *testing.T) {
if s, ok := status.FromError(tt.err); ok {
assert.True(t, proto.Equal(errStatus, s.Proto()))
} else {
assert.True(t, proto.Equal(errStatus, &spb.Status{Code: int32(codes.Unknown), Message: "my error"}))
fmt.Println(errStatus)
assert.True(t, proto.Equal(errStatus, &spb.Status{Code: int32(codes.Unavailable), Message: "my error"}))
}
sink.checkData(t, dr.data, 0)
}
Expand Down Expand Up @@ -333,7 +335,7 @@ func TestProtoHttp(t *testing.T) {
{
name: "GRPCError",
encoding: "",
err: status.New(codes.Internal, "").Err(),
err: status.New(codes.Unavailable, "").Err(),
},
}
addr := testutil.GetAvailableLocalAddress(t)
Expand Down Expand Up @@ -367,7 +369,7 @@ func TestProtoHttp(t *testing.T) {
if s, ok := status.FromError(tt.err); ok {
assert.True(t, proto.Equal(errStatus, s.Proto()))
} else {
assert.True(t, proto.Equal(errStatus, &spb.Status{Code: int32(codes.Unknown), Message: "my error"}))
assert.True(t, proto.Equal(errStatus, &spb.Status{Code: int32(codes.Unavailable), Message: "my error"}))
}
sink.checkData(t, dr.data, 0)
}
Expand Down Expand Up @@ -498,19 +500,25 @@ func TestHTTPNewPortAlreadyUsed(t *testing.T) {
func TestOTLPReceiverGRPCTracesIngestTest(t *testing.T) {
type ingestionStateTest struct {
okToIngest bool
permanent bool
expectedCode codes.Code
}

expectedReceivedBatches := 2
expectedIngestionBlockedRPCs := 1
expectedIngestionBlockedRPCs := 2
ingestionStates := []ingestionStateTest{
{
okToIngest: true,
expectedCode: codes.OK,
},
{
okToIngest: false,
expectedCode: codes.Unknown,
expectedCode: codes.Unavailable,
VihasMakwana marked this conversation as resolved.
Show resolved Hide resolved
},
{
okToIngest: false,
expectedCode: codes.InvalidArgument,
permanent: true,
},
{
okToIngest: true,
Expand Down Expand Up @@ -542,7 +550,11 @@ func TestOTLPReceiverGRPCTracesIngestTest(t *testing.T) {
if ingestionState.okToIngest {
sink.SetConsumeError(nil)
} else {
sink.SetConsumeError(errors.New("consumer error"))
if ingestionState.permanent {
sink.SetConsumeError(consumererror.NewPermanent(errors.New("consumer error")))
} else {
sink.SetConsumeError(errors.New("consumer error"))
}
}

_, err = ptraceotlp.NewGRPCClient(cc).Export(context.Background(), ptraceotlp.NewExportRequestFromTraces(td))
Expand Down Expand Up @@ -577,7 +589,7 @@ func TestOTLPReceiverHTTPTracesIngestTest(t *testing.T) {
},
{
okToIngest: false,
expectedCode: codes.Unknown,
expectedCode: codes.Unavailable,
},
{
okToIngest: true,
Expand Down
Loading