Skip to content

Commit 51aecb8

Browse files
committed
for ossrs#250, decode the ts packet header and adaptation field.
1 parent bd39590 commit 51aecb8

File tree

5 files changed

+300
-12
lines changed

5 files changed

+300
-12
lines changed

trunk/src/app/srs_app_mpegts_udp.cpp

+25-6
Original file line numberDiff line numberDiff line change
@@ -33,16 +33,21 @@ using namespace std;
3333
#include <srs_kernel_log.hpp>
3434
#include <srs_app_config.hpp>
3535
#include <srs_kernel_ts.hpp>
36+
#include <srs_kernel_stream.hpp>
37+
#include <srs_kernel_ts.hpp>
38+
#include <srs_core_autofree.hpp>
3639

3740
#ifdef SRS_AUTO_STREAM_CASTER
3841

3942
SrsMpegtsOverUdp::SrsMpegtsOverUdp(SrsConfDirective* c)
4043
{
44+
stream = new SrsStream();
4145
output = _srs_config->get_stream_caster_output(c);
4246
}
4347

4448
SrsMpegtsOverUdp::~SrsMpegtsOverUdp()
4549
{
50+
srs_freep(stream);
4651
}
4752

4853
int SrsMpegtsOverUdp::on_udp_packet(sockaddr_in* from, char* buf, int nb_buf)
@@ -59,21 +64,35 @@ int SrsMpegtsOverUdp::on_udp_packet(sockaddr_in* from, char* buf, int nb_buf)
5964
}
6065
srs_info("udp: got %s:%d packet %d bytes", peer_ip.c_str(), peer_port, nb_buf);
6166

62-
// process each ts packet
67+
// use stream to parse ts packet.
6368
for (int i = 0; i < nb_buf; i += SRS_TS_PACKET_SIZE) {
64-
char* ts_packet = buf + i;
65-
if ((ret = on_ts_packet(ts_packet)) != ERROR_SUCCESS) {
66-
srs_warn("mpegts: ignore ts packet error. ret=%d", ret);
67-
continue;
69+
if ((ret = stream->initialize(buf + i, SRS_TS_PACKET_SIZE)) != ERROR_SUCCESS) {
70+
return ret;
71+
}
72+
73+
// process each ts packet
74+
if ((ret = on_ts_packet(stream)) != ERROR_SUCCESS) {
75+
break;
6876
}
77+
srs_info("mpegts: parse ts packet completed");
6978
}
79+
srs_info("mpegts: parse udp packet completed");
7080

7181
return ret;
7282
}
7383

74-
int SrsMpegtsOverUdp::on_ts_packet(char* ts_packet)
84+
int SrsMpegtsOverUdp::on_ts_packet(SrsStream* stream)
7585
{
7686
int ret = ERROR_SUCCESS;
87+
88+
SrsTsPacket* packet = new SrsTsPacket();
89+
SrsAutoFree(SrsTsPacket, packet);
90+
91+
if ((ret = packet->decode(stream)) != ERROR_SUCCESS) {
92+
srs_error("mpegts: decode ts packet failed. ret=%d", ret);
93+
return ret;
94+
}
95+
7796
return ret;
7897
}
7998

trunk/src/app/srs_app_mpegts_udp.hpp

+4-2
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
3333
class sockaddr_in;
3434
#include <string>
3535

36+
class SrsStream;
3637
class SrsConfDirective;
3738

3839
#ifdef SRS_AUTO_STREAM_CASTER
@@ -43,6 +44,7 @@ class SrsConfDirective;
4344
class SrsMpegtsOverUdp
4445
{
4546
private:
47+
SrsStream* stream;
4648
std::string output;
4749
public:
4850
SrsMpegtsOverUdp(SrsConfDirective* c);
@@ -60,9 +62,9 @@ class SrsMpegtsOverUdp
6062
virtual int on_udp_packet(sockaddr_in* from, char* buf, int nb_buf);
6163
private:
6264
/**
63-
* when got a ts packet, in size TS_PACKET_SIZE.
65+
* the stream contains the ts packet to parse.
6466
*/
65-
virtual int on_ts_packet(char* ts_packet);
67+
virtual int on_ts_packet(SrsStream* stream);
6668
};
6769

6870
#endif

trunk/src/kernel/srs_kernel_error.hpp

+3
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,9 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
219219
#define ERROR_MP3_DECODE_ERROR 4009
220220
#define ERROR_STREAM_CASTER_ENGINE 4010
221221
#define ERROR_STREAM_CASTER_PORT 4011
222+
#define ERROR_STREAM_CASTER_TS_HEADER 4012
223+
#define ERROR_STREAM_CASTER_TS_SYNC_BYTE 4013
224+
#define ERROR_STREAM_CASTER_TS_AF 4014
222225

223226
/**
224227
* whether the error code is an system control error.

trunk/src/kernel/srs_kernel_ts.cpp

+256-1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ using namespace std;
3838
#include <srs_kernel_avc.hpp>
3939
#include <srs_kernel_buffer.hpp>
4040
#include <srs_kernel_utility.hpp>
41+
#include <srs_kernel_stream.hpp>
4142

4243
// in ms, for HLS aac sync time.
4344
#define SRS_CONF_DEFAULT_AAC_SYNC 100
@@ -418,8 +419,65 @@ SrsTsPacket::~SrsTsPacket()
418419
srs_freep(adaptation_field);
419420
}
420421

421-
SrsTsAdaptationField::SrsTsAdaptationField()
422+
int SrsTsPacket::decode(SrsStream* stream)
422423
{
424+
int ret = ERROR_SUCCESS;
425+
426+
int pos = stream->pos();
427+
428+
// 4B ts packet header.
429+
if (!stream->require(4)) {
430+
ret = ERROR_STREAM_CASTER_TS_HEADER;
431+
srs_error("ts: demux header failed. ret=%d", ret);
432+
return ret;
433+
}
434+
435+
sync_byte = stream->read_1bytes();
436+
if (sync_byte != 0x47) {
437+
ret = ERROR_STREAM_CASTER_TS_SYNC_BYTE;
438+
srs_error("ts: sync_bytes must be 0x47, actual=%#x. ret=%d", sync_byte, ret);
439+
return ret;
440+
}
441+
442+
int16_t pidv = stream->read_2bytes();
443+
transport_error_indicator = (pidv >> 15) & 0x01;
444+
payload_unit_start_indicator = (pidv >> 14) & 0x01;
445+
transport_priority = (pidv >> 13) & 0x01;
446+
pid = (SrsTsPid)(pidv & 0x1FFF);
447+
448+
int8_t ccv = stream->read_1bytes();
449+
transport_scrambling_control = (SrsTsScrambled)((ccv >> 6) & 0x03);
450+
adaption_field_control = (SrsTsAdaptationFieldType)((ccv >> 4) & 0x03);
451+
continuity_counter = (SrsTsPid)(ccv & 0x0F);
452+
453+
// TODO: FIXME: create pids map when got new pid.
454+
455+
srs_info("ts: header sync=%#x error=%d unit_start=%d priotiry=%d pid=%d scrambling=%d adaption=%d counter=%d",
456+
sync_byte, transport_error_indicator, payload_unit_start_indicator, transport_priority, pid,
457+
transport_scrambling_control, adaption_field_control, continuity_counter);
458+
459+
// optional: adaptation field
460+
if (adaption_field_control == SrsTsAdaptationFieldTypeAdaptionOnly || adaption_field_control == SrsTsAdaptationFieldTypeBoth) {
461+
srs_freep(adaptation_field);
462+
adaptation_field = new SrsTsAdaptationField(this);
463+
464+
if ((ret = adaptation_field->decode(stream)) != ERROR_SUCCESS) {
465+
srs_error("ts: demux af faield. ret=%d", ret);
466+
return ret;
467+
}
468+
srs_verbose("ts: demux af ok.");
469+
}
470+
471+
// calc the user defined data size for payload.
472+
int nb_payload = SRS_TS_PACKET_SIZE - (stream->pos() - pos);
473+
474+
return ret;
475+
}
476+
477+
SrsTsAdaptationField::SrsTsAdaptationField(SrsTsPacket* pkt)
478+
{
479+
packet = pkt;
480+
423481
adaption_field_length = 0;
424482
discontinuity_indicator = 0;
425483
random_access_indicator = 0;
@@ -456,6 +514,203 @@ SrsTsAdaptationField::SrsTsAdaptationField()
456514

457515
SrsTsAdaptationField::~SrsTsAdaptationField()
458516
{
517+
srs_freep(transport_private_data);
518+
}
519+
520+
int SrsTsAdaptationField::decode(SrsStream* stream)
521+
{
522+
int ret = ERROR_SUCCESS;
523+
524+
if (!stream->require(2)) {
525+
ret = ERROR_STREAM_CASTER_TS_AF;
526+
srs_error("ts: demux af failed. ret=%d", ret);
527+
return ret;
528+
}
529+
adaption_field_length = stream->read_1bytes();
530+
531+
// When the adaptation_field_control value is '11', the value of the adaptation_field_length shall
532+
// be in the range 0 to 182.
533+
if (packet->adaption_field_control == SrsTsAdaptationFieldTypeBoth && adaption_field_length > 182) {
534+
ret = ERROR_STREAM_CASTER_TS_AF;
535+
srs_error("ts: demux af length failed, must in [0, 182], actual=%d. ret=%d", adaption_field_length, ret);
536+
return ret;
537+
}
538+
// When the adaptation_field_control value is '10', the value of the adaptation_field_length shall
539+
// be 183.
540+
if (packet->adaption_field_control == SrsTsAdaptationFieldTypeAdaptionOnly && adaption_field_length != 183) {
541+
ret = ERROR_STREAM_CASTER_TS_AF;
542+
srs_error("ts: demux af length failed, must be 183, actual=%d. ret=%d", adaption_field_length, ret);
543+
return ret;
544+
}
545+
546+
// no adaptation field.
547+
if (adaption_field_length == 0) {
548+
srs_info("ts: demux af empty.");
549+
return ret;
550+
}
551+
552+
// the adaptation field start at here.
553+
int pos_af = stream->pos();
554+
int8_t tmpv = stream->read_1bytes();
555+
556+
discontinuity_indicator = (tmpv >> 7) & 0x01;
557+
random_access_indicator = (tmpv >> 6) & 0x01;
558+
elementary_stream_priority_indicator = (tmpv >> 5) & 0x01;
559+
PCR_flag = (tmpv >> 4) & 0x01;
560+
OPCR_flag = (tmpv >> 3) & 0x01;
561+
splicing_point_flag = (tmpv >> 2) & 0x01;
562+
transport_private_data_flag = (tmpv >> 1) & 0x01;
563+
adaptation_field_extension_flag = (tmpv >> 0) & 0x01;
564+
565+
if (PCR_flag) {
566+
if (!stream->require(6)) {
567+
ret = ERROR_STREAM_CASTER_TS_AF;
568+
srs_error("ts: demux af PCR_flag failed. ret=%d", ret);
569+
return ret;
570+
}
571+
572+
char* pp = NULL;
573+
char* p = stream->data();
574+
stream->skip(6);
575+
576+
pp = (char*)&program_clock_reference_base;
577+
pp[5] = *p++;
578+
pp[4] = *p++;
579+
pp[3] = *p++;
580+
pp[2] = *p++;
581+
pp[1] = *p++;
582+
pp[0] = *p++;
583+
584+
// @remark, use pcr base and ignore the extension
585+
// @see https://github.com/winlinvip/simple-rtmp-server/issues/250#issuecomment-71349370
586+
program_clock_reference_extension = program_clock_reference_base & 0x1ff;
587+
program_clock_reference_base = (program_clock_reference_base >> 15) & 0x1ffffffffLL;
588+
}
589+
590+
if (OPCR_flag) {
591+
if (!stream->require(6)) {
592+
ret = ERROR_STREAM_CASTER_TS_AF;
593+
srs_error("ts: demux af OPCR_flag failed. ret=%d", ret);
594+
return ret;
595+
}
596+
597+
char* pp = NULL;
598+
char* p = stream->data();
599+
stream->skip(6);
600+
601+
pp = (char*)&original_program_clock_reference_base;
602+
pp[5] = *p++;
603+
pp[4] = *p++;
604+
pp[3] = *p++;
605+
pp[2] = *p++;
606+
pp[1] = *p++;
607+
pp[0] = *p++;
608+
609+
// @remark, use pcr base and ignore the extension
610+
// @see https://github.com/winlinvip/simple-rtmp-server/issues/250#issuecomment-71349370
611+
original_program_clock_reference_extension = program_clock_reference_base & 0x1ff;
612+
original_program_clock_reference_base = (program_clock_reference_base >> 15) & 0x1ffffffffLL;
613+
}
614+
615+
if (splicing_point_flag) {
616+
if (!stream->require(1)) {
617+
ret = ERROR_STREAM_CASTER_TS_AF;
618+
srs_error("ts: demux af splicing_point_flag failed. ret=%d", ret);
619+
return ret;
620+
}
621+
splice_countdown = stream->read_1bytes();
622+
}
623+
624+
if (transport_private_data_flag) {
625+
if (!stream->require(1)) {
626+
ret = ERROR_STREAM_CASTER_TS_AF;
627+
srs_error("ts: demux af transport_private_data_flag failed. ret=%d", ret);
628+
return ret;
629+
}
630+
transport_private_data_length = (u_int8_t)stream->read_1bytes();
631+
632+
if (!stream->require(transport_private_data_length)) {
633+
ret = ERROR_STREAM_CASTER_TS_AF;
634+
srs_error("ts: demux af transport_private_data_flag failed. ret=%d", ret);
635+
return ret;
636+
}
637+
srs_freep(transport_private_data);
638+
transport_private_data = new char[transport_private_data_length];
639+
stream->read_bytes(transport_private_data, transport_private_data_length);
640+
}
641+
642+
if (adaptation_field_extension_flag) {
643+
int pos_af_ext = stream->pos();
644+
645+
if (!stream->require(2)) {
646+
ret = ERROR_STREAM_CASTER_TS_AF;
647+
srs_error("ts: demux af adaptation_field_extension_flag failed. ret=%d", ret);
648+
return ret;
649+
}
650+
adaptation_field_extension_length = (u_int8_t)stream->read_1bytes();
651+
ltw_flag = stream->read_1bytes();
652+
653+
piecewise_rate_flag = (ltw_flag >> 6) & 0x01;
654+
seamless_splice_flag = (ltw_flag >> 5) & 0x01;
655+
ltw_flag = (ltw_flag >> 7) & 0x01;
656+
657+
if (ltw_flag) {
658+
if (!stream->require(2)) {
659+
ret = ERROR_STREAM_CASTER_TS_AF;
660+
srs_error("ts: demux af ltw_flag failed. ret=%d", ret);
661+
return ret;
662+
}
663+
ltw_offset = stream->read_2bytes();
664+
665+
ltw_valid_flag = (ltw_offset >> 15) &0x01;
666+
ltw_offset &= 0x7FFF;
667+
}
668+
669+
if (piecewise_rate_flag) {
670+
if (!stream->require(3)) {
671+
ret = ERROR_STREAM_CASTER_TS_AF;
672+
srs_error("ts: demux af piecewise_rate_flag failed. ret=%d", ret);
673+
return ret;
674+
}
675+
piecewise_rate = stream->read_3bytes();
676+
677+
piecewise_rate &= 0x3FFFFF;
678+
}
679+
680+
if (seamless_splice_flag) {
681+
if (!stream->require(5)) {
682+
ret = ERROR_STREAM_CASTER_TS_AF;
683+
srs_error("ts: demux af seamless_splice_flag failed. ret=%d", ret);
684+
return ret;
685+
}
686+
marker_bit0 = stream->read_1bytes();
687+
DTS_next_AU1 = stream->read_2bytes();
688+
DTS_next_AU2 = stream->read_2bytes();
689+
690+
splice_type = (marker_bit0 >> 4) & 0x0F;
691+
DTS_next_AU0 = (marker_bit0 >> 1) & 0x07;
692+
marker_bit0 &= 0x01;
693+
694+
marker_bit1 = DTS_next_AU1 & 0x01;
695+
DTS_next_AU1 = (DTS_next_AU1 >> 1) & 0x7FFF;
696+
697+
marker_bit2 = DTS_next_AU2 & 0x01;
698+
DTS_next_AU2 = (DTS_next_AU2 >> 1) & 0x7FFF;
699+
}
700+
701+
nb_af_ext_reserved = adaptation_field_extension_length - (stream->pos() - pos_af_ext);
702+
stream->skip(nb_af_ext_reserved);
703+
}
704+
705+
nb_af_reserved = adaption_field_length - (stream->pos() - pos_af);
706+
stream->skip(nb_af_reserved);
707+
708+
srs_info("ts: af parsed, discontinuity=%d random=%d priority=%d PCR=%d OPCR=%d slicing=%d private=%d extension=%d/%d pcr=%"PRId64"/%d opcr=%"PRId64"/%d",
709+
discontinuity_indicator, random_access_indicator, elementary_stream_priority_indicator, PCR_flag, OPCR_flag, splicing_point_flag,
710+
transport_private_data_flag, adaptation_field_extension_flag, adaptation_field_extension_length, program_clock_reference_base,
711+
program_clock_reference_extension, original_program_clock_reference_base, original_program_clock_reference_extension);
712+
713+
return ret;
459714
}
460715

461716
SrsTSMuxer::SrsTSMuxer(SrsFileWriter* w)

0 commit comments

Comments
 (0)