Skip to content

Commit 79adc5c

Browse files
committed
Automatically close response stream if connection closes
1 parent 693c3c8 commit 79adc5c

File tree

5 files changed

+190
-6
lines changed

5 files changed

+190
-6
lines changed

README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -509,6 +509,12 @@ or use it for body data that needs to calculated.
509509

510510
If the request handler resolves with a response stream that is already closed,
511511
it will simply send an empty response body.
512+
If the client closes the connection while the stream is still open, the
513+
response stream will automatically be closed.
514+
If a promise is resolved with a streaming body after the client closes, the
515+
response stream will automatically be closed.
516+
The `close` event can be used to clean up any pending resources allocated
517+
in this case (if applicable).
512518

513519
If the response body is a `string`, a `Content-Length` header will be added
514520
automatically.

src/ChunkedEncoder.php

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,9 @@ public function close()
5252
}
5353

5454
$this->closed = true;
55-
56-
$this->readable = false;
55+
$this->input->close();
5756

5857
$this->emit('close');
59-
6058
$this->removeAllListeners();
6159
}
6260

src/Server.php

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -341,12 +341,27 @@ private function handleResponseBody(ResponseInterface $response, ConnectionInter
341341

342342
$stream = $response->getBody();
343343

344+
// close response stream if connection is already closed
345+
if (!$connection->isWritable()) {
346+
return $stream->close();
347+
}
348+
344349
$connection->write(Psr7Implementation\str($response));
350+
345351
if ($stream->isReadable()) {
346352
if ($response->getHeaderLine('Transfer-Encoding') === 'chunked') {
347353
$stream = new ChunkedEncoder($stream);
348354
}
349355

356+
// Close response stream once connection closes.
357+
// Note that this TCP/IP close detection may take some time,
358+
// in particular this may only fire on a later read/write attempt
359+
// because we stop/pause reading from the connection once the
360+
// request has been processed.
361+
$connection->on('close', function() use ($stream) {
362+
$stream->close();
363+
});
364+
350365
$stream->pipe($connection);
351366
} else {
352367
if ($response->getHeaderLine('Transfer-Encoding') === 'chunked') {

tests/FunctionalServerTest.php

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -353,7 +353,7 @@ public function testSecureHttpsOnHttpStandardPortReturnsUriWithPort()
353353
$socket->close();
354354
}
355355

356-
public function testClosedStreamFromRequestHandlerWillBeSendEmptyBody()
356+
public function testClosedStreamFromRequestHandlerWillSendEmptyBody()
357357
{
358358
$loop = Factory::create();
359359
$socket = new Socket(0, $loop);
@@ -379,6 +379,73 @@ public function testClosedStreamFromRequestHandlerWillBeSendEmptyBody()
379379

380380
$socket->close();
381381
}
382+
383+
public function testStreamFromRequestHandlerWillBeClosedIfConnectionClosesWhileSendingBody()
384+
{
385+
$loop = Factory::create();
386+
$socket = new Socket(0, $loop);
387+
$connector = new Connector($loop);
388+
389+
$stream = new ThroughStream();
390+
$stream->on('close', $this->expectCallableOnce());
391+
392+
$server = new Server($socket, function (RequestInterface $request) use ($stream) {
393+
return new Response(200, array(), $stream);
394+
});
395+
396+
$result = $connector->connect($socket->getAddress())->then(function (ConnectionInterface $conn) use ($loop) {
397+
$conn->write("GET / HTTP/1.0\r\nContent-Length: 100\r\n\r\n");
398+
399+
$loop->addTimer(0.01, function() use ($conn) {
400+
$conn->end();
401+
});
402+
403+
return Stream\buffer($conn);
404+
});
405+
406+
$response = Block\await($result, $loop, 1.0);
407+
408+
$this->assertStringStartsWith("HTTP/1.0 200 OK", $response);
409+
$this->assertStringEndsWith("\r\n\r\n", $response);
410+
411+
$socket->close();
412+
}
413+
414+
public function testStreamFromRequestHandlerWillBeClosedIfConnectionClosesButWillOnlyBeDetectedOnNextWrite()
415+
{
416+
$loop = Factory::create();
417+
$socket = new Socket(0, $loop);
418+
$connector = new Connector($loop);
419+
420+
$stream = new ThroughStream();
421+
$stream->on('close', $this->expectCallableOnce());
422+
423+
$server = new Server($socket, function (RequestInterface $request) use ($stream) {
424+
return new Response(200, array(), $stream);
425+
});
426+
427+
$result = $connector->connect($socket->getAddress())->then(function (ConnectionInterface $conn) use ($loop) {
428+
$conn->write("GET / HTTP/1.0\r\n\r\n");
429+
430+
$loop->addTimer(0.01, function() use ($conn) {
431+
$conn->end();
432+
});
433+
434+
return Stream\buffer($conn);
435+
});
436+
437+
$response = Block\await($result, $loop, 1.0);
438+
439+
$stream->write('nope');
440+
Block\sleep(0.01, $loop);
441+
$stream->write('nope');
442+
Block\sleep(0.01, $loop);
443+
444+
$this->assertStringStartsWith("HTTP/1.0 200 OK", $response);
445+
$this->assertStringEndsWith("\r\n\r\n", $response);
446+
447+
$socket->close();
448+
}
382449
}
383450

384451
function noScheme($uri)

tests/ServerTest.php

Lines changed: 100 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -677,7 +677,39 @@ function ($data) use (&$buffer) {
677677
$this->assertStringEndsWith("\r\n\r\n0\r\n\r\n", $buffer);
678678
}
679679

680-
public function testStreamAlreadyClosedWillSendEmptyBodyPlainHttp10()
680+
public function testResponseStreamEndingWillSendEmptyBodyChunkedEncoded()
681+
{
682+
$stream = new ThroughStream();
683+
684+
$server = new Server($this->socket, function (ServerRequestInterface $request) use ($stream) {
685+
return new Response(200, array(), $stream);
686+
});
687+
688+
$buffer = '';
689+
690+
$this->connection
691+
->expects($this->any())
692+
->method('write')
693+
->will(
694+
$this->returnCallback(
695+
function ($data) use (&$buffer) {
696+
$buffer .= $data;
697+
}
698+
)
699+
);
700+
701+
$this->socket->emit('connection', array($this->connection));
702+
703+
$data = "GET / HTTP/1.1\r\nHost: localhost\r\n\r\n";
704+
$this->connection->emit('data', array($data));
705+
706+
$stream->end();
707+
708+
$this->assertStringStartsWith("HTTP/1.1 200 OK\r\n", $buffer);
709+
$this->assertStringEndsWith("\r\n\r\n0\r\n\r\n", $buffer);
710+
}
711+
712+
public function testResponseStreamAlreadyClosedWillSendEmptyBodyPlainHttp10()
681713
{
682714
$stream = new ThroughStream();
683715
$stream->close();
@@ -706,7 +738,73 @@ function ($data) use (&$buffer) {
706738

707739
$this->assertStringStartsWith("HTTP/1.0 200 OK\r\n", $buffer);
708740
$this->assertStringEndsWith("\r\n\r\n", $buffer);
709-
}
741+
}
742+
743+
public function testResponseStreamWillBeClosedIfConnectionIsAlreadyClosed()
744+
{
745+
$stream = new ThroughStream();
746+
$stream->on('close', $this->expectCallableOnce());
747+
748+
$server = new Server($this->socket, function (ServerRequestInterface $request) use ($stream) {
749+
return new Response(200, array(), $stream);
750+
});
751+
752+
$buffer = '';
753+
754+
$this->connection
755+
->expects($this->any())
756+
->method('write')
757+
->will(
758+
$this->returnCallback(
759+
function ($data) use (&$buffer) {
760+
$buffer .= $data;
761+
}
762+
)
763+
);
764+
765+
$this->connection = $this->getMockBuilder('React\Socket\Connection')
766+
->disableOriginalConstructor()
767+
->setMethods(
768+
array(
769+
'write',
770+
'end',
771+
'close',
772+
'pause',
773+
'resume',
774+
'isReadable',
775+
'isWritable',
776+
'getRemoteAddress',
777+
'getLocalAddress',
778+
'pipe'
779+
)
780+
)
781+
->getMock();
782+
783+
$this->connection->expects($this->once())->method('isWritable')->willReturn(false);
784+
$this->connection->expects($this->never())->method('write');
785+
$this->connection->expects($this->never())->method('write');
786+
787+
$this->socket->emit('connection', array($this->connection));
788+
789+
$data = $this->createGetRequest();
790+
$this->connection->emit('data', array($data));
791+
}
792+
793+
public function testResponseStreamWillBeClosedIfConnectionEmitsCloseEvent()
794+
{
795+
$stream = new ThroughStream();
796+
$stream->on('close', $this->expectCallableOnce());
797+
798+
$server = new Server($this->socket, function (ServerRequestInterface $request) use ($stream) {
799+
return new Response(200, array(), $stream);
800+
});
801+
802+
$this->socket->emit('connection', array($this->connection));
803+
804+
$data = $this->createGetRequest();
805+
$this->connection->emit('data', array($data));
806+
$this->connection->emit('close');
807+
}
710808

711809
public function testResponseContainsSameRequestProtocolVersionAndChunkedBodyForHttp11()
712810
{

0 commit comments

Comments
 (0)