Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,9 @@ extensions/filters/http/oauth2 @rgs1 @derekargueta @snowp
/*/extensions/key_value @alyssawilk @ryantheoptimist
# Config Validators
/*/extensions/config/validators/minimum_clusters @adisuissa @htuch
# support library for filesystem based extensions
# File system based extensions
/*/extensions/common/async_files @mattklein123 @ravenblack
/*/extensions/filters/http/file_system_buffer @mattklein123 @ravenblack
# Google Cloud Platform Authentication Filter
/*/extensions/filters/http/gcp_authn @tyxia @yanavlasov
# DNS resolution
Expand Down
18 changes: 18 additions & 0 deletions source/extensions/filters/http/file_system_buffer/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
load(
"//bazel:envoy_build_system.bzl",
"envoy_cc_library",
"envoy_extension_package",
)

licenses(["notice"]) # Apache 2

envoy_extension_package()

envoy_cc_library(
name = "fragment",
srcs = ["fragment.cc"],
hdrs = ["fragment.h"],
deps = [
"//source/extensions/common/async_files",
],
)
97 changes: 97 additions & 0 deletions source/extensions/filters/http/file_system_buffer/fragment.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
#include "source/extensions/filters/http/file_system_buffer/fragment.h"

#include "source/common/buffer/buffer_impl.h"

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
namespace FileSystemBuffer {

Fragment::Fragment(Buffer::Instance& buffer)
: size_(buffer.length()), data_(MemoryFragment(buffer)) {}

Fragment::Fragment(Buffer::Instance& buffer, size_t size)
: size_(size), data_(MemoryFragment(buffer, size)) {}

Fragment::~Fragment() = default;
bool Fragment::isMemory() const { return absl::holds_alternative<MemoryFragment>(data_); }
bool Fragment::isStorage() const { return absl::holds_alternative<StorageFragment>(data_); }

MemoryFragment::MemoryFragment(Buffer::Instance& buffer)
: buffer_(std::make_unique<Buffer::OwnedImpl>()) {
buffer_->move(buffer);
}

MemoryFragment::MemoryFragment(Buffer::Instance& buffer, size_t size)
: buffer_(std::make_unique<Buffer::OwnedImpl>()) {
buffer_->move(buffer, size);
}

std::unique_ptr<Buffer::Instance> Fragment::extract() {
auto ret = absl::get<MemoryFragment>(data_).extract();
size_ = 0;
return ret;
}

std::unique_ptr<Buffer::Instance> MemoryFragment::extract() { return std::move(buffer_); }

absl::StatusOr<CancelFunction>
Fragment::toStorage(AsyncFileHandle file, off_t offset,
std::function<void(std::function<void()>)> dispatch,
std::function<void(absl::Status)> on_done) {
ASSERT(isMemory());
auto data = absl::get<MemoryFragment>(data_).extract();
data_.emplace<WritingFragment>();
return file->write(
*data, offset,
[this, dispatch = std::move(dispatch), size = size_, on_done = std::move(on_done),
offset](absl::StatusOr<size_t> result) {
// size is captured because we can't safely use 'this' until we're in the dispatch callback.
if (!result.ok()) {
dispatch([status = result.status(), on_done = std::move(on_done)]() { on_done(status); });
} else if (result.value() != size) {
auto status = absl::AbortedError(
fmt::format("buffer write wrote {} bytes, wanted {}", result.value(), size));
dispatch(
[on_done = std::move(on_done), status = std::move(status)]() { on_done(status); });
} else {
dispatch([this, status = result.status(), offset, on_done = std::move(on_done)] {
data_.emplace<StorageFragment>(offset);
on_done(absl::OkStatus());
});
}
});
}

absl::StatusOr<CancelFunction>
Fragment::fromStorage(AsyncFileHandle file, std::function<void(std::function<void()>)> dispatch,
std::function<void(absl::Status)> on_done) {
ASSERT(isStorage());
off_t offset = absl::get<StorageFragment>(data_).offset();
data_.emplace<ReadingFragment>();
return file->read(
offset, size_,
[this, dispatch = std::move(dispatch), size = size_,
on_done = std::move(on_done)](absl::StatusOr<std::unique_ptr<Buffer::Instance>> result) {
// size is captured because we can't safely use 'this' until we're in the dispatch callback.
if (!result.ok()) {
dispatch([on_done = std::move(on_done), status = result.status()]() { on_done(status); });
} else if (result.value()->length() != size) {
auto status = absl::AbortedError(
fmt::format("buffer read got {} bytes, wanted {}", result.value()->length(), size));
dispatch(
[on_done = std::move(on_done), status = std::move(status)]() { on_done(status); });
} else {
dispatch([this, on_done = std::move(on_done),
data = std::shared_ptr<Buffer::Instance>(result.value().release())]() {
data_.emplace<MemoryFragment>(*data);
on_done(absl::OkStatus());
});
}
});
}

} // namespace FileSystemBuffer
} // namespace HttpFilters
} // namespace Extensions
} // namespace Envoy
110 changes: 110 additions & 0 deletions source/extensions/filters/http/file_system_buffer/fragment.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
#pragma once
#include <memory>
#include <string>
#include <vector>

#include "envoy/buffer/buffer.h"

#include "source/extensions/common/async_files/async_file_handle.h"

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
namespace FileSystemBuffer {

using Extensions::Common::AsyncFiles::AsyncFileHandle;
using Extensions::Common::AsyncFiles::CancelFunction;

// Internal implementation detail exposed for use in Fragment's variant.
// Represents a buffer fragment that is in memory.
class MemoryFragment {
public:
explicit MemoryFragment(Buffer::Instance& buffer);
explicit MemoryFragment(Buffer::Instance& buffer, size_t size);
std::unique_ptr<Buffer::Instance> extract();

private:
std::unique_ptr<Buffer::OwnedImpl> buffer_;
};

// Internal implementation detail exposed for use in Fragment's variant.
// Represents a buffer fragment that is being written to storage.
class WritingFragment {};

// Internal implementation detail exposed for use in Fragment's variant.
// Represents a buffer fragment that is being read from storage.
class ReadingFragment {};

// Internal implementation detail exposed for use in Fragment's variant.
// Represents a buffer fragment that is currently in storage.
class StorageFragment {
public:
explicit StorageFragment(off_t offset) : offset_(offset) {}
off_t offset() const { return offset_; }

private:
const off_t offset_;
};

// A Fragment is a piece of the buffer queue used by the filter.
//
// Each fragment may be in memory, on disk, or in an unusable intermediate state
// while controlled by the AsyncFiles library.
//
// The fragments are queued in sequential order, and may not be in the same order
// in the buffer file, as we write "last fragment first" so that the soonest-needed
// fragments remain in memory as long as possible.
//
// Memory fragments are simply `Buffer::Instance`s. Storage fragments contain just
// enough information to reload a memory fragment from the buffer file.
class Fragment {
public:
explicit Fragment(Buffer::Instance& buffer);
explicit Fragment(Buffer::Instance& buffer, size_t size);
~Fragment();

// Starts the transition for this fragment from memory to storage.
//
// The on_done callback is sent to the dispatcher function after the file write completes.
//
// When called from a filter, the dispatcher function must abort without calling the
// callback if the filter or fragment has been destroyed.
absl::StatusOr<CancelFunction> toStorage(AsyncFileHandle file, off_t offset,
std::function<void(std::function<void()>)> dispatch,
std::function<void(absl::Status)> on_done);

// Starts the transition for this fragment from storage to memory.
//
// The on_done callback is sent to the dispatcher function after the file read completes.
//
// When called from a filter, the dispatcher function must abort without calling the
// callback if the filter or fragment has been destroyed.
absl::StatusOr<CancelFunction> fromStorage(AsyncFileHandle file,
std::function<void(std::function<void()>)> dispatch,
std::function<void(absl::Status)> on_done);

// Moves the buffer from a memory instance to the returned value and resets the fragment to
// size 0.
//
// It is an error to call extract() on a Fragment that is not in memory - an exception
// will be thrown.
Buffer::InstancePtr extract();

// Returns true if the fragment is in memory, false if in storage or transition.
bool isMemory() const;

// Returns true if the fragment is in storage, false if in memory or transition.
bool isStorage() const;

// Returns the size of the fragment in bytes.
size_t size() const { return size_; }

private:
size_t size_;
absl::variant<MemoryFragment, StorageFragment, ReadingFragment, WritingFragment> data_;
};

} // namespace FileSystemBuffer
} // namespace HttpFilters
} // namespace Extensions
} // namespace Envoy
21 changes: 21 additions & 0 deletions test/extensions/filters/http/file_system_buffer/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
load(
"//bazel:envoy_build_system.bzl",
"envoy_cc_test",
"envoy_package",
)

licenses(["notice"]) # Apache 2

envoy_package()

envoy_cc_test(
name = "fragment_test",
srcs = ["fragment_test.cc"],
tags = ["skip_on_windows"],
deps = [
"//source/extensions/filters/http/file_system_buffer:fragment",
"//test/extensions/common/async_files:mocks",
"//test/mocks/buffer:buffer_mocks",
"//test/test_common:status_utility_lib",
],
)
Loading