Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/root/intro/version_history.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ Version history
* tls: added verification of IP address SAN fields in certificates against configured SANs in the
certificate validation context.
* upstream: added network filter chains to upstream connections, see :ref:`filters<envoy_api_field_Cluster.filters>`.
* zookeeper: parse responses and emit latency stats.

1.11.0 (July 11, 2019)
======================
Expand Down
6 changes: 4 additions & 2 deletions source/extensions/filters/network/zookeeper_proxy/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@ Network::FilterFactoryCb ZooKeeperConfigFactory::createFilterFactoryFromProtoTyp

ZooKeeperFilterConfigSharedPtr filter_config(
std::make_shared<ZooKeeperFilterConfig>(stat_prefix, max_packet_bytes, context.scope()));
return [filter_config](Network::FilterManager& filter_manager) -> void {
filter_manager.addFilter(std::make_shared<ZooKeeperFilter>(filter_config));
auto& time_source = context.dispatcher().timeSource();

return [filter_config, &time_source](Network::FilterManager& filter_manager) -> void {
filter_manager.addFilter(std::make_shared<ZooKeeperFilter>(filter_config, time_source));
};
}

Expand Down
46 changes: 30 additions & 16 deletions source/extensions/filters/network/zookeeper_proxy/decoder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ void DecoderImpl::decodeOnData(Buffer::Instance& data, uint64_t& offset) {
ensureMinLength(len, INT_LENGTH + XID_LENGTH);
ensureMaxLength(len);

auto start_time = time_source_.monotonicTime();

// Control requests, with XIDs <= 0.
//
// These are meant to control the state of a session:
Expand All @@ -65,17 +67,21 @@ void DecoderImpl::decodeOnData(Buffer::Instance& data, uint64_t& offset) {
switch (static_cast<XidCodes>(xid)) {
case XidCodes::CONNECT_XID:
parseConnect(data, offset, len);
requests_by_xid_[xid] = {OpCodes::CONNECT, std::move(start_time)};
return;
case XidCodes::PING_XID:
offset += OPCODE_LENGTH;
callbacks_.onPing();
requests_by_xid_[xid] = {OpCodes::PING, std::move(start_time)};
return;
case XidCodes::AUTH_XID:
parseAuthRequest(data, offset, len);
requests_by_xid_[xid] = {OpCodes::SETAUTH, std::move(start_time)};
return;
case XidCodes::SET_WATCHES_XID:
offset += OPCODE_LENGTH;
parseSetWatchesRequest(data, offset, len);
requests_by_xid_[xid] = {OpCodes::SETWATCHES, std::move(start_time)};
return;
default:
// WATCH_XID is generated by the server, so that and everything
Expand Down Expand Up @@ -155,7 +161,7 @@ void DecoderImpl::decodeOnData(Buffer::Instance& data, uint64_t& offset) {
throw EnvoyException(fmt::format("Unknown opcode: {}", enumToSignedInt(opcode)));
}

requests_by_xid_[xid] = opcode;
requests_by_xid_[xid] = {opcode, std::move(start_time)};
}

void DecoderImpl::decodeOnWrite(Buffer::Instance& data, uint64_t& offset) {
Expand All @@ -170,11 +176,26 @@ void DecoderImpl::decodeOnWrite(Buffer::Instance& data, uint64_t& offset) {
const auto xid = helper_.peekInt32(data, offset);
const auto xid_code = static_cast<XidCodes>(xid);

// Find the corresponding request for this XID.
const auto it = requests_by_xid_.find(xid);

std::chrono::milliseconds latency;
OpCodes opcode;

if (xid_code != XidCodes::WATCH_XID) {
// If this fails, it's a server-side bug.
ASSERT(it != requests_by_xid_.end());
latency = std::chrono::duration_cast<std::chrono::milliseconds>(time_source_.monotonicTime() -
it->second.start_time);
opcode = it->second.opcode;
requests_by_xid_.erase(it);
}

// Connect responses are special, they have no full reply header
// but just an XID with no zxid nor error fields like the ones
// available for all other server generated messages.
if (xid_code == XidCodes::CONNECT_XID) {
parseConnectResponse(data, offset, len);
parseConnectResponse(data, offset, len, latency);
return;
}

Expand All @@ -183,13 +204,13 @@ void DecoderImpl::decodeOnWrite(Buffer::Instance& data, uint64_t& offset) {
const auto error = helper_.peekInt32(data, offset);
switch (xid_code) {
case XidCodes::PING_XID:
callbacks_.onResponse(OpCodes::PING, xid, zxid, error);
callbacks_.onResponse(OpCodes::PING, xid, zxid, error, latency);
return;
case XidCodes::AUTH_XID:
callbacks_.onResponse(OpCodes::SETAUTH, xid, zxid, error);
callbacks_.onResponse(OpCodes::SETAUTH, xid, zxid, error, latency);
return;
case XidCodes::SET_WATCHES_XID:
callbacks_.onResponse(OpCodes::SETWATCHES, xid, zxid, error);
callbacks_.onResponse(OpCodes::SETWATCHES, xid, zxid, error, latency);
return;
case XidCodes::WATCH_XID:
parseWatchEvent(data, offset, len, zxid, error);
Expand All @@ -198,16 +219,8 @@ void DecoderImpl::decodeOnWrite(Buffer::Instance& data, uint64_t& offset) {
break;
}

// Find the corresponding request for this XID.
const auto it = requests_by_xid_.find(xid);

// If this fails, it's a server-side bug.
ASSERT(it != requests_by_xid_.end());

const auto opcode = it->second;
requests_by_xid_.erase(it);
callbacks_.onResponse(opcode, xid, zxid, error, latency);
offset += (len - (XID_LENGTH + ZXID_LENGTH + INT_LENGTH));
callbacks_.onResponse(opcode, xid, zxid, error);
}

void DecoderImpl::ensureMinLength(const int32_t len, const int32_t minlen) const {
Expand Down Expand Up @@ -479,7 +492,8 @@ void DecoderImpl::decode(Buffer::Instance& data, DecodeType dtype) {
}
}

void DecoderImpl::parseConnectResponse(Buffer::Instance& data, uint64_t& offset, uint32_t len) {
void DecoderImpl::parseConnectResponse(Buffer::Instance& data, uint64_t& offset, uint32_t len,
const std::chrono::milliseconds& latency) {
ensureMinLength(len, PROTOCOL_VERSION_LENGTH + TIMEOUT_LENGTH + SESSION_LENGTH + INT_LENGTH);

const auto timeout = helper_.peekInt32(data, offset);
Expand All @@ -490,7 +504,7 @@ void DecoderImpl::parseConnectResponse(Buffer::Instance& data, uint64_t& offset,

const bool readonly = maybeReadBool(data, offset);

callbacks_.onConnectResponse(0, timeout, readonly);
callbacks_.onConnectResponse(0, timeout, readonly, latency);
}

void DecoderImpl::parseWatchEvent(Buffer::Instance& data, uint64_t& offset, const uint32_t len,
Expand Down
22 changes: 16 additions & 6 deletions source/extensions/filters/network/zookeeper_proxy/decoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,10 @@ class DecoderCallbacks {
virtual void onRemoveWatchesRequest(const std::string& path, int32_t type) PURE;
virtual void onCloseRequest() PURE;
virtual void onResponseBytes(uint64_t bytes) PURE;
virtual void onConnectResponse(int32_t proto_version, int32_t timeout, bool readonly) PURE;
virtual void onResponse(OpCodes opcode, int32_t xid, int64_t zxid, int32_t error) PURE;
virtual void onConnectResponse(int32_t proto_version, int32_t timeout, bool readonly,
const std::chrono::milliseconds& latency) PURE;
virtual void onResponse(OpCodes opcode, int32_t xid, int64_t zxid, int32_t error,
const std::chrono::milliseconds& latency) PURE;
virtual void onWatchEvent(int32_t event_type, int32_t client_state, const std::string& path,
int64_t zxid, int32_t error) PURE;
};
Expand All @@ -117,15 +119,21 @@ using DecoderPtr = std::unique_ptr<Decoder>;

class DecoderImpl : public Decoder, Logger::Loggable<Logger::Id::filter> {
public:
explicit DecoderImpl(DecoderCallbacks& callbacks, uint32_t max_packet_bytes)
: callbacks_(callbacks), max_packet_bytes_(max_packet_bytes), helper_(max_packet_bytes) {}
explicit DecoderImpl(DecoderCallbacks& callbacks, uint32_t max_packet_bytes,
TimeSource& time_source)
: callbacks_(callbacks), max_packet_bytes_(max_packet_bytes), helper_(max_packet_bytes),
time_source_(time_source) {}

// ZooKeeperProxy::Decoder
void onData(Buffer::Instance& data) override;
void onWrite(Buffer::Instance& data) override;

private:
enum class DecodeType { READ, WRITE };
struct RequestBegin {
OpCodes opcode;
MonotonicTime start_time;
};

void decode(Buffer::Instance& data, DecodeType dtype);
void decodeOnData(Buffer::Instance& data, uint64_t& offset);
Expand All @@ -151,15 +159,17 @@ class DecoderImpl : public Decoder, Logger::Loggable<Logger::Id::filter> {
void ensureMinLength(int32_t len, int32_t minlen) const;
void ensureMaxLength(int32_t len) const;
std::string pathOnlyRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len);
void parseConnectResponse(Buffer::Instance& data, uint64_t& offset, uint32_t len);
void parseConnectResponse(Buffer::Instance& data, uint64_t& offset, uint32_t len,
const std::chrono::milliseconds& latency);
void parseWatchEvent(Buffer::Instance& data, uint64_t& offset, uint32_t len, int64_t zxid,
int32_t error);
bool maybeReadBool(Buffer::Instance& data, uint64_t& offset);

DecoderCallbacks& callbacks_;
const uint32_t max_packet_bytes_;
BufferHelper helper_;
std::unordered_map<int32_t, OpCodes> requests_by_xid_;
TimeSource& time_source_;
std::unordered_map<int32_t, RequestBegin> requests_by_xid_;
};

} // namespace ZooKeeperProxy
Expand Down
28 changes: 21 additions & 7 deletions source/extensions/filters/network/zookeeper_proxy/filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ ZooKeeperFilterConfig::ZooKeeperFilterConfig(const std::string& stat_prefix,
const uint32_t max_packet_bytes, Stats::Scope& scope)
: scope_(scope), max_packet_bytes_(max_packet_bytes), stats_(generateStats(stat_prefix, scope)),
stat_name_set_(scope.symbolTable()), stat_prefix_(stat_name_set_.add(stat_prefix)),
auth_(stat_name_set_.add("auth")) {
auth_(stat_name_set_.add("auth")),
connect_latency_(stat_name_set_.add("connect_response_latency")) {
// https://zookeeper.apache.org/doc/r3.5.4-beta/zookeeperProgrammers.html#sc_BuiltinACLSchemes
// lists commons schemes: "world", "auth", "digest", "host", "x509", and
// "ip". These are used in filter.cc by appending "_rq".
Expand All @@ -32,8 +33,8 @@ ZooKeeperFilterConfig::ZooKeeperFilterConfig(const std::string& stat_prefix,
stat_name_set_.rememberBuiltin("x509_rq");
}

ZooKeeperFilter::ZooKeeperFilter(ZooKeeperFilterConfigSharedPtr config)
: config_(std::move(config)), decoder_(createDecoder(*this)) {}
ZooKeeperFilter::ZooKeeperFilter(ZooKeeperFilterConfigSharedPtr config, TimeSource& time_source)
: config_(std::move(config)), decoder_(createDecoder(*this, time_source)) {}

void ZooKeeperFilter::initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) {
read_callbacks_ = &callbacks;
Expand All @@ -53,8 +54,8 @@ Network::FilterStatus ZooKeeperFilter::onWrite(Buffer::Instance& data, bool) {

Network::FilterStatus ZooKeeperFilter::onNewConnection() { return Network::FilterStatus::Continue; }

DecoderPtr ZooKeeperFilter::createDecoder(DecoderCallbacks& callbacks) {
return std::make_unique<DecoderImpl>(callbacks, config_->maxPacketBytes());
DecoderPtr ZooKeeperFilter::createDecoder(DecoderCallbacks& callbacks, TimeSource& time_source) {
return std::make_unique<DecoderImpl>(callbacks, config_->maxPacketBytes(), time_source);
}

void ZooKeeperFilter::setDynamicMetadata(const std::string& key, const std::string& value) {
Expand Down Expand Up @@ -249,16 +250,23 @@ void ZooKeeperFilter::onCloseRequest() {
}

void ZooKeeperFilter::onConnectResponse(const int32_t proto_version, const int32_t timeout,
const bool readonly) {
const bool readonly,
const std::chrono::milliseconds& latency) {
config_->stats_.connect_resp_.inc();

Stats::SymbolTable::StoragePtr storage =
config_->scope_.symbolTable().join({config_->stat_prefix_, config_->connect_latency_});
config_->scope_.histogramFromStatName(Stats::StatName(storage.get()))
.recordValue(latency.count());

setDynamicMetadata({{"opname", "connect_response"},
{"protocol_version", std::to_string(proto_version)},
{"timeout", std::to_string(timeout)},
{"readonly", std::to_string(readonly)}});
}

void ZooKeeperFilter::onResponse(const OpCodes opcode, const int32_t xid, const int64_t zxid,
const int32_t error) {
const int32_t error, const std::chrono::milliseconds& latency) {
std::string opname = "";

switch (opcode) {
Expand Down Expand Up @@ -362,6 +370,12 @@ void ZooKeeperFilter::onResponse(const OpCodes opcode, const int32_t xid, const
break;
}

Stats::SymbolTable::StoragePtr storage = config_->scope_.symbolTable().join(
{config_->stat_prefix_,
config_->stat_name_set_.getStatName(absl::StrCat(opname, "_latency"))});
config_->scope_.histogramFromStatName(Stats::StatName(storage.get()))
.recordValue(latency.count());

setDynamicMetadata({{"opname", opname},
{"xid", std::to_string(xid)},
{"zxid", std::to_string(zxid)},
Expand Down
11 changes: 7 additions & 4 deletions source/extensions/filters/network/zookeeper_proxy/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ class ZooKeeperFilterConfig {
Stats::StatNameSet stat_name_set_;
const Stats::StatName stat_prefix_;
const Stats::StatName auth_;
const Stats::StatName connect_latency_;

private:
ZooKeeperProxyStats generateStats(const std::string& prefix, Stats::Scope& scope) {
Expand All @@ -124,7 +125,7 @@ class ZooKeeperFilter : public Network::Filter,
DecoderCallbacks,
Logger::Loggable<Logger::Id::filter> {
public:
explicit ZooKeeperFilter(ZooKeeperFilterConfigSharedPtr config);
ZooKeeperFilter(ZooKeeperFilterConfigSharedPtr config, TimeSource& time_source);

// Network::ReadFilter
Network::FilterStatus onData(Buffer::Instance& data, bool end_stream) override;
Expand Down Expand Up @@ -159,12 +160,14 @@ class ZooKeeperFilter : public Network::Filter,
void onGetAllChildrenNumberRequest(const std::string& path) override;
void onCloseRequest() override;
void onResponseBytes(uint64_t bytes) override;
void onConnectResponse(int32_t proto_version, int32_t timeout, bool readonly) override;
void onResponse(OpCodes opcode, int32_t xid, int64_t zxid, int32_t error) override;
void onConnectResponse(int32_t proto_version, int32_t timeout, bool readonly,
const std::chrono::milliseconds& latency) override;
void onResponse(OpCodes opcode, int32_t xid, int64_t zxid, int32_t error,
const std::chrono::milliseconds& latency) override;
void onWatchEvent(int32_t event_type, int32_t client_state, const std::string& path, int64_t zxid,
int32_t error) override;

DecoderPtr createDecoder(DecoderCallbacks& callbacks);
DecoderPtr createDecoder(DecoderCallbacks& callbacks, TimeSource& time_source);
void setDynamicMetadata(const std::string& key, const std::string& value);
void setDynamicMetadata(const std::vector<std::pair<const std::string, const std::string>>& data);
void clearDynamicMetadata();
Expand Down
1 change: 1 addition & 0 deletions test/extensions/filters/network/zookeeper_proxy/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ envoy_extension_cc_test(
deps = [
"//source/extensions/filters/network/zookeeper_proxy:config",
"//test/mocks/network:network_mocks",
"//test/test_common:simulated_time_system_lib",
],
)

Expand Down
14 changes: 13 additions & 1 deletion test/extensions/filters/network/zookeeper_proxy/filter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "extensions/filters/network/zookeeper_proxy/filter.h"

#include "test/mocks/network/mocks.h"
#include "test/test_common/simulated_time_system.h"

#include "gmock/gmock.h"
#include "gtest/gtest.h"
Expand Down Expand Up @@ -31,7 +32,7 @@ class ZooKeeperFilterTest : public testing::Test {

void initialize() {
config_ = std::make_shared<ZooKeeperFilterConfig>(stat_prefix_, 1048576, scope_);
filter_ = std::make_unique<ZooKeeperFilter>(config_);
filter_ = std::make_unique<ZooKeeperFilter>(config_, time_system_);
filter_->initializeReadFilterCallbacks(filter_callbacks_);
}

Expand Down Expand Up @@ -489,6 +490,14 @@ class ZooKeeperFilterTest : public testing::Test {
EXPECT_EQ(1UL, stat.value());
EXPECT_EQ(20UL, config_->stats().response_bytes_.value());
EXPECT_EQ(0UL, config_->stats().decoder_error_.value());
const auto histogram_name =
fmt::format("test.zookeeper.{}_latency", metadata_values[0].find("opname")->second);
EXPECT_NE(absl::nullopt, findHistogram(histogram_name));
}

Stats::OptionalHistogram findHistogram(const std::string& name) {
Stats::StatNameManagedStorage storage(name, scope_.symbolTable());
return scope_.findHistogram(storage.statName());
}

Stats::IsolatedStoreImpl scope_;
Expand All @@ -497,6 +506,7 @@ class ZooKeeperFilterTest : public testing::Test {
std::string stat_prefix_{"test.zookeeper"};
NiceMock<Network::MockReadFilterCallbacks> filter_callbacks_;
NiceMock<Envoy::StreamInfo::MockStreamInfo> stream_info_;
Event::SimulatedTimeSystem time_system_;
};

TEST_F(ZooKeeperFilterTest, Connect) {
Expand All @@ -516,6 +526,7 @@ TEST_F(ZooKeeperFilterTest, Connect) {
EXPECT_EQ(1UL, config_->stats().connect_resp_.value());
EXPECT_EQ(24UL, config_->stats().response_bytes_.value());
EXPECT_EQ(0UL, config_->stats().decoder_error_.value());
EXPECT_NE(absl::nullopt, findHistogram("test.zookeeper.connect_response_latency"));
}

TEST_F(ZooKeeperFilterTest, ConnectReadonly) {
Expand All @@ -536,6 +547,7 @@ TEST_F(ZooKeeperFilterTest, ConnectReadonly) {
EXPECT_EQ(1UL, config_->stats().connect_resp_.value());
EXPECT_EQ(25UL, config_->stats().response_bytes_.value());
EXPECT_EQ(0UL, config_->stats().decoder_error_.value());
EXPECT_NE(absl::nullopt, findHistogram("test.zookeeper.connect_response_latency"));
}

TEST_F(ZooKeeperFilterTest, Fallback) {
Expand Down