diff --git a/presto-docs/src/main/sphinx/develop/worker-protocol.rst b/presto-docs/src/main/sphinx/develop/worker-protocol.rst index d3462326d7539..84c1c7c551add 100644 --- a/presto-docs/src/main/sphinx/develop/worker-protocol.rst +++ b/presto-docs/src/main/sphinx/develop/worker-protocol.rst @@ -83,6 +83,11 @@ along with the results. Then, the client uses that sequence number to request the next chunk of results. The client keeps fetching results until it receives ``X-Presto-Buffer-Complete`` HTTP header with the value of "true". +If the worker times out populating a response, or the task has already failed +or been aborted, the worker will return empty results. The client can attempt +to retry the request. In the case where the task is in a terminal state, it +is assumed that the Control Plane will eventually handle the state change. + If the client missed a response it can repeat the request and the worker will send the results again. Upon receiving an ack for a sequence number, the worker deletes all results with the sequence number less than that and the client can diff --git a/presto-main/src/main/java/com/facebook/presto/execution/TaskManager.java b/presto-main/src/main/java/com/facebook/presto/execution/TaskManager.java index c40007df27320..4959e4fdf1725 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/TaskManager.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/TaskManager.java @@ -109,6 +109,9 @@ TaskInfo updateTask( * task or buffer has not been created yet, an uninitialized task is * created and a future is returned. *

+ * Returns empty results if the Task is destroyed, e.g. because it fails + * or is aborted, or another request is made for the same data. + *

* NOTE: this design assumes that only tasks and buffers that will * eventually exist are queried. */ diff --git a/presto-native-execution/presto_cpp/main/TaskManager.cpp b/presto-native-execution/presto_cpp/main/TaskManager.cpp index 80dd42650c5e8..4600a7a411d6a 100644 --- a/presto-native-execution/presto_cpp/main/TaskManager.cpp +++ b/presto-native-execution/presto_cpp/main/TaskManager.cpp @@ -85,7 +85,7 @@ void keepPromiseAlive( [promiseHolder]() mutable { promiseHolder.reset(); }); } -std::unique_ptr createTimeOutResult(long token) { +std::unique_ptr createEmptyResult(long token) { auto result = std::make_unique(); result->sequence = result->nextSequence = token; result->data = folly::IOBuf::create(0); @@ -165,7 +165,7 @@ void getData( // Buffer was erased for current TaskId. VLOG(1) << "Task " << taskId << ", buffer " << destination << ", sequence " << token << ", buffer not found."; - promiseHolder->promise.setValue(std::move(createTimeOutResult(token))); + promiseHolder->promise.setValue(std::move(createEmptyResult(token))); } } @@ -805,41 +805,24 @@ folly::Future> TaskManager::getResults( // with incomplete empty pages. promiseHolder->atDestruction( [token](folly::Promise> promise) { - auto result = std::make_unique(); - result->sequence = token; - result->nextSequence = token; - result->complete = false; - result->data = folly::IOBuf::copyBuffer(""); - promise.setValue(std::move(result)); + promise.setValue(createEmptyResult(token)); }); - auto timeoutFn = [this, token]() { return createTimeOutResult(token); }; + auto timeoutFn = [this, token]() { return createEmptyResult(token); }; try { auto prestoTask = findOrCreateTask(taskId); // If the task is aborted or failed, then return an error. if (prestoTask->info.taskStatus.state == protocol::TaskState::ABORTED) { - VELOX_USER_FAIL("Calling getResult() on a aborted task: {}", taskId); + LOG(WARNING) << "Calling getResult() on a aborted task: " << taskId; + promiseHolder->promise.setValue(createEmptyResult(token)); + return std::move(future).via(httpSrvCpuExecutor_); } if (prestoTask->error != nullptr) { - try { - std::rethrow_exception(prestoTask->error); - } catch (const VeloxException& e) { - VELOX_USER_FAIL( - "Calling getResult() on a failed PrestoTask: {}. PrestoTask failure reason: {}", - taskId, - e.what()); - } catch (const std::exception& e) { - VELOX_USER_FAIL( - "Calling getResult() on a failed PrestoTask: {}. PrestoTask failure reason: {}", - taskId, - e.what()); - } catch (...) { - VELOX_USER_FAIL( - "Calling getResult() on a failed PrestoTask: {}. PrestoTask failure reason: UNKNOWN", - taskId); - } + LOG(WARNING) << "Calling getResult() on a failed PrestoTask: " << taskId; + promiseHolder->promise.setValue(createEmptyResult(token)); + return std::move(future).via(httpSrvCpuExecutor_); } for (;;) { diff --git a/presto-native-execution/presto_cpp/main/tests/TaskManagerTest.cpp b/presto-native-execution/presto_cpp/main/tests/TaskManagerTest.cpp index df12899cb4806..ba2d45087fb61 100644 --- a/presto-native-execution/presto_cpp/main/tests/TaskManagerTest.cpp +++ b/presto-native-execution/presto_cpp/main/tests/TaskManagerTest.cpp @@ -1111,23 +1111,24 @@ TEST_F(TaskManagerTest, getDataOnAbortedTask) { ASSERT_TRUE(promiseFulfilled); } -TEST_F(TaskManagerTest, getResultsErrorPropagation) { +TEST_F(TaskManagerTest, getResultsFromFailedTask) { const protocol::TaskId taskId = "error-task.0.0.0.0"; std::exception e; taskManager_->createOrUpdateErrorTask(taskId, std::make_exception_ptr(e), 0); - // We expect the exception type VeloxException to be reserved still. - EXPECT_THROW( - taskManager_ - ->getResults( - taskId, - 0, - 0, - protocol::DataSize("32MB"), - protocol::Duration("300s"), - http::CallbackRequestHandlerState::create()) - .get(), - VeloxException); + // We expect to get empty results, rather than an exception. + auto results = taskManager_ + ->getResults( + taskId, + 0, + 0, + protocol::DataSize("32MB"), + protocol::Duration("300s"), + http::CallbackRequestHandlerState::create()) + .get(); + + ASSERT_FALSE(results->complete); + ASSERT_EQ(results->data->capacity(), 0); } TEST_F(TaskManagerTest, testCumulativeMemory) {