""" Pré-calcul des valeurs RMS ADC pour tous les nodes. Génère un fichier JSON avec les RMS à intervalles réguliers pour une lecture rapide. """ import json import sys from pathlib import Path from datetime import datetime from typing import Dict, List, Any import numpy as np import h5py from tqdm import tqdm # Configuration SAMPLE_RATE = 200 # Hz RMS_INTERVAL_SEC = 60 # Calculer RMS toutes les 60 secondes (plus rapide) RMS_WINDOW_SEC = 5 # Fenêtre de calcul RMS (5 secondes = 1000 samples) INDEX_PATH = Path(r"F:\seismic_webapp\data\index.json") OUTPUT_DIR = Path(r"F:\seismic_webapp\data\rms_cache") def compute_rms_for_file(h5_path: str, interval_sec: int = RMS_INTERVAL_SEC, window_sec: int = RMS_WINDOW_SEC, max_duration_sec: int = 3600) -> List[Dict]: """ Calcule les valeurs RMS à intervalles réguliers pour un fichier HDF5. Retourne une liste de {timestamp, rms} max_duration_sec: Limite à traiter (en secondes) pour accélérer """ results = [] try: with h5py.File(h5_path, 'r') as f: if 'adc_values' not in f: return results dataset = f['adc_values'] total_samples = dataset.shape[0] # Récupérer le timestamp de début start_ts = None if 'timestamp' in dataset.attrs: start_ts = int(dataset.attrs['timestamp']) if start_ts is None: return results # Calculer RMS à intervalles réguliers window_samples = window_sec * SAMPLE_RATE interval_samples = interval_sec * SAMPLE_RATE # Limiter la durée pour accélérer max_samples = min(total_samples, max_duration_sec * SAMPLE_RATE) for idx in range(0, max_samples - window_samples, interval_samples): # Lire uniquement la fenêtre nécessaire samples = dataset[idx:idx + window_samples] # Calculer RMS rms = float(np.sqrt(np.mean(samples.astype(np.float64) ** 2))) # Timestamp pour ce point ts = start_ts + (idx // SAMPLE_RATE) results.append({ 'ts': ts, 'rms': rms }) except Exception as e: print(f"Erreur lecture {h5_path}: {e}") return results def precompute_for_date(index: Dict, date: str, channel: str = 'ch0') -> Dict[str, List[Dict]]: """ Pré-calcule les RMS pour tous les nodes pour une date donnée. Retourne {node_id: [{ts, rms}, ...]} """ results = {} # Trouver tous les nodes avec données pour cette date nodes_with_data = [] for node_id, node in index['nodes'].items(): if node.get('dates') and date in node['dates']: nodes_with_data.append((node_id, node['dates'][date])) print(f"Traitement de {len(nodes_with_data)} nodes pour {date}, canal {channel}") for node_id, files in tqdm(nodes_with_data, desc=f"Date {date}"): # Trouver le fichier pour le canal demandé (priorité aux fichiers "data") channel_pattern = f'_{channel}_' target_file = None for f in files: if channel_pattern in f['path'] and '_data_' in f['path']: target_file = f break if not target_file: for f in files: if channel_pattern in f['path']: target_file = f break if not target_file: continue # Calculer les RMS rms_data = compute_rms_for_file(target_file['path']) if rms_data: results[node_id] = rms_data return results def main(): import argparse parser = argparse.ArgumentParser(description='Pré-calcul des RMS ADC') parser.add_argument('--date', help='Date spécifique (ex: 2020-09-02)') parser.add_argument('--channel', default='ch0', help='Canal (ch0-ch3)') parser.add_argument('--all', action='store_true', help='Traiter toutes les dates/canaux') args = parser.parse_args() # Charger l'index if not INDEX_PATH.exists(): print(f"Index non trouvé: {INDEX_PATH}") sys.exit(1) with open(INDEX_PATH, 'r') as f: index = json.load(f) print(f"Index chargé: {len(index['nodes'])} nodes, {len(index['dates'])} dates") # Créer le dossier de sortie OUTPUT_DIR.mkdir(parents=True, exist_ok=True) # Déterminer quoi traiter if args.date: dates_to_process = [args.date] channels_to_process = [args.channel] elif args.all: dates_to_process = index['dates'] channels_to_process = ['ch0', 'ch1', 'ch2', 'ch3'] else: # Par défaut, traiter la première date disponible, canal ch0 dates_to_process = [index['dates'][0]] if index['dates'] else [] channels_to_process = ['ch0'] for date in dates_to_process: for channel in channels_to_process: output_file = OUTPUT_DIR / f"rms_{date}_{channel}.json" # Skip si déjà calculé if output_file.exists(): print(f"Skip {output_file.name} (déjà existant)") continue print(f"\n=== Traitement {date} - {channel} ===") results = precompute_for_date(index, date, channel) if results: # Sauvegarder output_data = { 'date': date, 'channel': channel, 'interval_sec': RMS_INTERVAL_SEC, 'window_sec': RMS_WINDOW_SEC, 'nodes': results, 'generated_at': datetime.now().isoformat() } with open(output_file, 'w') as f: json.dump(output_data, f) print(f"Sauvegardé: {output_file.name} ({len(results)} nodes)") else: print(f"Aucune donnée pour {date} - {channel}") print("\n=== Terminé ===") if __name__ == '__main__': main()