diff --git a/src/Communication/Socket/Socket.php b/src/Communication/Socket/Socket.php index d9a9a5f..1e5b2ce 100644 --- a/src/Communication/Socket/Socket.php +++ b/src/Communication/Socket/Socket.php @@ -5,6 +5,7 @@ use Aternos\Taskmaster\Communication\MessageInterface; use Aternos\Taskmaster\Communication\Socket\Exception\SocketReadException; use Aternos\Taskmaster\Communication\Socket\Exception\SocketWriteException; +use Aternos\Taskmaster\Taskmaster; use Generator; /** @@ -22,6 +23,8 @@ class Socket implements SocketInterface, SelectableSocketInterface */ protected mixed $socket; + protected string $receiveBuffer = ""; + /** * @param resource|Socket $socket */ @@ -62,10 +65,23 @@ public function receiveRaw(): Generator if (!is_resource($this->socket) || feof($this->socket)) { throw new SocketReadException("Could not read from socket."); } - $result = fgets($this->socket); + $result = $this->receiveBuffer; + do { + $chunk = fgets($this->socket, 10_001); + if ($chunk === false || strlen($chunk) === 0) { + break; + } + + $result .= $chunk; + } while (!str_ends_with($result, PHP_EOL)); if (!$result) { break; } + if (!str_ends_with($result, PHP_EOL)) { + $this->receiveBuffer = $result; + break; + } + $this->receiveBuffer = ""; $decoded = base64_decode($result); yield $decoded; } while (true); @@ -102,18 +118,22 @@ public function sendRaw(string $data): bool } $data = base64_encode($data); $data .= PHP_EOL; - $total = 0; - $expected = strlen($data); + $current = 0; + $total = strlen($data); do { if (!is_resource($this->socket) || feof($this->socket)) { throw new SocketWriteException("Could not write to socket."); } - $result = @fwrite($this->socket, $data); - if ($result === false || $result === 0) { + $chunk = substr($data, $current, 10_000); + $result = @fwrite($this->socket, $chunk); + if ($result === false) { throw new SocketWriteException("Could not write to socket."); } - $total += $result; - } while ($total < $expected); + if ($result === 0) { + usleep(Taskmaster::SOCKET_WAIT_TIME); + } + $current += $result; + } while ($current < $total); return true; } diff --git a/test/Integration/WorkerTestCase.php b/test/Integration/WorkerTestCase.php index 5aa0593..4d89840 100644 --- a/test/Integration/WorkerTestCase.php +++ b/test/Integration/WorkerTestCase.php @@ -8,6 +8,7 @@ use Aternos\Taskmaster\Test\Util\Task\CallbackTask; use Aternos\Taskmaster\Test\Util\Task\ChildExceptionTask; use Aternos\Taskmaster\Test\Util\Task\EmptyTask; +use Aternos\Taskmaster\Test\Util\Task\LargeTask; use Aternos\Taskmaster\Test\Util\Task\ParentExceptionTask; use Aternos\Taskmaster\Test\Util\Task\SynchronizedFieldTask; use Aternos\Taskmaster\Test\Util\Task\UnsynchronizedFieldTask; @@ -68,6 +69,14 @@ public function testGetTaskResult(): void $this->assertEquals(3, $task->getResult()); } + public function testRunLargeTask(): void + { + $task = new LargeTask(1_000_000); + $this->taskmaster->runTask($task); + $this->taskmaster->wait(); + $this->assertEquals(1_000_000, strlen($task->getResult())); + } + public function testGetTaskResultFromPromise(): void { $task = new AdditionTask(1, 2); diff --git a/test/Util/Task/LargeTask.php b/test/Util/Task/LargeTask.php new file mode 100644 index 0000000..799c433 --- /dev/null +++ b/test/Util/Task/LargeTask.php @@ -0,0 +1,25 @@ +data = str_repeat("T", $length); + } + + #[OnChild] public function run() + { + return $this->data; + } +} \ No newline at end of file