from datetime import datetime
from typing import Any
import xarray as xr
from fews_openapi_py_client import AuthenticatedClient, Client
from fews_py_wrapper._api import (
Locations,
Parameters,
Taskruns,
TimeSeries,
WhatIfScenarios,
Workflows,
)
from fews_py_wrapper.models import PiLocationsResponse, PiParametersResponse
from fews_py_wrapper.utils import (
convert_timeseries_response_to_xarray,
)
__all__ = ["FewsWebServiceClient"]
[docs]
class FewsWebServiceClient:
"""Client for interacting with FEWS web services."""
client: Client | AuthenticatedClient
def __init__(
self,
base_url: str,
authenticate: bool = False,
token: str | None = None,
verify_ssl: bool = True,
) -> None:
self.base_url = base_url
if authenticate:
if not token:
raise ValueError("Token must be provided for authentication.")
self.authenticate(token, verify_ssl)
else:
self.client = Client(base_url=base_url, verify_ssl=verify_ssl)
[docs]
def authenticate(self, token: str, verify_ssl: bool) -> None:
"""Authenticate with the FEWS web services."""
self.client = AuthenticatedClient(
base_url=self.base_url, token=token, verify_ssl=verify_ssl
)
[docs]
def get_locations(self) -> PiLocationsResponse:
"""Get locations from the FEWS web services as a typed PI model.
Returns:
A validated PI locations response containing location identifiers,
coordinates, names, and optional relations or attributes.
Example:
::
client = FewsWebServiceClient(
base_url="https://example.com/FewsWebServices/rest"
)
locations = client.get_locations()
first_location = locations.locations[0]
print(first_location.location_id)
print(first_location.lat, first_location.lon)
"""
content = Locations().execute(client=self.client, document_format="PI_JSON")
return PiLocationsResponse.model_validate(content)
[docs]
def get_parameters(self) -> PiParametersResponse:
"""Get parameters from the FEWS web services as a typed PI model.
Returns:
A validated PI parameters response containing parameter metadata such
as parameter IDs, units, parameter type, and optional attributes.
Example:
::
client = FewsWebServiceClient(
base_url="https://example.com/FewsWebServices/rest"
)
parameters = client.get_parameters()
first_parameter = parameters.parameters[0]
print(first_parameter.id)
print(first_parameter.unit)
"""
content = Parameters().execute(client=self.client, document_format="PI_JSON")
return PiParametersResponse.model_validate(content)
[docs]
def get_timeseries(
self,
*,
location_ids: list[str] | None = None,
parameter_ids: list[str] | None = None,
start_time: datetime | None = None,
end_time: datetime | None = None,
to_xarray: bool = False,
document_format: str | None = "PI_JSON",
**kwargs: Any,
) -> xr.Dataset | dict[str, Any]:
"""Get time series data from the FEWS web services.
Args:
location_ids: One or more FEWS location identifiers.
parameter_ids: One or more FEWS parameter identifiers.
start_time: Inclusive start timestamp. Must be timezone-aware.
end_time: Inclusive end timestamp. Must be timezone-aware.
to_xarray: If ``True``, convert the PI JSON response into an
``xarray.Dataset``.
document_format: FEWS response format. ``PI_JSON`` is currently the
supported option.
**kwargs: Additional endpoint arguments accepted by the underlying
FEWS time series endpoint.
Returns:
Either the raw PI JSON response as a dictionary, or an
``xarray.Dataset`` when ``to_xarray=True``.
Example:
Request raw PI JSON for a single parameter and location.
::
from datetime import datetime, timezone
client = FewsWebServiceClient(
base_url="https://example.com/FewsWebServices/rest"
)
response = client.get_timeseries(
location_ids=["Amanzimtoti_River_level"],
parameter_ids=["H.obs"],
start_time=datetime(2025, 3, 14, 10, 0, tzinfo=timezone.utc),
end_time=datetime(2025, 3, 15, 0, 0, tzinfo=timezone.utc),
)
print(response["timeSeries"][0]["header"]["parameterId"])
Request the same data and convert it to ``xarray`` for analysis.
::
dataset = client.get_timeseries(
location_ids=["Amanzimtoti_River_level"],
parameter_ids=["H.obs"],
start_time=datetime(2025, 3, 14, 10, 0, tzinfo=timezone.utc),
end_time=datetime(2025, 3, 15, 0, 0, tzinfo=timezone.utc),
to_xarray=True,
)
print(dataset)
"""
# Collect only non-None keyword arguments
non_none_kwargs = self._collect_non_none_kwargs(
local_kwargs=locals().copy(), pop_kwargs=["to_xarray"]
)
content = TimeSeries().execute(client=self.client, **non_none_kwargs)
if to_xarray:
return convert_timeseries_response_to_xarray(content)
return content
[docs]
def get_taskruns(
self, workflow_id: str, task_ids: list[str] | str | None = None
) -> dict[str, Any]:
"""Get the status of a task run in the FEWS web services."""
if isinstance(task_ids, str):
task_ids = [task_ids]
# Collect only non-None keyword arguments
non_none_kwargs = self._collect_non_none_kwargs(local_kwargs=locals().copy())
return Taskruns().execute(
client=self.client, document_format="PI_JSON", **non_none_kwargs
)
[docs]
def execute_workflow(self, *args: Any, **kwargs: Any) -> None:
"""Execute a workflow in the FEWS web services."""
pass
[docs]
def execute_whatif_scenario(
self,
what_if_template_id: str | None = None,
single_run_what_if: str | None = None,
name: str | None = None,
document_format: str | None = None,
document_version: str | None = None,
) -> dict[str, Any]:
"""Execute a what-if scenario in the FEWS web services."""
return WhatIfScenarios().execute(
client=self.client,
what_if_template_id=what_if_template_id,
single_run_what_if=single_run_what_if,
name=name,
document_format=document_format,
document_version=document_version,
)
[docs]
def get_workflows(self) -> dict[str, Any]:
return Workflows().execute(client=self.client, document_format="PI_JSON")
[docs]
def endpoint_arguments(self, endpoint: str) -> list[str]:
"""Get the arguments for a specific FEWS web service endpoint.
Args:
endpoint: The name of the endpoint, options: "timeseries", "taskruns",
"whatif_scenarios", "workflows".
Returns:
The argument names for the specified endpoint.
"""
if endpoint == "timeseries":
return TimeSeries().input_args()
elif endpoint == "taskruns":
return Taskruns().input_args()
elif endpoint == "whatif_scenarios":
return WhatIfScenarios().input_args()
elif endpoint == "workflows":
return Workflows().input_args()
else:
raise ValueError(f"Unknown endpoint: {endpoint}")
def _collect_non_none_kwargs(
self, local_kwargs: dict[str, Any], pop_kwargs: list[str] | None = None
) -> dict[str, Any]:
"""Collect only non-None keyword arguments."""
local_kwargs.pop("self", None)
for key in pop_kwargs or []:
local_kwargs.pop(key, None)
if "kwargs" in local_kwargs:
extra_kwargs = local_kwargs.pop("kwargs")
if isinstance(extra_kwargs, dict):
local_kwargs.update(extra_kwargs)
return {k: v for k, v in local_kwargs.items() if v is not None}