src/mine_sim/runner.py

← Back to submission · View raw on GitHub

"""Single-replication entry point for the mine throughput simulation.

:func:`run_replication` is the smallest public unit of work. Given a
scenario, the data directory, and a replication index it:

1. Loads the scenario-resolved :class:`~mine_sim.topology.Topology`.
2. Computes the static :class:`~mine_sim.routing.RoutingTable` and runs the
   reachability self-check (fails loudly on a missing origin-destination pair).
3. Builds an independent :class:`~mine_sim.rng.ReplicationRNG` seeded at
   ``base_random_seed + replication_index``.
4. Runs a :class:`~mine_sim.model.MineSimulation` to the shift cut and
   finalises metrics.

The function is deterministic for a given ``(scenario, replication_index)``:
identical inputs always produce identical metric and event outputs.
"""

from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path
from typing import Iterable

from mine_sim.events import EventRecord
from mine_sim.metrics import MetricsRecorder, ReplicationMetrics
from mine_sim.model import MineSimulation
from mine_sim.rng import make_replication_rng
from mine_sim.routing import RoutingTable, assert_reachable, compute_routes
from mine_sim.scenarios import ScenarioConfig
from mine_sim.topology import Topology, build_topology

#: Fallback payload (tonnes) returned by ``_resolve_payload_tonnes`` only when
#: the topology has no trucks, i.e. trucks.csv is empty (itself a bug). Kept
#: private to this module so the value never silently leaks into a real
#: scenario.
_DEFAULT_PAYLOAD_TONNES = 100.0


@dataclass(frozen=True)
class ReplicationResult:
    """Output of one simulation replication: KPIs + event log + shared graph.

    Instances are frozen, so fields cannot be reassigned after construction.
    """

    # Finalised KPIs, as returned by ``MetricsRecorder.finalise()``.
    metrics: ReplicationMetrics
    # Event log of the run, copied from the simulation into a tuple.
    events: tuple[EventRecord, ...]
    # Topology the replication ran on (either passed in or built for the run).
    topology: Topology
    # Routing table the replication ran on (either passed in or computed).
    routing: RoutingTable


def _resolve_payload_tonnes(topology: Topology) -> float:
    """All trucks share one payload by spec; assert that and return it."""
    if not topology.trucks:
        return _DEFAULT_PAYLOAD_TONNES
    payloads = {truck.payload_tonnes for truck in topology.trucks}
    if len(payloads) != 1:
        raise ValueError(
            "Heterogeneous payloads detected; the model assumes a single "
            f"payload value but got {sorted(payloads)}"
        )
    return float(next(iter(payloads)))


def _capacity_edge_ids(topology: Topology) -> tuple[str, ...]:
    return tuple(sorted(topology.capacity_constrained_edges()))


def _crusher_dump_id(topology: Topology) -> str:
    for dump in topology.dump_points.values():
        if dump.type == "crusher":
            return dump.dump_id
    raise RuntimeError("No crusher dump point found in topology")


def _truck_ids(topology: Topology) -> tuple[str, ...]:
    return tuple(truck.truck_id for truck in topology.trucks)


def _loader_ids(topology: Topology) -> tuple[str, ...]:
    return tuple(sorted(topology.loaders))


def run_replication(
    scenario: ScenarioConfig,
    data_dir: str | Path,
    replication_index: int,
    *,
    topology: Topology | None = None,
    routing: RoutingTable | None = None,
) -> ReplicationResult:
    """Run a single replication of ``scenario`` and package its outputs.

    A caller running many replications of one scenario may hand in a prebuilt
    ``topology`` and/or ``routing`` so they are not rebuilt on every call.
    A supplied ``routing`` is used as-is; the reachability self-check only runs
    for a routing table computed inside this function.

    :raises ValueError: if ``replication_index`` is negative.
    """
    if replication_index < 0:
        raise ValueError(f"replication_index must be >= 0, got {replication_index}")

    # Build whichever shared inputs the caller did not provide.
    topology = build_topology(data_dir, scenario) if topology is None else topology
    if routing is None:
        routing = compute_routes(topology)
        assert_reachable(routing, scenario_id=scenario.scenario_id)

    # Independent random stream for this replication.
    rng = make_replication_rng(
        base_seed=scenario.simulation.base_random_seed,
        replication_index=replication_index,
    )

    # Entries are evaluated in the order written, so the topology-derived
    # helpers run (and may raise) in this exact sequence.
    recorder_kwargs = {
        "scenario_id": scenario.scenario_id,
        "replication_index": replication_index,
        "random_seed": rng.seed,
        "shift_length_min": scenario.simulation.shift_length_minutes,
        "payload_tonnes": _resolve_payload_tonnes(topology),
        "truck_ids": _truck_ids(topology),
        "loader_ids": _loader_ids(topology),
        "crusher_id": _crusher_dump_id(topology),
        "capacity_edge_ids": _capacity_edge_ids(topology),
    }
    recorder = MetricsRecorder(**recorder_kwargs)

    simulation = MineSimulation(
        scenario=scenario,
        topology=topology,
        routes=routing,
        rng=rng,
        recorder=recorder,
    )
    simulation.run()

    # Finalise the metrics before snapshotting the event log.
    final_metrics = recorder.finalise()
    event_log = tuple(simulation.events)
    return ReplicationResult(
        metrics=final_metrics,
        events=event_log,
        topology=topology,
        routing=routing,
    )


def run_replications(
    scenario: ScenarioConfig,
    data_dir: str | Path,
    replication_indices: Iterable[int] | None = None,
) -> list[ReplicationResult]:
    """Run a sequence of replications for one scenario.

    Topology and routing are computed once and shared, so the reachability
    self-check runs exactly once per scenario.

    ``replication_indices`` defaults to
    ``range(scenario.simulation.replications)``. Results are returned in the
    same order as the indices.

    :raises ValueError: if any replication index is negative. All indices are
        checked before the topology is loaded or any replication is run, so a
        bad index late in the sequence does not waste the earlier runs.
    """
    indices = (
        list(replication_indices)
        if replication_indices is not None
        else list(range(scenario.simulation.replications))
    )

    # Fail fast with the same message run_replication would raise, instead of
    # discovering a bad index only after earlier replications have completed.
    for idx in indices:
        if idx < 0:
            raise ValueError(f"replication_index must be >= 0, got {idx}")

    topology = build_topology(data_dir, scenario)
    routing = compute_routes(topology)
    assert_reachable(routing, scenario_id=scenario.scenario_id)

    return [
        run_replication(
            scenario=scenario,
            data_dir=data_dir,
            replication_index=idx,
            topology=topology,
            routing=routing,
        )
        for idx in indices
    ]


# Public API of this module; the underscore-prefixed helpers stay private.
__all__ = ["ReplicationResult", "run_replication", "run_replications"]