Source code for lusos.area_statistics
from typing import List, TypeVar
import geopandas as gpd
import numba
import numpy as np
import xarray as xr
from lusos.geometry import ops
LassoGrid = TypeVar("LassoGrid")
def calc_areal_percentage_in_cells(
polygons: gpd.GeoDataFrame, lasso_grid: LassoGrid, units: List[str]
) -> xr.DataArray:
"""
Calculate in each grid cell the proportion of the area that is covered by each polygon
in a GeoDataFrame.
Parameters
----------
polygons : gpd.GeoDataFrame
GeoDataFrame containing the polygons to calculate the areal percentages for.
lasso_grid : LassoGrid
LassoGrid instance containing the raster grid to calculate the percentages for.
units : List[str]
List of unique units to calculate the areal percentages for.
Returns
-------
xr.DataArray
3D DataArray with the areal percentages.
"""
# Needs unique polygons, otherwise area calculation goes wrong
polygons = polygons.explode()
polygon_area = ops.polygon_area_in_grid(polygons, lasso_grid.dataarray())
polygon_area.polygon[:] = polygons["idx"].values[polygon_area.polygon]
cellarea = np.abs(lasso_grid.xsize * lasso_grid.ysize)
area_grid = lasso_grid.empty_array(units, False)
area_grid.values = area_to_grid3d(polygon_area, area_grid.values)
area_grid = area_grid / cellarea
return area_grid
[docs]
def calculate_model_flux(model: gpd.GeoDataFrame, grid: LassoGrid) -> xr.DataArray:
"""
Calculate a weighted flux per cell in a 2D grid from Somers emissions data.
Parameters
----------
model : gpd.GeoDataFrame
GeoDataFrame containing model emissions per polygon.
grid : LassoGrid
2D grid to calculate the emission fluxes for.
Returns
-------
xr.DataArray
2D grid with weighted emission flux per cell.
"""
flux_grid = grid.dataarray(np.nan)
model = model.explode()
area = ops.polygon_area_in_grid(model, flux_grid)
flux = model["flux_m2"].values[area.polygon]
flux_grid.values = flux_to_grid(flux, area, flux_grid.values)
return flux_grid
@numba.njit
def area_to_grid3d(
polygon_area: ops.PolygonGridArea, area_grid: np.ndarray
) -> np.ndarray:
"""
Translate calculated areas for polygons per grid cell into a 3D grid.
Parameters
----------
polygon_area : ops.PolygonGridArea
_description_
area_grid : np.ndarray
_description_
Returns
-------
np.ndarray
_description_
"""
_, nx, nz = area_grid.shape
min_idx = 0
for i in range(len(polygon_area.cell_idx)):
cell_idx = polygon_area.cell_idx[i]
nitems = polygon_area.nitems[i]
max_idx = min_idx + nitems
polygons = polygon_area.polygon[min_idx:max_idx]
area = polygon_area.area[min_idx:max_idx]
area = np.bincount(polygons, weights=area, minlength=nz)
row, col = np.divmod(cell_idx, nx)
area_grid[row, col, :] = area
min_idx += nitems
return area_grid
@numba.njit
def flux_to_grid(
flux: np.ndarray, area: ops.PolygonGridArea, grid: np.ndarray
) -> np.ndarray:
"""
Translate Somers emissions for polygons to a 2D grid containing a weighted flux by
area based on the calculated area of each polygon in a cell.
Parameters
----------
flux : np.ndarray
_description_
area : ops.PolygonGridArea
_description_
grid : np.ndarray
_description_
Returns
-------
np.ndarray
_description_
"""
_, nx = grid.shape
min_idx = 0
for i in range(len(area.cell_idx)):
cell_idx = area.cell_idx[i]
nitems = area.nitems[i]
max_idx = min_idx + nitems
f = flux[min_idx:max_idx]
a = area.area[min_idx:max_idx]
row, col = np.divmod(cell_idx, nx)
grid[row, col] = _weighted_average(f, a)
min_idx += nitems
return grid
@numba.njit
def _weighted_average(flux: np.ndarray, area: np.ndarray) -> np.ndarray:
"""
Helper function for weighted average in Numba compiled functions.
"""
return np.sum(flux * area) / np.sum(area)