Merged
2 changes: 2 additions & 0 deletions .circleci/continue_config.yml
@@ -194,6 +194,8 @@ jobs:
-Duser.timezone=America/Bahia_Banderas \
-T1C
fi
- store_artifacts:
path: '/tmp/PrestoNativeQueryRunnerUtils'

linux-spark-e2e-tests:
executor: build
205 changes: 133 additions & 72 deletions presto-native-execution/presto_cpp/main/PeriodicTaskManager.cpp
@@ -16,6 +16,7 @@
#include <folly/executors/CPUThreadPoolExecutor.h>
#include <folly/stop_watch.h>
#include "presto_cpp/main/PrestoExchangeSource.h"
#include "presto_cpp/main/PrestoServer.h"
#include "presto_cpp/main/TaskManager.h"
#include "presto_cpp/main/common/Counters.h"
#include "presto_cpp/main/http/filters/HttpEndpointLatencyFilter.h"
@@ -70,14 +71,18 @@ PeriodicTaskManager::PeriodicTaskManager(
const velox::cache::AsyncDataCache* const asyncDataCache,
const std::unordered_map<
std::string,
std::shared_ptr<velox::connector::Connector>>& connectors)
std::shared_ptr<velox::connector::Connector>>& connectors,
PrestoServer* server,
size_t stuckDriverThresholdMs)
: driverCPUExecutor_(driverCPUExecutor),
httpExecutor_(httpExecutor),
taskManager_(taskManager),
memoryAllocator_(memoryAllocator),
asyncDataCache_(asyncDataCache),
arbitrator_(velox::memory::memoryManager()->arbitrator()),
connectors_(connectors) {}
connectors_(connectors),
server_(server),
stuckDriverThresholdMs_(stuckDriverThresholdMs) {}

void PeriodicTaskManager::start() {
// If executors are null, don't bother starting this task.
@@ -117,13 +122,15 @@ void PeriodicTaskManager::start() {
addArbitratorStatsTask();
}

// This should be the last call in this method.
scheduler_.start();
addWatchdogTask();

onceRunner_.start();
}

void PeriodicTaskManager::stop() {
scheduler_.cancelAllFunctionsAndWait();
scheduler_.shutdown();
onceRunner_.cancelAllFunctionsAndWait();
onceRunner_.shutdown();
repeatedRunner_.stop();
}

void PeriodicTaskManager::updateExecutorStats() {
@@ -412,79 +419,89 @@ void PeriodicTaskManager::addCacheStatsUpdateTask() {
"cache_counters");
}

namespace {

class HiveConnectorStatsReporter {
public:
explicit HiveConnectorStatsReporter(
std::shared_ptr<velox::connector::hive::HiveConnector> connector)
: connector_(std::move(connector)),
numElementsMetricName_(fmt::format(
kCounterHiveFileHandleCacheNumElementsFormat,
connector_->connectorId())),
pinnedSizeMetricName_(fmt::format(
kCounterHiveFileHandleCachePinnedSizeFormat,
connector_->connectorId())),
curSizeMetricName_(fmt::format(
kCounterHiveFileHandleCacheCurSizeFormat,
connector_->connectorId())),
numAccumulativeHitsMetricName_(fmt::format(
kCounterHiveFileHandleCacheNumAccumulativeHitsFormat,
connector_->connectorId())),
numAccumulativeLookupsMetricName_(fmt::format(
kCounterHiveFileHandleCacheNumAccumulativeLookupsFormat,
connector_->connectorId())),
numHitsMetricName_(fmt::format(
kCounterHiveFileHandleCacheNumHitsFormat,
connector_->connectorId())),
numLookupsMetricName_(fmt::format(
kCounterHiveFileHandleCacheNumLookupsFormat,
connector_->connectorId())) {
DEFINE_METRIC(numElementsMetricName_, velox::StatType::AVG);
DEFINE_METRIC(pinnedSizeMetricName_, velox::StatType::AVG);
DEFINE_METRIC(curSizeMetricName_, velox::StatType::AVG);
DEFINE_METRIC(numAccumulativeHitsMetricName_, velox::StatType::AVG);
DEFINE_METRIC(numAccumulativeLookupsMetricName_, velox::StatType::AVG);
DEFINE_METRIC(numHitsMetricName_, velox::StatType::AVG);
DEFINE_METRIC(numLookupsMetricName_, velox::StatType::AVG);
}

void report() {
auto stats = connector_->fileHandleCacheStats();
RECORD_METRIC_VALUE(numElementsMetricName_, stats.numElements);
RECORD_METRIC_VALUE(pinnedSizeMetricName_, stats.pinnedSize);
RECORD_METRIC_VALUE(curSizeMetricName_, stats.curSize);
RECORD_METRIC_VALUE(numAccumulativeHitsMetricName_, stats.numHits);
RECORD_METRIC_VALUE(numAccumulativeLookupsMetricName_, stats.numLookups);
RECORD_METRIC_VALUE(numHitsMetricName_, stats.numHits - oldNumHits_);
oldNumHits_ = stats.numHits;
RECORD_METRIC_VALUE(
numLookupsMetricName_, stats.numLookups - oldNumLookups_);
oldNumLookups_ = stats.numLookups;
}

private:
const std::shared_ptr<velox::connector::hive::HiveConnector> connector_;
const std::string numElementsMetricName_;
const std::string pinnedSizeMetricName_;
const std::string curSizeMetricName_;
const std::string numAccumulativeHitsMetricName_;
const std::string numAccumulativeLookupsMetricName_;
const std::string numHitsMetricName_;
const std::string numLookupsMetricName_;
size_t oldNumHits_{0};
size_t oldNumLookups_{0};
};

} // namespace

void PeriodicTaskManager::addConnectorStatsTask() {
std::vector<HiveConnectorStatsReporter> reporters;
for (const auto& itr : connectors_) {
static std::unordered_map<std::string, int64_t> oldValues;
// Export HiveConnector stats
if (auto hiveConnector =
std::dynamic_pointer_cast<velox::connector::hive::HiveConnector>(
itr.second)) {
auto connectorId = hiveConnector->connectorId();
const auto kNumElementsMetricName = fmt::format(
kCounterHiveFileHandleCacheNumElementsFormat, connectorId);
const auto kPinnedSizeMetricName =
fmt::format(kCounterHiveFileHandleCachePinnedSizeFormat, connectorId);
const auto kCurSizeMetricName =
fmt::format(kCounterHiveFileHandleCacheCurSizeFormat, connectorId);
const auto kNumAccumulativeHitsMetricName = fmt::format(
kCounterHiveFileHandleCacheNumAccumulativeHitsFormat, connectorId);
const auto kNumAccumulativeLookupsMetricName = fmt::format(
kCounterHiveFileHandleCacheNumAccumulativeLookupsFormat, connectorId);

const auto kNumHitsMetricName =
fmt::format(kCounterHiveFileHandleCacheNumHitsFormat, connectorId);
oldValues[kNumHitsMetricName] = 0;
const auto kNumLookupsMetricName =
fmt::format(kCounterHiveFileHandleCacheNumLookupsFormat, connectorId);
oldValues[kNumLookupsMetricName] = 0;

// Exporting metrics types here since the metrics key is dynamic
DEFINE_METRIC(kNumElementsMetricName, facebook::velox::StatType::AVG);
DEFINE_METRIC(kPinnedSizeMetricName, facebook::velox::StatType::AVG);
DEFINE_METRIC(kCurSizeMetricName, facebook::velox::StatType::AVG);
DEFINE_METRIC(
kNumAccumulativeHitsMetricName, facebook::velox::StatType::AVG);
DEFINE_METRIC(
kNumAccumulativeLookupsMetricName, facebook::velox::StatType::AVG);
DEFINE_METRIC(kNumHitsMetricName, facebook::velox::StatType::AVG);
DEFINE_METRIC(kNumLookupsMetricName, facebook::velox::StatType::AVG);

addTask(
[hiveConnector,
connectorId,
kNumElementsMetricName,
kPinnedSizeMetricName,
kCurSizeMetricName,
kNumAccumulativeHitsMetricName,
kNumAccumulativeLookupsMetricName,
kNumHitsMetricName,
kNumLookupsMetricName]() {
auto fileHandleCacheStats = hiveConnector->fileHandleCacheStats();
RECORD_METRIC_VALUE(
kNumElementsMetricName, fileHandleCacheStats.numElements);
RECORD_METRIC_VALUE(
kPinnedSizeMetricName, fileHandleCacheStats.pinnedSize);
RECORD_METRIC_VALUE(
kCurSizeMetricName, fileHandleCacheStats.curSize);
RECORD_METRIC_VALUE(
kNumAccumulativeHitsMetricName, fileHandleCacheStats.numHits);
RECORD_METRIC_VALUE(
kNumAccumulativeLookupsMetricName,
fileHandleCacheStats.numLookups);
RECORD_METRIC_VALUE(
kNumHitsMetricName,
fileHandleCacheStats.numHits - oldValues[kNumHitsMetricName]);
oldValues[kNumHitsMetricName] = fileHandleCacheStats.numHits;
RECORD_METRIC_VALUE(
kNumLookupsMetricName,
fileHandleCacheStats.numLookups -
oldValues[kNumLookupsMetricName]);
oldValues[kNumLookupsMetricName] = fileHandleCacheStats.numLookups;
},
kConnectorPeriodGlobalCounters,
fmt::format("{}.hive_connector_counters", connectorId));
reporters.emplace_back(std::move(hiveConnector));
}
}
addTask(
[reporters = std::move(reporters)]() mutable {
for (auto& reporter : reporters) {
reporter.report();
}
},
kConnectorPeriodGlobalCounters,
"ConnectorStats");
}
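
For context, the reporter keeps the previous cumulative values (oldNumHits_, oldNumLookups_) so that each tick records a per-interval delta alongside the cumulative counters. A minimal standalone sketch of that pattern, with hypothetical names:

#include <cstddef>

// Tracks a cumulative counter and returns the per-tick increase.
struct DeltaCounter {
  std::size_t last{0};
  std::size_t tick(std::size_t cumulative) {
    const std::size_t delta = cumulative - last;
    last = cumulative;
    return delta;
  }
};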

void PeriodicTaskManager::updateOperatingSystemStats() {
@@ -643,4 +660,48 @@ void PeriodicTaskManager::addHttpEndpointLatencyStatsTask() {
kHttpEndpointLatencyPeriodGlobalCounters,
"http_endpoint_counters");
}

void PeriodicTaskManager::addWatchdogTask() {
addTask(
[this,
deadlockedTasks = std::vector<std::string>(),
opCalls = std::vector<velox::exec::Task::OpCallInfo>()]() mutable {
deadlockedTasks.clear();
opCalls.clear();
if (!taskManager_->getLongRunningOpCalls(
stuckDriverThresholdMs_, deadlockedTasks, opCalls)) {
LOG(ERROR)
<< "Cannot take lock on task manager, likely starving or deadlocked";
RECORD_METRIC_VALUE(kCounterNumTasksDeadlock, 1);
detachWorker();
return;
}
for (auto& taskId : deadlockedTasks) {
LOG(ERROR) << "Starving or deadlocked task: " << taskId;
}
RECORD_METRIC_VALUE(kCounterNumTasksDeadlock, deadlockedTasks.size());
for (auto& call : opCalls) {
LOG(ERROR) << "Stuck operator: tid=" << call.tid
<< " taskId=" << call.taskId << " opId=" << call.opId;
}
RECORD_METRIC_VALUE(kCounterNumStuckDrivers, opCalls.size());
if (!deadlockedTasks.empty() || !opCalls.empty()) {
detachWorker();
}
},
60'000'000, // 60 seconds
"Watchdog");
}
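
Note that the watchdog lambda captures its scratch vectors by value and clears them on each tick, reusing their capacity instead of reallocating every 60 seconds. A minimal sketch of that idiom, with hypothetical names:

#include <vector>

void example() {
  auto tick = [buf = std::vector<int>()]() mutable {
    buf.clear();       // keeps the previously allocated capacity
    buf.push_back(42); // fill with this tick's data
  };
  tick();
  tick(); // the second call reuses the first call's allocation
}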

void PeriodicTaskManager::detachWorker() {
LOG(ERROR) << velox::process::TraceContext::statusLine();
if (server_ && server_->nodeState() == NodeState::kActive) {
// The benefit of shutting down is that queries that aren't stuck yet can
// finish, while stopping the announcement would kill them.
LOG(ERROR)
Contributor:
I know we discussed this internally. But the shutdown state is specifically for when the server gets a SIGTERM. This breaks the semantics of NodeState and can confuse the coordinator with a wrong state. Other options would be marking the worker unhealthy or disabling the announcement. Did we observe some queries finishing successfully when the worker got stuck?

Contributor (author):
@amitkdutta

We didn't have enough data to observe whether some queries would finish successfully, as we have seen this phenomenon exactly 5 times, and late enough that everything was blocked. However, I don't see why this might not be the case.

Since INACTIVE has that special, weird meaning/limitation, I actually started seeing the SHUTTING_DOWN state as a good candidate to put the worker into, to isolate the whole cluster from the problem. I think about it like this:

  1. We detected a service-breaking issue.
  2. Our first reflex was 'restart and continue'.
  3. But then we thought: what about debugging? And what about the queries that can still finish successfully?

So putting ourselves in the SHUTTING_DOWN state and waiting for an engineer to debug looks very logical in this light.

<< "Changing node status to SHUTTING_DOWN due to detected stuck drivers";
server_->setNodeState(NodeState::kShuttingDown);
}
}

} // namespace facebook::presto
40 changes: 33 additions & 7 deletions presto-native-execution/presto_cpp/main/PeriodicTaskManager.h
@@ -14,6 +14,7 @@
#pragma once

#include <folly/experimental/FunctionScheduler.h>
#include <folly/experimental/ThreadedRepeatingFunctionRunner.h>
#include "velox/common/memory/Memory.h"
#include "velox/exec/Spill.h"

@@ -33,6 +34,7 @@ class AsyncDataCache;
namespace facebook::presto {

class TaskManager;
class PrestoServer;

/// Manages a set of periodic tasks via folly::FunctionScheduler.
/// This is a place to add a new task or add more functionality to an existing
@@ -47,7 +49,9 @@ class PeriodicTaskManager {
const velox::cache::AsyncDataCache* const asyncDataCache,
const std::unordered_map<
std::string,
std::shared_ptr<velox::connector::Connector>>& connectors);
std::shared_ptr<velox::connector::Connector>>& connectors,
PrestoServer* server,
size_t stuckDriverThresholdMs);

~PeriodicTaskManager() {
stop();
@@ -64,18 +68,31 @@
/// Add a task to run periodically.
template <typename TFunc>
void addTask(TFunc&& func, size_t periodMicros, const std::string& taskName) {
scheduler_.addFunction(
func, std::chrono::microseconds{periodMicros}, taskName);
repeatedRunner_.add(
taskName,
[taskName,
periodMicros,
func = std::forward<TFunc>(func)]() mutable noexcept {
try {
func();
} catch (const std::exception& e) {
LOG(ERROR) << "Error running periodic task " << taskName << ": "
<< e.what();
}
return std::chrono::milliseconds(periodMicros / 1000);
});
}
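
A hypothetical call site, just to illustrate the signature (the task body, period, and name below are made up):

periodicTaskManager.addTask(
    [] { /* gather and record some metric */ },
    60'000'000, // period in microseconds (60 seconds)
    "example_counters");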

/// Add a task to run once. Before adding, cancels any task that has the
/// same name.
template <typename TFunc>
void
addTaskOnce(TFunc&& func, size_t periodMicros, const std::string& taskName) {
scheduler_.cancelFunction(taskName);
scheduler_.addFunctionOnce(
func, taskName, std::chrono::microseconds{periodMicros});
onceRunner_.cancelFunction(taskName);
onceRunner_.addFunctionOnce(
std::forward<TFunc>(func),
taskName,
std::chrono::microseconds{periodMicros});
}

/// Stops all periodic tasks. Returns only when everything is stopped.
@@ -115,7 +132,10 @@ class PeriodicTaskManager {
void addArbitratorStatsTask();
void updateArbitratorStatsTask();

folly::FunctionScheduler scheduler_;
void addWatchdogTask();

void detachWorker();

folly::CPUThreadPoolExecutor* const driverCPUExecutor_;
folly::IOThreadPoolExecutor* const httpExecutor_;
TaskManager* const taskManager_;
@@ -125,6 +145,8 @@ class PeriodicTaskManager {
const std::unordered_map<
std::string,
std::shared_ptr<velox::connector::Connector>>& connectors_;
PrestoServer* const server_;
const size_t stuckDriverThresholdMs_;

// Cache related stats
int64_t lastMemoryCacheHits_{0};
@@ -146,6 +168,10 @@
// Re-enable this after updating Velox.
velox::common::SpillStats lastSpillStats_;
velox::memory::MemoryArbitrator::Stats lastArbitratorStats_;

// CAUTION: Declare last since the threads access other members of `this`.
folly::FunctionScheduler onceRunner_;
folly::ThreadedRepeatingFunctionRunner repeatedRunner_;
};

} // namespace facebook::presto
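
For context on the declaration-order caution above: folly::ThreadedRepeatingFunctionRunner runs each added function on its own thread, and the function's return value is the delay before its next invocation; stop() joins those threads, which is why the runner must be destroyed before the members its tasks touch. A minimal sketch, assuming that API:

#include <chrono>
#include <thread>
#include <folly/experimental/ThreadedRepeatingFunctionRunner.h>

int main() {
  folly::ThreadedRepeatingFunctionRunner runner;
  runner.add("heartbeat", [] {
    // do the periodic work here
    return std::chrono::milliseconds(1000); // run again in one second
  });
  std::this_thread::sleep_for(std::chrono::seconds(3));
  runner.stop(); // joins the thread; must happen before dependents die
}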
4 changes: 3 additions & 1 deletion presto-native-execution/presto_cpp/main/PrestoServer.cpp
@@ -481,7 +481,9 @@ void PrestoServer::run() {
taskManager_.get(),
memoryAllocator,
asyncDataCache,
velox::connector::getAllConnectors());
velox::connector::getAllConnectors(),
this,
Contributor:
If we provide the Presto server here, do we still need to provide the cache and driver executor? Can't we get those members from the PrestoServer object?

Contributor (author):
These are not exposed as public. Do we want to expose them?

systemConfig->driverStuckOperatorThresholdMs());
addServerPeriodicTasks();
addAdditionalPeriodicTasks();
periodicTaskManager_->start();
19 changes: 19 additions & 0 deletions presto-native-execution/presto_cpp/main/TaskManager.cpp
@@ -1041,6 +1041,25 @@ DriverCountStats TaskManager::getDriverCountStats() const {
return driverCountStats;
}

bool TaskManager::getLongRunningOpCalls(
size_t thresholdDurationMs,
std::vector<std::string>& deadlockedTasks,
std::vector<velox::exec::Task::OpCallInfo>& opCalls) const {
std::chrono::milliseconds lockTimeout(thresholdDurationMs);
auto taskMap = taskMap_.rlock(lockTimeout);
if (!taskMap) {
return false;
}
for (const auto& [_, prestoTask] : *taskMap) {
if (prestoTask->task != nullptr &&
!prestoTask->task->getLongRunningOpCalls(
lockTimeout, thresholdDurationMs, opCalls)) {
deadlockedTasks.push_back(prestoTask->task->taskId());
}
}
return true;
}
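
The timed rlock above is the folly::Synchronized pattern: a shared-lock attempt that gives up after the timeout and returns an empty (falsy) holder instead of blocking forever, which is what lets the watchdog detect a wedged task map. A minimal sketch, assuming that API (names hypothetical):

#include <chrono>
#include <cstddef>
#include <map>
#include <string>
#include <folly/Synchronized.h>

folly::Synchronized<std::map<std::string, int>> taskMap;

bool trySnapshotSize(std::size_t& out) {
  auto locked = taskMap.rlock(std::chrono::milliseconds(100));
  if (!locked) {
    return false; // lock not acquired within 100ms; a writer may be stuck
  }
  out = locked->size();
  return true;
}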

int32_t TaskManager::yieldTasks(
int32_t numTargetThreadsToYield,
int32_t timeSliceMicros) {