diff --git a/.circleci/continue_config.yml b/.circleci/continue_config.yml index 886a70a58627e..1ff4e44a8030b 100644 --- a/.circleci/continue_config.yml +++ b/.circleci/continue_config.yml @@ -194,6 +194,8 @@ jobs: -Duser.timezone=America/Bahia_Banderas \ -T1C fi + - store_artifacts: + path: '/tmp/PrestoNativeQueryRunnerUtils' linux-spark-e2e-tests: executor: build diff --git a/presto-native-execution/presto_cpp/main/PeriodicTaskManager.cpp b/presto-native-execution/presto_cpp/main/PeriodicTaskManager.cpp index be85fd46045e2..e1f0535f8db22 100644 --- a/presto-native-execution/presto_cpp/main/PeriodicTaskManager.cpp +++ b/presto-native-execution/presto_cpp/main/PeriodicTaskManager.cpp @@ -16,6 +16,7 @@ #include #include #include "presto_cpp/main/PrestoExchangeSource.h" +#include "presto_cpp/main/PrestoServer.h" #include "presto_cpp/main/TaskManager.h" #include "presto_cpp/main/common/Counters.h" #include "presto_cpp/main/http/filters/HttpEndpointLatencyFilter.h" @@ -70,14 +71,18 @@ PeriodicTaskManager::PeriodicTaskManager( const velox::cache::AsyncDataCache* const asyncDataCache, const std::unordered_map< std::string, - std::shared_ptr>& connectors) + std::shared_ptr>& connectors, + PrestoServer* server, + size_t stuckDriverThresholdMs) : driverCPUExecutor_(driverCPUExecutor), httpExecutor_(httpExecutor), taskManager_(taskManager), memoryAllocator_(memoryAllocator), asyncDataCache_(asyncDataCache), arbitrator_(velox::memory::memoryManager()->arbitrator()), - connectors_(connectors) {} + connectors_(connectors), + server_(server), + stuckDriverThresholdMs_(stuckDriverThresholdMs) {} void PeriodicTaskManager::start() { // If executors are null, don't bother starting this task. @@ -117,13 +122,15 @@ void PeriodicTaskManager::start() { addArbitratorStatsTask(); } - // This should be the last call in this method. 
- scheduler_.start(); + addWatchdogTask(); + + onceRunner_.start(); } void PeriodicTaskManager::stop() { - scheduler_.cancelAllFunctionsAndWait(); - scheduler_.shutdown(); + onceRunner_.cancelAllFunctionsAndWait(); + onceRunner_.shutdown(); + repeatedRunner_.stop(); } void PeriodicTaskManager::updateExecutorStats() { @@ -412,79 +419,89 @@ void PeriodicTaskManager::addCacheStatsUpdateTask() { "cache_counters"); } +namespace { + +class HiveConnectorStatsReporter { + public: + explicit HiveConnectorStatsReporter( + std::shared_ptr connector) + : connector_(std::move(connector)), + numElementsMetricName_(fmt::format( + kCounterHiveFileHandleCacheNumElementsFormat, + connector_->connectorId())), + pinnedSizeMetricName_(fmt::format( + kCounterHiveFileHandleCachePinnedSizeFormat, + connector_->connectorId())), + curSizeMetricName_(fmt::format( + kCounterHiveFileHandleCacheCurSizeFormat, + connector_->connectorId())), + numAccumulativeHitsMetricName_(fmt::format( + kCounterHiveFileHandleCacheNumAccumulativeHitsFormat, + connector_->connectorId())), + numAccumulativeLookupsMetricName_(fmt::format( + kCounterHiveFileHandleCacheNumAccumulativeLookupsFormat, + connector_->connectorId())), + numHitsMetricName_(fmt::format( + kCounterHiveFileHandleCacheNumHitsFormat, + connector_->connectorId())), + numLookupsMetricName_(fmt::format( + kCounterHiveFileHandleCacheNumLookupsFormat, + connector_->connectorId())) { + DEFINE_METRIC(numElementsMetricName_, velox::StatType::AVG); + DEFINE_METRIC(pinnedSizeMetricName_, velox::StatType::AVG); + DEFINE_METRIC(curSizeMetricName_, velox::StatType::AVG); + DEFINE_METRIC(numAccumulativeHitsMetricName_, velox::StatType::AVG); + DEFINE_METRIC(numAccumulativeLookupsMetricName_, velox::StatType::AVG); + DEFINE_METRIC(numHitsMetricName_, velox::StatType::AVG); + DEFINE_METRIC(numLookupsMetricName_, velox::StatType::AVG); + } + + void report() { + auto stats = connector_->fileHandleCacheStats(); + RECORD_METRIC_VALUE(numElementsMetricName_, stats.numElements); + RECORD_METRIC_VALUE(pinnedSizeMetricName_, stats.pinnedSize); + RECORD_METRIC_VALUE(curSizeMetricName_, stats.curSize); + RECORD_METRIC_VALUE(numAccumulativeHitsMetricName_, stats.numHits); + RECORD_METRIC_VALUE(numAccumulativeLookupsMetricName_, stats.numLookups); + RECORD_METRIC_VALUE(numHitsMetricName_, stats.numHits - oldNumHits_); + oldNumHits_ = stats.numHits; + RECORD_METRIC_VALUE( + numLookupsMetricName_, stats.numLookups - oldNumLookups_); + oldNumLookups_ = stats.numLookups; + } + + private: + const std::shared_ptr connector_; + const std::string numElementsMetricName_; + const std::string pinnedSizeMetricName_; + const std::string curSizeMetricName_; + const std::string numAccumulativeHitsMetricName_; + const std::string numAccumulativeLookupsMetricName_; + const std::string numHitsMetricName_; + const std::string numLookupsMetricName_; + size_t oldNumHits_{0}; + size_t oldNumLookups_{0}; +}; + +} // namespace + void PeriodicTaskManager::addConnectorStatsTask() { + std::vector reporters; for (const auto& itr : connectors_) { - static std::unordered_map oldValues; - // Export HiveConnector stats if (auto hiveConnector = std::dynamic_pointer_cast( itr.second)) { - auto connectorId = hiveConnector->connectorId(); - const auto kNumElementsMetricName = fmt::format( - kCounterHiveFileHandleCacheNumElementsFormat, connectorId); - const auto kPinnedSizeMetricName = - fmt::format(kCounterHiveFileHandleCachePinnedSizeFormat, connectorId); - const auto kCurSizeMetricName = - 
fmt::format(kCounterHiveFileHandleCacheCurSizeFormat, connectorId); - const auto kNumAccumulativeHitsMetricName = fmt::format( - kCounterHiveFileHandleCacheNumAccumulativeHitsFormat, connectorId); - const auto kNumAccumulativeLookupsMetricName = fmt::format( - kCounterHiveFileHandleCacheNumAccumulativeLookupsFormat, connectorId); - - const auto kNumHitsMetricName = - fmt::format(kCounterHiveFileHandleCacheNumHitsFormat, connectorId); - oldValues[kNumHitsMetricName] = 0; - const auto kNumLookupsMetricName = - fmt::format(kCounterHiveFileHandleCacheNumLookupsFormat, connectorId); - oldValues[kNumLookupsMetricName] = 0; - - // Exporting metrics types here since the metrics key is dynamic - DEFINE_METRIC(kNumElementsMetricName, facebook::velox::StatType::AVG); - DEFINE_METRIC(kPinnedSizeMetricName, facebook::velox::StatType::AVG); - DEFINE_METRIC(kCurSizeMetricName, facebook::velox::StatType::AVG); - DEFINE_METRIC( - kNumAccumulativeHitsMetricName, facebook::velox::StatType::AVG); - DEFINE_METRIC( - kNumAccumulativeLookupsMetricName, facebook::velox::StatType::AVG); - DEFINE_METRIC(kNumHitsMetricName, facebook::velox::StatType::AVG); - DEFINE_METRIC(kNumLookupsMetricName, facebook::velox::StatType::AVG); - - addTask( - [hiveConnector, - connectorId, - kNumElementsMetricName, - kPinnedSizeMetricName, - kCurSizeMetricName, - kNumAccumulativeHitsMetricName, - kNumAccumulativeLookupsMetricName, - kNumHitsMetricName, - kNumLookupsMetricName]() { - auto fileHandleCacheStats = hiveConnector->fileHandleCacheStats(); - RECORD_METRIC_VALUE( - kNumElementsMetricName, fileHandleCacheStats.numElements); - RECORD_METRIC_VALUE( - kPinnedSizeMetricName, fileHandleCacheStats.pinnedSize); - RECORD_METRIC_VALUE( - kCurSizeMetricName, fileHandleCacheStats.curSize); - RECORD_METRIC_VALUE( - kNumAccumulativeHitsMetricName, fileHandleCacheStats.numHits); - RECORD_METRIC_VALUE( - kNumAccumulativeLookupsMetricName, - fileHandleCacheStats.numLookups); - RECORD_METRIC_VALUE( - kNumHitsMetricName, - fileHandleCacheStats.numHits - oldValues[kNumHitsMetricName]); - oldValues[kNumHitsMetricName] = fileHandleCacheStats.numHits; - RECORD_METRIC_VALUE( - kNumLookupsMetricName, - fileHandleCacheStats.numLookups - - oldValues[kNumLookupsMetricName]); - oldValues[kNumLookupsMetricName] = fileHandleCacheStats.numLookups; - }, - kConnectorPeriodGlobalCounters, - fmt::format("{}.hive_connector_counters", connectorId)); + reporters.emplace_back(std::move(hiveConnector)); } } + addTask( + [reporters = std::move(reporters)]() mutable { + for (auto& reporter : reporters) { + reporter.report(); + } + }, + kConnectorPeriodGlobalCounters, + "ConnectorStats"); } void PeriodicTaskManager::updateOperatingSystemStats() { @@ -643,4 +660,48 @@ void PeriodicTaskManager::addHttpEndpointLatencyStatsTask() { kHttpEndpointLatencyPeriodGlobalCounters, "http_endpoint_counters"); } + +void PeriodicTaskManager::addWatchdogTask() { + addTask( + [this, + deadlockedTasks = std::vector(), + opCalls = std::vector()]() mutable { + deadlockedTasks.clear(); + opCalls.clear(); + if (!taskManager_->getLongRunningOpCalls( + stuckDriverThresholdMs_, deadlockedTasks, opCalls)) { + LOG(ERROR) + << "Cannot take lock on task manager, likely starving or deadlocked"; + RECORD_METRIC_VALUE(kCounterNumTasksDeadlock, 1); + detachWorker(); + return; + } + for (auto& taskId : deadlockedTasks) { + LOG(ERROR) << "Starving or deadlocked task: " << taskId; + } + RECORD_METRIC_VALUE(kCounterNumTasksDeadlock, deadlockedTasks.size()); + for (auto& call : opCalls) { + LOG(ERROR) << 
"Stuck operator: tid=" << call.tid + << " taskId=" << call.taskId << " opId=" << call.opId; + } + RECORD_METRIC_VALUE(kCounterNumStuckDrivers, opCalls.size()); + if (!deadlockedTasks.empty() || !opCalls.empty()) { + detachWorker(); + } + }, + 60'000'000, // 60 seconds + "Watchdog"); +} + +void PeriodicTaskManager::detachWorker() { + LOG(ERROR) << velox::process::TraceContext::statusLine(); + if (server_ && server_->nodeState() == NodeState::kActive) { + // Benefit of shutting down is that the queries that aren't stuck yet will + // be finished. While stopping announcement would kill them. + LOG(ERROR) + << "Changing node status to SHUTTING_DOWN due to detected stuck drivers"; + server_->setNodeState(NodeState::kShuttingDown); + } +} + } // namespace facebook::presto diff --git a/presto-native-execution/presto_cpp/main/PeriodicTaskManager.h b/presto-native-execution/presto_cpp/main/PeriodicTaskManager.h index a8ce6f81540c5..6299106729b9b 100644 --- a/presto-native-execution/presto_cpp/main/PeriodicTaskManager.h +++ b/presto-native-execution/presto_cpp/main/PeriodicTaskManager.h @@ -14,6 +14,7 @@ #pragma once #include +#include #include "velox/common/memory/Memory.h" #include "velox/exec/Spill.h" @@ -33,6 +34,7 @@ class AsyncDataCache; namespace facebook::presto { class TaskManager; +class PrestoServer; /// Manages a set of periodic tasks via folly::FunctionScheduler. /// This is a place to add a new task or add more functionality to an existing @@ -47,7 +49,9 @@ class PeriodicTaskManager { const velox::cache::AsyncDataCache* const asyncDataCache, const std::unordered_map< std::string, - std::shared_ptr>& connectors); + std::shared_ptr>& connectors, + PrestoServer* server, + size_t stuckDriverThresholdMs); ~PeriodicTaskManager() { stop(); @@ -64,8 +68,19 @@ class PeriodicTaskManager { /// Add a task to run periodically. template void addTask(TFunc&& func, size_t periodMicros, const std::string& taskName) { - scheduler_.addFunction( - func, std::chrono::microseconds{periodMicros}, taskName); + repeatedRunner_.add( + taskName, + [taskName, + periodMicros, + func = std::forward(func)]() mutable noexcept { + try { + func(); + } catch (const std::exception& e) { + LOG(ERROR) << "Error running periodic task " << taskName << ": " + << e.what(); + } + return std::chrono::milliseconds(periodMicros / 1000); + }); } /// Add a task to run once. Before adding, cancels the any task that has same @@ -73,9 +88,11 @@ class PeriodicTaskManager { template void addTaskOnce(TFunc&& func, size_t periodMicros, const std::string& taskName) { - scheduler_.cancelFunction(taskName); - scheduler_.addFunctionOnce( - func, taskName, std::chrono::microseconds{periodMicros}); + onceRunner_.cancelFunction(taskName); + onceRunner_.addFunctionOnce( + std::forward(func), + taskName, + std::chrono::microseconds{periodMicros}); } /// Stops all periodic tasks. Returns only when everything is stopped. 
@@ -115,7 +132,10 @@
   void addArbitratorStatsTask();
   void updateArbitratorStatsTask();

-  folly::FunctionScheduler scheduler_;
+  void addWatchdogTask();
+
+  void detachWorker();
+
   folly::CPUThreadPoolExecutor* const driverCPUExecutor_;
   folly::IOThreadPoolExecutor* const httpExecutor_;
   TaskManager* const taskManager_;
@@ -125,6 +145,8 @@
   const std::unordered_map<
       std::string,
       std::shared_ptr>& connectors_;
+  PrestoServer* const server_;
+  const size_t stuckDriverThresholdMs_;

   // Cache related stats
   int64_t lastMemoryCacheHits_{0};
@@ -146,6 +168,10 @@
   // Re-enable this after updating velox.
   velox::common::SpillStats lastSpillStats_;
   velox::memory::MemoryArbitrator::Stats lastArbitratorStats_;
+
+  // CAUTION: Declare last since the threads access other members of `this`.
+  folly::FunctionScheduler onceRunner_;
+  folly::ThreadedRepeatingFunctionRunner repeatedRunner_;
 };

 } // namespace facebook::presto
diff --git a/presto-native-execution/presto_cpp/main/PrestoServer.cpp b/presto-native-execution/presto_cpp/main/PrestoServer.cpp
index 3763036bca57d..a43ba6145f27b 100644
--- a/presto-native-execution/presto_cpp/main/PrestoServer.cpp
+++ b/presto-native-execution/presto_cpp/main/PrestoServer.cpp
@@ -481,7 +481,9 @@ void PrestoServer::run() {
         taskManager_.get(),
         memoryAllocator,
         asyncDataCache,
-        velox::connector::getAllConnectors());
+        velox::connector::getAllConnectors(),
+        this,
+        systemConfig->driverStuckOperatorThresholdMs());
     addServerPeriodicTasks();
     addAdditionalPeriodicTasks();
     periodicTaskManager_->start();
diff --git a/presto-native-execution/presto_cpp/main/TaskManager.cpp b/presto-native-execution/presto_cpp/main/TaskManager.cpp
index 7481a77db0055..ea811511d32e4 100644
--- a/presto-native-execution/presto_cpp/main/TaskManager.cpp
+++ b/presto-native-execution/presto_cpp/main/TaskManager.cpp
@@ -1041,6 +1041,25 @@ DriverCountStats TaskManager::getDriverCountStats() const {
   return driverCountStats;
 }

+bool TaskManager::getLongRunningOpCalls(
+    size_t thresholdDurationMs,
+    std::vector& deadlockedTasks,
+    std::vector& opCalls) const {
+  std::chrono::milliseconds lockTimeout(thresholdDurationMs);
+  auto taskMap = taskMap_.rlock(lockTimeout);
+  if (!taskMap) {
+    return false;
+  }
+  for (const auto& [_, prestoTask] : *taskMap) {
+    if (prestoTask->task != nullptr &&
+        !prestoTask->task->getLongRunningOpCalls(
+            lockTimeout, thresholdDurationMs, opCalls)) {
+      deadlockedTasks.push_back(prestoTask->task->taskId());
+    }
+  }
+  return true;
+}
+
 int32_t TaskManager::yieldTasks(
     int32_t numTargetThreadsToYield,
     int32_t timeSliceMicros) {
diff --git a/presto-native-execution/presto_cpp/main/TaskManager.h b/presto-native-execution/presto_cpp/main/TaskManager.h
index c47b8a58634cc..c440c2f15cfd2 100644
--- a/presto-native-execution/presto_cpp/main/TaskManager.h
+++ b/presto-native-execution/presto_cpp/main/TaskManager.h
@@ -146,6 +146,14 @@ class TaskManager {
   // in exec/Task.h).
   std::array getTaskNumbers(size_t& numTasks) const;

+  /// Populates tasks that appear deadlocked (their mutex cannot be locked
+  /// within the threshold) and long-running operator calls across all drivers
+  /// in all tasks. Returns true, or false if the task map lock cannot be taken.
+  bool getLongRunningOpCalls(
+      size_t thresholdDurationMs,
+      std::vector& deadlockedTasks,
+      std::vector& opCalls) const;
+
   /// Build directory path for spilling for the given task.
   /// Always returns non-empty string.
 static std::string buildTaskSpillDirectoryPath(
diff --git a/presto-native-execution/presto_cpp/main/common/Configs.cpp b/presto-native-execution/presto_cpp/main/common/Configs.cpp
index d42cb4c6b05b5..1b10a4c03de9c 100644
--- a/presto-native-execution/presto_cpp/main/common/Configs.cpp
+++ b/presto-native-execution/presto_cpp/main/common/Configs.cpp
@@ -156,6 +156,7 @@ SystemConfig::SystemConfig() {
           NUM_PROP(kExchangeHttpClientNumIoThreadsHwMultiplier, 1.0),
           NUM_PROP(kConnectorNumIoThreadsHwMultiplier, 1.0),
           NUM_PROP(kDriverNumCpuThreadsHwMultiplier, 4.0),
+          NUM_PROP(kDriverStuckOperatorThresholdMs, 30 * 60 * 1000),
           NUM_PROP(kSpillerNumCpuThreadsHwMultiplier, 1.0),
           STR_PROP(kSpillerFileCreateConfig, ""),
           NONE_PROP(kSpillerSpillPath),
@@ -346,6 +347,10 @@ double SystemConfig::driverNumCpuThreadsHwMultiplier() const {
   return optionalProperty(kDriverNumCpuThreadsHwMultiplier).value();
 }

+size_t SystemConfig::driverStuckOperatorThresholdMs() const {
+  return optionalProperty(kDriverStuckOperatorThresholdMs).value();
+}
+
 double SystemConfig::spillerNumCpuThreadsHwMultiplier() const {
   return optionalProperty(kSpillerNumCpuThreadsHwMultiplier).value();
 }
diff --git a/presto-native-execution/presto_cpp/main/common/Configs.h b/presto-native-execution/presto_cpp/main/common/Configs.h
index ade512e12ceb3..74f6bbd93148c 100644
--- a/presto-native-execution/presto_cpp/main/common/Configs.h
+++ b/presto-native-execution/presto_cpp/main/common/Configs.h
@@ -219,6 +219,13 @@ class SystemConfig : public ConfigBase {
   static constexpr std::string_view kDriverNumCpuThreadsHwMultiplier{
       "driver.num-cpu-threads-hw-multiplier"};

+  /// Time duration threshold (ms) used to detect whether an operator call in
+  /// a driver is stuck. If any driver thread is detected as stuck by this
+  /// criterion, the worker is taken offline and requires further
+  /// investigation.
+  static constexpr std::string_view kDriverStuckOperatorThresholdMs{
+      "driver.stuck-operator-threshold-ms"};
+
   /// Floating point number used in calculating how many threads we would use
   /// for Spiller CPU executor: hw_concurrency x multiplier.
   /// If 0.0 then spilling is disabled.
@@ -551,6 +558,8 @@ class SystemConfig : public ConfigBase { double driverNumCpuThreadsHwMultiplier() const; + size_t driverStuckOperatorThresholdMs() const; + double spillerNumCpuThreadsHwMultiplier() const; std::string spillerFileCreateConfig() const; diff --git a/presto-native-execution/presto_cpp/main/common/Counters.cpp b/presto-native-execution/presto_cpp/main/common/Counters.cpp index 8495c21ba1b33..594d5f1a447d6 100644 --- a/presto-native-execution/presto_cpp/main/common/Counters.cpp +++ b/presto-native-execution/presto_cpp/main/common/Counters.cpp @@ -60,8 +60,10 @@ void registerPrestoMetrics() { DEFINE_METRIC(kCounterNumZombiePrestoTasks, facebook::velox::StatType::AVG); DEFINE_METRIC( kCounterNumTasksWithStuckOperator, facebook::velox::StatType::AVG); + DEFINE_METRIC(kCounterNumTasksDeadlock, facebook::velox::StatType::AVG); DEFINE_METRIC(kCounterNumRunningDrivers, facebook::velox::StatType::AVG); DEFINE_METRIC(kCounterNumBlockedDrivers, facebook::velox::StatType::AVG); + DEFINE_METRIC(kCounterNumStuckDrivers, facebook::velox::StatType::AVG); DEFINE_METRIC(kCounterMappedMemoryBytes, facebook::velox::StatType::AVG); DEFINE_METRIC(kCounterAllocatedMemoryBytes, facebook::velox::StatType::AVG); DEFINE_METRIC(kCounterMmapRawAllocBytesSmall, facebook::velox::StatType::AVG); diff --git a/presto-native-execution/presto_cpp/main/common/Counters.h b/presto-native-execution/presto_cpp/main/common/Counters.h index ecfbdb8561120..f51f537e7f601 100644 --- a/presto-native-execution/presto_cpp/main/common/Counters.h +++ b/presto-native-execution/presto_cpp/main/common/Counters.h @@ -68,10 +68,14 @@ constexpr folly::StringPiece kCounterNumZombiePrestoTasks{ "presto_cpp.num_zombie_presto_tasks"}; constexpr folly::StringPiece kCounterNumTasksWithStuckOperator{ "presto_cpp.num_tasks_with_stuck_operator"}; +constexpr folly::StringPiece kCounterNumTasksDeadlock{ + "presto_cpp.num_tasks_deadlock"}; constexpr folly::StringPiece kCounterNumRunningDrivers{ "presto_cpp.num_running_drivers"}; constexpr folly::StringPiece kCounterNumBlockedDrivers{ "presto_cpp.num_blocked_drivers"}; +constexpr folly::StringPiece kCounterNumStuckDrivers{ + "presto_cpp.num_stuck_drivers"}; /// Number of total OutputBuffer managed by all /// OutputBufferManager diff --git a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java index bc09c80d3e4ba..9b34af08bb5a4 100644 --- a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java +++ b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java @@ -173,7 +173,9 @@ public static QueryRunner createNativeQueryRunner( Optional.of(Paths.get(addStorageFormatToPath ? dataDirectory + "/" + storageFormat : dataDirectory)), Optional.of((workerIndex, discoveryUri) -> { try { - Path tempDirectoryPath = Files.createTempDirectory(PrestoNativeQueryRunnerUtils.class.getSimpleName()); + Path dir = Paths.get("/tmp", PrestoNativeQueryRunnerUtils.class.getSimpleName()); + Files.createDirectories(dir); + Path tempDirectoryPath = Files.createTempDirectory(dir, "worker"); log.info("Temp directory for Worker #%d: %s", workerIndex, tempDirectoryPath.toString()); int port = 1234 + workerIndex;
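Note on the scheduling machinery this patch switches to: repeating tasks move off folly::FunctionScheduler (which stays only for one-shot tasks, as onceRunner_) onto folly::ThreadedRepeatingFunctionRunner, whose callables return the delay to wait before their next run and which must be stopped before the state its threads touch is destroyed (hence the "declare last" caution and the stop() ordering above). The standalone sketch below is not part of the patch; it assumes folly's ThreadedRepeatingFunctionRunner API as it is used in PeriodicTaskManager, and the names main, ticks, and the 100 ms period are illustrative only.

// Standalone sketch (not part of the patch) of the repeating-runner pattern
// adopted above. Assumes the folly::ThreadedRepeatingFunctionRunner API as
// used in PeriodicTaskManager: add() registers a named noexcept callable
// whose return value is the delay before its next invocation, and stop()
// joins the background thread, so it must run before anything the callable
// captures is destroyed. The header path may differ across folly versions.
#include <folly/experimental/ThreadedRepeatingFunctionRunner.h>

#include <atomic>
#include <chrono>
#include <iostream>
#include <thread>

int main() {
  std::atomic<int> ticks{0};
  folly::ThreadedRepeatingFunctionRunner runner;

  // Roughly what PeriodicTaskManager::addTask() does: run the body and return
  // the period (here an illustrative 100 ms) as the delay until the next run.
  runner.add("ticker", [&ticks]() noexcept {
    ++ticks;
    return std::chrono::milliseconds(100);
  });

  std::this_thread::sleep_for(std::chrono::milliseconds(350));

  // Counterpart of PeriodicTaskManager::stop(): stop the runner before the
  // captured `ticks` goes out of scope.
  runner.stop();
  std::cout << "ticks observed: " << ticks.load() << std::endl;
  return 0;
}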