Skip to content

Commit

Permalink
Merge pull request #91 from frigus02/otel027
Browse files Browse the repository at this point in the history
Update to OpenTelemetry 0.27
  • Loading branch information
frigus02 authored Nov 12, 2024
2 parents 6973d24 + 3a23927 commit 4921c34
Show file tree
Hide file tree
Showing 13 changed files with 90 additions and 94 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

## [Unreleased]

- Upgrade `opentelemetry` dependencies to `v0.27`.
- Upgrade `thiserror` dependency to `v2`.

## [0.36.0] - 2024-10-15

- Upgrade `opentelemetry` dependencies to `v0.26`. Thanks, [sezna@](https://github.com/sezna).
Expand Down
18 changes: 9 additions & 9 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,29 +42,29 @@ async-trait = "0.1"
bytes = "1"
chrono = "0.4"
flate2 = "1"
futures-util = { version = "0.3", default-features = false, optional = true }
http = "1"
once_cell = "1"
futures-util = { version = "0.3", default-features = false, optional = true }
opentelemetry = "0.26"
opentelemetry_sdk = "0.26"
opentelemetry-http = "0.26"
opentelemetry-semantic-conventions = { version = "0.26", features = ["semconv_experimental"] }
opentelemetry = "0.27"
opentelemetry-http = "0.27"
opentelemetry-semantic-conventions = { version = "0.27", features = ["semconv_experimental"] }
opentelemetry_sdk = "0.27"
reqwest = { version = "0.12", default-features = false, features = ["blocking"], optional = true }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
serde_repr = "0.1"
thiserror = "1"
sysinfo = { version = "0.30", optional = true }
thiserror = "2"

[dev-dependencies]
async-std = { version = "1.13.0", features = ["attributes"] }
doc-comment = "0.3.3"
env_logger = "0.11.3"
insta = "1.39.0"
log = { version = "0.4", features = ["kv", "kv_sval"] }
opentelemetry_sdk = { version = "0.26", features = ["rt-async-std", "rt-tokio", "rt-tokio-current-thread", "logs_level_enabled"] }
opentelemetry-http = { version = "0.26", features = ["reqwest"] }
opentelemetry-appender-log = { version = "0.26", features = ["with-serde"] }
opentelemetry_sdk = { version = "0.27", features = ["rt-async-std", "rt-tokio", "rt-tokio-current-thread", "spec_unstable_logs_enabled"] }
opentelemetry-http = { version = "0.27", features = ["reqwest"] }
opentelemetry-appender-log = { version = "0.27", features = ["with-serde"] }
rand = "0.8.5"
regex = "1.10.5"
reqwest = { version = "0.12", features = ["blocking"] }
Expand Down
4 changes: 2 additions & 2 deletions examples/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ async fn main() -> Result<(), Box<dyn Error>> {
&[KeyValue::new("state", "idle"), KeyValue::new("cpu", 0)],
)
})
.init();
.build();

// Recorder
let server_duration = meter
.u64_histogram("http.server.duration")
.with_unit("milliseconds")
.init();
.build();
let mut rng = thread_rng();
for _ in 1..10 {
server_duration.record(
Expand Down
1 change: 1 addition & 0 deletions src/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ impl AttrValue for AnyValue {
s.push('}');
s.into()
}
_ => format!("{:?}", self).into(),
}
}
}
Expand Down
35 changes: 22 additions & 13 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@
//!
//! // Record value
//! let meter = global::meter("example");
//! let histogram = meter.f64_histogram("pi").init();
//! let histogram = meter.f64_histogram("pi").build();
//! histogram.record(3.14, &[]);
//!
//! // Simulate work, during which metrics will periodically be reported.
Expand Down Expand Up @@ -372,10 +372,13 @@ mod uploader_quick_pulse;
use connection_string::DEFAULT_LIVE_ENDPOINT;
use connection_string::{ConnectionString, DEFAULT_BREEZE_ENDPOINT};
pub use models::context_tag_keys::attrs;
use opentelemetry::trace::ExportError as TraceExportError;
#[cfg(feature = "trace")]
use opentelemetry::InstrumentationScope;
#[cfg(feature = "trace")]
use opentelemetry::{global, trace::TracerProvider as _, KeyValue, Value};
pub use opentelemetry_http::HttpClient;
use opentelemetry_sdk::export::ExportError;
use opentelemetry_sdk::export::ExportError as SdkExportError;
#[cfg(any(feature = "trace", feature = "logs"))]
use opentelemetry_sdk::Resource;
#[cfg(feature = "trace")]
Expand Down Expand Up @@ -701,11 +704,7 @@ where
/// that.
pub fn install_simple(self) -> Tracer {
let trace_provider = self.build_simple();
let tracer = trace_provider
.tracer_builder("opentelemetry-application-insights")
.with_version(env!("CARGO_PKG_VERSION"))
.with_schema_url(semcov::SCHEMA_URL)
.build();
let tracer = trace_provider.tracer_with_scope(scope());
let _previous_provider = global::set_tracer_provider(trace_provider);
tracer
}
Expand All @@ -716,11 +715,7 @@ where
/// that.
pub fn install_batch<R: RuntimeChannel>(self, runtime: R) -> Tracer {
let trace_provider = self.build_batch(runtime);
let tracer = trace_provider
.tracer_builder("opentelemetry-application-insights")
.with_version(env!("CARGO_PKG_VERSION"))
.with_schema_url(semcov::SCHEMA_URL)
.build();
let tracer = trace_provider.tracer_with_scope(scope());
let _previous_provider = global::set_tracer_provider(trace_provider);
tracer
}
Expand Down Expand Up @@ -818,6 +813,14 @@ fn append_v2_track(uri: impl ToString) -> Result<http::Uri, http::uri::InvalidUr
uploader::append_path(uri, "v2/track")
}

#[cfg(feature = "trace")]
fn scope() -> InstrumentationScope {
InstrumentationScope::builder("opentelemetry-application-insights")
.with_version(env!("CARGO_PKG_VERSION"))
.with_schema_url(semcov::SCHEMA_URL)
.build()
}

/// Errors that occurred during span export.
#[derive(thiserror::Error, Debug)]
#[non_exhaustive]
Expand Down Expand Up @@ -867,7 +870,13 @@ pub enum Error {
QuickPulseShutdown(opentelemetry_sdk::runtime::TrySendError),
}

impl ExportError for Error {
impl SdkExportError for Error {
fn exporter_name(&self) -> &'static str {
"application-insights"
}
}

impl TraceExportError for Error {
fn exporter_name(&self) -> &'static str {
"application-insights"
}
Expand Down
11 changes: 4 additions & 7 deletions src/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,11 @@ use crate::{
Exporter,
};
use async_trait::async_trait;
use opentelemetry::{
logs::{LogResult, Severity},
InstrumentationLibrary,
};
use opentelemetry::{logs::Severity, InstrumentationScope};
use opentelemetry_http::HttpClient;
use opentelemetry_sdk::{
export::logs::{LogBatch, LogExporter},
logs::LogRecord,
logs::{LogRecord, LogResult},
Resource,
};
use opentelemetry_semantic_conventions as semcov;
Expand All @@ -30,7 +27,7 @@ fn is_exception(record: &LogRecord) -> bool {
impl<C> Exporter<C> {
fn create_envelope_for_log(
&self,
(record, instrumentation_lib): (&LogRecord, &InstrumentationLibrary),
(record, instrumentation_scope): (&LogRecord, &InstrumentationScope),
) -> Envelope {
let (data, name) = if is_exception(record) {
(
Expand All @@ -57,7 +54,7 @@ impl<C> Exporter<C> {
i_key: Some(self.instrumentation_key.clone().into()),
tags: Some(get_tags_for_log(
record,
instrumentation_lib,
instrumentation_scope,
&self.resource,
)),
data: Some(data),
Expand Down
59 changes: 23 additions & 36 deletions src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,47 +5,22 @@ use crate::{
Exporter,
};
use async_trait::async_trait;
use opentelemetry::{
global,
metrics::{MetricsError, Result as MetricsResult},
KeyValue,
};
use opentelemetry::{otel_warn, KeyValue};
use opentelemetry_http::HttpClient;
use opentelemetry_sdk::metrics::{
data::{ExponentialHistogram, Gauge, Histogram, Metric, ResourceMetrics, Sum, Temporality},
exporter::PushMetricsExporter,
reader::TemporalitySelector,
InstrumentKind,
data::{ExponentialHistogram, Gauge, Histogram, Metric, ResourceMetrics, Sum},
exporter::PushMetricExporter,
MetricResult, Temporality,
};
use std::{convert::TryInto, sync::Arc, time::SystemTime};

#[cfg_attr(docsrs, doc(cfg(feature = "metrics")))]
impl<C> TemporalitySelector for Exporter<C>
where
C: Send + Sync,
{
fn temporality(&self, kind: InstrumentKind) -> Temporality {
// See https://github.com/frigus02/opentelemetry-application-insights/issues/74#issuecomment-2108488385
match kind {
InstrumentKind::Counter
| InstrumentKind::Histogram
| InstrumentKind::ObservableCounter
| InstrumentKind::Gauge
| InstrumentKind::ObservableGauge => Temporality::Delta,
InstrumentKind::UpDownCounter | InstrumentKind::ObservableUpDownCounter => {
Temporality::Cumulative
}
}
}
}

#[cfg_attr(docsrs, doc(cfg(feature = "metrics")))]
#[async_trait]
impl<C> PushMetricsExporter for Exporter<C>
impl<C> PushMetricExporter for Exporter<C>
where
C: HttpClient + 'static,
{
async fn export(&self, metrics: &mut ResourceMetrics) -> MetricsResult<()> {
async fn export(&self, metrics: &mut ResourceMetrics) -> MetricResult<()> {
let client = Arc::clone(&self.client);
let endpoint = Arc::clone(&self.endpoint);

Expand All @@ -62,8 +37,7 @@ where
.chain(
scope_metrics
.scope
.attributes
.iter()
.attributes()
.map(|kv| (&kv.key, &kv.value)),
)
.chain(data.attrs.iter().map(|kv| (&kv.key, &kv.value)))
Expand All @@ -89,13 +63,26 @@ where
Ok(())
}

async fn force_flush(&self) -> MetricsResult<()> {
async fn force_flush(&self) -> MetricResult<()> {
Ok(())
}

fn shutdown(&self) -> MetricsResult<()> {
fn shutdown(&self) -> MetricResult<()> {
Ok(())
}

fn temporality(&self) -> Temporality {
// Application Insights only supports Delta temporality as defined in the spec:
//
// > Choose Delta aggregation temporality for Counter, Asynchronous Counter and Histogram
// > instrument kinds, choose Cumulative aggregation for UpDownCounter and Asynchronous
// > UpDownCounter instrument kinds.
//
// See:
// - https://github.com/open-telemetry/opentelemetry-specification/blob/58bfe48eabe887545198d66c43f44071b822373f/specification/metrics/sdk_exporters/otlp.md?plain=1#L46-L47
// - https://github.com/frigus02/opentelemetry-application-insights/issues/74#issuecomment-2108488385
Temporality::Delta
}
}

struct EnvelopeData {
Expand Down Expand Up @@ -153,7 +140,7 @@ fn map_metric(metric: &Metric) -> Vec<EnvelopeData> {
} else if let Some(sum) = data.downcast_ref::<Sum<f64>>() {
map_sum(metric, sum)
} else {
global::handle_error(MetricsError::Other("unknown aggregator".into()));
otel_warn!(name: "ApplicationInsights.ExportMetrics.UnknownAggregator");
Vec::new()
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/quick_pulse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ impl<R: RuntimeChannel> SpanProcessor for QuickPulseManager<R> {
impl<R: RuntimeChannel> Drop for QuickPulseManager<R> {
fn drop(&mut self) {
if let Err(err) = self.shutdown() {
opentelemetry::global::handle_error(err);
opentelemetry::otel_warn!(name: "ApplicationInsights.LiveMetrics.ShutdownFailed", error = err);
}
}
}
Expand Down
19 changes: 9 additions & 10 deletions src/tags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
use opentelemetry::trace::{SpanId, SpanKind};
#[cfg(feature = "metrics")]
use opentelemetry::KeyValue;
use opentelemetry::{InstrumentationLibrary, Key};
use opentelemetry::{InstrumentationScope, Key};
#[cfg(feature = "trace")]
use opentelemetry_sdk::export::trace::SpanData;
#[cfg(feature = "logs")]
Expand All @@ -18,7 +18,7 @@ use std::collections::HashMap;
#[cfg(feature = "trace")]
pub(crate) fn get_tags_for_span(span: &SpanData, resource: &Resource) -> Tags {
let mut tags = Tags::new();
build_tags_from_resource_attrs(&mut tags, resource, &span.instrumentation_lib);
build_tags_from_resource_attrs(&mut tags, resource, &span.instrumentation_scope);

let attrs_map = build_tags_from_attrs(
&mut tags,
Expand Down Expand Up @@ -67,7 +67,7 @@ pub(crate) fn get_tags_for_span(span: &SpanData, resource: &Resource) -> Tags {
#[cfg(feature = "trace")]
pub(crate) fn get_tags_for_event(span: &SpanData, resource: &Resource) -> Tags {
let mut tags = Tags::new();
build_tags_from_resource_attrs(&mut tags, resource, &span.instrumentation_lib);
build_tags_from_resource_attrs(&mut tags, resource, &span.instrumentation_scope);

tags.insert(tags::OPERATION_ID, span.span_context.trace_id().to_string());
tags.insert(
Expand All @@ -80,7 +80,7 @@ pub(crate) fn get_tags_for_event(span: &SpanData, resource: &Resource) -> Tags {
#[cfg(feature = "metrics")]
pub(crate) fn get_tags_for_metric(
resource: &Resource,
scope: &InstrumentationLibrary,
scope: &InstrumentationScope,
attrs: &[KeyValue],
) -> Tags {
let mut tags = Tags::new();
Expand All @@ -97,11 +97,11 @@ pub(crate) fn get_tags_for_metric(
#[cfg(feature = "logs")]
pub(crate) fn get_tags_for_log(
record: &LogRecord,
instrumentation_lib: &InstrumentationLibrary,
instrumentation_scope: &InstrumentationScope,
resource: &Resource,
) -> Tags {
let mut tags = Tags::new();
build_tags_from_resource_attrs(&mut tags, resource, instrumentation_lib);
build_tags_from_resource_attrs(&mut tags, resource, instrumentation_scope);

build_tags_from_attrs(
&mut tags,
Expand Down Expand Up @@ -151,15 +151,14 @@ where
fn build_tags_from_resource_attrs(
tags: &mut Tags,
resource: &Resource,
instrumentation_lib: &InstrumentationLibrary,
instrumentation_scope: &InstrumentationScope,
) {
let attrs = resource
.iter()
.map(|(k, v)| (k, v as &dyn AttrValue))
.chain(
instrumentation_lib
.attributes
.iter()
instrumentation_scope
.attributes()
.map(|kv| (&kv.key, &kv.value as &dyn AttrValue)),
);
let attrs_map = build_tags_from_attrs(tags, attrs);
Expand Down
Loading

0 comments on commit 4921c34

Please sign in to comment.