Source code for biologger_sim.core.ins

# Copyright (c) 2025 Daniel Fry
# MIT License. See LICENSE file in the project root for full license text.

"""
Shared INS (Inertial Navigation System) module for biologger data.

Ported from biologger-pseudotrack to achieve exact depth parity with R baseline.

This module provides reusable INS capabilities including:
- Kalman filtering for depth/velocity fusion
- Vertical velocity estimation via acceleration integration
- Optional biological constraint enforcement (velocity/acceleration limits)
- State persistence across algorithm transitions

Biological constraints can be disabled by passing None (R-style streaming).
"""

import logging
import math
from typing import Any

import numpy as np

from biologger_sim.functions.kalman_filter import KalmanFilter


[docs] class VelocityEstimator: """Kalman filter wrapper for vertical velocity estimation.""" def __init__( self, process_noise_depth: float = 1e-4, process_noise_velocity: float = 1e-3, measurement_noise: float = 0.02, ): """ Initialize velocity estimator. Args: process_noise_depth: Process noise for depth state process_noise_velocity: Process noise for velocity state measurement_noise: Measurement noise for depth observations """ self.process_noise_depth = process_noise_depth self.process_noise_velocity = process_noise_velocity self.measurement_noise = measurement_noise self.kalman_filter: KalmanFilter | None = None self.logger = logging.getLogger(__name__)
[docs] def initialize(self, initial_depth: float) -> None: """ Initialize Kalman filter with first depth measurement. Args: initial_depth: Initial depth value for filter initialization """ self.kalman_filter = KalmanFilter( process_noise_depth=self.process_noise_depth, process_noise_velocity=self.process_noise_velocity, measurement_noise=self.measurement_noise, initial_depth=initial_depth, )
[docs] def predict(self, dt: float, vertical_acceleration: float) -> None: """ Prediction step of Kalman filter. Args: dt: Time step in seconds vertical_acceleration: Filtered vertical acceleration in m/s² """ if self.kalman_filter is not None: self.kalman_filter.predict(dt, vertical_acceleration)
[docs] def update(self, depth_measurement: float) -> None: """ Update step of Kalman filter with depth measurement. Args: depth_measurement: Measured depth value """ if self.kalman_filter is not None: self.kalman_filter.update(depth_measurement)
[docs] def get_velocity(self) -> float: """ Get current velocity estimate. Returns: Estimated vertical velocity in m/s """ if self.kalman_filter is not None: return self.kalman_filter.get_velocity() return 0.0
[docs] def get_depth(self) -> float: """ Get current depth estimate. Returns: Estimated depth in meters """ if self.kalman_filter is not None: return self.kalman_filter.get_depth() return 0.0
[docs] def apply_biological_constraints(self, max_velocity: float) -> None: """ Apply biological constraints to velocity estimate. Args: max_velocity: Maximum biologically plausible velocity (m/s) """ if self.kalman_filter is not None: current_velocity = self.kalman_filter.get_velocity() if abs(current_velocity) > max_velocity: constrained_velocity = float(np.sign(current_velocity)) * max_velocity # Update the Kalman filter state directly self.kalman_filter.x1 = constrained_velocity
[docs] def is_initialized(self) -> bool: """ Check if the velocity estimator is initialized. Returns: True if Kalman filter is initialized """ return self.kalman_filter is not None
[docs] def reset(self) -> None: """Reset velocity estimator to initial state.""" self.kalman_filter = None
[docs] class INSSolution: """6-DOF INS solution with Kalman filtering for depth estimation.""" def __init__( self, sample_rate: float, biological_constraints: dict[str, float] | None = None, ): """ Initialize INS solution. Args: sample_rate: Data sampling rate in Hz biological_constraints: Dict with 'max_velocity' and 'max_acceleration' keys. Set to None to disable constraints (R-style streaming). """ self.sample_rate = sample_rate self.max_velocity = ( biological_constraints.get("max_velocity", 3.0) if biological_constraints else None ) self.max_acceleration = ( biological_constraints.get("max_acceleration", 2.0) if biological_constraints else None ) # Velocity estimator using Kalman filtering self.velocity_estimator = VelocityEstimator() # Simple velocity integration state self.vertical_velocity = 0.0 # Depth tracking state self.last_depth_measurement: float | None = None self.last_depth_time: float | None = None self.last_kf_depth_used: float | None = None self.last_kf_update_time: float | None = None # Position state (for future extensions) self.position = np.array([0.0, 0.0, 0.0]) # [x, y, depth]
[docs] def update( self, accel_world: np.ndarray, depth_measurement: float | None, dt: float, current_time: float, ) -> dict[str, Any]: """ Update INS solution with new measurements. Args: accel_world: World-frame acceleration [ax, ay, az] in m/s² depth_measurement: Depth measurement in meters, None if not available dt: Time step since last update in seconds current_time: Current timestamp Returns: Dictionary with updated state estimates """ # Extract vertical acceleration (Z component in world frame) accel_z_world = float(accel_world[2]) # Apply biological constraints to acceleration (if enabled) if self.max_acceleration is not None and abs(accel_z_world) > self.max_acceleration: accel_z_world = float(np.sign(accel_z_world)) * self.max_acceleration # Step 1: Simple velocity integration self.vertical_velocity += accel_z_world * dt depth_val: float | None = None if depth_measurement is not None and math.isfinite(depth_measurement): depth_val = depth_measurement # Zero velocity estimate at each depth reading (as prescribed) # Only zero if depth changed significantly or enough time passed if self.last_depth_measurement is not None and ( abs(depth_val - self.last_depth_measurement) > 0.1 or (self.last_depth_time is not None and current_time - self.last_depth_time > 1.0) ): self.vertical_velocity = 0.0 self.last_depth_measurement = depth_val self.last_depth_time = current_time # Step 3: Kalman filter for depth estimation if not self.velocity_estimator.is_initialized() and depth_val is not None: # Initialize Kalman filter with first depth measurement self.velocity_estimator.initialize(depth_val) if self.velocity_estimator.is_initialized(): # Prediction step self.velocity_estimator.predict(dt, accel_z_world) # Update step (only at ~1Hz or when depth value changes materially) if depth_val is not None: should_update = False if ( self.last_kf_update_time is None or (current_time - self.last_kf_update_time) >= 0.9 ): should_update = True last_used = self.last_kf_depth_used if last_used is None or abs(depth_val - last_used) > 1e-6: should_update = True if should_update: self.velocity_estimator.update(depth_val) self.last_kf_depth_used = depth_val self.last_kf_update_time = current_time # Get Kalman filter estimates depth_kalman_filtered = self.velocity_estimator.get_depth() velocity_kalman_estimate = self.velocity_estimator.get_velocity() # Apply biological constraints (if enabled) if self.max_velocity is not None: self.velocity_estimator.apply_biological_constraints(self.max_velocity) velocity_kalman_estimate = self.velocity_estimator.get_velocity() else: depth_kalman_filtered = depth_val if depth_val is not None else float("nan") velocity_kalman_estimate = self.vertical_velocity # Apply biological constraints to simple velocity integration (if enabled) if self.max_velocity is not None and abs(self.vertical_velocity) > self.max_velocity: self.vertical_velocity = float(np.sign(self.vertical_velocity)) * self.max_velocity return { "depth_kalman_filtered": depth_kalman_filtered, "velocity_estimate": velocity_kalman_estimate, "vertical_velocity": self.vertical_velocity, "position": self.position.copy(), "kalman_initialized": self.velocity_estimator.is_initialized(), }
[docs] def get_velocity_estimate(self) -> tuple[float, float, float]: """ Get current velocity estimate. Returns: Tuple of (vx, vy, vz) velocity components in m/s """ # For now, only vertical velocity is estimated if self.velocity_estimator.is_initialized(): vz = self.velocity_estimator.get_velocity() else: vz = self.vertical_velocity return (0.0, 0.0, vz)
[docs] def get_position_estimate(self) -> tuple[float, float, float]: """ Get current position estimate. Returns: Tuple of (x, y, z) position components """ if self.velocity_estimator.is_initialized(): depth = self.velocity_estimator.get_depth() return (self.position[0], self.position[1], depth) else: return (self.position[0], self.position[1], self.position[2])
[docs] def get_state(self) -> dict[str, Any]: """ Get current INS state information. Returns: Dictionary with complete state information """ velocity = self.get_velocity_estimate() position = self.get_position_estimate() return { "velocity": velocity, "position": position, "vertical_velocity": self.vertical_velocity, "kalman_initialized": self.velocity_estimator.is_initialized(), "last_depth_measurement": self.last_depth_measurement, "last_depth_time": self.last_depth_time, "biological_constraints": { "max_velocity": self.max_velocity, "max_acceleration": self.max_acceleration, }, }
[docs] def reset(self) -> None: """Reset INS solution to initial state.""" self.velocity_estimator.reset() self.vertical_velocity = 0.0 self.position.fill(0.0) self.last_depth_measurement = None self.last_depth_time = None self.last_kf_depth_used = None self.last_kf_update_time = None