connection.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. import struct
  2. from frame import ControlFrame, OPCODE_CLOSE, OPCODE_PING, OPCODE_PONG, \
  3. OPCODE_CONTINUATION
  4. from message import create_message
  5. from errors import SocketClosed, PingError
  6. class Connection(object):
  7. """
  8. A Connection uses a websocket instance to send and receive (optionally
  9. fragmented) messages, which are Message instances. Control frames are
  10. handled automatically in the way specified by RFC 6455.
  11. To use the Connection class, it should be extended and the extending class
  12. should implement the on*() event handlers.
  13. """
  14. def __init__(self, sock):
  15. """
  16. `sock` is a websocket instance which has completed its handshake.
  17. """
  18. self.sock = sock
  19. self.close_frame_received = False
  20. self.ping_sent = False
  21. self.ping_payload = None
  22. self.onopen()
  23. def send(self, message, fragment_size=None, mask=False):
  24. """
  25. Send a message. If `fragment_size` is specified, the message is
  26. fragmented into multiple frames whose payload size does not extend
  27. `fragment_size`.
  28. """
  29. if fragment_size is None:
  30. self.sock.send(message.frame(mask=mask))
  31. else:
  32. self.sock.send(*message.fragment(fragment_size, mask=mask))
  33. def receive(self):
  34. """
  35. Receive a message. A message may consist of multiple (ordered) data
  36. frames. A control frame may be delivered at any time, also when
  37. expecting the next continuation frame of a fragmented message. These
  38. control frames are handled immediately by handle_control_frame().
  39. """
  40. fragments = []
  41. while not len(fragments) or not fragments[-1].final:
  42. frame = self.sock.recv()
  43. if isinstance(frame, ControlFrame):
  44. self.handle_control_frame(frame)
  45. elif len(fragments) and frame.opcode != OPCODE_CONTINUATION:
  46. raise ValueError('expected continuation/control frame, got %s '
  47. 'instead' % frame)
  48. else:
  49. fragments.append(frame)
  50. payload = bytearray()
  51. for f in fragments:
  52. payload += f.payload
  53. return create_message(fragments[0].opcode, payload)
  54. def handle_control_frame(self, frame):
  55. """
  56. Handle a control frame as defined by RFC 6455.
  57. """
  58. if frame.opcode == OPCODE_CLOSE:
  59. # Close the connection from this end as well
  60. self.close_frame_received = True
  61. code, reason = frame.unpack_close()
  62. # No more receiving data after a close message
  63. raise SocketClosed(code, reason)
  64. elif frame.opcode == OPCODE_PING:
  65. # Respond with a pong message with identical payload
  66. self.sock.send(ControlFrame(OPCODE_PONG, frame.payload))
  67. elif frame.opcode == OPCODE_PONG:
  68. # Assert that the PONG payload is identical to that of the PING
  69. if not self.ping_sent:
  70. raise PingError('received PONG while no PING was sent')
  71. self.ping_sent = False
  72. if frame.payload != self.ping_payload:
  73. raise PingError('received PONG with invalid payload')
  74. self.ping_payload = None
  75. self.onpong(frame.payload)
  76. def receive_forever(self):
  77. """
  78. Receive and handle messages in an endless loop. A message may consist
  79. of multiple data frames, but this is not visible for onmessage().
  80. Control messages (or control frames) are handled automatically.
  81. """
  82. while True:
  83. try:
  84. self.onmessage(self.receive())
  85. except SocketClosed as e:
  86. self.close(e.code, e.reason)
  87. break
  88. except Exception as e:
  89. self.onerror(e)
  90. def send_ping(self, payload=''):
  91. """
  92. Send a PING control frame with an optional payload.
  93. """
  94. self.sock.send(ControlFrame(OPCODE_PING, payload))
  95. self.ping_payload = payload
  96. self.ping_sent = True
  97. self.onping(payload)
  98. def close(self, code=None, reason=''):
  99. """
  100. Close the socket by sending a CLOSE frame and waiting for a response
  101. close message, unless such a message has already been received earlier
  102. (prior to calling this function, for example). The onclose() handler is
  103. called after the response has been received, but before the socket is
  104. actually closed. This order was chosen to prevent errors in
  105. stringification in the onclose() handler. For example,
  106. socket.getpeername() raises a Bad file descriptor error then the socket
  107. is closed.
  108. """
  109. # Send CLOSE frame
  110. payload = '' if code is None else struct.pack('!H', code) + reason
  111. self.sock.send(ControlFrame(OPCODE_CLOSE, payload))
  112. # Receive CLOSE frame
  113. if not self.close_frame_received:
  114. frame = self.sock.recv()
  115. if frame.opcode != OPCODE_CLOSE:
  116. raise ValueError('expected CLOSE frame, got %s instead' % frame)
  117. res_code, res_reason = frame.unpack_close()
  118. # FIXME: check if res_code == code and res_reason == reason?
  119. # FIXME: alternatively, keep receiving frames in a loop until a
  120. # CLOSE frame is received, so that a fragmented chain may arrive
  121. # fully first
  122. self.onclose(code, reason)
  123. self.sock.close()
  124. def onopen(self):
  125. """
  126. Called after the connection is initialized.
  127. """
  128. return NotImplemented
  129. def onmessage(self, message):
  130. """
  131. Called when a message is received. `message` is a Message object, which
  132. can be constructed from a single frame or multiple fragmented frames.
  133. """
  134. return NotImplemented
  135. def onping(self, payload):
  136. """
  137. Called after a PING control frame has been sent. This handler could be
  138. used to start a timeout handler for a PONG frame that is not received
  139. in time.
  140. """
  141. return NotImplemented
  142. def onpong(self, payload):
  143. """
  144. Called when a PONG control frame is received.
  145. """
  146. return NotImplemented
  147. def onclose(self, code, reason):
  148. """
  149. Called when the socket is closed by either end point.
  150. """
  151. return NotImplemented
  152. def onerror(self, e):
  153. """
  154. Handle a raised exception.
  155. """
  156. return NotImplemented