Skip to content

Commit 99c8903

Browse files
committed
Fix tests
1 parent 8fc59cd commit 99c8903

18 files changed

+190
-262
lines changed

src/internal_modules/roc_netio/target_libuv/roc_netio/network_loop.cpp

+5-1
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,11 @@ NetworkLoop::NetworkLoop(core::IPool& packet_pool,
140140
task_sem_.data = this;
141141
task_sem_initialized_ = true;
142142

143-
enable_realtime();
143+
if (!enable_realtime()) {
144+
roc_log(LogInfo,
145+
"network loop: can't set realtime priority of network thread. May need "
146+
"to be root");
147+
}
144148
if (!(started_ = Thread::start())) {
145149
init_status_ = status::StatusErrThread;
146150
return;

src/internal_modules/roc_node/receiver_decoder.cpp

+3
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,8 @@ status::StatusCode ReceiverDecoder::write_packet(address::Interface iface,
169169
roc_panic_if(!bytes);
170170
roc_panic_if(n_bytes == 0);
171171

172+
const core::nanoseconds_t capture_ts = core::timestamp(core::ClockUnix);
173+
172174
if (n_bytes > packet_factory_.packet_buffer_size()) {
173175
roc_log(LogError,
174176
"receiver decoder node:"
@@ -195,6 +197,7 @@ status::StatusCode ReceiverDecoder::write_packet(address::Interface iface,
195197
}
196198

197199
packet->add_flags(packet::Packet::FlagUDP);
200+
packet->udp()->receive_timestamp = capture_ts;
198201
packet->set_buffer(buffer);
199202

200203
packet::IWriter* writer = endpoint_writers_[iface];

src/internal_modules/roc_node/sender_encoder.cpp

+3
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,8 @@ SenderEncoder::write_packet(address::Interface iface, const void* bytes, size_t
226226
roc_panic_if(!bytes);
227227
roc_panic_if(n_bytes == 0);
228228

229+
const core::nanoseconds_t capture_ts = core::timestamp(core::ClockUnix);
230+
229231
if (n_bytes > packet_factory_.packet_buffer_size()) {
230232
roc_log(LogError,
231233
"sender encoder node:"
@@ -252,6 +254,7 @@ SenderEncoder::write_packet(address::Interface iface, const void* bytes, size_t
252254
}
253255

254256
packet->add_flags(packet::Packet::FlagUDP);
257+
packet->udp()->receive_timestamp = capture_ts;
255258
packet->set_buffer(buffer);
256259

257260
packet::IWriter* writer = endpoint_writers_[iface];

src/internal_modules/roc_pipeline/receiver_endpoint.cpp

+4-5
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ packet::IWriter& ReceiverEndpoint::inbound_writer() {
199199
return *this;
200200
}
201201

202-
status::StatusCode ReceiverEndpoint::pull_packets(core::nanoseconds_t current_time) {
202+
status::StatusCode ReceiverEndpoint::pull_packets() {
203203
roc_panic_if(init_status_ != status::StatusOK);
204204

205205
roc_panic_if(!parser_);
@@ -209,7 +209,7 @@ status::StatusCode ReceiverEndpoint::pull_packets(core::nanoseconds_t current_ti
209209
// queue were added in a very short time or are being added currently. It's
210210
// acceptable to consider such packets late and pull them next time.
211211
while (packet::PacketPtr packet = inbound_queue_.try_pop_front_exclusive()) {
212-
const status::StatusCode code = handle_packet_(packet, current_time);
212+
const status::StatusCode code = handle_packet_(packet);
213213
state_tracker_.unregister_packet();
214214

215215
if (code != status::StatusOK) {
@@ -220,14 +220,13 @@ status::StatusCode ReceiverEndpoint::pull_packets(core::nanoseconds_t current_ti
220220
return status::StatusOK;
221221
}
222222

223-
status::StatusCode ReceiverEndpoint::handle_packet_(const packet::PacketPtr& packet,
224-
core::nanoseconds_t current_time) {
223+
status::StatusCode ReceiverEndpoint::handle_packet_(const packet::PacketPtr& packet) {
225224
if (!parser_->parse(*packet, packet->buffer())) {
226225
roc_log(LogDebug, "receiver endpoint: dropping bad packet: can't parse");
227226
return status::StatusOK;
228227
}
229228

230-
const status::StatusCode code = session_group_.route_packet(packet, current_time);
229+
const status::StatusCode code = session_group_.route_packet(packet);
231230

232231
if (code == status::StatusNoRoute) {
233232
roc_log(LogDebug, "receiver endpoint: dropping bad packet: can't route");

src/internal_modules/roc_pipeline/receiver_endpoint.h

+2-3
Original file line numberDiff line numberDiff line change
@@ -93,13 +93,12 @@ class ReceiverEndpoint : public core::RefCounted<ReceiverEndpoint, core::ArenaAl
9393
//! Packets are written to inbound_writer() from network thread.
9494
//! They don't appear in pipeline immediately. Instead, pipeline thread
9595
//! should periodically call pull_packets() to make them available.
96-
ROC_ATTR_NODISCARD status::StatusCode pull_packets(core::nanoseconds_t current_time);
96+
ROC_ATTR_NODISCARD status::StatusCode pull_packets();
9797

9898
private:
9999
virtual ROC_ATTR_NODISCARD status::StatusCode write(const packet::PacketPtr& packet);
100100

101-
status::StatusCode handle_packet_(const packet::PacketPtr& packet,
102-
core::nanoseconds_t current_time);
101+
status::StatusCode handle_packet_(const packet::PacketPtr& packet);
103102

104103
const address::Protocol proto_;
105104

src/internal_modules/roc_pipeline/receiver_session_group.cpp

+4-6
Original file line numberDiff line numberDiff line change
@@ -140,12 +140,11 @@ void ReceiverSessionGroup::reclock_sessions(core::nanoseconds_t playback_time) {
140140
}
141141
}
142142

143-
status::StatusCode ReceiverSessionGroup::route_packet(const packet::PacketPtr& packet,
144-
core::nanoseconds_t current_time) {
143+
status::StatusCode ReceiverSessionGroup::route_packet(const packet::PacketPtr& packet) {
145144
roc_panic_if(init_status_ != status::StatusOK);
146145

147146
if (packet->has_flags(packet::Packet::FlagControl)) {
148-
return route_control_packet_(packet, current_time);
147+
return route_control_packet_(packet);
149148
}
150149

151150
return route_transport_packet_(packet);
@@ -344,15 +343,14 @@ ReceiverSessionGroup::route_transport_packet_(const packet::PacketPtr& packet) {
344343
}
345344

346345
status::StatusCode
347-
ReceiverSessionGroup::route_control_packet_(const packet::PacketPtr& packet,
348-
core::nanoseconds_t current_time) {
346+
ReceiverSessionGroup::route_control_packet_(const packet::PacketPtr& packet) {
349347
if (!rtcp_communicator_) {
350348
roc_panic("session group: rtcp communicator is null");
351349
}
352350

353351
// This will invoke IParticipant methods implemented by us,
354352
// in particular notify_recv_stream() and maybe halt_recv_stream().
355-
return rtcp_communicator_->process_packet(packet, current_time);
353+
return rtcp_communicator_->process_packet(packet);
356354
}
357355

358356
bool ReceiverSessionGroup::can_create_session_(const packet::PacketPtr& packet) {

src/internal_modules/roc_pipeline/receiver_session_group.h

+2-4
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,7 @@ class ReceiverSessionGroup : public core::NonCopyable<>, private rtcp::IParticip
9191
void reclock_sessions(core::nanoseconds_t playback_time);
9292

9393
//! Route packet to session.
94-
ROC_ATTR_NODISCARD status::StatusCode route_packet(const packet::PacketPtr& packet,
95-
core::nanoseconds_t current_time);
94+
ROC_ATTR_NODISCARD status::StatusCode route_packet(const packet::PacketPtr& packet);
9695

9796
//! Get number of sessions in group.
9897
size_t num_sessions() const;
@@ -130,8 +129,7 @@ class ReceiverSessionGroup : public core::NonCopyable<>, private rtcp::IParticip
130129
virtual void halt_recv_stream(packet::stream_source_t send_source_id);
131130

132131
status::StatusCode route_transport_packet_(const packet::PacketPtr& packet);
133-
status::StatusCode route_control_packet_(const packet::PacketPtr& packet,
134-
core::nanoseconds_t current_time);
132+
status::StatusCode route_control_packet_(const packet::PacketPtr& packet);
135133

136134
bool can_create_session_(const packet::PacketPtr& packet);
137135

src/internal_modules/roc_pipeline/receiver_slot.cpp

+3-3
Original file line numberDiff line numberDiff line change
@@ -84,19 +84,19 @@ status::StatusCode ReceiverSlot::refresh(core::nanoseconds_t current_time,
8484
status::StatusCode code = status::NoStatus;
8585

8686
if (source_endpoint_) {
87-
if ((code = source_endpoint_->pull_packets(current_time)) != status::StatusOK) {
87+
if ((code = source_endpoint_->pull_packets()) != status::StatusOK) {
8888
return code;
8989
}
9090
}
9191

9292
if (repair_endpoint_) {
93-
if ((code = repair_endpoint_->pull_packets(current_time)) != status::StatusOK) {
93+
if ((code = repair_endpoint_->pull_packets()) != status::StatusOK) {
9494
return code;
9595
}
9696
}
9797

9898
if (control_endpoint_) {
99-
if ((code = control_endpoint_->pull_packets(current_time)) != status::StatusOK) {
99+
if ((code = control_endpoint_->pull_packets()) != status::StatusOK) {
100100
return code;
101101
}
102102
}

src/internal_modules/roc_pipeline/sender_endpoint.cpp

+4-5
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ packet::IWriter* SenderEndpoint::inbound_writer() {
184184
return this;
185185
}
186186

187-
status::StatusCode SenderEndpoint::pull_packets(core::nanoseconds_t current_time) {
187+
status::StatusCode SenderEndpoint::pull_packets() {
188188
roc_panic_if(init_status_ != status::StatusOK);
189189

190190
if (!parser_) {
@@ -197,7 +197,7 @@ status::StatusCode SenderEndpoint::pull_packets(core::nanoseconds_t current_time
197197
// queue were added in a very short time or are being added currently. It's
198198
// acceptable to consider such packets late and pull them next time.
199199
while (packet::PacketPtr packet = inbound_queue_.try_pop_front_exclusive()) {
200-
const status::StatusCode code = handle_packet_(packet, current_time);
200+
const status::StatusCode code = handle_packet_(packet);
201201
state_tracker_.unregister_packet();
202202

203203
if (code != status::StatusOK) {
@@ -208,14 +208,13 @@ status::StatusCode SenderEndpoint::pull_packets(core::nanoseconds_t current_time
208208
return status::StatusOK;
209209
}
210210

211-
status::StatusCode SenderEndpoint::handle_packet_(const packet::PacketPtr& packet,
212-
core::nanoseconds_t current_time) {
211+
status::StatusCode SenderEndpoint::handle_packet_(const packet::PacketPtr& packet) {
213212
if (!parser_->parse(*packet, packet->buffer())) {
214213
roc_log(LogDebug, "sender endpoint: dropping bad packet: can't parse");
215214
return status::StatusOK;
216215
}
217216

218-
const status::StatusCode code = sender_session_.route_packet(packet, current_time);
217+
const status::StatusCode code = sender_session_.route_packet(packet);
219218

220219
if (code == status::StatusNoRoute) {
221220
roc_log(LogDebug, "sender endpoint: dropping bad packet: can't route");

src/internal_modules/roc_pipeline/sender_endpoint.h

+2-3
Original file line numberDiff line numberDiff line change
@@ -91,13 +91,12 @@ class SenderEndpoint : public core::NonCopyable<>, private packet::IWriter {
9191
//! Packets are written to inbound_writer() from network thread.
9292
//! They don't appear in pipeline immediately. Instead, pipeline thread
9393
//! should periodically call pull_packets() to make them available.
94-
ROC_ATTR_NODISCARD status::StatusCode pull_packets(core::nanoseconds_t current_time);
94+
ROC_ATTR_NODISCARD status::StatusCode pull_packets();
9595

9696
private:
9797
virtual ROC_ATTR_NODISCARD status::StatusCode write(const packet::PacketPtr& packet);
9898

99-
status::StatusCode handle_packet_(const packet::PacketPtr& packet,
100-
core::nanoseconds_t current_time);
99+
status::StatusCode handle_packet_(const packet::PacketPtr& packet);
101100

102101
const address::Protocol proto_;
103102

src/internal_modules/roc_pipeline/sender_session.cpp

+4-7
Original file line numberDiff line numberDiff line change
@@ -281,8 +281,7 @@ status::StatusCode SenderSession::refresh(core::nanoseconds_t current_time,
281281
return status::StatusOK;
282282
}
283283

284-
status::StatusCode SenderSession::route_packet(const packet::PacketPtr& packet,
285-
core::nanoseconds_t current_time) {
284+
status::StatusCode SenderSession::route_packet(const packet::PacketPtr& packet) {
286285
roc_panic_if(init_status_ != status::StatusOK);
287286

288287
if (fail_status_ != status::NoStatus) {
@@ -294,7 +293,7 @@ status::StatusCode SenderSession::route_packet(const packet::PacketPtr& packet,
294293
roc_panic("sender session: unexpected non-control packet");
295294
}
296295

297-
return route_control_packet_(packet, current_time);
296+
return route_control_packet_(packet);
298297
}
299298

300299
status::StatusCode SenderSession::write(audio::Frame& frame) {
@@ -439,15 +438,13 @@ void SenderSession::start_feedback_monitor_() {
439438
feedback_monitor_->start();
440439
}
441440

442-
status::StatusCode
443-
SenderSession::route_control_packet_(const packet::PacketPtr& packet,
444-
core::nanoseconds_t current_time) {
441+
status::StatusCode SenderSession::route_control_packet_(const packet::PacketPtr& packet) {
445442
if (!rtcp_communicator_) {
446443
roc_panic("sender session: rtcp communicator is null");
447444
}
448445

449446
// This will invoke IParticipant methods implemented by us.
450-
return rtcp_communicator_->process_packet(packet, current_time);
447+
return rtcp_communicator_->process_packet(packet);
451448
}
452449

453450
} // namespace pipeline

src/internal_modules/roc_pipeline/sender_session.h

+2-4
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,7 @@ class SenderSession : public core::NonCopyable<>,
9696
//! This way feedback packets from receiver reach sender pipeline.
9797
//! Packets are stored inside internal pipeline queues, and then fetched
9898
//! when frame are passed from frame_writer().
99-
ROC_ATTR_NODISCARD status::StatusCode route_packet(const packet::PacketPtr& packet,
100-
core::nanoseconds_t current_time);
99+
ROC_ATTR_NODISCARD status::StatusCode route_packet(const packet::PacketPtr& packet);
101100

102101
//! Get slot metrics.
103102
//! @remarks
@@ -133,8 +132,7 @@ class SenderSession : public core::NonCopyable<>,
133132

134133
void start_feedback_monitor_();
135134

136-
status::StatusCode route_control_packet_(const packet::PacketPtr& packet,
137-
core::nanoseconds_t current_time);
135+
status::StatusCode route_control_packet_(const packet::PacketPtr& packet);
138136

139137
core::IArena& arena_;
140138

src/internal_modules/roc_pipeline/sender_slot.cpp

+3-3
Original file line numberDiff line numberDiff line change
@@ -142,19 +142,19 @@ status::StatusCode SenderSlot::refresh(core::nanoseconds_t current_time,
142142
status::StatusCode code = status::NoStatus;
143143

144144
if (source_endpoint_) {
145-
if ((code = source_endpoint_->pull_packets(current_time)) != status::StatusOK) {
145+
if ((code = source_endpoint_->pull_packets()) != status::StatusOK) {
146146
return code;
147147
}
148148
}
149149

150150
if (repair_endpoint_) {
151-
if ((code = repair_endpoint_->pull_packets(current_time)) != status::StatusOK) {
151+
if ((code = repair_endpoint_->pull_packets()) != status::StatusOK) {
152152
return code;
153153
}
154154
}
155155

156156
if (control_endpoint_) {
157-
if ((code = control_endpoint_->pull_packets(current_time)) != status::StatusOK) {
157+
if ((code = control_endpoint_->pull_packets()) != status::StatusOK) {
158158
return code;
159159
}
160160
}

src/internal_modules/roc_rtcp/communicator.cpp

+1-2
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,7 @@ size_t Communicator::total_streams() const {
7272
return reporter_.total_streams();
7373
}
7474

75-
status::StatusCode Communicator::process_packet(const packet::PacketPtr& packet,
76-
core::nanoseconds_t current_time) {
75+
status::StatusCode Communicator::process_packet(const packet::PacketPtr& packet) {
7776
roc_panic_if(init_status_ != status::StatusOK);
7877

7978
roc_panic_if_msg(!packet, "rtcp communicator: null packet");

src/internal_modules/roc_rtcp/communicator.h

+1-2
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,7 @@ class Communicator : public core::NonCopyable<> {
7676

7777
//! Parse and process incoming packet.
7878
//! Invokes IParticipant methods during processing.
79-
ROC_ATTR_NODISCARD status::StatusCode
80-
process_packet(const packet::PacketPtr& packet, core::nanoseconds_t current_time);
79+
ROC_ATTR_NODISCARD status::StatusCode process_packet(const packet::PacketPtr& packet);
8180

8281
//! When we should generate packets next time.
8382
//! Returns absolute time.

src/tests/roc_pipeline/test_helpers/control_writer.h

+1
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,7 @@ class ControlWriter : public core::NonCopyable<> {
180180

181181
pp->udp()->src_addr = src_addr_;
182182
pp->udp()->dst_addr = dst_addr_;
183+
pp->udp()->receive_timestamp = core::timestamp(core::ClockUnix);
183184

184185
pp->set_buffer(buffer);
185186

src/tests/roc_pipeline/test_loopback_sink_2_source.cpp

+9
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,7 @@ class PacketProxy : core::NonCopyable<> {
179179
continue;
180180
}
181181
print_packet_(pp);
182+
set_ts_(pp);
182183
CHECK(source_writer_);
183184
LONGS_EQUAL(status::StatusOK, source_writer_->write(copy_packet_(pp)));
184185
n_source_++;
@@ -187,11 +188,13 @@ class PacketProxy : core::NonCopyable<> {
187188
continue;
188189
}
189190
print_packet_(pp);
191+
set_ts_(pp);
190192
CHECK(repair_writer_);
191193
LONGS_EQUAL(status::StatusOK, repair_writer_->write(copy_packet_(pp)));
192194
n_repair_++;
193195
} else if (pp->flags() & packet::Packet::FlagControl) {
194196
print_packet_(pp);
197+
set_ts_(pp);
195198
CHECK(control_writer_);
196199
LONGS_EQUAL(status::StatusOK, control_writer_->write(copy_packet_(pp)));
197200
n_control_++;
@@ -227,6 +230,12 @@ class PacketProxy : core::NonCopyable<> {
227230
}
228231
}
229232

233+
void set_ts_(packet::PacketPtr& pp) {
234+
if (pp->udp()) {
235+
pp->udp()->receive_timestamp = core::timestamp(core::ClockUnix);
236+
}
237+
}
238+
230239
packet::PacketFactory& packet_factory_;
231240

232241
address::SocketAddr proxy_addr_;

0 commit comments

Comments
 (0)