Skip to content

Commit

Permalink
performance refine, support 3k+ connections(270kbps). 0.9.130
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Jun 22, 2014
1 parent e9c96af commit 1ae3e6c
Show file tree
Hide file tree
Showing 18 changed files with 230 additions and 162 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ Supported operating systems and hardware:
* 2013-10-17, Created.<br/>

## History
* v1.0, 2014-06-22, performance refine, support 3k+ connections(270kbps). 0.9.130
* v1.0, 2014-06-21, support edge [token traverse](https://github.com/winlinvip/simple-rtmp-server/wiki/DRM#tokentraverse), fix [#104](https://github.com/winlinvip/simple-rtmp-server/issues/104). 0.9.129
* v1.0, 2014-06-19, add connections count to api summaries. 0.9.127
* v1.0, 2014-06-19, add srs bytes and kbps to api summaries. 0.9.126
Expand Down
2 changes: 1 addition & 1 deletion trunk/conf/full.conf
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ srs_log_level trace;
srs_log_file ./objs/srs.log;
# the max connections.
# if exceed the max connections, server will drop the new connection.
# default: 2000
# default: 12345
max_connections 1000;
# whether start as deamon
# @remark: donot support reload.
Expand Down
2 changes: 1 addition & 1 deletion trunk/configure
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ MODULE_ID="RTMP"
MODULE_DEPENDS=("CORE" "KERNEL")
ModuleLibIncs=(${SRS_OBJS} ${LibSSLRoot})
MODULE_FILES=("srs_protocol_amf0" "srs_protocol_io" "srs_protocol_rtmp_stack" "srs_protocol_rtmp"
"srs_protocol_handshake" "srs_protocol_utility")
"srs_protocol_handshake" "srs_protocol_utility" "srs_protocol_msg_array")
RTMP_INCS="src/rtmp"; MODULE_DIR=${RTMP_INCS} . auto/modules.sh
RTMP_OBJS="${MODULE_OBJS[@]}"
#
Expand Down
2 changes: 1 addition & 1 deletion trunk/src/app/srs_app_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1403,7 +1403,7 @@ int SrsConfig::get_max_connections()

SrsConfDirective* conf = root->get("max_connections");
if (!conf || conf->arg0().empty()) {
return 2000;
return SRS_CONF_DEFAULT_MAX_CONNECTIONS;
}

return ::atoi(conf->arg0().c_str());
Expand Down
1 change: 1 addition & 0 deletions trunk/src/app/srs_app_config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#define SRS_CONF_DEFAULT_PID_FILE "./objs/srs.pid"
#define SRS_DEFAULT_CONF "conf/srs.conf"

#define SRS_CONF_DEFAULT_MAX_CONNECTIONS 12345
#define SRS_CONF_DEFAULT_HLS_PATH "./objs/nginx/html"
#define SRS_CONF_DEFAULT_HLS_FRAGMENT 10
#define SRS_CONF_DEFAULT_HLS_WINDOW 60
Expand Down
12 changes: 7 additions & 5 deletions trunk/src/app/srs_app_edge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_app_socket.hpp>
#include <srs_app_kbps.hpp>
#include <srs_kernel_utility.hpp>
#include <srs_protocol_msg_array.hpp>

// when error, edge ingester sleep for a while and retry.
#define SRS_EDGE_INGESTER_SLEEP_US (int64_t)(1*1000*1000LL)
Expand Down Expand Up @@ -431,13 +432,16 @@ void SrsEdgeForwarder::stop()
kbps->set_io(NULL, NULL);
}

#define SYS_MAX_EDGE_SEND_MSGS 128
int SrsEdgeForwarder::cycle()
{
int ret = ERROR_SUCCESS;

client->set_recv_timeout(SRS_PULSE_TIMEOUT_US);

SrsPithyPrint pithy_print(SRS_STAGE_EDGE);

SrsSharedPtrMessageArray msgs(SYS_MAX_EDGE_SEND_MSGS);

while (pthread->can_loop()) {
// switch to other st-threads.
Expand Down Expand Up @@ -465,8 +469,7 @@ int SrsEdgeForwarder::cycle()

// forward all messages.
int count = 0;
SrsSharedPtrMessage** msgs = NULL;
if ((ret = queue->get_packets(0, msgs, count)) != ERROR_SUCCESS) {
if ((ret = queue->dump_packets(msgs.size, msgs.msgs, count)) != ERROR_SUCCESS) {
srs_error("get message to forward to origin failed. ret=%d", ret);
return ret;
}
Expand All @@ -488,16 +491,15 @@ int SrsEdgeForwarder::cycle()
srs_verbose("no packets to forward.");
continue;
}
SrsAutoFreeArray(SrsSharedPtrMessage, msgs, count);

// all msgs to forward to origin.
// @remark, becareful, all msgs must be free explicitly,
// free by send_and_free_message or srs_freep.
for (int i = 0; i < count; i++) {
SrsSharedPtrMessage* msg = msgs[i];
SrsSharedPtrMessage* msg = msgs.msgs[i];

srs_assert(msg);
msgs[i] = NULL;
msgs.msgs[i] = NULL;

if ((ret = client->send_and_free_message(msg, stream_id)) != ERROR_SUCCESS) {
srs_error("edge publish forwarder send message to server failed. ret=%d", ret);
Expand Down
12 changes: 7 additions & 5 deletions trunk/src/app/srs_app_forward.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_protocol_rtmp.hpp>
#include <srs_app_kbps.hpp>
#include <srs_kernel_utility.hpp>
#include <srs_protocol_msg_array.hpp>

// when error, forwarder sleep for a while and retry.
#define SRS_FORWARDER_SLEEP_US (int64_t)(3*1000*1000LL)
Expand Down Expand Up @@ -309,6 +310,7 @@ int SrsForwarder::connect_server()
return ret;
}

#define SYS_MAX_FORWARD_SEND_MSGS 128
int SrsForwarder::forward()
{
int ret = ERROR_SUCCESS;
Expand All @@ -317,6 +319,8 @@ int SrsForwarder::forward()

SrsPithyPrint pithy_print(SRS_STAGE_FORWARDER);

SrsSharedPtrMessageArray msgs(SYS_MAX_FORWARD_SEND_MSGS);

while (pthread->can_loop()) {
// switch to other st-threads.
st_usleep(0);
Expand All @@ -339,8 +343,7 @@ int SrsForwarder::forward()

// forward all messages.
int count = 0;
SrsSharedPtrMessage** msgs = NULL;
if ((ret = queue->get_packets(0, msgs, count)) != ERROR_SUCCESS) {
if ((ret = queue->dump_packets(msgs.size, msgs.msgs, count)) != ERROR_SUCCESS) {
srs_error("get message to forward failed. ret=%d", ret);
return ret;
}
Expand All @@ -360,16 +363,15 @@ int SrsForwarder::forward()
srs_verbose("no packets to forward.");
continue;
}
SrsAutoFreeArray(SrsSharedPtrMessage, msgs, count);

// all msgs to forward.
// @remark, becareful, all msgs must be free explicitly,
// free by send_and_free_message or srs_freep.
for (int i = 0; i < count; i++) {
SrsSharedPtrMessage* msg = msgs[i];
SrsSharedPtrMessage* msg = msgs.msgs[i];

srs_assert(msg);
msgs[i] = NULL;
msgs.msgs[i] = NULL;

if ((ret = client->send_and_free_message(msg, stream_id)) != ERROR_SUCCESS) {
srs_error("forwarder send message to server failed. ret=%d", ret);
Expand Down
82 changes: 46 additions & 36 deletions trunk/src/app/srs_app_rtmp_conn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ using namespace std;
#include <srs_app_utility.hpp>
#include <srs_protocol_utility.hpp>
#include <srs_kernel_utility.hpp>
#include <srs_protocol_msg_array.hpp>

// when stream is busy, for example, streaming is already
// publishing, when a new client to request to publish,
Expand Down Expand Up @@ -382,7 +383,7 @@ int SrsRtmpConn::stream_service_cycle()
}

srs_info("start to publish stream %s success", req->stream.c_str());
ret = fmle_publish(source);
ret = fmle_publishing(source);

// when edge, notice edge to change state.
// when origin, notice all service to unpublish.
Expand Down Expand Up @@ -416,7 +417,7 @@ int SrsRtmpConn::stream_service_cycle()
}

srs_info("flash start to publish stream %s success", req->stream.c_str());
ret = flash_publish(source);
ret = flash_publishing(source);

// when edge, notice edge to change state.
// when origin, notice all service to unpublish.
Expand Down Expand Up @@ -476,6 +477,8 @@ int SrsRtmpConn::check_vhost()
return ret;
}

#define SYS_MAX_PLAY_SEND_MSGS 128

int SrsRtmpConn::playing(SrsSource* source)
{
int ret = ERROR_SUCCESS;
Expand All @@ -499,38 +502,43 @@ int SrsRtmpConn::playing(SrsSource* source)
rtmp->set_recv_timeout(SRS_PULSE_TIMEOUT_US);

SrsPithyPrint pithy_print(SRS_STAGE_PLAY_USER);

SrsSharedPtrMessageArray msgs(SYS_MAX_PLAY_SEND_MSGS);

bool user_specified_duration_to_stop = (req->duration > 0);
int64_t starttime = -1;

while (true) {
// switch to other st-threads.
st_usleep(0);

// collect elapse for pithy print.
pithy_print.elapse();

// read from client.
if (true) {
SrsMessage* msg = NULL;
ret = rtmp->recv_message(&msg);

srs_verbose("play loop recv message. ret=%d", ret);
if (ret != ERROR_SUCCESS && ret != ERROR_SOCKET_TIMEOUT) {
if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {

if (ret == ERROR_SOCKET_TIMEOUT) {
// it's ok, do nothing.
ret = ERROR_SUCCESS;
} else if (ret != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) {
srs_error("recv client control message failed. ret=%d", ret);
}
return ret;
}
if ((ret = process_play_control_msg(consumer, msg)) != ERROR_SUCCESS) {
if (!srs_is_system_control_error(ret)) {
srs_error("process play control message failed. ret=%d", ret);
} else {
if ((ret = process_play_control_msg(consumer, msg)) != ERROR_SUCCESS) {
if (!srs_is_system_control_error(ret)) {
srs_error("process play control message failed. ret=%d", ret);
}
return ret;
}
return ret;
}
}

// get messages from consumer.
SrsSharedPtrMessage** msgs = NULL;
int count = 0;
if ((ret = consumer->get_packets(0, msgs, count)) != ERROR_SUCCESS) {
if ((ret = consumer->dump_packets(msgs.size, msgs.msgs, count)) != ERROR_SUCCESS) {
srs_error("get messages from consumer failed. ret=%d", ret);
return ret;
}
Expand All @@ -545,32 +553,29 @@ int SrsRtmpConn::playing(SrsSource* source)
kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m());
}

if (count <= 0) {
srs_verbose("no packets in queue.");
continue;
}
SrsAutoFreeArray(SrsSharedPtrMessage, msgs, count);

// sendout messages
// @remark, becareful, all msgs must be free explicitly,
// free by send_and_free_message or srs_freep.
for (int i = 0; i < count; i++) {
SrsSharedPtrMessage* msg = msgs[i];
SrsSharedPtrMessage* msg = msgs.msgs[i];

// the send_message will free the msg,
// so set the msgs[i] to NULL.
msgs[i] = NULL;

srs_assert(msg);
msgs.msgs[i] = NULL;

// foreach msg, collect the duration.
// @remark: never use msg when sent it, for the protocol sdk will free it.
if (starttime < 0 || starttime > msg->header.timestamp) {
// only when user specifies the duration,
// we start to collect the durations for each message.
if (user_specified_duration_to_stop) {
// foreach msg, collect the duration.
// @remark: never use msg when sent it, for the protocol sdk will free it.
if (starttime < 0 || starttime > msg->header.timestamp) {
starttime = msg->header.timestamp;
}
duration += msg->header.timestamp - starttime;
starttime = msg->header.timestamp;
}
duration += msg->header.timestamp - starttime;
starttime = msg->header.timestamp;

// no need to assert msg, for the rtmp will assert it.
if ((ret = rtmp->send_and_free_message(msg, res->stream_id)) != ERROR_SUCCESS) {
srs_error("send message to client failed. ret=%d", ret);
return ret;
Expand All @@ -579,17 +584,22 @@ int SrsRtmpConn::playing(SrsSource* source)

// if duration specified, and exceed it, stop play live.
// @see: https://github.com/winlinvip/simple-rtmp-server/issues/45
if (req->duration > 0 && duration >= (int64_t)req->duration) {
ret = ERROR_RTMP_DURATION_EXCEED;
srs_trace("stop live for duration exceed. ret=%d", ret);
return ret;
if (user_specified_duration_to_stop) {
if (duration >= (int64_t)req->duration) {
ret = ERROR_RTMP_DURATION_EXCEED;
srs_trace("stop live for duration exceed. ret=%d", ret);
return ret;
}
}

// switch to other threads, to anti dead loop.
st_usleep(0);
}

return ret;
}

int SrsRtmpConn::fmle_publish(SrsSource* source)
int SrsRtmpConn::fmle_publishing(SrsSource* source)
{
int ret = ERROR_SUCCESS;

Expand Down Expand Up @@ -668,7 +678,7 @@ int SrsRtmpConn::fmle_publish(SrsSource* source)
return ret;
}

int SrsRtmpConn::flash_publish(SrsSource* source)
int SrsRtmpConn::flash_publishing(SrsSource* source)
{
int ret = ERROR_SUCCESS;

Expand Down
5 changes: 3 additions & 2 deletions trunk/src/app/srs_app_rtmp_conn.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class SrsHttpHooks;
class SrsBandwidth;
class SrsKbps;
class SrsRtmpClient;
class SrsSharedPtrMessage;

/**
* the client provides the main logic control for RTMP clients.
Expand Down Expand Up @@ -87,8 +88,8 @@ class SrsRtmpConn : public virtual SrsConnection, public virtual ISrsReloadHandl
virtual int stream_service_cycle();
virtual int check_vhost();
virtual int playing(SrsSource* source);
virtual int fmle_publish(SrsSource* source);
virtual int flash_publish(SrsSource* source);
virtual int fmle_publishing(SrsSource* source);
virtual int flash_publishing(SrsSource* source);
virtual int process_publish_message(SrsSource* source, SrsMessage* msg, bool vhost_is_edge);
virtual int process_play_control_msg(SrsConsumer* consumer, SrsMessage* msg);
private:
Expand Down
Loading

1 comment on commit 1ae3e6c

@winlinvip
Copy link
Member Author

@winlinvip winlinvip commented on 1ae3e6c Apr 24, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SRS1是1.8K连接,SRS2的起点2.7K。
这个Commit没有实质性的性能提升,是SRS2在SRS1基础上做的所有优化的结果。

Please sign in to comment.