Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 110 additions & 16 deletions presto-native-execution/presto_cpp/main/QueryContextManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,68 @@ inline QueryId queryIdFromTaskId(const TaskId& taskId) {

} // namespace

std::shared_ptr<velox::core::QueryCtx> QueryContextCache::get(
const protocol::QueryId& queryId) {
auto iter = queryCtxs_.find(queryId);
if (iter == queryCtxs_.end()) {
return nullptr;
}

queryIds_.erase(iter->second.idListIterator);

if (auto queryCtx = iter->second.queryCtx.lock()) {
// Move the queryId to front, if queryCtx is still alive.
queryIds_.push_front(queryId);
iter->second.idListIterator = queryIds_.begin();
return queryCtx;
}
queryCtxs_.erase(iter);
return nullptr;
}

std::shared_ptr<velox::core::QueryCtx> QueryContextCache::insert(
const protocol::QueryId& queryId,
std::shared_ptr<velox::core::QueryCtx> queryCtx) {
if (queryCtxs_.size() >= capacity_) {
evict();
}
queryIds_.push_front(queryId);
queryCtxs_[queryId] = {
folly::to_weak_ptr(queryCtx), queryIds_.begin(), false};
return queryCtx;
}

bool QueryContextCache::hasStartedTasks(
const protocol::QueryId& queryId) const {
auto iter = queryCtxs_.find(queryId);
if (iter != queryCtxs_.end()) {
return iter->second.hasStartedTasks;
}
return false;
}

void QueryContextCache::setTasksStarted(const protocol::QueryId& queryId) {
auto iter = queryCtxs_.find(queryId);
if (iter != queryCtxs_.end()) {
iter->second.hasStartedTasks = true;
}
}

void QueryContextCache::evict() {
// Evict least recently used queryCtx if it is not referenced elsewhere.
for (auto victim = queryIds_.end(); victim != queryIds_.begin();) {
--victim;
if (!queryCtxs_[*victim].queryCtx.lock()) {
queryCtxs_.erase(*victim);
queryIds_.erase(victim);
return;
}
}

// All queries are still inflight. Increase capacity.
capacity_ = std::max(kInitialCapacity, capacity_ * 2);
Comment on lines +95 to +96
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (performance): Unbounded growth of cache capacity could lead to excessive memory usage.

Please set an upper limit on cache capacity or add a warning to prevent uncontrolled memory usage.

Suggested change
// All queries are still inflight. Increase capacity.
capacity_ = std::max(kInitialCapacity, capacity_ * 2);
// All queries are still inflight. Increase capacity, but do not exceed kMaxCapacity.
constexpr size_t kMaxCapacity = 1024; // Set an appropriate upper limit.
size_t newCapacity = std::max(kInitialCapacity, capacity_ * 2);
if (newCapacity > kMaxCapacity) {
newCapacity = kMaxCapacity;
LOG(WARNING) << "QueryContextCache capacity reached maximum limit (" << kMaxCapacity << ").";
}
capacity_ = newCapacity;

}

QueryContextManager::QueryContextManager(
folly::Executor* driverExecutor,
folly::Executor* spillerExecutor)
Expand All @@ -43,25 +105,58 @@ std::shared_ptr<velox::core::QueryCtx>
QueryContextManager::findOrCreateQueryCtx(
const protocol::TaskId& taskId,
const protocol::TaskUpdateRequest& taskUpdateRequest) {
return findOrCreateQueryCtx(
std::lock_guard<std::mutex> lock(queryContextCacheMutex_);
return findOrCreateQueryCtxLocked(
taskId,
toVeloxConfigs(
taskUpdateRequest.session, taskUpdateRequest.extraCredentials),
toConnectorConfigs(taskUpdateRequest));
}

std::shared_ptr<velox::core::QueryCtx>
QueryContextManager::findOrCreateBatchQueryCtx(
const protocol::TaskId& taskId,
const protocol::TaskUpdateRequest& taskUpdateRequest) {
std::lock_guard<std::mutex> lock(queryContextCacheMutex_);
auto queryCtx = findOrCreateQueryCtxLocked(
taskId,
toVeloxConfigs(
taskUpdateRequest.session, taskUpdateRequest.extraCredentials),
toConnectorConfigs(taskUpdateRequest));
if (queryCtx->pool()->aborted()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): Accessing queryCtx->pool() without null check may risk dereferencing a null pointer.

Add a null check for queryCtx before accessing pool() to prevent undefined behavior if findOrCreateQueryCtxLocked returns nullptr.

// In Batch mode, only one query is running at a time. When tasks fail
// during memory arbitration, the query memory pool will be set
// aborted, failing any successive tasks immediately. Yet one task
// should not fail other newly admitted tasks because of task retries
// and server reuse. Failure control among tasks should be
// independent. So if query memory pool is aborted already, a cache clear is
// performed to allow successive tasks to create a new query context to
// continue execution.
VELOX_CHECK_EQ(queryContextCache_.size(), 1);
queryContextCache_.clear();
queryCtx = findOrCreateQueryCtxLocked(
taskId,
toVeloxConfigs(
taskUpdateRequest.session, taskUpdateRequest.extraCredentials),
toConnectorConfigs(taskUpdateRequest));
}
return queryCtx;
}

bool QueryContextManager::queryHasStartedTasks(
const protocol::TaskId& taskId) const {
return queryContextCache_.rlock()->hasStartedTasks(queryIdFromTaskId(taskId));
std::lock_guard<std::mutex> lock(queryContextCacheMutex_);
return queryContextCache_.hasStartedTasks(queryIdFromTaskId(taskId));
}

void QueryContextManager::setQueryHasStartedTasks(
const protocol::TaskId& taskId) {
queryContextCache_.wlock()->setHasStartedTasks(queryIdFromTaskId(taskId));
std::lock_guard<std::mutex> lock(queryContextCacheMutex_);
queryContextCache_.setTasksStarted(queryIdFromTaskId(taskId));
}

std::shared_ptr<core::QueryCtx> QueryContextManager::createAndCacheQueryCtx(
QueryContextCache& cache,
std::shared_ptr<core::QueryCtx>
QueryContextManager::createAndCacheQueryCtxLocked(
const QueryId& queryId,
velox::core::QueryConfig&& queryConfig,
std::unordered_map<std::string, std::shared_ptr<config::ConfigBase>>&&
Expand All @@ -75,18 +170,17 @@ std::shared_ptr<core::QueryCtx> QueryContextManager::createAndCacheQueryCtx(
std::move(pool),
spillerExecutor_,
queryId);
return cache.insert(queryId, std::move(queryCtx));
return queryContextCache_.insert(queryId, std::move(queryCtx));
}

std::shared_ptr<core::QueryCtx> QueryContextManager::findOrCreateQueryCtx(
std::shared_ptr<core::QueryCtx> QueryContextManager::findOrCreateQueryCtxLocked(
const TaskId& taskId,
velox::core::QueryConfig&& queryConfig,
std::unordered_map<std::string, std::shared_ptr<config::ConfigBase>>&&
connectorConfigs) {
const QueryId queryId{queryIdFromTaskId(taskId)};

auto lockedCache = queryContextCache_.wlock();
if (auto queryCtx = lockedCache->get(queryId)) {
if (auto queryCtx = queryContextCache_.get(queryId)) {
return queryCtx;
}

Expand All @@ -111,8 +205,7 @@ std::shared_ptr<core::QueryCtx> QueryContextManager::findOrCreateQueryCtx(
nullptr,
poolDbgOpts);

return createAndCacheQueryCtx(
*lockedCache,
return createAndCacheQueryCtxLocked(
queryId,
std::move(queryConfig),
std::move(connectorConfigs),
Expand All @@ -123,19 +216,20 @@ void QueryContextManager::visitAllContexts(
const std::function<
void(const protocol::QueryId&, const velox::core::QueryCtx*)>& visitor)
const {
auto lockedCache = queryContextCache_.rlock();
for (const auto& it : lockedCache->ctxs()) {
std::lock_guard<std::mutex> lock(queryContextCacheMutex_);
for (const auto& it : queryContextCache_.ctxMap()) {
if (const auto queryCtxSP = it.second.queryCtx.lock()) {
visitor(it.first, queryCtxSP.get());
}
}
}

void QueryContextManager::testingClearCache() {
queryContextCache_.wlock()->testingClear();
void QueryContextManager::clearCache() {
std::lock_guard<std::mutex> lock(queryContextCacheMutex_);
queryContextCache_.clear();
}

void QueryContextCache::testingClear() {
void QueryContextCache::clear() {
queryCtxs_.clear();
queryIds_.clear();
}
Expand Down
83 changes: 21 additions & 62 deletions presto-native-execution/presto_cpp/main/QueryContextManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <folly/executors/CPUThreadPoolExecutor.h>
#include <folly/executors/IOThreadPoolExecutor.h>
#include <memory>
#include <mutex>
#include <unordered_map>

#include "presto_cpp/presto_protocol/core/presto_protocol_core.h"
Expand Down Expand Up @@ -44,77 +45,31 @@ class QueryContextCache {
return queryCtxs_.size();
}

std::shared_ptr<velox::core::QueryCtx> get(const protocol::QueryId& queryId) {
auto iter = queryCtxs_.find(queryId);
if (iter != queryCtxs_.end()) {
queryIds_.erase(iter->second.idListIterator);

if (auto queryCtx = iter->second.queryCtx.lock()) {
// Move the queryId to front, if queryCtx is still alive.
queryIds_.push_front(queryId);
iter->second.idListIterator = queryIds_.begin();
return queryCtx;
} else {
queryCtxs_.erase(iter);
}
}
return nullptr;
const QueryCtxMap& ctxMap() const {
return queryCtxs_;
}

std::shared_ptr<velox::core::QueryCtx> get(const protocol::QueryId& queryId);

std::shared_ptr<velox::core::QueryCtx> insert(
const protocol::QueryId& queryId,
std::shared_ptr<velox::core::QueryCtx> queryCtx) {
if (queryCtxs_.size() >= capacity_) {
evict();
}
queryIds_.push_front(queryId);
queryCtxs_[queryId] = {
folly::to_weak_ptr(queryCtx), queryIds_.begin(), false};
return queryCtx;
}
std::shared_ptr<velox::core::QueryCtx> queryCtx);

bool hasStartedTasks(const protocol::QueryId& queryId) const {
auto iter = queryCtxs_.find(queryId);
if (iter != queryCtxs_.end()) {
return iter->second.hasStartedTasks;
}
return false;
}
bool hasStartedTasks(const protocol::QueryId& queryId) const;

void setHasStartedTasks(const protocol::QueryId& queryId) {
auto iter = queryCtxs_.find(queryId);
if (iter != queryCtxs_.end()) {
iter->second.hasStartedTasks = true;
}
}
void setTasksStarted(const protocol::QueryId& queryId);

void evict() {
// Evict least recently used queryCtx if it is not referenced elsewhere.
for (auto victim = queryIds_.end(); victim != queryIds_.begin();) {
--victim;
if (!queryCtxs_[*victim].queryCtx.lock()) {
queryCtxs_.erase(*victim);
queryIds_.erase(victim);
return;
}
}

// All queries are still inflight. Increase capacity.
capacity_ = std::max(kInitialCapacity, capacity_ * 2);
}
const QueryCtxMap& ctxs() const {
return queryCtxs_;
}
void evict();

void testingClear();
void clear();

private:
static constexpr size_t kInitialCapacity = 256UL;

size_t capacity_;

QueryCtxMap queryCtxs_;
QueryIdList queryIds_;

static constexpr size_t kInitialCapacity = 256UL;
};

class QueryContextManager {
Expand All @@ -129,6 +84,10 @@ class QueryContextManager {
const protocol::TaskId& taskId,
const protocol::TaskUpdateRequest& taskUpdateRequest);

std::shared_ptr<velox::core::QueryCtx> findOrCreateBatchQueryCtx(
const protocol::TaskId& taskId,
const protocol::TaskUpdateRequest& taskUpdateRequest);

/// Returns true if the given task's query has at least one task started.
bool queryHasStartedTasks(const protocol::TaskId& taskId) const;

Expand All @@ -142,30 +101,30 @@ class QueryContextManager {
visitor) const;

/// Test method to clear the query context cache.
void testingClearCache();
void clearCache();

protected:
folly::Executor* const driverExecutor_{nullptr};
folly::Executor* const spillerExecutor_{nullptr};
QueryContextCache queryContextCache_;

private:
virtual std::shared_ptr<velox::core::QueryCtx> createAndCacheQueryCtx(
QueryContextCache& cache,
virtual std::shared_ptr<velox::core::QueryCtx> createAndCacheQueryCtxLocked(
const protocol::QueryId& queryId,
velox::core::QueryConfig&& queryConfig,
std::unordered_map<
std::string,
std::shared_ptr<velox::config::ConfigBase>>&& connectorConfigs,
std::shared_ptr<velox::memory::MemoryPool>&& pool);

std::shared_ptr<velox::core::QueryCtx> findOrCreateQueryCtx(
std::shared_ptr<velox::core::QueryCtx> findOrCreateQueryCtxLocked(
const protocol::TaskId& taskId,
velox::core::QueryConfig&& queryConfig,
std::unordered_map<
std::string,
std::shared_ptr<velox::config::ConfigBase>>&& connectorConfigStrings);

folly::Synchronized<QueryContextCache> queryContextCache_;
mutable std::mutex queryContextCacheMutex_;
};

} // namespace facebook::presto
2 changes: 1 addition & 1 deletion presto-native-execution/presto_cpp/main/TaskResource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ proxygen::RequestHandler* TaskResource::createOrUpdateBatchTask(
}

auto queryCtx =
taskManager_.getQueryContextManager()->findOrCreateQueryCtx(
taskManager_.getQueryContextManager()->findOrCreateBatchQueryCtx(
taskId, updateRequest);

VeloxBatchQueryPlanConverter converter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ TEST_F(QueryContextCacheTest, hasStartedTasks) {
auto queryId = fmt::format("query-{}", i);
EXPECT_FALSE(queryContextCache.hasStartedTasks(queryId));
if (i % 2 == 0) {
queryContextCache.setHasStartedTasks(queryId);
queryContextCache.setTasksStarted(queryId);
}
}

Expand Down
Loading
Loading