diff --git a/Rakefile b/Rakefile index c867f1de38f..ba1937e51f1 100644 --- a/Rakefile +++ b/Rakefile @@ -12,7 +12,8 @@ end desc "remove artifacts" task :clean do - sh 'rm -rf build *egg*' + sh 'python setup.py clean' + sh 'rm -rf build *egg* *.whl dist' end desc "build the docs" diff --git a/circle.yml b/circle.yml index 36f01b5be77..861e2be8717 100644 --- a/circle.yml +++ b/circle.yml @@ -18,6 +18,7 @@ dependencies: - docker pull cassandra:3 - docker pull postgres:9.5 - docker pull redis:3.2 + - docker pull mongo:3.2 database: override: - sudo service postgresql stop @@ -28,6 +29,7 @@ database: - docker run -d -p 9042:9042 cassandra:3 - docker run -d -p 5432:5432 -e POSTGRES_PASSWORD=test -e POSTGRES_USER=test -e POSTGRES_DB=test postgres:9.5 - docker run -d -p 6379:6379 redis:3.2 + - docker run -d mongo:3.2 # Wait for Cassandra to be ready - until nc -v -z localhost 9042 ; do sleep 0.2 ; done # Wait for Postgres to be ready diff --git a/ddtrace/compat.py b/ddtrace/compat.py index 9a30b96c666..f1552574905 100644 --- a/ddtrace/compat.py +++ b/ddtrace/compat.py @@ -28,6 +28,12 @@ except ImportError: import json +def iteritems(obj, **kwargs): + func = getattr(obj, "iteritems", None) + if not func: + func = obj.items + return func(**kwargs) + __all__ = [ 'PY2', @@ -37,4 +43,5 @@ 'Queue', 'StringIO', 'json', + 'iteritems' ] diff --git a/ddtrace/contrib/pymongo/__init__.py b/ddtrace/contrib/pymongo/__init__.py new file mode 100644 index 00000000000..cd28e8762d4 --- /dev/null +++ b/ddtrace/contrib/pymongo/__init__.py @@ -0,0 +1,23 @@ +""" +The pymongo integration works by wrapping pymongo's MongoClient to trace +network calls. Basic usage:: + + from pymongo import MongoClient + from ddtrace import tracer + from ddtrace.contrib.pymongo import trace_mongo_client + + client = trace_mongo_client( + MongoClient(), tracer, "my-mongo-db") + + db = client["test-db"] + db.teams.find({"name": "Toronto Maple Leafs"}) +""" + +from ..util import require_modules + +required_modules = ['pymongo'] + +with require_modules(required_modules) as missing_modules: + if not missing_modules: + from .trace import trace_mongo_client + __all__ = ['trace_mongo_client'] diff --git a/ddtrace/contrib/pymongo/parse.py b/ddtrace/contrib/pymongo/parse.py new file mode 100644 index 00000000000..46eb86fb7f7 --- /dev/null +++ b/ddtrace/contrib/pymongo/parse.py @@ -0,0 +1,55 @@ + + +class Command(object): + """ Command stores information about a pymongo network command, """ + + __slots__ = ['name', 'coll', 'tags', 'metrics', 'query'] + + def __init__(self, name, coll): + self.name = name + self.coll = coll + self.tags = {} + self.metrics = {} + self.query = None + + +def parse_query(query): + """ Return a command parsed from the given mongo db query. """ + cmd = Command("query", query.coll) + cmd.query = query.spec + return cmd + +def parse_spec(spec): + """ Return a Command that has parsed the relevant detail for the given + pymongo SON spec. + """ + + # the first element is the command and collection + items = list(spec.items()) + if not items: + return None + name, coll = items[0] + cmd = Command(name, coll) + + if 'ordered' in spec: # in insert and update + cmd.tags['mongodb.ordered'] = spec['ordered'] + + if cmd.name == 'insert': + if 'documents' in spec: + cmd.metrics['mongodb.documents'] = len(spec['documents']) + + elif cmd.name == 'update': + updates = spec.get('updates') + if updates: + # FIXME[matt] is there ever more than one here? + cmd.query = updates[0].get("q") + + elif cmd.name == 'delete': + dels = spec.get('deletes') + if dels: + # FIXME[matt] is there ever more than one here? + cmd.query = dels[0].get("q") + + return cmd + + diff --git a/ddtrace/contrib/pymongo/trace.py b/ddtrace/contrib/pymongo/trace.py new file mode 100644 index 00000000000..3d085927877 --- /dev/null +++ b/ddtrace/contrib/pymongo/trace.py @@ -0,0 +1,197 @@ + +# stdlib +import contextlib +import logging + +# 3p +from pymongo import MongoClient +from pymongo.database import Database +from pymongo.collection import Collection +from wrapt import ObjectProxy + +# project +from ...compat import iteritems +from ...ext import AppTypes +from ...ext import mongo as mongox +from ...ext import net as netx +from .parse import parse_spec, parse_query, Command + + +log = logging.getLogger(__name__) + + +def trace_mongo_client(client, tracer, service=mongox.TYPE): + tracer.set_service_info( + service=service, + app=mongox.TYPE, + app_type=AppTypes.db, + ) + return TracedMongoClient(tracer, service, client) + + +class TracedSocket(ObjectProxy): + + _tracer = None + _srv = None + + def __init__(self, tracer, service, sock): + super(TracedSocket, self).__init__(sock) + self._tracer = tracer + self._srv = service + + def command(self, dbname, spec, *args, **kwargs): + cmd = None + try: + cmd = parse_spec(spec) + except Exception: + log.exception("error parsing spec. skipping trace") + + # skip tracing if we don't have a piece of data we need + if not dbname or not cmd: + return self.__wrapped__.command(dbname, spec, *args, **kwargs) + + with self.__trace(dbname, cmd) as span: + return self.__wrapped__.command(dbname, spec, *args, **kwargs) + + def write_command(self, *args, **kwargs): + # FIXME[matt] parse the db name and collection from the + # message. + coll = "" + db = "" + cmd = Command("insert_many", coll) + with self.__trace(db, cmd) as s: + s.resource = "insert_many" + result = self.__wrapped__.write_command(*args, **kwargs) + if result: + s.set_metric(mongox.ROWS, result.get("n", -1)) + return result + + def __trace(self, db, cmd): + s = self._tracer.trace( + "pymongo.cmd", + span_type=mongox.TYPE, + service=self._srv, + ) + + if db: s.set_tag(mongox.DB, db) + if cmd: + s.set_tag(mongox.COLLECTION, cmd.coll) + s.set_tags(cmd.tags) + # s.set_metrics(cmd.metrics) FIXME[matt] uncomment whe rebase + + s.resource = _resource_from_cmd(cmd) + if self.address: + _set_address_tags(s, self.address) + return s + + +class TracedServer(ObjectProxy): + + _tracer = None + _srv = None + + def __init__(self, tracer, service, topology): + super(TracedServer, self).__init__(topology) + self._tracer = tracer + self._srv = service + + def send_message_with_response(self, operation, *args, **kwargs): + + # if we're processing something unexpected, just skip tracing. + if getattr(operation, 'name', None) != 'find': + return self.__wrapped__.send_message_with_response( + operation, + *args, + **kwargs) + + # trace the given query. + cmd = parse_query(operation) + with self._tracer.trace( + "pymongo.cmd", + span_type=mongox.TYPE, + service=self._srv) as span: + + span.resource = _resource_from_cmd(cmd) + span.set_tag(mongox.DB, operation.db) + span.set_tag(mongox.COLLECTION, cmd.coll) + span.set_tags(cmd.tags) + + result = self.__wrapped__.send_message_with_response( + operation, + *args, + **kwargs + ) + + if result and result.address: + _set_address_tags(span, result.address) + return result + + @contextlib.contextmanager + def get_socket(self, *args, **kwargs): + with self.__wrapped__.get_socket(*args, **kwargs) as s: + if isinstance(s, TracedSocket): + yield s + else: + yield TracedSocket(self._tracer, self._srv, s) + +class TracedTopology(ObjectProxy): + + _tracer = None + _srv = None + + def __init__(self, tracer, service, topology): + super(TracedTopology, self).__init__(topology) + self._tracer = tracer + self._srv = service + + def select_server(self, *args, **kwargs): + s = self.__wrapped__.select_server(*args, **kwargs) + if isinstance(s, TracedServer): + return s + else: + return TracedServer(self._tracer, self._srv, s) + + +class TracedMongoClient(ObjectProxy): + + _tracer = None + _srv = None + + def __init__(self, tracer, service, client): + client._topology = TracedTopology(tracer, service, client._topology) + super(TracedMongoClient, self).__init__(client) + self._tracer = tracer + self._srv = service + + +def normalize_filter(f=None): + if f is None: + return {} + elif isinstance(f, list): + # normalize lists of filters + # e.g. {$or: [ { age: { $lt: 30 } }, { type: 1 } ]} + return [normalize_filter(s) for s in f] + else: + # normalize dicts of filters + # e.g. {$or: [ { age: { $lt: 30 } }, { type: 1 } ]}) + out = {} + for k, v in iteritems(f): + if isinstance(v, list) or isinstance(v, dict): + # RECURSION ALERT: needs to move to the agent + out[k] = normalize_filter(v) + else: + out[k] = '?' + return out + +def _set_address_tags(span, address): + # the address is only set after the cursor is done. + if address: + span.set_tag(netx.TARGET_HOST, address[0]) + span.set_tag(netx.TARGET_PORT, address[1]) + +def _resource_from_cmd(cmd): + if cmd.query is not None: + nq = normalize_filter(cmd.query) + return "%s %s %s" % (cmd.name, cmd.coll, nq) + else: + return "%s %s" % (cmd.name, cmd.coll) diff --git a/ddtrace/ext/mongo.py b/ddtrace/ext/mongo.py new file mode 100644 index 00000000000..88291544bc6 --- /dev/null +++ b/ddtrace/ext/mongo.py @@ -0,0 +1,8 @@ + +TYPE = 'mongodb' + +COLLECTION = 'mongodb.collection' +DB = 'mongodb.db' +ROWS = 'mongodb.rows' +QUERY = 'mongodb.query' + diff --git a/ddtrace/span.py b/ddtrace/span.py index 1ebe8ff5717..b90b7aaa2bc 100644 --- a/ddtrace/span.py +++ b/ddtrace/span.py @@ -206,12 +206,12 @@ def pprint(self): ('type', self.span_type), ("start", self.start), ("end", "" if not self.duration else self.start + self.duration), - ("duration", self.duration), + ("duration", "%fs" % self.duration), ("error", self.error), ("tags", "") ] - lines.extend((" ", "%s:%s" % kv) for kv in self.meta.items()) + lines.extend((" ", "%s:%s" % kv) for kv in sorted(self.meta.items())) return "\n".join("%10s %s" % l for l in lines) def __enter__(self): diff --git a/docs/index.rst b/docs/index.rst index 2c335aca99a..9cf6f7eb002 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -119,6 +119,11 @@ Postgres .. automodule:: ddtrace.contrib.psycopg +Pymongo +~~~~~~~ + +.. automodule:: ddtrace.contrib.pymongo + Redis ~~~~~ diff --git a/setup.py b/setup.py index cb028e861f3..96dc987d63a 100644 --- a/setup.py +++ b/setup.py @@ -12,11 +12,11 @@ 'elasticsearch', 'flask', 'psycopg2', + 'pymongo', 'redis', ] - version = __version__ # Append a suffix to the version for dev builds if os.environ.get('VERSION_SUFFIX'): @@ -36,4 +36,7 @@ packages=find_packages(exclude=['tests*']), tests_require=tests_require, test_suite="nose.collector", + install_requires=[ + "wrapt" + ] ) diff --git a/tests/contrib/pymongo/__init__.py b/tests/contrib/pymongo/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/contrib/pymongo/test.py b/tests/contrib/pymongo/test.py new file mode 100644 index 00000000000..528943dfccc --- /dev/null +++ b/tests/contrib/pymongo/test.py @@ -0,0 +1,210 @@ + +# stdlib +import time + +# 3p +from nose.tools import eq_ +from pymongo import MongoClient + +# project +from ddtrace.contrib.pymongo.trace import trace_mongo_client, normalize_filter +from ddtrace import Tracer + + +from ...test_tracer import DummyWriter + + +def test_normalize_filter(): + # ensure we can properly normalize queries FIXME[matt] move to the agent + cases = [ + (None, {}), + ( + {"team":"leafs"}, + {"team": "?"}, + ), + ( + {"age": {"$gt" : 20}}, + {"age": {"$gt" : "?"}}, + ), + ( + { + "status": "A", + "$or": [ { "age": { "$lt": 30 } }, { "type": 1 } ] + }, + { + "status": "?", + "$or": [ { "age": { "$lt": "?" } }, { "type": "?" } ] + } + ) + ] + for i, expected in cases: + out = normalize_filter(i) + eq_(expected, out) + + +def test_update(): + # ensure we trace deletes + tracer, client = _get_tracer_and_client("songdb") + writer = tracer.writer + db = client["testdb"] + db.drop_collection("songs") + input_songs = [ + {'name' : 'Powderfinger', 'artist':'Neil'}, + {'name' : 'Harvest', 'artist':'Neil'}, + {'name' : 'Suzanne', 'artist':'Leonard'}, + {'name' : 'Partisan', 'artist':'Leonard'}, + ] + db.songs.insert_many(input_songs) + + result = db.songs.update_many( + {"artist":"Neil"}, + {"$set": {"artist":"Shakey"}}, + ) + + eq_(result.matched_count, 2) + eq_(result.modified_count, 2) + + # ensure all is traced. + spans = writer.pop() + assert spans, spans + for span in spans: + # ensure all the of the common metadata is set + eq_(span.service, "songdb") + eq_(span.span_type, "mongodb") + if span.resource != "insert_many": + eq_(span.meta.get("mongodb.collection"), "songs") + eq_(span.meta.get("mongodb.db"), "testdb") + assert span.meta.get("out.host") + assert span.meta.get("out.port") + + expected_resources = set([ + "drop songs", + "update songs {'artist': '?'}", + "insert_many", + ]) + + eq_(expected_resources, {s.resource for s in spans}) + + +def test_delete(): + # ensure we trace deletes + tracer, client = _get_tracer_and_client("songdb") + writer = tracer.writer + db = client["testdb"] + db.drop_collection("songs") + input_songs = [ + {'name' : 'Powderfinger', 'artist':'Neil'}, + {'name' : 'Harvest', 'artist':'Neil'}, + {'name' : 'Suzanne', 'artist':'Leonard'}, + {'name' : 'Partisan', 'artist':'Leonard'}, + ] + db.songs.insert_many(input_songs) + + # test delete one + af = {'artist':'Neil'} + eq_(db.songs.count(af), 2) + db.songs.delete_one(af) + eq_(db.songs.count(af), 1) + + # test delete many + af = {'artist':'Leonard'} + eq_(db.songs.count(af), 2) + db.songs.delete_many(af) + eq_(db.songs.count(af), 0) + + # ensure all is traced. + spans = writer.pop() + assert spans, spans + for span in spans: + # ensure all the of the common metadata is set + eq_(span.service, "songdb") + eq_(span.span_type, "mongodb") + if span.resource != "insert_many": + eq_(span.meta.get("mongodb.collection"), "songs") + eq_(span.meta.get("mongodb.db"), "testdb") + assert span.meta.get("out.host") + assert span.meta.get("out.port") + + expected_resources = set([ + "drop songs", + "count songs", + "delete songs {'artist': '?'}", + "insert_many", + ]) + + eq_(expected_resources, {s.resource for s in spans}) + + +def test_insert_find(): + tracer, client = _get_tracer_and_client("pokemongodb") + writer = tracer.writer + + start = time.time() + db = client["testdb"] + db.drop_collection("teams") + teams = [ + { + 'name' : 'Toronto Maple Leafs', + 'established' : 1917, + }, + { + 'name' : 'Montreal Canadiens', + 'established' : 1910, + }, + { + 'name' : 'New York Rangers', + 'established' : 1926, + } + ] + + # create some data (exercising both ways of inserting) + + db.teams.insert_one(teams[0]) + db.teams.insert_many(teams[1:]) + + # query some data + cursor = db.teams.find() + count = 0 + for row in cursor: + count += 1 + eq_(count, len(teams)) + + queried = list(db.teams.find({"name": "Toronto Maple Leafs"})) + end = time.time() + eq_(len(queried), 1) + eq_(queried[0]["name"], "Toronto Maple Leafs") + eq_(queried[0]["established"], 1917) + + spans = writer.pop() + for span in spans: + # ensure all the of the common metadata is set + eq_(span.service, "pokemongodb") + eq_(span.span_type, "mongodb") + if span.resource != "insert_many": + eq_(span.meta.get("mongodb.collection"), "teams") + eq_(span.meta.get("mongodb.db"), "testdb") + assert span.meta.get("out.host"), span.pprint() + assert span.meta.get("out.port"), span.pprint() + assert span.start > start + assert span.duration < end - start + + expected_resources = set([ + "drop teams", + "insert teams", + "insert_many", + "query teams {}", + "query teams {'name': '?'}", + ]) + + eq_(expected_resources, {s.resource for s in spans}) + +def _get_tracer_and_client(service): + """ Return a tuple of (tracer, mongo_client) for testing. """ + tracer = Tracer() + writer = DummyWriter() + tracer.writer = writer + original_client = MongoClient() + client = trace_mongo_client(original_client, tracer, service=service) + return tracer, client + + diff --git a/tests/contrib/pymongo/test_spec.py b/tests/contrib/pymongo/test_spec.py new file mode 100644 index 00000000000..b977bf63ac8 --- /dev/null +++ b/tests/contrib/pymongo/test_spec.py @@ -0,0 +1,50 @@ +""" +tests for parsing specs. +""" + +from bson.son import SON +from nose.tools import eq_ + +from ddtrace.contrib.pymongo.parse import parse_spec + +def test_empty(): + cmd = parse_spec(SON([])) + assert cmd is None + +def test_create(): + cmd = parse_spec(SON([("create", "foo")])) + eq_(cmd.name, "create") + eq_(cmd.coll, "foo") + eq_(cmd.tags, {}) + eq_(cmd.metrics ,{}) + +def test_insert(): + spec = SON([ + ('insert', 'bla'), + ('ordered', True), + ('documents', ['a', 'b']), + ]) + cmd = parse_spec(spec) + eq_(cmd.name, "insert") + eq_(cmd.coll, "bla") + eq_(cmd.tags, {'mongodb.ordered':True}) + eq_(cmd.metrics, {'mongodb.documents':2}) + +def test_update(): + spec = SON([ + ('update', u'songs'), + ('ordered', True), + ('updates', [ + SON([ + ('q', {'artist': 'Neil'}), + ('u', {'$set': {'artist': 'Shakey'}}), + ('multi', True), + ('upsert', False) + ]) + ]) + ]) + cmd = parse_spec(spec) + eq_(cmd.name, "update") + eq_(cmd.coll, "songs") + eq_(cmd.query, {'artist':'Neil'}) +