Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "common/protobuf/utility.h"
#include "common/runtime/runtime_protos.h"

#include "server/active_udp_listener.h"
#include "server/connection_handler_impl.h"

#include "extensions/quic_listeners/quiche/envoy_quic_dispatcher.h"
Expand Down
21 changes: 21 additions & 0 deletions source/server/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ envoy_cc_library(
envoy_cc_library(
name = "connection_handler_lib",
deps = [
":active_udp_listener",
":connection_handler_impl",
],
)
Expand Down Expand Up @@ -95,6 +96,26 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "active_udp_listener",
srcs = ["active_udp_listener.cc"],
hdrs = [
"active_udp_listener.h",
],
deps = [
"//include/envoy/event:dispatcher_interface",
"//include/envoy/network:connection_handler_interface",
"//include/envoy/network:exception_interface",
"//include/envoy/network:filter_interface",
"//include/envoy/network:listen_socket_interface",
"//include/envoy/network:listener_interface",
"//include/envoy/server:active_udp_listener_config_interface",
"//include/envoy/server:listener_manager_interface",
"//source/extensions/transport_sockets:well_known_names",
"//source/server:connection_handler_impl",
],
)

envoy_cc_library(
name = "drain_manager_lib",
srcs = ["drain_manager_impl.cc"],
Expand Down
1 change: 1 addition & 0 deletions source/server/active_raw_udp_listener_config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include "envoy/api/v2/listener/listener.pb.h"

#include "server/active_udp_listener.h"
#include "server/connection_handler_impl.h"
#include "server/well_known_names.h"

Expand Down
135 changes: 135 additions & 0 deletions source/server/active_udp_listener.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
#include "server/active_udp_listener.h"

#include "envoy/network/exception.h"
#include "envoy/server/listener_manager.h"
#include "envoy/stats/scope.h"

#include "spdlog/spdlog.h"

namespace Envoy {
namespace Server {
ActiveUdpListenerBase::ActiveUdpListenerBase(uint32_t worker_index, uint32_t concurrency,
Network::UdpConnectionHandler& parent,
Network::Socket& listen_socket,
Network::UdpListenerPtr&& listener,
Network::ListenerConfig* config)
: ConnectionHandlerImpl::ActiveListenerImplBase(parent, config), worker_index_(worker_index),
concurrency_(concurrency), parent_(parent), listen_socket_(listen_socket),
udp_listener_(std::move(listener)) {
ASSERT(worker_index_ < concurrency_);
config_->udpListenerWorkerRouter()->get().registerWorkerForListener(*this);
}

ActiveUdpListenerBase::~ActiveUdpListenerBase() {
config_->udpListenerWorkerRouter()->get().unregisterWorkerForListener(*this);
}

void ActiveUdpListenerBase::post(Network::UdpRecvData&& data) {
ASSERT(!udp_listener_->dispatcher().isThreadSafe(),
"Shouldn't be posting if thread safe; use onWorkerData() instead.");

// It is not possible to capture a unique_ptr because the post() API copies the lambda, so we must
// bundle the socket inside a shared_ptr that can be captured.
// TODO(mattklein123): It may be possible to change the post() API such that the lambda is only
// moved, but this is non-trivial and needs investigation.
auto data_to_post = std::make_shared<Network::UdpRecvData>();
*data_to_post = std::move(data);

udp_listener_->dispatcher().post(
[data_to_post, tag = config_->listenerTag(), &parent = parent_]() {
Network::UdpListenerCallbacksOptRef listener = parent.getUdpListenerCallbacks(tag);
if (listener.has_value()) {
listener->get().onDataWorker(std::move(*data_to_post));
}
});
}

void ActiveUdpListenerBase::onData(Network::UdpRecvData&& data) {
uint32_t dest = worker_index_;

// For concurrency == 1, the packet will always go to the current worker.
if (concurrency_ > 1) {
dest = destination(data);
ASSERT(dest < concurrency_);
}

if (dest == worker_index_) {
onDataWorker(std::move(data));
} else {
config_->udpListenerWorkerRouter()->get().deliver(dest, std::move(data));
}
}

ActiveRawUdpListener::ActiveRawUdpListener(uint32_t worker_index, uint32_t concurrency,
Network::UdpConnectionHandler& parent,
Event::Dispatcher& dispatcher,
Network::ListenerConfig& config)
: ActiveRawUdpListener(worker_index, concurrency, parent,
config.listenSocketFactory().getListenSocket(), dispatcher, config) {}

ActiveRawUdpListener::ActiveRawUdpListener(uint32_t worker_index, uint32_t concurrency,
Network::UdpConnectionHandler& parent,
Network::SocketSharedPtr listen_socket_ptr,
Event::Dispatcher& dispatcher,
Network::ListenerConfig& config)
: ActiveRawUdpListener(worker_index, concurrency, parent, *listen_socket_ptr, listen_socket_ptr,
dispatcher, config) {}

ActiveRawUdpListener::ActiveRawUdpListener(uint32_t worker_index, uint32_t concurrency,
Network::UdpConnectionHandler& parent,
Network::Socket& listen_socket,
Network::SocketSharedPtr listen_socket_ptr,
Event::Dispatcher& dispatcher,
Network::ListenerConfig& config)
: ActiveRawUdpListener(worker_index, concurrency, parent, listen_socket,
dispatcher.createUdpListener(listen_socket_ptr, *this), config) {}

ActiveRawUdpListener::ActiveRawUdpListener(uint32_t worker_index, uint32_t concurrency,
Network::UdpConnectionHandler& parent,
Network::Socket& listen_socket,
Network::UdpListenerPtr&& listener,
Network::ListenerConfig& config)
: ActiveUdpListenerBase(worker_index, concurrency, parent, listen_socket, std::move(listener),
&config),
read_filter_(nullptr) {
// Create the filter chain on creating a new udp listener
config_->filterChainFactory().createUdpListenerFilterChain(*this, *this);

// If filter is nullptr, fail the creation of the listener
if (read_filter_ == nullptr) {
throw Network::CreateListenerException(
fmt::format("Cannot create listener as no read filter registered for the udp listener: {} ",
config_->name()));
}

// Create udp_packet_writer
udp_packet_writer_ = config.udpPacketWriterFactory()->get().createUdpPacketWriter(
listen_socket_.ioHandle(), config.listenerScope());
}

void ActiveRawUdpListener::onDataWorker(Network::UdpRecvData&& data) { read_filter_->onData(data); }

void ActiveRawUdpListener::onReadReady() {}

void ActiveRawUdpListener::onWriteReady(const Network::Socket&) {
// TODO(sumukhs): This is not used now. When write filters are implemented, this is a
// trigger to invoke the on write ready API on the filters which is when they can write
// data.

// Clear write_blocked_ status for udpPacketWriter.
udp_packet_writer_->setWritable();
}

void ActiveRawUdpListener::onReceiveError(Api::IoError::IoErrorCode error_code) {
read_filter_->onReceiveError(error_code);
}

void ActiveRawUdpListener::addReadFilter(Network::UdpListenerReadFilterPtr&& filter) {
ASSERT(read_filter_ == nullptr, "Cannot add a 2nd UDP read filter");
read_filter_ = std::move(filter);
}

Network::UdpListener& ActiveRawUdpListener::udpListener() { return *udp_listener_; }

} // namespace Server
} // namespace Envoy
102 changes: 102 additions & 0 deletions source/server/active_udp_listener.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
#pragma once

#include <cstdint>
#include <memory>

#include "envoy/network/connection_handler.h"
#include "envoy/network/filter.h"
#include "envoy/network/listen_socket.h"
#include "envoy/network/listener.h"
#include "envoy/server/active_udp_listener_config.h"

// TODO(lambdai): remove connection_handler_impl after ActiveListenerImplBase is extracted from it.
#include "server/connection_handler_impl.h"

namespace Envoy {
namespace Server {

class ActiveUdpListenerBase : public ConnectionHandlerImpl::ActiveListenerImplBase,
public Network::ConnectionHandler::ActiveUdpListener {
public:
ActiveUdpListenerBase(uint32_t worker_index, uint32_t concurrency,
Network::UdpConnectionHandler& parent, Network::Socket& listen_socket,
Network::UdpListenerPtr&& listener, Network::ListenerConfig* config);
~ActiveUdpListenerBase() override;

// Network::UdpListenerCallbacks
void onData(Network::UdpRecvData&& data) final;
uint32_t workerIndex() const final { return worker_index_; }
void post(Network::UdpRecvData&& data) final;

// ActiveListenerImplBase
Network::Listener* listener() override { return udp_listener_.get(); }

protected:
uint32_t destination(const Network::UdpRecvData& /*data*/) const override {
// By default, route to the current worker.
return worker_index_;
}

const uint32_t worker_index_;
const uint32_t concurrency_;
Network::UdpConnectionHandler& parent_;
Network::Socket& listen_socket_;
Network::UdpListenerPtr udp_listener_;
};

/**
* Wrapper for an active udp listener owned by this handler.
*/
class ActiveRawUdpListener : public ActiveUdpListenerBase,
public Network::UdpListenerFilterManager,
public Network::UdpReadFilterCallbacks {
public:
ActiveRawUdpListener(uint32_t worker_index, uint32_t concurrency,
Network::UdpConnectionHandler& parent, Event::Dispatcher& dispatcher,
Network::ListenerConfig& config);
ActiveRawUdpListener(uint32_t worker_index, uint32_t concurrency,
Network::UdpConnectionHandler& parent,
Network::SocketSharedPtr listen_socket_ptr, Event::Dispatcher& dispatcher,
Network::ListenerConfig& config);
ActiveRawUdpListener(uint32_t worker_index, uint32_t concurrency,
Network::UdpConnectionHandler& parent, Network::Socket& listen_socket,
Network::SocketSharedPtr listen_socket_ptr, Event::Dispatcher& dispatcher,
Network::ListenerConfig& config);
ActiveRawUdpListener(uint32_t worker_index, uint32_t concurrency,
Network::UdpConnectionHandler& parent, Network::Socket& listen_socket,
Network::UdpListenerPtr&& listener, Network::ListenerConfig& config);

// Network::UdpListenerCallbacks
void onReadReady() override;
void onWriteReady(const Network::Socket& socket) override;
void onReceiveError(Api::IoError::IoErrorCode error_code) override;
Network::UdpPacketWriter& udpPacketWriter() override { return *udp_packet_writer_; }

// Network::UdpWorker
void onDataWorker(Network::UdpRecvData&& data) override;

// ActiveListenerImplBase
void pauseListening() override { udp_listener_->disable(); }
void resumeListening() override { udp_listener_->enable(); }
void shutdownListener() override {
// The read filter should be deleted before the UDP listener is deleted.
// The read filter refers to the UDP listener to send packets to downstream.
// If the UDP listener is deleted before the read filter, the read filter may try to use it
// after deletion.
read_filter_.reset();
udp_listener_.reset();
}

// Network::UdpListenerFilterManager
void addReadFilter(Network::UdpListenerReadFilterPtr&& filter) override;

// Network::UdpReadFilterCallbacks
Network::UdpListener& udpListener() override;

private:
Network::UdpListenerReadFilterPtr read_filter_;
Network::UdpPacketWriterPtr udp_packet_writer_;
};

} // namespace Server
} // namespace Envoy
Loading