Commit 50f9fec0 authored by Taddeus Kroes's avatar Taddeus Kroes

Began work on major code refactoring.

parent 37983d6d
...@@ -11,3 +11,4 @@ ...@@ -11,3 +11,4 @@
*.nav *.nav
*.snm *.snm
src/old/* src/old/*
src/old_kivy/*
from __future__ import division
from math import atan2, pi
import time
class Positionable(object):
"""
Parent class for any object with a position.
"""
def __init__(self, x=None, y=None):
self.x = x
self.y = y
def __str__(self):
return '<%s at (%s, %s)>' % (self.__class__.__name__, self.x, self.y)
def set_position(self, x, y):
self.x = x
self.y = y
def get_position(self):
return self.x, self.y
@property
def xy(self):
"""
Shortcut getter for (x, y) position.
"""
return self.x, self.y
def distance_to(self, positionable):
"""
Calculate the Pythagorian distance from this positionable to another.
"""
x, y = positionable.get_position()
return ((x - self.x) ** 2 + (y - self.y) ** 2) ** .5
class MovingPositionable(Positionable):
"""
Parent class for positionable objects that need movement calculations. For
these calculations, the previous position is also saved.
"""
def __init__(self, x=None, y=None):
super(MovingPositionable, self).__init__(x, y)
self.px = x
self.py = y
def set_position(self, x, y):
"""
Set a new position and save the current position as the precious
position. If no previous position has been set, set is to the new
position so that the movement is zero.
"""
if self.x is None or self.y is None:
self.px = x
self.py = y
else:
self.px = self.x
self.py = self.y
Positionable.set_position(self, x, y)
def get_previous_position(self):
return self.px, self.py
def rotation_around(self, center):
"""
Calculate rotation of this positionable relative to a center
positionable.
"""
cx, cy = center.get_position()
prev_angle = atan2(self.px - cx, self.py - cy)
current_angle = atan2(self.x - cx, self.y - cy)
rotation = current_angle - prev_angle
if rotation >= pi:
return 2 * pi - rotation
if rotation <= -pi:
return -2 * pi - rotation
return rotation
class AcceleratedPositionable(MovingPositionable):
"""
Parent class for positionable objects that need acceleration calculations.
For these calculations, timestamps are saved for both the current and
previous position.
"""
def __init__(self, x=None, y=None):
super(AcceleratedPositionable, self).__init__(x, y)
self.prev_timestamp = self.current_timestamp = None
def set_position(self, x, y):
MovingPositionable.set_position(self, x, y)
if self.current_timestamp is None:
self.prev_timestamp = self.current_timestamp = time.time()
return
self.prev_timestamp = self.current_timestamp
self.current_timestamp = time.time()
class Surface(Positionable):
"""
Interface class for surfaces with a position. Defines a function
'contains', which calculates whether a position is located in the surface.
"""
def contains(self, point):
raise NotImplemented
class RectangularSurface(Surface):
"""
Rectangle, represented by a left-top position (x, y) and a size (width,
height).
"""
def __init__(self, x, y, width, height):
super(RectangularSurface, self).__init__(x, y)
self.set_size(width, height)
def __str__(self):
return '<%s at (%s, %s) size=(%s, %s)>' \
% (self.__class__.__name__, self.x, self.y, self.width,
self.height)
def set_size(self, width, height):
self.width = width
self.height = height
def get_size(self):
return self.width, self.height
def contains(self, point):
x, y = point.get_position()
return self.x <= x <= self.x + self.width \
and self.y <= y <= self.y + self.height
class CircularSurface(Surface):
"""
Circle, represented by a center position (x, y) and a radius.
"""
def __init__(self, x, y, radius):
super(CircularSurface, self).__init__(x, y)
self.set_radius(radius)
def __str__(self):
return '<%s at (%s, %s) size=(%s, %s)>' \
% (self.__class__.__name__, self.x, self.y, self.width,
self.height)
def set_radius(self, radius):
self.radius = radius
def get_radius(self):
return self.radius
def contains(self, point):
return self.distance_to(point) <= self.radius
#import time import logging
_id_len = 0
class Logger(object): class Logger(object):
def __init__(self, **kwargs): configured = False
global _id_len
self.identifier = kwargs.get('identifier', self.__class__.__name__) @staticmethod
self.verbose = kwargs.get('verbose', 0) def configure(**kwargs):
_id_len = max(_id_len, len(self.identifier)) FORMAT = kwargs.get('format', '%(levelname)s: %(message)s')
logging.basicConfig(format=FORMAT, **kwargs)
def log(self, msg, verbosity=1): Logger.configured = True
# TODO: log time
if self.verbose >= verbosity: def format_message(self, message):
print '| %s | %s' % ('%%-%ds' % _id_len % self.identifier, msg) if not Logger.configured:
Logger.configure()
return '%s: %s' % (self.__class__.__name__, message)
def debug(self, message):
logging.debug(self.format_message(message))
def info(self, message):
logging.info(self.format_message(message))
def warning(self, message):
logging.warning(self.format_message(message))
def error(self, message):
logging.error(self.format_message(message))
def critical(self, message):
logging.critical(self.format_message(message))
def log(self, level, message):
logging.critical(self.format_message('CRITICAL', message))
#import time
_id_len = 0
class Logger(object):
def __init__(self, **kwargs):
global _id_len
self.identifier = kwargs.get('identifier', self.__class__.__name__)
self.verbose = kwargs.get('verbose', 0)
_id_len = max(_id_len, len(self.identifier))
def log(self, msg, verbosity=1):
# TODO: log time
if self.verbose >= verbosity:
print '| %s | %s' % ('%%-%ds' % _id_len % self.identifier, msg)
import pygame.display
__all__ = ['screen_size']
# get screen resolution
pygame.display.init()
info = pygame.display.Info()
screen_size = info.current_w, info.current_h
pygame.display.quit()
#!/usr/bin/env python
from OSC import OSCServer
from logger import Logger
class TuioServer2D(Logger):
_tuio_address = ('localhost', 3333)
def __init__(self, handler_obj, **kwargs):
super(TuioServer2D, self).__init__(**kwargs)
for handler in ('point_down', 'point_up', 'point_move'):
if not hasattr(handler_obj, handler):
raise RuntimeError('Handler "%s" is not defined.' % handler)
# OSC server that listens to incoming TUIO events
self.server = OSCServer(self.__class__._tuio_address)
self.server.addDefaultHandlers()
self.server.addMsgHandler('/tuio/2Dobj', self._receive)
self.server.addMsgHandler('/tuio/2Dcur', self._receive)
self.server.addMsgHandler('/tuio/2Dblb', self._receive)
# List of alive seddion id's
self.alive = set()
# List of session id's of points that have generated a 'point_down'
# event
self.down = set()
self.handler_obj = handler_obj
def _receive(self, addr, tags, data, source):
surface = addr[8:]
self.log('Received message <surface=%s tags="%s" data=%s source=%s>' \
% (surface, tags, data, source), 2)
msg_type = data[0]
# FIXME: Ignore obj/blb events?
if surface != 'cur':
return
if msg_type == 'alive':
alive = set(data[1:])
released = self.alive - alive
self.alive = alive
if released:
self.log('Released %s.' % ', '.join(map(str, released)))
self.down -= released
for sid in released:
self.handler_obj.point_up(sid)
elif msg_type == 'set':
sid, x, y = data[1:4]
if sid not in self.alive:
raise ValueError('Point with sid "%d" is not alive.' % sid)
# Check if 'point_down' has already been triggered. If so, trigger
# a 'point_move' event instead
if sid in self.down:
self.log('Moved %d to (%s, %s).' % (sid, x, y))
self.handler_obj.point_move(sid, x, y)
else:
self.log('Down %d at (%s, %s).' % (sid, x, y))
self.down.add(sid)
self.handler_obj.point_down(sid, x, y)
def start(self):
try:
self.log('Starting OSC server')
self.server.serve_forever()
except SystemExit:
self.stop()
def stop(self):
self.log('Stopping OSC server')
self.server.close()
if __name__ == '__main__':
import sys
class Handler(Logger):
def point_down(self, sid, x, y):
self.log('Point down: sid=%d (%s, %s)' % (sid, x, y))
def point_up(self, sid):
self.log('Point up: sid=%d' % sid)
def point_move(self, sid, x, y):
self.log('Point move: sid=%d (%s, %s)' % (sid, x, y))
v = 1 if len(sys.argv) < 2 else int(sys.argv[1])
server = TuioServer2D(Handler(verbose=v), verbose=v)
try:
server.start()
except KeyboardInterrupt:
server.stop()
from geometry import AcceleratedPositionable
class TouchPoint(AcceleratedPositionable):
"""
Representation of an object touching the screen. The simplest form of a
touch object is a 'point', represented by an (x, y) position and a TUIO
session id (sid). All dimensions are in pixels.
"""
def __init__(self, x, y, sid):
super(TouchPoint, self).__init__(x, y)
self.sid = sid
# List of all windows this point is located in
self.windows = []
def __str__(self):
return '<%s at (%s, %s) sid=%d>' \
% (self.__class__.__name__, self.x, self.y, self.sid)
def add_window(self, window):
self.windows.append(window)
def remove_window(self, window):
self.windows.remove(window)
def update_window_trackers(self, event_type):
for window in self.windows:
window.update_trackers(event_type, self)
# TODO: Extend with more complex touch object, e.g.:
#class TouchFiducial(TouchPoint): ...
import pygame.display import pygame.display
__all__ = ['screen_size']
# get screen resolution # get screen resolution
pygame.display.init() pygame.display.init()
info = pygame.display.Info() _info = pygame.display.Info()
screen_size = info.current_w, info.current_h _w, _h = screen_size = _info.current_w, _info.current_h
pygame.display.quit() pygame.display.quit()
def pixel_coords(x, y):
"""
Translate coordinates (x, y) with 0 <= x <= 1 and 0 <= y <= 1 to pixel
coordinates using the screen size.
"""
return _w * x, _h * y
from logger import Logger
from window import FullscreenWindow
from tuio_server import TuioServer2D, TuioServerHandler
from point import TouchPoint
from screen import pixel_coords
class GestureServer(TuioServerHandler, Logger):
"""
Multi-touch gesture server. This uses a TUIO server to receive basic touch
events, which are translated to gestures using gesture trackers. Trackers
are assigned to a Window object, and gesture handlers are bound to a
tracker.
"""
def __init__(self):
# List of all connected windows
self.windows = []
# Map of point sid to TouchPoint object
self.points = {}
# Create TUIO server, to be started later
self.tuio_server = TuioServer2D(self)
def add_window(self, window):
self.windows.append(window)
def remove_window(self, window):
self.windows.remove(window)
# TODO: Remove window from touch points
def on_point_down(self, sid, x, y):
if sid in self.points:
raise ValueError('Point with sid %d already exists.' % sid)
# Translate to pixel coordinates
px, py = pixel_coords(x, y)
# Create a new touch point
self.points[sid] = point = TouchPoint(px, py, sid)
# Save the windows containing the point in a dictionary, and update
# their trackers
for window in self.windows:
if window.contains(point):
point.add_window(window)
window.update_trackers('down', point)
def on_point_move(self, sid, x, y):
if sid not in self.points:
raise KeyError('No point with sid %d exists.' % sid)
# Translate to pixel coordinates
px, py = pixel_coords(x, y)
# Update point position and corresponding window trackers
point = self.points[sid]
point.set_position(px, py)
point.update_window_trackers('move')
def on_point_up(self, sid):
if sid not in self.points:
raise KeyError('No point with sid %d exists.' % sid)
# Clear the list of windows containing the point, and update their
# trackers
self.points[sid].update_window_trackers('up')
del self.points[sid]
def start(self):
# Assert that at least one window exists, by adding a fullscreen window
# if none have been added yet
if not self.windows:
self.add_window(FullscreenWindow())
self.tuio_server.start()
def stop(self):
self.tuio_server.stop()
from logger import Logger
class GestureTracker(Logger):
"""
Abstract class for gesture tracker definitions. Contains methods for
changing the state of touch points.
"""
# Supported gesture types
__gesture_types__ = []
# Configurable properties (see configure() method)
__configurable__ = []
def __init__(self, window=None):
# Hashmap of gesture types
self.handlers = {}
if window:
window.add_tracker(self)
def bind(self, gesture_type, handler, *args, **kwargs):
"""
Bind a handler to a gesture type. Multiple handlers can be bound to a
single gesture type. Optionally, (keyword) arguments that will be
passed to the handler along with a Gesture object can be specified.
"""
if gesture_type not in self.__gesture_types__:
raise AttributeError('Unsupported gesture type "%s".' \
% gesture_type)
h = handler, args, kwargs
if gesture_type not in self.handlers:
self.handlers[gesture_type] = [h]
else:
self.handlers[gesture_type].append(h)
def trigger(self, gesture):
if gesture.__type__ not in self.handlers:
self.debug('Triggered "%s", but no handlers are bound.'
% gesture.__type__)
return
self.info('Triggered %s.' % gesture)
for handler, args, kwargs in self.handlers[gesture.__type__]:
handler(gesture, *args, **kwargs)
def is_type_bound(self, gesture_type):
return gesture_type in self.handlers
def on_point_down(self, point):
pass
def on_point_move(self, point):
pass
def on_point_up(self, point):
pass
def configure(self, **kwargs):
for name, value in kwargs.iteritems():
if name not in self.__configurable__:
raise ValueError('%s.%s is not a configurable property.'
% (self.__class__.__name__, name))
setattr(self, name, value)
class Gesture(object):
"""
Abstract class that represents a triggered gesture.
"""
pass
import time
from threading import Thread
from ..tracker import GestureTracker, Gesture
from ..geometry import Positionable
from utils import PointGesture
class TapTracker(GestureTracker):
__gesture_types__ = ['tap', 'single_tap', 'double_tap']
__configurable__ = ['tap_distance', 'tap_time', 'double_tap_time',
'double_tap_distance', 'update_rate']
def __init__(self, window=None):
super(TapTracker, self).__init__(window)
# Map of TUIO session id to tuple (timestamp, position) of point down
self.reg = {}
# Maximum radius in which a touch point can move in order to be a tap
# event in pixels
self.tap_distance = 20
# Maximum time between 'down' and 'up' of a tap event in seconds
self.tap_time = .2
# Maximum time in seconds and distance in pixels between two taps to
# count as double tap
self.double_tap_time = .3
self.double_tap_distance = 30
# Times per second to detect single taps
self.update_rate = 30
self.reset_last_tap()
self.single_tap_thread = Thread(target=self.detect_single_tap)
self.single_tap_thread.daemon = True
self.single_tap_thread.start()
def detect_single_tap(self):
"""
Iteration function for single-tap detection thread.
"""
while True:
time_diff = time.time() - self.last_tap_time
if self.last_tap and time_diff > self.double_tap_time:
# Last tap is too long ago to be a double tap, so trigger a
# single tap
self.trigger(SingleTapGesture(self.last_tap))
self.reset_last_tap()
time.sleep(1. / self.update_rate)
def reset_last_tap(self):
self.last_tap_time = 0
self.last_tap = None
def on_point_down(self, point):
x, y = point.get_position()
self.reg[point.sid] = time.time(), Positionable(x, y)
def on_point_move(self, point):
if point.sid not in self.reg:
return
# If a stationary point moves beyond a threshold, delete it so that the
# 'up' event will not trigger a 'tap'
t, initial_position = self.reg[point.sid]
if point.distance_to(initial_position) > self.tap_distance:
del self.reg[point.sid]
def on_point_up(self, point):
# Assert that the point has not been deleted by a 'move' event yet
if point.sid not in self.reg:
return
down_time = self.reg[point.sid][0]
del self.reg[point.sid]
# Only trigger a tap event if the 'up' is triggered within a certain
# time afer the 'down'
current_time = time.time()
if current_time - down_time > self.tap_time:
return
tap = TapGesture(point)
self.trigger(tap)
# Trigger double tap if the threshold has not not expired yet
if self.last_tap:
if self.last_tap.distance_to(tap) <= self.double_tap_distance:
# Close enough to be a double tap
self.trigger(DoubleTapGesture(self.last_tap))
self.reset_last_tap()
return
# Generate a seperate single tap gesture for the last tap,
# because the lat tap variable is overwritten now
self.trigger(SingleTapGesture(self.last_tap))
self.last_tap_time = current_time
self.last_tap = tap
class TapGesture(PointGesture):
"""
A tap gesture is triggered
"""
__type__ = 'tap'
class SingleTapGesture(TapGesture):
"""
A single tap gesture is triggered after a regular tap gesture, if no double
tap is triggered for that gesture.
"""
__type__ = 'single_tap'
class DoubleTapGesture(TapGesture):
"""
A double tap gesture is triggered if two sequential taps are triggered
within a certain time and distance of eachother.
"""
__type__ = 'double_tap'
from ..tracker import Gesture
from ..geometry import Positionable
class PointGesture(Positionable, Gesture):
"""
Abstract base class for positionable gestures that have the same
coordinated as a touch point.
"""
def __init__(self, point):
# Use the coordinates of the touch point
Positionable.__init__(self, *point.get_position())
#!/usr/bin/env python #!/usr/bin/env python
from OSC import OSCServer from OSC import OSCServer
OSCServer.print_tracebacks = True
from logger import Logger from logger import Logger
class TuioServer2D(Logger): class TuioServer2D(Logger):
_tuio_address = ('localhost', 3333) __tuio_address__ = 'localhost', 3333
def __init__(self, handler_obj, **kwargs):
super(TuioServer2D, self).__init__(**kwargs)
for handler in ('point_down', 'point_up', 'point_move'):
if not hasattr(handler_obj, handler):
raise RuntimeError('Handler "%s" is not defined.' % handler)
def __init__(self, handler_obj):
# OSC server that listens to incoming TUIO events # OSC server that listens to incoming TUIO events
self.server = OSCServer(self.__class__._tuio_address) self.server = OSCServer(self.__tuio_address__)
self.server.addDefaultHandlers() self.server.addDefaultHandlers()
self.server.addMsgHandler('/tuio/2Dobj', self._receive) self.server.addMsgHandler('/tuio/2Dobj', self._receive)
self.server.addMsgHandler('/tuio/2Dcur', self._receive) self.server.addMsgHandler('/tuio/2Dcur', self._receive)
...@@ -31,8 +27,8 @@ class TuioServer2D(Logger): ...@@ -31,8 +27,8 @@ class TuioServer2D(Logger):
def _receive(self, addr, tags, data, source): def _receive(self, addr, tags, data, source):
surface = addr[8:] surface = addr[8:]
self.log('Received message <surface=%s tags="%s" data=%s source=%s>' \ #self.debug('Received message <surface=%s tags="%s" '
% (surface, tags, data, source), 2) # 'data=%s source=%s>' % (surface, tags, data, source))
msg_type = data[0] msg_type = data[0]
# FIXME: Ignore obj/blb events? # FIXME: Ignore obj/blb events?
...@@ -45,11 +41,11 @@ class TuioServer2D(Logger): ...@@ -45,11 +41,11 @@ class TuioServer2D(Logger):
self.alive = alive self.alive = alive
if released: if released:
self.log('Released %s.' % ', '.join(map(str, released))) self.debug('Released %s.' % ', '.join(map(str, released)))
self.down -= released self.down -= released
for sid in released: for sid in released:
self.handler_obj.point_up(sid) self.handler_obj.on_point_up(sid)
elif msg_type == 'set': elif msg_type == 'set':
sid, x, y = data[1:4] sid, x, y = data[1:4]
...@@ -59,40 +55,71 @@ class TuioServer2D(Logger): ...@@ -59,40 +55,71 @@ class TuioServer2D(Logger):
# Check if 'point_down' has already been triggered. If so, trigger # Check if 'point_down' has already been triggered. If so, trigger
# a 'point_move' event instead # a 'point_move' event instead
if sid in self.down: if sid in self.down:
self.log('Moved %d to (%s, %s).' % (sid, x, y)) self.debug('Moved %d to (%s, %s).' % (sid, x, y))
self.handler_obj.point_move(sid, x, y) self.handler_obj.on_point_move(sid, x, y)
else: else:
self.log('Down %d at (%s, %s).' % (sid, x, y)) self.debug('Down %d at (%s, %s).' % (sid, x, y))
self.down.add(sid) self.down.add(sid)
self.handler_obj.point_down(sid, x, y) self.handler_obj.on_point_down(sid, x, y)
def start(self): def start(self):
try: try:
self.log('Starting OSC server') self.info('Starting OSC server')
self.server.serve_forever() self.server.serve_forever()
except SystemExit: except SystemExit:
self.stop() self.stop()
def stop(self): def stop(self):
self.log('Stopping OSC server') self.info('Stopping OSC server')
self.server.close() self.server.close()
class TuioServerHandler(object):
"""
Interface for touch servers. Defines point_up, point_move and point_down
handlers.
"""
def on_point_down(self, sid, x, y):
raise NotImplementedError
def on_point_move(self, sid, x, y):
raise NotImplementedError
def on_point_up(self, sid):
raise NotImplementedError
if __name__ == '__main__': if __name__ == '__main__':
import sys import argparse
import logging
parser = argparse.ArgumentParser(description='TUIO server test.')
parser.add_argument('--log', metavar='LOG_LEVEL', default='INFO',
choices=['DEBUG', 'INFO', 'WARNING'], help='Global log level.')
parser.add_argument('--logfile', metavar='FILENAME', help='Filename for '
'the log file (the log is printed to stdout by default).')
args = parser.parse_args()
# Configure logger
log_config = dict(level=getattr(logging, args.log))
if args.logfile:
log_config['filename'] = args.logfile
Logger.configure(**log_config)
class Handler(Logger): # Define handlers
def point_down(self, sid, x, y): class Handler(TuioServerHandler, Logger):
self.log('Point down: sid=%d (%s, %s)' % (sid, x, y)) def on_point_down(self, sid, x, y):
self.info('Point down: sid=%d (%s, %s)' % (sid, x, y))
def point_up(self, sid): def on_point_up(self, sid):
self.log('Point up: sid=%d' % sid) self.info('Point up: sid=%d' % sid)
def point_move(self, sid, x, y): def on_point_move(self, sid, x, y):
self.log('Point move: sid=%d (%s, %s)' % (sid, x, y)) self.info('Point move: sid=%d (%s, %s)' % (sid, x, y))
v = 1 if len(sys.argv) < 2 else int(sys.argv[1]) server = TuioServer2D(Handler())
server = TuioServer2D(Handler(verbose=v), verbose=v)
try: try:
server.start() server.start()
......
from geometry import Surface, RectangularSurface, CircularSurface
from screen import screen_size
class Window(Surface):
"""
Abstract class that represents a 2D object on the screen to which gesture
trackers can be added. Implementations of this class should define a method
that can detect wether a touch point is located within the window.
Because any type of window on the screen has (at least) an (x, y) position,
it extends the Positionable object.
Note: Dimensions used in implementations of this class should be in pixels.
"""
def __init__(self, **kwargs):
# All trackers that are currently bound to this window
self.trackers = []
# List of points that originated in this window
self.points = []
if 'server' in kwargs:
kwargs['server'].add_window(self)
def add_tracker(self, tracker):
self.trackers.append(tracker)
def remove_tracker(self, tracker):
self.trackers.remove(tracker)
def on_point_down(self, point):
self.points.append(point)
self.update_trackers('down', point)
def on_point_move(self, point):
self.update_trackers('move', point)
def on_point_up(self, point):
self.points.remove(point)
self.update_trackers('up', point)
def update_trackers(self, event_type, point):
"""
Update all gesture trackers that are bound to this window with an
added/moved/removed touch point.
"""
handler_name = 'on_point_' + event_type
for tracker in self.trackers:
if hasattr(tracker, handler_name):
getattr(tracker, handler_name)(point)
class RectangularWindow(Window, RectangularSurface):
"""
Rectangular window.
"""
def __init__(self, x, y, width, height, **kwargs):
Window.__init__(self, **kwargs)
RectangularSurface.__init__(self, x, y, width, height)
class CircularWindow(Window, CircularSurface):
"""
Circular window.
"""
def __init__(self, x, y, width, height, **kwargs):
Window.__init__(self, **kwargs)
CircularSurface.__init__(self, x, y, radius)
class FullscreenWindow(RectangularWindow):
"""
Window representation for the entire screen. This class provides an easy
way to create a single rectangular window that catches all gestures.
"""
def __init__(self, **kwargs):
super(FullscreenWindow, self).__init__(0, 0, *screen_size, **kwargs)
def __str__(self):
return '<FullscreenWindow size=(%d, %d)>' % (self.width, self.height)
def contains(self, point):
return True
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment