Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 30 additions & 4 deletions cmd/relayproxy/controller/collect_eval_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/thomaspoignant/go-feature-flag/exporter"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -54,6 +55,12 @@ func NewCollectEvalData(
// @Failure 500 {object} modeldocs.HTTPErrorDoc "Internal server error"
// @Router /v1/data/collector [post]
func (h *collectEvalData) Handler(c echo.Context) error {
ctx := c.Request().Context()

tracer := otel.Tracer(config.OtelTracerName)
ctx, span := tracer.Start(ctx, "collectEventData")
defer span.End()

reqBody := new(model.CollectEvalDataRequest)
if err := c.Bind(reqBody); err != nil {
return echo.NewHTTPError(
Expand All @@ -63,9 +70,7 @@ func (h *collectEvalData) Handler(c echo.Context) error {
if reqBody.Events == nil {
return echo.NewHTTPError(http.StatusBadRequest, "collectEvalData: invalid input data")
}
tracer := otel.GetTracerProvider().Tracer(config.OtelTracerName)
_, span := tracer.Start(c.Request().Context(), "collectEventData")
defer span.End()

span.SetAttributes(attribute.Int("collectEventData.eventCollectionSize", len(reqBody.Events)))

flagset, httpErr := helper.GetFlagSet(h.flagsetManager, helper.GetAPIKey(c))
Expand All @@ -75,7 +80,26 @@ func (h *collectEvalData) Handler(c echo.Context) error {

counterTracking := 0
counterEvaluation := 0
for _, event := range reqBody.Events {
for i, event := range reqBody.Events {
// Check if context is cancelled before processing each event, to avoid
// long delays on large payloads.
select {
case <-ctx.Done():
err := fmt.Errorf("context cancelled after processing %d/%d events: %w",
i, len(reqBody.Events), ctx.Err())
span.SetAttributes(
attribute.Int("processed_tracking", counterTracking),
attribute.Int("processed_evaluation", counterEvaluation),
attribute.Int("total_events", len(reqBody.Events)),
)
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())

return err
default:
// all good, keep going
}

switch event["kind"] {
case "tracking":
e, err := convertTrackingEvent(event, h.logger)
Expand All @@ -98,11 +122,13 @@ func (h *collectEvalData) Handler(c echo.Context) error {
counterEvaluation++
}
}

span.SetAttributes(attribute.Int("collectEventData.trackingCollectionSize", counterTracking))
span.SetAttributes(
attribute.Int("collectEventData.evaluationCollectionSize", counterEvaluation),
)
h.metrics.IncCollectEvalData(float64(len(reqBody.Events)))

return c.JSON(http.StatusOK, model.CollectEvalDataResponse{
IngestedContentCount: len(reqBody.Events),
})
Expand Down
42 changes: 42 additions & 0 deletions cmd/relayproxy/controller/collect_eval_data_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package controller_test

import (
"context"
"io"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -322,6 +323,47 @@ func Test_collect_eval_data_Handler(t *testing.T) {
}
}

func TestCollectEvalData_Handler_cancellation(t *testing.T) {
flagsetManager, err := service.NewFlagsetManager(&config.Config{
CommonFlagSet: config.CommonFlagSet{
PollingInterval: 10,
Retrievers: &[]retrieverconf.RetrieverConf{{Kind: "file", Path: configFlagsLocation}},
},
}, zap.NewNop(), []notifier.Notifier{})
require.NoError(t, err)

t.Cleanup(func() { flagsetManager.Close() })

ctrl := controller.NewCollectEvalData(flagsetManager, metric.Metrics{}, zap.NewNop())

// large payload with 20,000 events to ensure processing takes time
event := `{"kind":"feature","contextKind":"user","userKey":"u","creationDate":1680246000,"key":"f","variation":"v","value":"true","default":false,"version":"1","source":"PROVIDER_CACHE"}`
body := `{"events":[` + strings.Repeat(event+",", 19999) + event + `]}`

ctx, cancel := context.WithCancel(t.Context())
req := httptest.NewRequestWithContext(ctx, http.MethodPost, "/v1/data/collector", strings.NewReader(body))
req.Header.Set(echo.HeaderContentType, echo.MIMEApplicationJSON)

e := echo.New()
c := e.NewContext(req, httptest.NewRecorder())

// run handler in background, cancel after 10ms
done := make(chan error, 1)
go func() {
done <- ctrl.Handler(c)
}()

time.Sleep(10 * time.Millisecond)
cancel()

err = <-done

require.ErrorIs(t, err, context.Canceled)
assert.Contains(t, err.Error(), "context cancelled after processing")
// count will be less than the total if cancellation worked
assert.NotContains(t, err.Error(), "20000/20000")
}

func Test_collect_tracking_and_evaluation_events(t *testing.T) {
tests := []struct {
name string
Expand Down
Loading