Skip to content

Commit d12f73e

Browse files
authored
Merge pull request #4 from aternosorg/socket-chunks
send socket messages in chunks, test larger messages
2 parents 65af214 + 7942845 commit d12f73e

File tree

3 files changed

+61
-7
lines changed

3 files changed

+61
-7
lines changed

src/Communication/Socket/Socket.php

+27-7
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use Aternos\Taskmaster\Communication\MessageInterface;
66
use Aternos\Taskmaster\Communication\Socket\Exception\SocketReadException;
77
use Aternos\Taskmaster\Communication\Socket\Exception\SocketWriteException;
8+
use Aternos\Taskmaster\Taskmaster;
89
use Generator;
910

1011
/**
@@ -22,6 +23,8 @@ class Socket implements SocketInterface, SelectableSocketInterface
2223
*/
2324
protected mixed $socket;
2425

26+
protected string $receiveBuffer = "";
27+
2528
/**
2629
* @param resource|Socket $socket
2730
*/
@@ -62,10 +65,23 @@ public function receiveRaw(): Generator
6265
if (!is_resource($this->socket) || feof($this->socket)) {
6366
throw new SocketReadException("Could not read from socket.");
6467
}
65-
$result = fgets($this->socket);
68+
$result = $this->receiveBuffer;
69+
do {
70+
$chunk = fgets($this->socket, 10_001);
71+
if ($chunk === false || strlen($chunk) === 0) {
72+
break;
73+
}
74+
75+
$result .= $chunk;
76+
} while (!str_ends_with($result, PHP_EOL));
6677
if (!$result) {
6778
break;
6879
}
80+
if (!str_ends_with($result, PHP_EOL)) {
81+
$this->receiveBuffer = $result;
82+
break;
83+
}
84+
$this->receiveBuffer = "";
6985
$decoded = base64_decode($result);
7086
yield $decoded;
7187
} while (true);
@@ -102,18 +118,22 @@ public function sendRaw(string $data): bool
102118
}
103119
$data = base64_encode($data);
104120
$data .= PHP_EOL;
105-
$total = 0;
106-
$expected = strlen($data);
121+
$current = 0;
122+
$total = strlen($data);
107123
do {
108124
if (!is_resource($this->socket) || feof($this->socket)) {
109125
throw new SocketWriteException("Could not write to socket.");
110126
}
111-
$result = @fwrite($this->socket, $data);
112-
if ($result === false || $result === 0) {
127+
$chunk = substr($data, $current, 10_000);
128+
$result = @fwrite($this->socket, $chunk);
129+
if ($result === false) {
113130
throw new SocketWriteException("Could not write to socket.");
114131
}
115-
$total += $result;
116-
} while ($total < $expected);
132+
if ($result === 0) {
133+
usleep(Taskmaster::SOCKET_WAIT_TIME);
134+
}
135+
$current += $result;
136+
} while ($current < $total);
117137
return true;
118138
}
119139

test/Integration/WorkerTestCase.php

+9
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
use Aternos\Taskmaster\Test\Util\Task\CallbackTask;
99
use Aternos\Taskmaster\Test\Util\Task\ChildExceptionTask;
1010
use Aternos\Taskmaster\Test\Util\Task\EmptyTask;
11+
use Aternos\Taskmaster\Test\Util\Task\LargeTask;
1112
use Aternos\Taskmaster\Test\Util\Task\ParentExceptionTask;
1213
use Aternos\Taskmaster\Test\Util\Task\SynchronizedFieldTask;
1314
use Aternos\Taskmaster\Test\Util\Task\UnsynchronizedFieldTask;
@@ -68,6 +69,14 @@ public function testGetTaskResult(): void
6869
$this->assertEquals(3, $task->getResult());
6970
}
7071

72+
public function testRunLargeTask(): void
73+
{
74+
$task = new LargeTask(1_000_000);
75+
$this->taskmaster->runTask($task);
76+
$this->taskmaster->wait();
77+
$this->assertEquals(1_000_000, strlen($task->getResult()));
78+
}
79+
7180
public function testGetTaskResultFromPromise(): void
7281
{
7382
$task = new AdditionTask(1, 2);

test/Util/Task/LargeTask.php

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
<?php
2+
3+
namespace Aternos\Taskmaster\Test\Util\Task;
4+
5+
use Aternos\Taskmaster\Task\OnBoth;
6+
use Aternos\Taskmaster\Task\OnChild;
7+
use Aternos\Taskmaster\Task\Task;
8+
9+
class LargeTask extends Task
10+
{
11+
#[OnBoth] protected string $data;
12+
13+
/**
14+
* @param int $length
15+
*/
16+
public function __construct(int $length = 100_000)
17+
{
18+
$this->data = str_repeat("T", $length);
19+
}
20+
21+
#[OnChild] public function run()
22+
{
23+
return $this->data;
24+
}
25+
}

0 commit comments

Comments
 (0)