feat(client): bounded notification buffer + broadcast (#5) #23
No reviewers
Labels
No labels
area:assertions
area:cli
area:client
area:harness
area:meta
area:reporting
area:runner
type:user-story
No milestone
No project
No assignees
1 participant
Notifications
Due date
No due date set.
Dependencies
No dependencies set.
Reference
charles/ws-rpc-test!23
Loading…
Add table
Add a link
Reference in a new issue
No description provided.
Delete branch "feat/5-notification-buffer"
Deleting a branch is permanent. Although the deleted branch may continue to exist for a short time before it actually gets removed, it CANNOT be undone in most cases. Continue?
Closes #5. Stacked on #22.
Summary
Adds the persistent notification storage that
wait_for/wait_for_any/collect_eventswill draw from in #6 and #7. Notifications received between calls are now retained instead of being dropped.What lands here
Notification { method, params }record.NotificationStore— boundedVecDequewith drop-oldest overflow policy. Tracksdropped_totaland rate-limits a stderr warning to every 100 drops (per spec §4.3) to avoid log spam.drain_first(method, predicate)— removes and returns the first matching notification. Building block forwait_for.Inner.notifications: Mutex<NotificationStore>+Inner.notify_tx: broadcast::Sender<Notification>.read_loopclassifies incoming frames: id-bearing → existing oneshot routing; method-bearing (no id) → push into the store and fan out via broadcast.New configurable surface
ClientOptions { buffer_size, call_timeout, connect_timeout }— defaults match the existing constants (1000 / 30 s / 10 s).RpcClient::connect_with_options(url, token, opts)—connect()now delegates to it withClientOptions::default().New public observability
buffered_notification_count()— how many records are currently held.dropped_notification_count()— cumulative drops since connect.Internal helpers (pub(crate),
allow(dead_code)until #6 wires them up)try_drain_notification(method, predicate)subscribe_notifications()→ freshbroadcast::ReceiverChecklist (from issue #5)
idpresenceVecDeque<Notification>with configurable capacity (default 1000)tokio::sync::broadcast) fans out new notifications to wait_for subscribersdrain_first+try_drain_notification)Test plan (7 new tests in
src/client.rs)store_push_under_capacity_keeps_allstore_push_overflow_drops_oldeststore_drain_first_returns_match_and_removes_itstore_drain_first_skips_method_mismatchnotification_is_buffered_when_server_emits_one— extends the test echo server with anotifymethod that emits N notifications then responds; asserts the notification is observed and drainednotification_buffer_overflows_drop_oldest— cap=3, server emits 5; assertsbuffered_notification_count == 3anddropped_notification_count == 2broadcast_subscriber_sees_new_notification—subscribe_notifications().recv()fires within 500 ms of the server emitting a notificationjust qagreen locally — 34/34 unit testsNotes for the reviewer
#[allow(dead_code)]ontry_drain_notificationandsubscribe_notificationswill be removed in #6 whenwait_forcalls them.broadcast::channelcapacity isopts.buffer_size.max(16). The persistent buffer is the source of truth; the broadcast channel is purely a wake-up signal for waiters.notifymethod takes{event, data, count?}so the same fixture can drive both single-notification and overflow tests without spawning multiple servers.Review — bounded notification buffer + broadcast (#5)
Double stockage (buffer persistant + broadcast)
La conception est correcte : le broadcast sert à waker les waiters, le
NotificationStoreest la source de vérité. La souscription au broadcast avant le scan du buffer (implémenté en #6) ferme la race window. Bon design.NotificationStore::push— throttling du warningLe premier warning se déclenche au 100ème drop, puis tous les 100 drops supplémentaires — cohérent et intentionnel. Mais
dropped_totalinclut les drops courants depuis le warning précédent, donc le message est correct.ClientOptionsBon ajout — permet de customiser le buffer sans toucher à l'API de connexion de base.
connect_with_optionsgardeconnectsimple pour le cas courant.Minor
#[allow(dead_code)]surdrain_firstest approprié ici puisquewait_forarrive en #6. Sera retiré proprement dans la PR suivante.Tests
NotificationStore(push, overflow, drain_first) — excellents, vérifient les invariants internes.Aucun bloquant.
✅ Pas de bloquant — design double-stockage (buffer + broadcast) bien pensé, bonne couverture unitaire.