class DefaultSampler(object):
    """Sampler that keeps every trace (no client-side sampling)."""

    def sample(self, span):
        """Flag `span` as sampled.

        The span is mutated in place; nothing is returned.
        """
        setattr(span, "sampled", True)
""" @@ -20,6 +33,11 @@ def __init__(self, sample_rate): elif sample_rate > 1: sample_rate = 1 + self.set_sample_rate(sample_rate) + + log.info("initialized RateSampler, sample %s%% of traces", 100 * sample_rate) + + def set_sample_rate(self, sample_rate): self.sample_rate = sample_rate self.sampling_id_threshold = sample_rate * MAX_TRACE_ID @@ -27,3 +45,55 @@ def sample(self, span): span.sampled = span.trace_id <= self.sampling_id_threshold # `weight` is an attribute applied to all spans to help scaling related statistics span.weight = 1 / (self.sample_rate or 1) + + +class ThroughputSampler(object): + """Sampler applying a strict limit over the trace volume + + Stop tracing once reached more than `tps` traces per second. + Computation is based on a circular buffer over the last `BUFFER_DURATION` with a `BUFFER_SIZE` size. + """ + + # Reasonable values + BUCKETS_PER_S = 10 + BUFFER_DURATION = 2 + BUFFER_SIZE = BUCKETS_PER_S * BUFFER_DURATION + + def __init__(self, tps): + self.buffer_limit = tps * self.BUFFER_DURATION + + # Circular buffer counting sampled traces over the last `BUFFER_DURATION` + self.counter = 0 + self.counter_buffer = array.array('L', [0] * self.BUFFER_SIZE) + self._buffer_lock = threading.Lock() + # Last time we sampled a trace, multiplied by `BUCKETS_PER_S` + self.last_track_time = 0 + + log.info("initialized ThroughputSampler, sample up to %s traces/s", tps) + + def sample(self, span): + now = int(span.start * self.BUCKETS_PER_S) + + with self._buffer_lock: + last_track_time = self.last_track_time + if now > last_track_time: + self.last_track_time = now + self.expire_buckets(last_track_time, now) + + span.sampled = self.counter < self.buffer_limit + + if span.sampled: + self.counter += 1 + self.counter_buffer[self.key_from_time(now)] += 1 + + return span + + def key_from_time(self, t): + return t % self.BUFFER_SIZE + + def expire_buckets(self, start, end): + period = min(self.BUFFER_SIZE, (end - start)) + for i in range(period): + key = 
self.key_from_time(start + i + 1) + self.counter -= self.counter_buffer[key] + self.counter_buffer[key] = 0 diff --git a/ddtrace/tracer.py b/ddtrace/tracer.py index 4327f331134..ab58694e6e8 100644 --- a/ddtrace/tracer.py +++ b/ddtrace/tracer.py @@ -2,7 +2,7 @@ import threading from .buffer import ThreadLocalSpanBuffer -from .sampler import RateSampler +from .sampler import DefaultSampler from .span import Span from .writer import AgentWriter @@ -12,7 +12,7 @@ class Tracer(object): - def __init__(self, enabled=True, writer=None, span_buffer=None, sample_rate=1): + def __init__(self, enabled=True, writer=None, span_buffer=None, sampler=None): """ Create a new tracer object. @@ -20,12 +20,13 @@ def __init__(self, enabled=True, writer=None, span_buffer=None, sample_rate=1): writer: an instance of Writer span_buffer: a span buffer instance. used to store inflight traces. by default, will use thread local storage - sample_rate: Pre-sampling rate. + sampler: Trace sampler. """ self.enabled = enabled self._writer = writer or AgentWriter() self._span_buffer = span_buffer or ThreadLocalSpanBuffer() + self.sampler = sampler or DefaultSampler() # a list of buffered spans. self._spans_lock = threading.Lock() @@ -34,8 +35,6 @@ def __init__(self, enabled=True, writer=None, span_buffer=None, sample_rate=1): # a collection of registered services by name. self._services = {} - self.sampler = RateSampler(sample_rate) - # A hook for local debugging. shouldn't be needed or used # in production. 
class RateSamplerTest(unittest.TestCase):
    """Checks the sampling decisions made by a RateSampler plugged into a Tracer."""

    def test_random_sequence(self):
        """Run three traces with a fixed random seed and check which ones are sampled.

        The expected sampled / not sampled outcomes are tied to the seed value:
        changing the seed, or the number or order of `tracer.trace` calls,
        invalidates the assertions below.
        """
        writer = DummyWriter()
        sampler = RateSampler(0.5)
        tracer = Tracer(writer=writer, sampler=sampler)

        # Set the seed so that the choice of sampled traces is deterministic, then write tests accordingly
        random.seed(4012)

        # First trace, sampled
        with tracer.trace("foo") as s:
            assert s.sampled
            # With a rate of 0.5, the weight is 1 / 0.5
            assert s.weight == 2
        assert writer.pop()

        # Second trace, not sampled
        with tracer.trace("figh") as s:
            assert not s.sampled
            s2 = tracer.trace("what")
            assert not s2.sampled
            s2.finish()
            with tracer.trace("ever") as s3:
                assert not s3.sampled
                s4 = tracer.trace("!")
                assert not s4.sampled
                s4.finish()
        # Nothing from an unsampled trace may reach the writer
        spans = writer.pop()
        assert not spans, spans

        # Third trace, sampled
        with tracer.trace("ters") as s:
            assert s.sampled
        assert writer.pop()


class ThroughputSamplerTest(unittest.TestCase):
    """Test suite for the ThroughputSampler"""

    def test_simple_limit(self):
        """Check the cap over a single window, with `time.time` replaced by a fake clock."""
        writer = DummyWriter()
        tracer = Tracer(writer=writer)

        with patch_time() as fake_time:
            tps = 5
            tracer.sampler = ThroughputSampler(tps)

            for _ in range(10):
                s = tracer.trace("whatever")
                s.finish()
            traces = writer.pop()

            got = len(traces)
            # All 10 traces are expected, none dropped
            expected = 10

            assert got == expected, \
                "Wrong number of traces sampled, %s instead of %s" % (got, expected)

            # Wait enough to reset
            fake_time.sleep(tracer.sampler.BUFFER_DURATION + 1)

            for _ in range(100):
                s = tracer.trace("whatever")
                s.finish()
            traces = writer.pop()

            got = len(traces)
            # Far more traces than the budget were sent, only a full window of them is expected
            expected = tps * tracer.sampler.BUFFER_DURATION

            assert got == expected, \
                "Wrong number of traces sampled, %s instead of %s" % (got, expected)

    def test_long_run(self):
        """Check the sampled volume over long simulated runs, for several limits and loads."""
        writer = DummyWriter()
        tracer = Tracer(writer=writer)

        # Test a big matrix of combinations
        # Ensure to have total_time >> BUFFER_DURATION to reduce edge effects
        for tps in [10, 23, 15, 31]:
            for (traces_per_s, total_time) in [(80, 23), (75, 66), (1000, 77)]:

                with patch_time() as fake_time:
                    # We do tons of operations in this test, do not let the time slowly shift
                    fake_time.set_delta(0)

                    tracer.sampler = ThroughputSampler(tps)

                    # Simulate `total_time` seconds, each receiving `traces_per_s` traces
                    for _ in range(total_time):
                        for _ in range(traces_per_s):
                            s = tracer.trace("whatever")
                            s.finish()
                        fake_time.sleep(1)

                traces = writer.pop()
                # The current sampler implementation can introduce an error of up to
                # `tps * BUFFER_DURATION` traces at initialization (since the sampler starts empty)
                got = len(traces)
                expected = tps * total_time
                error_delta = tps * tracer.sampler.BUFFER_DURATION

                assert abs(got - expected) <= error_delta, \
                    "Wrong number of traces sampled, %s instead of %s (error_delta > %s)" % (got, expected, error_delta)


    def test_concurrency(self):
        """Check the sampled volume when 100 threads trace at the same time.

        Unlike the other tests of this class, the clock is not patched: this
        test runs against the real `time.time` for `total_time` seconds.
        """
        # Test that the sampler works well when used in different threads
        # NOTE(review): DummyWriter is documented as "not thread-safe" in tests/test_tracer.py,
        # yet a single instance is shared by every thread here -- confirm this is fine
        writer = DummyWriter()
        tracer = Tracer(writer=writer)

        total_time = 10
        concurrency = 100
        end_time = time.time() + total_time

        # Let's sample to a multiple of BUFFER_SIZE, so that we can pre-populate the buffer
        # NOTE(review): nothing in this test pre-populates the buffer, the rationale above
        # looks stale -- confirm
        tps = 15 * ThroughputSampler.BUFFER_SIZE
        tracer.sampler = ThroughputSampler(tps)

        threads = []

        def run_simulation(tracer, end_time):
            """Open and close traces in a loop until `end_time` is reached."""
            while time.time() < end_time:
                s = tracer.trace("whatever")
                s.finish()
                # ~1000 traces per s per thread
                time.sleep(0.001)

        # Create every thread first, start them afterwards
        for i in range(concurrency):
            thread = threading.Thread(target=run_simulation, args=(tracer, end_time))
            threads.append(thread)

        for t in threads:
            t.start()

        for t in threads:
            t.join()

        traces = writer.pop()

        got = len(traces)
        expected = tps * total_time
        # Same tolerance as in `test_long_run`
        error_delta = tps * ThroughputSampler.BUFFER_DURATION

        assert abs(got - expected) <= error_delta, \
            "Wrong number of traces sampled, %s instead of %s (error_delta > %s)" % (got, expected, error_delta)
""" diff --git a/tests/util.py b/tests/util.py new file mode 100644 index 00000000000..a8300796782 --- /dev/null +++ b/tests/util.py @@ -0,0 +1,31 @@ +import mock + +class FakeTime(object): + """"Allow to mock time.time for tests + + `time.time` returns a defined `current_time` instead. + Any `time.time` call also increase the `current_time` of `delta` seconds. + """ + + def __init__(self): + # Sane defaults + self._current_time = 1e9 + self._delta = 0.001 + + def __call__(self): + self._current_time = self._current_time + self._delta + return self._current_time + + def set_epoch(self, epoch): + self._current_time = epoch + + def set_delta(self, delta): + self._delta = delta + + def sleep(self, second): + self._current_time += second + + +def patch_time(): + """Patch time.time with FakeTime""" + return mock.patch('time.time', new_callable=FakeTime)