feat(client): bounded notification buffer + broadcast (#5) #23

Merged
charles merged 1 commit from feat/5-notification-buffer into main 2026-04-11 18:42:39 +00:00
Owner

Closes #5. Stacked on #22.

Summary

Adds the persistent notification storage that wait_for / wait_for_any / collect_events will draw from in #6 and #7. Notifications received between calls are now retained instead of being dropped.

What lands here

  • Notification { method, params } record.
  • NotificationStore — bounded VecDeque with drop-oldest overflow policy. Tracks dropped_total and rate-limits a stderr warning to every 100 drops (per spec §4.3) to avoid log spam.
  • drain_first(method, predicate) — removes and returns the first matching notification. Building block for wait_for.
  • Inner.notifications: Mutex<NotificationStore> + Inner.notify_tx: broadcast::Sender<Notification>.
  • read_loop classifies incoming frames: id-bearing → existing oneshot routing; method-bearing (no id) → push into the store and fan out via broadcast.

New configurable surface

  • ClientOptions { buffer_size, call_timeout, connect_timeout } — defaults match the existing constants (1000 / 30 s / 10 s).
  • RpcClient::connect_with_options(url, token, opts)connect() now delegates to it with ClientOptions::default().

New public observability

  • buffered_notification_count() — how many records are currently held.
  • dropped_notification_count() — cumulative drops since connect.

Internal helpers (pub(crate), allow(dead_code) until #6 wires them up)

  • try_drain_notification(method, predicate)
  • subscribe_notifications() → fresh broadcast::Receiver

Checklist (from issue #5)

  • Reader classifies messages by id presence
  • Bounded VecDeque<Notification> with configurable capacity (default 1000)
  • Overflow drops the oldest notification
  • Drop policy logged to stderr, rate-limited to once per 100 drops
  • Broadcast channel (tokio::sync::broadcast) fans out new notifications to wait_for subscribers
  • Internal API drains matching items (drain_first + try_drain_notification)
  • Reader exits cleanly on Close / transport error / EOF (already done in #4, unchanged)
  • Buffer unit-tested in isolation: fill past capacity, verify oldest dropped, verify drain matches predicate

Test plan (7 new tests in src/client.rs)

  • store_push_under_capacity_keeps_all
  • store_push_overflow_drops_oldest
  • store_drain_first_returns_match_and_removes_it
  • store_drain_first_skips_method_mismatch
  • notification_is_buffered_when_server_emits_one — extends the test echo server with a notify method that emits N notifications then responds; asserts the notification is observed and drained
  • notification_buffer_overflows_drop_oldest — cap=3, server emits 5; asserts buffered_notification_count == 3 and dropped_notification_count == 2
  • broadcast_subscriber_sees_new_notificationsubscribe_notifications().recv() fires within 500 ms of the server emitting a notification
  • just qa green locally — 34/34 unit tests
  • CI green on Forgejo

Notes for the reviewer

  • The #[allow(dead_code)] on try_drain_notification and subscribe_notifications will be removed in #6 when wait_for calls them.
  • broadcast::channel capacity is opts.buffer_size.max(16). The persistent buffer is the source of truth; the broadcast channel is purely a wake-up signal for waiters.
  • Echo server's notify method takes {event, data, count?} so the same fixture can drive both single-notification and overflow tests without spawning multiple servers.
  • The drop warning is opinionated and not configurable in v0.1, per the issue's "Notes" section.
Closes #5. **Stacked on #22.** ## Summary Adds the persistent notification storage that `wait_for` / `wait_for_any` / `collect_events` will draw from in #6 and #7. Notifications received between calls are now retained instead of being dropped. ### What lands here - **`Notification { method, params }`** record. - **`NotificationStore`** — bounded `VecDeque` with drop-oldest overflow policy. Tracks `dropped_total` and rate-limits a stderr warning to every 100 drops (per spec §4.3) to avoid log spam. - **`drain_first(method, predicate)`** — removes and returns the first matching notification. Building block for `wait_for`. - **`Inner.notifications: Mutex<NotificationStore>`** + **`Inner.notify_tx: broadcast::Sender<Notification>`**. - **`read_loop` classifies** incoming frames: id-bearing → existing oneshot routing; method-bearing (no id) → push into the store **and** fan out via broadcast. ### New configurable surface - `ClientOptions { buffer_size, call_timeout, connect_timeout }` — defaults match the existing constants (1000 / 30 s / 10 s). - `RpcClient::connect_with_options(url, token, opts)` — `connect()` now delegates to it with `ClientOptions::default()`. ### New public observability - `buffered_notification_count()` — how many records are currently held. - `dropped_notification_count()` — cumulative drops since connect. ### Internal helpers (pub(crate), `allow(dead_code)` until #6 wires them up) - `try_drain_notification(method, predicate)` - `subscribe_notifications()` → fresh `broadcast::Receiver` ## Checklist (from issue #5) - [x] Reader classifies messages by `id` presence - [x] Bounded `VecDeque<Notification>` with configurable capacity (default 1000) - [x] Overflow drops the **oldest** notification - [x] Drop policy logged to stderr, rate-limited to once per 100 drops - [x] Broadcast channel (`tokio::sync::broadcast`) fans out new notifications to wait_for subscribers - [x] Internal API drains matching items (`drain_first` + `try_drain_notification`) - [x] Reader exits cleanly on Close / transport error / EOF (already done in #4, unchanged) - [x] Buffer unit-tested in isolation: fill past capacity, verify oldest dropped, verify drain matches predicate ## Test plan (7 new tests in `src/client.rs`) - [x] `store_push_under_capacity_keeps_all` - [x] `store_push_overflow_drops_oldest` - [x] `store_drain_first_returns_match_and_removes_it` - [x] `store_drain_first_skips_method_mismatch` - [x] `notification_is_buffered_when_server_emits_one` — extends the test echo server with a `notify` method that emits N notifications then responds; asserts the notification is observed and drained - [x] `notification_buffer_overflows_drop_oldest` — cap=3, server emits 5; asserts `buffered_notification_count == 3` and `dropped_notification_count == 2` - [x] `broadcast_subscriber_sees_new_notification` — `subscribe_notifications().recv()` fires within 500 ms of the server emitting a notification - [x] `just qa` green locally — 34/34 unit tests - [ ] CI green on Forgejo ## Notes for the reviewer - The `#[allow(dead_code)]` on `try_drain_notification` and `subscribe_notifications` will be removed in #6 when `wait_for` calls them. - `broadcast::channel` capacity is `opts.buffer_size.max(16)`. The persistent buffer is the source of truth; the broadcast channel is purely a wake-up signal for waiters. - Echo server's `notify` method takes `{event, data, count?}` so the same fixture can drive both single-notification and overflow tests without spawning multiple servers. - The drop warning is **opinionated and not configurable in v0.1**, per the issue's "Notes" section.
Adds the persistent notification storage that wait_for / wait_for_any /
collect_events will draw from in #6 and #7.

Buffer
- New Notification { method, params } record.
- New NotificationStore: bounded VecDeque with drop-oldest overflow
  policy. Tracks dropped_total and rate-limits a stderr warning to
  every 100 drops to avoid log spam.
- drain_first(method, predicate) removes and returns the first matching
  notification — the building block for #6's wait_for.

Inner client extensions
- Inner gains `notifications: Mutex<NotificationStore>` and
  `notify_tx: broadcast::Sender<Notification>`.
- read_loop now classifies frames: id-bearing → existing oneshot
  routing; method-bearing (no id) → push into store + fan out via
  broadcast. Unknown frames still drop silently.

Configurable client options
- New ClientOptions struct: buffer_size (default 1000),
  call_timeout (30 s), connect_timeout (10 s).
- New connect_with_options(url, token, opts) constructor; the existing
  connect() now delegates to it with ClientOptions::default(). The
  default 1000-entry buffer matches spec §4.3.

Public observability
- buffered_notification_count(): how many records are currently held.
- dropped_notification_count(): cumulative drops since connect.

Internal helpers (pub(crate), allow(dead_code) until #6 wires them up)
- try_drain_notification(method, predicate) → wraps store.drain_first
- subscribe_notifications() → returns a fresh broadcast::Receiver

Tests (7 new in src/client.rs)
- store_push_under_capacity_keeps_all
- store_push_overflow_drops_oldest
- store_drain_first_returns_match_and_removes_it
- store_drain_first_skips_method_mismatch
- notification_is_buffered_when_server_emits_one (echo server gains
  a "notify" method that emits N notifications then responds)
- notification_buffer_overflows_drop_oldest (cap=3, server sends 5)
- broadcast_subscriber_sees_new_notification

just qa green: 34/34 unit tests, fmt and clippy -D warnings clean.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
charles left a comment

Review — bounded notification buffer + broadcast (#5)

Double stockage (buffer persistant + broadcast)

La conception est correcte : le broadcast sert à waker les waiters, le NotificationStore est la source de vérité. La souscription au broadcast avant le scan du buffer (implémenté en #6) ferme la race window. Bon design.

NotificationStore::push — throttling du warning

if self.drops_since_warn >= NOTIFICATION_DROP_WARN_EVERY {
    eprintln!("...dropped {} so far...", self.dropped_total, ...);
    self.drops_since_warn = 0;
}

Le premier warning se déclenche au 100ème drop, puis tous les 100 drops supplémentaires — cohérent et intentionnel. Mais dropped_total inclut les drops courants depuis le warning précédent, donc le message est correct.

ClientOptions

Bon ajout — permet de customiser le buffer sans toucher à l'API de connexion de base. connect_with_options garde connect simple pour le cas courant.

Minor

  • #[allow(dead_code)] sur drain_first est approprié ici puisque wait_for arrive en #6. Sera retiré proprement dans la PR suivante.

Tests

  • Tests unitaires sur NotificationStore (push, overflow, drain_first) — excellents, vérifient les invariants internes.
  • Tests d'intégration avec le broadcast — couvrent le cas de buffer overflow et le subscriber en temps réel.

Aucun bloquant.

## Review — bounded notification buffer + broadcast (#5) ### Double stockage (buffer persistant + broadcast) La conception est correcte : le broadcast sert à waker les waiters, le `NotificationStore` est la source de vérité. La souscription au broadcast *avant* le scan du buffer (implémenté en #6) ferme la race window. Bon design. ### `NotificationStore::push` — throttling du warning ```rust if self.drops_since_warn >= NOTIFICATION_DROP_WARN_EVERY { eprintln!("...dropped {} so far...", self.dropped_total, ...); self.drops_since_warn = 0; } ``` Le premier warning se déclenche au 100ème drop, puis tous les 100 drops supplémentaires — cohérent et intentionnel. Mais `dropped_total` inclut les drops courants depuis le warning précédent, donc le message est correct. ### `ClientOptions` Bon ajout — permet de customiser le buffer sans toucher à l'API de connexion de base. `connect_with_options` garde `connect` simple pour le cas courant. ### Minor - `#[allow(dead_code)]` sur `drain_first` est approprié ici puisque `wait_for` arrive en #6. Sera retiré proprement dans la PR suivante. ### Tests - Tests unitaires sur `NotificationStore` (push, overflow, drain_first) — excellents, vérifient les invariants internes. - Tests d'intégration avec le broadcast — couvrent le cas de buffer overflow et le subscriber en temps réel. Aucun bloquant.
charles left a comment

Pas de bloquant — design double-stockage (buffer + broadcast) bien pensé, bonne couverture unitaire.

✅ Pas de bloquant — design double-stockage (buffer + broadcast) bien pensé, bonne couverture unitaire.
charles changed target branch from feat/4-client-connect-call to main 2026-04-11 18:42:31 +00:00
charles deleted branch feat/5-notification-buffer 2026-04-11 18:42:39 +00:00
Sign in to join this conversation.
No description provided.