Skip to content

Commit

Permalink
feat: tracing support for gateway, processor and router (#4248)
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr authored Jan 3, 2024
1 parent 9842f64 commit 7dc7747
Show file tree
Hide file tree
Showing 22 changed files with 1,156 additions and 65 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ jobs:
- integration_test/reporting_dropped_events
- integration_test/reporting_error_index
- integration_test/warehouse
- integration_test/tracing
- processor
- regulation-worker
- router
Expand All @@ -129,7 +130,7 @@ jobs:
include:
- package: services
exclude: services/rsources

steps:
- uses: actions/checkout@v4
- uses: actions/setup-go@v4
Expand Down
11 changes: 7 additions & 4 deletions gateway/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,10 +546,13 @@ var _ = Describe("Gateway", func() {

var paramsMap, expectedParamsMap map[string]interface{}
_ = json.Unmarshal(job.Parameters, &paramsMap)
expectedStr := []byte(fmt.Sprintf(`{"source_id": "%v", "source_job_run_id": "", "source_task_run_id": ""}`, SourceIDEnabled))
expectedStr := []byte(fmt.Sprintf(
`{"source_id": "%v", "source_job_run_id": "", "source_task_run_id": "", "traceparent": ""}`,
SourceIDEnabled,
))
_ = json.Unmarshal(expectedStr, &expectedParamsMap)
equals := reflect.DeepEqual(paramsMap, expectedParamsMap)
Expect(equals).To(Equal(true))
Expect(equals).To(BeTrue())

Expect(job.CustomVal).To(Equal(customVal))

Expand Down Expand Up @@ -1569,7 +1572,7 @@ var _ = Describe("Gateway", func() {
jobs []*jobsdb.JobT,
) error {
for idx, job := range jobs {
Expect(misc.IsValidUUID(job.UUID.String())).To(Equal(true))
Expect(misc.IsValidUUID(job.UUID.String())).To(BeTrue())
Expect(job.CustomVal).To(Equal("WEBHOOK"))

var paramsMap, expectedParamsMap map[string]interface{}
Expand All @@ -1587,7 +1590,7 @@ var _ = Describe("Gateway", func() {
_ = json.Unmarshal(job.Parameters, &paramsMap)
_ = json.Unmarshal(expectedStr, &expectedParamsMap)
equals := reflect.DeepEqual(paramsMap, expectedParamsMap)
Expect(equals).To(Equal(true))
Expect(equals).To(BeTrue())
}
return nil
}).
Expand Down
23 changes: 21 additions & 2 deletions gateway/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type Handle struct {
config *config.Config
logger logger.Logger
stats stats.Stats
tracer stats.Tracer
application app.App
backendConfig backendconfig.BackendConfig
jobsDB jobsdb.JobsDB
Expand Down Expand Up @@ -271,6 +272,9 @@ func (gw *Handle) getJobDataFromRequest(req *webRequestT) (jobData *jobFromReq,

// values retrieved from first event in batch
sourcesJobRunID, sourcesTaskRunID = req.authContext.SourceJobRunID, req.authContext.SourceTaskRunID

// tracing
traceParent = req.traceParent
)

fillMessageID := func(event map[string]interface{}) {
Expand Down Expand Up @@ -423,10 +427,11 @@ func (gw *Handle) getJobDataFromRequest(req *webRequestT) (jobData *jobFromReq,
return
}

params := map[string]interface{}{
params := map[string]any{
"source_id": sourceID,
"source_job_run_id": sourcesJobRunID,
"source_task_run_id": sourcesTaskRunID,
"traceparent": traceParent,
}
marshalledParams, err = json.Marshal(params)
if err != nil {
Expand Down Expand Up @@ -588,6 +593,20 @@ func (gw *Handle) addToWebRequestQ(_ *http.ResponseWriter, req *http.Request, do
}
userWebRequestWorker := gw.findUserWebRequestWorker(workerKey)
ipAddr := misc.GetIPFromReq(req)
webReq := webRequestT{done: done, reqType: reqType, requestPayload: requestPayload, authContext: arctx, ipAddr: ipAddr, userIDHeader: userIDHeader}

traceParent := stats.GetTraceParentFromContext(req.Context())
if traceParent == "" {
gw.logger.Debugw("traceParent not found in request")
}

webReq := webRequestT{
done: done,
reqType: reqType,
requestPayload: requestPayload,
authContext: arctx,
traceParent: traceParent,
ipAddr: ipAddr,
userIDHeader: userIDHeader,
}
userWebRequestWorker.webRequestQ <- &webReq
}
21 changes: 19 additions & 2 deletions gateway/handle_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package gateway
import (
"context"
"net/http"
"time"

"github.com/rudderlabs/rudder-go-kit/stats"
gwtypes "github.com/rudderlabs/rudder-server/gateway/internal/types"
"github.com/rudderlabs/rudder-server/gateway/response"
"github.com/rudderlabs/rudder-server/utils/misc"
Expand Down Expand Up @@ -74,12 +76,27 @@ func (gw *Handle) webHandler() http.HandlerFunc {
// webRequestHandler - handles web requests containing rudder events as payload.
// It parses the payload and calls the request handler to process the request.
func (gw *Handle) webRequestHandler(rh RequestHandler, w http.ResponseWriter, r *http.Request) {
reqType := r.Context().Value(gwtypes.CtxParamCallType).(string)
arctx := r.Context().Value(gwtypes.CtxParamAuthRequestContext).(*gwtypes.AuthRequestContext)
ctx := r.Context()
reqType := ctx.Value(gwtypes.CtxParamCallType).(string)
arctx := ctx.Value(gwtypes.CtxParamAuthRequestContext).(*gwtypes.AuthRequestContext)

ctx, span := gw.tracer.Start(ctx, "gw.webRequestHandler", stats.SpanKindServer,
stats.SpanWithTimestamp(time.Now()),
stats.SpanWithTags(stats.Tags{
"reqType": reqType,
"path": r.URL.Path,
"workspaceId": arctx.WorkspaceID,
"sourceId": arctx.SourceID,
}),
)
r = r.WithContext(ctx)

gw.logger.LogRequest(r)
var errorMessage string
defer func() {
defer span.End()
if errorMessage != "" {
span.SetStatus(stats.SpanStatusError, errorMessage)
status := response.GetErrorStatusCode(errorMessage)
responseBody := response.GetStatus(errorMessage)
gw.logger.Infow("response",
Expand Down
1 change: 1 addition & 0 deletions gateway/handle_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func (gw *Handle) Setup(
gw.config = config
gw.logger = logger
gw.stats = stat
gw.tracer = stat.NewTracer("gateway")
gw.application = application
gw.backendConfig = backendConfig
gw.jobsDB = jobsDB
Expand Down
1 change: 1 addition & 0 deletions gateway/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type webRequestT struct {
reqType string
requestPayload []byte
authContext *gwtypes.AuthRequestContext
traceParent string
ipAddr string
userIDHeader string
errors []string
Expand Down
Loading

0 comments on commit 7dc7747

Please sign in to comment.