Skip to content

Commit d752f00

Browse files
authored
Introduce PushMiddlewareConfig + Refactor and make internal PushMiddlewareDispatcher (#283)
1 parent f060bdf commit d752f00

10 files changed

Lines changed: 103 additions & 63 deletions

File tree

config/di.php

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@
1414
use Yiisoft\Queue\Middleware\FailureHandling\FailureMiddlewareDispatcher;
1515
use Yiisoft\Queue\Middleware\FailureHandling\FailureMiddlewareFactory;
1616
use Yiisoft\Queue\Middleware\FailureHandling\FailureMiddlewareFactoryInterface;
17+
use Yiisoft\Queue\Middleware\Push\PushMiddlewareConfig;
1718
use Yiisoft\Queue\Middleware\Push\PushMiddlewareFactory;
1819
use Yiisoft\Queue\Middleware\Push\PushMiddlewareFactoryInterface;
19-
use Yiisoft\Queue\Middleware\Push\PushMiddlewareDispatcher;
2020
use Yiisoft\Queue\Worker\Worker as QueueWorker;
2121
use Yiisoft\Queue\Worker\WorkerInterface;
2222

@@ -36,8 +36,8 @@
3636
PushMiddlewareFactoryInterface::class => PushMiddlewareFactory::class,
3737
ConsumeMiddlewareFactoryInterface::class => ConsumeMiddlewareFactory::class,
3838
FailureMiddlewareFactoryInterface::class => FailureMiddlewareFactory::class,
39-
PushMiddlewareDispatcher::class => [
40-
'__construct()' => ['middlewareDefinitions' => $params['yiisoft/queue']['middlewares-push']],
39+
PushMiddlewareConfig::class => [
40+
'__construct()' => ['commonMiddlewareDefinitions' => $params['yiisoft/queue']['middlewares-push']],
4141
],
4242
ConsumeMiddlewareDispatcher::class => [
4343
'__construct()' => ['middlewareDefinitions' => $params['yiisoft/queue']['middlewares-consume']],

docs/guide/en/configuration-manual.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ use Yiisoft\Queue\Middleware\Consume\ConsumeMiddlewareDispatcher;
2222
use Yiisoft\Queue\Middleware\Consume\ConsumeMiddlewareFactory;
2323
use Yiisoft\Queue\Middleware\FailureHandling\FailureMiddlewareDispatcher;
2424
use Yiisoft\Queue\Middleware\FailureHandling\FailureMiddlewareFactory;
25+
use Yiisoft\Queue\Middleware\Push\PushMiddlewareConfig;
2526
use Yiisoft\Queue\Middleware\Push\PushMiddlewareFactory;
26-
use Yiisoft\Queue\Middleware\Push\PushMiddlewareDispatcher;
2727
use Yiisoft\Queue\Queue;
2828
use Yiisoft\Queue\Worker\Worker;
2929

@@ -51,7 +51,7 @@ $failureMiddlewareDispatcher = new FailureMiddlewareDispatcher(
5151
[],
5252
);
5353

54-
$pushMiddlewareDispatcher = new PushMiddlewareDispatcher(
54+
$pushMiddlewareConfig = new PushMiddlewareConfig(
5555
new PushMiddlewareFactory($container, $callableFactory),
5656
);
5757

@@ -75,7 +75,7 @@ $queue = new Queue(
7575
$worker,
7676
$loop,
7777
$logger,
78-
$pushMiddlewareDispatcher,
78+
$pushMiddlewareConfig,
7979
);
8080

8181
// Now you can push messages

docs/guide/en/synchronous-mode.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,15 @@ $logger = $DIContainer->get(\Psr\Log\LoggerInterface::class);
1515

1616
$worker = $DIContainer->get(\Yiisoft\Queue\Worker\WorkerInterface::class);
1717
$loop = $DIContainer->get(\Yiisoft\Queue\Cli\LoopInterface::class);
18-
$pushMiddlewareDispatcher = $DIContainer->get(
19-
\Yiisoft\Queue\Middleware\Push\PushMiddlewareDispatcher::class
18+
$pushMiddlewareConfig = $DIContainer->get(
19+
\Yiisoft\Queue\Middleware\Push\PushMiddlewareConfig::class
2020
);
2121

2222
$queue = new Yiisoft\Queue\Queue(
2323
$worker,
2424
$loop,
2525
$logger,
26-
$pushMiddlewareDispatcher,
26+
$pushMiddlewareConfig,
2727
);
2828
```
2929

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Yiisoft\Queue\Middleware\Push;
6+
7+
/**
8+
* Holds the push middleware factory and the list of common middleware definitions
9+
* applied to queues.
10+
*/
11+
final class PushMiddlewareConfig
12+
{
13+
/**
14+
* @param PushMiddlewareFactoryInterface $middlewareFactory Factory used to instantiate middleware from definitions.
15+
* @param array<array|callable|PushMiddlewareInterface|string> $commonMiddlewareDefinitions Middleware definitions.
16+
*/
17+
public function __construct(
18+
public readonly PushMiddlewareFactoryInterface $middlewareFactory,
19+
public readonly array $commonMiddlewareDefinitions = [],
20+
) {}
21+
}

src/Middleware/Push/PushMiddlewareDispatcher.php

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,11 @@
66

77
use Closure;
88
use Yiisoft\Queue\Message\MessageInterface;
9+
use Yiisoft\Queue\Queue;
910

11+
/**
12+
* @internal Used internally by {@see Queue}.
13+
*/
1014
final class PushMiddlewareDispatcher
1115
{
1216
/**
@@ -15,35 +19,44 @@ final class PushMiddlewareDispatcher
1519
* @var PushMiddlewareStack|null The middleware stack.
1620
*/
1721
private ?PushMiddlewareStack $stack = null;
22+
1823
/**
19-
* @var array[]|callable[]|PushMiddlewareInterface[]|string[]
24+
* @param PushMiddlewareFactoryInterface $middlewareFactory Factory used to instantiate middleware.
25+
* @param array<array|callable|PushMiddlewareInterface|string> $middlewareDefinitions Middleware definitions.
26+
* @param PushHandlerInterface $finishHandler Finish message handler.
2027
*/
21-
private array $middlewareDefinitions;
22-
2328
public function __construct(
2429
private readonly PushMiddlewareFactoryInterface $middlewareFactory,
25-
array|callable|string|PushMiddlewareInterface ...$middlewareDefinitions,
26-
) {
27-
$this->middlewareDefinitions = $middlewareDefinitions;
28-
}
30+
private array $middlewareDefinitions,
31+
private PushHandlerInterface $finishHandler,
32+
) {}
2933

3034
/**
3135
* Dispatch message through middleware to get response.
3236
*
3337
* @param MessageInterface $message Message to pass to middleware.
34-
* @param PushHandlerInterface $finishHandler Handler to use in case no middleware produced a response.
3538
*/
36-
public function dispatch(
37-
MessageInterface $message,
38-
PushHandlerInterface $finishHandler,
39-
): MessageInterface {
39+
public function dispatch(MessageInterface $message): MessageInterface
40+
{
4041
if ($this->stack === null) {
41-
$this->stack = new PushMiddlewareStack($this->buildMiddlewares(), $finishHandler);
42+
$this->stack = new PushMiddlewareStack($this->buildMiddlewares(), $this->finishHandler);
4243
}
4344

4445
return $this->stack->handlePush($message);
4546
}
4647

48+
public function withFinishHandler(PushHandlerInterface $finishHandler): self
49+
{
50+
$instance = clone $this;
51+
$instance->finishHandler = $finishHandler;
52+
53+
// Fixes a memory leak.
54+
unset($instance->stack);
55+
$instance->stack = null;
56+
57+
return $instance;
58+
}
59+
4760
/**
4861
* Returns new instance with middleware handlers replaced with the ones provided.
4962
*

src/Queue.php

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,9 @@
1111
use Yiisoft\Queue\Message\MessageInterface;
1212
use Yiisoft\Queue\Middleware\Push\AdapterPushHandler;
1313
use Yiisoft\Queue\Middleware\Push\PushHandlerInterface;
14-
use Yiisoft\Queue\Middleware\Push\PushMiddlewareInterface;
14+
use Yiisoft\Queue\Middleware\Push\PushMiddlewareConfig;
1515
use Yiisoft\Queue\Middleware\Push\PushMiddlewareDispatcher;
16+
use Yiisoft\Queue\Middleware\Push\PushMiddlewareInterface;
1617
use Yiisoft\Queue\Middleware\Push\SynchronousPushHandler;
1718
use Yiisoft\Queue\Worker\WorkerInterface;
1819
use Yiisoft\Queue\Message\IdEnvelope;
@@ -25,13 +26,6 @@ final class Queue implements QueueInterface
2526
*/
2627
private array $middlewareDefinitions;
2728

28-
/**
29-
* @var PushHandlerInterface The final push handler in the middleware chain, responsible
30-
* for actually sending the message. Uses {@see SynchronousPushHandler} in synchronous mode or
31-
* {@see AdapterPushHandler} otherwise.
32-
*/
33-
private PushHandlerInterface $finalPushHandler;
34-
3529
private string $name;
3630

3731
/**
@@ -40,11 +34,18 @@ final class Queue implements QueueInterface
4034
*/
4135
private PushMiddlewareDispatcher $dispatcher;
4236

37+
/**
38+
* @var PushMiddlewareDispatcher The base dispatcher built from {@see PushMiddlewareConfig}.
39+
* Holds the common middleware applied to all queues.
40+
*/
41+
private PushMiddlewareDispatcher $baseDispatcher;
42+
4343
/**
4444
* @param WorkerInterface $worker The worker that processes messages.
4545
* @param LoopInterface $loop The loop for controlling message processing.
4646
* @param LoggerInterface $logger The logger for debug and informational messages.
47-
* @param PushMiddlewareDispatcher $baseDispatcher The middleware dispatcher.
47+
* @param PushMiddlewareConfig $middlewareConfig The push middleware configuration: factory and common middleware
48+
* definitions.
4849
* @param AdapterInterface|null $adapter The message adapter (`null` for synchronous mode).
4950
* @param string|BackedEnum $name The queue name.
5051
* @param PushMiddlewareInterface|callable|array|string ...$middlewareDefinitions Queue-specific middleware
@@ -54,19 +55,25 @@ public function __construct(
5455
private readonly WorkerInterface $worker,
5556
private readonly LoopInterface $loop,
5657
private readonly LoggerInterface $logger,
57-
private readonly PushMiddlewareDispatcher $baseDispatcher,
58+
PushMiddlewareConfig $middlewareConfig,
5859
private readonly ?AdapterInterface $adapter = null,
5960
string|BackedEnum $name = QueueProviderInterface::DEFAULT_QUEUE,
6061
PushMiddlewareInterface|callable|array|string ...$middlewareDefinitions,
6162
) {
6263
$this->name = StringNormalizer::normalize($name);
63-
$this->finalPushHandler = $this->createFinalPushHandler();
64+
$this->baseDispatcher = new PushMiddlewareDispatcher(
65+
$middlewareConfig->middlewareFactory,
66+
$middlewareConfig->commonMiddlewareDefinitions,
67+
$this->createFinalPushHandler(),
68+
);
6469
$this->setMiddlewaresAndPrepareDispatcher($middlewareDefinitions);
6570
}
6671

6772
public function __clone()
6873
{
69-
$this->finalPushHandler = $this->createFinalPushHandler();
74+
$finalPushHandler = $this->createFinalPushHandler();
75+
$this->baseDispatcher = $this->baseDispatcher->withFinishHandler($finalPushHandler);
76+
$this->dispatcher = $this->dispatcher->withFinishHandler($finalPushHandler);
7077
}
7178

7279
public function getName(): string
@@ -81,7 +88,7 @@ public function push(MessageInterface $message): MessageInterface
8188
['messageType' => $message->getType()],
8289
);
8390

84-
$message = $this->dispatcher->dispatch($message, $this->finalPushHandler);
91+
$message = $this->dispatcher->dispatch($message);
8592

8693
if ($this->isSynchronous()) {
8794
$this->logger->info(

tests/Benchmark/QueueBench.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
use Yiisoft\Queue\Middleware\FailureHandling\FailureEnvelope;
1919
use Yiisoft\Queue\Middleware\FailureHandling\FailureMiddlewareDispatcher;
2020
use Yiisoft\Queue\Middleware\FailureHandling\FailureMiddlewareFactory;
21+
use Yiisoft\Queue\Middleware\Push\PushMiddlewareConfig;
2122
use Yiisoft\Queue\Middleware\Push\PushMiddlewareFactory;
22-
use Yiisoft\Queue\Middleware\Push\PushMiddlewareDispatcher;
2323
use Yiisoft\Queue\Queue;
2424
use Yiisoft\Queue\QueueInterface;
2525
use Yiisoft\Queue\Tests\Benchmark\Support\VoidAdapter;
@@ -59,7 +59,7 @@ public function __construct()
5959
$worker,
6060
new SimpleLoop(0),
6161
$logger,
62-
new PushMiddlewareDispatcher(new PushMiddlewareFactory($container, $callableFactory)),
62+
new PushMiddlewareConfig(new PushMiddlewareFactory($container, $callableFactory)),
6363
$this->adapter,
6464
);
6565
}

tests/Integration/MiddlewareTest.php

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@
2323
use Yiisoft\Queue\Middleware\FailureHandling\Implementation\ExponentialDelayMiddleware;
2424
use Yiisoft\Queue\Middleware\FailureHandling\Implementation\SendAgainMiddleware;
2525
use Yiisoft\Queue\Middleware\FailureHandling\FailureMiddlewareFactory;
26+
use Yiisoft\Queue\Middleware\Push\PushMiddlewareConfig;
2627
use Yiisoft\Queue\Middleware\Push\PushMiddlewareFactory;
27-
use Yiisoft\Queue\Middleware\Push\PushMiddlewareDispatcher;
2828
use Yiisoft\Queue\Queue;
2929
use Yiisoft\Queue\QueueInterface;
3030
use Yiisoft\Queue\Tests\Integration\Support\TestMiddleware;
@@ -45,23 +45,25 @@ public function testFullStackPush(): void
4545
'channel 4',
4646
];
4747

48-
$pushMiddlewareDispatcher = new PushMiddlewareDispatcher(
48+
$pushMiddlewareConfig = new PushMiddlewareConfig(
4949
new PushMiddlewareFactory(
5050
$this->createMock(ContainerInterface::class),
5151
new CallableFactory(
5252
$this->createMock(ContainerInterface::class),
5353
),
5454
),
55-
new TestMiddleware('common 1'),
56-
new TestMiddleware('common 2'),
55+
[
56+
new TestMiddleware('common 1'),
57+
new TestMiddleware('common 2'),
58+
],
5759
);
5860
$worker = $this->createMock(WorkerInterface::class);
5961
$worker->method('process')->willReturnArgument(0);
6062
$queue = new Queue(
6163
$worker,
6264
$this->createMock(LoopInterface::class),
6365
$this->createMock(LoggerInterface::class),
64-
$pushMiddlewareDispatcher,
66+
$pushMiddlewareConfig,
6567
name: 'test',
6668
);
6769
$queue = $queue

tests/TestCase.php

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020
use Yiisoft\Queue\Middleware\Consume\ConsumeMiddlewareFactory;
2121
use Yiisoft\Queue\Middleware\FailureHandling\FailureMiddlewareDispatcher;
2222
use Yiisoft\Queue\Middleware\FailureHandling\FailureMiddlewareFactory;
23+
use Yiisoft\Queue\Middleware\Push\PushMiddlewareConfig;
2324
use Yiisoft\Queue\Middleware\Push\PushMiddlewareFactory;
24-
use Yiisoft\Queue\Middleware\Push\PushMiddlewareDispatcher;
2525
use Yiisoft\Queue\Queue;
2626
use Yiisoft\Queue\Worker\Worker;
2727
use Yiisoft\Queue\Worker\WorkerInterface;
@@ -97,7 +97,7 @@ protected function createQueue(
9797
$this->getWorker(),
9898
$this->getLoop(),
9999
new NullLogger(),
100-
$this->getPushMiddlewareDispatcher(),
100+
$this->getPushMiddlewareConfig(),
101101
$adapter,
102102
$name,
103103
);
@@ -158,9 +158,9 @@ protected function getMessageHandlers(): array
158158
];
159159
}
160160

161-
protected function getPushMiddlewareDispatcher(): PushMiddlewareDispatcher
161+
protected function getPushMiddlewareConfig(): PushMiddlewareConfig
162162
{
163-
return new PushMiddlewareDispatcher(
163+
return new PushMiddlewareConfig(
164164
new PushMiddlewareFactory(
165165
$this->getContainer(),
166166
new CallableFactory($this->getContainer()),

0 commit comments

Comments
 (0)