mine_sim.py

← Back to submission · View raw on GitHub

"""Discrete-event simulation of a synthetic open-pit mine haulage system.

This module is the *model layer*. It is responsible for:

* loading the topology / equipment data (``data/*.csv``);
* resolving a scenario configuration (``data/scenarios/*.yaml``) including the
  ``inherits:`` chain and any ``*_overrides`` blocks;
* building a directed road graph and computing shortest-*time* routes;
* running a single SimPy replication of one shift (8 hours unless the scenario
  sets ``simulation.shift_length_hours``) and returning a rich metrics
  dictionary plus (optionally) a detailed event trace.

The orchestration layer (scenarios x replications, aggregation, file writing)
lives in ``run_experiment.py`` so that the model stays free of output concerns;
the only I/O performed here is reading the input CSV and YAML files.

Design summary
--------------
* Trucks are active SimPy processes running a load -> haul -> dump -> return
  cycle until the shift clock expires.
* Loaders, the crusher dump and capacity-constrained road segments are SimPy
  ``Resource`` objects (capacity taken from the data).
* Throughput is recorded only on *completed* dump events at the crusher.
* All randomness is drawn from a single ``numpy`` generator seeded per
  replication so runs are reproducible; the same seed index is reused across
  scenarios to give common random numbers for paired comparisons.

See ``conceptual_model.md`` for the full conceptual model.
"""

from __future__ import annotations

import copy
import csv
import math
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Iterable

import networkx as nx
import numpy as np
import simpy
import yaml

# Capacity sentinel for "effectively unconstrained" road segments.
#
# A road segment whose ``capacity`` is at least this value is treated as
# effectively unconstrained (no SimPy resource is created for it). The data
# uses 999 as the "wide road" sentinel; the ramp upgrade scenario raises the
# ramp capacity to 999 which, with this rule, removes the ramp constraint.
# The comparison in ``Edge.constrained`` is strict (``capacity < 999``), so a
# capacity of exactly 999 already counts as unconstrained.
UNCONSTRAINED_CAPACITY = 999


class RouteError(RuntimeError):
    """Signal that the configured haulage network cannot be simulated.

    Raised by the router when no path exists between a required origin and
    destination, and by the replication set-up when the scenario leaves no
    dump point, no active loader or no trucks to work with.

    The task brief requires such configurations to abort loudly instead of
    quietly yielding misleading throughput figures.
    """


# --------------------------------------------------------------------------- #
# Data records
# --------------------------------------------------------------------------- #
@dataclass
class Node:
    """One location of the road network, as read from ``nodes.csv``."""

    # Identity and classification columns, stored verbatim.
    node_id: str
    node_name: str
    node_type: str
    # Coordinates taken from the ``x_m`` / ``y_m`` / ``z_m`` columns.
    x: float
    y: float
    z: float
    # Optional service-time parameters (``service_time_mean_min`` /
    # ``service_time_sd_min``); ``None`` when the CSV cell is blank or absent.
    service_mean: float | None = None
    service_sd: float | None = None


@dataclass
class Edge:
    """A directed road segment between two nodes (one row of ``edges.csv``)."""

    edge_id: str
    from_node: str
    to_node: str
    distance_m: float
    max_speed_kph: float
    road_type: str
    # Number of trucks allowed on the segment at once; values at or above
    # ``UNCONSTRAINED_CAPACITY`` mean "no limit".
    capacity: float
    # Closed edges are left out of the routing graph.
    closed: bool = False

    def base_time_min(self, loaded: bool, empty_factor: float, loaded_factor: float) -> float:
        """Return the noise-free time, in minutes, to drive this edge.

        The edge's maximum speed is scaled by ``loaded_factor`` when the truck
        is loaded and by ``empty_factor`` otherwise.
        """
        if loaded:
            speed_kph = self.max_speed_kph * loaded_factor
        else:
            speed_kph = self.max_speed_kph * empty_factor
        distance_km = self.distance_m / 1000.0
        # hours -> minutes
        return distance_km / speed_kph * 60.0

    @property
    def constrained(self) -> bool:
        """Whether the edge's capacity is below the unconstrained sentinel."""
        return self.capacity < UNCONSTRAINED_CAPACITY


@dataclass
class Loader:
    """A loading unit stationed at a node (one row of ``loaders.csv``)."""

    loader_id: str
    node_id: str  # node at which the loader works
    capacity: int  # number of trucks it can load simultaneously
    bucket_capacity_tonnes: float
    # Parameters of the load-time distribution, in minutes.
    mean_load_min: float
    sd_load_min: float
    # Loaders whose availability is not positive are excluded from a run.
    availability: float


@dataclass
class Dump:
    """A dump point located at a node (one row of ``dump_points.csv``)."""

    dump_id: str
    node_id: str
    dump_type: str  # value of the CSV ``type`` column
    capacity: int  # number of trucks that can dump simultaneously
    # Parameters of the dump-time distribution, in minutes.
    mean_dump_min: float
    sd_dump_min: float


@dataclass
class Truck:
    """A haul truck (one row of ``trucks.csv``)."""

    truck_id: str
    payload_tonnes: float  # tonnes credited per completed dump
    # Multipliers applied to an edge's maximum speed when travelling
    # empty / loaded.
    empty_speed_factor: float
    loaded_speed_factor: float
    availability: float
    start_node: str  # node at which the truck begins the shift


@dataclass
class ScenarioConfig:
    """Fully resolved scenario settings (after ``inherits:`` merging)."""

    scenario_id: str
    description: str

    # Values of the ``simulation:`` section.
    shift_length_hours: float
    replications: int
    base_random_seed: int
    warmup_minutes: float

    # Free-form sections handed through as dictionaries.
    routing: dict[str, Any]
    production: dict[str, Any]
    dispatching: dict[str, Any]
    stochasticity: dict[str, Any]

    # ``fleet.truck_count``: how many trucks of the roster take part.
    truck_count: int

    # Per-entity overrides keyed by entity id; each value maps a column name
    # to its replacement value.
    edge_overrides: dict[str, dict[str, Any]] = field(default_factory=dict)
    node_overrides: dict[str, dict[str, Any]] = field(default_factory=dict)
    dump_point_overrides: dict[str, dict[str, Any]] = field(default_factory=dict)
    loader_overrides: dict[str, dict[str, Any]] = field(default_factory=dict)


# --------------------------------------------------------------------------- #
# Data loading
# --------------------------------------------------------------------------- #
def _read_csv(path: Path) -> list[dict[str, str]]:
    with path.open(newline="", encoding="utf-8") as fh:
        return list(csv.DictReader(fh))


def _to_float(value: str | None, default: float | None = None) -> float | None:
    if value is None or str(value).strip() == "":
        return default
    return float(value)


def load_nodes(data_dir: Path) -> dict[str, Node]:
    """Read ``nodes.csv`` from ``data_dir`` into a ``node_id -> Node`` mapping.

    Blank coordinates default to 0.0; the two service-time columns are
    optional and become ``None`` when blank or absent.
    """
    rows = _read_csv(data_dir / "nodes.csv")
    return {
        row["node_id"]: Node(
            node_id=row["node_id"],
            node_name=row["node_name"],
            node_type=row["node_type"],
            x=_to_float(row["x_m"], 0.0),
            y=_to_float(row["y_m"], 0.0),
            z=_to_float(row["z_m"], 0.0),
            service_mean=_to_float(row.get("service_time_mean_min")),
            service_sd=_to_float(row.get("service_time_sd_min")),
        )
        for row in rows
    }


def load_edges(data_dir: Path) -> dict[str, Edge]:
    """Read ``edges.csv`` from ``data_dir`` into an ``edge_id -> Edge`` mapping.

    Blank cells fall back to: distance 0.0, maximum speed 1.0 and capacity
    ``UNCONSTRAINED_CAPACITY``. The optional ``closed`` column is true only
    for the (case-insensitive, whitespace-trimmed) text ``true``.
    """
    table: dict[str, Edge] = {}
    for row in _read_csv(data_dir / "edges.csv"):
        edge_id = row["edge_id"]
        origin = row["from_node"]
        destination = row["to_node"]
        distance = _to_float(row["distance_m"], 0.0)
        speed = _to_float(row["max_speed_kph"], 1.0)
        road_type = row["road_type"]
        capacity = _to_float(row["capacity"], UNCONSTRAINED_CAPACITY)
        closed_text = str(row.get("closed", "false")).strip().lower()
        table[edge_id] = Edge(
            edge_id=edge_id,
            from_node=origin,
            to_node=destination,
            distance_m=distance,
            max_speed_kph=speed,
            road_type=road_type,
            capacity=capacity,
            closed=closed_text == "true",
        )
    return table


def load_loaders(data_dir: Path) -> dict[str, Loader]:
    """Read ``loaders.csv`` from ``data_dir`` into a ``loader_id -> Loader`` mapping.

    Blank cells fall back to: capacity 1, bucket 100.0 t, mean load time
    5.0 min, load-time sd 0.0 and availability 1.0.
    """
    rows = _read_csv(data_dir / "loaders.csv")
    return {
        row["loader_id"]: Loader(
            loader_id=row["loader_id"],
            node_id=row["node_id"],
            capacity=int(_to_float(row["capacity"], 1)),
            bucket_capacity_tonnes=_to_float(row["bucket_capacity_tonnes"], 100.0),
            mean_load_min=_to_float(row["mean_load_time_min"], 5.0),
            sd_load_min=_to_float(row["sd_load_time_min"], 0.0),
            availability=_to_float(row["availability"], 1.0),
        )
        for row in rows
    }


def load_dumps(data_dir: Path) -> dict[str, Dump]:
    """Read ``dump_points.csv`` from ``data_dir`` into a ``dump_id -> Dump`` mapping.

    Blank cells fall back to: capacity 1, mean dump time 4.0 min and
    dump-time sd 0.0.
    """
    rows = _read_csv(data_dir / "dump_points.csv")
    return {
        row["dump_id"]: Dump(
            dump_id=row["dump_id"],
            node_id=row["node_id"],
            dump_type=row["type"],
            capacity=int(_to_float(row["capacity"], 1)),
            mean_dump_min=_to_float(row["mean_dump_time_min"], 4.0),
            sd_dump_min=_to_float(row["sd_dump_time_min"], 0.0),
        )
        for row in rows
    }


def load_trucks(data_dir: Path) -> list[Truck]:
    """Read ``trucks.csv`` from ``data_dir`` into a list of trucks in file order.

    Blank cells fall back to: payload 100.0 t, both speed factors 1.0 and
    availability 1.0.
    """
    return [
        Truck(
            truck_id=row["truck_id"],
            payload_tonnes=_to_float(row["payload_tonnes"], 100.0),
            empty_speed_factor=_to_float(row["empty_speed_factor"], 1.0),
            loaded_speed_factor=_to_float(row["loaded_speed_factor"], 1.0),
            availability=_to_float(row["availability"], 1.0),
            start_node=row["start_node"],
        )
        for row in _read_csv(data_dir / "trucks.csv")
    ]


# --------------------------------------------------------------------------- #
# Scenario configuration (inheritance + overrides)
# --------------------------------------------------------------------------- #
def _deep_merge(base: dict[str, Any], override: dict[str, Any]) -> dict[str, Any]:
    """Recursively merge ``override`` into a copy of ``base``."""
    result = dict(base)
    for key, value in override.items():
        if key in result and isinstance(result[key], dict) and isinstance(value, dict):
            result[key] = _deep_merge(result[key], value)
        else:
            result[key] = value
    return result


def load_raw_scenario(scenario_path: Path, base_dir: Path | None = None) -> dict[str, Any]:
    """Load a scenario YAML, resolving its ``inherits:`` chain.

    A parent named by ``inherits`` is searched for first next to the child and
    then in ``base_dir`` (the canonical ``data/scenarios`` directory), so that
    an agent-proposed scenario stored elsewhere can still inherit ``baseline``.

    Args:
        scenario_path: Path of the scenario file to load.
        base_dir: Optional fallback directory searched for parent scenarios.

    Returns:
        The merged raw mapping: the root ancestor's content with every
        descendant merged on top of it in order, child values winning. The
        ``inherits`` key of each inheriting file is dropped.

    Raises:
        FileNotFoundError: If a named parent cannot be found in any searched
            location (or ``scenario_path`` itself does not exist).
        ValueError: If the ``inherits`` chain loops back onto a file that was
            already visited.
    """
    # Walk up the chain iteratively, remembering every file visited, so that a
    # scenario that (directly or indirectly) inherits itself is reported
    # clearly instead of recursing until the interpreter gives up.
    chain: list[dict[str, Any]] = []
    visited: set[Path] = set()
    current = scenario_path
    while True:
        identity = current.resolve()
        if identity in visited:
            raise ValueError(
                f"Scenario '{scenario_path.name}' has a circular 'inherits' chain "
                f"('{current.name}' is reached twice)."
            )
        visited.add(identity)

        raw = yaml.safe_load(current.read_text(encoding="utf-8")) or {}
        chain.append(raw)
        parent_name = raw.get("inherits")
        if not parent_name:
            break

        search = [current.parent / f"{parent_name}.yaml"]
        if base_dir is not None:
            search.append(base_dir / f"{parent_name}.yaml")
        parent_path = next((p for p in search if p.exists()), None)
        if parent_path is None:
            raise FileNotFoundError(
                f"Scenario '{current.name}' inherits '{parent_name}' "
                f"but no {parent_name}.yaml was found in {[str(p) for p in search]}."
            )
        current = parent_path

    # ``chain`` runs child -> root; merge from the root downwards so that the
    # most specific scenario has the final say.
    merged = chain.pop()
    while chain:
        child = chain.pop()
        merged = _deep_merge(merged, {k: v for k, v in child.items() if k != "inherits"})
    return merged


def build_scenario_config(scenario_path: Path, base_dir: Path | None = None) -> ScenarioConfig:
    """Load a scenario file and convert it into a typed ``ScenarioConfig``.

    Missing settings fall back to defaults: 8 h shift, 30 replications, seed
    12345, no warm-up, 8 trucks, empty description and empty sections.
    A section that is present in the YAML but has no value (which loads as
    ``None``) is treated the same as a missing section, for every section
    and not only for the ``*_overrides`` blocks.

    Args:
        scenario_path: Path of the scenario YAML file.
        base_dir: Optional fallback directory for resolving ``inherits:``.

    Returns:
        The resolved scenario configuration.

    Raises:
        KeyError: If the resolved scenario has no ``scenario_id``.
    """
    raw = load_raw_scenario(scenario_path, base_dir)

    def section(name: str) -> dict[str, Any]:
        # ``raw.get(name, {})`` is not enough: an empty YAML key yields None.
        return raw.get(name) or {}

    sim = section("simulation")
    return ScenarioConfig(
        scenario_id=raw["scenario_id"],
        description=raw.get("description") or "",
        shift_length_hours=float(sim.get("shift_length_hours", 8)),
        replications=int(sim.get("replications", 30)),
        base_random_seed=int(sim.get("base_random_seed", 12345)),
        warmup_minutes=float(sim.get("warmup_minutes", 0)),
        routing=section("routing"),
        production=section("production"),
        dispatching=section("dispatching"),
        stochasticity=section("stochasticity"),
        truck_count=int(section("fleet").get("truck_count", 8)),
        edge_overrides=section("edge_overrides"),
        node_overrides=section("node_overrides"),
        dump_point_overrides=section("dump_point_overrides"),
        loader_overrides=section("loader_overrides"),
    )


def apply_overrides(
    config: ScenarioConfig,
    nodes: dict[str, Node],
    edges: dict[str, Edge],
    loaders: dict[str, Loader],
    dumps: dict[str, Dump],
) -> tuple[dict[str, Node], dict[str, Edge], dict[str, Loader], dict[str, Dump]]:
    """Copy the world tables and apply the scenario's override blocks to the copies.

    The input dictionaries and the records inside them are left untouched.
    Only the override keys listed in the field tables below are honoured;
    any other key in an override block is ignored.

    Returns:
        ``(nodes, edges, loaders, dumps)`` as fresh dictionaries of fresh
        records.

    Raises:
        KeyError: If an override names an id that is not in the matching table.
    """
    edges = {key: Edge(**vars(item)) for key, item in edges.items()}
    nodes = {key: Node(**vars(item)) for key, item in nodes.items()}
    loaders = {key: Loader(**vars(item)) for key, item in loaders.items()}
    dumps = {key: Dump(**vars(item)) for key, item in dumps.items()}

    # (override key, record attribute, conversion) per record kind, listed in
    # the order in which the keys are checked.
    edge_fields = (
        ("capacity", "capacity", float),
        ("max_speed_kph", "max_speed_kph", float),
        ("closed", "closed", bool),
        ("distance_m", "distance_m", float),
    )
    node_fields = (
        ("service_time_mean_min", "service_mean", float),
        ("service_time_sd_min", "service_sd", float),
    )
    dump_fields = (
        ("mean_dump_time_min", "mean_dump_min", float),
        ("sd_dump_time_min", "sd_dump_min", float),
        ("capacity", "capacity", int),
    )
    loader_fields = (
        ("mean_load_time_min", "mean_load_min", float),
        ("sd_load_time_min", "sd_load_min", float),
        ("capacity", "capacity", int),
    )

    def assign(record, changes, fields):
        """Write every override present in ``changes`` onto ``record``."""
        for source_key, attribute, convert in fields:
            if source_key in changes:
                setattr(record, attribute, convert(changes[source_key]))

    for edge_id, changes in config.edge_overrides.items():
        if edge_id not in edges:
            raise KeyError(f"edge_override targets unknown edge '{edge_id}'")
        assign(edges[edge_id], changes, edge_fields)

    for node_id, changes in config.node_overrides.items():
        if node_id not in nodes:
            raise KeyError(f"node_override targets unknown node '{node_id}'")
        assign(nodes[node_id], changes, node_fields)

    for dump_id, changes in config.dump_point_overrides.items():
        if dump_id not in dumps:
            raise KeyError(f"dump_point_override targets unknown dump '{dump_id}'")
        assign(dumps[dump_id], changes, dump_fields)

    for loader_id, changes in config.loader_overrides.items():
        if loader_id not in loaders:
            raise KeyError(f"loader_override targets unknown loader '{loader_id}'")
        assign(loaders[loader_id], changes, loader_fields)

    return nodes, edges, loaders, dumps


# --------------------------------------------------------------------------- #
# Routing
# --------------------------------------------------------------------------- #
class Router:
    """Shortest-time routing over the (scenario-specific) road graph.

    Edge weight is the deterministic free-flow traversal time at the edge's
    maximum speed. Because the truck speed factor is a uniform multiplier it
    does not change which path is shortest, so a single graph serves both
    loaded and empty routing; the per-leg *time* is recomputed with the right
    factor when needed.

    Closed edges are left out of the graph. If several open edges connect the
    same ordered pair of nodes, the one with the smallest free-flow time is
    used for routing (a ``DiGraph`` holds a single edge per node pair, so
    without this rule a later, slower row would silently replace a faster
    one). Edges with exactly equal times resolve to the later row.
    """

    def __init__(self, edges: dict[str, Edge]):
        """Build the routing graph from the ``edge_id -> Edge`` table."""
        self.edges = edges
        self.graph = nx.DiGraph()
        for e in edges.values():
            if e.closed:
                continue
            weight = (e.distance_m / 1000.0) / e.max_speed_kph * 60.0
            existing = self.graph.get_edge_data(e.from_node, e.to_node)
            if existing is not None and existing["weight"] < weight:
                # A strictly faster parallel edge is already registered.
                continue
            self.graph.add_edge(
                e.from_node,
                e.to_node,
                weight=weight,
                edge_id=e.edge_id,
            )
        # Routes are memoised per (src, dst); the graph never changes after
        # construction, so cached routes stay valid.
        self._route_cache: dict[tuple[str, str], list[str]] = {}

    def route_edges(self, src: str, dst: str) -> list[str]:
        """Return the list of edge ids on the shortest-time path src -> dst.

        Returns an empty list when ``src == dst``. The returned list is the
        cached object itself and should be treated as read-only.

        Raises:
            RouteError: If either node is not in the graph or no path exists.
        """
        if src == dst:
            return []
        key = (src, dst)
        if key in self._route_cache:
            return self._route_cache[key]
        try:
            node_path = nx.shortest_path(self.graph, src, dst, weight="weight")
        except (nx.NetworkXNoPath, nx.NodeNotFound) as exc:
            raise RouteError(f"No route from {src} to {dst} (closed roads?).") from exc
        edge_ids = [self.graph[a][b]["edge_id"] for a, b in zip(node_path[:-1], node_path[1:])]
        self._route_cache[key] = edge_ids
        return edge_ids

    def leg_time(self, src: str, dst: str, loaded: bool, empty_factor: float, loaded_factor: float) -> float:
        """Deterministic (noise-free) travel time in minutes for a leg.

        Sums ``Edge.base_time_min`` over the shortest-time route, using
        ``loaded_factor`` when ``loaded`` is true and ``empty_factor``
        otherwise. Returns 0 when ``src == dst``.
        """
        return sum(
            self.edges[eid].base_time_min(loaded, empty_factor, loaded_factor)
            for eid in self.route_edges(src, dst)
        )


# --------------------------------------------------------------------------- #
# Stochastic helpers
# --------------------------------------------------------------------------- #
def truncated_normal(rng: np.random.Generator, mean: float, sd: float, low: float = 0.0) -> float:
    """Draw from Normal(mean, sd) restricted to values strictly above ``low``.

    Rejection sampling with at most 16 attempts. When ``sd`` is not positive,
    or every attempt is rejected, the result is ``mean`` clamped to just
    above ``low``.
    """
    clamped_mean = max(mean, low + 1e-9)
    if sd <= 0:
        return clamped_mean
    attempts_left = 16
    while attempts_left:
        candidate = rng.normal(mean, sd)
        if candidate > low:
            return candidate
        attempts_left -= 1
    return clamped_mean


def travel_multiplier(rng: np.random.Generator, cv: float) -> float:
    """Sample a lognormal travel-time factor with mean 1 and coefficient of variation ``cv``.

    A non-positive ``cv`` turns the noise off: 1.0 is returned and no random
    number is drawn.
    """
    if cv <= 0:
        return 1.0
    # For a lognormal, CV^2 = exp(sigma^2) - 1 and mean = exp(mu + sigma^2 / 2),
    # so these parameters give the requested CV and a mean of exactly 1.
    sigma = math.sqrt(math.log(1.0 + cv * cv))
    mu = -0.5 * sigma * sigma
    sample = rng.lognormal(mu, sigma)
    return float(sample)


# --------------------------------------------------------------------------- #
# Replication recorder
# --------------------------------------------------------------------------- #
class Recorder:
    """Mutable tally of one replication's counters (kept separate from event logging).

    All time accumulators start at zero and all sample lists start empty.
    ``dump_id`` is accepted for the caller's convenience but not stored.
    """

    def __init__(self, truck_ids: list[str], loader_ids: list[str],
                 dump_id: str, constrained_edge_ids: list[str]):
        # Production delivered at the crusher.
        self.tonnes = 0.0
        self.dumps = 0

        # Crusher activity.
        self.crusher_busy = 0.0
        self.crusher_wait: list[float] = []

        # Loader activity: busy time per loader, waits pooled and per loader.
        self.loader_busy = dict.fromkeys(loader_ids, 0.0)
        self.loader_wait: list[float] = []
        self.loader_wait_by = {loader_id: [] for loader_id in loader_ids}

        # Capacity-constrained road segments.
        self.edge_busy = dict.fromkeys(constrained_edge_ids, 0.0)
        self.edge_wait = dict.fromkeys(constrained_edge_ids, 0.0)
        self.edge_wait_count = dict.fromkeys(constrained_edge_ids, 0)

        # Per-truck time breakdown.
        self.truck_travel = dict.fromkeys(truck_ids, 0.0)
        self.truck_load = dict.fromkeys(truck_ids, 0.0)
        self.truck_dump = dict.fromkeys(truck_ids, 0.0)
        self.truck_loader_wait = dict.fromkeys(truck_ids, 0.0)
        self.truck_crusher_wait = dict.fromkeys(truck_ids, 0.0)
        # Clock value at each load start, per truck.
        self.truck_load_starts = {truck_id: [] for truck_id in truck_ids}


# --------------------------------------------------------------------------- #
# Single replication
# --------------------------------------------------------------------------- #
def run_replication(
    config: ScenarioConfig,
    nodes: dict[str, Node],
    edges: dict[str, Edge],
    loaders: dict[str, Loader],
    dumps: dict[str, Dump],
    trucks: list[Truck],
    replication: int,
    log_events: bool = False,
) -> dict[str, Any]:
    """Run one shift replication and return a metrics dictionary.

    The shift lasts ``config.shift_length_hours`` hours. The world tables
    are not modified: scenario overrides are applied to copies. The random
    generator is seeded with ``config.base_random_seed + replication``.

    If ``log_events`` is True a detailed event trace is returned under the
    ``"events"`` key (used for replication 0 of each scenario).

    Raises:
        RouteError: If no dump point sits at the configured destination, no
            loader is active for the configured ore sources, the fleet is
            empty, or a required route does not exist.
        KeyError: If an override block targets an unknown id.
    """
    # Scenario-specific copies; ``s_nodes`` is not used further in this function.
    s_nodes, s_edges, s_loaders, s_dumps = apply_overrides(config, nodes, edges, loaders, dumps)

    router = Router(s_edges)
    shift_min = config.shift_length_hours * 60.0
    warmup_min = config.warmup_minutes
    travel_cv = float(config.stochasticity.get("travel_time_noise_cv", 0.0))
    road_caps_on = bool(config.routing.get("road_capacity_enabled", True))

    # Production targets.
    # Without an explicit ``ore_sources`` list every loader's node is a source.
    ore_sources = list(config.production.get("ore_sources", [l.node_id for l in s_loaders.values()]))
    dump_destination = config.production.get("dump_destination", "CRUSH")
    crusher_node = dump_destination
    dump_candidates = [d for d in s_dumps.values() if d.node_id == dump_destination]
    if not dump_candidates:
        raise RouteError(f"No dump point at destination node '{dump_destination}'.")
    # If several dump points share the destination node only the first is used.
    crusher = dump_candidates[0]

    # Availability acts as an on/off switch here: any positive value keeps the
    # loader in play for the whole shift.
    active_loaders = [l for l in s_loaders.values() if l.node_id in ore_sources and l.availability > 0]
    if not active_loaders:
        raise RouteError("No active loaders available for the configured ore sources.")

    # NOTE(review): slicing silently yields fewer trucks than requested when
    # ``truck_count`` exceeds the roster, and a negative count slices from the
    # end of the list -- confirm whether callers validate this upstream.
    # Truck ``availability`` is not consulted anywhere in this function.
    fleet = trucks[: config.truck_count]
    if not fleet:
        raise RouteError("Fleet is empty; truck_count resolved to 0.")
    ref = fleet[0]  # all trucks identical in the supplied data

    # Validate every leg the dispatcher may ask for (fail loudly up-front).
    # Only the reference truck's start node is checked.
    for loader in active_loaders:
        router.route_edges(ref.start_node, loader.node_id)
        router.route_edges(loader.node_id, crusher_node)
        router.route_edges(crusher_node, loader.node_id)

    # SimPy resources.
    env = simpy.Environment()
    loader_res = {l.loader_id: simpy.Resource(env, capacity=max(1, l.capacity)) for l in active_loaders}
    crusher_res = simpy.Resource(env, capacity=max(1, crusher.capacity))
    # Committed-assignment counter: trucks dispatched to a loader but not yet
    # finished loading. Used by the dispatcher so in-flight trucks are counted
    # when estimating the wait, which spreads the fleet across loaders instead
    # of herding everyone onto the nominally-nearest one.
    assigned = {l.loader_id: 0 for l in active_loaders}

    # With road capacities switched off no edge is treated as constrained, so
    # no edge resources exist and no edge metrics are collected.
    constrained_edge_ids = [e.edge_id for e in s_edges.values() if e.constrained and road_caps_on]
    edge_res = {eid: simpy.Resource(env, capacity=max(1, int(s_edges[eid].capacity))) for eid in constrained_edge_ids}

    rec = Recorder([t.truck_id for t in fleet], list(loader_res), crusher.dump_id, constrained_edge_ids)
    events: list[dict[str, Any]] = []

    def log(truck_id: str, event_type: str, *, from_node: str = "", to_node: str = "",
            location: str = "", loaded: bool | str = "", payload: float | str = "",
            resource_id: str = "", queue_length: int | str = "") -> None:
        """Append one trace row stamped with the current clock (no-op unless logging)."""
        if not log_events:
            return
        events.append({
            "time_min": round(env.now, 4),
            "replication": replication,
            "scenario_id": config.scenario_id,
            "truck_id": truck_id,
            "event_type": event_type,
            "from_node": from_node,
            "to_node": to_node,
            "location": location,
            # Booleans are stored as 0/1; the "" placeholder passes through.
            "loaded": int(loaded) if isinstance(loaded, bool) else loaded,
            "payload_tonnes": payload,
            "resource_id": resource_id,
            "queue_length": queue_length,
        })

    def choose_loader(current_node: str) -> Loader:
        """Pick the loader with the earliest estimated load start.

        Score = estimated time until *this* truck starts loading
              = empty travel time to the loader
              + (trucks already committed to the loader) x mean load time,
        where "committed" is the ``assigned`` counter: trucks dispatched to
        the loader that have not finished loading yet (en route, queued or
        being loaded). The calling truck is not yet included in that count.
        Ties are broken by the loader's expected full cycle time
        (load + haul + dump + return) and then by loader id.
        Travel times use the reference truck's speed factors.
        """
        best: Loader | None = None
        best_key: tuple[float, float, str] | None = None
        for loader in active_loaders:
            travel = router.leg_time(current_node, loader.node_id, False,
                                     ref.empty_speed_factor, ref.loaded_speed_factor)
            pending = assigned[loader.loader_id]
            est_start = travel + pending * loader.mean_load_min
            cycle = (loader.mean_load_min
                     + router.leg_time(loader.node_id, crusher_node, True,
                                       ref.empty_speed_factor, ref.loaded_speed_factor)
                     + crusher.mean_dump_min
                     + router.leg_time(crusher_node, loader.node_id, False,
                                       ref.empty_speed_factor, ref.loaded_speed_factor))
            # Tuples compare lexicographically: start time, cycle time, id.
            key = (est_start, cycle, loader.loader_id)
            if best_key is None or key < best_key:
                best_key, best = key, loader
        assert best is not None
        return best

    def traverse(truck: Truck, src: str, dst: str, loaded: bool):
        """Generator: move a truck along the shortest-time route src -> dst."""
        for eid in router.route_edges(src, dst):
            edge = s_edges[eid]
            # The noisy duration is drawn before any queueing for the edge.
            duration = edge.base_time_min(loaded, truck.empty_speed_factor,
                                          truck.loaded_speed_factor) * travel_multiplier(rng, travel_cv)
            res = edge_res.get(eid)
            payload = truck.payload_tonnes if loaded else 0
            if res is not None:
                # Capacity-constrained edge: wait for a slot, then drive.
                join = env.now
                request = res.request()
                yield request
                wait = env.now - join
                rec.edge_wait[eid] += wait
                rec.edge_wait_count[eid] += 1
                # The queue length is sampled after this truck was admitted.
                log(truck.truck_id, "enter_edge", from_node=edge.from_node, to_node=edge.to_node,
                    loaded=loaded, payload=payload, resource_id=eid, queue_length=len(res.queue))
                try:
                    yield env.timeout(duration)
                finally:
                    res.release(request)
                rec.edge_busy[eid] += duration
            else:
                log(truck.truck_id, "enter_edge", from_node=edge.from_node, to_node=edge.to_node,
                    loaded=loaded, payload=payload)
                yield env.timeout(duration)
            # Travel time covers driving only; time queued for an edge is
            # kept in ``rec.edge_wait``.
            rec.truck_travel[truck.truck_id] += duration

    def truck_process(truck: Truck):
        """Generator: endless dispatch -> load -> haul -> dump cycle for one truck.

        Counters are updated only after the corresponding timeout has
        completed, so an activity still in progress when the run stops
        contributes nothing to them.
        """
        location = truck.start_node
        while True:
            loader = choose_loader(location)
            assigned[loader.loader_id] += 1
            log(truck.truck_id, "dispatch", location=location, to_node=loader.node_id,
                resource_id=loader.loader_id)

            # Empty travel to the assigned loader.
            yield from traverse(truck, location, loader.node_id, loaded=False)
            location = loader.node_id

            # Queue + load.
            res = loader_res[loader.loader_id]
            join = env.now
            log(truck.truck_id, "queue_loader", location=location,
                resource_id=loader.loader_id, queue_length=len(res.queue))
            request = res.request()
            yield request
            wait = env.now - join
            rec.loader_wait.append(wait)
            rec.loader_wait_by[loader.loader_id].append(wait)
            rec.truck_loader_wait[truck.truck_id] += wait
            load_time = truncated_normal(rng, loader.mean_load_min, loader.sd_load_min)
            # Successive load starts are used afterwards to derive cycle times.
            rec.truck_load_starts[truck.truck_id].append(env.now)
            log(truck.truck_id, "load_start", location=location, resource_id=loader.loader_id)
            yield env.timeout(load_time)
            res.release(request)
            # The commitment ends once loading has finished.
            assigned[loader.loader_id] -= 1
            rec.loader_busy[loader.loader_id] += load_time
            rec.truck_load[truck.truck_id] += load_time
            log(truck.truck_id, "load_end", location=location, resource_id=loader.loader_id,
                loaded=True, payload=truck.payload_tonnes)

            # Loaded haul to the crusher.
            yield from traverse(truck, location, crusher_node, loaded=True)
            location = crusher_node

            # Queue + dump.
            join = env.now
            log(truck.truck_id, "queue_crusher", location=location, resource_id=crusher.dump_id,
                queue_length=len(crusher_res.queue), loaded=True, payload=truck.payload_tonnes)
            request = crusher_res.request()
            yield request
            wait = env.now - join
            rec.crusher_wait.append(wait)
            rec.truck_crusher_wait[truck.truck_id] += wait
            dump_time = truncated_normal(rng, crusher.mean_dump_min, crusher.sd_dump_min)
            log(truck.truck_id, "dump_start", location=location, resource_id=crusher.dump_id,
                loaded=True, payload=truck.payload_tonnes)
            yield env.timeout(dump_time)
            crusher_res.release(request)
            rec.crusher_busy += dump_time
            rec.truck_dump[truck.truck_id] += dump_time
            # Only dumps completed at or after the warm-up count towards
            # production; busy and waiting times are accumulated from time 0.
            if env.now >= warmup_min:
                rec.tonnes += truck.payload_tonnes
                rec.dumps += 1
            log(truck.truck_id, "dump_end", location=location, resource_id=crusher.dump_id,
                loaded=False, payload=truck.payload_tonnes)
            # Loop: the empty return is the first leg of the next dispatch.

    # The closures above look ``rng`` up when they run, so defining it here,
    # before any process starts, is sufficient.
    rng = np.random.default_rng(config.base_random_seed + replication)
    for truck in fleet:
        env.process(truck_process(truck))
    env.run(until=shift_min)

    return _summarise_replication(config, replication, shift_min, warmup_min, fleet,
                                  active_loaders, crusher, constrained_edge_ids,
                                  s_edges, rec, events if log_events else None)


def _summarise_replication(config, replication, shift_min, warmup_min, fleet,
                           active_loaders, crusher, constrained_edge_ids, s_edges,
                           rec: Recorder, events: list[dict[str, Any]] | None) -> dict[str, Any]:
    window_hours = (shift_min - warmup_min) / 60.0
    seed = config.base_random_seed + replication

    cycles: list[float] = []
    for starts in rec.truck_load_starts.values():
        cycles.extend(b - a for a, b in zip(starts[:-1], starts[1:]))

    truck_utils = []
    for truck in fleet:
        productive = (rec.truck_travel[truck.truck_id]
                      + rec.truck_load[truck.truck_id]
                      + rec.truck_dump[truck.truck_id])
        truck_utils.append(min(productive / shift_min, 1.0))

    # Utilisation is the average fraction of servers busy, i.e. normalised by
    # the resource capacity, so multi-server resources stay on a 0..1 scale.
    loader_caps = {l.loader_id: max(1, l.capacity) for l in active_loaders}
    loader_util = {lid: rec.loader_busy[lid] / (shift_min * loader_caps[lid]) for lid in rec.loader_busy}
    loader_qt = {
        lid: (sum(w) / len(w) if w else 0.0)
        for lid, w in rec.loader_wait_by.items()
    }
    edge_util = {
        eid: rec.edge_busy[eid] / (shift_min * max(1, int(s_edges[eid].capacity)))
        for eid in constrained_edge_ids
    }
    edge_qt = {
        eid: (rec.edge_wait[eid] / rec.edge_wait_count[eid] if rec.edge_wait_count[eid] else 0.0)
        for eid in constrained_edge_ids
    }

    # Combined per-resource utilisation table for bottleneck ranking.
    resource_util = {crusher.dump_id: rec.crusher_busy / (shift_min * max(1, crusher.capacity))}
    resource_kind = {crusher.dump_id: "crusher"}
    resource_qt = {crusher.dump_id: (sum(rec.crusher_wait) / len(rec.crusher_wait) if rec.crusher_wait else 0.0)}
    for lid in loader_util:
        resource_util[lid] = loader_util[lid]
        resource_kind[lid] = "loader"
        resource_qt[lid] = loader_qt[lid]
    for eid in edge_util:
        resource_util[eid] = edge_util[eid]
        resource_kind[eid] = "road"
        resource_qt[eid] = edge_qt[eid]

    def _mean(values: Iterable[float]) -> float:
        values = list(values)
        return float(sum(values) / len(values)) if values else float("nan")

    metrics: dict[str, Any] = {
        "scenario_id": config.scenario_id,
        "replication": replication,
        "random_seed": seed,
        "total_tonnes_delivered": rec.tonnes,
        "tonnes_per_hour": rec.tonnes / window_hours if window_hours else float("nan"),
        "n_dumps": rec.dumps,
        "average_truck_cycle_time_min": _mean(cycles),
        "average_truck_utilisation": _mean(truck_utils),
        "crusher_utilisation": rec.crusher_busy / (shift_min * max(1, crusher.capacity)),
        "average_loader_queue_time_min": _mean(rec.loader_wait),
        "average_crusher_queue_time_min": _mean(rec.crusher_wait),
        "loader_utilisation": loader_util,
        "loader_queue_time": loader_qt,
        "edge_utilisation": edge_util,
        "edge_queue_time": edge_qt,
        "resource_utilisation": resource_util,
        "resource_kind": resource_kind,
        "resource_queue_time": resource_qt,
    }
    if events is not None:
        metrics["events"] = events
    return metrics


# --------------------------------------------------------------------------- #
# Convenience loader for the whole data directory
# --------------------------------------------------------------------------- #
@dataclass
class World:
    """The unmodified input data set, bundled for convenient passing around."""

    # Lookup tables keyed by the respective id column.
    nodes: dict[str, Node]
    edges: dict[str, Edge]
    loaders: dict[str, Loader]
    dumps: dict[str, Dump]
    # Truck roster in file order.
    trucks: list[Truck]


def load_world(data_dir: Path) -> World:
    """Read all five input CSV files found in ``data_dir`` into a ``World``."""
    node_table = load_nodes(data_dir)
    edge_table = load_edges(data_dir)
    loader_table = load_loaders(data_dir)
    dump_table = load_dumps(data_dir)
    roster = load_trucks(data_dir)
    return World(
        nodes=node_table,
        edges=edge_table,
        loaders=loader_table,
        dumps=dump_table,
        trucks=roster,
    )