Skip to content

Commit

Permalink
Merge pull request php-amqplib#653 from ramunasd/handle_closed_connec…
Browse files Browse the repository at this point in the history
…tion

Handle broken pipe or closed connection exceptions
  • Loading branch information
lukebakken authored Mar 8, 2019
2 parents 4fd0b0d + 4ebc006 commit a4d00e5
Show file tree
Hide file tree
Showing 17 changed files with 758 additions and 105 deletions.
33 changes: 24 additions & 9 deletions PhpAmqpLib/Channel/AMQPChannel.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
namespace PhpAmqpLib\Channel;

use PhpAmqpLib\Exception\AMQPBasicCancelException;
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
use PhpAmqpLib\Exception\AMQPProtocolChannelException;
use PhpAmqpLib\Exception\AMQPRuntimeException;
use PhpAmqpLib\Message\AMQPMessage;
Expand Down Expand Up @@ -120,6 +121,14 @@ public function __construct($connection, $channel_id = null, $auto_decode = true
}
}

/**
* @return bool
*/
public function is_open()
{
return $this->is_open;
}

/**
* Tear down this object, after we've agreed to close with the server.
*/
Expand All @@ -130,6 +139,7 @@ protected function do_close()
}
$this->channel_id = $this->connection = null;
$this->is_open = false;
$this->callbacks = array();
}

/**
Expand Down Expand Up @@ -1147,15 +1157,20 @@ public function basic_publish(
$pkt = new AMQPWriter();
$pkt->write($this->pre_publish($exchange, $routing_key, $mandatory, $immediate, $ticket));

$this->connection->send_content(
$this->channel_id,
60,
0,
mb_strlen($msg->body, 'ASCII'),
$msg->serialize_properties(),
$msg->body,
$pkt
);
try {
$this->connection->send_content(
$this->channel_id,
60,
0,
mb_strlen($msg->body, 'ASCII'),
$msg->serialize_properties(),
$msg->body,
$pkt
);
} catch (AMQPConnectionClosedException $e) {
$this->do_close();
throw $e;
}

if ($this->next_delivery_tag > 0) {
$this->published_messages[$this->next_delivery_tag] = $msg;
Expand Down
6 changes: 6 additions & 0 deletions PhpAmqpLib/Channel/AbstractChannel.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use PhpAmqpLib\Connection\AbstractConnection;
use PhpAmqpLib\Exception\AMQPChannelClosedException;
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
use PhpAmqpLib\Exception\AMQPInvalidFrameException;
use PhpAmqpLib\Exception\AMQPNoDataException;
use PhpAmqpLib\Exception\AMQPNotImplementedException;
Expand Down Expand Up @@ -347,6 +348,11 @@ public function wait($allowed_methods = null, $non_blocking = false, $timeout =
} catch (AMQPNoDataException $e) {
// no data ready for non-blocking actions - stop and exit
break;
} catch (AMQPConnectionClosedException $exception) {
if ($this instanceof AMQPChannel) {
$this->do_close();
}
throw $exception;
}

$this->validate_method_frame($frame_type);
Expand Down
1 change: 1 addition & 0 deletions PhpAmqpLib/Connection/AMQPSocketConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public function __construct(
$locale,
$io,
$heartbeat,
max($read_timeout, $write_timeout),
$channel_rpc_timeout
);
}
Expand Down
6 changes: 6 additions & 0 deletions PhpAmqpLib/Connection/AbstractConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,9 @@ public function write($data)

try {
$this->getIO()->write($data);
} catch (AMQPConnectionClosedException $e) {
$this->do_close();
throw $e;
} catch (AMQPRuntimeException $e) {
$this->setIsConnected(false);
throw $e;
Expand Down Expand Up @@ -556,6 +559,9 @@ protected function wait_frame($timeout = 0)
$this->input->setTimeout($currentTimeout);
}
throw $e;
} catch (AMQPConnectionClosedException $exception) {
$this->do_close();
throw $exception;
}

$this->input->setTimeout($currentTimeout);
Expand Down
3 changes: 3 additions & 0 deletions PhpAmqpLib/Exception/AMQPConnectionClosedException.php
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
<?php
namespace PhpAmqpLib\Exception;

/**
* When connection was closed by server, proxy or some tunnel due to timeout or network issue.
*/
class AMQPConnectionClosedException extends AMQPRuntimeException
{
}
29 changes: 29 additions & 0 deletions PhpAmqpLib/Exception/AMQPTimeoutException.php
Original file line number Diff line number Diff line change
@@ -1,6 +1,35 @@
<?php

namespace PhpAmqpLib\Exception;

class AMQPTimeoutException extends \RuntimeException implements AMQPExceptionInterface
{
/**
* @var int|float|null
*/
private $timeout;

public function __construct($message = '', $timeout = 0, $code = 0, \Exception $previous = null)
{
parent::__construct($message, $code, $previous);
$this->timeout = $timeout;
}

/**
* @param int|float|null $timeout
* @param int $code
* @return self
*/
public static function writeTimeout($timeout, $code = 0)
{
return new self('Error sending data. Connection timed out.', $timeout, $code);
}

/**
* @return int|float|null
*/
public function getTimeout()
{
return $this->timeout;
}
}
25 changes: 22 additions & 3 deletions PhpAmqpLib/Wire/IO/AbstractIO.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,23 @@

abstract class AbstractIO
{
const BUFFER_SIZE = 8192;

/** @var string */
protected $host;

/** @var int */
protected $port;

/** @var int|float */
protected $connection_timeout;

/** @var int|float */
protected $read_timeout;

/** @var int|float */
protected $write_timeout;

/** @var int */
protected $heartbeat;

Expand All @@ -36,14 +47,22 @@ abstract class AbstractIO
protected $canDispatchPcntlSignal = false;

/**
* @param int $n
* @param int $len
* @return string
* @throws \PhpAmqpLib\Exception\AMQPIOException
* @throws \PhpAmqpLib\Exception\AMQPRuntimeException
* @throws \PhpAmqpLib\Exception\AMQPSocketException
* @throws \PhpAmqpLib\Exception\AMQPTimeoutException
* @throws \PhpAmqpLib\Exception\AMQPConnectionClosedException
*/
abstract public function read($n);
abstract public function read($len);

/**
* @param string $data
* @return mixed
* @throws \PhpAmqpLib\Exception\AMQPIOException
* @throws \PhpAmqpLib\Exception\AMQPSocketException
* @throws \PhpAmqpLib\Exception\AMQPConnectionClosedException
* @throws \PhpAmqpLib\Exception\AMQPTimeoutException
*/
abstract public function write($data);

Expand Down
Loading

0 comments on commit a4d00e5

Please sign in to comment.