ProductPromotion
Logo

PHP

made by https://0x3d.site

GitHub - pheanstalk/pheanstalk: PHP client for beanstalkd queue
PHP client for beanstalkd queue. Contribute to pheanstalk/pheanstalk development by creating an account on GitHub.
Visit Site

GitHub - pheanstalk/pheanstalk: PHP client for beanstalkd queue

GitHub - pheanstalk/pheanstalk: PHP client for beanstalkd queue

Pheanstalk

Latest Stable Version Total Downloads Scrutinizer Code Quality Code Coverage Build Status

Pheanstalk 5 is a pure PHP8.1+ client for use with beanstalkd workqueue versions 1.12 and later. In 2021 / 2022 / 2023 it was almost completely rewritten from scratch with the following goals in mind:

  • Fully typed
  • Passing the strict rule set for static analysis using PHPStan and Psalm
  • Splitting the different roles into separate parts

Usage Example

Producer

use Pheanstalk\Pheanstalk;
use Pheanstalk\Values\TubeName;

$pheanstalk = Pheanstalk::create('127.0.0.1');
$tube       = new TubeName('testtube');

// Queue a Job
$pheanstalk->useTube($tube);
$pheanstalk->put("job payload goes here\n");

$pheanstalk->useTube($tube);
$pheanstalk->put(
    data: json_encode(['test' => 'data'], JSON_THROW_ON_ERROR),
    priority: Pheanstalk::DEFAULT_PRIORITY,
    delay: 30,
    timeToRelease: 60
);

Consumer / Worker

use Pheanstalk\Pheanstalk;
use Pheanstalk\Values\TubeName;

$pheanstalk = Pheanstalk::create('127.0.0.1');
$tube       = new TubeName('testtube');

// we want jobs from 'testtube' only.
$pheanstalk->watch($tube);

// this hangs until a Job is produced.
$job = $pheanstalk->reserve();

try {
    $jobPayload = $job->getData();
    // do work.
    
    echo "Starting job with payload: {$jobPayload}\n";

    sleep(2);
    // If it's going to take a long time, periodically
    // tell beanstalk we're alive to stop it rescheduling the job.
    $pheanstalk->touch($job);
    sleep(2);

    // eventually we're done, delete job.
    $pheanstalk->delete($job);
}
catch(\Exception $e) {
    // handle exception.
    // and let some other worker retry.
    $pheanstalk->release($job); 
}

Systemd configuration for Consumer / Worker

Note that this does not aim to cover all possible scenarios or configurations.

[Unit]
Description=My App Worker

[Service]
User=deployer
Group=www-data
Restart=always
ExecStart=/usr/bin/php /var/www/html/worker.php

[Install]
WantedBy=multi-user.target

Running the tests

Make sure you have docker-compose installed.

> composer test

History

Pheanstalk 5

Migration to v5

Some breaking/important changes:

  • no more chaining of ->watch->ignore->reserve/reserveWithTimeout: each call should be made on its own
  • use of new TubeName() instead of strings for tube names
  • some constants from PheanstalkInterface have been moved to other interfaces:
    • DEFAULT_PORT to SocketFactoryInterface,
    • DEFAULT_DELAY/DEFAULT_PRIORITY/DEFAULT_TTR to PheanstalkPublisherInterface
  • put method 4th parameter name changed from ttr to timeToRelease

Pheanstalk 4

In 2018 Sam Mousa took on the responsibility of maintaining Pheanstalk.

Pheanstalk 4.0 drops support for older PHP versions. It contains the following changes (among other things):

  • Strict PHP type hinting
  • Value objects for Job IDs
  • Functions without side effects
  • Dropped support for persistent connections
  • Add support for multiple socket implementations (streams extension, socket extension, fsockopen)

Dropping support persistent connections

Persistent connections are a feature where a TCP connection is kept alive between different requests to reduce overhead from TCP connection set up. When reusing TCP connections we must always guarantee that the application protocol, in this case beanstalks' protocol is in a proper state. This is hard, and in some cases impossible; at the very least this means we must do some tests which cause roundtrips. Consider for example a connection that has just sent the command PUT 0 4000. The beanstalk server is now going to read 4000 bytes, but if the PHP script crashes during this write the next request get assigned this TCP socket. Now to reset the connection to a known state it used to subscribe to the default tube: use default. Since the beanstalk server is expecting 4000 bytes, it will just write this command to the job and wait for more bytes..

To prevent these kinds of issues the simplest solution is to not use persistent connections.

Dropped connection handling

Depending on the socket implementation used we might not be able to enable TCP keepalive. If we do not have TCP keepalive there is no way for us to detect dropped connections, the underlying OS may wait up to 15 minutes to decide that a TCP connection where no packets are being sent is disconnected. When using a socket implementation that supports read timeouts, like SocketSocket which uses the socket extension we use read and write timeouts to detect broken connections; the issue with the beanstalk protocol is that it allows for no packets to be sent for extended periods of time. Solutions are to either catch these connection exceptions and reconnect or use reserveWithTimeout() with a timeout that is less than the read / write timeouts.

Example code for a job runner could look like this (this is real production code):

use Pheanstalk\Pheanstalk;
use Pheanstalk\Values\TubeName;

interface Task {

}
interface TaskFactory {
    public function fromData(string $data): Task;
}
interface CommandBus {
    public function handle(Task $task): void;
}

function run(\Pheanstalk\PheanstalkSubscriber $pheanstalk, CommandBus $commandBus, TaskFactory $taskFactory): void
{
    /**
     * @phpstan-ignore-next-line
     */
    while (true) {
        $job = $pheanstalk->reserveWithTimeout(50);
        if (isset($job)) {
            try {

                $task = $taskFactory->fromData($job->getData());
                $commandBus->handle($task);
                echo "Deleting job: {$job->getId()}\n";
                $pheanstalk->delete($job);
            } catch (\Throwable $t) {
                echo "Burying job: {$job->getId()}\n";
                $pheanstalk->bury($job);
            }
        }
    }
}

Here connection errors will cause the process to exit (and be restarted by a task manager).

Functions with side effects

In version 4 functions with side effects have been removed, functions like putInTube internally did several things:

  1. Switch to the tube
  2. Put the job in the new tube

In this example, the tube changes meaning that the connection is now in a different state. This is not intuitive and forces any user of the connection to always switch / check the current tube. Another issue with this approach is that it is harder to deal with errors. If an exception occurs it is unclear whether we did or did not switch tube.

Migration to v4

A migration should in most cases be relatively simple:

  • Change the constructor, either use the static constructor, use a DI container to construct the dependencies, or manually instantiate them.
  • Change instances of reserve() with a timeout to reserveWithTimeout(int $timeout) since reserve() no longer accepts a timeout parameter.
  • Run your tests, or use a static analyzer to test for calls to functions that no longer exist.
  • Make sure that you handle connection exceptions (this is not new to V4, only in V4 you will get more of them due to the default usage of a socket implementation that has read / write timeouts).

Pheanstalk 3

Pheanstalk is a pure PHP 7.1+ client for the beanstalkd workqueue. It has been actively developed, and used in production by many, since late 2008.

Created by Paul Annesley, Pheanstalk is rigorously unit tested and written using encapsulated, maintainable object oriented design. Community feedback, bug reports and patches has led to a stable 1.0 release in 2010, a 2.0 release in 2013, and a 3.0 release in 2014.

Pheanstalk 3.0 introduces PHP namespaces, PSR-1 and PSR-2 coding standards, and PSR-4 autoloader standard.

beanstalkd up to the latest version 1.10 is supported. All commands and responses specified in the protocol documentation for beanstalkd 1.3 are implemented.

More Resources
to explore the angular.

mail [email protected] to add your project or resources here 🔥.

Related Articles
to learn about angular.

FAQ's
to learn more about Angular JS.

mail [email protected] to add more queries here 🔍.

More Sites
to check out once you're finished browsing here.

0x3d
https://www.0x3d.site/
0x3d is designed for aggregating information.
NodeJS
https://nodejs.0x3d.site/
NodeJS Online Directory
Cross Platform
https://cross-platform.0x3d.site/
Cross Platform Online Directory
Open Source
https://open-source.0x3d.site/
Open Source Online Directory
Analytics
https://analytics.0x3d.site/
Analytics Online Directory
JavaScript
https://javascript.0x3d.site/
JavaScript Online Directory
GoLang
https://golang.0x3d.site/
GoLang Online Directory
Python
https://python.0x3d.site/
Python Online Directory
Swift
https://swift.0x3d.site/
Swift Online Directory
Rust
https://rust.0x3d.site/
Rust Online Directory
Scala
https://scala.0x3d.site/
Scala Online Directory
Ruby
https://ruby.0x3d.site/
Ruby Online Directory
Clojure
https://clojure.0x3d.site/
Clojure Online Directory
Elixir
https://elixir.0x3d.site/
Elixir Online Directory
Elm
https://elm.0x3d.site/
Elm Online Directory
Lua
https://lua.0x3d.site/
Lua Online Directory
C Programming
https://c-programming.0x3d.site/
C Programming Online Directory
C++ Programming
https://cpp-programming.0x3d.site/
C++ Programming Online Directory
R Programming
https://r-programming.0x3d.site/
R Programming Online Directory
Perl
https://perl.0x3d.site/
Perl Online Directory
Java
https://java.0x3d.site/
Java Online Directory
Kotlin
https://kotlin.0x3d.site/
Kotlin Online Directory
PHP
https://php.0x3d.site/
PHP Online Directory
React JS
https://react.0x3d.site/
React JS Online Directory
Angular
https://angular.0x3d.site/
Angular JS Online Directory