Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions include/envoy/buffer/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,9 @@ class Instance {
*/
virtual Api::SysCallIntResult read(int fd, uint64_t max_length) PURE;

virtual Api::SysCallIntResult recvFrom(int fd, uint64_t max_length,
sockaddr_storage& peer_address,
socklen_t& sock_length) PURE;
/**
* Reserve space in the buffer.
* @param length supplies the amount of space to reserve.
Expand Down
10 changes: 10 additions & 0 deletions include/envoy/event/dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,16 @@ class Dispatcher {
Network::ListenerCallbacks& cb, bool bind_to_port,
bool hand_off_restored_destination_connections) PURE;

/**
* Create a logical udp listener on a specific port.
* @param socket supplies the socket to listen on.
* @param cb supplies the udp listener callbacks to invoke for listener events.
* @param bind_to_port controls whether the listener binds to a transport port or not.
* @return Network::ListenerPtr a new listener that is owned by the caller.
*/
virtual Network::ListenerPtr createUdpListener(Network::Socket& socket,
Network::UdpListenerCallbacks& cb,
bool bind_to_port) PURE;
/**
* Allocate a timer. @see Timer for docs on how to use the timer.
* @param cb supplies the callback to invoke when the timer fires.
Expand Down
6 changes: 6 additions & 0 deletions include/envoy/network/connection_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ class ConnectionHandler {
*/
virtual void addListener(ListenerConfig& config) PURE;

/**
* Adds UDP listener to the handler.
* @param config listener configuration options.
*/
virtual void addUdpListener(ListenerConfig& config) PURE;

/**
* Find a listener based on the provided listener address value.
* @param address supplies the address value.
Expand Down
32 changes: 32 additions & 0 deletions include/envoy/network/listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,38 @@ class ListenerCallbacks {
virtual void onNewConnection(ConnectionPtr&& new_connection) PURE;
};

/**
* UDP listener callbacks.
*/
class UdpListenerCallbacks {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think at some layer we're going to want to have the Udp stack have the equivalent of the existing ListenerCallbacks. While it's true that there's no UDP accept, the concepts "I have a new connection" and "I have gone through a series of filters acting at a very low level before I formally accept the connection" may still apply. Looking at Envoy listener filters, there's proxy proto and tls inspector. I think we can have similar functionality for UDP - we have our own hacked version of the proxy protocol header which we achieve via packet munging and Envoy will want an equivalent, and you can totally SNI-sniff QUIC.

Envoy current listener filters do work by delaying accept and doing peek, which is so not working for UDP :-P That said in-house there are areas where we do light-weight packet queueing until we have enough data we can tell how to proceed, so I think a logical equivalent of listener filters and "peek" do still kind of work and we will want them at the end of the day.

I'm fine landing this as-is, with a TODO to investigate sharing ListenerCallback in future once we have more of a handle of how things are going, but I figure it's at least worth discussing now. It may be worth at least considering if we can reuse more of the existing listener code now, with just config rejection if anyone tries to insert a tcp listener filter on the UDP stack.

@mpwarres @mattklein123

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, my thinking was to share as many of the public interfaces we have now as possible until we figure out we absolutely can't do it. Could we start potentially with that being the goal?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's also the original_src filter I'm writing (#5337). It should work just as well with UDP as with TCP as long as something can figure out the source (i.e. UDP version of proxy protocol) prior to it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alyssawilk I think the UDP listener should be able to do what you suggested. Since at the UDP listener layer, we are still dealing with "Buffer", we should be able to stack filters. Right?

public:
virtual ~UdpListenerCallbacks() = default;

// TODO(conqerAtapple): Refactor `Connection` to accommodate UDP/QUIC.

/**
* On first packet received on a UDP socket. This allows the callback handler
* to establish filter chain (or any other prepararion).
*
* @param local_address Local bound socket network address.
* @param peer_address Network address of the peer.
* @param data Data buffer received.
*/
virtual void onNewConnection(Address::InstanceConstSharedPtr local_address,
Address::InstanceConstSharedPtr peer_address,
Buffer::InstancePtr&& data) PURE;
/**
* Called whenever data is received by the underlying Udp socket.
*
* @param local_address Local bound socket network address.
* @param peer_address Network address of the peer.
* @param data Data buffer received.
*/
virtual void onData(Address::InstanceConstSharedPtr local_address,
Address::InstanceConstSharedPtr peer_address,
Buffer::InstancePtr&& data) PURE;
};

/**
* An abstract socket listener. Free the listener to stop listening on the socket.
*/
Expand Down
25 changes: 25 additions & 0 deletions source/common/buffer/buffer_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,31 @@ void OwnedImpl::move(Instance& rhs, uint64_t length) {
static_cast<LibEventInstance&>(rhs).postProcess();
}

Api::SysCallIntResult OwnedImpl::recvFrom(int fd, uint64_t max_length, sockaddr_storage& peer_addr,
socklen_t& addr_len) {
if (max_length == 0) {
return {0, 0};
}

memset(&peer_addr, 0, sizeof(sockaddr_storage));

RawSlice slice;
const uint64_t num_slices = reserve(max_length, &slice, 1);

ASSERT(num_slices == 1);
// TODO(conqerAtapple): Use os_syscalls
const ssize_t rc = ::recvfrom(fd, slice.mem_, max_length, 0,
reinterpret_cast<struct sockaddr*>(&peer_addr), &addr_len);
if (rc < 0) {
return {static_cast<int>(rc), errno};
}

slice.len_ = std::min(slice.len_, static_cast<size_t>(rc));
commit(&slice, 1);

return {static_cast<int>(rc), errno};
}

Api::SysCallIntResult OwnedImpl::read(int fd, uint64_t max_length) {
if (max_length == 0) {
return {0, 0};
Expand Down
2 changes: 2 additions & 0 deletions source/common/buffer/buffer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ class OwnedImpl : public LibEventInstance {
void move(Instance& rhs) override;
void move(Instance& rhs, uint64_t length) override;
Api::SysCallIntResult read(int fd, uint64_t max_length) override;
Api::SysCallIntResult recvFrom(int fd, uint64_t max_length, sockaddr_storage& peer_address,
socklen_t& sock_length) override;
uint64_t reserve(uint64_t length, RawSlice* iovecs, uint64_t num_iovecs) override;
ssize_t search(const void* data, uint64_t size, size_t start) const override;
Api::SysCallIntResult write(int fd) override;
Expand Down
8 changes: 8 additions & 0 deletions source/common/event/dispatcher_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "common/network/connection_impl.h"
#include "common/network/dns_impl.h"
#include "common/network/listener_impl.h"
#include "common/network/udp_listener_impl.h"

#include "event2/event.h"

Expand Down Expand Up @@ -119,6 +120,13 @@ DispatcherImpl::createListener(Network::Socket& socket, Network::ListenerCallbac
hand_off_restored_destination_connections)};
}

Network::ListenerPtr DispatcherImpl::createUdpListener(Network::Socket& socket,
Network::UdpListenerCallbacks& cb,
bool bind_to_port) {
ASSERT(isThreadSafe());
return Network::ListenerPtr{new Network::UdpListenerImpl(*this, socket, cb, bind_to_port)};
}

TimerPtr DispatcherImpl::createTimer(TimerCb cb) {
ASSERT(isThreadSafe());
return scheduler_->createTimer(cb);
Expand Down
4 changes: 3 additions & 1 deletion source/common/event/dispatcher_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class DispatcherImpl : Logger::Loggable<Logger::Id::main>, public Dispatcher {
/**
* @return event_base& the libevent base.
*/
event_base& base() { return *base_; }
event_base& base() const { return *base_; }

// Event::Dispatcher
TimeSystem& timeSystem() override { return time_system_; }
Expand All @@ -51,6 +51,8 @@ class DispatcherImpl : Logger::Loggable<Logger::Id::main>, public Dispatcher {
Network::ListenerPtr createListener(Network::Socket& socket, Network::ListenerCallbacks& cb,
bool bind_to_port,
bool hand_off_restored_destination_connections) override;
Network::ListenerPtr createUdpListener(Network::Socket& socket, Network::UdpListenerCallbacks& cb,
bool bind_to_port) override;
TimerPtr createTimer(TimerCb cb) override;
void deferredDelete(DeferredDeletablePtr&& to_delete) override;
void exit() override;
Expand Down
5 changes: 5 additions & 0 deletions source/common/network/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,14 @@ envoy_cc_library(
envoy_cc_library(
name = "listener_lib",
srcs = [
"base_listener_impl.cc",
"listener_impl.cc",
"udp_listener_impl.cc",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now's also a good time to think about how we want to add build options.

I'm happy to have UDP default on (so our CI builds it) but make it easy for folks to opt out (especially to make things less painful for our windows build, while we work on cross-platform support. I strongly suspect QUIC specifically but also likely UDP in general are going to work for Linux for a first pass. WDYT?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No real preference for me on whether we allow compile out on a first pass. Maybe as a follow up? I'm fine either way but definitely a good callout.

],
hdrs = [
"base_listener_impl.h",
"listener_impl.h",
"udp_listener_impl.h",
],
deps = [
":address_lib",
Expand All @@ -138,6 +142,7 @@ envoy_cc_library(
"//include/envoy/network:listener_interface",
"//include/envoy/stats:stats_interface",
"//include/envoy/stats:stats_macros",
"//source/common/buffer:buffer_lib",
"//source/common/common:assert_lib",
"//source/common/common:empty_string",
"//source/common/common:linked_object",
Expand Down
35 changes: 35 additions & 0 deletions source/common/network/base_listener_impl.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#include "common/network/base_listener_impl.h"

#include <sys/un.h>

#include "envoy/common/exception.h"

#include "common/common/assert.h"
#include "common/common/empty_string.h"
#include "common/common/fmt.h"
#include "common/event/dispatcher_impl.h"
#include "common/event/file_event_impl.h"
#include "common/network/address_impl.h"

#include "event2/listener.h"

namespace Envoy {
namespace Network {

Address::InstanceConstSharedPtr BaseListenerImpl::getLocalAddress(int fd) {
return Address::addressFromFd(fd);
}

BaseListenerImpl::BaseListenerImpl(const Event::DispatcherImpl& dispatcher, Socket& socket)
: local_address_(nullptr), dispatcher_(dispatcher), socket_(socket) {
const auto ip = socket.localAddress()->ip();

// Only use the listen socket's local address for new connections if it is not the all hosts
// address (e.g., 0.0.0.0 for IPv4).
if (!(ip && ip->isAnyAddress())) {
local_address_ = socket.localAddress();
}
}

} // namespace Network
} // namespace Envoy
30 changes: 30 additions & 0 deletions source/common/network/base_listener_impl.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#pragma once

#include "envoy/network/listener.h"

#include "common/event/dispatcher_impl.h"
#include "common/event/libevent.h"
#include "common/network/listen_socket_impl.h"

#include "event2/event.h"

namespace Envoy {
namespace Network {

/**
* Base libevent implementation of Network::Listener.
*/
class BaseListenerImpl : public Listener {
public:
BaseListenerImpl(const Event::DispatcherImpl& dispatcher, Socket& socket);

protected:
virtual Address::InstanceConstSharedPtr getLocalAddress(int fd);

Address::InstanceConstSharedPtr local_address_;
const Event::DispatcherImpl& dispatcher_;
Socket& socket_;
};

} // namespace Network
} // namespace Envoy
48 changes: 20 additions & 28 deletions source/common/network/listener_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@
namespace Envoy {
namespace Network {

Address::InstanceConstSharedPtr ListenerImpl::getLocalAddress(int fd) {
return Address::addressFromFd(fd);
}

void ListenerImpl::listenCallback(evconnlistener*, evutil_socket_t fd, sockaddr* remote_addr,
int remote_addr_len, void* arg) {
ListenerImpl* listener = static_cast<ListenerImpl*>(arg);
Expand All @@ -44,35 +40,31 @@ void ListenerImpl::listenCallback(evconnlistener*, evutil_socket_t fd, sockaddr*
listener->hand_off_restored_destination_connections_);
}

ListenerImpl::ListenerImpl(Event::DispatcherImpl& dispatcher, Socket& socket, ListenerCallbacks& cb,
bool bind_to_port, bool hand_off_restored_destination_connections)
: local_address_(nullptr), cb_(cb),
hand_off_restored_destination_connections_(hand_off_restored_destination_connections),
listener_(nullptr) {
const auto ip = socket.localAddress()->ip();
void ListenerImpl::setupServerSocket(const Event::DispatcherImpl& dispatcher, Socket& socket) {
listener_.reset(evconnlistener_new(&dispatcher.base(), listenCallback, this, 0, -1, socket.fd()));

// Only use the listen socket's local address for new connections if it is not the all hosts
// address (e.g., 0.0.0.0 for IPv4).
if (!(ip && ip->isAnyAddress())) {
local_address_ = socket.localAddress();
if (!listener_) {
throw CreateListenerException(
fmt::format("cannot listen on socket: {}", socket.localAddress()->asString()));
}

if (bind_to_port) {
listener_.reset(
evconnlistener_new(&dispatcher.base(), listenCallback, this, 0, -1, socket.fd()));

if (!listener_) {
throw CreateListenerException(
fmt::format("cannot listen on socket: {}", socket.localAddress()->asString()));
}
if (!Network::Socket::applyOptions(socket.options(), socket,
envoy::api::v2::core::SocketOption::STATE_LISTENING)) {
throw CreateListenerException(fmt::format("cannot set post-listen socket option on socket: {}",
socket.localAddress()->asString()));
}

if (!Network::Socket::applyOptions(socket.options(), socket,
envoy::api::v2::core::SocketOption::STATE_LISTENING)) {
throw CreateListenerException(fmt::format(
"cannot set post-listen socket option on socket: {}", socket.localAddress()->asString()));
}
evconnlistener_set_error_cb(listener_.get(), errorCallback);
}

evconnlistener_set_error_cb(listener_.get(), errorCallback);
ListenerImpl::ListenerImpl(const Event::DispatcherImpl& dispatcher, Socket& socket,
ListenerCallbacks& cb, bool bind_to_port,
bool hand_off_restored_destination_connections)
: BaseListenerImpl(dispatcher, socket), cb_(cb),
hand_off_restored_destination_connections_(hand_off_restored_destination_connections),
listener_(nullptr) {
if (bind_to_port) {
setupServerSocket(dispatcher, socket);
}
}

Expand Down
24 changes: 9 additions & 15 deletions source/common/network/listener_impl.h
Original file line number Diff line number Diff line change
@@ -1,38 +1,32 @@
#pragma once

#include "envoy/network/listener.h"

#include "common/event/dispatcher_impl.h"
#include "common/event/libevent.h"
#include "common/network/listen_socket_impl.h"

#include "event2/event.h"
#include "base_listener_impl.h"

namespace Envoy {
namespace Network {

/**
* libevent implementation of Network::Listener.
* libevent implementation of Network::Listener for TCP.
* TODO(conqerAtapple): Consider renaming the class to `TcpListenerImpl`.
*/
class ListenerImpl : public Listener {
class ListenerImpl : public BaseListenerImpl {
public:
ListenerImpl(Event::DispatcherImpl& dispatcher, Socket& socket, ListenerCallbacks& cb,
ListenerImpl(const Event::DispatcherImpl& dispatcher, Socket& socket, ListenerCallbacks& cb,
bool bind_to_port, bool hand_off_restored_destination_connections);

void disable();
void enable();
void disable() override;
void enable() override;

protected:
virtual Address::InstanceConstSharedPtr getLocalAddress(int fd);
void setupServerSocket(const Event::DispatcherImpl& dispatcher, Socket& socket);

Address::InstanceConstSharedPtr local_address_;
ListenerCallbacks& cb_;
const bool hand_off_restored_destination_connections_;

private:
static void errorCallback(evconnlistener* listener, void* context);
static void listenCallback(evconnlistener*, evutil_socket_t fd, sockaddr* remote_addr,
int remote_addr_len, void* arg);
static void errorCallback(evconnlistener* listener, void* context);

Event::Libevent::ListenerPtr listener_;
};
Expand Down
Loading