src/run_experiments.py

← Back to submission · View raw on GitHub

"""Replication runner that drives all required scenarios and writes outputs.

Reads scenario YAML files, applies them to the topology, runs N replications per
scenario, aggregates per-replication and per-scenario metrics, and writes:

- ``results.csv``    one row per (scenario, replication)
- ``summary.json``   aggregated per-scenario summary statistics
- ``event_log.csv``  trace of state transitions (capped to first M reps per scenario)
"""
from __future__ import annotations

import json
import time
from pathlib import Path
from typing import Any

import pandas as pd

from .metrics import aggregate_replication, aggregate_scenario
from .scenario import list_scenarios, load_scenario
from .simulation import run_simulation
from .topology import apply_scenario, load_topology


_DEFAULT_SCENARIOS_ORDER = [
    "baseline",
    "trucks_4",
    "trucks_12",
    "ramp_upgrade",
    "crusher_slowdown",
    "ramp_closed",
]

_KEY_ASSUMPTIONS = [
    "Loader and crusher service times are truncated normal (mean and SD from input data).",
    "Travel time per edge has multiplicative log-symmetric noise with CV=0.10.",
    "Routing uses shortest-time Dijkstra over open edges; edge weights = distance / (max_speed * speed_factor) with empty=1.0 and loaded=0.85.",
    "Dispatch policy is nearest_available_loader with tie-break by lowest expected cycle time including queue wait estimate.",
    "Capacity-constrained edges (capacity<999) are modelled as SimPy Resources; per-direction counters.",
    "All trucks start at PARK at t=0; the runner honours simulation.warmup_minutes (0 in shipped scenarios) by excluding pre-warmup events from utilisation, queue-time means, tonnes counting, and the tonnes/h denominator.",
    "A cycle is counted toward delivered tonnes if warmup_minutes <= dump_start < shift_end (480 min); in-flight dumps complete during a 60-minute grace window so trucks already at the crusher record their delivery.",
    "Per-replication seed is base_random_seed + replication_index for reproducibility.",
    "Top bottlenecks are ranked by (mean utilisation desc, mean queue time desc) so high-utilisation steady-state constraints rank above pure startup transients.",
]

_MODEL_LIMITATIONS = [
    "No equipment breakdowns, maintenance interruptions, or operator unavailability.",
    "No shift changes, refuelling, or operator breaks during the 8-hour window.",
    "Loaded and empty travel are modelled as separate directional resources; opposing traffic on a physically single-lane segment does not interact via meet-and-pass logic.",
    "Bypass route capacity (E15-E17) is treated as unconstrained; in reality bypasses may have width or speed limits not captured here.",
    "Ore is delivered in discrete 100-tonne payloads; partial deliveries are not modelled.",
    "Truck acceleration/deceleration profiles and switchback effects are not represented; speed factor is constant per edge.",
    "Loader and crusher service times use a simple truncated normal; bimodal or heavy-tailed effects (e.g. blocked chutes) are not captured.",
    "Warmup support is implemented; current scenarios use warmup=0 so startup effects (e.g. all trucks dispatching from PARK at t=0 and converging on the ramp) remain visible. Use --warmup-minutes on the CLI for ad-hoc analysis under non-default warmup.",
]


def run_all(
    data_dir: Path | str,
    output_dir: Path | str,
    scenarios: list[str] | None = None,
    replications_override: int | None = None,
    event_log_max_reps_per_scenario: int = 5,
    warmup_minutes_override: float | None = None,
    verbose: bool = True,
) -> dict[str, Any]:
    """Run all requested scenarios and write results.csv, summary.json, event_log.csv.

    ``warmup_minutes_override`` (CLI knob) replaces the per-scenario
    ``simulation.warmup_minutes`` value when not None. Use ``0`` to disable
    warmup explicitly; leave ``None`` to honour the YAML config (currently 0
    in every shipped scenario).
    """
    data_dir = Path(data_dir)
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    topology = load_topology(data_dir)
    scenarios_dir = data_dir / "scenarios"

    if scenarios is None or (len(scenarios) == 1 and scenarios[0] == "all"):
        # Default to canonical order then append any extras the user dropped in.
        available = list_scenarios(scenarios_dir)
        ordered = [s for s in _DEFAULT_SCENARIOS_ORDER if s in available]
        extras = [s for s in available if s not in _DEFAULT_SCENARIOS_ORDER]
        scenario_ids = ordered + extras
    else:
        scenario_ids = list(scenarios)

    all_rep_rows: list[dict[str, Any]] = []
    summary: dict[str, Any] = {
        "benchmark_id": "001_synthetic_mine_throughput",
        "scenarios": {},
        "key_assumptions": list(_KEY_ASSUMPTIONS),
        "model_limitations": list(_MODEL_LIMITATIONS),
        "additional_scenarios_proposed": [],
    }
    event_log_rows: list[dict[str, Any]] = []

    overall_start = time.perf_counter()

    for scenario_id in scenario_ids:
        scenario = load_scenario(scenarios_dir, scenario_id)
        if warmup_minutes_override is not None:
            scenario = _with_warmup_override(scenario, float(warmup_minutes_override))
        scenario_topology = apply_scenario(topology, scenario)

        sim_cfg = scenario.get("simulation", {})
        shift_hours = float(sim_cfg.get("shift_length_hours", 8))
        warmup_min_used = float(sim_cfg.get("warmup_minutes", 0) or 0)
        replications = int(replications_override or sim_cfg.get("replications", 30))
        base_seed = int(sim_cfg.get("base_random_seed", 12345))

        rep_metrics_list: list[dict[str, Any]] = []

        scen_start = time.perf_counter()
        for rep in range(replications):
            seed = base_seed + rep
            sim_result = run_simulation(
                scenario_topology, scenario, replication=rep, seed=seed
            )
            rep_metrics = aggregate_replication(sim_result)
            rep_metrics["shift_length_hours"] = shift_hours
            rep_metrics_list.append(rep_metrics)

            row = _flatten_results_row(rep_metrics)
            all_rep_rows.append(row)

            if rep < event_log_max_reps_per_scenario:
                event_log_rows.extend(sim_result.events)

            if verbose:
                print(
                    f"  [{scenario_id}] rep {rep+1}/{replications}  "
                    f"tonnes={rep_metrics['total_tonnes_delivered']:.0f}  "
                    f"t/h={rep_metrics['tonnes_per_hour']:.1f}",
                    flush=True,
                )

        scen_elapsed = time.perf_counter() - scen_start

        scenario_summary = aggregate_scenario(rep_metrics_list)
        scenario_summary["shift_length_hours"] = shift_hours
        scenario_summary["warmup_minutes"] = warmup_min_used
        scenario_summary["wallclock_seconds"] = round(scen_elapsed, 2)
        summary["scenarios"][scenario_id] = scenario_summary

        if verbose:
            print(
                f"[{scenario_id}] {replications} reps in {scen_elapsed:.1f}s  "
                f"t/h mean={scenario_summary['tonnes_per_hour_mean']:.1f} "
                f"95% CI=[{scenario_summary['tonnes_per_hour_ci95_low']:.1f}, "
                f"{scenario_summary['tonnes_per_hour_ci95_high']:.1f}]",
                flush=True,
            )

    summary["wallclock_seconds_total"] = round(time.perf_counter() - overall_start, 2)

    # Write outputs
    results_df = pd.DataFrame(all_rep_rows)
    results_df = _order_results_columns(results_df)
    # Edges/loaders that are absent in some scenarios (e.g. closed or unconstrained
    # in that scenario) appear as NaN; fill with 0 for legibility while keeping
    # the union of columns across scenarios.
    fill_cols = [c for c in results_df.columns if c.startswith(("edge_", "loader_"))]
    results_df[fill_cols] = results_df[fill_cols].fillna(0.0)
    results_path = output_dir / "results.csv"
    results_df.to_csv(results_path, index=False)

    summary_path = output_dir / "summary.json"
    with summary_path.open("w", encoding="utf-8") as f:
        json.dump(summary, f, indent=2, default=_json_default)

    event_log_df = pd.DataFrame(event_log_rows)
    event_log_path = output_dir / "event_log.csv"
    if not event_log_df.empty:
        event_log_df = _order_event_log_columns(event_log_df)
    event_log_df.to_csv(event_log_path, index=False)

    if verbose:
        print(f"\nWrote {results_path}")
        print(f"Wrote {summary_path}")
        print(f"Wrote {event_log_path} ({len(event_log_df)} rows)")

    return summary


def _with_warmup_override(scenario: dict[str, Any], warmup_minutes: float) -> dict[str, Any]:
    """Return a scenario copy with simulation.warmup_minutes overridden (immutable)."""
    new_scenario = dict(scenario)
    new_sim = dict(scenario.get("simulation", {}))
    new_sim["warmup_minutes"] = float(warmup_minutes)
    new_scenario["simulation"] = new_sim
    return new_scenario


def _flatten_results_row(rep_metrics: dict[str, Any]) -> dict[str, Any]:
    """Flatten dict-valued metrics into named columns for the results CSV."""
    row: dict[str, Any] = {
        "scenario_id": rep_metrics["scenario_id"],
        "replication": rep_metrics["replication"],
        "random_seed": rep_metrics["random_seed"],
        "total_tonnes_delivered": rep_metrics["total_tonnes_delivered"],
        "tonnes_per_hour": rep_metrics["tonnes_per_hour"],
        "average_truck_cycle_time_min": rep_metrics["average_truck_cycle_time_min"],
        "average_truck_utilisation": rep_metrics["average_truck_utilisation"],
        "crusher_utilisation": rep_metrics["crusher_utilisation"],
        "average_loader_queue_time_min": rep_metrics["average_loader_queue_time_min"],
        "average_crusher_queue_time_min": rep_metrics["average_crusher_queue_time_min"],
        "completed_cycles": rep_metrics.get("completed_cycles", 0),
    }
    for lid, util in rep_metrics.get("loader_utilisation", {}).items():
        row[f"loader_{lid}_utilisation"] = util
    for lid, qt in rep_metrics.get("loader_queue_time_min_by_loader", {}).items():
        row[f"loader_{lid}_queue_time_min"] = qt
    for eid, qt in rep_metrics.get("edge_queue_time_min_by_edge", {}).items():
        row[f"edge_{eid}_queue_time_min"] = qt
    for eid, util in rep_metrics.get("edge_utilisation_by_edge", {}).items():
        row[f"edge_{eid}_utilisation"] = util
    return row


_PRIMARY_RESULT_COLUMNS = [
    "scenario_id",
    "replication",
    "random_seed",
    "total_tonnes_delivered",
    "tonnes_per_hour",
    "average_truck_cycle_time_min",
    "average_truck_utilisation",
    "crusher_utilisation",
    "average_loader_queue_time_min",
    "average_crusher_queue_time_min",
    "completed_cycles",
]


def _order_results_columns(df: pd.DataFrame) -> pd.DataFrame:
    cols = [c for c in _PRIMARY_RESULT_COLUMNS if c in df.columns]
    other = sorted(c for c in df.columns if c not in cols)
    return df[cols + other]


_EVENT_LOG_COLUMNS = [
    "time_min",
    "replication",
    "scenario_id",
    "truck_id",
    "event_type",
    "from_node",
    "to_node",
    "location",
    "loaded",
    "payload_tonnes",
    "resource_id",
    "queue_length",
]


def _order_event_log_columns(df: pd.DataFrame) -> pd.DataFrame:
    cols = [c for c in _EVENT_LOG_COLUMNS if c in df.columns]
    other = [c for c in df.columns if c not in cols]
    return df[cols + other]


def _json_default(obj: Any) -> Any:
    """Make numpy types JSON-serialisable."""
    try:
        import numpy as np

        if isinstance(obj, (np.integer,)):
            return int(obj)
        if isinstance(obj, (np.floating,)):
            return float(obj)
        if isinstance(obj, (np.ndarray,)):
            return obj.tolist()
    except Exception:
        pass
    if hasattr(obj, "item"):
        try:
            return obj.item()
        except Exception:
            pass
    return str(obj)