Skip to content
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
4ec6b33
WIP: Kafka codec
adamkotwasinski Sep 25, 2018
aa51c6b
Fix compile error after rebases
adamkotwasinski Nov 3, 2018
074f868
Remove all request types except OffsetCommit v0..v1 (for review)
adamkotwasinski Nov 14, 2018
f341df7
Remove bytes buffers (for review - unused in these requests)
adamkotwasinski Nov 14, 2018
12ece4f
Apply review fixes
adamkotwasinski Nov 14, 2018
431a64c
Apply review fixes
adamkotwasinski Nov 15, 2018
ac5f851
Introduce composite deserializers for 2, 3, 4 delegates
adamkotwasinski Nov 19, 2018
8cb67ee
Review fixes:
adamkotwasinski Nov 20, 2018
7e3f54c
Apply review fixes:
adamkotwasinski Nov 27, 2018
d0534c3
Introduce generated code for Kafka requests:
adamkotwasinski Jan 10, 2019
db7763e
Add python tool for generating Kafka request classes; keep all versio…
adamkotwasinski Nov 27, 2018
00aac5b
In case of parse errors, consume rest of request properly; apply clan…
adamkotwasinski Feb 21, 2019
255f120
Merge remote-tracking branch 'envoy/master' into mergetest
adamkotwasinski Feb 25, 2019
bbf0e08
Fix formatting and clang-tidy
adamkotwasinski Feb 26, 2019
76cc44f
Fix spelling
adamkotwasinski Feb 26, 2019
6f86d76
Fixes after review: string_view used instead of raw pointers; documen…
adamkotwasinski Mar 11, 2019
23c1b9d
Merge remote-tracking branch 'envoy/master' into HEAD
adamkotwasinski Mar 14, 2019
f369bfa
Fixes: spelling, clang-tidy, documentation, cleaning up internal API;…
adamkotwasinski Mar 19, 2019
28641b4
Download whole Kafka specification; test fixes
adamkotwasinski Mar 19, 2019
3b9e324
Activate kafka tests in builds; review fixes: documentation, formatting
adamkotwasinski Mar 22, 2019
89c1a59
Attempt to point to Kafka codec in extension build; fix buffer overfl…
adamkotwasinski Mar 22, 2019
acdbcb3
Explicitly provide type of test values in generated Kafka tests
adamkotwasinski Mar 22, 2019
f5f5f32
Reorganize test code
adamkotwasinski Mar 25, 2019
20c7bf1
Create separate test class for each of Kafka tests; add missing forma…
adamkotwasinski Mar 26, 2019
81c97c8
Put Kafka tests in dedicated namespaces to avoid duplicate mock class…
adamkotwasinski Mar 26, 2019
130d2ac
Merge remote-tracking branch 'envoy/master' into codec
adamkotwasinski Mar 27, 2019
d47c2c2
Kick CI
adamkotwasinski Mar 28, 2019
145f28d
Kick CI
adamkotwasinski Mar 28, 2019
11d9288
Kick CI
adamkotwasinski Mar 28, 2019
ebe2134
Merge remote-tracking branch 'envoy/master' into codec
adamkotwasinski Apr 15, 2019
7f1abca
Put generated files in directories named 'external', so they do not g…
adamkotwasinski Apr 25, 2019
36f76c7
Add missing test to NullableArrayDeserializer
adamkotwasinski Apr 25, 2019
98c7752
Refactoring:
adamkotwasinski Apr 25, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions api/bazel/repositories.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ def api_dependencies():
locations = REPOSITORY_LOCATIONS,
build_file_content = OPENCENSUSTRACE_BUILD_CONTENT,
)
envoy_http_archive(
name = "kafka_source",
locations = REPOSITORY_LOCATIONS,
build_file_content = KAFKASOURCE_BUILD_CONTENT,
)

GOOGLEAPIS_BUILD_CONTENT = """
load("@com_google_protobuf//:protobuf.bzl", "cc_proto_library", "py_proto_library")
Expand Down Expand Up @@ -285,3 +290,15 @@ go_proto_library(
visibility = ["//visibility:public"],
)
"""

KAFKASOURCE_BUILD_CONTENT = """

filegroup(
name = "request_protocol_files",
srcs = glob([
"*Request.json",
]),
visibility = ["//visibility:public"],
)

"""
7 changes: 7 additions & 0 deletions api/bazel/repository_locations.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ GOOGLEAPIS_SHA = "16f5b2e8bf1e747a32f9a62e211f8f33c94645492e9bbd72458061d9a9de1f
PROMETHEUS_GIT_SHA = "99fa1f4be8e564e8a6b613da7fa6f46c9edafc6c" # Nov 17, 2017
PROMETHEUS_SHA = "783bdaf8ee0464b35ec0c8704871e1e72afa0005c3f3587f65d9d6694bf3911b"

KAFKA_SOURCE_SHA = "ae7a1696c0a0302b43c5b21e515c37e6ecd365941f68a510a7e442eebddf39a1" # 2.2.0-rc2

REPOSITORY_LOCATIONS = dict(
bazel_skylib = dict(
sha256 = BAZEL_SKYLIB_SHA256,
Expand Down Expand Up @@ -48,4 +50,9 @@ REPOSITORY_LOCATIONS = dict(
strip_prefix = "opencensus-proto-" + OPENCENSUS_RELEASE + "/src/opencensus/proto/trace/v1",
urls = ["https://github.com/census-instrumentation/opencensus-proto/archive/v" + OPENCENSUS_RELEASE + ".tar.gz"],
),
kafka_source = dict(
sha256 = KAFKA_SOURCE_SHA,
strip_prefix = "kafka-2.2.0-rc2/clients/src/main/resources/common/message",
urls = ["https://github.com/apache/kafka/archive/2.2.0-rc2.zip"],
),
)
1 change: 1 addition & 0 deletions source/common/common/logger.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ namespace Logger {
FUNCTION(http2) \
FUNCTION(hystrix) \
FUNCTION(init) \
FUNCTION(kafka) \
FUNCTION(lua) \
FUNCTION(main) \
FUNCTION(misc) \
Expand Down
3 changes: 3 additions & 0 deletions source/extensions/extensions_build_config.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ EXTENSIONS = {
"envoy.filters.network.echo": "//source/extensions/filters/network/echo:config",
"envoy.filters.network.ext_authz": "//source/extensions/filters/network/ext_authz:config",
"envoy.filters.network.http_connection_manager": "//source/extensions/filters/network/http_connection_manager:config",
# NOTE: Kafka filter does not have a proper filter implemented right now. We are referencing to
# codec implementation that is going to be used by the filter.
"envoy.filters.network.kafka": "//source/extensions/filters/network/kafka:kafka_request_codec_lib",
"envoy.filters.network.mongo_proxy": "//source/extensions/filters/network/mongo_proxy:config",
"envoy.filters.network.mysql_proxy": "//source/extensions/filters/network/mysql_proxy:config",
"envoy.filters.network.ratelimit": "//source/extensions/filters/network/ratelimit:config",
Expand Down
139 changes: 139 additions & 0 deletions source/extensions/filters/network/kafka/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
licenses(["notice"]) # Apache 2

# Kafka network filter.
# Public docs: docs/root/configuration/network_filters/kafka_filter.rst

load(
"//bazel:envoy_build_system.bzl",
"envoy_cc_library",
"envoy_package",
)

envoy_package()

envoy_cc_library(
name = "kafka_request_codec_lib",
srcs = ["request_codec.cc"],
hdrs = [
"codec.h",
"request_codec.h",
],
deps = [
":kafka_request_lib",
":message_lib",
"//source/common/buffer:buffer_lib",
],
)

envoy_cc_library(
name = "kafka_request_lib",
srcs = [
"kafka_request_parser.cc",
"kafka_request_resolver.cc",
],
hdrs = [
"kafka_request.h",
"kafka_request_parser.h",
"requests.h",
],
deps = [
":parser_lib",
":serialization_lib",
"//source/common/common:assert_lib",
"//source/common/common:minimal_logger_lib",
],
)

genrule(
name = "kafka_generated_source",
srcs = [
"@kafka_source//:request_protocol_files",
],
outs = [
"requests.h",
"kafka_request_resolver.cc",
],
cmd = """
./$(location :kafka_code_generator) generate-source \
$(location requests.h) $(location kafka_request_resolver.cc) \
$(SRCS)
""",
tools = [
":kafka_code_generator",
],
)

py_binary(
name = "kafka_code_generator",
srcs = ["protocol_code_generator/kafka_generator.py"],
data = glob(["protocol_code_generator/*.j2"]),
main = "protocol_code_generator/kafka_generator.py",
deps = ["@com_github_pallets_jinja//:jinja2"],
)

envoy_cc_library(
name = "parser_lib",
hdrs = ["parser.h"],
deps = [
":kafka_protocol_lib",
":message_lib",
"//source/common/common:minimal_logger_lib",
],
)

envoy_cc_library(
name = "message_lib",
hdrs = [
"message.h",
],
deps = [
"//include/envoy/buffer:buffer_interface",
],
)

envoy_cc_library(
name = "serialization_lib",
hdrs = [
"serialization.h",
"serialization_composite.h",
],
deps = [
":kafka_protocol_lib",
"//include/envoy/buffer:buffer_interface",
"//source/common/common:byte_order_lib",
],
)

genrule(
name = "serialization_composite_generated_source",
srcs = [],
outs = [
"serialization_composite.h",
],
cmd = """
./$(location :serialization_composite_generator) generate-source \
$(location serialization_composite.h)
""",
tools = [
":serialization_composite_generator",
],
)

py_binary(
name = "serialization_composite_generator",
srcs = ["serialization_code_generator/serialization_composite_generator.py"],
data = glob(["serialization_code_generator/*.j2"]),
main = "serialization_code_generator/serialization_composite_generator.py",
deps = ["@com_github_pallets_jinja//:jinja2"],
)

envoy_cc_library(
name = "kafka_protocol_lib",
hdrs = [
"kafka_types.h",
],
external_deps = ["abseil_optional"],
deps = [
"//source/common/common:macros",
],
)
44 changes: 44 additions & 0 deletions source/extensions/filters/network/kafka/codec.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#pragma once

#include "envoy/buffer/buffer.h"
#include "envoy/common/pure.h"

#include "extensions/filters/network/kafka/message.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace Kafka {

/**
* Kafka message decoder.
*/
class MessageDecoder {
public:
virtual ~MessageDecoder() = default;

/**
* Processes given buffer attempting to decode messages contained within.
* @param data buffer instance
*/
virtual void onData(Buffer::Instance& data) PURE;
Comment thread
mattklein123 marked this conversation as resolved.
};

/**
* Kafka message encoder.
*/
class MessageEncoder {
public:
virtual ~MessageEncoder() = default;

/**
* Encodes given message.
* @param message message to be encoded
*/
virtual void encode(const Message& message) PURE;
};

} // namespace Kafka
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
100 changes: 100 additions & 0 deletions source/extensions/filters/network/kafka/kafka_request.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
#pragma once

#include "envoy/common/exception.h"

#include "extensions/filters/network/kafka/message.h"
#include "extensions/filters/network/kafka/serialization.h"
#include "extensions/filters/network/kafka/serialization_composite.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace Kafka {

/**
* Represents fields that are present in every Kafka request message.
* @see http://kafka.apache.org/protocol.html#protocol_messages
*/
struct RequestHeader {
Comment thread
mattklein123 marked this conversation as resolved.
Outdated
int16_t api_key_;
int16_t api_version_;
int32_t correlation_id_;
NullableString client_id_;

bool operator==(const RequestHeader& rhs) const {
return api_key_ == rhs.api_key_ && api_version_ == rhs.api_version_ &&
correlation_id_ == rhs.correlation_id_ && client_id_ == rhs.client_id_;
};
};

/**
* Abstract Kafka request.
* Contains data present in every request (the header with request key, version, etc.).
* @see http://kafka.apache.org/protocol.html#protocol_messages
*/
class AbstractRequest : public Message {
public:
AbstractRequest(const RequestHeader& request_header) : request_header_{request_header} {};

/**
* Request's header.
*/
const RequestHeader request_header_;
};

/**
* Concrete request that carries data particular to given request type.
*/
template <typename RequestData> class ConcreteRequest : public AbstractRequest {
public:
/**
* Request header fields need to be initialized by user in case of newly created requests.
*/
ConcreteRequest(const RequestHeader& request_header, const RequestData& data)
: AbstractRequest{request_header}, data_{data} {};

/**
* Encodes given request into a buffer, with any extra configuration carried by the context.
*/
size_t encode(Buffer::Instance& dst) const override {
EncodingContext context{request_header_.api_version_};
size_t written{0};
// Encode request header.
written += context.encode(request_header_.api_key_, dst);
written += context.encode(request_header_.api_version_, dst);
written += context.encode(request_header_.correlation_id_, dst);
written += context.encode(request_header_.client_id_, dst);
// Encode request-specific data.
written += context.encode(data_, dst);
return written;
}

bool operator==(const ConcreteRequest<RequestData>& rhs) const {
return request_header_ == rhs.request_header_ && data_ == rhs.data_;
};

private:
const RequestData data_;
};

/**
* Request that did not have api_key & api_version that could be matched with any of
* request-specific parsers.
* Right now it acts as a placeholder only, and does not carry the request data.
*/
class UnknownRequest : public AbstractRequest {
public:
UnknownRequest(const RequestHeader& request_header) : AbstractRequest{request_header} {};

/**
* It is impossible to encode unknown request, as it is only a placeholder.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this can never happen in practice? If so should it be NOT_IMPLEMENTED?

*/
size_t encode(Buffer::Instance&) const override {
throw EnvoyException("cannot serialize unknown request");
}
};

} // namespace Kafka
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
Loading