Source code for biologger_sim.functions.depth_smoother

# Copyright (c) 2025-2026 Long Horizon Observatory
# Licensed under the Apache License, Version 2.0. See LICENSE file for details.

import logging
import math


[docs] class DepthSmoother: """ Multi-scale EMA depth smoothing for reducing high-frequency variance while preserving accuracy. Implements adaptive blending of multiple EMA filters with different time constants: - Fast EMA (2-3 second response): Captures rapid depth changes during active periods - Medium EMA (10-15 second response): Balanced response for general smoothing - Slow EMA (30-60 second response): Provides stability during low-activity periods Adaptive blending ratios based on ODBA. """ def __init__( self, freq: int = 16, low_activity_threshold: float = 0.15, high_activity_threshold: float = 0.20, debug_level: int = 0, ): self.freq = freq self.low_activity_threshold = low_activity_threshold self.high_activity_threshold = high_activity_threshold self.debug_level = debug_level self.alpha_fast = 2.0 / (freq * 3 + 1) self.alpha_medium = 2.0 / (freq * 12 + 1) self.alpha_slow = 2.0 / (freq * 45 + 1) self.depth_ema_fast: float | None = None self.depth_ema_medium: float | None = None self.depth_ema_slow: float | None = None self.last_weights = (0.0, 0.3, 0.7) self.logger = logging.getLogger(__name__)
[docs] def update(self, depth_estimate: float, odba: float) -> float: """ Update smoothing with new depth estimate and activity level. """ if not math.isfinite(depth_estimate): return depth_estimate if ( self.depth_ema_fast is None or self.depth_ema_medium is None or self.depth_ema_slow is None ): self.depth_ema_fast = depth_estimate self.depth_ema_medium = depth_estimate self.depth_ema_slow = depth_estimate return depth_estimate # Static analysis hints: these cannot be None anymore assert self.depth_ema_fast is not None assert self.depth_ema_medium is not None assert self.depth_ema_slow is not None # Update EMAs self.depth_ema_fast = (self.alpha_fast * depth_estimate) + ( 1 - self.alpha_fast ) * self.depth_ema_fast self.depth_ema_medium = (self.alpha_medium * depth_estimate) + ( 1 - self.alpha_medium ) * self.depth_ema_medium self.depth_ema_slow = (self.alpha_slow * depth_estimate) + ( 1 - self.alpha_slow ) * self.depth_ema_slow # Calculate weights based on activity (ODBA) if math.isfinite(odba): if odba <= self.low_activity_threshold: w = 0.0 elif odba >= self.high_activity_threshold: w = 1.0 else: w = (odba - self.low_activity_threshold) / ( self.high_activity_threshold - self.low_activity_threshold ) w = max(0.0, min(1.0, w)) # Adaptive blending fast = 0.4 * w medium = 0.3 * (1.0 - w) + 0.6 * w slow = 0.7 * (1.0 - w) weights = (fast, medium, slow) self.last_weights = weights else: weights = self.last_weights final_depth = ( weights[0] * self.depth_ema_fast + weights[1] * self.depth_ema_medium + weights[2] * self.depth_ema_slow ) return final_depth