src/mine_sim/simulation.py

← Back to submission · View raw on GitHub

"""SimPy discrete-event mine throughput simulation.

One replication produces a typed ``ReplicationResult`` with throughput, per-truck
productive / queue time, and per-resource utilisation / queue stats. Event log
entries are emitted as plain dicts for downstream CSV writing.

Design (per interview-decisions memory):

* One ``simpy.Resource`` per directed capacity-1 edge (mirrors the CSV).
* Lognormal travel-time noise with cv=0.10 around free-flow mean.
* Normal-truncated load and dump samples; sample = ``max(0.1, N(mu, sigma))``.
* Dispatch rule: ``min(travel_empty + queue_len * mean_load + own_load)``.
* Simultaneous t=0 dispatch for all trucks.
* Hard cut at ``shift_length_min`` — ``env.run(until=shift_length_min)``;
  throughput counts only ``dump_end`` events with time <= shift_length_min.
* Per-rep RNG seed = ``base_random_seed + rep_idx``.
* Truck utilisation = productive time only (loading + dumping + traveling).
"""

from __future__ import annotations

import math
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional, Tuple

import numpy as np
import simpy

from .data import StaticData, Truck
from .routing import RoutingTables, build_routing, choose_loader
from .scenarios import ScenarioConfig
from .topology import ScenarioTopology, free_flow_minutes


@dataclass
class TruckStats:
    truck_id: str
    productive_min: float = 0.0
    loader_queue_min: float = 0.0
    crusher_queue_min: float = 0.0
    edge_queue_min: float = 0.0
    cycles_completed: int = 0
    tonnes_delivered: float = 0.0
    cycle_times: List[float] = field(default_factory=list)


@dataclass
class ResourceStats:
    """Tracking object for a single SimPy resource (loader / crusher / edge).

    ``busy_intervals`` is a list of ``(start_min, end_min)`` pairs giving the
    time the resource was *in use*. ``queue_waits`` is a list of (wait_minutes)
    values, one per requesting truck.
    """
    resource_id: str
    kind: str   # "loader" | "crusher" | "edge"
    busy_intervals: List[Tuple[float, float]] = field(default_factory=list)
    queue_waits: List[float] = field(default_factory=list)


@dataclass
class ReplicationResult:
    scenario_id: str
    replication: int
    random_seed: int
    shift_length_min: float
    total_tonnes: float
    tonnes_per_hour: float
    truck_stats: Dict[str, TruckStats]
    resource_stats: Dict[str, ResourceStats]
    events: List[Dict[str, Any]]


# ---------------------------------------------------------------------------
# Helpers


def _sample_lognormal(rng: np.random.Generator, mean: float, cv: float) -> float:
    """Lognormal sample with target arithmetic mean ``mean`` and coefficient of
    variation ``cv``. Falls back to ``mean`` for degenerate inputs.
    """
    if mean <= 0:
        return 0.0
    if cv <= 0:
        return mean
    sigma2 = math.log(1.0 + cv * cv)
    sigma = math.sqrt(sigma2)
    mu = math.log(mean) - 0.5 * sigma2
    return float(rng.lognormal(mean=mu, sigma=sigma))


def _sample_truncnorm(rng: np.random.Generator, mean: float, sd: float, floor: float = 0.1) -> float:
    """Sample N(mean, sd) and clamp to ``floor`` from below."""
    if sd <= 0:
        return float(max(floor, mean))
    s = float(rng.normal(loc=mean, scale=sd))
    return float(max(floor, s))


# ---------------------------------------------------------------------------
# Event log helper


class EventLog:
    """Lightweight, append-only event log."""

    def __init__(self) -> None:
        self.events: List[Dict[str, Any]] = []

    def add(
        self,
        env: simpy.Environment,
        *,
        replication: int,
        scenario_id: str,
        truck_id: str,
        event_type: str,
        from_node: str = "",
        to_node: str = "",
        location: str = "",
        loaded: Optional[bool] = None,
        payload_tonnes: float = 0.0,
        resource_id: str = "",
        queue_length: int = 0,
    ) -> None:
        self.events.append({
            "time_min": round(env.now, 4),
            "replication": replication,
            "scenario_id": scenario_id,
            "truck_id": truck_id,
            "event_type": event_type,
            "from_node": from_node,
            "to_node": to_node,
            "location": location,
            "loaded": "" if loaded is None else int(bool(loaded)),
            "payload_tonnes": payload_tonnes,
            "resource_id": resource_id,
            "queue_length": queue_length,
        })


# ---------------------------------------------------------------------------
# Truck process factory


def _truck_process(
    env: simpy.Environment,
    *,
    rng: np.random.Generator,
    truck: Truck,
    scenario: ScenarioConfig,
    topology: ScenarioTopology,
    routing: RoutingTables,
    loader_resources: Dict[str, simpy.Resource],
    crusher_resource: simpy.Resource,
    crusher_id: str,
    crusher_node: str,
    edge_resources: Dict[str, simpy.Resource],
    edge_stats: Dict[str, ResourceStats],
    loader_stats: Dict[str, ResourceStats],
    crusher_stats: ResourceStats,
    truck_stats: TruckStats,
    log: EventLog,
    travel_cv: float,
):
    """Generator: loops dispatch -> load -> haul -> dump -> return until shift cut."""
    shift_min = scenario.shift_length_minutes
    candidate_loader_ids = list(loader_resources.keys())
    crusher_dump_mean = float(topology.dump_points_effective[crusher_id]["mean_dump_time_min"])
    crusher_dump_sd = float(topology.dump_points_effective[crusher_id]["sd_dump_time_min"])

    current_node = truck.start_node

    while env.now < shift_min:
        # --- Dispatch ---------------------------------------------------------
        queue_lengths = {
            lid: len(loader_resources[lid].queue) + (1 if loader_resources[lid].count > 0 else 0)
            for lid in candidate_loader_ids
        }
        loader_id = choose_loader(
            current_node=current_node,
            candidate_loader_ids=candidate_loader_ids,
            loaders_effective=topology.loaders_effective,
            routing=routing,
            queue_lengths=queue_lengths,
        )
        loader_attrs = topology.loaders_effective[loader_id]
        loader_node = loader_attrs["node_id"]

        cycle_start = env.now
        log.add(env, replication=-1, scenario_id=scenario.scenario_id, truck_id=truck.truck_id,
                event_type="dispatched", from_node=current_node, to_node=loader_node,
                location=current_node, loaded=False, resource_id=loader_id)

        # --- Travel empty to loader ------------------------------------------
        yield from _traverse(
            env, truck.truck_id, scenario.scenario_id,
            current_node, loader_node, loaded=False,
            routing=routing, edge_resources=edge_resources, edge_stats=edge_stats,
            truck_stats=truck_stats, log=log, rng=rng, travel_cv=travel_cv,
            payload_tonnes=0.0,
        )
        current_node = loader_node
        log.add(env, replication=-1, scenario_id=scenario.scenario_id, truck_id=truck.truck_id,
                event_type="arrive_loader", location=loader_node, loaded=False, resource_id=loader_id,
                queue_length=len(loader_resources[loader_id].queue))

        # --- Load -------------------------------------------------------------
        wait_start = env.now
        with loader_resources[loader_id].request() as req:
            yield req
            wait_end = env.now
            wait = wait_end - wait_start
            truck_stats.loader_queue_min += wait
            loader_stats[loader_id].queue_waits.append(wait)
            log.add(env, replication=-1, scenario_id=scenario.scenario_id, truck_id=truck.truck_id,
                    event_type="load_start", location=loader_node, loaded=False, resource_id=loader_id)
            mean_load = float(loader_attrs["mean_load_time_min"])
            sd_load = float(loader_attrs["sd_load_time_min"])
            load_t = _sample_truncnorm(rng, mean_load, sd_load)
            busy_start = env.now
            yield env.timeout(load_t)
            busy_end = env.now
            loader_stats[loader_id].busy_intervals.append((busy_start, busy_end))
            truck_stats.productive_min += (busy_end - busy_start)
            payload = truck.payload_tonnes
            log.add(env, replication=-1, scenario_id=scenario.scenario_id, truck_id=truck.truck_id,
                    event_type="load_end", location=loader_node, loaded=True,
                    payload_tonnes=payload, resource_id=loader_id)

        # --- Travel loaded to crusher ----------------------------------------
        yield from _traverse(
            env, truck.truck_id, scenario.scenario_id,
            loader_node, crusher_node, loaded=True,
            routing=routing, edge_resources=edge_resources, edge_stats=edge_stats,
            truck_stats=truck_stats, log=log, rng=rng, travel_cv=travel_cv,
            payload_tonnes=truck.payload_tonnes,
        )
        current_node = crusher_node
        log.add(env, replication=-1, scenario_id=scenario.scenario_id, truck_id=truck.truck_id,
                event_type="arrive_crusher", location=crusher_node, loaded=True,
                payload_tonnes=truck.payload_tonnes, resource_id=crusher_id,
                queue_length=len(crusher_resource.queue))

        # --- Dump -------------------------------------------------------------
        wait_start = env.now
        with crusher_resource.request() as req:
            yield req
            wait_end = env.now
            wait = wait_end - wait_start
            truck_stats.crusher_queue_min += wait
            crusher_stats.queue_waits.append(wait)
            log.add(env, replication=-1, scenario_id=scenario.scenario_id, truck_id=truck.truck_id,
                    event_type="dump_start", location=crusher_node, loaded=True,
                    payload_tonnes=truck.payload_tonnes, resource_id=crusher_id)
            dump_t = _sample_truncnorm(rng, crusher_dump_mean, crusher_dump_sd)
            busy_start = env.now
            yield env.timeout(dump_t)
            busy_end = env.now
            crusher_stats.busy_intervals.append((busy_start, busy_end))
            truck_stats.productive_min += (busy_end - busy_start)
            cycle_end = env.now
            cycle_time = cycle_end - cycle_start
            # Throughput counts only dumps completed within the shift window.
            if busy_end <= shift_min:
                truck_stats.tonnes_delivered += truck.payload_tonnes
                truck_stats.cycles_completed += 1
                truck_stats.cycle_times.append(cycle_time)
            log.add(env, replication=-1, scenario_id=scenario.scenario_id, truck_id=truck.truck_id,
                    event_type="dump_end", location=crusher_node, loaded=False,
                    payload_tonnes=truck.payload_tonnes, resource_id=crusher_id)

    # Truck terminates by simply returning when env.run hits the cut. (We don't
    # need explicit teardown; cancelled yields are dropped automatically.)


def _traverse(
    env: simpy.Environment,
    truck_id: str,
    scenario_id: str,
    src: str,
    dst: str,
    *,
    loaded: bool,
    routing: RoutingTables,
    edge_resources: Dict[str, simpy.Resource],
    edge_stats: Dict[str, ResourceStats],
    truck_stats: TruckStats,
    log: EventLog,
    rng: np.random.Generator,
    travel_cv: float,
    payload_tonnes: float,
):
    """Walk an ordered edge path, requesting capacity-1 edge resources as needed."""
    if src == dst:
        return
    edge_path = routing.edge_ids[(src, dst)]
    node_path = routing.paths[(src, dst)]
    free_flow_map = routing.edge_time_loaded if loaded else routing.edge_time_empty
    for idx, eid in enumerate(edge_path):
        u_node = node_path[idx]
        v_node = node_path[idx + 1]
        nominal = free_flow_map[eid]
        actual = _sample_lognormal(rng, mean=nominal, cv=travel_cv)
        if eid in edge_resources:
            wait_start = env.now
            with edge_resources[eid].request() as ereq:
                yield ereq
                wait_end = env.now
                wait = wait_end - wait_start
                truck_stats.edge_queue_min += wait
                edge_stats[eid].queue_waits.append(wait)
                log.add(env, replication=-1, scenario_id=scenario_id, truck_id=truck_id,
                        event_type="enter_edge", from_node=u_node, to_node=v_node,
                        location=u_node, loaded=loaded, payload_tonnes=payload_tonnes,
                        resource_id=eid, queue_length=len(edge_resources[eid].queue))
                busy_start = env.now
                yield env.timeout(actual)
                busy_end = env.now
                edge_stats[eid].busy_intervals.append((busy_start, busy_end))
                truck_stats.productive_min += (busy_end - busy_start)
                log.add(env, replication=-1, scenario_id=scenario_id, truck_id=truck_id,
                        event_type="leave_edge", from_node=u_node, to_node=v_node,
                        location=v_node, loaded=loaded, payload_tonnes=payload_tonnes,
                        resource_id=eid)
        else:
            yield env.timeout(actual)
            truck_stats.productive_min += actual


# ---------------------------------------------------------------------------
# Replication entry point


def run_replication(
    *,
    static: StaticData,
    scenario: ScenarioConfig,
    topology: ScenarioTopology,
    rep_idx: int,
) -> ReplicationResult:
    """Run a single replication and return the typed result."""
    seed = scenario.base_random_seed + rep_idx
    rng = np.random.default_rng(seed)

    env = simpy.Environment()

    # Identify the crusher dump_id (production target).
    crusher_id: Optional[str] = None
    crusher_node: Optional[str] = None
    target_node = scenario.production.get("dump_destination", "CRUSH")
    for did, attrs in topology.dump_points_effective.items():
        if attrs["node_id"] == target_node:
            crusher_id = did
            crusher_node = attrs["node_id"]
            break
    if crusher_id is None or crusher_node is None:
        raise RuntimeError(f"Could not locate crusher dump for node '{target_node}'")

    # Loader resources.
    loader_resources: Dict[str, simpy.Resource] = {}
    loader_stats: Dict[str, ResourceStats] = {}
    for lid, attrs in topology.loaders_effective.items():
        capacity = max(1, int(attrs["capacity"]))
        loader_resources[lid] = simpy.Resource(env, capacity=capacity)
        loader_stats[lid] = ResourceStats(resource_id=lid, kind="loader")

    # Crusher resource.
    crusher_capacity = max(1, int(topology.dump_points_effective[crusher_id]["capacity"]))
    crusher_resource = simpy.Resource(env, capacity=crusher_capacity)
    crusher_stats = ResourceStats(resource_id=crusher_id, kind="crusher")

    # Capacity-1 edge resources.
    edge_resources: Dict[str, simpy.Resource] = {}
    edge_stats: Dict[str, ResourceStats] = {}
    for eid in topology.capacity_one_edge_ids:
        edge_resources[eid] = simpy.Resource(env, capacity=1)
        edge_stats[eid] = ResourceStats(resource_id=eid, kind="edge")

    # Routing — based on the *first* truck's speed factors. Per memory the routing
    # is static per scenario; all trucks share identical factors in our data.
    sample_truck = static.trucks[0]
    routing = build_routing(
        topology,
        empty_speed_factor=sample_truck.empty_speed_factor,
        loaded_speed_factor=sample_truck.loaded_speed_factor,
    )

    # Build truck stats and processes.
    fleet = static.trucks[: scenario.truck_count]
    truck_stats: Dict[str, TruckStats] = {t.truck_id: TruckStats(truck_id=t.truck_id) for t in fleet}
    log = EventLog()

    travel_cv = float(scenario.stochasticity.get("travel_time_noise_cv", 0.10))

    for t in fleet:
        env.process(_truck_process(
            env,
            rng=rng,
            truck=t,
            scenario=scenario,
            topology=topology,
            routing=routing,
            loader_resources=loader_resources,
            crusher_resource=crusher_resource,
            crusher_id=crusher_id,
            crusher_node=crusher_node,
            edge_resources=edge_resources,
            edge_stats=edge_stats,
            loader_stats=loader_stats,
            crusher_stats=crusher_stats,
            truck_stats=truck_stats[t.truck_id],
            log=log,
            travel_cv=travel_cv,
        ))

    env.run(until=scenario.shift_length_minutes)

    # Stamp the rep idx onto every event (we used -1 as a placeholder).
    for ev in log.events:
        ev["replication"] = rep_idx

    total_tonnes = sum(ts.tonnes_delivered for ts in truck_stats.values())
    tonnes_per_hour = total_tonnes / scenario.shift_length_hours

    resource_stats: Dict[str, ResourceStats] = {}
    for lid, rs in loader_stats.items():
        resource_stats[f"loader::{lid}"] = rs
    resource_stats[f"crusher::{crusher_id}"] = crusher_stats
    for eid, rs in edge_stats.items():
        resource_stats[f"edge::{eid}"] = rs

    return ReplicationResult(
        scenario_id=scenario.scenario_id,
        replication=rep_idx,
        random_seed=seed,
        shift_length_min=scenario.shift_length_minutes,
        total_tonnes=total_tonnes,
        tonnes_per_hour=tonnes_per_hour,
        truck_stats=truck_stats,
        resource_stats=resource_stats,
        events=log.events,
    )