Skip to content

Commit 35ab0ea

Browse files
committed
for ossrs#250, use buffer to cache bytes, for system will split the udp packet.
1 parent b38bae2 commit 35ab0ea

File tree

2 files changed

+35
-5
lines changed

2 files changed

+35
-5
lines changed

trunk/src/app/srs_app_mpegts_udp.cpp

+33-5
Original file line numberDiff line numberDiff line change
@@ -35,18 +35,21 @@ using namespace std;
3535
#include <srs_kernel_ts.hpp>
3636
#include <srs_kernel_stream.hpp>
3737
#include <srs_kernel_ts.hpp>
38+
#include <srs_kernel_buffer.hpp>
3839

3940
#ifdef SRS_AUTO_STREAM_CASTER
4041

4142
SrsMpegtsOverUdp::SrsMpegtsOverUdp(SrsConfDirective* c)
4243
{
4344
stream = new SrsStream();
4445
context = new SrsTsContext();
46+
buffer = new SrsSimpleBuffer();
4547
output = _srs_config->get_stream_caster_output(c);
4648
}
4749

4850
SrsMpegtsOverUdp::~SrsMpegtsOverUdp()
4951
{
52+
srs_freep(buffer);
5053
srs_freep(stream);
5154
srs_freep(context);
5255
}
@@ -58,16 +61,36 @@ int SrsMpegtsOverUdp::on_udp_packet(sockaddr_in* from, char* buf, int nb_buf)
5861
std::string peer_ip = inet_ntoa(from->sin_addr);
5962
int peer_port = ntohs(from->sin_port);
6063

64+
// append to buffer.
65+
buffer->append(buf, nb_buf);
66+
67+
// find the sync byte of mpegts.
68+
char* p = buffer->bytes();
69+
for (int i = 0; i < buffer->length(); i++) {
70+
if (p[i] != 0x47) {
71+
continue;
72+
}
73+
74+
if (i > 0) {
75+
buffer->erase(i);
76+
}
77+
break;
78+
}
79+
6180
// drop ts packet when size not modulus by 188
62-
if (nb_buf < SRS_TS_PACKET_SIZE || (nb_buf % SRS_TS_PACKET_SIZE) != 0) {
63-
srs_warn("udp: drop %s:%d packet %d bytes", peer_ip.c_str(), peer_port, nb_buf);
81+
if (buffer->length() < SRS_TS_PACKET_SIZE) {
82+
srs_info("udp: wait %s:%d packet %d/%d bytes",
83+
peer_ip.c_str(), peer_port, nb_buf, buffer->length());
6484
return ret;
6585
}
66-
srs_info("udp: got %s:%d packet %d bytes", peer_ip.c_str(), peer_port, nb_buf);
86+
srs_info("udp: got %s:%d packet %d/%d bytes",
87+
peer_ip.c_str(), peer_port, nb_buf, buffer->length());
6788

6889
// use stream to parse ts packet.
69-
for (int i = 0; i < nb_buf; i += SRS_TS_PACKET_SIZE) {
70-
if ((ret = stream->initialize(buf + i, SRS_TS_PACKET_SIZE)) != ERROR_SUCCESS) {
90+
int nb_packet = buffer->length() / SRS_TS_PACKET_SIZE;
91+
for (int i = 0; i < nb_packet; i++) {
92+
char* p = buffer->bytes() + (i * SRS_TS_PACKET_SIZE);
93+
if ((ret = stream->initialize(p, SRS_TS_PACKET_SIZE)) != ERROR_SUCCESS) {
7194
return ret;
7295
}
7396

@@ -80,6 +103,11 @@ int SrsMpegtsOverUdp::on_udp_packet(sockaddr_in* from, char* buf, int nb_buf)
80103
}
81104
srs_info("mpegts: parse udp packet completed");
82105

106+
// erase consumed bytes
107+
if (nb_packet > 0) {
108+
buffer->erase(nb_packet * SRS_TS_PACKET_SIZE);
109+
}
110+
83111
return ret;
84112
}
85113

trunk/src/app/srs_app_mpegts_udp.hpp

+2
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ class sockaddr_in;
3636
class SrsStream;
3737
class SrsTsContext;
3838
class SrsConfDirective;
39+
class SrsSimpleBuffer;
3940

4041
#ifdef SRS_AUTO_STREAM_CASTER
4142

@@ -49,6 +50,7 @@ class SrsMpegtsOverUdp : public ISrsTsHandler
4950
private:
5051
SrsStream* stream;
5152
SrsTsContext* context;
53+
SrsSimpleBuffer* buffer;
5254
std::string output;
5355
public:
5456
SrsMpegtsOverUdp(SrsConfDirective* c);

0 commit comments

Comments
 (0)