#!/usr/bin/env python3
"""Generate an HTML inventory of every HDF5 file found in the data directories.

For each file the report shows the bumper number, channel, start date/time,
end date/time, duration and number of samples.  The same inventory is also
saved as JSON for reference.
"""

import json
import os
import re
import sys
from collections import defaultdict
from datetime import datetime
from html import escape
from pathlib import Path

try:
    import h5py
except ImportError:
    # The filename/formatting helpers do not need HDF5 support, so the module
    # stays importable without it; main() refuses to run in that case.
    h5py = None

# Configuration
SAMPLE_RATE = 200  # Hz
DATA_DIRS = [
    r"F:\2020-09-11",
    r"E:\2020-09-11",
    r"E:\2020-09-14",
]
INDEX_PATH = Path(r"F:\seismic_webapp\data\index.json")
HTML_OUTPUT_PATH = Path(r"F:\seismic_webapp\inventory.html")
JSON_OUTPUT_PATH = Path(r"F:\seismic_webapp\inventory.json")

# Channels that always get a card in the per-channel summary.
SUMMARY_CHANNELS = ('ch0', 'ch1', 'ch2', 'ch3')

# Filename patterns, compiled once because they run for every scanned file.
_BUMPER_B0_RE = re.compile(r'_b0_(\d+)_')
_BUMPER_RE = re.compile(r'_b(\d+)_')
_CHANNEL_RE = re.compile(r'_(ch\d+)_')
_EPOCH_RE = re.compile(r'_(\d{10})\.h5$')

# NOTE(review): the tag/class names and this stylesheet were rebuilt around the
# text labels of the report; confirm them against the page that is deployed.
_PAGE_STYLE = """
body { font-family: sans-serif; margin: 2em; }
.stats, .channels { display: flex; flex-wrap: wrap; gap: 1em; }
.stat, .channel { border: 1px solid #ccc; border-radius: 6px; padding: 0.8em 1.2em; }
.stat .value { font-size: 1.6em; font-weight: bold; }
table { border-collapse: collapse; width: 100%; }
th, td { border: 1px solid #ccc; padding: 4px 8px; text-align: left; }
.error { color: #b00020; font-size: 0.9em; }
"""


def parse_filename(filename):
    """Extract bumper id, channel, epoch time and file type from an HDF5 file name.

    Supported formats:
      - auto_260_061316_b0_13_212626_data_rsn84614_seq1_ch0_1598976585.h5 (bumper = 13)
      - auto_255_061140_b119_12_230609_data_rsn5725_seq1_ch0_1599065292.h5 (bumper = 119)

    Returns:
        dict with keys ``bumper_id`` (str or None), ``channel`` (e.g. ``'ch0'``
        or None), ``epoch_time`` (int or None) and ``file_type``
        (``'data'``, ``'aux'`` or ``'unknown'``).
    """
    # Format 1 (_b0_XX_) must be tried first: the generic _bXXX_ pattern
    # would otherwise capture the literal "0".
    bumper_match = _BUMPER_B0_RE.search(filename) or _BUMPER_RE.search(filename)
    bumper_id = bumper_match.group(1) if bumper_match else None

    channel_match = _CHANNEL_RE.search(filename)
    channel = channel_match.group(1) if channel_match else None

    # The epoch time is the 10-digit number right before the ".h5" suffix.
    epoch_match = _EPOCH_RE.search(filename)
    epoch_time = int(epoch_match.group(1)) if epoch_match else None

    if '_data_' in filename:
        file_type = 'data'
    elif '_aux_' in filename:
        file_type = 'aux'
    else:
        file_type = 'unknown'

    return {
        'bumper_id': bumper_id,
        'channel': channel,
        'epoch_time': epoch_time,
        'file_type': file_type,
    }


def get_hdf5_info(filepath):
    """Open an HDF5 file and report the length of its ``adc_values`` dataset.

    Returns:
        dict with ``samples`` (first dimension of ``adc_values``, 0 on failure)
        and ``error`` (None on success, otherwise a message).  Never raises:
        unreadable files are reported through ``error`` so that one bad file
        does not abort the whole inventory.
    """
    if h5py is None:
        return {'samples': 0, 'error': 'h5py is not installed'}
    try:
        with h5py.File(filepath, 'r') as f:
            if 'adc_values' in f:
                return {'samples': f['adc_values'].shape[0], 'error': None}
            # Report what the file does contain to help diagnose it.
            datasets = list(f.keys())
            return {'samples': 0, 'error': f'No adc_values, found: {datasets}'}
    except Exception as e:  # deliberate best-effort: record and keep scanning
        return {'samples': 0, 'error': str(e)}


def format_datetime(epoch_time):
    """Format a Unix timestamp as ``YYYY-MM-DD HH:MM:SS`` in local time.

    Returns ``"N/A"`` when *epoch_time* is None or 0.
    """
    if not epoch_time:
        return "N/A"
    return datetime.fromtimestamp(epoch_time).strftime('%Y-%m-%d %H:%M:%S')


def format_duration(seconds):
    """Format a duration in seconds as ``Xh Ym Zs``, omitting leading zero units."""
    hours = int(seconds // 3600)
    minutes = int((seconds % 3600) // 60)
    secs = int(seconds % 60)
    if hours > 0:
        return f"{hours}h {minutes}m {secs}s"
    if minutes > 0:
        return f"{minutes}m {secs}s"
    return f"{secs}s"


def scan_directory(data_dir):
    """Return the ``*.h5`` files found in ``<data_dir>/data`` (not recursive).

    The result is sorted so that repeated runs produce the same report order;
    a missing directory is reported on stdout and yields an empty list.
    """
    data_path = Path(data_dir) / 'data'
    if not data_path.exists():
        print(f" Directory not found: {data_path}")
        return []
    return sorted(data_path.glob('*.h5'))


def _bumper_sort_key(bumper):
    """Sort numeric bumper ids by value and put everything else after them."""
    if bumper.isdigit():
        return (0, int(bumper), bumper)
    return (1, 0, bumper)


def _render_row(bumper, channel, item):
    """Render one inventory entry as a table row, escaping all text fields."""
    duration_sec = item['samples'] / SAMPLE_RATE
    # The file name only carries the start time; the end is derived from the
    # sample count at the nominal sample rate.
    end_time = (item['epoch_time'] + duration_sec) if item['epoch_time'] else None
    error_html = (
        f'<div class="error">{escape(str(item["error"]))}</div>'
        if item['error'] else ''
    )
    return (
        "<tr>"
        f"<td>b{escape(bumper)}</td>"
        f"<td>{escape(channel.upper())}</td>"
        f"<td>{escape(item['file_type'].upper())}</td>"
        f"<td>{item['epoch_time'] or 'N/A'}</td>"
        f"<td>{format_datetime(item['epoch_time'])}</td>"
        f"<td>{format_datetime(end_time)}</td>"
        f"<td>{format_duration(duration_sec)}</td>"
        f"<td>{item['samples']:,}</td>"
        f"<td>{escape(item['filename'])}{error_html}</td>"
        "</tr>\n"
    )


def generate_html(inventory, output_path):
    """Write the HTML report for *inventory* to *output_path* (UTF-8).

    Args:
        inventory: list of dicts with at least the keys ``filename``,
            ``bumper_id``, ``channel``, ``epoch_time``, ``file_type``,
            ``samples`` and ``error``.
        output_path: destination file; overwritten if it exists.

    The page contains global totals, a summary card for each channel in
    ``SUMMARY_CHANNELS`` and one table row per file, grouped by bumper, then
    channel, then start time.
    """
    # Group by bumper then channel, and accumulate per-channel totals.
    by_bumper = defaultdict(lambda: defaultdict(list))
    channel_stats = defaultdict(lambda: {'files': 0, 'samples': 0, 'bumpers': set()})
    for item in inventory:
        bumper = item['bumper_id'] or 'unknown'
        channel = item['channel'] or 'unknown'
        by_bumper[bumper][channel].append(item)
        stats = channel_stats[channel]
        stats['files'] += 1
        stats['samples'] += item['samples']
        if item['bumper_id']:
            stats['bumpers'].add(item['bumper_id'])

    sorted_bumpers = sorted(by_bumper, key=_bumper_sort_key)

    # Global statistics
    total_files = len(inventory)
    total_samples = sum(item['samples'] for item in inventory)
    total_duration = total_samples / SAMPLE_RATE
    total_errors = sum(1 for item in inventory if item['error'])

    # Collect fragments and join once instead of growing a string with +=.
    parts = [f"""<!DOCTYPE html>
<html lang="fr">
<head>
<meta charset="utf-8">
<title>Inventaire Fichiers HDF5 Sismiques</title>
<style>{_PAGE_STYLE}</style>
</head>
<body>
<h1>📊 Inventaire Fichiers HDF5 Sismiques</h1>
<p class="generated">Généré le {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>
<div class="stats">
<div class="stat"><div class="value">{total_files}</div><div class="label">Fichiers HDF5</div></div>
<div class="stat"><div class="value">{len(sorted_bumpers)}</div><div class="label">Bumpers (nodes)</div></div>
<div class="stat"><div class="value">{total_samples:,}</div><div class="label">Échantillons total</div></div>
<div class="stat"><div class="value">{format_duration(total_duration)}</div><div class="label">Durée totale @ {SAMPLE_RATE}Hz</div></div>
<div class="stat"><div class="value">{total_errors}</div><div class="label">Erreurs lecture</div></div>
</div>
<h2>📡 Résumé par Canal</h2>
<div class="channels">
"""]

    for ch in SUMMARY_CHANNELS:
        # .get() so that a channel with no files shows zeros without being
        # inserted into the defaultdict.
        stats = channel_stats.get(ch, {'files': 0, 'samples': 0, 'bumpers': set()})
        duration = stats['samples'] / SAMPLE_RATE
        parts.append(f"""<div class="channel">
<h3>{ch.upper()}</h3>
<div>{stats['files']} fichiers</div>
<div>{len(stats['bumpers'])} bumpers</div>
<div>{stats['samples']:,} samples</div>
<div>{format_duration(duration)}</div>
</div>
""")

    parts.append("""</div>
<h2>📋 Détail par Bumper</h2>
<table>
<thead>
<tr><th>Bumper</th><th>Canal</th><th>Type</th><th>Début (epoch)</th><th>Début (date/heure)</th><th>Fin (date/heure)</th><th>Durée</th><th>Samples</th><th>Fichier</th></tr>
</thead>
<tbody>
""")

    for bumper in sorted_bumpers:
        channels = by_bumper[bumper]
        for channel in sorted(channels):
            # Files without a parsed epoch sort first within their channel.
            items = sorted(channels[channel], key=lambda x: x['epoch_time'] or 0)
            parts.extend(_render_row(bumper, channel, item) for item in items)

    parts.append("""</tbody>
</table>
</body>
</html>
""")

    with open(output_path, 'w', encoding='utf-8') as f:
        f.write("".join(parts))

    print(f"\nHTML genere: {output_path}")


def _dirs_from_index(index_path):
    """Return the existing date directories referenced by the web-app index.

    The index is expected to look like
    ``{"nodes": {<node>: {"dates": {<date>: [{"path": ...}, ...]}}}}``; each
    file path is assumed to be ``<date dir>/data/<file>``, so the directory to
    scan is two levels above the file.  Entries without a usable path are
    skipped (an empty path would otherwise resolve to the current directory).
    """
    dirs = set()
    if not index_path.exists():
        return dirs

    with open(index_path, 'r', encoding='utf-8') as f:
        index = json.load(f)

    for node_data in index.get('nodes', {}).values():
        for files_list in node_data.get('dates', {}).values():
            if not isinstance(files_list, list):
                continue
            for file_info in files_list:
                raw_path = file_info.get('path') if isinstance(file_info, dict) else None
                if not raw_path:
                    continue
                date_dir = Path(raw_path).parent.parent
                if date_dir.exists():
                    dirs.add(str(date_dir))
    return dirs


def main():
    """Scan every known data directory and write the HTML and JSON inventories."""
    if h5py is None:
        sys.exit("h5py is required to read the HDF5 files")

    print("=" * 60)
    print("INVENTAIRE DES FICHIERS HDF5 SISMIQUES")
    print("=" * 60)

    # Directories known to the existing index, plus the configured defaults.
    all_dirs = _dirs_from_index(INDEX_PATH)
    all_dirs.update(d for d in DATA_DIRS if Path(d).exists())

    print(f"\nRépertoires à scanner: {len(all_dirs)}")
    for d in sorted(all_dirs):
        print(f" - {d}")

    # Scan all files
    inventory = []
    for data_dir in sorted(all_dirs):
        print(f"\nScanning {data_dir}...")
        files = scan_directory(data_dir)
        print(f" Found {len(files)} HDF5 files")

        for i, filepath in enumerate(files):
            if i % 50 == 0:
                print(f" Processing {i}/{len(files)}...")

            parsed = parse_filename(filepath.name)
            hdf5_info = get_hdf5_info(filepath)

            inventory.append({
                'filepath': str(filepath),
                'filename': filepath.name,
                'directory': data_dir,
                'bumper_id': parsed['bumper_id'],
                'channel': parsed['channel'],
                'epoch_time': parsed['epoch_time'],
                'file_type': parsed['file_type'],
                'samples': hdf5_info['samples'],
                'error': hdf5_info['error'],
            })

    print(f"\nTotal: {len(inventory)} fichiers")

    generate_html(inventory, HTML_OUTPUT_PATH)

    # Also save as JSON for reference
    with open(JSON_OUTPUT_PATH, 'w', encoding='utf-8') as f:
        json.dump(inventory, f, indent=2, ensure_ascii=False)
    print(f"JSON genere: {JSON_OUTPUT_PATH}")


if __name__ == '__main__':
    main()