Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Passing message to record malformed event #965

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
2 changes: 1 addition & 1 deletion observability/opencensus/v2/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ func (n fakeObservabilityServiceWithError) InboundContextDecorators() []func(con
return nil
}

func (n fakeObservabilityServiceWithError) RecordReceivedMalformedEvent(ctx context.Context, err error) {
func (n fakeObservabilityServiceWithError) RecordReceivedMalformedEvent(ctx context.Context, m binding.Message, err error) {
}

func (n fakeObservabilityServiceWithError) RecordCallingInvoker(ctx context.Context, event *event.Event) (context.Context, func(errOrResult error)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func (o opencensusObservabilityService) InboundContextDecorators() []func(contex
return []func(context.Context, binding.Message) context.Context{tracePropagatorContextDecorator}
}

func (o opencensusObservabilityService) RecordReceivedMalformedEvent(ctx context.Context, err error) {
func (o opencensusObservabilityService) RecordReceivedMalformedEvent(ctx context.Context, m binding.Message, err error) {
ctx, r := NewReporter(ctx, reportReceive)
r.Error()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (o OTelObservabilityService) InboundContextDecorators() []func(context.Cont
}

// RecordReceivedMalformedEvent records the error from a malformed event in the span.
func (o OTelObservabilityService) RecordReceivedMalformedEvent(ctx context.Context, err error) {
func (o OTelObservabilityService) RecordReceivedMalformedEvent(ctx context.Context, m binding.Message, err error) {
spanName := observability.ClientSpanName + ".malformed receive"
_, span := o.tracer.Start(
ctx, spanName,
Expand Down
1 change: 1 addition & 0 deletions test/observability/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
github.com/felixge/httpsnoop v1.0.3 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/uuid v1.1.1 // indirect
github.com/json-iterator/go v1.1.10 // indirect
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect
Expand Down
1 change: 1 addition & 0 deletions test/observability/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cloudevents/sdk-go/v2/extensions"
"github.com/cloudevents/sdk-go/v2/protocol"
"github.com/cloudevents/sdk-go/v2/protocol/http"
"github.com/cloudevents/sdk-go/v2/test"
)

var (
Expand Down Expand Up @@ -385,7 +386,7 @@ func TestRecordReceivedMalformedEvent(t *testing.T) {
os := otelObs.NewOTelObservabilityService()

// act
os.RecordReceivedMalformedEvent(ctx, tc.expectedResult)
os.RecordReceivedMalformedEvent(ctx, test.FullMessage(), tc.expectedResult)

spans := sr.Ended()

Expand Down
4 changes: 2 additions & 2 deletions v2/client/invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,13 @@ func (r *receiveInvoker) Invoke(ctx context.Context, m binding.Message, respFn p
e, eventErr := binding.ToEvent(ctx, m)
switch {
case eventErr != nil && r.fn.hasEventIn:
r.observabilityService.RecordReceivedMalformedEvent(ctx, eventErr)
r.observabilityService.RecordReceivedMalformedEvent(ctx, m, eventErr)
return respFn(ctx, nil, protocol.NewReceipt(r.ackMalformedEvent, "failed to convert Message to Event: %w", eventErr))
case r.fn != nil:
// Check if event is valid before invoking the receiver function
if e != nil {
if validationErr := e.Validate(); validationErr != nil {
r.observabilityService.RecordReceivedMalformedEvent(ctx, validationErr)
r.observabilityService.RecordReceivedMalformedEvent(ctx, m, validationErr)
return respFn(ctx, nil, protocol.NewReceipt(r.ackMalformedEvent, "validation error in incoming event: %w", validationErr))
}
}
Expand Down
5 changes: 3 additions & 2 deletions v2/client/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type ObservabilityService interface {
InboundContextDecorators() []func(context.Context, binding.Message) context.Context

// RecordReceivedMalformedEvent is invoked when an event was received but it's malformed or invalid.
RecordReceivedMalformedEvent(ctx context.Context, err error)
RecordReceivedMalformedEvent(ctx context.Context, m binding.Message, err error)
// RecordCallingInvoker is invoked before the user function is invoked.
// The returned callback will be invoked after the user finishes to process the event with the eventual processing error
// The error provided to the callback could be both a processing error, or a result
Expand All @@ -39,7 +39,8 @@ func (n noopObservabilityService) InboundContextDecorators() []func(context.Cont
return nil
}

func (n noopObservabilityService) RecordReceivedMalformedEvent(ctx context.Context, err error) {}
func (n noopObservabilityService) RecordReceivedMalformedEvent(ctx context.Context, m binding.Message, err error) {
}

func (n noopObservabilityService) RecordCallingInvoker(ctx context.Context, event *event.Event) (context.Context, func(errOrResult error)) {
return ctx, func(errOrResult error) {}
Expand Down