Skip to content

Commit

Permalink
more testing
Browse files Browse the repository at this point in the history
  • Loading branch information
wconti27 committed Sep 19, 2024
1 parent 214a679 commit c5e65b2
Show file tree
Hide file tree
Showing 4 changed files with 327 additions and 179 deletions.
148 changes: 88 additions & 60 deletions packages/datadog-instrumentations/src/protobufjs.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,102 +6,120 @@ const finishSerializeCh = channel('datadog:protobuf:serialize:finish')
const startDeserializeCh = channel('datadog:protobuf:deserialize:start')
const finishDeserializeCh = channel('datadog:protobuf:deserialize:finish')

function wrapSerialization (Class) {
shimmer.wrap(Class, 'encode', original => {
return function wrappedEncode (...args) {
if (!startSerializeCh.hasSubscribers && !finishSerializeCh.hasSubscribers) {
return original.apply(this, args)
}

const asyncResource = new AsyncResource('bound-anonymous-fn')

asyncResource.runInAsyncScope(() => {
startSerializeCh.publish({ message: this })
})

try {
// when applying the original encode / decode functions, protobuf sets up the classes again
// causing our function wrappers to dissappear, we should verify they exist and rewrap if not
const wrappedDecode = this.decode
const wrappedEncode = this.encode
const result = original.apply(this, args)
ensureMessageIsWrapped(this, wrappedEncode, wrappedDecode)

if (original) {
asyncResource.runInAsyncScope(() => {
finishSerializeCh.publish({ message: this })
})
}
return result
} catch (err) {
asyncResource.runInAsyncScope(() => {
finishSerializeCh.publish({ message: this })
})
throw err
}
}
})
}

function ensureMessageIsWrapped (messageClass, wrappedEncode, wrappedDecode) {
if (messageClass.encode !== wrappedEncode) {
messageClass.encode = wrappedEncode
function wrapSerialization (messageClass) {
if (messageClass?.encode) {
wrapOperation(messageClass, 'encode', {
startChPublish: (obj, args) => startSerializeCh.publish({ message: obj }),
finishChPublish: (result) => finishSerializeCh.publish({ buffer: result }),
startCh: startSerializeCh,
finishCh: finishSerializeCh
})
}
}

if (messageClass.decode !== wrappedDecode) {
messageClass.decode = wrappedDecode
function wrapDeserialization (messageClass) {
if (messageClass?.decode) {
wrapOperation(messageClass, 'decode', {
startChPublish: (obj, args) => startDeserializeCh.publish({ buffer: args[0] }),
finishChPublish: (result) => finishDeserializeCh.publish({ message: result }),
startCh: startDeserializeCh,
finishCh: finishDeserializeCh
})
}
}

function wrapDeserialization (Class) {
shimmer.wrap(Class, 'decode', original => {
return function wrappedDecode (...args) {
if (!startDeserializeCh.hasSubscribers && !finishDeserializeCh.hasSubscribers) {
function wrapOperation (messageClass, operationName, channels) {
shimmer.wrap(messageClass, operationName, original => {
return function wrappedMethod (...args) {
if (!channels.startCh.hasSubscribers && !channels.finishCh.hasSubscribers) {
return original.apply(this, args)
}

const asyncResource = new AsyncResource('bound-anonymous-fn')

asyncResource.runInAsyncScope(() => {
startDeserializeCh.publish({ buffer: args[0] })
channels.startChPublish(this, args)
})

try {
// when applying the original encode / decode functions, protobuf sets up the classes again
// causing our function wrappers to dissappear, we should verify they exist and rewrap if not

const wrappedDecode = this.decode
const wrappedEncode = this.encode
const result = original.apply(this, args)
ensureMessageIsWrapped(this, wrappedEncode, wrappedDecode)

asyncResource.runInAsyncScope(() => {
finishDeserializeCh.publish({ message: result })
channels.finishChPublish(result)
})

return result
} catch (err) {
asyncResource.runInAsyncScope(() => {
finishDeserializeCh.publish({ buffer: args[0] })
channels.finishChPublish(args)
})
throw err
}
}
})
}

function wrapSetup (messageClass) {
if (messageClass?.setup) {
shimmer.wrap(messageClass, 'setup', original => {
return function wrappedSetup (...args) {
const result = original.apply(this, args)

wrapSerialization(messageClass)
wrapDeserialization(messageClass)

return result
}
})
}
}

function wrapProtobufClasses (root) {
if (!root) {
// pass
} else if (root.decode) {
wrapSerialization(root)
wrapDeserialization(root)
} else if (root.nestedArray) {
return
}

if (root.decode) {
wrapSetup(root)
}

if (root.nestedArray) {
for (const subRoot of root.nestedArray) {
wrapProtobufClasses(subRoot)
}
}
}

function wrapReflection (protobuf) {
const reflectionMethods = [
{
target: protobuf.Root,
name: 'fromJSON'
},
{
target: protobuf.Type.prototype,
name: 'fromObject'
}
]

reflectionMethods.forEach(method => {
shimmer.wrap(method.target, method.name, original => {
return function wrappedReflectionMethod (...args) {
const result = original.apply(this, args)
if (result.nested) {
for (const type in result.nested) {
wrapSetup(result.nested[type])
}
}
if (result.$type) {
wrapSetup(result.$type)
}
return result
}
})
})
}

addHook({
name: 'protobufjs',
versions: ['>=6.0.0']
Expand All @@ -124,5 +142,15 @@ addHook({
}
})

shimmer.wrap(protobuf, 'Type', Original => {
return function wrappedTypeConstructor (...args) {
const typeInstance = new Original(...args)
wrapSetup(typeInstance)
return typeInstance
}
})

wrapReflection(protobuf)

return protobuf
})
4 changes: 3 additions & 1 deletion packages/datadog-plugin-protobufjs/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ class ProtobufjsPlugin extends Plugin {
handleSerializeStart ({ message }) {
const activeSpan = this.tracer.scope().active()
if (activeSpan) {
SchemaExtractor.attachSchemaOnSpan(message, activeSpan, SERIALIZATION, this.tracer._dataStreamsProcessor)
SchemaExtractor.attachSchemaOnSpan(
message.$type ?? message, activeSpan, SERIALIZATION, this.tracer._dataStreamsProcessor
)
}
}

Expand Down
Loading

0 comments on commit c5e65b2

Please sign in to comment.