232 lines
8.0 KiB
Python
Executable File
232 lines
8.0 KiB
Python
Executable File
"""
Indexing script for seismic HDF5 files.

Walks the data directories, extracts metadata (node_id, date, channels)
and generates a JSON index used by the backend API.
"""
|
|
|
|
import os
|
|
import re
|
|
import json
|
|
import csv
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
from typing import Dict, List, Any
|
|
|
|
# Pattern used to extract metadata from an HDF5 file name.
# Examples of names it is meant to match:
#   auto_256_070617_b67_14_025708_data_rsn6027_seq1_ch0_1599057453.h5
#   auto_255_125334_b4_rsn13696_seq1_1599045513.h5
# Capture groups: (1) digits after "auto_", (2) a six-digit field,
# (3) digits after "_b", (4) the ten digits right before ".h5".
FILENAME_PATTERN = re.compile(r'auto_(\d+)_(\d{6})_b(\d+).*?_(\d{10})\.h5$', flags=re.IGNORECASE)
|
|
|
|
# Root folders containing the H5 data: one folder per day of
# September 2020 (note that there is no folder for the 20th).
DATA_ROOTS = [
    Path(rf"F:\2020-09-{day}")
    for day in (12, 13, 14, 15, 16, 17, 18, 19, 21, 22, 23)
]
|
|
|
|
# CSV file holding the node positions (read by load_node_positions)
POSITIONS_CSV = Path(r"F:\Copie de SETE_AUV_DARFV4-Copier(1).csv")
|
|
|
|
# Output: path of the generated JSON index (written by main)
OUTPUT_INDEX = Path(r"F:\seismic_webapp\data\index.json")
|
|
|
|
|
|
def _split_csv_line(line: str) -> List[str]:
    """Split one CSV line into cells, honouring double-quoted fields."""
    text = line.strip()
    try:
        return next(csv.reader([text]), [])
    except csv.Error:
        # Malformed quoting: degrade to the naive comma split.
        return text.split(',')


def _parse_optional_float(cell: str):
    """Return the cell as a float, or None when the cell is blank."""
    text = cell.strip()
    return float(text) if text else None


def _find_position_columns(headers: List[str]):
    """Return (node_code, easting, northing, depth) column indices.

    The depth index is None when the selected coordinate set has no depth
    column. Raises ValueError when a required column name is missing.
    """
    node_code_idx = headers.index('NodeCode')
    # Prefer Aslaid (actually measured) positions over Preplot (planned) ones.
    use_aslaid = 'Aslaid Easting' in headers
    prefix = 'Aslaid' if use_aslaid else 'Preplot'
    easting_idx = headers.index(f'{prefix} Easting')
    northing_idx = headers.index(f'{prefix} Northing')
    depth_name = f'{prefix} Depth'
    depth_idx = headers.index(depth_name) if depth_name in headers else None
    if use_aslaid:
        print("Utilisation des coordonnées Aslaid (positions réelles)")
    else:
        print("Utilisation des coordonnées Preplot (positions planifiées)")
    return node_code_idx, easting_idx, northing_idx, depth_idx


def load_node_positions(csv_path: Path) -> Dict[str, Dict[str, Any]]:
    """
    Load the node positions from the CSV file.

    The file is expected to carry its column names on the 4th line and its
    data from the 5th line on. Cells are split with the ``csv`` module so a
    quoted field containing a comma does not shift the following columns.

    Args:
        csv_path: Path of the positions CSV.

    Returns:
        A dict mapping node code -> {'easting', 'northing', 'depth'}.
        Empty when the file has fewer than 5 lines. Rows with an empty node
        code, unparsable numbers, or missing/zero coordinates are skipped.
        A blank or absent depth cell yields a depth of 0.0.

    Raises:
        OSError: if the file cannot be opened.
    """
    positions: Dict[str, Dict[str, Any]] = {}

    with open(csv_path, 'r', encoding='utf-8', errors='replace') as f:
        lines = f.readlines()

    # Lines 1-3 are preamble, line 4 holds the real headers.
    if len(lines) < 5:
        return positions

    headers = [cell.strip() for cell in _split_csv_line(lines[3])]

    try:
        node_code_idx, easting_idx, northing_idx, depth_idx = (
            _find_position_columns(headers)
        )
    except ValueError as e:
        print(f"Colonne manquante dans le CSV: {e}")
        # Fall back on the known column layout (Aslaid coordinates).
        node_code_idx, easting_idx, northing_idx, depth_idx = 3, 9, 10, 11

    required_width = max(node_code_idx, easting_idx, northing_idx) + 1

    # Data rows start on line 5.
    for line in lines[4:]:
        parts = _split_csv_line(line)
        if len(parts) < required_width:
            continue

        node_code = parts[node_code_idx].strip()
        if not node_code:
            continue

        try:
            easting = _parse_optional_float(parts[easting_idx])
            northing = _parse_optional_float(parts[northing_idx])
            depth = None
            # `is not None` so that a depth column at index 0 is still read;
            # a row too short to hold the depth cell simply gets the default.
            if depth_idx is not None and depth_idx < len(parts):
                depth = _parse_optional_float(parts[depth_idx])
        except ValueError:
            continue

        # Missing or zero coordinates are treated as "no position".
        if not easting or not northing:
            continue

        positions[node_code] = {
            'easting': easting,
            'northing': northing,
            'depth': 0.0 if depth is None else depth,
        }

    print(f"Chargé {len(positions)} positions de nodes")
    return positions
|
|
|
|
|
|
def scan_h5_files(data_roots: List[Path]) -> Dict[str, Any]:
    """
    Walk the given folders and index every H5 file found.

    A file is indexed when its name matches FILENAME_PATTERN or, failing
    that, a looser ``_b<node>_..._<10-digit timestamp>.h5`` pattern. Other
    files are ignored, and root folders that do not exist are reported and
    skipped.

    Args:
        data_roots: Root folders to scan recursively.

    Returns:
        A dict structured as node_id -> date ('YYYY-MM-DD') -> list of file
        entries, each with 'path', 'timestamp', 'channels' and 'size_bytes'.
        Each list is sorted by timestamp, then path, so the result does not
        depend on filesystem enumeration order.
    """
    # Looser fallback pattern, compiled once rather than per file.
    fallback_pattern = re.compile(r'_b(\d+)_.*?(\d{10})\.h5$', re.IGNORECASE)

    index: Dict[str, Any] = {}
    file_count = 0

    for root in data_roots:
        if not root.exists():
            print(f"Dossier non trouvé: {root}")
            continue

        print(f"Scan de {root}...")

        for h5_file in root.rglob("*.h5"):
            match = FILENAME_PATTERN.search(h5_file.name)
            if match:
                node_id, raw_timestamp = match.group(3), match.group(4)
            else:
                match = fallback_pattern.search(h5_file.name)
                if not match:
                    continue
                node_id, raw_timestamp = match.group(1), match.group(2)

            timestamp = int(raw_timestamp)

            # NOTE(review): fromtimestamp() uses the machine's local time
            # zone, so the date bucket depends on where the script runs —
            # confirm whether UTC is intended.
            date_str = datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d')

            # Single stat() call; an unreadable or vanished file is still
            # indexed, with a size of 0, instead of aborting the scan.
            try:
                size_bytes = h5_file.stat().st_size
            except OSError:
                size_bytes = 0

            index.setdefault(node_id, {}).setdefault(date_str, []).append({
                'path': str(h5_file),
                'timestamp': timestamp,
                # Channels are not detected yet: ch0-ch3 assumed by default.
                'channels': ['ch0', 'ch1', 'ch2', 'ch3'],
                'size_bytes': size_bytes,
            })
            file_count += 1

    # Make the index reproducible regardless of directory listing order.
    for dates in index.values():
        for entries in dates.values():
            entries.sort(key=lambda entry: (entry['timestamp'], entry['path']))

    print(f"Indexé {file_count} fichiers H5")
    return index
|
|
|
|
|
|
def build_full_index(positions: Dict, files_index: Dict) -> Dict[str, Any]:
    """
    Combine the node positions and the file index into one structure.

    Args:
        positions: node_id -> position dict (as built by load_node_positions).
        files_index: node_id -> date -> list of file entries.

    Returns:
        A dict with:
          - 'generated_at': ISO timestamp of the call,
          - 'sample_rate_hz': the constant 200,
          - 'nodes': node_id -> {'id', 'position' (or None), 'dates'} for
            every node present in either input, inserted in a stable order
            (numeric ids first, in numeric order, then the others by text),
          - 'dates': sorted list of every date that has at least one entry
            in files_index.
    """
    def node_sort_key(node_id):
        text = str(node_id)
        if text.isdecimal():
            return (0, int(text), text)
        return (1, 0, text)

    nodes: Dict[str, Any] = {}
    seen_dates = set()

    # Iterating the raw set union would give a different key order on each
    # run (string hashing is randomized), so sort it first.
    all_node_ids = set(files_index) | set(positions)
    for node_id in sorted(all_node_ids, key=node_sort_key):
        node_dates = files_index.get(node_id, {})
        seen_dates.update(node_dates)
        nodes[node_id] = {
            'id': node_id,
            'position': positions.get(node_id),
            'dates': node_dates,
        }

    return {
        'generated_at': datetime.now().isoformat(),
        'sample_rate_hz': 200,
        'nodes': nodes,
        'dates': sorted(seen_dates),
    }
|
|
|
|
|
|
def main():
    """Run the four indexing steps: load positions, scan files, merge, save."""
    print("=== Indexation des fichiers HDF5 sismiques ===\n")

    # Step 1: node positions from the CSV
    print("1. Chargement des positions des nodes...")
    node_positions = load_node_positions(POSITIONS_CSV)

    # Step 2: H5 files found under the data roots
    print("\n2. Scan des fichiers H5...")
    scanned_files = scan_h5_files(DATA_ROOTS)

    # Step 3: merge both into the final structure
    print("\n3. Construction de l'index...")
    combined = build_full_index(node_positions, scanned_files)

    # Step 4: write the JSON file, creating its folder when needed
    print(f"\n4. Sauvegarde vers {OUTPUT_INDEX}...")
    output_dir = OUTPUT_INDEX.parent
    output_dir.mkdir(parents=True, exist_ok=True)
    with OUTPUT_INDEX.open('w', encoding='utf-8') as out:
        json.dump(combined, out, indent=2, ensure_ascii=False)

    node_total = len(combined['nodes'])
    date_total = len(combined['dates'])
    print(f"\nTerminé! Index généré avec {node_total} nodes et {date_total} dates.")
|
|
|
|
|
|
# Script entry point: run the indexing only when executed directly.
if __name__ == '__main__':
    main()
|