API reference

This is the automatically generated documentation for all public modules and objects in the package.

gui

RedirectText

Bases: StringIO

Redirect sys.stdout and sys.stderr to a Tkinter Text widget.

Source code in neuroflow/gui.py
class RedirectText(io.StringIO):
    """Redirect sys.stdout and sys.stderr to a Tkinter Text widget."""

    def __init__(self, text_ctrl):
        super().__init__()
        self.text_ctrl = text_ctrl

    def write(self, s):
        self.text_ctrl.configure(state="normal")
        self.text_ctrl.insert(tk.END, s)
        self.text_ctrl.see(tk.END)
        self.text_ctrl.configure(state="disabled")

    def flush(self):
        pass
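
A minimal usage sketch (the Tk window and widget here are illustrative, not part of the package; the import path follows the source location above):

import sys
import tkinter as tk

from neuroflow.gui import RedirectText

root = tk.Tk()
log = tk.Text(root, state="disabled")
log.pack(fill="both", expand=True)

sys.stdout = RedirectText(log)  # print() output now appears in the Text widget
print("hello")
root.mainloop()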

io

load

convert2csv(file_list, output_file=None, device=None)

Convert raw device files into a standardized wide-format CSV/DataFrame.

Source code in neuroflow/io/load.py
def convert2csv(file_list, output_file=None, device=None):
    """
    Convert raw device files into a standardized wide-format CSV/DataFrame.
    """
    # Load raw
    print("... Reading and standardizing data")
    df = load_raw(file_list, device)

    print("... Writing DataFrame to .csv")
    # Save CSV
    if output_file is None:
        output_file = (
            Path(file_list[0]).parent
            / f"neuroflow-{device.lower().split(" (")[0]}.csv"
        )
    df.to_csv(output_file, index=False)

    return output_file
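
Usage sketch (file paths are illustrative):

from pathlib import Path

from neuroflow.io.load import convert2csv

files = [Path("data/P01_leftankle.cwa"), Path("data/P01_rightankle.cwa")]
csv_path = convert2csv(files, device="Axivity (IMU)")
# with no output_file given, this writes data/neuroflow-axivity.csv next to the first input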

load_raw(file_list, device=None)

Load raw device files and return a standardized DataFrame.

Parameters:
  • file_list (list of str/Path) –

    paths to device files for a single collection

  • device (str, default: None ) –

    "Axivity (IMU)", "Bittium (ECG)"

Returns:
  • pd.DataFrame: standardized data

Source code in neuroflow/io/load.py
def load_raw(file_list, device=None):
    """
    Load raw device files and return a standardized DataFrame.

    Args:
        file_list (list of str/Path): paths to device files for a single collection
        device (str): "Axivity (IMU)", "Bittium (ECG)"

    Returns:
        pd.DataFrame: standardized data
    """

    dict_devices = {
        "Axivity (IMU)": {"loader": _axivity_cwa_files, "input": "multiple"},
        "Bittium (ECG)": {"loader": _bittium_edf, "input": "single"},
    }

    try:
        if dict_devices[device]["input"] == "single":
            df = dict_devices[device]["loader"](str(file_list[0]))
        elif dict_devices[device]["input"] == "multiple":
            df = dict_devices[device]["loader"](file_list)
    except Exception as e:
        print(e)
        raise ValueError(
            f"Unsupported device/modality {device}. Currently supported options are: {dict_devices.keys()}"
        )

    return df
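
For example (path illustrative):

from neuroflow.io.load import load_raw

df = load_raw(["data/P01_chest.edf"], device="Bittium (ECG)")  # single-file device
print(df.head())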

read_sensor_location

autodetect_sensor_location(filename)

Try to infer sensor location from filename.

Source code in neuroflow/io/read_sensor_location.py
def autodetect_sensor_location(filename):
    """
    Try to infer sensor location from filename.
    """
    fname = filename.lower()
    patterns = {
        "leftankle": ["la", "leftankle", "lankle"],
        "rightankle": ["ra", "rightankle", "rankle"],
        "lumbar": ["l5", "lumbar", "lowerback", "waist"],
        "leftwrist": ["lw", "leftwrist", "lwrist"],
        "rightwrist": ["rw", "rightwrist", "rwrist"],
        "chest": ["chest", "sternum"],
        "thoracic": ["c7", "upperback"],
    }

    for canon_name, keywords in patterns.items():
        for kw in keywords:
            if kw in fname:
                return canon_name
    return None  # could not detect
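
Because matching is by plain substring, short codes such as "la" can also match inside unrelated names; a few illustrative calls:

from neuroflow.io.read_sensor_location import autodetect_sensor_location

autodetect_sensor_location("P01_LA_walk.cwa")   # -> "leftankle" ("la" matches)
autodetect_sensor_location("sub01_lumbar.cwa")  # -> "lumbar"
autodetect_sensor_location("sensor123.cwa")     # -> None (no keyword found)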

stream

stream_axivity_timestamps(file_list, time_start, time_stop, sensor_names=None)

Load multiple Axivity .cwa files and return a wide DataFrame.

Parameters:
  • file_list (list of str/Path) –

    paths to .cwa files

  • time_start (datetime64) –

    start time of streaming window

  • time_end (datetime64) –

    end time of streaming window

  • sensor_names (list of str, default: None ) –

    optional names for each sensor; defaults to file stem

Returns:
  • pd.DataFrame: wide-format IMU data with columns: time, <sensor>_ax, <sensor>_ay, <sensor>_az, <sensor>_gx, <sensor>_gy, <sensor>_gz

Source code in neuroflow/io/stream.py
def stream_axivity_timestamps(file_list, time_start, time_stop, sensor_names=None):
    """
    Load multiple Axivity .cwa files and return a wide DataFrame.

    Args:
        file_list (list of str/Path): paths to .cwa files
        time_start (np.datetime64): start time of streaming window
        time_end (np.datetime64): end time of streaming window
        sensor_names (list of str): optional names for each sensor; defaults to file stem

    Returns:
        pd.DataFrame: wide-format IMU data with columns:
                      time, <sensor>_ax, <sensor>_ay, <sensor>_az, <sensor>_gx, <sensor>_gy, <sensor>_gz
    """
    if sensor_names is None:
        sensor_names = [autodetect_sensor_location(Path(f).stem) for f in file_list]

    # stream each file, then merge all sensors on the shared time axis
    sensor_dfs = []

    for f, name in zip(file_list, sensor_names):
        print(f"Streaming {name} ...")
        df_data = stream_cwa_file(f, time_start, time_stop)
        df_data = _sensor_column_names(df_data, f, sensor=name)

        sensor_dfs.append(df_data)

    df_wide = reduce(
        lambda left, right: pd.merge_asof(
            left.sort_values("time"), right.sort_values("time"), on="time"
        ),
        sensor_dfs,
    )

    return df_wide
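
Usage sketch (timestamps and paths illustrative):

import numpy as np

from neuroflow.io.stream import stream_axivity_timestamps

t0 = np.datetime64("2024-05-01T10:00:00")
t1 = np.datetime64("2024-05-01T10:05:00")
df = stream_axivity_timestamps(
    ["data/P01_la.cwa", "data/P01_ra.cwa"], t0, t1,
    sensor_names=["leftankle", "rightankle"],
)
# columns: time, leftankle_ax ... leftankle_gz, rightankle_ax ... rightankle_gz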

stream_bittium_timestamps(edf_path, time_start, time_end)

Stream a Bittium .EDF file and return a DataFrame.

Parameters:
  • edf_path (str) –

    path to .EDF file

  • time_start (datetime64) –

    start time of streaming window

  • time_end (datetime64) –

    end time of streaming window

Returns:
  • pd.DataFrame: ECG data with columns: time, ecg_<channel_number>, chest_ax, chest_ay, chest_az

Source code in neuroflow/io/stream.py
def stream_bittium_timestamps(
    edf_path: str, time_start: np.datetime64, time_end: np.datetime64
):
    """
    Stream a Bittium .EDF file and return a DataFrame.

    Args:
        edf_path (str): path to .EDF file
        time_start (np.datetime64): start time of streaming window
        time_end (np.datetime64): end time of streaming window

    Returns:
        pd.DataFrame: ECG data with columns:
                      time, ecg_<channel_number>, chest_ax, chest_ay, chest_az
    """
    window = (time_end - time_start) / np.timedelta64(1, "s")

    df = stream_bittium_window(edf_path, time_start, window)

    return df
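
Usage sketch (timestamps and path illustrative):

import numpy as np

from neuroflow.io.stream import stream_bittium_timestamps

df = stream_bittium_timestamps(
    "data/P01_chest.edf",
    np.datetime64("2024-05-01T10:00:00"),
    np.datetime64("2024-05-01T10:01:00"),
)
# columns: time, ecg0, chest_ax, chest_ay, chest_az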

stream_bittium_window(edf_path, timestamp, duration_seconds)

Stream a Bittium .EDF file and return a DataFrame.

Parameters:
  • edf_path (str) –

    path to .EDF file

  • timestamp (datetime64) –

    start time of streaming window

  • duration_seconds (int) –

    duration of streaming window

Returns:
  • pd.DataFrame: ECG data with columns: time, ecg_<channel_number>, chest_ax, chest_ay, chest_az

Source code in neuroflow/io/stream.py
def stream_bittium_window(
    edf_path: str, timestamp: np.datetime64, duration_seconds: int
):
    """
    Stream a Bittium .EDF file and return a DataFrame.

    Args:
        edf_path (str): path to .EDF file
        timestamp (np.datetime64): start time of streaming window
        duration_seconds (int): duration of streaming window

    Returns:
        pd.DataFrame: ECG data with columns:
                      time, ecg_<channel_number>, chest_ax, chest_ay, chest_az
    """
    with pyedflib.EdfReader(edf_path) as f:
        header = f.getHeader()
        signal_headers = f.getSignalLabels()
        sample_frequencies = f.getSampleFrequencies()

        channel_ecg = [s for s in signal_headers if "ecg" in s.lower()]
        channel_imu = [s for s in signal_headers if "accelerometer" in s.lower()]

        fs_ecg = [
            sample_frequencies[i]
            for i, lbl in enumerate(signal_headers)
            if "ecg" in lbl.lower()
        ][0]
        fs_imu = [
            sample_frequencies[i]
            for i, lbl in enumerate(signal_headers)
            if "accelerometer" in lbl.lower()
        ][0]

        # get samples
        start_sec = (timestamp - pd.to_datetime(header["startdate"])).total_seconds()

        # ecg
        start_sample_ecg = int(start_sec * fs_ecg)
        n_samples_ecg = int(duration_seconds * fs_ecg)
        signals_ecg = [
            f.readSignal(signal_headers.index(ch), start_sample_ecg, n_samples_ecg)
            for ch in channel_ecg
        ]

        # imu
        start_sample_imu = int(start_sec * fs_imu)
        n_samples_imu = int(duration_seconds * fs_imu)
        signals_imu = [
            f.readSignal(signal_headers.index(ch), start_sample_imu, n_samples_imu)
            for ch in channel_imu
        ]

    df_ecg = pd.DataFrame(np.array(signals_ecg).T, columns=channel_ecg)
    df_imu = pd.DataFrame(np.array(signals_imu).T, columns=channel_imu)

    # insert time
    time_ecg = np.linspace(0, (len(signals_ecg[0]) - 1) / fs_ecg, len(signals_ecg[0]))
    time_imu = np.linspace(0, (len(signals_imu[0]) - 1) / fs_imu, len(signals_imu[0]))
    df_ecg.insert(0, "time", time_ecg)
    df_imu.insert(0, "time", time_imu)

    # merge on ecg df
    df = pd.merge(df_ecg, df_imu, on="time", how="left")

    # time correction
    df["time"] = (
        pd.to_datetime(header["startdate"])
        + pd.DateOffset(seconds=start_sec)
        + pd.to_timedelta(df["time"], unit="s")
    )

    # standardize column names
    is_ecg_channel = [i for i in df.columns if "ecg" in i.lower()]
    column_names = (
        ["time"]
        + [f"ecg{i}" for i in range(len(is_ecg_channel))]
        + ["chest_ax", "chest_ay", "chest_az"]
    )
    df.columns = column_names

    return df
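
The same window expressed as a start timestamp plus an explicit duration (values illustrative):

import numpy as np

from neuroflow.io.stream import stream_bittium_window

df = stream_bittium_window(
    "data/P01_chest.edf", np.datetime64("2024-05-01T10:00:00"), 60
)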

stream_cwa_file(cwa_path, time_start, time_end, sensor=None)

Stream an Axivity .CWA file and return a DataFrame.

Parameters:
  • cwa_path (str) –

    path to .CWA file

  • time_start (datetime64) –

    start time of streaming window

  • time_end (datetime64) –

    end time of streaming window

Returns:
  • pd.DataFrame: IMU data with columns: time, ax, ay, az, gx, gy, gz

Source code in neuroflow/io/stream.py
def stream_cwa_file(
    cwa_path: str, time_start: np.datetime64, time_end: np.datetime64, sensor=None
):
    """
    Stream an Axivity .CWA file and return a DataFrame.

    Args:
        cwa_path (str): path to .CWA file
        time_start (np.datetime64): start time of streaming window
        time_end (np.datetime64): end time of streaming window

    Returns:
        pd.DataFrame: IMU data with columns:
                      time, ax, ay, az, gx, gy, gz
    """
    with open(cwa_path, "rb") as f:
        # get file size to know when to stop
        file_size = os.fstat(f.fileno()).st_size
        current_position = SECTOR_SIZE

        list_datablocks = []
        # iterate through each data block
        while current_position < file_size:
            data_block = f.read(SECTOR_SIZE)
            current_position += SECTOR_SIZE

            # parse the data block to get its timestamp
            data = parse_cwa_data(data_block)

            # skip invalid blocks
            if not data or "timestamp" not in data:
                continue

            # convert the UNIX timestamp to a datetime object
            data_duration = data["sampleCount"] / data["frequency"]
            block_stop = np.datetime64(pd.to_datetime(data["timestampTime"]))
            block_start = np.datetime64(
                block_stop - pd.DateOffset(seconds=data_duration)
            )

            # check if the block is within the desired time range
            if block_stop < time_start:
                # still before the start time, continue to the next block
                continue
            elif block_start > time_end:
                # past the end time, stop streaming
                break
            else:
                data_samples = parse_cwa_data(data_block, extractData=True)
                # Yield the data for blocks within the range
                list_datablocks.append(
                    (
                        block_start,
                        block_stop,
                        data_duration,
                        data["frequency"],
                        data_samples,
                    )
                )

    # convert datablocks to DataFrame
    df_data = _blocks2dataframe(list_datablocks)

    return df_data
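
Usage sketch (path and timestamps illustrative):

import numpy as np

from neuroflow.io.stream import stream_cwa_file

df = stream_cwa_file(
    "data/P01_la.cwa",
    np.datetime64("2024-05-01T10:00:00"),
    np.datetime64("2024-05-01T10:00:30"),
)
# columns: time, ax, ay, az, gx, gy, gz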

sync

split2csv(file_list, file_sync, source='NeuroFlow (CSV)', output_dir=None)

Split standardized data DataFrames using event sync timestamps, and write each split to .csv.

Parameters:
  • file_list (list) –

    list of file paths of standardized data .csv (or raw)

  • file_sync (Path) –

    file path of sync timestamps .csv

Returns:
  • output_dir

    Path to output directory

Source code in neuroflow/io/sync.py
def split2csv(file_list, file_sync, source="NeuroFlow (CSV)", output_dir=None):
    """
    Split standardized data DataFrames using event sync timestamps, and write each split to .csv.

    Args:
        file_list (list): list of file paths of standardized data .csv (or raw)
        file_sync (Path): file path of sync timestamps .csv

    Returns:
        output_dir: Path to output directory
    """

    print(" ... Reading data and sync files")
    df_sync = pd.read_csv(file_sync)
    match source:
        case "NeuroFlow (CSV)":
            df_data = pd.read_csv(file_list[0])
            print(" ... Splitting data.")
            list_trial_data = split_csv(df_data, df_sync)
        case "Axivity (CWA)":
            print(" ... Streaming data.")
            list_trial_data = split_axivity(file_list, df_sync)
        case "Bittium (EDF)":
            print(" ... Streaming data.")
            list_trial_data = split_bittium(str(file_list[0]), df_sync)
        case _:
            return False

    print(" ... Writing split DataFrames to .csv")
    if output_dir is None:
        output_dir = file_list[0].parent / "events"
    os.makedirs(output_dir, exist_ok=True)

    for t, trial in enumerate(list_trial_data):
        output_file = (
            Path(output_dir) / f"{file_list[0].stem}_event-{trial.attrs['label']}.csv"
        )
        trial.to_csv(output_file, index=False)

    return output_dir
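
The sync file is read with pd.read_csv and, judging from the split_* helpers below, needs trial, type ("start"/"stop"), and time columns; a hypothetical example:

from pathlib import Path

from neuroflow.io.sync import split2csv

# data/sync.csv:
#   trial,type,time
#   walk1,start,2024-05-01 10:00:00
#   walk1,stop,2024-05-01 10:02:00
out = split2csv([Path("data/neuroflow-axivity.csv")], Path("data/sync.csv"))
# writes data/events/neuroflow-axivity_event-walk1.csv and returns the events directory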

split_axivity(file_list, df_sync, sensor_names=None)

Stream .CWA files and split using event sync timestamps.

Parameters:
  • file_list (list) –

    list of paths to .CWA files

  • df_sync (DataFrame) –

    DataFrame of timestamps for sync start/end events

  • sensor_names (list of str, default: None ) –

    optional names for each sensor; defaults to file stem

Returns:
  • list[pd.DataFrame]: list of DataFrames, one for each sync event

Source code in neuroflow/io/sync.py
def split_axivity(file_list, df_sync, sensor_names=None):
    """
    Stream .CWA files and split using event sync timestamps.

    Args:
        file_list (list): list of paths to .CWA files
        df_sync (pd.DataFrame): DataFrame of timestamps for sync start/end events
        sensor_names (list of str): optional names for each sensor; defaults to file stem
    Returns:
        list[pd.DataFrame]: list of DataFrames, one for each sync event
    """
    if not isinstance(file_list, list):
        file_list = [file_list]

    df_sync = convert_timestamps(df_sync)

    list_trial_data = []
    for e, event in enumerate(df_sync["trial"].unique()):
        print("Processing event: ", e + 1)
        event_start = df_sync.loc[
            (df_sync["trial"] == event) & (df_sync["type"] == "start"), "time"
        ].values[0]
        event_stop = df_sync.loc[
            (df_sync["trial"] == event) & (df_sync["type"] == "stop"), "time"
        ].values[0]

        df_data_event = stream_axivity_timestamps(
            file_list, event_start, event_stop, sensor_names
        )
        df_data_event.attrs["label"] = event
        list_trial_data.append(df_data_event)

    return list_trial_data

split_bittium(edf_path, df_sync)

Stream .EDF file and split using event sync timestamps.

Parameters:
  • edf_path (str) –

    path to .EDF file

  • df_sync (DataFrame) –

    DataFrame of timestamps for sync start/end events

Returns:
  • list[pd.DataFrame]: list of DataFrames, one for each sync event

Source code in neuroflow/io/sync.py
def split_bittium(edf_path, df_sync):
    """
    Stream .EDF file and split using event sync timestamps.

    Args:
        edf_path (str): path to .EDF file
        df_sync (pd.DataFrame): DataFrame of timestamps for sync start/end events

    Returns:
        list[pd.DataFrame]: list of DataFrames, one for each sync event
    """

    df_sync = convert_timestamps(df_sync)

    list_trial_data = []
    for e, event in enumerate(df_sync["trial"].unique()):
        event_start = df_sync.loc[
            (df_sync["trial"] == event) & (df_sync["type"] == "start"), "time"
        ].values[0]
        event_stop = df_sync.loc[
            (df_sync["trial"] == event) & (df_sync["type"] == "stop"), "time"
        ].values[0]

        df_data_event = stream_bittium_timestamps(edf_path, event_start, event_stop)
        df_data_event.attrs["label"] = event
        list_trial_data.append(df_data_event)

    return list_trial_data

split_csv(df_data, df_sync)

Split standardized data DataFrames using event sync timestamps.

Parameters:
  • df_data (DataFrame) –

    DataFrame of data to split

  • df_sync (DataFrame) –

    DataFrame of timestamps for sync start/end events

Returns:
  • list[pd.DataFrame]: list of DataFrames, one for each sync event

Source code in neuroflow/io/sync.py
def split_csv(df_data, df_sync):
    """
    Split standardized data DataFrames using event sync timestamps.

    Args:
        df_data (pd.DataFrame): DataFrame of data to split
        df_sync (pd.DataFrame): DataFrame of timestamps for sync start/end events

    Returns:
        list[pd.DataFrame]: list of DataFrames, one for each sync event
    """
    df_data = convert_timestamps(df_data)
    df_sync = convert_timestamps(df_sync)

    list_trial_data = []
    for e, event in enumerate(df_sync["trial"].unique()):
        event_start = df_sync.loc[
            (df_sync["trial"] == event) & (df_sync["type"] == "start"), "time"
        ].values[0]
        event_stop = df_sync.loc[
            (df_sync["trial"] == event) & (df_sync["type"] == "stop"), "time"
        ].values[0]

        mask_data = np.logical_and(
            df_data["time"] >= event_start, df_data["time"] < event_stop
        )
        df_data_event = df_data.loc[mask_data].copy()
        df_data_event.attrs["label"] = event

        list_trial_data.append(df_data_event)

    return list_trial_data
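
A self-contained sketch with an in-memory sync table (assuming convert_timestamps parses these time strings):

import pandas as pd

from neuroflow.io.sync import split_csv

df_sync = pd.DataFrame(
    {
        "trial": ["walk1", "walk1"],
        "type": ["start", "stop"],
        "time": ["2024-05-01 10:00:00", "2024-05-01 10:02:00"],
    }
)
trials = split_csv(df_data, df_sync)  # df_data needs a "time" column
trials[0].attrs["label"]              # -> "walk1"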

test_stream_cwa

stream_axivity_timestamps(file_list, time_start, time_stop, sensor_names=None)

Load multiple Axivity .cwa files and return a wide DataFrame.

Parameters:
  • file_list (list of str/Path) –

    paths to .cwa files

  • time_start (datetime64) –

    start time of streaming window

  • time_end (datetime64) –

    end time of streaming window

  • sensor_names (list of str, default: None ) –

    optional names for each sensor; defaults to file stem

Returns:
  • pd.DataFrame: wide-format IMU data with columns: time, <sensor>_ax, <sensor>_ay, <sensor>_az, <sensor>_gx, <sensor>_gy, <sensor>_gz

Source code in neuroflow/io/test_stream_cwa.py
def stream_axivity_timestamps(file_list, time_start, time_stop, sensor_names=None):
    """
    Load multiple Axivity .cwa files and return a wide DataFrame.

    Args:
        file_list (list of str/Path): paths to .cwa files
        time_start (np.datetime64): start time of streaming window
        time_end (np.datetime64): end time of streaming window
        sensor_names (list of str): optional names for each sensor; defaults to file stem

    Returns:
        pd.DataFrame: wide-format IMU data with columns:
                      time, <sensor>_ax, <sensor>_ay, <sensor>_az, <sensor>_gx, <sensor>_gy, <sensor>_gz
    """
    if sensor_names is None:
        sensor_names = [autodetect_sensor_location(Path(f).stem) for f in file_list]

    # stream each file, then merge all sensors on the shared time axis
    sensor_dfs = []

    for f, name in zip(file_list, sensor_names):
        print(f"Streaming {f} ...")
        df_data = stream_cwa_file(f, time_start, time_stop)
        df_data = _sensor_column_names(df_data, f, sensor=name)

        sensor_dfs.append(df_data)

    df_wide = reduce(
        lambda left, right: pd.merge_asof(
            left.sort_values("time"), right.sort_values("time"), on="time"
        ),
        sensor_dfs,
    )

    return df_wide

stream_cwa_file(cwa_path, time_start, time_end, sensor=None)

Stream an Axivity .CWA file and return a DataFrame.

Parameters:
  • cwa_path (str) –

    path to .CWA file

  • time_start (datetime64) –

    start time of streaming window

  • time_end (datetime64) –

    end time of streaming window

Returns:
  • pd.DataFrame: IMU data with columns: time, ax, ay, az, gx, gy, gz

Source code in neuroflow/io/test_stream_cwa.py
def stream_cwa_file(
    cwa_path: str, time_start: np.datetime64, time_end: np.datetime64, sensor=None
):
    """
    Stream an Axivity .CWA file and return a DataFrame.

    Args:
        cwa_path (str): path to .CWA file
        time_start (np.datetime64): start time of streaming window
        time_end (np.datetime64): end time of streaming window

    Returns:
        pd.DataFrame: IMU data with columns:
                      time, ax, ay, az, gx, gy, gz
    """
    with open(cwa_path, "rb") as f:
        # get file size to know when to stop
        file_size = os.fstat(f.fileno()).st_size
        current_position = SECTOR_SIZE

        list_datablocks = []
        # iterate through each data block
        while current_position < file_size:
            data_block = f.read(SECTOR_SIZE)
            current_position += SECTOR_SIZE

            # parse the data block to get its timestamp
            data = parse_cwa_data(data_block)

            # skip invalid blocks
            if not data or "timestamp" not in data:
                continue

            # convert the UNIX timestamp to a datetime object
            data_duration = data["sampleCount"] / data["frequency"]
            block_stop = np.datetime64(pd.to_datetime(data["timestampTime"]))
            block_start = np.datetime64(
                block_stop - pd.DateOffset(seconds=data_duration)
            )

            # check if the block is within the desired time range
            if block_stop < time_start:
                # still before the start time, continue to the next block
                continue
            elif block_start > time_end:
                # past the end time, stop streaming
                break
            else:
                data_samples = parse_cwa_data(data_block, extractData=True)
                # Yield the data for blocks within the range
                list_datablocks.append(
                    (
                        block_start,
                        block_stop,
                        data_duration,
                        data["frequency"],
                        data_samples,
                    )
                )

    # convert datablocks to DataFrame
    df_data = _blocks2dataframe(list_datablocks)

    return df_data

window

window2csv(file_list, file_times, duration_window, number_window, source='NeuroFlow (CSV)', output_dir=None)

Window standardized data DataFrames around reference timestamps, and write each window to .csv.

Parameters:
  • file_list (list) –

    list of file paths of standardized data .csv (or raw)

  • file_times (Path) –

    file path of timestamps .csv

Returns:
  • output_dir

    Path to output directory

Source code in neuroflow/io/window.py
def window2csv(
    file_list,
    file_times,
    duration_window,
    number_window,
    source="NeuroFlow (CSV)",
    output_dir=None,
):
    """
    Window standardized data DataFrames around reference timestamps, and write each window to .csv.

    Args:
        file_list (list): list of file paths of standardized data .csv (or raw)
        file_times (Path): file path of timestamps .csv

    Returns:
        output_dir: Path to output directory
    """

    print(" ... Reading data and sync files")
    df_times = pd.read_csv(file_times)
    match source:
        case "NeuroFlow (CSV)":
            df_data = pd.read_csv(file_list[0])
            print(" ... Windowing data.")
            list_trial_data = window_csv(
                df_data, df_times, duration_window, number_window
            )
        case "Axivity (CWA)":
            print(" ... Windowing data.")
            list_trial_data = window_axivity(
                file_list, df_times, duration_window, number_window
            )
        case "Bittium (EDF)":
            print(" ... Windowing data.")
            file_path = str(file_list[0])
            list_trial_data = window_bittium(
                file_path, df_times, duration_window, number_window
            )
        case _:
            return False

    print(" ... Writing split DataFrames to .csv")
    if output_dir is None:
        output_dir = file_list[0].parent / "windows"
    os.makedirs(output_dir, exist_ok=True)

    for t, trial in enumerate(list_trial_data):
        output_file = (
            Path(output_dir) / f"{file_list[0].stem}_window-{trial.attrs['label']}.csv"
        )
        trial.to_csv(output_file, index=False)

    return output_dir
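
Usage sketch (paths illustrative; the times file needs label and time columns, see window_axivity below):

from pathlib import Path

from neuroflow.io.window import window2csv

out = window2csv(
    [Path("data/neuroflow-axivity.csv")],
    Path("data/times.csv"),
    duration_window=30,  # seconds per window
    number_window=2,     # windows on either side of each reference timestamp
)
# writes one CSV per window into data/windows/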

window_axivity(file_list, df_times, duration_window, number_window, sensor_names=None)

Stream .CWA files and window using timestamps.

Parameters:
  • file_list (list) –

    list of paths to .CWA files

  • df_times (DataFrame) –

    DataFrame of timestamps for window center

  • duration_window (int) –

    Seconds

  • number_window (int) –

    Number of pre/post windows

  • sensor_names (list of str, default: None ) –

    optional names for each sensor; defaults to file stem

Returns:
  • list[pd.DataFrame]: list of DataFrames, one for each sync event

Source code in neuroflow/io/window.py
def window_axivity(
    file_list, df_times, duration_window, number_window, sensor_names=None
):
    """
    Stream .CWA files and window using timestamps.

    Args:
        file_list (list): list of paths to .CWA files
        df_times (pd.DataFrame): DataFrame of timestamps for window center
        duration_window (int): Seconds
        number_window (int): Number of pre/post windows
        sensor_names (list of str): optional names for each sensor; defaults to file stem
    Returns:
        list[pd.DataFrame]: list of DataFrames, one for each sync event
    """
    if not isinstance(file_list, list):
        file_list = [file_list]

    df_times = convert_timestamps(df_times)

    list_window_data = []
    for _, ref in df_times.iterrows():
        ref_label = ref["label"]
        ref_ts = ref["time"]
        for w in range(-number_window, number_window):
            win_start = ref_ts + pd.DateOffset(seconds=w * duration_window)
            win_end = win_start + pd.DateOffset(seconds=duration_window)
            win_lbl = str(win_start).replace(" ", "T").replace(":", "").replace("-", "")
            print(w, ref_label + "_" + win_lbl)

            try:
                df_window = stream_axivity_timestamps(
                    file_list, win_start, win_end, sensor_names
                )

                df_window.attrs["label"] = (
                    ref_label
                    + "_"
                    + str(win_start).replace(" ", "T").replace(":", "").replace("-", "")
                )
                list_window_data.append(df_window)
            except Exception as e:
                print(e)

    return list_window_data

metrics

nimbalwear

extract_steps(data_step_state, state_column)

Extracts step phases from nimbalwear state array into a table.

Parameters:
  • data_step_state (DataFrame) –

    Standardized DataFrame of IMU data with step state column.

  • state_column (str) –

    Name of step state column.

Returns:
  • data_steps: Table of step information (phase start and end).

Source code in neuroflow/metrics/nimbalwear.py
def extract_steps(data_step_state, state_column):
    """Extracts step phases from nimbalwear state array into a table.

    Args:
        data_step_state (pd.DataFrame): Standardized DataFrame of IMU data with step state column.
        state_column (str): Name of step state column.
    Returns:
        data_steps: Table of step information (phase start and end).
    """
    data = data_step_state[state_column].values
    list_step_phases = ["pushoff", "early-swing", "late-swing", "heelstrike"]
    dict_phases = {
        phase: {"arr": label(data == p + 1)[0], "num": label(data == p + 1)[1]}
        for p, phase in enumerate(list_step_phases)
    }

    # assert only complete step cycles exist
    assert dict_phases["pushoff"]["num"] == dict_phases["early-swing"]["num"]
    assert dict_phases["pushoff"]["num"] == dict_phases["late-swing"]["num"]
    assert dict_phases["pushoff"]["num"] == dict_phases["heelstrike"]["num"]
    n_steps = dict_phases["pushoff"]["num"]

    steps = []
    for s in range(1, n_steps + 1):
        for p, (pkey, pdata) in enumerate(dict_phases.items()):
            pidx = np.where(pdata["arr"] == s)[0]

            steps.append(
                {
                    "step_number": s,
                    "step_phase": p + 1,
                    "phase_label": pkey,
                    "phase_start": pidx[0],
                    "phase_end": pidx[-1],
                }
            )

    data_steps = pd.DataFrame(steps)

    print("Step extraction complete.")
    return data_steps
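
A synthetic example (state codes 1-4 follow the phase order in the source):

import numpy as np
import pandas as pd

from neuroflow.metrics.nimbalwear import extract_steps

states = np.array([0, 1, 1, 2, 2, 3, 3, 4, 4, 0])  # one complete step cycle
data_steps = extract_steps(pd.DataFrame({"state": states}), "state")
# one row per phase: step_number, step_phase, phase_label, phase_start, phase_end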

processing

ecg

pan_tompkins_detector(fs, ecg)

Jiapu Pan and Willis J. Tompkins. A Real-Time QRS Detection Algorithm. In: IEEE Transactions on Biomedical Engineering BME-32.3 (1985), pp. 230-236.

Source code in neuroflow/processing/ecg.py
def pan_tompkins_detector(fs, ecg):
    """
    Jiapu Pan and Willis J. Tompkins.
    A Real-Time QRS Detection Algorithm.
    In: IEEE Transactions on Biomedical Engineering
    BME-32.3 (1985), pp. 230-236.
    """

    maxQRSduration = 0.150  # sec
    f1 = 5 / fs
    f2 = 15 / fs

    b, a = signal.butter(1, [f1 * 2, f2 * 2], btype="bandpass")

    filtered_ecg = signal.lfilter(b, a, ecg)

    diff = np.diff(filtered_ecg)

    squared = diff * diff

    N = int(maxQRSduration * fs)
    mwa = _moving_window_average(squared, N)
    mwa[: int(maxQRSduration * fs * 2)] = 0

    pks, _ = signal.find_peaks(mwa, distance=0.3 * fs, height=np.mean(mwa))
    # pks -= int(np.round(N/2))
    pks -= N

    return mwa, pks
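
A quick smoke test on a synthetic trace (a sine stand-in, not real ECG):

import numpy as np

from neuroflow.processing.ecg import pan_tompkins_detector

fs = 250
t = np.arange(0, 10, 1 / fs)
ecg = np.sin(2 * np.pi * 1.2 * t)
mwa, pks = pan_tompkins_detector(fs, ecg)
# pks holds detected QRS sample indices into ecg; mwa is the moving-window average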

rr_pantompkins(df_data, ecg_channel='ecg0')

Detect RR peaks from ECG.

Parameters:
  • df_data (DataFrame) –

    Standardized DataFrame of ECG data

Returns:
  • df_data: DataFrame with RR events column.

Source code in neuroflow/processing/ecg.py
def rr_pantompkins(df_data, ecg_channel="ecg0"):
    """Detect RR peaks from ECG.

    Args:
        df_data (pd.DataFrame): Standardized DataFrame of ECG data
    Returns:
        df_data: DataFrame with RR events column.
    """
    fs = get_sampling_info(df_data["time"])["frequency"]
    voltage_filt = _filter_ecg(df_data[ecg_channel], fs)

    _, pks = pan_tompkins_detector(fs, voltage_filt)

    # create an RR event column
    rr_events = np.zeros(len(voltage_filt))
    rr_events[pks] = 100

    df_data[f"ecg_bittium_{ecg_channel}"] = rr_events

    return df_data

gait

armswing_paradigma(df_data, paradigma_params={})

Detect armswing using the paradigma implementation: https://github.com/biomarkersParkinson/paradigma

Parameters:
  • df_data (DataFrame) –

    Standardized DataFrame of IMU data

  • paradigma_params (dict, default: {} ) –

    Key/value pairs of detector parameters; e.g. yz_columns = ["y", "z"]

Returns:
  • pd.DataFrame: data with appended column named gait_paradigma_<left/right>wrist coding arm swing angle (degrees).

Source code in neuroflow/processing/gait.py
def armswing_paradigma(df_data, paradigma_params={}):
    """
    Detect armswing using the paradigma implementation: https://github.com/biomarkersParkinson/paradigma

    Args:
        df_data (pd.DataFrame): Standardized DataFrame of IMU data
        paradigma_params (dict): Key/value pairs of detector parameters;
            yz_columns = ["y", "z"],

    Returns:
        pd.DataFrame of data with appended column named
        gait_paradigma_<left/right>wrist
        coding arm swing angle (degrees).
    """

    for side in ["right", "left"]:
        sensor_location = f"{side}wrist"
        if sum([sensor_location in col for col in df_data.columns]):
            if not paradigma_params:
                a_arm = df_data[
                    [f"{sensor_location}_a{axis}" for axis in ["x", "y", "z"]]
                ].values
                g_arm = df_data[
                    [f"{sensor_location}_g{axis}" for axis in ["x", "y", "z"]]
                ].values
                wrist_columns = detect_wrist_axes(*a_arm.T)
                df_arm = pd.DataFrame(g_arm[:, wrist_columns], columns=["y", "z"])
            else:
                df_arm = df_data[
                    [
                        f"{sensor_location}_g{axis}"
                        for axis in paradigma_params["yz_columns"]
                    ]
                ]
                df_arm.columns = ["y", "z"]

            # correct for any nan values
            df_arm["y"] = pd.Series(df_arm["y"].values).interpolate().bfill().values
            df_arm["z"] = pd.Series(df_arm["z"].values).interpolate().bfill().values

            vel = pca_transform_gyroscope(df_arm, "y", "z")
            fs = get_sampling_info(df_data["time"])["frequency"]

            time_array = np.linspace(0, len(vel) / fs, len(vel))
            angle = compute_angle(time_array, vel)
            angle = remove_moving_average_angle(angle, fs)

            # append event array to sensor dataframe
            df_data[f"gait_paradigma_{sensor_location}"] = angle

    return df_data
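
Usage sketch (df_data is assumed to hold time plus leftwrist_/rightwrist_ accelerometer and gyroscope columns, as produced by the io module):

from neuroflow.processing.gait import armswing_paradigma

df_data = armswing_paradigma(df_data)  # auto-detects the y/z gyroscope axes
# or pin the gyroscope axes explicitly:
df_data = armswing_paradigma(df_data, paradigma_params={"yz_columns": ["y", "z"]})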

steps_nimbalwear(df_data, nimbalwear_params={})

Detect steps using nimbalwear algorithm: https://github.com/nimbal/nimbalwear/

Parameters:
  • df_data (DataFrame) –

    Standardized DataFrame of IMU data

  • nimbalwear_params (dict, default: {} ) –

    Key/value pairs of detector parameters; pushoff_threshold = 0.85, pushoff_time: float = 0.4, swing_phase_time: float = 0.2, heel_strike_detect_time: float = 0.5, heel_strike_threshold: int = -5, foot_down_time: float = 0.05

Returns:
  • pd.DataFrame: data with appended column named gait_nimbalwear_<left/right>ankle coding gait "state" array.

Source code in neuroflow/processing/gait.py
def steps_nimbalwear(df_data, nimbalwear_params={}):
    """
    Detect steps using nimbalwear algorithm: https://github.com/nimbal/nimbalwear/

    Args:
        df_data (pd.DataFrame): Standardized DataFrame of IMU data
        nimbalwear_params (dict): Key/value pairs of detector parameters;
            pushoff_threshold = 0.85,
            pushoff_time: float = 0.4,
            swing_phase_time: float = 0.2,
            heel_strike_detect_time: float = 0.5,
            heel_strike_threshold: int = -5,
            foot_down_time: float = 0.05

    Returns:
        pd.DataFrame of data with appended column named
        gait_nimbalwear_<left/right>ankle
        coding gait "state" array.
    """

    for side in ["right", "left"]:
        sensor_location = f"{side}ankle"
        if sum([sensor_location in col for col in df_data.columns]):
            a_vrt = detect_vert(
                df_data[
                    [f"{sensor_location}_a{axis}" for axis in ["x", "y", "z"]]
                ].values.T
            )

            # get nimbalwear pushoff_df.csv
            data_path = files("nimbalwear")
            pushoff_path = data_path.joinpath("data/pushoff_df.csv")
            df_pushoff = pd.read_csv(pushoff_path)
            # get sampling rate
            fs = get_sampling_info(df_data["time"])["frequency"]
            # check if vertical needs to be flipped to positive
            if np.mean(a_vrt) < 0:
                a_vrt *= -1
            # adjust for any nan values
            a_vrt = pd.Series(a_vrt).interpolate().bfill().values
            # run nimbalwear step detector
            state_arr, _, _, _ = detect_steps(
                a_vrt, fs, df_pushoff, **nimbalwear_params
            )

            # append event array to sensor dataframe
            df_data[f"gait_nimbalwear_{sensor_location}"] = state_arr

    return df_data
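
Usage sketch (df_data is assumed to hold time plus leftankle_/rightankle_ accelerometer columns):

from neuroflow.processing.gait import steps_nimbalwear

df_data = steps_nimbalwear(df_data)  # default nimbalwear parameters
df_data = steps_nimbalwear(df_data, nimbalwear_params={"pushoff_threshold": 0.9})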

utils

cwa

parse_cwa_data(block, extractData=False)

(Slow) parser for a single block.

Source code in neuroflow/utils/cwa.py
def parse_cwa_data(block, extractData=False):
    """(Slow) parser for a single block."""
    data = {}
    if len(block) >= 512:
        packetHeader = unpack(
            "BB", block[0:2]
        )  # @ 0  +2   ASCII "AX", little-endian (0x5841)
        packetLength = unpack("<H", block[2:4])[
            0
        ]  # @ 2  +2   Packet length (508 bytes, with header (4) = 512 bytes total)
        if (
            packetHeader[0] == ord("A")
            and packetHeader[1] == ord("X")
            and packetLength == 508
            and _checksum(block[0:512]) == 0
        ):
            # checksum = unpack('<H', block[510:512])[0]               # @510 +2   Checksum of packet (16-bit word-wise sum of the whole packet should be zero)

            deviceFractional = unpack("<H", block[4:6])[
                0
            ]  # @ 4  +2   Top bit set: 15-bit fraction of a second for the time stamp, the timestampOffset was already adjusted to minimize this assuming ideal sample rate; Top bit clear: 15-bit device identifier, 0 = unknown;
            data["deviceFractional"] = deviceFractional
            data["sessionId"] = unpack("<I", block[6:10])[
                0
            ]  # @ 6  +4   Unique session identifier, 0 = unknown
            data["sequenceId"] = unpack("<I", block[10:14])[
                0
            ]  # @10  +4   Sequence counter (0-indexed), each packet has a new number (reset if restarted)
            timestamp = _parse_timestamp(
                unpack("<I", block[14:18])[0]
            )  # @14  +4   Last reported RTC value, 0 = unknown
            light = unpack("<H", block[18:20])[
                0
            ]  # @18  +2   Lower 10 bits are the last recorded light sensor value in raw units, 0 = none #  log10LuxTimes10Power3 = ((value + 512.0) * 6000 / 1024); lux = pow(10.0, log10LuxTimes10Power3 / 1000.0);
            data["light"] = light & 0x3FF  # least-significant 10 bits
            temperature = unpack("<H", block[20:22])[
                0
            ]  # @20  +2   Last recorded temperature sensor value in raw units, 0 = none
            data["temperature"] = temperature * 75.0 / 256 - 50
            data["events"] = unpack("B", block[22:23])[
                0
            ]  # @22  +1   Event flags since last packet, b0 = resume logging, b1 = reserved for single-tap event, b2 = reserved for double-tap event, b3 = reserved, b4 = reserved for diagnostic hardware buffer, b5 = reserved for diagnostic software buffer, b6 = reserved for diagnostic internal flag, b7 = reserved)
            battery = unpack("B", block[23:24])[
                0
            ]  # @23  +1   Last recorded battery level in raw units, 0 = unknown
            data["battery"] = (battery + 512.0) * 6000 / 1024 / 1000.0
            rateCode = unpack("B", block[24:25])[
                0
            ]  # @24  +1   Sample rate code, frequency (3200/(1<<(15-(rate & 0x0f)))) Hz, range (+/-g) (16 >> (rate >> 6)).
            data["rateCode"] = rateCode
            numAxesBPS = unpack("B", block[25:26])[
                0
            ]  # @25  +1   0x32 (top nibble: number of axes = 3; bottom nibble: packing format - 2 = 3x 16-bit signed, 0 = 3x 10-bit signed + 2-bit exponent)
            data["numAxesBPS"] = numAxesBPS
            timestampOffset = unpack("<h", block[26:28])[
                0
            ]  # @26  +2   Relative sample index from the start of the buffer where the whole-second timestamp is valid
            data["sampleCount"] = unpack("<H", block[28:30])[
                0
            ]  # @28  +2   Number of accelerometer samples (40/80/120, depending on format, if this sector is full)
            # rawSampleData[480] = block[30:510]                      # @30  +480 Raw sample data.  Each sample is either 3x 16-bit signed values (x, y, z) or one 32-bit packed value (The bits in bytes [3][2][1][0]: eezzzzzz zzzzyyyy yyyyyyxx xxxxxxxx, e = binary exponent, lsb on right)

            # range = 16 >> (rateCode >> 6)  ## Nearest configured frequency: 3200 / (2 ^ round(log2(3200 / frequency)))
            if (
                rateCode == 0x00
            ):  # Very old format used timestampOffset to indicate sample rate
                frequency = timestampOffset
            else:
                frequency = 3200 / (1 << (15 - (rateCode & 0x0F)))
            data["frequency"] = frequency

            timeFractional = 0
            # if top-bit set, we have a fractional date
            if deviceFractional & 0x8000:
                # Need to undo backwards-compatible shim by calculating how many whole samples the fractional part of timestamp accounts for.
                timeFractional = (
                    deviceFractional & 0x7FFF
                ) << 1  # use original deviceId field bottom 15-bits as 16-bit fractional time
                timestampOffset += (
                    timeFractional * int(frequency)
                ) >> 16  # undo the backwards-compatible shift (as we have a true fractional)

            # Add fractional time to timestamp
            timestamp += timeFractional / 65536

            data["timestamp"] = timestamp
            data["timestampOffset"] = timestampOffset

            data["timestampTime"] = _timestamp_string(data["timestamp"])

            # Maximum samples per sector
            channels = (numAxesBPS >> 4) & 0x0F
            bytesPerAxis = numAxesBPS & 0x0F
            bytesPerSample = 4
            if bytesPerAxis == 0 and channels == 3:
                bytesPerSample = 4
            elif bytesPerAxis > 0 and channels > 0:
                bytesPerSample = bytesPerAxis * channels
            samplesPerSector = 480 // bytesPerSample
            data["channels"] = channels
            data["bytesPerAxis"] = bytesPerAxis  # 0 for DWORD packing
            data["bytesPerSample"] = bytesPerSample
            data["samplesPerSector"] = samplesPerSector

            # Estimate the time of the first/after-last sample (if at the configured rate)
            data["estimatedFirstSampleTime"] = timestamp - (timestampOffset / frequency)
            data["estimatedAfterLastSampleTime"] = data["estimatedFirstSampleTime"] + (
                samplesPerSector / frequency
            )

            # Axes
            accelAxis = -1
            gyroAxis = -1
            magAxis = -1
            if channels >= 6:
                gyroAxis = 0
                accelAxis = 3
                if channels >= 9:
                    magAxis = 6
            elif channels >= 3:
                accelAxis = 0

            # Default units/scaling/range
            accelUnit = 256  # 1g = 256
            gyroRange = 2000  # 32768 = 2000dps
            magUnit = 16  # 1uT = 16
            # light is least significant 10 bits, accel scale 3-MSB, gyro scale next 3 bits: AAAGGGLLLLLLLLLL
            accelUnit = 1 << (8 + ((light >> 13) & 0x07))
            if ((light >> 10) & 0x07) != 0:
                gyroRange = 8000 // (1 << ((light >> 10) & 0x07))

            # Scale
            # accelScale = 1.0 / accelUnit
            # gyroScale = float(gyroRange) / 32768
            # magScale = 1.0 / magUnit

            # Range
            accelRange = 16
            if rateCode != 0:
                accelRange = 16 >> (rateCode >> 6)
            magRange = 32768 / magUnit

            # Unit
            gyroUnit = 32768.0 / gyroRange

            if accelAxis >= 0:
                data["accelAxis"] = accelAxis
                data["accelRange"] = accelRange
                data["accelUnit"] = accelUnit
            if gyroAxis >= 0:
                data["gyroAxis"] = gyroAxis
                data["gyroRange"] = gyroRange
                data["gyroUnit"] = gyroUnit
            if magAxis >= 0:
                data["magAxis"] = magAxis
                data["magRange"] = magRange
                data["magUnit"] = magUnit

            # Read sample values
            if extractData:
                if accelAxis >= 0:
                    # accelSamples = [[0, 0, 0]] * data["sampleCount"]
                    accelSamples = [[0, 0, 0] for _ in range(data['sampleCount'])]
                    if bytesPerAxis == 0 and channels == 3:
                        for i in range(data["sampleCount"]):
                            ofs = 30 + i * 4
                            # value =  block[i] | (block[i + 1] << 8) | (block[i + 2] << 16) | (block[i + 3] << 24)
                            value = unpack("<I", block[ofs : ofs + 4])[0]
                            axes = _dword_unpack(value)
                            accelSamples[i][0] = axes[0] / accelUnit
                            accelSamples[i][1] = axes[1] / accelUnit
                            accelSamples[i][2] = axes[2] / accelUnit
                    elif bytesPerAxis == 2:
                        for i in range(data["sampleCount"]):
                            ofs = 30 + (i * 2 * channels) + 2 * accelAxis
                            accelSamples[i][0] = (
                                block[ofs + 0] | (block[ofs + 1] << 8)
                            ) / accelUnit
                            accelSamples[i][1] = (
                                block[ofs + 2] | (block[ofs + 3] << 8)
                            ) / accelUnit
                            accelSamples[i][2] = (
                                block[ofs + 4] | (block[ofs + 5] << 8)
                            ) / accelUnit
                    data["samplesAccel"] = accelSamples

                if gyroAxis >= 0 and bytesPerAxis == 2:
                    # gyroSamples = [[0, 0, 0]] * data["sampleCount"]
                    gyroSamples = [[0, 0, 0] for _ in range(data['sampleCount'])]
                    for i in range(data["sampleCount"]):
                        ofs = 30 + (i * 2 * channels) + 2 * gyroAxis
                        gyroSamples[i][0] = (
                            block[ofs + 0] | (block[ofs + 1] << 8)
                        ) / gyroUnit
                        gyroSamples[i][1] = (
                            block[ofs + 2] | (block[ofs + 3] << 8)
                        ) / gyroUnit
                        gyroSamples[i][2] = (
                            block[ofs + 4] | (block[ofs + 5] << 8)
                        ) / gyroUnit
                    data["samplesGyro"] = gyroSamples

                if magAxis >= 0 and bytesPerAxis == 2:
                    # comprehension so each sample row is a distinct list (avoids aliasing)
                    magSamples = [[0, 0, 0] for _ in range(data["sampleCount"])]
                    for i in range(data["sampleCount"]):
                        ofs = 30 + (i * 2 * channels) + 2 * magAxis
                        magSamples[i][0] = (
                            block[ofs + 0] | (block[ofs + 1] << 8)
                        ) / magUnit
                        magSamples[i][1] = (
                            block[ofs + 2] | (block[ofs + 3] << 8)
                        ) / magUnit
                        magSamples[i][2] = (
                            block[ofs + 4] | (block[ofs + 5] << 8)
                        ) / magUnit
                    data["samplesMag"] = magSamples

    return data
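
A sketch of parsing the first valid data sector of a .cwa file, mirroring the skip-invalid loop in stream_cwa_file (the 512-byte sector size is implied by the block-length checks above; path illustrative):

from neuroflow.utils.cwa import parse_cwa_data

SECTOR_SIZE = 512  # block length implied by the checks above

with open("data/P01_la.cwa", "rb") as f:
    meta = {}
    while not meta:  # skip header/invalid sectors
        block = f.read(SECTOR_SIZE)
        if not block:
            break
        meta = parse_cwa_data(block)

full = parse_cwa_data(block, extractData=True)  # also decodes sample arrays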

signals

get_sampling_info(time)

Compute sampling period and frequency.

Parameters:
  • time (numpy array or pd.Series) –

    Array or Series of timestamps (dtype <M8[ns]).

Returns:
  • dict

    {"period": sampling period, "frequency": sampling frequency}

Source code in neuroflow/utils/signals.py
def get_sampling_info(time):
    """
    Compute sampling period and frequency.

    Args:
        time (numpy array or pd.Series): Array or Series of timestamps (dtype <M8[ns]).

    Returns:
        dict: {"period": sampling period, "frequency": sampling frequency}
    """
    dt = np.mean(np.diff(time) / np.timedelta64(1, "s"))  # seconds
    fs = 1 / dt

    return {"period": dt, "frequency": fs}