diff --git a/docs/guide/en/error-handling.md b/docs/guide/en/error-handling.md index 029e75be..c8c4b525 100644 --- a/docs/guide/en/error-handling.md +++ b/docs/guide/en/error-handling.md @@ -105,7 +105,7 @@ It's configured via constructor parameters, too. Here they are: Requirements: - - Requires a `DelayMiddlewareInterface` implementation and an adapter that supports delayed delivery. + - Requires an adapter that supports delayed delivery. The middleware uses `DelayEnvelope` to specify the delay time. If the adapter doesn't support delaying, it will **ignore the delay data** and process the message immediately. State tracking: diff --git a/docs/guide/en/usage.md b/docs/guide/en/usage.md index ec701e51..1ecbf280 100644 --- a/docs/guide/en/usage.md +++ b/docs/guide/en/usage.md @@ -19,16 +19,16 @@ $queue->push($message); To push a message that should be processed after 5 minutes: -Delayed execution is implemented via a push middleware. -The middleware must implement `\Yiisoft\Queue\Middleware\Push\Implementation\DelayMiddlewareInterface` and is provided by the adapter package you use. -For example, the official AMQP adapter supports delays: +Delayed execution is implemented using the `DelayEnvelope`. The envelope wraps your message with delay information that adapters can use if they support delayed execution. ```php -$delayMiddleware = $container->get(\Yiisoft\Queue\Middleware\Push\Implementation\DelayMiddlewareInterface::class); -$queue->withMiddlewaresAdded($delayMiddleware->withDelay(5 * 60))->push($message); +use Yiisoft\Queue\Message\DelayEnvelope; + +$delayedMessage = new DelayEnvelope($message, 5 * 60); // 5 minutes delay +$queue->push($delayedMessage); ``` -**Important:** Not every adapter (such as synchronous adapter) supports delayed execution. +**Important:** Adapters that support delaying will use the delay information from `DelayEnvelope` to schedule the message accordingly. Adapters that don't support delaying will **ignore the delay data** and process the message in the queue order. ## Queue handling diff --git a/src/Message/DelayEnvelope.php b/src/Message/DelayEnvelope.php new file mode 100644 index 00000000..8f79c85c --- /dev/null +++ b/src/Message/DelayEnvelope.php @@ -0,0 +1,32 @@ +getMetadata()[self::META_DELAY_SECONDS] ?? 0.0; + return new self($message, (float) $delaySeconds); + } + + public function getDelaySeconds(): float + { + return $this->delaySeconds; + } + + protected function getEnvelopeMetadata(): array + { + return [self::META_DELAY_SECONDS => $this->delaySeconds]; + } +} diff --git a/src/Middleware/FailureHandling/Implementation/ExponentialDelayMiddleware.php b/src/Middleware/FailureHandling/Implementation/ExponentialDelayMiddleware.php index e303f5e6..e7f4a293 100644 --- a/src/Middleware/FailureHandling/Implementation/ExponentialDelayMiddleware.php +++ b/src/Middleware/FailureHandling/Implementation/ExponentialDelayMiddleware.php @@ -9,8 +9,7 @@ use Yiisoft\Queue\Middleware\FailureHandling\FailureHandlingRequest; use Yiisoft\Queue\Middleware\FailureHandling\MessageFailureHandlerInterface; use Yiisoft\Queue\Middleware\FailureHandling\MiddlewareFailureInterface; -use Yiisoft\Queue\Middleware\Push\Implementation\DelayMiddlewareInterface; -use Yiisoft\Queue\Middleware\Push\NoopMessageHandlerPush; +use Yiisoft\Queue\Message\DelayEnvelope; use Yiisoft\Queue\QueueInterface; use Yiisoft\Queue\Middleware\FailureHandling\FailureEnvelope; @@ -29,7 +28,6 @@ final class ExponentialDelayMiddleware implements MiddlewareFailureInterface * @param float $delayInitial The first delay period * @param float $delayMaximum The maximum delay period * @param float $exponent Message handling delay will be increased by this multiplication each time it fails - * @param DelayMiddlewareInterface $delayMiddleware A middleware for message delaying. * @param QueueInterface|null $queue */ public function __construct( @@ -38,7 +36,6 @@ public function __construct( private readonly float $delayInitial, private readonly float $delayMaximum, private readonly float $exponent, - private readonly DelayMiddlewareInterface $delayMiddleware, private readonly ?QueueInterface $queue = null, ) { if ($maxAttempts <= 0) { @@ -64,12 +61,10 @@ public function processFailure( ): FailureHandlingRequest { $message = $request->getMessage(); if ($this->suites($message)) { - $envelope = new FailureEnvelope($message, $this->createNewMeta($message)); - $envelope = $this->delayMiddleware - ->withDelay($this->getDelay($envelope)) - ->processPush($envelope, new NoopMessageHandlerPush()); + $failureEnvelope = new FailureEnvelope($message, $this->createNewMeta($message)); + $delayEnvelope = new DelayEnvelope($failureEnvelope, $this->getDelay($failureEnvelope)); $queue = $this->queue ?? $request->getQueue(); - $messageNew = $queue->push($envelope); + $messageNew = $queue->push($delayEnvelope); return $request->withMessage($messageNew); } @@ -92,11 +87,14 @@ private function createNewMeta(MessageInterface $message): array private function getAttempts(MessageInterface $message): int { - return $message->getMetadata()[FailureEnvelope::FAILURE_META_KEY][self::META_KEY_ATTEMPTS . "-$this->id"] ?? 0; + /** @var array{failure-strategy-exponential-delay-attempts?: int} $failureMeta */ + $failureMeta = $message->getMetadata()[FailureEnvelope::FAILURE_META_KEY] ?? []; + return $failureMeta[self::META_KEY_ATTEMPTS . "-$this->id"] ?? 0; } private function getDelay(MessageInterface $message): float { + /** @var array{failure-strategy-exponential-delay-delay?: float} $meta */ $meta = $message->getMetadata()[FailureEnvelope::FAILURE_META_KEY] ?? []; $key = self::META_KEY_DELAY . "-$this->id"; diff --git a/src/Middleware/Push/Implementation/DelayMiddlewareInterface.php b/src/Middleware/Push/Implementation/DelayMiddlewareInterface.php deleted file mode 100644 index bd52a52e..00000000 --- a/src/Middleware/Push/Implementation/DelayMiddlewareInterface.php +++ /dev/null @@ -1,22 +0,0 @@ -handlePush($message); - } -} diff --git a/tests/Integration/MiddlewareTest.php b/tests/Integration/MiddlewareTest.php index 1b1cc23e..fe22c71b 100644 --- a/tests/Integration/MiddlewareTest.php +++ b/tests/Integration/MiddlewareTest.php @@ -9,7 +9,6 @@ use Psr\Container\ContainerInterface; use Psr\Log\LoggerInterface; use Yiisoft\Injector\Injector; -use Yiisoft\Queue\Stubs\StubDelayMiddleware; use Yiisoft\Test\Support\Container\SimpleContainer; use Yiisoft\Test\Support\Log\SimpleLogger; use Yiisoft\Queue\Cli\LoopInterface; @@ -150,7 +149,6 @@ public function testFullStackFailure(): void 1, 5, 2, - new StubDelayMiddleware(), $queue, ), ], diff --git a/tests/Unit/Message/DelayEnvelopeTest.php b/tests/Unit/Message/DelayEnvelopeTest.php new file mode 100644 index 00000000..100cf768 --- /dev/null +++ b/tests/Unit/Message/DelayEnvelopeTest.php @@ -0,0 +1,45 @@ + 'value']); + $delayEnvelope = new DelayEnvelope($message, 300.5); + + self::assertSame($message, $delayEnvelope->getMessage()); + self::assertSame('test', $delayEnvelope->getType()); + self::assertSame(['data' => 'value'], $delayEnvelope->getData()); + self::assertSame(300.5, $delayEnvelope->getDelaySeconds()); + + $metadata = $delayEnvelope->getMetadata(); + self::assertArrayHasKey(DelayEnvelope::META_DELAY_SECONDS, $metadata); + self::assertSame(300.5, $metadata[DelayEnvelope::META_DELAY_SECONDS]); + } + + public function testFromMessage(): void + { + $message = new Message('test', ['data' => 'value'], [DelayEnvelope::META_DELAY_SECONDS => 150]); + $delayEnvelope = DelayEnvelope::fromMessage($message); + + self::assertSame(150.0, $delayEnvelope->getDelaySeconds()); + self::assertSame('test', $delayEnvelope->getType()); + self::assertSame(['data' => 'value'], $delayEnvelope->getData()); + } + + public function testFromMessageWithoutDelay(): void + { + $message = new Message('test', ['data' => 'value']); + $delayEnvelope = DelayEnvelope::fromMessage($message); + + self::assertSame(0.0, $delayEnvelope->getDelaySeconds()); + } +} diff --git a/tests/Unit/Middleware/FailureHandling/Implementation/ExponentialDelayMiddlewareTest.php b/tests/Unit/Middleware/FailureHandling/Implementation/ExponentialDelayMiddlewareTest.php index 949646a9..1b820c6f 100644 --- a/tests/Unit/Middleware/FailureHandling/Implementation/ExponentialDelayMiddlewareTest.php +++ b/tests/Unit/Middleware/FailureHandling/Implementation/ExponentialDelayMiddlewareTest.php @@ -13,7 +13,7 @@ use Yiisoft\Queue\Middleware\FailureHandling\Implementation\ExponentialDelayMiddleware; use Yiisoft\Queue\Middleware\FailureHandling\MessageFailureHandlerInterface; use Yiisoft\Queue\QueueInterface; -use Yiisoft\Queue\Stubs\StubDelayMiddleware; +use Yiisoft\Queue\Message\DelayEnvelope; use Yiisoft\Queue\Tests\TestCase; use const PHP_INT_MAX; @@ -119,7 +119,6 @@ public static function constructorRequirementsProvider(): array #[DataProvider('constructorRequirementsProvider')] public function testConstructorRequirements(bool $success, array $arguments): void { - $arguments[] = new StubDelayMiddleware(); $arguments[] = $this->createMock(QueueInterface::class); if (!$success) { @@ -141,7 +140,6 @@ public function testPipelineSuccess(): void 1, 1, 1, - new StubDelayMiddleware(), $queue, ); $nextHandler = $this->createMock(MessageFailureHandlerInterface::class); @@ -152,6 +150,7 @@ public function testPipelineSuccess(): void self::assertNotEquals($request, $result); $message = $result->getMessage(); self::assertArrayHasKey(FailureEnvelope::FAILURE_META_KEY, $message->getMetadata()); + self::assertArrayHasKey(DelayEnvelope::META_DELAY_SECONDS, $message->getMetadata()); $meta = $message->getMetadata()[FailureEnvelope::FAILURE_META_KEY]; self::assertArrayHasKey(ExponentialDelayMiddleware::META_KEY_ATTEMPTS . '-test', $meta); @@ -175,7 +174,6 @@ public function testPipelineFailure(): void 1, 1, 1, - new StubDelayMiddleware(), $queue, ); $nextHandler = $this->createMock(MessageFailureHandlerInterface::class); diff --git a/tests/Unit/Middleware/FailureHandling/Implementation/SendAgainMiddlewareTest.php b/tests/Unit/Middleware/FailureHandling/Implementation/SendAgainMiddlewareTest.php index 7641ea88..aeb89642 100644 --- a/tests/Unit/Middleware/FailureHandling/Implementation/SendAgainMiddlewareTest.php +++ b/tests/Unit/Middleware/FailureHandling/Implementation/SendAgainMiddlewareTest.php @@ -17,7 +17,6 @@ use Yiisoft\Queue\Middleware\FailureHandling\MessageFailureHandlerInterface; use Yiisoft\Queue\Middleware\FailureHandling\MiddlewareFailureInterface; use Yiisoft\Queue\QueueInterface; -use Yiisoft\Queue\Stubs\StubDelayMiddleware; use Yiisoft\Queue\Tests\TestCase; class SendAgainMiddlewareTest extends TestCase @@ -179,7 +178,6 @@ private function getStrategy(string $strategyName, QueueInterface $queue): Middl self::EXPONENTIAL_STRATEGY_DELAY_INITIAL, self::EXPONENTIAL_STRATEGY_DELAY_MAXIMUM, self::EXPONENTIAL_STRATEGY_EXPONENT, - new StubDelayMiddleware(), $queue, ), default => throw new RuntimeException('Unknown strategy'),