src/mine_sim/model.py

← Back to submission · View raw on GitHub

import simpy
import numpy as np
import pandas as pd

def sample_travel_time(base_time, cv, generator):
    if cv <= 0.0:
        return base_time
    sigma_log = np.sqrt(np.log(1.0 + cv**2))
    mu_log = np.log(base_time) - 0.5 * sigma_log**2
    return generator.lognormal(mu_log, sigma_log)

def sample_truncated_normal(mean, sd, generator, min_val=0.1):
    return max(min_val, generator.normal(mean, sd))

class SimulationModel:
    def __init__(self, scenario_cfg, mine_graph, replication_idx):
        self.scenario_cfg = scenario_cfg
        self.mine_graph = mine_graph
        self.replication_idx = replication_idx
        
        # Seed control
        self.base_seed = scenario_cfg["simulation"]["base_random_seed"]
        self.seed = self.base_seed + replication_idx
        self.rng = np.random.default_rng(self.seed)
        
        # SimPy environment
        self.env = simpy.Environment()
        
        # Shift configuration
        self.shift_length_hours = scenario_cfg["simulation"]["shift_length_hours"]
        self.shift_length_min = self.shift_length_hours * 60.0
        
        # Stochasticity configuration
        self.travel_time_noise_cv = scenario_cfg["stochasticity"]["travel_time_noise_cv"]
        
        # Dispatching factors (identical across trucks)
        self.empty_speed_factor = 1.0
        self.loaded_speed_factor = 0.85
        
        # Initialize SimPy resources
        self.loader_resources = {}
        self.loaders_data = {}
        for node_id, node_attr in self.mine_graph.G.nodes(data=True):
            if node_attr['node_type'] == 'load_ore':
                loader_id = "L_N" if node_id == "LOAD_N" else "L_S"
                mean_load = node_attr['service_time_mean_min']
                sd_load = node_attr['service_time_sd_min']
                
                self.loaders_data[loader_id] = {
                    "node_id": node_id,
                    "mean_load_time_min": mean_load,
                    "sd_load_time_min": sd_load,
                    "capacity": 1
                }
                self.loader_resources[loader_id] = simpy.Resource(self.env, capacity=1)
                
        # Crusher resource
        crusher_node_attr = self.mine_graph.G.nodes['CRUSH']
        self.dump_points_data = {
            "D_CRUSH": {
                "node_id": "CRUSH",
                "mean_dump_time_min": crusher_node_attr['service_time_mean_min'],
                "sd_dump_time_min": crusher_node_attr['service_time_sd_min'],
                "capacity": 1
            }
        }
        self.crusher_resource = simpy.Resource(self.env, capacity=1)
        
        # Capacity-constrained edges
        self.edge_resources = {}
        for u, v, d in self.mine_graph.G.edges(data=True):
            if d['capacity'] == 1:
                edge_id = d['edge_id']
                self.edge_resources[edge_id] = simpy.Resource(self.env, capacity=1)
                
        # Fleet setup
        self.truck_count = scenario_cfg["fleet"]["truck_count"]
        # Find first truck_count rows in trucks.csv
        # All trucks in the synthetic dataset have payload=100, empty_speed_factor=1.0, loaded_speed_factor=0.85
        
        # State variables and metrics
        self.total_tonnes_delivered = 0.0
        self.event_log = []
        
        # Truck metrics tracking
        # States: 'TRAVEL_EMPTY', 'TRAVEL_LOADED', 'LOADING', 'DUMPING', 'QUEUE_LOADER', 'QUEUE_CRUSHER', 'QUEUE_EDGE'
        self.truck_stats = {}
        self.truck_states = {} # truck_id -> (current_state, start_time)
        
        # Resource stats
        self.loader_stats = {
            "L_N": {"busy_time": 0.0, "state": "IDLE", "state_start": 0.0, "queue_times": [], "queue_lens": []},
            "L_S": {"busy_time": 0.0, "state": "IDLE", "state_start": 0.0, "queue_times": [], "queue_lens": []}
        }
        self.crusher_stats = {"busy_time": 0.0, "state": "IDLE", "state_start": 0.0, "queue_times": [], "queue_lens": []}
        
        self.edge_stats = {}
        for edge_id in self.edge_resources:
            self.edge_stats[edge_id] = {
                "busy_time": 0.0,
                "state": "IDLE",
                "state_start": 0.0,
                "queue_times": [],
                "queue_lens": []
            }
            
    def log_event(self, time_min, truck_id, event_type, from_node, to_node, location, loaded, payload, resource_id, queue_length):
        self.event_log.append({
            "time_min": round(time_min, 4),
            "replication": self.replication_idx,
            "scenario_id": self.scenario_cfg["scenario_id"],
            "truck_id": truck_id,
            "event_type": event_type,
            "from_node": from_node,
            "to_node": to_node,
            "location": location,
            "loaded": loaded,
            "payload_tonnes": payload,
            "resource_id": resource_id,
            "queue_length": queue_length
        })
        
    def transition_truck_state(self, truck_id, new_state, now):
        old_state, start_time = self.truck_states[truck_id]
        duration = now - start_time
        if duration > 0:
            self.truck_stats[truck_id][old_state] += duration
        self.truck_states[truck_id] = (new_state, now)
        
    def transition_loader_state(self, loader_id, new_state, now):
        old_state = self.loader_stats[loader_id]["state"]
        start_time = self.loader_stats[loader_id]["state_start"]
        duration = now - start_time
        if duration > 0:
            if old_state == "BUSY":
                self.loader_stats[loader_id]["busy_time"] += duration
        self.loader_stats[loader_id]["state"] = new_state
        self.loader_stats[loader_id]["state_start"] = now
        
    def transition_crusher_state(self, new_state, now):
        old_state = self.crusher_stats["state"]
        start_time = self.crusher_stats["state_start"]
        duration = now - start_time
        if duration > 0:
            if old_state == "BUSY":
                self.crusher_stats["busy_time"] += duration
        self.crusher_stats["state"] = new_state
        self.crusher_stats["state_start"] = now
        
    def transition_edge_state(self, edge_id, new_state, now):
        old_state = self.edge_stats[edge_id]["state"]
        start_time = self.edge_stats[edge_id]["state_start"]
        duration = now - start_time
        if duration > 0:
            if old_state == "BUSY":
                self.edge_stats[edge_id]["busy_time"] += duration
        self.edge_stats[edge_id]["state"] = new_state
        self.edge_stats[edge_id]["state_start"] = now

    def dispatch_truck(self, truck_id, current_node, now):
        """
        Implements: dispatch = min(travel + queue_len * mean_load + own_load)
        """
        best_loader_id = None
        best_loader_node = None
        best_score = float('inf')
        
        for loader_id, loader_info in self.loaders_data.items():
            loader_node = loader_info['node_id']
            
            # Calculate shortest path travel time from current_node to loader_node
            _, _, travel_time = self.mine_graph.compute_shortest_path(
                current_node, loader_node, is_loaded=False,
                empty_speed_factor=self.empty_speed_factor,
                loaded_speed_factor=self.loaded_speed_factor
            )
            
            # Calculate current queue length at loader (waiting + in-service)
            res = self.loader_resources[loader_id]
            queue_len = len(res.queue) + res.count
            
            mean_load = loader_info['mean_load_time_min']
            
            # Dispatch Formula: travel + queue_len * mean_load + own_load (which is mean_load)
            score = travel_time + queue_len * mean_load + mean_load
            
            if score < best_score:
                best_score = score
                best_loader_id = loader_id
                best_loader_node = loader_node
                
        return best_loader_id, best_loader_node

    def truck_process(self, truck_id):
        current_node = "PARK"
        is_loaded = False
        payload = 100.0 # Standard from trucks.csv
        
        # Register in stats
        self.truck_stats[truck_id] = {
            "TRAVEL_EMPTY": 0.0,
            "TRAVEL_LOADED": 0.0,
            "LOADING": 0.0,
            "DUMPING": 0.0,
            "QUEUE_LOADER": 0.0,
            "QUEUE_CRUSHER": 0.0,
            "QUEUE_EDGE": 0.0,
            "completed_cycles": []
        }
        self.truck_states[truck_id] = ("PARKED", 0.0)
        
        # Simultaneous t=0 dispatch
        loader_id, loader_node = self.dispatch_truck(truck_id, current_node, self.env.now)
        self.log_event(self.env.now, truck_id, "dispatch", current_node, loader_node, current_node, is_loaded, 0.0, loader_id, 0)
        self.transition_truck_state(truck_id, "TRAVEL_EMPTY", self.env.now)
        
        cycle_start_time = self.env.now
        target_node = loader_node
        
        while True:
            # 1. Travel from current_node to target_node edge-by-edge
            path_nodes, path_edges, _ = self.mine_graph.compute_shortest_path(
                current_node, target_node, is_loaded, self.empty_speed_factor, self.loaded_speed_factor
            )
            
            for i in range(len(path_nodes) - 1):
                u, v = path_nodes[i], path_nodes[i+1]
                edge_data = self.mine_graph.get_edge_details(u, v)
                edge_id = edge_data['edge_id']
                is_constrained = (edge_data['capacity'] == 1)
                
                # Base free flow travel time on this edge
                edge_base_time = self.mine_graph.calculate_free_flow_travel_time(
                    u, v, is_loaded, self.empty_speed_factor, self.loaded_speed_factor
                )
                # Sample with lognormal noise
                edge_time = sample_travel_time(edge_base_time, self.travel_time_noise_cv, self.rng)
                
                if is_constrained:
                    res = self.edge_resources[edge_id]
                    self.log_event(self.env.now, truck_id, "edge_queue_start", u, v, u, is_loaded, payload if is_loaded else 0.0, edge_id, len(res.queue))
                    self.transition_truck_state(truck_id, "QUEUE_EDGE", self.env.now)
                    
                    queue_start = self.env.now
                    with res.request() as req:
                        yield req
                        
                        queue_wait = self.env.now - queue_start
                        self.edge_stats[edge_id]["queue_times"].append(queue_wait)
                        
                        self.log_event(self.env.now, truck_id, "edge_enter", u, v, u, is_loaded, payload if is_loaded else 0.0, edge_id, len(res.queue))
                        self.transition_truck_state(truck_id, "TRAVEL_LOADED" if is_loaded else "TRAVEL_EMPTY", self.env.now)
                        
                        # Edge becomes busy
                        self.transition_edge_state(edge_id, "BUSY", self.env.now)
                        
                        yield self.env.timeout(edge_time)
                        
                        self.transition_edge_state(edge_id, "IDLE", self.env.now)
                        self.log_event(self.env.now, truck_id, "edge_leave", u, v, v, is_loaded, payload if is_loaded else 0.0, edge_id, len(res.queue))
                else:
                    self.log_event(self.env.now, truck_id, "travel_start", u, v, u, is_loaded, payload if is_loaded else 0.0, edge_id, 0)
                    self.transition_truck_state(truck_id, "TRAVEL_LOADED" if is_loaded else "TRAVEL_EMPTY", self.env.now)
                    
                    yield self.env.timeout(edge_time)
                    
                    self.log_event(self.env.now, truck_id, "travel_end", u, v, v, is_loaded, payload if is_loaded else 0.0, edge_id, 0)
                    
            current_node = target_node
            
            # 2. Arrived at target loader node
            if current_node in ["LOAD_N", "LOAD_S"]:
                loader_id = "L_N" if current_node == "LOAD_N" else "L_S"
                res = self.loader_resources[loader_id]
                loader_info = self.loaders_data[loader_id]
                
                self.log_event(self.env.now, truck_id, "loader_queue_start", current_node, current_node, current_node, is_loaded, 0.0, loader_id, len(res.queue))
                self.transition_truck_state(truck_id, "QUEUE_LOADER", self.env.now)
                
                queue_start = self.env.now
                with res.request() as req:
                    yield req
                    
                    queue_wait = self.env.now - queue_start
                    self.loader_stats[loader_id]["queue_times"].append(queue_wait)
                    
                    self.log_event(self.env.now, truck_id, "load_start", current_node, current_node, current_node, is_loaded, 0.0, loader_id, len(res.queue))
                    self.transition_truck_state(truck_id, "LOADING", self.env.now)
                    
                    # Loader becomes busy
                    self.transition_loader_state(loader_id, "BUSY", self.env.now)
                    
                    load_time = sample_truncated_normal(loader_info['mean_load_time_min'], loader_info['sd_load_time_min'], self.rng)
                    yield self.env.timeout(load_time)
                    
                    self.transition_loader_state(loader_id, "IDLE", self.env.now)
                    is_loaded = True
                    self.log_event(self.env.now, truck_id, "load_end", current_node, current_node, current_node, is_loaded, payload, loader_id, len(res.queue))
                    
                # Dispatch loaded truck to crusher
                target_node = "CRUSH"
                self.transition_truck_state(truck_id, "TRAVEL_LOADED", self.env.now)
                
            # 3. Arrived at crusher node
            elif current_node == "CRUSH":
                res = self.crusher_resource
                dump_info = self.dump_points_data["D_CRUSH"]
                
                self.log_event(self.env.now, truck_id, "crusher_queue_start", current_node, current_node, current_node, is_loaded, payload, "D_CRUSH", len(res.queue))
                self.transition_truck_state(truck_id, "QUEUE_CRUSHER", self.env.now)
                
                queue_start = self.env.now
                with res.request() as req:
                    yield req
                    
                    queue_wait = self.env.now - queue_start
                    self.crusher_stats["queue_times"].append(queue_wait)
                    
                    self.log_event(self.env.now, truck_id, "dump_start", current_node, current_node, current_node, is_loaded, payload, "D_CRUSH", len(res.queue))
                    self.transition_truck_state(truck_id, "DUMPING", self.env.now)
                    
                    # Crusher becomes busy
                    self.transition_crusher_state("BUSY", self.env.now)
                    
                    dump_time = sample_truncated_normal(dump_info['mean_dump_time_min'], dump_info['sd_dump_time_min'], self.rng)
                    yield self.env.timeout(dump_time)
                    
                    self.transition_crusher_state("IDLE", self.env.now)
                    self.total_tonnes_delivered += payload
                    is_loaded = False
                    
                    # Complete cycle
                    cycle_time = self.env.now - cycle_start_time
                    self.truck_stats[truck_id]["completed_cycles"].append(cycle_time)
                    
                    self.log_event(self.env.now, truck_id, "dump_end", current_node, current_node, current_node, is_loaded, payload, "D_CRUSH", len(res.queue))
                    
                # After dump completion, dispatch back to a loader empty
                loader_id, loader_node = self.dispatch_truck(truck_id, current_node, self.env.now)
                self.log_event(self.env.now, truck_id, "dispatch", current_node, loader_node, current_node, is_loaded, 0.0, loader_id, 0)
                self.transition_truck_state(truck_id, "TRAVEL_EMPTY", self.env.now)
                
                cycle_start_time = self.env.now
                target_node = loader_node

    def run(self):
        """
        Runs the simulation up to shift_length_min.
        """
        # Periodic recorder for queue lengths
        def queue_recorder():
            while True:
                # Loaders
                for loader_id, res in self.loader_resources.items():
                    self.loader_stats[loader_id]["queue_lens"].append(len(res.queue))
                # Crusher
                self.crusher_stats["queue_lens"].append(len(self.crusher_resource.queue))
                # Constrained edges
                for edge_id, res in self.edge_resources.items():
                    self.edge_stats[edge_id]["queue_lens"].append(len(res.queue))
                yield self.env.timeout(1.0) # record every minute
                
        self.env.process(queue_recorder())
        
        # Start truck processes
        for i in range(self.truck_count):
            truck_id = f"T{i+1:02d}"
            self.env.process(self.truck_process(truck_id))
            
        # Run environment
        self.env.run(until=self.shift_length_min)
        
        # 4. Finalize state timers up to exactly 480.0 minutes
        now = self.shift_length_min
        for truck_id in self.truck_stats:
            old_state, start_time = self.truck_states[truck_id]
            duration = now - start_time
            if duration > 0:
                self.truck_stats[truck_id][old_state] += duration
                
        for loader_id in self.loader_stats:
            self.transition_loader_state(loader_id, "IDLE", now)
            
        self.transition_crusher_state("IDLE", now)
        
        for edge_id in self.edge_stats:
            self.transition_edge_state(edge_id, "IDLE", now)
            
    def get_summary_metrics(self):
        """
        Computes summary statistics for this replication.
        """
        now = self.shift_length_min
        
        # Truck utilization (productive only = TRAVEL_EMPTY + TRAVEL_LOADED + LOADING + DUMPING)
        truck_utilisations = []
        cycle_times = []
        for truck_id, stats in self.truck_stats.items():
            productive_time = stats["TRAVEL_EMPTY"] + stats["TRAVEL_LOADED"] + stats["LOADING"] + stats["DUMPING"]
            truck_utilisations.append(productive_time / now)
            cycle_times.extend(stats["completed_cycles"])
            
        avg_truck_util = np.mean(truck_utilisations) if truck_utilisations else 0.0
        avg_cycle_time = np.mean(cycle_times) if cycle_times else 0.0
        
        # Resource utilisations
        loader_utils = {}
        for loader_id, stats in self.loader_stats.items():
            loader_utils[loader_id] = stats["busy_time"] / now
            
        crusher_util = self.crusher_stats["busy_time"] / now
        
        # Mean queue waits
        avg_loader_q_waits = []
        for loader_id, stats in self.loader_stats.items():
            if stats["queue_times"]:
                avg_loader_q_waits.append(np.mean(stats["queue_times"]))
            else:
                avg_loader_q_waits.append(0.0)
        avg_loader_queue_time = np.mean(avg_loader_q_waits) if avg_loader_q_waits else 0.0
        
        avg_crusher_queue_time = np.mean(self.crusher_stats["queue_times"]) if self.crusher_stats["queue_times"] else 0.0
        
        # Return dict of metrics
        return {
            "replication": self.replication_idx,
            "random_seed": self.seed,
            "total_tonnes_delivered": self.total_tonnes_delivered,
            "tonnes_per_hour": self.total_tonnes_delivered / (self.shift_length_hours),
            "average_truck_cycle_time_min": avg_cycle_time,
            "average_truck_utilisation": avg_truck_util,
            "crusher_utilisation": crusher_util,
            "loader_utilisation": loader_utils,
            "average_loader_queue_time_min": avg_loader_queue_time,
            "average_crusher_queue_time_min": avg_crusher_queue_time,
            "raw_truck_stats": self.truck_stats,
            "raw_loader_stats": self.loader_stats,
            "raw_crusher_stats": self.crusher_stats,
            "raw_edge_stats": self.edge_stats
        }