simulate.py

← Back to submission · View raw on GitHub

#!/usr/bin/env python3
"""Discrete-event mine haulage simulation for the synthetic throughput benchmark."""

from __future__ import annotations

import argparse
import copy
import csv
import json
import math
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any

import networkx as nx
import numpy as np
import pandas as pd
import simpy
import yaml
from scipy import stats


REQUIRED_SCENARIOS = [
    "baseline",
    "trucks_4",
    "trucks_12",
    "ramp_upgrade",
    "crusher_slowdown",
    "ramp_closed",
]


BENCHMARK_ID = "001_synthetic_mine_throughput"
HIGH_CAPACITY = 999


@dataclass
class TrackedResource:
    resource_id: str
    resource_type: str
    capacity: int
    env: simpy.Environment
    resource: simpy.Resource = field(init=False)
    busy_time_min: float = 0.0
    queue_waits_min: list[float] = field(default_factory=list)
    queue_entries: int = 0

    def __post_init__(self) -> None:
        self.resource = simpy.Resource(self.env, capacity=self.capacity)

    def add_busy_time(self, start_min: float, duration_min: float, shift_end_min: float) -> None:
        clipped = max(0.0, min(start_min + duration_min, shift_end_min) - start_min)
        self.busy_time_min += clipped

    def utilisation(self, shift_end_min: float) -> float:
        if shift_end_min <= 0 or self.capacity <= 0:
            return 0.0
        return self.busy_time_min / (self.capacity * shift_end_min)

    def average_queue_wait(self) -> float:
        if not self.queue_waits_min:
            return 0.0
        return float(np.mean(self.queue_waits_min))


@dataclass
class TruckState:
    truck_id: str
    payload_tonnes: float
    empty_speed_factor: float
    loaded_speed_factor: float
    start_node: str
    productive_time_min: float = 0.0
    cycle_times_min: list[float] = field(default_factory=list)


class MineSimulation:
    def __init__(
        self,
        data: dict[str, pd.DataFrame],
        scenario: dict[str, Any],
        replication: int,
        random_seed: int,
        output_events: list[dict[str, Any]],
    ) -> None:
        self.data = data
        self.scenario = scenario
        self.replication = replication
        self.random_seed = random_seed
        self.rng = np.random.default_rng(random_seed)
        self.env = simpy.Environment()
        self.shift_end_min = float(scenario["simulation"]["shift_length_hours"]) * 60.0
        self.output_events = output_events

        self.nodes = self._records_by_id(data["nodes"], "node_id")
        self.edges = self._records_by_id(data["edges"], "edge_id")
        self.loaders_df = data["loaders"]
        self.dumps_df = data["dump_points"]
        self.trucks_df = data["trucks"]

        self.ore_sources = list(scenario["production"]["ore_sources"])
        self.dump_node = scenario["production"]["dump_destination"]
        self.travel_cv = float(scenario.get("stochasticity", {}).get("travel_time_noise_cv", 0.0))
        self.road_capacity_enabled = bool(
            scenario.get("routing", {}).get("road_capacity_enabled", True)
        )

        self.graph = self._build_graph()
        self.loader_by_node = self._build_loader_lookup()
        self.dump = self._build_dump_lookup()
        self.loaders = self._build_loader_resources()
        self.crusher = self._build_crusher_resource()
        self.road_resource_for_edge, self.roads = self._build_road_resources()
        self.trucks = self._build_trucks()

        self.total_tonnes_delivered = 0.0
        self.completed_cycles = 0

        self._validate_routes()

    @staticmethod
    def _records_by_id(df: pd.DataFrame, id_column: str) -> dict[str, dict[str, Any]]:
        records: dict[str, dict[str, Any]] = {}
        for record in df.to_dict(orient="records"):
            records[str(record[id_column])] = record
        return records

    def _build_graph(self) -> nx.DiGraph:
        graph = nx.DiGraph()
        for node_id in self.nodes:
            graph.add_node(node_id)

        for edge in self.edges.values():
            if parse_bool(edge.get("closed", False)):
                continue
            speed = float(edge["max_speed_kph"])
            if speed <= 0:
                continue
            distance = float(edge["distance_m"])
            base_time_min = distance / 1000.0 / speed * 60.0
            graph.add_edge(
                edge["from_node"],
                edge["to_node"],
                weight=base_time_min,
                edge_id=edge["edge_id"],
            )
        return graph

    def _build_loader_lookup(self) -> dict[str, dict[str, Any]]:
        return {
            row["node_id"]: row
            for row in self.loaders_df.to_dict(orient="records")
            if float(row.get("availability", 1.0)) > 0.0
        }

    def _build_dump_lookup(self) -> dict[str, Any]:
        matches = self.dumps_df[self.dumps_df["node_id"] == self.dump_node]
        if matches.empty:
            raise ValueError(f"No dump point is configured at destination {self.dump_node}")
        return matches.iloc[0].to_dict()

    def _build_loader_resources(self) -> dict[str, TrackedResource]:
        resources: dict[str, TrackedResource] = {}
        for node_id in self.ore_sources:
            if node_id not in self.loader_by_node:
                raise ValueError(f"Ore source {node_id} has no loader definition")
            loader = self.loader_by_node[node_id]
            resources[loader["loader_id"]] = TrackedResource(
                str(loader["loader_id"]),
                "loader",
                int(loader["capacity"]),
                self.env,
            )
        return resources

    def _build_crusher_resource(self) -> TrackedResource:
        return TrackedResource(
            str(self.dump["dump_id"]),
            "crusher",
            int(self.dump["capacity"]),
            self.env,
        )

    def _build_road_resources(self) -> tuple[dict[str, str], dict[str, TrackedResource]]:
        if not self.road_capacity_enabled:
            return {}, {}

        grouped_caps: dict[str, list[int]] = {}
        edge_to_group: dict[str, str] = {}
        for edge in self.edges.values():
            if parse_bool(edge.get("closed", False)):
                continue
            capacity = int(edge["capacity"])
            if capacity >= HIGH_CAPACITY:
                continue
            group_id = physical_road_id(edge["from_node"], edge["to_node"])
            grouped_caps.setdefault(group_id, []).append(capacity)
            edge_to_group[edge["edge_id"]] = group_id

        resources = {
            group_id: TrackedResource(
                group_id,
                "road",
                max(1, min(capacities)),
                self.env,
            )
            for group_id, capacities in grouped_caps.items()
        }
        return edge_to_group, resources

    def _build_trucks(self) -> list[TruckState]:
        count = int(self.scenario["fleet"]["truck_count"])
        available = self.trucks_df[self.trucks_df["availability"].astype(float) > 0.0].copy()
        available = available.sort_values("truck_id").head(count)
        if len(available) < count:
            raise ValueError(f"Scenario requested {count} trucks, but only {len(available)} are available")

        trucks = []
        for row in available.to_dict(orient="records"):
            trucks.append(
                TruckState(
                    truck_id=str(row["truck_id"]),
                    payload_tonnes=float(row["payload_tonnes"]),
                    empty_speed_factor=float(row["empty_speed_factor"]),
                    loaded_speed_factor=float(row["loaded_speed_factor"]),
                    start_node=str(row["start_node"]),
                )
            )
        return trucks

    def _validate_routes(self) -> None:
        origins = {truck.start_node for truck in self.trucks} | {self.dump_node}
        for origin in origins:
            for loader_node in self.ore_sources:
                self.shortest_path(origin, loader_node)
        for loader_node in self.ore_sources:
            self.shortest_path(loader_node, self.dump_node)

    def log_event(
        self,
        truck: TruckState,
        event_type: str,
        *,
        from_node: str = "",
        to_node: str = "",
        location: str = "",
        loaded: bool = False,
        payload_tonnes: float = 0.0,
        resource_id: str = "",
        queue_length: int | str = "",
    ) -> None:
        self.output_events.append(
            {
                "time_min": round(float(self.env.now), 4),
                "replication": self.replication,
                "scenario_id": self.scenario["scenario_id"],
                "truck_id": truck.truck_id,
                "event_type": event_type,
                "from_node": from_node,
                "to_node": to_node,
                "location": location,
                "loaded": bool(loaded),
                "payload_tonnes": round(float(payload_tonnes), 4),
                "resource_id": resource_id,
                "queue_length": queue_length,
            }
        )

    def shortest_path(self, source: str, target: str) -> list[str]:
        try:
            return nx.shortest_path(self.graph, source=source, target=target, weight="weight")
        except (nx.NetworkXNoPath, nx.NodeNotFound) as exc:
            scenario_id = self.scenario["scenario_id"]
            raise RuntimeError(
                f"No route from {source} to {target} in scenario {scenario_id}"
            ) from exc

    def expected_route_time(self, path: list[str], truck: TruckState, loaded: bool) -> float:
        total = 0.0
        factor = truck.loaded_speed_factor if loaded else truck.empty_speed_factor
        for from_node, to_node in zip(path[:-1], path[1:]):
            edge_id = self.graph[from_node][to_node]["edge_id"]
            edge = self.edges[edge_id]
            speed = float(edge["max_speed_kph"]) * factor
            total += float(edge["distance_m"]) / 1000.0 / speed * 60.0
        return total

    def choose_loader(self, current_node: str, truck: TruckState) -> tuple[str, str]:
        candidates = []
        for loader_node in self.ore_sources:
            loader = self.loader_by_node[loader_node]
            loader_id = str(loader["loader_id"])
            resource = self.loaders[loader_id]
            empty_path = self.shortest_path(current_node, loader_node)
            loaded_path = self.shortest_path(loader_node, self.dump_node)

            empty_time = self.expected_route_time(empty_path, truck, loaded=False)
            loaded_time = self.expected_route_time(loaded_path, truck, loaded=True)
            mean_load = float(loader["mean_load_time_min"])
            queued_work = resource.resource.count + len(resource.resource.queue)
            estimated_wait = queued_work / resource.capacity * mean_load
            score = empty_time + estimated_wait + mean_load + loaded_time
            candidates.append((score, loaded_time + mean_load, loader_id, loader_node))

        candidates.sort()
        _, _, loader_id, loader_node = candidates[0]
        return loader_id, loader_node

    def sample_service_time(self, mean: float, sd: float) -> float:
        if sd <= 0:
            return max(0.1, mean)
        for _ in range(100):
            sample = float(self.rng.normal(mean, sd))
            if sample > 0:
                return sample
        return max(0.1, mean)

    def sample_travel_time(self, edge: dict[str, Any], truck: TruckState, loaded: bool) -> float:
        speed_factor = truck.loaded_speed_factor if loaded else truck.empty_speed_factor
        speed_kph = float(edge["max_speed_kph"]) * speed_factor
        mean_min = float(edge["distance_m"]) / 1000.0 / speed_kph * 60.0
        if self.travel_cv <= 0:
            return mean_min
        sigma = math.sqrt(math.log(self.travel_cv * self.travel_cv + 1.0))
        mu = -0.5 * sigma * sigma
        return mean_min * float(self.rng.lognormal(mu, sigma))

    def add_truck_productive_time(self, truck: TruckState, start: float, duration: float) -> None:
        truck.productive_time_min += max(0.0, min(start + duration, self.shift_end_min) - start)

    def traverse_route(
        self,
        truck: TruckState,
        path: list[str],
        *,
        loaded: bool,
        payload_tonnes: float,
    ) -> Any:
        for from_node, to_node in zip(path[:-1], path[1:]):
            edge_id = self.graph[from_node][to_node]["edge_id"]
            edge = self.edges[edge_id]
            road_id = self.road_resource_for_edge.get(edge_id, "")
            duration = self.sample_travel_time(edge, truck, loaded)

            if road_id:
                road = self.roads[road_id]
                road.queue_entries += 1
                self.log_event(
                    truck,
                    "road_queue",
                    from_node=from_node,
                    to_node=to_node,
                    location=from_node,
                    loaded=loaded,
                    payload_tonnes=payload_tonnes,
                    resource_id=road_id,
                    queue_length=len(road.resource.queue),
                )
                request_time = float(self.env.now)
                with road.resource.request() as request:
                    yield request
                    wait = float(self.env.now) - request_time
                    road.queue_waits_min.append(wait)
                    self.log_event(
                        truck,
                        "road_enter",
                        from_node=from_node,
                        to_node=to_node,
                        location=from_node,
                        loaded=loaded,
                        payload_tonnes=payload_tonnes,
                        resource_id=road_id,
                        queue_length=len(road.resource.queue),
                    )
                    start = float(self.env.now)
                    road.add_busy_time(start, duration, self.shift_end_min)
                    self.add_truck_productive_time(truck, start, duration)
                    yield self.env.timeout(duration)
            else:
                self.log_event(
                    truck,
                    "road_enter",
                    from_node=from_node,
                    to_node=to_node,
                    location=from_node,
                    loaded=loaded,
                    payload_tonnes=payload_tonnes,
                    resource_id=edge_id,
                    queue_length=0,
                )
                start = float(self.env.now)
                self.add_truck_productive_time(truck, start, duration)
                yield self.env.timeout(duration)

    def load_truck(self, truck: TruckState, loader_id: str, loader_node: str) -> Any:
        loader_row = self.loader_by_node[loader_node]
        loader = self.loaders[loader_id]
        loader.queue_entries += 1
        self.log_event(
            truck,
            "loader_queue",
            location=loader_node,
            loaded=False,
            payload_tonnes=0.0,
            resource_id=loader_id,
            queue_length=len(loader.resource.queue),
        )
        request_time = float(self.env.now)
        with loader.resource.request() as request:
            yield request
            wait = float(self.env.now) - request_time
            loader.queue_waits_min.append(wait)
            self.log_event(
                truck,
                "loading_start",
                location=loader_node,
                loaded=False,
                payload_tonnes=0.0,
                resource_id=loader_id,
                queue_length=len(loader.resource.queue),
            )
            duration = self.sample_service_time(
                float(loader_row["mean_load_time_min"]),
                float(loader_row["sd_load_time_min"]),
            )
            start = float(self.env.now)
            loader.add_busy_time(start, duration, self.shift_end_min)
            self.add_truck_productive_time(truck, start, duration)
            yield self.env.timeout(duration)
            self.log_event(
                truck,
                "loading_end",
                location=loader_node,
                loaded=True,
                payload_tonnes=truck.payload_tonnes,
                resource_id=loader_id,
                queue_length=len(loader.resource.queue),
            )

    def dump_truck(self, truck: TruckState) -> Any:
        crusher = self.crusher
        crusher.queue_entries += 1
        self.log_event(
            truck,
            "crusher_queue",
            location=self.dump_node,
            loaded=True,
            payload_tonnes=truck.payload_tonnes,
            resource_id=crusher.resource_id,
            queue_length=len(crusher.resource.queue),
        )
        request_time = float(self.env.now)
        with crusher.resource.request() as request:
            yield request
            wait = float(self.env.now) - request_time
            crusher.queue_waits_min.append(wait)
            self.log_event(
                truck,
                "dumping_start",
                location=self.dump_node,
                loaded=True,
                payload_tonnes=truck.payload_tonnes,
                resource_id=crusher.resource_id,
                queue_length=len(crusher.resource.queue),
            )
            duration = self.sample_service_time(
                float(self.dump["mean_dump_time_min"]),
                float(self.dump["sd_dump_time_min"]),
            )
            start = float(self.env.now)
            crusher.add_busy_time(start, duration, self.shift_end_min)
            self.add_truck_productive_time(truck, start, duration)
            yield self.env.timeout(duration)
            if self.env.now <= self.shift_end_min:
                self.total_tonnes_delivered += truck.payload_tonnes
                self.completed_cycles += 1
                self.log_event(
                    truck,
                    "dumping_end",
                    location=self.dump_node,
                    loaded=False,
                    payload_tonnes=truck.payload_tonnes,
                    resource_id=crusher.resource_id,
                    queue_length=len(crusher.resource.queue),
                )

    def truck_process(self, truck: TruckState) -> Any:
        current_node = truck.start_node
        self.log_event(truck, "truck_dispatched", location=current_node)

        while self.env.now < self.shift_end_min:
            cycle_start = float(self.env.now)
            loader_id, loader_node = self.choose_loader(current_node, truck)
            self.log_event(
                truck,
                "dispatch_to_loader",
                from_node=current_node,
                to_node=loader_node,
                location=current_node,
                resource_id=loader_id,
            )

            empty_path = self.shortest_path(current_node, loader_node)
            yield from self.traverse_route(
                truck,
                empty_path,
                loaded=False,
                payload_tonnes=0.0,
            )
            current_node = loader_node
            if self.env.now >= self.shift_end_min:
                break

            yield from self.load_truck(truck, loader_id, loader_node)
            if self.env.now >= self.shift_end_min:
                break

            loaded_path = self.shortest_path(loader_node, self.dump_node)
            yield from self.traverse_route(
                truck,
                loaded_path,
                loaded=True,
                payload_tonnes=truck.payload_tonnes,
            )
            current_node = self.dump_node
            if self.env.now >= self.shift_end_min:
                break

            before_completed = self.completed_cycles
            yield from self.dump_truck(truck)
            if self.completed_cycles > before_completed:
                truck.cycle_times_min.append(float(self.env.now) - cycle_start)

    def run(self) -> dict[str, Any]:
        for truck in self.trucks:
            self.env.process(self.truck_process(truck))
        self.env.run(until=self.shift_end_min + 1e-9)

        loader_waits = [
            wait
            for loader in self.loaders.values()
            for wait in loader.queue_waits_min
        ]
        cycle_times = [cycle for truck in self.trucks for cycle in truck.cycle_times_min]
        productive_utils = [
            truck.productive_time_min / self.shift_end_min
            for truck in self.trucks
        ]
        road_waits = [
            wait
            for road in self.roads.values()
            for wait in road.queue_waits_min
        ]

        result: dict[str, Any] = {
            "scenario_id": self.scenario["scenario_id"],
            "replication": self.replication,
            "random_seed": self.random_seed,
            "truck_count": len(self.trucks),
            "total_tonnes_delivered": self.total_tonnes_delivered,
            "tonnes_per_hour": self.total_tonnes_delivered / (self.shift_end_min / 60.0),
            "average_truck_cycle_time_min": safe_mean(cycle_times),
            "average_truck_utilisation": safe_mean(productive_utils),
            "crusher_utilisation": self.crusher.utilisation(self.shift_end_min),
            "average_loader_queue_time_min": safe_mean(loader_waits),
            "average_crusher_queue_time_min": self.crusher.average_queue_wait(),
            "average_road_queue_time_min": safe_mean(road_waits),
            "completed_cycles": self.completed_cycles,
        }

        for loader_id, loader in sorted(self.loaders.items()):
            result[f"loader_utilisation_{loader_id}"] = loader.utilisation(self.shift_end_min)
            result[f"loader_queue_time_{loader_id}_min"] = loader.average_queue_wait()
        for road_id, road in sorted(self.roads.items()):
            safe_id = encode_resource_id(road_id)
            result[f"road_utilisation_{safe_id}"] = road.utilisation(self.shift_end_min)
            result[f"road_queue_time_{safe_id}_min"] = road.average_queue_wait()
        return result


def parse_bool(value: Any) -> bool:
    if isinstance(value, bool):
        return value
    if value is None or (isinstance(value, float) and math.isnan(value)):
        return False
    return str(value).strip().lower() in {"true", "1", "yes", "y"}


def physical_road_id(from_node: str, to_node: str) -> str:
    a, b = sorted([str(from_node), str(to_node)])
    return f"road:{a}-{b}"


def encode_resource_id(resource_id: str) -> str:
    return resource_id.replace("_", "__underscore__").replace(":", "__colon__").replace("-", "__dash__")


def decode_resource_id(encoded: str) -> str:
    return (
        encoded.replace("__dash__", "-")
        .replace("__colon__", ":")
        .replace("__underscore__", "_")
    )


def safe_mean(values: list[float]) -> float:
    if not values:
        return 0.0
    return float(np.mean(values))


def ci95(values: pd.Series) -> tuple[float, float, float]:
    clean = values.astype(float).dropna()
    mean = float(clean.mean()) if len(clean) else 0.0
    if len(clean) <= 1:
        return mean, mean, mean
    sem = float(clean.sem())
    half_width = float(stats.t.ppf(0.975, len(clean) - 1) * sem)
    return mean, mean - half_width, mean + half_width


def deep_merge(base: dict[str, Any], overlay: dict[str, Any]) -> dict[str, Any]:
    merged = copy.deepcopy(base)
    for key, value in overlay.items():
        if key == "inherits":
            continue
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = copy.deepcopy(value)
    return merged


def load_yaml(path: Path) -> dict[str, Any]:
    with path.open("r", encoding="utf-8") as handle:
        return yaml.safe_load(handle)


def load_scenario(scenarios_dir: Path, scenario_id: str) -> dict[str, Any]:
    raw = load_yaml(scenarios_dir / f"{scenario_id}.yaml")
    parent_id = raw.get("inherits")
    if parent_id:
        parent = load_scenario(scenarios_dir, parent_id)
        return deep_merge(parent, raw)
    return raw


def apply_overrides(data: dict[str, pd.DataFrame], scenario: dict[str, Any]) -> dict[str, pd.DataFrame]:
    copied = {name: df.copy(deep=True) for name, df in data.items()}

    for edge_id, overrides in scenario.get("edge_overrides", {}).items():
        apply_dataframe_overrides(copied["edges"], "edge_id", edge_id, overrides)
    for node_id, overrides in scenario.get("node_overrides", {}).items():
        apply_dataframe_overrides(copied["nodes"], "node_id", node_id, overrides)
    for dump_id, overrides in scenario.get("dump_point_overrides", {}).items():
        apply_dataframe_overrides(copied["dump_points"], "dump_id", dump_id, overrides)
    for loader_id, overrides in scenario.get("loader_overrides", {}).items():
        apply_dataframe_overrides(copied["loaders"], "loader_id", loader_id, overrides)
    for truck_id, overrides in scenario.get("truck_overrides", {}).items():
        apply_dataframe_overrides(copied["trucks"], "truck_id", truck_id, overrides)

    copied["edges"]["closed"] = copied["edges"]["closed"].map(parse_bool)
    return copied


def apply_dataframe_overrides(
    df: pd.DataFrame,
    id_column: str,
    record_id: str,
    overrides: dict[str, Any],
) -> None:
    mask = df[id_column] == record_id
    if not mask.any():
        raise ValueError(f"Cannot apply override for missing {id_column}={record_id}")
    for column, value in overrides.items():
        if column not in df.columns:
            raise ValueError(f"Cannot apply override to missing column {column}")
        df.loc[mask, column] = value


def load_input_data(data_dir: Path) -> dict[str, pd.DataFrame]:
    return {
        "nodes": pd.read_csv(data_dir / "nodes.csv"),
        "edges": pd.read_csv(data_dir / "edges.csv"),
        "trucks": pd.read_csv(data_dir / "trucks.csv"),
        "loaders": pd.read_csv(data_dir / "loaders.csv"),
        "dump_points": pd.read_csv(data_dir / "dump_points.csv"),
    }


def run_experiments(data_dir: Path, scenario_ids: list[str]) -> tuple[pd.DataFrame, pd.DataFrame, dict[str, Any]]:
    base_data = load_input_data(data_dir)
    scenarios_dir = data_dir / "scenarios"
    all_results: list[dict[str, Any]] = []
    all_events: list[dict[str, Any]] = []
    summary_scenarios: dict[str, Any] = {}

    for scenario_id in scenario_ids:
        scenario = load_scenario(scenarios_dir, scenario_id)
        replications = int(scenario["simulation"]["replications"])
        base_seed = int(scenario["simulation"]["base_random_seed"])
        scenario_results = []

        for replication in range(1, replications + 1):
            random_seed = base_seed + replication - 1
            scenario_data = apply_overrides(base_data, scenario)
            simulation = MineSimulation(
                scenario_data,
                scenario,
                replication,
                random_seed,
                all_events,
            )
            result = simulation.run()
            all_results.append(result)
            scenario_results.append(result)

        scenario_df = pd.DataFrame(scenario_results)
        summary_scenarios[scenario_id] = summarize_scenario(scenario, scenario_df)

    results_df = pd.DataFrame(all_results)
    events_df = pd.DataFrame(all_events)
    summary = {
        "benchmark_id": BENCHMARK_ID,
        "scenarios": summary_scenarios,
        "key_assumptions": [
            "Trucks are continuously dispatched while the 8-hour shift is active; throughput is counted only when dumping completes before the shift cutoff.",
            "Scenario fleet size selects the first available truck records from trucks.csv; truck availability is treated as deterministic inclusion when greater than zero.",
            "Roads with capacity below 999 are modelled as SimPy resources, paired by their two endpoint nodes so opposite directions share a physical constrained segment.",
            "Routing uses shortest expected travel time over open directed edges; stochastic travel time is applied after route selection.",
            "Loading and dumping times are positive truncated normal samples using the means and standard deviations in the input data.",
        ],
        "model_limitations": [
            "No truck breakdowns, refuelling, operator shift changes, blasting delays, or maintenance events are modelled.",
            "Dispatch uses current queue lengths and mean service times, not a global optimiser or look-ahead controller.",
            "Road resource occupancy is first-come, first-served and does not model passing bays, priority rules, or detailed traffic interactions.",
            "Crusher feed is represented only by dump service time and a single dumping resource; downstream plant constraints are outside the system boundary.",
        ],
        "additional_scenarios_proposed": [
            {
                "scenario_id": "loader_upgrade_south",
                "description": "Test whether a second or faster south-pit loader would unlock more value than adding trucks once the crusher and ramp are near saturation.",
            }
        ],
    }
    return results_df, events_df, summary


def summarize_scenario(scenario: dict[str, Any], scenario_df: pd.DataFrame) -> dict[str, Any]:
    total_mean, total_low, total_high = ci95(scenario_df["total_tonnes_delivered"])
    tph_mean, tph_low, tph_high = ci95(scenario_df["tonnes_per_hour"])

    loader_utilisation = {}
    for column in sorted(c for c in scenario_df.columns if c.startswith("loader_utilisation_")):
        loader_id = column.replace("loader_utilisation_", "")
        loader_utilisation[loader_id] = float(scenario_df[column].mean())

    bottlenecks = identify_bottlenecks(scenario_df)

    return {
        "replications": int(len(scenario_df)),
        "shift_length_hours": float(scenario["simulation"]["shift_length_hours"]),
        "truck_count": int(scenario["fleet"]["truck_count"]),
        "total_tonnes_mean": total_mean,
        "total_tonnes_ci95_low": total_low,
        "total_tonnes_ci95_high": total_high,
        "tonnes_per_hour_mean": tph_mean,
        "tonnes_per_hour_ci95_low": tph_low,
        "tonnes_per_hour_ci95_high": tph_high,
        "average_cycle_time_min": float(scenario_df["average_truck_cycle_time_min"].mean()),
        "truck_utilisation_mean": float(scenario_df["average_truck_utilisation"].mean()),
        "loader_utilisation": loader_utilisation,
        "crusher_utilisation": float(scenario_df["crusher_utilisation"].mean()),
        "average_loader_queue_time_min": float(scenario_df["average_loader_queue_time_min"].mean()),
        "average_crusher_queue_time_min": float(scenario_df["average_crusher_queue_time_min"].mean()),
        "top_bottlenecks": bottlenecks,
    }


def identify_bottlenecks(scenario_df: pd.DataFrame) -> list[dict[str, Any]]:
    candidates: list[dict[str, Any]] = [
        {
            "resource_id": "D_CRUSH",
            "resource_type": "crusher",
            "utilisation_mean": float(scenario_df["crusher_utilisation"].mean()),
            "average_queue_wait_min": float(scenario_df["average_crusher_queue_time_min"].mean()),
        }
    ]

    for column in sorted(c for c in scenario_df.columns if c.startswith("loader_utilisation_")):
        loader_id = column.replace("loader_utilisation_", "")
        queue_column = f"loader_queue_time_{loader_id}_min"
        candidates.append(
            {
                "resource_id": loader_id,
                "resource_type": "loader",
                "utilisation_mean": float(scenario_df[column].mean()),
                "average_queue_wait_min": float(scenario_df.get(queue_column, pd.Series([0.0])).mean()),
            }
        )

    for column in sorted(c for c in scenario_df.columns if c.startswith("road_utilisation_")):
        safe_id = column.replace("road_utilisation_", "")
        road_id = decode_resource_id(safe_id)
        queue_column = f"road_queue_time_{safe_id}_min"
        candidates.append(
            {
                "resource_id": road_id,
                "resource_type": "road",
                "utilisation_mean": float(scenario_df[column].mean()),
                "average_queue_wait_min": float(scenario_df.get(queue_column, pd.Series([0.0])).mean()),
            }
        )

    candidates.sort(
        key=lambda item: (
            item["utilisation_mean"],
            item["average_queue_wait_min"],
        ),
        reverse=True,
    )
    return candidates[:5]


def write_outputs(
    output_dir: Path,
    results_df: pd.DataFrame,
    events_df: pd.DataFrame,
    summary: dict[str, Any],
) -> None:
    output_dir.mkdir(parents=True, exist_ok=True)
    results_df.to_csv(output_dir / "results.csv", index=False)
    events_df.to_csv(output_dir / "event_log.csv", index=False, quoting=csv.QUOTE_MINIMAL)
    with (output_dir / "summary.json").open("w", encoding="utf-8") as handle:
        json.dump(summary, handle, indent=2)
        handle.write("\n")


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--data-dir", type=Path, default=Path("data"))
    parser.add_argument("--output-dir", type=Path, default=Path("."))
    parser.add_argument(
        "--scenarios",
        nargs="+",
        default=REQUIRED_SCENARIOS,
        help="Scenario ids to run, without .yaml extension.",
    )
    return parser.parse_args()


def main() -> None:
    args = parse_args()
    results_df, events_df, summary = run_experiments(args.data_dir, args.scenarios)
    write_outputs(args.output_dir, results_df, events_df, summary)

    print(f"Wrote {args.output_dir / 'results.csv'} ({len(results_df)} rows)")
    print(f"Wrote {args.output_dir / 'event_log.csv'} ({len(events_df)} rows)")
    print(f"Wrote {args.output_dir / 'summary.json'}")


if __name__ == "__main__":
    main()