Skip to content

Commit

Permalink
Make zipkin tag value length configurable (#1151)
Browse files Browse the repository at this point in the history
Zipkin exporter truncates tag values to a maximum length of 128
characters. This commit makes this value configurable while keeping
128 as the default value.
  • Loading branch information
owais authored Sep 24, 2020
1 parent 2b46d11 commit c1ec444
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 36 deletions.
3 changes: 3 additions & 0 deletions exporter/opentelemetry-exporter-zipkin/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## Unreleased

- Zipkin exporter now accepts a ``max_tag_value_length`` attribute to customize the
maximum allowed size a tag value can have. ([#1151](https://github.com/open-telemetry/opentelemetry-python/pull/1151))

## Version 0.13b0

Released 2020-09-17
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@

DEFAULT_RETRY = False
DEFAULT_URL = "http://localhost:9411/api/v2/spans"
DEFAULT_MAX_TAG_VALUE_LENGTH = 128
ZIPKIN_HEADERS = {"Content-Type": "application/json"}

SPAN_KIND_MAP = {
Expand Down Expand Up @@ -108,6 +109,7 @@ def __init__(
ipv4: Optional[str] = None,
ipv6: Optional[str] = None,
retry: Optional[str] = DEFAULT_RETRY,
max_tag_value_length: Optional[int] = DEFAULT_MAX_TAG_VALUE_LENGTH,
):
self.service_name = service_name
if url is None:
Expand All @@ -122,6 +124,7 @@ def __init__(
self.ipv4 = ipv4
self.ipv6 = ipv6
self.retry = retry
self.max_tag_value_length = max_tag_value_length

def export(self, spans: Sequence[Span]) -> SpanExportResult:
zipkin_spans = self._translate_to_zipkin(spans)
Expand All @@ -141,6 +144,9 @@ def export(self, spans: Sequence[Span]) -> SpanExportResult:
return SpanExportResult.FAILURE
return SpanExportResult.SUCCESS

def shutdown(self) -> None:
pass

def _translate_to_zipkin(self, spans: Sequence[Span]):

local_endpoint = {"serviceName": self.service_name, "port": self.port}
Expand Down Expand Up @@ -171,8 +177,10 @@ def _translate_to_zipkin(self, spans: Sequence[Span]):
"duration": duration_mus,
"localEndpoint": local_endpoint,
"kind": SPAN_KIND_MAP[span.kind],
"tags": _extract_tags_from_span(span),
"annotations": _extract_annotations_from_events(span.events),
"tags": self._extract_tags_from_span(span),
"annotations": self._extract_annotations_from_events(
span.events
),
}

if span.instrumentation_info is not None:
Expand Down Expand Up @@ -205,42 +213,44 @@ def _translate_to_zipkin(self, spans: Sequence[Span]):
zipkin_spans.append(zipkin_span)
return zipkin_spans

def shutdown(self) -> None:
pass

def _extract_tags_from_dict(self, tags_dict):
tags = {}
if not tags_dict:
return tags
for attribute_key, attribute_value in tags_dict.items():
if isinstance(attribute_value, (int, bool, float)):
value = str(attribute_value)
elif isinstance(attribute_value, str):
value = attribute_value
else:
logger.warning("Could not serialize tag %s", attribute_key)
continue

if self.max_tag_value_length > 0:
value = value[: self.max_tag_value_length]
tags[attribute_key] = value
return tags

def _extract_tags_from_dict(tags_dict):
tags = {}
if not tags_dict:
def _extract_tags_from_span(self, span: Span):
tags = self._extract_tags_from_dict(getattr(span, "attributes", None))
if span.resource:
tags.update(self._extract_tags_from_dict(span.resource.attributes))
return tags
for attribute_key, attribute_value in tags_dict.items():
if isinstance(attribute_value, (int, bool, float)):
value = str(attribute_value)
elif isinstance(attribute_value, str):
value = attribute_value[:128]
else:
logger.warning("Could not serialize tag %s", attribute_key)
continue
tags[attribute_key] = value
return tags


def _extract_tags_from_span(span: Span):
tags = _extract_tags_from_dict(getattr(span, "attributes", None))
if span.resource:
tags.update(_extract_tags_from_dict(span.resource.attributes))
return tags


def _extract_annotations_from_events(events):
return (
[
{"timestamp": _nsec_to_usec_round(e.timestamp), "value": e.name}
for e in events
]
if events
else None
)

def _extract_annotations_from_events(
self, events
): # pylint: disable=R0201
return (
[
{
"timestamp": _nsec_to_usec_round(e.timestamp),
"value": e.name,
}
for e in events
]
if events
else None
)


def _nsec_to_usec_round(nsec):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,3 +361,50 @@ def test_invalid_response(self, mock_post):
exporter = ZipkinSpanExporter("test-service")
status = exporter.export(spans)
self.assertEqual(SpanExportResult.FAILURE, status)

def test_max_tag_length(self):
service_name = "test-service"

span_context = trace_api.SpanContext(
0x0E0C63257DE34C926F9EFCD03927272E,
0x04BF92DEEFC58C92,
is_remote=False,
trace_flags=TraceFlags(TraceFlags.SAMPLED),
)

span = trace.Span(name="test-span", context=span_context,)

span.start()
span.resource = Resource({})
# added here to preserve order
span.set_attribute("k1", "v" * 500)
span.set_attribute("k2", "v" * 50)
span.set_status(
Status(StatusCanonicalCode.UNKNOWN, "Example description")
)
span.end()

exporter = ZipkinSpanExporter(service_name)
mock_post = MagicMock()
with patch("requests.post", mock_post):
mock_post.return_value = MockResponse(200)
status = exporter.export([span])
self.assertEqual(SpanExportResult.SUCCESS, status)

_, kwargs = mock_post.call_args # pylint: disable=E0633

tags = json.loads(kwargs["data"])[0]["tags"]
self.assertEqual(len(tags["k1"]), 128)
self.assertEqual(len(tags["k2"]), 50)

exporter = ZipkinSpanExporter(service_name, max_tag_value_length=2)
mock_post = MagicMock()
with patch("requests.post", mock_post):
mock_post.return_value = MockResponse(200)
status = exporter.export([span])
self.assertEqual(SpanExportResult.SUCCESS, status)

_, kwargs = mock_post.call_args # pylint: disable=E0633
tags = json.loads(kwargs["data"])[0]["tags"]
self.assertEqual(len(tags["k1"]), 2)
self.assertEqual(len(tags["k2"]), 2)

0 comments on commit c1ec444

Please sign in to comment.