Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
4ec6b33
WIP: Kafka codec
adamkotwasinski Sep 25, 2018
aa51c6b
Fix compile error after rebases
adamkotwasinski Nov 3, 2018
074f868
Remove all request types except OffsetCommit v0..v1 (for review)
adamkotwasinski Nov 14, 2018
f341df7
Remove bytes buffers (for review - unused in these requests)
adamkotwasinski Nov 14, 2018
12ece4f
Apply review fixes
adamkotwasinski Nov 14, 2018
431a64c
Apply review fixes
adamkotwasinski Nov 15, 2018
ac5f851
Introduce composite deserializers for 2, 3, 4 delegates
adamkotwasinski Nov 19, 2018
8cb67ee
Review fixes:
adamkotwasinski Nov 20, 2018
7e3f54c
Apply review fixes:
adamkotwasinski Nov 27, 2018
d0534c3
Introduce generated code for Kafka requests:
adamkotwasinski Jan 10, 2019
db7763e
Add python tool for generating Kafka request classes; keep all versio…
adamkotwasinski Nov 27, 2018
00aac5b
In case of parse errors, consume rest of request properly; apply clan…
adamkotwasinski Feb 21, 2019
255f120
Merge remote-tracking branch 'envoy/master' into mergetest
adamkotwasinski Feb 25, 2019
bbf0e08
Fix formatting and clang-tidy
adamkotwasinski Feb 26, 2019
76cc44f
Fix spelling
adamkotwasinski Feb 26, 2019
6f86d76
Fixes after review: string_view used instead of raw pointers; documen…
adamkotwasinski Mar 11, 2019
23c1b9d
Merge remote-tracking branch 'envoy/master' into HEAD
adamkotwasinski Mar 14, 2019
f369bfa
Fixes: spelling, clang-tidy, documentation, cleaning up internal API;…
adamkotwasinski Mar 19, 2019
28641b4
Download whole Kafka specification; test fixes
adamkotwasinski Mar 19, 2019
3b9e324
Activate kafka tests in builds; review fixes: documentation, formatting
adamkotwasinski Mar 22, 2019
89c1a59
Attempt to point to Kafka codec in extension build; fix buffer overfl…
adamkotwasinski Mar 22, 2019
acdbcb3
Explicitly provide type of test values in generated Kafka tests
adamkotwasinski Mar 22, 2019
f5f5f32
Reorganize test code
adamkotwasinski Mar 25, 2019
20c7bf1
Create separate test class for each of Kafka tests; add missing forma…
adamkotwasinski Mar 26, 2019
81c97c8
Put Kafka tests in dedicated namespaces to avoid duplicate mock class…
adamkotwasinski Mar 26, 2019
130d2ac
Merge remote-tracking branch 'envoy/master' into codec
adamkotwasinski Mar 27, 2019
d47c2c2
Kick CI
adamkotwasinski Mar 28, 2019
145f28d
Kick CI
adamkotwasinski Mar 28, 2019
11d9288
Kick CI
adamkotwasinski Mar 28, 2019
ebe2134
Merge remote-tracking branch 'envoy/master' into codec
adamkotwasinski Apr 15, 2019
7f1abca
Put generated files in directories named 'external', so they do not g…
adamkotwasinski Apr 25, 2019
36f76c7
Add missing test to NullableArrayDeserializer
adamkotwasinski Apr 25, 2019
98c7752
Refactoring:
adamkotwasinski Apr 25, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions api/bazel/repositories.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ def api_dependencies():
locations = REPOSITORY_LOCATIONS,
build_file_content = OPENCENSUSTRACE_BUILD_CONTENT,
)
envoy_http_archive(
name = "kafka_source",
locations = REPOSITORY_LOCATIONS,
build_file_content = KAFKASOURCE_BUILD_CONTENT,
)

GOOGLEAPIS_BUILD_CONTENT = """
load("@com_google_protobuf//:protobuf.bzl", "cc_proto_library", "py_proto_library")
Expand Down Expand Up @@ -285,3 +290,15 @@ go_proto_library(
visibility = ["//visibility:public"],
)
"""

KAFKASOURCE_BUILD_CONTENT = """

filegroup(
name = "request_protocol_files",
srcs = glob([
"*Request.json",
]),
visibility = ["//visibility:public"],
)

"""
7 changes: 7 additions & 0 deletions api/bazel/repository_locations.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ GOOGLEAPIS_SHA = "16f5b2e8bf1e747a32f9a62e211f8f33c94645492e9bbd72458061d9a9de1f
PROMETHEUS_GIT_SHA = "99fa1f4be8e564e8a6b613da7fa6f46c9edafc6c" # Nov 17, 2017
PROMETHEUS_SHA = "783bdaf8ee0464b35ec0c8704871e1e72afa0005c3f3587f65d9d6694bf3911b"

KAFKA_SOURCE_SHA = "ae7a1696c0a0302b43c5b21e515c37e6ecd365941f68a510a7e442eebddf39a1" # 2.2.0-rc2

REPOSITORY_LOCATIONS = dict(
bazel_skylib = dict(
sha256 = BAZEL_SKYLIB_SHA256,
Expand Down Expand Up @@ -48,4 +50,9 @@ REPOSITORY_LOCATIONS = dict(
strip_prefix = "opencensus-proto-" + OPENCENSUS_RELEASE + "/src/opencensus/proto/trace/v1",
urls = ["https://github.com/census-instrumentation/opencensus-proto/archive/v" + OPENCENSUS_RELEASE + ".tar.gz"],
),
kafka_source = dict(
sha256 = KAFKA_SOURCE_SHA,
strip_prefix = "kafka-2.2.0-rc2/clients/src/main/resources/common/message",
urls = ["https://github.com/apache/kafka/archive/2.2.0-rc2.zip"],
),
)
1 change: 1 addition & 0 deletions source/common/common/logger.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ namespace Logger {
FUNCTION(http2) \
FUNCTION(hystrix) \
FUNCTION(init) \
FUNCTION(kafka) \
FUNCTION(lua) \
FUNCTION(main) \
FUNCTION(misc) \
Expand Down
3 changes: 3 additions & 0 deletions source/extensions/extensions_build_config.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ EXTENSIONS = {
"envoy.filters.network.echo": "//source/extensions/filters/network/echo:config",
"envoy.filters.network.ext_authz": "//source/extensions/filters/network/ext_authz:config",
"envoy.filters.network.http_connection_manager": "//source/extensions/filters/network/http_connection_manager:config",
# NOTE: The Kafka filter does not have a proper filter implemented right now. We are referencing
# the codec implementation that is going to be used by the filter.
"envoy.filters.network.kafka": "//source/extensions/filters/network/kafka:kafka_request_codec_lib",
"envoy.filters.network.mongo_proxy": "//source/extensions/filters/network/mongo_proxy:config",
"envoy.filters.network.mysql_proxy": "//source/extensions/filters/network/mysql_proxy:config",
"envoy.filters.network.ratelimit": "//source/extensions/filters/network/ratelimit:config",
Expand Down
137 changes: 137 additions & 0 deletions source/extensions/filters/network/kafka/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
licenses(["notice"])  # Apache 2

# Kafka network filter.
# Public docs: docs/root/configuration/network_filters/kafka_filter.rst

load(
    "//bazel:envoy_build_system.bzl",
    "envoy_cc_library",
    "envoy_package",
)

envoy_package()

# Codec entry point: wires the parser machinery to Envoy buffers.
envoy_cc_library(
    name = "kafka_request_codec_lib",
    srcs = ["request_codec.cc"],
    hdrs = [
        "codec.h",
        "request_codec.h",
    ],
    deps = [
        ":kafka_request_parser_lib",
        "//source/common/buffer:buffer_lib",
    ],
)

# Request parsing; includes the generated resolver (see :kafka_generated_source).
envoy_cc_library(
    name = "kafka_request_parser_lib",
    srcs = [
        "external/kafka_request_resolver.cc",
        "kafka_request_parser.cc",
    ],
    hdrs = [
        "external/requests.h",
        "kafka_request_parser.h",
    ],
    deps = [
        ":kafka_request_lib",
        ":parser_lib",
        "//source/common/common:assert_lib",
        "//source/common/common:minimal_logger_lib",
    ],
)

# Abstract request representation (header + typed payload).
envoy_cc_library(
    name = "kafka_request_lib",
    srcs = [
    ],
    hdrs = [
        "kafka_request.h",
    ],
    deps = [
        ":serialization_lib",
    ],
)

# Generates request classes from the Kafka protocol JSON specification files.
genrule(
    name = "kafka_generated_source",
    srcs = [
        "@kafka_source//:request_protocol_files",
    ],
    outs = [
        "external/requests.h",
        "external/kafka_request_resolver.cc",
    ],
    cmd = """
./$(location :kafka_code_generator) generate-source \
$(location external/requests.h) $(location external/kafka_request_resolver.cc) \
$(SRCS)
""",
    tools = [
        ":kafka_code_generator",
    ],
)

py_binary(
    name = "kafka_code_generator",
    srcs = ["protocol_code_generator/kafka_generator.py"],
    data = glob(["protocol_code_generator/*.j2"]),
    main = "protocol_code_generator/kafka_generator.py",
    deps = ["@com_github_pallets_jinja//:jinja2"],
)

envoy_cc_library(
    name = "parser_lib",
    hdrs = ["parser.h"],
    deps = [
        "//source/common/common:minimal_logger_lib",
    ],
)

# (De)serialization primitives plus the generated composite deserializers.
envoy_cc_library(
    name = "serialization_lib",
    hdrs = [
        "external/serialization_composite.h",
        "serialization.h",
    ],
    deps = [
        ":kafka_types_lib",
        "//include/envoy/buffer:buffer_interface",
        "//source/common/common:byte_order_lib",
    ],
)

genrule(
    name = "serialization_composite_generated_source",
    srcs = [],
    outs = [
        "external/serialization_composite.h",
    ],
    cmd = """
./$(location :serialization_composite_generator) generate-source \
$(location external/serialization_composite.h)
""",
    tools = [
        ":serialization_composite_generator",
    ],
)

py_binary(
    name = "serialization_composite_generator",
    srcs = ["serialization_code_generator/serialization_composite_generator.py"],
    data = glob(["serialization_code_generator/*.j2"]),
    main = "serialization_code_generator/serialization_composite_generator.py",
    deps = ["@com_github_pallets_jinja//:jinja2"],
)

envoy_cc_library(
    name = "kafka_types_lib",
    hdrs = [
        "kafka_types.h",
    ],
    external_deps = ["abseil_optional"],
    deps = [
        "//source/common/common:macros",
    ],
)
43 changes: 43 additions & 0 deletions source/extensions/filters/network/kafka/codec.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#pragma once

#include "envoy/buffer/buffer.h"
#include "envoy/common/pure.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace Kafka {

/**
 * Kafka message decoder interface.
 * Implementations are fed raw wire bytes via onData and are responsible for
 * extracting messages from them.
 */
class MessageDecoder {
public:
  virtual ~MessageDecoder() = default;

  /**
   * Processes given buffer attempting to decode messages contained within.
   * The buffer is non-const, so implementations may drain the bytes they consume.
   * @param data buffer instance with bytes received from the wire.
   */
  virtual void onData(Buffer::Instance& data) PURE;
};

/**
 * Kafka message encoder interface.
 * @param MessageType encoded message type (request or response).
 */
template <typename MessageType> class MessageEncoder {
public:
  virtual ~MessageEncoder() = default;

  /**
   * Encodes given message.
   * Returns nothing: where the serialized bytes end up (e.g. an output buffer)
   * is determined by the implementation.
   * @param message message to be encoded.
   */
  virtual void encode(const MessageType& message) PURE;
};

} // namespace Kafka
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
112 changes: 112 additions & 0 deletions source/extensions/filters/network/kafka/kafka_request.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
#pragma once

#include "envoy/common/exception.h"

#include "extensions/filters/network/kafka/external/serialization_composite.h"
#include "extensions/filters/network/kafka/serialization.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace Kafka {

/**
 * Represents fields that are present in every Kafka request message.
 * @see http://kafka.apache.org/protocol.html#protocol_messages
 */
struct RequestHeader {
  int16_t api_key_;        // Identifies the request type.
  int16_t api_version_;    // Version of the request type.
  int32_t correlation_id_; // Client-chosen id, echoed back in the response.
  NullableString client_id_;

  // Field-wise equality (used by tests and request comparison).
  bool operator==(const RequestHeader& rhs) const {
    return api_key_ == rhs.api_key_ && api_version_ == rhs.api_version_ &&
           correlation_id_ == rhs.correlation_id_ && client_id_ == rhs.client_id_;
  } // Fixed: removed redundant ';' after member function body.
};

/**
 * Carries information that could be extracted during the failed parse.
 * Currently only the request header (which must have parsed successfully for
 * this object to be constructed) is captured.
 */
class RequestParseFailure {
public:
  /**
   * @param request_header header of the request whose payload failed to parse.
   */
  RequestParseFailure(const RequestHeader& request_header) : request_header_{request_header} {}
  // Fixed: removed redundant ';' after constructor body.

  /**
   * Request's header.
   */
  const RequestHeader request_header_;
};

typedef std::shared_ptr<RequestParseFailure> RequestParseFailureSharedPtr;

/**
 * Abstract Kafka request.
 * Contains data present in every request (the header with request key, version, etc.).
 * @see http://kafka.apache.org/protocol.html#protocol_messages
 */
class AbstractRequest {
public:
  virtual ~AbstractRequest() = default;

  /**
   * Constructs a request with given header data.
   * @param request_header request's header.
   */
  AbstractRequest(const RequestHeader& request_header) : request_header_{request_header} {}
  // Fixed: removed redundant ';' after constructor body.

  /**
   * Encode the contents of this message into a given buffer.
   * @param dst buffer instance to keep serialized message.
   * @return number of bytes written.
   */
  virtual size_t encode(Buffer::Instance& dst) const PURE;

  /**
   * Request's header.
   */
  const RequestHeader request_header_;
};

typedef std::shared_ptr<AbstractRequest> AbstractRequestSharedPtr;

/**
 * Concrete request that carries data particular to given request type.
 * @param Data concrete request data type.
 */
template <typename Data> class Request : public AbstractRequest {
public:
  /**
   * Request header fields need to be initialized by user in case of newly created requests.
   * @param request_header header shared by all request types.
   * @param data request-type-specific payload.
   */
  Request(const RequestHeader& request_header, const Data& data)
      : AbstractRequest{request_header}, data_{data} {}
  // Fixed: removed redundant ';' after constructor body.

  /**
   * Encodes given request into a buffer, with any extra configuration carried by the context.
   * @param dst buffer instance to keep serialized message.
   * @return total number of bytes written (header + payload).
   */
  size_t encode(Buffer::Instance& dst) const override {
    // The context carries the api version, as payload encoding may be version-dependent.
    EncodingContext context{request_header_.api_version_};
    size_t written{0};
    // Encode request header.
    written += context.encode(request_header_.api_key_, dst);
    written += context.encode(request_header_.api_version_, dst);
    written += context.encode(request_header_.correlation_id_, dst);
    written += context.encode(request_header_.client_id_, dst);
    // Encode request-specific data.
    written += context.encode(data_, dst);
    return written;
  }

  // Equal when both the header and the payload compare equal.
  bool operator==(const Request<Data>& rhs) const {
    return request_header_ == rhs.request_header_ && data_ == rhs.data_;
  } // Fixed: removed redundant ';' after member function body.

private:
  const Data data_;
};

} // namespace Kafka
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
Loading