Skip to content

Commit

Permalink
For #1657: Refine connection arch, remove hierachy
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Nov 5, 2020
1 parent 2a14dc0 commit 24125b9
Show file tree
Hide file tree
Showing 7 changed files with 213 additions and 52 deletions.
57 changes: 47 additions & 10 deletions trunk/src/app/srs_app_caster_flv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ srs_error_t SrsAppCasterFlv::on_tcp_client(srs_netfd_t stfd)
srs_warn("empty ip for fd=%d", srs_netfd_fileno(stfd));
}

SrsHttpConn* conn = new SrsDynamicHttpConn(this, stfd, http_mux, ip, port);
ISrsStartableConneciton* conn = new SrsDynamicHttpConn(this, stfd, http_mux, ip, port);
conns.push_back(conn);

if ((err = conn->start()) != srs_success) {
Expand All @@ -97,14 +97,14 @@ srs_error_t SrsAppCasterFlv::on_tcp_client(srs_netfd_t stfd)

void SrsAppCasterFlv::remove(ISrsResource* c)
{
SrsHttpConn* conn = dynamic_cast<SrsHttpConn*>(c);
ISrsStartableConneciton* conn = dynamic_cast<ISrsStartableConneciton*>(c);

std::vector<SrsHttpConn*>::iterator it;
std::vector<ISrsStartableConneciton*>::iterator it;
if ((it = std::find(conns.begin(), conns.end(), conn)) != conns.end()) {
conns.erase(it);
}

// fixbug: SrsHttpConn for CasterFlv is not freed, which could cause memory leak
// fixbug: ISrsStartableConneciton for CasterFlv is not freed, which could cause memory leak
// so, free conn which is not managed by SrsServer->conns;
// @see: https://github.com/ossrs/srs/issues/826
manager->remove(c);
Expand Down Expand Up @@ -141,23 +141,23 @@ srs_error_t SrsAppCasterFlv::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessa
return err;
}

SrsDynamicHttpConn::SrsDynamicHttpConn(ISrsResourceManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, string cip, int port) : SrsHttpConn(cm, fd, m, cip, port)
SrsDynamicHttpConn::SrsDynamicHttpConn(ISrsResourceManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, string cip, int cport)
{
manager = cm;
sdk = NULL;
pprint = SrsPithyPrint::create_caster();
conn = new SrsHttpConn(this, fd, m, cip, cport);
ip = cip;
port = cport;
}

SrsDynamicHttpConn::~SrsDynamicHttpConn()
{
srs_freep(conn);
srs_freep(sdk);
srs_freep(pprint);
}

srs_error_t SrsDynamicHttpConn::on_got_http_message(ISrsHttpMessage* msg)
{
return srs_success;
}

srs_error_t SrsDynamicHttpConn::proxy(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, std::string o)
{
srs_error_t err = srs_success;
Expand Down Expand Up @@ -249,6 +249,43 @@ srs_error_t SrsDynamicHttpConn::do_proxy(ISrsHttpResponseReader* rr, SrsFlvDecod
return err;
}

srs_error_t SrsDynamicHttpConn::on_http_message(ISrsHttpMessage* msg)
{
return srs_success;
}

void SrsDynamicHttpConn::on_conn_done()
{
// Because we use manager to manage this object,
// not the http connection object, so we must remove it here.
manager->remove(this);
}

std::string SrsDynamicHttpConn::desc()
{
return "DHttpConn";
}

std::string SrsDynamicHttpConn::remote_ip()
{
return conn->remote_ip();
}

const SrsContextId& SrsDynamicHttpConn::get_id()
{
return conn->get_id();
}

srs_error_t SrsDynamicHttpConn::start()
{
return conn->start();
}

void SrsDynamicHttpConn::remark(int64_t* in, int64_t* out)
{
conn->remark(in, out);
}

SrsHttpFileReader::SrsHttpFileReader(ISrsHttpResponseReader* h)
{
http = h;
Expand Down
31 changes: 27 additions & 4 deletions trunk/src/app/srs_app_caster_flv.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class SrsAppCasterFlv : virtual public ISrsTcpHandler
private:
std::string output;
SrsHttpServeMux* http_mux;
std::vector<SrsHttpConn*> conns;
std::vector<ISrsStartableConneciton*> conns;
SrsResourceManager* manager;
public:
SrsAppCasterFlv(SrsConfDirective* c);
Expand All @@ -72,21 +72,44 @@ class SrsAppCasterFlv : virtual public ISrsTcpHandler
};

// The dynamic http connection, never drop the body.
class SrsDynamicHttpConn : public SrsHttpConn
class SrsDynamicHttpConn : virtual public ISrsStartableConneciton, virtual public ISrsHttpMessageHandler
{
private:
// The manager object to manage the connection.
ISrsResourceManager* manager;
std::string output;
SrsPithyPrint* pprint;
SrsSimpleRtmpClient* sdk;
SrsHttpConn* conn;
private:
// The ip and port of client.
std::string ip;
int port;
public:
SrsDynamicHttpConn(ISrsResourceManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, std::string cip, int port);
virtual ~SrsDynamicHttpConn();
public:
virtual srs_error_t on_got_http_message(ISrsHttpMessage* msg);
public:
virtual srs_error_t proxy(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, std::string o);
private:
virtual srs_error_t do_proxy(ISrsHttpResponseReader* rr, SrsFlvDecoder* dec);
// Extract APIs from SrsTcpConnection.
// Interface ISrsHttpMessageHandler.
public:
virtual srs_error_t on_http_message(ISrsHttpMessage* msg);
virtual void on_conn_done();
// Interface ISrsResource.
public:
virtual std::string desc();
// Interface ISrsConnection.
public:
virtual std::string remote_ip();
virtual const SrsContextId& get_id();
// Interface ISrsStartable
public:
virtual srs_error_t start();
// Interface ISrsKbpsDelta
public:
virtual void remark(int64_t* in, int64_t* out);
};

// The http wrapper for file reader, to read http post stream like a file.
Expand Down
1 change: 1 addition & 0 deletions trunk/src/app/srs_app_http_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1840,6 +1840,7 @@ srs_error_t SrsHttpApi::cycle()
srs_error_t err = do_cycle();

// Notify manager to remove it.
// Note that we create this object, so we use manager to remove it.
manager->remove(this);

// success.
Expand Down
91 changes: 79 additions & 12 deletions trunk/src/app/srs_app_http_conn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,22 @@ using namespace std;
#include <srs_app_utility.hpp>
#include <srs_app_st.hpp>

SrsHttpConn::SrsHttpConn(ISrsResourceManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, string cip, int cport)
ISrsHttpMessageHandler::ISrsHttpMessageHandler()
{
}

ISrsHttpMessageHandler::~ISrsHttpMessageHandler()
{
}

SrsHttpConn::SrsHttpConn(ISrsHttpMessageHandler* handler, srs_netfd_t fd, ISrsHttpServeMux* m, string cip, int cport)
{
parser = new SrsHttpParser();
cors = new SrsHttpCorsMux();
http_mux = m;
handler_ = handler;

skt = new SrsTcpConnection(fd);
manager = cm;
ip = cip;
port = cport;
create_time = srsu2ms(srs_get_system_time());
Expand Down Expand Up @@ -150,7 +158,7 @@ srs_error_t SrsHttpConn::do_cycle()
last_req = hreq->to_request(hreq->host());

// may should discard the body.
if ((err = on_got_http_message(req)) != srs_success) {
if ((err = handler_->on_http_message(req)) != srs_success) {
break;
}

Expand All @@ -176,6 +184,16 @@ srs_error_t SrsHttpConn::do_cycle()
return err;
}

ISrsHttpMessageHandler* SrsHttpConn::handler()
{
return handler_;
}

srs_error_t SrsHttpConn::pull()
{
return trd->pull();
}

srs_error_t SrsHttpConn::process_request(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
{
srs_error_t err = srs_success;
Expand Down Expand Up @@ -239,8 +257,8 @@ srs_error_t SrsHttpConn::cycle()
{
srs_error_t err = do_cycle();

// Notify manager to remove it.
manager->remove(this);
// Notify handler to handle it.
handler_->on_conn_done();

// success.
if (err == srs_success) {
Expand Down Expand Up @@ -284,29 +302,41 @@ void SrsHttpConn::expire()
trd->interrupt();
}

SrsResponseOnlyHttpConn::SrsResponseOnlyHttpConn(ISrsResourceManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, string cip, int port) : SrsHttpConn(cm, fd, m, cip, port)
SrsResponseOnlyHttpConn::SrsResponseOnlyHttpConn(ISrsResourceManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, string cip, int port)
{
manager = cm;
conn = new SrsHttpConn(this, fd, m, cip, port);
stfd = fd;
}

SrsResponseOnlyHttpConn::~SrsResponseOnlyHttpConn()
{
srs_freep(conn);
}

srs_error_t SrsResponseOnlyHttpConn::pop_message(ISrsHttpMessage** preq)
{
srs_error_t err = srs_success;

SrsStSocket skt;

// We start a socket to read the stfd, which is writing by conn.
// It's ok, because conn never read it after processing the HTTP request.
if ((err = skt.initialize(stfd)) != srs_success) {
return srs_error_wrap(err, "init socket");
}

// Check user interrupt by interval.
skt->set_recv_timeout(3 * SRS_UTIME_SECONDS);
skt.set_recv_timeout(3 * SRS_UTIME_SECONDS);

// drop all request body.
char body[4096];
while (true) {
if ((err = trd->pull()) != srs_success) {
if ((err = conn->pull()) != srs_success) {
return srs_error_wrap(err, "timeout");
}

if ((err = skt->read(body, 4096, NULL)) != srs_success) {
if ((err = skt.read(body, 4096, NULL)) != srs_success) {
// Because we use timeout to check trd state, so we should ignore any timeout.
if (srs_error_code(err) == ERROR_SOCKET_TIMEOUT) {
srs_freep(err);
Expand All @@ -320,7 +350,7 @@ srs_error_t SrsResponseOnlyHttpConn::pop_message(ISrsHttpMessage** preq)
return err;
}

srs_error_t SrsResponseOnlyHttpConn::on_got_http_message(ISrsHttpMessage* msg)
srs_error_t SrsResponseOnlyHttpConn::on_http_message(ISrsHttpMessage* msg)
{
srs_error_t err = srs_success;

Expand All @@ -343,9 +373,46 @@ srs_error_t SrsResponseOnlyHttpConn::on_got_http_message(ISrsHttpMessage* msg)
return err;
}

void SrsResponseOnlyHttpConn::expire()
void SrsResponseOnlyHttpConn::on_conn_done()
{
// Because we use manager to manage this object,
// not the http connection object, so we must remove it here.
manager->remove(this);
}

srs_error_t SrsResponseOnlyHttpConn::set_tcp_nodelay(bool v)
{
return conn->set_tcp_nodelay(v);
}

srs_error_t SrsResponseOnlyHttpConn::set_socket_buffer(srs_utime_t buffer_v)
{
return conn->set_socket_buffer(buffer_v);
}

std::string SrsResponseOnlyHttpConn::desc()
{
return "ROHttpConn";
}

std::string SrsResponseOnlyHttpConn::remote_ip()
{
return conn->remote_ip();
}

const SrsContextId& SrsResponseOnlyHttpConn::get_id()
{
return conn->get_id();
}

srs_error_t SrsResponseOnlyHttpConn::start()
{
return conn->start();
}

void SrsResponseOnlyHttpConn::remark(int64_t* in, int64_t* out)
{
SrsHttpConn::expire();
conn->remark(in, out);
}

SrsHttpServer::SrsHttpServer(SrsServer* svr)
Expand Down
Loading

0 comments on commit 24125b9

Please sign in to comment.