"""sim.py — Scenario runner for mine haulage discrete-event simulation.
Usage:
python sim.py # run all scenarios
python sim.py --scenario baseline # single scenario
python sim.py --replications 10 # override replications
python sim.py --plot # also generate topology.png
"""
from __future__ import annotations
import argparse
import json
import os
import sys
import time
from pathlib import Path
from typing import Dict, List, Optional
import numpy as np
import pandas as pd
import yaml
from scipy import stats
from sim_core import (
build_graph,
extract_metrics,
run_replication,
)
# ---------------------------------------------------------------------------
# Paths
# ---------------------------------------------------------------------------
ROOT = Path(__file__).parent
DATA = ROOT / "data"
SCENARIOS_DIR = DATA / "scenarios"
# ---------------------------------------------------------------------------
# Data loading
# ---------------------------------------------------------------------------
def load_data():
nodes = pd.read_csv(DATA / "nodes.csv")
edges = pd.read_csv(DATA / "edges.csv")
trucks = pd.read_csv(DATA / "trucks.csv")
loaders = pd.read_csv(DATA / "loaders.csv")
dump_points = pd.read_csv(DATA / "dump_points.csv")
return nodes, edges, trucks, loaders, dump_points
def load_scenario_yaml(path: Path) -> Dict:
with open(path) as f:
return yaml.safe_load(f)
def resolve_scenario(yaml_path: Path, all_yamls: Dict[str, Dict]) -> Dict:
"""Merge inherited base with overrides, returning a flat config dict."""
raw = load_scenario_yaml(yaml_path)
base: Dict = {}
if "inherits" in raw:
parent_id = raw["inherits"]
parent_path = SCENARIOS_DIR / f"{parent_id}.yaml"
base = resolve_scenario(parent_path, all_yamls)
# Deep merge raw into base
merged = _deep_merge(base, raw)
return merged
def _deep_merge(base: Dict, override: Dict) -> Dict:
result = dict(base)
for k, v in override.items():
if isinstance(v, dict) and isinstance(result.get(k), dict):
result[k] = _deep_merge(result[k], v)
else:
result[k] = v
return result
def flatten_scenario(merged: Dict) -> Dict:
"""Extract flat config values from nested YAML structure."""
sim = merged.get("simulation", {})
routing = merged.get("routing", {})
stoch = merged.get("stochasticity", {})
fleet = merged.get("fleet", {})
production = merged.get("production", {})
return {
"scenario_id": merged.get("scenario_id", "unknown"),
"description": merged.get("description", ""),
"shift_length_min": sim.get("shift_length_hours", 8) * 60.0,
"replications": sim.get("replications", 30),
"base_seed": sim.get("base_random_seed", 12345),
"noise_cv": stoch.get("travel_time_noise_cv", 0.10),
"dump_destination": production.get("dump_destination", "CRUSH"),
"truck_count": fleet.get("truck_count", 8),
"edge_overrides": merged.get("edge_overrides", {}),
"node_overrides": merged.get("node_overrides", {}),
"dump_point_overrides": merged.get("dump_point_overrides", {}),
}
# ---------------------------------------------------------------------------
# Confidence intervals
# ---------------------------------------------------------------------------
def ci95(values: List[float]) -> tuple[float, float]:
n = len(values)
if n < 2:
v = values[0] if values else 0.0
return v, v
m = np.mean(values)
se = stats.sem(values)
h = se * stats.t.ppf(0.975, df=n - 1)
return float(m - h), float(m + h)
# ---------------------------------------------------------------------------
# Run one scenario
# ---------------------------------------------------------------------------
def run_scenario(
cfg: Dict,
nodes: pd.DataFrame,
edges: pd.DataFrame,
trucks: pd.DataFrame,
loaders_df: pd.DataFrame,
dump_points_df: pd.DataFrame,
rep_override: Optional[int] = None,
) -> tuple[List[Dict], List[Dict]]:
scenario_id = cfg["scenario_id"]
n_reps = rep_override if rep_override is not None else cfg["replications"]
base_seed = cfg["base_seed"]
G = build_graph(
nodes, edges,
edge_overrides=cfg.get("edge_overrides"),
node_overrides=cfg.get("node_overrides"),
)
# Apply dump point overrides
dp_overrides = cfg.get("dump_point_overrides", {})
dump_rows = []
for _, row in dump_points_df.iterrows():
d = row.to_dict()
if row["dump_id"] in dp_overrides:
d.update(dp_overrides[row["dump_id"]])
dump_rows.append(d)
dump_points_resolved = pd.DataFrame(dump_rows)
dump_node = cfg["dump_destination"]
dump_point_row = dump_points_resolved[dump_points_resolved["node_id"] == dump_node].iloc[0]
dump_point = dump_point_row.to_dict()
loaders = [
row.to_dict()
for _, row in loaders_df.iterrows()
if row["node_id"] in cfg.get("ore_sources",
["LOAD_N", "LOAD_S"])
or True # include all loaders
]
# Only include loaders reachable from PARK
reachable_loaders = []
for loader in loaders:
try:
import networkx as nx
nx.shortest_path(G, "PARK", loader["node_id"])
reachable_loaders.append(loader)
except Exception:
pass
if not reachable_loaders:
print(f" WARNING: no loaders reachable in {scenario_id}!", file=sys.stderr)
reachable_loaders = loaders
rep_results = []
all_events = []
print(f" Running {n_reps} replications", end="", flush=True)
for rep in range(n_reps):
rng = np.random.default_rng(base_seed + rep)
log_events = (rep == 0) # only log first replication to keep file size manageable
state = run_replication(
cfg, G, reachable_loaders, dump_point, trucks,
rng, rep, scenario_id, log_events=log_events,
)
metrics = extract_metrics(state, cfg, trucks)
seed_used = base_seed + rep
rep_results.append({
"scenario_id": scenario_id,
"replication": rep + 1,
"random_seed": seed_used,
"total_tonnes_delivered": round(metrics["total_tonnes"], 2),
"tonnes_per_hour": round(metrics["tph"], 3),
"average_truck_cycle_time_min": round(metrics["avg_cycle_min"], 3),
"average_truck_utilisation": round(metrics["avg_truck_util"], 4),
"crusher_utilisation": round(metrics["crusher_util"], 4),
"average_loader_queue_time_min": round(metrics["avg_loader_q_min"], 3),
"average_crusher_queue_time_min": round(metrics["avg_crusher_q_min"], 3),
"n_dumps": metrics["n_dumps"],
})
all_events.extend(state.event_log)
print(".", end="", flush=True)
print()
return rep_results, all_events
# ---------------------------------------------------------------------------
# Aggregate scenario results into summary stats
# ---------------------------------------------------------------------------
def aggregate_scenario(rep_results: List[Dict], cfg: Dict) -> Dict:
tonnes = [r["total_tonnes_delivered"] for r in rep_results]
tph = [r["tonnes_per_hour"] for r in rep_results]
cycle = [r["average_truck_cycle_time_min"] for r in rep_results]
util = [r["average_truck_utilisation"] for r in rep_results]
crusher_u = [r["crusher_utilisation"] for r in rep_results]
lq = [r["average_loader_queue_time_min"] for r in rep_results]
cq = [r["average_crusher_queue_time_min"] for r in rep_results]
tonnes_lo, tonnes_hi = ci95(tonnes)
tph_lo, tph_hi = ci95(tph)
return {
"replications": len(rep_results),
"shift_length_hours": cfg["shift_length_min"] / 60.0,
"total_tonnes_mean": round(float(np.mean(tonnes)), 2),
"total_tonnes_ci95_low": round(tonnes_lo, 2),
"total_tonnes_ci95_high": round(tonnes_hi, 2),
"tonnes_per_hour_mean": round(float(np.mean(tph)), 3),
"tonnes_per_hour_ci95_low": round(tph_lo, 3),
"tonnes_per_hour_ci95_high": round(tph_hi, 3),
"average_cycle_time_min": round(float(np.mean(cycle)), 3),
"truck_utilisation_mean": round(float(np.mean(util)), 4),
"crusher_utilisation": round(float(np.mean(crusher_u)), 4),
"average_loader_queue_time_min": round(float(np.mean(lq)), 3),
"average_crusher_queue_time_min": round(float(np.mean(cq)), 3),
"top_bottlenecks": _identify_bottlenecks(
float(np.mean(crusher_u)), float(np.mean(lq)), float(np.mean(cq)),
float(np.mean(util))
),
}
def _identify_bottlenecks(crusher_u, lq, cq, truck_u) -> List[str]:
bottlenecks = []
if crusher_u > 0.80:
bottlenecks.append(f"Crusher highly utilised ({crusher_u:.1%})")
if cq > 2.0:
bottlenecks.append(f"Long crusher queue (avg {cq:.1f} min wait)")
if lq > 2.0:
bottlenecks.append(f"Long loader queue (avg {lq:.1f} min wait)")
if truck_u > 0.90:
bottlenecks.append(f"Trucks near saturation (avg util {truck_u:.1%})")
if not bottlenecks:
bottlenecks.append("No severe bottleneck detected")
return bottlenecks
# ---------------------------------------------------------------------------
# Write outputs
# ---------------------------------------------------------------------------
def write_results_csv(all_rep_results: List[Dict]) -> None:
df = pd.DataFrame(all_rep_results)
df.to_csv(ROOT / "results.csv", index=False)
print(f" Written results.csv ({len(df)} rows)")
def write_summary_json(scenario_summaries: Dict, scenario_cfgs: Dict) -> None:
key_assumptions = [
"Trucks follow shortest travel-time routes recalculated at each dispatch decision",
"Loading and dumping service times are truncated-normal (truncated at zero)",
"Travel time noise is lognormal with CV=0.10 (mean preserved at 1.0x)",
"Capacity-constrained road segments (capacity=1) are SimPy Resources; only one truck traverses at a time",
"Loaders and crusher have capacity 1 (single-server queues, FIFO)",
"All trucks are identical (100 t payload, loaded speed factor 0.85)",
"Shift warmup is zero; trucks depart from PARK at time 0",
"Trucks that complete a dump after shift_end are not counted (strict cutoff)",
"Dispatching: nearest-available-loader with queue-length penalty",
"All 12 trucks available; fleet scenarios select the first N from trucks.csv",
]
model_limitations = [
"No fuel or shift-change delays modelled",
"No truck breakdowns or maintenance schedules",
"Dispatch does not account for congestion on capacity-1 roads",
"Travel noise is independent between edges (no correlated delays)",
"Loader availability is 100% throughout the shift",
"Waste haulage not modelled (all ore goes to crusher)",
"Parking delays at shift start not modelled",
]
payload = {
"benchmark_id": "001_synthetic_mine_throughput",
"scenarios": scenario_summaries,
"key_assumptions": key_assumptions,
"model_limitations": model_limitations,
"additional_scenarios_proposed": [
{
"id": "loader_upgrade",
"description": "Reduce LOAD_N service time mean from 6.5 to 4.5 min (match LOAD_S) "
"to test whether loader speed at North Pit limits throughput.",
}
],
}
with open(ROOT / "summary.json", "w") as f:
json.dump(payload, f, indent=2)
print(" Written summary.json")
def write_event_log(all_events: List[Dict]) -> None:
if not all_events:
pd.DataFrame(
columns=["time_min", "replication", "scenario_id", "truck_id",
"event_type", "from_node", "to_node", "location",
"loaded", "payload_tonnes", "resource_id", "queue_length"]
).to_csv(ROOT / "event_log.csv", index=False)
else:
df = pd.DataFrame(all_events)
df.to_csv(ROOT / "event_log.csv", index=False)
print(f" Written event_log.csv ({len(all_events)} events)")
# ---------------------------------------------------------------------------
# Topology visualisation
# ---------------------------------------------------------------------------
def plot_topology(nodes: pd.DataFrame, edges: pd.DataFrame) -> None:
try:
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
except ImportError:
print(" matplotlib not available, skipping topology plot")
return
fig, ax = plt.subplots(figsize=(14, 10))
color_map = {
"parking": "#888888",
"junction": "#4A90D9",
"load_ore": "#E67E22",
"crusher": "#E74C3C",
"waste_dump": "#95A5A6",
"maintenance": "#8E44AD",
}
pos = {row["node_id"]: (row["x_m"], row["y_m"]) for _, row in nodes.iterrows()}
# Draw edges
for _, row in edges.iterrows():
x0, y0 = pos[row["from_node"]]
x1, y1 = pos[row["to_node"]]
cap = int(row["capacity"])
lw = 3.0 if cap < 10 else 1.0
color = "#E74C3C" if cap < 10 else "#BDC3C7"
ax.annotate(
"", xy=(x1, y1), xytext=(x0, y0),
arrowprops=dict(arrowstyle="-|>", color=color, lw=lw,
connectionstyle="arc3,rad=0.05"),
)
# Draw nodes
for _, row in nodes.iterrows():
x, y = pos[row["node_id"]]
c = color_map.get(row["node_type"], "#2ECC71")
ax.scatter(x, y, s=300, c=c, zorder=5, edgecolors="white", linewidths=1.5)
ax.text(x, y + 80, row["node_name"], ha="center", va="bottom", fontsize=7,
fontweight="bold")
# Legend
patches = [
mpatches.Patch(color=v, label=k.replace("_", " ").title())
for k, v in color_map.items()
]
patches.append(mpatches.Patch(color="#E74C3C", label="Constrained road (cap=1)"))
patches.append(mpatches.Patch(color="#BDC3C7", label="Unconstrained road"))
ax.legend(handles=patches, loc="upper left", fontsize=8)
ax.set_title("Mine Topology — Node and Road Network", fontsize=13, fontweight="bold")
ax.set_xlabel("x (m)")
ax.set_ylabel("y (m)")
ax.set_aspect("equal")
ax.grid(True, alpha=0.2)
plt.tight_layout()
plt.savefig(ROOT / "topology.png", dpi=150, bbox_inches="tight")
plt.close()
print(" Written topology.png")
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
SCENARIO_ORDER = [
"baseline", "trucks_4", "trucks_12",
"ramp_upgrade", "crusher_slowdown", "ramp_closed",
]
def main():
parser = argparse.ArgumentParser(description="Mine haulage DES runner")
parser.add_argument("--scenario", default=None, help="Single scenario to run")
parser.add_argument("--replications", type=int, default=None,
help="Override number of replications")
parser.add_argument("--plot", action="store_true", help="Generate topology.png")
args = parser.parse_args()
t0 = time.time()
print("Loading data...")
nodes, edges, trucks, loaders_df, dump_points_df = load_data()
if args.plot:
print("Generating topology plot...")
plot_topology(nodes, edges)
scenarios_to_run = [args.scenario] if args.scenario else SCENARIO_ORDER
all_rep_results: List[Dict] = []
all_events: List[Dict] = []
scenario_summaries: Dict = {}
scenario_cfgs: Dict = {}
for sid in scenarios_to_run:
yaml_path = SCENARIOS_DIR / f"{sid}.yaml"
if not yaml_path.exists():
print(f" Scenario file not found: {yaml_path}", file=sys.stderr)
continue
print(f"\nScenario: {sid}")
merged = resolve_scenario(yaml_path, {})
cfg = flatten_scenario(merged)
scenario_cfgs[sid] = cfg
rep_results, events = run_scenario(
cfg, nodes, edges, trucks, loaders_df, dump_points_df,
rep_override=args.replications,
)
all_rep_results.extend(rep_results)
all_events.extend(events)
summary = aggregate_scenario(rep_results, cfg)
scenario_summaries[sid] = summary
mean_t = summary["total_tonnes_mean"]
ci_lo = summary["total_tonnes_ci95_low"]
ci_hi = summary["total_tonnes_ci95_high"]
tph = summary["tonnes_per_hour_mean"]
print(f" Throughput: {mean_t:.0f} t [{ci_lo:.0f}–{ci_hi:.0f}] ({tph:.1f} t/h)")
print("\nWriting outputs...")
write_results_csv(all_rep_results)
write_summary_json(scenario_summaries, scenario_cfgs)
write_event_log(all_events)
elapsed = time.time() - t0
print(f"\nDone in {elapsed:.1f}s")
_print_summary(scenario_summaries)
def _print_summary(summaries: Dict) -> None:
print("\n" + "=" * 72)
print(f"{'Scenario':<20} {'Mean t':>8} {'95% CI':>16} {'t/h':>7} "
f"{'Crusher%':>9} {'CrusherQ min':>13}")
print("-" * 72)
for sid, s in summaries.items():
ci_str = f"[{s['total_tonnes_ci95_low']:.0f}–{s['total_tonnes_ci95_high']:.0f}]"
print(f"{sid:<20} {s['total_tonnes_mean']:>8.0f} {ci_str:>16} "
f"{s['tonnes_per_hour_mean']:>7.1f} "
f"{s['crusher_utilisation']:>9.1%} "
f"{s['average_crusher_queue_time_min']:>13.2f}")
print("=" * 72)
if __name__ == "__main__":
main()
sim.py
← Back to submission · View raw on GitHub