babylon.engine.observers

Observer implementations for simulation state monitoring.

This package contains concrete SimulationObserver implementations that monitor specific aspects of the simulation state:

  • EconomyMonitor: Detects sudden drops in imperial_rent_pool (20% or more in one tick)

  • CausalChainObserver: Detects Shock Doctrine pattern (Crash -> Austerity -> Radicalization)

  • EndgameDetector: Detects game ending conditions (Slice 1.6)

  • PersistenceObserver: Persists state via RuntimePersistence protocol (Feature 037)

  • SessionRecorder: Persists tick-by-tick state via RuntimePersistence protocol (Feature 037)

  • TickStateRecorder: Records entity and edge metrics at each tick for analysis

Observers follow the Observer Pattern: they receive state change notifications but cannot modify simulation state. This separation allows AI components to generate narrative from state changes without affecting the deterministic mechanics.
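
As a minimal sketch of that separation, the snippet below defines a read-only observer with the same four members documented for each class on this page (name, on_simulation_start, on_tick, on_simulation_end). FakeWorldState, ObserverLike, PoolLogger and run are hypothetical stand-ins written for this illustration; they are not part of babylon, and the real SimulationObserver protocol may differ in detail.

```python
from dataclasses import dataclass
from typing import List, Protocol


@dataclass(frozen=True)
class FakeWorldState:
    """Hypothetical stand-in for WorldState; frozen, so observers cannot mutate it."""
    tick: int
    imperial_rent_pool: float


class ObserverLike(Protocol):
    """Hypothetical mirror of the hooks documented for the observers on this page."""

    @property
    def name(self) -> str: ...
    def on_simulation_start(self, initial_state: FakeWorldState, config: object) -> None: ...
    def on_tick(self, previous_state: FakeWorldState, new_state: FakeWorldState) -> None: ...
    def on_simulation_end(self, final_state: FakeWorldState) -> None: ...


class PoolLogger:
    """Read-only observer that records the pool value it sees at each tick."""

    def __init__(self) -> None:
        self.seen: List[float] = []

    @property
    def name(self) -> str:
        return "PoolLogger"

    def on_simulation_start(self, initial_state, config) -> None:
        # Reset so the same instance can be reused across runs.
        self.seen = [initial_state.imperial_rent_pool]

    def on_tick(self, previous_state, new_state) -> None:
        self.seen.append(new_state.imperial_rent_pool)

    def on_simulation_end(self, final_state) -> None:
        pass  # nothing to clean up


def run(observers: List[ObserverLike], states: List[FakeWorldState]) -> None:
    """Toy driver: notifies observers of each transition and reads nothing back."""
    for obs in observers:
        obs.on_simulation_start(states[0], None)
    for prev, new in zip(states, states[1:]):
        for obs in observers:
            obs.on_tick(prev, new)
    for obs in observers:
        obs.on_simulation_end(states[-1])


states = [FakeWorldState(0, 100.0), FakeWorldState(1, 90.0), FakeWorldState(2, 60.0)]
pool_logger = PoolLogger()
run([pool_logger], states)
print(pool_logger.seen)
```

Because the toy driver only pushes notifications and never consumes a return value, removing or adding an observer cannot change the sequence of states.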

Schema validation is provided for observer JSON outputs:

  • validate_narrative_frame: Validate NarrativeFrame against JSON schema

  • is_valid_narrative_frame: Boolean check for NarrativeFrame validity

class babylon.engine.observers.CausalChainObserver(logger=None)[source]

Bases: object

Observer detecting the Shock Doctrine causal chain pattern.

Implements SimulationObserver protocol to receive state change notifications and analyze for the Crash -> Austerity -> Radicalization pattern that emerges from economic crises.

The pattern is detected when:

  • Tick N: Pool drops >= 20% (ECONOMIC_SHOCK)

  • Tick N+1 or later: Wage decreases (AUSTERITY_RESPONSE)

  • Tick N+2 or later: P(Revolution) increases (RADICALIZATION)
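
The three conditions above can be sketched as a scan over a bounded history buffer. This is an illustration only, not the observer's actual code: Snapshot and detect_shock_doctrine are hypothetical names, the field names are invented, and the sketch assumes that radicalization must occur strictly after the austerity tick. Only CRASH_THRESHOLD and BUFFER_SIZE take their values from this page.

```python
from collections import deque
from dataclasses import dataclass
from typing import Iterable, Optional, Tuple

CRASH_THRESHOLD = -0.20  # documented: -0.20 = 20% drop
BUFFER_SIZE = 5          # documented: rolling history of 5 ticks


@dataclass(frozen=True)
class Snapshot:
    """Hypothetical per-tick summary; the field names are illustrative only."""
    tick: int
    pool: float
    wage: float
    p_revolution: float


def detect_shock_doctrine(history: Iterable[Snapshot]) -> Optional[Tuple[int, int, int]]:
    """Return the (crash, austerity, radicalization) ticks, or None if no chain exists."""
    snaps = list(history)
    for i in range(1, len(snaps)):
        before, after = snaps[i - 1], snaps[i]
        if before.pool <= 0:
            continue  # relative change is undefined
        if (after.pool - before.pool) / before.pool > CRASH_THRESHOLD:
            continue  # drop smaller than 20%: no ECONOMIC_SHOCK at this tick
        for j in range(i + 1, len(snaps)):
            if snaps[j].wage >= snaps[j - 1].wage:
                continue  # wage did not fall: no AUSTERITY_RESPONSE at this tick
            for k in range(j + 1, len(snaps)):
                if snaps[k].p_revolution > snaps[k - 1].p_revolution:
                    return snaps[i].tick, snaps[j].tick, snaps[k].tick
    return None


# A deque with maxlen silently discards the oldest snapshot once it is full.
buffer = deque(maxlen=BUFFER_SIZE)
for snap in [
    Snapshot(0, 100.0, 1.00, 0.10),
    Snapshot(1, 75.0, 1.00, 0.10),   # pool falls 25%
    Snapshot(2, 75.0, 0.90, 0.10),   # wage falls
    Snapshot(3, 75.0, 0.90, 0.15),   # P(Revolution) rises
]:
    buffer.append(snap)

chain = detect_shock_doctrine(buffer)
print(chain)
```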

When detected, outputs a JSON NarrativeFrame with [NARRATIVE_JSON] prefix at WARNING level for AI narrative generation.
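
A downstream consumer could recover such frames from the log stream by looking for the prefix. The sketch below uses only the standard logging and json modules; emit_frame and extract_frames are hypothetical helpers, and the exact message layout (prefix, one space, JSON payload) is an assumption rather than the observer's documented format.

```python
import io
import json
import logging

PREFIX = "[NARRATIVE_JSON]"  # marker named in the documentation above


def emit_frame(logger: logging.Logger, frame: dict) -> None:
    """Log one frame at WARNING level; the 'prefix + space + JSON' layout is assumed."""
    logger.warning("%s %s", PREFIX, json.dumps(frame, sort_keys=True))


def extract_frames(log_text: str) -> list:
    """Parse every line that carries the prefix back into a dict."""
    frames = []
    for line in log_text.splitlines():
        _, found, payload = line.partition(PREFIX)
        if found:
            frames.append(json.loads(payload))
    return frames


# Route a dedicated demo logger into an in-memory stream.
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter("%(levelname)s %(message)s"))
demo_logger = logging.getLogger("narrative_demo")
demo_logger.addHandler(handler)
demo_logger.setLevel(logging.WARNING)
demo_logger.propagate = False

frame = {
    "pattern": "SHOCK_DOCTRINE",
    "causal_graph": {
        "nodes": [{"id": "n1", "type": "ECONOMIC_SHOCK", "tick": 0}],
        "edges": [],
    },
}
emit_frame(demo_logger, frame)
recovered = extract_frames(stream.getvalue())
```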

Parameters:

logger (logging.Logger | None)

CRASH_THRESHOLD

Class constant defining crash trigger (-0.20 = 20% drop).

BUFFER_SIZE

Size of the rolling history buffer (5 ticks).

name

Observer identifier (“CausalChainObserver”).

Example

>>> from babylon.engine.observers.causal import CausalChainObserver
>>> observer = CausalChainObserver()
>>> observer.name
'CausalChainObserver'

BUFFER_SIZE: int = 5

Size of the rolling history buffer for pattern detection.

CRASH_THRESHOLD: float = -0.2

Percentage drop threshold that triggers economic shock detection (-20%).

__init__(logger=None)[source]

Initialize CausalChainObserver.

Parameters:

logger (Logger | None) – Logger instance for narrative JSON output. Defaults to module-level logger if not provided.

Return type:

None

property name: str

Return observer identifier.

Returns:

String “CausalChainObserver” for logging and debugging.

on_simulation_end(final_state)[source]

Called when simulation ends.

No-op for CausalChainObserver. No cleanup or summary needed.

Parameters:

final_state (WorldState) – Final WorldState when simulation ends (unused).

Return type:

None

on_simulation_start(initial_state, config)[source]

Called when simulation begins.

Clears the history buffer and records the initial state as baseline.

Parameters:
  • initial_state (WorldState)

  • config (SimulationConfig)

Return type:

None

on_tick(previous_state, new_state)[source]

Called after each tick completes with both states for delta analysis.

Records the new state and checks for the Shock Doctrine pattern. If detected, outputs a JSON NarrativeFrame.

Parameters:
  • previous_state (WorldState) – WorldState before the tick (unused, history is internal).

  • new_state (WorldState) – WorldState after the tick.

Return type:

None

class babylon.engine.observers.EconomyMonitor(logger=None)[source]

Bases: object

Observer detecting economic crises via imperial_rent_pool drops.

Implements SimulationObserver protocol to receive state change notifications and analyze economic state for crisis conditions.

A crisis is detected when the imperial_rent_pool drops by 20% or more from the previous tick. The [CRISIS_DETECTED] log marker allows AI narrative systems to respond appropriately.
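
The check described above comes down to a relative change compared against the threshold. The following is a rough sketch, not the monitor's implementation: pool_change and is_crisis are hypothetical names, and treating a non-positive previous pool as "no crisis" is an assumption made here to avoid dividing by zero.

```python
from typing import Optional

CRISIS_THRESHOLD = -0.20  # documented: -0.20 = 20% drop


def pool_change(previous_pool: float, new_pool: float) -> Optional[float]:
    """Relative change between two ticks, or None when it is undefined."""
    if previous_pool <= 0:
        return None
    return (new_pool - previous_pool) / previous_pool


def is_crisis(previous_pool: float, new_pool: float) -> bool:
    """True when the pool fell by 20% or more between consecutive ticks."""
    change = pool_change(previous_pool, new_pool)
    return change is not None and change <= CRISIS_THRESHOLD


print(is_crisis(100.0, 75.0))  # 25% drop
print(is_crisis(100.0, 90.0))  # 10% drop
```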

Parameters:

logger (logging.Logger | None)

CRISIS_THRESHOLD

Class constant defining crisis trigger (-0.20 = 20% drop).

name

Observer identifier (“EconomyMonitor”).

Example

>>> from babylon.engine.observers.economic import EconomyMonitor
>>> monitor = EconomyMonitor()
>>> monitor.name
'EconomyMonitor'

CRISIS_THRESHOLD: float = -0.2

Percentage drop threshold that triggers crisis detection (-20%).

__init__(logger=None)[source]

Initialize EconomyMonitor.

Parameters:

logger (Logger | None) – Logger instance for crisis warnings. Defaults to module-level logger if not provided.

Return type:

None

property name: str

Return observer identifier.

Returns:

String “EconomyMonitor” for logging and debugging.

on_simulation_end(final_state)[source]

Called when simulation ends.

No-op for EconomyMonitor. No cleanup or summary needed.

Parameters:

final_state (WorldState) – Final WorldState when simulation ends (unused).

Return type:

None

on_simulation_start(initial_state, config)[source]

Called when simulation begins.

No-op for EconomyMonitor. Crisis detection only operates on state transitions, not initial state.

Parameters:
  • initial_state (WorldState) – WorldState at tick 0 (unused).

  • config (SimulationConfig) – SimulationConfig for this run (unused).

Return type:

None

on_tick(previous_state, new_state)[source]

Called after each tick completes with both states for delta analysis.

Compares imperial_rent_pool between states and logs a warning if the relative change is at or below CRISIS_THRESHOLD (a drop of 20% or more).

Parameters:
  • previous_state (WorldState) – WorldState before the tick.

  • new_state (WorldState) – WorldState after the tick.

Return type:

None

class babylon.engine.observers.EndgameDetector(logger=None, defines=None)[source]

Bases: object

Observer detecting game ending conditions.

Implements SimulationObserver protocol to receive state change notifications and check for endgame conditions after each tick.

The detector maintains:

  • Current outcome (IN_PROGRESS until game ends)

  • Consecutive overshoot tick counter (for ecological collapse)

  • Pending events list (for ENDGAME_REACHED event emission)

  • Configurable thresholds via GameDefines.endgame

Parameters:
  • logger (logging.Logger | None)

  • defines (GameDefines | None)

name

Observer identifier (“EndgameDetector”).

outcome

Current GameOutcome (starts as IN_PROGRESS).

is_game_over

Boolean indicating if game has ended.

defines

EndgameDefines containing threshold configuration.

Example

>>> from babylon.engine.observers.endgame_detector import EndgameDetector
>>> detector = EndgameDetector()
>>> detector.outcome
<GameOutcome.IN_PROGRESS: 'in_progress'>
>>> detector.is_game_over
False
>>> # Custom thresholds via GameDefines
>>> from babylon.config.defines import GameDefines, EndgameDefines
>>> custom = GameDefines(endgame=EndgameDefines(
...     revolutionary_percolation_threshold=0.5,
...     fascist_majority_threshold=5,
... ))
>>> detector = EndgameDetector(defines=custom)
>>> detector.defines.revolutionary_percolation_threshold
0.5

__init__(logger=None, defines=None)[source]

Initialize EndgameDetector.

Parameters:
  • logger (Logger | None) – Logger instance for endgame notifications. Defaults to module-level logger if not provided.

  • defines (GameDefines | None) – GameDefines containing endgame thresholds. Defaults to GameDefines() with standard thresholds.

Return type:

None

property defines: EndgameDefines

Return the endgame threshold configuration.

Returns:

EndgameDefines containing all threshold values.

get_pending_events()[source]

Return and clear pending events for collection by Simulation facade.

Observer events cannot be emitted directly to WorldState because observers run AFTER WorldState is frozen. Instead, pending events are collected by the Simulation facade and injected into the NEXT tick’s WorldState.
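
The return-and-clear behaviour can be pictured as a small drain buffer. The sketch below is hypothetical throughout: FrozenState, PendingEvents and next_state are invented for this illustration, events are plain strings instead of EndgameEvent objects, and the way the facade injects events into the next tick is an assumption.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass(frozen=True)
class FrozenState:
    """Hypothetical frozen state: events can only be set when it is built."""
    tick: int
    events: Tuple[str, ...] = ()


class PendingEvents:
    """Holds events raised after a state was frozen until someone collects them."""

    def __init__(self) -> None:
        self._pending: List[str] = []

    def queue(self, event: str) -> None:
        self._pending.append(event)

    def get_pending_events(self) -> List[str]:
        # Swap in a fresh list so each event is handed out exactly once.
        events, self._pending = self._pending, []
        return events


def next_state(state: FrozenState, carried_events: List[str]) -> FrozenState:
    """Toy facade step: events observed at tick N appear in the state for tick N+1."""
    return FrozenState(tick=state.tick + 1, events=tuple(carried_events))


observer = PendingEvents()
state = FrozenState(tick=7)
observer.queue("ENDGAME_REACHED")  # raised while observing the frozen tick-7 state
state = next_state(state, observer.get_pending_events())
print(state)
```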

Return type:

list[EndgameEvent]

Returns:

List of pending EndgameEvent objects (the internal list is cleared after return).

property is_game_over: bool

Return True if game has ended.

Returns:

Boolean indicating if outcome is not IN_PROGRESS.

property name: str

Return observer identifier.

Returns:

String “EndgameDetector” for logging and debugging.

on_simulation_end(final_state)[source]

Called when simulation ends.

No-op for EndgameDetector. Endgame state is already determined.

Parameters:

final_state (WorldState) – Final WorldState when simulation ends (unused).

Return type:

None

on_simulation_start(initial_state, config)[source]

Called when simulation begins.

Resets detector to initial state, allowing reuse across multiple simulation runs.

Parameters:
  • initial_state (WorldState)

  • config (SimulationConfig)

Return type:

None

on_tick(previous_state, new_state)[source]

Called after each tick completes with both states for delta analysis.

Checks for endgame conditions in priority order:

  1. Revolutionary victory (highest priority - the people won)

  2. Ecological collapse

  3. Fascist consolidation (lowest priority)

If game has already ended, this is a no-op.
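
One way to picture the priority ordering and the no-op after game end is a first-match scan over an ordered list. Everything in this sketch is hypothetical: Outcome stands in for GameOutcome (only the 'in_progress' value is taken from the example above), and the boolean signal keys replace the real threshold checks, which this page does not spell out.

```python
from enum import Enum
from typing import Dict, List, Tuple


class Outcome(Enum):
    """Stand-in for GameOutcome; values other than 'in_progress' are invented."""
    IN_PROGRESS = "in_progress"
    REVOLUTIONARY_VICTORY = "revolutionary_victory"
    ECOLOGICAL_COLLAPSE = "ecological_collapse"
    FASCIST_CONSOLIDATION = "fascist_consolidation"


# Highest priority first; the signal keys are hypothetical.
PRIORITY: List[Tuple[Outcome, str]] = [
    (Outcome.REVOLUTIONARY_VICTORY, "revolution"),
    (Outcome.ECOLOGICAL_COLLAPSE, "collapse"),
    (Outcome.FASCIST_CONSOLIDATION, "fascism"),
]


class EndgameSketch:
    def __init__(self) -> None:
        self.outcome = Outcome.IN_PROGRESS

    @property
    def is_game_over(self) -> bool:
        return self.outcome is not Outcome.IN_PROGRESS

    def on_tick(self, signals: Dict[str, bool]) -> None:
        if self.is_game_over:
            return  # outcome is final; later ticks are ignored
        for outcome, key in PRIORITY:
            if signals.get(key, False):
                self.outcome = outcome
                return  # first match wins


detector = EndgameSketch()
detector.on_tick({"fascism": True, "revolution": True})  # both fire on the same tick
detector.on_tick({"collapse": True})                     # ignored: game already over
print(detector.outcome)
```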

Parameters:
  • previous_state (WorldState) – WorldState before the tick (unused).

  • new_state (WorldState) – WorldState after the tick.

Return type:

None

property outcome: GameOutcome

Return current game outcome.

Returns:

GameOutcome enum value (IN_PROGRESS, REVOLUTIONARY_VICTORY, ECOLOGICAL_COLLAPSE, or FASCIST_CONSOLIDATION).

class babylon.engine.observers.PersistenceObserver(persistence, session_id, tracer=None)[source]

Bases: object

Observer that persists simulation state after each tick.

Implements SimulationObserver protocol. No changes to Simulation facade or SimulationEngine required.

Parameters:
  • persistence (RuntimePersistence)

  • session_id (UUID)

  • tracer (TraceCollector | None)

_persistence

Backend implementing RuntimePersistence.

_session_id

Session UUID for data scoping.

_tracer

Optional trace collector.

__init__(persistence, session_id, tracer=None)[source]

Initialize the persistence observer.

Parameters:
  • persistence (RuntimePersistence) – Backend implementing RuntimePersistence.

  • session_id (UUID) – Session UUID for data scoping.

  • tracer (TraceCollector | None) – Optional trace collector for execution tracing.

Return type:

None

property name: str

Observer identifier.

on_simulation_end(final_state)[source]

Called when simulation ends. Finalizes metadata.

Parameters:

final_state (WorldState) – The final WorldState.

Return type:

None

on_simulation_start(initial_state, config)[source]

Called when simulation begins. Persists initial state and config.

Parameters:
  • initial_state (WorldState)

  • config (SimulationConfig)

Return type:

None

on_tick(previous_state, new_state)[source]

Called after each tick. Persists new state.

Parameters:
  • previous_state (WorldState) – WorldState before the tick (unused).

  • new_state (WorldState) – WorldState after the tick.

Return type:

None

class babylon.engine.observers.SessionRecorder(persistence, session_id, tracer=None)[source]

Bases: object

Observer that persists simulation state via RuntimePersistence protocol.

Implements SimulationObserver protocol (ADR030/032/033). Records tick-by-tick state snapshots to the configured RuntimePersistence backend, enabling:

  • Replay from any point using stored state

  • Temporal queries (“state at tick 500”, “diffs between ticks”)

  • Debugging via rich mutation logs

  • Post-hoc analysis without re-running simulation

When the backend implements PostgresRuntimeExtensions, extended subsystem state (graph metadata, community state, etc.) is also persisted.

Parameters:
  • persistence (RuntimePersistence)

  • session_id (UUID)

  • tracer (TraceCollector | None)

_persistence

Backend implementing RuntimePersistence.

_session_id

Session UUID for data scoping.

_tracer

Optional trace collector for execution tracing.

_started

Whether simulation has started (for lifecycle validation).

__init__(persistence, session_id, tracer=None)[source]

Initialize the session recorder.

Parameters:
  • persistence (RuntimePersistence) – Backend implementing RuntimePersistence protocol.

  • session_id (UUID) – Session UUID for data scoping.

  • tracer (TraceCollector | None) – Optional trace collector for execution tracing.

Return type:

None

property name: str

Return observer identifier.

on_simulation_end(final_state)[source]

Called when simulation ends. Finalizes recording metadata.

Parameters:

final_state (WorldState) – The final WorldState when simulation ends.

Return type:

None

on_simulation_start(initial_state, config)[source]

Called when simulation begins. Initializes recording metadata.

Parameters:
  • initial_state (WorldState)

  • config (SimulationConfig)

Return type:

None

on_tick(previous_state, new_state)[source]

Called after each tick completes. Records new state to database.

Parameters:
  • previous_state (WorldState) – WorldState before the tick (unused; part of the protocol signature for delta analysis).

  • new_state (WorldState) – WorldState after the tick.

Return type:

None

class babylon.engine.observers.TickStateRecorder(mode='interactive', rolling_window=50)[source]

Bases: object

Observer that records simulation state at each tick for analysis.

Implements SimulationObserver protocol. Extracts entity and edge metrics at each tick, with optional rolling window for memory efficiency in interactive mode.
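
The memory trade-off between the two modes can be illustrated with a bounded deque. This is a sketch under assumptions: make_history is a hypothetical helper, integers stand in for TickMetrics, and the recorder's real storage may not be a deque at all.

```python
from collections import deque
from typing import Deque


def make_history(mode: str = "interactive", rolling_window: int = 50) -> Deque[int]:
    """Bounded buffer in interactive mode, unbounded in batch mode."""
    if mode == "interactive":
        return deque(maxlen=rolling_window)  # oldest entries fall off automatically
    if mode == "batch":
        return deque()  # keeps every tick
    raise ValueError(f"unknown mode: {mode!r}")


interactive = make_history("interactive", rolling_window=3)
batch = make_history("batch")
for tick in range(5):
    interactive.append(tick)
    batch.append(tick)

print(list(interactive))  # only the last three ticks remain
print(list(batch))        # full history
```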

Renamed from MetricsCollector to avoid namespace collision with babylon.metrics.collector.MetricsCollector (RAG telemetry).

Parameters:
  • mode (Literal['interactive', 'batch'])

  • rolling_window (int)

__init__(mode='interactive', rolling_window=50)[source]

Initialize the recorder.

Parameters:
  • mode (Literal['interactive', 'batch']) – “interactive” uses rolling window, “batch” keeps all history

  • rolling_window (int) – Maximum ticks to keep in interactive mode

Return type:

None

export_json(path, defines, config, csv_path=None)[source]

Write JSON metadata to file.

Parameters:
  • path (Path) – Output path for JSON file

  • defines (GameDefines) – GameDefines with fundamental parameters

  • config (SimulationConfig) – SimulationConfig with run settings

  • csv_path (Path | None) – Optional path to associated CSV time-series file

Return type:

None

property history: list[TickMetrics]

Return metrics history as a list.

property latest: TickMetrics | None

Return most recent tick metrics, or None if empty.

property name: str

Return observer identifier.

on_simulation_end(final_state)[source]

Called when simulation ends. No-op for TickStateRecorder.

Return type:

None

Parameters:

final_state (WorldState)

on_simulation_start(initial_state, config)[source]

Called when simulation begins. Clears history and records tick 0.

Return type:

None

Parameters:
  • initial_state (WorldState)

  • config (SimulationConfig)

on_tick(previous_state, new_state)[source]

Called after each tick completes. Records new state.

Return type:

None

Parameters:
  • previous_state (WorldState)

  • new_state (WorldState)

property summary: SweepSummary | None

Return sweep summary, or None if no data collected.

to_csv_rows()[source]

Export metrics history as list of dicts for CSV output.
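
A list of dicts in this shape can be written with the standard csv.DictWriter. The rows and column names below are made up for the illustration, since the real TickMetrics fields are not listed on this page, and rows_to_csv is a hypothetical helper.

```python
import csv
import io
from typing import Any, Dict, List

# Hypothetical rows in the shape a list[dict[str, Any]] export would have.
rows: List[Dict[str, Any]] = [
    {"tick": 0, "imperial_rent_pool": 100.0},
    {"tick": 1, "imperial_rent_pool": 75.0},
]


def rows_to_csv(rows: List[Dict[str, Any]]) -> str:
    """Serialize rows to CSV text, taking the header from the first row's keys."""
    if not rows:
        return ""
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=list(rows[0].keys()))
    writer.writeheader()
    writer.writerows(rows)
    return buffer.getvalue()


csv_text = rows_to_csv(rows)
print(csv_text)
```

When writing to a real file instead of a StringIO, the csv module documentation recommends opening it with newline="" so that line endings are not translated twice.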

Return type:

list[dict[str, Any]]

to_json(defines, config, csv_path=None)[source]

Export run metadata as structured JSON for reproducibility.

Captures the causal DAG hierarchy:

  • Level 1 (Fundamental): GameDefines parameters

  • Level 2 (Config): SimulationConfig settings

  • Level 3 (Emergent): SweepSummary computed from simulation
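
To make the three levels concrete, the sketch below assembles a nested dict and round-trips it through json. The key names ("fundamental", "config", "emergent", "csv_path"), the sample values and build_run_metadata itself are all assumptions for illustration; the actual layout returned by to_json is not shown on this page.

```python
import json
from typing import Any, Dict, Optional


def build_run_metadata(
    defines: Dict[str, Any],
    config: Dict[str, Any],
    summary: Dict[str, Any],
    csv_path: Optional[str] = None,
) -> Dict[str, Any]:
    """Group inputs by causal level; all key names here are hypothetical."""
    return {
        "fundamental": dict(defines),  # Level 1: parameters fixed before the run
        "config": dict(config),        # Level 2: settings for this particular run
        "emergent": dict(summary),     # Level 3: values computed from the run
        "csv_path": csv_path,          # optional pointer to the time-series file
    }


metadata = build_run_metadata(
    defines={"revolutionary_percolation_threshold": 0.5},
    config={"max_ticks": 100},
    summary={"ticks_recorded": 100},
    csv_path="run.csv",
)
encoded = json.dumps(metadata, indent=2, sort_keys=True)
print(encoded)
```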

Parameters:
  • defines (GameDefines) – GameDefines with fundamental parameters

  • config (SimulationConfig) – SimulationConfig with run settings

  • csv_path (Path | None) – Optional path to associated CSV time-series file

Return type:

dict[str, Any]

Returns:

Structured dict ready for JSON serialization

babylon.engine.observers.is_valid_narrative_frame(frame)[source]

Check if a NarrativeFrame is valid against the JSON schema.

Convenience function that returns a boolean instead of a list of errors.
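
The relationship between the boolean check and the error-list validator can be sketched with a hand-written validator. This is not the package's schema: validate_frame_sketch and is_valid_sketch are hypothetical, and the rules cover only what the examples on this page imply (a "pattern" string, a "causal_graph" object, at least one node, an "edges" list).

```python
from typing import Any, Dict, List


def validate_frame_sketch(frame: Dict[str, Any]) -> List[str]:
    """Return human-readable problems; an empty list means the frame passed."""
    errors: List[str] = []
    if not isinstance(frame.get("pattern"), str):
        errors.append("pattern: expected a string")
    graph = frame.get("causal_graph")
    if not isinstance(graph, dict):
        errors.append("causal_graph: expected an object")
        return errors  # nothing further to inspect
    nodes = graph.get("nodes")
    if not isinstance(nodes, list) or len(nodes) < 1:
        errors.append("causal_graph.nodes: expected at least one node")
    if not isinstance(graph.get("edges"), list):
        errors.append("causal_graph.edges: expected a list")
    return errors


def is_valid_sketch(frame: Dict[str, Any]) -> bool:
    """Boolean wrapper: valid exactly when the validator reports no errors."""
    return not validate_frame_sketch(frame)


good = {
    "pattern": "SHOCK_DOCTRINE",
    "causal_graph": {
        "nodes": [{"id": "n1", "type": "ECONOMIC_SHOCK", "tick": 0}],
        "edges": [],
    },
}
bad = {"pattern": "TEST", "causal_graph": {"nodes": [], "edges": []}}

print(is_valid_sketch(good), validate_frame_sketch(bad))
```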

Parameters:

frame (dict[str, Any]) – Dictionary representing the NarrativeFrame to validate.

Return type:

bool

Returns:

True if valid, False otherwise.

Example

>>> frame = {
...     "pattern": "SHOCK_DOCTRINE",
...     "causal_graph": {
...         "nodes": [{"id": "n1", "type": "ECONOMIC_SHOCK", "tick": 0}],
...         "edges": []
...     }
... }
>>> is_valid_narrative_frame(frame)
True

babylon.engine.observers.validate_narrative_frame(frame)[source]

Validate a NarrativeFrame against the JSON schema.

Parameters:

frame (dict[str, Any]) – Dictionary representing the NarrativeFrame to validate.

Return type:

list[str]

Returns:

List of validation error messages. Empty list if valid.

Example

>>> frame = {"pattern": "TEST", "causal_graph": {"nodes": [], "edges": []}}
>>> errors = validate_narrative_frame(frame)
>>> # Returns errors because nodes must have minItems: 1

Modules

causal

CausalChainObserver for detecting the "Shock Doctrine" pattern (Sprint 3.2).

economic

EconomyMonitor observer for economic crisis detection (Sprint 3.1).

endgame_detector

EndgameDetector observer for game ending detection (Slice 1.6).

metrics

TickStateRecorder observer for unified simulation metrics.

persistence_observer

Persistence observer for automatic state persistence after tick (Feature 037).

schema_validator

JSON Schema validation for observer outputs (Sprint 3.2).

session_recorder

SessionRecorder observer for persistent simulation state recording.