Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 14 additions & 10 deletions velox/benchmarks/tpch/TpchBenchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,9 +230,10 @@ class TpchBenchmark {
static_cast<uint64_t>(FLAGS_ssd_checkpoint_interval_gb) << 30);
}

auto allocator = std::make_shared<memory::MmapAllocator>(options);
allocator_ = std::make_shared<cache::AsyncDataCache>(
allocator, memoryBytes, std::move(ssdCache));
allocator_ = std::make_shared<memory::MmapAllocator>(options);
cache_ =
cache::AsyncDataCache::create(allocator_.get(), std::move(ssdCache));
cache::AsyncDataCache::setInstance(cache_.get());
memory::MemoryAllocator::setDefaultInstance(allocator_.get());
}
functions::prestosql::registerAllScalarFunctions();
Expand Down Expand Up @@ -261,6 +262,10 @@ class TpchBenchmark {
connector::registerConnector(hiveConnector);
}

// Releases cache resources before process exit. 'cache_' is only set when
// the benchmark was configured with the async data cache, so guard against
// a null pointer (mirrors the `if (cache_)` checks used elsewhere in this
// class when clearing the cache).
void shutdown() {
  if (cache_) {
    cache_->prepareShutdown();
  }
}

std::pair<std::unique_ptr<TaskCursor>, std::vector<RowVectorPtr>> run(
const TpchPlan& tpchPlan) {
int32_t repeat = 0;
Expand Down Expand Up @@ -382,15 +387,13 @@ class TpchBenchmark {
}
#endif

auto cache = dynamic_cast<cache::AsyncDataCache*>(allocator_.get());
if (cache) {
cache->clear();
if (cache_) {
cache_->clear();
}
}
if (FLAGS_clear_ssd_cache) {
auto cache = dynamic_cast<cache::AsyncDataCache*>(allocator_.get());
if (cache) {
auto ssdCache = cache->ssdCache();
if (cache_) {
auto ssdCache = cache_->ssdCache();
if (ssdCache) {
ssdCache->clear();
}
Expand Down Expand Up @@ -462,7 +465,7 @@ class TpchBenchmark {
std::unique_ptr<folly::IOThreadPoolExecutor> ioExecutor_;
std::unique_ptr<folly::IOThreadPoolExecutor> cacheExecutor_;
std::shared_ptr<memory::MemoryAllocator> allocator_;

std::shared_ptr<cache::AsyncDataCache> cache_;
// Parameter combinations to try. Each element specifies a flag and possible
// values. All permutations are tried.
std::vector<ParameterDim> parameters_;
Expand Down Expand Up @@ -578,6 +581,7 @@ int tpchBenchmarkMain() {
} else {
benchmark.runAllCombinations();
}
benchmark.shutdown();
queryBuilder.reset();
return 0;
}
98 changes: 41 additions & 57 deletions velox/common/caching/AsyncDataCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ AsyncDataCacheEntry::AsyncDataCacheEntry(CacheShard* shard) : shard_(shard) {
}

AsyncDataCacheEntry::~AsyncDataCacheEntry() {
  // Return this entry's backing pages to the underlying allocator. The
  // cache no longer forwards allocation calls itself, so go through
  // allocator() explicitly.
  shard_->cache()->allocator()->freeNonContiguous(data_);
}

void AsyncDataCacheEntry::setExclusiveToShared() {
Expand Down Expand Up @@ -108,7 +108,7 @@ void AsyncDataCacheEntry::initialize(FileCacheKey key) {
tinyData_.clear();
auto sizePages = bits::roundUp(size_, memory::AllocationTraits::kPageSize) /
memory::AllocationTraits::kPageSize;
if (cache->allocateNonContiguous(sizePages, data_)) {
if (cache->allocator()->allocateNonContiguous(sizePages, data_)) {
cache->incrementCachedPages(data().numPages());
} else {
// No memory to cover 'this'.
Expand Down Expand Up @@ -324,7 +324,7 @@ void CacheShard::removeEntryLocked(AsyncDataCacheEntry* entry) {
auto numPages = entry->data().numPages();
if (numPages) {
cache_->incrementCachedPages(-numPages);
cache_->freeNonContiguous(entry->data());
cache_->allocator()->freeNonContiguous(entry->data());
}
}
}
Expand Down Expand Up @@ -412,7 +412,7 @@ void CacheShard::evict(uint64_t bytesToFree, bool evictAllUnpinned) {

// Hands every allocation in 'allocations' back to the cache's backing
// memory allocator and leaves the vector empty.
void CacheShard::freeAllocations(std::vector<memory::Allocation>& allocations) {
  auto* backingAllocator = cache_->allocator();
  for (auto& freed : allocations) {
    backingAllocator->freeNonContiguous(freed);
  }
  allocations.clear();
}
Expand Down Expand Up @@ -495,24 +495,52 @@ void CacheShard::appendSsdSaveable(std::vector<CachePin>& pins) {
}

// Constructs the cache on top of 'allocator' (not owned). 'ssdCache' is an
// optional SSD tier whose ownership transfers to the cache. All shards are
// created eagerly.
//
// NOTE: the pasted diff interleaved the pre-change signature (shared_ptr +
// unused maxBytes) and the deleted second constructor with the new code;
// this is the reconstructed post-change definition.
AsyncDataCache::AsyncDataCache(
    memory::MemoryAllocator* allocator,
    std::unique_ptr<SsdCache> ssdCache)
    : allocator_(allocator), ssdCache_(std::move(ssdCache)), cachedPages_(0) {
  shards_.reserve(kNumShards);
  for (auto i = 0; i < kNumShards; ++i) {
    shards_.push_back(std::make_unique<CacheShard>(this));
  }
}

AsyncDataCache::~AsyncDataCache() = default;

// static
// Factory: builds the cache and registers it with 'allocator' so the two
// stay linked for the cache's lifetime.
std::shared_ptr<AsyncDataCache> AsyncDataCache::create(
    memory::MemoryAllocator* allocator,
    std::unique_ptr<SsdCache> ssdCache) {
  std::shared_ptr<AsyncDataCache> newCache =
      std::make_shared<AsyncDataCache>(allocator, std::move(ssdCache));
  allocator->registerCache(newCache);
  return newCache;
}

// static
// Returns the process-wide cache instance, or nullptr if none was set.
AsyncDataCache* AsyncDataCache::getInstance() {
  AsyncDataCache** const slot = getInstancePtr();
  return *slot;
}

// static
// Installs 'asyncDataCache' as the process-wide instance (not owned; may
// be nullptr to clear it).
void AsyncDataCache::setInstance(AsyncDataCache* asyncDataCache) {
  AsyncDataCache** const slot = getInstancePtr();
  *slot = asyncDataCache;
}

// static
// Returns the address of the singleton slot. A function-local static is
// used so initialization is on first use and free of global-init-order
// issues. Renamed from 'cache_': a trailing underscore is this codebase's
// data-member convention and is misleading on a function-local static.
AsyncDataCache** AsyncDataCache::getInstancePtr() {
  static AsyncDataCache* instance{nullptr};
  return &instance;
}

void AsyncDataCache::prepareShutdown() {
for (auto& shard : shards_) {
shard->prepareShutdown();
}
}

void CacheShard::prepareShutdown() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just name it as shutdown()

entries_.clear();
freeEntries_.clear();
}

CachePin AsyncDataCache::findOrCreate(
RawFileCacheKey key,
uint64_t size,
Expand Down Expand Up @@ -611,50 +639,6 @@ void AsyncDataCache::backoff(int32_t counter) {
std::this_thread::sleep_for(std::chrono::microseconds(usec)); // NOLINT
}

// Allocates 'numPages' of non-contiguous memory into 'out', evicting cache
// entries via makeSpace() until the backing allocator succeeds or gives up.
bool AsyncDataCache::allocateNonContiguous(
    MachinePageCount numPages,
    memory::Allocation& out,
    ReservationCallback reservationCB,
    MachinePageCount minSizeClass) {
  auto tryAllocate = [&]() {
    return allocator_->allocateNonContiguous(
        numPages, out, reservationCB, minSizeClass);
  };
  return makeSpace(numPages, tryAllocate);
}

// Allocates 'numPages' of contiguous memory into 'allocation', retrying
// through makeSpace() which evicts cache entries to free up room.
bool AsyncDataCache::allocateContiguous(
    memory::MachinePageCount numPages,
    memory::Allocation* collateral,
    memory::ContiguousAllocation& allocation,
    ReservationCallback reservationCB,
    memory::MachinePageCount maxPages) {
  auto tryAllocate = [&]() {
    return allocator_->allocateContiguous(
        numPages, collateral, allocation, reservationCB, maxPages);
  };
  return makeSpace(numPages, tryAllocate);
}

// Grows 'allocation' by 'increment' pages, evicting cache entries via
// makeSpace() as needed to satisfy the request.
bool AsyncDataCache::growContiguous(
    MachinePageCount increment,
    memory::ContiguousAllocation& allocation,
    ReservationCallback reservationCB) {
  auto tryGrow = [&]() {
    return allocator_->growContiguous(increment, allocation, reservationCB);
  };
  return makeSpace(increment, tryGrow);
}

// Allocates 'bytes' with the given 'alignment', evicting cache entries via
// makeSpace() until the backing allocator succeeds. Returns nullptr when
// space cannot be made.
void* AsyncDataCache::allocateBytes(uint64_t bytes, uint16_t alignment) {
  const auto numPages =
      bits::roundUp(bytes, memory::AllocationTraits::kPageSize) /
      memory::AllocationTraits::kPageSize;
  void* result = nullptr;
  makeSpace(numPages, [&]() {
    result = allocator_->allocateBytes(bytes, alignment);
    return result != nullptr;
  });
  return result;
}

void AsyncDataCache::incrementNew(uint64_t size) {
newBytes_ += size;
if (!ssdCache_) {
Expand Down Expand Up @@ -725,7 +709,7 @@ std::string AsyncDataCache::toString() const {
<< " read pins " << stats.numShared << " write pins "
<< stats.numExclusive << " unused prefetch " << stats.numPrefetch
<< " Alloc Megaclocks " << (stats.allocClocks >> 20)
<< " allocated pages " << numAllocated() << " cached pages "
<< " allocated pages " << allocator_->numAllocated() << " cached pages "
<< cachedPages_;
out << "\nBacking: " << allocator_->toString();
if (ssdCache_) {
Expand Down
Loading