Source code for city2graph.overture

"""Module for loading and processing geospatial data from Overture Maps."""

import json
import logging
import subprocess
from pathlib import Path

import geopandas as gpd
import numpy as np
import pandas as pd
from shapely.geometry import LineString
from shapely.geometry import MultiLineString
from shapely.geometry import Point
from shapely.geometry import Polygon
from shapely.geometry.base import BaseGeometry

# Define the public API for this module
__all__ = [
    "load_overture_data",
    "process_overture_segments",
]

# Valid Overture Maps data types
VALID_OVERTURE_TYPES: set[str] = {
    "address",
    "bathymetry",
    "building",
    "building_part",
    "division",
    "division_area",
    "division_boundary",
    "place",
    "segment",
    "connector",
    "infrastructure",
    "land",
    "land_cover",
    "land_use",
    "water",
}

logger = logging.getLogger(__name__)


def _validate_overture_types(types: list[str] | None) -> list[str]:
    """Validate and return overture data types."""
    if types is None:
        # Return a sorted copy so the download order is deterministic
        return sorted(VALID_OVERTURE_TYPES)

    invalid_types = [t for t in types if t not in VALID_OVERTURE_TYPES]
    if invalid_types:
        msg = (
            f"Invalid Overture Maps data type(s): {invalid_types}. "
            f"Valid types are: {sorted(VALID_OVERTURE_TYPES)}"
        )
        raise ValueError(msg)
    return types
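
# Illustrative sketch of the validation behavior (comments only; assumed
# inputs, not part of the module):
#
#   _validate_overture_types(["segment", "connector"])  # -> unchanged list
#   _validate_overture_types(None)   # -> every type in VALID_OVERTURE_TYPES
#   _validate_overture_types(["road"])  # -> ValueError; roads are "segment"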


def _prepare_polygon_area(area: Polygon) -> tuple[list[float], Polygon | None]:
    """Transform polygon to WGS84 and extract bounding box."""
    wgs84_crs = "EPSG:4326"
    original_polygon = area

    # Plain shapely geometries carry no CRS; this branch only applies to
    # GeoSeries-like inputs that expose a .crs attribute
    if hasattr(area, "crs") and area.crs and area.crs != wgs84_crs:
        # Reproject polygon to WGS84
        original_polygon = area.to_crs(wgs84_crs)
        logger.info("Transformed polygon from %s to WGS84 (EPSG:4326)", area.crs)

    # Extract and round bounding box coordinates
    minx, miny, maxx, maxy = original_polygon.bounds
    bbox = [round(minx, 10), round(miny, 10), round(maxx, 10), round(maxy, 10)]
    return bbox, original_polygon
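
# A minimal sketch of the bbox preparation (assumed coordinates, not part of
# the module): a WGS84 polygon's bounds become the query bounding box.
#
#   from shapely.geometry import box
#   poly = box(-0.1, 51.5, 0.0, 51.6)  # lon/lat rectangle
#   bbox, original = _prepare_polygon_area(poly)
#   # bbox == [-0.1, 51.5, 0.0, 51.6] (rounded to 10 decimal places);
#   # original is poly itself, since a plain Polygon has no .crs attribute
#   # and the reprojection branch is skipped.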


def _read_overture_data(
    output_path: Path,
    process: subprocess.CompletedProcess,
    save_to_file: bool,
    data_type: str,
) -> gpd.GeoDataFrame:
    """Read data from file or stdout and return GeoDataFrame."""
    WGS84_CRS = "EPSG:4326"

    if save_to_file:
        if Path(output_path).exists() and Path(output_path).stat().st_size > 0:
            return gpd.read_file(output_path)
        logger.warning("No data returned for %s", data_type)

    if process.stdout and process.stdout.strip():
        try:
            return gpd.read_file(process.stdout)
        except (ValueError, TypeError, KeyError, UnicodeDecodeError) as e:
            logger.warning("Could not parse GeoJSON for %s: %s", data_type, e)

    return gpd.GeoDataFrame(geometry=[], crs=WGS84_CRS)


def _clip_to_polygon(gdf: gpd.GeoDataFrame, polygon: Polygon, data_type: str) -> gpd.GeoDataFrame:
    """Clip GeoDataFrame to polygon boundaries."""
    WGS84_CRS = "EPSG:4326"

    if polygon is None or gdf.empty:
        return gdf

    mask = gpd.GeoDataFrame(geometry=[polygon], crs=WGS84_CRS)
    if gdf.crs != mask.crs:
        mask = mask.to_crs(gdf.crs)

    try:
        return gpd.clip(gdf, mask)
    except (ValueError, AttributeError, RuntimeError) as e:
        logger.warning("Error clipping %s to polygon: %s", data_type, e)
        return gdf
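
# Sketch of the clipping semantics (assumed data, not part of the module):
#
#   from shapely.geometry import box
#   pts = gpd.GeoDataFrame(geometry=[Point(0.5, 0.5), Point(2.0, 2.0)], crs="EPSG:4326")
#   _clip_to_polygon(pts, box(0, 0, 1, 1), "place")
#   # -> GeoDataFrame containing only Point(0.5, 0.5); geometries crossing
#   #    the boundary would be cut at the polygon edge by gpd.clip.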


def _process_single_overture_type(
    data_type: str,
    bbox_str: str,
    output_dir: str,
    prefix: str,
    save_to_file: bool,
    return_data: bool,
    original_polygon: Polygon | None,
) -> gpd.GeoDataFrame | None:
    """Process a single overture data type."""
    WGS84_CRS = "EPSG:4326"

    def _raise_invalid_data_type(data_type: str) -> None:
        """Raise ValueError for invalid data type."""
        msg = f"Invalid data type: {data_type}"
        raise ValueError(msg)

    def _raise_invalid_bbox_format(error_msg: str = "Invalid bbox format") -> None:
        """Raise ValueError for invalid bbox format."""
        raise ValueError(error_msg)

    # Validate data_type against known safe values to prevent injection
    if data_type not in VALID_OVERTURE_TYPES:
        _raise_invalid_data_type(data_type)

    # Validate and sanitize bbox_str to prevent injection
    try:
        bbox_parts = bbox_str.split(",")
        if len(bbox_parts) != 4:
            _raise_invalid_bbox_format()
        # Validate that all parts are valid floats
        validated_bbox = [float(part.strip()) for part in bbox_parts]
        safe_bbox_str = ",".join(map(str, validated_bbox))
    except (ValueError, TypeError) as e:
        msg = f"Invalid bbox format: {e}"
        raise ValueError(msg) from e

    # Validate output directory and prefix to prevent path traversal
    safe_output_dir = Path(output_dir).resolve()
    safe_prefix = Path(prefix).name if prefix else ""

    output_filename = f"{safe_prefix}{data_type}.geojson" if safe_prefix else f"{data_type}.geojson"
    output_path = safe_output_dir / output_filename

    cmd_parts = [
        "overturemaps", "download", f"--bbox={safe_bbox_str}",
        "-f", "geojson", f"--type={data_type}",
    ]

    if save_to_file:
        cmd_parts.extend(["-o", str(output_path)])

    try:
        process = subprocess.run(
            cmd_parts,
            check=True,
            stdout=subprocess.PIPE if not save_to_file else None,
            text=True,
        )

        if not return_data:
            return None

        gdf = _read_overture_data(output_path, process, save_to_file, data_type)
        gdf = _clip_to_polygon(gdf, original_polygon, data_type)

        if gdf.empty and "geometry" not in gdf:
            gdf = gpd.GeoDataFrame(geometry=[], crs=gdf.crs or WGS84_CRS)

        # Log successful processing of this data type
        if not gdf.empty:
            logger.info("Successfully processed %s", data_type)

    except (OSError, ValueError, TypeError, KeyError, AttributeError) as e:
        logger.warning("Error processing %s data: %s", data_type, e)
        return gpd.GeoDataFrame(geometry=[], crs=WGS84_CRS) if return_data else None
    except subprocess.CalledProcessError as e:
        logger.warning("Error downloading %s: %s", data_type, e)
        return gpd.GeoDataFrame(geometry=[], crs=WGS84_CRS) if return_data else None
    else:
        return gdf
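
# For reference, the command assembled above looks like this (assumed bbox
# and type values):
#
#   overturemaps download --bbox=-0.1,51.5,0.0,51.6 -f geojson \
#       --type=building -o ./building.geojson
#
# The "-o" pair is present only when save_to_file is True; otherwise the
# GeoJSON arrives on stdout and is parsed by _read_overture_data.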


def load_overture_data(
    area: list[float] | Polygon,
    types: list[str] | None = None,
    output_dir: str = ".",
    prefix: str = "",
    save_to_file: bool = True,
    return_data: bool = True,
) -> dict[str, gpd.GeoDataFrame]:
    """
    Load data from Overture Maps using the CLI tool and optionally save to GeoJSON files.

    Can accept either a bounding box or a Polygon as the area parameter.

    Parameters
    ----------
    area : list[float] | Polygon
        Either a bounding box as [min_lon, min_lat, max_lon, max_lat] in WGS84
        coordinates or a Polygon in WGS84 coordinates (EPSG:4326). If provided
        in another CRS, it will be automatically transformed to WGS84.
        If a Polygon is provided, its bounding box will be used for the query
        and the results will be clipped to the Polygon boundaries.
    types : list[str] | None, default=None
        Types of data to download. If None, downloads all available types.
        Must be valid Overture Maps data types: address, bathymetry, building,
        building_part, division, division_area, division_boundary, place,
        segment, connector, infrastructure, land, land_cover, land_use, water.
    output_dir : str, default="."
        Directory to save the GeoJSON files.
    prefix : str, default=""
        Prefix to add to the output filenames.
    save_to_file : bool, default=True
        Whether to save the data to GeoJSON files.
    return_data : bool, default=True
        Whether to return the data as GeoDataFrames.

    Returns
    -------
    dict[str, gpd.GeoDataFrame]
        Dictionary mapping types to GeoDataFrames if return_data is True,
        otherwise an empty dict.

    Raises
    ------
    ValueError
        If any of the provided types are not valid Overture Maps data types.

    Notes
    -----
    The Overture Maps API requires coordinates in WGS84 (EPSG:4326) format.
    For more information, see https://docs.overturemaps.org/
    """
    types = _validate_overture_types(types)

    if save_to_file and not Path(output_dir).exists():
        Path(output_dir).mkdir(parents=True)

    if isinstance(area, Polygon):
        bbox, original_polygon = _prepare_polygon_area(area)
    else:
        bbox, original_polygon = area, None

    bbox_str = ",".join(map(str, bbox))

    result = {}
    for data_type in types:
        gdf = _process_single_overture_type(
            data_type, bbox_str, output_dir, prefix,
            save_to_file, return_data, original_polygon,
        )
        if return_data:
            result[data_type] = gdf

    return result
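
# A minimal usage sketch (hypothetical area and paths, not part of the
# module). The `overturemaps` CLI must be installed and on PATH, e.g. via
# `pip install overturemaps`:
#
#   bbox = [-2.99, 53.39, -2.95, 53.42]  # central Liverpool, WGS84 lon/lat
#   data = load_overture_data(
#       bbox,
#       types=["segment", "connector"],
#       output_dir="overture_data",
#   )
#   segments = data["segment"]      # gpd.GeoDataFrame in EPSG:4326
#   connectors = data["connector"]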


def _extract_line_segment(
    line: LineString,
    start_point: Point,
    end_point: Point,
    start_dist: float,
    end_dist: float,
) -> LineString | None:
    """
    Create a LineString segment between two points on a line.

    Parameters
    ----------
    line : LineString
        Original line.
    start_point : Point
        Starting point on the line.
    end_point : Point
        Ending point on the line.
    start_dist : float
        Distance of start_point from the start of line.
    end_dist : float
        Distance of end_point from the start of line.

    Returns
    -------
    LineString | None
        The extracted line segment.
    """
    coords = list(line.coords)

    # Add the start point
    new_coords = [(start_point.x, start_point.y)]

    # Find all intermediate vertices
    current_dist = 0.0
    for i in range(len(coords) - 1):
        p1, p2 = coords[i], coords[i + 1]
        seg_length = LineString([p1, p2]).length
        next_dist = current_dist + seg_length

        # If this edge is after our start point and before our end point
        if next_dist > start_dist and current_dist < end_dist:
            # If this vertex is after start but before end, include it
            if current_dist >= start_dist:
                new_coords.append(p1)
            # If the next vertex is after end, add the endpoint and break
            if next_dist >= end_dist:
                new_coords.append((end_point.x, end_point.y))
                break

        current_dist = next_dist

    # If we have at least two points, create a LineString
    if len(new_coords) >= 2:
        return LineString(new_coords)
    if len(new_coords) == 1:
        # Edge case: create a very short line
        p = new_coords[0]
        return LineString([(p[0], p[1]), (p[0] + 1e-9, p[1] + 1e-9)])
    return None


def _get_substring(
    line: LineString,
    start_pct: float,
    end_pct: float,
) -> LineString | None:
    """
    Extract a substring of a line between start_pct and end_pct.

    Parameters
    ----------
    line : LineString
        The input line.
    start_pct : float
        Start percentage (0-1).
    end_pct : float
        End percentage (0-1).

    Returns
    -------
    LineString | None
        The substring, or None if the parameters are invalid.
    """
    # Validate input parameters
    if (
        not isinstance(line, LineString)
        or start_pct < 0
        or end_pct > 1
        or start_pct >= end_pct
    ):
        return None

    # For the full line (or nearly full), return the original
    if abs(start_pct) < 1e-9 and abs(end_pct - 1) < 1e-9:
        return line

    # Calculate distances along the line
    total_length = line.length
    start_dist = start_pct * total_length
    end_dist = end_pct * total_length

    if abs(end_dist - start_dist) < 1e-9:
        return None

    try:
        # Get points at the specified distances
        start_point = line.interpolate(start_dist)
        end_point = line.interpolate(end_dist)

        # Handle the case where start and end are at the line's endpoints
        if start_dist <= 1e-9 and end_dist >= total_length - 1e-9:
            return line

        return _extract_line_segment(line, start_point, end_point, start_dist, end_dist)
    except (ValueError, AttributeError, TypeError) as e:
        logger.warning("Error creating line substring: %s", e)
        return None


def _identify_barrier_mask(level_rules: str) -> list:
    """
    Compute barrier intervals (the barrier mask) from level_rules JSON.

    Rules with a non-zero "value" mark grade-separated spans, which are not
    barriers. If any such rule has "between" equal to null, the entire
    segment is grade-separated and an empty mask is returned.

    Parameters
    ----------
    level_rules : str
        JSON string containing level rules with "value" and "between" fields.
        Example: '[{"value": 1, "between": [0.177, 0.836]}]'

    Returns
    -------
    list
        List of barrier intervals as [start, end] pairs. Each interval
        represents a continuous ground-level (barrier) section.

    Examples
    --------
    >>> level_rules = '[{"value": 1, "between": [0.177, 0.836]}, {"value": 1, "between": [0.957, 0.959]}]'
    >>> _identify_barrier_mask(level_rules)
    [[0.0, 0.177], [0.836, 0.957], [0.959, 1.0]]

    Notes
    -----
    Rules where "value" equals 0 are ignored. The grade-separated spans are
    taken from rules where "value" != 0, and the returned intervals are their
    complement (the ground-level, barrier sections). If no such rules are
    present, the whole segment is a barrier and [[0.0, 1.0]] is returned.
    """
    if not isinstance(level_rules, str) or level_rules.strip().lower() in (
        "",
        "none",
        "null",
    ):
        return [[0.0, 1.0]]

    # Normalize Python literals to JSON for proper parsing
    s = level_rules.replace("'", '"').replace("None", "null")
    try:
        rules = json.loads(s)
    except (json.JSONDecodeError, ValueError, TypeError) as e:
        logger.warning("JSON parse failed for level_rules: %s", e)
        return [[0.0, 1.0]]

    if not isinstance(rules, list):
        rules = [rules]

    # Collect grade-separated (non-barrier) spans from non-zero rules
    non_barrier_intervals = []
    for rule in rules:
        if isinstance(rule, dict) and rule.get("value") is not None and rule.get("value") != 0:
            between = rule.get("between")
            if between is None:
                # The whole segment is grade-separated: no barrier anywhere
                return []
            if isinstance(between, list) and len(between) == 2:
                non_barrier_intervals.append((float(between[0]), float(between[1])))

    if not non_barrier_intervals:
        return [[0.0, 1.0]]

    # The barrier mask is the complement of the grade-separated spans
    non_barrier_intervals.sort(key=lambda x: x[0])
    result = []
    current = 0.0
    for start, end in non_barrier_intervals:
        if start > current:
            result.append([current, start])
        current = max(current, end)
    if current < 1.0:
        result.append([current, 1.0])
    return result


def _extract_barriers_from_mask(line: LineString, mask: list) -> BaseGeometry | None:
    """
    Extract barrier parts from the line using the provided barrier mask.

    The mask is expected to be a list of [start, end] intervals.
    """
    parts = []
    for interval in mask:
        seg = _get_substring(line, interval[0], interval[1])
        if seg and not seg.is_empty:
            parts.append(seg)
    if not parts:
        return None
    if len(parts) == 1:
        return parts[0]
    return MultiLineString(parts)


def _get_barrier_geometry(row: pd.Series) -> BaseGeometry | None:
    """Return the barrier geometry for a row according to its barrier mask."""
    if "barrier_mask" not in row:
        msg = "Column 'barrier_mask' not found in input row"
        raise KeyError(msg)
    barrier_mask = row["barrier_mask"]
    if barrier_mask is None:
        return None
    # A full-interval mask means the whole geometry is a barrier
    if barrier_mask == [[0.0, 1.0]]:
        return row.geometry
    try:
        geom = row.geometry
        if isinstance(geom, MultiLineString):
            parts = []
            for part in geom.geoms:
                clipped = _extract_barriers_from_mask(part, barrier_mask)
                if clipped:
                    parts.extend(
                        clipped.geoms if isinstance(clipped, MultiLineString) else [clipped],
                    )
            return (
                None if not parts else parts[0] if len(parts) == 1 else MultiLineString(parts)
            )
        return _extract_barriers_from_mask(geom, barrier_mask)
    except (ValueError, AttributeError, TypeError):
        return None


def _identify_connector_mask(connectors_info: str) -> list:
    """
    Parse connectors_info and return a connector mask list.

    Parameters
    ----------
    connectors_info : str
        JSON string containing connector information with "at" fields.
        Example: '[{"connector_id": "123", "at": 0.5}]'

    Returns
    -------
    list
        List of floats starting with 0.0 and ending with 1.0.
        If connectors_info is empty or invalid, returns [0.0, 1.0].

    Examples
    --------
    >>> connectors_info = '[{"connector_id": "123", "at": 0.3}, {"connector_id": "456", "at": 0.7}]'
    >>> _identify_connector_mask(connectors_info)
    [0.0, 0.3, 0.7, 1.0]
    """
    if not connectors_info or not str(connectors_info).strip():
        return [0.0, 1.0]
    try:
        parsed = json.loads(connectors_info.replace("'", '"'))
        if isinstance(parsed, dict):
            connectors_list = [parsed]
        elif isinstance(parsed, list):
            connectors_list = parsed
        else:
            return [0.0, 1.0]
        valid_ps = []
        for item in connectors_list:
            if isinstance(item, dict):
                at_val = item.get("at")
                if at_val is not None:
                    valid_ps.append(float(at_val))
        valid_ps.sort()
    except (json.JSONDecodeError, ValueError, TypeError):
        return [0.0, 1.0]
    else:
        return [0.0, *valid_ps, 1.0]


def _recalc_barrier_mask(original_mask: list, sub_start: float, sub_end: float) -> list:
    """Recalculate barrier_mask for a subsegment defined by [sub_start, sub_end]."""
    if original_mask == [[0.0, 1.0]] or not original_mask:
        return original_mask
    new_mask = []
    seg_length = sub_end - sub_start
    for interval in original_mask:
        # Intersect each interval with the subsegment window
        inter_start = max(interval[0], sub_start)
        inter_end = min(interval[1], sub_end)
        if inter_start < inter_end:
            # Rescale to the subsegment's local [0, 1] range
            new_mask.append(
                [
                    (inter_start - sub_start) / seg_length,
                    (inter_end - sub_start) / seg_length,
                ],
            )
    return new_mask


def _parse_connectors_info(connectors_info: str | None) -> list[dict]:
    """Parse and validate connectors info from row data."""
    if not connectors_info or not str(connectors_info).strip():
        return []
    try:
        parsed = json.loads(str(connectors_info).replace("'", '"'))
        if isinstance(parsed, dict):
            return [parsed]
        if isinstance(parsed, list):
            return parsed
        return []
    except (json.JSONDecodeError, ValueError, TypeError):
        return []


def _extract_valid_connectors(connectors_list: list[dict], valid_ids: set) -> list[float]:
    """Extract valid connector positions from a connector list."""
    valid_connectors = set()
    for item in connectors_list:
        if not isinstance(item, dict):
            continue
        connector_id = item.get("connector_id")
        at_value = item.get("at")
        if connector_id is None or at_value is None or connector_id not in valid_ids:
            continue
        valid_connectors.add(float(at_value))
    return sorted(valid_connectors)


def _create_connector_mask(valid_connectors: list[float]) -> list[float]:
    """Create a connector mask from valid connector positions."""
    mask = []
    # Ensure the mask starts at 0.0 and ends at 1.0
    if not valid_connectors or valid_connectors[0] != 0.0:
        mask.append(0.0)
    mask.extend(valid_connectors)
    if not mask or mask[-1] != 1.0:
        mask.append(1.0)
    return mask


def _create_split_row(
    row: pd.Series,
    part: LineString,
    start_pct: float,
    end_pct: float,
    mask: list[float],
    barrier_mask: list,
    original_id: str | int,
    counter: int,
) -> pd.Series:
    """Create a new row for a split segment part."""
    new_row = row.copy()
    new_row.geometry = part
    new_row["split_from"] = start_pct
    new_row["split_to"] = end_pct
    new_row["connector_mask"] = mask
    new_row["barrier_mask"] = _recalc_barrier_mask(barrier_mask, start_pct, end_pct)
    new_row["id"] = f"{original_id}_{counter}"
    return new_row


def _process_segment(row: pd.Series, valid_ids: set) -> list[pd.Series]:
    """
    Process a single segment row for splitting by connectors.

    Parameters
    ----------
    row : pd.Series
        A row from the segments GeoDataFrame.
    valid_ids : set
        Set of valid connector IDs.

    Returns
    -------
    list[pd.Series]
        List of new rows created from splitting the segment.
    """
    geom = row.geometry
    connectors_info = row.get("connectors")

    # Parse connectors info
    connectors_list = _parse_connectors_info(connectors_info)
    if not connectors_list:
        return [row]

    # Extract valid connectors
    valid_connectors = _extract_valid_connectors(connectors_list, valid_ids)
    if not valid_connectors:
        return [row]

    # Create connector mask
    mask = _create_connector_mask(valid_connectors)

    # Generate split geometries
    split_rows = []
    start_pct = 0.0
    counter = 1
    original_id = row.get("id", row.name)
    barrier_mask = row["barrier_mask"]

    # Process each connector split
    for at in valid_connectors:
        part = _get_substring(geom, start_pct, at)
        if part is not None and not part.is_empty:
            split_row = _create_split_row(
                row, part, start_pct, at, mask, barrier_mask, original_id, counter,
            )
            split_rows.append(split_row)
            counter += 1
        start_pct = at

    # Process the last segment
    part = _get_substring(geom, start_pct, 1.0)
    if part is not None and not part.is_empty:
        split_row = _create_split_row(
            row, part, start_pct, 1.0, mask, barrier_mask, original_id, counter,
        )
        split_rows.append(split_row)

    return split_rows


def _split_segments_by_connectors(
    segments_gdf: gpd.GeoDataFrame,
    connectors_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Split segments at connector points and update barrier masks accordingly.

    Optimized for performance with batch processing.

    Parameters
    ----------
    segments_gdf : gpd.GeoDataFrame
        GeoDataFrame containing segments to be split.
    connectors_gdf : gpd.GeoDataFrame
        GeoDataFrame containing connector points.

    Returns
    -------
    gpd.GeoDataFrame
        New GeoDataFrame with split segments.
    """
    # Precompute valid connector ids for a fast membership check
    valid_ids = set(connectors_gdf["id"])

    # Pre-process connectors_info and level_rules for all rows at once
    if "connectors" in segments_gdf.columns:
        conn_series = segments_gdf["connectors"].astype(str)
    else:
        conn_series = pd.Series([""] * len(segments_gdf), index=segments_gdf.index)
    segments_gdf["connector_mask"] = conn_series.apply(_identify_connector_mask)

    if "level_rules" in segments_gdf.columns:
        lvl_series = segments_gdf["level_rules"].astype(str)
    else:
        lvl_series = pd.Series([""] * len(segments_gdf), index=segments_gdf.index)
    segments_gdf["barrier_mask"] = lvl_series.apply(_identify_barrier_mask)

    # Process segments in batches to reduce memory pressure
    new_rows_data = []
    batch_size = 1000
    for i in range(0, len(segments_gdf), batch_size):
        batch = segments_gdf.iloc[i : i + batch_size]
        batch_results = batch.apply(
            lambda row: _process_segment(row, valid_ids),
            axis=1,
        )
        for rows in batch_results:
            new_rows_data.extend(rows)

    # Create a new GeoDataFrame from all processed rows, including split columns
    result_gdf = gpd.GeoDataFrame(new_rows_data, crs=segments_gdf.crs)

    # Reset the index of the resulting GeoDataFrame
    return result_gdf.reset_index(drop=True)


def _rebuild_geometry(
    seg_id: str | int,
    geom: LineString,
    pivot_df: pd.DataFrame,
) -> list[tuple[float, float]]:
    """
    Rebuild the geometry of a segment by replacing its endpoints with quantized centroids.

    Parameters
    ----------
    seg_id : str | int
        Identifier for the segment in pivot_df.
    geom : LineString
        Original geometry of the segment.
    pivot_df : pd.DataFrame
        DataFrame containing quantized centroid coordinates for endpoints.

    Returns
    -------
    list[tuple[float, float]]
        List of coordinate tuples for the rebuilt geometry.
    """
    start = (
        pivot_df.loc[seg_id, ("x_centroid", "start")],
        pivot_df.loc[seg_id, ("y_centroid", "start")],
    )
    end = (
        pivot_df.loc[seg_id, ("x_centroid", "end")],
        pivot_df.loc[seg_id, ("y_centroid", "end")],
    )
    coords = list(geom.coords)
    # Interior vertices are kept; only the endpoints are snapped to centroids
    return [start] + coords[1:-1] + [end] if len(coords) > 2 else [start, end]


def _adjust_segment_connectors(
    segments_gdf: gpd.GeoDataFrame,
    threshold: float,
) -> gpd.GeoDataFrame:
    """
    Adjust segment connector endpoints by clustering endpoints within a threshold distance.

    This function identifies endpoints that are within a threshold distance of
    each other and replaces them with their cluster's centroid, creating more
    precise connections between LineString segments.

    Parameters
    ----------
    segments_gdf : gpd.GeoDataFrame
        GeoDataFrame containing segment geometries (LineStrings).
    threshold : float
        Distance threshold for clustering endpoints. Endpoints whose
        coordinates quantize to the same bin (based on this threshold) will
        be merged.

    Returns
    -------
    gpd.GeoDataFrame
        GeoDataFrame with adjusted LineString geometries where endpoints that
        were within the threshold have been merged to a common point.

    Notes
    -----
    The function works by:

    1. Extracting start and end points from all LineStrings
    2. Quantizing coordinates to bins based on the threshold
    3. Computing the centroid for each bin
    4. Rebuilding LineStrings with the new endpoint coordinates

    Only LineString geometries are processed; other geometry types are left
    unchanged.
    """
    # Filter to only process LineString geometries
    mask = segments_gdf.geometry.type == "LineString"
    if not mask.any():
        return segments_gdf

    valid = segments_gdf.loc[mask].copy()
    valid["seg_id"] = valid.index

    # Extract start and end points from all LineStrings
    starts = [(geom.coords[0][0], geom.coords[0][1]) for geom in valid.geometry]
    ends = [(geom.coords[-1][0], geom.coords[-1][1]) for geom in valid.geometry]

    # Create a DataFrame with all endpoints for easier processing
    endpoints_df = pd.DataFrame(
        {
            "seg_id": list(valid["seg_id"]) * 2,
            "pos": ["start"] * len(valid) + ["end"] * len(valid),
            "x": [pt[0] for pt in starts] + [pt[0] for pt in ends],
            "y": [pt[1] for pt in starts] + [pt[1] for pt in ends],
        },
    )

    # Quantize coordinates to bins based on the threshold
    endpoints_df["bin_x"] = np.rint(endpoints_df["x"] / threshold).astype(int)
    endpoints_df["bin_y"] = np.rint(endpoints_df["y"] / threshold).astype(int)
    endpoints_df["bin"] = list(zip(endpoints_df["bin_x"], endpoints_df["bin_y"], strict=False))

    # Calculate centroids for each bin
    centroids = (
        endpoints_df.groupby("bin")[["x", "y"]]
        .mean()
        .rename(columns={"x": "x_centroid", "y": "y_centroid"})
    )
    endpoints_df = endpoints_df.join(centroids, on="bin")

    # Pivot the DataFrame to get centroid coordinates by segment and position
    pivot_df = endpoints_df.pivot_table(
        index="seg_id",
        columns="pos",
        values=["x_centroid", "y_centroid"],
    )

    # Rebuild geometries using the centroid coordinates
    valid["geometry"] = valid.apply(
        lambda row: LineString(
            _rebuild_geometry(row["seg_id"], row.geometry, pivot_df),
        ),
        axis=1,
    )

    # Update the original GeoDataFrame with the new geometries
    segments_gdf.update(valid)
    return segments_gdf
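
# Worked sketch of the endpoint quantization above (assumed coordinates):
# with threshold=1.0, endpoints (10.2, 5.1) and (10.4, 4.9) both round to
# bin (10, 5) via np.rint, so both are replaced by the bin centroid
# (10.3, 5.0) and the two segments meet exactly at that point. Note that
# binning by rounding is not a true distance clustering: two points in the
# same bin are at most threshold * sqrt(2) apart, but nearby points that
# straddle a bin edge land in different bins and are not merged.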


def process_overture_segments(
    segments_gdf: gpd.GeoDataFrame,
    get_barriers: bool = True,
    connectors_gdf: gpd.GeoDataFrame | None = None,
    threshold: float = 1.0,
) -> gpd.GeoDataFrame:
    """
    Process segments from Overture Maps to be split by connectors and extract barriers.

    Parameters
    ----------
    segments_gdf : gpd.GeoDataFrame
        Input segments with 'subtype' and 'level_rules'.
    get_barriers : bool
        If True, add a 'barrier_geometry' column to the output.
    connectors_gdf : gpd.GeoDataFrame | None
        Connectors for splitting; if None, the splitting step is skipped.
    threshold : float
        Distance threshold for adjusting connectors.

    Returns
    -------
    gpd.GeoDataFrame
        Processed road segments, including 'length' and an optional
        'barrier_geometry'.
    """
    if get_barriers:
        segments_gdf["barrier_mask"] = segments_gdf["level_rules"].apply(_identify_barrier_mask)

    if connectors_gdf is not None:
        segments_gdf = _split_segments_by_connectors(segments_gdf, connectors_gdf)
        segments_gdf = _adjust_segment_connectors(segments_gdf, threshold=threshold)

    segments_gdf["length"] = segments_gdf.geometry.length

    if get_barriers:
        barrier_geoms = segments_gdf.apply(_get_barrier_geometry, axis=1)
        segments_gdf["barrier_geometry"] = gpd.GeoSeries(barrier_geoms, crs=segments_gdf.crs)

    return segments_gdf
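
# End-to-end sketch tying the pieces together (hypothetical data, not part
# of the module):
#
#   data = load_overture_data(
#       [-2.99, 53.39, -2.95, 53.42],
#       types=["segment", "connector"],
#   )
#   segments = process_overture_segments(
#       data["segment"],
#       get_barriers=True,
#       connectors_gdf=data["connector"],
#       threshold=1.0,
#   )
#   # Split rows gain 'split_from'/'split_to'; every row gains 'length'
#   # (in CRS units, i.e. degrees for EPSG:4326) and 'barrier_geometry'.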