From 9b222f2af143059c82326cfd926b5fb58db016c6 Mon Sep 17 00:00:00 2001 From: WenTao Ou Date: Mon, 20 Mar 2023 13:29:21 +0800 Subject: [PATCH 1/6] Add `ForceFlush` for all `LogRecordExporter`s and `SpanExporter`s. (#2000) --- CHANGELOG.md | 8 +++ examples/otlp/grpc_log_main.cc | 21 ++++++ examples/otlp/grpc_main.cc | 12 ++++ examples/otlp/http_log_main.cc | 53 ++++++++++++-- examples/otlp/http_main.cc | 12 ++++ .../elasticsearch/es_log_record_exporter.h | 25 +++++++ .../src/es_log_record_exporter.cc | 70 +++++++++++++++++-- exporters/etw/CMakeLists.txt | 8 ++- exporters/jaeger/BUILD | 1 + exporters/jaeger/CMakeLists.txt | 3 +- .../exporters/jaeger/jaeger_exporter.h | 8 +++ exporters/jaeger/src/jaeger_exporter.cc | 5 ++ .../exporters/ostream/log_record_exporter.h | 8 +++ .../exporters/ostream/span_exporter.h | 8 +++ exporters/ostream/src/log_record_exporter.cc | 6 ++ exporters/ostream/src/span_exporter.cc | 6 ++ .../exporters/otlp/otlp_grpc_exporter.h | 8 +++ .../otlp/otlp_grpc_log_record_exporter.h | 8 +++ .../exporters/otlp/otlp_http_exporter.h | 8 +++ .../otlp/otlp_http_log_record_exporter.h | 8 +++ exporters/otlp/src/otlp_grpc_exporter.cc | 7 ++ .../otlp/src/otlp_grpc_log_record_exporter.cc | 7 ++ exporters/otlp/src/otlp_http_client.cc | 50 +++++++------ exporters/otlp/src/otlp_http_exporter.cc | 5 ++ .../otlp/src/otlp_http_log_record_exporter.cc | 5 ++ .../exporters/zipkin/zipkin_exporter.h | 8 +++ exporters/zipkin/src/zipkin_exporter.cc | 5 ++ .../sdk/logs/batch_log_record_processor.h | 2 + sdk/include/opentelemetry/sdk/logs/exporter.h | 11 ++- .../sdk/trace/batch_span_processor.h | 2 + .../opentelemetry/sdk/trace/exporter.h | 12 +++- .../sdk/trace/simple_processor.h | 11 ++- .../opentelemetry/sdk/trace/tracer_context.h | 2 +- sdk/src/logs/CMakeLists.txt | 1 + sdk/src/logs/batch_log_record_processor.cc | 44 +++++++----- sdk/src/logs/exporter.cc | 27 +++++++ sdk/src/logs/simple_log_record_processor.cc | 6 +- sdk/src/trace/CMakeLists.txt | 1 + sdk/src/trace/batch_span_processor.cc | 44 +++++++----- sdk/src/trace/exporter.cc | 23 ++++++ sdk/src/trace/tracer.cc | 14 +++- sdk/src/trace/tracer_context.cc | 4 +- .../logs/batch_log_record_processor_test.cc | 35 +++++++--- .../logs/simple_log_record_processor_test.cc | 62 +++++++++++++--- sdk/test/trace/batch_span_processor_test.cc | 57 ++++++++++----- sdk/test/trace/simple_processor_test.cc | 65 ++++++++++++++++- 46 files changed, 680 insertions(+), 116 deletions(-) create mode 100644 sdk/src/logs/exporter.cc create mode 100644 sdk/src/trace/exporter.cc diff --git a/CHANGELOG.md b/CHANGELOG.md index 8e47f770e6..0d8e3c7c47 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,8 @@ Increment the: [#2060](https://github.com/open-telemetry/opentelemetry-cpp/pull/2060) * [BUILD] Restore detfault value of `OPENTELEMETRY_INSTALL` to `ON` when it's on top level.[#2062](https://github.com/open-telemetry/opentelemetry-cpp/pull/2062) +* [EXPORTERS]Add `ForceFlush` for `LogRecordExporter` and `SpanExporter` + [#2000](https://github.com/open-telemetry/opentelemetry-cpp/pull/2000) Important changes: @@ -44,6 +46,12 @@ Important changes: * As a result, a behavior change for GRPC SSL is possible, because the endpoint scheme now takes precedence. Please verify configuration settings for the GRPC endpoint. +* [EXPORTERS]Add `ForceFlush` for `LogRecordExporter` and `SpanExporter` + [#2000](https://github.com/open-telemetry/opentelemetry-cpp/pull/2000) + * `LogRecordExporter` and `SpanExporter` add a new virtual function + `ForceFlush`, and if users implement any customized `LogRecordExporter` and + `SpanExporter`, they should also implement this function.There should be no + influence if users only use factory to create exporters. ## [1.8.3] 2023-03-06 diff --git a/examples/otlp/grpc_log_main.cc b/examples/otlp/grpc_log_main.cc index d6d9669a35..e0c85c38c2 100644 --- a/examples/otlp/grpc_log_main.cc +++ b/examples/otlp/grpc_log_main.cc @@ -11,6 +11,11 @@ # include "opentelemetry/sdk/trace/tracer_provider_factory.h" # include "opentelemetry/trace/provider.h" +// sdk::TracerProvider and sdk::LoggerProvider is just used to call ForceFlush and prevent to cancel +// running exportings when destroy and shutdown exporters.It's optional to users. +# include "opentelemetry/sdk/logs/logger_provider.h" +# include "opentelemetry/sdk/trace/tracer_provider.h" + # include # ifdef BAZEL_BUILD @@ -42,6 +47,14 @@ void InitTracer() void CleanupTracer() { + // We call ForceFlush to prevent to cancel running exportings, It's optional. + opentelemetry::nostd::shared_ptr provider = + trace::Provider::GetTracerProvider(); + if (provider) + { + static_cast(provider.get())->ForceFlush(); + } + std::shared_ptr none; trace::Provider::SetTracerProvider(none); } @@ -59,6 +72,14 @@ void InitLogger() void CleanupLogger() { + // We call ForceFlush to prevent to cancel running exportings, It's optional. + opentelemetry::nostd::shared_ptr provider = + logs::Provider::GetLoggerProvider(); + if (provider) + { + static_cast(provider.get())->ForceFlush(); + } + nostd::shared_ptr none; opentelemetry::logs::Provider::SetLoggerProvider(none); } diff --git a/examples/otlp/grpc_main.cc b/examples/otlp/grpc_main.cc index 679f0f0fd4..d7e3b29d6c 100644 --- a/examples/otlp/grpc_main.cc +++ b/examples/otlp/grpc_main.cc @@ -6,6 +6,10 @@ #include "opentelemetry/sdk/trace/tracer_provider_factory.h" #include "opentelemetry/trace/provider.h" +// sdk::TracerProvider is just used to call ForceFlush and prevent to cancel running exportings when +// destroy and shutdown exporters.It's optional to users. +#include "opentelemetry/sdk/trace/tracer_provider.h" + #ifdef BAZEL_BUILD # include "examples/common/foo_library/foo_library.h" #else @@ -32,6 +36,14 @@ void InitTracer() void CleanupTracer() { + // We call ForceFlush to prevent to cancel running exportings, It's optional. + opentelemetry::nostd::shared_ptr provider = + trace::Provider::GetTracerProvider(); + if (provider) + { + static_cast(provider.get())->ForceFlush(); + } + std::shared_ptr none; trace::Provider::SetTracerProvider(none); } diff --git a/examples/otlp/http_log_main.cc b/examples/otlp/http_log_main.cc index faddae58ad..a1171b04e0 100644 --- a/examples/otlp/http_log_main.cc +++ b/examples/otlp/http_log_main.cc @@ -13,6 +13,12 @@ # include "opentelemetry/sdk/trace/tracer_provider_factory.h" # include "opentelemetry/trace/provider.h" +// sdk::TracerProvider and sdk::LoggerProvider is just used to call ForceFlush and prevent to cancel +// running exportings when destroy and shutdown exporters.It's optional to users. +# include "opentelemetry/sdk/logs/logger_provider.h" +# include "opentelemetry/sdk/trace/tracer_provider.h" + +# include # include # ifdef BAZEL_BUILD @@ -32,11 +38,27 @@ namespace internal_log = opentelemetry::sdk::common::internal_log; namespace { -opentelemetry::exporter::otlp::OtlpHttpExporterOptions opts; +opentelemetry::exporter::otlp::OtlpHttpExporterOptions trace_opts; void InitTracer() { + if (trace_opts.url.size() > 9) + { + if (trace_opts.url.substr(trace_opts.url.size() - 8) == "/v1/logs") + { + trace_opts.url = trace_opts.url.substr(0, trace_opts.url.size() - 8) + "/v1/traces"; + } + else if (trace_opts.url.substr(trace_opts.url.size() - 9) == "/v1/logs/") + { + trace_opts.url = trace_opts.url.substr(0, trace_opts.url.size() - 9) + "/v1/traces"; + } + else + { + trace_opts.url = opentelemetry::exporter::otlp::GetOtlpDefaultHttpTracesEndpoint(); + } + } + std::cout << "Using " << trace_opts.url << " to export trace spans." << std::endl; // Create OTLP exporter instance - auto exporter = otlp::OtlpHttpExporterFactory::Create(opts); + auto exporter = otlp::OtlpHttpExporterFactory::Create(trace_opts); auto processor = trace_sdk::SimpleSpanProcessorFactory::Create(std::move(exporter)); std::shared_ptr provider = trace_sdk::TracerProviderFactory::Create(std::move(processor)); @@ -46,6 +68,14 @@ void InitTracer() void CleanupTracer() { + // We call ForceFlush to prevent to cancel running exportings, It's optional. + opentelemetry::nostd::shared_ptr provider = + trace::Provider::GetTracerProvider(); + if (provider) + { + static_cast(provider.get())->ForceFlush(); + } + std::shared_ptr none; trace::Provider::SetTracerProvider(none); } @@ -53,6 +83,7 @@ void CleanupTracer() opentelemetry::exporter::otlp::OtlpHttpLogRecordExporterOptions logger_opts; void InitLogger() { + std::cout << "Using " << logger_opts.url << " to export log records." << std::endl; logger_opts.console_debug = true; // Create OTLP exporter instance auto exporter = otlp::OtlpHttpLogRecordExporterFactory::Create(logger_opts); @@ -65,6 +96,14 @@ void InitLogger() void CleanupLogger() { + // We call ForceFlush to prevent to cancel running exportings, It's optional. + opentelemetry::nostd::shared_ptr provider = + logs::Provider::GetLoggerProvider(); + if (provider) + { + static_cast(provider.get())->ForceFlush(); + } + std::shared_ptr none; opentelemetry::logs::Provider::SetLoggerProvider(none); } @@ -83,12 +122,12 @@ int main(int argc, char *argv[]) { if (argc > 1) { - opts.url = argv[1]; + trace_opts.url = argv[1]; logger_opts.url = argv[1]; if (argc > 2) { - std::string debug = argv[2]; - opts.console_debug = debug != "" && debug != "0" && debug != "no"; + std::string debug = argv[2]; + trace_opts.console_debug = debug != "" && debug != "0" && debug != "no"; } if (argc > 3) @@ -96,13 +135,13 @@ int main(int argc, char *argv[]) std::string binary_mode = argv[3]; if (binary_mode.size() >= 3 && binary_mode.substr(0, 3) == "bin") { - opts.content_type = opentelemetry::exporter::otlp::HttpRequestContentType::kBinary; + trace_opts.content_type = opentelemetry::exporter::otlp::HttpRequestContentType::kBinary; logger_opts.content_type = opentelemetry::exporter::otlp::HttpRequestContentType::kBinary; } } } - if (opts.console_debug) + if (trace_opts.console_debug) { internal_log::GlobalLogHandler::SetLogLevel(internal_log::LogLevel::Debug); } diff --git a/examples/otlp/http_main.cc b/examples/otlp/http_main.cc index 3eb482c6a4..09b8daeee2 100644 --- a/examples/otlp/http_main.cc +++ b/examples/otlp/http_main.cc @@ -8,6 +8,10 @@ #include "opentelemetry/sdk/trace/tracer_provider_factory.h" #include "opentelemetry/trace/provider.h" +// sdk::TracerProvider is just used to call ForceFlush and prevent to cancel running exportings when +// destroy and shutdown exporters.It's optional to users. +#include "opentelemetry/sdk/trace/tracer_provider.h" + #include #ifdef BAZEL_BUILD @@ -38,6 +42,14 @@ void InitTracer() void CleanupTracer() { + // We call ForceFlush to prevent to cancel running exportings, It's optional. + opentelemetry::nostd::shared_ptr provider = + trace::Provider::GetTracerProvider(); + if (provider) + { + static_cast(provider.get())->ForceFlush(); + } + std::shared_ptr none; trace::Provider::SetTracerProvider(none); } diff --git a/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_record_exporter.h b/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_record_exporter.h index 7a52810df0..06b4c3b727 100644 --- a/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_record_exporter.h +++ b/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_record_exporter.h @@ -7,12 +7,17 @@ # include "nlohmann/json.hpp" # include "opentelemetry/common/spin_lock_mutex.h" # include "opentelemetry/ext/http/client/http_client_factory.h" +# include "opentelemetry/nostd/shared_ptr.h" # include "opentelemetry/nostd/type_traits.h" # include "opentelemetry/sdk/logs/exporter.h" # include "opentelemetry/sdk/logs/recordable.h" # include +# include +# include +# include # include +# include OPENTELEMETRY_BEGIN_NAMESPACE namespace exporter @@ -90,6 +95,14 @@ class ElasticsearchLogRecordExporter final : public opentelemetry::sdk::logs::Lo const opentelemetry::nostd::span> &records) noexcept override; + /** + * Force flush the exporter. + * @param timeout an option timeout, default to max. + * @return return true when all data are exported, and false when timeout + */ + bool ForceFlush( + std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override; + /** * Shutdown this exporter. * @param timeout The maximum time to wait for the shutdown method to return @@ -108,6 +121,18 @@ class ElasticsearchLogRecordExporter final : public opentelemetry::sdk::logs::Lo std::shared_ptr http_client_; mutable opentelemetry::common::SpinLockMutex lock_; bool isShutdown() const noexcept; + +# ifdef ENABLE_ASYNC_EXPORT + struct SynchronizationData + { + std::atomic session_counter_; + std::atomic finished_session_counter_; + std::condition_variable force_flush_cv; + std::mutex force_flush_cv_m; + std::recursive_mutex force_flush_m; + }; + nostd::shared_ptr synchronization_data_; +# endif }; } // namespace logs } // namespace exporter diff --git a/exporters/elasticsearch/src/es_log_record_exporter.cc b/exporters/elasticsearch/src/es_log_record_exporter.cc index c440be685c..60a5d8d781 100644 --- a/exporters/elasticsearch/src/es_log_record_exporter.cc +++ b/exporters/elasticsearch/src/es_log_record_exporter.cc @@ -290,9 +290,19 @@ class AsyncResponseHandler : public http_client::EventHandler # endif ElasticsearchLogRecordExporter::ElasticsearchLogRecordExporter() - : options_{ElasticsearchExporterOptions()}, - http_client_{ext::http::client::HttpClientFactory::Create()} -{} + : options_{ElasticsearchExporterOptions()}, http_client_ +{ + ext::http::client::HttpClientFactory::Create() +} +# ifdef ENABLE_ASYNC_EXPORT +, synchronization_data_(new SynchronizationData()) +# endif +{ +# ifdef ENABLE_ASYNC_EXPORT + synchronization_data_->finished_session_counter_.store(0); + synchronization_data_->session_counter_.store(0); +# endif +} ElasticsearchLogRecordExporter::ElasticsearchLogRecordExporter( const ElasticsearchExporterOptions &options) @@ -343,10 +353,12 @@ sdk::common::ExportResult ElasticsearchLogRecordExporter::Export( # ifdef ENABLE_ASYNC_EXPORT // Send the request - std::size_t span_count = records.size(); - auto handler = std::make_shared( + synchronization_data_->session_counter_.fetch_add(1, std::memory_order_release); + std::size_t span_count = records.size(); + auto synchronization_data = synchronization_data_; + auto handler = std::make_shared( session, - [span_count](opentelemetry::sdk::common::ExportResult result) { + [span_count, synchronization_data](opentelemetry::sdk::common::ExportResult result) { if (result != opentelemetry::sdk::common::ExportResult::kSuccess) { OTEL_INTERNAL_LOG_ERROR("[ES Log Exporter] ERROR: Export " @@ -358,6 +370,9 @@ sdk::common::ExportResult ElasticsearchLogRecordExporter::Export( OTEL_INTERNAL_LOG_DEBUG("[ES Log Exporter] Export " << span_count << " trace span(s) success"); } + + synchronization_data->finished_session_counter_.fetch_add(1, std::memory_order_release); + synchronization_data->force_flush_cv.notify_all(); return true; }, options_.console_debug_); @@ -401,6 +416,49 @@ sdk::common::ExportResult ElasticsearchLogRecordExporter::Export( # endif } +bool ElasticsearchLogRecordExporter::ForceFlush(std::chrono::microseconds timeout) noexcept +{ +# ifdef ENABLE_ASYNC_EXPORT + std::lock_guard lock_guard{synchronization_data_->force_flush_m}; + std::size_t running_counter = + synchronization_data_->session_counter_.load(std::memory_order_acquire); + // ASAN will report chrono: runtime error: signed integer overflow: A + B cannot be represented + // in type 'long int' here. So we reset timeout to meet signed long int limit here. + timeout = opentelemetry::common::DurationUtil::AdjustWaitForTimeout( + timeout, std::chrono::microseconds::zero()); + + std::chrono::steady_clock::duration timeout_steady = + std::chrono::duration_cast(timeout); + if (timeout_steady <= std::chrono::steady_clock::duration::zero()) + { + timeout_steady = std::chrono::steady_clock::duration::max(); + } + + std::unique_lock lk_cv(synchronization_data_->force_flush_cv_m); + // Wait for all the sessions to finish + while (timeout_steady > std::chrono::steady_clock::duration::zero()) + { + if (synchronization_data_->finished_session_counter_.load(std::memory_order_acquire) >= + running_counter) + { + break; + } + + std::chrono::steady_clock::time_point start_timepoint = std::chrono::steady_clock::now(); + if (std::cv_status::no_timeout != synchronization_data_->force_flush_cv.wait_for( + lk_cv, std::chrono::seconds{options_.response_timeout_})) + { + break; + } + timeout_steady -= std::chrono::steady_clock::now() - start_timepoint; + } + + return timeout_steady > std::chrono::steady_clock::duration::zero(); +# else + return true; +# endif +} + bool ElasticsearchLogRecordExporter::Shutdown(std::chrono::microseconds /* timeout */) noexcept { const std::lock_guard locked(lock_); diff --git a/exporters/etw/CMakeLists.txt b/exporters/etw/CMakeLists.txt index 15ab6e8ebb..b74a1b2cb5 100644 --- a/exporters/etw/CMakeLists.txt +++ b/exporters/etw/CMakeLists.txt @@ -11,8 +11,12 @@ target_include_directories( set_target_properties(opentelemetry_exporter_etw PROPERTIES EXPORT_NAME etw_exporter) -target_link_libraries(opentelemetry_exporter_etw - INTERFACE opentelemetry_api nlohmann_json::nlohmann_json) +target_link_libraries( + opentelemetry_exporter_etw INTERFACE opentelemetry_api opentelemetry_trace + nlohmann_json::nlohmann_json) +if(WITH_LOGS_PREVIEW) + target_link_libraries(opentelemetry_exporter_etw INTERFACE opentelemetry_logs) +endif() if(nlohmann_json_clone) add_dependencies(opentelemetry_exporter_etw nlohmann_json::nlohmann_json) endif() diff --git a/exporters/jaeger/BUILD b/exporters/jaeger/BUILD index c042535561..6f0e030b4d 100644 --- a/exporters/jaeger/BUILD +++ b/exporters/jaeger/BUILD @@ -189,6 +189,7 @@ cc_library( deps = [ ":jaeger_exporter", "//sdk/src/common:global_log_handler", + "//sdk/src/trace", ], ) diff --git a/exporters/jaeger/CMakeLists.txt b/exporters/jaeger/CMakeLists.txt index 62d2961324..a02fa7c3e5 100644 --- a/exporters/jaeger/CMakeLists.txt +++ b/exporters/jaeger/CMakeLists.txt @@ -43,7 +43,8 @@ target_include_directories( target_link_libraries( opentelemetry_exporter_jaeger_trace - PUBLIC opentelemetry_resources opentelemetry_http_client_curl + PUBLIC opentelemetry_resources opentelemetry_trace + opentelemetry_http_client_curl PRIVATE thrift::thrift) if(MSVC) diff --git a/exporters/jaeger/include/opentelemetry/exporters/jaeger/jaeger_exporter.h b/exporters/jaeger/include/opentelemetry/exporters/jaeger/jaeger_exporter.h index b8571199a1..eac6dc6940 100644 --- a/exporters/jaeger/include/opentelemetry/exporters/jaeger/jaeger_exporter.h +++ b/exporters/jaeger/include/opentelemetry/exporters/jaeger/jaeger_exporter.h @@ -47,6 +47,14 @@ class OPENTELEMETRY_DEPRECATED JaegerExporter final : public opentelemetry::sdk: const nostd::span> &spans) noexcept override; + /** + * Force flush the exporter. + * @param timeout an option timeout, default to max. + * @return return true when all data are exported, and false when timeout + */ + bool ForceFlush( + std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override; + /** * Shutdown the exporter. * @param timeout an option timeout, default to max. diff --git a/exporters/jaeger/src/jaeger_exporter.cc b/exporters/jaeger/src/jaeger_exporter.cc index 8e9e75bd73..7f413caf30 100644 --- a/exporters/jaeger/src/jaeger_exporter.cc +++ b/exporters/jaeger/src/jaeger_exporter.cc @@ -98,6 +98,11 @@ void JaegerExporter::InitializeEndpoint() assert(false); } +bool JaegerExporter::ForceFlush(std::chrono::microseconds /* timeout */) noexcept +{ + return true; +} + bool JaegerExporter::Shutdown(std::chrono::microseconds /* timeout */) noexcept { const std::lock_guard locked(lock_); diff --git a/exporters/ostream/include/opentelemetry/exporters/ostream/log_record_exporter.h b/exporters/ostream/include/opentelemetry/exporters/ostream/log_record_exporter.h index f5e400946f..34806bd200 100644 --- a/exporters/ostream/include/opentelemetry/exporters/ostream/log_record_exporter.h +++ b/exporters/ostream/include/opentelemetry/exporters/ostream/log_record_exporter.h @@ -39,6 +39,14 @@ class OStreamLogRecordExporter final : public opentelemetry::sdk::logs::LogRecor const opentelemetry::nostd::span> &records) noexcept override; + /** + * Force flush the exporter. + * @param timeout an option timeout, default to max. + * @return return true when all data are exported, and false when timeout + */ + bool ForceFlush( + std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override; + /** * Marks the OStream Log Exporter as shut down. */ diff --git a/exporters/ostream/include/opentelemetry/exporters/ostream/span_exporter.h b/exporters/ostream/include/opentelemetry/exporters/ostream/span_exporter.h index 985e05715a..baa4e860b1 100644 --- a/exporters/ostream/include/opentelemetry/exporters/ostream/span_exporter.h +++ b/exporters/ostream/include/opentelemetry/exporters/ostream/span_exporter.h @@ -38,6 +38,14 @@ class OStreamSpanExporter final : public opentelemetry::sdk::trace::SpanExporter const opentelemetry::nostd::span> &spans) noexcept override; + /** + * Force flush the exporter. + * @param timeout an option timeout, default to max. + * @return return true when all data are exported, and false when timeout + */ + bool ForceFlush( + std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override; + bool Shutdown( std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override; diff --git a/exporters/ostream/src/log_record_exporter.cc b/exporters/ostream/src/log_record_exporter.cc index 0b1093ab9d..751de71e2c 100644 --- a/exporters/ostream/src/log_record_exporter.cc +++ b/exporters/ostream/src/log_record_exporter.cc @@ -113,6 +113,12 @@ sdk::common::ExportResult OStreamLogRecordExporter::Export( return sdk::common::ExportResult::kSuccess; } +bool OStreamLogRecordExporter::ForceFlush(std::chrono::microseconds /* timeout */) noexcept +{ + sout_.flush(); + return true; +} + bool OStreamLogRecordExporter::Shutdown(std::chrono::microseconds) noexcept { const std::lock_guard locked(lock_); diff --git a/exporters/ostream/src/span_exporter.cc b/exporters/ostream/src/span_exporter.cc index 79be26b311..6a88c32235 100644 --- a/exporters/ostream/src/span_exporter.cc +++ b/exporters/ostream/src/span_exporter.cc @@ -98,6 +98,12 @@ sdk::common::ExportResult OStreamSpanExporter::Export( return sdk::common::ExportResult::kSuccess; } +bool OStreamSpanExporter::ForceFlush(std::chrono::microseconds /* timeout */) noexcept +{ + sout_.flush(); + return true; +} + bool OStreamSpanExporter::Shutdown(std::chrono::microseconds /* timeout */) noexcept { const std::lock_guard locked(lock_); diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_exporter.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_exporter.h index a28e6fca85..ab64d7e769 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_exporter.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_exporter.h @@ -52,6 +52,14 @@ class OtlpGrpcExporter final : public opentelemetry::sdk::trace::SpanExporter sdk::common::ExportResult Export( const nostd::span> &spans) noexcept override; + /** + * Force flush the exporter. + * @param timeout an option timeout, default to max. + * @return return true when all data are exported, and false when timeout + */ + bool ForceFlush( + std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override; + /** * Shut down the exporter. * @param timeout an optional timeout, the default timeout of 0 means that no diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_log_record_exporter.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_log_record_exporter.h index 05224550a3..becb308ca2 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_log_record_exporter.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_log_record_exporter.h @@ -55,6 +55,14 @@ class OtlpGrpcLogRecordExporter : public opentelemetry::sdk::logs::LogRecordExpo const nostd::span> &records) noexcept override; + /** + * Force flush the exporter. + * @param timeout an option timeout, default to max. + * @return return true when all data are exported, and false when timeout + */ + bool ForceFlush( + std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override; + /** * Shutdown this exporter. * @param timeout The maximum time to wait for the shutdown method to return. diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_exporter.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_exporter.h index 673e0d0bb3..9188858fc1 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_exporter.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_exporter.h @@ -53,6 +53,14 @@ class OtlpHttpExporter final : public opentelemetry::sdk::trace::SpanExporter const nostd::span> &spans) noexcept override; + /** + * Force flush the exporter. + * @param timeout an option timeout, default to max. + * @return return true when all data are exported, and false when timeout + */ + bool ForceFlush( + std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override; + /** * Shut down the exporter. * @param timeout an optional timeout, the default timeout of 0 means that no diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_log_record_exporter.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_log_record_exporter.h index 2250cfe831..035b046b72 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_log_record_exporter.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_log_record_exporter.h @@ -53,6 +53,14 @@ class OtlpHttpLogRecordExporter final : public opentelemetry::sdk::logs::LogReco const nostd::span> &records) noexcept override; + /** + * Force flush the exporter. + * @param timeout an option timeout, default to max. + * @return return true when all data are exported, and false when timeout + */ + bool ForceFlush( + std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override; + /** * Shutdown this exporter. * @param timeout The maximum time to wait for the shutdown method to return diff --git a/exporters/otlp/src/otlp_grpc_exporter.cc b/exporters/otlp/src/otlp_grpc_exporter.cc index 5f4cbc99e2..89ad1fb2cf 100644 --- a/exporters/otlp/src/otlp_grpc_exporter.cc +++ b/exporters/otlp/src/otlp_grpc_exporter.cc @@ -72,6 +72,13 @@ sdk::common::ExportResult OtlpGrpcExporter::Export( return sdk::common::ExportResult::kSuccess; } +bool OtlpGrpcExporter::ForceFlush(std::chrono::microseconds /* timeout */) noexcept +{ + // TODO: When we implement async exporting in OTLP gRPC exporter in the future, we need wait the + // running exporting finished here. + return true; +} + bool OtlpGrpcExporter::Shutdown(std::chrono::microseconds /* timeout */) noexcept { const std::lock_guard locked(lock_); diff --git a/exporters/otlp/src/otlp_grpc_log_record_exporter.cc b/exporters/otlp/src/otlp_grpc_log_record_exporter.cc index e8a3b8a46a..1873ac8ad7 100644 --- a/exporters/otlp/src/otlp_grpc_log_record_exporter.cc +++ b/exporters/otlp/src/otlp_grpc_log_record_exporter.cc @@ -91,6 +91,13 @@ bool OtlpGrpcLogRecordExporter::Shutdown(std::chrono::microseconds /* timeout */ return true; } +bool OtlpGrpcLogRecordExporter::ForceFlush(std::chrono::microseconds /* timeout */) noexcept +{ + // TODO: When we implement async exporting in OTLP gRPC exporter in the future, we need wait the + // running exporting finished here. + return true; +} + bool OtlpGrpcLogRecordExporter::isShutdown() const noexcept { const std::lock_guard locked(lock_); diff --git a/exporters/otlp/src/otlp_http_client.cc b/exporters/otlp/src/otlp_http_client.cc index 58cff24c1e..24beafa194 100644 --- a/exporters/otlp/src/otlp_http_client.cc +++ b/exporters/otlp/src/otlp_http_client.cc @@ -793,34 +793,40 @@ bool OtlpHttpClient::ForceFlush(std::chrono::microseconds timeout) noexcept // Wait for all the sessions to finish std::unique_lock lock(session_waker_lock_); - if (timeout <= std::chrono::microseconds::zero()) + + std::chrono::steady_clock::duration timeout_steady = + std::chrono::duration_cast(timeout); + if (timeout_steady <= std::chrono::steady_clock::duration::zero()) + { + timeout_steady = std::chrono::steady_clock::duration::max(); + } + + while (timeout_steady > std::chrono::steady_clock::duration::zero()) { - while (true) { + std::lock_guard guard{session_manager_lock_}; + if (running_sessions_.empty()) { - std::lock_guard guard{session_manager_lock_}; - if (running_sessions_.empty()) - { - break; - } - } - // When changes of running_sessions_ and notify_one/notify_all happen between predicate - // checking and waiting, we should not wait forever.We should cleanup gc sessions here as soon - // as possible to call FinishSession() and cleanup resources. - if (std::cv_status::timeout == session_waker_.wait_for(lock, options_.timeout)) - { - cleanupGCSessions(); + break; } } - return true; - } - else - { - return session_waker_.wait_for(lock, timeout, [this] { - std::lock_guard guard{session_manager_lock_}; - return running_sessions_.empty(); - }); + // When changes of running_sessions_ and notify_one/notify_all happen between predicate + // checking and waiting, we should not wait forever.We should cleanup gc sessions here as soon + // as possible to call FinishSession() and cleanup resources. + std::chrono::steady_clock::time_point start_timepoint = std::chrono::steady_clock::now(); + if (std::cv_status::timeout == session_waker_.wait_for(lock, options_.timeout)) + { + cleanupGCSessions(); + } + else + { + break; + } + + timeout_steady -= std::chrono::steady_clock::now() - start_timepoint; } + + return timeout_steady > std::chrono::steady_clock::duration::zero(); } bool OtlpHttpClient::Shutdown(std::chrono::microseconds timeout) noexcept diff --git a/exporters/otlp/src/otlp_http_exporter.cc b/exporters/otlp/src/otlp_http_exporter.cc index 78b587e766..852fee7e1f 100644 --- a/exporters/otlp/src/otlp_http_exporter.cc +++ b/exporters/otlp/src/otlp_http_exporter.cc @@ -115,6 +115,11 @@ opentelemetry::sdk::common::ExportResult OtlpHttpExporter::Export( #endif } +bool OtlpHttpExporter::ForceFlush(std::chrono::microseconds timeout) noexcept +{ + return http_client_->ForceFlush(timeout); +} + bool OtlpHttpExporter::Shutdown(std::chrono::microseconds timeout) noexcept { return http_client_->Shutdown(timeout); diff --git a/exporters/otlp/src/otlp_http_log_record_exporter.cc b/exporters/otlp/src/otlp_http_log_record_exporter.cc index 81bc37a340..98925773c5 100644 --- a/exporters/otlp/src/otlp_http_log_record_exporter.cc +++ b/exporters/otlp/src/otlp_http_log_record_exporter.cc @@ -118,6 +118,11 @@ opentelemetry::sdk::common::ExportResult OtlpHttpLogRecordExporter::Export( # endif } +bool OtlpHttpLogRecordExporter::ForceFlush(std::chrono::microseconds timeout) noexcept +{ + return http_client_->ForceFlush(timeout); +} + bool OtlpHttpLogRecordExporter::Shutdown(std::chrono::microseconds timeout) noexcept { return http_client_->Shutdown(timeout); diff --git a/exporters/zipkin/include/opentelemetry/exporters/zipkin/zipkin_exporter.h b/exporters/zipkin/include/opentelemetry/exporters/zipkin/zipkin_exporter.h index cb2d7b6954..8c66a251a4 100644 --- a/exporters/zipkin/include/opentelemetry/exporters/zipkin/zipkin_exporter.h +++ b/exporters/zipkin/include/opentelemetry/exporters/zipkin/zipkin_exporter.h @@ -49,6 +49,14 @@ class ZipkinExporter final : public opentelemetry::sdk::trace::SpanExporter const nostd::span> &spans) noexcept override; + /** + * Force flush the exporter. + * @param timeout an option timeout, default to max. + * @return return true when all data are exported, and false when timeout + */ + bool ForceFlush( + std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override; + /** * Shut down the exporter. * @param timeout an optional timeout, default to max. diff --git a/exporters/zipkin/src/zipkin_exporter.cc b/exporters/zipkin/src/zipkin_exporter.cc index 802ecccffc..b3f6fcfe61 100644 --- a/exporters/zipkin/src/zipkin_exporter.cc +++ b/exporters/zipkin/src/zipkin_exporter.cc @@ -110,6 +110,11 @@ void ZipkinExporter::InitializeLocalEndpoint() local_end_point_["port"] = url_parser_.port_; } +bool ZipkinExporter::ForceFlush(std::chrono::microseconds /* timeout */) noexcept +{ + return true; +} + bool ZipkinExporter::Shutdown(std::chrono::microseconds /* timeout */) noexcept { const std::lock_guard locked(lock_); diff --git a/sdk/include/opentelemetry/sdk/logs/batch_log_record_processor.h b/sdk/include/opentelemetry/sdk/logs/batch_log_record_processor.h index 0621316dcd..d13e8782f1 100644 --- a/sdk/include/opentelemetry/sdk/logs/batch_log_record_processor.h +++ b/sdk/include/opentelemetry/sdk/logs/batch_log_record_processor.h @@ -117,6 +117,7 @@ class BatchLogRecordProcessor : public LogRecordProcessor std::atomic is_force_wakeup_background_worker; std::atomic is_force_flush_pending; std::atomic is_force_flush_notified; + std::atomic force_flush_timeout_us; std::atomic is_shutdown; }; @@ -128,6 +129,7 @@ class BatchLogRecordProcessor : public LogRecordProcessor * @param synchronization_data Synchronization data to be notified. */ static void NotifyCompletion(bool notify_force_flush, + const std::unique_ptr &exporter, const std::shared_ptr &synchronization_data); void GetWaitAdjustedTime(std::chrono::microseconds &timeout, diff --git a/sdk/include/opentelemetry/sdk/logs/exporter.h b/sdk/include/opentelemetry/sdk/logs/exporter.h index a531577aef..069fd8784c 100644 --- a/sdk/include/opentelemetry/sdk/logs/exporter.h +++ b/sdk/include/opentelemetry/sdk/logs/exporter.h @@ -20,10 +20,11 @@ namespace logs /** * LogRecordExporter defines the interface that log exporters must implement. */ -class LogRecordExporter +class OPENTELEMETRY_EXPORT LogRecordExporter { public: - virtual ~LogRecordExporter() = default; + LogRecordExporter(); + virtual ~LogRecordExporter(); /** * Create a log recordable. This object will be used to record log data and @@ -47,6 +48,12 @@ class LogRecordExporter virtual sdk::common::ExportResult Export( const nostd::span> &records) noexcept = 0; + /** + * Force flush the log records pushed into this log exporter. + */ + virtual bool ForceFlush( + std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept; + /** * Marks the exporter as ShutDown and cleans up any resources as required. * Shutdown should be called only once for each Exporter instance. diff --git a/sdk/include/opentelemetry/sdk/trace/batch_span_processor.h b/sdk/include/opentelemetry/sdk/trace/batch_span_processor.h index 309d849bc7..60e93d8a9f 100644 --- a/sdk/include/opentelemetry/sdk/trace/batch_span_processor.h +++ b/sdk/include/opentelemetry/sdk/trace/batch_span_processor.h @@ -115,6 +115,7 @@ class BatchSpanProcessor : public SpanProcessor std::atomic is_force_wakeup_background_worker; std::atomic is_force_flush_pending; std::atomic is_force_flush_notified; + std::atomic force_flush_timeout_us; std::atomic is_shutdown; }; @@ -126,6 +127,7 @@ class BatchSpanProcessor : public SpanProcessor * @param synchronization_data Synchronization data to be notified. */ static void NotifyCompletion(bool notify_force_flush, + const std::unique_ptr &exporter, const std::shared_ptr &synchronization_data); void GetWaitAdjustedTime(std::chrono::microseconds &timeout, diff --git a/sdk/include/opentelemetry/sdk/trace/exporter.h b/sdk/include/opentelemetry/sdk/trace/exporter.h index 3f9bce9b84..ffacca2b35 100644 --- a/sdk/include/opentelemetry/sdk/trace/exporter.h +++ b/sdk/include/opentelemetry/sdk/trace/exporter.h @@ -20,7 +20,8 @@ namespace trace class OPENTELEMETRY_EXPORT SpanExporter { public: - virtual ~SpanExporter() = default; + SpanExporter(); + virtual ~SpanExporter(); /** * Create a span recordable. This object will be used to record span data and @@ -42,6 +43,15 @@ class OPENTELEMETRY_EXPORT SpanExporter const nostd::span> &spans) noexcept = 0; + /** + * Export all spans that have been exported. + * @param timeout an optional timeout, the default timeout of 0 means that no + * timeout is applied. + * @return return true when all data are exported, and false when timeout + */ + virtual bool ForceFlush( + std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept; + /** * Shut down the exporter. * @param timeout an optional timeout. diff --git a/sdk/include/opentelemetry/sdk/trace/simple_processor.h b/sdk/include/opentelemetry/sdk/trace/simple_processor.h index 6796a10848..671218d0cf 100644 --- a/sdk/include/opentelemetry/sdk/trace/simple_processor.h +++ b/sdk/include/opentelemetry/sdk/trace/simple_processor.h @@ -55,7 +55,16 @@ class SimpleSpanProcessor : public SpanProcessor } } - bool ForceFlush(std::chrono::microseconds /* timeout */) noexcept override { return true; } + bool ForceFlush( + std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept override + { + if (exporter_ != nullptr) + { + return exporter_->ForceFlush(timeout); + } + + return true; + } bool Shutdown( std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept override diff --git a/sdk/include/opentelemetry/sdk/trace/tracer_context.h b/sdk/include/opentelemetry/sdk/trace/tracer_context.h index 573af20b7d..6dcdda8e01 100644 --- a/sdk/include/opentelemetry/sdk/trace/tracer_context.h +++ b/sdk/include/opentelemetry/sdk/trace/tracer_context.h @@ -87,7 +87,7 @@ class TracerContext /** * Shutdown the span processor associated with this tracer provider. */ - bool Shutdown() noexcept; + bool Shutdown(std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept; private: // order of declaration is important here - resource object should be destroyed after processor. diff --git a/sdk/src/logs/CMakeLists.txt b/sdk/src/logs/CMakeLists.txt index cbaea426e6..2b7db6eb7c 100644 --- a/sdk/src/logs/CMakeLists.txt +++ b/sdk/src/logs/CMakeLists.txt @@ -6,6 +6,7 @@ add_library( logger_provider.cc logger_provider_factory.cc logger.cc + exporter.cc event_logger_provider.cc event_logger_provider_factory.cc event_logger.cc diff --git a/sdk/src/logs/batch_log_record_processor.cc b/sdk/src/logs/batch_log_record_processor.cc index 7903c3b24c..6b1db1c666 100644 --- a/sdk/src/logs/batch_log_record_processor.cc +++ b/sdk/src/logs/batch_log_record_processor.cc @@ -46,6 +46,7 @@ BatchLogRecordProcessor::BatchLogRecordProcessor(std::unique_ptris_force_wakeup_background_worker.store(false); synchronization_data_->is_force_flush_pending.store(false); synchronization_data_->is_force_flush_notified.store(false); + synchronization_data_->force_flush_timeout_us.store(0); synchronization_data_->is_shutdown.store(false); } @@ -88,6 +89,7 @@ bool BatchLogRecordProcessor::ForceFlush(std::chrono::microseconds timeout) noex std::unique_lock lk_cv(synchronization_data_->force_flush_cv_m); synchronization_data_->is_force_flush_pending.store(true, std::memory_order_release); + synchronization_data_->force_flush_timeout_us.store(timeout.count(), std::memory_order_release); auto break_condition = [this]() { if (synchronization_data_->is_shutdown.load() == true) { @@ -106,23 +108,24 @@ bool BatchLogRecordProcessor::ForceFlush(std::chrono::microseconds timeout) noex // Fix timeout to meet requirement of wait_for timeout = opentelemetry::common::DurationUtil::AdjustWaitForTimeout( timeout, std::chrono::microseconds::zero()); - bool result; - if (timeout <= std::chrono::microseconds::zero()) + + std::chrono::steady_clock::duration timeout_steady = + std::chrono::duration_cast(timeout); + if (timeout_steady <= std::chrono::steady_clock::duration::zero()) { - bool wait_result = false; - while (!wait_result) - { - // When is_force_flush_notified.store(true) and force_flush_cv.notify_all() is called - // between is_force_flush_pending.load() and force_flush_cv.wait(). We must not wait - // for ever - wait_result = synchronization_data_->force_flush_cv.wait_for(lk_cv, scheduled_delay_millis_, - break_condition); - } - result = true; + timeout_steady = std::chrono::steady_clock::duration::max(); } - else + + bool result = false; + while (!result && timeout_steady > std::chrono::steady_clock::duration::zero()) { - result = synchronization_data_->force_flush_cv.wait_for(lk_cv, timeout, break_condition); + // When is_force_flush_notified.store(true) and force_flush_cv.notify_all() is called + // between is_force_flush_pending.load() and force_flush_cv.wait(). We must not wait + // for ever + std::chrono::steady_clock::time_point start_timepoint = std::chrono::steady_clock::now(); + result = synchronization_data_->force_flush_cv.wait_for(lk_cv, scheduled_delay_millis_, + break_condition); + timeout_steady -= std::chrono::steady_clock::now() - start_timepoint; } // If it's already signaled, we must wait util notified. @@ -202,7 +205,7 @@ void BatchLogRecordProcessor::Export() if (num_records_to_export == 0) { - NotifyCompletion(notify_force_flush, synchronization_data_); + NotifyCompletion(notify_force_flush, exporter_, synchronization_data_); break; } @@ -218,12 +221,13 @@ void BatchLogRecordProcessor::Export() exporter_->Export( nostd::span>(records_arr.data(), records_arr.size())); - NotifyCompletion(notify_force_flush, synchronization_data_); + NotifyCompletion(notify_force_flush, exporter_, synchronization_data_); } while (true); } void BatchLogRecordProcessor::NotifyCompletion( bool notify_force_flush, + const std::unique_ptr &exporter, const std::shared_ptr &synchronization_data) { if (!synchronization_data) @@ -233,6 +237,14 @@ void BatchLogRecordProcessor::NotifyCompletion( if (notify_force_flush) { + if (exporter) + { + std::chrono::microseconds timeout = opentelemetry::common::DurationUtil::AdjustWaitForTimeout( + std::chrono::microseconds{ + synchronization_data->force_flush_timeout_us.load(std::memory_order_acquire)}, + std::chrono::microseconds::zero()); + exporter->ForceFlush(timeout); + } synchronization_data->is_force_flush_notified.store(true, std::memory_order_release); synchronization_data->force_flush_cv.notify_one(); } diff --git a/sdk/src/logs/exporter.cc b/sdk/src/logs/exporter.cc new file mode 100644 index 0000000000..20fdb6b25c --- /dev/null +++ b/sdk/src/logs/exporter.cc @@ -0,0 +1,27 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#ifdef ENABLE_LOGS_PREVIEW + +# include "opentelemetry/sdk/logs/exporter.h" + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace sdk +{ +namespace logs +{ + +OPENTELEMETRY_EXPORT LogRecordExporter::LogRecordExporter() {} + +OPENTELEMETRY_EXPORT LogRecordExporter::~LogRecordExporter() {} + +OPENTELEMETRY_EXPORT bool LogRecordExporter::ForceFlush( + std::chrono::microseconds /*timeout*/) noexcept +{ + return true; +} + +} // namespace logs +} // namespace sdk +OPENTELEMETRY_END_NAMESPACE +#endif diff --git a/sdk/src/logs/simple_log_record_processor.cc b/sdk/src/logs/simple_log_record_processor.cc index ef923bc590..50d4c20fdd 100644 --- a/sdk/src/logs/simple_log_record_processor.cc +++ b/sdk/src/logs/simple_log_record_processor.cc @@ -43,8 +43,12 @@ void SimpleLogRecordProcessor::OnEmit(std::unique_ptr &&record) noex /** * The simple processor does not have any log records to flush so this method is not used */ -bool SimpleLogRecordProcessor::ForceFlush(std::chrono::microseconds /* timeout */) noexcept +bool SimpleLogRecordProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept { + if (exporter_ != nullptr) + { + return exporter_->ForceFlush(timeout); + } return true; } diff --git a/sdk/src/trace/CMakeLists.txt b/sdk/src/trace/CMakeLists.txt index 7f757f036c..94c2059548 100644 --- a/sdk/src/trace/CMakeLists.txt +++ b/sdk/src/trace/CMakeLists.txt @@ -9,6 +9,7 @@ add_library( tracer_provider_factory.cc tracer.cc span.cc + exporter.cc batch_span_processor.cc batch_span_processor_factory.cc simple_processor_factory.cc diff --git a/sdk/src/trace/batch_span_processor.cc b/sdk/src/trace/batch_span_processor.cc index de6c457da7..da9f39f054 100644 --- a/sdk/src/trace/batch_span_processor.cc +++ b/sdk/src/trace/batch_span_processor.cc @@ -30,6 +30,7 @@ BatchSpanProcessor::BatchSpanProcessor(std::unique_ptr &&exporter, synchronization_data_->is_force_wakeup_background_worker.store(false); synchronization_data_->is_force_flush_pending.store(false); synchronization_data_->is_force_flush_notified.store(false); + synchronization_data_->force_flush_timeout_us.store(0); synchronization_data_->is_shutdown.store(false); } @@ -77,6 +78,7 @@ bool BatchSpanProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept std::unique_lock lk_cv(synchronization_data_->force_flush_cv_m); synchronization_data_->is_force_flush_pending.store(true, std::memory_order_release); + synchronization_data_->force_flush_timeout_us.store(timeout.count(), std::memory_order_release); auto break_condition = [this]() { if (synchronization_data_->is_shutdown.load() == true) { @@ -97,23 +99,23 @@ bool BatchSpanProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept // Fix timeout to meet requirement of wait_for timeout = opentelemetry::common::DurationUtil::AdjustWaitForTimeout( timeout, std::chrono::microseconds::zero()); - bool result; - if (timeout <= std::chrono::microseconds::zero()) + std::chrono::steady_clock::duration timeout_steady = + std::chrono::duration_cast(timeout); + if (timeout_steady <= std::chrono::steady_clock::duration::zero()) { - bool wait_result = false; - while (!wait_result) - { - // When is_force_flush_notified.store(true) and force_flush_cv.notify_all() is called - // between is_force_flush_pending.load() and force_flush_cv.wait(). We must not wait - // for ever - wait_result = synchronization_data_->force_flush_cv.wait_for(lk_cv, schedule_delay_millis_, - break_condition); - } - result = true; + timeout_steady = std::chrono::steady_clock::duration::max(); } - else + + bool result = false; + while (!result && timeout_steady > std::chrono::steady_clock::duration::zero()) { - result = synchronization_data_->force_flush_cv.wait_for(lk_cv, timeout, break_condition); + // When is_force_flush_notified.store(true) and force_flush_cv.notify_all() is called + // between is_force_flush_pending.load() and force_flush_cv.wait(). We must not wait + // for ever + std::chrono::steady_clock::time_point start_timepoint = std::chrono::steady_clock::now(); + result = synchronization_data_->force_flush_cv.wait_for(lk_cv, schedule_delay_millis_, + break_condition); + timeout_steady -= std::chrono::steady_clock::now() - start_timepoint; } // If it will be already signaled, we must wait util notified. @@ -192,7 +194,7 @@ void BatchSpanProcessor::Export() if (num_records_to_export == 0) { - NotifyCompletion(notify_force_flush, synchronization_data_); + NotifyCompletion(notify_force_flush, exporter_, synchronization_data_); break; } buffer_.Consume(num_records_to_export, @@ -206,12 +208,13 @@ void BatchSpanProcessor::Export() }); exporter_->Export(nostd::span>(spans_arr.data(), spans_arr.size())); - NotifyCompletion(notify_force_flush, synchronization_data_); + NotifyCompletion(notify_force_flush, exporter_, synchronization_data_); } while (true); } void BatchSpanProcessor::NotifyCompletion( bool notify_force_flush, + const std::unique_ptr &exporter, const std::shared_ptr &synchronization_data) { if (!synchronization_data) @@ -221,6 +224,15 @@ void BatchSpanProcessor::NotifyCompletion( if (notify_force_flush) { + if (exporter) + { + std::chrono::microseconds timeout = opentelemetry::common::DurationUtil::AdjustWaitForTimeout( + std::chrono::microseconds{ + synchronization_data->force_flush_timeout_us.load(std::memory_order_acquire)}, + std::chrono::microseconds::zero()); + exporter->ForceFlush(timeout); + } + synchronization_data->is_force_flush_notified.store(true, std::memory_order_release); synchronization_data->force_flush_cv.notify_one(); } diff --git a/sdk/src/trace/exporter.cc b/sdk/src/trace/exporter.cc new file mode 100644 index 0000000000..5984583ba5 --- /dev/null +++ b/sdk/src/trace/exporter.cc @@ -0,0 +1,23 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#include "opentelemetry/sdk/trace/exporter.h" + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace sdk +{ +namespace trace +{ + +OPENTELEMETRY_EXPORT SpanExporter::SpanExporter() {} + +OPENTELEMETRY_EXPORT SpanExporter::~SpanExporter() {} + +OPENTELEMETRY_EXPORT bool SpanExporter::ForceFlush(std::chrono::microseconds /*timeout*/) noexcept +{ + return true; +} + +} // namespace trace +} // namespace sdk +OPENTELEMETRY_END_NAMESPACE diff --git a/sdk/src/trace/tracer.cc b/sdk/src/trace/tracer.cc index 74e16bdfe0..07fd36d5b7 100644 --- a/sdk/src/trace/tracer.cc +++ b/sdk/src/trace/tracer.cc @@ -104,12 +104,22 @@ nostd::shared_ptr Tracer::StartSpan( void Tracer::ForceFlushWithMicroseconds(uint64_t timeout) noexcept { - (void)timeout; + if (context_) + { + context_->ForceFlush( + std::chrono::microseconds{static_cast(timeout)}); + } } void Tracer::CloseWithMicroseconds(uint64_t timeout) noexcept { - (void)timeout; + // Trace context is shared by many tracers.So we just call ForceFlush to flush all pending spans + // and do not shutdown it. + if (context_) + { + context_->ForceFlush( + std::chrono::microseconds{static_cast(timeout)}); + } } } // namespace trace } // namespace sdk diff --git a/sdk/src/trace/tracer_context.cc b/sdk/src/trace/tracer_context.cc index d204218021..0b89c5cefb 100644 --- a/sdk/src/trace/tracer_context.cc +++ b/sdk/src/trace/tracer_context.cc @@ -53,9 +53,9 @@ bool TracerContext::ForceFlush(std::chrono::microseconds timeout) noexcept return processor_->ForceFlush(timeout); } -bool TracerContext::Shutdown() noexcept +bool TracerContext::Shutdown(std::chrono::microseconds timeout) noexcept { - return processor_->Shutdown(); + return processor_->Shutdown(timeout); } } // namespace trace diff --git a/sdk/test/logs/batch_log_record_processor_test.cc b/sdk/test/logs/batch_log_record_processor_test.cc index de3beb66fe..462e9b8397 100644 --- a/sdk/test/logs/batch_log_record_processor_test.cc +++ b/sdk/test/logs/batch_log_record_processor_test.cc @@ -70,10 +70,12 @@ class MockLogExporter final : public LogRecordExporter { public: MockLogExporter(std::shared_ptr>> logs_received, + std::shared_ptr> force_flush_counter, std::shared_ptr> is_shutdown, std::shared_ptr> is_export_completed, const std::chrono::milliseconds export_delay = std::chrono::milliseconds(0)) : logs_received_(logs_received), + force_flush_counter_(force_flush_counter), is_shutdown_(is_shutdown), is_export_completed_(is_export_completed), export_delay_(export_delay) @@ -109,6 +111,12 @@ class MockLogExporter final : public LogRecordExporter return ExportResult::kSuccess; } + bool ForceFlush(std::chrono::microseconds /* timeout */) noexcept override + { + ++(*force_flush_counter_); + return true; + } + // toggles the boolean flag marking this exporter as shut down bool Shutdown(std::chrono::microseconds /* timeout */) noexcept override { @@ -118,6 +126,7 @@ class MockLogExporter final : public LogRecordExporter private: std::shared_ptr>> logs_received_; + std::shared_ptr> force_flush_counter_; std::shared_ptr> is_shutdown_; std::shared_ptr> is_export_completed_; const std::chrono::milliseconds export_delay_; @@ -134,6 +143,7 @@ class BatchLogRecordProcessorTest : public testing::Test // ::testing::Test // is_shutdown flag, and the processor configuration options (default if unspecified) std::shared_ptr GetMockProcessor( std::shared_ptr>> logs_received, + std::shared_ptr> force_flush_counter, std::shared_ptr> is_shutdown, std::shared_ptr> is_export_completed = std::shared_ptr>(new std::atomic(false)), @@ -143,8 +153,8 @@ class BatchLogRecordProcessorTest : public testing::Test // ::testing::Test const size_t max_export_batch_size = 512) { return std::shared_ptr(new BatchLogRecordProcessor( - std::unique_ptr( - new MockLogExporter(logs_received, is_shutdown, is_export_completed, export_delay)), + std::unique_ptr(new MockLogExporter( + logs_received, force_flush_counter, is_shutdown, is_export_completed, export_delay)), max_queue_size, scheduled_delay_millis, max_export_batch_size)); } }; @@ -154,9 +164,10 @@ TEST_F(BatchLogRecordProcessorTest, TestShutdown) // initialize a batch log processor with the test exporter std::shared_ptr>> logs_received( new std::vector>); + std::shared_ptr> force_flush_counter(new std::atomic(0)); std::shared_ptr> is_shutdown(new std::atomic(false)); - auto batch_processor = GetMockProcessor(logs_received, is_shutdown); + auto batch_processor = GetMockProcessor(logs_received, force_flush_counter, is_shutdown); // Create a few test log records and send them to the processor const int num_logs = 3; @@ -189,11 +200,12 @@ TEST_F(BatchLogRecordProcessorTest, TestShutdown) TEST_F(BatchLogRecordProcessorTest, TestForceFlush) { + std::shared_ptr> force_flush_counter(new std::atomic(0)); std::shared_ptr> is_shutdown(new std::atomic(false)); std::shared_ptr>> logs_received( new std::vector>); - auto batch_processor = GetMockProcessor(logs_received, is_shutdown); + auto batch_processor = GetMockProcessor(logs_received, force_flush_counter, is_shutdown); const int num_logs = 2048; for (int i = 0; i < num_logs; ++i) @@ -204,6 +216,7 @@ TEST_F(BatchLogRecordProcessorTest, TestForceFlush) } EXPECT_TRUE(batch_processor->ForceFlush()); + EXPECT_GT(force_flush_counter->load(), 0); EXPECT_EQ(num_logs, logs_received->size()); for (int i = 0; i < num_logs; ++i) @@ -219,7 +232,9 @@ TEST_F(BatchLogRecordProcessorTest, TestForceFlush) batch_processor->OnEmit(std::move(log)); } + std::size_t force_flush_counter_before = force_flush_counter->load(); EXPECT_TRUE(batch_processor->ForceFlush()); + EXPECT_GT(force_flush_counter->load(), force_flush_counter_before); EXPECT_EQ(num_logs * 2, logs_received->size()); for (int i = 0; i < num_logs * 2; ++i) @@ -232,13 +247,14 @@ TEST_F(BatchLogRecordProcessorTest, TestManyLogsLoss) { /* Test that when exporting more than max_queue_size logs, some are most likely lost*/ + std::shared_ptr> force_flush_counter(new std::atomic(0)); std::shared_ptr> is_shutdown(new std::atomic(false)); std::shared_ptr>> logs_received( new std::vector>); const int max_queue_size = 4096; - auto batch_processor = GetMockProcessor(logs_received, is_shutdown); + auto batch_processor = GetMockProcessor(logs_received, force_flush_counter, is_shutdown); // Create max_queue_size log records for (int i = 0; i < max_queue_size; ++i) @@ -258,10 +274,11 @@ TEST_F(BatchLogRecordProcessorTest, TestManyLogsLossLess) { /* Test that no logs are lost when sending max_queue_size logs */ + std::shared_ptr> force_flush_counter(new std::atomic(0)); std::shared_ptr> is_shutdown(new std::atomic(false)); std::shared_ptr>> logs_received( new std::vector>); - auto batch_processor = GetMockProcessor(logs_received, is_shutdown); + auto batch_processor = GetMockProcessor(logs_received, force_flush_counter, is_shutdown); const int num_logs = 2048; @@ -286,6 +303,7 @@ TEST_F(BatchLogRecordProcessorTest, TestScheduledDelayMillis) /* Test that max_export_batch_size logs are exported every scheduled_delay_millis seconds */ + std::shared_ptr> force_flush_counter(new std::atomic(0)); std::shared_ptr> is_shutdown(new std::atomic(false)); std::shared_ptr> is_export_completed(new std::atomic(false)); std::shared_ptr>> logs_received( @@ -295,8 +313,9 @@ TEST_F(BatchLogRecordProcessorTest, TestScheduledDelayMillis) const std::chrono::milliseconds scheduled_delay_millis(2000); const size_t max_export_batch_size = 512; - auto batch_processor = GetMockProcessor(logs_received, is_shutdown, is_export_completed, - export_delay, scheduled_delay_millis); + auto batch_processor = + GetMockProcessor(logs_received, force_flush_counter, is_shutdown, is_export_completed, + export_delay, scheduled_delay_millis); for (std::size_t i = 0; i < max_export_batch_size; ++i) { diff --git a/sdk/test/logs/simple_log_record_processor_test.cc b/sdk/test/logs/simple_log_record_processor_test.cc index 275991dc10..2d377cd924 100644 --- a/sdk/test/logs/simple_log_record_processor_test.cc +++ b/sdk/test/logs/simple_log_record_processor_test.cc @@ -69,10 +69,12 @@ class TestLogRecordable final : public opentelemetry::sdk::logs::Recordable class TestExporter final : public LogRecordExporter { public: - TestExporter(int *shutdown_counter, + TestExporter(int *force_flush_counter, + int *shutdown_counter, std::shared_ptr>> logs_received, size_t *batch_size_received) - : shutdown_counter_(shutdown_counter), + : force_flush_counter_(force_flush_counter), + shutdown_counter_(shutdown_counter), logs_received_(logs_received), batch_size_received(batch_size_received) {} @@ -99,14 +101,27 @@ class TestExporter final : public LogRecordExporter return ExportResult::kSuccess; } + bool ForceFlush(std::chrono::microseconds /* timeout */) noexcept override + { + if (nullptr != force_flush_counter_) + { + ++(*force_flush_counter_); + } + return true; + } + // Increment the shutdown counter everytime this method is called bool Shutdown(std::chrono::microseconds /* timeout */) noexcept override { - *shutdown_counter_ += 1; + if (nullptr != shutdown_counter_) + { + *shutdown_counter_ += 1; + } return true; } private: + int *force_flush_counter_; int *shutdown_counter_; std::shared_ptr>> logs_received_; size_t *batch_size_received; @@ -122,7 +137,7 @@ TEST(SimpleLogRecordProcessorTest, SendReceivedLogsToExporter) size_t batch_size_received = 0; std::unique_ptr exporter( - new TestExporter(nullptr, logs_received, &batch_size_received)); + new TestExporter(nullptr, nullptr, logs_received, &batch_size_received)); SimpleLogRecordProcessor processor(std::move(exporter)); @@ -152,7 +167,8 @@ TEST(SimpleLogRecordProcessorTest, ShutdownCalledOnce) // Create a TestExporter int num_shutdowns = 0; - std::unique_ptr exporter(new TestExporter(&num_shutdowns, nullptr, nullptr)); + std::unique_ptr exporter( + new TestExporter(nullptr, &num_shutdowns, nullptr, nullptr)); // Create a processor with the previous test exporter SimpleLogRecordProcessor processor(std::move(exporter)); @@ -167,11 +183,28 @@ TEST(SimpleLogRecordProcessorTest, ShutdownCalledOnce) EXPECT_EQ(1, num_shutdowns); } +TEST(SimpleLogRecordProcessorTest, ForceFlush) +{ + // Create a TestExporter + int num_force_flush = 0; + + std::unique_ptr exporter( + new TestExporter(&num_force_flush, nullptr, nullptr, nullptr)); + + // Create a processor with the previous test exporter + SimpleLogRecordProcessor processor(std::move(exporter)); + + // The first time processor shutdown is called + EXPECT_EQ(0, num_force_flush); + EXPECT_EQ(true, processor.ForceFlush()); + EXPECT_EQ(1, num_force_flush); +} + // A test exporter that always returns failure when shut down -class FailShutDownExporter final : public LogRecordExporter +class FailShutDownForceFlushExporter final : public LogRecordExporter { public: - FailShutDownExporter() {} + FailShutDownForceFlushExporter() {} std::unique_ptr MakeRecordable() noexcept override { @@ -184,16 +217,29 @@ class FailShutDownExporter final : public LogRecordExporter return ExportResult::kSuccess; } + bool ForceFlush(std::chrono::microseconds /* timeout */) noexcept override { return false; } + bool Shutdown(std::chrono::microseconds /* timeout */) noexcept override { return false; } }; // Tests for when when processor should fail to shutdown TEST(SimpleLogRecordProcessorTest, ShutDownFail) { - std::unique_ptr exporter(new FailShutDownExporter()); + std::unique_ptr exporter(new FailShutDownForceFlushExporter()); SimpleLogRecordProcessor processor(std::move(exporter)); // Expect failure result when exporter fails to shutdown EXPECT_EQ(false, processor.Shutdown()); } + +// Tests for when when processor should fail to force flush +TEST(SimpleLogRecordProcessorTest, ForceFlushFail) +{ + std::unique_ptr exporter(new FailShutDownForceFlushExporter()); + SimpleLogRecordProcessor processor(std::move(exporter)); + + // Expect failure result when exporter fails to force flush + EXPECT_EQ(false, processor.ForceFlush()); +} + #endif diff --git a/sdk/test/trace/batch_span_processor_test.cc b/sdk/test/trace/batch_span_processor_test.cc index e9150d1ff1..b7a6558538 100644 --- a/sdk/test/trace/batch_span_processor_test.cc +++ b/sdk/test/trace/batch_span_processor_test.cc @@ -23,11 +23,13 @@ class MockSpanExporter final : public sdk::trace::SpanExporter public: MockSpanExporter( std::shared_ptr>> spans_received, + std::shared_ptr> shut_down_counter, std::shared_ptr> is_shutdown, std::shared_ptr> is_export_completed = std::shared_ptr>(new std::atomic(false)), const std::chrono::milliseconds export_delay = std::chrono::milliseconds(0)) noexcept : spans_received_(spans_received), + shut_down_counter_(shut_down_counter), is_shutdown_(is_shutdown), is_export_completed_(is_export_completed), export_delay_(export_delay) @@ -60,6 +62,12 @@ class MockSpanExporter final : public sdk::trace::SpanExporter return sdk::common::ExportResult::kSuccess; } + bool ForceFlush(std::chrono::microseconds /*timeout*/) noexcept override + { + ++(*shut_down_counter_); + return true; + } + bool Shutdown(std::chrono::microseconds /* timeout */) noexcept override { *is_shutdown_ = true; @@ -70,6 +78,7 @@ class MockSpanExporter final : public sdk::trace::SpanExporter private: std::shared_ptr>> spans_received_; + std::shared_ptr> shut_down_counter_; std::shared_ptr> is_shutdown_; std::shared_ptr> is_export_completed_; // Meant exclusively to test force flush timeout @@ -104,14 +113,16 @@ class BatchSpanProcessorTestPeer : public testing::Test TEST_F(BatchSpanProcessorTestPeer, TestShutdown) { + std::shared_ptr> shut_down_counter(new std::atomic(0)); std::shared_ptr> is_shutdown(new std::atomic(false)); std::shared_ptr>> spans_received( new std::vector>); - auto batch_processor = std::shared_ptr( - new sdk::trace::BatchSpanProcessor(std::unique_ptr( - new MockSpanExporter(spans_received, is_shutdown)), - sdk::trace::BatchSpanProcessorOptions())); + auto batch_processor = + std::shared_ptr(new sdk::trace::BatchSpanProcessor( + std::unique_ptr( + new MockSpanExporter(spans_received, shut_down_counter, is_shutdown)), + sdk::trace::BatchSpanProcessorOptions())); const int num_spans = 3; auto test_spans = GetTestSpans(batch_processor, num_spans); @@ -136,14 +147,16 @@ TEST_F(BatchSpanProcessorTestPeer, TestShutdown) TEST_F(BatchSpanProcessorTestPeer, TestForceFlush) { + std::shared_ptr> shut_down_counter(new std::atomic(0)); std::shared_ptr> is_shutdown(new std::atomic(false)); std::shared_ptr>> spans_received( new std::vector>); - auto batch_processor = std::shared_ptr( - new sdk::trace::BatchSpanProcessor(std::unique_ptr( - new MockSpanExporter(spans_received, is_shutdown)), - sdk::trace::BatchSpanProcessorOptions())); + auto batch_processor = + std::shared_ptr(new sdk::trace::BatchSpanProcessor( + std::unique_ptr( + new MockSpanExporter(spans_received, shut_down_counter, is_shutdown)), + sdk::trace::BatchSpanProcessorOptions())); const int num_spans = 2048; auto test_spans = GetTestSpans(batch_processor, num_spans); @@ -157,6 +170,7 @@ TEST_F(BatchSpanProcessorTestPeer, TestForceFlush) std::this_thread::sleep_for(std::chrono::milliseconds(50)); EXPECT_TRUE(batch_processor->ForceFlush()); + EXPECT_GE(shut_down_counter->load(), 1); EXPECT_EQ(num_spans, spans_received->size()); for (int i = 0; i < num_spans; ++i) @@ -174,7 +188,9 @@ TEST_F(BatchSpanProcessorTestPeer, TestForceFlush) // Give some time to export the spans std::this_thread::sleep_for(std::chrono::milliseconds(50)); + auto shut_down_counter_before = shut_down_counter->load(); EXPECT_TRUE(batch_processor->ForceFlush()); + EXPECT_GT(shut_down_counter->load(), shut_down_counter_before); EXPECT_EQ(num_spans * 2, spans_received->size()); for (int i = 0; i < num_spans; ++i) @@ -209,16 +225,18 @@ TEST_F(BatchSpanProcessorTestPeer, TestManySpansLoss) auto log_handler = nostd::shared_ptr(new MockLogHandler()); sdk::common::internal_log::GlobalLogHandler::SetLogHandler(log_handler); + std::shared_ptr> shut_down_counter(new std::atomic(0)); std::shared_ptr> is_shutdown(new std::atomic(false)); std::shared_ptr>> spans_received( new std::vector>); const int max_queue_size = 4096; - auto batch_processor = std::shared_ptr( - new sdk::trace::BatchSpanProcessor(std::unique_ptr( - new MockSpanExporter(spans_received, is_shutdown)), - sdk::trace::BatchSpanProcessorOptions())); + auto batch_processor = + std::shared_ptr(new sdk::trace::BatchSpanProcessor( + std::unique_ptr( + new MockSpanExporter(spans_received, shut_down_counter, is_shutdown)), + sdk::trace::BatchSpanProcessorOptions())); auto test_spans = GetTestSpans(batch_processor, max_queue_size); @@ -259,16 +277,18 @@ TEST_F(BatchSpanProcessorTestPeer, TestManySpansLossLess) { /* Test that no spans are lost when sending max_queue_size spans */ + std::shared_ptr> shut_down_counter(new std::atomic(0)); std::shared_ptr> is_shutdown(new std::atomic(false)); std::shared_ptr>> spans_received( new std::vector>); const int num_spans = 2048; - auto batch_processor = std::shared_ptr( - new sdk::trace::BatchSpanProcessor(std::unique_ptr( - new MockSpanExporter(spans_received, is_shutdown)), - sdk::trace::BatchSpanProcessorOptions())); + auto batch_processor = + std::shared_ptr(new sdk::trace::BatchSpanProcessor( + std::unique_ptr( + new MockSpanExporter(spans_received, shut_down_counter, is_shutdown)), + sdk::trace::BatchSpanProcessorOptions())); auto test_spans = GetTestSpans(batch_processor, num_spans); @@ -294,6 +314,7 @@ TEST_F(BatchSpanProcessorTestPeer, TestScheduleDelayMillis) /* Test that max_export_batch_size spans are exported every schedule_delay_millis seconds */ + std::shared_ptr> shut_down_counter(new std::atomic(0)); std::shared_ptr> is_shutdown(new std::atomic(false)); std::shared_ptr> is_export_completed(new std::atomic(false)); std::shared_ptr>> spans_received( @@ -305,8 +326,8 @@ TEST_F(BatchSpanProcessorTestPeer, TestScheduleDelayMillis) auto batch_processor = std::shared_ptr(new sdk::trace::BatchSpanProcessor( - std::unique_ptr( - new MockSpanExporter(spans_received, is_shutdown, is_export_completed, export_delay)), + std::unique_ptr(new MockSpanExporter( + spans_received, shut_down_counter, is_shutdown, is_export_completed, export_delay)), options)); auto test_spans = GetTestSpans(batch_processor, max_export_batch_size); diff --git a/sdk/test/trace/simple_processor_test.cc b/sdk/test/trace/simple_processor_test.cc index f32485e7d0..7653bb2cfc 100644 --- a/sdk/test/trace/simple_processor_test.cc +++ b/sdk/test/trace/simple_processor_test.cc @@ -38,7 +38,9 @@ TEST(SimpleProcessor, ToInMemorySpanExporter) class RecordShutdownExporter final : public SpanExporter { public: - RecordShutdownExporter(int *shutdown_counter) : shutdown_counter_(shutdown_counter) {} + RecordShutdownExporter(int *force_flush_counter, int *shutdown_counter) + : force_flush_counter_(force_flush_counter), shutdown_counter_(shutdown_counter) + {} std::unique_ptr MakeRecordable() noexcept override { @@ -51,6 +53,12 @@ class RecordShutdownExporter final : public SpanExporter return ExportResult::kSuccess; } + bool ForceFlush(std::chrono::microseconds /* timeout */) noexcept override + { + *force_flush_counter_ += 1; + return true; + } + bool Shutdown(std::chrono::microseconds /* timeout */) noexcept override { *shutdown_counter_ += 1; @@ -58,17 +66,70 @@ class RecordShutdownExporter final : public SpanExporter } private: + int *force_flush_counter_; int *shutdown_counter_; }; TEST(SimpleSpanProcessor, ShutdownCalledOnce) { + int force_flush = 0; int shutdowns = 0; - RecordShutdownExporter *exporter = new RecordShutdownExporter(&shutdowns); + RecordShutdownExporter *exporter = new RecordShutdownExporter(&force_flush, &shutdowns); SimpleSpanProcessor processor(std::unique_ptr{exporter}); EXPECT_EQ(0, shutdowns); processor.Shutdown(); EXPECT_EQ(1, shutdowns); processor.Shutdown(); EXPECT_EQ(1, shutdowns); + + EXPECT_EQ(0, force_flush); +} + +TEST(SimpleSpanProcessor, ForceFlush) +{ + int force_flush = 0; + int shutdowns = 0; + RecordShutdownExporter *exporter = new RecordShutdownExporter(&force_flush, &shutdowns); + SimpleSpanProcessor processor(std::unique_ptr{exporter}); + processor.ForceFlush(); + EXPECT_EQ(0, shutdowns); + EXPECT_EQ(1, force_flush); + processor.ForceFlush(); + EXPECT_EQ(2, force_flush); +} + +// An exporter that does nothing but record (and give back ) the # of times Shutdown was called. +class FailShutDownForceFlushExporter final : public SpanExporter +{ +public: + FailShutDownForceFlushExporter() {} + + std::unique_ptr MakeRecordable() noexcept override + { + return std::unique_ptr(new SpanData()); + } + + ExportResult Export(const opentelemetry::nostd::span> + & /* recordables */) noexcept override + { + return ExportResult::kSuccess; + } + + bool ForceFlush(std::chrono::microseconds /* timeout */) noexcept override { return false; } + + bool Shutdown(std::chrono::microseconds /* timeout */) noexcept override { return false; } +}; + +TEST(SimpleSpanProcessor, ShutdownFail) +{ + SimpleSpanProcessor processor( + std::unique_ptr{new FailShutDownForceFlushExporter()}); + EXPECT_EQ(false, processor.Shutdown()); +} + +TEST(SimpleSpanProcessor, ForceFlushFail) +{ + SimpleSpanProcessor processor( + std::unique_ptr{new FailShutDownForceFlushExporter()}); + EXPECT_EQ(false, processor.ForceFlush()); } From 2b2fe449ac09b072eb12654ade1e02ac8bb81624 Mon Sep 17 00:00:00 2001 From: WenTao Ou Date: Fri, 24 Mar 2023 02:05:51 +0800 Subject: [PATCH 2/6] Ignore more warning of generated protobuf files than not included in `-Wall` and `-Wextra` (#2067) --- .../exporters/otlp/protobuf_include_prefix.h | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/protobuf_include_prefix.h b/exporters/otlp/include/opentelemetry/exporters/otlp/protobuf_include_prefix.h index 036cb0ae7f..dc674ce9eb 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/protobuf_include_prefix.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/protobuf_include_prefix.h @@ -22,6 +22,9 @@ # pragma warning(disable : 4267) # pragma warning(disable : 4668) # pragma warning(disable : 4946) +# pragma warning(disable : 6001) +# pragma warning(disable : 6244) +# pragma warning(disable : 6246) #endif #if defined(__GNUC__) && !defined(__clang__) && !defined(__apple_build_version__) @@ -30,9 +33,34 @@ # endif # pragma GCC diagnostic ignored "-Wunused-parameter" # pragma GCC diagnostic ignored "-Wtype-limits" +# pragma GCC diagnostic ignored "-Wsign-compare" +# pragma GCC diagnostic ignored "-Wsign-conversion" +# pragma GCC diagnostic ignored "-Wshadow" +# pragma GCC diagnostic ignored "-Wuninitialized" +# pragma GCC diagnostic ignored "-Wconversion" +# if (__GNUC__ * 100 + __GNUC_MINOR__) >= 409 +# pragma GCC diagnostic ignored "-Wfloat-conversion" +# endif +# if (__GNUC__ * 100 + __GNUC_MINOR__) >= 501 +# pragma GCC diagnostic ignored "-Wsuggest-override" +# endif #elif defined(__clang__) || defined(__apple_build_version__) # pragma clang diagnostic push # pragma clang diagnostic ignored "-Wunused-parameter" # pragma clang diagnostic ignored "-Wtype-limits" # pragma clang diagnostic ignored "-Wshadow-field" +# pragma clang diagnostic ignored "-Wsign-compare" +# pragma clang diagnostic ignored "-Wsign-conversion" +# pragma clang diagnostic ignored "-Wshadow" +# pragma clang diagnostic ignored "-Wuninitialized" +# pragma clang diagnostic ignored "-Wconversion" +# if ((__clang_major__ * 100) + __clang_minor__) >= 305 +# pragma clang diagnostic ignored "-Wfloat-conversion" +# endif +# if ((__clang_major__ * 100) + __clang_minor__) >= 306 +# pragma clang diagnostic ignored "-Winconsistent-missing-override" +# endif +# if ((__clang_major__ * 100) + __clang_minor__) >= 1100 +# pragma clang diagnostic ignored "-Wsuggest-override" +# endif #endif From fb3bcb5b52944a6580e76bd49785b24860df0d10 Mon Sep 17 00:00:00 2001 From: Cengizhan Pasaoglu Date: Thu, 23 Mar 2023 20:03:19 +0100 Subject: [PATCH 3/6] Include directory path added for Zipkin exporter example (#2069) Co-authored-by: Lalit Kumar Bhasin --- exporters/zipkin/CMakeLists.txt | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/exporters/zipkin/CMakeLists.txt b/exporters/zipkin/CMakeLists.txt index 68836cc8d3..9cc997023f 100644 --- a/exporters/zipkin/CMakeLists.txt +++ b/exporters/zipkin/CMakeLists.txt @@ -7,6 +7,11 @@ add_library( opentelemetry_exporter_zipkin_trace src/zipkin_exporter.cc src/zipkin_exporter_factory.cc src/recordable.cc) +target_include_directories( + opentelemetry_exporter_zipkin_trace + PUBLIC "$" + "$") + target_link_libraries( opentelemetry_exporter_zipkin_trace PUBLIC opentelemetry_trace opentelemetry_http_client_curl From 3305bd73fcba6df97d1a2dbfc67c1283bf535839 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Sun, 26 Mar 2023 20:42:54 +0200 Subject: [PATCH 4/6] Bump actions/stale from 7 to 8 (#2070) Bumps [actions/stale](https://github.com/actions/stale) from 7 to 8. - [Release notes](https://github.com/actions/stale/releases) - [Changelog](https://github.com/actions/stale/blob/main/CHANGELOG.md) - [Commits](https://github.com/actions/stale/compare/v7...v8) --- updated-dependencies: - dependency-name: actions/stale dependency-type: direct:production update-type: version-update:semver-major ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- .github/workflows/stale.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/stale.yml b/.github/workflows/stale.yml index 5f49c9933d..534fb60743 100644 --- a/.github/workflows/stale.yml +++ b/.github/workflows/stale.yml @@ -7,7 +7,7 @@ jobs: stale: runs-on: ubuntu-latest steps: - - uses: actions/stale@v7 + - uses: actions/stale@v8 with: stale-issue-message: "This issue was marked as stale due to lack of activity." days-before-issue-stale: 60 From 380f0f2543ec064e20e55e37c0d04f1660700582 Mon Sep 17 00:00:00 2001 From: Marc Alff Date: Mon, 27 Mar 2023 15:34:20 +0200 Subject: [PATCH 5/6] Upgraded semantic conventions to 1.19.0 (#2017) --- CHANGELOG.md | 2 + .../trace/semantic_conventions.h | 241 +++++++++++++----- buildscripts/semantic-convention/generate.sh | 6 +- .../sdk/resource/semantic_conventions.h | 98 ++++--- 4 files changed, 240 insertions(+), 107 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0d8e3c7c47..0311312b2d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,8 @@ Increment the: top level.[#2062](https://github.com/open-telemetry/opentelemetry-cpp/pull/2062) * [EXPORTERS]Add `ForceFlush` for `LogRecordExporter` and `SpanExporter` [#2000](https://github.com/open-telemetry/opentelemetry-cpp/pull/2000) +* [SEMANTIC CONVENTIONS] Upgrade to version 1.19.0 + [#2017](https://github.com/open-telemetry/opentelemetry-cpp/pull/2017) Important changes: diff --git a/api/include/opentelemetry/trace/semantic_conventions.h b/api/include/opentelemetry/trace/semantic_conventions.h index d0485704ef..051d865f86 100644 --- a/api/include/opentelemetry/trace/semantic_conventions.h +++ b/api/include/opentelemetry/trace/semantic_conventions.h @@ -21,7 +21,7 @@ namespace SemanticConventions /** * The URL of the OpenTelemetry schema for these keys and values. */ -static constexpr const char *kSchemaUrl = "https://opentelemetry.io/schemas/1.18.0"; +static constexpr const char *kSchemaUrl = "https://opentelemetry.io/schemas/1.19.0"; /** * The type of the exception (its fully-qualified class name, if applicable). The dynamic type of @@ -40,6 +40,38 @@ static constexpr const char *kExceptionMessage = "exception.message"; */ static constexpr const char *kExceptionStacktrace = "exception.stacktrace"; +/** + * HTTP request method. + */ +static constexpr const char *kHttpMethod = "http.method"; + +/** + * HTTP response status code. + */ +static constexpr const char *kHttpStatusCode = "http.status_code"; + +/** + * Kind of HTTP protocol used. + */ +static constexpr const char *kHttpFlavor = "http.flavor"; + +/** + * The URI scheme identifying the used protocol. + */ +static constexpr const char *kHttpScheme = "http.scheme"; + +/** + * The matched route (path template in the format used by the respective server framework). See note +below + * + *

Notes: +

  • MUST NOT be populated when this is not supported by the HTTP server framework as the +route attribute should have low-cardinality and the URI path can NOT substitute it. SHOULD include +the application +root if there is one.
+ */ +static constexpr const char *kHttpRoute = "http.route"; + /** * The name identifies the event. */ @@ -59,7 +91,7 @@ static constexpr const char *kEventDomain = "event.domain"; Lambda-Runtime-Invoked-Function-Arn} header on the {@code /runtime/invocation/next} applicable). * *

Notes: -

  • This may be different from {@code faas.id} if an alias is involved.
+
  • This may be different from {@code cloud.resource_id} if an alias is involved.
*/ static constexpr const char *kAwsLambdaInvokedArn = "aws.lambda.invoked_arn"; @@ -255,7 +287,7 @@ static constexpr const char *kOtelStatusCode = "otel.status_code"; static constexpr const char *kOtelStatusDescription = "otel.status_description"; /** - * Type of the trigger which caused this function execution. + * Type of the trigger which caused this function invocation. * *

Notes:

  • For the server/consumer span on the incoming side, @@ -268,9 +300,9 @@ lambda, which is often HTTP).
static constexpr const char *kFaasTrigger = "faas.trigger"; /** - * The execution ID of the current function execution. + * The invocation ID of the current function invocation. */ -static constexpr const char *kFaasExecution = "faas.execution"; +static constexpr const char *kFaasInvocationId = "faas.invocation_id"; /** * The name of the source on which the triggering operation was performed. For example, in Cloud @@ -343,6 +375,30 @@ static constexpr const char *kFaasInvokedProvider = "faas.invoked_provider"; */ static constexpr const char *kFaasInvokedRegion = "faas.invoked_region"; +/** + * The unique identifier of the feature flag. + */ +static constexpr const char *kFeatureFlagKey = "feature_flag.key"; + +/** + * The name of the service provider that performs the flag evaluation. + */ +static constexpr const char *kFeatureFlagProviderName = "feature_flag.provider_name"; + +/** + * SHOULD be a semantic identifier for a value. If one is unavailable, a stringified version of the +value can be used. + * + *

Notes: +

  • A semantic identifier, commonly referred to as a variant, provides a means +for referring to a value without including the value itself. This can +provide additional context for understanding the meaning behind a value. +For example, the variant {@code red} maybe be used for the value {@code #c05543}.
  • A +stringified version of the value can be used in situations where a semantic identifier is +unavailable. String representation of the value should be determined by the implementer.
+ */ +static constexpr const char *kFeatureFlagVariant = "feature_flag.variant"; + /** * Transport protocol used. See note below. */ @@ -521,31 +577,6 @@ static constexpr const char *kCodeLineno = "code.lineno"; */ static constexpr const char *kCodeColumn = "code.column"; -/** - * HTTP request method. - */ -static constexpr const char *kHttpMethod = "http.method"; - -/** - * HTTP response status code. - */ -static constexpr const char *kHttpStatusCode = "http.status_code"; - -/** - * Kind of HTTP protocol used. - * - *

Notes: -

  • If {@code net.transport} is not specified, it can be assumed to be {@code IP.TCP} except - if {@code http.flavor} is {@code QUIC}, in which case {@code IP.UDP} is assumed.
- */ -static constexpr const char *kHttpFlavor = "http.flavor"; - -/** - * Value of the HTTP - * User-Agent header sent by the client. - */ -static constexpr const char *kHttpUserAgent = "http.user_agent"; - /** * The size of the request payload body in bytes. This is the number of bytes transferred excluding * headers and is often, but not always, present as the Notes: - - */ -static constexpr const char *kHttpRoute = "http.route"; - /** * The IP address of the original client behind all proxies, if known (e.g. from X-Forwarded-For). @@ -1054,7 +1069,79 @@ static constexpr const char *kRpcJsonrpcErrorCode = "rpc.jsonrpc.error_code"; */ static constexpr const char *kRpcJsonrpcErrorMessage = "rpc.jsonrpc.error_message"; +/** + * Whether this is a received or sent message. + */ +static constexpr const char *kMessageType = "message.type"; + +/** + * MUST be calculated as two different counters starting from {@code 1} one for sent messages and + one for received message. + * + *

Notes: +

  • This way we guarantee that the values will be consistent between different + implementations.
+ */ +static constexpr const char *kMessageId = "message.id"; + +/** + * Compressed size of the message in bytes. + */ +static constexpr const char *kMessageCompressedSize = "message.compressed_size"; + +/** + * Uncompressed size of the message in bytes. + */ +static constexpr const char *kMessageUncompressedSize = "message.uncompressed_size"; + +/** + * The error codes of the Connect + * request. Error codes are always string values. + */ +static constexpr const char *kRpcConnectRpcErrorCode = "rpc.connect_rpc.error_code"; + +/** + * SHOULD be set to true if the exception event is recorded at a point where it is known that the +exception is escaping the scope of the span. + * + *

Notes: +

  • An exception is considered to have escaped (or left) the scope of a span, +if that span is ended while the exception is still logically "in flight". +This may be actually "in flight" in some languages (e.g. if the exception +is passed to a Context manager's {@code __exit__} method in Python) but will +usually be caught at the point of recording the exception in most languages.
  • It is usually +not possible to determine at the point where an exception is thrown whether it will escape the scope +of a span. However, it is trivial to know that an exception will escape, if one checks for an active +exception just before ending the span, as done in the example +above.
  • It follows that an exception may still escape the scope of the span even if the +{@code exception.escaped} attribute was not set or set to false, since the event might have been +recorded at a time where it was not clear whether the exception will escape.
+ */ +static constexpr const char *kExceptionEscaped = "exception.escaped"; + +/** + * Value of the HTTP + * User-Agent header sent by the client. + */ +static constexpr const char *kUserAgentOriginal = "user_agent.original"; + // Enum definitions +namespace HttpFlavorValues +{ +/** HTTP/1.0. */ +static constexpr const char *kHttp10 = "1.0"; +/** HTTP/1.1. */ +static constexpr const char *kHttp11 = "1.1"; +/** HTTP/2. */ +static constexpr const char *kHttp20 = "2.0"; +/** HTTP/3. */ +static constexpr const char *kHttp30 = "3.0"; +/** SPDY protocol. */ +static constexpr const char *kSpdy = "SPDY"; +/** QUIC protocol. */ +static constexpr const char *kQuic = "QUIC"; +} // namespace HttpFlavorValues + namespace EventDomainValues { /** Events from browser apps. */ @@ -1336,22 +1423,6 @@ static constexpr const char *kNrnsa = "nrnsa"; static constexpr const char *kLteCa = "lte_ca"; } // namespace NetHostConnectionSubtypeValues -namespace HttpFlavorValues -{ -/** HTTP/1.0. */ -static constexpr const char *kHttp10 = "1.0"; -/** HTTP/1.1. */ -static constexpr const char *kHttp11 = "1.1"; -/** HTTP/2. */ -static constexpr const char *kHttp20 = "2.0"; -/** HTTP/3. */ -static constexpr const char *kHttp30 = "3.0"; -/** SPDY protocol. */ -static constexpr const char *kSpdy = "SPDY"; -/** QUIC protocol. */ -static constexpr const char *kQuic = "QUIC"; -} // namespace HttpFlavorValues - namespace GraphqlOperationTypeValues { /** GraphQL query. */ @@ -1418,6 +1489,8 @@ static constexpr const char *kJavaRmi = "java_rmi"; static constexpr const char *kDotnetWcf = "dotnet_wcf"; /** Apache Dubbo. */ static constexpr const char *kApacheDubbo = "apache_dubbo"; +/** Connect RPC. */ +static constexpr const char *kConnectRpc = "connect_rpc"; } // namespace RpcSystemValues namespace RpcGrpcStatusCodeValues @@ -1458,6 +1531,50 @@ static constexpr const int kDataLoss = 15; static constexpr const int kUnauthenticated = 16; } // namespace RpcGrpcStatusCodeValues +namespace MessageTypeValues +{ +/** sent. */ +static constexpr const char *kSent = "SENT"; +/** received. */ +static constexpr const char *kReceived = "RECEIVED"; +} // namespace MessageTypeValues + +namespace RpcConnectRpcErrorCodeValues +{ +/** cancelled. */ +static constexpr const char *kCancelled = "cancelled"; +/** unknown. */ +static constexpr const char *kUnknown = "unknown"; +/** invalid_argument. */ +static constexpr const char *kInvalidArgument = "invalid_argument"; +/** deadline_exceeded. */ +static constexpr const char *kDeadlineExceeded = "deadline_exceeded"; +/** not_found. */ +static constexpr const char *kNotFound = "not_found"; +/** already_exists. */ +static constexpr const char *kAlreadyExists = "already_exists"; +/** permission_denied. */ +static constexpr const char *kPermissionDenied = "permission_denied"; +/** resource_exhausted. */ +static constexpr const char *kResourceExhausted = "resource_exhausted"; +/** failed_precondition. */ +static constexpr const char *kFailedPrecondition = "failed_precondition"; +/** aborted. */ +static constexpr const char *kAborted = "aborted"; +/** out_of_range. */ +static constexpr const char *kOutOfRange = "out_of_range"; +/** unimplemented. */ +static constexpr const char *kUnimplemented = "unimplemented"; +/** internal. */ +static constexpr const char *kInternal = "internal"; +/** unavailable. */ +static constexpr const char *kUnavailable = "unavailable"; +/** data_loss. */ +static constexpr const char *kDataLoss = "data_loss"; +/** unauthenticated. */ +static constexpr const char *kUnauthenticated = "unauthenticated"; +} // namespace RpcConnectRpcErrorCodeValues + } // namespace SemanticConventions } // namespace trace OPENTELEMETRY_END_NAMESPACE diff --git a/buildscripts/semantic-convention/generate.sh b/buildscripts/semantic-convention/generate.sh index ee9d3ae1a4..22f556c486 100755 --- a/buildscripts/semantic-convention/generate.sh +++ b/buildscripts/semantic-convention/generate.sh @@ -15,10 +15,10 @@ ROOT_DIR="${SCRIPT_DIR}/../../" # freeze the spec & generator tools versions to make SemanticAttributes generation reproducible # repository: https://github.com/open-telemetry/opentelemetry-specification -SEMCONV_VERSION=1.18.0 +SEMCONV_VERSION=1.19.0 # repository: https://github.com/open-telemetry/build-tools -GENERATOR_VERSION=0.15.1 +GENERATOR_VERSION=0.18.0 SPEC_VERSION=v$SEMCONV_VERSION SCHEMA_URL=https://opentelemetry.io/schemas/$SEMCONV_VERSION @@ -46,7 +46,7 @@ docker run --rm \ -v ${SCRIPT_DIR}/templates:/templates \ -v ${ROOT_DIR}/api/include/opentelemetry/trace/:/output \ otel/semconvgen:$GENERATOR_VERSION \ - --only span \ + --only span,event,attribute_group,scope \ -f /source code \ --template /templates/SemanticAttributes.h.j2 \ --output /output/semantic_conventions.h \ diff --git a/sdk/include/opentelemetry/sdk/resource/semantic_conventions.h b/sdk/include/opentelemetry/sdk/resource/semantic_conventions.h index 27aff268cc..895aa720d3 100644 --- a/sdk/include/opentelemetry/sdk/resource/semantic_conventions.h +++ b/sdk/include/opentelemetry/sdk/resource/semantic_conventions.h @@ -23,7 +23,7 @@ namespace SemanticConventions /** * The URL of the OpenTelemetry schema for these keys and values. */ -static constexpr const char *kSchemaUrl = "https://opentelemetry.io/schemas/1.18.0"; +static constexpr const char *kSchemaUrl = "https://opentelemetry.io/schemas/1.19.0"; /** * Array of brand name and version separated by a space @@ -62,16 +62,6 @@ static constexpr const char *kBrowserPlatform = "browser.platform"; */ static constexpr const char *kBrowserMobile = "browser.mobile"; -/** - * Full user-agent string provided by the browser - * - *

Notes: -

  • The user-agent value SHOULD be provided only from browsers that do not have a mechanism - to retrieve brands and platform individually from the User-Agent Client Hints API. To retrieve the - value, the legacy {@code navigator.userAgent} API can be used.
- */ -static constexpr const char *kBrowserUserAgent = "browser.user_agent"; - /** * Preferred language of the user using the browser * @@ -100,11 +90,41 @@ static constexpr const char *kCloudAccountId = "cloud.account.id"; href="https://aws.amazon.com/about-aws/global-infrastructure/regions_az/">AWS regions, Azure regions, Google Cloud regions, or Tencent Cloud regions. - + href="https://www.tencentcloud.com/document/product/213/6091">Tencent Cloud regions. */ static constexpr const char *kCloudRegion = "cloud.region"; +/** + * Cloud provider-specific native identifier of the monitored cloud resource (e.g. an ARN on AWS, a +fully qualified +resource ID on Azure, a full resource name +on GCP) + * + *

Notes: +

  • On some cloud providers, it may not be possible to determine the full ID at startup, +so it may be necessary to set {@code cloud.resource_id} as a span attribute instead.
  • The +exact value to use for {@code cloud.resource_id} depends on the cloud provider. The following +well-known definitions MUST be used if you set this attribute and they apply:
  • AWS +Lambda: The function ARN. Take care +not to use the "invoked ARN" directly but replace any alias suffix with +the resolved function version, as the same runtime instance may be invokable with multiple different +aliases.
  • GCP: The URI of the resource
  • +
  • Azure: The Fully Qualified +Resource ID of the invoked function, not the function app, having the form +{@code +/subscriptions//resourceGroups//providers/Microsoft.Web/sites//functions/}. +This means that a span attribute MUST be used, as an Azure function app can host multiple functions +that would usually share a TracerProvider.
  • +
+ */ +static constexpr const char *kCloudResourceId = "cloud.resource_id"; + /** * Cloud regions often have multiple, isolated locations known as zones to increase availability. Availability zone represents the zone where the resource is running. @@ -201,6 +221,21 @@ static constexpr const char *kAwsLogStreamNames = "aws.log.stream.names"; */ static constexpr const char *kAwsLogStreamArns = "aws.log.stream.arns"; +/** + * Time and date the release was created + */ +static constexpr const char *kHerokuReleaseCreationTimestamp = "heroku.release.creation_timestamp"; + +/** + * Commit hash for the current release + */ +static constexpr const char *kHerokuReleaseCommit = "heroku.release.commit"; + +/** + * Unique identifier for the application + */ +static constexpr const char *kHerokuAppId = "heroku.app.id"; + /** * Container name used by container runtime. */ @@ -295,35 +330,11 @@ providers/products:
  • Azure: The full name {@code +the {@code cloud.resource_id} attribute).
  • */ static constexpr const char *kFaasName = "faas.name"; -/** - * The unique ID of the single function that this runtime instance executes. - * - *

    Notes: -

    • On some cloud providers, it may not be possible to determine the full ID at startup, -so consider setting {@code faas.id} as a span attribute instead.
    • The exact value to use for -{@code faas.id} depends on the cloud provider:
    • AWS Lambda: The function ARN. Take care -not to use the "invoked ARN" directly but replace any alias suffix with -the resolved function version, as the same runtime instance may be invokable with multiple different -aliases.
    • GCP: The URI of the resource
    • -
    • Azure: The Fully Qualified -Resource ID of the invoked function, not the function app, having the form -{@code -/subscriptions//resourceGroups//providers/Microsoft.Web/sites//functions/}. -This means that a span attribute MUST be used, as an Azure function app can host multiple functions -that would usually share a TracerProvider.
    • -
    - */ -static constexpr const char *kFaasId = "faas.id"; - /** * The immutable version of the function being executed. * @@ -352,19 +363,20 @@ static constexpr const char *kFaasVersion = "faas.version"; static constexpr const char *kFaasInstance = "faas.instance"; /** - * The amount of memory available to the serverless function in MiB. + * The amount of memory available to the serverless function converted to Bytes. * *

    Notes:

    • It's recommended to set this attribute since e.g. too little memory can easily stop a Java AWS Lambda function from working correctly. On AWS Lambda, the environment variable {@code - AWS_LAMBDA_FUNCTION_MEMORY_SIZE} provides this information.
    + AWS_LAMBDA_FUNCTION_MEMORY_SIZE} provides this information (which must be multiplied by + 1,048,576). */ static constexpr const char *kFaasMaxMemory = "faas.max_memory"; /** * Unique host ID. For Cloud, this must be the instance_id assigned by the cloud provider. For - * non-containerized Linux systems, the {@code machine-id} located in {@code /etc/machine-id} or - * {@code /var/lib/dbus/machine-id} may be used. + * non-containerized systems, this should be the {@code machine-id}. See the table below for the + * sources to use to determine the {@code machine-id} based on operating system. */ static constexpr const char *kHostId = "host.id"; @@ -703,6 +715,8 @@ static constexpr const char *kAws = "aws"; static constexpr const char *kAzure = "azure"; /** Google Cloud Platform. */ static constexpr const char *kGcp = "gcp"; +/** Heroku Platform as a Service. */ +static constexpr const char *kHeroku = "heroku"; /** IBM Cloud. */ static constexpr const char *kIbmCloud = "ibm_cloud"; /** Tencent Cloud. */ From bd7a3c7b4148af2bcba68aaaa38e0c21567cce4b Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Mon, 27 Mar 2023 15:14:27 -0700 Subject: [PATCH 6/6] [Metrics SDK] Implement Forceflush for Periodic Metric Reader (#2064) --- .../export/periodic_exporting_metric_reader.h | 7 +- .../periodic_exporting_metric_reader.cc | 99 +++++++++++++++++-- .../periodic_exporting_metric_reader_test.cc | 1 + 3 files changed, 97 insertions(+), 10 deletions(-) diff --git a/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h b/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h index ecb5cc7e72..83773ffb61 100644 --- a/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h +++ b/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h @@ -63,8 +63,11 @@ class PeriodicExportingMetricReader : public MetricReader std::thread worker_thread_; /* Synchronization primitives */ - std::condition_variable cv_; - std::mutex cv_m_; + std::atomic is_force_flush_pending_; + std::atomic is_force_wakeup_background_worker_; + std::atomic is_force_flush_notified_; + std::condition_variable cv_, force_flush_cv_; + std::mutex cv_m_, force_flush_m_; }; } // namespace metrics diff --git a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc index bcca86bee0..dfa4a5ee6b 100644 --- a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc +++ b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc @@ -60,14 +60,15 @@ void PeriodicExportingMetricReader::DoBackgroundWork() auto end = std::chrono::steady_clock::now(); auto export_time_ms = std::chrono::duration_cast(end - start); auto remaining_wait_interval_ms = export_interval_millis_ - export_time_ms; - cv_.wait_for(lk, remaining_wait_interval_ms); + cv_.wait_for(lk, remaining_wait_interval_ms, [this]() { + if (is_force_wakeup_background_worker_.load(std::memory_order_acquire)) + { + is_force_wakeup_background_worker_.store(false, std::memory_order_release); + return true; + } + return IsShutdown(); + }); } while (IsShutdown() != true); - // One last Collect and Export before shutdown - auto status = CollectAndExportOnce(); - if (!status) - { - OTEL_INTERNAL_LOG_ERROR("[Periodic Exporting Metric Reader] Collect-Export Cycle Failure.") - } } bool PeriodicExportingMetricReader::CollectAndExportOnce() @@ -86,6 +87,7 @@ bool PeriodicExportingMetricReader::CollectAndExportOnce() return true; }); }); + std::future_status status; do { @@ -96,12 +98,93 @@ bool PeriodicExportingMetricReader::CollectAndExportOnce() break; } } while (status != std::future_status::ready); + bool notify_force_flush = is_force_flush_pending_.exchange(false, std::memory_order_acq_rel); + if (notify_force_flush) + { + is_force_flush_notified_.store(true, std::memory_order_release); + force_flush_cv_.notify_one(); + } + return true; } bool PeriodicExportingMetricReader::OnForceFlush(std::chrono::microseconds timeout) noexcept { - return exporter_->ForceFlush(timeout); + std::unique_lock lk_cv(force_flush_m_); + is_force_flush_pending_.store(true, std::memory_order_release); + auto break_condition = [this]() { + if (IsShutdown()) + { + return true; + } + + // Wake up the worker thread once. + if (is_force_flush_pending_.load(std::memory_order_acquire)) + { + is_force_wakeup_background_worker_.store(true, std::memory_order_release); + cv_.notify_one(); + } + return is_force_flush_notified_.load(std::memory_order_acquire); + }; + + auto wait_timeout = opentelemetry::common::DurationUtil::AdjustWaitForTimeout( + timeout, std::chrono::microseconds::zero()); + std::chrono::steady_clock::duration timeout_steady = + std::chrono::duration_cast(wait_timeout); + if (timeout_steady <= std::chrono::steady_clock::duration::zero()) + { + timeout_steady = std::chrono::steady_clock::duration::max(); + } + + bool result = false; + while (!result && timeout_steady > std::chrono::steady_clock::duration::zero()) + { + // When is_force_flush_notified_.store(true) and force_flush_cv_.notify_all() is called + // between is_force_flush_pending_.load() and force_flush_cv_.wait(). We must not wait + // for ever + std::chrono::steady_clock::time_point start_timepoint = std::chrono::steady_clock::now(); + result = force_flush_cv_.wait_for(lk_cv, export_interval_millis_, break_condition); + timeout_steady -= std::chrono::steady_clock::now() - start_timepoint; + } + + // If it will be already signaled, we must wait until notified. + // We use a spin lock here + if (false == is_force_flush_pending_.exchange(false, std::memory_order_acq_rel)) + { + for (int retry_waiting_times = 0; + false == is_force_flush_notified_.load(std::memory_order_acquire); ++retry_waiting_times) + { + opentelemetry::common::SpinLockMutex::fast_yield(); + if ((retry_waiting_times & 127) == 127) + { + std::this_thread::yield(); + } + } + } + is_force_flush_notified_.store(false, std::memory_order_release); + + if (result) + { + // - If original `timeout` is `zero`, use that in exporter::forceflush + // - Else if remaining `timeout_steady` more than zero, use that in exporter::forceflush + // - Else don't invoke exporter::forceflush ( as remaining time is zero or less) + if (timeout <= std::chrono::steady_clock::duration::zero()) + { + result = + exporter_->ForceFlush(std::chrono::duration_cast(timeout)); + } + else if (timeout_steady > std::chrono::steady_clock::duration::zero()) + { + result = exporter_->ForceFlush( + std::chrono::duration_cast(timeout_steady)); + } + else + { + // remaining timeout_steady is zero or less + result = false; + } + } + return result; } bool PeriodicExportingMetricReader::OnShutDown(std::chrono::microseconds timeout) noexcept diff --git a/sdk/test/metrics/periodic_exporting_metric_reader_test.cc b/sdk/test/metrics/periodic_exporting_metric_reader_test.cc index cd789b5028..e115f79f75 100644 --- a/sdk/test/metrics/periodic_exporting_metric_reader_test.cc +++ b/sdk/test/metrics/periodic_exporting_metric_reader_test.cc @@ -70,6 +70,7 @@ TEST(PeriodicExporingMetricReader, BasicTests) MockMetricProducer producer; reader.SetMetricProducer(&producer); std::this_thread::sleep_for(std::chrono::milliseconds(2000)); + EXPECT_NO_THROW(reader.ForceFlush()); reader.Shutdown(); EXPECT_EQ(static_cast(exporter_ptr)->GetDataCount(), static_cast(&producer)->GetDataCount());