|
|
@@ -1,11 +1,35 @@
|
|
|
#!/usr/bin/env python
|
|
|
import time
|
|
|
-from threading import Thread
|
|
|
+import pygame.display
|
|
|
from math import atan2, pi
|
|
|
+from threading import Thread
|
|
|
|
|
|
from tuio_server import TuioServer2D
|
|
|
from logger import Logger
|
|
|
-from events import TapEvent, FlickEvent, RotateEvent, PinchEvent, PanEvent
|
|
|
+from events import BasicEvent, DownEvent, UpEvent, MoveEvent, TapEvent, \
|
|
|
+ SingleTapEvent, DoubleTapEvent, FlickEvent, RotateEvent, PinchEvent, \
|
|
|
+ PanEvent
|
|
|
+
|
|
|
+# get screen resolution
|
|
|
+pygame.display.init()
|
|
|
+info = pygame.display.Info()
|
|
|
+screen_size = info.current_w, info.current_h
|
|
|
+pygame.display.quit()
|
|
|
+
|
|
|
+# Heuristic constants
|
|
|
+# TODO: Encapsulate screen resolution in distance heuristics
|
|
|
+SUPPORTED_GESTURES = ('down', 'up', 'move', 'tap', 'single_tap', 'double_tap',
|
|
|
+ 'pan', 'flick', 'rotate', 'pinch')
|
|
|
+DOUBLE_TAP_DISTANCE = .05
|
|
|
+FLICK_VELOCITY_TRESHOLD = 20
|
|
|
+TAP_TIMEOUT = .2
|
|
|
+MAX_MULTI_DRAG_DISTANCE = .05
|
|
|
+STATIONARY_TIME = .01
|
|
|
+
|
|
|
+# Minimum distance for two coordinates to be considered different
|
|
|
+# Theoretically, this should be one pixel because that is the minimal movement
|
|
|
+# of a mouse cursor on the screen
|
|
|
+DIST_THRESHOLD = 1. / max(screen_size)
|
|
|
|
|
|
|
|
|
def distance(a, b):
|
|
|
@@ -22,13 +46,26 @@ def add(a, b):
|
|
|
return a + b
|
|
|
|
|
|
|
|
|
+# Maximum distance between touch- and release location of a tap event
|
|
|
+TAP_DISTANCE = .01
|
|
|
+
|
|
|
+# Maximum duration of a tap
|
|
|
+TAP_TIME = .300
|
|
|
+
|
|
|
+
|
|
|
class TouchPoint(object):
|
|
|
def __init__(self, sid, x, y):
|
|
|
+ self.start_time = self.update_time = time.time()
|
|
|
self.sid = sid
|
|
|
- self.px = self.x = x
|
|
|
- self.py = self.y = y
|
|
|
+ self.start_x = self.px = self.x = x
|
|
|
+ self.start_y = self.py = self.y = y
|
|
|
+
|
|
|
+ @property
|
|
|
+ def xy(self):
|
|
|
+ return self.x, self.y
|
|
|
|
|
|
def update(self, x, y):
|
|
|
+ self.update_time = time.time()
|
|
|
self.px = self.x
|
|
|
self.py = self.y
|
|
|
self.x = x
|
|
|
@@ -44,6 +81,19 @@ class TouchPoint(object):
|
|
|
self.pinch = self.old_pinch = self.distance_to(cx, cy)
|
|
|
self.angle = self.old_angle = atan2(self.y - cy, self.x - cx)
|
|
|
|
|
|
+ def rotation_around(cx, cy):
|
|
|
+ angle = atan2(cy - self.y, self.x - cx)
|
|
|
+ prev_angle = atan2(cy - self.py, self.px - cx)
|
|
|
+ da = angle - prev_angle
|
|
|
+
|
|
|
+ # Assert that angle is in [-pi, pi]
|
|
|
+ if da > pi:
|
|
|
+ da -= 2 * pi
|
|
|
+ elif da < pi:
|
|
|
+ da += 2 * pi
|
|
|
+
|
|
|
+ return da
|
|
|
+
|
|
|
def set_angle(self, angle):
|
|
|
self.old_angle = self.angle
|
|
|
self.angle = angle
|
|
|
@@ -61,75 +111,50 @@ class TouchPoint(object):
|
|
|
def dy(self):
|
|
|
return int(self.y - self.py)
|
|
|
|
|
|
+ def down_time(self):
|
|
|
+ return time.time() - self.start_time
|
|
|
|
|
|
-# Heuristic constants
|
|
|
-# TODO: Encapsulate DPI resolution in distance heuristics
|
|
|
-SUPPORTED_GESTURES = ('tap', 'pan', 'flick', 'rotate', 'pinch')
|
|
|
-DOUBLE_TAP_DISTANCE_THRESHOLD = 30
|
|
|
-FLICK_VELOCITY_TRESHOLD = 20
|
|
|
-TAP_INTERVAL = .200
|
|
|
-TAP_TIMEOUT = .200
|
|
|
-MAX_MULTI_DRAG_DISTANCE = 100
|
|
|
+ def is_tap(self):
|
|
|
+ return self.distance_to_prev() < TAP_TIME \
|
|
|
+ and self.distance_to(self.start_x, self.start_y) < TAP_DISTANCE
|
|
|
+
|
|
|
+ def movement(self):
|
|
|
+ return self.x - self.px, self.y - self.py
|
|
|
+
|
|
|
+ def is_stationary(self):
|
|
|
+ return self.distance_to_prev() < DIST_THRESHOLD
|
|
|
|
|
|
|
|
|
class MultiTouchListener(Logger):
|
|
|
- def __init__(self, update_rate=60, verbose=0, tuio_verbose=0, **kwargs):
|
|
|
+ def __init__(self, verbose=0, tuio_verbose=0, **kwargs):
|
|
|
super(MultiTouchListener, self).__init__(**kwargs)
|
|
|
self.verbose = verbose
|
|
|
self.tuio_verbose = tuio_verbose
|
|
|
- self.last_tap = 0
|
|
|
- self.update_rate = update_rate
|
|
|
- self.points_changed = False
|
|
|
+ self.last_tap_time = 0
|
|
|
self.handlers = {}
|
|
|
|
|
|
# Session id's pointing to point coordinates
|
|
|
self.points = []
|
|
|
|
|
|
- self.taps_down = []
|
|
|
- self.taps = []
|
|
|
- self.centroid = (0, 0)
|
|
|
+ # Put centroid outside screen to prevent misinterpretation
|
|
|
+ self.centroid = (-1., -1.)
|
|
|
|
|
|
- def update_centroid(self):
|
|
|
+ def update_centroid(self, moving=None):
|
|
|
self.old_centroid = self.centroid
|
|
|
- l = len(self.points)
|
|
|
|
|
|
- if not l:
|
|
|
- self.centroid = (0, 0)
|
|
|
+ if not len(self.points):
|
|
|
+ self.centroid = (-1., -1.)
|
|
|
return
|
|
|
|
|
|
- cx, cy = zip(*[(p.x, p.y) for p in self.points])
|
|
|
- self.centroid = (reduce(add, cx, 0) / l, reduce(add, cy, 0) / l)
|
|
|
-
|
|
|
- def analyze(self):
|
|
|
- self.detect_taps()
|
|
|
+ #use = filter(TouchPoint.is_stationary, self.points)
|
|
|
+ use = filter(lambda p: p != moving, self.points)
|
|
|
|
|
|
- if self.points_changed:
|
|
|
- self.update_centroid()
|
|
|
+ if not use:
|
|
|
+ use = self.points
|
|
|
|
|
|
- # Do not try to rotate or pinch while panning
|
|
|
- # This gets rid of a lot of jittery events
|
|
|
- if not self.detect_pan():
|
|
|
- self.detect_rotation_and_pinch()
|
|
|
-
|
|
|
- self.points_changed = False
|
|
|
-
|
|
|
- def detect_taps(self):
|
|
|
- if len(self.taps) == 2:
|
|
|
- if distance(*self.taps) > DOUBLE_TAP_DISTANCE_THRESHOLD:
|
|
|
- # Taps are too far away too be a double tap, add 2 separate
|
|
|
- # events
|
|
|
- self.trigger(TapEvent(*self.taps[0]))
|
|
|
- self.trigger(TapEvent(*self.taps[1]))
|
|
|
- else:
|
|
|
- # Distance is within treshold, trigger a 'double tap' event
|
|
|
- self.trigger(TapEvent(*self.taps[0], double=True))
|
|
|
-
|
|
|
- self.taps = []
|
|
|
- elif len(self.taps) == 1:
|
|
|
- # FIXME: Ignore successive single- and double taps?
|
|
|
- if time.time() - self.last_tap > TAP_TIMEOUT:
|
|
|
- self.trigger(TapEvent(*self.taps[0]))
|
|
|
- self.taps = []
|
|
|
+ l = len(use)
|
|
|
+ cx, cy = zip(*[(p.x, p.y) for p in use])
|
|
|
+ self.centroid = (reduce(add, cx, 0) / l, reduce(add, cy, 0) / l)
|
|
|
|
|
|
def detect_rotation_and_pinch(self):
|
|
|
"""
|
|
|
@@ -189,25 +214,6 @@ class MultiTouchListener(Logger):
|
|
|
|
|
|
self.trigger(PanEvent(cx, cy, dx, dy, l))
|
|
|
|
|
|
- def point_down(self, sid, x, y):
|
|
|
- if self.find_point(sid):
|
|
|
- raise ValueError('Point with session id "%s" already exists.' % sid)
|
|
|
-
|
|
|
- p = TouchPoint(sid, x, y)
|
|
|
- self.points.append(p)
|
|
|
- self.update_centroid()
|
|
|
-
|
|
|
- # Detect multi-point gestures
|
|
|
- if len(self.points) > 1:
|
|
|
- p.init_gesture_data(*self.centroid)
|
|
|
-
|
|
|
- if len(self.points) == 2:
|
|
|
- self.points[0].init_gesture_data(*self.centroid)
|
|
|
-
|
|
|
- self.taps_down.append(p)
|
|
|
- self.last_tap = time.time()
|
|
|
- self.points_changed = True
|
|
|
-
|
|
|
def find_point(self, sid, index=False):
|
|
|
for i, p in enumerate(self.points):
|
|
|
if p.sid == sid:
|
|
|
@@ -216,55 +222,82 @@ class MultiTouchListener(Logger):
|
|
|
if index:
|
|
|
return -1, None
|
|
|
|
|
|
+ def point_down(self, sid, x, y):
|
|
|
+ if self.find_point(sid):
|
|
|
+ raise ValueError('Point with session id "%d" already exists.' % sid)
|
|
|
+
|
|
|
+ p = TouchPoint(sid, x, y)
|
|
|
+ self.points.append(p)
|
|
|
+ self.update_centroid()
|
|
|
+ self.trigger(DownEvent(p))
|
|
|
+
|
|
|
def point_up(self, sid):
|
|
|
i, p = self.find_point(sid, index=True)
|
|
|
|
|
|
if not p:
|
|
|
- raise KeyError('No point with session id "%s".' % sid)
|
|
|
+ raise KeyError('No point with session id "%d".' % sid)
|
|
|
|
|
|
del self.points[i]
|
|
|
+ self.update_centroid()
|
|
|
+ self.trigger(UpEvent(p))
|
|
|
+
|
|
|
+ if p.is_tap():
|
|
|
+ # Always trigger a regular tap event, also in case of double tap
|
|
|
+ # (use the 'single_tap' event to keep single/double apart from
|
|
|
+ # eachother)
|
|
|
+ self.trigger(TapEvent(p.x, p.y))
|
|
|
+
|
|
|
+ # Detect double tap by comparing time and distance from last tap
|
|
|
+ # event
|
|
|
+ t = time.time()
|
|
|
|
|
|
- # Tap/flick detection
|
|
|
- if p in self.taps_down:
|
|
|
- # Detect if Flick based on movement
|
|
|
- if p.distance_to_prev() > FLICK_VELOCITY_TRESHOLD:
|
|
|
- self.trigger(FlickEvent(p.px, p.py, (p.dx(), p.dy())))
|
|
|
+ if t - self.last_tap_time < TAP_TIMEOUT \
|
|
|
+ and p.distance_to(*self.last_tap.xy) < DOUBLE_TAP_DISTANCE:
|
|
|
+ self.trigger(DoubleTapEvent(p.x, p.y))
|
|
|
else:
|
|
|
- if time.time() - self.last_tap < TAP_INTERVAL:
|
|
|
- # Move from taps_down to taps for us in tap detection
|
|
|
- self.taps_down.remove(p)
|
|
|
- self.taps.append((p.x, p.y))
|
|
|
+ self.last_tap = p
|
|
|
+ self.last_tap_time = t
|
|
|
|
|
|
- self.points_changed = True
|
|
|
+ # TODO: Detect flick
|
|
|
+ #elif p.is_flick():
|
|
|
+ # self.trigger(FlickEvent(p.x, p.y))
|
|
|
|
|
|
def point_move(self, sid, x, y):
|
|
|
- self.find_point(sid).update(x, y)
|
|
|
- self.points_changed = True
|
|
|
+ p = self.find_point(sid)
|
|
|
+
|
|
|
+ # Optimization: only update if the point has moved far enough
|
|
|
+ if p.distance_to(x, y) > DIST_THRESHOLD:
|
|
|
+ p.update(x, y)
|
|
|
+ self.update_centroid(moving=p)
|
|
|
+ self.trigger(MoveEvent(p))
|
|
|
+
|
|
|
+ # TODO: Detect pan
|
|
|
+
|
|
|
+ def stop(self):
|
|
|
+ self.log('Stopping event loop')
|
|
|
|
|
|
- def _tuio_server(self):
|
|
|
- server = TuioServer2D(self, verbose=self.tuio_verbose)
|
|
|
- server.start()
|
|
|
+ if self.thread:
|
|
|
+ self.thread.join()
|
|
|
+ self.thread = False
|
|
|
+ else:
|
|
|
+ self.server.stop()
|
|
|
|
|
|
- def start(self):
|
|
|
+ def start(self, threaded=False):
|
|
|
"""
|
|
|
Start event loop.
|
|
|
"""
|
|
|
- self.log('Starting event loop')
|
|
|
+ if threaded:
|
|
|
+ self.thread = Thread(target=self.start, kwargs={'threaded': False})
|
|
|
+ self.thread.daemon = True
|
|
|
+ self.thread.start()
|
|
|
+ return
|
|
|
|
|
|
try:
|
|
|
- # Run TUIO message listener in a different thread
|
|
|
- #thread = Thread(target=self.__class__._tuio_server, args=(self))
|
|
|
- thread = Thread(target=self._tuio_server)
|
|
|
- thread.daemon = True
|
|
|
- thread.start()
|
|
|
-
|
|
|
- interval = 1. / self.update_rate
|
|
|
-
|
|
|
- while True:
|
|
|
- self.analyze()
|
|
|
- time.sleep(interval)
|
|
|
+ self.log('Starting event loop')
|
|
|
+ self.server = TuioServer2D(self, verbose=self.tuio_verbose)
|
|
|
+ self.server.start()
|
|
|
except KeyboardInterrupt:
|
|
|
- self.log('Stopping event loop')
|
|
|
+ self.stop()
|
|
|
|
|
|
def bind(self, gesture, handler):
|
|
|
if gesture not in SUPPORTED_GESTURES:
|
|
|
@@ -276,18 +309,26 @@ class MultiTouchListener(Logger):
|
|
|
self.handlers[gesture].append(handler)
|
|
|
|
|
|
def trigger(self, event):
|
|
|
- if event.gesture in self.handlers:
|
|
|
- h = self.handlers[event.gesture]
|
|
|
- self.log('Event triggered: "%s" (%d handlers)' % (event, len(h)))
|
|
|
+ if event.__class__._name in self.handlers:
|
|
|
+ h = self.handlers[event.__class__._name]
|
|
|
+ self.log('Event triggered: "%s" (%d handlers)' % (event, len(h)),
|
|
|
+ 1 + int(isinstance(event, BasicEvent)))
|
|
|
|
|
|
for handler in h:
|
|
|
handler(event)
|
|
|
|
|
|
+ def centroid_movement(self):
|
|
|
+ cx, cy = self.centroid
|
|
|
+ ocx, ocy = self.old_centroid
|
|
|
+
|
|
|
+ return cx - ocx, cy - ocy
|
|
|
+
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
def tap(event):
|
|
|
print 'tap:', event
|
|
|
|
|
|
- loop = MultiTouchListener(verbose=2, tuio_verbose=1)
|
|
|
+ loop = MultiTouchListener(verbose=1, tuio_verbose=0)
|
|
|
loop.bind('tap', tap)
|
|
|
+ loop.bind('double_tap', tap)
|
|
|
loop.start()
|