Source code for city2graph.proximity

"""
Proximity-Based Graph Generation Module.

This module provides comprehensive functionality for generating graph networks based
on spatial proximity relationships between geographic features. It implements several
classical proximity models commonly used in spatial network analysis and geographic
information systems. The module is particularly useful for constructing heterogeneous
graphs from multiple domains of geospatial relations, enabling complex spatial analysis
across different feature types and scales.
"""

# Future annotations for type hints
from __future__ import annotations

# Standard library imports
import logging
from itertools import combinations
from itertools import permutations
from typing import Any
from typing import Literal
from typing import cast

# Third-party imports
import geopandas as gpd
import libpysal
import networkx as nx
import numpy as np
import numpy.typing as npt
import pandas as pd
from scipy.spatial import Delaunay
from scipy.spatial import distance as sdist
from shapely.geometry import LineString
from sklearn.neighbors import NearestNeighbors

# Local imports
from .utils import gdf_to_nx
from .utils import nx_to_gdf
from .utils import validate_gdf

# Type checking imports

# Module logger configuration
logger = logging.getLogger(__name__)

__all__ = [
    "bridge_nodes",
    "contiguity_graph",
    "delaunay_graph",
    "euclidean_minimum_spanning_tree",
    "fixed_radius_graph",
    "gabriel_graph",
    "group_nodes",
    "knn_graph",
    "relative_neighborhood_graph",
    "waxman_graph",
]

# Simple type alias for readability
EdgePair = tuple[Any, Any]


# ----------------------------------------------------------------------------
# Internal helpers (new)
# ----------------------------------------------------------------------------


def _normalize_metric(metric: object) -> str:
    """
    Return a normalised distance metric name.

    Falls back to "euclidean" when the provided value is falsy or not a string.
    The function intentionally performs only light validation; callers that have
    a restricted set (e.g., ``contiguity_graph``) should still perform explicit
    membership checks to keep error messages and behavior stable.

    Parameters
    ----------
    metric : object
        Candidate distance metric value. If a non-empty string, it is lowercased
        and returned. Any other value results in "euclidean".

    Returns
    -------
    str
        Normalised metric string (e.g., "euclidean", "manhattan", or "network").
    """
    if not isinstance(metric, str) or not metric:
        return "euclidean"
    return metric.lower()


def _get_network_distance_matrix(
    metric: str,
    coords: npt.NDArray[np.floating],
    network_gdf: gpd.GeoDataFrame | None,
    crs: gpd.GeoDataFrame | gpd.GeoSeries | None,
) -> npt.NDArray[np.floating] | None:
    """
    Return a network distance matrix when ``metric == 'network'``.

    This tiny helper centralises a common pattern across generators: only
    compute the distance matrix when the network metric is requested; otherwise
    return ``None`` and let callers use their metric-specific fast paths.

    Parameters
    ----------
    metric : str
        Normalised distance metric name.
    coords : numpy.ndarray
        Array of node coordinates with shape ``(n, 2)``.
    network_gdf : geopandas.GeoDataFrame or None
        Network edges GeoDataFrame. Required when ``metric == 'network'``.
    crs : object
        CRS information for validation against ``network_gdf`` when present.

    Returns
    -------
    numpy.ndarray or None
        Network distance matrix or ``None`` when ``metric`` is not "network".
    """
    if metric == "network":  # hot path early exit for common euclidean/manhattan cases
        return _distance_matrix(coords, "network", network_gdf, getattr(crs, "crs", crs))
    return None


# ============================================================================
# GRAPH GENERATORS
# ============================================================================


[docs] def knn_graph( gdf: gpd.GeoDataFrame, k: int = 5, distance_metric: str = "euclidean", network_gdf: gpd.GeoDataFrame | None = None, *, target_gdf: gpd.GeoDataFrame | None = None, as_nx: bool = False, ) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame] | nx.Graph: r""" Generate a k-nearest neighbour graph from a GeoDataFrame of points. This function constructs a graph where each node is connected to its k nearest neighbors based on the specified distance metric. The resulting graph captures local spatial relationships and is commonly used in spatial analysis, clustering, and network topology studies. Parameters ---------- gdf : geopandas.GeoDataFrame GeoDataFrame containing the points (nodes) for the graph. The index of this GeoDataFrame will be used as node IDs. k : int, default 5 The number of nearest neighbors to connect to each node. Must be positive and less than the total number of nodes. distance_metric : str, default "euclidean" The distance metric to use for calculating nearest neighbors. Options are: - "euclidean": Straight-line distance - "manhattan": City-block distance (L1 norm) - "network": Shortest path distance along a network network_gdf : geopandas.GeoDataFrame, optional A GeoDataFrame representing a network (e.g., roads, paths) to use for "network" distance calculations. Required if `distance_metric` is "network". target_gdf : geopandas.GeoDataFrame, optional If provided, creates a directed graph where edges connect nodes from `gdf` to their k nearest neighbors in `target_gdf`. If None, creates an undirected graph within `gdf` itself. as_nx : bool, default False If True, returns a NetworkX graph object. Otherwise, returns a tuple of GeoDataFrames (nodes, edges). Returns ------- tuple[geopandas.GeoDataFrame, geopandas.GeoDataFrame] or networkx.Graph If `as_nx` is False, returns a tuple of GeoDataFrames: - nodes_gdf: GeoDataFrame of nodes with spatial and attribute information - edges_gdf: GeoDataFrame of edges with 'weight' and 'geometry' attributes If `as_nx` is True, returns a NetworkX graph object with spatial attributes. Raises ------ ValueError If `distance_metric` is "network" but `network_gdf` is not provided. If `k` is greater than or equal to the number of available nodes. See Also -------- delaunay_graph : Generate a Delaunay triangulation graph. fixed_radius_graph : Generate a fixed-radius graph. waxman_graph : Generate a probabilistic Waxman graph. Examples -------- >>> data = { ... 'id': [f'node_{i}' for i in range(6)], ... 'type': ['residential', 'commercial', 'industrial', 'park', 'school', 'hospital'], ... 'geometry': [Point(x, y) for x, y in coords] ... } >>> gdf = gpd.GeoDataFrame(data, crs="EPSG:4326").set_index('id') >>> print("Input GeoDataFrame:") >>> print(gdf.head(3)) type geometry id node_0 residential POINT (3.745 9.507) node_1 commercial POINT (7.319 5.987) node_2 industrial POINT (1.560 0.581) >>> # Generate a 3-nearest neighbor graph >>> nodes_gdf, edges_gdf = knn_graph(gdf, k=3, distance_metric="euclidean") >>> print(f"\\nNodes GDF shape: {nodes_gdf.shape}") >>> print(f"Edges GDF shape: {edges_gdf.shape}") Nodes GDF shape: (6, 2) Edges GDF shape: (18, 2) >>> print("\\nSample edges with weights:") >>> print(edges_gdf[['weight']].head(3)) weight 0 4.186842 1 6.190525 2 8.944272 >>> # Generate with Manhattan distance >>> nodes_manhattan, edges_manhattan = knn_graph( ... gdf, k=2, distance_metric="manhattan" ... ) >>> print(f"\\nManhattan edges count: {len(edges_manhattan)}") Manhattan edges count: 12 >>> # Generate as NetworkX graph >>> G = knn_graph(gdf, k=3, as_nx=True) >>> print(f"\\nNetworkX graph: {G.number_of_nodes()} nodes, {G.number_of_edges()} edges") >>> print(f"Average degree: {2 * G.number_of_edges() / G.number_of_nodes():.2f}") NetworkX graph: 6 nodes, 9 edges Average degree: 3.00 >>> # Directed graph example with target_gdf >>> target_data = { ... 'id': ['target_1', 'target_2'], ... 'service': ['hospital', 'school'], ... 'geometry': [Point(5, 5), Point(8, 8)] ... } >>> target_gdf = gpd.GeoDataFrame(target_data, crs="EPSG:4326").set_index('id') >>> nodes_dir, edges_dir = knn_graph(gdf, k=1, target_gdf=target_gdf) >>> print(f"\\nDirected graph edges: {len(edges_dir)} (each source node → 1 target)") Directed graph edges: 6 (each source node → 1 target) """ # Handle directed variant if target_gdf is not None: return _directed_graph( src_gdf=gdf, dst_gdf=target_gdf, distance_metric=distance_metric, method="knn", param=k, as_nx=as_nx, network_gdf=network_gdf, ) # Prepare nodes and handle trivial cases G, coords, node_ids = _prepare_nodes(gdf) if len(coords) <= 1 or k <= 0: return G if as_nx else nx_to_gdf(G, nodes=True, edges=True) # Generate edges based on distance metric dm = None distance_metric = _normalize_metric(distance_metric) if distance_metric == "network": dm = _get_network_distance_matrix(distance_metric, coords, network_gdf, gdf) assert dm is not None # for type-checker: ensured by distance_metric branch order = np.argsort(dm, axis=1)[:, 1 : k + 1] edges = [ (node_ids[i], node_ids[j]) for i in range(len(node_ids)) for j in order[i] if dm[i, j] < np.inf ] else: nn_metric = "cityblock" if distance_metric == "manhattan" else "euclidean" n_neigh = min(k + 1, len(coords)) nn = NearestNeighbors(n_neighbors=n_neigh, metric=nn_metric).fit(coords) _, idxs = nn.kneighbors(coords) edges = [(node_ids[i], node_ids[j]) for i, neigh in enumerate(idxs) for j in neigh[1:]] # Add edges with weights and geometries _add_edges(G, edges, coords, node_ids, metric=distance_metric, dm=dm, network_gdf=network_gdf) return G if as_nx else nx_to_gdf(G, nodes=True, edges=True)
[docs] def delaunay_graph( gdf: gpd.GeoDataFrame, distance_metric: str = "euclidean", network_gdf: gpd.GeoDataFrame | None = None, *, as_nx: bool = False, ) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame] | nx.Graph: """ Generate a Delaunay triangulation graph from a GeoDataFrame of points. This function constructs a graph based on the Delaunay triangulation of the input points. Each edge in the graph corresponds to an edge in the Delaunay triangulation. Parameters ---------- gdf : geopandas.GeoDataFrame GeoDataFrame containing the points (nodes) for the graph. The index of this GeoDataFrame will be used as node IDs. distance_metric : str, default "euclidean" The distance metric to use for calculating edge weights. Options are "euclidean", "manhattan", or "network". network_gdf : geopandas.GeoDataFrame, optional A GeoDataFrame representing a network (e.g., roads) to use for "network" distance calculations. Required if `distance_metric` is "network". as_nx : bool, default False If True, returns a NetworkX graph object. Otherwise, returns a tuple of GeoDataFrames (nodes, edges). Returns ------- tuple[geopandas.GeoDataFrame, geopandas.GeoDataFrame] or networkx.Graph If `as_nx` is False, returns a tuple of GeoDataFrames: - nodes_gdf: GeoDataFrame of nodes with spatial and attribute information - edges_gdf: GeoDataFrame of edges with 'weight' and 'geometry' attributes If `as_nx` is True, returns a NetworkX graph object with spatial attributes. Raises ------ ValueError If `distance_metric` is "network" but `network_gdf` is not provided. See Also -------- knn_graph : Generate a k-nearest neighbour graph. fixed_radius_graph : Generate a fixed-radius graph. waxman_graph : Generate a probabilistic Waxman graph. Notes ----- - Node IDs are preserved from the input GeoDataFrame's index. - Edge weights represent the distance between connected nodes. - Edge geometries are LineStrings connecting the centroids of the nodes. - If the input `gdf` has fewer than 3 points, an empty graph will be returned as Delaunay triangulation requires at least 3 non-collinear points. References ---------- Lee, D. T., & Schachter, B. J. (1980). Two algorithms for constructing a Delaunay triangulation. International Journal of Computer & Information Sciences, 9(3), 219-242. [1](https://doi.org/10.1007/BF00977785) Examples -------- >>> import geopandas as gpd >>> from shapely.geometry import Point >>> # Create a sample GeoDataFrame >>> data = {'id': [1, 2, 3, 4, 5], ... 'geometry': [Point(0, 0), Point(1, 1), Point(0, 1), Point(1, 0), Point(2, 2)]} >>> gdf = gpd.GeoDataFrame(data, crs="EPSG:4326").set_index('id') >>> >>> # Generate a Delaunay graph >>> nodes_gdf, edges_gdf = delaunay_graph(gdf) >>> print(nodes_gdf) >>> print(edges_gdf) >>> >>> # Generate a Delaunay graph as NetworkX object >>> G_delaunay = delaunay_graph(gdf, as_nx=True) >>> print(G_delaunay.nodes(data=True)) >>> print(G_delaunay.edges(data=True)) """ # Normalise metric once distance_metric = _normalize_metric(distance_metric) # Prepare nodes (early exit for <3 points - Delaunay undefined). Covered in tests. G, coords, node_ids = _prepare_nodes(gdf) if len(coords) < 3: # pragma: no cover - defensive early exit return G if as_nx else nx_to_gdf(G, nodes=True, edges=True) # Delaunay triangulation candidate edges tri = Delaunay(coords) edges = { (node_ids[i], node_ids[j]) for simplex in tri.simplices for i, j in combinations(simplex, 2) } # Attach weights/geometries dm = _get_network_distance_matrix(distance_metric, coords, network_gdf, gdf) _add_edges(G, edges, coords, node_ids, metric=distance_metric, dm=dm, network_gdf=network_gdf) return G if as_nx else nx_to_gdf(G, nodes=True, edges=True)
[docs] def gabriel_graph( gdf: gpd.GeoDataFrame, distance_metric: str = "euclidean", network_gdf: gpd.GeoDataFrame | None = None, *, as_nx: bool = False, ) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame] | nx.Graph: r""" Generate a Gabriel graph from a GeoDataFrame of points. In a Gabriel graph two nodes *u* and *v* are connected iff the closed disc that has :math:`uv` as its diameter contains no other node of the set. Parameters ---------- gdf : geopandas.GeoDataFrame Input point layer. The GeoDataFrame index is preserved as the node id. distance_metric : {'euclidean', 'manhattan', 'network'}, default 'euclidean' Metric used for edge weights / geometries (see the other generators). network_gdf : geopandas.GeoDataFrame, optional Required when *distance_metric='network'*. as_nx : bool, default False If *True* return a NetworkX graph, otherwise return two GeoDataFrames (nodes, edges) via `nx_to_gdf`. Returns ------- tuple[geopandas.GeoDataFrame, geopandas.GeoDataFrame] or networkx.Graph If `as_nx` is False, returns a tuple of GeoDataFrames: - nodes_gdf: GeoDataFrame of nodes with spatial and attribute information - edges_gdf: GeoDataFrame of edges with 'weight' and 'geometry' attributes If `as_nx` is True, returns a NetworkX graph object with spatial attributes. Notes ----- • The Gabriel graph is a sub-graph of the Delaunay triangulation; therefore the implementation first builds the Delaunay edges then filters them according to the disc-emptiness predicate, achieving an overall .. math:: \mathcal{O}(n \log n + m k) complexity ( *m* = Delaunay edges, *k* = average neighbours tested per edge). • When the input layer has exactly two points, the unique edge is returned. • If the layer has fewer than two points, an empty graph is produced. References ---------- Gabriel, K. R., & Sokal, R. R. (1969). A new statistical approach to geographic variation analysis. Systematic zoology, 18(3), 259-278. [1](https://doi.org/10.2307/2412323) Examples -------- >>> nodes, edges = gabriel_graph(points_gdf) >>> G = gabriel_graph(points_gdf, as_nx=True) """ distance_metric = _normalize_metric(distance_metric) G, coords, node_ids = _prepare_nodes(gdf) n_points = len(coords) if n_points < 2: # pragma: no cover - defensive early exit return G if as_nx else nx_to_gdf(G, nodes=True, edges=True) # Candidate edges (constant-time for 2 points else Delaunay) delaunay_edges = ( {(0, 1)} if n_points == 2 else { tuple(sorted((i, j))) for simplex in Delaunay(coords).simplices for i, j in combinations(simplex, 2) } ) # Gabriel filtering # Square distances for numerical stability kept_edges: set[tuple[int, int]] = set() tol = 1e-12 for i, j in delaunay_edges: # pragma: no branch - loop body fully covered mid = 0.5 * (coords[i] + coords[j]) rad2 = np.sum((coords[i] - coords[j]) ** 2) * 0.25 # (|pi-pj|/2)^2 # Squared distance of all points to the midpoint d2 = np.sum((coords - mid) ** 2, axis=1) mask = d2 <= rad2 + tol # Exactly the two endpoints inside the disc? if np.count_nonzero(mask) == 2: kept_edges.add((node_ids[i], node_ids[j])) # Add weights and geometries dm = _get_network_distance_matrix(distance_metric, coords, network_gdf, gdf) _add_edges( G, kept_edges, coords, node_ids, metric=distance_metric, dm=dm, network_gdf=network_gdf, ) return G if as_nx else nx_to_gdf(G, nodes=True, edges=True)
[docs] def relative_neighborhood_graph( gdf: gpd.GeoDataFrame, distance_metric: str = "euclidean", network_gdf: gpd.GeoDataFrame | None = None, *, as_nx: bool = False, ) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame] | nx.Graph: r""" Generate a Relative-Neighbourhood Graph (RNG) from a GeoDataFrame. In an RNG two nodes *u* and *v* are connected iff there is **no third node *w*** such that both :math:`d(u,w) < d(u,v)` **and** :math:`d(v,w) < d(u,v)`. Equivalently, the intersection of the two open discs having radius :math::`d(u,v)` and centres *u* and *v* (the *lune*) is empty. Parameters ---------- gdf : geopandas.GeoDataFrame Input point layer whose index provides the node ids. distance_metric : {'euclidean', 'manhattan', 'network'}, default 'euclidean' Metric used to attach edge weights / geometries (see the other generators). network_gdf : geopandas.GeoDataFrame, optional Required when *distance_metric='network'*. as_nx : bool, default False If *True* return a NetworkX graph, otherwise return two GeoDataFrames (nodes, edges) via `nx_to_gdf`. Returns ------- tuple[geopandas.GeoDataFrame, geopandas.GeoDataFrame] or networkx.Graph If `as_nx` is False, returns a tuple of GeoDataFrames: - nodes_gdf: GeoDataFrame of nodes with spatial and attribute information - edges_gdf: GeoDataFrame of edges with 'weight' and 'geometry' attributes If `as_nx` is True, returns a NetworkX graph object with spatial attributes. Notes ----- • The RNG is a sub-graph of the Delaunay triangulation; therefore the implementation first collects Delaunay edges ( :math:`\mathcal{O}(n\log n)` ) and then filters them according to the lune-emptiness predicate. • When the input layer has exactly two points the unique edge is returned. • If the layer has fewer than two points, an empty graph is produced. References ---------- Toussaint, G. T. (1980). The relative neighbourhood graph of a finite planar set. Pattern recognition, 12(4), 261-268. [1](https://doi.org/10.1016/0031-3203(80)90066-7) Examples -------- >>> nodes, edges = relative_neighborhood_graph(points_gdf) >>> G = relative_neighborhood_graph(points_gdf, as_nx=True) """ distance_metric = _normalize_metric(distance_metric) G, coords, node_ids = _prepare_nodes(gdf) n_points = len(coords) if n_points < 2: # pragma: no cover - defensive early exit return G if as_nx else nx_to_gdf(G, nodes=True, edges=True) cand_edges = ( {(0, 1)} if n_points == 2 else { tuple(sorted((i, j))) for simplex in Delaunay(coords).simplices for i, j in combinations(simplex, 2) } ) # RNG filtering kept_edges: set[tuple[int, int]] = set() # Work with squared distances to avoid sqrt for i, j in cand_edges: # pragma: no branch loop fully exercised in tests dij2 = np.dot(coords[i] - coords[j], coords[i] - coords[j]) # Vectorised test of the lune-emptiness predicate di2 = np.sum((coords - coords[i]) ** 2, axis=1) < dij2 dj2 = np.sum((coords - coords[j]) ** 2, axis=1) < dij2 # Any third point closer to *both* i and j? closer_both = np.where(di2 & dj2)[0] if len(closer_both) == 0: kept_edges.add((node_ids[i], node_ids[j])) # Add weights and geometries dm = _get_network_distance_matrix(distance_metric, coords, network_gdf, gdf) _add_edges( G, kept_edges, coords, node_ids, metric=distance_metric, dm=dm, network_gdf=network_gdf, ) return G if as_nx else nx_to_gdf(G, nodes=True, edges=True)
[docs] def euclidean_minimum_spanning_tree( gdf: gpd.GeoDataFrame, distance_metric: str = "euclidean", network_gdf: gpd.GeoDataFrame | None = None, *, as_nx: bool = False, ) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame] | nx.Graph: r""" Generate a (generalised) Euclidean Minimum Spanning Tree from a GeoDataFrame of points. The classical Euclidean Minimum Spanning Tree (EMST) is the minimum- total-length tree that connects a set of points when edge weights are the straight-line ( :math:`L_2` ) distances. For consistency with the other generators this implementation also supports *manhattan* and *network* metrics - it simply computes the minimum-weight spanning tree under the chosen metric. When the metric is *euclidean* the edge search is restricted to the Delaunay triangulation ( EMST ⊆ Delaunay ), guaranteeing an :math:`\mathcal{O}(n \log n)` overall complexity. With other metrics, or degenerate cases where the triangulation cannot be built, the algorithm gracefully falls back to the complete graph. Parameters ---------- gdf : geopandas.GeoDataFrame Input point layer. The index is preserved as the node identifier. distance_metric : {'euclidean', 'manhattan', 'network'}, default 'euclidean' Metric used for the edge weights / geometries. network_gdf : geopandas.GeoDataFrame, optional Required when *distance_metric='network'*. Must contain the network arcs with valid *pos* attributes on its nodes. as_nx : bool, default False If *True* return a NetworkX graph, otherwise return two GeoDataFrames (nodes, edges) via ``nx_to_gdf``. Returns ------- tuple[geopandas.GeoDataFrame, geopandas.GeoDataFrame] or networkx.Graph If `as_nx` is False, returns a tuple of GeoDataFrames: - nodes_gdf: GeoDataFrame of nodes with spatial and attribute information - edges_gdf: GeoDataFrame of edges with 'weight' and 'geometry' attributes If `as_nx` is True, returns a NetworkX graph object with spatial attributes. See Also -------- delaunay_graph : Generate a Delaunay triangulation graph. gabriel_graph : Generate a Gabriel graph. relative_neighborhood_graph : Generate a Relative Neighborhood Graph. Notes ----- • The resulting graph always contains *n - 1* edges (or 0 / 1 when the input has < 2 points). • For planar Euclidean inputs the computation is :math:`\mathcal{O}(n \log n)` thanks to the Delaunay pruning. • All the usual spatial attributes (*weight*, *geometry*, CRS checks, etc.) are attached through the shared private helpers. References ---------- March, W. B., Ram, P., & Gray, A. G. (2010, July). Fast euclidean minimum spanning tree: algorithm, analysis, and applications. In Proceedings of the 16th ACM SIGKDD international conference on Knowledge discovery and data mining (pp. 603-612). [1](https://doi.org/10.1145/1835804.1835882) Examples -------- >>> nodes, edges = euclidean_minimum_spanning_tree(points_gdf) >>> G = euclidean_minimum_spanning_tree(points_gdf, as_nx=True) """ # Normalise metric; delegate validation to the shared dispatcher below so # unknown metrics raise a consistent "Unknown distance metric" from one place. distance_metric = _normalize_metric(distance_metric) # Node preparation G, coords, node_ids = _prepare_nodes(gdf) n_points = len(coords) if n_points < 2: # pragma: no cover - defensive early exit return G if as_nx else nx_to_gdf(G, nodes=True, edges=True) # Candidate edge set # Fast O(n) candidate set via Delaunay when it is applicable cand_edges: set[tuple[int, int]] if distance_metric == "euclidean" and n_points >= 3: tri = Delaunay(coords) cand_edges = { tuple(sorted((i, j))) for simplex in tri.simplices for i, j in combinations(simplex, 2) } else: # fallback complete graph (also covers non-euclidean metrics) cand_edges = {(i, j) for i in range(n_points) for j in range(i + 1, n_points)} # Convert vertex indices to actual node ids cand_edges = {(node_ids[i], node_ids[j]) for i, j in cand_edges} # Attach weights and geometries # Always compute via the shared dispatcher so invalid metrics raise from there dm = _distance_matrix(coords, distance_metric, network_gdf, gdf.crs) _add_edges( G, cand_edges, coords, node_ids, metric=distance_metric, dm=dm, network_gdf=network_gdf, ) # Compute the minimum-spanning tree mst_G = nx.minimum_spanning_tree(G, weight="weight", algorithm="kruskal") # Output formatting return mst_G if as_nx else nx_to_gdf(mst_G, nodes=True, edges=True)
[docs] def fixed_radius_graph( gdf: gpd.GeoDataFrame, radius: float, distance_metric: str = "euclidean", network_gdf: gpd.GeoDataFrame | None = None, *, target_gdf: gpd.GeoDataFrame | None = None, as_nx: bool = False, ) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame] | nx.Graph: r""" Generate a fixed-radius graph from a GeoDataFrame of points. This function constructs a graph where nodes are connected if the distance between them is within a specified radius. This model is particularly useful for modeling communication networks, ecological connectivity, and spatial influence zones where interaction strength has a clear distance threshold. Parameters ---------- gdf : geopandas.GeoDataFrame GeoDataFrame containing the source points (nodes) for the graph. The index of this GeoDataFrame will be used as node IDs. radius : float The maximum distance for connecting nodes. Nodes within this radius will have an edge between them. Must be positive. distance_metric : str, default "euclidean" The distance metric to use for determining connections. Options are: - "euclidean": Straight-line distance - "manhattan": City-block distance (L1 norm) - "network": Shortest path distance along a network network_gdf : geopandas.GeoDataFrame, optional A GeoDataFrame representing a network (e.g., roads) to use for "network" distance calculations. Required if `distance_metric` is "network". target_gdf : geopandas.GeoDataFrame, optional If provided, creates a directed graph where edges connect nodes from `gdf` to nodes in `target_gdf` within the specified radius. If None, creates an undirected graph from `gdf` itself. as_nx : bool, default False If True, returns a NetworkX graph object. Otherwise, returns a tuple of GeoDataFrames (nodes, edges). Returns ------- tuple[geopandas.GeoDataFrame, geopandas.GeoDataFrame] or networkx.Graph If `as_nx` is False, returns a tuple of GeoDataFrames: - nodes_gdf: GeoDataFrame of nodes with spatial and attribute information - edges_gdf: GeoDataFrame of edges with 'weight' and 'geometry' attributes If `as_nx` is True, returns a NetworkX graph object with spatial attributes. Raises ------ ValueError If `distance_metric` is "network" but `network_gdf` is not provided. If `radius` is not positive. See Also -------- knn_graph : Generate a k-nearest neighbour graph. delaunay_graph : Generate a Delaunay triangulation graph. waxman_graph : Generate a probabilistic Waxman graph. Notes ----- - Node IDs are preserved from the input GeoDataFrame's index - Edge weights represent the actual distance between connected nodes - Edge geometries are LineStrings connecting node centroids - The graph stores the radius parameter in `G.graph["radius"]` - For Manhattan distance, edges follow L-shaped geometric paths References ---------- Bentley, J. L., Stanat, D. F., & Williams Jr, E. H. (1977). The complexity of finding fixed-radius near neighbors. Information processing letters, 6(6), 209-212. [1](https://doi.org/10.1016/0020-0190(77)90070-9) Examples -------- >>> import geopandas as gpd >>> import numpy as np >>> from shapely.geometry import Point >>> >>> # Create a sample GeoDataFrame representing city facilities >>> facilities = { ... 'name': ['Library_A', 'Park_B', 'School_C', 'Hospital_D', 'Mall_E'], ... 'type': ['library', 'park', 'school', 'hospital', 'commercial'], ... 'geometry': [ ... Point(0, 0), Point(1.5, 1), Point(3, 0.5), ... Point(1, 3), Point(4, 4) ... ] ... } >>> gdf = gpd.GeoDataFrame(facilities, crs="EPSG:4326").set_index('name') >>> print("Input facilities:") >>> print(gdf) type geometry name Library_A library POINT (0.000 0.000) Park_B park POINT (1.500 1.000) School_C school POINT (3.000 0.500) Hospital_D hospital POINT (1.000 3.000) Mall_E commercial POINT (4.000 4.000) >>> # Generate fix radius graph with radius=2.0 >>> nodes_gdf, edges_gdf = fixed_radius_graph(gdf, radius=2.0) >>> print(f"\\nConnections within 2.0 units:") >>> print(f"Nodes: {len(nodes_gdf)}, Edges: {len(edges_gdf)}") Connections within 2.0 units: Nodes: 5, Edges: 4 >>> print("\\nEdge connections and distances:") >>> for idx, row in edges_gdf.iterrows(): ... print(f"{row.name}: weight = {row['weight']:.3f}") 0: weight = 1.803 1: weight = 1.581 2: weight = 2.000 3: weight = 2.236 >>> # Compare with smaller radius >>> nodes_small, edges_small = fixed_radius_graph(gdf, radius=1.0) >>> print(f"\\nWith radius=1.0: {len(edges_small)} edges") With radius=1.0: 0 edges >>> # Compare with larger radius >>> nodes_large, edges_large = fixed_radius_graph(gdf, radius=3.0) >>> print(f"With radius=3.0: {len(edges_large)} edges") With radius=3.0: 7 edges >>> # Manhattan distance example >>> nodes_manh, edges_manh = fixed_radius_graph( ... gdf, radius=3.0, distance_metric="manhattan" ... ) >>> print(f"\\nManhattan metric (radius=3.0): {len(edges_manh)} edges") Manhattan metric (radius=3.0): 6 edges >>> # NetworkX graph with radius information >>> G = fixed_radius_graph(gdf, radius=2.5, as_nx=True) >>> print(f"\\nNetworkX graph properties:") >>> print(f"Radius parameter: {G.graph['radius']}") >>> print(f"Graph density: {nx.density(G):.3f}") >>> print(f"Connected components: {nx.number_connected_components(G)}") NetworkX graph properties: Radius parameter: 2.5 Graph density: 0.600 Connected components: 1 >>> # Directed graph to specific targets >>> targets = gpd.GeoDataFrame({ ... 'service': ['Emergency', 'Transit'], ... 'geometry': [Point(2, 2), Point(3.5, 1.5)] ... }, crs="EPSG:4326", index=['Emergency_Hub', 'Transit_Stop']) >>> >>> nodes_dir, edges_dir = fixed_radius_graph( ... gdf, radius=2.5, target_gdf=targets ... ) >>> print(f"\\nDirected connections to targets: {len(edges_dir)} edges") Directed connections to targets: 8 edges """ # Handle directed variant if target_gdf is not None: return _directed_graph( src_gdf=gdf, dst_gdf=target_gdf, distance_metric=distance_metric, method="radius", param=radius, as_nx=as_nx, network_gdf=network_gdf, ) # Prepare nodes and handle trivial cases G, coords, node_ids = _prepare_nodes(gdf) if len(coords) < 2: # pragma: no cover - defensive early exit return G if as_nx else nx_to_gdf(G, nodes=True, edges=True) # Generate edges based on distance metric distance_metric = _normalize_metric(distance_metric) dm = _get_network_distance_matrix(distance_metric, coords, network_gdf, gdf) if dm is not None: # network metric mask = (dm <= radius) & np.triu(np.ones_like(dm, dtype=bool), 1) edge_idx = np.column_stack(np.where(mask)) edges = [(node_ids[i], node_ids[j]) for i, j in edge_idx if dm[i, j] < np.inf] else: # euclidean / manhattan nn_metric = "cityblock" if distance_metric == "manhattan" else "euclidean" nn = NearestNeighbors(radius=radius, metric=nn_metric).fit(coords) idxs = nn.radius_neighbors(coords, return_distance=False) edges = [(node_ids[i], node_ids[j]) for i, neigh in enumerate(idxs) for j in neigh if i < j] # Add edges with weights and geometries _add_edges(G, edges, coords, node_ids, metric=distance_metric, dm=dm, network_gdf=network_gdf) G.graph["radius"] = radius return G if as_nx else nx_to_gdf(G, nodes=True, edges=True)
[docs] def waxman_graph( gdf: gpd.GeoDataFrame, beta: float, r0: float, seed: int | None = None, distance_metric: Literal["euclidean", "manhattan", "network"] = "euclidean", network_gdf: gpd.GeoDataFrame | None = None, *, as_nx: bool = False, ) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame] | nx.Graph: r""" Generate a probabilistic Waxman graph from a GeoDataFrame of points. This function constructs a random graph where the probability of an edge existing between two nodes decreases exponentially with their distance. The model is based on the Waxman random graph model, commonly used to simulate realistic network topologies in telecommunications, transportation, and social networks where connection probability diminishes with distance. The connection probability follows the formula: .. math:: P(u,v) = \beta \times \exp \left(-\frac{\text{dist}(u,v)}{r_0}\right) Parameters ---------- gdf : geopandas.GeoDataFrame GeoDataFrame containing the points (nodes) for the graph. The index of this GeoDataFrame will be used as node IDs. beta : float Parameter controlling the overall probability of edge creation. Higher values (closer to 1.0) increase the likelihood of connections. Must be between 0 and 1. r0 : float Parameter controlling the decay rate of probability with distance. Higher values result in longer-range connections being more likely. Must be positive. seed : int, optional Seed for the random number generator to ensure reproducibility of results. If None, results will vary between runs. distance_metric : str, default "euclidean" The distance metric to use for calculating distances between nodes. Options are: - "euclidean": Straight-line distance - "manhattan": City-block distance (L1 norm) - "network": Shortest path distance along a network network_gdf : geopandas.GeoDataFrame, optional A GeoDataFrame representing a network (e.g., roads) to use for "network" distance calculations. Required if `distance_metric` is "network". as_nx : bool, default False If True, returns a NetworkX graph object. Otherwise, returns a tuple of GeoDataFrames (nodes, edges). Returns ------- tuple[geopandas.GeoDataFrame, geopandas.GeoDataFrame] or networkx.Graph If `as_nx` is False, returns a tuple of GeoDataFrames: - nodes_gdf: GeoDataFrame of nodes with spatial and attribute information - edges_gdf: GeoDataFrame of edges with 'weight' and 'geometry' attributes If `as_nx` is True, returns a NetworkX graph object with spatial attributes. Raises ------ ValueError If `distance_metric` is "network" but `network_gdf` is not provided. If `beta` is not between 0 and 1, or if `r0` is not positive. See Also -------- knn_graph : Generate a k-nearest neighbour graph. delaunay_graph : Generate a Delaunay triangulation graph. fixed_radius_graph : Generate a fixed-radius graph. Notes ----- - Node IDs are preserved from the input GeoDataFrame's index - Edge weights represent the actual distance between connected nodes - Edge geometries are LineStrings connecting node centroids - The graph stores parameters in `G.graph["beta"]` and `G.graph["r0"]` - Results are stochastic; use `seed` parameter for reproducible outputs - The graph is undirected with symmetric edge probabilities References ---------- Waxman, B. M. (2002). Routing of multipoint connections. IEEE journal on selected areas in communications, 6(9), 1617-1622. [1](https://doi.org/10.1109/49.12889) Examples -------- >>> import geopandas as gpd >>> import numpy as np >>> from shapely.geometry import Point >>> >>> # Create a sample GeoDataFrame representing communication towers >>> np.random.seed(123) >>> tower_coords = np.random.uniform(0, 10, (8, 2)) >>> towers = { ... 'tower_id': [f'T{i:02d}' for i in range(8)], ... 'power': np.random.choice(['high', 'medium', 'low'], 8), ... 'geometry': [Point(x, y) for x, y in tower_coords] ... } >>> gdf = gpd.GeoDataFrame(towers, crs="EPSG:4326").set_index('tower_id') >>> print("Communication towers:") >>> print(gdf.head(4)) power geometry tower_id T00 low POINT (6.964 2.862) T01 high POINT (2.269 5.513) T02 medium POINT (5.479 4.237) T03 medium POINT (8.444 7.579) >>> # Generate Waxman graph with moderate connectivity >>> nodes_gdf, edges_gdf = waxman_graph( ... gdf, beta=0.5, r0=3.0, seed=42 ... ) >>> print(f"\\nWaxman graph (β=0.5, r₀=3.0):") >>> print(f"Nodes: {len(nodes_gdf)}, Edges: {len(edges_gdf)}") >>> print(f"Graph density: {2 * len(edges_gdf) / (len(nodes_gdf) * (len(nodes_gdf) - 1)):.3f}") Waxman graph (β=0.5, r₀=3.0): Nodes: 8, Edges: 12 Graph density: 0.429 >>> print("\\nSample edge weights (distances):") >>> print(edges_gdf[['weight']].head(4)) weight 0 2.876543 1 4.123789 2 1.987654 3 5.432109 >>> # Compare different parameter settings >>> # High connectivity (higher beta, higher r0) >>> _, edges_high = waxman_graph(gdf, beta=0.8, r0=5.0, seed=42) >>> print(f"\\nHigh connectivity (β=0.8, r₀=5.0): {len(edges_high)} edges") High connectivity (β=0.8, r₀=5.0): 19 edges >>> # Low connectivity (lower beta, lower r0) >>> _, edges_low = waxman_graph(gdf, beta=0.2, r0=1.5, seed=42) >>> print(f"Low connectivity (β=0.2, r₀=1.5): {len(edges_low)} edges") Low connectivity (β=0.2, r₀=1.5): 3 edges >>> # NetworkX graph with parameter storage >>> G = waxman_graph(gdf, beta=0.6, r0=4.0, seed=42, as_nx=True) >>> print(f"\\nNetworkX graph parameters:") >>> print(f"Beta: {G.graph['beta']}, r0: {G.graph['r0']}") >>> print(f"Average clustering coefficient: {nx.average_clustering(G):.3f}") >>> print(f"Number of connected components: {nx.number_connected_components(G)}") NetworkX graph parameters: Beta: 0.6, r0: 4.0 Average clustering coefficient: 0.267 Number of connected components: 1 >>> # Demonstrate reproducibility with seed >>> G1 = waxman_graph(gdf, beta=0.4, r0=2.0, seed=99, as_nx=True) >>> G2 = waxman_graph(gdf, beta=0.4, r0=2.0, seed=99, as_nx=True) >>> print(f"\\nReproducibility test:") >>> print(f"Graph 1 edges: {G1.number_of_edges()}") >>> print(f"Graph 2 edges: {G2.number_of_edges()}") >>> print(f"Identical: {G1.number_of_edges() == G2.number_of_edges()}") Reproducibility test: Graph 1 edges: 8 Graph 2 edges: 8 Identical: True >>> # Manhattan distance metric example >>> nodes_manh, edges_manh = waxman_graph( ... gdf, beta=0.5, r0=3.0, distance_metric="manhattan", seed=42 ... ) >>> print(f"\\nManhattan distance: {len(edges_manh)} edges") Manhattan distance: 10 edges """ # Prepare nodes and handle trivial cases rng = np.random.default_rng(seed) G, coords, node_ids = _prepare_nodes(gdf) if len(coords) < 2: # pragma: no cover - defensive early exit return G if as_nx else nx_to_gdf(G, nodes=True, edges=True) # Calculate connection probabilities metric_lc = _normalize_metric(distance_metric) dm = _distance_matrix(coords, metric_lc, network_gdf, gdf.crs) with np.errstate(divide="ignore"): probs = beta * np.exp(-dm / r0) probs[dm == np.inf] = 0 # Unreachable in network metric # Generate edges based on probabilities rand = rng.random(dm.shape) mask = (rand <= probs) & np.triu(np.ones_like(dm, dtype=bool), 1) edge_idx = np.column_stack(np.where(mask)) edges = [(node_ids[i], node_ids[j]) for i, j in edge_idx] # Add edges with weights and geometries _add_edges(G, edges, coords, node_ids, metric=metric_lc, dm=dm, network_gdf=network_gdf) G.graph.update({"beta": beta, "r0": r0}) return G if as_nx else nx_to_gdf(G, nodes=True, edges=True)
[docs] def bridge_nodes( nodes_dict: dict[str, gpd.GeoDataFrame], proximity_method: str = "knn", *, multigraph: bool = False, as_nx: bool = False, **kwargs: float | str | bool, ) -> tuple[dict[str, gpd.GeoDataFrame], dict[tuple[str, str, str], gpd.GeoDataFrame]] | nx.Graph: r""" Build directed proximity edges between every ordered pair of node layers. This function creates a multi-layer spatial network by generating directed proximity edges from nodes in one GeoDataFrame layer to nodes in another. It systematically processes all ordered pairs of layers and applies either k-nearest neighbors (KNN) or fixed-radius method to establish inter-layer connections. This function is specifically designed for constructing heterogeneous graphs by generating new edge types ("is_nearby") between different types of nodes, enabling the modeling of complex relationships across multiple domains. It is useful for modeling complex urban systems, ecological networks, or multi-modal transportation systems where different types of entities interact through spatial proximity. Parameters ---------- nodes_dict : dict[str, geopandas.GeoDataFrame] A dictionary where keys are layer names (strings) and values are GeoDataFrames representing the nodes of each layer. Each GeoDataFrame should contain point geometries with consistent CRS across all layers. proximity_method : str, default "knn" The method to use for generating proximity edges between layers. Options are: - "knn": k-nearest neighbors method - "fixed_radius": fixed-radius method multigraph : bool, default False If True, the resulting NetworkX graph will be a MultiGraph, allowing multiple edges between the same pair of nodes from different proximity relationships. as_nx : bool, default False If True, returns a NetworkX graph object containing all nodes and inter-layer edges. Otherwise, returns dictionaries of GeoDataFrames. **kwargs : Any Additional keyword arguments passed to the underlying proximity method: For `proximity_method="knn"`: - k : int, default 1 Number of nearest neighbors to connect to in target layer - distance_metric : str, default "euclidean" Distance metric ("euclidean", "manhattan", "network") - network_gdf : geopandas.GeoDataFrame, optional Network for "network" distance calculations For `proximity_method="fixed_radius"`: - radius : float, required Maximum connection distance between layers - distance_metric : str, default "euclidean" Distance metric ("euclidean", "manhattan", "network") - network_gdf : geopandas.GeoDataFrame, optional Network for "network" distance calculations Returns ------- tuple[dict[str, geopandas.GeoDataFrame], dict[tuple[str, str, str], geopandas.GeoDataFrame]] | networkx.Graph If `as_nx` is False, returns a tuple containing: - nodes_dict: The original input `nodes_dict` (unchanged) - edges_dict: Dictionary where keys are edge type tuples of the form `(source_layer_name, "is_nearby", target_layer_name)` and values are GeoDataFrames of the generated directed edges. **Each unique tuple represents a distinct edge type in the heterogeneous graph, enabling differentiation between relationships across different node type combinations.** If `as_nx` is True, returns a NetworkX graph object containing all nodes from all layers and the generated directed inter-layer edges, forming a **heterogeneous graph structure** where different node types are connected through proximity-based relationships. Raises ------ ValueError If `nodes_dict` contains fewer than two layers. If `proximity_method` is not "knn" or "fixed_radius". If `proximity_method` is "fixed_radius" but `radius` is not provided in `kwargs`. See Also -------- knn_graph : Generate a k-nearest neighbour graph. fixed_radius_graph : Generate a fixed-radius graph. Notes ----- - All generated edges are directed from source layer to target layer - **The relation type for all generated edges is fixed as "is_nearby", creating a new edge type that bridges different node types in heterogeneous graphs** - **Each ordered pair of node layers generates a distinct edge type, enabling the construction of rich heterogeneous graph structures with multiple relationship types between different domain entities** - Edge weights and geometries are calculated based on the chosen distance_metric - Each ordered pair of layers generates a separate edge GeoDataFrame - Self-connections (layer to itself) are not created - **The resulting structure is ideal for heterogeneous graph neural networks, multi-layer network analysis, and cross-domain spatial relationship modeling** Examples -------- >>> import geopandas as gpd >>> import numpy as np >>> from shapely.geometry import Point >>> >>> # Create multi-layer urban infrastructure dataset >>> # Layer 1: Schools >>> schools_data = { ... 'name': ['Elementary_A', 'High_B', 'Middle_C'], ... 'capacity': [300, 800, 500], ... 'geometry': [Point(1, 1), Point(4, 3), Point(2, 4)] ... } >>> schools = gpd.GeoDataFrame(schools_data, crs="EPSG:4326").set_index('name') >>> >>> # Layer 2: Hospitals >>> hospitals_data = { ... 'name': ['General_Hospital', 'Clinic_East'], ... 'beds': [200, 50], ... 'geometry': [Point(3, 2), Point(5, 5)] ... } >>> hospitals = gpd.GeoDataFrame(hospitals_data, crs="EPSG:4326").set_index('name') >>> >>> # Layer 3: Parks >>> parks_data = { ... 'name': ['Central_Park', 'River_Park', 'Neighborhood_Green'], ... 'area_ha': [15.5, 8.2, 3.1], ... 'geometry': [Point(2, 2), Point(1, 3), Point(4, 4)] ... } >>> parks = gpd.GeoDataFrame(parks_data, crs="EPSG:4326").set_index('name') >>> >>> nodes_dict = { ... 'schools': schools, ... 'hospitals': hospitals, ... 'parks': parks ... } >>> >>> print("Input layers:") >>> for layer_name, gdf in nodes_dict.items(): ... print(f"{layer_name}: {len(gdf)} nodes") Input layers: schools: 3 nodes hospitals: 2 nodes parks: 3 nodes >>> # Bridge nodes using KNN method (1 nearest neighbor) >>> nodes_out, edges_out = bridge_nodes( ... nodes_dict, proximity_method="knn", k=1 ... ) >>> >>> print(f"\\nGenerated edge types: {len(edges_out)}") >>> for edge_key in edges_out.keys(): ... print(f" {edge_key[0]} → {edge_key[2]}: {len(edges_out[edge_key])} edges") Generated edge types: 6 schools → hospitals: 3 edges schools → parks: 3 edges hospitals → schools: 2 edges hospitals → parks: 2 edges parks → schools: 3 edges parks → hospitals: 3 edges >>> # Examine specific edge relationships >>> school_to_hospital = edges_out[('schools', 'is_nearby', 'hospitals')] >>> print("\\nSchools to nearest hospitals:") >>> print(school_to_hospital[['weight']]) weight 0 2.236068 1 1.414214 2 2.828427 >>> # Bridge nodes using fixed radius >>> nodes_fr, edges_fr = bridge_nodes( ... nodes_dict, proximity_method="fixed_radius", radius=2.5 ... ) >>> >>> total_fr_edges = sum(len(gdf) for gdf in edges_fr.values()) >>> print(f"\\nFixed radius method (radius=2.5): {total_fr_edges} total edges") Fixed radius method (radius=2.5): 8 total edges >>> # Compare edge counts by method >>> print("\\nEdge count comparison:") >>> for edge_key in edges_out.keys(): ... knn_count = len(edges_out[edge_key]) ... fr_count = len(edges_fr[edge_key]) if edge_key in edges_fr else 0 ... print(f" {edge_key[0]} → {edge_key[2]}: KNN={knn_count}, Fixed radious={fr_count}") Edge count comparison: schools → hospitals: KNN=3, Fixed radious=2 schools → parks: KNN=3, Fixed radious=3 hospitals → schools: KNN=2, Fixed radious=1 hospitals → parks: KNN=2, Fixed radious=1 parks → schools: KNN=3, Fixed radious=1 """ # Validate input parameters if len(nodes_dict) < 2: # pragma: no cover - defensive validation msg = "`nodes_dict` needs at least two layers" raise ValueError(msg) # Raise error if proximity method is not recognized if proximity_method.lower() not in {"knn", "fixed_radius"}: # pragma: no cover msg = "proximity_method must be 'knn' or 'fixed_radius'" raise ValueError(msg) # Generate edges for each pair of layers edge_dict = {} for src_type, dst_type in permutations(nodes_dict.keys(), 2): src_gdf = nodes_dict[src_type] dst_gdf = nodes_dict[dst_type] if proximity_method.lower() == "knn": # k-nearest neighbors k = int(kwargs.get("k", 1)) # Call knn_graph with appropriate arguments (always return GeoDataFrames) distance_metric = _normalize_metric(kwargs.get("distance_metric", "euclidean")) network_gdf = kwargs.get("network_gdf") _, edges_gdf = knn_graph( src_gdf, k=k, distance_metric=distance_metric, network_gdf=network_gdf, target_gdf=dst_gdf, as_nx=False, # Always get GeoDataFrames from individual calls ) else: # fixed_radius radius = float(kwargs["radius"]) # Call fixed_radius_graph with appropriate arguments (always return GeoDataFrames) distance_metric = _normalize_metric(kwargs.get("distance_metric", "euclidean")) network_gdf = kwargs.get("network_gdf") _, edges_gdf = fixed_radius_graph( src_gdf, radius=radius, distance_metric=distance_metric, network_gdf=network_gdf, target_gdf=dst_gdf, as_nx=False, # Always get GeoDataFrames from individual calls ) edge_dict[(src_type, "is_nearby", dst_type)] = edges_gdf # Format output if as_nx: # pragma: no cover - not exercised in minimal public tests return gdf_to_nx(nodes=nodes_dict, edges=edge_dict, multigraph=multigraph, directed=True) return nodes_dict, edge_dict
# ============================================================================ # NODE PREPARATION # ============================================================================ def _relation_from_predicate(predicate: str | None) -> str: """ Map a spatial predicate to a canonical relation label. This standardizes the relation name used in heterogeneous edge keys based on a GeoPandas spatial join predicate. Parameters ---------- predicate : str or None Spatial predicate name (e.g., "covered_by", "within", "contains"). If None, defaults to "covered_by". Returns ------- str Canonical relation label used in edge keys. Mappings: "covered_by" -> "covers", "within" -> "contains", "contains" -> "contains". Any other value is returned unchanged. """ pred = (predicate or "covered_by").lower() return {"covered_by": "covers", "within": "contains", "contains": "contains"}.get(pred, pred) def _group_nodes_empty_result( polygons_gdf: gpd.GeoDataFrame, points_gdf: gpd.GeoDataFrame, relation: str, crs: object, as_nx: bool, ) -> tuple[dict[str, gpd.GeoDataFrame], dict[tuple[str, str, str], gpd.GeoDataFrame]] | nx.Graph: """ Return an empty heterogeneous structure with correct schema and metadata. When there are no matches or inputs are empty, this helper fabricates the minimal nodes/edges containers (or a NetworkX graph) with the right index names and CRS so downstream consumers can rely on a consistent interface without extra conditionals. Parameters ---------- polygons_gdf : geopandas.GeoDataFrame Polygon nodes to include in the output under key "polygon". points_gdf : geopandas.GeoDataFrame Point nodes to include in the output under key "point". relation : str Canonical relation label (e.g., "covers", "contains") for the heterogeneous edge key. crs : object CRS to assign to the empty edges GeoDataFrame. as_nx : bool If True, return a typed heterogeneous NetworkX graph; otherwise return nodes/edges dicts. Returns ------- tuple[dict[str, geopandas.GeoDataFrame], dict[tuple[str, str, str], geopandas.GeoDataFrame]] or networkx.Graph Nodes/edges dictionaries when ``as_nx`` is False, or a heterogeneous NetworkX graph when True. """ edge_key = ("polygon", relation, "point") idx = pd.MultiIndex.from_tuples([], names=[polygons_gdf.index.name, points_gdf.index.name]) empty_edges = gpd.GeoDataFrame( {"weight": pd.Series(dtype=float)}, geometry=gpd.GeoSeries(dtype="geometry"), crs=crs, index=idx, ) nodes_dict: dict[str, gpd.GeoDataFrame] = {"polygon": polygons_gdf, "point": points_gdf} edges_dict: dict[tuple[str, str, str], gpd.GeoDataFrame] = {edge_key: empty_edges} return ( gdf_to_nx(nodes=nodes_dict, edges=edges_dict, directed=True) if as_nx else ( nodes_dict, edges_dict, ) ) def _edges_gdf_from_pairs( polygons_gdf: gpd.GeoDataFrame, points_gdf: gpd.GeoDataFrame, pairs: list[tuple[Any, Any]], metric: str, network_gdf: gpd.GeoDataFrame | None, ) -> gpd.GeoDataFrame | None: """ Build an edges GeoDataFrame from (polygon_id, point_id) pairs. Constructs a temporary directed graph that includes polygon centroids and points as nodes, then attaches weights/geometries using the common helper. Parameters ---------- polygons_gdf : geopandas.GeoDataFrame Polygon features; centroids are used as polygon node positions. points_gdf : geopandas.GeoDataFrame Point features representing destination nodes. pairs : list of tuple List of (polygon_id, point_id) pairs specifying edges to create. metric : str Distance metric: "euclidean", "manhattan", or "network". network_gdf : geopandas.GeoDataFrame or None Network edges GeoDataFrame when ``metric='network'``; ignored otherwise. Returns ------- geopandas.GeoDataFrame or None An edges GeoDataFrame indexed by (polygon_id, point_id) with columns "weight" and "geometry". Returns None if ``pairs`` is empty or if no edges are produced. """ # Build a unified temporary nodes GeoDataFrame using polygon centroids and point coordinates poly_centroids = polygons_gdf.geometry.centroid poly_nodes = gpd.GeoDataFrame( {"geometry": poly_centroids}, geometry="geometry", crs=polygons_gdf.crs ) pt_nodes = gpd.GeoDataFrame( {"geometry": points_gdf.geometry}, geometry="geometry", crs=points_gdf.crs ) # Namespace node IDs to disambiguate between polygon and point indices poly_index = pd.MultiIndex.from_tuples([("poly", i) for i in polygons_gdf.index]) pt_index = pd.MultiIndex.from_tuples([("pt", i) for i in points_gdf.index]) poly_nodes.index = poly_index pt_nodes.index = pt_index temp_nodes = pd.concat([poly_nodes, pt_nodes]) # Prepare nodes once using the shared helper (directed for clarity of polygon -> point edges) G_tmp, coords_all, node_ids_ns = _prepare_nodes(temp_nodes, directed=True) # Build namespaced edge list (polygon tuple id -> point tuple id) ns_edge_list = [(("poly", u), ("pt", v)) for u, v in pairs] # Attach weights and geometries using the common edge helper (no precomputed DM; # helpers are fast enough for the typically sparse containment relation) _add_edges( G_tmp, ns_edge_list, coords_all, node_ids_ns, metric=metric, dm=None, network_gdf=network_gdf, ) # Extract edge records back into a typed GeoDataFrame with MultiIndex (polygon_id, point_id) records: list[dict[str, Any]] = [] index_tuples: list[tuple[Any, Any]] = [] for u_ns, v_ns, data in G_tmp.edges(data=True): u_orig = u_ns[1] v_orig = v_ns[1] index_tuples.append((u_orig, v_orig)) records.append({"weight": data.get("weight", np.nan), "geometry": data.get("geometry")}) edge_index = pd.MultiIndex.from_tuples( index_tuples, names=[polygons_gdf.index.name, points_gdf.index.name], ) return gpd.GeoDataFrame( pd.DataFrame(records, index=edge_index)[["weight"]], geometry=[rec["geometry"] for rec in records], crs=polygons_gdf.crs, ) def _containment_pairs( points_gdf: gpd.GeoDataFrame, polygons_gdf: gpd.GeoDataFrame, predicate: str, ) -> list[tuple[Any, Any]]: """ Return (polygon_id, point_id) pairs using a robust spatial join. Performs a single GeoPandas ``sjoin`` between points and a copy of polygons whose original index is materialised as a normal column. This ensures a deterministic mapping back to polygon identifiers regardless of GeoPandas internals. Parameters ---------- points_gdf : geopandas.GeoDataFrame Points to test against the polygons. polygons_gdf : geopandas.GeoDataFrame Polygons used as containers. predicate : str Spatial predicate for the join (e.g., "covered_by", "within"). Returns ------- list[tuple[Any, Any]] List of (polygon_id, point_id) tuples representing containment relationships. """ predicate_lc = (predicate or "covered_by").lower() # Materialise polygon ids to a stable column name id_col = polygons_gdf.index.name or "index" polys = polygons_gdf.reset_index() # exposes original ids in column `id_col` joined = gpd.sjoin(points_gdf, polys, how="inner", predicate=predicate_lc) if joined.empty: return [] # Map right-side matches back to original polygon ids deterministically poly_ids_series = ( polys.loc[joined["index_right"], id_col] if "index_right" in joined.columns else joined[id_col] ) point_ids = joined.index.to_list() poly_ids = poly_ids_series.to_list() return list(zip(poly_ids, point_ids, strict=False))
[docs] def group_nodes( polygons_gdf: gpd.GeoDataFrame, points_gdf: gpd.GeoDataFrame, *, distance_metric: Literal["euclidean", "manhattan", "network"] = "euclidean", network_gdf: gpd.GeoDataFrame | None = None, predicate: str = "covered_by", as_nx: bool = False, ) -> tuple[dict[str, gpd.GeoDataFrame], dict[tuple[str, str, str], gpd.GeoDataFrame]] | nx.Graph: """ Create a heterogeneous graph linking polygon zones to contained points. This function builds a bipartite relation between polygon and point features by connecting each polygon to the points that satisfy a spatial containment predicate (default: "covered_by" so boundary points are included). It follows city2graph heterogeneous graph conventions and reuses the proximity helpers for computing edge weights and geometries according to the chosen distance metric. Parameters ---------- polygons_gdf : geopandas.GeoDataFrame GeoDataFrame of polygonal features representing zones. CRS must match points_gdf. Original attributes and geometries are preserved in the resulting polygon nodes. points_gdf : geopandas.GeoDataFrame GeoDataFrame of point features to be associated with the polygons. CRS must match polygons_gdf. Original attributes and geometries are preserved in the resulting point nodes. distance_metric : {"euclidean", "manhattan", "network"}, default "euclidean" Metric used for edge weights and geometries. Euclidean produces straight line segments, Manhattan produces L-shaped polylines, and Network traces polylines along the provided network_gdf and uses shortest-path distances. network_gdf : geopandas.GeoDataFrame, optional Required when distance_metric="network". Must share the same CRS as the inputs. predicate : str, default "covered_by" Spatial predicate used to determine containment in a vectorized spatial join (e.g., "covered_by", "within", "contains", "intersects"). The default includes points on polygon boundaries. as_nx : bool, default False If False, return heterogeneous GeoDataFrame dictionaries. If True, return a typed heterogeneous NetworkX graph built with gdf_to_nx. Returns ------- (nodes_dict, edges_dict) : tuple of dicts Returned when as_nx=False. nodes_dict is {"polygon": polygons_gdf, "point": points_gdf} with original indices, attributes, and geometries. edges_dict maps a typed edge key to an edges GeoDataFrame whose index is a MultiIndex of (polygon_id, point_id) and includes at least weight and geometry columns. The edge key has the form ("polygon", relation, "point"), where relation is derived from predicate (e.g., covered_by -> "covers", within -> "contains"). G : networkx.Graph Returned when as_nx=True. A heterogeneous graph with node_type in nodes and a typed edge_type reflecting the relation derived from predicate. Graph metadata includes crs and is_hetero=True. Notes ----- - CRS must be present and identical for both inputs. For network metric, the network's CRS must also match. - Boundary points are included by default via predicate="covered_by". - Distance calculations and edge geometries reuse internal helpers (_prepare_nodes, _distance_matrix, _add_edges) to ensure consistency with other proximity functions. Examples -------- Build heterogeneous GeoDataFrames (default Euclidean): >>> import geopandas as gpd >>> from shapely.geometry import Point, Polygon >>> from city2graph.proximity import group_nodes >>> polys = gpd.GeoDataFrame( ... {"name": ["A"]}, ... geometry=[Polygon([(0,0), (2,0), (2,2), (0,2)])], ... crs="EPSG:3857", ... ) >>> pts = gpd.GeoDataFrame( ... {"id": [1, 2]}, ... geometry=[Point(1, 1), Point(3, 3)], ... crs="EPSG:3857", ... ) >>> nodes, edges = group_nodes(polys, pts, as_nx=False) >>> list(nodes.keys()) ['polygon', 'point'] >>> next(iter(edges)).__class__ is tuple True Build a typed heterogeneous NetworkX graph: >>> G = group_nodes(polys, pts, as_nx=True) >>> G.graph.get("is_hetero") True """ # ---- Normalise inputs and validate once ---- relation = _relation_from_predicate(predicate) metric_lc = _normalize_metric(distance_metric) # Strong, early CRS presence check (runs before generic validator to avoid any side-effects) poly_crs = polygons_gdf.crs pt_crs = points_gdf.crs if not poly_crs or not pt_crs: msg = ( f"Both inputs must have a CRS (got polygons_gdf.crs={poly_crs}, " f"points_gdf.crs={pt_crs})" ) raise ValueError(msg) # Validate GDFs and CRS via shared utility (keeps behaviour consistent across the package) # We intentionally ignore the returned validated copies to avoid accidental mutation. validate_gdf({"polygon": polygons_gdf, "point": points_gdf}, None, allow_empty=True) if poly_crs != pt_crs: msg = f"CRS mismatch between inputs: {poly_crs} != {pt_crs}" raise ValueError(msg) if metric_lc not in {"euclidean", "manhattan", "network"}: msg = f"Unsupported distance_metric: {distance_metric!r}" raise ValueError(msg) if metric_lc == "network": if network_gdf is None: msg = "network_gdf is required when distance_metric='network'" raise ValueError(msg) if network_gdf.crs != poly_crs: msg = f"CRS mismatch between inputs and network: inputs={poly_crs} != network={network_gdf.crs}" raise ValueError(msg) # Quick exits for empty inputs / no matches if polygons_gdf.empty or points_gdf.empty: return _group_nodes_empty_result(polygons_gdf, points_gdf, relation, poly_crs, as_nx) # Find all (polygon_id, point_id) pairs matching the containment predicate pairs = _containment_pairs(points_gdf, polygons_gdf, predicate) # Return empty result if no containment pairs were found if not pairs: return _group_nodes_empty_result(polygons_gdf, points_gdf, relation, poly_crs, as_nx) # Build edge GeoDataFrame using shared helpers edges_gdf = _edges_gdf_from_pairs(polygons_gdf, points_gdf, pairs, metric_lc, network_gdf) # Package as heterogeneous data structure (edges_gdf may legitimately be empty) nodes_dict: dict[str, gpd.GeoDataFrame] = {"polygon": polygons_gdf, "point": points_gdf} edges_dict: dict[tuple[str, str, str], gpd.GeoDataFrame] = { ("polygon", relation, "point"): edges_gdf } return ( gdf_to_nx(nodes=nodes_dict, edges=edges_dict, directed=True) if as_nx else (nodes_dict, edges_dict) )
def _prepare_nodes( gdf: gpd.GeoDataFrame, *, directed: bool = False, ) -> tuple[nx.Graph, npt.NDArray[np.floating], list[Any]]: """ Return an empty graph with populated nodes plus coord cache. This function prepares a NetworkX graph by adding nodes from a given GeoDataFrame. It extracts centroids as node positions and includes all GeoDataFrame attributes as node attributes. Parameters ---------- gdf : geopandas.GeoDataFrame The GeoDataFrame containing the nodes to be added to the graph. directed : bool, default False If True, a directed graph (DiGraph) is created; otherwise, an undirected graph (Graph) is created. Returns ------- tuple[networkx.Graph, numpy.ndarray, list[int]] A tuple containing: - G : networkx.Graph or networkx.DiGraph An empty graph with populated nodes. - coords : numpy.ndarray A 2D NumPy array of node coordinates. - node_ids : list[Any] A list of node IDs. """ validate_gdf(nodes_gdf=gdf) centroids = gdf.geometry.centroid coords = np.column_stack([centroids.x, centroids.y]) node_ids: list[Any] = list(gdf.index) G = nx.DiGraph() if directed else nx.Graph() # Bulk-add nodes using a comprehension to minimize Python-level loops attrs_list = gdf.to_dict("records") G.add_nodes_from( ( node_id, {"pos": (float(x), float(y)), **attrs}, ) for node_id, attrs, (x, y) in zip(node_ids, attrs_list, coords, strict=False) ) G.graph["crs"] = gdf.crs return G, coords, node_ids # ============================================================================ # DISTANCE MATRIX # ============================================================================ def _euclidean_dm(coords: npt.NDArray[np.floating]) -> npt.NDArray[np.floating]: """ Compute Euclidean distance matrix. This function calculates the pairwise Euclidean distances between all points in the input coordinate array and returns them as a squareform distance matrix. Parameters ---------- coords : numpy.ndarray A 2D NumPy array of coordinates (n_points, n_dimensions). Returns ------- numpy.ndarray A squareform Euclidean distance matrix. """ return cast("npt.NDArray[np.floating]", sdist.squareform(sdist.pdist(coords))) def _manhattan_dm(coords: npt.NDArray[np.floating]) -> npt.NDArray[np.floating]: """ Compute Manhattan distance matrix. This function calculates the pairwise Manhattan (city-block) distances between all points in the input coordinate array and returns them as a squareform distance matrix. Parameters ---------- coords : numpy.ndarray A 2D NumPy array of coordinates (n_points, n_dimensions). Returns ------- numpy.ndarray A squareform Manhattan distance matrix. """ return cast( "npt.NDArray[np.floating]", sdist.squareform(sdist.pdist(coords, metric="cityblock")), ) def _network_dm( coords: npt.NDArray[np.floating], network_gdf: gpd.GeoDataFrame, gdf_crs: gpd.crs.CRS | None = None, ) -> npt.NDArray[np.floating]: """ Compute network distance matrix. This function calculates a distance matrix based on shortest paths within a given network. It maps the input coordinates to the nearest nodes in the network and then computes all-pairs shortest paths between these mapped nodes. Parameters ---------- coords : numpy.ndarray A 2D NumPy array of coordinates (n_points, n_dimensions) for which to compute distances. network_gdf : geopandas.GeoDataFrame A GeoDataFrame representing the network (e.g., roads) to use for distance calculations. It must contain LineString geometries and have a valid CRS. gdf_crs : geopandas.crs.CRS or None, optional The Coordinate Reference System (CRS) of the input `coords`. Used for CRS validation against the `network_gdf`. Returns ------- numpy.ndarray A squareform network distance matrix, where `dm[i, j]` is the shortest path distance between the point corresponding to `coords[i]` and `coords[j]` along the `network_gdf`. """ # Validate CRS if network_gdf.crs != gdf_crs: msg = f"CRS mismatch: {gdf_crs} != {network_gdf.crs}" raise ValueError(msg) # Convert edge GDF to NetworkX net_nx = gdf_to_nx(edges=network_gdf) # Get node positions pos = nx.get_node_attributes(net_nx, "pos") net_coords: npt.NDArray[np.floating] = np.asarray(list(pos.values())) net_ids = list(pos.keys()) # Map sample points to nearest network nodes nn = NearestNeighbors(n_neighbors=1).fit(net_coords) _, idx = nn.kneighbors(coords) nearest = [net_ids[i[0]] for i in idx] # Pre-allocate distance matrix n = len(coords) dm: npt.NDArray[np.floating] = np.full((n, n), np.inf) np.fill_diagonal(dm, 0) # Calculate all-pairs shortest paths use_weight = "length" if any("length" in d for _, _, d in net_nx.edges(data=True)) else None for i in range(n): lengths = nx.single_source_dijkstra_path_length(net_nx, nearest[i], weight=use_weight) for j in range(i + 1, n): dist = lengths.get(nearest[j], np.inf) dm[i, j] = dm[j, i] = dist return dm def _distance_matrix( coords: npt.NDArray[np.floating], metric: str, network_gdf: gpd.GeoDataFrame | None, gdf_crs: gpd.crs.CRS | None = None, ) -> npt.NDArray[np.floating]: """ Compute distance matrix based on the specified metric. This function acts as a dispatcher, calling the appropriate distance matrix computation function based on the `metric` parameter. It supports Euclidean, Manhattan, and network-based distance calculations. Parameters ---------- coords : numpy.ndarray A 2D NumPy array of coordinates (n_points, n_dimensions) for which to compute distances. metric : str The distance metric to use. Options are "euclidean", "manhattan", or "network". network_gdf : geopandas.GeoDataFrame or None A GeoDataFrame representing a network (e.g., roads) to use for "network" distance calculations. Required if `metric` is "network". gdf_crs : geopandas.crs.CRS or None, optional The Coordinate Reference System (CRS) of the input `coords`. Used for CRS validation against the `network_gdf`. Returns ------- numpy.ndarray A squareform distance matrix based on the specified metric. Raises ------ ValueError If `metric` is "network" but `network_gdf` is not provided. If an unknown `metric` is specified. """ if metric == "euclidean": return _euclidean_dm(coords) if metric == "manhattan": return _manhattan_dm(coords) if metric == "network": if network_gdf is None: msg = "network_gdf is required for network distance metric" raise ValueError(msg) return _network_dm(coords, network_gdf, gdf_crs) msg = f"Unknown distance metric: {metric}" raise ValueError(msg) # ============================================================================ # EDGE ADDITION # ============================================================================ def _add_edges( G: nx.Graph, edges: list[EdgePair] | set[EdgePair], coords: npt.NDArray[np.floating], node_ids: list[Any], *, metric: str, dm: npt.NDArray[np.floating] | None = None, network_gdf: gpd.GeoDataFrame | None = None, ) -> None: """ Add edges to the graph with weights and geometries. When the metric is *network* the geometry is built by tracing the path on `network_gdf`, so the resulting LineString corresponds to the real shortest-path on that network rather than a straight segment. Parameters ---------- G : networkx.Graph The graph to which edges will be added. edges : list[tuple[int, int]] or set[tuple[int, int]] A list or set of (u, v) tuples representing the edges to add. coords : numpy.ndarray A 2D NumPy array of coordinates. node_ids : list[int] A list of node IDs corresponding to the coordinates. metric : str The distance metric used for edge weights (e.g., "euclidean", "manhattan", "network"). dm : numpy.ndarray or None, optional Precomputed distance matrix. If provided, it is used to set edge weights. network_gdf : geopandas.GeoDataFrame or None, optional Required if `metric` is "network". Represents the network for shortest path calculations. """ if not edges: return # Add edges to the graph G.add_edges_from(edges) # Calculate and set edge weights idx_map: dict[Any, int] = {n: i for i, n in enumerate(node_ids)} if dm is not None: weights = {(u, v): dm[idx_map[u], idx_map[v]] for u, v in G.edges()} elif metric == "manhattan": weights = { (u, v): abs(coords[idx_map[u]][0] - coords[idx_map[v]][0]) + abs(coords[idx_map[u]][1] - coords[idx_map[v]][1]) for u, v in G.edges() } else: # Euclidean weights = { (u, v): float( np.hypot( coords[idx_map[u]][0] - coords[idx_map[v]][0], coords[idx_map[u]][1] - coords[idx_map[v]][1], ), ) for u, v in G.edges() } nx.set_edge_attributes(G, weights, "weight") # Create and set edge geometries geom_attr: dict[tuple[Any, Any], LineString] = {} if metric.lower() == "network": # delegate to helper for clarity _set_network_edge_geometries( G, geom_attr, coords, node_ids, idx_map, network_gdf, ) nx.set_edge_attributes(G, geom_attr, "geometry") return # Manhattan or Euclidean metric # Vectorized geometry creation via array ops and a small comprehension edge_list = list(G.edges()) if edge_list: # Use Python mapping to avoid None from dict.get with object keys under vectorization u_idx = np.array([idx_map[u] for u, _ in edge_list], dtype=int) v_idx = np.array([idx_map[v] for _, v in edge_list], dtype=int) p1 = coords[u_idx] p2 = coords[v_idx] if metric.lower() == "manhattan": geoms = [ LineString([(x1, y1), (x2, y1), (x2, y2)]) for (x1, y1), (x2, y2) in zip(p1, p2, strict=False) ] else: geoms = [LineString([pt1, pt2]) for pt1, pt2 in zip(p1, p2, strict=False)] geom_attr = {edge: geom for edge, geom in zip(edge_list, geoms, strict=False)} nx.set_edge_attributes(G, geom_attr, "geometry") def _set_network_edge_geometries( G: nx.Graph, geom_attr: dict[tuple[Any, Any], LineString], coords: npt.NDArray[np.floating], node_ids: list[Any], idx_map: dict[Any, int], network_gdf: gpd.GeoDataFrame | None, ) -> None: """ Populate geometry for network metric edges using batched shortest paths. This isolates the more complex logic from `_add_edges`, reducing cyclomatic complexity while enabling caching of the network graph and nearest-node mapping for repeated calls. Parameters ---------- G : nx.Graph Graph whose edges will receive a ``geometry`` attribute in-place. geom_attr : dict[tuple[Any, Any], LineString] Mutable mapping populated with ``(u, v) -> LineString``; passed in so we can extend an existing dict without reallocating. coords : numpy.ndarray Array of node coordinate pairs (shape ``(n, 2)``) aligned with ``node_ids``. node_ids : list[Any] Sequence of node identifiers whose order matches ``coords``. idx_map : dict[Any, int] Mapping from node id to its integer row index in ``coords`` for quick lookup. network_gdf : geopandas.GeoDataFrame or None GeoDataFrame describing the underlying real network geometry (e.g., road segments). If provided it is converted to an internal NetworkX graph and cached under a private attribute to avoid repeated conversions. If ``None`` a straight LineString between endpoints is used. Notes ----- Caching: A NetworkX graph is cached on ``network_gdf`` under the private attribute ``_c2g_cached_nx`` so subsequent calls reuse the constructed graph. A nearest-node mapping is also cached on ``G.graph['_c2g_nearest_cache']`` keyed by the ``id(coords)`` to avoid recomputing nearest neighbors when the same coordinate array instance is reused. The function mutates ``geom_attr`` and (via the caller) sets the ``geometry`` edge attribute on ``G``. It may also attach a cached network graph to ``network_gdf``. """ if network_gdf is None: # pragma: no cover - defensive; public API ensures non-None msg = "network_gdf must be provided for network metric geometry construction" raise ValueError(msg) # Reuse cached network graph if available (constructed once per network_gdf instance) net_nx = getattr(network_gdf, "_c2g_cached_nx", None) if net_nx is None: net_nx = gdf_to_nx(edges=network_gdf) # Cache on the GeoDataFrame instance for reuse in subsequent calls cast("Any", network_gdf)._c2g_cached_nx = net_nx pos = nx.get_node_attributes(net_nx, "pos") # gdf_to_nx guarantees 'pos' # Build / reuse nearest mapping net_coords = np.asarray(list(pos.values())) net_ids = list(pos.keys()) nearest_cache = G.graph.get("_c2g_nearest_cache") if nearest_cache is None or nearest_cache.get("_hash_coords") is not id(coords): nn = NearestNeighbors(n_neighbors=1).fit(net_coords) _, idxs = nn.kneighbors(coords) nearest = {node_ids[i]: net_ids[j[0]] for i, j in enumerate(idxs)} G.graph["_c2g_nearest_cache"] = {"mapping": nearest, "_hash_coords": id(coords)} else: nearest = nearest_cache["mapping"] use_weight = "length" if any("length" in d for *_, d in net_nx.edges(data=True)) else None edges_by_source: dict[Any, list[tuple[Any, Any]]] = {} for u, v in G.edges(): edges_by_source.setdefault(nearest[u], []).append((u, v)) for src_nn, edge_list in edges_by_source.items(): _, paths = nx.single_source_dijkstra(net_nx, src_nn, weight=use_weight) for u, v in edge_list: path_nodes = paths.get(nearest[v]) if not path_nodes or len(path_nodes) < 2: # Uniform fallback: straight segment between endpoints geom_attr[(u, v)] = LineString([coords[idx_map[u]], coords[idx_map[v]]]) else: geom_attr[(u, v)] = LineString([pos[p] for p in path_nodes]) def _directed_edges( src_coords: npt.NDArray[np.floating], dst_coords: npt.NDArray[np.floating], src_ids: list[int], dst_ids: list[int], *, metric: str, k: int | None = None, radius: float | None = None, ) -> list[tuple[int, int]]: """ Generate directed edges from source to destination nodes. This function creates directed edges between source and destination node sets using either k-nearest neighbors or radius-based proximity methods to establish spatial connections between different node layers. Parameters ---------- src_coords : numpy.ndarray A 2D NumPy array of coordinates (n_points, n_dimensions) for the source nodes. dst_coords : numpy.ndarray A 2D NumPy array of coordinates (n_points, n_dimensions) for the destination nodes. src_ids : list[int] A list of node IDs corresponding to `src_coords`. dst_ids : list[int] A list of node IDs corresponding to `dst_coords`. metric : str The distance metric to use (e.g., "euclidean", "manhattan"). k : int or None, optional The number of nearest neighbors to consider for each source node. radius : float or None, optional The maximum distance for connecting nodes. Returns ------- list[tuple[int, int]] A list of directed edges as (source_id, destination_id) tuples. """ # Internal invariant: exactly one of k / radius is provided by callers. nn_metric = "cityblock" if metric == "manhattan" else "euclidean" if k is not None: n_neigh = min(k, len(dst_coords)) nn = NearestNeighbors(n_neighbors=n_neigh, metric=nn_metric).fit(dst_coords) _, idxs = nn.kneighbors(src_coords) return [(src_ids[i], dst_ids[j]) for i, neigh in enumerate(idxs) for j in neigh] nn = NearestNeighbors(radius=radius, metric=nn_metric).fit(dst_coords) idxs = nn.radius_neighbors(src_coords, return_distance=False) return [(src_ids[i], dst_ids[j]) for i, neigh in enumerate(idxs) for j in neigh] # ============================================================================ # MUTILAYER BRIDGING # ============================================================================ def _directed_graph( *, src_gdf: gpd.GeoDataFrame, dst_gdf: gpd.GeoDataFrame, distance_metric: str, method: str, param: float, as_nx: bool, network_gdf: gpd.GeoDataFrame | None = None, ) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame] | nx.Graph: """ Build source → target directed proximity edges. This function creates a directed graph with edges from source nodes to target nodes based on proximity criteria, supporting both k-nearest neighbors and radius-based connection methods for spatial network construction. Parameters ---------- src_gdf : geopandas.GeoDataFrame The source GeoDataFrame. dst_gdf : geopandas.GeoDataFrame The destination GeoDataFrame. distance_metric : str The distance metric to use. method : str The method to use for generating proximity edges ("knn" or "radius"). param : float The parameter for the proximity method (k for knn, radius for fixed_radius). as_nx : bool If True, returns a NetworkX graph object. network_gdf : geopandas.GeoDataFrame, optional A GeoDataFrame representing a network for "network" distance calculations. Returns ------- tuple[geopandas.GeoDataFrame, geopandas.GeoDataFrame] or networkx.Graph If as_nx is False, returns a tuple of (nodes_gdf, edges_gdf). If as_nx is True, returns a NetworkX Graph object. """ # Validate CRS if src_gdf.crs != dst_gdf.crs: msg = "CRS mismatch between source and target GeoDataFrames" raise ValueError(msg) # Prepare nodes for both source and destination src_G, src_coords, src_ids = _prepare_nodes(src_gdf, directed=True) dst_G, dst_coords, dst_ids = _prepare_nodes(dst_gdf, directed=True) # Disambiguate node identifiers between source and destination layers. # Use tuple-based namespacing to guarantee collision-free composition: # ('src', original_id) for sources, ('dst', original_id) for targets unique_src_ids = [("src", sid) for sid in src_ids] unique_dst_ids = [("dst", did) for did in dst_ids] src_relabel_map = {old: new for old, new in zip(src_ids, unique_src_ids, strict=False)} dst_relabel_map = {old: new for old, new in zip(dst_ids, unique_dst_ids, strict=False)} # Relabel graphs with unique IDs and attach provenance attributes src_G = nx.relabel_nodes(src_G, src_relabel_map, copy=True) dst_G = nx.relabel_nodes(dst_G, dst_relabel_map, copy=True) # Attach original index and node_type to enable correct reconstruction later nx.set_node_attributes( src_G, {nid: oid for nid, oid in zip(unique_src_ids, src_ids, strict=False)}, "_original_index", ) nx.set_node_attributes(src_G, dict.fromkeys(unique_src_ids, "src"), "node_type") nx.set_node_attributes( dst_G, {nid: oid for nid, oid in zip(unique_dst_ids, dst_ids, strict=False)}, "_original_index", ) nx.set_node_attributes(dst_G, dict.fromkeys(unique_dst_ids, "dst"), "node_type") # Merge nodes into one directed graph (safe: IDs are now unique) G = nx.compose(src_G, dst_G) # Precompute combined coordinates and ids for weighting/geometry combined_coords = np.vstack([src_coords, dst_coords]) combined_ids = unique_src_ids + unique_dst_ids src_n = len(src_ids) dst_n = len(dst_ids) # Generate directed edges # Special handling for network metric: selection must use network distances, # and resulting weights should be set from the corresponding distance matrix. dm: npt.NDArray[np.floating] | None = None raw_edges: list[tuple[int, int]] if distance_metric.lower() == "network": # Compute network distances for all src+dst points once dm = _distance_matrix(combined_coords, "network", network_gdf, src_gdf.crs) d_sub = dm[:src_n, src_n : src_n + dst_n] finite = np.isfinite(d_sub) if method == "knn": k = int(param) # Rank all destinations per source, then pick top-k finite order = np.argsort(d_sub, axis=1) rows = np.arange(src_n)[:, None] ranks = np.empty_like(order) ranks[rows, order] = np.arange(dst_n)[None, :] sel_mask = (ranks < k) & finite i_idx, j_idx = np.where(sel_mask) else: # radius radius_val = float(param) i_idx, j_idx = np.where(finite & (d_sub <= radius_val)) raw_edges = list(zip((src_ids[i] for i in i_idx), (dst_ids[j] for j in j_idx), strict=True)) else: raw_edges = _directed_edges( src_coords, dst_coords, src_ids, dst_ids, metric=distance_metric, k=int(param) if method == "knn" else None, radius=param if method == "radius" else None, ) # Convert edges to use the unique, namespaced node identifiers relabeled_edges: list[tuple[tuple[str, int], tuple[str, int]]] = [ (src_relabel_map[u], dst_relabel_map[v]) for (u, v) in raw_edges ] # Add edges with weights and geometries _add_edges( G, relabeled_edges, combined_coords, combined_ids, metric=distance_metric, dm=dm, network_gdf=network_gdf, ) # Preserve original edge indices for GeoDataFrame reconstruction # so that nx_to_gdf emits (orig_src, orig_dst) even though internal IDs are namespaced orig_edge_index = { (u, v): ( G.nodes[u].get("_original_index", u), G.nodes[v].get("_original_index", v), ) for u, v in G.edges() } nx.set_edge_attributes(G, orig_edge_index, "_original_edge_index") return G if as_nx else nx_to_gdf(G, nodes=True, edges=True) def _validate_contiguity_input(gdf: gpd.GeoDataFrame, contiguity: str) -> None: """ Lightweight validation for contiguity graph public API. Keep only checks required by tests and core invariants; rely on downstream libraries (GeoPandas/libpysal) for deeper geometry validation to reduce code complexity and branching. Parameters ---------- gdf : geopandas.GeoDataFrame Input polygon layer to validate. contiguity : {"queen", "rook"} Contiguity rule to enforce. Raises ------ TypeError If ``gdf`` is not a GeoDataFrame. ValueError If ``contiguity`` is not one of {"queen", "rook"}. """ if not isinstance(gdf, gpd.GeoDataFrame): # pragma: no cover (defensive) msg = ( f"Input must be a GeoDataFrame, got {type(gdf).__name__}. " "Please provide a valid GeoDataFrame with polygon geometries." ) raise TypeError(msg) if contiguity not in {"queen", "rook"}: msg = "Invalid contiguity type: must be 'queen' or 'rook'" raise ValueError(msg) def _create_spatial_weights( gdf: gpd.GeoDataFrame, contiguity: str, ) -> libpysal.weights.W: """ Create spatial weights matrix using libpysal for contiguity analysis. This helper wraps libpysal's Queen/Rook constructors and returns a weights object keyed by the original GeoDataFrame index. It centralises the choice of contiguity rule and small edge cases (like empties) so the public API remains concise and uniform. Parameters ---------- gdf : geopandas.GeoDataFrame GeoDataFrame containing polygon geometries. Must have been validated by ``_validate_contiguity_input`` before calling this function. contiguity : {"queen", "rook"} Type of spatial contiguity to use. Returns ------- libpysal.weights.W Spatial weights matrix representing adjacency relationships between polygons. The weights matrix uses the original GeoDataFrame index as identifiers. """ # Handle empty GeoDataFrame case early if gdf.empty: # pragma: no cover - empty handled earlier in public API return libpysal.weights.W({}) # Validate contiguity value early to avoid catching it in the generic except contiguity_lc = contiguity.lower() if contiguity_lc not in {"queen", "rook"}: # pragma: no cover (validated earlier) msg = f"Unsupported contiguity type: {contiguity}" raise ValueError(msg) # Simplified: rely on libpysal API (modern versions support ids); remove fallbacks. ids = list(gdf.index) if contiguity_lc == "queen": weights = libpysal.weights.Queen.from_dataframe(gdf, ids=ids) else: # rook weights = libpysal.weights.Rook.from_dataframe(gdf, ids=ids) return weights def _generate_contiguity_edges( weights: libpysal.weights.W, _gdf: gpd.GeoDataFrame, ) -> list[EdgePair]: """ Extract adjacency relationships from a spatial weights matrix. The returned edge list contains each undirected adjacency exactly once with endpoints ordered canonically. This normalisation simplifies later processing and avoids duplicate edges in the final graph. Parameters ---------- weights : libpysal.weights.W Spatial weights matrix containing adjacency relationships between polygons. Should be created by ``_create_spatial_weights``. _gdf : geopandas.GeoDataFrame Original GeoDataFrame (unused, present for logging/signature consistency). Returns ------- list[tuple[Any, Any]] Unique undirected edges as pairs of original GeoDataFrame index values. Returns an empty list if there are no adjacency relationships. """ # Empty weights -> no edges if not weights.neighbors: return [] # Unique undirected edges, canonical order return list( {tuple(sorted((src, nbr))) for src, nbrs in weights.neighbors.items() for nbr in nbrs} ) def _build_contiguity_graph( gdf: gpd.GeoDataFrame, edges: list[EdgePair], *, distance_metric: str = "euclidean", network_gdf: gpd.GeoDataFrame | None = None, ) -> nx.Graph: """ Build NetworkX graph from GeoDataFrame and edge list. Nodes preserve all attributes from the input GeoDataFrame and carry a 'pos' coordinate attribute derived from polygon centroids. Edges receive metric-specific weights and geometries through the shared edge attachment utilities, ensuring consistent behaviour with the other generators. Parameters ---------- gdf : geopandas.GeoDataFrame Original GeoDataFrame containing polygon geometries and attributes. The index provides node identifiers and all columns are preserved as node attributes. edges : list[tuple[Any, Any]] List of (source_id, target_id) tuples representing edges in the graph. Should use the same identifiers as the GeoDataFrame index. distance_metric : {"euclidean", "manhattan", "network"}, default "euclidean" Metric used to compute edge weights/geometries. network_gdf : geopandas.GeoDataFrame, optional Line-based network required when ``distance_metric == 'network'``. Returns ------- networkx.Graph Graph with nodes preserving original attributes and edges carrying weight and geometry attributes; graph metadata includes CRS. """ # Reuse shared helpers: prepare nodes once (adds 'pos' and preserves attributes) G, coords, node_ids = _prepare_nodes(gdf) # Caller (contiguity_graph) already validated metric & network_gdf; avoid duplicate branches. metric_lc = _normalize_metric(distance_metric) # Attach edges with selected metric logic if edges: _add_edges( G, edges, coords, node_ids, metric=metric_lc, dm=None, network_gdf=network_gdf, ) # Log graph construction results logger.debug( "Contiguity graph constructed: %d nodes, %d edges, CRS: %s, metric: %s", G.number_of_nodes(), G.number_of_edges(), G.graph.get("crs"), distance_metric, ) return G def _contiguity_graph_core( gdf: gpd.GeoDataFrame, contiguity: str, *, distance_metric: str = "euclidean", network_gdf: gpd.GeoDataFrame | None = None, ) -> nx.Graph: """ Build a contiguity graph (NetworkX) from a validated polygon GeoDataFrame. This internal core performs the minimal steps to obtain adjacency edges via libpysal, then delegates node/edge construction to the shared helpers. Public-facing validation and formatting are handled by `contiguity_graph`. Parameters ---------- gdf : geopandas.GeoDataFrame GeoDataFrame containing valid polygon geometries. Index values are used as node identifiers and all columns are preserved as node attributes. contiguity : {"queen", "rook"} Type of spatial contiguity used to derive adjacency relationships. distance_metric : {"euclidean", "manhattan", "network"}, default "euclidean" Metric used to compute edge weights/geometries. network_gdf : geopandas.GeoDataFrame, optional Line-based network required when ``distance_metric == 'network'``. Returns ------- networkx.Graph Undirected graph with nodes/edges and metadata ('crs', 'contiguity', 'distance_metric'). """ # Create spatial weights and extract undirected edges logger.debug("Creating %s contiguity spatial weights matrix", contiguity) weights = _create_spatial_weights(gdf, contiguity) logger.debug("Extracting adjacency relationships from spatial weights") edges = _generate_contiguity_edges(weights, gdf) # Build graph and attach metadata logger.debug("Building NetworkX graph with nodes and edges") G = _build_contiguity_graph( gdf, edges, distance_metric=distance_metric, network_gdf=network_gdf, ) G.graph["contiguity"] = contiguity G.graph["distance_metric"] = distance_metric return G def _empty_contiguity_result( gdf: gpd.GeoDataFrame, contiguity: str, *, distance_metric: str = "euclidean", as_nx: bool, ) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame] | nx.Graph: """ Create an empty-but-typed result matching contiguity_graph outputs. This keeps return types stable even when the input is empty or yields no adjacency relationships, preserving CRS and expected columns/metadata so downstream code can rely on a consistent schema. Parameters ---------- gdf : geopandas.GeoDataFrame The input GeoDataFrame (possibly empty) used to derive CRS and columns. contiguity : str Contiguity type label to attach to graph metadata when returning NetworkX. distance_metric : {"euclidean", "manhattan", "network"}, default "euclidean" Metric recorded in graph metadata when returning a NetworkX graph. as_nx : bool Whether to return a NetworkX graph or a tuple of GeoDataFrames. Returns ------- tuple[geopandas.GeoDataFrame, geopandas.GeoDataFrame] or networkx.Graph Properly typed empty result preserving CRS and node columns (or an empty graph). """ if as_nx: empty_graph = nx.Graph() empty_graph.graph["crs"] = gdf.crs empty_graph.graph["contiguity"] = contiguity empty_graph.graph["distance_metric"] = distance_metric return empty_graph # Create empty nodes GeoDataFrame with same structure as input empty_nodes = gpd.GeoDataFrame( columns=gdf.columns.tolist(), crs=gdf.crs, index=gdf.index[:0], # Empty index with same type ) # Create empty edges GeoDataFrame with expected structure empty_edges = gpd.GeoDataFrame( columns=["weight", "geometry"], crs=gdf.crs, ) return empty_nodes, empty_edges
[docs] def contiguity_graph( gdf: gpd.GeoDataFrame, contiguity: str = "queen", *, distance_metric: str = "euclidean", network_gdf: gpd.GeoDataFrame | None = None, as_nx: bool = False, ) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame] | nx.Graph: r""" Generate a contiguity-based spatial graph from polygon geometries. This function creates a spatial graph where nodes represent polygons and edges connect spatially contiguous (adjacent) polygons based on Queen or Rook contiguity rules. It leverages libpysal's robust spatial weights functionality to accurately determine adjacency relationships, making it ideal for spatial analysis of administrative boundaries, urban morphology studies, land use patterns, and geographic network analysis. The function supports both Queen contiguity (polygons sharing edges or vertices) and Rook contiguity (polygons sharing only edges), providing flexibility for different spatial analysis requirements. Edge weights are calculated as distances between polygon centroids using the selected ``distance_metric``. Supported metrics: * ``euclidean`` (default): straight-line distance; edge geometry is a direct centroid-to-centroid LineString. * ``manhattan``: L1 distance; edge geometry is an L-shaped polyline (two segments) following an axis-aligned path between centroids. * ``network``: shortest-path distance over ``network_gdf`` (a line network in the same CRS); edge geometry is the polyline path traced along the network. Parameters ---------- gdf : geopandas.GeoDataFrame Input GeoDataFrame containing polygon geometries. Must contain valid polygon geometries in the 'geometry' column. The index of this GeoDataFrame will be preserved as node identifiers in the output graph. All original attributes are maintained in the nodes output. contiguity : {"queen", "rook"}, default "queen" Type of spatial contiguity rule to apply for determining adjacency: - "queen": Polygons are considered adjacent if they share any boundary (edges or vertices). This is more inclusive and typically results in more connections. - "rook": Polygons are considered adjacent only if they share an edge (not just vertices). This is more restrictive and results in fewer connections. distance_metric : {"euclidean", "manhattan", "network"}, default "euclidean" Metric used to compute edge weights and geometries. network_gdf : geopandas.GeoDataFrame, optional Required when ``distance_metric='network'``. A line-based network whose CRS matches ``gdf``. as_nx : bool, default False Output format control. If True, returns a NetworkX Graph object with spatial attributes. If False, returns a tuple of GeoDataFrames for nodes and edges, compatible with other city2graph functions. Returns ------- tuple[geopandas.GeoDataFrame, geopandas.GeoDataFrame] or networkx.Graph When ``as_nx=False`` (default), returns ``(nodes_gdf, edges_gdf)`` as GeoDataFrames. When ``as_nx=True``, returns a NetworkX Graph with spatial attributes and metadata. Raises ------ TypeError If `gdf` is not a geopandas.GeoDataFrame instance. ValueError If `contiguity` is not one of {"queen", "rook"}. If `gdf` contains geometries that are not polygons (Point, LineString, etc.). If `gdf` contains invalid or corrupt polygon geometries. If libpysal fails to create spatial weights matrix. See Also -------- libpysal.weights.Queen : Spatial weights based on Queen contiguity. libpysal.weights.Rook : Spatial weights based on Rook contiguity. knn_graph : Generate k-nearest neighbor graphs from point data. delaunay_graph : Generate Delaunay triangulation graphs from point data. fixed_radius_graph : Generate fixed-radius proximity graphs. gabriel_graph : Generate Gabriel graphs from point data. relative_neighborhood_graph : Generate relative neighborhood graphs. waxman_graph : Generate probabilistic Waxman graphs. Examples -------- >>> # Create sample administrative districts >>> districts = [ ... Polygon([(0, 0), (2, 0), (2, 2), (0, 2)]), # District A ... Polygon([(2, 0), (4, 0), (4, 2), (2, 2)]), # District B (adjacent to A) ... Polygon([(0, 2), (2, 2), (2, 4), (0, 4)]), # District C (adjacent to A) ... Polygon([(3, 3), (5, 3), (5, 5), (3, 5)]) # District D (isolated) ... ] >>> gdf = gpd.GeoDataFrame({ ... 'district_id': ['A', 'B', 'C', 'D'], ... 'population': [10000, 15000, 8000, 5000], ... 'area_km2': [4.0, 4.0, 4.0, 4.0], ... 'geometry': districts ... }, crs="EPSG:3857").set_index('district_id') >>> >>> # Generate Queen contiguity graph >>> nodes_gdf, edges_gdf = contiguity_graph(gdf, contiguity="queen", distance_metric="euclidean") >>> print(f"Districts: {len(nodes_gdf)}, Adjacency relationships: {len(edges_gdf)}") Districts: 4, Adjacency relationships: 4 >>> >>> # Examine adjacency relationships >>> print("\\nAdjacency relationships:") >>> for idx, edge in edges_gdf.iterrows(): ... src, dst = edge.name # Edge endpoints from MultiIndex ... weight = edge['weight'] ... print(f" {src} ↔ {dst}: distance = {weight:.2f}") Adjacency relationships: A ↔ B: distance = 2.83 A ↔ C: distance = 2.83 """ # Log function entry for debugging logger.debug( "contiguity_graph called with %s polygons, contiguity='%s', as_nx=%s", (len(gdf) if hasattr(gdf, "__len__") else "unknown"), contiguity, as_nx, ) # Step 1: Input validation _validate_contiguity_input(gdf, contiguity) metric_lc = _normalize_metric(distance_metric) if metric_lc not in {"euclidean", "manhattan", "network"}: msg = f"Unsupported distance_metric: {distance_metric!r}" raise ValueError(msg) if metric_lc == "network": if network_gdf is None: msg = "network_gdf is required when distance_metric='network'" raise ValueError(msg) if network_gdf.crs != gdf.crs: msg = f"CRS mismatch between gdf ({gdf.crs}) and network_gdf ({network_gdf.crs})" raise ValueError(msg) # Step 2: Handle empty GeoDataFrame case up-front if gdf.empty: logger.debug("Empty GeoDataFrame provided - returning empty result") return _empty_contiguity_result( gdf, contiguity, distance_metric=metric_lc, as_nx=as_nx, ) # Step 3: Build the core graph once, then format output as requested G = _contiguity_graph_core( gdf, contiguity, distance_metric=metric_lc, network_gdf=network_gdf, ) # Step 4: Return in requested format (single return branch for clarity) return G if as_nx else nx_to_gdf(G, nodes=True, edges=True)