Source code for city2graph.morphology

"""Network analysis functions for creating and manipulating network graphs."""

import logging

import geopandas as gpd
import libpysal
import momepy
import networkx as nx
import numpy as np
import pandas as pd
import shapely

from city2graph.utils import create_tessellation
from city2graph.utils import dual_graph
from city2graph.utils import filter_graph_by_distance

# Set up logger for this module
logger = logging.getLogger(__name__)

# Define the public API for this module
__all__ = [
    "morphological_graph",
    "private_to_private_graph",
    "private_to_public_graph",
    "public_to_public_graph",
]


# ===============================================================================
# VALIDATION FUNCTIONS
# ===============================================================================

def _validate_inputs(
    privates: gpd.GeoDataFrame,
    publics: gpd.GeoDataFrame,
    buffer: float,
) -> None:
    """Validate input parameters for spatial operations."""
    if not isinstance(privates, gpd.GeoDataFrame):
        msg = "privates must be a GeoDataFrame"
        raise TypeError(msg)
    if not isinstance(publics, gpd.GeoDataFrame):
        msg = "publics must be a GeoDataFrame"
        raise TypeError(msg)
    if not isinstance(buffer, (int, float)):
        msg = "buffer must be a number"
        raise TypeError(msg)


def _check_empty_dataframes(
    privates: gpd.GeoDataFrame,
    publics: gpd.GeoDataFrame,
) -> bool:
    """Check if DataFrames are empty and warn accordingly."""
    if privates.empty:
        logger.warning("privates GeoDataFrame is empty")
        return True
    if publics.empty:
        logger.warning("publics GeoDataFrame is empty")
        return True
    return False


def _validate_columns(
    publics: gpd.GeoDataFrame,
    privates: gpd.GeoDataFrame,
    public_id_col: str,
    private_id_col: str | None,
    public_geom_col: str | None,
) -> None:
    """Validate that required columns exist in DataFrames."""
    if public_id_col not in publics.columns:
        msg = f"public_id_col '{public_id_col}' not found in publics"
        raise ValueError(msg)
    if private_id_col and private_id_col not in privates.columns:
        msg = f"private_id_col '{private_id_col}' not found in privates"
        raise ValueError(msg)
    if public_geom_col and public_geom_col not in publics.columns:
        msg = f"public_geom_col '{public_geom_col}' not found in publics"
        raise ValueError(msg)


def _validate_geometries(
    privates: gpd.GeoDataFrame,
    publics: gpd.GeoDataFrame,
) -> None:
    """Validate geometry types in DataFrames."""
    # Check private space geometries (should be polygons)
    invalid_private_geoms = ~privates.geometry.apply(
        lambda g: isinstance(g, (shapely.geometry.Polygon, shapely.geometry.MultiPolygon)),
    )
    if invalid_private_geoms.any():
        logger.warning(
            "Found %d private geometries that are not Polygons or MultiPolygons",
            invalid_private_geoms.sum(),
        )

    # Check public space geometries (should be linestrings)
    invalid_public_geoms = ~publics.geometry.apply(
        lambda g: isinstance(g, (shapely.geometry.LineString, shapely.geometry.MultiLineString)),
    )
    if invalid_public_geoms.any():
        logger.warning(
            "Found %d public geometries that are not LineStrings or MultiLineStrings",
            invalid_public_geoms.sum(),
        )


def _ensure_crs_compatibility(
    target_gdf: gpd.GeoDataFrame,
    source_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """Ensure source GeoDataFrame has same CRS as target, transforming if necessary."""
    if source_gdf.crs != target_gdf.crs:
        logger.warning(
            "CRS mismatch detected. Converting from %s to %s",
            source_gdf.crs, target_gdf.crs,
        )
        return source_gdf.to_crs(target_gdf.crs)
    return source_gdf


# ===============================================================================
# SPATIAL ANALYSIS FUNCTIONS
# ===============================================================================

def _prepare_id_columns(
    privates: gpd.GeoDataFrame,
    publics: gpd.GeoDataFrame,
    private_id_col: str | None,
    public_id_col: str | None,
) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame, str, str]:
    """Prepare ID columns for spatial operations."""
    privates_copy = privates.copy()
    publics_copy = publics.copy()

    if private_id_col is None:
        privates_copy = privates_copy.reset_index(drop=True)
        private_id_col = "index"
        privates_copy[private_id_col] = privates_copy.index

    if public_id_col is None:
        publics_copy = publics_copy.reset_index(drop=True)
        public_id_col = "index"
        publics_copy[public_id_col] = publics_copy.index

    return privates_copy, publics_copy, private_id_col, public_id_col


def _prepare_buffered_geometry(
    publics: gpd.GeoDataFrame,
    public_geom_col: str | None,
    buffer: float,
) -> gpd.GeoDataFrame:
    """Prepare buffered public geometries for spatial operations."""
    publics_copy = publics.copy()

    # Use alternative geometry column if specified
    if public_geom_col is not None:
        publics_copy.geometry = gpd.GeoSeries(
            publics_copy[public_geom_col], crs=publics_copy.crs,
        )

    # Create buffered geometries for intersection test
    publics_copy.geometry = publics_copy.geometry.buffer(buffer)
    return publics_copy


def _get_adjacent_publics(
    privates: gpd.GeoDataFrame,
    publics: gpd.GeoDataFrame,
    public_id_col: str = "id",
    private_id_col: str | None = None,
    public_geom_col: str | None = None,
    buffer: float = 1,
) -> dict[str, list[str]]:
    """
    Identify streets that are contained in or adjacent to each polygon.

    This function returns a dictionary mapping each polygon (by its index)
    to a list of street identifiers that intersect the polygon geometry.

    Parameters
    ----------
    privates : geopandas.GeoDataFrame
        GeoDataFrame containing private space polygons
    publics : geopandas.GeoDataFrame
        GeoDataFrame containing public space linestrings
    public_id_col : str, default "id"
        Column name in publics that uniquely identifies each public space
    private_id_col : str, default None
        Column name in privates that uniquely identifies each private space
        If None, will use DataFrame index
    public_geom_col : str, default None
        Column name in publics containing alternative geometry to use
        If None, will use the geometry column
    buffer : float, default 1
        Buffer distance to apply to public geometries for intersection

    Returns
    -------
    dict
        Dictionary mapping private space IDs to lists of adjacent public space IDs

    Raises
    ------
    ValueError
        If input GeoDataFrames are empty or required columns are missing
    TypeError
        If inputs are not GeoDataFrames or if buffer is not a number
    """
    # Input validation
    _validate_inputs(privates, publics, buffer)

    # Check for empty DataFrames
    if _check_empty_dataframes(privates, publics):
        return {}

    # Check if required columns exist
    _validate_columns(publics, privates, public_id_col, private_id_col, public_geom_col)

    # Check geometry types
    _validate_geometries(privates, publics)

    # Ensure CRS compatibility
    publics = _ensure_crs_compatibility(privates, publics)

    # Prepare ID columns
    privates_prep, publics_prep, private_id_final, public_id_final = _prepare_id_columns(
        privates, publics, private_id_col, public_id_col,
    )

    # Prepare buffered geometries
    publics_buffered = _prepare_buffered_geometry(publics_prep, public_geom_col, buffer)

    # Perform spatial join
    joined = gpd.sjoin(
        publics_buffered,
        privates_prep,
        how="inner",
        predicate="intersects",
    )

    # Group by private_id and collect public ids
    index_col = "index_right" if private_id_final == "index" else private_id_final
    return (
        joined.groupby(index_col)[public_id_final]
        .apply(lambda x: x.unique().tolist())
        .to_dict()
    )
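
# Illustrative usage sketch (not part of the module; names below are hypothetical).
# `_get_adjacent_publics` pairs each private polygon with the street segments whose
# buffered geometry intersects it, assuming both layers share a projected CRS:
#
#   >>> cells = gpd.read_file("tessellation.gpkg")   # Polygon private spaces
#   >>> streets = gpd.read_file("segments.gpkg")     # LineString public spaces
#   >>> adjacency = _get_adjacent_publics(
#   ...     cells, streets, public_id_col="id", private_id_col="tess_id", buffer=2.0,
#   ... )
#   >>> adjacency   # private "tess_id" -> list of adjacent street "id" values
#   {101: ['a', 'b'], 102: ['b'], ...}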



def _validate_connecting_lines_inputs(
    enclosed_tess: gpd.GeoDataFrame,
    public_dual_gdf: gpd.GeoDataFrame,
    adjacent_streets: dict,
) -> None:
    """Validate inputs for creating connecting lines."""
    if not isinstance(enclosed_tess, gpd.GeoDataFrame):
        msg = "enclosed_tess must be a GeoDataFrame"
        raise TypeError(msg)
    if not isinstance(public_dual_gdf, gpd.GeoDataFrame):
        msg = "public_dual_gdf must be a GeoDataFrame"
        raise TypeError(msg)
    if not isinstance(adjacent_streets, dict):
        msg = "adjacent_streets must be a dictionary"
        raise TypeError(msg)


def _check_empty_connecting_inputs(
    enclosed_tess: gpd.GeoDataFrame,
    public_dual_gdf: gpd.GeoDataFrame,
    adjacent_streets: dict,
) -> gpd.GeoDataFrame | None:
    """Check for empty inputs and return empty GeoDataFrame if found."""
    empty_gdf = gpd.GeoDataFrame(
        {"private_id": [], "public_id": [], "geometry": []},
        crs=enclosed_tess.crs,
    )

    if enclosed_tess.empty:
        logger.warning("enclosed_tess GeoDataFrame is empty")
        return empty_gdf
    if public_dual_gdf.empty:
        logger.warning("public_dual_gdf GeoDataFrame is empty")
        return empty_gdf
    if not adjacent_streets:
        logger.warning("adjacent_streets dictionary is empty")
        return empty_gdf

    return None


def _prepare_id_columns_for_connections(
    enclosed_tess: gpd.GeoDataFrame,
    public_dual_gdf: gpd.GeoDataFrame,
    private_id_col: str | None,
    public_id_col: str | None,
) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame, str, str]:
    """Prepare ID columns for connection creation."""
    enclosed_tess_copy = enclosed_tess.copy()
    public_dual_copy = public_dual_gdf.copy()

    # Handle private ID column
    if private_id_col is None:
        enclosed_tess_copy = enclosed_tess_copy.reset_index(drop=True)
        private_id_col = "index"
        enclosed_tess_copy[private_id_col] = enclosed_tess_copy.index

    # Handle public ID column
    if public_id_col is None:
        public_dual_copy = public_dual_copy.reset_index(drop=True)
        public_id_col = "index"
        public_dual_copy[public_id_col] = public_dual_copy.index

    return enclosed_tess_copy, public_dual_copy, private_id_col, public_id_col


def _create_connecting_lines(
    enclosed_tess: gpd.GeoDataFrame,
    public_dual_gdf: gpd.GeoDataFrame,
    adjacent_streets: dict,
    private_id_col: str | None = None,
    public_id_col: str | None = None,
) -> gpd.GeoDataFrame:
    """
    Create LineStrings connecting tessellation centroids to adjacent street nodes.

    Parameters
    ----------
    enclosed_tess : geopandas.GeoDataFrame
        GeoDataFrame containing private space polygons
    public_dual_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing public space nodes
    adjacent_streets : dict
        Dictionary mapping private space IDs to lists of adjacent public space IDs
    private_id_col : str, default None
        Column name in enclosed_tess that uniquely identifies each private space
        If None, will use DataFrame index
    public_id_col : str, default None
        Column name in public_dual_gdf that uniquely identifies each public space
        If None, will use DataFrame index

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing LineString connections between private and public spaces

    Raises
    ------
    TypeError
        If inputs are not of correct type
    ValueError
        If required data is missing or invalid
    """
    # Input validation
    _validate_connecting_lines_inputs(enclosed_tess, public_dual_gdf, adjacent_streets)

    # Check for empty inputs
    empty_result = _check_empty_connecting_inputs(enclosed_tess, public_dual_gdf, adjacent_streets)
    if empty_result is not None:
        return empty_result

    # Check CRS compatibility
    if enclosed_tess.crs != public_dual_gdf.crs:
        logger.warning(
            "CRS mismatch between enclosed_tess and public_dual_gdf. "
            "Converting public_dual_gdf to match enclosed_tess CRS.",
        )
        public_dual_gdf = public_dual_gdf.to_crs(enclosed_tess.crs)

    # Prepare ID columns for tessellation
    enclosed_tess_prep = enclosed_tess.copy()
    if private_id_col is None:
        enclosed_tess_prep = enclosed_tess_prep.reset_index(drop=True)
        private_id_col = "index"
        enclosed_tess_prep[private_id_col] = enclosed_tess_prep.index

    # Convert the adjacency dictionary to a DataFrame for vectorized operations
    connections = []
    for private_id, public_ids in adjacent_streets.items():
        connections.extend(
            [{"private_id": private_id, "public_id": pid} for pid in public_ids],
        )

    if not connections:
        return gpd.GeoDataFrame(
            {"private_id": [], "public_id": [], "geometry": []},
            crs=enclosed_tess.crs,
        )

    # Create DataFrame of connections
    connections_df = pd.DataFrame(connections)

    # Compute centroids once
    centroids = enclosed_tess_prep.set_index(private_id_col)["geometry"].centroid

    # Join connections with centroids
    connections_df = connections_df.join(centroids.rename("centroid"), on="private_id")

    # For dual graphs, the index typically corresponds to the original segment IDs
    # Create a mapping from public_id to dual graph geometry
    if public_id_col and public_id_col in public_dual_gdf.columns:
        # If the dual graph has the ID column, use it
        public_geom_series = public_dual_gdf.set_index(public_id_col)["geometry"]
    else:
        # Otherwise, assume the index corresponds to the public IDs
        public_geom_series = public_dual_gdf["geometry"]
        # Reset index to make it accessible for joining
        public_geom_series.index = public_dual_gdf.index

    # Join with public geometries
    connections_df = connections_df.join(
        public_geom_series.rename("public_geom"),
        on="public_id",
    )

    # Filter valid connections and create LineStrings
    valid_df = connections_df.dropna(subset=["centroid", "public_geom"])

    if valid_df.empty:
        return gpd.GeoDataFrame(
            {"private_id": [], "public_id": [], "geometry": []},
            crs=enclosed_tess.crs,
        )

    # Create LineStrings vectorized
    valid_df["geometry"] = valid_df.apply(
        lambda row: shapely.LineString(
            [(row.centroid.x, row.centroid.y), (row.public_geom.x, row.public_geom.y)],
        ),
        axis=1,
    )

    # Create final GeoDataFrame
    return gpd.GeoDataFrame(
        valid_df[["private_id", "public_id", "geometry"]],
        geometry="geometry",
        crs=enclosed_tess.crs,
    )
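
# Illustrative sketch (hypothetical data): `_create_connecting_lines` takes the
# tessellation polygons, the Point nodes of the street dual graph, and the adjacency
# dict from `_get_adjacent_publics`, and draws one LineString per (private, public)
# pair from the cell centroid to the corresponding street node:
#
#   >>> adjacency = {101: ["a", "b"], 102: ["b"]}
#   >>> lines = _create_connecting_lines(
#   ...     cells, street_nodes, adjacency,
#   ...     private_id_col="tess_id", public_id_col="id",
#   ... )
#   >>> list(lines.columns)
#   ['private_id', 'public_id', 'geometry']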


def _prep_contiguity_graph(
    tess_group: gpd.GeoDataFrame,
    private_id_col: str | None,
    contiguity: str,
) -> tuple[nx.Graph | None, dict | None]:
    """
    Prepare a contiguity-based graph for a group of tessellation polygons.

    Parameters
    ----------
    tess_group : geopandas.GeoDataFrame
        GeoDataFrame containing tessellation polygons for one group
    private_id_col : str or None
        Column name that uniquely identifies each private space;
        if None, the DataFrame index is used
    contiguity : str
        Type of contiguity ('queen' or 'rook')

    Returns
    -------
    tuple
        (NetworkX graph, mapping of positions to IDs)
    """
    # Skip if only one polygon
    if tess_group.shape[0] < 2:
        return None, None

    # Create mapping from position to ID
    pos_to_id_mapping = dict(
        enumerate(tess_group[private_id_col] if private_id_col else tess_group.index),
    )

    # Create contiguity weights
    if contiguity.lower() == "queen":
        w = libpysal.weights.Queen.from_dataframe(
            tess_group, geom_col="geometry", use_index=False,
        )
    elif contiguity.lower() == "rook":
        w = libpysal.weights.Rook.from_dataframe(
            tess_group, geom_col="geometry", use_index=False,
        )
    else:
        msg = "contiguity must be 'queen' or 'rook'"
        raise ValueError(msg)

    w_nx = w.to_networkx()

    # Skip if no edges
    if w_nx.number_of_edges() == 0:
        return None, None

    # Compute centroids vectorized
    centroids = np.column_stack(
        [tess_group.geometry.centroid.x, tess_group.geometry.centroid.y],
    )

    # Create positions dictionary
    positions = {node: (x, y) for node, (x, y) in zip(w_nx.nodes(), centroids, strict=False)}

    # Set node attributes
    nx.set_node_attributes(w_nx, {n: pos[0] for n, pos in positions.items()}, "x")
    nx.set_node_attributes(w_nx, {n: pos[1] for n, pos in positions.items()}, "y")

    # Create edge geometries
    node_data = dict(w_nx.nodes(data=True))
    edge_geometry = {
        (u, v): shapely.geometry.LineString(
            [
                (node_data[u]["x"], node_data[u]["y"]),
                (node_data[v]["x"], node_data[v]["y"]),
            ],
        )
        for u, v in w_nx.edges()
    }

    nx.set_edge_attributes(w_nx, edge_geometry, "geometry")
    w_nx.graph["approach"] = "primal"

    return w_nx, pos_to_id_mapping
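
# Illustrative sketch (hypothetical `cells` GeoDataFrame): the helper wraps libpysal
# Queen/Rook contiguity weights in a NetworkX graph whose nodes carry centroid
# coordinates and whose edges carry LineString geometries:
#
#   >>> w_nx, pos_to_id = _prep_contiguity_graph(cells, "tess_id", "queen")
#   >>> w_nx.graph["approach"]
#   'primal'
#   >>> pos_to_id   # positional node labels -> "tess_id" values
#   {0: 101, 1: 102, ...}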


def private_to_private_graph(
    private_gdf: gpd.GeoDataFrame,
    private_id_col: str | None = None,
    group_col: str | None = None,
    contiguity: str = "queen",
) -> gpd.GeoDataFrame:
    """
    Create contiguity-based connections between private spaces.

    If group_col is provided, the contiguity is calculated for each group separately.

    Parameters
    ----------
    private_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing tessellation polygons
    private_id_col : str, default None
        Column name that uniquely identifies each private space
        If None, will use DataFrame index
    group_col : str, default None
        Column name used to group tessellation polygons
        If None, all polygons are treated as one group
    contiguity : str, default "queen"
        Type of contiguity, either 'queen' or 'rook'

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing LineString connections between adjacent private spaces

    Raises
    ------
    TypeError
        If input is not a GeoDataFrame
    ValueError
        If contiguity is not 'queen' or 'rook' or if required columns are missing
    """
    # Input validation
    if not isinstance(private_gdf, gpd.GeoDataFrame):
        msg = "private_gdf must be a GeoDataFrame"
        raise TypeError(msg)

    # Check for empty DataFrame
    if private_gdf.empty:
        logger.warning("private_gdf GeoDataFrame is empty")
        return gpd.GeoDataFrame(
            columns=[
                "from_private_id",
                "to_private_id",
                group_col or "group",
                "geometry",
            ],
            geometry="geometry",
            crs=private_gdf.crs,
        )

    # Validate contiguity parameter
    if contiguity.lower() not in ["queen", "rook"]:
        msg = "contiguity must be 'queen' or 'rook'"
        raise ValueError(msg)

    # Check specified columns exist
    if private_id_col is not None and private_id_col not in private_gdf.columns:
        msg = f"private_id_col '{private_id_col}' not found in private_gdf"
        raise ValueError(msg)
    if group_col is not None and group_col not in private_gdf.columns:
        msg = f"group_col '{group_col}' not found in private_gdf"
        raise ValueError(msg)

    # Validate geometry types
    invalid_geoms = private_gdf.geometry.apply(
        lambda g: not isinstance(
            g, (shapely.geometry.Polygon, shapely.geometry.MultiPolygon),
        ),
    )
    if invalid_geoms.any():
        logger.warning(
            "Found %d geometries that are not Polygon or MultiPolygon",
            invalid_geoms.sum(),
        )

    # Check for null geometries
    if private_gdf.geometry.isna().any():
        logger.warning("Found null geometries in private_gdf")
        private_gdf = private_gdf.dropna(subset=["geometry"])

    # Handle private ID column
    if private_id_col is None:
        private_gdf = private_gdf.reset_index(drop=True)

    # Handle grouping
    if group_col is None:
        tess_groups = {"all": private_gdf.copy()}
    else:
        tess_groups = {name: group for name, group in private_gdf.groupby(group_col)}  # noqa: C416

    # List to collect edge DataFrames
    queen_edges_list = []
    for enc, tess_group in tess_groups.items():
        # Prepare contiguity graph for this group
        w_nx, pos_to_id_mapping = _prep_contiguity_graph(
            tess_group, private_id_col, contiguity,
        )
        if w_nx is None:
            continue

        # Convert to GeoDataFrame
        _, edges = momepy.nx_to_gdf(w_nx)
        edges = edges.set_crs(tess_group.crs)

        # Map node indices to IDs vectorized
        edges["from_private_id"] = edges["node_start"].map(pos_to_id_mapping)
        edges["to_private_id"] = edges["node_end"].map(pos_to_id_mapping)

        # Drop unnecessary columns
        edges = edges.drop(columns=["node_start", "node_end"])

        # Set group column
        group_key = group_col if group_col is not None else "group"
        edges[group_key] = enc

        # Select columns
        edges = edges[["from_private_id", "to_private_id", group_key, "geometry"]]
        queen_edges_list.append(edges)

    if queen_edges_list:
        return pd.concat(queen_edges_list, ignore_index=True)

    cols = [
        "from_private_id",
        "to_private_id",
        group_col if group_col else "group",
        "geometry",
    ]
    return gpd.GeoDataFrame(columns=cols, crs=private_gdf.crs)


def private_to_public_graph(
    private_gdf: gpd.GeoDataFrame,
    public_gdf: gpd.GeoDataFrame,
    private_id_col: str | None = None,
    public_id_col: str | None = None,
    public_geom_col: str | None = None,
    tolerance: float = 1.0,
) -> gpd.GeoDataFrame:
    """
    Create connections between private spaces and public spaces.

    Parameters
    ----------
    private_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing private space polygons
    public_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing public space linestrings
    private_id_col : str, default None
        Column name in private_gdf that uniquely identifies each private space
        If None, will use DataFrame index
    public_id_col : str, default None
        Column name in public_gdf that uniquely identifies each public space
        If None, will use DataFrame index
    public_geom_col : str, default None
        Column name in public_gdf containing alternative geometry to use
        If None, will use the geometry column
    tolerance : float, default 1.0
        Buffer tolerance for spatial joins

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing LineString connections between private and public spaces
    """
    # Validate and filter public_gdf geometry types before creating dual graph
    valid_geom_mask = public_gdf.geometry.apply(
        lambda g: isinstance(g, (shapely.geometry.LineString, shapely.geometry.MultiLineString)),
    )
    if not valid_geom_mask.all():
        logger.warning(
            "Some geometries in public_gdf are not LineString or MultiLineString. "
            "Filtering to valid geometries only.",
        )
        public_gdf = public_gdf[valid_geom_mask]

    # Check if we have any valid geometries left
    if public_gdf.empty:
        return gpd.GeoDataFrame(
            {"private_id": [], "public_id": [], "geometry": []},
            crs=private_gdf.crs if hasattr(private_gdf, "crs") else None,
        )

    # Create dual graph from publics
    public_dual_gdf, public_to_public_dict = dual_graph(
        public_gdf,
        id_col=public_id_col,
        tolerance=tolerance,
    )

    # Identify adjacent streets
    adjacent_publics = _get_adjacent_publics(
        private_gdf,
        public_gdf,
        private_id_col=private_id_col,
        public_id_col=public_id_col,
        public_geom_col=public_geom_col,
        buffer=tolerance,
    )

    # Create connecting lines
    return _create_connecting_lines(
        private_gdf,
        public_dual_gdf,
        adjacent_publics,
        private_id_col=private_id_col,
        public_id_col=public_id_col,
    )
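
# Illustrative usage sketch (hypothetical layers): link each tessellation cell to the
# street segments it touches within the buffer tolerance:
#
#   >>> edges = private_to_public_graph(
#   ...     cells, streets,
#   ...     private_id_col="tess_id", public_id_col="id", tolerance=2.0,
#   ... )
#   >>> list(edges.columns)
#   ['private_id', 'public_id', 'geometry']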


def public_to_public_graph(
    public_gdf: gpd.GeoDataFrame,
    public_id_col: str | None = None,
    tolerance: float = 1e-8,
) -> gpd.GeoDataFrame:
    """
    Create connections between public spaces represented as a dual graph.

    Parameters
    ----------
    public_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing public space linestrings
    public_id_col : str, default None
        Column name that uniquely identifies each public space
        If None, will use DataFrame index
    tolerance : float, default 1e-8
        Distance tolerance for detecting endpoint connections

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing LineString connections between public spaces
    """
    # Validate and filter public_gdf geometry types before creating dual graph
    valid_geom_mask = public_gdf.geometry.apply(
        lambda g: isinstance(g, (shapely.geometry.LineString, shapely.geometry.MultiLineString)),
    )
    if not valid_geom_mask.all():
        logger.warning(
            "Some geometries in public_gdf are not LineString or MultiLineString. "
            "Filtering to valid geometries only.",
        )
        public_gdf = public_gdf[valid_geom_mask]

    # Check if we have any valid geometries left
    if public_gdf.empty:
        return gpd.GeoDataFrame(
            {"from_public_id": [], "to_public_id": [], "geometry": []},
            crs=public_gdf.crs,
        )

    # Create dual representation
    public_dual_gdf, public_to_public_dict = dual_graph(
        public_gdf,
        id_col=public_id_col,
        tolerance=tolerance,
    )

    # Handle empty result
    if public_dual_gdf.empty or not public_to_public_dict:
        return gpd.GeoDataFrame(
            {"from_public_id": [], "to_public_id": [], "geometry": []},
            crs=public_gdf.crs,
        )

    # Extract point coordinates efficiently
    point_coords = public_dual_gdf.geometry.apply(lambda p: (p.x, p.y)).to_dict()

    # Track processed connections using a set
    processed_connections = set()

    # Prepare records list
    connections_data = []

    # Generate unique connections
    for from_id, connected_ids in public_to_public_dict.items():
        if from_id not in point_coords:
            continue

        from_coords = point_coords[from_id]
        for to_id in connected_ids:
            # Create a unique identifier for this connection
            connection = tuple(sorted([from_id, to_id]))

            # Skip if already processed
            if connection in processed_connections or to_id not in point_coords:
                continue

            processed_connections.add(connection)
            to_coords = point_coords[to_id]

            # Add connection record
            connections_data.append(
                {
                    "from_public_id": from_id,
                    "to_public_id": to_id,
                    "geometry": shapely.LineString([from_coords, to_coords]),
                },
            )

    # Create GeoDataFrame from connection data
    if connections_data:
        return gpd.GeoDataFrame(
            connections_data,
            geometry="geometry",
            crs=public_dual_gdf.crs,
        )

    return gpd.GeoDataFrame(
        {"from_public_id": [], "to_public_id": [], "geometry": []},
        crs=public_gdf.crs,
    )
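
# Illustrative usage sketch (hypothetical street layer): one LineString is produced
# for every pair of segments whose dual-graph nodes are connected:
#
#   >>> edges = public_to_public_graph(streets, public_id_col="id")
#   >>> list(edges.columns)
#   ['from_public_id', 'to_public_id', 'geometry']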


# ===============================================================================
# MAIN MORPHOLOGICAL GRAPH FUNCTIONS
# ===============================================================================

def _prepare_barrier_geometry(
    segments_gdf: gpd.GeoDataFrame,
    public_geom_col: str,
) -> tuple[gpd.GeoDataFrame, str]:
    """Prepare barrier geometry for tessellation creation."""
    # Check if public_geom_col exists in segments_gdf
    if public_geom_col not in segments_gdf.columns:
        logger.warning(
            "Column '%s' not found in segments_gdf. Using 'geometry' column instead.",
            public_geom_col,
        )
        public_geom_col = "geometry"

    # Create a GeoDataFrame with the barrier geometry for tessellation
    if public_geom_col == "geometry":
        # If using the main geometry column, just use the segments as-is
        barrier_gdf = segments_gdf.copy()
    else:
        # Create a completely new GeoDataFrame with the barrier geometry
        barrier_gdf = gpd.GeoDataFrame(
            segments_gdf.drop(columns=["geometry"]),
            geometry=segments_gdf[public_geom_col].values,
            crs=segments_gdf.crs,
        )

    return barrier_gdf, public_geom_col


def _filter_tessellation_by_adjacency(
    enclosed_tess: gpd.GeoDataFrame,
    segments_subset: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """Filter tessellation to only include cells adjacent to segments."""
    if enclosed_tess.empty or segments_subset.empty:
        return enclosed_tess.copy()

    # Get the enclosure indices that are adjacent to the segments using a spatial join
    adjacent_enclosure_indices = gpd.sjoin(
        enclosed_tess,
        segments_subset,
        how="inner",
        predicate="intersects",
    )

    # Check if the tessellation has an enclosure_index column (from enclosed tessellation)
    # If not, use the tessellation index as the enclosure index
    if "enclosure_index" in adjacent_enclosure_indices.columns:
        enclosure_indices = adjacent_enclosure_indices["enclosure_index"].unique()
        # Get enclosed tessellation for the adjacent enclosures
        if "enclosure_index" in enclosed_tess.columns:
            return enclosed_tess[enclosed_tess["enclosure_index"].isin(enclosure_indices)]
    else:
        # For morphological tessellation, use the tessellation index
        enclosure_indices = adjacent_enclosure_indices.index.unique()
        # For morphological tessellation, filter by index
        return enclosed_tess[enclosed_tess.index.isin(enclosure_indices)]

    return enclosed_tess.copy()


def _prepare_tessellation_with_buildings(
    tess_subset: gpd.GeoDataFrame,
    buildings_gdf: gpd.GeoDataFrame,
    keep_buildings: bool,
) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]:
    """Prepare tessellation data, optionally adding building information."""
    if keep_buildings:
        tess_with_buildings = _add_buildings_to_tessellation(tess_subset, buildings_gdf)
        # Use the original tessellation for graph operations to avoid column conflicts
        tess_for_graphs = tess_subset
        # But use the enhanced tessellation for the final output
        return tess_with_buildings, tess_for_graphs

    return tess_subset, tess_subset


def morphological_graph(
    buildings_gdf: gpd.GeoDataFrame,
    segments_gdf: gpd.GeoDataFrame,
    center_point: gpd.GeoSeries | gpd.GeoDataFrame | None = None,
    distance: float | None = None,
    private_id_col: str = "tess_id",
    public_id_col: str = "id",
    public_geom_col: str = "barrier_geometry",
    contiguity: str = "queen",
    keep_buildings: bool = False,
) -> dict:
    """
    Create a morphological graph from buildings and road segments.

    The private spaces are represented as tessellations, and the public spaces
    are represented as a dual graph of road segments.

    This function performs a series of operations:

    1. Creates tessellations based on buildings and road barrier geometries
    2. Optionally filters the network by distance from a specified center point
    3. Identifies enclosed tessellations adjacent to the segments
    4. Creates three types of connections:
       - Private to private (between tessellation cells)
       - Public to public (between road segments)
       - Private to public (between tessellation cells and road segments)

    Parameters
    ----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing building polygons
    segments_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing road segments as LineStrings.
        Should include a 'barrier_geometry' column or the column specified in
        public_geom_col.
    center_point : geopandas.GeoSeries or geopandas.GeoDataFrame, optional
        Optional center point for filtering the network by distance.
        If provided, only segments within the specified distance will be included.
    distance : float, optional
        Maximum network distance from center_point to include segments, if
        center_point is provided. If None, no distance filtering will be applied
        even if center_point is provided.
    private_id_col : str, default='tess_id'
        Column name that uniquely identifies each tessellation cell (private space).
    public_id_col : str, default='id'
        Column name that uniquely identifies each road segment (public space).
    public_geom_col : str, default='barrier_geometry'
        Column name in segments_gdf containing the processed geometry to use.
    contiguity : str, default='queen'
        Type of contiguity for private-to-private connections, either 'queen' or 'rook'.
    keep_buildings : bool, default=False
        If True, performs a spatial left join between tessellation and buildings
        to include building attributes in the tessellation data. Building index
        columns are dropped.

    Returns
    -------
    dict
        Dictionary containing the following elements:
        - 'nodes': Dictionary with 'private' and 'public' GeoDataFrames
        - 'edges': Dictionary with edge relationships between node types

    Notes
    -----
    - If center_point is not provided, all segments will be included.
    - The barrier_geometry column should contain the processed geometries for
      road segments. If it doesn't exist, the function will use the column
      specified in public_geom_col.
    - The function depends on the create_tessellation, filter_graph_by_distance,
      private_to_private_graph, public_to_public_graph, and
      private_to_public_graph functions.
    """
    # Prepare barrier geometry for tessellation
    barrier_gdf, public_geom_col = _prepare_barrier_geometry(segments_gdf, public_geom_col)

    # Ensure barrier_gdf has the same CRS as buildings_gdf
    if barrier_gdf.crs != buildings_gdf.crs:
        barrier_gdf = barrier_gdf.to_crs(buildings_gdf.crs)

    # Create tessellations based on buildings and road barrier geometries
    enclosed_tess = create_tessellation(buildings_gdf, primary_barriers=barrier_gdf)

    # Optionally filter the network by distance from a specified center point
    if center_point is not None and distance is not None:
        segments_subset = filter_graph_by_distance(
            segments_gdf,
            center_point,
            distance=distance,
        )
    else:
        segments_subset = segments_gdf.copy()

    # Filter tessellation to only include cells adjacent to segments
    tess_subset = _filter_tessellation_by_adjacency(enclosed_tess, segments_subset)

    # Prepare tessellation data, optionally adding building information
    tess_final, tess_for_graphs = _prepare_tessellation_with_buildings(
        tess_subset,
        buildings_gdf,
        keep_buildings,
    )

    # Create connections between adjacent private spaces (tessellation cells)
    private_to_private = private_to_private_graph(
        tess_for_graphs,
        private_id_col=private_id_col,
        group_col="enclosure_index",
        contiguity=contiguity,
    )

    # Create connections between street segments
    public_to_public = public_to_public_graph(
        segments_subset,
        public_id_col=public_id_col,
    )

    # Create connections between private spaces and public spaces
    private_to_public = private_to_public_graph(
        tess_for_graphs,
        segments_subset,
        private_id_col=private_id_col,
        public_id_col=public_id_col,
        public_geom_col=public_geom_col,
    )

    # Return structure compatible with pyg_to_gdf expectations
    return {
        "nodes": {
            "private": tess_final,
            "public": segments_subset,
        },
        "edges": {
            ("private", "touched_to", "private"): private_to_private,
            ("public", "connected_to", "public"): public_to_public,
            ("private", "faced_to", "public"): private_to_public,
        },
    }


# ===============================================================================
# BUILDING INTEGRATION FUNCTIONS
# ===============================================================================

def _add_buildings_to_tessellation(
    tessellation_gdf: gpd.GeoDataFrame,
    buildings_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Add building information to tessellation cells using spatial left join.

    Parameters
    ----------
    tessellation_gdf : gpd.GeoDataFrame
        Tessellation cells to add building information to
    buildings_gdf : gpd.GeoDataFrame
        Buildings to spatially join with tessellation cells

    Returns
    -------
    gpd.GeoDataFrame
        Tessellation with building information joined, including building_geometry column
    """
    if buildings_gdf.empty:
        return tessellation_gdf.copy()

    # Perform spatial left join with buildings
    joined = gpd.sjoin(
        tessellation_gdf,
        buildings_gdf,
        how="left",
        predicate="intersects",
    )

    # Add building_geometry column as a separate GeoSeries
    if "index_right" in joined.columns:
        # Create mapping from building index to geometry
        building_geom_map = buildings_gdf["geometry"].to_dict()

        # Map building geometries to tessellation cells
        joined["building_geometry"] = joined["index_right"].map(building_geom_map)

        # Convert to GeoSeries with proper CRS
        joined["building_geometry"] = gpd.GeoSeries(
            joined["building_geometry"],
            crs=buildings_gdf.crs,
        )

        # Remove the index_right column to prevent conflicts
        joined = joined.drop(columns=["index_right"])

    return joined
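
# Illustrative sketch (hypothetical, non-empty layers): the spatial left join keeps
# every tessellation cell and attaches the attributes and footprint of any
# intersecting building; cells without a building receive NaN values.
#
#   >>> enriched = _add_buildings_to_tessellation(cells, buildings)
#   >>> "building_geometry" in enriched.columns
#   True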