Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions presto-docs/src/main/sphinx/develop/worker-protocol.rst
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ along with the results. Then, the client uses that sequence number to request
the next chunk of results. The client keeps fetching results until it receives
``X-Presto-Buffer-Complete`` HTTP header with the value of "true".

If the worker times out populating a response, or the task has already failed
or been aborted, the worker will return empty results. The client can attempt
to retry the request. In the case where the task is in a terminal state, it
is assumed that the Control Plane will eventually handle the state change.

If the client missed a response it can repeat the request and the worker will
send the results again. Upon receiving an ack for a sequence number, the worker
deletes all results with the sequence number less than that and the client can
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ TaskInfo updateTask(
* task or buffer has not been created yet, an uninitialized task is
* created and a future is returned.
* <p>
* Returns empty results if the Task is destroyed, e.g. because it fails
* or is aborted, or another request is made for the same data.
* <p>
* NOTE: this design assumes that only tasks and buffers that will
* eventually exist are queried.
*/
Expand Down
37 changes: 10 additions & 27 deletions presto-native-execution/presto_cpp/main/TaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ void keepPromiseAlive(
[promiseHolder]() mutable { promiseHolder.reset(); });
}

std::unique_ptr<Result> createTimeOutResult(long token) {
std::unique_ptr<Result> createEmptyResult(long token) {
auto result = std::make_unique<Result>();
result->sequence = result->nextSequence = token;
result->data = folly::IOBuf::create(0);
Expand Down Expand Up @@ -165,7 +165,7 @@ void getData(
// Buffer was erased for current TaskId.
VLOG(1) << "Task " << taskId << ", buffer " << destination << ", sequence "
<< token << ", buffer not found.";
promiseHolder->promise.setValue(std::move(createTimeOutResult(token)));
promiseHolder->promise.setValue(std::move(createEmptyResult(token)));
}
}

Expand Down Expand Up @@ -805,41 +805,24 @@ folly::Future<std::unique_ptr<Result>> TaskManager::getResults(
// with incomplete empty pages.
promiseHolder->atDestruction(
[token](folly::Promise<std::unique_ptr<Result>> promise) {
auto result = std::make_unique<Result>();
result->sequence = token;
result->nextSequence = token;
result->complete = false;
result->data = folly::IOBuf::copyBuffer("");
promise.setValue(std::move(result));
promise.setValue(createEmptyResult(token));
});

auto timeoutFn = [this, token]() { return createTimeOutResult(token); };
auto timeoutFn = [this, token]() { return createEmptyResult(token); };

try {
auto prestoTask = findOrCreateTask(taskId);

// If the task is aborted or failed, then return an error.
if (prestoTask->info.taskStatus.state == protocol::TaskState::ABORTED) {
VELOX_USER_FAIL("Calling getResult() on a aborted task: {}", taskId);
LOG(WARNING) << "Calling getResult() on a aborted task: " << taskId;
promiseHolder->promise.setValue(createEmptyResult(token));
return std::move(future).via(httpSrvCpuExecutor_);
}
if (prestoTask->error != nullptr) {
try {
std::rethrow_exception(prestoTask->error);
} catch (const VeloxException& e) {
VELOX_USER_FAIL(
"Calling getResult() on a failed PrestoTask: {}. PrestoTask failure reason: {}",
taskId,
e.what());
} catch (const std::exception& e) {
VELOX_USER_FAIL(
"Calling getResult() on a failed PrestoTask: {}. PrestoTask failure reason: {}",
taskId,
e.what());
} catch (...) {
VELOX_USER_FAIL(
"Calling getResult() on a failed PrestoTask: {}. PrestoTask failure reason: UNKNOWN",
taskId);
}
LOG(WARNING) << "Calling getResult() on a failed PrestoTask: " << taskId;
promiseHolder->promise.setValue(createEmptyResult(token));
return std::move(future).via(httpSrvCpuExecutor_);
}

for (;;) {
Expand Down
27 changes: 14 additions & 13 deletions presto-native-execution/presto_cpp/main/tests/TaskManagerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1111,23 +1111,24 @@ TEST_F(TaskManagerTest, getDataOnAbortedTask) {
ASSERT_TRUE(promiseFulfilled);
}

TEST_F(TaskManagerTest, getResultsErrorPropagation) {
TEST_F(TaskManagerTest, getResultsFromFailedTask) {
const protocol::TaskId taskId = "error-task.0.0.0.0";
std::exception e;
taskManager_->createOrUpdateErrorTask(taskId, std::make_exception_ptr(e), 0);

// We expect the exception type VeloxException to be reserved still.
EXPECT_THROW(
taskManager_
->getResults(
taskId,
0,
0,
protocol::DataSize("32MB"),
protocol::Duration("300s"),
http::CallbackRequestHandlerState::create())
.get(),
VeloxException);
// We expect to get empty results, rather than an exception.
auto results = taskManager_
->getResults(
taskId,
0,
0,
protocol::DataSize("32MB"),
protocol::Duration("300s"),
http::CallbackRequestHandlerState::create())
.get();

ASSERT_FALSE(results->complete);
ASSERT_EQ(results->data->capacity(), 0);
}

TEST_F(TaskManagerTest, testCumulativeMemory) {
Expand Down