Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix LoggerContext::Shutdown and tsan of OtlpHttpClient #1592

Merged
merged 2 commits into from
Sep 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions exporters/otlp/src/otlp_http_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -847,20 +847,20 @@ void OtlpHttpClient::ReleaseSession(
{
bool has_session = false;

{
std::lock_guard<std::recursive_mutex> guard{session_manager_lock_};
std::lock_guard<std::recursive_mutex> guard{session_manager_lock_};

auto session_iter = running_sessions_.find(&session);
if (session_iter != running_sessions_.end())
{
// Move session and handle into gc list, and they will be destroyed later
gc_sessions_.emplace_back(std::move(session_iter->second));
running_sessions_.erase(session_iter);
auto session_iter = running_sessions_.find(&session);
if (session_iter != running_sessions_.end())
{
// Move session and handle into gc list, and they will be destroyed later
gc_sessions_.emplace_back(std::move(session_iter->second));
running_sessions_.erase(session_iter);

has_session = true;
}
has_session = true;
}

// Call session_waker_.notify_all() with session_manager_lock_ locked to keep session_waker_
// available when destroying OtlpHttpClient
if (has_session)
{
session_waker_.notify_all();
Expand Down
6 changes: 4 additions & 2 deletions sdk/include/opentelemetry/sdk/logs/simple_log_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,15 @@ class SimpleLogProcessor : public LogProcessor
bool Shutdown(
std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override;

bool IsShutdown() const noexcept;

private:
// The configured exporter
std::unique_ptr<LogExporter> exporter_;
// The lock used to ensure the exporter is not called concurrently
opentelemetry::common::SpinLockMutex lock_;
// The atomic boolean flag to ensure the ShutDown() function is only called once
std::atomic_flag shutdown_latch_ = ATOMIC_FLAG_INIT;
// The atomic boolean to ensure the ShutDown() function is only called once
std::atomic<bool> is_shutdown_;
};
} // namespace logs
} // namespace sdk
Expand Down
2 changes: 1 addition & 1 deletion sdk/src/logs/logger_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ bool LoggerContext::ForceFlush(std::chrono::microseconds timeout) noexcept

bool LoggerContext::Shutdown(std::chrono::microseconds timeout) noexcept
{
return processor_->ForceFlush(timeout);
return processor_->Shutdown(timeout);
}

} // namespace logs
Expand Down
10 changes: 8 additions & 2 deletions sdk/src/logs/simple_log_processor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ namespace logs
* @param exporter the configured exporter where log records are sent
*/
SimpleLogProcessor::SimpleLogProcessor(std::unique_ptr<LogExporter> &&exporter)
: exporter_(std::move(exporter))
: exporter_(std::move(exporter)), is_shutdown_(false)
{}

std::unique_ptr<Recordable> SimpleLogProcessor::MakeRecordable() noexcept
Expand Down Expand Up @@ -51,13 +51,19 @@ bool SimpleLogProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept
bool SimpleLogProcessor::Shutdown(std::chrono::microseconds timeout) noexcept
{
// Should only shutdown exporter ONCE.
if (!shutdown_latch_.test_and_set(std::memory_order_acquire) && exporter_ != nullptr)
if (!is_shutdown_.exchange(true, std::memory_order_acq_rel) && exporter_ != nullptr)
{
return exporter_->Shutdown(timeout);
}

return true;
}

bool SimpleLogProcessor::IsShutdown() const noexcept
{
return is_shutdown_.load(std::memory_order_acquire);
}

} // namespace logs
} // namespace sdk
OPENTELEMETRY_END_NAMESPACE
Expand Down
2 changes: 2 additions & 0 deletions sdk/test/logs/logger_provider_sdk_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,14 @@ TEST(LoggerProviderSDK, GetResource)
TEST(LoggerProviderSDK, Shutdown)
{
std::unique_ptr<SimpleLogProcessor> processor(new SimpleLogProcessor(nullptr));
SimpleLogProcessor *processor_ptr = processor.get();
std::vector<std::unique_ptr<LogProcessor>> processors;
processors.push_back(std::move(processor));

LoggerProvider lp(std::make_shared<LoggerContext>(std::move(processors)));

EXPECT_TRUE(lp.Shutdown());
EXPECT_TRUE(processor_ptr->IsShutdown());

// It's safe to shutdown again
EXPECT_TRUE(lp.Shutdown());
Expand Down