API Reference
This section contains the automatically generated documentation for the biologger-sim modules.
Core Processors
- class biologger_sim.processors.lab.PostFactoProcessor(filt_len: int = 48, freq: int = 16, debug_level: int = 0, r_exact_mode: bool = False, compute_attachment_angles: bool = True, locked_attachment_roll_deg: float | None = None, locked_attachment_pitch_deg: float | None = None, compute_mag_offsets: bool = True, locked_mag_offset_x: float | None = None, locked_mag_offset_y: float | None = None, locked_mag_offset_z: float | None = None, locked_mag_sphere_radius: float | None = None, depth_cfg: DepthConfig | None = None, zmq_publisher: ZMQPublisher | None = None, eid: int | None = None, sim_id: str | None = None, tag_id: str = 'unknown', clock_source: ClockSource = ClockSource.FIXED_FREQ, **kwargs: Any)[source]
Bases:
BiologgerProcessor
Post-facto (non-causal) biologger processor for R-compatibility.
This processor uses R’s centered moving average filter (filter(sides=2, circular=TRUE)) to achieve exact tie-out with the gRumble R package. Unlike StreamingProcessor, which uses a causal lfilter (trailing window), it uses a centered window that looks both forward and backward in time, which is only possible in post-hoc analysis.
- Memory Footprint:
Fixed size: O(filt_len) samples (48 samples @ 16Hz for 3s window)
Independent of dataset size (unlike batch/ which loads all data)
- Validation Target:
<0.1° error vs R (pitch, roll, heading)
Exact ODBA/VeDBA match
- calibrate_from_batch_data() -> None[source]
Perform batch calibration from collected data.
This should be called after the first pass through the dataset in r_exact_mode. It computes attachment angles and magnetometer offsets from the full dataset.
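The contrast between the centered window of PostFactoProcessor and the trailing window of StreamingProcessor can be sketched in plain Python. This is an illustrative sketch, not the library's implementation: the helper names are hypothetical, and an odd window length is assumed so that the centered window is symmetric (the alignment of an even length such as 48 is not covered here).

```python
def centered_circular_mean(x, n):
    """Centered moving average with wrap-around; n is assumed odd."""
    half = n // 2
    size = len(x)
    # Indices wrap modulo the series length, as with circular=TRUE in R.
    return [
        sum(x[(i + k) % size] for k in range(-half, half + 1)) / n
        for i in range(size)
    ]


def trailing_mean(x, n):
    """Causal moving average over the current and up to n-1 earlier samples."""
    out = []
    for i in range(len(x)):
        window = x[max(0, i - n + 1): i + 1]
        out.append(sum(window) / len(window))
    return out


signal = [0, 0, 0, 3, 0, 0, 0]
centered = centered_circular_mean(signal, 3)  # response centered on the spike
trailing = trailing_mean(signal, 3)           # response lags behind the spike
```

The centered result needs samples from the future of each output point, which is why it is only available after the full record has been collected.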
- class biologger_sim.processors.streaming.StreamingProcessor(filt_len: int = 48, freq: int = 16, debug_level: int = 0, locked_attachment_roll_deg: float | None = None, locked_attachment_pitch_deg: float | None = None, locked_mag_offset_x: float | None = None, locked_mag_offset_y: float | None = None, locked_mag_offset_z: float | None = None, locked_mag_sphere_radius: float | None = None, ema_fast_alpha: float = 0.2, ema_slow_alpha: float = 0.02, ema_cross_threshold: float = 0.5, zmq_publisher: ZMQPublisher | None = None, eid: int | None = None, sim_id: str = 'default', tag_id: str = 'unknown', dead_reckoning_speed_model: str = 'odba_scaled', dead_reckoning_constant_speed: float = 1.0, dead_reckoning_odba_factor: float = 2.0, highpass_cutoff: float = 0.1, **kwargs: Any)[source]
Bases:
BiologgerProcessor
Causal (real-time) streaming processor for digital twin and on-tag simulation.
- get_current_state() -> dict[str, Any][source]
Get current processor state and internal parameters.
- Returns:
Dict containing current state information, calibration status, buffer contents, and algorithm-specific diagnostics
- get_performance_summary() -> dict[str, Any][source]
Get comprehensive performance metrics and telemetry.
- Returns:
Dict containing timing statistics, throughput metrics, algorithm performance, and diagnostic information
- process(record: dict[str, Any] | Any) -> dict[str, Any][source]
Process input data through the pipeline.
This is the main processing method that all processors must implement. The input format is flexible to support different processor types:
- Streaming: Dict with sensor records (accel, mag, depth, etc.)
- Adaptive: np.ndarray with accelerometer samples
- Batch: Dict with full dataset parameters
- Parameters:
record – Input data; the format depends on the processor type
- Returns:
Dict containing processed results and metadata
- Raises:
ProcessingError – If processing fails due to invalid input or internal error
- reset() -> None[source]
Reset processor to initial state.
Clears all internal buffers, calibration state, and accumulated statistics while preserving configuration settings.
- update_config(config_updates: dict[str, Any]) -> None[source]
Update processor configuration at runtime.
- Parameters:
config_updates – Dictionary of configuration parameters to update
- Raises:
ConfigurationError – If configuration update is invalid
Simulation Logic & Types
- class biologger_sim.core.types.AHRSAlgorithm(value)[source]
Bases:
str, Enum
Defines the AHRS algorithm to use for sensor fusion.
- COMPLEMENTARY = 'complementary'
- EKF = 'ekf'
- MADGWICK = 'madgwick'
- MAHONY = 'mahony'
- class biologger_sim.core.types.AHRSConfig(*, enabled: bool = False, algorithm: AHRSAlgorithm = AHRSAlgorithm.MADGWICK, beta: float = 0.1, kp: float = 0.5, ki: float = 0.0, use_adaptive: bool = False, normal_gravity_threshold: float = 0.05, slip_gravity_threshold: float = 0.15, smoothing_alpha: float = 0.01)[source]
Bases:
BaseModel
Configuration for AHRS sensor fusion.
- algorithm: AHRSAlgorithm
- beta: float
- enabled: bool
- ki: float
- kp: float
- model_config = {}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- normal_gravity_threshold: float
- slip_gravity_threshold: float
- smoothing_alpha: float
- use_adaptive: bool
- class biologger_sim.core.types.CalibrationConfig(*, attachment_angle_mode: CalibrationMode = CalibrationMode.BATCH_COMPUTE, magnetometer_mode: CalibrationMode = CalibrationMode.BATCH_COMPUTE, locked_attachment_roll_deg: float | None = None, locked_attachment_pitch_deg: float | None = None, locked_mag_offset_x: float | None = None, locked_mag_offset_y: float | None = None, locked_mag_offset_z: float | None = None, locked_mag_sphere_radius: float | None = None)[source]
Bases:
BaseModel
Configuration for sensor calibration.
- attachment_angle_mode: CalibrationMode
- locked_attachment_pitch_deg: float | None
- locked_attachment_roll_deg: float | None
- locked_mag_offset_x: float | None
- locked_mag_offset_y: float | None
- locked_mag_offset_z: float | None
- locked_mag_sphere_radius: float | None
- magnetometer_mode: CalibrationMode
- model_config = {}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- class biologger_sim.core.types.CalibrationMode(value)[source]
Bases:
str, Enum
Defines how sensor calibration parameters are determined.
- BATCH_COMPUTE: Calculate from the full dataset (R-style, acausal).
- FIXED: Use pre-determined constants (Real-time/Tag-style).
- BATCH_COMPUTE = 'batch_compute'
- FIXED = 'fixed'
- class biologger_sim.core.types.ClockSource(value)[source]
Bases:
str, Enum
Defines the source of time steps for integration.
- FIXED_FREQ: Use fixed 1/sampling_rate_hz steps (R-compatible).
- SENSOR_TIME: Use actual differential timestamps from sensor data.
- FIXED_FREQ = 'fixed_frequency'
- SENSOR_TIME = 'sensor_timestamps'
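The two clock sources differ only in how the integration step is obtained. The following is a minimal sketch under that reading; `time_steps` is a hypothetical helper, not part of biologger-sim, and it takes the enum values as plain strings.

```python
def time_steps(timestamps, sampling_rate_hz, source="fixed_frequency"):
    """Return one integration step (in seconds) per sample after the first."""
    if source == "fixed_frequency":
        # Every step is the nominal sample period, regardless of timestamps.
        return [1.0 / sampling_rate_hz] * (len(timestamps) - 1)
    if source == "sensor_timestamps":
        # Each step is the measured gap between consecutive samples.
        return [t1 - t0 for t0, t1 in zip(timestamps, timestamps[1:])]
    raise ValueError(f"unknown clock source: {source!r}")


# A 16 Hz record with one dropped sample between the 2nd and 3rd readings.
stamps = [0.0, 0.0625, 0.1875, 0.25]
fixed = time_steps(stamps, 16, "fixed_frequency")
measured = time_steps(stamps, 16, "sensor_timestamps")
```

With fixed steps the dropped sample goes unnoticed, whereas measured steps reflect the actual gap.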
- class biologger_sim.core.types.DeadReckoningConfig(*, speed_model: SpeedModel = SpeedModel.ODBA_SCALED, constant_speed_m_s: float = 1.0, odba_speed_factor: float = 2.0)[source]
Bases:
BaseModel
Configuration for dead reckoning and speed estimation.
- constant_speed_m_s: float
- model_config = {}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- odba_speed_factor: float
- speed_model: SpeedModel
- class biologger_sim.core.types.DepthAlgorithm(value)[source]
Bases:
str, Enum
Defines the algorithm used for depth estimation.
- ACAUSAL_INTERP: Post-hoc, full-batch interpolation (R-compatible).
- CAUSAL_SAMPLE_HOLD: Simple hold/nearest neighbor for real-time.
- ACAUSAL_INTERP = 'acausal_interpolate'
- CAUSAL_SAMPLE_HOLD = 'causal_sample_hold'
- OFF = 'off'
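The practical difference between the two depth algorithms shows up when depth is sampled more slowly than the other sensors. The sketch below assumes a hold-last-value reading of CAUSAL_SAMPLE_HOLD and plain linear interpolation for ACAUSAL_INTERP; both helpers are hypothetical, and the interpolation_max_gap setting is not modelled.

```python
def sample_hold(query_times, depth_times, depths):
    """Causal: at each query time, reuse the most recent depth reading."""
    out, j, last = [], 0, None
    for t in query_times:
        while j < len(depth_times) and depth_times[j] <= t:
            last = depths[j]
            j += 1
        out.append(last)  # stays None until the first reading has arrived
    return out


def linear_interp(query_times, depth_times, depths):
    """Acausal: interpolate between the readings on either side."""
    out = []
    for t in query_times:
        if t <= depth_times[0]:
            out.append(depths[0])
        elif t >= depth_times[-1]:
            out.append(depths[-1])
        else:
            # The later bracketing reading lies in the future of time t.
            k = next(i for i, dt in enumerate(depth_times) if dt > t)
            t0, t1 = depth_times[k - 1], depth_times[k]
            w = (t - t0) / (t1 - t0)
            out.append(depths[k - 1] + w * (depths[k] - depths[k - 1]))
    return out


queries = [0.0, 0.5, 1.0, 1.5, 2.0]
held = sample_hold(queries, [0.0, 1.0, 2.0], [10.0, 12.0, 16.0])
interpolated = linear_interp(queries, [0.0, 1.0, 2.0], [10.0, 12.0, 16.0])
```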
- class biologger_sim.core.types.DepthConfig(*, algorithm: DepthAlgorithm = DepthAlgorithm.CAUSAL_SAMPLE_HOLD, interpolation_max_gap: float = 5.0)[source]
Bases:
BaseModel
Configuration for depth estimation.
- algorithm: DepthAlgorithm
- interpolation_max_gap: float
- model_config = {}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- class biologger_sim.core.types.EntityConfig(*, sim_id: str, tag_id: str | None = None, input_file: Path, sampling_rate_hz: int = 16, start_time_offset: float = 0.0, metadata_file: Path | None = None, clock_source: ClockSource = ClockSource.FIXED_FREQ, strict_r_parity: bool = False, calibration: CalibrationConfig = <factory>, depth_estimation: DepthConfig = <factory>, dead_reckoning: DeadReckoningConfig = <factory>, ahrs: AHRSConfig = <factory>, save_telemetry: bool = False)[source]
Bases:
BaseModel
Configuration for a single simulation entity.
- ahrs: AHRSConfig
- calibration: CalibrationConfig
- clock_source: ClockSource
- dead_reckoning: DeadReckoningConfig
- depth_estimation: DepthConfig
- enforce_r_parity() -> EntityConfig[source]
Enforces R-parity settings if strict mode is enabled.
- input_file: Path
- metadata_file: Path | None
- model_config = {}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- sampling_rate_hz: int
- save_telemetry: bool
- sim_id: str
- start_time_offset: float
- strict_r_parity: bool
- tag_id: str | None
- class biologger_sim.core.types.PipelineConfig(*, mode: ProcessingMode = ProcessingMode.SIMULATION, publish_zmq: bool = True, simulation: SimulationConfig)[source]
Bases:
BaseModel
Global multi-entity pipeline configuration.
- mode: ProcessingMode
- model_config = {}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- publish_zmq: bool
- simulation: SimulationConfig
- class biologger_sim.core.types.ProcessingMode(value)[source]
Bases:
str, Enum
Defines the processing mode for the pipeline.
- LAB: Post-hoc analysis using acausal filters (filtfilt) for maximum accuracy. Matches the R implementation.
- SIMULATION: Real-time simulation using causal filters (lfilter/EMA). Simulates on-tag processing constraints.
- LAB = 'LAB'
- SIMULATION = 'SIMULATION'
- class biologger_sim.core.types.SimulationConfig(*, entities: list[EntityConfig] = <factory>, meta_file: Path | None = None, playback_speed: float = 1.0, loop: bool = False, zmq: ZMQConfig = <factory>)[source]
Bases:
BaseModel
Configuration for simulation-wide settings.
- entities: list[EntityConfig]
- loop: bool
- meta_file: Path | None
- model_config = {}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- playback_speed: float
- class biologger_sim.core.types.SpeedModel(value)[source]
Bases:
str, Enum
Defines the model used for speed estimation (dead reckoning).
- CONSTANT = 'constant'
- ODBA_SCALED = 'odba_scaled'
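How the two speed models might feed a dead-reckoning step can be sketched as follows. Several things here are assumptions rather than documented behaviour: that 'odba_scaled' means a linear scaling of ODBA by odba_speed_factor, that heading is measured clockwise from north, and that x points east and y north. Both function names are hypothetical.

```python
import math


def estimate_speed(model, odba, constant_speed_m_s=1.0, odba_speed_factor=2.0):
    """Return a speed in m/s for the given speed model name."""
    if model == "constant":
        return constant_speed_m_s
    if model == "odba_scaled":
        return odba_speed_factor * odba  # assumed linear relationship
    raise ValueError(f"unknown speed model: {model!r}")


def dead_reckon_step(x, y, heading_deg, speed, dt):
    """Advance a horizontal position by one time step."""
    h = math.radians(heading_deg)
    # Heading 0 moves along +y (north), heading 90 along +x (east).
    return x + speed * math.sin(h) * dt, y + speed * math.cos(h) * dt


speed = estimate_speed("odba_scaled", odba=0.25)
x, y = dead_reckon_step(0.0, 0.0, heading_deg=90.0, speed=speed, dt=1.0 / 16)
```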
- class biologger_sim.core.types.ZMQConfig(*, port: int = 5555, host: str = '127.0.0.1', topic: str = 'biologger/telemetry')[source]
Bases:
BaseModel
Configuration for ZeroMQ communication.
- host: str
- model_config = {}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- port: int
- topic: str
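The configuration models above nest as PipelineConfig → SimulationConfig → EntityConfig, with ZMQConfig, DepthConfig and DeadReckoningConfig as sub-sections. A plain dictionary mirroring that nesting might look like the sketch below; the field names and defaults come from the signatures above, while the sim_id and the file path are placeholders. Whether the package loads such a dictionary directly is an assumption.

```python
pipeline_config = {
    "mode": "SIMULATION",
    "publish_zmq": True,
    "simulation": {
        "playback_speed": 1.0,
        "loop": False,
        "zmq": {"host": "127.0.0.1", "port": 5555, "topic": "biologger/telemetry"},
        "entities": [
            {
                "sim_id": "example_entity",        # placeholder identifier
                "input_file": "data/example.csv",  # placeholder path
                "sampling_rate_hz": 16,
                "clock_source": "fixed_frequency",
                "depth_estimation": {
                    "algorithm": "causal_sample_hold",
                    "interpolation_max_gap": 5.0,
                },
                "dead_reckoning": {
                    "speed_model": "odba_scaled",
                    "odba_speed_factor": 2.0,
                },
            }
        ],
    },
}

# Assumption: with pydantic v2 models, validation would look like
# PipelineConfig.model_validate(pipeline_config).
```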
- class biologger_sim.core.processor_interface.BiologgerProcessor[source]
Bases:
ABC
Abstract base interface for biologger data processors.
This interface defines the standard contract that all biologger processing pipelines should implement, enabling consistent usage patterns across batch, streaming, and adaptive sensor fusion processors.
Design Principles:
- Unified processing interface via process() method
- Standardized configuration and state management
- Consistent performance monitoring and telemetry
- Flexible input/output for different data types
- Runtime reconfiguration support
- abstractmethod get_current_state() -> dict[str, Any][source]
Get current processor state and internal parameters.
- Returns:
Dict containing current state information, calibration status, buffer contents, and algorithm-specific diagnostics
- get_output_schema() -> list[str][source]
Get list of output field names produced by process().
Default implementation returns empty list. Processors should override this to provide schema information for output validation.
- Returns:
List of string field names in process() output
- abstractmethod get_performance_summary() -> dict[str, Any][source]
Get comprehensive performance metrics and telemetry.
- Returns:
Dict containing timing statistics, throughput metrics, algorithm performance, and diagnostic information
- get_processor_info() -> dict[str, Any][source]
Get processor metadata and capabilities.
- Returns:
Dict containing processor type, version, supported features, and configuration parameters
- abstractmethod process(data: dict[str, Any] | ndarray) -> dict[str, Any][source]
Process input data through the pipeline.
This is the main processing method that all processors must implement. The input format is flexible to support different processor types:
- Streaming: Dict with sensor records (accel, mag, depth, etc.)
- Adaptive: np.ndarray with accelerometer samples
- Batch: Dict with full dataset parameters
- Parameters:
data – Input data - format depends on processor type
- Returns:
Dict containing processed results and metadata
- Raises:
ProcessingError – If processing fails due to invalid input or internal error
- abstractmethod reset() -> None[source]
Reset processor to initial state.
Clears all internal buffers, calibration state, and accumulated statistics while preserving configuration settings.
- abstractmethod update_config(config_updates: dict[str, Any]) -> None[source]
Update processor configuration at runtime.
- Parameters:
config_updates – Dictionary of configuration parameters to update
- Raises:
ConfigurationError – If configuration update is invalid
- exception biologger_sim.core.processor_interface.ConfigurationError[source]
Bases:
Exception
Exception raised for invalid configuration updates.
- exception biologger_sim.core.processor_interface.ProcessingError[source]
Bases:
Exception
Exception raised during data processing.
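The processor contract documented above (process, reset, update_config, and the two reporting methods) can be illustrated with a self-contained toy. The sketch defines local stand-ins instead of importing biologger_sim, so `ProcessorSketch`, `RunningMeanProcessor`, the 'depth' input key and the 'scale' option are all hypothetical.

```python
from abc import ABC, abstractmethod
from typing import Any


class ConfigurationError(Exception):
    """Local stand-in for the documented exception of the same name."""


class ProcessorSketch(ABC):
    """Stand-in that mirrors the documented abstract methods."""

    @abstractmethod
    def process(self, data: dict[str, Any]) -> dict[str, Any]: ...

    @abstractmethod
    def reset(self) -> None: ...

    @abstractmethod
    def update_config(self, config_updates: dict[str, Any]) -> None: ...

    @abstractmethod
    def get_current_state(self) -> dict[str, Any]: ...

    @abstractmethod
    def get_performance_summary(self) -> dict[str, Any]: ...


class RunningMeanProcessor(ProcessorSketch):
    """Toy processor: running mean of a scaled 'depth' field."""

    def __init__(self, scale: float = 1.0) -> None:
        self.scale = scale
        self.reset()

    def process(self, data: dict[str, Any]) -> dict[str, Any]:
        self.count += 1
        self.total += data["depth"] * self.scale
        return {"mean_depth": self.total / self.count}

    def reset(self) -> None:
        # Clears accumulated state; the configured scale is preserved.
        self.count, self.total = 0, 0.0

    def update_config(self, config_updates: dict[str, Any]) -> None:
        unknown = set(config_updates) - {"scale"}
        if unknown:
            raise ConfigurationError(f"unknown keys: {sorted(unknown)}")
        self.scale = config_updates.get("scale", self.scale)

    def get_current_state(self) -> dict[str, Any]:
        return {"count": self.count, "total": self.total}

    def get_performance_summary(self) -> dict[str, Any]:
        return {"records_processed": self.count}
```

Calling reset() after update_config() keeps the new configuration, matching the documented behaviour of clearing buffers and statistics while preserving settings.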
Shared INS (Inertial Navigation System) module for biologger data.
Ported from biologger-pseudotrack to achieve exact depth parity with R baseline.
This module provides reusable INS capabilities including:
- Kalman filtering for depth/velocity fusion
- Vertical velocity estimation via acceleration integration
- Optional biological constraint enforcement (velocity/acceleration limits)
- State persistence across algorithm transitions
Biological constraints can be disabled by passing None (R-style streaming).
- class biologger_sim.core.ins.INSSolution(sample_rate: float, biological_constraints: dict[str, float] | None = None)[source]
Bases:
object
6-DOF INS solution with Kalman filtering for depth estimation.
- get_position_estimate() -> tuple[float, float, float][source]
Get current position estimate.
- Returns:
Tuple of (x, y, z) position components
- get_state() -> dict[str, Any][source]
Get current INS state information.
- Returns:
Dictionary with complete state information
- get_velocity_estimate() -> tuple[float, float, float][source]
Get current velocity estimate.
- Returns:
Tuple of (vx, vy, vz) velocity components in m/s
- update(accel_world: ndarray, depth_measurement: float | None, dt: float, current_time: float) -> dict[str, Any][source]
Update INS solution with new measurements.
- Parameters:
accel_world – World-frame acceleration [ax, ay, az] in m/s²
depth_measurement – Depth measurement in meters, None if not available
dt – Time step since last update in seconds
current_time – Current timestamp
- Returns:
Dictionary with updated state estimates
- class biologger_sim.core.ins.VelocityEstimator(process_noise_depth: float = 0.0001, process_noise_velocity: float = 0.001, measurement_noise: float = 0.02)[source]
Bases:
object
Kalman filter wrapper for vertical velocity estimation.
- apply_biological_constraints(max_velocity: float) -> None[source]
Apply biological constraints to velocity estimate.
- Parameters:
max_velocity – Maximum biologically plausible velocity (m/s)
- get_velocity() -> float[source]
Get current velocity estimate.
- Returns:
Estimated vertical velocity in m/s
- initialize(initial_depth: float) -> None[source]
Initialize Kalman filter with first depth measurement.
- Parameters:
initial_depth – Initial depth value for filter initialization
- is_initialized() -> bool[source]
Check if the velocity estimator is initialized.
- Returns:
True if Kalman filter is initialized
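The idea of estimating vertical velocity from depth with a Kalman filter can be sketched with a two-state constant-velocity model. This is not the VelocityEstimator implementation: the class name, the identity initial covariance, and the treatment of the three noise parameters as per-step variances are all assumptions made for illustration.

```python
class DepthVelocityKF:
    """Two-state (depth, vertical velocity) Kalman filter sketch."""

    def __init__(self, q_depth=0.0001, q_velocity=0.001, r_depth=0.02):
        self.q_depth, self.q_velocity, self.r_depth = q_depth, q_velocity, r_depth
        self.depth, self.velocity = 0.0, 0.0
        self.p = [[1.0, 0.0], [0.0, 1.0]]  # state covariance (assumed start)

    def initialize(self, initial_depth):
        self.depth, self.velocity = initial_depth, 0.0

    def update(self, measured_depth, dt):
        (p00, p01), (p10, p11) = self.p
        # Predict: depth advances by velocity * dt, velocity is held.
        self.depth += self.velocity * dt
        p00 = p00 + dt * (p01 + p10) + dt * dt * p11 + self.q_depth
        p01 = p01 + dt * p11
        p10 = p10 + dt * p11
        p11 = p11 + self.q_velocity
        # Correct: only depth is observed.
        s = p00 + self.r_depth
        k0, k1 = p00 / s, p10 / s
        residual = measured_depth - self.depth
        self.depth += k0 * residual
        self.velocity += k1 * residual
        self.p = [[(1 - k0) * p00, (1 - k0) * p01],
                  [p10 - k1 * p00, p11 - k1 * p01]]
        return self.depth, self.velocity

    def clamp_velocity(self, max_velocity):
        """Limit the velocity estimate to +/- max_velocity (m/s)."""
        self.velocity = max(-max_velocity, min(max_velocity, self.velocity))


kf = DepthVelocityKF()
kf.initialize(0.0)
dt = 1.0 / 16
for k in range(1, 321):      # 20 s of a steady 1 m/s descent
    kf.update(k * dt, dt)
```

After the loop the velocity estimate settles close to the true descent rate; clamp_velocity plays the role that apply_biological_constraints is described as having.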
Scientific Functions
- class biologger_sim.functions.depth_smoother.DepthSmoother(freq: int = 16, low_activity_threshold: float = 0.15, high_activity_threshold: float = 0.2, debug_level: int = 0)[source]
Bases:
object
Multi-scale EMA depth smoothing for reducing high-frequency variance while preserving accuracy. Implements adaptive blending of multiple EMA filters with different time constants:
- Fast EMA (2-3 second response): Captures rapid depth changes during active periods
- Medium EMA (10-15 second response): Balanced response for general smoothing
- Slow EMA (30-60 second response): Provides stability during low-activity periods
Adaptive blending ratios based on ODBA.
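A rough sketch of multi-scale EMA blending is given below. Only the three response ranges and the two ODBA thresholds come from the description above; the time constants (midpoints of those ranges), the conversion from time constant to smoothing factor, and the blend weights are illustrative assumptions, and `MultiScaleEMA` is a hypothetical name.

```python
import math


def ema_alpha(time_constant_s, freq_hz):
    """Smoothing factor for an EMA with the given time constant."""
    return 1.0 - math.exp(-1.0 / (time_constant_s * freq_hz))


class MultiScaleEMA:
    """Blend fast, medium and slow EMAs according to activity (ODBA)."""

    def __init__(self, freq=16, low=0.15, high=0.2, taus=(2.5, 12.5, 45.0)):
        self.alphas = [ema_alpha(tau, freq) for tau in taus]
        self.low, self.high = low, high
        self.states = None

    def update(self, depth, odba):
        if self.states is None:
            self.states = [depth] * 3  # seed all filters with the first sample
        else:
            self.states = [s + a * (depth - s)
                           for s, a in zip(self.states, self.alphas)]
        fast, medium, slow = self.states
        if odba >= self.high:
            weights = (0.7, 0.3, 0.0)  # active: favour the fast filter
        elif odba <= self.low:
            weights = (0.0, 0.3, 0.7)  # quiet: favour the slow filter
        else:
            weights = (0.2, 0.6, 0.2)
        return weights[0] * fast + weights[1] * medium + weights[2] * slow


smoother = MultiScaleEMA()
smoothed = [smoother.update(d, odba=0.1) for d in (5.0, 5.1, 4.9, 5.0)]
```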
Non-causal filtering for post-facto biologger analysis.
This module provides filtfilt-based (bidirectional, zero-phase) filtering to achieve R-compatibility for scientific validation. Unlike the streaming module which uses causal lfilter, these filters look both forward and backward in time, which is only possible for post-hoc analysis.
Key functions:
- gsep_filtfilt: R-compatible gravitational separation using scipy.signal.filtfilt
- biologger_sim.functions.filters.butterworth_filtfilt(data: ndarray[tuple[Any, ...], dtype[float64]], cutoff: float, fs: float, order: int = 5) -> ndarray[tuple[Any, ...], dtype[float64]][source]
Apply Butterworth low-pass filter using filtfilt for zero-phase filtering.
- Parameters:
data (np.ndarray) – Input signal
cutoff (float) – Cutoff frequency in Hz
fs (float) – Sampling frequency in Hz
order (int) – Filter order (default 5)
- Returns:
Filtered signal
- Return type:
np.ndarray
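The zero-phase property of forward-backward filtering can be demonstrated without SciPy. In the sketch below a first-order low-pass stands in for the Butterworth filter, and both helper names are hypothetical; scipy.signal.filtfilt additionally pads the signal edges, which this sketch does not do.

```python
def lowpass_forward(x, alpha):
    """Causal first-order low-pass (EMA); the output lags the input."""
    y, state = [], 0.0
    for value in x:
        state += alpha * (value - state)
        y.append(state)
    return y


def lowpass_zero_phase(x, alpha):
    """Filter forward, then filter the reversed result and reverse again."""
    forward = lowpass_forward(x, alpha)
    backward = lowpass_forward(forward[::-1], alpha)
    return backward[::-1]


impulse = [0.0] * 101
impulse[50] = 1.0
causal = lowpass_forward(impulse, 0.5)        # response only after index 50
zero_phase = lowpass_zero_phase(impulse, 0.5)  # response symmetric about 50
```

The second pass cancels the lag of the first, so features in the filtered signal stay aligned in time with the raw data.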
- biologger_sim.functions.filters.gsep_batch_circular(accel_array: ndarray[tuple[Any, ...], dtype[float64]], filt_len: int) -> tuple[ndarray[tuple[Any, ...], dtype[float64]], ndarray[tuple[Any, ...], dtype[float64]], ndarray[tuple[Any, ...], dtype[float64]], ndarray[tuple[Any, ...], dtype[float64]]][source]
Batch Gsep with circular wrapping - exact R match.
This matches R’s: filter(xyz, filter=filt, sides=2, circular=TRUE)
- Parameters:
accel_array (np.ndarray) – Full accelerometer data, shape (N, 3)
filt_len (int) – Filter window length
- Returns:
(static, dynamic, ODBA, VeDBA), where static and dynamic have shape (N, 3) and ODBA and VeDBA have shape (N,)
- Return type:
tuple
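Once a static (gravity) estimate is available, the remaining outputs follow from standard definitions: dynamic acceleration is raw minus static, ODBA is the sum of the absolute dynamic components, and VeDBA is their Euclidean norm. The helper below is a hypothetical plain-Python sketch of that step and takes the static estimate as given.

```python
import math


def split_dynamic(accel, static):
    """Return (dynamic, odba, vedba) for matching lists of (x, y, z) rows."""
    dynamic = [
        tuple(a - s for a, s in zip(raw, base))
        for raw, base in zip(accel, static)
    ]
    odba = [sum(abs(d) for d in row) for row in dynamic]
    vedba = [math.sqrt(sum(d * d for d in row)) for row in dynamic]
    return dynamic, odba, vedba


dynamic, odba, vedba = split_dynamic(
    accel=[(0.3, -0.4, 1.0)],
    static=[(0.0, 0.0, 1.0)],
)
```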
- biologger_sim.functions.filters.gsep_streaming(x: float, y: float, z: float, filt_len: int, buffer: deque[tuple[float, float, float]]) -> tuple[float, float, float, float, float, float, float, float][source]
Streaming Gsep without circular wrapping, for non-circular mode.
This uses a centered filter on available samples only (no wrap-around). Expect edge effects in the first and last ~filt_len/2 samples.
- Parameters:
x (float) – Current accelerometer x-axis reading (0.1g units)
y (float) – Current accelerometer y-axis reading (0.1g units)
z (float) – Current accelerometer z-axis reading (0.1g units)
filt_len (int) – Filter window length
buffer (deque) – Rolling buffer of samples
- Returns:
(static_x, static_y, static_z, dyn_x, dyn_y, dyn_z, ODBA, VeDBA)
- Return type:
tuple
- biologger_sim.functions.rotation.compute_pitch(static_accel: ndarray[tuple[Any, ...], dtype[float64]]) -> ndarray[tuple[Any, ...], dtype[float64]][source]
Calculates pitch angles from static acceleration data.
- Parameters:
static_accel (np.ndarray) – A 2D NumPy array of shape (n_samples, 3), where each row contains the x, y, and z components of the static acceleration vector.
- Returns:
A 1D NumPy array of shape (n_samples,), where each element is the pitch angle (in radians) corresponding to the input acceleration vector.
- Return type:
np.ndarray
Notes
Pitch is calculated as arctan2(-x, sqrt(y^2 + z^2)).
- biologger_sim.functions.rotation.compute_roll(static_accel: ndarray[tuple[Any, ...], dtype[float64]]) -> ndarray[tuple[Any, ...], dtype[float64]][source]
Calculates roll angles from static acceleration data.
- Parameters:
static_accel (np.ndarray) – A 2D NumPy array of shape (n_samples, 3), where each row contains the x, y, and z components of the static acceleration vector.
- Returns:
A 1D NumPy array of shape (n_samples,), where each element is the roll angle (in radians) corresponding to the input acceleration vector.
- Return type:
np.ndarray
Notes
Roll is calculated as arctan2(y, z).
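The pitch and roll formulas given in the Notes for compute_pitch and compute_roll can be checked on single vectors with the standard library. The scalar helpers below are hypothetical counterparts of the array functions, written only to show the formulas at work.

```python
import math


def pitch_rad(x, y, z):
    """Pitch from one static acceleration vector: arctan2(-x, sqrt(y^2 + z^2))."""
    return math.atan2(-x, math.sqrt(y * y + z * z))


def roll_rad(y, z):
    """Roll from one static acceleration vector: arctan2(y, z)."""
    return math.atan2(y, z)


level = (pitch_rad(0.0, 0.0, 1.0), roll_rad(0.0, 1.0))  # gravity along +z
nose_up = pitch_rad(-1.0, 0.0, 0.0)                     # gravity along -x
on_side = roll_rad(1.0, 0.0)                            # gravity along +y
```

Both functions return radians, so convert with math.degrees before comparing against values reported in degrees.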
- biologger_sim.functions.rotation.mag_offset(mag: ndarray[tuple[Any, ...], dtype[float64]]) -> ndarray[tuple[Any, ...], dtype[float64]][source]
Estimate the offset and radius of a sphere fitted to 3D magnetometer data.
This function fits a sphere to the provided 3D magnetometer measurements using least squares, estimating the hard-iron offset (bias) and the radius of the sphere. The offset can be used to correct magnetometer readings for hard-iron distortions.
- Parameters:
mag (np.ndarray) – An (N, 3) array of magnetometer measurements, where each row represents a 3D vector [Mx, My, Mz].
- Returns:
A 1D array of length 4: [offset_x, offset_y, offset_z, radius], where offset_x, offset_y, and offset_z are the estimated offsets for each axis, and radius is the estimated radius of the fitted sphere.
- Return type:
numpy.ndarray
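One common way to fit a sphere by least squares is to linearize the sphere equation: |p|^2 = 2 c·p + k with k = r^2 - |c|^2 is linear in the centre c and in k. The sketch below solves the resulting normal equations in plain Python; it illustrates the technique and is not necessarily the formulation used by mag_offset. Both function names are hypothetical.

```python
import math


def solve_linear(a, b):
    """Solve a @ x = b by Gaussian elimination with partial pivoting."""
    n = len(b)
    m = [row[:] + [rhs] for row, rhs in zip(a, b)]  # augmented matrix
    for col in range(n):
        pivot = max(range(col, n), key=lambda r: abs(m[r][col]))
        m[col], m[pivot] = m[pivot], m[col]
        for r in range(col + 1, n):
            factor = m[r][col] / m[col][col]
            for c in range(col, n + 1):
                m[r][c] -= factor * m[col][c]
    x = [0.0] * n
    for r in range(n - 1, -1, -1):
        tail = sum(m[r][c] * x[c] for c in range(r + 1, n))
        x[r] = (m[r][n] - tail) / m[r][r]
    return x


def fit_sphere(points):
    """Least-squares sphere fit; returns [cx, cy, cz, radius]."""
    rows = [[2.0 * x, 2.0 * y, 2.0 * z, 1.0] for x, y, z in points]
    rhs = [x * x + y * y + z * z for x, y, z in points]
    # Normal equations: (A^T A) p = A^T b with p = (cx, cy, cz, k).
    ata = [[sum(r[i] * r[j] for r in rows) for j in range(4)] for i in range(4)]
    atb = [sum(r[i] * v for r, v in zip(rows, rhs)) for i in range(4)]
    cx, cy, cz, k = solve_linear(ata, atb)
    radius = math.sqrt(k + cx * cx + cy * cy + cz * cz)
    return [cx, cy, cz, radius]


# Six points on a sphere of radius 5 centred at (1, -2, 3).
samples = [(6, -2, 3), (-4, -2, 3), (1, 3, 3), (1, -7, 3), (1, -2, 8), (1, -2, -2)]
offset_x, offset_y, offset_z, radius = fit_sphere(samples)
```

Subtracting the fitted centre from each reading removes the hard-iron bias, which is how the offsets returned by mag_offset are described as being used.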
- biologger_sim.functions.rotation.xb(angle: float) -> ndarray[tuple[Any, ...], dtype[float64]][source]
Local implementation of gRumble's Xb(angle). Returns the rotation matrix for a rotation about the X-axis by the given angle.
CRITICAL: Must match R’s gRumble::Xb() sign convention exactly! R uses: [[1, 0, 0], [0, cos(b), sin(b)], [0, -sin(b), cos(b)]]
- Parameters:
angle (float) – The rotation angle in radians.
- Returns:
A 3x3 rotation matrix representing a rotation about the X-axis.
- Return type:
numpy.ndarray
- biologger_sim.functions.rotation.yb(angle: float) -> ndarray[tuple[Any, ...], dtype[float64]][source]
Local implementation of gRumble's Yb(angle). Returns the rotation matrix for a rotation about the Y-axis by the given angle.
CRITICAL: Must match R’s gRumble::Yb() sign convention exactly! R uses: [[cos(b), 0, sin(b)], [0, 1, 0], [-sin(b), 0, cos(b)]]
- Parameters:
angle (float) – The rotation angle in radians.
- Returns:
A 3x3 rotation matrix representing a rotation about the Y-axis.
- Return type:
numpy.ndarray
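The sign conventions quoted for xb and yb can be written out and probed with unit vectors. The sketch below reproduces the two matrices exactly as stated above using nested lists rather than NumPy arrays; `apply` is a hypothetical helper for the matrix-vector product.

```python
import math


def xb(angle):
    """X-axis matrix with the sign placement quoted above."""
    c, s = math.cos(angle), math.sin(angle)
    return [[1.0, 0.0, 0.0], [0.0, c, s], [0.0, -s, c]]


def yb(angle):
    """Y-axis matrix with the sign placement quoted above."""
    c, s = math.cos(angle), math.sin(angle)
    return [[c, 0.0, s], [0.0, 1.0, 0.0], [-s, 0.0, c]]


def apply(matrix, vector):
    """Multiply a 3x3 matrix by a 3-vector."""
    return [sum(m * v for m, v in zip(row, vector)) for row in matrix]


y_axis_after_xb = apply(xb(math.pi / 2), [0.0, 1.0, 0.0])  # close to (0, 0, -1)
z_axis_after_yb = apply(yb(math.pi / 2), [0.0, 0.0, 1.0])  # close to (1, 0, 0)
```

With this sign placement, a positive angle in xb carries the y-axis toward -z, the opposite sense to the common right-handed active rotation about X, which is why a matrix copied from a general-purpose rotation library may not match.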
I/O & Streams
- class biologger_sim.io.stream.SensorStream(config: SimulationConfig, file_path: Path, data: DataFrame | None = None)[source]
Bases:
object
Handles loading and streaming of biologger data. Simulates a real-time sensor feed from a CSV or Feather file.
- biologger_sim.io.data_loader.load_and_filter_data(data_path: Path, meta_path: Path, tag_id: str) -> DataFrame[source]
Load sensor data and filter by deployment time from metadata.
- Parameters:
data_path – Path to the sensor data CSV.
meta_path – Path to the metadata CSV.
tag_id – Tag ID to look up in metadata.
- Returns:
Filtered DataFrame.
- biologger_sim.io.data_loader.load_metadata(meta_path: Path, tag_id: str) -> dict[source]
Load metadata for a specific tag.
- Parameters:
meta_path – Path to the metadata CSV file.
tag_id – The tag ID to filter for (e.g., ‘RED001_20220812’).
- Returns:
Dictionary containing metadata for the tag.
- class biologger_sim.io.zmq_publisher.ZMQPublisher(config: SimulationConfig, debug_level: int = 0)[source]
Bases:
object
Publishes sensor data and simulation state via ZeroMQ. Connects to the NVIDIA Omniverse extension using MessagePack.
- publish(topic: str, data: dict[str, Any]) -> None[source]
Publishes a message to a specific topic using MessagePack.
- Parameters:
topic – The topic string (e.g., “sensor/imu”, “sim/state”).
data – The data dictionary to publish.
- publish_state(eid: int, sim_id: str, tag_id: str, state: dict[str, Any]) -> None[source]
Publishes the simulation state to Omniverse in the expected format. Sends Euler angles (degrees) for the receiver to handle rotation.
- Parameters:
eid – Integer entity identifier.
sim_id – String simulation identifier (e.g., “SF_RED001_v2”).
tag_id – String tag identifier.
state – Dictionary containing ‘pitch_degrees’, ‘roll_degrees’, ‘heading_degrees’, ‘X_Dynamic’, ‘Y_Dynamic’, ‘Z_Dynamic’, ‘VeDBA’.