Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 23 additions & 3 deletions src/Middleware/Push/PushMiddlewareDispatcher.php
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,12 @@ public function dispatch(

/**
* Returns new instance with middleware handlers replaced with the ones provided.
* The last specified handler will be executed first.
*
* @param array[]|callable[]|MiddlewarePushInterface[]|string[] $middlewareDefinitions Each array element is:
*
* - A name of a middleware class. The middleware instance will be obtained from container executed.
* - A callable with `function(ServerRequestInterface $request, RequestHandlerInterface $handler):
* ResponseInterface` signature.
* - A callable with `function(MessageInterface $message, MessageHandlerPushInterface $handler):
* MessageInterface` signature.
* - A "callable-like" array in format `[FooMiddleware::class, 'index']`. `FooMiddleware` instance will
* be created and `index()` method will be executed.
* - A function returning a middleware. The middleware returned will be executed.
Expand All @@ -73,6 +72,27 @@ public function withMiddlewares(array $middlewareDefinitions): self
return $instance;
}

/**
* Returns a new instance with additional middleware handlers added to the existing ones.
*
* @param array[]|callable[]|MiddlewarePushInterface[]|string[] $middlewareDefinitions Each array element is:
*
* - A name of a middleware class. The middleware instance will be obtained from container executed.
* - A callable with `function(MessageInterface $message, MessageHandlerPushInterface $handler):
* MessageInterface` signature.
* - A "callable-like" array in format `[FooMiddleware::class, 'index']`. `FooMiddleware` instance will
* be created and `index()` method will be executed.
* - A function returning a middleware. The middleware returned will be executed.
*
* For callables typed parameters are automatically injected using dependency injection container.
*
* @return self New instance of the {@see PushMiddlewareDispatcher}
*/
public function withMiddlewaresAdded(array $middlewareDefinitions): self
{
return $this->withMiddlewares([...array_reverse($this->middlewareDefinitions), ...$middlewareDefinitions]);
}

/**
* @return bool Whether there are middleware defined in the dispatcher.
*/
Expand Down
73 changes: 32 additions & 41 deletions src/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,31 +21,47 @@
final class Queue implements QueueInterface
{
/**
* @var array|array[]|callable[]|MiddlewarePushInterface[]|string[]
* @var array<array|callable|MiddlewarePushInterface|string> Queue-specific middleware definitions.
*/
private array $middlewareDefinitions;

/**
* @var MessageHandlerPushInterface $finalPushHandler The final push handler in the middleware chain, responsible
* @var MessageHandlerPushInterface The final push handler in the middleware chain, responsible
* for actually sending the message. Uses {@see SynchronousPushHandler} in synchronous mode or
* {@see AdapterPushHandler} otherwise.
*/
private MessageHandlerPushInterface $finalPushHandler;

private string $name;

/**
* @var PushMiddlewareDispatcher The dispatcher used for push messages, combining base dispatcher middleware with
* queue-specific middleware.
*/
private PushMiddlewareDispatcher $dispatcher;

/**
* @param WorkerInterface $worker The worker that processes messages.
* @param LoopInterface $loop The loop for controlling message processing.
* @param LoggerInterface $logger The logger for debug and informational messages.
* @param PushMiddlewareDispatcher $baseDispatcher The middleware dispatcher.
* @param AdapterInterface|null $adapter The message adapter (`null` for synchronous mode).
* @param string|BackedEnum $name The queue name.
* @param MiddlewarePushInterface|callable|array|string ...$middlewareDefinitions Queue-specific middleware
* definitions.
*/
public function __construct(
private readonly WorkerInterface $worker,
private readonly LoopInterface $loop,
private readonly LoggerInterface $logger,
private readonly PushMiddlewareDispatcher $pushMiddlewareDispatcher,
private readonly PushMiddlewareDispatcher $baseDispatcher,
private readonly ?AdapterInterface $adapter = null,
string|BackedEnum $name = QueueProviderInterface::DEFAULT_QUEUE,
MiddlewarePushInterface|callable|array|string ...$middlewareDefinitions,
) {
$this->name = StringNormalizer::normalize($name);
$this->middlewareDefinitions = $middlewareDefinitions;
$this->finalPushHandler = $this->createFinalPushHandler();
$this->setMiddlewaresAndPrepareDispatcher($middlewareDefinitions);
}

public function __clone()
Expand All @@ -65,10 +81,7 @@ public function push(MessageInterface $message): MessageInterface
['messageType' => $message->getType()],
);

$message = $this->pushMiddlewareDispatcher->dispatch(
$message,
$this->createPushHandler(),
);
$message = $this->dispatcher->dispatch($message, $this->finalPushHandler);

if ($this->isSynchronous()) {
$this->logger->info(
Expand Down Expand Up @@ -143,55 +156,33 @@ public function status(string|int $id): MessageStatus
public function withMiddlewares(MiddlewarePushInterface|callable|array|string ...$middlewareDefinitions): self
{
$instance = clone $this;
$instance->middlewareDefinitions = $middlewareDefinitions;

$instance->setMiddlewaresAndPrepareDispatcher($middlewareDefinitions);
return $instance;
}

public function withMiddlewaresAdded(MiddlewarePushInterface|callable|array|string ...$middlewareDefinitions): self
{
$instance = clone $this;
$instance->middlewareDefinitions = [...array_values($instance->middlewareDefinitions), ...array_values($middlewareDefinitions)];

$instance->setMiddlewaresAndPrepareDispatcher([...array_values($instance->middlewareDefinitions), ...array_values($middlewareDefinitions)]);
return $instance;
}

/**
* @param array<MiddlewarePushInterface|callable|array|string> $middlewareDefinitions
*/
private function setMiddlewaresAndPrepareDispatcher(array $middlewareDefinitions): void
{
$this->middlewareDefinitions = $middlewareDefinitions;
$this->dispatcher = $this->baseDispatcher->withMiddlewaresAdded($middlewareDefinitions);
}

private function handle(MessageInterface $message): bool
{
$this->worker->process($message, $this);

return $this->loop->canContinue();
}

private function createPushHandler(): MessageHandlerPushInterface
{
return new class (
$this->finalPushHandler,
$this->pushMiddlewareDispatcher,
$this->middlewareDefinitions,
) implements MessageHandlerPushInterface {
public function __construct(
/**
* @var MessageHandlerPushInterface $finishHandler Final handler invoked after all middlewares are
* processed.
*/
private readonly MessageHandlerPushInterface $finishHandler,
private readonly PushMiddlewareDispatcher $dispatcher,
/**
* @var array|array[]|callable[]|MiddlewarePushInterface[]|string[]
*/
private readonly array $middlewares,
) {}

public function handlePush(MessageInterface $message): MessageInterface
{
return $this->dispatcher
->withMiddlewares($this->middlewares)
->dispatch($message, $this->finishHandler);
}
};
}

private function createFinalPushHandler(): MessageHandlerPushInterface
{
return $this->isSynchronous()
Expand Down
3 changes: 2 additions & 1 deletion tests/Integration/MiddlewareTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public function testFullStackPush(): void
'channel 1',
'channel 2',
'channel 3',
'channel 4',
];

$pushMiddlewareDispatcher = new PushMiddlewareDispatcher(
Expand All @@ -66,7 +67,7 @@ public function testFullStackPush(): void
$queue = $queue
->withMiddlewares(new TestMiddleware('Won\'t be executed'))
->withMiddlewares(new TestMiddleware('channel 1'), new TestMiddleware('channel 2'))
->withMiddlewaresAdded(new TestMiddleware('channel 3'));
->withMiddlewaresAdded(new TestMiddleware('channel 3'), new TestMiddleware('channel 4'));

$message = new Message('test', ['initial']);
$messagePushed = $queue->push($message);
Expand Down
Loading