sdk/trace/exporters: add batch span processor exporter #153

Merged
109 changes: 109 additions & 0 deletions opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py
@@ -13,9 +13,13 @@
# limitations under the License.

import logging
import queue
import threading
import typing
from enum import Enum

from opentelemetry.sdk import util

from .. import Span, SpanProcessor

logger = logging.getLogger(__name__)
@@ -78,6 +82,111 @@ def shutdown(self) -> None:
        self.span_exporter.shutdown()


class BatchExportSpanProcessor(SpanProcessor):
    """Batch span processor implementation.

    BatchExportSpanProcessor is an implementation of `SpanProcessor` that
    batches ended spans and pushes them to the configured `SpanExporter`.
    """

    def __init__(
        self,
        span_exporter: SpanExporter,
        max_queue_size: int = 2048,
        schedule_delay_millis: float = 5000,
        max_export_batch_size: int = 512,
    ):
        if max_queue_size <= 0:
            raise ValueError("max_queue_size must be a positive integer.")

        if schedule_delay_millis <= 0:
            raise ValueError("schedule_delay_millis must be positive.")

        if max_export_batch_size <= 0:
            raise ValueError(
                "max_export_batch_size must be a positive integer."
            )

        if max_export_batch_size > max_queue_size:
            raise ValueError(
                "max_export_batch_size must be less and equal to max_export_batch_size."
Member:
Suggested change:
-                "max_export_batch_size must be less and equal to max_export_batch_size."
+                "max_export_batch_size must be less than or equal to max_export_batch_size."

Also, FWIW, the type annotations don't do anything at runtime; if you want to enforce int/float types here, we need a type check too.

Member (Author):
That check is not strictly needed; I just want a number. If the user passes something else, it'll fail at some point.

            )

        self.span_exporter = span_exporter
        self.queue = queue.Queue(max_queue_size)
Member:
You never call queue.Queue.task_done (https://docs.python.org/3/library/queue.html#queue.Queue.task_done) on the queue. Maybe a collections.deque (https://docs.python.org/3/library/collections.html#collections.deque) would be the better (more lightweight) choice?

"Deques support thread-safe, memory efficient appends and pops"

Member (Author):
I'm not sure it'll work. It doesn't provide a way to access the number of elements in the queue, so an external counter would be needed (and I'm not sure that will work, because deque drops elements without warning).

Member:
It does, just use len(mydeque):

"In addition to the above, deques support iteration, pickling, len(d), reversed(d), copy.copy(d), copy.deepcopy(d), membership testing with the in operator, and subscript references such as d[-1]."

Member (Author):
You're totally right, I need more coffee.

I used it, with two changes:

  • now older spans are dropped (that's the way deque works, it cannot be changed).
  • it is not possible to count the number of dropped spans (we can only guess that spans would be dropped).

Member:
  • now older spans are dropped

It sounds like we need to clarify that in the spec; I actually expected that we'd drop the oldest spans first.

  • it is not possible to count the number of dropped spans

I think it is, if we lock around adding spans to the deque, which we might need to do later anyway.

Member:
If we only consider CPython with its GIL, a plain list with a lock (condition) might actually be the best solution after all. But I expect the deque without locking at every added span to perform significantly better in GIL-less Python (PyPy). By introducing a single lock that is taken on every span.end(), we would effectively reintroduce a sort of GIL (even though we only hold this lock for a short time at once, it would be highly contended).

Member (Author):
I'd like to avoid a lock on adding an element as much as possible, since it would be contended on every span ending. That said, we can look into this implementation in more detail later on; there is plenty of room to improve and discuss.
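A minimal standalone sketch of the deque behavior discussed above (standard library only, not part of the diff): a bounded deque silently discards elements from the opposite end when it is full, and len() works directly.

from collections import deque

# A bounded deque: appending to a full deque silently discards the
# element at the opposite end, here the oldest span.
spans = deque(maxlen=3)
for name in ["a", "b", "c", "d"]:
    spans.append(name)

print(list(spans))  # ['b', 'c', 'd']: "a" (the oldest) was dropped
print(len(spans))   # 3, len() is O(1), no external counter needed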

        self.worker_thread = threading.Thread(target=self.worker, daemon=True)
        self.condition = threading.Condition()
Member:
Suggested change:
-        self.condition = threading.Condition()
+        self.condition = threading.Condition(threading.Lock())

Otherwise, an RLock is created by default, but we don't need one.

Member:
@Oberon00 what's wrong with the default RLock?

Oberon00 (Member), Sep 26, 2019:
We just don't need the additional guarantees that a recursive lock offers. A Lock is at least as efficient as an RLock (after all, a Lock could be implemented as being the same as an RLock, but not the other way round).
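A small sketch of the distinction being discussed (standard library only, not part of the diff):

import threading

# Condition() with no argument creates and wraps a reentrant RLock;
# passing a plain Lock avoids the recursion bookkeeping we don't need.
condition = threading.Condition(threading.Lock())

with condition:              # acquires the underlying Lock
    condition.notify_all()   # wakes any threads blocked in condition.wait()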

        self.schedule_delay_millis = schedule_delay_millis
        self.max_export_batch_size = max_export_batch_size
        self.half_max_queue_size = max_queue_size // 2
        self.done = False

        self.worker_thread.start()

    def on_start(self, span: Span) -> None:
        pass

    def on_end(self, span: Span) -> None:
        try:
Member:
I wonder if we should also check done here and bail out with a warning if it is true.

Member (Author):
I think the specification is not clear about whether onEnd() can be called after shutdown(); anyway, let's be defensive for now and check it.

Member:
Another thing we should maybe do is check threading.Thread.is_alive (https://docs.python.org/3/library/threading.html#threading.Thread.is_alive) for our worker and restart it (logging a warning) if it crashed.

Member (Author):
I am not sure about this one. If that thread can crash, I think we should implement a health check somewhere else. I don't want to make onEnd slower, because it's called for each span.

Member:
Although, about the slowness, we could maybe only check it in the cases where we notify the condition.
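A sketch of the defensive check agreed on above; the class and names here are illustrative stand-ins, since the updated commit is not shown in this hunk:

import logging

logger = logging.getLogger(__name__)

class ToyProcessor:
    """Illustrative stand-in for the processor's shutdown-flag check."""

    def __init__(self):
        self.done = False

    def on_end(self, span) -> None:
        if self.done:  # bail out if shutdown() already ran
            logger.warning("Already shutdown, dropping span.")
            return
        # ... queue the span as the real on_end does ...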

            self.queue.put(span, block=False)
        except queue.Full:
            # TODO: dropped spans counter?
Member:
A dropped-span counter sounds like a plan. Or we could log a warning the first time a span is dropped.

Member (Author):
Good idea; I'd just log the first time a span is dropped. A better approach could be a rate-limited logging system that actually reports the number of spans being dropped per second or so.
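A sketch of the "log on first drop" idea; the helper and its names are hypothetical, not from the PR:

import logging
import queue

logger = logging.getLogger(__name__)

def put_span(q: queue.Queue, span, state: dict) -> None:
    """Enqueue a span, warning only on the first drop (illustrative)."""
    try:
        q.put(span, block=False)
    except queue.Full:
        if not state.get("dropped"):
            logger.warning("Queue is full, spans are being dropped.")
        state["dropped"] = True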

            pass
        if self.queue.qsize() >= self.half_max_queue_size:
Member:
Suggested change:
-        if self.queue.qsize() >= self.half_max_queue_size:
+        if self.queue.qsize() == self.half_max_queue_size:

I think we send too many notifications otherwise.

Member (Author):
I created a variable to avoid this "notification storm". The equality comparison would not work, because the check could miss the threshold (e.g. two spans ending at the same time). Please give me your feedback on the new solution.
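One possible shape of that variable-based fix, sketched under the assumption of a notified flag that the worker resets after draining (the actual commit is not shown here):

import queue
import threading

class ToyBatcher:
    """Illustrative sketch only; `notified` is not the PR's actual field."""

    def __init__(self, max_queue_size: int = 8):
        self.queue = queue.Queue(max_queue_size)
        self.half_max_queue_size = max_queue_size // 2
        self.condition = threading.Condition(threading.Lock())
        self.notified = False

    def on_end(self, span) -> None:
        try:
            self.queue.put(span, block=False)
        except queue.Full:
            pass
        # Notify once per "half full" episode instead of on every span
        # past the threshold; the worker resets `notified` after draining.
        if self.queue.qsize() >= self.half_max_queue_size and not self.notified:
            with self.condition:
                self.notified = True
                self.condition.notify_all()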

        with self.condition:
            self.condition.notify_all()

    def worker(self):
        timeout = self.schedule_delay_millis / 1e3
        while not self.done:
            if self.queue.qsize() < self.max_export_batch_size:
                with self.condition:
                    self.condition.wait(timeout)
                    if self.queue.empty():
                        # spurious notification, let's wait again
                        continue
                    if self.done:
                        # missing spans will be sent when calling flush
                        break

            # subtract the duration of this export call from the next timeout
            start = util.time_ns()
            self.export()
            end = util.time_ns()
            duration = (end - start) / 1e9
            timeout = self.schedule_delay_millis / 1e3 - duration

        # be sure that all spans are sent
        self._flush()
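For clarity, the unit conversions in the worker loop above, with made-up numbers:

schedule_delay_millis = 5000
start, end = 1_000_000_000, 2_200_000_000  # fake util.time_ns() readings
duration = (end - start) / 1e9             # 1.2 s spent exporting
timeout = schedule_delay_millis / 1e3 - duration
print(timeout)                             # 3.8, the next wait is shortened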

    def export(self):
        """Exports at most max_export_batch_size spans."""
        idx = 0
        spans = []
        # currently only a single thread acts as consumer, so queue.get() will
Member:
(comment body not captured)

Member (Author):
I am not sure how I could integrate it; it'd be a big redesign.

        # never block
        while idx < self.max_export_batch_size and not self.queue.empty():
            spans.append(self.queue.get())
            idx += 1
        try:
            self.span_exporter.export(spans)
        # pylint: disable=broad-except
        except Exception as exc:
            logger.warning("Exception while exporting data: %s", exc)

    def _flush(self):
        while not self.queue.empty():
Member:
Instead of checking queue.empty() here again, we could have export() return a bool.
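A sketch of that suggestion (free functions with hypothetical names, not what the PR implements):

import queue

def export_batch(q: queue.Queue, batch_size: int, out: list) -> bool:
    """Drain up to batch_size items; return True if items remain (sketch)."""
    for _ in range(batch_size):
        if q.empty():
            break
        out.append(q.get())
    return not q.empty()

def flush(q: queue.Queue, batch_size: int, out: list) -> None:
    while export_batch(q, batch_size, out):  # loop on the return value
        pass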

            self.export()

    def shutdown(self) -> None:
        # signal the worker thread to finish and then wait for it
        self.done = True
        with self.condition:
            self.condition.notify_all()
        self.worker_thread.join()
        self.span_exporter.shutdown()
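Putting the pieces together, a usage sketch based on the test code in this PR:

from opentelemetry.sdk import trace
from opentelemetry.sdk.trace import export

tracer = trace.Tracer()
span_processor = export.BatchExportSpanProcessor(
    export.ConsoleSpanExporter(), schedule_delay_millis=1000
)
tracer.add_span_processor(span_processor)

with tracer.start_span("example"):
    pass  # ended spans are queued and exported by the worker thread

span_processor.shutdown()  # flushes any remaining spans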


class ConsoleSpanExporter(SpanExporter):
    """Implementation of :class:`SpanExporter` that prints spans to the
    console.
147 changes: 140 additions & 7 deletions opentelemetry-sdk/tests/trace/export/test_export.py
@@ -12,33 +12,166 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import time
import unittest

from opentelemetry.sdk import trace
from opentelemetry.sdk.trace import export


class MySpanExporter(export.SpanExporter):
    """Very simple span exporter used for testing."""

    def __init__(self, destination, max_export_batch_size=None):
        self.destination = destination
        self.max_export_batch_size = max_export_batch_size

    def export(self, spans: trace.Span) -> export.SpanExportResult:
        if (
            self.max_export_batch_size is not None
            and len(spans) > self.max_export_batch_size
        ):
            raise ValueError("Batch is too big")
        self.destination.extend(span.name for span in spans)
        return export.SpanExportResult.SUCCESS


class TestSimpleExportSpanProcessor(unittest.TestCase):
    def test_simple_span_processor(self):
        tracer = trace.Tracer()
Member:
I'd vote to leave the tracer out of these tests and call on_end directly with some mock spans instead.

Member (Author):
I like this idea; it makes them look more like unit tests than integration tests. I updated it.
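A sketch of what such a unit-style test could look like (using unittest.mock as a stand-in span is an assumption; the updated test is not shown in this hunk):

from unittest import mock

spans_names_list = []
my_exporter = MySpanExporter(destination=spans_names_list)
span_processor = export.BatchExportSpanProcessor(my_exporter)

mock_span = mock.Mock()
mock_span.name = "foo"  # the test exporter only reads span.name
span_processor.on_end(mock_span)
span_processor.shutdown()

assert spans_names_list == ["foo"]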


        spans_names_list = []

        my_exporter = MySpanExporter(destination=spans_names_list)
        span_processor = export.SimpleExportSpanProcessor(my_exporter)
        tracer.add_span_processor(span_processor)

        with tracer.start_span("foo"):
            with tracer.start_span("bar"):
                with tracer.start_span("xxx"):
                    pass

        self.assertListEqual(["xxx", "bar", "foo"], spans_names_list)


class TestBatchExportSpanProcessor(unittest.TestCase):
    def test_batch_span_processor(self):
        tracer = trace.Tracer()

        spans_names_list = []

        my_exporter = MySpanExporter(destination=spans_names_list)
        span_processor = export.BatchExportSpanProcessor(my_exporter)
        tracer.add_span_processor(span_processor)

        with tracer.start_span("foo"):
            with tracer.start_span("bar"):
                with tracer.start_span("xxx"):
                    pass

        # call shutdown on specific span processor
        # TODO: this call is missing in the tracer
        span_processor.shutdown()
        self.assertListEqual(["xxx", "bar", "foo"], spans_names_list)

    def test_batch_span_processor_lossless(self):
        """Test that no spans are lost when sending max_queue_size spans"""
        tracer = trace.Tracer()

        spans_names_list = []

        my_exporter = MySpanExporter(
            destination=spans_names_list, max_export_batch_size=128
        )
        span_processor = export.BatchExportSpanProcessor(
            my_exporter, max_queue_size=512, max_export_batch_size=128
        )
        tracer.add_span_processor(span_processor)

        for idx in range(512):
            with tracer.start_span("foo{}".format(idx)):
                pass

        # call shutdown on specific span processor
        # TODO: this call is missing in the tracer
        span_processor.shutdown()
Member:
Since this test and the one above just seem to check _flush, it's probably worth adding a separate check that we only export max_export_batch_size spans at a time during normal operation.

Member (Author):
I'll add it.
        self.assertEqual(len(spans_names_list), 512)

    def test_batch_span_processor_scheduled_delay(self):
        """Test that spans are exported each schedule_delay_millis"""
        tracer = trace.Tracer()

        spans_names_list = []

        my_exporter = MySpanExporter(destination=spans_names_list)
        span_processor = export.BatchExportSpanProcessor(
            my_exporter, schedule_delay_millis=50
        )
        tracer.add_span_processor(span_processor)

        # start single span
        with tracer.start_span("foo1"):
            pass

        time.sleep(0.05 + 0.02)
        # span should be already exported
        self.assertEqual(len(spans_names_list), 1)

        # call shutdown on specific span processor
        # TODO: this call is missing in the tracer
        span_processor.shutdown()

    def test_batch_span_processor_parameters(self):
        # zero max_queue_size
        self.assertRaises(
            ValueError, export.BatchExportSpanProcessor, None, max_queue_size=0
        )

        # negative max_queue_size
        self.assertRaises(
            ValueError,
            export.BatchExportSpanProcessor,
            None,
            max_queue_size=-500,
        )

        # zero schedule_delay_millis
        self.assertRaises(
            ValueError,
            export.BatchExportSpanProcessor,
            None,
            schedule_delay_millis=0,
        )

        # negative schedule_delay_millis
        self.assertRaises(
            ValueError,
            export.BatchExportSpanProcessor,
            None,
            schedule_delay_millis=-500,
        )

        # zero max_export_batch_size
        self.assertRaises(
            ValueError,
            export.BatchExportSpanProcessor,
            None,
            max_export_batch_size=0,
        )

        # negative max_export_batch_size
        self.assertRaises(
            ValueError,
            export.BatchExportSpanProcessor,
            None,
            max_export_batch_size=-500,
        )

        # max_export_batch_size > max_queue_size:
        self.assertRaises(
            ValueError,
            export.BatchExportSpanProcessor,
            None,
            max_queue_size=256,
            max_export_batch_size=512,
        )