Source code for src.calculations

# **************************************************************************** #
#                                                                              #
#                                                         :::      ::::::::    #
#    calculations.py                                    :+:      :+:    :+:    #
#                                                     +:+ +:+         +:+      #
#    By: daniloceano <danilo.oceano@gmail.com>      +#+  +:+       +#+         #
#                                                 +#+#+#+#+#+   +#+            #
#    Created: 2024/02/16 16:42:55 by daniloceano       #+#    #+#              #
#    Updated: 2025/06/13 10:30:11 by daniloceano      ###   ########.fr        #
#                                                                              #
# **************************************************************************** #

import numpy as np
import pandas as pd
import xarray as xr

from metpy.calc import wind_speed

from src.data_object import DataObject    
from src.select_domain import get_domain_limits
from src.visualization import plot_fixed_domain
from src.utils import handle_track_file
from src.utils import find_extremum_coordinates
from src.utils import get_domain_extreme_values
from src.output_management import save_output_track
from src.output_management import save_results_csv
from src.output_management import save_results_netcdf
from src.visualization import hovmoller_mean_zeta

[docs] def CalcZonalAverage(VariableData): """ Computates variable zonal average of some variable, for all z levels and time steps. Source: Brennan, F. E., & Vincent, D. G. (1980). Zonal and Eddy Components of the Synoptic-Scale Energy Budget during Intensification of Hurricane Carmen (1974), Monthly Weather Review, 108(7), 954-965. Retrieved Jan 25, 2022, from: https://journals.ametsoc.org/view/journals/mwre/108/7/1520-0493_1980_108_0954_zaecot_2_0_co_2.xml Parameters ---------- VariableData: xarray.Dataset arrays containing data to be integrated. Requires dimension rlons (longitude in radians) Returns ------- zonal_ave: xarray.Dataset Arrays of zonal avreages for all longitudes from the passed Dataset """ xlength = VariableData['rlons'][-1] - VariableData['rlons'][0] return VariableData.integrate("rlons") / xlength
[docs] def CalcAreaAverage(VariableData, ZonalAverage=False): """ Computates the Area Average of a function. The default is to computate the zonal average and then a meridional average. If the input data is already some sort of zonal quantity (average or not), simply set longitude_indexer to None Source: Brennan, F. E., & Vincent, D. G. (1980). Zonal and Eddy Components of the Synoptic-Scale Energy Budget during Intensification of Hurricane Carmen (1974), Monthly Weather Review, 108(7), 954-965. Retrieved Jan 25, 2022, from: https://journals.ametsoc.org/view/journals/mwre/108/7/1520-0493_1980_108_0954_zaecot_2_0_co_2.xml Parameters ---------- VariableData: xarray.Dataset arrays containing data to be integrated ZonalAverage: string (optional) if passed, it will first compute zonal averages Returns ------- zonal_ave: xarray.Dataset Arrays of area avreages for all latitudes and longitudes from the passed Dataset """ # Compute zonal average if requested if ZonalAverage: ZA = CalcZonalAverage(VariableData) else: ZA = VariableData # Take the area avearge ylength = np.sin( VariableData['rlats'][-1]) - np.sin(VariableData['rlats'][0]) return (ZA * ZA["coslats"]).integrate("rlats") / ylength
[docs] def perform_calculations(input_data, namelist_df, dTdt, dZdt, dQdt, args, app_logger, *outputs): """ Performs meteorological calculations on input data and generates results and figures. Parameters: - input_data (xr.Dataset): Dataset containing meteorological data from a NetCDF file. - namelist_df (pd.DataFrame): DataFrame mapping variable names. - dTdt, dZdt, dQdt: Data arrays representing temperature, vortcity and moisture tendencies, respectively. - args: Command-line arguments or parameters specifying calculation options. - app_logger (Logger): Logger for outputting information and error messages. - outputs (tuple): A tuple containing paths for results and figures directories, and the output file name. The function processes each time step of the input data to calculate various meteorological terms, stores results in CSV files, and generates figures for analysis. It handles domain selection based on tracks, calculates area averages for specified terms, and plots diagnostic figures. """ # Unpack outputs results_subdirectory, figures_subdirectory, outfile_name = outputs app_logger.info(f'Directory where results will be stored: {results_subdirectory}') app_logger.info(f'Directory where figures will be stored: {figures_subdirectory}') app_logger.info(f'Name of the output file with results: {outfile_name}') # Dictionary to save DataArray results and transform into nc later results_nc = {} # Indexers longitude_indexer = namelist_df.loc['Longitude']['Variable'] latitude_indexer = namelist_df.loc['Latitude']['Variable'] time_indexer = namelist_df.loc['Time']['Variable'] vertical_level_indexer = namelist_df.loc['Vertical Level']['Variable'] # Data time steps and pressure levels timesteps = pd.to_datetime(input_data[time_indexer].values) pres_levels = input_data[vertical_level_indexer].values # Load track file, if specified, and check for errors if args.track: track = handle_track_file(input_data, timesteps, longitude_indexer, latitude_indexer, app_logger) # Create a dictionary for saving area averages of each term stored_terms = ['AdvHTemp','AdvVTemp', 'Sigma','Omega','dTdt','ResT', 'Zeta', 'dZdt','AdvHZeta','AdvVZeta', 'vxBeta', 'ZetaDivH','fDivH', 'Tilting', 'ResZ', 'dQdt', 'dQdt_integrated', 'divQ', 'divQ_integrated', 'WaterBudgetResidual', 'WaterBudgetResidual_integrated'] # Create a dataframe for each term results_df_dictionary = {} for term in stored_terms: if term == 'ResQ': results_df_dictionary[term] = {} else: results_df_dictionary[term] = pd.DataFrame(columns=[str(t) for t in timesteps], index=[float(i) for i in pres_levels]) results_nc[term] = [] # Get choosen vertical level from args and convert to Pa plevel_Pa = int(args.level) * 100 # Dictionary for saving track attributes for each timestep output_track_attributes = {} results_keys = ['time', 'central_lat', 'central_lon', 'length', 'width', f'{args.track_vorticity}_zeta_{args.level}', f'{args.track_geopotential}_hgt_{args.level}', f'max_wind_{args.level}'] for key in results_keys: output_track_attributes[key] = [] for time_step in timesteps: itime = str(time_step) datestr = pd.to_datetime(itime).strftime('%Y-%m-%d %HZ') datestr2 = pd.to_datetime(itime).strftime('%Y%m%d%H00') app_logger.info(f'⏳ Processing time step: {datestr}') # Convert the timezone-aware timestamp 't' to a timezone-naive timestamp (still in UTC) t_naive = time_step.tz_localize(None) # Create a DataObject for the current time step, where all the calculations are performed MovingObj = DataObject( input_data.sel({time_indexer:t_naive}), dTdt=dTdt.sel({time_indexer:t_naive}), dZdt=dZdt.sel({time_indexer:t_naive}), dQdt=dQdt.sel({time_indexer:t_naive}), namelist_df=namelist_df, app_logger=app_logger, args=args) # Get variables at choosen pressure level for the current time step iu_plevel = MovingObj.u.sel({vertical_level_indexer:plevel_Pa}) iv_plevel = MovingObj.v.sel({vertical_level_indexer:plevel_Pa}) ight_plevel = MovingObj.GeopotHeight.sel({vertical_level_indexer:plevel_Pa}) iwspd_plevel = wind_speed(iu_plevel, iv_plevel) zeta = MovingObj.Zeta.sel({vertical_level_indexer:plevel_Pa}).metpy.dequantify() lat, lon = input_data[MovingObj.latitude_indexer], input_data[MovingObj.longitude_indexer] # Get domain limits for the current time step variables_at_plevel = [iu_plevel, iv_plevel, zeta, ight_plevel, lat, lon, itime] if 'track' in locals() or 'track' in globals(): current_domain_limits = get_domain_limits(args, *variables_at_plevel, track=track) else: current_domain_limits = get_domain_limits(args, *variables_at_plevel) min_lat, max_lat = current_domain_limits['min_lat'], current_domain_limits['max_lat'] min_lon, max_lon = current_domain_limits['min_lon'], current_domain_limits['max_lon'] central_lat, central_lon = current_domain_limits['central_lat'], current_domain_limits['central_lon'] # Slice variables for the domain limits izeta_plevel_slice = zeta.sel({latitude_indexer:slice(min_lat, max_lat), longitude_indexer:slice(min_lon, max_lon)}) ight_plevel_slice = ight_plevel.sel({latitude_indexer:slice(min_lat, max_lat), longitude_indexer:slice(min_lon, max_lon)}) iwspd_plevel_slice = iwspd_plevel.sel({latitude_indexer:slice(min_lat, max_lat), longitude_indexer:slice(min_lon, max_lon)}) # Get the extreme values at choosen pressure level for the current time step slices_plevel = [izeta_plevel_slice, ight_plevel_slice, iwspd_plevel_slice] if 'track' in locals() or 'track' in globals(): min_max_zeta, min_max_hgt, max_wind = get_domain_extreme_values(itime, args, slices_plevel, track) else: min_max_zeta, min_max_hgt, max_wind = get_domain_extreme_values(itime, args, slices_plevel) # Find position of the extremes lat_slice, lon_slice = izeta_plevel_slice[latitude_indexer], izeta_plevel_slice[longitude_indexer] min_max_zeta_lat, min_max_zeta_lon = find_extremum_coordinates(izeta_plevel_slice, lat_slice, lon_slice, f'{args.track_vorticity}_zeta_{args.level}', args) min_max_hgt_lat, min_max_hgt_lon = find_extremum_coordinates(ight_plevel_slice, lat_slice, lon_slice, f'{args.track_geopotential}_hgt_{args.level}', args) max_wind_lat, max_wind_lon = find_extremum_coordinates(iwspd_plevel_slice, lat_slice, lon_slice, 'max_wind', args) # Print results on screen if args.fixed: app_logger.info(f"Storing results for: {datestr}") WesternLimit = min_lon EasternLimit = max_lon NorthernLimit = max_lat SouthernLimit = min_lat else: # Store system position and attributes length, width = max_lat - min_lat, max_lon - min_lon domain_attributes = [datestr, central_lat, central_lon, length, width, min_max_zeta, min_max_hgt, max_wind] for key,val in zip(results_keys, domain_attributes): output_track_attributes[key].append(val) WesternLimit = float(input_data[longitude_indexer].sel({longitude_indexer: min_lon}, method='nearest')) EasternLimit =float(input_data[longitude_indexer].sel({longitude_indexer: max_lon}, method='nearest')) SouthernLimit = float(input_data[latitude_indexer].sel({latitude_indexer: min_lat}, method='nearest')) NorthernLimit = float(input_data[latitude_indexer].sel({latitude_indexer: max_lat}, method='nearest')) app_logger.info( f"📍 central lat: {central_lat}, central lon: {central_lon}, " f"size: {length} x {width}, " f"lon range: {min_lon} to {max_lon}, " f"lat range: {min_lat} to {max_lat}" ) app_logger.info( f"📍 {int(args.level)} hPa diagnostics --> " f"{args.track_vorticity} ζ: {min_max_zeta:.2e}, " f"{args.track_geopotential} geopotential height: {min_max_hgt:.0f}, " f"max wind speed: {max_wind:.2f}" ) app_logger.info( f"📍 {int(args.level)} hPa positions (lat/lon) --> " f"{args.track_vorticity} ζ: {min_max_zeta_lat:.2f}, {min_max_zeta_lon:.2f}, " f"{args.track_geopotential} geopotential height: {min_max_hgt_lat:.2f}, {min_max_hgt_lon:.2f}, " f"max wind speed: {max_wind_lat:.2f}, {max_wind_lon:.2f}") app_logger.info(f"📊 Storing results for: {datestr}") for term in stored_terms: term_sliced = getattr(MovingObj,term).sel( **{ MovingObj.latitude_indexer:slice(SouthernLimit,NorthernLimit), MovingObj.longitude_indexer: slice(WesternLimit,EasternLimit) } ) if term in ['divQ_integrated', 'dQdt_integrated', 'WaterBudgetResidual_integrated']: results_df_dictionary[term][itime] = float(CalcAreaAverage(term_sliced, ZonalAverage=True)) else: results_df_dictionary[term][itime] = CalcAreaAverage(term_sliced, ZonalAverage=True) # Save figure with box used for computations dict_for_plot = { f'{args.track_vorticity}_zeta_{args.level}': { 'latitude': min_max_zeta_lat, 'longitude': min_max_zeta_lon, 'data': zeta }, f'{args.track_geopotential}_hgt_{args.level}': { 'latitude': min_max_hgt_lat, 'longitude': min_max_hgt_lon, 'data': ight_plevel }, 'max_wind': { 'latitude': max_wind_lat, 'longitude': max_wind_lon, 'data': iwspd_plevel }, 'lat': lat, 'lon': lon, } app_logger.info(f"📊 Saving domain plot for {datestr2}") plot_fixed_domain(current_domain_limits, dict_for_plot, args, results_subdirectory, datestr2, app_logger) if args.save_nc_file: app_logger.debug(f"💾 Storing results for the NetCDF file for {datestr}") # Initializing out_nc on the first time step if time_step == timesteps[0]: # Create the first Dataset with the terms' variables results_ds = [getattr(MovingObj, term).metpy.dequantify().assign_attrs(units='').rename(term) for term in stored_terms] out_nc = xr.merge(results_ds) # Merge the initial variables app_logger.debug(f"Created Xarray dataset for {datestr}") else: # For subsequent time steps, create results_ds and concatenate along the 'time' dimension results_ds = [getattr(MovingObj, term).metpy.dequantify().assign_attrs(units='').rename(term) for term in stored_terms] # Concatenate results_ds along the 'time' dimension out_nc = xr.concat([out_nc, xr.merge(results_ds)], dim='initial_time0_hours') # Concatenate along time app_logger.debug(f"Appended Xarray dataset for {datestr}") # Save system position as a csv file for replicability if not args.fixed: save_output_track(output_track_attributes, args, results_subdirectory, figures_subdirectory, outfile_name, app_logger) hovmoller_mean_zeta(results_df_dictionary['Zeta'], figures_subdirectory, app_logger) app_logger.info(f"💾 Saving CSV results for {outfile_name}") save_results_csv(results_df_dictionary, results_subdirectory, app_logger) if args.save_nc_file: save_results_netcdf(out_nc, results_subdirectory, outfile_name, app_logger)