Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion presto-native-execution/presto_cpp/main/PrestoTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -339,15 +339,34 @@ protocol::TaskStatus PrestoTask::updateStatusLocked() {
return info.taskStatus;
}

void PrestoTask::updateOutputBufferInfoLocked(
const velox::exec::TaskStats& taskStats) {
if (!taskStats.outputBufferStats.has_value()) {
return;
}
const auto& outputBufferStats = taskStats.outputBufferStats.value();
auto& outputBufferInfo = info.outputBuffers;
outputBufferInfo.type =
velox::core::PartitionedOutputNode::kindString(outputBufferStats.kind);
outputBufferInfo.canAddBuffers = !outputBufferStats.noMoreBuffers;
outputBufferInfo.canAddPages = !outputBufferStats.noMoreData;
outputBufferInfo.totalBufferedBytes = outputBufferStats.bufferedBytes;
outputBufferInfo.totalBufferedPages = outputBufferStats.bufferedPages;
outputBufferInfo.totalPagesSent = outputBufferStats.totalPagesSent;
outputBufferInfo.totalRowsSent = outputBufferStats.totalRowsSent;
// TODO: populate state and destination buffer stats in info.outputBuffers.
}

protocol::TaskInfo PrestoTask::updateInfoLocked() {
protocol::TaskStatus taskStatus = updateStatusLocked();

// Return limited info if there is no exec task.
if (!task) {
if (task == nullptr) {
return info;
}

const velox::exec::TaskStats taskStats = task->taskStats();
updateOutputBufferInfoLocked(taskStats);
protocol::TaskStats& prestoTaskStats = info.stats;
// Clear the old runtime metrics as not all of them would be overwritten by
// the new ones.
Expand Down
1 change: 1 addition & 0 deletions presto-native-execution/presto_cpp/main/PrestoTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ struct PrestoTask {

protocol::TaskStatus updateStatusLocked();
protocol::TaskInfo updateInfoLocked();
void updateOutputBufferInfoLocked(const velox::exec::TaskStats& taskStats);

std::string toJsonString() const;

Expand Down
2 changes: 1 addition & 1 deletion presto-native-execution/velox
Submodule velox updated 243 files