"""TickStateRecorder observer for unified simulation metrics.
Implements SimulationObserver protocol to record tick-by-tick state
during simulation runs. Supports two modes:
- "interactive": Rolling window of recent ticks (for dashboard)
- "batch": Accumulates all history (for parameter sweeps)
Note: This class was renamed from MetricsCollector to TickStateRecorder
to avoid namespace collision with src/babylon/metrics/collector.py
(RAG telemetry collection).
Sprint 4.1: Phase 4 Dashboard/Sweeper unification.
Sprint 4.1C: Add JSON export for DAG structure preservation.
"""
from __future__ import annotations
import json
from collections import deque
from datetime import UTC, datetime
from pathlib import Path
from typing import TYPE_CHECKING, Any, Final, Literal
from babylon.config.defines import GameDefines
from babylon.models.entity_registry import ENTITY_SLOT_NAMES, METRICS_ENTITY_IDS
from babylon.models.enums import EdgeType
from babylon.models.metrics import EdgeMetrics, EntityMetrics, SweepSummary, TickMetrics
if TYPE_CHECKING:
from babylon.config.defines import GameDefines
from babylon.models.config import SimulationConfig
from babylon.models.entities.social_class import SocialClass
from babylon.models.world_state import WorldState
# Entity ID to slot mapping - derived from entity registry
ENTITY_SLOTS: Final[dict[str, str]] = {
entity_id: ENTITY_SLOT_NAMES[entity_id] for entity_id in METRICS_ENTITY_IDS
}
[docs]
class TickStateRecorder:
"""Observer that records simulation state at each tick for analysis.
Implements SimulationObserver protocol. Extracts entity and edge
metrics at each tick, with optional rolling window for memory
efficiency in interactive mode.
Renamed from MetricsCollector to avoid namespace collision with
babylon.metrics.collector.MetricsCollector (RAG telemetry).
"""
[docs]
def __init__(
self,
mode: Literal["interactive", "batch"] = "interactive",
rolling_window: int = 50,
) -> None:
"""Initialize the collector.
Args:
mode: "interactive" uses rolling window, "batch" keeps all history
rolling_window: Maximum ticks to keep in interactive mode
"""
self._mode: Literal["interactive", "batch"] = mode
self._rolling_window = rolling_window
self._history: deque[TickMetrics] | list[TickMetrics]
if mode == "interactive":
self._history = deque(maxlen=rolling_window)
else:
self._history = []
@property
def name(self) -> str:
"""Return observer identifier."""
return "TickStateRecorder"
@property
def latest(self) -> TickMetrics | None:
"""Return most recent tick metrics, or None if empty."""
if not self._history:
return None
return self._history[-1]
@property
def history(self) -> list[TickMetrics]:
"""Return metrics history as a list."""
return list(self._history)
@property
def summary(self) -> SweepSummary | None:
"""Return sweep summary, or None if no data collected."""
if not self._history:
return None
return self._compute_summary()
[docs]
def on_simulation_start(
self,
initial_state: WorldState,
config: SimulationConfig, # noqa: ARG002 - Required by SimulationObserver protocol
) -> None:
"""Called when simulation begins. Clears history and records tick 0."""
# Clear history
if self._mode == "interactive":
self._history = deque(maxlen=self._rolling_window)
else:
self._history = []
# Record initial state
snapshot = self._record_snapshot(initial_state)
self._history.append(snapshot)
[docs]
def on_tick(
self,
previous_state: WorldState, # noqa: ARG002 - Required by SimulationObserver protocol
new_state: WorldState,
) -> None:
"""Called after each tick completes. Records new state."""
snapshot = self._record_snapshot(new_state)
self._history.append(snapshot)
[docs]
def on_simulation_end(self, final_state: WorldState) -> None:
"""Called when simulation ends. No-op for MetricsCollector."""
pass
[docs]
def to_csv_rows(self) -> list[dict[str, Any]]:
"""Export metrics history as list of dicts for CSV output."""
rows: list[dict[str, Any]] = []
for tick_metrics in self._history:
row: dict[str, Any] = {"tick": tick_metrics.tick}
# Flatten p_w metrics
if tick_metrics.p_w is not None:
row["p_w_wealth"] = tick_metrics.p_w.wealth
row["p_w_consciousness"] = tick_metrics.p_w.consciousness
row["p_w_national_identity"] = tick_metrics.p_w.national_identity
row["p_w_agitation"] = tick_metrics.p_w.agitation
row["p_w_psa"] = tick_metrics.p_w.p_acquiescence
row["p_w_psr"] = tick_metrics.p_w.p_revolution
row["p_w_organization"] = tick_metrics.p_w.organization
# Flatten p_c metrics (wealth only)
if tick_metrics.p_c is not None:
row["p_c_wealth"] = tick_metrics.p_c.wealth
# Flatten c_b metrics (wealth only)
if tick_metrics.c_b is not None:
row["c_b_wealth"] = tick_metrics.c_b.wealth
# Flatten c_w metrics
if tick_metrics.c_w is not None:
row["c_w_wealth"] = tick_metrics.c_w.wealth
row["c_w_consciousness"] = tick_metrics.c_w.consciousness
row["c_w_national_identity"] = tick_metrics.c_w.national_identity
row["c_w_agitation"] = tick_metrics.c_w.agitation
# Edge metrics
row["exploitation_tension"] = tick_metrics.edges.exploitation_tension
row["exploitation_rent"] = tick_metrics.edges.exploitation_rent
row["tribute_flow"] = tick_metrics.edges.tribute_flow
row["wages_paid"] = tick_metrics.edges.wages_paid
row["solidarity_strength"] = tick_metrics.edges.solidarity_strength
# Global metrics
row["imperial_rent_pool"] = tick_metrics.imperial_rent_pool
row["global_tension"] = tick_metrics.global_tension
# Economy drivers (Phase 4.1B)
row["current_super_wage_rate"] = tick_metrics.current_super_wage_rate
row["current_repression_level"] = tick_metrics.current_repression_level
row["pool_ratio"] = tick_metrics.pool_ratio
# Derived differentials (Phase 4.1B)
row["consciousness_gap"] = tick_metrics.consciousness_gap
row["wealth_gap"] = tick_metrics.wealth_gap
# Ecological metrics (Slice 1.4)
row["overshoot_ratio"] = tick_metrics.overshoot_ratio
row["total_biocapacity"] = tick_metrics.total_biocapacity
row["total_consumption"] = tick_metrics.total_consumption
rows.append(row)
return rows
[docs]
def to_json(
self,
defines: GameDefines,
config: SimulationConfig,
csv_path: Path | None = None,
) -> dict[str, Any]:
"""Export run metadata as structured JSON for reproducibility.
Captures the causal DAG hierarchy:
- Level 1 (Fundamental): GameDefines parameters
- Level 2 (Config): SimulationConfig settings
- Level 3 (Emergent): SweepSummary computed from simulation
Args:
defines: GameDefines with fundamental parameters
config: SimulationConfig with run settings
csv_path: Optional path to associated CSV time-series file
Returns:
Structured dict ready for JSON serialization
"""
summary = self.summary
return {
"schema_version": "1.0",
"generated_at": datetime.now(UTC).isoformat(),
"causal_dag_levels": {
"fundamental": "GameDefines - exogenous parameters",
"config": "SimulationConfig - run settings",
"emergent": "SweepSummary - observed outcomes",
},
"fundamentals": defines.model_dump(mode="json"),
"config": config.model_dump(mode="json"),
"summary": summary.model_dump(mode="json") if summary else None,
"ticks_collected": len(self._history),
"time_series_csv": str(csv_path) if csv_path else None,
}
[docs]
def export_json(
self,
path: Path,
defines: GameDefines,
config: SimulationConfig,
csv_path: Path | None = None,
) -> None:
"""Write JSON metadata to file.
Args:
path: Output path for JSON file
defines: GameDefines with fundamental parameters
config: SimulationConfig with run settings
csv_path: Optional path to associated CSV time-series file
"""
data = self.to_json(defines, config, csv_path)
path.write_text(json.dumps(data, indent=2, default=str))
def _record_snapshot(self, state: WorldState) -> TickMetrics:
"""Extract metrics from WorldState and create TickMetrics."""
# Extract entity metrics
entity_slots: dict[str, EntityMetrics | None] = {}
for entity_id, slot_name in ENTITY_SLOTS.items():
entity = state.entities.get(entity_id)
if entity is not None:
entity_slots[slot_name] = self._extract_entity_metrics(entity)
else:
entity_slots[slot_name] = None
# Extract edge metrics
edge_metrics = self._extract_edge_metrics(state)
# Extract global metrics
imperial_rent_pool = 0.0
if state.economy is not None:
imperial_rent_pool = float(state.economy.imperial_rent_pool)
global_tension = self._compute_global_tension(state)
# Extract economy drivers
current_super_wage_rate = 0.20
current_repression_level = 0.5
pool_ratio = 1.0
if state.economy is not None:
current_super_wage_rate = float(state.economy.current_super_wage_rate)
current_repression_level = float(state.economy.current_repression_level)
pool_ratio = min(float(state.economy.imperial_rent_pool) / 100.0, 1.0)
# Calculate differentials
consciousness_gap = 0.0
wealth_gap = 0.0
p_w = entity_slots.get("p_w")
c_w = entity_slots.get("c_w")
c_b = entity_slots.get("c_b")
if p_w is not None and c_w is not None:
consciousness_gap = float(p_w.consciousness) - float(c_w.consciousness)
if c_b is not None and p_w is not None:
wealth_gap = float(c_b.wealth) - float(p_w.wealth)
# Ecological Metrics (Slice 1.4)
overshoot_ratio = state.overshoot_ratio
total_biocapacity = float(state.total_biocapacity)
total_consumption = float(state.total_consumption)
return TickMetrics(
tick=state.tick,
p_w=entity_slots.get("p_w"),
p_c=entity_slots.get("p_c"),
c_b=entity_slots.get("c_b"),
c_w=entity_slots.get("c_w"),
edges=edge_metrics,
imperial_rent_pool=imperial_rent_pool,
global_tension=global_tension,
current_super_wage_rate=current_super_wage_rate,
current_repression_level=current_repression_level,
pool_ratio=pool_ratio,
consciousness_gap=consciousness_gap,
wealth_gap=wealth_gap,
overshoot_ratio=overshoot_ratio,
total_biocapacity=total_biocapacity,
total_consumption=total_consumption,
)
def _extract_entity_metrics(self, entity: SocialClass) -> EntityMetrics:
"""Extract EntityMetrics from a SocialClass entity."""
return EntityMetrics(
wealth=float(entity.wealth),
consciousness=float(entity.ideology.class_consciousness),
national_identity=float(entity.ideology.national_identity),
agitation=float(entity.ideology.agitation),
p_acquiescence=float(entity.p_acquiescence),
p_revolution=float(entity.p_revolution),
organization=float(entity.organization),
population=entity.population,
)
def _extract_edge_metrics(self, state: WorldState) -> EdgeMetrics:
"""Extract EdgeMetrics from WorldState relationships with aggregation."""
exploitation_tensions: list[float] = []
exploitation_rents: list[float] = []
tribute_flows: list[float] = []
wages_paid_list: list[float] = []
solidarity_strengths: list[float] = []
for rel in state.relationships:
if rel.edge_type == EdgeType.EXPLOITATION:
exploitation_tensions.append(float(rel.tension))
exploitation_rents.append(float(rel.value_flow))
elif rel.edge_type == EdgeType.TRIBUTE:
tribute_flows.append(float(rel.value_flow))
elif rel.edge_type == EdgeType.WAGES:
wages_paid_list.append(float(rel.value_flow))
elif rel.edge_type == EdgeType.SOLIDARITY:
solidarity_strengths.append(float(rel.solidarity_strength))
return EdgeMetrics(
exploitation_tension=max(exploitation_tensions, default=0.0),
exploitation_rent=sum(exploitation_rents),
tribute_flow=sum(tribute_flows),
wages_paid=sum(wages_paid_list),
solidarity_strength=max(solidarity_strengths, default=0.0),
)
def _compute_global_tension(self, state: WorldState) -> float:
"""Compute average tension across EXPLOITATION relationships only."""
exploitation_rels = [
rel for rel in state.relationships if rel.edge_type == EdgeType.EXPLOITATION
]
if not exploitation_rels:
return 0.0
total_tension = sum(float(rel.tension) for rel in exploitation_rels)
return total_tension / len(exploitation_rels)
def _compute_summary(self) -> SweepSummary:
"""Compute SweepSummary from history."""
history_list = list(self._history)
if not history_list:
return SweepSummary(
ticks_survived=0,
outcome="ERROR",
final_p_w_wealth=0.0,
final_p_c_wealth=0.0,
final_c_b_wealth=0.0,
final_c_w_wealth=0.0,
max_tension=0.0,
crossover_tick=None,
cumulative_rent=0.0,
peak_p_w_consciousness=0.0,
peak_c_w_consciousness=0.0,
)
last_tick = history_list[-1]
ticks_survived = len(history_list)
# Determine outcome
p_w_wealth = last_tick.p_w.wealth if last_tick.p_w is not None else 0.0
outcome: Literal["SURVIVED", "DIED", "ERROR"] = (
"DIED" if p_w_wealth <= GameDefines().economy.death_threshold else "SURVIVED"
)
# Final wealth values
final_p_w_wealth = last_tick.p_w.wealth if last_tick.p_w is not None else 0.0
final_p_c_wealth = last_tick.p_c.wealth if last_tick.p_c is not None else 0.0
final_c_b_wealth = last_tick.c_b.wealth if last_tick.c_b is not None else 0.0
final_c_w_wealth = last_tick.c_w.wealth if last_tick.c_w is not None else 0.0
# Max tension
max_tension = max(tick.edges.exploitation_tension for tick in history_list)
# Crossover detection (first tick where P(S|R) > P(S|A))
crossover_tick: int | None = None
for tick in history_list:
if tick.p_w is not None and tick.p_w.p_revolution > tick.p_w.p_acquiescence:
crossover_tick = tick.tick
break
# Cumulative rent
cumulative_rent = sum(tick.edges.exploitation_rent for tick in history_list)
# Peak consciousness
peak_p_w_consciousness = max(
(tick.p_w.consciousness for tick in history_list if tick.p_w is not None),
default=0.0,
)
peak_c_w_consciousness = max(
(tick.c_w.consciousness for tick in history_list if tick.c_w is not None),
default=0.0,
)
return SweepSummary(
ticks_survived=ticks_survived,
outcome=outcome,
final_p_w_wealth=final_p_w_wealth,
final_p_c_wealth=final_p_c_wealth,
final_c_b_wealth=final_c_b_wealth,
final_c_w_wealth=final_c_w_wealth,
max_tension=max_tension,
crossover_tick=crossover_tick,
cumulative_rent=cumulative_rent,
peak_p_w_consciousness=peak_p_w_consciousness,
peak_c_w_consciousness=peak_c_w_consciousness,
)