import numpy as np
from joblib import Parallel, delayed
from numpy.typing import NDArray
from scipy.fft import fft, ifft
from scipy.signal import resample
from tqdm.auto import tqdm, trange
import logging
from naplib import logger
from naplib.data import Data
from naplib.utils import _parse_outstruct_args
[docs]
def filter_hilbert(x, fs, Wn=[[70,150]], n_jobs=1):
'''
Compute the phase and amplitude (envelope) of a signal over multiple frequency bands,
as in [#edwards]_. This is done using a filter bank of gaussian shaped filters with
center frequencies linearly spaced until 4Hz and then logarithmically spaced. The
Hilbert Transform of each filter's output is computed and the amplitude and phase are
computed from the complex values. Then amplitude and phase are averaged for each
channel over the center frequencies. See [#edwards]_ for details on the filter
bank used.
See Also
--------
filterbank_hilbert
Parameters
----------
x : np.ndarray, shape (time, channels)
Signal to filter. Filtering is performed on each channel independently.
fs : int
Sampling rate.
Wn : list or array-like, shape (n_freq_bands, 2) or (2,), default=[[70, 150]]
Lower and upper boundaries for filterbank center frequencies. The default
of [[70, 150]] extracts the phase and amplitude of the highgamma band.
n_jobs : int, default=1
Number of jobs to use to compute filterbank across channels in parallel.
Returns
-------
x_phase : np.ndarray, shape (time, channels, frequency_bins)
Phase of each frequency bin in the filter bank for each channel.
x_envelope : np.ndarray, shape (time, channels, frequency_bins)
Envelope of each frequency bin in the filter bank for each channel.
center_freqs : np.ndarray, shape (frequency_bins,)
Center frequencies for each frequency bin used in the filter bank.
Examples
--------
>>> import naplib as nl
>>> from naplib.preprocessing import filter_hilbert as f_hilb
>>> import numpy as np
>>> x = np.random.rand(1000,3) # 3 channels of signals
>>> fs = 500
>>> x_phase, x_envelope, freqs = f_hilb(x, fs, Wn=[[1, 50], [70, 150]])
>>> # the outputs have the phase and envelope for each channel and each frequency band
>>> x_phase.shape # 3rd dimension is one for each frequency band in Wn
(1000, 3, 2)
>>> x_envelope.shape
(1000, 3, 2)
>>> freqs[0] # center frequency of first filter bank filter
1.21558792
>>> freqs[-1] # center frequency of last filter bank filter
143.97075186
'''
Wn = np.asarray(Wn)
if Wn.ndim == 1:
Wn = Wn[np.newaxis,:]
for minf, maxf in Wn:
if minf >= maxf:
raise ValueError(
f'Upper bound of frequency range must be greater than lower bound, but got lower '
f'bound of {minf} and upper bound of {maxf}'
)
if x.ndim != 1 and x.ndim != 2:
raise ValueError(f'Input signal must be 1- or 2-dimensional but got input with shape {x.shape}')
if x.ndim == 1:
x = x[:,np.newaxis]
# create filter bank
a = np.array([np.log10(0.39), 0.5])
f0 = 0.018
octSpace = 1./7
maxf_global = Wn.max() + 1
maxfo = np.log2(maxf_global / f0) # octave of max freq
cfs = [f0]
sigma_f = 10**(a[0]+a[1]*np.log10(cfs[-1]))
while np.log2(cfs[-1]/f0) < maxfo:
if cfs[-1] < 4:
cfs.append(cfs[-1]+sigma_f)
else: # switches to log spacing at 4 Hz
cfo = np.log2(cfs[-1]/f0) # current freq octave
cfo += octSpace # new freq octave
cfs.append(f0*(2**(cfo)))
sigma_f = 10**(a[0]+a[1]*np.log10(cfs[-1]))
cfs = np.array(cfs)
for minf, maxf in Wn:
if np.logical_and(cfs>=minf, cfs<=maxf).sum() == 0:
raise ValueError(f'Frequency band [{minf}, {maxf}] is too narrow, so no filters in filterbank are placed inside. Try a wider frequency band.')
# choose those that lie in the input freqRange
cfs = np.array([f for f in cfs if any(minf <= f <= maxf for minf, maxf in Wn)])
exponent = np.concatenate((np.ones((len(cfs),1)), np.log10(cfs)[:,np.newaxis]), axis=1) @ a
sigma_fs = 10**exponent
sds = sigma_fs * np.sqrt(2)
N = x.shape[0]
freqs = np.arange(0, N//2+1)*(fs/N)
# perform hilbert transform at each center freq
if x.dtype != np.float32:
x = x.astype('float32')
Xf = fft(x, N, axis=0)
h = np.zeros(N, dtype=Xf.dtype)
if N % 2 == 0:
h[0] = h[N // 2] = 1
h[1:N // 2] = 2
else:
h[0] = 1
h[1:(N + 1) // 2] = 2
if x.ndim > 1:
ind = [np.newaxis] * x.ndim
ind[0] = slice(None)
h = h[tuple(ind)]
def extract_channel(Xf):
hilb_channel = _vectorized_band_hilbert(Xf, h, N, freqs, cfs, sds)
hilb_phase = np.zeros((x.shape[0], len(Wn)), dtype='float32')
hilb_amp = np.zeros((x.shape[0], len(Wn)), dtype='float32')
for i, (minf, maxf) in enumerate(Wn):
# average over the filters in the frequency band
band_locator = np.logical_and(cfs>=minf, cfs<=maxf)
hilb_phase[:,i] = np.angle(hilb_channel[:,band_locator]).mean(-1)
hilb_amp[:,i] = np.abs(hilb_channel[:,band_locator]).mean(-1)
return hilb_phase, hilb_amp
# pre-allocate
hilb_phase = np.zeros((*x.shape, len(Wn)), dtype='float32')
hilb_amp = np.zeros((*x.shape, len(Wn)), dtype='float32')
# process channels sequentially
if n_jobs == 1:
for chn in trange(x.shape[1], disable=not logger.isEnabledFor(logging.DEBUG)):
hilb_phase[:,chn], hilb_amp[:,chn] = extract_channel(Xf[:,chn])
# process channels in parallel
else:
results = Parallel(n_jobs)(delayed(extract_channel)(Xf[:,chn]) for chn in range(x.shape[1]))
for chn, (phase, amp) in enumerate(results):
hilb_phase[:,chn], hilb_amp[:,chn] = phase, amp
return hilb_phase, hilb_amp, cfs
[docs]
def filterbank_hilbert(x, fs, Wn=[70,150], n_jobs=1):
'''
Compute the phase and amplitude (envelope) of a signal for a single frequency band,
as in [#edwards]_. This is done using a filter bank of gaussian shaped filters with
center frequencies linearly spaced until 4Hz and then logarithmically spaced. The
Hilbert Transform of each filter's output is computed and the amplitude and phase
are computed from the complex values. See [#edwards]_ for details on the filter
bank used.
See Also
--------
filter_hilbert
Parameters
----------
x : np.ndarray, shape (time, channels)
Signal to filter. Filtering is performed on each channel independently.
fs : int
Sampling rate.
Wn : list or array-like, length 2, default=[70, 150]
Lower and upper boundaries for filterbank center frequencies. A range
of [1, 150] results in 42 filters.
n_jobs : int, default=1
Number of jobs to use to compute filterbank across channels in parallel.
Returns
-------
x_phase : np.ndarray, shape (time, channels, frequency_bins)
Phase of each frequency bin in the filter bank for each channel.
x_envelope : np.ndarray, shape (time, channels, frequency_bins)
Envelope of each frequency bin in the filter bank for each channel.
center_freqs : np.ndarray, shape (frequency_bins,)
Center frequencies for each frequency bin used in the filter bank.
Examples
--------
>>> import naplib as nl
>>> from naplib.preprocessing import filterbank_hilbert as fb_hilb
>>> import numpy as np
>>> x = np.random.rand(1000,3) # 3 channels of signals
>>> fs = 500
>>> x_phase, x_envelope, freqs = fb_hilb(x, fs, Wn=[1, 150])
>>> # the outputs have the phase and envelope for each channel and each filter in the filterbank
>>> x_phase.shape # 3rd dimension is one for each filter in filterbank
(1000, 3, 42)
>>> x_envelope.shape
(1000, 3, 42)
>>> freqs[0] # center frequency of first filter bank filter
1.21558792
>>> freqs[-1] # center frequency of last filter bank filter
143.97075186
'''
minf, maxf = Wn
if minf >= maxf:
raise ValueError(f'Upper bound of frequency range must be greater than lower bound, but got lower bound of {minf} and upper bound of {maxf}')
if x.ndim != 1 and x.ndim != 2:
raise ValueError(f'Input signal must be 1- or 2-dimensional but got input with shape {x.shape}')
if x.ndim == 1:
x = x[:,np.newaxis]
# create filter bank
a = np.array([np.log10(0.39), 0.5])
f0 = 0.018
octSpace = 1./7
maxfo = np.log2(maxf / f0) # octave of max freq
cfs = [f0]
sigma_f = 10**(a[0]+a[1]*np.log10(cfs[-1]))
while np.log2(cfs[-1]/f0) < maxfo:
if cfs[-1] < 4:
cfs.append(cfs[-1]+sigma_f)
else: # switches to log spacing at 4 Hz
cfo = np.log2(cfs[-1]/f0) # current freq octave
cfo += octSpace # new freq octave
cfs.append(f0*(2**(cfo)))
sigma_f = 10**(a[0]+a[1]*np.log10(cfs[-1]))
cfs = np.array(cfs)
if np.logical_and(cfs>=minf, cfs<=maxf).sum() == 0:
raise ValueError(f'Frequency band [{minf}, {maxf}] is too narrow, so no filters in filterbank are placed inside. Try a wider frequency band.')
# choose those that lie in the input freqRange
cfs = cfs[np.logical_and(cfs>=minf, cfs<=maxf)]
exponent = np.concatenate((np.ones((len(cfs),1)), np.log10(cfs)[:,np.newaxis]), axis=1) @ a
sigma_fs = 10**exponent
sds = sigma_fs * np.sqrt(2)
N = x.shape[0]
freqs = np.arange(0, N//2+1)*(fs/N)
# perform hilbert transform at each center freq
if x.dtype != np.float32:
x = x.astype('float32')
Xf = fft(x, N, axis=0)
h = np.zeros(N, dtype=Xf.dtype)
if N % 2 == 0:
h[0] = h[N // 2] = 1
h[1:N // 2] = 2
else:
h[0] = 1
h[1:(N + 1) // 2] = 2
if x.ndim > 1:
ind = [np.newaxis] * x.ndim
ind[0] = slice(None)
h = h[tuple(ind)]
def extract_channel(Xf):
hilb_channel = _vectorized_band_hilbert(Xf, h, N, freqs, cfs, sds)
hilb_phase = np.zeros((x.shape[0], len(cfs)), dtype='float32')
hilb_amp = np.zeros((x.shape[0], len(cfs)), dtype='float32')
band_locator = np.logical_and(cfs>=minf, cfs<=maxf)
hilb_phase = np.angle(hilb_channel[:,band_locator])
hilb_amp = np.abs(hilb_channel[:,band_locator])
return hilb_phase, hilb_amp
# pre-allocate
hilb_phase = np.zeros((*x.shape, len(cfs)), dtype='float32')
hilb_amp = np.zeros((*x.shape, len(cfs)), dtype='float32')
# process channels sequentially
if n_jobs == 1:
for chn in trange(x.shape[1], disable=not logger.isEnabledFor(logging.DEBUG)):
hilb_phase[:,chn], hilb_amp[:,chn] = extract_channel(Xf[:,chn])
# process channels in parallel
else:
results = Parallel(n_jobs)(delayed(extract_channel)(Xf[:,chn]) for chn in range(x.shape[1]))
for chn, (phase, amp) in enumerate(results):
hilb_phase[:,chn], hilb_amp[:,chn] = phase, amp
return hilb_phase, hilb_amp, cfs
def _vectorized_band_hilbert(X_fft, h, N, freqs, cfs, sds) -> NDArray:
n_freqs = len(freqs)
k = freqs.reshape(-1,1) - cfs.reshape(1,-1)
H = np.zeros((N, len(cfs)), dtype='float32')
H[:n_freqs,:] = np.exp(-0.5 * np.divide(k, sds) ** 2)
H[n_freqs:,:] = H[1:int(np.floor((N+1)/2)),:][::-1]
H[0,:] = 0.
H = np.multiply(H, h)
return ifft(X_fft[:,np.newaxis] * H, N, axis=0).astype('complex64')