# Copyright (c) 2025-2026 Long Horizon Observatory
# Licensed under the Apache License, Version 2.0. See LICENSE file for details.
"""
Non-causal filtering for post-facto biologger analysis.
This module provides filtfilt-based (bidirectional, zero-phase) filtering
to achieve R-compatibility for scientific validation. Unlike the streaming
module which uses causal lfilter, these filters look both forward and backward
in time, which is only possible for post-hoc analysis.
Key functions:
- gsep_filtfilt: R-compatible gravitational separation using scipy.signal.filtfilt
"""
from collections import deque
from typing import cast
import numpy as np
from numpy.typing import NDArray
from scipy.signal import butter, filtfilt
[docs]
def gsep_batch_circular(
accel_array: NDArray[np.float64], filt_len: int
) -> tuple[NDArray[np.float64], NDArray[np.float64], NDArray[np.float64], NDArray[np.float64]]:
"""
Batch Gsep with circular wrapping - exact R match.
This matches R's: filter(xyz, filter=filt, sides=2, circular=TRUE)
Args:
accel_array (np.ndarray): Full accelerometer data, shape (N, 3)
filt_len (int): Filter window length
Returns:
tuple: (static, dynamic, ODBA, VeDBA) each shape (N, 3) or (N,)
"""
weights = np.ones(filt_len) / filt_len
# Apply R's circular filter to each axis
static = np.zeros_like(accel_array)
for axis in range(3):
# Pad with circular wrapping (centered: (filt_len-1)//2 left, filt_len//2 right)
pad_left = (filt_len - 1) // 2
pad_right = filt_len // 2
padded = np.pad(accel_array[:, axis], (pad_left, pad_right), mode="wrap")
# Convolve and extract valid region
static[:, axis] = np.convolve(padded, weights, mode="valid")
# Compute dynamic
dynamic = accel_array - static
# ODBA and VeDBA
odba = np.sum(np.abs(dynamic), axis=1)
vedba = np.sqrt(np.sum(dynamic**2, axis=1))
return static, dynamic, odba, vedba
[docs]
def gsep_streaming(
x: float,
y: float,
z: float,
filt_len: int,
buffer: deque[tuple[float, float, float]],
) -> tuple[float, float, float, float, float, float, float, float]:
"""
Streaming Gsep without circular wrapping - for non-circular mode.
This uses a centered filter on available samples only (no wrap-around).
Edge effects at first/last ~filt_len/2 samples.
Args:
x, y, z (float): Current accelerometer readings (0.1g units)
filt_len (int): Filter window length
buffer (deque): Rolling buffer of samples
Returns:
tuple: (static_x, static_y, static_z, dyn_x, dyn_y, dyn_z, ODBA, VeDBA)
"""
buffer.append((x, y, z))
# Warmup period
if len(buffer) < filt_len:
return (float("nan"),) * 8
accel_array = np.array(list(buffer))
weights = np.ones(filt_len) / filt_len
# Centered filter: compute static for center point of buffer
# For streaming, we want the filtered value at the center of our window
center_idx = filt_len // 2
static_filtered = np.convolve(accel_array[:, 0], weights, mode="same")
static_x = float(static_filtered[center_idx])
static_filtered = np.convolve(accel_array[:, 1], weights, mode="same")
static_y = float(static_filtered[center_idx])
static_filtered = np.convolve(accel_array[:, 2], weights, mode="same")
static_z = float(static_filtered[center_idx])
# Dynamic acceleration (for center point)
dyn_x = float(accel_array[center_idx, 0] - static_x)
dyn_y = float(accel_array[center_idx, 1] - static_y)
dyn_z = float(accel_array[center_idx, 2] - static_z)
# ODBA and VeDBA
odba = abs(dyn_x) + abs(dyn_y) + abs(dyn_z)
vedba = np.sqrt(dyn_x**2 + dyn_y**2 + dyn_z**2)
return (static_x, static_y, static_z, dyn_x, dyn_y, dyn_z, odba, vedba)
[docs]
def butterworth_filtfilt(
data: NDArray[np.float64], cutoff: float, fs: float, order: int = 5
) -> NDArray[np.float64]:
"""
Apply Butterworth low-pass filter using filtfilt for zero-phase filtering.
Args:
data (np.ndarray): Input signal
cutoff (float): Cutoff frequency in Hz
fs (float): Sampling frequency in Hz
order (int): Filter order (default 5)
Returns:
np.ndarray: Filtered signal
"""
nyq = 0.5 * fs
normal_cutoff = cutoff / nyq
b, a = butter(order, normal_cutoff, btype="low", analog=False)
return cast(NDArray[np.float64], filtfilt(b, a, data, padtype="even"))