import numpy as np
from scipy.ndimage import gaussian_filter1d
from ..stats import discriminability
from ..utils import _parse_outstruct_args
[docs]
def get_label_change_points(x):
'''
Find the indices where x changes from one categorical (integer) value to another.
For example, if x is [0, 0, 0, 1, 1, 3, 3], locations returned will be [3, 5]
Parameters
----------
x : np.ndarray of shape (time,)
Label vector over time.
Returns
-------
locs : array of shape (n_changes, )
Indices where labels change.
labels : array of shape (n_changes, )
New label (from input `x`) after each transition.
prior_labels : array of shape (n_changes, )
Old label (from input `x`) before each transition.
'''
if not isinstance(x, np.ndarray):
assert TypeError(f'input must be numpy array but got {type(x)}')
diff_x = np.concatenate([np.array([0]), np.diff(x)])
locs = np.argwhere(diff_x!=0).squeeze()
labels = x[locs]
labels_prior = x[locs-1]
return locs, labels, labels_prior
[docs]
def segment_around_label_transitions(data=None, field=None, labels=None, prechange_samples=50, postchange_samples=300, elec_lag=None):
'''
Cut x around the transition points given by the changes in the labels.
Parameters
----------
data : Data, optional
Data object containing data to be normalized in one of the field. If not given, must give
the data to be normalized directly as the ``data`` argument.
field : list or array-like, or a string specifying a field of the Data object
If a string, must specify a field of the Data object passed to the ``data`` parameter.
Field to be segmented based on the labels. field[i] is shape (time, electrode, *, ...)
labels : string, or list of np.ndarrays or array-like with the same length as field, or a tuple of lists or array-likes each the same length as field
If a string, specifies a field of the oustruct containing labels to use for
segmenting. If a single list or np.ndarray (not a tuple), then labels[i]
is shape (time, ) giving integer label of each trial.
If a tuple, then labels[0][i] is shape (time, ) giving integer label of each sample, and the
transition points of these labels are used to segment. The other lists of labels (like labels[1])
are not used for segmentation but their values surrounding the transition point are returned in a tuple of labels
prechange_samples : int, default=50
Number of samples to include in each segment that come before a label transition.
postchange_samples : int, default=300
Number of samples to include in each segment that come after a label transition.
elec_lag : list or array-like, default=None
Provides the lag (in samples) of each electrode, must be length=n_electrodes=data[i].shape[1].
Only used if not None. This can be computed with many methods, for example, using
``nl.segmentation.electrode_lags_fratio``.
Returns
-------
segments : array of shape (n_segments, time, electrode, *, ...)
Segments cut from data surrounding label transition points.
labels : array-like
If input labels is just a list, then is of shape (n_segments,) providing new label after transition point
If input labels is a tuple of lists, then this is a tuple of array-likes, where the first is the
same as described above, and the others are arrays of shape (n_segments, time)
prior_labels : array-like, shape (n_segments,)
Gives label that came before the transition in the main labels array used for segmentation
Examples
--------
>>> from naplib.segmentation import segment_around_label_transitions
>>> # In this example, we imagine we only have 1 trial of data, so all the lists of data and labels
>>> # are length 1. In practice, they could be any length though.
>>> arr = np.arange(20).reshape(10,2) # array of shape (time, features/electrodes)
>>> arr
array([[ 0, 1],
[ 2, 3],
[ 4, 5],
[ 6, 7],
[ 8, 9],
[10, 11],
[12, 13],
[14, 15],
[16, 17],
[18, 19]])
>>> # use a label of categorical values to segment the array based on
>>> # transitions in the categorical label
>>> label = np.array([0,0,1,1,1,0,0,3,3,3])
>>> segments, labels, prior_labels = segment_around_label_transitions(field=[arr], labels=[label],
... prechange_samples=0,
... postchange_samples=3)
>>> segments
array([[[ 4, 5],
[ 6, 7],
[ 8, 9]],
[[10, 11],
[12, 13],
[14, 15]],
[[14, 15],
[16, 17],
[18, 19]]])
>>> labels, prior_labels
(array([1, 0, 3]), array([0, 1, 0]))
>>> # We can also get the full segment value of these labels or another set of
>>> # labels around the transition, rather than just the single values of the
>>> # labels at the transition point, simply by passing other label vectors
>>> # that we want to be segmented as the 2nd through nth value in a tuple of labels
>>> label2 = np.array([0,1,2,3,4,5,6,7,8,9]) # another set of labels of interest
>>> label_tuple = ([label], [label], [label2])
>>> segments, labels, prior_labels = segment_around_label_transitions(field=[arr], labels=label_tuple, prechange_samples=0, postchange_samples=3)
>>> labels # this time we get a tuple of 3 labels. The first is the same as before, and the others are fully segmented
(array([1, 0, 3]),
array([[1, 1, 1],
[0, 0, 3],
[3, 3, 3]]),
array([[2, 3, 4],
[5, 6, 7],
[7, 8, 9]]))
'''
x, labels = _parse_outstruct_args(data, field, labels)
if any(dd is None for dd in x):
raise ValueError(f'None found in field')
if any(lab is None for lab in labels):
raise ValueError(f'None found in labels')
if isinstance(labels, tuple):
labels, other_labels = labels[0], labels[1:]
return_multiple_labels = True
else:
other_labels = (labels,)
return_multiple_labels = False
segments = []
new_labels = []
prior_labels = []
other_labels_to_return = [[] for _ in range(len(other_labels))]
for i, tmp_unpack in enumerate(zip(x, labels, *other_labels)):
x_i, labels_i, other_labels_i = tmp_unpack[0], tmp_unpack[1], tmp_unpack[2:]
label_changepoints, labels_at_changepoints, labels_before_changepoints = get_label_change_points(labels_i)
label_changepoints = label_changepoints.astype('int')
labels_before_changepoints = labels_before_changepoints.astype('int')
for change_point, new_lab, prior_lab in zip(label_changepoints, labels_at_changepoints, labels_before_changepoints):
if change_point >= prechange_samples and change_point+postchange_samples <= x_i.shape[0]:
if elec_lag is not None:
tmp_x_i_region = np.array([x_i[change_point-prechange_samples+elec_lag[elec_idx]:change_point+postchange_samples+elec_lag[elec_idx], elec_idx] for elec_idx in range(len(elec_lag))])
segments.append(tmp_x_i_region.transpose())
else:
segments.append(x_i[change_point-prechange_samples:change_point+postchange_samples])
new_labels.append(new_lab)
prior_labels.append(prior_lab)
for i, other_labs in enumerate(other_labels_i):
other_labels_to_return[i].append(other_labs[change_point-prechange_samples:change_point+postchange_samples])
if return_multiple_labels:
return np.array(segments), (np.array(new_labels), *[np.array(tt) for tt in other_labels_to_return]), np.array(prior_labels)
else:
return np.array(segments), np.array(new_labels), np.array(prior_labels)
[docs]
def electrode_lags_fratio(data=None, field=None, labels=None, max_lag=20, return_fratios=False):
'''
Compute lags of each electrode based on peak of f-ratio to a given label, such as
phoneme labels. The data is segmented around onset transitions in the labels, and
an electrode's lag is defined as the peak of the f-ratio after the transition.
This uses the "lda" method of ``naplib.stats.discriminability``.
Parameters
----------
data : Data, optional
Data containing data to be normalized in one of the field. If not given, must give
the data to be normalized directly as the ``data`` parameter.
field : string, or list or array-like
Electrode data to use to compute lags with the fratio.
If a string, must specify a field of the Data object passed to the ``data`` parameter.
If a list or array-like, field[i] is shape (time, n_electrodes, *, ...)
labels : string, or list or array-like
Labels over time for each trial. The onset changes of these labels will
be used to segment the data and compute the f-ratio after the transition.
If a string, must specify a field of the Data object. If a list or array-like, then
labels[i] is of shape (time,)
max_lag : int, default=20
Maximum lag to look for in the f-ratio (in samples).
return_fratios : bool, default=False
Whether or not to return smoothed_fratios along with lags in a tuple. Default False.
Returns
-------
lags : np.ndarray, shape=(n_electrodes,)
Lag of each electrode, in samples
smoothed_fratios : np.ndarray, shape=(n_electrodes, time)
Fratio of each electrode over time, after gaussian smoothing.
Only returned if ``return_fratios=True``
'''
segments, labels, _ = segment_around_label_transitions(data=data, field=field, labels=labels, prechange_samples=0, postchange_samples=max_lag)
fratios_lags = discriminability(segments.transpose((2,1,0)), labels, elec_mode='individual')
fratios_lags_smooth = gaussian_filter1d(fratios_lags, 0.5, mode='constant')
lags = fratios_lags_smooth.argmax(-1)
if return_fratios:
return lags, fratios_lags_smooth
else:
return lags
[docs]
def shift_label_onsets(data=None, labels='wrd_labels', p=0.5):
"""
From continuous labels, shift them so that they correspond to a
certain fraction of the feature's progress. This is useful if you have something
like word labels, which encode word onsets and offsets, but you want to be able
to segment your data based on word centers (i.e. 50% of the way through the word).
For example, if the labels for one trial are a continuous (discrete)
signal representing word labels over time,
such as [-1,-1,-1,3,3,3,3,4,4,4,-1,-1,5,5,5,5,5] where -1 means no word, and 3, 4,
and 5 indicate different words which are present over those samples, then with
p=0.5, the onsets will be shifted so that their onsets correspond to 50% of
the word's progress. So, the output will be [-1,-1,-1,-1,-1,3,3,-1,4,4,-1,-1,-1,-1,5,5,5].
Parameters
----------
data : Data, optional
Data instance. If provided, then labels should indicate a field of this data.
labels : string, or list of np.ndarrays, default='wrd_labels'
Either a field of the data, or a list of 1D numpy arrays.
p : float (0<=p<1), default=0.5
Initial proportion of the label duration for each category to remove. For word labels, a value
of 0.5 will replace the first 50% of each word with -1, while a value of 0.99999999 will almost certainly
guarantee that only the last sample of each label region remains and the rest are
replaced with -1. 0.5 can be used to get word centers, while 0.99999999 can be used to get word
offsets.
Returns
-------
new_labels : list[np.ndarray]
New labels for each trial.
Examples
--------
>>> from naplib.segmentation import shift_label_onsets
>>> import numpy as np
>>> labs = [np.array([-1,-1,-1,3,3,3,3,4,4,4,-1,-1,5,5,5,5,5])]
>>> shift_label_onsets(labels=labs)
[array([-1, -1, -1, -1, -1, 3, 3, -1, 4, 4, -1, -1, -1, -1, 5, 5, 5])]
"""
if p >= 1 or p < 0:
raise ValueError(f'p must be in the range [0, 1), but got {p}')
labels = _parse_outstruct_args(data, labels, allow_strings_without_outstruct=False)
new_labels = []
for labels_this_trial in labels:
new_labels_this_trial = labels_this_trial.copy().squeeze()
new_labels_this_trial = np.concatenate([np.array([-1]), new_labels_this_trial])
locs, labs, prior_labs = get_label_change_points(new_labels_this_trial)
for i, (loc, lab, prior_lab) in enumerate(zip(locs, labs, prior_labs)):
if lab == -1:
continue
if i == len(locs)-1:
feature_length = len(labels_this_trial) - loc + 1
else:
feature_length = locs[i+1] - loc
remove_samples = int(p*feature_length)
new_labels_this_trial[loc:loc+remove_samples] = -1
new_labels.append(new_labels_this_trial[1:]) # remove extra -1 that we had prepended
return new_labels