A job queue based on beanstalkd(and other queue system), easy to dispatch job and handle job.
queue
, job
, easy handle
, job with timeout
, delayed job
, retry_after
, given connection
, given queue
, given tube
, memory limit
,reload
composer require "jetea/queue:~2.0"
use Jetea\Queue\Drivers\Beanstalkd;
use Jetea\Queue\Queue;
$queue = new Queue(new Beanstalkd($host, $port));
<?php
use Jetea\Queue\Job;
class ExampleJob extends Job
{
/**
* @var string job queue name (beanstalkd tube)
*/
public $queue = 'default';
/**
* The "time to run" for all pushed jobs. (beanstalkd ttr, timeout)
*
* @var int 允许 worker 执行的最大秒数,超时 job 将会被 release 到 ready 状态.
*/
public $retry_after = 60;
/**
* The number of times the job may be attempted.
*
* @var int 最大尝试次数
*/
public $tries = 1;
/**
* @var array
*/
public $words;
public function __construct(array $words)
{
$this->words = $words;
}
public function handle()
{
var_export($this->words);
var_dump($this->retry_after, $this->tries);
// throw new \Exception('handle job with error...lol ^_^');
}
}
specifying job queue by defining $queue
, specifying Max Job Attempts by defining $tries
, specifying timeout Values by defining $retry_after
.
$queue->push(new ExampleJob(['i', 'love', 'china']));
of cause, you can dispatch job later (push a delayed job) :
$queue->later(60, new ExampleJob(['i', 'love', 'china']));
$worker = new Worker($queue);
$worker->daemon();
Note:
$worker->daemon()
is blocking.
by default, the worker will will listen the tube named default
, you can specifying worker queue (beanstalkd tube) like :
$queueTube = 'sendEmail';
$worker = new Worker($queue, $queueTube);
$worker->daemon();
you can specifying worker with sleep time while there is no job, and memoryLimit, like :
$sleep = 60;
$memoryLimit = 128;
$queueTube = 'sendEmail';
$worker = new Worker($queue, $queueTube);
$worker->daemon($sleep, $memoryLimit);
Notice, if you want reload queue worker, you should implement queueShouldRestart
method on class Jetea\Queue\Worker
.
it's highly recommended that extends Jetea\Queue\Worker
, and you should implements these methods: logProcessError
, handleWithObj
, queueShouldRestart
.
the queue package can work on other queue system, you should implements these interface Jetea\Queue\Drivers\Jobs\JobInterface
, Jetea\Queue\Drivers\QueueInterface
.
- https://github.com/Jetea/queue/blob/master/tests/QueueTest.php
- https://github.com/Jetea/app/blob/master/ctx/tests/Ctx/queue_example.php
- add some useful command, like clearn queue job or kick buried job, etc.