@@ -57,6 +57,68 @@ ISrsUdpHandler::~ISrsUdpHandler()
57
57
{
58
58
}
59
59
60
+ SrsMpegtsQueue::SrsMpegtsQueue ()
61
+ {
62
+ nb_audios = nb_videos = 0 ;
63
+ }
64
+
65
+ SrsMpegtsQueue::~SrsMpegtsQueue ()
66
+ {
67
+ std::map<int64_t , SrsSharedPtrMessage*>::iterator it;
68
+ for (it = msgs.begin (); it != msgs.end (); ++it) {
69
+ SrsSharedPtrMessage* msg = it->second ;
70
+ srs_freep (msg);
71
+ }
72
+ msgs.clear ();
73
+ }
74
+
75
+ int SrsMpegtsQueue::push (SrsSharedPtrMessage* msg)
76
+ {
77
+ int ret = ERROR_SUCCESS;
78
+
79
+ if (msgs.find (msg->timestamp ) != msgs.end ()) {
80
+ srs_warn (" mpegts: free the msg for dts exists, dts=%" PRId64, msg->timestamp );
81
+ srs_freep (msg);
82
+ return ret;
83
+ }
84
+
85
+ if (msg->is_audio ()) {
86
+ nb_audios++;
87
+ }
88
+
89
+ if (msg->is_video ()) {
90
+ nb_videos++;
91
+ }
92
+
93
+ msgs[msg->timestamp ] = msg;
94
+
95
+ return ret;
96
+ }
97
+
98
+ SrsSharedPtrMessage* SrsMpegtsQueue::dequeue ()
99
+ {
100
+ // got 2+ videos and audios, ok to dequeue.
101
+ bool av_ok = nb_videos >= 2 && nb_audios >= 2 ;
102
+ // 100 videos about 30s, while 300 audios about 30s
103
+ bool av_overflow = nb_videos > 100 || nb_audios > 300 ;
104
+
105
+ if (av_ok || av_overflow) {
106
+ std::map<int64_t , SrsSharedPtrMessage*>::iterator it = msgs.begin ();
107
+ SrsSharedPtrMessage* msg = it->second ;
108
+ msgs.erase (it);
109
+
110
+ if (msg->is_audio ()) {
111
+ nb_audios--;
112
+ }
113
+
114
+ if (msg->is_video ()) {
115
+ nb_videos--;
116
+ }
117
+ }
118
+
119
+ return NULL ;
120
+ }
121
+
60
122
SrsMpegtsOverUdp::SrsMpegtsOverUdp (SrsConfDirective* c)
61
123
{
62
124
stream = new SrsStream ();
@@ -72,6 +134,7 @@ SrsMpegtsOverUdp::SrsMpegtsOverUdp(SrsConfDirective* c)
72
134
h264_sps_changed = false ;
73
135
h264_pps_changed = false ;
74
136
h264_sps_pps_sent = false ;
137
+ queue = new SrsMpegtsQueue ();
75
138
}
76
139
77
140
SrsMpegtsOverUdp::~SrsMpegtsOverUdp ()
@@ -82,6 +145,7 @@ SrsMpegtsOverUdp::~SrsMpegtsOverUdp()
82
145
srs_freep (stream);
83
146
srs_freep (context);
84
147
srs_freep (avc);
148
+ srs_freep (queue);
85
149
}
86
150
87
151
int SrsMpegtsOverUdp::on_udp_packet (sockaddr_in* from, char * buf, int nb_buf)
@@ -280,11 +344,14 @@ int SrsMpegtsOverUdp::on_ts_video(SrsTsMessage* msg, SrsStream* avs)
280
344
281
345
// it may be return error, but we must process all packets.
282
346
if ((ret = write_h264_raw_frame (frame, frame_size, dts, pts)) != ERROR_SUCCESS) {
283
- if (ret = ERROR_H264_DROP_BEFORE_SPS_PPS) {
347
+ if (ret == ERROR_H264_DROP_BEFORE_SPS_PPS) {
284
348
continue ;
285
349
}
286
350
return ret;
287
351
}
352
+
353
+ // for video, drop others with same pts/dts.
354
+ break ;
288
355
}
289
356
290
357
return ret;
@@ -399,15 +466,28 @@ int SrsMpegtsOverUdp::rtmp_write_packet(char type, u_int32_t timestamp, char* da
399
466
SrsSharedPtrMessage* msg = NULL ;
400
467
401
468
if ((ret = srs_rtmp_create_msg (type, timestamp, data, size, stream_id, &msg)) != ERROR_SUCCESS) {
469
+ srs_error (" mpegts: create shared ptr msg failed. ret=%d" , ret);
402
470
return ret;
403
471
}
404
-
405
472
srs_assert (msg);
406
-
407
- // send out encoded msg.
408
- if ((ret = client->send_and_free_message (msg, stream_id)) != ERROR_SUCCESS) {
473
+
474
+ // push msg to queue.
475
+ if ((ret = queue->push (msg)) != ERROR_SUCCESS) {
476
+ srs_error (" mpegts: push msg to queue failed. ret=%d" , ret);
409
477
return ret;
410
478
}
479
+
480
+ // for all ready msg, dequeue and send out.
481
+ for (;;) {
482
+ if ((msg = queue->dequeue ()) == NULL ) {
483
+ break ;
484
+ }
485
+
486
+ // send out encoded msg.
487
+ if ((ret = client->send_and_free_message (msg, stream_id)) != ERROR_SUCCESS) {
488
+ return ret;
489
+ }
490
+ }
411
491
412
492
return ret;
413
493
}
0 commit comments