From f18668318b511c520664b724b0bead925273ab06 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Niedzielski?= Date: Fri, 22 Feb 2019 10:58:48 +0100 Subject: [PATCH 1/3] Add TCP driver --- src/InfluxDB/Client.php | 7 +- src/InfluxDB/Driver/AbstractSocketDriver.php | 97 ++++++++++++++++++++ src/InfluxDB/Driver/TCP.php | 44 +++++++++ src/InfluxDB/Driver/UDP.php | 79 ++-------------- 4 files changed, 156 insertions(+), 71 deletions(-) create mode 100644 src/InfluxDB/Driver/AbstractSocketDriver.php create mode 100644 src/InfluxDB/Driver/TCP.php diff --git a/src/InfluxDB/Client.php b/src/InfluxDB/Client.php index 14c8c51..c5c8f33 100644 --- a/src/InfluxDB/Client.php +++ b/src/InfluxDB/Client.php @@ -8,6 +8,7 @@ use InfluxDB\Driver\Exception as DriverException; use InfluxDB\Driver\Guzzle; use InfluxDB\Driver\QueryDriverInterface; +use InfluxDB\Driver\TCP; use InfluxDB\Driver\UDP; /** @@ -258,6 +259,7 @@ public function listUsers() * * https+influxdb://username:pass@localhost:8086/databasename * udp+influxdb://username:pass@localhost:4444/databasename + * tcp+influxdb://username:pass@localhost:8094/databasename * * @param string $dsn * @param float $timeout @@ -303,9 +305,12 @@ public static function fromDSN($dsn, $timeout = 0, $verifySSL = false, $connectT $connectTimeout ); - // set the UDP driver when the DSN specifies UDP if ($modifier === 'udp') { + // set the UDP driver when the DSN specifies UDP $client->setDriver(new UDP($connParams['host'], $connParams['port'])); + } elseif ($modifier === 'tcp') { + // set the TCP driver when the DSN specifies TCP + $client->setDriver(new TCP($connParams['host'], $connParams['port'])); } return ($dbName ? $client->selectDB($dbName) : $client); diff --git a/src/InfluxDB/Driver/AbstractSocketDriver.php b/src/InfluxDB/Driver/AbstractSocketDriver.php new file mode 100644 index 0000000..7e4ef6b --- /dev/null +++ b/src/InfluxDB/Driver/AbstractSocketDriver.php @@ -0,0 +1,97 @@ +config['host'] = $host; + $this->config['port'] = $port; + } + + /** + * Close the stream (if created) + */ + public function __destruct() + { + if (isset($this->stream) && is_resource($this->stream)) { + fclose($this->stream); + } + } + + /** + * {@inheritdoc} + */ + public function setParameters(array $parameters) + { + $this->parameters = $parameters; + } + + /** + * {@inheritdoc} + */ + public function getParameters() + { + return $this->parameters; + } + + /** + * {@inheritdoc} + */ + public function write($data = null) + { + if (isset($this->stream) === false) { + $this->createStream(); + } + + $this->doWrite($data); + } + + /** + * {@inheritdoc} + */ + abstract public function isSuccess(); + + /** + * Perform write to socket + * @param mixed|null $data + */ + abstract protected function doWrite($data = null); + + /** + * Create the resource stream + */ + abstract protected function createStream(); + +} diff --git a/src/InfluxDB/Driver/TCP.php b/src/InfluxDB/Driver/TCP.php new file mode 100644 index 0000000..66f6f58 --- /dev/null +++ b/src/InfluxDB/Driver/TCP.php @@ -0,0 +1,44 @@ +result; + } + + /** + * {@inheritdoc} + */ + protected function createStream() + { + $host = sprintf('tcp://%s:%d', $this->config['host'], $this->config['port']); + + // stream the data using TCP and suppress any errors + $this->stream = @stream_socket_client($host); + } + + /** + * {@inheritdoc} + */ + protected function doWrite($data = null) + { + $this->result = false !== fwrite($this->stream, $data); + } +} diff --git a/src/InfluxDB/Driver/UDP.php b/src/InfluxDB/Driver/UDP.php index 0cd3010..6b5da80 100644 --- a/src/InfluxDB/Driver/UDP.php +++ b/src/InfluxDB/Driver/UDP.php @@ -10,76 +10,8 @@ * * @package InfluxDB\Driver */ -class UDP implements DriverInterface +class UDP extends AbstractSocketDriver implements DriverInterface { - /** - * Parameters - * - * @var array - */ - private $parameters; - - /** - * @var array - */ - private $config; - - /** - * - * @var resource - */ - private $stream; - - /** - * @param string $host IP/hostname of the InfluxDB host - * @param int $port Port of the InfluxDB process - */ - public function __construct($host, $port) - { - $this->config['host'] = $host; - $this->config['port'] = $port; - } - - /** - * Close the stream (if created) - */ - public function __destruct() - { - if (isset($this->stream) && is_resource($this->stream)) { - fclose($this->stream); - } - } - - /** - * {@inheritdoc} - */ - public function setParameters(array $parameters) - { - $this->parameters = $parameters; - } - - /** - * {@inheritdoc} - */ - public function getParameters() - { - return $this->parameters; - } - - /** - * {@inheritdoc} - */ - public function write($data = null) - { - if (isset($this->stream) === false) { - $this->createStream(); - } - - @stream_socket_sendto($this->stream, $data); - - return true; - } - /** * {@inheritdoc} */ @@ -89,7 +21,7 @@ public function isSuccess() } /** - * Create the resource stream + * {@inheritdoc} */ protected function createStream() { @@ -99,4 +31,11 @@ protected function createStream() $this->stream = @stream_socket_client($host); } + /** + * {@inheritdoc} + */ + protected function doWrite($data = null) + { + @stream_socket_sendto($this->stream, $data); + } } From 4edb2f82131234d001e98d5ce23eace55f5228df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Niedzielski?= Date: Wed, 27 Feb 2019 10:20:19 +0100 Subject: [PATCH 2/3] Suppress errors from writing to stream --- src/InfluxDB/Driver/TCP.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/InfluxDB/Driver/TCP.php b/src/InfluxDB/Driver/TCP.php index 66f6f58..ac01dc1 100644 --- a/src/InfluxDB/Driver/TCP.php +++ b/src/InfluxDB/Driver/TCP.php @@ -39,6 +39,6 @@ protected function createStream() */ protected function doWrite($data = null) { - $this->result = false !== fwrite($this->stream, $data); + $this->result = false !== @fwrite($this->stream, $data); } } From 7a1cb157916bc1e1d3972b798fbb2d58a31e1af5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Niedzielski?= Date: Wed, 27 Feb 2019 12:39:17 +0100 Subject: [PATCH 3/3] Always end TCP message with newline character --- src/InfluxDB/Driver/AbstractSocketDriver.php | 8 +++++--- src/InfluxDB/Driver/TCP.php | 4 ++-- src/InfluxDB/Driver/UDP.php | 2 +- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/src/InfluxDB/Driver/AbstractSocketDriver.php b/src/InfluxDB/Driver/AbstractSocketDriver.php index 7e4ef6b..4a82fde 100644 --- a/src/InfluxDB/Driver/AbstractSocketDriver.php +++ b/src/InfluxDB/Driver/AbstractSocketDriver.php @@ -75,7 +75,9 @@ public function write($data = null) $this->createStream(); } - $this->doWrite($data); + if ($data !== null) { + $this->doWrite($data); + } } /** @@ -85,9 +87,9 @@ abstract public function isSuccess(); /** * Perform write to socket - * @param mixed|null $data + * @param mixed $data */ - abstract protected function doWrite($data = null); + abstract protected function doWrite($data); /** * Create the resource stream diff --git a/src/InfluxDB/Driver/TCP.php b/src/InfluxDB/Driver/TCP.php index ac01dc1..320224c 100644 --- a/src/InfluxDB/Driver/TCP.php +++ b/src/InfluxDB/Driver/TCP.php @@ -37,8 +37,8 @@ protected function createStream() /** * {@inheritdoc} */ - protected function doWrite($data = null) + protected function doWrite($data) { - $this->result = false !== @fwrite($this->stream, $data); + $this->result = false !== @fwrite($this->stream, "$data\n"); } } diff --git a/src/InfluxDB/Driver/UDP.php b/src/InfluxDB/Driver/UDP.php index 6b5da80..bacbc30 100644 --- a/src/InfluxDB/Driver/UDP.php +++ b/src/InfluxDB/Driver/UDP.php @@ -34,7 +34,7 @@ protected function createStream() /** * {@inheritdoc} */ - protected function doWrite($data = null) + protected function doWrite($data) { @stream_socket_sendto($this->stream, $data); }