diff --git a/.github/workflows/stale.yml b/.github/workflows/stale.yml
index 5f49c9933d..534fb60743 100644
--- a/.github/workflows/stale.yml
+++ b/.github/workflows/stale.yml
@@ -7,7 +7,7 @@ jobs:
stale:
runs-on: ubuntu-latest
steps:
- - uses: actions/stale@v7
+ - uses: actions/stale@v8
with:
stale-issue-message: "This issue was marked as stale due to lack of activity."
days-before-issue-stale: 60
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8e47f770e6..0311312b2d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -21,6 +21,10 @@ Increment the:
[#2060](https://github.com/open-telemetry/opentelemetry-cpp/pull/2060)
* [BUILD] Restore detfault value of `OPENTELEMETRY_INSTALL` to `ON` when it's on
top level.[#2062](https://github.com/open-telemetry/opentelemetry-cpp/pull/2062)
+* [EXPORTERS]Add `ForceFlush` for `LogRecordExporter` and `SpanExporter`
+ [#2000](https://github.com/open-telemetry/opentelemetry-cpp/pull/2000)
+* [SEMANTIC CONVENTIONS] Upgrade to version 1.19.0
+ [#2017](https://github.com/open-telemetry/opentelemetry-cpp/pull/2017)
Important changes:
@@ -44,6 +48,12 @@ Important changes:
* As a result, a behavior change for GRPC SSL is possible,
because the endpoint scheme now takes precedence.
Please verify configuration settings for the GRPC endpoint.
+* [EXPORTERS]Add `ForceFlush` for `LogRecordExporter` and `SpanExporter`
+ [#2000](https://github.com/open-telemetry/opentelemetry-cpp/pull/2000)
+ * `LogRecordExporter` and `SpanExporter` add a new virtual function
+ `ForceFlush`, and if users implement any customized `LogRecordExporter` and
+ `SpanExporter`, they should also implement this function.There should be no
+ influence if users only use factory to create exporters.
## [1.8.3] 2023-03-06
diff --git a/api/include/opentelemetry/trace/semantic_conventions.h b/api/include/opentelemetry/trace/semantic_conventions.h
index d0485704ef..051d865f86 100644
--- a/api/include/opentelemetry/trace/semantic_conventions.h
+++ b/api/include/opentelemetry/trace/semantic_conventions.h
@@ -21,7 +21,7 @@ namespace SemanticConventions
/**
* The URL of the OpenTelemetry schema for these keys and values.
*/
-static constexpr const char *kSchemaUrl = "https://opentelemetry.io/schemas/1.18.0";
+static constexpr const char *kSchemaUrl = "https://opentelemetry.io/schemas/1.19.0";
/**
* The type of the exception (its fully-qualified class name, if applicable). The dynamic type of
@@ -40,6 +40,38 @@ static constexpr const char *kExceptionMessage = "exception.message";
*/
static constexpr const char *kExceptionStacktrace = "exception.stacktrace";
+/**
+ * HTTP request method.
+ */
+static constexpr const char *kHttpMethod = "http.method";
+
+/**
+ * HTTP response status code.
+ */
+static constexpr const char *kHttpStatusCode = "http.status_code";
+
+/**
+ * Kind of HTTP protocol used.
+ */
+static constexpr const char *kHttpFlavor = "http.flavor";
+
+/**
+ * The URI scheme identifying the used protocol.
+ */
+static constexpr const char *kHttpScheme = "http.scheme";
+
+/**
+ * The matched route (path template in the format used by the respective server framework). See note
+below
+ *
+ *
Notes:
+
- MUST NOT be populated when this is not supported by the HTTP server framework as the
+route attribute should have low-cardinality and the URI path can NOT substitute it. SHOULD include
+the application
+root if there is one.
+ */
+static constexpr const char *kHttpRoute = "http.route";
+
/**
* The name identifies the event.
*/
@@ -59,7 +91,7 @@ static constexpr const char *kEventDomain = "event.domain";
Lambda-Runtime-Invoked-Function-Arn} header on the {@code /runtime/invocation/next} applicable).
*
* Notes:
-
- This may be different from {@code faas.id} if an alias is involved.
+ - This may be different from {@code cloud.resource_id} if an alias is involved.
*/
static constexpr const char *kAwsLambdaInvokedArn = "aws.lambda.invoked_arn";
@@ -255,7 +287,7 @@ static constexpr const char *kOtelStatusCode = "otel.status_code";
static constexpr const char *kOtelStatusDescription = "otel.status_description";
/**
- * Type of the trigger which caused this function execution.
+ * Type of the trigger which caused this function invocation.
*
* Notes:
- For the server/consumer span on the incoming side,
@@ -268,9 +300,9 @@ lambda, which is often HTTP).
static constexpr const char *kFaasTrigger = "faas.trigger";
/**
- * The execution ID of the current function execution.
+ * The invocation ID of the current function invocation.
*/
-static constexpr const char *kFaasExecution = "faas.execution";
+static constexpr const char *kFaasInvocationId = "faas.invocation_id";
/**
* The name of the source on which the triggering operation was performed. For example, in Cloud
@@ -343,6 +375,30 @@ static constexpr const char *kFaasInvokedProvider = "faas.invoked_provider";
*/
static constexpr const char *kFaasInvokedRegion = "faas.invoked_region";
+/**
+ * The unique identifier of the feature flag.
+ */
+static constexpr const char *kFeatureFlagKey = "feature_flag.key";
+
+/**
+ * The name of the service provider that performs the flag evaluation.
+ */
+static constexpr const char *kFeatureFlagProviderName = "feature_flag.provider_name";
+
+/**
+ * SHOULD be a semantic identifier for a value. If one is unavailable, a stringified version of the
+value can be used.
+ *
+ * Notes:
+
- A semantic identifier, commonly referred to as a variant, provides a means
+for referring to a value without including the value itself. This can
+provide additional context for understanding the meaning behind a value.
+For example, the variant {@code red} maybe be used for the value {@code #c05543}.
- A
+stringified version of the value can be used in situations where a semantic identifier is
+unavailable. String representation of the value should be determined by the implementer.
+ */
+static constexpr const char *kFeatureFlagVariant = "feature_flag.variant";
+
/**
* Transport protocol used. See note below.
*/
@@ -521,31 +577,6 @@ static constexpr const char *kCodeLineno = "code.lineno";
*/
static constexpr const char *kCodeColumn = "code.column";
-/**
- * HTTP request method.
- */
-static constexpr const char *kHttpMethod = "http.method";
-
-/**
- * HTTP response status code.
- */
-static constexpr const char *kHttpStatusCode = "http.status_code";
-
-/**
- * Kind of HTTP protocol used.
- *
- * Notes:
-
- If {@code net.transport} is not specified, it can be assumed to be {@code IP.TCP} except
- if {@code http.flavor} is {@code QUIC}, in which case {@code IP.UDP} is assumed.
- */
-static constexpr const char *kHttpFlavor = "http.flavor";
-
-/**
- * Value of the HTTP
- * User-Agent header sent by the client.
- */
-static constexpr const char *kHttpUserAgent = "http.user_agent";
-
/**
* The size of the request payload body in bytes. This is the number of bytes transferred excluding
* headers and is often, but not always, present as the Notes:
- - MUST NOT be populated when this is not supported by the HTTP server framework as the
-route attribute should have low-cardinality and the URI path can NOT substitute it. SHOULD include
-the application root if there is one.
- */
-static constexpr const char *kHttpRoute = "http.route";
-
/**
* The IP address of the original client behind all proxies, if known (e.g. from X-Forwarded-For).
@@ -1054,7 +1069,79 @@ static constexpr const char *kRpcJsonrpcErrorCode = "rpc.jsonrpc.error_code";
*/
static constexpr const char *kRpcJsonrpcErrorMessage = "rpc.jsonrpc.error_message";
+/**
+ * Whether this is a received or sent message.
+ */
+static constexpr const char *kMessageType = "message.type";
+
+/**
+ * MUST be calculated as two different counters starting from {@code 1} one for sent messages and
+ one for received message.
+ *
+ * Notes:
+
- This way we guarantee that the values will be consistent between different
+ implementations.
+ */
+static constexpr const char *kMessageId = "message.id";
+
+/**
+ * Compressed size of the message in bytes.
+ */
+static constexpr const char *kMessageCompressedSize = "message.compressed_size";
+
+/**
+ * Uncompressed size of the message in bytes.
+ */
+static constexpr const char *kMessageUncompressedSize = "message.uncompressed_size";
+
+/**
+ * The error codes of the Connect
+ * request. Error codes are always string values.
+ */
+static constexpr const char *kRpcConnectRpcErrorCode = "rpc.connect_rpc.error_code";
+
+/**
+ * SHOULD be set to true if the exception event is recorded at a point where it is known that the
+exception is escaping the scope of the span.
+ *
+ * Notes:
+
- An exception is considered to have escaped (or left) the scope of a span,
+if that span is ended while the exception is still logically "in flight".
+This may be actually "in flight" in some languages (e.g. if the exception
+is passed to a Context manager's {@code __exit__} method in Python) but will
+usually be caught at the point of recording the exception in most languages.
- It is usually
+not possible to determine at the point where an exception is thrown whether it will escape the scope
+of a span. However, it is trivial to know that an exception will escape, if one checks for an active
+exception just before ending the span, as done in the example
+above.
- It follows that an exception may still escape the scope of the span even if the
+{@code exception.escaped} attribute was not set or set to false, since the event might have been
+recorded at a time where it was not clear whether the exception will escape.
+ */
+static constexpr const char *kExceptionEscaped = "exception.escaped";
+
+/**
+ * Value of the HTTP
+ * User-Agent header sent by the client.
+ */
+static constexpr const char *kUserAgentOriginal = "user_agent.original";
+
// Enum definitions
+namespace HttpFlavorValues
+{
+/** HTTP/1.0. */
+static constexpr const char *kHttp10 = "1.0";
+/** HTTP/1.1. */
+static constexpr const char *kHttp11 = "1.1";
+/** HTTP/2. */
+static constexpr const char *kHttp20 = "2.0";
+/** HTTP/3. */
+static constexpr const char *kHttp30 = "3.0";
+/** SPDY protocol. */
+static constexpr const char *kSpdy = "SPDY";
+/** QUIC protocol. */
+static constexpr const char *kQuic = "QUIC";
+} // namespace HttpFlavorValues
+
namespace EventDomainValues
{
/** Events from browser apps. */
@@ -1336,22 +1423,6 @@ static constexpr const char *kNrnsa = "nrnsa";
static constexpr const char *kLteCa = "lte_ca";
} // namespace NetHostConnectionSubtypeValues
-namespace HttpFlavorValues
-{
-/** HTTP/1.0. */
-static constexpr const char *kHttp10 = "1.0";
-/** HTTP/1.1. */
-static constexpr const char *kHttp11 = "1.1";
-/** HTTP/2. */
-static constexpr const char *kHttp20 = "2.0";
-/** HTTP/3. */
-static constexpr const char *kHttp30 = "3.0";
-/** SPDY protocol. */
-static constexpr const char *kSpdy = "SPDY";
-/** QUIC protocol. */
-static constexpr const char *kQuic = "QUIC";
-} // namespace HttpFlavorValues
-
namespace GraphqlOperationTypeValues
{
/** GraphQL query. */
@@ -1418,6 +1489,8 @@ static constexpr const char *kJavaRmi = "java_rmi";
static constexpr const char *kDotnetWcf = "dotnet_wcf";
/** Apache Dubbo. */
static constexpr const char *kApacheDubbo = "apache_dubbo";
+/** Connect RPC. */
+static constexpr const char *kConnectRpc = "connect_rpc";
} // namespace RpcSystemValues
namespace RpcGrpcStatusCodeValues
@@ -1458,6 +1531,50 @@ static constexpr const int kDataLoss = 15;
static constexpr const int kUnauthenticated = 16;
} // namespace RpcGrpcStatusCodeValues
+namespace MessageTypeValues
+{
+/** sent. */
+static constexpr const char *kSent = "SENT";
+/** received. */
+static constexpr const char *kReceived = "RECEIVED";
+} // namespace MessageTypeValues
+
+namespace RpcConnectRpcErrorCodeValues
+{
+/** cancelled. */
+static constexpr const char *kCancelled = "cancelled";
+/** unknown. */
+static constexpr const char *kUnknown = "unknown";
+/** invalid_argument. */
+static constexpr const char *kInvalidArgument = "invalid_argument";
+/** deadline_exceeded. */
+static constexpr const char *kDeadlineExceeded = "deadline_exceeded";
+/** not_found. */
+static constexpr const char *kNotFound = "not_found";
+/** already_exists. */
+static constexpr const char *kAlreadyExists = "already_exists";
+/** permission_denied. */
+static constexpr const char *kPermissionDenied = "permission_denied";
+/** resource_exhausted. */
+static constexpr const char *kResourceExhausted = "resource_exhausted";
+/** failed_precondition. */
+static constexpr const char *kFailedPrecondition = "failed_precondition";
+/** aborted. */
+static constexpr const char *kAborted = "aborted";
+/** out_of_range. */
+static constexpr const char *kOutOfRange = "out_of_range";
+/** unimplemented. */
+static constexpr const char *kUnimplemented = "unimplemented";
+/** internal. */
+static constexpr const char *kInternal = "internal";
+/** unavailable. */
+static constexpr const char *kUnavailable = "unavailable";
+/** data_loss. */
+static constexpr const char *kDataLoss = "data_loss";
+/** unauthenticated. */
+static constexpr const char *kUnauthenticated = "unauthenticated";
+} // namespace RpcConnectRpcErrorCodeValues
+
} // namespace SemanticConventions
} // namespace trace
OPENTELEMETRY_END_NAMESPACE
diff --git a/buildscripts/semantic-convention/generate.sh b/buildscripts/semantic-convention/generate.sh
index ee9d3ae1a4..22f556c486 100755
--- a/buildscripts/semantic-convention/generate.sh
+++ b/buildscripts/semantic-convention/generate.sh
@@ -15,10 +15,10 @@ ROOT_DIR="${SCRIPT_DIR}/../../"
# freeze the spec & generator tools versions to make SemanticAttributes generation reproducible
# repository: https://github.com/open-telemetry/opentelemetry-specification
-SEMCONV_VERSION=1.18.0
+SEMCONV_VERSION=1.19.0
# repository: https://github.com/open-telemetry/build-tools
-GENERATOR_VERSION=0.15.1
+GENERATOR_VERSION=0.18.0
SPEC_VERSION=v$SEMCONV_VERSION
SCHEMA_URL=https://opentelemetry.io/schemas/$SEMCONV_VERSION
@@ -46,7 +46,7 @@ docker run --rm \
-v ${SCRIPT_DIR}/templates:/templates \
-v ${ROOT_DIR}/api/include/opentelemetry/trace/:/output \
otel/semconvgen:$GENERATOR_VERSION \
- --only span \
+ --only span,event,attribute_group,scope \
-f /source code \
--template /templates/SemanticAttributes.h.j2 \
--output /output/semantic_conventions.h \
diff --git a/examples/otlp/grpc_log_main.cc b/examples/otlp/grpc_log_main.cc
index d6d9669a35..e0c85c38c2 100644
--- a/examples/otlp/grpc_log_main.cc
+++ b/examples/otlp/grpc_log_main.cc
@@ -11,6 +11,11 @@
# include "opentelemetry/sdk/trace/tracer_provider_factory.h"
# include "opentelemetry/trace/provider.h"
+// sdk::TracerProvider and sdk::LoggerProvider is just used to call ForceFlush and prevent to cancel
+// running exportings when destroy and shutdown exporters.It's optional to users.
+# include "opentelemetry/sdk/logs/logger_provider.h"
+# include "opentelemetry/sdk/trace/tracer_provider.h"
+
# include
# ifdef BAZEL_BUILD
@@ -42,6 +47,14 @@ void InitTracer()
void CleanupTracer()
{
+ // We call ForceFlush to prevent to cancel running exportings, It's optional.
+ opentelemetry::nostd::shared_ptr provider =
+ trace::Provider::GetTracerProvider();
+ if (provider)
+ {
+ static_cast(provider.get())->ForceFlush();
+ }
+
std::shared_ptr none;
trace::Provider::SetTracerProvider(none);
}
@@ -59,6 +72,14 @@ void InitLogger()
void CleanupLogger()
{
+ // We call ForceFlush to prevent to cancel running exportings, It's optional.
+ opentelemetry::nostd::shared_ptr provider =
+ logs::Provider::GetLoggerProvider();
+ if (provider)
+ {
+ static_cast(provider.get())->ForceFlush();
+ }
+
nostd::shared_ptr none;
opentelemetry::logs::Provider::SetLoggerProvider(none);
}
diff --git a/examples/otlp/grpc_main.cc b/examples/otlp/grpc_main.cc
index 679f0f0fd4..d7e3b29d6c 100644
--- a/examples/otlp/grpc_main.cc
+++ b/examples/otlp/grpc_main.cc
@@ -6,6 +6,10 @@
#include "opentelemetry/sdk/trace/tracer_provider_factory.h"
#include "opentelemetry/trace/provider.h"
+// sdk::TracerProvider is just used to call ForceFlush and prevent to cancel running exportings when
+// destroy and shutdown exporters.It's optional to users.
+#include "opentelemetry/sdk/trace/tracer_provider.h"
+
#ifdef BAZEL_BUILD
# include "examples/common/foo_library/foo_library.h"
#else
@@ -32,6 +36,14 @@ void InitTracer()
void CleanupTracer()
{
+ // We call ForceFlush to prevent to cancel running exportings, It's optional.
+ opentelemetry::nostd::shared_ptr provider =
+ trace::Provider::GetTracerProvider();
+ if (provider)
+ {
+ static_cast(provider.get())->ForceFlush();
+ }
+
std::shared_ptr none;
trace::Provider::SetTracerProvider(none);
}
diff --git a/examples/otlp/http_log_main.cc b/examples/otlp/http_log_main.cc
index faddae58ad..a1171b04e0 100644
--- a/examples/otlp/http_log_main.cc
+++ b/examples/otlp/http_log_main.cc
@@ -13,6 +13,12 @@
# include "opentelemetry/sdk/trace/tracer_provider_factory.h"
# include "opentelemetry/trace/provider.h"
+// sdk::TracerProvider and sdk::LoggerProvider is just used to call ForceFlush and prevent to cancel
+// running exportings when destroy and shutdown exporters.It's optional to users.
+# include "opentelemetry/sdk/logs/logger_provider.h"
+# include "opentelemetry/sdk/trace/tracer_provider.h"
+
+# include
# include
# ifdef BAZEL_BUILD
@@ -32,11 +38,27 @@ namespace internal_log = opentelemetry::sdk::common::internal_log;
namespace
{
-opentelemetry::exporter::otlp::OtlpHttpExporterOptions opts;
+opentelemetry::exporter::otlp::OtlpHttpExporterOptions trace_opts;
void InitTracer()
{
+ if (trace_opts.url.size() > 9)
+ {
+ if (trace_opts.url.substr(trace_opts.url.size() - 8) == "/v1/logs")
+ {
+ trace_opts.url = trace_opts.url.substr(0, trace_opts.url.size() - 8) + "/v1/traces";
+ }
+ else if (trace_opts.url.substr(trace_opts.url.size() - 9) == "/v1/logs/")
+ {
+ trace_opts.url = trace_opts.url.substr(0, trace_opts.url.size() - 9) + "/v1/traces";
+ }
+ else
+ {
+ trace_opts.url = opentelemetry::exporter::otlp::GetOtlpDefaultHttpTracesEndpoint();
+ }
+ }
+ std::cout << "Using " << trace_opts.url << " to export trace spans." << std::endl;
// Create OTLP exporter instance
- auto exporter = otlp::OtlpHttpExporterFactory::Create(opts);
+ auto exporter = otlp::OtlpHttpExporterFactory::Create(trace_opts);
auto processor = trace_sdk::SimpleSpanProcessorFactory::Create(std::move(exporter));
std::shared_ptr provider =
trace_sdk::TracerProviderFactory::Create(std::move(processor));
@@ -46,6 +68,14 @@ void InitTracer()
void CleanupTracer()
{
+ // We call ForceFlush to prevent to cancel running exportings, It's optional.
+ opentelemetry::nostd::shared_ptr provider =
+ trace::Provider::GetTracerProvider();
+ if (provider)
+ {
+ static_cast(provider.get())->ForceFlush();
+ }
+
std::shared_ptr none;
trace::Provider::SetTracerProvider(none);
}
@@ -53,6 +83,7 @@ void CleanupTracer()
opentelemetry::exporter::otlp::OtlpHttpLogRecordExporterOptions logger_opts;
void InitLogger()
{
+ std::cout << "Using " << logger_opts.url << " to export log records." << std::endl;
logger_opts.console_debug = true;
// Create OTLP exporter instance
auto exporter = otlp::OtlpHttpLogRecordExporterFactory::Create(logger_opts);
@@ -65,6 +96,14 @@ void InitLogger()
void CleanupLogger()
{
+ // We call ForceFlush to prevent to cancel running exportings, It's optional.
+ opentelemetry::nostd::shared_ptr provider =
+ logs::Provider::GetLoggerProvider();
+ if (provider)
+ {
+ static_cast(provider.get())->ForceFlush();
+ }
+
std::shared_ptr none;
opentelemetry::logs::Provider::SetLoggerProvider(none);
}
@@ -83,12 +122,12 @@ int main(int argc, char *argv[])
{
if (argc > 1)
{
- opts.url = argv[1];
+ trace_opts.url = argv[1];
logger_opts.url = argv[1];
if (argc > 2)
{
- std::string debug = argv[2];
- opts.console_debug = debug != "" && debug != "0" && debug != "no";
+ std::string debug = argv[2];
+ trace_opts.console_debug = debug != "" && debug != "0" && debug != "no";
}
if (argc > 3)
@@ -96,13 +135,13 @@ int main(int argc, char *argv[])
std::string binary_mode = argv[3];
if (binary_mode.size() >= 3 && binary_mode.substr(0, 3) == "bin")
{
- opts.content_type = opentelemetry::exporter::otlp::HttpRequestContentType::kBinary;
+ trace_opts.content_type = opentelemetry::exporter::otlp::HttpRequestContentType::kBinary;
logger_opts.content_type = opentelemetry::exporter::otlp::HttpRequestContentType::kBinary;
}
}
}
- if (opts.console_debug)
+ if (trace_opts.console_debug)
{
internal_log::GlobalLogHandler::SetLogLevel(internal_log::LogLevel::Debug);
}
diff --git a/examples/otlp/http_main.cc b/examples/otlp/http_main.cc
index 3eb482c6a4..09b8daeee2 100644
--- a/examples/otlp/http_main.cc
+++ b/examples/otlp/http_main.cc
@@ -8,6 +8,10 @@
#include "opentelemetry/sdk/trace/tracer_provider_factory.h"
#include "opentelemetry/trace/provider.h"
+// sdk::TracerProvider is just used to call ForceFlush and prevent to cancel running exportings when
+// destroy and shutdown exporters.It's optional to users.
+#include "opentelemetry/sdk/trace/tracer_provider.h"
+
#include
#ifdef BAZEL_BUILD
@@ -38,6 +42,14 @@ void InitTracer()
void CleanupTracer()
{
+ // We call ForceFlush to prevent to cancel running exportings, It's optional.
+ opentelemetry::nostd::shared_ptr provider =
+ trace::Provider::GetTracerProvider();
+ if (provider)
+ {
+ static_cast(provider.get())->ForceFlush();
+ }
+
std::shared_ptr none;
trace::Provider::SetTracerProvider(none);
}
diff --git a/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_record_exporter.h b/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_record_exporter.h
index 7a52810df0..06b4c3b727 100644
--- a/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_record_exporter.h
+++ b/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_record_exporter.h
@@ -7,12 +7,17 @@
# include "nlohmann/json.hpp"
# include "opentelemetry/common/spin_lock_mutex.h"
# include "opentelemetry/ext/http/client/http_client_factory.h"
+# include "opentelemetry/nostd/shared_ptr.h"
# include "opentelemetry/nostd/type_traits.h"
# include "opentelemetry/sdk/logs/exporter.h"
# include "opentelemetry/sdk/logs/recordable.h"
# include
+# include
+# include
+# include
# include
+# include
OPENTELEMETRY_BEGIN_NAMESPACE
namespace exporter
@@ -90,6 +95,14 @@ class ElasticsearchLogRecordExporter final : public opentelemetry::sdk::logs::Lo
const opentelemetry::nostd::span>
&records) noexcept override;
+ /**
+ * Force flush the exporter.
+ * @param timeout an option timeout, default to max.
+ * @return return true when all data are exported, and false when timeout
+ */
+ bool ForceFlush(
+ std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override;
+
/**
* Shutdown this exporter.
* @param timeout The maximum time to wait for the shutdown method to return
@@ -108,6 +121,18 @@ class ElasticsearchLogRecordExporter final : public opentelemetry::sdk::logs::Lo
std::shared_ptr http_client_;
mutable opentelemetry::common::SpinLockMutex lock_;
bool isShutdown() const noexcept;
+
+# ifdef ENABLE_ASYNC_EXPORT
+ struct SynchronizationData
+ {
+ std::atomic session_counter_;
+ std::atomic finished_session_counter_;
+ std::condition_variable force_flush_cv;
+ std::mutex force_flush_cv_m;
+ std::recursive_mutex force_flush_m;
+ };
+ nostd::shared_ptr synchronization_data_;
+# endif
};
} // namespace logs
} // namespace exporter
diff --git a/exporters/elasticsearch/src/es_log_record_exporter.cc b/exporters/elasticsearch/src/es_log_record_exporter.cc
index c440be685c..60a5d8d781 100644
--- a/exporters/elasticsearch/src/es_log_record_exporter.cc
+++ b/exporters/elasticsearch/src/es_log_record_exporter.cc
@@ -290,9 +290,19 @@ class AsyncResponseHandler : public http_client::EventHandler
# endif
ElasticsearchLogRecordExporter::ElasticsearchLogRecordExporter()
- : options_{ElasticsearchExporterOptions()},
- http_client_{ext::http::client::HttpClientFactory::Create()}
-{}
+ : options_{ElasticsearchExporterOptions()}, http_client_
+{
+ ext::http::client::HttpClientFactory::Create()
+}
+# ifdef ENABLE_ASYNC_EXPORT
+, synchronization_data_(new SynchronizationData())
+# endif
+{
+# ifdef ENABLE_ASYNC_EXPORT
+ synchronization_data_->finished_session_counter_.store(0);
+ synchronization_data_->session_counter_.store(0);
+# endif
+}
ElasticsearchLogRecordExporter::ElasticsearchLogRecordExporter(
const ElasticsearchExporterOptions &options)
@@ -343,10 +353,12 @@ sdk::common::ExportResult ElasticsearchLogRecordExporter::Export(
# ifdef ENABLE_ASYNC_EXPORT
// Send the request
- std::size_t span_count = records.size();
- auto handler = std::make_shared(
+ synchronization_data_->session_counter_.fetch_add(1, std::memory_order_release);
+ std::size_t span_count = records.size();
+ auto synchronization_data = synchronization_data_;
+ auto handler = std::make_shared(
session,
- [span_count](opentelemetry::sdk::common::ExportResult result) {
+ [span_count, synchronization_data](opentelemetry::sdk::common::ExportResult result) {
if (result != opentelemetry::sdk::common::ExportResult::kSuccess)
{
OTEL_INTERNAL_LOG_ERROR("[ES Log Exporter] ERROR: Export "
@@ -358,6 +370,9 @@ sdk::common::ExportResult ElasticsearchLogRecordExporter::Export(
OTEL_INTERNAL_LOG_DEBUG("[ES Log Exporter] Export " << span_count
<< " trace span(s) success");
}
+
+ synchronization_data->finished_session_counter_.fetch_add(1, std::memory_order_release);
+ synchronization_data->force_flush_cv.notify_all();
return true;
},
options_.console_debug_);
@@ -401,6 +416,49 @@ sdk::common::ExportResult ElasticsearchLogRecordExporter::Export(
# endif
}
+bool ElasticsearchLogRecordExporter::ForceFlush(std::chrono::microseconds timeout) noexcept
+{
+# ifdef ENABLE_ASYNC_EXPORT
+ std::lock_guard lock_guard{synchronization_data_->force_flush_m};
+ std::size_t running_counter =
+ synchronization_data_->session_counter_.load(std::memory_order_acquire);
+ // ASAN will report chrono: runtime error: signed integer overflow: A + B cannot be represented
+ // in type 'long int' here. So we reset timeout to meet signed long int limit here.
+ timeout = opentelemetry::common::DurationUtil::AdjustWaitForTimeout(
+ timeout, std::chrono::microseconds::zero());
+
+ std::chrono::steady_clock::duration timeout_steady =
+ std::chrono::duration_cast(timeout);
+ if (timeout_steady <= std::chrono::steady_clock::duration::zero())
+ {
+ timeout_steady = std::chrono::steady_clock::duration::max();
+ }
+
+ std::unique_lock lk_cv(synchronization_data_->force_flush_cv_m);
+ // Wait for all the sessions to finish
+ while (timeout_steady > std::chrono::steady_clock::duration::zero())
+ {
+ if (synchronization_data_->finished_session_counter_.load(std::memory_order_acquire) >=
+ running_counter)
+ {
+ break;
+ }
+
+ std::chrono::steady_clock::time_point start_timepoint = std::chrono::steady_clock::now();
+ if (std::cv_status::no_timeout != synchronization_data_->force_flush_cv.wait_for(
+ lk_cv, std::chrono::seconds{options_.response_timeout_}))
+ {
+ break;
+ }
+ timeout_steady -= std::chrono::steady_clock::now() - start_timepoint;
+ }
+
+ return timeout_steady > std::chrono::steady_clock::duration::zero();
+# else
+ return true;
+# endif
+}
+
bool ElasticsearchLogRecordExporter::Shutdown(std::chrono::microseconds /* timeout */) noexcept
{
const std::lock_guard locked(lock_);
diff --git a/exporters/etw/CMakeLists.txt b/exporters/etw/CMakeLists.txt
index 15ab6e8ebb..b74a1b2cb5 100644
--- a/exporters/etw/CMakeLists.txt
+++ b/exporters/etw/CMakeLists.txt
@@ -11,8 +11,12 @@ target_include_directories(
set_target_properties(opentelemetry_exporter_etw PROPERTIES EXPORT_NAME
etw_exporter)
-target_link_libraries(opentelemetry_exporter_etw
- INTERFACE opentelemetry_api nlohmann_json::nlohmann_json)
+target_link_libraries(
+ opentelemetry_exporter_etw INTERFACE opentelemetry_api opentelemetry_trace
+ nlohmann_json::nlohmann_json)
+if(WITH_LOGS_PREVIEW)
+ target_link_libraries(opentelemetry_exporter_etw INTERFACE opentelemetry_logs)
+endif()
if(nlohmann_json_clone)
add_dependencies(opentelemetry_exporter_etw nlohmann_json::nlohmann_json)
endif()
diff --git a/exporters/jaeger/BUILD b/exporters/jaeger/BUILD
index c042535561..6f0e030b4d 100644
--- a/exporters/jaeger/BUILD
+++ b/exporters/jaeger/BUILD
@@ -189,6 +189,7 @@ cc_library(
deps = [
":jaeger_exporter",
"//sdk/src/common:global_log_handler",
+ "//sdk/src/trace",
],
)
diff --git a/exporters/jaeger/CMakeLists.txt b/exporters/jaeger/CMakeLists.txt
index 62d2961324..a02fa7c3e5 100644
--- a/exporters/jaeger/CMakeLists.txt
+++ b/exporters/jaeger/CMakeLists.txt
@@ -43,7 +43,8 @@ target_include_directories(
target_link_libraries(
opentelemetry_exporter_jaeger_trace
- PUBLIC opentelemetry_resources opentelemetry_http_client_curl
+ PUBLIC opentelemetry_resources opentelemetry_trace
+ opentelemetry_http_client_curl
PRIVATE thrift::thrift)
if(MSVC)
diff --git a/exporters/jaeger/include/opentelemetry/exporters/jaeger/jaeger_exporter.h b/exporters/jaeger/include/opentelemetry/exporters/jaeger/jaeger_exporter.h
index b8571199a1..eac6dc6940 100644
--- a/exporters/jaeger/include/opentelemetry/exporters/jaeger/jaeger_exporter.h
+++ b/exporters/jaeger/include/opentelemetry/exporters/jaeger/jaeger_exporter.h
@@ -47,6 +47,14 @@ class OPENTELEMETRY_DEPRECATED JaegerExporter final : public opentelemetry::sdk:
const nostd::span> &spans) noexcept
override;
+ /**
+ * Force flush the exporter.
+ * @param timeout an option timeout, default to max.
+ * @return return true when all data are exported, and false when timeout
+ */
+ bool ForceFlush(
+ std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override;
+
/**
* Shutdown the exporter.
* @param timeout an option timeout, default to max.
diff --git a/exporters/jaeger/src/jaeger_exporter.cc b/exporters/jaeger/src/jaeger_exporter.cc
index 8e9e75bd73..7f413caf30 100644
--- a/exporters/jaeger/src/jaeger_exporter.cc
+++ b/exporters/jaeger/src/jaeger_exporter.cc
@@ -98,6 +98,11 @@ void JaegerExporter::InitializeEndpoint()
assert(false);
}
+bool JaegerExporter::ForceFlush(std::chrono::microseconds /* timeout */) noexcept
+{
+ return true;
+}
+
bool JaegerExporter::Shutdown(std::chrono::microseconds /* timeout */) noexcept
{
const std::lock_guard locked(lock_);
diff --git a/exporters/ostream/include/opentelemetry/exporters/ostream/log_record_exporter.h b/exporters/ostream/include/opentelemetry/exporters/ostream/log_record_exporter.h
index f5e400946f..34806bd200 100644
--- a/exporters/ostream/include/opentelemetry/exporters/ostream/log_record_exporter.h
+++ b/exporters/ostream/include/opentelemetry/exporters/ostream/log_record_exporter.h
@@ -39,6 +39,14 @@ class OStreamLogRecordExporter final : public opentelemetry::sdk::logs::LogRecor
const opentelemetry::nostd::span> &records) noexcept
override;
+ /**
+ * Force flush the exporter.
+ * @param timeout an option timeout, default to max.
+ * @return return true when all data are exported, and false when timeout
+ */
+ bool ForceFlush(
+ std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override;
+
/**
* Marks the OStream Log Exporter as shut down.
*/
diff --git a/exporters/ostream/include/opentelemetry/exporters/ostream/span_exporter.h b/exporters/ostream/include/opentelemetry/exporters/ostream/span_exporter.h
index 985e05715a..baa4e860b1 100644
--- a/exporters/ostream/include/opentelemetry/exporters/ostream/span_exporter.h
+++ b/exporters/ostream/include/opentelemetry/exporters/ostream/span_exporter.h
@@ -38,6 +38,14 @@ class OStreamSpanExporter final : public opentelemetry::sdk::trace::SpanExporter
const opentelemetry::nostd::span>
&spans) noexcept override;
+ /**
+ * Force flush the exporter.
+ * @param timeout an option timeout, default to max.
+ * @return return true when all data are exported, and false when timeout
+ */
+ bool ForceFlush(
+ std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override;
+
bool Shutdown(
std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override;
diff --git a/exporters/ostream/src/log_record_exporter.cc b/exporters/ostream/src/log_record_exporter.cc
index 0b1093ab9d..751de71e2c 100644
--- a/exporters/ostream/src/log_record_exporter.cc
+++ b/exporters/ostream/src/log_record_exporter.cc
@@ -113,6 +113,12 @@ sdk::common::ExportResult OStreamLogRecordExporter::Export(
return sdk::common::ExportResult::kSuccess;
}
+bool OStreamLogRecordExporter::ForceFlush(std::chrono::microseconds /* timeout */) noexcept
+{
+ sout_.flush();
+ return true;
+}
+
bool OStreamLogRecordExporter::Shutdown(std::chrono::microseconds) noexcept
{
const std::lock_guard locked(lock_);
diff --git a/exporters/ostream/src/span_exporter.cc b/exporters/ostream/src/span_exporter.cc
index 79be26b311..6a88c32235 100644
--- a/exporters/ostream/src/span_exporter.cc
+++ b/exporters/ostream/src/span_exporter.cc
@@ -98,6 +98,12 @@ sdk::common::ExportResult OStreamSpanExporter::Export(
return sdk::common::ExportResult::kSuccess;
}
+bool OStreamSpanExporter::ForceFlush(std::chrono::microseconds /* timeout */) noexcept
+{
+ sout_.flush();
+ return true;
+}
+
bool OStreamSpanExporter::Shutdown(std::chrono::microseconds /* timeout */) noexcept
{
const std::lock_guard locked(lock_);
diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_exporter.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_exporter.h
index 2868bf7211..7aff1e24a5 100644
--- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_exporter.h
+++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_exporter.h
@@ -52,6 +52,14 @@ class OtlpGrpcExporter final : public opentelemetry::sdk::trace::SpanExporter
sdk::common::ExportResult Export(
const nostd::span> &spans) noexcept override;
+ /**
+ * Force flush the exporter.
+ * @param timeout an option timeout, default to max.
+ * @return return true when all data are exported, and false when timeout
+ */
+ bool ForceFlush(
+ std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override;
+
/**
* Shut down the exporter.
* @param timeout an optional timeout, the default timeout of 0 means that no
diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_log_record_exporter.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_log_record_exporter.h
index 05224550a3..becb308ca2 100644
--- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_log_record_exporter.h
+++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_log_record_exporter.h
@@ -55,6 +55,14 @@ class OtlpGrpcLogRecordExporter : public opentelemetry::sdk::logs::LogRecordExpo
const nostd::span> &records) noexcept
override;
+ /**
+ * Force flush the exporter.
+ * @param timeout an option timeout, default to max.
+ * @return return true when all data are exported, and false when timeout
+ */
+ bool ForceFlush(
+ std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override;
+
/**
* Shutdown this exporter.
* @param timeout The maximum time to wait for the shutdown method to return.
diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_exporter.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_exporter.h
index 673e0d0bb3..9188858fc1 100644
--- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_exporter.h
+++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_exporter.h
@@ -53,6 +53,14 @@ class OtlpHttpExporter final : public opentelemetry::sdk::trace::SpanExporter
const nostd::span> &spans) noexcept
override;
+ /**
+ * Force flush the exporter.
+ * @param timeout an option timeout, default to max.
+ * @return return true when all data are exported, and false when timeout
+ */
+ bool ForceFlush(
+ std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override;
+
/**
* Shut down the exporter.
* @param timeout an optional timeout, the default timeout of 0 means that no
diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_log_record_exporter.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_log_record_exporter.h
index 2250cfe831..035b046b72 100644
--- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_log_record_exporter.h
+++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_log_record_exporter.h
@@ -53,6 +53,14 @@ class OtlpHttpLogRecordExporter final : public opentelemetry::sdk::logs::LogReco
const nostd::span> &records) noexcept
override;
+ /**
+ * Force flush the exporter.
+ * @param timeout an option timeout, default to max.
+ * @return return true when all data are exported, and false when timeout
+ */
+ bool ForceFlush(
+ std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override;
+
/**
* Shutdown this exporter.
* @param timeout The maximum time to wait for the shutdown method to return
diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/protobuf_include_prefix.h b/exporters/otlp/include/opentelemetry/exporters/otlp/protobuf_include_prefix.h
index 036cb0ae7f..dc674ce9eb 100644
--- a/exporters/otlp/include/opentelemetry/exporters/otlp/protobuf_include_prefix.h
+++ b/exporters/otlp/include/opentelemetry/exporters/otlp/protobuf_include_prefix.h
@@ -22,6 +22,9 @@
# pragma warning(disable : 4267)
# pragma warning(disable : 4668)
# pragma warning(disable : 4946)
+# pragma warning(disable : 6001)
+# pragma warning(disable : 6244)
+# pragma warning(disable : 6246)
#endif
#if defined(__GNUC__) && !defined(__clang__) && !defined(__apple_build_version__)
@@ -30,9 +33,34 @@
# endif
# pragma GCC diagnostic ignored "-Wunused-parameter"
# pragma GCC diagnostic ignored "-Wtype-limits"
+# pragma GCC diagnostic ignored "-Wsign-compare"
+# pragma GCC diagnostic ignored "-Wsign-conversion"
+# pragma GCC diagnostic ignored "-Wshadow"
+# pragma GCC diagnostic ignored "-Wuninitialized"
+# pragma GCC diagnostic ignored "-Wconversion"
+# if (__GNUC__ * 100 + __GNUC_MINOR__) >= 409
+# pragma GCC diagnostic ignored "-Wfloat-conversion"
+# endif
+# if (__GNUC__ * 100 + __GNUC_MINOR__) >= 501
+# pragma GCC diagnostic ignored "-Wsuggest-override"
+# endif
#elif defined(__clang__) || defined(__apple_build_version__)
# pragma clang diagnostic push
# pragma clang diagnostic ignored "-Wunused-parameter"
# pragma clang diagnostic ignored "-Wtype-limits"
# pragma clang diagnostic ignored "-Wshadow-field"
+# pragma clang diagnostic ignored "-Wsign-compare"
+# pragma clang diagnostic ignored "-Wsign-conversion"
+# pragma clang diagnostic ignored "-Wshadow"
+# pragma clang diagnostic ignored "-Wuninitialized"
+# pragma clang diagnostic ignored "-Wconversion"
+# if ((__clang_major__ * 100) + __clang_minor__) >= 305
+# pragma clang diagnostic ignored "-Wfloat-conversion"
+# endif
+# if ((__clang_major__ * 100) + __clang_minor__) >= 306
+# pragma clang diagnostic ignored "-Winconsistent-missing-override"
+# endif
+# if ((__clang_major__ * 100) + __clang_minor__) >= 1100
+# pragma clang diagnostic ignored "-Wsuggest-override"
+# endif
#endif
diff --git a/exporters/otlp/src/otlp_grpc_exporter.cc b/exporters/otlp/src/otlp_grpc_exporter.cc
index c22ca5a54d..30a9b63fe4 100644
--- a/exporters/otlp/src/otlp_grpc_exporter.cc
+++ b/exporters/otlp/src/otlp_grpc_exporter.cc
@@ -71,6 +71,13 @@ sdk::common::ExportResult OtlpGrpcExporter::Export(
return sdk::common::ExportResult::kSuccess;
}
+bool OtlpGrpcExporter::ForceFlush(std::chrono::microseconds /* timeout */) noexcept
+{
+ // TODO: When we implement async exporting in OTLP gRPC exporter in the future, we need wait the
+ // running exporting finished here.
+ return true;
+}
+
bool OtlpGrpcExporter::Shutdown(std::chrono::microseconds /* timeout */) noexcept
{
const std::lock_guard locked(lock_);
diff --git a/exporters/otlp/src/otlp_grpc_log_record_exporter.cc b/exporters/otlp/src/otlp_grpc_log_record_exporter.cc
index 6f6bb38b65..6cef1d07b6 100644
--- a/exporters/otlp/src/otlp_grpc_log_record_exporter.cc
+++ b/exporters/otlp/src/otlp_grpc_log_record_exporter.cc
@@ -90,6 +90,13 @@ bool OtlpGrpcLogRecordExporter::Shutdown(std::chrono::microseconds /* timeout */
return true;
}
+bool OtlpGrpcLogRecordExporter::ForceFlush(std::chrono::microseconds /* timeout */) noexcept
+{
+ // TODO: When we implement async exporting in OTLP gRPC exporter in the future, we need wait the
+ // running exporting finished here.
+ return true;
+}
+
bool OtlpGrpcLogRecordExporter::isShutdown() const noexcept
{
const std::lock_guard locked(lock_);
diff --git a/exporters/otlp/src/otlp_http_client.cc b/exporters/otlp/src/otlp_http_client.cc
index 58cff24c1e..24beafa194 100644
--- a/exporters/otlp/src/otlp_http_client.cc
+++ b/exporters/otlp/src/otlp_http_client.cc
@@ -793,34 +793,40 @@ bool OtlpHttpClient::ForceFlush(std::chrono::microseconds timeout) noexcept
// Wait for all the sessions to finish
std::unique_lock lock(session_waker_lock_);
- if (timeout <= std::chrono::microseconds::zero())
+
+ std::chrono::steady_clock::duration timeout_steady =
+ std::chrono::duration_cast(timeout);
+ if (timeout_steady <= std::chrono::steady_clock::duration::zero())
+ {
+ timeout_steady = std::chrono::steady_clock::duration::max();
+ }
+
+ while (timeout_steady > std::chrono::steady_clock::duration::zero())
{
- while (true)
{
+ std::lock_guard guard{session_manager_lock_};
+ if (running_sessions_.empty())
{
- std::lock_guard guard{session_manager_lock_};
- if (running_sessions_.empty())
- {
- break;
- }
- }
- // When changes of running_sessions_ and notify_one/notify_all happen between predicate
- // checking and waiting, we should not wait forever.We should cleanup gc sessions here as soon
- // as possible to call FinishSession() and cleanup resources.
- if (std::cv_status::timeout == session_waker_.wait_for(lock, options_.timeout))
- {
- cleanupGCSessions();
+ break;
}
}
- return true;
- }
- else
- {
- return session_waker_.wait_for(lock, timeout, [this] {
- std::lock_guard guard{session_manager_lock_};
- return running_sessions_.empty();
- });
+ // When changes of running_sessions_ and notify_one/notify_all happen between predicate
+ // checking and waiting, we should not wait forever.We should cleanup gc sessions here as soon
+ // as possible to call FinishSession() and cleanup resources.
+ std::chrono::steady_clock::time_point start_timepoint = std::chrono::steady_clock::now();
+ if (std::cv_status::timeout == session_waker_.wait_for(lock, options_.timeout))
+ {
+ cleanupGCSessions();
+ }
+ else
+ {
+ break;
+ }
+
+ timeout_steady -= std::chrono::steady_clock::now() - start_timepoint;
}
+
+ return timeout_steady > std::chrono::steady_clock::duration::zero();
}
bool OtlpHttpClient::Shutdown(std::chrono::microseconds timeout) noexcept
diff --git a/exporters/otlp/src/otlp_http_exporter.cc b/exporters/otlp/src/otlp_http_exporter.cc
index 78b587e766..852fee7e1f 100644
--- a/exporters/otlp/src/otlp_http_exporter.cc
+++ b/exporters/otlp/src/otlp_http_exporter.cc
@@ -115,6 +115,11 @@ opentelemetry::sdk::common::ExportResult OtlpHttpExporter::Export(
#endif
}
+bool OtlpHttpExporter::ForceFlush(std::chrono::microseconds timeout) noexcept
+{
+ return http_client_->ForceFlush(timeout);
+}
+
bool OtlpHttpExporter::Shutdown(std::chrono::microseconds timeout) noexcept
{
return http_client_->Shutdown(timeout);
diff --git a/exporters/otlp/src/otlp_http_log_record_exporter.cc b/exporters/otlp/src/otlp_http_log_record_exporter.cc
index 81bc37a340..98925773c5 100644
--- a/exporters/otlp/src/otlp_http_log_record_exporter.cc
+++ b/exporters/otlp/src/otlp_http_log_record_exporter.cc
@@ -118,6 +118,11 @@ opentelemetry::sdk::common::ExportResult OtlpHttpLogRecordExporter::Export(
# endif
}
+bool OtlpHttpLogRecordExporter::ForceFlush(std::chrono::microseconds timeout) noexcept
+{
+ return http_client_->ForceFlush(timeout);
+}
+
bool OtlpHttpLogRecordExporter::Shutdown(std::chrono::microseconds timeout) noexcept
{
return http_client_->Shutdown(timeout);
diff --git a/exporters/zipkin/CMakeLists.txt b/exporters/zipkin/CMakeLists.txt
index 68836cc8d3..9cc997023f 100644
--- a/exporters/zipkin/CMakeLists.txt
+++ b/exporters/zipkin/CMakeLists.txt
@@ -7,6 +7,11 @@ add_library(
opentelemetry_exporter_zipkin_trace
src/zipkin_exporter.cc src/zipkin_exporter_factory.cc src/recordable.cc)
+target_include_directories(
+ opentelemetry_exporter_zipkin_trace
+ PUBLIC "$"
+ "$")
+
target_link_libraries(
opentelemetry_exporter_zipkin_trace
PUBLIC opentelemetry_trace opentelemetry_http_client_curl
diff --git a/exporters/zipkin/include/opentelemetry/exporters/zipkin/zipkin_exporter.h b/exporters/zipkin/include/opentelemetry/exporters/zipkin/zipkin_exporter.h
index cb2d7b6954..8c66a251a4 100644
--- a/exporters/zipkin/include/opentelemetry/exporters/zipkin/zipkin_exporter.h
+++ b/exporters/zipkin/include/opentelemetry/exporters/zipkin/zipkin_exporter.h
@@ -49,6 +49,14 @@ class ZipkinExporter final : public opentelemetry::sdk::trace::SpanExporter
const nostd::span> &spans) noexcept
override;
+ /**
+ * Force flush the exporter.
+ * @param timeout an option timeout, default to max.
+ * @return return true when all data are exported, and false when timeout
+ */
+ bool ForceFlush(
+ std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override;
+
/**
* Shut down the exporter.
* @param timeout an optional timeout, default to max.
diff --git a/exporters/zipkin/src/zipkin_exporter.cc b/exporters/zipkin/src/zipkin_exporter.cc
index 802ecccffc..b3f6fcfe61 100644
--- a/exporters/zipkin/src/zipkin_exporter.cc
+++ b/exporters/zipkin/src/zipkin_exporter.cc
@@ -110,6 +110,11 @@ void ZipkinExporter::InitializeLocalEndpoint()
local_end_point_["port"] = url_parser_.port_;
}
+bool ZipkinExporter::ForceFlush(std::chrono::microseconds /* timeout */) noexcept
+{
+ return true;
+}
+
bool ZipkinExporter::Shutdown(std::chrono::microseconds /* timeout */) noexcept
{
const std::lock_guard locked(lock_);
diff --git a/sdk/include/opentelemetry/sdk/logs/batch_log_record_processor.h b/sdk/include/opentelemetry/sdk/logs/batch_log_record_processor.h
index 0621316dcd..d13e8782f1 100644
--- a/sdk/include/opentelemetry/sdk/logs/batch_log_record_processor.h
+++ b/sdk/include/opentelemetry/sdk/logs/batch_log_record_processor.h
@@ -117,6 +117,7 @@ class BatchLogRecordProcessor : public LogRecordProcessor
std::atomic is_force_wakeup_background_worker;
std::atomic is_force_flush_pending;
std::atomic is_force_flush_notified;
+ std::atomic force_flush_timeout_us;
std::atomic is_shutdown;
};
@@ -128,6 +129,7 @@ class BatchLogRecordProcessor : public LogRecordProcessor
* @param synchronization_data Synchronization data to be notified.
*/
static void NotifyCompletion(bool notify_force_flush,
+ const std::unique_ptr &exporter,
const std::shared_ptr &synchronization_data);
void GetWaitAdjustedTime(std::chrono::microseconds &timeout,
diff --git a/sdk/include/opentelemetry/sdk/logs/exporter.h b/sdk/include/opentelemetry/sdk/logs/exporter.h
index a531577aef..069fd8784c 100644
--- a/sdk/include/opentelemetry/sdk/logs/exporter.h
+++ b/sdk/include/opentelemetry/sdk/logs/exporter.h
@@ -20,10 +20,11 @@ namespace logs
/**
* LogRecordExporter defines the interface that log exporters must implement.
*/
-class LogRecordExporter
+class OPENTELEMETRY_EXPORT LogRecordExporter
{
public:
- virtual ~LogRecordExporter() = default;
+ LogRecordExporter();
+ virtual ~LogRecordExporter();
/**
* Create a log recordable. This object will be used to record log data and
@@ -47,6 +48,12 @@ class LogRecordExporter
virtual sdk::common::ExportResult Export(
const nostd::span> &records) noexcept = 0;
+ /**
+ * Force flush the log records pushed into this log exporter.
+ */
+ virtual bool ForceFlush(
+ std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept;
+
/**
* Marks the exporter as ShutDown and cleans up any resources as required.
* Shutdown should be called only once for each Exporter instance.
diff --git a/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h b/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h
index ecb5cc7e72..83773ffb61 100644
--- a/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h
+++ b/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h
@@ -63,8 +63,11 @@ class PeriodicExportingMetricReader : public MetricReader
std::thread worker_thread_;
/* Synchronization primitives */
- std::condition_variable cv_;
- std::mutex cv_m_;
+ std::atomic is_force_flush_pending_;
+ std::atomic is_force_wakeup_background_worker_;
+ std::atomic is_force_flush_notified_;
+ std::condition_variable cv_, force_flush_cv_;
+ std::mutex cv_m_, force_flush_m_;
};
} // namespace metrics
diff --git a/sdk/include/opentelemetry/sdk/resource/semantic_conventions.h b/sdk/include/opentelemetry/sdk/resource/semantic_conventions.h
index 27aff268cc..895aa720d3 100644
--- a/sdk/include/opentelemetry/sdk/resource/semantic_conventions.h
+++ b/sdk/include/opentelemetry/sdk/resource/semantic_conventions.h
@@ -23,7 +23,7 @@ namespace SemanticConventions
/**
* The URL of the OpenTelemetry schema for these keys and values.
*/
-static constexpr const char *kSchemaUrl = "https://opentelemetry.io/schemas/1.18.0";
+static constexpr const char *kSchemaUrl = "https://opentelemetry.io/schemas/1.19.0";
/**
* Array of brand name and version separated by a space
@@ -62,16 +62,6 @@ static constexpr const char *kBrowserPlatform = "browser.platform";
*/
static constexpr const char *kBrowserMobile = "browser.mobile";
-/**
- * Full user-agent string provided by the browser
- *
- * Notes:
-
- The user-agent value SHOULD be provided only from browsers that do not have a mechanism
- to retrieve brands and platform individually from the User-Agent Client Hints API. To retrieve the
- value, the legacy {@code navigator.userAgent} API can be used.
- */
-static constexpr const char *kBrowserUserAgent = "browser.user_agent";
-
/**
* Preferred language of the user using the browser
*
@@ -100,11 +90,41 @@ static constexpr const char *kCloudAccountId = "cloud.account.id";
href="https://aws.amazon.com/about-aws/global-infrastructure/regions_az/">AWS regions, Azure regions, Google Cloud regions, or Tencent Cloud regions.
-
+ href="https://www.tencentcloud.com/document/product/213/6091">Tencent Cloud regions.
*/
static constexpr const char *kCloudRegion = "cloud.region";
+/**
+ * Cloud provider-specific native identifier of the monitored cloud resource (e.g. an ARN on AWS, a
+fully qualified
+resource ID on Azure, a full resource name
+on GCP)
+ *
+ * Notes:
+
- On some cloud providers, it may not be possible to determine the full ID at startup,
+so it may be necessary to set {@code cloud.resource_id} as a span attribute instead.
- The
+exact value to use for {@code cloud.resource_id} depends on the cloud provider. The following
+well-known definitions MUST be used if you set this attribute and they apply:
- AWS
+Lambda: The function ARN. Take care
+not to use the "invoked ARN" directly but replace any alias suffix with
+the resolved function version, as the same runtime instance may be invokable with multiple different
+aliases.
- GCP: The URI of the resource
+- Azure: The Fully Qualified
+Resource ID of the invoked function, not the function app, having the form
+{@code
+/subscriptions//resourceGroups//providers/Microsoft.Web/sites//functions/}.
+This means that a span attribute MUST be used, as an Azure function app can host multiple functions
+that would usually share a TracerProvider.
+
+ */
+static constexpr const char *kCloudResourceId = "cloud.resource_id";
+
/**
* Cloud regions often have multiple, isolated locations known as zones to increase availability.
Availability zone represents the zone where the resource is running.
@@ -201,6 +221,21 @@ static constexpr const char *kAwsLogStreamNames = "aws.log.stream.names";
*/
static constexpr const char *kAwsLogStreamArns = "aws.log.stream.arns";
+/**
+ * Time and date the release was created
+ */
+static constexpr const char *kHerokuReleaseCreationTimestamp = "heroku.release.creation_timestamp";
+
+/**
+ * Commit hash for the current release
+ */
+static constexpr const char *kHerokuReleaseCommit = "heroku.release.commit";
+
+/**
+ * Unique identifier for the application
+ */
+static constexpr const char *kHerokuAppId = "heroku.app.id";
+
/**
* Container name used by container runtime.
*/
@@ -295,35 +330,11 @@ providers/products:Azure: The full name {@code
+the {@code cloud.resource_id} attribute).
*/
static constexpr const char *kFaasName = "faas.name";
-/**
- * The unique ID of the single function that this runtime instance executes.
- *
- * Notes:
-
- On some cloud providers, it may not be possible to determine the full ID at startup,
-so consider setting {@code faas.id} as a span attribute instead.
- The exact value to use for
-{@code faas.id} depends on the cloud provider:
- AWS Lambda: The function ARN. Take care
-not to use the "invoked ARN" directly but replace any alias suffix with
-the resolved function version, as the same runtime instance may be invokable with multiple different
-aliases.
- GCP: The URI of the resource
-- Azure: The Fully Qualified
-Resource ID of the invoked function, not the function app, having the form
-{@code
-/subscriptions//resourceGroups//providers/Microsoft.Web/sites//functions/}.
-This means that a span attribute MUST be used, as an Azure function app can host multiple functions
-that would usually share a TracerProvider.
-
- */
-static constexpr const char *kFaasId = "faas.id";
-
/**
* The immutable version of the function being executed.
*
@@ -352,19 +363,20 @@ static constexpr const char *kFaasVersion = "faas.version";
static constexpr const char *kFaasInstance = "faas.instance";
/**
- * The amount of memory available to the serverless function in MiB.
+ * The amount of memory available to the serverless function converted to Bytes.
*
* Notes:
- It's recommended to set this attribute since e.g. too little memory can easily stop a
Java AWS Lambda function from working correctly. On AWS Lambda, the environment variable {@code
- AWS_LAMBDA_FUNCTION_MEMORY_SIZE} provides this information.
+ AWS_LAMBDA_FUNCTION_MEMORY_SIZE} provides this information (which must be multiplied by
+ 1,048,576).
*/
static constexpr const char *kFaasMaxMemory = "faas.max_memory";
/**
* Unique host ID. For Cloud, this must be the instance_id assigned by the cloud provider. For
- * non-containerized Linux systems, the {@code machine-id} located in {@code /etc/machine-id} or
- * {@code /var/lib/dbus/machine-id} may be used.
+ * non-containerized systems, this should be the {@code machine-id}. See the table below for the
+ * sources to use to determine the {@code machine-id} based on operating system.
*/
static constexpr const char *kHostId = "host.id";
@@ -703,6 +715,8 @@ static constexpr const char *kAws = "aws";
static constexpr const char *kAzure = "azure";
/** Google Cloud Platform. */
static constexpr const char *kGcp = "gcp";
+/** Heroku Platform as a Service. */
+static constexpr const char *kHeroku = "heroku";
/** IBM Cloud. */
static constexpr const char *kIbmCloud = "ibm_cloud";
/** Tencent Cloud. */
diff --git a/sdk/include/opentelemetry/sdk/trace/batch_span_processor.h b/sdk/include/opentelemetry/sdk/trace/batch_span_processor.h
index 309d849bc7..60e93d8a9f 100644
--- a/sdk/include/opentelemetry/sdk/trace/batch_span_processor.h
+++ b/sdk/include/opentelemetry/sdk/trace/batch_span_processor.h
@@ -115,6 +115,7 @@ class BatchSpanProcessor : public SpanProcessor
std::atomic is_force_wakeup_background_worker;
std::atomic is_force_flush_pending;
std::atomic is_force_flush_notified;
+ std::atomic force_flush_timeout_us;
std::atomic is_shutdown;
};
@@ -126,6 +127,7 @@ class BatchSpanProcessor : public SpanProcessor
* @param synchronization_data Synchronization data to be notified.
*/
static void NotifyCompletion(bool notify_force_flush,
+ const std::unique_ptr &exporter,
const std::shared_ptr &synchronization_data);
void GetWaitAdjustedTime(std::chrono::microseconds &timeout,
diff --git a/sdk/include/opentelemetry/sdk/trace/exporter.h b/sdk/include/opentelemetry/sdk/trace/exporter.h
index 3f9bce9b84..ffacca2b35 100644
--- a/sdk/include/opentelemetry/sdk/trace/exporter.h
+++ b/sdk/include/opentelemetry/sdk/trace/exporter.h
@@ -20,7 +20,8 @@ namespace trace
class OPENTELEMETRY_EXPORT SpanExporter
{
public:
- virtual ~SpanExporter() = default;
+ SpanExporter();
+ virtual ~SpanExporter();
/**
* Create a span recordable. This object will be used to record span data and
@@ -42,6 +43,15 @@ class OPENTELEMETRY_EXPORT SpanExporter
const nostd::span>
&spans) noexcept = 0;
+ /**
+ * Export all spans that have been exported.
+ * @param timeout an optional timeout, the default timeout of 0 means that no
+ * timeout is applied.
+ * @return return true when all data are exported, and false when timeout
+ */
+ virtual bool ForceFlush(
+ std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept;
+
/**
* Shut down the exporter.
* @param timeout an optional timeout.
diff --git a/sdk/include/opentelemetry/sdk/trace/simple_processor.h b/sdk/include/opentelemetry/sdk/trace/simple_processor.h
index 6796a10848..671218d0cf 100644
--- a/sdk/include/opentelemetry/sdk/trace/simple_processor.h
+++ b/sdk/include/opentelemetry/sdk/trace/simple_processor.h
@@ -55,7 +55,16 @@ class SimpleSpanProcessor : public SpanProcessor
}
}
- bool ForceFlush(std::chrono::microseconds /* timeout */) noexcept override { return true; }
+ bool ForceFlush(
+ std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept override
+ {
+ if (exporter_ != nullptr)
+ {
+ return exporter_->ForceFlush(timeout);
+ }
+
+ return true;
+ }
bool Shutdown(
std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept override
diff --git a/sdk/include/opentelemetry/sdk/trace/tracer_context.h b/sdk/include/opentelemetry/sdk/trace/tracer_context.h
index 573af20b7d..6dcdda8e01 100644
--- a/sdk/include/opentelemetry/sdk/trace/tracer_context.h
+++ b/sdk/include/opentelemetry/sdk/trace/tracer_context.h
@@ -87,7 +87,7 @@ class TracerContext
/**
* Shutdown the span processor associated with this tracer provider.
*/
- bool Shutdown() noexcept;
+ bool Shutdown(std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept;
private:
// order of declaration is important here - resource object should be destroyed after processor.
diff --git a/sdk/src/logs/CMakeLists.txt b/sdk/src/logs/CMakeLists.txt
index cbaea426e6..2b7db6eb7c 100644
--- a/sdk/src/logs/CMakeLists.txt
+++ b/sdk/src/logs/CMakeLists.txt
@@ -6,6 +6,7 @@ add_library(
logger_provider.cc
logger_provider_factory.cc
logger.cc
+ exporter.cc
event_logger_provider.cc
event_logger_provider_factory.cc
event_logger.cc
diff --git a/sdk/src/logs/batch_log_record_processor.cc b/sdk/src/logs/batch_log_record_processor.cc
index 7903c3b24c..6b1db1c666 100644
--- a/sdk/src/logs/batch_log_record_processor.cc
+++ b/sdk/src/logs/batch_log_record_processor.cc
@@ -46,6 +46,7 @@ BatchLogRecordProcessor::BatchLogRecordProcessor(std::unique_ptris_force_wakeup_background_worker.store(false);
synchronization_data_->is_force_flush_pending.store(false);
synchronization_data_->is_force_flush_notified.store(false);
+ synchronization_data_->force_flush_timeout_us.store(0);
synchronization_data_->is_shutdown.store(false);
}
@@ -88,6 +89,7 @@ bool BatchLogRecordProcessor::ForceFlush(std::chrono::microseconds timeout) noex
std::unique_lock lk_cv(synchronization_data_->force_flush_cv_m);
synchronization_data_->is_force_flush_pending.store(true, std::memory_order_release);
+ synchronization_data_->force_flush_timeout_us.store(timeout.count(), std::memory_order_release);
auto break_condition = [this]() {
if (synchronization_data_->is_shutdown.load() == true)
{
@@ -106,23 +108,24 @@ bool BatchLogRecordProcessor::ForceFlush(std::chrono::microseconds timeout) noex
// Fix timeout to meet requirement of wait_for
timeout = opentelemetry::common::DurationUtil::AdjustWaitForTimeout(
timeout, std::chrono::microseconds::zero());
- bool result;
- if (timeout <= std::chrono::microseconds::zero())
+
+ std::chrono::steady_clock::duration timeout_steady =
+ std::chrono::duration_cast(timeout);
+ if (timeout_steady <= std::chrono::steady_clock::duration::zero())
{
- bool wait_result = false;
- while (!wait_result)
- {
- // When is_force_flush_notified.store(true) and force_flush_cv.notify_all() is called
- // between is_force_flush_pending.load() and force_flush_cv.wait(). We must not wait
- // for ever
- wait_result = synchronization_data_->force_flush_cv.wait_for(lk_cv, scheduled_delay_millis_,
- break_condition);
- }
- result = true;
+ timeout_steady = std::chrono::steady_clock::duration::max();
}
- else
+
+ bool result = false;
+ while (!result && timeout_steady > std::chrono::steady_clock::duration::zero())
{
- result = synchronization_data_->force_flush_cv.wait_for(lk_cv, timeout, break_condition);
+ // When is_force_flush_notified.store(true) and force_flush_cv.notify_all() is called
+ // between is_force_flush_pending.load() and force_flush_cv.wait(). We must not wait
+ // for ever
+ std::chrono::steady_clock::time_point start_timepoint = std::chrono::steady_clock::now();
+ result = synchronization_data_->force_flush_cv.wait_for(lk_cv, scheduled_delay_millis_,
+ break_condition);
+ timeout_steady -= std::chrono::steady_clock::now() - start_timepoint;
}
// If it's already signaled, we must wait util notified.
@@ -202,7 +205,7 @@ void BatchLogRecordProcessor::Export()
if (num_records_to_export == 0)
{
- NotifyCompletion(notify_force_flush, synchronization_data_);
+ NotifyCompletion(notify_force_flush, exporter_, synchronization_data_);
break;
}
@@ -218,12 +221,13 @@ void BatchLogRecordProcessor::Export()
exporter_->Export(
nostd::span>(records_arr.data(), records_arr.size()));
- NotifyCompletion(notify_force_flush, synchronization_data_);
+ NotifyCompletion(notify_force_flush, exporter_, synchronization_data_);
} while (true);
}
void BatchLogRecordProcessor::NotifyCompletion(
bool notify_force_flush,
+ const std::unique_ptr &exporter,
const std::shared_ptr &synchronization_data)
{
if (!synchronization_data)
@@ -233,6 +237,14 @@ void BatchLogRecordProcessor::NotifyCompletion(
if (notify_force_flush)
{
+ if (exporter)
+ {
+ std::chrono::microseconds timeout = opentelemetry::common::DurationUtil::AdjustWaitForTimeout(
+ std::chrono::microseconds{
+ synchronization_data->force_flush_timeout_us.load(std::memory_order_acquire)},
+ std::chrono::microseconds::zero());
+ exporter->ForceFlush(timeout);
+ }
synchronization_data->is_force_flush_notified.store(true, std::memory_order_release);
synchronization_data->force_flush_cv.notify_one();
}
diff --git a/sdk/src/logs/exporter.cc b/sdk/src/logs/exporter.cc
new file mode 100644
index 0000000000..20fdb6b25c
--- /dev/null
+++ b/sdk/src/logs/exporter.cc
@@ -0,0 +1,27 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+#ifdef ENABLE_LOGS_PREVIEW
+
+# include "opentelemetry/sdk/logs/exporter.h"
+
+OPENTELEMETRY_BEGIN_NAMESPACE
+namespace sdk
+{
+namespace logs
+{
+
+OPENTELEMETRY_EXPORT LogRecordExporter::LogRecordExporter() {}
+
+OPENTELEMETRY_EXPORT LogRecordExporter::~LogRecordExporter() {}
+
+OPENTELEMETRY_EXPORT bool LogRecordExporter::ForceFlush(
+ std::chrono::microseconds /*timeout*/) noexcept
+{
+ return true;
+}
+
+} // namespace logs
+} // namespace sdk
+OPENTELEMETRY_END_NAMESPACE
+#endif
diff --git a/sdk/src/logs/simple_log_record_processor.cc b/sdk/src/logs/simple_log_record_processor.cc
index ef923bc590..50d4c20fdd 100644
--- a/sdk/src/logs/simple_log_record_processor.cc
+++ b/sdk/src/logs/simple_log_record_processor.cc
@@ -43,8 +43,12 @@ void SimpleLogRecordProcessor::OnEmit(std::unique_ptr &&record) noex
/**
* The simple processor does not have any log records to flush so this method is not used
*/
-bool SimpleLogRecordProcessor::ForceFlush(std::chrono::microseconds /* timeout */) noexcept
+bool SimpleLogRecordProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept
{
+ if (exporter_ != nullptr)
+ {
+ return exporter_->ForceFlush(timeout);
+ }
return true;
}
diff --git a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc
index bcca86bee0..dfa4a5ee6b 100644
--- a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc
+++ b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc
@@ -60,14 +60,15 @@ void PeriodicExportingMetricReader::DoBackgroundWork()
auto end = std::chrono::steady_clock::now();
auto export_time_ms = std::chrono::duration_cast(end - start);
auto remaining_wait_interval_ms = export_interval_millis_ - export_time_ms;
- cv_.wait_for(lk, remaining_wait_interval_ms);
+ cv_.wait_for(lk, remaining_wait_interval_ms, [this]() {
+ if (is_force_wakeup_background_worker_.load(std::memory_order_acquire))
+ {
+ is_force_wakeup_background_worker_.store(false, std::memory_order_release);
+ return true;
+ }
+ return IsShutdown();
+ });
} while (IsShutdown() != true);
- // One last Collect and Export before shutdown
- auto status = CollectAndExportOnce();
- if (!status)
- {
- OTEL_INTERNAL_LOG_ERROR("[Periodic Exporting Metric Reader] Collect-Export Cycle Failure.")
- }
}
bool PeriodicExportingMetricReader::CollectAndExportOnce()
@@ -86,6 +87,7 @@ bool PeriodicExportingMetricReader::CollectAndExportOnce()
return true;
});
});
+
std::future_status status;
do
{
@@ -96,12 +98,93 @@ bool PeriodicExportingMetricReader::CollectAndExportOnce()
break;
}
} while (status != std::future_status::ready);
+ bool notify_force_flush = is_force_flush_pending_.exchange(false, std::memory_order_acq_rel);
+ if (notify_force_flush)
+ {
+ is_force_flush_notified_.store(true, std::memory_order_release);
+ force_flush_cv_.notify_one();
+ }
+
return true;
}
bool PeriodicExportingMetricReader::OnForceFlush(std::chrono::microseconds timeout) noexcept
{
- return exporter_->ForceFlush(timeout);
+ std::unique_lock lk_cv(force_flush_m_);
+ is_force_flush_pending_.store(true, std::memory_order_release);
+ auto break_condition = [this]() {
+ if (IsShutdown())
+ {
+ return true;
+ }
+
+ // Wake up the worker thread once.
+ if (is_force_flush_pending_.load(std::memory_order_acquire))
+ {
+ is_force_wakeup_background_worker_.store(true, std::memory_order_release);
+ cv_.notify_one();
+ }
+ return is_force_flush_notified_.load(std::memory_order_acquire);
+ };
+
+ auto wait_timeout = opentelemetry::common::DurationUtil::AdjustWaitForTimeout(
+ timeout, std::chrono::microseconds::zero());
+ std::chrono::steady_clock::duration timeout_steady =
+ std::chrono::duration_cast(wait_timeout);
+ if (timeout_steady <= std::chrono::steady_clock::duration::zero())
+ {
+ timeout_steady = std::chrono::steady_clock::duration::max();
+ }
+
+ bool result = false;
+ while (!result && timeout_steady > std::chrono::steady_clock::duration::zero())
+ {
+ // When is_force_flush_notified_.store(true) and force_flush_cv_.notify_all() is called
+ // between is_force_flush_pending_.load() and force_flush_cv_.wait(). We must not wait
+ // for ever
+ std::chrono::steady_clock::time_point start_timepoint = std::chrono::steady_clock::now();
+ result = force_flush_cv_.wait_for(lk_cv, export_interval_millis_, break_condition);
+ timeout_steady -= std::chrono::steady_clock::now() - start_timepoint;
+ }
+
+ // If it will be already signaled, we must wait until notified.
+ // We use a spin lock here
+ if (false == is_force_flush_pending_.exchange(false, std::memory_order_acq_rel))
+ {
+ for (int retry_waiting_times = 0;
+ false == is_force_flush_notified_.load(std::memory_order_acquire); ++retry_waiting_times)
+ {
+ opentelemetry::common::SpinLockMutex::fast_yield();
+ if ((retry_waiting_times & 127) == 127)
+ {
+ std::this_thread::yield();
+ }
+ }
+ }
+ is_force_flush_notified_.store(false, std::memory_order_release);
+
+ if (result)
+ {
+ // - If original `timeout` is `zero`, use that in exporter::forceflush
+ // - Else if remaining `timeout_steady` more than zero, use that in exporter::forceflush
+ // - Else don't invoke exporter::forceflush ( as remaining time is zero or less)
+ if (timeout <= std::chrono::steady_clock::duration::zero())
+ {
+ result =
+ exporter_->ForceFlush(std::chrono::duration_cast(timeout));
+ }
+ else if (timeout_steady > std::chrono::steady_clock::duration::zero())
+ {
+ result = exporter_->ForceFlush(
+ std::chrono::duration_cast(timeout_steady));
+ }
+ else
+ {
+ // remaining timeout_steady is zero or less
+ result = false;
+ }
+ }
+ return result;
}
bool PeriodicExportingMetricReader::OnShutDown(std::chrono::microseconds timeout) noexcept
diff --git a/sdk/src/trace/CMakeLists.txt b/sdk/src/trace/CMakeLists.txt
index 7f757f036c..94c2059548 100644
--- a/sdk/src/trace/CMakeLists.txt
+++ b/sdk/src/trace/CMakeLists.txt
@@ -9,6 +9,7 @@ add_library(
tracer_provider_factory.cc
tracer.cc
span.cc
+ exporter.cc
batch_span_processor.cc
batch_span_processor_factory.cc
simple_processor_factory.cc
diff --git a/sdk/src/trace/batch_span_processor.cc b/sdk/src/trace/batch_span_processor.cc
index de6c457da7..da9f39f054 100644
--- a/sdk/src/trace/batch_span_processor.cc
+++ b/sdk/src/trace/batch_span_processor.cc
@@ -30,6 +30,7 @@ BatchSpanProcessor::BatchSpanProcessor(std::unique_ptr &&exporter,
synchronization_data_->is_force_wakeup_background_worker.store(false);
synchronization_data_->is_force_flush_pending.store(false);
synchronization_data_->is_force_flush_notified.store(false);
+ synchronization_data_->force_flush_timeout_us.store(0);
synchronization_data_->is_shutdown.store(false);
}
@@ -77,6 +78,7 @@ bool BatchSpanProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept
std::unique_lock lk_cv(synchronization_data_->force_flush_cv_m);
synchronization_data_->is_force_flush_pending.store(true, std::memory_order_release);
+ synchronization_data_->force_flush_timeout_us.store(timeout.count(), std::memory_order_release);
auto break_condition = [this]() {
if (synchronization_data_->is_shutdown.load() == true)
{
@@ -97,23 +99,23 @@ bool BatchSpanProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept
// Fix timeout to meet requirement of wait_for
timeout = opentelemetry::common::DurationUtil::AdjustWaitForTimeout(
timeout, std::chrono::microseconds::zero());
- bool result;
- if (timeout <= std::chrono::microseconds::zero())
+ std::chrono::steady_clock::duration timeout_steady =
+ std::chrono::duration_cast(timeout);
+ if (timeout_steady <= std::chrono::steady_clock::duration::zero())
{
- bool wait_result = false;
- while (!wait_result)
- {
- // When is_force_flush_notified.store(true) and force_flush_cv.notify_all() is called
- // between is_force_flush_pending.load() and force_flush_cv.wait(). We must not wait
- // for ever
- wait_result = synchronization_data_->force_flush_cv.wait_for(lk_cv, schedule_delay_millis_,
- break_condition);
- }
- result = true;
+ timeout_steady = std::chrono::steady_clock::duration::max();
}
- else
+
+ bool result = false;
+ while (!result && timeout_steady > std::chrono::steady_clock::duration::zero())
{
- result = synchronization_data_->force_flush_cv.wait_for(lk_cv, timeout, break_condition);
+ // When is_force_flush_notified.store(true) and force_flush_cv.notify_all() is called
+ // between is_force_flush_pending.load() and force_flush_cv.wait(). We must not wait
+ // for ever
+ std::chrono::steady_clock::time_point start_timepoint = std::chrono::steady_clock::now();
+ result = synchronization_data_->force_flush_cv.wait_for(lk_cv, schedule_delay_millis_,
+ break_condition);
+ timeout_steady -= std::chrono::steady_clock::now() - start_timepoint;
}
// If it will be already signaled, we must wait util notified.
@@ -192,7 +194,7 @@ void BatchSpanProcessor::Export()
if (num_records_to_export == 0)
{
- NotifyCompletion(notify_force_flush, synchronization_data_);
+ NotifyCompletion(notify_force_flush, exporter_, synchronization_data_);
break;
}
buffer_.Consume(num_records_to_export,
@@ -206,12 +208,13 @@ void BatchSpanProcessor::Export()
});
exporter_->Export(nostd::span>(spans_arr.data(), spans_arr.size()));
- NotifyCompletion(notify_force_flush, synchronization_data_);
+ NotifyCompletion(notify_force_flush, exporter_, synchronization_data_);
} while (true);
}
void BatchSpanProcessor::NotifyCompletion(
bool notify_force_flush,
+ const std::unique_ptr &exporter,
const std::shared_ptr &synchronization_data)
{
if (!synchronization_data)
@@ -221,6 +224,15 @@ void BatchSpanProcessor::NotifyCompletion(
if (notify_force_flush)
{
+ if (exporter)
+ {
+ std::chrono::microseconds timeout = opentelemetry::common::DurationUtil::AdjustWaitForTimeout(
+ std::chrono::microseconds{
+ synchronization_data->force_flush_timeout_us.load(std::memory_order_acquire)},
+ std::chrono::microseconds::zero());
+ exporter->ForceFlush(timeout);
+ }
+
synchronization_data->is_force_flush_notified.store(true, std::memory_order_release);
synchronization_data->force_flush_cv.notify_one();
}
diff --git a/sdk/src/trace/exporter.cc b/sdk/src/trace/exporter.cc
new file mode 100644
index 0000000000..5984583ba5
--- /dev/null
+++ b/sdk/src/trace/exporter.cc
@@ -0,0 +1,23 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+#include "opentelemetry/sdk/trace/exporter.h"
+
+OPENTELEMETRY_BEGIN_NAMESPACE
+namespace sdk
+{
+namespace trace
+{
+
+OPENTELEMETRY_EXPORT SpanExporter::SpanExporter() {}
+
+OPENTELEMETRY_EXPORT SpanExporter::~SpanExporter() {}
+
+OPENTELEMETRY_EXPORT bool SpanExporter::ForceFlush(std::chrono::microseconds /*timeout*/) noexcept
+{
+ return true;
+}
+
+} // namespace trace
+} // namespace sdk
+OPENTELEMETRY_END_NAMESPACE
diff --git a/sdk/src/trace/tracer.cc b/sdk/src/trace/tracer.cc
index 74e16bdfe0..07fd36d5b7 100644
--- a/sdk/src/trace/tracer.cc
+++ b/sdk/src/trace/tracer.cc
@@ -104,12 +104,22 @@ nostd::shared_ptr Tracer::StartSpan(
void Tracer::ForceFlushWithMicroseconds(uint64_t timeout) noexcept
{
- (void)timeout;
+ if (context_)
+ {
+ context_->ForceFlush(
+ std::chrono::microseconds{static_cast(timeout)});
+ }
}
void Tracer::CloseWithMicroseconds(uint64_t timeout) noexcept
{
- (void)timeout;
+ // Trace context is shared by many tracers.So we just call ForceFlush to flush all pending spans
+ // and do not shutdown it.
+ if (context_)
+ {
+ context_->ForceFlush(
+ std::chrono::microseconds{static_cast(timeout)});
+ }
}
} // namespace trace
} // namespace sdk
diff --git a/sdk/src/trace/tracer_context.cc b/sdk/src/trace/tracer_context.cc
index d204218021..0b89c5cefb 100644
--- a/sdk/src/trace/tracer_context.cc
+++ b/sdk/src/trace/tracer_context.cc
@@ -53,9 +53,9 @@ bool TracerContext::ForceFlush(std::chrono::microseconds timeout) noexcept
return processor_->ForceFlush(timeout);
}
-bool TracerContext::Shutdown() noexcept
+bool TracerContext::Shutdown(std::chrono::microseconds timeout) noexcept
{
- return processor_->Shutdown();
+ return processor_->Shutdown(timeout);
}
} // namespace trace
diff --git a/sdk/test/logs/batch_log_record_processor_test.cc b/sdk/test/logs/batch_log_record_processor_test.cc
index de3beb66fe..462e9b8397 100644
--- a/sdk/test/logs/batch_log_record_processor_test.cc
+++ b/sdk/test/logs/batch_log_record_processor_test.cc
@@ -70,10 +70,12 @@ class MockLogExporter final : public LogRecordExporter
{
public:
MockLogExporter(std::shared_ptr>> logs_received,
+ std::shared_ptr> force_flush_counter,
std::shared_ptr> is_shutdown,
std::shared_ptr> is_export_completed,
const std::chrono::milliseconds export_delay = std::chrono::milliseconds(0))
: logs_received_(logs_received),
+ force_flush_counter_(force_flush_counter),
is_shutdown_(is_shutdown),
is_export_completed_(is_export_completed),
export_delay_(export_delay)
@@ -109,6 +111,12 @@ class MockLogExporter final : public LogRecordExporter
return ExportResult::kSuccess;
}
+ bool ForceFlush(std::chrono::microseconds /* timeout */) noexcept override
+ {
+ ++(*force_flush_counter_);
+ return true;
+ }
+
// toggles the boolean flag marking this exporter as shut down
bool Shutdown(std::chrono::microseconds /* timeout */) noexcept override
{
@@ -118,6 +126,7 @@ class MockLogExporter final : public LogRecordExporter
private:
std::shared_ptr>> logs_received_;
+ std::shared_ptr> force_flush_counter_;
std::shared_ptr> is_shutdown_;
std::shared_ptr> is_export_completed_;
const std::chrono::milliseconds export_delay_;
@@ -134,6 +143,7 @@ class BatchLogRecordProcessorTest : public testing::Test // ::testing::Test
// is_shutdown flag, and the processor configuration options (default if unspecified)
std::shared_ptr GetMockProcessor(
std::shared_ptr>> logs_received,
+ std::shared_ptr> force_flush_counter,
std::shared_ptr> is_shutdown,
std::shared_ptr> is_export_completed =
std::shared_ptr>(new std::atomic(false)),
@@ -143,8 +153,8 @@ class BatchLogRecordProcessorTest : public testing::Test // ::testing::Test
const size_t max_export_batch_size = 512)
{
return std::shared_ptr(new BatchLogRecordProcessor(
- std::unique_ptr(
- new MockLogExporter(logs_received, is_shutdown, is_export_completed, export_delay)),
+ std::unique_ptr(new MockLogExporter(
+ logs_received, force_flush_counter, is_shutdown, is_export_completed, export_delay)),
max_queue_size, scheduled_delay_millis, max_export_batch_size));
}
};
@@ -154,9 +164,10 @@ TEST_F(BatchLogRecordProcessorTest, TestShutdown)
// initialize a batch log processor with the test exporter
std::shared_ptr>> logs_received(
new std::vector>);
+ std::shared_ptr> force_flush_counter(new std::atomic(0));
std::shared_ptr> is_shutdown(new std::atomic(false));
- auto batch_processor = GetMockProcessor(logs_received, is_shutdown);
+ auto batch_processor = GetMockProcessor(logs_received, force_flush_counter, is_shutdown);
// Create a few test log records and send them to the processor
const int num_logs = 3;
@@ -189,11 +200,12 @@ TEST_F(BatchLogRecordProcessorTest, TestShutdown)
TEST_F(BatchLogRecordProcessorTest, TestForceFlush)
{
+ std::shared_ptr> force_flush_counter(new std::atomic(0));
std::shared_ptr> is_shutdown(new std::atomic(false));
std::shared_ptr>> logs_received(
new std::vector>);
- auto batch_processor = GetMockProcessor(logs_received, is_shutdown);
+ auto batch_processor = GetMockProcessor(logs_received, force_flush_counter, is_shutdown);
const int num_logs = 2048;
for (int i = 0; i < num_logs; ++i)
@@ -204,6 +216,7 @@ TEST_F(BatchLogRecordProcessorTest, TestForceFlush)
}
EXPECT_TRUE(batch_processor->ForceFlush());
+ EXPECT_GT(force_flush_counter->load(), 0);
EXPECT_EQ(num_logs, logs_received->size());
for (int i = 0; i < num_logs; ++i)
@@ -219,7 +232,9 @@ TEST_F(BatchLogRecordProcessorTest, TestForceFlush)
batch_processor->OnEmit(std::move(log));
}
+ std::size_t force_flush_counter_before = force_flush_counter->load();
EXPECT_TRUE(batch_processor->ForceFlush());
+ EXPECT_GT(force_flush_counter->load(), force_flush_counter_before);
EXPECT_EQ(num_logs * 2, logs_received->size());
for (int i = 0; i < num_logs * 2; ++i)
@@ -232,13 +247,14 @@ TEST_F(BatchLogRecordProcessorTest, TestManyLogsLoss)
{
/* Test that when exporting more than max_queue_size logs, some are most likely lost*/
+ std::shared_ptr> force_flush_counter(new std::atomic(0));
std::shared_ptr> is_shutdown(new std::atomic(false));
std::shared_ptr>> logs_received(
new std::vector>);
const int max_queue_size = 4096;
- auto batch_processor = GetMockProcessor(logs_received, is_shutdown);
+ auto batch_processor = GetMockProcessor(logs_received, force_flush_counter, is_shutdown);
// Create max_queue_size log records
for (int i = 0; i < max_queue_size; ++i)
@@ -258,10 +274,11 @@ TEST_F(BatchLogRecordProcessorTest, TestManyLogsLossLess)
{
/* Test that no logs are lost when sending max_queue_size logs */
+ std::shared_ptr> force_flush_counter(new std::atomic(0));
std::shared_ptr> is_shutdown(new std::atomic(false));
std::shared_ptr>> logs_received(
new std::vector>);
- auto batch_processor = GetMockProcessor(logs_received, is_shutdown);
+ auto batch_processor = GetMockProcessor(logs_received, force_flush_counter, is_shutdown);
const int num_logs = 2048;
@@ -286,6 +303,7 @@ TEST_F(BatchLogRecordProcessorTest, TestScheduledDelayMillis)
/* Test that max_export_batch_size logs are exported every scheduled_delay_millis
seconds */
+ std::shared_ptr> force_flush_counter(new std::atomic(0));
std::shared_ptr> is_shutdown(new std::atomic(false));
std::shared_ptr> is_export_completed(new std::atomic(false));
std::shared_ptr>> logs_received(
@@ -295,8 +313,9 @@ TEST_F(BatchLogRecordProcessorTest, TestScheduledDelayMillis)
const std::chrono::milliseconds scheduled_delay_millis(2000);
const size_t max_export_batch_size = 512;
- auto batch_processor = GetMockProcessor(logs_received, is_shutdown, is_export_completed,
- export_delay, scheduled_delay_millis);
+ auto batch_processor =
+ GetMockProcessor(logs_received, force_flush_counter, is_shutdown, is_export_completed,
+ export_delay, scheduled_delay_millis);
for (std::size_t i = 0; i < max_export_batch_size; ++i)
{
diff --git a/sdk/test/logs/simple_log_record_processor_test.cc b/sdk/test/logs/simple_log_record_processor_test.cc
index 275991dc10..2d377cd924 100644
--- a/sdk/test/logs/simple_log_record_processor_test.cc
+++ b/sdk/test/logs/simple_log_record_processor_test.cc
@@ -69,10 +69,12 @@ class TestLogRecordable final : public opentelemetry::sdk::logs::Recordable
class TestExporter final : public LogRecordExporter
{
public:
- TestExporter(int *shutdown_counter,
+ TestExporter(int *force_flush_counter,
+ int *shutdown_counter,
std::shared_ptr>> logs_received,
size_t *batch_size_received)
- : shutdown_counter_(shutdown_counter),
+ : force_flush_counter_(force_flush_counter),
+ shutdown_counter_(shutdown_counter),
logs_received_(logs_received),
batch_size_received(batch_size_received)
{}
@@ -99,14 +101,27 @@ class TestExporter final : public LogRecordExporter
return ExportResult::kSuccess;
}
+ bool ForceFlush(std::chrono::microseconds /* timeout */) noexcept override
+ {
+ if (nullptr != force_flush_counter_)
+ {
+ ++(*force_flush_counter_);
+ }
+ return true;
+ }
+
// Increment the shutdown counter everytime this method is called
bool Shutdown(std::chrono::microseconds /* timeout */) noexcept override
{
- *shutdown_counter_ += 1;
+ if (nullptr != shutdown_counter_)
+ {
+ *shutdown_counter_ += 1;
+ }
return true;
}
private:
+ int *force_flush_counter_;
int *shutdown_counter_;
std::shared_ptr>> logs_received_;
size_t *batch_size_received;
@@ -122,7 +137,7 @@ TEST(SimpleLogRecordProcessorTest, SendReceivedLogsToExporter)
size_t batch_size_received = 0;
std::unique_ptr exporter(
- new TestExporter(nullptr, logs_received, &batch_size_received));
+ new TestExporter(nullptr, nullptr, logs_received, &batch_size_received));
SimpleLogRecordProcessor processor(std::move(exporter));
@@ -152,7 +167,8 @@ TEST(SimpleLogRecordProcessorTest, ShutdownCalledOnce)
// Create a TestExporter
int num_shutdowns = 0;
- std::unique_ptr exporter(new TestExporter(&num_shutdowns, nullptr, nullptr));
+ std::unique_ptr exporter(
+ new TestExporter(nullptr, &num_shutdowns, nullptr, nullptr));
// Create a processor with the previous test exporter
SimpleLogRecordProcessor processor(std::move(exporter));
@@ -167,11 +183,28 @@ TEST(SimpleLogRecordProcessorTest, ShutdownCalledOnce)
EXPECT_EQ(1, num_shutdowns);
}
+TEST(SimpleLogRecordProcessorTest, ForceFlush)
+{
+ // Create a TestExporter
+ int num_force_flush = 0;
+
+ std::unique_ptr exporter(
+ new TestExporter(&num_force_flush, nullptr, nullptr, nullptr));
+
+ // Create a processor with the previous test exporter
+ SimpleLogRecordProcessor processor(std::move(exporter));
+
+ // The first time processor shutdown is called
+ EXPECT_EQ(0, num_force_flush);
+ EXPECT_EQ(true, processor.ForceFlush());
+ EXPECT_EQ(1, num_force_flush);
+}
+
// A test exporter that always returns failure when shut down
-class FailShutDownExporter final : public LogRecordExporter
+class FailShutDownForceFlushExporter final : public LogRecordExporter
{
public:
- FailShutDownExporter() {}
+ FailShutDownForceFlushExporter() {}
std::unique_ptr MakeRecordable() noexcept override
{
@@ -184,16 +217,29 @@ class FailShutDownExporter final : public LogRecordExporter
return ExportResult::kSuccess;
}
+ bool ForceFlush(std::chrono::microseconds /* timeout */) noexcept override { return false; }
+
bool Shutdown(std::chrono::microseconds /* timeout */) noexcept override { return false; }
};
// Tests for when when processor should fail to shutdown
TEST(SimpleLogRecordProcessorTest, ShutDownFail)
{
- std::unique_ptr exporter(new FailShutDownExporter());
+ std::unique_ptr exporter(new FailShutDownForceFlushExporter());
SimpleLogRecordProcessor processor(std::move(exporter));
// Expect failure result when exporter fails to shutdown
EXPECT_EQ(false, processor.Shutdown());
}
+
+// Tests for when when processor should fail to force flush
+TEST(SimpleLogRecordProcessorTest, ForceFlushFail)
+{
+ std::unique_ptr exporter(new FailShutDownForceFlushExporter());
+ SimpleLogRecordProcessor processor(std::move(exporter));
+
+ // Expect failure result when exporter fails to force flush
+ EXPECT_EQ(false, processor.ForceFlush());
+}
+
#endif
diff --git a/sdk/test/metrics/periodic_exporting_metric_reader_test.cc b/sdk/test/metrics/periodic_exporting_metric_reader_test.cc
index cd789b5028..e115f79f75 100644
--- a/sdk/test/metrics/periodic_exporting_metric_reader_test.cc
+++ b/sdk/test/metrics/periodic_exporting_metric_reader_test.cc
@@ -70,6 +70,7 @@ TEST(PeriodicExporingMetricReader, BasicTests)
MockMetricProducer producer;
reader.SetMetricProducer(&producer);
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
+ EXPECT_NO_THROW(reader.ForceFlush());
reader.Shutdown();
EXPECT_EQ(static_cast(exporter_ptr)->GetDataCount(),
static_cast(&producer)->GetDataCount());
diff --git a/sdk/test/trace/batch_span_processor_test.cc b/sdk/test/trace/batch_span_processor_test.cc
index e9150d1ff1..b7a6558538 100644
--- a/sdk/test/trace/batch_span_processor_test.cc
+++ b/sdk/test/trace/batch_span_processor_test.cc
@@ -23,11 +23,13 @@ class MockSpanExporter final : public sdk::trace::SpanExporter
public:
MockSpanExporter(
std::shared_ptr>> spans_received,
+ std::shared_ptr> shut_down_counter,
std::shared_ptr> is_shutdown,
std::shared_ptr> is_export_completed =
std::shared_ptr>(new std::atomic(false)),
const std::chrono::milliseconds export_delay = std::chrono::milliseconds(0)) noexcept
: spans_received_(spans_received),
+ shut_down_counter_(shut_down_counter),
is_shutdown_(is_shutdown),
is_export_completed_(is_export_completed),
export_delay_(export_delay)
@@ -60,6 +62,12 @@ class MockSpanExporter final : public sdk::trace::SpanExporter
return sdk::common::ExportResult::kSuccess;
}
+ bool ForceFlush(std::chrono::microseconds /*timeout*/) noexcept override
+ {
+ ++(*shut_down_counter_);
+ return true;
+ }
+
bool Shutdown(std::chrono::microseconds /* timeout */) noexcept override
{
*is_shutdown_ = true;
@@ -70,6 +78,7 @@ class MockSpanExporter final : public sdk::trace::SpanExporter
private:
std::shared_ptr>> spans_received_;
+ std::shared_ptr> shut_down_counter_;
std::shared_ptr> is_shutdown_;
std::shared_ptr> is_export_completed_;
// Meant exclusively to test force flush timeout
@@ -104,14 +113,16 @@ class BatchSpanProcessorTestPeer : public testing::Test
TEST_F(BatchSpanProcessorTestPeer, TestShutdown)
{
+ std::shared_ptr> shut_down_counter(new std::atomic(0));
std::shared_ptr> is_shutdown(new std::atomic(false));
std::shared_ptr>> spans_received(
new std::vector>);
- auto batch_processor = std::shared_ptr(
- new sdk::trace::BatchSpanProcessor(std::unique_ptr(
- new MockSpanExporter(spans_received, is_shutdown)),
- sdk::trace::BatchSpanProcessorOptions()));
+ auto batch_processor =
+ std::shared_ptr(new sdk::trace::BatchSpanProcessor(
+ std::unique_ptr(
+ new MockSpanExporter(spans_received, shut_down_counter, is_shutdown)),
+ sdk::trace::BatchSpanProcessorOptions()));
const int num_spans = 3;
auto test_spans = GetTestSpans(batch_processor, num_spans);
@@ -136,14 +147,16 @@ TEST_F(BatchSpanProcessorTestPeer, TestShutdown)
TEST_F(BatchSpanProcessorTestPeer, TestForceFlush)
{
+ std::shared_ptr> shut_down_counter(new std::atomic(0));
std::shared_ptr> is_shutdown(new std::atomic(false));
std::shared_ptr>> spans_received(
new std::vector>);
- auto batch_processor = std::shared_ptr(
- new sdk::trace::BatchSpanProcessor(std::unique_ptr(
- new MockSpanExporter(spans_received, is_shutdown)),
- sdk::trace::BatchSpanProcessorOptions()));
+ auto batch_processor =
+ std::shared_ptr(new sdk::trace::BatchSpanProcessor(
+ std::unique_ptr(
+ new MockSpanExporter(spans_received, shut_down_counter, is_shutdown)),
+ sdk::trace::BatchSpanProcessorOptions()));
const int num_spans = 2048;
auto test_spans = GetTestSpans(batch_processor, num_spans);
@@ -157,6 +170,7 @@ TEST_F(BatchSpanProcessorTestPeer, TestForceFlush)
std::this_thread::sleep_for(std::chrono::milliseconds(50));
EXPECT_TRUE(batch_processor->ForceFlush());
+ EXPECT_GE(shut_down_counter->load(), 1);
EXPECT_EQ(num_spans, spans_received->size());
for (int i = 0; i < num_spans; ++i)
@@ -174,7 +188,9 @@ TEST_F(BatchSpanProcessorTestPeer, TestForceFlush)
// Give some time to export the spans
std::this_thread::sleep_for(std::chrono::milliseconds(50));
+ auto shut_down_counter_before = shut_down_counter->load();
EXPECT_TRUE(batch_processor->ForceFlush());
+ EXPECT_GT(shut_down_counter->load(), shut_down_counter_before);
EXPECT_EQ(num_spans * 2, spans_received->size());
for (int i = 0; i < num_spans; ++i)
@@ -209,16 +225,18 @@ TEST_F(BatchSpanProcessorTestPeer, TestManySpansLoss)
auto log_handler = nostd::shared_ptr(new MockLogHandler());
sdk::common::internal_log::GlobalLogHandler::SetLogHandler(log_handler);
+ std::shared_ptr> shut_down_counter(new std::atomic(0));
std::shared_ptr> is_shutdown(new std::atomic(false));
std::shared_ptr>> spans_received(
new std::vector>);
const int max_queue_size = 4096;
- auto batch_processor = std::shared_ptr(
- new sdk::trace::BatchSpanProcessor(std::unique_ptr(
- new MockSpanExporter(spans_received, is_shutdown)),
- sdk::trace::BatchSpanProcessorOptions()));
+ auto batch_processor =
+ std::shared_ptr(new sdk::trace::BatchSpanProcessor(
+ std::unique_ptr(
+ new MockSpanExporter(spans_received, shut_down_counter, is_shutdown)),
+ sdk::trace::BatchSpanProcessorOptions()));
auto test_spans = GetTestSpans(batch_processor, max_queue_size);
@@ -259,16 +277,18 @@ TEST_F(BatchSpanProcessorTestPeer, TestManySpansLossLess)
{
/* Test that no spans are lost when sending max_queue_size spans */
+ std::shared_ptr> shut_down_counter(new std::atomic(0));
std::shared_ptr> is_shutdown(new std::atomic(false));
std::shared_ptr>> spans_received(
new std::vector>);
const int num_spans = 2048;
- auto batch_processor = std::shared_ptr(
- new sdk::trace::BatchSpanProcessor(std::unique_ptr(
- new MockSpanExporter(spans_received, is_shutdown)),
- sdk::trace::BatchSpanProcessorOptions()));
+ auto batch_processor =
+ std::shared_ptr(new sdk::trace::BatchSpanProcessor(
+ std::unique_ptr(
+ new MockSpanExporter(spans_received, shut_down_counter, is_shutdown)),
+ sdk::trace::BatchSpanProcessorOptions()));
auto test_spans = GetTestSpans(batch_processor, num_spans);
@@ -294,6 +314,7 @@ TEST_F(BatchSpanProcessorTestPeer, TestScheduleDelayMillis)
/* Test that max_export_batch_size spans are exported every schedule_delay_millis
seconds */
+ std::shared_ptr> shut_down_counter(new std::atomic(0));
std::shared_ptr> is_shutdown(new std::atomic(false));
std::shared_ptr> is_export_completed(new std::atomic(false));
std::shared_ptr>> spans_received(
@@ -305,8 +326,8 @@ TEST_F(BatchSpanProcessorTestPeer, TestScheduleDelayMillis)
auto batch_processor =
std::shared_ptr(new sdk::trace::BatchSpanProcessor(
- std::unique_ptr(
- new MockSpanExporter(spans_received, is_shutdown, is_export_completed, export_delay)),
+ std::unique_ptr(new MockSpanExporter(
+ spans_received, shut_down_counter, is_shutdown, is_export_completed, export_delay)),
options));
auto test_spans = GetTestSpans(batch_processor, max_export_batch_size);
diff --git a/sdk/test/trace/simple_processor_test.cc b/sdk/test/trace/simple_processor_test.cc
index f32485e7d0..7653bb2cfc 100644
--- a/sdk/test/trace/simple_processor_test.cc
+++ b/sdk/test/trace/simple_processor_test.cc
@@ -38,7 +38,9 @@ TEST(SimpleProcessor, ToInMemorySpanExporter)
class RecordShutdownExporter final : public SpanExporter
{
public:
- RecordShutdownExporter(int *shutdown_counter) : shutdown_counter_(shutdown_counter) {}
+ RecordShutdownExporter(int *force_flush_counter, int *shutdown_counter)
+ : force_flush_counter_(force_flush_counter), shutdown_counter_(shutdown_counter)
+ {}
std::unique_ptr MakeRecordable() noexcept override
{
@@ -51,6 +53,12 @@ class RecordShutdownExporter final : public SpanExporter
return ExportResult::kSuccess;
}
+ bool ForceFlush(std::chrono::microseconds /* timeout */) noexcept override
+ {
+ *force_flush_counter_ += 1;
+ return true;
+ }
+
bool Shutdown(std::chrono::microseconds /* timeout */) noexcept override
{
*shutdown_counter_ += 1;
@@ -58,17 +66,70 @@ class RecordShutdownExporter final : public SpanExporter
}
private:
+ int *force_flush_counter_;
int *shutdown_counter_;
};
TEST(SimpleSpanProcessor, ShutdownCalledOnce)
{
+ int force_flush = 0;
int shutdowns = 0;
- RecordShutdownExporter *exporter = new RecordShutdownExporter(&shutdowns);
+ RecordShutdownExporter *exporter = new RecordShutdownExporter(&force_flush, &shutdowns);
SimpleSpanProcessor processor(std::unique_ptr{exporter});
EXPECT_EQ(0, shutdowns);
processor.Shutdown();
EXPECT_EQ(1, shutdowns);
processor.Shutdown();
EXPECT_EQ(1, shutdowns);
+
+ EXPECT_EQ(0, force_flush);
+}
+
+TEST(SimpleSpanProcessor, ForceFlush)
+{
+ int force_flush = 0;
+ int shutdowns = 0;
+ RecordShutdownExporter *exporter = new RecordShutdownExporter(&force_flush, &shutdowns);
+ SimpleSpanProcessor processor(std::unique_ptr{exporter});
+ processor.ForceFlush();
+ EXPECT_EQ(0, shutdowns);
+ EXPECT_EQ(1, force_flush);
+ processor.ForceFlush();
+ EXPECT_EQ(2, force_flush);
+}
+
+// An exporter that does nothing but record (and give back ) the # of times Shutdown was called.
+class FailShutDownForceFlushExporter final : public SpanExporter
+{
+public:
+ FailShutDownForceFlushExporter() {}
+
+ std::unique_ptr MakeRecordable() noexcept override
+ {
+ return std::unique_ptr(new SpanData());
+ }
+
+ ExportResult Export(const opentelemetry::nostd::span>
+ & /* recordables */) noexcept override
+ {
+ return ExportResult::kSuccess;
+ }
+
+ bool ForceFlush(std::chrono::microseconds /* timeout */) noexcept override { return false; }
+
+ bool Shutdown(std::chrono::microseconds /* timeout */) noexcept override { return false; }
+};
+
+TEST(SimpleSpanProcessor, ShutdownFail)
+{
+ SimpleSpanProcessor processor(
+ std::unique_ptr{new FailShutDownForceFlushExporter()});
+ EXPECT_EQ(false, processor.Shutdown());
+}
+
+TEST(SimpleSpanProcessor, ForceFlushFail)
+{
+ SimpleSpanProcessor processor(
+ std::unique_ptr{new FailShutDownForceFlushExporter()});
+ EXPECT_EQ(false, processor.ForceFlush());
}