12
12
// See the License for the specific language governing permissions and
13
13
// limitations under the License.
14
14
15
- #include < cstdlib>
15
+ #include " mediapipe/calculators/core/packet_resampler_calculator.h"
16
+
16
17
#include < memory>
17
- #include < string>
18
-
19
- #include " absl/strings/str_cat.h"
20
- #include " mediapipe/calculators/core/packet_resampler_calculator.pb.h"
21
- #include " mediapipe/framework/calculator_framework.h"
22
- #include " mediapipe/framework/collection_item_id.h"
23
- #include " mediapipe/framework/deps/mathutil.h"
24
- #include " mediapipe/framework/deps/random_base.h"
25
- #include " mediapipe/framework/formats/video_stream_header.h"
26
- #include " mediapipe/framework/port/integral_types.h"
27
- #include " mediapipe/framework/port/logging.h"
28
- #include " mediapipe/framework/port/ret_check.h"
29
- #include " mediapipe/framework/port/status.h"
30
- #include " mediapipe/framework/port/status_macros.h"
31
- #include " mediapipe/framework/tool/options_util.h"
32
18
33
19
namespace {
34
20
@@ -45,120 +31,7 @@ std::unique_ptr<RandomBase> CreateSecureRandom(const std::string& seed) {
45
31
46
32
namespace mediapipe {
47
33
48
- // This calculator is used to normalize the frequency of the packets
49
- // out of a stream. Given a desired frame rate, packets are going to be
50
- // removed or added to achieve it.
51
- //
52
- // The jitter feature is disabled by default. To enable it, you need to
53
- // implement CreateSecureRandom(const std::string&).
54
- //
55
- // The data stream may be either specified as the only stream (by index)
56
- // or as the stream with tag "DATA".
57
- //
58
- // The input and output streams may be accompanied by a VIDEO_HEADER
59
- // stream. This stream includes a VideoHeader at Timestamp::PreStream().
60
- // The input VideoHeader on the VIDEO_HEADER stream will always be updated
61
- // with the resampler frame rate no matter what the options value for
62
- // output_header is before being output on the output VIDEO_HEADER stream.
63
- // If the input VideoHeader is not available, then only the frame rate
64
- // value will be set in the output.
65
- //
66
- // Related:
67
- // packet_downsampler_calculator.cc: skips packets regardless of timestamps.
68
- class PacketResamplerCalculator : public CalculatorBase {
69
- public:
70
- static ::mediapipe::Status GetContract (CalculatorContract* cc);
71
-
72
- ::mediapipe::Status Open (CalculatorContext* cc) override ;
73
- ::mediapipe::Status Close (CalculatorContext* cc) override ;
74
- ::mediapipe::Status Process (CalculatorContext* cc) override ;
75
-
76
- private:
77
- // Calculates the first sampled timestamp that incorporates a jittering
78
- // offset.
79
- void InitializeNextOutputTimestampWithJitter ();
80
- // Calculates the next sampled timestamp that incorporates a jittering offset.
81
- void UpdateNextOutputTimestampWithJitter ();
82
-
83
- // Logic for Process() when jitter_ != 0.0.
84
- ::mediapipe::Status ProcessWithJitter (CalculatorContext* cc);
85
-
86
- // Logic for Process() when jitter_ == 0.0.
87
- ::mediapipe::Status ProcessWithoutJitter (CalculatorContext* cc);
88
-
89
- // Given the current count of periods that have passed, this returns
90
- // the next valid timestamp of the middle point of the next period:
91
- // if count is 0, it returns the first_timestamp_.
92
- // if count is 1, it returns the first_timestamp_ + period (corresponding
93
- // to the first tick using exact fps)
94
- // e.g. for frame_rate=30 and first_timestamp_=0:
95
- // 0: 0
96
- // 1: 33333
97
- // 2: 66667
98
- // 3: 100000
99
- //
100
- // Can only be used if jitter_ equals zero.
101
- Timestamp PeriodIndexToTimestamp (int64 index) const ;
102
-
103
- // Given a Timestamp, finds the closest sync Timestamp based on
104
- // first_timestamp_ and the desired fps.
105
- //
106
- // Can only be used if jitter_ equals zero.
107
- int64 TimestampToPeriodIndex (Timestamp timestamp) const ;
108
-
109
- // Outputs a packet if it is in range (start_time_, end_time_).
110
- void OutputWithinLimits (CalculatorContext* cc, const Packet& packet) const ;
111
-
112
- // The timestamp of the first packet received.
113
- Timestamp first_timestamp_;
114
-
115
- // Number of frames per second (desired output frequency).
116
- double frame_rate_;
117
-
118
- // Inverse of frame_rate_.
119
- int64 frame_time_usec_;
120
-
121
- // Number of periods that have passed (= #packets sent to the output).
122
- //
123
- // Can only be used if jitter_ equals zero.
124
- int64 period_count_;
125
-
126
- // The last packet that was received.
127
- Packet last_packet_;
128
-
129
- VideoHeader video_header_;
130
- // The "DATA" input stream.
131
- CollectionItemId input_data_id_;
132
- // The "DATA" output stream.
133
- CollectionItemId output_data_id_;
134
-
135
- // Indicator whether to flush last packet even if its timestamp is greater
136
- // than the final stream timestamp. Set to false when jitter_ is non-zero.
137
- bool flush_last_packet_;
138
-
139
- // Jitter-related variables.
140
- std::unique_ptr<RandomBase> random_;
141
- double jitter_ = 0.0 ;
142
- Timestamp next_output_timestamp_;
143
-
144
- // If specified, output timestamps are aligned with base_timestamp.
145
- // Otherwise, they are aligned with the first input timestamp.
146
- Timestamp base_timestamp_;
147
-
148
- // If specified, only outputs at/after start_time are included.
149
- Timestamp start_time_;
150
-
151
- // If specified, only outputs before end_time are included.
152
- Timestamp end_time_;
153
-
154
- // If set, the output timestamps nearest to start_time and end_time
155
- // are included in the output, even if the nearest timestamp is not
156
- // between start_time and end_time.
157
- bool round_limits_;
158
- };
159
-
160
34
REGISTER_CALCULATOR (PacketResamplerCalculator);
161
-
162
35
namespace {
163
36
// Returns a TimestampDiff (assuming microseconds) corresponding to the
164
37
// given time in seconds.
@@ -279,7 +152,10 @@ ::mediapipe::Status PacketResamplerCalculator::Open(CalculatorContext* cc) {
279
152
" SecureRandom is not available. With \" jitter\" specified, "
280
153
" PacketResamplerCalculator processing cannot proceed." );
281
154
}
155
+ packet_reservoir_random_ = CreateSecureRandom (seed);
282
156
}
157
+ packet_reservoir_ =
158
+ std::make_unique<PacketReservoir>(packet_reservoir_random_.get ());
283
159
return ::mediapipe::OkStatus ();
284
160
}
285
161
@@ -294,6 +170,14 @@ ::mediapipe::Status PacketResamplerCalculator::Process(CalculatorContext* cc) {
294
170
}
295
171
}
296
172
if (jitter_ != 0.0 && random_ != nullptr ) {
173
+ // Packet reservior is used to make sure there's an output for every period,
174
+ // e.g. partial period at the end of the stream.
175
+ if (packet_reservoir_->IsEnabled () &&
176
+ (first_timestamp_ == Timestamp::Unset () ||
177
+ (cc->InputTimestamp () - next_output_timestamp_min_).Value () >= 0 )) {
178
+ auto curr_packet = cc->Inputs ().Get (input_data_id_).Value ();
179
+ packet_reservoir_->AddSample (curr_packet);
180
+ }
297
181
MP_RETURN_IF_ERROR (ProcessWithJitter (cc));
298
182
} else {
299
183
MP_RETURN_IF_ERROR (ProcessWithoutJitter (cc));
@@ -303,11 +187,14 @@ ::mediapipe::Status PacketResamplerCalculator::Process(CalculatorContext* cc) {
303
187
}
304
188
305
189
void PacketResamplerCalculator::InitializeNextOutputTimestampWithJitter () {
190
+ next_output_timestamp_min_ = first_timestamp_;
306
191
next_output_timestamp_ =
307
192
first_timestamp_ + frame_time_usec_ * random_->RandFloat ();
308
193
}
309
194
310
195
void PacketResamplerCalculator::UpdateNextOutputTimestampWithJitter () {
196
+ packet_reservoir_->Clear ();
197
+ packet_reservoir_->Disable ();
311
198
next_output_timestamp_ +=
312
199
frame_time_usec_ *
313
200
((1.0 - jitter_) + 2.0 * jitter_ * random_->RandFloat ());
@@ -339,10 +226,10 @@ ::mediapipe::Status PacketResamplerCalculator::ProcessWithJitter(
339
226
while (true ) {
340
227
const int64 last_diff =
341
228
(next_output_timestamp_ - last_packet_.Timestamp ()).Value ();
342
- RET_CHECK_GT (last_diff, 0.0 );
229
+ RET_CHECK_GT (last_diff, 0 );
343
230
const int64 curr_diff =
344
231
(next_output_timestamp_ - cc->InputTimestamp ()).Value ();
345
- if (curr_diff > 0.0 ) {
232
+ if (curr_diff > 0 ) {
346
233
break ;
347
234
}
348
235
OutputWithinLimits (cc, (std::abs (curr_diff) > last_diff
@@ -431,6 +318,9 @@ ::mediapipe::Status PacketResamplerCalculator::Close(CalculatorContext* cc) {
431
318
OutputWithinLimits (cc,
432
319
last_packet_.At (PeriodIndexToTimestamp (period_count_)));
433
320
}
321
+ if (!packet_reservoir_->IsEmpty ()) {
322
+ OutputWithinLimits (cc, packet_reservoir_->GetSample ());
323
+ }
434
324
return ::mediapipe::OkStatus ();
435
325
}
436
326
0 commit comments