diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_client.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_client.h index 6806f1cdf9..3753704bb3 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_client.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_client.h @@ -3,6 +3,7 @@ #pragma once +#include #include #include #include @@ -275,7 +276,7 @@ class OtlpHttpClient std::shared_ptr http_client); // Stores if this HTTP client had its Shutdown() method called - bool is_shutdown_; + std::atomic is_shutdown_; // The configuration options associated with this HTTP client. const OtlpHttpClientOptions options_; @@ -296,6 +297,8 @@ class OtlpHttpClient // Condition variable and mutex to control the concurrency count of running sessions std::mutex session_waker_lock_; std::condition_variable session_waker_; + std::atomic start_session_counter_; + std::atomic finished_session_counter_; }; } // namespace otlp } // namespace exporter diff --git a/exporters/otlp/src/otlp_grpc_client.cc b/exporters/otlp/src/otlp_grpc_client.cc index a5760bdd81..d3d59bce6e 100644 --- a/exporters/otlp/src/otlp_grpc_client.cc +++ b/exporters/otlp/src/otlp_grpc_client.cc @@ -567,14 +567,19 @@ bool OtlpGrpcClient::Shutdown(std::chrono::microseconds timeout) noexcept return true; } + bool force_flush_result; if (false == is_shutdown_.exchange(true, std::memory_order_acq_rel)) { - is_shutdown_ = true; + force_flush_result = ForceFlush(timeout); async_data_->cq.Shutdown(); } + else + { + force_flush_result = ForceFlush(timeout); + } - return ForceFlush(timeout); + return force_flush_result; } #endif diff --git a/exporters/otlp/src/otlp_http_client.cc b/exporters/otlp/src/otlp_http_client.cc index 94f02a9967..07b9108a3a 100644 --- a/exporters/otlp/src/otlp_http_client.cc +++ b/exporters/otlp/src/otlp_http_client.cc @@ -41,9 +41,11 @@ // clang-format off #include "opentelemetry/exporters/otlp/protobuf_include_prefix.h" // IWYU pragma: keep -#include +// clang-format on #include +#include #include +// clang-format off #include "opentelemetry/exporters/otlp/protobuf_include_suffix.h" // IWYU pragma: keep // clang-format on @@ -661,7 +663,11 @@ void ConvertListFieldToJson(nlohmann::json &value, } // namespace OtlpHttpClient::OtlpHttpClient(OtlpHttpClientOptions &&options) - : is_shutdown_(false), options_(options), http_client_(http_client::HttpClientFactory::Create()) + : is_shutdown_(false), + options_(options), + http_client_(http_client::HttpClientFactory::Create()), + start_session_counter_(0), + finished_session_counter_(0) { http_client_->SetMaxSessionsPerConnection(options_.max_requests_per_connection); } @@ -700,7 +706,11 @@ OtlpHttpClient::~OtlpHttpClient() OtlpHttpClient::OtlpHttpClient(OtlpHttpClientOptions &&options, std::shared_ptr http_client) - : is_shutdown_(false), options_(options), http_client_(std::move(http_client)) + : is_shutdown_(false), + options_(options), + http_client_(std::move(http_client)), + start_session_counter_(0), + finished_session_counter_(0) { http_client_->SetMaxSessionsPerConnection(options_.max_requests_per_connection); } @@ -799,6 +809,8 @@ bool OtlpHttpClient::ForceFlush(std::chrono::microseconds timeout) noexcept timeout_steady = (std::chrono::steady_clock::duration::max)(); } + size_t wait_counter = start_session_counter_.load(std::memory_order_acquire); + while (timeout_steady > std::chrono::steady_clock::duration::zero()) { { @@ -816,7 +828,7 @@ bool OtlpHttpClient::ForceFlush(std::chrono::microseconds timeout) noexcept { cleanupGCSessions(); } - else + else if (finished_session_counter_.load(std::memory_order_acquire) >= wait_counter) { break; } @@ -829,20 +841,24 @@ bool OtlpHttpClient::ForceFlush(std::chrono::microseconds timeout) noexcept bool OtlpHttpClient::Shutdown(std::chrono::microseconds timeout) noexcept { + is_shutdown_.store(true, std::memory_order_release); + + bool force_flush_result = ForceFlush(timeout); + { std::lock_guard guard{session_manager_lock_}; - is_shutdown_ = true; // Shutdown the session manager http_client_->CancelAllSessions(); http_client_->FinishAllSessions(); } - ForceFlush(timeout); - + // Wait util all sessions are canceled. while (cleanupGCSessions()) - ; - return true; + { + ForceFlush(std::chrono::milliseconds{1}); + } + return force_flush_result; } void OtlpHttpClient::ReleaseSession( @@ -859,6 +875,7 @@ void OtlpHttpClient::ReleaseSession( gc_sessions_.emplace_back(std::move(session_iter->second)); running_sessions_.erase(session_iter); + finished_session_counter_.fetch_add(1, std::memory_order_release); has_session = true; } @@ -1003,6 +1020,7 @@ void OtlpHttpClient::addSession(HttpSessionData &&session_data) noexcept store_session_data = std::move(session_data); } + start_session_counter_.fetch_add(1, std::memory_order_release); // Send request after the session is added session->SendRequest(handle); } @@ -1027,7 +1045,7 @@ bool OtlpHttpClient::cleanupGCSessions() noexcept bool OtlpHttpClient::IsShutdown() const noexcept { - return is_shutdown_; + return is_shutdown_.load(std::memory_order_acquire); } } // namespace otlp