diff --git a/thrift/conformance/stresstest/example/ExampleScenarios.cpp b/thrift/conformance/stresstest/example/ExampleScenarios.cpp index b1630e3ea29..1bf498aa435 100644 --- a/thrift/conformance/stresstest/example/ExampleScenarios.cpp +++ b/thrift/conformance/stresstest/example/ExampleScenarios.cpp @@ -40,6 +40,16 @@ THRIFT_STRESS_TEST(Echo4096b) { co_await client->co_echo(s); } +THRIFT_STRESS_TEST(Echo32k) { + static std::string const s(1 << 15, '?'); + co_await client->co_echo(s); +} + +THRIFT_STRESS_TEST(Echo512k) { + static std::string const s(1 << 19, '?'); + co_await client->co_echo(s); +} + THRIFT_STRESS_TEST(Echo4M) { static std::string const s(4096000, '?'); co_await client->co_echo(s); diff --git a/thrift/lib/cpp2/transport/rocket/framing/parser/FrameLengthParserStrategy-inl.h b/thrift/lib/cpp2/transport/rocket/framing/parser/FrameLengthParserStrategy-inl.h new file mode 100644 index 00000000000..96afcad7ad2 --- /dev/null +++ b/thrift/lib/cpp2/transport/rocket/framing/parser/FrameLengthParserStrategy-inl.h @@ -0,0 +1,125 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include + +namespace apache { +namespace thrift { +namespace rocket { + +template +FrameLengthParserStrategy::~FrameLengthParserStrategy() { + if (frameLengthAndFieldSize_) { + owner_.decMemoryUsage(frameLengthAndFieldSize_); + } +} + +template +void FrameLengthParserStrategy::getReadBuffer( + void** bufReturn, size_t* lenReturn) { + auto tail = readBufQueue_.tailroom(); + if (tail < Serializer::kBytesForFrameOrMetadataLength) { + const auto ret = readBufQueue_.preallocate(minBufferSize_, maxBufferSize_); + *bufReturn = ret.first; + *lenReturn = ret.second; + } else { + *bufReturn = readBufQueue_.writableTail(); + *lenReturn = tail; + } +} + +template +void FrameLengthParserStrategy::readDataAvailable(size_t len) { + incrSize(len); + readBufQueue_.postallocate(len); + drainReadBufQueue(); +} + +template +void FrameLengthParserStrategy::readBufferAvailable( + std::unique_ptr buf) { + incrSize(buf->computeChainDataLength()); + readBufQueue_.append(std::move(buf), true, true); + drainReadBufQueue(); +} + +template +template +void FrameLengthParserStrategy::drainReadBufQueue() { + while (size_ >= Serializer::kBytesForFrameOrMetadataLength) { + if (!frameLength_) { + computeFrameLength(); + + if (UNLIKELY(!owner_.incMemoryUsage(frameLengthAndFieldSize_))) { + return; + } + + if (resize) { + tryResize(); + } + } + + if (size_ < frameLengthAndFieldSize_) { + return; + } + + // skip frame length field + readBufQueue_.trimStart(Serializer::kBytesForFrameOrMetadataLength); + + // split out frame + auto frame = readBufQueue_.split(frameLength_); + + // hand frame off + owner_.handleFrame(std::move(frame)); + + // reset the frame length fields + resetFrameLength(); + } +} + +template +void FrameLengthParserStrategy::computeFrameLength() { + cursor_.reset(readBufQueue_.front()); + frameLength_ = readFrameOrMetadataSize(cursor_); + frameLengthAndFieldSize_ = + frameLength_ + Serializer::kBytesForFrameOrMetadataLength; +} + +template +void FrameLengthParserStrategy::resetFrameLength() { + owner_.decMemoryUsage(frameLengthAndFieldSize_); + size_ -= frameLengthAndFieldSize_; + frameLength_ = 0; + frameLengthAndFieldSize_ = 0; +} + +template +void FrameLengthParserStrategy::tryResize() { + if (readBufQueue_.tailroom() < frameLength_) { + auto max = std::max(frameLengthAndFieldSize_, maxBufferSize_); + readBufQueue_.preallocate(minBufferSize_, max, max); + } +} + +} // namespace rocket +} // namespace thrift +} // namespace apache diff --git a/thrift/lib/cpp2/transport/rocket/framing/parser/FrameLengthParserStrategy.h b/thrift/lib/cpp2/transport/rocket/framing/parser/FrameLengthParserStrategy.h new file mode 100644 index 00000000000..18cc5b9cba1 --- /dev/null +++ b/thrift/lib/cpp2/transport/rocket/framing/parser/FrameLengthParserStrategy.h @@ -0,0 +1,69 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +namespace apache { +namespace thrift { +namespace rocket { + +template +class FrameLengthParserStrategy { + public: + explicit FrameLengthParserStrategy( + T& owner, size_t minBufferSize = 256, size_t maxBufferSize = 4096) + : owner_(owner), + minBufferSize_(minBufferSize), + maxBufferSize_(maxBufferSize) {} + ~FrameLengthParserStrategy(); + + void getReadBuffer(void** bufReturn, size_t* lenReturn); + void readDataAvailable(size_t len); + void readBufferAvailable(std::unique_ptr buf); + + // Functions for testing + size_t getFrameLength() { return frameLength_; } + size_t getFrameLengthAndFieldSize() { return frameLengthAndFieldSize_; } + size_t getSize() { return size_; } + + private: + template + FOLLY_ALWAYS_INLINE void drainReadBufQueue(); + + FOLLY_ALWAYS_INLINE void computeFrameLength(); + FOLLY_ALWAYS_INLINE void resetFrameLength(); + FOLLY_ALWAYS_INLINE void incrSize(size_t delta) { size_ += delta; } + FOLLY_ALWAYS_INLINE void tryResize(); + + T& owner_; + size_t size_{0}; + size_t frameLength_{0}; + size_t frameLengthAndFieldSize_{0}; + size_t minBufferSize_; + size_t maxBufferSize_; + folly::IOBufQueue readBufQueue_{folly::IOBufQueue::cacheChainLength()}; + folly::io::Cursor cursor_{readBufQueue_.front()}; +}; + +} // namespace rocket +} // namespace thrift +} // namespace apache + +#include diff --git a/thrift/lib/cpp2/transport/rocket/framing/parser/ParserStrategy.h b/thrift/lib/cpp2/transport/rocket/framing/parser/ParserStrategy.h new file mode 100644 index 00000000000..a74b3444637 --- /dev/null +++ b/thrift/lib/cpp2/transport/rocket/framing/parser/ParserStrategy.h @@ -0,0 +1,53 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +namespace apache { +namespace thrift { +namespace rocket { + +// C++20 concept +// template +// concept ParserStrategy = requires (Strategy s) { +// { s.getReadBuffer(std::declval(), std::declval()) } -> +// std::same_as; { s.readDataAvailable(std::declval()) } -> +// std::same_as; { +// s.readBufferAvailable(std::declval>()) } -> +// std::same_as; +//}; +template class Strategy> +class ParserStrategy : private Strategy { + public: + explicit ParserStrategy(T& owner) : Strategy(owner) {} + + void getReadBuffer(void** bufReturn, size_t* lenReturn) { + Strategy::getReadBuffer(bufReturn, lenReturn); + } + + void readDataAvailable(size_t len) { Strategy::readDataAvailable(len); } + + void readBufferAvailable(std::unique_ptr buf) { + Strategy::readBufferAvailable(std::move(buf)); + } +}; + +} // namespace rocket +} // namespace thrift +} // namespace apache diff --git a/thrift/lib/cpp2/transport/rocket/framing/parser/test/FrameLengthParserStrategyTest.cpp b/thrift/lib/cpp2/transport/rocket/framing/parser/test/FrameLengthParserStrategyTest.cpp new file mode 100644 index 00000000000..110d1a8c54c --- /dev/null +++ b/thrift/lib/cpp2/transport/rocket/framing/parser/test/FrameLengthParserStrategyTest.cpp @@ -0,0 +1,223 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include +#include + +namespace apache { +namespace thrift { +namespace rocket { + +class FakeOwner { + public: + void handleFrame(std::unique_ptr buf) { + frames_.push_back(std::move(buf)); + } + bool incMemoryUsage(uint32_t n) { + memoryCounter_ += n; + return true; + } + void decMemoryUsage(uint32_t n) { memoryCounter_ -= n; } + + std::unique_ptr customAlloc(size_t) { return nullptr; } + + std::vector> frames_{}; + + uint32_t memoryCounter_ = 0; +}; + +TEST(FrameLengthParserTest, testAppendFrame) { + FakeOwner owner; + FrameLengthParserStrategy parser(owner); + + void* buf; + size_t lenReturn; + parser.getReadBuffer(&buf, &lenReturn); + + HeaderSerializer serializer(static_cast(buf), lenReturn); + serializer.writeFrameOrMetadataSize(20); + std::string b(20, 'b'); + memcpy(&static_cast(buf)[3], b.data(), 20); + + parser.readDataAvailable(23); + + EXPECT_EQ(parser.getSize(), 0); + EXPECT_EQ(parser.getFrameLength(), 0); + EXPECT_EQ(parser.getFrameLengthAndFieldSize(), 0); + EXPECT_EQ(owner.memoryCounter_, 0); + EXPECT_EQ(owner.frames_.size(), 1); + + auto frame = std::move(owner.frames_[0]); + EXPECT_EQ(frame->length(), 20); +} + +TEST(FrameLengthParserTest, testAppendLessThanFullFrame) { + FakeOwner owner; + FrameLengthParserStrategy parser(owner); + + void* buf; + size_t lenReturn; + parser.getReadBuffer(&buf, &lenReturn); + + HeaderSerializer serializer(static_cast(buf), lenReturn); + serializer.writeFrameOrMetadataSize(20); + std::string b(20, 'b'); + memcpy(&static_cast(buf)[3], b.data(), 20); + + parser.readDataAvailable(10); + + EXPECT_EQ(parser.getSize(), 10); + EXPECT_EQ(parser.getFrameLength(), 20); + EXPECT_EQ(parser.getFrameLengthAndFieldSize(), 23); + EXPECT_EQ(owner.memoryCounter_, 23); + EXPECT_EQ(owner.frames_.size(), 0); +} + +TEST(FrameLengthParserTest, testAppendLessTwiceAndGetAFrame) { + FakeOwner owner; + FrameLengthParserStrategy parser(owner); + + void* buf; + size_t lenReturn; + parser.getReadBuffer(&buf, &lenReturn); + + HeaderSerializer serializer(static_cast(buf), lenReturn); + serializer.writeFrameOrMetadataSize(20); + std::string b(20, 'b'); + memcpy(&static_cast(buf)[3], b.data(), 20); + + parser.readDataAvailable(10); + + EXPECT_EQ(parser.getSize(), 10); + EXPECT_EQ(parser.getFrameLength(), 20); + EXPECT_EQ(parser.getFrameLengthAndFieldSize(), 23); + EXPECT_EQ(owner.memoryCounter_, 23); + EXPECT_EQ(owner.frames_.size(), 0); + + parser.readDataAvailable(13); + + EXPECT_EQ(parser.getSize(), 0); + EXPECT_EQ(parser.getFrameLength(), 0); + EXPECT_EQ(parser.getFrameLengthAndFieldSize(), 0); + EXPECT_EQ(owner.memoryCounter_, 0); + EXPECT_EQ(owner.frames_.size(), 1); +} + +TEST(FrameLengthParserTest, testAppendMultipleFrames) { + FakeOwner owner; + FrameLengthParserStrategy parser(owner); + + { + void* buf; + size_t lenReturn; + parser.getReadBuffer(&buf, &lenReturn); + + HeaderSerializer serializer(static_cast(buf), lenReturn); + serializer.writeFrameOrMetadataSize(20); + std::string b(20, 'b'); + memcpy(&static_cast(buf)[3], b.data(), 20); + + parser.readDataAvailable(23); + + EXPECT_EQ(parser.getSize(), 0); + EXPECT_EQ(parser.getFrameLength(), 0); + EXPECT_EQ(parser.getFrameLengthAndFieldSize(), 0); + EXPECT_EQ(owner.memoryCounter_, 0); + EXPECT_EQ(owner.frames_.size(), 1); + } + + { + void* buf; + size_t lenReturn; + parser.getReadBuffer(&buf, &lenReturn); + + HeaderSerializer serializer(static_cast(buf), lenReturn); + serializer.writeFrameOrMetadataSize(20); + std::string b(20, 'b'); + memcpy(&static_cast(buf)[3], b.data(), 20); + + parser.readDataAvailable(23); + + EXPECT_EQ(parser.getSize(), 0); + EXPECT_EQ(parser.getFrameLength(), 0); + EXPECT_EQ(parser.getFrameLengthAndFieldSize(), 0); + EXPECT_EQ(owner.memoryCounter_, 0); + EXPECT_EQ(owner.frames_.size(), 2); + } + + { + void* buf; + size_t lenReturn; + parser.getReadBuffer(&buf, &lenReturn); + + HeaderSerializer serializer(static_cast(buf), lenReturn); + serializer.writeFrameOrMetadataSize(20); + std::string b(20, 'b'); + memcpy(&static_cast(buf)[3], b.data(), 20); + + parser.readDataAvailable(23); + + EXPECT_EQ(parser.getSize(), 0); + EXPECT_EQ(parser.getFrameLength(), 0); + EXPECT_EQ(parser.getFrameLengthAndFieldSize(), 0); + EXPECT_EQ(owner.memoryCounter_, 0); + EXPECT_EQ(owner.frames_.size(), 3); + } +} + +TEST(FrameLengthParserTest, testAppendUsingIOBuf) { + FakeOwner owner; + FrameLengthParserStrategy parser(owner); + + folly::IOBufQueue queue = + folly::IOBufQueue{folly::IOBufQueue::cacheChainLength()}; + + auto lenBuf = folly::IOBuf::create(3); + HeaderSerializer serializer(lenBuf->writableBuffer(), 3); + serializer.writeFrameOrMetadataSize(20); + lenBuf->append(3); + + queue.append(std::move(lenBuf)); + + EXPECT_EQ(queue.chainLength(), 3); + + auto strBuf = folly::IOBuf::copyBuffer(std::string(20, 'b')); + + EXPECT_EQ(strBuf->length(), 20); + + queue.append(std::move(strBuf)); + + EXPECT_EQ(queue.chainLength(), 23); + + auto buf = queue.split(23); + + EXPECT_EQ(buf->computeChainDataLength(), 23); + EXPECT_EQ(queue.chainLength(), 0); + + parser.readBufferAvailable(std::move(buf)); + + EXPECT_EQ(parser.getSize(), 0); + EXPECT_EQ(parser.getFrameLength(), 0); + EXPECT_EQ(parser.getFrameLengthAndFieldSize(), 0); + EXPECT_EQ(owner.memoryCounter_, 0); + EXPECT_EQ(owner.frames_.size(), 1); +} + +} // namespace rocket +} // namespace thrift +} // namespace apache diff --git a/thrift/lib/cpp2/transport/rocket/framing/parser/test/ParserStrategyTest.cpp b/thrift/lib/cpp2/transport/rocket/framing/parser/test/ParserStrategyTest.cpp new file mode 100644 index 00000000000..f68eccb7f38 --- /dev/null +++ b/thrift/lib/cpp2/transport/rocket/framing/parser/test/ParserStrategyTest.cpp @@ -0,0 +1,88 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include +#include +#include + +namespace apache { +namespace thrift { +namespace rocket { + +class FakeOwner { + public: + void handleFrame(std::unique_ptr buf) { + frames_.push_back(std::move(buf)); + } + bool incMemoryUsage(uint32_t n) { + memoryCounter_ += n; + return true; + } + void decMemoryUsage(uint32_t n) { memoryCounter_ -= n; } + + std::unique_ptr customAlloc(size_t) { return nullptr; } + + std::vector> frames_{}; + + uint32_t memoryCounter_ = 0; +}; + +TEST(ParserTest, testAppendFrame) { + FakeOwner owner; + ParserStrategy parser(owner); + + void* buf; + size_t lenReturn; + parser.getReadBuffer(&buf, &lenReturn); + + HeaderSerializer serializer(static_cast(buf), lenReturn); + serializer.writeFrameOrMetadataSize(20); + std::string b(20, 'b'); + memcpy(&static_cast(buf)[3], b.data(), 20); + + parser.readDataAvailable(23); + + EXPECT_EQ(owner.memoryCounter_, 0); + EXPECT_EQ(owner.frames_.size(), 1); + + auto frame = std::move(owner.frames_[0]); + EXPECT_EQ(frame->length(), 20); +} + +TEST(ParserTest, testAppendLessThanFullFrame) { + FakeOwner owner; + ParserStrategy parser(owner); + + void* buf; + size_t lenReturn; + parser.getReadBuffer(&buf, &lenReturn); + + HeaderSerializer serializer(static_cast(buf), lenReturn); + serializer.writeFrameOrMetadataSize(20); + std::string b(20, 'b'); + memcpy(&static_cast(buf)[3], b.data(), 20); + + parser.readDataAvailable(10); + + EXPECT_EQ(owner.memoryCounter_, 23); + EXPECT_EQ(owner.frames_.size(), 0); +} + +} // namespace rocket +} // namespace thrift +} // namespace apache