From 9612a888d8c542b1d5148435d0e2a1b08b76b14f Mon Sep 17 00:00:00 2001 From: Raven Black Date: Wed, 13 Apr 2022 21:18:00 +0000 Subject: [PATCH 1/8] Add buffer fragment class Signed-off-by: Raven Black --- CODEOWNERS | 3 +- .../filters/http/file_system_buffer/BUILD | 18 ++ .../http/file_system_buffer/fragment.cc | 136 +++++++++++ .../http/file_system_buffer/fragment.h | 102 +++++++++ .../filters/http/file_system_buffer/BUILD | 21 ++ .../http/file_system_buffer/fragment_test.cc | 212 ++++++++++++++++++ 6 files changed, 491 insertions(+), 1 deletion(-) create mode 100644 source/extensions/filters/http/file_system_buffer/BUILD create mode 100644 source/extensions/filters/http/file_system_buffer/fragment.cc create mode 100644 source/extensions/filters/http/file_system_buffer/fragment.h create mode 100644 test/extensions/filters/http/file_system_buffer/BUILD create mode 100644 test/extensions/filters/http/file_system_buffer/fragment_test.cc diff --git a/CODEOWNERS b/CODEOWNERS index 5e900f8c4d4b4..94868ef70f59f 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -187,8 +187,9 @@ extensions/filters/http/oauth2 @rgs1 @derekargueta @snowp /*/extensions/key_value @alyssawilk @ryantheoptimist # Config Validators /*/extensions/config/validators/minimum_clusters @adisuissa @htuch -# support library for filesystem based extensions +# filesystem based extensions /*/extensions/common/async_files @mattklein123 @ravenblack +/*/extensions/filters/http/file_system_buffer @mattklein123 @ravenblack # Google Cloud Platform Authentication Filter /*/extensions/filters/http/gcp_authn @tyxia @yanavlasov # DNS resolution diff --git a/source/extensions/filters/http/file_system_buffer/BUILD b/source/extensions/filters/http/file_system_buffer/BUILD new file mode 100644 index 0000000000000..d7938ea999304 --- /dev/null +++ b/source/extensions/filters/http/file_system_buffer/BUILD @@ -0,0 +1,18 @@ +load( + "//bazel:envoy_build_system.bzl", + "envoy_cc_library", + "envoy_extension_package", +) + +licenses(["notice"]) # Apache 2 + +envoy_extension_package() + +envoy_cc_library( + name = "fragment", + srcs = ["fragment.cc"], + hdrs = ["fragment.h"], + deps = [ + "//source/extensions/common/async_files", + ], +) diff --git a/source/extensions/filters/http/file_system_buffer/fragment.cc b/source/extensions/filters/http/file_system_buffer/fragment.cc new file mode 100644 index 0000000000000..cd9cfa43d2873 --- /dev/null +++ b/source/extensions/filters/http/file_system_buffer/fragment.cc @@ -0,0 +1,136 @@ +#include "source/extensions/filters/http/file_system_buffer/fragment.h" + +#include "source/common/buffer/buffer_impl.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace FileSystemBuffer { + +class FragmentData { +public: + virtual bool isMemory() const { return false; } + virtual bool isStorage() const { return false; } + virtual ~FragmentData() = default; +}; + +class MemoryFragment : public FragmentData { +public: + explicit MemoryFragment(Buffer::Instance& buffer); // NOLINT(runtime/references) + explicit MemoryFragment(Buffer::Instance& buffer, + size_t size); // NOLINT(runtime/references) + std::unique_ptr extract(); + bool isMemory() const override { return true; } + +private: + std::unique_ptr buffer_; +}; + +class WritingFragment : public FragmentData {}; + +class ReadingFragment : public FragmentData {}; + +class StorageFragment : public FragmentData { +public: + explicit StorageFragment(off_t offset) : offset_(offset) {} + off_t offset() const { return offset_; } + bool isStorage() const override { return true; } + +private: + const off_t offset_; +}; + +Fragment::Fragment(Buffer::Instance& buffer) // NOLINT(runtime/references) + : size_(buffer.length()), data_(std::make_unique(buffer)) {} + +Fragment::Fragment(Buffer::Instance& buffer, size_t size) // NOLINT(runtime/references) + : size_(size), data_(std::make_unique(buffer, size)) {} + +Fragment::~Fragment() = default; +bool Fragment::isMemory() const { return data_->isMemory(); } +bool Fragment::isStorage() const { return data_->isStorage(); } + +MemoryFragment::MemoryFragment(Buffer::Instance& buffer) // NOLINT(runtime/references) + : buffer_(std::make_unique()) { + buffer_->move(buffer); +} + +MemoryFragment::MemoryFragment(Buffer::Instance& buffer, + size_t size) // NOLINT(runtime/references) + : buffer_(std::make_unique()) { + buffer_->move(buffer, size); +} + +std::unique_ptr Fragment::extract() { + ASSERT(isMemory()); + auto ret = dynamic_cast(data_.get())->extract(); + data_.reset(); + size_ = 0; + return ret; +} + +std::unique_ptr MemoryFragment::extract() { return std::move(buffer_); } + +absl::StatusOr +Fragment::toStorage(AsyncFileHandle file, off_t offset, + std::function)> on_done) { + ASSERT(isMemory()); + auto data = dynamic_cast(data_.get())->extract(); + data_ = std::make_unique(); + return file->write(*data, offset, + [fragment = this, size = size_, offset, + on_done = std::move(on_done)](absl::StatusOr result) { + // This lambda always runs in the async file thread, so must not use the + // fragment directly as it may have been destroyed. It is passed through to + // call from on_done's callback, where it can be used again as that function + // is called from the envoy thread only if the filter has *not* been + // destroyed in the meantime. + if (!result.ok()) { + on_done(result.status()); + return; + } + if (result.value() != size) { + on_done(absl::AbortedError(fmt::format( + "buffer write wrote {} bytes, wanted {}", result.value(), size))); + return; + } + on_done([fragment, offset]() { + fragment->data_ = std::make_unique(offset); + }); + }); +} + +absl::StatusOr +Fragment::fromStorage(AsyncFileHandle file, + std::function)> on_done) { + ASSERT(isStorage()); + off_t offset = dynamic_cast(data_.get())->offset(); + data_ = std::make_unique(); + return file->read( + offset, size_, + [fragment = this, size = size_, + on_done](absl::StatusOr> result) { + // This lambda always runs in the async file thread, so must not use the + // fragment directly as it may have been destroyed. It is passed through to + // call from on_done's callback, where it can be used again as that function + // is called from the envoy thread only if the filter has *not* been destroyed + // in the meantime. + if (!result.ok()) { + on_done(result.status()); + return; + } + if (result.value()->length() != size) { + on_done(absl::AbortedError( + fmt::format("buffer read got {} bytes, wanted {}", result.value()->length(), size))); + return; + } + on_done([fragment, data = std::shared_ptr(result.value().release())]() { + fragment->data_ = std::make_unique(*data); + }); + }); +} + +} // namespace FileSystemBuffer +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/http/file_system_buffer/fragment.h b/source/extensions/filters/http/file_system_buffer/fragment.h new file mode 100644 index 0000000000000..89b78300465a3 --- /dev/null +++ b/source/extensions/filters/http/file_system_buffer/fragment.h @@ -0,0 +1,102 @@ +#pragma once +#include +#include +#include + +#include "envoy/buffer/buffer.h" + +#include "source/extensions/common/async_files/async_file_handle.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace FileSystemBuffer { + +class FragmentData; +using UpdateFragmentFunction = std::function; +using Extensions::Common::AsyncFiles::AsyncFileHandle; +using Extensions::Common::AsyncFiles::CancelFunction; + +// A Fragment is a piece of the buffer queue used by the filter. +// +// Each fragment may be in memory, on disk, or in an unusable intermediate state +// while controlled by the AsyncFiles library. +// +// The fragments are queued in sequential order, and may not be in the same order +// in the buffer file, as we write "last fragment first" so that the soonest-needed +// fragments remain in memory as long as possible. +// +// Memory fragments are simply `Buffer::Instance`s. Storage fragments contain just +// enough information to reload a memory fragment from the buffer file. +class Fragment { +public: + explicit Fragment(Buffer::Instance& buffer); + explicit Fragment(Buffer::Instance& buffer, size_t size); + ~Fragment(); + + // Starts the transition for this fragment from memory to storage. + // + // The on_done callback is called when the file write completes. + // + // The callback includes its own callback parameter update_fragment (on success), + // so that the operation of updating the fragment can be dispatched to be performed + // in the envoy worker thread, rather than being performed on the callback thread. + // (This is important to allow for proper handling if the context was deleted while + // the operation was in flight, for example.) + // i.e. typical usage resembles: + // auto result = fragment.toStorage(file_handle, offset, + // [this](absl::StatusOr status_or_update_fragment) { + // if (status_or_update_fragment.ok()) { + // auto update_fragment = status_or_update_fragment.value(); + // dispatcher->dispatch([this, update_fragment]() { + // update_fragment(); + // // do more stuff with the fragment, or in reaction to it being updated. + // }); + // } else { + // dispatcher->dispatch([this, status = status_or_update_fragment.status()]() { + // // do stuff in response to the error. + // }); + // } + // }); + absl::StatusOr + toStorage(AsyncFileHandle file, off_t offset, + std::function)> on_done); + + // Starts the transition for this fragment from storage to memory. + // + // The on_done callback is called when the file read completes. + // + // The callback includes its own callback parameter update_fragment (on success), + // so that the operation of updating the fragment can be dispatched to be performed + // in the envoy worker thread, rather than being performed on the callback thread. + // (This is important to allow for proper handling if the context was deleted while + // the operation was in flight, for example.) + // + // See toStorage for example usage. + absl::StatusOr + fromStorage(AsyncFileHandle file, + std::function)> on_done); + + // Removes the buffer from a memory instance and resets it to size 0. + // + // It is an error to call extract() on a Fragment that is not in memory. + Buffer::InstancePtr extract(); + + // Returns true if the fragment is in memory, false if in storage or transition. + bool isMemory() const; + + // Returns true if the fragment is in storage, false if in memory or transition. + bool isStorage() const; + + // Returns the size of the fragment in bytes. + size_t size() const { return size_; } + +private: + size_t size_; + std::unique_ptr data_; +}; + +} // namespace FileSystemBuffer +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/test/extensions/filters/http/file_system_buffer/BUILD b/test/extensions/filters/http/file_system_buffer/BUILD new file mode 100644 index 0000000000000..4a6468bd3b837 --- /dev/null +++ b/test/extensions/filters/http/file_system_buffer/BUILD @@ -0,0 +1,21 @@ +load( + "//bazel:envoy_build_system.bzl", + "envoy_cc_test", + "envoy_package", +) + +licenses(["notice"]) # Apache 2 + +envoy_package() + +envoy_cc_test( + name = "fragment_test", + srcs = ["fragment_test.cc"], + tags = ["skip_on_windows"], + deps = [ + "//source/extensions/filters/http/file_system_buffer:fragment", + "//test/extensions/common/async_files:mocks", + "//test/mocks/buffer:buffer_mocks", + "//test/test_common:status_utility_lib", + ], +) diff --git a/test/extensions/filters/http/file_system_buffer/fragment_test.cc b/test/extensions/filters/http/file_system_buffer/fragment_test.cc new file mode 100644 index 0000000000000..ffb39c4b2799b --- /dev/null +++ b/test/extensions/filters/http/file_system_buffer/fragment_test.cc @@ -0,0 +1,212 @@ +#include +#include +#include + +#include "source/extensions/filters/http/file_system_buffer/fragment.h" + +#include "test/extensions/common/async_files/mocks.h" +#include "test/mocks/buffer/mocks.h" +#include "test/test_common/status_utility.h" + +#include "gtest/gtest.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace FileSystemBuffer { + +// TODO: using StatusHelpers::HasStatus; +// TODO: using StatusHelpers::HasStatusMessage; +using Extensions::Common::AsyncFiles::CancelFunction; +using Extensions::Common::AsyncFiles::MockAsyncFileContext; +using Extensions::Common::AsyncFiles::MockAsyncFileHandle; +using StatusHelpers::IsOk; +using ::testing::_; +using ::testing::Eq; +using ::testing::HasSubstr; +using ::testing::StrictMock; + +std::function>)> storageSuccessCallback() { + return [](absl::StatusOr> status_or_callback) { + ASSERT_OK(status_or_callback); + auto& inner_callback = status_or_callback.value(); + inner_callback(); + }; +} + +template +std::function>)> +storageFailureCallback(MatcherT matcher) { + return [matcher](absl::StatusOr> status_or_callback) { + EXPECT_THAT(status_or_callback.status(), matcher); + }; +} + +class FileSystemBufferFilterFragmentTest : public ::testing::Test { +public: +protected: + MockAsyncFileHandle handle_ = std::make_shared>(); + + void moveFragmentToStorage(Fragment* frag) { + EXPECT_CALL(*handle_, write(_, _, _)) + .WillOnce( + [frag](Buffer::Instance&, off_t, std::function)> callback) { + callback(frag->size()); + return []() {}; + }); + EXPECT_OK(frag->toStorage(handle_, 123, storageSuccessCallback())); + } +}; + +TEST_F(FileSystemBufferFilterFragmentTest, CreatesAndExtractsWithoutCopying) { + Buffer::OwnedImpl input("hello"); + void* original_address = input.frontSlice().mem_; + Fragment frag(input); + EXPECT_TRUE(frag.isMemory()); + EXPECT_FALSE(frag.isStorage()); + EXPECT_EQ(frag.size(), 5); + auto out = frag.extract(); + EXPECT_EQ(out->toString(), "hello"); + EXPECT_EQ(out->frontSlice().mem_, original_address); +} + +TEST_F(FileSystemBufferFilterFragmentTest, CreatesFragmentFromPartialBufferAndConsumes) { + Buffer::OwnedImpl input("hello"); + Fragment frag(input, 3); + EXPECT_TRUE(frag.isMemory()); + EXPECT_FALSE(frag.isStorage()); + EXPECT_EQ(frag.size(), 3); + auto out = frag.extract(); + EXPECT_EQ(out->toString(), "hel"); + EXPECT_EQ(input.toString(), "lo"); +} + +TEST_F(FileSystemBufferFilterFragmentTest, WritesAndReadsBack) { + Buffer::OwnedImpl input("hello"); + Fragment frag(input); + std::function)> captured_write_callback; + EXPECT_CALL(*handle_, write(BufferStringEqual("hello"), 123, _)) + .WillOnce([&captured_write_callback](Buffer::Instance&, off_t, + std::function)> callback) { + captured_write_callback = std::move(callback); + return []() {}; + }); + // Request the fragment be moved to storage. + EXPECT_OK(frag.toStorage(handle_, 123, storageSuccessCallback())); + // Before the file confirms written, the state should be neither in memory nor in storage. + EXPECT_FALSE(frag.isMemory()); + EXPECT_FALSE(frag.isStorage()); + // Fake the file thread confirming 5 bytes were written. + captured_write_callback(5); + // Now the fragment should be tagged as being in storage. + EXPECT_TRUE(frag.isStorage()); + EXPECT_FALSE(frag.isMemory()); + std::function>)> captured_read_callback; + EXPECT_CALL(*handle_, read(123, 5, _)) + .WillOnce( + [&captured_read_callback]( + off_t, size_t, + std::function>)> callback) { + captured_read_callback = std::move(callback); + return []() {}; + }); + // Request the fragment be moved from storage. + EXPECT_OK(frag.fromStorage(handle_, storageSuccessCallback())); + // Before the file confirms read, the state should be neither in memory nor storage. + EXPECT_FALSE(frag.isMemory()); + EXPECT_FALSE(frag.isStorage()); + // Fake the file thread completing read. + captured_read_callback(std::make_unique("hello")); + // Now the fragment should be tagged as being in memory. + EXPECT_TRUE(frag.isMemory()); + EXPECT_FALSE(frag.isStorage()); + // The data extracted from the fragment should be the same as what was read. + auto out = frag.extract(); + EXPECT_EQ(out->toString(), "hello"); +} + +TEST_F(FileSystemBufferFilterFragmentTest, ReturnsErrorOnWriteError) { + Buffer::OwnedImpl input("hello"); + Fragment frag(input); + auto write_error = absl::UnknownError("write error"); + std::function)> captured_write_callback; + EXPECT_CALL(*handle_, write(BufferStringEqual("hello"), 123, _)) + .WillOnce([&captured_write_callback](Buffer::Instance&, off_t, + std::function)> callback) { + captured_write_callback = std::move(callback); + return []() {}; + }); + // Request the fragment be moved to storage. + EXPECT_OK(frag.toStorage(handle_, 123, storageFailureCallback(Eq(write_error)))); + + // Fake file system declares a write error. This should + // provoke the expected error in the callback above. + captured_write_callback(write_error); +} + +TEST_F(FileSystemBufferFilterFragmentTest, ReturnsErrorOnWriteIncomplete) { + Buffer::OwnedImpl input("hello"); + Fragment frag(input); + std::function)> captured_write_callback; + EXPECT_CALL(*handle_, write(BufferStringEqual("hello"), 123, _)) + .WillOnce([&captured_write_callback](Buffer::Instance&, off_t, + std::function)> callback) { + captured_write_callback = std::move(callback); + return []() {}; + }); + // Request the fragment be moved to storage. + EXPECT_OK(frag.toStorage(handle_, 123, storageFailureCallback(Not(IsOk())))); + // TODO: EXPECT_CALL(callback, Call(HasStatusMessage(HasSubstr("wrote 2 bytes, wanted 5")))); + + // Fake file says it wrote 2 bytes when the fragment was of size 5 - this should + // provoke the expected error in the callback above. + captured_write_callback(2); +} + +TEST_F(FileSystemBufferFilterFragmentTest, ReturnsErrorOnReadError) { + Buffer::OwnedImpl input("hello"); + Fragment frag(input); + moveFragmentToStorage(&frag); + auto read_error = absl::UnknownError("read error"); + std::function>)> captured_read_callback; + EXPECT_CALL(*handle_, read(123, 5, _)) + .WillOnce( + [&captured_read_callback]( + off_t, size_t, + std::function>)> callback) { + captured_read_callback = std::move(callback); + return []() {}; + }); + // Request the fragment be moved from storage. + EXPECT_OK(frag.fromStorage(handle_, storageFailureCallback(Eq(read_error)))); + // Fake file system declares a read error. This should + // provoke the expected error in the callback above. + captured_read_callback(read_error); +} + +TEST_F(FileSystemBufferFilterFragmentTest, ReturnsErrorOnReadIncomplete) { + Buffer::OwnedImpl input("hello"); + Fragment frag(input); + moveFragmentToStorage(&frag); + std::function>)> captured_read_callback; + EXPECT_CALL(*handle_, read(123, 5, _)) + .WillOnce( + [&captured_read_callback]( + off_t, size_t, + std::function>)> callback) { + captured_read_callback = std::move(callback); + return []() {}; + }); + // Request the fragment be moved from storage. + EXPECT_OK(frag.fromStorage(handle_, storageFailureCallback(Not(IsOk())))); + // TODO: EXPECT_CALL(callback, Call(HasStatusMessage(HasSubstr("read got 2 bytes, wanted 5")))); + + // Fake file system declares a read error. This should + // provoke the expected error in the callback above. + captured_read_callback(std::make_unique("he")); +} + +} // namespace FileSystemBuffer +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy \ No newline at end of file From 8db5fbe2d4ee06afe3bc9c626e17974480ea1fc1 Mon Sep 17 00:00:00 2001 From: Raven Black Date: Fri, 22 Apr 2022 19:02:12 +0000 Subject: [PATCH 2/8] Use status matchers Signed-off-by: Raven Black --- .../filters/http/file_system_buffer/fragment_test.cc | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/test/extensions/filters/http/file_system_buffer/fragment_test.cc b/test/extensions/filters/http/file_system_buffer/fragment_test.cc index ffb39c4b2799b..6425853cc1002 100644 --- a/test/extensions/filters/http/file_system_buffer/fragment_test.cc +++ b/test/extensions/filters/http/file_system_buffer/fragment_test.cc @@ -15,11 +15,10 @@ namespace Extensions { namespace HttpFilters { namespace FileSystemBuffer { -// TODO: using StatusHelpers::HasStatus; -// TODO: using StatusHelpers::HasStatusMessage; using Extensions::Common::AsyncFiles::CancelFunction; using Extensions::Common::AsyncFiles::MockAsyncFileContext; using Extensions::Common::AsyncFiles::MockAsyncFileHandle; +using StatusHelpers::HasStatusMessage; using StatusHelpers::IsOk; using ::testing::_; using ::testing::Eq; @@ -155,8 +154,9 @@ TEST_F(FileSystemBufferFilterFragmentTest, ReturnsErrorOnWriteIncomplete) { return []() {}; }); // Request the fragment be moved to storage. - EXPECT_OK(frag.toStorage(handle_, 123, storageFailureCallback(Not(IsOk())))); - // TODO: EXPECT_CALL(callback, Call(HasStatusMessage(HasSubstr("wrote 2 bytes, wanted 5")))); + EXPECT_OK(frag.toStorage( + handle_, 123, + storageFailureCallback(HasStatusMessage(HasSubstr("wrote 2 bytes, wanted 5"))))); // Fake file says it wrote 2 bytes when the fragment was of size 5 - this should // provoke the expected error in the callback above. @@ -198,8 +198,8 @@ TEST_F(FileSystemBufferFilterFragmentTest, ReturnsErrorOnReadIncomplete) { return []() {}; }); // Request the fragment be moved from storage. - EXPECT_OK(frag.fromStorage(handle_, storageFailureCallback(Not(IsOk())))); - // TODO: EXPECT_CALL(callback, Call(HasStatusMessage(HasSubstr("read got 2 bytes, wanted 5")))); + EXPECT_OK(frag.fromStorage( + handle_, storageFailureCallback(HasStatusMessage(HasSubstr("read got 2 bytes, wanted 5"))))); // Fake file system declares a read error. This should // provoke the expected error in the callback above. From 9cdad8acf424b7c82957a4d517b0b68dbaaa7d13 Mon Sep 17 00:00:00 2001 From: Raven Black Date: Fri, 22 Apr 2022 19:25:08 +0000 Subject: [PATCH 3/8] Add trailing newline Signed-off-by: Raven Black --- .../extensions/filters/http/file_system_buffer/fragment_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/extensions/filters/http/file_system_buffer/fragment_test.cc b/test/extensions/filters/http/file_system_buffer/fragment_test.cc index 6425853cc1002..7b353ddef574d 100644 --- a/test/extensions/filters/http/file_system_buffer/fragment_test.cc +++ b/test/extensions/filters/http/file_system_buffer/fragment_test.cc @@ -209,4 +209,4 @@ TEST_F(FileSystemBufferFilterFragmentTest, ReturnsErrorOnReadIncomplete) { } // namespace FileSystemBuffer } // namespace HttpFilters } // namespace Extensions -} // namespace Envoy \ No newline at end of file +} // namespace Envoy From 4105c457812f3eee20a9b809c9c76313e338237a Mon Sep 17 00:00:00 2001 From: Raven Black Date: Fri, 22 Apr 2022 20:34:46 +0000 Subject: [PATCH 4/8] Remove outdated NOLINTs, case-and-space CODEOWNERS comment Signed-off-by: Raven Black --- CODEOWNERS | 2 +- .../filters/http/file_system_buffer/fragment.cc | 14 ++++++-------- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/CODEOWNERS b/CODEOWNERS index 94868ef70f59f..081213ee82076 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -187,7 +187,7 @@ extensions/filters/http/oauth2 @rgs1 @derekargueta @snowp /*/extensions/key_value @alyssawilk @ryantheoptimist # Config Validators /*/extensions/config/validators/minimum_clusters @adisuissa @htuch -# filesystem based extensions +# File system based extensions /*/extensions/common/async_files @mattklein123 @ravenblack /*/extensions/filters/http/file_system_buffer @mattklein123 @ravenblack # Google Cloud Platform Authentication Filter diff --git a/source/extensions/filters/http/file_system_buffer/fragment.cc b/source/extensions/filters/http/file_system_buffer/fragment.cc index cd9cfa43d2873..e4b53ad42ec70 100644 --- a/source/extensions/filters/http/file_system_buffer/fragment.cc +++ b/source/extensions/filters/http/file_system_buffer/fragment.cc @@ -16,9 +16,8 @@ class FragmentData { class MemoryFragment : public FragmentData { public: - explicit MemoryFragment(Buffer::Instance& buffer); // NOLINT(runtime/references) - explicit MemoryFragment(Buffer::Instance& buffer, - size_t size); // NOLINT(runtime/references) + explicit MemoryFragment(Buffer::Instance& buffer); + explicit MemoryFragment(Buffer::Instance& buffer, size_t size); std::unique_ptr extract(); bool isMemory() const override { return true; } @@ -40,23 +39,22 @@ class StorageFragment : public FragmentData { const off_t offset_; }; -Fragment::Fragment(Buffer::Instance& buffer) // NOLINT(runtime/references) +Fragment::Fragment(Buffer::Instance& buffer) : size_(buffer.length()), data_(std::make_unique(buffer)) {} -Fragment::Fragment(Buffer::Instance& buffer, size_t size) // NOLINT(runtime/references) +Fragment::Fragment(Buffer::Instance& buffer, size_t size) : size_(size), data_(std::make_unique(buffer, size)) {} Fragment::~Fragment() = default; bool Fragment::isMemory() const { return data_->isMemory(); } bool Fragment::isStorage() const { return data_->isStorage(); } -MemoryFragment::MemoryFragment(Buffer::Instance& buffer) // NOLINT(runtime/references) +MemoryFragment::MemoryFragment(Buffer::Instance& buffer) : buffer_(std::make_unique()) { buffer_->move(buffer); } -MemoryFragment::MemoryFragment(Buffer::Instance& buffer, - size_t size) // NOLINT(runtime/references) +MemoryFragment::MemoryFragment(Buffer::Instance& buffer, size_t size) : buffer_(std::make_unique()) { buffer_->move(buffer, size); } From 70fae07c7fc5fec575e80c9e415554a3e61a7f7d Mon Sep 17 00:00:00 2001 From: Raven Black Date: Tue, 26 Apr 2022 15:22:58 +0000 Subject: [PATCH 5/8] Use variant rather than pointer, take dispatch function rather than requiring caller callback call a callback Signed-off-by: Raven Black --- .../http/file_system_buffer/fragment.cc | 131 +++++++----------- .../http/file_system_buffer/fragment.h | 74 +++++----- .../http/file_system_buffer/fragment_test.cc | 34 +++-- 3 files changed, 99 insertions(+), 140 deletions(-) diff --git a/source/extensions/filters/http/file_system_buffer/fragment.cc b/source/extensions/filters/http/file_system_buffer/fragment.cc index e4b53ad42ec70..3f924f905b839 100644 --- a/source/extensions/filters/http/file_system_buffer/fragment.cc +++ b/source/extensions/filters/http/file_system_buffer/fragment.cc @@ -7,47 +7,15 @@ namespace Extensions { namespace HttpFilters { namespace FileSystemBuffer { -class FragmentData { -public: - virtual bool isMemory() const { return false; } - virtual bool isStorage() const { return false; } - virtual ~FragmentData() = default; -}; - -class MemoryFragment : public FragmentData { -public: - explicit MemoryFragment(Buffer::Instance& buffer); - explicit MemoryFragment(Buffer::Instance& buffer, size_t size); - std::unique_ptr extract(); - bool isMemory() const override { return true; } - -private: - std::unique_ptr buffer_; -}; - -class WritingFragment : public FragmentData {}; - -class ReadingFragment : public FragmentData {}; - -class StorageFragment : public FragmentData { -public: - explicit StorageFragment(off_t offset) : offset_(offset) {} - off_t offset() const { return offset_; } - bool isStorage() const override { return true; } - -private: - const off_t offset_; -}; - Fragment::Fragment(Buffer::Instance& buffer) - : size_(buffer.length()), data_(std::make_unique(buffer)) {} + : size_(buffer.length()), data_(MemoryFragment(buffer)) {} Fragment::Fragment(Buffer::Instance& buffer, size_t size) - : size_(size), data_(std::make_unique(buffer, size)) {} + : size_(size), data_(MemoryFragment(buffer, size)) {} Fragment::~Fragment() = default; -bool Fragment::isMemory() const { return data_->isMemory(); } -bool Fragment::isStorage() const { return data_->isStorage(); } +bool Fragment::isMemory() const { return absl::holds_alternative(data_); } +bool Fragment::isStorage() const { return absl::holds_alternative(data_); } MemoryFragment::MemoryFragment(Buffer::Instance& buffer) : buffer_(std::make_unique()) { @@ -60,9 +28,7 @@ MemoryFragment::MemoryFragment(Buffer::Instance& buffer, size_t size) } std::unique_ptr Fragment::extract() { - ASSERT(isMemory()); - auto ret = dynamic_cast(data_.get())->extract(); - data_.reset(); + auto ret = absl::get(data_).extract(); size_ = 0; return ret; } @@ -71,60 +37,57 @@ std::unique_ptr MemoryFragment::extract() { return std::move(b absl::StatusOr Fragment::toStorage(AsyncFileHandle file, off_t offset, - std::function)> on_done) { + std::function)> dispatch, + std::function on_done) { ASSERT(isMemory()); - auto data = dynamic_cast(data_.get())->extract(); - data_ = std::make_unique(); - return file->write(*data, offset, - [fragment = this, size = size_, offset, - on_done = std::move(on_done)](absl::StatusOr result) { - // This lambda always runs in the async file thread, so must not use the - // fragment directly as it may have been destroyed. It is passed through to - // call from on_done's callback, where it can be used again as that function - // is called from the envoy thread only if the filter has *not* been - // destroyed in the meantime. - if (!result.ok()) { - on_done(result.status()); - return; - } - if (result.value() != size) { - on_done(absl::AbortedError(fmt::format( - "buffer write wrote {} bytes, wanted {}", result.value(), size))); - return; - } - on_done([fragment, offset]() { - fragment->data_ = std::make_unique(offset); - }); - }); + auto data = absl::get(data_).extract(); + data_.emplace(); + return file->write( + *data, offset, + [this, dispatch = std::move(dispatch), size = size_, on_done = std::move(on_done), + offset](absl::StatusOr result) { + // size is captured because we can't safely use 'this' until we're in the dispatch callback. + if (!result.ok()) { + dispatch([status = result.status(), on_done = std::move(on_done)]() { on_done(status); }); + } else if (result.value() != size) { + auto status = absl::AbortedError( + fmt::format("buffer write wrote {} bytes, wanted {}", result.value(), size)); + dispatch( + [on_done = std::move(on_done), status = std::move(status)]() { on_done(status); }); + } else { + dispatch([this, status = result.status(), offset, on_done = std::move(on_done)] { + data_.emplace(offset); + on_done(absl::OkStatus()); + }); + } + }); } absl::StatusOr -Fragment::fromStorage(AsyncFileHandle file, - std::function)> on_done) { +Fragment::fromStorage(AsyncFileHandle file, std::function)> dispatch, + std::function on_done) { ASSERT(isStorage()); - off_t offset = dynamic_cast(data_.get())->offset(); - data_ = std::make_unique(); + off_t offset = absl::get(data_).offset(); + data_.emplace(); return file->read( offset, size_, - [fragment = this, size = size_, - on_done](absl::StatusOr> result) { - // This lambda always runs in the async file thread, so must not use the - // fragment directly as it may have been destroyed. It is passed through to - // call from on_done's callback, where it can be used again as that function - // is called from the envoy thread only if the filter has *not* been destroyed - // in the meantime. + [this, dispatch = std::move(dispatch), size = size_, + on_done = std::move(on_done)](absl::StatusOr> result) { + // size is captured because we can't safely use 'this' until we're in the dispatch callback. if (!result.ok()) { - on_done(result.status()); - return; - } - if (result.value()->length() != size) { - on_done(absl::AbortedError( - fmt::format("buffer read got {} bytes, wanted {}", result.value()->length(), size))); - return; + dispatch([on_done = std::move(on_done), status = result.status()]() { on_done(status); }); + } else if (result.value()->length() != size) { + auto status = absl::AbortedError( + fmt::format("buffer read got {} bytes, wanted {}", result.value()->length(), size)); + dispatch( + [on_done = std::move(on_done), status = std::move(status)]() { on_done(status); }); + } else { + dispatch([this, on_done = std::move(on_done), + data = std::shared_ptr(result.value().release())]() { + data_.emplace(*data); + on_done(absl::OkStatus()); + }); } - on_done([fragment, data = std::shared_ptr(result.value().release())]() { - fragment->data_ = std::make_unique(*data); - }); }); } diff --git a/source/extensions/filters/http/file_system_buffer/fragment.h b/source/extensions/filters/http/file_system_buffer/fragment.h index 89b78300465a3..fc8392051a91e 100644 --- a/source/extensions/filters/http/file_system_buffer/fragment.h +++ b/source/extensions/filters/http/file_system_buffer/fragment.h @@ -12,11 +12,32 @@ namespace Extensions { namespace HttpFilters { namespace FileSystemBuffer { -class FragmentData; -using UpdateFragmentFunction = std::function; using Extensions::Common::AsyncFiles::AsyncFileHandle; using Extensions::Common::AsyncFiles::CancelFunction; +class MemoryFragment { +public: + explicit MemoryFragment(Buffer::Instance& buffer); + explicit MemoryFragment(Buffer::Instance& buffer, size_t size); + std::unique_ptr extract(); + +private: + std::unique_ptr buffer_; +}; + +class WritingFragment {}; + +class ReadingFragment {}; + +class StorageFragment { +public: + explicit StorageFragment(off_t offset) : offset_(offset) {} + off_t offset() const { return offset_; } + +private: + const off_t offset_; +}; + // A Fragment is a piece of the buffer queue used by the filter. // // Each fragment may be in memory, on disk, or in an unusable intermediate state @@ -36,46 +57,23 @@ class Fragment { // Starts the transition for this fragment from memory to storage. // - // The on_done callback is called when the file write completes. + // The on_done callback is sent to the dispatcher function after the file write completes. // - // The callback includes its own callback parameter update_fragment (on success), - // so that the operation of updating the fragment can be dispatched to be performed - // in the envoy worker thread, rather than being performed on the callback thread. - // (This is important to allow for proper handling if the context was deleted while - // the operation was in flight, for example.) - // i.e. typical usage resembles: - // auto result = fragment.toStorage(file_handle, offset, - // [this](absl::StatusOr status_or_update_fragment) { - // if (status_or_update_fragment.ok()) { - // auto update_fragment = status_or_update_fragment.value(); - // dispatcher->dispatch([this, update_fragment]() { - // update_fragment(); - // // do more stuff with the fragment, or in reaction to it being updated. - // }); - // } else { - // dispatcher->dispatch([this, status = status_or_update_fragment.status()]() { - // // do stuff in response to the error. - // }); - // } - // }); - absl::StatusOr - toStorage(AsyncFileHandle file, off_t offset, - std::function)> on_done); + // When called from a filter, the dispatcher function must abort without calling the + // callback if the filter or fragment has been destroyed. + absl::StatusOr toStorage(AsyncFileHandle file, off_t offset, + std::function)> dispatch, + std::function on_done); // Starts the transition for this fragment from storage to memory. // - // The on_done callback is called when the file read completes. - // - // The callback includes its own callback parameter update_fragment (on success), - // so that the operation of updating the fragment can be dispatched to be performed - // in the envoy worker thread, rather than being performed on the callback thread. - // (This is important to allow for proper handling if the context was deleted while - // the operation was in flight, for example.) + // The on_done callback is sent to the dispatcher function after the file read completes. // - // See toStorage for example usage. - absl::StatusOr - fromStorage(AsyncFileHandle file, - std::function)> on_done); + // When called from a filter, the dispatcher function must abort without calling the + // callback if the filter or fragment has been destroyed. + absl::StatusOr fromStorage(AsyncFileHandle file, + std::function)> dispatch, + std::function on_done); // Removes the buffer from a memory instance and resets it to size 0. // @@ -93,7 +91,7 @@ class Fragment { private: size_t size_; - std::unique_ptr data_; + absl::variant data_; }; } // namespace FileSystemBuffer diff --git a/test/extensions/filters/http/file_system_buffer/fragment_test.cc b/test/extensions/filters/http/file_system_buffer/fragment_test.cc index 7b353ddef574d..0079da849557f 100644 --- a/test/extensions/filters/http/file_system_buffer/fragment_test.cc +++ b/test/extensions/filters/http/file_system_buffer/fragment_test.cc @@ -25,22 +25,17 @@ using ::testing::Eq; using ::testing::HasSubstr; using ::testing::StrictMock; -std::function>)> storageSuccessCallback() { - return [](absl::StatusOr> status_or_callback) { - ASSERT_OK(status_or_callback); - auto& inner_callback = status_or_callback.value(); - inner_callback(); - }; +std::function storageSuccessCallback() { + return [](absl::Status status) { ASSERT_OK(status); }; } template -std::function>)> -storageFailureCallback(MatcherT matcher) { - return [matcher](absl::StatusOr> status_or_callback) { - EXPECT_THAT(status_or_callback.status(), matcher); - }; +std::function storageFailureCallback(MatcherT matcher) { + return [matcher](absl::Status status) { EXPECT_THAT(status, matcher); }; } +void dispatchImmediately(std::function callback) { callback(); } + class FileSystemBufferFilterFragmentTest : public ::testing::Test { public: protected: @@ -53,7 +48,7 @@ class FileSystemBufferFilterFragmentTest : public ::testing::Test { callback(frag->size()); return []() {}; }); - EXPECT_OK(frag->toStorage(handle_, 123, storageSuccessCallback())); + EXPECT_OK(frag->toStorage(handle_, 123, &dispatchImmediately, storageSuccessCallback())); } }; @@ -91,7 +86,7 @@ TEST_F(FileSystemBufferFilterFragmentTest, WritesAndReadsBack) { return []() {}; }); // Request the fragment be moved to storage. - EXPECT_OK(frag.toStorage(handle_, 123, storageSuccessCallback())); + EXPECT_OK(frag.toStorage(handle_, 123, &dispatchImmediately, storageSuccessCallback())); // Before the file confirms written, the state should be neither in memory nor in storage. EXPECT_FALSE(frag.isMemory()); EXPECT_FALSE(frag.isStorage()); @@ -110,7 +105,7 @@ TEST_F(FileSystemBufferFilterFragmentTest, WritesAndReadsBack) { return []() {}; }); // Request the fragment be moved from storage. - EXPECT_OK(frag.fromStorage(handle_, storageSuccessCallback())); + EXPECT_OK(frag.fromStorage(handle_, &dispatchImmediately, storageSuccessCallback())); // Before the file confirms read, the state should be neither in memory nor storage. EXPECT_FALSE(frag.isMemory()); EXPECT_FALSE(frag.isStorage()); @@ -136,7 +131,8 @@ TEST_F(FileSystemBufferFilterFragmentTest, ReturnsErrorOnWriteError) { return []() {}; }); // Request the fragment be moved to storage. - EXPECT_OK(frag.toStorage(handle_, 123, storageFailureCallback(Eq(write_error)))); + EXPECT_OK( + frag.toStorage(handle_, 123, &dispatchImmediately, storageFailureCallback(Eq(write_error)))); // Fake file system declares a write error. This should // provoke the expected error in the callback above. @@ -155,7 +151,7 @@ TEST_F(FileSystemBufferFilterFragmentTest, ReturnsErrorOnWriteIncomplete) { }); // Request the fragment be moved to storage. EXPECT_OK(frag.toStorage( - handle_, 123, + handle_, 123, &dispatchImmediately, storageFailureCallback(HasStatusMessage(HasSubstr("wrote 2 bytes, wanted 5"))))); // Fake file says it wrote 2 bytes when the fragment was of size 5 - this should @@ -178,7 +174,8 @@ TEST_F(FileSystemBufferFilterFragmentTest, ReturnsErrorOnReadError) { return []() {}; }); // Request the fragment be moved from storage. - EXPECT_OK(frag.fromStorage(handle_, storageFailureCallback(Eq(read_error)))); + EXPECT_OK( + frag.fromStorage(handle_, &dispatchImmediately, storageFailureCallback(Eq(read_error)))); // Fake file system declares a read error. This should // provoke the expected error in the callback above. captured_read_callback(read_error); @@ -199,7 +196,8 @@ TEST_F(FileSystemBufferFilterFragmentTest, ReturnsErrorOnReadIncomplete) { }); // Request the fragment be moved from storage. EXPECT_OK(frag.fromStorage( - handle_, storageFailureCallback(HasStatusMessage(HasSubstr("read got 2 bytes, wanted 5"))))); + handle_, &dispatchImmediately, + storageFailureCallback(HasStatusMessage(HasSubstr("read got 2 bytes, wanted 5"))))); // Fake file system declares a read error. This should // provoke the expected error in the callback above. From 07758959268837cf36726de323fef790dc5d35ab Mon Sep 17 00:00:00 2001 From: Raven Black Date: Tue, 26 Apr 2022 15:56:20 +0000 Subject: [PATCH 6/8] Clarify and add comments Signed-off-by: Raven Black --- .../filters/http/file_system_buffer/fragment.h | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/source/extensions/filters/http/file_system_buffer/fragment.h b/source/extensions/filters/http/file_system_buffer/fragment.h index fc8392051a91e..afff5e63017d4 100644 --- a/source/extensions/filters/http/file_system_buffer/fragment.h +++ b/source/extensions/filters/http/file_system_buffer/fragment.h @@ -15,6 +15,8 @@ namespace FileSystemBuffer { using Extensions::Common::AsyncFiles::AsyncFileHandle; using Extensions::Common::AsyncFiles::CancelFunction; +// Internal implementation detail exposed for use in Fragment's variant. +// Represents a buffer fragment that is in memory. class MemoryFragment { public: explicit MemoryFragment(Buffer::Instance& buffer); @@ -25,10 +27,16 @@ class MemoryFragment { std::unique_ptr buffer_; }; +// Internal implementation detail exposed for use in Fragment's variant. +// Represents a buffer fragment that is being written to storage. class WritingFragment {}; +// Internal implementation detail exposed for use in Fragment's variant. +// Represents a buffer fragment that is being read from storage. class ReadingFragment {}; +// Internal implementation detail exposed for use in Fragment's variant. +// Represents a buffer fragment that is currently in storage. class StorageFragment { public: explicit StorageFragment(off_t offset) : offset_(offset) {} @@ -75,9 +83,11 @@ class Fragment { std::function)> dispatch, std::function on_done); - // Removes the buffer from a memory instance and resets it to size 0. + // Moves the buffer from a memory instance to the returned value and resets the fragment to + // size 0. // - // It is an error to call extract() on a Fragment that is not in memory. + // It is an error to call extract() on a Fragment that is not in memory - an exception + // will be thrown. Buffer::InstancePtr extract(); // Returns true if the fragment is in memory, false if in storage or transition. From 983459b623c484737829727b0350fa7b76041a6e Mon Sep 17 00:00:00 2001 From: Raven Black Date: Tue, 26 Apr 2022 18:57:09 +0000 Subject: [PATCH 7/8] Avoid upsetting clang-tidy by using std::move rather than release to convert unique_ptr to shared_ptr to satisfy lambda (!) Signed-off-by: Raven Black --- source/extensions/filters/http/file_system_buffer/fragment.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/extensions/filters/http/file_system_buffer/fragment.cc b/source/extensions/filters/http/file_system_buffer/fragment.cc index 3f924f905b839..57b16705fbda9 100644 --- a/source/extensions/filters/http/file_system_buffer/fragment.cc +++ b/source/extensions/filters/http/file_system_buffer/fragment.cc @@ -83,7 +83,7 @@ Fragment::fromStorage(AsyncFileHandle file, std::function(result.value().release())]() { + data = std::shared_ptr(std::move(result.value()))]() { data_.emplace(*data); on_done(absl::OkStatus()); }); From f2c7ffd76ed591a2ba59c7d3c9e0e49796323de3 Mon Sep 17 00:00:00 2001 From: Raven Black Date: Tue, 26 Apr 2022 19:12:46 +0000 Subject: [PATCH 8/8] make shared_ptr outside of lambda capture, to avoid scaring clangtidy Signed-off-by: Raven Black --- .../extensions/filters/http/file_system_buffer/fragment.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/extensions/filters/http/file_system_buffer/fragment.cc b/source/extensions/filters/http/file_system_buffer/fragment.cc index 57b16705fbda9..d2da3bbd1c879 100644 --- a/source/extensions/filters/http/file_system_buffer/fragment.cc +++ b/source/extensions/filters/http/file_system_buffer/fragment.cc @@ -82,9 +82,9 @@ Fragment::fromStorage(AsyncFileHandle file, std::function(std::move(result.value()))]() { - data_.emplace(*data); + auto buffer = std::shared_ptr(std::move(result.value())); + dispatch([this, on_done = std::move(on_done), buffer = std::move(buffer)]() { + data_.emplace(*buffer); on_done(absl::OkStatus()); }); }