Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add attributes to webhook events for better debugging #4206

Merged
merged 2 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion courier/http_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (c *httpChannel) Dispatch(ctx context.Context, msg Message) (err error) {
ctx, span := c.d.Tracer(ctx).Tracer().Start(ctx, "courier.httpChannel.Dispatch")
defer otelx.End(span, &err)

builder, err := request.NewBuilder(ctx, c.requestConfig, c.d, nil)
builder, err := request.NewBuilder(ctx, c.requestConfig, c.d)
if err != nil {
return errors.WithStack(err)
}
Expand Down
6 changes: 5 additions & 1 deletion embedx/config.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,10 @@
"title": "Web-Hook Configuration",
"description": "Define what the hook should do",
"properties": {
"id": {
"type": "string",
"description": "The ID of the hook. Used to identify the hook in logs and errors. For debugging purposes only."
},
"response": {
"title": "Response Handling",
"description": "How the web hook should handle the response",
Expand Down Expand Up @@ -2274,7 +2278,7 @@
"id": {
"type": "string",
"title": "Channel id",
"description": "The channel id. Corresponds to the .via property of the identity schema for recovery, verification, etc. Currently only phone is supported.",
"description": "The channel id. Corresponds to the .via property of the identity schema for recovery, verification, etc. Currently only sms is supported.",
"maxLength": 32,
"enum": ["sms"]
},
Expand Down
30 changes: 24 additions & 6 deletions request/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,36 @@ type (
deps Dependencies
cache *ristretto.Cache[[]byte, []byte]
}
options struct {
cache *ristretto.Cache[[]byte, []byte]
}
BuilderOption = func(*options)
)

func NewBuilder(ctx context.Context, config json.RawMessage, deps Dependencies, jsonnetCache *ristretto.Cache[[]byte, []byte]) (_ *Builder, err error) {
func WithCache(cache *ristretto.Cache[[]byte, []byte]) BuilderOption {
return func(o *options) {
o.cache = cache
}
}

func NewBuilder(ctx context.Context, config json.RawMessage, deps Dependencies, o ...BuilderOption) (_ *Builder, err error) {
_, span := deps.Tracer(ctx).Tracer().Start(ctx, "request.NewBuilder")
defer otelx.End(span, &err)

c, err := parseConfig(config)
if err != nil {
var opts options
for _, f := range o {
f(&opts)
}

c := Config{}
if err := json.Unmarshal(config, &c); err != nil {
return nil, err
}

span.SetAttributes(attribute.String("url", c.URL), attribute.String("method", c.Method))
span.SetAttributes(
attribute.String("url", c.URL),
attribute.String("method", c.Method),
)

r, err := retryablehttp.NewRequest(c.Method, c.URL, nil)
if err != nil {
Expand All @@ -66,9 +84,9 @@ func NewBuilder(ctx context.Context, config json.RawMessage, deps Dependencies,

return &Builder{
r: r,
Config: c,
Config: &c,
deps: deps,
cache: jsonnetCache,
cache: opts.cache,
}, nil
}

Expand Down
4 changes: 2 additions & 2 deletions request/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func TestBuildRequest(t *testing.T) {
} {
t.Run(
"request-type="+tc.name, func(t *testing.T) {
rb, err := NewBuilder(context.Background(), json.RawMessage(tc.rawConfig), newTestDependencyProvider(t), nil)
rb, err := NewBuilder(context.Background(), json.RawMessage(tc.rawConfig), newTestDependencyProvider(t))
require.NoError(t, err)

assert.Equal(t, tc.bodyTemplateURI, rb.Config.TemplateURI)
Expand Down Expand Up @@ -279,7 +279,7 @@ func TestBuildRequest(t *testing.T) {
"method": "POST",
"body": "file://./stub/cancel_body.jsonnet"
}`,
), newTestDependencyProvider(t), nil)
), newTestDependencyProvider(t))
require.NoError(t, err)

_, err = rb.BuildRequest(context.Background(), json.RawMessage(`{}`))
Expand Down
42 changes: 15 additions & 27 deletions request/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,48 +17,36 @@ type (
}

Config struct {
Method string `json:"method"`
URL string `json:"url"`
TemplateURI string `json:"body"`
Header http.Header `json:"headers"`
Auth Auth `json:"auth,omitempty"`
}
)

func parseConfig(r json.RawMessage) (*Config, error) {
type rawConfig struct {
Method string `json:"method"`
URL string `json:"url"`
TemplateURI string `json:"body"`
Header json.RawMessage `json:"headers"`
Auth Auth `json:"auth,omitempty"`
Header http.Header `json:"-"`
RawHeader json.RawMessage `json:"headers"`
Auth Auth `json:"auth"`
}
)

var rc rawConfig
err := json.Unmarshal(r, &rc)
func (c *Config) UnmarshalJSON(raw []byte) error {
type Alias Config
var a Alias
err := json.Unmarshal(raw, &a)
if err != nil {
return nil, err
return err
}

rawHeader := gjson.ParseBytes(rc.Header).Map()
hdr := http.Header{}
rawHeader := gjson.ParseBytes(a.RawHeader).Map()
a.Header = make(http.Header, len(rawHeader))

_, ok := rawHeader["Content-Type"]
if !ok {
hdr.Set("Content-Type", ContentTypeJSON)
a.Header.Set("Content-Type", ContentTypeJSON)
}

for key, value := range rawHeader {
hdr.Set(key, value.String())
a.Header.Set(key, value.String())
}

c := Config{
Method: rc.Method,
URL: rc.URL,
TemplateURI: rc.TemplateURI,
Header: hdr,
Auth: rc.Auth,
}
*c = Config(a)

return &c, nil
return nil
}
5 changes: 3 additions & 2 deletions selfservice/hook/password_migration_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/ory/herodot"
"github.com/ory/kratos/request"
"github.com/ory/kratos/schema"
"github.com/ory/kratos/x"
"github.com/ory/x/otelx"
)

Expand Down Expand Up @@ -52,9 +53,9 @@ func (p *PasswordMigration) Execute(ctx context.Context, data *PasswordMigration
defer otelx.End(span, &err)

if emitEvent {
instrumentHTTPClientForEvents(ctx, httpClient)
instrumentHTTPClientForEvents(ctx, httpClient, x.NewUUID(), "password_migration_hook")
}
builder, err := request.NewBuilder(ctx, p.conf, p.deps, nil)
builder, err := request.NewBuilder(ctx, p.conf, p.deps)
if err != nil {
return errors.WithStack(err)
}
Expand Down
20 changes: 12 additions & 8 deletions selfservice/hook/web_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,10 @@ func (e *WebHook) execute(ctx context.Context, data *templateContext) error {
canInterrupt = gjson.GetBytes(e.conf, "can_interrupt").Bool()
parseResponse = gjson.GetBytes(e.conf, "response.parse").Bool()
emitEvent = gjson.GetBytes(e.conf, "emit_analytics_event").Bool() || !gjson.GetBytes(e.conf, "emit_analytics_event").Exists() // default true
tracer = trace.SpanFromContext(ctx).TracerProvider().Tracer("kratos-webhooks")
webhookID = gjson.GetBytes(e.conf, "id").Str
// The trigger ID is a random ID. It can be used to correlate webhook requests across retries.
triggerID = x.NewUUID()
tracer = trace.SpanFromContext(ctx).TracerProvider().Tracer("kratos-webhooks")
)
if ignoreResponse && (parseResponse || canInterrupt) {
return errors.WithStack(herodot.ErrInternalServerError.WithReasonf("A webhook is configured to ignore the response but also to parse the response. This is not possible."))
Expand All @@ -318,7 +321,7 @@ func (e *WebHook) execute(ctx context.Context, data *templateContext) error {
defer otelx.End(span, &finalErr)

if emitEvent {
instrumentHTTPClientForEvents(ctx, httpClient)
instrumentHTTPClientForEvents(ctx, httpClient, triggerID, webhookID)
}

defer func(startTime time.Time) {
Expand All @@ -329,7 +332,7 @@ func (e *WebHook) execute(ctx context.Context, data *templateContext) error {
}).WithField("duration", time.Since(startTime))
if finalErr != nil {
if emitEvent && !errors.Is(finalErr, context.Canceled) {
span.AddEvent(events.NewWebhookFailed(ctx, finalErr))
span.AddEvent(events.NewWebhookFailed(ctx, finalErr, triggerID, webhookID))
}
if ignoreResponse {
logger.WithError(finalErr).Warning("Webhook request failed but the error was ignored because the configuration indicated that the upstream response should be ignored")
Expand All @@ -339,12 +342,12 @@ func (e *WebHook) execute(ctx context.Context, data *templateContext) error {
} else {
logger.Info("Webhook request succeeded")
if emitEvent {
span.AddEvent(events.NewWebhookSucceeded(ctx))
span.AddEvent(events.NewWebhookSucceeded(ctx, triggerID, webhookID))
}
}
}(time.Now())

builder, err := request.NewBuilder(ctx, e.conf, e.deps, jsonnetCache)
builder, err := request.NewBuilder(ctx, e.conf, e.deps, request.WithCache(jsonnetCache))
if err != nil {
return err
}
Expand Down Expand Up @@ -551,7 +554,7 @@ func isTimeoutError(err error) bool {
return errors.As(err, &te) && te.Timeout() || errors.Is(err, context.DeadlineExceeded)
}

func instrumentHTTPClientForEvents(ctx context.Context, httpClient *retryablehttp.Client) {
func instrumentHTTPClientForEvents(ctx context.Context, httpClient *retryablehttp.Client, triggerID uuid.UUID, webhookID string) {
// TODO(@alnr): improve this implementation to redact sensitive data
var (
attempt = 0
Expand All @@ -560,8 +563,9 @@ func instrumentHTTPClientForEvents(ctx context.Context, httpClient *retryablehtt
)
httpClient.RequestLogHook = func(_ retryablehttp.Logger, req *http.Request, retryNumber int) {
attempt = retryNumber + 1
requestID = uuid.Must(uuid.NewV4())
requestID = x.NewUUID()
req.Header.Set("Ory-Webhook-Request-ID", requestID.String())
req.Header.Set("Ory-Webhook-Trigger-ID", triggerID.String())
// TODO(@alnr): redact sensitive data
// reqBody, _ = httputil.DumpRequestOut(req, true)
reqBody = []byte("<redacted>")
Expand All @@ -572,6 +576,6 @@ func instrumentHTTPClientForEvents(ctx context.Context, httpClient *retryablehtt
// resBody = resBody[:min(len(resBody), 2<<10)] // truncate response body to 2 kB for event
// TODO(@alnr): redact sensitive data
resBody := []byte("<redacted>")
trace.SpanFromContext(ctx).AddEvent(events.NewWebhookDelivered(ctx, res.Request.URL, reqBody, res.StatusCode, resBody, attempt, requestID))
trace.SpanFromContext(ctx).AddEvent(events.NewWebhookDelivered(ctx, res.Request.URL, reqBody, res.StatusCode, resBody, attempt, requestID, triggerID, webhookID))
}
}
Loading
Loading