Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: Force evict when downloading vector index files #9391

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 118 additions & 13 deletions dbms/src/Storages/S3/FileCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
#include <chrono>
#include <cmath>
#include <filesystem>
#include <magic_enum.hpp>
#include <queue>

namespace ProfileEvents
{
Expand Down Expand Up @@ -210,7 +212,7 @@ FileSegmentPtr FileCache::get(const S3::S3FilenameView & s3_fname, const std::op
// We don't know the exact size of an object/file, but we need to reserve space to save it.
// A certain amount of space is reserved for each file type.
auto estimzted_size = filesize ? *filesize : getEstimatedSizeOfFileType(file_type);
if (!reserveSpaceImpl(file_type, estimzted_size, /*try_evict*/ true))
if (!reserveSpaceImpl(file_type, estimzted_size, EvictMode::TryEvict))
{
// Space not enough.
GET_METRIC(tiflash_storage_remote_cache, type_dtfile_full).Increment();
Expand Down Expand Up @@ -258,7 +260,7 @@ FileSegmentPtr FileCache::getOrWait(const S3::S3FilenameView & s3_fname, const s
GET_METRIC(tiflash_storage_remote_cache, type_dtfile_miss).Increment();

auto estimated_size = filesize ? *filesize : getEstimatedSizeOfFileType(file_type);
if (!reserveSpaceImpl(file_type, estimated_size, /*try_evict*/ true))
if (!reserveSpaceImpl(file_type, estimated_size, EvictMode::ForceEvict))
{
// Space not enough.
GET_METRIC(tiflash_storage_remote_cache, type_dtfile_full).Increment();
Expand Down Expand Up @@ -361,20 +363,25 @@ std::pair<Int64, std::list<String>::iterator> FileCache::removeImpl(
return {release_size, table.remove(s3_key)};
}

bool FileCache::reserveSpaceImpl(FileType reserve_for, UInt64 size, bool try_evict)
bool FileCache::reserveSpaceImpl(FileType reserve_for, UInt64 size, EvictMode evict)
{
if (cache_used + size <= cache_capacity)
{
cache_used += size;
CurrentMetrics::set(CurrentMetrics::DTFileCacheUsed, cache_used);
return true;
}
if (try_evict)
if (evict == EvictMode::TryEvict || evict == EvictMode::ForceEvict)
{
UInt64 min_evict_size = size - (cache_capacity - cache_used);
LOG_DEBUG(log, "tryEvictFile for {} min_evict_size={}", magic_enum::enum_name(reserve_for), min_evict_size);
tryEvictFile(reserve_for, min_evict_size);
return reserveSpaceImpl(reserve_for, size, /*try_evict*/ false);
LOG_DEBUG(
log,
"tryEvictFile for {} min_evict_size={} evict_mode={}",
magic_enum::enum_name(reserve_for),
min_evict_size,
magic_enum::enum_name(evict));
tryEvictFile(reserve_for, min_evict_size, evict);
return reserveSpaceImpl(reserve_for, size, EvictMode::NoEvict);
}
return false;
}
Expand All @@ -383,21 +390,27 @@ bool FileCache::reserveSpaceImpl(FileType reserve_for, UInt64 size, bool try_evi
// Distinguish cache priority according to file type. The larger the file type, the lower the priority.
// First, try to evict files of the same type that have not been used recently. => Try to evict old files.
// Second, try to evict files with lower priority. => Try to evict lower priority files.
// Finally, evict files with higher priority, if space is still not sufficient. Higher priority files
// are usually smaller. If we don't evict them, it is very possible that cache is full of these higher
// priority small files and we can't effectively cache any lower-priority large files.
std::vector<FileType> FileCache::getEvictFileTypes(FileType evict_for)
{
std::vector<FileType> evict_types;
evict_types.push_back(evict_for); // First, try evict with the same file type.
constexpr auto all_file_types = magic_enum::enum_values<FileType>(); // all_file_types are sorted by enum value.
// Second, try to evict from the lower priority file types.
for (auto itr = std::rbegin(all_file_types); itr != std::rend(all_file_types) && *itr > evict_for; ++itr)
for (auto itr = std::rbegin(all_file_types); itr != std::rend(all_file_types); ++itr)
{
evict_types.push_back(*itr);
if (*itr != evict_for)
evict_types.push_back(*itr);
}
return evict_types;
}

void FileCache::tryEvictFile(FileType evict_for, UInt64 size)
void FileCache::tryEvictFile(FileType evict_for, UInt64 size, EvictMode evict)
{
RUNTIME_CHECK(evict != EvictMode::NoEvict);

auto file_types = getEvictFileTypes(evict_for);
for (auto evict_from : file_types)
{
Expand All @@ -414,9 +427,18 @@ void FileCache::tryEvictFile(FileType evict_for, UInt64 size)
}
else
{
size = 0;
break;
}
}

if (size > 0 && evict == EvictMode::ForceEvict)
{
// After a series of tryEvict, the space is still not sufficient,
// so we do a force eviction.
auto evicted_size = forceEvict(size);
LOG_DEBUG(log, "forceEvict required_size={} evicted_size={}", size, evicted_size);
}
}

UInt64 FileCache::tryEvictFrom(FileType evict_for, UInt64 size, FileType evict_from)
Expand Down Expand Up @@ -460,10 +482,93 @@ UInt64 FileCache::tryEvictFrom(FileType evict_for, UInt64 size, FileType evict_f
return total_released_size;
}

bool FileCache::reserveSpace(FileType reserve_for, UInt64 size, bool try_evict)
// One entry of the force-eviction heap: a cached file together with the
// information needed to order it and to remove it from its LRU table.
struct ForceEvictCandidate
{
    // Index into `tables` of the LRU table that holds this file.
    UInt64 file_type_slot;
    // Key of the file inside that LRU table.
    String s3_key;
    // The cached file segment that would be removed.
    FileSegmentPtr file_segment;
    // Snapshot of the segment's last access time; candidates are ordered by this field.
    std::chrono::system_clock::time_point last_access_time;
};

struct ForceEvictCandidateComparer
{
bool operator()(ForceEvictCandidate a, ForceEvictCandidate b) { return a.last_access_time > b.last_access_time; }
};

// Evict cached files regardless of their file type, from the least recently
// accessed to the most recently accessed, until at least `size_to_evict` bytes
// have been released or no candidate is left.
//
// The LRU lists of all file types are merged through a min-heap keyed by last
// access time: the heap holds at most one pending candidate per file type, and
// whenever a candidate is popped the next entry of the same LRU list is pushed.
//
// Returns the total number of bytes actually released, which may be smaller
// than `size_to_evict` when some files cannot be removed.
//
// NOTE(review): like the rest of this function's indexing into `tables`, the
// per-type iterator vector assumes FileType values are contiguous and start
// at 0, so that an enum value can be used as a position — confirm.
UInt64 FileCache::forceEvict(UInt64 size_to_evict)
{
    if (size_to_evict == 0)
        return 0;

    using LRUIterator = std::list<String>::iterator;

    std::priority_queue<ForceEvictCandidate, std::vector<ForceEvictCandidate>, ForceEvictCandidateComparer>
        evict_candidates;

    // If `iter` is not at the end of its LRU list, push the entry it points to
    // as a candidate and advance `iter` to the following entry.
    auto push_candidate_and_advance = [&](UInt64 file_type_slot, LRUIterator & iter) {
        auto & table = tables[file_type_slot];
        if (iter == table.end())
            return;
        const auto & s3_key = *iter;
        const auto & f = table.get(s3_key, /*update_lru*/ false);
        evict_candidates.emplace(ForceEvictCandidate{
            .file_type_slot = file_type_slot,
            .s3_key = s3_key,
            .file_segment = f,
            .last_access_time = f->getLastAccessTime(),
        });
        ++iter;
    };

    // First, pick the first item of every file type's LRU list.
    constexpr auto all_file_types = magic_enum::enum_values<FileType>();
    std::vector<LRUIterator> each_type_lru_iters; // Stores the iterator of the next candidate to add
    each_type_lru_iters.reserve(all_file_types.size());
    for (const auto file_type : all_file_types)
    {
        const auto file_type_slot = static_cast<UInt64>(file_type);
        LRUIterator iter = tables[file_type_slot].begin();
        push_candidate_and_advance(file_type_slot, iter);
        each_type_lru_iters.emplace_back(iter);
    }

    // Then pop the heap repeatedly to remove the file with the oldest access time.
    UInt64 total_released_size = 0;
    while (!evict_candidates.empty())
    {
        auto to_evict = evict_candidates.top(); // intentionally copy: pop() destroys the top element
        evict_candidates.pop();

        // Queue the successor from the same LRU list *before* removing the
        // current entry, so the stored iterator never refers to an erased node.
        const auto file_type_slot = to_evict.file_type_slot;
        push_candidate_and_advance(file_type_slot, each_type_lru_iters[file_type_slot]);

        const auto released_size
            = removeImpl(tables[file_type_slot], to_evict.s3_key, to_evict.file_segment).first;
        LOG_DEBUG(log, "ForceEvict {} size={}", to_evict.s3_key, released_size);
        if (released_size >= 0) // removed
        {
            total_released_size += static_cast<UInt64>(released_size);
            if (total_released_size >= size_to_evict)
                break;
        }
    }
    return total_released_size;
}

bool FileCache::reserveSpace(FileType reserve_for, UInt64 size, EvictMode evict)
{
std::lock_guard lock(mtx);
return reserveSpaceImpl(reserve_for, size, try_evict);
return reserveSpaceImpl(reserve_for, size, evict);
}

void FileCache::releaseSpaceImpl(UInt64 size)
Expand Down Expand Up @@ -551,7 +656,7 @@ bool FileCache::finalizeReservedSize(FileType reserve_for, UInt64 reserved_size,
if (content_length > reserved_size)
{
// Need more space.
return reserveSpace(reserve_for, content_length - reserved_size, /*try_evict*/ true);
return reserveSpace(reserve_for, content_length - reserved_size, EvictMode::TryEvict);
}
else if (content_length < reserved_size)
{
Expand Down
21 changes: 18 additions & 3 deletions dbms/src/Storages/S3/FileCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,12 @@ class FileSegment
return status;
}

// Returns a copy of `last_access_time`, read while holding this segment's mutex.
auto getLastAccessTime() const
{
    std::lock_guard guard(mtx);
    return last_access_time;
}

private:
mutable std::mutex mtx;
const String local_fname;
Expand Down Expand Up @@ -303,14 +309,23 @@ class FileCache
static FileSegment::FileType getFileType(const String & fname);
static FileSegment::FileType getFileTypeOfColData(const std::filesystem::path & p);
bool canCache(FileSegment::FileType file_type) const;
bool reserveSpaceImpl(FileSegment::FileType reserve_for, UInt64 size, bool try_evict);

// How hard space reservation may try to free cache space when the cache is full.
enum class EvictMode
{
    // Never evict: the reservation simply fails when there is not enough free space.
    NoEvict = 0,
    // Evict by file type, in the order given by getEvictFileTypes().
    TryEvict = 1,
    // Same as TryEvict, then fall back to forceEvict() if space is still insufficient.
    ForceEvict = 2,
};

bool reserveSpaceImpl(FileSegment::FileType reserve_for, UInt64 size, EvictMode evict);
void releaseSpaceImpl(UInt64 size);
void releaseSpace(UInt64 size);
bool reserveSpace(FileSegment::FileType reserve_for, UInt64 size, bool try_evict);
bool reserveSpace(FileSegment::FileType reserve_for, UInt64 size, EvictMode evict);
bool finalizeReservedSize(FileSegment::FileType reserve_for, UInt64 reserved_size, UInt64 content_length);
static std::vector<FileSegment::FileType> getEvictFileTypes(FileSegment::FileType evict_for);
void tryEvictFile(FileSegment::FileType evict_for, UInt64 size);
void tryEvictFile(FileSegment::FileType evict_for, UInt64 size, EvictMode evict);
UInt64 tryEvictFrom(FileSegment::FileType evict_for, UInt64 size, FileSegment::FileType evict_from);
UInt64 forceEvict(UInt64 size);

// This function is used for test.
std::vector<FileSegmentPtr> getAll();
Expand Down
Loading