import simpy
import pandas as pd
import numpy as np
import networkx as nx
import yaml
import copy
import json
import os
import time
def load_data():
nodes = pd.read_csv('data/nodes.csv')
edges = pd.read_csv('data/edges.csv')
trucks = pd.read_csv('data/trucks.csv')
loaders = pd.read_csv('data/loaders.csv')
dump_points = pd.read_csv('data/dump_points.csv')
return nodes, edges, trucks, loaders, dump_points
def parse_scenario(scenario_file):
with open(f'data/scenarios/{scenario_file}', 'r') as f:
scenario = yaml.safe_load(f)
if 'inherits' in scenario:
with open(f"data/scenarios/{scenario['inherits']}.yaml", 'r') as f:
base = yaml.safe_load(f)
# Deep update
def update(d, u):
for k, v in u.items():
if isinstance(v, dict):
d[k] = update(d.get(k, {}), v)
else:
d[k] = v
return d
scenario = update(base, scenario)
return scenario
def build_graph(edges_df, scenario):
G = nx.DiGraph()
for _, row in edges_df.iterrows():
edge_id = row['edge_id']
closed = row.get('closed', False)
if isinstance(closed, str) and closed.lower() == 'true':
closed = True
capacity = row['capacity']
max_speed = row['max_speed_kph']
# apply overrides
if 'edge_overrides' in scenario and edge_id in scenario['edge_overrides']:
overrides = scenario['edge_overrides'][edge_id]
closed = overrides.get('closed', closed)
if isinstance(closed, str) and closed.lower() == 'true':
closed = True
capacity = overrides.get('capacity', capacity)
max_speed = overrides.get('max_speed_kph', max_speed)
if not closed:
base_time = (row['distance_m'] / 1000) / max_speed * 60
G.add_edge(row['from_node'], row['to_node'],
edge_id=edge_id,
distance_m=row['distance_m'],
max_speed=max_speed,
capacity=capacity,
base_time=base_time)
return G
def get_path(G, source, target):
try:
path = nx.shortest_path(G, source, target, weight='base_time')
return path
except nx.NetworkXNoPath:
return None
class MineSimulation:
def __init__(self, env, scenario_id, scenario, nodes, edges, trucks, loaders, dump_points, seed):
self.env = env
self.scenario_id = scenario_id
self.scenario = scenario
self.nodes = nodes
self.edges = edges
self.trucks = trucks
self.loaders = loaders
self.dump_points = dump_points
self.seed = seed
self.rng = np.random.default_rng(seed)
self.G = build_graph(self.edges, self.scenario)
self.edge_resources = {}
for u, v, data in self.G.edges(data=True):
cap = data['capacity']
if cap < 999:
self.edge_resources[data['edge_id']] = simpy.Resource(env, capacity=int(cap))
self.loader_resources = {}
self.loader_stats = {}
for _, row in self.loaders.iterrows():
l_id = row['loader_id']
n_id = row['node_id']
self.loader_resources[n_id] = simpy.Resource(env, capacity=int(row['capacity']))
self.loader_stats[n_id] = {
'mean_load_time': row['mean_load_time_min'],
'sd_load_time': row['sd_load_time_min']
}
self.dump_resources = {}
self.dump_stats = {}
for _, row in self.dump_points.iterrows():
d_id = row['dump_id']
n_id = row['node_id']
# overrides
mean_dump = row['mean_dump_time_min']
sd_dump = row['sd_dump_time_min']
if 'dump_point_overrides' in scenario and d_id in scenario['dump_point_overrides']:
ov = scenario['dump_point_overrides'][d_id]
mean_dump = ov.get('mean_dump_time_min', mean_dump)
sd_dump = ov.get('sd_dump_time_min', sd_dump)
self.dump_resources[n_id] = simpy.Resource(env, capacity=int(row['capacity']))
self.dump_stats[n_id] = {
'mean_dump_time': mean_dump,
'sd_dump_time': sd_dump
}
self.event_log = []
self.total_tonnes = 0
self.cycle_times = []
self.truck_utilization = {row['truck_id']: 0 for _, row in trucks.iterrows()}
self.loader_queue_times = []
self.crusher_queue_times = []
self.crusher_busy_time = 0
self.loader_busy_time = {n: 0 for n in self.loader_resources}
def log_event(self, time_min, truck_id, event_type, from_node, to_node, location, loaded, payload_tonnes, resource_id, queue_length):
self.event_log.append({
'time_min': time_min,
'replication': self.seed,
'scenario_id': self.scenario_id,
'truck_id': truck_id,
'event_type': event_type,
'from_node': from_node,
'to_node': to_node,
'location': location,
'loaded': loaded,
'payload_tonnes': payload_tonnes,
'resource_id': resource_id,
'queue_length': queue_length
})
def run_truck(self, truck_id, payload, empty_speed_factor, loaded_speed_factor, start_node):
current_node = start_node
shift_length = self.scenario['simulation']['shift_length_hours'] * 60
while self.env.now < shift_length:
cycle_start = self.env.now
# 1. Decide next loader
loaders_nodes = self.scenario['production']['ore_sources']
best_loader = None
best_score = float('inf')
for loader in loaders_nodes:
path = get_path(self.G, current_node, loader)
if path is None: continue
# calc travel time
tt = sum([self.G[path[i]][path[i+1]]['base_time'] for i in range(len(path)-1)]) / empty_speed_factor
res = self.loader_resources[loader]
exp_q_time = len(res.queue) * self.loader_stats[loader]['mean_load_time']
score = tt + exp_q_time
if score < best_score:
best_score = score
best_loader = loader
if best_loader is None:
break
# 2. Travel to loader
path = get_path(self.G, current_node, best_loader)
start_travel_time = self.env.now
for i in range(len(path)-1):
u, v = path[i], path[i+1]
edge_data = self.G[u][v]
base_t = edge_data['base_time'] / empty_speed_factor
cv = self.scenario.get('stochasticity', {}).get('travel_time_noise_cv', 0.10)
actual_t = max(0.1, self.rng.normal(base_t, base_t * cv))
self.log_event(self.env.now, truck_id, 'travel_start', u, v, u, False, 0, edge_data['edge_id'], 0)
if edge_data['capacity'] < 999:
res = self.edge_resources[edge_data['edge_id']]
req = res.request()
yield req
yield self.env.timeout(actual_t)
res.release(req)
else:
yield self.env.timeout(actual_t)
self.truck_utilization[truck_id] += (self.env.now - start_travel_time)
current_node = best_loader
# 3. Load
res = self.loader_resources[current_node]
self.log_event(self.env.now, truck_id, 'join_loader_queue', current_node, '', current_node, False, 0, current_node, len(res.queue))
q_start = self.env.now
req = res.request()
yield req
q_time = self.env.now - q_start
self.loader_queue_times.append(q_time)
mean_l = self.loader_stats[current_node]['mean_load_time']
sd_l = self.loader_stats[current_node]['sd_load_time']
load_t = max(0.1, self.rng.normal(mean_l, sd_l))
self.log_event(self.env.now, truck_id, 'loading_start', current_node, '', current_node, False, 0, current_node, len(res.queue))
yield self.env.timeout(load_t)
res.release(req)
self.log_event(self.env.now, truck_id, 'loading_end', current_node, '', current_node, True, payload, current_node, len(res.queue))
self.truck_utilization[truck_id] += load_t
self.loader_busy_time[current_node] += load_t
# 4. Travel to crusher
target_crusher = self.scenario['production']['dump_destination']
path = get_path(self.G, current_node, target_crusher)
if path is None:
break
start_travel_time = self.env.now
for i in range(len(path)-1):
u, v = path[i], path[i+1]
edge_data = self.G[u][v]
base_t = edge_data['base_time'] / loaded_speed_factor
cv = self.scenario.get('stochasticity', {}).get('travel_time_noise_cv', 0.10)
actual_t = max(0.1, self.rng.normal(base_t, base_t * cv))
self.log_event(self.env.now, truck_id, 'travel_start', u, v, u, True, payload, edge_data['edge_id'], 0)
if edge_data['capacity'] < 999:
res = self.edge_resources[edge_data['edge_id']]
req = res.request()
yield req
yield self.env.timeout(actual_t)
res.release(req)
else:
yield self.env.timeout(actual_t)
self.truck_utilization[truck_id] += (self.env.now - start_travel_time)
current_node = target_crusher
# 5. Dump
res = self.dump_resources[current_node]
self.log_event(self.env.now, truck_id, 'join_crusher_queue', current_node, '', current_node, True, payload, current_node, len(res.queue))
q_start = self.env.now
req = res.request()
yield req
q_time = self.env.now - q_start
self.crusher_queue_times.append(q_time)
mean_d = self.dump_stats[current_node]['mean_dump_time']
sd_d = self.dump_stats[current_node]['sd_dump_time']
dump_t = max(0.1, self.rng.normal(mean_d, sd_d))
self.log_event(self.env.now, truck_id, 'dumping_start', current_node, '', current_node, True, payload, current_node, len(res.queue))
yield self.env.timeout(dump_t)
res.release(req)
self.log_event(self.env.now, truck_id, 'dumping_end', current_node, '', current_node, False, 0, current_node, len(res.queue))
self.truck_utilization[truck_id] += dump_t
self.crusher_busy_time += dump_t
self.total_tonnes += payload
self.cycle_times.append(self.env.now - cycle_start)
def run_scenario(scenario_file, nodes, edges, trucks, loaders, dump_points, num_replications=30):
scenario_id = scenario_file.replace('.yaml', '')
scenario = parse_scenario(scenario_file)
rep_results = []
all_logs = []
base_seed = scenario['simulation'].get('base_random_seed', 12345)
shift_length = scenario['simulation']['shift_length_hours']
truck_count = scenario['fleet']['truck_count']
# We slice trucks based on truck_count
active_trucks = trucks.head(truck_count)
for rep in range(num_replications):
env = simpy.Environment()
seed = base_seed + rep
sim = MineSimulation(env, scenario_id, scenario, nodes, edges, active_trucks, loaders, dump_points, seed)
for _, row in active_trucks.iterrows():
env.process(sim.run_truck(
row['truck_id'],
row['payload_tonnes'],
row['empty_speed_factor'],
row['loaded_speed_factor'],
row['start_node']
))
env.run(until=shift_length * 60)
# Calculate stats
tph = sim.total_tonnes / shift_length
avg_cycle = np.mean(sim.cycle_times) if sim.cycle_times else 0
avg_truck_util = np.mean([v / (shift_length * 60) for v in sim.truck_utilization.values()]) if sim.truck_utilization else 0
crusher_util = sim.crusher_busy_time / (shift_length * 60)
avg_loader_q = np.mean(sim.loader_queue_times) if sim.loader_queue_times else 0
avg_crush_q = np.mean(sim.crusher_queue_times) if sim.crusher_queue_times else 0
loader_util = {k: v / (shift_length * 60) for k, v in sim.loader_busy_time.items()}
rep_results.append({
'scenario_id': scenario_id,
'replication': rep,
'random_seed': seed,
'total_tonnes_delivered': sim.total_tonnes,
'tonnes_per_hour': tph,
'average_truck_cycle_time_min': avg_cycle,
'average_truck_utilisation': avg_truck_util,
'crusher_utilisation': crusher_util,
'average_loader_queue_time_min': avg_loader_q,
'average_crusher_queue_time_min': avg_crush_q,
'loader_utilization': loader_util
})
all_logs.extend(sim.event_log)
return pd.DataFrame(rep_results), all_logs
if __name__ == "__main__":
nodes, edges, trucks, loaders, dump_points = load_data()
scenarios = [
'baseline.yaml',
'trucks_4.yaml',
'trucks_12.yaml',
'ramp_upgrade.yaml',
'crusher_slowdown.yaml',
'ramp_closed.yaml'
]
all_results = []
all_event_logs = []
for scen in scenarios:
print(f"Running scenario: {scen}")
res_df, logs = run_scenario(scen, nodes, edges, trucks, loaders, dump_points, num_replications=30)
all_results.append(res_df)
all_event_logs.extend(logs)
final_results = pd.concat(all_results, ignore_index=True)
# Exclude loader_utilization from CSV
csv_results = final_results.drop(columns=['loader_utilization'])
csv_results.to_csv('results.csv', index=False)
pd.DataFrame(all_event_logs).to_csv('event_log.csv', index=False)
# build summary.json
summary = {
"benchmark_id": "001_synthetic_mine_throughput",
"scenarios": {},
"key_assumptions": [
"Trucks travel at uniform speed equal to speed limit * speed factor",
"Shortest time path is calculated once per dispatch",
"Loading and dumping times follow truncated normal distributions bounded at 0.1 min",
"Travel times include normal noise with CV 0.1",
"Constrained roads allow only 1 truck at a time"
],
"model_limitations": [
"No acceleration/deceleration kinematics",
"No intersection delays beyond single-capacity road queues",
"No shift changes, breaks, or mid-shift refueling"
],
"additional_scenarios_proposed": []
}
for scenario_id, group in final_results.groupby('scenario_id'):
# calculate 95% CI
def ci95(data):
m = np.mean(data)
se = np.std(data, ddof=1) / np.sqrt(len(data))
return float(m - 1.96 * se), float(m + 1.96 * se)
t_m = float(group['total_tonnes_delivered'].mean())
t_l, t_h = ci95(group['total_tonnes_delivered'])
tph_m = float(group['tonnes_per_hour'].mean())
tph_l, tph_h = ci95(group['tonnes_per_hour'])
# average of loader_utilization over all reps
loader_util = {}
for rep_lu in group['loader_utilization']:
for k, v in rep_lu.items():
loader_util[k] = loader_util.get(k, 0) + v / len(group)
summary["scenarios"][scenario_id] = {
"replications": len(group),
"shift_length_hours": 8,
"total_tonnes_mean": t_m,
"total_tonnes_ci95_low": t_l,
"total_tonnes_ci95_high": t_h,
"tonnes_per_hour_mean": tph_m,
"tonnes_per_hour_ci95_low": tph_l,
"tonnes_per_hour_ci95_high": tph_h,
"average_cycle_time_min": float(group['average_truck_cycle_time_min'].mean()),
"truck_utilisation_mean": float(group['average_truck_utilisation'].mean()),
"loader_utilisation": loader_util,
"crusher_utilisation": float(group['crusher_utilisation'].mean()),
"average_loader_queue_time_min": float(group['average_loader_queue_time_min'].mean()),
"average_crusher_queue_time_min": float(group['average_crusher_queue_time_min'].mean()),
"top_bottlenecks": []
}
with open('summary.json', 'w') as f:
json.dump(summary, f, indent=2)
sim.py
← Back to submission · View raw on GitHub