Skip to content

Commit 5e85289

Browse files
committed
Slot provides Control endpoints ts
So that control endpoints on receiver and sender could mark unmarked packets with a timestamp. By doing so we avoid necessity of mocking core::timestamp. This note relates to the pipeline test mostly.
1 parent 99c8903 commit 5e85289

File tree

10 files changed

+31
-30
lines changed

10 files changed

+31
-30
lines changed

src/internal_modules/roc_pipeline/receiver_endpoint.cpp

+9-3
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ packet::IWriter& ReceiverEndpoint::inbound_writer() {
199199
return *this;
200200
}
201201

202-
status::StatusCode ReceiverEndpoint::pull_packets() {
202+
status::StatusCode ReceiverEndpoint::pull_packets(core::nanoseconds_t current_time) {
203203
roc_panic_if(init_status_ != status::StatusOK);
204204

205205
roc_panic_if(!parser_);
@@ -209,7 +209,7 @@ status::StatusCode ReceiverEndpoint::pull_packets() {
209209
// queue were added in a very short time or are being added currently. It's
210210
// acceptable to consider such packets late and pull them next time.
211211
while (packet::PacketPtr packet = inbound_queue_.try_pop_front_exclusive()) {
212-
const status::StatusCode code = handle_packet_(packet);
212+
const status::StatusCode code = handle_packet_(packet, current_time);
213213
state_tracker_.unregister_packet();
214214

215215
if (code != status::StatusOK) {
@@ -220,7 +220,13 @@ status::StatusCode ReceiverEndpoint::pull_packets() {
220220
return status::StatusOK;
221221
}
222222

223-
status::StatusCode ReceiverEndpoint::handle_packet_(const packet::PacketPtr& packet) {
223+
status::StatusCode ReceiverEndpoint::handle_packet_(const packet::PacketPtr& packet,
224+
core::nanoseconds_t current_time) {
225+
// Apparently the packet is not from network, set its TS manually.
226+
if (packet->udp() && packet->udp()->receive_timestamp == 0 && current_time != 0) {
227+
packet->udp()->receive_timestamp = current_time;
228+
}
229+
224230
if (!parser_->parse(*packet, packet->buffer())) {
225231
roc_log(LogDebug, "receiver endpoint: dropping bad packet: can't parse");
226232
return status::StatusOK;

src/internal_modules/roc_pipeline/receiver_endpoint.h

+3-2
Original file line numberDiff line numberDiff line change
@@ -93,12 +93,13 @@ class ReceiverEndpoint : public core::RefCounted<ReceiverEndpoint, core::ArenaAl
9393
//! Packets are written to inbound_writer() from network thread.
9494
//! They don't appear in pipeline immediately. Instead, pipeline thread
9595
//! should periodically call pull_packets() to make them available.
96-
ROC_ATTR_NODISCARD status::StatusCode pull_packets();
96+
ROC_ATTR_NODISCARD status::StatusCode pull_packets(core::nanoseconds_t current_time);
9797

9898
private:
9999
virtual ROC_ATTR_NODISCARD status::StatusCode write(const packet::PacketPtr& packet);
100100

101-
status::StatusCode handle_packet_(const packet::PacketPtr& packet);
101+
status::StatusCode handle_packet_(const packet::PacketPtr& packet,
102+
core::nanoseconds_t current_time);
102103

103104
const address::Protocol proto_;
104105

src/internal_modules/roc_pipeline/receiver_slot.cpp

+3-3
Original file line numberDiff line numberDiff line change
@@ -84,19 +84,19 @@ status::StatusCode ReceiverSlot::refresh(core::nanoseconds_t current_time,
8484
status::StatusCode code = status::NoStatus;
8585

8686
if (source_endpoint_) {
87-
if ((code = source_endpoint_->pull_packets()) != status::StatusOK) {
87+
if ((code = source_endpoint_->pull_packets(0)) != status::StatusOK) {
8888
return code;
8989
}
9090
}
9191

9292
if (repair_endpoint_) {
93-
if ((code = repair_endpoint_->pull_packets()) != status::StatusOK) {
93+
if ((code = repair_endpoint_->pull_packets(0)) != status::StatusOK) {
9494
return code;
9595
}
9696
}
9797

9898
if (control_endpoint_) {
99-
if ((code = control_endpoint_->pull_packets()) != status::StatusOK) {
99+
if ((code = control_endpoint_->pull_packets(current_time)) != status::StatusOK) {
100100
return code;
101101
}
102102
}

src/internal_modules/roc_pipeline/sender_endpoint.cpp

+9-3
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ packet::IWriter* SenderEndpoint::inbound_writer() {
184184
return this;
185185
}
186186

187-
status::StatusCode SenderEndpoint::pull_packets() {
187+
status::StatusCode SenderEndpoint::pull_packets(core::nanoseconds_t current_time) {
188188
roc_panic_if(init_status_ != status::StatusOK);
189189

190190
if (!parser_) {
@@ -197,7 +197,7 @@ status::StatusCode SenderEndpoint::pull_packets() {
197197
// queue were added in a very short time or are being added currently. It's
198198
// acceptable to consider such packets late and pull them next time.
199199
while (packet::PacketPtr packet = inbound_queue_.try_pop_front_exclusive()) {
200-
const status::StatusCode code = handle_packet_(packet);
200+
const status::StatusCode code = handle_packet_(packet, current_time);
201201
state_tracker_.unregister_packet();
202202

203203
if (code != status::StatusOK) {
@@ -208,7 +208,13 @@ status::StatusCode SenderEndpoint::pull_packets() {
208208
return status::StatusOK;
209209
}
210210

211-
status::StatusCode SenderEndpoint::handle_packet_(const packet::PacketPtr& packet) {
211+
status::StatusCode SenderEndpoint::handle_packet_(const packet::PacketPtr& packet,
212+
core::nanoseconds_t current_time) {
213+
// Apparently the packet is not from network, set it's TS manually.
214+
if (packet->udp() && packet->udp()->receive_timestamp == 0 && current_time != 0) {
215+
packet->udp()->receive_timestamp = current_time;
216+
}
217+
212218
if (!parser_->parse(*packet, packet->buffer())) {
213219
roc_log(LogDebug, "sender endpoint: dropping bad packet: can't parse");
214220
return status::StatusOK;

src/internal_modules/roc_pipeline/sender_endpoint.h

+3-2
Original file line numberDiff line numberDiff line change
@@ -91,12 +91,13 @@ class SenderEndpoint : public core::NonCopyable<>, private packet::IWriter {
9191
//! Packets are written to inbound_writer() from network thread.
9292
//! They don't appear in pipeline immediately. Instead, pipeline thread
9393
//! should periodically call pull_packets() to make them available.
94-
ROC_ATTR_NODISCARD status::StatusCode pull_packets();
94+
ROC_ATTR_NODISCARD status::StatusCode pull_packets(core::nanoseconds_t current_time);
9595

9696
private:
9797
virtual ROC_ATTR_NODISCARD status::StatusCode write(const packet::PacketPtr& packet);
9898

99-
status::StatusCode handle_packet_(const packet::PacketPtr& packet);
99+
status::StatusCode handle_packet_(const packet::PacketPtr& packet,
100+
core::nanoseconds_t current_time);
100101

101102
const address::Protocol proto_;
102103

src/internal_modules/roc_pipeline/sender_slot.cpp

+3-3
Original file line numberDiff line numberDiff line change
@@ -142,19 +142,19 @@ status::StatusCode SenderSlot::refresh(core::nanoseconds_t current_time,
142142
status::StatusCode code = status::NoStatus;
143143

144144
if (source_endpoint_) {
145-
if ((code = source_endpoint_->pull_packets()) != status::StatusOK) {
145+
if ((code = source_endpoint_->pull_packets(0)) != status::StatusOK) {
146146
return code;
147147
}
148148
}
149149

150150
if (repair_endpoint_) {
151-
if ((code = repair_endpoint_->pull_packets()) != status::StatusOK) {
151+
if ((code = repair_endpoint_->pull_packets(0)) != status::StatusOK) {
152152
return code;
153153
}
154154
}
155155

156156
if (control_endpoint_) {
157-
if ((code = control_endpoint_->pull_packets()) != status::StatusOK) {
157+
if ((code = control_endpoint_->pull_packets(current_time)) != status::StatusOK) {
158158
return code;
159159
}
160160
}

src/internal_modules/roc_rtcp/communicator.cpp

+1-2
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,7 @@ Communicator::Communicator(const Config& config,
5252
, processed_packet_count_(0)
5353
, generated_packet_count_(0)
5454
, log_limiter_(LogInterval)
55-
, init_status_(status::NoStatus)
56-
, dumper_(dumper) {
55+
, init_status_(status::NoStatus) {
5756
if ((init_status_ = reporter_.init_status()) != status::StatusOK) {
5857
return;
5958
}

src/internal_modules/roc_rtcp/communicator.h

-2
Original file line numberDiff line numberDiff line change
@@ -170,8 +170,6 @@ class Communicator : public core::NonCopyable<> {
170170
core::RateLimiter log_limiter_;
171171

172172
status::StatusCode init_status_;
173-
174-
dbgio::CsvDumper* dumper_;
175173
};
176174

177175
} // namespace rtcp

src/tests/roc_pipeline/test_helpers/control_writer.h

-1
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,6 @@ class ControlWriter : public core::NonCopyable<> {
180180

181181
pp->udp()->src_addr = src_addr_;
182182
pp->udp()->dst_addr = dst_addr_;
183-
pp->udp()->receive_timestamp = core::timestamp(core::ClockUnix);
184183

185184
pp->set_buffer(buffer);
186185

src/tests/roc_pipeline/test_loopback_sink_2_source.cpp

-9
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,6 @@ class PacketProxy : core::NonCopyable<> {
179179
continue;
180180
}
181181
print_packet_(pp);
182-
set_ts_(pp);
183182
CHECK(source_writer_);
184183
LONGS_EQUAL(status::StatusOK, source_writer_->write(copy_packet_(pp)));
185184
n_source_++;
@@ -188,13 +187,11 @@ class PacketProxy : core::NonCopyable<> {
188187
continue;
189188
}
190189
print_packet_(pp);
191-
set_ts_(pp);
192190
CHECK(repair_writer_);
193191
LONGS_EQUAL(status::StatusOK, repair_writer_->write(copy_packet_(pp)));
194192
n_repair_++;
195193
} else if (pp->flags() & packet::Packet::FlagControl) {
196194
print_packet_(pp);
197-
set_ts_(pp);
198195
CHECK(control_writer_);
199196
LONGS_EQUAL(status::StatusOK, control_writer_->write(copy_packet_(pp)));
200197
n_control_++;
@@ -230,12 +227,6 @@ class PacketProxy : core::NonCopyable<> {
230227
}
231228
}
232229

233-
void set_ts_(packet::PacketPtr& pp) {
234-
if (pp->udp()) {
235-
pp->udp()->receive_timestamp = core::timestamp(core::ClockUnix);
236-
}
237-
}
238-
239230
packet::PacketFactory& packet_factory_;
240231

241232
address::SocketAddr proxy_addr_;

0 commit comments

Comments
 (0)