Skip to content

Commit

Permalink
For #1657, refine api for http
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Nov 6, 2020
1 parent 5782b45 commit 7916214
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 47 deletions.
8 changes: 7 additions & 1 deletion trunk/src/app/srs_app_caster_flv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ SrsDynamicHttpConn::SrsDynamicHttpConn(ISrsResourceManager* cm, srs_netfd_t fd,
manager = cm;
sdk = NULL;
pprint = SrsPithyPrint::create_caster();
conn = new SrsHttpConn(this, fd, m, cip, cport);
skt = new SrsTcpConnection(fd);
conn = new SrsHttpConn(this, skt, m, cip, cport);
ip = cip;
port = cport;

Expand All @@ -158,6 +159,7 @@ SrsDynamicHttpConn::~SrsDynamicHttpConn()
_srs_config->unsubscribe(this);

srs_freep(conn);
srs_freep(skt);
srs_freep(sdk);
srs_freep(pprint);
}
Expand Down Expand Up @@ -307,6 +309,10 @@ srs_error_t SrsDynamicHttpConn::start()
return srs_error_wrap(err, "set cors=%d", v);
}

if ((err = skt->initialize()) != srs_success) {
return srs_error_wrap(err, "init socket");
}

return conn->start();
}

Expand Down
1 change: 1 addition & 0 deletions trunk/src/app/srs_app_caster_flv.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ class SrsDynamicHttpConn : virtual public ISrsStartableConneciton, virtual publi
std::string output;
SrsPithyPrint* pprint;
SrsSimpleRtmpClient* sdk;
SrsTcpConnection* skt;
SrsHttpConn* conn;
private:
// The ip and port of client.
Expand Down
13 changes: 11 additions & 2 deletions trunk/src/app/srs_app_http_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1674,10 +1674,11 @@ srs_error_t SrsGoApiTcmalloc::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess
}
#endif

SrsHttpApi::SrsHttpApi(ISrsResourceManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, string cip, int port)
SrsHttpApi::SrsHttpApi(bool https, ISrsResourceManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, string cip, int port)
{
manager = cm;
conn = new SrsHttpConn(this, fd, m, cip, port);
skt = new SrsTcpConnection(fd);
conn = new SrsHttpConn(this, skt, m, cip, port);

_srs_config->subscribe(this);
}
Expand All @@ -1687,6 +1688,7 @@ SrsHttpApi::~SrsHttpApi()
_srs_config->unsubscribe(this);

srs_freep(conn);
srs_freep(skt);
}

srs_error_t SrsHttpApi::on_start()
Expand Down Expand Up @@ -1745,6 +1747,9 @@ srs_error_t SrsHttpApi::on_conn_done(srs_error_t r0)

std::string SrsHttpApi::desc()
{
if (ssl) {
return "HttpsConn";
}
return "HttpConn";
}

Expand All @@ -1768,6 +1773,10 @@ srs_error_t SrsHttpApi::start()
return srs_error_wrap(err, "set cors=%d", v);
}

if ((err = skt->initialize()) != srs_success) {
return srs_error_wrap(err, "init socket");
}

return conn->start();
}

Expand Down
3 changes: 2 additions & 1 deletion trunk/src/app/srs_app_http_api.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -262,9 +262,10 @@ class SrsHttpApi : virtual public ISrsStartableConneciton, virtual public ISrsHt
private:
// The manager object to manage the connection.
ISrsResourceManager* manager;
SrsTcpConnection* skt;
SrsHttpConn* conn;
public:
SrsHttpApi(ISrsResourceManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, std::string cip, int port);
SrsHttpApi(bool https, ISrsResourceManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, std::string cip, int port);
virtual ~SrsHttpApi();
// Interface ISrsHttpConnOwner.
public:
Expand Down
43 changes: 14 additions & 29 deletions trunk/src/app/srs_app_http_conn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,14 @@ ISrsHttpConnOwner::~ISrsHttpConnOwner()
{
}

SrsHttpConn::SrsHttpConn(ISrsHttpConnOwner* handler, srs_netfd_t fd, ISrsHttpServeMux* m, string cip, int cport)
SrsHttpConn::SrsHttpConn(ISrsHttpConnOwner* handler, ISrsProtocolReadWriter* fd, ISrsHttpServeMux* m, string cip, int cport)
{
parser = new SrsHttpParser();
cors = new SrsHttpCorsMux();
http_mux = m;
handler_ = handler;

skt = new SrsTcpConnection(fd);
skt = fd;
ip = cip;
port = cport;
create_time = srsu2ms(srs_get_system_time());
Expand Down Expand Up @@ -111,10 +111,6 @@ srs_error_t SrsHttpConn::start()
{
srs_error_t err = srs_success;

if ((err = skt->initialize()) != srs_success) {
return srs_error_wrap(err, "init socket");
}

if ((err = trd->start()) != srs_success) {
return srs_error_wrap(err, "coroutine");
}
Expand Down Expand Up @@ -293,16 +289,6 @@ srs_error_t SrsHttpConn::set_jsonp(bool v)
return srs_success;
}

srs_error_t SrsHttpConn::set_tcp_nodelay(bool v)
{
return skt->set_tcp_nodelay(v);
}

srs_error_t SrsHttpConn::set_socket_buffer(srs_utime_t buffer_v)
{
return skt->set_socket_buffer(buffer_v);
}

string SrsHttpConn::remote_ip()
{
return ip;
Expand All @@ -321,8 +307,8 @@ void SrsHttpConn::expire()
SrsResponseOnlyHttpConn::SrsResponseOnlyHttpConn(ISrsResourceManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, string cip, int port)
{
manager = cm;
conn = new SrsHttpConn(this, fd, m, cip, port);
stfd = fd;
skt = new SrsTcpConnection(fd);
conn = new SrsHttpConn(this, skt, m, cip, port);

_srs_config->subscribe(this);
}
Expand All @@ -332,31 +318,26 @@ SrsResponseOnlyHttpConn::~SrsResponseOnlyHttpConn()
_srs_config->unsubscribe(this);

srs_freep(conn);
srs_freep(skt);
}

srs_error_t SrsResponseOnlyHttpConn::pop_message(ISrsHttpMessage** preq)
{
srs_error_t err = srs_success;

SrsStSocket skt;
// Check user interrupt by interval.
skt->set_recv_timeout(3 * SRS_UTIME_SECONDS);

// We start a socket to read the stfd, which is writing by conn.
// It's ok, because conn never read it after processing the HTTP request.
if ((err = skt.initialize(stfd)) != srs_success) {
return srs_error_wrap(err, "init socket");
}

// Check user interrupt by interval.
skt.set_recv_timeout(3 * SRS_UTIME_SECONDS);

// drop all request body.
char body[4096];
while (true) {
if ((err = conn->pull()) != srs_success) {
return srs_error_wrap(err, "timeout");
}

if ((err = skt.read(body, 4096, NULL)) != srs_success) {
if ((err = skt->read(body, 4096, NULL)) != srs_success) {
// Because we use timeout to check trd state, so we should ignore any timeout.
if (srs_error_code(err) == ERROR_SOCKET_TIMEOUT) {
srs_freep(err);
Expand Down Expand Up @@ -420,12 +401,12 @@ srs_error_t SrsResponseOnlyHttpConn::on_conn_done(srs_error_t r0)

srs_error_t SrsResponseOnlyHttpConn::set_tcp_nodelay(bool v)
{
return conn->set_tcp_nodelay(v);
return skt->set_tcp_nodelay(v);
}

srs_error_t SrsResponseOnlyHttpConn::set_socket_buffer(srs_utime_t buffer_v)
{
return conn->set_socket_buffer(buffer_v);
return skt->set_socket_buffer(buffer_v);
}

std::string SrsResponseOnlyHttpConn::desc()
Expand All @@ -452,6 +433,10 @@ srs_error_t SrsResponseOnlyHttpConn::start()
return srs_error_wrap(err, "set cors=%d", v);
}

if ((err = skt->initialize()) != srs_success) {
return srs_error_wrap(err, "init socket");
}

return conn->start();
}

Expand Down
11 changes: 3 additions & 8 deletions trunk/src/app/srs_app_http_conn.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class SrsHttpConn : virtual public ISrsStartableConneciton, virtual public ISrsC
SrsHttpCorsMux* cors;
ISrsHttpConnOwner* handler_;
protected:
SrsTcpConnection* skt;
ISrsProtocolReadWriter* skt;
// Each connection start a green thread,
// when thread stop, the connection will be delete by server.
SrsCoroutine* trd;
Expand All @@ -102,7 +102,7 @@ class SrsHttpConn : virtual public ISrsStartableConneciton, virtual public ISrsC
// for current connection to log self create time and calculate the living time.
int64_t create_time;
public:
SrsHttpConn(ISrsHttpConnOwner* handler, srs_netfd_t fd, ISrsHttpServeMux* m, std::string cip, int port);
SrsHttpConn(ISrsHttpConnOwner* handler, ISrsProtocolReadWriter* fd, ISrsHttpServeMux* m, std::string cip, int port);
virtual ~SrsHttpConn();
// Interface ISrsResource.
public:
Expand Down Expand Up @@ -133,11 +133,6 @@ class SrsHttpConn : virtual public ISrsStartableConneciton, virtual public ISrsC
virtual srs_error_t set_crossdomain_enabled(bool v);
// Whether enable the JSONP.
virtual srs_error_t set_jsonp(bool v);
public:
// Set socket option TCP_NODELAY.
virtual srs_error_t set_tcp_nodelay(bool v);
// Set socket option SO_SNDBUF in srs_utime_t.
virtual srs_error_t set_socket_buffer(srs_utime_t buffer_v);
// Interface ISrsConnection.
public:
virtual std::string remote_ip();
Expand All @@ -154,8 +149,8 @@ class SrsResponseOnlyHttpConn : virtual public ISrsStartableConneciton, virtual
private:
// The manager object to manage the connection.
ISrsResourceManager* manager;
SrsTcpConnection* skt;
SrsHttpConn* conn;
srs_netfd_t stfd;
public:
SrsResponseOnlyHttpConn(ISrsResourceManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, std::string cip, int port);
virtual ~SrsResponseOnlyHttpConn();
Expand Down
13 changes: 7 additions & 6 deletions trunk/src/app/srs_app_http_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -625,24 +625,25 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess

// Try to use fast flv encoder, remember that it maybe NULL.
SrsFlvStreamEncoder* ffe = dynamic_cast<SrsFlvStreamEncoder*>(enc);

// Note that the handler of hc now is rohc.
SrsResponseOnlyHttpConn* rohc = dynamic_cast<SrsResponseOnlyHttpConn*>(hc->handler());
srs_assert(rohc);

// Set the socket options for transport.
bool tcp_nodelay = _srs_config->get_tcp_nodelay(req->vhost);
if (tcp_nodelay) {
if ((err = hc->set_tcp_nodelay(tcp_nodelay)) != srs_success) {
if ((err = rohc->set_tcp_nodelay(tcp_nodelay)) != srs_success) {
return srs_error_wrap(err, "set tcp nodelay");
}
}

srs_utime_t mw_sleep = _srs_config->get_mw_sleep(req->vhost);
if ((err = hc->set_socket_buffer(mw_sleep)) != srs_success) {
if ((err = rohc->set_socket_buffer(mw_sleep)) != srs_success) {
return srs_error_wrap(err, "set mw_sleep %" PRId64, mw_sleep);
}

// Note that the handler of hc now is rohc.
SrsResponseOnlyHttpConn* rohc = dynamic_cast<SrsResponseOnlyHttpConn*>(hc->handler());
srs_assert(rohc);

// Start a thread to receive all messages from client, then drop them.
SrsHttpRecvThread* trd = new SrsHttpRecvThread(rohc);
SrsAutoFree(SrsHttpRecvThread, trd);

Expand Down

0 comments on commit 7916214

Please sign in to comment.