A comprehensive support component for Hyperf providing essential utilities, helpers, and base classes.
- 🎯 Fluent Dispatch API - Elegant job dispatch with support for async queue, AMQP, and Kafka
- 🔄 Closure Jobs - Execute closures as background jobs with dependency injection
- 🛠️ Helper Functions - Collection of useful helper functions
- 📦 Bus System - Pending dispatch classes for various message systems
- 🧩 Traits & Utilities - Reusable traits and utility classes
- ⏱️ Backoff Strategies - Multiple backoff implementations for retry mechanisms
composer require friendsofhyperf/support

The dispatch() helper function provides a fluent API for dispatching jobs to different systems:
use Psr\Log\LoggerInterface;

use function FriendsOfHyperf\Support\dispatch;

// Simple closure dispatch to async queue
dispatch(function () {
    // Your job logic here
    logger()->info('Job executed!');
});

// With configuration
dispatch(function () {
    // Your job logic here
})
    ->onConnection('high-priority')
    ->delay(60) // Execute after 60 seconds
    ->setMaxAttempts(5);

// With dependency injection (UserService stands for one of your own application services)
dispatch(function (UserService $userService, LoggerInterface $logger) {
    $users = $userService->getActiveUsers();
    $logger->info('Processing ' . count($users) . ' users');
});

use Hyperf\Amqp\Message\ProducerMessageInterface;
use function FriendsOfHyperf\Support\dispatch;

// Dispatch AMQP message ($amqpMessage is a ProducerMessageInterface instance)
dispatch($amqpMessage)
    ->setPool('default')
    ->setExchange('my.exchange')
    ->setRoutingKey('my.routing.key')
    ->setTimeout(10)
    ->setConfirm(true);

use Hyperf\Kafka\Producer\ProduceMessage;
use function FriendsOfHyperf\Support\dispatch;

// Dispatch Kafka message ($kafkaMessage is a ProduceMessage instance)
dispatch($kafkaMessage)
    ->setPool('default');

The CallQueuedClosure class allows you to execute closures as async queue jobs:
use FriendsOfHyperf\Support\CallQueuedClosure;

// Create a closure job
$job = CallQueuedClosure::create(function () {
    // Your job logic
    return 'Job completed!';
});

// Configure max attempts
$job->setMaxAttempts(3);

// The job can be pushed to the queue manually or via dispatch()

PendingAsyncQueueDispatch provides a fluent API for async queue job dispatch:
use FriendsOfHyperf\Support\Bus\PendingAsyncQueueDispatch;

$pending = new PendingAsyncQueueDispatch($job);
$pending
    ->onConnection('default')
    ->delay(30)
    ->when($condition, function ($dispatch) {
        $dispatch->onConnection('special');
    })
    ->unless($otherCondition, function ($dispatch) {
        $dispatch->delay(60);
    });

// The job is dispatched when the $pending object is destroyed

PendingAmqpProducerMessageDispatch provides a fluent API for AMQP message dispatch:
use FriendsOfHyperf\Support\Bus\PendingAmqpProducerMessageDispatch;

$pending = new PendingAmqpProducerMessageDispatch($message);
$pending
    ->setPool('default')
    ->setExchange('my.exchange')
    ->setRoutingKey('my.routing.key')
    ->setTimeout(5)
    ->setConfirm(true);

// The message is sent when the $pending object is destroyed

PendingKafkaProducerMessageDispatch provides a fluent API for Kafka message dispatch:
use FriendsOfHyperf\Support\Bus\PendingKafkaProducerMessageDispatch;

$pending = new PendingKafkaProducerMessageDispatch($message);
$pending->setPool('default');

// The message is sent when the $pending object is destroyed

All pending dispatch classes support conditional execution:
use function FriendsOfHyperf\Support\dispatch;

dispatch($job)
    ->when($shouldUseHighPriority, function ($dispatch) {
        $dispatch->onConnection('high-priority');
    })
    ->unless($isTestMode, function ($dispatch) {
        $dispatch->delay(10);
    });

The dispatch() helper creates a pending dispatch instance based on the job type:
- Closure → PendingAsyncQueueDispatch with CallQueuedClosure
- ProducerMessageInterface → PendingAmqpProducerMessageDispatch
- ProduceMessage → PendingKafkaProducerMessageDispatch
- Other objects → PendingAsyncQueueDispatch
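
The type mapping above can be pictured as an ordered series of `instanceof` checks. The following is only a minimal sketch under stated assumptions: `SketchAmqpMessage`, `SketchKafkaMessage`, and `resolvePendingType()` are hypothetical stand-ins so the example runs without Hyperf installed, and the returned labels merely name the branch that would be taken. It is not the component's actual implementation.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-ins for ProducerMessageInterface and ProduceMessage.
interface SketchAmqpMessage {}
final class SketchKafkaMessage {}

/**
 * Returns a label for the branch a job would take.
 * Specific types are checked first; any other object falls through to the queue.
 */
function resolvePendingType(object $job): string
{
    return match (true) {
        $job instanceof Closure => 'closure',
        $job instanceof SketchAmqpMessage => 'amqp',
        $job instanceof SketchKafkaMessage => 'kafka',
        default => 'queue',
    };
}

$closureType = resolvePendingType(fn () => null);
$amqpType = resolvePendingType(new class implements SketchAmqpMessage {});
$kafkaType = resolvePendingType(new SketchKafkaMessage());
$otherType = resolvePendingType(new stdClass());
```

Checking the closure and message types before the fallback matters because every job is an object, so a generic object check placed first would swallow the more specific cases.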
PendingAsyncQueueDispatch methods:
- onConnection(string $connection): static - Set queue connection
- delay(int $delay): static - Delay job execution (seconds)
- setMaxAttempts(int $attempts): static - Set max retry attempts
- when(mixed $condition, callable $callback): static - Conditional execution
- unless(mixed $condition, callable $callback): static - Inverse conditional execution

PendingAmqpProducerMessageDispatch methods:
- setPool(string $pool): static - Set AMQP pool name
- setExchange(string $exchange): static - Set exchange name
- setRoutingKey(array|string $routingKey): static - Set routing key(s)
- setTimeout(int $timeout): static - Set timeout (seconds)
- setConfirm(bool $confirm): static - Enable/disable confirm mode
- when(mixed $condition, callable $callback): static - Conditional execution
- unless(mixed $condition, callable $callback): static - Inverse conditional execution

PendingKafkaProducerMessageDispatch methods:
- setPool(string $pool): static - Set Kafka pool name
- when(mixed $condition, callable $callback): static - Conditional execution
- unless(mixed $condition, callable $callback): static - Inverse conditional execution

CallQueuedClosure methods:
- create(Closure $closure): static - Create a new closure job
- setMaxAttempts(int $attempts): void - Set max retry attempts
- handle(): mixed - Execute the closure (called by queue worker)
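
To make the "dispatched when the object is destroyed" behaviour and the `when()` / `unless()` methods more concrete, here is a small self-contained sketch of the pattern. `SketchPendingDispatch` and its static `$sent` log are hypothetical and exist only for illustration. The real classes push to a queue or producer, and their internals may differ.

```php
<?php

declare(strict_types=1);

// Hypothetical class: illustrates the pattern only, not the component's code.
final class SketchPendingDispatch
{
    /** Log of "sent" jobs, standing in for a real queue driver. */
    public static array $sent = [];

    private string $connection = 'default';
    private int $delay = 0;

    public function __construct(private string $job) {}

    public function onConnection(string $connection): static
    {
        $this->connection = $connection;
        return $this;
    }

    public function delay(int $delay): static
    {
        $this->delay = $delay;
        return $this;
    }

    // Runs the callback only when the condition is truthy.
    public function when(mixed $condition, callable $callback): static
    {
        if ($condition) {
            $callback($this);
        }
        return $this;
    }

    // Runs the callback only when the condition is falsy.
    public function unless(mixed $condition, callable $callback): static
    {
        return $this->when(! $condition, $callback);
    }

    // The "send" happens once the last reference to the object goes away.
    public function __destruct()
    {
        self::$sent[] = [
            'job' => $this->job,
            'connection' => $this->connection,
            'delay' => $this->delay,
        ];
    }
}

$isTestMode = true;

// Temporary object: destroyed, and therefore "sent", at the end of the statement.
(new SketchPendingDispatch('report'))
    ->when(true, fn (SketchPendingDispatch $d) => $d->onConnection('high-priority'))
    ->unless($isTestMode, fn (SketchPendingDispatch $d) => $d->delay(10));

// Held in a variable: nothing is sent until the variable is released.
$pending = (new SketchPendingDispatch('cleanup'))->delay(30);
$countWhileHeld = count(SketchPendingDispatch::$sent);
unset($pending);
$countAfterUnset = count(SketchPendingDispatch::$sent);
```

One consequence of this design is that keeping a pending dispatch in a long-lived variable postpones the send, so chaining directly on the returned object is usually the simpler choice.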
The component provides several backoff strategies for retry mechanisms.

ArrayBackoff uses custom delay intervals defined in an array:
use FriendsOfHyperf\Support\Backoff\ArrayBackoff;

// Custom delays
$backoff = new ArrayBackoff([100, 500, 1000, 2000, 5000]);

// Stop after array is exhausted (returns 0)
$backoff = new ArrayBackoff([100, 500, 1000], false);

// From comma-separated string
$backoff = ArrayBackoff::fromString('100, 500, 1000, 2000');

// From predefined patterns
$backoff = ArrayBackoff::fromPattern('short'); // [100, 200, 300, 500, 1000]
$backoff = ArrayBackoff::fromPattern('medium'); // [200, 500, 1000, 2000, 5000]
$backoff = ArrayBackoff::fromPattern('long'); // [500, 1000, 2000, 5000, 10000, 30000]
$backoff = ArrayBackoff::fromPattern('exponential'); // [100, 200, 400, 800, 1600, 3200, 6400]

// Usage in retry logic: use a backoff that stops when exhausted,
// so that next() eventually returns 0 and the loop can give up
$backoff = new ArrayBackoff([100, 500, 1000], false);
while (true) {
    try {
        $result = performOperation();
        break;
    } catch (Exception $e) {
        $delay = $backoff->next();
        if ($delay === 0) {
            throw $e; // No more retries
        }
        usleep($delay * 1000); // Convert milliseconds to microseconds
    }
}

The available strategies are:
- ArrayBackoff - Custom intervals from an array
- FixedBackoff - Constant delay between retries
- LinearBackoff - Linear growth with configurable step
- ExponentialBackoff - Exponential growth with optional jitter
- FibonacciBackoff - Fibonacci sequence-based delays
- PoissonBackoff - Statistical distribution-based delays
- DecorrelatedJitterBackoff - Decorrelated jitter for better spreading
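
The growth patterns named in this list can be illustrated with plain functions. The sketch below is an assumption-laden illustration of the general formulas only: the function names, parameters, and default values are all hypothetical, attempts are counted from zero, and none of this reflects the constructors or internals of the component's classes. The Poisson and decorrelated jitter strategies are not covered here.

```php
<?php

declare(strict_types=1);

// All names and defaults below are hypothetical; delays are in milliseconds.

// Fixed: the same delay for every attempt.
function fixedDelay(int $attempt, int $delayMs = 500): int
{
    return $delayMs;
}

// Linear: grows by a constant step per attempt.
function linearDelay(int $attempt, int $initialMs = 100, int $stepMs = 100): int
{
    return $initialMs + $attempt * $stepMs;
}

// Exponential: multiplies by a factor per attempt, capped at a maximum.
function exponentialDelay(int $attempt, int $initialMs = 100, float $factor = 2.0, int $maxMs = 10000): int
{
    return (int) min($maxMs, $initialMs * $factor ** $attempt);
}

// Fibonacci: 1, 1, 2, 3, 5, ... multiplied by a base unit.
function fibonacciDelay(int $attempt, int $unitMs = 100): int
{
    [$current, $next] = [1, 1];
    for ($i = 0; $i < $attempt; ++$i) {
        [$current, $next] = [$next, $current + $next];
    }
    return $current * $unitMs;
}

// Full jitter: a random delay between 0 and the computed delay.
function withFullJitter(int $delayMs): int
{
    return random_int(0, $delayMs);
}

$attempts = range(0, 4);
$linear = array_map(fn (int $a) => linearDelay($a), $attempts);
$exponential = array_map(fn (int $a) => exponentialDelay($a), $attempts);
$fibonacci = array_map(fn (int $a) => fibonacciDelay($a), $attempts);
$jittered = withFullJitter(exponentialDelay(3));
```

Jitter is typically added so that many clients retrying at once do not all hit the same service at the same instant.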
All backoff strategies implement BackoffInterface:

interface BackoffInterface
{
    public function next(): int;       // Get next delay in milliseconds
    public function reset(): void;     // Reset attempt counter
    public function getAttempt(): int; // Get current attempt count
    public function sleep(): int;      // Sleep for calculated delay
}
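
As a rough illustration of how the four methods fit together, the sketch below mirrors the interface locally and drives it from a retry helper. Everything here is hypothetical: `SketchBackoffInterface` is a local copy so the example runs on its own, `SketchDoublingBackoff` and `retryWithBackoff()` are invented names, and the assumption that `sleep()` returns the delay it slept for is a guess based on its `int` return type rather than documented behaviour.

```php
<?php

declare(strict_types=1);

// Local mirror of the four methods, so the sketch runs without the package.
interface SketchBackoffInterface
{
    public function next(): int;
    public function reset(): void;
    public function getAttempt(): int;
    public function sleep(): int;
}

// Hypothetical strategy: the delay doubles per attempt up to a cap (milliseconds).
final class SketchDoublingBackoff implements SketchBackoffInterface
{
    private int $attempt = 0;

    public function __construct(private int $baseMs = 1, private int $maxMs = 8) {}

    public function next(): int
    {
        $delay = min($this->maxMs, $this->baseMs * 2 ** $this->attempt);
        ++$this->attempt;
        return $delay;
    }

    public function reset(): void
    {
        $this->attempt = 0;
    }

    public function getAttempt(): int
    {
        return $this->attempt;
    }

    // Assumption: sleeps for the next delay and returns that delay.
    public function sleep(): int
    {
        $delay = $this->next();
        usleep($delay * 1000); // milliseconds to microseconds
        return $delay;
    }
}

// Calls $operation up to $maxAttempts times, sleeping between failures.
function retryWithBackoff(callable $operation, SketchBackoffInterface $backoff, int $maxAttempts): mixed
{
    $backoff->reset();
    while (true) {
        try {
            return $operation();
        } catch (Throwable $e) {
            if ($backoff->getAttempt() >= $maxAttempts - 1) {
                throw $e; // attempts exhausted
            }
            $backoff->sleep();
        }
    }
}

// Usage: an operation that fails twice before succeeding.
$calls = 0;
$backoff = new SketchDoublingBackoff();
$result = retryWithBackoff(function () use (&$calls): string {
    if (++$calls < 3) {
        throw new RuntimeException('transient failure');
    }
    return 'ok';
}, $backoff, 5);
```

Resetting the backoff at the start of each retry run keeps one instance reusable across independent operations.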