Source code for city2graph.mobility

"""
Mobility / OD matrix utilities.

This module introduces the public function ``od_matrix_to_graph`` which
converts Origin-Destination (OD) data (adjacency matrices or edge lists)
into spatial graph representations used throughout the city2graph
ecosystem.

Notes
-----
This module includes a complete implementation of ``od_matrix_to_graph``:
input validation, zone alignment, conversion to a canonical edgelist,
thresholding and self-loop handling, optional geometry creation, and an
optional NetworkX export path.

Examples
--------
See the function docstring for usage examples with adjacency matrices,
NumPy arrays and edge lists (single/multi weight columns).
"""

from __future__ import annotations

import logging
import numbers
import warnings
from typing import TYPE_CHECKING
from typing import Literal
from typing import cast

import geopandas as gpd
import numpy as np
import pandas as pd
from shapely.geometry import LineString

# Use city2graph converters for compatibility across the stack
from .utils import gdf_to_nx

if TYPE_CHECKING:
    from collections.abc import Callable
    from collections.abc import Mapping

    import networkx as nx
    from numpy.typing import NDArray

__all__ = ["od_matrix_to_graph"]

# Logger for informational summaries and errors (warnings used for data quality)
logger = logging.getLogger(__name__)


def od_matrix_to_graph(  # noqa: PLR0913 (public API requires many parameters)
    od_data: pd.DataFrame | np.ndarray,
    zones_gdf: gpd.GeoDataFrame,
    zone_id_col: str | None = None,
    *,
    matrix_type: Literal["edgelist", "adjacency"] = "edgelist",
    source_col: str = "source",
    target_col: str = "target",
    weight_cols: list[str] | None = None,
    threshold: float | None = None,
    threshold_col: str | None = None,
    include_self_loops: bool = False,
    compute_edge_geometry: bool = True,
    directed: bool = True,
    as_nx: bool = False,
) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame] | nx.Graph | nx.DiGraph:
    """
    Convert OD data (edge list or adjacency matrix) into graph structures.

    Creates spatially aware graphs from OD data following city2graph's
    GeoDataFrame-first design. Supports adjacency matrices (DataFrame or
    ndarray) and edge lists with one or more numeric weight columns.

    By default, this function returns a pair of GeoDataFrames representing
    nodes and edges. Edges are directed and thresholded with the rule
    ``weight >= threshold`` (or, when no threshold is provided, strictly
    ``> 0``). When ``directed=False``, the output is undirected: for each
    unordered pair {u, v}, the edge weight equals the sum of directed weights
    in both directions (u->v plus v->u). When a threshold is provided in
    undirected mode, it is applied after this summation. Optionally, a
    NetworkX graph is returned when ``as_nx=True``.

    Parameters
    ----------
    od_data : pandas.DataFrame | numpy.ndarray
        * When ``matrix_type='adjacency'``: a square DataFrame whose index and
          columns are zone IDs, or a square ndarray whose ordering matches
          ``zones_gdf``.
        * When ``matrix_type='edgelist'``: a DataFrame containing origin,
          destination and one or more numeric flow columns.
    zones_gdf : geopandas.GeoDataFrame
        GeoDataFrame of zones. Must contain unique identifiers in
        ``zone_id_col``.
    zone_id_col : str, optional
        Name of the zone ID column in ``zones_gdf``. When ``None``, the
        ``zones_gdf`` index is used as the zone identifier.
    matrix_type : {'edgelist', 'adjacency'}, default 'edgelist'
        Declares how to interpret ``od_data``.
    source_col, target_col : str, default 'source', 'target'
        Column names for origins / destinations when using an edge list.
    weight_cols : list[str] | None
        Edge list weight (flow) columns to preserve. A single column acts as
        the canonical weight. If multiple columns are provided, a
        ``threshold_col`` must be designated.
    threshold : float, optional
        Minimum flow retained (inclusive, ``>=``), applied to the primary
        weight column (``threshold_col`` when several weights are given).
    threshold_col : str, optional
        Column among ``weight_cols`` used for thresholding and as the
        canonical weight (required when ``len(weight_cols) > 1``).
    include_self_loops : bool, default False
        Keep flows where origin == destination (dropped when False).
    compute_edge_geometry : bool, default True
        Whether to build LineString geometries from zone centroids.
    directed : bool, default True
        Whether to build a directed graph. If False, reciprocal edges are
        merged by summing their weights (and all provided weight columns).
    as_nx : bool, default False
        If True, the final output is a NetworkX graph (``nx.DiGraph`` when
        ``directed=True``; otherwise ``nx.Graph``).

    Returns
    -------
    tuple of (geopandas.GeoDataFrame, geopandas.GeoDataFrame)
        The nodes and edges GeoDataFrames. The nodes GeoDataFrame index is
        aligned with the zone identifier (``zone_id_col`` when provided, else
        the original ``zones_gdf.index``). The edges GeoDataFrame uses a
        pandas MultiIndex on (source_id, target_id) and does not carry
        separate 'source'/'target' columns, to comply with
        gdf_to_nx/gdf_to_pyg.
    networkx.DiGraph
        When ``as_nx=True``, a directed graph with node and edge attributes.
    networkx.Graph
        When ``as_nx=True`` and ``directed=False``, an undirected graph with
        node and edge attributes.
    """
    # --- Validation (Task 2) ------------------------------------------------
    _validate_zones_gdf(zones_gdf, zone_id_col)
    _validate_crs(zones_gdf)
    # Validate matrix_type and that threshold is numeric if provided (Req 2.7)
    if threshold is not None and not isinstance(threshold, numbers.Number):
        msg = "threshold must be numeric (int or float)"
        raise ValueError(msg)
    validators: Mapping[str, Callable[..., None]] = {
        "adjacency": _validate_adjacency_data,
        "edgelist": _validate_edgelist_data,
    }
    if matrix_type not in validators:
        _msg = "matrix_type must be 'edgelist' or 'adjacency'"
        raise ValueError(_msg)
    validators[matrix_type](
        od_data,
        zones_gdf=zones_gdf,
        source_col=source_col,
        target_col=target_col,
        weight_cols=weight_cols,
    )
    if matrix_type == "edgelist":
        # Edgelist specifics: validate primary/threshold relationships.
        # weight_cols is required for edgelist inputs (validated above).
        assert weight_cols is not None
        _validate_weights_threshold(
            cast("pd.DataFrame", od_data),
            weight_cols=weight_cols,
            _threshold=threshold,
            threshold_col=threshold_col,
        )

    # --- Conversion to canonical edgelist ----------------------------------
    # For undirected mode, postpone thresholding until after symmetrization
    post_sum_threshold = threshold if not directed else None
    if matrix_type == "edgelist":
        # Filter to zones and aggregate duplicates first
        aligned = _align_edgelist_zones(
            cast("pd.DataFrame", od_data),
            zones_gdf=zones_gdf,
            zone_id_col=zone_id_col,
            source_col=source_col,
            target_col=target_col,
        )
        # Normalize: thresholding, self-loops policy, canonical columns
        edge_df = _normalize_edgelist(
            aligned,
            source_col=source_col,
            target_col=target_col,
            weight_cols=weight_cols if weight_cols is not None else [],
            # In undirected mode, thresholding is applied later
            threshold=None if not directed else threshold,
            threshold_col=threshold_col,
            include_self_loops=include_self_loops,
        )
    elif matrix_type == "adjacency" and isinstance(od_data, pd.DataFrame):
        # Align labels with zones
        adj = _align_adjacency_zones(
            od_data,
            zones_gdf=zones_gdf,
            zone_id_col=zone_id_col,
        )
        edge_df = _adjacency_to_edgelist(
            adj,
            include_self_loops=include_self_loops,
            # In undirected mode, thresholding is applied later
            threshold=None if not directed else threshold,
        )
    elif matrix_type == "adjacency" and isinstance(od_data, np.ndarray):
        zone_ids = _align_numpy_array_zones(
            od_data,
            zones_gdf=zones_gdf,
            zone_id_col=zone_id_col,
        )
        edge_df = _adjacency_to_edgelist(
            od_data,
            zone_ids,
            include_self_loops=include_self_loops,
            # In undirected mode, thresholding is applied later
            threshold=None if not directed else threshold,
        )

    # Ensure canonical columns exist even if the result is empty
    if edge_df.empty:
        edge_df = _empty_edgeframe(
            include_extra_weights=(matrix_type == "edgelist" and bool(weight_cols)),
            extra_weights=(weight_cols or []),
        )

    # If undirected, symmetrize by merging reciprocal edges and summing weights
    if not directed and not edge_df.empty:
        # Sum canonical 'weight' and any provided additional weight columns
        sum_cols = ["weight"]
        if matrix_type == "edgelist" and weight_cols:
            # Ensure we sum the explicitly requested weight columns as well
            # (they are already present in edge_df)
            for c in weight_cols:
                if c not in sum_cols:
                    sum_cols.append(c)
        edge_df = _symmetrize_edges(edge_df, sum_cols=sum_cols)
        # Apply threshold after summation when requested
        if post_sum_threshold is not None:
            edge_df = edge_df.loc[
                _apply_threshold(edge_df["weight"], threshold=post_sum_threshold)
            ]

    # --- Spatial assembly (Task 5) -----------------------------------------
    # Nodes: set index aligned with zone identifier (column or original index)
    nodes_gdf = (
        zones_gdf.set_index(zone_id_col, drop=False).copy()
        if zone_id_col is not None
        else zones_gdf.copy()
    )
    # Edges: create geometry, then convert to MultiIndex (source, target)
    edges_gdf = _create_edge_geometries(
        edge_df,
        zones_gdf,
        zone_id_col=zone_id_col,
        source_col="source",
        target_col="target",
        compute_edge_geometry=compute_edge_geometry,
    )
    # Convert to MultiIndex using the node identifiers and drop source/target columns
    if not edges_gdf.empty:
        mi = pd.MultiIndex.from_arrays(
            [edges_gdf["source"].to_numpy(), edges_gdf["target"].to_numpy()],
            names=["source", "target"],
        )
        edges_gdf = edges_gdf.drop(columns=["source", "target"])  # keep canonical weight/attrs
        edges_gdf.index = mi
    # CRS is preserved (GeoPandas keeps it on the GeoDataFrame)

    # --- Output selection (Task 6) -----------------------------------------
    if not as_nx:
        # GeoDataFrame-first API: return (nodes, edges)
        logger.info("Created graph with %d nodes and %d edges", len(nodes_gdf), len(edges_gdf))
        return nodes_gdf, edges_gdf
    G = gdf_to_nx(
        nodes=nodes_gdf, edges=edges_gdf, keep_geom=compute_edge_geometry, directed=directed
    )
    logger.info(
        "Created graph with %d nodes and %d edges", G.number_of_nodes(), G.number_of_edges()
    )
    return G
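

# ---------------------------------------------------------------------------
# Illustrative usage sketch (added for documentation; not part of the original
# module). The zone IDs, flow values, column names and CRS below are synthetic
# and chosen only to demonstrate the edge-list path of ``od_matrix_to_graph``.
# ---------------------------------------------------------------------------
def _example_edgelist_usage() -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]:  # pragma: no cover
    """Minimal sketch: edge list + zones -> (nodes_gdf, edges_gdf)."""
    from shapely.geometry import Point

    zones = gpd.GeoDataFrame(
        {"zone_id": ["A", "B", "C"]},
        geometry=[Point(0, 0), Point(1, 0), Point(0, 1)],
        crs="EPSG:3857",  # projected CRS, so no geographic-CRS warning is emitted
    )
    flows = pd.DataFrame(
        {
            "origin": ["A", "A", "B"],
            "destination": ["B", "C", "A"],
            "trips": [120.0, 35.0, 80.0],
        }
    )
    # threshold=50 keeps flows with trips >= 50, so A->C (35) is dropped.
    nodes, edges = od_matrix_to_graph(
        flows,
        zones,
        zone_id_col="zone_id",
        source_col="origin",
        target_col="destination",
        weight_cols=["trips"],
        threshold=50.0,
    )
    # ``nodes`` is indexed by zone_id; ``edges`` carries a (source, target)
    # MultiIndex with a canonical 'weight' column mirroring 'trips'.
    return nodes, edges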


# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _validate_zones_gdf(zones_gdf: gpd.GeoDataFrame, zone_id_col: str | None) -> None:
    """
    Validate the zones GeoDataFrame structure.

    Ensures the presence of a valid identifier column and basic integrity
    constraints (non-null, unique IDs) before downstream processing.

    Parameters
    ----------
    zones_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing zone geometries and attributes.
    zone_id_col : str | None
        Column name holding unique zone identifiers. If provided, it must be
        present in ``zones_gdf``; when ``None``, the ``zones_gdf`` index is
        used as the identifier.

    Returns
    -------
    None
        This function validates input and raises on failure.
    """
    # Ensure zones_gdf is a GeoDataFrame (public API contracts/tests)
    if not isinstance(zones_gdf, gpd.GeoDataFrame):
        msg = "zones_gdf must be a GeoDataFrame"
        raise TypeError(msg)
    # Accept either an explicit zone id column or the DataFrame index
    if zone_id_col is not None:
        if zone_id_col not in zones_gdf.columns:
            msg = f"zone_id_col '{zone_id_col}' not found in zones_gdf columns"
            raise ValueError(msg)
        ids = zones_gdf[zone_id_col]
    else:
        ids = zones_gdf.index
    if ids.isna().any():
        msg = "zone_id_col contains null values"
        raise ValueError(msg)
    if not ids.is_unique:
        msg = "zone_id_col values must be unique"
        raise ValueError(msg)


# ---------------------------------------------------------------------------
# Data normalization helpers
# - _adjacency_to_edgelist: fast vectorized conversion using NumPy
# - _normalize_edgelist: multi-attribute aggregation and thresholding
# - _apply_threshold: common filtering semantics
# ---------------------------------------------------------------------------
def _warn_and_clean_adjacency(arr: np.ndarray) -> np.ndarray:
    """
    Replace NaNs with 0 and warn on data quality issues (NaNs, negatives).

    Sanitizes adjacency arrays by converting missing values to zeros while
    preserving negative values, emitting warnings to flag potential issues.

    Parameters
    ----------
    arr : numpy.ndarray
        Adjacency matrix values to sanitize.

    Returns
    -------
    numpy.ndarray
        Array with NaNs replaced by 0.0. Negative values are preserved.
    """
    n_nans = int(np.isnan(arr).sum())
    if n_nans:
        warnings.warn(
            f"Adjacency contains {n_nans} NaN values; treating as 0 (requirement 2.3)",
            UserWarning,
            stacklevel=2,
        )
        arr = np.nan_to_num(arr, nan=0.0)
    if (arr < 0).any():
        warnings.warn(
            "Adjacency contains negative weights; keeping values (requirement 5.6)",
            UserWarning,
            stacklevel=2,
        )
    return arr


def _extract_array_and_ids(
    adjacency: pd.DataFrame | np.ndarray,
    zone_ids: pd.Index | None,
) -> tuple[np.ndarray, pd.Index]:
    """
    Extract a numeric ndarray and matching zone ids from input.

    Accepts either a labeled DataFrame (preferred) or a raw ndarray with
    externally provided zone identifiers for consistent downstream mapping.

    Parameters
    ----------
    adjacency : pandas.DataFrame | numpy.ndarray
        Adjacency matrix as DataFrame (labels in index/columns) or ndarray.
    zone_ids : pandas.Index | None
        Zone identifiers when ``adjacency`` is an ndarray.

    Returns
    -------
    tuple[numpy.ndarray, pandas.Index]
        Tuple of (array, zone_ids).
    """
    if isinstance(adjacency, pd.DataFrame):
        ids = pd.Index(adjacency.index)
        arr = adjacency.to_numpy(dtype=float, copy=False)
        return arr, ids
    arr = np.asarray(adjacency, dtype=float)
    # Upstream ensures zone_ids is provided for ndarray inputs
    return arr, cast("pd.Index", zone_ids)


def _build_adjacency_mask(
    arr: np.ndarray,
    *,
    include_self_loops: bool,
    threshold: float | None,
) -> NDArray[np.bool_]:
    """
    Build a boolean mask selecting matrix entries to keep.

    The mask encodes the default thresholding semantics used throughout the
    module: when ``threshold`` is ``None``, entries strictly greater than 0
    are selected; otherwise entries greater than or equal to the provided
    ``threshold`` are selected. When ``include_self_loops`` is ``False``, the
    diagonal is set to ``False`` to drop self-loops.

    Parameters
    ----------
    arr : numpy.ndarray
        Square adjacency matrix of weights to be filtered.
    include_self_loops : bool
        Whether to keep diagonal entries (self-loops). When ``False``, the
        diagonal is removed from the mask.
    threshold : float or None
        Inclusive minimum value to retain. If ``None``, values must be
        strictly greater than 0 to be kept.

    Returns
    -------
    numpy.ndarray of bool
        Boolean mask with the same shape as ``arr`` indicating entries that
        pass the thresholding and self-loop policy.
    """
    mask: NDArray[np.bool_] = (arr > 0) if threshold is None else (arr >= threshold)
    if include_self_loops:
        return mask.astype(bool, copy=False)
    # Drop diagonal in-place for clarity
    mask = mask.astype(bool, copy=False)
    np.fill_diagonal(mask, val=False)
    return mask


def _apply_threshold(series: pd.Series, *, threshold: float | None) -> pd.Series:
    """
    Return a boolean mask implementing the threshold semantics.

    Encodes the default behavior of dropping zeros when no threshold is
    provided, otherwise applying an inclusive cutoff (>= threshold).

    Parameters
    ----------
    series : pandas.Series
        Series of numeric values to filter.
    threshold : float | None
        Minimum value to retain. When ``None``, values strictly greater than
        zero are kept.

    Returns
    -------
    pandas.Series
        Boolean mask indicating which values pass the threshold.
    """
    return series > 0 if threshold is None else series >= threshold


def _empty_edgeframe(*, include_extra_weights: bool, extra_weights: list[str]) -> pd.DataFrame:
    """
    Construct a canonical empty edge DataFrame with appropriate columns.

    This helper is used when upstream processing yields no edges (for example,
    after thresholding or when no valid zone pairs remain). It guarantees a
    consistent schema for downstream consumers by returning the expected
    columns even in the absence of data.

    Parameters
    ----------
    include_extra_weights : bool
        Whether to include additional weight columns beyond the canonical
        'weight'.
    extra_weights : list[str]
        Names of the extra weight columns to include when requested.

    Returns
    -------
    pandas.DataFrame
        Empty DataFrame with columns ['source', 'target', 'weight', *extra_weights].
    """
    cols = ["source", "target", "weight"]
    if include_extra_weights:
        # Preserve provided order
        cols.extend(extra_weights)
    return pd.DataFrame(columns=cols)


def _adjacency_to_edgelist(
    adjacency: pd.DataFrame | np.ndarray,
    zone_ids: pd.Index | None = None,
    *,
    include_self_loops: bool = False,
    threshold: float | None = None,
) -> pd.DataFrame:
    """
    Convert an adjacency matrix into a canonical edgelist DataFrame.

    Produces a long-form table of directed edges from a square matrix by
    applying thresholding and (optionally) dropping diagonal entries.

    Parameters
    ----------
    adjacency : DataFrame | ndarray
        Square matrix of flows. If DataFrame, index/columns are zone IDs and
        override ``zone_ids``. If ndarray, ``zone_ids`` must be provided and
        correspond to row/column order (validated upstream).
    zone_ids : pd.Index | None
        Zone identifiers in the order matching the matrix when ``adjacency``
        is an ndarray.
    include_self_loops : bool
        Whether to keep diagonal elements subject to thresholding.
    threshold : float | None
        Minimum flow to keep. None => drop zeros by default.

    Returns
    -------
    DataFrame
        Edge list with columns ['source', 'target', 'weight'].
    """
    arr, ids = _extract_array_and_ids(adjacency, zone_ids)
    # Clean and build mask
    arr = _warn_and_clean_adjacency(arr)
    mask = _build_adjacency_mask(arr, include_self_loops=include_self_loops, threshold=threshold)
    # Vectorized extraction of (i, j, weight)
    i, j = np.where(mask)
    return pd.DataFrame(
        {
            "source": ids.take(i),
            "target": ids.take(j),
            "weight": arr[i, j],
        }
    )
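

# ---------------------------------------------------------------------------
# Illustrative sketch (added for documentation; not part of the original
# module): default adjacency semantics. With ``threshold=None`` zero entries
# are dropped, and the diagonal is removed unless ``include_self_loops=True``.
# The labels and values are synthetic.
# ---------------------------------------------------------------------------
def _example_adjacency_to_edgelist() -> pd.DataFrame:  # pragma: no cover
    """Minimal sketch of ``_adjacency_to_edgelist`` on a labeled 3x3 matrix."""
    adj = pd.DataFrame(
        [[5.0, 10.0, 0.0], [0.0, 0.0, 3.0], [7.0, 0.0, 2.0]],
        index=["A", "B", "C"],
        columns=["A", "B", "C"],
    )
    # Expected rows: A->B (10), B->C (3), C->A (7). Zero entries and the
    # diagonal values A->A (5) and C->C (2) are excluded under the defaults.
    return _adjacency_to_edgelist(adj)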


def _coerce_weight_columns(
    df: pd.DataFrame,
    weight_cols: list[str],
) -> pd.DataFrame:
    """
    Coerce weight columns to numeric with NaN -> 0 and warnings.

    Converts specified columns to numeric dtype, treating pre-existing NaNs as
    zeros and warning when negative values are encountered.

    Parameters
    ----------
    df : pandas.DataFrame
        Input edge list DataFrame.
    weight_cols : list[str]
        Names of columns to coerce to numeric.

    Returns
    -------
    pandas.DataFrame
        DataFrame with specified columns coerced to numeric (NaNs filled where
        originally present) and warnings emitted for negatives.
    """
    coerced = df.copy()
    for col in weight_cols:
        original = coerced[col]
        original_nans = int(pd.isna(original).sum())
        s = pd.to_numeric(original, errors="coerce")
        n_total = len(s)
        after_nans = int(pd.isna(s).sum())
        if n_total > 0 and after_nans == n_total:
            msg = f"Column '{col}' could not be coerced to numeric (all values are non-numeric)"
            raise ValueError(msg)
        # Treat pre-existing NaNs as 0 with a warning (Req 2.3), but if coercion
        # introduced new NaNs (after_nans > original_nans) -> error (Req 5.10/5.11)
        if after_nans > original_nans:
            msg = f"Column '{col}' contains non-numeric values that cannot be coerced to numeric"
            raise ValueError(msg)
        if original_nans:
            warnings.warn(
                f"Column '{col}' has {original_nans} NaN values; treating as 0 (requirement 2.3)",
                UserWarning,
                stacklevel=2,
            )
            # Only fill positions that were originally NaN
            s = s.mask(pd.isna(original), 0)
        if (s < 0).any():
            warnings.warn(
                f"Column '{col}' contains negative weights; keeping values (requirement 5.6)",
                UserWarning,
                stacklevel=2,
            )
        coerced[col] = s
    return coerced


def _aggregate_edgelist(
    df: pd.DataFrame,
    *,
    source_col: str,
    target_col: str,
    weight_cols: list[str],
) -> pd.DataFrame:
    """
    Aggregate duplicate (source, target) pairs by summing weight columns.

    Groups edges by origin and destination, summing all provided weight
    columns to remove duplicates in a single pass.

    Parameters
    ----------
    df : pandas.DataFrame
        Edge list DataFrame.
    source_col, target_col : str
        Column names for origin/destination identifiers.
    weight_cols : list[str]
        Weight columns to sum during aggregation.

    Returns
    -------
    pandas.DataFrame
        Aggregated edge list.
    """
    return df.groupby([source_col, target_col], as_index=False)[weight_cols].sum()


def _resolve_primary(weight_cols: list[str], threshold_col: str | None) -> str:
    """
    Return the primary weight column used for thresholding and canonical weight.

    When multiple weight columns exist, this selects the one used for
    thresholding and populating the canonical 'weight' field.

    Parameters
    ----------
    weight_cols : list[str]
        Candidate weight columns.
    threshold_col : str | None
        Preferred column to use if multiple weights are present.

    Returns
    -------
    str
        The selected primary weight column name.
    """
    # Upstream validator ensures either a single weight or a valid threshold_col
    if len(weight_cols) == 1:
        return weight_cols[0]
    return cast("str", threshold_col)


def _normalize_edgelist(
    edgelist_df: pd.DataFrame,
    *,
    source_col: str,
    target_col: str,
    weight_cols: list[str],
    threshold: float | None,
    threshold_col: str | None,
    include_self_loops: bool,
) -> pd.DataFrame:
    """
    Normalize an edgelist with optional multi-attribute weights.

    Coerces weights to numeric, aggregates duplicates, applies thresholding,
    and returns canonical columns with a primary 'weight' field.

    Parameters
    ----------
    edgelist_df : pandas.DataFrame
        Input edge list DataFrame containing at least the source and target
        columns and the specified weight columns.
    source_col : str
        Column name for origin identifiers in ``edgelist_df``.
    target_col : str
        Column name for destination identifiers in ``edgelist_df``.
    weight_cols : list[str]
        Names of numeric weight (flow) columns to process and preserve.
    threshold : float | None
        Inclusive threshold applied to the primary weight column; when
        ``None``, zeros are dropped by default.
    threshold_col : str | None
        Name of the primary weight column used for thresholding when multiple
        weights are provided; if a single weight column exists, this may be
        ``None``.
    include_self_loops : bool
        Whether to retain edges where source equals target; defaults to False.

    Returns
    -------
    pandas.DataFrame
        Canonical edge list with columns ['source', 'target', 'weight', ...],
        where 'weight' mirrors the chosen primary weight column.
    """
    # Coerce and aggregate in focused helpers
    coerced = _coerce_weight_columns(edgelist_df, weight_cols)
    agg_df = _aggregate_edgelist(
        coerced, source_col=source_col, target_col=target_col, weight_cols=weight_cols
    )
    # Remove self-loops unless explicitly included
    if not include_self_loops:
        agg_df = agg_df.loc[agg_df[source_col] != agg_df[target_col]]
    # Select primary column for thresholding and canonical weight
    primary = _resolve_primary(weight_cols, threshold_col)
    # Apply threshold
    mask = _apply_threshold(agg_df[primary], threshold=threshold)
    filtered = agg_df.loc[mask].copy()
    # Build canonical columns and rename source/target for consistency
    out = filtered.rename(columns={source_col: "source", target_col: "target"})
    out.insert(2, "weight", out[primary])  # canonical weight mirrors primary
    # Ensure canonical ordering: source, target, weight, then other weight columns.
    # Keep only specified weight columns (MVP drops other non-weight attributes per 6.6).
    remaining_weights = list(weight_cols)
    # 'weight' already mirrors primary; still preserve all requested columns
    cols = ["source", "target", "weight", *remaining_weights]
    return out.loc[:, cols]


def _validate_adjacency_data(
    od_data: pd.DataFrame | np.ndarray,
    *,
    zones_gdf: gpd.GeoDataFrame,
    **kwargs: object,
) -> None:
    """
    Validate adjacency-style OD data (square, labels/order).

    Checks shape, labeling, and compatibility with the provided zones when
    using ndarray inputs.

    Parameters
    ----------
    od_data : pandas.DataFrame | numpy.ndarray
        Adjacency data.
    zones_gdf : geopandas.GeoDataFrame
        Zones for validating shape/ordering when ndarray is used.

    Returns
    -------
    None
        This function validates input and raises on failure.

    Other Parameters
    ----------------
    **kwargs : object
        Additional keyword arguments ignored by this validator.
    """
    _ = kwargs  # ignore extra keyword arguments intentionally
    if isinstance(od_data, pd.DataFrame):
        checks = [
            (od_data.shape[0] == od_data.shape[1], "Adjacency DataFrame must be square"),
            (
                od_data.index.equals(od_data.columns),
                "Adjacency DataFrame index and columns must match exactly",
            ),
            (
                od_data.index.is_unique and od_data.columns.is_unique,
                "Adjacency DataFrame index and columns must be unique",
            ),
        ]
        for ok, msg in checks:
            if not ok:
                raise ValueError(msg)
    elif isinstance(od_data, np.ndarray):
        if not (od_data.ndim == 2 and od_data.shape[0] == od_data.shape[1]):
            msg = "Adjacency ndarray must be 2D square"
            raise ValueError(msg)
        if od_data.shape[0] != len(zones_gdf):
            msg = "Adjacency ndarray size must match number of zones in zones_gdf"
            raise ValueError(msg)
        warnings.warn(
            "Assuming ndarray row/column ordering matches zones_gdf order (requirement 5.7)",
            UserWarning,
            stacklevel=2,
        )
    else:
        msg = "For matrix_type='adjacency', od_data must be a pandas DataFrame or numpy ndarray"
        raise TypeError(msg)


def _validate_edgelist_data(
    od_data: pd.DataFrame,
    *,
    source_col: str,
    target_col: str,
    weight_cols: list[str] | None,
    **kwargs: object,
) -> None:
    """
    Validate edgelist structural requirements (required columns, weights present).

    Ensures the presence of required columns and weight fields prior to
    normalization and thresholding.

    Parameters
    ----------
    od_data : pandas.DataFrame
        Edge list data.
    source_col, target_col : str
        Column names for origin/destination identifiers.
    weight_cols : list[str] | None
        Names of weight columns that must exist.

    Returns
    -------
    None
        This function validates input and raises on failure.

    Other Parameters
    ----------------
    **kwargs : object
        Additional keyword arguments ignored by this validator.
    """
    _ = kwargs  # ignore extra keyword arguments intentionally
    if not isinstance(od_data, pd.DataFrame):
        msg = "For matrix_type='edgelist', od_data must be a pandas DataFrame"
        raise TypeError(msg)
    missing_basic = [c for c in (source_col, target_col) if c not in od_data.columns]
    if missing_basic:
        msg = f"Edgelist DataFrame missing required columns: {', '.join(missing_basic)}"
        raise ValueError(msg)
    if not weight_cols:
        msg = "weight_cols must be provided (at least one weight column) for edgelist input"
        raise ValueError(msg)
    missing_w = [c for c in weight_cols if c not in od_data.columns]
    if missing_w:
        msg = f"weight_cols contain columns not present in edgelist: {', '.join(missing_w)}"
        raise ValueError(msg)


def _validate_weights_threshold(
    _od_edgelist: pd.DataFrame,
    *,
    weight_cols: list[str],
    _threshold: float | None,
    threshold_col: str | None,
) -> None:
    """
    Validate weight columns and threshold semantics (subset of the full spec).

    Confirms that a valid primary column is designated when multiple weights
    are supplied and that provided names are consistent.

    Parameters
    ----------
    _od_edgelist : pandas.DataFrame
        Edge list data (unused here; included for signature symmetry).
    weight_cols : list[str]
        Weight columns provided by the caller.
    _threshold : float | None
        Threshold value.
    threshold_col : str | None
        Name of the column used for thresholding when multiple weights exist.

    Returns
    -------
    None
        This function validates input and raises on failure.
    """
    # Validate the threshold_col selection; numeric coercion is handled elsewhere
    if len(weight_cols) > 1 and (threshold_col is None or threshold_col not in weight_cols):
        msg = "When multiple weight_cols are provided a valid threshold_col must be specified"
        raise ValueError(msg)
    if len(weight_cols) == 1 and (threshold_col is not None and threshold_col not in weight_cols):
        msg = "threshold_col not in weight_cols"
        raise ValueError(msg)
    # Do not coerce here; coercion happens once in _normalize_edgelist


# ---------------------------------------------------------------------------
# Zone alignment and mapping helpers
# ---------------------------------------------------------------------------
def _align_adjacency_zones(
    adjacency_df: pd.DataFrame,
    *,
    zones_gdf: gpd.GeoDataFrame,
    zone_id_col: str | None,
) -> pd.DataFrame:
    """
    Align an adjacency DataFrame to the provided zones.

    Reindexes the matrix to the intersection of zone identifiers and warns
    about labels that are missing on either side.

    Parameters
    ----------
    adjacency_df : pandas.DataFrame
        Square adjacency matrix with zone ID index/columns.
    zones_gdf : geopandas.GeoDataFrame
        Zones GeoDataFrame.
    zone_id_col : str | None
        Column name holding unique zone identifiers in ``zones_gdf``; when
        ``None``, the ``zones_gdf`` index is used.

    Returns
    -------
    pandas.DataFrame
        Sub-matrix aligned to overlapping zone IDs in both inputs.
    """
    matrix_ids = pd.Index(adjacency_df.index)
    zones_ids = (
        pd.Index(zones_gdf[zone_id_col]) if zone_id_col is not None else pd.Index(zones_gdf.index)
    )
    common = matrix_ids.intersection(zones_ids)
    if common.empty:
        msg = "No overlapping zone IDs between adjacency matrix and zones_gdf"
        logger.error(msg)
        raise ValueError(msg)
    missing_in_matrix = zones_ids.difference(matrix_ids)
    missing_in_zones = matrix_ids.difference(zones_ids)
    if len(missing_in_matrix) > 0:
        warnings.warn(
            f"{len(missing_in_matrix)} zone IDs in zones_gdf not present in adjacency; "
            "they will be isolated nodes",
            UserWarning,
            stacklevel=2,
        )
    if len(missing_in_zones) > 0:
        warnings.warn(
            f"{len(missing_in_zones)} labels in adjacency not present in zones_gdf; "
            "related edges will be dropped",
            UserWarning,
            stacklevel=2,
        )
    # Reindex to common ids (preserve adjacency_df order for consistency with the provided matrix)
    return adjacency_df.reindex(index=common, columns=common)


def _align_numpy_array_zones(
    adjacency_array: np.ndarray,
    *,
    zones_gdf: gpd.GeoDataFrame,
    zone_id_col: str | None,
) -> pd.Index:
    """
    Validate ndarray size and return zone ids in ``zones_gdf`` order.

    Ensures the array size matches the number of zones and yields the
    identifier sequence in the same order as the zones GeoDataFrame.

    Parameters
    ----------
    adjacency_array : numpy.ndarray
        Square adjacency array.
    zones_gdf : geopandas.GeoDataFrame
        Zones GeoDataFrame.
    zone_id_col : str | None
        Column name holding unique zone identifiers in ``zones_gdf``; when
        ``None``, the ``zones_gdf`` index is used.

    Returns
    -------
    pandas.Index
        Zone identifiers in the same order as ``zones_gdf``.
    """
    # Upstream validation already checks shape and size; here we just warn about assumed ordering
    warnings.warn(
        (
            "Assuming ndarray row/column ordering matches zones_gdf order (requirement 5.7); "
            f"ndarray rows={adjacency_array.shape[0]}"
        ),
        UserWarning,
        stacklevel=2,
    )
    return cast(
        "pd.Index",
        pd.Index(zones_gdf[zone_id_col] if zone_id_col is not None else zones_gdf.index),
    )


def _align_edgelist_zones(
    edgelist_df: pd.DataFrame,
    *,
    zones_gdf: gpd.GeoDataFrame,
    zone_id_col: str | None,
    source_col: str,
    target_col: str,
) -> pd.DataFrame:
    """
    Filter an edgelist to valid zones (no aggregation here).

    Drops edges referencing unknown zone identifiers while preserving
    duplicates for later aggregation.

    Parameters
    ----------
    edgelist_df : pandas.DataFrame
        Input edgelist containing at least ``source_col`` and ``target_col``.
    zones_gdf : geopandas.GeoDataFrame
        Zones with ``zone_id_col`` present.
    zone_id_col : str | None
        Column name holding unique zone identifiers; when ``None``, the
        ``zones_gdf`` index is used.
    source_col, target_col : str
        Column names for origin/destination identifiers in ``edgelist_df``.

    Returns
    -------
    pandas.DataFrame
        Filtered edgelist containing only valid zone IDs. Duplicate
        ``(source, target)`` rows are preserved here and handled downstream.
    """
    valid_ids = set(
        (zones_gdf[zone_id_col] if zone_id_col is not None else zones_gdf.index).tolist()
    )
    before = len(edgelist_df)
    mask_valid = edgelist_df[source_col].isin(valid_ids) & edgelist_df[target_col].isin(valid_ids)
    filtered = edgelist_df.loc[mask_valid].copy()
    dropped = before - len(filtered)
    if dropped > 0:
        warnings.warn(
            f"Dropped {dropped} edges referencing unknown zone IDs (requirement 3.6)",
            UserWarning,
            stacklevel=2,
        )
    if filtered.empty:
        msg = "No overlapping zone IDs between edgelist and zones_gdf"
        logger.error(msg)
        raise ValueError(msg)
    return filtered


# ---------------------------------------------------------------------------
# Spatial geometry helpers (centroids and edge geometries)
# ---------------------------------------------------------------------------
def _validate_crs(zones_gdf: gpd.GeoDataFrame) -> None:
    """
    Warn about CRS issues and ensure CRS info is preserved.

    Emits warnings for missing CRS and geographic CRS to signal potential
    issues with distance-based computations.

    Parameters
    ----------
    zones_gdf : geopandas.GeoDataFrame
        Zones GeoDataFrame whose CRS will be validated.

    Returns
    -------
    None
        This function only emits warnings; it does not raise.
    """
    crs = zones_gdf.crs
    if crs is None:
        warnings.warn(
            "zones_gdf has no CRS set; outputs will have undefined CRS (requirement 3.1)",
            UserWarning,
            stacklevel=2,
        )
        return
    # pyproj.CRS exposes is_geographic when available
    if getattr(crs, "is_geographic", False):
        warnings.warn(
            "Geographic CRS detected; distance/length measures may be inaccurate (requirement 3.5)",
            UserWarning,
            stacklevel=2,
        )


def _compute_centroids(zones_gdf: gpd.GeoDataFrame) -> gpd.GeoSeries:
    """
    Compute centroids for zones preserving CRS.

    Calculates feature centroids and explicitly propagates the CRS to the
    resulting GeoSeries for consistency.

    Parameters
    ----------
    zones_gdf : geopandas.GeoDataFrame
        Zones GeoDataFrame whose feature centroids will be computed.

    Returns
    -------
    geopandas.GeoSeries
        GeoSeries indexed like ``zones_gdf`` with the same CRS.
    """
    # GeoPandas returns a GeoSeries with the same CRS as the input geometry
    centroids = zones_gdf.geometry.centroid
    # Explicitly carry the CRS (some versions keep it automatically, we enforce).
    # Skip this step when no CRS is defined, since set_crs cannot take None.
    if zones_gdf.crs is not None:
        centroids.set_crs(zones_gdf.crs, allow_override=True, inplace=True)
    return centroids


def _create_edge_geometries(
    edges_df: pd.DataFrame,
    zones_gdf: gpd.GeoDataFrame,
    *,
    zone_id_col: str | None,
    source_col: str = "source",
    target_col: str = "target",
    compute_edge_geometry: bool = True,
) -> gpd.GeoDataFrame:
    """
    Create LineString geometries connecting zone centroids for each edge.

    For each row in ``edges_df`` the function looks up the centroid of the
    origin and destination zones and constructs a
    ``shapely.geometry.LineString`` between them. If ``compute_edge_geometry``
    is ``False`` or the input is empty, a GeoDataFrame is returned with a
    ``geometry`` column containing ``None`` for each row. Edges with unknown
    zone IDs or missing centroids are dropped with a warning.

    Parameters
    ----------
    edges_df : pandas.DataFrame
        Canonical edgelist containing at least ``source_col`` and
        ``target_col`` columns, and optionally weight attributes.
    zones_gdf : geopandas.GeoDataFrame
        GeoDataFrame of zone geometries used to compute centroids.
    zone_id_col : str or None
        Name of the identifier column in ``zones_gdf``. If ``None``, the index
        of ``zones_gdf`` is used as the identifier space.
    source_col : str, default 'source'
        Column name in ``edges_df`` holding origin zone identifiers.
    target_col : str, default 'target'
        Column name in ``edges_df`` holding destination zone identifiers.
    compute_edge_geometry : bool, default True
        Whether to compute LineString geometries. When ``False``, no geometry
        is computed and ``None`` is stored in the geometry column.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame with the same non-geometry columns as ``edges_df`` and a
        ``geometry`` column containing LineStrings connecting origin and
        destination centroids. The CRS is inherited from ``zones_gdf``.
    """
    e = edges_df.copy()
    if not compute_edge_geometry or e.empty:
        geom = gpd.GeoSeries([None] * len(e), crs=zones_gdf.crs)
        return gpd.GeoDataFrame(e, geometry=geom, crs=zones_gdf.crs)
    # Centroid lookup by ID
    centroids = _compute_centroids(zones_gdf)
    if zone_id_col is not None:
        centroids.index = pd.Index(zones_gdf[zone_id_col])
    # Map ids -> centroids via dict for clarity
    lookup = centroids.to_dict()
    src_pts = e[source_col].map(lookup)
    tgt_pts = e[target_col].map(lookup)
    missing_any = src_pts.isna() | tgt_pts.isna()
    n_missing = int(missing_any.sum())
    if n_missing:
        warnings.warn(
            f"Dropping {n_missing} edges with unknown zone IDs or missing centroid(s) "
            "(requirement 3.6)",
            UserWarning,
            stacklevel=2,
        )
        e = e.loc[~missing_any].copy()
        src_pts = src_pts.loc[~missing_any]
        tgt_pts = tgt_pts.loc[~missing_any]
    if e.empty:
        return gpd.GeoDataFrame(
            e, geometry=gpd.GeoSeries([], crs=zones_gdf.crs), crs=zones_gdf.crs
        )
    lines = [
        LineString([a, b]) for a, b in zip(src_pts.to_numpy(), tgt_pts.to_numpy(), strict=True)
    ]
    geom = gpd.GeoSeries(lines, crs=zones_gdf.crs)
    return gpd.GeoDataFrame(e.reset_index(drop=True), geometry=geom, crs=zones_gdf.crs)


def _symmetrize_edges(edges: pd.DataFrame, *, sum_cols: list[str]) -> pd.DataFrame:
    """
    Merge reciprocal directed edges into undirected edges by summing weights.

    For each unordered pair {u, v}, produce a single edge with ``source`` <=
    ``target`` (based on string representation ordering) and sum the provided
    ``sum_cols`` across both directions. Self-loops (u == v) are preserved and
    included as-is.

    Parameters
    ----------
    edges : pandas.DataFrame
        Canonical directed edgelist containing 'source', 'target', and
        ``sum_cols``.
    sum_cols : list[str]
        Columns to sum when merging reciprocal edges. Must include 'weight'.

    Returns
    -------
    pandas.DataFrame
        Undirected edgelist with merged weights and canonical columns.
    """
    edges_work = edges.copy()
    # Normalize (u, v) ordering deterministically; keep self-loops unchanged.
    # Use astype(str) to ensure consistent comparison across dtypes.
    s_str = edges_work["source"].astype(str)
    t_str = edges_work["target"].astype(str)
    # Mask where the source string is lexicographically <= the target string
    keep = s_str <= t_str
    src_norm = edges_work["source"].where(keep, edges_work["target"])  # swap when needed
    tgt_norm = edges_work["target"].where(keep, edges_work["source"])  # swap when needed
    edges_work["_u"] = src_norm
    edges_work["_v"] = tgt_norm
    # Group by normalized unordered pair and sum all requested columns
    group_cols = ["_u", "_v"]
    agg = edges_work.groupby(group_cols, as_index=False)[sum_cols].sum()
    # Rename normalized cols back to canonical names
    agg = agg.rename(columns={"_u": "source", "_v": "target"})
    # Ensure column order: source, target, weight, then any others in sum_cols order
    other_cols = [c for c in sum_cols if c != "weight"]
    ordered_cols = ["source", "target", "weight", *other_cols]
    # Remaining non-summed columns could be preserved by joining back to the
    # original frame, but to keep the function focused and deterministic we
    # return only the canonical columns.
    return agg.loc[:, ordered_cols]