# connection.py (7.8 KB)

  1. import socket
  2. from frame import ControlFrame, OPCODE_CLOSE, OPCODE_PING, OPCODE_PONG, \
  3. OPCODE_CONTINUATION, create_close_frame
  4. from message import create_message
  5. from errors import SocketClosed, PingError
  6. class Connection(object):
  7. """
  8. A `Connection` uses a `websocket` instance to send and receive (optionally
  9. fragmented) messages, which are `Message` instances. Control frames are
  10. handled automatically in the way specified by RFC 6455.
  11. To use the `Connection` class, it should be extended and the extending
  12. class should implement the on*() event handlers.
  13. Example of an echo server (sends back what it receives):
  14. >>> import wspy
  15. >>> class EchoConnection(wspy.Connection):
  16. >>> def onopen(self):
  17. >>> print 'Connection opened at %s:%d' % self.sock.getpeername()
  18. >>> def onmessage(self, message):
  19. >>> print 'Received message "%s"' % message.payload
  20. >>> self.send(wspy.TextMessage(message.payload))
  21. >>> def onclose(self, code, reason):
  22. >>> print 'Connection closed'
  23. >>> server = wspy.websocket()
  24. >>> server.bind(('', 8000))
  25. >>> server.listen()
  26. >>> while True:
  27. >>> client, addr = server.accept()
  28. >>> EchoConnection(client).receive_forever()
  29. """
  30. def __init__(self, sock):
  31. """
  32. `sock` is a websocket instance which has completed its handshake.
  33. """
  34. self.sock = sock
  35. self.close_frame_sent = False
  36. self.close_frame_received = False
  37. self.ping_sent = False
  38. self.ping_payload = None
  39. self.hooks_send = []
  40. self.hooks_recv = []
  41. self.onopen()
  42. def message_to_frames(self, message, fragment_size=None, mask=False):
  43. frame = self.sock.apply_send_hooks(message.frame(mask=mask), True)
  44. if fragment_size is None:
  45. yield frame
  46. else:
  47. for fragment in frame.fragment(fragment_size):
  48. yield fragment
  49. def send(self, message, fragment_size=None, mask=False):
  50. """
  51. Send a message. If `fragment_size` is specified, the message is
  52. fragmented into multiple frames whose payload size does not extend
  53. `fragment_size`.
  54. """
  55. for frame in self.message_to_frames(message, fragment_size, mask):
  56. self.send_frame(frame)
  57. def send_frame(self, frame, callback=None):
  58. self.sock.send(frame)
  59. if callback:
  60. callback()
  61. def recv(self):
  62. """
  63. Receive a message. A message may consist of multiple (ordered) data
  64. frames. A control frame may be delivered at any time, also when
  65. expecting the next continuation frame of a fragmented message. These
  66. control frames are handled immediately by handle_control_frame().
  67. """
  68. fragments = []
  69. while not len(fragments) or not fragments[-1].final:
  70. frame = self.sock.recv()
  71. if isinstance(frame, ControlFrame):
  72. self.handle_control_frame(frame)
  73. elif len(fragments) > 0 and frame.opcode != OPCODE_CONTINUATION:
  74. raise ValueError('expected continuation/control frame, got %s '
  75. 'instead' % frame)
  76. else:
  77. fragments.append(frame)
  78. return self.concat_fragments(fragments)
  79. def concat_fragments(self, fragments):
  80. frame = fragments[0]
  81. for f in fragments[1:]:
  82. frame.payload += f.payload
  83. frame.final = True
  84. frame = self.sock.apply_recv_hooks(frame, True)
  85. return create_message(frame.opcode, frame.payload)
  86. def handle_control_frame(self, frame):
  87. """
  88. Handle a control frame as defined by RFC 6455.
  89. """
  90. if frame.opcode == OPCODE_CLOSE:
  91. self.close_frame_received = True
  92. code, reason = frame.unpack_close()
  93. if self.close_frame_sent:
  94. self.onclose(code, reason)
  95. self.sock.close()
  96. raise SocketClosed(True)
  97. else:
  98. self.close_params = (code, reason)
  99. self.send_close_frame(code, reason)
  100. elif frame.opcode == OPCODE_PING:
  101. # Respond with a pong message with identical payload
  102. self.send_frame(ControlFrame(OPCODE_PONG, frame.payload))
  103. elif frame.opcode == OPCODE_PONG:
  104. # Assert that the PONG payload is identical to that of the PING
  105. if not self.ping_sent:
  106. raise PingError('received PONG while no PING was sent')
  107. self.ping_sent = False
  108. if frame.payload != self.ping_payload:
  109. raise PingError('received PONG with invalid payload')
  110. self.ping_payload = None
  111. self.onpong(frame.payload)
  112. def receive_forever(self):
  113. """
  114. Receive and handle messages in an endless loop. A message may consist
  115. of multiple data frames, but this is not visible for onmessage().
  116. Control messages (or control frames) are handled automatically.
  117. """
  118. while True:
  119. try:
  120. self.onmessage(self.recv())
  121. except (KeyboardInterrupt, SystemExit, SocketClosed):
  122. break
  123. except Exception as e:
  124. self.onerror(e)
  125. self.onclose(None, 'error: %s' % e)
  126. try:
  127. self.sock.close()
  128. except socket.error:
  129. pass
  130. raise e
  131. def send_ping(self, payload=''):
  132. """
  133. Send a PING control frame with an optional payload.
  134. """
  135. self.send_frame(ControlFrame(OPCODE_PING, payload),
  136. lambda: self.onping(payload))
  137. self.ping_payload = payload
  138. self.ping_sent = True
  139. def send_close_frame(self, code, reason):
  140. self.send_frame(create_close_frame(code, reason))
  141. self.close_frame_sent = True
  142. self.shutdown_write()
  143. def shutdown_write(self):
  144. if self.close_frame_received:
  145. self.onclose(*self.close_params)
  146. self.sock.close()
  147. raise SocketClosed(False)
  148. else:
  149. self.sock.shutdown(socket.SHUT_WR)
  150. def close(self, code=None, reason=''):
  151. """
  152. Close the socket by sending a CLOSE frame and waiting for a response
  153. close message, unless such a message has already been received earlier
  154. (prior to calling this function, for example). The onclose() handler is
  155. called after the response has been received, but before the socket is
  156. actually closed.
  157. """
  158. self.send_close_frame(code, reason)
  159. frame = self.sock.recv()
  160. if frame.opcode != OPCODE_CLOSE:
  161. raise ValueError('expected CLOSE frame, got %s' % frame)
  162. self.handle_control_frame(frame)
  163. def onopen(self):
  164. """
  165. Called after the connection is initialized.
  166. """
  167. return NotImplemented
  168. def onmessage(self, message):
  169. """
  170. Called when a message is received. `message` is a Message object, which
  171. can be constructed from a single frame or multiple fragmented frames.
  172. """
  173. return NotImplemented
  174. def onping(self, payload):
  175. """
  176. Called after a PING control frame has been sent. This handler could be
  177. used to start a timeout handler for a PONG frame that is not received
  178. in time.
  179. """
  180. return NotImplemented
  181. def onpong(self, payload):
  182. """
  183. Called when a PONG control frame is received.
  184. """
  185. return NotImplemented
  186. def onclose(self, code, reason):
  187. """
  188. Called when the socket is closed by either end point.
  189. """
  190. return NotImplemented
  191. def onerror(self, e):
  192. """
  193. Handle a raised exception.
  194. """
  195. return NotImplemented