Skip to content

Commit

Permalink
for bug #241, merge big chunks for publish, no use.
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Dec 2, 2014
1 parent 463e1fb commit 6b57597
Show file tree
Hide file tree
Showing 10 changed files with 90 additions and 21 deletions.
10 changes: 8 additions & 2 deletions trunk/src/app/srs_app_recv_thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,10 +288,16 @@ void SrsPublishRecvThread::on_thread_start()
{
// we donot set the auto response to false,
// for the main thread never send message.

// notice the protocol stack to merge chunks to big buffer.
// for example, the buffer is 64KB=512kb, it's 1s buffer for 500kbps video stream.
// so we can use read_fullly(64KB) to merge all chunks in 1s.
// @see https://github.com/winlinvip/simple-rtmp-server/issues/241
rtmp->set_merge_chunks(true);
}

void SrsPublishRecvThread::on_thread_stop()
{
// we donot set the auto response to true,
// for we donot set to false yet.
// revert state
rtmp->set_merge_chunks(false);
}
25 changes: 20 additions & 5 deletions trunk/src/kernel/srs_kernel_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_kernel_error.hpp>
#include <srs_kernel_log.hpp>

#define SOCKET_READ_SIZE 4096
// 4096=4KB
// 16384=16KB
// 65536=64KB
#define SOCKET_READ_SIZE 16384

ISrsBufferReader::ISrsBufferReader()
{
Expand All @@ -38,10 +41,13 @@ ISrsBufferReader::~ISrsBufferReader()

SrsBuffer::SrsBuffer()
{
merge_chunks_in_big_buffer = false;
buffer = new char[SOCKET_READ_SIZE];
}

SrsBuffer::~SrsBuffer()
{
srs_freep(buffer);
}

int SrsBuffer::length()
Expand Down Expand Up @@ -88,11 +94,15 @@ int SrsBuffer::grow(ISrsBufferReader* reader, int required_size)
}

while (length() < required_size) {
char buffer[SOCKET_READ_SIZE];

ssize_t nread;
if ((ret = reader->read(buffer, SOCKET_READ_SIZE, &nread)) != ERROR_SUCCESS) {
return ret;
if (merge_chunks_in_big_buffer) {
if ((ret = reader->read_fully(buffer, SOCKET_READ_SIZE, &nread)) != ERROR_SUCCESS) {
return ret;
}
} else {
if ((ret = reader->read(buffer, SOCKET_READ_SIZE, &nread)) != ERROR_SUCCESS) {
return ret;
}
}

srs_assert((int)nread > 0);
Expand All @@ -102,4 +112,9 @@ int SrsBuffer::grow(ISrsBufferReader* reader, int required_size)
return ret;
}

void SrsBuffer::set_merge_chunks(bool v)
{
merge_chunks_in_big_buffer = v;
}


26 changes: 26 additions & 0 deletions trunk/src/kernel/srs_kernel_buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,16 @@ class ISrsBufferReader
virtual ~ISrsBufferReader();
// for protocol/amf0/msg-codec
public:
/**
* read some bytes of data.
* @param nread, the actually read size, NULL to ignore.
*/
virtual int read(void* buf, size_t size, ssize_t* nread) = 0;
/**
* read specified size bytes of data
* @param nread, the actually read size, NULL to ignore.
*/
virtual int read_fully(void* buf, size_t size, ssize_t* nread) = 0;
};

/**
Expand All @@ -53,6 +62,15 @@ class SrsBuffer
{
private:
std::vector<char> data;
/**
* notice the protocol stack to merge chunks to big buffer.
* for example, the buffer is 64KB=512kb, it's 1s buffer for 500kbps video stream.
* so we can use read_fullly(64KB) to merge all chunks in 1s.
* @see https://github.com/winlinvip/simple-rtmp-server/issues/241
*/
bool merge_chunks_in_big_buffer;
// the socket recv buffer.
char* buffer;
public:
SrsBuffer();
virtual ~SrsBuffer();
Expand Down Expand Up @@ -89,6 +107,14 @@ class SrsBuffer
* @remark, we actually maybe read more than required_size, maybe 4k for example.
*/
virtual int grow(ISrsBufferReader* reader, int required_size);
public:
/**
* notice the protocol stack to merge chunks to big buffer.
* for example, the buffer is 64KB=512kb, it's 1s buffer for 500kbps video stream.
* so we can use read_fullly(64KB) to merge all chunks in 1s.
* @see https://github.com/winlinvip/simple-rtmp-server/issues/241
*/
virtual void set_merge_chunks(bool v);
};

#endif
20 changes: 6 additions & 14 deletions trunk/src/rtmp/srs_protocol_io.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,16 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
| IBufferReader | | IStatistic | | IBufferWriter |
+---------------+ +--------------------+ +---------------+
| + read() | | + get_recv_bytes() | | + write() |
+------+--------+ | + get_recv_bytes() | | + writev() |
/ \ +---+--------------+-+ +-------+-------+
| / \ / \ / \
| + readfully() | | + get_recv_bytes() | | + writev() |
+------+--------+ +---+--------------+-+ +-------+-------+
/ \ / \ / \ / \
| | | |
+------+------------------+-+ +-----+----------------+--+
| IProtocolReader | | IProtocolWriter |
+---------------------------+ +-------------------------+
| + readfully() | | + set_send_timeout() |
| + set_recv_timeout() | +-------+-----------------+
+------------+--------------+ / \
/ \ |
| + set_recv_timeout() | | + set_send_timeout() |
+------------+--------------+ +-------+-----------------+
/ \ / \
| |
+--+-----------------------------+-+
| IProtocolReaderWriter |
Expand Down Expand Up @@ -123,13 +122,6 @@ class ISrsProtocolReader : public virtual ISrsBufferReader, public virtual ISrsP
* get the recv timeout in us.
*/
virtual int64_t get_recv_timeout() = 0;
// for handshake.
public:
/**
* read specified size bytes of data
* @param nread, the actually read size, NULL to ignore.
*/
virtual int read_fully(void* buf, size_t size, ssize_t* nread) = 0;
};

/**
Expand Down
5 changes: 5 additions & 0 deletions trunk/src/rtmp/srs_protocol_rtmp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,11 @@ void SrsRtmpServer::set_auto_response(bool v)
protocol->set_auto_response(v);
}

void SrsRtmpServer::set_merge_chunks(bool v)
{
protocol->set_merge_chunks(v);
}

void SrsRtmpServer::set_recv_timeout(int64_t timeout_us)
{
protocol->set_recv_timeout(timeout_us);
Expand Down
7 changes: 7 additions & 0 deletions trunk/src/rtmp/srs_protocol_rtmp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,13 @@ class SrsRtmpServer
*/
virtual void set_auto_response(bool v);
/**
* notice the protocol stack to merge chunks to big buffer.
* for example, the buffer is 64KB=512kb, it's 1s buffer for 500kbps video stream.
* so we can use read_fullly(64KB) to merge all chunks in 1s.
* @see https://github.com/winlinvip/simple-rtmp-server/issues/241
*/
virtual void set_merge_chunks(bool v);
/**
* set/get the recv timeout in us.
* if timeout, recv/send message return ERROR_SOCKET_TIMEOUT.
*/
Expand Down
5 changes: 5 additions & 0 deletions trunk/src/rtmp/srs_protocol_stack.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,11 @@ int SrsProtocol::manual_response_flush()
return ret;
}

void SrsProtocol::set_merge_chunks(bool v)
{
in_buffer->set_merge_chunks(v);
}

void SrsProtocol::set_recv_timeout(int64_t timeout_us)
{
return skt->set_recv_timeout(timeout_us);
Expand Down
7 changes: 7 additions & 0 deletions trunk/src/rtmp/srs_protocol_stack.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,13 @@ class SrsProtocol
* @see the auto_response_when_recv and manual_response_queue.
*/
virtual int manual_response_flush();
/**
* notice the protocol stack to merge chunks to big buffer.
* for example, the buffer is 64KB=512kb, it's 1s buffer for 500kbps video stream.
* so we can use read_fullly(64KB) to merge all chunks in 1s.
* @see https://github.com/winlinvip/simple-rtmp-server/issues/241
*/
virtual void set_merge_chunks(bool v);
public:
/**
* set/get the recv timeout in us.
Expand Down
5 changes: 5 additions & 0 deletions trunk/src/utest/srs_utest_kernel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,11 @@ int MockBufferReader::read(void* buf, size_t size, ssize_t* nread)
return ERROR_SUCCESS;
}

int MockBufferReader::read_fully(void* buf, size_t size, ssize_t* nread)
{
return read(buf, size, nread);
}

#ifdef ENABLE_UTEST_KERNEL

VOID TEST(KernelBufferTest, DefaultObject)
Expand Down
1 change: 1 addition & 0 deletions trunk/src/utest/srs_utest_kernel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class MockBufferReader: public ISrsBufferReader
virtual ~MockBufferReader();
public:
virtual int read(void* buf, size_t size, ssize_t* nread);
virtual int read_fully(void* buf, size_t size, ssize_t* nread);
};

class MockSrsFileWriter : public SrsFileWriter
Expand Down

0 comments on commit 6b57597

Please sign in to comment.