Source code for src.utils

# **************************************************************************** #
#                                                                              #
#                                                         :::      ::::::::    #
#    utils.py                                           :+:      :+:    :+:    #
#                                                     +:+ +:+         +:+      #
#    By: daniloceano <danilo.oceano@gmail.com>      +#+  +:+       +#+         #
#                                                 +#+#+#+#+#+   +#+            #
#    Created: 2024/02/16 18:31:30 by daniloceano       #+#    #+#              #
#    Updated: 2025/08/15 09:30:24 by daniloceano      ###   ########.fr        #
#                                                                              #
# **************************************************************************** #

import os
import sys
import logging
import numpy as np
import pandas as pd
from metpy.calc import vorticity
from scipy.signal import savgol_filter
from src.select_domain import initial_domain

[docs] def initialize_logging(results_subdirectory, args): """ Initializes the logging configuration for the application. Args: results_subdirectory (str): Directory path to save the log file. args (object): The argparse object containing the command line arguments. """ verbose = True if args.verbose else False # Set root logger to higher severity level (INFO or ERROR) root_log_level = logging.ERROR if not verbose else logging.INFO logging.basicConfig(level=root_log_level, format='%(asctime)s - %(levelname)s - %(message)s') # Create a separate logger for the application app_logger = logging.getLogger('atmos_bud') app_log_level = logging.DEBUG if verbose else logging.INFO app_logger.setLevel(app_log_level) app_logger.propagate = False # Prevent the logger from propagating messages to the root logger # Create file handler for saving logs log_file_name = f'log.{os.path.basename(args.infile).split(".")[0]}' log_file = os.path.join(results_subdirectory, log_file_name) file_handler = logging.FileHandler(log_file, mode='w', encoding='utf-8') # Ensure file uses UTF-8 file_handler.setLevel(app_log_level) file_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') file_handler.setFormatter(file_formatter) app_logger.addHandler(file_handler) # Create a console handler for app logger with UTF-8 encoding console_handler = logging.StreamHandler(stream=sys.stdout) # Use sys.stdout explicitly console_handler.setLevel(app_log_level) console_handler.setFormatter(file_formatter) # Set UTF-8 encoding for console output if sys.platform.startswith('win'): # On Windows, wrap the stream to ensure UTF-8 encoding console_handler.stream = open(sys.stdout.fileno(), mode='w', encoding='utf-8', errors='replace') app_logger.addHandler(console_handler) return app_logger
[docs] def convert_lon(input_data, longitude_indexer): """ Convert longitudes from 0:360 range to -180:180 Parameters ---------- xr : xarray.DataArray gridded data. longitude_indexer : str corrdinate indexer used for longitude. Returns ------- xr : xarray.DataArray gridded data with longitude converted to desired format. """ input_data.coords[longitude_indexer] = (input_data.coords[longitude_indexer] + 180) % 360 - 180 input_data = input_data.sortby(input_data[longitude_indexer]) return input_data
[docs] def handle_track_file(input_data, times, longitude_indexer, latitude_indexer, app_logger): """ Handles the track file by validating its time and spatial limits against the provided dataset. Args: data (xr.Dataset): A Xarray Dataset containing the data to compute the energy cycle. times (pd.DatetimeIndex): The time series of the dataset. LonIndexer (str): The name of the longitude coordinate in the dataset. LatIndexer (str): The name of the latitude coordinate in the dataset. args (argparse.Namespace): Arguments provided to the script. app_logger (logging.Logger): Logger for logging messages. Returns: pd.DataFrame: DataFrame containing the track information if the track file is valid. Raises: FileNotFoundError: If the track file is not found. ValueError: If the time or spatial limits of the track file do not match the dataset. """ trackfile = "inputs/track" try: track = pd.read_csv(trackfile, parse_dates=[0], delimiter=';', index_col='time') except FileNotFoundError: app_logger.error(f'Track file not found: {trackfile}') raise except pd.errors.ParserError: app_logger.error(f'Error parsing track file: {trackfile}') raise # Remove timezone information from track index if it's tz-aware if track.index.tz is not None: track.index = track.index.tz_localize(None) try: data_lon_max, data_lon_min = float(input_data[longitude_indexer].max()), float(input_data[longitude_indexer].min()) data_lat_max, data_lat_min = float(input_data[latitude_indexer].max()), float(input_data[latitude_indexer].min()) app_logger.debug(f"Data spatial limits --> lon_min: {data_lon_min:.2f}, lon_max: {data_lon_max:.2f}, lat_min: {data_lat_min:.2f}, lat_max: {data_lat_max:.2f}") app_logger.debug(f"Track spatial limits --> lon_min: {track['Lon'].min():.2f}, lon_max: {track['Lon'].max():.2f}, lat_min: {track['Lat'].min():.2f}, lat_max: {track['Lat'].max():.2f}") # Check time if track time limits match with data time limits if track.index[0] < times.min() or track.index[-1] > times.max(): app_logger.error("Track time limits do not match with data time limits.") raise ValueError("Track time limits do not match with data time limits.") # Check longitude limits if track['Lon'].max() > data_lon_max: app_logger.error(f"Track file longitude max limit ({track['Lon'].max():.2f}) exceeds data max longitude limit ({data_lon_max:.2f}).") raise ValueError(f"Track file longitude max limit ({track['Lon'].max():.2f}) exceeds data max longitude limit ({data_lon_max:.2f}).") if track['Lon'].min() < data_lon_min: app_logger.error(f"Track file longitude min limit ({track['Lon'].min():.2f}) is below data min longitude limit ({data_lon_min:.2f}).") raise ValueError(f"Track file longitude min limit ({track['Lon'].min():.2f}) is below data min longitude limit ({data_lon_min:.2f}).") # Check latitude limits if track['Lat'].max() > data_lat_max: app_logger.error(f"Track file latitude max limit ({track['Lat'].max():.2f}) exceeds data max latitude limit ({data_lat_max:.2f}).") raise ValueError(f"Track file latitude max limit ({track['Lat'].max():.2f}) exceeds data max latitude limit ({data_lat_max:.2f}).") if track['Lat'].min() < data_lat_min: app_logger.error(f"Track file latitude min limit ({track['Lat'].min():.2f}) is below data min latitude limit ({data_lat_min:.2f}).") raise ValueError(f"Track file latitude min limit ({track['Lat'].min():.2f}) is below data min latitude limit ({data_lat_min:.2f}).") return track except FileNotFoundError: app_logger.error(f"Track file {trackfile} not found.") raise
[docs] def find_extremum_coordinates(ds_data, lat, lon, variable, args): """ Finds the indices of the extremum values for a given variable. Args: ds_data: An xarray DataArray containing the data to compute the energy cycle. lat: An xarray DataArray containing the latitudes of the data. lon: An xarray DataArray containing the longitudes of the data. variable: A string containing the name of the variable to find the indices for. Returns: A tuple containing the indices of the extremum values for the specified variable. """ lat_values = lat.values lon_values = lon.values if (f'{args.track_vorticity}_zeta' in variable): if args.track_vorticity == "min": index = np.unravel_index(np.argmin(ds_data.data), ds_data.shape) else: index = np.unravel_index(np.argmax(ds_data.data), ds_data.shape) elif (f'{args.track_geopotential}_hgt' in variable): if args.track_geopotential == "min": index = np.unravel_index(np.argmin(ds_data.data), ds_data.shape) else: index = np.unravel_index(np.argmax(ds_data.data), ds_data.shape) elif variable == 'max_wind': index = np.unravel_index(np.argmax(ds_data.data), ds_data.shape) else: raise ValueError("Invalid variable specified.") lat_index = index[0] lon_index = index[1] lat_value = lat_values[lat_index] lon_value = lon_values[lon_index] return lat_value, lon_value
[docs] def slice_domain(input_data, args, namelist_df): """ Slices the input dataset according to the specified domain. The domain can be defined based on fixed boundaries, track information, or an interactively selected area. Parameters: - input_data (xr.Dataset): The dataset containing meteorological variables. - args (argparse.Namespace): Parsed command-line arguments indicating the slicing method. - namelist_df (pd.DataFrame): DataFrame mapping variable names to their respective dataset labels. Returns: - xr.Dataset: The sliced dataset within the specified domain. """ # Data indexers longitude_indexer = namelist_df.loc['Longitude']['Variable'] latitude_indexer = namelist_df.loc['Latitude']['Variable'] time_indexer = namelist_df.loc['Time']['Variable'] vertical_level_indexer = namelist_df.loc['Vertical Level']['Variable'] # Get choosen vertical level from args and convert to Pa plevel_Pa = int(args.level) * 100 # Input data limits input_data_lon_min = input_data[longitude_indexer].min().values input_data_lon_max = input_data[longitude_indexer].max().values input_data_lat_min = input_data[latitude_indexer].min().values input_data_lat_max = input_data[latitude_indexer].max().values if args.fixed: dfbox = pd.read_csv('inputs/box_limits', header=None, delimiter=';', index_col=0) dfbox_min_lon = float(dfbox.loc['min_lon'].values.item()) dfbox_max_lon = float(dfbox.loc['max_lon'].values.item()) dfbox_min_lat = float(dfbox.loc['min_lat'].values.item()) dfbox_max_lat = float(dfbox.loc['max_lat'].values.item()) # Check if dfbox limits are within input_data limits if (dfbox_min_lon < input_data_lon_min or dfbox_max_lon > input_data_lon_max or dfbox_min_lat < input_data_lat_min or dfbox_max_lat > input_data_lat_max): raise ValueError("Specified domain limits in 'inputs/box_limits' are outside the range of 'input_data' limits.") WesternLimit = float(input_data[longitude_indexer].sel( {longitude_indexer: dfbox_min_lon}, method='nearest')) EasternLimit = float(input_data[longitude_indexer].sel( {longitude_indexer: dfbox_max_lon}, method='nearest')) SouthernLimit = float(input_data[latitude_indexer].sel( {latitude_indexer: dfbox_min_lat}, method='nearest')) NorthernLimit = float(input_data[latitude_indexer].sel( {latitude_indexer: dfbox_max_lat}, method='nearest')) elif args.track: trackfile = 'inputs/track' track = pd.read_csv(trackfile,parse_dates=[0], delimiter=';',index_col='time') if 'width' in track.columns: max_width = track['width'].max() max_length = track['length'].max() else: max_width = 15 max_length = 15 WesternLimit = track['Lon'].min()-(max_width) - 5 EasternLimit = track['Lon'].max()+(max_width) + 5 SouthernLimit = track['Lat'].min()-(max_length) - 5 NorthernLimit = track['Lat'].max()+(max_length) + 5 elif args.choose: iu_plevel = input_data.isel({time_indexer:0}).sel({vertical_level_indexer:plevel_Pa}, method='nearest' )[namelist_df.loc['Eastward Wind Component']['Variable']] iv_plevel = input_data.isel({time_indexer:0}).sel({vertical_level_indexer:plevel_Pa}, method='nearest' )[namelist_df.loc['Northward Wind Component']['Variable']] zeta = vorticity(iu_plevel, iv_plevel).metpy.dequantify() lat, lon = iu_plevel[latitude_indexer], iu_plevel[longitude_indexer] domain_limits = initial_domain(zeta, lat, lon, args) WesternLimit = domain_limits['min_lon'] EasternLimit = domain_limits['max_lon'] SouthernLimit = domain_limits['min_lat'] NorthernLimit = domain_limits['max_lat'] input_data = input_data.sel( **{latitude_indexer:slice(SouthernLimit,NorthernLimit), longitude_indexer: slice(WesternLimit,EasternLimit)}) return input_data
[docs] def get_domain_extreme_values(itime, args, slices_plevel, track=None): """ Retrieves or calculates extreme values (minimum/maximum vorticity, minimum geopotential height, and maximum wind speed) within a specified domain at chosen pressure level. Parameters: - track (pd.DataFrame): Track data potentially containing extreme values. - itime (str or pd.Timestamp): The specific time step for which to retrieve or calculate extremes. - args (argparse.Namespace): Parsed command-line arguments indicating the slicing method. - slices_plevel (tuple): Tuple containing slices of vorticity, geopotential height, and wind speed at chosen pressure level. Returns: - tuple: Containing minimum/maximum vorticity, minimum geopotential height, and maximum wind speed. """ izeta_plevel_slice, ight_plevel_slice, iwspd_plevel_slice = slices_plevel if args.track: # Check if 'min_max_zeta_plevel', 'min_max_hgt_plevel' and 'max_wind_plevel' columns exists in the track file. # If they exist, then retrieve and convert the value from the track file. # If they do not exist, calculate them. if f'{args.track_vorticity}_zeta_{args.level}' in track.columns: min_max_zeta = float(track.loc[itime][f'{args.track_vorticity}_zeta_{args.level}']) else: if args.track_vorticity == "min": min_max_zeta = float(izeta_plevel_slice.min()) else: min_max_zeta = float(izeta_plevel_slice.max()) if f'{args.track_geopotential}_hgt_{args.level}' in track.columns: min_max_hgt = float(track.loc[itime][f'{args.track_geopotential}_hgt_{args.level}']) else: if args.track_geopotential == "min": min_max_hgt = float(ight_plevel_slice.min()) else: min_max_hgt = float(ight_plevel_slice.max()) if f'max_wind_{args.level}' in track.columns: max_wind = float(track.loc[itime][f'max_wind_{args.level}']) else: max_wind = float(iwspd_plevel_slice.max()) else: if args.track_vorticity == "min": min_max_zeta = float(izeta_plevel_slice.min()) else: min_max_zeta = float(izeta_plevel_slice.max()) min_max_hgt = float(ight_plevel_slice.min()) max_wind = float(iwspd_plevel_slice.max()) return min_max_zeta, min_max_hgt, max_wind