spiral / roadrunner-jobs
RoadRunner Queues (Jobs) plugin API library
Requires
- php: >=8.1
- ext-json: *
- ramsey/uuid: ^3 || ^4
- roadrunner-php/roadrunner-api-dto: ^1.0
- spiral/goridge: ^4.0
- spiral/roadrunner: ^2023.1 || ^2024.1
- spiral/roadrunner-worker: ^3.0
Requires (Dev)
- google/protobuf: ^3.17 || ^4.0
- phpunit/phpunit: ^10.0
- roave/security-advisories: dev-master
- vimeo/psalm: >=5.8
README
RoadRunner Jobs Plugin
This repository contains the PHP bridge for the RoadRunner Jobs plugin.
Installation
To install the application server and the Jobs codebase:
composer require spiral/roadrunner-jobs
You can use the installer to download the latest compatible version of the RoadRunner binary:
composer require spiral/roadrunner-cli --dev
vendor/bin/rr get
Configuration
First, add at least one jobs pipeline to your RoadRunner configuration. For example, the following configuration is enough to get started:
rpc:
  listen: tcp://127.0.0.1:6001

server:
  command: php consumer.php
  relay: pipes

jobs:
  consume: [ "local" ]
  pipelines:
    local:
      driver: memory
      config:
        priority: 10
        prefetch: 10000
Note: Read more about all available drivers on the documentation page.
After starting the server with this configuration, one pipeline named local, backed by the memory driver, will be available to you.
Usage
Producer
The following code creates a task and pushes it to the local pipeline on the RoadRunner server.
<?php

use Spiral\RoadRunner\Jobs\Jobs;
use Spiral\Goridge\RPC\RPC;

require __DIR__ . '/vendor/autoload.php';

// Jobs service
$jobs = new Jobs(RPC::create('tcp://127.0.0.1:6001'));

// Select "local" pipeline from jobs
$queue = $jobs->connect('local');

// Create task prototype with default headers
$task = $queue->create('ping', '{"site": "https://example.com"}') // Create task with "ping" name
    ->withHeader('attempts', 4) // Number of attempts to execute the task
    ->withHeader('retry-delay', 10); // Delay between attempts

// Push "ping" task to the queue
$task = $queue->dispatch($task);

var_dump($task->getId() . ' has been queued');
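The producer example passes the payload as a hand-written JSON string. As a minimal sketch, the payload could instead be built from an array with PHP's built-in json_encode, which avoids quoting mistakes. The encodePayload helper below is hypothetical and not part of spiral/roadrunner-jobs.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of spiral/roadrunner-jobs): encodes an
 * array as a JSON string that can be used as a task payload.
 *
 * @param array<string, mixed> $data
 * @throws JsonException when the data cannot be encoded
 */
function encodePayload(array $data): string
{
    // JSON_THROW_ON_ERROR turns encoding failures into exceptions;
    // JSON_UNESCAPED_SLASHES keeps URLs readable in the payload.
    return json_encode($data, JSON_THROW_ON_ERROR | JSON_UNESCAPED_SLASHES);
}

$payload = encodePayload(['site' => 'https://example.com']);
```

The resulting string can be passed as the second argument of $queue->create('ping', $payload) in the producer code above.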
Consumer
The Consumer processes tasks from RoadRunner server and responds based on the processing outcome:
- ack is used for positive acknowledgements.
- nack is used for negative acknowledgements.
- requeue is used for requeuing the task.
The behavior of the nack method depends on its implementation by the queue driver. It can accept an additional
parameter redelivery; if it is passed and set to true, the task will be requeued. However, not all drivers
support this functionality. If the redelivery parameter is not passed, set to false, or the queue driver's
implementation does not support it, the task will not be requeued.
$task->nack(message: $reason, redelivery: true);
The requeue method is implemented by RoadRunner and does not depend on the queue driver. It allows you to resend
the task to the end of the queue and add additional headers to the task.
$task->withHeader('attempts', (string) ($attempts + 1))->requeue($exception);
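Incrementing an attempts header on every requeue is only useful if something eventually stops the retries. The sketch below shows one way to compute the next header value and signal when a limit is reached. The function name nextAttemptHeader and the limit are hypothetical, and the interpretation of the header as "retries performed so far" is an assumption made for this example.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: given the current value of the "attempts" header
 * (an empty string when the header is absent), returns the value to store
 * on the requeued task, or null when the retry limit has been reached.
 */
function nextAttemptHeader(string $current, int $maxRetries): ?string
{
    // A missing header means no retries have happened yet.
    $attempts = $current === '' ? 0 : (int) $current;
    $next = $attempts + 1;

    // Header values are strings, so cast back before returning.
    return $next > $maxRetries ? null : (string) $next;
}
```

In a consumer, a non-null result would be passed to withHeader('attempts', ...) before calling requeue, while a null result could lead to nack instead. How the current header value is read from the task is left out here on purpose.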
The nack and requeue methods can specify a delay before the task is requeued. To do this, call the withDelay method with the desired value before invoking nack or requeue.
$task->withDelay(10)->requeue($exception);
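A fixed delay such as the 10 used above can be replaced with a delay that grows on each retry. The following is a minimal sketch of a capped exponential backoff. The function backoffDelay, its defaults, and the assumption that the delay is expressed in seconds are illustrative and not part of the library.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: returns a delay that doubles with every attempt,
 * starting at $baseDelay and never exceeding $maxDelay.
 * The unit is assumed to be seconds.
 */
function backoffDelay(int $attempt, int $baseDelay = 10, int $maxDelay = 600): int
{
    // Treat anything below 1 as the first attempt.
    $attempt = max(1, $attempt);

    // Cap the exponent so the multiplication cannot overflow.
    $exponent = min($attempt - 1, 20);

    return min($maxDelay, $baseDelay * (2 ** $exponent));
}
```

The result can be passed to withDelay before requeue, for example $task->withDelay(backoffDelay($attempt))->requeue($exception), where $attempt is a counter maintained by the application.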
<?php

use Spiral\RoadRunner\Jobs\Consumer;
use Spiral\RoadRunner\Jobs\Task\ReceivedTaskInterface;

require __DIR__ . '/vendor/autoload.php';

$consumer = new Consumer();

/** @var ReceivedTaskInterface $task */
while ($task = $consumer->waitTask()) {
    try {
        $name = $task->getName(); // "ping"
        $queue = $task->getQueue(); // "local"
        $driver = $task->getDriver(); // "memory"
        $payload = $task->getPayload(); // {"site": "https://example.com"}

        // Process task

        $task->ack();
    } catch (\Throwable $e) {
        $task->requeue($e);
    }
}
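The "Process task" step in the consumer loop is left open. One possible way to fill it, sketched under the assumption that payloads are JSON objects as in the producer example, is to route each task to a handler keyed by its name. The handleTask function and the handler map are hypothetical and not provided by the library.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical dispatcher: decodes a JSON payload and passes it to the
 * handler registered for the given task name.
 *
 * @param array<string, callable> $handlers map of task name to handler
 * @throws InvalidArgumentException when no handler is registered
 * @throws JsonException when the payload is not valid JSON
 * @throws UnexpectedValueException when the payload is not an object or array
 */
function handleTask(string $name, string $payload, array $handlers): void
{
    if (!isset($handlers[$name])) {
        throw new InvalidArgumentException(sprintf('No handler registered for task "%s"', $name));
    }

    // Decode to an associative array; invalid JSON throws JsonException.
    $data = json_decode($payload, true, 512, JSON_THROW_ON_ERROR);

    if (!is_array($data)) {
        throw new UnexpectedValueException('Task payload must decode to an array');
    }

    $handlers[$name]($data);
}
```

Inside the try block of the loop, this could be called as handleTask($task->getName(), $task->getPayload(), $handlers) before $task->ack(), so that any exception thrown by the dispatcher or a handler falls through to the existing catch branch.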
License
The MIT License (MIT). Please see LICENSE for more information. Maintained by Spiral Scout.