From 1ae3e6c64cc5cee90e6050c26968ebc3c18281be Mon Sep 17 00:00:00 2001 From: winlin Date: Sun, 22 Jun 2014 20:01:25 +0800 Subject: [PATCH] performance refine, support 3k+ connections(270kbps). 0.9.130 --- README.md | 1 + trunk/conf/full.conf | 2 +- trunk/configure | 2 +- trunk/src/app/srs_app_config.cpp | 2 +- trunk/src/app/srs_app_config.hpp | 1 + trunk/src/app/srs_app_edge.cpp | 12 ++-- trunk/src/app/srs_app_forward.cpp | 12 ++-- trunk/src/app/srs_app_rtmp_conn.cpp | 82 ++++++++++++---------- trunk/src/app/srs_app_rtmp_conn.hpp | 5 +- trunk/src/app/srs_app_source.cpp | 66 ++++++++--------- trunk/src/app/srs_app_source.hpp | 23 +++--- trunk/src/core/srs_core.hpp | 2 +- trunk/src/core/srs_core_autofree.hpp | 49 ------------- trunk/src/rtmp/srs_protocol_msg_array.cpp | 51 ++++++++++++++ trunk/src/rtmp/srs_protocol_msg_array.hpp | 53 ++++++++++++++ trunk/src/rtmp/srs_protocol_rtmp_stack.cpp | 23 +++--- trunk/src/rtmp/srs_protocol_rtmp_stack.hpp | 4 +- trunk/src/srs/srs.upp | 2 + 18 files changed, 230 insertions(+), 162 deletions(-) create mode 100644 trunk/src/rtmp/srs_protocol_msg_array.cpp create mode 100644 trunk/src/rtmp/srs_protocol_msg_array.hpp diff --git a/README.md b/README.md index 57434e97c6..ada1c932b1 100755 --- a/README.md +++ b/README.md @@ -241,6 +241,7 @@ Supported operating systems and hardware: * 2013-10-17, Created.
## History +* v1.0, 2014-06-22, performance refine, support 3k+ connections(270kbps). 0.9.130 * v1.0, 2014-06-21, support edge [token traverse](https://github.com/winlinvip/simple-rtmp-server/wiki/DRM#tokentraverse), fix [#104](https://github.com/winlinvip/simple-rtmp-server/issues/104). 0.9.129 * v1.0, 2014-06-19, add connections count to api summaries. 0.9.127 * v1.0, 2014-06-19, add srs bytes and kbps to api summaries. 0.9.126 diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index 15b401484e..e2cd62c5cc 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -36,7 +36,7 @@ srs_log_level trace; srs_log_file ./objs/srs.log; # the max connections. # if exceed the max connections, server will drop the new connection. -# default: 2000 +# default: 12345 max_connections 1000; # whether start as deamon # @remark: donot support reload. diff --git a/trunk/configure b/trunk/configure index a680ad7041..79d03d3b0e 100755 --- a/trunk/configure +++ b/trunk/configure @@ -456,7 +456,7 @@ MODULE_ID="RTMP" MODULE_DEPENDS=("CORE" "KERNEL") ModuleLibIncs=(${SRS_OBJS} ${LibSSLRoot}) MODULE_FILES=("srs_protocol_amf0" "srs_protocol_io" "srs_protocol_rtmp_stack" "srs_protocol_rtmp" - "srs_protocol_handshake" "srs_protocol_utility") + "srs_protocol_handshake" "srs_protocol_utility" "srs_protocol_msg_array") RTMP_INCS="src/rtmp"; MODULE_DIR=${RTMP_INCS} . auto/modules.sh RTMP_OBJS="${MODULE_OBJS[@]}" # diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index 307eb4531b..c18626d223 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -1403,7 +1403,7 @@ int SrsConfig::get_max_connections() SrsConfDirective* conf = root->get("max_connections"); if (!conf || conf->arg0().empty()) { - return 2000; + return SRS_CONF_DEFAULT_MAX_CONNECTIONS; } return ::atoi(conf->arg0().c_str()); diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index a2d99dd68b..b1c74589fc 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -38,6 +38,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define SRS_CONF_DEFAULT_PID_FILE "./objs/srs.pid" #define SRS_DEFAULT_CONF "conf/srs.conf" +#define SRS_CONF_DEFAULT_MAX_CONNECTIONS 12345 #define SRS_CONF_DEFAULT_HLS_PATH "./objs/nginx/html" #define SRS_CONF_DEFAULT_HLS_FRAGMENT 10 #define SRS_CONF_DEFAULT_HLS_WINDOW 60 diff --git a/trunk/src/app/srs_app_edge.cpp b/trunk/src/app/srs_app_edge.cpp index 501f097abf..d934019dc8 100644 --- a/trunk/src/app/srs_app_edge.cpp +++ b/trunk/src/app/srs_app_edge.cpp @@ -44,6 +44,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include #include +#include // when error, edge ingester sleep for a while and retry. #define SRS_EDGE_INGESTER_SLEEP_US (int64_t)(1*1000*1000LL) @@ -431,6 +432,7 @@ void SrsEdgeForwarder::stop() kbps->set_io(NULL, NULL); } +#define SYS_MAX_EDGE_SEND_MSGS 128 int SrsEdgeForwarder::cycle() { int ret = ERROR_SUCCESS; @@ -438,6 +440,8 @@ int SrsEdgeForwarder::cycle() client->set_recv_timeout(SRS_PULSE_TIMEOUT_US); SrsPithyPrint pithy_print(SRS_STAGE_EDGE); + + SrsSharedPtrMessageArray msgs(SYS_MAX_EDGE_SEND_MSGS); while (pthread->can_loop()) { // switch to other st-threads. @@ -465,8 +469,7 @@ int SrsEdgeForwarder::cycle() // forward all messages. int count = 0; - SrsSharedPtrMessage** msgs = NULL; - if ((ret = queue->get_packets(0, msgs, count)) != ERROR_SUCCESS) { + if ((ret = queue->dump_packets(msgs.size, msgs.msgs, count)) != ERROR_SUCCESS) { srs_error("get message to forward to origin failed. ret=%d", ret); return ret; } @@ -488,16 +491,15 @@ int SrsEdgeForwarder::cycle() srs_verbose("no packets to forward."); continue; } - SrsAutoFreeArray(SrsSharedPtrMessage, msgs, count); // all msgs to forward to origin. // @remark, becareful, all msgs must be free explicitly, // free by send_and_free_message or srs_freep. for (int i = 0; i < count; i++) { - SrsSharedPtrMessage* msg = msgs[i]; + SrsSharedPtrMessage* msg = msgs.msgs[i]; srs_assert(msg); - msgs[i] = NULL; + msgs.msgs[i] = NULL; if ((ret = client->send_and_free_message(msg, stream_id)) != ERROR_SUCCESS) { srs_error("edge publish forwarder send message to server failed. ret=%d", ret); diff --git a/trunk/src/app/srs_app_forward.cpp b/trunk/src/app/srs_app_forward.cpp index a9f71dea5f..d4db40e388 100644 --- a/trunk/src/app/srs_app_forward.cpp +++ b/trunk/src/app/srs_app_forward.cpp @@ -41,6 +41,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include #include +#include // when error, forwarder sleep for a while and retry. #define SRS_FORWARDER_SLEEP_US (int64_t)(3*1000*1000LL) @@ -309,6 +310,7 @@ int SrsForwarder::connect_server() return ret; } +#define SYS_MAX_FORWARD_SEND_MSGS 128 int SrsForwarder::forward() { int ret = ERROR_SUCCESS; @@ -317,6 +319,8 @@ int SrsForwarder::forward() SrsPithyPrint pithy_print(SRS_STAGE_FORWARDER); + SrsSharedPtrMessageArray msgs(SYS_MAX_FORWARD_SEND_MSGS); + while (pthread->can_loop()) { // switch to other st-threads. st_usleep(0); @@ -339,8 +343,7 @@ int SrsForwarder::forward() // forward all messages. int count = 0; - SrsSharedPtrMessage** msgs = NULL; - if ((ret = queue->get_packets(0, msgs, count)) != ERROR_SUCCESS) { + if ((ret = queue->dump_packets(msgs.size, msgs.msgs, count)) != ERROR_SUCCESS) { srs_error("get message to forward failed. ret=%d", ret); return ret; } @@ -360,16 +363,15 @@ int SrsForwarder::forward() srs_verbose("no packets to forward."); continue; } - SrsAutoFreeArray(SrsSharedPtrMessage, msgs, count); // all msgs to forward. // @remark, becareful, all msgs must be free explicitly, // free by send_and_free_message or srs_freep. for (int i = 0; i < count; i++) { - SrsSharedPtrMessage* msg = msgs[i]; + SrsSharedPtrMessage* msg = msgs.msgs[i]; srs_assert(msg); - msgs[i] = NULL; + msgs.msgs[i] = NULL; if ((ret = client->send_and_free_message(msg, stream_id)) != ERROR_SUCCESS) { srs_error("forwarder send message to server failed. ret=%d", ret); diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index dbb5f867a4..324243d6c1 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -50,6 +50,7 @@ using namespace std; #include #include #include +#include // when stream is busy, for example, streaming is already // publishing, when a new client to request to publish, @@ -382,7 +383,7 @@ int SrsRtmpConn::stream_service_cycle() } srs_info("start to publish stream %s success", req->stream.c_str()); - ret = fmle_publish(source); + ret = fmle_publishing(source); // when edge, notice edge to change state. // when origin, notice all service to unpublish. @@ -416,7 +417,7 @@ int SrsRtmpConn::stream_service_cycle() } srs_info("flash start to publish stream %s success", req->stream.c_str()); - ret = flash_publish(source); + ret = flash_publishing(source); // when edge, notice edge to change state. // when origin, notice all service to unpublish. @@ -476,6 +477,8 @@ int SrsRtmpConn::check_vhost() return ret; } +#define SYS_MAX_PLAY_SEND_MSGS 128 + int SrsRtmpConn::playing(SrsSource* source) { int ret = ERROR_SUCCESS; @@ -499,38 +502,43 @@ int SrsRtmpConn::playing(SrsSource* source) rtmp->set_recv_timeout(SRS_PULSE_TIMEOUT_US); SrsPithyPrint pithy_print(SRS_STAGE_PLAY_USER); + + SrsSharedPtrMessageArray msgs(SYS_MAX_PLAY_SEND_MSGS); + bool user_specified_duration_to_stop = (req->duration > 0); int64_t starttime = -1; + while (true) { - // switch to other st-threads. - st_usleep(0); - + // collect elapse for pithy print. pithy_print.elapse(); // read from client. if (true) { SrsMessage* msg = NULL; ret = rtmp->recv_message(&msg); - srs_verbose("play loop recv message. ret=%d", ret); - if (ret != ERROR_SUCCESS && ret != ERROR_SOCKET_TIMEOUT) { - if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) { + + if (ret == ERROR_SOCKET_TIMEOUT) { + // it's ok, do nothing. + ret = ERROR_SUCCESS; + } else if (ret != ERROR_SUCCESS) { + if (!srs_is_client_gracefully_close(ret)) { srs_error("recv client control message failed. ret=%d", ret); } return ret; - } - if ((ret = process_play_control_msg(consumer, msg)) != ERROR_SUCCESS) { - if (!srs_is_system_control_error(ret)) { - srs_error("process play control message failed. ret=%d", ret); + } else { + if ((ret = process_play_control_msg(consumer, msg)) != ERROR_SUCCESS) { + if (!srs_is_system_control_error(ret)) { + srs_error("process play control message failed. ret=%d", ret); + } + return ret; } - return ret; } } // get messages from consumer. - SrsSharedPtrMessage** msgs = NULL; int count = 0; - if ((ret = consumer->get_packets(0, msgs, count)) != ERROR_SUCCESS) { + if ((ret = consumer->dump_packets(msgs.size, msgs.msgs, count)) != ERROR_SUCCESS) { srs_error("get messages from consumer failed. ret=%d", ret); return ret; } @@ -545,32 +553,29 @@ int SrsRtmpConn::playing(SrsSource* source) kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m()); } - if (count <= 0) { - srs_verbose("no packets in queue."); - continue; - } - SrsAutoFreeArray(SrsSharedPtrMessage, msgs, count); - // sendout messages // @remark, becareful, all msgs must be free explicitly, // free by send_and_free_message or srs_freep. for (int i = 0; i < count; i++) { - SrsSharedPtrMessage* msg = msgs[i]; + SrsSharedPtrMessage* msg = msgs.msgs[i]; // the send_message will free the msg, // so set the msgs[i] to NULL. - msgs[i] = NULL; - - srs_assert(msg); + msgs.msgs[i] = NULL; - // foreach msg, collect the duration. - // @remark: never use msg when sent it, for the protocol sdk will free it. - if (starttime < 0 || starttime > msg->header.timestamp) { + // only when user specifies the duration, + // we start to collect the durations for each message. + if (user_specified_duration_to_stop) { + // foreach msg, collect the duration. + // @remark: never use msg when sent it, for the protocol sdk will free it. + if (starttime < 0 || starttime > msg->header.timestamp) { + starttime = msg->header.timestamp; + } + duration += msg->header.timestamp - starttime; starttime = msg->header.timestamp; } - duration += msg->header.timestamp - starttime; - starttime = msg->header.timestamp; + // no need to assert msg, for the rtmp will assert it. if ((ret = rtmp->send_and_free_message(msg, res->stream_id)) != ERROR_SUCCESS) { srs_error("send message to client failed. ret=%d", ret); return ret; @@ -579,17 +584,22 @@ int SrsRtmpConn::playing(SrsSource* source) // if duration specified, and exceed it, stop play live. // @see: https://github.com/winlinvip/simple-rtmp-server/issues/45 - if (req->duration > 0 && duration >= (int64_t)req->duration) { - ret = ERROR_RTMP_DURATION_EXCEED; - srs_trace("stop live for duration exceed. ret=%d", ret); - return ret; + if (user_specified_duration_to_stop) { + if (duration >= (int64_t)req->duration) { + ret = ERROR_RTMP_DURATION_EXCEED; + srs_trace("stop live for duration exceed. ret=%d", ret); + return ret; + } } + + // switch to other threads, to anti dead loop. + st_usleep(0); } return ret; } -int SrsRtmpConn::fmle_publish(SrsSource* source) +int SrsRtmpConn::fmle_publishing(SrsSource* source) { int ret = ERROR_SUCCESS; @@ -668,7 +678,7 @@ int SrsRtmpConn::fmle_publish(SrsSource* source) return ret; } -int SrsRtmpConn::flash_publish(SrsSource* source) +int SrsRtmpConn::flash_publishing(SrsSource* source) { int ret = ERROR_SUCCESS; diff --git a/trunk/src/app/srs_app_rtmp_conn.hpp b/trunk/src/app/srs_app_rtmp_conn.hpp index 3fd77a0479..fde77f068b 100644 --- a/trunk/src/app/srs_app_rtmp_conn.hpp +++ b/trunk/src/app/srs_app_rtmp_conn.hpp @@ -48,6 +48,7 @@ class SrsHttpHooks; class SrsBandwidth; class SrsKbps; class SrsRtmpClient; +class SrsSharedPtrMessage; /** * the client provides the main logic control for RTMP clients. @@ -87,8 +88,8 @@ class SrsRtmpConn : public virtual SrsConnection, public virtual ISrsReloadHandl virtual int stream_service_cycle(); virtual int check_vhost(); virtual int playing(SrsSource* source); - virtual int fmle_publish(SrsSource* source); - virtual int flash_publish(SrsSource* source); + virtual int fmle_publishing(SrsSource* source); + virtual int flash_publishing(SrsSource* source); virtual int process_publish_message(SrsSource* source, SrsMessage* msg, bool vhost_is_edge); virtual int process_play_control_msg(SrsConsumer* consumer, SrsMessage* msg); private: diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index dbcd64aaa0..a1b2403cf5 100644 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -136,7 +136,7 @@ int SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg) { int ret = ERROR_SUCCESS; - if (msg->header.is_video() || msg->header.is_audio()) { + if (msg->header.is_audio() || msg->header.is_video()) { if (av_start_time == -1) { av_start_time = msg->header.timestamp; } @@ -153,7 +153,7 @@ int SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg) return ret; } -int SrsMessageQueue::get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, int& count) +int SrsMessageQueue::dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count) { int ret = ERROR_SUCCESS; @@ -161,17 +161,8 @@ int SrsMessageQueue::get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, in return ret; } - if (max_count == 0) { - count = (int)msgs.size(); - } else { - count = srs_min(max_count, (int)msgs.size()); - } - - if (count <= 0) { - return ret; - } - - pmsgs = new SrsSharedPtrMessage*[count]; + srs_assert(max_count > 0); + count = srs_min(max_count, (int)msgs.size()); for (int i = 0; i < count; i++) { pmsgs[i] = msgs[i]; @@ -275,11 +266,11 @@ int SrsConsumer::get_time() return jitter->get_time(); } -int SrsConsumer::enqueue(SrsSharedPtrMessage* msg, int tba, int tbv) +int SrsConsumer::enqueue(SrsSharedPtrMessage* msg, bool atc, int tba, int tbv) { int ret = ERROR_SUCCESS; - if (!source->is_atc()) { + if (!atc) { if ((ret = jitter->correct(msg, tba, tbv)) != ERROR_SUCCESS) { srs_freep(msg); return ret; @@ -293,8 +284,10 @@ int SrsConsumer::enqueue(SrsSharedPtrMessage* msg, int tba, int tbv) return ret; } -int SrsConsumer::get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, int& count) +int SrsConsumer::dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count) { + srs_assert(max_count > 0); + if (should_update_source_id) { srs_trace("update source_id=%d", source->source_id()); should_update_source_id = false; @@ -305,7 +298,7 @@ int SrsConsumer::get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, int& c return ERROR_SUCCESS; } - return queue->get_packets(max_count, pmsgs, count); + return queue->dump_packets(max_count, pmsgs, count); } int SrsConsumer::on_play_client_pause(bool is_pause) @@ -391,14 +384,15 @@ void SrsGopCache::clear() cached_video_count = 0; } -int SrsGopCache::dump(SrsConsumer* consumer, int tba, int tbv) +int SrsGopCache::dump(SrsConsumer* consumer, bool atc, int tba, int tbv) { int ret = ERROR_SUCCESS; std::vector::iterator it; for (it = gop_cache.begin(); it != gop_cache.end(); ++it) { SrsSharedPtrMessage* msg = *it; - if ((ret = consumer->enqueue(msg->copy(), tba, tbv)) != ERROR_SUCCESS) { + SrsSharedPtrMessage* copy = msg->copy(); + if ((ret = consumer->enqueue(copy, atc, tba, tbv)) != ERROR_SUCCESS) { srs_error("dispatch cached gop failed. ret=%d", ret); return ret; } @@ -926,7 +920,8 @@ int SrsSource::on_meta_data(SrsMessage* msg, SrsOnMetaDataPacket* metadata) std::vector::iterator it; for (it = consumers.begin(); it != consumers.end(); ++it) { SrsConsumer* consumer = *it; - if ((ret = consumer->enqueue(cache_metadata->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) { + SrsSharedPtrMessage* copy = cache_metadata->copy(); + if ((ret = consumer->enqueue(copy, atc, sample_rate, frame_rate)) != ERROR_SUCCESS) { srs_error("dispatch the metadata failed. ret=%d", ret); return ret; } @@ -987,17 +982,17 @@ int SrsSource::on_audio(SrsMessage* audio) // copy to all consumer if (true) { - std::vector::iterator it; - for (it = consumers.begin(); it != consumers.end(); ++it) { - SrsConsumer* consumer = *it; - if ((ret = consumer->enqueue(msg->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) { + for (int i = 0; i < (int)consumers.size(); i++) { + SrsConsumer* consumer = consumers.at(i); + SrsSharedPtrMessage* copy = msg->copy(); + if ((ret = consumer->enqueue(copy, atc, sample_rate, frame_rate)) != ERROR_SUCCESS) { srs_error("dispatch the audio failed. ret=%d", ret); return ret; } } srs_info("dispatch audio success."); } - + // copy to all forwarders. if (true) { std::vector::iterator it; @@ -1077,10 +1072,10 @@ int SrsSource::on_video(SrsMessage* video) // copy to all consumer if (true) { - std::vector::iterator it; - for (it = consumers.begin(); it != consumers.end(); ++it) { - SrsConsumer* consumer = *it; - if ((ret = consumer->enqueue(msg->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) { + for (int i = 0; i < (int)consumers.size(); i++) { + SrsConsumer* consumer = consumers.at(i); + SrsSharedPtrMessage* copy = msg->copy(); + if ((ret = consumer->enqueue(copy, atc, sample_rate, frame_rate)) != ERROR_SUCCESS) { srs_error("dispatch the video failed. ret=%d", ret); return ret; } @@ -1327,27 +1322,27 @@ void SrsSource::on_unpublish() } // copy metadata. - if (cache_metadata && (ret = consumer->enqueue(cache_metadata->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) { + if (cache_metadata && (ret = consumer->enqueue(cache_metadata->copy(), atc, sample_rate, frame_rate)) != ERROR_SUCCESS) { srs_error("dispatch metadata failed. ret=%d", ret); return ret; } srs_info("dispatch metadata success"); // copy sequence header - if (cache_sh_video && (ret = consumer->enqueue(cache_sh_video->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) { + if (cache_sh_video && (ret = consumer->enqueue(cache_sh_video->copy(), atc, sample_rate, frame_rate)) != ERROR_SUCCESS) { srs_error("dispatch video sequence header failed. ret=%d", ret); return ret; } srs_info("dispatch video sequence header success"); - if (cache_sh_audio && (ret = consumer->enqueue(cache_sh_audio->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) { + if (cache_sh_audio && (ret = consumer->enqueue(cache_sh_audio->copy(), atc, sample_rate, frame_rate)) != ERROR_SUCCESS) { srs_error("dispatch audio sequence header failed. ret=%d", ret); return ret; } srs_info("dispatch audio sequence header success"); // copy gop cache to client. - if ((ret = gop_cache->dump(consumer, sample_rate, frame_rate)) != ERROR_SUCCESS) { + if ((ret = gop_cache->dump(consumer, atc, sample_rate, frame_rate)) != ERROR_SUCCESS) { return ret; } @@ -1375,11 +1370,6 @@ void SrsSource::set_cache(bool enabled) gop_cache->set(enabled); } -bool SrsSource::is_atc() -{ - return atc; -} - int SrsSource::on_edge_start_play() { return play_edge->on_client_play(); diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index fce1c7bc81..e35296d064 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -110,11 +110,11 @@ class SrsMessageQueue virtual int enqueue(SrsSharedPtrMessage* msg); /** * get packets in consumer queue. - * @pmsgs SrsMessages*[], output the prt array. - * @count the count in array. - * @max_count the max count to dequeue, 0 to dequeue all. + * @pmsgs SrsMessages*[], used to store the msgs, user must alloc it. + * @count the count in array, output param. + * @max_count the max count to dequeue, must be positive. */ - virtual int get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, int& count); + virtual int dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count); private: /** * remove a gop from the front. @@ -155,19 +155,20 @@ class SrsConsumer virtual int get_time(); /** * enqueue an shared ptr message. + * @param whether atc, donot use jitter correct if true. * @param tba timebase of audio. * used to calc the audio time delta if time-jitter detected. * @param tbv timebase of video. * used to calc the video time delta if time-jitter detected. */ - virtual int enqueue(SrsSharedPtrMessage* msg, int tba, int tbv); + virtual int enqueue(SrsSharedPtrMessage* msg, bool atc, int tba, int tbv); /** * get packets in consumer queue. - * @pmsgs SrsMessages*[], output the prt array. - * @count the count in array. - * @max_count the max count to dequeue, 0 to dequeue all. + * @pmsgs SrsMessages*[], used to store the msgs, user must alloc it. + * @count the count in array, output param. + * @max_count the max count to dequeue, must be positive. */ - virtual int get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, int& count); + virtual int dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count); /** * when client send the pause message. */ @@ -208,7 +209,7 @@ class SrsGopCache */ virtual int cache(SrsSharedPtrMessage* msg); virtual void clear(); - virtual int dump(SrsConsumer* consumer, int tba, int tbv); + virtual int dump(SrsConsumer* consumer, bool atc, int tba, int tbv); /** * used for atc to get the time of gop cache, * the atc will adjust the sequence header timestamp to gop cache. @@ -346,8 +347,6 @@ class SrsSource : public ISrsReloadHandler virtual void set_cache(bool enabled); // internal public: - // for consumer, atc feature. - virtual bool is_atc(); // for edge, when play edge stream, check the state virtual int on_edge_start_play(); // for edge, when publish edge stream, check the state diff --git a/trunk/src/core/srs_core.hpp b/trunk/src/core/srs_core.hpp index 7870166f61..62fcb68df7 100644 --- a/trunk/src/core/srs_core.hpp +++ b/trunk/src/core/srs_core.hpp @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. // current release version #define VERSION_MAJOR "0" #define VERSION_MINOR "9" -#define VERSION_REVISION "129" +#define VERSION_REVISION "130" #define RTMP_SIG_SRS_VERSION VERSION_MAJOR"."VERSION_MINOR"."VERSION_REVISION // server info. #define RTMP_SIG_SRS_KEY "SRS" diff --git a/trunk/src/core/srs_core_autofree.hpp b/trunk/src/core/srs_core_autofree.hpp index 2de455fd18..d021e0735d 100644 --- a/trunk/src/core/srs_core_autofree.hpp +++ b/trunk/src/core/srs_core_autofree.hpp @@ -66,53 +66,4 @@ class __SrsAutoFree } }; -/** -* auto free the array ptrs, for example, MyClass* msgs[10], -* which stores 10 MyClass* objects, this class will: -* 1. free each MyClass* in array. -* 2. free the msgs itself. -* 3. set msgs to NULL. -* @remark, MyClass* msgs[] equals to MyClass**, the ptr array equals ptr to ptr. -* Usage: -* MyClass* msgs[10]; -* // ...... use msgs. -* SrsAutoFreeArray(MyClass, msgs, 10); -*/ -#define SrsAutoFreeArray(className, instance, size) \ - __SrsAutoFreeArray _auto_free_array_##instance(&instance, size) -template -class __SrsAutoFreeArray -{ -private: - T*** ptr; - int size; -public: - /** - * auto delete the ptr array. - */ - __SrsAutoFreeArray(T*** _ptr, int _size) { - ptr = _ptr; - size = _size; - } - - virtual ~__SrsAutoFreeArray() { - if (ptr == NULL || *ptr == NULL) { - return; - } - - T** arr = *ptr; - for (int i = 0; i < size; i++) { - T* pobj = arr[i]; - if (pobj) { - delete pobj; - arr[i] = NULL; - } - } - - delete arr; - - *ptr = NULL; - } -}; - #endif \ No newline at end of file diff --git a/trunk/src/rtmp/srs_protocol_msg_array.cpp b/trunk/src/rtmp/srs_protocol_msg_array.cpp new file mode 100644 index 0000000000..e9dc402412 --- /dev/null +++ b/trunk/src/rtmp/srs_protocol_msg_array.cpp @@ -0,0 +1,51 @@ +/* +The MIT License (MIT) + +Copyright (c) 2013-2014 winlin + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +*/ + +#include + +#include + +SrsSharedPtrMessageArray::SrsSharedPtrMessageArray(int _size) +{ + srs_assert(_size > 0); + + msgs = new SrsSharedPtrMessage*[_size]; + size = _size; + + // initialize + for (int i = 0; i < _size; i++) { + msgs[i] = NULL; + } +} + +SrsSharedPtrMessageArray::~SrsSharedPtrMessageArray() +{ + // cleanup + for (int i = 0; i < size; i++) { + SrsSharedPtrMessage* msg = msgs[i]; + srs_freep(msg); + } + + srs_freep(msgs); +} + diff --git a/trunk/src/rtmp/srs_protocol_msg_array.hpp b/trunk/src/rtmp/srs_protocol_msg_array.hpp new file mode 100644 index 0000000000..5366bdbe54 --- /dev/null +++ b/trunk/src/rtmp/srs_protocol_msg_array.hpp @@ -0,0 +1,53 @@ +/* +The MIT License (MIT) + +Copyright (c) 2013-2014 winlin + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +*/ + +#ifndef SRS_RTMP_PROTOCOL_MSG_ARRAY_HPP +#define SRS_RTMP_PROTOCOL_MSG_ARRAY_HPP + +/* +#include +*/ + +#include + +class SrsSharedPtrMessage; + +/** +* the class to auto free the shared ptr message array. +*/ +class SrsSharedPtrMessageArray +{ +public: + /** + * when user already send the msg in msgs, please set to NULL, + * for instance, msg= msgs.msgs[i], msgs.msgs[i]=NULL, send(msg), + * where send(msg) will always send and free it. + */ + SrsSharedPtrMessage** msgs; + int size; +public: + SrsSharedPtrMessageArray(int _size); + virtual ~SrsSharedPtrMessageArray(); +}; + +#endif diff --git a/trunk/src/rtmp/srs_protocol_rtmp_stack.cpp b/trunk/src/rtmp/srs_protocol_rtmp_stack.cpp index ebba78217b..4cb5037712 100644 --- a/trunk/src/rtmp/srs_protocol_rtmp_stack.cpp +++ b/trunk/src/rtmp/srs_protocol_rtmp_stack.cpp @@ -437,13 +437,12 @@ int SrsProtocol::decode_message(SrsMessage* msg, SrsPacket** ppacket) return ret; } -int SrsProtocol::do_send_and_free_message(SrsMessage* msg, SrsPacket* packet) +int SrsProtocol::do_send_message(SrsMessage* msg, SrsPacket* packet) { int ret = ERROR_SUCCESS; - // always free msg. + // always not NULL msg. srs_assert(msg); - SrsAutoFree(SrsMessage, msg); // we donot use the complex basic header, // ensure the basic header is 1bytes. @@ -497,7 +496,7 @@ int SrsProtocol::do_send_and_free_message(SrsMessage* msg, SrsPacket* packet) *pheader++ = pp[3]; // chunk extended timestamp header, 0 or 4 bytes, big-endian - if(timestamp >= RTMP_EXTENDED_TIMESTAMP){ + if(timestamp >= RTMP_EXTENDED_TIMESTAMP) { pp = (char*)×tamp; *pheader++ = pp[3]; *pheader++ = pp[2]; @@ -522,7 +521,7 @@ int SrsProtocol::do_send_and_free_message(SrsMessage* msg, SrsPacket* packet) // @see: ngx_rtmp_prepare_message // @see: http://blog.csdn.net/win_lin/article/details/13363699 u_int32_t timestamp = (u_int32_t)msg->header.timestamp; - if(timestamp >= RTMP_EXTENDED_TIMESTAMP){ + if (timestamp >= RTMP_EXTENDED_TIMESTAMP) { pp = (char*)×tamp; *pheader++ = pp[3]; *pheader++ = pp[2]; @@ -733,7 +732,12 @@ int SrsProtocol::send_and_free_message(SrsMessage* msg, int stream_id) if (msg) { msg->header.stream_id = stream_id; } - return do_send_and_free_message(msg, NULL); + + // donot use the auto free to free the msg, + // for performance issue. + int ret = do_send_message(msg, NULL); + srs_freep(msg); + return ret; } int SrsProtocol::send_and_free_packet(SrsPacket* packet, int stream_id) @@ -767,9 +771,10 @@ int SrsProtocol::send_and_free_packet(SrsPacket* packet, int stream_id) msg->header.stream_id = stream_id; msg->header.perfer_cid = packet->get_perfer_cid(); - if ((ret = do_send_and_free_message(msg, packet)) != ERROR_SUCCESS) { - return ret; - } + // donot use the auto free to free the msg, + // for performance issue. + ret = do_send_message(msg, packet); + srs_freep(msg); return ret; } diff --git a/trunk/src/rtmp/srs_protocol_rtmp_stack.hpp b/trunk/src/rtmp/srs_protocol_rtmp_stack.hpp index ebdc575818..9286602070 100644 --- a/trunk/src/rtmp/srs_protocol_rtmp_stack.hpp +++ b/trunk/src/rtmp/srs_protocol_rtmp_stack.hpp @@ -174,10 +174,10 @@ class SrsProtocol virtual int send_and_free_packet(SrsPacket* packet, int stream_id); private: /** - * imp for send_and_free_message + * send out the message, donot free it, the caller must free the param msg. * @param packet the packet of message, NULL for raw message. */ - virtual int do_send_and_free_message(SrsMessage* msg, SrsPacket* packet); + virtual int do_send_message(SrsMessage* msg, SrsPacket* packet); /** * imp for decode_message */ diff --git a/trunk/src/srs/srs.upp b/trunk/src/srs/srs.upp index e30f5a8fb1..de1f2c4400 100755 --- a/trunk/src/srs/srs.upp +++ b/trunk/src/srs/srs.upp @@ -36,6 +36,8 @@ file ..\rtmp\srs_protocol_handshake.cpp, ..\rtmp\srs_protocol_io.hpp, ..\rtmp\srs_protocol_io.cpp, + ..\rtmp\srs_protocol_msg_array.hpp, + ..\rtmp\srs_protocol_msg_array.cpp, ..\rtmp\srs_protocol_rtmp.hpp, ..\rtmp\srs_protocol_rtmp.cpp, ..\rtmp\srs_protocol_rtmp_stack.hpp,