"""SimPy discrete-event model of the synthetic-mine ore haulage system.
This module is the *active* simulation. It:
1. Spawns one SimPy process per truck.
2. Implements the ore cycle: PARK -> chosen LOAD -> CRUSH -> next LOAD ...
3. Acquires SimPy ``Resource`` slots for loaders, the crusher, and every
capacity-1 edge; the rest of the graph is a plain ``timeout``.
4. Appends :class:`~mine_sim.events.EventRecord` rows and feeds a
:class:`~mine_sim.metrics.MetricsRecorder`.
Design contracts:
* All trucks are released simultaneously at ``t = 0`` from PARK.
* Dispatch: each empty truck picks the loader minimising
``travel_to_loader + queue_len * mean_load_time + own_mean_load`` (see
:func:`mine_sim.routing.select_loader`). The *route* is static; the
*loader choice* is dynamic (live queue lengths).
* Edge travel time = ``free_flow_time / speed_factor * lognormal(cv)``;
empty trucks use ``empty_speed_factor``, loaded use ``loaded_speed_factor``.
* Loading and dumping draw from ``truncated_normal(mean, sd, 0.1)``.
* The shift ends with ``env.run(until=480)``; only ``end_dump`` events that
occur strictly before the cut count toward throughput. A fresh
``simpy.Environment`` per replication means in-flight resource holds at the
cut vanish with the env — no manual release needed.
The class stays a thin SimPy host; the testable logic (dispatch cost,
distributions, routing) lives in pure modules.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Iterable
import numpy as np
import simpy
from mine_sim.events import (
EVENT_ARRIVE_CRUSHER,
EVENT_ARRIVE_LOADER,
EVENT_DEPART_CRUSHER,
EVENT_DEPART_LOADER,
EVENT_DISPATCH,
EVENT_EDGE_ENTER,
EVENT_EDGE_LEAVE,
EVENT_END_DUMP,
EVENT_END_LOAD,
EVENT_START_DUMP,
EVENT_START_LOAD,
EventRecord,
)
from mine_sim.metrics import MetricsRecorder
from mine_sim.rng import (
ReplicationRNG,
lognormal_noise_multiplier,
truncated_normal,
)
from mine_sim.routing import LoaderCandidate, RoutingTable, select_loader
from mine_sim.scenarios import ScenarioConfig
from mine_sim.topology import EdgeView, LoaderSpec, Topology, TruckSpec
# ---------------------------------------------------------------------------
# Internal handle dataclasses bundling SimPy resources with their specs.
# ---------------------------------------------------------------------------
@dataclass
class _LoaderHandle:
spec: LoaderSpec
resource: simpy.Resource
@dataclass
class _EdgeHandle:
edge: EdgeView
resource: simpy.Resource
@dataclass
class _CrusherHandle:
dump_id: str
mean_dump_time_min: float
sd_dump_time_min: float
resource: simpy.Resource
def _stochastic_edge_time_min(
edge: EdgeView,
speed_factor: float,
travel_noise_cv: float,
rng: np.random.Generator,
) -> float:
"""Sampled traversal time for one edge.
``free_flow_time / speed_factor * lognormal_noise(cv)``. Closed or
infinite-time edges raise so a routing bug surfaces immediately rather
than producing a silent zero/inf.
"""
if edge.closed:
raise RuntimeError(f"Cannot traverse closed edge {edge.edge_id}")
base = edge.free_flow_time_min
if base == float("inf"):
raise RuntimeError(f"Edge {edge.edge_id} has infinite free-flow time")
if speed_factor <= 0:
raise ValueError(f"speed_factor must be > 0, got {speed_factor}")
multiplier = lognormal_noise_multiplier(rng, cv=travel_noise_cv)
return (base / speed_factor) * multiplier
@dataclass
class MineSimulation:
"""Wires SimPy together for one replication.
Not itself a SimPy process: it owns the environment, resources, RNG
bundle, recorder, and event list, and schedules one truck process each.
Use :func:`mine_sim.runner.run_replication` as the public entry point.
"""
scenario: ScenarioConfig
topology: Topology
routes: RoutingTable
rng: ReplicationRNG
recorder: MetricsRecorder
events: list[EventRecord] = field(default_factory=list)
env: simpy.Environment = field(init=False)
loaders: dict[str, _LoaderHandle] = field(default_factory=dict, init=False)
edge_resources: dict[str, _EdgeHandle] = field(default_factory=dict, init=False)
crusher: _CrusherHandle = field(init=False)
def __post_init__(self) -> None:
self.env = simpy.Environment()
self._build_resources()
self._schedule_trucks()
# ------------------------------------------------------------------
# Wiring
# ------------------------------------------------------------------
def _build_resources(self) -> None:
for loader in self.topology.loaders.values():
self.loaders[loader.loader_id] = _LoaderHandle(
spec=loader,
resource=simpy.Resource(self.env, capacity=loader.capacity),
)
crusher_spec = next(
(d for d in self.topology.dump_points.values() if d.type == "crusher"),
None,
)
if crusher_spec is None:
raise RuntimeError("Topology has no dump_point of type 'crusher'")
self.crusher = _CrusherHandle(
dump_id=crusher_spec.dump_id,
mean_dump_time_min=crusher_spec.mean_dump_time_min,
sd_dump_time_min=crusher_spec.sd_dump_time_min,
resource=simpy.Resource(self.env, capacity=crusher_spec.capacity),
)
for edge_id in self.topology.capacity_constrained_edges():
edge = self.topology.edges[edge_id]
self.edge_resources[edge_id] = _EdgeHandle(
edge=edge,
resource=simpy.Resource(self.env, capacity=max(1, edge.capacity)),
)
def _schedule_trucks(self) -> None:
for truck in self.topology.trucks:
self.env.process(self._truck_process(truck))
# ------------------------------------------------------------------
# Event helper
# ------------------------------------------------------------------
def _emit(
self,
truck_id: str,
event_type: str,
*,
from_node: str | None = None,
to_node: str | None = None,
location: str | None = None,
loaded: bool | None = None,
payload_tonnes: float | None = None,
resource_id: str | None = None,
queue_length: int | None = None,
) -> None:
self.events.append(
EventRecord(
time_min=float(self.env.now),
replication=self.recorder.replication_index,
scenario_id=self.scenario.scenario_id,
truck_id=truck_id,
event_type=event_type,
from_node=from_node,
to_node=to_node,
location=location,
loaded=loaded,
payload_tonnes=payload_tonnes,
resource_id=resource_id,
queue_length=queue_length,
)
)
@staticmethod
def _occupancy(resource: simpy.Resource) -> int:
"""Live count of users in service + waiting in the queue."""
return resource.count + len(resource.queue)
# ------------------------------------------------------------------
# Dispatch helper
# ------------------------------------------------------------------
def _choose_loader(self, origin: str, speed_factor: float) -> _LoaderHandle:
"""Apply the dynamic dispatch policy: argmin expected time-to-loaded.
Builds a :class:`LoaderCandidate` per reachable loader from the live
queue state and the static free-flow route time (scaled by the empty
speed factor), then delegates the argmin + tie-break to
:func:`mine_sim.routing.select_loader`.
"""
candidates: list[LoaderCandidate] = []
handles: dict[str, _LoaderHandle] = {}
for loader_id in sorted(self.loaders):
handle = self.loaders[loader_id]
route = self.routes.get(origin, handle.spec.node_id)
if route is None:
continue
travel_min = route.free_flow_time_min / max(speed_factor, 1e-9)
candidates.append(
LoaderCandidate(
loader_id=loader_id,
travel_time_min=travel_min,
queue_len=self._occupancy(handle.resource),
mean_load_time_min=handle.spec.mean_load_time_min,
)
)
handles[loader_id] = handle
if not candidates:
raise RuntimeError(
f"No reachable loader from {origin} — reachability check broken?"
)
return handles[select_loader(candidates)]
# ------------------------------------------------------------------
# Travel along a route (sequence of edges)
# ------------------------------------------------------------------
def _travel(
self,
truck_id: str,
edge_ids: Iterable[str],
speed_factor: float,
loaded: bool,
payload_tonnes: float,
):
"""Generator: walk a route, holding capacity-1 edge resources.
Capacity-1 edges are wrapped with request/release; queue waits and
traversal times are recorded and emitted as edge_enter/edge_leave.
Free-flow edges are plain timeouts. Productive time accrues for every
completed wait + traversal.
"""
edge_noise_rng = self.rng["edge_noise"]
noise_cv = self.scenario.stochasticity.travel_time_noise_cv
for edge_id in edge_ids:
edge = self.topology.edges[edge_id]
travel_time = _stochastic_edge_time_min(
edge=edge,
speed_factor=speed_factor,
travel_noise_cv=noise_cv,
rng=edge_noise_rng,
)
if edge.is_capacity_constrained:
yield from self._traverse_constrained_edge(
truck_id, edge, travel_time, loaded, payload_tonnes
)
else:
yield self.env.timeout(travel_time)
self.recorder.add_productive_time(truck_id, travel_time)
def _traverse_constrained_edge(
self,
truck_id: str,
edge: EdgeView,
travel_time: float,
loaded: bool,
payload_tonnes: float,
):
"""Generator: request a capacity-1 edge, traverse, release."""
handle = self.edge_resources[edge.edge_id]
wait_start = self.env.now
with handle.resource.request() as req:
yield req
wait_time = self.env.now - wait_start
self._emit(
truck_id,
EVENT_EDGE_ENTER,
from_node=edge.from_node,
to_node=edge.to_node,
location=edge.from_node,
loaded=loaded,
payload_tonnes=payload_tonnes,
resource_id=edge.edge_id,
queue_length=self._occupancy(handle.resource),
)
yield self.env.timeout(travel_time)
self.recorder.record_edge_traversal(
edge_id=edge.edge_id,
wait_time_min=wait_time,
traversal_time_min=travel_time,
)
self.recorder.add_productive_time(truck_id, wait_time + travel_time)
self._emit(
truck_id,
EVENT_EDGE_LEAVE,
from_node=edge.from_node,
to_node=edge.to_node,
location=edge.to_node,
loaded=loaded,
payload_tonnes=payload_tonnes,
resource_id=edge.edge_id,
queue_length=self._occupancy(handle.resource),
)
# ------------------------------------------------------------------
# Loading / dumping
# ------------------------------------------------------------------
def _load(self, truck: TruckSpec, loader: _LoaderHandle):
"""Generator: queue + service at a loader, emitting events."""
loading_rng = self.rng["loading"]
self._emit(
truck.truck_id,
EVENT_ARRIVE_LOADER,
location=loader.spec.node_id,
loaded=False,
payload_tonnes=0.0,
resource_id=loader.spec.loader_id,
queue_length=self._occupancy(loader.resource),
)
wait_start = self.env.now
with loader.resource.request() as req:
yield req
wait_time = self.env.now - wait_start
self._emit(
truck.truck_id,
EVENT_START_LOAD,
location=loader.spec.node_id,
loaded=False,
payload_tonnes=0.0,
resource_id=loader.spec.loader_id,
queue_length=self._occupancy(loader.resource),
)
duration = truncated_normal(
loading_rng,
mean=loader.spec.mean_load_time_min,
sd=loader.spec.sd_load_time_min,
)
yield self.env.timeout(duration)
self.recorder.record_loader_service(
loader_id=loader.spec.loader_id,
wait_time_min=wait_time,
service_time_min=duration,
)
self.recorder.add_productive_time(truck.truck_id, wait_time + duration)
self._emit(
truck.truck_id,
EVENT_END_LOAD,
location=loader.spec.node_id,
loaded=True,
payload_tonnes=truck.payload_tonnes,
resource_id=loader.spec.loader_id,
queue_length=self._occupancy(loader.resource),
)
self._emit(
truck.truck_id,
EVENT_DEPART_LOADER,
location=loader.spec.node_id,
loaded=True,
payload_tonnes=truck.payload_tonnes,
resource_id=loader.spec.loader_id,
queue_length=self._occupancy(loader.resource),
)
def _dump(self, truck: TruckSpec):
"""Generator: queue + service at the crusher; credit tonnes if before cut."""
dumping_rng = self.rng["dumping"]
self._emit(
truck.truck_id,
EVENT_ARRIVE_CRUSHER,
location="CRUSH",
loaded=True,
payload_tonnes=truck.payload_tonnes,
resource_id=self.crusher.dump_id,
queue_length=self._occupancy(self.crusher.resource),
)
wait_start = self.env.now
with self.crusher.resource.request() as req:
yield req
wait_time = self.env.now - wait_start
self._emit(
truck.truck_id,
EVENT_START_DUMP,
location="CRUSH",
loaded=True,
payload_tonnes=truck.payload_tonnes,
resource_id=self.crusher.dump_id,
queue_length=self._occupancy(self.crusher.resource),
)
duration = truncated_normal(
dumping_rng,
mean=self.crusher.mean_dump_time_min,
sd=self.crusher.sd_dump_time_min,
)
yield self.env.timeout(duration)
self.recorder.record_crusher_service(
wait_time_min=wait_time,
service_time_min=duration,
)
self.recorder.add_productive_time(truck.truck_id, wait_time + duration)
now = self.env.now
# Hard cut: only credit dumps that close strictly before shift end.
if now < self.recorder.shift_length_min:
self.recorder.record_completed_dump(now, truck.truck_id)
self.recorder.record_cycle_end(truck.truck_id, now)
self._emit(
truck.truck_id,
EVENT_END_DUMP,
location="CRUSH",
loaded=False,
payload_tonnes=truck.payload_tonnes,
resource_id=self.crusher.dump_id,
queue_length=self._occupancy(self.crusher.resource),
)
self._emit(
truck.truck_id,
EVENT_DEPART_CRUSHER,
location="CRUSH",
loaded=False,
payload_tonnes=0.0,
resource_id=self.crusher.dump_id,
queue_length=self._occupancy(self.crusher.resource),
)
# ------------------------------------------------------------------
# Main truck loop
# ------------------------------------------------------------------
def _truck_process(self, truck: TruckSpec):
"""SimPy process generator implementing the ore haulage loop."""
self.recorder.record_dispatch(truck.truck_id, time_min=self.env.now)
self._emit(
truck.truck_id,
EVENT_DISPATCH,
location=truck.start_node,
loaded=False,
payload_tonnes=0.0,
queue_length=0,
)
current_node = truck.start_node
shift_min = self.recorder.shift_length_min
while self.env.now < shift_min:
chosen = self._choose_loader(
origin=current_node,
speed_factor=truck.empty_speed_factor,
)
route_to_loader = self.routes.require(current_node, chosen.spec.node_id)
yield from self._travel(
truck_id=truck.truck_id,
edge_ids=route_to_loader.edge_ids,
speed_factor=truck.empty_speed_factor,
loaded=False,
payload_tonnes=0.0,
)
current_node = chosen.spec.node_id
if self.env.now >= shift_min:
break
yield from self._load(truck, chosen)
if self.env.now >= shift_min:
break
route_to_crusher = self.routes.require(current_node, "CRUSH")
yield from self._travel(
truck_id=truck.truck_id,
edge_ids=route_to_crusher.edge_ids,
speed_factor=truck.loaded_speed_factor,
loaded=True,
payload_tonnes=truck.payload_tonnes,
)
current_node = "CRUSH"
if self.env.now >= shift_min:
break
yield from self._dump(truck)
# _dump credits tonnes + cycle end if before the cut; loop repeats.
# ------------------------------------------------------------------
# Run
# ------------------------------------------------------------------
def run(self, until_min: float | None = None) -> None:
"""Run the SimPy environment until the shift cut (or a custom horizon)."""
horizon = (
until_min if until_min is not None else self.recorder.shift_length_min
)
self.env.run(until=horizon)
__all__ = ["MineSimulation"]
src/mine_sim/model.py
← Back to submission · View raw on GitHub