import inspect
from datetime import datetime
from typing import Any
import xarray as xr
from fews_openapi_py_client import AuthenticatedClient, Client
from fews_py_wrapper._api import (
Filters,
Locations,
Parameters,
PostRunTask,
PostTimeSeries,
PostWhatIfScenarios,
Taskruns,
Taskrunstatus,
TimeSeries,
WhatIfScenarios,
WhatIfTemplates,
Workflows,
)
from fews_py_wrapper.models import (
PiFilter,
PiFiltersResponse,
PiLocation,
PiLocationsResponse,
PiParameter,
PiParametersResponse,
PiTaskRun,
PiTaskRunsResponse,
PiTaskRunStatusResponse,
PiWhatIfScenarioDescriptor,
PiWhatIfScenariosResponse,
PiWhatIfTemplate,
PiWhatIfTemplatesResponse,
PiWorkflow,
PiWorkflowsResponse,
)
from fews_py_wrapper.utils import convert_netcdf_zip_response_to_xarray
# Names exported by ``from <module> import *``.
__all__ = ["FewsWebServiceClient"]
# Document formats accepted by ``FewsWebServiceClient.get_timeseries``; any
# other value is rejected there with a ``ValueError``.
PI_TIMESERIES_DOCUMENT_FORMATS = frozenset({"PI_JSON", "PI_XML", "PI_CSV", "PI_NETCDF"})
[docs]
class FewsWebServiceClient:
    """High-level client for the FEWS web services.

    The underlying OpenAPI client is stored on ``client``. It is an
    ``AuthenticatedClient`` when token authentication was requested and a
    plain ``Client`` otherwise.
    """

    client: Client | AuthenticatedClient

    def __init__(
        self,
        base_url: str,
        authenticate: bool = False,
        token: str | None = None,
        verify_ssl: bool = True,
    ) -> None:
        """Create a client bound to ``base_url``.

        Args:
            base_url: Base URL of the FEWS web services.
            authenticate: When true, build an authenticated client from
                ``token``.
            token: Token used for authentication. Must be non-empty when
                ``authenticate`` is true; it is not used otherwise.
            verify_ssl: Forwarded to the underlying client as ``verify_ssl``.

        Raises:
            ValueError: If ``authenticate`` is true and ``token`` is missing
                or empty.
        """
        self.base_url = base_url
        if not authenticate:
            self.client = Client(base_url=base_url, verify_ssl=verify_ssl)
            return
        if not token:
            raise ValueError("Token must be provided for authentication.")
        self.authenticate(token, verify_ssl)
[docs]
def authenticate(self, token: str, verify_ssl: bool) -> None:
    """Switch this instance to an authenticated FEWS client.

    Builds an ``AuthenticatedClient`` for ``self.base_url`` and stores it on
    ``self.client``, replacing whatever client was set before.

    Args:
        token: Token handed to the ``AuthenticatedClient``.
        verify_ssl: Forwarded to the client as ``verify_ssl``.
    """
    authenticated_client = AuthenticatedClient(
        base_url=self.base_url,
        token=token,
        verify_ssl=verify_ssl,
    )
    self.client = authenticated_client
[docs]
def get_locations(self) -> list[PiLocation]:
    """Return the FEWS locations as typed PI models.

    The locations endpoint is queried as ``PI_JSON`` and the payload is
    validated with ``PiLocationsResponse``. Only the ``locations`` list is
    handed back; the remaining response envelope (such as the PI document
    version) takes part in validation but is not returned.

    Returns:
        The validated list of typed PI locations.

    Example:
        ::

            client = FewsWebServiceClient(
                base_url="https://example.com/FewsWebServices/rest"
            )
            for location in client.get_locations():
                print(location.location_id, location.lat, location.lon)
    """
    raw_response = Locations().execute(
        client=self.client,
        document_format="PI_JSON",
    )
    validated = PiLocationsResponse.model_validate(raw_response)
    return validated.locations
[docs]
def get_parameters(self) -> list[PiParameter]:
    """Return the FEWS parameters as typed PI models.

    The parameters endpoint is queried as ``PI_JSON`` and the payload is
    validated with ``PiParametersResponse``. Only the ``parameters`` list is
    handed back; the remaining response envelope (such as the PI document
    version) takes part in validation but is not returned.

    Returns:
        The validated list of typed PI parameters.

    Example:
        ::

            client = FewsWebServiceClient(
                base_url="https://example.com/FewsWebServices/rest"
            )
            for parameter in client.get_parameters():
                print(parameter.id, parameter.unit)
    """
    raw_response = Parameters().execute(
        client=self.client,
        document_format="PI_JSON",
    )
    validated = PiParametersResponse.model_validate(raw_response)
    return validated.parameters
[docs]
def get_timeseries(
    self,
    *,
    location_ids: list[str] | None = None,
    parameter_ids: list[str] | None = None,
    start_time: datetime | None = None,
    end_time: datetime | None = None,
    document_format: str = "PI_NETCDF",
    **kwargs: Any,
) -> list[xr.Dataset] | dict[str, Any] | str:
    """Fetch time series from the FEWS web services.

    Args:
        location_ids: One or more FEWS location identifiers.
        parameter_ids: One or more FEWS parameter identifiers.
        start_time: Inclusive start timestamp. Must be timezone-aware.
        end_time: Inclusive end timestamp. Must be timezone-aware.
        document_format: FEWS PI response format: ``PI_JSON``, ``PI_XML``,
            ``PI_CSV`` or ``PI_NETCDF`` (the default). An object exposing
            the format name on ``.value`` is accepted as well.
        **kwargs: Additional arguments forwarded to the underlying FEWS
            time series endpoint. Entries whose value is ``None`` are
            dropped.

    Returns:
        For ``PI_NETCDF``, a list of ``xarray.Dataset`` objects that keeps
        the original NetCDF member layout and ZIP member order. For
        ``PI_JSON``, the raw response dictionary. For ``PI_XML`` and
        ``PI_CSV``, the response text.

    Raises:
        ValueError: If ``document_format`` is not supported, or if the
            response content does not have the type expected for the
            requested format.

    Example:
        Request NetCDF (the default). The ZIP payload returned by FEWS is
        unpacked and each member becomes one dataset, so the merge
        strategy is up to the caller.

        ::

            from datetime import datetime, timezone

            client = FewsWebServiceClient(
                base_url="https://example.com/FewsWebServices/rest"
            )
            datasets = client.get_timeseries(
                location_ids=["Amanzimtoti_River_level"],
                parameter_ids=["H.obs"],
                start_time=datetime(2025, 3, 14, 10, 0, tzinfo=timezone.utc),
                end_time=datetime(2025, 3, 15, 0, 0, tzinfo=timezone.utc),
            )
            merged = xr.merge(datasets, combine_attrs="override")
            print(merged)

        Request raw PI JSON explicitly.

        ::

            response = client.get_timeseries(
                location_ids=["Amanzimtoti_River_level"],
                parameter_ids=["H.obs"],
                start_time=datetime(2025, 3, 14, 10, 0, tzinfo=timezone.utc),
                end_time=datetime(2025, 3, 15, 0, 0, tzinfo=timezone.utc),
                document_format="PI_JSON",
            )
            print(response["timeSeries"][0]["header"]["parameterId"])
    """
    # Validation works on the plain format name; the endpoint still receives
    # ``document_format`` exactly as the caller passed it.
    format_name = getattr(document_format, "value", document_format)
    if format_name not in PI_TIMESERIES_DOCUMENT_FORMATS:
        allowed = ", ".join(sorted(PI_TIMESERIES_DOCUMENT_FORMATS))
        raise ValueError(
            "Unsupported timeseries document_format for this PI-focused wrapper: "
            f"{format_name}. Supported formats are: {allowed}."
        )

    # The helper flattens the "kwargs" entry into the result and drops every
    # entry whose value is None.
    request_kwargs = self._collect_non_none_kwargs(
        {
            "location_ids": location_ids,
            "parameter_ids": parameter_ids,
            "start_time": start_time,
            "end_time": end_time,
            "document_format": document_format,
            "kwargs": kwargs,
        }
    )
    payload = TimeSeries().execute(client=self.client, **request_kwargs)

    if format_name == "PI_NETCDF":
        if isinstance(payload, bytes):
            return convert_netcdf_zip_response_to_xarray(payload)
        raise ValueError("Expected PI_NETCDF response content as bytes.")
    if format_name == "PI_JSON":
        if isinstance(payload, dict):
            return payload
        raise ValueError("Expected PI_JSON response content as a dictionary.")
    # Remaining formats (PI_XML, PI_CSV) are text based.
    if isinstance(payload, str):
        return payload
    raise ValueError(f"Expected {format_name} response content as a string.")
[docs]
def post_timeseries(
self,
*,
pi_time_series_xml_content: str | None = None,
pi_time_series_json_content: str | None = None,
filter_id: str | None = None,
convert_datum: bool | None = None,
) -> str:
"""Write PI time series data to the FEWS web services using POST.
The FEWS ``POST /timeseries`` endpoint writes PI time series that belong
to time series sets configured in the default filter or in the filter
identified by ``filter_id``. Provide the PI XML or PI JSON content to be
written through the dedicated content arguments.
Args:
pi_time_series_xml_content: Optional PI XML payload to write.
pi_time_series_json_content: Optional PI JSON payload to write.
filter_id: Optional FEWS filter identifier restricting which time
series sets may be written.
convert_datum: Optional FEWS convert-datum flag.
Returns:
A PI diagnostic XML string describing the import result.
Example:
Post PI XML content.
::
from pathlib import Path
client = FewsWebServiceClient(
base_url="https://example.com/FewsWebServices/rest"
)
xml_payload = Path("tests/test_data/post_timeseries.xml").read_text(
encoding="utf-8"
)
diag_xml = client.post_timeseries(
pi_time_series_xml_content=xml_payload,
filter_id="MEAS",
)
print(diag_xml)
Post PI JSON content.
::
from pathlib import Path
json_payload = Path("tests/test_data/post_timeseries.json").read_text(
encoding="utf-8"
)
diag_xml = client.post_timeseries(
pi_time_series_json_content=json_payload,
)
print(diag_xml)
"""
if pi_time_series_xml_content is None and pi_time_series_json_content is None:
raise ValueError(
"One of pi_time_series_xml_content or "
"pi_time_series_json_content must be provided."
)
request_body = self._collect_non_none_kwargs(
{
"piTimeSeriesXmlContent": pi_time_series_xml_content,
"piTimeSeriesJsonContent": pi_time_series_json_content,
}
)
endpoint_kwargs = self._collect_non_none_kwargs(
{
"body": request_body,
"filter_id": filter_id,
"convert_datum": convert_datum,
}
)
content = PostTimeSeries().execute(client=self.client, **endpoint_kwargs)
if not isinstance(content, str):
raise ValueError("Expected POST timeseries response content as a string.")
return content
[docs]
def get_filters(
    self,
    filter_id: str | None = None,
    *,
    document_format: str | None = "PI_JSON",
    document_version: str | None = None,
) -> list[PiFilter] | str:
    """Fetch filters from the FEWS web services.

    Returns the filters that are subfilters of the default filter. Passing
    an existing subfilter ID narrows the result to that filter's subfilters.

    Args:
        filter_id: Optional FEWS filter identifier. When provided, only
            subfilters of this filter are returned.
        document_format: Response format supported by the FEWS filters
            endpoint. Defaults to ``PI_JSON``.
        document_version: Optional PI document version.

    Returns:
        A list of typed PI filters when the endpoint yields a dictionary
        (the ``PI_JSON`` default), or the raw text when a text-based format
        such as ``PI_XML`` is requested. Response envelope metadata is used
        for validation but not returned.

    Raises:
        ValueError: If the response content is neither a dictionary nor a
            string.

    Example:
        Retrieve all available filters.

        ::

            client = FewsWebServiceClient(
                base_url="https://example.com/FewsWebServices/rest"
            )
            filters = client.get_filters()
            print(filters[0].id)

        Retrieve subfilters of a specific filter.

        ::

            filters = client.get_filters(filter_id="MEAS")
            print(list(filters))
    """
    query = {
        "filter_id": filter_id,
        "document_format": document_format,
        "document_version": document_version,
    }
    call_kwargs = self._collect_non_none_kwargs(query)
    response = Filters().execute(client=self.client, **call_kwargs)
    if isinstance(response, dict):
        return PiFiltersResponse.model_validate(response).filters
    if isinstance(response, str):
        return response
    raise ValueError("Expected filters response content as a string.")
[docs]
def post_runtask(
    self,
    *,
    workflow_id: str,
    start_time: datetime | None = None,
    end_time: datetime | None = None,
    time_zero: datetime | None = None,
    cold_state_id: str | None = None,
    scenario_id: str | None = None,
    user_id: str | None = None,
    description: str | None = None,
    run_option: str | None = None,
    run_locally_and_promote_to_server: bool | None = None,
    pi_parameters_xml_content: str | None = None,
) -> str:
    """Start a one-off FEWS task for a workflow and return its task ID.

    Wraps FEWS ``POST /runtask``, which uses
    ``application/x-www-form-urlencoded`` request encoding and answers with
    a plain-text ``taskId`` that can be used to track the task status.

    Args:
        workflow_id: Required FEWS workflow identifier.
        start_time: Optional workflow start time. Must be timezone-aware.
        end_time: Optional workflow end time. Must be timezone-aware.
        time_zero: Optional forecast time zero. Must be timezone-aware.
        cold_state_id: Optional FEWS cold-state identifier.
        scenario_id: Optional FEWS scenario identifier.
        user_id: Optional FEWS user identifier.
        description: Optional task description stored by FEWS.
        run_option: Optional FEWS run option. Supported values are ``all``,
            ``allmostrecentonly``, and ``alloneatatime``.
        run_locally_and_promote_to_server: Optional FEWS execution flag.
        pi_parameters_xml_content: Optional PI model parameters XML content.

    Returns:
        The FEWS task identifier returned by ``POST /runtask``.

    Raises:
        ValueError: If the response content is not a string.

    Example:
        ::

            from datetime import datetime, timezone

            client = FewsWebServiceClient(
                base_url="https://example.com/FewsWebServices/rest"
            )
            task_id = client.post_runtask(
                workflow_id="ImportObscape",
                start_time=datetime(2025, 3, 18, 15, 0, tzinfo=timezone.utc),
                end_time=datetime(2025, 3, 18, 16, 0, tzinfo=timezone.utc),
                description="Run ImportObscape once from the wrapper",
                run_option="all",
            )
            print(task_id)
    """
    # A request body is only sent when PI parameters were supplied; without
    # them the "body" argument is left out of the call entirely.
    if pi_parameters_xml_content is None:
        form_body = None
    else:
        form_body = {"piParametersXmlContent": pi_parameters_xml_content}

    call_kwargs = self._collect_non_none_kwargs(
        {
            "workflow_id": workflow_id,
            "start_time": start_time,
            "end_time": end_time,
            "time_zero": time_zero,
            "cold_state_id": cold_state_id,
            "scenario_id": scenario_id,
            "user_id": user_id,
            "description": description,
            "run_option": run_option,
            "run_locally_and_promote_to_server": run_locally_and_promote_to_server,
            "body": form_body,
        }
    )
    task_id = PostRunTask().execute(client=self.client, **call_kwargs)
    if isinstance(task_id, str):
        return task_id
    raise ValueError("Expected POST runtask response content as a string.")
[docs]
def get_taskruns(
    self,
    *,
    workflow_id: str,
    topology_node_id: str | None = None,
    forecast_count: int | str | None = None,
    task_run_ids: list[str] | None = None,
    scenario_id: str | None = None,
    mc_id: str | None = None,
    start_forecast_time: datetime | None = None,
    end_forecast_time: datetime | None = None,
    start_dispatch_time: datetime | None = None,
    end_dispatch_time: datetime | None = None,
    task_run_status_ids: list[str] | None = None,
    only_forecasts: bool | None = None,
    task_run_count: int | str | None = None,
    only_current: bool | None = None,
    document_format: str | None = "PI_JSON",
    document_version: str | None = None,
) -> list[PiTaskRun] | str:
    """Fetch the task runs of a FEWS workflow.

    Queries FEWS ``GET /taskruns`` for ``workflow_id``, optionally filtered
    by identifiers, status, forecast time, or dispatch time.

    FEWS returns only forecast task runs by default, so a non-forecast
    workflow can legitimately yield an empty result unless
    ``only_forecasts=False`` is passed.

    Args:
        workflow_id: Required FEWS workflow identifier.
        topology_node_id: Optional FEWS topology-node identifier.
        forecast_count: Optional forecast-count filter accepted by FEWS.
        task_run_ids: Optional FEWS task-run IDs to filter by.
        scenario_id: Optional FEWS scenario identifier.
        mc_id: Optional FEWS MC identifier.
        start_forecast_time: Optional inclusive forecast-time lower bound.
            Must be timezone-aware.
        end_forecast_time: Optional inclusive forecast-time upper bound.
            Must be timezone-aware.
        start_dispatch_time: Optional inclusive dispatch-time lower bound.
            Must be timezone-aware.
        end_dispatch_time: Optional inclusive dispatch-time upper bound.
            Must be timezone-aware.
        task_run_status_ids: Optional FEWS task-run status identifiers.
        only_forecasts: Optional FEWS forecast-only filter. When omitted,
            FEWS may default to returning only forecast task runs.
        task_run_count: Optional maximum number of returned task runs.
        only_current: Optional FEWS current-only filter.
        document_format: Response format. Defaults to ``PI_JSON``.
        document_version: Optional PI document version.

    Returns:
        A list of typed task-run descriptors when the endpoint yields a
        dictionary (the ``PI_JSON`` default), or the raw text when a
        text-based format such as ``PI_XML`` is requested.

    Raises:
        ValueError: If the response content is neither a dictionary nor a
            string.

    Example:
        Retrieve the latest forecast task runs for a workflow.

        ::

            taskruns = client.get_taskruns(
                workflow_id="ImportObscape",
                task_run_count=10,
            )
            for task_run in taskruns:
                print(task_run.id, task_run.status, task_run.dispatch_time)

        Retrieve task runs for a non-forecast workflow.

        ::

            taskruns = client.get_taskruns(
                workflow_id="ftpClientConfig",
                only_forecasts=False,
                task_run_count=10,
            )
            print(taskruns)

        Retrieve the raw PI XML response.

        ::

            taskruns_xml = client.get_taskruns(
                workflow_id="ImportObscape",
                document_format="PI_XML",
            )
            print(taskruns_xml)
    """
    query = {
        "workflow_id": workflow_id,
        "topology_node_id": topology_node_id,
        "forecast_count": forecast_count,
        "task_run_ids": task_run_ids,
        "scenario_id": scenario_id,
        "mc_id": mc_id,
        "start_forecast_time": start_forecast_time,
        "end_forecast_time": end_forecast_time,
        "start_dispatch_time": start_dispatch_time,
        "end_dispatch_time": end_dispatch_time,
        "task_run_status_ids": task_run_status_ids,
        "only_forecasts": only_forecasts,
        "task_run_count": task_run_count,
        "only_current": only_current,
        "document_format": document_format,
        "document_version": document_version,
    }
    call_kwargs = self._collect_non_none_kwargs(query)
    response = Taskruns().execute(client=self.client, **call_kwargs)
    if isinstance(response, dict):
        return PiTaskRunsResponse.model_validate(response).task_runs
    if isinstance(response, str):
        return response
    raise ValueError("Expected taskruns response content as a string.")
[docs]
def get_taskrunstatus(
    self,
    *,
    task_id: str,
    max_wait_millis: int | str | None = None,
    document_format: str | None = "PI_JSON",
    document_version: str | None = None,
) -> PiTaskRunStatusResponse:
    """Fetch the status of a FEWS task run.

    Queries ``GET /taskrunstatus``. The current OpenAPI specification
    exposes only ``PI_JSON`` for this endpoint, and the response is
    returned as a typed status model.

    Possible FEWS status codes are ``I`` (invalid), ``P`` (pending), ``T``
    (terminated), ``R`` (running), ``F`` (failed), ``C`` (completed fully
    successful), ``D`` (completed partly successful), ``A`` (approved), and
    ``B`` (approved partly successful).

    Args:
        task_id: Required FEWS task identifier returned by ``post_runtask``.
        max_wait_millis: Optional FEWS long-poll timeout in milliseconds.
        document_format: Response format. The current specification supports
            ``PI_JSON``.
        document_version: Optional PI document version.

    Returns:
        The response validated as ``PiTaskRunStatusResponse``.

    Example:
        ::

            task_id = client.post_runtask(workflow_id="ImportObscape")
            status = client.get_taskrunstatus(
                task_id=task_id,
                max_wait_millis=1000,
            )
            print(status.code, status.description, status.task_run_id)
    """
    query = {
        "task_id": task_id,
        "max_wait_millis": max_wait_millis,
        "document_format": document_format,
        "document_version": document_version,
    }
    call_kwargs = self._collect_non_none_kwargs(query)
    status_payload = Taskrunstatus().execute(client=self.client, **call_kwargs)
    return PiTaskRunStatusResponse.model_validate(status_payload)
[docs]
def get_whatiftemplates(
    self,
    *,
    what_if_template_id: str | None = None,
    document_format: str | None = "PI_JSON",
    document_version: str | None = None,
) -> list[PiWhatIfTemplate]:
    """Fetch the configured FEWS what-if templates.

    Queries ``GET /whatiftemplates``. The current OpenAPI specification
    exposes ``PI_JSON`` for this endpoint; the response is validated with
    ``PiWhatIfTemplatesResponse`` and its ``templates`` list is returned.

    Args:
        what_if_template_id: Optional FEWS what-if template identifier used
            to narrow the response to a single template.
        document_format: Response format. The current specification supports
            ``PI_JSON``.
        document_version: Optional PI document version.

    Returns:
        A list of typed what-if-template descriptors.

    Example:
        Retrieve all what-if templates.

        ::

            templates = client.get_whatiftemplates()
            for template in templates:
                print(template.id, template.name)

        Retrieve one specific what-if template by ID.

        ::

            templates = client.get_whatiftemplates(
                what_if_template_id="sfincs_palmiet_scenario_map",
            )
            print(templates[0].properties)
    """
    query = {
        "what_if_template_id": what_if_template_id,
        "document_format": document_format,
        "document_version": document_version,
    }
    call_kwargs = self._collect_non_none_kwargs(query)
    response = WhatIfTemplates().execute(client=self.client, **call_kwargs)
    validated = PiWhatIfTemplatesResponse.model_validate(response)
    return validated.templates
[docs]
def get_whatifscenarios(
    self,
    *,
    what_if_template_id: str | None = None,
    what_if_scenario_id: str | None = None,
    workflow_id: str | None = None,
    document_format: str | None = "PI_JSON",
    document_version: str | None = None,
) -> list[PiWhatIfScenarioDescriptor]:
    """Fetch the configured FEWS what-if scenarios.

    Queries ``GET /whatifscenarios``. The current OpenAPI specification
    exposes ``PI_JSON`` for this endpoint; the response is validated with
    ``PiWhatIfScenariosResponse`` and its ``scenario_descriptors`` list is
    returned.

    Args:
        what_if_template_id: Optional FEWS what-if template identifier used
            to return scenarios belonging to one template.
        what_if_scenario_id: Optional FEWS what-if scenario identifier used
            to return one specific scenario descriptor.
        workflow_id: Optional FEWS workflow identifier used to return
            scenarios linked to one workflow.
        document_format: Response format. The current specification supports
            ``PI_JSON``.
        document_version: Optional PI document version.

    Returns:
        A list of typed what-if-scenario descriptors.

    Example:
        Retrieve all what-if scenarios.

        ::

            scenarios = client.get_whatifscenarios()
            for scenario in scenarios:
                print(scenario.id, scenario.name)

        Retrieve one specific what-if scenario by ID.

        ::

            scenarios = client.get_whatifscenarios(
                what_if_scenario_id="SA107:2",
            )
            print(scenarios[0].what_if_template_id)
    """
    query = {
        "what_if_template_id": what_if_template_id,
        "what_if_scenario_id": what_if_scenario_id,
        "workflow_id": workflow_id,
        "document_format": document_format,
        "document_version": document_version,
    }
    call_kwargs = self._collect_non_none_kwargs(query)
    response = WhatIfScenarios().execute(client=self.client, **call_kwargs)
    validated = PiWhatIfScenariosResponse.model_validate(response)
    return validated.scenario_descriptors
[docs]
def post_whatifscenarios(
    self,
    *,
    what_if_template_id: str | None = None,
    single_run_what_if: bool | None = None,
    name: str | None = None,
    document_format: str | None = "PI_JSON",
    document_version: str | None = None,
) -> PiWhatIfScenarioDescriptor:
    """Create a FEWS what-if scenario through ``POST /whatifscenarios``.

    Only the parameters exposed by the generated FEWS OpenAPI client are
    forwarded: query parameters such as ``what_if_template_id``,
    ``single_run_what_if`` and ``name``. FEWS answers with a JSON
    descriptor of the created scenario.

    Args:
        what_if_template_id: Optional FEWS what-if template identifier.
        single_run_what_if: Optional FEWS single-run what-if flag.
        name: Optional FEWS what-if scenario name.
        document_format: Response format. The current specification supports
            ``PI_JSON``.
        document_version: Optional PI document version.

    Returns:
        The created scenario, validated as ``PiWhatIfScenarioDescriptor``.

    Example:
        ::

            scenario = client.post_whatifscenarios(
                what_if_template_id="sfincs_palmiet_scenario_map",
                name="Wrapper what-if scenario",
                single_run_what_if=False,
            )
            print(scenario.id, scenario.name, scenario.what_if_template_id)

    Note:
        The current generated FEWS OpenAPI client does not expose a request
        body for setting scenario properties on ``POST /whatifscenarios``,
        which is why no such argument exists here.
    """
    query = {
        "what_if_template_id": what_if_template_id,
        "single_run_what_if": single_run_what_if,
        "name": name,
        "document_format": document_format,
        "document_version": document_version,
    }
    call_kwargs = self._collect_non_none_kwargs(query)
    created = PostWhatIfScenarios().execute(client=self.client, **call_kwargs)
    return PiWhatIfScenarioDescriptor.model_validate(created)
[docs]
def execute_workflow(self, *args: Any, **kwargs: Any) -> str:
    """Backward-compatible alias for :meth:`post_runtask`.

    Every positional and keyword argument is forwarded unchanged, and the
    value returned by :meth:`post_runtask` is handed back as-is.
    """
    task_id = self.post_runtask(*args, **kwargs)
    return task_id
[docs]
def get_workflows(
    self,
    *,
    document_format: str | None = "PI_JSON",
    document_version: str | None = None,
) -> list[PiWorkflow] | str:
    """Fetch the available FEWS workflows.

    Retrieves the default workflow XML files exposed by the FEWS
    ``/workflows`` endpoint.

    Args:
        document_format: Response format supported by the FEWS workflows
            endpoint. Defaults to ``PI_JSON``.
        document_version: Optional PI document version.

    Returns:
        A list of typed workflow descriptors when the endpoint yields a
        dictionary (the ``PI_JSON`` default), or the raw text when a
        text-based format such as ``PI_XML`` is requested.

    Raises:
        ValueError: If the response content is neither a dictionary nor a
            string.

    Example:
        Retrieve the available workflows as PI JSON.

        ::

            client = FewsWebServiceClient(
                base_url="https://example.com/FewsWebServices/rest"
            )
            workflows = client.get_workflows()
            print(workflows[0].id)

        Retrieve the raw PI XML response.

        ::

            workflows_xml = client.get_workflows(document_format="PI_XML")
            print(workflows_xml)
    """
    query = {
        "document_format": document_format,
        "document_version": document_version,
    }
    call_kwargs = self._collect_non_none_kwargs(query)
    response = Workflows().execute(client=self.client, **call_kwargs)
    if isinstance(response, dict):
        return PiWorkflowsResponse.model_validate(response).workflows
    if isinstance(response, str):
        return response
    raise ValueError("Expected workflows response content as a string.")
[docs]
def endpoint_arguments(self, endpoint: str) -> list[str]:
    """List the argument names accepted for a FEWS web service endpoint.

    Args:
        endpoint: The name of the endpoint, options: ``timeseries``,
            ``post_timeseries``, ``post_runtask``, ``taskruns``,
            ``taskrunstatus``, ``whatiftemplates``, ``whatifscenarios``,
            ``post_whatifscenarios``, ``filters``, and ``workflows``.

    Returns:
        The argument names for the specified endpoint.

    Raises:
        ValueError: If ``endpoint`` is not one of the names listed above.
    """
    # The time series endpoint reports the arguments of the underlying
    # endpoint object; all other endpoints report the parameters of the
    # corresponding wrapper method on this client.
    if endpoint == "timeseries":
        return TimeSeries().input_args()

    wrapper_methods = (
        ("post_timeseries", "post_timeseries"),
        ("post_runtask", "post_runtask"),
        ("taskruns", "get_taskruns"),
        ("taskrunstatus", "get_taskrunstatus"),
        ("whatiftemplates", "get_whatiftemplates"),
        ("whatifscenarios", "get_whatifscenarios"),
        ("post_whatifscenarios", "post_whatifscenarios"),
        ("filters", "get_filters"),
        ("workflows", "get_workflows"),
    )
    for known_endpoint, method_name in wrapper_methods:
        if endpoint == known_endpoint:
            bound_method = getattr(self, method_name)
            return list(inspect.signature(bound_method).parameters)
    raise ValueError(f"Unknown endpoint: {endpoint}")
def _collect_non_none_kwargs(
self, local_kwargs: dict[str, Any], pop_kwargs: list[str] | None = None
) -> dict[str, Any]:
"""Collect only non-None keyword arguments."""
local_kwargs.pop("self", None)
for key in pop_kwargs or []:
local_kwargs.pop(key, None)
if "kwargs" in local_kwargs:
extra_kwargs = local_kwargs.pop("kwargs")
if isinstance(extra_kwargs, dict):
local_kwargs.update(extra_kwargs)
return {k: v for k, v in local_kwargs.items() if v is not None}