"""Aggregate per-replication and per-scenario metrics from SimResult objects."""
from __future__ import annotations
from statistics import mean
from typing import Any
import numpy as np
from scipy import stats
from .simulation import SimResult
def _safe_mean(values: list[float]) -> float:
return float(mean(values)) if values else 0.0
def _ci95(values: list[float]) -> tuple[float, float]:
"""Return 95% confidence interval (lo, hi) for a list of replication values."""
n = len(values)
if n == 0:
return (0.0, 0.0)
if n == 1:
v = float(values[0])
return (v, v)
arr = np.asarray(values, dtype=float)
m = float(np.mean(arr))
sem = float(stats.sem(arr))
if sem == 0.0:
return (m, m)
lo, hi = stats.t.interval(0.95, df=n - 1, loc=m, scale=sem)
return (float(lo), float(hi))
def _waits_after(
waits: list[tuple[float, float]], threshold: float
) -> list[float]:
"""Return only the wait durations whose request_time >= ``threshold``."""
return [w for (req_t, w) in waits if req_t >= threshold]
def aggregate_replication(sim_result: SimResult) -> dict[str, Any]:
"""Compute per-replication KPIs from a SimResult.
Strict shift + warmup counting:
- Only cycles with ``warmup_minutes <= dump_start < shift_minutes`` contribute
to delivered tonnes (a cycle started in the warmup window is excluded).
- ``tonnes_per_hour`` denominator is the post-warmup window
``(shift_minutes - warmup_minutes) / 60``.
- Loader/crusher/edge queue means use only requests issued at or after the
warmup boundary.
- Utilisation accumulators are already gated to the post-warmup window
inside the simulation's _ResourceTracker.
"""
shift_min = sim_result.shift_minutes
warmup_min = sim_result.warmup_minutes
measurement_min = max(0.0, shift_min - warmup_min)
counted_cycles = [
c
for c in sim_result.cycles
if warmup_min <= c["dump_start"] < shift_min
]
total_tonnes = sum(c["payload_tonnes"] for c in counted_cycles)
tonnes_per_hour = (
total_tonnes / (measurement_min / 60.0) if measurement_min > 0 else 0.0
)
cycle_times = [c["cycle_time_min"] for c in counted_cycles]
avg_cycle_time = _safe_mean(cycle_times)
# Truck utilisation: per-truck busy ratio across the post-warmup window.
truck_utilisations: list[float] = []
if sim_result.truck_busy_min and measurement_min > 0:
for _tid, busy in sim_result.truck_busy_min.items():
ratio = busy / measurement_min
truck_utilisations.append(min(1.0, max(0.0, ratio)))
avg_truck_util = _safe_mean(truck_utilisations)
# Crusher utilisation.
crusher_util = (
sim_result.crusher_busy_min / measurement_min if measurement_min > 0 else 0.0
)
# Loader utilisations (per-loader).
loader_utils: dict[str, float] = {}
for lid, busy in sim_result.loader_busy_min.items():
loader_utils[lid] = busy / measurement_min if measurement_min > 0 else 0.0
# Loader queue waits, post-warmup only.
all_loader_waits: list[float] = []
loader_wait_means: dict[str, float] = {}
for lid, waits in sim_result.loader_queue_waits.items():
post_waits = _waits_after(waits, warmup_min)
all_loader_waits.extend(post_waits)
loader_wait_means[lid] = _safe_mean(post_waits)
avg_loader_queue_wait = _safe_mean(all_loader_waits)
avg_crusher_queue_wait = _safe_mean(
_waits_after(sim_result.crusher_queue_waits, warmup_min)
)
# Constrained edge queue waits (per edge), post-warmup only.
edge_wait_means: dict[str, float] = {}
for eid, waits in sim_result.edge_queue_waits.items():
edge_wait_means[eid] = _safe_mean(_waits_after(waits, warmup_min))
return {
"scenario_id": sim_result.scenario_id,
"replication": sim_result.replication,
"random_seed": sim_result.seed,
"warmup_minutes": float(warmup_min),
"measurement_minutes": float(measurement_min),
"total_tonnes_delivered": float(total_tonnes),
"tonnes_per_hour": float(tonnes_per_hour),
"average_truck_cycle_time_min": float(avg_cycle_time),
"average_truck_utilisation": float(avg_truck_util),
"crusher_utilisation": float(crusher_util),
"average_loader_queue_time_min": float(avg_loader_queue_wait),
"average_crusher_queue_time_min": float(avg_crusher_queue_wait),
"loader_utilisation": loader_utils,
"loader_queue_time_min_by_loader": loader_wait_means,
"edge_queue_time_min_by_edge": edge_wait_means,
"edge_utilisation_by_edge": {
eid: (busy / measurement_min if measurement_min > 0 else 0.0)
for eid, busy in sim_result.edge_busy_min.items()
},
"completed_cycles": int(len(counted_cycles)),
}
def aggregate_scenario(rep_results: list[dict[str, Any]]) -> dict[str, Any]:
"""Aggregate per-replication metrics into per-scenario summary statistics."""
if not rep_results:
return {}
scenario_id = rep_results[0]["scenario_id"]
replications = len(rep_results)
def col(name: str) -> list[float]:
return [float(r[name]) for r in rep_results]
tonnes = col("total_tonnes_delivered")
tph = col("tonnes_per_hour")
cycle = col("average_truck_cycle_time_min")
truck_u = col("average_truck_utilisation")
crusher_u = col("crusher_utilisation")
loader_q = col("average_loader_queue_time_min")
crusher_q = col("average_crusher_queue_time_min")
tonnes_lo, tonnes_hi = _ci95(tonnes)
tph_lo, tph_hi = _ci95(tph)
# Per-loader utilisation means
loader_ids: set[str] = set()
for r in rep_results:
loader_ids.update(r.get("loader_utilisation", {}).keys())
loader_util_mean = {
lid: _safe_mean(
[float(r.get("loader_utilisation", {}).get(lid, 0.0)) for r in rep_results]
)
for lid in sorted(loader_ids)
}
loader_queue_mean_by_id = {
lid: _safe_mean(
[
float(r.get("loader_queue_time_min_by_loader", {}).get(lid, 0.0))
for r in rep_results
]
)
for lid in sorted(loader_ids)
}
# Per-edge mean queue time and utilisation
edge_ids: set[str] = set()
for r in rep_results:
edge_ids.update(r.get("edge_queue_time_min_by_edge", {}).keys())
edge_queue_mean = {
eid: _safe_mean(
[
float(r.get("edge_queue_time_min_by_edge", {}).get(eid, 0.0))
for r in rep_results
]
)
for eid in sorted(edge_ids)
}
edge_util_mean = {
eid: _safe_mean(
[
float(r.get("edge_utilisation_by_edge", {}).get(eid, 0.0))
for r in rep_results
]
)
for eid in sorted(edge_ids)
}
# Top bottlenecks: rank constrained resources (loaders, crusher, constrained edges).
# Primary key utilisation, secondary key queue time, both descending. This puts
# steady-state binding constraints (e.g. 95% util crusher) ahead of pure startup
# transients (e.g. ramp queue caused by all trucks dispatching at t=0).
bottleneck_candidates: list[dict[str, Any]] = []
for lid, qt in loader_queue_mean_by_id.items():
bottleneck_candidates.append(
{
"resource_id": lid,
"mean_queue_time_min": float(qt),
"mean_utilisation": float(loader_util_mean.get(lid, 0.0)),
}
)
bottleneck_candidates.append(
{
"resource_id": "D_CRUSH",
"mean_queue_time_min": float(_safe_mean(crusher_q)),
"mean_utilisation": float(_safe_mean(crusher_u)),
}
)
for eid, qt in edge_queue_mean.items():
bottleneck_candidates.append(
{
"resource_id": eid,
"mean_queue_time_min": float(qt),
"mean_utilisation": float(edge_util_mean.get(eid, 0.0)),
}
)
bottleneck_candidates.sort(
key=lambda x: (x["mean_utilisation"], x["mean_queue_time_min"]),
reverse=True,
)
top_bottlenecks = bottleneck_candidates[:5]
return {
"scenario_id": scenario_id,
"replications": replications,
"shift_length_hours": float(rep_results[0].get("shift_length_hours", 8.0)),
"total_tonnes_mean": float(_safe_mean(tonnes)),
"total_tonnes_ci95_low": float(tonnes_lo),
"total_tonnes_ci95_high": float(tonnes_hi),
"tonnes_per_hour_mean": float(_safe_mean(tph)),
"tonnes_per_hour_ci95_low": float(tph_lo),
"tonnes_per_hour_ci95_high": float(tph_hi),
"average_cycle_time_min": float(_safe_mean(cycle)),
"truck_utilisation_mean": float(_safe_mean(truck_u)),
"loader_utilisation": loader_util_mean,
"crusher_utilisation": float(_safe_mean(crusher_u)),
"average_loader_queue_time_min": float(_safe_mean(loader_q)),
"average_crusher_queue_time_min": float(_safe_mean(crusher_q)),
"edge_mean_queue_time_min": edge_queue_mean,
"edge_mean_utilisation": edge_util_mean,
"top_bottlenecks": top_bottlenecks,
}
src/metrics.py
← Back to submission · View raw on GitHub