Skip to content

Commit

Permalink
quic: various additional cleanups, fixes in Endpoint
Browse files Browse the repository at this point in the history
PR-URL: #51310
Reviewed-By: Yagiz Nizipli <[email protected]>
  • Loading branch information
jasnell authored and targos committed Feb 15, 2024
1 parent 17c554f commit 4d06d80
Show file tree
Hide file tree
Showing 25 changed files with 2,108 additions and 481 deletions.
2 changes: 2 additions & 0 deletions node.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,7 @@
'src/quic/cid.cc',
'src/quic/data.cc',
'src/quic/endpoint.cc',
'src/quic/http3.cc',
'src/quic/logstream.cc',
'src/quic/packet.cc',
'src/quic/preferredaddress.cc',
Expand All @@ -370,6 +371,7 @@
'src/quic/cid.h',
'src/quic/data.h',
'src/quic/endpoint.h',
'src/quic/http3.h',
'src/quic/logstream.h',
'src/quic/packet.h',
'src/quic/preferredaddress.h',
Expand Down
3 changes: 2 additions & 1 deletion src/debug_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ void NODE_EXTERN_PRIVATE FWrite(FILE* file, const std::string& str);
V(WASI) \
V(MKSNAPSHOT) \
V(SNAPSHOT_SERDES) \
V(PERMISSION_MODEL)
V(PERMISSION_MODEL) \
V(QUIC)

enum class DebugCategory : unsigned int {
#define V(name) name,
Expand Down
120 changes: 91 additions & 29 deletions src/quic/application.cc
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
#if HAVE_OPENSSL && NODE_OPENSSL_HAS_QUIC

#include "application.h"
#include <async_wrap-inl.h>
#include <debug_utils-inl.h>
#include <node_bob.h>
#include <node_sockaddr-inl.h>
#include <uv.h>
#include <v8.h>
#include "defs.h"
#include "endpoint.h"
#include "http3.h"
#include "packet.h"
#include "session.h"

Expand All @@ -21,24 +24,48 @@ using v8::Value;

namespace quic {

struct Session::Application::StreamData final {
// The actual number of vectors in the struct, up to kMaxVectorCount.
size_t count = 0;
size_t remaining = 0;
// The stream identifier. If this is a negative value then no stream is
// identified.
int64_t id = -1;
int fin = 0;
ngtcp2_vec data[kMaxVectorCount]{};
ngtcp2_vec* buf = data;
BaseObjectPtr<Stream> stream;
};

// ============================================================================
// Session::Application_Options
const Session::Application_Options Session::Application_Options::kDefault = {};

Session::Application_Options::operator const nghttp3_settings() const {
// In theory, Application_Options might contain options for more than just
// HTTP/3. Here we extract only the properties that are relevant to HTTP/3.
return nghttp3_settings{
max_field_section_size,
static_cast<size_t>(qpack_max_dtable_capacity),
static_cast<size_t>(qpack_encoder_max_dtable_capacity),
static_cast<size_t>(qpack_blocked_streams),
enable_connect_protocol,
enable_datagrams,
};
}

std::string Session::Application_Options::ToString() const {
DebugIndentScope indent;
auto prefix = indent.Prefix();
std::string res("{");
res += prefix + "max header pairs: " + std::to_string(max_header_pairs);
res += prefix + "max header length: " + std::to_string(max_header_length);
res += prefix +
"max field section size: " + std::to_string(max_field_section_size);
res += prefix + "qpack max dtable capacity: " +
std::to_string(qpack_max_dtable_capacity);
res += prefix + "qpack encoder max dtable capacity: " +
std::to_string(qpack_encoder_max_dtable_capacity);
res += prefix +
"qpack blocked streams: " + std::to_string(qpack_blocked_streams);
res += prefix + "enable connect protocol: " +
(enable_connect_protocol ? std::string("yes") : std::string("no"));
res += prefix + "enable datagrams: " +
(enable_datagrams ? std::string("yes") : std::string("no"));
res += indent.Close();
return res;
}

Maybe<Session::Application_Options> Session::Application_Options::From(
Environment* env, Local<Value> value) {
if (value.IsEmpty()) {
if (value.IsEmpty() || (!value->IsUndefined() && !value->IsObject())) {
THROW_ERR_INVALID_ARG_TYPE(env, "options must be an object");
return Nothing<Application_Options>();
}
Expand All @@ -49,11 +76,6 @@ Maybe<Session::Application_Options> Session::Application_Options::From(
return Just<Application_Options>(options);
}

if (!value->IsObject()) {
THROW_ERR_INVALID_ARG_TYPE(env, "options must be an object");
return Nothing<Application_Options>();
}

auto params = value.As<Object>();

#define SET(name) \
Expand All @@ -63,7 +85,8 @@ Maybe<Session::Application_Options> Session::Application_Options::From(

if (!SET(max_header_pairs) || !SET(max_header_length) ||
!SET(max_field_section_size) || !SET(qpack_max_dtable_capacity) ||
!SET(qpack_encoder_max_dtable_capacity) || !SET(qpack_blocked_streams)) {
!SET(qpack_encoder_max_dtable_capacity) || !SET(qpack_blocked_streams) ||
!SET(enable_connect_protocol) || !SET(enable_datagrams)) {
return Nothing<Application_Options>();
}

Expand All @@ -78,16 +101,22 @@ Session::Application::Application(Session* session, const Options& options)
bool Session::Application::Start() {
// By default there is nothing to do. Specific implementations may
// override to perform more actions.
Debug(session_, "Session application started");
return true;
}

void Session::Application::AcknowledgeStreamData(Stream* stream,
size_t datalen) {
Debug(session_,
"Application acknowledging stream %" PRIi64 " data: %zu",
stream->id(),
datalen);
DCHECK_NOT_NULL(stream);
stream->Acknowledge(datalen);
}

void Session::Application::BlockStream(int64_t id) {
Debug(session_, "Application blocking stream %" PRIi64, id);
auto stream = session().FindStream(id);
if (stream) stream->EmitBlocked();
}
Expand All @@ -96,6 +125,7 @@ bool Session::Application::CanAddHeader(size_t current_count,
size_t current_headers_length,
size_t this_header_length) {
// By default headers are not supported.
Debug(session_, "Application cannot add header");
return false;
}

Expand All @@ -104,33 +134,39 @@ bool Session::Application::SendHeaders(const Stream& stream,
const v8::Local<v8::Array>& headers,
HeadersFlags flags) {
// By default do nothing.
Debug(session_, "Application cannot send headers");
return false;
}

void Session::Application::ResumeStream(int64_t id) {
Debug(session_, "Application resuming stream %" PRIi64, id);
// By default do nothing.
}

void Session::Application::ExtendMaxStreams(EndpointLabel label,
Direction direction,
uint64_t max_streams) {
Debug(session_, "Application extending max streams");
// By default do nothing.
}

void Session::Application::ExtendMaxStreamData(Stream* stream,
uint64_t max_data) {
Debug(session_, "Application extending max stream data");
// By default do nothing.
}

void Session::Application::CollectSessionTicketAppData(
SessionTicket::AppData* app_data) const {
Debug(session_, "Application collecting session ticket app data");
// By default do nothing.
}

SessionTicket::AppData::Status
Session::Application::ExtractSessionTicketAppData(
const SessionTicket::AppData& app_data,
SessionTicket::AppData::Source::Flag flag) {
Debug(session_, "Application extracting session ticket app data");
// By default we do not have any application data to retrieve.
return flag == SessionTicket::AppData::Source::Flag::STATUS_RENEW
? SessionTicket::AppData::Status::TICKET_USE_RENEW
Expand All @@ -140,14 +176,16 @@ Session::Application::ExtractSessionTicketAppData(
void Session::Application::SetStreamPriority(const Stream& stream,
StreamPriority priority,
StreamPriorityFlags flags) {
Debug(
session_, "Application setting stream %" PRIi64 " priority", stream.id());
// By default do nothing.
}

StreamPriority Session::Application::GetStreamPriority(const Stream& stream) {
return StreamPriority::DEFAULT;
}

BaseObjectPtr<Packet> Session::Application::CreateStreamDataPacket() {
Packet* Session::Application::CreateStreamDataPacket() {
return Packet::Create(env(),
session_->endpoint_.get(),
session_->remote_address_,
Expand All @@ -156,24 +194,37 @@ BaseObjectPtr<Packet> Session::Application::CreateStreamDataPacket() {
}

void Session::Application::StreamClose(Stream* stream, QuicError error) {
Debug(session_,
"Application closing stream %" PRIi64 " with error %s",
stream->id(),
error);
stream->Destroy(error);
}

void Session::Application::StreamStopSending(Stream* stream, QuicError error) {
Debug(session_,
"Application stopping sending on stream %" PRIi64 " with error %s",
stream->id(),
error);
DCHECK_NOT_NULL(stream);
stream->ReceiveStopSending(error);
}

void Session::Application::StreamReset(Stream* stream,
uint64_t final_size,
QuicError error) {
Debug(session_,
"Application resetting stream %" PRIi64 " with error %s",
stream->id(),
error);
stream->ReceiveStreamReset(final_size, error);
}

void Session::Application::SendPendingData() {
Debug(session_, "Application sending pending data");
PathStorage path;

BaseObjectPtr<Packet> packet;
Packet* packet = nullptr;
uint8_t* pos = nullptr;
int err = 0;

Expand All @@ -182,6 +233,7 @@ void Session::Application::SendPendingData() {
size_t packetSendCount = 0;

const auto updateTimer = [&] {
Debug(session_, "Application updating the session timer");
ngtcp2_conn_update_pkt_tx_time(*session_, uv_hrtime());
session_->UpdateTimer();
};
Expand Down Expand Up @@ -209,9 +261,9 @@ void Session::Application::SendPendingData() {
return session_->Close(Session::CloseMethod::SILENT);
}

if (!packet) {
if (packet == nullptr) {
packet = CreateStreamDataPacket();
if (!packet) {
if (packet == nullptr) {
session_->last_error_ = QuicError::ForNgtcp2Error(NGTCP2_ERR_INTERNAL);
return session_->Close(Session::CloseMethod::SILENT);
}
Expand Down Expand Up @@ -319,12 +371,14 @@ class DefaultApplication final : public Session::Application {
const uint8_t* data,
size_t datalen,
Stream::ReceiveDataFlags flags) override {
Debug(&session(), "Default application receiving stream data");
DCHECK_NOT_NULL(stream);
if (!stream->is_destroyed()) stream->ReceiveData(data, datalen, flags);
return true;
}

int GetStreamData(StreamData* stream_data) override {
Debug(&session(), "Default application getting stream data");
DCHECK_NOT_NULL(stream_data);
// If the queue is empty, there aren't any streams with data yet
if (stream_queue_.IsEmpty()) return 0;
Expand Down Expand Up @@ -380,7 +434,10 @@ class DefaultApplication final : public Session::Application {
return 0;
}

void ResumeStream(int64_t id) override { ScheduleStream(id); }
void ResumeStream(int64_t id) override {
Debug(&session(), "Default application resuming stream %" PRIi64, id);
ScheduleStream(id);
}

bool ShouldSetFin(const StreamData& stream_data) override {
auto const is_empty = [](auto vec, size_t cnt) {
Expand All @@ -394,6 +451,7 @@ class DefaultApplication final : public Session::Application {
}

bool StreamCommit(StreamData* stream_data, size_t datalen) override {
Debug(&session(), "Default application committing stream data");
DCHECK_NOT_NULL(stream_data);
const auto consume = [](ngtcp2_vec** pvec, size_t* pcnt, size_t len) {
ngtcp2_vec* v = *pvec;
Expand Down Expand Up @@ -425,13 +483,15 @@ class DefaultApplication final : public Session::Application {

private:
void ScheduleStream(int64_t id) {
Debug(&session(), "Default application scheduling stream %" PRIi64, id);
auto stream = session().FindStream(id);
if (stream && !stream->is_destroyed()) {
stream->Schedule(&stream_queue_);
}
}

void UnscheduleStream(int64_t id) {
Debug(&session(), "Default application unscheduling stream %" PRIi64, id);
auto stream = session().FindStream(id);
if (stream && !stream->is_destroyed()) stream->Unschedule();
}
Expand All @@ -440,13 +500,15 @@ class DefaultApplication final : public Session::Application {
};

std::unique_ptr<Session::Application> Session::select_application() {
// if (config.options.crypto_options.alpn == NGHTTP3_ALPN_H3)
// return std::make_unique<Http3>(session,
// config.options.application_options);

// In the future, we may end up supporting additional QUIC protocols. As they
// are added, extend the cases here to create and return them.

if (config_.options.tls_options.alpn == NGHTTP3_ALPN_H3) {
Debug(this, "Selecting HTTP/3 application");
return createHttp3Application(this, config_.options.application_options);
}

Debug(this, "Selecting default application");
return std::make_unique<DefaultApplication>(
this, config_.options.application_options);
}
Expand Down
18 changes: 17 additions & 1 deletion src/quic/application.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,9 @@ class Session::Application : public MemoryRetainer {
protected:
inline Environment* env() const { return session_->env(); }
inline Session& session() { return *session_; }
inline const Session& session() const { return *session_; }

BaseObjectPtr<Packet> CreateStreamDataPacket();
Packet* CreateStreamDataPacket();

struct StreamData;

Expand All @@ -137,6 +138,21 @@ class Session::Application : public MemoryRetainer {
Session* session_;
};

struct Session::Application::StreamData final {
// The actual number of vectors in the struct, up to kMaxVectorCount.
size_t count = 0;
size_t remaining = 0;
// The stream identifier. If this is a negative value then no stream is
// identified.
int64_t id = -1;
int fin = 0;
ngtcp2_vec data[kMaxVectorCount]{};
ngtcp2_vec* buf = data;
BaseObjectPtr<Stream> stream;

inline operator nghttp3_vec() const { return {data[0].base, data[0].len}; }
};

} // namespace quic
} // namespace node

Expand Down
Loading

0 comments on commit 4d06d80

Please sign in to comment.