diff --git a/opentelemetry_distro_solarwinds/__init__.py b/opentelemetry_distro_solarwinds/__init__.py index f102a9ca..8c04e25f 100644 --- a/opentelemetry_distro_solarwinds/__init__.py +++ b/opentelemetry_distro_solarwinds/__init__.py @@ -1 +1,10 @@ __version__ = "0.0.1" + +COMMA = "," +COMMA_W3C_SANITIZED = "...." +EQUALS = "=" +EQUALS_W3C_SANITIZED = "####" +SW_TRACESTATE_KEY = "sw" +OTEL_CONTEXT_SW_OPTIONS_KEY = "sw_xtraceoptions" +OTEL_CONTEXT_SW_SIGNATURE_KEY = "sw_signature" +DEFAULT_SW_TRACES_EXPORTER = "solarwinds_exporter" \ No newline at end of file diff --git a/opentelemetry_distro_solarwinds/configurator.py b/opentelemetry_distro_solarwinds/configurator.py new file mode 100644 index 00000000..2a06f7e2 --- /dev/null +++ b/opentelemetry_distro_solarwinds/configurator.py @@ -0,0 +1,156 @@ +"""Module to initialize OpenTelemetry SDK components to work with SolarWinds backend""" + +import logging +from os import environ +from pkg_resources import ( + iter_entry_points, + load_entry_point +) + +from opentelemetry import trace +from opentelemetry.environment_variables import ( + OTEL_PROPAGATORS, + OTEL_TRACES_EXPORTER +) +from opentelemetry.instrumentation.propagators import set_global_response_propagator +from opentelemetry.propagate import set_global_textmap +from opentelemetry.propagators.composite import CompositePropagator +from opentelemetry.sdk._configuration import _OTelSDKConfigurator +from opentelemetry.sdk.environment_variables import OTEL_TRACES_SAMPLER +from opentelemetry.sdk.trace import ( + sampling, + TracerProvider +) +from opentelemetry.sdk.trace.export import BatchSpanProcessor + +from opentelemetry_distro_solarwinds import DEFAULT_SW_TRACES_EXPORTER +from opentelemetry_distro_solarwinds.extension.oboe import Reporter +from opentelemetry_distro_solarwinds.response_propagator import SolarWindsTraceResponsePropagator + +logger = logging.getLogger(__name__) + +class SolarWindsConfigurator(_OTelSDKConfigurator): + """OpenTelemetry Configurator for initializing SolarWinds-reporting SDK components""" + + # Cannot set as env default because not part of OTel Python _KNOWN_SAMPLERS + # https://github.com/open-telemetry/opentelemetry-python/blob/main/opentelemetry-sdk/src/opentelemetry/sdk/trace/sampling.py#L364-L380 + _DEFAULT_SW_TRACES_SAMPLER = "solarwinds_sampler" + + def _configure(self, **kwargs): + """Configure OTel sampler, exporter, propagator, response propagator""" + reporter = self._initialize_solarwinds_reporter() + self._configure_sampler() + self._configure_exporter(reporter) + self._configure_propagator() + # Set global HTTP response propagator + set_global_response_propagator(SolarWindsTraceResponsePropagator()) + + def _configure_sampler(self): + """Always configure SolarWinds OTel sampler""" + try: + sampler = load_entry_point( + "opentelemetry_distro_solarwinds", + "opentelemetry_traces_sampler", + self._DEFAULT_SW_TRACES_SAMPLER + )() + except: + logger.exception( + "Failed to load configured sampler {}".format( + self._DEFAULT_SW_TRACES_SAMPLER + ) + ) + raise + trace.set_tracer_provider( + TracerProvider(sampler=sampler) + ) + + def _configure_exporter(self, reporter): + """Configure SolarWinds or env-specified OTel span exporter. + Initialization of SolarWinds exporter requires a liboboe reporter.""" + exporter = None + environ_exporter_name = environ.get(OTEL_TRACES_EXPORTER) + + if environ_exporter_name == DEFAULT_SW_TRACES_EXPORTER: + try: + exporter = load_entry_point( + "opentelemetry_distro_solarwinds", + "opentelemetry_traces_exporter", + environ_exporter_name + )(reporter) + except: + logger.exception( + "Failed to load configured exporter {} with reporter".format( + environ_exporter_name + ) + ) + raise + else: + try: + exporter = next( + iter_entry_points( + "opentelemetry_traces_exporter", + environ_exporter_name + ) + ).load()() + except: + logger.exception( + "Failed to load configured exporter {}".format( + environ_exporter_name + ) + ) + raise + span_processor = BatchSpanProcessor(exporter) + trace.get_tracer_provider().add_span_processor(span_processor) + + def _configure_propagator(self): + """Configure CompositePropagator with SolarWinds and other propagators""" + propagators = [] + environ_propagators_names = environ.get(OTEL_PROPAGATORS).split(",") + for propagator_name in environ_propagators_names: + try: + propagators.append( + next( + iter_entry_points("opentelemetry_propagator", propagator_name) + ).load()() + ) + except Exception: + logger.exception( + "Failed to load configured propagator {}".format( + propagator_name + ) + ) + raise + set_global_textmap(CompositePropagator(propagators)) + + def _initialize_solarwinds_reporter(self) -> Reporter: + """Initialize SolarWinds reporter used by sampler and exporter. This establishes collector and sampling settings in a background thread.""" + log_level = environ.get('SOLARWINDS_DEBUG_LEVEL', 3) + try: + log_level = int(log_level) + except ValueError: + log_level = 3 + # TODO make some of these customizable + return Reporter( + hostname_alias='', + log_level=log_level, + log_file_path='', + max_transactions=-1, + max_flush_wait_time=-1, + events_flush_interval=-1, + max_request_size_bytes=-1, + reporter='ssl', + host=environ.get('SOLARWINDS_COLLECTOR', ''), + service_key=environ.get('SOLARWINDS_SERVICE_KEY', ''), + trusted_path='', + buffer_size=-1, + trace_metrics=-1, + histogram_precision=-1, + token_bucket_capacity=-1, + token_bucket_rate=-1, + file_single=0, + ec2_metadata_timeout=1000, + grpc_proxy='', + stdout_clear_nonblocking=0, + is_grpc_clean_hack_enabled=False, + w3c_trace_format=1, + ) diff --git a/opentelemetry_distro_solarwinds/distro.py b/opentelemetry_distro_solarwinds/distro.py index cdfd7c3b..be0a2b27 100644 --- a/opentelemetry_distro_solarwinds/distro.py +++ b/opentelemetry_distro_solarwinds/distro.py @@ -1,24 +1,52 @@ -"""Module to configure OpenTelemetry agent to work with SolarWinds backend""" +"""Module to configure OpenTelemetry to work with SolarWinds backend""" -from opentelemetry import trace +import logging +from os import environ + +from opentelemetry.environment_variables import ( + OTEL_PROPAGATORS, + OTEL_TRACES_EXPORTER +) from opentelemetry.instrumentation.distro import BaseDistro -from opentelemetry.sdk.trace import TracerProvider -from opentelemetry.sdk.trace.export import BatchSpanProcessor -from opentelemetry_distro_solarwinds.exporter import SolarWindsSpanExporter -from opentelemetry_distro_solarwinds.sampler import ParentBasedAoSampler +from opentelemetry_distro_solarwinds import DEFAULT_SW_TRACES_EXPORTER +logger = logging.getLogger(__name__) class SolarWindsDistro(BaseDistro): - """SolarWinds custom distro for OpenTelemetry agents. + """OpenTelemetry Distro for SolarWinds reporting environment""" + + _TRACECONTEXT_PROPAGATOR = "tracecontext" + _SW_PROPAGATOR = "solarwinds_propagator" + _DEFAULT_SW_PROPAGATORS = [ + _TRACECONTEXT_PROPAGATOR, + "baggage", + _SW_PROPAGATOR, + ] - With this custom distro, the following functionality is introduced: - - no functionality added at this time - """ def _configure(self, **kwargs): - # automatically make use of custom SolarWinds sampler - trace.set_tracer_provider( - TracerProvider(sampler=ParentBasedAoSampler())) - # Automatically configure the SolarWinds Span exporter - span_exporter = BatchSpanProcessor(SolarWindsSpanExporter()) - trace.get_tracer_provider().add_span_processor(span_exporter) + """Configure OTel exporter and propagators""" + environ.setdefault(OTEL_TRACES_EXPORTER, DEFAULT_SW_TRACES_EXPORTER) + + environ_propagators = environ.get( + OTEL_PROPAGATORS, + ",".join(self._DEFAULT_SW_PROPAGATORS) + ).split(",") + # If not using the default propagators, + # can any arbitrary list BUT + # (1) must include tracecontext and solarwinds_propagator + # (2) tracecontext must be before solarwinds_propagator + if environ_propagators != self._DEFAULT_SW_PROPAGATORS: + if not self._TRACECONTEXT_PROPAGATOR in environ_propagators or \ + not self._SW_PROPAGATOR in environ_propagators: + raise ValueError("Must include tracecontext and solarwinds_propagator in OTEL_PROPAGATORS to use SolarWinds Observability.") + + if environ_propagators.index(self._SW_PROPAGATOR) \ + < environ_propagators.index(self._TRACECONTEXT_PROPAGATOR): + raise ValueError("tracecontext must be before solarwinds_propagator in OTEL_PROPAGATORS to use SolarWinds Observability.") + environ[OTEL_PROPAGATORS] = ",".join(environ_propagators) + + logger.debug("Configured SolarWindsDistro: {}, {}".format( + environ.get(OTEL_TRACES_EXPORTER), + environ.get(OTEL_PROPAGATORS) + )) diff --git a/opentelemetry_distro_solarwinds/exporter.py b/opentelemetry_distro_solarwinds/exporter.py index bc6973c2..3a03355e 100644 --- a/opentelemetry_distro_solarwinds/exporter.py +++ b/opentelemetry_distro_solarwinds/exporter.py @@ -5,26 +5,25 @@ """ import logging -import os from opentelemetry.sdk.trace.export import SpanExporter -from opentelemetry_distro_solarwinds.extension.oboe import (Context, Metadata, - Reporter) -from opentelemetry_distro_solarwinds.ot_ao_transformer import transform_id +from opentelemetry_distro_solarwinds.extension.oboe import ( + Context, + Metadata +) +from opentelemetry_distro_solarwinds.w3c_transformer import W3CTransformer logger = logging.getLogger(__file__) class SolarWindsSpanExporter(SpanExporter): - """SolarWinds span exporter. - - Reports instrumentation data to the SolarWinds backend. + """SolarWinds custom span exporter for the SolarWinds backend. + Initialization requires a liboboe reporter. """ - def __init__(self, *args, **kw_args): + def __init__(self, reporter, *args, **kw_args): super().__init__(*args, **kw_args) - self.reporter = None - self._initialize_solarwinds_reporter() + self.reporter = reporter def export(self, spans): """Export to AO events and report via liboboe. @@ -36,21 +35,21 @@ def export(self, spans): md = self._build_metadata(span.get_span_context()) if span.parent and span.parent.is_valid: # If there is a parent, we need to add an edge to this parent to this entry event - logger.debug("Continue trace from %s", md.toString()) + logger.debug("Continue trace from {}".format(md.toString())) parent_md = self._build_metadata(span.parent) - evt = Context.startTrace(md, int(span.start_time / 1000), + evt = Context.createEntry(md, int(span.start_time / 1000), parent_md) else: # In OpenTelemrtry, there are no events with individual IDs, but only a span ID # and trace ID. Thus, the entry event needs to be generated such that it has the # same op ID as the span ID of the OTel span. - logger.debug("Start a new trace %s", md.toString()) - evt = Context.startTrace(md, int(span.start_time / 1000)) + logger.debug("Start a new trace {}".format(md.toString())) + evt = Context.createEntry(md, int(span.start_time / 1000)) evt.addInfo('Layer', span.name) evt.addInfo('Language', 'Python') for k, v in span.attributes.items(): evt.addInfo(k, v) - self.reporter.sendReport(evt) + self.reporter.sendReport(evt, False) for event in span.events: if event.name == 'exception': @@ -58,9 +57,9 @@ def export(self, spans): else: self._report_info_event(event) - evt = Context.stopTrace(int(span.end_time / 1000)) + evt = Context.createExit(int(span.end_time / 1000)) evt.addInfo('Layer', span.name) - self.reporter.sendReport(evt) + self.reporter.sendReport(evt, False) def _report_exception_event(self, event): evt = Context.createEvent(int(event.timestamp / 1000)) @@ -76,7 +75,7 @@ def _report_exception_event(self, event): if k not in ('exception.type', 'exception.message', 'exception.stacktrace'): evt.addInfo(k, v) - self.reporter.sendReport(evt) + self.reporter.sendReport(evt, False) def _report_info_event(self, event): print("Found info event") @@ -86,40 +85,10 @@ def _report_info_event(self, event): evt.addInfo('Label', 'info') for k, v in event.attributes.items(): evt.addInfo(k, v) - self.reporter.sendReport(evt) - - def _initialize_solarwinds_reporter(self): - """Initialize liboboe.""" - log_level = os.environ.get('SOLARWINDS_DEBUG_LEVEL', 3) - try: - log_level = int(log_level) - except ValueError: - log_level = 3 - self.reporter = Reporter( - hostname_alias='', - log_level=log_level, - log_file_path='', - max_transactions=-1, - max_flush_wait_time=-1, - events_flush_interval=-1, - max_request_size_bytes=-1, - reporter='ssl', - host=os.environ.get('SOLARWINDS_COLLECTOR', ''), - service_key=os.environ.get('SOLARWINDS_SERVICE_KEY', ''), - trusted_path='', - buffer_size=-1, - trace_metrics=-1, - histogram_precision=-1, - token_bucket_capacity=-1, - token_bucket_rate=-1, - file_single=0, - ec2_metadata_timeout=1000, - grpc_proxy='', - stdout_clear_nonblocking=0, - is_grpc_clean_hack_enabled=False, - w3c_trace_format=1, - ) + self.reporter.sendReport(evt, False) @staticmethod def _build_metadata(span_context): - return Metadata.fromString(transform_id(span_context)) + return Metadata.fromString( + W3CTransformer.traceparent_from_context(span_context) + ) diff --git a/opentelemetry_distro_solarwinds/ot_ao_transformer.py b/opentelemetry_distro_solarwinds/ot_ao_transformer.py deleted file mode 100644 index 21f01cdf..00000000 --- a/opentelemetry_distro_solarwinds/ot_ao_transformer.py +++ /dev/null @@ -1,17 +0,0 @@ -"""Provides functionality to transform OpenTelemetry Data to SolarWinds AppOptics data. -""" - -import logging -import os - -logger = logging.getLogger(__file__) - - -def transform_id(span_context): - """Generates a liboboe W3C compatible trace_context from provided OTel span context.""" - xtr = "00-{0:032X}-{1:016X}-{2:02X}".format(span_context.trace_id, - span_context.span_id, - span_context.trace_flags) - logger.debug("Generated trace_context %s from span context %s", xtr, - span_context) - return xtr diff --git a/opentelemetry_distro_solarwinds/propagator.py b/opentelemetry_distro_solarwinds/propagator.py new file mode 100644 index 00000000..46b578dd --- /dev/null +++ b/opentelemetry_distro_solarwinds/propagator.py @@ -0,0 +1,110 @@ +import logging +import typing + +from opentelemetry import trace +from opentelemetry.context.context import Context +from opentelemetry.propagators import textmap +from opentelemetry.trace.span import TraceState + +from opentelemetry_distro_solarwinds import ( + OTEL_CONTEXT_SW_OPTIONS_KEY, + OTEL_CONTEXT_SW_SIGNATURE_KEY, + SW_TRACESTATE_KEY +) +from opentelemetry_distro_solarwinds.traceoptions import XTraceOptions +from opentelemetry_distro_solarwinds.w3c_transformer import W3CTransformer + +logger = logging.getLogger(__file__) + +class SolarWindsPropagator(textmap.TextMapPropagator): + """Extracts and injects SolarWinds headers for trace propagation. + Must be used in composite with TraceContextTextMapPropagator. + """ + _TRACESTATE_HEADER_NAME = "tracestate" + _XTRACEOPTIONS_HEADER_NAME = "x-trace-options" + _XTRACEOPTIONS_SIGNATURE_HEADER_NAME = "x-trace-options-signature" + + def extract( + self, + carrier: textmap.CarrierT, + context: typing.Optional[Context] = None, + getter: textmap.Getter = textmap.default_getter, + ) -> Context: + """Extracts sw trace options and signature from carrier into OTel + Context. Note: tracestate is extracted by TraceContextTextMapPropagator + """ + if context is None: + context = Context() + + xtraceoptions_header = getter.get( + carrier, + self._XTRACEOPTIONS_HEADER_NAME + ) + if not xtraceoptions_header: + logger.debug("No xtraceoptions to extract; ignoring signature") + return context + + context.update({ + OTEL_CONTEXT_SW_OPTIONS_KEY: xtraceoptions_header[0] + }) + logger.debug("Extracted {} as {}: {}".format( + self._XTRACEOPTIONS_HEADER_NAME, + OTEL_CONTEXT_SW_OPTIONS_KEY, + xtraceoptions_header[0] + )) + + signature_header = getter.get( + carrier, + self._XTRACEOPTIONS_SIGNATURE_HEADER_NAME + ) + if signature_header: + context.update({ + OTEL_CONTEXT_SW_SIGNATURE_KEY: signature_header[0] + }) + logger.debug("Extracted {} as {}: {}".format( + self._XTRACEOPTIONS_SIGNATURE_HEADER_NAME, + OTEL_CONTEXT_SW_SIGNATURE_KEY, + xtraceoptions_header[0] + )) + return context + + def inject( + self, + carrier: textmap.CarrierT, + context: typing.Optional[Context] = None, + setter: textmap.Setter = textmap.default_setter, + ) -> None: + """Injects sw tracestate from carrier into carrier for HTTP request, to get + tracestate injected by previous propagators""" + span = trace.get_current_span(context) + span_context = span.get_span_context() + sw_value = W3CTransformer.sw_from_context(span_context) + trace_state_header = carrier.get(self._TRACESTATE_HEADER_NAME, None) + + # Prepare carrier with carrier's or new tracestate + if trace_state_header: + trace_state = TraceState.from_header([trace_state_header]) + # Check if trace_state already contains sw KV + if SW_TRACESTATE_KEY in trace_state.keys(): + # If so, modify current span_id and trace_flags, and move to beginning of list + logger.debug("Updating trace state for injection with {}".format(sw_value)) + trace_state = trace_state.update(SW_TRACESTATE_KEY, sw_value) + + else: + # If not, add sw KV to beginning of list + logger.debug("Adding KV to trace state for injection with {}".format(sw_value)) + trace_state = trace_state.add(SW_TRACESTATE_KEY, sw_value) + else: + logger.debug("Creating new trace state for injection with {}".format(sw_value)) + trace_state = TraceState([(SW_TRACESTATE_KEY, sw_value)]) + + setter.set( + carrier, self._TRACESTATE_HEADER_NAME, trace_state.to_header() + ) + + @property + def fields(self) -> typing.Set[str]: + """Returns a set with the fields set in `inject`""" + return { + self._TRACESTATE_HEADER_NAME + } diff --git a/opentelemetry_distro_solarwinds/response_propagator.py b/opentelemetry_distro_solarwinds/response_propagator.py new file mode 100644 index 00000000..39d5a149 --- /dev/null +++ b/opentelemetry_distro_solarwinds/response_propagator.py @@ -0,0 +1,83 @@ +import logging +import typing + +from opentelemetry import trace +from opentelemetry.context.context import Context +from opentelemetry.instrumentation.propagators import ResponsePropagator +from opentelemetry.propagators import textmap +from opentelemetry.trace.span import TraceState + +from opentelemetry_distro_solarwinds import ( + COMMA, + COMMA_W3C_SANITIZED, + EQUALS, + EQUALS_W3C_SANITIZED +) +from opentelemetry_distro_solarwinds.traceoptions import XTraceOptions +from opentelemetry_distro_solarwinds.w3c_transformer import W3CTransformer + +logger = logging.getLogger(__file__) + +class SolarWindsTraceResponsePropagator(ResponsePropagator): + """Propagator that injects SW values into HTTP responses""" + _HTTP_HEADER_ACCESS_CONTROL_EXPOSE_HEADERS = "Access-Control-Expose-Headers" + _XTRACE_HEADER_NAME = "x-trace" + _XTRACEOPTIONS_RESPONSE_HEADER_NAME = "x-trace-options-response" + + def inject( + self, + carrier: textmap.CarrierT, + context: typing.Optional[Context] = None, + setter: textmap.Setter = textmap.default_setter, + ) -> None: + """Injects x-trace and options-response into the HTTP response carrier.""" + span = trace.get_current_span(context) + span_context = span.get_span_context() + if span_context == trace.INVALID_SPAN_CONTEXT: + return + + x_trace = W3CTransformer.traceparent_from_context(span_context) + setter.set( + carrier, + self._XTRACE_HEADER_NAME, + x_trace, + ) + exposed_headers = [self._XTRACE_HEADER_NAME] + + xtraceoptions_response = self.recover_response_from_tracestate( + span_context.trace_state + ) + if xtraceoptions_response: + exposed_headers.append("{}".format( + self._XTRACEOPTIONS_RESPONSE_HEADER_NAME + )) + setter.set( + carrier, + self._XTRACEOPTIONS_RESPONSE_HEADER_NAME, + xtraceoptions_response, + ) + setter.set( + carrier, + self._HTTP_HEADER_ACCESS_CONTROL_EXPOSE_HEADERS, + ",".join(exposed_headers), + ) + + def recover_response_from_tracestate( + self, + tracestate: TraceState + ) -> str: + """Use tracestate to recover xtraceoptions response by + converting delimiters: + EQUALS_W3C_SANITIZED becomes EQUALS + COMMA_W3C_SANITIZED becomes COMMA + """ + sanitized = tracestate.get( + XTraceOptions.get_sw_xtraceoptions_response_key(), + None + ) + if not sanitized: + return + return sanitized.replace( + EQUALS_W3C_SANITIZED, + EQUALS + ).replace(COMMA_W3C_SANITIZED, COMMA) diff --git a/opentelemetry_distro_solarwinds/sampler.py b/opentelemetry_distro_solarwinds/sampler.py index 5f14525e..378e2ab4 100644 --- a/opentelemetry_distro_solarwinds/sampler.py +++ b/opentelemetry_distro_solarwinds/sampler.py @@ -4,26 +4,371 @@ """ import logging -from typing import Optional, Sequence +from types import MappingProxyType +from typing import Optional, Sequence, Dict +from opentelemetry.context.context import Context as OtelContext from opentelemetry.sdk.trace.sampling import (Decision, ParentBased, Sampler, SamplingResult) -from opentelemetry.trace import SpanKind +from opentelemetry.trace import Link, SpanKind, get_current_span +from opentelemetry.trace.span import SpanContext, TraceState from opentelemetry.util.types import Attributes +from opentelemetry_distro_solarwinds import ( + COMMA_W3C_SANITIZED, + EQUALS_W3C_SANITIZED, + SW_TRACESTATE_KEY +) from opentelemetry_distro_solarwinds.extension.oboe import Context +from opentelemetry_distro_solarwinds.traceoptions import XTraceOptions +from opentelemetry_distro_solarwinds.w3c_transformer import W3CTransformer logger = logging.getLogger(__name__) -class _AoSampler(Sampler): - """AppOptics Custom sampler which obeys configuration options provided by the AO Backend.""" +class _SwSampler(Sampler): + """SolarWinds custom opentelemetry sampler which obeys configuration options provided by the NH/AO Backend.""" + + _INTERNAL_BUCKET_CAPACITY = "BucketCapacity" + _INTERNAL_BUCKET_RATE = "BucketRate" + _INTERNAL_SAMPLE_RATE = "SampleRate" + _INTERNAL_SAMPLE_SOURCE = "SampleSource" + _INTERNAL_SW_KEYS = "SWKeys" + _LIBOBOE_CONTINUED = -1 + _SW_TRACESTATE_CAPTURE_KEY = "sw.w3c.tracestate" + _SW_TRACESTATE_ROOT_KEY = "sw.tracestate_parent_id" + _XTRACEOPTIONS_RESP_AUTH = "auth" + _XTRACEOPTIONS_RESP_IGNORED = "ignored" + _XTRACEOPTIONS_RESP_TRIGGER_IGNORED = "ignored" + _XTRACEOPTIONS_RESP_TRIGGER_NOT_REQUESTED = "not-requested" + _XTRACEOPTIONS_RESP_TRIGGER_TRACE = "trigger-trace" + def get_description(self) -> str: - return "AppOptics custom sampler" + return "SolarWinds custom opentelemetry sampler" + + def calculate_liboboe_decision( + self, + parent_span_context: SpanContext, + xtraceoptions: Optional[XTraceOptions] = None, + ) -> Dict: + """Calculates oboe trace decision based on parent span context.""" + tracestring = None + if parent_span_context.is_valid and parent_span_context.is_remote: + tracestring = W3CTransformer.traceparent_from_context( + parent_span_context + ) + sw_member_value = parent_span_context.trace_state.get(SW_TRACESTATE_KEY) + + # TODO: local config --> enable/disable tracing, sample_rate, tt mode + tracing_mode = -1 + sample_rate = -1 + trigger_tracing_mode_disabled = -1 + + options = None + trigger_trace = 0 + signature = None + timestamp = None + if xtraceoptions: + options = xtraceoptions.options_header + trigger_trace = xtraceoptions.trigger_trace + signature = xtraceoptions.signature + timestamp = xtraceoptions.ts + + logger.debug( + "Creating new oboe decision with " + "tracestring: {}, " + "sw_member_value: {}, " + "tracing_mode: {}, " + "sample_rate: {}, " + "trigger_trace: {}, " + "trigger_tracing_mode_disabled: {}, " + "options: {}, " + "signature: {}, " + "timestamp: {}".format( + tracestring, + sw_member_value, + tracing_mode, + sample_rate, + trigger_trace, + trigger_tracing_mode_disabled, + options, + signature, + timestamp + )) + do_metrics, do_sample, \ + rate, source, bucket_rate, bucket_cap, decision_type, \ + auth, status_msg, auth_msg, status = Context.getDecisions( + tracestring, + sw_member_value, + tracing_mode, + sample_rate, + trigger_trace, + trigger_tracing_mode_disabled, + options, + signature, + timestamp + ) + decision = { + "do_metrics": do_metrics, + "do_sample": do_sample, + "rate": rate, + "source": source, + "bucket_rate": bucket_rate, + "bucket_cap": bucket_cap, + "decision_type": decision_type, + "auth": auth, + "status_msg": status_msg, + "auth_msg": auth_msg, + "status": status, + } + logger.debug("Got liboboe decision outputs: {}".format(decision)) + return decision + + def is_decision_continued( + self, + liboboe_decision: Dict + ) -> bool: + """Returns True if liboboe decision is a continued one, else False""" + for val in [ + liboboe_decision["rate"], + liboboe_decision["source"], + liboboe_decision["bucket_rate"], + liboboe_decision["bucket_cap"] + ]: + if val != self._LIBOBOE_CONTINUED: + return False + return True + + def otel_decision_from_liboboe( + self, + liboboe_decision: Dict + ) -> Decision: + """Formats OTel decision from liboboe decision""" + decision = Decision.DROP + if liboboe_decision["do_metrics"]: + # TODO: need to eck what record only actually means and how metrics work in OTel + decision = Decision.RECORD_ONLY + if liboboe_decision["do_sample"]: + decision = Decision.RECORD_AND_SAMPLE + logger.debug("OTel decision created: {}".format(decision)) + return decision + + def create_xtraceoptions_response_value( + self, + decision: Dict, + parent_span_context: SpanContext, + xtraceoptions: XTraceOptions + ) -> str: + """Use decision and xtraceoptions to create sanitized xtraceoptions + response kv with W3C tracestate-allowed delimiters: + EQUALS_W3C_SANITIZED instead of EQUALS + COMMA_W3C_SANITIZED instead of COMMA + """ + response = [] + + if xtraceoptions.signature: + response.append(EQUALS_W3C_SANITIZED.join([ + self._XTRACEOPTIONS_RESP_AUTH, + decision["auth_msg"] + ])) + + if not decision["auth"] or decision["auth"] < 1: + trigger_msg = "" + if xtraceoptions.trigger_trace: + # If a traceparent header was provided then oboe does not generate the message + tracestring = None + if parent_span_context.is_valid and parent_span_context.is_remote: + tracestring = W3CTransformer.traceparent_from_context( + parent_span_context + ) + + if tracestring and decision["decision_type"] == 0: + trigger_msg = self._XTRACEOPTIONS_RESP_TRIGGER_IGNORED + else: + trigger_msg = decision["status_msg"] + else: + trigger_msg = self.XTRACEOPTIONS_TRIGGER_NOT_REQUESTED + response.append(EQUALS_W3C_SANITIZED.join([ + self._XTRACEOPTIONS_RESP_TRIGGER_TRACE, + trigger_msg + ])) + + if xtraceoptions.ignored: + response.append( + EQUALS_W3C_SANITIZED.join([ + self._XTRACEOPTIONS_RESP_IGNORED, + (COMMA_W3C_SANITIZED.join(decision["ignored"])) + ]) + ) + + return ";".join(response) + + def create_new_trace_state( + self, + decision: Dict, + parent_span_context: SpanContext, + xtraceoptions: Optional[XTraceOptions] = None, + ) -> TraceState: + """Creates new TraceState based on trace decision, parent span id, + and x-trace-options if provided""" + trace_state = TraceState([( + SW_TRACESTATE_KEY, + W3CTransformer.sw_from_span_and_decision( + parent_span_context.span_id, + W3CTransformer.trace_flags_from_int(decision["do_sample"]) + ) + )]) + if xtraceoptions and xtraceoptions.trigger_trace: + trace_state = trace_state.add( + XTraceOptions.get_sw_xtraceoptions_response_key(), + self.create_xtraceoptions_response_value( + decision, + parent_span_context, + xtraceoptions + ) + ) + logger.debug("Created new trace_state: {}".format(trace_state)) + return trace_state + + def calculate_trace_state( + self, + decision: Dict, + parent_span_context: SpanContext, + xtraceoptions: Optional[XTraceOptions] = None, + ) -> TraceState: + """Calculates trace_state based on parent span context, trace decision, + and x-trace-options if provided -- for non-existent or remote parent + spans only.""" + # No valid parent i.e. root span, or parent is remote + if not parent_span_context.is_valid: + trace_state = self.create_new_trace_state( + decision, + parent_span_context, + xtraceoptions + ) + else: + trace_state = parent_span_context.trace_state + if not trace_state: + # tracestate nonexistent/non-parsable + trace_state = self.create_new_trace_state( + decision, + parent_span_context, + xtraceoptions + ) + else: + # Update trace_state with span_id and sw trace_flags + trace_state = trace_state.update( + SW_TRACESTATE_KEY, + W3CTransformer.sw_from_span_and_decision( + parent_span_context.span_id, + W3CTransformer.trace_flags_from_int(decision["do_sample"]) + ) + ) + # Update trace_state with x-trace-options-response + # Not a propagated header, so always an add + if xtraceoptions and xtraceoptions.trigger_trace: + trace_state = trace_state.add( + XTraceOptions.get_sw_xtraceoptions_response_key(), + self.create_xtraceoptions_response_value( + decision, + parent_span_context, + xtraceoptions + ) + ) + logger.debug("Updated trace_state: {}".format(trace_state)) + return trace_state + + def remove_response_from_sw( + self, + trace_state: TraceState + ) -> TraceState: + """Remove xtraceoptions response from tracestate""" + return trace_state.delete(XTraceOptions.get_sw_xtraceoptions_response_key()) + + def calculate_attributes( + self, + attributes: Attributes, + decision: Dict, + trace_state: TraceState, + parent_span_context: SpanContext, + xtraceoptions: XTraceOptions + ) -> Attributes or None: + """Calculates Attributes or None based on trace decision, trace state, + parent span context, xtraceoptions, and existing attributes.""" + logger.debug("Received attributes: {}".format(attributes)) + # Don't set attributes if not tracing + if self.otel_decision_from_liboboe(decision) == Decision.DROP: + logger.debug("Trace decision is to drop - not setting attributes") + return None + + new_attributes = {} + + # Always (root or is_remote) set _INTERNAL_SW_KEYS if injected + internal_sw_keys = xtraceoptions.sw_keys + if internal_sw_keys: + new_attributes[self._INTERNAL_SW_KEYS] = internal_sw_keys + + # If not a liboboe continued trace, set service entry internal KVs + if not self.is_decision_continued(decision): + new_attributes[self._INTERNAL_BUCKET_CAPACITY] = "{}".format(decision["bucket_cap"]) + new_attributes[self._INTERNAL_BUCKET_RATE] = "{}".format(decision["bucket_rate"]) + new_attributes[self._INTERNAL_SAMPLE_RATE] = decision["rate"] + new_attributes[self._INTERNAL_SAMPLE_SOURCE] = decision["source"] + logger.debug( + "Set attributes with service entry internal KVs: {}".format(new_attributes) + ) + + # Trace's root span has no valid traceparent nor tracestate + # so we can't calculate remaining attributes + if not parent_span_context.is_valid or not trace_state: + logger.debug( + "No valid traceparent or no tracestate - returning attributes: {}" + .format(new_attributes) + ) + + if new_attributes: + # attributes must be immutable for SamplingResult + return MappingProxyType(new_attributes) + else: + return None + + if not attributes: + # _SW_TRACESTATE_ROOT_KEY is set once per trace, if possible + sw_value = parent_span_context.trace_state.get(SW_TRACESTATE_KEY, None) + if sw_value: + new_attributes[self._SW_TRACESTATE_ROOT_KEY]= W3CTransformer.span_id_from_sw(sw_value) + else: + # Copy existing MappingProxyType KV into new_attributes for modification. + # attributes may have other vendor info etc + for k,v in attributes.items(): + new_attributes[k] = v + + tracestate_capture = new_attributes.get(self._SW_TRACESTATE_CAPTURE_KEY, None) + if not tracestate_capture: + trace_state_no_response = self.remove_response_from_sw(trace_state) + else: + # Must retain all potential tracestate pairs for attributes + attr_trace_state = TraceState.from_header([ + tracestate_capture + ]) + new_attr_trace_state = attr_trace_state.update( + SW_TRACESTATE_KEY, + W3CTransformer.sw_from_span_and_decision( + parent_span_context.span_id, + W3CTransformer.trace_flags_from_int(decision["do_sample"]) + ) + ) + trace_state_no_response = self.remove_response_from_sw(new_attr_trace_state) + new_attributes[self._SW_TRACESTATE_CAPTURE_KEY] = trace_state_no_response.to_header() + + logger.debug("Setting attributes: {}".format(new_attributes)) + + # attributes must be immutable for SamplingResult + return MappingProxyType(new_attributes) def should_sample( self, - parent_context: Optional["Context"], + parent_context: Optional[OtelContext], trace_id: int, name: str, kind: SpanKind = None, @@ -31,26 +376,76 @@ def should_sample( links: Sequence["Link"] = None, trace_state: "TraceState" = None, ) -> "SamplingResult": + """ + Calculates SamplingResult based on calculation of new/continued trace + decision, new/updated trace state, and new/updated attributes. + """ + parent_span_context = get_current_span( + parent_context + ).get_span_context() + xtraceoptions = XTraceOptions(parent_context) + + liboboe_decision = self.calculate_liboboe_decision( + parent_span_context, + xtraceoptions + ) + + # Always calculate trace_state for propagation + new_trace_state = self.calculate_trace_state( + liboboe_decision, + parent_span_context, + xtraceoptions + ) + new_attributes = self.calculate_attributes( + attributes, + liboboe_decision, + new_trace_state, + parent_span_context, + xtraceoptions + ) + otel_decision = self.otel_decision_from_liboboe(liboboe_decision) - do_metrics, do_sample, _, _, _, _, _, _, _, _, _ = Context.getDecisions( - None) - decision = Decision.DROP - if do_metrics: - # TODO: need to eck what record only actually means and how metrics work in OTel - decision = Decision.RECORD_ONLY - if do_sample: - decision = Decision.RECORD_AND_SAMPLE return SamplingResult( - decision, - attributes if decision != Decision.DROP else None, - trace_state, + otel_decision, + new_attributes if otel_decision != Decision.DROP else None, + new_trace_state, ) -class ParentBasedAoSampler(ParentBased): +class ParentBasedSwSampler(ParentBased): """ Sampler that respects its parent span's sampling decision, but otherwise - samples according to the configurations from the AO backend. + samples according to the configurations from the NH/AO backend. """ def __init__(self): - super().__init__(root=_AoSampler()) + """ + Uses _SwSampler/liboboe if no parent span. + Uses _SwSampler/liboboe if parent span is_remote. + Uses OTEL defaults if parent span is_local. + """ + super().__init__( + root=_SwSampler(), + remote_parent_sampled=_SwSampler(), + remote_parent_not_sampled=_SwSampler() + ) + + def should_sample( + self, + parent_context: Optional["Context"], + trace_id: int, + name: str, + kind: SpanKind = None, + attributes: Attributes = None, + links: Sequence["Link"] = None, + trace_state: "TraceState" = None + ) -> "SamplingResult": + + return super().should_sample( + parent_context, + trace_id, + name, + kind, + attributes, + links, + trace_state + ) diff --git a/opentelemetry_distro_solarwinds/traceoptions.py b/opentelemetry_distro_solarwinds/traceoptions.py new file mode 100644 index 00000000..6d04a101 --- /dev/null +++ b/opentelemetry_distro_solarwinds/traceoptions.py @@ -0,0 +1,98 @@ +import logging +import re +import typing + +from opentelemetry.context.context import Context + +from opentelemetry_distro_solarwinds import ( + OTEL_CONTEXT_SW_OPTIONS_KEY, + OTEL_CONTEXT_SW_SIGNATURE_KEY +) + +logger = logging.getLogger(__file__) + +class XTraceOptions(): + """Formats X-Trace-Options and signature for trigger tracing""" + + _SW_XTRACEOPTIONS_RESPONSE_KEY = "xtrace_options_response" + _XTRACEOPTIONS_CUSTOM = ("^custom-[^\s]*$") + _XTRACEOPTIONS_CUSTOM_RE = re.compile(_XTRACEOPTIONS_CUSTOM) + + _XTRACEOPTIONS_HEADER_KEY_SW_KEYS = "sw-keys" + _XTRACEOPTIONS_HEADER_KEY_TRIGGER_TRACE = "trigger-trace" + _XTRACEOPTIONS_HEADER_KEY_TS = "ts" + + def __init__(self, + context: typing.Optional[Context] = None, + ): + """ + Args: + context: OTel context that may contain OTEL_CONTEXT_SW_OPTIONS_KEY,OTEL_CONTEXT_SW_SIGNATURE_KEY + """ + self.ignored = [] + self.options_header = "" + self.signature = None + self.sw_keys = "" + self.trigger_trace = 0 + self.ts = 0 + + if not context: + return + options_header = context.get(OTEL_CONTEXT_SW_OPTIONS_KEY, None) + if not options_header: + return + + # store original header for sample decision later + self.options_header = options_header + + traceoptions = re.split(r";+", options_header) + for option in traceoptions: + # KVs (e.g. sw-keys or custom-key1) are assigned by equals + option_kv = option.split("=", 2) + if not option_kv[0]: + continue + + option_key = option_kv[0].strip() + if option_key == self._XTRACEOPTIONS_HEADER_KEY_TRIGGER_TRACE: + if len(option_kv) > 1: + logger.debug("trigger-trace must be standalone flag. Ignoring.") + self.ignored.append( + self._XTRACEOPTIONS_HEADER_KEY_TRIGGER_TRACE + ) + else: + self.trigger_trace = 1 + + elif option_key == self._XTRACEOPTIONS_HEADER_KEY_SW_KEYS: + self.sw_keys = option_kv[1].strip() + + elif re.match(self._XTRACEOPTIONS_CUSTOM_RE, option_key): + # custom keys are valid but do not need parsing + pass + + elif option_key == self._XTRACEOPTIONS_HEADER_KEY_TS: + try: + self.ts = int(option_kv[1]) + except ValueError as e: + logger.debug("ts must be base 10 int. Ignoring.") + self.ignore.append(self._XTRACEOPTIONS_HEADER_KEY_TS) + + else: + logger.debug( + "{} is not a recognized trace option. Ignoring".format( + option_key + )) + self.ignored.append(option_key) + + if self.ignored: + logger.debug( + "Some x-trace-options were ignored: {}".format( + ", ".join(self.ignored) + )) + + options_signature = context.get(OTEL_CONTEXT_SW_SIGNATURE_KEY, None) + if options_signature: + self.signature = options_signature + + @classmethod + def get_sw_xtraceoptions_response_key(cls) -> str: + return cls._SW_XTRACEOPTIONS_RESPONSE_KEY diff --git a/opentelemetry_distro_solarwinds/w3c_transformer.py b/opentelemetry_distro_solarwinds/w3c_transformer.py new file mode 100644 index 00000000..9859ac17 --- /dev/null +++ b/opentelemetry_distro_solarwinds/w3c_transformer.py @@ -0,0 +1,67 @@ +"""Provides functionality to transform OpenTelemetry Data to SolarWinds AppOptics data. +""" + +import logging +from opentelemetry.context.context import Context + +logger = logging.getLogger(__file__) + +class W3CTransformer(): + """Transforms inputs to W3C-compliant data for SolarWinds context propagation""" + + _DECISION = "{}" + _SPAN_ID_HEX = "{:016x}" + _TRACE_FLAGS_HEX = "{:02x}" + _TRACE_ID_HEX = "{:032x}" + _VERSION = "00" + + @classmethod + def span_id_from_int(cls, span_id: int) -> str: + """Formats span ID as 16-byte hexadecimal string""" + return cls._SPAN_ID_HEX.format(span_id) + + @classmethod + def span_id_from_sw(cls, sw: str) -> str: + """Formats span ID from sw tracestate value""" + return sw.split("-")[0] + + @classmethod + def trace_flags_from_int(cls, trace_flags: int) -> str: + """Formats trace flags as 8-bit field""" + return cls._TRACE_FLAGS_HEX.format(trace_flags) + + @classmethod + def traceparent_from_context(cls, span_context: Context) -> str: + """Generates a liboboe W3C compatible trace_context from + provided OTel span context.""" + template = "-".join([ + cls._VERSION, + cls._TRACE_ID_HEX, + cls._SPAN_ID_HEX, + cls._TRACE_FLAGS_HEX + ]) + xtr = template.format( + span_context.trace_id, + span_context.span_id, + span_context.trace_flags + ) + logger.debug("Generated traceparent {} from {}".format(xtr, span_context)) + return xtr + + @classmethod + def sw_from_context(cls, span_context: Context) -> str: + """Formats tracestate sw value from SpanContext as 16-byte span_id + with 8-bit trace_flags. + + Example: 1a2b3c4d5e6f7g8h-01""" + sw = "-".join([cls._SPAN_ID_HEX, cls._TRACE_FLAGS_HEX]) + return sw.format(span_context.span_id, span_context.trace_flags) + + @classmethod + def sw_from_span_and_decision(cls, span_id: int, decision: str) -> str: + """Formats tracestate sw value from span_id and liboboe decision + as 16-byte span_id with 8-bit trace_flags. + + Example: 1a2b3c4d5e6f7g8h-01""" + sw = "-".join([cls._SPAN_ID_HEX, cls._DECISION]) + return sw.format(span_id, decision) diff --git a/setup.cfg b/setup.cfg index 430717b5..50733850 100644 --- a/setup.cfg +++ b/setup.cfg @@ -16,12 +16,20 @@ classifiers = Programming Language :: Python :: 3.8 [options] -python_requires = >=3.4 +python_requires = >=3.6 install_requires = - opentelemetry-api == 1.3.0 - opentelemetry-sdk == 1.3.0 - opentelemetry-instrumentation == 0.22b0 + opentelemetry-api == 1.11.0 + opentelemetry-sdk == 1.11.0 + opentelemetry-instrumentation == 0.30b0 [options.entry_points] opentelemetry_distro = solarwinds_distro = opentelemetry_distro_solarwinds.distro:SolarWindsDistro +opentelemetry_configurator = + solarwinds_configurator = opentelemetry_distro_solarwinds.configurator:SolarWindsConfigurator +opentelemetry_propagator = + solarwinds_propagator = opentelemetry_distro_solarwinds.propagator:SolarWindsPropagator +opentelemetry_traces_exporter = + solarwinds_exporter = opentelemetry_distro_solarwinds.exporter:SolarWindsSpanExporter +opentelemetry_traces_sampler = + solarwinds_sampler = opentelemetry_distro_solarwinds.sampler:ParentBasedSwSampler \ No newline at end of file diff --git a/setup.py b/setup.py index b8b3ceb5..7ef9e778 100644 --- a/setup.py +++ b/setup.py @@ -52,7 +52,7 @@ def link_oboe_lib(src_lib): if os.path.exists(dst): # if the destination library files exist already, they need to be deleted, otherwise linking will fail os.remove(dst) - log.info("Removed %s" % dst) + log.info("Removed {}".format(dst)) os.symlink(src_lib, dst) log.info("Created new link at {} to {}".format(dst, src_lib)) except Exception as ecp: