Skip to content

Commit 3bcd121

Browse files
authored
Merge pull request reactphp#71 from clue-labs/cancellation
Support Promise cancellation for all connectors
2 parents 4ed32f2 + 64b7e2e commit 3bcd121

11 files changed

Lines changed: 280 additions & 31 deletions

README.md

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,18 @@ $tcpConnector->create('127.0.0.1', 80)->then(function (React\Stream\Stream $stre
4545
$loop->run();
4646
```
4747

48+
Pending connection attempts can be cancelled by cancelling its pending promise like so:
49+
50+
```php
51+
$promise = $tcpConnector->create($host, $port);
52+
53+
$promise->cancel();
54+
```
55+
56+
Calling `cancel()` on a pending promise will close the underlying socket
57+
resource, thus cancelling the pending TCP/IP connection, and reject the
58+
resulting promise.
59+
4860
You can optionally pass additional
4961
[socket context options](http://php.net/manual/en/context.socket.php)
5062
to the constructor like this:
@@ -83,6 +95,17 @@ $dnsConnector->create('www.google.com', 80)->then(function (React\Stream\Stream
8395
$loop->run();
8496
```
8597

98+
Pending connection attempts can be cancelled by cancelling its pending promise like so:
99+
100+
```php
101+
$promise = $dnsConnector->create($host, $port);
102+
103+
$promise->cancel();
104+
```
105+
106+
Calling `cancel()` on a pending promise will cancel the underlying DNS lookup
107+
and/or the underlying TCP/IP connection and reject the resulting promise.
108+
86109
The legacy `Connector` class can be used for backwards-compatiblity reasons.
87110
It works very much like the newer `DnsConnector` but instead has to be
88111
set up like this:
@@ -112,6 +135,17 @@ $secureConnector->create('www.google.com', 443)->then(function (React\Stream\Str
112135
$loop->run();
113136
```
114137

138+
Pending connection attempts can be cancelled by cancelling its pending promise like so:
139+
140+
```php
141+
$promise = $secureConnector->create($host, $port);
142+
143+
$promise->cancel();
144+
```
145+
146+
Calling `cancel()` on a pending promise will cancel the underlying TCP/IP
147+
connection and/or the SSL/TLS negonation and reject the resulting promise.
148+
115149
You can optionally pass additional
116150
[SSL context options](http://php.net/manual/en/context.ssl.php)
117151
to the constructor like this:
@@ -138,6 +172,10 @@ $connector->create('/tmp/demo.sock')->then(function (React\Stream\Stream $stream
138172
$loop->run();
139173
```
140174

175+
Connecting to Unix domain sockets is an atomic operation, i.e. its promise will
176+
settle (either resolve or reject) immediately.
177+
As such, calling `cancel()` on the resulting promise has no effect.
178+
141179
## Install
142180

143181
The recommended way to install this library is [through Composer](http://getcomposer.org).

composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
"react/dns": "0.4.*|0.3.*",
99
"react/event-loop": "0.4.*|0.3.*",
1010
"react/stream": "0.4.*|0.3.*",
11-
"react/promise": "~2.0|~1.1"
11+
"react/promise": "^2.1 || ^1.2"
1212
},
1313
"autoload": {
1414
"psr-4": {

src/DnsConnector.php

Lines changed: 48 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,10 @@
22

33
namespace React\SocketClient;
44

5-
use React\EventLoop\LoopInterface;
65
use React\Dns\Resolver\Resolver;
76
use React\Stream\Stream;
87
use React\Promise;
9-
use React\Promise\Deferred;
8+
use React\Promise\CancellablePromiseInterface;
109

1110
class DnsConnector implements ConnectorInterface
1211
{
@@ -21,12 +20,12 @@ public function __construct(ConnectorInterface $connector, Resolver $resolver)
2120

2221
public function create($host, $port)
2322
{
24-
$connector = $this->connector;
23+
$that = $this;
2524

2625
return $this
2726
->resolveHostname($host)
28-
->then(function ($address) use ($connector, $port) {
29-
return $connector->create($address, $port);
27+
->then(function ($ip) use ($that, $port) {
28+
return $that->connect($ip, $port);
3029
});
3130
}
3231

@@ -36,6 +35,49 @@ private function resolveHostname($host)
3635
return Promise\resolve($host);
3736
}
3837

39-
return $this->resolver->resolve($host);
38+
$promise = $this->resolver->resolve($host);
39+
40+
return new Promise\Promise(
41+
function ($resolve, $reject) use ($promise) {
42+
// resolve/reject with result of DNS lookup
43+
$promise->then($resolve, $reject);
44+
},
45+
function ($_, $reject) use ($promise) {
46+
// cancellation should reject connection attempt
47+
$reject(new \RuntimeException('Connection attempt cancelled during DNS lookup'));
48+
49+
// (try to) cancel pending DNS lookup
50+
if ($promise instanceof CancellablePromiseInterface) {
51+
$promise->cancel();
52+
}
53+
}
54+
);
55+
}
56+
57+
/** @internal */
58+
public function connect($ip, $port)
59+
{
60+
$promise = $this->connector->create($ip, $port);
61+
62+
return new Promise\Promise(
63+
function ($resolve, $reject) use ($promise) {
64+
// resolve/reject with result of TCP/IP connection
65+
$promise->then($resolve, $reject);
66+
},
67+
function ($_, $reject) use ($promise) {
68+
// cancellation should reject connection attempt
69+
$reject(new \RuntimeException('Connection attempt cancelled during TCP/IP connection'));
70+
71+
// forefully close TCP/IP connection if it completes despite cancellation
72+
$promise->then(function (Stream $stream) {
73+
$stream->close();
74+
});
75+
76+
// (try to) cancel pending TCP/IP connection
77+
if ($promise instanceof CancellablePromiseInterface) {
78+
$promise->cancel();
79+
}
80+
}
81+
);
4082
}
4183
}

src/SecureConnector.php

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use React\EventLoop\LoopInterface;
66
use React\Stream\Stream;
77
use React\Promise;
8+
use React\Promise\CancellablePromiseInterface;
89

910
class SecureConnector implements ConnectorInterface
1011
{
@@ -39,7 +40,7 @@ public function create($host, $port)
3940
}
4041

4142
$encryption = $this->streamEncryption;
42-
return $this->connector->create($host, $port)->then(function (Stream $stream) use ($context, $encryption) {
43+
return $this->connect($host, $port)->then(function (Stream $stream) use ($context, $encryption) {
4344
// (unencrypted) TCP/IP connection succeeded
4445

4546
// set required SSL/TLS context options
@@ -55,4 +56,30 @@ public function create($host, $port)
5556
});
5657
});
5758
}
59+
60+
private function connect($host, $port)
61+
{
62+
$promise = $this->connector->create($host, $port);
63+
64+
return new Promise\Promise(
65+
function ($resolve, $reject) use ($promise) {
66+
// resolve/reject with result of TCP/IP connection
67+
$promise->then($resolve, $reject);
68+
},
69+
function ($_, $reject) use ($promise) {
70+
// cancellation should reject connection attempt
71+
$reject(new \RuntimeException('Connection attempt cancelled during TCP/IP connection'));
72+
73+
// forefully close TCP/IP connection if it completes despite cancellation
74+
$promise->then(function (Stream $stream) {
75+
$stream->close();
76+
});
77+
78+
// (try to) cancel pending TCP/IP connection
79+
if ($promise instanceof CancellablePromiseInterface) {
80+
$promise->cancel();
81+
}
82+
}
83+
);
84+
}
5885
}

src/StreamEncryption.php

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,10 @@ public function toggle(Stream $stream, $toggle)
6666

6767
// TODO: add write() event to make sure we're not sending any excessive data
6868

69-
$deferred = new Deferred();
69+
$deferred = new Deferred(function ($_, $reject) use ($toggle) {
70+
// cancelling this leaves this stream in an inconsistent state…
71+
$reject(new \RuntimeException('Cancelled toggling encryption ' . $toggle ? 'on' : 'off'));
72+
});
7073

7174
// get actual stream socket from stream instance
7275
$socket = $stream->stream;
@@ -82,15 +85,18 @@ public function toggle(Stream $stream, $toggle)
8285
$wrap = $this->wrapSecure && $toggle;
8386
$loop = $this->loop;
8487

85-
return $deferred->promise()->then(function () use ($stream, $wrap, $loop) {
88+
return $deferred->promise()->then(function () use ($stream, $socket, $wrap, $loop) {
89+
$loop->removeReadStream($socket);
90+
8691
if ($wrap) {
8792
return new SecureStream($stream, $loop);
8893
}
8994

9095
$stream->resume();
9196

9297
return $stream;
93-
}, function($error) use ($stream) {
98+
}, function($error) use ($stream, $socket, $loop) {
99+
$loop->removeReadStream($socket);
94100
$stream->resume();
95101
throw $error;
96102
});
@@ -103,12 +109,8 @@ public function toggleCrypto($socket, Deferred $deferred, $toggle)
103109
restore_error_handler();
104110

105111
if (true === $result) {
106-
$this->loop->removeReadStream($socket);
107-
108112
$deferred->resolve();
109113
} else if (false === $result) {
110-
$this->loop->removeReadStream($socket);
111-
112114
$deferred->reject(new UnexpectedValueException(
113115
sprintf("Unable to complete SSL/TLS handshake: %s", $this->errstr),
114116
$this->errno

src/TcpConnector.php

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -55,17 +55,20 @@ public function create($ip, $port)
5555

5656
private function waitForStreamOnce($stream)
5757
{
58-
$deferred = new Deferred();
59-
6058
$loop = $this->loop;
6159

62-
$this->loop->addWriteStream($stream, function ($stream) use ($loop, $deferred) {
60+
return new Promise\Promise(function ($resolve) use ($loop, $stream) {
61+
$loop->addWriteStream($stream, function ($stream) use ($loop, $resolve) {
62+
$loop->removeWriteStream($stream);
63+
64+
$resolve($stream);
65+
});
66+
}, function () use ($loop, $stream) {
6367
$loop->removeWriteStream($stream);
68+
fclose($stream);
6469

65-
$deferred->resolve($stream);
70+
throw new \RuntimeException('Cancelled while waiting for TCP/IP connection to be established');
6671
});
67-
68-
return $deferred->promise();
6972
}
7073

7174
/** @internal */

tests/DnsConnectorTest.php

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,15 @@ public function setUp()
2222
public function testPassByResolverIfGivenIp()
2323
{
2424
$this->resolver->expects($this->never())->method('resolve');
25-
$this->tcp->expects($this->once())->method('create')->with($this->equalTo('127.0.0.1'), $this->equalTo(80));
25+
$this->tcp->expects($this->once())->method('create')->with($this->equalTo('127.0.0.1'), $this->equalTo(80))->will($this->returnValue(Promise\reject()));
2626

2727
$this->connector->create('127.0.0.1', 80);
2828
}
2929

3030
public function testPassThroughResolverIfGivenHost()
3131
{
3232
$this->resolver->expects($this->once())->method('resolve')->with($this->equalTo('google.com'))->will($this->returnValue(Promise\resolve('1.2.3.4')));
33-
$this->tcp->expects($this->once())->method('create')->with($this->equalTo('1.2.3.4'), $this->equalTo(80));
33+
$this->tcp->expects($this->once())->method('create')->with($this->equalTo('1.2.3.4'), $this->equalTo(80))->will($this->returnValue(Promise\reject()));
3434

3535
$this->connector->create('google.com', 80);
3636
}
@@ -42,4 +42,46 @@ public function testSkipConnectionIfDnsFails()
4242

4343
$this->connector->create('example.invalid', 80);
4444
}
45+
46+
public function testCancelDuringDnsCancelsDnsAndDoesNotStartTcpConnection()
47+
{
48+
$pending = new Promise\Promise(function () { }, $this->expectCallableOnce());
49+
$this->resolver->expects($this->once())->method('resolve')->with($this->equalTo('example.com'))->will($this->returnValue($pending));
50+
$this->tcp->expects($this->never())->method('resolve');
51+
52+
$promise = $this->connector->create('example.com', 80);
53+
$promise->cancel();
54+
55+
$promise->then($this->expectCallableNever(), $this->expectCallableOnce());
56+
}
57+
58+
public function testCancelDuringTcpConnectionCancelsTcpConnection()
59+
{
60+
$pending = new Promise\Promise(function () { }, $this->expectCallableOnce());
61+
$this->resolver->expects($this->once())->method('resolve')->with($this->equalTo('example.com'))->will($this->returnValue(Promise\resolve('1.2.3.4')));
62+
$this->tcp->expects($this->once())->method('create')->with($this->equalTo('1.2.3.4'), $this->equalTo(80))->will($this->returnValue($pending));
63+
64+
$promise = $this->connector->create('example.com', 80);
65+
$promise->cancel();
66+
67+
$promise->then($this->expectCallableNever(), $this->expectCallableOnce());
68+
}
69+
70+
public function testCancelClosesStreamIfTcpResolvesDespiteCancellation()
71+
{
72+
$stream = $this->getMockBuilder('React\Stream\Stream')->disableOriginalConstructor()->setMethods(array('close'))->getMock();
73+
$stream->expects($this->once())->method('close');
74+
75+
$pending = new Promise\Promise(function () { }, function ($resolve) use ($stream) {
76+
$resolve($stream);
77+
});
78+
79+
$this->resolver->expects($this->once())->method('resolve')->with($this->equalTo('example.com'))->will($this->returnValue(Promise\resolve('1.2.3.4')));
80+
$this->tcp->expects($this->once())->method('create')->with($this->equalTo('1.2.3.4'), $this->equalTo(80))->will($this->returnValue($pending));
81+
82+
$promise = $this->connector->create('example.com', 80);
83+
$promise->cancel();
84+
85+
$promise->then($this->expectCallableNever(), $this->expectCallableOnce());
86+
}
4587
}

0 commit comments

Comments
 (0)