"""Per-replication metric accumulator and result dataclasses.
The simulation calls into :class:`MetricsRecorder` while it runs, then
asks for an immutable :class:`ReplicationMetrics` snapshot at the hard
shift cut. Aggregating across replications happens elsewhere; this
module is intentionally side-effect-free between calls.
Performance measures we track per replication, lifted directly from the
Seed contract and ``conceptual_model.md`` section 5:
* ``total_tonnes_delivered`` — payload * count(end_dump events at CRUSH
with time_min < shift_length).
* ``tonnes_per_hour`` — total_tonnes_delivered / shift_length_hours.
* ``average_truck_cycle_time_min`` — mean of completed cycle durations.
The first cycle uses ``dispatch -> end_dump``; subsequent cycles use
``end_dump -> end_dump``.
* ``average_truck_utilisation`` — mean across trucks of
``productive_busy_time / shift_length_min``.
* ``crusher_utilisation`` — D_CRUSH busy_time / shift_length_min.
* ``loader_utilisation`` per loader — busy_time / shift_length_min.
* ``average_loader_queue_time_min`` / ``average_crusher_queue_time_min``
— mean wait per service event.
* ``edge_metrics`` — for every capacity-1 edge: utilisation, mean
wait, mean traversal time, count, total wait time.
We deliberately do *not* compute confidence intervals here. CI math is a
cross-replication concern handled by ``aggregate.py`` (a later sub-AC).
"""
from __future__ import annotations
from dataclasses import dataclass, field
from types import MappingProxyType
from typing import Mapping
# ---------------------------------------------------------------------------
# Per-truck running counters used during the simulation. Mutable internally
# but converted to a frozen snapshot when the rep ends.
# ---------------------------------------------------------------------------
class _TruckCounter:
"""Mutable per-truck accumulator (internal helper)."""
__slots__ = (
"truck_id",
"productive_busy_time",
"completed_cycle_count",
"completed_cycle_total_min",
"last_cycle_anchor",
)
def __init__(self, truck_id: str) -> None:
self.truck_id: str = truck_id
self.productive_busy_time: float = 0.0
self.completed_cycle_count: int = 0
self.completed_cycle_total_min: float = 0.0
# Time of the last "cycle anchor" — initially the dispatch time
# (set by the recorder), then updated to each end_dump.
self.last_cycle_anchor: float | None = None
# ---------------------------------------------------------------------------
# Per-resource running counters.
# ---------------------------------------------------------------------------
class _ResourceCounter:
"""Mutable per-resource accumulator (internal helper)."""
__slots__ = (
"resource_id",
"busy_time",
"wait_time_total",
"service_count",
"traversal_count",
)
def __init__(self, resource_id: str) -> None:
self.resource_id: str = resource_id
self.busy_time: float = 0.0
self.wait_time_total: float = 0.0
self.service_count: int = 0
self.traversal_count: int = 0
# ---------------------------------------------------------------------------
# Public, immutable result dataclasses
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class EdgeMetrics:
"""Per capacity-1 edge KPIs at end-of-shift."""
edge_id: str
utilisation: float
mean_queue_wait_min: float
mean_traversal_time_min: float
traversal_count: int
total_wait_time_min: float
@dataclass(frozen=True)
class LoaderMetrics:
"""Per loader KPIs at end-of-shift."""
loader_id: str
utilisation: float
mean_queue_wait_min: float
services_completed: int
@dataclass(frozen=True)
class CrusherMetrics:
"""Per crusher KPIs at end-of-shift."""
dump_id: str
utilisation: float
mean_queue_wait_min: float
services_completed: int
@dataclass(frozen=True)
class ReplicationMetrics:
"""Immutable snapshot of one rep's KPIs.
The runner returns this object; aggregation across reps walks a list
of these and computes Student-t CIs.
"""
scenario_id: str
replication_index: int
random_seed: int
shift_length_min: float
truck_count: int
total_tonnes_delivered: float
tonnes_per_hour: float
average_truck_cycle_time_min: float
average_truck_utilisation: float
crusher: CrusherMetrics
loaders: Mapping[str, LoaderMetrics]
edges: Mapping[str, EdgeMetrics]
average_loader_queue_time_min: float
average_crusher_queue_time_min: float
completed_dumps: int
# ---------------------------------------------------------------------------
# Recorder — the live object the simulation writes into
# ---------------------------------------------------------------------------
@dataclass
class MetricsRecorder:
"""Mutable recorder used during a single replication.
Construct via :func:`make_recorder`. The simulation calls the
``record_*`` methods; at end-of-shift the runner calls
:meth:`finalise` to obtain a :class:`ReplicationMetrics` snapshot.
Not thread-safe — but SimPy runs on a single greenlet-style scheduler,
so that is fine.
"""
scenario_id: str
replication_index: int
random_seed: int
shift_length_min: float
payload_tonnes: float
truck_ids: tuple[str, ...]
loader_ids: tuple[str, ...]
crusher_id: str
capacity_edge_ids: tuple[str, ...]
# Internal state
_trucks: dict[str, _TruckCounter] = field(default_factory=dict, init=False)
_loaders: dict[str, _ResourceCounter] = field(default_factory=dict, init=False)
_crusher: _ResourceCounter | None = field(default=None, init=False)
_edges: dict[str, _ResourceCounter] = field(default_factory=dict, init=False)
completed_dumps: int = field(default=0, init=False)
edge_traversal_total_min: dict[str, float] = field(default_factory=dict, init=False)
def __post_init__(self) -> None:
for tid in self.truck_ids:
self._trucks[tid] = _TruckCounter(tid)
for lid in self.loader_ids:
self._loaders[lid] = _ResourceCounter(lid)
self._crusher = _ResourceCounter(self.crusher_id)
for eid in self.capacity_edge_ids:
self._edges[eid] = _ResourceCounter(eid)
self.edge_traversal_total_min[eid] = 0.0
# ------------------------------------------------------------------
# Truck-level updates
# ------------------------------------------------------------------
def record_dispatch(self, truck_id: str, time_min: float) -> None:
truck = self._trucks[truck_id]
truck.last_cycle_anchor = time_min
def add_productive_time(self, truck_id: str, duration_min: float) -> None:
if duration_min <= 0:
return
self._trucks[truck_id].productive_busy_time += duration_min
def record_cycle_end(self, truck_id: str, time_min: float) -> None:
"""Called at every ``end_dump`` event before the post-shift cut.
The Seed defines the first cycle as ``dispatch -> end_dump`` and
every subsequent cycle as ``end_dump -> end_dump``. We use
``last_cycle_anchor`` for the start of the current cycle and reset
it to the current ``end_dump`` time afterwards.
"""
truck = self._trucks[truck_id]
anchor = truck.last_cycle_anchor
if anchor is None:
return
duration = time_min - anchor
if duration > 0:
truck.completed_cycle_count += 1
truck.completed_cycle_total_min += duration
truck.last_cycle_anchor = time_min
# ------------------------------------------------------------------
# Resource-level updates
# ------------------------------------------------------------------
def record_loader_service(
self,
loader_id: str,
wait_time_min: float,
service_time_min: float,
) -> None:
counter = self._loaders[loader_id]
counter.service_count += 1
counter.wait_time_total += max(0.0, wait_time_min)
counter.busy_time += max(0.0, service_time_min)
def record_crusher_service(
self,
wait_time_min: float,
service_time_min: float,
) -> None:
assert self._crusher is not None
self._crusher.service_count += 1
self._crusher.wait_time_total += max(0.0, wait_time_min)
self._crusher.busy_time += max(0.0, service_time_min)
def record_completed_dump(self, time_min: float, truck_id: str) -> None:
"""Credit one closed dump (only call when ``time_min < shift_length``)."""
if time_min >= self.shift_length_min:
return
self.completed_dumps += 1
def record_edge_traversal(
self,
edge_id: str,
wait_time_min: float,
traversal_time_min: float,
) -> None:
counter = self._edges.get(edge_id)
if counter is None:
return
counter.traversal_count += 1
counter.wait_time_total += max(0.0, wait_time_min)
counter.busy_time += max(0.0, traversal_time_min)
self.edge_traversal_total_min[edge_id] += max(0.0, traversal_time_min)
# ------------------------------------------------------------------
# Snapshot
# ------------------------------------------------------------------
def finalise(self) -> ReplicationMetrics:
shift_min = self.shift_length_min
shift_hours = shift_min / 60.0 if shift_min > 0 else 1.0
total_tonnes = self.completed_dumps * self.payload_tonnes
tonnes_per_hour = total_tonnes / shift_hours
# Truck cycle / utilisation -------------------------------------------
cycle_totals: list[float] = []
cycle_counts: list[int] = []
utilisations: list[float] = []
for truck in self._trucks.values():
if truck.completed_cycle_count > 0:
cycle_totals.append(truck.completed_cycle_total_min)
cycle_counts.append(truck.completed_cycle_count)
if shift_min > 0:
utilisations.append(min(1.0, truck.productive_busy_time / shift_min))
if sum(cycle_counts) > 0:
avg_cycle_time = sum(cycle_totals) / sum(cycle_counts)
else:
avg_cycle_time = 0.0
avg_utilisation = (
sum(utilisations) / len(utilisations) if utilisations else 0.0
)
# Loader / crusher metrics --------------------------------------------
loader_metrics: dict[str, LoaderMetrics] = {}
loader_wait_totals: list[float] = []
loader_service_counts: list[int] = []
for loader_id, counter in self._loaders.items():
mean_wait = (
counter.wait_time_total / counter.service_count
if counter.service_count
else 0.0
)
utilisation = (
min(1.0, counter.busy_time / shift_min) if shift_min > 0 else 0.0
)
loader_metrics[loader_id] = LoaderMetrics(
loader_id=loader_id,
utilisation=utilisation,
mean_queue_wait_min=mean_wait,
services_completed=counter.service_count,
)
loader_wait_totals.append(counter.wait_time_total)
loader_service_counts.append(counter.service_count)
total_loader_services = sum(loader_service_counts)
avg_loader_queue = (
sum(loader_wait_totals) / total_loader_services
if total_loader_services
else 0.0
)
assert self._crusher is not None
crusher_mean_wait = (
self._crusher.wait_time_total / self._crusher.service_count
if self._crusher.service_count
else 0.0
)
crusher_metrics = CrusherMetrics(
dump_id=self.crusher_id,
utilisation=(
min(1.0, self._crusher.busy_time / shift_min)
if shift_min > 0
else 0.0
),
mean_queue_wait_min=crusher_mean_wait,
services_completed=self._crusher.service_count,
)
# Edge metrics --------------------------------------------------------
edge_metrics: dict[str, EdgeMetrics] = {}
for edge_id, counter in self._edges.items():
mean_wait = (
counter.wait_time_total / counter.traversal_count
if counter.traversal_count
else 0.0
)
mean_traversal = (
self.edge_traversal_total_min[edge_id] / counter.traversal_count
if counter.traversal_count
else 0.0
)
utilisation = (
min(1.0, counter.busy_time / shift_min) if shift_min > 0 else 0.0
)
edge_metrics[edge_id] = EdgeMetrics(
edge_id=edge_id,
utilisation=utilisation,
mean_queue_wait_min=mean_wait,
mean_traversal_time_min=mean_traversal,
traversal_count=counter.traversal_count,
total_wait_time_min=counter.wait_time_total,
)
return ReplicationMetrics(
scenario_id=self.scenario_id,
replication_index=self.replication_index,
random_seed=self.random_seed,
shift_length_min=shift_min,
truck_count=len(self.truck_ids),
total_tonnes_delivered=total_tonnes,
tonnes_per_hour=tonnes_per_hour,
average_truck_cycle_time_min=avg_cycle_time,
average_truck_utilisation=avg_utilisation,
crusher=crusher_metrics,
loaders=MappingProxyType(loader_metrics),
edges=MappingProxyType(edge_metrics),
average_loader_queue_time_min=avg_loader_queue,
average_crusher_queue_time_min=crusher_mean_wait,
completed_dumps=self.completed_dumps,
)
__all__ = [
"CrusherMetrics",
"EdgeMetrics",
"LoaderMetrics",
"MetricsRecorder",
"ReplicationMetrics",
]
src/mine_sim/metrics.py
← Back to submission · View raw on GitHub