Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(tracing): implement protobufjs DSM schema support #4701

Merged
merged 28 commits into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
eb12e15
add dsm schema tracking
wconti27 Sep 13, 2024
87c5e5a
add protobuf schemas support for DSM
wconti27 Sep 18, 2024
4b187c4
Merge branch 'master' into conti/support-protobuf-schemas
wconti27 Sep 19, 2024
6b8b0b3
Merge branch 'master' into conti/support-protobuf-schemas
wconti27 Sep 19, 2024
a9cf1fc
revert fetch change
wconti27 Sep 19, 2024
38c1419
fix tests
wconti27 Sep 19, 2024
214a679
fix test
wconti27 Sep 19, 2024
c5e65b2
more testing
wconti27 Sep 19, 2024
be68bd5
clean up tests
wconti27 Sep 19, 2024
8186fbd
ensure schemas only added if dsm is enabled
wconti27 Sep 19, 2024
7f56ce0
more tests
wconti27 Sep 23, 2024
40a2fe8
increase schema sampler cache
wconti27 Sep 23, 2024
4fa3e81
fix sampling
wconti27 Sep 24, 2024
79e4945
fix sampling
wconti27 Sep 24, 2024
6028c36
Merge branch 'master' into conti/support-protobuf-schemas
wconti27 Sep 24, 2024
af12edb
fix protobuf google-api
wconti27 Sep 24, 2024
48fa7ff
using tracing channel and abstract schemaPlugin
wconti27 Sep 25, 2024
2748d35
Merge branch 'master' into conti/support-protobuf-schemas
wconti27 Sep 26, 2024
237c316
only use specific trace channels
wconti27 Sep 27, 2024
6b01e44
Merge branch 'master' into conti/support-protobuf-schemas
wconti27 Sep 27, 2024
1ec86fe
revert tracingChannel change
wconti27 Sep 27, 2024
1f42aab
fix channels
wconti27 Sep 27, 2024
da15fc1
fix proto lint
wconti27 Oct 3, 2024
b294d0e
more cleanup
wconti27 Oct 3, 2024
2adeb42
fix protobuf tests and use version range for testing
wconti27 Oct 3, 2024
af18e69
add necessary docs
wconti27 Oct 3, 2024
55d74fe
fix promise code
wconti27 Oct 4, 2024
af77ce5
Merge branch 'master' into conti/support-protobuf-schemas
wconti27 Oct 4, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .github/workflows/plugins.yml
Original file line number Diff line number Diff line change
Expand Up @@ -802,6 +802,15 @@ jobs:
- uses: actions/checkout@v4
- uses: ./.github/actions/plugins/test

protobufjs:
runs-on: ubuntu-latest
env:
PLUGINS: protobufjs
DD_DATA_STREAMS_ENABLED: true
steps:
- uses: actions/checkout@v4
- uses: ./.github/actions/plugins/test-and-upstream

q:
runs-on: ubuntu-latest
env:
Expand Down
1 change: 1 addition & 0 deletions packages/datadog-instrumentations/src/helpers/hooks.js
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ module.exports = {
playwright: () => require('../playwright'),
'promise-js': () => require('../promise-js'),
promise: () => require('../promise'),
protobufjs: () => require('../protobufjs'),
q: () => require('../q'),
qs: () => require('../qs'),
redis: () => require('../redis'),
Expand Down
156 changes: 156 additions & 0 deletions packages/datadog-instrumentations/src/protobufjs.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
const shimmer = require('../../datadog-shimmer')
const { channel, addHook, AsyncResource } = require('./helpers/instrument')

// Diagnostics channels published around protobufjs encode/decode calls so the
// protobufjs plugin can attach DSM schema information to the active span.
const startSerializeCh = channel('datadog:protobuf:serialize:start')
const finishSerializeCh = channel('datadog:protobuf:serialize:finish')
const startDeserializeCh = channel('datadog:protobuf:deserialize:start')
const finishDeserializeCh = channel('datadog:protobuf:deserialize:finish')
wconti27 marked this conversation as resolved.
Show resolved Hide resolved

// Instrument `encode` on a generated message class so serialization events
// are published on the serialize channels.
function wrapSerialization (messageClass) {
  if (!messageClass?.encode) return

  wrapOperation(messageClass, 'encode', {
    startChPublish (obj, args) { startSerializeCh.publish({ message: obj }) },
    finishChPublish (result) { finishSerializeCh.publish({ buffer: result }) },
    startCh: startSerializeCh,
    finishCh: finishSerializeCh
  })
}

// Instrument `decode` on a generated message class so deserialization events
// are published on the deserialize channels.
function wrapDeserialization (messageClass) {
  if (!messageClass?.decode) return

  wrapOperation(messageClass, 'decode', {
    startChPublish (obj, args) { startDeserializeCh.publish({ buffer: args[0] }) },
    finishChPublish (result) { finishDeserializeCh.publish({ message: result }) },
    startCh: startDeserializeCh,
    finishCh: finishDeserializeCh
  })
}

// Shimmer-wrap a single (de)serialization method, publishing start/finish
// events around the call inside a dedicated AsyncResource scope.
function wrapOperation (messageClass, operationName, channels) {
  shimmer.wrap(messageClass, operationName, original => {
    return function wrappedMethod (...args) {
      const { startCh, finishCh, startChPublish, finishChPublish } = channels

      // Fast path: no subscribers means no bookkeeping at all.
      if (!startCh.hasSubscribers && !finishCh.hasSubscribers) {
        return original.apply(this, args)
      }

      const asyncResource = new AsyncResource('bound-anonymous-fn')

      asyncResource.runInAsyncScope(() => startChPublish(this, args))

      try {
        const result = original.apply(this, args)

        asyncResource.runInAsyncScope(() => finishChPublish(result))

        return result
      } catch (err) {
        // NOTE(review): on error the finish event is published with the raw
        // `args` array instead of a result — confirm subscribers tolerate it.
        asyncResource.runInAsyncScope(() => finishChPublish(args))
        throw err
      }
    }
  })
}

// Wrap a message class's setup() hook. protobufjs (re)generates the
// encode/decode codecs inside setup(), so the serialization wrappers must be
// re-applied every time it runs.
function wrapSetup (messageClass) {
  if (!messageClass?.setup) return

  shimmer.wrap(messageClass, 'setup', original => {
    return function wrappedSetup (...args) {
      const result = original.apply(this, args)

      wrapSerialization(messageClass)
      wrapDeserialization(messageClass)

      return result
    }
  })
}

// Walk a protobufjs namespace tree depth-first and instrument every message
// type; a node with a `decode` method is a message type.
function wrapProtobufClasses (root) {
  if (!root) return

  if (root.decode) wrapSetup(root)

  const children = root.nestedArray
  if (!children) return

  children.forEach((child) => wrapProtobufClasses(child))
}

// Instrument the reflection entry points (Root.fromJSON / Type#fromObject) so
// message types built dynamically at runtime are wrapped as well.
function wrapReflection (protobuf) {
  const targets = [
    [protobuf.Root, 'fromJSON'],
    [protobuf.Type.prototype, 'fromObject']
  ]

  for (const [target, name] of targets) {
    shimmer.wrap(target, name, original => {
      return function wrappedReflectionMethod (...args) {
        const result = original.apply(this, args)

        if (result.nested) {
          for (const typeName of Object.keys(result.nested)) {
            wrapSetup(result.nested[typeName])
          }
        }
        if (result.$type) {
          wrapSetup(result.$type)
        }

        return result
      }
    })
  }
}

addHook({
  name: 'protobufjs',
  versions: ['>=6.0.0']
}, protobuf => {
  shimmer.wrap(protobuf.Root.prototype, 'load', original => {
    return function wrappedLoad (...args) {
      const result = original.apply(this, args)
      // Root#load returns a Promise only when no callback is supplied; in the
      // callback form it does not return a thenable, so guard before chaining
      // to avoid throwing from the wrapper.
      if (result && typeof result.then === 'function') {
        // The derived promise is intentionally detached; swallow its rejection
        // so a failed load never surfaces as an unhandled rejection here —
        // callers still observe the failure on `result` itself.
        result.then(root => {
          wrapProtobufClasses(root)
        }, () => {})
      }
      return result
    }
  })

  shimmer.wrap(protobuf.Root.prototype, 'loadSync', original => {
    return function wrappedLoadSync (...args) {
      const root = original.apply(this, args)
      wrapProtobufClasses(root)
      return root
    }
  })

  // Wrap the Type constructor so programmatically created message types are
  // instrumented too.
  shimmer.wrap(protobuf, 'Type', Original => {
    return function wrappedTypeConstructor (...args) {
      const typeInstance = new Original(...args)
      wrapSetup(typeInstance)
      return typeInstance
    }
  })

  wrapReflection(protobuf)

  return protobuf
})
36 changes: 36 additions & 0 deletions packages/datadog-plugin-protobufjs/src/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
const Plugin = require('../../dd-trace/src/plugins/plugin')
const SchemaExtractor = require('./schema_iterator')

const SERIALIZATION = 'serialization'
const DESERIALIZATION = 'deserialization'

// Subscribes to the protobufjs instrumentation channels and attaches DSM
// schema information to the active span on encode/decode.
class ProtobufjsPlugin extends Plugin {
  static get id () {
    return 'protobufjs'
  }

  constructor (...args) {
    super(...args)

    this.addSub('datadog:protobuf:serialize:start', this.handleSerializeStart.bind(this))
    this.addSub('datadog:protobuf:deserialize:finish', this.handleDeserializeFinish.bind(this))
  }

  // Attach the schema of the message being encoded to the active span, if any.
  handleSerializeStart ({ message }) {
    const activeSpan = this.tracer.scope().active()
    if (activeSpan) {
      // encode() may be invoked with a message instance ($type points at the
      // descriptor) or with the type itself; fall back to the payload.
      SchemaExtractor.attachSchemaOnSpan(
        message?.$type ?? message, activeSpan, SERIALIZATION, this.tracer._dataStreamsProcessor
      )
    }
  }

  // Attach the schema of the decoded message to the active span, if any.
  handleDeserializeFinish ({ message }) {
    const activeSpan = this.tracer.scope().active()
    if (activeSpan) {
      // Optional chaining guards error-path finish events whose payload is not
      // a decoded message; attachSchemaOnSpan ignores nullish descriptors.
      SchemaExtractor.attachSchemaOnSpan(
        message?.$type, activeSpan, DESERIALIZATION, this.tracer._dataStreamsProcessor
      )
    }
  }
}

module.exports = ProtobufjsPlugin
180 changes: 180 additions & 0 deletions packages/datadog-plugin-protobufjs/src/schema_iterator.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
const PROTOBUF = 'protobuf'
const {
SCHEMA_DEFINITION,
SCHEMA_ID,
SCHEMA_NAME,
SCHEMA_OPERATION,
SCHEMA_WEIGHT,
SCHEMA_TYPE
} = require('../../dd-trace/src/constants')
const {
SchemaBuilder
} = require('../../dd-trace/src/datastreams/schemas/schema_builder')

// Walks protobufjs reflection descriptors and feeds an OpenAPI-like schema
// into the DSM SchemaBuilder, then tags spans with the sampled schema.
class SchemaExtractor {
  /**
   * @param {Object} schema - protobufjs reflection object (Type/Namespace).
   */
  constructor (schema) {
    this.schema = schema
  }

  // Map a protobuf field type name to an OpenAPI-style [type, format] pair.
  // Unknown types intentionally fall back to ['string', null].
  static getTypeAndFormat (type) {
    const typeFormatMapping = {
      int32: ['integer', 'int32'],
      int64: ['integer', 'int64'],
      uint32: ['integer', 'uint32'],
      uint64: ['integer', 'uint64'],
      sint32: ['integer', 'sint32'],
      sint64: ['integer', 'sint64'],
      fixed32: ['integer', 'fixed32'],
      fixed64: ['integer', 'fixed64'],
      sfixed32: ['integer', 'sfixed32'],
      sfixed64: ['integer', 'sfixed64'],
      float: ['number', 'float'],
      double: ['number', 'double'],
      bool: ['boolean', null],
      string: ['string', null],
      bytes: ['string', 'byte'],
      Enum: ['enum', null],
      Type: ['type', null],
      map: ['map', null],
      repeated: ['array', null]
    }

    return typeFormatMapping[type] || ['string', null]
  }

  // Add one field of `schemaName` to the builder. Returns false when
  // extraction should stop (depth/size limits hit, or a nested failure).
  static extractProperty (field, schemaName, fieldName, builder, depth) {
    let array = false
    let description
    let ref
    let enumValues

    // Message/enum fields carry a resolvedType; scalar fields only have `type`.
    const resolvedType = field.resolvedType ? field.resolvedType.constructor.name : field.type

    const isRepeatedField = field.rule === 'repeated'

    let typeFormat = this.getTypeAndFormat(isRepeatedField ? 'repeated' : resolvedType)
    let type = typeFormat[0]
    let format = typeFormat[1]

    if (type === 'array') {
      // Repeated fields become arrays; re-resolve the element type.
      array = true
      typeFormat = this.getTypeAndFormat(resolvedType)
      type = typeFormat[0]
      format = typeFormat[1]
    }

    if (type === 'type') {
      // Nested message type: reference it by name and extract it recursively.
      format = null
      ref = `#/components/schemas/${removeLeadingPeriod(field.resolvedType.fullName)}`
      // keep a reference to the original builder iterator since when we recurse this reference will get reset to
      // deeper schemas
      const originalSchemaExtractor = builder.iterator
      if (!this.extractSchema(field.resolvedType, builder, depth, this)) {
        return false
      }
      type = 'object'
      builder.iterator = originalSchemaExtractor
    } else if (type === 'enum') {
      // Read names straight off the reflection table so enums that are
      // non-contiguous or do not start at 0 are not silently truncated.
      enumValues = Object.values(field.resolvedType.valuesById)
    }
    return builder.addProperty(schemaName, fieldName, array, type, description, ref, format, enumValues)
  }

  // Extract `schema` into the builder. When `extractor` is provided this is a
  // nested message reached from extractProperty; otherwise it is the root.
  static extractSchema (schema, builder, depth, extractor) {
    depth += 1
    const schemaName = removeLeadingPeriod(schema.resolvedType ? schema.resolvedType.fullName : schema.fullName)
    if (extractor) {
      // Nested schema: build it with its own iterator, then merge its
      // component schemas back into the parent builder's schema set.
      const nestedSchemaExtractor = new SchemaExtractor(schema)
      builder.iterator = nestedSchemaExtractor
      const nestedSchema = SchemaBuilder.getSchema(schemaName, nestedSchemaExtractor, builder)
      for (const nestedSubSchemaName of Object.keys(nestedSchema.components.schemas)) {
        builder.schema.components.schemas[nestedSubSchemaName] = nestedSchema.components.schemas[nestedSubSchemaName]
      }
      return true
    } else {
      if (!builder.shouldExtractSchema(schemaName, depth)) {
        return false
      }
      try {
        for (const field of schema.fieldsArray) {
          if (!this.extractProperty(field, schemaName, field.name, builder, depth)) {
            return false
          }
        }
      } catch (error) {
        // Best effort: a malformed descriptor must never break user code.
        return false
      }
      return true
    }
  }

  // Build (or fetch from the processor's cache) the schema for a descriptor.
  static extractSchemas (descriptor, dataStreamsProcessor) {
    const schemaName = removeLeadingPeriod(
      descriptor.resolvedType ? descriptor.resolvedType.fullName : descriptor.fullName
    )
    return dataStreamsProcessor.getSchema(schemaName, new SchemaExtractor(descriptor))
  }

  // SchemaBuilder callback: walk this extractor's descriptor into the builder.
  iterateOverSchema (builder) {
    this.constructor.extractSchema(this.schema, builder, 0)
  }

  // Tag `span` with schema metadata for the given operation, subject to the
  // processor's sampling decisions.
  static attachSchemaOnSpan (descriptor, span, operation, dataStreamsProcessor) {
    if (!descriptor || !span) {
      return
    }

    if (span.context()._tags[SCHEMA_TYPE] && operation === 'serialization') {
      // we have already added a schema to this span, this call is an encode of nested schema types
      return
    }

    span.setTag(SCHEMA_TYPE, PROTOBUF)
    span.setTag(SCHEMA_NAME, removeLeadingPeriod(descriptor.fullName))
    span.setTag(SCHEMA_OPERATION, operation)

    if (!dataStreamsProcessor.canSampleSchema(operation)) {
      return
    }

    const weight = dataStreamsProcessor.trySampleSchema(operation)
    if (weight === 0) {
      return
    }

    const schemaData = SchemaBuilder.getSchemaDefinition(
      this.extractSchemas(descriptor, dataStreamsProcessor)
    )

    span.setTag(SCHEMA_DEFINITION, schemaData.definition)
    span.setTag(SCHEMA_WEIGHT, weight)
    span.setTag(SCHEMA_ID, schemaData.id)
  }
}

// Strip a single leading '.' — protobufjs roots full names with one
// (e.g. '.my.pkg.Message' -> 'my.pkg.Message').
function removeLeadingPeriod (str) {
  return str.startsWith('.') ? str.slice(1) : str
}

module.exports = SchemaExtractor
Loading
Loading