-
Notifications
You must be signed in to change notification settings - Fork 197
Refactor Stream to ensure header blocks are decoded in the correct order #39
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,3 +1,4 @@ | ||
| build/ | ||
| env/ | ||
| dist/ | ||
| *.egg-info/ | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -56,7 +56,10 @@ def __init__(self, | |
| self.stream_id = stream_id | ||
| self.state = STATE_IDLE | ||
| self.headers = [] | ||
| self._queued_frames = collections.deque() | ||
|
|
||
| self.response_headers = None | ||
| self.header_data = [] | ||
| self.data = [] | ||
|
|
||
| # There are two flow control windows: one for data we're sending, | ||
| # one for data being sent to us. | ||
|
|
@@ -110,6 +113,34 @@ def file_iterator(fobj): | |
| for chunk in chunks: | ||
| self._send_chunk(chunk, final) | ||
|
|
||
| @property | ||
| def _local_closed(self): | ||
| return self.state in (STATE_CLOSED, STATE_HALF_CLOSED_LOCAL) | ||
|
|
||
| @property | ||
| def _remote_closed(self): | ||
| return self.state in (STATE_CLOSED, STATE_HALF_CLOSED_REMOTE) | ||
|
|
||
| @property | ||
| def _local_open(self): | ||
| return self.state in (STATE_OPEN, STATE_HALF_CLOSED_REMOTE) | ||
|
|
||
| @property | ||
| def _remote_open(self): | ||
| return self.state in (STATE_OPEN, STATE_HALF_CLOSED_LOCAL) | ||
|
|
||
| def _close_local(self): | ||
| self.state = ( | ||
| STATE_HALF_CLOSED_LOCAL if self.state == STATE_OPEN | ||
| else STATE_CLOSED | ||
| ) | ||
|
|
||
| def _close_remote(self): | ||
| self.state = ( | ||
| STATE_HALF_CLOSED_REMOTE if self.state == STATE_OPEN | ||
| else STATE_CLOSED | ||
| ) | ||
|
|
||
| def _read(self, amt=None): | ||
| """ | ||
| Read data from the stream. Unlike a normal read behaviour, this | ||
|
|
@@ -118,35 +149,30 @@ def _read(self, amt=None): | |
| if self.state == STATE_CLOSED: | ||
| return b'' | ||
|
|
||
| assert self.state in (STATE_OPEN, STATE_HALF_CLOSED_LOCAL) | ||
| assert self._remote_open | ||
|
|
||
| def listlen(list): | ||
| return sum(map(len, list)) | ||
|
|
||
| data = [] | ||
|
|
||
| # Begin by processing frames off the queue. | ||
| while amt is None or listlen(data) < amt: | ||
| try: | ||
| frame = self._queued_frames.popleft() | ||
| except IndexError: | ||
| # No frames on the queue. Try to read one and try again. | ||
| self._recv_cb() | ||
| continue | ||
| # Keep reading until the stream is closed or we get enough data. | ||
| while not self._remote_closed and (amt is None or listlen(self.data) < amt): | ||
| self._recv_cb() | ||
|
|
||
| # All queued frames at this point should be data frames. | ||
| assert isinstance(frame, DataFrame) | ||
| result = b''.join(self.data) | ||
| self.data = [] | ||
| return result | ||
|
|
||
| def receive_frame(self, frame): | ||
| """ | ||
| Handle a frame received on this stream. | ||
| """ | ||
| if isinstance(frame, WindowUpdateFrame): | ||
| self._out_flow_control_window += frame.window_increment | ||
| elif isinstance(frame, (HeadersFrame, ContinuationFrame)): | ||
| self.header_data.append(frame.data) | ||
| elif isinstance(frame, DataFrame): | ||
| # Append the data to the buffer. | ||
| data.append(frame.data) | ||
|
|
||
| # If that was the last frame, we're done here. | ||
| if 'END_STREAM' in frame.flags: | ||
| self.state = ( | ||
| STATE_HALF_CLOSED_REMOTE if self.state == STATE_OPEN | ||
| else STATE_CLOSED | ||
| ) | ||
| break | ||
| self.data.append(frame.data) | ||
|
|
||
| # Increase the window size. Only do this if the data frame contains | ||
| # actual data. | ||
|
|
@@ -155,18 +181,14 @@ def listlen(list): | |
| w = WindowUpdateFrame(self.stream_id) | ||
| w.window_increment = increment | ||
| self._data_cb(w) | ||
| else: # pragma: no cover | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Don't pragma no cover this, test it. =) |
||
| raise ValueError('Unexpected frame type: %i' % frame.type) | ||
|
|
||
| return b''.join(data) | ||
| if 'END_HEADERS' in frame.flags: | ||
| self.response_headers = self._decoder.decode(b''.join(self.header_data)) | ||
|
|
||
| def receive_frame(self, frame): | ||
| """ | ||
| Handle a frame received on this stream. If this is a window update | ||
| frame, immediately update the window accordingly. | ||
| """ | ||
| if isinstance(frame, WindowUpdateFrame): | ||
| self._out_flow_control_window += frame.window_increment | ||
| else: | ||
| self._queued_frames.append(frame) | ||
| if 'END_STREAM' in frame.flags: | ||
| self._close_remote() | ||
|
|
||
| def open(self, end): | ||
| """ | ||
|
|
@@ -211,38 +233,19 @@ def getresponse(self): | |
| Once all data has been sent on this connection, returns a | ||
| HTTP20Response object wrapping this stream. | ||
| """ | ||
| assert self.state == STATE_HALF_CLOSED_LOCAL | ||
| header_data = [] | ||
|
|
||
| # At this stage, the only things in the frame queue should be HEADERS | ||
| # and CONTINUATION frames. Grab them all, reading more frames off the | ||
| # connection if necessary. | ||
| while True: | ||
| try: | ||
| frame = self._queued_frames.popleft() | ||
| except IndexError: | ||
| self._recv_cb() | ||
| continue | ||
|
|
||
| assert isinstance(frame, (HeadersFrame, ContinuationFrame)) | ||
| assert self._local_closed | ||
|
|
||
| header_data.append(frame.data) | ||
|
|
||
| if 'END_HEADERS' in frame.flags: | ||
| if 'END_STREAM' in frame.flags: | ||
| self.state = STATE_CLOSED | ||
| break | ||
|
|
||
| # Decode the headers. | ||
| headers = self._decoder.decode(b''.join(header_data)) | ||
| # Keep reading until all headers are received. | ||
| while self.response_headers is None: | ||
| self._recv_cb() | ||
|
|
||
| # Find the Content-Length header if present. | ||
| self._in_window_manager.document_size = ( | ||
| int(get_from_key_value_set(headers, 'content-length', 0)) | ||
| int(get_from_key_value_set(self.response_headers, 'content-length', 0)) | ||
| ) | ||
|
|
||
| # Create the HTTP response. | ||
| return HTTP20Response(headers, self) | ||
| return HTTP20Response(self.response_headers, self) | ||
|
|
||
| def close(self): | ||
| """ | ||
|
|
@@ -264,7 +267,7 @@ def _send_chunk(self, data, final): | |
| (determined by being of size less than MAX_CHUNK) and no more data is | ||
| to be sent. | ||
| """ | ||
| assert self.state in (STATE_OPEN, STATE_HALF_CLOSED_REMOTE) | ||
| assert self._local_open | ||
|
|
||
| f = DataFrame(self.stream_id) | ||
| f.data = data | ||
|
|
@@ -286,5 +289,4 @@ def _send_chunk(self, data, final): | |
|
|
||
| # If no more data is to be sent on this stream, transition our state. | ||
| if len(data) < MAX_CHUNK and final: | ||
| self.state = (STATE_HALF_CLOSED_LOCAL if self.state == STATE_OPEN | ||
| else STATE_CLOSED) | ||
| self._close_local() | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -922,9 +922,7 @@ def test_we_can_read_from_the_socket(self): | |
| c._recv_cb() | ||
|
|
||
| s = c.recent_stream | ||
| assert len(s._queued_frames) == 1 | ||
| assert isinstance(s._queued_frames[0], DataFrame) | ||
| assert s._queued_frames[0].data == b'testdata' | ||
| assert s.data == [b'testdata'] | ||
|
|
||
| def test_putrequest_sends_data(self): | ||
| sock = DummySocket() | ||
|
|
@@ -1029,6 +1027,28 @@ def test_connections_handle_resizing_header_tables_properly(self): | |
| assert f2.stream_id == 0 | ||
| assert f2.flags == set(['ACK']) | ||
|
|
||
| def test_read_headers_out_of_order(self): | ||
| # If header blocks aren't decoded in the same order they're received, | ||
| # regardless of the stream they belong to, the decoder state will become | ||
| # corrupted. | ||
| e = Encoder() | ||
| h1 = HeadersFrame(1) | ||
| h1.data = e.encode({':status': 200, 'content-type': 'foo/bar'}) | ||
| h1.flags |= set(['END_HEADERS', 'END_STREAM']) | ||
| h3 = HeadersFrame(3) | ||
| h3.data = e.encode({':status': 200, 'content-type': 'baz/qux'}) | ||
| h3.flags |= set(['END_HEADERS', 'END_STREAM']) | ||
| sock = DummySocket() | ||
| sock.buffer = BytesIO(h1.serialize() + h3.serialize()) | ||
|
|
||
| c = HTTP20Connection('www.google.com') | ||
| c._sock = sock | ||
| r1 = c.request('GET', '/a') | ||
| r3 = c.request('GET', '/b') | ||
|
|
||
| assert c.getresponse(r3).getheaders() == [('content-type', 'baz/qux')] | ||
| assert c.getresponse(r1).getheaders() == [('content-type', 'foo/bar')] | ||
|
|
||
| def test_receive_unexpected_frame(self): | ||
| # RSTSTREAM frames are never defined on connections, so send one of | ||
| # those. | ||
|
|
@@ -1065,11 +1085,6 @@ def data_callback(frame): | |
|
|
||
| assert s.state == STATE_HALF_CLOSED_LOCAL | ||
|
|
||
| def test_receiving_a_frame_queues_it(self): | ||
| s = Stream(1, None, None, None, None, None, None) | ||
| s.receive_frame(Frame(0)) | ||
| assert len(s._queued_frames) == 1 | ||
|
|
||
| def test_file_objects_can_be_sent(self): | ||
| def data_callback(frame): | ||
| assert isinstance(frame, DataFrame) | ||
|
|
@@ -1158,7 +1173,7 @@ def inner(): | |
| s.receive_frame(in_frames.pop(0)) | ||
| return inner | ||
|
|
||
| s = Stream(1, send_cb, None, None, None, None, None) | ||
| s = Stream(1, send_cb, None, None, None, None, FlowControlManager(65535)) | ||
| s._recv_cb = recv_cb(s) | ||
| s.state = STATE_HALF_CLOSED_LOCAL | ||
|
|
||
|
|
@@ -1170,7 +1185,9 @@ def inner(): | |
|
|
||
| data = s._read() | ||
| assert data == b'hi there!' | ||
| assert len(out_frames) == 0 | ||
| assert len(out_frames) == 1 | ||
| assert isinstance(out_frames[0], WindowUpdateFrame) | ||
| assert out_frames[0].window_increment == len(b'hi there!') | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This test is enforcing incorrect behaviour. If the DATA frame has an END_STREAM flag set, it's a protocol error for us to send anything back on that stream, including a WINDOW_UPDATE. There's a bug here. =) |
||
|
|
||
| def test_can_read_multiple_frames_from_streams(self): | ||
| out_frames = [] | ||
|
|
@@ -1200,9 +1217,10 @@ def inner(): | |
|
|
||
| data = s._read() | ||
| assert data == b'hi there!hi there again!' | ||
| assert len(out_frames) == 1 | ||
| assert isinstance(out_frames[0], WindowUpdateFrame) | ||
| assert out_frames[0].window_increment == len(b'hi there!') | ||
| assert len(out_frames) == 2 | ||
| for frame, data in zip(out_frames, [b'hi there!', b'hi there again!']): | ||
| assert isinstance(frame, WindowUpdateFrame) | ||
| assert frame.window_increment == len(data) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same again here. |
||
|
|
||
| def test_partial_reads_from_streams(self): | ||
| out_frames = [] | ||
|
|
@@ -1238,7 +1256,7 @@ def inner(): | |
| # Now we'll get the entire of the second frame. | ||
| data = s._read(4) | ||
| assert data == b'hi there again!' | ||
| assert len(out_frames) == 1 | ||
| assert len(out_frames) == 2 | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. And here. |
||
| assert s.state == STATE_CLOSED | ||
|
|
||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need this import?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nevermind, I'll fix it up. =D