src/experiment.py

← Back to submission · View raw on GitHub

"""Run multi-scenario, multi-replication experiments and produce outputs."""
from __future__ import annotations

import json
from pathlib import Path
from typing import Any, Dict, List, Optional

import numpy as np
import pandas as pd
from scipy import stats as scistats

from .scenario import load_scenario
from .simulation import load_input_dataframes, run_replication


# ---------- Statistics ---------------------------------------------------------


def ci95(values: List[float]) -> Dict[str, float]:
    """95% confidence interval for the mean using a t-distribution."""
    arr = np.asarray(values, dtype=float)
    n = arr.size
    if n == 0:
        return {"mean": 0.0, "ci95_low": 0.0, "ci95_high": 0.0, "n": 0}
    mean = float(arr.mean())
    if n == 1:
        return {"mean": mean, "ci95_low": mean, "ci95_high": mean, "n": 1}
    sem = float(arr.std(ddof=1) / np.sqrt(n))
    t_crit = float(scistats.t.ppf(0.975, df=n - 1))
    half = t_crit * sem
    return {"mean": mean, "ci95_low": mean - half, "ci95_high": mean + half, "n": n}


def aggregate_scenario(replication_results: List[Dict[str, Any]]) -> Dict[str, Any]:
    if not replication_results:
        return {}
    tonnes = [r["total_tonnes_delivered"] for r in replication_results]
    tph = [r["tonnes_per_hour"] for r in replication_results]
    cycle = [r["average_truck_cycle_time_min"] for r in replication_results]
    truck_util = [r["average_truck_utilisation"] for r in replication_results]
    crusher_util = [r["crusher_utilisation"] for r in replication_results]
    loader_q = [r["average_loader_queue_time_min"] for r in replication_results]
    crusher_q = [r["average_crusher_queue_time_min"] for r in replication_results]
    cycles_completed = [r["cycles_completed"] for r in replication_results]

    # Per-loader utilisation: average across reps.
    loader_ids = sorted({lid for r in replication_results for lid in r["loader_utilisation"].keys()})
    loader_util_mean: Dict[str, float] = {}
    for lid in loader_ids:
        loader_util_mean[lid] = float(np.mean([r["loader_utilisation"].get(lid, 0.0)
                                               for r in replication_results]))

    lane_ids = sorted({lid for r in replication_results for lid in r["lane_utilisation"].keys()})
    lane_util_mean: Dict[str, float] = {}
    lane_queue_mean: Dict[str, float] = {}
    for lid in lane_ids:
        lane_util_mean[lid] = float(np.mean([r["lane_utilisation"].get(lid, 0.0)
                                              for r in replication_results]))
        lane_queue_mean[lid] = float(np.mean([r["lane_queue_wait_min"].get(lid, 0.0)
                                               for r in replication_results]))

    tonnes_ci = ci95(tonnes)
    tph_ci = ci95(tph)
    cycle_ci = ci95(cycle)
    truck_util_ci = ci95(truck_util)
    crusher_util_ci = ci95(crusher_util)
    loader_q_ci = ci95(loader_q)
    crusher_q_ci = ci95(crusher_q)

    # Bottleneck heuristic: highest mean utilisation among (loaders, crusher, lanes).
    bottlenecks: List[Dict[str, Any]] = []
    for lid, u in loader_util_mean.items():
        bottlenecks.append({"resource": f"loader:{lid}", "type": "loader", "utilisation": u})
    bottlenecks.append({"resource": "crusher", "type": "crusher", "utilisation": float(crusher_util_ci["mean"])})
    for lid, u in lane_util_mean.items():
        bottlenecks.append({"resource": f"lane:{lid}", "type": "lane", "utilisation": u,
                            "queue_wait_mean_min": lane_queue_mean.get(lid, 0.0)})
    bottlenecks.sort(key=lambda b: b["utilisation"], reverse=True)
    top_bottlenecks = bottlenecks[:5]

    return {
        "scenario_id": replication_results[0]["scenario_id"],
        "replications": len(replication_results),
        "shift_length_hours": replication_results[0]["shift_minutes"] / 60.0,
        "truck_count": replication_results[0]["truck_count"],
        "total_tonnes_mean": tonnes_ci["mean"],
        "total_tonnes_ci95_low": tonnes_ci["ci95_low"],
        "total_tonnes_ci95_high": tonnes_ci["ci95_high"],
        "tonnes_per_hour_mean": tph_ci["mean"],
        "tonnes_per_hour_ci95_low": tph_ci["ci95_low"],
        "tonnes_per_hour_ci95_high": tph_ci["ci95_high"],
        "average_cycle_time_min": cycle_ci["mean"],
        "average_cycle_time_ci95_low": cycle_ci["ci95_low"],
        "average_cycle_time_ci95_high": cycle_ci["ci95_high"],
        "truck_utilisation_mean": truck_util_ci["mean"],
        "truck_utilisation_ci95_low": truck_util_ci["ci95_low"],
        "truck_utilisation_ci95_high": truck_util_ci["ci95_high"],
        "loader_utilisation": loader_util_mean,
        "crusher_utilisation": crusher_util_ci["mean"],
        "crusher_utilisation_ci95_low": crusher_util_ci["ci95_low"],
        "crusher_utilisation_ci95_high": crusher_util_ci["ci95_high"],
        "average_loader_queue_time_min": loader_q_ci["mean"],
        "average_loader_queue_time_ci95_low": loader_q_ci["ci95_low"],
        "average_loader_queue_time_ci95_high": loader_q_ci["ci95_high"],
        "average_crusher_queue_time_min": crusher_q_ci["mean"],
        "average_crusher_queue_time_ci95_low": crusher_q_ci["ci95_low"],
        "average_crusher_queue_time_ci95_high": crusher_q_ci["ci95_high"],
        "lane_utilisation": lane_util_mean,
        "lane_queue_wait_min": lane_queue_mean,
        "cycles_completed_mean": float(np.mean(cycles_completed)),
        "top_bottlenecks": top_bottlenecks,
    }


# ---------- Experiment runner --------------------------------------------------


def run_scenario(
    *,
    scenario_id: str,
    data_dir: Path,
    scenario_dir: Path,
    capture_event_log_reps: Optional[List[int]] = None,
) -> Dict[str, Any]:
    """Run all replications for a scenario.

    Returns the aggregated summary plus the list of replication-level results
    and a concatenated event log for the captured replications.
    """
    scenario = load_scenario(scenario_dir, scenario_id)
    nodes, edges, trucks, loaders, dump_points = load_input_dataframes(data_dir)

    sim_cfg = scenario.get("simulation", {})
    replications = int(sim_cfg.get("replications", 30))
    base_seed = int(sim_cfg.get("base_random_seed", 12345))

    if capture_event_log_reps is None:
        capture_event_log_reps = [0]

    rep_results: List[Dict[str, Any]] = []
    event_log_rows: List[Dict[str, Any]] = []

    for rep in range(replications):
        capture = rep in capture_event_log_reps
        result = run_replication(
            scenario=scenario,
            nodes=nodes, edges=edges,
            truck_records=trucks,
            loader_records=loaders,
            dump_records=dump_points,
            replication_index=rep,
            base_seed=base_seed,
            capture_event_log=capture,
        )
        if capture and result.get("event_log"):
            event_log_rows.extend(result["event_log"])
        # Drop event_log from the lightweight rep result for memory.
        result_lite = {k: v for k, v in result.items() if k != "event_log"}
        rep_results.append(result_lite)

    summary = aggregate_scenario(rep_results)
    return {
        "scenario_id": scenario_id,
        "summary": summary,
        "replications": rep_results,
        "event_log_rows": event_log_rows,
    }


def replications_to_dataframe(rep_results: List[Dict[str, Any]]) -> pd.DataFrame:
    """Flatten replication-level results into a wide DataFrame."""
    rows = []
    for r in rep_results:
        row = {
            "scenario_id": r["scenario_id"],
            "replication": r["replication"],
            "random_seed": r["random_seed"],
            "shift_minutes": r["shift_minutes"],
            "truck_count": r["truck_count"],
            "total_tonnes_delivered": r["total_tonnes_delivered"],
            "tonnes_per_hour": r["tonnes_per_hour"],
            "average_truck_cycle_time_min": r["average_truck_cycle_time_min"],
            "average_truck_utilisation": r["average_truck_utilisation"],
            "crusher_utilisation": r["crusher_utilisation"],
            "average_loader_queue_time_min": r["average_loader_queue_time_min"],
            "average_crusher_queue_time_min": r["average_crusher_queue_time_min"],
            "cycles_completed": r["cycles_completed"],
        }
        for lid, u in r["loader_utilisation"].items():
            row[f"loader_util_{lid}"] = u
        for lid, q in r["loader_queue_waits"].items():
            row[f"loader_queue_min_{lid}"] = q
        for lid, u in r["lane_utilisation"].items():
            row[f"lane_util_{lid}"] = u
        for lid, q in r["lane_queue_wait_min"].items():
            row[f"lane_queue_min_{lid}"] = q
        rows.append(row)
    return pd.DataFrame(rows)


def write_outputs(
    *,
    out_dir: Path,
    scenarios: List[Dict[str, Any]],
    benchmark_id: str = "001_synthetic_mine_throughput",
    key_assumptions: List[str] | None = None,
    model_limitations: List[str] | None = None,
    additional_scenarios_proposed: List[str] | None = None,
) -> None:
    out_dir.mkdir(parents=True, exist_ok=True)

    all_reps: List[Dict[str, Any]] = []
    all_events: List[Dict[str, Any]] = []
    summary_dict: Dict[str, Any] = {
        "benchmark_id": benchmark_id,
        "scenarios": {},
        "key_assumptions": list(key_assumptions or []),
        "model_limitations": list(model_limitations or []),
        "additional_scenarios_proposed": list(additional_scenarios_proposed or []),
    }
    for s in scenarios:
        summary_dict["scenarios"][s["scenario_id"]] = s["summary"]
        all_reps.extend(s["replications"])
        all_events.extend(s["event_log_rows"])

    df = replications_to_dataframe(all_reps)
    df.to_csv(out_dir / "results.csv", index=False)

    with (out_dir / "summary.json").open("w") as fh:
        json.dump(summary_dict, fh, indent=2, default=_json_default)

    # Event log
    event_columns = [
        "time_min", "replication", "scenario_id", "truck_id", "event_type",
        "from_node", "to_node", "location", "loaded", "payload_tonnes",
        "resource_id", "queue_length",
    ]
    if all_events:
        ev_df = pd.DataFrame(all_events)
        for c in event_columns:
            if c not in ev_df.columns:
                ev_df[c] = ""
        ev_df = ev_df[event_columns]
    else:
        ev_df = pd.DataFrame(columns=event_columns)
    ev_df.to_csv(out_dir / "event_log.csv", index=False)


def _json_default(obj):
    if isinstance(obj, (np.floating,)):
        return float(obj)
    if isinstance(obj, (np.integer,)):
        return int(obj)
    if isinstance(obj, (np.ndarray,)):
        return obj.tolist()
    raise TypeError(f"Not serialisable: {type(obj)}")