Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions source/extensions/tracers/datadog/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ envoy_cc_library(
external_deps = ["dd_opentracing_cpp"],
deps = [
"//source/common/config:utility_lib",
"//source/common/http:async_client_utility_lib",
"//source/common/tracing:http_tracer_lib",
"//source/extensions/tracers:well_known_names",
"//source/extensions/tracers/common/ot:opentracing_driver_lib",
Expand Down
19 changes: 13 additions & 6 deletions source/extensions/tracers/datadog/datadog_tracer_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,22 +100,29 @@ void TraceReporter::flushTraces() {
ENVOY_LOG(debug, "submitting {} trace(s) to {} with payload size {}", pendingTraces,
encoder_->path(), encoder_->payload().size());

driver_.clusterManager()
.httpAsyncClientForCluster(driver_.cluster()->name())
.send(std::move(message), *this,
Http::AsyncClient::RequestOptions().setTimeout(std::chrono::milliseconds(1000U)));
Http::AsyncClient::Request* request =
driver_.clusterManager()
.httpAsyncClientForCluster(driver_.cluster()->name())
.send(std::move(message), *this,
Http::AsyncClient::RequestOptions().setTimeout(std::chrono::milliseconds(1000U)));
if (request) {
active_requests_.add(*request);
}

encoder_->clearTraces();
}
}

void TraceReporter::onFailure(const Http::AsyncClient::Request&, Http::AsyncClient::FailureReason) {
void TraceReporter::onFailure(const Http::AsyncClient::Request& request,
Http::AsyncClient::FailureReason) {
active_requests_.remove(request);
ENVOY_LOG(debug, "failure submitting traces to datadog agent");
driver_.tracerStats().reports_failed_.inc();
}

void TraceReporter::onSuccess(const Http::AsyncClient::Request&,
void TraceReporter::onSuccess(const Http::AsyncClient::Request& request,
Http::ResponseMessagePtr&& http_response) {
active_requests_.remove(request);
uint64_t responseStatus = Http::Utility::getResponseStatus(http_response->headers());
if (responseStatus != enumToInt(Http::Code::OK)) {
// TODO: Consider adding retries for failed submissions.
Expand Down
4 changes: 4 additions & 0 deletions source/extensions/tracers/datadog/datadog_tracer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "envoy/tracing/http_tracer.h"
#include "envoy/upstream/cluster_manager.h"

#include "common/http/async_client_utility.h"
#include "common/http/header_map_impl.h"
#include "common/json/json_loader.h"

Expand Down Expand Up @@ -127,6 +128,9 @@ class TraceReporter : public Http::AsyncClient::Callbacks,
TraceEncoderSharedPtr encoder_;

std::map<std::string, Http::LowerCaseString> lower_case_headers_;

// Track active HTTP requests to be able to cancel them on destruction.
Http::AsyncClientRequestTracker active_requests_;
};
} // namespace Datadog
} // namespace Tracers
Expand Down
62 changes: 62 additions & 0 deletions test/extensions/tracers/datadog/datadog_tracer_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,14 @@
#include "gtest/gtest.h"

using testing::_;
using testing::DoAll;
using testing::Eq;
using testing::Invoke;
using testing::NiceMock;
using testing::Return;
using testing::ReturnRef;
using testing::StrictMock;
using testing::WithArg;

namespace Envoy {
namespace Extensions {
Expand Down Expand Up @@ -165,6 +168,65 @@ TEST_F(DatadogDriverTest, FlushSpansTimer) {
EXPECT_EQ(0U, stats_.counter("tracing.datadog.reports_failed").value());
}

TEST_F(DatadogDriverTest, CancelInflightRequestsOnDestruction) {
setupValidDriver();

StrictMock<Http::MockAsyncClientRequest> request1(&cm_.async_client_),
request2(&cm_.async_client_), request3(&cm_.async_client_), request4(&cm_.async_client_);
Http::AsyncClient::Callbacks* callback{};
const absl::optional<std::chrono::milliseconds> timeout(std::chrono::seconds(1));

// Expect 4 separate report requests to be made.
EXPECT_CALL(cm_.async_client_,
send_(_, _, Http::AsyncClient::RequestOptions().setTimeout(timeout)))
.WillOnce(DoAll(WithArg<1>(SaveArgAddress(&callback)), Return(&request1)))
.WillOnce(Return(&request2))
.WillOnce(Return(&request3))
.WillOnce(Return(&request4));
// Expect timer to be re-enabled on each tick.
EXPECT_CALL(*timer_, enableTimer(std::chrono::milliseconds(900), _)).Times(4);

// Trigger 1st report request.
driver_
->startSpan(config_, request_headers_, operation_name_, start_time_,
{Tracing::Reason::Sampling, true})
->finishSpan();
timer_->invokeCallback();
// Trigger 2nd report request.
driver_
->startSpan(config_, request_headers_, operation_name_, start_time_,
{Tracing::Reason::Sampling, true})
->finishSpan();
timer_->invokeCallback();
// Trigger 3rd report request.
driver_
->startSpan(config_, request_headers_, operation_name_, start_time_,
{Tracing::Reason::Sampling, true})
->finishSpan();
timer_->invokeCallback();
// Trigger 4th report request.
driver_
->startSpan(config_, request_headers_, operation_name_, start_time_,
{Tracing::Reason::Sampling, true})
->finishSpan();
timer_->invokeCallback();

Http::ResponseMessagePtr msg(new Http::ResponseMessageImpl(
Http::ResponseHeaderMapPtr{new Http::TestResponseHeaderMapImpl{{":status", "404"}}}));
// Simulate completion of the 2nd report request.
callback->onSuccess(request2, std::move(msg));

// Simulate failure of the 3rd report request.
callback->onFailure(request3, Http::AsyncClient::FailureReason::Reset);

// Expect 1st and 4th requests to be cancelled on destruction.
EXPECT_CALL(request1, cancel());
EXPECT_CALL(request4, cancel());

// Trigger destruction.
driver_.reset();
}

} // namespace
} // namespace Datadog
} // namespace Tracers
Expand Down