babylon.engine.observers
Observer implementations for simulation state monitoring.
This package contains concrete SimulationObserver implementations that monitor specific aspects of the simulation state:
EconomyMonitor: Detects sudden drops in imperial_rent_pool (>20%)
CausalChainObserver: Detects Shock Doctrine pattern (Crash -> Austerity -> Radicalization)
EndgameDetector: Detects game ending conditions (Slice 1.6)
PersistenceObserver: Persists state via RuntimePersistence protocol (Feature 037)
SessionRecorder: Persists tick-by-tick state via RuntimePersistence protocol (Feature 037)
Observers follow the Observer Pattern: they receive state change notifications but cannot modify simulation state. This separation allows AI components to generate narrative from state changes without affecting the deterministic mechanics.
Schema validation is provided for observer JSON outputs:
validate_narrative_frame: Validate NarrativeFrame against JSON schema
is_valid_narrative_frame: Boolean check for NarrativeFrame validity
- class babylon.engine.observers.CausalChainObserver(logger=None)[source]
Bases:
objectObserver detecting the Shock Doctrine causal chain pattern.
Implements SimulationObserver protocol to receive state change notifications and analyze for the Crash -> Austerity -> Radicalization pattern that emerges from economic crises.
The pattern is detected when: - Tick N: Pool drops >= 20% (ECONOMIC_SHOCK) - Tick N+1 or later: Wage decreases (AUSTERITY_RESPONSE) - Tick N+2 or later: P(Revolution) increases (RADICALIZATION)
When detected, outputs a JSON NarrativeFrame with [NARRATIVE_JSON] prefix at WARNING level for AI narrative generation.
- Parameters:
logger (logging.Logger | None)
- CRASH_THRESHOLD
Class constant defining crash trigger (-0.20 = 20% drop).
- BUFFER_SIZE
Size of the rolling history buffer (5 ticks).
- name
Observer identifier (“CausalChainObserver”).
Example
>>> from babylon.engine.observers.causal import CausalChainObserver >>> observer = CausalChainObserver() >>> observer.name 'CausalChainObserver'
-
CRASH_THRESHOLD:
float= -0.2 Percentage drop threshold that triggers economic shock detection (-20%).
- property name: str
Return observer identifier.
- Returns:
String “CausalChainObserver” for logging and debugging.
- on_simulation_end(final_state)[source]
Called when simulation ends.
No-op for CausalChainObserver. No cleanup or summary needed.
- Parameters:
final_state (
WorldState) – Final WorldState when simulation ends (unused).- Return type:
- on_simulation_start(initial_state, config)[source]
Called when simulation begins.
Clears the history buffer and records the initial state as baseline.
- Parameters:
initial_state (
WorldState) – WorldState at tick 0.config (
SimulationConfig) – SimulationConfig for this run (unused).
- Return type:
- on_tick(previous_state, new_state)[source]
Called after each tick completes with both states for delta analysis.
Records the new state and checks for the Shock Doctrine pattern. If detected, outputs a JSON NarrativeFrame.
- Parameters:
previous_state (
WorldState) – WorldState before the tick (unused, history is internal).new_state (
WorldState) – WorldState after the tick.
- Return type:
- class babylon.engine.observers.EconomyMonitor(logger=None)[source]
Bases:
objectObserver detecting economic crises via imperial_rent_pool drops.
Implements SimulationObserver protocol to receive state change notifications and analyze economic state for crisis conditions.
A crisis is detected when the imperial_rent_pool drops by 20% or more from the previous tick. The [CRISIS_DETECTED] log marker allows AI narrative systems to respond appropriately.
- Parameters:
logger (logging.Logger | None)
- CRISIS_THRESHOLD
Class constant defining crisis trigger (-0.20 = 20% drop).
- name
Observer identifier (“EconomyMonitor”).
Example
>>> from babylon.engine.observers.economic import EconomyMonitor >>> monitor = EconomyMonitor() >>> monitor.name 'EconomyMonitor'
- property name: str
Return observer identifier.
- Returns:
String “EconomyMonitor” for logging and debugging.
- on_simulation_end(final_state)[source]
Called when simulation ends.
No-op for EconomyMonitor. No cleanup or summary needed.
- Parameters:
final_state (
WorldState) – Final WorldState when simulation ends (unused).- Return type:
- on_simulation_start(initial_state, config)[source]
Called when simulation begins.
No-op for EconomyMonitor. Crisis detection only operates on state transitions, not initial state.
- Parameters:
initial_state (
WorldState) – WorldState at tick 0 (unused).config (
SimulationConfig) – SimulationConfig for this run (unused).
- Return type:
- on_tick(previous_state, new_state)[source]
Called after each tick completes with both states for delta analysis.
Compares imperial_rent_pool between states and logs a warning if the drop exceeds CRISIS_THRESHOLD (20%).
- Parameters:
previous_state (
WorldState) – WorldState before the tick.new_state (
WorldState) – WorldState after the tick.
- Return type:
- class babylon.engine.observers.EndgameDetector(logger=None, defines=None)[source]
Bases:
objectObserver detecting game ending conditions.
Implements SimulationObserver protocol to receive state change notifications and check for endgame conditions after each tick.
The detector maintains: - Current outcome (IN_PROGRESS until game ends) - Consecutive overshoot tick counter (for ecological collapse) - Pending events list (for ENDGAME_REACHED event emission) - Configurable thresholds via GameDefines.endgame
- Parameters:
logger (logging.Logger | None)
defines (GameDefines | None)
- name
Observer identifier (“EndgameDetector”).
- outcome
Current GameOutcome (starts as IN_PROGRESS).
- is_game_over
Boolean indicating if game has ended.
- defines
EndgameDefines containing threshold configuration.
Example
>>> from babylon.engine.observers.endgame_detector import EndgameDetector >>> detector = EndgameDetector() >>> detector.outcome <GameOutcome.IN_PROGRESS: 'in_progress'> >>> detector.is_game_over False
>>> # Custom thresholds via GameDefines >>> from babylon.config.defines import GameDefines, EndgameDefines >>> custom = GameDefines(endgame=EndgameDefines( ... revolutionary_percolation_threshold=0.5, ... fascist_majority_threshold=5, ... )) >>> detector = EndgameDetector(defines=custom) >>> detector.defines.revolutionary_percolation_threshold 0.5
- __init__(logger=None, defines=None)[source]
Initialize EndgameDetector.
- Parameters:
logger (
Logger|None) – Logger instance for endgame notifications. Defaults to module-level logger if not provided.defines (
GameDefines|None) – GameDefines containing endgame thresholds. Defaults to GameDefines() with standard thresholds.
- Return type:
None
- property defines: EndgameDefines
Return the endgame threshold configuration.
- Returns:
EndgameDefines containing all threshold values.
- get_pending_events()[source]
Return and clear pending events for collection by Simulation facade.
Observer events cannot be emitted directly to WorldState because observers run AFTER WorldState is frozen. Instead, pending events are collected by the Simulation facade and injected into the NEXT tick’s WorldState.
- Return type:
- Returns:
List of pending SimulationEvent objects (cleared after return).
- property is_game_over: bool
Return True if game has ended.
- Returns:
Boolean indicating if outcome is not IN_PROGRESS.
- property name: str
Return observer identifier.
- Returns:
String “EndgameDetector” for logging and debugging.
- on_simulation_end(final_state)[source]
Called when simulation ends.
No-op for EndgameDetector. Endgame state is already determined.
- Parameters:
final_state (
WorldState) – Final WorldState when simulation ends (unused).- Return type:
- on_simulation_start(initial_state, config)[source]
Called when simulation begins.
Resets detector to initial state, allowing reuse across multiple simulation runs.
- Parameters:
initial_state (
WorldState) – WorldState at tick 0.config (
SimulationConfig) – SimulationConfig for this run (unused).
- Return type:
- on_tick(previous_state, new_state)[source]
Called after each tick completes with both states for delta analysis.
Checks for endgame conditions in priority order: 1. Revolutionary victory (highest priority - the people won) 2. Ecological collapse 3. Fascist consolidation (lowest priority)
If game has already ended, this is a no-op.
- Parameters:
previous_state (
WorldState) – WorldState before the tick (unused).new_state (
WorldState) – WorldState after the tick.
- Return type:
- property outcome: GameOutcome
Return current game outcome.
- Returns:
GameOutcome enum value (IN_PROGRESS, REVOLUTIONARY_VICTORY, ECOLOGICAL_COLLAPSE, or FASCIST_CONSOLIDATION).
- class babylon.engine.observers.PersistenceObserver(persistence, session_id, tracer=None)[source]
Bases:
objectObserver that persists simulation state after each tick.
Implements
SimulationObserverprotocol. No changes toSimulationfacade orSimulationEnginerequired.- Parameters:
persistence (RuntimePersistence)
session_id (UUID)
tracer (TraceCollector | None)
- _persistence
Backend implementing RuntimePersistence.
- _session_id
Session UUID for data scoping.
- _tracer
Optional trace collector.
- on_simulation_end(final_state)[source]
Called when simulation ends. Finalizes metadata.
- Parameters:
final_state (
WorldState) – The final WorldState.- Return type:
- on_simulation_start(initial_state, config)[source]
Called when simulation begins. Persists initial state and config.
- Parameters:
initial_state (
WorldState) – The WorldState at tick 0.config (
SimulationConfig) – The SimulationConfig for this run.
- Return type:
- on_tick(previous_state, new_state)[source]
Called after each tick. Persists new state.
- Parameters:
previous_state (
WorldState) – WorldState before the tick (unused).new_state (
WorldState) – WorldState after the tick.
- Return type:
- class babylon.engine.observers.SessionRecorder(persistence, session_id, tracer=None)[source]
Bases:
objectObserver that persists simulation state via RuntimePersistence protocol.
Implements SimulationObserver protocol (ADR030/032/033). Records tick-by-tick state snapshots to the configured
RuntimePersistencebackend, enabling:Replay from any point using stored state
Temporal queries (“state at tick 500”, “diffs between ticks”)
Debugging via rich mutation logs
Post-hoc analysis without re-running simulation
When the backend implements
PostgresRuntimeExtensions, extended subsystem state (graph metadata, community state, etc.) is also persisted.- Parameters:
persistence (RuntimePersistence)
session_id (UUID)
tracer (TraceCollector | None)
- _persistence
Backend implementing RuntimePersistence.
- _session_id
Session UUID for data scoping.
- _tracer
Optional trace collector for execution tracing.
- _started
Whether simulation has started (for lifecycle validation).
- on_simulation_end(final_state)[source]
Called when simulation ends. Finalizes recording metadata.
- Parameters:
final_state (
WorldState) – The final WorldState when simulation ends.- Return type:
- on_simulation_start(initial_state, config)[source]
Called when simulation begins. Initializes recording metadata.
- Parameters:
initial_state (
WorldState) – The WorldState at tick 0.config (
SimulationConfig) – The SimulationConfig for this run.
- Return type:
- on_tick(previous_state, new_state)[source]
Called after each tick completes. Records new state to database.
- Parameters:
previous_state (
WorldState) – WorldState before the tick (unused, for delta analysis).new_state (
WorldState) – WorldState after the tick.
- Return type:
- class babylon.engine.observers.TickStateRecorder(mode='interactive', rolling_window=50)[source]
Bases:
objectObserver that records simulation state at each tick for analysis.
Implements SimulationObserver protocol. Extracts entity and edge metrics at each tick, with optional rolling window for memory efficiency in interactive mode.
Renamed from MetricsCollector to avoid namespace collision with babylon.metrics.collector.MetricsCollector (RAG telemetry).
- Parameters:
mode (Literal['interactive', 'batch'])
rolling_window (int)
- export_json(path, defines, config, csv_path=None)[source]
Write JSON metadata to file.
- Parameters:
path (
Path) – Output path for JSON filedefines (
GameDefines) – GameDefines with fundamental parametersconfig (
SimulationConfig) – SimulationConfig with run settingscsv_path (
Path|None) – Optional path to associated CSV time-series file
- Return type:
- property history: list[TickMetrics]
Return metrics history as a list.
- property latest: TickMetrics | None
Return most recent tick metrics, or None if empty.
- on_simulation_end(final_state)[source]
Called when simulation ends. No-op for MetricsCollector.
- Return type:
- Parameters:
final_state (WorldState)
- on_simulation_start(initial_state, config)[source]
Called when simulation begins. Clears history and records tick 0.
- Return type:
- Parameters:
initial_state (WorldState)
config (SimulationConfig)
- on_tick(previous_state, new_state)[source]
Called after each tick completes. Records new state.
- Return type:
- Parameters:
previous_state (WorldState)
new_state (WorldState)
- property summary: SweepSummary | None
Return sweep summary, or None if no data collected.
- to_json(defines, config, csv_path=None)[source]
Export run metadata as structured JSON for reproducibility.
Captures the causal DAG hierarchy: - Level 1 (Fundamental): GameDefines parameters - Level 2 (Config): SimulationConfig settings - Level 3 (Emergent): SweepSummary computed from simulation
- Parameters:
defines (
GameDefines) – GameDefines with fundamental parametersconfig (
SimulationConfig) – SimulationConfig with run settingscsv_path (
Path|None) – Optional path to associated CSV time-series file
- Return type:
- Returns:
Structured dict ready for JSON serialization
- babylon.engine.observers.is_valid_narrative_frame(frame)[source]
Check if a NarrativeFrame is valid against the JSON schema.
Convenience method that returns a boolean instead of error list.
- Parameters:
frame (
dict[str,Any]) – Dictionary representing the NarrativeFrame to validate.- Return type:
- Returns:
True if valid, False otherwise.
Example
>>> frame = { ... "pattern": "SHOCK_DOCTRINE", ... "causal_graph": { ... "nodes": [{"id": "n1", "type": "ECONOMIC_SHOCK", "tick": 0}], ... "edges": [] ... } ... } >>> is_valid_narrative_frame(frame) True
- babylon.engine.observers.validate_narrative_frame(frame)[source]
Validate a NarrativeFrame against the JSON schema.
- Parameters:
frame (
dict[str,Any]) – Dictionary representing the NarrativeFrame to validate.- Return type:
- Returns:
List of validation error messages. Empty list if valid.
Example
>>> frame = {"pattern": "TEST", "causal_graph": {"nodes": [], "edges": []}} >>> errors = validate_narrative_frame(frame) >>> # Returns errors because nodes must have minItems: 1
Modules
CausalChainObserver for detecting the "Shock Doctrine" pattern (Sprint 3.2). |
|
EconomyMonitor observer for economic crisis detection (Sprint 3.1). |
|
EndgameDetector observer for game ending detection (Slice 1.6). |
|
TickStateRecorder observer for unified simulation metrics. |
|
Persistence observer for automatic state persistence after tick (Feature 037). |
|
JSON Schema validation for observer outputs (Sprint 3.2). |
|
SessionRecorder observer for persistent simulation state recording. |