Skip to content

Commit

Permalink
call fsync with delay
Browse files Browse the repository at this point in the history
  • Loading branch information
artpaul authored Sep 16, 2024
1 parent 383c1d1 commit 136dcff
Show file tree
Hide file tree
Showing 3 changed files with 162 additions and 23 deletions.
59 changes: 47 additions & 12 deletions include/bitcask/bitcask.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <condition_variable>
#include <filesystem>
#include <functional>
#include <future>
#include <limits>
#include <memory>
#include <optional>
Expand All @@ -19,20 +20,32 @@ struct iovec;

namespace bitcask {

enum class FlushMode {
kNone = 0,
/// Flush in-core data only when closing an active file.
kOnClose,
/// Delay flushing of the written data.
kDelay,
/// Flush in-core data after each write.
kImmediately,
};

struct Options {
/// Number of active files.
uint8_t active_files = 1;

uint8_t compaction_levels = 2;
/// Mode of flushing in-core data to storage device.
FlushMode flush_mode = FlushMode::kNone;

std::chrono::nanoseconds flush_delay = std::chrono::milliseconds(1);

uint32_t max_file_size = std::numeric_limits<uint32_t>::max();

uint8_t compaction_levels = 2;

/// If the database has been loaded successfully, clean up any temporary files at startup.
bool clean_temporary_on_startup = true;

/// Flush in-core data to storage device after write.
bool data_sync = false;

/// If true, the store will be opened in read-only mode.
bool read_only = false;

Expand Down Expand Up @@ -71,6 +84,26 @@ class Database {
kGather = 2,
};

struct FileInfo;
struct WaitQueue;

struct ActiveFile {
/// Provides exclusive access for writing to the active file.
std::mutex write_mutex;
/// A file object designated for writing.
std::shared_ptr<FileInfo> file;

std::chrono::steady_clock::time_point last_write;
// Wait queue for delayed flushes.
std::weak_ptr<WaitQueue> wait_queue;

public:
std::error_code FlushWithDelay(
const std::chrono::nanoseconds delay, std::unique_lock<std::mutex> write_lock);

bool MaybeFlushImmediately(const std::chrono::nanoseconds delay);
};

struct FileInfo {
/// Path to the data file.
std::filesystem::path path;
Expand Down Expand Up @@ -130,6 +163,15 @@ class Database {
std::error_code EnsureReadable();
};

struct WaitQueue {
std::condition_variable cond;
std::promise<std::error_code> promise;
std::shared_future<std::error_code> result;

public:
WaitQueue() : result(promise.get_future()) {}
};

struct FileSections {
using Range = std::pair<uint64_t, uint64_t>;

Expand Down Expand Up @@ -247,7 +289,7 @@ class Database {
* @returns Descriptor of the written record or an error code if the write was unsuccessful.
*/
std::pair<Record, std::error_code> WriteEntry(const std::string_view key, const std::string_view value,
const uint64_t timestamp, const bool is_tombstone, const bool sync);
const uint64_t timestamp, const bool is_tombstone, const FlushMode flush_mode);

/**
* Appends index to the end of file.
Expand All @@ -269,13 +311,6 @@ class Database {
const std::function<FileInfoStatus(uint64_t)>& cb);

private:
struct ActiveFile {
/// Provides exclusive access for writing to the active file.
std::mutex write_mutex;
/// A file object designated for writing.
std::shared_ptr<FileInfo> file;
};

struct StringHash {
using is_transparent = void;

Expand Down
93 changes: 84 additions & 9 deletions src/bitcask.cc
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,53 @@ std::pair<size_t, std::error_code> ReadEntryImpl(const int fd, const size_t offs

} // namespace

std::error_code Database::ActiveFile::FlushWithDelay(
const std::chrono::nanoseconds delay, std::unique_lock<std::mutex> write_lock) {
// Check the thread is ownign the lock.
assert(write_lock.owns_lock());

auto queue = wait_queue.lock();

if (queue) {
write_lock.unlock();
// Wait for the result.
return queue->result.get();
} else {
this->wait_queue = (queue = std::make_shared<WaitQueue>());
}

const auto deadline = std::chrono::steady_clock::now() + delay;
// Wait until deadline.
while (queue->cond.wait_until(write_lock, deadline) != std::cv_status::timeout) {
if (!file) {
break;
}
}

std::error_code e;
// Flush the data if the file is still writeable.
if (file) {
if (int ret = ::fsync(file->fd); ret != 0) {
e = std::make_error_code(static_cast<std::errc>(errno));
}
}
// Notify other waitees.
queue->promise.set_value(e);
// Reset the queue.
this->wait_queue = {};

return e;
}

bool Database::ActiveFile::MaybeFlushImmediately(const std::chrono::nanoseconds delay) {
const auto now = std::chrono::steady_clock::now();
const bool result = (now - last_write) > delay;

last_write = now;

return result;
}

std::error_code Database::FileInfo::Append(const std::span<const iovec>& parts, const bool sync) noexcept {
const size_t length = std::accumulate(
parts.begin(), parts.end(), 0ul, [](const auto acc, const auto& p) { return acc + p.iov_len; });
Expand Down Expand Up @@ -191,7 +238,7 @@ Database::~Database() {
// Close writable files.
for (const auto& item : active_files_) {
if (item.file) {
item.file->CloseFile(options_.data_sync);
item.file->CloseFile(options_.flush_mode != FlushMode::kNone);
}
}
// Close read-only files.
Expand Down Expand Up @@ -232,8 +279,13 @@ std::error_code Database::Delete(const WriteOptions& options, const std::string_
}

[[maybe_unused]] Defer d([this, key] { UnlockKey(key); });
//
const auto flush_mode = (options.sync && (options_.flush_mode == FlushMode::kNone ||
options_.flush_mode == FlushMode::kOnClose))
? FlushMode::kImmediately
: options_.flush_mode;
// Write the tombstone.
auto [_, ec] = WriteEntry(key, {}, timestamp.value(), true, options.sync);
auto [_, ec] = WriteEntry(key, {}, timestamp.value(), true, flush_mode);
if (ec) {
return ec;
}
Expand Down Expand Up @@ -315,8 +367,13 @@ std::error_code Database::Put(
const uint64_t timestamp = ++clock_;

[[maybe_unused]] Defer d([this, key] { UnlockKey(key); });
//
const auto flush_mode = (options.sync && (options_.flush_mode == FlushMode::kNone ||
options_.flush_mode == FlushMode::kOnClose))
? FlushMode::kImmediately
: options_.flush_mode;
// Write the value with the specific timestamp.
auto [record, ec] = WriteEntry(key, value, timestamp, false, options.sync);
auto [record, ec] = WriteEntry(key, value, timestamp, false, flush_mode);
if (ec) {
return ec;
}
Expand Down Expand Up @@ -472,6 +529,10 @@ std::error_code Database::Rotate() {
for (auto& item : active_files_) {
// Acquire exclusive access for writing to active file.
std::lock_guard write_lock(item.write_mutex);
//
if (auto wait_queue = item.wait_queue.lock()) {
wait_queue->cond.notify_all();
}
// Check if there is a file opened for writing.
if (item.file) {
files.push_back(std::move(item.file));
Expand All @@ -480,7 +541,7 @@ std::error_code Database::Rotate() {

// Close files for writing.
for (const auto& f : files) {
f->CloseFile(options_.data_sync);
f->CloseFile(options_.flush_mode != FlushMode::kNone);
}
// Acquire exclusive access to the list of data files.
std::lock_guard file_lock(file_mutex_);
Expand Down Expand Up @@ -947,7 +1008,7 @@ std::error_code Database::ReadValue(
}

std::pair<Database::Record, std::error_code> Database::WriteEntry(const std::string_view key,
const std::string_view value, const uint64_t timestamp, const bool is_tombstone, const bool sync) {
const std::string_view value, const uint64_t timestamp, const bool is_tombstone, FlushMode flush_mode) {
ActiveFile& active_file = active_files_.size() == 1
? active_files_[0]
: active_files_[XXH64(key.data(), key.size(), 0) % active_files_.size()];
Expand All @@ -962,7 +1023,10 @@ std::pair<Database::Record, std::error_code> Database::WriteEntry(const std::str
// is not enough space left for writing.
if (active_file.file) {
if (IsCapacityExceeded(active_file.file->size, length)) {
active_file.file->CloseFile(sync);
active_file.file->CloseFile(flush_mode != FlushMode::kNone);
if (auto wait_queue = active_file.wait_queue.lock()) {
wait_queue->cond.notify_all();
}

// Acquire exclusive access to list of data files.
std::lock_guard file_lock(file_mutex_);
Expand All @@ -985,7 +1049,18 @@ std::pair<Database::Record, std::error_code> Database::WriteEntry(const std::str
return {active_file.file, {}};
};

return WriteEntryToFile(key, value, timestamp, is_tombstone, sync, file_provider);
if (flush_mode == FlushMode::kDelay && active_file.MaybeFlushImmediately(options_.flush_delay)) {
flush_mode = FlushMode::kImmediately;
}

auto [record, ec] = WriteEntryToFile(
key, value, timestamp, is_tombstone, flush_mode == FlushMode::kImmediately, file_provider);
// Return any error immediately.
if (ec || flush_mode != FlushMode::kDelay) {
return {record, ec};
}

return {record, active_file.FlushWithDelay(options_.flush_delay, std::move(write_lock))};
}

std::error_code Database::WriteIndex(const std::shared_ptr<FileInfo>& file) {
Expand Down Expand Up @@ -1106,9 +1181,9 @@ std::pair<Database::Record, std::error_code> Database::WriteEntryToFile(const st

// Count entries.
if (is_tombstone) {
file->tombstones.fetch_add(1);
file->tombstones.fetch_add(1, std::memory_order_relaxed);
} else {
file->records.fetch_add(1);
file->records.fetch_add(1, std::memory_order_relaxed);
}

const Record record{
Expand Down
33 changes: 31 additions & 2 deletions test/tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
namespace {

constexpr bitcask::Options kDefaultOptions{
.compaction_levels = 1,
.max_file_size = 32ull << 20,
.compaction_levels = 1,
};

} // namespace
Expand Down Expand Up @@ -130,7 +130,7 @@ TEST_CASE("Read after reopen") {
TemporaryDirectory temp_dir;
std::unique_ptr<bitcask::Database> db;
auto options = kDefaultOptions;
options.data_sync = true;
options.flush_mode = bitcask::FlushMode::kImmediately;

REQUIRE_FALSE(bitcask::Database::Open(options, temp_dir.GetPath(), db));
REQUIRE_FALSE(db->Put({}, "abc", "test"));
Expand Down Expand Up @@ -234,6 +234,35 @@ TEST_CASE("Write multiple active files") {
CHECK(tmp == "some content larger than header512");
}

TEST_CASE("Flush with delay") {
TemporaryDirectory temp_dir;
std::unique_ptr<bitcask::Database> db;

REQUIRE_FALSE(bitcask::Database::Open(
{.flush_mode = bitcask::FlushMode::kDelay, .max_file_size = 16 << 10}, temp_dir.GetPath(), db));

std::vector<std::thread> threads;
const std::string value = "some content larger than header";
const auto do_write = [&] {
for (size_t i = rand() % 10; i < 50; ++i) db->Put({}, std::to_string(i), value);
};

threads.emplace_back(do_write);
threads.emplace_back(do_write);
threads.emplace_back(do_write);
threads.emplace_back(do_write);
threads.emplace_back(do_write);

for (auto& t : threads) {
if (t.joinable()) {
t.join();
}
}
std::string tmp;
auto s = db->Get({}, "12", &tmp);
CHECK(tmp == value);
}

TEST_CASE("Write single key by multiple threads") {
TemporaryDirectory temp_dir;
std::unique_ptr<bitcask::Database> db;
Expand Down

0 comments on commit 136dcff

Please sign in to comment.