Skip to content

Commit a1f1985

Browse files
rochdevtlhunter
authored andcommitted
add dc store binding polyfill and migrate http2 (#3284)
* add dc store binding polyfill and migrate http2 * fix error handling for synchronous exceptions * OutboundPlugin#tagPeerService() --------- Co-authored-by: Thomas Hunter II <[email protected]>
1 parent e33a4ef commit a1f1985

File tree

6 files changed

+201
-70
lines changed

6 files changed

+201
-70
lines changed

packages/datadog-instrumentations/src/http2/client.js

Lines changed: 25 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,27 @@
11
'use strict'
22

33
const shimmer = require('../../../datadog-shimmer')
4-
const { addHook, channel, AsyncResource } = require('../helpers/instrument')
4+
const { addHook, channel } = require('../helpers/instrument')
55

66
const connectChannel = channel('apm:http2:client:connect:start')
77
const startChannel = channel('apm:http2:client:request:start')
8-
const finishChannel = channel('apm:http2:client:request:finish')
8+
const endChannel = channel('apm:http2:client:request:end')
9+
const asyncStartChannel = channel('apm:http2:client:request:asyncStart')
10+
const asyncEndChannel = channel('apm:http2:client:request:asyncEnd')
911
const errorChannel = channel('apm:http2:client:request:error')
10-
const responseChannel = channel('apm:http2:client:response')
1112

12-
function createWrapEmit (requestResource, parentResource) {
13+
function createWrapEmit (ctx) {
1314
return function wrapEmit (emit) {
1415
return function (event, arg1) {
15-
requestResource.runInAsyncScope(() => {
16-
switch (event) {
17-
case 'response':
18-
responseChannel.publish(arg1)
19-
break
20-
case 'error':
21-
errorChannel.publish(arg1)
22-
case 'close': // eslint-disable-line no-fallthrough
23-
finishChannel.publish()
24-
break
25-
}
26-
})
16+
ctx.eventName = event
17+
ctx.eventData = arg1
2718

28-
return parentResource.runInAsyncScope(() => {
29-
return emit.apply(this, arguments)
19+
return asyncStartChannel.runStores(ctx, () => {
20+
try {
21+
return emit.apply(this, arguments)
22+
} finally {
23+
asyncEndChannel.publish(ctx)
24+
}
3025
})
3126
}
3227
}
@@ -35,17 +30,21 @@ function createWrapEmit (requestResource, parentResource) {
3530
function createWrapRequest (authority, options) {
3631
return function wrapRequest (request) {
3732
return function (headers) {
38-
const parentResource = new AsyncResource('bound-anonymous-fn')
39-
const requestResource = new AsyncResource('bound-anonymous-fn')
33+
const ctx = { headers, authority, options }
4034

41-
return requestResource.runInAsyncScope(() => {
42-
startChannel.publish({ headers, authority, options })
35+
return startChannel.runStores(ctx, () => {
36+
try {
37+
const req = request.apply(this, arguments)
4338

44-
const req = request.apply(this, arguments)
39+
shimmer.wrap(req, 'emit', createWrapEmit(ctx))
4540

46-
shimmer.wrap(req, 'emit', createWrapEmit(requestResource, parentResource))
47-
48-
return req
41+
return req
42+
} catch (e) {
43+
ctx.error = e
44+
errorChannel.publish(ctx)
45+
} finally {
46+
endChannel.publish(ctx)
47+
}
4948
})
5049
}
5150
}

packages/datadog-plugin-http2/src/client.js

Lines changed: 46 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ const log = require('../../dd-trace/src/log')
88
const tags = require('../../../ext/tags')
99
const kinds = require('../../../ext/kinds')
1010
const formats = require('../../../ext/formats')
11-
const analyticsSampler = require('../../dd-trace/src/analytics_sampler')
1211
const { COMPONENT, CLIENT_PORT_KEY } = require('../../dd-trace/src/constants')
1312
const urlFilter = require('../../dd-trace/src/plugins/util/urlfilter')
1413

@@ -25,32 +24,11 @@ const HTTP2_HEADER_STATUS = ':status'
2524
const HTTP2_METHOD_GET = 'GET'
2625

2726
class Http2ClientPlugin extends ClientPlugin {
28-
static get id () {
29-
return 'http2'
30-
}
31-
32-
constructor (...args) {
33-
super(...args)
34-
35-
this.addSub('apm:http2:client:response', (headers) => {
36-
const span = storage.getStore().span
37-
const status = headers && headers[HTTP2_HEADER_STATUS]
38-
39-
span.setTag(HTTP_STATUS_CODE, status)
40-
41-
if (!this.config.validateStatus(status)) {
42-
this.addError()
43-
}
27+
static get id () { return 'http2' }
28+
static get prefix () { return `apm:http2:client:request` }
4429

45-
addHeaderTags(span, headers, HTTP_RESPONSE_HEADERS, this.config)
46-
})
47-
}
48-
49-
addTraceSub (eventName, handler) {
50-
this.addSub(`apm:${this.constructor.id}:client:${this.operation}:${eventName}`, handler)
51-
}
52-
53-
start ({ authority, options, headers = {} }) {
30+
bindStart (message) {
31+
const { authority, options, headers = {} } = message
5432
const sessionDetails = extractSessionDetails(authority, options)
5533
const path = headers[HTTP2_HEADER_PATH] || '/'
5634
const pathname = path.split(/[?#]/)[0]
@@ -75,7 +53,7 @@ class Http2ClientPlugin extends ClientPlugin {
7553
metrics: {
7654
[CLIENT_PORT_KEY]: parseInt(sessionDetails.port)
7755
}
78-
})
56+
}, false)
7957

8058
// TODO: Figure out a better way to do this for any span.
8159
if (!allowed) {
@@ -88,14 +66,53 @@ class Http2ClientPlugin extends ClientPlugin {
8866
this.tracer.inject(span, HTTP_HEADERS, headers)
8967
}
9068

91-
analyticsSampler.sample(span, this.config.measured)
69+
message.parentStore = store
70+
message.currentStore = { ...store, span }
71+
72+
return message.currentStore
73+
}
74+
75+
bindAsyncStart ({ eventName, eventData, currentStore, parentStore }) {
76+
switch (eventName) {
77+
case 'response':
78+
this._onResponse(currentStore, eventData)
79+
return parentStore
80+
case 'error':
81+
this._onError(currentStore, eventData)
82+
return parentStore
83+
case 'close':
84+
this._onClose(currentStore, eventData)
85+
return parentStore
86+
}
9287

93-
this.enter(span, store)
88+
return storage.getStore()
9489
}
9590

9691
configure (config) {
9792
return super.configure(normalizeConfig(config))
9893
}
94+
95+
_onResponse (store, headers) {
96+
const status = headers && headers[HTTP2_HEADER_STATUS]
97+
98+
store.span.setTag(HTTP_STATUS_CODE, status)
99+
100+
if (!this.config.validateStatus(status)) {
101+
storage.run(store, () => this.addError())
102+
}
103+
104+
addHeaderTags(store.span, headers, HTTP_RESPONSE_HEADERS, this.config)
105+
}
106+
107+
_onError ({ span }, error) {
108+
span.setTag('error', error)
109+
span.finish()
110+
}
111+
112+
_onClose ({ span }) {
113+
this.tagPeerService(span)
114+
span.finish()
115+
}
99116
}
100117

101118
function extractSessionDetails (authority, options) {

packages/dd-trace/src/plugins/outbound.js

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,13 +62,17 @@ class OutboundPlugin extends TracingPlugin {
6262

6363
finish () {
6464
const span = this.activeSpan
65+
this.tagPeerService(span)
66+
super.finish(...arguments)
67+
}
68+
69+
tagPeerService (span) {
6570
if (this.tracer._computePeerService) {
6671
const peerData = this.getPeerService(span.context()._tags)
6772
if (peerData) {
6873
span.addTags(peerData)
6974
}
7075
}
71-
super.finish(...arguments)
7276
}
7377

7478
connect (url) {

packages/dd-trace/src/plugins/plugin.js

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,31 @@ class Subscription {
2525
}
2626
}
2727

28+
class StoreBinding {
29+
constructor (event, transform) {
30+
this._channel = dc.channel(event)
31+
this._transform = data => {
32+
const store = storage.getStore()
33+
34+
return !store || !store.noop
35+
? transform(data)
36+
: store
37+
}
38+
}
39+
40+
enable () {
41+
this._channel.bindStore(storage, this._transform)
42+
}
43+
44+
disable () {
45+
this._channel.unbindStore(storage, this._transform)
46+
}
47+
}
48+
2849
module.exports = class Plugin {
2950
constructor (tracer, tracerConfig) {
3051
this._subscriptions = []
52+
this._bindings = []
3153
this._enabled = false
3254
this._tracer = tracer
3355
this.config = {} // plugin-specific configuration, unset until .configure() is called
@@ -53,6 +75,10 @@ module.exports = class Plugin {
5375
this._subscriptions.push(new Subscription(channelName, handler))
5476
}
5577

78+
addBind (channelName, transform) {
79+
this._bindings.push(new StoreBinding(channelName, transform))
80+
}
81+
5682
addError (error) {
5783
const store = storage.getStore()
5884

@@ -71,9 +97,11 @@ module.exports = class Plugin {
7197
if (config.enabled && !this._enabled) {
7298
this._enabled = true
7399
this._subscriptions.forEach(sub => sub.enable())
100+
this._bindings.forEach(sub => sub.enable())
74101
} else if (!config.enabled && this._enabled) {
75102
this._enabled = false
76103
this._subscriptions.forEach(sub => sub.disable())
104+
this._bindings.forEach(sub => sub.disable())
77105
}
78106
}
79107
}

packages/dd-trace/src/plugins/tracing.js

Lines changed: 33 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,7 @@ class TracingPlugin extends Plugin {
1313
this.component = this.constructor.component || this.constructor.id
1414
this.operation = this.constructor.operation
1515

16-
this.addTraceSub('start', message => {
17-
this.start(message)
18-
})
19-
20-
this.addTraceSub('error', err => {
21-
this.error(err)
22-
})
23-
24-
this.addTraceSub('finish', message => {
25-
this.finish(message)
26-
})
16+
this.addTraceSubs()
2717
}
2818

2919
get activeSpan () {
@@ -65,19 +55,45 @@ class TracingPlugin extends Plugin {
6555
this.addError(error)
6656
}
6757

58+
addTraceSubs () {
59+
const events = ['start', 'end', 'asyncStart', 'asyncEnd', 'error', 'finish']
60+
61+
for (const event of events) {
62+
const bindName = `bind${event.charAt(0).toUpperCase()}${event.slice(1)}`
63+
64+
if (this[event]) {
65+
this.addTraceSub(event, message => {
66+
this[event](message)
67+
})
68+
}
69+
70+
if (this[bindName]) {
71+
this.addTraceBind(event, message => this[bindName](message))
72+
}
73+
}
74+
}
75+
6876
addTraceSub (eventName, handler) {
69-
this.addSub(`apm:${this.component}:${this.operation}:${eventName}`, handler)
77+
const prefix = this.constructor.prefix || `apm:${this.component}:${this.operation}`
78+
this.addSub(`${prefix}:${eventName}`, handler)
79+
}
80+
81+
addTraceBind (eventName, transform) {
82+
const prefix = this.constructor.prefix || `apm:${this.component}:${this.operation}`
83+
this.addBind(`${prefix}:${eventName}`, transform)
7084
}
7185

7286
addError (error) {
7387
const span = this.activeSpan
7488

7589
if (!span._spanContext._tags['error']) {
90+
// Errors may be wrapped in a context.
91+
error = (error && error.error) || error
7692
span.setTag('error', error || 1)
7793
}
7894
}
7995

80-
startSpan (name, { childOf, kind, meta, metrics, service, resource, type } = {}) {
96+
startSpan (name, { childOf, kind, meta, metrics, service, resource, type } = {}, enter = true) {
8197
const store = storage.getStore()
8298

8399
if (store && childOf === undefined) {
@@ -100,7 +116,10 @@ class TracingPlugin extends Plugin {
100116

101117
analyticsSampler.sample(span, this.config.measured)
102118

103-
storage.enterWith({ ...store, span })
119+
// TODO: Remove this after migration to TracingChannel is done.
120+
if (enter) {
121+
storage.enterWith({ ...store, span })
122+
}
104123

105124
return span
106125
}

0 commit comments

Comments
 (0)