Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,9 @@ extensions/filters/http/oauth2 @rgs1 @derekargueta @snowp
/*/extensions/key_value @alyssawilk @ryantheoptimist
# Config Validators
/*/extensions/config/validators/minimum_clusters @adisuissa @htuch
# support library for filesystem based extensions
# File system based extensions
/*/extensions/common/async_files @mattklein123 @ravenblack
/*/extensions/filters/http/file_system_buffer @mattklein123 @ravenblack
# Google Cloud Platform Authentication Filter
/*/extensions/filters/http/gcp_authn @tyxia @yanavlasov
# DNS resolution
Expand Down
18 changes: 18 additions & 0 deletions source/extensions/filters/http/file_system_buffer/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
load(
"//bazel:envoy_build_system.bzl",
"envoy_cc_library",
"envoy_extension_package",
)

licenses(["notice"]) # Apache 2

envoy_extension_package()

envoy_cc_library(
name = "fragment",
srcs = ["fragment.cc"],
hdrs = ["fragment.h"],
deps = [
"//source/extensions/common/async_files",
],
)
97 changes: 97 additions & 0 deletions source/extensions/filters/http/file_system_buffer/fragment.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
#include "source/extensions/filters/http/file_system_buffer/fragment.h"

#include "source/common/buffer/buffer_impl.h"

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
namespace FileSystemBuffer {

Fragment::Fragment(Buffer::Instance& buffer)
: size_(buffer.length()), data_(MemoryFragment(buffer)) {}

Fragment::Fragment(Buffer::Instance& buffer, size_t size)
: size_(size), data_(MemoryFragment(buffer, size)) {}

Fragment::~Fragment() = default;
bool Fragment::isMemory() const { return absl::holds_alternative<MemoryFragment>(data_); }
bool Fragment::isStorage() const { return absl::holds_alternative<StorageFragment>(data_); }

MemoryFragment::MemoryFragment(Buffer::Instance& buffer)
: buffer_(std::make_unique<Buffer::OwnedImpl>()) {
buffer_->move(buffer);
}

MemoryFragment::MemoryFragment(Buffer::Instance& buffer, size_t size)
: buffer_(std::make_unique<Buffer::OwnedImpl>()) {
buffer_->move(buffer, size);
}

std::unique_ptr<Buffer::Instance> Fragment::extract() {
auto ret = absl::get<MemoryFragment>(data_).extract();
size_ = 0;
return ret;
}

std::unique_ptr<Buffer::Instance> MemoryFragment::extract() { return std::move(buffer_); }

absl::StatusOr<CancelFunction>
Fragment::toStorage(AsyncFileHandle file, off_t offset,
std::function<void(std::function<void()>)> dispatch,
std::function<void(absl::Status)> on_done) {
ASSERT(isMemory());
auto data = absl::get<MemoryFragment>(data_).extract();
data_.emplace<WritingFragment>();
return file->write(
*data, offset,
[this, dispatch = std::move(dispatch), size = size_, on_done = std::move(on_done),
offset](absl::StatusOr<size_t> result) {
// size is captured because we can't safely use 'this' until we're in the dispatch callback.
if (!result.ok()) {
dispatch([status = result.status(), on_done = std::move(on_done)]() { on_done(status); });
} else if (result.value() != size) {
auto status = absl::AbortedError(
fmt::format("buffer write wrote {} bytes, wanted {}", result.value(), size));
dispatch(
[on_done = std::move(on_done), status = std::move(status)]() { on_done(status); });
} else {
dispatch([this, status = result.status(), offset, on_done = std::move(on_done)] {
data_.emplace<StorageFragment>(offset);
on_done(absl::OkStatus());
});
}
});
}

absl::StatusOr<CancelFunction>
Fragment::fromStorage(AsyncFileHandle file, std::function<void(std::function<void()>)> dispatch,
std::function<void(absl::Status)> on_done) {
ASSERT(isStorage());
off_t offset = absl::get<StorageFragment>(data_).offset();
data_.emplace<ReadingFragment>();
return file->read(
offset, size_,
[this, dispatch = std::move(dispatch), size = size_,
on_done = std::move(on_done)](absl::StatusOr<std::unique_ptr<Buffer::Instance>> result) {
// size is captured because we can't safely use 'this' until we're in the dispatch callback.
if (!result.ok()) {
dispatch([on_done = std::move(on_done), status = result.status()]() { on_done(status); });
} else if (result.value()->length() != size) {
auto status = absl::AbortedError(
fmt::format("buffer read got {} bytes, wanted {}", result.value()->length(), size));
dispatch(
[on_done = std::move(on_done), status = std::move(status)]() { on_done(status); });
} else {
auto buffer = std::shared_ptr<Buffer::Instance>(std::move(result.value()));
dispatch([this, on_done = std::move(on_done), buffer = std::move(buffer)]() {
data_.emplace<MemoryFragment>(*buffer);
on_done(absl::OkStatus());
});
}
});
}

} // namespace FileSystemBuffer
} // namespace HttpFilters
} // namespace Extensions
} // namespace Envoy
110 changes: 110 additions & 0 deletions source/extensions/filters/http/file_system_buffer/fragment.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
#pragma once
#include <memory>
#include <string>
#include <vector>

#include "envoy/buffer/buffer.h"

#include "source/extensions/common/async_files/async_file_handle.h"

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
namespace FileSystemBuffer {

using Extensions::Common::AsyncFiles::AsyncFileHandle;
using Extensions::Common::AsyncFiles::CancelFunction;

// Internal implementation detail exposed for use in Fragment's variant.
// Represents a buffer fragment that is in memory.
class MemoryFragment {
public:
explicit MemoryFragment(Buffer::Instance& buffer);
explicit MemoryFragment(Buffer::Instance& buffer, size_t size);
std::unique_ptr<Buffer::Instance> extract();

private:
std::unique_ptr<Buffer::OwnedImpl> buffer_;
};

// Internal implementation detail exposed for use in Fragment's variant.
// Represents a buffer fragment that is being written to storage.
class WritingFragment {};

// Internal implementation detail exposed for use in Fragment's variant.
// Represents a buffer fragment that is being read from storage.
class ReadingFragment {};

// Internal implementation detail exposed for use in Fragment's variant.
// Represents a buffer fragment that is currently in storage.
class StorageFragment {
public:
explicit StorageFragment(off_t offset) : offset_(offset) {}
off_t offset() const { return offset_; }

private:
const off_t offset_;
};

// A Fragment is a piece of the buffer queue used by the filter.
//
// Each fragment may be in memory, on disk, or in an unusable intermediate state
// while controlled by the AsyncFiles library.
//
// The fragments are queued in sequential order, and may not be in the same order
// in the buffer file, as we write "last fragment first" so that the soonest-needed
// fragments remain in memory as long as possible.
//
// Memory fragments are simply `Buffer::Instance`s. Storage fragments contain just
// enough information to reload a memory fragment from the buffer file.
class Fragment {
public:
explicit Fragment(Buffer::Instance& buffer);
explicit Fragment(Buffer::Instance& buffer, size_t size);
~Fragment();

// Starts the transition for this fragment from memory to storage.
//
// The on_done callback is sent to the dispatcher function after the file write completes.
//
// When called from a filter, the dispatcher function must abort without calling the
// callback if the filter or fragment has been destroyed.
absl::StatusOr<CancelFunction> toStorage(AsyncFileHandle file, off_t offset,
std::function<void(std::function<void()>)> dispatch,
std::function<void(absl::Status)> on_done);

// Starts the transition for this fragment from storage to memory.
//
// The on_done callback is sent to the dispatcher function after the file read completes.
//
// When called from a filter, the dispatcher function must abort without calling the
// callback if the filter or fragment has been destroyed.
absl::StatusOr<CancelFunction> fromStorage(AsyncFileHandle file,
std::function<void(std::function<void()>)> dispatch,
std::function<void(absl::Status)> on_done);

// Moves the buffer from a memory instance to the returned value and resets the fragment to
// size 0.
//
// It is an error to call extract() on a Fragment that is not in memory - an exception
// will be thrown.
Buffer::InstancePtr extract();

// Returns true if the fragment is in memory, false if in storage or transition.
bool isMemory() const;

// Returns true if the fragment is in storage, false if in memory or transition.
bool isStorage() const;

// Returns the size of the fragment in bytes.
size_t size() const { return size_; }

private:
size_t size_;
absl::variant<MemoryFragment, StorageFragment, ReadingFragment, WritingFragment> data_;
};

} // namespace FileSystemBuffer
} // namespace HttpFilters
} // namespace Extensions
} // namespace Envoy
21 changes: 21 additions & 0 deletions test/extensions/filters/http/file_system_buffer/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
load(
"//bazel:envoy_build_system.bzl",
"envoy_cc_test",
"envoy_package",
)

licenses(["notice"]) # Apache 2

envoy_package()

envoy_cc_test(
name = "fragment_test",
srcs = ["fragment_test.cc"],
tags = ["skip_on_windows"],
deps = [
"//source/extensions/filters/http/file_system_buffer:fragment",
"//test/extensions/common/async_files:mocks",
"//test/mocks/buffer:buffer_mocks",
"//test/test_common:status_utility_lib",
],
)
Loading