"""Cross-replication KPI aggregation with Student-t 95% confidence intervals.
Where :mod:`mine_sim.metrics` produces one frozen
:class:`~mine_sim.metrics.ReplicationMetrics` per shift, this module
collapses a list of them into a single :class:`ScenarioSummary` carrying the
mean and Student-t (n-1) 95% CI for every reported KPI.
Design contracts:
* CIs are ``mean +/- t_{n-1, 0.975} * s / sqrt(n)`` via
:func:`scipy.stats.t.ppf`. Degenerate cases (n < 2 or zero variance)
collapse the half-width to zero (both bounds equal the mean) rather than
producing NaN, so JSON serialisation stays numeric.
* ``top_bottlenecks`` is ranked by the composite score
``mean(utilisation) * mean(queue_wait_min)``. Loaders, the crusher, and
every capacity-1 edge participate in one ranking.
* All return types are immutable so a summary is safe to share across
writers.
* No I/O here — callers pass already-loaded records and get a dataclass back.
"""
from __future__ import annotations
import math
from dataclasses import dataclass
from types import MappingProxyType
from typing import Iterable, Mapping, Sequence
from scipy import stats # type: ignore[import-untyped]
from mine_sim.metrics import ReplicationMetrics
from mine_sim.runner import ReplicationResult
#: Two-sided confidence level used as the default ``confidence`` argument of
#: :func:`student_t_ci_95` (0.95 corresponds to a 95% interval).
DEFAULT_CONFIDENCE_LEVEL: float = 0.95
@dataclass(frozen=True)
class StatSummary:
    """Immutable mean / confidence-interval record for one KPI series.

    The class performs no validation of its own. :func:`student_t_ci_95`
    fills degenerate cases with ``ci95_low == ci95_high == mean`` instead of
    NaN so that the record stays numeric.
    """

    # Arithmetic mean of the series.
    mean: float
    # Lower bound of the two-sided confidence interval.
    ci95_low: float
    # Upper bound of the two-sided confidence interval.
    ci95_high: float
    # Sample standard deviation (n-1 denominator); 0.0 in degenerate cases.
    std: float
    # Number of observations the summary was computed from.
    n: int

    @property
    def half_width(self) -> float:
        """Return the distance from the mean up to the upper CI bound."""
        upper_bound = self.ci95_high
        return upper_bound - self.mean
def _sample_std(values: Sequence[float]) -> float:
"""Sample standard deviation with n-1 degrees of freedom."""
n = len(values)
if n < 2:
return 0.0
mean = sum(values) / n
sse = sum((v - mean) ** 2 for v in values)
return math.sqrt(sse / (n - 1))
def student_t_ci_95(
    values: Sequence[float],
    *,
    confidence: float = DEFAULT_CONFIDENCE_LEVEL,
) -> StatSummary:
    """Summarise ``values`` as a mean with a two-sided Student-t interval.

    Args:
        values: Observations; must support ``len()`` and iteration.
        confidence: Two-sided confidence level, strictly between 0 and 1.
            The bounds are stored in ``ci95_low`` / ``ci95_high`` whatever
            level is requested.

    Returns:
        A :class:`StatSummary`. An empty input gives an all-zero summary. A
        single observation, or a standard deviation of exactly 0.0, gives
        both bounds equal to the mean with ``std == 0.0``. Otherwise the
        half-width is ``t_{n-1} * s / sqrt(n)`` with the critical value taken
        from :func:`scipy.stats.t.ppf`.

    Raises:
        ValueError: If ``confidence`` is not strictly between 0 and 1.
    """
    if not 0.0 < confidence < 1.0:
        raise ValueError(
            f"confidence must be strictly between 0 and 1, got {confidence}"
        )

    count = len(values)
    if count == 0:
        return StatSummary(mean=0.0, ci95_low=0.0, ci95_high=0.0, std=0.0, n=0)

    centre = sum(values) / count
    # With a single observation there is no spread to estimate.
    spread = _sample_std(values) if count >= 2 else 0.0
    if spread == 0.0:
        return StatSummary(
            mean=centre, ci95_low=centre, ci95_high=centre, std=0.0, n=count
        )

    # Upper-tail probability for a two-sided interval: 1 - alpha / 2.
    upper_tail = 1.0 - (1.0 - confidence) / 2.0
    critical = float(stats.t.ppf(upper_tail, df=count - 1))
    margin = critical * spread / math.sqrt(count)
    return StatSummary(
        mean=centre,
        ci95_low=centre - margin,
        ci95_high=centre + margin,
        std=spread,
        n=count,
    )
@dataclass(frozen=True)
class BottleneckEntry:
    """One row of the ``top_bottlenecks`` ranking.

    ``composite_score = utilisation_mean * mean_queue_wait_min``.

    Rows are created by ``_rank_bottlenecks`` from the cross-replication means
    of each resource's utilisation and queue wait.
    """

    # Identifier of the ranked resource (loader id, crusher dump id or edge id).
    resource_id: str
    resource_kind: str  # "loader" | "crusher" | "edge"
    # Mean of the resource's utilisation summary.
    utilisation_mean: float
    # Mean of the resource's queue-wait summary.
    mean_queue_wait_min: float
    # Product of the two means above; the primary sort key of the ranking.
    composite_score: float
@dataclass(frozen=True)
class LoaderSummary:
    """Cross-replication statistics for a single loader.

    As built by ``_build_loader_summaries``, each :class:`StatSummary` field
    summarises the loader metric of the same name over the replications in
    which this loader appears.
    """

    # Identifier of the loader these statistics belong to.
    loader_id: str
    # Summary of the per-replication ``utilisation`` values.
    utilisation: StatSummary
    # Summary of the per-replication ``mean_queue_wait_min`` values.
    mean_queue_wait_min: StatSummary
    # Summary of the per-replication ``services_completed`` values.
    services_completed: StatSummary
@dataclass(frozen=True)
class CrusherSummary:
    """Cross-replication statistics for the crusher.

    Field sources below describe how ``aggregate_scenario`` populates this
    record.
    """

    # ``crusher.dump_id`` of the first replication.
    dump_id: str
    # Summary of ``rep.crusher.utilisation`` across replications.
    utilisation: StatSummary
    # Summary of ``rep.average_crusher_queue_time_min`` across replications.
    mean_queue_wait_min: StatSummary
    # Summary of ``rep.crusher.services_completed`` across replications.
    services_completed: StatSummary
@dataclass(frozen=True)
class EdgeSummary:
    """Cross-replication statistics for a single edge.

    As built by ``_build_edge_summaries``, each :class:`StatSummary` field
    summarises the edge metric of the same name over the replications in
    which this edge appears.
    """

    # Identifier of the edge these statistics belong to.
    edge_id: str
    # Summary of the per-replication ``utilisation`` values.
    utilisation: StatSummary
    # Summary of the per-replication ``mean_queue_wait_min`` values.
    mean_queue_wait_min: StatSummary
    # Summary of the per-replication ``mean_traversal_time_min`` values.
    mean_traversal_time_min: StatSummary
    # Summary of the per-replication ``traversal_count`` values.
    traversal_count: StatSummary
@dataclass(frozen=True)
class ScenarioSummary:
    """Cross-replication KPI summary for one scenario.

    Field notes describe how ``aggregate_scenario`` populates the record.
    """

    # The scenario_id shared by all aggregated replications.
    scenario_id: str
    # Number of replications that were aggregated.
    replications: int
    # ``shift_length_min / 60`` taken from the first replication.
    shift_length_hours: float

    # Scenario-level KPIs: one summary per replication attribute of the same name.
    total_tonnes_delivered: StatSummary
    tonnes_per_hour: StatSummary
    average_truck_cycle_time_min: StatSummary
    average_truck_utilisation: StatSummary
    # Same summary object as ``crusher.utilisation``.
    crusher_utilisation: StatSummary
    average_loader_queue_time_min: StatSummary
    # Same summary object as ``crusher.mean_queue_wait_min``.
    average_crusher_queue_time_min: StatSummary

    # Per-loader summaries keyed by loader id (wrapped in a read-only proxy).
    loaders: Mapping[str, LoaderSummary]
    # Summary for the crusher.
    crusher: CrusherSummary
    # Per-edge summaries keyed by edge id (wrapped in a read-only proxy).
    edges: Mapping[str, EdgeSummary]
    # Highest-ranked resources by composite score, best first.
    top_bottlenecks: tuple[BottleneckEntry, ...]
@dataclass(frozen=True)
class RunSummary:
    """Container mapping each scenario_id to its :class:`ScenarioSummary`."""

    # Scenario summaries keyed by scenario_id.
    scenarios: Mapping[str, ScenarioSummary]

    @property
    def scenario_ids(self) -> tuple[str, ...]:
        """Return the scenario ids in the mapping's iteration order."""
        ids = self.scenarios.keys()
        return tuple(ids)
# ---------------------------------------------------------------------------
# Aggregation helpers
# ---------------------------------------------------------------------------
def _coerce_metrics(
    reps: Sequence[ReplicationMetrics] | Sequence[ReplicationResult],
) -> tuple[ReplicationMetrics, ...]:
    """Normalise a mixed input sequence to a tuple of ``ReplicationMetrics``.

    Items that already are ``ReplicationMetrics`` pass through unchanged.
    Any other item must expose a ``metrics`` attribute holding one.

    Raises:
        TypeError: If an item is neither of the accepted shapes.
    """

    def unwrap(item) -> ReplicationMetrics:
        # Plain metrics need no unwrapping.
        if isinstance(item, ReplicationMetrics):
            return item
        inner = getattr(item, "metrics", None)
        if isinstance(inner, ReplicationMetrics):
            return inner
        raise TypeError(
            "Aggregation expected ReplicationMetrics or ReplicationResult, "
            f"got {type(item).__name__}"
        )

    return tuple(unwrap(item) for item in reps)
def _series(values: Iterable[float]) -> StatSummary:
    """Materialise ``values`` as floats and summarise them with the default CI."""
    samples = tuple(map(float, values))
    return student_t_ci_95(samples)
def _validate_homogeneous_scenario(reps: Sequence[ReplicationMetrics]) -> str:
if not reps:
raise ValueError("Cannot aggregate an empty replication sequence.")
scenario_ids = {rep.scenario_id for rep in reps}
if len(scenario_ids) != 1:
raise ValueError(
"All replications passed to aggregate_scenario must share one "
f"scenario_id; got {sorted(scenario_ids)}"
)
return next(iter(scenario_ids))
def _resource_id_universe(
reps: Sequence[ReplicationMetrics],
attr: str,
) -> tuple[str, ...]:
"""Stable sorted union of resource ids across replications."""
ids: set[str] = set()
for rep in reps:
ids.update(getattr(rep, attr).keys())
return tuple(sorted(ids))
def _loader_series(reps, loader_id, extractor) -> StatSummary:
    """Summarise ``extractor(loader_metrics)`` for one loader across ``reps``.

    Replications whose ``loaders`` mapping lacks ``loader_id`` are skipped, so
    the resulting ``n`` can be smaller than ``len(reps)``.
    """
    matching = (rep.loaders[loader_id] for rep in reps if loader_id in rep.loaders)
    return _series(map(extractor, matching))
def _edge_series(reps, edge_id, extractor) -> StatSummary:
    """Summarise ``extractor(edge_metrics)`` for one edge across ``reps``.

    Replications whose ``edges`` mapping lacks ``edge_id`` are skipped, so the
    resulting ``n`` can be smaller than ``len(reps)``.
    """
    matching = (rep.edges[edge_id] for rep in reps if edge_id in rep.edges)
    return _series(map(extractor, matching))
def _build_loader_summaries(reps) -> dict[str, LoaderSummary]:
    """Build one :class:`LoaderSummary` per loader id seen in any replication.

    Keys are inserted in the sorted order produced by
    ``_resource_id_universe``.
    """

    def stat(loader_id, field_name) -> StatSummary:
        # Summarise a single named loader metric across replications.
        return _loader_series(reps, loader_id, lambda lm: getattr(lm, field_name))

    return {
        loader_id: LoaderSummary(
            loader_id=loader_id,
            utilisation=stat(loader_id, "utilisation"),
            mean_queue_wait_min=stat(loader_id, "mean_queue_wait_min"),
            services_completed=stat(loader_id, "services_completed"),
        )
        for loader_id in _resource_id_universe(reps, "loaders")
    }
def _build_edge_summaries(reps) -> dict[str, EdgeSummary]:
    """Build one :class:`EdgeSummary` per edge id seen in any replication.

    Keys are inserted in the sorted order produced by
    ``_resource_id_universe``.
    """

    def stat(edge_id, field_name) -> StatSummary:
        # Summarise a single named edge metric across replications.
        return _edge_series(reps, edge_id, lambda em: getattr(em, field_name))

    return {
        edge_id: EdgeSummary(
            edge_id=edge_id,
            utilisation=stat(edge_id, "utilisation"),
            mean_queue_wait_min=stat(edge_id, "mean_queue_wait_min"),
            mean_traversal_time_min=stat(edge_id, "mean_traversal_time_min"),
            traversal_count=stat(edge_id, "traversal_count"),
        )
        for edge_id in _resource_id_universe(reps, "edges")
    }
def _rank_bottlenecks(
    loaders: Mapping[str, LoaderSummary],
    crusher: CrusherSummary,
    edges: Mapping[str, EdgeSummary],
    top_n: int,
) -> tuple[BottleneckEntry, ...]:
    """Return the ``top_n`` resources with the highest composite score.

    Loaders, the crusher and edges compete in one ranking. The score of a
    resource is the mean of its utilisation summary multiplied by the mean of
    its queue-wait summary.

    Args:
        loaders: Loader summaries keyed by loader id.
        crusher: Summary of the crusher.
        edges: Edge summaries keyed by edge id.
        top_n: Maximum number of rows to return; converted with ``int()`` and
            clamped to zero from below.

    Returns:
        Entries ordered by score (descending), then utilisation mean
        (descending), then resource id (ascending).
    """
    # Gather (id, kind, utilisation mean, queue-wait mean) for every resource.
    candidates = [
        (loader_id, "loader", item.utilisation.mean, item.mean_queue_wait_min.mean)
        for loader_id, item in loaders.items()
    ]
    candidates.append(
        (
            crusher.dump_id,
            "crusher",
            crusher.utilisation.mean,
            crusher.mean_queue_wait_min.mean,
        )
    )
    candidates.extend(
        (edge_id, "edge", item.utilisation.mean, item.mean_queue_wait_min.mean)
        for edge_id, item in edges.items()
    )

    ranked = sorted(
        (
            BottleneckEntry(
                resource_id=rid,
                resource_kind=kind,
                utilisation_mean=util,
                mean_queue_wait_min=wait,
                composite_score=util * wait,
            )
            for rid, kind, util, wait in candidates
        ),
        # Negated numeric keys give descending order; the id breaks ties.
        key=lambda entry: (
            -entry.composite_score,
            -entry.utilisation_mean,
            entry.resource_id,
        ),
    )
    limit = max(0, int(top_n))
    return tuple(ranked[:limit])
def aggregate_scenario(
    replications: Sequence[ReplicationMetrics] | Sequence[ReplicationResult],
    *,
    top_bottleneck_count: int = 5,
) -> ScenarioSummary:
    """Collapse the replications of one scenario into a :class:`ScenarioSummary`.

    Args:
        replications: ``ReplicationMetrics`` objects, or objects exposing one
            through a ``metrics`` attribute.
        top_bottleneck_count: Maximum number of rows in ``top_bottlenecks``.

    Returns:
        The scenario summary. The shift length and the crusher ``dump_id``
        are taken from the first replication. The ``loaders`` and ``edges``
        mappings are wrapped in read-only proxies.

    Raises:
        TypeError: If an item cannot be coerced to ``ReplicationMetrics``.
        ValueError: If the input is empty or mixes several scenario ids.
    """
    reps = _coerce_metrics(replications)
    scenario_id = _validate_homogeneous_scenario(reps)
    first = reps[0]
    shift_hours = first.shift_length_min / 60.0

    def across(pick) -> StatSummary:
        # Summarise one per-replication value over all replications.
        return _series(pick(rep) for rep in reps)

    # These two summaries are shared by the scenario-level KPI fields and the
    # crusher record.
    crusher_util = across(lambda rep: rep.crusher.utilisation)
    crusher_wait = across(lambda rep: rep.average_crusher_queue_time_min)

    loader_table = _build_loader_summaries(reps)
    edge_table = _build_edge_summaries(reps)
    crusher_summary = CrusherSummary(
        dump_id=first.crusher.dump_id,
        utilisation=crusher_util,
        mean_queue_wait_min=crusher_wait,
        services_completed=across(lambda rep: rep.crusher.services_completed),
    )

    tonnes_total = across(lambda rep: rep.total_tonnes_delivered)
    tonnes_rate = across(lambda rep: rep.tonnes_per_hour)
    cycle_time = across(lambda rep: rep.average_truck_cycle_time_min)
    truck_util = across(lambda rep: rep.average_truck_utilisation)
    loader_wait = across(lambda rep: rep.average_loader_queue_time_min)

    return ScenarioSummary(
        scenario_id=scenario_id,
        replications=len(reps),
        shift_length_hours=shift_hours,
        total_tonnes_delivered=tonnes_total,
        tonnes_per_hour=tonnes_rate,
        average_truck_cycle_time_min=cycle_time,
        average_truck_utilisation=truck_util,
        crusher_utilisation=crusher_util,
        average_loader_queue_time_min=loader_wait,
        average_crusher_queue_time_min=crusher_wait,
        loaders=MappingProxyType(loader_table),
        crusher=crusher_summary,
        edges=MappingProxyType(edge_table),
        top_bottlenecks=_rank_bottlenecks(
            loader_table, crusher_summary, edge_table, top_bottleneck_count
        ),
    )
def aggregate_run(
    run: Mapping[str, Sequence[ReplicationMetrics] | Sequence[ReplicationResult]],
    *,
    top_bottleneck_count: int = 5,
) -> RunSummary:
    """Aggregate every scenario of a run into one :class:`RunSummary`.

    Args:
        run: Replications grouped by scenario id.
        top_bottleneck_count: Forwarded to :func:`aggregate_scenario`.

    Returns:
        A :class:`RunSummary` whose ``scenarios`` mapping is a read-only
        proxy, keyed in the iteration order of ``run``.

    Raises:
        ValueError: If a key of ``run`` differs from the ``scenario_id``
            carried by the replications stored under it.
    """
    by_scenario: dict[str, ScenarioSummary] = {}
    for key, reps in run.items():
        scenario = aggregate_scenario(reps, top_bottleneck_count=top_bottleneck_count)
        if scenario.scenario_id == key:
            by_scenario[key] = scenario
            continue
        # The mapping key and the metrics disagree about which scenario this is.
        raise ValueError(
            f"Scenario id mismatch: key '{key}' vs metrics "
            f"'{scenario.scenario_id}'. Refusing to mis-key the summary."
        )
    return RunSummary(scenarios=MappingProxyType(by_scenario))
# Names exported by ``from <module> import *``; the underscore-prefixed
# helpers defined above are not listed.
__all__ = [
    "BottleneckEntry",
    "CrusherSummary",
    "DEFAULT_CONFIDENCE_LEVEL",
    "EdgeSummary",
    "LoaderSummary",
    "RunSummary",
    "ScenarioSummary",
    "StatSummary",
    "aggregate_run",
    "aggregate_scenario",
    "student_t_ci_95",
]
# src/mine_sim/aggregate.py
# ← Back to submission · View raw on GitHub