#!/usr/bin/env python from OSC import OSCServer OSCServer.print_tracebacks = True from logger import Logger class TuioServer2D(Logger): __tuio_address__ = 'localhost', 3333 def __init__(self, handler_obj): # OSC server that listens to incoming TUIO events self.server = OSCServer(self.__tuio_address__) self.server.addDefaultHandlers() self.server.addMsgHandler('/tuio/2Dobj', self._receive) self.server.addMsgHandler('/tuio/2Dcur', self._receive) self.server.addMsgHandler('/tuio/2Dblb', self._receive) # List of alive seddion id's self.alive = set() # List of session id's of points that have generated a 'point_down' # event self.down = set() self.handler_obj = handler_obj def _receive(self, addr, tags, data, source): surface = addr[8:] #self.debug('Received message ' % (surface, tags, data, source)) msg_type = data[0] # FIXME: Ignore obj/blb events? if surface != 'cur': return if msg_type == 'alive': alive = set(data[1:]) released = self.alive - alive self.alive = alive if released: self.debug('Released %s.' % ', '.join(map(str, released))) self.down -= released for sid in released: self.handler_obj.on_point_up(sid) elif msg_type == 'set': sid, x, y = data[1:4] if sid not in self.alive: raise ValueError('Point with sid "%d" is not alive.' % sid) # Check if 'point_down' has already been triggered. If so, trigger # a 'point_move' event instead if sid in self.down: self.debug('Moved %d to (%s, %s).' % (sid, x, y)) self.handler_obj.on_point_move(sid, x, y) else: self.debug('Down %d at (%s, %s).' % (sid, x, y)) self.down.add(sid) self.handler_obj.on_point_down(sid, x, y) def start(self): try: self.info('Starting OSC server') self.server.serve_forever() except SystemExit: self.stop() def stop(self): self.info('Stopping OSC server') self.server.close() class TuioServerHandler(object): """ Interface for touch servers. Defines point_up, point_move and point_down handlers. """ def on_point_down(self, sid, x, y): return NotImplemented def on_point_move(self, sid, x, y): return NotImplemented def on_point_up(self, sid): return NotImplemented if __name__ == '__main__': import argparse import logging parser = argparse.ArgumentParser(description='TUIO server test.') parser.add_argument('--log', metavar='LOG_LEVEL', default='INFO', choices=['DEBUG', 'INFO', 'WARNING'], help='Global log level.') parser.add_argument('--logfile', metavar='FILENAME', help='Filename for ' 'the log file (the log is printed to stdout by default).') args = parser.parse_args() # Configure logger log_config = dict(level=getattr(logging, args.log)) if args.logfile: log_config['filename'] = args.logfile Logger.configure(**log_config) # Define handlers class Handler(TuioServerHandler, Logger): def on_point_down(self, sid, x, y): self.info('Point down: sid=%d (%s, %s)' % (sid, x, y)) def on_point_up(self, sid): self.info('Point up: sid=%d' % sid) def on_point_move(self, sid, x, y): self.info('Point move: sid=%d (%s, %s)' % (sid, x, y)) server = TuioServer2D(Handler()) try: server.start() except KeyboardInterrupt: server.stop()