From 75bad56dc5c7f9aaa028718960c7d1c8e6c46c7d Mon Sep 17 00:00:00 2001 From: Alexander Makarov Date: Thu, 30 Apr 2026 20:37:10 +0300 Subject: [PATCH 1/3] Fix delayed retry push middleware pipeline --- src/Debug/QueueDecorator.php | 11 +++++++++++ .../ExponentialDelayMiddleware.php | 8 +++----- src/Middleware/Push/NoopMessageHandlerPush.php | 18 ------------------ src/QueueInterface.php | 5 +++++ stubs/StubQueue.php | 11 +++++++++++ tests/App/DummyQueue.php | 11 +++++++++++ 6 files changed, 41 insertions(+), 23 deletions(-) delete mode 100644 src/Middleware/Push/NoopMessageHandlerPush.php diff --git a/src/Debug/QueueDecorator.php b/src/Debug/QueueDecorator.php index 908ad064..7400fae6 100644 --- a/src/Debug/QueueDecorator.php +++ b/src/Debug/QueueDecorator.php @@ -6,6 +6,7 @@ use Yiisoft\Queue\MessageStatus; use Yiisoft\Queue\Message\MessageInterface; +use Yiisoft\Queue\Middleware\Push\MiddlewarePushInterface; use Yiisoft\Queue\QueueInterface; final class QueueDecorator implements QueueInterface @@ -30,6 +31,16 @@ public function push(MessageInterface $message): MessageInterface return $message; } + public function withMiddlewares(MiddlewarePushInterface|callable|array|string ...$middlewareDefinitions): self + { + return new self($this->queue->withMiddlewares(...$middlewareDefinitions), $this->collector); + } + + public function withMiddlewaresAdded(MiddlewarePushInterface|callable|array|string ...$middlewareDefinitions): self + { + return new self($this->queue->withMiddlewaresAdded(...$middlewareDefinitions), $this->collector); + } + public function run(int $max = 0): int { return $this->queue->run($max); diff --git a/src/Middleware/FailureHandling/Implementation/ExponentialDelayMiddleware.php b/src/Middleware/FailureHandling/Implementation/ExponentialDelayMiddleware.php index e303f5e6..ffef3a26 100644 --- a/src/Middleware/FailureHandling/Implementation/ExponentialDelayMiddleware.php +++ b/src/Middleware/FailureHandling/Implementation/ExponentialDelayMiddleware.php @@ -10,7 +10,6 @@ use Yiisoft\Queue\Middleware\FailureHandling\MessageFailureHandlerInterface; use Yiisoft\Queue\Middleware\FailureHandling\MiddlewareFailureInterface; use Yiisoft\Queue\Middleware\Push\Implementation\DelayMiddlewareInterface; -use Yiisoft\Queue\Middleware\Push\NoopMessageHandlerPush; use Yiisoft\Queue\QueueInterface; use Yiisoft\Queue\Middleware\FailureHandling\FailureEnvelope; @@ -65,11 +64,10 @@ public function processFailure( $message = $request->getMessage(); if ($this->suites($message)) { $envelope = new FailureEnvelope($message, $this->createNewMeta($message)); - $envelope = $this->delayMiddleware - ->withDelay($this->getDelay($envelope)) - ->processPush($envelope, new NoopMessageHandlerPush()); $queue = $this->queue ?? $request->getQueue(); - $messageNew = $queue->push($envelope); + $messageNew = $queue + ->withMiddlewaresAdded($this->delayMiddleware->withDelay($this->getDelay($envelope))) + ->push($envelope); return $request->withMessage($messageNew); } diff --git a/src/Middleware/Push/NoopMessageHandlerPush.php b/src/Middleware/Push/NoopMessageHandlerPush.php deleted file mode 100644 index 77b92d6d..00000000 --- a/src/Middleware/Push/NoopMessageHandlerPush.php +++ /dev/null @@ -1,18 +0,0 @@ - Date: Thu, 30 Apr 2026 20:41:52 +0300 Subject: [PATCH 2/3] Add delayed retry middleware pipeline regression test --- tests/Integration/MiddlewareTest.php | 1 + .../ExponentialDelayMiddlewareTest.php | 93 +++++++++++++++++++ .../SendAgainMiddlewareTest.php | 1 + 3 files changed, 95 insertions(+) diff --git a/tests/Integration/MiddlewareTest.php b/tests/Integration/MiddlewareTest.php index 1b1cc23e..7af2a4ce 100644 --- a/tests/Integration/MiddlewareTest.php +++ b/tests/Integration/MiddlewareTest.php @@ -129,6 +129,7 @@ public function testFullStackFailure(): void $callableFactory = new CallableFactory($container); $queue->expects(self::exactly(7))->method('push')->willReturnCallback($queueCallback); + $queue->method('withMiddlewaresAdded')->willReturnSelf(); $queue->method('getName')->willReturn('simple'); $middlewares = [ diff --git a/tests/Unit/Middleware/FailureHandling/Implementation/ExponentialDelayMiddlewareTest.php b/tests/Unit/Middleware/FailureHandling/Implementation/ExponentialDelayMiddlewareTest.php index 949646a9..1808b421 100644 --- a/tests/Unit/Middleware/FailureHandling/Implementation/ExponentialDelayMiddlewareTest.php +++ b/tests/Unit/Middleware/FailureHandling/Implementation/ExponentialDelayMiddlewareTest.php @@ -7,11 +7,16 @@ use Exception; use InvalidArgumentException; use PHPUnit\Framework\Attributes\DataProvider; +use Yiisoft\Queue\Adapter\AdapterInterface; use Yiisoft\Queue\Message\Message; +use Yiisoft\Queue\Message\MessageInterface; +use Yiisoft\Queue\MessageStatus; use Yiisoft\Queue\Middleware\FailureHandling\FailureEnvelope; use Yiisoft\Queue\Middleware\FailureHandling\FailureHandlingRequest; use Yiisoft\Queue\Middleware\FailureHandling\Implementation\ExponentialDelayMiddleware; use Yiisoft\Queue\Middleware\FailureHandling\MessageFailureHandlerInterface; +use Yiisoft\Queue\Middleware\Push\Implementation\DelayMiddlewareInterface; +use Yiisoft\Queue\Middleware\Push\MessageHandlerPushInterface; use Yiisoft\Queue\QueueInterface; use Yiisoft\Queue\Stubs\StubDelayMiddleware; use Yiisoft\Queue\Tests\TestCase; @@ -134,6 +139,7 @@ public function testPipelineSuccess(): void { $message = new Message('test', null); $queue = $this->createMock(QueueInterface::class); + $queue->method('withMiddlewaresAdded')->willReturnSelf(); $queue->method('push')->willReturnArgument(0); $middleware = new ExponentialDelayMiddleware( 'test', @@ -184,4 +190,91 @@ public function testPipelineFailure(): void $request = new FailureHandlingRequest($message, $exception, $queue); $middleware->processFailure($request, $nextHandler); } + + public function testDelayMiddlewareWrapsActualRetryPush(): void + { + $message = new Message('test', null); + $adapter = new DelayAwareAdapter(); + $queue = $this->createQueue($adapter); + $middleware = new ExponentialDelayMiddleware( + 'test', + 1, + 1, + 1, + 1, + new AdapterContextDelayMiddleware($adapter), + $queue, + ); + + $request = new FailureHandlingRequest($message, new Exception('test'), $queue); + $middleware->processFailure($request, new ThrowingFailureHandler()); + + self::assertSame([1.0], $adapter->delaysDuringPush); + } +} + +final class AdapterContextDelayMiddleware implements DelayMiddlewareInterface +{ + public function __construct( + private readonly DelayAwareAdapter $adapter, + ) {} + + private float $delay = 0.0; + + public function withDelay(float $seconds): self + { + $new = clone $this; + $new->delay = $seconds; + + return $new; + } + + public function processPush(MessageInterface $message, MessageHandlerPushInterface $handler): MessageInterface + { + $this->adapter->activeDelay = $this->delay; + + try { + return $handler->handlePush($message); + } finally { + $this->adapter->activeDelay = null; + } + } +} + +final class DelayAwareAdapter implements AdapterInterface +{ + /** + * @var list + */ + public array $delaysDuringPush = []; + + public ?float $activeDelay = null; + + public function runExisting(callable $handlerCallback): void + { + } + + public function status(string|int $id): MessageStatus + { + return MessageStatus::DONE; + } + + public function push(MessageInterface $message): MessageInterface + { + $this->delaysDuringPush[] = $this->activeDelay; + + return $message; + } + + public function subscribe(callable $handlerCallback): void + { + } +} + +final class ThrowingFailureHandler implements MessageFailureHandlerInterface +{ + public function handleFailure(FailureHandlingRequest $request): FailureHandlingRequest + { + throw $request->getException(); + } } diff --git a/tests/Unit/Middleware/FailureHandling/Implementation/SendAgainMiddlewareTest.php b/tests/Unit/Middleware/FailureHandling/Implementation/SendAgainMiddlewareTest.php index 7641ea88..b3fc2c5f 100644 --- a/tests/Unit/Middleware/FailureHandling/Implementation/SendAgainMiddlewareTest.php +++ b/tests/Unit/Middleware/FailureHandling/Implementation/SendAgainMiddlewareTest.php @@ -212,6 +212,7 @@ private function getPreparedQueue(array $metaResult, bool $suites): QueueInterfa }; $queue = $this->createMock(QueueInterface::class); + $queue->method('withMiddlewaresAdded')->willReturnSelf(); $queue->expects($suites ? self::once() : self::never()) ->method('push') ->willReturnCallback($queueAssertion); From a1dee60b98be65d8161081a05da9ba816f1441a8 Mon Sep 17 00:00:00 2001 From: samdark <47294+samdark@users.noreply.github.com> Date: Thu, 30 Apr 2026 17:45:27 +0000 Subject: [PATCH 3/3] Apply PHP CS Fixer and Rector changes (CI) --- .../ExponentialDelayMiddlewareTest.php | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/tests/Unit/Middleware/FailureHandling/Implementation/ExponentialDelayMiddlewareTest.php b/tests/Unit/Middleware/FailureHandling/Implementation/ExponentialDelayMiddlewareTest.php index 1808b421..f413db93 100644 --- a/tests/Unit/Middleware/FailureHandling/Implementation/ExponentialDelayMiddlewareTest.php +++ b/tests/Unit/Middleware/FailureHandling/Implementation/ExponentialDelayMiddlewareTest.php @@ -215,12 +215,12 @@ public function testDelayMiddlewareWrapsActualRetryPush(): void final class AdapterContextDelayMiddleware implements DelayMiddlewareInterface { + private float $delay = 0.0; + public function __construct( private readonly DelayAwareAdapter $adapter, ) {} - private float $delay = 0.0; - public function withDelay(float $seconds): self { $new = clone $this; @@ -250,9 +250,7 @@ final class DelayAwareAdapter implements AdapterInterface public ?float $activeDelay = null; - public function runExisting(callable $handlerCallback): void - { - } + public function runExisting(callable $handlerCallback): void {} public function status(string|int $id): MessageStatus { @@ -266,9 +264,7 @@ public function push(MessageInterface $message): MessageInterface return $message; } - public function subscribe(callable $handlerCallback): void - { - } + public function subscribe(callable $handlerCallback): void {} } final class ThrowingFailureHandler implements MessageFailureHandlerInterface