Skip to content
35 changes: 35 additions & 0 deletions docs/root/configuration/network_filters/zookeeper_proxy_filter.rst
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,34 @@ following statistics:
checkwatches_rq, Counter, Number of checkwatches requests
removewatches_rq, Counter, Number of removewatches requests
check_rq, Counter, Number of check requests
response_bytes, Counter, Number of bytes in decoded response messages
connect_resp, Counter, Number of connect responses
ping_resp, Counter, Number of ping responses
auth_resp, Counter, Number of auth responses
watch_event, Counter, Number of watch events fired by the server
getdata_resp, Counter, Number of getdata responses
create_resp, Counter, Number of create responses
create2_resp, Counter, Number of create2 responses
createcontainer_resp, Counter, Number of createcontainer responses
createttl_resp, Counter, Number of createttl responses
setdata_resp, Counter, Number of setdata responses
getchildren_resp, Counter, Number of getchildren responses
getchildren2_resp, Counter, Number of getchildren2 responses
getephemerals_resp, Counter, Number of getephemerals responses
getallchildrennumber_resp, Counter, Number of getallchildrennumber responses
remove_resp, Counter, Number of remove responses
exists_resp, Counter, Number of exists responses
getacl_resp, Counter, Number of getacl responses
setacl_resp, Counter, Number of setacl responses
sync_resp, Counter, Number of sync responses
multi_resp, Counter, Number of multi responses
reconfig_resp, Counter, Number of reconfig responses
close_resp, Counter, Number of close responses
setauth_resp, Counter, Number of setauth responses
setwatches_resp, Counter, Number of setwatches responses
checkwatches_resp, Counter, Number of checkwatches responses
removewatches_resp, Counter, Number of removewatches responses
check_resp, Counter, Number of check responses

.. _config_network_filters_zookeeper_proxy_dynamic_metadata:

Expand All @@ -90,3 +118,10 @@ The ZooKeeper filter emits the following dynamic metadata for each message parse
<bytes>, string, "The size of the request message in bytes"
<watch>, string, "True if a watch is being set, false otherwise"
<version>, string, "The version parameter, if any, given with the request"
<timeout>, string, "The timeout parameter in a connect response"
<protocol_version>, string, "The protocol version in a connect response"
<readonly>, string, "The readonly flag in a connect response"
<zxid>, string, "The zxid field in a response header"
<error>, string, "The error field in a response header"
<client_state>, string, "The state field in a watch event"
<event_type>, string, "The event type in a a watch event"
144 changes: 122 additions & 22 deletions source/extensions/filters/network/zookeeper_proxy/decoder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

#include <string>

#include "common/common/enum_to_int.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
Expand All @@ -16,6 +18,8 @@ constexpr uint32_t ZXID_LENGTH = 8;
constexpr uint32_t TIMEOUT_LENGTH = 4;
constexpr uint32_t SESSION_LENGTH = 8;
constexpr uint32_t MULTI_HEADER_LENGTH = 9;
constexpr uint32_t PROTOCOL_VERSION_LENGTH = 4;
constexpr uint32_t SERVER_HEADER_LENGTH = 16;

const char* createFlagsToString(CreateFlags flags) {
switch (flags) {
Expand All @@ -38,17 +42,9 @@ const char* createFlagsToString(CreateFlags flags) {
return "unknown";
}

void DecoderImpl::decode(Buffer::Instance& data, uint64_t& offset) {
ENVOY_LOG(trace, "zookeeper_proxy: decoding {} bytes at offset {}", data.length(), offset);

// Reset the helper's cursor, to ensure the current message stays within the
// allowed max length, even when it's different than the declared length
// by the message.
//
// Note: we need to keep two cursors — offset and helper_'s internal one — because
// a buffer may contain multiple messages, so offset is global and helper_'s
// internal cursor is reset for each individual message.
helper_.reset();
void DecoderImpl::decodeOnData(Buffer::Instance& data, uint64_t& offset) {
ENVOY_LOG(trace, "zookeeper_proxy: decoding request with {} bytes at offset {}", data.length(),
offset);

// Check message length.
const int32_t len = helper_.peekInt32(data, offset);
Expand Down Expand Up @@ -93,8 +89,8 @@ void DecoderImpl::decode(Buffer::Instance& data, uint64_t& offset) {
// for two cases: auth requests can happen at any time and ping requests
// must happen every 1/3 of the negotiated session timeout, to keep
// the session alive.
const int32_t opcode = helper_.peekInt32(data, offset);
switch (static_cast<OpCodes>(opcode)) {
const auto opcode = static_cast<OpCodes>(helper_.peekInt32(data, offset));
switch (opcode) {
case OpCodes::GETDATA:
parseGetDataRequest(data, offset, len);
break;
Expand Down Expand Up @@ -156,8 +152,62 @@ void DecoderImpl::decode(Buffer::Instance& data, uint64_t& offset) {
callbacks_.onCloseRequest();
break;
default:
throw EnvoyException(fmt::format("Unknown opcode: {}", opcode));
throw EnvoyException(fmt::format("Unknown opcode: {}", enumToSignedInt(opcode)));
}

requests_by_xid_[xid] = opcode;
}

void DecoderImpl::decodeOnWrite(Buffer::Instance& data, uint64_t& offset) {
ENVOY_LOG(trace, "zookeeper_proxy: decoding response with {} bytes at offset {}", data.length(),
offset);

// Check message length.
const int32_t len = helper_.peekInt32(data, offset);
ensureMinLength(len, INT_LENGTH + XID_LENGTH);
ensureMaxLength(len);

const auto xid = helper_.peekInt32(data, offset);
const auto xid_code = static_cast<XidCodes>(xid);

// Connect responses are special, they have no full reply header
// but just an XID with no zxid nor error fields like the ones
// available for all other server generated messages.
if (xid_code == XidCodes::CONNECT_XID) {
parseConnectResponse(data, offset, len);
return;
}

// Control responses that aren't connect, with XIDs <= 0.
const auto zxid = helper_.peekInt64(data, offset);
const auto error = helper_.peekInt32(data, offset);
switch (xid_code) {
case XidCodes::PING_XID:
callbacks_.onResponse(OpCodes::PING, xid, zxid, error);
return;
case XidCodes::AUTH_XID:
callbacks_.onResponse(OpCodes::SETAUTH, xid, zxid, error);
return;
case XidCodes::SET_WATCHES_XID:
callbacks_.onResponse(OpCodes::SETWATCHES, xid, zxid, error);
return;
case XidCodes::WATCH_XID:
parseWatchEvent(data, offset, len, zxid, error);
return;
default:
break;
}

// Find the corresponding request for this XID.
const auto it = requests_by_xid_.find(xid);

// If this fails, it's a server-side bug.
ASSERT(it != requests_by_xid_.end());

const auto opcode = it->second;
requests_by_xid_.erase(it);
Comment thread
rgs1 marked this conversation as resolved.
offset += (len - (XID_LENGTH + ZXID_LENGTH + INT_LENGTH));
callbacks_.onResponse(opcode, xid, zxid, error);
}

void DecoderImpl::ensureMinLength(const int32_t len, const int32_t minlen) const {
Expand All @@ -181,11 +231,7 @@ void DecoderImpl::parseConnect(Buffer::Instance& data, uint64_t& offset, uint32_
// Skip password.
skipString(data, offset);

// Read readonly flag, if it's there.
bool readonly{};
if (data.length() >= offset + 1) {
readonly = helper_.peekBool(data, offset);
}
const bool readonly = maybeReadBool(data, offset);

callbacks_.onConnect(readonly);
}
Expand Down Expand Up @@ -397,20 +443,74 @@ void DecoderImpl::skipStrings(Buffer::Instance& data, uint64_t& offset) {
}
}

void DecoderImpl::onData(Buffer::Instance& data) {
void DecoderImpl::onData(Buffer::Instance& data) { decode(data, DecodeType::READ); }

void DecoderImpl::onWrite(Buffer::Instance& data) { decode(data, DecodeType::WRITE); }

void DecoderImpl::decode(Buffer::Instance& data, DecodeType dtype) {
uint64_t offset = 0;

try {
while (offset < data.length()) {
// Reset the helper's cursor, to ensure the current message stays within the
// allowed max length, even when it's different than the declared length
// by the message.
//
// Note: we need to keep two cursors — offset and helper_'s internal one — because
// a buffer may contain multiple messages, so offset is global and helper_'s
// internal cursor is reset for each individual message.
helper_.reset();

const uint64_t current = offset;
decode(data, offset);
callbacks_.onRequestBytes(offset - current);
switch (dtype) {
case DecodeType::READ:
decodeOnData(data, offset);
callbacks_.onRequestBytes(offset - current);
Comment thread
rgs1 marked this conversation as resolved.
break;
case DecodeType::WRITE:
decodeOnWrite(data, offset);
callbacks_.onResponseBytes(offset - current);
break;
}
}
} catch (const EnvoyException& e) {
ENVOY_LOG(debug, "zookeeper_proxy: decoding exception {}", e.what());
callbacks_.onDecodeError();
}
}

void DecoderImpl::parseConnectResponse(Buffer::Instance& data, uint64_t& offset, uint32_t len) {
ensureMinLength(len, PROTOCOL_VERSION_LENGTH + TIMEOUT_LENGTH + SESSION_LENGTH + INT_LENGTH);

const auto timeout = helper_.peekInt32(data, offset);

// Skip session id + password.
offset += SESSION_LENGTH;
skipString(data, offset);

const bool readonly = maybeReadBool(data, offset);

callbacks_.onConnectResponse(0, timeout, readonly);
}

void DecoderImpl::parseWatchEvent(Buffer::Instance& data, uint64_t& offset, const uint32_t len,
const int64_t zxid, const int32_t error) {
ensureMinLength(len, SERVER_HEADER_LENGTH + (3 * INT_LENGTH));
Comment thread
rgs1 marked this conversation as resolved.

const auto event_type = helper_.peekInt32(data, offset);
const auto client_state = helper_.peekInt32(data, offset);
const auto path = helper_.peekString(data, offset);

callbacks_.onWatchEvent(event_type, client_state, path, zxid, error);
}

bool DecoderImpl::maybeReadBool(Buffer::Instance& data, uint64_t& offset) {
if (data.length() >= offset + 1) {
return helper_.peekBool(data, offset);
}
return false;
}

} // namespace ZooKeeperProxy
} // namespace NetworkFilters
} // namespace Extensions
Expand Down
18 changes: 17 additions & 1 deletion source/extensions/filters/network/zookeeper_proxy/decoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ class DecoderCallbacks {
virtual void onCheckWatchesRequest(const std::string& path, int32_t type) PURE;
virtual void onRemoveWatchesRequest(const std::string& path, int32_t type) PURE;
virtual void onCloseRequest() PURE;
virtual void onResponseBytes(uint64_t bytes) PURE;
virtual void onConnectResponse(int32_t proto_version, int32_t timeout, bool readonly) PURE;
virtual void onResponse(OpCodes opcode, int32_t xid, int64_t zxid, int32_t error) PURE;
virtual void onWatchEvent(int32_t event_type, int32_t client_state, const std::string& path,
int64_t zxid, int32_t error) PURE;
};

/**
Expand All @@ -105,6 +110,7 @@ class Decoder {
virtual ~Decoder() = default;

virtual void onData(Buffer::Instance& data) PURE;
virtual void onWrite(Buffer::Instance& data) PURE;
};

using DecoderPtr = std::unique_ptr<Decoder>;
Expand All @@ -116,9 +122,14 @@ class DecoderImpl : public Decoder, Logger::Loggable<Logger::Id::filter> {

// ZooKeeperProxy::Decoder
void onData(Buffer::Instance& data) override;
void onWrite(Buffer::Instance& data) override;

private:
void decode(Buffer::Instance& data, uint64_t& offset);
enum class DecodeType { READ, WRITE };

void decode(Buffer::Instance& data, DecodeType dtype);
void decodeOnData(Buffer::Instance& data, uint64_t& offset);
void decodeOnWrite(Buffer::Instance& data, uint64_t& offset);
void parseConnect(Buffer::Instance& data, uint64_t& offset, uint32_t len);
void parseAuthRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len);
void parseGetDataRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len);
Expand All @@ -140,10 +151,15 @@ class DecoderImpl : public Decoder, Logger::Loggable<Logger::Id::filter> {
void ensureMinLength(int32_t len, int32_t minlen) const;
void ensureMaxLength(int32_t len) const;
std::string pathOnlyRequest(Buffer::Instance& data, uint64_t& offset, uint32_t len);
void parseConnectResponse(Buffer::Instance& data, uint64_t& offset, uint32_t len);
void parseWatchEvent(Buffer::Instance& data, uint64_t& offset, uint32_t len, int64_t zxid,
int32_t error);
bool maybeReadBool(Buffer::Instance& data, uint64_t& offset);

DecoderCallbacks& callbacks_;
const uint32_t max_packet_bytes_;
BufferHelper helper_;
std::unordered_map<int32_t, OpCodes> requests_by_xid_;
};

} // namespace ZooKeeperProxy
Expand Down
Loading