Source code for superblockify.partitioning.approaches.bearing

"""Approach relating using edge bearings."""

from bisect import bisect_right
from typing import List, Set

import matplotlib.pyplot as plt
import networkx as nx
import numpy as np
from matplotlib import cm
from networkx import weakly_connected_components
from scipy.signal import find_peaks
from typing_extensions import deprecated

from ..base import BasePartitioner
from ... import attribute
from ...config import logger, Config
from ...plot import save_plot


[docs] @deprecated("BearingPartitioner does not necessarily produce a valid partitioning.") class BearingPartitioner(BasePartitioner): # pragma: no cover """Bearing partitioner. Partitions based on the edge bearings. """ # pylint: disable=too-many-instance-attributes def __init__(self, *args, **kwargs): """Construct a BearingPartitioner""" super().__init__(*args, **kwargs) self._bin_info = None self._inter_vals = {} self.attr_value_minmax = (0, 90) self.residential_graph = self.graph.copy() self.residential_graph.remove_edges_from( [ (u, v) for u, v, d in self.residential_graph.edges(data=True) if d["highway"] != "residential" or "residential" not in d["highway"] ] )
[docs] def partition_graph( self, make_plots=False, min_length=500, min_edge_count=5, **kwargs ): """Group by prominent bearing directions. Procedure to determine the graphs partitions based on the edges bearings. 1. Bin bearings. Only on residential edges. 2. Find peaks. 3. Determine boundaries/intervals corresponding to a partition. 4. (optional) Plot found peaks and interval splits. 5. Write partition attribute edges. The number of bins is fixed to 360°, as rules for numbers of bins [1]_ ask for much lower numbers of bins, for common number of edges in a street network (approx. 300 to 60.000 edges), which would produce a too small resolution (Sturges' formula: 10 to 17 bins; Square-root choice: 18 to 245 bins). Parameters ---------- make_plots : bool, optional If True show visualization graphs of the approach. min_length : float, optional Minimum component length in meters to be considered for partitioning. min_edge_count : int, optional Minimum component edge count to be considered for partitioning. Raises ------ ArithmeticError If no peaks are being found. References ---------- .. [1] Wikipedia contributors, "Histogram," Wikipedia, The Free Encyclopedia, https://en.wikipedia.org/w/index.php?title=Histogram&oldid=1113935482 (accessed December 14, 2022). """ # Binning self.__bin_bearings(num_bins=360) # Find peaks self.__find_peaks() if len(self._bin_info["peak_ind"]) < 1: raise ArithmeticError("No peaks were found.") if make_plots: fig, _ = self.plot_peakfinding() save_plot( self.results_dir, fig, f"{self.name}_peakfinding.{Config.PLOT_SUFFIX}" ) # Make boundaries self.__make_boundaries() if make_plots: fig, _ = self.plot_interval_splitting() save_plot( self.results_dir, fig, f"{self.name}_interval_splitting.{Config.PLOT_SUFFIX}", ) # Write grouping attribute to graph self.attribute_label = "bearing_group" self.attribute_dtype = str group_bearing = nx.get_edge_attributes(self.residential_graph, "bearing_90") logger.debug("Writing attribute 'bearing_group' to graph.") for edge, bearing in group_bearing.items(): if np.isnan(bearing): # pragma: no cover group_bearing[edge] = np.nan else: i = bisect_right(self._inter_vals["boundaries"], bearing) group_bearing[edge] = self._inter_vals["center_values"][i - 1] nx.set_edge_attributes(self.graph, group_bearing, self.attribute_label) # Write partition dict self.partitions = [ { "name": str(self._inter_vals["boundaries"][i : i + 2]), "value": center_val, } for (i, center_val) in enumerate(self._inter_vals["center_values"]) if center_val is not None ] # Make subgraphs for each partition # Overwrite `self.attribute_label` so residential edges are not added to # subgraphs. for node1, node2, key in self.graph.edges(keys=True): # check if edge is in self.residential_graph if (node1, node2) not in self.residential_graph.edges: self.graph.edges[node1, node2, key][self.attribute_label] = None self.make_subgraphs_from_attribute( split_disconnected=True, min_edge_count=min_edge_count, min_length=min_length, ) # Make sparsified graph self.set_sparsified_from_components() # Produces graph that may be not connected, so we only take the LCC. self.sparsified = self.graph.subgraph( max(weakly_connected_components(self.sparsified), key=len) )
def __bin_bearings(self, num_bins: int): """Construct histogram of `self.graph` bearings. Writes to bin_edges and bin_frequencies of `_bin_info`. Parameters ---------- num_bins : int, >= 360 Number of bins to split the bearings into / radial resolution. Raises ------ ValueError If `num_bins` is < 360. """ if num_bins < 360: raise ValueError( f"The number of bins needs to be greater or equal than 360, " f"but is {num_bins}." ) self._bin_info = {"num_bins": num_bins} # Write attribute where bearings are baked down modulo 90 degrees. attribute.new_edge_attribute_by_function( self.residential_graph, lambda bear: bear % 90, "bearing", "bearing_90" ) bins = ( np.arange((self._bin_info["num_bins"] * 2) + 1) * 90 / (self._bin_info["num_bins"] * 2) ) count, bin_edges = np.histogram( list(nx.get_edge_attributes(self.residential_graph, "bearing_90").values()), bins=bins, ) # move last bin to front, so eg 0.01° and 359.99° will be binned together count = np.roll(count, 1) bin_counts = count[::2] + count[1::2] # because we merged the bins, their edges are now only every other one self._bin_info["bin_edges"] = bin_edges[range(0, len(bin_edges), 2)] # Intensity self._bin_info["bin_frequency"] = bin_counts / bin_counts.sum() def __find_peaks(self): """Find peaks in the histogram of bearings. Writes to peak_ind and peak_values of `_bin_info`. `scipy.signal.find_peaks` is used to find the peaks and configured as follows: - height: minimum height of a peak: mean of the histogram - prominence: minimum prominence of a peak: one standard deviation - distance: minimum distance between two peaks: >= 0.4° Raises ------ AssertionError If bearing histogram is empty. """ if self._bin_info is None or not all( name in self._bin_info for name in ["num_bins", "bin_edges", "bin_frequency"] ): raise AssertionError( f"{self.__class__.__name__} has not been binned yet, " f"run `__bin_bearings` before finding peaks." ) # Add general info about histogram: mean, median, std, min, max self._bin_info["mean"] = np.mean(self._bin_info["bin_frequency"]) self._bin_info["medi"] = np.median(self._bin_info["bin_frequency"]) self._bin_info["std"] = np.std(self._bin_info["bin_frequency"]) self._bin_info["min"] = np.min(self._bin_info["bin_frequency"]) self._bin_info["max"] = np.max(self._bin_info["bin_frequency"]) self._bin_info["peak_ind"], self._bin_info["peak_props"] = find_peaks( self._bin_info["bin_frequency"], height=self._bin_info["mean"], # Required minimal height of peaks. prominence=self._bin_info["std"], # Required minimal prominence of peaks. distance=int(0.4 * self._bin_info["num_bins"] / 90), # Required minimal horizontal distance (>= 0.4°) between neighbouring peaks. ) def __make_boundaries(self): """Determine partition boundaries. Determine boundaries based on the binned data's peaks. Warnings -------- If left and right bases have identical intervals. """ # Find bases left and right of peak indices # Union of all bases bases = set(self._bin_info["peak_props"]["left_bases"]) | set( self._bin_info["peak_props"]["right_bases"] ) # For peak indices, find the closest base to the left and right. # That is for each self._bin_info["peak_ind"] the index with the biggest # value, but smaller than the peak index, and the index with the smallest # value, but bigger than the peak index. logger.debug("Base indices: %s", bases) logger.debug("Peak indices: %s", self._bin_info["peak_ind"]) left_right_bases = [ (max(b for b in bases if b < p), min(b for b in bases if b > p)) for p in self._bin_info["peak_ind"] ] logger.debug("Left and right bases: %s", left_right_bases) # Check if there are identical base pairs in left_right_bases. if len(left_right_bases) != len(set(left_right_bases)): logger.warning( "There are identical base pairs in left_right_bases. " "This means that there are two peaks with identical intervals." ) left_right_bases_values = [ (self._bin_info["bin_edges"][l], self._bin_info["bin_edges"][r]) for (l, r) in left_right_bases ] # Make boundary and value lists # So that for all values in [0, 90[ a center value is defined. # For all intervals without a peak, the center value is `None`. peak_values = [ self._bin_info["bin_edges"][p] for p in self._bin_info["peak_ind"] ] self._inter_vals["boundaries"] = [0] self._inter_vals["center_values"] = [None] for (l_val, r_val), value in zip(left_right_bases_values, peak_values): if self._inter_vals["boundaries"][-1] == l_val: self._inter_vals["boundaries"].append(r_val) self._inter_vals["center_values"] = self._inter_vals["center_values"][ :-1 ] + [value, None] else: self._inter_vals["boundaries"].extend((l_val, r_val)) self._inter_vals["center_values"] = self._inter_vals[ "center_values" ] + [value, None] if self._inter_vals["boundaries"][-1] == 90: self._inter_vals["center_values"].pop() else: self._inter_vals["boundaries"].append(90) # Find and merge repeated/overlaying intervals. ( self._inter_vals["boundaries"], self._inter_vals["center_values"], ) = self.__find_and_merge_intervals( self._inter_vals["boundaries"], self._inter_vals["center_values"] ) # Log the boundaries and center values logger.debug("Boundaries: %s", self._inter_vals["boundaries"]) logger.debug("Center values: %s", self._inter_vals["center_values"]) # For plotting self._inter_vals["base_vals"] = dict( zip(left_right_bases_values, self._bin_info["peak_ind"]) ) @staticmethod def __find_and_merge_intervals(boundaries, center_values): """Find and merge duplicate intervals. Find repeated intervals where boundaries violate strict monotone increasing order and have the same start and end value. In the center values for each repeated interval there is the center value and a `None` value. The `None` values are removed and the repeated intervals are merged, by taking the arithmetic mean of their center values. Parameters ---------- boundaries : list of float or int List of boundaries of intervals. center_values : list of float or int or None List of center values of intervals. If there is no center value, `None`. Returns ------- list of float or int List of boundaries of intervals. list of float or int or None List of center values of intervals. If there is no center value, `None`. Raises ------ AssertionError If the length of `boundaries` is not one more than the length of `center_values`. TypeError If `boundaries` is not a numerical list. TypeError If `center_values` is not a list filled with numerical values or `None`. For repeated intervals the center values cannot be `None`. """ # Type check if not isinstance(boundaries, list): raise TypeError("Boundaries must be a list.") if not isinstance(center_values, list): raise TypeError("Center values must be a list.") if not all(isinstance(b, (int, float)) for b in boundaries): raise TypeError( "The values in boundaries must be of type int (excluding bool) or " "float." ) # As bool is a subclass of int, exclude bool explicitly. if not all( isinstance(c, (int, float, type(None))) and not isinstance(c, bool) for c in center_values ): raise TypeError( "The values in center_values must be of type int, float, and None." ) # Length check if len(boundaries) != len(center_values) + 1: raise AssertionError( "The length of `boundaries` is not one more than the length of " "`center_values`." ) # Find indices of repeated intervals and their center values # If two consecutive boundary pairs are equal, the interval is repeated. indices = { i: { "boundaries": [boundaries[i], boundaries[i + 1]], "center_values": center_values[i], } for i in range(len(boundaries) - 1) if boundaries[i : i + 2] == boundaries[i + 2 : i + 4] or boundaries[i : i + 2] == boundaries[i - 2 : i] } # Merge the repeated intervals by unioning their boundaries and taking the # arithmetic mean of their center values. unique_boundaries = np.unique( [indices[i]["boundaries"] for i in indices], axis=0 ).tolist() repeating_intervals = [ { "interval": interval, "indices": [i for i in indices if indices[i]["boundaries"] == interval], } for interval in unique_boundaries ] # Add the center values to the repeating intervals for interval in repeating_intervals: interval["center_values"] = [center_values[i] for i in interval["indices"]] # Throw an error if there are intervals with where numerical and None values # are mixed. for interval in repeating_intervals: if any(c is None for c in interval["center_values"]) and any( c is not None for c in interval["center_values"] ): raise TypeError( "The center values of intervals cannot be mixed with None values." ) # Drop repeating intervals with center values all None repeating_intervals = [ interval for interval in repeating_intervals if any(c is not None for c in interval["center_values"]) ] # Sort repeating intervals by indices, descending repeating_intervals = sorted( repeating_intervals, key=lambda x: x["indices"][0], reverse=True ) # From high to low, replace the boundaries and center values of the repeating # intervals with the arithmetic mean of their center values. Also remove the # None values. for interval in repeating_intervals: boundaries = ( boundaries[: interval["indices"][0]] + interval["interval"] + boundaries[interval["indices"][-1] + 2 :] ) center_values = ( center_values[: interval["indices"][0]] + [np.mean(interval["center_values"])] + center_values[interval["indices"][-1] + 1 :] ) return boundaries, center_values
[docs] @staticmethod def group_overlapping_intervals(left_bases, right_bases): """Find groups of overlapping intervals. *Unused at the moment.* Parameters ---------- left_bases : numpy.array List of left bases of intervals. right_bases : numpy.array List of right bases of intervals. Returns ------- list List of sets of overlapping intervals. Raises ------ TypeError If `left_bases` or `right_bases1` are not numpy arrays of 1d shape. ValueError If `left_bases` and `right_bases` are not of the same length. Examples -------- >>> left_bases = [0, 0, 1, 3, 4, 9] >>> right_bases1 = [1, 2, 3, 4, 10, 10] >>> group_overlapping_intervals(left_bases, right_bases) [{0, 1, 2}, {4, 5}] """ if not all(isinstance(arr, np.ndarray) for arr in [left_bases, right_bases]): raise TypeError( f"Input lists must be 1d numpy arrays, " f"got types {type(left_bases)} and {type(right_bases)}." ) if not all(arr.ndim == 1 for arr in [left_bases, right_bases]): raise TypeError( f"Input lists must be of 1d shape, " f"got shapes {left_bases.shape} and {right_bases.shape}." ) if len(left_bases) != len(right_bases): raise ValueError( f"Input lists must be of the same length, " f"length of left_bases is {len(left_bases)} and " f"length of right_bases is {len(right_bases)}." ) mask = (left_bases < right_bases[:, None]) & (right_bases > left_bases[:, None]) # scales badly with n^2; optimizable overlaps = np.triu(mask, k=1).nonzero() overlap_groups = [] for group_1, group_2 in tuple(zip(*overlaps)): if len(overlap_groups) == 0: overlap_groups.append({group_1, group_2}) else: added = False for i, sublist in enumerate(overlap_groups): if group_1 in sublist: overlap_groups[i].add(group_2) added = True break if group_2 in sublist: overlap_groups[i].add(group_1) added = True break if not added: overlap_groups.append({group_1, group_2}) # Merge overlap groups that were split by unique bases overlap_groups = BearingPartitioner.merge_sets(overlap_groups) # # add missing groups that are not overlapping anything - not neccessary # overlap_groups += [{i} for i in range(len(left_bases)) if i not in # [group for ol_group in overlap_groups # for group in ol_group]] return overlap_groups
[docs] @staticmethod def merge_sets(sets: List[Set]) -> List[Set]: """Merge sets that share at least one element. Parameters ---------- sets : list of sets List of sets to merge. Returns ------- list of sets List of merged sets. Raises ------ TypeError If `sets` is not a list of sets. Examples -------- >>> sets = [{1, 2}, {2, 3}, {4, 5}] >>> merge_sets(sets) [{1, 2, 3}, {4, 5}] >>> sets = [{'a', 'b'}, {'m', 3.4}, {'b', 'c'}] >>> merge_sets(sets) [{'a', 'b', 'c'}, {'m', 3.4}] """ if not isinstance(sets, list): raise TypeError(f"Input must be a list, got {type(sets)}.") if not all(isinstance(s, set) for s in sets): raise TypeError(f"Input must be a list of sets, got {type(sets)}.") for i, group in enumerate(sets): for j, group2 in enumerate(sets): if i != j and group & group2: sets[i] = group | group2 sets.pop(j) return sets
[docs] def plot_peakfinding(self): """Show the histogram and found peaks. Execute `run` before or plot during running (`run(show_plots=True)`). Raises ------ AssertionError If peakfinding has not been done yet. Returns ------- fig, axe : tuple matplotlib figure, axis """ if self._bin_info is None or not all( name in self._bin_info for name in [ "num_bins", "bin_edges", "bin_frequency", "peak_ind", "peak_props", "mean", "medi", "std", "min", "max", ] ): raise AssertionError( f"{self.__class__.__name__} has not been binned yet, " f"run `__bin_bearings` and `__find_peaks` before plotting graph." ) # Setup figure and axis fig, axe = plt.subplots(figsize=(12, 8)) # Max peak height self._inter_vals["max_height"] = max( self._bin_info["peak_props"]["prominences"] ) # Color peaks with colormap self._inter_vals["colors"] = cm.hsv( self._bin_info["peak_ind"] / self._bin_info["num_bins"] ) # Construct list of peak boxes from left and right bases self._inter_vals["boxes"] = [ ( self._bin_info["bin_edges"][ self._bin_info["peak_props"]["left_bases"][i] ], self._bin_info["bin_edges"][ self._bin_info["peak_props"]["right_bases"][i] ] - self._bin_info["bin_edges"][ self._bin_info["peak_props"]["left_bases"][i] ], ) for i in range(len(self._bin_info["peak_ind"])) ] # Draw list of overlapping boxes behind lower half histogram axe.broken_barh( self._inter_vals["boxes"], (0, self._inter_vals["max_height"] * 0.45), alpha=0.2, facecolors=self._inter_vals["colors"], edgecolors=self._inter_vals["colors"], ) # Draw list of non-overlapping boxes behind upper half histogram for i in range(len(self._bin_info["peak_ind"])): axe.broken_barh( [self._inter_vals["boxes"][i]], ( self._inter_vals["max_height"] * 0.5 * (1 + i / len(self._bin_info["peak_ind"])), self._inter_vals["max_height"] / (2 * len(self._bin_info["peak_ind"])), ), alpha=0.6, facecolors=self._inter_vals["colors"][i], edgecolors=self._inter_vals["colors"][i], ) # Draw histogram plt.bar( self._bin_info["bin_edges"][:-1], self._bin_info["bin_frequency"], width=90 / self._bin_info["num_bins"], # edgecolor='k' alpha=0.8, ) # Draw horizontal lines for min, max, mean, median and std l_min = axe.axhline( self._bin_info["min"], color="mediumblue", linestyle="--", linewidth=1 ) l_max = axe.axhline( self._bin_info["max"], color="crimson", linestyle="--", linewidth=1 ) l_mean = axe.axhline( self._bin_info["mean"], color="k", linestyle="--", linewidth=1.5 ) l_median = axe.axhline( self._bin_info["medi"], color="k", linestyle="--", linewidth=1 ) l_std_up = axe.axhline( self._bin_info["mean"] + self._bin_info["std"], color="orange", linestyle="--", linewidth=1, ) axe.axhline( self._bin_info["mean"] - self._bin_info["std"], color="orange", linestyle="--", linewidth=1, ) # Mark peaks with x plt.scatter( [self._bin_info["bin_edges"][i] for i in self._bin_info["peak_ind"]], [self._bin_info["bin_frequency"][i] for i in self._bin_info["peak_ind"]], marker="x", ) # Calculate midpoints between peaks midpoints_idx = np.array( (self._bin_info["peak_ind"][1:] + self._bin_info["peak_ind"][:-1]) / 2, dtype=int, ) # Draw midpoints for i in midpoints_idx: plt.axvline(self._bin_info["bin_edges"][i], color="black", alpha=0.25) # Show custom legend with min, max, mean, median and std in scientific notation axe.legend( [l_min, l_max, l_mean, l_std_up, l_median], [ f"Min: {self._bin_info['min']:.2e}", f"Max: {self._bin_info['max']:.2e}", f"Mean: {self._bin_info['mean']:.2e}", f"$\\pm$Std: {self._bin_info['std']:.2e}", f"Median: {self._bin_info['medi']:.2e}", ], loc="upper left", ) plt.xticks([0, 15, 30, 45, 60, 75, 90]) plt.xticks(np.linspace(0, 90, 91), minor=True) plt.xlabel(r"Direction ($\degree$)") plt.ylabel("Density") plt.title(f"Bearing histogram of {self.name}") return fig, axe
[docs] def plot_interval_splitting(self): """Plot the split up of peak bases into intervals. Show how the peaks with their overlapping left and right bases are being split up into non overlapping intervals. Raises ------ AssertionError If boundaries have not been partitioned yet. Returns ------- fig, axe : tuple matplotlib figure, axis """ if not all( name in self._inter_vals for name in ["base_vals", "boundaries", "center_values"] ): raise AssertionError( f"{self.__class__.__name__}'s boundaries have not been partitioned " f"yet, run `__make_boundaries` before plotting graph." ) fig, axe = plt.subplots(figsize=(12, 8)) for i in range(len(self._bin_info["peak_ind"])): axe.broken_barh( [self._inter_vals["boxes"][i]], ( self._inter_vals["max_height"] * 0.5 * (i / len(self._bin_info["peak_ind"])), self._inter_vals["max_height"] / (2 * len(self._bin_info["peak_ind"])), ), alpha=0.6, facecolors=self._inter_vals["colors"][i], edgecolors=self._inter_vals["colors"][i], ) axe.broken_barh( [self._inter_vals["boxes"][i]], ( 0, self._inter_vals["max_height"] * 0.5 * (i / len(self._bin_info["peak_ind"])), ), alpha=0.1, facecolors=self._inter_vals["colors"][i], edgecolors=self._inter_vals["colors"][i], ) for i, interval in enumerate(self._inter_vals["base_vals"]): col = cm.hsv(sum(interval) / 180) axe.broken_barh( [(interval[0], interval[1] - interval[0])], ( -self._inter_vals["max_height"] / (2 * len(self._bin_info["peak_ind"])), self._inter_vals["max_height"] / (2 * len(self._bin_info["peak_ind"])), ), alpha=0.6, facecolors=col, edgecolors=col, ) axe.broken_barh( [(interval[0], interval[1] - interval[0])], ( -self._inter_vals["max_height"] * (2 + i) / (2 * len(self._bin_info["peak_ind"])), self._inter_vals["max_height"] / (2 * len(self._bin_info["peak_ind"])), ), alpha=0.6, facecolors=col, edgecolors=col, ) return fig, axe