Skip to content

Commit

Permalink
Split Zpages webserver hosting from Exporter (#626)
Browse files Browse the repository at this point in the history
  • Loading branch information
jsuereth authored Mar 30, 2021
1 parent 179a7f4 commit 59915e3
Show file tree
Hide file tree
Showing 10 changed files with 201 additions and 102 deletions.
10 changes: 5 additions & 5 deletions ext/include/opentelemetry/ext/zpages/tracez_data_aggregator.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

#include "opentelemetry/ext/zpages/latency_boundaries.h"
#include "opentelemetry/ext/zpages/tracez_data.h"
#include "opentelemetry/ext/zpages/tracez_processor.h"
#include "opentelemetry/ext/zpages/tracez_shared_data.h"
#include "opentelemetry/nostd/span.h"
#include "opentelemetry/nostd/string_view.h"
#include "opentelemetry/sdk/trace/span_data.h"
Expand Down Expand Up @@ -46,10 +46,10 @@ class TracezDataAggregator
/**
* Constructor creates a thread that calls a function to aggregate span data
* at regular intervals.
* @param span_processor is the tracez span processor to be set
* @param shared_data is the shared set of spans to expose.
* @param update_interval the time duration for updating the aggregated data.
*/
TracezDataAggregator(std::shared_ptr<TracezSpanProcessor> span_processor,
TracezDataAggregator(std::shared_ptr<TracezSharedData> shared_data,
milliseconds update_interval = milliseconds(10));

/** Ends the thread set up in the constructor and destroys the object **/
Expand Down Expand Up @@ -135,8 +135,8 @@ class TracezDataAggregator
void InsertIntoSampleSpanList(std::list<ThreadsafeSpanData> &sample_spans,
ThreadsafeSpanData &span_data);

/** Instance of span processor used to collect raw data **/
std::shared_ptr<TracezSpanProcessor> tracez_span_processor_;
/** Instance of shared spans used to collect raw data **/
std::shared_ptr<TracezSharedData> tracez_shared_data_;

/**
* Tree map with key being the name of the span and value being a unique ptr
Expand Down
25 changes: 5 additions & 20 deletions ext/include/opentelemetry/ext/zpages/tracez_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <vector>

#include "opentelemetry/ext/zpages/threadsafe_span_data.h"
#include "opentelemetry/ext/zpages/tracez_shared_data.h"
#include "opentelemetry/sdk/trace/processor.h"
#include "opentelemetry/sdk/trace/recordable.h"

Expand All @@ -23,16 +24,12 @@ namespace zpages
class TracezSpanProcessor : public opentelemetry::sdk::trace::SpanProcessor
{
public:
struct CollectedSpans
{
std::unordered_set<ThreadsafeSpanData *> running;
std::vector<std::unique_ptr<ThreadsafeSpanData>> completed;
};

/*
* Initialize a span processor.
*/
explicit TracezSpanProcessor() noexcept {}
explicit TracezSpanProcessor(std::shared_ptr<TracezSharedData> shared_data) noexcept
: shared_data_(shared_data)
{}

/*
* Create a span recordable, which is span_data
Expand All @@ -58,17 +55,6 @@ class TracezSpanProcessor : public opentelemetry::sdk::trace::SpanProcessor
*/
void OnEnd(std::unique_ptr<opentelemetry::sdk::trace::Recordable> &&span) noexcept override;

/*
* Returns a snapshot of all spans stored. This snapshot has a copy of the
* stored running_spans and gives ownership of completed spans to the caller.
* Stored completed_spans are cleared from the processor. Currently,
* copy-on-write is utilized where possible to minimize contention, but locks
* may be added in the future.
* @return snapshot of all currently running spans and newly completed spans
* (spans never sent while complete) at the time that the function is called
*/
CollectedSpans GetSpanSnapshot() noexcept;

/*
* For now, does nothing. In the future, it
* may send all ended spans that have not yet been sent to the aggregator.
Expand Down Expand Up @@ -96,8 +82,7 @@ class TracezSpanProcessor : public opentelemetry::sdk::trace::SpanProcessor
}

private:
mutable std::mutex mtx_;
CollectedSpans spans_;
std::shared_ptr<TracezSharedData> shared_data_;
};
} // namespace zpages
} // namespace ext
Expand Down
64 changes: 64 additions & 0 deletions ext/include/opentelemetry/ext/zpages/tracez_shared_data.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
#pragma once

#include <chrono>
#include <memory>
#include <mutex>
#include <unordered_set>
#include <utility>
#include <vector>

#include "opentelemetry/ext/zpages/threadsafe_span_data.h"
#include "opentelemetry/sdk/trace/processor.h"
#include "opentelemetry/sdk/trace/recordable.h"

OPENTELEMETRY_BEGIN_NAMESPACE
namespace ext
{
namespace zpages
{
/*
* The span processor passes and stores running and completed recordables (casted as span_data)
* to be used by the TraceZ Data Aggregator.
*/
class TracezSharedData
{
public:
struct CollectedSpans
{
std::unordered_set<ThreadsafeSpanData *> running;
std::vector<std::unique_ptr<ThreadsafeSpanData>> completed;
};

/*
* Initialize a shared data storage.
*/
explicit TracezSharedData() noexcept {}

/*
* Called when a span has been started.
*/
void OnStart(ThreadsafeSpanData *span) noexcept;

/*
* Called when a span has ended.
*/
void OnEnd(std::unique_ptr<ThreadsafeSpanData> &&span) noexcept;

/*
* Returns a snapshot of all spans stored. This snapshot has a copy of the
* stored running_spans and gives ownership of completed spans to the caller.
* Stored completed_spans are cleared from the processor. Currently,
* copy-on-write is utilized where possible to minimize contention, but locks
* may be added in the future.
* @return snapshot of all currently running spans and newly completed spans
* (spans never sent while complete) at the time that the function is called
*/
CollectedSpans GetSpanSnapshot() noexcept;

private:
mutable std::mutex mtx_;
CollectedSpans spans_;
};
} // namespace zpages
} // namespace ext
OPENTELEMETRY_END_NAMESPACE
49 changes: 38 additions & 11 deletions ext/include/opentelemetry/ext/zpages/zpages.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@
#include "opentelemetry/ext/zpages/tracez_data_aggregator.h"
#include "opentelemetry/ext/zpages/tracez_http_server.h"
#include "opentelemetry/ext/zpages/tracez_processor.h"
#include "opentelemetry/ext/zpages/tracez_shared_data.h"

#include "opentelemetry/nostd/shared_ptr.h"
#include "opentelemetry/sdk/trace/tracer_provider.h"
#include "opentelemetry/trace/provider.h"

using opentelemetry::ext::zpages::TracezDataAggregator;
using opentelemetry::ext::zpages::TracezHttpServer;
using opentelemetry::ext::zpages::TracezSharedData;
using opentelemetry::ext::zpages::TracezSpanProcessor;
using std::chrono::microseconds;

Expand All @@ -28,9 +30,39 @@ class ZPages
public:
/**
* This function is called if the user wishes to include zPages in their
* application. It creates a static instance of this class.
* application. It creates a static instance of this class and replaces the
* global TracerProvider with one that delegates spans to tracez.
*/
static void Initialize() { static ZPages instance; }
static void Initialize() { Instance().ReplaceGlobalProvider(); }

/**
* Returns the singletone instnace of ZPages, useful for attaching z-pages span processors to
* non-global providers.
*
* Note: This will instantiate the Tracez instance and webserver if it hasn't already been
* instantiated.
*/
static ZPages &Instance()
{
static ZPages instance;
return instance;
}

/** Replaces the global tracer provider with an instance that exports to tracez. */
void ReplaceGlobalProvider()
{
std::shared_ptr<opentelemetry::sdk::trace::SpanProcessor> tracez_processor(
MakeSpanProcessor().release());
auto tracez_provider_ = opentelemetry::nostd::shared_ptr<opentelemetry::trace::TracerProvider>(
new opentelemetry::sdk::trace::TracerProvider(tracez_processor));
opentelemetry::trace::Provider::SetTracerProvider(tracez_provider_);
}

/** Retruns a new span processor that will output to z-pages. */
std::unique_ptr<TracezSpanProcessor> MakeSpanProcessor()
{
return std::unique_ptr<TracezSpanProcessor>(new TracezSpanProcessor(tracez_shared_));
}

private:
/**
Expand All @@ -40,19 +72,13 @@ class ZPages
*/
ZPages()
{
auto tracez_processor_ = std::make_shared<TracezSpanProcessor>();
auto tracez_provider_ = opentelemetry::nostd::shared_ptr<opentelemetry::trace::TracerProvider>(
new opentelemetry::sdk::trace::TracerProvider(tracez_processor_));

// Construct shared data nd start tracez webserver.
tracez_shared_ = std::make_shared<TracezSharedData>();
auto tracez_aggregator =
std::unique_ptr<TracezDataAggregator>(new TracezDataAggregator(tracez_processor_));

std::unique_ptr<TracezDataAggregator>(new TracezDataAggregator(tracez_shared_));
tracez_server_ =
std::unique_ptr<TracezHttpServer>(new TracezHttpServer(std::move(tracez_aggregator)));

tracez_server_->start();

opentelemetry::trace::Provider::SetTracerProvider(tracez_provider_);
}

~ZPages()
Expand All @@ -61,5 +87,6 @@ class ZPages
// program)
tracez_server_->stop();
}
std::shared_ptr<TracezSharedData> tracez_shared_;
std::unique_ptr<TracezHttpServer> tracez_server_;
};
2 changes: 2 additions & 0 deletions ext/src/zpages/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
add_library(
opentelemetry_zpages
tracez_processor.cc
tracez_shared_data.cc
tracez_data_aggregator.cc
../../include/opentelemetry/ext/zpages/tracez_shared_data.h
../../include/opentelemetry/ext/zpages/tracez_processor.h
../../include/opentelemetry/ext/zpages/tracez_data_aggregator.h
../../include/opentelemetry/ext/zpages/tracez_http_server.h)
Expand Down
6 changes: 3 additions & 3 deletions ext/src/zpages/tracez_data_aggregator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ namespace ext
namespace zpages
{

TracezDataAggregator::TracezDataAggregator(std::shared_ptr<TracezSpanProcessor> span_processor,
TracezDataAggregator::TracezDataAggregator(std::shared_ptr<TracezSharedData> shared_data,
milliseconds update_interval)
{
tracez_span_processor_ = span_processor;
tracez_shared_data_ = shared_data;

// Start a thread that calls AggregateSpans periodically or till notified.
execute_.store(true, std::memory_order_release);
Expand Down Expand Up @@ -153,7 +153,7 @@ void TracezDataAggregator::AggregateRunningSpans(

void TracezDataAggregator::AggregateSpans()
{
auto span_snapshot = tracez_span_processor_->GetSpanSnapshot();
auto span_snapshot = tracez_shared_data_->GetSpanSnapshot();
/**
* TODO: At this time in the project, there is no way of uniquely identifying
* a span(their id's are not being set yet).
Expand Down
26 changes: 3 additions & 23 deletions ext/src/zpages/tracez_processor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,34 +9,14 @@ namespace zpages
void TracezSpanProcessor::OnStart(opentelemetry::sdk::trace::Recordable &span,
const opentelemetry::trace::SpanContext &parent_context) noexcept
{
std::lock_guard<std::mutex> lock(mtx_);
spans_.running.insert(static_cast<ThreadsafeSpanData *>(&span));
shared_data_->OnStart(static_cast<ThreadsafeSpanData *>(&span));
}

void TracezSpanProcessor::OnEnd(
std::unique_ptr<opentelemetry::sdk::trace::Recordable> &&span) noexcept
{
if (span == nullptr)
return;
auto span_raw = static_cast<ThreadsafeSpanData *>(span.get());
std::lock_guard<std::mutex> lock(mtx_);
auto span_it = spans_.running.find(span_raw);
if (span_it != spans_.running.end())
{
spans_.running.erase(span_it);
spans_.completed.push_back(
std::unique_ptr<ThreadsafeSpanData>(static_cast<ThreadsafeSpanData *>(span.release())));
}
}

TracezSpanProcessor::CollectedSpans TracezSpanProcessor::GetSpanSnapshot() noexcept
{
CollectedSpans snapshot;
std::lock_guard<std::mutex> lock(mtx_);
snapshot.running = spans_.running;
snapshot.completed = std::move(spans_.completed);
spans_.completed.clear();
return snapshot;
shared_data_->OnEnd(
std::unique_ptr<ThreadsafeSpanData>(static_cast<ThreadsafeSpanData *>(span.release())));
}

} // namespace zpages
Expand Down
38 changes: 38 additions & 0 deletions ext/src/zpages/tracez_shared_data.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#include "opentelemetry/ext/zpages/tracez_shared_data.h"

OPENTELEMETRY_BEGIN_NAMESPACE
namespace ext
{
namespace zpages
{

void TracezSharedData::OnStart(ThreadsafeSpanData *span) noexcept
{
std::lock_guard<std::mutex> lock(mtx_);
spans_.running.insert(span);
}

void TracezSharedData::OnEnd(std::unique_ptr<ThreadsafeSpanData> &&span) noexcept
{
std::lock_guard<std::mutex> lock(mtx_);
auto span_it = spans_.running.find(span.get());
if (span_it != spans_.running.end())
{
spans_.running.erase(span_it);
spans_.completed.push_back(std::unique_ptr<ThreadsafeSpanData>(span.release()));
}
}

TracezSharedData::CollectedSpans TracezSharedData::GetSpanSnapshot() noexcept
{
CollectedSpans snapshot;
std::lock_guard<std::mutex> lock(mtx_);
snapshot.running = spans_.running;
snapshot.completed = std::move(spans_.completed);
spans_.completed.clear();
return snapshot;
}

} // namespace zpages
} // namespace ext
OPENTELEMETRY_END_NAMESPACE
5 changes: 3 additions & 2 deletions ext/test/zpages/tracez_data_aggregator_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,12 @@ class TracezDataAggregatorTest : public ::testing::Test
protected:
void SetUp() override
{
std::shared_ptr<TracezSpanProcessor> processor(new TracezSpanProcessor());
std::shared_ptr<TracezSharedData> shared_data(new TracezSharedData());
std::shared_ptr<TracezSpanProcessor> processor(new TracezSpanProcessor(shared_data));
auto resource = opentelemetry::sdk::resource::Resource::Create({});
tracer = std::shared_ptr<opentelemetry::trace::Tracer>(new Tracer(processor, resource));
tracez_data_aggregator = std::unique_ptr<TracezDataAggregator>(
new TracezDataAggregator(processor, milliseconds(10)));
new TracezDataAggregator(shared_data, milliseconds(10)));
}

std::unique_ptr<TracezDataAggregator> tracez_data_aggregator;
Expand Down
Loading

0 comments on commit 59915e3

Please sign in to comment.