diff --git a/CONTRIBUTORS.rst b/CONTRIBUTORS.rst index 29cd4797..91617947 100644 --- a/CONTRIBUTORS.rst +++ b/CONTRIBUTORS.rst @@ -17,3 +17,4 @@ In chronological order: - Alek Storm (@alekstorm) - Implemented Python 2.7 support. + - Implemented server push. diff --git a/docs/source/advanced.rst b/docs/source/advanced.rst index e721b0e2..504bfe25 100644 --- a/docs/source/advanced.rst +++ b/docs/source/advanced.rst @@ -108,3 +108,55 @@ Note that we don't plug an instance of the class in, we plug the class itself in. We do this because the connection object will spawn instances of the class in order to manage the flow control windows of streams in addition to managing the window of the connection itself. + +.. _server-push: + +Server Push +----------- + +HTTP/2.0 provides servers with the ability to "push" additional resources to +clients in response to a request, as if the client had requested the resources +themselves. When minimizing the number of round trips is more critical than +maximizing bandwidth usage, this can be a significant performance improvement. + +Servers may declare their intention to push a given resource by sending the +headers and other metadata of a request that would return that resource - this +is referred to as a "push promise". They may do this before sending the response +headers for the original request, after, or in the middle of sending the +response body. + +In order to receive pushed resources, the +:class:`HTTP20Connection ` object must be constructed +with ``enable_push=True``. + +You may retrieve the push promises that the server has sent *so far* by calling +:meth:`getpushes() `, which returns a +generator that yields :class:`HTTP20Push ` objects. Note that +this method is not idempotent; promises returned in one call will not be +returned in subsequent calls. If ``capture_all=False`` is passed (the default), +the generator will yield all buffered push promises without blocking. However, +if ``capture_all=True`` is passed, the generator will first yield all buffered +push promises, then yield additional ones as they arrive, and terminate when the +original stream closes. Using this parameter is only recommended when it is +known that all pushed streams, or a specific one, are of higher priority than +the original response, or when also processing the original response in a +separate thread (N.B. do not do this; ``hyper`` is not yet thread-safe):: + + conn.request('GET', '/') + response = conn.getheaders() + for push in conn.getpushes(): # all pushes promised before response headers + print(push.path) + conn.read() + for push in conn.getpushes(): # all other pushes + print(push.path) + +To cancel an in-progress pushed stream (for example, if the user already has +the given path in cache), call +:meth:`HTTP20Push.cancel() `. + +``hyper`` does not currently verify that pushed resources comply with the +Same-Origin Policy, so users must take care that they do not treat pushed +resources as authoritative without performing this check themselves (since +the server push mechanism is only an optimization, and clients are free to +issue requests for any pushed resources manually, there is little downside to +simply ignoring suspicious ones). diff --git a/docs/source/api.rst b/docs/source/api.rst index 42d53548..867cd676 100644 --- a/docs/source/api.rst +++ b/docs/source/api.rst @@ -16,6 +16,9 @@ Primary HTTP/2.0 Interface .. autoclass:: hyper.HTTP20Response :inherited-members: +.. autoclass:: hyper.HTTP20Push + :inherited-members: + Requests Transport Adapter -------------------------- diff --git a/docs/source/faq.rst b/docs/source/faq.rst index 1fae9c25..55b6176d 100644 --- a/docs/source/faq.rst +++ b/docs/source/faq.rst @@ -29,15 +29,7 @@ It should! If you find it doesn't, that's a bug: please `report it on GitHub`_. Does ``hyper`` support Server Push? ----------------------------------- -No, and I don't think it ever will directly. Support for Server Push -effectively *mandates* a multithreaded or event-loop based programming style, -which is incompatible with most current Python HTTP code. For that reason, -``hyper``'s default API is unlikely to ever allow Server Push. - -However, there's no reason the underlying framing and stream layers couldn't -support it. If ``hyper`` ever grows a server implementation or a fully -event-loop based implementation, I'll revisit the decision not to support -Server Push. +Yes! See :ref:`server-push`. I hit a bug! What should I do? ------------------------------ diff --git a/hyper/__init__.py b/hyper/__init__.py index a726be7b..d4dc04f9 100644 --- a/hyper/__init__.py +++ b/hyper/__init__.py @@ -9,14 +9,14 @@ __version__ = '0.0.4' from .http20.connection import HTTP20Connection -from .http20.response import HTTP20Response +from .http20.response import HTTP20Response, HTTP20Push # Throw import errors on Python <2.7 and 3.0-3.2. import sys as _sys if _sys.version_info < (2,7) or (3,0) <= _sys.version_info < (3,3): raise ImportError("hyper only supports Python 2.7 and Python 3.3 or higher.") -__all__ = [HTTP20Response, HTTP20Connection] +__all__ = [HTTP20Response, HTTP20Push, HTTP20Connection] # Set default logging handler. import logging diff --git a/hyper/http20/connection.py b/hyper/http20/connection.py index adbb57b4..aa7dde97 100644 --- a/hyper/http20/connection.py +++ b/hyper/http20/connection.py @@ -9,12 +9,14 @@ from .stream import Stream from .tls import wrap_socket from .frame import ( - DataFrame, HeadersFrame, SettingsFrame, Frame, WindowUpdateFrame, - GoAwayFrame + DataFrame, HeadersFrame, PushPromiseFrame, RstStreamFrame, SettingsFrame, + Frame, WindowUpdateFrame, GoAwayFrame ) +from .response import HTTP20Response, HTTP20Push from .window import FlowControlManager from .exceptions import ConnectionError +import errno import logging import socket @@ -42,8 +44,11 @@ class HTTP20Connection(object): If not provided, :class:`FlowControlManager ` will be used. + :param enable_push: Whether the server is allowed to push resources to the + client (see :meth:`getpushes() `). """ - def __init__(self, host, port=None, window_manager=None, **kwargs): + def __init__(self, host, port=None, window_manager=None, enable_push=False, + **kwargs): """ Creates an HTTP/2.0 connection to a specific server. """ @@ -56,6 +61,8 @@ def __init__(self, host, port=None, window_manager=None, **kwargs): else: self.host, self.port = host, port + self._enable_push = enable_push + # Create the mutable state. self.__wm_class = window_manager or FlowControlManager self.__init_state() @@ -134,6 +141,10 @@ def request(self, method, url, body=None, headers={}): return stream_id + def _get_stream(self, stream_id): + return (self.streams[stream_id] if stream_id is not None + else self.recent_stream) + def getresponse(self, stream_id=None): """ Should be called after a request is sent to get a response from the @@ -146,9 +157,27 @@ def getresponse(self, stream_id=None): get a response. :returns: A HTTP response object. """ - stream = (self.streams[stream_id] if stream_id is not None - else self.recent_stream) - return stream.getresponse() + stream = self._get_stream(stream_id) + return HTTP20Response(stream.getheaders(), stream) + + def getpushes(self, stream_id=None, capture_all=False): + """ + Returns a generator that yields push promises from the server. Note that + this method is not idempotent; promises returned in one call will not be + returned in subsequent calls. Iterating through generators returned by + multiple calls to this method simultaneously results in undefined + behavior. + + :param stream_id: (optional) The stream ID of the request for which to + get push promises. + :param capture_all: If ``False``, the generator will yield all buffered + push promises without blocking. If ``True``, the generator will + first yield all buffered push promises, then yield additional ones + as they arrive, and terminate when the original stream closes. + """ + stream = self._get_stream(stream_id) + for promised_stream_id, headers in stream.getpushes(capture_all): + yield HTTP20Push(headers, self.streams[promised_stream_id]) def connect(self): """ @@ -166,7 +195,7 @@ def connect(self): # connection, followed by an initial settings frame. sock.send(b'PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n') f = SettingsFrame(0) - f.settings[SettingsFrame.ENABLE_PUSH] = 0 + f.settings[SettingsFrame.ENABLE_PUSH] = int(self._enable_push) self._send_cb(f) # The server will also send an initial settings frame, so get it. @@ -209,7 +238,6 @@ def putrequest(self, method, selector, **kwargs): s.add_header(":path", selector) # Save the stream. - self.streams[s.stream_id] = s self.recent_stream = s return s.stream_id @@ -229,9 +257,7 @@ def putheader(self, header, argument, stream_id=None): header to. :returns: Nothing. """ - stream = (self.streams[stream_id] if stream_id is not None - else self.recent_stream) - + stream = self._get_stream(stream_id) stream.add_header(header, argument) return @@ -256,8 +282,7 @@ def endheaders(self, message_body=None, final=False, stream_id=None): """ self.connect() - stream = (self.streams[stream_id] if stream_id is not None - else self.recent_stream) + stream = self._get_stream(stream_id) # Close this if we've been told no more data is coming and we don't # have any to send. @@ -283,9 +308,7 @@ def send(self, data, final=False, stream_id=None): data on. :returns: Nothing. """ - stream = (self.streams[stream_id] if stream_id is not None - else self.recent_stream) - + stream = self._get_stream(stream_id) stream.send_data(data, final) return @@ -340,27 +363,33 @@ def _update_settings(self, frame): self._settings[SettingsFrame.INITIAL_WINDOW_SIZE] = newsize - def _new_stream(self): + def _new_stream(self, stream_id=None, local_closed=False): """ Returns a new stream object for this connection. """ window_size = self._settings[SettingsFrame.INITIAL_WINDOW_SIZE] s = Stream( - self.next_stream_id, self._send_cb, self._recv_cb, + stream_id or self.next_stream_id, self._send_cb, self._recv_cb, self._close_stream, self.encoder, self.decoder, - self.__wm_class(window_size) + self.__wm_class(window_size), local_closed ) + self.streams[s.stream_id] = s self.next_stream_id += 2 return s - def _close_stream(self, stream_id): + def _close_stream(self, stream_id, error_code=None): """ Called by a stream when it would like to be 'closed'. """ + if error_code is not None: + f = RstStreamFrame(stream_id) + f.error_code = error_code + self._send_cb(f) + del self.streams[stream_id] - def _send_cb(self, frame): + def _send_cb(self, frame, tolerate_peer_gone=False): """ This is the callback used by streams to send data on the connection. @@ -387,7 +416,11 @@ def _send_cb(self, frame): frame.stream_id ) - self._sock.send(data) + try: + self._sock.send(data) + except socket.error as e: + if not tolerate_peer_gone or e.errno not in (errno.EPIPE, errno.ECONNRESET): + raise def _adjust_receive_window(self, frame_len): """ @@ -399,7 +432,7 @@ def _adjust_receive_window(self, frame_len): if increment: f = WindowUpdateFrame(0) f.window_increment = increment - self._send_cb(f) + self._send_cb(f, True) return @@ -440,6 +473,17 @@ def _recv_cb(self): # Inform the WindowManager of how much data was received. If the # manager tells us to increment the window, do so. self._adjust_receive_window(len(frame.data)) + elif isinstance(frame, PushPromiseFrame): + if self._enable_push: + self._new_stream(frame.promised_stream_id, local_closed=True) + else: + # Servers are forbidden from sending push promises when + # the ENABLE_PUSH setting is 0, but the spec leaves the client + # action undefined when they do it anyway. So we just refuse + # the stream and go about our business. + f = RstStreamFrame(frame.promised_stream_id) + f.error_code = 7 # REFUSED_STREAM + self._send_cb(f) # Work out to whom this frame should go. if frame.stream_id != 0: diff --git a/hyper/http20/frame.py b/hyper/http20/frame.py index 99244112..c643cb75 100644 --- a/hyper/http20/frame.py +++ b/hyper/http20/frame.py @@ -221,14 +221,19 @@ class PushPromiseFrame(Frame): """ The PUSH_PROMISE frame is used to notify the peer endpoint in advance of streams the sender intends to initiate. - - Right now hyper doesn't support these, so we treat the body data as totally - opaque, along with the flags. """ + defined_flags = [('END_PUSH_PROMISE', 0x01)] + type = 0x05 - def __init__(self, stream_id): - raise NotImplementedError("hyper doesn't support server push") + def serialize(self): + data = self.build_frame_header(len(self.data) + 4) + data += struct.pack("!L", self.promised_stream_id) + return b''.join([data, self.data]) + + def parse_body(self, data): + self.promised_stream_id, = struct.unpack("!L", data[:4]) + self.data = data[4:] class PingFrame(Frame): diff --git a/hyper/http20/response.py b/hyper/http20/response.py index 3fee7317..c5e6f427 100644 --- a/hyper/http20/response.py +++ b/hyper/http20/response.py @@ -8,6 +8,8 @@ """ import zlib +from .util import pop_from_key_value_set + class DeflateDecoder(object): """ @@ -49,6 +51,22 @@ def decompress(self, data): self._data = None +class Headers(object): + def __init__(self, pairs): + # This conversion to dictionary is unwise, as there may be repeated + # keys, but it's acceptable for an early alpha. + self._headers = dict(pairs) + + def getheader(self, name, default=None): + return self._headers.get(name, default) + + def getheaders(self): + return list(self._headers.items()) + + def items(self): + return self._headers.items() + + class HTTP20Response(object): """ An ``HTTP20Response`` wraps the HTTP/2.0 response from the server. It @@ -62,15 +80,14 @@ def __init__(self, headers, stream): #: HTTP/2.0, and so is always the empty string. self.reason = '' - # The response headers. These are determined upon creation, assigned - # once, and never assigned again. - # This conversion to dictionary is unwise, as there may be repeated - # keys, but it's acceptable for an early alpha. - self._headers = dict(headers) + status = pop_from_key_value_set(headers, ':status')[0] #: The status code returned by the server. - self.status = int(self._headers[':status']) - del self._headers[':status'] + self.status = int(status) + + # The response headers. These are determined upon creation, assigned + # once, and never assigned again. + self._headers = Headers(headers) # The stream this response is being sent over. self._stream = stream @@ -85,9 +102,9 @@ def __init__(self, headers, stream): # This 16 + MAX_WBITS nonsense is to force gzip. See this # Stack Overflow answer for more: # http://stackoverflow.com/a/2695466/1401686 - if self._headers.get('content-encoding', '') == 'gzip': + if self._headers.getheader('content-encoding') == 'gzip': self._decompressobj = zlib.decompressobj(16 + zlib.MAX_WBITS) - elif self._headers.get('content-encoding', '') == 'deflate': + elif self._headers.getheader('content-encoding') == 'deflate': self._decompressobj = DeflateDecoder() else: self._decompressobj = None @@ -145,7 +162,7 @@ def getheader(self, name, default=None): :param default: (optional) The return value if the header wasn't sent. :returns: The value of the header. """ - return self._headers.get(name, default) + return self._headers.getheader(name, default) def getheaders(self): """ @@ -177,3 +194,61 @@ def __enter__(self): def __exit__(self, *args): self.close() return False # Never swallow exceptions. + + +class HTTP20Push(object): + """ + Represents a request-response pair sent by the server through the server + push mechanism. + """ + def __init__(self, request_headers, stream): + scheme, method, authority, path = ( + pop_from_key_value_set(request_headers, + ':scheme', ':method', ':authority', ':path') + ) + #: The scheme of the simulated request + self.scheme = scheme + #: The method of the simulated request (must be safe and cacheable, e.g. GET) + self.method = method + #: The authority of the simulated request (usually host:port) + self.authority = authority + #: The path of the simulated request + self.path = path + + self._request_headers = Headers(request_headers) + self._stream = stream + + def getrequestheader(self, name, default=None): + """ + Return the value of the simulated request header ``name``, or ``default`` + if there is no header matching ``name``. If there is more than one header + with the value ``name``, return all of the values joined by ', '. If + ``default`` is any iterable other than a single string, its elements are + similarly returned joined by commas. + + :param name: The name of the header to get the value of. + :param default: (optional) The return value if the header wasn't sent. + :returns: The value of the header. + """ + return self._request_headers.getheader(name, default) + + def getrequestheaders(self): + """ + Get all the simulated request headers. + + :returns: A list of (header, value) tuples. + """ + return self._request_headers.getheaders() + + def getresponse(self): + """ + Returns an :class:`HTTP20Response` object representing the pushed + response. + """ + return HTTP20Response(self._stream.getheaders(), self._stream) + + def cancel(self): + """ + Cancel the pushed response and close the stream. + """ + self._stream.close(8) # CANCEL diff --git a/hyper/http20/stream.py b/hyper/http20/stream.py index 2f97b27e..8b42bed0 100644 --- a/hyper/http20/stream.py +++ b/hyper/http20/stream.py @@ -14,10 +14,9 @@ the stream by the endpoint that initiated the stream. """ from .frame import ( - FRAME_MAX_LEN, HeadersFrame, DataFrame, WindowUpdateFrame, + FRAME_MAX_LEN, HeadersFrame, DataFrame, PushPromiseFrame, WindowUpdateFrame, ContinuationFrame, ) -from .response import HTTP20Response from .util import get_from_key_value_set import collections @@ -52,13 +51,27 @@ def __init__(self, close_cb, header_encoder, header_decoder, - window_manager): + window_manager, + local_closed=False): self.stream_id = stream_id - self.state = STATE_IDLE + self.state = STATE_HALF_CLOSED_LOCAL if local_closed else STATE_IDLE self.headers = [] + # Set to a key-value set of the response headers once their + # HEADERS..CONTINUATION frame sequence finishes. self.response_headers = None + # A dict mapping the promised stream ID of a pushed resource to a + # key-value set of its request headers. Entries are added once their + # PUSH_PROMISE..CONTINUATION frame sequence finishes. + self.promised_headers = {} + # Chunks of encoded header data from the current + # (HEADERS|PUSH_PROMISE)..CONTINUATION frame sequence. Since sending any + # frame other than a CONTINUATION is disallowed while a header block is + # being transmitted, this and ``promised_stream_id`` are the only pieces + # of state we have to track. self.header_data = [] + self.promised_stream_id = None + # Unconsumed response data chunks. Empties after every call to _read(). self.data = [] # There are two flow control windows: one for data we're sending, @@ -125,10 +138,6 @@ def _remote_closed(self): def _local_open(self): return self.state in (STATE_OPEN, STATE_HALF_CLOSED_REMOTE) - @property - def _remote_open(self): - return self.state in (STATE_OPEN, STATE_HALF_CLOSED_LOCAL) - def _close_local(self): self.state = ( STATE_HALF_CLOSED_LOCAL if self.state == STATE_OPEN @@ -146,11 +155,6 @@ def _read(self, amt=None): Read data from the stream. Unlike a normal read behaviour, this function returns _at least_ ``amt`` data, but may return more. """ - if self.state == STATE_CLOSED: - return b'' - - assert self._remote_open - def listlen(list): return sum(map(len, list)) @@ -171,7 +175,16 @@ def receive_frame(self, frame): if isinstance(frame, WindowUpdateFrame): self._out_flow_control_window += frame.window_increment - elif isinstance(frame, (HeadersFrame, ContinuationFrame)): + elif isinstance(frame, HeadersFrame): + # Begin the header block for the response headers. + self.promised_stream_id = None + self.header_data = [frame.data] + elif isinstance(frame, PushPromiseFrame): + # Begin a header block for the request headers of a pushed resource. + self.promised_stream_id = frame.promised_stream_id + self.header_data = [frame.data] + elif isinstance(frame, ContinuationFrame): + # Continue a header block begun with either HEADERS or PUSH_PROMISE. self.header_data.append(frame.data) elif isinstance(frame, DataFrame): # Append the data to the buffer. @@ -183,12 +196,16 @@ def receive_frame(self, frame): if increment and not self._remote_closed: w = WindowUpdateFrame(self.stream_id) w.window_increment = increment - self._data_cb(w) + self._data_cb(w, True) else: raise ValueError('Unexpected frame type: %i' % frame.type) - if 'END_HEADERS' in frame.flags: - self.response_headers = self._decoder.decode(b''.join(self.header_data)) + if 'END_HEADERS' in frame.flags or 'END_PUSH_PROMISE' in frame.flags: + headers = self._decoder.decode(b''.join(self.header_data)) + if self.promised_stream_id is None: + self.response_headers = headers + else: + self.promised_headers[self.promised_stream_id] = headers def open(self, end): """ @@ -228,10 +245,10 @@ def open(self, end): return - def getresponse(self): + def getheaders(self): """ - Once all data has been sent on this connection, returns a - HTTP20Response object wrapping this stream. + Once all data has been sent on this connection, returns a key-value set + of the headers of the response to the original request. """ assert self._local_closed @@ -244,19 +261,40 @@ def getresponse(self): int(get_from_key_value_set(self.response_headers, 'content-length', 0)) ) - # Create the HTTP response. - return HTTP20Response(self.response_headers, self) + return self.response_headers + + def getpushes(self, capture_all=False): + """ + Returns a generator that yields push promises from the server. Note that + this method is not idempotent; promises returned in one call will not be + returned in subsequent calls. Iterating through generators returned by + multiple calls to this method simultaneously results in undefined + behavior. + + :param capture_all: If ``False``, the generator will yield all buffered + push promises without blocking. If ``True``, the generator will + first yield all buffered push promises, then yield additional ones + as they arrive, and terminate when the original stream closes. + """ + while True: + for pair in self.promised_headers.items(): + yield pair + self.promised_headers = {} + if not capture_all or self._remote_closed: + break + self._recv_cb() - def close(self): + def close(self, error_code=None): """ Closes the stream. If the stream is currently open, attempts to close it as gracefully as possible. + :param error_code: (optional) The error code to reset the stream with. :returns: Nothing. """ # Right now let's not bother with grace, let's just call close on the # connection. - self._close_cb(self.stream_id) + self._close_cb(self.stream_id, error_code) def _send_chunk(self, data, final): """ diff --git a/hyper/http20/util.py b/hyper/http20/util.py index eadec40d..092ed3e8 100644 --- a/hyper/http20/util.py +++ b/hyper/http20/util.py @@ -10,7 +10,26 @@ def get_from_key_value_set(kvset, key, default=None): Returns a value from a key-value set, or the default if the value isn't present. """ - for name, value in kvset: - if name == key: - return value - return default + value = pop_from_key_value_set(kvset.copy(), key)[0] + return value if value is not None else default + +def pop_from_key_value_set(kvset, *keys): + """ + Pops the values of ``keys`` from ``kvset`` and returns them as a tuple. If a + key is not found in ``kvset``, ``None`` is used instead. + + >>> kvset = [('a',0), ('b',1), ('c',2)] + >>> pop_from_key_value_set(kvset, 'a', 'foo', 'c') + (0, None, 2) + >>> kvset + [('b', 1)] + """ + extracted = [None] * len(keys) + rest = set() + for key, value in kvset: + try: + extracted[keys.index(key)] = value + except ValueError: + rest.add((key, value)) + kvset.intersection_update(rest) + return tuple(extracted) diff --git a/setup.py b/setup.py index a8487112..2d6d15c6 100644 --- a/setup.py +++ b/setup.py @@ -45,7 +45,9 @@ 'Intended Audience :: Developers', 'License :: OSI Approved :: MIT License', 'Programming Language :: Python', + 'Programming Language :: Python :: 2.7', 'Programming Language :: Python :: 3', 'Programming Language :: Python :: 3.3', + 'Programming Language :: Python :: 3.4', ] ) diff --git a/test/test_hyper.py b/test/test_hyper.py index ce26a53e..3a220d44 100644 --- a/test/test_hyper.py +++ b/test/test_hyper.py @@ -16,7 +16,9 @@ from hyper.http20.window import FlowControlManager from hyper.compat import zlib_compressobj from hyper.contrib import HTTP20Adapter +import errno import pytest +import socket import zlib from io import BytesIO @@ -195,9 +197,38 @@ def test_settings_frames_never_have_streams(self): class TestPushPromiseFrame(object): - def test_push_promise_unsupported(self): - with pytest.raises(NotImplementedError): - f = PushPromiseFrame(1) + def test_push_promise_frame_flags(self): + f = PushPromiseFrame(1) + flags = f.parse_flags(0xFF) + + assert flags == set(['END_PUSH_PROMISE']) + + def test_push_promise_frame_serialize_with_priority_properly(self): + f = PushPromiseFrame(1) + f.parse_flags(0xFF) + f.promised_stream_id = 4 + f.data = b'hello world' + + s = f.serialize() + assert s == ( + b'\x00\x0F\x05\x01\x00\x00\x00\x01' + + b'\x00\x00\x00\x04' + + b'hello world' + ) + + def test_push_promise_frame_parses_properly(self): + s = ( + b'\x00\x0F\x05\x0D\x00\x00\x00\x01' + + b'\x00\x00\x00\x04' + + b'hello world' + ) + f, length = Frame.parse_frame_header(s[:8]) + f.parse_body(s[8:8 + length]) + + assert isinstance(f, PushPromiseFrame) + assert f.flags == set(['END_PUSH_PROMISE']) + assert f.promised_stream_id == 4 + assert f.data == b'hello world' class TestPingFrame(object): @@ -1049,8 +1080,24 @@ def test_read_headers_out_of_order(self): assert c.getresponse(r3).getheaders() == [('content-type', 'baz/qux')] assert c.getresponse(r1).getheaders() == [('content-type', 'foo/bar')] + def test_headers_with_continuation(self): + e = Encoder() + h = HeadersFrame(1) + h.data = e.encode({':status': 200, 'content-type': 'foo/bar'}) + c = ContinuationFrame(1) + c.data = e.encode({'content-length': '0'}) + c.flags |= set(['END_HEADERS', 'END_STREAM']) + sock = DummySocket() + sock.buffer = BytesIO(h.serialize() + c.serialize()) + + c = HTTP20Connection('www.google.com') + c._sock = sock + r = c.request('GET', '/') + + assert set(c.getresponse(r).getheaders()) == set([('content-type', 'foo/bar'), ('content-length', '0')]) + def test_receive_unexpected_frame(self): - # RSTSTREAM frames are never defined on connections, so send one of + # RST_STREAM frames are never defined on connections, so send one of # those. c = HTTP20Connection('www.google.com') f = RstStreamFrame(1) @@ -1058,6 +1105,169 @@ def test_receive_unexpected_frame(self): with pytest.raises(ValueError): c.receive_frame(f) + def test_send_tolerate_peer_gone(self): + class ErrorSocket(DummySocket): + def send(self, data): + raise socket.error(errno.EPIPE) + + c = HTTP20Connection('www.google.com') + c._sock = ErrorSocket() + f = SettingsFrame(0) + with pytest.raises(socket.error): + c._send_cb(f, False) + c._sock = DummySocket() + c._send_cb(f, True) # shouldn't raise an error + + +class TestServerPush(object): + def setup_method(self, method): + self.frames = [] + self.encoder = Encoder() + self.conn = None + + def add_push_frame(self, stream_id, promised_stream_id, headers, end_block=True): + frame = PushPromiseFrame(stream_id) + frame.promised_stream_id = promised_stream_id + frame.data = self.encoder.encode(headers) + if end_block: + frame.flags.add('END_PUSH_PROMISE') + self.frames.append(frame) + + def add_headers_frame(self, stream_id, headers, end_block=True, end_stream=False): + frame = HeadersFrame(stream_id) + frame.data = self.encoder.encode(headers) + if end_block: + frame.flags.add('END_HEADERS') + if end_stream: + frame.flags.add('END_STREAM') + self.frames.append(frame) + + def add_data_frame(self, stream_id, data, end_stream=False): + frame = DataFrame(stream_id) + frame.data = data + if end_stream: + frame.flags.add('END_STREAM') + self.frames.append(frame) + + def request(self): + self.conn = HTTP20Connection('www.google.com', enable_push=True) + self.conn._sock = DummySocket() + self.conn._sock.buffer = BytesIO(b''.join([frame.serialize() for frame in self.frames])) + self.conn.request('GET', '/') + + def assert_response(self): + self.response = self.conn.getresponse() + assert self.response.status == 200 + assert dict(self.response.getheaders()) == {'content-type': 'text/html'} + + def assert_pushes(self): + self.pushes = list(self.conn.getpushes()) + assert len(self.pushes) == 1 + assert self.pushes[0].method == 'GET' + assert self.pushes[0].scheme == 'https' + assert self.pushes[0].authority == 'www.google.com' + assert self.pushes[0].path == '/' + expected_headers = {'accept-encoding': 'gzip'} + for name, value in expected_headers.items(): + assert self.pushes[0].getrequestheader(name) == value + assert dict(self.pushes[0].getrequestheaders()) == expected_headers + + def assert_push_response(self): + push_response = self.pushes[0].getresponse() + assert push_response.status == 200 + assert dict(push_response.getheaders()) == {'content-type': 'application/javascript'} + assert push_response.read() == b'bar' + + def test_promise_before_headers(self): + self.add_push_frame(1, 2, [(':method', 'GET'), (':path', '/'), (':authority', 'www.google.com'), (':scheme', 'https'), ('accept-encoding', 'gzip')]) + self.add_headers_frame(1, [(':status', '200'), ('content-type', 'text/html')]) + self.add_data_frame(1, b'foo', end_stream=True) + self.add_headers_frame(2, [(':status', '200'), ('content-type', 'application/javascript')]) + self.add_data_frame(2, b'bar', end_stream=True) + + self.request() + assert len(list(self.conn.getpushes())) == 0 + self.assert_response() + self.assert_pushes() + assert self.response.read() == b'foo' + self.assert_push_response() + + def test_promise_after_headers(self): + self.add_headers_frame(1, [(':status', '200'), ('content-type', 'text/html')]) + self.add_push_frame(1, 2, [(':method', 'GET'), (':path', '/'), (':authority', 'www.google.com'), (':scheme', 'https'), ('accept-encoding', 'gzip')]) + self.add_data_frame(1, b'foo', end_stream=True) + self.add_headers_frame(2, [(':status', '200'), ('content-type', 'application/javascript')]) + self.add_data_frame(2, b'bar', end_stream=True) + + self.request() + assert len(list(self.conn.getpushes())) == 0 + self.assert_response() + assert len(list(self.conn.getpushes())) == 0 + assert self.response.read() == b'foo' + self.assert_pushes() + self.assert_push_response() + + def test_promise_after_data(self): + self.add_headers_frame(1, [(':status', '200'), ('content-type', 'text/html')]) + self.add_data_frame(1, b'fo') + self.add_push_frame(1, 2, [(':method', 'GET'), (':path', '/'), (':authority', 'www.google.com'), (':scheme', 'https'), ('accept-encoding', 'gzip')]) + self.add_data_frame(1, b'o', end_stream=True) + self.add_headers_frame(2, [(':status', '200'), ('content-type', 'application/javascript')]) + self.add_data_frame(2, b'bar', end_stream=True) + + self.request() + assert len(list(self.conn.getpushes())) == 0 + self.assert_response() + assert len(list(self.conn.getpushes())) == 0 + assert self.response.read() == b'foo' + self.assert_pushes() + self.assert_push_response() + + def test_capture_all_promises(self): + self.add_push_frame(1, 2, [(':method', 'GET'), (':path', '/one'), (':authority', 'www.google.com'), (':scheme', 'https'), ('accept-encoding', 'gzip')]) + self.add_headers_frame(1, [(':status', '200'), ('content-type', 'text/html')]) + self.add_push_frame(1, 4, [(':method', 'GET'), (':path', '/two'), (':authority', 'www.google.com'), (':scheme', 'https'), ('accept-encoding', 'gzip')]) + self.add_data_frame(1, b'foo', end_stream=True) + self.add_headers_frame(4, [(':status', '200'), ('content-type', 'application/javascript')]) + self.add_headers_frame(2, [(':status', '200'), ('content-type', 'application/javascript')]) + self.add_data_frame(4, b'two', end_stream=True) + self.add_data_frame(2, b'one', end_stream=True) + + self.request() + assert len(list(self.conn.getpushes())) == 0 + pushes = list(self.conn.getpushes(capture_all=True)) + assert len(pushes) == 2 + assert pushes[0].path == '/one' + assert pushes[1].path == '/two' + assert pushes[0].getresponse().read() == b'one' + assert pushes[1].getresponse().read() == b'two' + self.assert_response() + assert self.response.read() == b'foo' + + def test_cancel_push(self): + self.add_push_frame(1, 2, [(':method', 'GET'), (':path', '/'), (':authority', 'www.google.com'), (':scheme', 'https'), ('accept-encoding', 'gzip')]) + self.add_headers_frame(1, [(':status', '200'), ('content-type', 'text/html')]) + + self.request() + self.conn.getresponse() + list(self.conn.getpushes())[0].cancel() + + f = RstStreamFrame(2) + f.error_code = 8 + assert self.conn._sock.queue[-1] == f.serialize() + + def test_reset_pushed_streams_when_push_disabled(self): + self.add_push_frame(1, 2, [(':method', 'GET'), (':path', '/'), (':authority', 'www.google.com'), (':scheme', 'https'), ('accept-encoding', 'gzip')]) + self.add_headers_frame(1, [(':status', '200'), ('content-type', 'text/html')]) + + self.request() + self.conn._enable_push = False + self.conn.getresponse() + + f = RstStreamFrame(2) + f.error_code = 7 + assert self.conn._sock.queue[-1] == f.serialize() + class TestHyperStream(object): def test_streams_have_ids(self): @@ -1165,7 +1375,7 @@ def test_stream_reading_works(self): out_frames = [] in_frames = [] - def send_cb(frame): + def send_cb(frame, tolerate_peer_gone=False): out_frames.append(frame) def recv_cb(s): @@ -1191,7 +1401,7 @@ def test_can_read_multiple_frames_from_streams(self): out_frames = [] in_frames = [] - def send_cb(frame): + def send_cb(frame, tolerate_peer_gone=False): out_frames.append(frame) def recv_cb(s): @@ -1223,7 +1433,7 @@ def test_partial_reads_from_streams(self): out_frames = [] in_frames = [] - def send_cb(frame): + def send_cb(frame, tolerate_peer_gone=False): out_frames.append(frame) def recv_cb(s): @@ -1267,14 +1477,14 @@ def test_receive_unexpected_frame(self): class TestResponse(object): def test_status_is_stripped_from_headers(self): - headers = {':status': '200'} + headers = set([(':status', '200')]) resp = HTTP20Response(headers, None) assert resp.status == 200 assert resp.getheaders() == [] def test_response_transparently_decrypts_gzip(self): - headers = {':status': '200', 'content-encoding': 'gzip'} + headers = set([(':status', '200'), ('content-encoding', 'gzip')]) c = zlib_compressobj(wbits=24) body = c.compress(b'this is test data') body += c.flush() @@ -1283,7 +1493,7 @@ def test_response_transparently_decrypts_gzip(self): assert resp.read() == b'this is test data' def test_response_transparently_decrypts_real_deflate(self): - headers = {':status': '200', 'content-encoding': 'deflate'} + headers = set([(':status', '200'), ('content-encoding', 'deflate')]) c = zlib_compressobj(wbits=zlib.MAX_WBITS) body = c.compress(b'this is test data') body += c.flush() @@ -1292,7 +1502,7 @@ def test_response_transparently_decrypts_real_deflate(self): assert resp.read() == b'this is test data' def test_response_transparently_decrypts_wrong_deflate(self): - headers = {':status': '200', 'content-encoding': 'deflate'} + headers = set([(':status', '200'), ('content-encoding', 'deflate')]) c = zlib_compressobj(wbits=-zlib.MAX_WBITS) body = c.compress(b'this is test data') body += c.flush() @@ -1302,7 +1512,7 @@ def test_response_transparently_decrypts_wrong_deflate(self): def test_response_calls_stream_close(self): stream = DummyStream('') - resp = HTTP20Response({':status': '200'}, stream) + resp = HTTP20Response(set([(':status', '200')]), stream) resp.close() assert stream.closed @@ -1310,13 +1520,13 @@ def test_response_calls_stream_close(self): def test_responses_are_context_managers(self): stream = DummyStream('') - with HTTP20Response({':status': '200'}, stream) as resp: + with HTTP20Response(set([(':status', '200')]), stream) as resp: pass assert stream.closed def test_read_small_chunks(self): - headers = {':status': '200'} + headers = set([(':status', '200')]) stream = DummyStream(b'1234567890') chunks = [b'12', b'34', b'56', b'78', b'90'] resp = HTTP20Response(headers, stream) @@ -1327,7 +1537,7 @@ def test_read_small_chunks(self): assert resp.read() == b'' def test_read_buffered(self): - headers = {':status': '200'} + headers = set([(':status', '200')]) stream = DummyStream(b'1234567890') chunks = [b'12', b'34', b'56', b'78', b'90'] * 2 resp = HTTP20Response(headers, stream) @@ -1339,21 +1549,21 @@ def test_read_buffered(self): assert resp.read() == b'' def test_getheader(self): - headers = {':status': '200', 'content-type': 'application/json'} + headers = set([(':status', '200'), ('content-type', 'application/json')]) stream = DummyStream(b'') resp = HTTP20Response(headers, stream) assert resp.getheader('content-type') == 'application/json' def test_getheader_default(self): - headers = {':status': '200'} + headers = set([(':status', '200')]) stream = DummyStream(b'') resp = HTTP20Response(headers, stream) assert resp.getheader('content-type', 'text/html') == 'text/html' def test_fileno_not_implemented(self): - resp = HTTP20Response({':status': '200'}, DummyStream(b'')) + resp = HTTP20Response(set([(':status', '200')]), DummyStream(b'')) with pytest.raises(NotImplementedError): resp.fileno()