# Source code for biologger_sim.processors.lab

# Copyright (c) 2025-2026 Long Horizon Observatory
# Licensed under the Apache License, Version 2.0. See LICENSE file for details.
#
# PostFactoProcessor - R-Compatible Post-Hoc Biologger Analysis
# ==============================================================
#
# Non-causal processor for post-facto biologger data analysis.
# Achieves perfect R tie-out (<0.1° error target) through R-compatible filtering.
#
# Key Differences from StreamingProcessor:
#   - NON-CAUSAL: Uses centered filter (R's filter(sides=2, circular=TRUE))
#     instead of lfilter (causal)
#   - R-COMPATIBLE: Exact match with gRumble R package for scientific validation
#   - FIXED MEMORY: Streaming architecture with fixed buffer size (not in-memory batch)
#   - POST-HOC: Analysis after data collection complete, not real-time
#
# Processing Architecture:
#   1. Buffering: Maintains filt_len sample window for centered filter
#   2. Gsep: R-style centered filter (sides=2, circular=TRUE) for static/dynamic separation
#   3. Orientation: R-style pitchRoll2 from static acceleration
#   4. Output: Same schema as StreamingProcessor for drop-in compatibility
#
# For complete architecture documentation, see:
#   biologger_pseudotrack/postfacto/README.md (TODO)

import logging
import math
from collections import deque
from typing import Any, cast

import numpy as np
from numpy.typing import NDArray

from biologger_sim.core.numeric_utils import safe_float
from biologger_sim.core.processor_interface import BiologgerProcessor
from biologger_sim.core.types import ClockSource, DepthAlgorithm, DepthConfig
from biologger_sim.functions.filters import gsep_batch_circular, gsep_streaming
from biologger_sim.functions.rotation import xb, yb
from biologger_sim.io.zmq_publisher import ZMQPublisher


class PostFactoProcessor(BiologgerProcessor):
    """
    Post-facto (non-causal) biologger processor for R-compatibility.

    This processor uses R's centered moving average filter
    (filter(sides=2, circular=TRUE)) to achieve exact tie-out with the
    gRumble R package. Unlike StreamingProcessor which uses causal lfilter
    (trailing window), this uses a centered window looking both forward and
    backward in time, which is only possible for post-hoc analysis.

    Memory Footprint:
        - Fixed size: O(filt_len) samples (48 samples @ 16Hz for 3s window)
        - Independent of dataset size (unlike batch/ which loads all data)

    Validation Target:
        - <0.1° error vs R (pitch, roll, heading)
        - Exact ODBA/VeDBA match
    """

    def __init__(
        self,
        filt_len: int = 48,
        freq: int = 16,
        debug_level: int = 0,
        r_exact_mode: bool = False,
        compute_attachment_angles: bool = True,
        locked_attachment_roll_deg: float | None = None,
        locked_attachment_pitch_deg: float | None = None,
        compute_mag_offsets: bool = True,
        locked_mag_offset_x: float | None = None,
        locked_mag_offset_y: float | None = None,
        locked_mag_offset_z: float | None = None,
        locked_mag_sphere_radius: float | None = None,
        depth_cfg: DepthConfig | None = None,
        zmq_publisher: ZMQPublisher | None = None,
        eid: int | None = None,
        sim_id: str | None = None,
        tag_id: str = "unknown",
        clock_source: ClockSource = ClockSource.FIXED_FREQ,
        **kwargs: Any,
    ) -> None:
        """
        Initialize PostFactoProcessor.

        Args:
            filt_len (int): Filter window length in samples (default 48 = 3s @ 16Hz)
            freq (int): Sampling frequency in Hz (default 16Hz)
            debug_level (int): Debug verbosity (0=off, 1=basic, 2=detailed)
            r_exact_mode (bool): Enable full R-exact compatibility (batch calibration
                from full dataset)
            compute_attachment_angles (bool): Compute attachment angles from data (True)
                or use locked values (False)
            locked_attachment_roll_deg (float): Pre-computed attachment roll angle (degrees)
            locked_attachment_pitch_deg (float): Pre-computed attachment pitch angle (degrees)
            compute_mag_offsets (bool): Compute magnetometer offsets from data (True)
                or use locked values (False)
            locked_mag_offset_x (float): Pre-computed mag X-axis hard iron offset
            locked_mag_offset_y (float): Pre-computed mag Y-axis hard iron offset
            locked_mag_offset_z (float): Pre-computed mag Z-axis hard iron offset
            locked_mag_sphere_radius (float): Pre-computed mag calibration sphere radius
            depth_cfg (DepthConfig | None): Depth estimation configuration
            zmq_publisher (ZMQPublisher | None): Optional ZMQ publisher for real-time viz
            eid (int | None): Entity identifier for ZMQ publishing
            sim_id (str | None): Simulation view name for ZMQ publishing
            tag_id (str): Tag identifier forwarded with every ZMQ state publish
            clock_source (ClockSource): Time step source for integration
                (FIXED_FREQ or SENSOR_TIME).
            **kwargs: Additional parameters (for compatibility)
        """
        self.filt_len = filt_len
        self.freq = freq
        self.debug_level = debug_level
        self.r_exact_mode = r_exact_mode
        self.clock_source = clock_source
        # SENSOR_TIME integrates dead reckoning over real timestamp deltas;
        # FIXED_FREQ assumes an exact 1/freq step (R-compatible behavior).
        self.true_integration = clock_source == ClockSource.SENSOR_TIME
        self.compute_attachment_angles = compute_attachment_angles
        self.compute_mag_offsets = compute_mag_offsets

        # Handle Depth Configuration
        if depth_cfg is None:
            depth_cfg = DepthConfig()
        # Derive interpolation flag from algorithm choice
        # Used for buffer initialization below
        enable_depth_interpolation = depth_cfg.algorithm == DepthAlgorithm.ACAUSAL_INTERP
        self.enable_depth_interpolation = enable_depth_interpolation

        self.zmq_publisher = zmq_publisher
        self.eid = eid
        self.sim_id = sim_id
        self.tag_id = tag_id

        # Locked calibration parameters (for streaming mode)
        self.locked_attachment_roll_rad = (
            math.radians(locked_attachment_roll_deg)
            if locked_attachment_roll_deg is not None
            else None
        )
        self.locked_attachment_pitch_rad = (
            math.radians(locked_attachment_pitch_deg)
            if locked_attachment_pitch_deg is not None
            else None
        )
        # Locked mag calibration is only usable when ALL four components were supplied.
        self.locked_mag_offset = (
            np.array(
                [
                    locked_mag_offset_x,
                    locked_mag_offset_y,
                    locked_mag_offset_z,
                    locked_mag_sphere_radius,
                ]
            )
            if all(
                x is not None
                for x in [
                    locked_mag_offset_x,
                    locked_mag_offset_y,
                    locked_mag_offset_z,
                    locked_mag_sphere_radius,
                ]
            )
            else None
        )

        # Computed calibration parameters (set during batch processing)
        self.computed_attachment_roll_rad: float | None = None
        self.computed_attachment_pitch_rad: float | None = None
        self.computed_mag_offset: NDArray[np.float64] | None = None

        # Circular buffer for R-style centered filter (needs filt_len samples)
        self.accel_buffer: deque[tuple[float, float, float]] = deque(maxlen=filt_len)
        self.mag_buffer: deque[tuple[float, float, float]] | None = (
            deque(maxlen=filt_len)
            if compute_mag_offsets or self.locked_mag_offset is not None
            else None
        )
        self.depth_buffer: deque[float] | None = deque() if enable_depth_interpolation else None

        # Record buffer for delayed processing (to align with centered filter output)
        # The centered filter introduces a delay of filt_len // 2 samples
        # We need to buffer the original records to emit them with the correct filtered values
        self.record_buffer: deque[dict[str, Any]] = deque(maxlen=filt_len)

        # Batch calibration data storage (only used in r_exact_mode)
        self.batch_accel_data: list[list[float]] | None = [] if r_exact_mode else None
        self.batch_mag_data: list[list[float]] | None = [] if r_exact_mode else None
        self.batch_depth_data: list[float] | None = (
            [] if r_exact_mode and enable_depth_interpolation else None
        )
        self.batch_velocity_data: list[float] | None = [] if r_exact_mode else None
        self.batch_timestamps: list[float] | None = [] if r_exact_mode else None
        # Store pre-computed results for Pass 2
        self.batch_results: dict[str, Any] | None = None
        self.calibration_complete = False

        # Record counter
        self.record_count = 0
        # Indices of records accepted in Pass 1; used to align Pass 2 lookups.
        self.valid_record_indices: set[int] = set()
        self.processing_idx = 0

        # Dead Reckoning State
        self.pseudo_x = 0.0
        self.pseudo_y = 0.0

        # Sub-second Interpolation State
        self.last_ts = -1.0
        self.sub_sec_idx = 0

        # Logger
        self.logger = logging.getLogger(__name__)
        if debug_level > 0:
            self.logger.setLevel(logging.DEBUG)

        self.logger.info(
            f"PostFactoProcessor initialized: filt_len={filt_len}, freq={freq}Hz, "
            f"r_exact_mode={r_exact_mode}, clock_source={clock_source}, "
            f"compute_attachment_angles={compute_attachment_angles}, "
            f"compute_mag_offsets={compute_mag_offsets}, "
            f"enable_depth_interpolation={enable_depth_interpolation}"
        )

    def _compute_attachment_angles_batch(self) -> None:
        """
        Compute attachment angles from full dataset using R-style colMeans approach.

        This implements the two-step rotation correction from
        swordRED_pseudotrack-clean.R (lines 135-147):
        1. Calculate roll from mean Y/Z components
        2. Apply Xb(roll) rotation, recalculate mean
        3. Calculate pitch from rotated mean X/Z components

        Results are stored on ``self.computed_attachment_roll_rad`` /
        ``self.computed_attachment_pitch_rad`` (radians).
        """
        if not self.batch_accel_data:
            self.logger.warning(
                "No batch acceleration data collected for attachment angle computation"
            )
            return

        # Convert list of [x, y, z] to numpy array
        accel_array = np.array(self.batch_accel_data)

        # Step 1: Calculate mean acceleration (R's colMeans)
        pos_mean = accel_array.mean(axis=0)

        # Step 2: Calculate roll from Y/Z components
        roll = np.arctan2(pos_mean[1], pos_mean[2])

        if self.debug_level >= 1:
            self.logger.debug(f"Batch calibration - posMean_Accel: {np.round(pos_mean, 3)}")
            self.logger.debug(f"Batch calibration - roll: {roll:.6f} rad ({np.degrees(roll):.2f}°)")

        # Step 3: Rotate accelerometer data by roll (R's %*% Xb(roll))
        accel_rotated = accel_array @ xb(roll)

        # Step 4: Recalculate mean from rotated data
        pos_mean2 = accel_rotated.mean(axis=0)

        if self.debug_level >= 1:
            self.logger.debug(f"Batch calibration - posMean_AccelR: {np.round(pos_mean2, 3)}")

        # Step 5: Calculate pitch from rotated X/Z components
        # NOTE: uses arctan (not arctan2) to mirror the R script exactly.
        pitch = np.arctan(pos_mean2[0] / pos_mean2[2])

        if self.debug_level >= 1:
            self.logger.debug(
                f"Batch calibration - pitch: {pitch:.6f} rad ({np.degrees(pitch):.2f}°)"
            )

        # Store computed values
        self.computed_attachment_roll_rad = roll
        self.computed_attachment_pitch_rad = pitch

        self.logger.info(
            f"Computed attachment angles: roll={np.degrees(roll):.3f}°, "
            f"pitch={np.degrees(pitch):.3f}°"
        )

    def _compute_mag_offset_batch(self) -> None:
        """
        Compute magnetometer hard iron offsets from full dataset using R's MagOffset function.

        This implements MagOffset from swordRED_pseudotrack-clean.R (lines 21-33):
        Hard iron calibration using least squares to find sphere center and radius.
        Result is stored on ``self.computed_mag_offset`` as
        ``[offset_x, offset_y, offset_z, radius]``.
        """
        if not self.batch_mag_data:
            self.logger.warning("No batch magnetometer data collected for offset computation")
            return

        # Convert list of [x, y, z] to numpy array
        mag_array = np.array(self.batch_mag_data)

        # R's MagOffset function:
        #   A = cbind(Mag[,1]*2, Mag[,2]*2, Mag[,3]*2, 1)
        #   f = matrix(Mag[,1]^2 + Mag[,2]^2 + Mag[,3]^2, ncol=1)
        #   C = solve(crossprod(A), crossprod(A, f))
        #   rad = sqrt((C[1]^2 + C[2]^2 + C[3]^2) + C[4])
        a_mat = np.column_stack(
            [
                mag_array[:, 0] * 2,
                mag_array[:, 1] * 2,
                mag_array[:, 2] * 2,
                np.ones(len(mag_array)),
            ]
        )
        f = (mag_array[:, 0] ** 2 + mag_array[:, 1] ** 2 + mag_array[:, 2] ** 2).reshape(-1, 1)

        # Solve: C = (A^T A)^-1 (A^T f)  (normal equations of the sphere fit)
        c_vec = np.linalg.solve(a_mat.T @ a_mat, a_mat.T @ f).flatten()

        # Calculate sphere radius
        rad = np.sqrt(c_vec[0] ** 2 + c_vec[1] ** 2 + c_vec[2] ** 2 + c_vec[3])

        # Store as [offset_x, offset_y, offset_z, radius]
        self.computed_mag_offset = np.array([c_vec[0], c_vec[1], c_vec[2], rad])

        if self.debug_level >= 1:
            self.logger.debug(
                f"Batch calibration - MagOffset: {np.round(self.computed_mag_offset, 3)}"
            )

        self.logger.info(
            f"Computed mag offsets: x={c_vec[0]:.3f}, y={c_vec[1]:.3f}, "
            f"z={c_vec[2]:.3f}, radius={rad:.3f}"
        )
[docs] def calibrate_from_batch_data(self) -> None: """ Perform batch calibration from collected data. This should be called after the first pass through the dataset in r_exact_mode. Computes attachment angles and magnetometer offsets from full dataset. """ if not self.r_exact_mode: self.logger.warning("calibrate_from_batch_data called but r_exact_mode is False") return self.logger.info("Starting batch calibration from full dataset...") # Compute attachment angles if requested if self.compute_attachment_angles and self.batch_accel_data: self._compute_attachment_angles_batch() # Compute magnetometer offsets if requested if self.compute_mag_offsets and self.batch_mag_data: self._compute_mag_offset_batch() # Pre-compute all results for Pass 2 self._process_batch_data() self.calibration_complete = True self.logger.info("Batch calibration complete")
    def _process_batch_data(self) -> None:
        """
        Process the entire batch of data using R-exact methods (circular filtering).

        Populates self.batch_results with per-sample arrays (all aligned to the
        order of self.batch_accel_data): static/dynamic acceleration, ODBA/VeDBA,
        pitch/roll, and — when the corresponding inputs exist — smoothed depth,
        vertical velocity, heading, clock drift and dead-reckoned pseudo track.
        """
        if not self.batch_accel_data:
            return

        self.logger.info("Processing full batch with circular Gsep...")
        accel_data = np.array(self.batch_accel_data)

        # Get attachment angles
        att_roll, att_pitch = self._get_attachment_angles()

        # Rotate accelerometer data
        if att_roll is not None and att_pitch is not None:
            # Apply Xb(roll) then Yb(pitch) rotation (R-style: Data %*% Rotation)
            accel_rotated = accel_data @ xb(att_roll)
            accel_rotated = accel_rotated @ yb(att_pitch)
        else:
            accel_rotated = accel_data

        # Apply Gsep with circular wrapping
        static, dynamic, odba, vedba = gsep_batch_circular(accel_rotated, self.filt_len)

        # Compute pitch/roll from static components (R's pitchRoll2)
        static_mag = np.sqrt(np.sum(static**2, axis=1))
        # Avoid division by zero
        static_mag[static_mag == 0] = 1.0
        static_norm = static / static_mag[:, np.newaxis]

        pitch_deg = -np.degrees(
            np.arctan2(static_norm[:, 0], np.sqrt(static_norm[:, 1] ** 2 + static_norm[:, 2] ** 2))
        )
        roll_deg = np.degrees(np.arctan2(static_norm[:, 1], static_norm[:, 2]))

        # Store results
        self.batch_results = {
            "static": static,
            "dynamic": dynamic,
            "ODBA": odba,
            "VeDBA": vedba,
            "pitch_deg": pitch_deg,
            "roll_deg": roll_deg,
            "pitch_rad": np.radians(pitch_deg),
            "roll_rad": np.radians(roll_deg),
            "accel_rotated": accel_rotated,
        }

        # Store depth if available (with interpolation)
        if self.batch_depth_data:
            depth_arr = np.array(self.batch_depth_data)

            # Interpolate NaNs if present (acausal/batch interpolation)
            nans = np.isnan(depth_arr)
            if np.any(nans):
                # Create x-axis indices
                x = np.arange(len(depth_arr))
                # Interpolate nans using valid data points
                # Note: This handles gaps in the 1Hz/0.2Hz depth data
                depth_arr[nans] = np.interp(x[nans], x[~nans], depth_arr[~nans])

            # Apply 5-second moving average smoothing (R-compatible)
            # R: stats::filter(dat$Depth, filter=rep(1,freq * 5) / (freq * 5))
            window_size = int(self.freq * 5)
            if len(depth_arr) >= window_size:
                kernel = np.ones(window_size) / window_size
                # R's stats::filter produces NAs at ends. We prefer valid values,
                # so we pad with edge values (instead of assuming zeros outside)
                # and convolve in 'valid' mode.
                pad_width = window_size // 2
                depth_padded = np.pad(depth_arr, pad_width, mode="edge")
                depth_smoothed = np.convolve(depth_padded, kernel, mode="valid")
                # Trim to original length if necessary:
                # len(padded) = N + 2*(W//2); len(valid) = len(padded) - W + 1.
                # For even W (e.g. 80) that yields N + 1 — one extra sample.
                if len(depth_smoothed) > len(depth_arr):
                    depth_smoothed = depth_smoothed[: len(depth_arr)]
                elif len(depth_smoothed) < len(depth_arr):
                    # Should not happen with this padding logic usually
                    pass
                depth_arr = depth_smoothed

            self.batch_results["Depth"] = depth_arr

            # Calculate vertical velocity (R-style: diff of depth)
            #   dat$vertical_velocity <- c(0, diff(dat$Depth))
            #   dat$vertical_velocity <- c(stats::filter(dat$vertical_velocity,
            #                                            filter = rep(1,5) / 5))
            # 1. Calculate diff (prepend first value to maintain length)
            vert_vel = np.diff(depth_arr, prepend=depth_arr[0])
            # 2. Smooth
            # R code uses `rep(1,5)/5` which is explicitly 5 samples, despite comments
            # in the R code mentioning "5 second running mean". We stick to the code
            # implementation (5 samples) rather than the comment intent (5 seconds)
            # to ensure exact output matching.
            vv_window_size = 5
            if len(vert_vel) >= vv_window_size:
                vv_kernel = np.ones(vv_window_size) / vv_window_size
                vv_pad_width = vv_window_size // 2
                vv_padded = np.pad(vert_vel, vv_pad_width, mode="edge")
                vv_smoothed = np.convolve(vv_padded, vv_kernel, mode="valid")
                if len(vv_smoothed) > len(vert_vel):
                    vv_smoothed = vv_smoothed[: len(vert_vel)]
                vert_vel = vv_smoothed
            self.batch_results["vertical_velocity"] = vert_vel

        # Store velocity if available (with interpolation)
        if self.batch_velocity_data:
            vel_arr = np.array(self.batch_velocity_data)
            # Interpolate NaNs if present
            nans = np.isnan(vel_arr)
            if np.any(nans):
                x = np.arange(len(vel_arr))
                # Only interpolate if we have at least some valid data
                if np.any(~nans):
                    vel_arr[nans] = np.interp(x[nans], x[~nans], vel_arr[~nans])
            # Apply smoothing (same as depth)
            window_size = int(self.freq * 5)
            if len(vel_arr) >= window_size:
                kernel = np.ones(window_size) / window_size
                pad_width = window_size // 2
                vel_padded = np.pad(vel_arr, pad_width, mode="edge")
                vel_smoothed = np.convolve(vel_padded, kernel, mode="valid")
                if len(vel_smoothed) > len(vel_arr):
                    vel_smoothed = vel_smoothed[: len(vel_arr)]
                vel_arr = vel_smoothed
            self.batch_results["velocity"] = vel_arr

        # Also process magnetometer if available
        if self.batch_mag_data:
            mag_data = np.array(self.batch_mag_data)
            mag_offset = self._get_mag_offset()
            if mag_offset is not None:
                # Apply hard iron offset and normalization
                # mag_offset is [x, y, z, radius]
                mag_adj = (mag_data - mag_offset[:3]) / mag_offset[3]

                # Rotate by attachment angles
                if att_roll is not None and att_pitch is not None:
                    mag_rotated = mag_adj @ xb(att_roll)
                    mag_rotated = mag_rotated @ yb(att_pitch)
                else:
                    mag_rotated = mag_adj

                # Correct for pitch/roll (per-row local rotation)
                # R-style: Mag %*% Yb(-pitch) %*% Xb(roll)
                heading_deg = np.zeros(len(mag_data))
                # R-style Local Rotation Corrected Magnetometer
                # We use the row-vector convention: v_corr = v_rot @ Yb(-pitch) @ Xb(roll)
                for i in range(len(mag_data)):
                    p_rad = np.radians(pitch_deg[i])
                    r_rad = np.radians(roll_deg[i])
                    # Apply Yb(-p) then Xb(r)
                    m_vec = mag_rotated[i]
                    m_corr = m_vec @ yb(-p_rad)
                    m_corr = m_corr @ xb(r_rad)
                    # Heading
                    heading_deg[i] = math.degrees(math.atan2(-m_corr[1], m_corr[0]))

                self.batch_results["heading_deg"] = heading_deg
                self.batch_results["heading_rad"] = np.radians(heading_deg)
                self.batch_results["mag_rotated"] = mag_rotated

                # Calculate Pseudo Track (Dead Reckoning)
                # R: dat$pseudo_x <- cumsum(cos(dat$heading_radians) * (1 / freq) * 1)
                # Assumes 1 m/s speed
                speed = 1.0
                # Calculate Pseudo Track (Dead Reckoning) using REAL TIMESTAMPS
                # (True Integration): matches position changes to actual time
                # deltas, eliminating velocity jitter.
                speed = 1.0  # (re-assignment kept; value identical to above)
                dt = 1.0 / self.freq

                if self.batch_timestamps and len(self.batch_timestamps) == len(
                    self.batch_accel_data
                ):
                    ts_arr = np.array(self.batch_timestamps)
                    # Calculate actual dt between samples
                    # Prepend assumption for first sample
                    dt_arr = np.diff(ts_arr, prepend=ts_arr[0] - (1.0 / self.freq))

                    # Calculate Clock Drift
                    #   drift = Actual - Synthetic, Synthetic = Start + i * (1/freq)
                    synthetic_ts = ts_arr[0] + np.arange(len(ts_arr)) * (1.0 / self.freq)
                    drift_arr = ts_arr - synthetic_ts
                    self.batch_results["clock_drift_sec"] = drift_arr

                    # Use real dt (True Integration) or fixed dt (R-Compat)
                    dt_vector = dt_arr if self.true_integration else np.full(len(dt_arr), dt)

                    self.batch_results["pseudo_x"] = np.cumsum(
                        np.cos(self.batch_results["heading_rad"]) * speed * dt_vector
                    )
                    self.batch_results["pseudo_y"] = np.cumsum(
                        np.sin(self.batch_results["heading_rad"]) * speed * dt_vector
                    )
                else:
                    # Fallback if timestamps missing (legacy behavior)
                    self.batch_results["pseudo_x"] = np.cumsum(
                        np.cos(self.batch_results["heading_rad"]) * speed * dt
                    )
                    self.batch_results["pseudo_y"] = np.cumsum(
                        np.sin(self.batch_results["heading_rad"]) * speed * dt
                    )

    def _get_attachment_angles(self) -> tuple[float | None, float | None]:
        """Get attachment angles (either computed, locked, or None).

        Computed (batch) values take precedence over locked (user-supplied) ones.
        """
        if self.computed_attachment_roll_rad is not None:
            return self.computed_attachment_roll_rad, self.computed_attachment_pitch_rad
        elif self.locked_attachment_roll_rad is not None:
            return self.locked_attachment_roll_rad, self.locked_attachment_pitch_rad
        else:
            return None, None

    def _get_mag_offset(self) -> NDArray[np.float64] | None:
        """Get magnetometer offset (either computed, locked, or None).

        Returns a 4-vector [offset_x, offset_y, offset_z, radius];
        computed values take precedence over locked ones.
        """
        if self.computed_mag_offset is not None:
            return self.computed_mag_offset
        elif self.locked_mag_offset is not None:
            return self.locked_mag_offset
        else:
            return None
    def process(self, data: dict[str, Any] | np.ndarray) -> dict[str, Any]:
        """
        Process a single record using non-causal filtfilt.

        Three execution modes:
          - Pass 1 (r_exact_mode, before calibration): collect raw samples,
            return a minimal record.
          - Pass 2 (r_exact_mode, after calibration): look up pre-computed
            batch results by position.
          - Streaming (r_exact_mode False): incremental Gsep with a
            (filt_len - 1) // 2 sample emission delay.

        Args:
            data: Raw sensor record (dict or array)

        Returns:
            Dictionary with processed state, or minimal state if record is skipped.
        """
        # Increment record count
        self.record_count += 1

        record = data if isinstance(data, dict) else cast(dict[str, Any], data)

        # Extract accelerometer data (handle both quoted and unquoted field names)
        def get_field(record: dict[str, Any], quoted_name: str, unquoted_name: str) -> Any:
            return record.get(quoted_name, record.get(unquoted_name, "nan"))

        # Extract timestamp (look for internal DateTimeP from data_loader first, then Date)
        timestamp_obj = record.get("DateTimeP")
        if timestamp_obj is not None and not isinstance(timestamp_obj, float):
            # Already a datetime object from data_loader
            try:
                timestamp = timestamp_obj.timestamp()
            except (AttributeError, ValueError):
                timestamp = float("nan")
        else:
            # Try to extract Date (Excel serial date) and convert
            date_val = safe_float(
                get_field(record, '"Date"', "Date"),
                "Date",
                self.debug_level,
                self.record_count,
            )
            if not math.isnan(date_val):
                from datetime import datetime, timedelta, timezone

                # Excel epoch (1900 system, with the off-by-two adjustment)
                base_date = datetime(1899, 12, 30, tzinfo=timezone.utc)
                try:
                    dt = base_date + timedelta(days=date_val)
                    timestamp = dt.timestamp()
                except Exception:
                    timestamp = float("nan")
            else:
                # Try Time as fallback
                timestamp = safe_float(
                    get_field(record, '"Time"', "Time"),
                    "Time",
                    self.debug_level,
                    self.record_count,
                )

        # Sub-second interpolation: loggers often repeat the same whole-second
        # timestamp for all samples within that second.
        if not math.isnan(timestamp):
            # If timestamp matches previous, increment index
            if timestamp == self.last_ts:
                self.sub_sec_idx += 1
            else:
                self.last_ts = timestamp
                self.sub_sec_idx = 0
            # Apply offset: index * period
            # e.g. at 16Hz: 0.0, 0.0625, 0.125...
            timestamp += self.sub_sec_idx * (1.0 / self.freq)

        # 1. Collection Phase (Pass 1)
        if self.r_exact_mode and not self.calibration_complete:
            # Extract raw values for batch collection
            x_accel_raw = safe_float(
                get_field(record, '"int aX"', "int aX"),
                "int aX",
                self.debug_level,
                self.record_count,
            )
            y_accel_raw = safe_float(
                get_field(record, '"int aY"', "int aY"),
                "int aY",
                self.debug_level,
                self.record_count,
            )
            z_accel_raw = safe_float(
                get_field(record, '"int aZ"', "int aZ"),
                "int aZ",
                self.debug_level,
                self.record_count,
            )
            x_mag_raw = safe_float(
                get_field(record, '"int mX"', "int mX"),
                "int mX",
                self.debug_level,
                self.record_count,
            )
            y_mag_raw = safe_float(
                get_field(record, '"int mY"', "int mY"),
                "int mY",
                self.debug_level,
                self.record_count,
            )
            z_mag_raw = safe_float(
                get_field(record, '"int mZ"', "int mZ"),
                "int mZ",
                self.debug_level,
                self.record_count,
            )

            # Extract depth data (if available)
            depth_raw = (
                safe_float(
                    get_field(record, '"Depth"', "Depth"),
                    "Depth",
                    self.debug_level,
                    self.record_count,
                )
                if self.depth_buffer is not None
                else float("nan")
            )
            # Extract velocity data (if available)
            velocity_raw = (
                safe_float(
                    get_field(record, '"velocity"', "velocity"),
                    "velocity",
                    self.debug_level,
                    self.record_count,
                )
                if self.depth_buffer is not None
                else float("nan")
            )

            # Check for validity (R script filters out NaNs in 'int aX')
            is_valid = (
                not math.isnan(x_accel_raw)
                and not math.isnan(y_accel_raw)
                and not math.isnan(z_accel_raw)
            )

            if is_valid:
                if self.batch_accel_data is not None:
                    self.batch_accel_data.append([x_accel_raw, y_accel_raw, z_accel_raw])
                if self.batch_mag_data is not None:
                    self.batch_mag_data.append([x_mag_raw, y_mag_raw, z_mag_raw])
                if self.batch_depth_data is not None:
                    self.batch_depth_data.append(depth_raw)
                if self.batch_velocity_data is not None:
                    self.batch_velocity_data.append(velocity_raw)
                if self.batch_timestamps is not None:
                    self.batch_timestamps.append(timestamp)
                # Store that this record_count was valid for Pass 2 indexing
                self.valid_record_indices.add(self.record_count)

            # Return minimal output during collection phase
            return {
                "record_count": self.record_count,
                "timestamp": timestamp,
                "X_Accel_raw": x_accel_raw,
                "Y_Accel_raw": y_accel_raw,
                "Z_Accel_raw": z_accel_raw,
                "skipped": not is_valid,
            }

        # 2. Processing Phase (Pass 2)
        if self.r_exact_mode and self.calibration_complete and self.batch_results:
            # Check if this record was actually included in the batch calculations
            if self.record_count not in self.valid_record_indices:
                return {
                    "record_count": self.record_count,
                    "timestamp": timestamp,
                    "skipped": True,
                }

            # Map the actual record index to its position in the filtered batch results
            res = self.batch_results
            idx = self.processing_idx
            self.processing_idx += 1

            x_accel_raw = safe_float(
                get_field(record, '"int aX"', "int aX"),
                "int aX",
                self.debug_level,
                self.record_count,
            )
            y_accel_raw = safe_float(
                get_field(record, '"int aY"', "int aY"),
                "int aY",
                self.debug_level,
                self.record_count,
            )
            z_accel_raw = safe_float(
                get_field(record, '"int aZ"', "int aZ"),
                "int aZ",
                self.debug_level,
                self.record_count,
            )
            x_mag_raw = safe_float(
                get_field(record, '"int mX"', "int mX"),
                "int mX",
                self.debug_level,
                self.record_count,
            )
            y_mag_raw = safe_float(
                get_field(record, '"int mY"', "int mY"),
                "int mY",
                self.debug_level,
                self.record_count,
            )
            z_mag_raw = safe_float(
                get_field(record, '"int mZ"', "int mZ"),
                "int mZ",
                self.debug_level,
                self.record_count,
            )
            depth_raw = safe_float(
                get_field(record, '"Depth"', "Depth"),
                "Depth",
                self.debug_level,
                self.record_count,
            )
            velocity_raw = safe_float(
                get_field(record, '"velocity"', "velocity"),
                "velocity",
                self.debug_level,
                self.record_count,
            )

            # Step 1: Apply attachment angle rotation to raw accelerometer (if available)
            static = res["static"][idx]
            dynamic = res["dynamic"][idx]
            accel_rot = res["accel_rotated"][idx]
            attachment_roll, attachment_pitch = self._get_attachment_angles()

            # Calculate final depth and velocity values
            final_depth = res["Depth"][idx] if "Depth" in res else depth_raw

            final_vertical_velocity = 0.0
            if "vertical_velocity" in res:
                final_vertical_velocity = res["vertical_velocity"][idx]

            # Use 0.0 for speed calc if vertical velocity is NaN
            vv_safe = final_vertical_velocity if not math.isnan(final_vertical_velocity) else 0.0
            # Compute 3D speed from vertical velocity + assumed horizontal speed (1.0 m/s)
            # This ensures invalid input velocity doesn't zero out the HUD
            final_velocity = math.sqrt(1.0**2 + vv_safe**2)

            output = {
                "record_count": self.record_count,
                "timestamp": timestamp,
                "X_Accel_raw": x_accel_raw,
                "Y_Accel_raw": y_accel_raw,
                "Z_Accel_raw": z_accel_raw,
                "X_Mag_raw": x_mag_raw,
                "Y_Mag_raw": y_mag_raw,
                "Z_Mag_raw": z_mag_raw,
                "Depth": final_depth,
                "velocity": final_velocity,
                "vertical_velocity": final_vertical_velocity,
                "X_Accel_rotate": accel_rot[0],
                "Y_Accel_rotate": accel_rot[1],
                "Z_Accel_rotate": accel_rot[2],
                "X_Static": static[0],
                "Y_Static": static[1],
                "Z_Static": static[2],
                "X_Dynamic": dynamic[0],
                "Y_Dynamic": dynamic[1],
                "Z_Dynamic": dynamic[2],
                "ODBA": res["ODBA"][idx],
                "VeDBA": res["VeDBA"][idx],
                "pitch_degrees": res["pitch_deg"][idx],
                "roll_degrees": res["roll_deg"][idx],
                "pitch_radians": res["pitch_rad"][idx],
                "roll_radians": res["roll_rad"][idx],
            }
            if "clock_drift_sec" in res:
                output["clock_drift_sec"] = res["clock_drift_sec"][idx]
            if "heading_deg" in res:
                output["heading_degrees"] = res["heading_deg"][idx]
                output["heading_radians"] = res["heading_rad"][idx]
            if "pseudo_x" in res:
                output["pseudo_x"] = res["pseudo_x"][idx]
                output["pseudo_y"] = res["pseudo_y"][idx]

            # Publish to ZMQ if enabled
            if self.zmq_publisher and self.eid is not None and self.sim_id is not None:
                self.zmq_publisher.publish_state(self.eid, self.sim_id, self.tag_id, output)

            return output

        # --- Streaming path (r_exact_mode False, or batch_results missing) ---
        # NOTE(review): x_accel_raw / y_accel_raw / z_accel_raw / x_mag_raw /
        # y_mag_raw / z_mag_raw appear to be bound only inside the Pass 1 /
        # Pass 2 branches above, both of which return. If so, this path raises
        # NameError in pure streaming mode — confirm against the original
        # (unmangled) source and hoist the extraction if needed.

        # Get attachment angles (for rotation)
        attachment_roll, attachment_pitch = self._get_attachment_angles()

        # Step 1: Apply attachment angle rotation to raw accelerometer (if available)
        if attachment_roll is not None and attachment_pitch is not None:
            # Create numpy array for rotation
            accel_vec = np.array([x_accel_raw, y_accel_raw, z_accel_raw])
            # Apply Xb(roll) then Yb(pitch) rotation (R-style: Data %*% Rotation)
            # Note: We use vector @ matrix to match R's row-vector convention
            accel_rotated = accel_vec @ xb(attachment_roll)
            accel_rotated = accel_rotated @ yb(attachment_pitch)
            x_accel_rotate = accel_rotated[0]
            y_accel_rotate = accel_rotated[1]
            z_accel_rotate = accel_rotated[2]
        else:
            # No attachment angle correction - use raw values
            x_accel_rotate = x_accel_raw
            y_accel_rotate = y_accel_raw
            z_accel_rotate = z_accel_raw

        # Step 2: Apply filtfilt-based Gsep to rotated accelerometer
        (
            static_x,
            static_y,
            static_z,
            dyn_x,
            dyn_y,
            dyn_z,
            odba,
            vedba,
        ) = gsep_streaming(
            x_accel_rotate,
            y_accel_rotate,
            z_accel_rotate,
            self.filt_len,
            self.accel_buffer,
        )

        # Buffer the current record for delayed emission
        # We store the raw values and the computed rotation
        buffered_record = record.copy()
        buffered_record["_x_accel_rotate"] = x_accel_rotate
        buffered_record["_y_accel_rotate"] = y_accel_rotate
        buffered_record["_z_accel_rotate"] = z_accel_rotate
        self.record_buffer.append(buffered_record)

        # If we haven't filled the buffer enough to account for the centered filter delay,
        # we can't emit a valid record yet.
        # The centered filter output corresponds to the sample at index: len(buffer) - 1 - delay
        # delay = (filt_len - 1) // 2
        delay = (self.filt_len - 1) // 2
        if len(self.record_buffer) <= delay:
            return {}  # Still warming up

        # Retrieve the record that corresponds to the current filter output
        # The filter output (static_x, etc.) corresponds to the record 'delay' samples ago
        # record_buffer[-1] is current, record_buffer[-(delay+1)] is the target
        target_record = self.record_buffer[-(delay + 1)]

        # Step 3: Compute pitch and roll from static acceleration (R-style pitchRoll2)
        # Uses xb, yb rotation functions from biologger_sim.functions.rotation
        if not math.isnan(static_x):
            # Convert static acceleration to unit vector
            static_mag = math.sqrt(static_x**2 + static_y**2 + static_z**2)
            if static_mag > 0:
                static_x_norm = static_x / static_mag
                static_y_norm = static_y / static_mag
                static_z_norm = static_z / static_mag

                # Compute pitch and roll using R-style atan2
                #   pitch = atan2(xb, sqrt(yb^2 + zb^2))
                #   roll  = atan2(yb, zb)
                # NOTE(review): the batch path (_process_batch_data) negates
                # pitch; this streaming path does not — verify the sign
                # convention against R before relying on streaming pitch.
                pitch_rad = math.atan2(
                    static_x_norm, math.sqrt(static_y_norm**2 + static_z_norm**2)
                )
                roll_rad = math.atan2(static_y_norm, static_z_norm)
                pitch_deg = math.degrees(pitch_rad)
                roll_deg = math.degrees(roll_rad)
            else:
                pitch_rad = float("nan")
                roll_rad = float("nan")
                pitch_deg = float("nan")
                roll_deg = float("nan")
        else:
            pitch_rad = float("nan")
            roll_rad = float("nan")
            pitch_deg = float("nan")
            roll_deg = float("nan")

        # Step 4: Process magnetometer data (if available)
        mag_offset = self._get_mag_offset()
        if mag_offset is not None and not math.isnan(x_mag_raw):
            # Apply hard iron offset and normalization
            x_mag_adj = (x_mag_raw - mag_offset[0]) / mag_offset[3]
            y_mag_adj = (y_mag_raw - mag_offset[1]) / mag_offset[3]
            z_mag_adj = (z_mag_raw - mag_offset[2]) / mag_offset[3]

            # Rotate magnetometer by attachment angles (if available)
            if attachment_roll is not None and attachment_pitch is not None:
                mag_vec = np.array([x_mag_adj, y_mag_adj, z_mag_adj])
                # Apply Xb(roll) then Yb(pitch) rotation (R-style: Data %*% Rotation)
                mag_rotated = mag_vec @ xb(attachment_roll)
                mag_rotated = mag_rotated @ yb(attachment_pitch)
                x_mag_rotate = mag_rotated[0]
                y_mag_rotate = mag_rotated[1]
                z_mag_rotate = mag_rotated[2]
            else:
                x_mag_rotate = x_mag_adj
                y_mag_rotate = y_mag_adj
                z_mag_rotate = z_mag_adj

            # Correct magnetometer for pitch/roll (R-style, per-row local rotation)
            if not math.isnan(pitch_rad) and not math.isnan(roll_rad):
                mag_vec_corr = np.array([x_mag_rotate, y_mag_rotate, z_mag_rotate])
                # Apply Yb(-pitch) then Xb(roll) rotation (R-style: Data %*% Rotation)
                mag_corrected = mag_vec_corr @ yb(-pitch_rad)
                mag_corrected = mag_corrected @ xb(roll_rad)
                x_mag_corrected = mag_corrected[0]
                y_mag_corrected = mag_corrected[1]
                z_mag_corrected = mag_corrected[2]

                # Calculate heading from corrected magnetometer
                heading_rad = math.atan2(-y_mag_corrected, x_mag_corrected)
                heading_deg = math.degrees(heading_rad)
            else:
                x_mag_corrected = float("nan")
                y_mag_corrected = float("nan")
                z_mag_corrected = float("nan")
                heading_rad = float("nan")
                heading_deg = float("nan")
        else:
            x_mag_adj = float("nan")
            y_mag_adj = float("nan")
            z_mag_adj = float("nan")
            x_mag_rotate = float("nan")
            y_mag_rotate = float("nan")
            z_mag_rotate = float("nan")
            x_mag_corrected = float("nan")
            y_mag_corrected = float("nan")
            z_mag_corrected = float("nan")
            heading_rad = float("nan")
            heading_deg = float("nan")

        # Calculate Dead Reckoning (Pseudo Track)
        if not math.isnan(heading_rad):
            # R uses constant 1 m/s speed
            speed = 1.0
            delta_t = 1.0 / self.freq
            dx = speed * math.cos(heading_rad) * delta_t
            dy = speed * math.sin(heading_rad) * delta_t
            self.pseudo_x += dx
            self.pseudo_y += dy
        else:
            # If heading is NaN, position doesn't change (or should it be NaN?)
            # For visualization, keeping last known position is usually better than NaN
            pass

        # Get batch-computed vertical velocity (R-style smoothing applied in Pass 1)
        vert_vel = 0.0
        results = self.batch_results
        if results is not None and "vertical_velocity" in results:
            vv_list = results["vertical_velocity"]
            res_idx = self.record_count - 1 - delay
            if 0 <= res_idx < len(vv_list):
                vert_vel = vv_list[res_idx]

        # Calculate 3D velocity magnitude (Horizontal 1.0 m/s + Vertical Rate)
        # This provides a non-zero, dynamic velocity readout for the HUD
        total_speed_3d = math.sqrt(1.0 + vert_vel**2)

        # Build output record (expanded schema for R-compatibility)
        # Use target_record for raw values to ensure alignment with the
        # delay-compensated filter output
        output = {
            "record_count": self.record_count - delay,  # Adjust count for delay
            "X_Accel_raw": safe_float(
                get_field(target_record, '"int aX"', "int aX"),
                "int aX",
                0,
                0,
            ),
            "Y_Accel_raw": safe_float(
                get_field(target_record, '"int aY"', "int aY"),
                "int aY",
                0,
                0,
            ),
            "Z_Accel_raw": safe_float(
                get_field(target_record, '"int aZ"', "int aZ"),
                "int aZ",
                0,
                0,
            ),
            "X_Mag_raw": safe_float(
                get_field(target_record, '"int mX"', "int mX"),
                "int mX",
                0,
                0,
            ),
            "Y_Mag_raw": safe_float(
                get_field(target_record, '"int mY"', "int mY"),
                "int mY",
                0,
                0,
            ),
            "Z_Mag_raw": safe_float(
                get_field(target_record, '"int mZ"', "int mZ"),
                "int mZ",
                0,
                0,
            ),
            "Depth": safe_float(
                get_field(target_record, '"Depth"', "Depth"),
                "Depth",
                0,
                0,
            ),
            "X_Accel_rotate": target_record.get("_x_accel_rotate", float("nan")),
            "Y_Accel_rotate": target_record.get("_y_accel_rotate", float("nan")),
            "Z_Accel_rotate": target_record.get("_z_accel_rotate", float("nan")),
            "X_Static": static_x,
            "Y_Static": static_y,
            "Z_Static": static_z,
            "X_Dynamic": dyn_x,
            "Y_Dynamic": dyn_y,
            "Z_Dynamic": dyn_z,
            "ODBA": odba,
            "VeDBA": vedba,
            "pitch_radians": pitch_rad,
            "roll_radians": roll_rad,
            "pitch_degrees": pitch_deg,
            "roll_degrees": roll_deg,
            "X_Mag_adj": x_mag_adj,
            "Y_Mag_adj": y_mag_adj,
            "Z_Mag_adj": z_mag_adj,
            "X_Mag_rotate": x_mag_rotate,
            "Y_Mag_rotate": y_mag_rotate,
            "Z_Mag_rotate": z_mag_rotate,
            "X_Mag_corrected": x_mag_corrected,
            "Y_Mag_corrected": y_mag_corrected,
            "Z_Mag_corrected": z_mag_corrected,
            "heading_radians": heading_rad,
            "heading_degrees": heading_deg,
            "velocity": total_speed_3d,
            "vertical_velocity": vert_vel,
            "pseudo_x": self.pseudo_x,
            "pseudo_y": self.pseudo_y,
        }

        # Publish to ZMQ if enabled
        if self.zmq_publisher and self.eid is not None and self.sim_id is not None:
            if self.debug_level >= 2:
                self.logger.debug(f"Publishing to ZMQ: {output.get('record_count')}")
            self.zmq_publisher.publish_state(self.eid, self.sim_id, self.tag_id, output)

        if self.debug_level >= 2 and self.record_count <= 10:
            self.logger.debug(f"Record #{self.record_count}: {output}")

        return output
[docs] def reset(self) -> None: """Reset processor to initial state.""" self.accel_buffer.clear() if self.mag_buffer is not None: self.mag_buffer.clear() if self.depth_buffer is not None: self.depth_buffer.clear() self.record_buffer.clear() self.record_count = 0 self.processing_idx = 0 self.pseudo_x = 0.0 self.pseudo_y = 0.0 self.logger.info("PostFactoProcessor reset")
[docs] def get_performance_summary(self) -> dict[str, Any]: """Get performance metrics.""" return { "processor_type": "PostFactoProcessor", "records_processed": self.record_count, "filt_len": self.filt_len, "buffer_size": len(self.accel_buffer), "buffer_max_size": self.accel_buffer.maxlen, }
[docs] def update_config(self, config_updates: dict[str, Any]) -> None: """Update processor configuration.""" if "filt_len" in config_updates: self.filt_len = config_updates["filt_len"] # Recreate buffer with new size old_buffer = list(self.accel_buffer) self.accel_buffer = deque(old_buffer, maxlen=self.filt_len * 2) self.logger.info(f"Updated filt_len to {self.filt_len}") if "debug_level" in config_updates: self.debug_level = config_updates["debug_level"] self.logger.info(f"Updated debug_level to {self.debug_level}")
[docs] def get_current_state(self) -> dict[str, Any]: """Get current processor state.""" return { "record_count": self.record_count, "buffer_fill": len(self.accel_buffer), "buffer_capacity": self.accel_buffer.maxlen, "ready_for_output": len(self.accel_buffer) >= self.filt_len * 2, }
[docs] def get_output_schema(self) -> list[str]: """Get list of output fields (R-compatible expanded schema).""" return [ "record_count", "X_Accel_raw", "Y_Accel_raw", "Z_Accel_raw", "X_Mag_raw", "Y_Mag_raw", "Z_Mag_raw", "Depth", "X_Accel_rotate", "Y_Accel_rotate", "Z_Accel_rotate", "X_Static", "Y_Static", "Z_Static", "X_Dynamic", "Y_Dynamic", "Z_Dynamic", "ODBA", "VeDBA", "pitch_radians", "roll_radians", "pitch_degrees", "roll_degrees", "X_Mag_adj", "Y_Mag_adj", "Z_Mag_adj", "X_Mag_rotate", "Y_Mag_rotate", "Z_Mag_rotate", "X_Mag_corrected", "Y_Mag_corrected", "Z_Mag_corrected", "heading_radians", "heading_degrees", "velocity", "vertical_velocity", "pseudo_x", "pseudo_y", ]