# **************************************************************************** #
# #
# ::: :::::::: #
# select_domain.py :+: :+: :+: #
# +:+ +:+ +:+ #
# By: daniloceano <danilo.oceano@gmail.com> +#+ +:+ +#+ #
# +#+#+#+#+#+ +#+ #
# Created: 2023/01/06 12:33:00 by daniloceano #+# #+# #
# Updated: 2025/05/28 08:14:06 by daniloceano ### ########.fr #
# #
# **************************************************************************** #
"""
A module for selecting and visualizing meteorological data domains.
Designed to assist in the analysis and interpretation of atmospheric phenomena.
Author: Danilo Couto de Souza
Affiliation: Universidade de São Paulo (USP), Instituto de Astornomia, Ciências Atmosféricas e Geociências, São Paulo - Brazil
Contact: danilo.oceano@gmail.com
"""
from datetime import datetime
import pandas as pd
import matplotlib.pyplot as plt
import cartopy.crs as ccrs
import numpy as np
import time
from shapely.geometry.polygon import Polygon
import matplotlib.colors as colors
import cmocean.cm as cmo
import matplotlib.ticker as ticker
nclicks = 2
# CRS used by the input data.
# This should remain PlateCarree() because the data coordinates themselves
# are still regular latitude/longitude coordinates.
data_crs = ccrs.PlateCarree()
[docs]
def get_map_crs(args=None):
"""
Return the map projection.
If --keep_longitude is used, the input data may remain in 0–360 longitude.
In this case, centering the map at 180 degrees avoids dateline-related
plotting artifacts in Cartopy.
"""
if args is not None and getattr(args, "keep_longitude", False):
return ccrs.PlateCarree(central_longitude=180)
return ccrs.PlateCarree()
# Transformation function
[docs]
def tellme(s):
"""
Displays a message on the plot title.
Parameters:
- s: The message to display.
"""
plt.title(s, fontsize=16)
plt.draw()
[docs]
def fmt(x, pos):
"""
Formats the colorbar labels.
Parameters:
- x: The value to format.
- pos: The position (unused, but required by FuncFormatter).
Returns:
- Formatted label.
"""
a, b = '{:.2e}'.format(x).split('e')
b = int(b)
return r'${} \times 10^{{{}}}$'.format(a, b)
[docs]
def draw_box(ax, limits, crs=data_crs):
"""
Draws a rectangular box on the plot based on specified limits.
Parameters:
- ax: The matplotlib axes object.
- limits: A dictionary with 'min_lon', 'max_lon', 'min_lat', 'max_lat' keys.
- crs: The coordinate reference system for the plot.
"""
max_lon, min_lon = limits['max_lon'], limits['min_lon']
max_lat, min_lat = limits['max_lat'], limits['min_lat']
pgon = Polygon([(min_lon, min_lat), (min_lon, max_lat), (max_lon, max_lat), (max_lon, min_lat), (min_lon, min_lat)])
ax.add_geometries([pgon], crs=crs, facecolor='None', edgecolor='k', linewidth=3, alpha=1, zorder=3)
[docs]
def plot_zeta(ax, zeta, lat, lon, hgt=None, crs=data_crs):
"""
Plots the vorticity field and optionally the geopotential height.
Parameters:
- ax: The matplotlib axes object for plotting.
- zeta: The vorticity data array.
- lat: Latitude coordinates.
- lon: Longitude coordinates.
- hgt: Optional geopotential height data array.
"""
norm = colors.TwoSlopeNorm(vmin=min(-zeta.max(), zeta.min()), vcenter=0, vmax=max(zeta.max(), -zeta.min()))
cmap = cmo.balance
cf1 = ax.contourf(lon, lat, zeta, cmap=cmap, norm=norm, levels=51, transform=data_crs)
plt.colorbar(cf1, pad=0.1, orientation='vertical', shrink=0.5, format=ticker.FuncFormatter(fmt))
if hgt is not None:
cs = ax.contour(lon, lat, hgt, levels=11, colors='#747578', linestyles='dashed', linewidths=1, transform=crs)
ax.clabel(cs, cs.levels, inline=True, fontsize=10)
[docs]
def map_decorators(ax):
"""
Adds coastlines and gridlines to the map.
Parameters:
- ax: The matplotlib axes object for the map.
"""
ax.coastlines()
gl = ax.gridlines(draw_labels=True, zorder=2, linestyle='dashed', alpha=0.7, linewidth=0.5, color='#383838')
gl.xlabel_style = {'size': 14, 'color': '#383838'}
gl.ylabel_style = {'size': 14, 'color': '#383838'}
gl.top_labels = None
gl.right_labels = None
[docs]
def plot_min_max_zeta(ax, zeta, lat, lon, limits, args):
"""
Plots the minimum or maximum zeta point within a specified domain.
Parameters:
- ax: The matplotlib axes object for plotting.
- zeta: The vorticity data array.
- lat: Latitude coordinates.
- lon: Longitude coordinates.
- limits: A dictionary with 'min_lon', 'max_lon', 'min_lat', 'max_lat' keys specifying the domain.
- args: A Namespace object containing command-line arguments.
"""
max_lon, min_lon = limits['max_lon'], limits['min_lon']
max_lat, min_lat = limits['max_lat'], limits['min_lat']
# Plot mininum zeta point whithin box
izeta = zeta.sel({lon.dims[0]:slice(min_lon,max_lon),
lat.dims[0]:slice(min_lat,max_lat)})
# Select min or max
if args.track_vorticity == "min":
min_max_zeta = izeta.min()
else:
min_max_zeta = izeta.max()
min_max_zeta_loc = izeta.where(izeta==min_max_zeta, drop=True).squeeze()
# sometimes there are multiple minimuns
if min_max_zeta_loc.shape:
if len(min_max_zeta_loc.shape) >1:
for points in min_max_zeta_loc:
for point in points:
ax.scatter(point[lon.dims[0]], point[lat.dims[0]],
marker='o', facecolors='none', linewidth=3,
edgecolor='k', s=200, transform=data_crs)
else:
for point in min_max_zeta_loc:
ax.scatter(point[lon.dims[0]], point[lat.dims[0]],
marker='o', facecolors='none', linewidth=3,
edgecolor='k', s=200, transform=data_crs)
else:
ax.scatter(min_max_zeta_loc[lon.dims[0]], min_max_zeta_loc[lat.dims[0]],
marker='o', facecolors='none', linewidth=3,
edgecolor='k', s=200, transform=data_crs)
[docs]
def initial_domain(zeta, lat, lon, args=None):
"""
Interactively selects an initial spatial domain on a map.
Parameters:
- zeta: The vorticity data array to be plotted for domain selection.
- lat: Latitude coordinates for the plot.
- lon: Longitude coordinates for the plot.
Returns:
- limits: A dictionary containing the selected domain's 'min_lon', 'max_lon', 'min_lat', 'max_lat'.
"""
plt.close('all')
fig = plt.figure(figsize=(10, 8))
map_crs = get_map_crs(args)
ax = plt.axes(projection=map_crs)
fig.add_axes(ax)
ax.set_global()
plot_zeta(ax, zeta, lat, lon, crs=data_crs)
map_decorators(ax)
plt.subplots_adjust(bottom=0, top=1.2)
while True:
pts = []
while len(pts) < nclicks:
tellme('Select an initial spatial domain')
pts = np.asarray(plt.ginput(nclicks, timeout=15,
mouse_stop='MouseButton.MIDDLE'))
if len(pts) < nclicks:
tellme('Too few points, starting over')
time.sleep(1) # Wait a second
xmin,xmax = min([pts[0,0],pts[1,0]]),max([pts[0,0],pts[1,0]])
ymin,ymax = min([pts[0,1],pts[1,1]]),max([pts[0,1],pts[1,1]])
xs, ys = np.array((xmin,xmax)), np.array((ymin,ymax))
lls = coordXform(map_crs, data_crs, xs, ys)
if args is not None and getattr(args, "keep_longitude", False):
lls[:, 0] = lls[:, 0] % 360
min_lon,min_lat = lls[0,0], lls[0,1]
max_lon, max_lat = lls[1,0], lls[1,1]
limits = {'min_lon':min_lon,'max_lon':max_lon,
'min_lat':min_lat, 'max_lat':max_lat}
draw_box(ax, limits, data_crs)
tellme('Happy? Key press any keyboard key for yes, mouse click for no')
if plt.waitforbuttonpress():
break
return limits
[docs]
def draw_box_map(u, v, zeta, hgt, lat, lon, timestr, args):
"""
Draws a map with streamlines and allows for the interactive selection of a domain.
Parameters:
- u: Eastward wind component data array.
- v: Northward wind component data array.
- zeta: Vorticity data array.
- hgt: Geopotential height data array.
- lat: Latitude coordinates.
- lon: Longitude coordinates.
- timestr: The timestep as a string for display.
- args: Command-line arguments.
Returns:
- limits: A dictionary with the selected domain's limits.
"""
plt.close('all')
fig = plt.figure(figsize=(10, 8))
map_crs = get_map_crs(args)
ax = plt.axes(projection=map_crs)
fig.add_axes(ax)
plot_zeta(ax, zeta, lat, lon, hgt, crs=data_crs)
ax.streamplot(lon.values, lat.values, u.values, v.values, color='#2A1D21',
transform=data_crs)
map_decorators(ax)
# Convert to datetime object
try:
date_obj = datetime.strptime(timestr, '%Y-%m-%d %H:%M:%S%z')
except:
date_obj = datetime.strptime(timestr, '%Y-%m-%d %H:%M:%S')
# Format datetime object to desired string format (YYYY-MM-DD HH)
formatted_date_str = date_obj.strftime('%Y-%m-%d %H')
while True:
pts = []
while len(pts) < nclicks:
tellme(f'Select box corners \nModel time step: {formatted_date_str}')
pts = np.asarray(plt.ginput(nclicks, timeout=15,
mouse_stop='MouseButton.MIDDLE'))
if len(pts) < nclicks:
tellme('Too few points, starting over')
time.sleep(1) # Wait a second
xmin,xmax = min([pts[0,0],pts[1,0]]),max([pts[0,0],pts[1,0]])
ymin,ymax = min([pts[0,1],pts[1,1]]),max([pts[0,1],pts[1,1]])
xs, ys = np.array((xmin,xmax)), np.array((ymin,ymax))
lls = coordXform(map_crs, data_crs, xs, ys)
if getattr(args, "keep_longitude", False):
lls[:, 0] = lls[:, 0] % 360
min_lon,min_lat = lls[0,0], lls[0,1]
max_lon, max_lat = lls[1,0], lls[1,1]
limits = {'min_lon':min_lon,'max_lon':max_lon,
'min_lat':min_lat, 'max_lat':max_lat}
draw_box(ax, limits, data_crs)
plot_min_max_zeta(ax, zeta, lat, lon, limits, args)
tellme('Happy? Key press any keyboard key for yes, mouse click for no')
if plt.waitforbuttonpress():
break
return limits
[docs]
def get_domain_limits(args, *variables_at_plevel, track=None):
"""
Determines the domain limits based on track data or user selection.
Parameters:
- args: Command-line arguments or options.
- variables_at_plevel: A tuple containing meteorological variables at chosen pressure level.
- track: Optional DataFrame containing track data for domain selection.
Returns:
- current_domain_limits: A dictionary with the calculated domain limits.
"""
iu_plevel, iv_plevel, zeta, ight_plevel, lat, lon, itime = variables_at_plevel
if args.track:
if 'width'in track.columns:
width, length = track.loc[itime]['width'],track.loc[itime]['length']
else:
width, length = 15, 15
central_lon = track.loc[itime]['Lon']
central_lat = track.loc[itime]['Lat']
min_lon = central_lon-(width/2)
max_lon = central_lon+(width/2)
min_lat = central_lat-(length/2)
max_lat = central_lat+(length/2)
current_domain_limits = {
'min_lon': min_lon,
'max_lon': max_lon,
'min_lat': min_lat,
'max_lat': max_lat,
'central_lat': central_lat,
'central_lon': central_lon
}
elif args.choose:
# Draw maps and ask user to specify corners for specifying the box
limits = draw_box_map(iu_plevel, iv_plevel, zeta, ight_plevel,
lat, lon, itime, args)
# Store system position and attributes
min_lon, max_lon = limits['min_lon'], limits['max_lon']
min_lat, max_lat = limits['min_lat'], limits['max_lat']
width, length = max_lon - min_lon, max_lat - min_lat
central_lat = (max_lat + min_lat) / 2
central_lon = (max_lon + min_lon) / 2
current_domain_limits = {
'min_lon': min_lon,
'max_lon': max_lon,
'min_lat': min_lat,
'max_lat': max_lat,
'central_lat': central_lat,
'central_lon': central_lon
}
elif args.fixed:
dfbox = pd.read_csv('inputs/box_limits',header=None, delimiter=';',index_col=0)
min_lon = float(lon.sel({lon.name:float(dfbox.loc['min_lon'].values.item())}, method='nearest'))
max_lon = float(lon.sel({lon.name:float(dfbox.loc['max_lon'].values.item())}, method='nearest'))
min_lat = float(lat.sel({lat.name:float(dfbox.loc['min_lat'].values.item())}, method='nearest'))
max_lat = float(lat.sel({lat.name:float(dfbox.loc['max_lat'].values.item())}, method='nearest'))
current_domain_limits = {
'min_lon': min_lon,
'max_lon': max_lon,
'min_lat': min_lat,
'max_lat': max_lat,
'central_lat': (max_lat + min_lat) / 2,
'central_lon': (max_lon + min_lon) / 2
}
return current_domain_limits