src/analysis.py

← Back to submission · View raw on GitHub

"""Analysis: confidence intervals, bottleneck identification, output writers."""

from __future__ import annotations

import csv
import json
import math
from pathlib import Path
from typing import Any

import numpy as np
import pandas as pd
from scipy import stats


# Columns we expose in results.csv (top-level, scalar). Per-resource details
# live in summary.json; trying to flatten them into CSV produces a pile of
# scenario-specific columns nobody can read.
RESULTS_CSV_COLUMNS = [
    "scenario_id",
    "replication",
    "random_seed",
    "truck_count",
    "shift_length_hours",
    "total_tonnes_delivered",
    "tonnes_per_hour",
    "average_truck_cycle_time_min",
    "average_truck_utilisation",
    "crusher_utilisation",
    "loader_utilisation_L_N",
    "loader_utilisation_L_S",
    "average_loader_queue_time_min",
    "average_crusher_queue_time_min",
    "dump_events",
]


def _ci95(values: np.ndarray) -> tuple[float, float]:
    """Two-sided 95% CI via Student's t-distribution."""
    arr = np.asarray(values, dtype=float)
    arr = arr[np.isfinite(arr)]
    if len(arr) < 2:
        m = float(arr.mean()) if len(arr) else float("nan")
        return m, m
    mean = float(arr.mean())
    sem = float(stats.sem(arr))
    h = float(stats.t.ppf(0.975, df=len(arr) - 1)) * sem
    return mean - h, mean + h


def flatten_loader_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Pull the dict in ``loader_utilisation`` into per-loader columns."""
    df = df.copy()
    if "loader_utilisation" not in df.columns:
        return df
    loaders = sorted({k for d in df["loader_utilisation"] for k in d.keys()})
    for lid in loaders:
        df[f"loader_utilisation_{lid}"] = df["loader_utilisation"].apply(
            lambda d: d.get(lid, float("nan"))
        )
    return df


def write_results_csv(results_df: pd.DataFrame, path: Path) -> None:
    """Write ``results.csv`` with one row per (scenario, replication)."""
    df = flatten_loader_columns(results_df)
    cols = [c for c in RESULTS_CSV_COLUMNS if c in df.columns]
    df[cols].to_csv(path, index=False)


def write_event_log_csv(events: list[dict[str, Any]], path: Path) -> None:
    """Write the concatenated event log."""
    if not events:
        path.write_text("", encoding="utf-8")
        return
    cols = [
        "time_min", "replication", "scenario_id", "truck_id", "event_type",
        "from_node", "to_node", "location", "loaded", "payload_tonnes",
        "resource_id", "queue_length",
    ]
    with path.open("w", newline="", encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=cols)
        w.writeheader()
        for e in events:
            row = {c: e.get(c) for c in cols}
            w.writerow(row)


def identify_bottlenecks(per_resource_stats_list: list[dict[str, Any]],
                         top_n: int = 3) -> list[dict[str, Any]]:
    """Identify the resources that, on average across replications, had the
    longest queueing waits.

    Returns a list of dicts ordered by descending mean queue wait, including
    loaders, edges, and the crusher.
    """
    accum: dict[tuple[str, str], list[float]] = {}

    for prs in per_resource_stats_list:
        for lid, s in prs.get("loaders", {}).items():
            accum.setdefault(("loader", lid), []).append(s["mean_queue_wait_min"])
        c = prs.get("crusher", {})
        if c:
            accum.setdefault(("crusher", "D_CRUSH"), []).append(c["mean_queue_wait_min"])
        for eid, s in prs.get("edges", {}).items():
            accum.setdefault(("edge", eid), []).append(s["mean_queue_wait_min"])

    rows = []
    for (kind, rid), vals in accum.items():
        arr = np.array(vals, dtype=float)
        rows.append({
            "resource_kind": kind,
            "resource_id": rid,
            "mean_queue_wait_min": float(arr.mean()),
            "max_queue_wait_min": float(arr.max()),
        })
    rows.sort(key=lambda r: r["mean_queue_wait_min"], reverse=True)
    return rows[:top_n]


def summarise_scenario(scenario_id: str, df: pd.DataFrame) -> dict[str, Any]:
    """Compute mean + 95% CI per metric for one scenario's replications."""
    df = df[df["scenario_id"] == scenario_id]
    n = len(df)
    if n == 0:
        return {}

    tonnes = df["total_tonnes_delivered"].to_numpy(dtype=float)
    tph = df["tonnes_per_hour"].to_numpy(dtype=float)

    tonnes_lo, tonnes_hi = _ci95(tonnes)
    tph_lo, tph_hi = _ci95(tph)

    # Loader utilisation: mean across reps, per loader.
    loader_keys = sorted({k for d in df["loader_utilisation"] for k in d.keys()})
    loader_util = {
        lid: float(np.mean([d.get(lid, float("nan")) for d in df["loader_utilisation"]]))
        for lid in loader_keys
    }

    bottlenecks = identify_bottlenecks(df["per_resource_queue_stats"].tolist())

    return {
        "replications": int(n),
        "shift_length_hours": float(df["shift_length_hours"].iloc[0]),
        "truck_count": int(df["truck_count"].iloc[0]),
        "total_tonnes_mean": float(tonnes.mean()),
        "total_tonnes_ci95_low": float(tonnes_lo),
        "total_tonnes_ci95_high": float(tonnes_hi),
        "total_tonnes_std": float(tonnes.std(ddof=1)) if n > 1 else 0.0,
        "tonnes_per_hour_mean": float(tph.mean()),
        "tonnes_per_hour_ci95_low": float(tph_lo),
        "tonnes_per_hour_ci95_high": float(tph_hi),
        "average_cycle_time_min": float(df["average_truck_cycle_time_min"].mean()),
        "truck_utilisation_mean": float(df["average_truck_utilisation"].mean()),
        "loader_utilisation": loader_util,
        "crusher_utilisation": float(df["crusher_utilisation"].mean()),
        "average_loader_queue_time_min": float(df["average_loader_queue_time_min"].mean()),
        "average_crusher_queue_time_min": float(df["average_crusher_queue_time_min"].mean()),
        "top_bottlenecks": bottlenecks,
    }


def build_summary(results_df: pd.DataFrame,
                  scenarios: list[str],
                  *,
                  benchmark_id: str,
                  key_assumptions: list[str],
                  model_limitations: list[str],
                  additional_scenarios_proposed: list[str] | None = None) -> dict[str, Any]:
    """Build the full ``summary.json`` payload."""
    return {
        "benchmark_id": benchmark_id,
        "scenarios": {
            sid: summarise_scenario(sid, results_df)
            for sid in scenarios
        },
        "key_assumptions": key_assumptions,
        "model_limitations": model_limitations,
        "additional_scenarios_proposed": additional_scenarios_proposed or [],
    }


def write_summary_json(summary: dict[str, Any], path: Path) -> None:
    path.write_text(json.dumps(summary, indent=2, default=_json_default), encoding="utf-8")


def _json_default(o: Any) -> Any:
    if isinstance(o, (np.floating, np.integer)):
        return o.item()
    if isinstance(o, np.ndarray):
        return o.tolist()
    if isinstance(o, float) and not math.isfinite(o):
        return None
    raise TypeError(f"Cannot serialise type {type(o)} ({o!r})")


def behavioural_self_check(summary: dict[str, Any]) -> list[dict[str, Any]]:
    """Reproduce the harness behavioural checks so we see them at run time."""
    sc = summary.get("scenarios", {})

    def m(sid: str) -> float | None:
        v = sc.get(sid, {}).get("total_tonnes_mean")
        return float(v) if v is not None else None

    checks: list[dict[str, Any]] = []
    pairs = [
        ("trucks_12_gt_trucks_4",
         lambda: m("trucks_12") is not None and m("trucks_4") is not None
                 and m("trucks_12") > m("trucks_4"),
         "Higher fleet should usually outperform lower fleet."),
        ("baseline_gt_trucks_4",
         lambda: m("baseline") is not None and m("trucks_4") is not None
                 and m("baseline") > m("trucks_4"),
         "Baseline should usually outperform 4-truck case."),
        ("ramp_upgrade_ge_baseline",
         lambda: m("ramp_upgrade") is not None and m("baseline") is not None
                 and m("ramp_upgrade") >= 0.95 * m("baseline"),
         "Ramp upgrade should usually improve or maintain throughput."),
        ("crusher_slowdown_lt_baseline",
         lambda: m("crusher_slowdown") is not None and m("baseline") is not None
                 and m("crusher_slowdown") < m("baseline"),
         "Slower crusher should usually reduce throughput."),
        ("ramp_closed_le_baseline",
         lambda: m("ramp_closed") is not None and m("baseline") is not None
                 and m("ramp_closed") <= 1.05 * m("baseline"),
         "Ramp closure should usually not improve throughput."),
    ]
    for name, predicate, desc in pairs:
        try:
            ok = bool(predicate())
        except Exception:
            ok = False
        checks.append({"name": name, "passed": ok, "description": desc})

    if all(m(s) is not None for s in ("trucks_4", "baseline", "trucks_12")):
        increase_4_to_8 = m("baseline") - m("trucks_4")
        increase_8_to_12 = m("trucks_12") - m("baseline")
        sat = increase_8_to_12 <= 1.25 * increase_4_to_8
    else:
        sat = False
    checks.append({
        "name": "truck_count_saturation_plausible",
        "passed": bool(sat),
        "description": "Throughput should show some saturation as trucks increase.",
    })
    return checks