"""Defines the Event class which is a breakpoint between workflows."""
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Literal, Optional
import geopandas as gpd
import pandas as pd
import yaml
from pydantic import (
BaseModel,
ConfigDict,
FilePath,
SerializerFunctionWrapHandler,
model_serializer,
model_validator,
)
from typing_extensions import TypedDict
from hydroflows.utils.path_utils import abs_to_rel_path, rel_to_abs_path
__all__ = ["EventSet", "Event", "Forcing"]
SERIALIZATION_KWARGS = {"mode": "json", "round_trip": True, "exclude_none": True}
class Forcing(BaseModel):
    """A forcing for the event."""

    model_config = ConfigDict(extra="forbid")

    type: Literal["water_level", "discharge", "rainfall"]
    """The type of the forcing."""

    path: FilePath  # file must exist
    """The path to the forcing data."""

    tstart: Optional[datetime] = None
    """The start date of the forcing data"""

    tstop: Optional[datetime] = None
    """The end date of the forcing data"""

    scale_mult: Optional[float] = None
    """A multiplicative scale factor for the forcing."""

    scale_add: Optional[float] = None
    """An additive scale factor for the forcing."""

    locs_path: Optional[FilePath] = None  # file must exist
    """The path to the locations file for the forcing data."""

    locs_id_col: Optional[str] = None
    """The column in the locations file with the location ID."""

    # Underscore-prefixed attributes are pydantic private attributes and are
    # excluded from validation and serialization.
    _data_df: Optional[pd.DataFrame] = None
    """The forcing data. This is excluded from serialization."""

    _locs_gdf: Optional[gpd.GeoDataFrame] = None
    """Optional field with geolocation of data. This field is excluded from serialization."""

    _root: Optional[Path] = None
    """The root directory for the forcing and location data."""

    @model_validator(mode="before")
    @classmethod
    def _set_abs_paths(cls, data: Dict) -> Dict:
        """Set the paths to relative to root if not absolute."""
        if isinstance(data, dict) and "_root" in data:
            root = Path(data.pop("_root"))
            data = rel_to_abs_path(data, root, ["path", "locs_path"])
        return data

    @model_serializer(mode="wrap", when_used="json")
    def _set_rel_paths(self, nxt: SerializerFunctionWrapHandler):
        """Serialize paths as relative to root."""
        data = nxt(self)
        if self._root:
            data = abs_to_rel_path(
                data, Path(self._root), keys=["path", "locs_path"], serialize=True
            )
        return data

    def read_data(self) -> Any:
        """Read the forcing data and, if configured, the locations data.

        Raises
        ------
        NotImplementedError
            If the forcing file type is not supported (currently only CSV).
        IOError
            If the locations file does not exist.
        """
        # read forcing data
        if self.path.suffix == ".csv":
            self._read_csv()
        else:
            # placeholder for other file types
            raise NotImplementedError(f"File type {self.path.suffix} not supported.")
        # read locations
        if self.locs_path is None:
            return
        if not self.locs_path.exists():
            raise IOError(f"Locations file {self.locs_path} does not exist.")
        # should be readable by geopandas
        self._read_locs_geopandas()

    def _read_locs_geopandas(self) -> None:
        """Read the locations file using geopandas."""
        gdf = gpd.read_file(self.locs_path)
        if self.locs_id_col is not None:
            gdf = gdf.set_index(self.locs_id_col)
        self._locs_gdf = gdf

    def _read_csv(self) -> None:
        """Read the CSV file into ``_data_df``.

        The file must have a datetime index. The data is sorted, optionally
        scaled (``scale_mult`` / ``scale_add``) and clipped to
        [``tstart``, ``tstop``]; unset bounds default to the data range.
        """
        # read csv; check for datetime index
        df: pd.DataFrame = pd.read_csv(self.path, index_col=0, parse_dates=True)
        if df.index.dtype != "datetime64[ns]":
            raise ValueError(f"Index of {self.path} is not datetime.")
        df = df.sort_index()  # make sure it is sorted
        # apply scale factors
        if self.scale_mult is not None:
            df = df * self.scale_mult
        if self.scale_add is not None:
            df = df + self.scale_add
        # default the time range to the full data range
        if self.tstart is None:
            self.tstart = df.index[0]
        if self.tstop is None:
            self.tstop = df.index[-1]
        # BUGFIX: always clip the data to [tstart, tstop]. Previously the
        # slice was applied only when the window did NOT overlap the data
        # (inverted condition), so an overlapping window was never applied.
        # A non-overlapping window still yields an empty frame, as before.
        df = df.loc[self.tstart : self.tstop]
        # set data
        self._data_df = df

    @property
    def data(self) -> pd.DataFrame:
        """Return the forcing data, reading it lazily on first access."""
        if self._data_df is None:
            self.read_data()
        return self._data_df

    @property
    def locs(self) -> Optional[gpd.GeoDataFrame]:
        """Return the locations data, reading it lazily on first access."""
        if self._locs_gdf is None and self.locs_path is not None:
            self.read_data()
        return self._locs_gdf
class Event(BaseModel):
    """A model event.

    Examples
    --------
    The event can be created as follows::

        event = Event(
            name="event",
            forcings=[{"type": "rainfall", "path": "path/to/data.csv"}],
            return_period=2,
        )
    """

    name: str
    """The name of the event."""

    root: Optional[Path] = None
    """The root directory for the event forcing."""

    forcings: List[Forcing]
    """The list of forcings for the event. Each forcing is a dictionary with
    the structure as defined in :py:class:`Forcing`."""

    return_period: Optional[float] = None
    """The return period of the event [years]."""

    tstart: Optional[datetime] = None
    """The start date of the event."""

    tstop: Optional[datetime] = None
    """The end date of the event."""

    @model_validator(mode="before")
    @classmethod
    def _forward_root(cls, data: Dict) -> Dict:
        """Forward root to forcings.

        BUGFIX: only forward a non-None root, and only to forcings given as
        dicts. Previously ``root=None`` was forwarded, making
        ``Forcing._set_abs_paths`` call ``Path(None)`` (TypeError), and a
        forcing passed as a ``Forcing`` instance crashed on item assignment.
        """
        if isinstance(data, dict) and data.get("root") is not None:
            for forcing in data.get("forcings", []):
                if isinstance(forcing, dict):
                    forcing["_root"] = data["root"]
        return data

    def to_dict(self, root: Optional[Path] = None, **kwargs) -> dict:
        """Return the Event as a dictionary.

        Parameters
        ----------
        root : Path, optional
            Root directory; forcing paths are serialized relative to it.
            Defaults to ``self.root``.
        **kwargs
            Extra arguments for ``model_dump``, merged over
            ``SERIALIZATION_KWARGS``.
        """
        # set forcings root so their paths serialize relative to it
        root = root or self.root
        if root is not None:
            for forcing in self.forcings:
                forcing._root = Path(root)
        try:
            # serialize
            kwargs = {**SERIALIZATION_KWARGS, **kwargs}
            data = self.model_dump(**kwargs)
        finally:
            # always reset forcings root, also when serialization raises
            for forcing in self.forcings:
                forcing._root = None
        return data

    def to_yaml(self, path: Path) -> None:
        """Write the Event to a YAML file."""
        path = Path(path)
        root = self.root
        # check if all forcing.path relative to path.parent, if so use path.parent as root
        if root is None and all(
            forcing.path.is_relative_to(path.parent) for forcing in self.forcings
        ):
            root = path.parent
        # serialize
        yaml_dict = self.to_dict(root=root)
        # remove root if it is the same as path.parent
        if "root" in yaml_dict and Path(yaml_dict["root"]) == path.parent:
            yaml_dict.pop("root")
        # write to file
        with open(path, "w") as file:
            yaml.safe_dump(yaml_dict, file, sort_keys=False)

    @classmethod
    def from_yaml(cls, path: Path) -> "Event":
        """Create an Event from a YAML file."""
        with open(path, "r") as file:
            yml_dict = yaml.safe_load(file)
        # default root to the directory containing the YAML file
        if "root" not in yml_dict:
            yml_dict["root"] = Path(path).parent
        return cls(**yml_dict)

    def set_time_range_from_forcings(self) -> None:
        """Set the event time range from the forcings' time ranges."""
        for forcing in self.forcings:
            # skip forcings without a complete time range
            if forcing.tstart is None or forcing.tstop is None:
                continue
            if self.tstart is None or self.tstop is None:
                self.tstart = forcing.tstart
                self.tstop = forcing.tstop
            else:
                # expand the event window to cover all forcings
                self.tstart = min(self.tstart, forcing.tstart)
                self.tstop = max(self.tstop, forcing.tstop)

    def read_forcing_data(self) -> None:
        """Read all forcings and derive the event time range if unset."""
        for forcing in self.forcings:
            # accessing .data triggers a lazy read; retry explicitly if empty
            if forcing.data is None:
                forcing.read_data()
        if self.tstart is None or self.tstop is None:
            self.set_time_range_from_forcings()
EventDict = TypedDict("EventDict", {"name": str, "path": FilePath})
class EventSet(BaseModel):
    """A dictionary of events, referring to event file names.

    Examples
    --------
    The event set can be created from a YAML file as follows::

        EventSet.from_yaml("path/to/eventset.yaml")

    The event set can be created from a dictionary as follows::

        EventSet(
            events=[
                {
                    "name": "event1",
                    "path": "path/to/event1.yml"
                }
            ],
        )
    """

    root: Optional[Path] = None
    """The root directory for the event files."""

    events: List[EventDict]
    """The list of events. Each event is a dictionary with an event name and reference to an event file. """

    @model_validator(mode="before")
    @classmethod
    def _set_abs_paths(cls, data: Dict) -> Dict:
        """Set the paths to relative to root if not absolute.

        BUGFIX: only resolve against a non-None root; previously an explicit
        ``root=None`` triggered ``Path(None)`` (TypeError).
        """
        if isinstance(data, dict) and data.get("root") is not None:
            root = Path(data["root"])
            data["events"] = [
                rel_to_abs_path(event, root, ["path"])
                for event in data.get("events", [])
            ]
        return data

    @model_serializer(mode="wrap", when_used="json")
    def _set_rel_paths(self, nxt: SerializerFunctionWrapHandler):
        """Serialize paths as relative to root."""
        data = nxt(self)
        if self.root:
            data["events"] = [
                abs_to_rel_path(event, self.root, keys=["path"], serialize=True)
                for event in data["events"]
            ]
            data["root"] = self.root.as_posix()
        return data

    @classmethod
    def from_yaml(cls, path: Path) -> "EventSet":
        """Create an EventSet from a YAML file."""
        with open(path, "r") as file:
            yaml_dict = yaml.safe_load(file)
        # default root to the directory containing the YAML file
        if "root" not in yaml_dict:
            yaml_dict["root"] = Path(path).parent
        return cls(**yaml_dict)

    def to_dict(self, root: Optional[Path] = None, **kwargs) -> dict:
        """Return the EventSet as a dictionary.

        Parameters
        ----------
        root : Path, optional
            Root directory; event paths are serialized relative to it.
        **kwargs
            Extra arguments for ``model_dump``, merged over
            ``SERIALIZATION_KWARGS``.
        """
        # temporarily swap the root used by the serializer
        old_root = self.root
        if root:
            self.root = Path(root)
        try:
            kwargs = {**SERIALIZATION_KWARGS, **kwargs}
            data = self.model_dump(**kwargs)
        finally:
            # always restore the original root, also when serialization raises
            self.root = old_root
        return data

    def to_yaml(self, path: Path) -> None:
        """Write the EventSet to a YAML file."""
        # BUGFIX: accept str paths (consistent with Event.to_yaml); previously
        # a str argument crashed on `path.parent` below.
        path = Path(path)
        root = self.root
        # check if all events relative to path.parent, if so reset root
        if all(event["path"].is_relative_to(path.parent) for event in self.events):
            root = path.parent
        # serialize
        yaml_dict = self.to_dict(root=root)
        # remove root if it is the same as path.parent
        if "root" in yaml_dict and Path(yaml_dict["root"]) == path.parent:
            yaml_dict.pop("root")
        # write to file
        with open(path, "w") as file:
            yaml.safe_dump(yaml_dict, file, sort_keys=False)

    def get_event(self, name: str, raise_error=False) -> Optional[Event]:
        """Get an event by name.

        Parameters
        ----------
        name : str
            The name of the event.
        raise_error : bool, optional
            Raise an error if the event is not found, by default False
            and returns None.
        """
        for event in self.events:
            if event["name"] == name:
                return Event.from_yaml(path=event["path"])
        if raise_error:
            raise ValueError(f"Event {name} not found.")
        return None

    def add_event(self, name: str, path: Path) -> None:
        """Add an event.

        Parameters
        ----------
        name : str
            name of the event
        path : Path
            Path to yaml file with event description.
            See :class:`Event` for the structure of the data in this path.
        """
        # BUGFIX: cast to Path so the is_relative_to check in to_yaml does not
        # crash when a str path is appended (self.events bypasses validation).
        self.events.append({"name": name, "path": Path(path)})