Source code for imhr.r33._classify

#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""
| @purpose: Classification of eyetracking data for imhr.r33.processing.   
| @date: Created on Sat May 1 15:12:38 2019   
| @author: Semeon Risom   
| @email: semeon.risom@gmail.com   
| @url: https://semeon.io/d/R33-analysis  
"""

# available functions
__all__ = ['Classify']

# required external library
__required__ = ['numpy','pandas','nslr']

from pdb import set_trace as breakpoint
import numpy as np
import pandas as pd

# local libraries
from .. import settings

class Classify():
	"""Analysis methods for imhr.processing.preprocesing."""

	def __init__(self, isLibrary=False):
		"""Analysis methods for imhr.processing.preprocesing.

		Parameters
		----------
		isLibrary : :obj:`bool`
			Check if required libraries are available. Default `False`.
		"""
		# Optionally verify that the packages listed in __required__ are
		# importable before any classification method is used.
		if isLibrary:
			settings.library(__required__)
[docs] @classmethod def VisualAngle(cls, g_x, g_y, config): """ Convert pixel eye-coordinates to visual angle. Parameters ---------- g_x,g_y : :class:`numpy.ndarray` List of gaze coordinates. drift : :obj:`dict` Counter of drift correct runs. config : :class:`dict` Configuration data for data analysis. i.e. trial number, location. Notes ----- * Stimulus positions (g_x,g_y) are defined in x and y pixel units, with the origin (0,0) being at the **center** of the display, as to match the PsychoPy pix unit coord type. * The pix2deg method is vectorized, meaning that is will perform the pixel to angle calculations on all elements of the provided pixel position numpy arrays in one numpy call. * The convertion process can use either a fixed eye to calibration plane distance, or a numpy array of eye distances passed as eye_distance_mm. In this case the eye distance array must be the same length as g_x, g_y arrays. """ d_x=config['monitor.cm'][0] d_y=config['monitor.cm'][1] r_x=config['resolution.px'][0] r_y=config['resolution.px'][1] dist=config['distance'] mmpp_x = d_x/r_x mmpp_y = d_y/r_y x_mm = mmpp_x * g_x y_mm = mmpp_y * g_y breakpoint() #TODO! Ah = np.arctan2(x_mm, np.hypot(dist, y_mm)) Av = np.arctan2(y_mm, np.hypot(dist, x_mm)) return np.rad2deg(Ah), np.rad2deg(Av)
[docs] @classmethod def Velocity(cls, time, config, d_x, d_y=None): """ Calculate the instantaneous velocity (degrees / second) for data points in d_x and (optionally) d_y, using the time numpy array for time delta information. Parameters ---------- time : :class:`numpy.ndarray` Timestamp of each coordinate. d_x,d_y : :class:`numpy.ndarray` List of gaze coordinates. config : :class:`dict` Configuration data for data analysis. i.e. trial number, location. Notes ----- Numpy arrays time, d_x, and d_y must all be 1D arrays of the same length. If both d_x and d_y are provided, then the euclidian distance between each set of points is calculated and used in the velocity calculation. Time must be in seconds.msec units, while d_x and d_y are expected to be in visual degrees. If the position traces are in pixel coordinate space, use the VisualAngleCalc class to convert the data into degrees. """ if d_y is None: data=d_x else: data=np.sqrt(d_x*d_x+d_y*d_y) velocity_between = (data[1:]-data[:-1])/(time[1:]-time[:-1]) velocity = (velocity_between[1:]+velocity_between[:-1])/2.0 return velocity
[docs] @classmethod def Acceleration(cls, time, data_x, data_y=None): """ Calculate the acceleration (deg/sec/sec) for data points in d_x and (optionally) d_y, using the time numpy array for time delta information. Parameters ---------- time : :class:`numpy.ndarray` Timestamp of each coordinate. d_x,d_y : :class:`numpy.ndarray` List of gaze coordinates. config : :class:`dict` Configuration data for data analysis. i.e. trial number, location. """ velocity=Velocity(time,data_x,data_y) accel = Velocity(time[1:-1],velocity) return accel
[docs] @classmethod def savitzky_golay(cls, y, window_size, order, deriv=0, rate=1): """ Smooth (and optionally differentiate) data with a Savitzky-Golay filter. The Savitzky-Golay filter removes high frequency noise from data. It has the advantage of preserving the original shape and features of the signal better than other types of filtering approaches, such as moving averages techniques. Parameters ---------- y : :class:`numpy.ndarray`, shape (N,) the values of the time history of the signal. window_size : :obj:`int` the length of the window. Must be an odd integer number. order : :obj:`int` the order of the polynomial used in the filtering. Must be less then `window_size` - 1. deriv : :obj:`int` the order of the derivative to compute (default = 0 means only smoothing) Returns ------- ys : :class:`numpy.ndarray`, shape (N) the smoothed signal (or it's n-th derivative). Notes ----- The Savitzky-Golay is a type of low-pass filter, particularly suited for smoothing noisy data. The main idea behind this approach is to make for each point a least-square fit with a polynomial of high order over a odd-sized window centered at the point. For more information, see: http://wiki.scipy.org/Cookbook/SavitzkyGolay. Examples -------- >>> t = np.linspace(-4, 4, 500) >>> y = np.exp( -t**2 ) + np.random.normal(0, 0.05, t.shape) >>> ysg = savitzky_golay(y, window_size=31, order=4) >>> import matplotlib.pyplot as plt >>> plt.plot(t, y, label='Noisy signal') >>> plt.plot(t, np.exp(-t**2), 'k', lw=1.5, label='Original signal') >>> plt.plot(t, ysg, 'r', label='Filtered signal') >>> plt.legend() >>> plt.show() References ---------- .. [1] A. Savitzky, Golay, M. (1964). Smoothing and Differentiation of Data by Simplified Least Squares Procedures. Analytical Chemistry. 36(8), pp 1627-1639. .. [2] S.A. Teukolsky, W.T. Vetterling, B.P. Flannery Numerical Recipes 3rd Edition: The Art of Scientific Computing. W.H. Press,Cambridge University Press ISBN-13: 9780521880688. 
""" import numpy as np from math import factorial try: window_size = np.abs(np.int(window_size)) order = np.abs(np.int(order)) except ValueError: raise ValueError("window_size and order have to be of type int") if window_size % 2 != 1 or window_size < 1: raise TypeError("window_size size must be a positive odd number") if window_size < order + 2: raise TypeError("window_size is too small for the polynomials order") order_range = range(order+1) half_window = (window_size -1) // 2 # precompute coefficients b = np.mat([[k**i for i in order_range] for k in range(-half_window, half_window+1)]) m = np.linalg.pinv(b).A[deriv] * rate**deriv * factorial(deriv) # pad the signal at the extremes with # values taken from the signal itself firstvals = y[0] - np.abs(y[1:half_window+1][::-1] - y[0]) #breakpoint() #TODO! if isinstance(y, pd.core.series.Series): y_m1 = y.iloc[-1] else: y_m1 = y[-1] lastvals = y_m1 + np.abs(y[-half_window-1:-1][::-1] - y_m1) y = np.concatenate((firstvals, y, lastvals)) return np.convolve( m[::-1], y, mode='valid')
	@classmethod
	def ivt(cls, data, v_threshold, config):
		"""
		Identification with Velocity Threshold (I-VT).

		In the I-VT model, the velocity value is computed for every eye
		position sample. The velocity value is then compared to the
		threshold. If the sampled velocity is less than the threshold, the
		corresponding eye-position sample is marked as part of a fixation,
		otherwise it is marked as a part of a saccade.

		Parameters
		----------
		data : :class:`pandas.DataFrame`
			Samples with ``timestamp``, ``x`` and ``y`` columns (accessed
			by label below, so a default RangeIndex is assumed).
		v_threshold : :obj:`float`
			Velocity threshold in pix/sec.
		config : :class:`dict`
			Configuration data for data analysis. i.e. trial number,
			location. (Not referenced by this implementation.)

		Returns
		-------
		cxy_df : :class:`pandas.DataFrame`
			One row per detected fixation with centroid (``cx``, ``cy``)
			and ``start``/``end`` timestamps.

		Notes
		-----
		From https://github.com/ecekt/eyegaze. Formula from:
		https://dl.acm.org/citation.cfm?id=355028
		"""
		l_time = data['timestamp']
		# Timestamps converted to seconds.
		# NOTE(review): `ts` is computed but never used below -- velocity is
		# divided by the raw (millisecond) deltas in `tdif`, so its units are
		# pix/msec rather than the pix/sec the threshold doc implies; confirm
		# the units callers pass for v_threshold.
		ts = []
		for t in l_time:
			ts.append(float(t)/1000.0)
		Xs = data['x']
		Ys = data['y']
		# Per-sample displacement and time deltas between adjacent samples.
		difX = []
		difY = []
		tdif = []
		for i in range(len(data) - 1):
			difX.append(float(Xs[i+1]) - float(Xs[i]))
			difY.append(float(Ys[i+1]) - float(Ys[i]))
			tdif.append(float(l_time[i+1]) - float(l_time[i]))
		# Euclidean displacement, in pixels.
		dif = np.sqrt(np.power(difX,2) + np.power(difY,2))
		velocity = dif / tdif
		# Classify each inter-sample movement: 1 = fixation (below
		# threshold), 0 = saccade. Length is len(data)-1.
		mvmts = []
		for v in velocity:
			if (v < v_threshold):
				#fixation
				mvmts.append(1)
			else:
				mvmts.append(0)
		# Group consecutive fixation samples; each group is a list of
		# movement indices belonging to one fixation.
		fixations = []
		fs = []
		for m in range(len(mvmts)):
			if(mvmts[m] == 0):
				if(len(fs) > 0):
					fixations.append(fs)
					fs = []
			else:
				fs.append(m)
		if(len(fs) > 0):
			fixations.append(fs)
		# Centroid and start/end time of each fixation. A movement index m
		# spans samples m and m+1, hence the +1 lookups below (the +1 is
		# always in range because the last movement index is len(data)-2).
		centroidsX = []
		centroidsY = []
		time0 = []
		time1 = []
		for f in fixations:
			cX = 0
			cY = 0
			if(len(f) == 1):
				# Single movement: centroid is the midpoint of its two samples.
				i = f[0]
				cX = (float(data['x'][i]) + float(data['x'][i+1]))/2.0
				cY = (float(data['y'][i]) + float(data['y'][i+1]))/2.0
				t0 = float(data['timestamp'][i])
				t1 = float(data['timestamp'][i+1])
			else:
				# Multiple movements: average the len(f)+1 samples they span.
				t0 = float(data['timestamp'][f[0]])
				t1 = float(data['timestamp'][f[len(f)-1]+1])
				for e in range(len(f)):
					cX += float(data['x'][f[e]])
					cY += float(data['y'][f[e]])
				cX += float(data['x'][f[len(f)-1]+1])
				cY += float(data['y'][f[len(f)-1]+1])
				cX = cX / float(len(f)+1)
				cY = cY / float(len(f)+1)
			centroidsX.append(cX)
			centroidsY.append(cY)
			time0.append(t0)
			time1.append(t1)
		#create dataframe
		cxy_df = pd.DataFrame(np.column_stack([centroidsX, centroidsY, time0, time1]), columns=['cx', 'cy', 'start','end'])
		return cxy_df
	@classmethod
	def hmm(cls, data, filter_type, config):
		"""
		Hidden Markov Model, adapted from https://gitlab.com/nslr/nslr-hmm.

		Parameters
		----------
		data : :class:`pandas.DataFrame`
			Pandas dataframe of x, y and timestamp positions.
		filter_type : :obj:`str`
			Name of the filter in use; used to label the per-sample
			classification column added to ``data``.
		config : :class:`dict`
			Configuration data for data analysis. i.e. trial number,
			location. (Not referenced by this implementation.)

		Returns
		-------
		data : :class:`pandas.DataFrame`
			Input samples; when at least one fixation was found, a
			``<filter_type>_class`` column with per-sample classes is added.
		class_df : :class:`pandas.DataFrame`
			Fixation events only (class == 1). When at least one fixation
			was found, the ``cxy0``/``cxy1`` tuple columns are split into
			``cx0``/``cy0`` and ``cx1``/``cy1``; otherwise the (empty)
			frame is returned with its original columns.

		Notes
		-----
		**Definitions**

		* **Saccade**: a ballistic, pre-programmed movement; amplitudes of
		  40 deg peak at velocities of 300-600 deg/s and last 80-150 ms.
		* **Fixation**: the point between any two saccades, during which
		  the eyes are relatively stationary and virtually all visual
		  input occurs.
		* **Smooth pursuit**: much slower tracking movements designed to
		  keep a moving stimulus on the fovea.

		References
		----------
		.. [1] Pekkanen, J., & Lappi, O. (2017). A new and general approach
			to signal denoising and eye movement classification based on
			segmented linear regression. Scientific Reports, 7(1).
			doi:10.1038/s41598-017-17983-x.
		"""
		from . import nslr_hmm
		t = data['timestamp'].values
		x = data['x']
		y = data['y']
		# class_df: per-event segments; c_sample: per-sample class labels.
		class_df, c_sample = nslr_hmm.classify_gaze(t, np.vstack((x, y)).T)
		# keep fixation events only (class code 1)
		class_df = class_df[(class_df['class'] == 1)]
		# if the trial has at least one fixation
		if class_df.shape[0]>=1:
			# split cxy0 tuple into cx0 and cy0
			class_df[['cx0', 'cy0']] = class_df['cxy0'].apply(pd.Series)
			# split cxy1 tuple into cx1 and cy1
			class_df[['cx1', 'cy1']] = class_df['cxy1'].apply(pd.Series)
			# drop the original tuple columns
			class_df = class_df.drop(['cxy0','cxy1'], axis=1)
			# merge the per-sample classification back into the samples
			data['%s_class'%(filter_type)] = c_sample
		return data, class_df
[docs] @classmethod def idt(cls,data, dis_threshold, dur_threshold): """ Identification with Dispersion Threshold. Parameters ---------- data : :class:`numpy.ndarray` The smoothed signal (or it's n-th derivative). dr_th : :obj:`str` Fixation duration threshold in pix/msec di_th : :obj:`str` Dispersion threshold in pixels Returns ------- ys : :class:`numpy.ndarray` The smoothed signal (or it's n-th derivative). Notes ----- The I-DT algorithm has two parameters: a dispersion threshold and the length of a time window in which the dispersion is calculated. The length of the time window is often set to the minimum duration of a fixation, which is around 100-200 ms. """ current = 0 #pointer to represent the current beginning point of the window last = 0 #final lists for fixation info centroidsX = [] centroidsY = [] time0 = [] time1 = [] while (current < len(data)): t0 = float(data['timestamp'][current]) #beginning time t1 = t0 + float(dur_threshold) #time after a min. fix. threshold has been observed for r in range(current, len(data)): if(float(data['timestamp'][r])>= t0 and float(data['timestamp'][r])<= t1): #print "if",r last = r #this will find the last index still in the duration threshold #now check the dispersion in this window #print "window", current, last points = data[current:last+1] dispersion = 0 argxmin = np.min(points.loc[:,'x'].astype(np.float)) argxmax = np.max(points.loc[:,'x'].astype(np.float)) argymin = np.min(points.loc[:,'y'].astype(np.float)) argymax = np.max(points.loc[:,'y'].astype(np.float)) dispersion = ((argxmax - argxmin) + (argymax - argymin))/2 if (dispersion <= dis_threshold): #add new points while(dispersion <= dis_threshold and last + 1 < len(data)): last += 1 points = data[current:last+1] dispersion = 0 argxmin = np.min(points.loc[:,'x'].astype(np.float)) argxmax = np.max(points.loc[:,'x'].astype(np.float)) argymin = np.min(points.loc[:,'y'].astype(np.float)) argymax = np.max(points.loc[:,'y'].astype(np.float)) dispersion = ((argxmax - 
argxmin) + (argymax - argymin))/2 #dispersion threshold is exceeded #fixation at the centroid [current,last] cX = 0 cY = 0 for f in range(current, last + 1): cX += float(data['x'][f]) cY += float(data['y'][f]) cX = cX / float(last - current + 1) cY = cY / float(last - current + 1) t0 = float(data['timestamp'][current]) t1 = float(data['timestamp'][last]) centroidsX.append(cX) centroidsY.append(cY) time0.append(t0) time1.append(t1) current = last + 1 #this will move the pointer to a novel window else: current += 1 #this will remove the first point last = current #this is not necessary #create dataframe cxy_df = pd.DataFrame(np.column_stack([centroidsX, centroidsY, time0, time1]), columns=['cx', 'cy', 'start','end']) return cxy_df
[docs] @classmethod def simple(cls, df, missing, maxdist, mindur): """Detects fixations, defined as consecutive samples with an inter-sample distance of less than a set amount of pixels (disregarding missing data). Parameters ---------- df : :class:`pandas.DataFrame` Pandas dataframe of x,y and timestamp positions missing : :obj:`str` Value to be used for missing data (default = 0.0) maxdist : :obj:`str` Maximal inter-sample distance in pixels (default = 25) mindur : :obj:`str` Minimal duration of a fixation in milliseconds; detected fixation cadidates will be disregarded if they are below this duration (default = 100). Returns ------- Sfix : :class:`numpy.ndarray` shape (N) list of lists, each containing [starttime] Efix : :class:`numpy.ndarray` shape (N) list of lists, each containing [starttime, endtime, duration, endx, endy] Notes ----- From https://github.com/esdalmaijer/PyGazeAnalyser/blob/master/pygazeanalyser/detectors.py """ x = df['x'] y = df['y'] time = df['timestamp'] # empty list to contain data Sfix = [] Efix = [] # loop through all coordinates si = 0 fixstart = False for i in range(1,len(x)): # calculate Euclidean distance from the current fixation coordinate # to the next coordinate dist = ((x[si]-x[i])**2 + (y[si]-y[i])**2)**0.5 # check if the next coordinate is below maximal distance if dist <= maxdist and not fixstart: # start a new fixation si = 0 + i fixstart = True Sfix.append([time[i]]) elif dist > maxdist and fixstart: # end the current fixation fixstart = False # only store the fixation if the duration is ok if time[i-1]-Sfix[-1][0] >= mindur: Efix.append([Sfix[-1][0], time[i-1], time[i-1]-Sfix[-1][0], x[si], y[si]]) # delete the last fixation start if it was too short else: Sfix.pop(-1) si = 0 + i elif not fixstart: si += 1 #return Sfix, Efix return Sfix, Efix