Skip to content

Commit ffe429d

Browse files
MediaPipe Teamcopybara-github
MediaPipe Team
authored andcommitted
Adds output stream stats to GraphRuntimeInfo
PiperOrigin-RevId: 730870783
1 parent 70015e0 commit ffe429d

10 files changed

+183
-20
lines changed

Diff for: mediapipe/framework/BUILD

+4-2
Original file line numberDiff line numberDiff line change
@@ -958,6 +958,8 @@ cc_library(
958958
"//mediapipe/framework/tool:tag_map",
959959
"@com_google_absl//absl/base:core_headers",
960960
"@com_google_absl//absl/log:absl_check",
961+
"@com_google_absl//absl/status",
962+
"@com_google_absl//absl/strings",
961963
"@com_google_absl//absl/synchronization",
962964
],
963965
)
@@ -974,9 +976,10 @@ cc_library(
974976
":packet_type",
975977
":port",
976978
":timestamp",
977-
"//mediapipe/framework/port:source_location",
978979
"//mediapipe/framework/port:status",
980+
"@com_google_absl//absl/base:core_headers",
979981
"@com_google_absl//absl/log:absl_check",
982+
"@com_google_absl//absl/log:absl_log",
980983
"@com_google_absl//absl/synchronization",
981984
],
982985
)
@@ -1473,7 +1476,6 @@ cc_test(
14731476
"//mediapipe/framework/tool:source",
14741477
"@com_google_absl//absl/log:absl_check",
14751478
"@com_google_absl//absl/log:absl_log",
1476-
"@com_google_absl//absl/memory",
14771479
],
14781480
)
14791481

Diff for: mediapipe/framework/calculator_node.cc

+17-8
Original file line numberDiff line numberDiff line change
@@ -230,26 +230,35 @@ absl::Status CalculatorNode::Initialize(
230230
}
231231

232232
CalculatorRuntimeInfo CalculatorNode::GetStreamMonitoringInfo() const {
233-
CalculatorRuntimeInfo calulator_info;
234-
calulator_info.set_calculator_name(DebugName());
233+
CalculatorRuntimeInfo calculator_info;
234+
calculator_info.set_calculator_name(DebugName());
235235
{
236236
absl::MutexLock lock(&runtime_info_mutex_);
237-
calulator_info.set_last_process_start_unix_us(
237+
calculator_info.set_last_process_start_unix_us(
238238
absl::ToUnixMicros(last_process_start_ts_));
239-
calulator_info.set_last_process_finish_unix_us(
239+
calculator_info.set_last_process_finish_unix_us(
240240
absl::ToUnixMicros(last_process_finish_ts_));
241241
}
242-
const auto monitoring_info = input_stream_handler_->GetMonitoringInfo();
242+
const auto input_stream_info = input_stream_handler_->GetMonitoringInfo();
243243
for (const auto& [stream_name, queue_size, num_packets_added,
244-
minimum_timestamp_or_bound] : monitoring_info) {
245-
auto* stream_info = calulator_info.add_input_stream_infos();
244+
minimum_timestamp_or_bound] : input_stream_info) {
245+
auto* stream_info = calculator_info.add_input_stream_infos();
246246
stream_info->set_stream_name(stream_name);
247247
stream_info->set_queue_size(queue_size);
248248
stream_info->set_number_of_packets_added(num_packets_added);
249249
stream_info->set_minimum_timestamp_or_bound(
250250
minimum_timestamp_or_bound.Value());
251251
}
252-
return calulator_info;
252+
const auto output_stream_info = output_stream_handler_->GetMonitoringInfo();
253+
for (const auto& [stream_name, num_packets_added,
254+
minimum_timestamp_or_bound] : output_stream_info) {
255+
auto* stream_info = calculator_info.add_output_stream_infos();
256+
stream_info->set_stream_name(stream_name);
257+
stream_info->set_number_of_packets_added(num_packets_added);
258+
stream_info->set_minimum_timestamp_or_bound(
259+
minimum_timestamp_or_bound.Value());
260+
}
261+
return calculator_info;
253262
}
254263

255264
absl::Status CalculatorNode::InitializeOutputSidePackets(

Diff for: mediapipe/framework/calculator_node_test.cc

+38-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
#include "absl/log/absl_check.h"
2222
#include "absl/log/absl_log.h"
23-
#include "absl/memory/memory.h"
2423
#include "mediapipe/framework/calculator_framework.h"
2524
#include "mediapipe/framework/output_side_packet_impl.h"
2625
#include "mediapipe/framework/port/gmock.h"
@@ -573,5 +572,43 @@ TEST_F(CalculatorNodeTest, CleanupAfterRunTwiceWithTags) {
573572
TestCleanupAfterRunTwice();
574573
}
575574

575+
TEST_F(CalculatorNodeTest, ShouldGenerateStreamMonitoringInfo) {
576+
InitializeEnvironment(/*use_tags=*/false);
577+
MP_ASSERT_OK(PrepareNodeForRun());
578+
579+
SimulateParentOpenNode();
580+
MP_EXPECT_OK(node_->OpenNode());
581+
582+
OutputStreamShard stream_a_shard;
583+
stream_a_shard.SetSpec(stream_a_manager_->Spec());
584+
stream_a_shard.Add(new int(1), Timestamp(1));
585+
stream_a_manager_->PropagateUpdatesToMirrors(Timestamp(2), &stream_a_shard);
586+
EXPECT_EQ(1, schedule_count_);
587+
EXPECT_TRUE(node_->TryToBeginScheduling());
588+
MP_EXPECT_OK(node_->ProcessNode(cc_));
589+
cc_ = nullptr;
590+
node_->EndScheduling();
591+
EXPECT_EQ(1, schedule_count_);
592+
593+
const auto stream_monitoring_info = node_->GetStreamMonitoringInfo();
594+
EXPECT_EQ(stream_monitoring_info.calculator_name(), "CountCalculator");
595+
EXPECT_EQ(stream_monitoring_info.input_stream_infos_size(), 1);
596+
EXPECT_THAT(stream_monitoring_info.input_stream_infos(0),
597+
mediapipe::EqualsProto(
598+
mediapipe::ParseTextProtoOrDie<InputStreamRuntimeInfo>(R"pb(
599+
stream_name: ":0:stream_a"
600+
number_of_packets_added: 1
601+
minimum_timestamp_or_bound: 2
602+
)pb")));
603+
EXPECT_EQ(stream_monitoring_info.output_stream_infos_size(), 1);
604+
EXPECT_THAT(stream_monitoring_info.output_stream_infos(0),
605+
mediapipe::EqualsProto(
606+
mediapipe::ParseTextProtoOrDie<OutputStreamRuntimeInfo>(R"pb(
607+
stream_name: ":0:stream_b"
608+
number_of_packets_added: 1
609+
minimum_timestamp_or_bound: 2
610+
)pb")));
611+
}
612+
576613
} // namespace
577614
} // namespace mediapipe

Diff for: mediapipe/framework/graph_runtime_info.proto

+17-2
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ option java_package = "com.google.mediapipe.proto";
2020
option java_outer_classname = "GraphRuntimeInfoProto";
2121

2222
// The runtime info for an input stream.
23-
message StreamRuntimeInfo {
23+
message InputStreamRuntimeInfo {
2424
// The name of the stream in the format "TAG:index:stream_name"
2525
string stream_name = 1;
2626

@@ -34,6 +34,18 @@ message StreamRuntimeInfo {
3434
int64 minimum_timestamp_or_bound = 4;
3535
}
3636

37+
// The runtime info for an output stream.
38+
message OutputStreamRuntimeInfo {
39+
// The name of the stream in the format "TAG:index:stream_name"
40+
string stream_name = 1;
41+
42+
// The total number of packets added to the queue.
43+
int32 number_of_packets_added = 2;
44+
45+
// The minimum timestamp or timestanp bound of the stream.
46+
int64 minimum_timestamp_or_bound = 3;
47+
}
48+
3749
// The runtime info for a calculator.
3850
message CalculatorRuntimeInfo {
3951
// The name of the calculator.
@@ -49,7 +61,10 @@ message CalculatorRuntimeInfo {
4961
int64 timestamp_bound = 4;
5062

5163
// The runtime info for each input stream of the calculator.
52-
repeated StreamRuntimeInfo input_stream_infos = 5;
64+
repeated InputStreamRuntimeInfo input_stream_infos = 5;
65+
66+
// The runtime info for each output stream of the calculator.
67+
repeated OutputStreamRuntimeInfo output_stream_infos = 6;
5368
}
5469

5570
// The runtime info for the whole graph.

Diff for: mediapipe/framework/output_stream_handler.cc

+29
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,12 @@
1414

1515
#include "mediapipe/framework/output_stream_handler.h"
1616

17+
#include <string>
18+
#include <vector>
19+
1720
#include "absl/log/absl_check.h"
21+
#include "absl/status/status.h"
22+
#include "absl/strings/str_cat.h"
1823
#include "absl/synchronization/mutex.h"
1924
#include "mediapipe/framework/collection_item_id.h"
2025
#include "mediapipe/framework/output_stream_shard.h"
@@ -121,6 +126,30 @@ std::string OutputStreamHandler::FirstStreamName() const {
121126
return (*output_stream_managers_.begin())->Name();
122127
}
123128

129+
std::string OutputStreamHandler::DebugStreamName(CollectionItemId id) const {
130+
const auto tag_map = output_stream_managers_.TagMap();
131+
const std::string& stream_name = tag_map->Names()[id.value()];
132+
const auto& [stream_tag, stream_idx] = tag_map->TagAndIndexFromId(id);
133+
return absl::StrCat(stream_tag, ":", stream_idx, ":", stream_name);
134+
}
135+
136+
std::vector<OutputStreamHandler::OutputStreamMonitoringInfo>
137+
OutputStreamHandler::GetMonitoringInfo() {
138+
std::vector<OutputStreamMonitoringInfo> monitoring_info_vector;
139+
for (CollectionItemId id = output_stream_managers_.BeginId();
140+
id < output_stream_managers_.EndId(); ++id) {
141+
const auto& stream = output_stream_managers_.Get(id);
142+
if (!stream) {
143+
continue;
144+
}
145+
monitoring_info_vector.emplace_back(OutputStreamMonitoringInfo(
146+
{.stream_name = DebugStreamName(id),
147+
.num_packets_added = stream->NumPacketsAdded(),
148+
.next_timestamp_bound = stream->NextTimestampBound()}));
149+
}
150+
return monitoring_info_vector;
151+
}
152+
124153
void OutputStreamHandler::TryPropagateTimestampBound(Timestamp input_bound) {
125154
// TODO Some non-range values, such as PostStream(), should also be
126155
// propagated.

Diff for: mediapipe/framework/output_stream_handler.h

+19-2
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include <unordered_map>
2323
#include <unordered_set>
2424
#include <utility>
25+
#include <vector>
2526

2627
// TODO: Move protos in another CL after the C++ code migration.
2728
#include "absl/base/thread_annotations.h"
@@ -34,7 +35,6 @@
3435
#include "mediapipe/framework/output_stream_manager.h"
3536
#include "mediapipe/framework/packet_set.h"
3637
#include "mediapipe/framework/port/logging.h"
37-
#include "mediapipe/framework/port/status.h"
3838
#include "mediapipe/framework/timestamp.h"
3939
#include "mediapipe/framework/tool/tag_map.h"
4040

@@ -49,6 +49,15 @@ class OutputStreamHandler {
4949
OutputStreamToSourcesMap;
5050
typedef internal::Collection<OutputStreamManager*> OutputStreamManagerSet;
5151

52+
// Struct to return monitoring info via GetMonitoringInfo;
53+
struct OutputStreamMonitoringInfo {
54+
std::string stream_name;
55+
// The total number of packets added to the output stream.
56+
int num_packets_added;
57+
// The next timestamp bound of the output stream.
58+
Timestamp next_timestamp_bound;
59+
};
60+
5261
// The constructor of the OutputStreamHandler takes four arguments.
5362
// The tag_map argument holds the information needed for tag/index retrieval
5463
// for the output streams; the calculator_context_manager for managing the
@@ -91,7 +100,7 @@ class OutputStreamHandler {
91100
}
92101

93102
// Calls OutputStreamManager::PrepareForRun(error_callback) per stream, and
94-
// resets data memebers.
103+
// resets data members.
95104
void PrepareForRun(const std::function<void(absl::Status)>& error_callback)
96105
ABSL_LOCKS_EXCLUDED(timestamp_mutex_);
97106

@@ -124,6 +133,14 @@ class OutputStreamHandler {
124133
return output_stream_managers_;
125134
}
126135

136+
// Return the stream name for an input stream in the format:
137+
// stream_tag:stream_index:stream_name.
138+
std::string DebugStreamName(CollectionItemId id) const;
139+
140+
// Returns a vector of tuples of stream name, number of packets added, and
141+
// the next timestamp bound for each stream (for monitoring purposes).
142+
std::vector<OutputStreamMonitoringInfo> GetMonitoringInfo();
143+
127144
protected:
128145
// Checks if the given input bound should be propagated or not. If any output
129146
// streams with OffsetEnabled() need to have the timestamp bounds updated,

Diff for: mediapipe/framework/output_stream_manager.cc

+12-2
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,10 @@
1414

1515
#include "mediapipe/framework/output_stream_manager.h"
1616

17+
#include <list>
18+
1719
#include "absl/log/absl_check.h"
20+
#include "absl/log/absl_log.h"
1821
#include "absl/synchronization/mutex.h"
1922
#include "mediapipe/framework/input_stream_handler.h"
2023
#include "mediapipe/framework/port/status_builder.h"
@@ -40,6 +43,7 @@ void OutputStreamManager::PrepareForRun(
4043
absl::MutexLock lock(&stream_mutex_);
4144
next_timestamp_bound_ = Timestamp::PreStream();
4245
closed_ = false;
46+
num_packets_added_ = 0;
4347
}
4448
}
4549

@@ -165,20 +169,21 @@ Timestamp OutputStreamManager::ComputeOutputTimestampBound(
165169
void OutputStreamManager::PropagateUpdatesToMirrors(
166170
Timestamp next_timestamp_bound, OutputStreamShard* output_stream_shard) {
167171
ABSL_CHECK(output_stream_shard);
172+
std::list<Packet>* packets_to_propagate = output_stream_shard->OutputQueue();
168173
{
169174
if (next_timestamp_bound != Timestamp::Unset()) {
170175
absl::MutexLock lock(&stream_mutex_);
171176
next_timestamp_bound_ = next_timestamp_bound;
177+
num_packets_added_ += packets_to_propagate->size();
172178
VLOG(3) << "Next timestamp bound for output " << output_stream_spec_.name
173179
<< " is " << next_timestamp_bound_;
174180
}
175181
}
176-
std::list<Packet>* packets_to_propagate = output_stream_shard->OutputQueue();
177182
VLOG(3) << "Output stream: " << Name()
178183
<< " queue size: " << packets_to_propagate->size();
179184
VLOG(3) << "Output stream: " << Name()
180185
<< " next timestamp: " << next_timestamp_bound;
181-
bool add_packets = !packets_to_propagate->empty();
186+
const bool add_packets = !packets_to_propagate->empty();
182187
bool set_bound =
183188
(next_timestamp_bound != Timestamp::Unset()) &&
184189
(!add_packets ||
@@ -218,4 +223,9 @@ void OutputStreamManager::ResetShard(OutputStreamShard* output_stream_shard) {
218223
output_stream_shard->Reset(next_timestamp_bound, closed);
219224
}
220225

226+
int OutputStreamManager::NumPacketsAdded() const {
227+
absl::MutexLock lock(&stream_mutex_);
228+
return num_packets_added_;
229+
}
230+
221231
} // namespace mediapipe

Diff for: mediapipe/framework/output_stream_manager.h

+9
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,12 @@
1515
#ifndef MEDIAPIPE_FRAMEWORK_OUTPUT_STREAM_MANAGER_H_
1616
#define MEDIAPIPE_FRAMEWORK_OUTPUT_STREAM_MANAGER_H_
1717

18+
#include <cstdint>
1819
#include <functional>
1920
#include <string>
2021
#include <vector>
2122

23+
#include "absl/base/thread_annotations.h"
2224
#include "absl/synchronization/mutex.h"
2325
#include "mediapipe/framework/output_stream_shard.h"
2426
#include "mediapipe/framework/packet.h"
@@ -102,6 +104,10 @@ class OutputStreamManager {
102104
OutputStreamSpec* Spec() { return &output_stream_spec_; }
103105
const OutputStreamSpec* Spec() const { return &output_stream_spec_; }
104106

107+
// Returns the total number of packets added to the output stream. This is
108+
// used for monitoring purposes.
109+
int NumPacketsAdded() const;
110+
105111
private:
106112
// The necessary information to locate an InputStreamImpl.
107113
struct Mirror {
@@ -120,6 +126,9 @@ class OutputStreamManager {
120126
mutable absl::Mutex stream_mutex_;
121127
Timestamp next_timestamp_bound_ ABSL_GUARDED_BY(stream_mutex_);
122128
bool closed_ ABSL_GUARDED_BY(stream_mutex_);
129+
// Monotonically increasing total number of packets added to the output
130+
// stream. This is used for monitoring purposes.
131+
int64_t num_packets_added_ ABSL_GUARDED_BY(stream_mutex_) = 0;
123132
};
124133

125134
} // namespace mediapipe

Diff for: mediapipe/framework/output_stream_manager_test.cc

+19-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@
1414

1515
#include "mediapipe/framework/output_stream_manager.h"
1616

17+
#include <functional>
1718
#include <memory>
19+
#include <string>
1820

1921
#include "absl/memory/memory.h"
2022
#include "mediapipe/framework/input_stream_handler.h"
@@ -32,7 +34,7 @@ namespace {
3234

3335
class OutputStreamManagerTest : public ::testing::Test {
3436
protected:
35-
OutputStreamManagerTest() {}
37+
OutputStreamManagerTest() = default;
3638

3739
void SetUp() override {
3840
packet_type_.Set<std::string>();
@@ -688,5 +690,21 @@ TEST_F(OutputStreamManagerTest, AddPacketAndMovePacket) {
688690
EXPECT_TRUE(errors_.empty());
689691
}
690692

693+
TEST_F(OutputStreamManagerTest, ShouldReportNumPacketsAdded) {
694+
Timestamp input_timestamp = Timestamp(0);
695+
output_stream_shard_.AddPacket(
696+
MakePacket<std::string>("packet 1").At(Timestamp(1)));
697+
output_stream_shard_.AddPacket(
698+
MakePacket<std::string>("packet 2").At(Timestamp(2)));
699+
700+
Timestamp output_bound = output_stream_manager_->ComputeOutputTimestampBound(
701+
output_stream_shard_, input_timestamp);
702+
output_stream_manager_->PropagateUpdatesToMirrors(output_bound,
703+
&output_stream_shard_);
704+
ASSERT_TRUE(errors_.empty());
705+
EXPECT_EQ(input_stream_manager_.NumPacketsAdded(), 2);
706+
EXPECT_TRUE(output_stream_shard_.IsEmpty());
707+
}
708+
691709
} // namespace
692710
} // namespace mediapipe

0 commit comments

Comments
 (0)