"""
Module for creating morphological graphs from urban data.
This module provides comprehensive functionality for analyzing urban morphology
through graph representations, focusing on the relationships between private
spaces (buildings and their tessellations) and public spaces (street segments).
It creates heterogeneous graphs that capture the complex spatial relationships
inherent in urban environments. Both GeoDataFrame and NetworkX objects can be
converted to PyTorch Geometric Data or HeteroData by functions from graph.py.
The module specializes in three types of spatial relationships:
1. Private-to-private: Adjacency relationships between building tessellations
2. Public-to-public: Topological connectivity between street segments
3. Private-to-public: Interface relationships between private and public spaces
"""
# Standard library imports
import itertools
import logging
import math
import warnings
# Third-party imports
import geopandas as gpd
import libpysal
import networkx as nx
import numpy as np
import pandas as pd
from scipy.spatial import KDTree
from scipy.spatial.distance import pdist
from shapely.creation import linestrings as sh_linestrings
from shapely.geometry import Point
# Local imports
from .utils import create_tessellation
from .utils import dual_graph
from .utils import filter_graph_by_distance
from .utils import gdf_to_nx
from .utils import nx_to_gdf
from .utils import segments_to_graph
# Public API definition
__all__ = [
"morphological_graph",
"private_to_private_graph",
"private_to_public_graph",
"public_to_public_graph",
]
# Module logger configuration
logger = logging.getLogger(__name__)
# ============================================================================
# MAIN MORPHOLOGICAL GRAPH FUNCTION
# ============================================================================
[docs]
def morphological_graph( # noqa: PLR0912, PLR0915
buildings_gdf: gpd.GeoDataFrame,
segments_gdf: gpd.GeoDataFrame,
center_point: gpd.GeoSeries | gpd.GeoDataFrame | None = None,
distance: float | None = None,
clipping_buffer: float = math.inf,
primary_barrier_col: str | None = "barrier_geometry",
contiguity: str = "queen",
keep_buildings: bool = False,
tolerance: float = 1e-6,
as_nx: bool = False,
) -> tuple[dict[str, gpd.GeoDataFrame], dict[tuple[str, str, str], gpd.GeoDataFrame]] | nx.Graph:
"""
Create a morphological graph from buildings and street segments.
This function creates a comprehensive morphological graph that captures relationships
between private spaces (building tessellations) and public spaces (street segments).
The graph includes three types of relationships: private-to-private adjacency,
public-to-public connectivity, and private-to-public interfaces.
The 'private_id' for tessellation cells is derived from 'tess_id' (generated by
`create_tessellation`) or assigned sequentially if 'tess_id' doesn't directly map.
The 'public_id' for street segments is taken directly from the index of `segments_gdf`.
Parameters
----------
buildings_gdf : geopandas.GeoDataFrame
GeoDataFrame containing building polygons. Should contain Polygon or MultiPolygon geometries.
segments_gdf : geopandas.GeoDataFrame
GeoDataFrame containing street segments. Should contain LineString geometries.
center_point : geopandas.GeoSeries or geopandas.GeoDataFrame, optional
Center point(s) for spatial filtering. If provided with distance parameter,
only segments within the specified distance will be included.
distance : float, optional
Maximum distance from ``center_point`` for spatial filtering. When
specified, street segments beyond this shortest-path distance are
removed and tessellation cells are kept only if their own distance via
these segments does not exceed this value.
clipping_buffer : float, default=math.inf
Buffer distance to ensure adequate context for generating tessellation.
Must be non-negative.
primary_barrier_col : str, optional
Column name containing alternative geometry for public spaces. If specified and exists,
this geometry will be used instead of the main geometry column for tessellation barriers.
contiguity : str, default="queen"
Type of spatial contiguity for private-to-private connections.
Must be either "queen" or "rook".
keep_buildings : bool, default=False
If True, preserves building information in the tessellation output.
tolerance : float, default=1e-6
Buffer distance for public geometries when creating private-to-public connections.
This parameter controls how close private spaces need to be to public spaces
to establish a connection.
as_nx : bool, default=False
If True, convert the output to a NetworkX graph.
Returns
-------
tuple[dict[str, geopandas.GeoDataFrame], dict[tuple[str, str, str], geopandas.GeoDataFrame]] | networkx.Graph
A tuple containing:
- nodes: Dictionary with keys "private" and "public" containing node GeoDataFrames
- edges: Dictionary with relationship type keys containing edge GeoDataFrames
If as_nx is True, returns a NetworkX graph.
Raises
------
TypeError
If buildings_gdf or segments_gdf are not GeoDataFrames.
ValueError
If contiguity parameter is not "queen" or "rook".
If clipping_buffer is negative.
See Also
--------
private_to_private_graph : Create adjacency between private spaces.
private_to_public_graph : Create connections between private and public spaces.
public_to_public_graph : Create connectivity between public spaces.
Notes
-----
The function first filters the street network by `distance` (resulting in `segs`).
A `segs_buffer` GeoDataFrame is also created for tessellation context, potentially
filtered by `distance + clipping_buffer` or `distance` if `center_point` and
`distance` are provided. This `segs_buffer` is used to create enclosures and
tessellations.
It then establishes three types of relationships:
1. Private-to-private: Adjacency between tessellation cells (handled by private_to_private_graph)
2. Public-to-public: Topological connectivity between street segments
3. Private-to-public: Spatial interfaces between tessellations and streets
The output follows a heterogeneous graph structure suitable for network analysis
of urban morphology.
Examples
--------
>>> # Create morphological graph from buildings and segments
>>> nodes, edges = morphological_graph(buildings_gdf, segments_gdf)
>>> private_nodes = nodes['private']
>>> public_nodes = nodes['public']
"""
# Define fixed ID column names
private_id_col = "private_id"
public_id_col = "public_id"
# Validate input GeoDataFrames (includes geometry type checks)
_validate_input_gdfs(buildings_gdf, segments_gdf)
# Validate contiguity parameter
if contiguity not in {"queen", "rook"}:
msg = "contiguity must be 'queen' or 'rook'"
raise ValueError(msg)
# Validate clipping_buffer
if clipping_buffer < 0:
msg = "clipping_buffer cannot be negative."
raise ValueError(msg)
# Convert MultiIndex to tuples if buildings_gdf has MultiIndex
if isinstance(buildings_gdf.index, pd.MultiIndex):
buildings_gdf = buildings_gdf.copy()
buildings_gdf.index = buildings_gdf.index.to_flat_index()
# Ensure CRS consistency between buildings and segments
segments_gdf = _ensure_crs_consistency(buildings_gdf, segments_gdf)
# Assign the original index of segments_gdf to public_id_col
# A copy is made to avoid SettingWithCopyWarning if segments_gdf might be a slice,
# especially before adding/modifying a column.
segments_gdf = segments_gdf.copy()
segments_gdf[public_id_col] = segments_gdf.index # Use original index as public ID
# Convert segments to a graph representation for efficient filtering.
if not segments_gdf.empty:
# segments_to_graph returns nodes and edges with a MultiIndex.
# gdf_to_nx converts these to a NetworkX graph.
segment_nodes, segment_edges = segments_to_graph(segments_gdf)
segments_graph = gdf_to_nx(nodes=segment_nodes, edges=segment_edges)
else:
# Create empty structures to avoid errors downstream
segment_nodes, segment_edges = segments_to_graph(segments_gdf)
segments_graph = nx.Graph()
# Filter segments by network distance for the final graph if center_point and distance are provided
if center_point is not None and distance is not None and not segments_gdf.empty:
filtered_segments_graph = filter_graph_by_distance(segments_graph, center_point, distance)
segments_filtered = nx_to_gdf(filtered_segments_graph, nodes=False, edges=True)
else:
segments_filtered = segment_edges
# Create a buffered version of the graph for tessellation creation (segments_buffer)
# This segments_buffer is used to _prepare_barriers -> create_tessellation
# and as the segments context for _filter_adjacent_tessellation.
if center_point is not None and distance is not None and not segments_gdf.empty:
if not math.isinf(clipping_buffer):
# Finite clipping_buffer: use distance + clipping_buffer for segments_buffer radius
buffer_radius = distance + clipping_buffer
segments_buffer_graph = filter_graph_by_distance(
segments_graph,
center_point,
buffer_radius,
)
segments_buffer = nx_to_gdf(segments_buffer_graph, nodes=False, edges=True)
else: # clipping_buffer is math.inf
# Fallback to 'distance' as radius for segments_buffer
segments_buffer_graph = filter_graph_by_distance(
segments_graph,
center_point,
distance,
)
segments_buffer = nx_to_gdf(segments_buffer_graph, nodes=False, edges=True)
else:
# No center_point or no distance, so segments_buffer is not filtered by distance
segments_buffer = segment_edges
# Prepare barriers from the buffered segments (segments_buffer) for tessellation
barriers = _prepare_barriers(segments_buffer, primary_barrier_col)
# Create tessellation based on buildings and prepared barriers
tessellation = create_tessellation(
buildings_gdf,
primary_barriers=None if barriers.empty else barriers, # Use barriers if available
)
# Rename tessellation ID column (typically 'tess_id') to the fixed private ID name
tessellation = tessellation.rename(columns={"tess_id": private_id_col})
# Ensure the fixed private ID column exists, creating a sequential ID if necessary
if private_id_col not in tessellation.columns:
tessellation[private_id_col] = range(len(tessellation)) # Assign sequential private IDs
# Determine max_distance for filtering tessellation adjacent to segments
max_distance_for_adj_filter = distance + clipping_buffer if distance is not None else math.inf
# Filter tessellation to only include cells adjacent to the (potentially filtered) 'segments_filtered'
tessellation = _filter_adjacent_tessellation(
tessellation,
segments_filtered, # Use 'segments_filtered' (final graph segments) for adjacency check
max_distance=max_distance_for_adj_filter, # Max distance for adjacency
)
# Further filter tessellation by network distance if center_point and distance are specified
if center_point is not None and distance is not None:
tessellation = _filter_tessellation_by_network_distance(
tessellation,
segments_filtered, # Use 'segments_filtered' for network distance calculation
center_point,
distance, # Max network distance
)
# Optionally preserve building information by joining tessellation with buildings
if keep_buildings:
tessellation = _add_building_info(tessellation, buildings_gdf)
# Determine group_col for private_to_private_graph
group_col_for_priv_priv: str | None = "enclosure_index"
if group_col_for_priv_priv not in tessellation.columns:
if not tessellation.empty:
logger.warning(
"Column '%s' not found in tessellation. "
"Private-to-private graph will not use grouping.",
group_col_for_priv_priv,
)
group_col_for_priv_priv = None
# Create private-to-private graph (adjacency between tessellation cells)
_, private_to_private_edges = private_to_private_graph(
tessellation,
group_col=group_col_for_priv_priv,
contiguity=contiguity,
)
# Create public-to-public graph (connectivity between street segments)
_, public_to_public_edges = public_to_public_graph(
segments_filtered,
) # Use 'segments_filtered' (final graph segments)
# Create private-to-public graph (interfaces between tessellation and streets)
_, private_to_public_edges = private_to_public_graph(
tessellation,
segments_filtered, # Use 'segments_filtered' (final graph segments)
primary_barrier_col=primary_barrier_col, # Optional alternative geometry for public spaces
tolerance=tolerance, # Buffer distance for proximity detection
)
# Log warning if no private-public connections found
if private_to_public_edges.empty:
logger.warning("No private to public connections found")
# Organize output as a heterogeneous graph structure (nodes and edges dictionaries)
nodes = {
"private": _set_node_index(tessellation, private_id_col), # Private nodes (tessellation)
"public": _set_node_index(segments_filtered, public_id_col), # Public nodes (segments)
}
# Organize edges into a dictionary with relationship types as keys
edges = {
("private", "touched_to", "private"): _set_edge_index(
private_to_private_edges,
"from_private_id",
"to_private_id", # Private-private edges
),
("public", "connected_to", "public"): _set_edge_index(
public_to_public_edges,
"from_public_id",
"to_public_id", # Public-public edges
),
("private", "faced_to", "public"): _set_edge_index(
private_to_public_edges,
"private_id",
"public_id", # Private-public edges
),
}
return (nodes, edges) if not as_nx else gdf_to_nx(nodes, edges)
# ============================================================================
# PRIVATE TO PRIVATE GRAPH FUNCTIONS
# ============================================================================
[docs]
def private_to_private_graph(
private_gdf: gpd.GeoDataFrame,
group_col: str | None = None,
contiguity: str = "queen",
as_nx: bool = False,
) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame] | nx.Graph:
"""
Create edges between contiguous private polygons based on spatial adjacency.
This function identifies spatial adjacency relationships between private polygons
(e.g., tessellation cells) using either Queen or Rook contiguity criteria.
Optionally groups connections within specified groups (e.g., enclosures).
The input `private_gdf` is expected to have a 'private_id' column.
Parameters
----------
private_gdf : geopandas.GeoDataFrame
GeoDataFrame containing private space polygons. Must contain a 'private_id' column.
group_col : str, optional
Column name for grouping connections. Only polygons within the same group
will be connected. If None, all polygons are considered as one group.
contiguity : str, default="queen"
Type of spatial contiguity to use. Must be either "queen" or "rook".
Queen contiguity includes vertex neighbors, Rook includes only edge neighbors.
as_nx : bool, default=False
If True, convert the output to a NetworkX graph.
Returns
-------
tuple[geopandas.GeoDataFrame, geopandas.GeoDataFrame] | networkx.Graph
A tuple containing:
- Nodes of the private graph (the input private_gdf).
- Edges of the private graph (adjacency connections).
If as_nx is True, returns a NetworkX graph.
Raises
------
TypeError
If private_gdf is not a GeoDataFrame.
ValueError
If contiguity not in {"queen", "rook"}, or if group_col doesn't exist.
See Also
--------
morphological_graph : Main function that creates comprehensive morphological graphs.
private_to_public_graph : Create connections between private and public spaces.
public_to_public_graph : Create connectivity between public spaces.
Notes
-----
The function uses libpysal's spatial weights to determine adjacency relationships.
Edge geometries are created as LineStrings connecting polygon centroids.
Self-connections and duplicate edges are automatically filtered out.
The input private_gdf is expected to have a 'private_id' column.
Examples
--------
>>> # Create private-to-private adjacency graph
>>> nodes, edges = private_to_private_graph(tessellation_gdf)
>>> # With grouping by enclosures
>>> nodes, edges = private_to_private_graph(tessellation_gdf, group_col='enclosure_id')
"""
# Input validation
_validate_single_gdf_input(private_gdf, "private_gdf")
private_id_col = "private_id" # Fixed ID column name for private polygons
# If not empty, require the ID column
if not private_gdf.empty and private_id_col not in private_gdf.columns:
msg = f"Expected ID column '{private_id_col}' not found in private_gdf."
raise ValueError(msg)
# Validate that the contiguity type is supported
if contiguity not in {"queen", "rook"}:
msg = "contiguity must be either 'queen' or 'rook'"
raise ValueError(msg)
# Handle empty or insufficient data: return empty edges GeoDataFrame
if private_gdf.empty or len(private_gdf) < 2:
group_cols = [group_col or "group"]
empty_edges = _create_empty_edges_gdf(
private_gdf.crs,
"from_private_id",
"to_private_id",
group_cols,
)
return (
(private_gdf, empty_edges)
if not as_nx
else gdf_to_nx(nodes=private_gdf, edges=empty_edges)
)
# Validate that the group column exists if specified
if group_col and group_col not in private_gdf.columns:
msg = f"group_col '{group_col}' not found in private_gdf columns"
raise ValueError(msg)
# Reset index for consistent spatial weights computation by libpysal
gdf_indexed = private_gdf.reset_index(drop=True)
# Create spatial weights matrix (Queen or Rook contiguity)
spatial_weights = _create_spatial_weights(gdf_indexed, contiguity)
# Extract adjacency relationships from the spatial weights matrix
adjacency_data = _extract_adjacency_relationships(
spatial_weights,
gdf_indexed,
private_id_col,
group_col,
)
# Create edge geometries (LineStrings connecting centroids of adjacent polygons)
edges_gdf_data = _create_adjacency_edges(adjacency_data, gdf_indexed, group_col or "group")
# Convert the DataFrame with edge geometries to a GeoDataFrame
edges_gdf = gpd.GeoDataFrame(
edges_gdf_data, # Data including 'from_private_id', 'to_private_id', group, and geometry
geometry="geometry",
crs=private_gdf.crs, # Preserve original CRS
)
return (private_gdf, edges_gdf) if not as_nx else gdf_to_nx(nodes=private_gdf, edges=edges_gdf)
# ============================================================================
# PRIVATE TO PUBLIC GRAPH FUNCTIONS
# ============================================================================
[docs]
def private_to_public_graph(
private_gdf: gpd.GeoDataFrame,
public_gdf: gpd.GeoDataFrame,
primary_barrier_col: str | None = None,
tolerance: float = 1e-6,
as_nx: bool = False,
) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame] | nx.Graph:
"""
Create edges between private polygons and nearby public geometries.
This function identifies spatial relationships between private spaces (tessellations)
and public spaces (street segments) by finding intersections between buffered public
geometries and private polygons.
Input GDFs are expected to have 'private_id' and 'public_id' columns respectively.
Parameters
----------
private_gdf : geopandas.GeoDataFrame
GeoDataFrame containing private space polygons. Expected to have a 'private_id' column.
public_gdf : geopandas.GeoDataFrame
GeoDataFrame containing public space geometries (typically LineStrings).
Expected to have a 'public_id' column.
primary_barrier_col : str, optional
Column name for alternative public geometry. If specified and exists,
this geometry will be used instead of the main geometry column.
tolerance : float, default=1e-6
Buffer distance for public geometries to detect proximity to private spaces.
as_nx : bool, default=False
If True, convert the output to a NetworkX graph.
Returns
-------
tuple[geopandas.GeoDataFrame, geopandas.GeoDataFrame] | networkx.Graph
A tuple containing:
- Nodes of the private and public graphs (the input private_gdf and public_gdf).
- Edges of the private-public graph (connections between private polygons and public geometries).
If as_nx is True, returns a NetworkX graph.
Raises
------
TypeError
If private_gdf or public_gdf are not GeoDataFrames.
ValueError
If 'private_id' or 'public_id' columns are missing from input GDFs.
See Also
--------
morphological_graph : Main function that creates comprehensive morphological graphs.
private_to_private_graph : Create adjacency between private spaces.
public_to_public_graph : Create connectivity between public spaces.
Notes
-----
Edge geometries are created as LineStrings connecting the centroids of
private polygons and public geometries. The function uses spatial joins
to identify overlapping areas within the specified tolerance.
Input GDFs are expected to have 'private_id' and 'public_id' columns respectively.
Examples
--------
>>> # Create private-to-public interface graph
>>> nodes, edges = private_to_public_graph(tessellation_gdf, segments_gdf)
>>> # With custom tolerance
>>> nodes, edges = private_to_public_graph(tessellation_gdf, segments_gdf, tolerance=2.0)
"""
# Input validation
_validate_single_gdf_input(private_gdf, "private_gdf")
_validate_single_gdf_input(public_gdf, "public_gdf")
private_id_col = "private_id" # Fixed ID column name for private spaces
public_id_col = "public_id" # Fixed ID column name for public spaces
# Handle empty data: return empty edges GeoDataFrame
if private_gdf.empty or public_gdf.empty:
empty_edges = _create_empty_edges_gdf(private_gdf.crs, private_id_col, public_id_col)
all_nodes = pd.concat([private_gdf, public_gdf], ignore_index=True)
return (
(all_nodes, empty_edges) if not as_nx else gdf_to_nx(nodes=all_nodes, edges=empty_edges)
)
# Ensure required ID columns exist in the input GeoDataFrames
if private_id_col not in private_gdf.columns:
msg = f"Expected ID column '{private_id_col}' not found in private_gdf."
raise ValueError(msg)
if public_id_col not in public_gdf.columns:
msg = f"Expected ID column '{public_id_col}' not found in public_gdf."
raise ValueError(msg)
# Ensure CRS consistency between private and public GeoDataFrames
public_gdf = _ensure_crs_consistency(private_gdf, public_gdf)
# Determine which geometry column to use for public spaces (main or alternative)
join_geom_series = (
public_gdf[primary_barrier_col]
if primary_barrier_col and primary_barrier_col in public_gdf.columns
else public_gdf.geometry
)
# Create buffered geometries for public spaces to detect proximity
buffered_public_data = {public_id_col: public_gdf[public_id_col]} # Keep public IDs
buffered_public = gpd.GeoDataFrame(
buffered_public_data,
geometry=join_geom_series.buffer(tolerance), # Buffer the selected public geometry
crs=public_gdf.crs,
)
# Perform spatial join to find intersections between private polygons and buffered public geometries
joined = gpd.sjoin(
private_gdf[[private_id_col, "geometry"]], # Private polygons with their IDs
buffered_public, # Buffered public geometries with their IDs
how="inner", # Keep only intersecting pairs
predicate="intersects", # Use intersection predicate
)
# If intersections found, keep only the ID columns
if not joined.empty:
id_cols_to_keep = [private_id_col, public_id_col]
joined = joined[id_cols_to_keep] # Select relevant ID columns
# Drop duplicate pairs of (private_id, public_id)
joined = joined.drop_duplicates()
# Create maps of centroids for private and public geometries, indexed by their IDs
private_centroids_map = (
private_gdf.drop_duplicates(subset=[private_id_col])
.set_index(private_id_col)
.geometry.centroid
)
public_centroids_map = (
public_gdf.drop_duplicates(subset=[public_id_col])
.set_index(public_id_col)
.geometry.centroid
)
# Prepare to add edge geometries to the joined DataFrame
joined_with_geom = joined.copy()
if joined_with_geom.empty:
edges_gdf = gpd.GeoDataFrame(joined_with_geom, geometry="geometry", crs=private_gdf.crs)
all_nodes = pd.concat([private_gdf, public_gdf], ignore_index=True)
return (all_nodes, edges_gdf) if not as_nx else gdf_to_nx(nodes=all_nodes, edges=edges_gdf)
# Get centroid geometries for each pair in the 'joined' DataFrame
private_centroids = private_centroids_map.loc[joined_with_geom[private_id_col]].reset_index(
drop=True,
)
public_centroids = public_centroids_map.loc[joined_with_geom[public_id_col]].reset_index(
drop=True,
)
# Extract coordinates and ensure 2D array shape
private_coords = np.array(list(zip(private_centroids.x, private_centroids.y, strict=True)))
public_coords = np.array(list(zip(public_centroids.x, public_centroids.y, strict=True)))
# Ensure coords are 2D by reshaping if needed
private_coords = private_coords.reshape(-1, 2)
public_coords = public_coords.reshape(-1, 2)
# Stack coordinates for LineString creation
line_coords = np.stack((private_coords, public_coords), axis=1)
joined_with_geom["geometry"] = list(sh_linestrings(line_coords))
# Concatenate private and public GeoDataFrames to create a unified nodes GeoDataFrame
nodes_gdf = pd.concat([private_gdf, public_gdf], ignore_index=True)
# Convert the DataFrame with edge geometries to a GeoDataFrame
edges_gdf = gpd.GeoDataFrame(joined_with_geom, geometry="geometry", crs=private_gdf.crs)
return (nodes_gdf, edges_gdf) if not as_nx else gdf_to_nx(nodes=nodes_gdf, edges=edges_gdf)
# ============================================================================
# PUBLIC TO PUBLIC GRAPH FUNCTIONS
# ============================================================================
[docs]
def public_to_public_graph(
public_gdf: gpd.GeoDataFrame,
as_nx: bool = False,
) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame] | nx.Graph:
"""
Create edges between connected public segments based on topological connectivity.
This function identifies topological connections between public space geometries
(typically street segments) using the dual graph approach to find segments
that share endpoints or connection points.
The function automatically creates a unique identifier for each row if needed.
Parameters
----------
public_gdf : geopandas.GeoDataFrame
GeoDataFrame containing public space geometries (typically LineString).
as_nx : bool, default=False
If True, convert the output to a NetworkX graph.
Returns
-------
tuple[geopandas.GeoDataFrame, gpd.GeoDataFrame] | networkx.Graph
A tuple containing:
- Nodes of the public graph (the input public_gdf).
- Edges of the public graph (connectivity).
If as_nx is True, returns a NetworkX graph.
Raises
------
TypeError
If public_gdf is not a GeoDataFrame.
See Also
--------
morphological_graph : Main function that creates comprehensive morphological graphs.
private_to_private_graph : Create adjacency between private spaces.
private_to_public_graph : Create connections between private and public spaces.
Notes
-----
The function uses the dual graph approach where each LineString becomes a node,
and edges represent topological connections between segments. Edge geometries
are created as LineStrings connecting the centroids of connected segments.
Examples
--------
>>> # Create public-to-public connectivity graph
>>> nodes, edges = public_to_public_graph(segments_gdf)
>>> # Convert to NetworkX format
>>> graph = public_to_public_graph(segments_gdf, as_nx=True)
"""
# Input validation
_validate_single_gdf_input(public_gdf, "public_gdf")
# Handle empty or insufficient data: return empty edges GeoDataFrame
if public_gdf.empty or len(public_gdf) < 2:
empty_edges = _create_empty_edges_gdf(public_gdf.crs, "from_public_id", "to_public_id")
return (
(public_gdf, empty_edges)
if not as_nx
else gdf_to_nx(nodes=public_gdf, edges=empty_edges)
)
# Create a copy to avoid modifying the original
public_gdf_copy = public_gdf.copy()
# Check if public_id column already exists (shared with other components)
if "public_id" in public_gdf_copy.columns:
edge_id_col = "public_id"
else:
edge_id_col = "_edge_id"
public_gdf_copy[edge_id_col] = [str(idx) for idx in public_gdf_copy.index]
# Convert public_gdf to a graph
segment_nodes, segment_edges = segments_to_graph(public_gdf_copy)
# Create dual graph (nodes are segments, edges are connections)
_, dual_edges = dual_graph(
(segment_nodes, segment_edges),
edge_id_col=edge_id_col,
keep_original_geom=True,
)
# Preserve the original MultiIndex structure and data types
if isinstance(dual_edges.index, pd.MultiIndex):
# Rename the MultiIndex levels for clarity and consistency
dual_edges.index.names = ["from_public_id", "to_public_id"]
# Reset index to ensure it is a regular DataFrame
dual_edges = dual_edges.reset_index()
return (public_gdf, dual_edges) if not as_nx else gdf_to_nx(nodes=public_gdf, edges=dual_edges)
# ============================================================================
# HELPER FUNCTIONS FOR VALIDATION AND DATA PROCESSIçNG
# ============================================================================
def _validate_input_gdfs(buildings_gdf: gpd.GeoDataFrame, segments_gdf: gpd.GeoDataFrame) -> None:
"""
Validate the primary input GeoDataFrames for the morphological graph function.
This function ensures that the provided buildings and segments inputs are both
GeoDataFrames and that their geometries are of the expected types (Polygons for
buildings, LineStrings for segments). It serves as a critical initial check
to prevent errors in downstream processing.
Parameters
----------
buildings_gdf : geopandas.GeoDataFrame
GeoDataFrame containing building polygons.
segments_gdf : geopandas.GeoDataFrame
GeoDataFrame containing street segments.
Raises
------
TypeError
If either input is not a GeoDataFrame.
ValueError
If the geometries are not of the expected types.
See Also
--------
_validate_single_gdf_input : Validate a single GeoDataFrame.
"""
# Check if buildings_gdf is a GeoDataFrame
if not isinstance(buildings_gdf, gpd.GeoDataFrame):
msg = "buildings_gdf must be a GeoDataFrame"
raise TypeError(msg)
# Check if segments_gdf is a GeoDataFrame
if not isinstance(segments_gdf, gpd.GeoDataFrame):
msg = "segments_gdf must be a GeoDataFrame"
raise TypeError(msg)
# Validate geometry types for non-empty buildings_gdf
if not buildings_gdf.empty:
building_geom_types = buildings_gdf.geometry.geom_type.unique()
if not all(geom_type in {"Polygon", "MultiPolygon"} for geom_type in building_geom_types):
msg = (
f"buildings_gdf must contain only Polygon or MultiPolygon geometries. "
f"Found: {', '.join(building_geom_types)}"
)
raise ValueError(msg)
# Validate geometry types for non-empty segments_gdf
if not segments_gdf.empty:
# Assuming LineString is required for operations like dual_graph
segment_geom_types = segments_gdf.geometry.geom_type.unique()
if not all(geom_type in {"LineString"} for geom_type in segment_geom_types):
msg = (
f"segments_gdf must contain only LineString geometries. "
f"Found: {', '.join(segment_geom_types)}"
)
raise ValueError(msg)
def _validate_single_gdf_input(
gdf: gpd.GeoDataFrame,
gdf_name: str,
) -> None:
"""
Validate that a single input is a GeoDataFrame.
This is a simple utility function to ensure that an input object is of type
geopandas.GeoDataFrame, raising a TypeError with a descriptive message if it
is not. It is used for validating individual geospatial inputs in various
functions.
Parameters
----------
gdf : geopandas.GeoDataFrame
The GeoDataFrame to validate.
gdf_name : str
The name of the GeoDataFrame, used in the error message.
Raises
------
TypeError
If the input is not a GeoDataFrame.
See Also
--------
_validate_input_gdfs : Validate both buildings and segments GeoDataFrames.
"""
# Check if the input is a GeoDataFrame
if not isinstance(gdf, gpd.GeoDataFrame):
msg = f"{gdf_name} must be a GeoDataFrame"
raise TypeError(msg)
def _ensure_crs_consistency(
target_gdf: gpd.GeoDataFrame,
source_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
"""
Ensure that the source GeoDataFrame has the same CRS as the target.
This function checks if the Coordinate Reference System (CRS) of the source
GeoDataFrame matches that of the target. If they do not match, it reprojects
the source to the target's CRS and issues a warning. This is essential for
ensuring that spatial operations between the two GeoDataFrames are valid.
Parameters
----------
target_gdf : geopandas.GeoDataFrame
The GeoDataFrame with the target CRS.
source_gdf : geopandas.GeoDataFrame
The GeoDataFrame to check and potentially reproject.
Returns
-------
geopandas.GeoDataFrame
The source GeoDataFrame, reprojected to the target CRS if necessary.
Warns
-----
RuntimeWarning
If a CRS mismatch is detected and reprojection is performed.
"""
# Check if CRS of source_gdf matches target_gdf
if source_gdf.crs != target_gdf.crs:
warnings.warn(
"CRS mismatch detected, reprojecting",
RuntimeWarning,
stacklevel=3,
) # Warn user
return source_gdf.to_crs(target_gdf.crs) # Reproject source_gdf to target_gdf's CRS
return source_gdf # Return source_gdf as is if CRSs match
def _prepare_barriers(
segments: gpd.GeoDataFrame,
geom_col: str | None,
) -> gpd.GeoDataFrame:
"""
Prepare the barrier geometries for tessellation.
This function selects the appropriate geometry column from the segments
GeoDataFrame to be used as barriers in the tessellation process. If an
alternative geometry column is specified and exists, it is used; otherwise,
the default geometry column is used.
Parameters
----------
segments : geopandas.GeoDataFrame
The street segments GeoDataFrame.
geom_col : str, optional
The name of an alternative geometry column to use for the barriers.
Returns
-------
geopandas.GeoDataFrame
A GeoDataFrame containing the prepared barrier geometries.
"""
# If an alternative geometry column is specified, exists, and is not the default 'geometry'
if geom_col and geom_col in segments.columns and geom_col != "geometry":
# Create a new GeoDataFrame using the alternative geometry column as the active geometry
return gpd.GeoDataFrame(
segments.drop(columns=["geometry"]), # Drop the original 'geometry' column
geometry=segments[geom_col], # Set the alternative column as the geometry
crs=segments.crs, # Preserve CRS
)
return segments.copy() # Otherwise, return a copy of the original segments
def _filter_adjacent_tessellation(
tessellation: gpd.GeoDataFrame,
segments: gpd.GeoDataFrame,
max_distance: float = math.inf,
) -> gpd.GeoDataFrame:
"""
Filter tessellation cells to include only those adjacent to segments.
This function filters a tessellation GeoDataFrame to retain only the cells
that are within a specified maximum Euclidean distance of the provided street
segments. If the tessellation is grouped by enclosures, the filtering is
performed independently for each enclosure, considering only the segments
that intersect that enclosure.
Parameters
----------
tessellation : geopandas.GeoDataFrame
The tessellation GeoDataFrame to filter.
segments : geopandas.GeoDataFrame
The street segments to measure distance against.
max_distance : float, default math.inf
The maximum distance for a tessellation cell to be considered adjacent.
Returns
-------
geopandas.GeoDataFrame
The filtered tessellation GeoDataFrame.
"""
# If tessellation is empty, return an empty GeoDataFrame with the same structure
if tessellation.empty:
return tessellation.copy()
# If max_distance is infinite, no filtering is needed based on distance
if math.isinf(max_distance):
return tessellation.copy()
# Check if 'enclosure_index' column exists for grouped processing
enclosure_col = "enclosure_index" if "enclosure_index" in tessellation.columns else None
# List to store filtered parts of tessellation
filtered_parts: list[gpd.GeoDataFrame] = []
# Iterate over each enclosure group
for _, group in tessellation.groupby(enclosure_col):
# Geometry of the current enclosure
enclosure_geom = group.union_all()
# Segments intersecting this enclosure
segments_in_enclosure = segments[segments.intersects(enclosure_geom)]
# Union of segments within this enclosure
segment_union_in_enclosure = segments_in_enclosure.union_all()
# Centroids of cells in this group
centroids_in_group = group.geometry.centroid
# Distances to segments in this enclosure
distances_in_group = centroids_in_group.distance(segment_union_in_enclosure)
# Filter cells in group by distance
filtered_group = group.loc[distances_in_group <= max_distance]
# Add filtered group to list
if not filtered_group.empty:
filtered_parts.append(filtered_group)
# Concatenate all filtered parts into a single GeoDataFrame
return gpd.GeoDataFrame(pd.concat(filtered_parts), crs=tessellation.crs)
def _build_spatial_graph(
segments: gpd.GeoDataFrame,
tessellation_centroids: gpd.GeoSeries,
) -> tuple[nx.Graph, dict[int, str], dict[str, tuple[float, float]]]:
"""
Build a spatial graph from segments and tessellation centroids.
This function creates a unified NetworkX graph that combines the street segment
network with the tessellation centroids. The segment endpoints and the centroids
are all treated as nodes in this temporary graph, which is used for calculating
network distances.
Parameters
----------
segments : geopandas.GeoDataFrame
The street segments GeoDataFrame.
tessellation_centroids : geopandas.GeoSeries
A GeoSeries of tessellation centroid points.
Returns
-------
tuple[networkx.Graph, dict[int, str], dict[str, tuple[float, float]]]
A tuple containing the combined graph, a mapping from centroid integer
location to node ID, and a dictionary of segment node positions.
"""
# Convert segment GeoDataFrame to a NetworkX graph; nodes are segment endpoints, edges are segments
graph = gdf_to_nx(edges=segments)
# Get positions (coordinates) of segment graph nodes
segment_node_positions = nx.get_node_attributes(graph, "pos")
# Create unique node IDs for each tessellation centroid based on its iloc (integer location)
centroid_iloc_to_node_id = {
i: f"tess_centroid_{i}" for i, _ in enumerate(tessellation_centroids)
}
# Prepare new nodes (tessellation centroids) to add to the graph with their positions
centroid_nodes = [
(f"tess_centroid_{i}", {"pos": (point.x, point.y), "type": "centroid_node"})
for i, point in enumerate(tessellation_centroids) # point is a Shapely Point object
]
# Add tessellation centroid nodes to the graph
graph.add_nodes_from(centroid_nodes)
return graph, centroid_iloc_to_node_id, segment_node_positions # Return graph and mappings
def _connect_centroids_to_segment_graph(
graph: nx.Graph,
tessellation_centroids: gpd.GeoSeries, # GeoSeries of tessellation centroid Points
centroid_iloc_to_node_id: dict[int, str], # Maps centroid iloc to graph node ID
segment_node_positions: dict[str, tuple[float, float]], # Positions of segment graph nodes
) -> None:
"""
Connect centroid nodes to their nearest segment graph nodes.
This function uses a KDTree for efficient nearest neighbor search to find the
closest segment node for each tessellation centroid. It then adds edges to the
graph connecting each centroid to its nearest segment node, with the edge
length representing the Euclidean distance.
Parameters
----------
graph : networkx.Graph
The graph to which the new edges will be added.
tessellation_centroids : geopandas.GeoSeries
The GeoSeries of tessellation centroid points.
centroid_iloc_to_node_id : dict[int, str]
A mapping from centroid integer location to its graph node ID.
segment_node_positions : dict[str, tuple[float, float]]
A dictionary of segment node positions.
"""
# Prepare segment node IDs and their coordinates for KDTree
segment_node_ids = list(segment_node_positions.keys())
segment_node_coords = [list(coord) for coord in segment_node_positions.values()]
segment_coords_array = np.array(segment_node_coords)
# Build KDTree from segment graph node coordinates for efficient nearest neighbor search
kdtree = KDTree(segment_coords_array)
# Prepare centroid coordinates for querying the KDTree
centroid_coords = [(point.x, point.y) for point in tessellation_centroids.tolist()]
centroid_coords_array = np.array(centroid_coords)
# Query KDTree: for each centroid, find the nearest segment graph node and distance to it
distances_to_segments, segment_indices = kdtree.query(centroid_coords_array)
# Prepare edges to connect centroids to their nearest segment graph nodes
edges_to_add = [
(
centroid_iloc_to_node_id[i], # Graph node ID of the i-th centroid
segment_node_ids[segment_indices[i]], # Graph node ID of the nearest segment node
{"length": distances_to_segments[i]}, # Euclidean distance as edge length
)
for i in range(len(tessellation_centroids)) # Iterate through each centroid
if 0 <= segment_indices[i] < len(segment_node_ids) # Ensure index is valid
]
# Add the new edges to the graph
if edges_to_add:
graph.add_edges_from(edges_to_add)
def _connect_centroids_to_centroids(
graph: nx.Graph,
tessellation_centroids: gpd.GeoSeries, # Assumed to be a GeoSeries of Point geometries
centroid_iloc_to_node_id: dict[int, str], # Maps iloc of centroids to graph node_id
) -> None:
"""
Connect centroid nodes to each other based on Euclidean distance.
This function creates a complete graph between all the tessellation centroid
nodes, where the weight of each edge is the Euclidean distance between the
centroids. This allows for pathfinding between centroids that may not be
directly connected to the street network.
Parameters
----------
graph : networkx.Graph
The graph to which the new edges will be added.
tessellation_centroids : geopandas.GeoSeries
The GeoSeries of tessellation centroid points.
centroid_iloc_to_node_id : dict[int, str]
A mapping from centroid integer location to its graph node ID.
"""
# Get relevant ilocs (integer positions) from the centroid_iloc_to_node_id map and sort them
relevant_ilocs = sorted(centroid_iloc_to_node_id.keys())
# Extract centroid Point geometries and their graph node IDs in a consistent, sorted order
points_to_connect = tessellation_centroids.iloc[relevant_ilocs] # GeoSeries of Points
node_ids_ordered = [
centroid_iloc_to_node_id[iloc] for iloc in relevant_ilocs
] # List of node IDs
# Create a 2D NumPy array of coordinates (x, y) from the Point geometries
coords_array = np.array([(point.x, point.y) for point in points_to_connect])
# Calculate all pairwise Euclidean distances using scipy.spatial.distance.pdist
# pdist returns a condensed distance matrix (a 1D array of upper triangular part)
pairwise_distances = pdist(coords_array, metric="euclidean")
# List to store edges to be added to the graph
edges_to_add = []
# Generate all unique pairs of ordered node IDs using itertools.combinations
# The order of pairs matches the order of distances in pairwise_distances
for idx, (node_id1, node_id2) in enumerate(itertools.combinations(node_ids_ordered, 2)):
distance = pairwise_distances[idx] # Get the corresponding distance
edges_to_add.append((node_id1, node_id2, {"length": distance})) # Prepare edge tuple
# Add all new centroid-to-centroid edges to the graph
if edges_to_add:
graph.add_edges_from(edges_to_add)
def _find_closest_node_to_center(
graph: nx.Graph,
center_point_geom: Point,
) -> str:
"""
Find the graph node ID closest to a geometric center point.
This function uses a KDTree to efficiently find the nearest node in the graph
to a given geometric point. This is used to identify the starting node for
network distance calculations when filtering by distance from a center point.
Parameters
----------
graph : networkx.Graph
The graph to search within.
center_point_geom : shapely.geometry.Point
The geometric point to find the closest node to.
Returns
-------
str
The ID of the closest node in the graph.
"""
# Extract node positions from the graph (all nodes in spatial graphs have 'pos')
pos = nx.get_node_attributes(graph, "pos")
node_ids = list(pos.keys())
node_coords = np.array(list(pos.values()))
# Create a KDTree for efficient nearest neighbor search
kdtree = KDTree(node_coords)
# Query for the closest node to the center point
_, idx = kdtree.query([center_point_geom.x, center_point_geom.y])
# Return the ID of the closest node
return str(node_ids[idx])
def _filter_nodes_by_path_length(
graph: nx.Graph,
source_node_id: str,
max_distance: float,
centroid_iloc_to_node_id: dict[int, str],
) -> list[int]:
"""
Filter nodes by their shortest path length from a source node.
This function calculates the shortest path distance from a given source node to
all other nodes in the graph using Dijkstra's algorithm. It then returns a list
of the integer locations (ilocs) of the tessellation centroids whose corresponding
nodes are within the specified maximum distance.
Parameters
----------
graph : networkx.Graph
The graph to perform the search on.
source_node_id : str
The ID of the source node for the path length calculation.
max_distance : float
The maximum path length for a node to be included.
centroid_iloc_to_node_id : dict[int, str]
A mapping from centroid integer location to its graph node ID.
Returns
-------
list[int]
A list of integer locations of the centroids that are within the distance.
"""
if source_node_id not in graph:
# This can happen if the center point is very far from any graph node
logger.warning("Source node for distance filtering not found in graph.")
return []
lengths = nx.single_source_dijkstra_path_length(
graph,
source_node_id,
weight="length",
)
# Get the ilocs of tessellation centroids that are within the max_distance
keep_ilocs = []
for iloc, node_id in centroid_iloc_to_node_id.items():
if node_id in lengths and lengths[node_id] <= max_distance:
keep_ilocs.append(iloc)
return keep_ilocs
def _filter_tessellation_by_network_distance(
tessellation: gpd.GeoDataFrame,
segments: gpd.GeoDataFrame,
center_point: gpd.GeoSeries | gpd.GeoDataFrame | Point, # Geographic center for filtering
max_distance: float, # Maximum network distance
) -> gpd.GeoDataFrame:
"""
Filter tessellation by network distance from a center point.
This function filters a tessellation GeoDataFrame to include only those cells
that are within a specified network distance from a given `center_point`.
It constructs a spatial graph combining street segments and tessellation
centroids, then uses shortest-path calculations to determine reachability.
Parameters
----------
tessellation : geopandas.GeoDataFrame
The tessellation GeoDataFrame to filter.
segments : geopandas.GeoDataFrame
The street segments GeoDataFrame used to build the network for distance calculations.
center_point : shapely.geometry.Point or geopandas.GeoSeries or geopandas.GeoDataFrame
The geographic center point(s) from which to calculate network distances.
max_distance : float
The maximum network distance (e.g., in meters) for a tessellation cell to be included.
Returns
-------
geopandas.GeoDataFrame
The filtered tessellation GeoDataFrame, containing only cells within the specified
network distance from the center point.
"""
# Return a copy of tessellation if it or segments are empty
if tessellation.empty or segments.empty:
return tessellation.copy()
# Get centroids of tessellation cells
tessellation_centroids = tessellation.geometry.centroid
# Build a spatial graph including segment endpoints and tessellation centroids as nodes
graph, centroid_iloc_to_node_id, segment_node_positions = _build_spatial_graph(
segments,
tessellation_centroids,
)
# Connect tessellation centroid nodes to the nearest segment graph nodes
_connect_centroids_to_segment_graph(
graph,
tessellation_centroids,
centroid_iloc_to_node_id,
segment_node_positions,
)
# Connect tessellation centroid nodes to each other (e.g., for paths through open spaces)
_connect_centroids_to_centroids(graph, tessellation_centroids, centroid_iloc_to_node_id)
# Normalize center_point input to a single Shapely Point geometry
center_geom = center_point.iloc[0] if isinstance(center_point, gpd.GeoSeries) else center_point
# Find the graph node closest to the geographic center_geom
center_node_id_in_graph = _find_closest_node_to_center(graph, center_geom)
# Filter centroid ilocs based on network path length from the center_node_id_in_graph
keep_ilocs = _filter_nodes_by_path_length(
graph,
center_node_id_in_graph,
max_distance,
centroid_iloc_to_node_id,
)
# Return the subset of the original tessellation corresponding to the kept ilocs
return tessellation.iloc[sorted(keep_ilocs)].copy()
def _add_building_info(
tessellation: gpd.GeoDataFrame,
buildings: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
"""
Add building information to tessellation.
This function performs a spatial join between the tessellation GeoDataFrame
and the buildings GeoDataFrame to associate each tessellation cell with
the building(s) it contains or intersects. It adds a new column
`building_geometry` to the tessellation, containing the geometry of the
intersecting building.
Parameters
----------
tessellation : geopandas.GeoDataFrame
The tessellation GeoDataFrame to which building information will be added.
buildings : geopandas.GeoDataFrame
The GeoDataFrame containing building geometries.
Returns
-------
geopandas.GeoDataFrame
The tessellation GeoDataFrame with an added `building_geometry` column.
"""
# Perform a spatial join (left join) from tessellation to buildings based on intersection
# This adds columns from 'buildings' to 'tessellation' for intersecting features.
# 'index_right' column is added by sjoin, containing the index of the joined building.
joined = gpd.sjoin(tessellation, buildings, how="left", predicate="intersects")
# If 'index_right' exists (meaning some joins occurred), map building geometries
if "index_right" in joined.columns:
# Create a mapping from building index to building geometry
building_geom_map = buildings.geometry.to_dict()
# Use the 'index_right' (building indices) to look up geometries from the map
# This creates a pandas Series of Shapely geometries.
building_geometries_series = joined["index_right"].map(building_geom_map)
# Convert this pandas Series to a GeoSeries, assigning the CRS from the source 'buildings' GDF
joined["building_geometry"] = gpd.GeoSeries(building_geometries_series, crs=buildings.crs)
# Remove the temporary 'index_right' column
joined = joined.drop(columns=["index_right"])
return joined # Return tessellation with added building geometry (if any)
def _create_empty_edges_gdf(
crs: str | int | None,
from_col: str, # Name for the 'from' node ID column
to_col: str, # Name for the 'to' node ID column
extra_cols: list[str] | None = None, # Optional list of additional column names
) -> gpd.GeoDataFrame:
"""
Create an empty edges GeoDataFrame with specified column structure.
This helper function generates an empty GeoDataFrame suitable for representing
graph edges, ensuring it has the correct column names for 'from' and 'to'
node IDs, and optionally additional columns, along with a geometry column
and a specified Coordinate Reference System (CRS). This is useful for
initializing empty edge GeoDataFrames when no connections are found or
when setting up a new graph structure.
Parameters
----------
crs : str, int, or None
Coordinate reference system.
from_col : str
Name for the 'from' node ID column.
to_col : str
Name for the 'to' node ID column.
extra_cols : list[str], optional
Optional list of additional column names.
Returns
-------
geopandas.GeoDataFrame
Empty GeoDataFrame with specified columns.
"""
# Initialize list of column names with 'from' and 'to' ID columns
columns = [from_col, to_col]
# Add any extra columns if provided
if extra_cols:
columns.extend(extra_cols)
# Add the 'geometry' column name (standard for GeoDataFrames)
columns.append("geometry")
# Create an empty GeoDataFrame with the defined columns, geometry column, and CRS
return gpd.GeoDataFrame(columns=columns, geometry="geometry", crs=crs)
def _set_node_index(gdf: gpd.GeoDataFrame, col: str) -> gpd.GeoDataFrame:
"""
Set GeoDataFrame index using a specified column, if it exists.
This function attempts to set the index of a GeoDataFrame to a specified column.
If the GeoDataFrame is empty, it safely returns an empty GeoDataFrame with an
empty index. Otherwise, it sets the specified column as the new index, dropping
the column from the DataFrame's columns.
Parameters
----------
gdf : geopandas.GeoDataFrame
Input GeoDataFrame.
col : str
Column name to use as index.
Returns
-------
geopandas.GeoDataFrame
GeoDataFrame with index set if column exists.
See Also
--------
_set_edge_index : Set multi-index for edge GeoDataFrames.
Examples
--------
>>> indexed_gdf = _set_node_index(gdf, 'node_id')
"""
if gdf.empty:
# For an empty GDF, set an empty index.
# Attempting to set_index with a non-existent column `col` would error.
# If `col` is the intended index name, it can be assigned after.
return gdf.set_index(pd.Index([])) # Safest for empty
return gdf.set_index(col, drop=True)
def _set_edge_index(
gdf: gpd.GeoDataFrame,
from_col: str, # Column name for the 'from' part of the MultiIndex
to_col: str, # Column name for the 'to' part of the MultiIndex
) -> gpd.GeoDataFrame:
"""
Set multi-index for edge GeoDataFrame.
This function sets a MultiIndex on an edge GeoDataFrame using specified
'from' and 'to' column names. This is crucial for representing graph edges
where each edge is uniquely identified by its source and target nodes.
Parameters
----------
gdf : geopandas.GeoDataFrame
The edge GeoDataFrame to modify.
from_col : str
The name of the column to be used as the first level of the MultiIndex
(representing the source node ID).
to_col : str
The name of the column to be used as the second level of the MultiIndex
(representing the target node ID).
Returns
-------
geopandas.GeoDataFrame
The GeoDataFrame with the new MultiIndex applied.
"""
return gdf.set_index([from_col, to_col])
# ============================================================================
# HELPER FUNCTIONS FOR SPATIAL WEIGHTS AND ADJACENCY
# ============================================================================
def _create_spatial_weights(gdf: gpd.GeoDataFrame, contiguity: str) -> libpysal.weights.W | None:
"""
Create spatial weights matrix for adjacency analysis.
This function generates a spatial weights matrix based on the specified contiguity
type (Queen or Rook) for a given GeoDataFrame of polygon geometries. This matrix
is crucial for defining adjacency relationships between spatial units, which is
a fundamental step in many spatial analysis and graph construction tasks.
Parameters
----------
gdf : geopandas.GeoDataFrame
Input GeoDataFrame with polygon geometries.
contiguity : str
Type of contiguity ("queen" or "rook").
Returns
-------
libpysal.weights.W or None
Spatial weights matrix, or None if creation fails.
See Also
--------
_extract_adjacency_relationships : Extract relationships from spatial weights matrix.
Examples
--------
>>> weights = _create_spatial_weights(gdf, 'queen')
"""
# Attempt to create spatial weights matrix using libpysal
if contiguity == "queen": # Queen contiguity (shared edges or vertices)
return libpysal.weights.Queen.from_dataframe(gdf, use_index=True)
# Rook contiguity (shared edges only)
return libpysal.weights.Rook.from_dataframe(gdf, use_index=True)
def _extract_adjacency_relationships(
spatial_weights: libpysal.weights.W, # Precomputed spatial weights matrix
gdf: gpd.GeoDataFrame, # GeoDataFrame from which weights were computed (indexed 0..N-1)
id_col: str, # Name of the column in 'gdf' containing the actual polygon IDs
group_col: str | None, # Optional column name for grouping adjacencies
) -> pd.DataFrame:
"""
Extract adjacency relationships from spatial weights matrix.
This function processes a spatial weights matrix to extract pairs of adjacent
polygons and their corresponding IDs. It can optionally filter these relationships
to include only those within specified groups, ensuring that connections are
meaningful within a larger spatial context.
Parameters
----------
spatial_weights : libpysal.weights.W
Spatial weights matrix.
gdf : geopandas.GeoDataFrame
Input GeoDataFrame.
id_col : str
ID column name.
group_col : str, optional
Group column name for filtering.
Returns
-------
pandas.DataFrame
DataFrame with adjacency relationships.
See Also
--------
_create_spatial_weights : Create spatial weights matrix.
_create_adjacency_edges : Create edge geometries from adjacency data.
Examples
--------
>>> adj_data = _extract_adjacency_relationships(weights, gdf, 'id', 'group')
"""
# Convert spatial weights to COO (Coordinate Format) sparse matrix for easier pair extraction
coo = spatial_weights.sparse.tocoo()
# Create a mask to get only unique pairs (upper triangle, row < col) to avoid duplicates (i,j) and (j,i)
mask = coo.row < coo.col
rows = coo.row[mask] # Indices of 'from' polygons (based on gdf's 0..N-1 index)
cols = coo.col[mask] # Indices of 'to' polygons (based on gdf's 0..N-1 index)
# Extract the actual polygon IDs using the 'id_col' from the input 'gdf'
from_ids = gdf.iloc[rows][id_col].to_numpy() # Actual IDs for 'from' polygons
to_ids = gdf.iloc[cols][id_col].to_numpy() # Actual IDs for 'to' polygons
# If grouping is specified, filter adjacencies to only include pairs within the same group
if group_col:
grp_i = gdf.iloc[rows][group_col].to_numpy() # Group values for 'from' polygons
grp_j = gdf.iloc[cols][group_col].to_numpy() # Group values for 'to' polygons
valid_group_mask = grp_i == grp_j # Mask for pairs in the same group
# Apply group filter to indices and IDs
rows = rows[valid_group_mask]
cols = cols[valid_group_mask]
from_ids = from_ids[valid_group_mask]
to_ids = to_ids[valid_group_mask]
groups = grp_i[valid_group_mask] # Group values for the valid pairs
else:
# If no grouping, assign a default group "all" to all pairs
groups = np.full(len(from_ids), "all", dtype=object)
group_col = "group" # Set group_col name for the output DataFrame
# Filter out self-loops (though coo.row < coo.col should prevent this for simple IDs)
# This is an additional safeguard, especially if id_col might not be perfectly unique per geometry.
valid_ids_filter = from_ids != to_ids
from_ids = from_ids[valid_ids_filter]
to_ids = to_ids[valid_ids_filter]
groups = groups[valid_ids_filter]
rows = rows[valid_ids_filter]
cols = cols[valid_ids_filter]
# Return a DataFrame containing the adjacency relationships
return pd.DataFrame(
{
"row": rows, # Original 0..N-1 index of 'from' polygon in gdf_indexed
"col": cols, # Original 0..N-1 index of 'to' polygon in gdf_indexed
"from_private_id": from_ids, # Actual ID of 'from' polygon
"to_private_id": to_ids, # Actual ID of 'to' polygon
group_col: groups, # Group identifier for the pair
},
)
def _create_adjacency_edges(
adjacency_data: pd.DataFrame, # DataFrame from _extract_adjacency_relationships
gdf: gpd.GeoDataFrame, # Original private_gdf, indexed 0..N-1, used for centroid lookup
group_col: str, # Name of the group column in adjacency_data
) -> pd.DataFrame: # Returns a DataFrame, to be converted to GeoDataFrame by caller
"""
Create edge geometries from adjacency relationships.
This function processes adjacency data to generate edge geometries connecting
adjacent spatial features, creating LineString geometries between centroids
of neighboring features based on their spatial relationships.
Parameters
----------
adjacency_data : pandas.DataFrame
DataFrame with adjacency relationships (must include 'row', 'col',
'from_private_id', 'to_private_id', and group_col columns).
gdf : geopandas.GeoDataFrame
Input GeoDataFrame with geometries (assumed to have a simple 0..N-1 index
corresponding to 'row'/'col' in adjacency_data).
group_col : str
Group column name present in adjacency_data.
Returns
-------
pandas.DataFrame
DataFrame with edge geometries.
See Also
--------
_extract_adjacency_relationships : Extract adjacency relationships.
Examples
--------
>>> edges_df = _create_adjacency_edges(adj_data, gdf, 'group')
"""
# Work with a copy to avoid modifying the input DataFrame
adjacency_processed = adjacency_data.copy()
# Sort ID pairs to ensure canonical representation (e.g., (A,B) not (B,A)) for deduplication
id_pairs = adjacency_processed[["from_private_id", "to_private_id"]].to_numpy()
sorted_id_pairs = np.sort(id_pairs, axis=1) # Sort each pair [id1, id2]
adjacency_processed["from_private_id"] = sorted_id_pairs[:, 0] # Assign smaller ID to 'from'
adjacency_processed["to_private_id"] = sorted_id_pairs[:, 1] # Assign larger ID to 'to'
# Drop duplicate edges based on the sorted (from_id, to_id) and group
adjacency_processed = adjacency_processed.drop_duplicates(
subset=["from_private_id", "to_private_id", group_col],
)
if adjacency_processed.empty:
return adjacency_processed.reindex(
columns=["from_private_id", "to_private_id", group_col, "geometry"],
)
# Calculate centroids of all polygons in the input GeoDataFrame 'gdf'
# 'gdf' is assumed to be the one with 0..N-1 index matching 'row'/'col' in adjacency_data
centroids = gdf.geometry.centroid
# Get the 0..N-1 indices for 'from' and 'to' polygons from adjacency_data
rows_idx = adjacency_processed["row"].to_numpy() # Indices for 'from' centroids
cols_idx = adjacency_processed["col"].to_numpy() # Indices for 'to' centroids
# Look up centroid geometries using these indices
from_centroids = centroids.iloc[rows_idx] # Centroids of 'from' polygons
to_centroids = centroids.iloc[cols_idx] # Centroids of 'to' polygons
# Extract (x,y) coordinates for from_centroids and to_centroids
from_coords = np.array(list(zip(from_centroids.x, from_centroids.y, strict=True)))
to_coords = np.array(list(zip(to_centroids.x, to_centroids.y, strict=True)))
# Stack coordinates to form pairs for LineString creation [(x1,y1), (x2,y2)]
line_coords = np.stack((from_coords, to_coords), axis=1)
# Create LineStrings and assign to the 'geometry' column
adjacency_processed["geometry"] = list(sh_linestrings(line_coords))
# Drop intermediate 'row' and 'col' index columns if they exist
columns_to_drop = [col for col in ["row", "col"] if col in adjacency_processed.columns]
# Define final columns for the output DataFrame
final_columns = ["from_private_id", "to_private_id", group_col, "geometry"]
# Return the processed DataFrame with selected columns
return adjacency_processed.drop(columns=columns_to_drop)[final_columns]