connection.py 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272
  1. import struct
  2. import socket
  3. from frame import ControlFrame, OPCODE_CLOSE, OPCODE_PING, OPCODE_PONG, \
  4. OPCODE_CONTINUATION
  5. from message import create_message
  6. from errors import SocketClosed, PingError
  7. class Connection(object):
  8. """
  9. A `Connection` uses a `websocket` instance to send and receive (optionally
  10. fragmented) messages, which are `Message` instances. Control frames are
  11. handled automatically in the way specified by RFC 6455.
  12. To use the `Connection` class, it should be extended and the extending
  13. class should implement the on*() event handlers.
  14. Example of an echo server (sends back what it receives):
  15. >>> import wspy
  16. >>> class EchoConnection(wspy.Connection):
  17. >>> def onopen(self):
  18. >>> print 'Connection opened at %s:%d' % self.sock.getpeername()
  19. >>> def onmessage(self, message):
  20. >>> print 'Received message "%s"' % message.payload
  21. >>> self.send(wspy.TextMessage(message.payload))
  22. >>> def onclose(self, message):
  23. >>> print 'Connection closed'
  24. >>> server = wspy.websocket()
  25. >>> server.bind(('', 8000))
  26. >>> server.listen()
  27. >>> while True:
  28. >>> client, addr = server.accept()
  29. >>> EchoConnection(client).receive_forever()
  30. """
  31. def __init__(self, sock):
  32. """
  33. `sock` is a websocket instance which has completed its handshake.
  34. """
  35. self.sock = sock
  36. self.close_frame_sent = False
  37. self.close_frame_received = False
  38. self.ping_sent = False
  39. self.ping_payload = None
  40. self.hooks_send = []
  41. self.hooks_recv = []
  42. self.onopen()
  43. def send(self, message, fragment_size=None, mask=False):
  44. """
  45. Send a message. If `fragment_size` is specified, the message is
  46. fragmented into multiple frames whose payload size does not extend
  47. `fragment_size`.
  48. """
  49. for hook in self.hooks_send:
  50. message = hook(message)
  51. if fragment_size is None:
  52. self.sock.send(message.frame(mask=mask))
  53. else:
  54. self.sock.send(*message.fragment(fragment_size, mask=mask))
  55. def recv(self):
  56. """
  57. Receive a message. A message may consist of multiple (ordered) data
  58. frames. A control frame may be delivered at any time, also when
  59. expecting the next continuation frame of a fragmented message. These
  60. control frames are handled immediately by handle_control_frame().
  61. """
  62. fragments = []
  63. while not len(fragments) or not fragments[-1].final:
  64. frame = self.sock.recv()
  65. if isinstance(frame, ControlFrame):
  66. self.handle_control_frame(frame)
  67. elif len(fragments) and frame.opcode != OPCODE_CONTINUATION:
  68. raise ValueError('expected continuation/control frame, got %s '
  69. 'instead' % frame)
  70. else:
  71. fragments.append(frame)
  72. payload = bytearray()
  73. for f in fragments:
  74. payload += f.payload
  75. message = create_message(fragments[0].opcode, payload)
  76. for hook in self.hooks_recv:
  77. message = hook(message)
  78. return message
  79. def handle_control_frame(self, frame):
  80. """
  81. Handle a control frame as defined by RFC 6455.
  82. """
  83. if frame.opcode == OPCODE_CLOSE:
  84. # Close the connection from this end as well
  85. self.close_frame_received = True
  86. code, reason = frame.unpack_close()
  87. # No more receiving data after a close message
  88. raise SocketClosed(code, reason)
  89. elif frame.opcode == OPCODE_PING:
  90. # Respond with a pong message with identical payload
  91. self.sock.send(ControlFrame(OPCODE_PONG, frame.payload))
  92. elif frame.opcode == OPCODE_PONG:
  93. # Assert that the PONG payload is identical to that of the PING
  94. if not self.ping_sent:
  95. raise PingError('received PONG while no PING was sent')
  96. self.ping_sent = False
  97. if frame.payload != self.ping_payload:
  98. raise PingError('received PONG with invalid payload')
  99. self.ping_payload = None
  100. self.onpong(frame.payload)
  101. def receive_forever(self):
  102. """
  103. Receive and handle messages in an endless loop. A message may consist
  104. of multiple data frames, but this is not visible for onmessage().
  105. Control messages (or control frames) are handled automatically.
  106. """
  107. while True:
  108. try:
  109. self.onmessage(self.recv())
  110. except SocketClosed as e:
  111. self.close(e.code, e.reason)
  112. break
  113. except socket.error as e:
  114. self.onerror(e)
  115. try:
  116. self.sock.close()
  117. except socket.error:
  118. pass
  119. self.onclose(None, '')
  120. break
  121. except Exception as e:
  122. self.onerror(e)
  123. def send_ping(self, payload=''):
  124. """
  125. Send a PING control frame with an optional payload.
  126. """
  127. self.sock.send(ControlFrame(OPCODE_PING, payload))
  128. self.ping_payload = payload
  129. self.ping_sent = True
  130. self.onping(payload)
  131. def send_close_frame(self, code=None, reason=''):
  132. """
  133. Send a CLOSE control frame.
  134. """
  135. payload = '' if code is None else struct.pack('!H', code) + reason
  136. self.sock.send(ControlFrame(OPCODE_CLOSE, payload))
  137. self.close_frame_sent = True
  138. def close(self, code=None, reason=''):
  139. """
  140. Close the socket by sending a CLOSE frame and waiting for a response
  141. close message, unless such a message has already been received earlier
  142. (prior to calling this function, for example). The onclose() handler is
  143. called after the response has been received, but before the socket is
  144. actually closed.
  145. """
  146. # Send CLOSE frame
  147. if not self.close_frame_sent:
  148. self.send_close_frame(code, reason)
  149. # Receive CLOSE frame
  150. if not self.close_frame_received:
  151. frame = self.sock.recv()
  152. if frame.opcode != OPCODE_CLOSE:
  153. raise ValueError('expected CLOSE frame, got %s' % frame)
  154. self.close_frame_received = True
  155. res_code, res_reason = frame.unpack_close()
  156. # FIXME: check if res_code == code and res_reason == reason?
  157. # FIXME: alternatively, keep receiving frames in a loop until a
  158. # CLOSE frame is received, so that a fragmented chain may arrive
  159. # fully first
  160. self.onclose(code, reason)
  161. self.sock.close()
  162. def add_hook(self, send=None, recv=None, prepend=False):
  163. """
  164. Add a pair of send and receive hooks that are called for each frame
  165. that is sent or received. A hook is a function that receives a single
  166. argument - a Message instance - and returns a `Message` instance as
  167. well.
  168. `prepend` is a flag indicating whether the send hook is prepended to
  169. the other send hooks.
  170. For example, to add an automatic JSON conversion to messages and
  171. eliminate the need to contruct TextMessage instances to all messages:
  172. >>> import wspy, json
  173. >>> conn = Connection(...)
  174. >>> conn.add_hook(lambda data: tswpy.TextMessage(json.dumps(data)),
  175. >>> lambda message: json.loads(message.payload))
  176. >>> conn.send({'foo': 'bar'}) # Sends text message {"foo":"bar"}
  177. >>> conn.recv() # May be dict(foo='bar')
  178. Note that here `prepend=True`, so that data passed to `send()` is first
  179. encoded and then packed into a frame. Of course, one could also decide
  180. to add the base64 hook first, or to return a new `Frame` instance with
  181. base64-encoded data.
  182. """
  183. if send:
  184. self.hooks_send.insert(0 if prepend else -1, send)
  185. if recv:
  186. self.hooks_recv.insert(-1 if prepend else 0, recv)
  187. def onopen(self):
  188. """
  189. Called after the connection is initialized.
  190. """
  191. return NotImplemented
  192. def onmessage(self, message):
  193. """
  194. Called when a message is received. `message` is a Message object, which
  195. can be constructed from a single frame or multiple fragmented frames.
  196. """
  197. return NotImplemented
  198. def onping(self, payload):
  199. """
  200. Called after a PING control frame has been sent. This handler could be
  201. used to start a timeout handler for a PONG frame that is not received
  202. in time.
  203. """
  204. return NotImplemented
  205. def onpong(self, payload):
  206. """
  207. Called when a PONG control frame is received.
  208. """
  209. return NotImplemented
  210. def onclose(self, code, reason):
  211. """
  212. Called when the socket is closed by either end point.
  213. """
  214. return NotImplemented
  215. def onerror(self, e):
  216. """
  217. Handle a raised exception.
  218. """
  219. return NotImplemented