""" Script d'indexation des fichiers HDF5 sismiques. Parcourt les dossiers de données, extrait les métadonnées (node_id, date, canaux) et génère un index JSON utilisé par l'API backend. """ import os import re import json import csv from pathlib import Path from datetime import datetime from typing import Dict, List, Any # Pattern pour extraire les infos du nom de fichier # Exemple: auto_256_070617_b67_14_025708_data_rsn6027_seq1_ch0_1599057453.h5 # ou: auto_255_125334_b4_rsn13696_seq1_1599045513.h5 FILENAME_PATTERN = re.compile( r'auto_(\d+)_(\d{6})_b(\d+).*?_(\d{10})\.h5$', re.IGNORECASE ) # Dossiers racine contenant les données H5 DATA_ROOTS = [ Path(r"F:\2020-09-12"), Path(r"F:\2020-09-13"), Path(r"F:\2020-09-14"), Path(r"F:\2020-09-15"), Path(r"F:\2020-09-16"), Path(r"F:\2020-09-17"), Path(r"F:\2020-09-18"), Path(r"F:\2020-09-19"), Path(r"F:\2020-09-21"), Path(r"F:\2020-09-22"), Path(r"F:\2020-09-23"), ] # Fichier CSV des positions POSITIONS_CSV = Path(r"F:\Copie de SETE_AUV_DARFV4-Copier(1).csv") # Sortie OUTPUT_INDEX = Path(r"F:\seismic_webapp\data\index.json") def load_node_positions(csv_path: Path) -> Dict[str, Dict[str, Any]]: """ Charge les positions des nodes depuis le CSV. Retourne un dict: node_id -> {easting, northing, depth, ...} """ positions = {} with open(csv_path, 'r', encoding='utf-8', errors='replace') as f: # Sauter les premières lignes d'en-tête (lignes 1-4) lines = f.readlines() # La ligne 4 (index 3) contient les vrais en-têtes if len(lines) < 5: return positions header_line = lines[3] headers = header_line.strip().split(',') # Trouver les indices des colonnes importantes # Utiliser Aslaid (positions réelles mesurées) plutôt que Preplot (planifiées) try: node_code_idx = headers.index('NodeCode') # Priorité aux positions Aslaid (réelles), sinon Preplot (planifiées) if 'Aslaid Easting' in headers: easting_idx = headers.index('Aslaid Easting') northing_idx = headers.index('Aslaid Northing') depth_idx = headers.index('Aslaid Depth') if 'Aslaid Depth' in headers else None print("Utilisation des coordonnées Aslaid (positions réelles)") else: easting_idx = headers.index('Preplot Easting') northing_idx = headers.index('Preplot Northing') depth_idx = headers.index('Preplot Depth') if 'Preplot Depth' in headers else None print("Utilisation des coordonnées Preplot (positions planifiées)") except ValueError as e: print(f"Colonne manquante dans le CSV: {e}") # Fallback sur indices connus (Aslaid) node_code_idx = 3 easting_idx = 9 # Aslaid Easting northing_idx = 10 # Aslaid Northing depth_idx = 11 # Aslaid Depth # Parser les lignes de données (à partir de la ligne 5) for line in lines[4:]: parts = line.strip().split(',') if len(parts) <= max(node_code_idx, easting_idx, northing_idx): continue node_code = parts[node_code_idx].strip() if not node_code or node_code == '': continue try: easting = float(parts[easting_idx]) if parts[easting_idx] else None northing = float(parts[northing_idx]) if parts[northing_idx] else None depth = float(parts[depth_idx]) if depth_idx and parts[depth_idx] else 0.0 except (ValueError, IndexError): continue if easting and northing: positions[node_code] = { 'easting': easting, 'northing': northing, 'depth': depth, } print(f"Chargé {len(positions)} positions de nodes") return positions def scan_h5_files(data_roots: List[Path]) -> Dict[str, Any]: """ Parcourt les dossiers et indexe tous les fichiers H5. Retourne un dict structuré par node_id -> date -> fichiers """ index = {} file_count = 0 for root in data_roots: if not root.exists(): print(f"Dossier non trouvé: {root}") continue print(f"Scan de {root}...") for h5_file in root.rglob("*.h5"): match = FILENAME_PATTERN.search(h5_file.name) if not match: # Essayer un pattern plus simple simple_match = re.search(r'_b(\d+)_.*?(\d{10})\.h5$', h5_file.name, re.IGNORECASE) if simple_match: node_id = simple_match.group(1) timestamp = int(simple_match.group(2)) else: continue else: node_id = match.group(3) timestamp = int(match.group(4)) # Convertir timestamp en date dt = datetime.fromtimestamp(timestamp) date_str = dt.strftime('%Y-%m-%d') # Détecter les canaux disponibles dans le fichier # Pour l'instant on suppose ch0-ch3 par défaut channels = ['ch0', 'ch1', 'ch2', 'ch3'] # Structure: node_id -> date -> liste de fichiers if node_id not in index: index[node_id] = {} if date_str not in index[node_id]: index[node_id][date_str] = [] index[node_id][date_str].append({ 'path': str(h5_file), 'timestamp': timestamp, 'channels': channels, 'size_bytes': h5_file.stat().st_size if h5_file.exists() else 0 }) file_count += 1 print(f"Indexé {file_count} fichiers H5") return index def build_full_index(positions: Dict, files_index: Dict) -> Dict[str, Any]: """ Combine les positions et l'index des fichiers. """ full_index = { 'generated_at': datetime.now().isoformat(), 'sample_rate_hz': 200, 'nodes': {}, 'dates': set(), } # Fusionner les données all_node_ids = set(files_index.keys()) | set(positions.keys()) for node_id in all_node_ids: node_data = { 'id': node_id, 'position': positions.get(node_id, None), 'dates': {} } if node_id in files_index: node_data['dates'] = files_index[node_id] for date_str in files_index[node_id].keys(): full_index['dates'].add(date_str) full_index['nodes'][node_id] = node_data # Convertir le set en liste triée full_index['dates'] = sorted(list(full_index['dates'])) return full_index def main(): print("=== Indexation des fichiers HDF5 sismiques ===\n") # 1. Charger les positions print("1. Chargement des positions des nodes...") positions = load_node_positions(POSITIONS_CSV) # 2. Scanner les fichiers H5 print("\n2. Scan des fichiers H5...") files_index = scan_h5_files(DATA_ROOTS) # 3. Construire l'index complet print("\n3. Construction de l'index...") full_index = build_full_index(positions, files_index) # 4. Sauvegarder print(f"\n4. Sauvegarde vers {OUTPUT_INDEX}...") OUTPUT_INDEX.parent.mkdir(parents=True, exist_ok=True) with open(OUTPUT_INDEX, 'w', encoding='utf-8') as f: json.dump(full_index, f, indent=2, ensure_ascii=False) print(f"\nTerminé! Index généré avec {len(full_index['nodes'])} nodes et {len(full_index['dates'])} dates.") if __name__ == '__main__': main()