Skip to content

Commit 9493104

Browse files
Merge pull request #9 from SarafApp/stream
✨ Add Stream Support to Query Builder
2 parents cdfe222 + 2768e44 commit 9493104

File tree

5 files changed

+181
-0
lines changed

5 files changed

+181
-0
lines changed

example/stream.php

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
<?php
2+
3+
use Dotenv\Dotenv;
4+
use React\EventLoop\Loop;
5+
use Saraf\QB\QueryBuilder\Core\DBFactory;
6+
use Saraf\QB\QueryBuilder\Enums\OrderDirection;
7+
use Saraf\QB\QueryBuilder\Exceptions\DBFactoryException;
8+
9+
include "vendor/autoload.php";
10+
11+
// Loop
12+
$loop = Loop::get();
13+
14+
// Environments
15+
$env = Dotenv::createImmutable(__DIR__ . "/../");
16+
$env->load();
17+
18+
// Env Loader
19+
$DB_NAME = $_ENV['DB_NAME'];
20+
$DB_USER = $_ENV['DB_USER'];
21+
$DB_PASS = $_ENV['DB_PASS'];
22+
$DB_HOST = $_ENV['DB_HOST'];
23+
$DB_PORT_READ = $_ENV['DB_PORT_READ'];
24+
$DB_PORT_WRITE = $_ENV['DB_PORT_WRITE'];
25+
26+
27+
try {
28+
$dbFactory = new DBFactory(
29+
$loop,
30+
$DB_HOST,
31+
$DB_NAME,
32+
$DB_USER,
33+
$DB_PASS,
34+
$DB_PORT_WRITE,
35+
$DB_PORT_READ,
36+
5,
37+
5,
38+
2,
39+
2
40+
);
41+
} catch (DBFactoryException $e) {
42+
echo $e->getMessage();
43+
exit(1);
44+
}
45+
46+
// Without QB
47+
$dbFactory->getQueryBuilder()
48+
->select()
49+
->from("Users")
50+
->addColumn("id")
51+
->whereGreater("id", 1)
52+
->compile()
53+
->stream()
54+
->onData(function ($result) {
55+
echo "New Row Data:" . json_encode($result) . PHP_EOL;
56+
})
57+
->run();
58+
59+
// Without QueryBuilder
60+
$dbFactory->streamQuery("select id from Users where id > 1")
61+
->onData(function ($result) {
62+
echo "New Row Data:" . json_encode($result) . PHP_EOL;
63+
})
64+
->run();
65+
66+
$loop->addPeriodicTimer(1, function () {
67+
memory();
68+
});
69+
70+
function memory()
71+
{
72+
echo "Memory Stat: " . round(memory_get_usage() / 1_000_000, 2) . " / " . round(memory_get_usage(true) / 1_000_000, 2) . " MB" .
73+
" | Peak: " . round(memory_get_peak_usage() / 1_000_000, 2) . " / " . round(memory_get_peak_usage(true) / 1_000_000, 2) . " MB" . PHP_EOL;
74+
}
75+
76+
77+
$loop->run();

src/QueryBuilder/Core/DBFactory.php

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use React\EventLoop\LoopInterface;
66
use React\MySQL\Factory;
77
use React\Promise\PromiseInterface;
8+
use React\Stream\ReadableStreamInterface;
89
use Saraf\QB\QueryBuilder\Exceptions\DBFactoryException;
910
use Saraf\QB\QueryBuilder\Helpers\QBHelper;
1011
use Saraf\QB\QueryBuilder\QueryBuilder;
@@ -160,6 +161,28 @@ public function query(string $query): PromiseInterface
160161
});
161162
}
162163

164+
/**
165+
* @throws DBFactoryException
166+
*/
167+
public function streamQuery(string $query): StreamEventHandler
168+
{
169+
$isWrite = true;
170+
if (str_starts_with(strtolower($query), "select")
171+
|| str_starts_with(strtolower($query), "show")
172+
) $isWrite = false;
173+
174+
$bestConnections = $this->getBestConnection();
175+
176+
$connection = $isWrite
177+
? $this->writeConnections[$bestConnections['write']]
178+
: $this->readConnections[$bestConnections['read']];
179+
180+
if (!($connection instanceof DBWorker))
181+
throw new DBFactoryException("Connections Not Instance of Worker / Restart App");
182+
183+
return $connection->streamQuery($query);
184+
}
185+
163186
/**
164187
* @throws DBFactoryException
165188
*/

src/QueryBuilder/Core/DBWorker.php

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use React\MySQL\ConnectionInterface;
66
use React\MySQL\QueryResult;
77
use React\Promise\PromiseInterface;
8+
use React\Stream\ReadableStreamInterface;
89

910
class DBWorker
1011
{
@@ -31,6 +32,14 @@ public function query(string $query): PromiseInterface
3132
});
3233
}
3334

35+
public function streamQuery(string $query): StreamEventHandler
36+
{
37+
$this->startJob();
38+
return (new StreamEventHandler($this->getConnection(), $query, function () {
39+
$this->endJob();
40+
}));
41+
}
42+
3443
protected function handleResult(QueryResult $result): array
3544
{
3645
if (!is_null($result->resultRows)) {

src/QueryBuilder/Core/EQuery.php

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
use React\Promise\Promise;
66
use React\Promise\PromiseInterface;
7+
use React\Stream\ReadableStreamInterface;
78
use Saraf\QB\QueryBuilder\Exceptions\DBFactoryException;
89

910
final class EQuery
@@ -30,6 +31,11 @@ public function commit(): PromiseInterface
3031
}
3132
}
3233

34+
public function stream(): StreamEventHandler
35+
{
36+
return $this->factory->streamQuery($this->query);
37+
}
38+
3339
public function getQuery(): Promise
3440
{
3541
return new Promise(function (callable $resolve) {
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
<?php
2+
3+
namespace Saraf\QB\QueryBuilder\Core;
4+
5+
use React\MySQL\ConnectionInterface;
6+
use React\Promise\Deferred;
7+
use React\Promise\PromiseInterface;
8+
9+
class StreamEventHandler
10+
{
11+
protected \Closure|null $onDataFn = null;
12+
protected Deferred $promise;
13+
14+
public function __construct(
15+
protected ConnectionInterface $connection,
16+
protected string $query,
17+
protected ?\Closure $onClosedWorker = null
18+
)
19+
{
20+
}
21+
22+
public function onData(\Closure $onDataFn): StreamEventHandler
23+
{
24+
$this->onDataFn = $onDataFn;
25+
return $this;
26+
}
27+
28+
/**
29+
* @throws \Exception
30+
*/
31+
public function run(mixed $initialValue = null): PromiseInterface
32+
{
33+
$promise = new Deferred();
34+
35+
if ($this->onDataFn == null) {
36+
throw new \Exception("onData is required");
37+
}
38+
39+
40+
$stream = $this->connection->queryStream($this->query);
41+
42+
$stream->on("data", function ($row) use (&$initialValue) {
43+
$initialValue = ($this->onDataFn)($row, $initialValue);
44+
});
45+
46+
$stream->on("error", function (\Throwable $error) use ($promise) {
47+
$promise->resolve([
48+
'result' => false,
49+
'error' => $error->getMessage(),
50+
]);
51+
});
52+
$stream->on("close", function () use (&$initialValue, $promise) {
53+
$promise->resolve([
54+
'result' => true,
55+
'data' => $initialValue
56+
]);
57+
});
58+
59+
// For Handling Inner Queue
60+
if ($this->onClosedWorker != null)
61+
$stream->on("close", $this->onClosedWorker);
62+
63+
return $promise->promise();
64+
}
65+
66+
}

0 commit comments

Comments
 (0)