Series 1 — The Behavioral Intelligence OS • Part 3 of 8

The control plane is the kernel of the behavioral AI platform. It reads the registry, builds a dependency-sorted execution plan, runs engines in parallel batches, and collates outputs through the governance wrapper — all without any engine knowing about any other.

What a Control Plane Does That a Model Cannot

A language model processes a prompt and returns a completion. It cannot selectively activate one capability and disable another at runtime without swapping in a different model. The control plane can. It treats each engine as a unit of work with declared dependencies and decides at request time, based on the registry state, what runs.

This means you can:

  • Disable the emotional-regulation engine for a client that has a DPA restriction, without any code change.
  • Run long-horizon-prediction in research-only mode for six months, collecting data without exposing outputs.
  • Add a new negotiation-tactics engine and promote it to active — again, no deployment.
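
To make the three cases above concrete, here is a minimal sketch of how registry state could drive them. The `status` field, its three values, and the `createRegistry` and `setStatus` helpers are assumptions made for illustration; only `getActive()` and `getResearchOnly()` appear in the pipeline code later in this article. The sketch covers global state only, not per-client restrictions.

```javascript
// Hypothetical registry entries: the `status` field and its values are assumed.
const registryEntries = [
  { engineId: 'emotional-regulation', status: 'disabled', dependencies: [] },
  { engineId: 'long-horizon-prediction', status: 'research-only', dependencies: [] },
  { engineId: 'negotiation-tactics', status: 'active', dependencies: [] },
];

// Hypothetical factory: wraps the entries in the two lookups the pipeline uses.
function createRegistry(entries) {
  const byStatus = status => entries.filter(e => e.status === status);
  return {
    getActive: () => byStatus('active'),
    getResearchOnly: () => byStatus('research-only'),
    // Changing status is a data change, not a code change or a deployment
    setStatus(engineId, status) {
      const entry = entries.find(e => e.engineId === engineId);
      if (!entry) throw new Error(`Unknown engine: ${engineId}`);
      entry.status = status;
    },
  };
}

const engineRegistry = createRegistry(registryEntries);

// Promote the research-only engine once enough data has been collected
engineRegistry.setStatus('long-horizon-prediction', 'active');
```

After the promotion, `getActive()` returns the prediction and negotiation engines, and `getResearchOnly()` returns an empty list.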

Building the Execution Plan

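The plan builder runs once per request, or once per registry change if the plan is cached. It uses Kahn's algorithm for topological sorting to group engines into batches that can run in parallel, because every engine in a batch has all of its dependencies in earlier batches: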

// core/control-plane.js

function buildExecutionPlan(activeEngines) {
  // Step 1: build adjacency list
  const inDegree = new Map();
  const graph    = new Map();

  for (const engine of activeEngines) {
    inDegree.set(engine.engineId, 0);
    graph.set(engine.engineId, []);
  }

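  for (const engine of activeEngines) {
    // Treat a missing `dependencies` field as "no dependencies"
    for (const dep of engine.dependencies ?? []) {
      // A disabled or research-only dependency is absent from activeEngines
      if (!graph.has(dep)) {
        throw new Error(`Engine ${engine.engineId} depends on ${dep}, which is not active`);
      }
      graph.get(dep).push(engine.engineId);
      inDegree.set(engine.engineId, inDegree.get(engine.engineId) + 1);
    }
  }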

  // Step 2: Kahn's algorithm — produce topological batches
  const batches = [];
  let   queue   = activeEngines
    .filter(e => inDegree.get(e.engineId) === 0)
    .map(e => e.engineId);

  while (queue.length > 0) {
    batches.push([...queue]);
    const next = [];
    for (const id of queue) {
      for (const dependent of graph.get(id)) {
        const newDeg = inDegree.get(dependent) - 1;
        inDegree.set(dependent, newDeg);
        if (newDeg === 0) next.push(dependent);
      }
    }
    queue = next;
  }

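  // Engines caught in a cycle never reach in-degree 0, so they never enter a batch
  if (batches.flat().length !== activeEngines.length) {
    throw new Error('Cycle detected in engine dependency graph');
  }

  // Swap ids back for engine objects with a single map lookup per id
  const byId = new Map(activeEngines.map(e => [e.engineId, e]));
  return batches.map(ids => ids.map(id => byId.get(id)));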
}
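
For a concrete picture of what the plan builder returns, the sketch below applies the same level-by-level batching to four engines. The engine names and their dependencies are invented for illustration, and `batchIds` is a condensed stand-in for `buildExecutionPlan` that works on ids only and skips the validation steps.

```javascript
// Condensed stand-in for buildExecutionPlan: same Kahn-style batching, ids only.
// It omits the missing-dependency and cycle checks for brevity.
function batchIds(engines) {
  const inDegree = new Map(engines.map(e => [e.engineId, e.dependencies.length]));
  const dependents = new Map(engines.map(e => [e.engineId, []]));
  for (const e of engines) {
    for (const dep of e.dependencies) dependents.get(dep).push(e.engineId);
  }

  const batches = [];
  let queue = engines.filter(e => e.dependencies.length === 0).map(e => e.engineId);
  while (queue.length > 0) {
    batches.push(queue);
    const next = [];
    for (const id of queue) {
      for (const d of dependents.get(id)) {
        inDegree.set(d, inDegree.get(d) - 1);
        if (inDegree.get(d) === 0) next.push(d); // all dependencies now satisfied
      }
    }
    queue = next;
  }
  return batches;
}

// Hypothetical engines: two independent scorers feed one engine, which feeds another
const exampleEngines = [
  { engineId: 'sentiment', dependencies: [] },
  { engineId: 'intent', dependencies: [] },
  { engineId: 'risk', dependencies: ['sentiment', 'intent'] },
  { engineId: 'summary', dependencies: ['risk'] },
];

const exampleBatches = batchIds(exampleEngines);
console.log(exampleBatches);
// [ [ 'sentiment', 'intent' ], [ 'risk' ], [ 'summary' ] ]
```

The two independent engines share the first batch, while `risk` waits until both of its dependencies have finished.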

Running the Pipeline

async function runPipeline(inputSignals, context) {
  const activeEngines = engineRegistry.getActive();
  const plan          = buildExecutionPlan(activeEngines);
  const results       = {};

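  for (const batch of plan) {
    // All engines in a batch run in parallel. allSettled (rather than all)
    // keeps one failed or timed-out engine from rejecting the whole request.
    const settled = await Promise.allSettled(
      batch.map(engine => runEngineWithTimeout(engine, inputSignals, context))
    );

    batch.forEach((engine, i) => {
      const outcome = settled[i];
      if (outcome.status === 'rejected') {
        // A failed engine contributes no result and emits no events
        console.error(`Engine ${engine.engineId} failed:`, outcome.reason);
        return;
      }

      const output = outcome.value;
      results[engine.engineId] = output;

      // Publish events to the bus for engines in later batches
      for (const topic of engine.emitsTopics ?? []) {
        eventBus.emit(topic, { engineId: engine.engineId, output });
      }
    });
  }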

  // Research-only engines run separately — their outputs are logged, never returned
  await runResearchEngines(inputSignals, context);

  return governanceWrapper.wrap(results, context);
}

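async function runEngineWithTimeout(engine, signals, context) {
  const timeoutMs = computeTimeout(engine.computeCost);
  let timer;
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(
      () => reject(new Error(`Engine ${engine.engineId} timed out after ${timeoutMs} ms`)),
      timeoutMs
    );
  });
  try {
    // Whichever settles first wins; the engine itself is not cancelled on timeout
    return await Promise.race([engine.score(signals, context), timeout]);
  } finally {
    clearTimeout(timer); // otherwise the pending timer outlives the request
  }
}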
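
The `computeTimeout` helper is called above but not shown. A minimal sketch, assuming the timeout scales linearly with `computeCost`: the 250 ms base value and the input validation are assumptions, not part of the original platform.

```javascript
// Assumed base timeout for a cost-1 engine; tune this to your own latency budget.
const BASE_TIMEOUT_MS = 250;

// Scales the base timeout linearly with the engine's declared compute cost.
function computeTimeout(computeCost, baseMs = BASE_TIMEOUT_MS) {
  if (!Number.isFinite(computeCost) || computeCost <= 0) {
    throw new RangeError(`computeCost must be a positive number, got ${computeCost}`);
  }
  return computeCost * baseMs;
}

console.log(computeTimeout(1)); // 250
console.log(computeTimeout(8)); // 2000
```

Rejecting a missing or non-positive cost matters here: `setTimeout` with an `undefined` or `NaN` delay fires almost immediately, which would make every such engine appear to time out.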

Research-Only Mode

Research-only engines run the full scoring pipeline but their outputs are written to a separate audit log and never returned to product adapters. This lets you collect real-world data on a new engine's behaviour before trusting it with live decisions.

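async function runResearchEngines(signals, context) {
  const researchEngines = engineRegistry.getResearchOnly();
  // Run in parallel and under the same timeout as live engines, so a slow
  // research engine cannot hold up the live response
  await Promise.all(
    researchEngines.map(async engine => {
      try {
        const output = await runEngineWithTimeout(engine, signals, context);
        auditLogger.logResearchOutput(engine.engineId, output, context);
      } catch (err) {
        // Failures are logged, never propagated to the live pipeline
        auditLogger.logResearchError(engine.engineId, err, context);
      }
    })
  );
}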

What to Watch For

  • Timeout calibration — Set engine timeouts proportional to computeCost. A cost-8 engine gets 8× the base timeout. Never let one slow engine block the whole pipeline.
  • Missing upstream events — If a later-batch engine subscribes to a topic that was never emitted (because an earlier engine failed or timed out), it must handle the missing event gracefully, for example by falling back to a default.
  • Plan caching — The execution plan is deterministic for a given registry state. Cache it and invalidate only when the registry changes. Re-building it on every request is wasteful.
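
The plan-caching point can be sketched as a small wrapper around the plan builder. This assumes the registry exposes a `getVersion()` method whose value changes whenever any entry changes; that method, `createPlanCache`, and the stand-in registry below are hypothetical and not part of the code shown earlier.

```javascript
// Hypothetical cache: rebuilds the plan only when the registry version changes.
function createPlanCache(buildPlan) {
  let cachedVersion = null;
  let cachedPlan = null;

  return function getPlan(registry) {
    const version = registry.getVersion(); // assumed registry method
    if (version !== cachedVersion) {
      cachedPlan = buildPlan(registry.getActive());
      cachedVersion = version;
    }
    return cachedPlan;
  };
}

// Stand-in registry and plan builder, used only to show the cache behaviour
let buildCount = 0;
const fakeRegistry = {
  version: 1,
  getVersion() { return this.version; },
  getActive() { return [{ engineId: 'a', dependencies: [] }]; },
};
const getPlan = createPlanCache(engines => {
  buildCount += 1;
  return [engines]; // a single batch containing every engine
});

getPlan(fakeRegistry);    // builds the plan
getPlan(fakeRegistry);    // served from the cache
fakeRegistry.version = 2; // simulate a registry change
getPlan(fakeRegistry);    // rebuilt
console.log(buildCount);  // 2
```

In `runPipeline`, the call to `buildExecutionPlan(activeEngines)` could then be swapped for `getPlan(engineRegistry)`, provided the registry bumps its version on every status or dependency change.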