Files
seisee/scripts/index_h5_files.py

232 lines
8.0 KiB
Python
Executable File

"""
Script d'indexation des fichiers HDF5 sismiques.
Parcourt les dossiers de données, extrait les métadonnées (node_id, date, canaux)
et génère un index JSON utilisé par l'API backend.
"""
import os
import re
import json
import csv
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Any
# Pattern pour extraire les infos du nom de fichier
# Exemple: auto_256_070617_b67_14_025708_data_rsn6027_seq1_ch0_1599057453.h5
# ou: auto_255_125334_b4_rsn13696_seq1_1599045513.h5
FILENAME_PATTERN = re.compile(
r'auto_(\d+)_(\d{6})_b(\d+).*?_(\d{10})\.h5$',
re.IGNORECASE
)
# Dossiers racine contenant les données H5
DATA_ROOTS = [
Path(r"F:\2020-09-12"),
Path(r"F:\2020-09-13"),
Path(r"F:\2020-09-14"),
Path(r"F:\2020-09-15"),
Path(r"F:\2020-09-16"),
Path(r"F:\2020-09-17"),
Path(r"F:\2020-09-18"),
Path(r"F:\2020-09-19"),
Path(r"F:\2020-09-21"),
Path(r"F:\2020-09-22"),
Path(r"F:\2020-09-23"),
]
# Fichier CSV des positions
POSITIONS_CSV = Path(r"F:\Copie de SETE_AUV_DARFV4-Copier(1).csv")
# Sortie
OUTPUT_INDEX = Path(r"F:\seismic_webapp\data\index.json")
def load_node_positions(csv_path: Path) -> Dict[str, Dict[str, Any]]:
"""
Charge les positions des nodes depuis le CSV.
Retourne un dict: node_id -> {easting, northing, depth, ...}
"""
positions = {}
with open(csv_path, 'r', encoding='utf-8', errors='replace') as f:
# Sauter les premières lignes d'en-tête (lignes 1-4)
lines = f.readlines()
# La ligne 4 (index 3) contient les vrais en-têtes
if len(lines) < 5:
return positions
header_line = lines[3]
headers = header_line.strip().split(',')
# Trouver les indices des colonnes importantes
# Utiliser Aslaid (positions réelles mesurées) plutôt que Preplot (planifiées)
try:
node_code_idx = headers.index('NodeCode')
# Priorité aux positions Aslaid (réelles), sinon Preplot (planifiées)
if 'Aslaid Easting' in headers:
easting_idx = headers.index('Aslaid Easting')
northing_idx = headers.index('Aslaid Northing')
depth_idx = headers.index('Aslaid Depth') if 'Aslaid Depth' in headers else None
print("Utilisation des coordonnées Aslaid (positions réelles)")
else:
easting_idx = headers.index('Preplot Easting')
northing_idx = headers.index('Preplot Northing')
depth_idx = headers.index('Preplot Depth') if 'Preplot Depth' in headers else None
print("Utilisation des coordonnées Preplot (positions planifiées)")
except ValueError as e:
print(f"Colonne manquante dans le CSV: {e}")
# Fallback sur indices connus (Aslaid)
node_code_idx = 3
easting_idx = 9 # Aslaid Easting
northing_idx = 10 # Aslaid Northing
depth_idx = 11 # Aslaid Depth
# Parser les lignes de données (à partir de la ligne 5)
for line in lines[4:]:
parts = line.strip().split(',')
if len(parts) <= max(node_code_idx, easting_idx, northing_idx):
continue
node_code = parts[node_code_idx].strip()
if not node_code or node_code == '':
continue
try:
easting = float(parts[easting_idx]) if parts[easting_idx] else None
northing = float(parts[northing_idx]) if parts[northing_idx] else None
depth = float(parts[depth_idx]) if depth_idx and parts[depth_idx] else 0.0
except (ValueError, IndexError):
continue
if easting and northing:
positions[node_code] = {
'easting': easting,
'northing': northing,
'depth': depth,
}
print(f"Chargé {len(positions)} positions de nodes")
return positions
def scan_h5_files(data_roots: List[Path]) -> Dict[str, Any]:
"""
Parcourt les dossiers et indexe tous les fichiers H5.
Retourne un dict structuré par node_id -> date -> fichiers
"""
index = {}
file_count = 0
for root in data_roots:
if not root.exists():
print(f"Dossier non trouvé: {root}")
continue
print(f"Scan de {root}...")
for h5_file in root.rglob("*.h5"):
match = FILENAME_PATTERN.search(h5_file.name)
if not match:
# Essayer un pattern plus simple
simple_match = re.search(r'_b(\d+)_.*?(\d{10})\.h5$', h5_file.name, re.IGNORECASE)
if simple_match:
node_id = simple_match.group(1)
timestamp = int(simple_match.group(2))
else:
continue
else:
node_id = match.group(3)
timestamp = int(match.group(4))
# Convertir timestamp en date
dt = datetime.fromtimestamp(timestamp)
date_str = dt.strftime('%Y-%m-%d')
# Détecter les canaux disponibles dans le fichier
# Pour l'instant on suppose ch0-ch3 par défaut
channels = ['ch0', 'ch1', 'ch2', 'ch3']
# Structure: node_id -> date -> liste de fichiers
if node_id not in index:
index[node_id] = {}
if date_str not in index[node_id]:
index[node_id][date_str] = []
index[node_id][date_str].append({
'path': str(h5_file),
'timestamp': timestamp,
'channels': channels,
'size_bytes': h5_file.stat().st_size if h5_file.exists() else 0
})
file_count += 1
print(f"Indexé {file_count} fichiers H5")
return index
def build_full_index(positions: Dict, files_index: Dict) -> Dict[str, Any]:
"""
Combine les positions et l'index des fichiers.
"""
full_index = {
'generated_at': datetime.now().isoformat(),
'sample_rate_hz': 200,
'nodes': {},
'dates': set(),
}
# Fusionner les données
all_node_ids = set(files_index.keys()) | set(positions.keys())
for node_id in all_node_ids:
node_data = {
'id': node_id,
'position': positions.get(node_id, None),
'dates': {}
}
if node_id in files_index:
node_data['dates'] = files_index[node_id]
for date_str in files_index[node_id].keys():
full_index['dates'].add(date_str)
full_index['nodes'][node_id] = node_data
# Convertir le set en liste triée
full_index['dates'] = sorted(list(full_index['dates']))
return full_index
def main():
print("=== Indexation des fichiers HDF5 sismiques ===\n")
# 1. Charger les positions
print("1. Chargement des positions des nodes...")
positions = load_node_positions(POSITIONS_CSV)
# 2. Scanner les fichiers H5
print("\n2. Scan des fichiers H5...")
files_index = scan_h5_files(DATA_ROOTS)
# 3. Construire l'index complet
print("\n3. Construction de l'index...")
full_index = build_full_index(positions, files_index)
# 4. Sauvegarder
print(f"\n4. Sauvegarde vers {OUTPUT_INDEX}...")
OUTPUT_INDEX.parent.mkdir(parents=True, exist_ok=True)
with open(OUTPUT_INDEX, 'w', encoding='utf-8') as f:
json.dump(full_index, f, indent=2, ensure_ascii=False)
print(f"\nTerminé! Index généré avec {len(full_index['nodes'])} nodes et {len(full_index['dates'])} dates.")
if __name__ == '__main__':
main()