Skip to content

Commit

Permalink
feat(writer): add retry logic
Browse files Browse the repository at this point in the history
This change introduces a Fibonacci retry policy (with jitter) to the
agent writer to mitigate networking issues (e.g. timeouts, broken pipes),
similar to what the profiler already does.

Resolves DataDog#1413.
  • Loading branch information
P403n1x87 committed May 20, 2021
1 parent 4ac313b commit 8e99cd3
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 8 deletions.
18 changes: 15 additions & 3 deletions ddtrace/internal/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from typing import TextIO

import six
import tenacity

import ddtrace
from ddtrace.vendor.dogstatsd import DogStatsd
Expand Down Expand Up @@ -193,6 +194,8 @@ class AgentWriter(periodic.PeriodicService, TraceWriter):
should be in the same trace.
"""

RETRY_ATTEMPTS = 3

def __init__(
self,
agent_url, # type: str
Expand Down Expand Up @@ -246,6 +249,15 @@ def __init__(
self._metrics_reset()
self._drop_sma = SimpleMovingAverage(DEFAULT_SMA_WINDOW)
self._sync_mode = sync_mode
self._retry_upload = tenacity.Retrying(
# Make up to RETRY_ATTEMPTS attempts within the first half of the processing
# interval, using a Fibonacci policy with jitter
wait=tenacity.wait_random_exponential(
multiplier=0.618 * self.interval / (1.618 ** self.RETRY_ATTEMPTS) / 2, exp_base=1.618
),
stop=tenacity.stop_after_attempt(self.RETRY_ATTEMPTS),
retry=tenacity.retry_if_exception_type((compat.httplib.HTTPException, OSError, IOError)),
)

def _metrics_dist(self, name, count=1, tags=None):
self._metrics[name]["count"] += count
Expand Down Expand Up @@ -417,13 +429,13 @@ def flush_queue(self, raise_exc=False):

encoded = self._encoder.join_encoded(enc_traces)
try:
self._send_payload(encoded, len(enc_traces))
except (compat.httplib.HTTPException, OSError, IOError):
self._retry_upload(self._send_payload, encoded, len(enc_traces))
except tenacity.RetryError as e:
self._metrics_dist("http.errors", tags=["type:err"])
self._metrics_dist("http.dropped.bytes", len(encoded))
self._metrics_dist("http.dropped.traces", len(enc_traces))
if raise_exc:
raise
e.reraise()
else:
log.error("failed to send traces to Datadog Agent at %s", self.agent_url, exc_info=True)
finally:
Expand Down
5 changes: 5 additions & 0 deletions releasenotes/notes/tracer-upload-retry-d60310aa6c91059d.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
features:
- |
Added retry logic to the tracer to mitigate potential networking issues,
like timeouts or dropped connections.
10 changes: 5 additions & 5 deletions tests/tracer/test_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def test_metrics_bad_endpoint(self):
[
mock.call("datadog.tracer.buffer.accepted.traces", 10, tags=[]),
mock.call("datadog.tracer.buffer.accepted.spans", 50, tags=[]),
mock.call("datadog.tracer.http.requests", 1, tags=[]),
mock.call("datadog.tracer.http.requests", writer.RETRY_ATTEMPTS, tags=[]),
mock.call("datadog.tracer.http.errors", 1, tags=["type:err"]),
mock.call("datadog.tracer.http.dropped.bytes", AnyInt(), tags=[]),
],
Expand All @@ -92,7 +92,7 @@ def test_metrics_trace_too_big(self):
mock.call("datadog.tracer.buffer.accepted.spans", 50, tags=[]),
mock.call("datadog.tracer.buffer.dropped.traces", 1, tags=["reason:t_too_big"]),
mock.call("datadog.tracer.buffer.dropped.bytes", AnyInt(), tags=["reason:t_too_big"]),
mock.call("datadog.tracer.http.requests", 1, tags=[]),
mock.call("datadog.tracer.http.requests", writer.RETRY_ATTEMPTS, tags=[]),
mock.call("datadog.tracer.http.errors", 1, tags=["type:err"]),
mock.call("datadog.tracer.http.dropped.bytes", AnyInt(), tags=[]),
],
Expand All @@ -111,7 +111,7 @@ def test_metrics_multi(self):
[
mock.call("datadog.tracer.buffer.accepted.traces", 10, tags=[]),
mock.call("datadog.tracer.buffer.accepted.spans", 50, tags=[]),
mock.call("datadog.tracer.http.requests", 1, tags=[]),
mock.call("datadog.tracer.http.requests", writer.RETRY_ATTEMPTS, tags=[]),
mock.call("datadog.tracer.http.errors", 1, tags=["type:err"]),
mock.call("datadog.tracer.http.dropped.bytes", AnyInt(), tags=[]),
],
Expand All @@ -131,7 +131,7 @@ def test_metrics_multi(self):
[
mock.call("datadog.tracer.buffer.accepted.traces", 10, tags=[]),
mock.call("datadog.tracer.buffer.accepted.spans", 50, tags=[]),
mock.call("datadog.tracer.http.requests", 1, tags=[]),
mock.call("datadog.tracer.http.requests", writer.RETRY_ATTEMPTS, tags=[]),
mock.call("datadog.tracer.http.errors", 1, tags=["type:err"]),
mock.call("datadog.tracer.http.dropped.bytes", AnyInt(), tags=[]),
],
Expand All @@ -146,7 +146,7 @@ def test_write_sync(self):
[
mock.call("datadog.tracer.buffer.accepted.traces", 1, tags=[]),
mock.call("datadog.tracer.buffer.accepted.spans", 5, tags=[]),
mock.call("datadog.tracer.http.requests", 1, tags=[]),
mock.call("datadog.tracer.http.requests", writer.RETRY_ATTEMPTS, tags=[]),
mock.call("datadog.tracer.http.errors", 1, tags=["type:err"]),
mock.call("datadog.tracer.http.dropped.bytes", AnyInt(), tags=[]),
],
Expand Down

0 comments on commit 8e99cd3

Please sign in to comment.