"""Validate simulated hazard based on floodmarks."""
import logging
from pathlib import Path
from typing import Literal, Union
import geopandas as gpd
import hydromt
import matplotlib.colors as mcolors
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import xarray as xr
from pydantic import PositiveInt
from shapely.geometry import Point
from hydroflows._typing import ListOfFloat, OutputDirPath, TupleOfInt
from hydroflows.utils.units import convert_to_meters
from hydroflows.workflow.method import Method
from hydroflows.workflow.method_parameters import Parameters
__all__ = ["FloodmarksValidation", "Input", "Output", "Params"]
logger = logging.getLogger(__name__)
[docs]
class Output(Parameters):
"""Output parameters for the :py:class:`FloodmarksValidation` method."""
validation_scores_geom: Path
"""The path to the geometry file with the derived validation scores."""
validation_scores_csv: Path
"""The path to the CSV file with the derived validation scores."""
[docs]
class Params(Parameters):
"""Parameters for :py:class:`FloodmarksValidation` method."""
out_root: OutputDirPath
"""Root folder to save the derived validation scores."""
waterlevel_col: str
"""The column/attribute name representing the observed water level in the input floodmarks geometry file,
as provided in the :py:class:`Input` class."""
waterlevel_unit: Literal["m", "cm", "mm", "ft", "in"]
"""The unit (length) of the observed floodmarks in the input geometry file,
as provided in the :py:class:`Input` class. Valid options are 'm' for meters
, 'cm' for centimeters, "mm" for milimeters, "ft" for feet and "in" for inches."""
filename: str = "validation_scores_floodmarks.csv"
"""The filename for the produced validation scores csv file."""
plot_fig: bool = True
"""Determines whether to plot a figure, with the derived
validation scores and the difference between observed and simulated
values with color bins geographically."""
# Plotting settings params
bins: ListOfFloat = None
"""Custom bin edges for categorizing the the difference between
observed and simulated values. If None (default), calculated based on `nbins`,
`vmin` and `vmax`."""
nbins: PositiveInt = 5
"""Number of bins for grouping the difference between observed and
simulated values (for color mapping)."""
vmin: Union[float, int] = None
"""Minimum value for the color scale. If None (default),
the (rounded) minimum value of difference (observed - simulated) is used."""
vmax: Union[float, int] = None
"""Maximum value for the color scale. If None (default),
the maximum value of difference (observed - simulated) is used."""
cmap: str = "bwr"
"""Colormap used for visualizing the difference (observed - simulated) values.
Default is 'bwr' (blue-white-red)."""
zoomlevel: str = "auto"
"""Zoomlevel, by default 'auto'. """
figsize: TupleOfInt = (12, 7)
"""Figure size, by default (12,7)."""
bmap: str = None
"""Background map souce name, by default None
Default image tiles "sat", and "osm" are fetched from cartopy image tiles.
If contextily is installed, xyzproviders tiles can be used as well."""
region: Path = None
"""File path to the geometry file representing the area used for hazard simulation.
This is only used for visualization pursposes. An example of this file could be a GeoJSON of the SFINCS region.
"""
[docs]
class FloodmarksValidation(Method):
"""Validate simulated hazard based on floodmarks.
Parameters
----------
floodmarks_geom : Path
Path to the geometry file (shapefile, GeoJSON or GeoPackage) with floodmark locations as
points. The corresponding water levels are defined by the property specified
in :py:attr:`waterlevel_col`.
flood_hazard_map : Path
The file path to the flood hazard map to be used for validation.
out_root : Path, optional
The root folder to save the derived validation scores, by default "data/validation".
waterlevel_col : Str
The property name for the observed water levels in the floodmarks geometry file
waterlevel_unit : Literal["m", "cm", "mm", "ft", "in"]
Obsevred floodmarks unit. Valid options are 'm' for meters,
'cm' for centimeters, 'ft' for feet and 'in' for inches .
**params
Additional parameters to pass to the FloodmarksValidation instance.
See Also
--------
:py:class:`FloodmarksValidation Input <hydroflows.methods.validation.floodmarks_validation.Input>`
:py:class:`FloodmarksValidation Output <hydroflows.methods.validation.floodmarks_validation.Output>`
:py:class:`FloodmarksValidation Params <hydroflows.methods.validation.floodmarks_validation.Params>`
"""
name: str = "floodmarks_validation"
_test_kwargs = {
"floodmarks_geom": Path("floodmarks.geojson"),
"flood_hazard_map": Path("hazard_map_output.tif"),
"waterlevel_col": "water_level_obs",
"waterlevel_unit": "m",
}
def __init__(
self,
floodmarks_geom: Path,
flood_hazard_map: Path,
waterlevel_col: str,
waterlevel_unit: Literal["m", "cm", "mm", "ft", "in"],
out_root: Path = Path("data/validation"),
**params,
):
self.params: Params = Params(
out_root=out_root,
waterlevel_col=waterlevel_col,
waterlevel_unit=waterlevel_unit,
**params,
)
self.input: Input = Input(
floodmarks_geom=floodmarks_geom,
flood_hazard_map=flood_hazard_map,
)
self.output: Output = Output(
validation_scores_geom=self.params.out_root
/ f"{self.input.floodmarks_geom.stem}_validation.gpkg",
validation_scores_csv=self.params.out_root / self.params.filename,
)
def _run(self):
"""Run the FloodmarksValidation method."""
# Read the floodmarks and the region files
gdf = gpd.read_file(self.input.floodmarks_geom)
# Read the floodmap using HydroMT
floodmap = hydromt.io.open_raster(self.input.flood_hazard_map)
proj_crs = floodmap.raster.crs
if not proj_crs.is_projected and proj_crs.to_epsg() is None:
raise ValueError("Input hazard map needs to be georeferenced.")
if gdf.crs != proj_crs:
gdf = gdf.to_crs(proj_crs)
# Include flood marks that fall within the flood_bounds_gdf
gdf_in_region = gdf[gdf.geometry.within(floodmap.raster.box.union_all())]
# Number of points inside (outside) the modeled region
num_floodmarks_inside = len(gdf_in_region)
num_floodmarks_outside = len(gdf) - num_floodmarks_inside
if num_floodmarks_inside == 0:
raise ValueError(
"No floodmarks found within the modeled flood hazard map extents."
)
logging.info(
f"Floodmarks inside the simulated region: {num_floodmarks_inside}/{num_floodmarks_outside}"
)
gdf_in_region.loc[:, "geometry"] = gdf_in_region["geometry"].apply(
multipoint_to_point
)
# Sample the floodmap at the floodmark locs
samples: xr.DataArray = floodmap.raster.sample(gdf_in_region)
# Assign the modeled values to the gdf
gdf_in_region.loc[:, "modeled_value"] = samples.fillna(0).values
# Make sure the units are in meters
gdf_in_region.loc[:, self.params.waterlevel_col] = gdf_in_region.loc[
:, self.params.waterlevel_col
].apply(lambda x: convert_to_meters(x, self.params.waterlevel_unit))
# Set 'is_flooded' to True if 'modeled_value' is greater than 0
gdf_in_region.loc[:, "is_flooded"] = gdf_in_region["modeled_value"] > 0
# Calculate the difference between observed and modeled values
gdf_in_region.loc[:, "difference"] = (
gdf_in_region[self.params.waterlevel_col] - gdf_in_region["modeled_value"]
)
# Calculate the abs. difference between observed and modeled values
gdf_in_region.loc[:, "abs_difference"] = gdf_in_region.loc[
:, "difference"
].abs()
# Calculate RMSE and R²
rmse = RMSE(
gdf_in_region[self.params.waterlevel_col],
gdf_in_region["modeled_value"],
)
r2 = R2(
gdf_in_region[self.params.waterlevel_col].values,
gdf_in_region["modeled_value"].values,
)
# Create a df with the scores and convert it to a csv
metrics = {"rmse": [rmse], "r2": [r2]}
df_metrics = pd.DataFrame(metrics)
df_metrics.to_csv(self.output.validation_scores_csv, index=False)
# Export the validated gdf
gdf_in_region.to_file(self.output.validation_scores_geom, driver="GPKG")
fig_root = self.output.validation_scores_geom.parent
# save plot
if self.params.plot_fig:
# create a folder to save the figs
plot_dir = Path(fig_root, "figs")
plot_dir.mkdir(exist_ok=True)
_plot_scores(
scores_gdf=gdf_in_region,
floodmap_ds=floodmap,
region=self.params.region,
nbins=self.params.nbins,
rmse=rmse,
r2=r2,
path=Path(plot_dir, "validation_scores.png"),
bins=self.params.bins,
vmin=self.params.vmin,
vmax=self.params.vmax,
cmap=self.params.cmap,
figsize=self.params.figsize,
zoomlevel=self.params.zoomlevel,
bmap=self.params.bmap,
)
def multipoint_to_point(geometry):
"""Convert MultiPoint to Point.
This function converts a `MultiPoint` geometry into a single `Point` by taking the first point in the collection.
It ensures that if the input geometry is of type `MultiPoint`, it returns the first point in the geometry collection.
If the geometry is not a `MultiPoint`, it returns the original geometry as it is.
The function also supports 3D geometries (Point with z-coordinate).
Parameters
----------
geometry : shapely.geometry (e.g., Point, MultiPoint)
The input geometry to be evaluated and converted if it is a MultiPoint.
Returns
-------
shapely.geometry.Point or original geometry:
- If the input geometry is a `MultiPoint`, the function returns a `Point` object corresponding to the first point
in the collection (preserving the z-coordinate if present).
- If the input geometry is not a `MultiPoint`, the original geometry is returned unchanged.
"""
if geometry.geom_type == "MultiPoint":
points = geometry.geoms
if len(points) > 0: # Check if there are points in the MultiPoint
return Point(points[0].x, points[0].y, getattr(points[0], "z", 0))
return geometry # Return the original geometry if it's not a MultiPoint
def RMSE(actual, predicted):
"""
Calculate Root Mean Squared Error (RMSE) between actual and predicted values.
Parameters
----------
actual (array-like): Array of actual values.
predicted (array-like): Array of predicted values.
Returns
-------
float: RMSE value.
"""
actual = np.asarray(actual)
predicted = np.asarray(predicted)
mse = np.mean((actual - predicted) ** 2)
rmse = np.sqrt(mse)
return rmse
def R2(actual, predicted):
"""
Calculate R-squared (R²) between actual and predicted values.
Parameters
----------
actual (array-like): Array of actual values.
predicted (array-like): Array of predicted values.
Returns
-------
float: R² value.
"""
actual = np.asarray(actual)
predicted = np.asarray(predicted)
ss_total = np.sum((actual - np.mean(actual)) ** 2)
ss_residual = np.sum((actual - predicted) ** 2)
r2 = 1 - (ss_residual / ss_total)
return r2
def _plot_scores(
scores_gdf,
floodmap_ds: xr.Dataset,
region: Path,
nbins,
rmse,
r2,
path: Path,
bins,
vmin,
vmax,
cmap,
figsize,
zoomlevel,
bmap,
) -> None:
"""Plot scores."""
import cartopy.crs as ccrs
import cartopy.io.img_tiles as cimgt
import contextily as ctx
# Set default values for vmin, vmax, cmap, and bins if not provided
if vmin is None:
vmin = np.floor(scores_gdf["difference"].min())
if vmax is None:
vmax = scores_gdf["difference"].max()
if bins is None:
bins = np.linspace(vmin, vmax, nbins + 1)
bounds = floodmap_ds.raster.box.total_bounds
extent = np.array(bounds)[[0, 2, 1, 3]]
proj_crs = floodmap_ds.raster.crs
proj_str = proj_crs.name
if proj_crs.is_projected and proj_crs.to_epsg() is not None:
crs = ccrs.epsg(floodmap_ds.raster.crs.to_epsg())
unit = proj_crs.axis_info[0].unit_name
unit = "m" if unit == "metre" else unit
xlab, ylab = f"x [{unit}] - {proj_str}", f"y [{unit}]"
elif proj_crs.is_geographic:
crs = ccrs.PlateCarree()
xlab, ylab = f"lon [deg] - {proj_str}", "lat [deg]"
if zoomlevel == "auto": # auto zoomlevel
c = 2 * np.pi * 6378137 # Earth circumference
lat = np.array(floodmap_ds.raster.transform_bounds(4326))[[1, 3]].mean()
# max 4 x 4 tiles per image
tile_size = max(bounds[2] - bounds[0], bounds[3] - bounds[1]) / 4
if proj_crs.is_geographic:
tile_size = tile_size * 111111
zoomlevel = int(np.log2(c * abs(np.cos(lat)) / tile_size))
# sensible range is 9 (large metropolitan area) - 16 (street)
zoomlevel = min(16, max(9, zoomlevel))
fig = plt.figure(figsize=figsize)
ax = plt.subplot(projection=crs)
ax.set_extent(extent, crs=crs)
ax.set_title("Validation against flood marks", size=14)
if bmap is not None:
if zoomlevel == "auto": # auto zoomlevel
c = 2 * np.pi * 6378137 # Earth circumference
lat = np.array(floodmap_ds.raster.transform_bounds(4326))[[1, 3]].mean()
# max 4 x 4 tiles per image
tile_size = max(bounds[2] - bounds[0], bounds[3] - bounds[1]) / 4
if proj_crs.is_geographic:
tile_size = tile_size * 111111
zoomlevel = int(np.log2(c * abs(np.cos(lat)) / tile_size))
# sensible range is 9 (large metropolitan area) - 16 (street)
zoomlevel = min(16, max(9, zoomlevel))
# short names for cartopy image tiles
bmap = {"sat": "QuadtreeTiles", "osm": "OSM"}.get(bmap, bmap)
bmap_kwargs = {}
if bmap in list(ctx.providers.flatten()):
bmap_kwargs = dict(zoom=zoomlevel, **bmap_kwargs)
ctx.add_basemap(ax, crs=crs, source=bmap, **bmap_kwargs)
elif hasattr(cimgt, bmap):
bmap_img = getattr(cimgt, bmap)(**bmap_kwargs)
ax.add_image(bmap_img, zoomlevel)
else:
err = f"Unknown background map: {bmap}"
raise ValueError(err)
# Add a region plot in case there is one
if region is not None:
region_gdf = gpd.read_file(region)
region_gdf = region_gdf.to_crs(proj_crs)
region_gdf.plot(
ax=ax,
color="grey",
edgecolor="black",
linewidth=2,
alpha=0.3,
label="Region",
)
cmap = plt.get_cmap(cmap)
norm = mcolors.BoundaryNorm(bins, cmap.N)
sm = plt.cm.ScalarMappable(cmap=cmap, norm=norm)
sm.set_array([])
# Bin the 'difference' values and plot
scores_gdf["binned"] = pd.cut(scores_gdf["difference"], bins=bins, labels=False)
if len(scores_gdf[scores_gdf["binned"].isna() == True]) > 0:
logger.warning(
"There are flood mark difference (observed - modeled) values that fall outside "
"the determined bin range and thus were excluded from the plot. "
"Consider adjusting the plotting settings.",
stacklevel=2,
)
for bin_val in range(len(bins) - 1):
subset = scores_gdf[scores_gdf["binned"] == bin_val]
# Skip plotting if the subset is empty
if subset.empty:
continue
color = cmap(norm(bins[bin_val]))
subset.plot(ax=ax, color=color, linewidth=1, edgecolor="black", markersize=100)
# Add the color bar for the bins
cbar = plt.colorbar(sm, ax=ax, shrink=0.75, orientation="vertical")
cbar.set_label("Difference (Observed - Modeled) [m]")
textstr = "\n".join(
(
f"Min Abs. Difference: {scores_gdf['abs_difference'].min():.2f} m",
f"Max Abs. Difference: {scores_gdf['abs_difference'].max():.2f} m",
f"RMSE: {rmse:.2f} m",
f"R$^2$: {r2:.2f}",
)
)
# matplotlib.patch.Patch properties
props = dict(boxstyle="round", facecolor="white", alpha=0.8)
ax.text(
0.02,
0.98,
textstr,
transform=ax.transAxes,
fontsize=12,
verticalalignment="top",
bbox=props,
)
ax.xaxis.set_visible(True)
ax.yaxis.set_visible(True)
ax.set_ylabel(ylab)
ax.set_xlabel(xlab)
fig.savefig(path, dpi=150, bbox_inches="tight")