Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SDK] Add ForceFlush for all LogRecordExporters and SpanExporters. #2000

Merged
merged 9 commits into from
Mar 20, 2023
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ Increment the:
[#2060](https://github.com/open-telemetry/opentelemetry-cpp/pull/2060)
* [BUILD] Restore detfault value of `OPENTELEMETRY_INSTALL` to `ON` when it's on
top level.[#2062](https://github.com/open-telemetry/opentelemetry-cpp/pull/2062)
* [EXPORTERS]Add `ForceFlush` for `LogRecordExporter` and `SpanExporter`
[#2000](https://github.com/open-telemetry/opentelemetry-cpp/pull/2000)

Important changes:

Expand All @@ -44,6 +46,12 @@ Important changes:
* As a result, a behavior change for GRPC SSL is possible,
because the endpoint scheme now takes precedence.
Please verify configuration settings for the GRPC endpoint.
* [EXPORTERS]Add `ForceFlush` for `LogRecordExporter` and `SpanExporter`
[#2000](https://github.com/open-telemetry/opentelemetry-cpp/pull/2000)
* `LogRecordExporter` and `SpanExporter` add a new virtual function
`ForceFlush`, and if users implement any customized `LogRecordExporter` and
`SpanExporter`, they should also implement this function.There should be no
influence if users only use factory to create exporters.

## [1.8.3] 2023-03-06

Expand Down
21 changes: 21 additions & 0 deletions examples/otlp/grpc_log_main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@
# include "opentelemetry/sdk/trace/tracer_provider_factory.h"
# include "opentelemetry/trace/provider.h"

// sdk::TracerProvider and sdk::LoggerProvider is just used to call ForceFlush and prevent to cancel
// running exportings when destroy and shutdown exporters.It's optional to users.
# include "opentelemetry/sdk/logs/logger_provider.h"
# include "opentelemetry/sdk/trace/tracer_provider.h"

# include <string>

# ifdef BAZEL_BUILD
Expand Down Expand Up @@ -42,6 +47,14 @@ void InitTracer()

void CleanupTracer()
{
// We call ForceFlush to prevent to cancel running exportings, It's optional.
opentelemetry::nostd::shared_ptr<opentelemetry::trace::TracerProvider> provider =
trace::Provider::GetTracerProvider();
if (provider)
{
static_cast<trace_sdk::TracerProvider *>(provider.get())->ForceFlush();
}

std::shared_ptr<opentelemetry::trace::TracerProvider> none;
trace::Provider::SetTracerProvider(none);
}
Expand All @@ -59,6 +72,14 @@ void InitLogger()

void CleanupLogger()
{
// We call ForceFlush to prevent to cancel running exportings, It's optional.
opentelemetry::nostd::shared_ptr<logs::LoggerProvider> provider =
logs::Provider::GetLoggerProvider();
if (provider)
{
static_cast<logs_sdk::LoggerProvider *>(provider.get())->ForceFlush();
}

nostd::shared_ptr<logs::LoggerProvider> none;
opentelemetry::logs::Provider::SetLoggerProvider(none);
}
Expand Down
12 changes: 12 additions & 0 deletions examples/otlp/grpc_main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
#include "opentelemetry/sdk/trace/tracer_provider_factory.h"
#include "opentelemetry/trace/provider.h"

// sdk::TracerProvider is just used to call ForceFlush and prevent to cancel running exportings when
// destroy and shutdown exporters.It's optional to users.
#include "opentelemetry/sdk/trace/tracer_provider.h"

#ifdef BAZEL_BUILD
# include "examples/common/foo_library/foo_library.h"
#else
Expand All @@ -32,6 +36,14 @@ void InitTracer()

void CleanupTracer()
{
// We call ForceFlush to prevent to cancel running exportings, It's optional.
opentelemetry::nostd::shared_ptr<opentelemetry::trace::TracerProvider> provider =
trace::Provider::GetTracerProvider();
if (provider)
{
static_cast<trace_sdk::TracerProvider *>(provider.get())->ForceFlush();
}

std::shared_ptr<opentelemetry::trace::TracerProvider> none;
trace::Provider::SetTracerProvider(none);
}
Expand Down
53 changes: 46 additions & 7 deletions examples/otlp/http_log_main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@
# include "opentelemetry/sdk/trace/tracer_provider_factory.h"
# include "opentelemetry/trace/provider.h"

// sdk::TracerProvider and sdk::LoggerProvider is just used to call ForceFlush and prevent to cancel
// running exportings when destroy and shutdown exporters.It's optional to users.
# include "opentelemetry/sdk/logs/logger_provider.h"
# include "opentelemetry/sdk/trace/tracer_provider.h"

# include <iostream>
# include <string>

# ifdef BAZEL_BUILD
Expand All @@ -32,11 +38,27 @@ namespace internal_log = opentelemetry::sdk::common::internal_log;
namespace
{

opentelemetry::exporter::otlp::OtlpHttpExporterOptions opts;
opentelemetry::exporter::otlp::OtlpHttpExporterOptions trace_opts;
void InitTracer()
{
if (trace_opts.url.size() > 9)
{
if (trace_opts.url.substr(trace_opts.url.size() - 8) == "/v1/logs")
{
trace_opts.url = trace_opts.url.substr(0, trace_opts.url.size() - 8) + "/v1/traces";
}
else if (trace_opts.url.substr(trace_opts.url.size() - 9) == "/v1/logs/")
{
trace_opts.url = trace_opts.url.substr(0, trace_opts.url.size() - 9) + "/v1/traces";
}
else
{
trace_opts.url = opentelemetry::exporter::otlp::GetOtlpDefaultHttpTracesEndpoint();
}
}
std::cout << "Using " << trace_opts.url << " to export trace spans." << std::endl;
// Create OTLP exporter instance
auto exporter = otlp::OtlpHttpExporterFactory::Create(opts);
auto exporter = otlp::OtlpHttpExporterFactory::Create(trace_opts);
auto processor = trace_sdk::SimpleSpanProcessorFactory::Create(std::move(exporter));
std::shared_ptr<opentelemetry::trace::TracerProvider> provider =
trace_sdk::TracerProviderFactory::Create(std::move(processor));
Expand All @@ -46,13 +68,22 @@ void InitTracer()

void CleanupTracer()
{
// We call ForceFlush to prevent to cancel running exportings, It's optional.
opentelemetry::nostd::shared_ptr<opentelemetry::trace::TracerProvider> provider =
trace::Provider::GetTracerProvider();
if (provider)
{
static_cast<trace_sdk::TracerProvider *>(provider.get())->ForceFlush();
}

std::shared_ptr<opentelemetry::trace::TracerProvider> none;
trace::Provider::SetTracerProvider(none);
}

opentelemetry::exporter::otlp::OtlpHttpLogRecordExporterOptions logger_opts;
void InitLogger()
{
std::cout << "Using " << logger_opts.url << " to export log records." << std::endl;
logger_opts.console_debug = true;
// Create OTLP exporter instance
auto exporter = otlp::OtlpHttpLogRecordExporterFactory::Create(logger_opts);
Expand All @@ -65,6 +96,14 @@ void InitLogger()

void CleanupLogger()
{
// We call ForceFlush to prevent to cancel running exportings, It's optional.
opentelemetry::nostd::shared_ptr<logs::LoggerProvider> provider =
logs::Provider::GetLoggerProvider();
if (provider)
{
static_cast<logs_sdk::LoggerProvider *>(provider.get())->ForceFlush();
}

std::shared_ptr<logs::LoggerProvider> none;
opentelemetry::logs::Provider::SetLoggerProvider(none);
}
Expand All @@ -83,26 +122,26 @@ int main(int argc, char *argv[])
{
if (argc > 1)
{
opts.url = argv[1];
trace_opts.url = argv[1];
logger_opts.url = argv[1];
if (argc > 2)
{
std::string debug = argv[2];
opts.console_debug = debug != "" && debug != "0" && debug != "no";
std::string debug = argv[2];
trace_opts.console_debug = debug != "" && debug != "0" && debug != "no";
}

if (argc > 3)
{
std::string binary_mode = argv[3];
if (binary_mode.size() >= 3 && binary_mode.substr(0, 3) == "bin")
{
opts.content_type = opentelemetry::exporter::otlp::HttpRequestContentType::kBinary;
trace_opts.content_type = opentelemetry::exporter::otlp::HttpRequestContentType::kBinary;
logger_opts.content_type = opentelemetry::exporter::otlp::HttpRequestContentType::kBinary;
}
}
}

if (opts.console_debug)
if (trace_opts.console_debug)
{
internal_log::GlobalLogHandler::SetLogLevel(internal_log::LogLevel::Debug);
}
Expand Down
12 changes: 12 additions & 0 deletions examples/otlp/http_main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@
#include "opentelemetry/sdk/trace/tracer_provider_factory.h"
#include "opentelemetry/trace/provider.h"

// sdk::TracerProvider is just used to call ForceFlush and prevent to cancel running exportings when
// destroy and shutdown exporters.It's optional to users.
#include "opentelemetry/sdk/trace/tracer_provider.h"

#include <string>

#ifdef BAZEL_BUILD
Expand Down Expand Up @@ -38,6 +42,14 @@ void InitTracer()

void CleanupTracer()
{
// We call ForceFlush to prevent to cancel running exportings, It's optional.
opentelemetry::nostd::shared_ptr<opentelemetry::trace::TracerProvider> provider =
trace::Provider::GetTracerProvider();
if (provider)
{
static_cast<trace_sdk::TracerProvider *>(provider.get())->ForceFlush();
}

std::shared_ptr<opentelemetry::trace::TracerProvider> none;
trace::Provider::SetTracerProvider(none);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,17 @@
# include "nlohmann/json.hpp"
# include "opentelemetry/common/spin_lock_mutex.h"
# include "opentelemetry/ext/http/client/http_client_factory.h"
# include "opentelemetry/nostd/shared_ptr.h"
# include "opentelemetry/nostd/type_traits.h"
# include "opentelemetry/sdk/logs/exporter.h"
# include "opentelemetry/sdk/logs/recordable.h"

# include <time.h>
# include <atomic>
# include <condition_variable>
# include <cstddef>
# include <iostream>
# include <mutex>

OPENTELEMETRY_BEGIN_NAMESPACE
namespace exporter
Expand Down Expand Up @@ -90,6 +95,14 @@ class ElasticsearchLogRecordExporter final : public opentelemetry::sdk::logs::Lo
const opentelemetry::nostd::span<std::unique_ptr<opentelemetry::sdk::logs::Recordable>>
&records) noexcept override;

/**
* Force flush the exporter.
* @param timeout an option timeout, default to max.
* @return return true when all data are exported, and false when timeout
*/
bool ForceFlush(
std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override;

/**
* Shutdown this exporter.
* @param timeout The maximum time to wait for the shutdown method to return
Expand All @@ -108,6 +121,18 @@ class ElasticsearchLogRecordExporter final : public opentelemetry::sdk::logs::Lo
std::shared_ptr<ext::http::client::HttpClient> http_client_;
mutable opentelemetry::common::SpinLockMutex lock_;
bool isShutdown() const noexcept;

# ifdef ENABLE_ASYNC_EXPORT
struct SynchronizationData
{
std::atomic<std::size_t> session_counter_;
std::atomic<std::size_t> finished_session_counter_;
std::condition_variable force_flush_cv;
std::mutex force_flush_cv_m;
std::recursive_mutex force_flush_m;
};
nostd::shared_ptr<SynchronizationData> synchronization_data_;
# endif
};
} // namespace logs
} // namespace exporter
Expand Down
70 changes: 64 additions & 6 deletions exporters/elasticsearch/src/es_log_record_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -290,9 +290,19 @@ class AsyncResponseHandler : public http_client::EventHandler
# endif

ElasticsearchLogRecordExporter::ElasticsearchLogRecordExporter()
: options_{ElasticsearchExporterOptions()},
http_client_{ext::http::client::HttpClientFactory::Create()}
{}
: options_{ElasticsearchExporterOptions()}, http_client_
{
ext::http::client::HttpClientFactory::Create()
}
# ifdef ENABLE_ASYNC_EXPORT
, synchronization_data_(new SynchronizationData())
# endif
{
# ifdef ENABLE_ASYNC_EXPORT
synchronization_data_->finished_session_counter_.store(0);
synchronization_data_->session_counter_.store(0);
# endif
}

ElasticsearchLogRecordExporter::ElasticsearchLogRecordExporter(
const ElasticsearchExporterOptions &options)
Expand Down Expand Up @@ -343,10 +353,12 @@ sdk::common::ExportResult ElasticsearchLogRecordExporter::Export(

# ifdef ENABLE_ASYNC_EXPORT
// Send the request
std::size_t span_count = records.size();
auto handler = std::make_shared<AsyncResponseHandler>(
synchronization_data_->session_counter_.fetch_add(1, std::memory_order_release);
std::size_t span_count = records.size();
auto synchronization_data = synchronization_data_;
auto handler = std::make_shared<AsyncResponseHandler>(
session,
[span_count](opentelemetry::sdk::common::ExportResult result) {
[span_count, synchronization_data](opentelemetry::sdk::common::ExportResult result) {
if (result != opentelemetry::sdk::common::ExportResult::kSuccess)
{
OTEL_INTERNAL_LOG_ERROR("[ES Log Exporter] ERROR: Export "
Expand All @@ -358,6 +370,9 @@ sdk::common::ExportResult ElasticsearchLogRecordExporter::Export(
OTEL_INTERNAL_LOG_DEBUG("[ES Log Exporter] Export " << span_count
<< " trace span(s) success");
}

synchronization_data->finished_session_counter_.fetch_add(1, std::memory_order_release);
synchronization_data->force_flush_cv.notify_all();
return true;
},
options_.console_debug_);
Expand Down Expand Up @@ -401,6 +416,49 @@ sdk::common::ExportResult ElasticsearchLogRecordExporter::Export(
# endif
}

bool ElasticsearchLogRecordExporter::ForceFlush(std::chrono::microseconds timeout) noexcept
{
# ifdef ENABLE_ASYNC_EXPORT
std::lock_guard<std::recursive_mutex> lock_guard{synchronization_data_->force_flush_m};
std::size_t running_counter =
synchronization_data_->session_counter_.load(std::memory_order_acquire);
// ASAN will report chrono: runtime error: signed integer overflow: A + B cannot be represented
// in type 'long int' here. So we reset timeout to meet signed long int limit here.
timeout = opentelemetry::common::DurationUtil::AdjustWaitForTimeout(
timeout, std::chrono::microseconds::zero());

std::chrono::steady_clock::duration timeout_steady =
std::chrono::duration_cast<std::chrono::steady_clock::duration>(timeout);
if (timeout_steady <= std::chrono::steady_clock::duration::zero())
{
timeout_steady = std::chrono::steady_clock::duration::max();
}

std::unique_lock<std::mutex> lk_cv(synchronization_data_->force_flush_cv_m);
// Wait for all the sessions to finish
while (timeout_steady > std::chrono::steady_clock::duration::zero())
{
if (synchronization_data_->finished_session_counter_.load(std::memory_order_acquire) >=
running_counter)
{
break;
}

std::chrono::steady_clock::time_point start_timepoint = std::chrono::steady_clock::now();
if (std::cv_status::no_timeout != synchronization_data_->force_flush_cv.wait_for(
lk_cv, std::chrono::seconds{options_.response_timeout_}))
{
break;
}
timeout_steady -= std::chrono::steady_clock::now() - start_timepoint;
}

return timeout_steady > std::chrono::steady_clock::duration::zero();
# else
return true;
# endif
}

bool ElasticsearchLogRecordExporter::Shutdown(std::chrono::microseconds /* timeout */) noexcept
{
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
Expand Down
8 changes: 6 additions & 2 deletions exporters/etw/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,12 @@ target_include_directories(
set_target_properties(opentelemetry_exporter_etw PROPERTIES EXPORT_NAME
etw_exporter)

target_link_libraries(opentelemetry_exporter_etw
INTERFACE opentelemetry_api nlohmann_json::nlohmann_json)
target_link_libraries(
opentelemetry_exporter_etw INTERFACE opentelemetry_api opentelemetry_trace
nlohmann_json::nlohmann_json)
if(WITH_LOGS_PREVIEW)
target_link_libraries(opentelemetry_exporter_etw INTERFACE opentelemetry_logs)
endif()
lalitb marked this conversation as resolved.
Show resolved Hide resolved
if(nlohmann_json_clone)
add_dependencies(opentelemetry_exporter_etw nlohmann_json::nlohmann_json)
endif()
Expand Down
Loading