feat(client): wait_for_any, collect_events, call_and_wait (#7) #25

Merged
charles merged 1 commit from feat/7-multi-event-helpers into main 2026-04-11 18:42:56 +00:00
Owner

Closes #7. Stacked on #24.

Summary

Three convenience helpers on top of wait_for that cover the most common test patterns.

wait_for_any(methods, predicate, timeout)

  • Returns (matched_method: String, params: Value).
  • Predicate is Fn(&str, &Value) -> bool so it can branch on the matched method.
  • Same buffer-then-broadcast strategy as wait_for. New drain_first_any / try_drain_notification_any helpers handle the multi-method scan atomically.
  • Timeout error event string is methods.join("|").
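
The multi-method scan described above can be sketched as follows. This is a minimal, std-only illustration and not the code from this PR: the `Notification` struct, the `String` params (standing in for `serde_json::Value`), and the `timeout_event` helper are assumptions made for the example.

```rust
use std::collections::VecDeque;

/// Hypothetical buffered record; the real type is not shown in this PR.
#[derive(Debug, Clone, PartialEq)]
pub struct Notification {
    pub method: String,
    pub params: String, // stand-in for serde_json::Value to stay std-only
}

/// Removes and returns the first buffered record whose method is in
/// `methods` and that satisfies `predicate(method, params)`.
pub fn drain_first_any<F>(
    buffer: &mut VecDeque<Notification>,
    methods: &[&str],
    predicate: F,
) -> Option<Notification>
where
    F: Fn(&str, &str) -> bool,
{
    // Find the position first, then remove it; both steps are O(n) on a VecDeque.
    let pos = buffer.iter().position(|n| {
        methods.contains(&n.method.as_str()) && predicate(&n.method, &n.params)
    })?;
    buffer.remove(pos)
}

/// Event string reported when none of the methods matched before the timeout.
pub fn timeout_event(methods: &[&str]) -> String {
    methods.join("|")
}
```

Because the caller holds `&mut VecDeque` for the whole search-and-remove, no other code can touch the buffer in between, which is one way to get the "atomic" scan the description mentions; the real store presumably achieves this with a lock.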

collect_events(method, duration)

  • Drains all currently-buffered matches first, then loops on the broadcast channel until duration elapses, draining matches on each wake-up.
  • Never errors on timeout — returns whatever was collected (possibly empty).
  • Useful for verifying event streams (0% → 100% progress).
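
The drain-then-loop behaviour can be approximated without an async runtime. The sketch below swaps the client's broadcast channel for `std::sync::mpsc` and uses `(String, String)` pairs as hypothetical `(method, params)` messages, so it illustrates the control flow only, not the actual implementation.

```rust
use std::sync::mpsc::Receiver;
use std::time::{Duration, Instant};

/// Collects the params of every message whose method equals `method`
/// until `duration` has elapsed. Never returns an error: a quiet or
/// closed channel simply ends the collection.
pub fn collect_events(
    rx: &Receiver<(String, String)>,
    method: &str,
    duration: Duration,
) -> Vec<String> {
    let deadline = Instant::now() + duration;
    let mut collected = Vec::new();

    // Step 1: drain whatever is already queued, without blocking.
    for (m, params) in rx.try_iter() {
        if m == method {
            collected.push(params);
        }
    }

    // Step 2: keep listening until the deadline passes.
    loop {
        let remaining = deadline.saturating_duration_since(Instant::now());
        if remaining.is_zero() {
            break;
        }
        match rx.recv_timeout(remaining) {
            Ok((m, params)) if m == method => collected.push(params),
            Ok(_) => continue, // different method: ignore and keep listening
            Err(_) => break,   // timed out or sender gone: return what we have
        }
    }
    collected
}
```

Returning a plain `Vec` rather than a `Result` mirrors the "never errors on timeout" contract: an empty vector is a valid answer.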

call_and_wait(method, params, event_method, predicate, timeout)

  • Subscribes to the broadcast BEFORE sending the call, so a notification emitted as a side effect of the call cannot be missed even if it arrives before the call's own response.
  • Returns (call_result, event_params).
  • If the call itself fails, the listener is dropped and the call error is propagated.
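
To make the ordering argument concrete, here is a small std-only model of subscribe-before-call. `FakeClient`, its fan-out list, the `"<method>.done"` event name, and the `String` error type are all hypothetical; the real client is async and uses a broadcast channel, which this sketch only imitates with `std::sync::mpsc`.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Mutex;
use std::time::{Duration, Instant};

type Event = (String, String); // (method, params), params as plain text

/// Hypothetical stand-in for the client: notifications reach only the
/// subscribers that exist at the moment they are emitted.
pub struct FakeClient {
    subscribers: Mutex<Vec<Sender<Event>>>,
}

impl FakeClient {
    pub fn new() -> Self {
        FakeClient { subscribers: Mutex::new(Vec::new()) }
    }

    pub fn subscribe(&self) -> Receiver<Event> {
        let (tx, rx) = channel();
        self.subscribers.lock().unwrap().push(tx);
        rx
    }

    /// Simulated RPC whose handler notifies first and responds second.
    pub fn call(&self, method: &str, params: &str) -> Result<String, String> {
        if method == "fail" {
            return Err("call failed".to_string());
        }
        for tx in self.subscribers.lock().unwrap().iter() {
            // Sending to a dropped receiver fails; that is fine for this sketch.
            let _ = tx.send((format!("{}.done", method), params.to_string()));
        }
        Ok(format!("ok:{}", params))
    }

    pub fn call_and_wait<F>(
        &self,
        method: &str,
        params: &str,
        event_method: &str,
        predicate: F,
        timeout: Duration,
    ) -> Result<(String, String), String>
    where
        F: Fn(&str) -> bool,
    {
        let rx = self.subscribe(); // armed BEFORE the call is sent
        let call_result = self.call(method, params)?; // on error, rx is dropped here
        let deadline = Instant::now() + timeout; // bounds the event wait only
        loop {
            let remaining = deadline.saturating_duration_since(Instant::now());
            match rx.recv_timeout(remaining) {
                Ok((m, p)) if m == event_method && predicate(p.as_str()) => {
                    return Ok((call_result, p));
                }
                Ok(_) => continue,
                Err(_) => return Err(format!("timeout waiting for {}", event_method)),
            }
        }
    }
}
```

In this model a receiver created after `call` returns sees nothing, which is the failure mode that arming the listener first avoids.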

Checklist (from issue #7)

wait_for_any

  • Signature (methods, F: Fn(&str, &Value) -> bool, timeout) -> (String, Value)
  • Buffer-then-broadcast strategy
  • Timeout: event = methods.join("|")

collect_events

  • Signature (method, duration) -> Vec<Value>
  • Drains existing matches first
  • Never errors on timeout

call_and_wait

  • Listener armed BEFORE the call is sent
  • Returns (call_result, event_params)
  • Call failure propagates

General

  • All three methods take &self
  • Doc tests / unit tests cover each

Test plan (6 new tests)

  • wait_for_any_returns_first_matching_method
  • wait_for_any_times_out_with_combined_event_string
  • collect_events_drains_existing_buffer — server emits 4, assert len == 4
  • collect_events_returns_empty_on_no_match_without_error
  • call_and_wait_resolves_with_event — uses the notify-then-respond echo handler that exercises the race-prone path
  • call_and_wait_propagates_call_error
  • just qa green locally — 46/46 unit tests
  • CI green on Forgejo (not yet checked off)

Notes for the reviewer

  • I standardized the predicate types on Fn closures (not FnOnce / FnMut), since a predicate may be called many times during a buffer scan or broadcast loop.
  • collect_events could be made smarter by listening on the broadcast in a single recv_many style, but the simple loop is sufficient for the spec and easier to reason about.
  • call_and_wait deliberately calls self.call(...) directly rather than building the request inline so it inherits the same id-routing, RpcError mapping, and timeout behaviour. The timeout parameter on call_and_wait is the event wait, not the call wait — the call uses its own default 30 s.
  • The notify method on the test echo server (added in #5) was the right primitive: it emits a notification and then responds, which is precisely the case call_and_wait exists to handle.
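
As a small illustration of the Fn choice in the first note: a predicate that is evaluated once per inspected entry, first over buffered entries and then over later arrivals, has to be callable repeatedly through a shared reference. The function below is a hypothetical sketch; `first_match` and the `i64` params (standing in for `Value`) are not part of the PR.

```rust
/// Returns the index of the first entry accepted by `predicate`, looking at
/// already-buffered entries first and later arrivals second. The predicate
/// runs once per inspected entry, which an `FnOnce` bound would not allow.
pub fn first_match<F>(
    buffered: &[(&str, i64)],
    arrivals: &[(&str, i64)],
    predicate: F,
) -> Option<usize>
where
    F: Fn(&str, &i64) -> bool,
{
    buffered
        .iter()
        .chain(arrivals.iter())
        .position(|entry| predicate(entry.0, &entry.1))
}
```

An `FnMut` bound would also compile if the parameter were declared `mut`, but `Fn` additionally lets the same predicate be shared by reference across helpers without requiring exclusive access.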
Implements ticket #7 — multi-event helpers on top of #6's wait_for.

NotificationStore::drain_first_any
- Multi-method version of drain_first; matches the first record whose
  method is in the list and that satisfies F(method, params).

RpcClient::try_drain_notification_any
- pub(crate) wrapper for tests + wait_for_any.

wait_for_any(methods, predicate, timeout) -> (String, Value)
- Same buffer-then-broadcast strategy as wait_for, but the predicate
  signature is Fn(&str, &Value) -> bool so it can branch on the method.
- Returns (matched_method, params) on success.
- On timeout, the error event string is `methods.join("|")`.

collect_events(method, duration) -> Vec<Value>
- Drains all currently-buffered notifications matching `method`, then
  loops on the broadcast channel until `duration` elapses, draining
  matches from the buffer on each wake-up.
- Never errors on timeout — returns whatever was collected.
- Useful for verifying event streams (e.g. progress 0% → 100%).

call_and_wait(method, params, event_method, predicate, timeout)
- Race-free: subscribes to the broadcast BEFORE sending the call, so a
  notification emitted as a side effect of the call cannot be missed
  even if it arrives before the call's own response.
- Returns (call_result, event_params).
- If the call itself fails, the listener is dropped and the call error
  is propagated.

Tests (6 new in src/client.rs)
- wait_for_any_returns_first_matching_method
- wait_for_any_times_out_with_combined_event_string
- collect_events_drains_existing_buffer (server emits 4, asserts len=4)
- collect_events_returns_empty_on_no_match_without_error
- call_and_wait_resolves_with_event (uses the notify-then-respond
  echo handler — exercises the race-prone path)
- call_and_wait_propagates_call_error

just qa green: 46/46 unit tests, fmt and clippy -D warnings clean.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
charles left a comment

Review — wait_for_any, collect_events, call_and_wait (#7)

call_and_wait: genuinely race-free

let mut rx = self.subscribe_notifications();  // armed before the call
let call_result = self.call(method, params).await?;

Subscribing before the call is sent guarantees that a notification triggered in response to the call cannot be missed, even if it arrives before the RPC response. This is the typical use case, and it is handled correctly.

collect_events: greedy drain after each broadcast

Ok(Ok(notification)) => {
    if notification.method != method { continue; }
    while let Some(notif) = self.try_drain_notification(method, |_| true) {
        collected.push(notif.params);
    }
}

The drain is greedy after each broadcast signal, which is correct. Notifications that arrive during the drain will be in the buffer on the next loop iteration. Sound behaviour.

wait_for_any: timeout event string

event: methods.join("|"),

Readable in error messages (Timeout { event: "a|b", ... }). Good choice.

Minor: drain_first_any iterates twice

The implementation finds the position and then calls remove(pos). For a VecDeque both steps are O(n), so a combined implementation would gain nothing. Correct.

Tests

  • call_and_wait_propagates_call_error: an important case, verified.
  • collect_events_returns_empty_on_no_match_without_error: verifies that collect_events neither panics nor returns an error.
  • Good coverage.

No blockers.

charles left a comment

No blockers: call_and_wait is genuinely race-free, and the API of the three helpers is clear and well tested.

charles changed target branch from feat/6-subscribe-wait-for to main 2026-04-11 18:42:47 +00:00
charles deleted branch feat/7-multi-event-helpers 2026-04-11 18:42:56 +00:00