"""Discrete-event simulation of a synthetic open-pit mine haulage system.
This module is the *model layer*. It is responsible for:
* loading the topology / equipment data (``data/*.csv``);
* resolving a scenario configuration (``data/scenarios/*.yaml``) including the
``inherits:`` chain and any ``*_overrides`` blocks;
* building a directed road graph and computing shortest-*time* routes;
* running a single SimPy replication of an 8-hour shift and returning a rich
metrics dictionary plus (optionally) a detailed event trace.
The orchestration layer (scenarios x replications, aggregation, file writing)
lives in ``run_experiment.py`` so that the model stays free of I/O concerns.
Design summary
--------------
* Trucks are active SimPy processes running a load -> haul -> dump -> return
cycle until the shift clock expires.
* Loaders, the crusher dump and capacity-constrained road segments are SimPy
``Resource`` objects (capacity taken from the data).
* Throughput is recorded only on *completed* dump events at the crusher.
* All randomness is drawn from a single ``numpy`` generator seeded per
replication so runs are reproducible; the same seed index is reused across
scenarios to give common random numbers for paired comparisons.
See ``conceptual_model.md`` for the full conceptual model.
"""
from __future__ import annotations
import csv
import math
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Iterable
import networkx as nx
import numpy as np
import simpy
import yaml
# A road segment whose ``capacity`` is at least this value is treated as
# effectively unconstrained (no SimPy resource is created for it). The data
# uses 999 as the "wide road" sentinel; the ramp upgrade scenario raises the
# ramp capacity to 999 which, with this rule, removes the ramp constraint.
UNCONSTRAINED_CAPACITY = 999
class RouteError(RuntimeError):
"""Raised when a required origin/destination pair has no feasible route.
Per the task brief, an impossible route must fail clearly rather than
silently producing misleading throughput numbers.
"""
# --------------------------------------------------------------------------- #
# Data records
# --------------------------------------------------------------------------- #
@dataclass
class Node:
node_id: str
node_name: str
node_type: str
x: float
y: float
z: float
service_mean: float | None = None
service_sd: float | None = None
@dataclass
class Edge:
edge_id: str
from_node: str
to_node: str
distance_m: float
max_speed_kph: float
road_type: str
capacity: float
closed: bool = False
def base_time_min(self, loaded: bool, empty_factor: float, loaded_factor: float) -> float:
"""Deterministic traversal time (minutes) before travel noise."""
factor = loaded_factor if loaded else empty_factor
speed = self.max_speed_kph * factor
return (self.distance_m / 1000.0) / speed * 60.0
@property
def constrained(self) -> bool:
return self.capacity < UNCONSTRAINED_CAPACITY
@dataclass
class Loader:
loader_id: str
node_id: str
capacity: int
bucket_capacity_tonnes: float
mean_load_min: float
sd_load_min: float
availability: float
@dataclass
class Dump:
dump_id: str
node_id: str
dump_type: str
capacity: int
mean_dump_min: float
sd_dump_min: float
@dataclass
class Truck:
truck_id: str
payload_tonnes: float
empty_speed_factor: float
loaded_speed_factor: float
availability: float
start_node: str
@dataclass
class ScenarioConfig:
scenario_id: str
description: str
shift_length_hours: float
replications: int
base_random_seed: int
warmup_minutes: float
routing: dict[str, Any]
production: dict[str, Any]
dispatching: dict[str, Any]
stochasticity: dict[str, Any]
truck_count: int
edge_overrides: dict[str, dict[str, Any]] = field(default_factory=dict)
node_overrides: dict[str, dict[str, Any]] = field(default_factory=dict)
dump_point_overrides: dict[str, dict[str, Any]] = field(default_factory=dict)
loader_overrides: dict[str, dict[str, Any]] = field(default_factory=dict)
# --------------------------------------------------------------------------- #
# Data loading
# --------------------------------------------------------------------------- #
def _read_csv(path: Path) -> list[dict[str, str]]:
with path.open(newline="", encoding="utf-8") as fh:
return list(csv.DictReader(fh))
def _to_float(value: str | None, default: float | None = None) -> float | None:
if value is None or str(value).strip() == "":
return default
return float(value)
def load_nodes(data_dir: Path) -> dict[str, Node]:
nodes: dict[str, Node] = {}
for row in _read_csv(data_dir / "nodes.csv"):
nodes[row["node_id"]] = Node(
node_id=row["node_id"],
node_name=row["node_name"],
node_type=row["node_type"],
x=_to_float(row["x_m"], 0.0),
y=_to_float(row["y_m"], 0.0),
z=_to_float(row["z_m"], 0.0),
service_mean=_to_float(row.get("service_time_mean_min")),
service_sd=_to_float(row.get("service_time_sd_min")),
)
return nodes
def load_edges(data_dir: Path) -> dict[str, Edge]:
edges: dict[str, Edge] = {}
for row in _read_csv(data_dir / "edges.csv"):
edges[row["edge_id"]] = Edge(
edge_id=row["edge_id"],
from_node=row["from_node"],
to_node=row["to_node"],
distance_m=_to_float(row["distance_m"], 0.0),
max_speed_kph=_to_float(row["max_speed_kph"], 1.0),
road_type=row["road_type"],
capacity=_to_float(row["capacity"], UNCONSTRAINED_CAPACITY),
closed=str(row.get("closed", "false")).strip().lower() == "true",
)
return edges
def load_loaders(data_dir: Path) -> dict[str, Loader]:
loaders: dict[str, Loader] = {}
for row in _read_csv(data_dir / "loaders.csv"):
loaders[row["loader_id"]] = Loader(
loader_id=row["loader_id"],
node_id=row["node_id"],
capacity=int(_to_float(row["capacity"], 1)),
bucket_capacity_tonnes=_to_float(row["bucket_capacity_tonnes"], 100.0),
mean_load_min=_to_float(row["mean_load_time_min"], 5.0),
sd_load_min=_to_float(row["sd_load_time_min"], 0.0),
availability=_to_float(row["availability"], 1.0),
)
return loaders
def load_dumps(data_dir: Path) -> dict[str, Dump]:
dumps: dict[str, Dump] = {}
for row in _read_csv(data_dir / "dump_points.csv"):
dumps[row["dump_id"]] = Dump(
dump_id=row["dump_id"],
node_id=row["node_id"],
dump_type=row["type"],
capacity=int(_to_float(row["capacity"], 1)),
mean_dump_min=_to_float(row["mean_dump_time_min"], 4.0),
sd_dump_min=_to_float(row["sd_dump_time_min"], 0.0),
)
return dumps
def load_trucks(data_dir: Path) -> list[Truck]:
trucks: list[Truck] = []
for row in _read_csv(data_dir / "trucks.csv"):
trucks.append(
Truck(
truck_id=row["truck_id"],
payload_tonnes=_to_float(row["payload_tonnes"], 100.0),
empty_speed_factor=_to_float(row["empty_speed_factor"], 1.0),
loaded_speed_factor=_to_float(row["loaded_speed_factor"], 1.0),
availability=_to_float(row["availability"], 1.0),
start_node=row["start_node"],
)
)
return trucks
# --------------------------------------------------------------------------- #
# Scenario configuration (inheritance + overrides)
# --------------------------------------------------------------------------- #
def _deep_merge(base: dict[str, Any], override: dict[str, Any]) -> dict[str, Any]:
"""Recursively merge ``override`` into a copy of ``base``."""
result = dict(base)
for key, value in override.items():
if key in result and isinstance(result[key], dict) and isinstance(value, dict):
result[key] = _deep_merge(result[key], value)
else:
result[key] = value
return result
def load_raw_scenario(scenario_path: Path, base_dir: Path | None = None) -> dict[str, Any]:
"""Load a scenario YAML, resolving its ``inherits:`` chain.
A parent named by ``inherits`` is searched for first next to the child and
then in ``base_dir`` (the canonical ``data/scenarios`` directory), so that
an agent-proposed scenario stored elsewhere can still inherit ``baseline``.
"""
raw = yaml.safe_load(scenario_path.read_text(encoding="utf-8")) or {}
parent_name = raw.get("inherits")
if parent_name:
search = [scenario_path.parent / f"{parent_name}.yaml"]
if base_dir is not None:
search.append(base_dir / f"{parent_name}.yaml")
parent_path = next((p for p in search if p.exists()), None)
if parent_path is None:
raise FileNotFoundError(
f"Scenario '{scenario_path.name}' inherits '{parent_name}' "
f"but no {parent_name}.yaml was found in {[str(p) for p in search]}."
)
parent = load_raw_scenario(parent_path, base_dir)
raw = _deep_merge(parent, {k: v for k, v in raw.items() if k != "inherits"})
return raw
def build_scenario_config(scenario_path: Path, base_dir: Path | None = None) -> ScenarioConfig:
raw = load_raw_scenario(scenario_path, base_dir)
sim = raw.get("simulation", {})
return ScenarioConfig(
scenario_id=raw["scenario_id"],
description=raw.get("description", ""),
shift_length_hours=float(sim.get("shift_length_hours", 8)),
replications=int(sim.get("replications", 30)),
base_random_seed=int(sim.get("base_random_seed", 12345)),
warmup_minutes=float(sim.get("warmup_minutes", 0)),
routing=raw.get("routing", {}),
production=raw.get("production", {}),
dispatching=raw.get("dispatching", {}),
stochasticity=raw.get("stochasticity", {}),
truck_count=int(raw.get("fleet", {}).get("truck_count", 8)),
edge_overrides=raw.get("edge_overrides", {}) or {},
node_overrides=raw.get("node_overrides", {}) or {},
dump_point_overrides=raw.get("dump_point_overrides", {}) or {},
loader_overrides=raw.get("loader_overrides", {}) or {},
)
def apply_overrides(
config: ScenarioConfig,
nodes: dict[str, Node],
edges: dict[str, Edge],
loaders: dict[str, Loader],
dumps: dict[str, Dump],
) -> tuple[dict[str, Node], dict[str, Edge], dict[str, Loader], dict[str, Dump]]:
"""Return scenario-specific copies of the world with overrides applied."""
edges = {eid: Edge(**vars(e)) for eid, e in edges.items()}
nodes = {nid: Node(**vars(n)) for nid, n in nodes.items()}
loaders = {lid: Loader(**vars(l)) for lid, l in loaders.items()}
dumps = {did: Dump(**vars(d)) for did, d in dumps.items()}
for eid, ov in config.edge_overrides.items():
if eid not in edges:
raise KeyError(f"edge_override targets unknown edge '{eid}'")
e = edges[eid]
if "capacity" in ov:
e.capacity = float(ov["capacity"])
if "max_speed_kph" in ov:
e.max_speed_kph = float(ov["max_speed_kph"])
if "closed" in ov:
e.closed = bool(ov["closed"])
if "distance_m" in ov:
e.distance_m = float(ov["distance_m"])
for nid, ov in config.node_overrides.items():
if nid not in nodes:
raise KeyError(f"node_override targets unknown node '{nid}'")
n = nodes[nid]
if "service_time_mean_min" in ov:
n.service_mean = float(ov["service_time_mean_min"])
if "service_time_sd_min" in ov:
n.service_sd = float(ov["service_time_sd_min"])
for did, ov in config.dump_point_overrides.items():
if did not in dumps:
raise KeyError(f"dump_point_override targets unknown dump '{did}'")
d = dumps[did]
if "mean_dump_time_min" in ov:
d.mean_dump_min = float(ov["mean_dump_time_min"])
if "sd_dump_time_min" in ov:
d.sd_dump_min = float(ov["sd_dump_time_min"])
if "capacity" in ov:
d.capacity = int(ov["capacity"])
for lid, ov in config.loader_overrides.items():
if lid not in loaders:
raise KeyError(f"loader_override targets unknown loader '{lid}'")
l = loaders[lid]
if "mean_load_time_min" in ov:
l.mean_load_min = float(ov["mean_load_time_min"])
if "sd_load_time_min" in ov:
l.sd_load_min = float(ov["sd_load_time_min"])
if "capacity" in ov:
l.capacity = int(ov["capacity"])
return nodes, edges, loaders, dumps
# --------------------------------------------------------------------------- #
# Routing
# --------------------------------------------------------------------------- #
class Router:
"""Shortest-time routing over the (scenario-specific) road graph.
Edge weight is the deterministic free-flow traversal time at the edge's
maximum speed. Because the truck speed factor is a uniform multiplier it
does not change which path is shortest, so a single graph serves both
loaded and empty routing; the per-leg *time* is recomputed with the right
factor when needed.
"""
def __init__(self, edges: dict[str, Edge]):
self.edges = edges
self.graph = nx.DiGraph()
for e in edges.values():
if e.closed:
continue
self.graph.add_edge(
e.from_node,
e.to_node,
weight=(e.distance_m / 1000.0) / e.max_speed_kph * 60.0,
edge_id=e.edge_id,
)
self._route_cache: dict[tuple[str, str], list[str]] = {}
def route_edges(self, src: str, dst: str) -> list[str]:
"""Return the list of edge ids on the shortest-time path src -> dst."""
if src == dst:
return []
key = (src, dst)
if key in self._route_cache:
return self._route_cache[key]
try:
node_path = nx.shortest_path(self.graph, src, dst, weight="weight")
except (nx.NetworkXNoPath, nx.NodeNotFound) as exc:
raise RouteError(f"No route from {src} to {dst} (closed roads?).") from exc
edge_ids = [self.graph[a][b]["edge_id"] for a, b in zip(node_path[:-1], node_path[1:])]
self._route_cache[key] = edge_ids
return edge_ids
def leg_time(self, src: str, dst: str, loaded: bool, empty_factor: float, loaded_factor: float) -> float:
"""Deterministic (noise-free) travel time in minutes for a leg."""
return sum(
self.edges[eid].base_time_min(loaded, empty_factor, loaded_factor)
for eid in self.route_edges(src, dst)
)
# --------------------------------------------------------------------------- #
# Stochastic helpers
# --------------------------------------------------------------------------- #
def truncated_normal(rng: np.random.Generator, mean: float, sd: float, low: float = 0.0) -> float:
"""Sample a normal truncated below ``low`` (resample, then clamp)."""
if sd <= 0:
return max(mean, low + 1e-9)
for _ in range(16):
value = rng.normal(mean, sd)
if value > low:
return value
return max(mean, low + 1e-9)
def travel_multiplier(rng: np.random.Generator, cv: float) -> float:
"""Positive multiplicative travel-time noise with mean 1 and given CV."""
if cv <= 0:
return 1.0
sigma = math.sqrt(math.log(1.0 + cv * cv))
mu = -0.5 * sigma * sigma
return float(rng.lognormal(mu, sigma))
# --------------------------------------------------------------------------- #
# Replication recorder
# --------------------------------------------------------------------------- #
class Recorder:
"""Accumulates counters during one replication (independent of logging)."""
def __init__(self, truck_ids: list[str], loader_ids: list[str],
dump_id: str, constrained_edge_ids: list[str]):
self.tonnes = 0.0
self.dumps = 0
self.crusher_busy = 0.0
self.crusher_wait: list[float] = []
self.loader_busy = {lid: 0.0 for lid in loader_ids}
self.loader_wait: list[float] = []
self.loader_wait_by = {lid: [] for lid in loader_ids}
self.edge_busy = {eid: 0.0 for eid in constrained_edge_ids}
self.edge_wait = {eid: 0.0 for eid in constrained_edge_ids}
self.edge_wait_count = {eid: 0 for eid in constrained_edge_ids}
self.truck_travel = {tid: 0.0 for tid in truck_ids}
self.truck_load = {tid: 0.0 for tid in truck_ids}
self.truck_dump = {tid: 0.0 for tid in truck_ids}
self.truck_loader_wait = {tid: 0.0 for tid in truck_ids}
self.truck_crusher_wait = {tid: 0.0 for tid in truck_ids}
self.truck_load_starts = {tid: [] for tid in truck_ids}
# --------------------------------------------------------------------------- #
# Single replication
# --------------------------------------------------------------------------- #
def run_replication(
config: ScenarioConfig,
nodes: dict[str, Node],
edges: dict[str, Edge],
loaders: dict[str, Loader],
dumps: dict[str, Dump],
trucks: list[Truck],
replication: int,
log_events: bool = False,
) -> dict[str, Any]:
"""Run one 8-hour shift replication and return a metrics dictionary.
If ``log_events`` is True a detailed event trace is returned under the
``"events"`` key (used for replication 0 of each scenario).
"""
s_nodes, s_edges, s_loaders, s_dumps = apply_overrides(config, nodes, edges, loaders, dumps)
router = Router(s_edges)
shift_min = config.shift_length_hours * 60.0
warmup_min = config.warmup_minutes
travel_cv = float(config.stochasticity.get("travel_time_noise_cv", 0.0))
road_caps_on = bool(config.routing.get("road_capacity_enabled", True))
# Production targets.
ore_sources = list(config.production.get("ore_sources", [l.node_id for l in s_loaders.values()]))
dump_destination = config.production.get("dump_destination", "CRUSH")
crusher_node = dump_destination
dump_candidates = [d for d in s_dumps.values() if d.node_id == dump_destination]
if not dump_candidates:
raise RouteError(f"No dump point at destination node '{dump_destination}'.")
crusher = dump_candidates[0]
active_loaders = [l for l in s_loaders.values() if l.node_id in ore_sources and l.availability > 0]
if not active_loaders:
raise RouteError("No active loaders available for the configured ore sources.")
fleet = trucks[: config.truck_count]
if not fleet:
raise RouteError("Fleet is empty; truck_count resolved to 0.")
ref = fleet[0] # all trucks identical in the supplied data
# Validate every leg the dispatcher may ask for (fail loudly up-front).
for loader in active_loaders:
router.route_edges(ref.start_node, loader.node_id)
router.route_edges(loader.node_id, crusher_node)
router.route_edges(crusher_node, loader.node_id)
# SimPy resources.
env = simpy.Environment()
loader_res = {l.loader_id: simpy.Resource(env, capacity=max(1, l.capacity)) for l in active_loaders}
crusher_res = simpy.Resource(env, capacity=max(1, crusher.capacity))
# Committed-assignment counter: trucks dispatched to a loader but not yet
# finished loading. Used by the dispatcher so in-flight trucks are counted
# when estimating the wait, which spreads the fleet across loaders instead
# of herding everyone onto the nominally-nearest one.
assigned = {l.loader_id: 0 for l in active_loaders}
constrained_edge_ids = [e.edge_id for e in s_edges.values() if e.constrained and road_caps_on]
edge_res = {eid: simpy.Resource(env, capacity=max(1, int(s_edges[eid].capacity))) for eid in constrained_edge_ids}
rec = Recorder([t.truck_id for t in fleet], list(loader_res), crusher.dump_id, constrained_edge_ids)
events: list[dict[str, Any]] = []
def log(truck_id: str, event_type: str, *, from_node: str = "", to_node: str = "",
location: str = "", loaded: bool | str = "", payload: float | str = "",
resource_id: str = "", queue_length: int | str = "") -> None:
if not log_events:
return
events.append({
"time_min": round(env.now, 4),
"replication": replication,
"scenario_id": config.scenario_id,
"truck_id": truck_id,
"event_type": event_type,
"from_node": from_node,
"to_node": to_node,
"location": location,
"loaded": int(loaded) if isinstance(loaded, bool) else loaded,
"payload_tonnes": payload,
"resource_id": resource_id,
"queue_length": queue_length,
})
def choose_loader(current_node: str) -> Loader:
"""Nearest-available-loader dispatch, tie-broken by expected cycle time.
Score = estimated time until *this* truck starts loading
= travel time to the loader
+ (queued + in-service requests) x mean load time.
Ties (and the documented secondary objective) are broken by the
loader's expected full cycle time (load + haul + dump + return).
"""
best: Loader | None = None
best_key: tuple[float, float, str] | None = None
for loader in active_loaders:
travel = router.leg_time(current_node, loader.node_id, False,
ref.empty_speed_factor, ref.loaded_speed_factor)
pending = assigned[loader.loader_id]
est_start = travel + pending * loader.mean_load_min
cycle = (loader.mean_load_min
+ router.leg_time(loader.node_id, crusher_node, True,
ref.empty_speed_factor, ref.loaded_speed_factor)
+ crusher.mean_dump_min
+ router.leg_time(crusher_node, loader.node_id, False,
ref.empty_speed_factor, ref.loaded_speed_factor))
key = (est_start, cycle, loader.loader_id)
if best_key is None or key < best_key:
best_key, best = key, loader
assert best is not None
return best
def traverse(truck: Truck, src: str, dst: str, loaded: bool):
"""Generator: move a truck along the shortest-time route src -> dst."""
for eid in router.route_edges(src, dst):
edge = s_edges[eid]
duration = edge.base_time_min(loaded, truck.empty_speed_factor,
truck.loaded_speed_factor) * travel_multiplier(rng, travel_cv)
res = edge_res.get(eid)
payload = truck.payload_tonnes if loaded else 0
if res is not None:
join = env.now
request = res.request()
yield request
wait = env.now - join
rec.edge_wait[eid] += wait
rec.edge_wait_count[eid] += 1
log(truck.truck_id, "enter_edge", from_node=edge.from_node, to_node=edge.to_node,
loaded=loaded, payload=payload, resource_id=eid, queue_length=len(res.queue))
try:
yield env.timeout(duration)
finally:
res.release(request)
rec.edge_busy[eid] += duration
else:
log(truck.truck_id, "enter_edge", from_node=edge.from_node, to_node=edge.to_node,
loaded=loaded, payload=payload)
yield env.timeout(duration)
rec.truck_travel[truck.truck_id] += duration
def truck_process(truck: Truck):
location = truck.start_node
while True:
loader = choose_loader(location)
assigned[loader.loader_id] += 1
log(truck.truck_id, "dispatch", location=location, to_node=loader.node_id,
resource_id=loader.loader_id)
# Empty travel to the assigned loader.
yield from traverse(truck, location, loader.node_id, loaded=False)
location = loader.node_id
# Queue + load.
res = loader_res[loader.loader_id]
join = env.now
log(truck.truck_id, "queue_loader", location=location,
resource_id=loader.loader_id, queue_length=len(res.queue))
request = res.request()
yield request
wait = env.now - join
rec.loader_wait.append(wait)
rec.loader_wait_by[loader.loader_id].append(wait)
rec.truck_loader_wait[truck.truck_id] += wait
load_time = truncated_normal(rng, loader.mean_load_min, loader.sd_load_min)
rec.truck_load_starts[truck.truck_id].append(env.now)
log(truck.truck_id, "load_start", location=location, resource_id=loader.loader_id)
yield env.timeout(load_time)
res.release(request)
assigned[loader.loader_id] -= 1
rec.loader_busy[loader.loader_id] += load_time
rec.truck_load[truck.truck_id] += load_time
log(truck.truck_id, "load_end", location=location, resource_id=loader.loader_id,
loaded=True, payload=truck.payload_tonnes)
# Loaded haul to the crusher.
yield from traverse(truck, location, crusher_node, loaded=True)
location = crusher_node
# Queue + dump.
join = env.now
log(truck.truck_id, "queue_crusher", location=location, resource_id=crusher.dump_id,
queue_length=len(crusher_res.queue), loaded=True, payload=truck.payload_tonnes)
request = crusher_res.request()
yield request
wait = env.now - join
rec.crusher_wait.append(wait)
rec.truck_crusher_wait[truck.truck_id] += wait
dump_time = truncated_normal(rng, crusher.mean_dump_min, crusher.sd_dump_min)
log(truck.truck_id, "dump_start", location=location, resource_id=crusher.dump_id,
loaded=True, payload=truck.payload_tonnes)
yield env.timeout(dump_time)
crusher_res.release(request)
rec.crusher_busy += dump_time
rec.truck_dump[truck.truck_id] += dump_time
if env.now >= warmup_min:
rec.tonnes += truck.payload_tonnes
rec.dumps += 1
log(truck.truck_id, "dump_end", location=location, resource_id=crusher.dump_id,
loaded=False, payload=truck.payload_tonnes)
# Loop: the empty return is the first leg of the next dispatch.
rng = np.random.default_rng(config.base_random_seed + replication)
for truck in fleet:
env.process(truck_process(truck))
env.run(until=shift_min)
return _summarise_replication(config, replication, shift_min, warmup_min, fleet,
active_loaders, crusher, constrained_edge_ids,
s_edges, rec, events if log_events else None)
def _summarise_replication(config, replication, shift_min, warmup_min, fleet,
active_loaders, crusher, constrained_edge_ids, s_edges,
rec: Recorder, events: list[dict[str, Any]] | None) -> dict[str, Any]:
window_hours = (shift_min - warmup_min) / 60.0
seed = config.base_random_seed + replication
cycles: list[float] = []
for starts in rec.truck_load_starts.values():
cycles.extend(b - a for a, b in zip(starts[:-1], starts[1:]))
truck_utils = []
for truck in fleet:
productive = (rec.truck_travel[truck.truck_id]
+ rec.truck_load[truck.truck_id]
+ rec.truck_dump[truck.truck_id])
truck_utils.append(min(productive / shift_min, 1.0))
# Utilisation is the average fraction of servers busy, i.e. normalised by
# the resource capacity, so multi-server resources stay on a 0..1 scale.
loader_caps = {l.loader_id: max(1, l.capacity) for l in active_loaders}
loader_util = {lid: rec.loader_busy[lid] / (shift_min * loader_caps[lid]) for lid in rec.loader_busy}
loader_qt = {
lid: (sum(w) / len(w) if w else 0.0)
for lid, w in rec.loader_wait_by.items()
}
edge_util = {
eid: rec.edge_busy[eid] / (shift_min * max(1, int(s_edges[eid].capacity)))
for eid in constrained_edge_ids
}
edge_qt = {
eid: (rec.edge_wait[eid] / rec.edge_wait_count[eid] if rec.edge_wait_count[eid] else 0.0)
for eid in constrained_edge_ids
}
# Combined per-resource utilisation table for bottleneck ranking.
resource_util = {crusher.dump_id: rec.crusher_busy / (shift_min * max(1, crusher.capacity))}
resource_kind = {crusher.dump_id: "crusher"}
resource_qt = {crusher.dump_id: (sum(rec.crusher_wait) / len(rec.crusher_wait) if rec.crusher_wait else 0.0)}
for lid in loader_util:
resource_util[lid] = loader_util[lid]
resource_kind[lid] = "loader"
resource_qt[lid] = loader_qt[lid]
for eid in edge_util:
resource_util[eid] = edge_util[eid]
resource_kind[eid] = "road"
resource_qt[eid] = edge_qt[eid]
def _mean(values: Iterable[float]) -> float:
values = list(values)
return float(sum(values) / len(values)) if values else float("nan")
metrics: dict[str, Any] = {
"scenario_id": config.scenario_id,
"replication": replication,
"random_seed": seed,
"total_tonnes_delivered": rec.tonnes,
"tonnes_per_hour": rec.tonnes / window_hours if window_hours else float("nan"),
"n_dumps": rec.dumps,
"average_truck_cycle_time_min": _mean(cycles),
"average_truck_utilisation": _mean(truck_utils),
"crusher_utilisation": rec.crusher_busy / (shift_min * max(1, crusher.capacity)),
"average_loader_queue_time_min": _mean(rec.loader_wait),
"average_crusher_queue_time_min": _mean(rec.crusher_wait),
"loader_utilisation": loader_util,
"loader_queue_time": loader_qt,
"edge_utilisation": edge_util,
"edge_queue_time": edge_qt,
"resource_utilisation": resource_util,
"resource_kind": resource_kind,
"resource_queue_time": resource_qt,
}
if events is not None:
metrics["events"] = events
return metrics
# --------------------------------------------------------------------------- #
# Convenience loader for the whole data directory
# --------------------------------------------------------------------------- #
@dataclass
class World:
nodes: dict[str, Node]
edges: dict[str, Edge]
loaders: dict[str, Loader]
dumps: dict[str, Dump]
trucks: list[Truck]
def load_world(data_dir: Path) -> World:
return World(
nodes=load_nodes(data_dir),
edges=load_edges(data_dir),
loaders=load_loaders(data_dir),
dumps=load_dumps(data_dir),
trucks=load_trucks(data_dir),
)
mine_sim.py
← Back to submission · View raw on GitHub