Skip to content

Commit 915f96a

Browse files
committed
Add output stats for pos sink operators (prestodb#25915)
Summary: Adds output row stats for sapphire-velox related sink operators Properly close write file on broadcast write Reviewed By: singcha Differential Revision: D81271224
1 parent 6387368 commit 915f96a

File tree

3 files changed

+11
-1
lines changed

3 files changed

+11
-1
lines changed

presto-native-execution/presto_cpp/main/operators/BroadcastFactory.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,12 @@ void BroadcastFileWriter::collect(const RowVectorPtr& input) {
8282
write(input);
8383
}
8484

85-
void BroadcastFileWriter::noMoreData() {}
85+
void BroadcastFileWriter::noMoreData() {
86+
if (writeFile_ != nullptr) {
87+
writeFile_->flush();
88+
writeFile_->close();
89+
}
90+
}
8691

8792
RowVectorPtr BroadcastFileWriter::fileStats() {
8893
// No rows written.

presto-native-execution/presto_cpp/main/operators/BroadcastWrite.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,9 @@ class BroadcastWriteOperator : public Operator {
7070
}
7171

7272
fileBroadcastWriter_->collect(reorderedInput);
73+
auto lockedStats = stats_.wlock();
74+
lockedStats->addOutputVector(
75+
reorderedInput->estimateFlatSize(), reorderedInput->size());
7376
}
7477

7578
void noMoreInput() override {

presto-native-execution/presto_cpp/main/operators/ShuffleWrite.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,8 @@ class ShuffleWriteOperator : public Operator {
104104
"collect");
105105
}
106106
}
107+
auto lockedStats = stats_.wlock();
108+
lockedStats->addOutputVector(input->estimateFlatSize(), input->size());
107109
}
108110

109111
void noMoreInput() override {

0 commit comments

Comments
 (0)