# src/mine_sim/model.py

# ← Back to submission · View raw on GitHub

"""SimPy discrete-event model of the synthetic-mine ore haulage system.

This module is the *active* simulation. It:

1. Spawns one SimPy process per truck.
2. Implements the ore cycle: PARK -> chosen LOAD -> CRUSH -> next LOAD ...
3. Acquires SimPy ``Resource`` slots for loaders, the crusher, and every
   capacity-1 edge; the rest of the graph is a plain ``timeout``.
4. Appends :class:`~mine_sim.events.EventRecord` rows and feeds a
   :class:`~mine_sim.metrics.MetricsRecorder`.

Design contracts:

* All trucks are released simultaneously at ``t = 0`` from PARK.
* Dispatch: each empty truck picks the loader minimising
  ``travel_to_loader + queue_len * mean_load_time + own_mean_load`` (see
  :func:`mine_sim.routing.select_loader`). The *route* is static; the
  *loader choice* is dynamic (live queue lengths).
* Edge travel time = ``free_flow_time / speed_factor * lognormal(cv)``;
  empty trucks use ``empty_speed_factor``, loaded use ``loaded_speed_factor``.
* Loading and dumping draw from ``truncated_normal(mean, sd, 0.1)``.
* The shift ends with ``env.run(until=480)``; only ``end_dump`` events that
  occur strictly before the cut count toward throughput. A fresh
  ``simpy.Environment`` per replication means in-flight resource holds at the
  cut vanish with the env — no manual release needed.

The class stays a thin SimPy host; the testable logic (dispatch cost,
distributions, routing) lives in pure modules.
"""

from __future__ import annotations

from dataclasses import dataclass, field
from typing import Iterable

import numpy as np
import simpy

from mine_sim.events import (
    EVENT_ARRIVE_CRUSHER,
    EVENT_ARRIVE_LOADER,
    EVENT_DEPART_CRUSHER,
    EVENT_DEPART_LOADER,
    EVENT_DISPATCH,
    EVENT_EDGE_ENTER,
    EVENT_EDGE_LEAVE,
    EVENT_END_DUMP,
    EVENT_END_LOAD,
    EVENT_START_DUMP,
    EVENT_START_LOAD,
    EventRecord,
)
from mine_sim.metrics import MetricsRecorder
from mine_sim.rng import (
    ReplicationRNG,
    lognormal_noise_multiplier,
    truncated_normal,
)
from mine_sim.routing import LoaderCandidate, RoutingTable, select_loader
from mine_sim.scenarios import ScenarioConfig
from mine_sim.topology import EdgeView, LoaderSpec, Topology, TruckSpec


# ---------------------------------------------------------------------------
# Internal handle dataclasses bundling SimPy resources with their specs.
# ---------------------------------------------------------------------------
@dataclass
class _LoaderHandle:
    """Bundle a loader's static spec with the SimPy resource that models it."""

    # Loader description taken from the topology (id, node, capacity, load times).
    spec: LoaderSpec
    # SimPy resource that trucks request in order to be served by this loader.
    resource: simpy.Resource


@dataclass
class _EdgeHandle:
    """Bundle a capacity-constrained edge with the SimPy resource guarding it."""

    # The topology's view of the edge (ids, end nodes, capacity, free-flow time).
    edge: EdgeView
    # SimPy resource a truck must hold for the duration of its traversal.
    resource: simpy.Resource


@dataclass
class _CrusherHandle:
    """Bundle the crusher's dump-time parameters with its SimPy resource."""

    # Identifier of the dump point; used as ``resource_id`` in emitted events.
    dump_id: str
    # Mean and standard deviation passed to ``truncated_normal`` when sampling
    # a dump duration (presumably minutes, per the ``_min`` suffix).
    mean_dump_time_min: float
    sd_dump_time_min: float
    # SimPy resource that trucks request in order to dump at the crusher.
    resource: simpy.Resource


def _stochastic_edge_time_min(
    edge: EdgeView,
    speed_factor: float,
    travel_noise_cv: float,
    rng: np.random.Generator,
) -> float:
    """Return a sampled traversal time for one edge.

    The result is ``free_flow_time / speed_factor * lognormal_noise(cv)``.
    Invalid inputs raise instead of propagating, so a routing or
    configuration bug surfaces immediately rather than producing a silent
    zero/inf/NaN duration.

    Args:
        edge: Edge to traverse; ``closed``, ``edge_id`` and
            ``free_flow_time_min`` are read.
        speed_factor: Divisor applied to the free-flow time; must be > 0.
        travel_noise_cv: Coefficient of variation handed to
            ``lognormal_noise_multiplier``.
        rng: Random generator handed to ``lognormal_noise_multiplier``.

    Returns:
        The noisy traversal time.

    Raises:
        RuntimeError: If the edge is closed, or its free-flow time is not a
            finite number (``inf``, ``-inf`` or ``NaN``).
        ValueError: If ``speed_factor`` is not strictly positive (``NaN``
            included).
    """
    if edge.closed:
        raise RuntimeError(f"Cannot traverse closed edge {edge.edge_id}")
    base = edge.free_flow_time_min
    # ``base == inf`` alone would let NaN and -inf through; isfinite catches
    # every non-finite value. The original wording is kept for +inf.
    if not np.isfinite(base):
        kind = "infinite" if base == float("inf") else "non-finite"
        raise RuntimeError(f"Edge {edge.edge_id} has {kind} free-flow time")
    # ``not x > 0`` (rather than ``x <= 0``) also rejects a NaN speed factor.
    if not speed_factor > 0:
        raise ValueError(f"speed_factor must be > 0, got {speed_factor}")
    multiplier = lognormal_noise_multiplier(rng, cv=travel_noise_cv)
    return (base / speed_factor) * multiplier


@dataclass
class MineSimulation:
    """Host object that assembles one replication of the haulage model.

    Construction creates a fresh ``simpy.Environment``, builds the loader,
    crusher and constrained-edge resources, and registers one truck process
    per truck in the topology. Nothing advances until :meth:`run` is called.
    Events are appended to ``events`` and statistics are pushed to
    ``recorder``. The class is not a SimPy process itself; use
    :func:`mine_sim.runner.run_replication` as the public entry point.
    """

    # Caller-supplied collaborators.
    scenario: ScenarioConfig
    topology: Topology
    routes: RoutingTable
    rng: ReplicationRNG
    recorder: MetricsRecorder
    events: list[EventRecord] = field(default_factory=list)

    # State created in ``__post_init__``; never passed to the constructor.
    env: simpy.Environment = field(init=False)
    loaders: dict[str, _LoaderHandle] = field(default_factory=dict, init=False)
    edge_resources: dict[str, _EdgeHandle] = field(default_factory=dict, init=False)
    crusher: _CrusherHandle = field(init=False)

    def __post_init__(self) -> None:
        """Create the environment, then the resources, then the truck processes."""
        self.env = simpy.Environment()
        self._build_resources()
        self._schedule_trucks()

    # ------------------------------------------------------------------
    # Wiring
    # ------------------------------------------------------------------
    def _build_resources(self) -> None:
        """Create SimPy resources for loaders, the crusher and constrained edges.

        Raises:
            RuntimeError: If the topology has no dump point of type
                ``"crusher"``.
        """
        env = self.env

        for spec in self.topology.loaders.values():
            slot = simpy.Resource(env, capacity=spec.capacity)
            self.loaders[spec.loader_id] = _LoaderHandle(spec=spec, resource=slot)

        # The first dump point typed "crusher" wins; for/else raises when the
        # scan finishes without finding one.
        for dump in self.topology.dump_points.values():
            if dump.type == "crusher":
                break
        else:
            raise RuntimeError("Topology has no dump_point of type 'crusher'")
        self.crusher = _CrusherHandle(
            dump_id=dump.dump_id,
            mean_dump_time_min=dump.mean_dump_time_min,
            sd_dump_time_min=dump.sd_dump_time_min,
            resource=simpy.Resource(env, capacity=dump.capacity),
        )

        for edge_id in self.topology.capacity_constrained_edges():
            view = self.topology.edges[edge_id]
            # Clamp to at least one slot so the resource is always usable.
            slots = max(1, view.capacity)
            self.edge_resources[edge_id] = _EdgeHandle(
                edge=view,
                resource=simpy.Resource(env, capacity=slots),
            )

    def _schedule_trucks(self) -> None:
        """Register one SimPy process per truck listed in the topology."""
        for truck_spec in self.topology.trucks:
            process = self._truck_process(truck_spec)
            self.env.process(process)

    # ------------------------------------------------------------------
    # Event helper
    # ------------------------------------------------------------------
    def _emit(
        self,
        truck_id: str,
        event_type: str,
        *,
        from_node: str | None = None,
        to_node: str | None = None,
        location: str | None = None,
        loaded: bool | None = None,
        payload_tonnes: float | None = None,
        resource_id: str | None = None,
        queue_length: int | None = None,
    ) -> None:
        """Append one :class:`EventRecord` stamped with the current sim time.

        The replication index comes from the recorder and the scenario id
        from the scenario; all other fields are passed through unchanged.
        """
        record = EventRecord(
            time_min=float(self.env.now),
            replication=self.recorder.replication_index,
            scenario_id=self.scenario.scenario_id,
            truck_id=truck_id,
            event_type=event_type,
            from_node=from_node,
            to_node=to_node,
            location=location,
            loaded=loaded,
            payload_tonnes=payload_tonnes,
            resource_id=resource_id,
            queue_length=queue_length,
        )
        self.events.append(record)

    @staticmethod
    def _occupancy(resource: simpy.Resource) -> int:
        """Return the number of waiting requests plus current users."""
        waiting = len(resource.queue)
        return resource.count + waiting

    # ------------------------------------------------------------------
    # Dispatch helper
    # ------------------------------------------------------------------
    def _choose_loader(self, origin: str, speed_factor: float) -> _LoaderHandle:
        """Pick the loader for a truck standing at ``origin``.

        One :class:`LoaderCandidate` is built per loader that has a route
        from ``origin``. It combines the route's free-flow time divided by
        ``speed_factor`` (floored at ``1e-9``), the loader's live occupancy
        and its mean load time. The choice itself, including tie-breaking, is
        delegated to :func:`mine_sim.routing.select_loader`.

        Raises:
            RuntimeError: If no loader is reachable from ``origin``.
        """
        reachable: dict[str, _LoaderHandle] = {}
        options: list[LoaderCandidate] = []
        # Sorted ids give a deterministic candidate order.
        for loader_id in sorted(self.loaders):
            handle = self.loaders[loader_id]
            path = self.routes.get(origin, handle.spec.node_id)
            if path is None:
                continue
            eta = path.free_flow_time_min / max(speed_factor, 1e-9)
            option = LoaderCandidate(
                loader_id=loader_id,
                travel_time_min=eta,
                queue_len=self._occupancy(handle.resource),
                mean_load_time_min=handle.spec.mean_load_time_min,
            )
            options.append(option)
            reachable[loader_id] = handle

        if options:
            return reachable[select_loader(options)]
        raise RuntimeError(
            f"No reachable loader from {origin} — reachability check broken?"
        )

    # ------------------------------------------------------------------
    # Travel along a route (sequence of edges)
    # ------------------------------------------------------------------
    def _travel(
        self,
        truck_id: str,
        edge_ids: Iterable[str],
        speed_factor: float,
        loaded: bool,
        payload_tonnes: float,
    ):
        """Generator: drive a truck along ``edge_ids`` one edge at a time.

        A traversal time is sampled for every edge before it is entered.
        Capacity-constrained edges are delegated to
        :meth:`_traverse_constrained_edge`. Every other edge is a plain
        timeout whose duration is credited as productive time.
        """
        noise_rng = self.rng["edge_noise"]
        cv = self.scenario.stochasticity.travel_time_noise_cv
        for edge_id in edge_ids:
            edge = self.topology.edges[edge_id]
            duration = _stochastic_edge_time_min(
                edge=edge,
                speed_factor=speed_factor,
                travel_noise_cv=cv,
                rng=noise_rng,
            )
            if not edge.is_capacity_constrained:
                yield self.env.timeout(duration)
                self.recorder.add_productive_time(truck_id, duration)
                continue
            yield from self._traverse_constrained_edge(
                truck_id, edge, duration, loaded, payload_tonnes
            )

    def _traverse_constrained_edge(
        self,
        truck_id: str,
        edge: EdgeView,
        travel_time: float,
        loaded: bool,
        payload_tonnes: float,
    ):
        """Generator: queue for a constrained edge, cross it, then free it.

        Emits ``edge_enter`` once the slot is granted and ``edge_leave`` after
        the traversal timeout. The recorder receives the wait and traversal
        times, and their sum is credited as productive time.
        """
        slot = self.edge_resources[edge.edge_id].resource

        def announce(kind: str, where: str) -> None:
            # Occupancy is read at emit time so it reflects the live state.
            self._emit(
                truck_id,
                kind,
                from_node=edge.from_node,
                to_node=edge.to_node,
                location=where,
                loaded=loaded,
                payload_tonnes=payload_tonnes,
                resource_id=edge.edge_id,
                queue_length=self._occupancy(slot),
            )

        queued_at = self.env.now
        with slot.request() as grant:
            yield grant
            waited = self.env.now - queued_at
            announce(EVENT_EDGE_ENTER, edge.from_node)
            yield self.env.timeout(travel_time)
            self.recorder.record_edge_traversal(
                edge_id=edge.edge_id,
                wait_time_min=waited,
                traversal_time_min=travel_time,
            )
            self.recorder.add_productive_time(truck_id, waited + travel_time)
            announce(EVENT_EDGE_LEAVE, edge.to_node)

    # ------------------------------------------------------------------
    # Loading / dumping
    # ------------------------------------------------------------------
    def _load(self, truck: TruckSpec, loader: _LoaderHandle):
        """Generator: wait for ``loader``, get loaded, and leave.

        Emits, in order, arrive_loader, start_load, end_load and
        depart_loader. The load duration is drawn with ``truncated_normal``
        from the ``"loading"`` RNG stream. The truck is reported as loaded
        with its full payload from end_load onwards.
        """
        service_rng = self.rng["loading"]
        spec = loader.spec
        slot = loader.resource
        tid = truck.truck_id

        def announce(kind: str, *, is_loaded: bool, tonnes: float) -> None:
            # Occupancy is read at emit time so it reflects the live state.
            self._emit(
                tid,
                kind,
                location=spec.node_id,
                loaded=is_loaded,
                payload_tonnes=tonnes,
                resource_id=spec.loader_id,
                queue_length=self._occupancy(slot),
            )

        announce(EVENT_ARRIVE_LOADER, is_loaded=False, tonnes=0.0)
        queued_at = self.env.now
        with slot.request() as grant:
            yield grant
            waited = self.env.now - queued_at
            announce(EVENT_START_LOAD, is_loaded=False, tonnes=0.0)
            service = truncated_normal(
                service_rng,
                mean=spec.mean_load_time_min,
                sd=spec.sd_load_time_min,
            )
            yield self.env.timeout(service)
            self.recorder.record_loader_service(
                loader_id=spec.loader_id,
                wait_time_min=waited,
                service_time_min=service,
            )
            self.recorder.add_productive_time(tid, waited + service)
            announce(EVENT_END_LOAD, is_loaded=True, tonnes=truck.payload_tonnes)
        # The loader slot has been released by the time depart is emitted.
        announce(EVENT_DEPART_LOADER, is_loaded=True, tonnes=truck.payload_tonnes)

    def _dump(self, truck: TruckSpec):
        """Generator: wait for the crusher, dump, and leave.

        Emits, in order, arrive_crusher, start_dump, end_dump and
        depart_crusher. The dump duration is drawn with ``truncated_normal``
        from the ``"dumping"`` RNG stream. A completed dump and its cycle end
        are credited only when the dump finishes strictly before the
        recorder's shift length.
        """
        service_rng = self.rng["dumping"]
        crusher = self.crusher
        tid = truck.truck_id

        def announce(kind: str, *, is_loaded: bool, tonnes: float) -> None:
            # Occupancy is read at emit time so it reflects the live state.
            self._emit(
                tid,
                kind,
                location="CRUSH",
                loaded=is_loaded,
                payload_tonnes=tonnes,
                resource_id=crusher.dump_id,
                queue_length=self._occupancy(crusher.resource),
            )

        announce(EVENT_ARRIVE_CRUSHER, is_loaded=True, tonnes=truck.payload_tonnes)
        queued_at = self.env.now
        with crusher.resource.request() as grant:
            yield grant
            waited = self.env.now - queued_at
            announce(EVENT_START_DUMP, is_loaded=True, tonnes=truck.payload_tonnes)
            service = truncated_normal(
                service_rng,
                mean=crusher.mean_dump_time_min,
                sd=crusher.sd_dump_time_min,
            )
            yield self.env.timeout(service)
            self.recorder.record_crusher_service(
                wait_time_min=waited,
                service_time_min=service,
            )
            self.recorder.add_productive_time(tid, waited + service)
            finished_at = self.env.now
            # Hard cut: a dump ending at or after shift end earns no credit.
            if finished_at < self.recorder.shift_length_min:
                self.recorder.record_completed_dump(finished_at, tid)
                self.recorder.record_cycle_end(tid, finished_at)
            # end_dump still carries the payload that was just tipped.
            announce(EVENT_END_DUMP, is_loaded=False, tonnes=truck.payload_tonnes)
        # The crusher slot has been released by the time depart is emitted.
        announce(EVENT_DEPART_CRUSHER, is_loaded=False, tonnes=0.0)

    # ------------------------------------------------------------------
    # Main truck loop
    # ------------------------------------------------------------------
    def _truck_process(self, truck: TruckSpec):
        """SimPy process generator: repeat the haul cycle until the shift ends.

        Each cycle chooses a loader, travels there empty, loads, travels
        loaded to ``"CRUSH"`` and dumps. The clock is checked against the
        shift length before each cycle and after every leg except the dump.
        The process stops as soon as the shift is over.
        """
        tid = truck.truck_id
        self.recorder.record_dispatch(tid, time_min=self.env.now)
        self._emit(
            tid,
            EVENT_DISPATCH,
            location=truck.start_node,
            loaded=False,
            payload_tonnes=0.0,
            queue_length=0,
        )
        position = truck.start_node
        cutoff = self.recorder.shift_length_min

        def shift_over() -> bool:
            return self.env.now >= cutoff

        while not shift_over():
            target = self._choose_loader(
                origin=position,
                speed_factor=truck.empty_speed_factor,
            )
            outbound = self.routes.require(position, target.spec.node_id)
            yield from self._travel(
                truck_id=tid,
                edge_ids=outbound.edge_ids,
                speed_factor=truck.empty_speed_factor,
                loaded=False,
                payload_tonnes=0.0,
            )
            position = target.spec.node_id
            if shift_over():
                return

            yield from self._load(truck, target)
            if shift_over():
                return

            inbound = self.routes.require(position, "CRUSH")
            yield from self._travel(
                truck_id=tid,
                edge_ids=inbound.edge_ids,
                speed_factor=truck.loaded_speed_factor,
                loaded=True,
                payload_tonnes=truck.payload_tonnes,
            )
            position = "CRUSH"
            if shift_over():
                return

            # _dump credits tonnes and the cycle end itself when before the cut.
            yield from self._dump(truck)

    # ------------------------------------------------------------------
    # Run
    # ------------------------------------------------------------------
    def run(self, until_min: float | None = None) -> None:
        """Advance the environment to ``until_min``.

        When ``until_min`` is ``None`` the recorder's shift length is used
        as the horizon.
        """
        horizon = self.recorder.shift_length_min if until_min is None else until_min
        self.env.run(until=horizon)


# Public API of this module: only the simulation host class is exported.
__all__ = ["MineSimulation"]