import re
import array
import numpy as np
from math import ceil, floor
from typing import Iterable, Dict
from datetime import datetime
[docs]
def load_edf(path: str, t1: float=0, t2: float=0) -> Dict:
"""
Load data from EDF file (*.edf).
Notes
-----
This function supports the original EDF format and EDF+C. For the EDF+D format
it may be better to use PyEDFlib or mne.io.
Parameters
----------
path : str, path-like
Path to EDF data file
t1 : float, default=0
Starting time to extract
t2 : float, default=0
Ending time to extract until (default of 0 extracts until end of file)
Returns
-------
loaded_dict : dict from string to numpy array or float
Keys: 'data' - loaded neural recording (time*channels), 'data_f' - sampling rate of data,
'wav' - loaded audio recording (time*channels), wav_f' - sampling rate of sound,
'labels_data' - array of labels for the channel streams,
'labels_wav' - array of labels for the audio streams
"""
with open(path, 'rb') as fin:
# read header
version = int(fin.read(8).decode('ascii'))
patient = fin.read(80).decode('ascii').strip()
recording = fin.read(80).decode('ascii').strip()
start_date = fin.read(8).decode('ascii')
start_time = fin.read(8).decode('ascii')
header_size = int(fin.read(8).decode('ascii'))
reserved = fin.read(44).decode('ascii').strip()
# current implementation only supports contiguous EDF
if version != 0:
raise ValueError('EDF with version != 0 not supported.')
if reserved not in ('', 'EDF+C'):
raise ValueError('EDF reserved field should be empty (original EDF) or "EDF+C" (contiguous EDF+).')
# number of lengths of signals
num_records = int(fin.read(8).decode('ascii'))
record_dur = float(fin.read(8).decode('ascii'))
num_signals = int(fin.read(4).decode('ascii'))
# channel metadata
labels = np.array([fin.read(16).decode('ascii').strip() for _ in range(num_signals)])
transducer = np.array([fin.read(80).decode('ascii').strip() for _ in range(num_signals)])
physical_dim = np.array([fin.read(8).decode('ascii').strip() for _ in range(num_signals)])
physical_min = np.array([float(fin.read(8).decode('ascii')) for _ in range(num_signals)])
physical_max = np.array([float(fin.read(8).decode('ascii')) for _ in range(num_signals)])
digital_min = np.array([float(fin.read(8).decode('ascii')) for _ in range(num_signals)])
digital_max = np.array([float(fin.read(8).decode('ascii')) for _ in range(num_signals)])
prefilt = np.array([fin.read(80).decode('ascii').strip() for _ in range(num_signals)])
samples = np.array([int(fin.read(8).decode('ascii')) for _ in range(num_signals)])
samples_per_record = samples.sum()
fin.seek(32 * num_signals, 1)
if labels[-1] == 'EDF Annotations':
skip_end_bytes = samples[-1] * 2
num_signals -= 1
labels = labels[:-1]
transducer = transducer[:-1]
physical_dim = physical_dim[:-1]
physical_min = physical_min[:-1]
physical_max = physical_max[:-1]
digital_min = digital_min[:-1]
digital_max = digital_max[:-1]
prefilt = prefilt[:-1]
samples = samples[:-1]
else:
skip_end_bytes = 0
if len(set(samples)) != 1:
raise RuntimeError(
'The load_edf function does not support heterogeneous `samples` per record, '
'except for EDF annotations.'
)
samples = samples[0]
# compute sampling rate
sampling_rate = samples / record_dur
# only process part of recording that falls between the specified time range (t1, t2)
min_record = floor(t1 / record_dur)
max_record = ceil(t2 / record_dur) if 0 < t2 < num_records*record_dur else num_records
fin.seek(min_record * samples_per_record * 2, 1)
num_records = max_record - min_record
t_skip = min_record * record_dur
# scale recordings from digital to physical units
scale = (physical_max - physical_min) / (digital_max - digital_min)
def rescale(x):
return (x - digital_min) * scale + physical_min
# read actual neural data
aux_signals = _aux_channels(labels)
num_aux_signals = sum(aux_signals)
num_data_signals = num_signals - num_aux_signals
data = np.zeros((num_records * samples, num_data_signals), dtype=np.float32)
aux_data = np.zeros((num_records * samples, num_aux_signals), dtype=np.float32)
for i in range(num_records):
buffer = array.array('h')
buffer.fromfile(fin, num_signals * samples)
buffer = np.array(buffer).reshape(num_signals, samples).T
buffer = rescale(buffer)
data[i*samples:(i+1)*samples] = buffer[:, ~aux_signals]
aux_data[i*samples:(i+1)*samples] = buffer[:, aux_signals]
fin.seek(skip_end_bytes, 1)
# Convert start_date and start_time to python objects
try:
start_date = datetime.strptime(start_date, '%m.%d.%y').date()
except:
start_date = None
try:
start_time = datetime.strptime(start_time, '%H.%M.%S').time()
except:
start_time = None
return {
'data': data,
'data_f': sampling_rate,
'wav': aux_data,
'wav_f': sampling_rate,
'labels_data': labels[~aux_signals],
'labels_wav': labels[aux_signals],
't_skip': t_skip,
'info': {
'start_date': start_date,
'start_time': start_time,
'patient': patient,
'version': version,
'transducer': transducer,
'physical_unit': physical_dim,
'prefilt': prefilt,
},
}
def _aux_channels(labels: Iterable[str]):
pattern = r'^(EKG[LR]?|DC[0-9]*|TRIG[0-9]*|OSAT|PR|Pleth|EDF Annotations)$'
return np.array([re.match(pattern, label) for label in labels], dtype=bool)