Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ddtrace/contrib/tornado/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@
REQUEST_CONTEXT_KEY = 'datadog_context'
REQUEST_SPAN_KEY = '__datadog_request_span'
FUTURE_SPAN_KEY = '__datadog_future_span'
PARENT_SPAN_KEY = '__datadog_parent_span'
51 changes: 38 additions & 13 deletions ddtrace/contrib/tornado/decorators.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import sys
import ddtrace

from .constants import FUTURE_SPAN_KEY
from functools import wraps

from .constants import FUTURE_SPAN_KEY, PARENT_SPAN_KEY
from .stack_context import TracerStackContext


Expand Down Expand Up @@ -31,35 +33,58 @@ def _finish_span(future):
def _run_on_executor(run_on_executor, _, params, kw_params):
"""
Wrap the `run_on_executor` function so that when a function is executed
in a different thread, we use an intermediate function (and a closure)
to keep track of the current `parent_span` if any. The real function
is then executed in a `TracerStackContext` so that `tracer.trace()`
in a different thread, we pass the current parent Span to the intermediate
function that will execute the original call. The original function
is then executed within a `TracerStackContext` so that `tracer.trace()`
can be used as usual, both with empty or existing `Context`.
"""
# we expect exceptions if the `run_on_executor` is called with
# wrong arguments; in this case we should not do anything
def pass_context_decorator(fn):
"""
Decorator that is used to wrap the original `run_on_executor_decorator`
so that we can pass the current active context before the `executor.submit`
is called. In this case we get the `parent_span` reference and we pass
that reference to `fn` reference. Because in the outer wrapper we replace
the original call with our `traced_wrapper`, we're sure that the `parent_span`
is passed to our intermediate function and not to the user function.
"""
@wraps(fn)
def wrapper(*args, **kwargs):
# from the current context, retrive the active span
current_ctx = ddtrace.tracer.get_call_context()
parent_span = getattr(current_ctx, '_current_span', None)

# pass the current parent span in the Future call so that
# it can be retrieved later
kwargs.update({PARENT_SPAN_KEY: parent_span})
return fn(*args, **kwargs)
return wrapper

# we expect exceptions here if the `run_on_executor` is called with
# wrong arguments; in that case we should not do anything because
# the exception must not be handled here
decorator = run_on_executor(*params, **kw_params)

# closure that holds the parent_span of this logical execution; the
# Context object may not exist and/or may be empty
current_ctx = ddtrace.tracer.get_call_context()
parent_span = getattr(current_ctx, '_current_span', None)

# `run_on_executor` can be called with arguments; in this case we
# return an inner decorator that holds the real function that should be
# called
if decorator.__module__ == 'tornado.concurrent':
def run_on_executor_decorator(deco_fn):
def inner_traced_wrapper(*args, **kwargs):
# retrieve the parent span from the function kwargs
parent_span = kwargs.pop(PARENT_SPAN_KEY, None)
return run_executor_stack_context(deco_fn, args, kwargs, parent_span)
return decorator(inner_traced_wrapper)
return pass_context_decorator(decorator(inner_traced_wrapper))

return run_on_executor_decorator

# return our wrapper function that executes an intermediate function to
# trace the real execution in a different thread
def traced_wrapper(*args, **kwargs):
# retrieve the parent span from the function kwargs
parent_span = kwargs.pop(PARENT_SPAN_KEY, None)
return run_executor_stack_context(params[0], args, kwargs, parent_span)
return run_on_executor(traced_wrapper)

return pass_context_decorator(run_on_executor(traced_wrapper))


def run_executor_stack_context(fn, args, kwargs, parent_span):
Expand Down
3 changes: 2 additions & 1 deletion tests/contrib/tornado/test_safety.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from ddtrace.contrib.tornado import patch, unpatch

from . import web
from .web.app import CustomDefaultHandler
from .utils import TornadoTestCase


Expand Down Expand Up @@ -115,7 +116,7 @@ class TestCustomAppSafety(TornadoTestCase):
"""
def get_settings(self):
return {
'default_handler_class': web.CustomDefaultHandler,
'default_handler_class': CustomDefaultHandler,
'default_handler_args': dict(status_code=400),
}

Expand Down
4 changes: 2 additions & 2 deletions tests/contrib/tornado/test_tornado_web.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from nose.tools import eq_, ok_

from . import web
from .web.app import CustomDefaultHandler
from .utils import TornadoTestCase


Expand Down Expand Up @@ -235,7 +235,7 @@ class TestCustomTornadoWeb(TornadoTestCase):
"""
def get_settings(self):
return {
'default_handler_class': web.CustomDefaultHandler,
'default_handler_class': CustomDefaultHandler,
'default_handler_args': dict(status_code=400),
}

Expand Down
9 changes: 6 additions & 3 deletions tests/contrib/tornado/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

from ddtrace.contrib.tornado import patch, unpatch

from . import web
from .web import app
from .web.compat import reload_module
from ...test_tracer import get_dummy_tracer


Expand All @@ -13,15 +14,16 @@ class TornadoTestCase(AsyncHTTPTestCase):
in the `self.tracer` attribute.
"""
def get_app(self):
# patch Tornado
# patch Tornado and reload module app
patch()
reload_module(app)
# create a dummy tracer and a Tornado web application
self.tracer = get_dummy_tracer()
settings = self.get_settings()
trace_settings = settings.get('datadog_trace', {})
settings['datadog_trace'] = trace_settings
trace_settings['tracer'] = self.tracer
self.app = web.make_app(settings=settings)
self.app = app.make_app(settings=settings)
return self.app

def get_settings(self):
Expand All @@ -32,3 +34,4 @@ def tearDown(self):
super(TornadoTestCase, self).tearDown()
# unpatch Tornado
unpatch()
reload_module(app)
1 change: 0 additions & 1 deletion tests/contrib/tornado/web/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +0,0 @@
from .app import make_app, CustomDefaultHandler # noqa
85 changes: 43 additions & 42 deletions tests/contrib/tornado/web/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,63 +147,65 @@ class ExecutorHandler(tornado.web.RequestHandler):
# used automatically by the @run_on_executor decorator
executor = ThreadPoolExecutor(max_workers=3)

@tornado.concurrent.run_on_executor
def outer_executor(self):
tracer = self.settings['datadog_trace']['tracer']
with tracer.trace('tornado.executor.with'):
time.sleep(0.05)

@tornado.gen.coroutine
def get(self):
@tornado.concurrent.run_on_executor
def outer_executor(self):
# wait before creating a trace so that we're sure
# the `tornado.executor.with` span has the right
# parent
tracer = self.settings['datadog_trace']['tracer']

with tracer.trace('tornado.executor.with'):
time.sleep(0.05)

yield outer_executor(self)
yield self.outer_executor()
self.write('OK')


class ExecutorDelayedHandler(tornado.web.RequestHandler):
# used automatically by the @run_on_executor decorator
executor = ThreadPoolExecutor(max_workers=3)

@tornado.concurrent.run_on_executor
def outer_executor(self):
# waiting here means expecting that the `get()` flushes
# the request trace
time.sleep(0.01)
tracer = self.settings['datadog_trace']['tracer']
with tracer.trace('tornado.executor.with'):
time.sleep(0.05)

@tornado.gen.coroutine
def get(self):
@tornado.concurrent.run_on_executor
def outer_executor(self):
# waiting here means expecting that the `get()` flushes
# the request trace
time.sleep(0.01)
tracer = self.settings['datadog_trace']['tracer']

with tracer.trace('tornado.executor.with'):
time.sleep(0.05)

# we don't yield here but we expect that the outer_executor
# has the right parent; tests that use this handler, must
# yield sleep() to wait thread execution
outer_executor(self)
self.outer_executor()
self.write('OK')


class ExecutorCustomHandler(tornado.web.RequestHandler):
# not used automatically, a kwarg is required
custom_thread_pool = ThreadPoolExecutor(max_workers=3)
try:
class ExecutorCustomHandler(tornado.web.RequestHandler):
# not used automatically, a kwarg is required
custom_thread_pool = ThreadPoolExecutor(max_workers=3)

@tornado.gen.coroutine
def get(self):
@tornado.concurrent.run_on_executor(executor='custom_thread_pool')
def outer_executor(self):
# wait before creating a trace so that we're sure
# the `tornado.executor.with` span has the right
# parent
tracer = self.settings['datadog_trace']['tracer']

with tracer.trace('tornado.executor.with'):
time.sleep(0.05)

yield outer_executor(self)
self.write('OK')
@tornado.gen.coroutine
def get(self):
yield self.outer_executor()
self.write('OK')
except TypeError:
# the class definition fails because Tornado 4.0 and 4.1 don't support
# `run_on_executor` with params. Because it's just this case, we can
# use a try-except block, but if we have another case we may move
# these endpoints outside the module and use a compatibility system
class ExecutorCustomHandler(tornado.web.RequestHandler):
pass


class ExecutorCustomArgsHandler(tornado.web.RequestHandler):
Expand All @@ -222,20 +224,19 @@ class ExecutorExceptionHandler(tornado.web.RequestHandler):
# used automatically by the @run_on_executor decorator
executor = ThreadPoolExecutor(max_workers=3)

@tornado.concurrent.run_on_executor
def outer_executor(self):
# wait before creating a trace so that we're sure
# the `tornado.executor.with` span has the right
# parent
time.sleep(0.05)
tracer = self.settings['datadog_trace']['tracer']
with tracer.trace('tornado.executor.with'):
raise Exception('Ouch!')

@tornado.gen.coroutine
def get(self):
@tornado.concurrent.run_on_executor
def outer_executor(self):
# wait before creating a trace so that we're sure
# the `tornado.executor.with` span has the right
# parent
time.sleep(0.05)
tracer = self.settings['datadog_trace']['tracer']

with tracer.trace('tornado.executor.with'):
raise Exception('Ouch!')

yield outer_executor(self)
yield self.outer_executor()
self.write('OK')


Expand Down
5 changes: 5 additions & 0 deletions tests/contrib/tornado/web/compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@
from tornado.ioloop import IOLoop


try:
from importlib import reload as reload_module
except ImportError:
reload_module = reload

try:
from concurrent.futures import ThreadPoolExecutor
except ImportError:
Expand Down