Skip to content
26 changes: 23 additions & 3 deletions ddtrace/_hooks.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import collections
from copy import deepcopy
from typing import Any
from typing import Callable
from typing import DefaultDict
from typing import Optional
from typing import Set

from .internal.logger import get_logger
Expand Down Expand Up @@ -29,7 +32,12 @@ def __deepcopy__(self, memodict=None):
hooks._hooks = deepcopy(self._hooks, memodict)
return hooks

def register(self, hook, func=None):
def register(
self,
hook, # type: Any
func=None, # type: Optional[Callable]
):
# type: (...) -> Optional[Callable[..., Any]]
"""
Function used to register a hook for the provided name.

Expand Down Expand Up @@ -63,14 +71,20 @@ def wrapper(func):

return wrapper
self._hooks[hook].add(func)
return None

# Provide shorthand `on` method for `register`
# >>> @config.falcon.hooks.on('request')
# def on_request(span, request, response):
# pass
on = register

def deregister(self, hook, func):
def deregister(
self,
hook, # type: Any
func, # type: Callable
):
# type: (...) -> None
"""
Function to deregister a function from a hook it was registered under

Expand All @@ -93,7 +107,13 @@ def on_request(span, request, response):
except KeyError:
pass

def emit(self, hook, *args, **kwargs):
def emit(
self,
hook, # type: Any
*args, # type: Any
**kwargs # type: Any
):
# type: (...) -> None
"""
Function used to call registered hook functions.

Expand Down
17 changes: 16 additions & 1 deletion ddtrace/_worker.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import threading
from typing import Optional

from .internal.logger import get_logger

Expand All @@ -19,7 +20,13 @@ class PeriodicWorkerThread(object):

_DEFAULT_INTERVAL = 1.0

def __init__(self, interval=_DEFAULT_INTERVAL, name=None, daemon=True):
def __init__(
self,
interval=_DEFAULT_INTERVAL, # type: float
name=None, # type: Optional[str]
daemon=True, # type: bool
):
# type: (...) -> None
"""Create a new worker thread that runs a function periodically.

:param interval: The interval in seconds to wait between calls to `run_periodic`.
Expand All @@ -34,37 +41,45 @@ def __init__(self, interval=_DEFAULT_INTERVAL, name=None, daemon=True):
self.interval = interval

def start(self):
# type: () -> None
"""Start the periodic worker."""
_LOG.debug("Starting %s thread", self._thread.name)
self._thread.start()
self.started = True

def stop(self):
# type: () -> None
"""Stop the worker."""
_LOG.debug("Stopping %s thread", self._thread.name)
self._stop.set()

def is_alive(self):
# type: () -> bool
return self._thread.is_alive()

def join(self, timeout=None):
# type: (Optional[float]) -> None
return self._thread.join(timeout)

def _target(self):
# type: () -> None
while not self._stop.wait(self.interval):
self.run_periodic()
self._on_shutdown()

@staticmethod
def run_periodic():
# type: () -> None
"""Method executed every interval."""
pass

def _on_shutdown(self):
# type: () -> None
_LOG.debug("Shutting down %s thread", self._thread.name)
self.on_shutdown()

@staticmethod
def on_shutdown():
# type: () -> None
"""Method ran on worker shutdown."""
pass
19 changes: 19 additions & 0 deletions ddtrace/encoding.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,18 @@
import json
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import TYPE_CHECKING

from .internal._encoding import MsgpackEncoder
from .internal.logger import get_logger


if TYPE_CHECKING:
from .span import Span


log = get_logger(__name__)


Expand All @@ -13,6 +22,7 @@ class _EncoderBase(object):
"""

def encode_traces(self, traces):
# type: (List[List[Span]]) -> str
"""
Encodes a list of traces, expecting a list of items where each items
is a list of spans. Before dump the string in a serialized format all
Expand All @@ -25,6 +35,7 @@ def encode_traces(self, traces):
return self.encode(normalized_traces)

def encode_trace(self, trace):
# type: (List[Span]) -> str
"""
Encodes a trace, expecting a list of spans. Before dump the string in a
serialized format all traces are normalized, calling the ``to_dict()`` method.
Expand Down Expand Up @@ -53,10 +64,12 @@ class JSONEncoder(_EncoderBase):

@staticmethod
def encode(obj):
# type: (Any) -> str
return json.dumps(obj)

@staticmethod
def join_encoded(objs):
# type: (List[str]) -> str
"""Join a list of encoded objects together as a json array"""
return "[" + ",".join(objs) + "]"

Expand All @@ -69,19 +82,23 @@ class JSONEncoderV2(JSONEncoder):
content_type = "application/json"

def encode_traces(self, traces):
# type: (List[List[Span]]) -> str
normalized_traces = [[JSONEncoderV2._convert_span(span) for span in trace] for trace in traces]
return self.encode({"traces": normalized_traces})

def encode_trace(self, trace):
# type: (List[Span]) -> str
return self.encode([JSONEncoderV2._convert_span(span) for span in trace])

@staticmethod
def join_encoded(objs):
# type: (List[str]) -> str
"""Join a list of encoded objects together as a json array"""
return '{"traces":[' + ",".join(objs) + "]}"

@staticmethod
def _convert_span(span):
# type: (Span) -> Dict[str, Any]
sp = span.to_dict()
sp["trace_id"] = JSONEncoderV2._encode_id_to_hex(sp.get("trace_id"))
sp["parent_id"] = JSONEncoderV2._encode_id_to_hex(sp.get("parent_id"))
Expand All @@ -90,12 +107,14 @@ def _convert_span(span):

@staticmethod
def _encode_id_to_hex(dd_id):
# type: (Optional[int]) -> str
if not dd_id:
return "0000000000000000"
return "%0.16X" % int(dd_id)

@staticmethod
def _decode_id_to_hex(hex_id):
# type: (Optional[str]) -> int
if not hex_id:
return 0
return int(hex_id, 16)
Expand Down
3 changes: 3 additions & 0 deletions ddtrace/internal/_encoding.pyi
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from typing import Any
from typing import List
from typing import Union

class MsgpackEncoder(object):
content_type: str
def _decode(self, data: Union[str, bytes]) -> Any: ...
def encode_trace(self, trace: List[Any]) -> bytes: ...
def join_encoded(self, objs: List[bytes]) -> bytes: ...
23 changes: 21 additions & 2 deletions ddtrace/payload.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
from typing import List
from typing import Optional
from typing import TYPE_CHECKING
from typing import Union

from .encoding import Encoder


if TYPE_CHECKING:
from .span import Span


class PayloadFull(Exception):
"""The payload is full."""

Expand All @@ -24,7 +33,12 @@ class Payload(object):
# 5 MB should be a good average efficient size
DEFAULT_MAX_PAYLOAD_SIZE = 5 * 1000000

def __init__(self, encoder=None, max_payload_size=DEFAULT_MAX_PAYLOAD_SIZE):
def __init__(
self,
encoder=None, # type: Optional[Encoder]
max_payload_size=DEFAULT_MAX_PAYLOAD_SIZE, # type: int
):
# type: (...) -> None
"""
Constructor for Payload

Expand All @@ -35,10 +49,11 @@ def __init__(self, encoder=None, max_payload_size=DEFAULT_MAX_PAYLOAD_SIZE):
"""
self.max_payload_size = max_payload_size
self.encoder = encoder or Encoder()
self.traces = []
self.traces = [] # type: List[bytes]
self.size = 0

def add_trace(self, trace):
# type: (Optional[List[Span]]) -> None
"""
Encode and append a trace to this payload

Expand All @@ -58,6 +73,7 @@ def add_trace(self, trace):

@property
def length(self):
# type: () -> int
"""
Get the number of traces in this payload

Expand All @@ -68,6 +84,7 @@ def length(self):

@property
def empty(self):
# type: () -> bool
"""
Whether this payload is empty or not

Expand All @@ -77,6 +94,7 @@ def empty(self):
return self.length == 0

def get_payload(self):
# type: () -> Union[str, bytes]
"""
Get the fully encoded payload

Expand All @@ -87,6 +105,7 @@ def get_payload(self):
return self.encoder.join_encoded(self.traces)

def __repr__(self):
# type: () -> str
"""Get the string representation of this payload"""
return "{0}(length={1}, size={2} B, max_payload_size={3} B)".format(
self.__class__.__name__, self.length, self.size, self.max_payload_size
Expand Down
Loading