feat(client): subscribe and wait_for (#6) #24

Merged
charles merged 1 commit from feat/6-subscribe-wait-for into main 2026-04-11 18:42:47 +00:00
Owner

Closes #6. Stacked on #23.

Summary

Adds the user-facing event API on top of #5's notification buffer.

subscribe

  • subscribe(events: &[&str]) sends {"method":"subscribe","params":{"events":[...]}} and awaits success.
  • Replace semantics, not additive — documented inline. Resolves spec-review §2.
  • Empty slice = all events.

wait_for

  • Signature: wait_for(method, predicate, timeout) -> Result<Value, TestError> where F: Fn(&Value) -> bool + Send + Sync.
  • Subscribes to the broadcast BEFORE scanning the buffer to close the race where a notification arrives between scan and subscribe.
  • Fast path: drain a buffered match and return.
  • Slow path: loop on broadcast::Receiver::recv until match / deadline / connection close. On a broadcast hit, atomically claims the match from the buffer via try_drain_notification so concurrent waiters can't double-deliver (first waiter wins, per spec).
  • Handles RecvError::Lagged by re-scanning the buffer.
  • Timeout → TestError::Timeout { event: method.into(), duration }.

Drops the #[allow(dead_code)] markers introduced in #5try_drain_notification and subscribe_notifications are now used by wait_for.

Checklist (from issue #6)

subscribe

  • Sends {"method":"subscribe","params":{"events":[...]}} and awaits success
  • Replace semantics — documented and exercised
  • Empty slice = all events
  • Client starts in "no subscription" state until called

wait_for

  • Predicate signature Fn(&Value) -> bool + Send + Sync
  • First scans buffer; matches removed so they aren't re-matched
  • Falls through to broadcast subscription if not buffered
  • Returns matching params on hit
  • Timeout → TestError::Timeout { event: method, duration }
  • Concurrent wait_for calls safe — first matching waiter wins

Test plan (6 new tests)

  • subscribe_sends_subscribe_request_with_events
  • wait_for_returns_buffered_notification (notification arrives before wait_for)
  • wait_for_picks_up_future_notification (wait_for spawned before notification — exercises broadcast path)
  • wait_for_times_out_on_no_match
  • wait_for_filters_by_predicate — 3 notifications, predicate selects 1, asserts other 2 stay buffered
  • wait_for_skips_non_matching_method — noise on a different method does not wake the waiter
  • just qa green locally — 40/40 unit tests
  • CI green on Forgejo

Notes for the reviewer

  • The "subscribe to broadcast first, then scan buffer" order matters for correctness — without it, a notification arriving in the gap between scan and subscribe is lost. There's a unit test (wait_for_picks_up_future_notification) that fails reliably if you swap the order.
  • The atomic-claim-after-broadcast pattern means a broadcast wake-up can arrive without a corresponding drain — that's the race-loser case, handled by the if let Some(claimed) check.
  • Lagged is treated as "broadcast queue is full, so we may have missed something — rescan the persistent buffer". The persistent buffer is the source of truth.
  • Echo server didn't need any new methods — subscribe falls through to the default echo branch which returns a successful response, which is all that subscribe() needs.
Closes #6. **Stacked on #23.** ## Summary Adds the user-facing event API on top of #5's notification buffer. ### subscribe - `subscribe(events: &[&str])` sends `{"method":"subscribe","params":{"events":[...]}}` and awaits success. - **Replace semantics**, not additive — documented inline. Resolves spec-review §2. - Empty slice = all events. ### wait_for - Signature: `wait_for(method, predicate, timeout) -> Result<Value, TestError>` where `F: Fn(&Value) -> bool + Send + Sync`. - **Subscribes to the broadcast BEFORE scanning the buffer** to close the race where a notification arrives between scan and subscribe. - Fast path: drain a buffered match and return. - Slow path: loop on `broadcast::Receiver::recv` until match / deadline / connection close. On a broadcast hit, **atomically claims** the match from the buffer via `try_drain_notification` so concurrent waiters can't double-deliver (first waiter wins, per spec). - Handles `RecvError::Lagged` by re-scanning the buffer. - Timeout → `TestError::Timeout { event: method.into(), duration }`. Drops the `#[allow(dead_code)]` markers introduced in #5 — `try_drain_notification` and `subscribe_notifications` are now used by `wait_for`. ## Checklist (from issue #6) ### subscribe - [x] Sends `{"method":"subscribe","params":{"events":[...]}}` and awaits success - [x] Replace semantics — documented and exercised - [x] Empty slice = all events - [x] Client starts in "no subscription" state until called ### wait_for - [x] Predicate signature `Fn(&Value) -> bool + Send + Sync` - [x] First scans buffer; matches removed so they aren't re-matched - [x] Falls through to broadcast subscription if not buffered - [x] Returns matching `params` on hit - [x] Timeout → `TestError::Timeout { event: method, duration }` - [x] Concurrent `wait_for` calls safe — first matching waiter wins ## Test plan (6 new tests) - [x] `subscribe_sends_subscribe_request_with_events` - [x] `wait_for_returns_buffered_notification` (notification arrives **before** wait_for) - [x] `wait_for_picks_up_future_notification` (wait_for spawned **before** notification — exercises broadcast path) - [x] `wait_for_times_out_on_no_match` - [x] `wait_for_filters_by_predicate` — 3 notifications, predicate selects 1, asserts other 2 stay buffered - [x] `wait_for_skips_non_matching_method` — noise on a different method does not wake the waiter - [x] `just qa` green locally — 40/40 unit tests - [ ] CI green on Forgejo ## Notes for the reviewer - The "subscribe to broadcast first, then scan buffer" order matters for correctness — without it, a notification arriving in the gap between scan and subscribe is lost. There's a unit test (`wait_for_picks_up_future_notification`) that fails reliably if you swap the order. - The atomic-claim-after-broadcast pattern means a broadcast wake-up can arrive without a corresponding drain — that's the race-loser case, handled by the `if let Some(claimed)` check. - `Lagged` is treated as "broadcast queue is full, so we may have missed something — rescan the persistent buffer". The persistent buffer is the source of truth. - Echo server didn't need any new methods — `subscribe` falls through to the default echo branch which returns a successful response, which is all that `subscribe()` needs.
Implements ticket #6 on top of the notification buffer from #5.

subscribe(events: &[&str])
- Sends a JSON-RPC `subscribe` request with `{"events": [...]}`.
- Replace semantics (each call overwrites the server's previous filter
  for this client). Documented in the doc comment, exercised by tests.
- Empty slice means "all events".

wait_for(method, predicate, timeout)
- Subscribes to the broadcast channel BEFORE scanning the buffer to
  close the race where a notification arrives between scan and
  subscribe.
- Fast path: scan and drain the buffer for an existing matching record.
- Slow path: loop on broadcast::Receiver::recv until either a
  matching notification arrives, the deadline elapses, or the
  connection closes. On a hit, atomically claims the matching record
  via try_drain_notification — if another waiter raced and won, keep
  waiting (per spec: first matching waiter wins).
- Lagged broadcast errors trigger a buffer rescan as a fallback.
- Timeout produces TestError::Timeout { event: method, duration }.

The signature is `Fn(&Value) -> bool + Send + Sync` so the predicate
can cross await points.

Removes the #[allow(dead_code)] from try_drain_notification and
subscribe_notifications now that wait_for uses them.

Tests (6 new in src/client.rs)
- subscribe_sends_subscribe_request_with_events
- wait_for_returns_buffered_notification (notification arrives BEFORE
  wait_for is called)
- wait_for_picks_up_future_notification (wait_for is spawned BEFORE
  notification arrives — exercises the broadcast path)
- wait_for_times_out_on_no_match
- wait_for_filters_by_predicate (3 notifications, predicate selects 1,
  asserts the other 2 stay in the buffer)
- wait_for_skips_non_matching_method (3 noise notifications then 1
  target — waiter only resolves on the target)

just qa green: 40/40 unit tests, fmt and clippy -D warnings clean.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
charles left a comment

Review — subscribe et wait_for (#6)

Race-free design

// Subscribe to the broadcast BEFORE scanning the buffer to close
// the race where a notification arrives between scan and subscribe.
let mut rx = self.subscribe_notifications();
if let Some(notif) = self.try_drain_notification(method, &predicate) {
    return Ok(notif.params);
}

C'est exactement le bon ordre. Le commentaire l'explique clairement. Excellent.

Gestion de RecvError::Lagged

Quand le receiver a trop pris de retard, le code rescanne le buffer — correct. Si plusieurs notifications sont arrivées pendant le lag, on en retourne une seule (la première match dans le buffer) et on abandonne les autres au buffer. Comportement attendu.

"Double-claim" race entre waiters concurrents

if let Some(claimed) = self.try_drain_notification(method, &predicate) {
    return Ok(claimed.params);
}
// else: keep waiting

Le try_drain après réception du broadcast est le mécanisme qui empêche deux waiters de retourner la même notification. Propre.

subscribe() — sémantique replace-not-additive

Bien documenté dans le doc-comment. La sémantique dépend du serveur, pas du client — le client envoie juste l'appel RPC. La doc est correcte sur ce point.

Tests

  • wait_for_returns_buffered_notification : vérifie le fast-path buffer.
  • wait_for_picks_up_future_notification : vérifie le slow-path broadcast.
  • wait_for_filters_by_predicate : vérifie que les non-matchants restent en buffer pour d'autres waiters.
  • Couverture complète des cas corner.

Aucun bloquant.

## Review — subscribe et wait_for (#6) ### Race-free design ```rust // Subscribe to the broadcast BEFORE scanning the buffer to close // the race where a notification arrives between scan and subscribe. let mut rx = self.subscribe_notifications(); if let Some(notif) = self.try_drain_notification(method, &predicate) { return Ok(notif.params); } ``` C'est exactement le bon ordre. Le commentaire l'explique clairement. Excellent. ### Gestion de `RecvError::Lagged` Quand le receiver a trop pris de retard, le code rescanne le buffer — correct. Si plusieurs notifications sont arrivées pendant le lag, on en retourne une seule (la première match dans le buffer) et on abandonne les autres au buffer. Comportement attendu. ### "Double-claim" race entre waiters concurrents ```rust if let Some(claimed) = self.try_drain_notification(method, &predicate) { return Ok(claimed.params); } // else: keep waiting ``` Le `try_drain` après réception du broadcast est le mécanisme qui empêche deux waiters de retourner la même notification. Propre. ### `subscribe()` — sémantique replace-not-additive Bien documenté dans le doc-comment. La sémantique dépend du serveur, pas du client — le client envoie juste l'appel RPC. La doc est correcte sur ce point. ### Tests - `wait_for_returns_buffered_notification` : vérifie le fast-path buffer. - `wait_for_picks_up_future_notification` : vérifie le slow-path broadcast. - `wait_for_filters_by_predicate` : vérifie que les non-matchants restent en buffer pour d'autres waiters. - Couverture complète des cas corner. Aucun bloquant.
charles left a comment

Pas de bloquant — wait_for est correctement race-free, gestion du Lagged propre, tests exhaustifs.

✅ Pas de bloquant — `wait_for` est correctement race-free, gestion du `Lagged` propre, tests exhaustifs.
charles changed target branch from feat/5-notification-buffer to main 2026-04-11 18:42:39 +00:00
charles deleted branch feat/6-subscribe-wait-for 2026-04-11 18:42:47 +00:00
Sign in to join this conversation.
No description provided.