A2A event bus
An async pub/sub event bus that runs 25 agents end-to-end. Every concrete agent inherits from Agent, declares subscribes and emits, and the runtime fans events out automatically. Forward cascade and closed feedback loops. 25 agents live today (21 active by default).
What it is · how it works · why it matters
An agent-to-agent async event bus. 21 specialized agents publish and subscribe to typed events; one trigger fires the full pipeline end-to-end.
~200 lines of asyncio fan-out (src/traderspace/bus/). Forward cascade: tick.eod → DataReady → … → OrderFilled. Backward feedback: OrderFilled → PolicyRetrained → PM and OrderFilled → PreferenceModelUpdated → PM. Every event is OTel-traced; AuditAgent writes JSONL.
The difference between "an AI tool" and "an AI agent." Fills don't just get logged: they trigger NeMo-RL policy retrains and preference dataset refreshes whose results arrive back at the PM. The loop closes, and the system improves itself.
Overview
Lives at src/traderspace/bus/. Three files and one directory:
| file | role |
|---|---|
| bus.py | The EventBus class. publish · subscribe · register_agent · history · agent_registry. ~200 lines. |
| agent.py | Base Agent class. Subclasses set name · subscribes · emits and implement async handle(payload, event_type). |
| registry.py | Idempotent register_default_agents(): wires all 15 concrete agents on startup. |
| agents/ | The 15 concrete agents: data, feature_engineering, research × 3, signal, portfolio_optimization, portfolio_construction, compliance, pm_execution × 3, feedback × 3. |
Zero external deps. Pure asyncio. The Observability page renders the live event log + topology directly off this.
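For orientation, here is a minimal sketch of what a publish/subscribe core like bus.py could look like. MiniBus, MiniAgent and DataAgentStub are hypothetical names. The 1000-entry ring buffer and per-handler error isolation mirror the guarantees listed on this page, but the real bus.py may differ in every detail. This sketch awaits handlers one at a time; wrapping them in asyncio.gather would run them concurrently instead.

```python
import asyncio
from collections import deque
from typing import Any, Optional


class MiniAgent:
    """Hypothetical stand-in for the Agent base class in agent.py."""

    name = "MiniAgent"
    subscribes: tuple = ()
    emits: tuple = ()

    def __init__(self) -> None:
        self.bus: Optional["MiniBus"] = None

    async def publish(self, event_type: str, payload: dict) -> None:
        # Agents publish through the bus they were registered on.
        if self.bus is None:
            raise RuntimeError(f"{self.name} is not registered on a bus")
        await self.bus.publish(event_type, payload, source=self.name)

    async def handle(self, payload: dict, event_type: str) -> Any:
        raise NotImplementedError


class MiniBus:
    def __init__(self, history_size: int = 1000) -> None:
        self._subs: dict = {}                       # event_type -> [agents]
        self._history = deque(maxlen=history_size)  # ring buffer of recent events
        self.errors: list = []                      # (agent name, repr(exception))

    def register_agent(self, agent: MiniAgent) -> None:
        agent.bus = self
        for event_type in agent.subscribes:
            self._subs.setdefault(event_type, []).append(agent)

    async def publish(self, event_type: str, payload: dict, source: str = "external") -> None:
        self._history.append(
            {"event_type": event_type, "source_agent": source, "payload": payload}
        )
        for agent in list(self._subs.get(event_type, [])):
            try:
                await agent.handle(payload, event_type)  # awaited one at a time
            except Exception as exc:  # one failing handler must not stop fan-out
                self.errors.append((agent.name, repr(exc)))

    def history(self) -> list:
        return list(self._history)


class DataAgentStub(MiniAgent):
    name = "DataAgent"
    subscribes = ("Scheduler.tick.eod",)
    emits = ("DataReady",)

    async def handle(self, payload, event_type):
        await self.publish("DataReady", {"symbols": payload["universe"]})


bus = MiniBus()
bus.register_agent(DataAgentStub())
asyncio.run(bus.publish("Scheduler.tick.eod", {"universe": ["SPY", "QQQ"]}))
print([e["event_type"] for e in bus.history()])  # ['Scheduler.tick.eod', 'DataReady']
```

The cascade emerges because a handler publishes from inside its own handle call. No central scheduler knows the pipeline order.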
Forward cascade
One trigger fires the whole pipeline:
POST /api/bus/trigger
{ "event_type": "Scheduler.tick.eod", "payload": { "universe": ["SPY","QQQ","NVDA",...] } }
→ DataAgent emits DataReady (yfinance bars pulled)
→ FeatureEngineeringAgent emits FeaturesReady (mom_12_1, ret_21d, ann_vol per symbol)
→ TechnicalAgent emits ResearchComplete ┐
→ FundamentalAgent emits ResearchComplete ├─ fan-out parallel
→ SentimentAgent emits ResearchComplete ┘
→ SignalAgent emits SignalProposed (waits for all 3 views, fuses)
→ PortfolioOptimizationAgent emits RebalanceProposed (cuFOLIO solve, 5k scen, CVaR α=0.95)
→ PortfolioConstructionAgent emits RebalanceConstructed (whole-share rounding using broker positions)
→ ComplianceBusAgent emits RebalanceCleared OR RebalanceBlocked
→ PortfolioManagerBusAgent emits RebalanceApproved (AUTO_APPROVE=True in demo)
→ ExecutionBusAgent emits OrderPlaced (real broker call)
→ LiveMonitorAgent emits OrderFilled (republished for sim broker)
~30 events across 25 agents in ~5 seconds end-to-end on GB10.
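The chain above follows entirely from each agent's subscribes and emits declarations. As an illustration, the following sketch recovers the hop count at which each agent is first reached from Scheduler.tick.eod. It also lists subscribed events that nothing produces, which is a quick sanity check after adding an agent. The TOPOLOGY table is transcribed by hand from the diagram. Which event each agent listens to is inferred from the arrows (an assumption), and AuditAgent is omitted.

```python
from collections import deque

# (name, subscribes, emits), transcribed from the cascade and feedback diagrams.
# Subscriptions are inferred from the arrows (assumption); AuditAgent is omitted.
TOPOLOGY = [
    ("DataAgent", ("Scheduler.tick.eod",), ("DataReady",)),
    ("FeatureEngineeringAgent", ("DataReady",), ("FeaturesReady",)),
    ("TechnicalAgent", ("FeaturesReady",), ("ResearchComplete",)),
    ("FundamentalAgent", ("FeaturesReady",), ("ResearchComplete",)),
    ("SentimentAgent", ("FeaturesReady",), ("ResearchComplete",)),
    ("SignalAgent", ("ResearchComplete",), ("SignalProposed",)),
    ("PortfolioOptimizationAgent", ("SignalProposed",), ("RebalanceProposed",)),
    ("PortfolioConstructionAgent", ("RebalanceProposed",), ("RebalanceConstructed",)),
    ("ComplianceBusAgent", ("RebalanceConstructed",), ("RebalanceCleared", "RebalanceBlocked")),
    ("PortfolioManagerBusAgent",
     ("RebalanceCleared", "PolicyRetrained", "PreferenceModelUpdated"),
     ("RebalanceApproved",)),
    ("ExecutionBusAgent", ("RebalanceApproved",), ("OrderPlaced",)),
    ("LiveMonitorAgent", ("OrderPlaced",), ("OrderFilled",)),
    ("NemoRLFeedbackAgent", ("OrderFilled",), ("PolicyRetrained",)),
    ("PreferenceLearningAgent", ("OrderFilled",), ("PreferenceModelUpdated",)),
]


def cascade_depths(topology, trigger):
    """Breadth-first walk over events: the hop at which each agent is first reached."""
    subscribers = {}
    for name, subs, _ in topology:
        for event in subs:
            subscribers.setdefault(event, []).append(name)
    emits = {name: out for name, _, out in topology}

    depths, seen_events = {}, {trigger}
    queue = deque([(trigger, 0)])
    while queue:
        event, hop = queue.popleft()
        for agent in subscribers.get(event, []):
            if agent in depths:
                continue  # feedback edges (e.g. back to the PM) don't reset depth
            depths[agent] = hop + 1
            for out in emits[agent]:
                if out not in seen_events:
                    seen_events.add(out)
                    queue.append((out, hop + 1))
    return depths


def unproduced_events(topology, triggers):
    """Subscribed event types that no agent emits and no trigger supplies."""
    produced = set(triggers)
    for _, _, out in topology:
        produced.update(out)
    subscribed = {event for _, subs, _ in topology for event in subs}
    return sorted(subscribed - produced)


depths = cascade_depths(TOPOLOGY, "Scheduler.tick.eod")
print(depths["SignalAgent"], max(depths.values()))  # 4 11
print(unproduced_events(TOPOLOGY, ["Scheduler.tick.eod"]))  # []
```

The three research agents share a depth because they subscribe to the same event. That shared depth is the fan-out point in the diagram.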
Feedback loops (the unique part)
Every OrderFilled kicks two backward-facing agents:
OrderFilled ─→ NemoRLFeedbackAgent
counter++ ;; if counter >= 10:
reset counter
kick off PPO training in a background thread
→ emits PolicyRetrained { run_id, fills_accumulated, started_at }
→ PortfolioManagerBusAgent receives the new policy id
OrderFilled ─→ PreferenceLearningAgent
extract_all() ;; build_dpo_dataset() ;; status()
→ emits PreferenceModelUpdated { n_pairs, ready_to_train, blocking }
→ PortfolioManagerBusAgent receives the new fingerprint
every meaningful event ─→ AuditAgent
appends to data/audit/bus_events.jsonl (replayable)
This is how the loop closes: fills feed the policy retrain and the preference dataset refresh, and the results arrive back at the PM agent so future decisions reflect the latest behaviour.
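The NemoRLFeedbackAgent branch of the diagram can be sketched as a counter that starts a background job every tenth fill. This is a hypothetical stand-in, not the real agent. RetrainTrigger, train_fn and the publish callback are illustrative names, and train_fn replaces the actual PPO run. The threshold of 10 and the PolicyRetrained payload fields come from the diagram. As in the diagram, the event is emitted when training starts, not when it finishes.

```python
import asyncio
import threading
import uuid
from datetime import datetime, timezone


class RetrainTrigger:
    """Hypothetical sketch of the fill counter behind NemoRLFeedbackAgent."""

    def __init__(self, publish, train_fn, threshold=10):
        self.publish = publish      # async callable(event_type, payload)
        self.train_fn = train_fn    # blocking training job (stand-in for PPO)
        self.threshold = threshold
        self.count = 0
        self.threads = []

    async def handle(self, payload, event_type):
        self.count += 1
        if self.count < self.threshold:
            return {"fills_accumulated": self.count}
        fills, self.count = self.count, 0  # reset before training starts
        run_id = uuid.uuid4().hex[:8]
        # Run the blocking job off the event loop so the bus keeps flowing.
        thread = threading.Thread(target=self.train_fn, args=(run_id,), daemon=True)
        thread.start()
        self.threads.append(thread)
        await self.publish("PolicyRetrained", {
            "run_id": run_id,
            "fills_accumulated": fills,
            "started_at": datetime.now(timezone.utc).isoformat(),
        })
        return {"retrain_started": run_id}


published, trained = [], []


async def fake_publish(event_type, payload):
    published.append((event_type, payload))


async def main():
    trigger = RetrainTrigger(fake_publish, trained.append, threshold=10)
    for _ in range(25):  # 25 fills: retrains fire on the 10th and 20th
        await trigger.handle({"symbol": "SPY"}, "OrderFilled")
    for thread in trigger.threads:
        thread.join()
    return trigger


trigger = asyncio.run(main())
print(len(published), trigger.count)  # 2 5
```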
How to use it
Fire an event
curl -X POST http://127.0.0.1:8015/api/bus/trigger \
-H 'content-type: application/json' \
-d '{"event_type":"Scheduler.tick.eod","payload":{"universe":["SPY","QQQ","NVDA"]}}'
Watch what happens
curl 'http://127.0.0.1:8015/api/bus/events?limit=50' | jq .
Inspect the topology
curl 'http://127.0.0.1:8015/api/bus/agents' | jq .
Or — easier — open Observability and click ▶ Trigger end-to-end.
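The same three calls can be made from Python with only the standard library (urllib, no third-party client). The helper names here are hypothetical. The port, paths, query parameters and request body are taken from the curl examples and the REST table. Responses are only assumed to be JSON, as the jq pipes imply.

```python
import json
import urllib.parse
import urllib.request

BASE = "http://127.0.0.1:8015"  # host/port from the curl examples


def trigger_request(event_type, payload, base=BASE):
    """POST /api/bus/trigger with {event_type, payload}."""
    body = json.dumps({"event_type": event_type, "payload": payload}).encode()
    return urllib.request.Request(
        f"{base}/api/bus/trigger",
        data=body,
        headers={"content-type": "application/json"},
        method="POST",
    )


def events_request(limit=50, event_type=None, base=BASE):
    """GET /api/bus/events, optionally filtered by event_type."""
    params = {"limit": limit}
    if event_type:
        params["event_type"] = event_type
    return urllib.request.Request(f"{base}/api/bus/events?{urllib.parse.urlencode(params)}")


def agents_request(base=BASE):
    """GET /api/bus/agents (registry topology)."""
    return urllib.request.Request(f"{base}/api/bus/agents")


def send(req, timeout=10):
    # Performs the HTTP call; the server must be running.
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read())
```

With the server up, send(trigger_request("Scheduler.tick.eod", {"universe": ["SPY", "QQQ", "NVDA"]})) fires the cascade. A follow-up send(events_request(limit=50)) then returns the log the Observability page renders.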
How to interpret the event log
Each event row has {ts, event_type, source_agent, payload}. Four patterns to look for:
- Fan-out: three ResearchComplete events in quick succession with different source_agent values (Technical/Fundamental/Sentiment). The research agents ran in parallel.
- Join: one SignalProposed event a moment after the three ResearchComplete events. SignalAgent waited for all three views before fusing.
- Loop closure: a PolicyRetrained or PreferenceModelUpdated event some time after an OrderFilled. The feedback agents fired.
- Block: a RebalanceBlocked instead of RebalanceCleared. The compliance agent vetoed; read the payload's reasons list.
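These checks can also be run mechanically over rows fetched from /api/bus/events. This is a sketch under two assumptions: the rows cover a single run, and ts values sort chronologically. The function name, the findings dict layout and the sample rows (including the "example reason" string) are made up for illustration.

```python
def scan_event_log(rows):
    """Report fan-out, join, loop-closure and block patterns in one run's event rows."""
    rows = sorted(rows, key=lambda r: r["ts"])  # assumes ts values are comparable

    def positions(event_type):
        return [i for i, r in enumerate(rows) if r["event_type"] == event_type]

    findings = {}
    research = positions("ResearchComplete")
    sources = sorted({rows[i]["source_agent"] for i in research})
    if len(sources) >= 3:
        findings["fan-out"] = sources
    # Join: a SignalProposed after the last ResearchComplete.
    if research and any(i > research[-1] for i in positions("SignalProposed")):
        findings["join"] = True
    # Loop closure: feedback events after the first OrderFilled.
    fills = positions("OrderFilled")
    feedback = positions("PolicyRetrained") + positions("PreferenceModelUpdated")
    closed = sorted({rows[i]["event_type"] for i in feedback if fills and i > fills[0]})
    if closed:
        findings["loop-closure"] = closed
    blocked = positions("RebalanceBlocked")
    if blocked:
        findings["block"] = [rows[i]["payload"].get("reasons", []) for i in blocked]
    return findings


sample = [  # hypothetical rows, shaped like the event log
    {"ts": 1.0, "event_type": "FeaturesReady", "source_agent": "FeatureEngineeringAgent", "payload": {}},
    {"ts": 1.1, "event_type": "ResearchComplete", "source_agent": "TechnicalAgent", "payload": {}},
    {"ts": 1.2, "event_type": "ResearchComplete", "source_agent": "FundamentalAgent", "payload": {}},
    {"ts": 1.3, "event_type": "ResearchComplete", "source_agent": "SentimentAgent", "payload": {}},
    {"ts": 1.5, "event_type": "SignalProposed", "source_agent": "SignalAgent", "payload": {}},
    {"ts": 2.0, "event_type": "RebalanceBlocked", "source_agent": "ComplianceBusAgent",
     "payload": {"reasons": ["example reason"]}},
]
print(scan_event_log(sample))
```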
Modifying — adding an agent
Add an agent in three steps:
- Create src/traderspace/bus/agents/my_agent.py with a class that inherits Agent, sets name · subscribes · emits, and implements async handle(payload, event_type).
- Register it in src/traderspace/bus/registry.py: add it to the agents list in register_default_agents().
- Add its identity to src/traderspace/agents/identities.py and run PYTHONPATH=src python scripts/gen_skills.py so the docs + skill card regenerate.
Pattern: aggregator agent (wait-for-all)
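from traderspace.bus.agent import Agent  # assumed import path (src/traderspace/bus/agent.py, PYTHONPATH=src)


class MyJoinAgent(Agent):
    name = "MyJoinAgent"
    subscribes = ("ResearchComplete",)
    emits = ("MyJoinComplete",)

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Per-instance buffer; a class-level dict would be shared across instances.
        self._buffer = {}

    def _merge(self, views):
        # Illustrative merge: bundle the three research views for this sleeve.
        return {"sleeve_id": views[0]["sleeve_id"], "views": views}

    async def handle(self, payload, event_type):
        key = payload["sleeve_id"]
        bucket = self._buffer.setdefault(key, [])
        bucket.append(payload)
        if len(bucket) < 3:  # wait for fundamental + technical + sentiment
            return {"waiting": len(bucket)}
        # All three arrived: drop the buffer entry, merge, and emit downstream.
        merged = self._merge(self._buffer.pop(key))
        await self.publish("MyJoinComplete", merged)
        return {"emitted": True}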
Guarantees
- At-least-once delivery within the process: the bus iterates subscribers and awaits each invocation individually.
- Error isolation: one handler raising does not stop fan-out; errors land in the trace ledger with the agent name attached.
- Ring-buffered history: the last 1000 events are available via history(); older events are kept only in the audit JSONL.
- OTel-traced: both bus.publish and per-handler bus.handler spans land via NAT.
- Not durable: the bus state is in-memory. If the process restarts, in-flight events are lost. The audit JSONL is the durable source of truth for what fired.
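The audit file is JSONL, with one JSON object per line. Replaying it after a restart therefore needs only the standard library. This sketch assumes each line holds a row shaped like the {ts, event_type, source_agent, payload} records in the event log. append_event and replay are hypothetical helpers, not functions exported by the bus.

```python
import json
from pathlib import Path


def append_event(path, row):
    """Append one event as a single JSON line (the JSONL convention)."""
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("a", encoding="utf-8") as f:
        f.write(json.dumps(row) + "\n")


def replay(path, event_type=None):
    """Yield events in file order, optionally filtered by type; skip blank lines."""
    with Path(path).open(encoding="utf-8") as f:
        for line in f:
            if not line.strip():
                continue
            row = json.loads(line)
            if event_type is None or row.get("event_type") == event_type:
                yield row
```

For example, list(replay("data/audit/bus_events.jsonl", event_type="OrderFilled")) recovers every fill the process saw, including those that have already dropped out of the 1000-event ring buffer.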
REST surface
| Verb | Path | Purpose |
|---|---|---|
| GET | /api/bus/agents | Registry topology. Returns [{name, subscribes, emits, status}, …]. |
| GET | /api/bus/events?limit=N&event_type=... | Ring-buffered recent event log. |
| POST | /api/bus/trigger | Fire an event. Body: {event_type, payload}. |
See also
- Event vocabulary — every event type the bus speaks, with payload schemas.
- Agent roster — all 30 agents + bus-status.
- Observability page — UI for the bus.