Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
214 changes: 76 additions & 138 deletions presto-native-execution/presto_cpp/main/TaskResource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,59 @@ std::optional<protocol::Duration> getMaxWait(proxygen::HTTPMessage* message) {
return protocol::Duration(
headers.getSingleOrEmpty(protocol::PRESTO_MAX_WAIT_HTTP_HEADER));
}

bool shouldUseThrift(const proxygen::HTTPMessage& message) {
const auto& acceptHeader =
message.getHeaders().getSingleOrEmpty(proxygen::HTTP_HEADER_ACCEPT);
return acceptHeader.find(http::kMimeTypeApplicationThrift) !=
std::string::npos;
}

template <typename T, typename ThriftT>
void sendPrestoResponse(
proxygen::ResponseHandler* downstream,
const T& data,
bool sendThrift) {
if (sendThrift) {
ThriftT thriftData;
toThrift(data, thriftData);
http::sendOkThriftResponse(downstream, thriftWrite(thriftData));
} else {
http::sendOkResponse(downstream, json(data));
}
}

/// Creates a CallbackRequestHandler that executes a void work function on the
/// given executor, then sends an empty OK response. On exception, sends an
/// error response. Used for simple fire-and-forget handlers.
template <typename WorkFn>
proxygen::RequestHandler* executeAndRespond(
folly::Executor* executor,
WorkFn&& workFn) {
return new http::CallbackRequestHandler(
[executor, work = std::forward<WorkFn>(workFn)](
proxygen::HTTPMessage* /*message*/,
const std::vector<std::unique_ptr<folly::IOBuf>>& /*body*/,
proxygen::ResponseHandler* downstream,
std::shared_ptr<http::CallbackRequestHandlerState> handlerState) {
folly::via(executor, std::move(work))
.via(
folly::getKeepAliveToken(
folly::EventBaseManager::get()->getEventBase()))
.thenValue([downstream, handlerState](auto&& /* unused */) {
if (!handlerState->requestExpired()) {
http::sendOkResponse(downstream);
}
})
.thenError(
folly::tag_t<std::exception>{},
[downstream, handlerState](auto&& e) {
if (!handlerState->requestExpired()) {
http::sendErrorResponse(downstream, e.what());
}
});
});
}
} // namespace

void TaskResource::registerUris(http::HttpServer& server) {
Expand Down Expand Up @@ -136,34 +189,9 @@ proxygen::RequestHandler* TaskResource::abortResults(
const std::vector<std::string>& pathMatch) {
protocol::TaskId taskId = pathMatch[1];
long destination = folly::to<long>(pathMatch[2]);
return new http::CallbackRequestHandler(
[this, taskId, destination](
proxygen::HTTPMessage* /*message*/,
const std::vector<std::unique_ptr<folly::IOBuf>>& /*body*/,
proxygen::ResponseHandler* downstream,
std::shared_ptr<http::CallbackRequestHandlerState> handlerState) {
folly::via(
httpSrvCpuExecutor_,
[this, taskId, destination, handlerState]() {
taskManager_.abortResults(taskId, destination);
return true;
})
.via(
folly::getKeepAliveToken(
folly::EventBaseManager::get()->getEventBase()))
.thenValue([downstream, handlerState](auto&& /* unused */) {
if (!handlerState->requestExpired()) {
http::sendOkResponse(downstream);
}
})
.thenError(
folly::tag_t<std::exception>{},
[downstream, handlerState](auto&& e) {
if (!handlerState->requestExpired()) {
http::sendErrorResponse(downstream, e.what());
}
});
});
return executeAndRespond(httpSrvCpuExecutor_, [this, taskId, destination]() {
taskManager_.abortResults(taskId, destination);
});
}

proxygen::RequestHandler* TaskResource::acknowledgeResults(
Expand All @@ -172,34 +200,9 @@ proxygen::RequestHandler* TaskResource::acknowledgeResults(
protocol::TaskId taskId = pathMatch[1];
long bufferId = folly::to<long>(pathMatch[2]);
long token = folly::to<long>(pathMatch[3]);

return new http::CallbackRequestHandler(
[this, taskId, bufferId, token](
proxygen::HTTPMessage* /*message*/,
const std::vector<std::unique_ptr<folly::IOBuf>>& /*body*/,
proxygen::ResponseHandler* downstream,
std::shared_ptr<http::CallbackRequestHandlerState> handlerState) {
folly::via(
httpSrvCpuExecutor_,
[this, taskId, bufferId, token]() {
taskManager_.acknowledgeResults(taskId, bufferId, token);
return true;
})
.via(
folly::getKeepAliveToken(
folly::EventBaseManager::get()->getEventBase()))
.thenValue([downstream, handlerState](auto&& /* unused */) {
if (!handlerState->requestExpired()) {
http::sendOkResponse(downstream);
}
})
.thenError(
folly::tag_t<std::exception>{},
[downstream, handlerState](auto&& e) {
if (!handlerState->requestExpired()) {
http::sendErrorResponse(downstream, e.what());
}
});
return executeAndRespond(
httpSrvCpuExecutor_, [this, taskId, bufferId, token]() {
taskManager_.acknowledgeResults(taskId, bufferId, token);
});
}

Expand All @@ -216,10 +219,7 @@ proxygen::RequestHandler* TaskResource::createOrUpdateTaskImpl(
bool summarize = message->hasQueryParam("summarize");

const auto& headers = message->getHeaders();
const auto& acceptHeader =
headers.getSingleOrEmpty(proxygen::HTTP_HEADER_ACCEPT);
const auto sendThrift =
acceptHeader.find(http::kMimeTypeApplicationThrift) != std::string::npos;
const auto sendThrift = shouldUseThrift(*message);
const auto& contentHeader =
headers.getSingleOrEmpty(proxygen::HTTP_HEADER_CONTENT_TYPE);
const auto receiveThrift =
Expand Down Expand Up @@ -282,14 +282,8 @@ proxygen::RequestHandler* TaskResource::createOrUpdateTaskImpl(
folly::EventBaseManager::get()->getEventBase()))
.thenValue([downstream, handlerState, sendThrift](auto taskInfo) {
if (!handlerState->requestExpired()) {
if (sendThrift) {
thrift::TaskInfo thriftTaskInfo;
toThrift(*taskInfo, thriftTaskInfo);
http::sendOkThriftResponse(
downstream, thriftWrite(thriftTaskInfo));
} else {
http::sendOkResponse(downstream, json(*taskInfo));
}
sendPrestoResponse<protocol::TaskInfo, thrift::TaskInfo>(
downstream, *taskInfo, sendThrift);
}
})
.thenError(
Expand Down Expand Up @@ -419,11 +413,7 @@ proxygen::RequestHandler* TaskResource::deleteTask(
message->getQueryParam(protocol::PRESTO_ABORT_TASK_URL_PARAM) == "true";
}
bool summarize = message->hasQueryParam("summarize");
const auto& headers = message->getHeaders();
const auto& acceptHeader =
headers.getSingleOrEmpty(proxygen::HTTP_HEADER_ACCEPT);
const auto sendThrift =
acceptHeader.find(http::kMimeTypeApplicationThrift) != std::string::npos;
const auto sendThrift = shouldUseThrift(*message);

return new http::CallbackRequestHandler(
[this, taskId, abort, summarize, sendThrift](
Expand All @@ -448,14 +438,8 @@ proxygen::RequestHandler* TaskResource::deleteTask(
sendTaskNotFound(downstream, taskId);
return;
}
if (sendThrift) {
thrift::TaskInfo thriftTaskInfo;
toThrift(*taskInfo, thriftTaskInfo);
http::sendOkThriftResponse(
downstream, thriftWrite(thriftTaskInfo));
} else {
http::sendOkResponse(downstream, json(*taskInfo));
}
sendPrestoResponse<protocol::TaskInfo, thrift::TaskInfo>(
downstream, *taskInfo, sendThrift);
}
})
.thenError(
Expand Down Expand Up @@ -565,12 +549,7 @@ proxygen::RequestHandler* TaskResource::getTaskStatus(
protocol::TaskId taskId = pathMatch[1];
auto currentState = getCurrentState(message);
auto maxWait = getMaxWait(message);

const auto& headers = message->getHeaders();
const auto& acceptHeader =
headers.getSingleOrEmpty(proxygen::HTTP_HEADER_ACCEPT);
const auto sendThrift =
acceptHeader.find(http::kMimeTypeApplicationThrift) != std::string::npos;
const auto sendThrift = shouldUseThrift(*message);

return new http::CallbackRequestHandler(
[this, sendThrift, taskId, currentState, maxWait](
Expand All @@ -596,15 +575,10 @@ proxygen::RequestHandler* TaskResource::getTaskStatus(
[sendThrift, downstream, taskId, handlerState](
std::unique_ptr<protocol::TaskStatus> taskStatus) {
if (!handlerState->requestExpired()) {
if (sendThrift) {
thrift::TaskStatus thriftTaskStatus;
toThrift(*taskStatus, thriftTaskStatus);
http::sendOkThriftResponse(
downstream, thriftWrite(thriftTaskStatus));
} else {
json taskStatusJson = *taskStatus;
http::sendOkResponse(downstream, taskStatusJson);
}
sendPrestoResponse<
protocol::TaskStatus,
thrift::TaskStatus>(
downstream, *taskStatus, sendThrift);
}
})
.thenError(
Expand All @@ -629,12 +603,7 @@ proxygen::RequestHandler* TaskResource::getTaskInfo(
auto currentState = getCurrentState(message);
auto maxWait = getMaxWait(message);
bool summarize = message->hasQueryParam("summarize");

const auto& headers = message->getHeaders();
const auto& acceptHeader =
headers.getSingleOrEmpty(proxygen::HTTP_HEADER_ACCEPT);
const auto sendThrift =
acceptHeader.find(http::kMimeTypeApplicationThrift) != std::string::npos;
const auto sendThrift = shouldUseThrift(*message);

return new http::CallbackRequestHandler(
[this, taskId, currentState, maxWait, summarize, sendThrift](
Expand All @@ -661,14 +630,8 @@ proxygen::RequestHandler* TaskResource::getTaskInfo(
.thenValue([downstream, taskId, handlerState, sendThrift](
std::unique_ptr<protocol::TaskInfo> taskInfo) {
if (!handlerState->requestExpired()) {
if (sendThrift) {
thrift::TaskInfo thriftTaskInfo;
toThrift(*taskInfo, thriftTaskInfo);
http::sendOkThriftResponse(
downstream, thriftWrite(thriftTaskInfo));
} else {
http::sendOkResponse(downstream, json(*taskInfo));
}
sendPrestoResponse<protocol::TaskInfo, thrift::TaskInfo>(
downstream, *taskInfo, sendThrift);
}
})
.thenError(
Expand All @@ -690,33 +653,8 @@ proxygen::RequestHandler* TaskResource::removeRemoteSource(
const std::vector<std::string>& pathMatch) {
protocol::TaskId taskId = pathMatch[1];
auto remoteId = pathMatch[2];

return new http::CallbackRequestHandler(
[this, taskId, remoteId](
proxygen::HTTPMessage* /*message*/,
const std::vector<std::unique_ptr<folly::IOBuf>>& /*body*/,
proxygen::ResponseHandler* downstream,
std::shared_ptr<http::CallbackRequestHandlerState> handlerState) {
folly::via(
httpSrvCpuExecutor_,
[this, taskId, remoteId, downstream]() {
taskManager_.removeRemoteSource(taskId, remoteId);
})
.via(
folly::getKeepAliveToken(
folly::EventBaseManager::get()->getEventBase()))
.thenValue([downstream, handlerState](auto&& /* unused */) {
if (!handlerState->requestExpired()) {
http::sendOkResponse(downstream);
}
})
.thenError(
folly::tag_t<std::exception>{},
[downstream, handlerState](const std::exception& e) {
if (!handlerState->requestExpired()) {
http::sendErrorResponse(downstream, e.what());
}
});
});
return executeAndRespond(httpSrvCpuExecutor_, [this, taskId, remoteId]() {
taskManager_.removeRemoteSource(taskId, remoteId);
});
}
} // namespace facebook::presto
Loading