API Reference

This section contains the automatically generated documentation for the biologger-sim modules.

Core Processors

class biologger_sim.processors.lab.PostFactoProcessor(filt_len: int = 48, freq: int = 16, debug_level: int = 0, r_exact_mode: bool = False, compute_attachment_angles: bool = True, locked_attachment_roll_deg: float | None = None, locked_attachment_pitch_deg: float | None = None, compute_mag_offsets: bool = True, locked_mag_offset_x: float | None = None, locked_mag_offset_y: float | None = None, locked_mag_offset_z: float | None = None, locked_mag_sphere_radius: float | None = None, depth_cfg: DepthConfig | None = None, zmq_publisher: ZMQPublisher | None = None, eid: int | None = None, sim_id: str | None = None, tag_id: str = 'unknown', clock_source: ClockSource = ClockSource.FIXED_FREQ, **kwargs: Any)[source]

Bases: BiologgerProcessor

Post-facto (non-causal) biologger processor for R-compatibility.

This processor uses R’s centered moving average filter (filter(sides=2, circular=TRUE)) to achieve exact tie-out with the gRumble R package. Unlike StreamingProcessor which uses causal lfilter (trailing window), this uses a centered window looking both forward and backward in time, which is only possible for post-hoc analysis.

Memory Footprint:
  • Fixed size: O(filt_len) samples (48 samples @ 16Hz for 3s window)

  • Independent of dataset size (unlike batch/ which loads all data)

Validation Target:
  • <0.1° error vs R (pitch, roll, heading)

  • Exact ODBA/VeDBA match

calibrate_from_batch_data() None[source]

Perform batch calibration from collected data.

This should be called after the first pass through the dataset in r_exact_mode. Computes attachment angles and magnetometer offsets from full dataset.

get_current_state() dict[str, Any][source]

Get current processor state.

get_output_schema() list[str][source]

Get list of output fields (R-compatible expanded schema).

get_performance_summary() dict[str, Any][source]

Get performance metrics.

process(data: dict[str, Any] | ndarray) dict[str, Any][source]

Process a single record using non-causal filtfilt.

Parameters:

data – Raw sensor record (dict or array)

Returns:

Dictionary with processed state, or minimal state if record is skipped.

reset() None[source]

Reset processor to initial state.

update_config(config_updates: dict[str, Any]) None[source]

Update processor configuration.

class biologger_sim.processors.streaming.StreamingProcessor(filt_len: int = 48, freq: int = 16, debug_level: int = 0, locked_attachment_roll_deg: float | None = None, locked_attachment_pitch_deg: float | None = None, locked_mag_offset_x: float | None = None, locked_mag_offset_y: float | None = None, locked_mag_offset_z: float | None = None, locked_mag_sphere_radius: float | None = None, ema_fast_alpha: float = 0.2, ema_slow_alpha: float = 0.02, ema_cross_threshold: float = 0.5, zmq_publisher: ZMQPublisher | None = None, eid: int | None = None, sim_id: str = 'default', tag_id: str = 'unknown', dead_reckoning_speed_model: str = 'odba_scaled', dead_reckoning_constant_speed: float = 1.0, dead_reckoning_odba_factor: float = 2.0, highpass_cutoff: float = 0.1, **kwargs: Any)[source]

Bases: BiologgerProcessor

Causal (real-time) streaming processor for digital twin and on-tag simulation.

calibrate_from_batch_data() None[source]
get_current_state() dict[str, Any][source]

Get current processor state and internal parameters.

Returns:

Dict containing current state information, calibration status, buffer contents, and algorithm-specific diagnostics

get_performance_summary() dict[str, Any][source]

Get comprehensive performance metrics and telemetry.

Returns:

Dict containing timing statistics, throughput metrics, algorithm performance, and diagnostic information

process(record: dict[str, Any] | Any) dict[str, Any][source]

Process input data through the pipeline.

This is the main processing method that all processors must implement. The input format is flexible to support different processor types: - Streaming: Dict with sensor records (accel, mag, depth, etc.) - Adaptive: np.ndarray with accelerometer samples - Batch: Dict with full dataset parameters

Parameters:

data – Input data - format depends on processor type

Returns:

Dict containing processed results and metadata

Raises:

ProcessingError – If processing fails due to invalid input or internal error

reset() None[source]

Reset processor to initial state.

Clears all internal buffers, calibration state, and accumulated statistics while preserving configuration settings.

update_config(config_updates: dict[str, Any]) None[source]

Update processor configuration at runtime.

Parameters:

config_updates – Dictionary of configuration parameters to update

Raises:

ConfigurationError – If configuration update is invalid

Simulation Logic & Types

class biologger_sim.core.types.AHRSAlgorithm(value)[source]

Bases: str, Enum

Defines the AHRS algorithm to use for sensor fusion.

COMPLEMENTARY = 'complementary'
EKF = 'ekf'
MADGWICK = 'madgwick'
MAHONY = 'mahony'
class biologger_sim.core.types.AHRSConfig(*, enabled: bool = False, algorithm: AHRSAlgorithm = AHRSAlgorithm.MADGWICK, beta: float = 0.1, kp: float = 0.5, ki: float = 0.0, use_adaptive: bool = False, normal_gravity_threshold: float = 0.05, slip_gravity_threshold: float = 0.15, smoothing_alpha: float = 0.01)[source]

Bases: BaseModel

Configuration for AHRS sensor fusion.

algorithm: AHRSAlgorithm
beta: float
enabled: bool
ki: float
kp: float
model_config = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

normal_gravity_threshold: float
slip_gravity_threshold: float
smoothing_alpha: float
use_adaptive: bool
class biologger_sim.core.types.CalibrationConfig(*, attachment_angle_mode: CalibrationMode = CalibrationMode.BATCH_COMPUTE, magnetometer_mode: CalibrationMode = CalibrationMode.BATCH_COMPUTE, locked_attachment_roll_deg: float | None = None, locked_attachment_pitch_deg: float | None = None, locked_mag_offset_x: float | None = None, locked_mag_offset_y: float | None = None, locked_mag_offset_z: float | None = None, locked_mag_sphere_radius: float | None = None)[source]

Bases: BaseModel

Configuration for sensor calibration.

attachment_angle_mode: CalibrationMode
locked_attachment_pitch_deg: float | None
locked_attachment_roll_deg: float | None
locked_mag_offset_x: float | None
locked_mag_offset_y: float | None
locked_mag_offset_z: float | None
locked_mag_sphere_radius: float | None
magnetometer_mode: CalibrationMode
model_config = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class biologger_sim.core.types.CalibrationMode(value)[source]

Bases: str, Enum

Defines how sensor calibration parameters are determined.

BATCH_COMPUTE: Calculate from the full dataset (R-style, acausal). FIXED: Use pre-determined constants (Real-time/Tag-style).

BATCH_COMPUTE = 'batch_compute'
FIXED = 'fixed'
class biologger_sim.core.types.ClockSource(value)[source]

Bases: str, Enum

Defines the source of time steps for integration.

FIXED_FREQ: Use fixed 1/sampling_rate_hz steps. (R-Compatible) SENSOR_TIME: Use actual differential timestamps from sensor data.

FIXED_FREQ = 'fixed_frequency'
SENSOR_TIME = 'sensor_timestamps'
class biologger_sim.core.types.DeadReckoningConfig(*, speed_model: SpeedModel = SpeedModel.ODBA_SCALED, constant_speed_m_s: float = 1.0, odba_speed_factor: float = 2.0)[source]

Bases: BaseModel

Configuration for dead reckoning and speed estimation.

constant_speed_m_s: float
model_config = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

odba_speed_factor: float
speed_model: SpeedModel
class biologger_sim.core.types.DepthAlgorithm(value)[source]

Bases: str, Enum

Defines the algorithm used for depth estimation.

ACAUSAL_INTERP: Post-hoc, full-batch interpolation (R-compatible). CAUSAL_SAMPLE_HOLD: Simple hold/nearest neighbor for real-time.

ACAUSAL_INTERP = 'acausal_interpolate'
CAUSAL_SAMPLE_HOLD = 'causal_sample_hold'
OFF = 'off'
class biologger_sim.core.types.DepthConfig(*, algorithm: DepthAlgorithm = DepthAlgorithm.CAUSAL_SAMPLE_HOLD, interpolation_max_gap: float = 5.0)[source]

Bases: BaseModel

Configuration for depth estimation.

algorithm: DepthAlgorithm
interpolation_max_gap: float
model_config = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class biologger_sim.core.types.EntityConfig(*, sim_id: str, tag_id: str | None = None, input_file: Path, sampling_rate_hz: int = 16, start_time_offset: float = 0.0, metadata_file: Path | None = None, clock_source: ClockSource = ClockSource.FIXED_FREQ, strict_r_parity: bool = False, calibration: CalibrationConfig = <factory>, depth_estimation: DepthConfig = <factory>, dead_reckoning: DeadReckoningConfig = <factory>, ahrs: AHRSConfig = <factory>, save_telemetry: bool = False)[source]

Bases: BaseModel

Configuration for a single simulation entity.

ahrs: AHRSConfig
calibration: CalibrationConfig
clock_source: ClockSource
dead_reckoning: DeadReckoningConfig
depth_estimation: DepthConfig
enforce_r_parity() EntityConfig[source]

Enforces R-parity settings if strict mode is enabled.

input_file: Path
metadata_file: Path | None
model_config = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

sampling_rate_hz: int
save_telemetry: bool
sim_id: str
start_time_offset: float
strict_r_parity: bool
tag_id: str | None
class biologger_sim.core.types.PipelineConfig(*, mode: ProcessingMode = ProcessingMode.SIMULATION, publish_zmq: bool = True, simulation: SimulationConfig)[source]

Bases: BaseModel

Global multi-entity pipeline configuration.

mode: ProcessingMode
model_config = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

publish_zmq: bool
simulation: SimulationConfig
class biologger_sim.core.types.ProcessingMode(value)[source]

Bases: str, Enum

Defines the processing mode for the pipeline.

LAB: Post-hoc analysis using acausal filters (filtfilt) for maximum accuracy.

Matches R implementation.

SIMULATION: Real-time simulation using causal filters (lfilter/EMA).

Simulates on-tag processing constraints.

LAB = 'LAB'
SIMULATION = 'SIMULATION'
class biologger_sim.core.types.SimulationConfig(*, entities: list[EntityConfig] = <factory>, meta_file: Path | None = None, playback_speed: float = 1.0, loop: bool = False, zmq: ZMQConfig = <factory>)[source]

Bases: BaseModel

Configuration for simulation-wide settings.

entities: list[EntityConfig]
loop: bool
meta_file: Path | None
model_config = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

playback_speed: float
classmethod validate_simulation_source(values: Any) Any[source]

Ensures that either ‘entities’ or ‘input_file’ (legacy/simple) is provided.

zmq: ZMQConfig
class biologger_sim.core.types.SpeedModel(value)[source]

Bases: str, Enum

Defines the model used for speed estimation (dead reckoning).

CONSTANT = 'constant'
ODBA_SCALED = 'odba_scaled'
class biologger_sim.core.types.ZMQConfig(*, port: int = 5555, host: str = '127.0.0.1', topic: str = 'biologger/telemetry')[source]

Bases: BaseModel

Configuration for ZeroMQ communication.

host: str
model_config = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

port: int
topic: str
class biologger_sim.core.processor_interface.BiologgerProcessor[source]

Bases: ABC

Abstract base interface for biologger data processors.

This interface defines the standard contract that all biologger processing pipelines should implement, enabling consistent usage patterns across batch, streaming, and adaptive sensor fusion processors.

Design Principles: - Unified processing interface via process() method - Standardized configuration and state management - Consistent performance monitoring and telemetry - Flexible input/output for different data types - Runtime reconfiguration support

abstractmethod get_current_state() dict[str, Any][source]

Get current processor state and internal parameters.

Returns:

Dict containing current state information, calibration status, buffer contents, and algorithm-specific diagnostics

get_output_schema() list[str][source]

Get list of output field names produced by process().

Default implementation returns empty list. Processors should override this to provide schema information for output validation.

Returns:

List of string field names in process() output

abstractmethod get_performance_summary() dict[str, Any][source]

Get comprehensive performance metrics and telemetry.

Returns:

Dict containing timing statistics, throughput metrics, algorithm performance, and diagnostic information

get_processor_info() dict[str, Any][source]

Get processor metadata and capabilities.

Returns:

Dict containing processor type, version, supported features, and configuration parameters

abstractmethod process(data: dict[str, Any] | ndarray) dict[str, Any][source]

Process input data through the pipeline.

This is the main processing method that all processors must implement. The input format is flexible to support different processor types: - Streaming: Dict with sensor records (accel, mag, depth, etc.) - Adaptive: np.ndarray with accelerometer samples - Batch: Dict with full dataset parameters

Parameters:

data – Input data - format depends on processor type

Returns:

Dict containing processed results and metadata

Raises:

ProcessingError – If processing fails due to invalid input or internal error

abstractmethod reset() None[source]

Reset processor to initial state.

Clears all internal buffers, calibration state, and accumulated statistics while preserving configuration settings.

abstractmethod update_config(config_updates: dict[str, Any]) None[source]

Update processor configuration at runtime.

Parameters:

config_updates – Dictionary of configuration parameters to update

Raises:

ConfigurationError – If configuration update is invalid

validate_input(data: dict[str, Any] | ndarray) bool[source]

Validate input data format and content.

Default implementation always returns True. Processors can override this for custom validation logic.

Parameters:

data – Input data to validate

Returns:

True if input is valid, False otherwise

exception biologger_sim.core.processor_interface.ConfigurationError[source]

Bases: Exception

Exception raised for invalid configuration updates.

exception biologger_sim.core.processor_interface.ProcessingError[source]

Bases: Exception

Exception raised during data processing.

Shared INS (Inertial Navigation System) module for biologger data.

Ported from biologger-pseudotrack to achieve exact depth parity with R baseline.

This module provides reusable INS capabilities including: - Kalman filtering for depth/velocity fusion - Vertical velocity estimation via acceleration integration - Optional biological constraint enforcement (velocity/acceleration limits) - State persistence across algorithm transitions

Biological constraints can be disabled by passing None (R-style streaming).

class biologger_sim.core.ins.INSSolution(sample_rate: float, biological_constraints: dict[str, float] | None = None)[source]

Bases: object

6-DOF INS solution with Kalman filtering for depth estimation.

get_position_estimate() tuple[float, float, float][source]

Get current position estimate.

Returns:

Tuple of (x, y, z) position components

get_state() dict[str, Any][source]

Get current INS state information.

Returns:

Dictionary with complete state information

get_velocity_estimate() tuple[float, float, float][source]

Get current velocity estimate.

Returns:

Tuple of (vx, vy, vz) velocity components in m/s

reset() None[source]

Reset INS solution to initial state.

update(accel_world: ndarray, depth_measurement: float | None, dt: float, current_time: float) dict[str, Any][source]

Update INS solution with new measurements.

Parameters:
  • accel_world – World-frame acceleration [ax, ay, az] in m/s²

  • depth_measurement – Depth measurement in meters, None if not available

  • dt – Time step since last update in seconds

  • current_time – Current timestamp

Returns:

Dictionary with updated state estimates

class biologger_sim.core.ins.VelocityEstimator(process_noise_depth: float = 0.0001, process_noise_velocity: float = 0.001, measurement_noise: float = 0.02)[source]

Bases: object

Kalman filter wrapper for vertical velocity estimation.

apply_biological_constraints(max_velocity: float) None[source]

Apply biological constraints to velocity estimate.

Parameters:

max_velocity – Maximum biologically plausible velocity (m/s)

get_depth() float[source]

Get current depth estimate.

Returns:

Estimated depth in meters

get_velocity() float[source]

Get current velocity estimate.

Returns:

Estimated vertical velocity in m/s

initialize(initial_depth: float) None[source]

Initialize Kalman filter with first depth measurement.

Parameters:

initial_depth – Initial depth value for filter initialization

is_initialized() bool[source]

Check if the velocity estimator is initialized.

Returns:

True if Kalman filter is initialized

predict(dt: float, vertical_acceleration: float) None[source]

Prediction step of Kalman filter.

Parameters:
  • dt – Time step in seconds

  • vertical_acceleration – Filtered vertical acceleration in m/s²

reset() None[source]

Reset velocity estimator to initial state.

update(depth_measurement: float) None[source]

Update step of Kalman filter with depth measurement.

Parameters:

depth_measurement – Measured depth value

Scientific Functions

class biologger_sim.functions.depth_smoother.DepthSmoother(freq: int = 16, low_activity_threshold: float = 0.15, high_activity_threshold: float = 0.2, debug_level: int = 0)[source]

Bases: object

Multi-scale EMA depth smoothing for reducing high-frequency variance while preserving accuracy. Implements adaptive blending of multiple EMA filters with different time constants: - Fast EMA (2-3 second response): Captures rapid depth changes during active periods - Medium EMA (10-15 second response): Balanced response for general smoothing - Slow EMA (30-60 second response): Provides stability during low-activity periods

Adaptive blending ratios based on ODBA.

update(depth_estimate: float, odba: float) float[source]

Update smoothing with new depth estimate and activity level.

Non-causal filtering for post-facto biologger analysis.

This module provides filtfilt-based (bidirectional, zero-phase) filtering to achieve R-compatibility for scientific validation. Unlike the streaming module which uses causal lfilter, these filters look both forward and backward in time, which is only possible for post-hoc analysis.

Key functions: - gsep_filtfilt: R-compatible gravitational separation using scipy.signal.filtfilt

biologger_sim.functions.filters.butterworth_filtfilt(data: ndarray[tuple[Any, ...], dtype[float64]], cutoff: float, fs: float, order: int = 5) ndarray[tuple[Any, ...], dtype[float64]][source]

Apply Butterworth low-pass filter using filtfilt for zero-phase filtering.

Parameters:
  • data (np.ndarray) – Input signal

  • cutoff (float) – Cutoff frequency in Hz

  • fs (float) – Sampling frequency in Hz

  • order (int) – Filter order (default 5)

Returns:

Filtered signal

Return type:

np.ndarray

biologger_sim.functions.filters.gsep_batch_circular(accel_array: ndarray[tuple[Any, ...], dtype[float64]], filt_len: int) tuple[ndarray[tuple[Any, ...], dtype[float64]], ndarray[tuple[Any, ...], dtype[float64]], ndarray[tuple[Any, ...], dtype[float64]], ndarray[tuple[Any, ...], dtype[float64]]][source]

Batch Gsep with circular wrapping - exact R match.

This matches R’s: filter(xyz, filter=filt, sides=2, circular=TRUE)

Parameters:
  • accel_array (np.ndarray) – Full accelerometer data, shape (N, 3)

  • filt_len (int) – Filter window length

Returns:

(static, dynamic, ODBA, VeDBA) each shape (N, 3) or (N,)

Return type:

tuple

biologger_sim.functions.filters.gsep_streaming(x: float, y: float, z: float, filt_len: int, buffer: deque[tuple[float, float, float]]) tuple[float, float, float, float, float, float, float, float][source]

Streaming Gsep without circular wrapping - for non-circular mode.

This uses a centered filter on available samples only (no wrap-around). Edge effects at first/last ~filt_len/2 samples.

Parameters:
  • x (float) – Current accelerometer readings (0.1g units)

  • y (float) – Current accelerometer readings (0.1g units)

  • z (float) – Current accelerometer readings (0.1g units)

  • filt_len (int) – Filter window length

  • buffer (deque) – Rolling buffer of samples

Returns:

(static_x, static_y, static_z, dyn_x, dyn_y, dyn_z, ODBA, VeDBA)

Return type:

tuple

biologger_sim.functions.rotation.compute_pitch(static_accel: ndarray[tuple[Any, ...], dtype[float64]]) ndarray[tuple[Any, ...], dtype[float64]][source]

Calculates pitch angles from static acceleration data. :param static_accel: A 2D NumPy array of shape (n_samples, 3), where each row contains the x, y, and z components

of the static acceleration vector.

Returns:

A 1D NumPy array of shape (n_samples,), where each element is the pitch angle (in radians) corresponding to the input acceleration vector.

Return type:

np.ndarray

Notes

Pitch is calculated as arctan2(-x, sqrt(y^2 + z^2)).

biologger_sim.functions.rotation.compute_roll(static_accel: ndarray[tuple[Any, ...], dtype[float64]]) ndarray[tuple[Any, ...], dtype[float64]][source]

Calculates roll angles from static acceleration data. :param static_accel: A 2D NumPy array of shape (n_samples, 3), where each row contains the x, y, and z components

of the static acceleration vector.

Returns:

A 1D NumPy array of shape (n_samples,), where each element is the roll angle (in radians) corresponding to the input acceleration vector.

Return type:

np.ndarray

Notes

Roll is calculated as arctan2(y, z).

biologger_sim.functions.rotation.mag_offset(mag: ndarray[tuple[Any, ...], dtype[float64]]) ndarray[tuple[Any, ...], dtype[float64]][source]

Estimate the offset and radius of a sphere fitted to 3D magnetometer data. This function fits a sphere to the provided 3D magnetometer measurements using least squares, estimating the hard-iron offset (bias) and the radius of the sphere. The offset can be used to correct magnetometer readings for hard-iron distortions. :param mag: An (N, 3) array of magnetometer measurements, where each row represents

a 3D vector [Mx, My, Mz].

Returns:

A 1D array of length 4: [offset_x, offset_y, offset_z, radius], where offset_x, offset_y, and offset_z are the estimated offsets for each axis, and radius is the estimated radius of the fitted sphere.

Return type:

numpy.ndarray

biologger_sim.functions.rotation.xb(angle: float) ndarray[tuple[Any, ...], dtype[float64]][source]

local implementation of gRumble Xb(angle) Return the rotation matrix for a rotation about the X-axis by the given angle.

CRITICAL: Must match R’s gRumble::Xb() sign convention exactly! R uses: [[1, 0, 0], [0, cos(b), sin(b)], [0, -sin(b), cos(b)]]

Parameters:

angle (float) – The rotation angle in radians.

Returns:

A 3x3 rotation matrix representing a rotation about the X-axis.

Return type:

numpy.ndarray

biologger_sim.functions.rotation.yb(angle: float) ndarray[tuple[Any, ...], dtype[float64]][source]

local implementation of gRumble Yb(angle) Return the rotation matrix for a rotation about the Y-axis by the given angle.

CRITICAL: Must match R’s gRumble::Yb() sign convention exactly! R uses: [[cos(b), 0, sin(b)], [0, 1, 0], [-sin(b), 0, cos(b)]]

Parameters:

angle (float) – The rotation angle in radians.

Returns:

A 3x3 rotation matrix representing a rotation about the Y-axis.

Return type:

numpy.ndarray

I/O & Streams

class biologger_sim.io.stream.SensorStream(config: SimulationConfig, file_path: Path, data: DataFrame | None = None)[source]

Bases: object

Handles loading and streaming of biologger data. Simulates a real-time sensor feed from a CSV or Feather file.

stream(chunk_size: int = 100000) Generator[dict[str, Any], None, None][source]

Yields data chunks simulating real-time arrival.

Parameters:

chunk_size – Number of samples to yield per step.

biologger_sim.io.data_loader.load_and_filter_data(data_path: Path, meta_path: Path, tag_id: str) DataFrame[source]

Load sensor data and filter by deployment time from metadata.

Parameters:
  • data_path – Path to the sensor data CSV.

  • meta_path – Path to the metadata CSV.

  • tag_id – Tag ID to look up in metadata.

Returns:

Filtered DataFrame.

biologger_sim.io.data_loader.load_metadata(meta_path: Path, tag_id: str) dict[source]

Load metadata for a specific tag.

Parameters:
  • meta_path – Path to the metadata CSV file.

  • tag_id – The tag ID to filter for (e.g., ‘RED001_20220812’).

Returns:

Dictionary containing metadata for the tag.

class biologger_sim.io.zmq_publisher.ZMQPublisher(config: SimulationConfig, debug_level: int = 0)[source]

Bases: object

Publishes sensor data and simulation state via ZeroMQ. Connects to the NVIDIA Omniverse extension using MessagePack.

close() None[source]

Clean up ZMQ resources.

publish(topic: str, data: dict[str, Any]) None[source]

Publishes a message to a specific topic using MessagePack.

Parameters:
  • topic – The topic string (e.g., “sensor/imu”, “sim/state”).

  • data – The data dictionary to publish.

publish_state(eid: int, sim_id: str, tag_id: str, state: dict[str, Any]) None[source]

Publishes the simulation state to Omniverse in the expected format. Sends Euler angles (degrees) for the receiver to handle rotation.

Parameters:
  • eid – Integer entity identifier.

  • sim_id – String simulation identifier (e.g., “SF_RED001_v2”).

  • state – Dictionary containing ‘pitch_degrees’, ‘roll_degrees’, ‘heading_degrees’, ‘X_Dynamic’, ‘Y_Dynamic’, ‘Z_Dynamic’, ‘VeDBA’.