Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,15 @@ std::shared_ptr<core::QueryCtx> QueryContextManager::findOrCreateQueryCtx(
}

velox::core::QueryConfig queryConfig{std::move(configStrings)};
// NOTE: the monotonically increasing 'poolId' is appended to 'queryId' to
// ensure that the name of root memory pool instance is always unique. In some
// edge case, we found some background activities such as the long-running
// memory arbitration process will still hold the query root memory pool even
// though the query ctx has been evicted out of the cache. The query ctx cache
// is still indexed by the query id.
static std::atomic_uint64_t poolId{0};
auto pool = memory::defaultMemoryManager().addRootPool(
queryId,
fmt::format("{}_{}", queryId, poolId++),
queryConfig.queryMaxMemoryPerNode() != 0
? queryConfig.queryMaxMemoryPerNode()
: SystemConfig::instance()->queryMaxMemoryPerNode(),
Expand Down Expand Up @@ -163,4 +170,13 @@ void QueryContextManager::visitAllContexts(
}
}

void QueryContextManager::testingClearCache() {
queryContextCache_.wlock()->testingClear();
}

void QueryContextCache::testingClear() {
queryCtxs_.clear();
queryIds_.clear();
}

} // namespace facebook::presto
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ class QueryContextCache {
return queryCtxs_;
}

void testingClear();

private:
size_t capacity_;

Expand All @@ -106,11 +108,14 @@ class QueryContextManager {
const protocol::TaskId& taskId,
const protocol::SessionRepresentation& session);

// Calls the given functor for every present query context.
/// Calls the given functor for every present query context.
void visitAllContexts(std::function<void(
const protocol::QueryId&,
const velox::core::QueryCtx*)> visitor) const;

/// Test method to clear the query context cache.
void testingClearCache();

private:
std::shared_ptr<velox::core::QueryCtx> findOrCreateQueryCtx(
const protocol::TaskId& taskId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
* limitations under the License.
*/
#include "presto_cpp/main/QueryContextManager.h"
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include "presto_cpp/main/TaskManager.h"

Expand Down Expand Up @@ -68,4 +69,50 @@ TEST_F(QueryContextManagerTest, defaultSessionProperties) {
EXPECT_EQ(queryCtx->queryConfig().spillWriteBufferSize(), 1L << 20);
}

TEST_F(QueryContextManagerTest, duplicateQueryRootPoolName) {
const protocol::TaskId fakeTaskId = "scan.0.0.1.0";
const protocol::SessionRepresentation fakeSession{.systemProperties = {}};
auto* queryCtxManager = taskManager_->getQueryContextManager();
struct {
bool hasPendingReference;
bool clearCache;
bool expectedNewPoolName;

std::string debugString() const {
return fmt::format(
"hasPendingReference: {}, clearCache: {}, expectedNewPoolName: {}",
hasPendingReference,
clearCache,
expectedNewPoolName);
}
} testSettings[] = {
{true, true, true},
{true, false, false},
{false, true, true},
{false, false, true}};
for (const auto& testData : testSettings) {
SCOPED_TRACE(testData.debugString());
queryCtxManager->testingClearCache();

auto queryCtx =
queryCtxManager->findOrCreateQueryCtx(fakeTaskId, fakeSession);
const auto poolName = queryCtx->pool()->name();
ASSERT_THAT(poolName, testing::HasSubstr("scan_"));
if (!testData.hasPendingReference) {
queryCtx.reset();
}
if (testData.clearCache) {
queryCtxManager->testingClearCache();
}
auto newQueryCtx =
queryCtxManager->findOrCreateQueryCtx(fakeTaskId, fakeSession);
const auto newPoolName = newQueryCtx->pool()->name();
ASSERT_THAT(newPoolName, testing::HasSubstr("scan_"));
if (testData.expectedNewPoolName) {
ASSERT_NE(poolName, newPoolName);
} else {
ASSERT_EQ(poolName, newPoolName);
}
}
}
} // namespace facebook::presto