Просмотр исходного кода

Implemented control frame handlers

Taddeus Kroes 13 лет назад
Родитель
Сommit
437e971299
5 измененных файлов с 177 добавлено и 41 удалено
  1. 0 1
      TODO
  2. 4 0
      exceptions.py
  3. 78 9
      frame.py
  4. 2 22
      message.py
  5. 93 9
      websocket.py

+ 0 - 1
TODO

@@ -1,3 +1,2 @@
-- Handle control frames as defined in RFC 6455.
 - Finish WebSocket.
 - Finish Server/Client.

+ 4 - 0
exceptions.py

@@ -1,2 +1,6 @@
 class SocketClosed(Exception):
     pass
+
+
+class PingError(Exception):
+    pass

+ 78 - 9
frame.py

@@ -11,6 +11,16 @@ OPCODE_CLOSE = 0x8
 OPCODE_PING = 0x9
 OPCODE_PONG = 0xA
 
+CLOSE_NORMAL = 1000
+CLOSE_GOING_AWAY = 1001
+CLOSE_PROTOCOL_ERROR = 1002
+CLOSE_NOACCEPT_DTYPE = 1003
+CLOSE_INVALID_DATA = 1007
+CLOSE_POLICY = 1008
+CLOSE_MESSAGE_TOOBIG = 1009
+CLOSE_MISSING_EXTENSIONS = 1010
+CLOSE_UNABLE = 1011
+
 
 class Frame(object):
     """
@@ -115,12 +125,55 @@ class Frame(object):
         return frames
 
     def __str__(self):
-        return '<Frame opcode=0x%X len=%d>' % (self.opcode, len(self.payload))
+        s = '<%s opcode=0x%X len=%d' \
+            % (self.__class__.__name__, self.opcode, len(self.payload))
+
+        if self.masking_key:
+            s += ' masking_key=%4s' % self.masking_key
 
+        return s + '>'
 
-def receive_fragments(sock):
+
+class ControlFrame(Frame):
+    """
+    A Control frame is a frame with an opcode OPCODE_CLOSE, OPCODE_PING or
+    OPCODE_PONG. These frames must be handled as defined by RFC 6455, and
     """
-    Receive a sequence of frames that belong together:
+    def fragment(self, fragment_size, mask=False):
+        """
+        Control frames must not be fragmented.
+        """
+        raise TypeError('control frames must not be fragmented')
+
+    def pack(self):
+        """
+        Same as Frame.pack(), but asserts that the payload size does not exceed
+        125 bytes.
+        """
+        if len(self.payload) > 125:
+            raise ValueError('control frames must not be larger than 125' \
+                             'bytes')
+
+        return Frame.pack(self)
+
+    def unpack_close(self):
+        """
+        Unpack a close message into a status code and a reason. If no payload
+        is given, the code is None and the reason is an empty string.
+        """
+        if self.payload:
+            code = struct.unpack('!H', self.payload[:2])
+            reason = self.payload[2:]
+        else:
+            code = None
+            reason = ''
+
+        return code, reason
+
+
+def receive_fragments(sock, control_frame_handler):
+    """
+    Receive a sequence of frames that belong together on socket `sock':
     - An initial frame with non-zero opcode
     - Zero or more frames with opcode = 0 and final = False
     - A final frame with opcode = 0 and final = True
@@ -128,18 +181,31 @@ def receive_fragments(sock):
     The first and last frame may be the same frame, having a non-zero opcode
     and final = True. Thus, this function returns a list of at least a single
     frame.
+
+    `control_frame_handler' is a callback function taking a single argument,
+    which is a Frame instance
     """
-    fragments = [receive_frame(sock)]
+    fragments = []
+
+    while not len(fragments) or not fragments[-1].final:
+        frame = receive_frame(sock)
 
-    while not fragments[-1].final:
-        fragments.append(receive_frame(sock))
+        if isinstance(frame, ControlFrame):
+            control_frame_handler(frame)
+
+            # No more receiving data after a close message
+            if frame.opcode == OPCODE_CLOSE:
+                break
+        else:
+            fragments.append(frame)
 
     return fragments
 
 
 def receive_frame(sock):
     """
-    Receive a single frame on socket `sock'.
+    Receive a single frame on socket `sock'. The frame schme is explained in
+    the docs of Frame.pack().
     """
     b1, b2 = struct.unpack('!BB', recvn(sock, 2))
 
@@ -164,8 +230,11 @@ def receive_frame(sock):
         masking_key = ''
         payload = recvn(sock, payload_len)
 
-    return Frame(opcode, payload, masking_key=masking_key, final=final,
-                 rsv1=rsv1, rsv2=rsv2, rsv3=rsv3)
+    # Control frames have most significant bit 1
+    cls = ControlFrame if opcode & 0x8 else Frame
+
+    return cls(opcode, payload, masking_key=masking_key, final=final,
+               rsv1=rsv1, rsv2=rsv2, rsv3=rsv3)
 
 
 def recvn(sock, n):

+ 2 - 22
message.py

@@ -1,9 +1,7 @@
-from frame import Frame, OPCODE_TEXT, OPCODE_BINARY, OPCODE_CLOSE, \
-        OPCODE_PING, OPCODE_PONG
+from frame import Frame, OPCODE_TEXT, OPCODE_BINARY
 
 
-__all__ = ['Message', 'TextMessage', 'BinaryMessage', 'CloseMessage',
-           'PingMessage', 'PongMessage']
+__all__ = ['Message', 'TextMessage', 'BinaryMessage']
 
 
 class Message(object):
@@ -32,27 +30,9 @@ class BinaryMessage(Message):
         super(TextMessage, self).__init__(OPCODE_BINARY, payload)
 
 
-class CloseMessage(Message):
-    def __init__(self, payload):
-        super(TextMessage, self).__init__(OPCODE_CLOSE, payload)
-
-
-class PingMessage(Message):
-    def __init__(self, payload):
-        super(TextMessage, self).__init__(OPCODE_PING, payload)
-
-
-class PongMessage(Message):
-    def __init__(self, payload):
-        super(TextMessage, self).__init__(OPCODE_PONG, payload)
-
-
 OPCODE_CLASS_MAP = {
     OPCODE_TEXT: TextMessage,
     OPCODE_BINARY: BinaryMessage,
-    OPCODE_CLOSE: CloseMessage,
-    OPCODE_PING: PingMessage,
-    OPCODE_PONG: PongMessage,
 }
 
 

+ 93 - 9
websocket.py

@@ -1,10 +1,12 @@
 import re
+import struct
 from hashlib import sha1
 from threading import Thread
 
-from frame import receive_fragments
+from frame import ControlFrame, receive_fragments, receive_frame, \
+        OPCODE_CLOSE, OPCODE_PING, OPCODE_PONG
 from message import create_message
-from exceptions import SocketClosed
+from exceptions import SocketClosed, PingError
 
 
 WS_GUID = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'
@@ -17,6 +19,12 @@ class WebSocket(object):
         self.address = address
         self.encoding = encoding
 
+        self.received_close_params = None
+        self.close_frame_sent = False
+
+        self.ping_sent = False
+        self.ping_payload = None
+
     def send_message(self, message, fragment_size=None):
         if fragment_size is None:
             self.send_frame(message.frame())
@@ -26,8 +34,27 @@ class WebSocket(object):
     def send_frame(self, frame):
         self.sock.sendall(frame.pack())
 
+    def handle_control_frame(self, frame):
+        if frame.opcode == OPCODE_CLOSE:
+            self.received_close_params = frame.unpack_close()
+        elif frame.opcode == OPCODE_PING:
+            # Respond with a pong message with identical payload
+            self.send_frame(ControlFrame(OPCODE_PONG, frame.payload))
+        elif frame.opcode == OPCODE_PONG:
+            # Assert that the PONG payload is identical to that of the PING
+            if not self.ping_sent:
+                raise PingError('received PONG while no PING was sent')
+
+            self.ping_sent = False
+
+            if frame.payload != self.ping_payload:
+                raise PingError('received PONG with invalid payload')
+
+            self.ping_payload = None
+            self.onpong(frame.payload)
+
     def receive_message(self):
-        frames = receive_fragments(self.sock)
+        frames = receive_fragments(self.sock, self.handle_control_frame)
         payload = ''.join([f.payload for f in frames])
         return create_message(frames[0].opcode, payload)
 
@@ -79,14 +106,60 @@ class WebSocket(object):
         try:
             while True:
                 self.onmessage(self, self.receive_message())
+
+                if self.received_close_params is not None:
+                    self.handle_close(*self.received_close_params)
+                    break
         except SocketClosed:
-            self.onclose()
+            self.onclose(None, '')
 
     def run_threaded(self, daemon=True):
         t = Thread(target=self.receive_forever)
         t.daemon = daemon
         t.start()
 
+    def send_close(self, code, reason):
+        payload = '' if code is None else struct.pack('!H', code)
+        self.send_frame(ControlFrame(OPCODE_CLOSE, payload))
+        self.close_frame_sent = True
+
+    def send_ping(self, payload=''):
+        """
+        Send a ping control frame with an optional payload.
+        """
+        self.send_frame(ControlFrame(OPCODE_PING, payload))
+        self.ping_payload = payload
+        self.ping_sent = True
+        self.onping()
+
+    def handle_close(self, code=None, reason=''):
+        """
+        Handle a close message by sending a response close message if no close
+        message was sent before, and closing the connection. The onclose()
+        handler is called afterwards.
+        """
+        if not self.close_frame_sent:
+            payload = '' if code is None else struct.pack('!H', code)
+            self.send_frame(ControlFrame(OPCODE_CLOSE, payload))
+
+        self.sock.close()
+        self.onclose(code, reason)
+
+    def close(self, code=None, reason=''):
+        """
+        Close the socket by sending a close message and waiting for a response
+        close message. The onclose() handler is called after the close message
+        has been sent, but before the response has been received.
+        """
+        self.send_close(code, reason)
+        # FIXME: swap the two lines below?
+        self.onclose(code, reason)
+        frame = receive_frame(self.sock)
+        self.sock.close()
+
+        if frame.opcode != OPCODE_CLOSE:
+            raise ValueError('Expected close frame, got %s instead' % frame)
+
     def onopen(self):
         """
         Called after the handshake has completed.
@@ -100,11 +173,22 @@ class WebSocket(object):
         """
         raise NotImplemented
 
-    def onclose(self):
+    def onping(self, payload):
         """
-        Called when the other end of the socket disconnects.
+        Called after a ping control frame has been sent. This handler could be
+        used to start a timeout handler for a pong message that is not received
+        in time.
         """
-        pass
+        raise NotImplemented
 
-    def close(self):
-        raise SocketClosed()
+    def onpong(self, payload):
+        """
+        Called when a pong control frame is received.
+        """
+        raise NotImplemented
+
+    def onclose(self, code, reason):
+        """
+        Called when the socket is closed by either end point.
+        """
+        pass