| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127 |
- #!/usr/bin/env python
- from OSC import OSCServer
- OSCServer.print_tracebacks = True
- from logger import Logger
class TuioServer2D(Logger):
    """
    OSC server that turns incoming TUIO 2D cursor messages into point events.

    Handlers are registered for the '/tuio/2Dobj', '/tuio/2Dcur' and
    '/tuio/2Dblb' addresses, but only cursor ('cur') messages are acted upon.
    Cursor messages result in calls to on_point_down(), on_point_move() and
    on_point_up() on the handler object given to the constructor.
    """
    __tuio_address__ = 'localhost', 3333

    def __init__(self, handler_obj):
        """
        Create the OSC server and register the TUIO message handlers.

        handler_obj -- object that provides on_point_down(sid, x, y),
                       on_point_move(sid, x, y) and on_point_up(sid).
        """
        # OSC server that listens to incoming TUIO events
        self.server = OSCServer(self.__tuio_address__)
        self.server.addDefaultHandlers()
        self.server.addMsgHandler('/tuio/2Dobj', self._receive)
        self.server.addMsgHandler('/tuio/2Dcur', self._receive)
        self.server.addMsgHandler('/tuio/2Dblb', self._receive)

        # Set of alive session id's
        self.alive = set()

        # Set of session id's of points that have generated a 'point_down'
        # event
        self.down = set()

        self.handler_obj = handler_obj

    def _receive(self, addr, tags, data, source):
        """
        Message handler for all registered TUIO addresses.

        addr   -- OSC address of the message, e.g. '/tuio/2Dcur'.
        tags   -- OSC type tags (not used here).
        data   -- message arguments; data[0] is the message type ('alive' or
                  'set'), the remaining items are its parameters.
        source -- origin of the message (not used here).

        Raises ValueError if a 'set' message refers to a session id that is
        not in the current alive set.
        """
        # Drop the 8-character '/tuio/2D' prefix, leaving 'obj'/'cur'/'blb'
        surface = addr[8:]

        #self.debug('Received message <surface=%s tags="%s" '
        #           'data=%s source=%s>' % (surface, tags, data, source))

        # FIXME: Ignore obj/blb events?
        # Filter on surface before touching the payload, so that ignored
        # message kinds can never fail while being parsed.
        if surface != 'cur':
            return

        # A message without arguments has no message type to dispatch on
        if not data:
            return

        msg_type = data[0]

        if msg_type == 'alive':
            self._handle_alive(data[1:])
        elif msg_type == 'set':
            sid, x, y = data[1:4]
            self._handle_set(sid, x, y)

    def _handle_alive(self, sids):
        """Replace the alive set and trigger 'point_up' for vanished ids."""
        alive = set(sids)
        released = self.alive - alive
        self.alive = alive

        if not released:
            return

        self.debug('Released %s.' % ', '.join(map(str, released)))
        self.down -= released

        for sid in released:
            self.handler_obj.on_point_up(sid)

    def _handle_set(self, sid, x, y):
        """Trigger 'point_down' on the first update of a point, else 'point_move'."""
        if sid not in self.alive:
            raise ValueError('Point with sid "%d" is not alive.' % sid)

        # Check if 'point_down' has already been triggered. If so, trigger
        # a 'point_move' event instead
        if sid in self.down:
            self.debug('Moved %d to (%s, %s).' % (sid, x, y))
            self.handler_obj.on_point_move(sid, x, y)
        else:
            self.debug('Down %d at (%s, %s).' % (sid, x, y))
            self.down.add(sid)
            self.handler_obj.on_point_down(sid, x, y)

    def run(self):
        """Delegate to the OSC server's handle_request()."""
        self.server.handle_request()

    def start(self):
        """Log the start and hand control to the OSC server's serve_forever()."""
        self.info('Starting OSC server')
        self.server.serve_forever()

    def stop(self):
        """Log the stop and close the OSC server."""
        self.info('Stopping OSC server')
        self.server.close()
class TuioServerHandler(object):
    """
    Base interface for objects that receive touch point events.

    Declares the three callbacks on_point_down, on_point_move and
    on_point_up. Each default implementation does nothing except return
    NotImplemented; subclasses override the callbacks they care about.
    """

    def on_point_down(self, sid, x, y):
        """Callback for a new point with session id `sid` at (x, y)."""
        return NotImplemented

    def on_point_move(self, sid, x, y):
        """Callback for an existing point `sid` that is now at (x, y)."""
        return NotImplemented

    def on_point_up(self, sid):
        """Callback for a point `sid` that has been released."""
        return NotImplemented
if __name__ == '__main__':
    import argparse
    import logging

    # Command line interface of the test script
    cli = argparse.ArgumentParser(description='TUIO server test.')
    cli.add_argument(
        '--log',
        metavar='LOG_LEVEL',
        default='INFO',
        choices=['DEBUG', 'INFO', 'WARNING'],
        help='Global log level.')
    cli.add_argument(
        '--logfile',
        metavar='FILENAME',
        help='Filename for the log file (the log is printed to stdout by '
             'default).')
    options = cli.parse_args()

    # Configure logger: the level name is looked up on the logging module,
    # a log file is only passed on when one was given
    logger_options = {'level': getattr(logging, options.log)}

    if options.logfile:
        logger_options['filename'] = options.logfile

    Logger.configure(**logger_options)

    # Handler that simply logs every point event it receives
    class Handler(TuioServerHandler, Logger):
        def on_point_down(self, sid, x, y):
            self.info('Point down: sid=%d (%s, %s)' % (sid, x, y))

        def on_point_up(self, sid):
            self.info('Point up: sid=%d' % sid)

        def on_point_move(self, sid, x, y):
            self.info('Point move: sid=%d (%s, %s)' % (sid, x, y))

    tuio_server = TuioServer2D(Handler())

    # Serve until interrupted from the keyboard, then shut down
    try:
        tuio_server.start()
    except KeyboardInterrupt:
        tuio_server.stop()
|