[ engine · a2a · live ]

A2A event bus

An async pub/sub event bus that runs the agents end-to-end. Every concrete agent inherits from Agent, declares its subscribes and emits, and the runtime fans events out automatically. It supports both a forward cascade and closed feedback loops. 25 agents are live today (21 active by default).

What it is · how it works · why it matters

[ what ]

An agent-to-agent async event bus. 21 specialized agents publish and subscribe to typed events; one trigger fires the full pipeline end-to-end.

[ how ]

~200 lines of asyncio fan-out (src/traderspace/bus/). Forward cascade: Scheduler.tick.eod → DataReady → … → OrderFilled. Backward feedback: OrderFilled → PolicyRetrained → PM and OrderFilled → PreferenceModelUpdated → PM. Every event is OTel-traced; AuditAgent writes JSONL.

[ why ]

The difference between "an AI tool" and "an AI agent." Fills don't just get logged — they trigger NeMo-RL DPO retrains and preference dataset refreshes that arrive back at the PM. The loop closes, and the system improves itself.

Overview

Lives at src/traderspace/bus/. Four files:

file          role
bus.py        The EventBus class. publish · subscribe · register_agent · history · agent_registry. ~200 lines.
agent.py      Base Agent class. Subclasses set name · subscribes · emits and implement async handle(payload, event_type).
registry.py   Idempotent register_default_agents() — wires all 15 concrete agents on startup.
agents/       The 15 concrete agents: data, feature_engineering, research × 3, signal, portfolio_optimization, portfolio_construction, compliance, pm_execution × 3, feedback × 3.

Zero external deps. Pure asyncio. The Observability page renders the live event log + topology directly off this.
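To make the fan-out idea concrete, here is a minimal sketch of an asyncio pub/sub bus. It is not the real bus.py: the method names (publish, subscribe, register_agent, history, agent_registry) come from the table above, but every signature, the history_size default, and the two toy agents are assumptions made for illustration.

```python
import asyncio
from collections import defaultdict, deque


class Agent:
    """Stand-in base class (assumed shape): subclasses set name, subscribes, emits."""
    name = "Agent"
    subscribes = ()
    emits = ()

    def __init__(self):
        self.bus = None  # set by EventBus.register_agent

    async def publish(self, event_type, payload):
        await self.bus.publish(event_type, payload, source_agent=self.name)

    async def handle(self, payload, event_type):
        raise NotImplementedError


class EventBus:
    def __init__(self, history_size=1000):  # history_size is an assumed default
        self._subscribers = defaultdict(list)       # event_type -> [agent, ...]
        self._history = deque(maxlen=history_size)  # ring buffer of recent events
        self.agent_registry = {}                    # name -> agent

    def subscribe(self, event_type, agent):
        self._subscribers[event_type].append(agent)

    def register_agent(self, agent):
        agent.bus = self
        self.agent_registry[agent.name] = agent
        for event_type in agent.subscribes:
            self.subscribe(event_type, agent)

    async def publish(self, event_type, payload, source_agent="external"):
        self._history.append(
            {"event_type": event_type, "source_agent": source_agent, "payload": payload}
        )
        handlers = [a.handle(payload, event_type) for a in self._subscribers[event_type]]
        # Fan out: all subscribers of this event type run concurrently.
        await asyncio.gather(*handlers)

    def history(self, limit=50):
        return list(self._history)[-limit:]


class DataAgent(Agent):  # toy agent, not the real DataAgent
    name = "DataAgent"
    subscribes = ("Scheduler.tick.eod",)
    emits = ("DataReady",)

    async def handle(self, payload, event_type):
        await self.publish("DataReady", {"universe": payload["universe"]})


class FeatureAgent(Agent):  # toy agent, illustrative only
    name = "FeatureAgent"
    subscribes = ("DataReady",)
    emits = ("FeaturesReady",)

    async def handle(self, payload, event_type):
        await self.publish("FeaturesReady", {"n_symbols": len(payload["universe"])})


async def demo():
    bus = EventBus()
    bus.register_agent(DataAgent())
    bus.register_agent(FeatureAgent())
    await bus.publish("Scheduler.tick.eod", {"universe": ["SPY", "QQQ"]})
    return [e["event_type"] for e in bus.history()]
```

Running asyncio.run(demo()) shows one external trigger turning into a three-event chain. In this sketch publish awaits its subscribers, so a cascade completes before the triggering call returns; the real bus may schedule handlers differently.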

Forward cascade

One trigger fires the whole pipeline:

POST /api/bus/trigger
{ "event_type": "Scheduler.tick.eod", "payload": { "universe": ["SPY","QQQ","NVDA",...] } }

→ DataAgent              emits DataReady             (yfinance bars pulled)
→ FeatureEngineeringAgent emits FeaturesReady        (mom_12_1, ret_21d, ann_vol per symbol)
→ TechnicalAgent         emits ResearchComplete ┐
→ FundamentalAgent       emits ResearchComplete ├─ fan-out parallel
→ SentimentAgent         emits ResearchComplete ┘
→ SignalAgent            emits SignalProposed       (waits for all 3 views, fuses)
→ PortfolioOptimizationAgent emits RebalanceProposed   (cuFOLIO solve, 5k scen, CVaR α=0.95)
→ PortfolioConstructionAgent emits RebalanceConstructed (whole-share rounding using broker positions)
→ ComplianceBusAgent     emits RebalanceCleared OR RebalanceBlocked
→ PortfolioManagerBusAgent emits RebalanceApproved   (AUTO_APPROVE=True in demo)
→ ExecutionBusAgent      emits OrderPlaced           (real broker call)
→ LiveMonitorAgent       emits OrderFilled           (republished for sim broker)

~30 events across 25 agents in ~5 seconds end-to-end on GB10.

Feedback loops (the unique part)

Every OrderFilled kicks two backward-facing agents:

OrderFilled ─→ NemoRLFeedbackAgent
                 counter++ ;; if counter >= 10:
                   reset counter
                   kick off DPO training in a background thread
                   → emits PolicyRetrained { run_id, fills_accumulated, started_at }
                   → PortfolioManagerBusAgent receives the new policy id

OrderFilled ─→ PreferenceLearningAgent
                 extract_all() ;; build_dpo_dataset() ;; status()
                 → emits PreferenceModelUpdated { n_pairs, ready_to_train, blocking }
                 → PortfolioManagerBusAgent receives the new fingerprint

every meaningful event ─→ AuditAgent
                          appends to data/audit/bus_events.jsonl (replayable)

This is how the loop closes. Fills feed back to upstream agents: they kick the policy retrain and the preference dataset refresh, and the results arrive back at the PM agent so future decisions reflect the latest behaviour.
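The counter-then-retrain step can be sketched as follows. This is an illustration under assumptions, not the NemoRLFeedbackAgent source: the class name FillCounterFeedback, the train_fn hook, and the use of asyncio.to_thread are all hypothetical; only the threshold of 10 and the PolicyRetrained payload fields come from the diagram above.

```python
import asyncio
import time
import uuid


class FillCounterFeedback:
    """Counts fills and kicks a retrain every `threshold` fills (hypothetical class)."""

    def __init__(self, publish, train_fn, threshold=10):
        self._publish = publish      # async callable(event_type, payload)
        self._train_fn = train_fn    # blocking callable(run_id); assumed trainer hook
        self._threshold = threshold
        self._count = 0
        self._tasks = set()          # keep references so tasks are not garbage-collected

    async def handle(self, payload, event_type):
        self._count += 1
        if self._count < self._threshold:
            return {"fills_accumulated": self._count}
        fills, self._count = self._count, 0  # reset before starting the run
        run_id = uuid.uuid4().hex
        # Run the blocking trainer in a worker thread so the event loop stays responsive.
        task = asyncio.create_task(asyncio.to_thread(self._train_fn, run_id))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)
        await self._publish(
            "PolicyRetrained",
            {"run_id": run_id, "fills_accumulated": fills, "started_at": time.time()},
        )
        return {"retrain_started": run_id}


async def demo():
    emitted, trained = [], []

    async def publish(event_type, payload):
        emitted.append((event_type, payload["fills_accumulated"]))

    agent = FillCounterFeedback(publish, trained.append)
    for _ in range(25):
        await agent.handle({}, "OrderFilled")
    await asyncio.gather(*agent._tasks)  # demo only: wait for the background runs
    return emitted, len(trained)
```

With 25 fills the sketch starts two runs (at fills 10 and 20) and leaves 5 fills in the counter. The event is published when the run starts, which matches the started_at field in the payload.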

How to use it

Fire an event

curl -X POST http://127.0.0.1:8015/api/bus/trigger \
     -H 'content-type: application/json' \
     -d '{"event_type":"Scheduler.tick.eod","payload":{"universe":["SPY","QQQ","NVDA"]}}'

Watch what happens

curl 'http://127.0.0.1:8015/api/bus/events?limit=50' | jq .

Inspect the topology

curl 'http://127.0.0.1:8015/api/bus/agents' | jq .

Or — easier — open Observability and click ▶ Trigger end-to-end.
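The same three calls can be scripted from Python with only the standard library. A sketch, assuming the host and port from the curl examples and assuming that /api/bus/events returns a JSON list of event rows (the response envelope is not documented here, so adjust count_by_type if rows are wrapped in an object). All function names are illustrative.

```python
import json
import urllib.parse
import urllib.request
from collections import Counter

BASE_URL = "http://127.0.0.1:8015"  # host and port taken from the curl examples


def build_trigger_request(event_type, payload, base_url=BASE_URL):
    """Build the POST /api/bus/trigger request without sending it."""
    body = json.dumps({"event_type": event_type, "payload": payload}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/api/bus/trigger",
        data=body,
        headers={"content-type": "application/json"},
        method="POST",
    )


def events_url(limit=50, event_type=None, base_url=BASE_URL):
    """Build the GET /api/bus/events URL with optional event_type filter."""
    params = {"limit": limit}
    if event_type is not None:
        params["event_type"] = event_type
    return f"{base_url}/api/bus/events?{urllib.parse.urlencode(params)}"


def count_by_type(events):
    """Tally event rows by event_type (assumes a list of dict rows)."""
    return Counter(e["event_type"] for e in events)


def fetch_json(request_or_url, timeout=10):
    with urllib.request.urlopen(request_or_url, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))


def main():
    # Requires a running server; not called automatically.
    fetch_json(build_trigger_request("Scheduler.tick.eod", {"universe": ["SPY", "QQQ", "NVDA"]}))
    print(count_by_type(fetch_json(events_url(limit=50))))
```

Call main() against a live instance to fire the cascade and print how many events of each type landed in the log.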

How to interpret the event log

Each event row has {ts, event_type, source_agent, payload}. Two patterns to look for:

Modifying — adding an agent

Add an agent in three steps:

  1. Create src/traderspace/bus/agents/my_agent.py with a class that inherits Agent, sets name · subscribes · emits, and implements async handle(payload, event_type).
  2. Register it in src/traderspace/bus/registry.py — add to the agents list in register_default_agents().
  3. Add its identity to src/traderspace/agents/identities.py and run PYTHONPATH=src python scripts/gen_skills.py so the docs + skill card regenerate.
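Steps 1 and 2 can be sketched in a self-contained form. The Agent and TinyBus classes below are stand-ins so the example runs on its own, HeartbeatAgent is a hypothetical agent, and the agent_classes parameter is an assumption; the real register_default_agents() may be structured differently. The point is the idempotence check: a second call must not register anything twice.

```python
class Agent:  # stand-in for the real base class in agent.py
    name = "Agent"
    subscribes = ()
    emits = ()

    async def handle(self, payload, event_type):
        raise NotImplementedError


class HeartbeatAgent(Agent):  # hypothetical example agent
    name = "HeartbeatAgent"
    subscribes = ("Scheduler.tick.eod",)
    emits = ("HeartbeatSeen",)

    async def handle(self, payload, event_type):
        return {"seen": event_type}


class TinyBus:  # stand-in exposing only what registration needs
    def __init__(self):
        self.agent_registry = {}

    def register_agent(self, agent):
        self.agent_registry[agent.name] = agent


def register_default_agents(bus, agent_classes=(HeartbeatAgent,)):
    """Register each agent once; calling again is a no-op."""
    added = []
    for cls in agent_classes:
        if cls.name in bus.agent_registry:
            continue  # already wired, keep the existing instance
        bus.register_agent(cls())
        added.append(cls.name)
    return added
```

Keying the check on the agent name means a restart hook or a test fixture can call the function repeatedly without creating duplicate subscribers.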

Pattern: aggregator agent (wait-for-all)

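from traderspace.bus.agent import Agent  # import path assumed from the file layout above


class MyJoinAgent(Agent):
    name = "MyJoinAgent"
    subscribes = ("ResearchComplete",)
    emits = ("MyJoinComplete",)

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Per-instance buffer; a class-level dict would be shared by every instance.
        self._buffer = {}

    async def handle(self, payload, event_type):
        key = payload["sleeve_id"]
        bucket = self._buffer.setdefault(key, [])
        bucket.append(payload)
        if len(bucket) < 3:  # wait for fundamental + technical + sentiment
            return {"waiting": len(bucket)}
        # All three views arrived: drop the bucket, merge, and emit once.
        del self._buffer[key]
        merged = self._merge(bucket)
        await self.publish("MyJoinComplete", merged)
        return {"emitted": True}

    def _merge(self, bucket):
        # Placeholder merge (illustrative only): replace with real fusion logic.
        return {"sleeve_id": bucket[0]["sleeve_id"], "views": list(bucket)}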
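Counting payloads is simple, but a duplicate event from one research agent would complete the join early. A possible variant keys the buffer on the emitting source instead. This is a sketch: the WaitForAll helper and the idea that each payload can be tagged with a source label are assumptions, since the ResearchComplete payload schema is not shown here.

```python
class WaitForAll:
    """Collects one payload per expected source for each key (hypothetical helper)."""

    def __init__(self, expected):
        self._expected = frozenset(expected)
        self._pending = {}  # key -> {source: payload}

    def add(self, key, source, payload):
        """Return the complete {source: payload} dict once, otherwise None."""
        if source not in self._expected:
            raise ValueError(f"unexpected source: {source!r}")
        bucket = self._pending.setdefault(key, {})
        bucket[source] = payload  # a repeat from the same source overwrites
        if set(bucket) != self._expected:
            return None
        return self._pending.pop(key)  # complete: hand over and clear the bucket


joiner = WaitForAll(["technical", "fundamental", "sentiment"])
first = joiner.add("sleeve-1", "technical", {"score": 0.1})
repeat = joiner.add("sleeve-1", "technical", {"score": 0.2})  # still waiting
```

Inside handle, an agent would call add(...) and publish only when the return value is not None.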

Guarantees

REST surface

Verb   Path                                        Purpose
GET    /api/bus/agents                             Registry topology. Returns [{name, subscribes, emits, status}, …].
GET    /api/bus/events?limit=N&event_type=...      Ring-buffered recent event log.
POST   /api/bus/trigger                            Fire an event. Body: {event_type, payload}.
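A ring-buffered log with the limit and event_type filters can be sketched with collections.deque. The EventLog class, its method names, and the maxlen default are assumptions for illustration; the row fields {ts, event_type, source_agent, payload} follow the event log description above.

```python
import time
from collections import deque


class EventLog:
    """Fixed-size event log (hypothetical class); oldest rows are dropped first."""

    def __init__(self, maxlen=1000):  # maxlen is an assumed default
        self._events = deque(maxlen=maxlen)

    def append(self, event_type, source_agent, payload, ts=None):
        self._events.append({
            "ts": time.time() if ts is None else ts,
            "event_type": event_type,
            "source_agent": source_agent,
            "payload": payload,
        })

    def recent(self, limit=50, event_type=None):
        """Return up to `limit` newest rows, optionally filtered by event_type."""
        rows = [
            e for e in self._events
            if event_type is None or e["event_type"] == event_type
        ]
        return rows[-limit:] if limit > 0 else []


log = EventLog(maxlen=3)
for name in ("DataReady", "OrderFilled", "DataReady", "OrderFilled"):
    log.append(name, "demo", {})
```

With maxlen=3 the first DataReady row has already been evicted, so filtering on DataReady returns a single row. Filtering is applied before the limit, so limit=N means the N newest matching rows.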

See also

NVTrader v0.1.18 · docs · ⚠ Not financial advice