Skip to content

Commit

Permalink
add frame length parser
Browse files Browse the repository at this point in the history
Summary:
Currently the logic for parsing a Rocket from the AsyncTransport has multiple different parsing strategies are in the same logic space (Parser.h and Parser-inl.h).  In these files there are around 4 different implemetations of paring logic controlled with if/else statements. There are also different different buffers that can be used depending on the logic being used. This diff creates a ParserStrategy class that can be implemented to encapsulate the different parsing strategies that we want to use. The idea is to eventually move the Parser logic in to different strategy classes, and remove the Parser-inl.h file.

There's an implementation of the ParserStrategy that uses the RSocket frame length to determine when to emit a new frame that shows you'd use the ParserStrategy.

Reviewed By: praihan, AkramaMirza

Differential Revision: D43428840

fbshipit-source-id: bd73f24c3adc4a3545ef7e655c0ebf0d3612d013
  • Loading branch information
Robert Roeser authored and facebook-github-bot committed May 16, 2023
1 parent 00920a2 commit 83b0673
Show file tree
Hide file tree
Showing 6 changed files with 568 additions and 0 deletions.
10 changes: 10 additions & 0 deletions thrift/conformance/stresstest/example/ExampleScenarios.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,16 @@ THRIFT_STRESS_TEST(Echo4096b) {
co_await client->co_echo(s);
}

THRIFT_STRESS_TEST(Echo32k) {
static std::string const s(1 << 15, '?');
co_await client->co_echo(s);
}

THRIFT_STRESS_TEST(Echo512k) {
static std::string const s(1 << 19, '?');
co_await client->co_echo(s);
}

THRIFT_STRESS_TEST(Echo4M) {
static std::string const s(4096000, '?');
co_await client->co_echo(s);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <thrift/lib/cpp2/transport/rocket/framing/FrameType.h>
#include <thrift/lib/cpp2/transport/rocket/framing/Frames.h>
#include <thrift/lib/cpp2/transport/rocket/framing/Serializer.h>
#include <thrift/lib/cpp2/transport/rocket/framing/Util.h>
#include <thrift/lib/cpp2/transport/rocket/framing/parser/FrameLengthParserStrategy.h>

namespace apache {
namespace thrift {
namespace rocket {

template <class T>
FrameLengthParserStrategy<T>::~FrameLengthParserStrategy() {
if (frameLengthAndFieldSize_) {
owner_.decMemoryUsage(frameLengthAndFieldSize_);
}
}

template <class T>
void FrameLengthParserStrategy<T>::getReadBuffer(
void** bufReturn, size_t* lenReturn) {
auto tail = readBufQueue_.tailroom();
if (tail < Serializer::kBytesForFrameOrMetadataLength) {
const auto ret = readBufQueue_.preallocate(minBufferSize_, maxBufferSize_);
*bufReturn = ret.first;
*lenReturn = ret.second;
} else {
*bufReturn = readBufQueue_.writableTail();
*lenReturn = tail;
}
}

template <class T>
void FrameLengthParserStrategy<T>::readDataAvailable(size_t len) {
incrSize(len);
readBufQueue_.postallocate(len);
drainReadBufQueue<true>();
}

template <class T>
void FrameLengthParserStrategy<T>::readBufferAvailable(
std::unique_ptr<folly::IOBuf> buf) {
incrSize(buf->computeChainDataLength());
readBufQueue_.append(std::move(buf), true, true);
drainReadBufQueue<false>();
}

template <class T>
template <bool resize>
void FrameLengthParserStrategy<T>::drainReadBufQueue() {
while (size_ >= Serializer::kBytesForFrameOrMetadataLength) {
if (!frameLength_) {
computeFrameLength();

if (UNLIKELY(!owner_.incMemoryUsage(frameLengthAndFieldSize_))) {
return;
}

if (resize) {
tryResize();
}
}

if (size_ < frameLengthAndFieldSize_) {
return;
}

// skip frame length field
readBufQueue_.trimStart(Serializer::kBytesForFrameOrMetadataLength);

// split out frame
auto frame = readBufQueue_.split(frameLength_);

// hand frame off
owner_.handleFrame(std::move(frame));

// reset the frame length fields
resetFrameLength();
}
}

template <class T>
void FrameLengthParserStrategy<T>::computeFrameLength() {
cursor_.reset(readBufQueue_.front());
frameLength_ = readFrameOrMetadataSize(cursor_);
frameLengthAndFieldSize_ =
frameLength_ + Serializer::kBytesForFrameOrMetadataLength;
}

template <class T>
void FrameLengthParserStrategy<T>::resetFrameLength() {
owner_.decMemoryUsage(frameLengthAndFieldSize_);
size_ -= frameLengthAndFieldSize_;
frameLength_ = 0;
frameLengthAndFieldSize_ = 0;
}

template <class T>
void FrameLengthParserStrategy<T>::tryResize() {
if (readBufQueue_.tailroom() < frameLength_) {
auto max = std::max(frameLengthAndFieldSize_, maxBufferSize_);
readBufQueue_.preallocate(minBufferSize_, max, max);
}
}

} // namespace rocket
} // namespace thrift
} // namespace apache
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <folly/io/Cursor.h>
#include <folly/io/IOBuf.h>
#include <folly/io/IOBufQueue.h>

namespace apache {
namespace thrift {
namespace rocket {

template <class T>
class FrameLengthParserStrategy {
public:
explicit FrameLengthParserStrategy(
T& owner, size_t minBufferSize = 256, size_t maxBufferSize = 4096)
: owner_(owner),
minBufferSize_(minBufferSize),
maxBufferSize_(maxBufferSize) {}
~FrameLengthParserStrategy();

void getReadBuffer(void** bufReturn, size_t* lenReturn);
void readDataAvailable(size_t len);
void readBufferAvailable(std::unique_ptr<folly::IOBuf> buf);

// Functions for testing
size_t getFrameLength() { return frameLength_; }
size_t getFrameLengthAndFieldSize() { return frameLengthAndFieldSize_; }
size_t getSize() { return size_; }

private:
template <bool resize>
FOLLY_ALWAYS_INLINE void drainReadBufQueue();

FOLLY_ALWAYS_INLINE void computeFrameLength();
FOLLY_ALWAYS_INLINE void resetFrameLength();
FOLLY_ALWAYS_INLINE void incrSize(size_t delta) { size_ += delta; }
FOLLY_ALWAYS_INLINE void tryResize();

T& owner_;
size_t size_{0};
size_t frameLength_{0};
size_t frameLengthAndFieldSize_{0};
size_t minBufferSize_;
size_t maxBufferSize_;
folly::IOBufQueue readBufQueue_{folly::IOBufQueue::cacheChainLength()};
folly::io::Cursor cursor_{readBufQueue_.front()};
};

} // namespace rocket
} // namespace thrift
} // namespace apache

#include <thrift/lib/cpp2/transport/rocket/framing/parser/FrameLengthParserStrategy-inl.h>
53 changes: 53 additions & 0 deletions thrift/lib/cpp2/transport/rocket/framing/parser/ParserStrategy.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <folly/io/IOBuf.h>
#include <folly/io/IOBufQueue.h>

namespace apache {
namespace thrift {
namespace rocket {

// C++20 concept
// template <typename Strategy>
// concept ParserStrategy = requires (Strategy s) {
// { s.getReadBuffer(std::declval<void**>(), std::declval<size_t*>()) } ->
// std::same_as<void>; { s.readDataAvailable(std::declval<size_t>()) } ->
// std::same_as<void>; {
// s.readBufferAvailable(std::declval<std::unique_ptr<folly::IOBuf>>()) } ->
// std::same_as<void>;
//};
template <class T, template <typename> class Strategy>
class ParserStrategy : private Strategy<T> {
public:
explicit ParserStrategy(T& owner) : Strategy<T>(owner) {}

void getReadBuffer(void** bufReturn, size_t* lenReturn) {
Strategy<T>::getReadBuffer(bufReturn, lenReturn);
}

void readDataAvailable(size_t len) { Strategy<T>::readDataAvailable(len); }

void readBufferAvailable(std::unique_ptr<folly::IOBuf> buf) {
Strategy<T>::readBufferAvailable(std::move(buf));
}
};

} // namespace rocket
} // namespace thrift
} // namespace apache
Loading

0 comments on commit 83b0673

Please sign in to comment.