Source code for superblockify.population.approximation

"""Population approximation for the superblockify package.

See reference notebook for a detailed description of the population approximation.
"""

from functools import partial
from multiprocessing import Pool

from geopandas import GeoDataFrame
from numpy import float32, sum as npsum, zeros
from rasterio import open as rasopen
from rasterio.features import shapes
from shapely import STRtree
from shapely.geometry import shape
from tqdm import tqdm

from .ghsl import get_ghsl, resample_load_window
from .tessellation import get_edge_cells
from ..config import logger


def add_edge_population(graph, overwrite=False, **tess_kwargs):
    """Add edge population to edge attributes in the graph.

    Calculates the population and area of the edges. First tessellates the edges
    and then determines the population with GHSL data. Function writes to edge
    attributes `population` and `area` of the graph in-place.
    Furthermore, `cell_id` is added to the edge attributes, for easier summary of
    statistics later. The graph attribute `edge_population` is set to True.
    With this information, population densities can be calculated for arbitrary
    subsets of edges.

    Parameters
    ----------
    graph : networkx.MultiDiGraph
        The graph to tessellate.
    overwrite : bool, optional
        If True, overwrite existing population and area attributes. Only depends
        on the graph attribute `edge_population` and not on the actual
        attributes.
    **tess_kwargs
        Keyword arguments for the
        :func:`superblockify.population.tessellation.get_edge_cells` function.

    Raises
    ------
    ValueError
        If the graph already has population and area attributes and `overwrite`
        is False.
    ValueError
        If the graph is not in a projected coordinate system.
    ValueError
        If the limit and the edge points are disjoint.

    Notes
    -----
    The graph must be in a projected coordinate system.
    """
    if graph.graph.get("edge_population", False) is True and overwrite is False:
        raise ValueError(
            "The graph already has population and area attributes. "
            "Use `overwrite=True` to overwrite them."
        )

    edge_population = get_edge_population(graph, **tess_kwargs)

    logger.debug("Adding population and area to edges.")
    for edge_keys, population, geometry in edge_population[
        ["population", "geometry"]
    ].itertuples():
        # All edges listed in `edge_keys` share one tessellation cell, so the
        # per-cell values are computed once instead of once per edge.
        # Absolute population for area enclosed in edge cell (people)
        cell_population = float32(population)
        # Area of edge cell (m²)
        cell_area = float32(geometry.area)
        # Cell ID of edge cell (position of the cell in the dataframe index)
        cell_id = edge_population.index.get_loc(edge_keys)
        for edge_key in edge_keys:
            edge_data = graph.edges[edge_key]
            edge_data["population"] = cell_population
            edge_data["area"] = cell_area
            edge_data["cell_id"] = cell_id

    # Note in graph attributes that population has been added
    graph.graph["edge_population"] = True
def get_population_area(graph):
    """Calculate the population of a graph or subgraph.

    Calculates the population and area of the graph. Every tessellation cell is
    counted once, even if several edges of the graph share it.

    Parameters
    ----------
    graph : networkx.MultiDiGraph
        Graph or subgraph. Must have edge attributes `population`, `area` and
        `cell_id`. The `cell_id` values must be hashable.

    Returns
    -------
    population : float
        Population of the subgraph.
    area : float
        Area of the subgraph.

    Raises
    ------
    ValueError
        If the graph does not have the population attributes.
    """
    # Check if the graph has edges
    if graph.number_of_edges() == 0:
        return 0.0, 0.0

    # Check if the graph has population attributes
    if graph.graph.get("edge_population", False) is False:
        raise ValueError(
            "The graph does not have the population attributes. "
            "Use `add_edge_population` to add them."
        )

    # Collect population and area once per cell. A set keeps the membership
    # test O(1) per edge instead of scanning a growing list.
    populations = []
    areas = []
    seen_cells = set()
    for _, _, data in graph.edges(data=True):
        if data["cell_id"] in seen_cells:
            continue
        # Edges without a numeric population are skipped and do not mark their
        # cell as seen, so a later edge of the same cell can still contribute.
        if not isinstance(data["population"], (float, float32)):
            continue
        seen_cells.add(data["cell_id"])
        populations.append(data["population"])
        areas.append(data["area"])
    return npsum(populations), npsum(areas)
def get_edge_population(graph, batch_size=10000, **tess_kwargs):
    """Get edge population for the graph.

    Calculates the population and area of the edge. First tessellates the edges
    and then determines the population with GHSL data. The population
    distribution process is parallelized with multiprocessing in batches of
    edges.

    Parameters
    ----------
    graph : networkx.MultiDiGraph
        The graph to tessellate.
    batch_size : int, optional
        Number of edges to process in one batch. By default, 10000. It must be
        greater than 0. If it is greater than the number of edges, all edges are
        processed in one batch. Non-integer values are truncated, but never
        below 1.
    **tess_kwargs
        Keyword arguments for the
        :func:`superblockify.population.tessellation.get_edge_cells` function.

    Returns
    -------
    geopandas.GeoDataFrame
        A GeoDataFrame with the tuple of edge keys as index and the population
        and area of the edge as columns, as well as the tessellation cells as
        geometry. The CRS will be in World Mollweide.

    Raises
    ------
    ValueError
        If the batch size is not numeric or not greater than 0.
    ValueError
        If the graph is not in a projected coordinate system.
    ValueError
        If the limit and the edge points are disjoint.

    Notes
    -----
    The graph must be in a projected coordinate system.

    Output CRS is World Mollweide.

    It uses the STRtree index to speed up the intersection. [1]_

    References
    ----------
    .. [1] Leutenegger, Scott T.; Edgington, Jeffrey M.; Lopez, Mario A.
       (February 1997). "STR: A Simple and Efficient Algorithm for R-Tree
       Packing".
       https://ia600900.us.archive.org/27/items/nasa_techdoc_19970016975/19970016975.pdf
    """
    if not isinstance(batch_size, (float, int)):
        raise ValueError(f"Batch size must be numeric, but is {batch_size}.")
    if batch_size <= 0:
        raise ValueError(f"Batch size must be greater than 0, but is {batch_size}.")

    edge_cells = get_edge_cells(graph, **tess_kwargs)
    # Project to World Mollweide
    edge_cells = edge_cells.to_crs("World Mollweide")
    bbox_moll = edge_cells.union_all().buffer(100).bounds
    ghsl_file = get_ghsl(bbox_moll)
    with rasopen(ghsl_file) as src:
        load_window = src.window(*bbox_moll)
    ghsl_polygons = load_ghsl_as_polygons(ghsl_file, window=load_window)

    # Build STRtree index
    logger.debug("Building STRtree index.")
    ghsl_polygons_index = STRtree(ghsl_polygons.geometry)

    # Add column for population
    edge_cells["population"] = 0.0

    n_cells = len(edge_cells)
    # Clamp to at least 1: a fractional batch size (e.g. 0.5) passes the checks
    # above but would truncate to 0 and cause a division by zero.
    batch_size = max(1, int(min(batch_size, n_cells)))
    # Ceil division: the batches are taken over the road cells, so this is also
    # the correct total for the progress bar.
    n_batches = -(-n_cells // batch_size)

    with Pool() as pool:
        # Stepping by `batch_size` never yields a trailing empty slice.
        slices = (
            slice(start, min(start + batch_size, n_cells))
            for start in range(0, n_cells, batch_size)
        )
        population_sums = list(
            tqdm(
                pool.imap_unordered(
                    partial(
                        _population_fraction_list_sliced,
                        ghsl_polygons["geometry"].values,
                        ghsl_polygons["population"].values,
                        ghsl_polygons_index,
                        edge_cells["geometry"].values,
                    ),
                    slices,
                ),
                desc="Distributing population over road cells",
                total=n_batches,
                unit="Cells",
                unit_scale=batch_size,
                unit_divisor=batch_size,
            )
        )

    # Write the results to the dataframe. Results arrive in arbitrary order, so
    # each one carries the slice it belongs to.
    for cell_slice, population in population_sums:
        edge_cells.loc[edge_cells.index[cell_slice], "population"] = population

    return edge_cells
# Marked as `no cover` as it is tested, but as a forked process with `multiprocessing`
def population_fraction(ghsl_polygon, population, road_cell):  # pragma: no cover
    """Return the share of a GHSL cell's population that falls into a road cell.

    The population of the GHSL cell is scaled by the ratio of the overlap area
    between both polygons to the full area of the GHSL cell.

    Parameters
    ----------
    ghsl_polygon : shapely.geometry.Polygon
        Polygon of GHSL cell.
    population : float
        Population of GHSL cell.
    road_cell : shapely.geometry.Polygon
        Polygon of road cell.

    Returns
    -------
    float
        Fractional population count between road_cell and ghsl_polygon.
    """
    overlap_area = road_cell.intersection(ghsl_polygon).area
    return population * overlap_area / ghsl_polygon.area
def _population_fraction_list(
    ghsl_polygons, ghsl_populations, overlap_index_pairs, road_cell_geometries
):  # pragma: no cover
    """Accumulate the population of every road cell in road_cell_geometries.

    Parameters
    ----------
    ghsl_polygons : list of shapely.geometry.Polygon
        List of GHSL cells.
    ghsl_populations : list of float
        List of GHSL populations.
    overlap_index_pairs : ndarray with shape (n, 2)
        Index pairs of overlapping road cells and GHSL cells. Each row holds
        the index of a road cell (into `road_cell_geometries`) followed by the
        index of a GHSL cell (into `ghsl_polygons` and `ghsl_populations`).
    road_cell_geometries : list of shapely.geometry.Polygon
        List of road cells.

    Returns
    -------
    ndarray with shape (len(road_cell_geometries),)
        Array of population counts for each road cell in road_cell_geometries.
        Road cells that appear in no pair keep a population of zero.
    """
    cell_totals = zeros(len(road_cell_geometries))
    for pair in overlap_index_pairs:
        road_idx, ghsl_idx = pair
        cell_totals[road_idx] += population_fraction(
            ghsl_polygons[ghsl_idx],
            ghsl_populations[ghsl_idx],
            road_cell_geometries[road_idx],
        )
    return cell_totals


def _population_fraction_list_sliced(
    ghsl_polygons, ghsl_populations, ghsl_polygons_index, road_cell_geometries, slice_n
):  # pragma: no cover
    """Parallelization helper around :func:`_population_fraction_list`.

    Receives all road cells, but only determines the population for the road
    cells selected by slice_n. The slice is returned together with the result
    so that the caller can match the result to its road cells.

    Parameters
    ----------
    ghsl_polygons : list of shapely.geometry.Polygon
        List of GHSL cells.
    ghsl_populations : list of float
        List of GHSL populations.
    ghsl_polygons_index : shapely.strtree.STRtree
        STRtree index of ghsl_polygons.
    road_cell_geometries : list of shapely.geometry.Polygon
        List of road cells.
    slice_n : slice
        Slice of road cells to determine the population for.

    Returns
    -------
    slice, ndarray
        The slice of road cells and the array of population counts, with one
        entry for each road cell in road_cell_geometries[slice_n].
    """
    selected_cells = road_cell_geometries[slice_n]
    # The query result is transposed so that each row is one
    # (road cell index, GHSL cell index) pair, relative to the selected cells.
    overlap_pairs = ghsl_polygons_index.query(
        selected_cells, predicate="intersects"
    ).T
    return slice_n, _population_fraction_list(
        ghsl_polygons,
        ghsl_populations,
        overlap_pairs,
        selected_cells,
    )
def load_ghsl_as_polygons(file, window=None):
    """Load GHSL population raster data as polygons.

    Polygonizes the GHSL population raster data and returns the population in
    a GeoDataFrame. Area with no population is not included.

    Parameters
    ----------
    file : str
        Path to the raster file. It can be a tile or the whole raster.
    window : rasterio.windows.Window, optional
        Window of the raster to resample. If None, the whole raster will be
        loaded.

    Returns
    -------
    geopandas.GeoDataFrame
        A GeoDataFrame derived from the GHSL population raster data.
        Includes geometry and population columns.

    Notes
    -----
    When not passing a window, the whole raster will be loaded. Make sure the
    raster is not too big.
    """
    logger.debug("Loading GHSL data for window %s from file %s.", window, file)
    raster, affine = resample_load_window(file=file, window=window)
    # convert to float32
    raster = raster.astype(float32)

    # Polygonize the raster and keep only the populated shapes
    geometries = []
    populations = []
    for geometry_mapping, value in shapes(raster, transform=affine):
        if value > 0:
            geometries.append(shape(geometry_mapping))
            populations.append(value)

    return GeoDataFrame(
        geometry=geometries,
        data=populations,
        columns=["population"],
        crs="World Mollweide",
    )