src/mine_sim/routing.py

โ† Back to submission ยท View raw on GitHub

"""Static shortest-time routing and the dispatch rule.

Per design memory:
- Routes are computed once per scenario via Dijkstra over free-flow times.
- Closures are honoured by topology construction (closed edges are absent).
- Dispatch chooses the loader minimising:
      travel_to_loader_empty + queue_len * mean_load_time + own_load_time
  where ``own_load_time`` = mean_load_time of the chosen loader.
- WASTE / MAINT nodes are excluded from any haulage route.
"""

from __future__ import annotations

from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

import networkx as nx

from .data import Truck
from .topology import ScenarioTopology, free_flow_minutes


# Nodes excluded from haulage routing.
EXCLUDED_NODE_TYPES = {"waste_dump", "maintenance"}


@dataclass
class RoutingTables:
    """Pre-computed shortest-time paths and travel times.

    ``paths[(src, dst)]`` is a list of node ids beginning with ``src`` and ending
    with ``dst``. ``edge_ids[(src, dst)]`` is the corresponding ordered list of
    edge ids. Two travel-time maps are pre-computed: empty and loaded.
    """
    edge_time_empty: Dict[str, float]    # edge_id -> minutes (empty)
    edge_time_loaded: Dict[str, float]   # edge_id -> minutes (loaded)
    paths: Dict[Tuple[str, str], List[str]]
    edge_ids: Dict[Tuple[str, str], List[str]]
    travel_min_empty: Dict[Tuple[str, str], float]
    travel_min_loaded: Dict[Tuple[str, str], float]


def build_routing(
    topology: ScenarioTopology,
    empty_speed_factor: float,
    loaded_speed_factor: float,
) -> RoutingTables:
    """Compute Dijkstra shortest-time paths between all interesting node pairs.

    Excludes WASTE and MAINTENANCE nodes from valid intermediates by stripping
    them from the routing graph. Crusher (a dump_point but a routing endpoint)
    is retained.
    """
    g = topology.graph
    routing_graph = g.copy()
    for nid, attrs in topology.nodes_effective.items():
        if attrs.get("node_type") in EXCLUDED_NODE_TYPES:
            if nid in routing_graph:
                routing_graph.remove_node(nid)

    edge_time_empty: Dict[str, float] = {}
    edge_time_loaded: Dict[str, float] = {}
    weight_empty_attr = {}
    weight_loaded_attr = {}

    for u, v, data in routing_graph.edges(data=True):
        eid = data["edge_id"]
        t_empty = free_flow_minutes(data["distance_m"], data["max_speed_kph"], empty_speed_factor)
        t_loaded = free_flow_minutes(data["distance_m"], data["max_speed_kph"], loaded_speed_factor)
        edge_time_empty[eid] = t_empty
        edge_time_loaded[eid] = t_loaded
        weight_empty_attr[(u, v)] = t_empty
        weight_loaded_attr[(u, v)] = t_loaded

    nx.set_edge_attributes(routing_graph, weight_empty_attr, name="t_empty")
    nx.set_edge_attributes(routing_graph, weight_loaded_attr, name="t_loaded")

    interesting_sources = {
        nid for nid, attrs in topology.nodes_effective.items()
        if attrs.get("node_type") not in EXCLUDED_NODE_TYPES and nid in routing_graph
    }

    paths: Dict[Tuple[str, str], List[str]] = {}
    edge_ids: Dict[Tuple[str, str], List[str]] = {}
    travel_empty: Dict[Tuple[str, str], float] = {}
    travel_loaded: Dict[Tuple[str, str], float] = {}

    for src in interesting_sources:
        # Use empty-weight tree for pickup legs (truck arrives empty); loaded for delivery.
        # We compute both โ€” the simulation picks the right map per leg.
        try:
            empty_lengths, empty_paths = nx.single_source_dijkstra(
                routing_graph, src, weight="t_empty"
            )
        except nx.NetworkXNoPath:
            continue
        try:
            loaded_lengths, loaded_paths = nx.single_source_dijkstra(
                routing_graph, src, weight="t_loaded"
            )
        except nx.NetworkXNoPath:
            continue
        for dst, length in empty_lengths.items():
            if dst == src:
                continue
            paths[(src, dst)] = empty_paths[dst]
            edge_ids[(src, dst)] = _node_path_to_edge_ids(empty_paths[dst], routing_graph)
            travel_empty[(src, dst)] = length
        for dst, length in loaded_lengths.items():
            if dst == src:
                continue
            travel_loaded[(src, dst)] = length

    return RoutingTables(
        edge_time_empty=edge_time_empty,
        edge_time_loaded=edge_time_loaded,
        paths=paths,
        edge_ids=edge_ids,
        travel_min_empty=travel_empty,
        travel_min_loaded=travel_loaded,
    )


def _node_path_to_edge_ids(node_path: List[str], graph: nx.DiGraph) -> List[str]:
    """Map a node-id path to the ordered list of edge ids it traverses."""
    out: List[str] = []
    for u, v in zip(node_path, node_path[1:]):
        out.append(graph[u][v]["edge_id"])
    return out


def choose_loader(
    *,
    current_node: str,
    candidate_loader_ids: List[str],
    loaders_effective: Dict[str, Dict],
    routing: RoutingTables,
    queue_lengths: Dict[str, int],
) -> str:
    """Apply the dispatch rule and return the loader id with the lowest expected cost.

    cost(L) = travel_empty(current_node -> loader_node)
              + queue_len(L) * mean_load(L)
              + own_load(L)
            = travel_empty + (queue_len + 1) * mean_load(L)
    """
    best_id: Optional[str] = None
    best_cost = float("inf")
    for lid in candidate_loader_ids:
        attrs = loaders_effective[lid]
        loader_node = attrs["node_id"]
        # If current_node IS the loader node (e.g., truck just dumped at crusher
        # then re-dispatched to same), travel = 0.
        if current_node == loader_node:
            travel = 0.0
        else:
            travel = routing.travel_min_empty.get((current_node, loader_node), float("inf"))
        if travel == float("inf"):
            continue
        mean_load = float(attrs["mean_load_time_min"])
        ql = int(queue_lengths.get(lid, 0))
        cost = travel + (ql + 1) * mean_load
        if cost < best_cost:
            best_cost = cost
            best_id = lid
    if best_id is None:
        raise RuntimeError(
            f"No reachable loader from {current_node} among candidates {candidate_loader_ids}"
        )
    return best_id