server.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. import socket
  2. import logging
  3. import time
  4. from traceback import format_exc
  5. from threading import Thread
  6. from ssl import SSLError
  7. from websocket import websocket
  8. from connection import Connection
  9. from errors import HandshakeError
  10. class Server(object):
  11. """
  12. Websocket server, manages multiple client connections.
  13. Example usage:
  14. >>> import wspy
  15. >>> class EchoServer(wspy.Server):
  16. >>> def onopen(self, client):
  17. >>> print 'Client %s connected' % client
  18. >>> def onmessage(self, client, message):
  19. >>> print 'Received message "%s"' % message.payload
  20. >>> client.send(wspy.TextMessage(message.payload))
  21. >>> def onclose(self, client, code, reason):
  22. >>> print 'Client %s disconnected' % client
  23. >>> EchoServer(('', 8000)).run()
  24. """
  25. def __init__(self, address, loglevel=logging.INFO, ssl_args=None,
  26. max_join_time=2.0, backlog_size=32, **kwargs):
  27. """
  28. Constructor for a simple web socket server.
  29. `address` is a (hostname, port) tuple to bind the web socket to.
  30. `loglevel` values should be imported from the logging module.
  31. logging.INFO only shows server start/stop messages, logging.DEBUG shows
  32. clients (dis)connecting and messages being sent/received.
  33. `protocols` and `extensions` are passed directly to the websocket
  34. constructor.
  35. `ssl_args` is a dictionary with arguments for `websocket.enable_ssl`
  36. (and thus to ssl.wrap_socket). If omitted, the server is not
  37. SSL-enabled. If specified, at least the dictionary keys "keyfile" and
  38. "certfile" must be present because these are required arguments for
  39. `websocket.enable_ssl` for a server socket.
  40. `max_join_time` is the maximum time (in seconds) to wait for client
  41. responses after sending CLOSE frames, it defaults to 2 seconds.
  42. `backlog_size` is directly passed to `websocket.listen`.
  43. """
  44. logging.basicConfig(level=loglevel,
  45. format='%(asctime)s: %(levelname)s: %(message)s',
  46. datefmt='%H:%M:%S')
  47. scheme = 'wss' if ssl_args else 'ws'
  48. hostname, port = address
  49. logging.info('Starting server at %s://%s:%d', scheme, hostname, port)
  50. self.sock = websocket(**kwargs)
  51. self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  52. if ssl_args:
  53. self.sock.enable_ssl(server_side=True, **ssl_args)
  54. self.sock.bind(address)
  55. self.sock.listen(backlog_size)
  56. self.max_join_time = max_join_time
  57. def run(self):
  58. self.clients = []
  59. self.client_threads = []
  60. while True:
  61. try:
  62. sock, address = self.sock.accept()
  63. client = Client(self, sock)
  64. self.clients.append(client)
  65. logging.debug('Registered client %s', client)
  66. thread = Thread(target=client.receive_forever)
  67. thread.daemon = True
  68. thread.start()
  69. self.client_threads.append(thread)
  70. except SSLError as e:
  71. logging.error('SSL error: %s', e)
  72. except HandshakeError as e:
  73. logging.error('Invalid request: %s', e.message)
  74. except KeyboardInterrupt:
  75. logging.info('Received interrupt, stopping server...')
  76. break
  77. except Exception as e:
  78. logging.error(format_exc(e))
  79. self.quit_gracefully()
  80. def quit_gracefully(self):
  81. # Send a CLOSE frame so that the client connection will receive a
  82. # response CLOSE frame
  83. for client in self.clients:
  84. client.send_close_frame()
  85. # Wait for the CLOSE frames to be received. Wait for all threads in one
  86. # loop instead of joining separately, so that timeouts are not
  87. # propagated
  88. start_time = time.time()
  89. while time.time() - start_time <= self.max_join_time \
  90. and any(t.is_alive() for t in self.client_threads):
  91. time.sleep(0.050)
  92. # Close remaining sockets, this will trigger a socket.error in the
  93. # receive_forever() thread, causing the Connection.onclose() handler to
  94. # be invoked
  95. for client in self.clients:
  96. try:
  97. client.sock.close()
  98. except socket.error:
  99. pass
  100. # Wait for the onclose() handlers to finish
  101. for thread in self.client_threads:
  102. thread.join()
  103. def remove_client(self, client, code, reason):
  104. self.clients.remove(client)
  105. self.onclose(client, code, reason)
  106. def onopen(self, client):
  107. return NotImplemented
  108. def onmessage(self, client, message):
  109. return NotImplemented
  110. def onping(self, client, payload):
  111. return NotImplemented
  112. def onpong(self, client, payload):
  113. return NotImplemented
  114. def onclose(self, client, code, reason):
  115. return NotImplemented
  116. def onerror(self, client, e):
  117. return NotImplemented
  118. class Client(Connection):
  119. def __init__(self, server, sock):
  120. self.server = server
  121. super(Client, self).__init__(sock)
  122. def __str__(self):
  123. try:
  124. return '<Client at %s:%d>' % self.sock.getpeername()
  125. except socket.error:
  126. return '<Client on closed socket>'
  127. def send(self, message, fragment_size=None, mask=False):
  128. logging.debug('Sending %s to %s', message, self)
  129. Connection.send(self, message, fragment_size=fragment_size, mask=mask)
  130. def onopen(self):
  131. logging.debug('Opened socket to %s', self)
  132. self.server.onopen(self)
  133. def onmessage(self, message):
  134. logging.debug('Received %s from %s', message, self)
  135. self.server.onmessage(self, message)
  136. def onping(self, payload):
  137. logging.debug('Sent ping "%s" to %s', payload, self)
  138. self.server.onping(self, payload)
  139. def onpong(self, payload):
  140. logging.debug('Received pong "%s" from %s', payload, self)
  141. self.server.onpong(self, payload)
  142. def onclose(self, code, reason):
  143. msg = 'Closed socket to %s' % self
  144. if code is not None:
  145. msg += ': [%d] %s' % (code, reason)
  146. logging.debug(msg)
  147. self.server.remove_client(self, code, reason)
  148. def onerror(self, e):
  149. logging.error(format_exc(e))
  150. self.server.onerror(self, e)
  151. if __name__ == '__main__':
  152. import sys
  153. port = int(sys.argv[1]) if len(sys.argv) > 1 else 8000
  154. Server(('', port), loglevel=logging.DEBUG).run()