#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""
| @purpose: Eyetracking classification Module.
| @date: Created on Sat May 1 15:12:38 2019
| @author: Semeon Risom
| @email: semeon.risom@gmail.com
| @url: https://semeon.io/d/R33-analysis
"""
# public API: names exported by `from <module> import *`
__all__ = ['ObservationModel','gaze_observation_model','gaze_transition_model','safelog','viterbi','forward_backward',
'dataset_features','transition_estimates','reestimate_observations_baum_welch','reestimate_observations_viterbi_robust',
'segment_features','classify_segments','classify_gaze']
# third-party packages this module needs at runtime (nslr is imported lazily)
__required__ = ['numpy','scipy','nslr','pandas']
# core
from pdb import set_trace as breakpoint
import numpy as np
import scipy.stats
import itertools
import pandas as pd
# local libraries
from .settings import Settings as settings
# Oculomotor event class codes used throughout this module.
FIXATION = 1
SACCADE = 2
PSO = 3  # post-saccadic oscillation
SMOOTH_PURSUIT = 4
class ObservationModel:
    """Per-class likelihood model over segment feature vectors.

    Wraps a mapping of class label -> frozen distribution and exposes
    likelihood evaluation and maximum-likelihood classification. Class
    labels are kept in insertion order, so column ``i`` of the
    likelihood arrays corresponds to ``idxclass[i]``.
    """

    def __init__(self, dists):
        """Build the model from a ``{class_label: distribution}`` mapping.

        Each distribution must expose a ``pdf`` method (e.g. a frozen
        ``scipy.stats`` distribution).
        """
        self.classidx = {}  # class label -> column index
        self.idxclass = []  # column index -> class label
        self.dists = []     # distributions, in column order
        for i, (cls, dist) in enumerate(dists.items()):
            self.idxclass.append(cls)
            self.classidx[cls] = i
            self.dists.append(dist)
        self.idxclass = np.array(self.idxclass)

    def liks(self, d):
        """Return likelihoods of ``d`` under every class distribution.

        Transposed so rows are observations and columns are classes
        (column order matches ``idxclass``).
        """
        scores = [dist.pdf(d) for dist in self.dists]
        return np.array(scores).T

    def classify(self, d):
        """Return the index of the most likely class for each observation."""
        return np.argmax(self.liks(d), axis=1)

    def dist(self, cls):
        """Return the distribution associated with class label ``cls``."""
        return self.dists[self.classidx[cls]]
def gaze_observation_model():
    """Build the default observation model for gaze segment features.

    Returns an :class:`ObservationModel` with one bivariate normal per
    event class over the (log-speed, direction-change) features produced
    by ``segment_features``. The (mean, covariance) parameters were
    estimated from the data of doi:10.3758/s13428-016-0738-9.
    """
    params = {
        FIXATION: [[0.6039844795867605, -0.7788440631929878], [[0.1651734722683456, 0.0], [0.0, 1.5875256060544993]]],
        SACCADE: [[2.3259276064858194, 1.1333265634427712], [[0.080879690559802, 0.0], [0.0, 2.0718979621084372]]],
        PSO: [[1.7511546389160744, -1.817487032170937], [[0.0752678429860497, 0.0], [0.0, 1.356411391040218]]],
        SMOOTH_PURSUIT: [[0.8175021916433242, 0.3047120126632254], [[0.13334607025750783, 0.0], [0.0, 2.5328705587328173]]]
    }
    dists = {
        cls: scipy.stats.multivariate_normal(m, c)
        for cls, (m, c) in params.items()
    }
    return ObservationModel(dists)
def gaze_transition_model():
    """Build the default state-transition matrix for the gaze HMM.

    Starts from uniform transitions over the four event classes
    (row/column order: fixation, saccade, PSO, smooth pursuit), zeroes
    out impossible transitions, halves the weight of the rarer
    fixation<->smooth-pursuit transitions, and normalizes each row so it
    sums to one.
    """
    transitions = np.ones((4, 4))
    transitions[0, 2] = 0    # fixation cannot be followed directly by PSO
    transitions[2, 1] = 0    # PSO cannot be followed directly by a saccade
    transitions[3, 2] = 0    # smooth pursuit cannot be followed directly by PSO
    transitions[3, 0] = 0.5  # pursuit <-> fixation down-weighted
    transitions[0, 3] = 0.5
    for i in range(len(transitions)):
        transitions[i] /= np.sum(transitions[i])
    return transitions
# Module-level default models, built once at import time; used as default
# arguments by the classification and re-estimation functions below.
GazeObservationModel = gaze_observation_model()
GazeTransitionModel = gaze_transition_model()
def safelog(x):
    """Return the base-10 log of ``x``, with values clipped to >= 1e-6.

    Clipping avoids ``-inf``/NaN from zero (or negative) probabilities
    when log-likelihoods are summed, e.g. in :func:`viterbi`.
    """
    return np.log10(np.clip(x, 1e-6, None))
def viterbi(initial_probs, transition_probs, emissions):
    """Most likely hidden-state path for a sequence of emission likelihoods.

    Parameters
    ----------
    initial_probs : array of shape (n_states,)
        Prior state probabilities for the first observation.
    transition_probs : array of shape (n_states, n_states)
        Row-stochastic state transition matrix.
    emissions : iterable of arrays of shape (n_states,)
        Per-observation state likelihoods.

    Returns
    -------
    list
        The most likely state index for every observation.
    """
    n_states = len(initial_probs)
    emissions = iter(emissions)
    emission = next(emissions)
    transition_probs = safelog(transition_probs)
    # Running log-score of the best path ending in each state.
    probs = safelog(emission) + safelog(initial_probs)
    state_stack = []
    for emission in emissions:
        # Normalize to keep magnitudes bounded; the argmax is unaffected.
        # A new array is created so the caller's emission arrays are not
        # mutated (the original divided in place).
        emission = emission / np.sum(emission)
        # probs broadcasts down columns: trans_probs[s0, s1] scores being in
        # s0 and moving to s1. (np.row_stack was removed in NumPy 2.0; an
        # explicit column reshape is equivalent here.)
        trans_probs = transition_probs + np.reshape(probs, (-1, 1))
        most_likely_states = np.argmax(trans_probs, axis=0)
        probs = safelog(emission) + trans_probs[most_likely_states, np.arange(n_states)]
        state_stack.append(most_likely_states)
    # Backtrack from the best final state.
    state_seq = [np.argmax(probs)]
    while state_stack:
        most_likely_states = state_stack.pop()
        state_seq.append(most_likely_states[state_seq[-1]])
    state_seq.reverse()
    return state_seq
def forward_backward(transition_probs, observations, initial_probs=None):
    """Posterior state probabilities via the forward-backward algorithm.

    Parameters
    ----------
    transition_probs : array of shape (N, N)
        Row-stochastic state transition matrix.
    observations : iterable of length-N arrays
        Per-observation state likelihoods.
    initial_probs : array of shape (N,), optional
        Prior over the initial state; uniform if None.

    Returns
    -------
    (state_probs, forward_probs, backward_probs)
        Each of shape (T, N); ``state_probs`` rows are normalized
        posteriors for each time step.
    """
    observations = np.array(list(observations))
    N = len(transition_probs)
    T = len(observations)
    if initial_probs is None:
        initial_probs = np.ones(N)
        initial_probs /= np.sum(initial_probs)
    forward_probs = np.zeros((T, N))
    backward_probs = forward_probs.copy()
    # Forward pass; renormalized each step for numerical stability.
    # NOTE(review): the prior is propagated through one transition before the
    # first observation is applied — confirm this is the intended convention.
    probs = initial_probs
    for i in range(T):
        probs = np.dot(probs, transition_probs)*observations[i]
        probs /= np.sum(probs)
        forward_probs[i] = probs
    # Backward pass, run in reverse from a uniform terminal distribution.
    probs = np.ones(N)
    probs /= np.sum(probs)
    for i in range(T-1, -1, -1):
        probs = np.dot(transition_probs, (probs*observations[i]).T)
        probs /= np.sum(probs)
        backward_probs[i] = probs
    # Combine both passes and renormalize per time step.
    state_probs = forward_probs*backward_probs
    state_probs /= np.sum(state_probs, axis=1).reshape(-1, 1)
    return state_probs, forward_probs, backward_probs
def dataset_features(data, **nslrargs):
    """Segment every session in ``data`` and extract its segment features.

    Parameters
    ----------
    data : iterable of (ts, xs, outliers) tuples
        One tuple per recording session.
    **nslrargs
        Forwarded to ``nslr.fit_gaze``.

    Returns
    -------
    list
        One list of (log-speed, direction-change) feature tuples per
        session, as produced by :func:`segment_features` (segments that
        contain outlier samples are skipped there).
    """
    # Imported lazily, matching classify_gaze; nslr is never imported at
    # module level, so referencing it unqualified here was a NameError.
    import nslr
    segments = ((nslr.fit_gaze(ts, xs, **nslrargs), outliers) for ts, xs, outliers in data)
    features = [list(segment_features(s.segments, o)) for s, o in segments]
    return features
def transition_estimates(obs, trans, forward, backward):
    """Per-step expected transition scores, shape (T, N, N).

    For each time step ``i`` and state pair ``(start, end)`` the score is
    the forward probability of ``start`` times the backward probability of
    ``end`` at the next step (uniform at the last step) times the
    transition probability.
    """
    T = len(obs)
    N = len(trans)
    uniform_tail = np.full(N, 1 / N)  # stands in for backward[T]
    ests = np.zeros((T, N, N))
    for i in range(T):
        future = backward[i + 1] if i < T - 1 else uniform_tail
        # Outer product of forward/future, weighted by the transitions.
        ests[i] = forward[i][:, None] * future[None, :] * trans
    return ests
def reestimate_observations_baum_welch(sessions,
                                       transition_probs=GazeTransitionModel,
                                       observation_model=GazeObservationModel,
                                       initial_probs=None,
                                       estimate_observation_model=True,
                                       estimate_transition_model=True,
                                       n_iterations=30,
                                       plot_process=False):
    """Refit the observation and/or transition models with Baum-Welch EM.

    Parameters
    ----------
    sessions : list of per-session feature arrays (rows are segments,
        columns match the observation model's feature space).
    transition_probs : initial row-stochastic transition matrix.
    observation_model : initial :class:`ObservationModel`.
    initial_probs : optional prior over initial states (None = uniform,
        handled inside :func:`forward_backward`).
    estimate_observation_model, estimate_transition_model : toggles for
        which model(s) are re-estimated each iteration.
    n_iterations : fixed number of EM iterations to run.
    plot_process : if True, scatter-plot the current soft assignments and
        class means each iteration.

    Returns
    -------
    (transition_probs, observation_model) after the final iteration.
    """
    all_observations = np.vstack(sessions)
    if plot_process:
        import matplotlib
        # NOTE(review): 'Agg' is a non-interactive backend, so plt.pause
        # below will not actually display anything — confirm intent.
        matplotlib.use('Agg')
        import matplotlib.pyplot as plt
        # Color per class label; presumably keys 5, 6 and 22 exist for
        # other labelling schemes — only 1..4 are produced here.
        CLASS_COLORS = {
            1: 'b',
            2: 'r',
            3: 'y',
            4: 'g',
            5: 'm',
            6: 'c',
            22: 'orange'
        }
    for iteration in range(n_iterations):  # Should probably have a nicer stopping criterion
        all_state_probs = []
        all_transition_probs = []
        # Compute state and transition probabilities
        # for all segments using the forward-backward algorithm
        for features in sessions:
            liks = np.array([observation_model.liks(f) for f in features])
            probs, forward, backward = forward_backward(transition_probs, liks, initial_probs)
            all_state_probs.extend(probs)
            all_transition_probs.append(transition_estimates(liks, transition_probs, forward, backward))
        all_state_probs = np.array(all_state_probs)
        all_transition_probs = np.vstack(all_transition_probs)
        if plot_process:
            # Hard-assign each segment to its most probable class for display.
            winner = np.argmax(all_state_probs, axis=1)
            for cls in np.unique(winner):
                my = winner == cls
                # winner holds 0-based column indices; +1 maps to class labels.
                plt.plot(all_observations[my,0], all_observations[my,1], '.', alpha=0.1, color=CLASS_COLORS[cls+1])
        dists = {}
        # Estimate the observation model using
        # mean and covariance of the observations weighted
        # by probability of a segment belonging to a given
        # class.
        for i, cls in enumerate(observation_model.idxclass):
            w = all_state_probs[:,i]
            wsum = np.sum(w)
            # In-place normalization (np.average/np.cov renormalize anyway).
            w /= wsum
            mean = np.average(all_observations, weights=w, axis=0)
            cov = np.cov(all_observations, aweights=w, rowvar=False)
            if plot_process:
                plt.plot(mean[0], mean[1], 'o', color=CLASS_COLORS[cls])
            dists[cls] = scipy.stats.multivariate_normal(mean, cov)
        if plot_process:
            plt.pause(0.1)
            plt.cla()
        if estimate_transition_model:
            # Take a mean of all sessions. This may be unoptimal.
            transition_probs = np.mean(all_transition_probs, axis=0)
            transition_probs /= np.sum(transition_probs, axis=1).reshape(-1, 1)
        if estimate_observation_model:
            observation_model=ObservationModel(dists)
    return transition_probs, observation_model
def reestimate_observations_viterbi_robust(
        sessions,
        transition_probs=GazeTransitionModel,
        observation_model=GazeObservationModel,
        initial_probs=None,
        estimate_observation_model=True,
        estimate_transition_model=True,
        n_iterations=30,
        plot_process=False):
    """Refit models by iterating hard Viterbi assignments and robust fits.

    A "hard EM" alternative to :func:`reestimate_observations_baum_welch`:
    each iteration decodes every session with :func:`viterbi`, counts the
    observed state transitions, and refits each class distribution with a
    robust (minimum covariance determinant) estimator.

    Parameters
    ----------
    sessions : list of per-session feature arrays.
    transition_probs : initial row-stochastic transition matrix.
    observation_model : initial :class:`ObservationModel`.
    initial_probs : optional prior over initial states (None = uniform).
    estimate_observation_model, estimate_transition_model : toggles for
        which model(s) are re-estimated each iteration.
    n_iterations : fixed number of iterations to run.
    plot_process : if True, scatter-plot current assignments each iteration.

    Returns
    -------
    (transition_probs, observation_model) after the final iteration.
    """
    from sklearn.covariance import MinCovDet
    all_observations = np.vstack(sessions)
    if plot_process:
        import matplotlib.pyplot as plt
        # Color per class label; only labels 1..4 are produced here.
        CLASS_COLORS = {
            1: 'b',
            2: 'r',
            3: 'y',
            4: 'g',
            5: 'm',
            6: 'c',
            22: 'orange'
        }
    N = len(transition_probs)
    if initial_probs is None:
        initial_probs = np.ones(N)
        initial_probs /= np.sum(initial_probs)
    for iteration in range(n_iterations):
        all_states = []
        all_transitions = np.zeros((N, N))
        for features in sessions:
            liks = np.array([observation_model.liks(f) for f in features])
            states = viterbi(initial_probs, transition_probs, liks)
            # Count the hard transitions observed in this session.
            for i in range(len(states) - 1):
                all_transitions[states[i], states[i+1]] += 1
            all_states.extend(states)
        all_states = np.array(all_states)
        if plot_process:
            # BUG FIX: iterate the pooled assignments; the original used
            # `states` (the last session only) to pick classes to draw,
            # while the mask below already uses all_states.
            for cls in np.unique(all_states):
                my = all_states == cls
                # cls is a 0-based state index; +1 maps to class labels.
                plt.plot(all_observations[my,0], all_observations[my,1], '.', alpha=0.1, color=CLASS_COLORS[cls+1])
        dists = {}
        for i, cls in enumerate(observation_model.idxclass):
            my = all_states == i
            # Don't reestimate if such class is not found
            if np.sum(my) < 2:
                # BUG FIX: dists is a 0-based list indexed by position i;
                # the original indexed it with the 1-based class label cls
                # (IndexError for label 4, wrong distribution otherwise).
                dists[cls] = observation_model.dists[i]
                continue
            # Robust location/scatter estimate, insensitive to outliers.
            robust = MinCovDet().fit(all_observations[my])
            mean = robust.location_
            cov = robust.covariance_
            dists[cls] = scipy.stats.multivariate_normal(mean, cov)
            if plot_process:
                plt.plot(mean[0], mean[1], 'o', color=CLASS_COLORS[cls])
        if plot_process:
            plt.pause(0.1)
            plt.cla()
        if estimate_transition_model:
            new_transition_probs = all_transitions
            totals = np.sum(new_transition_probs, axis=1).reshape(-1, 1)
            # Rows with zero total produce NaNs here; they are patched below,
            # so just silence the expected divide-by-zero warnings.
            with np.errstate(invalid='ignore', divide='ignore'):
                new_transition_probs /= totals
            # Avoid nans in transitions. If the algorithm
            # gets zeros here, the estimate will likely be quite bad
            not_seen = totals.flatten() == 0
            new_transition_probs[not_seen,:] = transition_probs[not_seen,:]
            transition_probs = new_transition_probs
        if estimate_observation_model:
            observation_model = ObservationModel(dists)
    return transition_probs, observation_model
def segment_features(segments, outliers=None):
    """Yield (log10 speed, direction-change) features per NSLR segment.

    Parameters
    ----------
    segments : sequence of NSLR segments exposing ``i`` (start/end sample
        index), ``t`` (start/end time) and ``x`` (start/end position).
    outliers : optional boolean per-sample mask; any segment containing an
        outlier sample is skipped. Defaults to all-False.

    Yields
    ------
    (log_speed, cosangle)
        ``safelog`` of the segment's mean speed, and the Fisher-transformed
        cosine of the angle between this and the previous segment's
        direction (0.0 for the first segment / degenerate directions).
    """
    prev_direction = np.array([0.0, 0.0])
    if outliers is None:
        outliers = np.zeros(segments[-1].i[-1], dtype=bool)
    for segment in segments:
        if np.any(outliers[segment.i[0]:segment.i[1]]):
            continue
        # .item() extracts the scalar; float() on a size-1 ndarray is
        # deprecated since NumPy 1.25.
        duration = np.diff(segment.t).item()
        speed = np.diff(segment.x, axis=0)/duration
        velocity = float(np.linalg.norm(speed))
        direction = speed/velocity
        cosangle = np.dot(direction, prev_direction.T).item()
        # Fisher transform, avoid exact |1|
        cosangle *= (1 - 1e-6)
        cosangle = np.arctanh(cosangle)
        # NaN guard (x != x only for NaN): degenerate angles become 0.
        if cosangle != cosangle:
            cosangle = 0.0
        yield safelog(velocity), cosangle
        prev_direction = direction
def classify_segments(segments,
                      observation_model=GazeObservationModel,
                      transition_model=GazeTransitionModel,
                      initial_probabilities=None):
    """Classify NSLR segments into oculomotor event classes.

    Runs :func:`viterbi` over per-segment feature likelihoods and maps
    the resulting state indices back to class labels (with the default
    model: FIXATION=1, SACCADE=2, PSO=3, SMOOTH_PURSUIT=4).

    Parameters
    ----------
    segments : sequence of NSLR segments (see :func:`segment_features`).
    observation_model : :class:`ObservationModel` providing likelihoods.
    transition_model : row-stochastic transition matrix.
    initial_probabilities : optional prior over states; uniform if None.

    Returns
    -------
    numpy array of class labels, one per (non-skipped) segment.
    """
    if initial_probabilities is None:
        initial_probabilities = np.ones(len(transition_model))
        initial_probabilities /= np.sum(initial_probabilities)
    # Lazily evaluated; viterbi consumes the generator one step at a time.
    observation_likelihoods = (observation_model.liks(f) for f in segment_features(segments))
    path = viterbi(initial_probabilities, transition_model, observation_likelihoods)
    return observation_model.idxclass[path]
def classify_gaze(ts, xs, **kwargs):
    """Segment a gaze recording with NSLR and classify every segment.

    Parameters
    ----------
    ts : array of sample timestamps.
    xs : array of gaze positions, one (x, y) row per sample.
    **kwargs : 'structural_error', 'optimize_noise' and 'split_likelihood'
        are forwarded to ``nslr.fit_gaze``; other keys are ignored.

    Returns
    -------
    (df, sample_classes)
        ``df`` has one row per segment: sample index range, start/end
        time, duration, class label and start/end coordinates;
        ``sample_classes`` assigns each input sample its segment's class.
    """
    fit_params = {k: kwargs[k]
        for k in ('structural_error', 'optimize_noise', 'split_likelihood') if k in kwargs
    }
    # segment data (nslr imported lazily, keeping it an optional dependency)
    import nslr
    segmentation = nslr.fit_gaze(ts, xs, **fit_params)
    # classify segments — 1: fixation, 2: saccade, 3: PSO, 4: smooth pursuit
    # (the previous comment had PSO and smooth pursuit swapped relative to
    # the module constants)
    segment_classes = classify_segments(segmentation.segments)
    # propagate each segment's class to the samples it spans
    sample_classes = np.zeros(len(ts))
    for c, s in zip(segment_classes, segmentation.segments):
        start = s.i[0]
        end = s.i[1]
        sample_classes[start:end] = c
    # prepare dataframe of segmentation data across samples ##0:start, 1:end
    l_seg = []  # list of per-segment rows
    header = ['index start','index end','start time','end time','duration','class','cxy0','cxy1']
    for i, seg in enumerate(segmentation.segments):
        l_seg.append([seg.i[0], seg.i[1],
                      seg.t[0], seg.t[1],
                      seg.t[1] - seg.t[0], segment_classes[i],
                      (seg.x[0][0], seg.x[0][1]),
                      # BUG FIX: the end coordinate previously reused the
                      # start y (seg.x[0][1]) instead of seg.x[1][1]
                      (seg.x[1][0], seg.x[1][1])
                      ])
    df = pd.DataFrame(l_seg, columns=header)
    return df, sample_classes