From 9b222f2af143059c82326cfd926b5fb58db016c6 Mon Sep 17 00:00:00 2001 From: WenTao Ou Date: Mon, 20 Mar 2023 13:29:21 +0800 Subject: [PATCH 1/2] Add `ForceFlush` for all `LogRecordExporter`s and `SpanExporter`s. (#2000) --- CHANGELOG.md | 8 +++ examples/otlp/grpc_log_main.cc | 21 ++++++ examples/otlp/grpc_main.cc | 12 ++++ examples/otlp/http_log_main.cc | 53 ++++++++++++-- examples/otlp/http_main.cc | 12 ++++ .../elasticsearch/es_log_record_exporter.h | 25 +++++++ .../src/es_log_record_exporter.cc | 70 +++++++++++++++++-- exporters/etw/CMakeLists.txt | 8 ++- exporters/jaeger/BUILD | 1 + exporters/jaeger/CMakeLists.txt | 3 +- .../exporters/jaeger/jaeger_exporter.h | 8 +++ exporters/jaeger/src/jaeger_exporter.cc | 5 ++ .../exporters/ostream/log_record_exporter.h | 8 +++ .../exporters/ostream/span_exporter.h | 8 +++ exporters/ostream/src/log_record_exporter.cc | 6 ++ exporters/ostream/src/span_exporter.cc | 6 ++ .../exporters/otlp/otlp_grpc_exporter.h | 8 +++ .../otlp/otlp_grpc_log_record_exporter.h | 8 +++ .../exporters/otlp/otlp_http_exporter.h | 8 +++ .../otlp/otlp_http_log_record_exporter.h | 8 +++ exporters/otlp/src/otlp_grpc_exporter.cc | 7 ++ .../otlp/src/otlp_grpc_log_record_exporter.cc | 7 ++ exporters/otlp/src/otlp_http_client.cc | 50 +++++++------ exporters/otlp/src/otlp_http_exporter.cc | 5 ++ .../otlp/src/otlp_http_log_record_exporter.cc | 5 ++ .../exporters/zipkin/zipkin_exporter.h | 8 +++ exporters/zipkin/src/zipkin_exporter.cc | 5 ++ .../sdk/logs/batch_log_record_processor.h | 2 + sdk/include/opentelemetry/sdk/logs/exporter.h | 11 ++- .../sdk/trace/batch_span_processor.h | 2 + .../opentelemetry/sdk/trace/exporter.h | 12 +++- .../sdk/trace/simple_processor.h | 11 ++- .../opentelemetry/sdk/trace/tracer_context.h | 2 +- sdk/src/logs/CMakeLists.txt | 1 + sdk/src/logs/batch_log_record_processor.cc | 44 +++++++----- sdk/src/logs/exporter.cc | 27 +++++++ sdk/src/logs/simple_log_record_processor.cc | 6 +- sdk/src/trace/CMakeLists.txt | 1 + sdk/src/trace/batch_span_processor.cc | 44 +++++++----- sdk/src/trace/exporter.cc | 23 ++++++ sdk/src/trace/tracer.cc | 14 +++- sdk/src/trace/tracer_context.cc | 4 +- .../logs/batch_log_record_processor_test.cc | 35 +++++++--- .../logs/simple_log_record_processor_test.cc | 62 +++++++++++++--- sdk/test/trace/batch_span_processor_test.cc | 57 ++++++++++----- sdk/test/trace/simple_processor_test.cc | 65 ++++++++++++++++- 46 files changed, 680 insertions(+), 116 deletions(-) create mode 100644 sdk/src/logs/exporter.cc create mode 100644 sdk/src/trace/exporter.cc diff --git a/CHANGELOG.md b/CHANGELOG.md index 8e47f770e6..0d8e3c7c47 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,8 @@ Increment the: [#2060](https://github.com/open-telemetry/opentelemetry-cpp/pull/2060) * [BUILD] Restore detfault value of `OPENTELEMETRY_INSTALL` to `ON` when it's on top level.[#2062](https://github.com/open-telemetry/opentelemetry-cpp/pull/2062) +* [EXPORTERS]Add `ForceFlush` for `LogRecordExporter` and `SpanExporter` + [#2000](https://github.com/open-telemetry/opentelemetry-cpp/pull/2000) Important changes: @@ -44,6 +46,12 @@ Important changes: * As a result, a behavior change for GRPC SSL is possible, because the endpoint scheme now takes precedence. Please verify configuration settings for the GRPC endpoint. +* [EXPORTERS]Add `ForceFlush` for `LogRecordExporter` and `SpanExporter` + [#2000](https://github.com/open-telemetry/opentelemetry-cpp/pull/2000) + * `LogRecordExporter` and `SpanExporter` add a new virtual function + `ForceFlush`, and if users implement any customized `LogRecordExporter` and + `SpanExporter`, they should also implement this function.There should be no + influence if users only use factory to create exporters. ## [1.8.3] 2023-03-06 diff --git a/examples/otlp/grpc_log_main.cc b/examples/otlp/grpc_log_main.cc index d6d9669a35..e0c85c38c2 100644 --- a/examples/otlp/grpc_log_main.cc +++ b/examples/otlp/grpc_log_main.cc @@ -11,6 +11,11 @@ # include "opentelemetry/sdk/trace/tracer_provider_factory.h" # include "opentelemetry/trace/provider.h" +// sdk::TracerProvider and sdk::LoggerProvider is just used to call ForceFlush and prevent to cancel +// running exportings when destroy and shutdown exporters.It's optional to users. +# include "opentelemetry/sdk/logs/logger_provider.h" +# include "opentelemetry/sdk/trace/tracer_provider.h" + # include # ifdef BAZEL_BUILD @@ -42,6 +47,14 @@ void InitTracer() void CleanupTracer() { + // We call ForceFlush to prevent to cancel running exportings, It's optional. + opentelemetry::nostd::shared_ptr provider = + trace::Provider::GetTracerProvider(); + if (provider) + { + static_cast(provider.get())->ForceFlush(); + } + std::shared_ptr none; trace::Provider::SetTracerProvider(none); } @@ -59,6 +72,14 @@ void InitLogger() void CleanupLogger() { + // We call ForceFlush to prevent to cancel running exportings, It's optional. + opentelemetry::nostd::shared_ptr provider = + logs::Provider::GetLoggerProvider(); + if (provider) + { + static_cast(provider.get())->ForceFlush(); + } + nostd::shared_ptr none; opentelemetry::logs::Provider::SetLoggerProvider(none); } diff --git a/examples/otlp/grpc_main.cc b/examples/otlp/grpc_main.cc index 679f0f0fd4..d7e3b29d6c 100644 --- a/examples/otlp/grpc_main.cc +++ b/examples/otlp/grpc_main.cc @@ -6,6 +6,10 @@ #include "opentelemetry/sdk/trace/tracer_provider_factory.h" #include "opentelemetry/trace/provider.h" +// sdk::TracerProvider is just used to call ForceFlush and prevent to cancel running exportings when +// destroy and shutdown exporters.It's optional to users. +#include "opentelemetry/sdk/trace/tracer_provider.h" + #ifdef BAZEL_BUILD # include "examples/common/foo_library/foo_library.h" #else @@ -32,6 +36,14 @@ void InitTracer() void CleanupTracer() { + // We call ForceFlush to prevent to cancel running exportings, It's optional. + opentelemetry::nostd::shared_ptr provider = + trace::Provider::GetTracerProvider(); + if (provider) + { + static_cast(provider.get())->ForceFlush(); + } + std::shared_ptr none; trace::Provider::SetTracerProvider(none); } diff --git a/examples/otlp/http_log_main.cc b/examples/otlp/http_log_main.cc index faddae58ad..a1171b04e0 100644 --- a/examples/otlp/http_log_main.cc +++ b/examples/otlp/http_log_main.cc @@ -13,6 +13,12 @@ # include "opentelemetry/sdk/trace/tracer_provider_factory.h" # include "opentelemetry/trace/provider.h" +// sdk::TracerProvider and sdk::LoggerProvider is just used to call ForceFlush and prevent to cancel +// running exportings when destroy and shutdown exporters.It's optional to users. +# include "opentelemetry/sdk/logs/logger_provider.h" +# include "opentelemetry/sdk/trace/tracer_provider.h" + +# include # include # ifdef BAZEL_BUILD @@ -32,11 +38,27 @@ namespace internal_log = opentelemetry::sdk::common::internal_log; namespace { -opentelemetry::exporter::otlp::OtlpHttpExporterOptions opts; +opentelemetry::exporter::otlp::OtlpHttpExporterOptions trace_opts; void InitTracer() { + if (trace_opts.url.size() > 9) + { + if (trace_opts.url.substr(trace_opts.url.size() - 8) == "/v1/logs") + { + trace_opts.url = trace_opts.url.substr(0, trace_opts.url.size() - 8) + "/v1/traces"; + } + else if (trace_opts.url.substr(trace_opts.url.size() - 9) == "/v1/logs/") + { + trace_opts.url = trace_opts.url.substr(0, trace_opts.url.size() - 9) + "/v1/traces"; + } + else + { + trace_opts.url = opentelemetry::exporter::otlp::GetOtlpDefaultHttpTracesEndpoint(); + } + } + std::cout << "Using " << trace_opts.url << " to export trace spans." << std::endl; // Create OTLP exporter instance - auto exporter = otlp::OtlpHttpExporterFactory::Create(opts); + auto exporter = otlp::OtlpHttpExporterFactory::Create(trace_opts); auto processor = trace_sdk::SimpleSpanProcessorFactory::Create(std::move(exporter)); std::shared_ptr provider = trace_sdk::TracerProviderFactory::Create(std::move(processor)); @@ -46,6 +68,14 @@ void InitTracer() void CleanupTracer() { + // We call ForceFlush to prevent to cancel running exportings, It's optional. + opentelemetry::nostd::shared_ptr provider = + trace::Provider::GetTracerProvider(); + if (provider) + { + static_cast(provider.get())->ForceFlush(); + } + std::shared_ptr none; trace::Provider::SetTracerProvider(none); } @@ -53,6 +83,7 @@ void CleanupTracer() opentelemetry::exporter::otlp::OtlpHttpLogRecordExporterOptions logger_opts; void InitLogger() { + std::cout << "Using " << logger_opts.url << " to export log records." << std::endl; logger_opts.console_debug = true; // Create OTLP exporter instance auto exporter = otlp::OtlpHttpLogRecordExporterFactory::Create(logger_opts); @@ -65,6 +96,14 @@ void InitLogger() void CleanupLogger() { + // We call ForceFlush to prevent to cancel running exportings, It's optional. + opentelemetry::nostd::shared_ptr provider = + logs::Provider::GetLoggerProvider(); + if (provider) + { + static_cast(provider.get())->ForceFlush(); + } + std::shared_ptr none; opentelemetry::logs::Provider::SetLoggerProvider(none); } @@ -83,12 +122,12 @@ int main(int argc, char *argv[]) { if (argc > 1) { - opts.url = argv[1]; + trace_opts.url = argv[1]; logger_opts.url = argv[1]; if (argc > 2) { - std::string debug = argv[2]; - opts.console_debug = debug != "" && debug != "0" && debug != "no"; + std::string debug = argv[2]; + trace_opts.console_debug = debug != "" && debug != "0" && debug != "no"; } if (argc > 3) @@ -96,13 +135,13 @@ int main(int argc, char *argv[]) std::string binary_mode = argv[3]; if (binary_mode.size() >= 3 && binary_mode.substr(0, 3) == "bin") { - opts.content_type = opentelemetry::exporter::otlp::HttpRequestContentType::kBinary; + trace_opts.content_type = opentelemetry::exporter::otlp::HttpRequestContentType::kBinary; logger_opts.content_type = opentelemetry::exporter::otlp::HttpRequestContentType::kBinary; } } } - if (opts.console_debug) + if (trace_opts.console_debug) { internal_log::GlobalLogHandler::SetLogLevel(internal_log::LogLevel::Debug); } diff --git a/examples/otlp/http_main.cc b/examples/otlp/http_main.cc index 3eb482c6a4..09b8daeee2 100644 --- a/examples/otlp/http_main.cc +++ b/examples/otlp/http_main.cc @@ -8,6 +8,10 @@ #include "opentelemetry/sdk/trace/tracer_provider_factory.h" #include "opentelemetry/trace/provider.h" +// sdk::TracerProvider is just used to call ForceFlush and prevent to cancel running exportings when +// destroy and shutdown exporters.It's optional to users. +#include "opentelemetry/sdk/trace/tracer_provider.h" + #include #ifdef BAZEL_BUILD @@ -38,6 +42,14 @@ void InitTracer() void CleanupTracer() { + // We call ForceFlush to prevent to cancel running exportings, It's optional. + opentelemetry::nostd::shared_ptr provider = + trace::Provider::GetTracerProvider(); + if (provider) + { + static_cast(provider.get())->ForceFlush(); + } + std::shared_ptr none; trace::Provider::SetTracerProvider(none); } diff --git a/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_record_exporter.h b/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_record_exporter.h index 7a52810df0..06b4c3b727 100644 --- a/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_record_exporter.h +++ b/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_record_exporter.h @@ -7,12 +7,17 @@ # include "nlohmann/json.hpp" # include "opentelemetry/common/spin_lock_mutex.h" # include "opentelemetry/ext/http/client/http_client_factory.h" +# include "opentelemetry/nostd/shared_ptr.h" # include "opentelemetry/nostd/type_traits.h" # include "opentelemetry/sdk/logs/exporter.h" # include "opentelemetry/sdk/logs/recordable.h" # include +# include +# include +# include # include +# include OPENTELEMETRY_BEGIN_NAMESPACE namespace exporter @@ -90,6 +95,14 @@ class ElasticsearchLogRecordExporter final : public opentelemetry::sdk::logs::Lo const opentelemetry::nostd::span> &records) noexcept override; + /** + * Force flush the exporter. + * @param timeout an option timeout, default to max. + * @return return true when all data are exported, and false when timeout + */ + bool ForceFlush( + std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override; + /** * Shutdown this exporter. * @param timeout The maximum time to wait for the shutdown method to return @@ -108,6 +121,18 @@ class ElasticsearchLogRecordExporter final : public opentelemetry::sdk::logs::Lo std::shared_ptr http_client_; mutable opentelemetry::common::SpinLockMutex lock_; bool isShutdown() const noexcept; + +# ifdef ENABLE_ASYNC_EXPORT + struct SynchronizationData + { + std::atomic session_counter_; + std::atomic finished_session_counter_; + std::condition_variable force_flush_cv; + std::mutex force_flush_cv_m; + std::recursive_mutex force_flush_m; + }; + nostd::shared_ptr synchronization_data_; +# endif }; } // namespace logs } // namespace exporter diff --git a/exporters/elasticsearch/src/es_log_record_exporter.cc b/exporters/elasticsearch/src/es_log_record_exporter.cc index c440be685c..60a5d8d781 100644 --- a/exporters/elasticsearch/src/es_log_record_exporter.cc +++ b/exporters/elasticsearch/src/es_log_record_exporter.cc @@ -290,9 +290,19 @@ class AsyncResponseHandler : public http_client::EventHandler # endif ElasticsearchLogRecordExporter::ElasticsearchLogRecordExporter() - : options_{ElasticsearchExporterOptions()}, - http_client_{ext::http::client::HttpClientFactory::Create()} -{} + : options_{ElasticsearchExporterOptions()}, http_client_ +{ + ext::http::client::HttpClientFactory::Create() +} +# ifdef ENABLE_ASYNC_EXPORT +, synchronization_data_(new SynchronizationData()) +# endif +{ +# ifdef ENABLE_ASYNC_EXPORT + synchronization_data_->finished_session_counter_.store(0); + synchronization_data_->session_counter_.store(0); +# endif +} ElasticsearchLogRecordExporter::ElasticsearchLogRecordExporter( const ElasticsearchExporterOptions &options) @@ -343,10 +353,12 @@ sdk::common::ExportResult ElasticsearchLogRecordExporter::Export( # ifdef ENABLE_ASYNC_EXPORT // Send the request - std::size_t span_count = records.size(); - auto handler = std::make_shared( + synchronization_data_->session_counter_.fetch_add(1, std::memory_order_release); + std::size_t span_count = records.size(); + auto synchronization_data = synchronization_data_; + auto handler = std::make_shared( session, - [span_count](opentelemetry::sdk::common::ExportResult result) { + [span_count, synchronization_data](opentelemetry::sdk::common::ExportResult result) { if (result != opentelemetry::sdk::common::ExportResult::kSuccess) { OTEL_INTERNAL_LOG_ERROR("[ES Log Exporter] ERROR: Export " @@ -358,6 +370,9 @@ sdk::common::ExportResult ElasticsearchLogRecordExporter::Export( OTEL_INTERNAL_LOG_DEBUG("[ES Log Exporter] Export " << span_count << " trace span(s) success"); } + + synchronization_data->finished_session_counter_.fetch_add(1, std::memory_order_release); + synchronization_data->force_flush_cv.notify_all(); return true; }, options_.console_debug_); @@ -401,6 +416,49 @@ sdk::common::ExportResult ElasticsearchLogRecordExporter::Export( # endif } +bool ElasticsearchLogRecordExporter::ForceFlush(std::chrono::microseconds timeout) noexcept +{ +# ifdef ENABLE_ASYNC_EXPORT + std::lock_guard lock_guard{synchronization_data_->force_flush_m}; + std::size_t running_counter = + synchronization_data_->session_counter_.load(std::memory_order_acquire); + // ASAN will report chrono: runtime error: signed integer overflow: A + B cannot be represented + // in type 'long int' here. So we reset timeout to meet signed long int limit here. + timeout = opentelemetry::common::DurationUtil::AdjustWaitForTimeout( + timeout, std::chrono::microseconds::zero()); + + std::chrono::steady_clock::duration timeout_steady = + std::chrono::duration_cast(timeout); + if (timeout_steady <= std::chrono::steady_clock::duration::zero()) + { + timeout_steady = std::chrono::steady_clock::duration::max(); + } + + std::unique_lock lk_cv(synchronization_data_->force_flush_cv_m); + // Wait for all the sessions to finish + while (timeout_steady > std::chrono::steady_clock::duration::zero()) + { + if (synchronization_data_->finished_session_counter_.load(std::memory_order_acquire) >= + running_counter) + { + break; + } + + std::chrono::steady_clock::time_point start_timepoint = std::chrono::steady_clock::now(); + if (std::cv_status::no_timeout != synchronization_data_->force_flush_cv.wait_for( + lk_cv, std::chrono::seconds{options_.response_timeout_})) + { + break; + } + timeout_steady -= std::chrono::steady_clock::now() - start_timepoint; + } + + return timeout_steady > std::chrono::steady_clock::duration::zero(); +# else + return true; +# endif +} + bool ElasticsearchLogRecordExporter::Shutdown(std::chrono::microseconds /* timeout */) noexcept { const std::lock_guard locked(lock_); diff --git a/exporters/etw/CMakeLists.txt b/exporters/etw/CMakeLists.txt index 15ab6e8ebb..b74a1b2cb5 100644 --- a/exporters/etw/CMakeLists.txt +++ b/exporters/etw/CMakeLists.txt @@ -11,8 +11,12 @@ target_include_directories( set_target_properties(opentelemetry_exporter_etw PROPERTIES EXPORT_NAME etw_exporter) -target_link_libraries(opentelemetry_exporter_etw - INTERFACE opentelemetry_api nlohmann_json::nlohmann_json) +target_link_libraries( + opentelemetry_exporter_etw INTERFACE opentelemetry_api opentelemetry_trace + nlohmann_json::nlohmann_json) +if(WITH_LOGS_PREVIEW) + target_link_libraries(opentelemetry_exporter_etw INTERFACE opentelemetry_logs) +endif() if(nlohmann_json_clone) add_dependencies(opentelemetry_exporter_etw nlohmann_json::nlohmann_json) endif() diff --git a/exporters/jaeger/BUILD b/exporters/jaeger/BUILD index c042535561..6f0e030b4d 100644 --- a/exporters/jaeger/BUILD +++ b/exporters/jaeger/BUILD @@ -189,6 +189,7 @@ cc_library( deps = [ ":jaeger_exporter", "//sdk/src/common:global_log_handler", + "//sdk/src/trace", ], ) diff --git a/exporters/jaeger/CMakeLists.txt b/exporters/jaeger/CMakeLists.txt index 62d2961324..a02fa7c3e5 100644 --- a/exporters/jaeger/CMakeLists.txt +++ b/exporters/jaeger/CMakeLists.txt @@ -43,7 +43,8 @@ target_include_directories( target_link_libraries( opentelemetry_exporter_jaeger_trace - PUBLIC opentelemetry_resources opentelemetry_http_client_curl + PUBLIC opentelemetry_resources opentelemetry_trace + opentelemetry_http_client_curl PRIVATE thrift::thrift) if(MSVC) diff --git a/exporters/jaeger/include/opentelemetry/exporters/jaeger/jaeger_exporter.h b/exporters/jaeger/include/opentelemetry/exporters/jaeger/jaeger_exporter.h index b8571199a1..eac6dc6940 100644 --- a/exporters/jaeger/include/opentelemetry/exporters/jaeger/jaeger_exporter.h +++ b/exporters/jaeger/include/opentelemetry/exporters/jaeger/jaeger_exporter.h @@ -47,6 +47,14 @@ class OPENTELEMETRY_DEPRECATED JaegerExporter final : public opentelemetry::sdk: const nostd::span> &spans) noexcept override; + /** + * Force flush the exporter. + * @param timeout an option timeout, default to max. + * @return return true when all data are exported, and false when timeout + */ + bool ForceFlush( + std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override; + /** * Shutdown the exporter. * @param timeout an option timeout, default to max. diff --git a/exporters/jaeger/src/jaeger_exporter.cc b/exporters/jaeger/src/jaeger_exporter.cc index 8e9e75bd73..7f413caf30 100644 --- a/exporters/jaeger/src/jaeger_exporter.cc +++ b/exporters/jaeger/src/jaeger_exporter.cc @@ -98,6 +98,11 @@ void JaegerExporter::InitializeEndpoint() assert(false); } +bool JaegerExporter::ForceFlush(std::chrono::microseconds /* timeout */) noexcept +{ + return true; +} + bool JaegerExporter::Shutdown(std::chrono::microseconds /* timeout */) noexcept { const std::lock_guard locked(lock_); diff --git a/exporters/ostream/include/opentelemetry/exporters/ostream/log_record_exporter.h b/exporters/ostream/include/opentelemetry/exporters/ostream/log_record_exporter.h index f5e400946f..34806bd200 100644 --- a/exporters/ostream/include/opentelemetry/exporters/ostream/log_record_exporter.h +++ b/exporters/ostream/include/opentelemetry/exporters/ostream/log_record_exporter.h @@ -39,6 +39,14 @@ class OStreamLogRecordExporter final : public opentelemetry::sdk::logs::LogRecor const opentelemetry::nostd::span> &records) noexcept override; + /** + * Force flush the exporter. + * @param timeout an option timeout, default to max. + * @return return true when all data are exported, and false when timeout + */ + bool ForceFlush( + std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override; + /** * Marks the OStream Log Exporter as shut down. */ diff --git a/exporters/ostream/include/opentelemetry/exporters/ostream/span_exporter.h b/exporters/ostream/include/opentelemetry/exporters/ostream/span_exporter.h index 985e05715a..baa4e860b1 100644 --- a/exporters/ostream/include/opentelemetry/exporters/ostream/span_exporter.h +++ b/exporters/ostream/include/opentelemetry/exporters/ostream/span_exporter.h @@ -38,6 +38,14 @@ class OStreamSpanExporter final : public opentelemetry::sdk::trace::SpanExporter const opentelemetry::nostd::span> &spans) noexcept override; + /** + * Force flush the exporter. + * @param timeout an option timeout, default to max. + * @return return true when all data are exported, and false when timeout + */ + bool ForceFlush( + std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override; + bool Shutdown( std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override; diff --git a/exporters/ostream/src/log_record_exporter.cc b/exporters/ostream/src/log_record_exporter.cc index 0b1093ab9d..751de71e2c 100644 --- a/exporters/ostream/src/log_record_exporter.cc +++ b/exporters/ostream/src/log_record_exporter.cc @@ -113,6 +113,12 @@ sdk::common::ExportResult OStreamLogRecordExporter::Export( return sdk::common::ExportResult::kSuccess; } +bool OStreamLogRecordExporter::ForceFlush(std::chrono::microseconds /* timeout */) noexcept +{ + sout_.flush(); + return true; +} + bool OStreamLogRecordExporter::Shutdown(std::chrono::microseconds) noexcept { const std::lock_guard locked(lock_); diff --git a/exporters/ostream/src/span_exporter.cc b/exporters/ostream/src/span_exporter.cc index 79be26b311..6a88c32235 100644 --- a/exporters/ostream/src/span_exporter.cc +++ b/exporters/ostream/src/span_exporter.cc @@ -98,6 +98,12 @@ sdk::common::ExportResult OStreamSpanExporter::Export( return sdk::common::ExportResult::kSuccess; } +bool OStreamSpanExporter::ForceFlush(std::chrono::microseconds /* timeout */) noexcept +{ + sout_.flush(); + return true; +} + bool OStreamSpanExporter::Shutdown(std::chrono::microseconds /* timeout */) noexcept { const std::lock_guard locked(lock_); diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_exporter.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_exporter.h index a28e6fca85..ab64d7e769 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_exporter.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_exporter.h @@ -52,6 +52,14 @@ class OtlpGrpcExporter final : public opentelemetry::sdk::trace::SpanExporter sdk::common::ExportResult Export( const nostd::span> &spans) noexcept override; + /** + * Force flush the exporter. + * @param timeout an option timeout, default to max. + * @return return true when all data are exported, and false when timeout + */ + bool ForceFlush( + std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override; + /** * Shut down the exporter. * @param timeout an optional timeout, the default timeout of 0 means that no diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_log_record_exporter.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_log_record_exporter.h index 05224550a3..becb308ca2 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_log_record_exporter.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_log_record_exporter.h @@ -55,6 +55,14 @@ class OtlpGrpcLogRecordExporter : public opentelemetry::sdk::logs::LogRecordExpo const nostd::span> &records) noexcept override; + /** + * Force flush the exporter. + * @param timeout an option timeout, default to max. + * @return return true when all data are exported, and false when timeout + */ + bool ForceFlush( + std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override; + /** * Shutdown this exporter. * @param timeout The maximum time to wait for the shutdown method to return. diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_exporter.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_exporter.h index 673e0d0bb3..9188858fc1 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_exporter.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_exporter.h @@ -53,6 +53,14 @@ class OtlpHttpExporter final : public opentelemetry::sdk::trace::SpanExporter const nostd::span> &spans) noexcept override; + /** + * Force flush the exporter. + * @param timeout an option timeout, default to max. + * @return return true when all data are exported, and false when timeout + */ + bool ForceFlush( + std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override; + /** * Shut down the exporter. * @param timeout an optional timeout, the default timeout of 0 means that no diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_log_record_exporter.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_log_record_exporter.h index 2250cfe831..035b046b72 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_log_record_exporter.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_log_record_exporter.h @@ -53,6 +53,14 @@ class OtlpHttpLogRecordExporter final : public opentelemetry::sdk::logs::LogReco const nostd::span> &records) noexcept override; + /** + * Force flush the exporter. + * @param timeout an option timeout, default to max. + * @return return true when all data are exported, and false when timeout + */ + bool ForceFlush( + std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override; + /** * Shutdown this exporter. * @param timeout The maximum time to wait for the shutdown method to return diff --git a/exporters/otlp/src/otlp_grpc_exporter.cc b/exporters/otlp/src/otlp_grpc_exporter.cc index 5f4cbc99e2..89ad1fb2cf 100644 --- a/exporters/otlp/src/otlp_grpc_exporter.cc +++ b/exporters/otlp/src/otlp_grpc_exporter.cc @@ -72,6 +72,13 @@ sdk::common::ExportResult OtlpGrpcExporter::Export( return sdk::common::ExportResult::kSuccess; } +bool OtlpGrpcExporter::ForceFlush(std::chrono::microseconds /* timeout */) noexcept +{ + // TODO: When we implement async exporting in OTLP gRPC exporter in the future, we need wait the + // running exporting finished here. + return true; +} + bool OtlpGrpcExporter::Shutdown(std::chrono::microseconds /* timeout */) noexcept { const std::lock_guard locked(lock_); diff --git a/exporters/otlp/src/otlp_grpc_log_record_exporter.cc b/exporters/otlp/src/otlp_grpc_log_record_exporter.cc index e8a3b8a46a..1873ac8ad7 100644 --- a/exporters/otlp/src/otlp_grpc_log_record_exporter.cc +++ b/exporters/otlp/src/otlp_grpc_log_record_exporter.cc @@ -91,6 +91,13 @@ bool OtlpGrpcLogRecordExporter::Shutdown(std::chrono::microseconds /* timeout */ return true; } +bool OtlpGrpcLogRecordExporter::ForceFlush(std::chrono::microseconds /* timeout */) noexcept +{ + // TODO: When we implement async exporting in OTLP gRPC exporter in the future, we need wait the + // running exporting finished here. + return true; +} + bool OtlpGrpcLogRecordExporter::isShutdown() const noexcept { const std::lock_guard locked(lock_); diff --git a/exporters/otlp/src/otlp_http_client.cc b/exporters/otlp/src/otlp_http_client.cc index 58cff24c1e..24beafa194 100644 --- a/exporters/otlp/src/otlp_http_client.cc +++ b/exporters/otlp/src/otlp_http_client.cc @@ -793,34 +793,40 @@ bool OtlpHttpClient::ForceFlush(std::chrono::microseconds timeout) noexcept // Wait for all the sessions to finish std::unique_lock lock(session_waker_lock_); - if (timeout <= std::chrono::microseconds::zero()) + + std::chrono::steady_clock::duration timeout_steady = + std::chrono::duration_cast(timeout); + if (timeout_steady <= std::chrono::steady_clock::duration::zero()) + { + timeout_steady = std::chrono::steady_clock::duration::max(); + } + + while (timeout_steady > std::chrono::steady_clock::duration::zero()) { - while (true) { + std::lock_guard guard{session_manager_lock_}; + if (running_sessions_.empty()) { - std::lock_guard guard{session_manager_lock_}; - if (running_sessions_.empty()) - { - break; - } - } - // When changes of running_sessions_ and notify_one/notify_all happen between predicate - // checking and waiting, we should not wait forever.We should cleanup gc sessions here as soon - // as possible to call FinishSession() and cleanup resources. - if (std::cv_status::timeout == session_waker_.wait_for(lock, options_.timeout)) - { - cleanupGCSessions(); + break; } } - return true; - } - else - { - return session_waker_.wait_for(lock, timeout, [this] { - std::lock_guard guard{session_manager_lock_}; - return running_sessions_.empty(); - }); + // When changes of running_sessions_ and notify_one/notify_all happen between predicate + // checking and waiting, we should not wait forever.We should cleanup gc sessions here as soon + // as possible to call FinishSession() and cleanup resources. + std::chrono::steady_clock::time_point start_timepoint = std::chrono::steady_clock::now(); + if (std::cv_status::timeout == session_waker_.wait_for(lock, options_.timeout)) + { + cleanupGCSessions(); + } + else + { + break; + } + + timeout_steady -= std::chrono::steady_clock::now() - start_timepoint; } + + return timeout_steady > std::chrono::steady_clock::duration::zero(); } bool OtlpHttpClient::Shutdown(std::chrono::microseconds timeout) noexcept diff --git a/exporters/otlp/src/otlp_http_exporter.cc b/exporters/otlp/src/otlp_http_exporter.cc index 78b587e766..852fee7e1f 100644 --- a/exporters/otlp/src/otlp_http_exporter.cc +++ b/exporters/otlp/src/otlp_http_exporter.cc @@ -115,6 +115,11 @@ opentelemetry::sdk::common::ExportResult OtlpHttpExporter::Export( #endif } +bool OtlpHttpExporter::ForceFlush(std::chrono::microseconds timeout) noexcept +{ + return http_client_->ForceFlush(timeout); +} + bool OtlpHttpExporter::Shutdown(std::chrono::microseconds timeout) noexcept { return http_client_->Shutdown(timeout); diff --git a/exporters/otlp/src/otlp_http_log_record_exporter.cc b/exporters/otlp/src/otlp_http_log_record_exporter.cc index 81bc37a340..98925773c5 100644 --- a/exporters/otlp/src/otlp_http_log_record_exporter.cc +++ b/exporters/otlp/src/otlp_http_log_record_exporter.cc @@ -118,6 +118,11 @@ opentelemetry::sdk::common::ExportResult OtlpHttpLogRecordExporter::Export( # endif } +bool OtlpHttpLogRecordExporter::ForceFlush(std::chrono::microseconds timeout) noexcept +{ + return http_client_->ForceFlush(timeout); +} + bool OtlpHttpLogRecordExporter::Shutdown(std::chrono::microseconds timeout) noexcept { return http_client_->Shutdown(timeout); diff --git a/exporters/zipkin/include/opentelemetry/exporters/zipkin/zipkin_exporter.h b/exporters/zipkin/include/opentelemetry/exporters/zipkin/zipkin_exporter.h index cb2d7b6954..8c66a251a4 100644 --- a/exporters/zipkin/include/opentelemetry/exporters/zipkin/zipkin_exporter.h +++ b/exporters/zipkin/include/opentelemetry/exporters/zipkin/zipkin_exporter.h @@ -49,6 +49,14 @@ class ZipkinExporter final : public opentelemetry::sdk::trace::SpanExporter const nostd::span> &spans) noexcept override; + /** + * Force flush the exporter. + * @param timeout an option timeout, default to max. + * @return return true when all data are exported, and false when timeout + */ + bool ForceFlush( + std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override; + /** * Shut down the exporter. * @param timeout an optional timeout, default to max. diff --git a/exporters/zipkin/src/zipkin_exporter.cc b/exporters/zipkin/src/zipkin_exporter.cc index 802ecccffc..b3f6fcfe61 100644 --- a/exporters/zipkin/src/zipkin_exporter.cc +++ b/exporters/zipkin/src/zipkin_exporter.cc @@ -110,6 +110,11 @@ void ZipkinExporter::InitializeLocalEndpoint() local_end_point_["port"] = url_parser_.port_; } +bool ZipkinExporter::ForceFlush(std::chrono::microseconds /* timeout */) noexcept +{ + return true; +} + bool ZipkinExporter::Shutdown(std::chrono::microseconds /* timeout */) noexcept { const std::lock_guard locked(lock_); diff --git a/sdk/include/opentelemetry/sdk/logs/batch_log_record_processor.h b/sdk/include/opentelemetry/sdk/logs/batch_log_record_processor.h index 0621316dcd..d13e8782f1 100644 --- a/sdk/include/opentelemetry/sdk/logs/batch_log_record_processor.h +++ b/sdk/include/opentelemetry/sdk/logs/batch_log_record_processor.h @@ -117,6 +117,7 @@ class BatchLogRecordProcessor : public LogRecordProcessor std::atomic is_force_wakeup_background_worker; std::atomic is_force_flush_pending; std::atomic is_force_flush_notified; + std::atomic force_flush_timeout_us; std::atomic is_shutdown; }; @@ -128,6 +129,7 @@ class BatchLogRecordProcessor : public LogRecordProcessor * @param synchronization_data Synchronization data to be notified. */ static void NotifyCompletion(bool notify_force_flush, + const std::unique_ptr &exporter, const std::shared_ptr &synchronization_data); void GetWaitAdjustedTime(std::chrono::microseconds &timeout, diff --git a/sdk/include/opentelemetry/sdk/logs/exporter.h b/sdk/include/opentelemetry/sdk/logs/exporter.h index a531577aef..069fd8784c 100644 --- a/sdk/include/opentelemetry/sdk/logs/exporter.h +++ b/sdk/include/opentelemetry/sdk/logs/exporter.h @@ -20,10 +20,11 @@ namespace logs /** * LogRecordExporter defines the interface that log exporters must implement. */ -class LogRecordExporter +class OPENTELEMETRY_EXPORT LogRecordExporter { public: - virtual ~LogRecordExporter() = default; + LogRecordExporter(); + virtual ~LogRecordExporter(); /** * Create a log recordable. This object will be used to record log data and @@ -47,6 +48,12 @@ class LogRecordExporter virtual sdk::common::ExportResult Export( const nostd::span> &records) noexcept = 0; + /** + * Force flush the log records pushed into this log exporter. + */ + virtual bool ForceFlush( + std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept; + /** * Marks the exporter as ShutDown and cleans up any resources as required. * Shutdown should be called only once for each Exporter instance. diff --git a/sdk/include/opentelemetry/sdk/trace/batch_span_processor.h b/sdk/include/opentelemetry/sdk/trace/batch_span_processor.h index 309d849bc7..60e93d8a9f 100644 --- a/sdk/include/opentelemetry/sdk/trace/batch_span_processor.h +++ b/sdk/include/opentelemetry/sdk/trace/batch_span_processor.h @@ -115,6 +115,7 @@ class BatchSpanProcessor : public SpanProcessor std::atomic is_force_wakeup_background_worker; std::atomic is_force_flush_pending; std::atomic is_force_flush_notified; + std::atomic force_flush_timeout_us; std::atomic is_shutdown; }; @@ -126,6 +127,7 @@ class BatchSpanProcessor : public SpanProcessor * @param synchronization_data Synchronization data to be notified. */ static void NotifyCompletion(bool notify_force_flush, + const std::unique_ptr &exporter, const std::shared_ptr &synchronization_data); void GetWaitAdjustedTime(std::chrono::microseconds &timeout, diff --git a/sdk/include/opentelemetry/sdk/trace/exporter.h b/sdk/include/opentelemetry/sdk/trace/exporter.h index 3f9bce9b84..ffacca2b35 100644 --- a/sdk/include/opentelemetry/sdk/trace/exporter.h +++ b/sdk/include/opentelemetry/sdk/trace/exporter.h @@ -20,7 +20,8 @@ namespace trace class OPENTELEMETRY_EXPORT SpanExporter { public: - virtual ~SpanExporter() = default; + SpanExporter(); + virtual ~SpanExporter(); /** * Create a span recordable. This object will be used to record span data and @@ -42,6 +43,15 @@ class OPENTELEMETRY_EXPORT SpanExporter const nostd::span> &spans) noexcept = 0; + /** + * Export all spans that have been exported. + * @param timeout an optional timeout, the default timeout of 0 means that no + * timeout is applied. + * @return return true when all data are exported, and false when timeout + */ + virtual bool ForceFlush( + std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept; + /** * Shut down the exporter. * @param timeout an optional timeout. diff --git a/sdk/include/opentelemetry/sdk/trace/simple_processor.h b/sdk/include/opentelemetry/sdk/trace/simple_processor.h index 6796a10848..671218d0cf 100644 --- a/sdk/include/opentelemetry/sdk/trace/simple_processor.h +++ b/sdk/include/opentelemetry/sdk/trace/simple_processor.h @@ -55,7 +55,16 @@ class SimpleSpanProcessor : public SpanProcessor } } - bool ForceFlush(std::chrono::microseconds /* timeout */) noexcept override { return true; } + bool ForceFlush( + std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept override + { + if (exporter_ != nullptr) + { + return exporter_->ForceFlush(timeout); + } + + return true; + } bool Shutdown( std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept override diff --git a/sdk/include/opentelemetry/sdk/trace/tracer_context.h b/sdk/include/opentelemetry/sdk/trace/tracer_context.h index 573af20b7d..6dcdda8e01 100644 --- a/sdk/include/opentelemetry/sdk/trace/tracer_context.h +++ b/sdk/include/opentelemetry/sdk/trace/tracer_context.h @@ -87,7 +87,7 @@ class TracerContext /** * Shutdown the span processor associated with this tracer provider. */ - bool Shutdown() noexcept; + bool Shutdown(std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept; private: // order of declaration is important here - resource object should be destroyed after processor. diff --git a/sdk/src/logs/CMakeLists.txt b/sdk/src/logs/CMakeLists.txt index cbaea426e6..2b7db6eb7c 100644 --- a/sdk/src/logs/CMakeLists.txt +++ b/sdk/src/logs/CMakeLists.txt @@ -6,6 +6,7 @@ add_library( logger_provider.cc logger_provider_factory.cc logger.cc + exporter.cc event_logger_provider.cc event_logger_provider_factory.cc event_logger.cc diff --git a/sdk/src/logs/batch_log_record_processor.cc b/sdk/src/logs/batch_log_record_processor.cc index 7903c3b24c..6b1db1c666 100644 --- a/sdk/src/logs/batch_log_record_processor.cc +++ b/sdk/src/logs/batch_log_record_processor.cc @@ -46,6 +46,7 @@ BatchLogRecordProcessor::BatchLogRecordProcessor(std::unique_ptris_force_wakeup_background_worker.store(false); synchronization_data_->is_force_flush_pending.store(false); synchronization_data_->is_force_flush_notified.store(false); + synchronization_data_->force_flush_timeout_us.store(0); synchronization_data_->is_shutdown.store(false); } @@ -88,6 +89,7 @@ bool BatchLogRecordProcessor::ForceFlush(std::chrono::microseconds timeout) noex std::unique_lock lk_cv(synchronization_data_->force_flush_cv_m); synchronization_data_->is_force_flush_pending.store(true, std::memory_order_release); + synchronization_data_->force_flush_timeout_us.store(timeout.count(), std::memory_order_release); auto break_condition = [this]() { if (synchronization_data_->is_shutdown.load() == true) { @@ -106,23 +108,24 @@ bool BatchLogRecordProcessor::ForceFlush(std::chrono::microseconds timeout) noex // Fix timeout to meet requirement of wait_for timeout = opentelemetry::common::DurationUtil::AdjustWaitForTimeout( timeout, std::chrono::microseconds::zero()); - bool result; - if (timeout <= std::chrono::microseconds::zero()) + + std::chrono::steady_clock::duration timeout_steady = + std::chrono::duration_cast(timeout); + if (timeout_steady <= std::chrono::steady_clock::duration::zero()) { - bool wait_result = false; - while (!wait_result) - { - // When is_force_flush_notified.store(true) and force_flush_cv.notify_all() is called - // between is_force_flush_pending.load() and force_flush_cv.wait(). We must not wait - // for ever - wait_result = synchronization_data_->force_flush_cv.wait_for(lk_cv, scheduled_delay_millis_, - break_condition); - } - result = true; + timeout_steady = std::chrono::steady_clock::duration::max(); } - else + + bool result = false; + while (!result && timeout_steady > std::chrono::steady_clock::duration::zero()) { - result = synchronization_data_->force_flush_cv.wait_for(lk_cv, timeout, break_condition); + // When is_force_flush_notified.store(true) and force_flush_cv.notify_all() is called + // between is_force_flush_pending.load() and force_flush_cv.wait(). We must not wait + // for ever + std::chrono::steady_clock::time_point start_timepoint = std::chrono::steady_clock::now(); + result = synchronization_data_->force_flush_cv.wait_for(lk_cv, scheduled_delay_millis_, + break_condition); + timeout_steady -= std::chrono::steady_clock::now() - start_timepoint; } // If it's already signaled, we must wait util notified. @@ -202,7 +205,7 @@ void BatchLogRecordProcessor::Export() if (num_records_to_export == 0) { - NotifyCompletion(notify_force_flush, synchronization_data_); + NotifyCompletion(notify_force_flush, exporter_, synchronization_data_); break; } @@ -218,12 +221,13 @@ void BatchLogRecordProcessor::Export() exporter_->Export( nostd::span>(records_arr.data(), records_arr.size())); - NotifyCompletion(notify_force_flush, synchronization_data_); + NotifyCompletion(notify_force_flush, exporter_, synchronization_data_); } while (true); } void BatchLogRecordProcessor::NotifyCompletion( bool notify_force_flush, + const std::unique_ptr &exporter, const std::shared_ptr &synchronization_data) { if (!synchronization_data) @@ -233,6 +237,14 @@ void BatchLogRecordProcessor::NotifyCompletion( if (notify_force_flush) { + if (exporter) + { + std::chrono::microseconds timeout = opentelemetry::common::DurationUtil::AdjustWaitForTimeout( + std::chrono::microseconds{ + synchronization_data->force_flush_timeout_us.load(std::memory_order_acquire)}, + std::chrono::microseconds::zero()); + exporter->ForceFlush(timeout); + } synchronization_data->is_force_flush_notified.store(true, std::memory_order_release); synchronization_data->force_flush_cv.notify_one(); } diff --git a/sdk/src/logs/exporter.cc b/sdk/src/logs/exporter.cc new file mode 100644 index 0000000000..20fdb6b25c --- /dev/null +++ b/sdk/src/logs/exporter.cc @@ -0,0 +1,27 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#ifdef ENABLE_LOGS_PREVIEW + +# include "opentelemetry/sdk/logs/exporter.h" + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace sdk +{ +namespace logs +{ + +OPENTELEMETRY_EXPORT LogRecordExporter::LogRecordExporter() {} + +OPENTELEMETRY_EXPORT LogRecordExporter::~LogRecordExporter() {} + +OPENTELEMETRY_EXPORT bool LogRecordExporter::ForceFlush( + std::chrono::microseconds /*timeout*/) noexcept +{ + return true; +} + +} // namespace logs +} // namespace sdk +OPENTELEMETRY_END_NAMESPACE +#endif diff --git a/sdk/src/logs/simple_log_record_processor.cc b/sdk/src/logs/simple_log_record_processor.cc index ef923bc590..50d4c20fdd 100644 --- a/sdk/src/logs/simple_log_record_processor.cc +++ b/sdk/src/logs/simple_log_record_processor.cc @@ -43,8 +43,12 @@ void SimpleLogRecordProcessor::OnEmit(std::unique_ptr &&record) noex /** * The simple processor does not have any log records to flush so this method is not used */ -bool SimpleLogRecordProcessor::ForceFlush(std::chrono::microseconds /* timeout */) noexcept +bool SimpleLogRecordProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept { + if (exporter_ != nullptr) + { + return exporter_->ForceFlush(timeout); + } return true; } diff --git a/sdk/src/trace/CMakeLists.txt b/sdk/src/trace/CMakeLists.txt index 7f757f036c..94c2059548 100644 --- a/sdk/src/trace/CMakeLists.txt +++ b/sdk/src/trace/CMakeLists.txt @@ -9,6 +9,7 @@ add_library( tracer_provider_factory.cc tracer.cc span.cc + exporter.cc batch_span_processor.cc batch_span_processor_factory.cc simple_processor_factory.cc diff --git a/sdk/src/trace/batch_span_processor.cc b/sdk/src/trace/batch_span_processor.cc index de6c457da7..da9f39f054 100644 --- a/sdk/src/trace/batch_span_processor.cc +++ b/sdk/src/trace/batch_span_processor.cc @@ -30,6 +30,7 @@ BatchSpanProcessor::BatchSpanProcessor(std::unique_ptr &&exporter, synchronization_data_->is_force_wakeup_background_worker.store(false); synchronization_data_->is_force_flush_pending.store(false); synchronization_data_->is_force_flush_notified.store(false); + synchronization_data_->force_flush_timeout_us.store(0); synchronization_data_->is_shutdown.store(false); } @@ -77,6 +78,7 @@ bool BatchSpanProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept std::unique_lock lk_cv(synchronization_data_->force_flush_cv_m); synchronization_data_->is_force_flush_pending.store(true, std::memory_order_release); + synchronization_data_->force_flush_timeout_us.store(timeout.count(), std::memory_order_release); auto break_condition = [this]() { if (synchronization_data_->is_shutdown.load() == true) { @@ -97,23 +99,23 @@ bool BatchSpanProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept // Fix timeout to meet requirement of wait_for timeout = opentelemetry::common::DurationUtil::AdjustWaitForTimeout( timeout, std::chrono::microseconds::zero()); - bool result; - if (timeout <= std::chrono::microseconds::zero()) + std::chrono::steady_clock::duration timeout_steady = + std::chrono::duration_cast(timeout); + if (timeout_steady <= std::chrono::steady_clock::duration::zero()) { - bool wait_result = false; - while (!wait_result) - { - // When is_force_flush_notified.store(true) and force_flush_cv.notify_all() is called - // between is_force_flush_pending.load() and force_flush_cv.wait(). We must not wait - // for ever - wait_result = synchronization_data_->force_flush_cv.wait_for(lk_cv, schedule_delay_millis_, - break_condition); - } - result = true; + timeout_steady = std::chrono::steady_clock::duration::max(); } - else + + bool result = false; + while (!result && timeout_steady > std::chrono::steady_clock::duration::zero()) { - result = synchronization_data_->force_flush_cv.wait_for(lk_cv, timeout, break_condition); + // When is_force_flush_notified.store(true) and force_flush_cv.notify_all() is called + // between is_force_flush_pending.load() and force_flush_cv.wait(). We must not wait + // for ever + std::chrono::steady_clock::time_point start_timepoint = std::chrono::steady_clock::now(); + result = synchronization_data_->force_flush_cv.wait_for(lk_cv, schedule_delay_millis_, + break_condition); + timeout_steady -= std::chrono::steady_clock::now() - start_timepoint; } // If it will be already signaled, we must wait util notified. @@ -192,7 +194,7 @@ void BatchSpanProcessor::Export() if (num_records_to_export == 0) { - NotifyCompletion(notify_force_flush, synchronization_data_); + NotifyCompletion(notify_force_flush, exporter_, synchronization_data_); break; } buffer_.Consume(num_records_to_export, @@ -206,12 +208,13 @@ void BatchSpanProcessor::Export() }); exporter_->Export(nostd::span>(spans_arr.data(), spans_arr.size())); - NotifyCompletion(notify_force_flush, synchronization_data_); + NotifyCompletion(notify_force_flush, exporter_, synchronization_data_); } while (true); } void BatchSpanProcessor::NotifyCompletion( bool notify_force_flush, + const std::unique_ptr &exporter, const std::shared_ptr &synchronization_data) { if (!synchronization_data) @@ -221,6 +224,15 @@ void BatchSpanProcessor::NotifyCompletion( if (notify_force_flush) { + if (exporter) + { + std::chrono::microseconds timeout = opentelemetry::common::DurationUtil::AdjustWaitForTimeout( + std::chrono::microseconds{ + synchronization_data->force_flush_timeout_us.load(std::memory_order_acquire)}, + std::chrono::microseconds::zero()); + exporter->ForceFlush(timeout); + } + synchronization_data->is_force_flush_notified.store(true, std::memory_order_release); synchronization_data->force_flush_cv.notify_one(); } diff --git a/sdk/src/trace/exporter.cc b/sdk/src/trace/exporter.cc new file mode 100644 index 0000000000..5984583ba5 --- /dev/null +++ b/sdk/src/trace/exporter.cc @@ -0,0 +1,23 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#include "opentelemetry/sdk/trace/exporter.h" + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace sdk +{ +namespace trace +{ + +OPENTELEMETRY_EXPORT SpanExporter::SpanExporter() {} + +OPENTELEMETRY_EXPORT SpanExporter::~SpanExporter() {} + +OPENTELEMETRY_EXPORT bool SpanExporter::ForceFlush(std::chrono::microseconds /*timeout*/) noexcept +{ + return true; +} + +} // namespace trace +} // namespace sdk +OPENTELEMETRY_END_NAMESPACE diff --git a/sdk/src/trace/tracer.cc b/sdk/src/trace/tracer.cc index 74e16bdfe0..07fd36d5b7 100644 --- a/sdk/src/trace/tracer.cc +++ b/sdk/src/trace/tracer.cc @@ -104,12 +104,22 @@ nostd::shared_ptr Tracer::StartSpan( void Tracer::ForceFlushWithMicroseconds(uint64_t timeout) noexcept { - (void)timeout; + if (context_) + { + context_->ForceFlush( + std::chrono::microseconds{static_cast(timeout)}); + } } void Tracer::CloseWithMicroseconds(uint64_t timeout) noexcept { - (void)timeout; + // Trace context is shared by many tracers.So we just call ForceFlush to flush all pending spans + // and do not shutdown it. + if (context_) + { + context_->ForceFlush( + std::chrono::microseconds{static_cast(timeout)}); + } } } // namespace trace } // namespace sdk diff --git a/sdk/src/trace/tracer_context.cc b/sdk/src/trace/tracer_context.cc index d204218021..0b89c5cefb 100644 --- a/sdk/src/trace/tracer_context.cc +++ b/sdk/src/trace/tracer_context.cc @@ -53,9 +53,9 @@ bool TracerContext::ForceFlush(std::chrono::microseconds timeout) noexcept return processor_->ForceFlush(timeout); } -bool TracerContext::Shutdown() noexcept +bool TracerContext::Shutdown(std::chrono::microseconds timeout) noexcept { - return processor_->Shutdown(); + return processor_->Shutdown(timeout); } } // namespace trace diff --git a/sdk/test/logs/batch_log_record_processor_test.cc b/sdk/test/logs/batch_log_record_processor_test.cc index de3beb66fe..462e9b8397 100644 --- a/sdk/test/logs/batch_log_record_processor_test.cc +++ b/sdk/test/logs/batch_log_record_processor_test.cc @@ -70,10 +70,12 @@ class MockLogExporter final : public LogRecordExporter { public: MockLogExporter(std::shared_ptr>> logs_received, + std::shared_ptr> force_flush_counter, std::shared_ptr> is_shutdown, std::shared_ptr> is_export_completed, const std::chrono::milliseconds export_delay = std::chrono::milliseconds(0)) : logs_received_(logs_received), + force_flush_counter_(force_flush_counter), is_shutdown_(is_shutdown), is_export_completed_(is_export_completed), export_delay_(export_delay) @@ -109,6 +111,12 @@ class MockLogExporter final : public LogRecordExporter return ExportResult::kSuccess; } + bool ForceFlush(std::chrono::microseconds /* timeout */) noexcept override + { + ++(*force_flush_counter_); + return true; + } + // toggles the boolean flag marking this exporter as shut down bool Shutdown(std::chrono::microseconds /* timeout */) noexcept override { @@ -118,6 +126,7 @@ class MockLogExporter final : public LogRecordExporter private: std::shared_ptr>> logs_received_; + std::shared_ptr> force_flush_counter_; std::shared_ptr> is_shutdown_; std::shared_ptr> is_export_completed_; const std::chrono::milliseconds export_delay_; @@ -134,6 +143,7 @@ class BatchLogRecordProcessorTest : public testing::Test // ::testing::Test // is_shutdown flag, and the processor configuration options (default if unspecified) std::shared_ptr GetMockProcessor( std::shared_ptr>> logs_received, + std::shared_ptr> force_flush_counter, std::shared_ptr> is_shutdown, std::shared_ptr> is_export_completed = std::shared_ptr>(new std::atomic(false)), @@ -143,8 +153,8 @@ class BatchLogRecordProcessorTest : public testing::Test // ::testing::Test const size_t max_export_batch_size = 512) { return std::shared_ptr(new BatchLogRecordProcessor( - std::unique_ptr( - new MockLogExporter(logs_received, is_shutdown, is_export_completed, export_delay)), + std::unique_ptr(new MockLogExporter( + logs_received, force_flush_counter, is_shutdown, is_export_completed, export_delay)), max_queue_size, scheduled_delay_millis, max_export_batch_size)); } }; @@ -154,9 +164,10 @@ TEST_F(BatchLogRecordProcessorTest, TestShutdown) // initialize a batch log processor with the test exporter std::shared_ptr>> logs_received( new std::vector>); + std::shared_ptr> force_flush_counter(new std::atomic(0)); std::shared_ptr> is_shutdown(new std::atomic(false)); - auto batch_processor = GetMockProcessor(logs_received, is_shutdown); + auto batch_processor = GetMockProcessor(logs_received, force_flush_counter, is_shutdown); // Create a few test log records and send them to the processor const int num_logs = 3; @@ -189,11 +200,12 @@ TEST_F(BatchLogRecordProcessorTest, TestShutdown) TEST_F(BatchLogRecordProcessorTest, TestForceFlush) { + std::shared_ptr> force_flush_counter(new std::atomic(0)); std::shared_ptr> is_shutdown(new std::atomic(false)); std::shared_ptr>> logs_received( new std::vector>); - auto batch_processor = GetMockProcessor(logs_received, is_shutdown); + auto batch_processor = GetMockProcessor(logs_received, force_flush_counter, is_shutdown); const int num_logs = 2048; for (int i = 0; i < num_logs; ++i) @@ -204,6 +216,7 @@ TEST_F(BatchLogRecordProcessorTest, TestForceFlush) } EXPECT_TRUE(batch_processor->ForceFlush()); + EXPECT_GT(force_flush_counter->load(), 0); EXPECT_EQ(num_logs, logs_received->size()); for (int i = 0; i < num_logs; ++i) @@ -219,7 +232,9 @@ TEST_F(BatchLogRecordProcessorTest, TestForceFlush) batch_processor->OnEmit(std::move(log)); } + std::size_t force_flush_counter_before = force_flush_counter->load(); EXPECT_TRUE(batch_processor->ForceFlush()); + EXPECT_GT(force_flush_counter->load(), force_flush_counter_before); EXPECT_EQ(num_logs * 2, logs_received->size()); for (int i = 0; i < num_logs * 2; ++i) @@ -232,13 +247,14 @@ TEST_F(BatchLogRecordProcessorTest, TestManyLogsLoss) { /* Test that when exporting more than max_queue_size logs, some are most likely lost*/ + std::shared_ptr> force_flush_counter(new std::atomic(0)); std::shared_ptr> is_shutdown(new std::atomic(false)); std::shared_ptr>> logs_received( new std::vector>); const int max_queue_size = 4096; - auto batch_processor = GetMockProcessor(logs_received, is_shutdown); + auto batch_processor = GetMockProcessor(logs_received, force_flush_counter, is_shutdown); // Create max_queue_size log records for (int i = 0; i < max_queue_size; ++i) @@ -258,10 +274,11 @@ TEST_F(BatchLogRecordProcessorTest, TestManyLogsLossLess) { /* Test that no logs are lost when sending max_queue_size logs */ + std::shared_ptr> force_flush_counter(new std::atomic(0)); std::shared_ptr> is_shutdown(new std::atomic(false)); std::shared_ptr>> logs_received( new std::vector>); - auto batch_processor = GetMockProcessor(logs_received, is_shutdown); + auto batch_processor = GetMockProcessor(logs_received, force_flush_counter, is_shutdown); const int num_logs = 2048; @@ -286,6 +303,7 @@ TEST_F(BatchLogRecordProcessorTest, TestScheduledDelayMillis) /* Test that max_export_batch_size logs are exported every scheduled_delay_millis seconds */ + std::shared_ptr> force_flush_counter(new std::atomic(0)); std::shared_ptr> is_shutdown(new std::atomic(false)); std::shared_ptr> is_export_completed(new std::atomic(false)); std::shared_ptr>> logs_received( @@ -295,8 +313,9 @@ TEST_F(BatchLogRecordProcessorTest, TestScheduledDelayMillis) const std::chrono::milliseconds scheduled_delay_millis(2000); const size_t max_export_batch_size = 512; - auto batch_processor = GetMockProcessor(logs_received, is_shutdown, is_export_completed, - export_delay, scheduled_delay_millis); + auto batch_processor = + GetMockProcessor(logs_received, force_flush_counter, is_shutdown, is_export_completed, + export_delay, scheduled_delay_millis); for (std::size_t i = 0; i < max_export_batch_size; ++i) { diff --git a/sdk/test/logs/simple_log_record_processor_test.cc b/sdk/test/logs/simple_log_record_processor_test.cc index 275991dc10..2d377cd924 100644 --- a/sdk/test/logs/simple_log_record_processor_test.cc +++ b/sdk/test/logs/simple_log_record_processor_test.cc @@ -69,10 +69,12 @@ class TestLogRecordable final : public opentelemetry::sdk::logs::Recordable class TestExporter final : public LogRecordExporter { public: - TestExporter(int *shutdown_counter, + TestExporter(int *force_flush_counter, + int *shutdown_counter, std::shared_ptr>> logs_received, size_t *batch_size_received) - : shutdown_counter_(shutdown_counter), + : force_flush_counter_(force_flush_counter), + shutdown_counter_(shutdown_counter), logs_received_(logs_received), batch_size_received(batch_size_received) {} @@ -99,14 +101,27 @@ class TestExporter final : public LogRecordExporter return ExportResult::kSuccess; } + bool ForceFlush(std::chrono::microseconds /* timeout */) noexcept override + { + if (nullptr != force_flush_counter_) + { + ++(*force_flush_counter_); + } + return true; + } + // Increment the shutdown counter everytime this method is called bool Shutdown(std::chrono::microseconds /* timeout */) noexcept override { - *shutdown_counter_ += 1; + if (nullptr != shutdown_counter_) + { + *shutdown_counter_ += 1; + } return true; } private: + int *force_flush_counter_; int *shutdown_counter_; std::shared_ptr>> logs_received_; size_t *batch_size_received; @@ -122,7 +137,7 @@ TEST(SimpleLogRecordProcessorTest, SendReceivedLogsToExporter) size_t batch_size_received = 0; std::unique_ptr exporter( - new TestExporter(nullptr, logs_received, &batch_size_received)); + new TestExporter(nullptr, nullptr, logs_received, &batch_size_received)); SimpleLogRecordProcessor processor(std::move(exporter)); @@ -152,7 +167,8 @@ TEST(SimpleLogRecordProcessorTest, ShutdownCalledOnce) // Create a TestExporter int num_shutdowns = 0; - std::unique_ptr exporter(new TestExporter(&num_shutdowns, nullptr, nullptr)); + std::unique_ptr exporter( + new TestExporter(nullptr, &num_shutdowns, nullptr, nullptr)); // Create a processor with the previous test exporter SimpleLogRecordProcessor processor(std::move(exporter)); @@ -167,11 +183,28 @@ TEST(SimpleLogRecordProcessorTest, ShutdownCalledOnce) EXPECT_EQ(1, num_shutdowns); } +TEST(SimpleLogRecordProcessorTest, ForceFlush) +{ + // Create a TestExporter + int num_force_flush = 0; + + std::unique_ptr exporter( + new TestExporter(&num_force_flush, nullptr, nullptr, nullptr)); + + // Create a processor with the previous test exporter + SimpleLogRecordProcessor processor(std::move(exporter)); + + // The first time processor shutdown is called + EXPECT_EQ(0, num_force_flush); + EXPECT_EQ(true, processor.ForceFlush()); + EXPECT_EQ(1, num_force_flush); +} + // A test exporter that always returns failure when shut down -class FailShutDownExporter final : public LogRecordExporter +class FailShutDownForceFlushExporter final : public LogRecordExporter { public: - FailShutDownExporter() {} + FailShutDownForceFlushExporter() {} std::unique_ptr MakeRecordable() noexcept override { @@ -184,16 +217,29 @@ class FailShutDownExporter final : public LogRecordExporter return ExportResult::kSuccess; } + bool ForceFlush(std::chrono::microseconds /* timeout */) noexcept override { return false; } + bool Shutdown(std::chrono::microseconds /* timeout */) noexcept override { return false; } }; // Tests for when when processor should fail to shutdown TEST(SimpleLogRecordProcessorTest, ShutDownFail) { - std::unique_ptr exporter(new FailShutDownExporter()); + std::unique_ptr exporter(new FailShutDownForceFlushExporter()); SimpleLogRecordProcessor processor(std::move(exporter)); // Expect failure result when exporter fails to shutdown EXPECT_EQ(false, processor.Shutdown()); } + +// Tests for when when processor should fail to force flush +TEST(SimpleLogRecordProcessorTest, ForceFlushFail) +{ + std::unique_ptr exporter(new FailShutDownForceFlushExporter()); + SimpleLogRecordProcessor processor(std::move(exporter)); + + // Expect failure result when exporter fails to force flush + EXPECT_EQ(false, processor.ForceFlush()); +} + #endif diff --git a/sdk/test/trace/batch_span_processor_test.cc b/sdk/test/trace/batch_span_processor_test.cc index e9150d1ff1..b7a6558538 100644 --- a/sdk/test/trace/batch_span_processor_test.cc +++ b/sdk/test/trace/batch_span_processor_test.cc @@ -23,11 +23,13 @@ class MockSpanExporter final : public sdk::trace::SpanExporter public: MockSpanExporter( std::shared_ptr>> spans_received, + std::shared_ptr> shut_down_counter, std::shared_ptr> is_shutdown, std::shared_ptr> is_export_completed = std::shared_ptr>(new std::atomic(false)), const std::chrono::milliseconds export_delay = std::chrono::milliseconds(0)) noexcept : spans_received_(spans_received), + shut_down_counter_(shut_down_counter), is_shutdown_(is_shutdown), is_export_completed_(is_export_completed), export_delay_(export_delay) @@ -60,6 +62,12 @@ class MockSpanExporter final : public sdk::trace::SpanExporter return sdk::common::ExportResult::kSuccess; } + bool ForceFlush(std::chrono::microseconds /*timeout*/) noexcept override + { + ++(*shut_down_counter_); + return true; + } + bool Shutdown(std::chrono::microseconds /* timeout */) noexcept override { *is_shutdown_ = true; @@ -70,6 +78,7 @@ class MockSpanExporter final : public sdk::trace::SpanExporter private: std::shared_ptr>> spans_received_; + std::shared_ptr> shut_down_counter_; std::shared_ptr> is_shutdown_; std::shared_ptr> is_export_completed_; // Meant exclusively to test force flush timeout @@ -104,14 +113,16 @@ class BatchSpanProcessorTestPeer : public testing::Test TEST_F(BatchSpanProcessorTestPeer, TestShutdown) { + std::shared_ptr> shut_down_counter(new std::atomic(0)); std::shared_ptr> is_shutdown(new std::atomic(false)); std::shared_ptr>> spans_received( new std::vector>); - auto batch_processor = std::shared_ptr( - new sdk::trace::BatchSpanProcessor(std::unique_ptr( - new MockSpanExporter(spans_received, is_shutdown)), - sdk::trace::BatchSpanProcessorOptions())); + auto batch_processor = + std::shared_ptr(new sdk::trace::BatchSpanProcessor( + std::unique_ptr( + new MockSpanExporter(spans_received, shut_down_counter, is_shutdown)), + sdk::trace::BatchSpanProcessorOptions())); const int num_spans = 3; auto test_spans = GetTestSpans(batch_processor, num_spans); @@ -136,14 +147,16 @@ TEST_F(BatchSpanProcessorTestPeer, TestShutdown) TEST_F(BatchSpanProcessorTestPeer, TestForceFlush) { + std::shared_ptr> shut_down_counter(new std::atomic(0)); std::shared_ptr> is_shutdown(new std::atomic(false)); std::shared_ptr>> spans_received( new std::vector>); - auto batch_processor = std::shared_ptr( - new sdk::trace::BatchSpanProcessor(std::unique_ptr( - new MockSpanExporter(spans_received, is_shutdown)), - sdk::trace::BatchSpanProcessorOptions())); + auto batch_processor = + std::shared_ptr(new sdk::trace::BatchSpanProcessor( + std::unique_ptr( + new MockSpanExporter(spans_received, shut_down_counter, is_shutdown)), + sdk::trace::BatchSpanProcessorOptions())); const int num_spans = 2048; auto test_spans = GetTestSpans(batch_processor, num_spans); @@ -157,6 +170,7 @@ TEST_F(BatchSpanProcessorTestPeer, TestForceFlush) std::this_thread::sleep_for(std::chrono::milliseconds(50)); EXPECT_TRUE(batch_processor->ForceFlush()); + EXPECT_GE(shut_down_counter->load(), 1); EXPECT_EQ(num_spans, spans_received->size()); for (int i = 0; i < num_spans; ++i) @@ -174,7 +188,9 @@ TEST_F(BatchSpanProcessorTestPeer, TestForceFlush) // Give some time to export the spans std::this_thread::sleep_for(std::chrono::milliseconds(50)); + auto shut_down_counter_before = shut_down_counter->load(); EXPECT_TRUE(batch_processor->ForceFlush()); + EXPECT_GT(shut_down_counter->load(), shut_down_counter_before); EXPECT_EQ(num_spans * 2, spans_received->size()); for (int i = 0; i < num_spans; ++i) @@ -209,16 +225,18 @@ TEST_F(BatchSpanProcessorTestPeer, TestManySpansLoss) auto log_handler = nostd::shared_ptr(new MockLogHandler()); sdk::common::internal_log::GlobalLogHandler::SetLogHandler(log_handler); + std::shared_ptr> shut_down_counter(new std::atomic(0)); std::shared_ptr> is_shutdown(new std::atomic(false)); std::shared_ptr>> spans_received( new std::vector>); const int max_queue_size = 4096; - auto batch_processor = std::shared_ptr( - new sdk::trace::BatchSpanProcessor(std::unique_ptr( - new MockSpanExporter(spans_received, is_shutdown)), - sdk::trace::BatchSpanProcessorOptions())); + auto batch_processor = + std::shared_ptr(new sdk::trace::BatchSpanProcessor( + std::unique_ptr( + new MockSpanExporter(spans_received, shut_down_counter, is_shutdown)), + sdk::trace::BatchSpanProcessorOptions())); auto test_spans = GetTestSpans(batch_processor, max_queue_size); @@ -259,16 +277,18 @@ TEST_F(BatchSpanProcessorTestPeer, TestManySpansLossLess) { /* Test that no spans are lost when sending max_queue_size spans */ + std::shared_ptr> shut_down_counter(new std::atomic(0)); std::shared_ptr> is_shutdown(new std::atomic(false)); std::shared_ptr>> spans_received( new std::vector>); const int num_spans = 2048; - auto batch_processor = std::shared_ptr( - new sdk::trace::BatchSpanProcessor(std::unique_ptr( - new MockSpanExporter(spans_received, is_shutdown)), - sdk::trace::BatchSpanProcessorOptions())); + auto batch_processor = + std::shared_ptr(new sdk::trace::BatchSpanProcessor( + std::unique_ptr( + new MockSpanExporter(spans_received, shut_down_counter, is_shutdown)), + sdk::trace::BatchSpanProcessorOptions())); auto test_spans = GetTestSpans(batch_processor, num_spans); @@ -294,6 +314,7 @@ TEST_F(BatchSpanProcessorTestPeer, TestScheduleDelayMillis) /* Test that max_export_batch_size spans are exported every schedule_delay_millis seconds */ + std::shared_ptr> shut_down_counter(new std::atomic(0)); std::shared_ptr> is_shutdown(new std::atomic(false)); std::shared_ptr> is_export_completed(new std::atomic(false)); std::shared_ptr>> spans_received( @@ -305,8 +326,8 @@ TEST_F(BatchSpanProcessorTestPeer, TestScheduleDelayMillis) auto batch_processor = std::shared_ptr(new sdk::trace::BatchSpanProcessor( - std::unique_ptr( - new MockSpanExporter(spans_received, is_shutdown, is_export_completed, export_delay)), + std::unique_ptr(new MockSpanExporter( + spans_received, shut_down_counter, is_shutdown, is_export_completed, export_delay)), options)); auto test_spans = GetTestSpans(batch_processor, max_export_batch_size); diff --git a/sdk/test/trace/simple_processor_test.cc b/sdk/test/trace/simple_processor_test.cc index f32485e7d0..7653bb2cfc 100644 --- a/sdk/test/trace/simple_processor_test.cc +++ b/sdk/test/trace/simple_processor_test.cc @@ -38,7 +38,9 @@ TEST(SimpleProcessor, ToInMemorySpanExporter) class RecordShutdownExporter final : public SpanExporter { public: - RecordShutdownExporter(int *shutdown_counter) : shutdown_counter_(shutdown_counter) {} + RecordShutdownExporter(int *force_flush_counter, int *shutdown_counter) + : force_flush_counter_(force_flush_counter), shutdown_counter_(shutdown_counter) + {} std::unique_ptr MakeRecordable() noexcept override { @@ -51,6 +53,12 @@ class RecordShutdownExporter final : public SpanExporter return ExportResult::kSuccess; } + bool ForceFlush(std::chrono::microseconds /* timeout */) noexcept override + { + *force_flush_counter_ += 1; + return true; + } + bool Shutdown(std::chrono::microseconds /* timeout */) noexcept override { *shutdown_counter_ += 1; @@ -58,17 +66,70 @@ class RecordShutdownExporter final : public SpanExporter } private: + int *force_flush_counter_; int *shutdown_counter_; }; TEST(SimpleSpanProcessor, ShutdownCalledOnce) { + int force_flush = 0; int shutdowns = 0; - RecordShutdownExporter *exporter = new RecordShutdownExporter(&shutdowns); + RecordShutdownExporter *exporter = new RecordShutdownExporter(&force_flush, &shutdowns); SimpleSpanProcessor processor(std::unique_ptr{exporter}); EXPECT_EQ(0, shutdowns); processor.Shutdown(); EXPECT_EQ(1, shutdowns); processor.Shutdown(); EXPECT_EQ(1, shutdowns); + + EXPECT_EQ(0, force_flush); +} + +TEST(SimpleSpanProcessor, ForceFlush) +{ + int force_flush = 0; + int shutdowns = 0; + RecordShutdownExporter *exporter = new RecordShutdownExporter(&force_flush, &shutdowns); + SimpleSpanProcessor processor(std::unique_ptr{exporter}); + processor.ForceFlush(); + EXPECT_EQ(0, shutdowns); + EXPECT_EQ(1, force_flush); + processor.ForceFlush(); + EXPECT_EQ(2, force_flush); +} + +// An exporter that does nothing but record (and give back ) the # of times Shutdown was called. +class FailShutDownForceFlushExporter final : public SpanExporter +{ +public: + FailShutDownForceFlushExporter() {} + + std::unique_ptr MakeRecordable() noexcept override + { + return std::unique_ptr(new SpanData()); + } + + ExportResult Export(const opentelemetry::nostd::span> + & /* recordables */) noexcept override + { + return ExportResult::kSuccess; + } + + bool ForceFlush(std::chrono::microseconds /* timeout */) noexcept override { return false; } + + bool Shutdown(std::chrono::microseconds /* timeout */) noexcept override { return false; } +}; + +TEST(SimpleSpanProcessor, ShutdownFail) +{ + SimpleSpanProcessor processor( + std::unique_ptr{new FailShutDownForceFlushExporter()}); + EXPECT_EQ(false, processor.Shutdown()); +} + +TEST(SimpleSpanProcessor, ForceFlushFail) +{ + SimpleSpanProcessor processor( + std::unique_ptr{new FailShutDownForceFlushExporter()}); + EXPECT_EQ(false, processor.ForceFlush()); } From 2b2fe449ac09b072eb12654ade1e02ac8bb81624 Mon Sep 17 00:00:00 2001 From: WenTao Ou Date: Fri, 24 Mar 2023 02:05:51 +0800 Subject: [PATCH 2/2] Ignore more warning of generated protobuf files than not included in `-Wall` and `-Wextra` (#2067) --- .../exporters/otlp/protobuf_include_prefix.h | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/protobuf_include_prefix.h b/exporters/otlp/include/opentelemetry/exporters/otlp/protobuf_include_prefix.h index 036cb0ae7f..dc674ce9eb 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/protobuf_include_prefix.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/protobuf_include_prefix.h @@ -22,6 +22,9 @@ # pragma warning(disable : 4267) # pragma warning(disable : 4668) # pragma warning(disable : 4946) +# pragma warning(disable : 6001) +# pragma warning(disable : 6244) +# pragma warning(disable : 6246) #endif #if defined(__GNUC__) && !defined(__clang__) && !defined(__apple_build_version__) @@ -30,9 +33,34 @@ # endif # pragma GCC diagnostic ignored "-Wunused-parameter" # pragma GCC diagnostic ignored "-Wtype-limits" +# pragma GCC diagnostic ignored "-Wsign-compare" +# pragma GCC diagnostic ignored "-Wsign-conversion" +# pragma GCC diagnostic ignored "-Wshadow" +# pragma GCC diagnostic ignored "-Wuninitialized" +# pragma GCC diagnostic ignored "-Wconversion" +# if (__GNUC__ * 100 + __GNUC_MINOR__) >= 409 +# pragma GCC diagnostic ignored "-Wfloat-conversion" +# endif +# if (__GNUC__ * 100 + __GNUC_MINOR__) >= 501 +# pragma GCC diagnostic ignored "-Wsuggest-override" +# endif #elif defined(__clang__) || defined(__apple_build_version__) # pragma clang diagnostic push # pragma clang diagnostic ignored "-Wunused-parameter" # pragma clang diagnostic ignored "-Wtype-limits" # pragma clang diagnostic ignored "-Wshadow-field" +# pragma clang diagnostic ignored "-Wsign-compare" +# pragma clang diagnostic ignored "-Wsign-conversion" +# pragma clang diagnostic ignored "-Wshadow" +# pragma clang diagnostic ignored "-Wuninitialized" +# pragma clang diagnostic ignored "-Wconversion" +# if ((__clang_major__ * 100) + __clang_minor__) >= 305 +# pragma clang diagnostic ignored "-Wfloat-conversion" +# endif +# if ((__clang_major__ * 100) + __clang_minor__) >= 306 +# pragma clang diagnostic ignored "-Winconsistent-missing-override" +# endif +# if ((__clang_major__ * 100) + __clang_minor__) >= 1100 +# pragma clang diagnostic ignored "-Wsuggest-override" +# endif #endif