# Copyright (c) 2025-2026 Long Horizon Observatory
# Licensed under the Apache License, Version 2.0. See LICENSE file for details.
import logging
import warnings
from pathlib import Path
from typing import Any, cast
import pandas as pd
import pyarrow.feather as feather
from biologger_sim.core.datetime_utils import excel_date_to_datetime_ns
from .converter import convert_csv_to_feather
# [docs]  (Sphinx HTML-render artifact; not part of the original source)
def load_and_filter_data(data_path: Path, meta_path: Path, tag_id: str) -> pd.DataFrame:
    """
    Load sensor data and filter by deployment time from metadata.

    Prefers a Feather sidecar (``data_path`` with a ``.feather`` suffix) for
    speed. If it is missing, a one-time CSV -> Feather conversion is
    attempted; if that fails for any reason, the CSV is read directly.

    Args:
        data_path: Path to the sensor data CSV.
        meta_path: Path to the metadata CSV.
        tag_id: Tag ID to look up in metadata.

    Returns:
        DataFrame restricted to rows strictly inside the deployment window
        (``DateTimeP > time_start_utc`` and, when an end time is present,
        ``DateTimeP < time_end_utc``), with a precomputed float
        ``timestamp`` column holding unix seconds.

    Raises:
        ValueError: If no ``DateTimeP`` column exists and one cannot be
            constructed from an Excel-serial ``Date`` column.
    """
    logger = logging.getLogger(__name__)

    # Deployment metadata (start/end times) for this tag.
    meta = load_metadata(meta_path, tag_id)

    feather_path = data_path.with_suffix(".feather")
    if feather_path.exists():
        # Fast path: pre-converted Feather file.
        df = feather.read_table(feather_path).to_pandas()
    else:
        # Slow path: convert the CSV to Feather once, then load the result.
        # NOTE(review): the original wrapped this in an outer
        # `try/except ParserError: raise`, which re-raised unchanged — a
        # no-op that only obscured the fallback logic, so it was removed.
        logger.info("Auto-converting %s to Feather for performance...", data_path)
        try:
            feather_path = convert_csv_to_feather(data_path)
            df = feather.read_table(feather_path).to_pandas()
        except Exception as e:
            # Best-effort fallback: read the CSV directly (slower).
            logger.warning("Auto-conversion failed: %s. Falling back to CSV.", e)
            with warnings.catch_warnings():
                warnings.filterwarnings("ignore", category=pd.errors.ParserWarning)
                df = pd.read_csv(
                    data_path,
                    comment=";",
                    index_col=False,
                    engine="python",
                    on_bad_lines="warn",
                )

    # Drop rows with missing accelerometer X, mirroring the R script:
    # dat <- dat[!is.na(dat$"int aX"),]
    if "int aX" in df.columns:
        df = df.dropna(subset=["int aX"])

    # Ensure a datetime-typed DateTimeP column exists, building it from an
    # Excel serial-date 'Date' column when necessary.
    if "DateTimeP" not in df.columns:
        if "Date" in df.columns:
            df["DateTimeP"] = df["Date"].apply(_safe_excel_to_datetime)
        else:
            raise ValueError("Could not find or construct DateTimeP column")
    elif not pd.api.types.is_datetime64_any_dtype(df["DateTimeP"]):
        df["DateTimeP"] = pd.to_datetime(df["DateTimeP"])

    # Keep rows inside the deployment window.
    # R: dat <- dat %>% dplyr::filter(DateTimeP > meta$time_start_utc)
    df_filtered = df[df["DateTimeP"] > meta["time_start_utc"]].copy()
    # Also filter by end time if available.
    if pd.notna(meta["time_end_utc"]):
        df_filtered = df_filtered[df_filtered["DateTimeP"] < meta["time_end_utc"]]

    # Pre-calculate unix timestamps (float seconds) so the streaming loop
    # avoids per-row pd.Timestamp.timestamp() overhead. Integer div/mod on
    # the int64 nanoseconds keeps full precision in the fractional part.
    ts_ns = df_filtered["DateTimeP"].values.astype("int64")
    df_filtered["timestamp"] = ts_ns // 10**9 + (ts_ns % 10**9) / 10**9
    return cast(pd.DataFrame, df_filtered)


def _safe_excel_to_datetime(x: Any) -> Any:
    """Convert one Excel serial-date value to a timestamp, or NaT on bad input."""
    if pd.isna(x) or x is None or x == "":
        return pd.NaT
    try:
        # High-precision Excel-serial -> datetime64[ns] conversion.
        return excel_date_to_datetime_ns(float(x))[0]
    except (ValueError, TypeError):
        return pd.NaT