diff --git a/trunk/src/app/srs_app_conn.cpp b/trunk/src/app/srs_app_conn.cpp index 34c12c3be8..9a6bced537 100644 --- a/trunk/src/app/srs_app_conn.cpp +++ b/trunk/src/app/srs_app_conn.cpp @@ -484,11 +484,6 @@ srs_error_t SrsTcpConnection::cycle() return srs_success; } -SrsContextId SrsTcpConnection::srs_id() -{ - return trd->cid(); -} - string SrsTcpConnection::remote_ip() { return ip; diff --git a/trunk/src/app/srs_app_conn.hpp b/trunk/src/app/srs_app_conn.hpp index 7f8fd17485..be45cdad57 100644 --- a/trunk/src/app/srs_app_conn.hpp +++ b/trunk/src/app/srs_app_conn.hpp @@ -110,7 +110,7 @@ class SrsResourceManager : virtual public ISrsCoroutineHandler, virtual public I // all connections accept from listener must extends from this base class, // server will add the connection to manager, and delete it when remove. class SrsTcpConnection : virtual public ISrsConnection, virtual public ISrsCoroutineHandler - , virtual public ISrsKbpsDelta, virtual public ISrsReloadHandler + , virtual public ISrsKbpsDelta, virtual public ISrsReloadHandler, virtual public ISrsStartable { protected: // Each connection start a green thread, @@ -143,6 +143,8 @@ class SrsTcpConnection : virtual public ISrsConnection, virtual public ISrsCorou public: // To dipose the connection. virtual void dispose(); +// Interface ISrsStartable +public: // Start the client green thread. // when server get a client from listener, // 1. server will create an concrete connection(for instance, RTMP connection), @@ -151,6 +153,7 @@ class SrsTcpConnection : virtual public ISrsConnection, virtual public ISrsCorou // when client cycle thread stop, invoke the on_thread_stop(), which will use server // To remove the client by server->remove(this). virtual srs_error_t start(); +public: // Set socket option TCP_NODELAY. virtual srs_error_t set_tcp_nodelay(bool v); // Set socket option SO_SNDBUF in srs_utime_t. @@ -161,10 +164,6 @@ class SrsTcpConnection : virtual public ISrsConnection, virtual public ISrsCorou // when serve connection completed, terminate the loop which will terminate the thread, // thread will invoke the on_thread_stop() when it terminated. virtual srs_error_t cycle(); -public: - // Get the srs id which identify the client. - // TODO: FIXME: Rename to cid. - virtual SrsContextId srs_id(); // Interface ISrsConnection. public: virtual std::string remote_ip(); diff --git a/trunk/src/app/srs_app_http_api.hpp b/trunk/src/app/srs_app_http_api.hpp index 02a94c3abc..da3fbfd3ce 100644 --- a/trunk/src/app/srs_app_http_api.hpp +++ b/trunk/src/app/srs_app_http_api.hpp @@ -254,6 +254,7 @@ class SrsGoApiTcmalloc : public ISrsHttpHandler }; #endif +// TODO: FIXME: Refine arch, change to use SrsTcpConnection class SrsHttpApi : virtual public SrsTcpConnection, virtual public ISrsReloadHandler { private: diff --git a/trunk/src/app/srs_app_http_conn.hpp b/trunk/src/app/srs_app_http_conn.hpp index a545fa8172..703fda65f5 100644 --- a/trunk/src/app/srs_app_http_conn.hpp +++ b/trunk/src/app/srs_app_http_conn.hpp @@ -55,6 +55,7 @@ class SrsHttpStreamServer; class SrsHttpStaticServer; // The http connection which request the static or stream content. +// TODO: FIXME: Refine arch, change to use SrsTcpConnection class SrsHttpConn : public SrsTcpConnection { protected: diff --git a/trunk/src/app/srs_app_rtmp_conn.hpp b/trunk/src/app/srs_app_rtmp_conn.hpp index be9f04bc71..8661c89382 100644 --- a/trunk/src/app/srs_app_rtmp_conn.hpp +++ b/trunk/src/app/srs_app_rtmp_conn.hpp @@ -83,6 +83,7 @@ class SrsClientInfo }; // The client provides the main logic control for RTMP clients. +// TODO: FIXME: Refine arch, change to use SrsTcpConnection class SrsRtmpConn : virtual public SrsTcpConnection, virtual public ISrsReloadHandler { // For the thread to directly access any field of connection. diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index 3aeef041b8..3e96413448 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -1462,11 +1462,12 @@ void SrsServer::resample_kbps() // collect delta from all clients. for (int i = 0; i < (int)conn_manager->size(); i++) { - SrsTcpConnection* conn = dynamic_cast(conn_manager->at(i)); + ISrsResource* c = conn_manager->at(i); + ISrsKbpsDelta* conn = dynamic_cast(conn_manager->at(i)); // add delta of connection to server kbps., // for next sample() of server kbps can get the stat. - stat->kbps_add_delta(conn); + stat->kbps_add_delta(c->get_id(), conn); } // TODO: FXME: support all other connections. @@ -1481,22 +1482,21 @@ srs_error_t SrsServer::accept_client(SrsListenerType type, srs_netfd_t stfd) { srs_error_t err = srs_success; - SrsTcpConnection* conn = NULL; + ISrsResource* r = NULL; - if ((err = fd2conn(type, stfd, &conn)) != srs_success) { + if ((err = fd_to_resource(type, stfd, &r)) != srs_success) { if (srs_error_code(err) == ERROR_SOCKET_GET_PEER_IP && _srs_config->empty_ip_ok()) { srs_close_stfd(stfd); srs_error_reset(err); return srs_success; } - return srs_error_wrap(err, "fd2conn"); + return srs_error_wrap(err, "fd to resource"); } - srs_assert(conn); + srs_assert(r); // directly enqueue, the cycle thread will remove the client. - conn_manager->add(conn); - - // cycle will start process thread and when finished remove the client. - // @remark never use the conn, for it maybe destroyed. + conn_manager->add(r); + + ISrsStartable* conn = dynamic_cast(r); if ((err = conn->start()) != srs_success) { return srs_error_wrap(err, "start conn coroutine"); } @@ -1509,7 +1509,7 @@ SrsHttpServeMux* SrsServer::api_server() return http_api_mux; } -srs_error_t SrsServer::fd2conn(SrsListenerType type, srs_netfd_t stfd, SrsTcpConnection** pconn) +srs_error_t SrsServer::fd_to_resource(SrsListenerType type, srs_netfd_t stfd, ISrsResource** pr) { srs_error_t err = srs_success; @@ -1549,11 +1549,11 @@ srs_error_t SrsServer::fd2conn(SrsListenerType type, srs_netfd_t stfd, SrsTcpCon } if (type == SrsListenerRtmpStream) { - *pconn = new SrsRtmpConn(this, stfd, ip, port); + *pr = new SrsRtmpConn(this, stfd, ip, port); } else if (type == SrsListenerHttpApi) { - *pconn = new SrsHttpApi(this, stfd, http_api_mux, ip, port); + *pr = new SrsHttpApi(this, stfd, http_api_mux, ip, port); } else if (type == SrsListenerHttpStream) { - *pconn = new SrsResponseOnlyHttpConn(this, stfd, http_server, ip, port); + *pr = new SrsResponseOnlyHttpConn(this, stfd, http_server, ip, port); } else { srs_warn("close for no service handler. fd=%d, ip=%s:%d", fd, ip.c_str(), port); srs_close_stfd(stfd); @@ -1565,11 +1565,11 @@ srs_error_t SrsServer::fd2conn(SrsListenerType type, srs_netfd_t stfd, SrsTcpCon void SrsServer::remove(ISrsResource* c) { - SrsTcpConnection* conn = dynamic_cast(c); + ISrsKbpsDelta* conn = dynamic_cast(c); SrsStatistic* stat = SrsStatistic::instance(); - stat->kbps_add_delta(conn); - stat->on_disconnect(conn->srs_id()); + stat->kbps_add_delta(c->get_id(), conn); + stat->on_disconnect(c->get_id()); // use manager to free it async. conn_manager->remove(c); diff --git a/trunk/src/app/srs_app_server.hpp b/trunk/src/app/srs_app_server.hpp index 0f8a3892a2..b06fd30ba6 100644 --- a/trunk/src/app/srs_app_server.hpp +++ b/trunk/src/app/srs_app_server.hpp @@ -342,7 +342,7 @@ class SrsServer : virtual public ISrsReloadHandler, virtual public ISrsSourceHan // TODO: FIXME: Fetch from hybrid server manager. virtual SrsHttpServeMux* api_server(); private: - virtual srs_error_t fd2conn(SrsListenerType type, srs_netfd_t stfd, SrsTcpConnection** pconn); + virtual srs_error_t fd_to_resource(SrsListenerType type, srs_netfd_t stfd, ISrsResource** pr); // Interface ISrsResourceManager public: // A callback for connection to remove itself. diff --git a/trunk/src/app/srs_app_st.cpp b/trunk/src/app/srs_app_st.cpp index e1cc9ba74a..b11c50fd19 100755 --- a/trunk/src/app/srs_app_st.cpp +++ b/trunk/src/app/srs_app_st.cpp @@ -40,6 +40,14 @@ ISrsCoroutineHandler::~ISrsCoroutineHandler() { } +ISrsStartable::ISrsStartable() +{ +} + +ISrsStartable::~ISrsStartable() +{ +} + SrsCoroutine::SrsCoroutine() { } diff --git a/trunk/src/app/srs_app_st.hpp b/trunk/src/app/srs_app_st.hpp index 7191dfc369..42313ed7ac 100644 --- a/trunk/src/app/srs_app_st.hpp +++ b/trunk/src/app/srs_app_st.hpp @@ -68,14 +68,23 @@ class ISrsCoroutineHandler virtual srs_error_t cycle() = 0; }; +// Start the object, generally a croutine. +class ISrsStartable +{ +public: + ISrsStartable(); + virtual ~ISrsStartable(); +public: + virtual srs_error_t start() = 0; +}; + // The corotine object. -class SrsCoroutine +class SrsCoroutine : public ISrsStartable { public: SrsCoroutine(); virtual ~SrsCoroutine(); public: - virtual srs_error_t start() = 0; virtual void stop() = 0; virtual void interrupt() = 0; // @return a copy of error, which should be freed by user. diff --git a/trunk/src/app/srs_app_statistic.cpp b/trunk/src/app/srs_app_statistic.cpp index 851ab86e7b..b24583d06b 100644 --- a/trunk/src/app/srs_app_statistic.cpp +++ b/trunk/src/app/srs_app_statistic.cpp @@ -453,7 +453,7 @@ srs_error_t SrsStatistic::on_client(SrsContextId cid, SrsRequest* req, SrsTcpCon return err; } -void SrsStatistic::on_disconnect(SrsContextId cid) +void SrsStatistic::on_disconnect(const SrsContextId& cid) { // TODO: FIXME: We should use UUID for client ID. std::string id = cid.c_str(); @@ -474,10 +474,10 @@ void SrsStatistic::on_disconnect(SrsContextId cid) vhost->nb_clients--; } -void SrsStatistic::kbps_add_delta(SrsTcpConnection* conn) +void SrsStatistic::kbps_add_delta(const SrsContextId& cid, ISrsKbpsDelta* delta) { // TODO: FIXME: Should not use context id as connection id. - std::string id = conn->srs_id().c_str(); + std::string id = cid.c_str(); if (clients.find(id) == clients.end()) { return; } @@ -486,7 +486,7 @@ void SrsStatistic::kbps_add_delta(SrsTcpConnection* conn) // resample the kbps to collect the delta. int64_t in, out; - conn->remark(&in, &out); + delta->remark(&in, &out); // add delta of connection to kbps. // for next sample() of server kbps can get the stat. diff --git a/trunk/src/app/srs_app_statistic.hpp b/trunk/src/app/srs_app_statistic.hpp index 69445df827..3320fac872 100644 --- a/trunk/src/app/srs_app_statistic.hpp +++ b/trunk/src/app/srs_app_statistic.hpp @@ -39,6 +39,7 @@ class SrsRequest; class SrsTcpConnection; class SrsJsonObject; class SrsJsonArray; +class ISrsKbpsDelta; struct SrsStatisticVhost { @@ -211,11 +212,10 @@ class SrsStatistic : public ISrsProtocolPerf // only got the request object, so the client specified by id maybe not // exists in stat. // TODO: FIXME: We should not use context id as client id. - virtual void on_disconnect(SrsContextId id); + virtual void on_disconnect(const SrsContextId& id); // Sample the kbps, add delta bytes of conn. // Use kbps_sample() to get all result of kbps stat. - // TODO: FIXME: the add delta must use ISrsKbpsDelta interface instead. - virtual void kbps_add_delta(SrsTcpConnection* conn); + virtual void kbps_add_delta(const SrsContextId& cid, ISrsKbpsDelta* delta); // Calc the result for all kbps. // @return the server kbps. virtual SrsKbps* kbps_sample();