"""SimPy discrete-event model of the synthetic mine ore haulage system.
This module is the *active* simulation. It:
1. Spawns one SimPy process per truck.
2. Implements the ore cycle: PARK -> chosen LOAD -> CRUSH -> next LOAD ...
3. Acquires SimPy ``Resource`` slots for loaders, the crusher, and every
capacity-1 edge — the rest of the graph is modelled as a plain ``timeout``.
4. Pushes events into a list of :class:`mine_sim.events.EventRecord` and
per-replication counters into a :class:`mine_sim.metrics.MetricsRecorder`.
Design contracts (Seed-derived):
* All trucks are released simultaneously at ``t = 0``.
* Dispatch policy: ``argmin(travel_to_loader + queue_len * mean_load_time
+ own_load_time)``. ``queue_len`` includes the truck currently in service.
* Travel time on an edge is ``free_flow_time / speed_factor *
lognormal(cv=0.10)``; ``empty_speed_factor`` for empty trucks,
``loaded_speed_factor`` for loaded trucks.
* Loading and dumping draws come from ``truncated_normal(mean, sd, 0.1)``.
* ``end_dump`` events that occur strictly before ``shift_length_min`` are
the only ones that count toward throughput (hard cut).
* Per-replication seed = ``base_seed + replication_index`` (handled in
:mod:`mine_sim.rng`).
The module deliberately keeps "shapes" small: ``MineSimulation`` is a thin
SimPy host; the real logic lives in functional helpers and the
:class:`MetricsRecorder` so it can be unit-tested without spinning up
SimPy.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Iterable
import numpy as np
import simpy
from mine_sim.events import (
EVENT_ARRIVE_CRUSHER,
EVENT_ARRIVE_LOADER,
EVENT_DEPART_CRUSHER,
EVENT_DEPART_LOADER,
EVENT_DISPATCH,
EVENT_EDGE_ENTER,
EVENT_EDGE_LEAVE,
EVENT_END_DUMP,
EVENT_END_LOAD,
EVENT_START_DUMP,
EVENT_START_LOAD,
EventRecord,
)
from mine_sim.metrics import MetricsRecorder
from mine_sim.rng import (
ReplicationRNG,
lognormal_noise_multiplier,
truncated_normal,
)
from mine_sim.routing import RoutingTable
from mine_sim.scenarios import ScenarioConfig
from mine_sim.topology import EdgeView, LoaderSpec, Topology, TruckSpec
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
@dataclass
class _LoaderHandle:
"""Bundle a loader's SimPy ``Resource`` with its static spec."""
spec: LoaderSpec
resource: simpy.Resource
@dataclass
class _EdgeHandle:
"""SimPy ``Resource`` for a capacity-1 edge."""
edge: EdgeView
resource: simpy.Resource
@dataclass
class _CrusherHandle:
dump_id: str
mean_dump_time_min: float
sd_dump_time_min: float
resource: simpy.Resource
def _stochastic_edge_time_min(
edge: EdgeView,
speed_factor: float,
travel_noise_cv: float,
rng: np.random.Generator,
) -> float:
"""Sampled traversal time for a single edge.
``free_flow_time`` is multiplied by ``1 / speed_factor`` (truck class)
and by an independent lognormal noise multiplier with mean 1.
Closed edges should not appear in any precomputed route, but as a
safety we still raise so a routing bug surfaces immediately.
"""
if edge.closed:
raise RuntimeError(f"Cannot traverse closed edge {edge.edge_id}")
base = edge.free_flow_time_min
if base == float("inf"):
raise RuntimeError(f"Edge {edge.edge_id} has infinite free-flow time")
if speed_factor <= 0:
raise ValueError(f"speed_factor must be > 0, got {speed_factor}")
multiplier = lognormal_noise_multiplier(rng, cv=travel_noise_cv)
return (base / speed_factor) * multiplier
# ---------------------------------------------------------------------------
# Main simulation host
# ---------------------------------------------------------------------------
@dataclass
class MineSimulation:
"""Wires SimPy together for one replication.
The class is *not* itself a SimPy process; it owns the environment,
resources, RNG bundle, recorder, and event list, and exposes one
truck-process method ``run_truck`` that the constructor schedules.
Use :func:`run_replication` (in :mod:`mine_sim.runner`) as the public
entry point — it instantiates this class, runs the env, and finalises
metrics.
"""
scenario: ScenarioConfig
topology: Topology
routes: RoutingTable
rng: ReplicationRNG
recorder: MetricsRecorder
events: list[EventRecord] = field(default_factory=list)
# Filled in by ``_build_resources``
env: simpy.Environment = field(init=False)
loaders: dict[str, _LoaderHandle] = field(default_factory=dict, init=False)
edge_resources: dict[str, _EdgeHandle] = field(default_factory=dict, init=False)
crusher: _CrusherHandle = field(init=False)
def __post_init__(self) -> None:
self.env = simpy.Environment()
self._build_resources()
self._schedule_trucks()
# ------------------------------------------------------------------
# Wiring
# ------------------------------------------------------------------
def _build_resources(self) -> None:
# Loaders
for loader in self.topology.loaders.values():
# Skip loaders that aren't ore loaders (defensive, matches CSV).
self.loaders[loader.loader_id] = _LoaderHandle(
spec=loader,
resource=simpy.Resource(self.env, capacity=loader.capacity),
)
# Crusher (D_CRUSH only — WASTE is out-of-scope)
crusher_spec = next(
(
d
for d in self.topology.dump_points.values()
if d.type == "crusher"
),
None,
)
if crusher_spec is None:
raise RuntimeError("Topology has no dump_point of type 'crusher'")
self.crusher = _CrusherHandle(
dump_id=crusher_spec.dump_id,
mean_dump_time_min=crusher_spec.mean_dump_time_min,
sd_dump_time_min=crusher_spec.sd_dump_time_min,
resource=simpy.Resource(self.env, capacity=crusher_spec.capacity),
)
# Capacity-1 edge resources
for edge_id in self.topology.capacity_constrained_edges():
edge = self.topology.edges[edge_id]
self.edge_resources[edge_id] = _EdgeHandle(
edge=edge,
resource=simpy.Resource(self.env, capacity=max(1, edge.capacity)),
)
def _schedule_trucks(self) -> None:
for truck in self.topology.trucks:
self.env.process(self._truck_process(truck))
# ------------------------------------------------------------------
# Event helpers
# ------------------------------------------------------------------
def _emit(
self,
truck_id: str,
event_type: str,
*,
from_node: str | None = None,
to_node: str | None = None,
location: str | None = None,
loaded: bool | None = None,
payload_tonnes: float | None = None,
resource_id: str | None = None,
queue_length: int | None = None,
) -> None:
self.events.append(
EventRecord(
time_min=float(self.env.now),
replication=self.recorder.replication_index,
scenario_id=self.scenario.scenario_id,
truck_id=truck_id,
event_type=event_type,
from_node=from_node,
to_node=to_node,
location=location,
loaded=loaded,
payload_tonnes=payload_tonnes,
resource_id=resource_id,
queue_length=queue_length,
)
)
# ------------------------------------------------------------------
# Dispatch helper
# ------------------------------------------------------------------
def _choose_loader(
self,
origin: str,
speed_factor: float,
) -> _LoaderHandle:
"""Apply the dispatch policy: argmin over loaders.
Cost = travel_to_loader + queue_len * mean_load_time + own_load_time.
``travel_to_loader`` uses the precomputed free-flow route time
scaled by the truck's empty ``speed_factor``. ``queue_len`` is the
live SimPy resource count + queue length (i.e. includes the truck
currently being served). Ties are broken by lower loader id.
"""
best: _LoaderHandle | None = None
best_cost = float("inf")
# Sort to make ties deterministic.
for loader_id in sorted(self.loaders):
handle = self.loaders[loader_id]
route = self.routes.get(origin, handle.spec.node_id)
if route is None:
continue
travel_min = route.free_flow_time_min / max(speed_factor, 1e-9)
queue_len = handle.resource.count + len(handle.resource.queue)
cost = (
travel_min
+ queue_len * handle.spec.mean_load_time_min
+ handle.spec.mean_load_time_min
)
if cost < best_cost:
best = handle
best_cost = cost
if best is None:
raise RuntimeError(
f"No reachable loader from {origin} — reachability check broken?"
)
return best
# ------------------------------------------------------------------
# Travel along a route (sequence of edges)
# ------------------------------------------------------------------
def _travel(
self,
truck_id: str,
edge_ids: Iterable[str],
speed_factor: float,
loaded: bool,
payload_tonnes: float,
):
"""Generator: walk a route, holding capacity-1 edge resources.
Capacity-1 edges are wrapped with request/release; queue waits and
traversal times are recorded. Free-flow edges are plain timeouts.
Productive busy time is accumulated for every wall-clock minute of
wait + traversal.
"""
edge_noise_rng = self.rng["edge_noise"]
for edge_id in edge_ids:
edge = self.topology.edges[edge_id]
travel_time = _stochastic_edge_time_min(
edge=edge,
speed_factor=speed_factor,
travel_noise_cv=self.scenario.stochasticity.travel_time_noise_cv,
rng=edge_noise_rng,
)
if edge.is_capacity_constrained:
handle = self.edge_resources[edge_id]
# Per conceptual model: edge_enter / edge_leave bracket the
# *holding* of the Resource. Truck arrives at from_node and
# silently joins the queue; once granted (request boundary)
# we fire edge_enter, traverse, then fire edge_leave at the
# release boundary just before exiting the ``with`` block.
wait_start = self.env.now
with handle.resource.request() as req:
yield req
# --- Request boundary: resource just acquired ---
wait_time = self.env.now - wait_start
self._emit(
truck_id,
EVENT_EDGE_ENTER,
from_node=edge.from_node,
to_node=edge.to_node,
location=edge.from_node,
loaded=loaded,
payload_tonnes=payload_tonnes,
resource_id=edge_id,
queue_length=(
handle.resource.count + len(handle.resource.queue)
),
)
yield self.env.timeout(travel_time)
self.recorder.record_edge_traversal(
edge_id=edge_id,
wait_time_min=wait_time,
traversal_time_min=travel_time,
)
self.recorder.add_productive_time(
truck_id, wait_time + travel_time
)
# --- Release boundary: about to exit ``with`` block ---
self._emit(
truck_id,
EVENT_EDGE_LEAVE,
from_node=edge.from_node,
to_node=edge.to_node,
location=edge.to_node,
loaded=loaded,
payload_tonnes=payload_tonnes,
resource_id=edge_id,
queue_length=(
handle.resource.count + len(handle.resource.queue)
),
)
else:
yield self.env.timeout(travel_time)
self.recorder.add_productive_time(truck_id, travel_time)
# ------------------------------------------------------------------
# Loading / dumping
# ------------------------------------------------------------------
def _load(
self,
truck: TruckSpec,
loader: _LoaderHandle,
):
"""Generator: queue + service at a loader, emit events."""
loading_rng = self.rng["loading"]
queue_len = loader.resource.count + len(loader.resource.queue)
self._emit(
truck.truck_id,
EVENT_ARRIVE_LOADER,
location=loader.spec.node_id,
loaded=False,
payload_tonnes=0.0,
resource_id=loader.spec.loader_id,
queue_length=queue_len,
)
wait_start = self.env.now
with loader.resource.request() as req:
yield req
wait_time = self.env.now - wait_start
self._emit(
truck.truck_id,
EVENT_START_LOAD,
location=loader.spec.node_id,
loaded=False,
payload_tonnes=0.0,
resource_id=loader.spec.loader_id,
queue_length=loader.resource.count + len(loader.resource.queue),
)
duration = truncated_normal(
loading_rng,
mean=loader.spec.mean_load_time_min,
sd=loader.spec.sd_load_time_min,
)
yield self.env.timeout(duration)
self.recorder.record_loader_service(
loader_id=loader.spec.loader_id,
wait_time_min=wait_time,
service_time_min=duration,
)
self.recorder.add_productive_time(
truck.truck_id, wait_time + duration
)
self._emit(
truck.truck_id,
EVENT_END_LOAD,
location=loader.spec.node_id,
loaded=True,
payload_tonnes=truck.payload_tonnes,
resource_id=loader.spec.loader_id,
queue_length=loader.resource.count + len(loader.resource.queue),
)
self._emit(
truck.truck_id,
EVENT_DEPART_LOADER,
location=loader.spec.node_id,
loaded=True,
payload_tonnes=truck.payload_tonnes,
resource_id=loader.spec.loader_id,
queue_length=loader.resource.count + len(loader.resource.queue),
)
def _dump(
self,
truck: TruckSpec,
):
"""Generator: queue + service at the crusher; credit tonnes if before cut."""
dumping_rng = self.rng["dumping"]
queue_len = self.crusher.resource.count + len(self.crusher.resource.queue)
self._emit(
truck.truck_id,
EVENT_ARRIVE_CRUSHER,
location="CRUSH",
loaded=True,
payload_tonnes=truck.payload_tonnes,
resource_id=self.crusher.dump_id,
queue_length=queue_len,
)
wait_start = self.env.now
with self.crusher.resource.request() as req:
yield req
wait_time = self.env.now - wait_start
self._emit(
truck.truck_id,
EVENT_START_DUMP,
location="CRUSH",
loaded=True,
payload_tonnes=truck.payload_tonnes,
resource_id=self.crusher.dump_id,
queue_length=self.crusher.resource.count
+ len(self.crusher.resource.queue),
)
duration = truncated_normal(
dumping_rng,
mean=self.crusher.mean_dump_time_min,
sd=self.crusher.sd_dump_time_min,
)
yield self.env.timeout(duration)
self.recorder.record_crusher_service(
wait_time_min=wait_time,
service_time_min=duration,
)
self.recorder.add_productive_time(
truck.truck_id, wait_time + duration
)
now = self.env.now
# Hard cut: only credit completed dumps strictly before shift end.
if now < self.recorder.shift_length_min:
self.recorder.record_completed_dump(now, truck.truck_id)
self.recorder.record_cycle_end(truck.truck_id, now)
self._emit(
truck.truck_id,
EVENT_END_DUMP,
location="CRUSH",
loaded=False,
payload_tonnes=truck.payload_tonnes,
resource_id=self.crusher.dump_id,
queue_length=self.crusher.resource.count
+ len(self.crusher.resource.queue),
)
self._emit(
truck.truck_id,
EVENT_DEPART_CRUSHER,
location="CRUSH",
loaded=False,
payload_tonnes=0.0,
resource_id=self.crusher.dump_id,
queue_length=self.crusher.resource.count
+ len(self.crusher.resource.queue),
)
# ------------------------------------------------------------------
# Main truck loop
# ------------------------------------------------------------------
def _truck_process(self, truck: TruckSpec):
"""SimPy process generator implementing the ore haulage loop."""
# Initial dispatch event (all trucks released at t=0)
self.recorder.record_dispatch(truck.truck_id, time_min=self.env.now)
self._emit(
truck.truck_id,
EVENT_DISPATCH,
location=truck.start_node,
loaded=False,
payload_tonnes=0.0,
queue_length=0,
)
current_node = truck.start_node
shift_min = self.recorder.shift_length_min
while self.env.now < shift_min:
# ---- Choose a loader given current state -------------------
chosen = self._choose_loader(
origin=current_node,
speed_factor=truck.empty_speed_factor,
)
route_to_loader = self.routes.require(
current_node, chosen.spec.node_id
)
# Travel empty to the loader
yield from self._travel(
truck_id=truck.truck_id,
edge_ids=route_to_loader.edge_ids,
speed_factor=truck.empty_speed_factor,
loaded=False,
payload_tonnes=0.0,
)
current_node = chosen.spec.node_id
if self.env.now >= shift_min:
break
# ---- Load -----------------------------------------------------
yield from self._load(truck, chosen)
if self.env.now >= shift_min:
break
# ---- Travel loaded to crusher ---------------------------------
route_to_crusher = self.routes.require(current_node, "CRUSH")
yield from self._travel(
truck_id=truck.truck_id,
edge_ids=route_to_crusher.edge_ids,
speed_factor=truck.loaded_speed_factor,
loaded=True,
payload_tonnes=truck.payload_tonnes,
)
current_node = "CRUSH"
if self.env.now >= shift_min:
break
# ---- Dump -----------------------------------------------------
yield from self._dump(truck)
# The dump generator is responsible for crediting tonnes &
# cycle end if before the shift cut.
# Loop continues; truck will pick a new loader next iteration.
# ------------------------------------------------------------------
# Run
# ------------------------------------------------------------------
def run(self, until_min: float | None = None) -> None:
"""Run the simulation until the shift cut (or a custom horizon)."""
horizon = (
until_min if until_min is not None else self.recorder.shift_length_min
)
self.env.run(until=horizon)
__all__ = [
"MineSimulation",
]
src/mine_sim/model.py
← Back to submission · View raw on GitHub