"""Single-replication entry point for the mine throughput simulation.
:func:`run_replication` is the smallest public unit of work. Given a
scenario, the data directory, and a replication index it:
1. Loads the scenario-resolved :class:`~mine_sim.topology.Topology`.
2. Computes the static :class:`~mine_sim.routing.RoutingTable` and runs the
reachability self-check (fails loudly on a missing OD pair).
3. Builds an independent :class:`~mine_sim.rng.ReplicationRNG` seeded at
``base_random_seed + replication_index``.
4. Runs a :class:`~mine_sim.model.MineSimulation` to the shift cut and
finalises metrics.
The function is deterministic for a given ``(scenario, replication_index)``:
identical inputs always produce identical metric and event outputs.
"""
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable
from mine_sim.events import EventRecord
from mine_sim.metrics import MetricsRecorder, ReplicationMetrics
from mine_sim.model import MineSimulation
from mine_sim.rng import make_replication_rng
from mine_sim.routing import RoutingTable, assert_reachable, compute_routes
from mine_sim.scenarios import ScenarioConfig
from mine_sim.topology import Topology, build_topology
#: Fallback payload (tonnes) returned by ``_resolve_payload_tonnes`` only when
#: the topology has no trucks, i.e. trucks.csv is empty (itself a bug). Kept
#: here as a named constant so the value never silently leaks into a real
#: scenario.
_DEFAULT_PAYLOAD_TONNES = 100.0
@dataclass(frozen=True)
class ReplicationResult:
    """Immutable bundle produced by a single simulation replication.

    Holds the finalised metrics and the event log together with the topology
    and routing table the replication ran against.
    """

    # Finalised metrics for the replication.
    metrics: ReplicationMetrics
    # Events recorded during the run, frozen into a tuple.
    events: tuple[EventRecord, ...]
    # Topology the replication was simulated on.
    topology: Topology
    # Routing table used by the simulation.
    routing: RoutingTable
def _resolve_payload_tonnes(topology: Topology) -> float:
    """Return the single payload (tonnes) shared by every truck.

    Falls back to ``_DEFAULT_PAYLOAD_TONNES`` when the topology has no trucks.

    Raises:
        ValueError: If the trucks do not all carry the same payload value.
    """
    if not topology.trucks:
        return _DEFAULT_PAYLOAD_TONNES

    # Collapse the per-truck payloads to their distinct values.
    distinct_payloads = set()
    for truck in topology.trucks:
        distinct_payloads.add(truck.payload_tonnes)

    if len(distinct_payloads) == 1:
        (shared_payload,) = distinct_payloads
        return float(shared_payload)

    raise ValueError(
        "Heterogeneous payloads detected; the model assumes a single "
        f"payload value but got {sorted(distinct_payloads)}"
    )
def _capacity_edge_ids(topology: Topology) -> tuple[str, ...]:
    """Return the topology's capacity-constrained edge ids as a sorted tuple."""
    edge_ids = list(topology.capacity_constrained_edges())
    edge_ids.sort()
    return tuple(edge_ids)
def _crusher_dump_id(topology: Topology) -> str:
    """Return the ``dump_id`` of the first dump point whose type is ``"crusher"``.

    Raises:
        RuntimeError: If no dump point in the topology is a crusher.
    """
    crusher = next(
        (point for point in topology.dump_points.values() if point.type == "crusher"),
        None,
    )
    if crusher is None:
        raise RuntimeError("No crusher dump point found in topology")
    return crusher.dump_id
def _truck_ids(topology: Topology) -> tuple[str, ...]:
    """Return every truck's ``truck_id``, in the order the topology lists them."""
    ids = [truck.truck_id for truck in topology.trucks]
    return tuple(ids)
def _loader_ids(topology: Topology) -> tuple[str, ...]:
    """Return the ids obtained by iterating ``topology.loaders``, sorted."""
    loader_ids = list(topology.loaders)
    loader_ids.sort()
    return tuple(loader_ids)
def run_replication(
    scenario: ScenarioConfig,
    data_dir: str | Path,
    replication_index: int,
    *,
    topology: Topology | None = None,
    routing: RoutingTable | None = None,
) -> ReplicationResult:
    """Run a single replication of ``scenario`` and package its outputs.

    Args:
        scenario: Scenario configuration; supplies the scenario id, the base
            random seed and the shift length.
        data_dir: Directory handed to ``build_topology``; only consulted when
            ``topology`` is not supplied.
        replication_index: Non-negative index of this replication, passed to
            the RNG factory together with the scenario's base seed.
        topology: Optional prebuilt topology, so repeated replications of one
            scenario can share it instead of rebuilding it.
        routing: Optional precomputed routing table. When supplied it is used
            as-is; the reachability check only runs for a table computed here.

    Returns:
        A :class:`ReplicationResult` holding the finalised metrics, the event
        log as a tuple, and the topology and routing table that were used.

    Raises:
        ValueError: If ``replication_index`` is negative.
    """
    if replication_index < 0:
        raise ValueError(f"replication_index must be >= 0, got {replication_index}")

    # Fill in whichever shared inputs the caller did not precompute.
    if topology is None:
        topology = build_topology(data_dir, scenario)
    if routing is None:
        routing = compute_routes(topology)
        assert_reachable(routing, scenario_id=scenario.scenario_id)

    replication_rng = make_replication_rng(
        base_seed=scenario.simulation.base_random_seed,
        replication_index=replication_index,
    )

    # Recorder arguments are gathered in a dict literal so they are still
    # evaluated in the same left-to-right order as a direct keyword call.
    recorder_kwargs = {
        "scenario_id": scenario.scenario_id,
        "replication_index": replication_index,
        "random_seed": replication_rng.seed,
        "shift_length_min": scenario.simulation.shift_length_minutes,
        "payload_tonnes": _resolve_payload_tonnes(topology),
        "truck_ids": _truck_ids(topology),
        "loader_ids": _loader_ids(topology),
        "crusher_id": _crusher_dump_id(topology),
        "capacity_edge_ids": _capacity_edge_ids(topology),
    }
    metrics_recorder = MetricsRecorder(**recorder_kwargs)

    simulation = MineSimulation(
        scenario=scenario,
        topology=topology,
        routes=routing,
        rng=replication_rng,
        recorder=metrics_recorder,
    )
    simulation.run()

    # Finalise metrics first, then snapshot the event log.
    final_metrics = metrics_recorder.finalise()
    event_log = tuple(simulation.events)
    return ReplicationResult(
        metrics=final_metrics,
        events=event_log,
        topology=topology,
        routing=routing,
    )
def run_replications(
    scenario: ScenarioConfig,
    data_dir: str | Path,
    replication_indices: Iterable[int] | None = None,
) -> list[ReplicationResult]:
    """Run several replications of one scenario against shared inputs.

    The topology and routing table are built once up front, checked for
    reachability once, and then handed to every :func:`run_replication` call.

    Args:
        scenario: Scenario configuration to simulate.
        data_dir: Directory passed to ``build_topology`` and forwarded to each
            replication.
        replication_indices: Indices to run, in order. When ``None``, runs
            ``range(scenario.simulation.replications)``.

    Returns:
        One :class:`ReplicationResult` per index, in the order the indices
        were given.
    """
    shared_topology = build_topology(data_dir, scenario)
    shared_routing = compute_routes(shared_topology)
    assert_reachable(shared_routing, scenario_id=scenario.scenario_id)

    # Materialise the indices before any replication starts running.
    if replication_indices is None:
        indices = list(range(scenario.simulation.replications))
    else:
        indices = list(replication_indices)

    results = []
    for index in indices:
        outcome = run_replication(
            scenario=scenario,
            data_dir=data_dir,
            replication_index=index,
            topology=shared_topology,
            routing=shared_routing,
        )
        results.append(outcome)
    return results
# Public API of this module: the result container plus the single- and
# multi-replication entry points.
__all__ = ["ReplicationResult", "run_replication", "run_replications"]
# src/mine_sim/runner.py