babylon.engine

Simulation engine for the Babylon game loop.

This package contains the core game loop logic: - simulation_engine: The step() function for state transformation - simulation: Simulation facade class for multi-tick runs with history - scenarios: Factory functions for creating initial states - factories: Entity factory functions (create_proletariat, create_bourgeoisie) - history_formatter: Narrative generation from simulation history - Dependency injection: ServiceContainer, EventBus, FormulaRegistry

Phase 2.1: Refactored to modular System architecture. Sprint 3: Central Committee (Dependency Injection) Sprint 9: Integration proof with Simulation facade

babylon.engine.step(state, config, persistent_context=None, defines=None, calculator_overrides=None)[source]

Advance simulation by one tick using the modular engine.

This is the heart of Phase 2. It transforms a WorldState through one tick of simulated time by applying the MLM-TW formulas.

Parameters:
  • state (WorldState) – Current world state (immutable)

  • config (SimulationConfig) – Simulation configuration with formula coefficients

  • persistent_context (dict[str, Any] | None) – Optional context dict that persists across ticks. Used by systems that need to track state between ticks (e.g., ConsciousnessSystem’s previous_wages for bifurcation mechanic).

  • defines (GameDefines | None) – Optional custom GameDefines. If None, loads from default defines.yaml location. Use this for scenario-specific calibration.

  • calculator_overrides (dict[str, Any] | None) – Optional dict of calculator instances to inject into ServiceContainer (e.g., melt_calculator, tensor_registry).

Return type:

WorldState

Returns:

New WorldState at tick + 1

Order encodes historical materialism:
  1. Economic base (value extraction)

  2. Consciousness (responds to material conditions)

  3. Survival calculus (probability updates)

  4. Contradictions (tension from all above)

  5. Event capture (log significant changes)

class babylon.engine.SimulationEngine(systems)[source]

Bases: object

Modular engine that advances the simulation by iterating through Systems.

The engine holds a list of systems and executes them in sequence. Order encodes materialist causality (ADR032): 1. Vitality (death check) 2. Territory (land state) 3. Production (value creation) 4. Solidarity (organization) 5. Imperial Rent (extraction) 6. Decomposition (LA crisis) 7. Control Ratio (terminal decision) 8. Metabolism (ecology) 9. Survival (risk assessment) 10. Struggle (agency) 11. Consciousness (ideology) 12. Contradiction (tension) 13. ContradictionField (field computation) - Feature 002 14. FieldDerivative (derivatives + principal) - Feature 002 15. EdgeTransition (predicates + state machine) - Feature 002

Parameters:

systems (list[System])

__init__(systems)[source]

Initialize the engine with a list of systems.

Parameters:

systems (list[System]) – Ordered list of systems to execute each tick. Order matters! Economic systems must run before ideology.

Return type:

None

run_tick(graph, services, context)[source]

Execute all systems in order for one tick.

All logs emitted during this method are automatically tagged with tick number and a unique correlation_id (UUID) for tracing.

Parameters:
  • graph – NetworkX graph (mutated in place by systems)

  • services – ServiceContainer with config, formulas, event_bus, database, metrics

  • context – TickContext or dict passed to all systems

Return type:

None

Spec 008: Logs within run_tick() include tick and correlation_id.

property systems: list[System]

Read-only access to registered systems.

class babylon.engine.Simulation(initial_state, config, observers=None, defines=None, tensor_registry=None, calculator_overrides=None)[source]

Bases: object

Facade class for running multi-tick simulations with history preservation.

The Simulation class provides a stateful wrapper around the pure step() function, managing: - Current WorldState - History of all previous states - Persistent ServiceContainer for dependency injection - Observer notifications for AI/narrative components (Sprint 3.1)

Example

>>> from babylon.engine.factories import create_proletariat, create_bourgeoisie
>>> from babylon.models import WorldState, SimulationConfig, Relationship, EdgeType
>>>
>>> worker = create_proletariat()
>>> owner = create_bourgeoisie()
>>> exploitation = Relationship(
...     source_id=worker.id, target_id=owner.id,
...     edge_type=EdgeType.EXPLOITATION
... )
>>> state = WorldState(entities={worker.id: worker, owner.id: owner},
...                    relationships=[exploitation])
>>> config = SimulationConfig()
>>>
>>> sim = Simulation(state, config)
>>> sim.run(100)
>>> print(f"Worker wealth after 100 ticks: {sim.current_state.entities[worker.id].wealth}")
With observers:
>>> from babylon.ai import NarrativeDirector
>>> director = NarrativeDirector()
>>> sim = Simulation(state, config, observers=[director])
>>> sim.run(10)
>>> sim.end()  # Triggers on_simulation_end
Parameters:
__init__(initial_state, config, observers=None, defines=None, tensor_registry=None, calculator_overrides=None)[source]

Initialize simulation with initial state and configuration.

Parameters:
  • initial_state (WorldState) – Starting WorldState at tick 0

  • config (SimulationConfig) – Simulation configuration with formula coefficients

  • observers (list[SimulationObserver] | None) – Optional list of SimulationObserver instances to notify

  • defines (GameDefines | None) – Optional custom GameDefines for scenario-specific coefficients. If None, loads from default defines.yaml location.

  • tensor_registry (TensorRegistry | None) – Optional TensorRegistry for cached tensor data access. If None, tensor data is not available. If provided, it should be pre-hydrated with the relevant counties and years.

  • calculator_overrides (dict[str, Any] | None) – Optional dict of calculator instances to inject into ServiceContainer on each tick (Feature 020).

Return type:

None

add_observer(observer)[source]

Register an observer for simulation notifications.

Observers added after simulation has started will not receive on_simulation_start, but will receive on_tick and on_simulation_end notifications.

Parameters:

observer (SimulationObserver) – Observer implementing SimulationObserver protocol.

Return type:

None

property config: SimulationConfig

Return the simulation configuration.

property current_state: WorldState

Return the current WorldState.

property defines: GameDefines

Return the game defines.

end()[source]

Signal simulation end and notify observers.

Calls on_simulation_end on all registered observers with the current (final) state.

No-op if simulation has not started (no step() calls made). Can be called multiple times, but only the first call after step() will notify observers.

Return type:

None

classmethod from_sqlite(fips_codes, year=2022, observers=None, defines=None, years=None)[source]

Create simulation initialized from SQLite reference database.

This is the main entry point for the MVP simulation engine. It hydrates territories from the reference database with profit_rate computed from QCEW/BEA data.

Parameters:
  • fips_codes (list[str]) – List of 5-digit FIPS codes for counties to simulate. Example: [“26163”, “26125”] for Wayne and Oakland counties.

  • year (int) – Data year for QCEW/BEA data (default 2022).

  • observers (list[SimulationObserver] | None) – Optional list of SimulationObserver instances.

  • defines (GameDefines | None) – Optional custom GameDefines for scenario-specific coefficients.

  • years (Sequence[int] | None) – Optional sequence of years for multi-year time series. When provided, tensor data is hydrated for all specified years and the economics calculator factory is wired automatically.

Return type:

Simulation

Returns:

Initialized Simulation with territories hydrated from database.

Raises:

ValueError – If fips_codes is empty, contains duplicates that reduce to fewer unique codes, or any county is not found in database.

Example

>>> sim = Simulation.from_sqlite(
...     fips_codes=["26163", "26125"],  # Detroit metro
...     year=2022
... )
>>> snapshot = sim.get_snapshot()
>>> wayne = snapshot.territories["26163"]
>>> print(f"Wayne County profit rate: {wayne.profit_rate}")

See also

  • plan.md#Hydration Flow

  • quickstart.md

get_current_tick()[source]

Return the current tick number.

Implements SimulationState protocol.

Return type:

int

Returns:

Non-negative integer representing the current simulation tick. Tick 0 is the initial state before any step() calls.

get_hexes_for_territory(territory_id)[source]

Return the H3 indices claimed by a territory.

Implements SimulationState protocol.

Parameters:

territory_id (str) – Unique identifier for the territory.

Return type:

set[str]

Returns:

Set of H3 index strings. Empty set if territory not found.

get_history()[source]

Return all WorldState snapshots from initial to current.

The history includes: - Index 0: Initial state (tick 0) - Index N: State after N steps (tick N)

Return type:

list[WorldState]

Returns:

List of WorldState snapshots in chronological order.

get_node_by_spatial_index(h3_index)[source]

Return the territory that claims a specific H3 hex (T027).

Implements SimulationState protocol.

This method bridges the spatial representation (H3 hexes used by map visualization like pydeck) to the simulation’s territory model.

Parameters:

h3_index (str) – H3 cell index (15-character lowercase hex string).

Return type:

TerritoryState | None

Returns:

TerritoryState if a territory claims this hex, None otherwise.

Raises:

ValueError – If h3_index is not a valid H3 cell index.

get_outcome()[source]

Return current game outcome from EndgameDetector if present.

Searches registered observers for an EndgameDetector and returns its current outcome. If no EndgameDetector is registered, returns IN_PROGRESS.

Return type:

GameOutcome

Returns:

GameOutcome enum value indicating current game state.

Example

>>> from babylon.engine.observers import EndgameDetector
>>> detector = EndgameDetector()
>>> sim = Simulation(state, config, observers=[detector])
>>> sim.get_outcome()
<GameOutcome.IN_PROGRESS: 'in_progress'>
get_snapshot()[source]

Return a complete snapshot of the current simulation state.

Implements SimulationState protocol.

The snapshot is immutable - modifying the returned object does not affect the simulation. The tensor_registry reference allows cached tensor data access without database queries.

Return type:

SimulationSnapshot

Returns:

SimulationSnapshot containing all state at the current tick.

get_territory_state(territory_id)[source]

Return the state of a specific territory.

Implements SimulationState protocol.

Parameters:

territory_id (str) – Unique identifier for the territory (FIPS code for counties).

Return type:

TerritoryState | None

Returns:

TerritoryState if the territory exists, None otherwise.

get_time_series()[source]

Extract time series records from completed simulation.

Reads accumulated tick dynamics snapshots stored in persistent_context by the step() function at each year boundary. Each snapshot contains county-level economic state computed by TickDynamicsSystem.

Returns:

year, fips, class distribution shares (bourgeoisie_share, petit_bourgeoisie_share, la_share, proletariat_share, lumpen_share), profit_rate, phi_hour, throughput_position, data_source, and Vol I/II/III fields: capital_stock, median_wage, employment (Vol I); circuit_money, circuit_productive, circuit_commodity, liquidity_ratio, realization_crisis (Vol II); surplus_total, interest_payments, ground_rent, profit_of_enterprise, financialization_share, overaccumulation, profit_squeeze (Vol III).

Return type:

List of dicts with keys

Example

>>> sim = Simulation.from_sqlite(["26163"], year=2022, years=[2022])
>>> sim.run(52)
>>> ts = sim.get_time_series()
>>> for record in ts:
...     print(f"{record['year']} {record['fips']}: LA={record['la_share']:.2f}")
property observers: list[SimulationObserver]

Return copy of registered observers.

Returns a copy to preserve encapsulation - modifying the returned list does not affect the internal observer list.

Returns:

A copy of the list of registered observers.

register_observer(callback)[source]

Register a GUI callback for tick notifications.

Implements SimulationControl protocol.

Thread Safety:

Callbacks receive a frozen SimulationSnapshot, not a live reference to mutable simulation state. The ProtocolObserverAdapter creates the snapshot BEFORE iterating callbacks, ensuring: - All callbacks see the same consistent state - GUI code cannot race with engine mutations - Callback processing time does not affect snapshot consistency

Callbacks are invoked in registration order. Duplicate registration is idempotent (callback invoked once per tick).

Parameters:

callback (Callable[[int, SimulationSnapshot], None]) – Function to call after each tick. Signature: (tick: int, snapshot: SimulationSnapshot) -> None

Return type:

None

remove_observer(observer)[source]

Remove an observer. No-op if observer not present.

Parameters:

observer (SimulationObserver) – Observer to remove from notifications.

Return type:

None

reset()[source]

Reset simulation to initial state (tick 0).

Implements SimulationControl protocol.

Restores the simulation to its state immediately after initialization: - tick = 0 - All territory states reset to initial values - profit_rate returns to initial computed values - WorldState reset to initial state - History cleared

Implementation note: reset() restores CACHED initial state.

Return type:

None

run(ticks)[source]

Run simulation for N ticks.

Parameters:

ticks (int) – Number of ticks to advance the simulation

Return type:

WorldState

Returns:

The final WorldState after all ticks complete.

Raises:

ValueError – If ticks is negative or zero

run_until_endgame(max_ticks=1000)[source]

Run simulation until an endgame condition is met or max_ticks reached.

This method runs the simulation step by step, checking after each tick whether the EndgameDetector has detected a game ending condition. It terminates early if an endgame is reached.

Parameters:

max_ticks (int) – Maximum number of ticks to run before returning. Defaults to 1000 to prevent infinite loops.

Returns:

  • final_state: The WorldState when simulation stopped

  • outcome: GameOutcome indicating why simulation stopped (may be IN_PROGRESS if max_ticks reached without endgame)

Return type:

Tuple of (final_state, outcome)

Raises:

ValueError – If max_ticks is negative.

Example

>>> from babylon.engine.observers import EndgameDetector
>>> detector = EndgameDetector()
>>> sim = Simulation(state, config, observers=[detector])
>>> final_state, outcome = sim.run_until_endgame(max_ticks=100)
>>> if outcome == GameOutcome.REVOLUTIONARY_VICTORY:
...     print("The workers have won!")
property services: ServiceContainer

Return the persistent ServiceContainer.

step(n=1)[source]

Advance simulation by n ticks.

Implements SimulationControl protocol’s step(n) method.

Applies the step() function to transform the current state, records the new state in history, updates current_state, and notifies registered observers.

On first step, observers receive on_simulation_start before on_tick.

The persistent context is passed to step() to maintain state across ticks (e.g., previous_wages for bifurcation mechanic).

Parameters:

n (int) – Number of ticks to advance. Must be positive. Defaults to 1 for backward compatibility.

Return type:

WorldState

Returns:

The new WorldState after all ticks complete.

Raises:

ValueError – If n <= 0.

property tensor_registry: TensorRegistry | None

Return the TensorRegistry for cached economic data access.

Returns:

TensorRegistry if initialized, None otherwise.

unregister_observer(callback)[source]

Remove a previously registered GUI callback.

Implements SimulationControl protocol.

If the callback was not registered, this is a no-op (no error raised).

Parameters:

callback (Callable[[int, SimulationSnapshot], None]) – The callback function to remove.

Return type:

None

update_state(new_state)[source]

Update the current state mid-simulation.

This allows modifying the simulation state (e.g., changing relationships) while preserving the persistent context across ticks. Useful for testing scenarios like wage cuts where the previous_wages context must be preserved.

Parameters:

new_state (WorldState) – New WorldState to use as current state. The tick should match the expected continuation tick.

Return type:

None

Note

This does NOT add the new state to history - history reflects actual simulation progression, not manual state updates.

class babylon.engine.AsyncSimulationRunner(simulation, tick_interval=1.0)[source]

Bases: object

Async runner that decouples UI from simulation engine.

The runner wraps a Simulation instance and provides:

  1. Non-blocking steps: Uses asyncio.to_thread() to run simulation.step() without blocking the event loop.

  2. State queue: Pushes WorldState snapshots to an async queue for the UI to consume at its own pace.

  3. Continuous play: start()/stop() manage a background loop that steps the simulation at configurable intervals.

  4. Queue overflow handling: Drops oldest states when queue is full (MAX_QUEUE_SIZE=10) to prevent memory issues.

Parameters:
MAX_QUEUE_SIZE

Maximum states to buffer before dropping oldest.

Example

>>> runner = AsyncSimulationRunner(sim, tick_interval=0.5)
>>> state = await runner.step_once()  # Single step
>>> print(f"Now at tick {state.tick}")
Thread Safety:

The runner uses an asyncio.Lock to serialize step_once() calls, ensuring simulation state consistency even with concurrent access.

MAX_QUEUE_SIZE: int = 10
__init__(simulation, tick_interval=1.0)[source]

Initialize the async runner with a simulation.

Parameters:
  • simulation (Simulation) – The Simulation facade to run.

  • tick_interval (float) – Seconds between steps in continuous mode. Must be positive.

Raises:

ValueError – If tick_interval is not positive.

Return type:

None

async drain_queue()[source]

Remove and return all states from the queue.

Return type:

list[WorldState]

Returns:

List of all WorldState objects that were in the queue, in order from oldest to newest. Empty list if queue was empty.

async get_state()[source]

Get a state from the queue without blocking.

Return type:

WorldState | None

Returns:

The next WorldState if available, None otherwise.

async get_state_blocking(timeout=None)[source]

Get a state from the queue, waiting if necessary.

Parameters:

timeout (float | None) – Maximum seconds to wait. None means wait forever.

Return type:

WorldState

Returns:

The next WorldState from the queue.

Raises:

asyncio.TimeoutError – If timeout expires before a state is available.

property is_running: bool

Return True if continuous mode is active.

property queue: Queue[WorldState]

Return the state queue for direct access.

Returns:

The asyncio.Queue containing WorldState snapshots.

async reset(new_simulation)[source]

Reset the runner with a new simulation.

Stops the runner if running, drains the queue, and assigns the new simulation for future steps.

Parameters:

new_simulation (Simulation) – The new Simulation instance to use.

Return type:

None

property simulation: Simulation

Return the wrapped Simulation instance.

async start()[source]

Start continuous simulation mode.

Creates a background task that steps the simulation at tick_interval intervals. Idempotent - calling start() when already running is a no-op.

The task reference is stored to prevent garbage collection.

Return type:

None

async step_once()[source]

Execute a single simulation step without blocking.

Uses asyncio.to_thread() to run simulation.step() in a thread pool, keeping the event loop responsive.

The new state is pushed to the queue. If the queue is full, the oldest state is dropped to make room.

Return type:

WorldState

Returns:

The new WorldState after the step.

Note

Uses an asyncio.Lock to serialize concurrent calls, ensuring simulation state consistency.

async stop()[source]

Stop continuous simulation mode.

Cancels the background task and waits for it to finish. Idempotent - calling stop() when not running is a no-op.

Return type:

None

property tick_interval: float

Return the interval between steps in continuous mode.

babylon.engine.create_proletariat(id=PERIPHERY_WORKER_ID, name='Proletariat', wealth=0.5, ideology=-0.3, organization=0.1, repression_faced=0.5, subsistence_threshold=0.3, p_acquiescence=0.0, p_revolution=0.0, description='Exploited working class', effective_wealth=0.0, unearned_increment=0.0, ppp_multiplier=1.0)[source]

Create a proletariat (exploited class) social class.

The proletariat is defined by: - PERIPHERY_PROLETARIAT role (exploited in the world system) - Low default wealth (0.5) - Slightly revolutionary ideology (-0.3) - Low organization (0.1 = 10%) - Moderate repression faced (0.5)

Parameters:
  • id (str) – Unique identifier matching ^C[0-9]{3}$ pattern (default: “C001”)

  • name (str) – Human-readable name (default: “Proletariat”)

  • wealth (float) – Economic resources (default: 0.5)

  • ideology (float | IdeologicalProfile) – Ideological position, -1=revolutionary to +1=reactionary (default: -0.3)

  • organization (float) – Collective cohesion (default: 0.1)

  • repression_faced (float) – State violence level (default: 0.5)

  • subsistence_threshold (float) – Minimum wealth for survival (default: 0.3)

  • p_acquiescence (float) – P(S|A) - survival through acquiescence (default: 0.0, calculated by engine)

  • p_revolution (float) – P(S|R) - survival through revolution (default: 0.0, calculated by engine)

  • description (str) – Optional description (default: “Exploited working class”)

  • effective_wealth (float) – PPP-adjusted wealth (default: 0.0, calculated by engine)

  • unearned_increment (float) – PPP bonus (default: 0.0, calculated by engine)

  • ppp_multiplier (float) – PPP multiplier applied to wages (default: 1.0)

Return type:

SocialClass

Returns:

SocialClass configured as proletariat

Example

>>> worker = create_proletariat()
>>> worker.role
<SocialRole.PERIPHERY_PROLETARIAT: 'periphery_proletariat'>
>>> worker.wealth
0.5
babylon.engine.create_bourgeoisie(id=COMPRADOR_ID, name='Bourgeoisie', wealth=10.0, ideology=0.8, organization=0.7, repression_faced=0.1, subsistence_threshold=0.1, p_acquiescence=0.0, p_revolution=0.0, description='Capital-owning exploiter class', effective_wealth=0.0, unearned_increment=0.0, ppp_multiplier=1.0)[source]

Create a bourgeoisie (exploiter class) social class.

The bourgeoisie is defined by: - CORE_BOURGEOISIE role (exploiter in the world system) - High default wealth (10.0) - Reactionary ideology (0.8) - High organization (0.7 = 70%) - Low repression faced (0.1 - protected by state)

Parameters:
  • id (str) – Unique identifier matching ^C[0-9]{3}$ pattern (default: “C002”)

  • name (str) – Human-readable name (default: “Bourgeoisie”)

  • wealth (float) – Economic resources (default: 10.0)

  • ideology (float | IdeologicalProfile) – Ideological position, -1=revolutionary to +1=reactionary (default: 0.8)

  • organization (float) – Collective cohesion (default: 0.7)

  • repression_faced (float) – State violence level (default: 0.1)

  • subsistence_threshold (float) – Minimum wealth for survival (default: 0.1)

  • p_acquiescence (float) – P(S|A) - survival through acquiescence (default: 0.0, calculated by engine)

  • p_revolution (float) – P(S|R) - survival through revolution (default: 0.0, calculated by engine)

  • description (str) – Optional description (default: “Capital-owning exploiter class”)

  • effective_wealth (float) – PPP-adjusted wealth (default: 0.0, calculated by engine)

  • unearned_increment (float) – PPP bonus (default: 0.0, calculated by engine)

  • ppp_multiplier (float) – PPP multiplier applied to wages (default: 1.0)

Return type:

SocialClass

Returns:

SocialClass configured as bourgeoisie

Example

>>> owner = create_bourgeoisie()
>>> owner.role
<SocialRole.CORE_BOURGEOISIE: 'core_bourgeoisie'>
>>> owner.wealth
10.0
babylon.engine.format_class_struggle_history(simulation)[source]

Format simulation history as a narrative of class struggle.

Generates a human-readable summary of the simulation history, highlighting wealth transfers, ideological changes, and tension accumulation.

Parameters:

simulation (Simulation) – A Simulation instance with history data

Return type:

str

Returns:

A formatted string narrative describing the class struggle dynamics.

Example

>>> sim = Simulation(initial_state, config)
>>> sim.run(100)
>>> print(format_class_struggle_history(sim))
# History of Class Struggle
...
babylon.engine.create_two_node_scenario(worker_wealth=0.5, owner_wealth=0.5, extraction_efficiency=0.8, repression_level=0.5, worker_organization=0.1, worker_ideology=0.0)[source]

Create the minimal viable dialectic: one worker, one owner, one exploitation edge.

This is the two-node scenario from the Phase 1 blueprint, now ready for Phase 2 simulation. It models the fundamental class relationship: - Worker produces value (source of exploitation edge) - Owner extracts imperial rent (target of exploitation edge) - Tension accumulates on the edge

Parameters:
  • worker_wealth (float) – Initial wealth for periphery worker (default 0.5)

  • owner_wealth (float) – Initial wealth for core owner (default 0.5)

  • extraction_efficiency (float) – Alpha in imperial rent formula (default 0.8)

  • repression_level (float) – State violence capacity (default 0.5)

  • worker_organization (float) – Worker class cohesion (default 0.1)

  • worker_ideology (float) – Worker ideology, -1=revolutionary to +1=reactionary (default 0.0)

Return type:

tuple[WorldState, SimulationConfig, GameDefines]

Returns:

Tuple of (WorldState, SimulationConfig, GameDefines) ready for step() function.

Example

>>> state, config, defines = create_two_node_scenario()
>>> for _ in range(100):
...     state = step(state, config)
>>> print(f"Worker wealth after 100 ticks: {state.entities['C001'].wealth}")
babylon.engine.create_imperial_circuit_scenario(periphery_wealth=0.6, core_wealth=0.9, comprador_cut=0.90, imperial_rent_pool=100.0, extraction_efficiency=0.8, repression_level=0.5, solidarity_strength=0.0)[source]

Create the 4-node Imperial Circuit scenario.

This scenario fixes the “Robin Hood” bug in create_two_node_scenario() where super-wages incorrectly flow to periphery workers. In MLM-TW theory, super-wages should only go to the Labor Aristocracy (core workers), NOT periphery workers.

Topology:

        graph LR
    Pw["P_w (Periphery Workers)"] -->|EXPLOITATION| Pc["P_c (Comprador)"]
    Pc -->|TRIBUTE| Cb["C_b (Core Bourgeoisie)"]
    Cb -->|WAGES| Cw["C_w (Labor Aristocracy)"]
    Cb -->|CLIENT_STATE| Pc
    Pw -.->|"SOLIDARITY (0.0)"| Cw
    

Value Flow:

  1. EXPLOITATION: P_w -> P_c (imperial rent extraction from workers)

  2. TRIBUTE: P_c -> C_b (comprador sends tribute, keeps comprador_cut)

  3. WAGES: C_b -> C_w (super-wages to labor aristocracy, NOT periphery!)

  4. CLIENT_STATE: C_b -> P_c (subsidy to stabilize client state)

  5. SOLIDARITY: P_w -> C_w (potential internationalism, starts at 0)

Parameters:
  • periphery_wealth (float) – Initial wealth for periphery worker P001 (default 0.1)

  • core_wealth (float) – Initial wealth for core bourgeoisie C001 (default 0.9)

  • comprador_cut (float) – Fraction comprador keeps from extracted value (default 0.90)

  • imperial_rent_pool (float) – Initial imperial rent pool (default 100.0)

  • extraction_efficiency (float) – Alpha in imperial rent formula (default 0.8)

  • repression_level (float) – Base repression level (default 0.5)

  • solidarity_strength (float) – Initial solidarity between P_w and C_w (default 0.0). When > 0, wage crisis routes to class consciousness (revolutionary). When = 0, wage crisis routes to national identity (fascist).

Return type:

tuple[WorldState, SimulationConfig, GameDefines]

Returns:

Tuple of (WorldState, SimulationConfig, GameDefines) ready for step() function.

Example

>>> state, config, defines = create_imperial_circuit_scenario()
>>> # Verify wages go to labor aristocracy, not periphery
>>> wages_edges = [r for r in state.relationships if r.edge_type == EdgeType.WAGES]
>>> assert state.entities[wages_edges[0].target_id].role == SocialRole.LABOR_ARISTOCRACY
babylon.engine.create_high_tension_scenario()[source]

Create a scenario with high initial tension.

Worker is poor, owner is rich, tension is already elevated. Useful for testing phase transitions and rupture conditions.

Return type:

tuple[WorldState, SimulationConfig, GameDefines]

Returns:

Tuple of (WorldState, SimulationConfig, GameDefines) near rupture point.

babylon.engine.create_labor_aristocracy_scenario()[source]

Create a scenario with a labor aristocracy (Wc > Vc).

Worker receives more than they produce, enabled by imperial rent from elsewhere. Tests consciousness decay mechanics.

Return type:

tuple[WorldState, SimulationConfig, GameDefines]

Returns:

Tuple of (WorldState, SimulationConfig, GameDefines) with labor aristocracy.

class babylon.engine.Event(type, tick, payload, timestamp=<factory>)[source]

Bases: object

Immutable event representing a simulation occurrence.

Events are frozen dataclasses to ensure they cannot be modified after creation, maintaining integrity of the event history.

Parameters:
type

Event type identifier (e.g., “tick”, “rupture”, “synthesis”)

tick

Simulation tick when the event occurred

payload

Event-specific data dictionary

timestamp

Wall-clock time when event was created

__init__(type, tick, payload, timestamp=<factory>)
Parameters:
Return type:

None

type: str
tick: int
payload: dict[str, Any]
timestamp: datetime
class babylon.engine.EventBus[source]

Bases: object

Publish/subscribe event bus for simulation components.

The EventBus enables decoupled communication between systems. Components can subscribe to specific event types and will be notified when events of that type are published.

All published events are stored in history for replay/debugging.

Epoch 1→2 Bridge: Supports optional interceptor chain for adversarial mechanics. If no interceptors are registered, events flow through with zero overhead (backwards compatible).

The interceptor chain processes events before emission: - Interceptors are sorted by priority (higher runs first) - Each interceptor can ALLOW, BLOCK, or MODIFY the event - If blocked, the event is logged and not emitted - If modified, the modified event continues through the chain

Example

>>> bus = EventBus()
>>> def on_tick(event: Event) -> None:
...     print(f"Tick {event.tick}: {event.payload}")
>>> bus.subscribe("tick", on_tick)
>>> bus.publish(Event(type="tick", tick=1, payload={"value": 42}))
Tick 1: {'value': 42}
__init__()[source]

Initialize an empty event bus.

Return type:

None

clear_blocked_events()[source]

Remove all blocked event records.

Return type:

None

clear_history()[source]

Remove all events from history.

Return type:

None

get_blocked_events()[source]

Get a copy of all blocked events.

The blocked events audit channel records every event that was stopped by an interceptor, including the blocking reason.

Return type:

list[BlockedEvent]

Returns:

List of BlockedEvent records in chronological order.

get_history()[source]

Get a copy of all published events.

Return type:

list[Event]

Returns:

List of events in chronological order (oldest first).

property interceptor_count: int

Number of registered interceptors.

publish(event, context=None)[source]

Publish an event to all subscribed handlers.

If interceptors are registered, the event passes through the interceptor chain first. If any interceptor blocks the event, it is logged to the blocked events audit channel and not emitted.

The event is stored in history only if it passes all interceptors.

Parameters:
  • event (Event) – The event to publish.

  • context (WorldContext | None) – Optional world context for interceptors. Required for Epoch 2 adversarial mechanics.

Return type:

None

register_interceptor(interceptor)[source]

Register an interceptor to process events before emission.

Interceptors are sorted by priority (higher first) each time an event is published. Multiple interceptors with the same priority execute in registration order.

Parameters:

interceptor (EventInterceptor) – The interceptor to register.

Return type:

None

Example

>>> from babylon.engine.interceptor import EventInterceptor
>>> bus = EventBus()
>>> bus.register_interceptor(my_security_interceptor)
subscribe(event_type, handler)[source]

Subscribe a handler to receive events of a specific type.

Parameters:
  • event_type (str) – The type of events to subscribe to

  • handler (Callable[[Event], None]) – Callable that receives Event objects

Return type:

None

unregister_interceptor(interceptor)[source]

Remove an interceptor from the chain.

Parameters:

interceptor (EventInterceptor) – The interceptor to remove.

Raises:

ValueError – If the interceptor is not registered.

Return type:

None

class babylon.engine.DatabaseConnection(url='sqlite:///babylon.sqlite')[source]

Bases: object

Injectable database connection wrapper.

Wraps SQLAlchemy engine and sessionmaker to provide clean resource management and testability.

Example

>>> db = DatabaseConnection(url="sqlite:///:memory:")
>>> with db.session() as session:
...     result = session.execute(text("SELECT 1"))
...     print(result.scalar())
1
>>> db.close()
Parameters:

url (str)

__init__(url='sqlite:///babylon.sqlite')[source]

Initialize database connection.

Parameters:

url (str) – SQLAlchemy database URL. Defaults to local SQLite file. Use “sqlite:///:memory:” for in-memory testing.

Return type:

None

close()[source]

Dispose of the database engine and release resources.

After calling close(), the connection cannot be used. Attempting to create new sessions will fail.

Return type:

None

session()[source]

Get a database session within a context manager.

The session is automatically closed when the context exits. On exception, the session is rolled back.

Yields:

SQLAlchemy Session object

Return type:

Iterator[Session]

Example

>>> with db.session() as session:
...     session.execute(text("INSERT INTO ..."))
...     session.commit()
Return type:

Iterator[Session]

class babylon.engine.FormulaRegistry[source]

Bases: object

Registry for named mathematical formulas.

Provides a central lookup for all simulation formulas, enabling: - Hot-swapping formulas for testing with mocks - Modding support for custom formula implementations - Centralized formula management

Example

>>> registry = FormulaRegistry.default()
>>> la = registry.get("labor_aristocracy_ratio")
>>> result = la(core_wages=120.0, value_produced=100.0)
__init__()[source]

Initialize an empty formula registry.

Return type:

None

classmethod default()[source]

Create a registry pre-populated with all standard formulas.

Registers formulas from babylon.formulas: - labor_aristocracy_ratio - is_labor_aristocracy - consciousness_drift - acquiescence_probability - revolution_probability - crossover_threshold - loss_aversion - exchange_ratio - exploitation_rate - value_transfer - prebisch_singer

Return type:

FormulaRegistry

Returns:

FormulaRegistry with all standard formulas registered

get(name)[source]

Retrieve a formula by name.

Parameters:

name (str) – The formula identifier

Return type:

Callable[..., Any]

Returns:

The registered formula callable

Raises:

KeyError – If no formula is registered with the given name

list_formulas()[source]

List all registered formula names.

Return type:

list[str]

Returns:

List of formula names in arbitrary order

register(name, func)[source]

Register or replace a formula by name.

Parameters:
  • name (str) – Unique identifier for the formula

  • func (Callable[..., Any]) – Callable implementing the formula

Return type:

None

class babylon.engine.ServiceContainer(config, database, event_bus, formulas, defines, metrics, field_registry=None, reserve_army_data_source=None, dispossession_data_source=None, productivity_data_source=None, melt_calculator=None, basket_calculator=None, gamma_calculator=None, capital_calculator=None, throughput_calculator=None, transition_engine=None, tensor_registry=None, community_hypergraph=None, turnover_profile_source=None, inventory_data_source=None, depreciation_data_source=None, hex_grid=None, persistence=None, tracer=None, distribution_calculator=None, interest_calculator=None, credit_cycle_detector=None, fictitious_capital_calculator=None, rent_calculator=None, housing_calculator=None, counter_tendency_calculator=None, value_basis_converter=None, financial_crisis_assessor=None, z1_source=None, housing_data_source=None)[source]

Bases: object

Container for all simulation services.

Aggregates the six core services needed by the simulation, plus optional economics calculator services for tick dynamics (Feature 017):

Core:
  • config: Immutable simulation parameters

  • database: Database connection for persistence

  • event_bus: Publish/subscribe communication

  • formulas: Registry of mathematical formulas

  • defines: Centralized game coefficients (Paradox Refactor)

  • metrics: Telemetry collector for observability (Spec 008)

Field Topology (Feature 002, optional for backward compatibility):
  • field_registry: Contradiction field computation registry

Economics (Feature 017, all optional for backward compatibility):
  • melt_calculator: National MELT computation (Feature 013)

  • basket_calculator: Basket visibility computation (Feature 013)

  • gamma_calculator: Reproductive visibility computation (Feature 015)

  • capital_calculator: Capital stock computation (Feature 012)

  • throughput_calculator: Throughput position computation (Feature 014)

  • transition_engine: Class transition engine (Feature 016)

  • tensor_registry: Cached economic tensor data (Feature 011)

Example

>>> container = ServiceContainer.create()
>>> rent = container.formulas.get("imperial_rent")
>>> container.event_bus.publish(Event(...))
>>> with container.database.session() as session:
...     # do database work
>>> container.database.close()
>>> default_org = container.defines.DEFAULT_ORGANIZATION
>>> container.metrics.increment("ticks_processed")
Parameters:
  • config (SimulationConfig)

  • database (DatabaseConnection)

  • event_bus (EventBus)

  • formulas (FormulaRegistry)

  • defines (GameDefines)

  • metrics (MetricsCollectorProtocol)

  • field_registry (Any)

  • reserve_army_data_source (Any)

  • dispossession_data_source (Any)

  • productivity_data_source (Any)

  • melt_calculator (Any)

  • basket_calculator (Any)

  • gamma_calculator (Any)

  • capital_calculator (Any)

  • throughput_calculator (Any)

  • transition_engine (Any)

  • tensor_registry (Any)

  • community_hypergraph (Any)

  • turnover_profile_source (Any)

  • inventory_data_source (Any)

  • depreciation_data_source (Any)

  • hex_grid (Any)

  • persistence (Any)

  • tracer (Any)

  • distribution_calculator (Any)

  • interest_calculator (Any)

  • credit_cycle_detector (Any)

  • fictitious_capital_calculator (Any)

  • rent_calculator (Any)

  • housing_calculator (Any)

  • counter_tendency_calculator (Any)

  • value_basis_converter (Any)

  • financial_crisis_assessor (Any)

  • z1_source (Any)

  • housing_data_source (Any)

__init__(config, database, event_bus, formulas, defines, metrics, field_registry=None, reserve_army_data_source=None, dispossession_data_source=None, productivity_data_source=None, melt_calculator=None, basket_calculator=None, gamma_calculator=None, capital_calculator=None, throughput_calculator=None, transition_engine=None, tensor_registry=None, community_hypergraph=None, turnover_profile_source=None, inventory_data_source=None, depreciation_data_source=None, hex_grid=None, persistence=None, tracer=None, distribution_calculator=None, interest_calculator=None, credit_cycle_detector=None, fictitious_capital_calculator=None, rent_calculator=None, housing_calculator=None, counter_tendency_calculator=None, value_basis_converter=None, financial_crisis_assessor=None, z1_source=None, housing_data_source=None)
Parameters:
  • config (SimulationConfig)

  • database (DatabaseConnection)

  • event_bus (EventBus)

  • formulas (FormulaRegistry)

  • defines (GameDefines)

  • metrics (MetricsCollectorProtocol)

  • field_registry (Any)

  • reserve_army_data_source (Any)

  • dispossession_data_source (Any)

  • productivity_data_source (Any)

  • melt_calculator (Any)

  • basket_calculator (Any)

  • gamma_calculator (Any)

  • capital_calculator (Any)

  • throughput_calculator (Any)

  • transition_engine (Any)

  • tensor_registry (Any)

  • community_hypergraph (Any)

  • turnover_profile_source (Any)

  • inventory_data_source (Any)

  • depreciation_data_source (Any)

  • hex_grid (Any)

  • persistence (Any)

  • tracer (Any)

  • distribution_calculator (Any)

  • interest_calculator (Any)

  • credit_cycle_detector (Any)

  • fictitious_capital_calculator (Any)

  • rent_calculator (Any)

  • housing_calculator (Any)

  • counter_tendency_calculator (Any)

  • value_basis_converter (Any)

  • financial_crisis_assessor (Any)

  • z1_source (Any)

  • housing_data_source (Any)

Return type:

None

basket_calculator: Any = None
capital_calculator: Any = None
community_hypergraph: Any = None
counter_tendency_calculator: Any = None
classmethod create(config=None, defines=None, metrics=None, *, hex_grid=None, persistence=None, tracer=None, reserve_army_data_source=None, dispossession_data_source=None, productivity_data_source=None, field_registry=None, melt_calculator=None, basket_calculator=None, gamma_calculator=None, capital_calculator=None, throughput_calculator=None, transition_engine=None, tensor_registry=None, community_hypergraph=None, turnover_profile_source=None, inventory_data_source=None, depreciation_data_source=None, distribution_calculator=None, interest_calculator=None, credit_cycle_detector=None, fictitious_capital_calculator=None, rent_calculator=None, housing_calculator=None, counter_tendency_calculator=None, value_basis_converter=None, financial_crisis_assessor=None, z1_source=None, housing_data_source=None)[source]

Factory method to create a fully-initialized container.

Creates all services with sensible defaults. Uses in-memory SQLite for database isolation in tests.

Parameters:
  • config (SimulationConfig | None) – Optional custom config. If None, uses default SimulationConfig.

  • defines (GameDefines | None) – Optional custom defines. If None, uses default GameDefines.

  • metrics (MetricsCollectorProtocol | None) – Optional custom metrics collector. If None, creates a new MetricsCollector instance. Pass a mock for testing.

  • field_registry (Any) – Optional FieldRegistry for contradiction fields (Feature 002).

  • melt_calculator (Any) – Optional MELTCalculator (Feature 013).

  • basket_calculator (Any) – Optional BasketVisibilityCalculator (Feature 013).

  • gamma_calculator (Any) – Optional GammaIIICalculator (Feature 015).

  • capital_calculator (Any) – Optional CapitalStockCalculator (Feature 012).

  • throughput_calculator (Any) – Optional ThroughputCalculator (Feature 014).

  • transition_engine (Any) – Optional ClassTransitionEngine (Feature 016).

  • tensor_registry (Any) – Optional TensorRegistry for cached tensor data (Feature 011).

  • community_hypergraph (Any) – Optional XGI Hypergraph for community membership (Feature 022).

  • hex_grid (Any)

  • persistence (Any)

  • tracer (Any)

  • reserve_army_data_source (Any)

  • dispossession_data_source (Any)

  • productivity_data_source (Any)

  • turnover_profile_source (Any)

  • inventory_data_source (Any)

  • depreciation_data_source (Any)

  • distribution_calculator (Any)

  • interest_calculator (Any)

  • credit_cycle_detector (Any)

  • fictitious_capital_calculator (Any)

  • rent_calculator (Any)

  • housing_calculator (Any)

  • counter_tendency_calculator (Any)

  • value_basis_converter (Any)

  • financial_crisis_assessor (Any)

  • z1_source (Any)

  • housing_data_source (Any)

Return type:

ServiceContainer

Returns:

ServiceContainer with all services initialized

credit_cycle_detector: Any = None
depreciation_data_source: Any = None
dispossession_data_source: Any = None
distribution_calculator: Any = None
fictitious_capital_calculator: Any = None
field_registry: Any = None
financial_crisis_assessor: Any = None
gamma_calculator: Any = None
hex_grid: Any = None
housing_calculator: Any = None
housing_data_source: Any = None
interest_calculator: Any = None
inventory_data_source: Any = None
melt_calculator: Any = None
persistence: Any = None
productivity_data_source: Any = None
rent_calculator: Any = None
reserve_army_data_source: Any = None
tensor_registry: Any = None
throughput_calculator: Any = None
tracer: Any = None
transition_engine: Any = None
turnover_profile_source: Any = None
value_basis_converter: Any = None
z1_source: Any = None
config: SimulationConfig
database: DatabaseConnection
event_bus: EventBus
formulas: FormulaRegistry
defines: GameDefines
metrics: MetricsCollectorProtocol
class babylon.engine.SimulationObserver(*args, **kwargs)[source]

Bases: Protocol

Protocol for entities observing simulation state changes.

Observers receive notifications at three lifecycle points: 1. on_simulation_start() - when simulation begins (first step) 2. on_tick() - after each tick completes 3. on_simulation_end() - when simulation ends (explicit end() call)

Note: All state objects (WorldState) are frozen and immutable. Attempting to modify them will raise AttributeError.

Example

>>> class MyObserver:
...     @property
...     def name(self) -> str:
...         return "MyObserver"
...
...     def on_simulation_start(
...         self, initial_state: WorldState, config: SimulationConfig
...     ) -> None:
...         print(f"Started at tick {initial_state.tick}")
...
...     def on_tick(
...         self, previous_state: WorldState, new_state: WorldState
...     ) -> None:
...         print(f"Tick {previous_state.tick} -> {new_state.tick}")
...
...     def on_simulation_end(self, final_state: WorldState) -> None:
...         print(f"Ended at tick {final_state.tick}")
__init__(*args, **kwargs)
property name: str

Observer identifier for logging and debugging.

Returns:

A string identifying this observer instance.

on_simulation_end(final_state)[source]

Called when simulation ends (on explicit end() call).

Use this hook to cleanup resources, generate summaries, or finalize any accumulated data.

Parameters:

final_state (WorldState) – The final WorldState when simulation ends.

Return type:

None

on_simulation_start(initial_state, config)[source]

Called when simulation begins (on first step call).

Use this hook to initialize resources, establish context, or prepare for observing the simulation run.

Parameters:
  • initial_state (WorldState) – The WorldState at tick 0 (before any steps).

  • config (SimulationConfig) – The SimulationConfig for this run.

Return type:

None

on_tick(previous_state, new_state)[source]

Called after each tick completes with both states for delta analysis.

This is the primary notification hook. Observers receive both the previous and new state to enable delta analysis (what changed).

Parameters:
  • previous_state (WorldState) – WorldState before the tick.

  • new_state (WorldState) – WorldState after the tick.

Return type:

None

class babylon.engine.TopologyMonitor(resilience_test_interval=5, resilience_removal_rate=None, logger=None, gaseous_threshold=None, condensation_threshold=None, vanguard_threshold=None)[source]

Bases: object

Observer tracking solidarity network condensation.

Implements SimulationObserver protocol to receive state change notifications and analyze the topology of SOLIDARITY edges.

Monitors:
  • Connected components (atomization vs. condensation)

  • Percolation ratio (L_max / N)

  • Liquidity metrics (potential vs. actual solidarity)

  • Resilience (survives targeted node removal)

Narrative states logged:
  • Gaseous: percolation < 0.1 (atomized)

  • Liquid: percolation crosses 0.5 (condensation detected)

  • Brittle: potential >> actual (broad but fragile)

  • Fragile: resilience = False (Sword of Damocles)

Parameters:
  • resilience_test_interval (int)

  • resilience_removal_rate (float | None)

  • logger (logging.Logger | None)

  • gaseous_threshold (float | None)

  • condensation_threshold (float | None)

  • vanguard_threshold (float | None)

name

Observer identifier (“TopologyMonitor”)

history

List of TopologySnapshot for each tick

__init__(resilience_test_interval=5, resilience_removal_rate=None, logger=None, gaseous_threshold=None, condensation_threshold=None, vanguard_threshold=None)[source]

Initialize TopologyMonitor.

Parameters:
  • resilience_test_interval (int) – Run resilience test every N ticks (0 = disabled). Default 5.

  • resilience_removal_rate (float | None) – Fraction of nodes to remove in test. Defaults to GameDefines.topology.resilience_removal_rate.

  • logger (Logger | None) – Logger instance (default: module logger)

  • gaseous_threshold (float | None) – Percolation ratio below this = atomized. Defaults to GameDefines.topology.gaseous_threshold.

  • condensation_threshold (float | None) – Percolation ratio for phase transition. Defaults to GameDefines.topology.condensation_threshold.

  • vanguard_threshold (float | None) – Cadre density threshold for solid phase. Defaults to GameDefines.topology.vanguard_density_threshold.

Return type:

None

get_pending_events()[source]

Return and clear pending events for collection by Simulation facade.

Observer events cannot be emitted directly to WorldState because observers run AFTER WorldState is frozen. Instead, pending events are collected by the Simulation facade and injected into the NEXT tick’s WorldState.

Return type:

list[SimulationEvent]

Returns:

List of pending SimulationEvent objects (cleared after return).

property history: list[TopologySnapshot]

Return copy of snapshot history.

property name: str

Return observer identifier.

on_simulation_end(_final_state)[source]

Called when simulation ends.

Logs summary of topology metrics.

Parameters:

_final_state (WorldState) – Final WorldState when simulation ends (unused)

Return type:

None

on_simulation_start(initial_state, _config)[source]

Called when simulation begins.

Initializes history and records initial topology snapshot.

Parameters:
Return type:

None

on_tick(_previous_state, new_state)[source]

Called after each tick completes.

Records topology snapshot and detects phase transitions.

Parameters:
  • _previous_state (WorldState) – WorldState before the tick (unused)

  • new_state (WorldState) – WorldState after the tick

Return type:

None

class babylon.engine.GraphProtocol(*args, **kwargs)[source]

Bases: Protocol

Protocol for backend-agnostic graph operations.

Systems interact with the simulation graph ONLY through this protocol. The concrete implementation (NetworkX, DuckDB) is hidden behind adapters.

This protocol is runtime_checkable, enabling isinstance() checks for protocol compliance.

Example

>>> class MyAdapter:
...     def add_node(self, node_id: str, node_type: str, **attrs: Any) -> None:
...         pass
...     # ... implement all 16 methods
>>> adapter = MyAdapter()
>>> isinstance(adapter, GraphProtocol)
True
__init__(*args, **kwargs)
add_edge(source, target, edge_type, weight=1.0, **attributes)[source]

Add directed edge with type, weight, and attributes.

Parameters:
  • source (str) – Source node ID.

  • target (str) – Target node ID.

  • edge_type (str) – Edge category (e.g., ‘SOLIDARITY’, ‘EXPLOITATION’).

  • weight (float) – Generic weight (default 1.0).

  • **attributes (Any) – Type-specific attributes to store on the edge.

Return type:

None

add_node(node_id, node_type, **attributes)[source]

Add a node with type marker and arbitrary attributes.

Parameters:
  • node_id (str) – Unique identifier for the node.

  • node_type (str) – Discriminator for polymorphism (e.g., ‘social_class’).

  • **attributes (Any) – Type-specific attributes to store on the node.

Return type:

None

aggregate(target, group_by=None, agg_func='count', agg_attr=None)[source]

Aggregate over nodes or edges.

Parameters:
  • target (Literal['nodes', 'edges']) – Whether to aggregate nodes or edges.

  • group_by (str | None) – Attribute to group by (e.g., ‘type’).

  • agg_func (Literal['count', 'sum', 'avg', 'min', 'max']) – Aggregation function to apply.

  • agg_attr (str | None) – Attribute to aggregate (required for sum/avg/min/max).

Return type:

dict[str, float]

Returns:

Dict mapping group keys to aggregated values.

Example

>>> graph.aggregate("nodes", group_by="type")
{"social_class": 4, "territory": 2}
count_edges(edge_type=None)[source]

Count edges, optionally by type.

Parameters:

edge_type (str | None) – Filter by edge type (None = count all).

Return type:

int

Returns:

Number of matching edges.

count_nodes(node_type=None)[source]

Count nodes, optionally by type.

Parameters:

node_type (str | None) – Filter by node type (None = count all).

Return type:

int

Returns:

Number of matching nodes.

execute_traversal(query)[source]

Execute a generic traversal query.

This is the hook for complex operations like percolation analysis, pathfinding, and component detection.

Parameters:

query (TraversalQuery) – TraversalQuery specifying the traversal.

Return type:

TraversalResult

Returns:

TraversalResult with nodes, edges, paths, or aggregates.

Raises:

ValueError – If query_type is not supported.

get_edge(source, target, edge_type)[source]

Retrieve specific edge by source, target, and type.

Parameters:
  • source (str) – Source node ID.

  • target (str) – Target node ID.

  • edge_type (str) – Edge type to match.

Return type:

GraphEdge | None

Returns:

GraphEdge model if found, None otherwise.

get_graph_attr(key, default=None)[source]

Retrieve a graph-level attribute.

Graph attributes store global metadata (e.g., economy state, base_year, tick_dynamics). Maps to a metadata table in DuckDB.

Parameters:
  • key (str) – Attribute name to retrieve.

  • default (Any) – Value to return if attribute not present.

Return type:

Any

Returns:

The attribute value or default.

get_neighborhood(node_id, radius=1, edge_types=None, direction='out')[source]

Get all nodes within radius hops of the source node.

Parameters:
  • node_id (str) – Center node for neighborhood.

  • radius (int) – Maximum hop distance (1 = immediate neighbors).

  • edge_types (set[str] | None) – Filter to specific edge types (None = all).

  • direction (Literal['out', 'in', 'both']) – Which edges to follow: outgoing, incoming, or both.

Return type:

Any

Returns:

SubgraphView or equivalent containing nodes in neighborhood.

Raises:

KeyError – If node does not exist.

get_node(node_id)[source]

Retrieve node by ID.

Parameters:

node_id (str) – The node identifier to look up.

Return type:

GraphNode | None

Returns:

GraphNode model if found, None otherwise.

query_edges(edge_type=None, predicate=None, min_weight=None, max_weight=None)[source]

Query edges with optional filtering.

Returns an iterator for DuckDB compatibility (lazy evaluation).

Parameters:
  • edge_type (str | None) – Filter by edge type.

  • predicate (Callable[[GraphEdge], bool] | None) – Python callable for complex filtering.

  • min_weight (float | None) – Minimum weight threshold.

  • max_weight (float | None) – Maximum weight threshold.

Return type:

Iterator[GraphEdge]

Returns:

Iterator of matching GraphEdge models.

query_nodes(node_type=None, predicate=None, attributes=None)[source]

Query nodes with optional filtering.

Returns an iterator for DuckDB compatibility (lazy evaluation).

Parameters:
  • node_type (str | None) – Filter by node type (None = all types).

  • predicate (Callable[[GraphNode], bool] | None) – Python callable for complex filtering.

  • attributes (dict[str, Any] | None) – Attribute equality filter (DuckDB-translatable).

Return type:

Iterator[GraphNode]

Returns:

Iterator of matching GraphNode models.

remove_edge(source, target, edge_type)[source]

Remove specific edge.

Parameters:
  • source (str) – Source node ID.

  • target (str) – Target node ID.

  • edge_type (str) – Edge type to match.

Raises:

KeyError – If edge does not exist.

Return type:

None

remove_node(node_id)[source]

Remove node and all incident edges.

Parameters:

node_id (str) – The node identifier to remove.

Raises:

KeyError – If node does not exist.

Return type:

None

set_graph_attr(key, value)[source]

Set a graph-level attribute.

Parameters:
  • key (str) – Attribute name to set.

  • value (Any) – Value to store.

Return type:

None

shortest_path(source, target, edge_types=None, weight_attr=None)[source]

Find shortest path between two nodes.

Parameters:
  • source (str) – Start node ID.

  • target (str) – End node ID.

  • edge_types (set[str] | None) – Filter to specific edge types.

  • weight_attr (str | None) – Attribute to use as weight (None = hop count).

Return type:

list[str] | None

Returns:

List of node IDs in path, or None if no path exists.

update_edge(source, target, edge_type, **attributes)[source]

Partial update of edge attributes.

Parameters:
  • source (str) – Source node ID.

  • target (str) – Target node ID.

  • edge_type (str) – Edge type to match.

  • **attributes (Any) – Attributes to update (merged with existing).

Raises:

KeyError – If edge does not exist.

Return type:

None

update_node(node_id, **attributes)[source]

Partial update of node attributes (merge, not replace).

Parameters:
  • node_id (str) – The node identifier to update.

  • **attributes (Any) – Attributes to update (merged with existing).

Raises:

KeyError – If node does not exist.

Return type:

None

class babylon.engine.NetworkXAdapter[source]

Bases: AggregationMixin, QueryMixin

Reference implementation of GraphProtocol using NetworkX.

Wraps nx.DiGraph and provides all 18 GraphProtocol methods. Node types are stored as ‘_node_type’ attribute, edge types as ‘_edge_type’.

Inherits from mixins:
  • AggregationMixin: aggregate()

  • QueryMixin: query_nodes(), query_edges(), count_nodes(), count_edges()

Example

>>> adapter = NetworkXAdapter()
>>> adapter.add_node("C001", "social_class", wealth=100.0)
>>> node = adapter.get_node("C001")
>>> node.wealth
100.0
__init__()[source]

Initialize with empty DiGraph.

Return type:

None

add_edge(source, target, edge_type, weight=1.0, **attributes)[source]

Add directed edge with type and weight.

Parameters:
  • source (str) – Source node ID.

  • target (str) – Target node ID.

  • edge_type (str) – Edge category.

  • weight (float) – Generic weight (default 1.0).

  • **attributes (Any) – Type-specific attributes.

Return type:

None

add_node(node_id, node_type, **attributes)[source]

Add a node with type marker and attributes.

Parameters:
  • node_id (str) – Unique identifier for the node.

  • node_type (str) – Discriminator for polymorphism.

  • **attributes (Any) – Type-specific attributes to store.

Return type:

None

execute_traversal(query)[source]

Execute generic traversal query.

Parameters:

query (TraversalQuery) – TraversalQuery specifying the traversal.

Return type:

TraversalResult

Returns:

TraversalResult with nodes, edges, paths, or aggregates.

Raises:

ValueError – If query_type is not supported.

get_edge(source, target, edge_type)[source]

Retrieve specific edge by source, target, and type.

Parameters:
  • source (str) – Source node ID.

  • target (str) – Target node ID.

  • edge_type (str) – Edge type to match.

Return type:

GraphEdge | None

Returns:

GraphEdge model if found and type matches, None otherwise.

get_graph_attr(key, default=None)[source]

Retrieve a graph-level attribute.

Parameters:
  • key (str) – Attribute name to retrieve.

  • default (Any) – Value to return if attribute not present.

Return type:

Any

Returns:

The attribute value or default.

get_neighborhood(node_id, radius=1, edge_types=None, direction='out')[source]

Get all nodes within radius hops of the source node.

Parameters:
  • node_id (str) – Center node for neighborhood.

  • radius (int) – Maximum hop distance (1 = immediate neighbors).

  • edge_types (set[str] | None) – Filter to specific edge types (None = all).

  • direction (Literal['out', 'in', 'both']) – Which edges to follow: outgoing, incoming, or both.

Return type:

SubgraphView

Returns:

SubgraphView containing nodes in neighborhood.

Raises:

KeyError – If node does not exist.

get_node(node_id)[source]

Retrieve node by ID.

Parameters:

node_id (str) – The node identifier to look up.

Return type:

GraphNode | None

Returns:

GraphNode model if found, None otherwise.

remove_edge(source, target, edge_type)[source]

Remove specific edge.

Parameters:
  • source (str) – Source node ID.

  • target (str) – Target node ID.

  • edge_type (str) – Edge type to match.

Raises:

KeyError – If edge does not exist or type doesn’t match.

Return type:

None

remove_node(node_id)[source]

Remove node and all incident edges.

Parameters:

node_id (str) – The node identifier to remove.

Raises:

KeyError – If node does not exist.

Return type:

None

set_graph_attr(key, value)[source]

Set a graph-level attribute.

Parameters:
  • key (str) – Attribute name to set.

  • value (Any) – Value to store.

Return type:

None

shortest_path(source, target, edge_types=None, weight_attr=None)[source]

Find shortest path between two nodes.

Parameters:
  • source (str) – Start node ID.

  • target (str) – End node ID.

  • edge_types (set[str] | None) – Filter to specific edge types.

  • weight_attr (str | None) – Attribute to use as weight (None = hop count).

Return type:

list[str] | None

Returns:

List of node IDs in path, or None if no path exists.

property underlying_graph: nx.DiGraph[str]

Access the underlying NetworkX DiGraph.

Used by WorldState.from_graph() to unwrap the adapter back to a raw graph, and by topology_monitor for NetworkX-specific algorithms (copy-and-purge resilience testing) that have no protocol equivalent.

Returns:

The raw nx.DiGraph backing this adapter.

update_edge(source, target, edge_type, **attributes)[source]

Partial update of edge attributes.

Parameters:
  • source (str) – Source node ID.

  • target (str) – Target node ID.

  • edge_type (str) – Edge type to match.

  • **attributes (Any) – Attributes to update.

Raises:

KeyError – If edge does not exist or type doesn’t match.

Return type:

None

update_node(node_id, **attributes)[source]

Partial update of node attributes (merge, not replace).

Parameters:
  • node_id (str) – The node identifier to update.

  • **attributes (Any) – Attributes to update.

Raises:

KeyError – If node does not exist.

Return type:

None

classmethod wrap(graph)[source]

Wrap an existing nx.DiGraph in a protocol adapter.

Normalizes edge keys: raw graphs from WorldState.to_graph() store edge_type (from Pydantic model_dump()), but the adapter uses _edge_type. This method adds _edge_type alongside existing edge_type so protocol methods work correctly. Similarly normalizes node_type_node_type for node type discrimination.

The adapter holds a reference to the graph (not a copy), so mutations via protocol methods flow through to the original graph.

Parameters:

graph – An existing NetworkX DiGraph to wrap.

Returns:

A NetworkXAdapter backed by the given graph.

Return type:

NetworkXAdapter

Modules

adapters

Graph adapters for the Babylon simulation engine.

bifurcation_monitor

Bifurcation topology monitor (T032, Phase 10, Feature 033).

community_state_store

Community state store protocol and default implementation (Feature 033).

context

Context models for simulation tick execution.

database

Injectable database connection for the simulation engine.

dialectics

Babylon v2 Dialectic-First Engine.

errors

Transition error types for the simulation engine.

event_bus

Event system for decoupled communication in the simulation.

event_evaluator

Event Template evaluation engine.

factories

Factory functions for creating simulation entities.

field_registry

Field registry for extensible contradiction fields.

formula_registry

Formula registry for hot-swappable mathematical functions.

graph_protocol

Graph Protocol definition for backend-agnostic graph operations.

graph_wrappers

Typed graph wrappers for Spec 040 Discipline 5.

history

History Stack module for the Babylon simulation engine.

history_formatter

History formatter for generating narrative summaries.

hydration

Hydration package for initializing simulation state from reference data.

interceptor

Event Interceptor pattern for Epoch 2 adversarial mechanics.

invariants

Invariant protocol and concrete invariants for the simulation engine.

observer

Observer protocol for simulation state change notifications.

observer_adapter

ProtocolObserverAdapter for thread-safe GUI callback delivery.

observers

Observer implementations for simulation state monitoring.

phase

Phase-typed state for engine tick ordering (Spec 040 Discipline 4).

result

Result type for total functions with explicit error channels.

runner

Async simulation runner for non-blocking UI integration.

scenarios

Factory functions for creating simulation scenarios.

scenarios_wayne_county

Wayne County Organizer scenario — MVP entry point.

services

Service container for dependency injection.

simdb

Simulation database module (ephemeral per-run state).

simulation

Simulation facade class for running multi-tick simulations.

simulation_engine

Simulation engine for the Babylon game loop.

systems

Simulation systems for the Babylon engine.

topology_monitor

Topology Monitor for phase transition detection (Sprint 3.1, 3.3).

trap_detection

Trap detection system for the Wayne County Organizer.