From 7dc7747e252214c0e0f2a39381ee96c75b4024c7 Mon Sep 17 00:00:00 2001
From: Akash Chetty
Date: Wed, 3 Jan 2024 19:25:38 +0530
Subject: [PATCH] feat: tracing support for gateway, processor and router
 (#4248)

---
 .github/workflows/tests.yaml                |   3 +-
 gateway/gateway_test.go                     |  11 +-
 gateway/handle.go                           |  23 +-
 gateway/handle_http.go                      |  21 +-
 gateway/handle_lifecycle.go                 |   1 +
 gateway/types.go                            |   1 +
 integration_test/tracing/tracing_test.go    | 712 ++++++++++++++++++++
 processor/processor.go                      | 123 +++-
 processor/processor_test.go                 |  40 +-
 processor/transformer/transformer.go        |   1 +
 processor/worker.go                         |  58 +-
 processor/worker_handle.go                  |   2 +
 processor/worker_handle_adapter.go          |   5 +
 processor/worker_test.go                    |   4 +
 router/handle.go                            |  24 +
 router/handle_lifecycle.go                  |   1 +
 router/types/types.go                       |   1 +
 router/utils/utils.go                       |   1 +
 router/worker.go                            |  78 +++
 testhelper/transformertest/builder.go       |  85 ++-
 testhelper/transformertest/handler_funcs.go |  25 +-
 utils/types/types.go                        |   1 +
 22 files changed, 1156 insertions(+), 65 deletions(-)
 create mode 100644 integration_test/tracing/tracing_test.go

diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml
index 2b8034b506..1be300e957 100644
--- a/.github/workflows/tests.yaml
+++ b/.github/workflows/tests.yaml
@@ -119,6 +119,7 @@ jobs:
           - integration_test/reporting_dropped_events
           - integration_test/reporting_error_index
           - integration_test/warehouse
+          - integration_test/tracing
           - processor
           - regulation-worker
           - router
@@ -129,7 +130,7 @@ jobs:
         include:
           - package: services
             exclude: services/rsources
-
+
     steps:
       - uses: actions/checkout@v4
       - uses: actions/setup-go@v4
diff --git a/gateway/gateway_test.go b/gateway/gateway_test.go
index 75a25a6721..aecb9a4f62 100644
--- a/gateway/gateway_test.go
+++ b/gateway/gateway_test.go
@@ -546,10 +546,13 @@ var _ = Describe("Gateway", func() {
             var paramsMap, expectedParamsMap map[string]interface{}
             _ = json.Unmarshal(job.Parameters, &paramsMap)
-            expectedStr := []byte(fmt.Sprintf(`{"source_id": "%v", "source_job_run_id": "", "source_task_run_id": ""}`, SourceIDEnabled))
+            expectedStr := []byte(fmt.Sprintf(
+                `{"source_id": "%v", "source_job_run_id": "", "source_task_run_id": "", "traceparent": ""}`,
+                SourceIDEnabled,
+            ))
             _ = json.Unmarshal(expectedStr, &expectedParamsMap)
             equals := reflect.DeepEqual(paramsMap, expectedParamsMap)
-            Expect(equals).To(Equal(true))
+            Expect(equals).To(BeTrue())

             Expect(job.CustomVal).To(Equal(customVal))
@@ -1569,7 +1572,7 @@ var _ = Describe("Gateway", func() {
                 jobs []*jobsdb.JobT,
             ) error {
                 for idx, job := range jobs {
-                    Expect(misc.IsValidUUID(job.UUID.String())).To(Equal(true))
+                    Expect(misc.IsValidUUID(job.UUID.String())).To(BeTrue())
                     Expect(job.CustomVal).To(Equal("WEBHOOK"))

                     var paramsMap, expectedParamsMap map[string]interface{}
@@ -1587,7 +1590,7 @@ var _ = Describe("Gateway", func() {
                     _ = json.Unmarshal(job.Parameters, &paramsMap)
                     _ = json.Unmarshal(expectedStr, &expectedParamsMap)
                     equals := reflect.DeepEqual(paramsMap, expectedParamsMap)
-                    Expect(equals).To(Equal(true))
+                    Expect(equals).To(BeTrue())
                 }
                 return nil
             }).
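The gateway is the trace root: it stamps the caller's W3C trace context into each job's Parameters JSON (the new "traceparent" key asserted above) so the processor and router can resume the same trace after the job round-trips through jobsdb. A minimal sketch of the value's shape; the header format is fixed by the W3C Trace Context spec, and the surrounding map mirrors the one built in getJobDataFromRequest below:

    // W3C traceparent: "00" version, 16-byte trace id, 8-byte parent span id,
    // and trace flags ("01" = sampled), all lowercase hex.
    traceParent := "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"

    // Mirrors the params map the gateway marshals into job.Parameters.
    params := map[string]any{
        "source_id":          "source-1",
        "source_job_run_id":  "",
        "source_task_run_id": "",
        "traceparent":        traceParent,
    }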
diff --git a/gateway/handle.go b/gateway/handle.go
index 9bc30f3cdb..2c0e44146a 100644
--- a/gateway/handle.go
+++ b/gateway/handle.go
@@ -41,6 +41,7 @@ type Handle struct {
     config        *config.Config
     logger        logger.Logger
     stats         stats.Stats
+    tracer        stats.Tracer
     application   app.App
     backendConfig backendconfig.BackendConfig
     jobsDB        jobsdb.JobsDB
@@ -271,6 +272,9 @@ func (gw *Handle) getJobDataFromRequest(req *webRequestT) (jobData *jobFromReq,
         // values retrieved from first event in batch
         sourcesJobRunID, sourcesTaskRunID = req.authContext.SourceJobRunID, req.authContext.SourceTaskRunID
+
+        // tracing
+        traceParent = req.traceParent
     )

     fillMessageID := func(event map[string]interface{}) {
@@ -423,10 +427,11 @@ func (gw *Handle) getJobDataFromRequest(req *webRequestT) (jobData *jobFromReq,
         return
     }

-    params := map[string]interface{}{
+    params := map[string]any{
         "source_id":          sourceID,
         "source_job_run_id":  sourcesJobRunID,
         "source_task_run_id": sourcesTaskRunID,
+        "traceparent":        traceParent,
     }
     marshalledParams, err = json.Marshal(params)
     if err != nil {
@@ -588,6 +593,20 @@ func (gw *Handle) addToWebRequestQ(_ *http.ResponseWriter, req *http.Request, do
     }
     userWebRequestWorker := gw.findUserWebRequestWorker(workerKey)
     ipAddr := misc.GetIPFromReq(req)
-    webReq := webRequestT{done: done, reqType: reqType, requestPayload: requestPayload, authContext: arctx, ipAddr: ipAddr, userIDHeader: userIDHeader}
+
+    traceParent := stats.GetTraceParentFromContext(req.Context())
+    if traceParent == "" {
+        gw.logger.Debugw("traceParent not found in request")
+    }
+
+    webReq := webRequestT{
+        done:           done,
+        reqType:        reqType,
+        requestPayload: requestPayload,
+        authContext:    arctx,
+        traceParent:    traceParent,
+        ipAddr:         ipAddr,
+        userIDHeader:   userIDHeader,
+    }
     userWebRequestWorker.webRequestQ <- &webReq
 }
diff --git a/gateway/handle_http.go b/gateway/handle_http.go
index 7c7eea0b6c..8050833d12 100644
--- a/gateway/handle_http.go
+++ b/gateway/handle_http.go
@@ -3,7 +3,9 @@ package gateway
 import (
     "context"
     "net/http"
+    "time"

+    "github.com/rudderlabs/rudder-go-kit/stats"
     gwtypes "github.com/rudderlabs/rudder-server/gateway/internal/types"
     "github.com/rudderlabs/rudder-server/gateway/response"
     "github.com/rudderlabs/rudder-server/utils/misc"
@@ -74,12 +76,27 @@ func (gw *Handle) webHandler() http.HandlerFunc {

 // webRequestHandler - handles web requests containing rudder events as payload.
 // It parses the payload and calls the request handler to process the request.
 func (gw *Handle) webRequestHandler(rh RequestHandler, w http.ResponseWriter, r *http.Request) {
-    reqType := r.Context().Value(gwtypes.CtxParamCallType).(string)
-    arctx := r.Context().Value(gwtypes.CtxParamAuthRequestContext).(*gwtypes.AuthRequestContext)
+    ctx := r.Context()
+    reqType := ctx.Value(gwtypes.CtxParamCallType).(string)
+    arctx := ctx.Value(gwtypes.CtxParamAuthRequestContext).(*gwtypes.AuthRequestContext)
+
+    ctx, span := gw.tracer.Start(ctx, "gw.webRequestHandler", stats.SpanKindServer,
+        stats.SpanWithTimestamp(time.Now()),
+        stats.SpanWithTags(stats.Tags{
+            "reqType":     reqType,
+            "path":        r.URL.Path,
+            "workspaceId": arctx.WorkspaceID,
+            "sourceId":    arctx.SourceID,
+        }),
+    )
+    r = r.WithContext(ctx)
+
     gw.logger.LogRequest(r)
     var errorMessage string
     defer func() {
+        defer span.End()
         if errorMessage != "" {
+            span.SetStatus(stats.SpanStatusError, errorMessage)
             status := response.GetErrorStatusCode(errorMessage)
             responseBody := response.GetStatus(errorMessage)
             gw.logger.Infow("response",
diff --git a/gateway/handle_lifecycle.go b/gateway/handle_lifecycle.go
index 8a9c123bdd..6e76cf7bc8 100644
--- a/gateway/handle_lifecycle.go
+++ b/gateway/handle_lifecycle.go
@@ -56,6 +56,7 @@ func (gw *Handle) Setup(
     gw.config = config
     gw.logger = logger
     gw.stats = stat
+    gw.tracer = stat.NewTracer("gateway")
     gw.application = application
     gw.backendConfig = backendConfig
     gw.jobsDB = jobsDB
diff --git a/gateway/types.go b/gateway/types.go
index eb920a0894..14a1a1f96a 100644
--- a/gateway/types.go
+++ b/gateway/types.go
@@ -23,6 +23,7 @@ type webRequestT struct {
     reqType        string
     requestPayload []byte
     authContext    *gwtypes.AuthRequestContext
+    traceParent    string
     ipAddr         string
     userIDHeader   string
     errors         []string
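Every stage downstream of the gateway follows the same resume pattern with the rudder-go-kit stats helpers used throughout this patch: re-inject the stored traceparent into a fresh context, start a span from it, and end the span when the stage completes. A condensed sketch; the helper itself is illustrative, not part of the PR, and the usual context and stats imports are assumed:

    // startJobSpan is a hypothetical distillation of the pattern used by the
    // processor and router hunks below; it is not part of this PR.
    func startJobSpan(tracer stats.Tracer, traceParent, name string, tags stats.Tags) stats.TraceSpan {
        if traceParent == "" {
            return nil // nothing to resume; the real code logs this at debug level
        }
        ctx := stats.InjectTraceParentIntoContext(context.Background(), traceParent)
        _, span := tracer.Start(ctx, name, stats.SpanKindConsumer, stats.SpanWithTags(tags))
        return span // caller is responsible for calling span.End()
    }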
diff --git a/integration_test/tracing/tracing_test.go b/integration_test/tracing/tracing_test.go
new file mode 100644
index 0000000000..fabde06de0
--- /dev/null
+++ b/integration_test/tracing/tracing_test.go
@@ -0,0 +1,712 @@
+package tracing
+
+import (
+    "bytes"
+    "context"
+    "database/sql"
+    "encoding/json"
+    "fmt"
+    "io"
+    "net/http"
+    "os"
+    "path"
+    "slices"
+    "strconv"
+    "testing"
+    "time"
+
+    "github.com/rudderlabs/rudder-server/gateway/response"
+
+    "go.opentelemetry.io/otel"
+
+    "github.com/rudderlabs/rudder-server/processor/transformer"
+
+    "github.com/rudderlabs/rudder-go-kit/stats/testhelper/tracemodel"
+    "github.com/rudderlabs/rudder-go-kit/testhelper/assert"
+    "github.com/rudderlabs/rudder-server/app"
+    "github.com/rudderlabs/rudder-server/testhelper/transformertest"
+
+    "github.com/samber/lo"
+
+    "github.com/rudderlabs/rudder-server/jobsdb"
+
+    "github.com/ory/dockertest/v3"
+    "github.com/stretchr/testify/require"
+    "golang.org/x/sync/errgroup"
+
+    _ "github.com/marcboeker/go-duckdb"
+
+    "github.com/rudderlabs/rudder-go-kit/config"
+    kithttputil "github.com/rudderlabs/rudder-go-kit/httputil"
+    kithelper "github.com/rudderlabs/rudder-go-kit/testhelper"
+    "github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource"
+    "github.com/rudderlabs/rudder-go-kit/testhelper/rand"
+    "github.com/rudderlabs/rudder-server/runner"
+    "github.com/rudderlabs/rudder-server/testhelper/backendconfigtest"
+    "github.com/rudderlabs/rudder-server/testhelper/health"
+)
+
+type testConfig struct {
+    zipkinURL        string
+    zipkinTracesURL  string
+    postgresResource *resource.PostgresResource
+    gwPort           int
+    prometheusPort   int
+}
+
+func TestTracing(t *testing.T) {
+    t.Run("gateway-processor-router tracing", func(t *testing.T) {
+        tc := setup(t)
+
+        bcServer := backendconfigtest.NewBuilder().
+            WithWorkspaceConfig(
+                backendconfigtest.NewConfigBuilder().
+                    WithSource(
+                        backendconfigtest.NewSourceBuilder().
+                            WithID("source-1").
+                            WithWriteKey("writekey-1").
+                            WithConnection(
+                                backendconfigtest.NewDestinationBuilder("WEBHOOK").
+                                    WithID("destination-1").
+                                    Build()).
+                            Build()).
+                    Build()).
+            Build()
+        defer bcServer.Close()
+
+        trServer := transformertest.NewBuilder().Build()
+        defer trServer.Close()
+
+        ctx, cancel := context.WithCancel(context.Background())
+        defer cancel()
+
+        wg, ctx := errgroup.WithContext(ctx)
+        wg.Go(func() error {
+            err := runRudderServer(t, ctx, tc.gwPort, tc.prometheusPort, tc.postgresResource, tc.zipkinURL, bcServer.URL, trServer.URL, t.TempDir())
+            if err != nil {
+                t.Logf("rudder-server exited with error: %v", err)
+            }
+            return err
+        })
+
+        url := fmt.Sprintf("http://localhost:%d", tc.gwPort)
+        health.WaitUntilReady(ctx, t, url+"/health", 60*time.Second, 10*time.Millisecond, t.Name())
+
+        eventsCount := 12
+
+        err := sendEvents(eventsCount, "identify", "writekey-1", url)
+        require.NoError(t, err)
+
+        requireJobsCount(t, ctx, tc.postgresResource.DB, "gw", jobsdb.Succeeded.State, eventsCount)
+        requireJobsCount(t, ctx, tc.postgresResource.DB, "rt", jobsdb.Succeeded.State, eventsCount)
+
+        zipkinTraces := getZipkinTraces(t, tc.zipkinTracesURL)
+        require.Len(t, zipkinTraces, eventsCount)
+        for _, zipkinTrace := range zipkinTraces {
+            requireTags(t, zipkinTrace, "gw.webrequesthandler", map[string]string{"reqType": "batch", "path": "/v1/batch", "sourceId": "source-1", "otel.library.name": "gateway"}, 1)
+            requireTags(t, zipkinTrace, "proc.processjobsfordest", map[string]string{"sourceId": "source-1", "otel.library.name": "processor"}, 1)
+            requireTags(t, zipkinTrace, "proc.transformations", map[string]string{"sourceId": "source-1", "destinationId": "destination-1", "otel.library.name": "processor"}, 1)
+            requireTags(t, zipkinTrace, "proc.store", map[string]string{"sourceId": "source-1", "destinationId": "destination-1", "otel.library.name": "processor"}, 1)
+            requireTags(t, zipkinTrace, "rt.pickup", map[string]string{"sourceId": "source-1", "destinationId": "destination-1", "destType": "WEBHOOK", "otel.library.name": "router"}, 1)
+            requireTags(t, zipkinTrace, "rt.process", map[string]string{"sourceId": "source-1", "destinationId": "destination-1", "destType": "WEBHOOK", "otel.library.name": "router"}, 1)
+        }
+
+        cancel()
+        require.NoError(t, wg.Wait())
+    })
+    t.Run("zipkin down", func(t *testing.T) {
+        tc := setup(t)
+        zipkinDownURL := "http://localhost:1234/api/v2/spans"
+
+        bcServer := backendconfigtest.NewBuilder().
+            WithWorkspaceConfig(
+                backendconfigtest.NewConfigBuilder().
+                    WithSource(
+                        backendconfigtest.NewSourceBuilder().
+                            WithID("source-1").
+                            WithWriteKey("writekey-1").
+                            WithConnection(
+                                backendconfigtest.NewDestinationBuilder("WEBHOOK").
+                                    WithID("destination-1").
+                                    Build()).
+                            Build()).
+                    Build()).
+            Build()
+        defer bcServer.Close()
+
+        trServer := transformertest.NewBuilder().Build()
+        defer trServer.Close()
+
+        ctx, cancel := context.WithCancel(context.Background())
+        defer cancel()
+
+        wg, ctx := errgroup.WithContext(ctx)
+        wg.Go(func() error {
+            err := runRudderServer(t, ctx, tc.gwPort, tc.prometheusPort, tc.postgresResource, zipkinDownURL, bcServer.URL, trServer.URL, t.TempDir())
+            if err != nil {
+                t.Logf("rudder-server exited with error: %v", err)
+            }
+            return err
+        })
+
+        url := fmt.Sprintf("http://localhost:%d", tc.gwPort)
+        health.WaitUntilReady(ctx, t, url+"/health", 60*time.Second, 10*time.Millisecond, t.Name())
+
+        eventsCount := 12
+
+        err := sendEvents(eventsCount, "identify", "writekey-1", url)
+        require.NoError(t, err)
+
+        requireJobsCount(t, ctx, tc.postgresResource.DB, "gw", jobsdb.Succeeded.State, eventsCount)
+        requireJobsCount(t, ctx, tc.postgresResource.DB, "rt", jobsdb.Succeeded.State, eventsCount)
+
+        zipkinTraces := getZipkinTraces(t, tc.zipkinTracesURL)
+        require.Empty(t, zipkinTraces)
+
+        cancel()
+        require.NoError(t, wg.Wait())
+    })
+    t.Run("gateway-processor-router with transformations", func(t *testing.T) {
+        tc := setup(t)
+
+        bcServer := backendconfigtest.NewBuilder().
+            WithWorkspaceConfig(
+                backendconfigtest.NewConfigBuilder().
+                    WithSource(
+                        backendconfigtest.NewSourceBuilder().
+                            WithID("source-1").
+                            WithWriteKey("writekey-1").
+                            WithConnection(
+                                backendconfigtest.NewDestinationBuilder("WEBHOOK").
+                                    WithID("destination-1").
+                                    WithDefinitionConfigOption("transformAtV1", "router").
+                                    Build()).
+                            Build()).
+                    Build()).
+            Build()
+        defer bcServer.Close()
+
+        trServer := transformertest.NewBuilder().WithRouterTransform("WEBHOOK").Build()
+        defer trServer.Close()
+
+        ctx, cancel := context.WithCancel(context.Background())
+        defer cancel()
+
+        wg, ctx := errgroup.WithContext(ctx)
+        wg.Go(func() error {
+            t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Router.guaranteeUserEventOrder"), "false")
+            t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Router.WEBHOOK.enableBatching"), "false")
+
+            err := runRudderServer(t, ctx, tc.gwPort, tc.prometheusPort, tc.postgresResource, tc.zipkinURL, bcServer.URL, trServer.URL, t.TempDir())
+            if err != nil {
+                t.Logf("rudder-server exited with error: %v", err)
+            }
+            return err
+        })
+
+        url := fmt.Sprintf("http://localhost:%d", tc.gwPort)
+        health.WaitUntilReady(ctx, t, url+"/health", 60*time.Second, 10*time.Millisecond, t.Name())
+
+        eventsCount := 12
+
+        err := sendEvents(eventsCount, "identify", "writekey-1", url)
+        require.NoError(t, err)
+
+        requireJobsCount(t, ctx, tc.postgresResource.DB, "gw", jobsdb.Succeeded.State, eventsCount)
+        requireJobsCount(t, ctx, tc.postgresResource.DB, "rt", jobsdb.Succeeded.State, eventsCount)
+
+        zipkinTraces := getZipkinTraces(t, tc.zipkinTracesURL)
+        require.Len(t, zipkinTraces, eventsCount)
+        for _, zipkinTrace := range zipkinTraces {
+            requireTags(t, zipkinTrace, "gw.webrequesthandler", map[string]string{"reqType": "batch", "path": "/v1/batch", "sourceId": "source-1", "otel.library.name": "gateway"}, 1)
+            requireTags(t, zipkinTrace, "proc.processjobsfordest", map[string]string{"sourceId": "source-1", "otel.library.name": "processor"}, 1)
+            requireTags(t, zipkinTrace, "proc.transformations", map[string]string{"sourceId": "source-1", "destinationId": "destination-1", "otel.library.name": "processor"}, 1)
+            requireTags(t, zipkinTrace, "proc.store", map[string]string{"sourceId": "source-1", "destinationId": "destination-1", "otel.library.name": "processor"}, 1)
requireTags(t, zipkinTrace, "rt.pickup", map[string]string{"sourceId": "source-1", "destinationId": "destination-1", "destType": "WEBHOOK", "otel.library.name": "router"}, 1) + requireTags(t, zipkinTrace, "rt.transform", map[string]string{"sourceId": "source-1", "destinationId": "destination-1", "destType": "WEBHOOK", "otel.library.name": "router"}, 1) + requireTags(t, zipkinTrace, "rt.process", map[string]string{"sourceId": "source-1", "destinationId": "destination-1", "destType": "WEBHOOK", "otel.library.name": "router"}, 1) + } + + cancel() + require.NoError(t, wg.Wait()) + }) + t.Run("gateway-processor-router with batch transformations", func(t *testing.T) { + tc := setup(t) + + bcServer := backendconfigtest.NewBuilder(). + WithWorkspaceConfig( + backendconfigtest.NewConfigBuilder(). + WithSource( + backendconfigtest.NewSourceBuilder(). + WithID("source-1"). + WithWriteKey("writekey-1"). + WithConnection( + backendconfigtest.NewDestinationBuilder("WEBHOOK"). + WithID("destination-1"). + Build()). + Build()). + Build()). + Build() + defer bcServer.Close() + + trServer := transformertest.NewBuilder().WithRouterTransform("WEBHOOK").Build() + defer trServer.Close() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + wg, ctx := errgroup.WithContext(ctx) + wg.Go(func() error { + t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Router.guaranteeUserEventOrder"), "false") + t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Router.WEBHOOK.enableBatching"), "true") + + err := runRudderServer(t, ctx, tc.gwPort, tc.prometheusPort, tc.postgresResource, tc.zipkinURL, bcServer.URL, trServer.URL, t.TempDir()) + if err != nil { + t.Logf("rudder-server exited with error: %v", err) + } + return err + }) + + url := fmt.Sprintf("http://localhost:%d", tc.gwPort) + health.WaitUntilReady(ctx, t, url+"/health", 60*time.Second, 10*time.Millisecond, t.Name()) + + eventsCount := 12 + + err := sendEvents(eventsCount, "identify", "writekey-1", url) + require.NoError(t, err) + + requireJobsCount(t, ctx, tc.postgresResource.DB, "gw", jobsdb.Succeeded.State, eventsCount) + requireJobsCount(t, ctx, tc.postgresResource.DB, "rt", jobsdb.Succeeded.State, eventsCount) + + zipkinTraces := getZipkinTraces(t, tc.zipkinTracesURL) + require.Len(t, zipkinTraces, eventsCount) + for _, zipkinTrace := range zipkinTraces { + requireTags(t, zipkinTrace, "gw.webrequesthandler", map[string]string{"reqType": "batch", "path": "/v1/batch", "sourceId": "source-1", "otel.library.name": "gateway"}, 1) + requireTags(t, zipkinTrace, "proc.processjobsfordest", map[string]string{"sourceId": "source-1", "otel.library.name": "processor"}, 1) + requireTags(t, zipkinTrace, "proc.transformations", map[string]string{"sourceId": "source-1", "destinationId": "destination-1", "otel.library.name": "processor"}, 1) + requireTags(t, zipkinTrace, "proc.store", map[string]string{"sourceId": "source-1", "destinationId": "destination-1", "otel.library.name": "processor"}, 1) + requireTags(t, zipkinTrace, "rt.pickup", map[string]string{"sourceId": "source-1", "destinationId": "destination-1", "destType": "WEBHOOK", "otel.library.name": "router"}, 1) + requireTags(t, zipkinTrace, "rt.batchtransform", map[string]string{"sourceId": "source-1", "destinationId": "destination-1", "destType": "WEBHOOK", "otel.library.name": "router"}, 1) + requireTags(t, zipkinTrace, "rt.process", map[string]string{"sourceId": "source-1", "destinationId": "destination-1", "destType": "WEBHOOK", "otel.library.name": "router"}, 1) + } + + 
+        cancel()
+        require.NoError(t, wg.Wait())
+    })
+    t.Run("failed at gateway", func(t *testing.T) {
+        tc := setup(t)
+
+        bcServer := backendconfigtest.NewBuilder().
+            WithWorkspaceConfig(
+                backendconfigtest.NewConfigBuilder().
+                    WithSource(
+                        backendconfigtest.NewSourceBuilder().
+                            WithID("source-1").
+                            WithWriteKey("writekey-1").
+                            Build()).
+                    Build()).
+            Build()
+        defer bcServer.Close()
+
+        trServer := transformertest.NewBuilder().Build()
+        defer trServer.Close()
+
+        ctx, cancel := context.WithCancel(context.Background())
+        defer cancel()
+
+        wg, ctx := errgroup.WithContext(ctx)
+        wg.Go(func() error {
+            t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Gateway.maxReqSizeInKB"), "0")
+
+            err := runRudderServer(t, ctx, tc.gwPort, tc.prometheusPort, tc.postgresResource, tc.zipkinURL, bcServer.URL, trServer.URL, t.TempDir())
+            if err != nil {
+                t.Logf("rudder-server exited with error: %v", err)
+            }
+            return err
+        })
+
+        url := fmt.Sprintf("http://localhost:%d", tc.gwPort)
+        health.WaitUntilReady(ctx, t, url+"/health", 60*time.Second, 10*time.Millisecond, t.Name())
+
+        eventsCount := 12
+
+        for i := 0; i < eventsCount; i++ {
+            err := sendEvents(1, "identify", "writekey-1", url)
+            require.Error(t, err)
+        }
+
+        zipkinTraces := getZipkinTraces(t, tc.zipkinTracesURL)
+        require.Len(t, zipkinTraces, eventsCount)
+        for _, zipkinTrace := range zipkinTraces {
+            requireTags(t, zipkinTrace, "gw.webrequesthandler", map[string]string{"reqType": "batch", "path": "/v1/batch", "sourceId": "source-1", "otel.library.name": "gateway", "otel.status_code": "ERROR", "error": response.RequestBodyTooLarge}, 1)
+        }
+
+        cancel()
+        require.NoError(t, wg.Wait())
+    })
+    t.Run("multiplexing in processor transformations", func(t *testing.T) {
+        tc := setup(t)
+
+        bcServer := backendconfigtest.NewBuilder().
+            WithWorkspaceConfig(
+                backendconfigtest.NewConfigBuilder().
+                    WithSource(
+                        backendconfigtest.NewSourceBuilder().
+                            WithID("source-1").
+                            WithWriteKey("writekey-1").
+                            WithConnection(
+                                backendconfigtest.NewDestinationBuilder("WEBHOOK").
+                                    WithID("destination-1").
+                                    WithUserTransformation("transformation-1", "version-1").
+                                    Build()).
+                            Build()).
+                    Build()).
+            Build()
+        defer bcServer.Close()
+
+        trServer := transformertest.NewBuilder().
+            WithUserTransformHandler(func(request []transformer.TransformerEvent) (response []transformer.TransformerResponse) {
+                for i := range request {
+                    req := request[i]
+                    response = append(response, transformer.TransformerResponse{
+                        Metadata:   req.Metadata,
+                        Output:     req.Message,
+                        StatusCode: http.StatusOK,
+                    })
+                    response = append(response, transformer.TransformerResponse{
+                        Metadata:   req.Metadata,
+                        Output:     req.Message,
+                        StatusCode: http.StatusOK,
+                    })
+                }
+                return
+            }).
+            Build()
+        defer trServer.Close()
+
+        ctx, cancel := context.WithCancel(context.Background())
+        defer cancel()
+
+        wg, ctx := errgroup.WithContext(ctx)
+        wg.Go(func() error {
+            t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Router.jobQueryBatchSize"), "1")
+
+            err := runRudderServer(t, ctx, tc.gwPort, tc.prometheusPort, tc.postgresResource, tc.zipkinURL, bcServer.URL, trServer.URL, t.TempDir())
+            if err != nil {
+                t.Logf("rudder-server exited with error: %v", err)
+            }
+            return err
+        })
+
+        url := fmt.Sprintf("http://localhost:%d", tc.gwPort)
+        health.WaitUntilReady(ctx, t, url+"/health", 60*time.Second, 10*time.Millisecond, t.Name())
+
+        eventsCount := 3
+
+        err := sendEvents(eventsCount, "identify", "writekey-1", url)
+        require.NoError(t, err)
+
+        requireJobsCount(t, ctx, tc.postgresResource.DB, "gw", jobsdb.Succeeded.State, eventsCount)
+        requireJobsCount(t, ctx, tc.postgresResource.DB, "rt", jobsdb.Succeeded.State, 2*eventsCount)
+
+        zipkinTraces := getZipkinTraces(t, tc.zipkinTracesURL)
+        require.Len(t, zipkinTraces, eventsCount)
+        for _, zipkinTrace := range zipkinTraces {
+            requireTags(t, zipkinTrace, "gw.webrequesthandler", map[string]string{"reqType": "batch", "path": "/v1/batch", "sourceId": "source-1", "otel.library.name": "gateway"}, 1)
+            requireTags(t, zipkinTrace, "proc.processjobsfordest", map[string]string{"sourceId": "source-1", "otel.library.name": "processor"}, 1)
+            requireTags(t, zipkinTrace, "proc.transformations", map[string]string{"sourceId": "source-1", "destinationId": "destination-1", "otel.library.name": "processor"}, 1)
+            requireTags(t, zipkinTrace, "proc.store", map[string]string{"sourceId": "source-1", "destinationId": "destination-1", "otel.library.name": "processor"}, 1)
+            requireTags(t, zipkinTrace, "rt.pickup", map[string]string{"sourceId": "source-1", "destinationId": "destination-1", "destType": "WEBHOOK", "otel.library.name": "router"}, 2)  // 2 because of multiplexing
+            requireTags(t, zipkinTrace, "rt.process", map[string]string{"sourceId": "source-1", "destinationId": "destination-1", "destType": "WEBHOOK", "otel.library.name": "router"}, 2) // 2 because of multiplexing
+        }
+
+        cancel()
+        require.NoError(t, wg.Wait())
+    })
+    t.Run("one source multiple destinations", func(t *testing.T) {
+        tc := setup(t)
+
+        bcServer := backendconfigtest.NewBuilder().
+            WithWorkspaceConfig(
+                backendconfigtest.NewConfigBuilder().
+                    WithSource(
+                        backendconfigtest.NewSourceBuilder().
+                            WithID("source-1").
+                            WithWriteKey("writekey-1").
+                            WithConnection(
+                                backendconfigtest.NewDestinationBuilder("WEBHOOK").
+                                    WithID("destination-1").
+                                    Build()).
+                            WithConnection(
+                                backendconfigtest.NewDestinationBuilder("WEBHOOK").
+                                    WithID("destination-2").
+                                    Build()).
+                            WithConnection(
+                                backendconfigtest.NewDestinationBuilder("WEBHOOK").
+                                    WithID("destination-3").
+                                    Build()).
+                            Build()).
+                    Build()).
+            Build()
+        defer bcServer.Close()
+
+        trServer := transformertest.NewBuilder().Build()
+        defer trServer.Close()
+
+        ctx, cancel := context.WithCancel(context.Background())
+        defer cancel()
+
+        wg, ctx := errgroup.WithContext(ctx)
+        wg.Go(func() error {
+            err := runRudderServer(t, ctx, tc.gwPort, tc.prometheusPort, tc.postgresResource, tc.zipkinURL, bcServer.URL, trServer.URL, t.TempDir())
+            if err != nil {
+                t.Logf("rudder-server exited with error: %v", err)
+            }
+            return err
+        })
+
+        url := fmt.Sprintf("http://localhost:%d", tc.gwPort)
+        health.WaitUntilReady(ctx, t, url+"/health", 60*time.Second, 10*time.Millisecond, t.Name())
+
+        eventsCount := 3
+
+        err := sendEvents(eventsCount, "identify", "writekey-1", url)
+        require.NoError(t, err)
+
+        requireJobsCount(t, ctx, tc.postgresResource.DB, "gw", jobsdb.Succeeded.State, eventsCount)
+        requireJobsCount(t, ctx, tc.postgresResource.DB, "rt", jobsdb.Succeeded.State, 3*eventsCount)
+
+        zipkinTraces := getZipkinTraces(t, tc.zipkinTracesURL)
+        require.Len(t, zipkinTraces, eventsCount)
+        for _, zipkinTrace := range zipkinTraces {
+            requireTags(t, zipkinTrace, "gw.webrequesthandler", map[string]string{"reqType": "batch", "path": "/v1/batch", "sourceId": "source-1", "otel.library.name": "gateway"}, 1)
+            requireTags(t, zipkinTrace, "proc.processjobsfordest", map[string]string{"sourceId": "source-1", "otel.library.name": "processor"}, 1)
+            requireTags(t, zipkinTrace, "proc.transformations", map[string]string{"sourceId": "source-1", "otel.library.name": "processor"}, 1)
+            requireTags(t, zipkinTrace, "proc.store", map[string]string{"sourceId": "source-1", "otel.library.name": "processor"}, 1)
+            requireTags(t, zipkinTrace, "rt.pickup", map[string]string{"sourceId": "source-1", "destinationId": "destination-1", "destType": "WEBHOOK", "otel.library.name": "router"}, 1)
+            requireTags(t, zipkinTrace, "rt.pickup", map[string]string{"sourceId": "source-1", "destinationId": "destination-2", "destType": "WEBHOOK", "otel.library.name": "router"}, 1)
+            requireTags(t, zipkinTrace, "rt.pickup", map[string]string{"sourceId": "source-1", "destinationId": "destination-3", "destType": "WEBHOOK", "otel.library.name": "router"}, 1)
+            requireTags(t, zipkinTrace, "rt.process", map[string]string{"sourceId": "source-1", "destinationId": "destination-1", "destType": "WEBHOOK", "otel.library.name": "router"}, 1)
+            requireTags(t, zipkinTrace, "rt.process", map[string]string{"sourceId": "source-1", "destinationId": "destination-2", "destType": "WEBHOOK", "otel.library.name": "router"}, 1)
+            requireTags(t, zipkinTrace, "rt.process", map[string]string{"sourceId": "source-1", "destinationId": "destination-3", "destType": "WEBHOOK", "otel.library.name": "router"}, 1)
+        }
+
+        cancel()
+        require.NoError(t, wg.Wait())
+    })
+}
+
+func setup(t testing.TB) testConfig {
+    t.Helper()
+
+    config.Reset()
+    t.Cleanup(config.Reset)
+
+    pool, err := dockertest.NewPool("")
+    require.NoError(t, err)
+
+    zipkinResource, err := resource.SetupZipkin(pool, t)
+    require.NoError(t, err)
+    postgresResource, err := resource.SetupPostgres(pool, t)
+    require.NoError(t, err)
+
+    zipkinURL := "http://localhost:" + zipkinResource.Port + "/api/v2/spans"
+    zipkinTracesURL := "http://localhost:" + zipkinResource.Port + "/api/v2/traces?limit=100&serviceName=" + app.EMBEDDED
+
+    gwPort, err := kithelper.GetFreePort()
+    require.NoError(t, err)
+    prometheusPort, err := kithelper.GetFreePort()
+    require.NoError(t, err)
+
+    return testConfig{
+        zipkinURL:       zipkinURL,
+        zipkinTracesURL: zipkinTracesURL,
+        postgresResource: postgresResource,
+        gwPort:           gwPort,
+        prometheusPort:   prometheusPort,
+    }
+}
+
+func runRudderServer(
+    t testing.TB,
+    ctx context.Context,
+    port int,
+    prometheusPort int,
+    postgresContainer *resource.PostgresResource,
+    zipkinURL, cbURL, transformerURL, tmpDir string,
+) (err error) {
+    t.Setenv("CONFIG_BACKEND_URL", cbURL)
+    t.Setenv("WORKSPACE_TOKEN", "token")
+    t.Setenv("DEST_TRANSFORM_URL", transformerURL)
+
+    t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "DB.port"), postgresContainer.Port)
+    t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "DB.user"), postgresContainer.User)
+    t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "DB.name"), postgresContainer.Database)
+    t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "DB.password"), postgresContainer.Password)
+
+    t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Warehouse.mode"), "off")
+    t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "DestinationDebugger.disableEventDeliveryStatusUploads"), "true")
+    t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "SourceDebugger.disableEventUploads"), "true")
+    t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "TransformationDebugger.disableTransformationStatusUploads"), "true")
+    t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "JobsDB.backup.enabled"), "false")
+    t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "JobsDB.migrateDSLoopSleepDuration"), "60m")
+    t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "archival.Enabled"), "false")
+    t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Reporting.syncer.enabled"), "false")
+    t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "BatchRouter.mainLoopFreq"), "1s")
+    t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "BatchRouter.uploadFreq"), "1s")
+    t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Gateway.webPort"), strconv.Itoa(port))
+    t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "RUDDER_TMPDIR"), os.TempDir())
+    t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "recovery.storagePath"), path.Join(tmpDir, "/recovery_data.json"))
+    t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "recovery.enabled"), "false")
+    t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Profiler.Enabled"), "false")
+    t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Gateway.enableSuppressUserFeature"), "false")
+
+    t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "enableStats"), "true")
+    t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "RuntimeStats.enabled"), "false")
+    t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "OpenTelemetry.enabled"), "true")
+    t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "OpenTelemetry.traces.endpoint"), zipkinURL)
+    t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "OpenTelemetry.traces.samplingRate"), "1.0")
+    t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "OpenTelemetry.traces.withSyncer"), "true")
+    t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "OpenTelemetry.traces.withZipkin"), "true")
+    t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "OpenTelemetry.metrics.prometheus.enabled"), "true")
+    t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "OpenTelemetry.metrics.prometheus.port"), strconv.Itoa(prometheusPort))
+    t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "OpenTelemetry.metrics.exportInterval"), "10ms")
+
+    defer func() {
+        if r := recover(); r != nil {
+            err = fmt.Errorf("panicked: %v", r)
+        }
+    }()
+    r := runner.New(runner.ReleaseInfo{EnterpriseToken: "DUMMY"})
+    c := r.Run(ctx, []string{"rudder-tracing"})
+    if c != 0 {
+        err = fmt.Errorf("rudder-server exited with a non-0 exit code: %d", c)
+    }
+    return
+}
+
+// nolint: unparam, bodyclose
+func sendEvents(
+    num int,
+    eventType, writeKey,
+    url string,
+) error {
+    for i := 0; i < num; i++ {
+        payload := []byte(fmt.Sprintf(`
+            {
+                "batch": [
+                    {
+                        "userId": %[1]q,
+                        "type": %[2]q,
+                        "context": {
+                            "traits": {
+                                "trait1": "new-val"
+                            },
+                            "ip": "14.5.67.21",
+                            "library": {
+                                "name": "http"
+                            }
+                        },
+                        "timestamp": "2020-02-02T00:23:09.544Z"
+                    }
+                ]
+            }`,
+            rand.String(10),
+            eventType,
+        ))
+        req, err := http.NewRequest(http.MethodPost, url+"/v1/batch", bytes.NewReader(payload))
+        if err != nil {
+            return err
+        }
+        req.SetBasicAuth(writeKey, "password")
+
+        resp, err := http.DefaultClient.Do(req)
+        if err != nil {
+            return err
+        }
+
+        if resp.StatusCode != http.StatusOK {
+            b, _ := io.ReadAll(resp.Body)
+            return fmt.Errorf("failed to send event to rudder server, status code: %d: %s", resp.StatusCode, string(b))
+        }
+        kithttputil.CloseResponse(resp)
+    }
+    return nil
+}
+
+// nolint: unparam
+func requireJobsCount(
+    t *testing.T,
+    ctx context.Context,
+    db *sql.DB,
+    queue, state string,
+    expectedCount int,
+) {
+    t.Helper()
+
+    query := fmt.Sprintf(`
+        SELECT
+          count(*)
+        FROM
+          unionjobsdbmetadata('%s', 1)
+        WHERE
+          job_state = '%s'
+          AND parameters ->> 'traceparent' is not NULL;
+    `,
+        queue,
+        state,
+    )
+    require.Eventuallyf(t, func() bool {
+        var jobsCount int
+        require.NoError(t, db.QueryRowContext(ctx, query).Scan(&jobsCount))
+        t.Logf("%s %sJobCount: %d", queue, state, jobsCount)
+        return jobsCount == expectedCount
+    },
+        30*time.Second,
+        1*time.Second,
+        "%d %s events should be in %s state", expectedCount, queue, state,
+    )
+}
+
+func getZipkinTraces(t *testing.T, zipkinTracesURL string) [][]tracemodel.ZipkinTrace {
+    t.Helper()
+
+    getTracesReq, err := http.NewRequest(http.MethodGet, zipkinTracesURL, nil)
+    require.NoError(t, err)
+
+    spansBody := assert.RequireEventuallyStatusCode(t, http.StatusOK, getTracesReq)
+
+    var zipkinTraces [][]tracemodel.ZipkinTrace
+    require.NoError(t, json.Unmarshal([]byte(spansBody), &zipkinTraces))
+
+    for _, zipkinTrace := range zipkinTraces {
+        slices.SortFunc(zipkinTrace, func(a, b tracemodel.ZipkinTrace) int {
+            return int(a.Timestamp - b.Timestamp)
+        })
+    }
+    return zipkinTraces
+}
+
+func requireTags(t *testing.T, zipkinTraces []tracemodel.ZipkinTrace, traceName string, traceTags map[string]string, expectedCount int) {
+    t.Helper()
+
+    // Add common tags
+    expectedTags := lo.Assign(traceTags, map[string]string{
+        "service.name":           app.EMBEDDED,
+        "telemetry.sdk.language": "go",
+        "telemetry.sdk.name":     "opentelemetry",
+        "telemetry.sdk.version":  otel.Version(),
+    })
+    filteredTraces := lo.Filter(zipkinTraces, func(trace tracemodel.ZipkinTrace, index int) bool {
+        if trace.Name != traceName {
+            return false
+        }
+        for key, value := range expectedTags {
+            if trace.Tags[key] != value {
+                return false
+            }
+        }
+        return true
+    })
+    require.Len(t, filteredTraces, expectedCount)
+}
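Two details of the assertions above are easy to miss. First, span names are asserted in lowercase ("gw.webrequesthandler", "proc.processjobsfordest", ...) although the code starts them in mixed case, because Zipkin normalizes names on ingestion. Second, these tests never send a traceparent header, so every trace is rooted at the gateway span; a caller that wants rudder-server's spans to join its own trace would set the W3C header on the batch request, roughly as below, assuming the HTTP middleware that feeds stats.GetTraceParentFromContext extracts incoming headers:

    req, err := http.NewRequest(http.MethodPost, url+"/v1/batch", bytes.NewReader(payload))
    if err != nil {
        return err
    }
    req.SetBasicAuth(writeKey, "password")
    // Join an existing trace; the value must be a valid W3C traceparent.
    req.Header.Set("traceparent", "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01")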
diff --git a/processor/processor.go b/processor/processor.go
index fdeec59937..34373465b6 100644
--- a/processor/processor.go
+++ b/processor/processor.go
@@ -72,6 +72,7 @@ type sourceObserver interface {

 // Handle is a handle to the processor module
 type Handle struct {
     conf          *config.Config
+    tracer        stats.Tracer
     backendConfig backendconfig.BackendConfig
     transformer   transformer.Transformer
     lastJobID     int64
@@ -217,6 +218,7 @@ type ParametersT struct {
     SourceCategory          string      `json:"source_category"`
     RecordID                interface{} `json:"record_id"`
     WorkspaceId             string      `json:"workspaceId"`
+    TraceParent             string      `json:"traceparent"`
 }

 type MetricMetadata struct {
@@ -394,6 +396,7 @@ func (proc *Handle) Setup(
     // Stats
     proc.statsFactory = stats.Default
+    proc.tracer = proc.statsFactory.NewTracer("processor")
     proc.stats.statGatewayDBR = func(partition string) stats.Measurement {
         return proc.statsFactory.NewTaggedStat("processor_gateway_db_read", stats.CountType, stats.Tags{
             "partition": partition,
@@ -929,9 +932,10 @@ func makeCommonMetadataFromSingularEvent(singularEvent types.SingularEventT, bat
     commonMetadata.EventName, _ = misc.MapLookup(singularEvent, "event").(string)
     commonMetadata.EventType, _ = misc.MapLookup(singularEvent, "type").(string)
     commonMetadata.SourceDefinitionID = source.SourceDefinition.ID
-
     commonMetadata.SourceDefinitionType = source.SourceDefinition.Type
+    commonMetadata.TraceParent = eventParams.TraceParent
+
     return &commonMetadata
 }
@@ -959,6 +963,7 @@ func enhanceWithMetadata(commonMetadata *transformer.Metadata, event *transforme
     metadata.DestinationDefinitionID = destination.DestinationDefinition.ID
     metadata.DestinationType = destination.DestinationDefinition.Name
     metadata.SourceDefinitionType = commonMetadata.SourceDefinitionType
+    metadata.TraceParent = commonMetadata.TraceParent
     event.Metadata = metadata
 }
@@ -1024,7 +1029,18 @@ func (proc *Handle) recordEventDeliveryStatus(jobsByDestID map[string][]*jobsdb.
     }
 }

-func (proc *Handle) getTransformerEvents(response transformer.Response, commonMetaData *transformer.Metadata, eventsByMessageID map[string]types.SingularEventWithReceivedAt, destination *backendconfig.DestinationT, inPU, pu string) ([]transformer.TransformerEvent, []*types.PUReportedMetric, map[string]int64, map[string]MetricMetadata) {
+func (proc *Handle) getTransformerEvents(
+    response transformer.Response,
+    commonMetaData *transformer.Metadata,
+    eventsByMessageID map[string]types.SingularEventWithReceivedAt,
+    destination *backendconfig.DestinationT,
+    inPU, pu string,
+) (
+    []transformer.TransformerEvent,
+    []*types.PUReportedMetric,
+    map[string]int64,
+    map[string]MetricMetadata,
+) {
     successMetrics := make([]*types.PUReportedMetric, 0)
     connectionDetailsMap := make(map[string]*types.ConnectionDetails)
     statusDetailsMap := make(map[string]map[string]*types.StatusDetail)
@@ -1075,6 +1091,7 @@ func (proc *Handle) getTransformerEvents(response transformer.Response, commonMe
             eventMetadata.SourceDefinitionID = userTransformedEvent.Metadata.SourceDefinitionID
             eventMetadata.DestinationDefinitionID = userTransformedEvent.Metadata.DestinationDefinitionID
             eventMetadata.SourceCategory = userTransformedEvent.Metadata.SourceCategory
+            eventMetadata.TraceParent = userTransformedEvent.Metadata.TraceParent
             updatedEvent := transformer.TransformerEvent{
                 Message:  userTransformedEvent.Output,
                 Metadata: *eventMetadata,
@@ -1528,20 +1545,44 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf
     outCountMap := make(map[string]int64) // destinations enabled
     destFilterStatusDetailMap := make(map[string]map[string]*types.StatusDetail)

+    spans := make([]stats.TraceSpan, 0, len(jobList))
+    defer func() {
+        for _, span := range spans {
+            span.End()
+        }
+    }()
+
     for _, batchEvent := range jobList {
+        var eventParams types.EventParams
+        err := jsonfast.Unmarshal(batchEvent.Parameters, &eventParams)
+        if err != nil {
+            panic(err)
+        }
+
+        sourceID := eventParams.SourceId
+        traceParent := eventParams.TraceParent
+
+        var span stats.TraceSpan
+        if traceParent == "" {
+            proc.logger.Debugn("Missing traceParent in processJobsForDest", logger.NewIntField("jobId", batchEvent.JobID))
+        } else {
+            ctx := stats.InjectTraceParentIntoContext(context.Background(), traceParent)
+            _, span = proc.tracer.Start(ctx, "proc.processJobsForDest", stats.SpanKindConsumer, stats.SpanWithTags(stats.Tags{
+                "workspaceId": batchEvent.WorkspaceId,
+                "sourceId":    sourceID,
+            }))
+            spans = append(spans, span)
+        }
+
         var gatewayBatchEvent types.GatewayBatchRequest
-        err := jsonfast.Unmarshal(batchEvent.EventPayload, &gatewayBatchEvent)
+        err = jsonfast.Unmarshal(batchEvent.EventPayload, &gatewayBatchEvent)
         if err != nil {
+            if span != nil {
+                span.SetStatus(stats.SpanStatusError, "cannot unmarshal event payload")
+            }
             proc.logger.Warnw("json parsing of event payload", "jobID", batchEvent.JobID, "error", err)
             gatewayBatchEvent.Batch = []types.SingularEventT{}
         }
-        var eventParams types.EventParams
-        err = jsonfast.Unmarshal(batchEvent.Parameters, &eventParams)
-        if err != nil {
-            panic(err)
-        }
-        sourceId := eventParams.SourceId
+
         requestIP := gatewayBatchEvent.RequestIP
         receivedAt := gatewayBatchEvent.ReceivedAt
@@ -1559,8 +1600,11 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf
         }
         statusList = append(statusList, &newStatus)

-        source, err := proc.getSourceBySourceID(sourceId)
+        source, err := proc.getSourceBySourceID(sourceID)
         if err != nil {
+            if span != nil {
+                span.SetStatus(stats.SpanStatusError, "source not found for sourceId")
+            }
             continue
         }
@@ -1594,7 +1638,7 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf
                 dedupKeys[dedupKey] = struct{}{}
             }

-            proc.updateSourceEventStatsDetailed(singularEvent, sourceId)
+            proc.updateSourceEventStatsDetailed(singularEvent, sourceID)

             // We count this as one, not destination specific ones
             totalEvents++
@@ -1681,12 +1725,12 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf

             // Getting all the destinations which are enabled for this
             // event
-            if !proc.isDestinationAvailable(singularEvent, sourceId) {
+            if !proc.isDestinationAvailable(singularEvent, sourceID) {
                 continue
             }

-            if _, ok := groupedEventsBySourceId[SourceIDT(sourceId)]; !ok {
-                groupedEventsBySourceId[SourceIDT(sourceId)] = make([]transformer.TransformerEvent, 0)
+            if _, ok := groupedEventsBySourceId[SourceIDT(sourceID)]; !ok {
+                groupedEventsBySourceId[SourceIDT(sourceID)] = make([]transformer.TransformerEvent, 0)
             }
             shallowEventCopy := transformer.TransformerEvent{}
             shallowEventCopy.Message = singularEvent
@@ -1704,7 +1748,7 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf
             shallowEventCopy.Metadata.SourceTpConfig = source.DgSourceTrackingPlanConfig.Config
             shallowEventCopy.Metadata.MergedTpConfig = source.DgSourceTrackingPlanConfig.GetMergedConfig(commonMetadataFromSingularEvent.EventType)

-            groupedEventsBySourceId[SourceIDT(sourceId)] = append(groupedEventsBySourceId[SourceIDT(sourceId)], shallowEventCopy)
+            groupedEventsBySourceId[SourceIDT(sourceID)] = append(groupedEventsBySourceId[SourceIDT(sourceID)], shallowEventCopy)

             if proc.isReportingEnabled() {
                 proc.updateMetricMaps(inCountMetadataMap, outCountMap, connectionDetailsMap, destFilterStatusDetailMap, event, jobsdb.Succeeded.State, types.DESTINATION_FILTER, func() json.RawMessage { return []byte(`{}`) }, nil)
@@ -1959,6 +2003,37 @@ func (proc *Handle) transformations(partition string, in *transformationMessage)
     wg := sync.WaitGroup{}
     wg.Add(len(in.groupedEvents))

+    spans := make([]stats.TraceSpan, 0, len(in.groupedEvents))
+    defer func() {
+        for _, span := range spans {
+            span.End()
+        }
+    }()
+
+    traces := make(map[string]stats.Tags)
+    for _, eventList := range in.groupedEvents {
+        for _, event := range eventList {
+            if event.Metadata.TraceParent == "" {
+                proc.logger.Debugn("Missing traceParent in transformations", logger.NewIntField("jobId", event.Metadata.JobID))
+                continue
+            }
+            if _, ok := traces[event.Metadata.TraceParent]; ok {
+                continue
+            }
+            tags := stats.Tags{
+                "workspaceId":   event.Metadata.WorkspaceID,
+                "sourceId":      event.Metadata.SourceID,
+                "destinationId": event.Metadata.DestinationID,
+                "destType":      event.Metadata.DestinationType,
+            }
+            ctx := stats.InjectTraceParentIntoContext(context.Background(), event.Metadata.TraceParent)
+            _, span := proc.tracer.Start(ctx, "proc.transformations", stats.SpanKindInternal, stats.SpanWithTags(tags))
+
+            spans = append(spans, span)
+            traces[event.Metadata.TraceParent] = tags
+        }
+    }
+
     for srcAndDestKey, eventList := range in.groupedEvents {
         srcAndDestKey, eventList := srcAndDestKey, eventList
         rruntime.Go(func() {
@@ -2015,6 +2090,7 @@ func (proc *Handle) transformations(partition string, in *transformationMessage)
         in.start,
         in.hasMore,
         in.rsourcesStats,
+        traces,
     }
 }

@@ -2037,6 +2113,7 @@ type storeMessage struct {
     hasMore       bool
     rsourcesStats rsources.StatsCollector
+    traces        map[string]stats.Tags
 }

 func (sm *storeMessage) merge(subJob *storeMessage) {
@@ -2077,6 +2154,18 @@ func (proc *Handle) sendQueryRetryStats(attempt int) {
 }

 func (proc *Handle) Store(partition string, in *storeMessage) {
+    spans := make([]stats.TraceSpan, 0, len(in.traces))
+    defer func() {
+        for _, span := range spans {
+            span.End()
+        }
+    }()
+    for traceParent, tags := range in.traces {
+        ctx := stats.InjectTraceParentIntoContext(context.Background(), traceParent)
+        _, span := proc.tracer.Start(ctx, "proc.store", stats.SpanKindProducer, stats.SpanWithTags(tags))
+        spans = append(spans, span)
+    }
+
     if proc.limiter.store != nil {
         defer proc.limiter.store.BeginWithPriority(partition, proc.getLimiterPriority(partition))()
     }
@@ -2633,6 +2722,7 @@ func (proc *Handle) transformSrcDest(
         DestinationDefinitionID: destDefID,
         RecordID:                recordId,
         WorkspaceId:             workspaceId,
+        TraceParent:             metadata.TraceParent,
     }
     marshalledParams, err := jsonfast.Marshal(params)
     if err != nil {
@@ -2904,8 +2994,7 @@ func (proc *Handle) handlePendingGatewayJobs(partition string) bool {
                     subJobs:       unprocessedList.Jobs,
                     hasMore:       false,
                     rsourcesStats: rsourcesStats,
-                },
-            ),
+                }),
         ),
     )
     proc.stats.statLoopTime(partition).Since(s)
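Note the fan-out above: processJobsForDest starts one consumer span per gateway job, while transformations and Store operate on whole batches and therefore deduplicate by traceparent, starting one span per distinct trace and ending them all when the stage returns. The router hunks further down repeat the same shape, so the pattern can be read as one hypothetical helper (not part of the PR; the real code inlines it, and the usual context and stats imports are assumed):

    // One span per distinct traceparent across a batch, plus a closer that
    // ends them all; tagsByTraceParent maps traceparent -> span tags.
    func startSpansByTraceParent(tracer stats.Tracer, name string, kind stats.SpanKind,
        tagsByTraceParent map[string]stats.Tags,
    ) (endAll func()) {
        spans := make([]stats.TraceSpan, 0, len(tagsByTraceParent))
        for traceParent, tags := range tagsByTraceParent {
            ctx := stats.InjectTraceParentIntoContext(context.Background(), traceParent)
            _, span := tracer.Start(ctx, name, kind, stats.SpanWithTags(tags))
            spans = append(spans, span)
        }
        return func() {
            for _, span := range spans {
                span.End()
            }
        }
    }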
diff --git a/processor/processor_test.go b/processor/processor_test.go
index 39dda7970c..bf051fbd6c 100644
--- a/processor/processor_test.go
+++ b/processor/processor_test.go
@@ -1692,7 +1692,25 @@ var _ = Describe("Processor", Ordered, func() {
                     Expect(job.ExpireAt).To(BeTemporally("~", time.Now(), 200*time.Millisecond))
                     Expect(string(job.EventPayload)).To(Equal(fmt.Sprintf(`{"int-value":%d,"string-value":%q}`, i, destination)))
                     Expect(len(job.LastJobStatus.JobState)).To(Equal(0))
-                    Expect(string(job.Parameters)).To(Equal(`{"source_id":"source-from-transformer","destination_id":"destination-from-transformer","received_at":"","transform_at":"processor","message_id":"","gateway_job_id":0,"source_task_run_id":"","source_job_id":"","source_job_run_id":"","event_name":"","event_type":"","source_definition_id":"","destination_definition_id":"","source_category":"","record_id":null,"workspaceId":""}`))
+                    require.JSONEq(GinkgoT(), `{
+                        "source_id":"source-from-transformer",
+                        "destination_id":"destination-from-transformer",
+                        "received_at":"",
+                        "transform_at":"processor",
+                        "message_id":"",
+                        "gateway_job_id":0,
+                        "source_task_run_id":"",
+                        "source_job_id":"",
+                        "source_job_run_id":"",
+                        "event_name":"",
+                        "event_type":"",
+                        "source_definition_id":"",
+                        "destination_definition_id":"",
+                        "source_category":"",
+                        "record_id":null,
+                        "workspaceId":"",
+                        "traceparent":""
+                    }`, string(job.Parameters))
                 }
                 // One Store call is expected for all events
                 c.mockRouterJobsDB.EXPECT().WithStoreSafeTx(gomock.Any(), gomock.Any()).Times(1).Do(func(ctx context.Context, f func(tx jobsdb.StoreSafeTx) error) {
@@ -1932,7 +1950,25 @@ var _ = Describe("Processor", Ordered, func() {
                     // Expect(job.CustomVal).To(Equal("destination-definition-name-a"))
                     Expect(string(job.EventPayload)).To(Equal(fmt.Sprintf(`{"int-value":%d,"string-value":%q}`, i, destination)))
                     Expect(len(job.LastJobStatus.JobState)).To(Equal(0))
-                    Expect(string(job.Parameters)).To(Equal(`{"source_id":"source-from-transformer","destination_id":"destination-from-transformer","received_at":"","transform_at":"processor","message_id":"","gateway_job_id":0,"source_task_run_id":"","source_job_id":"","source_job_run_id":"","event_name":"","event_type":"","source_definition_id":"","destination_definition_id":"","source_category":"","record_id":null,"workspaceId":""}`))
+                    require.JSONEq(GinkgoT(), `{
+                        "source_id": "source-from-transformer",
+                        "destination_id": "destination-from-transformer",
+                        "received_at": "",
+                        "transform_at": "processor",
+                        "message_id": "",
+                        "gateway_job_id": 0,
+                        "source_task_run_id": "",
+                        "source_job_id": "",
+                        "source_job_run_id": "",
+                        "event_name": "",
+                        "event_type": "",
+                        "source_definition_id": "",
+                        "destination_definition_id": "",
+                        "source_category": "",
+                        "record_id": null,
+                        "workspaceId": "",
+                        "traceparent": ""
+                    }`, string(job.Parameters))
                 }
                 c.mockBatchRouterJobsDB.EXPECT().WithStoreSafeTx(gomock.Any(), gomock.Any()).Times(1).Do(func(ctx context.Context, f func(tx jobsdb.StoreSafeTx) error) {
diff --git a/processor/transformer/transformer.go b/processor/transformer/transformer.go
index 2e3874b478..3c5edc4dce 100644
--- a/processor/transformer/transformer.go
+++ b/processor/transformer/transformer.go
@@ -65,6 +65,7 @@ type Metadata struct {
     DestinationType  string `json:"destinationType"`
     MessageID        string `json:"messageId"`
     OAuthAccessToken string `json:"oauthAccessToken"`
+    TraceParent      string `json:"traceparent"`
     // set by user_transformer to indicate transformed event is part of group indicated by messageIDs
     MessageIDs []string `json:"messageIds"`
     RudderID   string   `json:"rudderId"`
diff --git a/processor/worker.go b/processor/worker.go
index 5ddb56e04e..c3e40a5cc4 100644
--- a/processor/worker.go
+++ b/processor/worker.go
@@ -121,42 +121,42 @@ func (w *worker) start() {

 // Work picks the next set of jobs from the jobsdb and returns [true] if jobs were picked, [false] otherwise
 func (w *worker) Work() (worked bool) {
-    if w.handle.config().enablePipelining {
-        start := time.Now()
-        jobs := w.handle.getJobs(w.partition)
-        afterGetJobs := time.Now()
-        if len(jobs.Jobs) == 0 {
-            return
-        }
-        worked = true
+    if !w.handle.config().enablePipelining {
+        return w.handle.handlePendingGatewayJobs(w.partition)
+    }

-        if err := w.handle.markExecuting(w.partition, jobs.Jobs); err != nil {
-            w.logger.Error(err)
-            panic(err)
-        }
+    start := time.Now()
+    jobs := w.handle.getJobs(w.partition)
+    afterGetJobs := time.Now()
+    if len(jobs.Jobs) == 0 {
+        return
+    }
+    worked = true
+
+    if err := w.handle.markExecuting(w.partition, jobs.Jobs); err != nil {
+        w.logger.Error(err)
+        panic(err)
+    }

-        w.handle.stats().DBReadThroughput(w.partition).Count(throughputPerSecond(jobs.EventsCount, time.Since(start)))
+    w.handle.stats().DBReadThroughput(w.partition).Count(throughputPerSecond(jobs.EventsCount, time.Since(start)))

-        rsourcesStats := rsources.NewStatsCollector(w.handle.rsourcesService())
-        rsourcesStats.BeginProcessing(jobs.Jobs)
-        subJobs := w.handle.jobSplitter(jobs.Jobs, rsourcesStats)
-        for _, subJob := range subJobs {
-            w.channel.preprocess <- subJob
-        }
+    rsourcesStats := rsources.NewStatsCollector(w.handle.rsourcesService())
+    rsourcesStats.BeginProcessing(jobs.Jobs)
+    subJobs := w.handle.jobSplitter(jobs.Jobs, rsourcesStats)
+    for _, subJob := range subJobs {
+        w.channel.preprocess <- subJob
+    }

-        if !jobs.LimitsReached {
-            readLoopSleep := w.handle.config().readLoopSleep
-            if elapsed := time.Since(afterGetJobs); elapsed < readLoopSleep.Load() {
-                if err := misc.SleepCtx(w.lifecycle.ctx, readLoopSleep.Load()-elapsed); err != nil {
-                    return
-                }
+    if !jobs.LimitsReached {
+        readLoopSleep := w.handle.config().readLoopSleep
+        if elapsed := time.Since(afterGetJobs); elapsed < readLoopSleep.Load() {
+            if err := misc.SleepCtx(w.lifecycle.ctx, readLoopSleep.Load()-elapsed); err != nil {
+                return
             }
         }
-
-        return
-    } else {
-        return w.handle.handlePendingGatewayJobs(w.partition)
     }
+
+    return
 }

 func (w *worker) SleepDurations() (min, max time.Duration) {
diff --git a/processor/worker_handle.go b/processor/worker_handle.go
index ce6eefdcac..6cecfb1e58 100644
--- a/processor/worker_handle.go
+++ b/processor/worker_handle.go
@@ -4,6 +4,7 @@ import (
     "time"

     "github.com/rudderlabs/rudder-go-kit/logger"
+    "github.com/rudderlabs/rudder-go-kit/stats"
     "github.com/rudderlabs/rudder-server/jobsdb"
     "github.com/rudderlabs/rudder-server/services/rsources"
     "github.com/rudderlabs/rudder-server/utils/misc"
@@ -16,6 +17,7 @@ type workerHandle interface {
     rsourcesService() rsources.JobService
     handlePendingGatewayJobs(key string) bool
     stats() *processorStats
+    tracer() stats.Tracer

     getJobs(partition string) jobsdb.JobsResult
     markExecuting(partition string, jobs []*jobsdb.JobT) error
diff --git a/processor/worker_handle_adapter.go b/processor/worker_handle_adapter.go
index 0c009abff3..849b1b964b 100644
--- a/processor/worker_handle_adapter.go
+++ b/processor/worker_handle_adapter.go
@@ -2,6 +2,7 @@ package processor

 import (
     "github.com/rudderlabs/rudder-go-kit/logger"
+    "github.com/rudderlabs/rudder-go-kit/stats"

     "github.com/rudderlabs/rudder-server/services/rsources"
 )
@@ -32,3 +33,7 @@ func (h *workerHandleAdapter) rsourcesService() rsources.JobService {
 func (h *workerHandleAdapter) stats() *processorStats {
     return &h.Handle.stats
 }
+
+func (h *workerHandleAdapter) tracer() stats.Tracer {
+    return h.Handle.tracer
+}
diff --git a/processor/worker_test.go b/processor/worker_test.go
index aa1bd15415..f22c4a5e60 100644
--- a/processor/worker_test.go
+++ b/processor/worker_test.go
@@ -159,6 +159,10 @@ type mockWorkerHandle struct {
     limitsReached bool
 }

+func (m *mockWorkerHandle) tracer() stats.Tracer {
+    return stats.NOP.NewTracer("")
+}
+
 func (m *mockWorkerHandle) validate(t *testing.T) {
     m.statsMu.RLock()
     defer m.statsMu.RUnlock()
diff --git a/router/handle.go b/router/handle.go
index ca2062d523..5f4b193cef 100644
--- a/router/handle.go
+++ b/router/handle.go
@@ -79,6 +79,7 @@ type Handle struct {

     // state
     logger                     logger.Logger
+    tracer                     stats.Tracer
     destinationResponseHandler ResponseHandler
     telemetry                  *Diagnostic
     netHandle                  NetHandle
@@ -216,6 +217,13 @@ func (rt *Handle) pickup(ctx context.Context, partition string, workers []*worke
         statusList = nil
     }

+    traces := make(map[string]stats.TraceSpan)
+    defer func() {
+        for _, span := range traces {
+            span.End()
+        }
+    }()
+
     // Identify jobs which can be processed
     var iterationInterrupted bool
     for iterator.HasNext() {
@@ -230,6 +238,22 @@ func (rt *Handle) pickup(ctx context.Context, partition string, workers []*worke
         lastJob = job
         slot, err := rt.findWorkerSlot(ctx, workers, job, blockedOrderKeys)
         if err == nil {
+            traceParent := gjson.GetBytes(job.Parameters, "traceparent").String()
+            if traceParent != "" {
+                if _, ok := traces[traceParent]; !ok {
+                    ctx := stats.InjectTraceParentIntoContext(context.Background(), traceParent)
+                    _, span := rt.tracer.Start(ctx, "rt.pickup", stats.SpanKindConsumer, stats.SpanWithTags(stats.Tags{
+                        "workspaceId":   job.WorkspaceId,
+                        "sourceId":      gjson.GetBytes(job.Parameters, "source_id").String(),
+                        "destinationId": gjson.GetBytes(job.Parameters, "destination_id").String(),
+                        "destType":      rt.destType,
+                    }))
+                    traces[traceParent] = span
+                }
+            } else {
+                rt.logger.Debugn("traceParent is empty during router pickup", logger.NewIntField("jobId", job.JobID))
+            }
+
             status := jobsdb.JobStatusT{
                 JobID:      job.JobID,
                 AttemptNum: job.LastJobStatus.AttemptNum,
diff --git a/router/handle_lifecycle.go b/router/handle_lifecycle.go
index e5f5ce77ec..02f442ac45 100644
--- a/router/handle_lifecycle.go
+++ b/router/handle_lifecycle.go
@@ -108,6 +108,7 @@ func (rt *Handle) Setup(
     rt.eventOrderHalfEnabledStateDuration = config.GetReloadableDurationVar(10, time.Minute, "Router."+destType+".eventOrderHalfEnabledStateDuration", "Router.eventOrderHalfEnabledStateDuration")

     statTags := stats.Tags{"destType": rt.destType}
+    rt.tracer = stats.Default.NewTracer("router")
     rt.batchInputCountStat = stats.Default.NewTaggedStat("router_batch_num_input_jobs", stats.CountType, statTags)
     rt.batchOutputCountStat = stats.Default.NewTaggedStat("router_batch_num_output_jobs", stats.CountType, statTags)
     rt.routerTransformInputCountStat = stats.Default.NewTaggedStat("router_transform_num_input_jobs", stats.CountType, statTags)
diff --git a/router/types/types.go b/router/types/types.go
index ed970d2d70..2f5dddb779 100644
--- a/router/types/types.go
+++ b/router/types/types.go
@@ -79,6 +79,7 @@ type JobMetadataT struct {
     WorkerAssignedTime time.Time       `json:"workerAssignedTime"`
     DestInfo           json.RawMessage `json:"destInfo,omitempty"`
     DontBatch          bool            `json:"dontBatch"`
+    TraceParent        string          `json:"traceparent"`
 }

 // TransformMessageT is used to pass message to the transformer workers
diff --git a/router/utils/utils.go b/router/utils/utils.go
index 1b1dd754f6..d25b433dc5 100644
--- a/router/utils/utils.go
+++ b/router/utils/utils.go
@@ -71,6 +71,7 @@ type JobParameters struct {
     WorkspaceID     string `json:"workspaceId"`
     RudderAccountID string `json:"rudderAccountId"`
     DontBatch       bool   `json:"dontBatch"`
+    TraceParent     string `json:"traceparent"`
 }

 // ParseReceivedAtTime parses the [ReceivedAt] field and returns the parsed time or a zero value time if parsing fails
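The router reads the traceparent at two points: during pickup the job parameters are still raw JSON, so it is peeked with gjson, while workLoop gets the field for free once parameters are decoded into utils.JobParameters (as worker.go does below). The span kinds also differ deliberately: rt.pickup uses SpanKindConsumer, since the job is leaving the queue, while the transform and process spans are SpanKindInternal. A side-by-side sketch of the two access paths:

    // During pickup, straight off the raw parameters (as in the rt.pickup hunk above):
    traceParent := gjson.GetBytes(job.Parameters, "traceparent").String()

    // After decoding, via the new JobParameters field:
    var parameters utils.JobParameters
    if err := json.Unmarshal(job.Parameters, &parameters); err == nil {
        traceParent = parameters.TraceParent
    }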
diff --git a/router/worker.go b/router/worker.go
index 008fe1e6ee..23f653d1c2 100644
--- a/router/worker.go
+++ b/router/worker.go
@@ -173,6 +173,7 @@ func (w *worker) workLoop() {
                 WorkspaceID:        parameters.WorkspaceID,
                 WorkerAssignedTime: message.assignedAt,
                 DontBatch:          parameters.DontBatch,
+                TraceParent:        parameters.TraceParent,
             }

             w.rt.destinationsMapMu.RLock()
@@ -258,6 +259,31 @@ func (w *worker) transform(routerJobs []types.RouterJobT) []types.DestinationJob
         limiterStats.Update(w.partition, time.Since(start), len(routerJobs), 0)
     }()

+    traces := make(map[string]stats.TraceSpan)
+    defer func() {
+        for _, span := range traces {
+            span.End()
+        }
+    }()
+
+    for _, job := range routerJobs {
+        traceParent := job.JobMetadata.TraceParent
+        if traceParent != "" {
+            if _, ok := traces[traceParent]; !ok {
+                ctx := stats.InjectTraceParentIntoContext(context.Background(), traceParent)
+                _, span := w.rt.tracer.Start(ctx, "rt.transform", stats.SpanKindInternal, stats.SpanWithTags(stats.Tags{
+                    "workspaceId":   job.JobMetadata.WorkspaceID,
+                    "sourceId":      job.JobMetadata.SourceID,
+                    "destinationId": job.JobMetadata.DestinationID,
+                    "destType":      w.rt.destType,
+                }))
+                traces[traceParent] = span
+            }
+        } else {
+            w.rt.logger.Debugn("traceParent is empty during router transform", logger.NewIntField("jobId", job.JobMetadata.JobID))
+        }
+    }
+
     w.rt.routerTransformInputCountStat.Count(len(routerJobs))
     destinationJobs := w.rt.transformer.Transform(
         transformer.ROUTER_TRANSFORM,
@@ -278,6 +304,31 @@ func (w *worker) batchTransform(routerJobs []types.RouterJobT) []types.Destinati
         limiterStats.Update(w.partition, time.Since(start), len(routerJobs), 0)
     }()

+    traces := make(map[string]stats.TraceSpan)
+    defer func() {
+        for _, span := range traces {
+            span.End()
+        }
+    }()
+
+    for _, job := range routerJobs {
+        traceParent := job.JobMetadata.TraceParent
+        if traceParent != "" {
+            if _, ok := traces[traceParent]; !ok {
+                ctx := stats.InjectTraceParentIntoContext(context.Background(), traceParent)
+                _, span := w.rt.tracer.Start(ctx, "rt.batchTransform", stats.SpanKindInternal, stats.SpanWithTags(stats.Tags{
+                    "workspaceId":   job.JobMetadata.WorkspaceID,
+                    "sourceId":      job.JobMetadata.SourceID,
+                    "destinationId": job.JobMetadata.DestinationID,
+                    "destType":      w.rt.destType,
+                }))
+                traces[traceParent] = span
+            }
+        } else {
+            w.rt.logger.Debugn("traceParent is empty during router batch transform", logger.NewIntField("jobId", job.JobMetadata.JobID))
+        }
+    }
+
     inputJobsLength := len(routerJobs)
     w.rt.batchInputCountStat.Count(inputJobsLength)
     destinationJobs := w.rt.transformer.Transform(
@@ -308,6 +359,33 @@ func (w *worker) processDestinationJobs() {

     transformerProxy := w.rt.reloadableConfig.transformerProxy.Load()

+    traces := make(map[string]stats.TraceSpan)
+    defer func() {
+        for _, span := range traces {
+            span.End()
+        }
+    }()
+
+    for _, job := range w.destinationJobs {
+        for _, jobMetadata := range job.JobMetadataArray {
+            traceParent := jobMetadata.TraceParent
+            if traceParent != "" {
+                if _, ok := traces[traceParent]; !ok {
+                    ctx := stats.InjectTraceParentIntoContext(context.Background(), traceParent)
+                    _, span := w.rt.tracer.Start(ctx, "rt.process", stats.SpanKindInternal, stats.SpanWithTags(stats.Tags{
+                        "workspaceId":   jobMetadata.WorkspaceID,
+                        "sourceId":      jobMetadata.SourceID,
+                        "destinationId": jobMetadata.DestinationID,
+                        "destType":      w.rt.destType,
+                    }))
+                    traces[traceParent] = span
+                }
+            } else {
router process", logger.NewIntField("jobId", jobMetadata.JobID)) + } + } + } + var respContentType string /* diff --git a/testhelper/transformertest/builder.go b/testhelper/transformertest/builder.go index 460f8d6a99..fc24b9554d 100644 --- a/testhelper/transformertest/builder.go +++ b/testhelper/transformertest/builder.go @@ -7,6 +7,8 @@ import ( "net/http/httptest" "strings" + "github.com/rudderlabs/rudder-server/router/types" + "github.com/tidwall/sjson" "github.com/rudderlabs/rudder-server/processor/transformer" @@ -23,10 +25,34 @@ func NewBuilder() *Builder { // Builder is a builder for a test transformer server type Builder struct { - routerTransforms map[string]struct{} - userTransformHandler http.HandlerFunc - destTransformHandlers map[string]http.HandlerFunc - trackingPlanHandler http.HandlerFunc + routerTransforms map[string]struct{} + userTransformHandler http.HandlerFunc + destTransformHandlers map[string]http.HandlerFunc + trackingPlanHandler http.HandlerFunc + routerTransformHandler http.HandlerFunc + routerBatchTransformHandler http.HandlerFunc +} + +// WithRouterTransformHandlerFunc sets the router transformation http handler function for the server +func (b *Builder) WithRouterTransformHandlerFunc(h http.HandlerFunc) *Builder { + b.routerTransformHandler = apiVersionMiddleware(h) + return b +} + +// WithRouterTransformHandler sets the router transformation handler for the server +func (b *Builder) WithRouterTransformHandler(h RouterTransformerHandler) *Builder { + return b.WithRouterTransformHandlerFunc(routerTransformerFunc(h)) +} + +// WithRouterBatchTransformHandlerFunc sets the router batch transformation http handler function for the server +func (b *Builder) WithRouterBatchTransformHandlerFunc(h http.HandlerFunc) *Builder { + b.routerBatchTransformHandler = apiVersionMiddleware(h) + return b +} + +// WithRouterBatchTransformHandler sets the router batch transformation handler for the server +func (b *Builder) WithRouterBatchTransformHandler(h RouterTransformerHandler) *Builder { + return b.WithRouterBatchTransformHandlerFunc(routerBatchTransformerFunc(h)) } // WithUserTransformHandlerFunc sets the user transformation http handler function for the server @@ -97,6 +123,16 @@ func (b *Builder) Build() *httptest.Server { } } + // router transformation + if b.routerTransformHandler == nil { + b.routerTransformHandler = apiVersionMiddleware(routerTransformerFunc(MirroringRouterTransformerHandler)) + } + mux.HandleFunc("/routerTransform", b.routerTransformHandler) + if b.routerBatchTransformHandler == nil { + b.routerBatchTransformHandler = apiVersionMiddleware(routerBatchTransformerFunc(MirroringRouterTransformerHandler)) + } + mux.HandleFunc("/batch", b.routerBatchTransformHandler) + // features features := []byte(`{"routerTransform": {}}`) for destType := range b.routerTransforms { @@ -112,12 +148,49 @@ func transformerFunc(h TransformerHandler) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { data, err := io.ReadAll(r.Body) if err != nil { - w.WriteHeader(500) + w.WriteHeader(http.StatusInternalServerError) return } var request []transformer.TransformerEvent if err := json.Unmarshal(data, &request); err != nil { - w.WriteHeader(400) + w.WriteHeader(http.StatusBadRequest) + return + } + _ = json.NewEncoder(w).Encode(h(request)) + } +} + +func routerTransformerFunc(h RouterTransformerHandler) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + data, err := io.ReadAll(r.Body) + if err != nil { + 
diff --git a/testhelper/transformertest/handler_funcs.go b/testhelper/transformertest/handler_funcs.go
index d517f221d1..563c46fdd1 100644
--- a/testhelper/transformertest/handler_funcs.go
+++ b/testhelper/transformertest/handler_funcs.go
@@ -2,6 +2,9 @@ package transformertest
 
 import (
 	"encoding/json"
+	"net/http"
+
+	"github.com/rudderlabs/rudder-server/router/types"
 
 	"github.com/rudderlabs/rudder-server/processor/integrations"
 	"github.com/rudderlabs/rudder-server/processor/transformer"
@@ -10,6 +13,9 @@ import (
 // TransformerHandler is a function that takes a transformer request and returns a response
 type TransformerHandler func(request []transformer.TransformerEvent) []transformer.TransformerResponse
 
+// RouterTransformerHandler is a function that takes a router transformer request and returns a response
+type RouterTransformerHandler func(request types.TransformMessageT) types.DestinationJobs
+
 // MirroringTransformerHandler mirrors the request payload in the response
 var MirroringTransformerHandler TransformerHandler = func(request []transformer.TransformerEvent) (response []transformer.TransformerResponse) {
 	for i := range request {
@@ -17,12 +23,27 @@ var MirroringTransformerHandler TransformerHandler = func(request []transformer.
 		response = append(response, transformer.TransformerResponse{
 			Metadata:   req.Metadata,
 			Output:     req.Message,
-			StatusCode: 200,
+			StatusCode: http.StatusOK,
 		})
 	}
 	return
 }
 
+// MirroringRouterTransformerHandler mirrors the router request payload in the response
+var MirroringRouterTransformerHandler RouterTransformerHandler = func(request types.TransformMessageT) (response types.DestinationJobs) {
+	response = make(types.DestinationJobs, len(request.Data))
+	for j := range request.Data {
+		req := request.Data[j]
+		response[j] = types.DestinationJobT{
+			Message:          req.Message,
+			JobMetadataArray: []types.JobMetadataT{req.JobMetadata},
+			Destination:      req.Destination,
+			StatusCode:       http.StatusOK,
+		}
+	}
+	return
+}
+
 // ErrorTransformerHandler mirrors the request payload in the response but uses an error status code
 func ErrorTransformerHandler(code int, err string) TransformerHandler {
 	return func(request []transformer.TransformerEvent) (response []transformer.TransformerResponse) {
@@ -72,7 +93,7 @@ func DestTransformerHandler(f func(event transformer.TransformerEvent) integrati
 		res = append(res, transformer.TransformerResponse{
 			Metadata:   req.Metadata,
 			Output:     output,
-			StatusCode: 200,
+			StatusCode: http.StatusOK,
 		})
 	}
 	return
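RouterTransformerHandler also makes failure injection symmetric with the processor-side ErrorTransformerHandler. A hypothetical handler that mirrors every router job back with a 5xx status, e.g. to exercise retry and abort paths (not part of this patch):

package example_test

import (
	"net/http"

	"github.com/rudderlabs/rudder-server/router/types"
	"github.com/rudderlabs/rudder-server/testhelper/transformertest"
)

// failingRouterHandler mimics MirroringRouterTransformerHandler but marks
// every job as failed with a fixed 500 status code.
var failingRouterHandler transformertest.RouterTransformerHandler = func(request types.TransformMessageT) types.DestinationJobs {
	response := make(types.DestinationJobs, len(request.Data))
	for j := range request.Data {
		req := request.Data[j]
		response[j] = types.DestinationJobT{
			Message:          req.Message,
			JobMetadataArray: []types.JobMetadataT{req.JobMetadata},
			Destination:      req.Destination,
			StatusCode:       http.StatusInternalServerError,
		}
	}
	return response
}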
diff --git a/utils/types/types.go b/utils/types/types.go
index dfaff55ce9..578b99d434 100644
--- a/utils/types/types.go
+++ b/utils/types/types.go
@@ -35,6 +35,7 @@ type EventParams struct {
 	SourceJobRunId  string `json:"source_job_run_id"`
 	SourceId        string `json:"source_id"`
 	SourceTaskRunId string `json:"source_task_run_id"`
+	TraceParent     string `json:"traceparent"`
 }
 
 // UserSuppression is interface to access Suppress user feature
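End to end, the traceparent the gateway stores in EventParams is what the router later reads back out of the job parameters with gjson during pickup. A small round-trip sketch (the source id and traceparent values are W3C-format placeholders):

package main

import (
	"encoding/json"
	"fmt"

	"github.com/tidwall/gjson"

	"github.com/rudderlabs/rudder-server/utils/types"
)

func main() {
	// Gateway side: the traceparent travels inside the job parameters.
	params, _ := json.Marshal(types.EventParams{
		SourceId:    "source-1",
		TraceParent: "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01",
	})
	// Router side: pickup recovers it with gjson, as in the hunk above.
	fmt.Println(gjson.GetBytes(params, "traceparent").String())
}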