"""FieldDerivativeSystem — System #15 in materialist causality order.
Dialectical Field Topology (Feature 002): Computes spatial derivatives
(gradient on edges, Laplacian on nodes), temporal derivatives (df/dt,
d2f/dt2), principal contradiction identification, and continuity residuals.
Reference: FR-003 (gradient), FR-004 (Laplacian)
Reference: FR-006 (temporal derivatives)
Reference: FR-008 (principal contradiction)
Reference: FR-009 (continuity residuals)
Reference: R-006 (system ordering — position 15)
"""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
import networkx as nx
from babylon.engine.graph_protocol import GraphProtocol
from babylon.engine.services import ServiceContainer
from babylon.engine.event_bus import Event
from babylon.engine.systems.protocol import ContextType
from babylon.models.enums import EventType
logger = logging.getLogger(__name__)
[docs]
class FieldDerivativeSystem:
    """Derive spatial and temporal rates of change for contradiction fields.

    Execution Order: 15 (after ContradictionFieldSystem)

    On each step this system reads ``contradiction_fields`` from nodes
    (written by System #14), stores per-field gradients on edges, stores the
    Laplacian and temporal derivatives on nodes (the temporal terms come from
    the rolling ``contradiction_history`` kept in persistent_data), and
    records the principal contradiction as a graph-level attribute.
    """

    name = "field_derivative"

    def step(
        self,
        graph: nx.DiGraph[str] | GraphProtocol,
        services: ServiceContainer,
        context: ContextType,
    ) -> None:
        """Run the three derivative phases for the current tick.

        The method returns without touching the graph when the services
        container has no field registry or the registry lists no fields.

        Args:
            graph: Mutable graph (NetworkX or GraphProtocol). Anything that
                is not already a GraphProtocol is wrapped in a
                NetworkXAdapter first.
            services: ServiceContainer with field_registry.
            context: TickContext or dict with tick and persistent_data.
        """
        # Imported locally, exactly as the original does, so the runtime
        # dependency is only resolved when a step actually executes.
        from babylon.engine.graph_protocol import GraphProtocol

        if not isinstance(graph, GraphProtocol):
            from babylon.engine.adapters.inmemory_adapter import NetworkXAdapter

            graph = NetworkXAdapter.wrap(graph)

        registry = services.field_registry
        field_names = registry.get_field_names() if registry is not None else None
        if not field_names:
            # No registry, or nothing registered: there is nothing to derive.
            return

        persistent_data = _get_persistent_data(context)
        history: dict[str, dict[str, list[float]]] = persistent_data.get(
            "contradiction_history", {}
        )

        # The tick is only needed for event emission in phase 3.
        if hasattr(context, "tick"):
            tick: int = context.tick
        elif isinstance(context, dict):
            raw_tick = context.get("tick", 0)
            tick = 0 if raw_tick is None else int(raw_tick)
        else:
            tick = 0

        # Phase 1: spatial gradients on edges.
        _compute_edge_gradients(graph, field_names)
        # Phase 2: Laplacian + temporal derivatives on nodes.
        _compute_node_derivatives(graph, field_names, history)
        # Phase 3: principal contradiction identification.
        _identify_principal_contradiction(graph, field_names, persistent_data, services, tick)
def _compute_edge_gradients(
    graph: GraphProtocol,
    field_names: list[str],
) -> None:
    """Store ``f(target) - f(source)`` for every field on every edge.

    An edge is skipped when either endpoint cannot be resolved or when
    either endpoint has no (or empty) ``contradiction_fields``. A field that
    is missing on one endpoint is read as ``0.0``.

    Args:
        graph: Graph with contradiction_fields on nodes.
        field_names: List of field names to compute gradients for.
    """
    for link in graph.query_edges():
        tail = graph.get_node(link.source_id)
        head = graph.get_node(link.target_id)
        if tail is None or head is None:
            continue

        tail_values: dict[str, float] = tail.attributes.get("contradiction_fields", {})
        head_values: dict[str, float] = head.attributes.get("contradiction_fields", {})
        # Both endpoints must carry field data for a gradient to be written.
        if not (tail_values and head_values):
            continue

        deltas: dict[str, float] = {
            name: head_values.get(name, 0.0) - tail_values.get(name, 0.0)
            for name in field_names
        }
        graph.update_edge(
            link.source_id,
            link.target_id,
            link.edge_type,
            field_gradients=deltas,
        )
def _compute_node_derivatives(
    graph: GraphProtocol,
    field_names: list[str],
    history: dict[str, dict[str, list[float]]],
    edge_weight_attr: str | None = None,
) -> None:
    """Store the Laplacian and temporal derivatives on each social_class node.

    For every node returned by ``graph.query_nodes(node_type="social_class")``
    that has non-empty ``contradiction_fields``, a ``field_derivatives``
    mapping is written back with, per field:

    * ``laplacian``: ``sum(w_j * (f(j) - f(i)))`` over the node's neighbors,
      or ``0.0`` when no neighbor values were collected for that field.
    * ``df_dt``: ``f(t) - f(t-1)`` from the last two history samples, or
      ``None`` with fewer than two samples.
    * ``d2f_dt2``: ``f(t) - 2*f(t-1) + f(t-2)`` from the last three history
      samples, or ``None`` with fewer than three samples.

    Args:
        graph: Graph with contradiction_fields on nodes.
        field_names: List of field names.
        history: contradiction_history from persistent_data, keyed by node
            id and then by field name.
        edge_weight_attr: Optional edge attribute name for weights.
            None = unweighted (all weights 1.0), preserving backward compat.
    """
    for node in graph.query_nodes(node_type="social_class"):
        ident = node.id
        own_fields: dict[str, float] = node.attributes.get("contradiction_fields", {})
        if not own_fields:
            continue

        samples_by_field = history.get(ident, {})
        neighbor_values, weights = _collect_neighbor_fields(
            graph,
            ident,
            field_names,
            edge_weight_attr=edge_weight_attr,
        )

        derivatives: dict[str, dict[str, float | None]] = {}
        for name in field_names:
            center = own_fields.get(name, 0.0)
            around = neighbor_values.get(name, [])

            if around:
                # strict=True: weights and values are expected to be parallel.
                laplacian = sum(
                    w * (value - center) for w, value in zip(weights, around, strict=True)
                )
            else:
                laplacian = 0.0
                logger.debug(
                    "EC-002: Isolated node %s, Laplacian=0.0 for %s",
                    ident,
                    name,
                )

            # Finite differences over the most recent history samples.
            samples = samples_by_field.get(name, [])
            count = len(samples)
            df_dt: float | None = samples[-1] - samples[-2] if count >= 2 else None
            d2f_dt2: float | None = (
                samples[-1] - 2.0 * samples[-2] + samples[-3] if count >= 3 else None
            )

            derivatives[name] = {
                "laplacian": laplacian,
                "df_dt": df_dt,
                "d2f_dt2": d2f_dt2,
            }

        graph.update_node(ident, field_derivatives=derivatives)
def _collect_neighbor_fields(
    graph: GraphProtocol,
    node_id: str,
    field_names: list[str],
    edge_weight_attr: str | None = None,
) -> tuple[dict[str, list[float]], list[float]]:
    """Collect field values and edge weights from all neighbors of a node.

    Both incoming and outgoing edges are considered, so the edge set is
    treated as undirected. Each distinct neighbor is counted once; when
    several edges connect the same pair of nodes, the first one yielded by
    ``graph.query_edges()`` determines the weight.

    A neighbor contributes only if its ``contradiction_fields`` contains at
    least one of ``field_names``. For a contributing neighbor, any requested
    field it lacks is recorded as ``0.0`` so that every per-field list has
    exactly one entry per weight. Without this, a neighbor holding only a
    subset of the fields left the lists with different lengths, which breaks
    a strict ``zip`` of weights against values.

    Args:
        graph: Graph protocol instance.
        node_id: Node to collect neighbors for.
        field_names: Field names to collect.
        edge_weight_attr: Optional edge attribute name for weights.
            None = all weights 1.0 (backward compatible). An edge missing
            the attribute also gets weight 1.0.

    Returns:
        Tuple of (field_name -> neighbor values list, edge weights list).
        The weights list is parallel to every field's values list
        (one entry per contributing neighbor, same order).
    """
    # First pass: unique neighbor ids with the weight of their first edge.
    neighbor_weights: dict[str, float] = {}
    for edge in graph.query_edges():
        if edge.source_id == node_id:
            nid = edge.target_id
        elif edge.target_id == node_id:
            nid = edge.source_id
        else:
            continue
        if nid is None or nid in neighbor_weights:
            continue
        if edge_weight_attr is not None:
            neighbor_weights[nid] = float(edge.attributes.get(edge_weight_attr, 1.0))
        else:
            neighbor_weights[nid] = 1.0

    # Second pass: read field values, keeping all lists index-aligned.
    result: dict[str, list[float]] = {name: [] for name in field_names}
    weights: list[float] = []
    for nid, weight in neighbor_weights.items():
        neighbor_node = graph.get_node(nid)
        if neighbor_node is None:
            continue
        n_fields: dict[str, float] = neighbor_node.attributes.get("contradiction_fields", {})
        # Neighbors with none of the requested fields are ignored entirely.
        if not any(name in n_fields for name in field_names):
            continue
        for name in field_names:
            result[name].append(n_fields.get(name, 0.0))
        weights.append(weight)

    return result, weights
def _identify_principal_contradiction(
    graph: GraphProtocol,
    field_names: list[str],
    persistent_data: dict[str, Any],
    services: ServiceContainer,
    tick: int,
) -> None:
    """Pick the field whose |df/dt| peaks highest and record it on the graph.

    Fields are ranked by the largest |df/dt| seen on any social_class node,
    then by the summed |df/dt| over all such nodes, then with
    ``"exploitation"`` preferred (EC-004). If the winning peak is not
    strictly positive there is no principal contradiction and ``None`` is
    recorded.

    The result is written to the ``principal_contradiction`` graph attribute
    and remembered in ``persistent_data["_previous_principal_field"]``. A
    PRINCIPAL_CONTRADICTION_SHIFT event is published only when the principal
    differs from the remembered one and is not ``None``.

    Args:
        graph: Graph with field_derivatives on nodes.
        field_names: Registered field names.
        persistent_data: For tracking previous principal.
        services: ServiceContainer for event_bus access.
        tick: Current tick number.
    """
    peak: dict[str, float] = dict.fromkeys(field_names, 0.0)
    total: dict[str, float] = dict.fromkeys(field_names, 0.0)

    for node in graph.query_nodes(node_type="social_class"):
        node_derivs: dict[str, dict[str, float | None]] = node.attributes.get(
            "field_derivatives", {}
        )
        for name in field_names:
            rate = node_derivs.get(name, {}).get("df_dt")
            if rate is None:
                continue
            magnitude = abs(rate)
            if magnitude > peak[name]:
                peak[name] = magnitude
            total[name] += magnitude

    # Descending by (peak, total, exploitation-first); the sort is stable,
    # so fully tied fields keep their registration order.
    ranked = sorted(
        field_names,
        key=lambda f: (peak[f], total[f], float(f == "exploitation")),
        reverse=True,
    )

    if ranked and peak[ranked[0]] > 0.0:
        principal_field: str | None = ranked[0]
        max_df_dt = peak[ranked[0]]
    else:
        principal_field = None
        max_df_dt = 0.0

    previous_principal: str | None = persistent_data.get("_previous_principal_field")
    changed = principal_field != previous_principal

    graph.set_graph_attr(
        "principal_contradiction",
        {
            "field_name": principal_field,
            "max_abs_df_dt": max_df_dt,
            "changed": changed,
        },
    )

    # A shift to "no principal" is flagged in the attribute but not announced.
    if changed and principal_field is not None:
        shift = Event(
            type=EventType.PRINCIPAL_CONTRADICTION_SHIFT,
            tick=tick,
            payload={
                "previous_field": previous_principal,
                "new_field": principal_field,
                "max_abs_df_dt": max_df_dt,
            },
        )
        services.event_bus.publish(shift)

    # Remember the outcome for the next tick's comparison.
    persistent_data["_previous_principal_field"] = principal_field
def _get_persistent_data(context: ContextType) -> dict[str, Any]:
    """Return the persistent_data mapping carried by a tick context.

    Three shapes of context are handled, checked in this order:

    * an object exposing a ``persistent_data`` attribute: that attribute is
      returned as-is;
    * a dict: its ``"persistent_data"`` entry is returned, and an empty dict
      is inserted under that key first if the entry is absent;
    * anything else: a new empty dict that is not attached to the context.

    Args:
        context: TickContext or dict with persistent_data key.

    Returns:
        Mutable persistent_data dict.
    """
    if hasattr(context, "persistent_data"):
        attached: dict[str, Any] = context.persistent_data
        return attached
    if not isinstance(context, dict):
        return {}
    stored: dict[str, Any] = context.setdefault("persistent_data", {})
    return stored