"""SimPy simulation core: resources, truck process, dispatcher, event log.
Each replication instantiates a fresh :class:`MineSim`, registers SimPy
resources (loaders, crusher, capacity-constrained edges) and then runs one
truck process per truck in the fleet. Events that matter for traceability
or for utilisation/queue accounting are appended to ``event_log``.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
import networkx as nx
import numpy as np
import simpy
from .model import (
CONSTRAINED_CAPACITY_THRESHOLD,
compute_route,
lognormal_unit_mean,
truncated_normal,
)
# --- Public dataclasses for stats ----------------------------------------------------
@dataclass
class ResourceStats:
"""Cumulative busy/queue stats for a single SimPy resource."""
busy_time: float = 0.0
queue_wait_total: float = 0.0
queue_wait_count: int = 0
queue_length_samples: list[tuple[float, int]] = field(default_factory=list)
def mean_queue_wait(self) -> float:
return self.queue_wait_total / self.queue_wait_count if self.queue_wait_count else 0.0
@dataclass
class TruckStats:
"""Per-truck metrics."""
truck_id: str
cycles_completed: int = 0
total_loaded_tonnes: float = 0.0
busy_time: float = 0.0 # time travelling, loading, dumping (NOT queueing)
queue_time: float = 0.0
cycle_starts: list[float] = field(default_factory=list)
# --- Main sim object -----------------------------------------------------------------
class MineSim:
"""One replication of the mine throughput simulation.
Attributes set after :meth:`run` finishes:
total_tonnes_delivered, dump_events, event_log,
truck_stats, loader_stats, crusher_stats, edge_stats.
"""
def __init__(self,
env: simpy.Environment,
scenario_cfg: dict[str, Any],
graph: nx.DiGraph,
trucks_df,
loaders_df,
dump_points_df,
edges_df,
rng: np.random.Generator,
scenario_id: str,
replication: int):
self.env = env
self.scenario_cfg = scenario_cfg
self.graph = graph
self.trucks_df = trucks_df
self.loaders_df = loaders_df
self.dump_points_df = dump_points_df
self.edges_df = edges_df
self.rng = rng
self.scenario_id = scenario_id
self.replication = replication
self.shift_end_min: float = float(scenario_cfg["simulation"]["shift_length_hours"]) * 60.0
self.travel_cv: float = float(scenario_cfg["stochasticity"].get("travel_time_noise_cv", 0.0))
self.dump_node: str = scenario_cfg["production"]["dump_destination"]
# Resources
self.loaders: dict[str, simpy.Resource] = {}
self.loader_meta: dict[str, dict[str, Any]] = {}
for _, row in loaders_df.iterrows():
self.loaders[row["loader_id"]] = simpy.Resource(env, capacity=int(row["capacity"]))
self.loader_meta[row["loader_id"]] = {
"node_id": row["node_id"],
"mean_load_time_min": float(row["mean_load_time_min"]),
"sd_load_time_min": float(row["sd_load_time_min"]),
"bucket_capacity_tonnes": float(row["bucket_capacity_tonnes"]),
}
dump_row = dump_points_df.loc[dump_points_df["node_id"] == self.dump_node].iloc[0]
self.crusher = simpy.Resource(env, capacity=int(dump_row["capacity"]))
self.crusher_meta = {
"dump_id": dump_row["dump_id"],
"mean_dump_time_min": float(dump_row["mean_dump_time_min"]),
"sd_dump_time_min": float(dump_row["sd_dump_time_min"]),
}
# Constrained edges only (capacity < threshold). Open edges only.
self.edge_resources: dict[str, simpy.Resource] = {}
self.edge_meta: dict[str, dict[str, Any]] = {}
for _, row in edges_df.iterrows():
if bool(row["closed"]):
continue
cap = int(row["capacity"])
if cap >= CONSTRAINED_CAPACITY_THRESHOLD:
continue
self.edge_resources[row["edge_id"]] = simpy.Resource(env, capacity=cap)
self.edge_meta[row["edge_id"]] = {
"from_node": row["from_node"],
"to_node": row["to_node"],
}
# Stats
self.loader_stats: dict[str, ResourceStats] = {lid: ResourceStats() for lid in self.loaders}
self.crusher_stats = ResourceStats()
self.edge_stats: dict[str, ResourceStats] = {eid: ResourceStats() for eid in self.edge_resources}
self.truck_stats: dict[str, TruckStats] = {}
self.event_log: list[dict[str, Any]] = []
self.total_tonnes_delivered: float = 0.0
self.dump_events: int = 0
# ---- logging helpers ----------------------------------------------------
def _shift_clipped(self, start: float, duration: float) -> float:
"""Return the portion of ``[start, start+duration]`` inside the shift.
Used so utilisation stats only credit work done during the 8-hour shift,
not the tail period in which already-loaded trucks finish their dumps.
"""
end = start + duration
return max(0.0, min(end, self.shift_end_min) - max(start, 0.0))
def log(self, truck_id: str, event_type: str,
from_node: str | None = None, to_node: str | None = None,
location: str | None = None, loaded: bool | None = None,
payload_tonnes: float | None = None,
resource_id: str | None = None,
queue_length: int | None = None) -> None:
self.event_log.append({
"time_min": round(self.env.now, 4),
"replication": self.replication,
"scenario_id": self.scenario_id,
"truck_id": truck_id,
"event_type": event_type,
"from_node": from_node,
"to_node": to_node,
"location": location,
"loaded": loaded,
"payload_tonnes": payload_tonnes,
"resource_id": resource_id,
"queue_length": queue_length,
})
# ---- resource interactions ---------------------------------------------
def _request_resource(self, resource: simpy.Resource, stats: ResourceStats):
"""Generator-friendly helper. Yields the queue-wait duration."""
arrival = self.env.now
stats.queue_length_samples.append((arrival, len(resource.queue)))
return resource.request(), arrival
def traverse_edge(self, edge: dict[str, Any], speed_factor: float, truck_id: str,
*, loaded: bool):
"""Generator: traverse a single edge, requesting its resource if constrained."""
edge_id = edge["edge_id"]
# Stochastic multiplier on travel time (mean 1).
noise = lognormal_unit_mean(self.rng, self.travel_cv)
# speed_factor < 1 for loaded trucks => longer travel.
travel_min = (edge["travel_min"] / max(speed_factor, 1e-6)) * noise
if edge_id in self.edge_resources:
resource = self.edge_resources[edge_id]
stats = self.edge_stats[edge_id]
arrival = self.env.now
stats.queue_length_samples.append((arrival, len(resource.queue)))
self.log(truck_id, "edge_queue_join",
from_node=edge["from_node"], to_node=edge["to_node"],
loaded=loaded, resource_id=edge_id,
queue_length=len(resource.queue))
with resource.request() as req:
yield req
wait = self.env.now - arrival
stats.queue_wait_total += wait
stats.queue_wait_count += 1
self.log(truck_id, "edge_entered",
from_node=edge["from_node"], to_node=edge["to_node"],
loaded=loaded, resource_id=edge_id,
queue_length=len(resource.queue))
start = self.env.now
yield self.env.timeout(travel_min)
stats.busy_time += self._shift_clipped(start, travel_min)
self.log(truck_id, "edge_exited",
from_node=edge["from_node"], to_node=edge["to_node"],
loaded=loaded, resource_id=edge_id,
queue_length=len(resource.queue))
else:
self.log(truck_id, "edge_traversed_unconstrained",
from_node=edge["from_node"], to_node=edge["to_node"],
loaded=loaded, resource_id=edge_id)
yield self.env.timeout(travel_min)
# ---- dispatcher ---------------------------------------------------------
def pick_loader(self, current_node: str) -> str:
"""Choose a loader by ``nearest_available_loader`` policy.
Score = travel_time(current_node -> loader_node)
+ queue_size * mean_load_time_loader.
Tie-breaker: shorter expected return cycle (loader -> crusher).
"""
best_id = None
best_score = float("inf")
best_return = float("inf")
for lid, meta in self.loader_meta.items():
try:
edges = compute_route(self.graph, current_node, meta["node_id"])
travel = sum(e["travel_min"] for e in edges)
except Exception:
continue
queue = len(self.loaders[lid].queue) + (1 if self.loaders[lid].count else 0)
score = travel + queue * meta["mean_load_time_min"]
if score < best_score - 1e-9:
best_id, best_score = lid, score
# Tie-breaker: pre-compute return-leg time.
try:
ret_edges = compute_route(self.graph, meta["node_id"], self.dump_node)
best_return = sum(e["travel_min"] for e in ret_edges)
except Exception:
best_return = float("inf")
elif abs(score - best_score) < 1e-9:
try:
ret_edges = compute_route(self.graph, meta["node_id"], self.dump_node)
ret_time = sum(e["travel_min"] for e in ret_edges)
except Exception:
ret_time = float("inf")
if ret_time < best_return:
best_id, best_return = lid, ret_time
if best_id is None:
raise RuntimeError(f"No reachable loader from {current_node}")
return best_id
# ---- truck process ------------------------------------------------------
def truck_process(self, truck_row, initial_stagger: float):
"""SimPy process for a single truck."""
env = self.env
truck_id = truck_row["truck_id"]
empty_factor = float(truck_row["empty_speed_factor"])
loaded_factor = float(truck_row["loaded_speed_factor"])
payload = float(truck_row["payload_tonnes"])
ts = TruckStats(truck_id=truck_id)
self.truck_stats[truck_id] = ts
current_node = truck_row["start_node"]
yield env.timeout(initial_stagger)
self.log(truck_id, "truck_dispatched", location=current_node, loaded=False)
while env.now < self.shift_end_min:
# Choose a loader and route to it.
loader_id = self.pick_loader(current_node)
loader_meta = self.loader_meta[loader_id]
target = loader_meta["node_id"]
try:
edges = compute_route(self.graph, current_node, target)
except Exception as exc:
self.log(truck_id, "routing_error", location=current_node)
raise
ts.cycle_starts.append(env.now)
travel_start = env.now
for e in edges:
yield from self.traverse_edge(e, empty_factor, truck_id, loaded=False)
current_node = e["to_node"]
ts.busy_time += self._shift_clipped(travel_start, env.now - travel_start)
self.log(truck_id, "arrived_at_loader",
location=target, resource_id=loader_id, loaded=False)
# Request the loader.
loader = self.loaders[loader_id]
lstats = self.loader_stats[loader_id]
arrival = env.now
lstats.queue_length_samples.append((arrival, len(loader.queue)))
with loader.request() as req:
yield req
wait = env.now - arrival
lstats.queue_wait_total += wait
lstats.queue_wait_count += 1
ts.queue_time += wait
self.log(truck_id, "load_start",
location=target, resource_id=loader_id, loaded=False,
queue_length=len(loader.queue))
load_time = truncated_normal(self.rng,
loader_meta["mean_load_time_min"],
loader_meta["sd_load_time_min"])
load_start = env.now
yield env.timeout(load_time)
in_shift = self._shift_clipped(load_start, load_time)
lstats.busy_time += in_shift
ts.busy_time += in_shift
self.log(truck_id, "load_end",
location=target, resource_id=loader_id, loaded=True,
payload_tonnes=payload)
# Route to dump and traverse loaded.
try:
edges_back = compute_route(self.graph, target, self.dump_node)
except Exception:
self.log(truck_id, "routing_error", location=target)
raise
travel_start = env.now
for e in edges_back:
yield from self.traverse_edge(e, loaded_factor, truck_id, loaded=True)
current_node = e["to_node"]
ts.busy_time += self._shift_clipped(travel_start, env.now - travel_start)
self.log(truck_id, "arrived_at_crusher",
location=self.dump_node, resource_id=self.crusher_meta["dump_id"],
loaded=True, payload_tonnes=payload)
# Dump.
arrival = env.now
self.crusher_stats.queue_length_samples.append((arrival, len(self.crusher.queue)))
with self.crusher.request() as req:
yield req
wait = env.now - arrival
self.crusher_stats.queue_wait_total += wait
self.crusher_stats.queue_wait_count += 1
ts.queue_time += wait
self.log(truck_id, "dump_start",
location=self.dump_node, resource_id=self.crusher_meta["dump_id"],
loaded=True, payload_tonnes=payload,
queue_length=len(self.crusher.queue))
dump_time = truncated_normal(self.rng,
self.crusher_meta["mean_dump_time_min"],
self.crusher_meta["sd_dump_time_min"])
dump_start = env.now
yield env.timeout(dump_time)
in_shift = self._shift_clipped(dump_start, dump_time)
self.crusher_stats.busy_time += in_shift
ts.busy_time += in_shift
self.log(truck_id, "dump_end",
location=self.dump_node, resource_id=self.crusher_meta["dump_id"],
loaded=False, payload_tonnes=payload)
# Tonnes count only at completed dump events.
self.total_tonnes_delivered += payload
self.dump_events += 1
ts.cycles_completed += 1
ts.total_loaded_tonnes += payload
self.log(truck_id, "shift_end_truncated", location=current_node, loaded=False)
# ---- run ----------------------------------------------------------------
def run(self) -> None:
"""Schedule all trucks and run the simulation until shift end (plus tail)."""
# Random initial stagger over [0, 60] s = [0, 1] min, per truck.
for _, truck_row in self.trucks_df.iterrows():
stagger = float(self.rng.uniform(0.0, 1.0))
self.env.process(self.truck_process(truck_row, stagger))
# Run until well past shift_end so any in-flight loaded trucks finish.
self.env.run(until=self.shift_end_min + 240.0)
def aggregate_truck_metrics(sim: MineSim) -> dict[str, float]:
"""Compute fleet-level metrics after a single run."""
n_trucks = len(sim.truck_stats)
shift_min = sim.shift_end_min
if n_trucks == 0:
return {}
# Cycle time: mean inter-cycle-start gap (across all trucks).
deltas: list[float] = []
for ts in sim.truck_stats.values():
starts = sorted(ts.cycle_starts)
for a, b in zip(starts[:-1], starts[1:]):
deltas.append(b - a)
avg_cycle = float(np.mean(deltas)) if deltas else float("nan")
truck_busy_total = sum(ts.busy_time for ts in sim.truck_stats.values())
truck_util = truck_busy_total / (n_trucks * shift_min) if shift_min > 0 else 0.0
loader_util = {
lid: stats.busy_time / shift_min if shift_min > 0 else 0.0
for lid, stats in sim.loader_stats.items()
}
crusher_util = sim.crusher_stats.busy_time / shift_min if shift_min > 0 else 0.0
avg_loader_q = (
sum(s.queue_wait_total for s in sim.loader_stats.values())
/ max(1, sum(s.queue_wait_count for s in sim.loader_stats.values()))
)
avg_crusher_q = sim.crusher_stats.mean_queue_wait()
return {
"total_tonnes_delivered": sim.total_tonnes_delivered,
"tonnes_per_hour": sim.total_tonnes_delivered / (shift_min / 60.0),
"average_truck_cycle_time_min": avg_cycle,
"average_truck_utilisation": truck_util,
"crusher_utilisation": crusher_util,
"loader_utilisation": loader_util,
"average_loader_queue_time_min": avg_loader_q,
"average_crusher_queue_time_min": avg_crusher_q,
"dump_events": sim.dump_events,
}
src/simulation.py
← Back to submission · View raw on GitHub