Source code for opti_ssr.bridges

"""
A python module that provides tools for position and orientation tracking
inside the SoundScape Renderer (SSR) using the OptiTrack optical tracking system.

This module contains the abstract class _Bridge and its subclasses.
_Bridge is a thread class that includes the actual sequence, whereas the subclasses implement
different applications of a connection between the SSR and a tracking system.
In any subclass of _Bridge the functions to receive and send data need to be defined
according to the desired application.
"""

from __future__ import print_function
import sys
import threading
import socket
from time import sleep
from abc import ABCMeta, abstractmethod # for abstract classes and methods
import numpy as np

from .opti_client import Quaternion

[docs]class _Bridge(threading.Thread): """An abstract class which implements a threading approach to receive and send data. To implement the functionality to send and receive the desired data, subclasses need to define the functions _receive and _send. .. note:: The returns of _receive have to be the input of _send. """ # Python2 compatible way to declare an abstract class __metaclass__ = ABCMeta def __init__( self, optitrack, ssr, data_limit=500, timeout=0.01, *args, **kwargs): # call contructor of super class (threading.Thread) super(_Bridge, self).__init__(*args, **kwargs) # event triggered to stop execution self._quit = threading.Event() # interfaces self._optitrack = optitrack self._ssr = ssr # storing older data self._data = [] # the data buffer itself self._data_limit = data_limit # maximum number of entries self._data_lock = threading.Lock() # mutex to block access self._data_available = threading.Event() # event for new data # timeout self._timeout = timeout # timeout in seconds
[docs] def get_last_data(self, num=None): """Returns a list of data received from the OptiTrack system.""" if not num: num = self._data_limit return self._data[-num:]
[docs] def clear_data(self): """Clears buffer""" if self._data_available.is_set(): with self._data_lock: # lock the mutex self._data = [] self._data_available.clear()
[docs] def run(self): while not self._quit.is_set(): try: packet = self._receive() except socket.error: # thrown if not packet has arrived sleep(self._timeout) except (KeyboardInterrupt, SystemExit): self._quit.set() else: # save data with self._data_lock: self._data.append(packet) self._data = self._data[-self._data_limit:] self._data_available.set() # send data self._send(packet)
def stop(self): self._quit.set() # fire event to stop execution @abstractmethod def _receive(self): return @abstractmethod def _send(self, packet): return
[docs]class HeadTracker(_Bridge): """ A class for using the OptiTrack system as a head tracker for the SSR. Attributes ---------- optitrack : class object Object of class OptiTrackClient. ssr : class object Object of class SSRClient. rb_id : int, optional ID of the rigid body to receive data from. angle : int, optional angle which is used for head rotation * +1 - positive yaw * -1 - negative yaw * +2 - positive pitch * -2 - negative pitch * +3 - positive roll * -3 - negative roll """ def __init__(self, optitrack, ssr, rb_id=0, angle=1, *args, **kwargs): # call contructor of super class (_Bridge) super(HeadTracker, self).__init__(optitrack, ssr, *args, **kwargs) # selects which rigid body from OptiTrack is the head tracker self._rb_id = rb_id self._angle = angle # origin and orientation of world coordinate system self._origin = np.array((0, 0, 0)) self._orientation = Quaternion(1, 0, 0, 0)
[docs] def calibrate(self): """ Use current position and orientation of head tracker to set the origin and orientation of the world coordinate system. """ self._origin, self._orientation, _ = self._optitrack.get_rigid_body(self._rb_id)
def _receive(self): self._ssr.recv_ssr_returns() pos, ori, time_data = self._optitrack.get_rigid_body(self._rb_id) # apply coordinate transform pos = pos - self._origin ori = self._orientation.conjugate * ori # not commutative return pos, ori.yaw_pitch_roll, time_data def _send(self, data): _, ypr, _ = data # (pos, ypr, time_data) alpha = np.sign(self._angle)*ypr[np.absolute(self._angle)-1]*180/np.pi+90 self._ssr.set_ref_orientation(alpha)
[docs]class LocalWFS(_Bridge): """ A class for using the OptiTrack system to track the listener position in the SSR for local sound field synthesis. The first SSR instance (ssr) shifts the reference position and the reference offset position of the real reproduction setup, such that it emulates the movement a virtual source array placed around the listener. The second SSR instance (ssr_virt_repr) shifts the reference position of aforementioned point sources as the virtual reproduction setup in relation to the real sources based on audio files. Attributes ---------- optitrack : class object Object of the class OptiTrackClient. ssr : class object First SSR instance as object of class SSRClient. ssr_virt_repr : class object Second SSR instance as object of the class SSRClient. rb_id : int, optional ID of the rigid body to receive data from. """ def __init__(self, optitrack, ssr, ssr_virt_repr, rb_id=0, *args, **kwargs): # call contructor of super class super(LocalWFS, self).__init__(optitrack, ssr, *args, **kwargs) # selects which rigid body from OptiTrack is the tracker self._rb_id = rb_id # second ssr instance feat. virtual sources as reproduction setup self._ssr_virt_repr = ssr_virt_repr # reference orientation of this instance has to be changed once # in order to compensate for the SSRs standard reference orientation shift self._ssr_virt_repr.set_ref_orientation(0) # self._create_virtual_sources()
[docs] def _create_virtual_sources(self): """Create a specified amount of new sources via network connection to the SSR. """ for src_id in range(1, self._N+1): self._ssr.src_creation(src_id)
[docs] def _receive(self): """Get position data of rigid body from OptiTrack system Returns ------- center : list Rigid body position data. Consists of x, y, z coordinates of Motive`s coordinate system. """ self._ssr.recv_ssr_returns() self._ssr_virt_repr.recv_ssr_returns() center, _, _ = self._optitrack.get_rigid_body() return center
[docs] def _send(self, center): """Send reference position data to both SSR instance. """ self._ssr.set_ref_position(-center[0], -center[1]) self._ssr.set_ref_offset_position(center[0], center[1]) self._ssr_virt_repr.set_ref_position(center[0], center[1])