Skip to content

Commit

Permalink
Merge pull request #4 from AgentSoftware/NOTICKET-rate-limit
Browse files Browse the repository at this point in the history
NOTICKET Added rate limit
  • Loading branch information
samhannan authored Apr 5, 2024
2 parents 4a79e88 + 80cc814 commit bd56d31
Show file tree
Hide file tree
Showing 6 changed files with 244 additions and 14 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ composer.lock
/phpunit.xml
.phpunit.result.cache
.php-cs-fixer.cache
.idea
19 changes: 12 additions & 7 deletions phpstan-baseline.neon
Original file line number Diff line number Diff line change
Expand Up @@ -21,31 +21,36 @@ parameters:
path: src/RawSqsJob.php

-
message: "#^Cannot access offset 'Body' on mixed\\.$#"
count: 2
message: "#^Method AgentSoftware\\\\LaravelRawSqsConnector\\\\RawSqsQueue\\:\\:log\\(\\) has parameter \\$context with no value type specified in iterable type array\\.$#"
count: 1
path: src/RawSqsQueue.php

-
message: "#^Cannot access offset 0 on mixed\\.$#"
message: "#^Method AgentSoftware\\\\LaravelRawSqsConnector\\\\RawSqsQueue\\:\\:pushRaw\\(\\) has parameter \\$options with no value type specified in iterable type array\\.$#"
count: 1
path: src/RawSqsQueue.php

-
message: "#^Method AgentSoftware\\\\LaravelRawSqsConnector\\\\RawSqsQueue\\:\\:pushRaw\\(\\) has parameter \\$options with no value type specified in iterable type array\\.$#"
message: "#^Method AgentSoftware\\\\LaravelRawSqsConnector\\\\RawSqsQueue\\:\\:querySqs\\(\\) return type has no value type specified in iterable type Aws\\\\Result\\.$#"
count: 1
path: src/RawSqsQueue.php

-
message: "#^Method AgentSoftware\\\\LaravelRawSqsConnector\\\\RawSqsQueue\\:\\:querySqs\\(\\) return type has no value type specified in iterable type array\\.$#"
count: 1
path: src/RawSqsQueue.php

-
message: "#^Parameter \\#1 \\$json of function json_decode expects string, mixed given\\.$#"
message: "#^Method AgentSoftware\\\\LaravelRawSqsConnector\\\\RawSqsQueue\\:\\:querySqs\\(\\) return type has no value type specified in iterable type array\\|Aws\\\\Result\\.$#"
count: 1
path: src/RawSqsQueue.php

-
message: "#^Parameter \\#1 \\$value of function count expects array\\|Countable, mixed given\\.$#"
message: "#^Method AgentSoftware\\\\LaravelRawSqsConnector\\\\RawSqsQueue\\:\\:receiveMessage\\(\\) return type has no value type specified in iterable type Aws\\\\Result\\.$#"
count: 1
path: src/RawSqsQueue.php

-
message: "#^Parameter \\#3 \\$job of class Illuminate\\\\Queue\\\\Jobs\\\\SqsJob constructor expects array, mixed given\\.$#"
message: "#^Method AgentSoftware\\\\LaravelRawSqsConnector\\\\RawSqsQueue\\:\\:receiveMessage\\(\\) return type has no value type specified in iterable type array\\.$#"
count: 1
path: src/RawSqsQueue.php
5 changes: 5 additions & 0 deletions src/RawSqsConnector.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ public function connect(array $config): Queue|RawSqsQueue
$rawSqsQueue->setJobClass($config['job_class']);
}

if (Arr::get($config, 'rate_limit')) {
$limit = $config['rate_limit'];
$rawSqsQueue->setRateLimit(is_callable($limit) ? $limit() : $limit);
}

return $rawSqsQueue;
}

Expand Down
81 changes: 74 additions & 7 deletions src/RawSqsQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,27 @@

namespace AgentSoftware\LaravelRawSqsConnector;

use Aws\Result;
use Illuminate\Contracts\Queue\Job;
use Illuminate\Queue\InvalidPayloadException;
use Illuminate\Queue\Jobs\SqsJob;
use Illuminate\Queue\SqsQueue;
use Illuminate\Support\Facades\Log;
use Illuminate\Support\Facades\RateLimiter;
use Illuminate\Support\Str;

class RawSqsQueue extends SqsQueue
{
protected string $jobClass;
protected ?int $rateLimit = null;

public function pop($queue = null): SqsJob|Job|null
{
$response = $this->sqs->receiveMessage([
'QueueUrl' => $queue = $this->getQueue($queue),
'AttributeNames' => ['All'],
]);
$queue = $this->getQueue($queue);

$response = $this->receiveMessage($queue);

if (!is_null($response['Messages']) && count($response['Messages']) > 0) {
if ($response !== null && !is_null($response['Messages']) && count($response['Messages']) > 0) {
$message = $response['Messages'][0];

$jobBody = json_decode($message['Body'], true);
Expand All @@ -30,7 +34,6 @@ public function pop($queue = null): SqsJob|Job|null
$payload = $this->createPayload($captureJob, $queue, $jobBody);
$message['Body'] = $payload;


return new SqsJob(
$this->container,
$this->sqs,
Expand All @@ -43,6 +46,53 @@ public function pop($queue = null): SqsJob|Job|null
return null;
}

protected function receiveMessage(string $queue): Result|array|null
{
if ($this->getRateLimit() === null) {
return $this->querySqs($queue);
}

$key = 'sqs:' . Str::slug($this->jobClass);

$remainingAttempts = $this->hasRemainingAttempts($key);

if ($remainingAttempts) {
return $this->querySqs($queue);
}

$this->log('Rate limit hit for SQS queue worker', [
'queue' => $queue,
'key' => $key
]);

return null;
}

protected function log(string $text, array $context = []): void
{
Log::info($text, $context);
}

protected function hasRemainingAttempts(string $key): mixed
{
/** @var int $limit */
$limit = $this->getRateLimit();

return RateLimiter::attempt(
$key,
$limit,
fn () => true,
);
}

protected function querySqs(string $queue): Result|array
{
return $this->sqs->receiveMessage([
'QueueUrl' => $queue,
'AttributeNames' => ['All'],
]);
}

/**
* @param object|string $job
* @param string $data
Expand Down Expand Up @@ -79,7 +129,6 @@ public function later($delay, $job, $data = '', $queue = null)
throw new InvalidPayloadException('later is not permitted for raw-sqs connector');
}


/**
* @return string
*/
Expand All @@ -88,6 +137,14 @@ public function getJobClass(): string
return $this->jobClass;
}

/**
* @return int|null
*/
public function getRateLimit(): ?int
{
return $this->rateLimit;
}

/**
* @param string $jobClass
* @return $this
Expand All @@ -97,4 +154,14 @@ public function setJobClass(string $jobClass): static
$this->jobClass = $jobClass;
return $this;
}

/**
* @param int $rateLimit
* @return $this
*/
public function setRateLimit(int $rateLimit): static
{
$this->rateLimit = $rateLimit;
return $this;
}
}
36 changes: 36 additions & 0 deletions tests/RawSqsConnectorTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,42 @@ public function testConnectShouldReturnRawSqsQueue(): void
$this->assertInstanceOf(RawSqsQueue::class, $rawSqsQueue);
}

public function testCanSpecifyAnIntegerRateLimit(): void
{
$rawSqsConnector = new RawSqsConnector();

$config = [
'key' => 'key',
'secret' => 'secret',
'region' => 'eu-west-2',
'queue' => 'raw-sqs',
'job_class' => TestJobClass::class,
'rate_limit' => 1
];

$rawSqsQueue = $rawSqsConnector->connect($config);

$this->assertEquals(1, $rawSqsQueue->getRateLimit());
}

public function testCanSpecifyACallableRateLimit(): void
{
$rawSqsConnector = new RawSqsConnector();

$config = [
'key' => 'key',
'secret' => 'secret',
'region' => 'eu-west-2',
'queue' => 'raw-sqs',
'job_class' => TestJobClass::class,
'rate_limit' => fn () => 1
];

$rawSqsQueue = $rawSqsConnector->connect($config);

$this->assertEquals(1, $rawSqsQueue->getRateLimit());
}

public function testShouldThrowInvalidArgumentExceptionIfClassDoesNotExist(): void
{
$this->expectException(\InvalidArgumentException::class);
Expand Down
116 changes: 116 additions & 0 deletions tests/RawSqsQueueTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use Aws\Sqs\SqsClient;
use Illuminate\Container\Container;
use Illuminate\Queue\InvalidPayloadException;
use Illuminate\Support\Facades\RateLimiter;
use Mockery;
use PHPUnit\Framework\TestCase;
use AgentSoftware\LaravelRawSqsConnector\RawSqsQueue;
Expand Down Expand Up @@ -121,4 +122,119 @@ public function testLaterShouldThrowInvalidPayLoadException(): void

$rawSqsQueue->later(null, null);
}

public function testDoesNotUseRateLimiterIfRateLimitNotSpecified(): void
{
$firstName = 'Primitive';
$lastName = 'Sense';

$sqsReturnMessage = [
'Body' => json_encode([
'first_name' => $firstName,
'last_name' => $lastName
])
];

$sqsClientMock = Mockery::mock(SqsClient::class);
$sqsClientMock->shouldReceive('receiveMessage')
->andReturn([
'Messages' => [
$sqsReturnMessage
]
]);

$sqsClientMock->shouldNotReceive('hasRemainingAttempts');

$rawSqsQueue = new RawSqsQueue(
$sqsClientMock,
'default',
'prefix'
);

$container = Mockery::mock(Container::class);
$rawSqsQueue->setContainer($container);
$rawSqsQueue->setJobClass(TestJobClass::class);

$rawSqsQueue->pop();

$this->expectNotToPerformAssertions();
}

public function testWillReturnMessageIfRateLimitEnabled(): void
{
$firstName = 'Primitive';
$lastName = 'Sense';

$sqsReturnMessage = [
'Body' => json_encode([
'first_name' => $firstName,
'last_name' => $lastName
])
];

$sqsClientMock = Mockery::mock(SqsClient::class);
$sqsClientMock->shouldReceive('receiveMessage')
->andReturn([
'Messages' => [
$sqsReturnMessage
]
]);

$rawSqsQueue = Mockery::mock(RawSqsQueue::class, [
$sqsClientMock,
'default',
'prefix'
])
->shouldAllowMockingProtectedMethods()
->makePartial();

$rawSqsQueue
->shouldReceive('hasRemainingAttempts')
->andReturn(true);

$container = Mockery::mock(Container::class);
$rawSqsQueue->setContainer($container);
$rawSqsQueue->setJobClass(TestJobClass::class);
$rawSqsQueue->setRateLimit(1);

$rawSqsQueue->pop();

$this->expectNotToPerformAssertions();
}

public function testWillNotReturnMessageIfRateLimitEnabledButNoAttemptsLeft(): void
{
$sqsClientMock = Mockery::mock(SqsClient::class);
$sqsClientMock->shouldNotReceive('receiveMessage');

$rawSqsQueue = Mockery::mock(RawSqsQueue::class, [
$sqsClientMock,
'default',
'prefix'
])
->makePartial();

$rawSqsQueue
->shouldAllowMockingProtectedMethods()
->shouldReceive('hasRemainingAttempts')
->andReturn(false);

$rawSqsQueue
->shouldAllowMockingProtectedMethods()
->shouldReceive('log')
->once();

$rawSqsQueue
->shouldAllowMockingProtectedMethods()
->shouldNotReceive('querySqs');

$container = Mockery::mock(Container::class);
$rawSqsQueue->setContainer($container);
$rawSqsQueue->setJobClass(TestJobClass::class);
$rawSqsQueue->setRateLimit(1);

$rawSqsQueue->pop();

$this->expectNotToPerformAssertions();
}
}

0 comments on commit bd56d31

Please sign in to comment.