Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
1c63244
Current Kafka codec code, squashed into a single commit
adamkotwasinski Apr 18, 2019
783a2f5
Remove all Kafka-related tests
adamkotwasinski Apr 19, 2019
b2f7cb7
Do not generate any Kafka structures
adamkotwasinski Apr 19, 2019
8cc9ecd
Remove generated tests
adamkotwasinski Apr 19, 2019
a1ccf2e
Remove Kafka codec - leave only codec library
adamkotwasinski Apr 19, 2019
007c09f
Remove Kafka request lib and generated protocol code (coverage passes)
adamkotwasinski Apr 19, 2019
db56d16
Add back Kafka request lib and generated protocol code
adamkotwasinski Apr 19, 2019
7e14420
Remove RequestStartParser's parse method
adamkotwasinski Apr 19, 2019
54a0298
Previous commit succeeds
adamkotwasinski Apr 20, 2019
221ffff
Add back RequestStartParser's parse method
adamkotwasinski Apr 20, 2019
f2fe371
Remove RequestHeaderParser's parse method
adamkotwasinski Apr 20, 2019
1180e26
Previous commit fails coverage
adamkotwasinski Apr 20, 2019
f90fd6f
Fix compile error
adamkotwasinski Apr 20, 2019
136b314
Remove RequestHeaderParser, but keep using RequestHeaderDeserializer
adamkotwasinski Apr 20, 2019
38dcf0c
Previous commit fails coverage
adamkotwasinski Apr 21, 2019
852c2e3
Remove (generated) Kafka request parser resolver
adamkotwasinski Apr 21, 2019
2315eac
Previous commit fails coverage
adamkotwasinski Apr 21, 2019
d15b15b
Remove SentinelParser
adamkotwasinski Apr 21, 2019
b77db2a
Previous commit fails coverage
adamkotwasinski Apr 22, 2019
6f95744
Remove anything notable from RequestStartParser; remove RequestParser
adamkotwasinski Apr 22, 2019
e18b335
Previous commit fails coverage
adamkotwasinski Apr 22, 2019
e14b784
Replace ser-composite-4 with ser-composite-0
adamkotwasinski Apr 22, 2019
2bfa1a7
Previous commit fails coverage
adamkotwasinski Apr 22, 2019
b37cc5c
Replace generated serialization_composite.h with real one
adamkotwasinski Apr 22, 2019
a086b96
Previous commit passes coverage
adamkotwasinski Apr 22, 2019
93a02cb
Remove all Kafka code; put in simple example of using generated code
adamkotwasinski Apr 22, 2019
c85587c
Make sure that generated code is covered by gcovr
adamkotwasinski Apr 22, 2019
de57bef
Previous build passes coverage (but does not generate report)
adamkotwasinski Apr 23, 2019
14aa690
Make scenario a bit more complex
adamkotwasinski Apr 23, 2019
3f938a3
Previous commit fails coverage
adamkotwasinski Apr 23, 2019
2fa802f
Simplify the example even more
adamkotwasinski Apr 23, 2019
adb3bf3
Previous commit fails coverage (which is good)
adamkotwasinski Apr 23, 2019
a108626
Remove test
adamkotwasinski Apr 23, 2019
1bfc24c
Previous commit fails coverage (which is good)
adamkotwasinski Apr 23, 2019
ee7b9ab
Split into normal lib & generated lib
adamkotwasinski Apr 23, 2019
d50f31b
Previous commit fails coverage
adamkotwasinski Apr 24, 2019
d5018b5
Revert splitting
adamkotwasinski Apr 24, 2019
3733a58
Pass an argument from normal code
adamkotwasinski Apr 24, 2019
a31a1be
Previous commit fails coverage
adamkotwasinski Apr 24, 2019
be63c1a
Simplify the code even more
adamkotwasinski Apr 24, 2019
ccabde1
Previous commit fails coverage
adamkotwasinski Apr 24, 2019
4081ba5
Put generated code in 'external' directory - maybe this will make cov…
adamkotwasinski Apr 24, 2019
e5770cc
Previous build passes coverage (and generates the report)
adamkotwasinski Apr 24, 2019
c692b4d
Put in some Kafka code again, but this time put generated stuff into …
adamkotwasinski Apr 24, 2019
08079f9
Previous commit succeeds
adamkotwasinski Apr 25, 2019
a3a9589
Generate Kafka structures
adamkotwasinski Apr 25, 2019
286323d
Previous build passes coverage (and generates the report)
adamkotwasinski Apr 25, 2019
b152744
Add Kafka codec
adamkotwasinski Apr 25, 2019
6e24521
Add the tests + change the paths for generated ones
adamkotwasinski Apr 25, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions api/bazel/repositories.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ def api_dependencies():
locations = REPOSITORY_LOCATIONS,
build_file_content = OPENCENSUSTRACE_BUILD_CONTENT,
)
envoy_http_archive(
name = "kafka_source",
locations = REPOSITORY_LOCATIONS,
build_file_content = KAFKASOURCE_BUILD_CONTENT,
)

GOOGLEAPIS_BUILD_CONTENT = """
load("@com_google_protobuf//:protobuf.bzl", "cc_proto_library", "py_proto_library")
Expand Down Expand Up @@ -285,3 +290,15 @@ go_proto_library(
visibility = ["//visibility:public"],
)
"""

# BUILD-file content injected into the @kafka_source external archive.
# Exposes the upstream Kafka request protocol JSON definitions ("*Request.json")
# so that the code-generator genrule can consume them as srcs.
KAFKASOURCE_BUILD_CONTENT = """

filegroup(
name = "request_protocol_files",
srcs = glob([
"*Request.json",
]),
visibility = ["//visibility:public"],
)

"""
7 changes: 7 additions & 0 deletions api/bazel/repository_locations.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ GOOGLEAPIS_SHA = "16f5b2e8bf1e747a32f9a62e211f8f33c94645492e9bbd72458061d9a9de1f
PROMETHEUS_GIT_SHA = "99fa1f4be8e564e8a6b613da7fa6f46c9edafc6c" # Nov 17, 2017
PROMETHEUS_SHA = "783bdaf8ee0464b35ec0c8704871e1e72afa0005c3f3587f65d9d6694bf3911b"

KAFKA_SOURCE_SHA = "ae7a1696c0a0302b43c5b21e515c37e6ecd365941f68a510a7e442eebddf39a1" # 2.2.0-rc2

REPOSITORY_LOCATIONS = dict(
bazel_skylib = dict(
sha256 = BAZEL_SKYLIB_SHA256,
Expand Down Expand Up @@ -48,4 +50,9 @@ REPOSITORY_LOCATIONS = dict(
strip_prefix = "opencensus-proto-" + OPENCENSUS_RELEASE + "/src/opencensus/proto/trace/v1",
urls = ["https://github.com/census-instrumentation/opencensus-proto/archive/v" + OPENCENSUS_RELEASE + ".tar.gz"],
),
kafka_source = dict(
sha256 = KAFKA_SOURCE_SHA,
strip_prefix = "kafka-2.2.0-rc2/clients/src/main/resources/common/message",
urls = ["https://github.com/apache/kafka/archive/2.2.0-rc2.zip"],
),
)
1 change: 1 addition & 0 deletions source/common/common/logger.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ namespace Logger {
FUNCTION(http2) \
FUNCTION(hystrix) \
FUNCTION(init) \
FUNCTION(kafka) \
FUNCTION(lua) \
FUNCTION(main) \
FUNCTION(misc) \
Expand Down
3 changes: 3 additions & 0 deletions source/extensions/extensions_build_config.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ EXTENSIONS = {
"envoy.filters.network.echo": "//source/extensions/filters/network/echo:config",
"envoy.filters.network.ext_authz": "//source/extensions/filters/network/ext_authz:config",
"envoy.filters.network.http_connection_manager": "//source/extensions/filters/network/http_connection_manager:config",
# NOTE: Kafka filter does not have a proper filter implemented right now. We are referencing to
# codec implementation that is going to be used by the filter.
"envoy.filters.network.kafka": "//source/extensions/filters/network/kafka:kafka_request_codec_lib",
"envoy.filters.network.mongo_proxy": "//source/extensions/filters/network/mongo_proxy:config",
"envoy.filters.network.mysql_proxy": "//source/extensions/filters/network/mysql_proxy:config",
"envoy.filters.network.ratelimit": "//source/extensions/filters/network/ratelimit:config",
Expand Down
139 changes: 139 additions & 0 deletions source/extensions/filters/network/kafka/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
licenses(["notice"]) # Apache 2

# Kafka network filter.
# Public docs: docs/root/configuration/network_filters/kafka_filter.rst

load(
"//bazel:envoy_build_system.bzl",
"envoy_cc_library",
"envoy_package",
)

envoy_package()

# Request codec: wires the Kafka request parsing machinery to Envoy buffers.
envoy_cc_library(
    name = "kafka_request_codec_lib",
    srcs = ["request_codec.cc"],
    hdrs = [
        "codec.h",
        "request_codec.h",
    ],
    deps = [
        ":kafka_request_lib",
        ":message_lib",
        "//source/common/buffer:buffer_lib",
    ],
)

# Kafka request model + parsers; includes sources generated by
# :kafka_generated_source (the files under external/).
envoy_cc_library(
    name = "kafka_request_lib",
    srcs = [
        "external/kafka_request_resolver.cc",
        "kafka_request_parser.cc",
    ],
    hdrs = [
        "external/requests.h",
        "kafka_request.h",
        "kafka_request_parser.h",
    ],
    deps = [
        ":parser_lib",
        ":serialization_lib",
        "//source/common/common:assert_lib",
        "//source/common/common:minimal_logger_lib",
    ],
)

# Generates request classes + the request parser resolver from the upstream
# Kafka protocol JSON definitions, using the jinja2-based generator below.
genrule(
    name = "kafka_generated_source",
    srcs = [
        "@kafka_source//:request_protocol_files",
    ],
    outs = [
        "external/kafka_request_resolver.cc",
        "external/requests.h",
    ],
    cmd = """
      ./$(location :kafka_code_generator) generate-source \
        $(location external/requests.h) $(location external/kafka_request_resolver.cc) \
        $(SRCS)
    """,
    tools = [
        ":kafka_code_generator",
    ],
)

# Code generator used by :kafka_generated_source.
py_binary(
    name = "kafka_code_generator",
    srcs = ["protocol_code_generator/kafka_generator.py"],
    data = glob(["protocol_code_generator/*.j2"]),
    main = "protocol_code_generator/kafka_generator.py",
    deps = ["@com_github_pallets_jinja//:jinja2"],
)

# Generic parser abstraction (header-only).
envoy_cc_library(
    name = "parser_lib",
    hdrs = ["parser.h"],
    deps = [
        ":kafka_protocol_lib",
        ":message_lib",
        "//source/common/common:minimal_logger_lib",
    ],
)

# Base Kafka message interface (header-only).
envoy_cc_library(
    name = "message_lib",
    hdrs = [
        "message.h",
    ],
    deps = [
        "//include/envoy/buffer:buffer_interface",
    ],
)

# (De)serialization primitives; external/serialization_composite.h is produced
# by :serialization_composite_generated_source.
envoy_cc_library(
    name = "serialization_lib",
    hdrs = [
        "external/serialization_composite.h",
        "serialization.h",
    ],
    deps = [
        ":kafka_protocol_lib",
        "//include/envoy/buffer:buffer_interface",
        "//source/common/common:byte_order_lib",
    ],
)

# Generates the composite-deserializer header (no input files needed).
genrule(
    name = "serialization_composite_generated_source",
    srcs = [],
    outs = [
        "external/serialization_composite.h",
    ],
    cmd = """
      ./$(location :serialization_composite_generator) generate-source \
        $(location external/serialization_composite.h)
    """,
    tools = [
        ":serialization_composite_generator",
    ],
)

# Code generator used by :serialization_composite_generated_source.
py_binary(
    name = "serialization_composite_generator",
    srcs = ["serialization_code_generator/serialization_composite_generator.py"],
    data = glob(["serialization_code_generator/*.j2"]),
    main = "serialization_code_generator/serialization_composite_generator.py",
    deps = ["@com_github_pallets_jinja//:jinja2"],
)

# Basic Kafka protocol type aliases (header-only).
envoy_cc_library(
    name = "kafka_protocol_lib",
    hdrs = [
        "kafka_types.h",
    ],
    external_deps = ["abseil_optional"],
    deps = [
        "//source/common/common:macros",
    ],
)
44 changes: 44 additions & 0 deletions source/extensions/filters/network/kafka/codec.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#pragma once

#include "envoy/buffer/buffer.h"
#include "envoy/common/pure.h"

#include "extensions/filters/network/kafka/message.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace Kafka {

/**
 * Kafka message decoder.
 * Pure interface: implementations consume raw bytes from a buffer and attempt
 * to decode the Kafka messages contained within. How decoded messages are
 * surfaced (callbacks, internal state, ...) is up to the implementation —
 * not visible from this interface.
 */
class MessageDecoder {
public:
  virtual ~MessageDecoder() = default;

  /**
   * Processes given buffer attempting to decode messages contained within.
   * @param data buffer instance holding (possibly partial) wire data
   */
  virtual void onData(Buffer::Instance& data) PURE;
};

/**
 * Kafka message encoder.
 * Pure interface: implementations serialize a message into wire format.
 * NOTE(review): the destination of the encoded bytes is not expressed by this
 * interface (encode returns void) — presumably a buffer held by the
 * implementation; confirm against request_codec.h.
 */
class MessageEncoder {
public:
  virtual ~MessageEncoder() = default;

  /**
   * Encodes given message.
   * @param message message to be encoded
   */
  virtual void encode(const Message& message) PURE;
};

} // namespace Kafka
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
100 changes: 100 additions & 0 deletions source/extensions/filters/network/kafka/kafka_request.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
#pragma once

#include "envoy/common/exception.h"

#include "extensions/filters/network/kafka/message.h"
#include "extensions/filters/network/kafka/serialization.h"
#include "extensions/filters/network/kafka/external/serialization_composite.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace Kafka {

/**
 * Represents fields that are present in every Kafka request message.
 * @see http://kafka.apache.org/protocol.html#protocol_messages
 */
struct RequestHeader {
  int16_t api_key_;         // Identifies the request type.
  int16_t api_version_;     // Version of the request's wire schema.
  int32_t correlation_id_;  // Echoed back by the broker to match responses to requests.
  NullableString client_id_;

  // Field-by-field equality; primarily useful in tests.
  bool operator==(const RequestHeader& rhs) const {
    return api_key_ == rhs.api_key_ && api_version_ == rhs.api_version_ &&
           correlation_id_ == rhs.correlation_id_ && client_id_ == rhs.client_id_;
  } // Fixed: removed redundant ';' after member function body.
};

/**
 * Abstract Kafka request.
 * Contains data present in every request (the header with request key, version, etc.).
 * NOTE(review): deletion through a Message pointer assumes Message declares a
 * virtual destructor — confirm in message.h.
 * @see http://kafka.apache.org/protocol.html#protocol_messages
 */
class AbstractRequest : public Message {
public:
  /**
   * Constructs a request with the given header.
   * 'explicit' prevents accidental implicit RequestHeader -> request conversions;
   * subclasses already invoke this constructor explicitly.
   * @param request_header fields common to every Kafka request
   */
  explicit AbstractRequest(const RequestHeader& request_header)
      : request_header_{request_header} {}

  /**
   * Request's header.
   */
  const RequestHeader request_header_;
};

/**
 * Concrete request that carries data particular to given request type.
 * @tparam RequestData type holding the request-specific fields; must be
 *         encodable via EncodingContext and equality-comparable.
 */
template <typename RequestData> class ConcreteRequest : public AbstractRequest {
public:
  /**
   * Request header fields need to be initialized by user in case of newly created requests.
   * @param request_header common request header
   * @param data request-specific payload (copied and kept immutable)
   */
  ConcreteRequest(const RequestHeader& request_header, const RequestData& data)
      : AbstractRequest{request_header}, data_{data} {}

  /**
   * Encodes given request into a buffer, with any extra configuration carried by the context.
   * @param dst destination buffer
   * @return number of bytes written
   */
  size_t encode(Buffer::Instance& dst) const override {
    // The context carries the api version, which can influence how fields are serialized.
    EncodingContext context{request_header_.api_version_};
    size_t written{0};
    // Encode request header.
    written += context.encode(request_header_.api_key_, dst);
    written += context.encode(request_header_.api_version_, dst);
    written += context.encode(request_header_.correlation_id_, dst);
    written += context.encode(request_header_.client_id_, dst);
    // Encode request-specific data.
    written += context.encode(data_, dst);
    return written;
  }

  // Field-by-field equality; primarily useful in tests.
  bool operator==(const ConcreteRequest<RequestData>& rhs) const {
    return request_header_ == rhs.request_header_ && data_ == rhs.data_;
  } // Fixed: removed redundant ';' after member function bodies.

private:
  const RequestData data_;
};

/**
 * Request that did not have api_key & api_version that could be matched with any of
 * request-specific parsers.
 * Right now it acts as a placeholder only, and does not carry the request data.
 */
class UnknownRequest : public AbstractRequest {
public:
  // 'explicit' prevents accidental implicit RequestHeader -> UnknownRequest conversion.
  explicit UnknownRequest(const RequestHeader& request_header)
      : AbstractRequest{request_header} {}

  /**
   * It is impossible to encode unknown request, as it is only a placeholder.
   * @throws EnvoyException always
   */
  size_t encode(Buffer::Instance&) const override {
    throw EnvoyException("cannot serialize unknown request");
  }
};

} // namespace Kafka
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
Loading