"""SimPy discrete-event simulation of mine haulage."""
from __future__ import annotations
import math
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import networkx as nx
import numpy as np
import pandas as pd
import simpy
from .topology import (
EdgeRecord,
NodeRecord,
Router,
apply_edge_overrides,
apply_node_overrides,
build_graph,
lane_id_of,
load_edges,
load_nodes,
)
# ---------- Stochastic helpers --------------------------------------------------
def truncated_normal(rng: np.random.Generator, mean: float, sd: float,
min_val: Optional[float] = None) -> float:
"""Normal sample clipped at lower bound. Default lower bound = max(0.05, mean-3*sd)."""
if sd <= 0:
return float(mean)
lower = min_val if min_val is not None else max(0.05, mean - 3.0 * sd)
val = rng.normal(loc=mean, scale=sd)
if val < lower:
val = lower
return float(val)
def lognormal_noise(rng: np.random.Generator, cv: float) -> float:
"""Multiplicative lognormal noise factor with given coefficient of variation."""
if cv <= 0:
return 1.0
sigma = math.sqrt(math.log(1.0 + cv * cv))
mu = -0.5 * sigma * sigma
return float(rng.lognormal(mean=mu, sigma=sigma))
# ---------- Stats containers ----------------------------------------------------
@dataclass
class TruckStats:
truck_id: str
cycles_completed: int = 0
tonnes_delivered: float = 0.0
busy_time_min: float = 0.0 # any active activity (loading, dumping, travelling)
queue_loader_time_min: float = 0.0
queue_crusher_time_min: float = 0.0
queue_lane_time_min: float = 0.0
cycle_times_min: List[float] = field(default_factory=list)
last_cycle_start: float = 0.0
@dataclass
class ResourceStats:
name: str
capacity: int
busy_time_min: float = 0.0
queue_time_min: float = 0.0
requests: int = 0
last_state_change: float = 0.0
in_use: int = 0
queue_len: int = 0
queue_wait_samples: List[float] = field(default_factory=list)
# ---------- Simulation state ----------------------------------------------------
@dataclass
class SimConfig:
scenario_id: str
replication: int
random_seed: int
shift_minutes: float
truck_count: int
travel_cv: float
routing_objective: str
dispatch_policy: str
dispatch_tiebreak: str
allow_bypass: bool
road_capacity_enabled: bool
class SimState:
def __init__(
self,
env: simpy.Environment,
cfg: SimConfig,
nodes: Dict[str, NodeRecord],
edges: List[EdgeRecord],
graph: nx.DiGraph,
router: Router,
loader_specs: List[Dict[str, Any]],
crusher_spec: Dict[str, Any],
loaded_factor: float,
empty_factor: float,
rng: np.random.Generator,
event_log: List[Dict[str, Any]],
):
self.env = env
self.cfg = cfg
self.nodes = nodes
self.edges = edges
self.graph = graph
self.router = router
self.rng = rng
self.loaded_factor = loaded_factor
self.empty_factor = empty_factor
self.event_log = event_log
# Loaders: one Resource per loader, with metadata.
self.loaders: Dict[str, simpy.Resource] = {}
self.loader_specs: Dict[str, Dict[str, Any]] = {}
self.loader_stats: Dict[str, ResourceStats] = {}
for spec in loader_specs:
lid = spec["loader_id"]
cap = int(spec.get("capacity", 1))
self.loaders[lid] = simpy.Resource(env, capacity=cap)
self.loader_specs[lid] = spec
self.loader_stats[lid] = ResourceStats(name=lid, capacity=cap)
# Crusher
self.crusher: simpy.Resource = simpy.Resource(
env, capacity=int(crusher_spec.get("capacity", 1))
)
self.crusher_spec = crusher_spec
self.crusher_stats = ResourceStats(name="crusher", capacity=int(crusher_spec.get("capacity", 1)))
# Lane resources for capacity-constrained edges.
self.lane_resources: Dict[str, simpy.Resource] = {}
self.lane_capacity: Dict[str, int] = {}
self.lane_stats: Dict[str, ResourceStats] = {}
if cfg.road_capacity_enabled:
lane_min_capacity: Dict[str, int] = {}
for e in edges:
if e.closed:
continue
if e.capacity >= 999:
continue
lid = lane_id_of(e.edge_id)
if lid not in lane_min_capacity or e.capacity < lane_min_capacity[lid]:
lane_min_capacity[lid] = e.capacity
for lid, cap in lane_min_capacity.items():
self.lane_resources[lid] = simpy.Resource(env, capacity=int(cap))
self.lane_capacity[lid] = int(cap)
self.lane_stats[lid] = ResourceStats(name=f"lane_{lid}", capacity=int(cap))
self.truck_stats: Dict[str, TruckStats] = {}
# Resource utilisation accounting -------------------------------------------------
def _record_resource_busy(self, stats: ResourceStats, busy: bool) -> None:
now = self.env.now
# If transitioning, accumulate prior period
if stats.in_use > 0:
stats.busy_time_min += (now - stats.last_state_change) * stats.in_use / max(stats.capacity, 1)
# Record updated state - actually we'll just track raw busy fraction.
stats.last_state_change = now
if busy:
stats.in_use = min(stats.in_use + 1, stats.capacity)
else:
stats.in_use = max(stats.in_use - 1, 0)
def request_loader(self, loader_id: str):
return self.loaders[loader_id].request()
def request_crusher(self):
return self.crusher.request()
def request_lane(self, lane_id: str):
if lane_id in self.lane_resources:
return self.lane_resources[lane_id].request()
return None
# ---------- Truck process -------------------------------------------------------
def _expected_cycle_time(state: SimState, current_node: str, loader_id: str,
load_node: str, dump_node: str) -> float:
"""Estimate one full cycle time from current_node through the loader/dump."""
spec = state.loader_specs[loader_id]
load_time = float(spec["mean_load_time_min"])
dump_time = float(state.crusher_spec.get("mean_dump_time_min", 3.5))
t_to_load = state.router.path_time_min(current_node, load_node)
t_to_dump = state.router.path_time_min(load_node, dump_node)
queue_pen = (
state.loaders[loader_id].count * float(spec["mean_load_time_min"])
+ len(state.loaders[loader_id].queue) * float(spec["mean_load_time_min"])
)
return t_to_load + load_time + t_to_dump + dump_time + queue_pen
def _select_loader(state: SimState, current_node: str, dump_node: str) -> str:
policy = state.cfg.dispatch_policy
tiebreak = state.cfg.dispatch_tiebreak
candidates: List[Tuple[str, str, float, float]] = []
for lid, spec in state.loader_specs.items():
node = spec["node_id"]
try:
t_to = state.router.path_time_min(current_node, node)
except RuntimeError:
continue # unreachable
cycle = _expected_cycle_time(state, current_node, lid, node, dump_node)
candidates.append((lid, node, t_to, cycle))
if not candidates:
raise RuntimeError("No loader reachable from current location")
if policy == "nearest_available_loader":
# Prefer idle loader; among tied, shortest expected cycle time.
idle = [c for c in candidates if state.loaders[c[0]].count == 0
and len(state.loaders[c[0]].queue) == 0]
pool = idle if idle else candidates
if tiebreak == "shortest_expected_cycle_time":
pool.sort(key=lambda c: (c[3], c[2]))
else:
pool.sort(key=lambda c: (c[2], c[3]))
return pool[0][0]
# default: shortest cycle time
candidates.sort(key=lambda c: (c[3], c[2]))
return candidates[0][0]
def _emit(state: SimState, *, truck_id: str, event_type: str,
from_node: str = "", to_node: str = "", location: str = "",
loaded: Optional[bool] = None, payload_tonnes: float = 0.0,
resource_id: str = "", queue_length: int = 0) -> None:
state.event_log.append({
"time_min": round(float(state.env.now), 4),
"replication": state.cfg.replication,
"scenario_id": state.cfg.scenario_id,
"truck_id": truck_id,
"event_type": event_type,
"from_node": from_node,
"to_node": to_node,
"location": location,
"loaded": "" if loaded is None else int(bool(loaded)),
"payload_tonnes": round(float(payload_tonnes), 4),
"resource_id": resource_id,
"queue_length": int(queue_length),
})
def _travel(state: SimState, truck_id: str, source: str, target: str, *, loaded: bool,
payload: float):
"""Generator: traverse shortest path from source to target.
Holds capacity-constrained lane resources for the duration of each segment.
"""
if source == target:
return
path = state.router.shortest_path(source, target)
truck = state.truck_stats[truck_id]
speed_factor = state.loaded_factor if loaded else state.empty_factor
for u, v in zip(path[:-1], path[1:]):
edge_data = state.graph[u][v]
if edge_data.get("closed"):
raise RuntimeError(
f"Attempt to traverse closed edge {edge_data.get('edge_id')}"
)
lane_id = edge_data["lane_id"]
capacity = int(edge_data["capacity"])
distance_m = float(edge_data["distance_m"])
max_speed = float(edge_data["max_speed_kph"])
# Nominal travel time at this speed.
nominal_min = (distance_m / 1000.0) / max(max_speed * speed_factor, 1e-6) * 60.0
noise = lognormal_noise(state.rng, state.cfg.travel_cv)
actual_min = nominal_min * noise
constrained = (
state.cfg.road_capacity_enabled
and capacity < 999
and lane_id in state.lane_resources
)
if constrained:
req = state.lane_resources[lane_id].request()
t_request = state.env.now
yield req
wait = state.env.now - t_request
truck.queue_lane_time_min += wait
stats = state.lane_stats[lane_id]
stats.requests += 1
stats.queue_wait_samples.append(wait)
t_start = state.env.now
_emit(state, truck_id=truck_id, event_type="enter_edge",
from_node=u, to_node=v, location=u,
loaded=loaded, payload_tonnes=payload,
resource_id=edge_data["edge_id"],
queue_length=len(state.lane_resources[lane_id].queue))
yield state.env.timeout(actual_min)
stats.busy_time_min += state.env.now - t_start
state.lane_resources[lane_id].release(req)
_emit(state, truck_id=truck_id, event_type="exit_edge",
from_node=u, to_node=v, location=v,
loaded=loaded, payload_tonnes=payload,
resource_id=edge_data["edge_id"])
else:
_emit(state, truck_id=truck_id, event_type="enter_edge",
from_node=u, to_node=v, location=u,
loaded=loaded, payload_tonnes=payload,
resource_id=edge_data["edge_id"])
yield state.env.timeout(actual_min)
_emit(state, truck_id=truck_id, event_type="exit_edge",
from_node=u, to_node=v, location=v,
loaded=loaded, payload_tonnes=payload,
resource_id=edge_data["edge_id"])
truck.busy_time_min += actual_min
def truck_process(state: SimState, truck_id: str, payload_tonnes: float):
env = state.env
truck = state.truck_stats[truck_id]
truck.last_cycle_start = env.now
current_node = "PARK"
dump_node = "CRUSH"
_emit(state, truck_id=truck_id, event_type="dispatched",
location=current_node, loaded=False, payload_tonnes=0.0)
while env.now < state.cfg.shift_minutes:
cycle_start = env.now
# Choose loader.
loader_id = _select_loader(state, current_node, dump_node)
load_node = state.loader_specs[loader_id]["node_id"]
spec = state.loader_specs[loader_id]
_emit(state, truck_id=truck_id, event_type="route_to_loader",
from_node=current_node, to_node=load_node, location=current_node,
loaded=False, resource_id=loader_id)
# Travel empty to load node.
yield from _travel(state, truck_id, current_node, load_node,
loaded=False, payload=0.0)
current_node = load_node
_emit(state, truck_id=truck_id, event_type="arrive_loader",
location=load_node, loaded=False, resource_id=loader_id,
queue_length=len(state.loaders[loader_id].queue))
if env.now >= state.cfg.shift_minutes:
break
# Queue for loader.
req = state.loaders[loader_id].request()
t_q = env.now
_emit(state, truck_id=truck_id, event_type="queue_loader",
location=load_node, loaded=False, resource_id=loader_id,
queue_length=len(state.loaders[loader_id].queue))
yield req
wait = env.now - t_q
truck.queue_loader_time_min += wait
loader_stats = state.loader_stats[loader_id]
loader_stats.requests += 1
loader_stats.queue_wait_samples.append(wait)
# Loading.
load_time = truncated_normal(
state.rng,
mean=float(spec["mean_load_time_min"]),
sd=float(spec.get("sd_load_time_min", 0.0)),
)
_emit(state, truck_id=truck_id, event_type="load_start",
location=load_node, loaded=False, resource_id=loader_id)
t_start = env.now
yield env.timeout(load_time)
loader_stats.busy_time_min += env.now - t_start
truck.busy_time_min += load_time
state.loaders[loader_id].release(req)
_emit(state, truck_id=truck_id, event_type="load_end",
location=load_node, loaded=True, payload_tonnes=payload_tonnes,
resource_id=loader_id)
# Travel loaded to crusher.
yield from _travel(state, truck_id, load_node, dump_node,
loaded=True, payload=payload_tonnes)
current_node = dump_node
_emit(state, truck_id=truck_id, event_type="arrive_crusher",
location=dump_node, loaded=True, payload_tonnes=payload_tonnes,
resource_id="crusher",
queue_length=len(state.crusher.queue))
# Queue for crusher.
req_c = state.crusher.request()
t_q = env.now
_emit(state, truck_id=truck_id, event_type="queue_crusher",
location=dump_node, loaded=True, payload_tonnes=payload_tonnes,
resource_id="crusher",
queue_length=len(state.crusher.queue))
yield req_c
wait_c = env.now - t_q
truck.queue_crusher_time_min += wait_c
state.crusher_stats.requests += 1
state.crusher_stats.queue_wait_samples.append(wait_c)
dump_time = truncated_normal(
state.rng,
mean=float(state.crusher_spec.get("mean_dump_time_min", 3.5)),
sd=float(state.crusher_spec.get("sd_dump_time_min", 0.0)),
)
_emit(state, truck_id=truck_id, event_type="dump_start",
location=dump_node, loaded=True, payload_tonnes=payload_tonnes,
resource_id="crusher")
t_start = env.now
yield env.timeout(dump_time)
state.crusher_stats.busy_time_min += env.now - t_start
truck.busy_time_min += dump_time
state.crusher.release(req_c)
# Cycle complete: tonnes counted at dump-end (crusher delivery).
truck.cycles_completed += 1
truck.tonnes_delivered += payload_tonnes
truck.cycle_times_min.append(env.now - cycle_start)
_emit(state, truck_id=truck_id, event_type="dump_end",
location=dump_node, loaded=False, payload_tonnes=payload_tonnes,
resource_id="crusher")
_emit(state, truck_id=truck_id, event_type="shift_end",
location=current_node, loaded=False)
# ---------- Run a single replication --------------------------------------------
def run_replication(
*,
scenario: Dict[str, Any],
nodes: Dict[str, NodeRecord],
edges: List[EdgeRecord],
truck_records: pd.DataFrame,
loader_records: pd.DataFrame,
dump_records: pd.DataFrame,
replication_index: int,
base_seed: int,
capture_event_log: bool = True,
) -> Dict[str, Any]:
seed = base_seed + replication_index
rng = np.random.default_rng(seed)
sim_cfg_raw = scenario.get("simulation", {})
shift_minutes = float(sim_cfg_raw.get("shift_length_hours", 8)) * 60.0
routing = scenario.get("routing", {})
dispatching = scenario.get("dispatching", {})
stochastic = scenario.get("stochasticity", {})
fleet = scenario.get("fleet", {})
cfg = SimConfig(
scenario_id=str(scenario.get("scenario_id", "")),
replication=replication_index,
random_seed=seed,
shift_minutes=shift_minutes,
truck_count=int(fleet.get("truck_count", len(truck_records))),
travel_cv=float(stochastic.get("travel_time_noise_cv", 0.0)),
routing_objective=str(routing.get("objective", "shortest_time")),
dispatch_policy=str(dispatching.get("policy", "nearest_available_loader")),
dispatch_tiebreak=str(dispatching.get(
"tie_breaker", "shortest_expected_cycle_time"
)),
allow_bypass=bool(routing.get("allow_bypass", True)),
road_capacity_enabled=bool(routing.get("road_capacity_enabled", True)),
)
# Apply overrides.
edges_eff = apply_edge_overrides(edges, scenario.get("edge_overrides", {}))
nodes_eff = apply_node_overrides(nodes, scenario.get("node_overrides", {}))
# Loaded/empty speed factors come from the truck records (homogeneous fleet here).
if not truck_records.empty:
empty_factor = float(truck_records.iloc[0]["empty_speed_factor"])
loaded_factor = float(truck_records.iloc[0]["loaded_speed_factor"])
else:
empty_factor, loaded_factor = 1.0, 0.85
graph = build_graph(
nodes_eff, edges_eff,
empty_speed_factor=empty_factor,
loaded_speed_factor=loaded_factor,
drop_closed=True,
)
router = Router(graph, weight_attr="weight")
# Loader specs (with possible overrides).
loader_specs: List[Dict[str, Any]] = []
loader_overrides = scenario.get("loader_overrides", {})
for _, row in loader_records.iterrows():
spec = {
"loader_id": str(row["loader_id"]),
"node_id": str(row["node_id"]),
"capacity": int(row["capacity"]),
"bucket_capacity_tonnes": float(row["bucket_capacity_tonnes"]),
"mean_load_time_min": float(row["mean_load_time_min"]),
"sd_load_time_min": float(row["sd_load_time_min"]),
"availability": float(row["availability"]),
}
if spec["loader_id"] in loader_overrides:
spec.update(loader_overrides[spec["loader_id"]])
loader_specs.append(spec)
# Crusher (dump) spec.
dump_overrides = scenario.get("dump_point_overrides", {})
crusher_row = dump_records[dump_records["dump_id"] == "D_CRUSH"].iloc[0]
crusher_spec = {
"dump_id": str(crusher_row["dump_id"]),
"node_id": str(crusher_row["node_id"]),
"type": str(crusher_row["type"]),
"capacity": int(crusher_row["capacity"]),
"mean_dump_time_min": float(crusher_row["mean_dump_time_min"]),
"sd_dump_time_min": float(crusher_row["sd_dump_time_min"]),
}
if "D_CRUSH" in dump_overrides:
crusher_spec.update(dump_overrides["D_CRUSH"])
# Build SimPy environment and state.
env = simpy.Environment()
event_log: List[Dict[str, Any]] = []
state = SimState(
env=env, cfg=cfg, nodes=nodes_eff, edges=edges_eff,
graph=graph, router=router,
loader_specs=loader_specs, crusher_spec=crusher_spec,
loaded_factor=loaded_factor, empty_factor=empty_factor,
rng=rng, event_log=event_log,
)
# Build trucks (subset by truck_count, in order from CSV).
selected_trucks = truck_records.iloc[: cfg.truck_count]
for _, t_row in selected_trucks.iterrows():
tid = str(t_row["truck_id"])
state.truck_stats[tid] = TruckStats(truck_id=tid)
env.process(truck_process(state, tid, payload_tonnes=float(t_row["payload_tonnes"])))
# Run.
env.run(until=shift_minutes)
# Aggregate metrics.
total_tonnes = sum(t.tonnes_delivered for t in state.truck_stats.values())
cycle_times = [c for t in state.truck_stats.values() for c in t.cycle_times_min]
avg_cycle = float(np.mean(cycle_times)) if cycle_times else 0.0
avg_truck_busy = float(np.mean([t.busy_time_min / shift_minutes
for t in state.truck_stats.values()]))
# Per-loader queue wait (mean wait per request at each loader)
loader_queue_waits: Dict[str, float] = {}
for lid, ls in state.loader_stats.items():
loader_queue_waits[lid] = float(np.mean(ls.queue_wait_samples)) if ls.queue_wait_samples else 0.0
crusher_queue_wait = (
float(np.mean(state.crusher_stats.queue_wait_samples))
if state.crusher_stats.queue_wait_samples else 0.0
)
# Resource utilisation.
loader_util: Dict[str, float] = {}
for lid, ls in state.loader_stats.items():
loader_util[lid] = min(1.0, ls.busy_time_min / shift_minutes)
crusher_util = min(1.0, state.crusher_stats.busy_time_min / shift_minutes)
# Lane utilisation.
lane_util: Dict[str, float] = {}
lane_queue_wait: Dict[str, float] = {}
for lid, ls in state.lane_stats.items():
lane_util[lid] = min(1.0, ls.busy_time_min / (shift_minutes * max(ls.capacity, 1)))
lane_queue_wait[lid] = float(np.mean(ls.queue_wait_samples)) if ls.queue_wait_samples else 0.0
return {
"scenario_id": cfg.scenario_id,
"replication": replication_index,
"random_seed": seed,
"shift_minutes": shift_minutes,
"truck_count": cfg.truck_count,
"total_tonnes_delivered": total_tonnes,
"tonnes_per_hour": total_tonnes / (shift_minutes / 60.0),
"average_truck_cycle_time_min": avg_cycle,
"average_truck_utilisation": avg_truck_busy,
"crusher_utilisation": crusher_util,
"loader_utilisation": loader_util,
"average_loader_queue_time_min": float(np.mean(list(loader_queue_waits.values())))
if loader_queue_waits else 0.0,
"loader_queue_waits": loader_queue_waits,
"average_crusher_queue_time_min": crusher_queue_wait,
"lane_utilisation": lane_util,
"lane_queue_wait_min": lane_queue_wait,
"cycles_completed": int(sum(t.cycles_completed for t in state.truck_stats.values())),
"event_log": event_log if capture_event_log else None,
}
def load_input_dataframes(data_dir: Path) -> Tuple[Dict[str, NodeRecord], List[EdgeRecord],
pd.DataFrame, pd.DataFrame, pd.DataFrame]:
nodes = load_nodes(data_dir / "nodes.csv")
edges = load_edges(data_dir / "edges.csv")
trucks = pd.read_csv(data_dir / "trucks.csv")
loaders = pd.read_csv(data_dir / "loaders.csv")
dump_points = pd.read_csv(data_dir / "dump_points.csv")
return nodes, edges, trucks, loaders, dump_points
src/simulation.py
← Back to submission · View raw on GitHub