connection.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. import struct
  2. from frame import ControlFrame, OPCODE_CLOSE, OPCODE_PING, OPCODE_PONG
  3. from message import create_message
  4. from exceptions import SocketClosed, PingError
  5. class Connection(object):
  6. """
  7. A Connection uses a websocket instance to send and receive (optionally
  8. fragmented) messages, which are Message instances. Control frames are
  9. handled automatically in the way specified by RFC 6455.
  10. To use the Connection class, it should be extended and the extending class
  11. should implement the on*() event handlers.
  12. """
  13. def __init__(self, sock):
  14. """
  15. `sock` is a websocket instance which has completed its handshake.
  16. """
  17. self.sock = sock
  18. self.received_close_params = None
  19. self.close_frame_sent = False
  20. self.ping_sent = False
  21. self.ping_payload = None
  22. self.onopen()
  23. def send(self, message, fragment_size=None, mask=False):
  24. """
  25. Send a message. If `fragment_size` is specified, the message is
  26. fragmented into multiple frames whose payload size does not extend
  27. `fragment_size`.
  28. """
  29. if fragment_size is None:
  30. self.sock.send(message.frame(mask=mask))
  31. else:
  32. self.sock.send(*message.fragment(fragment_size, mask=mask))
  33. def receive(self):
  34. """
  35. Receive a message. A message may consist of multiple (ordered) data
  36. frames. A control frame may be delivered at any time, also when
  37. expecting the next data frame of a fragmented message. These control
  38. frames are handled immediately bu handle_control_frame().
  39. """
  40. fragments = []
  41. while not len(fragments) or not fragments[-1].final:
  42. frame = self.sock.recv()
  43. if isinstance(frame, ControlFrame):
  44. self.handle_control_frame(frame)
  45. # No more receiving data after a close message
  46. if frame.opcode == OPCODE_CLOSE:
  47. break
  48. else:
  49. fragments.append(frame)
  50. payload = ''.join([f.payload for f in fragments])
  51. return create_message(fragments[0].opcode, payload)
  52. def handle_control_frame(self, frame):
  53. """
  54. Handle a control frame as defined by RFC 6455.
  55. """
  56. if frame.opcode == OPCODE_CLOSE:
  57. # Set parameters and keep receiving the current fragmented frame
  58. # chain, assuming that the CLOSE frame will be handled by
  59. # handle_close() as soon as possible
  60. self.received_close_params = frame.unpack_close()
  61. elif frame.opcode == OPCODE_PING:
  62. # Respond with a pong message with identical payload
  63. self.sock.send(ControlFrame(OPCODE_PONG, frame.payload))
  64. elif frame.opcode == OPCODE_PONG:
  65. # Assert that the PONG payload is identical to that of the PING
  66. if not self.ping_sent:
  67. raise PingError('received PONG while no PING was sent')
  68. self.ping_sent = False
  69. if frame.payload != self.ping_payload:
  70. raise PingError('received PONG with invalid payload')
  71. self.ping_payload = None
  72. self.onpong(frame.payload)
  73. def receive_forever(self):
  74. """
  75. Receive and handle messages in an endless loop. A message may consist
  76. of multiple data frames, but this is not visible for onmessage().
  77. Control messages (or control frames) are handled automatically.
  78. """
  79. while True:
  80. try:
  81. self.onmessage(self, self.receive())
  82. if self.received_close_params is not None:
  83. self.handle_close(*self.received_close_params)
  84. break
  85. except SocketClosed:
  86. self.onclose(None, '')
  87. break
  88. except Exception as e:
  89. self.onexception(e)
  90. def send_close(self, code, reason):
  91. """
  92. Send a CLOSE control frame.
  93. """
  94. payload = '' if code is None else struct.pack('!H', code) + reason
  95. self.sock.send(ControlFrame(OPCODE_CLOSE, payload))
  96. self.close_frame_sent = True
  97. def send_ping(self, payload=''):
  98. """
  99. Send a PING control frame with an optional payload.
  100. """
  101. self.sock.send(ControlFrame(OPCODE_PING, payload))
  102. self.ping_payload = payload
  103. self.ping_sent = True
  104. self.onping(payload)
  105. def handle_close(self, code=None, reason=''):
  106. """
  107. Handle a close message by sending a response close message if no CLOSE
  108. frame was sent before, and closing the connection. The onclose()
  109. handler is called afterwards.
  110. """
  111. if not self.close_frame_sent:
  112. payload = '' if code is None else struct.pack('!H', code)
  113. self.sock.send(ControlFrame(OPCODE_CLOSE, payload))
  114. self.sock.close()
  115. self.onclose(code, reason)
  116. def close(self, code=None, reason=''):
  117. """
  118. Close the socket by sending a CLOSE frame and waiting for a response
  119. close message. The onclose() handler is called after the CLOSE frame
  120. has been sent, but before the response has been received.
  121. """
  122. self.send_close(code, reason)
  123. # FIXME: swap the two lines below?
  124. self.onclose(code, reason)
  125. frame = self.sock.recv()
  126. self.sock.close()
  127. if frame.opcode != OPCODE_CLOSE:
  128. raise ValueError('expected CLOSE frame, got %s instead' % frame)
  129. def onopen(self):
  130. """
  131. Called after the connection is initialized.
  132. """
  133. return NotImplemented
  134. def onmessage(self, message):
  135. """
  136. Called when a message is received. `message` is a Message object, which
  137. can be constructed from a single frame or multiple fragmented frames.
  138. """
  139. return NotImplemented
  140. def onping(self, payload):
  141. """
  142. Called after a PING control frame has been sent. This handler could be
  143. used to start a timeout handler for a PONG frame that is not received
  144. in time.
  145. """
  146. return NotImplemented
  147. def onpong(self, payload):
  148. """
  149. Called when a PONG control frame is received.
  150. """
  151. return NotImplemented
  152. def onclose(self, code, reason):
  153. """
  154. Called when the socket is closed by either end point.
  155. """
  156. return NotImplemented
  157. def onexception(self, e):
  158. """
  159. Handle a raised exception.
  160. """
  161. return NotImplemented