Fix coverage: add /api/coverage route, remove stray gather code from loadCoverage
This commit is contained in:
132
scripts/extract_hdf5_window.py
Executable file
132
scripts/extract_hdf5_window.py
Executable file
@@ -0,0 +1,132 @@
|
||||
"""
|
||||
Script d'extraction de fenêtres de données HDF5.
|
||||
Appelé par le backend Node.js pour lire des portions de données ADC
|
||||
sans charger tout le fichier en mémoire.
|
||||
|
||||
Usage:
|
||||
python extract_hdf5_window.py --file <path> --channel <ch0-ch3> --start <timestamp> --duration <seconds>
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
try:
|
||||
import h5py
|
||||
import numpy as np
|
||||
except ImportError as e:
|
||||
print(json.dumps({"error": f"Module manquant: {e}"}))
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
SAMPLE_RATE = 200 # Hz
|
||||
|
||||
|
||||
def extract_window(file_path: str, channel: str, start_ts: int, duration_sec: int) -> dict:
|
||||
"""
|
||||
Extrait une fenêtre de données ADC d'un fichier HDF5.
|
||||
|
||||
Args:
|
||||
file_path: Chemin vers le fichier H5
|
||||
channel: Canal à extraire (ch0, ch1, ch2, ch3)
|
||||
start_ts: Timestamp de début (secondes Unix)
|
||||
duration_sec: Durée en secondes
|
||||
|
||||
Returns:
|
||||
dict avec les échantillons et métadonnées
|
||||
"""
|
||||
file_path = Path(file_path)
|
||||
|
||||
if not file_path.exists():
|
||||
return {"error": f"Fichier non trouvé: {file_path}"}
|
||||
|
||||
try:
|
||||
with h5py.File(file_path, 'r') as f:
|
||||
# Chaque fichier HDF5 contient un seul dataset 'adc_values'
|
||||
# Le canal est déterminé par le nom du fichier, pas par un chemin interne
|
||||
|
||||
if 'adc_values' not in f:
|
||||
# Lister les datasets disponibles pour debug
|
||||
available = []
|
||||
def visit(name, obj):
|
||||
if isinstance(obj, h5py.Dataset):
|
||||
available.append(name)
|
||||
f.visititems(visit)
|
||||
return {"error": f"Dataset 'adc_values' non trouvé. Disponibles: {available}"}
|
||||
|
||||
dataset = f['adc_values']
|
||||
|
||||
# Récupérer les attributs de temps si disponibles
|
||||
# Chercher d'abord dans les attributs du dataset, puis du fichier
|
||||
file_start_ts = None
|
||||
if 'timestamp' in dataset.attrs:
|
||||
file_start_ts = int(dataset.attrs['timestamp'])
|
||||
elif 'start_time' in dataset.attrs:
|
||||
file_start_ts = int(dataset.attrs['start_time'])
|
||||
elif 'timestamp' in f.attrs:
|
||||
file_start_ts = int(f.attrs['timestamp'])
|
||||
elif 'start_time' in f.attrs:
|
||||
file_start_ts = int(f.attrs['start_time'])
|
||||
|
||||
# Calculer les indices de début et fin
|
||||
total_samples = dataset.shape[0]
|
||||
|
||||
if file_start_ts is not None:
|
||||
# Offset par rapport au début du fichier
|
||||
offset_sec = max(0, start_ts - file_start_ts)
|
||||
start_idx = int(offset_sec * SAMPLE_RATE)
|
||||
else:
|
||||
# Pas d'info de temps, prendre depuis le début
|
||||
start_idx = 0
|
||||
|
||||
num_samples = int(duration_sec * SAMPLE_RATE)
|
||||
end_idx = min(start_idx + num_samples, total_samples)
|
||||
|
||||
# Limiter pour éviter les gros payloads (max 60 secondes = 12000 samples)
|
||||
max_samples = 60 * SAMPLE_RATE
|
||||
if end_idx - start_idx > max_samples:
|
||||
end_idx = start_idx + max_samples
|
||||
|
||||
# Extraire les données (lecture partielle, pas tout en RAM)
|
||||
samples = dataset[start_idx:end_idx]
|
||||
|
||||
# Garder en numpy pour les stats
|
||||
samples_array = np.array(samples) if not isinstance(samples, np.ndarray) else samples
|
||||
|
||||
return {
|
||||
"samples": samples.tolist() if isinstance(samples, np.ndarray) else samples,
|
||||
"start_idx": start_idx,
|
||||
"end_idx": end_idx,
|
||||
"total_samples": total_samples,
|
||||
"file_start_ts": file_start_ts,
|
||||
"channel": channel,
|
||||
"stats": {
|
||||
"min": float(np.min(samples_array)) if len(samples_array) > 0 else None,
|
||||
"max": float(np.max(samples_array)) if len(samples_array) > 0 else None,
|
||||
"mean": float(np.mean(samples_array)) if len(samples_array) > 0 else None,
|
||||
"rms": float(np.sqrt(np.mean(samples_array**2))) if len(samples_array) > 0 else None,
|
||||
}
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
return {"error": str(e)}
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description='Extraction de fenêtre HDF5')
|
||||
parser.add_argument('--file', required=True, help='Chemin du fichier H5')
|
||||
parser.add_argument('--channel', required=True, help='Canal (ch0-ch3)')
|
||||
parser.add_argument('--start', type=int, required=True, help='Timestamp de début')
|
||||
parser.add_argument('--duration', type=int, default=10, help='Durée en secondes')
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
result = extract_window(args.file, args.channel, args.start, args.duration)
|
||||
|
||||
# Sortie JSON pour le backend Node.js
|
||||
print(json.dumps(result))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
Reference in New Issue
Block a user