run_experiment.py

← Back to submission · View raw on GitHub

"""Experiment runner for the synthetic mine throughput benchmark.

Runs every required scenario (plus one optional agent-proposed scenario) for
N replications of an 8-hour shift and writes the benchmark deliverables:

    results.csv      - one row per (scenario, replication)
    summary.json     - per-scenario means, 95% CIs and bottleneck ranking
    event_log.csv    - full event trace of replication 0 of each scenario
    run_metrics.json - wall-clock runtime / return code for this run

Usage
-----
    python run_experiment.py                 # all scenarios, 30 reps each
    python run_experiment.py --replications 5
    python run_experiment.py --scenarios baseline,ramp_closed

All output paths default to this script's directory, so the run is
self-contained and has no hard-coded absolute paths.
"""

from __future__ import annotations

import argparse
import csv
import json
import math
import sys
import time
from pathlib import Path
from typing import Any

import numpy as np
from scipy import stats

import mine_sim

HERE = Path(__file__).resolve().parent

# Required scenarios in reporting order, then the optional proposed scenario.
REQUIRED_SCENARIOS = [
    "baseline",
    "trucks_4",
    "trucks_12",
    "ramp_upgrade",
    "crusher_slowdown",
    "ramp_closed",
]
ADDITIONAL_SCENARIOS = ["crusher_debottleneck"]

# Curated road segments surfaced as explicit columns in results.csv. These are
# the capacity-1 segments of interest for bottleneck analysis.
CURATED_EDGES = ["E03_UP", "E05_TO_CRUSH", "E07_TO_LOAD_N", "E09_TO_LOAD_S"]

EVENT_COLUMNS = [
    "time_min", "replication", "scenario_id", "truck_id", "event_type",
    "from_node", "to_node", "location", "loaded", "payload_tonnes",
    "resource_id", "queue_length",
]

KEY_ASSUMPTIONS = [
    "Trucks start the shift at the parking area (PARK) empty and are dispatched "
    "to a loader; thereafter they cycle loader<->crusher and are not returned to "
    "PARK, so the narrow ramp E03 is only traversed during initial positioning.",
    "Ore is hauled only to the primary crusher (CRUSH); the waste dump and "
    "maintenance bay are outside the modelled ore-production boundary.",
    "Loading and dumping times are truncated-normal with the means/SDs in the "
    "data; per-edge travel time has multiplicative log-normal noise (mean 1, "
    "CV 0.10) on top of distance / (max_speed x truck speed factor).",
    "Capacity-1 road segments (ramp, crusher approach, pit access roads) are "
    "single-server SimPy resources; the two directions of each physical road "
    "are modelled as independent one-way resources, matching the data's "
    "separate-edge representation.",
    "All trucks and loaders have availability 1.0 in the data, so no breakdowns "
    "or refuelling are modelled.",
    "Dispatch is nearest-available-loader (travel + queue-aware wait), "
    "tie-broken by shortest expected cycle time, per the baseline config.",
    "warmup_minutes is 0 in every supplied scenario, so the full 8-hour shift "
    "(including the start-up transient) is measured, matching a whole-shift "
    "tonnage question.",
]

MODEL_LIMITATIONS = [
    "Given the supplied topology the narrow ramp is NOT on the loaded-haul "
    "cycle, so ramp upgrade/closure affect throughput only via start-of-shift "
    "positioning; the recurring bottleneck is the crusher complex.",
    "Bidirectional single-lane roads are modelled as two independent one-way "
    "capacity-1 resources, which understates head-on contention (acceptable "
    "here because flows are directional per leg).",
    "No grade/rimpull engine: speed limits come straight from the edge data via "
    "a single loaded speed factor (0.85), not from a physics-based haul model.",
    "Loaders and the crusher are simple single servers with no spotting time, "
    "shift breaks, blast windows, weather or operator variability.",
    "Truck utilisation counts only productive time (travel + load + dump); time "
    "queued at a resource is treated as non-productive idle.",
]


# --------------------------------------------------------------------------- #
# Statistics helpers
# --------------------------------------------------------------------------- #
def mean(values: list[float]) -> float:
    vals = [v for v in values if v is not None and not _isnan(v)]
    return float(np.mean(vals)) if vals else float("nan")


def ci95(values: list[float]) -> tuple[float, float]:
    """Two-sided 95% confidence interval for the mean (Student-t)."""
    vals = [v for v in values if v is not None and not _isnan(v)]
    n = len(vals)
    m = float(np.mean(vals)) if vals else float("nan")
    if n < 2:
        return m, m
    sd = float(np.std(vals, ddof=1))
    if sd == 0:
        return m, m
    half = float(stats.t.ppf(0.975, n - 1)) * sd / math.sqrt(n)
    return m - half, m + half


def _isnan(value: float) -> bool:
    return isinstance(value, float) and math.isnan(value)


def r(value: float, ndigits: int = 3) -> float | None:
    if value is None or _isnan(value):
        return None
    return round(float(value), ndigits)


# --------------------------------------------------------------------------- #
# Scenario discovery
# --------------------------------------------------------------------------- #
def resolve_scenario_path(name: str, data_dir: Path, additional_dir: Path) -> Path:
    candidates = [data_dir / "scenarios" / f"{name}.yaml", additional_dir / f"{name}.yaml"]
    for path in candidates:
        if path.exists():
            return path
    raise FileNotFoundError(f"Scenario '{name}' not found in {candidates}")


# --------------------------------------------------------------------------- #
# Aggregation
# --------------------------------------------------------------------------- #
def flatten_result_row(m: dict[str, Any]) -> dict[str, Any]:
    """Select the scalar columns written to results.csv for one replication."""
    row = {
        "scenario_id": m["scenario_id"],
        "replication": m["replication"],
        "random_seed": m["random_seed"],
        "total_tonnes_delivered": r(m["total_tonnes_delivered"], 1),
        "tonnes_per_hour": r(m["tonnes_per_hour"], 2),
        "n_dumps": m["n_dumps"],
        "average_truck_cycle_time_min": r(m["average_truck_cycle_time_min"], 2),
        "average_truck_utilisation": r(m["average_truck_utilisation"], 4),
        "crusher_utilisation": r(m["crusher_utilisation"], 4),
        "average_loader_queue_time_min": r(m["average_loader_queue_time_min"], 3),
        "average_crusher_queue_time_min": r(m["average_crusher_queue_time_min"], 3),
    }
    for lid in sorted(m["loader_utilisation"]):
        row[f"utilisation_{lid}"] = r(m["loader_utilisation"][lid], 4)
        row[f"queue_time_min_{lid}"] = r(m["loader_queue_time"][lid], 3)
    for eid in CURATED_EDGES:
        row[f"utilisation_{eid}"] = r(m["edge_utilisation"][eid], 4) if eid in m["edge_utilisation"] else ""
        row[f"queue_time_min_{eid}"] = r(m["edge_queue_time"][eid], 3) if eid in m["edge_queue_time"] else ""
    return row


def summarise_scenario(scenario_id: str, description: str, shift_hours: float,
                       reps: list[dict[str, Any]]) -> dict[str, Any]:
    tonnes = [m["total_tonnes_delivered"] for m in reps]
    tph = [m["tonnes_per_hour"] for m in reps]
    t_low, t_high = ci95(tonnes)
    h_low, h_high = ci95(tph)

    loader_ids = sorted(reps[0]["loader_utilisation"])
    loader_util = {lid: r(mean([m["loader_utilisation"][lid] for m in reps]), 4) for lid in loader_ids}

    # Mean utilisation / queue per resource across reps, for bottleneck ranking.
    resource_ids = list(reps[0]["resource_utilisation"])
    bottlenecks = []
    for rid in resource_ids:
        util = mean([m["resource_utilisation"][rid] for m in reps])
        qt = mean([m["resource_queue_time"][rid] for m in reps])
        bottlenecks.append({
            "resource_id": rid,
            "kind": reps[0]["resource_kind"][rid],
            "mean_utilisation": r(util, 4),
            "mean_queue_time_min": r(qt, 3),
        })
    bottlenecks.sort(key=lambda b: (b["mean_utilisation"] is None, -(b["mean_utilisation"] or 0)))

    return {
        "description": description,
        "replications": len(reps),
        "shift_length_hours": shift_hours,
        "total_tonnes_mean": r(mean(tonnes), 1),
        "total_tonnes_ci95_low": r(t_low, 1),
        "total_tonnes_ci95_high": r(t_high, 1),
        "tonnes_per_hour_mean": r(mean(tph), 2),
        "tonnes_per_hour_ci95_low": r(h_low, 2),
        "tonnes_per_hour_ci95_high": r(h_high, 2),
        "average_cycle_time_min": r(mean([m["average_truck_cycle_time_min"] for m in reps]), 2),
        "truck_utilisation_mean": r(mean([m["average_truck_utilisation"] for m in reps]), 4),
        "loader_utilisation": loader_util,
        "crusher_utilisation": r(mean([m["crusher_utilisation"] for m in reps]), 4),
        "average_loader_queue_time_min": r(mean([m["average_loader_queue_time_min"] for m in reps]), 3),
        "average_crusher_queue_time_min": r(mean([m["average_crusher_queue_time_min"] for m in reps]), 3),
        "mean_dumps": r(mean([float(m["n_dumps"]) for m in reps]), 1),
        "top_bottlenecks": bottlenecks[:5],
    }


# --------------------------------------------------------------------------- #
# Main
# --------------------------------------------------------------------------- #
def main(argv: list[str] | None = None) -> int:
    parser = argparse.ArgumentParser(description=__doc__,
                                      formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument("--data-dir", type=Path, default=HERE / "data")
    parser.add_argument("--additional-dir", type=Path, default=HERE / "additional_scenarios")
    parser.add_argument("--out-dir", type=Path, default=HERE)
    parser.add_argument("--replications", type=int, default=None,
                        help="Override the replication count from the scenario config.")
    parser.add_argument("--scenarios", type=str, default=None,
                        help="Comma-separated subset of scenarios to run.")
    parser.add_argument("--log-replication", type=int, default=0,
                        help="Replication index logged in full to event_log.csv.")
    parser.add_argument("--no-metrics", action="store_true",
                        help="Do not write run_metrics.json.")
    parser.add_argument("--quiet", action="store_true")
    args = parser.parse_args(argv)

    if args.scenarios:
        scenario_names = [s.strip() for s in args.scenarios.split(",") if s.strip()]
    else:
        scenario_names = REQUIRED_SCENARIOS + [
            s for s in ADDITIONAL_SCENARIOS
            if (args.additional_dir / f"{s}.yaml").exists()
        ]

    world = mine_sim.load_world(args.data_dir)
    started = time.perf_counter()

    results_rows: list[dict[str, Any]] = []
    event_rows: list[dict[str, Any]] = []
    summary_scenarios: dict[str, Any] = {}

    for name in scenario_names:
        path = resolve_scenario_path(name, args.data_dir, args.additional_dir)
        config = mine_sim.build_scenario_config(path, base_dir=args.data_dir / "scenarios")
        reps_to_run = args.replications or config.replications

        rep_metrics: list[dict[str, Any]] = []
        for rep in range(reps_to_run):
            metrics = mine_sim.run_replication(
                config, world.nodes, world.edges, world.loaders, world.dumps,
                world.trucks, replication=rep, log_events=(rep == args.log_replication),
            )
            if "events" in metrics:
                event_rows.extend(metrics.pop("events"))
            results_rows.append(flatten_result_row(metrics))
            rep_metrics.append(metrics)

        summary_scenarios[name] = summarise_scenario(
            name, config.description, config.shift_length_hours, rep_metrics)
        if not args.quiet:
            s = summary_scenarios[name]
            print(f"{name:<22} tonnes={s['total_tonnes_mean']:>8} "
                  f"[{s['total_tonnes_ci95_low']}, {s['total_tonnes_ci95_high']}]  "
                  f"t/h={s['tonnes_per_hour_mean']:>7}  "
                  f"crusher_util={s['crusher_utilisation']}  "
                  f"top={s['top_bottlenecks'][0]['resource_id']}")

    elapsed = time.perf_counter() - started

    # ---- write results.csv ----
    args.out_dir.mkdir(parents=True, exist_ok=True)
    result_columns = list(results_rows[0].keys())
    with (args.out_dir / "results.csv").open("w", newline="", encoding="utf-8") as fh:
        writer = csv.DictWriter(fh, fieldnames=result_columns)
        writer.writeheader()
        writer.writerows(results_rows)

    # ---- write event_log.csv ----
    with (args.out_dir / "event_log.csv").open("w", newline="", encoding="utf-8") as fh:
        writer = csv.DictWriter(fh, fieldnames=EVENT_COLUMNS)
        writer.writeheader()
        writer.writerows(event_rows)

    # ---- write summary.json ----
    summary = build_summary(summary_scenarios)
    (args.out_dir / "summary.json").write_text(json.dumps(summary, indent=2), encoding="utf-8")

    # ---- write run_metrics.json ----
    if not args.no_metrics:
        command = "python run_experiment.py" + (
            f" --replications {args.replications}" if args.replications else "")
        metrics_doc = {
            "command": command,
            "runtime_seconds": round(elapsed, 2),
            "return_code": 0,
            "timed_out": False,
            "scenarios_run": scenario_names,
            "replications_per_scenario": args.replications or "per-scenario config (30)",
            "python_version": sys.version.split()[0],
            "note": "Produced by run_experiment.py itself (self-timed, in-folder).",
        }
        (args.out_dir / "run_metrics.json").write_text(json.dumps(metrics_doc, indent=2), encoding="utf-8")

    if not args.quiet:
        print(f"\nWrote results.csv ({len(results_rows)} rows), "
              f"event_log.csv ({len(event_rows)} rows), summary.json, run_metrics.json")
        print(f"Total runtime: {elapsed:.1f}s")
    return 0


def build_summary(summary_scenarios: dict[str, Any]) -> dict[str, Any]:
    proposed = []
    if "crusher_debottleneck" in summary_scenarios and "baseline" in summary_scenarios:
        base = summary_scenarios["baseline"]["total_tonnes_mean"]
        deb = summary_scenarios["crusher_debottleneck"]["total_tonnes_mean"]
        uplift = round(100.0 * (deb - base) / base, 1) if base else None
        proposed.append({
            "scenario_id": "crusher_debottleneck",
            "rationale": "Add a second crusher dump bay and second approach lane "
                         "(D_CRUSH and E05 capacity 2) to test how much the crusher "
                         "complex caps throughput.",
            "total_tonnes_mean": deb,
            "uplift_vs_baseline_pct": uplift,
        })
    return {
        "benchmark_id": "001_synthetic_mine_throughput",
        "scenarios": summary_scenarios,
        "key_assumptions": KEY_ASSUMPTIONS,
        "model_limitations": MODEL_LIMITATIONS,
        "additional_scenarios_proposed": proposed,
    }


if __name__ == "__main__":
    raise SystemExit(main())