OrchestrationEngine — dispatch loop & reactor framework #7

Open
opened 2026-04-16 11:29:19 +00:00 by claude-desktop · 0 comments
Collaborator

User story

As a backend developer, I want an OrchestrationEngine that receives commands, runs them through the Decider, persists resulting events via the EventStore, updates the in-memory read model, and broadcasts events to reactors and the frontend, so that the entire CQRS pipeline is wired together.

Acceptance criteria

OrchestrationEngine (engine.rs)

  • Struct holds: Arc<EventStore>, Arc<RwLock<OrchestrationReadModel>>, broadcast::Sender<StoredEvent>, Vec<Box<dyn Reactor>>
  • dispatch(&self, command: OrchestrationCommand) -> Result<(), EngineError> that executes the full pipeline:
    1. Read current read model snapshot
    2. Call decide(command, model) (pure, sync)
    3. Persist events via event_store.append_all()
    4. Update in-memory read model via project() fold
    5. Broadcast stored events to all subscribers
  • read_model(&self) -> OrchestrationReadModel — returns a clone of current state
  • EngineError enum wrapping DeciderError and PersistenceError
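
The five dispatch steps above can be sketched roughly as follows. This is a minimal, synchronous, std-only illustration, not the intended implementation: the command, event, and read-model shapes are hypothetical stand-ins for the types from #4 and #5, the in-memory EventStore replaces real persistence, `subscribe()` is an invented helper, and a list of `std::sync::mpsc` senders stands in for `broadcast::Sender<StoredEvent>`.

```rust
use std::sync::{mpsc, Arc, Mutex, RwLock};

// Every type here is a simplified, hypothetical stand-in for the real ones in #4 and #5.
#[derive(Clone, Debug, PartialEq)]
pub enum OrchestrationCommand { CreateProject { name: String } }
#[derive(Clone, Debug, PartialEq)]
pub enum OrchestrationEvent { ProjectCreated { name: String } }
#[derive(Clone, Debug, PartialEq)]
pub struct StoredEvent { pub sequence: u64, pub event: OrchestrationEvent }
#[derive(Clone, Debug, Default, PartialEq)]
pub struct OrchestrationReadModel { pub projects: Vec<String> }

#[derive(Debug, PartialEq)]
pub enum DeciderError { DuplicateProject(String) }
#[derive(Debug, PartialEq)]
pub struct PersistenceError(pub String);
#[derive(Debug, PartialEq)]
pub enum EngineError { Decider(DeciderError), Persistence(PersistenceError) }

impl From<DeciderError> for EngineError {
    fn from(e: DeciderError) -> Self { EngineError::Decider(e) }
}
impl From<PersistenceError> for EngineError {
    fn from(e: PersistenceError) -> Self { EngineError::Persistence(e) }
}

/// Pure, synchronous decision: command + current model -> new events.
pub fn decide(
    command: OrchestrationCommand,
    model: &OrchestrationReadModel,
) -> Result<Vec<OrchestrationEvent>, DeciderError> {
    match command {
        OrchestrationCommand::CreateProject { name } => {
            if model.projects.contains(&name) {
                Err(DeciderError::DuplicateProject(name))
            } else {
                Ok(vec![OrchestrationEvent::ProjectCreated { name }])
            }
        }
    }
}

/// Fold step: apply one stored event to the read model.
pub fn project(mut model: OrchestrationReadModel, stored: &StoredEvent) -> OrchestrationReadModel {
    match &stored.event {
        OrchestrationEvent::ProjectCreated { name } => model.projects.push(name.clone()),
    }
    model
}

/// In-memory stand-in for the persistent EventStore.
#[derive(Default)]
pub struct EventStore { log: Mutex<Vec<StoredEvent>> }

impl EventStore {
    pub fn append_all(&self, events: Vec<OrchestrationEvent>) -> Result<Vec<StoredEvent>, PersistenceError> {
        let mut log = self.log.lock().map_err(|e| PersistenceError(e.to_string()))?;
        let mut stored = Vec::with_capacity(events.len());
        for event in events {
            let item = StoredEvent { sequence: log.len() as u64 + 1, event };
            log.push(item.clone());
            stored.push(item);
        }
        Ok(stored)
    }
}

pub struct OrchestrationEngine {
    event_store: Arc<EventStore>,
    read_model: Arc<RwLock<OrchestrationReadModel>>,
    // Stand-in for broadcast::Sender<StoredEvent>: one std mpsc sender per subscriber.
    subscribers: Mutex<Vec<mpsc::Sender<StoredEvent>>>,
}

impl OrchestrationEngine {
    pub fn new(event_store: Arc<EventStore>) -> Self {
        Self { event_store, read_model: Arc::default(), subscribers: Mutex::default() }
    }

    /// Hypothetical helper: register a subscriber and hand back its receiver.
    pub fn subscribe(&self) -> mpsc::Receiver<StoredEvent> {
        let (tx, rx) = mpsc::channel();
        self.subscribers.lock().unwrap().push(tx);
        rx
    }

    /// Returns a clone of the current state.
    pub fn read_model(&self) -> OrchestrationReadModel {
        self.read_model.read().unwrap().clone()
    }

    pub fn dispatch(&self, command: OrchestrationCommand) -> Result<(), EngineError> {
        let snapshot = self.read_model(); // 1. read snapshot
        let events = decide(command, &snapshot)?; // 2. decide
        let stored = self.event_store.append_all(events)?; // 3. persist
        {
            let mut model = self.read_model.write().unwrap(); // 4. project fold
            let next = stored.iter().fold(model.clone(), project);
            *model = next;
        }
        let subscribers = self.subscribers.lock().unwrap(); // 5. broadcast
        for event in &stored {
            for tx in subscribers.iter() {
                let _ = tx.send(event.clone()); // a dropped subscriber is ignored
            }
        }
        Ok(())
    }
}
```

In this sketch the two `From` impls let `?` convert both error kinds into `EngineError`, which is one plausible reading of "wrapping DeciderError and PersistenceError".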

Reactor trait

  • #[async_trait] trait Reactor: Send + Sync with async fn on_event(&self, event: &StoredEvent, engine: &OrchestrationEngine)
  • For each stored event, the engine calls on_event on every registered reactor, after that event has been broadcast
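
A rough sketch of the reactor fan-out, under stated assumptions: the trait is shown synchronous so the example compiles with std alone, whereas the criteria above call for `#[async_trait]` with an `async fn on_event`. `notify_reactors`, `RecordingReactor`, and the `StoredEvent` fields are hypothetical names used only for illustration.

```rust
use std::sync::{Arc, Mutex};

// Simplified stand-in for the real StoredEvent.
#[derive(Clone, Debug, PartialEq)]
pub struct StoredEvent {
    pub sequence: u64,
    pub kind: String,
}

// Synchronous variant of the trait; the issue specifies an async version.
pub trait Reactor: Send + Sync {
    fn on_event(&self, event: &StoredEvent, engine: &OrchestrationEngine);
}

pub struct OrchestrationEngine {
    reactors: Vec<Box<dyn Reactor>>,
}

impl OrchestrationEngine {
    pub fn new(reactors: Vec<Box<dyn Reactor>>) -> Self {
        Self { reactors }
    }

    /// Hypothetical helper: called by dispatch once the stored events have been broadcast.
    pub fn notify_reactors(&self, stored: &[StoredEvent]) {
        for event in stored {
            for reactor in &self.reactors {
                // Each reactor gets the event plus a handle back to the engine,
                // so it could dispatch follow-up commands.
                reactor.on_event(event, self);
            }
        }
    }
}

/// Test double that records the sequence numbers it has seen.
pub struct RecordingReactor {
    pub seen: Arc<Mutex<Vec<u64>>>,
}

impl Reactor for RecordingReactor {
    fn on_event(&self, event: &StoredEvent, _engine: &OrchestrationEngine) {
        self.seen.lock().unwrap().push(event.sequence);
    }
}
```

A recording reactor like this is also a convenient way to cover the "reactor receives events after dispatch" test listed further down.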

Startup replay

  • OrchestrationEngine::new() replays all events from EventStore to rebuild the read model before accepting commands
  • After replay, engine is ready to dispatch new commands
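
Startup replay amounts to folding every persisted event through `project()` before the engine is handed out. The sketch below assumes a hypothetical `read_all()` method on the store, a `from_events` constructor for seeding it, and simplified event and read-model shapes; none of these names come from the spec.

```rust
use std::sync::{Arc, RwLock};

// Simplified, hypothetical event and model shapes.
#[derive(Clone, Debug, PartialEq)]
pub enum OrchestrationEvent {
    ProjectCreated { name: String },
    ProjectDeleted { name: String },
}

#[derive(Clone, Debug, PartialEq)]
pub struct StoredEvent {
    pub sequence: u64,
    pub event: OrchestrationEvent,
}

#[derive(Clone, Debug, Default, PartialEq)]
pub struct OrchestrationReadModel {
    pub projects: Vec<String>,
    pub last_sequence: u64,
}

/// Fold step shared by replay and live dispatch.
pub fn project(mut model: OrchestrationReadModel, stored: &StoredEvent) -> OrchestrationReadModel {
    match &stored.event {
        OrchestrationEvent::ProjectCreated { name } => model.projects.push(name.clone()),
        OrchestrationEvent::ProjectDeleted { name } => model.projects.retain(|p| p != name),
    }
    model.last_sequence = stored.sequence;
    model
}

/// In-memory stand-in for the persistent EventStore.
pub struct EventStore {
    log: Vec<StoredEvent>,
}

impl EventStore {
    pub fn from_events(events: Vec<OrchestrationEvent>) -> Self {
        let log = events
            .into_iter()
            .enumerate()
            .map(|(i, event)| StoredEvent { sequence: i as u64 + 1, event })
            .collect();
        Self { log }
    }

    /// Hypothetical accessor: all events in append order.
    pub fn read_all(&self) -> Vec<StoredEvent> {
        self.log.clone()
    }
}

pub struct OrchestrationEngine {
    pub event_store: Arc<EventStore>,
    read_model: Arc<RwLock<OrchestrationReadModel>>,
}

impl OrchestrationEngine {
    /// Rebuilds the read model from the full event log before returning the engine.
    pub fn new(event_store: Arc<EventStore>) -> Self {
        let model = event_store
            .read_all()
            .iter()
            .fold(OrchestrationReadModel::default(), project);
        Self { event_store, read_model: Arc::new(RwLock::new(model)) }
    }

    pub fn read_model(&self) -> OrchestrationReadModel {
        self.read_model.read().unwrap().clone()
    }
}
```

Because the constructor only returns after the fold completes, no command can be dispatched against a partially rebuilt model.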

Backend command loop (backend::start())

  • pub async fn start(command_rx: mpsc::Receiver<AppCommand>, event_tx: broadcast::Sender<AppEvent>) entry point
  • Loops on command_rx.recv(), routes AppCommand::Orchestration to engine dispatch
  • Wraps StoredEvent into AppEvent::DomainEvent before broadcasting to frontend
  • Routes AppCommand::Terminal to TerminalManager, AppCommand::Git to GitManager (stubs for now)
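
The routing in the command loop might look something like the following. This is a blocking, std-only approximation: the real entry point is `async` and takes an `mpsc::Receiver` plus a `broadcast::Sender`, while here `std::sync::mpsc` is used on both sides. `EngineStub`, the `String` payloads, and the `StoredEvent` fields are placeholders, not the types from #6.

```rust
use std::sync::mpsc;

// Placeholder payloads; the real AppCommand variants carry typed commands.
#[derive(Clone, Debug, PartialEq)]
pub enum AppCommand {
    Orchestration(String),
    Terminal(String),
    Git(String),
}

#[derive(Clone, Debug, PartialEq)]
pub struct StoredEvent {
    pub sequence: u64,
    pub kind: String,
}

#[derive(Clone, Debug, PartialEq)]
pub enum AppEvent {
    DomainEvent(StoredEvent),
}

/// Hypothetical stand-in for the engine: turns each command into one stored event.
pub struct EngineStub {
    next_sequence: u64,
}

impl EngineStub {
    pub fn dispatch(&mut self, command: &str) -> Vec<StoredEvent> {
        let stored = StoredEvent { sequence: self.next_sequence, kind: command.to_string() };
        self.next_sequence += 1;
        vec![stored]
    }
}

/// Blocking sketch of backend::start(); returns once every command sender is dropped.
pub fn start(command_rx: mpsc::Receiver<AppCommand>, event_tx: mpsc::Sender<AppEvent>) {
    let mut engine = EngineStub { next_sequence: 1 };
    while let Ok(command) = command_rx.recv() {
        match command {
            AppCommand::Orchestration(cmd) => {
                for stored in engine.dispatch(&cmd) {
                    // Wrap the domain event before it goes to the frontend.
                    let _ = event_tx.send(AppEvent::DomainEvent(stored));
                }
            }
            // Stubs for now: routing to TerminalManager / GitManager is out of scope.
            AppCommand::Terminal(_) => {}
            AppCommand::Git(_) => {}
        }
    }
}
```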

Tests

  • Integration test: dispatch CreateProject → verify read model updated
  • Integration test: dispatch StartTurn → verify TurnStarted + ThreadStatusChanged events emitted
  • Test: startup replay reconstructs correct read model from persisted events
  • Test: reactor receives events after dispatch
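
As one possible shape for the StartTurn test, the sketch below checks emitted event kinds by name. The payload fields, the `"running"` status value, the event order, and the model-free `decide` signature are all assumptions made to keep the example short; the real decider from #4 takes the read model as well.

```rust
// Hypothetical, trimmed-down command and event types.
#[derive(Clone, Debug, PartialEq)]
pub enum OrchestrationCommand {
    StartTurn { thread_id: String },
}

#[derive(Clone, Debug, PartialEq)]
pub enum OrchestrationEvent {
    TurnStarted { thread_id: String },
    ThreadStatusChanged { thread_id: String, status: String },
}

/// Simplified decider: StartTurn yields a turn event plus a status change.
pub fn decide(command: OrchestrationCommand) -> Vec<OrchestrationEvent> {
    match command {
        OrchestrationCommand::StartTurn { thread_id } => vec![
            OrchestrationEvent::TurnStarted { thread_id: thread_id.clone() },
            OrchestrationEvent::ThreadStatusChanged { thread_id, status: "running".to_string() },
        ],
    }
}

/// Maps events to their variant names so assertions can ignore payload details.
pub fn event_names(events: &[OrchestrationEvent]) -> Vec<&'static str> {
    events
        .iter()
        .map(|event| match event {
            OrchestrationEvent::TurnStarted { .. } => "TurnStarted",
            OrchestrationEvent::ThreadStatusChanged { .. } => "ThreadStatusChanged",
        })
        .collect()
}

#[test]
fn start_turn_emits_turn_started_and_status_change() {
    let events = decide(OrchestrationCommand::StartTurn { thread_id: "t1".to_string() });
    assert_eq!(event_names(&events), vec!["TurnStarted", "ThreadStatusChanged"]);
}
```

In the actual integration test the events would come from a subscriber on the engine rather than from calling `decide` directly.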

Out of scope

  • Specific reactor implementations (ProviderReactor, CheckpointReactor — separate stories)
  • Terminal/Git command routing logic (stubs only)

References

  • Spec §5.5 (OrchestrationEngine)
  • Spec §14.1 (Bootstrap — backend::start)

Dependencies

  • Blocked by: #4 (decider, projector), #5 (EventStore), #6 (AppCommand, AppEvent)
  • Blocks: #9, #12, #13
  • Branch off: stack on whichever of #5 or #6 lands last; if both are still open, branch off issue-6-ipc and cherry-pick from issue-5-sqlite
  • Full graph: #21
claude-desktop added this to the v0.1.0 milestone 2026-04-16 11:29:19 +00:00