Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
fb1db47
core: add envoy internal address
lambdai Aug 27, 2020
6fceaf2
listener: add internal listener api
lambdai Aug 27, 2020
df05bba
rename Network::ListenerImpl and Network::ListenerCallbacks
lambdai Aug 27, 2020
d7cac74
rename file
lambdai Aug 27, 2020
be6f09d
scaffold of internal listener impl
lambdai Aug 27, 2020
d305855
add missing file
lambdai Aug 27, 2020
b6d9da5
remove client address and add tests
lambdai Aug 27, 2020
c413df9
Merge remote-tracking branch 'origin/master' into eiaddr
lambdai Aug 27, 2020
48e0640
remove unimplement to fix docs
lambdai Aug 28, 2020
e81a41e
guard internal address
lambdai Sep 3, 2020
72cd41d
Merge remote-tracking branch 'me/eiaddr' into eiaddr
lambdai Sep 3, 2020
a337bde
add integration test for NullIoSocketHandleImpl
lambdai Sep 3, 2020
dc87455
add integration test for NullIoSocketHandleImpl
lambdai Sep 3, 2020
3be784d
Merge branch 'eiaddr' of github.com:lambdai/envoy-dai into eiaddr
lambdai Sep 3, 2020
51914a9
fix spelling
lambdai Sep 4, 2020
f12c3d2
Merge remote-tracking branch 'origin/master' into eiaddr
lambdai Sep 4, 2020
219e0da
Merge branch 'eiaddr' into eilistener
lambdai Sep 4, 2020
f8816b0
incomplete impl of internal listener
lambdai Sep 9, 2020
bc57e78
adding active internal listener
lambdai Sep 9, 2020
4c2d693
add listener manager test
lambdai Sep 11, 2020
cfca895
refactor ConnectionSocketImpl: has SocketImpl instead of inherit
lambdai Sep 11, 2020
07c1ecb
rename listener to stream_listener in tcpsocket
lambdai Sep 14, 2020
171b1d9
extract generic listener filter
lambdai Sep 14, 2020
c99aa54
active internal socket
lambdai Sep 14, 2020
b25f28d
adding BufferedIoSocketHandleImpl, start with recv
lambdai Sep 17, 2020
73c5074
format
lambdai Sep 17, 2020
8439a67
add buffered_io_socket_handle_test
lambdai Sep 21, 2020
d7cff5c
add flow control test
lambdai Sep 21, 2020
808ea05
add EventScheduleBasic test
lambdai Sep 23, 2020
50479ca
add more event test
lambdai Sep 23, 2020
5fdec41
add test close
lambdai Sep 24, 2020
b55e03e
add shutdown test
lambdai Sep 24, 2020
4a7970c
fixing test: working on TestReadFlowAfterShutdownWrite
lambdai Sep 25, 2020
5ffedb4
close buffered_io_socket_handle_impl_test
lambdai Sep 25, 2020
7a972a5
Merge branch 'master' into eilistener
lambdai Sep 25, 2020
81d4c2e
add basic closing client connection test
lambdai Sep 26, 2020
32eca3f
add connection callback and active close test in CLosingClientImpl
lambdai Sep 27, 2020
80a9d2c
dispatcher: create internal connection to nowhere
lambdai Sep 28, 2020
a5572cb
fixing create connection
lambdai Sep 28, 2020
254daa4
basic internal client connection test
lambdai Sep 29, 2020
568e42b
fix connect probe and add client write test
lambdai Sep 29, 2020
953326b
various fixes: client id on dispatcher, StreamListener for active Int…
lambdai Sep 30, 2020
cd30f84
adding back chain tcp proxy integration test
lambdai Oct 1, 2020
95b5a54
fix all internal listener integration test
lambdai Oct 1, 2020
abbffa5
fix test
lambdai Oct 1, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions api/envoy/config/listener/v3/internal_listener.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
syntax = "proto3";

package envoy.config.listener.v3;

import "udpa/annotations/status.proto";
import "udpa/annotations/versioning.proto";

option java_package = "io.envoyproxy.envoy.config.listener.v3";
option java_outer_classname = "InternalListenerProto";
option java_multiple_files = true;
option (udpa.annotations.file_status).package_version_status = ACTIVE;

// [#not-implemented-hide:]
// [#protodoc-title: internal listener]
// Describes a type of internal listener which expects to serve the cluster in
// the same envoy process.
message InternalListener {
option (udpa.annotations.versioning).previous_message_type =
"envoy.config.listener.v2.InternalListener";
}
9 changes: 8 additions & 1 deletion api/envoy/config/listener/v3/listener.proto
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import "envoy/config/core/v3/base.proto";
import "envoy/config/core/v3/extension.proto";
import "envoy/config/core/v3/socket_option.proto";
import "envoy/config/listener/v3/api_listener.proto";
import "envoy/config/listener/v3/internal_listener.proto";
import "envoy/config/listener/v3/listener_components.proto";
import "envoy/config/listener/v3/udp_listener_config.proto";

Expand Down Expand Up @@ -36,7 +37,7 @@ message ListenerCollection {
repeated udpa.core.v1.CollectionEntry entries = 1;
}

// [#next-free-field: 25]
// [#next-free-field: 26]
message Listener {
option (udpa.annotations.versioning).previous_message_type = "envoy.api.v2.Listener";

Expand Down Expand Up @@ -263,4 +264,10 @@ message Listener {
// The maximum length a tcp listener's pending connections queue can grow to. If no value is
// provided net.core.somaxconn will be used on Linux and 128 otherwise.
google.protobuf.UInt32Value tcp_backlog_size = 24;

// Used to represent an internal listener, which accepts connection from the cluster in the same envoy process.
// When this field is set, the address field must be :ref:`envoy internal address
// <envoy_api_field_config.core.v3.EnvoyInternalAddress>`.
// [#not-implemented-hide:]
InternalListener internal_listener = 25;
}
20 changes: 20 additions & 0 deletions api/envoy/config/listener/v4alpha/internal_listener.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion api/envoy/config/listener/v4alpha/listener.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion generated_api_shadow/envoy/config/listener/v3/listener.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions include/envoy/event/dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,16 @@ class Dispatcher {
Network::TransportSocketPtr&& transport_socket,
const Network::ConnectionSocket::OptionsSharedPtr& options) PURE;

/**
* Creates an client internal connection. Does NOT initiate the connection;
* the caller must then call connect() on the returned Network::ClientConnection.
* @param internal_address supplies the internal address to connect to.
* @param local_address supplies an address to bind to or nullptr if no bind is necessary.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe explain what binding is in internal address context?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right. Will add that listener side connection could obtain the bind local address for debug.

I also want to add here that I want to introduce a generic "socket options" param in the future to deliver more information including "original address" which is supposed to be "ip/port" address raising this internal address.

* @return Network::ClientConnectionPtr a client connection that is owned by the caller.
*/
virtual Network::ClientConnectionPtr
createInternalConnection(Network::Address::InstanceConstSharedPtr internal_address,
Network::Address::InstanceConstSharedPtr local_address) PURE;
/**
* Creates an async DNS resolver. The resolver should only be used on the thread that runs this
* dispatcher.
Expand Down
17 changes: 17 additions & 0 deletions include/envoy/network/listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,11 @@ class ListenerConfig {
*/
virtual UdpPacketWriterFactoryOptRef udpPacketWriterFactory() PURE;

/**
* @return true if this listener is internal listener.
*/
virtual bool isInternalListener() PURE;

/**
* @return the ``UdpListenerWorkerRouter`` for this listener. This will
* be non-empty iff this is a UDP listener.
Expand Down Expand Up @@ -203,6 +208,18 @@ class TcpListenerCallbacks {
virtual void onReject() PURE;
};

/**
* Callbacks invoked by a internal listener.
*/
class InternalListenerCallbacks {
public:
virtual ~InternalListenerCallbacks() = default;

virtual void setupNewConnection(Network::ConnectionPtr server_conn,
Network::ConnectionSocketPtr socket) PURE;
virtual void onNewSocket(Network::ConnectionSocketPtr socket) PURE;
};

/**
* Utility struct that encapsulates the information from a udp socket's recvmmsg call.
*/
Expand Down
1 change: 1 addition & 0 deletions source/common/event/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ envoy_cc_library(
"//source/common/common:assert_lib",
"//source/common/common:thread_lib",
"//source/common/filesystem:watcher_lib",
"//source/common/network:buffered_io_socket_handle_lib",
"//source/common/network:connection_lib",
"//source/common/network:dns_lib",
"//source/common/network:listener_lib",
Expand Down
69 changes: 69 additions & 0 deletions source/common/event/dispatcher_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@
#include <chrono>
#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include <vector>

#include "common/network/address_impl.h"
#include "common/network/raw_buffer_socket.h"
#include "envoy/api/api.h"
#include "envoy/network/listen_socket.h"
#include "envoy/network/listener.h"
Expand All @@ -18,6 +21,7 @@
#include "common/event/signal_impl.h"
#include "common/event/timer_impl.h"
#include "common/filesystem/watcher_impl.h"
#include "common/network/buffered_io_socket_handle_impl.h"
#include "common/network/connection_impl.h"
#include "common/network/dns_impl.h"
#include "common/network/tcp_listener_impl.h"
Expand Down Expand Up @@ -117,6 +121,71 @@ DispatcherImpl::createClientConnection(Network::Address::InstanceConstSharedPtr
std::move(transport_socket), options);
}

namespace {
Network::Address::InstanceConstSharedPtr
nextClientAddress(const Network::Address::InstanceConstSharedPtr& server_address) {
uint64_t id = 0;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

did you mean this to be static? If so fix and test?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, yes. It should be static. The local address is well used, only need to guarantee such address is not nullptr.
Look like you are fine with such a debug purpose address :)

return std::make_shared<Network::Address::EnvoyInternalInstance>(absl::StrCat(server_address->asStringView(), "_", ++id));
}
} // namespace

Network::ClientConnectionPtr
DispatcherImpl::createInternalConnection(Network::Address::InstanceConstSharedPtr internal_address,
Network::Address::InstanceConstSharedPtr local_address) {
ASSERT(isThreadSafe());
if (internal_address == nullptr) {
return nullptr;
}
if (local_address == nullptr) {
local_address = nextClientAddress(internal_address);
}
// Find the internal listener callback. The listener will setup the server connection.
auto iter = internal_listeners_.find(internal_address->asString());
for (const auto& [name, _] : internal_listeners_) {
ENVOY_LOG_MISC(debug, "lambdai: p listener {}", name);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm inclined to do my next pass once you've had a chance to clean these up, and do a sweep to make sure the code is commented.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR is adding 3000 lines. Do you think it is realistic to merge this PR without splitting?

If not I prefer to keep the debug purpose log: I will backport the split PRs and see if the sub PRs breaks this overall PR: This PR has quite a few real world integration tests. These integration tests must pass but cannot be put in early sub PRs.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think given the complexity it'd be better to split out what we can. It could be that it's just 2-3? As said I think the buffer code would split out pretty easily so let's give that a try and see what's left.
Also w.r.t. logging I think it's fine to leave many of the logs in (all the error corner case logs for example) and just move them from lambdai/LOG_MISC to real logs. It's fine to tackle that as the last step when the big PR is ready for review if you prefer.

}
if (iter == internal_listeners_.end()) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hm, I would have expected this in connect(). When we do normal IP style create and connect do we validate before connect?
no strong opinion either way, but if you prefer this maybe comment about where validation is done in include/ ?

Actually I think if you move this work to CONNECT you can skip the entire closingClientConnectionImpl which would be nice clean up

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason I want to split ClosingClientConnectionImpl and ClientConnectionImpl:

The underlying BufferedIoSocketHandleImpl is aware of single transition: from "connected" to "disconnected".
If we set up the server connection in connect(), the client connection need to track "uninitialized" to "connected", "uninitialized" to "fail to connect".

BTW, The model internal connection I am borrowing from "socketpair()"

ENVOY_LOG_MISC(debug, "lambdai: no valid listener registered for envoy internal address {}",
internal_address->asString());
return std::make_unique<Network::ClosingClientConnectionImpl>(*this, internal_address,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also some of these are good to keep as debug logs, though not MISC_LOG

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Absolutely. Will assign reasonable logger and level in the split PRs

local_address);
}

auto client_io_handle_ = std::make_unique<Network::BufferedIoSocketHandleImpl>();
auto server_io_handle_ = std::make_unique<Network::BufferedIoSocketHandleImpl>();
Comment on lines +154 to +155
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BufferedIoSocketHandleImpl as extension here?

client_io_handle_->setWritablePeer(server_io_handle_.get());
server_io_handle_->setWritablePeer(client_io_handle_.get());

Network::RawBufferSocketFactory client_transport_socket_factory;
// ConnectionSocket conn_socket
auto client_conn_socket = std::make_unique<Network::InternalConnectionSocketImpl>(
std::move(client_io_handle_), local_address, internal_address);
auto server_conn_socket = std::make_unique<Network::InternalConnectionSocketImpl>(
std::move(server_io_handle_), internal_address, local_address);
ENVOY_LOG_MISC(debug, "lambdai: internal address {}", internal_address->asString());
ENVOY_LOG_MISC(debug, "lambdai: client address {}", local_address->asString());

auto client_conn = std::make_unique<Network::ClientConnectionImpl>(
*this, internal_address, local_address,
client_transport_socket_factory.createTransportSocket(nullptr), nullptr,
std::move(client_conn_socket));

(iter->second)(internal_address, std::move(server_conn_socket));
return client_conn;
}

void DispatcherImpl::registerInternalListener(
const std::string& internal_listener_id,
DispatcherImpl::InternalConnectionCallback internal_conn_callback) {
if (internal_conn_callback == nullptr) {
ENVOY_LOG_MISC(debug, "lambdai: unregister pipe factory on address {}", internal_listener_id);
internal_listeners_.erase(internal_listener_id);
} else {
ENVOY_LOG_MISC(debug, "lambdai: register pipe factory on address {}", internal_listener_id);
internal_listeners_[internal_listener_id] = internal_conn_callback;
}
}

Network::DnsResolverSharedPtr DispatcherImpl::createDnsResolver(
const std::vector<Network::Address::InstanceConstSharedPtr>& resolvers,
const bool use_tcp_for_dns_lookups) {
Expand Down
17 changes: 16 additions & 1 deletion source/common/event/dispatcher_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "envoy/event/deferred_deletable.h"
#include "envoy/event/dispatcher.h"
#include "envoy/network/connection_handler.h"
#include "envoy/network/listen_socket.h"
#include "envoy/stats/scope.h"

#include "common/common/logger.h"
Expand All @@ -21,6 +22,9 @@
#include "common/signal/fatal_error_handler.h"

namespace Envoy {
namespace Network {
class BufferedIoSocketHandleImpl;
}
namespace Event {

/**
Expand All @@ -34,7 +38,9 @@ class DispatcherImpl : Logger::Loggable<Logger::Id::main>,
DispatcherImpl(const std::string& name, Buffer::WatermarkFactoryPtr&& factory, Api::Api& api,
Event::TimeSystem& time_system);
~DispatcherImpl() override;

using InternalConnectionCallback =
std::function<void(const Network::Address::InstanceConstSharedPtr& address,
std::unique_ptr<Network::ConnectionSocket> internal_socket)>;
/**
* @return event_base& the libevent base.
*/
Expand All @@ -53,6 +59,14 @@ class DispatcherImpl : Logger::Loggable<Logger::Id::main>,
Network::Address::InstanceConstSharedPtr source_address,
Network::TransportSocketPtr&& transport_socket,
const Network::ConnectionSocket::OptionsSharedPtr& options) override;
// Register the internal listener to setup the internal connection. Pass nullptr callback to
// unregister.
void registerInternalListener(const std::string& internal_listener_id,
InternalConnectionCallback internal_conn_callback);

Network::ClientConnectionPtr
createInternalConnection(Network::Address::InstanceConstSharedPtr internal_address,
Network::Address::InstanceConstSharedPtr local_address) override;
Network::DnsResolverSharedPtr
createDnsResolver(const std::vector<Network::Address::InstanceConstSharedPtr>& resolvers,
const bool use_tcp_for_dns_lookups) override;
Expand Down Expand Up @@ -121,6 +135,7 @@ class DispatcherImpl : Logger::Loggable<Logger::Id::main>,
const ScopeTrackedObject* current_object_{};
bool deferred_deleting_{};
MonotonicTime approximate_monotonic_time_;
absl::flat_hash_map<std::string, InternalConnectionCallback> internal_listeners_;
};

} // namespace Event
Expand Down
Loading