Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions circle.yml
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
machine:
services:
- docker
environment:
CASS_DRIVER_NO_EXTENSIONS: 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this make it fast? if so ...<3

post:
- pyenv global 2.7.9 3.4.2
dependencies:
pre:
- sudo service postgresql stop
- sudo service redis-server stop
# install of mock fails otherwise
- pip3.4 install -U setuptools
test:
override:
- docker run -d -p 9200:9200 elasticsearch:2.3
Expand Down
76 changes: 73 additions & 3 deletions ddtrace/sampler.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,28 @@
"""Samplers manage the client-side trace sampling

Any `sampled = False` trace won't be written, and can be ignored by the instrumentation.
"""

import logging
import array
import threading

from .span import MAX_TRACE_ID

log = logging.getLogger(__name__)


class DefaultSampler(object):
    """Sampler used when no other one is configured: every trace is kept."""

    def sample(self, span):
        """Mark `span` as sampled, unconditionally."""
        span.sampled = True


class RateSampler(object):
"""RateSampler manages the client-side trace sampling based on a rate
"""Sampler based on a rate

Keep (100 * sample_rate)% of the traces.
Any `sampled = False` trace won't be written, and can be ignored by the instrumentation.
Keep (100 * `sample_rate`)% of the traces.
It samples randomly, its main purpose is to reduce the instrumentation footprint.
"""

Expand All @@ -20,10 +33,67 @@ def __init__(self, sample_rate):
elif sample_rate > 1:
sample_rate = 1

self.set_sample_rate(sample_rate)

log.info("initialized RateSampler, sample %s%% of traces", 100 * sample_rate)

def set_sample_rate(self, sample_rate):
    """Store `sample_rate` and derive the trace id threshold that goes with it.

    The threshold is `sample_rate * MAX_TRACE_ID`.
    """
    self.sample_rate = sample_rate
    threshold = sample_rate * MAX_TRACE_ID
    self.sampling_id_threshold = threshold

def sample(self, span):
    """Decide whether `span` is kept and attach its weight.

    A span is sampled when its `trace_id` does not exceed `sampling_id_threshold`.
    """
    keep = span.trace_id <= self.sampling_id_threshold
    span.sampled = keep
    # `weight` is set on every span, sampled or not, to help scaling related statistics.
    # A falsy rate falls back to 1, which avoids a division by zero.
    divisor = self.sample_rate or 1
    span.weight = 1 / divisor


class ThroughputSampler(object):
    """Sampler enforcing a hard ceiling on the volume of sampled traces.

    Traces stop being sampled once more than `tps` traces per second were kept.
    The volume is tracked with a circular buffer of `BUFFER_SIZE` buckets covering
    the last `BUFFER_DURATION`.
    """

    # Reasonable values
    BUCKETS_PER_S = 10
    BUFFER_DURATION = 2
    BUFFER_SIZE = BUCKETS_PER_S * BUFFER_DURATION

    def __init__(self, tps):
        """Build a sampler keeping at most `tps * BUFFER_DURATION` traces per buffer window."""
        # Highest number of sampled traces tolerated across the whole buffer.
        self.buffer_limit = tps * self.BUFFER_DURATION

        # Guards every read/write of the counters below.
        self._buffer_lock = threading.Lock()
        # One slot per bucket, plus `counter` holding the running total of all slots.
        self.counter_buffer = array.array('L', [0] * self.BUFFER_SIZE)
        self.counter = 0
        # Bucket index (time multiplied by `BUCKETS_PER_S`) of the most recent span seen.
        self.last_track_time = 0

        log.info("initialized ThroughputSampler, sample up to %s traces/s", tps)

    def sample(self, span):
        """Set `span.sampled` depending on the remaining budget, then return `span`."""
        # NOTE(review): assumes `span.start` is a timestamp in seconds - confirm against Span.
        bucket_time = int(span.start * self.BUCKETS_PER_S)

        with self._buffer_lock:
            previous = self.last_track_time
            if bucket_time > previous:
                # Time moved forward: forget the buckets that slid out of the window.
                self.last_track_time = bucket_time
                self.expire_buckets(previous, bucket_time)

            keep = self.counter < self.buffer_limit
            span.sampled = keep

            if keep:
                slot = self.key_from_time(bucket_time)
                self.counter += 1
                self.counter_buffer[slot] += 1

        return span

    def key_from_time(self, t):
        """Map the bucket time `t` to its slot in the circular buffer."""
        return t % self.BUFFER_SIZE

    def expire_buckets(self, start, end):
        """Reset the buckets following `start` up to `end`, at most one full buffer turn."""
        stale_count = min(self.BUFFER_SIZE, end - start)
        for offset in range(1, stale_count + 1):
            slot = self.key_from_time(start + offset)
            # Keep the running total in sync with the slot being cleared.
            self.counter -= self.counter_buffer[slot]
            self.counter_buffer[slot] = 0
9 changes: 4 additions & 5 deletions ddtrace/tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import threading

from .buffer import ThreadLocalSpanBuffer
from .sampler import RateSampler
from .sampler import DefaultSampler
from .span import Span
from .writer import AgentWriter

Expand All @@ -12,20 +12,21 @@

class Tracer(object):

def __init__(self, enabled=True, writer=None, span_buffer=None, sample_rate=1):
def __init__(self, enabled=True, writer=None, span_buffer=None, sampler=None):
"""
Create a new tracer object.

enabled: if False, no spans will be submitted to the writer
writer: an instance of Writer
span_buffer: a span buffer instance. used to store inflight traces. by
default, will use thread local storage
sample_rate: Pre-sampling rate.
sampler: Trace sampler.
"""
self.enabled = enabled

self._writer = writer or AgentWriter()
self._span_buffer = span_buffer or ThreadLocalSpanBuffer()
self.sampler = sampler or DefaultSampler()

# a list of buffered spans.
self._spans_lock = threading.Lock()
Expand All @@ -34,8 +35,6 @@ def __init__(self, enabled=True, writer=None, span_buffer=None, sample_rate=1):
# a collection of registered services by name.
self._services = {}

self.sampler = RateSampler(sample_rate)

# A hook for local debugging. shouldn't be needed or used
# in production.
self.debug_logging = False
Expand Down
6 changes: 5 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,20 @@
import os

tests_require = [
'mock',
'nose',
# contrib
'blinker',
'cassandra-driver',
'django',
'elasticsearch',
'flask',
'nose',
'psycopg2',
'redis',
]



version = __version__
# Append a suffix to the version for dev builds
if os.environ.get('VERSION_SUFFIX'):
Expand Down
157 changes: 157 additions & 0 deletions tests/test_sampler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
from __future__ import division

import unittest
import random
import time
import threading

from ddtrace.tracer import Tracer
from ddtrace.sampler import RateSampler, ThroughputSampler
from .test_tracer import DummyWriter
from .util import patch_time


class RateSamplerTest(unittest.TestCase):
    """Checks the decisions taken by `RateSampler` when plugged into a `Tracer`."""

    def test_random_sequence(self):
        """With a fixed seed and a 0.5 rate, traces 1 and 3 are kept and trace 2 is dropped."""
        writer = DummyWriter()
        tracer = Tracer(writer=writer, sampler=RateSampler(0.5))

        # Seed the generator so the choice of sampled traces is deterministic;
        # the expectations below are written for this exact sequence.
        random.seed(4012)

        # Trace 1: sampled, and weighted 1 / 0.5
        with tracer.trace("foo") as root:
            assert root.sampled
            assert root.weight == 2
        assert writer.pop()

        # Trace 2: not sampled, and neither is any span created under it
        with tracer.trace("figh") as root:
            assert not root.sampled
            child = tracer.trace("what")
            assert not child.sampled
            child.finish()
            with tracer.trace("ever") as nested:
                assert not nested.sampled
                leaf = tracer.trace("!")
                assert not leaf.sampled
                leaf.finish()
        spans = writer.pop()
        assert not spans, spans

        # Trace 3: sampled
        with tracer.trace("ters") as root:
            assert root.sampled
        assert writer.pop()


class ThroughputSamplerTest(unittest.TestCase):
    """Tests covering the trace volume limit applied by `ThroughputSampler`."""

    def test_simple_limit(self):
        """A burst within the buffer limit is fully kept, a bigger one is capped."""
        writer = DummyWriter()
        tracer = Tracer(writer=writer)

        with patch_time() as fake_time:
            tps = 5
            tracer.sampler = ThroughputSampler(tps)

            for _ in range(10):
                tracer.trace("whatever").finish()

            got = len(writer.pop())
            expected = 10

            assert got == expected, \
                "Wrong number of traces sampled, %s instead of %s" % (got, expected)

            # Let more than a full buffer duration elapse so the sampler starts from scratch
            fake_time.sleep(tracer.sampler.BUFFER_DURATION + 1)

            for _ in range(100):
                tracer.trace("whatever").finish()

            got = len(writer.pop())
            expected = tps * tracer.sampler.BUFFER_DURATION

            assert got == expected, \
                "Wrong number of traces sampled, %s instead of %s" % (got, expected)

    def test_long_run(self):
        """Over long simulated runs, the sampled volume stays close to `tps * total_time`."""
        writer = DummyWriter()
        tracer = Tracer(writer=writer)

        # Test a big matrix of combinations
        # Keep total_time >> BUFFER_DURATION to reduce edge effects
        scenarios = [(80, 23), (75, 66), (1000, 77)]
        for tps in [10, 23, 15, 31]:
            for (traces_per_s, total_time) in scenarios:

                with patch_time() as fake_time:
                    # This test does tons of operations, do not let the time slowly shift
                    fake_time.set_delta(0)

                    tracer.sampler = ThroughputSampler(tps)

                    for _ in range(total_time):
                        for _ in range(traces_per_s):
                            tracer.trace("whatever").finish()
                        fake_time.sleep(1)

                    # The current sampler implementation can introduce an error of up to
                    # `tps * BUFFER_DURATION` traces at initialization (the sampler starts empty)
                    got = len(writer.pop())
                    expected = tps * total_time
                    error_delta = tps * tracer.sampler.BUFFER_DURATION

                    assert abs(got - expected) <= error_delta, \
                        "Wrong number of traces sampled, %s instead of %s (error_delta > %s)" % (got, expected, error_delta)

    def test_concurrency(self):
        """The limit still holds when many threads trace at the same time."""
        writer = DummyWriter()
        tracer = Tracer(writer=writer)

        total_time = 10
        concurrency = 100
        end_time = time.time() + total_time

        # The rate is picked as a multiple of BUFFER_SIZE
        tps = 15 * ThroughputSampler.BUFFER_SIZE
        tracer.sampler = ThroughputSampler(tps)

        def run_simulation(tracer, end_time):
            while time.time() < end_time:
                tracer.trace("whatever").finish()
                # ~1000 traces per s per thread
                time.sleep(0.001)

        threads = [
            threading.Thread(target=run_simulation, args=(tracer, end_time))
            for _ in range(concurrency)
        ]

        for worker in threads:
            worker.start()

        for worker in threads:
            worker.join()

        got = len(writer.pop())
        expected = tps * total_time
        error_delta = tps * ThroughputSampler.BUFFER_DURATION

        assert abs(got - expected) <= error_delta, \
            "Wrong number of traces sampled, %s instead of %s (error_delta > %s)" % (got, expected, error_delta)
34 changes: 0 additions & 34 deletions tests/test_tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
"""

import time
import random

from nose.tools import eq_

Expand Down Expand Up @@ -104,39 +103,6 @@ def test_tracer_disabled_mem_leak():
s2.finish()
assert not p1, p1

def test_sampling():
    """With a fixed seed and a 0.5 sample rate, traces 1 and 3 are kept and trace 2 is dropped."""
    writer = DummyWriter()
    tracer = Tracer(writer=writer, sample_rate=0.5)

    # Seed the generator so the choice of sampled traces is deterministic;
    # the expectations below are written for this exact sequence.
    random.seed(4012)

    # Trace 1: sampled, and weighted 1 / 0.5
    with tracer.trace("foo") as root:
        assert root.sampled
        assert root.weight == 2
    assert writer.pop()

    # Trace 2: not sampled, and neither is any span created under it
    with tracer.trace("figh") as root:
        assert not root.sampled
        child = tracer.trace("what")
        assert not child.sampled
        child.finish()
        with tracer.trace("ever") as nested:
            assert not nested.sampled
            leaf = tracer.trace("!")
            assert not leaf.sampled
            leaf.finish()
    spans = writer.pop()
    assert not spans, spans

    # Trace 3: sampled
    with tracer.trace("ters") as root:
        assert root.sampled
    assert writer.pop()


class DummyWriter(object):
""" DummyWriter is a small fake writer used for tests. not thread-safe. """

Expand Down
Loading