Skip to content

Commit

Permalink
Refactor HQUnidirStreamDispatcher
Browse files Browse the repository at this point in the history
Summary: This class's needs have changed since inception and can now be much simpler.  The base class just waits for enough data to dispatch the stream.  There are now two derived classes, one for Unidirectional stream (control, push) and one for Bidirectional (unused for now, eventually request|webtransport).

Reviewed By: kvtsoy

Differential Revision: D44116439

fbshipit-source-id: bbfae08fbfecc369c01cb6c7405a345bf1d16fce
  • Loading branch information
afrind authored and facebook-github-bot committed May 18, 2023
1 parent 674a019 commit 8902755
Show file tree
Hide file tree
Showing 15 changed files with 513 additions and 521 deletions.
2 changes: 1 addition & 1 deletion third-party/proxygen/src/proxygen/lib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ if (BUILD_QUIC)
http/session/HQDownstreamSession.cpp
http/session/HQSession.cpp
http/session/HQStreamBase.cpp
http/session/HQUnidirectionalCallbacks.cpp
http/session/HQStreamDispatcher.cpp
http/session/HQUpstreamSession.cpp
transport/H3DatagramAsyncSocket.cpp
transport/PersistentQuicPskCache.cpp
Expand Down
12 changes: 12 additions & 0 deletions third-party/proxygen/src/proxygen/lib/http/codec/HQFramer.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,18 @@ using PushId = uint64_t;
using ParseResult = folly::Optional<HTTP3::ErrorCode>;
using WriteResult = folly::Expected<size_t, quic::TransportErrorCode>;

enum class UnidirectionalStreamType : uint64_t {
CONTROL = 0x00,
PUSH = 0x01,
QPACK_ENCODER = 0x02,
QPACK_DECODER = 0x03,
GREASE = 0x21,
};

enum class BidirectionalStreamType : uint64_t {
REQUEST = 0x00, // Can be any reserved frame type valid on a bidi stream
};

enum class FrameType : uint64_t {
DATA = 0x00,
HEADERS = 0x01,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,11 @@
#include <folly/io/IOBuf.h>
#include <folly/io/IOBufQueue.h>
#include <proxygen/lib/http/HTTPException.h>
#include <proxygen/lib/http/codec/HQFramer.h>
#include <proxygen/lib/http/codec/HTTPCodec.h>

namespace proxygen { namespace hq {

enum class UnidirectionalStreamType : uint64_t {
CONTROL = 0x00,
PUSH = 0x01,
QPACK_ENCODER = 0x02,
QPACK_DECODER = 0x03,
};
using StreamTypeType = std::underlying_type<UnidirectionalStreamType>::type;
std::ostream& operator<<(std::ostream& os, UnidirectionalStreamType type);

Expand All @@ -44,6 +39,9 @@ folly::Optional<Ret> withType(uint64_t typeval,
case UnidirectionalStreamType::QPACK_DECODER:
return functor(casted);
default:
if (isGreaseId(typeval)) {
return functor(UnidirectionalStreamType::GREASE);
}
return folly::none;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,9 @@ class HQDownstreamSession : public HQSession {

bool erasePushStream(quic::StreamId streamId) override;

void onNewPushStream(quic::StreamId /* pushStreamId */,
hq::PushId /* pushId */,
size_t /* toConsume */) override {
void dispatchPushStream(quic::StreamId /* pushStreamId */,
hq::PushId /* pushId */,
size_t /* toConsume */) override {
LOG(DFATAL) << "nope";
}

Expand Down
87 changes: 22 additions & 65 deletions third-party/proxygen/src/proxygen/lib/http/session/HQSession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1158,39 +1158,10 @@ void HQSession::timeoutExpired() noexcept {
closeWhenIdle();
}

HQSession::HQControlStream* FOLLY_NULLABLE
HQSession::tryCreateIngressControlStream(quic::StreamId id, uint64_t preface) {
auto res = parseStreamPreface(preface);
if (!res) {
LOG(ERROR) << "Got unidirectional stream with unknown preface "
<< static_cast<uint64_t>(preface) << " streamID=" << id
<< " sess=" << *this;
return nullptr;
}

auto ctrlStream = createIngressControlStream(id, *res);
if (!ctrlStream) {
return nullptr;
}

sock_->setControlStream(id);
return ctrlStream;
}

folly::Optional<UnidirectionalStreamType> HQSession::parseStreamPreface(
folly::Optional<UnidirectionalStreamType> HQSession::parseUniStreamPreface(
uint64_t preface) {
hq::UnidirectionalTypeF parse = [](hq::UnidirectionalStreamType type)
-> folly::Optional<UnidirectionalStreamType> {
switch (type) {
case UnidirectionalStreamType::CONTROL:
case UnidirectionalStreamType::PUSH:
case UnidirectionalStreamType::QPACK_ENCODER:
case UnidirectionalStreamType::QPACK_DECODER:
return type;
default:
return folly::none;
}
};
-> folly::Optional<UnidirectionalStreamType> { return type; };
return hq::withType(preface, parse);
}

Expand Down Expand Up @@ -1222,13 +1193,11 @@ void HQSession::readControlStream(HQControlStream* ctrlStream) {
}

// Dispatcher method implementation
void HQSession::assignReadCallback(quic::StreamId id,
hq::UnidirectionalStreamType type,
size_t toConsume,
quic::QuicSocket::ReadCallback* const cb) {
void HQSession::dispatchControlStream(quic::StreamId id,
hq::UnidirectionalStreamType type,
size_t toConsume) {
VLOG(4) << __func__ << " streamID=" << id << " type=" << type
<< " toConsume=" << toConsume << " cb=" << std::hex << cb;
CHECK(cb) << "Bug in dispatcher - null callback passed";
<< " toConsume=" << toConsume;

auto consumeRes = sock_->consume(id, toConsume);
CHECK(!consumeRes.hasError()) << "Unexpected error consuming bytes";
Expand All @@ -1239,17 +1208,16 @@ void HQSession::assignReadCallback(quic::StreamId id,
*this, toConsume, static_cast<HTTPCodec::StreamID>(id));
}

auto ctrlStream =
tryCreateIngressControlStream(id, static_cast<uint64_t>(type));

auto ctrlStream = createIngressControlStream(id, type);
if (!ctrlStream) {
rejectStream(id);
return;
}
sock_->setControlStream(id);

// After reading the preface we can switch to the regular readCallback
sock_->setPeekCallback(id, nullptr);
sock_->setReadCallback(id, cb);
sock_->setReadCallback(id, &controlStreamReadCallback_);

// The transport will send notifications via the read callback
// for the *future* events, but not for this one.
Expand All @@ -1259,41 +1227,31 @@ void HQSession::assignReadCallback(quic::StreamId id,
controlStreamReadAvailable(id);
}

// Dispatcher method implementation
void HQSession::assignPeekCallback(quic::StreamId id,
hq::UnidirectionalStreamType type,
size_t toConsume,
quic::QuicSocket::PeekCallback* const cb) {
VLOG(4) << __func__ << " streamID=" << id << " type=" << type
<< " toConsume=" << toConsume << " cb=" << std::hex << cb;
CHECK(cb) << "Bug in dispatcher - null callback passed";

auto consumeRes = sock_->consume(id, toConsume);
CHECK(!consumeRes.hasError()) << "Unexpected error consuming bytes";

// Install the new peek callback
sock_->setPeekCallback(id, cb);
}

void HQSession::rejectStream(quic::StreamId id) {
// Do not read data for unknown unidirectional stream types.
// setReadCallback will send STOP_SENDING and rely on the peer sending a RESET
// to clear the stream in the transport. It is safe to stop reading from this
// stream. The peer is supposed to reset it on receipt of a STOP_SENDING
if (!sock_) {
return;
}
sock_->setReadCallback(
id, nullptr, HTTP3::ErrorCode::HTTP_STREAM_CREATION_ERROR);
sock_->setPeekCallback(id, nullptr);
if (sock_->isBidirectionalStream(id)) {
sock_->resetStream(id, HTTP3::ErrorCode::HTTP_STREAM_CREATION_ERROR);
}
}

size_t HQSession::cleanupPendingStreams() {
std::vector<quic::StreamId> streamsToCleanup;

// Collect the pending stream ids from the dispatcher
unidirectionalReadDispatcher_.invokeOnPendingStreamIDs(
[&](quic::StreamId pendingStreamId) {
streamsToCleanup.push_back(pendingStreamId);
});
// Cleanup pending streams in the dispatchers
unidirectionalReadDispatcher_.cleanup();
bidirectionalReadDispatcher_.cleanup();

// These streams have been dispatched as push streams but are missing their
// push promise
cleanupUnboundPushStreams(streamsToCleanup);

// Clean up the streams by detaching all callbacks
Expand Down Expand Up @@ -1326,9 +1284,8 @@ void HQSession::controlStreamReadAvailable(quic::StreamId id) {
readControlStream(ctrlStream);
}

void HQSession::controlStreamReadError(
quic::StreamId id,
const HQUnidirStreamDispatcher::Callback::ReadError& error) {
void HQSession::controlStreamReadError(quic::StreamId id,
const quic::QuicError& error) {
VLOG(4) << __func__ << " sess=" << *this << ": readError streamID=" << id
<< " error: " << error;

Expand Down
75 changes: 41 additions & 34 deletions third-party/proxygen/src/proxygen/lib/http/session/HQSession.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
#include <proxygen/lib/http/codec/HTTPSettings.h>
#include <proxygen/lib/http/session/HQByteEventTracker.h>
#include <proxygen/lib/http/session/HQStreamBase.h>
#include <proxygen/lib/http/session/HQUnidirectionalCallbacks.h>
#include <proxygen/lib/http/session/HQStreamDispatcher.h>
#include <proxygen/lib/http/session/HTTPSessionBase.h>
#include <proxygen/lib/http/session/HTTPSessionController.h>
#include <proxygen/lib/http/session/HTTPTransaction.h>
Expand Down Expand Up @@ -82,7 +82,8 @@ class HQSession
, public quic::QuicSocket::PingCallback
, public HTTPSessionBase
, public folly::EventBase::LoopCallback
, public HQUnidirStreamDispatcher::Callback {
, public HQUniStreamDispatcher::Callback
, public HQBidiStreamDispatcher::Callback {
// Forward declarations
public:
class HQStreamTransportBase;
Expand Down Expand Up @@ -277,11 +278,6 @@ class HQSession
return CodecProtocol::HTTP_3;
}

// for testing only
HQUnidirStreamDispatcher* getDispatcher() {
return &unidirectionalReadDispatcher_;
}

const TimePoint& getTransportStart() const {
return transportStart_;
}
Expand Down Expand Up @@ -605,37 +601,27 @@ class HQSession
return nullptr;
}

/*
* for HQ we need a read callback for unidirectional streams to read the
* stream type from the the wire to decide whether a stream is
* a control stream, a header codec/decoder stream or a push stream
*
* This part is now implemented in HQUnidirStreamDispatcher
*/
// Callback methods that are invoked by the stream dispatchers
void dispatchControlStream(quic::StreamId /* id */,
hq::UnidirectionalStreamType /* type */,
size_t /* toConsume */) override;

// Callback methods that are invoked by the dispatcher
void assignPeekCallback(
quic::StreamId /* id */,
hq::UnidirectionalStreamType /* type */,
size_t /* toConsume */,
quic::QuicSocket::PeekCallback* const /* cb */) override;
void dispatchRequestStream(quic::StreamId /* streamId */) override {
}

void assignReadCallback(
quic::StreamId /* id */,
hq::UnidirectionalStreamType /* type */,
size_t /* toConsume */,
quic::QuicSocket::ReadCallback* const /* cb */) override;
std::chrono::milliseconds getDispatchTimeout() const override {
return transactionsTimeout_;
}

void rejectStream(quic::StreamId /* id */) override;

folly::Optional<hq::UnidirectionalStreamType> parseStreamPreface(
folly::Optional<hq::UnidirectionalStreamType> parseUniStreamPreface(
uint64_t preface) override;

void controlStreamReadAvailable(quic::StreamId /* id */) override;

void controlStreamReadError(
quic::StreamId /* id */,
const HQUnidirStreamDispatcher::Callback::ReadError& /* err */) override;
folly::Optional<hq::BidirectionalStreamType> parseBidiStreamPreface(
uint64_t) override {
return hq::BidirectionalStreamType::REQUEST;
}

/**
* HQSession is an HTTPSessionBase that uses QUIC as the underlying transport
Expand Down Expand Up @@ -665,6 +651,8 @@ class HQSession
dropping_(false),
inLoopCallback_(false),
unidirectionalReadDispatcher_(*this, direction),
bidirectionalReadDispatcher_(*this, direction),
controlStreamReadCallback_(*this),
sessionObserverAccessor_(this),
sessionObserverContainer_(&sessionObserverAccessor_) {
codec_.add<HTTPChecks>();
Expand Down Expand Up @@ -749,8 +737,6 @@ class HQSession
HQStreamTransport* createStreamTransport(quic::StreamId streamId);

bool createEgressControlStreams();
HQControlStream* tryCreateIngressControlStream(quic::StreamId id,
uint64_t preface);

// Creates outgoing control stream.
bool createEgressControlStream(hq::UnidirectionalStreamType streamType);
Expand Down Expand Up @@ -1045,6 +1031,22 @@ class HQSession
bool readEOF_{false};
}; // HQControlStream

// Callback for the control stream - follows the read api
struct ControlStreamReadCallback : public quic::QuicSocket::ReadCallback {
explicit ControlStreamReadCallback(HQSession& session) : session_(session) {
}
~ControlStreamReadCallback() override = default;
void readAvailable(quic::StreamId id) noexcept override {
session_.controlStreamReadAvailable(id);
}
void readError(quic::StreamId id, quic::QuicError error) noexcept override {
session_.controlStreamReadError(id, error);
}

protected:
HQSession& session_;
};

public:
class HQStreamTransportBase
: public HQStreamBase
Expand Down Expand Up @@ -1813,10 +1815,15 @@ class HQSession
// Remove all callbacks from a stream during cleanup
void clearStreamCallbacks(quic::StreamId /* id */);

void controlStreamReadAvailable(quic::StreamId id);
void controlStreamReadError(quic::StreamId id, const quic::QuicError& error);

using ControlStreamsKey = std::pair<quic::StreamId, hq::StreamDirection>;
std::unordered_map<hq::UnidirectionalStreamType, HQControlStream>
controlStreams_;
HQUnidirStreamDispatcher unidirectionalReadDispatcher_;
HQUniStreamDispatcher unidirectionalReadDispatcher_;
HQBidiStreamDispatcher bidirectionalReadDispatcher_;
ControlStreamReadCallback controlStreamReadCallback_;
QPACKCodec qpackCodec_;

// Min Stream ID we haven't seen so far
Expand Down
Loading

0 comments on commit 8902755

Please sign in to comment.