Source code for city2graph.morphology

"""
Module for creating morphological graphs from urban data.

This module provides comprehensive functionality for analyzing urban morphology
through graph representations, focusing on the relationships between private
spaces (buildings and their tessellations) and public spaces (street segments).
It creates heterogeneous graphs that capture the complex spatial relationships
inherent in urban environments. Both GeoDataFrame and NetworkX objects can be
converted to PyTorch Geometric Data or HeteroData by functions from graph.py.

The module specializes in three types of spatial relationships:
1. Private-to-private: Adjacency relationships between building tessellations
2. Public-to-public: Topological connectivity between street segments
3. Private-to-public: Interface relationships between private and public spaces
"""

# Standard library imports
import itertools
import logging
import math
import warnings

# Third-party imports
import geopandas as gpd
import libpysal
import networkx as nx
import numpy as np
import pandas as pd
from scipy.spatial import KDTree
from scipy.spatial.distance import pdist
from shapely.creation import linestrings as sh_linestrings
from shapely.geometry import Point

# Local imports
from .utils import create_tessellation
from .utils import dual_graph
from .utils import filter_graph_by_distance
from .utils import gdf_to_nx
from .utils import nx_to_gdf
from .utils import segments_to_graph

# Public API definition
__all__ = [
    "morphological_graph",
    "private_to_private_graph",
    "private_to_public_graph",
    "public_to_public_graph",
]

# Module logger configuration
logger = logging.getLogger(__name__)


# ============================================================================
# MAIN MORPHOLOGICAL GRAPH FUNCTION
# ============================================================================


[docs] def morphological_graph( # noqa: PLR0912, PLR0915 buildings_gdf: gpd.GeoDataFrame, segments_gdf: gpd.GeoDataFrame, center_point: gpd.GeoSeries | gpd.GeoDataFrame | None = None, distance: float | None = None, clipping_buffer: float = math.inf, primary_barrier_col: str | None = "barrier_geometry", contiguity: str = "queen", keep_buildings: bool = False, tolerance: float = 1e-6, as_nx: bool = False, ) -> tuple[dict[str, gpd.GeoDataFrame], dict[tuple[str, str, str], gpd.GeoDataFrame]] | nx.Graph: """ Create a morphological graph from buildings and street segments. This function creates a comprehensive morphological graph that captures relationships between private spaces (building tessellations) and public spaces (street segments). The graph includes three types of relationships: private-to-private adjacency, public-to-public connectivity, and private-to-public interfaces. The 'private_id' for tessellation cells is derived from 'tess_id' (generated by `create_tessellation`) or assigned sequentially if 'tess_id' doesn't directly map. The 'public_id' for street segments is taken directly from the index of `segments_gdf`. Parameters ---------- buildings_gdf : geopandas.GeoDataFrame GeoDataFrame containing building polygons. Should contain Polygon or MultiPolygon geometries. segments_gdf : geopandas.GeoDataFrame GeoDataFrame containing street segments. Should contain LineString geometries. center_point : geopandas.GeoSeries or geopandas.GeoDataFrame, optional Center point(s) for spatial filtering. If provided with distance parameter, only segments within the specified distance will be included. distance : float, optional Maximum distance from ``center_point`` for spatial filtering. When specified, street segments beyond this shortest-path distance are removed and tessellation cells are kept only if their own distance via these segments does not exceed this value. clipping_buffer : float, default=math.inf Buffer distance to ensure adequate context for generating tessellation. Must be non-negative. primary_barrier_col : str, optional Column name containing alternative geometry for public spaces. If specified and exists, this geometry will be used instead of the main geometry column for tessellation barriers. contiguity : str, default="queen" Type of spatial contiguity for private-to-private connections. Must be either "queen" or "rook". keep_buildings : bool, default=False If True, preserves building information in the tessellation output. tolerance : float, default=1e-6 Buffer distance for public geometries when creating private-to-public connections. This parameter controls how close private spaces need to be to public spaces to establish a connection. as_nx : bool, default=False If True, convert the output to a NetworkX graph. Returns ------- tuple[dict[str, geopandas.GeoDataFrame], dict[tuple[str, str, str], geopandas.GeoDataFrame]] | networkx.Graph A tuple containing: - nodes: Dictionary with keys "private" and "public" containing node GeoDataFrames - edges: Dictionary with relationship type keys containing edge GeoDataFrames If as_nx is True, returns a NetworkX graph. Raises ------ TypeError If buildings_gdf or segments_gdf are not GeoDataFrames. ValueError If contiguity parameter is not "queen" or "rook". If clipping_buffer is negative. See Also -------- private_to_private_graph : Create adjacency between private spaces. private_to_public_graph : Create connections between private and public spaces. public_to_public_graph : Create connectivity between public spaces. Notes ----- The function first filters the street network by `distance` (resulting in `segs`). A `segs_buffer` GeoDataFrame is also created for tessellation context, potentially filtered by `distance + clipping_buffer` or `distance` if `center_point` and `distance` are provided. This `segs_buffer` is used to create enclosures and tessellations. It then establishes three types of relationships: 1. Private-to-private: Adjacency between tessellation cells (handled by private_to_private_graph) 2. Public-to-public: Topological connectivity between street segments 3. Private-to-public: Spatial interfaces between tessellations and streets The output follows a heterogeneous graph structure suitable for network analysis of urban morphology. Examples -------- >>> # Create morphological graph from buildings and segments >>> nodes, edges = morphological_graph(buildings_gdf, segments_gdf) >>> private_nodes = nodes['private'] >>> public_nodes = nodes['public'] """ # Define fixed ID column names private_id_col = "private_id" public_id_col = "public_id" # Validate input GeoDataFrames (includes geometry type checks) _validate_input_gdfs(buildings_gdf, segments_gdf) # Validate contiguity parameter if contiguity not in {"queen", "rook"}: msg = "contiguity must be 'queen' or 'rook'" raise ValueError(msg) # Validate clipping_buffer if clipping_buffer < 0: msg = "clipping_buffer cannot be negative." raise ValueError(msg) # Convert MultiIndex to tuples if buildings_gdf has MultiIndex if isinstance(buildings_gdf.index, pd.MultiIndex): buildings_gdf = buildings_gdf.copy() buildings_gdf.index = buildings_gdf.index.to_flat_index() # Ensure CRS consistency between buildings and segments segments_gdf = _ensure_crs_consistency(buildings_gdf, segments_gdf) # Assign the original index of segments_gdf to public_id_col # A copy is made to avoid SettingWithCopyWarning if segments_gdf might be a slice, # especially before adding/modifying a column. segments_gdf = segments_gdf.copy() segments_gdf[public_id_col] = segments_gdf.index # Use original index as public ID # Convert segments to a graph representation for efficient filtering. if not segments_gdf.empty: # segments_to_graph returns nodes and edges with a MultiIndex. # gdf_to_nx converts these to a NetworkX graph. segment_nodes, segment_edges = segments_to_graph(segments_gdf) segments_graph = gdf_to_nx(nodes=segment_nodes, edges=segment_edges) else: # Create empty structures to avoid errors downstream segment_nodes, segment_edges = segments_to_graph(segments_gdf) segments_graph = nx.Graph() # Filter segments by network distance for the final graph if center_point and distance are provided if center_point is not None and distance is not None and not segments_gdf.empty: filtered_segments_graph = filter_graph_by_distance(segments_graph, center_point, distance) segments_filtered = nx_to_gdf(filtered_segments_graph, nodes=False, edges=True) else: segments_filtered = segment_edges # Create a buffered version of the graph for tessellation creation (segments_buffer) # This segments_buffer is used to _prepare_barriers -> create_tessellation # and as the segments context for _filter_adjacent_tessellation. if center_point is not None and distance is not None and not segments_gdf.empty: if not math.isinf(clipping_buffer): # Finite clipping_buffer: use distance + clipping_buffer for segments_buffer radius buffer_radius = distance + clipping_buffer segments_buffer_graph = filter_graph_by_distance( segments_graph, center_point, buffer_radius, ) segments_buffer = nx_to_gdf(segments_buffer_graph, nodes=False, edges=True) else: # clipping_buffer is math.inf # Fallback to 'distance' as radius for segments_buffer segments_buffer_graph = filter_graph_by_distance( segments_graph, center_point, distance, ) segments_buffer = nx_to_gdf(segments_buffer_graph, nodes=False, edges=True) else: # No center_point or no distance, so segments_buffer is not filtered by distance segments_buffer = segment_edges # Prepare barriers from the buffered segments (segments_buffer) for tessellation barriers = _prepare_barriers(segments_buffer, primary_barrier_col) # Create tessellation based on buildings and prepared barriers tessellation = create_tessellation( buildings_gdf, primary_barriers=None if barriers.empty else barriers, # Use barriers if available ) # Rename tessellation ID column (typically 'tess_id') to the fixed private ID name tessellation = tessellation.rename(columns={"tess_id": private_id_col}) # Ensure the fixed private ID column exists, creating a sequential ID if necessary if private_id_col not in tessellation.columns: tessellation[private_id_col] = range(len(tessellation)) # Assign sequential private IDs # Determine max_distance for filtering tessellation adjacent to segments max_distance_for_adj_filter = distance + clipping_buffer if distance is not None else math.inf # Filter tessellation to only include cells adjacent to the (potentially filtered) 'segments_filtered' tessellation = _filter_adjacent_tessellation( tessellation, segments_filtered, # Use 'segments_filtered' (final graph segments) for adjacency check max_distance=max_distance_for_adj_filter, # Max distance for adjacency ) # Further filter tessellation by network distance if center_point and distance are specified if center_point is not None and distance is not None: tessellation = _filter_tessellation_by_network_distance( tessellation, segments_filtered, # Use 'segments_filtered' for network distance calculation center_point, distance, # Max network distance ) # Optionally preserve building information by joining tessellation with buildings if keep_buildings: tessellation = _add_building_info(tessellation, buildings_gdf) # Determine group_col for private_to_private_graph group_col_for_priv_priv: str | None = "enclosure_index" if group_col_for_priv_priv not in tessellation.columns: if not tessellation.empty: logger.warning( "Column '%s' not found in tessellation. " "Private-to-private graph will not use grouping.", group_col_for_priv_priv, ) group_col_for_priv_priv = None # Create private-to-private graph (adjacency between tessellation cells) _, private_to_private_edges = private_to_private_graph( tessellation, group_col=group_col_for_priv_priv, contiguity=contiguity, ) # Create public-to-public graph (connectivity between street segments) _, public_to_public_edges = public_to_public_graph( segments_filtered, ) # Use 'segments_filtered' (final graph segments) # Create private-to-public graph (interfaces between tessellation and streets) _, private_to_public_edges = private_to_public_graph( tessellation, segments_filtered, # Use 'segments_filtered' (final graph segments) primary_barrier_col=primary_barrier_col, # Optional alternative geometry for public spaces tolerance=tolerance, # Buffer distance for proximity detection ) # Log warning if no private-public connections found if private_to_public_edges.empty: logger.warning("No private to public connections found") # Organize output as a heterogeneous graph structure (nodes and edges dictionaries) nodes = { "private": _set_node_index(tessellation, private_id_col), # Private nodes (tessellation) "public": _set_node_index(segments_filtered, public_id_col), # Public nodes (segments) } # Organize edges into a dictionary with relationship types as keys edges = { ("private", "touched_to", "private"): _set_edge_index( private_to_private_edges, "from_private_id", "to_private_id", # Private-private edges ), ("public", "connected_to", "public"): _set_edge_index( public_to_public_edges, "from_public_id", "to_public_id", # Public-public edges ), ("private", "faced_to", "public"): _set_edge_index( private_to_public_edges, "private_id", "public_id", # Private-public edges ), } return (nodes, edges) if not as_nx else gdf_to_nx(nodes, edges)
# ============================================================================ # PRIVATE TO PRIVATE GRAPH FUNCTIONS # ============================================================================
[docs] def private_to_private_graph( private_gdf: gpd.GeoDataFrame, group_col: str | None = None, contiguity: str = "queen", as_nx: bool = False, ) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame] | nx.Graph: """ Create edges between contiguous private polygons based on spatial adjacency. This function identifies spatial adjacency relationships between private polygons (e.g., tessellation cells) using either Queen or Rook contiguity criteria. Optionally groups connections within specified groups (e.g., enclosures). The input `private_gdf` is expected to have a 'private_id' column. Parameters ---------- private_gdf : geopandas.GeoDataFrame GeoDataFrame containing private space polygons. Must contain a 'private_id' column. group_col : str, optional Column name for grouping connections. Only polygons within the same group will be connected. If None, all polygons are considered as one group. contiguity : str, default="queen" Type of spatial contiguity to use. Must be either "queen" or "rook". Queen contiguity includes vertex neighbors, Rook includes only edge neighbors. as_nx : bool, default=False If True, convert the output to a NetworkX graph. Returns ------- tuple[geopandas.GeoDataFrame, geopandas.GeoDataFrame] | networkx.Graph A tuple containing: - Nodes of the private graph (the input private_gdf). - Edges of the private graph (adjacency connections). If as_nx is True, returns a NetworkX graph. Raises ------ TypeError If private_gdf is not a GeoDataFrame. ValueError If contiguity not in {"queen", "rook"}, or if group_col doesn't exist. See Also -------- morphological_graph : Main function that creates comprehensive morphological graphs. private_to_public_graph : Create connections between private and public spaces. public_to_public_graph : Create connectivity between public spaces. Notes ----- The function uses libpysal's spatial weights to determine adjacency relationships. Edge geometries are created as LineStrings connecting polygon centroids. Self-connections and duplicate edges are automatically filtered out. The input private_gdf is expected to have a 'private_id' column. Examples -------- >>> # Create private-to-private adjacency graph >>> nodes, edges = private_to_private_graph(tessellation_gdf) >>> # With grouping by enclosures >>> nodes, edges = private_to_private_graph(tessellation_gdf, group_col='enclosure_id') """ # Input validation _validate_single_gdf_input(private_gdf, "private_gdf") private_id_col = "private_id" # Fixed ID column name for private polygons # If not empty, require the ID column if not private_gdf.empty and private_id_col not in private_gdf.columns: msg = f"Expected ID column '{private_id_col}' not found in private_gdf." raise ValueError(msg) # Validate that the contiguity type is supported if contiguity not in {"queen", "rook"}: msg = "contiguity must be either 'queen' or 'rook'" raise ValueError(msg) # Handle empty or insufficient data: return empty edges GeoDataFrame if private_gdf.empty or len(private_gdf) < 2: group_cols = [group_col or "group"] empty_edges = _create_empty_edges_gdf( private_gdf.crs, "from_private_id", "to_private_id", group_cols, ) return ( (private_gdf, empty_edges) if not as_nx else gdf_to_nx(nodes=private_gdf, edges=empty_edges) ) # Validate that the group column exists if specified if group_col and group_col not in private_gdf.columns: msg = f"group_col '{group_col}' not found in private_gdf columns" raise ValueError(msg) # Reset index for consistent spatial weights computation by libpysal gdf_indexed = private_gdf.reset_index(drop=True) # Create spatial weights matrix (Queen or Rook contiguity) spatial_weights = _create_spatial_weights(gdf_indexed, contiguity) # Extract adjacency relationships from the spatial weights matrix adjacency_data = _extract_adjacency_relationships( spatial_weights, gdf_indexed, private_id_col, group_col, ) # Create edge geometries (LineStrings connecting centroids of adjacent polygons) edges_gdf_data = _create_adjacency_edges(adjacency_data, gdf_indexed, group_col or "group") # Convert the DataFrame with edge geometries to a GeoDataFrame edges_gdf = gpd.GeoDataFrame( edges_gdf_data, # Data including 'from_private_id', 'to_private_id', group, and geometry geometry="geometry", crs=private_gdf.crs, # Preserve original CRS ) return (private_gdf, edges_gdf) if not as_nx else gdf_to_nx(nodes=private_gdf, edges=edges_gdf)
# ============================================================================ # PRIVATE TO PUBLIC GRAPH FUNCTIONS # ============================================================================
[docs] def private_to_public_graph( private_gdf: gpd.GeoDataFrame, public_gdf: gpd.GeoDataFrame, primary_barrier_col: str | None = None, tolerance: float = 1e-6, as_nx: bool = False, ) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame] | nx.Graph: """ Create edges between private polygons and nearby public geometries. This function identifies spatial relationships between private spaces (tessellations) and public spaces (street segments) by finding intersections between buffered public geometries and private polygons. Input GDFs are expected to have 'private_id' and 'public_id' columns respectively. Parameters ---------- private_gdf : geopandas.GeoDataFrame GeoDataFrame containing private space polygons. Expected to have a 'private_id' column. public_gdf : geopandas.GeoDataFrame GeoDataFrame containing public space geometries (typically LineStrings). Expected to have a 'public_id' column. primary_barrier_col : str, optional Column name for alternative public geometry. If specified and exists, this geometry will be used instead of the main geometry column. tolerance : float, default=1e-6 Buffer distance for public geometries to detect proximity to private spaces. as_nx : bool, default=False If True, convert the output to a NetworkX graph. Returns ------- tuple[geopandas.GeoDataFrame, geopandas.GeoDataFrame] | networkx.Graph A tuple containing: - Nodes of the private and public graphs (the input private_gdf and public_gdf). - Edges of the private-public graph (connections between private polygons and public geometries). If as_nx is True, returns a NetworkX graph. Raises ------ TypeError If private_gdf or public_gdf are not GeoDataFrames. ValueError If 'private_id' or 'public_id' columns are missing from input GDFs. See Also -------- morphological_graph : Main function that creates comprehensive morphological graphs. private_to_private_graph : Create adjacency between private spaces. public_to_public_graph : Create connectivity between public spaces. Notes ----- Edge geometries are created as LineStrings connecting the centroids of private polygons and public geometries. The function uses spatial joins to identify overlapping areas within the specified tolerance. Input GDFs are expected to have 'private_id' and 'public_id' columns respectively. Examples -------- >>> # Create private-to-public interface graph >>> nodes, edges = private_to_public_graph(tessellation_gdf, segments_gdf) >>> # With custom tolerance >>> nodes, edges = private_to_public_graph(tessellation_gdf, segments_gdf, tolerance=2.0) """ # Input validation _validate_single_gdf_input(private_gdf, "private_gdf") _validate_single_gdf_input(public_gdf, "public_gdf") private_id_col = "private_id" # Fixed ID column name for private spaces public_id_col = "public_id" # Fixed ID column name for public spaces # Handle empty data: return empty edges GeoDataFrame if private_gdf.empty or public_gdf.empty: empty_edges = _create_empty_edges_gdf(private_gdf.crs, private_id_col, public_id_col) all_nodes = pd.concat([private_gdf, public_gdf], ignore_index=True) return ( (all_nodes, empty_edges) if not as_nx else gdf_to_nx(nodes=all_nodes, edges=empty_edges) ) # Ensure required ID columns exist in the input GeoDataFrames if private_id_col not in private_gdf.columns: msg = f"Expected ID column '{private_id_col}' not found in private_gdf." raise ValueError(msg) if public_id_col not in public_gdf.columns: msg = f"Expected ID column '{public_id_col}' not found in public_gdf." raise ValueError(msg) # Ensure CRS consistency between private and public GeoDataFrames public_gdf = _ensure_crs_consistency(private_gdf, public_gdf) # Determine which geometry column to use for public spaces (main or alternative) join_geom_series = ( public_gdf[primary_barrier_col] if primary_barrier_col and primary_barrier_col in public_gdf.columns else public_gdf.geometry ) # Create buffered geometries for public spaces to detect proximity buffered_public_data = {public_id_col: public_gdf[public_id_col]} # Keep public IDs buffered_public = gpd.GeoDataFrame( buffered_public_data, geometry=join_geom_series.buffer(tolerance), # Buffer the selected public geometry crs=public_gdf.crs, ) # Perform spatial join to find intersections between private polygons and buffered public geometries joined = gpd.sjoin( private_gdf[[private_id_col, "geometry"]], # Private polygons with their IDs buffered_public, # Buffered public geometries with their IDs how="inner", # Keep only intersecting pairs predicate="intersects", # Use intersection predicate ) # If intersections found, keep only the ID columns if not joined.empty: id_cols_to_keep = [private_id_col, public_id_col] joined = joined[id_cols_to_keep] # Select relevant ID columns # Drop duplicate pairs of (private_id, public_id) joined = joined.drop_duplicates() # Create maps of centroids for private and public geometries, indexed by their IDs private_centroids_map = ( private_gdf.drop_duplicates(subset=[private_id_col]) .set_index(private_id_col) .geometry.centroid ) public_centroids_map = ( public_gdf.drop_duplicates(subset=[public_id_col]) .set_index(public_id_col) .geometry.centroid ) # Prepare to add edge geometries to the joined DataFrame joined_with_geom = joined.copy() if joined_with_geom.empty: edges_gdf = gpd.GeoDataFrame(joined_with_geom, geometry="geometry", crs=private_gdf.crs) all_nodes = pd.concat([private_gdf, public_gdf], ignore_index=True) return (all_nodes, edges_gdf) if not as_nx else gdf_to_nx(nodes=all_nodes, edges=edges_gdf) # Get centroid geometries for each pair in the 'joined' DataFrame private_centroids = private_centroids_map.loc[joined_with_geom[private_id_col]].reset_index( drop=True, ) public_centroids = public_centroids_map.loc[joined_with_geom[public_id_col]].reset_index( drop=True, ) # Extract coordinates and ensure 2D array shape private_coords = np.array(list(zip(private_centroids.x, private_centroids.y, strict=True))) public_coords = np.array(list(zip(public_centroids.x, public_centroids.y, strict=True))) # Ensure coords are 2D by reshaping if needed private_coords = private_coords.reshape(-1, 2) public_coords = public_coords.reshape(-1, 2) # Stack coordinates for LineString creation line_coords = np.stack((private_coords, public_coords), axis=1) joined_with_geom["geometry"] = list(sh_linestrings(line_coords)) # Concatenate private and public GeoDataFrames to create a unified nodes GeoDataFrame nodes_gdf = pd.concat([private_gdf, public_gdf], ignore_index=True) # Convert the DataFrame with edge geometries to a GeoDataFrame edges_gdf = gpd.GeoDataFrame(joined_with_geom, geometry="geometry", crs=private_gdf.crs) return (nodes_gdf, edges_gdf) if not as_nx else gdf_to_nx(nodes=nodes_gdf, edges=edges_gdf)
# ============================================================================ # PUBLIC TO PUBLIC GRAPH FUNCTIONS # ============================================================================
[docs] def public_to_public_graph( public_gdf: gpd.GeoDataFrame, as_nx: bool = False, ) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame] | nx.Graph: """ Create edges between connected public segments based on topological connectivity. This function identifies topological connections between public space geometries (typically street segments) using the dual graph approach to find segments that share endpoints or connection points. The function automatically creates a unique identifier for each row if needed. Parameters ---------- public_gdf : geopandas.GeoDataFrame GeoDataFrame containing public space geometries (typically LineString). as_nx : bool, default=False If True, convert the output to a NetworkX graph. Returns ------- tuple[geopandas.GeoDataFrame, gpd.GeoDataFrame] | networkx.Graph A tuple containing: - Nodes of the public graph (the input public_gdf). - Edges of the public graph (connectivity). If as_nx is True, returns a NetworkX graph. Raises ------ TypeError If public_gdf is not a GeoDataFrame. See Also -------- morphological_graph : Main function that creates comprehensive morphological graphs. private_to_private_graph : Create adjacency between private spaces. private_to_public_graph : Create connections between private and public spaces. Notes ----- The function uses the dual graph approach where each LineString becomes a node, and edges represent topological connections between segments. Edge geometries are created as LineStrings connecting the centroids of connected segments. Examples -------- >>> # Create public-to-public connectivity graph >>> nodes, edges = public_to_public_graph(segments_gdf) >>> # Convert to NetworkX format >>> graph = public_to_public_graph(segments_gdf, as_nx=True) """ # Input validation _validate_single_gdf_input(public_gdf, "public_gdf") # Handle empty or insufficient data: return empty edges GeoDataFrame if public_gdf.empty or len(public_gdf) < 2: empty_edges = _create_empty_edges_gdf(public_gdf.crs, "from_public_id", "to_public_id") return ( (public_gdf, empty_edges) if not as_nx else gdf_to_nx(nodes=public_gdf, edges=empty_edges) ) # Create a copy to avoid modifying the original public_gdf_copy = public_gdf.copy() # Check if public_id column already exists (shared with other components) if "public_id" in public_gdf_copy.columns: edge_id_col = "public_id" else: edge_id_col = "_edge_id" public_gdf_copy[edge_id_col] = [str(idx) for idx in public_gdf_copy.index] # Convert public_gdf to a graph segment_nodes, segment_edges = segments_to_graph(public_gdf_copy) # Create dual graph (nodes are segments, edges are connections) _, dual_edges = dual_graph( (segment_nodes, segment_edges), edge_id_col=edge_id_col, keep_original_geom=True, ) # Preserve the original MultiIndex structure and data types if isinstance(dual_edges.index, pd.MultiIndex): # Rename the MultiIndex levels for clarity and consistency dual_edges.index.names = ["from_public_id", "to_public_id"] # Reset index to ensure it is a regular DataFrame dual_edges = dual_edges.reset_index() return (public_gdf, dual_edges) if not as_nx else gdf_to_nx(nodes=public_gdf, edges=dual_edges)
# ============================================================================ # HELPER FUNCTIONS FOR VALIDATION AND DATA PROCESSIçNG # ============================================================================ def _validate_input_gdfs(buildings_gdf: gpd.GeoDataFrame, segments_gdf: gpd.GeoDataFrame) -> None: """ Validate the primary input GeoDataFrames for the morphological graph function. This function ensures that the provided buildings and segments inputs are both GeoDataFrames and that their geometries are of the expected types (Polygons for buildings, LineStrings for segments). It serves as a critical initial check to prevent errors in downstream processing. Parameters ---------- buildings_gdf : geopandas.GeoDataFrame GeoDataFrame containing building polygons. segments_gdf : geopandas.GeoDataFrame GeoDataFrame containing street segments. Raises ------ TypeError If either input is not a GeoDataFrame. ValueError If the geometries are not of the expected types. See Also -------- _validate_single_gdf_input : Validate a single GeoDataFrame. """ # Check if buildings_gdf is a GeoDataFrame if not isinstance(buildings_gdf, gpd.GeoDataFrame): msg = "buildings_gdf must be a GeoDataFrame" raise TypeError(msg) # Check if segments_gdf is a GeoDataFrame if not isinstance(segments_gdf, gpd.GeoDataFrame): msg = "segments_gdf must be a GeoDataFrame" raise TypeError(msg) # Validate geometry types for non-empty buildings_gdf if not buildings_gdf.empty: building_geom_types = buildings_gdf.geometry.geom_type.unique() if not all(geom_type in {"Polygon", "MultiPolygon"} for geom_type in building_geom_types): msg = ( f"buildings_gdf must contain only Polygon or MultiPolygon geometries. " f"Found: {', '.join(building_geom_types)}" ) raise ValueError(msg) # Validate geometry types for non-empty segments_gdf if not segments_gdf.empty: # Assuming LineString is required for operations like dual_graph segment_geom_types = segments_gdf.geometry.geom_type.unique() if not all(geom_type in {"LineString"} for geom_type in segment_geom_types): msg = ( f"segments_gdf must contain only LineString geometries. " f"Found: {', '.join(segment_geom_types)}" ) raise ValueError(msg) def _validate_single_gdf_input( gdf: gpd.GeoDataFrame, gdf_name: str, ) -> None: """ Validate that a single input is a GeoDataFrame. This is a simple utility function to ensure that an input object is of type geopandas.GeoDataFrame, raising a TypeError with a descriptive message if it is not. It is used for validating individual geospatial inputs in various functions. Parameters ---------- gdf : geopandas.GeoDataFrame The GeoDataFrame to validate. gdf_name : str The name of the GeoDataFrame, used in the error message. Raises ------ TypeError If the input is not a GeoDataFrame. See Also -------- _validate_input_gdfs : Validate both buildings and segments GeoDataFrames. """ # Check if the input is a GeoDataFrame if not isinstance(gdf, gpd.GeoDataFrame): msg = f"{gdf_name} must be a GeoDataFrame" raise TypeError(msg) def _ensure_crs_consistency( target_gdf: gpd.GeoDataFrame, source_gdf: gpd.GeoDataFrame, ) -> gpd.GeoDataFrame: """ Ensure that the source GeoDataFrame has the same CRS as the target. This function checks if the Coordinate Reference System (CRS) of the source GeoDataFrame matches that of the target. If they do not match, it reprojects the source to the target's CRS and issues a warning. This is essential for ensuring that spatial operations between the two GeoDataFrames are valid. Parameters ---------- target_gdf : geopandas.GeoDataFrame The GeoDataFrame with the target CRS. source_gdf : geopandas.GeoDataFrame The GeoDataFrame to check and potentially reproject. Returns ------- geopandas.GeoDataFrame The source GeoDataFrame, reprojected to the target CRS if necessary. Warns ----- RuntimeWarning If a CRS mismatch is detected and reprojection is performed. """ # Check if CRS of source_gdf matches target_gdf if source_gdf.crs != target_gdf.crs: warnings.warn( "CRS mismatch detected, reprojecting", RuntimeWarning, stacklevel=3, ) # Warn user return source_gdf.to_crs(target_gdf.crs) # Reproject source_gdf to target_gdf's CRS return source_gdf # Return source_gdf as is if CRSs match def _prepare_barriers( segments: gpd.GeoDataFrame, geom_col: str | None, ) -> gpd.GeoDataFrame: """ Prepare the barrier geometries for tessellation. This function selects the appropriate geometry column from the segments GeoDataFrame to be used as barriers in the tessellation process. If an alternative geometry column is specified and exists, it is used; otherwise, the default geometry column is used. Parameters ---------- segments : geopandas.GeoDataFrame The street segments GeoDataFrame. geom_col : str, optional The name of an alternative geometry column to use for the barriers. Returns ------- geopandas.GeoDataFrame A GeoDataFrame containing the prepared barrier geometries. """ # If an alternative geometry column is specified, exists, and is not the default 'geometry' if geom_col and geom_col in segments.columns and geom_col != "geometry": # Create a new GeoDataFrame using the alternative geometry column as the active geometry return gpd.GeoDataFrame( segments.drop(columns=["geometry"]), # Drop the original 'geometry' column geometry=segments[geom_col], # Set the alternative column as the geometry crs=segments.crs, # Preserve CRS ) return segments.copy() # Otherwise, return a copy of the original segments def _filter_adjacent_tessellation( tessellation: gpd.GeoDataFrame, segments: gpd.GeoDataFrame, max_distance: float = math.inf, ) -> gpd.GeoDataFrame: """ Filter tessellation cells to include only those adjacent to segments. This function filters a tessellation GeoDataFrame to retain only the cells that are within a specified maximum Euclidean distance of the provided street segments. If the tessellation is grouped by enclosures, the filtering is performed independently for each enclosure, considering only the segments that intersect that enclosure. Parameters ---------- tessellation : geopandas.GeoDataFrame The tessellation GeoDataFrame to filter. segments : geopandas.GeoDataFrame The street segments to measure distance against. max_distance : float, default math.inf The maximum distance for a tessellation cell to be considered adjacent. Returns ------- geopandas.GeoDataFrame The filtered tessellation GeoDataFrame. """ # If tessellation is empty, return an empty GeoDataFrame with the same structure if tessellation.empty: return tessellation.copy() # If max_distance is infinite, no filtering is needed based on distance if math.isinf(max_distance): return tessellation.copy() # Check if 'enclosure_index' column exists for grouped processing enclosure_col = "enclosure_index" if "enclosure_index" in tessellation.columns else None # List to store filtered parts of tessellation filtered_parts: list[gpd.GeoDataFrame] = [] # Iterate over each enclosure group for _, group in tessellation.groupby(enclosure_col): # Geometry of the current enclosure enclosure_geom = group.union_all() # Segments intersecting this enclosure segments_in_enclosure = segments[segments.intersects(enclosure_geom)] # Union of segments within this enclosure segment_union_in_enclosure = segments_in_enclosure.union_all() # Centroids of cells in this group centroids_in_group = group.geometry.centroid # Distances to segments in this enclosure distances_in_group = centroids_in_group.distance(segment_union_in_enclosure) # Filter cells in group by distance filtered_group = group.loc[distances_in_group <= max_distance] # Add filtered group to list if not filtered_group.empty: filtered_parts.append(filtered_group) # Concatenate all filtered parts into a single GeoDataFrame return gpd.GeoDataFrame(pd.concat(filtered_parts), crs=tessellation.crs) def _build_spatial_graph( segments: gpd.GeoDataFrame, tessellation_centroids: gpd.GeoSeries, ) -> tuple[nx.Graph, dict[int, str], dict[str, tuple[float, float]]]: """ Build a spatial graph from segments and tessellation centroids. This function creates a unified NetworkX graph that combines the street segment network with the tessellation centroids. The segment endpoints and the centroids are all treated as nodes in this temporary graph, which is used for calculating network distances. Parameters ---------- segments : geopandas.GeoDataFrame The street segments GeoDataFrame. tessellation_centroids : geopandas.GeoSeries A GeoSeries of tessellation centroid points. Returns ------- tuple[networkx.Graph, dict[int, str], dict[str, tuple[float, float]]] A tuple containing the combined graph, a mapping from centroid integer location to node ID, and a dictionary of segment node positions. """ # Convert segment GeoDataFrame to a NetworkX graph; nodes are segment endpoints, edges are segments graph = gdf_to_nx(edges=segments) # Get positions (coordinates) of segment graph nodes segment_node_positions = nx.get_node_attributes(graph, "pos") # Create unique node IDs for each tessellation centroid based on its iloc (integer location) centroid_iloc_to_node_id = { i: f"tess_centroid_{i}" for i, _ in enumerate(tessellation_centroids) } # Prepare new nodes (tessellation centroids) to add to the graph with their positions centroid_nodes = [ (f"tess_centroid_{i}", {"pos": (point.x, point.y), "type": "centroid_node"}) for i, point in enumerate(tessellation_centroids) # point is a Shapely Point object ] # Add tessellation centroid nodes to the graph graph.add_nodes_from(centroid_nodes) return graph, centroid_iloc_to_node_id, segment_node_positions # Return graph and mappings def _connect_centroids_to_segment_graph( graph: nx.Graph, tessellation_centroids: gpd.GeoSeries, # GeoSeries of tessellation centroid Points centroid_iloc_to_node_id: dict[int, str], # Maps centroid iloc to graph node ID segment_node_positions: dict[str, tuple[float, float]], # Positions of segment graph nodes ) -> None: """ Connect centroid nodes to their nearest segment graph nodes. This function uses a KDTree for efficient nearest neighbor search to find the closest segment node for each tessellation centroid. It then adds edges to the graph connecting each centroid to its nearest segment node, with the edge length representing the Euclidean distance. Parameters ---------- graph : networkx.Graph The graph to which the new edges will be added. tessellation_centroids : geopandas.GeoSeries The GeoSeries of tessellation centroid points. centroid_iloc_to_node_id : dict[int, str] A mapping from centroid integer location to its graph node ID. segment_node_positions : dict[str, tuple[float, float]] A dictionary of segment node positions. """ # Prepare segment node IDs and their coordinates for KDTree segment_node_ids = list(segment_node_positions.keys()) segment_node_coords = [list(coord) for coord in segment_node_positions.values()] segment_coords_array = np.array(segment_node_coords) # Build KDTree from segment graph node coordinates for efficient nearest neighbor search kdtree = KDTree(segment_coords_array) # Prepare centroid coordinates for querying the KDTree centroid_coords = [(point.x, point.y) for point in tessellation_centroids.tolist()] centroid_coords_array = np.array(centroid_coords) # Query KDTree: for each centroid, find the nearest segment graph node and distance to it distances_to_segments, segment_indices = kdtree.query(centroid_coords_array) # Prepare edges to connect centroids to their nearest segment graph nodes edges_to_add = [ ( centroid_iloc_to_node_id[i], # Graph node ID of the i-th centroid segment_node_ids[segment_indices[i]], # Graph node ID of the nearest segment node {"length": distances_to_segments[i]}, # Euclidean distance as edge length ) for i in range(len(tessellation_centroids)) # Iterate through each centroid if 0 <= segment_indices[i] < len(segment_node_ids) # Ensure index is valid ] # Add the new edges to the graph if edges_to_add: graph.add_edges_from(edges_to_add) def _connect_centroids_to_centroids( graph: nx.Graph, tessellation_centroids: gpd.GeoSeries, # Assumed to be a GeoSeries of Point geometries centroid_iloc_to_node_id: dict[int, str], # Maps iloc of centroids to graph node_id ) -> None: """ Connect centroid nodes to each other based on Euclidean distance. This function creates a complete graph between all the tessellation centroid nodes, where the weight of each edge is the Euclidean distance between the centroids. This allows for pathfinding between centroids that may not be directly connected to the street network. Parameters ---------- graph : networkx.Graph The graph to which the new edges will be added. tessellation_centroids : geopandas.GeoSeries The GeoSeries of tessellation centroid points. centroid_iloc_to_node_id : dict[int, str] A mapping from centroid integer location to its graph node ID. """ # Get relevant ilocs (integer positions) from the centroid_iloc_to_node_id map and sort them relevant_ilocs = sorted(centroid_iloc_to_node_id.keys()) # Extract centroid Point geometries and their graph node IDs in a consistent, sorted order points_to_connect = tessellation_centroids.iloc[relevant_ilocs] # GeoSeries of Points node_ids_ordered = [ centroid_iloc_to_node_id[iloc] for iloc in relevant_ilocs ] # List of node IDs # Create a 2D NumPy array of coordinates (x, y) from the Point geometries coords_array = np.array([(point.x, point.y) for point in points_to_connect]) # Calculate all pairwise Euclidean distances using scipy.spatial.distance.pdist # pdist returns a condensed distance matrix (a 1D array of upper triangular part) pairwise_distances = pdist(coords_array, metric="euclidean") # List to store edges to be added to the graph edges_to_add = [] # Generate all unique pairs of ordered node IDs using itertools.combinations # The order of pairs matches the order of distances in pairwise_distances for idx, (node_id1, node_id2) in enumerate(itertools.combinations(node_ids_ordered, 2)): distance = pairwise_distances[idx] # Get the corresponding distance edges_to_add.append((node_id1, node_id2, {"length": distance})) # Prepare edge tuple # Add all new centroid-to-centroid edges to the graph if edges_to_add: graph.add_edges_from(edges_to_add) def _find_closest_node_to_center( graph: nx.Graph, center_point_geom: Point, ) -> str: """ Find the graph node ID closest to a geometric center point. This function uses a KDTree to efficiently find the nearest node in the graph to a given geometric point. This is used to identify the starting node for network distance calculations when filtering by distance from a center point. Parameters ---------- graph : networkx.Graph The graph to search within. center_point_geom : shapely.geometry.Point The geometric point to find the closest node to. Returns ------- str The ID of the closest node in the graph. """ # Extract node positions from the graph (all nodes in spatial graphs have 'pos') pos = nx.get_node_attributes(graph, "pos") node_ids = list(pos.keys()) node_coords = np.array(list(pos.values())) # Create a KDTree for efficient nearest neighbor search kdtree = KDTree(node_coords) # Query for the closest node to the center point _, idx = kdtree.query([center_point_geom.x, center_point_geom.y]) # Return the ID of the closest node return str(node_ids[idx]) def _filter_nodes_by_path_length( graph: nx.Graph, source_node_id: str, max_distance: float, centroid_iloc_to_node_id: dict[int, str], ) -> list[int]: """ Filter nodes by their shortest path length from a source node. This function calculates the shortest path distance from a given source node to all other nodes in the graph using Dijkstra's algorithm. It then returns a list of the integer locations (ilocs) of the tessellation centroids whose corresponding nodes are within the specified maximum distance. Parameters ---------- graph : networkx.Graph The graph to perform the search on. source_node_id : str The ID of the source node for the path length calculation. max_distance : float The maximum path length for a node to be included. centroid_iloc_to_node_id : dict[int, str] A mapping from centroid integer location to its graph node ID. Returns ------- list[int] A list of integer locations of the centroids that are within the distance. """ if source_node_id not in graph: # This can happen if the center point is very far from any graph node logger.warning("Source node for distance filtering not found in graph.") return [] lengths = nx.single_source_dijkstra_path_length( graph, source_node_id, weight="length", ) # Get the ilocs of tessellation centroids that are within the max_distance keep_ilocs = [] for iloc, node_id in centroid_iloc_to_node_id.items(): if node_id in lengths and lengths[node_id] <= max_distance: keep_ilocs.append(iloc) return keep_ilocs def _filter_tessellation_by_network_distance( tessellation: gpd.GeoDataFrame, segments: gpd.GeoDataFrame, center_point: gpd.GeoSeries | gpd.GeoDataFrame | Point, # Geographic center for filtering max_distance: float, # Maximum network distance ) -> gpd.GeoDataFrame: """ Filter tessellation by network distance from a center point. This function filters a tessellation GeoDataFrame to include only those cells that are within a specified network distance from a given `center_point`. It constructs a spatial graph combining street segments and tessellation centroids, then uses shortest-path calculations to determine reachability. Parameters ---------- tessellation : geopandas.GeoDataFrame The tessellation GeoDataFrame to filter. segments : geopandas.GeoDataFrame The street segments GeoDataFrame used to build the network for distance calculations. center_point : shapely.geometry.Point or geopandas.GeoSeries or geopandas.GeoDataFrame The geographic center point(s) from which to calculate network distances. max_distance : float The maximum network distance (e.g., in meters) for a tessellation cell to be included. Returns ------- geopandas.GeoDataFrame The filtered tessellation GeoDataFrame, containing only cells within the specified network distance from the center point. """ # Return a copy of tessellation if it or segments are empty if tessellation.empty or segments.empty: return tessellation.copy() # Get centroids of tessellation cells tessellation_centroids = tessellation.geometry.centroid # Build a spatial graph including segment endpoints and tessellation centroids as nodes graph, centroid_iloc_to_node_id, segment_node_positions = _build_spatial_graph( segments, tessellation_centroids, ) # Connect tessellation centroid nodes to the nearest segment graph nodes _connect_centroids_to_segment_graph( graph, tessellation_centroids, centroid_iloc_to_node_id, segment_node_positions, ) # Connect tessellation centroid nodes to each other (e.g., for paths through open spaces) _connect_centroids_to_centroids(graph, tessellation_centroids, centroid_iloc_to_node_id) # Normalize center_point input to a single Shapely Point geometry center_geom = center_point.iloc[0] if isinstance(center_point, gpd.GeoSeries) else center_point # Find the graph node closest to the geographic center_geom center_node_id_in_graph = _find_closest_node_to_center(graph, center_geom) # Filter centroid ilocs based on network path length from the center_node_id_in_graph keep_ilocs = _filter_nodes_by_path_length( graph, center_node_id_in_graph, max_distance, centroid_iloc_to_node_id, ) # Return the subset of the original tessellation corresponding to the kept ilocs return tessellation.iloc[sorted(keep_ilocs)].copy() def _add_building_info( tessellation: gpd.GeoDataFrame, buildings: gpd.GeoDataFrame, ) -> gpd.GeoDataFrame: """ Add building information to tessellation. This function performs a spatial join between the tessellation GeoDataFrame and the buildings GeoDataFrame to associate each tessellation cell with the building(s) it contains or intersects. It adds a new column `building_geometry` to the tessellation, containing the geometry of the intersecting building. Parameters ---------- tessellation : geopandas.GeoDataFrame The tessellation GeoDataFrame to which building information will be added. buildings : geopandas.GeoDataFrame The GeoDataFrame containing building geometries. Returns ------- geopandas.GeoDataFrame The tessellation GeoDataFrame with an added `building_geometry` column. """ # Perform a spatial join (left join) from tessellation to buildings based on intersection # This adds columns from 'buildings' to 'tessellation' for intersecting features. # 'index_right' column is added by sjoin, containing the index of the joined building. joined = gpd.sjoin(tessellation, buildings, how="left", predicate="intersects") # If 'index_right' exists (meaning some joins occurred), map building geometries if "index_right" in joined.columns: # Create a mapping from building index to building geometry building_geom_map = buildings.geometry.to_dict() # Use the 'index_right' (building indices) to look up geometries from the map # This creates a pandas Series of Shapely geometries. building_geometries_series = joined["index_right"].map(building_geom_map) # Convert this pandas Series to a GeoSeries, assigning the CRS from the source 'buildings' GDF joined["building_geometry"] = gpd.GeoSeries(building_geometries_series, crs=buildings.crs) # Remove the temporary 'index_right' column joined = joined.drop(columns=["index_right"]) return joined # Return tessellation with added building geometry (if any) def _create_empty_edges_gdf( crs: str | int | None, from_col: str, # Name for the 'from' node ID column to_col: str, # Name for the 'to' node ID column extra_cols: list[str] | None = None, # Optional list of additional column names ) -> gpd.GeoDataFrame: """ Create an empty edges GeoDataFrame with specified column structure. This helper function generates an empty GeoDataFrame suitable for representing graph edges, ensuring it has the correct column names for 'from' and 'to' node IDs, and optionally additional columns, along with a geometry column and a specified Coordinate Reference System (CRS). This is useful for initializing empty edge GeoDataFrames when no connections are found or when setting up a new graph structure. Parameters ---------- crs : str, int, or None Coordinate reference system. from_col : str Name for the 'from' node ID column. to_col : str Name for the 'to' node ID column. extra_cols : list[str], optional Optional list of additional column names. Returns ------- geopandas.GeoDataFrame Empty GeoDataFrame with specified columns. """ # Initialize list of column names with 'from' and 'to' ID columns columns = [from_col, to_col] # Add any extra columns if provided if extra_cols: columns.extend(extra_cols) # Add the 'geometry' column name (standard for GeoDataFrames) columns.append("geometry") # Create an empty GeoDataFrame with the defined columns, geometry column, and CRS return gpd.GeoDataFrame(columns=columns, geometry="geometry", crs=crs) def _set_node_index(gdf: gpd.GeoDataFrame, col: str) -> gpd.GeoDataFrame: """ Set GeoDataFrame index using a specified column, if it exists. This function attempts to set the index of a GeoDataFrame to a specified column. If the GeoDataFrame is empty, it safely returns an empty GeoDataFrame with an empty index. Otherwise, it sets the specified column as the new index, dropping the column from the DataFrame's columns. Parameters ---------- gdf : geopandas.GeoDataFrame Input GeoDataFrame. col : str Column name to use as index. Returns ------- geopandas.GeoDataFrame GeoDataFrame with index set if column exists. See Also -------- _set_edge_index : Set multi-index for edge GeoDataFrames. Examples -------- >>> indexed_gdf = _set_node_index(gdf, 'node_id') """ if gdf.empty: # For an empty GDF, set an empty index. # Attempting to set_index with a non-existent column `col` would error. # If `col` is the intended index name, it can be assigned after. return gdf.set_index(pd.Index([])) # Safest for empty return gdf.set_index(col, drop=True) def _set_edge_index( gdf: gpd.GeoDataFrame, from_col: str, # Column name for the 'from' part of the MultiIndex to_col: str, # Column name for the 'to' part of the MultiIndex ) -> gpd.GeoDataFrame: """ Set multi-index for edge GeoDataFrame. This function sets a MultiIndex on an edge GeoDataFrame using specified 'from' and 'to' column names. This is crucial for representing graph edges where each edge is uniquely identified by its source and target nodes. Parameters ---------- gdf : geopandas.GeoDataFrame The edge GeoDataFrame to modify. from_col : str The name of the column to be used as the first level of the MultiIndex (representing the source node ID). to_col : str The name of the column to be used as the second level of the MultiIndex (representing the target node ID). Returns ------- geopandas.GeoDataFrame The GeoDataFrame with the new MultiIndex applied. """ return gdf.set_index([from_col, to_col]) # ============================================================================ # HELPER FUNCTIONS FOR SPATIAL WEIGHTS AND ADJACENCY # ============================================================================ def _create_spatial_weights(gdf: gpd.GeoDataFrame, contiguity: str) -> libpysal.weights.W | None: """ Create spatial weights matrix for adjacency analysis. This function generates a spatial weights matrix based on the specified contiguity type (Queen or Rook) for a given GeoDataFrame of polygon geometries. This matrix is crucial for defining adjacency relationships between spatial units, which is a fundamental step in many spatial analysis and graph construction tasks. Parameters ---------- gdf : geopandas.GeoDataFrame Input GeoDataFrame with polygon geometries. contiguity : str Type of contiguity ("queen" or "rook"). Returns ------- libpysal.weights.W or None Spatial weights matrix, or None if creation fails. See Also -------- _extract_adjacency_relationships : Extract relationships from spatial weights matrix. Examples -------- >>> weights = _create_spatial_weights(gdf, 'queen') """ # Attempt to create spatial weights matrix using libpysal if contiguity == "queen": # Queen contiguity (shared edges or vertices) return libpysal.weights.Queen.from_dataframe(gdf, use_index=True) # Rook contiguity (shared edges only) return libpysal.weights.Rook.from_dataframe(gdf, use_index=True) def _extract_adjacency_relationships( spatial_weights: libpysal.weights.W, # Precomputed spatial weights matrix gdf: gpd.GeoDataFrame, # GeoDataFrame from which weights were computed (indexed 0..N-1) id_col: str, # Name of the column in 'gdf' containing the actual polygon IDs group_col: str | None, # Optional column name for grouping adjacencies ) -> pd.DataFrame: """ Extract adjacency relationships from spatial weights matrix. This function processes a spatial weights matrix to extract pairs of adjacent polygons and their corresponding IDs. It can optionally filter these relationships to include only those within specified groups, ensuring that connections are meaningful within a larger spatial context. Parameters ---------- spatial_weights : libpysal.weights.W Spatial weights matrix. gdf : geopandas.GeoDataFrame Input GeoDataFrame. id_col : str ID column name. group_col : str, optional Group column name for filtering. Returns ------- pandas.DataFrame DataFrame with adjacency relationships. See Also -------- _create_spatial_weights : Create spatial weights matrix. _create_adjacency_edges : Create edge geometries from adjacency data. Examples -------- >>> adj_data = _extract_adjacency_relationships(weights, gdf, 'id', 'group') """ # Convert spatial weights to COO (Coordinate Format) sparse matrix for easier pair extraction coo = spatial_weights.sparse.tocoo() # Create a mask to get only unique pairs (upper triangle, row < col) to avoid duplicates (i,j) and (j,i) mask = coo.row < coo.col rows = coo.row[mask] # Indices of 'from' polygons (based on gdf's 0..N-1 index) cols = coo.col[mask] # Indices of 'to' polygons (based on gdf's 0..N-1 index) # Extract the actual polygon IDs using the 'id_col' from the input 'gdf' from_ids = gdf.iloc[rows][id_col].to_numpy() # Actual IDs for 'from' polygons to_ids = gdf.iloc[cols][id_col].to_numpy() # Actual IDs for 'to' polygons # If grouping is specified, filter adjacencies to only include pairs within the same group if group_col: grp_i = gdf.iloc[rows][group_col].to_numpy() # Group values for 'from' polygons grp_j = gdf.iloc[cols][group_col].to_numpy() # Group values for 'to' polygons valid_group_mask = grp_i == grp_j # Mask for pairs in the same group # Apply group filter to indices and IDs rows = rows[valid_group_mask] cols = cols[valid_group_mask] from_ids = from_ids[valid_group_mask] to_ids = to_ids[valid_group_mask] groups = grp_i[valid_group_mask] # Group values for the valid pairs else: # If no grouping, assign a default group "all" to all pairs groups = np.full(len(from_ids), "all", dtype=object) group_col = "group" # Set group_col name for the output DataFrame # Filter out self-loops (though coo.row < coo.col should prevent this for simple IDs) # This is an additional safeguard, especially if id_col might not be perfectly unique per geometry. valid_ids_filter = from_ids != to_ids from_ids = from_ids[valid_ids_filter] to_ids = to_ids[valid_ids_filter] groups = groups[valid_ids_filter] rows = rows[valid_ids_filter] cols = cols[valid_ids_filter] # Return a DataFrame containing the adjacency relationships return pd.DataFrame( { "row": rows, # Original 0..N-1 index of 'from' polygon in gdf_indexed "col": cols, # Original 0..N-1 index of 'to' polygon in gdf_indexed "from_private_id": from_ids, # Actual ID of 'from' polygon "to_private_id": to_ids, # Actual ID of 'to' polygon group_col: groups, # Group identifier for the pair }, ) def _create_adjacency_edges( adjacency_data: pd.DataFrame, # DataFrame from _extract_adjacency_relationships gdf: gpd.GeoDataFrame, # Original private_gdf, indexed 0..N-1, used for centroid lookup group_col: str, # Name of the group column in adjacency_data ) -> pd.DataFrame: # Returns a DataFrame, to be converted to GeoDataFrame by caller """ Create edge geometries from adjacency relationships. This function processes adjacency data to generate edge geometries connecting adjacent spatial features, creating LineString geometries between centroids of neighboring features based on their spatial relationships. Parameters ---------- adjacency_data : pandas.DataFrame DataFrame with adjacency relationships (must include 'row', 'col', 'from_private_id', 'to_private_id', and group_col columns). gdf : geopandas.GeoDataFrame Input GeoDataFrame with geometries (assumed to have a simple 0..N-1 index corresponding to 'row'/'col' in adjacency_data). group_col : str Group column name present in adjacency_data. Returns ------- pandas.DataFrame DataFrame with edge geometries. See Also -------- _extract_adjacency_relationships : Extract adjacency relationships. Examples -------- >>> edges_df = _create_adjacency_edges(adj_data, gdf, 'group') """ # Work with a copy to avoid modifying the input DataFrame adjacency_processed = adjacency_data.copy() # Sort ID pairs to ensure canonical representation (e.g., (A,B) not (B,A)) for deduplication id_pairs = adjacency_processed[["from_private_id", "to_private_id"]].to_numpy() sorted_id_pairs = np.sort(id_pairs, axis=1) # Sort each pair [id1, id2] adjacency_processed["from_private_id"] = sorted_id_pairs[:, 0] # Assign smaller ID to 'from' adjacency_processed["to_private_id"] = sorted_id_pairs[:, 1] # Assign larger ID to 'to' # Drop duplicate edges based on the sorted (from_id, to_id) and group adjacency_processed = adjacency_processed.drop_duplicates( subset=["from_private_id", "to_private_id", group_col], ) if adjacency_processed.empty: return adjacency_processed.reindex( columns=["from_private_id", "to_private_id", group_col, "geometry"], ) # Calculate centroids of all polygons in the input GeoDataFrame 'gdf' # 'gdf' is assumed to be the one with 0..N-1 index matching 'row'/'col' in adjacency_data centroids = gdf.geometry.centroid # Get the 0..N-1 indices for 'from' and 'to' polygons from adjacency_data rows_idx = adjacency_processed["row"].to_numpy() # Indices for 'from' centroids cols_idx = adjacency_processed["col"].to_numpy() # Indices for 'to' centroids # Look up centroid geometries using these indices from_centroids = centroids.iloc[rows_idx] # Centroids of 'from' polygons to_centroids = centroids.iloc[cols_idx] # Centroids of 'to' polygons # Extract (x,y) coordinates for from_centroids and to_centroids from_coords = np.array(list(zip(from_centroids.x, from_centroids.y, strict=True))) to_coords = np.array(list(zip(to_centroids.x, to_centroids.y, strict=True))) # Stack coordinates to form pairs for LineString creation [(x1,y1), (x2,y2)] line_coords = np.stack((from_coords, to_coords), axis=1) # Create LineStrings and assign to the 'geometry' column adjacency_processed["geometry"] = list(sh_linestrings(line_coords)) # Drop intermediate 'row' and 'col' index columns if they exist columns_to_drop = [col for col in ["row", "col"] if col in adjacency_processed.columns] # Define final columns for the output DataFrame final_columns = ["from_private_id", "to_private_id", group_col, "geometry"] # Return the processed DataFrame with selected columns return adjacency_processed.drop(columns=columns_to_drop)[final_columns]