src/mine_sim/metrics.py

โ† Back to submission ยท View raw on GitHub

"""Per-replication metric accumulator and immutable result dataclasses.

The simulation calls :class:`MetricsRecorder` while it runs, then asks for an
immutable :class:`ReplicationMetrics` snapshot at the hard shift cut.
Cross-replication aggregation (means, CIs) happens elsewhere
(:mod:`mine_sim.aggregate`); this module is side-effect-free between calls.

Performance measures tracked per replication:

* ``total_tonnes_delivered`` - payload x count(end_dump at CRUSH with
  time_min < shift_length). In-flight dumps at the cut are not credited.
* ``tonnes_per_hour`` - total_tonnes_delivered / shift_length_hours.
* ``average_truck_cycle_time_min`` - mean completed-cycle duration. The
  first cycle is ``dispatch -> end_dump``; later cycles ``end_dump ->
  end_dump``.
* ``average_truck_utilisation`` - mean over trucks of
  ``productive_busy_time / shift_length_min`` (productive = completed
  phases only; a phase straddling the cut adds 0, a small documented
  under-count).
* ``crusher_utilisation`` / ``loader_utilisation`` - busy_time /
  shift_length_min.
* ``average_loader_queue_time_min`` / ``average_crusher_queue_time_min`` -
  mean queue wait per service event.
* ``edge_metrics`` - per capacity-1 edge: utilisation, mean wait, mean
  traversal time, count, total wait.
"""

from __future__ import annotations

from dataclasses import dataclass, field
from types import MappingProxyType
from typing import Mapping


# ---------------------------------------------------------------------------
# Mutable per-entity counters (internal helpers). Converted to frozen
# snapshots when the replication ends.
# ---------------------------------------------------------------------------
class _TruckCounter:
    __slots__ = (
        "truck_id",
        "productive_busy_time",
        "completed_cycle_count",
        "completed_cycle_total_min",
        "last_cycle_anchor",
    )

    def __init__(self, truck_id: str) -> None:
        self.truck_id = truck_id
        self.productive_busy_time = 0.0
        self.completed_cycle_count = 0
        self.completed_cycle_total_min = 0.0
        self.last_cycle_anchor: float | None = None


class _ResourceCounter:
    __slots__ = (
        "resource_id",
        "busy_time",
        "wait_time_total",
        "service_count",
        "traversal_count",
    )

    def __init__(self, resource_id: str) -> None:
        self.resource_id = resource_id
        self.busy_time = 0.0
        self.wait_time_total = 0.0
        self.service_count = 0
        self.traversal_count = 0


# ---------------------------------------------------------------------------
# Public, immutable result dataclasses
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class EdgeMetrics:
    """Per capacity-1 edge KPIs at end-of-shift."""

    edge_id: str
    utilisation: float
    mean_queue_wait_min: float
    mean_traversal_time_min: float
    traversal_count: int
    total_wait_time_min: float


@dataclass(frozen=True)
class LoaderMetrics:
    """Per loader KPIs at end-of-shift."""

    loader_id: str
    utilisation: float
    mean_queue_wait_min: float
    services_completed: int


@dataclass(frozen=True)
class CrusherMetrics:
    """Crusher KPIs at end-of-shift."""

    dump_id: str
    utilisation: float
    mean_queue_wait_min: float
    services_completed: int


@dataclass(frozen=True)
class ReplicationMetrics:
    """Immutable snapshot of one replication's KPIs."""

    scenario_id: str
    replication_index: int
    random_seed: int
    shift_length_min: float
    truck_count: int

    total_tonnes_delivered: float
    tonnes_per_hour: float
    average_truck_cycle_time_min: float
    average_truck_utilisation: float

    crusher: CrusherMetrics
    loaders: Mapping[str, LoaderMetrics]
    edges: Mapping[str, EdgeMetrics]

    average_loader_queue_time_min: float
    average_crusher_queue_time_min: float

    completed_dumps: int


# ---------------------------------------------------------------------------
# Recorder โ€” the live object the simulation writes into
# ---------------------------------------------------------------------------
@dataclass
class MetricsRecorder:
    """Mutable recorder for a single replication.

    The simulation calls the ``record_*`` methods; at end-of-shift the runner
    calls :meth:`finalise` for a :class:`ReplicationMetrics` snapshot. Not
    thread-safe, but SimPy runs on a single scheduler so that is fine.
    """

    scenario_id: str
    replication_index: int
    random_seed: int
    shift_length_min: float
    payload_tonnes: float

    truck_ids: tuple[str, ...]
    loader_ids: tuple[str, ...]
    crusher_id: str
    capacity_edge_ids: tuple[str, ...]

    _trucks: dict[str, _TruckCounter] = field(default_factory=dict, init=False)
    _loaders: dict[str, _ResourceCounter] = field(default_factory=dict, init=False)
    _crusher: _ResourceCounter | None = field(default=None, init=False)
    _edges: dict[str, _ResourceCounter] = field(default_factory=dict, init=False)

    completed_dumps: int = field(default=0, init=False)
    edge_traversal_total_min: dict[str, float] = field(
        default_factory=dict, init=False
    )

    def __post_init__(self) -> None:
        for tid in self.truck_ids:
            self._trucks[tid] = _TruckCounter(tid)
        for lid in self.loader_ids:
            self._loaders[lid] = _ResourceCounter(lid)
        self._crusher = _ResourceCounter(self.crusher_id)
        for eid in self.capacity_edge_ids:
            self._edges[eid] = _ResourceCounter(eid)
            self.edge_traversal_total_min[eid] = 0.0

    # ------------------------------------------------------------------
    # Truck-level updates
    # ------------------------------------------------------------------
    def record_dispatch(self, truck_id: str, time_min: float) -> None:
        self._trucks[truck_id].last_cycle_anchor = time_min

    def add_productive_time(self, truck_id: str, duration_min: float) -> None:
        if duration_min <= 0:
            return
        self._trucks[truck_id].productive_busy_time += duration_min

    def record_cycle_end(self, truck_id: str, time_min: float) -> None:
        """Record a completed cycle at each ``end_dump`` before the cut.

        First cycle = ``dispatch -> end_dump``; subsequent = ``end_dump ->
        end_dump``. ``last_cycle_anchor`` marks the start of the current
        cycle and is reset to this ``end_dump`` time.
        """
        truck = self._trucks[truck_id]
        anchor = truck.last_cycle_anchor
        if anchor is None:
            return
        duration = time_min - anchor
        if duration > 0:
            truck.completed_cycle_count += 1
            truck.completed_cycle_total_min += duration
        truck.last_cycle_anchor = time_min

    # ------------------------------------------------------------------
    # Resource-level updates
    # ------------------------------------------------------------------
    def record_loader_service(
        self,
        loader_id: str,
        wait_time_min: float,
        service_time_min: float,
    ) -> None:
        counter = self._loaders[loader_id]
        counter.service_count += 1
        counter.wait_time_total += max(0.0, wait_time_min)
        counter.busy_time += max(0.0, service_time_min)

    def record_crusher_service(
        self,
        wait_time_min: float,
        service_time_min: float,
    ) -> None:
        assert self._crusher is not None
        self._crusher.service_count += 1
        self._crusher.wait_time_total += max(0.0, wait_time_min)
        self._crusher.busy_time += max(0.0, service_time_min)

    def record_completed_dump(self, time_min: float, truck_id: str) -> None:
        """Credit one closed dump (only when ``time_min < shift_length``)."""
        if time_min >= self.shift_length_min:
            return
        self.completed_dumps += 1

    def record_edge_traversal(
        self,
        edge_id: str,
        wait_time_min: float,
        traversal_time_min: float,
    ) -> None:
        counter = self._edges.get(edge_id)
        if counter is None:
            return
        counter.traversal_count += 1
        counter.wait_time_total += max(0.0, wait_time_min)
        counter.busy_time += max(0.0, traversal_time_min)
        self.edge_traversal_total_min[edge_id] += max(0.0, traversal_time_min)

    # ------------------------------------------------------------------
    # Snapshot
    # ------------------------------------------------------------------
    def finalise(self) -> ReplicationMetrics:
        shift_min = self.shift_length_min
        shift_hours = shift_min / 60.0 if shift_min > 0 else 1.0

        total_tonnes = self.completed_dumps * self.payload_tonnes
        tonnes_per_hour = total_tonnes / shift_hours

        avg_cycle_time, avg_utilisation = self._truck_aggregates(shift_min)
        loader_metrics, avg_loader_queue = self._loader_aggregates(shift_min)
        crusher_metrics = self._crusher_snapshot(shift_min)
        edge_metrics = self._edge_aggregates(shift_min)

        return ReplicationMetrics(
            scenario_id=self.scenario_id,
            replication_index=self.replication_index,
            random_seed=self.random_seed,
            shift_length_min=shift_min,
            truck_count=len(self.truck_ids),
            total_tonnes_delivered=total_tonnes,
            tonnes_per_hour=tonnes_per_hour,
            average_truck_cycle_time_min=avg_cycle_time,
            average_truck_utilisation=avg_utilisation,
            crusher=crusher_metrics,
            loaders=MappingProxyType(loader_metrics),
            edges=MappingProxyType(edge_metrics),
            average_loader_queue_time_min=avg_loader_queue,
            average_crusher_queue_time_min=crusher_metrics.mean_queue_wait_min,
            completed_dumps=self.completed_dumps,
        )

    # ------------------------------------------------------------------
    # Aggregation helpers (kept small so finalise stays readable)
    # ------------------------------------------------------------------
    def _truck_aggregates(self, shift_min: float) -> tuple[float, float]:
        cycle_total = 0.0
        cycle_count = 0
        utilisations: list[float] = []
        for truck in self._trucks.values():
            cycle_total += truck.completed_cycle_total_min
            cycle_count += truck.completed_cycle_count
            if shift_min > 0:
                utilisations.append(min(1.0, truck.productive_busy_time / shift_min))
        avg_cycle = cycle_total / cycle_count if cycle_count > 0 else 0.0
        avg_util = sum(utilisations) / len(utilisations) if utilisations else 0.0
        return avg_cycle, avg_util

    def _loader_aggregates(
        self, shift_min: float
    ) -> tuple[dict[str, LoaderMetrics], float]:
        loader_metrics: dict[str, LoaderMetrics] = {}
        wait_total = 0.0
        service_total = 0
        for loader_id, counter in self._loaders.items():
            mean_wait = (
                counter.wait_time_total / counter.service_count
                if counter.service_count
                else 0.0
            )
            utilisation = (
                min(1.0, counter.busy_time / shift_min) if shift_min > 0 else 0.0
            )
            loader_metrics[loader_id] = LoaderMetrics(
                loader_id=loader_id,
                utilisation=utilisation,
                mean_queue_wait_min=mean_wait,
                services_completed=counter.service_count,
            )
            wait_total += counter.wait_time_total
            service_total += counter.service_count
        avg_loader_queue = wait_total / service_total if service_total else 0.0
        return loader_metrics, avg_loader_queue

    def _crusher_snapshot(self, shift_min: float) -> CrusherMetrics:
        assert self._crusher is not None
        mean_wait = (
            self._crusher.wait_time_total / self._crusher.service_count
            if self._crusher.service_count
            else 0.0
        )
        utilisation = (
            min(1.0, self._crusher.busy_time / shift_min) if shift_min > 0 else 0.0
        )
        return CrusherMetrics(
            dump_id=self.crusher_id,
            utilisation=utilisation,
            mean_queue_wait_min=mean_wait,
            services_completed=self._crusher.service_count,
        )

    def _edge_aggregates(self, shift_min: float) -> dict[str, EdgeMetrics]:
        edge_metrics: dict[str, EdgeMetrics] = {}
        for edge_id, counter in self._edges.items():
            mean_wait = (
                counter.wait_time_total / counter.traversal_count
                if counter.traversal_count
                else 0.0
            )
            mean_traversal = (
                self.edge_traversal_total_min[edge_id] / counter.traversal_count
                if counter.traversal_count
                else 0.0
            )
            utilisation = (
                min(1.0, counter.busy_time / shift_min) if shift_min > 0 else 0.0
            )
            edge_metrics[edge_id] = EdgeMetrics(
                edge_id=edge_id,
                utilisation=utilisation,
                mean_queue_wait_min=mean_wait,
                mean_traversal_time_min=mean_traversal,
                traversal_count=counter.traversal_count,
                total_wait_time_min=counter.wait_time_total,
            )
        return edge_metrics


__all__ = [
    "CrusherMetrics",
    "EdgeMetrics",
    "LoaderMetrics",
    "MetricsRecorder",
    "ReplicationMetrics",
]