feat(client): WebSocket connect, call, id routing (#4) #22

Merged
charles merged 1 commit from feat/4-client-connect-call into main 2026-04-11 18:42:31 +00:00
Owner

Closes #4. Stacked on #21.

Summary

First slice of RpcClient: connection, request/response, JSON-RPC error mapping. Notifications are observed by the reader task but discarded for now — ticket #5 adds the bounded buffer, #6 adds subscribe/wait_for.

Architecture

  • RpcClient { inner: Arc<Inner> } is cheaply cloneable. All public methods take &self so cloned handles can be used concurrently from different tasks. Resolves spec-review §1.
  • Inner holds an mpsc::UnboundedSender<Message> for the writer task, a Mutex<HashMap<u64, oneshot::Sender<...>>> for pending requests, an AtomicU64 id counter, and the default per-call timeout.
  • Two background tokio tasks per connection:
    • write_loop: drains the mpsc into the WebSocket sink.
    • read_loop: parses frames, dispatches id-bearing messages to the matching pending oneshot, drops everything else (notifications come back in #5).

Connect

  • RpcClient::connect(url, token) wraps tokio_tungstenite::connect_async in a 10 s timeout.
  • token: Option<&str> is sent as Authorization: Bearer <token> via the upgrade headers.
  • All connect-side failures map to TestError::Connection.

Call

  • call(method, params) uses the default 30 s timeout; call_timeout(method, params, duration) overrides it.
  • Auto-incrementing u64 id, served by an AtomicU64.
  • Value::Null params is rewritten to {} per spec §4.1.
  • JSON-RPC error envelope → Err(TestError::RpcError { code, message }). Resolves spec-review §6call returns Ok(Value) only on JSON-RPC success.
  • Timeout → Err(TestError::Timeout { event: "call:<method>", duration }). The pending entry is cleaned up on timeout.

Checklist (from issue #4)

  • connect(url, token) with bearer auth + 10 s connect timeout
  • &self interior mutability via Arc<Inner> (resolves spec review §1)
  • Background reader dispatches by id via oneshot
  • call / call_timeout with auto-incrementing id
  • Default 30 s timeout, override via call_timeout
  • JSON-RPC error → TestError::RpcError (resolves spec review §6)
  • Value::Null params rewritten to {} (spec §4.1)
  • Timeout produces TestError::Timeout { event: "call:<method>", .. }

Test plan (7 new tests in src/client.rs)

  • spawn_echo_server — in-process tokio_tungstenite::accept_async fixture handling ping, fail, slow
  • call_returns_result_on_success
  • call_with_null_params_sends_empty_object
  • call_returns_rpc_error_on_failure — asserts TestError::RpcError { code: -32601, .. }
  • call_timeout_actually_times_out — asserts TestError::Timeout with "call:slow" event
  • ids_increment_per_call — three sequential calls, each gets the right echo back (proves id routing)
  • cloned_client_shares_connection — clones use the same underlying connection
  • connect_to_invalid_url_returns_connection_error
  • just qa green locally — 27/27 unit tests pass
  • CI green on Forgejo

Notes for the reviewer

  • Default impl for RpcClient exists only so the prelude re-export from #1 (pub use crate::client::RpcClient) keeps compiling. The default client's channels are detached so any call returns Err. Real clients always come from connect.
  • I deliberately did not add notification storage in this PR. The reader task simply drops them with a comment. #5 changes that branch to push into the bounded buffer.
  • tokio_tungstenite::tungstenite::Message::Text in 0.26 takes a Utf8Bytes; String::into() works for the conversion.
  • Reader task exits cleanly on Message::Close, transport error, or end-of-stream.
  • The 30 s default call timeout matches spec §2.2.
Closes #4. **Stacked on #21.** ## Summary First slice of `RpcClient`: connection, request/response, JSON-RPC error mapping. Notifications are observed by the reader task but **discarded** for now — ticket #5 adds the bounded buffer, #6 adds `subscribe`/`wait_for`. ### Architecture - `RpcClient { inner: Arc<Inner> }` is cheaply cloneable. All public methods take `&self` so cloned handles can be used concurrently from different tasks. **Resolves spec-review §1.** - `Inner` holds an `mpsc::UnboundedSender<Message>` for the writer task, a `Mutex<HashMap<u64, oneshot::Sender<...>>>` for pending requests, an `AtomicU64` id counter, and the default per-call timeout. - Two background tokio tasks per connection: - `write_loop`: drains the mpsc into the WebSocket sink. - `read_loop`: parses frames, dispatches id-bearing messages to the matching pending oneshot, drops everything else (notifications come back in #5). ### Connect - `RpcClient::connect(url, token)` wraps `tokio_tungstenite::connect_async` in a 10 s timeout. - `token: Option<&str>` is sent as `Authorization: Bearer <token>` via the upgrade headers. - All connect-side failures map to `TestError::Connection`. ### Call - `call(method, params)` uses the default 30 s timeout; `call_timeout(method, params, duration)` overrides it. - Auto-incrementing u64 id, served by an `AtomicU64`. - `Value::Null` params is rewritten to `{}` per spec §4.1. - JSON-RPC error envelope → `Err(TestError::RpcError { code, message })`. **Resolves spec-review §6** — `call` returns `Ok(Value)` only on JSON-RPC success. - Timeout → `Err(TestError::Timeout { event: "call:<method>", duration })`. The pending entry is cleaned up on timeout. ## Checklist (from issue #4) - [x] `connect(url, token)` with bearer auth + 10 s connect timeout - [x] `&self` interior mutability via `Arc<Inner>` (resolves spec review §1) - [x] Background reader dispatches by id via oneshot - [x] `call` / `call_timeout` with auto-incrementing id - [x] Default 30 s timeout, override via `call_timeout` - [x] JSON-RPC error → `TestError::RpcError` (resolves spec review §6) - [x] `Value::Null` params rewritten to `{}` (spec §4.1) - [x] Timeout produces `TestError::Timeout { event: "call:<method>", .. }` ## Test plan (7 new tests in `src/client.rs`) - [x] `spawn_echo_server` — in-process `tokio_tungstenite::accept_async` fixture handling `ping`, `fail`, `slow` - [x] `call_returns_result_on_success` - [x] `call_with_null_params_sends_empty_object` - [x] `call_returns_rpc_error_on_failure` — asserts `TestError::RpcError { code: -32601, .. }` - [x] `call_timeout_actually_times_out` — asserts `TestError::Timeout` with `"call:slow"` event - [x] `ids_increment_per_call` — three sequential calls, each gets the right echo back (proves id routing) - [x] `cloned_client_shares_connection` — clones use the same underlying connection - [x] `connect_to_invalid_url_returns_connection_error` - [x] `just qa` green locally — 27/27 unit tests pass - [ ] CI green on Forgejo ## Notes for the reviewer - `Default` impl for `RpcClient` exists only so the prelude re-export from #1 (`pub use crate::client::RpcClient`) keeps compiling. The default client's channels are detached so any call returns `Err`. Real clients always come from `connect`. - I deliberately did **not** add notification storage in this PR. The reader task simply drops them with a comment. #5 changes that branch to push into the bounded buffer. - `tokio_tungstenite::tungstenite::Message::Text` in 0.26 takes a `Utf8Bytes`; `String::into()` works for the conversion. - Reader task exits cleanly on `Message::Close`, transport error, or end-of-stream. - The 30 s default call timeout matches spec §2.2.
Implements ticket #4. Notifications are observed by the reader but
discarded for now — ticket #5 adds the bounded buffer.

Architecture
- RpcClient is a cheap Arc<Inner> handle. Cloning it gives another
  handle on the same connection. All public methods take &self and
  may be called concurrently from different tasks (resolves spec
  review §1).
- Inner holds: an mpsc::UnboundedSender<Message> for the writer task,
  a Mutex<HashMap<u64, oneshot::Sender>> for pending requests, an
  AtomicU64 id counter, and the default per-call timeout.
- Two background tokio tasks per connection:
    - write_loop: drains the mpsc into the WebSocket sink
    - read_loop: parses incoming frames; id-bearing messages are
      dispatched to the matching pending oneshot, others are dropped

Connect
- RpcClient::connect(url, token) wraps tokio_tungstenite::connect_async
  in a 10 s timeout.
- token: Option<&str> is sent as Authorization: Bearer <token> in the
  upgrade request headers.
- Failures map to TestError::Connection.

Call
- call(method, params) uses the default 30 s timeout.
- call_timeout(method, params, duration) lets callers override.
- Auto-incrementing u64 id (AtomicU64).
- Null params is rewritten to {} per spec §4.1.
- JSON-RPC error envelope → Err(TestError::RpcError { code, message }).
  Resolves spec review §6: call() returns Ok(Value) only on JSON-RPC
  success; errors are bubbled as Err(RpcError).
- Timeout → TestError::Timeout { event: "call:<method>", duration };
  the pending entry is cleaned up on timeout.

Tests (7 new in src/client.rs)
- spawn_echo_server: in-process tokio_tungstenite::accept_async fixture
  handling ping/fail/slow methods so each test stays self-contained
- call_returns_result_on_success
- call_with_null_params_sends_empty_object (spec §4.1 verification)
- call_returns_rpc_error_on_failure
- call_timeout_actually_times_out
- ids_increment_per_call (3 sequential calls; routing sanity check)
- cloned_client_shares_connection (proves &self + clone)
- connect_to_invalid_url_returns_connection_error

just qa green: 27/27 unit tests, fmt and clippy -D warnings clean.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
charles left a comment

Review — WebSocket connect, call, id routing (#4)

Minor : Ordering::SeqCst sur le compteur d'ID

let id = self.inner.next_id.fetch_add(1, Ordering::SeqCst);

SeqCst impose un ordre total global entre toutes les opérations atomiques du programme — largement plus fort que nécessaire ici. Relaxed suffit pour un compteur monotone dont la seule contrainte est que chaque appel obtienne une valeur distincte.

Minor : channel sortant non borné

mpsc::unbounded_channel n'a pas de backpressure. Pour des scénarios de test avec du parallélisme borné, c'est acceptable. Vaut la peine d'être documenté pour les utilisateurs qui construiraient des tests à débit élevé.

Déconnexion du write loop et waiters en attente

Quand write_loop se termine (connexion fermée), les oneshot::Sender restent dans la PendingMap jusqu'à l'expiration de leurs timeouts individuels. La déconnexion provoque donc une cascade de Timeout plutôt qu'une Connection error immédiate. Comportement attendu mais mériterait une note dans les docs ou un TODO.

Ce qui est bien

  • Arc<Inner> pour clone sans coût — design correct.
  • Cleanup de la pending map au timeout — évite les fuites mémoire.
  • Params null → {} conforme à la spec JSON-RPC et bien testé.
  • Default impl retourne un client détaché avec un doc-comment clair.
  • Echo server avec variantes fail, slow — bonne couverture.
## Review — WebSocket connect, call, id routing (#4) ### Minor : `Ordering::SeqCst` sur le compteur d'ID ```rust let id = self.inner.next_id.fetch_add(1, Ordering::SeqCst); ``` `SeqCst` impose un ordre total global entre toutes les opérations atomiques du programme — largement plus fort que nécessaire ici. `Relaxed` suffit pour un compteur monotone dont la seule contrainte est que chaque appel obtienne une valeur distincte. ### Minor : channel sortant non borné `mpsc::unbounded_channel` n'a pas de backpressure. Pour des scénarios de test avec du parallélisme borné, c'est acceptable. Vaut la peine d'être documenté pour les utilisateurs qui construiraient des tests à débit élevé. ### Déconnexion du write loop et waiters en attente Quand `write_loop` se termine (connexion fermée), les `oneshot::Sender` restent dans la `PendingMap` jusqu'à l'expiration de leurs timeouts individuels. La déconnexion provoque donc une cascade de `Timeout` plutôt qu'une `Connection` error immédiate. Comportement attendu mais mériterait une note dans les docs ou un TODO. ### Ce qui est bien - `Arc<Inner>` pour clone sans coût — design correct. - Cleanup de la `pending` map au timeout — évite les fuites mémoire. - Params null → `{}` conforme à la spec JSON-RPC et bien testé. - `Default` impl retourne un client détaché avec un doc-comment clair. - Echo server avec variantes `fail`, `slow` — bonne couverture.
charles left a comment

Commentaires sans bloquant. Deux points mineurs : SeqCstRelaxed sur le compteur d'ID, et documenter le comportement cascade-Timeout en cas de déconnexion.

Commentaires sans bloquant. Deux points mineurs : `SeqCst` → `Relaxed` sur le compteur d'ID, et documenter le comportement cascade-Timeout en cas de déconnexion.
charles changed target branch from feat/3-health-polling to main 2026-04-11 18:42:24 +00:00
charles deleted branch feat/4-client-connect-call 2026-04-11 18:42:31 +00:00
Sign in to join this conversation.
No description provided.