"""MetricsCollector observer for unified simulation metrics.
Implements SimulationObserver protocol to collect comprehensive metrics
during simulation runs. Supports two modes:
- "interactive": Rolling window of recent ticks (for dashboard)
- "batch": Accumulates all history (for parameter sweeps)
Sprint 4.1: Phase 4 Dashboard/Sweeper unification.
Sprint 4.1C: Add JSON export for DAG structure preservation.
"""
from __future__ import annotations
import json
from collections import deque
from datetime import UTC, datetime
from pathlib import Path
from typing import TYPE_CHECKING, Any, Final, Literal
from babylon.models.enums import EdgeType
from babylon.models.metrics import EdgeMetrics, EntityMetrics, SweepSummary, TickMetrics
if TYPE_CHECKING:
from babylon.config.defines import GameDefines
from babylon.models.config import SimulationConfig
from babylon.models.entities.social_class import SocialClass
from babylon.models.world_state import WorldState
# Entity ID to slot mapping
ENTITY_SLOTS: Final[dict[str, str]] = {
"C001": "p_w", # Periphery Worker
"C002": "p_c", # Comprador
"C003": "c_b", # Core Bourgeoisie
"C004": "c_w", # Labor Aristocracy
}
# Death threshold for outcome determination
DEATH_THRESHOLD: Final[float] = 0.001
[docs]
class MetricsCollector:
"""Observer that collects simulation metrics for analysis.
Implements SimulationObserver protocol. Extracts entity and edge
metrics at each tick, with optional rolling window for memory
efficiency in interactive mode.
"""
[docs]
def __init__(
self,
mode: Literal["interactive", "batch"] = "interactive",
rolling_window: int = 50,
) -> None:
"""Initialize the collector.
Args:
mode: "interactive" uses rolling window, "batch" keeps all history
rolling_window: Maximum ticks to keep in interactive mode
"""
self._mode: Literal["interactive", "batch"] = mode
self._rolling_window = rolling_window
self._history: deque[TickMetrics] | list[TickMetrics]
if mode == "interactive":
self._history = deque(maxlen=rolling_window)
else:
self._history = []
@property
def name(self) -> str:
"""Return observer identifier."""
return "MetricsCollector"
@property
def latest(self) -> TickMetrics | None:
"""Return most recent tick metrics, or None if empty."""
if not self._history:
return None
return self._history[-1]
@property
def history(self) -> list[TickMetrics]:
"""Return metrics history as a list."""
return list(self._history)
@property
def summary(self) -> SweepSummary | None:
"""Return sweep summary, or None if no data collected."""
if not self._history:
return None
return self._compute_summary()
[docs]
def on_simulation_start(
self,
initial_state: WorldState,
config: SimulationConfig, # noqa: ARG002 - Required by SimulationObserver protocol
) -> None:
"""Called when simulation begins. Clears history and records tick 0."""
# Clear history
if self._mode == "interactive":
self._history = deque(maxlen=self._rolling_window)
else:
self._history = []
# Record initial state
snapshot = self._record_snapshot(initial_state)
self._history.append(snapshot)
[docs]
def on_tick(
self,
previous_state: WorldState, # noqa: ARG002 - Required by SimulationObserver protocol
new_state: WorldState,
) -> None:
"""Called after each tick completes. Records new state."""
snapshot = self._record_snapshot(new_state)
self._history.append(snapshot)
[docs]
def on_simulation_end(self, final_state: WorldState) -> None:
"""Called when simulation ends. No-op for MetricsCollector."""
pass
[docs]
def to_csv_rows(self) -> list[dict[str, Any]]:
"""Export metrics history as list of dicts for CSV output."""
rows: list[dict[str, Any]] = []
for tick_metrics in self._history:
row: dict[str, Any] = {"tick": tick_metrics.tick}
# Flatten p_w metrics
if tick_metrics.p_w is not None:
row["p_w_wealth"] = tick_metrics.p_w.wealth
row["p_w_consciousness"] = tick_metrics.p_w.consciousness
row["p_w_national_identity"] = tick_metrics.p_w.national_identity
row["p_w_agitation"] = tick_metrics.p_w.agitation
row["p_w_psa"] = tick_metrics.p_w.p_acquiescence
row["p_w_psr"] = tick_metrics.p_w.p_revolution
row["p_w_organization"] = tick_metrics.p_w.organization
# Flatten p_c metrics (wealth only)
if tick_metrics.p_c is not None:
row["p_c_wealth"] = tick_metrics.p_c.wealth
# Flatten c_b metrics (wealth only)
if tick_metrics.c_b is not None:
row["c_b_wealth"] = tick_metrics.c_b.wealth
# Flatten c_w metrics
if tick_metrics.c_w is not None:
row["c_w_wealth"] = tick_metrics.c_w.wealth
row["c_w_consciousness"] = tick_metrics.c_w.consciousness
row["c_w_national_identity"] = tick_metrics.c_w.national_identity
row["c_w_agitation"] = tick_metrics.c_w.agitation
# Edge metrics
row["exploitation_tension"] = tick_metrics.edges.exploitation_tension
row["exploitation_rent"] = tick_metrics.edges.exploitation_rent
row["tribute_flow"] = tick_metrics.edges.tribute_flow
row["wages_paid"] = tick_metrics.edges.wages_paid
row["solidarity_strength"] = tick_metrics.edges.solidarity_strength
# Global metrics
row["imperial_rent_pool"] = tick_metrics.imperial_rent_pool
row["global_tension"] = tick_metrics.global_tension
# Economy drivers (Phase 4.1B)
row["current_super_wage_rate"] = tick_metrics.current_super_wage_rate
row["current_repression_level"] = tick_metrics.current_repression_level
row["pool_ratio"] = tick_metrics.pool_ratio
# Derived differentials (Phase 4.1B)
row["consciousness_gap"] = tick_metrics.consciousness_gap
row["wealth_gap"] = tick_metrics.wealth_gap
# Ecological metrics (Slice 1.4)
row["overshoot_ratio"] = tick_metrics.overshoot_ratio
row["total_biocapacity"] = tick_metrics.total_biocapacity
row["total_consumption"] = tick_metrics.total_consumption
rows.append(row)
return rows
[docs]
def to_json(
self,
defines: GameDefines,
config: SimulationConfig,
csv_path: Path | None = None,
) -> dict[str, Any]:
"""Export run metadata as structured JSON for reproducibility.
Captures the causal DAG hierarchy:
- Level 1 (Fundamental): GameDefines parameters
- Level 2 (Config): SimulationConfig settings
- Level 3 (Emergent): SweepSummary computed from simulation
Args:
defines: GameDefines with fundamental parameters
config: SimulationConfig with run settings
csv_path: Optional path to associated CSV time-series file
Returns:
Structured dict ready for JSON serialization
"""
summary = self.summary
return {
"schema_version": "1.0",
"generated_at": datetime.now(UTC).isoformat(),
"causal_dag_levels": {
"fundamental": "GameDefines - exogenous parameters",
"config": "SimulationConfig - run settings",
"emergent": "SweepSummary - observed outcomes",
},
"fundamentals": defines.model_dump(mode="json"),
"config": config.model_dump(mode="json"),
"summary": summary.model_dump(mode="json") if summary else None,
"ticks_collected": len(self._history),
"time_series_csv": str(csv_path) if csv_path else None,
}
[docs]
def export_json(
self,
path: Path,
defines: GameDefines,
config: SimulationConfig,
csv_path: Path | None = None,
) -> None:
"""Write JSON metadata to file.
Args:
path: Output path for JSON file
defines: GameDefines with fundamental parameters
config: SimulationConfig with run settings
csv_path: Optional path to associated CSV time-series file
"""
data = self.to_json(defines, config, csv_path)
path.write_text(json.dumps(data, indent=2, default=str))
def _record_snapshot(self, state: WorldState) -> TickMetrics:
"""Extract metrics from WorldState and create TickMetrics."""
# Extract entity metrics
entity_slots: dict[str, EntityMetrics | None] = {}
for entity_id, slot_name in ENTITY_SLOTS.items():
entity = state.entities.get(entity_id)
if entity is not None:
entity_slots[slot_name] = self._extract_entity_metrics(entity)
else:
entity_slots[slot_name] = None
# Extract edge metrics
edge_metrics = self._extract_edge_metrics(state)
# Extract global metrics
imperial_rent_pool = 0.0
if state.economy is not None:
imperial_rent_pool = float(state.economy.imperial_rent_pool)
global_tension = self._compute_global_tension(state)
# Extract economy drivers
current_super_wage_rate = 0.20
current_repression_level = 0.5
pool_ratio = 1.0
if state.economy is not None:
current_super_wage_rate = float(state.economy.current_super_wage_rate)
current_repression_level = float(state.economy.current_repression_level)
pool_ratio = min(float(state.economy.imperial_rent_pool) / 100.0, 1.0)
# Calculate differentials
consciousness_gap = 0.0
wealth_gap = 0.0
p_w = entity_slots.get("p_w")
c_w = entity_slots.get("c_w")
c_b = entity_slots.get("c_b")
if p_w is not None and c_w is not None:
consciousness_gap = float(p_w.consciousness) - float(c_w.consciousness)
if c_b is not None and p_w is not None:
wealth_gap = float(c_b.wealth) - float(p_w.wealth)
# Ecological Metrics (Slice 1.4)
overshoot_ratio = state.overshoot_ratio if hasattr(state, "overshoot_ratio") else 0.0
total_biocapacity = (
float(state.total_biocapacity) if hasattr(state, "total_biocapacity") else 0.0
)
total_consumption = (
float(state.total_consumption) if hasattr(state, "total_consumption") else 0.0
)
return TickMetrics(
tick=state.tick,
p_w=entity_slots.get("p_w"),
p_c=entity_slots.get("p_c"),
c_b=entity_slots.get("c_b"),
c_w=entity_slots.get("c_w"),
edges=edge_metrics,
imperial_rent_pool=imperial_rent_pool,
global_tension=global_tension,
current_super_wage_rate=current_super_wage_rate,
current_repression_level=current_repression_level,
pool_ratio=pool_ratio,
consciousness_gap=consciousness_gap,
wealth_gap=wealth_gap,
overshoot_ratio=overshoot_ratio,
total_biocapacity=total_biocapacity,
total_consumption=total_consumption,
)
def _extract_entity_metrics(self, entity: SocialClass) -> EntityMetrics:
"""Extract EntityMetrics from a SocialClass entity."""
return EntityMetrics(
wealth=float(entity.wealth),
consciousness=float(entity.ideology.class_consciousness),
national_identity=float(entity.ideology.national_identity),
agitation=float(entity.ideology.agitation),
p_acquiescence=float(entity.p_acquiescence),
p_revolution=float(entity.p_revolution),
organization=float(entity.organization),
)
def _extract_edge_metrics(self, state: WorldState) -> EdgeMetrics:
"""Extract EdgeMetrics from WorldState relationships with aggregation."""
exploitation_tensions: list[float] = []
exploitation_rents: list[float] = []
tribute_flows: list[float] = []
wages_paid_list: list[float] = []
solidarity_strengths: list[float] = []
for rel in state.relationships:
if rel.edge_type == EdgeType.EXPLOITATION:
exploitation_tensions.append(float(rel.tension))
exploitation_rents.append(float(rel.value_flow))
elif rel.edge_type == EdgeType.TRIBUTE:
tribute_flows.append(float(rel.value_flow))
elif rel.edge_type == EdgeType.WAGES:
wages_paid_list.append(float(rel.value_flow))
elif rel.edge_type == EdgeType.SOLIDARITY:
solidarity_strengths.append(float(rel.solidarity_strength))
return EdgeMetrics(
exploitation_tension=max(exploitation_tensions, default=0.0),
exploitation_rent=sum(exploitation_rents),
tribute_flow=sum(tribute_flows),
wages_paid=sum(wages_paid_list),
solidarity_strength=max(solidarity_strengths, default=0.0),
)
def _compute_global_tension(self, state: WorldState) -> float:
"""Compute average tension across EXPLOITATION relationships only."""
exploitation_rels = [
rel for rel in state.relationships if rel.edge_type == EdgeType.EXPLOITATION
]
if not exploitation_rels:
return 0.0
total_tension = sum(float(rel.tension) for rel in exploitation_rels)
return total_tension / len(exploitation_rels)
def _compute_summary(self) -> SweepSummary:
"""Compute SweepSummary from history."""
history_list = list(self._history)
if not history_list:
return SweepSummary(
ticks_survived=0,
outcome="ERROR",
final_p_w_wealth=0.0,
final_p_c_wealth=0.0,
final_c_b_wealth=0.0,
final_c_w_wealth=0.0,
max_tension=0.0,
crossover_tick=None,
cumulative_rent=0.0,
peak_p_w_consciousness=0.0,
peak_c_w_consciousness=0.0,
)
last_tick = history_list[-1]
ticks_survived = len(history_list)
# Determine outcome
p_w_wealth = last_tick.p_w.wealth if last_tick.p_w is not None else 0.0
outcome: Literal["SURVIVED", "DIED", "ERROR"] = (
"DIED" if p_w_wealth <= DEATH_THRESHOLD else "SURVIVED"
)
# Final wealth values
final_p_w_wealth = last_tick.p_w.wealth if last_tick.p_w is not None else 0.0
final_p_c_wealth = last_tick.p_c.wealth if last_tick.p_c is not None else 0.0
final_c_b_wealth = last_tick.c_b.wealth if last_tick.c_b is not None else 0.0
final_c_w_wealth = last_tick.c_w.wealth if last_tick.c_w is not None else 0.0
# Max tension
max_tension = max(tick.edges.exploitation_tension for tick in history_list)
# Crossover detection (first tick where P(S|R) > P(S|A))
crossover_tick: int | None = None
for tick in history_list:
if tick.p_w is not None and tick.p_w.p_revolution > tick.p_w.p_acquiescence:
crossover_tick = tick.tick
break
# Cumulative rent
cumulative_rent = sum(tick.edges.exploitation_rent for tick in history_list)
# Peak consciousness
peak_p_w_consciousness = max(
(tick.p_w.consciousness for tick in history_list if tick.p_w is not None),
default=0.0,
)
peak_c_w_consciousness = max(
(tick.c_w.consciousness for tick in history_list if tick.c_w is not None),
default=0.0,
)
return SweepSummary(
ticks_survived=ticks_survived,
outcome=outcome,
final_p_w_wealth=final_p_w_wealth,
final_p_c_wealth=final_p_c_wealth,
final_c_b_wealth=final_c_b_wealth,
final_c_w_wealth=final_c_w_wealth,
max_tension=max_tension,
crossover_tick=crossover_tick,
cumulative_rent=cumulative_rent,
peak_p_w_consciousness=peak_p_w_consciousness,
peak_c_w_consciousness=peak_c_w_consciousness,
)