Source code for protocolo.analisis_escenas_nubosas

# ============================================================================
# SCRIPT DE ANÁLISIS DE ESCENAS NUBOSAS - ESTADÍSTICAS DETALLADAS
# ============================================================================

import os
import glob
from pymongo import MongoClient
import pandas as pd
import matplotlib.pyplot as plt

# Configuración de MongoDB
client = MongoClient()
database = client.Satelites
db = database.Landsat

# Configuración de rutas
path_base = '/mnt/datos_last/'
nor = os.path.join(path_base, 'nor')
pro = os.path.join(path_base, 'pro')

# ============================================================================
# FUNCIONES DE ANÁLISIS
# ============================================================================

[docs] def verificar_normalizacion_completa(escena_name, ruta_nor): """ Verifica el estado de normalización de una escena. Retorna un diccionario con detalles. """ ruta_completa = os.path.join(ruta_nor, escena_name) if not os.path.exists(ruta_completa): return { 'existe': False, 'bandas_norm': 0, 'total_tifs': 0, 'completa': False } # Buscar archivos normalizados archivos_norm = glob.glob(os.path.join(ruta_completa, '*_grn2_*.tif')) archivos_total = glob.glob(os.path.join(ruta_completa, '*.tif')) return { 'existe': True, 'bandas_norm': len(archivos_norm), 'total_tifs': len(archivos_total), 'completa': len(archivos_norm) >= 4 }
[docs] def verificar_productos_generados(escena_name, ruta_pro): """ Verifica qué productos han sido generados para una escena. """ ruta_escena = os.path.join(ruta_pro, escena_name, escena_name) if not os.path.exists(ruta_escena): return { 'existe': False, 'productos': [], 'total': 0 } productos = { 'rgb': os.path.exists(os.path.join(ruta_escena, f"{escena_name}_rgb.png")), 'flood': os.path.exists(os.path.join(ruta_escena, f"{escena_name}_flood.png")), 'superficie_inundada': os.path.exists(os.path.join(ruta_escena, f"{escena_name}_superficie_inundada.csv")), 'lagunas': os.path.exists(os.path.join(ruta_escena, f"{escena_name}_lagunas.csv")), 'lagunas_principales': os.path.exists(os.path.join(ruta_escena, f"{escena_name}_lagunas_principales.csv")), 'censo': os.path.exists(os.path.join(ruta_escena, f"{escena_name}_censo_aereo.csv")), } return { 'existe': True, 'productos': [k for k, v in productos.items() if v], 'total': sum(productos.values()) }
[docs] def analizar_escenas_nubosas(umbral_nubes=20): """ Genera un análisis completo de las escenas con nubes. """ print("\n" + "="*80) print(f"ANÁLISIS DETALLADO DE ESCENAS CON >{umbral_nubes}% NUBES") print("="*80 + "\n") # Consultar MongoDB query = {'Clouds.cloud_RBIOS': {'$gt': umbral_nubes}} projection = { '_id': 1, 'Clouds.cloud_RBIOS': 1, 'usgs_id': 1, 'Productos': 1 } escenas = list(db.find(query, projection).sort('_id', 1)) print(f"📊 Total de escenas en MongoDB: {len(escenas)}\n") # Analizar cada escena datos = [] for doc in escenas: escena_id = doc['_id'] cloud_value = doc.get('Clouds', {}).get('cloud_RBIOS', 0) anio = escena_id[:4] # Verificar normalización norm_info = verificar_normalizacion_completa(escena_id, nor) # Verificar productos prod_info = verificar_productos_generados(escena_id, pro) datos.append({ 'escena': escena_id, 'anio': anio, 'nubes': cloud_value, 'normalizada': norm_info['completa'], 'bandas_norm': norm_info['bandas_norm'], 'productos_existe': prod_info['existe'], 'num_productos': prod_info['total'], 'productos': ', '.join(prod_info['productos']) if prod_info['productos'] else 'Ninguno' }) # Crear DataFrame df = pd.DataFrame(datos) # ESTADÍSTICAS GENERALES print("\n" + "="*80) print("ESTADÍSTICAS GENERALES") print("="*80) print(f"Total escenas: {len(df)}") print(f"Escenas normalizadas: {df['normalizada'].sum()}") print(f"Escenas NO normalizadas: {(~df['normalizada']).sum()}") print(f"Escenas con productos: {df['productos_existe'].sum()}") print(f"Escenas sin productos: {(~df['productos_existe']).sum()}") print(f"Promedio % nubes: {df['nubes'].mean():.2f}%") print(f"Rango % nubes: {df['nubes'].min():.1f}% - {df['nubes'].max():.1f}%") # ESTADÍSTICAS POR AÑO print("\n" + "="*80) print("ESTADÍSTICAS POR AÑO") print("="*80) print(f"{'Año':<6} {'Total':>6} {'Norm':>6} {'NoNorm':>6} {'ConProd':>8} {'SinProd':>8} {'%Nubes':>8}") print("-"*80) for anio in sorted(df['anio'].unique()): df_anio = df[df['anio'] == anio] total = len(df_anio) norm = df_anio['normalizada'].sum() no_norm = total - norm con_prod = df_anio['productos_existe'].sum() sin_prod = total - con_prod prom_nubes = df_anio['nubes'].mean() print(f"{anio:<6} {total:>6} {norm:>6} {no_norm:>6} {con_prod:>8} {sin_prod:>8} {prom_nubes:>7.1f}%") # ESCENAS LISTAS PARA ENVIAR print("\n" + "="*80) print("ESCENAS LISTAS PARA ENVIAR (normalizadas + productos)") print("="*80) df_listas = df[(df['normalizada']) & (df['productos_existe'])] print(f"\nTotal: {len(df_listas)} escenas") if len(df_listas) > 0: print("\nPrimeras 10:") for idx, row in df_listas.head(10).iterrows(): print(f" {row['escena']} | {row['nubes']:.1f}% | Productos: {row['num_productos']}") if len(df_listas) > 10: print(f" ... y {len(df_listas) - 10} más") # ESCENAS QUE NECESITAN PRODUCTOS print("\n" + "="*80) print("ESCENAS NORMALIZADAS SIN PRODUCTOS (necesitan ejecutar productos.py)") print("="*80) df_sin_prod = df[(df['normalizada']) & (~df['productos_existe'])] print(f"\nTotal: {len(df_sin_prod)} escenas") if len(df_sin_prod) > 0: print("\nPrimeras 10:") for idx, row in df_sin_prod.head(10).iterrows(): print(f" {row['escena']} | {row['nubes']:.1f}% | {row['bandas_norm']} bandas norm") if len(df_sin_prod) > 10: print(f" ... y {len(df_sin_prod) - 10} más") # ESCENAS NO NORMALIZADAS print("\n" + "="*80) print("ESCENAS NO NORMALIZADAS (necesitan ejecutar protocolo)") print("="*80) df_no_norm = df[~df['normalizada']] print(f"\nTotal: {len(df_no_norm)} escenas") if len(df_no_norm) > 0: print("\nPrimeras 10:") for idx, row in df_no_norm.head(10).iterrows(): print(f" {row['escena']} | {row['nubes']:.1f}%") if len(df_no_norm) > 10: print(f" ... y {len(df_no_norm) - 10} más") # GUARDAR RESULTADOS EN CSV csv_output = os.path.join(path_base, 'analisis_escenas_nubosas.csv') df.to_csv(csv_output, index=False) print(f"\n✅ Análisis completo guardado en: {csv_output}") return df
[docs] def generar_reporte_por_anio(anio, umbral_nubes=20): """ Genera un reporte detallado para un año específico. """ print("\n" + "="*80) print(f"REPORTE DETALLADO - AÑO {anio}") print("="*80 + "\n") # Consultar MongoDB query = { 'Clouds.cloud_RBIOS': {'$gt': umbral_nubes}, '_id': {'$regex': f'^{anio}'} } projection = { '_id': 1, 'Clouds.cloud_RBIOS': 1, 'Clouds.cloud_PN': 1, 'usgs_id': 1, } escenas = list(db.find(query, projection).sort('_id', 1)) if not escenas: print(f"No se encontraron escenas para el año {anio}") return print(f"Total de escenas: {len(escenas)}\n") print(f"{'Escena':<20} {'%RBIOS':>8} {'%PN':>8} {'Norm':>6} {'Prod':>6} {'Estado':<20}") print("-"*80) for doc in escenas: escena_id = doc['_id'] cloud_rbios = doc.get('Clouds', {}).get('cloud_RBIOS', 0) cloud_pn = doc.get('Clouds', {}).get('cloud_PN', 0) norm_info = verificar_normalizacion_completa(escena_id, nor) prod_info = verificar_productos_generados(escena_id, pro) estado = [] if norm_info['completa']: estado.append("✓Norm") else: estado.append("✗Norm") if prod_info['existe']: estado.append("✓Prod") else: estado.append("✗Prod") norm_symbol = "✓" if norm_info['completa'] else "✗" prod_symbol = "✓" if prod_info['existe'] else "✗" estado_text = " | ".join(estado) print(f"{escena_id:<20} {cloud_rbios:>7.1f}% {cloud_pn:>7.1f}% {norm_symbol:>6} {prod_symbol:>6} {estado_text:<20}")
[docs] def exportar_lista_para_procesamiento(umbral_nubes=20, tipo='envio'): """ Exporta listas de escenas para diferentes propósitos. tipo: 'envio' - Escenas listas para enviar (normalizadas + productos) 'productos' - Escenas que necesitan generar productos 'normalizacion' - Escenas que necesitan normalizar """ query = {'Clouds.cloud_RBIOS': {'$gt': umbral_nubes}} escenas = list(db.find(query, {'_id': 1}).sort('_id', 1)) lista = [] for doc in escenas: escena_id = doc['_id'] norm_info = verificar_normalizacion_completa(escena_id, nor) prod_info = verificar_productos_generados(escena_id, pro) if tipo == 'envio': if norm_info['completa'] and prod_info['existe']: lista.append(escena_id) elif tipo == 'productos': if norm_info['completa'] and not prod_info['existe']: lista.append(escena_id) elif tipo == 'normalizacion': if not norm_info['completa']: lista.append(escena_id) output_file = os.path.join(path_base, f'lista_{tipo}.txt') with open(output_file, 'w') as f: f.write('\n'.join(lista)) print(f"\n✅ Lista de {len(lista)} escenas guardada en: {output_file}") return lista
# ============================================================================ # EJECUCIÓN # ============================================================================ if __name__ == "__main__": print("="*80) print("ANÁLISIS DE ESCENAS LANDSAT CON ALTA NUBOSIDAD") print("="*80) # Análisis completo df = analizar_escenas_nubosas(umbral_nubes=20) # Exportar listas print("\n" + "="*80) print("GENERANDO LISTAS DE PROCESAMIENTO") print("="*80) exportar_lista_para_procesamiento(umbral_nubes=20, tipo='envio') exportar_lista_para_procesamiento(umbral_nubes=20, tipo='productos') exportar_lista_para_procesamiento(umbral_nubes=20, tipo='normalizacion') # Ejemplo de reporte por año # generar_reporte_por_anio(1985, umbral_nubes=20) print("\n" + "="*80) print("ANÁLISIS COMPLETADO") print("="*80 + "\n")