Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions source/extensions/filters/network/kafka/mesh/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
load(
"//bazel:envoy_build_system.bzl",
"envoy_cc_library",
"envoy_extension_package",
)

licenses(["notice"]) # Apache 2

# Kafka-mesh network filter.

envoy_extension_package()

envoy_cc_library(
name = "filter_lib",
srcs = ["filter.cc"],
hdrs = [
"filter.h",
],
tags = ["skip_on_windows"],
deps = [
":abstract_command_lib",
":request_processor_lib",
"//envoy/buffer:buffer_interface",
"//envoy/network:connection_interface",
"//envoy/network:filter_interface",
"//source/common/common:assert_lib",
"//source/common/common:minimal_logger_lib",
"//source/extensions/filters/network/kafka:kafka_request_codec_lib",
"//source/extensions/filters/network/kafka:kafka_response_codec_lib",
],
)

envoy_cc_library(
name = "request_processor_lib",
srcs = [
"request_processor.cc",
],
hdrs = [
"request_processor.h",
],
tags = ["skip_on_windows"],
deps = [
":abstract_command_lib",
"//source/common/common:minimal_logger_lib",
"//source/extensions/filters/network/kafka:kafka_request_codec_lib",
"//source/extensions/filters/network/kafka:kafka_request_parser_lib",
],
)

envoy_cc_library(
name = "abstract_command_lib",
srcs = [
"abstract_command.cc",
],
hdrs = [
"abstract_command.h",
],
tags = ["skip_on_windows"],
deps = [
"//source/common/common:minimal_logger_lib",
"//source/extensions/filters/network/kafka:kafka_response_lib",
"//source/extensions/filters/network/kafka:tagged_fields_lib",
],
)
28 changes: 28 additions & 0 deletions source/extensions/filters/network/kafka/mesh/abstract_command.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#include "source/extensions/filters/network/kafka/mesh/abstract_command.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace Kafka {
namespace Mesh {

void BaseInFlightRequest::abandon() {
ENVOY_LOG(trace, "Abandoning request");
filter_active_ = false;
}

void BaseInFlightRequest::notifyFilter() {
if (filter_active_) {
ENVOY_LOG(trace, "Notifying filter for request");
filter_.onRequestReadyForAnswer();
} else {
ENVOY_LOG(trace, "Request has been finished, but we are not doing anything, because filter has "
"been already destroyed");
}
}

} // namespace Mesh
} // namespace Kafka
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
97 changes: 97 additions & 0 deletions source/extensions/filters/network/kafka/mesh/abstract_command.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
#pragma once

#include "source/common/common/logger.h"
#include "source/extensions/filters/network/kafka/kafka_response.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace Kafka {
namespace Mesh {

/**
* Represents single downstream client request.
* Responsible for performing the work on multiple upstream clusters and aggregating the results.
*/
class InFlightRequest {
Copy link
Contributor Author

@adamkotwasinski adamkotwasinski Jul 19, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implementations of this interface will provide the proper handling to provide kafka-mesh features.
To have basic compliance with protocol we will require to handle the following types (these are links to stuff that will appear in "future PRs"):

public:
virtual ~InFlightRequest() = default;

/**
* Begins processing of given request with context provided.
*/
virtual void startProcessing() PURE;

/**
* Whether the given request has finished processing.
* E.g. produce requests need to be forwarded upstream and get a response from Kafka cluster for
* this to be true.
*/
virtual bool finished() const PURE;

/**
* Creates a Kafka answer object that can be sent downstream.
*/
virtual AbstractResponseSharedPtr computeAnswer() const PURE;

/**
* Abandon this request.
* In-flight requests that have been abandoned are not going to cause any action after they have
* finished processing.
*/
virtual void abandon() PURE;
};

using InFlightRequestSharedPtr = std::shared_ptr<InFlightRequest>;

/**
* Callback to be implemented by entities that are interested when the request has finished and has
* answer ready.
*/
// Impl note: Filter implements this interface to keep track of requests coming to it.
class AbstractRequestListener {
public:
virtual ~AbstractRequestListener() = default;

// Notifies the listener that a new request has been received.
virtual void onRequest(InFlightRequestSharedPtr request) PURE;

// Notified the listener, that the request finally has an answer ready.
// Usually this means that the request has been sent to upstream Kafka clusters and we got answers
// (unless it's something that could be responded to locally).
// IMPL: we do not need to pass request here, as filters need to answer in-order.
// What means that we always need to check if first answer is ready, even if the latter are
// already finished.
virtual void onRequestReadyForAnswer() PURE;
};

/**
* Helper base class for all in flight requests.
* Binds request to its origin filter.
*/
class BaseInFlightRequest : public InFlightRequest, protected Logger::Loggable<Logger::Id::kafka> {
public:
BaseInFlightRequest(AbstractRequestListener& filter) : filter_{filter} {};
void abandon() override;

protected:
/**
* Notify the originating filter that this request has an answer ready.
* This method is to be invoked by each request after it has finished processing.
* Obviously, if the filter is no longer active (connection got closed before we were ready to
* answer) nothing will happen.
*/
void notifyFilter();

// Filter that originated this request.
AbstractRequestListener& filter_;

// Whether the filter_ reference is still viable.
bool filter_active_ = true;
};

} // namespace Mesh
} // namespace Kafka
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
102 changes: 102 additions & 0 deletions source/extensions/filters/network/kafka/mesh/filter.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
#include "source/extensions/filters/network/kafka/mesh/filter.h"

#include "envoy/network/connection.h"

#include "source/common/buffer/buffer_impl.h"
#include "source/extensions/filters/network/kafka/external/requests.h"
#include "source/extensions/filters/network/kafka/external/responses.h"
#include "source/extensions/filters/network/kafka/response_codec.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace Kafka {
namespace Mesh {

KafkaMeshFilter::KafkaMeshFilter(RequestDecoderSharedPtr request_decoder)
: request_decoder_{request_decoder} {}

KafkaMeshFilter::~KafkaMeshFilter() { abandonAllInFlightRequests(); }

Network::FilterStatus KafkaMeshFilter::onNewConnection() { return Network::FilterStatus::Continue; }

void KafkaMeshFilter::initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) {
read_filter_callbacks_ = &callbacks;
read_filter_callbacks_->connection().addConnectionCallbacks(*this);
}

Network::FilterStatus KafkaMeshFilter::onData(Buffer::Instance& data, bool) {
try {
request_decoder_->onData(data);
data.drain(data.length()); // All the bytes have been copied to decoder.
return Network::FilterStatus::StopIteration;
} catch (const EnvoyException& e) {
ENVOY_LOG(trace, "Could not process data from Kafka client: {}", e.what());
request_decoder_->reset();
// Something very wrong occurred, let's just close the connection.
read_filter_callbacks_->connection().close(Network::ConnectionCloseType::FlushWrite);
return Network::FilterStatus::StopIteration;
}
}

void KafkaMeshFilter::onEvent(Network::ConnectionEvent event) {
if (Network::ConnectionEvent::RemoteClose == event ||
Network::ConnectionEvent::LocalClose == event) {
// Connection is being closed but there might be some requests in flight, abandon them.
abandonAllInFlightRequests();
}
}

void KafkaMeshFilter::onAboveWriteBufferHighWatermark() {}

void KafkaMeshFilter::onBelowWriteBufferLowWatermark() {}

/**
* We have received a request we can actually process.
*/
void KafkaMeshFilter::onRequest(InFlightRequestSharedPtr request) {
requests_in_flight_.push_back(request);
request->startProcessing();
}

/**
* Our filter has been notified that a request that originated in this filter has an answer ready.
* Because the Kafka messages have ordering, we need to check all messages and can possibly send
* multiple answers in one go. This can happen if e.g. message 3 finishes first, then 2, then 1,
* what allows us to send 1, 2, 3 in one invocation.
*/
void KafkaMeshFilter::onRequestReadyForAnswer() {
while (!requests_in_flight_.empty()) {
InFlightRequestSharedPtr rq = requests_in_flight_.front();
if (rq->finished()) {
// The request has been finished, so we no longer need to store it.
requests_in_flight_.erase(requests_in_flight_.begin());

// And write the response downstream.
const AbstractResponseSharedPtr response = rq->computeAnswer();
Buffer::OwnedImpl buffer;
ResponseEncoder encoder{buffer};
encoder.encode(*response);
read_filter_callbacks_->connection().write(buffer, false);
} else {
break;
}
}
}

void KafkaMeshFilter::abandonAllInFlightRequests() {
for (const auto& request : requests_in_flight_) {
request->abandon();
}
requests_in_flight_.erase(requests_in_flight_.begin(), requests_in_flight_.end());
}

std::list<InFlightRequestSharedPtr>& KafkaMeshFilter::getRequestsInFlightForTest() {
return requests_in_flight_;
}

} // namespace Mesh
} // namespace Kafka
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
83 changes: 83 additions & 0 deletions source/extensions/filters/network/kafka/mesh/filter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
#pragma once

#include "envoy/common/time.h"
#include "envoy/network/connection.h"
#include "envoy/network/filter.h"
#include "envoy/stats/scope.h"

#include "source/common/common/logger.h"
#include "source/extensions/filters/network/kafka/external/requests.h"
#include "source/extensions/filters/network/kafka/mesh/abstract_command.h"
#include "source/extensions/filters/network/kafka/mesh/request_processor.h"
#include "source/extensions/filters/network/kafka/request_codec.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace Kafka {
namespace Mesh {
/**
* Main entry point.
* Decoded request bytes are passed to processor, that calls us back with enriched request.
* Request then gets invoked with upstream Kafka facade, which will (in future) maintain
* thread-local list of (enriched) Kafka producers. Filter is going to maintain a list of
* in-flight-request so it can send responses when they finish.
*
*
* +----------------+ <creates> +-----------------------+
* |RequestProcessor+----------------->AbstractInFlightRequest|
* +-------^--------+ +------^----------------+
* | |
* | |
* +-------+-------+ <in-flight-reference> |
* |KafkaMeshFilter+-------------------------+
* +-------+-------+
**/
class KafkaMeshFilter : public Network::ReadFilter,
public Network::ConnectionCallbacks,
public AbstractRequestListener,
private Logger::Loggable<Logger::Id::kafka> {
public:
// Visible for testing.
KafkaMeshFilter(RequestDecoderSharedPtr request_decoder);

// Non-trivial. See 'abandonAllInFlightRequests'.
~KafkaMeshFilter() override;

// Network::ReadFilter
Network::FilterStatus onNewConnection() override;
void initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) override;
Network::FilterStatus onData(Buffer::Instance& data, bool end_stream) override;

// Network::ConnectionCallbacks
void onEvent(Network::ConnectionEvent event) override;
void onAboveWriteBufferHighWatermark() override;
void onBelowWriteBufferLowWatermark() override;

// AbstractRequestListener
void onRequest(InFlightRequestSharedPtr request) override;
void onRequestReadyForAnswer() override;

std::list<InFlightRequestSharedPtr>& getRequestsInFlightForTest();

private:
// Helper method invoked when connection gets dropped.
// Request references are going to be stored in 2 places: this filter (request's origin) and in
// UpstreamKafkaClient instances (to match pure-Kafka confirmations to the requests). Because
// filter can be destroyed before confirmations from Kafka are received, we are just going to mark
// related requests as abandoned, so they do not attempt to reference this filter anymore.
// Impl note: this is similar to what Redis filter does.
void abandonAllInFlightRequests();

const RequestDecoderSharedPtr request_decoder_;

Network::ReadFilterCallbacks* read_filter_callbacks_;

std::list<InFlightRequestSharedPtr> requests_in_flight_;
};

} // namespace Mesh
} // namespace Kafka
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
Loading