diff --git a/cmd/relayproxy/controller/collect_eval_data.go b/cmd/relayproxy/controller/collect_eval_data.go index d2117b9cf52..f09c75485f8 100644 --- a/cmd/relayproxy/controller/collect_eval_data.go +++ b/cmd/relayproxy/controller/collect_eval_data.go @@ -16,6 +16,7 @@ import ( "github.com/thomaspoignant/go-feature-flag/exporter" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" "go.uber.org/zap" ) @@ -54,6 +55,12 @@ func NewCollectEvalData( // @Failure 500 {object} modeldocs.HTTPErrorDoc "Internal server error" // @Router /v1/data/collector [post] func (h *collectEvalData) Handler(c echo.Context) error { + ctx := c.Request().Context() + + tracer := otel.Tracer(config.OtelTracerName) + ctx, span := tracer.Start(ctx, "collectEventData") + defer span.End() + reqBody := new(model.CollectEvalDataRequest) if err := c.Bind(reqBody); err != nil { return echo.NewHTTPError( @@ -63,9 +70,7 @@ func (h *collectEvalData) Handler(c echo.Context) error { if reqBody.Events == nil { return echo.NewHTTPError(http.StatusBadRequest, "collectEvalData: invalid input data") } - tracer := otel.GetTracerProvider().Tracer(config.OtelTracerName) - _, span := tracer.Start(c.Request().Context(), "collectEventData") - defer span.End() + span.SetAttributes(attribute.Int("collectEventData.eventCollectionSize", len(reqBody.Events))) flagset, httpErr := helper.GetFlagSet(h.flagsetManager, helper.GetAPIKey(c)) @@ -75,7 +80,26 @@ func (h *collectEvalData) Handler(c echo.Context) error { counterTracking := 0 counterEvaluation := 0 - for _, event := range reqBody.Events { + for i, event := range reqBody.Events { + // Check if context is cancelled before processing each event, to avoid + // long delays on large payloads. + select { + case <-ctx.Done(): + err := fmt.Errorf("context cancelled after processing %d/%d events: %w", + i, len(reqBody.Events), ctx.Err()) + span.SetAttributes( + attribute.Int("processed_tracking", counterTracking), + attribute.Int("processed_evaluation", counterEvaluation), + attribute.Int("total_events", len(reqBody.Events)), + ) + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + + return err + default: + // all good, keep going + } + switch event["kind"] { case "tracking": e, err := convertTrackingEvent(event, h.logger) @@ -98,11 +122,13 @@ func (h *collectEvalData) Handler(c echo.Context) error { counterEvaluation++ } } + span.SetAttributes(attribute.Int("collectEventData.trackingCollectionSize", counterTracking)) span.SetAttributes( attribute.Int("collectEventData.evaluationCollectionSize", counterEvaluation), ) h.metrics.IncCollectEvalData(float64(len(reqBody.Events))) + return c.JSON(http.StatusOK, model.CollectEvalDataResponse{ IngestedContentCount: len(reqBody.Events), }) diff --git a/cmd/relayproxy/controller/collect_eval_data_test.go b/cmd/relayproxy/controller/collect_eval_data_test.go index c802ae55ba7..bafd94669d7 100644 --- a/cmd/relayproxy/controller/collect_eval_data_test.go +++ b/cmd/relayproxy/controller/collect_eval_data_test.go @@ -1,6 +1,7 @@ package controller_test import ( + "context" "io" "net/http" "net/http/httptest" @@ -322,6 +323,47 @@ func Test_collect_eval_data_Handler(t *testing.T) { } } +func TestCollectEvalData_Handler_cancellation(t *testing.T) { + flagsetManager, err := service.NewFlagsetManager(&config.Config{ + CommonFlagSet: config.CommonFlagSet{ + PollingInterval: 10, + Retrievers: &[]retrieverconf.RetrieverConf{{Kind: "file", Path: configFlagsLocation}}, + }, + }, zap.NewNop(), []notifier.Notifier{}) + require.NoError(t, err) + + t.Cleanup(func() { flagsetManager.Close() }) + + ctrl := controller.NewCollectEvalData(flagsetManager, metric.Metrics{}, zap.NewNop()) + + // large payload with 20,000 events to ensure processing takes time + event := `{"kind":"feature","contextKind":"user","userKey":"u","creationDate":1680246000,"key":"f","variation":"v","value":"true","default":false,"version":"1","source":"PROVIDER_CACHE"}` + body := `{"events":[` + strings.Repeat(event+",", 19999) + event + `]}` + + ctx, cancel := context.WithCancel(t.Context()) + req := httptest.NewRequestWithContext(ctx, http.MethodPost, "/v1/data/collector", strings.NewReader(body)) + req.Header.Set(echo.HeaderContentType, echo.MIMEApplicationJSON) + + e := echo.New() + c := e.NewContext(req, httptest.NewRecorder()) + + // run handler in background, cancel after 10ms + done := make(chan error, 1) + go func() { + done <- ctrl.Handler(c) + }() + + time.Sleep(10 * time.Millisecond) + cancel() + + err = <-done + + require.ErrorIs(t, err, context.Canceled) + assert.Contains(t, err.Error(), "context cancelled after processing") + // count will be less than the total if cancellation worked + assert.NotContains(t, err.Error(), "20000/20000") +} + func Test_collect_tracking_and_evaluation_events(t *testing.T) { tests := []struct { name string