From 2f56859b15d659140cddd94417d25bfb2eb2342b Mon Sep 17 00:00:00 2001 From: Josh Suereth Date: Wed, 24 Mar 2021 17:00:26 -0400 Subject: [PATCH 1/3] Detangle z-pages exporter from z-pages webserver via shared object between. --- .../ext/zpages/tracez_data_aggregator.h | 10 +-- .../ext/zpages/tracez_processor.h | 24 +----- .../ext/zpages/tracez_shared_data.h | 64 +++++++++++++++ ext/include/opentelemetry/ext/zpages/zpages.h | 7 +- ext/src/zpages/CMakeLists.txt | 2 + ext/src/zpages/tracez_data_aggregator.cc | 6 +- ext/src/zpages/tracez_processor.cc | 25 +----- ext/src/zpages/tracez_shared_data.cc | 39 ++++++++++ .../zpages/tracez_data_aggregator_test.cc | 5 +- ext/test/zpages/tracez_processor_test.cc | 78 ++++++++++--------- 10 files changed, 167 insertions(+), 93 deletions(-) create mode 100644 ext/include/opentelemetry/ext/zpages/tracez_shared_data.h create mode 100644 ext/src/zpages/tracez_shared_data.cc diff --git a/ext/include/opentelemetry/ext/zpages/tracez_data_aggregator.h b/ext/include/opentelemetry/ext/zpages/tracez_data_aggregator.h index 6d2d0a054e..158c89a4d6 100644 --- a/ext/include/opentelemetry/ext/zpages/tracez_data_aggregator.h +++ b/ext/include/opentelemetry/ext/zpages/tracez_data_aggregator.h @@ -12,7 +12,7 @@ #include "opentelemetry/ext/zpages/latency_boundaries.h" #include "opentelemetry/ext/zpages/tracez_data.h" -#include "opentelemetry/ext/zpages/tracez_processor.h" +#include "opentelemetry/ext/zpages/tracez_shared_data.h" #include "opentelemetry/nostd/span.h" #include "opentelemetry/nostd/string_view.h" #include "opentelemetry/sdk/trace/span_data.h" @@ -46,10 +46,10 @@ class TracezDataAggregator /** * Constructor creates a thread that calls a function to aggregate span data * at regular intervals. - * @param span_processor is the tracez span processor to be set + * @param shared_data is the shared set of spans to expose. * @param update_interval the time duration for updating the aggregated data. */ - TracezDataAggregator(std::shared_ptr span_processor, + TracezDataAggregator(std::shared_ptr shared_data, milliseconds update_interval = milliseconds(10)); /** Ends the thread set up in the constructor and destroys the object **/ @@ -135,8 +135,8 @@ class TracezDataAggregator void InsertIntoSampleSpanList(std::list &sample_spans, ThreadsafeSpanData &span_data); - /** Instance of span processor used to collect raw data **/ - std::shared_ptr tracez_span_processor_; + /** Instance of shared spans used to collect raw data **/ + std::shared_ptr tracez_shared_data_; /** * Tree map with key being the name of the span and value being a unique ptr diff --git a/ext/include/opentelemetry/ext/zpages/tracez_processor.h b/ext/include/opentelemetry/ext/zpages/tracez_processor.h index d0c8692a8c..cb3eda09d7 100644 --- a/ext/include/opentelemetry/ext/zpages/tracez_processor.h +++ b/ext/include/opentelemetry/ext/zpages/tracez_processor.h @@ -8,6 +8,7 @@ #include #include "opentelemetry/ext/zpages/threadsafe_span_data.h" +#include "opentelemetry/ext/zpages/tracez_shared_data.h" #include "opentelemetry/sdk/trace/processor.h" #include "opentelemetry/sdk/trace/recordable.h" @@ -23,16 +24,11 @@ namespace zpages class TracezSpanProcessor : public opentelemetry::sdk::trace::SpanProcessor { public: - struct CollectedSpans - { - std::unordered_set running; - std::vector> completed; - }; - /* * Initialize a span processor. */ - explicit TracezSpanProcessor() noexcept {} + explicit TracezSpanProcessor(std::shared_ptr shared_data) noexcept + : shared_data_(shared_data) {} /* * Create a span recordable, which is span_data @@ -58,17 +54,6 @@ class TracezSpanProcessor : public opentelemetry::sdk::trace::SpanProcessor */ void OnEnd(std::unique_ptr &&span) noexcept override; - /* - * Returns a snapshot of all spans stored. This snapshot has a copy of the - * stored running_spans and gives ownership of completed spans to the caller. - * Stored completed_spans are cleared from the processor. Currently, - * copy-on-write is utilized where possible to minimize contention, but locks - * may be added in the future. - * @return snapshot of all currently running spans and newly completed spans - * (spans never sent while complete) at the time that the function is called - */ - CollectedSpans GetSpanSnapshot() noexcept; - /* * For now, does nothing. In the future, it * may send all ended spans that have not yet been sent to the aggregator. @@ -96,8 +81,7 @@ class TracezSpanProcessor : public opentelemetry::sdk::trace::SpanProcessor } private: - mutable std::mutex mtx_; - CollectedSpans spans_; + std::shared_ptr shared_data_; }; } // namespace zpages } // namespace ext diff --git a/ext/include/opentelemetry/ext/zpages/tracez_shared_data.h b/ext/include/opentelemetry/ext/zpages/tracez_shared_data.h new file mode 100644 index 0000000000..566698c94d --- /dev/null +++ b/ext/include/opentelemetry/ext/zpages/tracez_shared_data.h @@ -0,0 +1,64 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "opentelemetry/ext/zpages/threadsafe_span_data.h" +#include "opentelemetry/sdk/trace/processor.h" +#include "opentelemetry/sdk/trace/recordable.h" + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace ext +{ +namespace zpages +{ +/* + * The span processor passes and stores running and completed recordables (casted as span_data) + * to be used by the TraceZ Data Aggregator. + */ +class TracezSharedData +{ +public: + struct CollectedSpans + { + std::unordered_set running; + std::vector> completed; + }; + + /* + * Initialize a shared data storage. + */ + explicit TracezSharedData() noexcept {} + + /* + * Called when a span has been started. + */ + void OnStart(ThreadsafeSpanData *span) noexcept; + + /* + * Called when a span has ended. + */ + void OnEnd(std::unique_ptr &&span) noexcept; + + /* + * Returns a snapshot of all spans stored. This snapshot has a copy of the + * stored running_spans and gives ownership of completed spans to the caller. + * Stored completed_spans are cleared from the processor. Currently, + * copy-on-write is utilized where possible to minimize contention, but locks + * may be added in the future. + * @return snapshot of all currently running spans and newly completed spans + * (spans never sent while complete) at the time that the function is called + */ + CollectedSpans GetSpanSnapshot() noexcept; + +private: + mutable std::mutex mtx_; + CollectedSpans spans_; +}; +} // namespace zpages +} // namespace ext +OPENTELEMETRY_END_NAMESPACE diff --git a/ext/include/opentelemetry/ext/zpages/zpages.h b/ext/include/opentelemetry/ext/zpages/zpages.h index 9ef7ea5412..f8e2b13c5e 100644 --- a/ext/include/opentelemetry/ext/zpages/zpages.h +++ b/ext/include/opentelemetry/ext/zpages/zpages.h @@ -6,6 +6,7 @@ #include "opentelemetry/ext/zpages/tracez_data_aggregator.h" #include "opentelemetry/ext/zpages/tracez_http_server.h" #include "opentelemetry/ext/zpages/tracez_processor.h" +#include "opentelemetry/ext/zpages/tracez_shared_data.h" #include "opentelemetry/nostd/shared_ptr.h" #include "opentelemetry/sdk/trace/tracer_provider.h" @@ -14,6 +15,7 @@ using opentelemetry::ext::zpages::TracezDataAggregator; using opentelemetry::ext::zpages::TracezHttpServer; using opentelemetry::ext::zpages::TracezSpanProcessor; +using opentelemetry::ext::zpages::TracezSharedData; using std::chrono::microseconds; /** @@ -40,12 +42,13 @@ class ZPages */ ZPages() { - auto tracez_processor_ = std::make_shared(); + auto tracez_shared_data = std::make_shared(); + auto tracez_processor_ = std::make_shared(tracez_shared_data); auto tracez_provider_ = opentelemetry::nostd::shared_ptr( new opentelemetry::sdk::trace::TracerProvider(tracez_processor_)); auto tracez_aggregator = - std::unique_ptr(new TracezDataAggregator(tracez_processor_)); + std::unique_ptr(new TracezDataAggregator(tracez_shared_data)); tracez_server_ = std::unique_ptr(new TracezHttpServer(std::move(tracez_aggregator))); diff --git a/ext/src/zpages/CMakeLists.txt b/ext/src/zpages/CMakeLists.txt index 5440ea3fa6..64d9c73728 100644 --- a/ext/src/zpages/CMakeLists.txt +++ b/ext/src/zpages/CMakeLists.txt @@ -1,7 +1,9 @@ add_library( opentelemetry_zpages tracez_processor.cc + tracez_shared_data.cc tracez_data_aggregator.cc + ../../include/opentelemetry/ext/zpages/tracez_shared_data.h ../../include/opentelemetry/ext/zpages/tracez_processor.h ../../include/opentelemetry/ext/zpages/tracez_data_aggregator.h ../../include/opentelemetry/ext/zpages/tracez_http_server.h) diff --git a/ext/src/zpages/tracez_data_aggregator.cc b/ext/src/zpages/tracez_data_aggregator.cc index c47db41d92..ca1b32c577 100644 --- a/ext/src/zpages/tracez_data_aggregator.cc +++ b/ext/src/zpages/tracez_data_aggregator.cc @@ -6,10 +6,10 @@ namespace ext namespace zpages { -TracezDataAggregator::TracezDataAggregator(std::shared_ptr span_processor, +TracezDataAggregator::TracezDataAggregator(std::shared_ptr shared_data, milliseconds update_interval) { - tracez_span_processor_ = span_processor; + tracez_shared_data_ = shared_data; // Start a thread that calls AggregateSpans periodically or till notified. execute_.store(true, std::memory_order_release); @@ -153,7 +153,7 @@ void TracezDataAggregator::AggregateRunningSpans( void TracezDataAggregator::AggregateSpans() { - auto span_snapshot = tracez_span_processor_->GetSpanSnapshot(); + auto span_snapshot = tracez_shared_data_->GetSpanSnapshot(); /** * TODO: At this time in the project, there is no way of uniquely identifying * a span(their id's are not being set yet). diff --git a/ext/src/zpages/tracez_processor.cc b/ext/src/zpages/tracez_processor.cc index 1f88884a5f..fdf63a029c 100644 --- a/ext/src/zpages/tracez_processor.cc +++ b/ext/src/zpages/tracez_processor.cc @@ -9,34 +9,13 @@ namespace zpages void TracezSpanProcessor::OnStart(opentelemetry::sdk::trace::Recordable &span, const opentelemetry::trace::SpanContext &parent_context) noexcept { - std::lock_guard lock(mtx_); - spans_.running.insert(static_cast(&span)); + shared_data_->OnStart(static_cast(&span)); } void TracezSpanProcessor::OnEnd( std::unique_ptr &&span) noexcept { - if (span == nullptr) - return; - auto span_raw = static_cast(span.get()); - std::lock_guard lock(mtx_); - auto span_it = spans_.running.find(span_raw); - if (span_it != spans_.running.end()) - { - spans_.running.erase(span_it); - spans_.completed.push_back( - std::unique_ptr(static_cast(span.release()))); - } -} - -TracezSpanProcessor::CollectedSpans TracezSpanProcessor::GetSpanSnapshot() noexcept -{ - CollectedSpans snapshot; - std::lock_guard lock(mtx_); - snapshot.running = spans_.running; - snapshot.completed = std::move(spans_.completed); - spans_.completed.clear(); - return snapshot; + shared_data_->OnEnd(std::unique_ptr(static_cast(span.release()))); } } // namespace zpages diff --git a/ext/src/zpages/tracez_shared_data.cc b/ext/src/zpages/tracez_shared_data.cc new file mode 100644 index 0000000000..e248cebac2 --- /dev/null +++ b/ext/src/zpages/tracez_shared_data.cc @@ -0,0 +1,39 @@ +#include "opentelemetry/ext/zpages/tracez_shared_data.h" + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace ext +{ +namespace zpages +{ + +void TracezSharedData::OnStart(ThreadsafeSpanData *span) noexcept +{ + std::lock_guard lock(mtx_); + spans_.running.insert(span); +} + +void TracezSharedData::OnEnd(std::unique_ptr &&span) noexcept +{ + std::lock_guard lock(mtx_); + auto span_it = spans_.running.find(span.get()); + if (span_it != spans_.running.end()) + { + spans_.running.erase(span_it); + spans_.completed.push_back(std::unique_ptr(span.release())); + } +} + +TracezSharedData::CollectedSpans TracezSharedData::GetSpanSnapshot() noexcept +{ + CollectedSpans snapshot; + std::lock_guard lock(mtx_); + snapshot.running = spans_.running; + snapshot.completed = std::move(spans_.completed); + spans_.completed.clear(); + return snapshot; +} + + +} // namespace zpages +} // namespace ext +OPENTELEMETRY_END_NAMESPACE diff --git a/ext/test/zpages/tracez_data_aggregator_test.cc b/ext/test/zpages/tracez_data_aggregator_test.cc index eb077d636a..82af11a274 100644 --- a/ext/test/zpages/tracez_data_aggregator_test.cc +++ b/ext/test/zpages/tracez_data_aggregator_test.cc @@ -34,11 +34,12 @@ class TracezDataAggregatorTest : public ::testing::Test protected: void SetUp() override { - std::shared_ptr processor(new TracezSpanProcessor()); + std::shared_ptr shared_data(new TracezSharedData()); + std::shared_ptr processor(new TracezSpanProcessor(shared_data)); auto resource = opentelemetry::sdk::resource::Resource::Create({}); tracer = std::shared_ptr(new Tracer(processor, resource)); tracez_data_aggregator = std::unique_ptr( - new TracezDataAggregator(processor, milliseconds(10))); + new TracezDataAggregator(shared_data, milliseconds(10))); } std::unique_ptr tracez_data_aggregator; diff --git a/ext/test/zpages/tracez_processor_test.cc b/ext/test/zpages/tracez_processor_test.cc index 2d9246495a..d16c2f2a96 100644 --- a/ext/test/zpages/tracez_processor_test.cc +++ b/ext/test/zpages/tracez_processor_test.cc @@ -18,12 +18,12 @@ using namespace opentelemetry::ext::zpages; * Helper function uses the current processor to update spans contained in completed_spans * and running_spans. completed_spans contains all spans (cumulative), unless marked otherwise */ -void UpdateSpans(std::shared_ptr &processor, +void UpdateSpans(std::shared_ptr &data, std::vector> &completed, std::unordered_set &running, bool store_only_new_completed = false) { - auto spans = processor->GetSpanSnapshot(); + auto spans = data->GetSpanSnapshot(); running = spans.running; if (store_only_new_completed) { @@ -136,10 +136,10 @@ bool ContainsNames(const std::vector &names, * Helper function calls GetSpanSnapshot() i times and does nothing with it * otherwise. Used for testing thread safety */ -void GetManySnapshots(std::shared_ptr &processor, int i) +void GetManySnapshots(std::shared_ptr &data, int i) { for (; i > 0; i--) - processor->GetSpanSnapshot(); + data->GetSpanSnapshot(); } /* @@ -175,17 +175,19 @@ class TracezProcessor : public ::testing::Test protected: void SetUp() override { - processor = std::shared_ptr(new TracezSpanProcessor()); + shared_data = std::shared_ptr(new TracezSharedData()); + processor = std::shared_ptr(new TracezSpanProcessor(shared_data)); auto resource = opentelemetry::sdk::resource::Resource::Create({}); tracer = std::shared_ptr(new Tracer(processor, resource)); - auto spans = processor->GetSpanSnapshot(); + auto spans = shared_data->GetSpanSnapshot(); running = spans.running; completed = std::move(spans.completed); span_names = {"s0", "s2", "s1", "s1", "s"}; } + std::shared_ptr shared_data; std::shared_ptr processor; std::shared_ptr tracer; @@ -218,14 +220,14 @@ TEST_F(TracezProcessor, NoSpans) TEST_F(TracezProcessor, OneSpanCumulative) { auto span = tracer->StartSpan(span_names[0]); - UpdateSpans(processor, completed, running); + UpdateSpans(shared_data, completed, running); EXPECT_TRUE(ContainsNames(span_names, running, 0, 1, true)); EXPECT_EQ(running.size(), 1); EXPECT_EQ(completed.size(), 0); span->End(); - UpdateSpans(processor, completed, running); + UpdateSpans(shared_data, completed, running); EXPECT_TRUE(ContainsNames(span_names, completed, 0, 1, true)); EXPECT_EQ(running.size(), 0); @@ -246,7 +248,7 @@ TEST_F(TracezProcessor, MultipleSpansCumulative) // Start and store spans using span_names for (const auto &name : span_names) span_vars.push_back(tracer->StartSpan(name)); - UpdateSpans(processor, completed, running); + UpdateSpans(shared_data, completed, running); EXPECT_TRUE(ContainsNames(span_names, running)); // s0 s2 s1 s1 s EXPECT_EQ(running.size(), span_names.size()); @@ -255,7 +257,7 @@ TEST_F(TracezProcessor, MultipleSpansCumulative) // End all spans for (auto &span : span_vars) span->End(); - UpdateSpans(processor, completed, running); + UpdateSpans(shared_data, completed, running); EXPECT_TRUE(ContainsNames(span_names, completed)); // s0 s2 s1 s1 s EXPECT_EQ(running.size(), 0); @@ -272,7 +274,7 @@ TEST_F(TracezProcessor, MultipleSpansMiddleSplitCumulative) { for (const auto &name : span_names) span_vars.push_back(tracer->StartSpan(name)); - UpdateSpans(processor, completed, running); + UpdateSpans(shared_data, completed, running); EXPECT_TRUE(ContainsNames(span_names, running)); // s0 s2 s1 s1 s EXPECT_EQ(running.size(), span_names.size()); @@ -280,7 +282,7 @@ TEST_F(TracezProcessor, MultipleSpansMiddleSplitCumulative) // End 4th span span_vars[3]->End(); - UpdateSpans(processor, completed, running); + UpdateSpans(shared_data, completed, running); EXPECT_TRUE(ContainsNames(span_names, running, 0, 3)); // s0 s2 s1 EXPECT_TRUE(ContainsNames(span_names, running, 4)); // + s @@ -290,7 +292,7 @@ TEST_F(TracezProcessor, MultipleSpansMiddleSplitCumulative) // End 2nd span span_vars[1]->End(); - UpdateSpans(processor, completed, running); + UpdateSpans(shared_data, completed, running); EXPECT_TRUE(ContainsNames(span_names, running, 0, 1)); // s0 EXPECT_TRUE(ContainsNames(span_names, running, 2, 3)); // + s1 @@ -302,7 +304,7 @@ TEST_F(TracezProcessor, MultipleSpansMiddleSplitCumulative) // End 3rd span (last middle span) span_vars[2]->End(); - UpdateSpans(processor, completed, running); + UpdateSpans(shared_data, completed, running); EXPECT_TRUE(ContainsNames(span_names, running, 0, 1)); // s0 EXPECT_TRUE(ContainsNames(span_names, running, 4)); // + s @@ -313,7 +315,7 @@ TEST_F(TracezProcessor, MultipleSpansMiddleSplitCumulative) // End remaining Spans span_vars[0]->End(); span_vars[4]->End(); - UpdateSpans(processor, completed, running); + UpdateSpans(shared_data, completed, running); EXPECT_TRUE(ContainsNames(span_names, completed)); // s0 s2 s1 s1 s EXPECT_EQ(running.size(), 0); @@ -333,7 +335,7 @@ TEST_F(TracezProcessor, MultipleSpansOuterSplitCumulative) // End last span span_vars[4]->End(); - UpdateSpans(processor, completed, running); + UpdateSpans(shared_data, completed, running); EXPECT_TRUE(ContainsNames(span_names, running, 0, 4)); // s0 s2 s1 s1 EXPECT_TRUE(ContainsNames(span_names, completed, 4)); // s @@ -342,7 +344,7 @@ TEST_F(TracezProcessor, MultipleSpansOuterSplitCumulative) // End first span span_vars[0]->End(); - UpdateSpans(processor, completed, running); + UpdateSpans(shared_data, completed, running); EXPECT_TRUE(ContainsNames(span_names, running, 1, 4)); // s2 s1 s1 EXPECT_TRUE(ContainsNames(span_names, completed, 0, 1)); // s0 @@ -353,7 +355,7 @@ TEST_F(TracezProcessor, MultipleSpansOuterSplitCumulative) // End remaining Spans for (int i = 1; i < 4; i++) span_vars[i]->End(); - UpdateSpans(processor, completed, running); + UpdateSpans(shared_data, completed, running); EXPECT_TRUE(ContainsNames(span_names, completed)); // s0 s2 s1 s1 s EXPECT_EQ(running.size(), 0); @@ -368,20 +370,20 @@ TEST_F(TracezProcessor, MultipleSpansOuterSplitCumulative) TEST_F(TracezProcessor, OneSpanNewOnly) { auto span = tracer->StartSpan(span_names[0]); - UpdateSpans(processor, completed, running, true); + UpdateSpans(shared_data, completed, running, true); EXPECT_TRUE(ContainsNames(span_names, running, 0, 1, true)); EXPECT_EQ(running.size(), 1); EXPECT_EQ(completed.size(), 0); span->End(); - UpdateSpans(processor, completed, running, true); + UpdateSpans(shared_data, completed, running, true); EXPECT_TRUE(ContainsNames(span_names, completed, 0, 1, true)); EXPECT_EQ(running.size(), 0); EXPECT_EQ(completed.size(), 1); - UpdateSpans(processor, completed, running, true); + UpdateSpans(shared_data, completed, running, true); EXPECT_EQ(running.size(), 0); EXPECT_EQ(completed.size(), 0); @@ -397,7 +399,7 @@ TEST_F(TracezProcessor, MultipleSpansMiddleSplitNewOnly) { for (const auto &name : span_names) span_vars.push_back(tracer->StartSpan(name)); - UpdateSpans(processor, completed, running); + UpdateSpans(shared_data, completed, running); EXPECT_TRUE(ContainsNames(span_names, running, 0, 5, true)); // s0 s2 s1 s1 s EXPECT_EQ(running.size(), span_names.size()); @@ -405,7 +407,7 @@ TEST_F(TracezProcessor, MultipleSpansMiddleSplitNewOnly) // End 4th span span_vars[3]->End(); - UpdateSpans(processor, completed, running, true); + UpdateSpans(shared_data, completed, running, true); EXPECT_TRUE(ContainsNames(span_names, running, 0, 3)); // s0 s2 s1 EXPECT_TRUE(ContainsNames(span_names, running, 4)); // + s @@ -416,7 +418,7 @@ TEST_F(TracezProcessor, MultipleSpansMiddleSplitNewOnly) // End 2nd and 3rd span span_vars[1]->End(); span_vars[2]->End(); - UpdateSpans(processor, completed, running, true); + UpdateSpans(shared_data, completed, running, true); EXPECT_TRUE(ContainsNames(span_names, running, 0, 1)); // s0 EXPECT_TRUE(ContainsNames(span_names, running, 4)); // + s @@ -427,14 +429,14 @@ TEST_F(TracezProcessor, MultipleSpansMiddleSplitNewOnly) // End remaining Spans span_vars[0]->End(); span_vars[4]->End(); - UpdateSpans(processor, completed, running, true); + UpdateSpans(shared_data, completed, running, true); EXPECT_TRUE(ContainsNames(span_names, completed, 0, 1)); // s0 EXPECT_TRUE(ContainsNames(span_names, completed, 4)); // s EXPECT_EQ(running.size(), 0); EXPECT_EQ(completed.size(), 2); - UpdateSpans(processor, completed, running, true); + UpdateSpans(shared_data, completed, running, true); EXPECT_EQ(running.size(), 0); EXPECT_EQ(completed.size(), 0); @@ -453,7 +455,7 @@ TEST_F(TracezProcessor, MultipleSpansOuterSplitNewOnly) // End last span span_vars[4]->End(); - UpdateSpans(processor, completed, running, true); + UpdateSpans(shared_data, completed, running, true); EXPECT_TRUE(ContainsNames(span_names, running, 0, 4, true)); // s0 s2 s1 s1 EXPECT_TRUE(ContainsNames(span_names, completed, 4, 5, true)); // s @@ -462,7 +464,7 @@ TEST_F(TracezProcessor, MultipleSpansOuterSplitNewOnly) // End first span span_vars[0]->End(); - UpdateSpans(processor, completed, running, true); + UpdateSpans(shared_data, completed, running, true); EXPECT_TRUE(ContainsNames(span_names, running, 1, 4, true)); // s2 s1 s1 EXPECT_TRUE(ContainsNames(span_names, completed, 0, 1, true)); // s0 @@ -472,13 +474,13 @@ TEST_F(TracezProcessor, MultipleSpansOuterSplitNewOnly) // End remaining middle spans for (int i = 1; i < 4; i++) span_vars[i]->End(); - UpdateSpans(processor, completed, running, true); + UpdateSpans(shared_data, completed, running, true); EXPECT_TRUE(ContainsNames(span_names, completed, 1, 4, true)); // s2 s1 s1 EXPECT_EQ(running.size(), 0); EXPECT_EQ(completed.size(), 3); - UpdateSpans(processor, completed, running, true); + UpdateSpans(shared_data, completed, running, true); EXPECT_EQ(running.size(), 0); EXPECT_EQ(completed.size(), 0); @@ -495,7 +497,7 @@ TEST_F(TracezProcessor, FlushShutdown) EXPECT_TRUE(processor->ForceFlush()); EXPECT_TRUE(processor->Shutdown()); - UpdateSpans(processor, completed, running); + UpdateSpans(shared_data, completed, running); EXPECT_EQ(pre_running_sz, running.size()); EXPECT_EQ(pre_completed_sz, completed.size()); @@ -544,16 +546,16 @@ TEST_F(TracezProcessor, SnapshotThreadSafety) { std::vector> spans; - std::thread snap1(GetManySnapshots, std::ref(processor), 500); - std::thread snap2(GetManySnapshots, std::ref(processor), 500); + std::thread snap1(GetManySnapshots, std::ref(shared_data), 500); + std::thread snap2(GetManySnapshots, std::ref(shared_data), 500); snap1.join(); snap2.join(); StartManySpans(spans, tracer, 500); - std::thread snap3(GetManySnapshots, std::ref(processor), 500); - std::thread snap4(GetManySnapshots, std::ref(processor), 500); + std::thread snap3(GetManySnapshots, std::ref(shared_data), 500); + std::thread snap4(GetManySnapshots, std::ref(shared_data), 500); snap3.join(); snap4.join(); @@ -588,7 +590,7 @@ TEST_F(TracezProcessor, RunningSnapshotThreadSafety) std::vector> spans; std::thread start(StartManySpans, std::ref(spans), tracer, 500); - std::thread snapshots(GetManySnapshots, std::ref(processor), 500); + std::thread snapshots(GetManySnapshots, std::ref(shared_data), 500); start.join(); snapshots.join(); @@ -605,7 +607,7 @@ TEST_F(TracezProcessor, SnapshotCompletedThreadSafety) StartManySpans(spans, tracer, 500); - std::thread snapshots(GetManySnapshots, std::ref(processor), 500); + std::thread snapshots(GetManySnapshots, std::ref(shared_data), 500); std::thread end(EndAllSpans, std::ref(spans)); snapshots.join(); @@ -623,7 +625,7 @@ TEST_F(TracezProcessor, RunningSnapshotCompletedThreadSafety) StartManySpans(spans1, tracer, 500); std::thread start(StartManySpans, std::ref(spans2), tracer, 500); - std::thread snapshots(GetManySnapshots, std::ref(processor), 500); + std::thread snapshots(GetManySnapshots, std::ref(shared_data), 500); std::thread end(EndAllSpans, std::ref(spans1)); start.join(); From c36a491610f8f36ae77789240b6827204215f6bd Mon Sep 17 00:00:00 2001 From: Josh Suereth Date: Wed, 24 Mar 2021 17:14:53 -0400 Subject: [PATCH 2/3] Update zpages initialization to allow attaching to multiple tracer providers. --- ext/include/opentelemetry/ext/zpages/zpages.h | 46 ++++++++++++++----- 1 file changed, 34 insertions(+), 12 deletions(-) diff --git a/ext/include/opentelemetry/ext/zpages/zpages.h b/ext/include/opentelemetry/ext/zpages/zpages.h index f8e2b13c5e..f2bb4b8c59 100644 --- a/ext/include/opentelemetry/ext/zpages/zpages.h +++ b/ext/include/opentelemetry/ext/zpages/zpages.h @@ -30,9 +30,37 @@ class ZPages public: /** * This function is called if the user wishes to include zPages in their - * application. It creates a static instance of this class. + * application. It creates a static instance of this class and replaces the + * global TracerProvider with one that delegates spans to tracez. */ - static void Initialize() { static ZPages instance; } + static void Initialize() + { + Instance().ReplaceGlobalProvider(); + } + + /** + * Returns the singletone instnace of ZPages, useful for attaching z-pages span processors to non-global providers. + * + * Note: This will instantiate the Tracez instance and webserver if it hasn't already been instantiated. + */ + static ZPages& Instance() { + static ZPages instance; + return instance; + } + + /** Replaces the global tracer provider with an instance that exports to tracez. */ + void ReplaceGlobalProvider() + { + std::shared_ptr tracez_processor (MakeSpanProcessor().release()); + auto tracez_provider_ = opentelemetry::nostd::shared_ptr( + new opentelemetry::sdk::trace::TracerProvider(tracez_processor)); + opentelemetry::trace::Provider::SetTracerProvider(tracez_provider_); + } + + /** Retruns a new span processor that will output to z-pages. */ + std::unique_ptr MakeSpanProcessor() { + return std::unique_ptr(new TracezSpanProcessor(tracez_shared_)); + } private: /** @@ -42,20 +70,13 @@ class ZPages */ ZPages() { - auto tracez_shared_data = std::make_shared(); - auto tracez_processor_ = std::make_shared(tracez_shared_data); - auto tracez_provider_ = opentelemetry::nostd::shared_ptr( - new opentelemetry::sdk::trace::TracerProvider(tracez_processor_)); - + // Construct shared data nd start tracez webserver. + tracez_shared_ = std::make_shared(); auto tracez_aggregator = - std::unique_ptr(new TracezDataAggregator(tracez_shared_data)); - + std::unique_ptr(new TracezDataAggregator(tracez_shared_)); tracez_server_ = std::unique_ptr(new TracezHttpServer(std::move(tracez_aggregator))); - tracez_server_->start(); - - opentelemetry::trace::Provider::SetTracerProvider(tracez_provider_); } ~ZPages() @@ -64,5 +85,6 @@ class ZPages // program) tracez_server_->stop(); } + std::shared_ptr tracez_shared_; std::unique_ptr tracez_server_; }; From 5e8429a9f54025d4c083caed4cdc03d016a7ee41 Mon Sep 17 00:00:00 2001 From: Josh Suereth Date: Thu, 25 Mar 2021 13:44:18 -0400 Subject: [PATCH 3/3] clang format. --- .../ext/zpages/tracez_processor.h | 5 ++-- ext/include/opentelemetry/ext/zpages/zpages.h | 28 ++++++++++--------- ext/src/zpages/tracez_processor.cc | 3 +- ext/src/zpages/tracez_shared_data.cc | 1 - ext/test/zpages/tracez_processor_test.cc | 2 +- 5 files changed, 21 insertions(+), 18 deletions(-) diff --git a/ext/include/opentelemetry/ext/zpages/tracez_processor.h b/ext/include/opentelemetry/ext/zpages/tracez_processor.h index cb3eda09d7..a8f43373d1 100644 --- a/ext/include/opentelemetry/ext/zpages/tracez_processor.h +++ b/ext/include/opentelemetry/ext/zpages/tracez_processor.h @@ -27,8 +27,9 @@ class TracezSpanProcessor : public opentelemetry::sdk::trace::SpanProcessor /* * Initialize a span processor. */ - explicit TracezSpanProcessor(std::shared_ptr shared_data) noexcept - : shared_data_(shared_data) {} + explicit TracezSpanProcessor(std::shared_ptr shared_data) noexcept + : shared_data_(shared_data) + {} /* * Create a span recordable, which is span_data diff --git a/ext/include/opentelemetry/ext/zpages/zpages.h b/ext/include/opentelemetry/ext/zpages/zpages.h index f2bb4b8c59..a32b2bf161 100644 --- a/ext/include/opentelemetry/ext/zpages/zpages.h +++ b/ext/include/opentelemetry/ext/zpages/zpages.h @@ -14,8 +14,8 @@ using opentelemetry::ext::zpages::TracezDataAggregator; using opentelemetry::ext::zpages::TracezHttpServer; -using opentelemetry::ext::zpages::TracezSpanProcessor; using opentelemetry::ext::zpages::TracezSharedData; +using opentelemetry::ext::zpages::TracezSpanProcessor; using std::chrono::microseconds; /** @@ -33,17 +33,17 @@ class ZPages * application. It creates a static instance of this class and replaces the * global TracerProvider with one that delegates spans to tracez. */ - static void Initialize() - { - Instance().ReplaceGlobalProvider(); - } + static void Initialize() { Instance().ReplaceGlobalProvider(); } - /** - * Returns the singletone instnace of ZPages, useful for attaching z-pages span processors to non-global providers. - * - * Note: This will instantiate the Tracez instance and webserver if it hasn't already been instantiated. + /** + * Returns the singletone instnace of ZPages, useful for attaching z-pages span processors to + * non-global providers. + * + * Note: This will instantiate the Tracez instance and webserver if it hasn't already been + * instantiated. */ - static ZPages& Instance() { + static ZPages &Instance() + { static ZPages instance; return instance; } @@ -51,14 +51,16 @@ class ZPages /** Replaces the global tracer provider with an instance that exports to tracez. */ void ReplaceGlobalProvider() { - std::shared_ptr tracez_processor (MakeSpanProcessor().release()); - auto tracez_provider_ = opentelemetry::nostd::shared_ptr( + std::shared_ptr tracez_processor( + MakeSpanProcessor().release()); + auto tracez_provider_ = opentelemetry::nostd::shared_ptr( new opentelemetry::sdk::trace::TracerProvider(tracez_processor)); opentelemetry::trace::Provider::SetTracerProvider(tracez_provider_); } /** Retruns a new span processor that will output to z-pages. */ - std::unique_ptr MakeSpanProcessor() { + std::unique_ptr MakeSpanProcessor() + { return std::unique_ptr(new TracezSpanProcessor(tracez_shared_)); } diff --git a/ext/src/zpages/tracez_processor.cc b/ext/src/zpages/tracez_processor.cc index fdf63a029c..e5adef83f1 100644 --- a/ext/src/zpages/tracez_processor.cc +++ b/ext/src/zpages/tracez_processor.cc @@ -15,7 +15,8 @@ void TracezSpanProcessor::OnStart(opentelemetry::sdk::trace::Recordable &span, void TracezSpanProcessor::OnEnd( std::unique_ptr &&span) noexcept { - shared_data_->OnEnd(std::unique_ptr(static_cast(span.release()))); + shared_data_->OnEnd( + std::unique_ptr(static_cast(span.release()))); } } // namespace zpages diff --git a/ext/src/zpages/tracez_shared_data.cc b/ext/src/zpages/tracez_shared_data.cc index e248cebac2..66726de5da 100644 --- a/ext/src/zpages/tracez_shared_data.cc +++ b/ext/src/zpages/tracez_shared_data.cc @@ -33,7 +33,6 @@ TracezSharedData::CollectedSpans TracezSharedData::GetSpanSnapshot() noexcept return snapshot; } - } // namespace zpages } // namespace ext OPENTELEMETRY_END_NAMESPACE diff --git a/ext/test/zpages/tracez_processor_test.cc b/ext/test/zpages/tracez_processor_test.cc index d16c2f2a96..c0ec2dde39 100644 --- a/ext/test/zpages/tracez_processor_test.cc +++ b/ext/test/zpages/tracez_processor_test.cc @@ -175,7 +175,7 @@ class TracezProcessor : public ::testing::Test protected: void SetUp() override { - shared_data = std::shared_ptr(new TracezSharedData()); + shared_data = std::shared_ptr(new TracezSharedData()); processor = std::shared_ptr(new TracezSpanProcessor(shared_data)); auto resource = opentelemetry::sdk::resource::Resource::Create({});