"""Per-replication metric accumulator and immutable result dataclasses.
The simulation calls :class:`MetricsRecorder` while it runs, then asks for an
immutable :class:`ReplicationMetrics` snapshot at the hard shift cut.
Cross-replication aggregation (means, CIs) happens elsewhere
(:mod:`mine_sim.aggregate`); this module is side-effect-free between calls.
Performance measures tracked per replication:
* ``total_tonnes_delivered`` - payload x count(end_dump at CRUSH with
time_min < shift_length). In-flight dumps at the cut are not credited.
* ``tonnes_per_hour`` - total_tonnes_delivered / shift_length_hours.
* ``average_truck_cycle_time_min`` - mean completed-cycle duration. The
first cycle is ``dispatch -> end_dump``; later cycles ``end_dump ->
end_dump``.
* ``average_truck_utilisation`` - mean over trucks of
``productive_busy_time / shift_length_min`` (productive = completed
phases only; a phase straddling the cut adds 0, a small documented
under-count).
* ``crusher_utilisation`` / ``loader_utilisation`` - busy_time /
shift_length_min.
* ``average_loader_queue_time_min`` / ``average_crusher_queue_time_min`` -
mean queue wait per service event.
* ``edge_metrics`` - per capacity-1 edge: utilisation, mean wait, mean
traversal time, count, total wait.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from types import MappingProxyType
from typing import Mapping
# ---------------------------------------------------------------------------
# Mutable per-entity counters (internal helpers). Converted to frozen
# snapshots when the replication ends.
# ---------------------------------------------------------------------------
class _TruckCounter:
__slots__ = (
"truck_id",
"productive_busy_time",
"completed_cycle_count",
"completed_cycle_total_min",
"last_cycle_anchor",
)
def __init__(self, truck_id: str) -> None:
self.truck_id = truck_id
self.productive_busy_time = 0.0
self.completed_cycle_count = 0
self.completed_cycle_total_min = 0.0
self.last_cycle_anchor: float | None = None
class _ResourceCounter:
__slots__ = (
"resource_id",
"busy_time",
"wait_time_total",
"service_count",
"traversal_count",
)
def __init__(self, resource_id: str) -> None:
self.resource_id = resource_id
self.busy_time = 0.0
self.wait_time_total = 0.0
self.service_count = 0
self.traversal_count = 0
# ---------------------------------------------------------------------------
# Public, immutable result dataclasses
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class EdgeMetrics:
"""Per capacity-1 edge KPIs at end-of-shift."""
edge_id: str
utilisation: float
mean_queue_wait_min: float
mean_traversal_time_min: float
traversal_count: int
total_wait_time_min: float
@dataclass(frozen=True)
class LoaderMetrics:
"""Per loader KPIs at end-of-shift."""
loader_id: str
utilisation: float
mean_queue_wait_min: float
services_completed: int
@dataclass(frozen=True)
class CrusherMetrics:
"""Crusher KPIs at end-of-shift."""
dump_id: str
utilisation: float
mean_queue_wait_min: float
services_completed: int
@dataclass(frozen=True)
class ReplicationMetrics:
"""Immutable snapshot of one replication's KPIs."""
scenario_id: str
replication_index: int
random_seed: int
shift_length_min: float
truck_count: int
total_tonnes_delivered: float
tonnes_per_hour: float
average_truck_cycle_time_min: float
average_truck_utilisation: float
crusher: CrusherMetrics
loaders: Mapping[str, LoaderMetrics]
edges: Mapping[str, EdgeMetrics]
average_loader_queue_time_min: float
average_crusher_queue_time_min: float
completed_dumps: int
# ---------------------------------------------------------------------------
# Recorder โ the live object the simulation writes into
# ---------------------------------------------------------------------------
@dataclass
class MetricsRecorder:
"""Mutable recorder for a single replication.
The simulation calls the ``record_*`` methods; at end-of-shift the runner
calls :meth:`finalise` for a :class:`ReplicationMetrics` snapshot. Not
thread-safe, but SimPy runs on a single scheduler so that is fine.
"""
scenario_id: str
replication_index: int
random_seed: int
shift_length_min: float
payload_tonnes: float
truck_ids: tuple[str, ...]
loader_ids: tuple[str, ...]
crusher_id: str
capacity_edge_ids: tuple[str, ...]
_trucks: dict[str, _TruckCounter] = field(default_factory=dict, init=False)
_loaders: dict[str, _ResourceCounter] = field(default_factory=dict, init=False)
_crusher: _ResourceCounter | None = field(default=None, init=False)
_edges: dict[str, _ResourceCounter] = field(default_factory=dict, init=False)
completed_dumps: int = field(default=0, init=False)
edge_traversal_total_min: dict[str, float] = field(
default_factory=dict, init=False
)
def __post_init__(self) -> None:
for tid in self.truck_ids:
self._trucks[tid] = _TruckCounter(tid)
for lid in self.loader_ids:
self._loaders[lid] = _ResourceCounter(lid)
self._crusher = _ResourceCounter(self.crusher_id)
for eid in self.capacity_edge_ids:
self._edges[eid] = _ResourceCounter(eid)
self.edge_traversal_total_min[eid] = 0.0
# ------------------------------------------------------------------
# Truck-level updates
# ------------------------------------------------------------------
def record_dispatch(self, truck_id: str, time_min: float) -> None:
self._trucks[truck_id].last_cycle_anchor = time_min
def add_productive_time(self, truck_id: str, duration_min: float) -> None:
if duration_min <= 0:
return
self._trucks[truck_id].productive_busy_time += duration_min
def record_cycle_end(self, truck_id: str, time_min: float) -> None:
"""Record a completed cycle at each ``end_dump`` before the cut.
First cycle = ``dispatch -> end_dump``; subsequent = ``end_dump ->
end_dump``. ``last_cycle_anchor`` marks the start of the current
cycle and is reset to this ``end_dump`` time.
"""
truck = self._trucks[truck_id]
anchor = truck.last_cycle_anchor
if anchor is None:
return
duration = time_min - anchor
if duration > 0:
truck.completed_cycle_count += 1
truck.completed_cycle_total_min += duration
truck.last_cycle_anchor = time_min
# ------------------------------------------------------------------
# Resource-level updates
# ------------------------------------------------------------------
def record_loader_service(
self,
loader_id: str,
wait_time_min: float,
service_time_min: float,
) -> None:
counter = self._loaders[loader_id]
counter.service_count += 1
counter.wait_time_total += max(0.0, wait_time_min)
counter.busy_time += max(0.0, service_time_min)
def record_crusher_service(
self,
wait_time_min: float,
service_time_min: float,
) -> None:
assert self._crusher is not None
self._crusher.service_count += 1
self._crusher.wait_time_total += max(0.0, wait_time_min)
self._crusher.busy_time += max(0.0, service_time_min)
def record_completed_dump(self, time_min: float, truck_id: str) -> None:
"""Credit one closed dump (only when ``time_min < shift_length``)."""
if time_min >= self.shift_length_min:
return
self.completed_dumps += 1
def record_edge_traversal(
self,
edge_id: str,
wait_time_min: float,
traversal_time_min: float,
) -> None:
counter = self._edges.get(edge_id)
if counter is None:
return
counter.traversal_count += 1
counter.wait_time_total += max(0.0, wait_time_min)
counter.busy_time += max(0.0, traversal_time_min)
self.edge_traversal_total_min[edge_id] += max(0.0, traversal_time_min)
# ------------------------------------------------------------------
# Snapshot
# ------------------------------------------------------------------
def finalise(self) -> ReplicationMetrics:
shift_min = self.shift_length_min
shift_hours = shift_min / 60.0 if shift_min > 0 else 1.0
total_tonnes = self.completed_dumps * self.payload_tonnes
tonnes_per_hour = total_tonnes / shift_hours
avg_cycle_time, avg_utilisation = self._truck_aggregates(shift_min)
loader_metrics, avg_loader_queue = self._loader_aggregates(shift_min)
crusher_metrics = self._crusher_snapshot(shift_min)
edge_metrics = self._edge_aggregates(shift_min)
return ReplicationMetrics(
scenario_id=self.scenario_id,
replication_index=self.replication_index,
random_seed=self.random_seed,
shift_length_min=shift_min,
truck_count=len(self.truck_ids),
total_tonnes_delivered=total_tonnes,
tonnes_per_hour=tonnes_per_hour,
average_truck_cycle_time_min=avg_cycle_time,
average_truck_utilisation=avg_utilisation,
crusher=crusher_metrics,
loaders=MappingProxyType(loader_metrics),
edges=MappingProxyType(edge_metrics),
average_loader_queue_time_min=avg_loader_queue,
average_crusher_queue_time_min=crusher_metrics.mean_queue_wait_min,
completed_dumps=self.completed_dumps,
)
# ------------------------------------------------------------------
# Aggregation helpers (kept small so finalise stays readable)
# ------------------------------------------------------------------
def _truck_aggregates(self, shift_min: float) -> tuple[float, float]:
cycle_total = 0.0
cycle_count = 0
utilisations: list[float] = []
for truck in self._trucks.values():
cycle_total += truck.completed_cycle_total_min
cycle_count += truck.completed_cycle_count
if shift_min > 0:
utilisations.append(min(1.0, truck.productive_busy_time / shift_min))
avg_cycle = cycle_total / cycle_count if cycle_count > 0 else 0.0
avg_util = sum(utilisations) / len(utilisations) if utilisations else 0.0
return avg_cycle, avg_util
def _loader_aggregates(
self, shift_min: float
) -> tuple[dict[str, LoaderMetrics], float]:
loader_metrics: dict[str, LoaderMetrics] = {}
wait_total = 0.0
service_total = 0
for loader_id, counter in self._loaders.items():
mean_wait = (
counter.wait_time_total / counter.service_count
if counter.service_count
else 0.0
)
utilisation = (
min(1.0, counter.busy_time / shift_min) if shift_min > 0 else 0.0
)
loader_metrics[loader_id] = LoaderMetrics(
loader_id=loader_id,
utilisation=utilisation,
mean_queue_wait_min=mean_wait,
services_completed=counter.service_count,
)
wait_total += counter.wait_time_total
service_total += counter.service_count
avg_loader_queue = wait_total / service_total if service_total else 0.0
return loader_metrics, avg_loader_queue
def _crusher_snapshot(self, shift_min: float) -> CrusherMetrics:
assert self._crusher is not None
mean_wait = (
self._crusher.wait_time_total / self._crusher.service_count
if self._crusher.service_count
else 0.0
)
utilisation = (
min(1.0, self._crusher.busy_time / shift_min) if shift_min > 0 else 0.0
)
return CrusherMetrics(
dump_id=self.crusher_id,
utilisation=utilisation,
mean_queue_wait_min=mean_wait,
services_completed=self._crusher.service_count,
)
def _edge_aggregates(self, shift_min: float) -> dict[str, EdgeMetrics]:
edge_metrics: dict[str, EdgeMetrics] = {}
for edge_id, counter in self._edges.items():
mean_wait = (
counter.wait_time_total / counter.traversal_count
if counter.traversal_count
else 0.0
)
mean_traversal = (
self.edge_traversal_total_min[edge_id] / counter.traversal_count
if counter.traversal_count
else 0.0
)
utilisation = (
min(1.0, counter.busy_time / shift_min) if shift_min > 0 else 0.0
)
edge_metrics[edge_id] = EdgeMetrics(
edge_id=edge_id,
utilisation=utilisation,
mean_queue_wait_min=mean_wait,
mean_traversal_time_min=mean_traversal,
traversal_count=counter.traversal_count,
total_wait_time_min=counter.wait_time_total,
)
return edge_metrics
__all__ = [
"CrusherMetrics",
"EdgeMetrics",
"LoaderMetrics",
"MetricsRecorder",
"ReplicationMetrics",
]
src/mine_sim/metrics.py
โ Back to submission ยท View raw on GitHub