Skip to content

Commit 3ed8edb

Browse files
authored
Added feature and covered: Stream responses (#51)
- Added non-blocking streams gzip encoder - Added support for PRS7 responses
1 parent bcda456 commit 3ed8edb

9 files changed

Lines changed: 405 additions & 128 deletions

composer.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
"react/socket": "^1.0",
2222
"react/promise": "^2.7",
2323
"react/filesystem": "^0.1",
24+
"clue/zlib-react": "^0.2.2",
2425
"drift/http-kernel": "0.1.*, >=0.1.3",
2526
"drift/console-bridge": "0.1.*",
2627
"drift/event-loop-utils": "0.1.*",

src/Application.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,7 @@ function (ServerRequestInterface $request) use ($kernel, $requestHandler, $files
212212

213213
? $requestHandler
214214
->handleStaticResource(
215+
$request,
215216
$filesystem,
216217
$this->rootPath,
217218
$uriPath

src/RequestHandler.php

Lines changed: 155 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,22 @@
2626
* @author Marc Morera <yuhu@mmoreram.com>
2727
*/
2828

29+
use Clue\React\Zlib\Compressor;
2930
use Drift\Console\OutputPrinter;
3031
use Drift\Console\TimeFormatter;
3132
use Drift\HttpKernel\AsyncKernel;
3233
use Drift\Server\Mime\MimeTypeChecker;
3334
use function React\Promise\all;
3435
use function React\Promise\resolve;
3536
use Psr\Http\Message\ServerRequestInterface;
37+
use Psr\Http\Message\StreamInterface;
3638
use Psr\Http\Message\UploadedFileInterface as PsrUploadedFile;
3739
use React\Filesystem\FilesystemInterface;
40+
use React\Http\Response as ReactResponse;
3841
use React\Promise\PromiseInterface;
42+
use React\Stream\ReadableStreamInterface;
43+
use React\Stream\ThroughStream;
44+
use RingCentral\Psr7\Response as PSRResponse;
3945
use Symfony\Component\HttpFoundation\File\UploadedFile as SymfonyUploadedFile;
4046
use Symfony\Component\HttpFoundation\Request;
4147
use Symfony\Component\HttpFoundation\Response;
@@ -104,76 +110,88 @@ public function handleAsyncServerRequest(
104110
$method,
105111
$uriPath
106112
)
107-
->then(function (Request $symfonyRequest) use ($kernel, $from, $uriPath, $method) {
108-
return all([
109-
resolve($symfonyRequest),
110-
$kernel->handleAsync($symfonyRequest),
111-
])
112-
->then(function (array $parts) use ($from) {
113-
list($symfonyRequest, $symfonyResponse) = $parts;
114-
115-
/*
116-
* We don't have to wait to this clean
117-
*/
118-
$this->cleanTemporaryUploadedFiles($symfonyRequest);
119-
120-
return $this->toServerResponse(
121-
$symfonyRequest,
122-
$symfonyResponse,
123-
$from
124-
);
125-
}, function (\Throwable $exception) use ($from, $method, $uriPath) {
126-
return $this->createExceptionServerResponse(
127-
$exception,
128-
$from,
129-
$method,
130-
$uriPath
131-
);
113+
->then(function (Request $symfonyRequest) use ($kernel, $from, $uriPath, $method) {
114+
return all([
115+
resolve($symfonyRequest),
116+
$kernel->handleAsync($symfonyRequest),
117+
])
118+
->then(function (array $parts) use ($from) {
119+
list($symfonyRequest, $symfonyResponse) = $parts;
120+
121+
/*
122+
* We don't have to wait to this clean
123+
*/
124+
$this->cleanTemporaryUploadedFiles($symfonyRequest);
125+
126+
return $this->toServerResponse(
127+
$symfonyRequest,
128+
$symfonyResponse,
129+
$from
130+
);
131+
}, function (\Throwable $exception) use ($from, $method, $uriPath) {
132+
return $this->createExceptionServerResponse(
133+
$exception,
134+
$from,
135+
$method,
136+
$uriPath
137+
);
138+
});
132139
});
133-
});
134140
}
135141

136142
/**
137143
* Handle static resource.
138144
*
139-
* @param FilesystemInterface $filesystem
140-
* @param string $rootPath
141-
* @param string $resourcePath
145+
* @param ServerRequestInterface $request
146+
* @param FilesystemInterface $filesystem
147+
* @param string $rootPath
148+
* @param string $resourcePath
149+
* @param string|null $acceptedContentHeader
142150
*
143151
* @return PromiseInterface
144152
*/
145153
public function handleStaticResource(
154+
ServerRequestInterface $request,
146155
FilesystemInterface $filesystem,
147156
string $rootPath,
148157
string $resourcePath
149158
): PromiseInterface {
150159
$from = microtime(true);
151160

152161
return $filesystem
153-
->getContents($rootPath.$resourcePath)
154-
->then(function ($content) use ($rootPath, $resourcePath, $from) {
155-
$to = microtime(true);
162+
->file($rootPath.$resourcePath)
163+
->open('r')
164+
->then(function (ReadableStreamInterface $stream) use ($rootPath, $resourcePath, $from, $request) {
156165
$mimeType = $this
157166
->mimetypeChecker
158-
->getMimeType($rootPath.$resourcePath);
167+
->getMimeType($resourcePath);
159168

160-
return new ServerResponseWithMessage(
161-
new \React\Http\Response(
162-
Response::HTTP_OK,
163-
['Content-Type' => $mimeType],
164-
$content
165-
),
166-
$this->outputPrinter,
167-
new ConsoleStaticMessage(
168-
$resourcePath,
169-
TimeFormatter::formatTime($to - $from)
170-
)
169+
$response = new ReactResponse(
170+
Response::HTTP_OK,
171+
['Content-Type' => $mimeType],
172+
$stream
171173
);
172-
}, function (Throwable $exception) use ($resourcePath, $from) {
174+
175+
return $this
176+
->applyResponseEncoding($response, $request->getHeaderLine('Accept-Encoding'))
177+
->then(function (PSRResponse $response) use ($resourcePath, $from) {
178+
$to = microtime(true);
179+
180+
return new ServerResponseWithMessage(
181+
$response,
182+
$this->outputPrinter,
183+
new ConsoleStaticMessage(
184+
$resourcePath,
185+
TimeFormatter::formatTime($to - $from)
186+
)
187+
);
188+
});
189+
})
190+
->otherwise(function (Throwable $exception) use ($resourcePath, $from) {
173191
$to = microtime(true);
174192

175193
return new ServerResponseWithMessage(
176-
new \React\Http\Response(
194+
new ReactResponse(
177195
Response::HTTP_NOT_FOUND,
178196
[],
179197
''
@@ -242,49 +260,43 @@ private function toSymfonyRequest(
242260
* @param Response $symfonyResponse
243261
* @param float $from
244262
*
245-
* @return ServerResponseWithMessage
263+
* @return PromiseInterface<ServerResponseWithMessage>
246264
*/
247265
private function toServerResponse(
248266
Request $symfonyRequest,
249-
Response $symfonyResponse,
267+
$response,
250268
float $from
251-
): ServerResponseWithMessage {
252-
$to = microtime(true);
253-
254-
$nonEncodedContent = $symfonyResponse->getContent();
255-
$this->applyResponseEncoding(
256-
$symfonyRequest,
257-
$symfonyResponse
258-
);
259-
260-
if ($symfonyResponse->getStatusCode() >= 400) {
261-
$nonEncodedContent = 'Error returned';
262-
if (404 == $symfonyResponse->getStatusCode()) {
263-
$nonEncodedContent = 'Route not found';
264-
}
269+
): PromiseInterface {
270+
if ($response instanceof Response) {
271+
$response = new PSRResponse(
272+
$response->getStatusCode(),
273+
$response->headers->all(),
274+
$response->getContent()
275+
);
265276
}
266277

267-
$serverResponse =
268-
new ServerResponseWithMessage(
269-
new \React\Http\Response(
270-
$symfonyResponse->getStatusCode(),
271-
$symfonyResponse->headers->all(),
272-
$symfonyResponse->getContent()
273-
),
274-
$this->outputPrinter,
275-
new ConsoleRequestMessage(
276-
$symfonyRequest->getPathInfo(),
277-
$symfonyRequest->getMethod(),
278-
$symfonyResponse->getStatusCode(),
279-
$nonEncodedContent,
280-
TimeFormatter::formatTime($to - $from)
281-
)
282-
);
278+
return $this
279+
->applyResponseEncoding($response, $symfonyRequest->headers->get('Accept-Encoding'))
280+
->then(function (PSRResponse $response) use ($symfonyRequest, $from) {
281+
$to = microtime(true);
282+
$serverResponse =
283+
new ServerResponseWithMessage(
284+
$response,
285+
$this->outputPrinter,
286+
new ConsoleRequestMessage(
287+
$symfonyRequest->getPathInfo(),
288+
$symfonyRequest->getMethod(),
289+
$response->getStatusCode(),
290+
'',
291+
TimeFormatter::formatTime($to - $from)
292+
)
293+
);
283294

284-
$symfonyRequest = null;
285-
$symfonyResponse = null;
295+
$symfonyRequest = null;
296+
$symfonyResponse = null;
286297

287-
return $serverResponse;
298+
return $serverResponse;
299+
});
288300
}
289301

290302
/**
@@ -314,7 +326,7 @@ private function createExceptionServerResponse(
314326

315327
$serverResponse =
316328
new ServerResponseWithMessage(
317-
new \React\Http\Response(
329+
new ReactResponse(
318330
$code,
319331
['Content-Type' => 'text/plain'],
320332
$exception->getMessage()
@@ -338,38 +350,76 @@ private function createExceptionServerResponse(
338350
/**
339351
* Apply response encoding.
340352
*
341-
* @param Request $request
342-
* @param Response $response
353+
* @param PSRResponse $response
354+
* @param string|null $acceptEncodingHeader
355+
*
356+
* @return PromiseInterface
343357
*/
344358
private function applyResponseEncoding(
345-
Request $request,
346-
Response $response
347-
) {
348-
$allowedCompressionAsString = $request
349-
->headers
350-
->get('Accept-Encoding');
351-
352-
if (!$allowedCompressionAsString) {
353-
return;
359+
PSRResponse $response,
360+
?string $acceptEncodingHeader
361+
): PromiseInterface {
362+
if (!$acceptEncodingHeader) {
363+
return resolve($response);
354364
}
355-
$allowedCompression = explode(',', $allowedCompressionAsString);
365+
366+
$allowedCompression = explode(',', $acceptEncodingHeader);
356367
$allowedCompression = array_map('trim', $allowedCompression);
357368
if (in_array('gzip', $allowedCompression)) {
358-
$response->setContent(gzencode($response->getContent()));
359-
$response
360-
->headers
361-
->set('Content-Encoding', 'gzip');
362-
363-
return;
369+
return $this->compressResponse($response, 'gzip');
364370
}
371+
365372
if (in_array('deflate', $allowedCompression)) {
366-
$response->setContent(gzdeflate($response->getContent()));
367-
$response
368-
->headers
369-
->set('Content-Encoding', 'deflate');
373+
return $this->compressResponse($response, 'deflate');
374+
}
370375

371-
return;
376+
return resolve($response);
377+
}
378+
379+
/**
380+
* Apply compression to response.
381+
*
382+
* @param PSRResponse $response
383+
* @param string $compression
384+
*
385+
* @return PromiseInterface
386+
*/
387+
private function compressResponse(
388+
PSRResponse $response,
389+
string $compression
390+
): PromiseInterface {
391+
$body = $response->getBody();
392+
$response = $response->withHeader('Content-Encoding', $compression);
393+
394+
if ($body instanceof ReadableStreamInterface) {
395+
$compressedStream = new ThroughStream();
396+
$compressionStrategy = 'gzip' === $compression
397+
? ZLIB_ENCODING_GZIP
398+
: ZLIB_ENCODING_RAW;
399+
$compressor = new Compressor($compressionStrategy);
400+
$body->pipe($compressor)->pipe($compressedStream);
401+
402+
return resolve(new ReactResponse(
403+
$response->getStatusCode(),
404+
$response->getHeaders(),
405+
$compressedStream
406+
));
372407
}
408+
409+
if ($body instanceof StreamInterface) {
410+
$compressionMethod = 'gzip' === $compression
411+
? 'gzencode'
412+
: 'gzdeflate';
413+
$content = $body->getContents();
414+
415+
return resolve(new ReactResponse(
416+
$response->getStatusCode(),
417+
$response->getHeaders(),
418+
$compressionMethod($content)
419+
));
420+
}
421+
422+
return resolve($response);
373423
}
374424

375425
/**

src/ServerResponseWithMessage.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
namespace Drift\Server;
1717

1818
use Drift\Console\OutputPrinter;
19-
use React\Http\Response;
19+
use RingCentral\Psr7\Response;
2020

2121
/**
2222
* Class ServerResponseWithMessage.

tests/ApplicationTest.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public function testRegular()
4040

4141
$process->start();
4242
usleep(500000);
43-
Utils::curl("http://127.0.0.1:$port/valid/query?code=200");
43+
Utils::curl("http://127.0.0.1:$port/query?code=200");
4444
usleep(500000);
4545
$this->assertNotFalse(
4646
strpos(
@@ -52,7 +52,7 @@ public function testRegular()
5252
$this->assertNotFalse(
5353
strpos(
5454
$process->getOutput(),
55-
'/valid/query'
55+
'/query'
5656
)
5757
);
5858

0 commit comments

Comments
 (0)