Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Rakefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ end

desc "remove artifacts"
task :clean do
sh 'rm -rf build *egg*'
sh 'python setup.py clean'
sh 'rm -rf build *egg* *.whl dist'
end

desc "build the docs"
Expand Down
2 changes: 2 additions & 0 deletions circle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ dependencies:
- docker pull cassandra:3
- docker pull postgres:9.5
- docker pull redis:3.2
- docker pull mongo:3.2
database:
override:
- sudo service postgresql stop
Expand All @@ -28,6 +29,7 @@ database:
- docker run -d -p 9042:9042 cassandra:3
- docker run -d -p 5432:5432 -e POSTGRES_PASSWORD=test -e POSTGRES_USER=test -e POSTGRES_DB=test postgres:9.5
- docker run -d -p 6379:6379 redis:3.2
- docker run -d mongo:3.2
# Wait for Cassandra to be ready
- until nc -v -z localhost 9042 ; do sleep 0.2 ; done
# Wait for Postgres to be ready
Expand Down
7 changes: 7 additions & 0 deletions ddtrace/compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@
except ImportError:
import json

def iteritems(obj, **kwargs):
func = getattr(obj, "iteritems", None)
if not func:
func = obj.items
return func(**kwargs)


__all__ = [
'PY2',
Expand All @@ -37,4 +43,5 @@
'Queue',
'StringIO',
'json',
'iteritems'
]
23 changes: 23 additions & 0 deletions ddtrace/contrib/pymongo/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
"""
The pymongo integration works by wrapping pymongo's MongoClient to trace
network calls. Basic usage::

from pymongo import MongoClient
from ddtrace import tracer
from ddtrace.contrib.pymongo import trace_mongo_client

client = trace_mongo_client(
MongoClient(), tracer, "my-mongo-db")

db = client["test-db"]
db.teams.find({"name": "Toronto Maple Leafs"})
"""

from ..util import require_modules

required_modules = ['pymongo']

with require_modules(required_modules) as missing_modules:
if not missing_modules:
from .trace import trace_mongo_client
__all__ = ['trace_mongo_client']
55 changes: 55 additions & 0 deletions ddtrace/contrib/pymongo/parse.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@


class Command(object):
""" Command stores information about a pymongo network command, """

__slots__ = ['name', 'coll', 'tags', 'metrics', 'query']

def __init__(self, name, coll):
self.name = name
self.coll = coll
Copy link
Contributor

@talwai talwai Aug 1, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we just call it collection? it's not that long nvm looks like that's a pymongo thing

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea same same

self.tags = {}
self.metrics = {}
self.query = None


def parse_query(query):
""" Return a command parsed from the given mongo db query. """
cmd = Command("query", query.coll)
cmd.query = query.spec
return cmd

def parse_spec(spec):
""" Return a Command that has parsed the relevant detail for the given
pymongo SON spec.
"""

# the first element is the command and collection
items = list(spec.items())
if not items:
return None
name, coll = items[0]
cmd = Command(name, coll)

if 'ordered' in spec: # in insert and update
cmd.tags['mongodb.ordered'] = spec['ordered']

if cmd.name == 'insert':
if 'documents' in spec:
cmd.metrics['mongodb.documents'] = len(spec['documents'])

elif cmd.name == 'update':
updates = spec.get('updates')
if updates:
# FIXME[matt] is there ever more than one here?
cmd.query = updates[0].get("q")

elif cmd.name == 'delete':
dels = spec.get('deletes')
if dels:
# FIXME[matt] is there ever more than one here?
cmd.query = dels[0].get("q")

return cmd


197 changes: 197 additions & 0 deletions ddtrace/contrib/pymongo/trace.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@

# stdlib
import contextlib
import logging

# 3p
from pymongo import MongoClient
from pymongo.database import Database
from pymongo.collection import Collection
from wrapt import ObjectProxy

# project
from ...compat import iteritems
from ...ext import AppTypes
from ...ext import mongo as mongox
from ...ext import net as netx
from .parse import parse_spec, parse_query, Command


log = logging.getLogger(__name__)


def trace_mongo_client(client, tracer, service=mongox.TYPE):
tracer.set_service_info(
service=service,
app=mongox.TYPE,
app_type=AppTypes.db,
)
return TracedMongoClient(tracer, service, client)


class TracedSocket(ObjectProxy):

_tracer = None
_srv = None

def __init__(self, tracer, service, sock):
super(TracedSocket, self).__init__(sock)
self._tracer = tracer
self._srv = service

def command(self, dbname, spec, *args, **kwargs):
cmd = None
try:
cmd = parse_spec(spec)
except Exception:
log.exception("error parsing spec. skipping trace")

# skip tracing if we don't have a piece of data we need
if not dbname or not cmd:
return self.__wrapped__.command(dbname, spec, *args, **kwargs)

with self.__trace(dbname, cmd) as span:
return self.__wrapped__.command(dbname, spec, *args, **kwargs)

def write_command(self, *args, **kwargs):
# FIXME[matt] parse the db name and collection from the
# message.
coll = ""
db = ""
cmd = Command("insert_many", coll)
with self.__trace(db, cmd) as s:
s.resource = "insert_many"
result = self.__wrapped__.write_command(*args, **kwargs)
if result:
s.set_metric(mongox.ROWS, result.get("n", -1))
return result

def __trace(self, db, cmd):
s = self._tracer.trace(
"pymongo.cmd",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think mongodb.cmd is more consistent with postgres.query / cassandra.query / redis.command ?

span_type=mongox.TYPE,
service=self._srv,
)

if db: s.set_tag(mongox.DB, db)
if cmd:
s.set_tag(mongox.COLLECTION, cmd.coll)
s.set_tags(cmd.tags)
# s.set_metrics(cmd.metrics) FIXME[matt] uncomment whe rebase

s.resource = _resource_from_cmd(cmd)
if self.address:
_set_address_tags(s, self.address)
return s


class TracedServer(ObjectProxy):

_tracer = None
_srv = None

def __init__(self, tracer, service, topology):
super(TracedServer, self).__init__(topology)
self._tracer = tracer
self._srv = service

def send_message_with_response(self, operation, *args, **kwargs):

# if we're processing something unexpected, just skip tracing.
if getattr(operation, 'name', None) != 'find':
return self.__wrapped__.send_message_with_response(
operation,
*args,
**kwargs)

# trace the given query.
cmd = parse_query(operation)
with self._tracer.trace(
"pymongo.cmd",
span_type=mongox.TYPE,
service=self._srv) as span:

span.resource = _resource_from_cmd(cmd)
span.set_tag(mongox.DB, operation.db)
span.set_tag(mongox.COLLECTION, cmd.coll)
span.set_tags(cmd.tags)

result = self.__wrapped__.send_message_with_response(
operation,
*args,
**kwargs
)

if result and result.address:
_set_address_tags(span, result.address)
return result

@contextlib.contextmanager
def get_socket(self, *args, **kwargs):
with self.__wrapped__.get_socket(*args, **kwargs) as s:
if isinstance(s, TracedSocket):
yield s
else:
yield TracedSocket(self._tracer, self._srv, s)

class TracedTopology(ObjectProxy):

_tracer = None
_srv = None

def __init__(self, tracer, service, topology):
super(TracedTopology, self).__init__(topology)
self._tracer = tracer
self._srv = service

def select_server(self, *args, **kwargs):
s = self.__wrapped__.select_server(*args, **kwargs)
if isinstance(s, TracedServer):
return s
else:
return TracedServer(self._tracer, self._srv, s)


class TracedMongoClient(ObjectProxy):

_tracer = None
_srv = None

def __init__(self, tracer, service, client):
client._topology = TracedTopology(tracer, service, client._topology)
super(TracedMongoClient, self).__init__(client)
self._tracer = tracer
self._srv = service


def normalize_filter(f=None):
if f is None:
return {}
elif isinstance(f, list):
# normalize lists of filters
# e.g. {$or: [ { age: { $lt: 30 } }, { type: 1 } ]}
return [normalize_filter(s) for s in f]
else:
# normalize dicts of filters
# e.g. {$or: [ { age: { $lt: 30 } }, { type: 1 } ]})
out = {}
for k, v in iteritems(f):
if isinstance(v, list) or isinstance(v, dict):
# RECURSION ALERT: needs to move to the agent
out[k] = normalize_filter(v)
else:
out[k] = '?'
return out

def _set_address_tags(span, address):
# the address is only set after the cursor is done.
if address:
span.set_tag(netx.TARGET_HOST, address[0])
span.set_tag(netx.TARGET_PORT, address[1])

def _resource_from_cmd(cmd):
if cmd.query is not None:
nq = normalize_filter(cmd.query)
return "%s %s %s" % (cmd.name, cmd.coll, nq)
else:
return "%s %s" % (cmd.name, cmd.coll)
8 changes: 8 additions & 0 deletions ddtrace/ext/mongo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@

TYPE = 'mongodb'

COLLECTION = 'mongodb.collection'
DB = 'mongodb.db'
ROWS = 'mongodb.rows'
QUERY = 'mongodb.query'

4 changes: 2 additions & 2 deletions ddtrace/span.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,12 +206,12 @@ def pprint(self):
('type', self.span_type),
("start", self.start),
("end", "" if not self.duration else self.start + self.duration),
("duration", self.duration),
("duration", "%fs" % self.duration),
("error", self.error),
("tags", "")
]

lines.extend((" ", "%s:%s" % kv) for kv in self.meta.items())
lines.extend((" ", "%s:%s" % kv) for kv in sorted(self.meta.items()))
return "\n".join("%10s %s" % l for l in lines)

def __enter__(self):
Expand Down
5 changes: 5 additions & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ Postgres

.. automodule:: ddtrace.contrib.psycopg

Pymongo
~~~~~~~

.. automodule:: ddtrace.contrib.pymongo

Redis
~~~~~

Expand Down
5 changes: 4 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@
'elasticsearch',
'flask',
'psycopg2',
'pymongo',
'redis',
]



version = __version__
# Append a suffix to the version for dev builds
if os.environ.get('VERSION_SUFFIX'):
Expand All @@ -36,4 +36,7 @@
packages=find_packages(exclude=['tests*']),
tests_require=tests_require,
test_suite="nose.collector",
install_requires=[
"wrapt"
]
)
Empty file.
Loading