Files
seisee/scripts/extract_hdf5_window.py

133 lines
5.0 KiB
Python
Executable File

"""
Script d'extraction de fenêtres de données HDF5.
Appelé par le backend Node.js pour lire des portions de données ADC
sans charger tout le fichier en mémoire.
Usage:
python extract_hdf5_window.py --file <path> --channel <ch0-ch3> --start <timestamp> --duration <seconds>
"""
import argparse
import json
import sys
from pathlib import Path
try:
import h5py
import numpy as np
except ImportError as e:
print(json.dumps({"error": f"Module manquant: {e}"}))
sys.exit(1)
SAMPLE_RATE = 200 # Hz
def extract_window(file_path: str, channel: str, start_ts: int, duration_sec: int) -> dict:
"""
Extrait une fenêtre de données ADC d'un fichier HDF5.
Args:
file_path: Chemin vers le fichier H5
channel: Canal à extraire (ch0, ch1, ch2, ch3)
start_ts: Timestamp de début (secondes Unix)
duration_sec: Durée en secondes
Returns:
dict avec les échantillons et métadonnées
"""
file_path = Path(file_path)
if not file_path.exists():
return {"error": f"Fichier non trouvé: {file_path}"}
try:
with h5py.File(file_path, 'r') as f:
# Chaque fichier HDF5 contient un seul dataset 'adc_values'
# Le canal est déterminé par le nom du fichier, pas par un chemin interne
if 'adc_values' not in f:
# Lister les datasets disponibles pour debug
available = []
def visit(name, obj):
if isinstance(obj, h5py.Dataset):
available.append(name)
f.visititems(visit)
return {"error": f"Dataset 'adc_values' non trouvé. Disponibles: {available}"}
dataset = f['adc_values']
# Récupérer les attributs de temps si disponibles
# Chercher d'abord dans les attributs du dataset, puis du fichier
file_start_ts = None
if 'timestamp' in dataset.attrs:
file_start_ts = int(dataset.attrs['timestamp'])
elif 'start_time' in dataset.attrs:
file_start_ts = int(dataset.attrs['start_time'])
elif 'timestamp' in f.attrs:
file_start_ts = int(f.attrs['timestamp'])
elif 'start_time' in f.attrs:
file_start_ts = int(f.attrs['start_time'])
# Calculer les indices de début et fin
total_samples = dataset.shape[0]
if file_start_ts is not None:
# Offset par rapport au début du fichier
offset_sec = max(0, start_ts - file_start_ts)
start_idx = int(offset_sec * SAMPLE_RATE)
else:
# Pas d'info de temps, prendre depuis le début
start_idx = 0
num_samples = int(duration_sec * SAMPLE_RATE)
end_idx = min(start_idx + num_samples, total_samples)
# Limiter pour éviter les gros payloads (max 60 secondes = 12000 samples)
max_samples = 60 * SAMPLE_RATE
if end_idx - start_idx > max_samples:
end_idx = start_idx + max_samples
# Extraire les données (lecture partielle, pas tout en RAM)
samples = dataset[start_idx:end_idx]
# Garder en numpy pour les stats
samples_array = np.array(samples) if not isinstance(samples, np.ndarray) else samples
return {
"samples": samples.tolist() if isinstance(samples, np.ndarray) else samples,
"start_idx": start_idx,
"end_idx": end_idx,
"total_samples": total_samples,
"file_start_ts": file_start_ts,
"channel": channel,
"stats": {
"min": float(np.min(samples_array)) if len(samples_array) > 0 else None,
"max": float(np.max(samples_array)) if len(samples_array) > 0 else None,
"mean": float(np.mean(samples_array)) if len(samples_array) > 0 else None,
"rms": float(np.sqrt(np.mean(samples_array**2))) if len(samples_array) > 0 else None,
}
}
except Exception as e:
return {"error": str(e)}
def main():
parser = argparse.ArgumentParser(description='Extraction de fenêtre HDF5')
parser.add_argument('--file', required=True, help='Chemin du fichier H5')
parser.add_argument('--channel', required=True, help='Canal (ch0-ch3)')
parser.add_argument('--start', type=int, required=True, help='Timestamp de début')
parser.add_argument('--duration', type=int, default=10, help='Durée en secondes')
args = parser.parse_args()
result = extract_window(args.file, args.channel, args.start, args.duration)
# Sortie JSON pour le backend Node.js
print(json.dumps(result))
if __name__ == '__main__':
main()