Source code for biologger_sim.io.stream

# Copyright (c) 2025-2026 Long Horizon Observatory
# Licensed under the Apache License, Version 2.0. See LICENSE file for details.

import logging
from collections.abc import Generator
from pathlib import Path
from typing import Any, cast

import numpy as np
import pandas as pd
import pyarrow.feather as feather

from ..core.types import SimulationConfig


[docs]
class SensorStream:
    """
    Handles loading and streaming of biologger data.

    Simulates a real-time sensor feed from a CSV or Feather file.
    """

    def __init__(self, config: SimulationConfig, file_path: Path, data: pd.DataFrame | None = None):
        self.config = config
        self.file_path = file_path
        self.data = data
        self.metadata: dict[str, str] = {}
        if self.data is None:
            self._load_data()

    def _load_data(self) -> None:
        """Loads the data into memory."""
        if not self.file_path.exists():
            raise FileNotFoundError(f"Data file not found: {self.file_path}")

        if self.file_path.suffix == ".feather":
            self._load_feather()
        elif self.file_path.suffix == ".csv":
            # Prefer an existing Feather version of the same file
            feather_path = self.file_path.with_suffix(".feather")
            if feather_path.exists():
                self.file_path = feather_path
                self._load_feather()
            else:
                # Auto-convert the CSV to Feather for faster loads. Conversion is
                # unconditional here; Feather output is primarily intended for
                # large (>100 MB) datasets.
                logger = logging.getLogger(__name__)
                try:
                    # Imported here to avoid a circular dependency with the converter module
                    from .converter import convert_csv_to_feather

                    logger.info(f"Auto-converting {self.file_path} to Feather for performance...")
                    feather_path = convert_csv_to_feather(self.file_path)
                    self.file_path = feather_path
                    self._load_feather()
                except Exception as e:
                    logger.warning(f"Auto-conversion failed: {e}. Falling back to CSV.")
                    self._load_csv()
        else:
            # Unknown extension: fall back to the CSV loader
            self._load_csv()

    def _load_feather(self) -> None:
        """Loads data from a Feather file with metadata."""
        table = feather.read_table(self.file_path)
        self.data = table.to_pandas()
        if self.data is None:
            raise RuntimeError("Failed to load data from Feather file")

        # Extract schema-level metadata, if any was written alongside the table
        if table.schema.metadata:
            self.metadata = {
                k.decode("utf-8"): v.decode("utf-8")
                for k, v in table.schema.metadata.items()
            }

        # Ensure DateTimeP is parsed and derive a float-seconds timestamp column
        if "DateTimeP" in self.data.columns:
            if not pd.api.types.is_datetime64_any_dtype(self.data["DateTimeP"]):
                self.data["DateTimeP"] = pd.to_datetime(self.data["DateTimeP"])
            ts_ns = cast(np.ndarray, self.data["DateTimeP"].values).astype("int64")
            self.data["timestamp"] = ts_ns / 1e9

        # Sort by time just in case
        if "timestamp" in self.data.columns:
            self.data = self.data.sort_values("timestamp")
        elif "DateTimeP" in self.data.columns:
            self.data = self.data.sort_values("DateTimeP")

    def _load_csv(self) -> None:
        """Loads the CSV data into memory."""
        # For very large files chunked reading would be preferable; here the whole
        # file is loaded at once
        self.data = pd.read_csv(self.file_path)

        # Ensure DateTimeP is parsed and derive a float-seconds timestamp column
        if "DateTimeP" in self.data.columns:
            self.data["DateTimeP"] = pd.to_datetime(self.data["DateTimeP"])
            ts_ns = cast(np.ndarray, self.data["DateTimeP"].values).astype("int64")
            self.data["timestamp"] = ts_ns / 1e9

        # Sort by time just in case
        if "timestamp" in self.data.columns:
            self.data = self.data.sort_values("timestamp")
        elif "DateTimeP" in self.data.columns:
            self.data = self.data.sort_values("DateTimeP")
[docs]
    def stream(self, chunk_size: int = 100000) -> Generator[dict[str, Any], None, None]:
        """
        Yields individual sample records, read in chunks, simulating real-time arrival.

        Args:
            chunk_size: Number of samples to read per step.
        """
        if self.data is None:
            raise RuntimeError("Data not loaded")

        num_samples = len(self.data)
        current_idx = 0

        while True:
            if current_idx >= num_samples:
                if self.config.loop:
                    current_idx = 0
                else:
                    break

            # Extract the next chunk of rows
            end_idx = min(current_idx + chunk_size, num_samples)
            chunk = self.data.iloc[current_idx:end_idx]

            # Convert to dict records for easy consumption
            records = cast(list[dict[str, Any]], chunk.to_dict("records"))
            yield from records

            current_idx = end_idx
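
A minimal usage sketch of this module. It assumes SimulationConfig can be constructed with a loop keyword and that a tag file such as deployments/tag042.csv exists; both are illustrative assumptions, not part of this source.

    from pathlib import Path

    from biologger_sim.core.types import SimulationConfig
    from biologger_sim.io.stream import SensorStream

    # Hypothetical config and file path, used only to illustrate the API
    config = SimulationConfig(loop=False)  # assumption: simple keyword constructor
    sensor = SensorStream(config, Path("deployments/tag042.csv"))

    # Each yielded record is a dict keyed by column name, including the derived
    # float-seconds "timestamp" column when DateTimeP is present
    for i, record in enumerate(sensor.stream(chunk_size=50_000)):
        if i >= 3:
            break
        print(record.get("timestamp"), record.get("DateTimeP"))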