Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Rakefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ end

desc "remove artifacts"
task :clean do
sh 'rm -rf build *egg*'
sh 'python setup.py clean'
sh 'rm -rf build *egg* *.whl dist'
end

desc "build the docs"
Expand Down
154 changes: 154 additions & 0 deletions ddtrace/contrib/pymongo/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@

# 3p
from pymongo import MongoClient
from pymongo.database import Database
from pymongo.collection import Collection
from wrapt import ObjectProxy

# project
from ...ext import AppTypes
from ...ext import mongo as mongox
from ...ext import net as netx


def trace_mongo_client(client, tracer, service="mongodb"):
tracer.set_service_info(
service=service,
app=mongox.TYPE,
app_type=AppTypes.db,
)
return TracedMongoClient(tracer, service, client)


class TracedMongoCollection(ObjectProxy):

_tracer = None
_srv = None
_collection_name = None

def __init__(self, tracer, service, database_name, collection):
super(TracedMongoCollection, self).__init__(collection)
self._tracer = tracer
self._srv = service
self._tags = {
mongox.COLLECTION: collection.name,
mongox.DB: database_name,
}
self._collection_name = collection.name

def find(self, filter=None, *args, **kwargs):
with self.__trace() as span:
span.set_tags(self._tags)
nf = '{}'
if filter:
nf = normalize_filter(filter)
span.set_tag(mongox.QUERY, nf)
span.resource = _create_resource("query", self._collection_name, nf)
cursor = self.__wrapped__.find(filter=filter, *args, **kwargs)
_set_cursor_tags(span, cursor)
return cursor

def insert_one(self, *args, **kwargs):
with self.__trace() as span:
span.resource = _create_resource("insert_one", self._collection_name)
span.set_tags(self._tags)
return self.__wrapped__.insert(*args, **kwargs)

def insert_many(self, *args, **kwargs):
with self.__trace() as span:
span.resource = _create_resource("insert_many", self._collection_name)
span.set_tags(self._tags)
span.set_tag(mongox.ROWS, len(args[0]))
return self.__wrapped__.insert_many(*args, **kwargs)

def delete_one(self, filter):
with self.__trace() as span:
nf = '{}'
if filter:
nf = normalize_filter(filter)
span.resource = _create_resource("delete_one", self._collection_name, nf)
span.set_tags(self._tags)
return self.__wrapped__.delete_one(filter)

def delete_many(self, filter):
with self.__trace() as span:
nf = '{}'
if filter:
nf = normalize_filter(filter)
span.resource = _create_resource("delete_many", self._collection_name, nf)
span.set_tags(self._tags)
return self.__wrapped__.delete_many(filter)

def __trace(self):
return self._tracer.trace("pymongo.cmd", span_type=mongox.TYPE, service=self._srv)



class TracedMongoDatabase(ObjectProxy):

_tracer = None
_srv = None
_name = None

def __init__(self, tracer, service, db):
super(TracedMongoDatabase, self).__init__(db)
self._tracer = tracer
self._srv = service
self._name = db.name

def __getattr__(self, name):
c = getattr(self.__wrapped__, name)
if isinstance(c, Collection) and not isinstance(c, TracedMongoCollection):
return TracedMongoCollection(self._tracer, self._srv, self._name, c)
else:
return c

def __getitem__(self, name):
c = self.__wrapped__[name]
return TracedMongoCollection(self._tracer, self._srv, self._name, c)

class TracedMongoClient(ObjectProxy):

_tracer = None
_srv = None

def __init__(self, tracer, service, client):
super(TracedMongoClient, self).__init__(client)
self._tracer = tracer
self._srv = service

def __getitem__(self, name):
db = self.__wrapped__[name]
return TracedMongoDatabase(self._tracer, self._srv, db)

def normalize_filter(f=None):
if f is None:
return {}
elif isinstance(f, list):
# normalize lists of filters (e.g. {$or: [ { age: { $lt: 30 } }, { type: 1 } ]})
return [normalize_filter(s) for s in f]
else:
# normalize dicts of filters (e.g. {$or: [ { age: { $lt: 30 } }, { type: 1 } ]})
out = {}
for k, v in f.iteritems():
if isinstance(v, list) or isinstance(v, dict):
# RECURSION ALERT: needs to move to the agent
out[k] = normalize_filter(v)
else:
out[k] = '?'
return out

def _set_cursor_tags(span, cursor):
# the address is only set after the cursor is done.
if cursor and cursor.address:
span.set_tag(netx.TARGET_HOST, cursor.address[0])
span.set_tag(netx.TARGET_PORT, cursor.address[1])

def _create_resource(op, collection=None, filter=None):
if op and collection and filter:
return "%s %s %s" % (op, collection, filter)
elif op and collection:
return "%s %s" % (op, collection)
else:
return op

7 changes: 7 additions & 0 deletions ddtrace/ext/mongo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@

TYPE = 'mongodb'

COLLECTION = 'mongodb.collection'
DB = 'mongodb.db'
ROWS = 'mongodb.rows'
QUERY = 'mongodb.query'
2 changes: 1 addition & 1 deletion ddtrace/span.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ def pprint(self):
('type', self.span_type),
("start", self.start),
("end", "" if not self.duration else self.start + self.duration),
("duration", self.duration),
("duration", "%fs" % self.duration),
("error", self.error),
("tags", "")
]
Expand Down
3 changes: 3 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,7 @@
packages=find_packages(exclude=['tests*']),
tests_require=tests_require,
test_suite="nose.collector",
install_requires=[
"wrapt"
]
)
Empty file.
157 changes: 157 additions & 0 deletions tests/contrib/pymongo/test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@

# stdlib
import time

# 3p
from nose.tools import eq_
from pymongo import MongoClient

# project
from ddtrace.contrib.pymongo import trace_mongo_client, normalize_filter
from ddtrace import Tracer

from ...test_tracer import DummyWriter


def test_normalize_filter():
# ensure we can properly normalize queries FIXME[matt] move to the agent
cases = [
(None, {}),
(
{"team":"leafs"},
{"team": "?"},
),
(
{"age": {"$gt" : 20}},
{"age": {"$gt" : "?"}},
),
(
{
"status": "A",
"$or": [ { "age": { "$lt": 30 } }, { "type": 1 } ]
},
{
"status": "?",
"$or": [ { "age": { "$lt": "?" } }, { "type": "?" } ]
}
)
]
for i, expected in cases:
out = normalize_filter(i)
eq_(expected, out)


def test_delete():
# ensure we trace deletes
tracer, client = _get_tracer_and_client("songdb")
writer = tracer.writer
db = client["testdb"]
db.drop_collection("songs")
input_songs = [
{'name' : 'Powderfinger', 'artist':'Neil'},
{'name' : 'Harvest', 'artist':'Neil'},
{'name' : 'Suzanne', 'artist':'Leonard'},
{'name' : 'Partisan', 'artist':'Leonard'},
]
db.songs.insert_many(input_songs)

# test delete one
af = {'artist':'Neil'}
eq_(db.songs.count(af), 2)
db.songs.delete_one(af)
eq_(db.songs.count(af), 1)

# test delete many
af = {'artist':'Leonard'}
eq_(db.songs.count(af), 2)
db.songs.delete_many(af)
eq_(db.songs.count(af), 0)

# ensure all is traced.
spans = writer.pop()
assert spans, spans
for span in spans:
# ensure all the of the common metadata is set
eq_(span.service, "songdb")
eq_(span.span_type, "mongodb")
eq_(span.meta.get("mongodb.collection"), "songs")
eq_(span.meta.get("mongodb.db"), "testdb")

expected_resources = set([
"insert_many songs",
"delete_one songs {'artist': '?'}",
"delete_many songs {'artist': '?'}",
])

eq_(expected_resources, {s.resource for s in spans})


def test_insert_find():
tracer, client = _get_tracer_and_client("pokemongodb")
writer = tracer.writer

db = client["testdb"]
db.drop_collection("teams")
teams = [
{
'name' : 'Toronto Maple Leafs',
'established' : 1917,
},
{
'name' : 'Montreal Canadiens',
'established' : 1910,
},
{
'name' : 'New York Rangers',
'established' : 1926,
}
]

# create some data (exercising both ways of inserting)
start = time.time()

db.teams.insert_one(teams[0])
db.teams.insert_many(teams[1:])

# query some data
cursor = db.teams.find()
count = 0
for row in cursor:
count += 1
eq_(count, len(teams))

queried = list(db.teams.find({"name": "Toronto Maple Leafs"}))
end = time.time()
eq_(len(queried), 1)
eq_(queried[0]["name"], "Toronto Maple Leafs")
eq_(queried[0]["established"], 1917)

spans = writer.pop()
for span in spans:
# ensure all the of the common metadata is set
eq_(span.service, "pokemongodb")
eq_(span.span_type, "mongodb")
eq_(span.meta.get("mongodb.collection"), "teams")
eq_(span.meta.get("mongodb.db"), "testdb")
assert span.start > start
assert span.duration < end - start

expected_resources = set([
"insert_many teams",
"insert_one teams",
"query teams {}",
"query teams {'name': '?'}",
])

eq_(expected_resources, {s.resource for s in spans})

def _get_tracer_and_client(service):
""" Return a tuple of (tracer, mongo_client) for testing. """
tracer = Tracer()
writer = DummyWriter()
tracer.writer = writer
original_client = MongoClient()
client = trace_mongo_client(original_client, tracer, service=service)
return tracer, client