fix(messaging): reconcile adapter runtime state #372
PyRo1121 wants to merge 5 commits into spacedriveapp:main
Conversation
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review.
Walkthrough: Refactors adapter hot-reload into a reconcile model: adds FileWatcherHandle, deterministic Twitch token naming, adapter fingerprinting, per-adapter supervisor runtimes with backoff, build_desired_configured_adapters(), and MessagingManager::reconcile_configured to apply changes atomically.
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~120 minutes
🚥 Pre-merge checks: ✅ 3 passed
```rust
if !email_config.imap_host.is_empty() {
    let fingerprint = format!(
        "imap_host={};imap_port={};imap_username={};imap_password={};imap_use_tls={};smtp_host={};smtp_port={};smtp_username={};smtp_password={};smtp_use_starttls={};from_address={};from_name={:?};poll_interval_secs={};folders={:?};allowed_senders={:?};max_body_bytes={};max_attachment_bytes={}",
        email_config.imap_host,
```
These fingerprints embed raw secrets (passwords/tokens/auth tokens). Since the fingerprint is stored in MessagingManager (and can be easy to accidentally log/surface later), it’d be safer to hash/redact sensitive fields (e.g. sha256(token)), rather than including them verbatim.
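A minimal sketch of that redaction idea, assuming a `redact_secret` helper (hypothetical name). `DefaultHasher` stands in for a proper cryptographic hash such as `sha2::Sha256`, purely to keep the example dependency-free:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Hypothetical helper: fingerprint a secret instead of embedding it verbatim.
// A real implementation should use a cryptographic hash (e.g. sha2::Sha256);
// DefaultHasher is only used here to keep the sketch dependency-free.
fn redact_secret(secret: &str) -> String {
    let mut hasher = DefaultHasher::new();
    secret.hash(&mut hasher);
    format!("redacted:{:016x}", hasher.finish())
}

fn main() {
    let fingerprint = format!(
        "imap_host={};imap_password={}",
        "imap.example.com",
        redact_secret("hunter2"),
    );
    // The fingerprint still changes when the password changes, but never
    // contains the raw secret, so accidentally logging it is harmless.
    assert!(!fingerprint.contains("hunter2"));
    println!("{fingerprint}");
}
```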
```rust
/// Build the persisted token filename for a named Twitch adapter instance.
pub fn named_twitch_token_file_name(name: &str) -> String {
    let safe_name: String = name
        .chars()
```
Worth bounding safe_name length to avoid path/filename length issues if someone gives a very long instance name.
```rust
let safe_name: String = name
    .chars()
    .map(|ch| if ch.is_ascii_alphanumeric() { ch } else { '_' })
    .take(64)
    .collect();
```
src/messaging/manager.rs (outdated)
```rust
    .await
    .insert(name.clone(), Arc::clone(&adapter));
if self.started.load(Ordering::SeqCst) {
    self.stop_runtime(&name, old_adapter, false).await.ok();
```
Minor: started is loaded twice here. Caching the value once reads cleaner and avoids any future footguns if started ever changes semantics.
```rust
let started = self.started.load(Ordering::SeqCst);
if started {
    self.stop_runtime(&name, old_adapter, false).await.ok();
}
if started && let Err(error) = self.start_runtime(name.clone(), adapter, false).await {
    tracing::warn!(adapter = %name, %error, "failed to start adapter registered after manager start");
}
```
src/messaging/manager.rs (outdated)
```rust
let _stream = manager.start().await.expect("start manager");

tokio::time::sleep(std::time::Duration::from_millis(40)).await;
let before_remove = adapter.start_calls.load(Ordering::SeqCst);
```
This test looks a bit timing-sensitive (sleep(40ms) / sleep(50ms)) and could flake on slow/loaded CI. It might be more robust to have TestAdapter::start() signal each call via a channel/Notify, then assert no further signals arrive after remove_adapter().
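A dependency-free sketch of that signalling pattern (std `mpsc` standing in for a tokio channel or `Notify`; the adapter and `remove_adapter()` are only simulated here): each `start()` call sends one unit on a channel, and the test waits on the channel instead of sleeping fixed durations.

```rust
use std::sync::mpsc::{self, Receiver};
use std::thread;
use std::time::Duration;

/// True when no further start() signal arrives within a short window.
fn no_more_start_signals(receiver: &Receiver<()>) -> bool {
    receiver.recv_timeout(Duration::from_millis(50)).is_err()
}

fn main() {
    // Each TestAdapter::start() call would send one unit on this channel.
    let (start_signal_sender, start_signal_receiver) = mpsc::channel::<()>();

    // Simulated supervisor invoking the adapter's start() once.
    let sender = start_signal_sender.clone();
    thread::spawn(move || {
        sender.send(()).ok();
    });

    // Wait for the first start deterministically instead of sleep(40ms).
    start_signal_receiver
        .recv_timeout(Duration::from_secs(1))
        .expect("adapter started at least once");

    // Dropping the sender models remove_adapter(): no further starts can
    // ever be signalled, so the assertion cannot flake on a loaded CI box.
    drop(start_signal_sender);
    assert!(no_more_start_signals(&start_signal_receiver));
}
```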
```rust
    .cloned()
    .collect::<Vec<_>>();

let mut first_error = None;
```
One edge case here: stale-removal only considers configured_fingerprints keys. Since startup registers adapters but doesn’t seed fingerprints, the first config reload that disables/removes an adapter won’t remove it (because current_configured is empty and the adapter isn’t in desired).
Seems worth either seeding configured_fingerprints during initial adapter setup, or basing the removal sweep on the running self.adapters set (at least on the first reconcile).
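One way to sketch the second option (field and helper names here are illustrative, not the PR's actual ones): base the stale sweep on the union of fingerprinted and running adapters, so a startup-registered adapter with no fingerprint is still removed once it leaves the desired set.

```rust
use std::collections::{HashMap, HashSet};

/// Illustrative removal sweep: an adapter is stale when it is known either
/// from a previous reconcile (fingerprint map) or from the running set, but
/// no longer appears in the desired configuration.
fn stale_adapters(
    configured_fingerprints: &HashMap<String, String>,
    running_adapters: &HashSet<String>,
    desired: &HashMap<String, String>,
) -> Vec<String> {
    configured_fingerprints
        .keys()
        .chain(running_adapters.iter())
        .filter(|name| !desired.contains_key(*name))
        .cloned()
        .collect::<HashSet<_>>() // dedupe names present in both sources
        .into_iter()
        .collect()
}

fn main() {
    // First reconcile after startup: fingerprints were never seeded.
    let fingerprints: HashMap<String, String> = HashMap::new();
    let running: HashSet<String> = HashSet::from(["twitch".to_string()]);
    let desired: HashMap<String, String> = HashMap::new(); // adapter removed from config
    assert_eq!(
        stale_adapters(&fingerprints, &running, &desired),
        vec!["twitch".to_string()]
    );
}
```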
src/main.rs (outdated)
```rust
// Restart file watcher with the new agent data
if let Some(old_watcher) = _file_watcher.take() {
    old_watcher.abort();
}
_file_watcher = spacebot::config::spawn_file_watcher(
```
Heads up: spawn_file_watcher() uses tokio::task::spawn_blocking, so JoinHandle::abort() likely won’t actually stop the blocking watcher loop once it’s running. If the goal is to avoid duplicate watchers, you may need an explicit shutdown signal inside spawn_file_watcher (e.g. watch/CancellationToken + periodic check) so the loop can exit and drop the notify watcher.
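A std-only sketch of that cooperative shutdown (`spawn_watcher` is a stand-in for `spawn_file_watcher`'s blocking loop; a real version might use `tokio::sync::watch` or a `CancellationToken` instead of an `AtomicBool`):

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Stand-in for the blocking watcher loop: it polls a shared flag instead of
/// relying on JoinHandle::abort(), which cannot cancel a blocking task that
/// is already running.
fn spawn_watcher(shutdown: Arc<AtomicBool>) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        while !shutdown.load(Ordering::Relaxed) {
            // ... process notify events here ...
            thread::sleep(Duration::from_millis(10));
        }
        // Loop exits cleanly, dropping the notify watcher with it.
    })
}

fn main() {
    let shutdown = Arc::new(AtomicBool::new(false));
    let handle = spawn_watcher(Arc::clone(&shutdown));
    // Replacing the watcher: signal the old loop, then wait for it to exit
    // before spawning the new one, so two watchers never overlap.
    shutdown.store(true, Ordering::Relaxed);
    handle.join().expect("watcher loop exited cleanly");
}
```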
Actionable comments posted: 5
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/main.rs (1)
2221-2234: ⚠️ Potential issue | 🟠 Major — Update api_state's LlmManager when config reloads with new provider credentials.
When providers are reconfigured, new_llm_manager is created and passed to agents and the file watcher, but api_state retains the startup instance. API handlers calling state.llm_manager.read().await will continue using stale credentials until restart.
🩹 Minimal fix
```diff
 Ok(new_llm) => {
     let new_llm_manager = Arc::new(new_llm);
+    api_state.set_llm_manager(new_llm_manager.clone()).await;
     // Update agent_humans from the reloaded config
     // before initialize_agents so agents see the
     // latest [[humans]] entries.
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/main.rs` around lines 2221 - 2234, The api_state's LlmManager must be replaced when config reloads so API handlers stop using stale credentials: after creating new_llm_manager (the value passed into agents and spawn_file_watcher) acquire a write lock on api_state.llm_manager (or the shared state variable named api_state/state) and assign/replace it with new_llm_manager.clone() (or swap the Arc/Box underlying the RwLock) so subsequent state.llm_manager.read().await returns the updated manager; ensure cloning/Arc semantics match existing LlmManager type to avoid ownership issues.
🧹 Nitpick comments (3)
src/config/watcher.rs (1)
46-60: Prefer .ok() for the channel send here. Dropping the send error is fine if the receiver is gone, but the repo guideline calls out channel sends as the case that should use .ok() rather than let _ =.
♻️ Small cleanup
```diff
- let _ = tx.send(event);
+ tx.send(event).ok();
...
- let _ = tx.send(event);
+ tx.send(event).ok();
```
As per coding guidelines: "Don't silently discard errors. No let _ = on Results. Handle them, log them, or propagate them. The only exception is .ok() on channel sends where the receiver may be dropped."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/config/watcher.rs` around lines 46 - 60, The channel send results in this watcher closure use `let _ = tx.send(event);` which silently discards the Result; replace each `let _ = tx.send(event);` in the notify closure (the closure passed to notify::recommended_watcher in src/config/watcher.rs that matches on EventKind::Create, Modify(Data), Remove and Modify(Any)) with `tx.send(event).ok();` so the send error is explicitly converted to Option and dropped per repo guideline.
src/messaging/manager.rs (2)
85-85: Don't discard stop_runtime() results at these call sites. These .ok() calls make future lifecycle regressions invisible here. Please either log unexpected Errs explicitly or split stop_runtime() into strict/non-strict APIs instead of dropping the Result.
As per coding guidelines: "Don't silently discard errors. No let _ = on Results. Handle them, log them, or propagate them. The only exception is .ok() on channel sends where the receiver may be dropped."
Also applies to: 105-105, 159-159, 223-225
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/messaging/manager.rs` at line 85, The calls that end with .ok() on the Result returned by self.stop_runtime(...) must not silently discard errors; replace those .ok() uses (the calls to stop_runtime with the signature stop_runtime(&name, old_adapter, false).await) with explicit handling: await the Result and either propagate the error or log it (e.g., if let Err(e) = self.stop_runtime(...).await { tracing::error!(%name, adapter=?old_adapter, error=%e, "failed to stop runtime"); } ). Do this at every call site flagged (the multiple invocations of stop_runtime in manager.rs) — alternatively, if you need two behaviors, provide two API variants (e.g., stop_runtime_strict / stop_runtime_best_effort) and use the appropriate one instead of dropping the Result.
11-12: Use full sender/receiver names in this lifecycle code. tx/rx make this already-concurrent path harder to scan and don't match the repo naming rule. Renaming them while this API is still new will make the supervisor/shutdown flow easier to follow.
As per coding guidelines: "Don't abbreviate variable names. Use queue not q, message not msg, channel not ch. Common abbreviations like config are fine."
Also applies to: 25-26, 55-57, 259-260, 361-366
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/config/types.rs`:
- Around line 1558-1567: The sanitized filename prefix generated in
named_twitch_token_file_name can be arbitrarily long; truncate the safe_name
after sanitizing (e.g., limit to a fixed number of characters like 32) before
combining it with hash_prefix so the resulting filename stays within filesystem
limits while preserving uniqueness via the hash; keep the existing sanitization
logic (safe_name), then slice/truncate that string and use the truncated value
when building the final format!("twitch_token_{safe_name}_{hash_prefix}.json").
In `@src/main.rs`:
- Around line 2218-2220: The current code calls JoinHandle::abort() on
_file_watcher which doesn't stop a spawn_blocking watcher; change to cooperative
shutdown by adding a cancellation signal (e.g., an Arc<AtomicBool> or
tokio::sync::watch/oneshot) that you pass into spawn_file_watcher when creating
the blocking task, have the blocking closure (inside spawn_file_watcher)
periodically check that signal and exit cleanly when set, and when replacing the
watcher set the signal for the old watcher and then await its JoinHandle (await
_file_watcher.take().unwrap().await) to ensure it has exited before spawning the
new watcher; reference spawn_file_watcher, _file_watcher, and the JoinHandle
previously aborted.
In `@src/messaging/manager.rs`:
- Around line 121-139: The start() code snapshots adapters before setting
self.started, allowing concurrent register*/reconcile_configured to insert
adapters that see started==false and get skipped; fix by flipping the order so
the started flag is set before taking the adapters snapshot: set
self.started.store(true, Ordering::SeqCst) (and still validate fan_in_rx with
the same context check) prior to reading self.adapters and collecting into
adapters, then iterate and call start_runtime(name, adapter, false). Reference:
the symbols self.started, adapters snapshot, start_runtime, fan_in_rx, and
register*/reconcile_configured.
- Around line 277-316: The supervisor currently resets next_retry_delay to
INITIAL_RETRY_DELAY immediately when start_result is Ok(stream), which allows a
hot loop if the stream ends instantly; change the logic so that next_retry_delay
is only reset after the stream has proven healthy (e.g., after receiving the
first message or surviving a short "healthy" interval) and treat an immediate
None from stream like a start failure by applying exponential backoff (using
next_retry_delay = (next_retry_delay * 2).min(MAX_RETRY_DELAY)) before
attempting start() again; update the branches around start_result, the loop
handling maybe_message/None, and the handling of stream to reference
start_result, stream, maybe_message, next_retry_delay, INITIAL_RETRY_DELAY,
MAX_RETRY_DELAY and fan_in_tx so the delay is not reset on short-lived streams.
- Around line 390-410: The current code returns Err(error) immediately when
adapter.shutdown().await fails (propagate_shutdown_error), which allows
remove_adapter()/reconcile_configured() to run before the old supervisor task is
aborted/joined; change the shutdown sequence so you always finish tearing down
the runtime supervisor before returning the shutdown error: after
adapter.shutdown().await returns Err(error) still proceed to abort the runtime
task (runtime.task.abort()) and await runtime.task.await (logging any join
failures), and only then, if propagate_shutdown_error is true, return the
original shutdown error from adapter.shutdown(); reference adapter.shutdown(),
runtime.task.abort(), runtime.task.await, and propagate_shutdown_error in your
change.
---
Outside diff comments:
In `@src/main.rs`:
- Around line 2221-2234: The api_state's LlmManager must be replaced when config
reloads so API handlers stop using stale credentials: after creating
new_llm_manager (the value passed into agents and spawn_file_watcher) acquire a
write lock on api_state.llm_manager (or the shared state variable named
api_state/state) and assign/replace it with new_llm_manager.clone() (or swap the
Arc/Box underlying the RwLock) so subsequent state.llm_manager.read().await
returns the updated manager; ensure cloning/Arc semantics match existing
LlmManager type to avoid ownership issues.
---
Nitpick comments:
In `@src/config/watcher.rs`:
- Around line 46-60: The channel send results in this watcher closure use `let _
= tx.send(event);` which silently discards the Result; replace each `let _ =
tx.send(event);` in the notify closure (the closure passed to
notify::recommended_watcher in src/config/watcher.rs that matches on
EventKind::Create, Modify(Data), Remove and Modify(Any)) with
`tx.send(event).ok();` so the send error is explicitly converted to Option and
dropped per repo guideline.
In `@src/messaging/manager.rs`:
- Line 85: The calls that end with .ok() on the Result returned by
self.stop_runtime(...) must not silently discard errors; replace those .ok()
uses (the calls to stop_runtime with the signature stop_runtime(&name,
old_adapter, false).await) with explicit handling: await the Result and either
propagate the error or log it (e.g., if let Err(e) =
self.stop_runtime(...).await { tracing::error!(%name, adapter=?old_adapter,
error=%e, "failed to stop runtime"); } ). Do this at every call site flagged
(the multiple invocations of stop_runtime in manager.rs) — alternatively, if you
need two behaviors, provide two API variants (e.g., stop_runtime_strict /
stop_runtime_best_effort) and use the appropriate one instead of dropping the
Result.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 635ff3c5-7999-411b-8610-1de492c1e32f
📒 Files selected for processing (6)
- docs/content/docs/(configuration)/config.mdx
- src/config/types.rs
- src/config/watcher.rs
- src/main.rs
- src/messaging.rs
- src/messaging/manager.rs
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/config/watcher.rs (1)
244-272: ⚠️ Potential issue | 🟠 Major — Run reconciliation serially instead of detaching it.
This block fires reconcile_configured() in the background and returns immediately. Because reconciliation awaits adapter shutdown()/start(), a later file event can launch a second desired-state transition against the same manager before the first one finishes, and the detached task also survives when the watcher itself is aborted during provider transitions. Either block_on this from the watcher thread or guard reconciliation behind a single mutex.
💡 One straightforward way to keep reconciles ordered
```diff
-rt.spawn(async move {
-    match build_desired_configured_adapters(
-        &config,
-        &instance_dir,
-        discord_permissions,
-        slack_permissions,
-        telegram_permissions,
-        twitch_permissions,
-    ) {
-        Ok(desired) => {
-            if let Err(error) = manager.reconcile_configured(desired).await {
-                tracing::warn!(%error, "messaging adapter reconciliation encountered errors");
-            }
-        }
-        Err(error) => {
-            tracing::error!(%error, "failed to build desired messaging adapters from config change");
-        }
-    }
-});
+match build_desired_configured_adapters(
+    &config,
+    &instance_dir,
+    discord_permissions,
+    slack_permissions,
+    telegram_permissions,
+    twitch_permissions,
+) {
+    Ok(desired) => {
+        rt.block_on(async {
+            if let Err(error) = manager.reconcile_configured(desired).await {
+                tracing::warn!(%error, "messaging adapter reconciliation encountered errors");
+            }
+        });
+    }
+    Err(error) => {
+        tracing::error!(%error, "failed to build desired messaging adapters from config change");
+    }
+}
```
♻️ Duplicate comments (3)
src/messaging/manager.rs (3)
388-405: ⚠️ Potential issue | 🟠 Major — Always tear down the runtime before returning shutdown errors.
Line 392 returns before Lines 397-405 abort/join the supervisor. A failed adapter.shutdown() during removal or replacement can therefore leave the old task alive under the same adapter name. Capture the shutdown error, finish the runtime teardown, and only then return it.
💡 Suggested teardown ordering
```diff
-if let Some(adapter) = adapter
-    && let Err(error) = adapter.shutdown().await
-{
-    if propagate_shutdown_error {
-        return Err(error);
-    }
-    tracing::warn!(adapter = %name, %error, "failed to shut down adapter");
-}
+let shutdown_error = if let Some(adapter) = adapter {
+    match adapter.shutdown().await {
+        Ok(()) => None,
+        Err(error) => {
+            if !propagate_shutdown_error {
+                tracing::warn!(adapter = %name, %error, "failed to shut down adapter");
+            }
+            Some(error)
+        }
+    }
+} else {
+    None
+};
 if let Some(runtime) = runtime {
     runtime.task.abort();
     match runtime.task.await {
         Ok(()) => {}
@@
     }
 }
+if let Some(error) = shutdown_error
+    && propagate_shutdown_error
+{
+    return Err(error);
+}
+Ok(())
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/messaging/manager.rs` around lines 388 - 405, The current code returns immediately when adapter.shutdown().await returns Err(error), which can bypass tearing down the supervisor task; instead, capture the shutdown error (e.g., let shutdown_err = Some(error)) but do not return yet, always proceed to the runtime teardown logic (call runtime.task.abort() and await runtime.task) and only after the runtime join completes, if propagate_shutdown_error is true return the captured error; otherwise log/warn as before. Update the block around adapter.shutdown(), propagate_shutdown_error, runtime.task.abort(), and runtime.task.await to follow this ordering and reference adapter.shutdown(), propagate_shutdown_error, runtime.task.abort(), runtime.task.await, and the adapter name variable (name).
266-279: ⚠️ Potential issue | 🟠 Major — Back off after immediate stream termination.
Lines 267 and 278 reset next_retry_delay as soon as start() returns Ok(_). If stream.next() then yields None on the first poll, Lines 312-314 jump straight back to adapter.start() with no sleep, which can hot-loop against a flaky provider. Only reset the delay after the stream proves healthy, and treat immediate EOF like a failed start.
Also applies to: 298-315
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/messaging/manager.rs` around lines 266 - 279, The code resets next_retry_delay = INITIAL_RETRY_DELAY as soon as adapter.start() returns Ok(stream), which allows a hot-loop if stream.next() yields None immediately; change the logic so that an Ok from adapter.start() does NOT reset next_retry_delay yet — only reset next_retry_delay after the stream proves healthy by yielding at least one Some from stream.next(); if stream.next() returns None on the first poll treat it like a failed start (fall through to the backoff path and increase next_retry_delay) so that immediate EOF does not bypass the sleep. Ensure this change is applied to the branch handling start_result (where adapter.start() is matched) and the other similar branch around initial_stream handling.
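The backoff arithmetic described above can be sketched like this (the constants are illustrative; the real values and the "proven healthy" reset live in manager.rs):

```rust
use std::time::Duration;

// Illustrative constants; the real ones are defined in manager.rs.
const INITIAL_RETRY_DELAY: Duration = Duration::from_secs(1);
const MAX_RETRY_DELAY: Duration = Duration::from_secs(60);

/// Exponential backoff step. The delay is reset to INITIAL_RETRY_DELAY
/// elsewhere, and only after the stream has yielded at least one message.
fn next_backoff(current: Duration) -> Duration {
    (current * 2).min(MAX_RETRY_DELAY)
}

fn main() {
    let mut delay = INITIAL_RETRY_DELAY;
    // start() returned Ok but the stream ended immediately: treat it like a
    // failed start and back off rather than resetting to the initial delay.
    delay = next_backoff(delay);
    assert_eq!(delay, Duration::from_secs(2));
    // The delay is capped at MAX_RETRY_DELAY.
    assert_eq!(next_backoff(Duration::from_secs(40)), Duration::from_secs(60));
}
```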
118-137: ⚠️ Potential issue | 🟠 Major — Serialize start() with adapter insertion/fingerprinting.
Any adapter inserted between Lines 119-125 and Line 133 sees started == false, so start_runtime() returns early, while Lines 231-234 can still record its fingerprint. After that, later reconciles treat it as unchanged even though no supervisor was ever installed. start(), register*(), and reconcile_configured() need one shared startup/reconfiguration gate here.
Also applies to: 217-234
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/messaging/manager.rs` around lines 118 - 137, start() races with adapter insertion/fingerprinting because adapters can be added after you snapshot adapters but before you set started/launch supervisors, causing start_runtime() to skip supervisor install while later fingerprinting records it as unchanged; fix by adding a single startup/reconfigure gate that serializes start(), register*() and reconcile_configured(): acquire that gate in start() before snapshotting self.adapters and set self.started under the same gate, then release after all start_runtime(name, adapter, false) calls complete; ensure register*() and reconcile_configured() also acquire the same gate when inserting or computing fingerprints so they cannot insert adapters while start() is in progress (use a new async Mutex/lock field or reuse an existing write lock to protect adapter insertion/fingerprinting).
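A compressed sketch of the shared gate (a synchronous std Mutex standing in for an async tokio::sync::Mutex; struct, field, and method names are illustrative): the started flag is flipped and the adapter snapshot taken under the same lock that registration uses for insertion, so no adapter can slip between the snapshot and the flag.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

// Illustrative state guarded by one lifecycle gate.
struct LifecycleState {
    started: bool,
    adapters: HashMap<String, String>, // name -> fingerprint
}

struct ManagerSketch {
    lifecycle: Mutex<LifecycleState>,
}

impl ManagerSketch {
    /// Set the started flag and snapshot adapters under the same lock.
    fn start(&self) -> Vec<String> {
        let mut state = self.lifecycle.lock().unwrap();
        state.started = true;
        state.adapters.keys().cloned().collect() // names to start supervisors for
    }

    /// Insert under the same gate; returns whether the caller must spawn a
    /// supervisor itself (i.e. the manager already started).
    fn register_adapter(&self, name: &str, fingerprint: &str) -> bool {
        let mut state = self.lifecycle.lock().unwrap();
        state.adapters.insert(name.to_string(), fingerprint.to_string());
        state.started
    }
}

fn main() {
    let manager = ManagerSketch {
        lifecycle: Mutex::new(LifecycleState { started: false, adapters: HashMap::new() }),
    };
    assert!(!manager.register_adapter("discord", "fp1")); // pre-start: start() will handle it
    let snapshot = manager.start();
    assert_eq!(snapshot, vec!["discord".to_string()]);
    assert!(manager.register_adapter("slack", "fp2")); // post-start: caller spawns supervisor
}
```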
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/config/types.rs`:
- Around line 1558-1568: Replace all manual constructions/deletions of
"twitch_token_{safe_name}.json" in messaging logic with the new helper
crate::config::named_twitch_token_file_name(name): use that function when
computing filenames for read/write/delete, and when migrating existing data,
detect the old filename variant and either move/rename it to the new name or
fall back to loading from the old file if the new one is absent; also ensure
cleanup removes both old and new filenames. Specifically, wherever code builds
or unlinks twitch token filenames (the sites that currently form
"twitch_token_{safe_name}.json"), call named_twitch_token_file_name(name) for
all operations and add a small compatibility step to check for and migrate the
legacy file to the new hashed name before writing or deleting.
In `@src/messaging/manager.rs`:
- Around line 324-349: start_runtime currently awaits adapter.start() before
calling install_supervisor, leaving the initial connect outside supervisor
control; change the flow so the supervisor owns the initial start attempt:
either call install_supervisor(name, Arc::clone(&adapter), None).await first to
publish an AdapterRuntime and have the supervisor spawn the task that performs
adapter.start() and stores the resulting stream, or create an abortable task
(e.g., tokio::spawn with an AbortHandle) for adapter.start() and pass that
handle/placeholder into install_supervisor so remove_adapter can abort it; apply
the same change to the other similar block that creates the initial_stream and
calls install_supervisor(name, adapter, Some(initial_stream)).await so no start
attempt happens outside supervisor ownership.
- Around line 79-90: The call to stop_runtime(&name, old_adapter,
false).await.ok() silently discards errors; change the call sites (the
stop_runtime invocations near the adapter registration and other noted ranges)
to handle the Result instead of calling .ok() — either propagate the error
(return/? from the surrounding async function) or log it with
tracing::warn!/error! including context (adapter name and the returned error) so
shutdown failures are visible; apply the same treatment to the other
stop_runtime calls and ensure start_runtime(...) error handling remains
consistent (the existing tracing::warn for start_runtime can be used as a
pattern).
---
Duplicate comments:
In `@src/messaging/manager.rs`:
- Around line 388-405: The current code returns immediately when
adapter.shutdown().await returns Err(error), which can bypass tearing down the
supervisor task; instead, capture the shutdown error (e.g., let shutdown_err =
Some(error)) but do not return yet, always proceed to the runtime teardown logic
(call runtime.task.abort() and await runtime.task) and only after the runtime
join completes, if propagate_shutdown_error is true return the captured error;
otherwise log/warn as before. Update the block around adapter.shutdown(),
propagate_shutdown_error, runtime.task.abort(), and runtime.task.await to follow
this ordering and reference adapter.shutdown(), propagate_shutdown_error,
runtime.task.abort(), runtime.task.await, and the adapter name variable (name).
- Around line 266-279: The code resets next_retry_delay = INITIAL_RETRY_DELAY as
soon as adapter.start() returns Ok(stream), which allows a hot-loop if
stream.next() yields None immediately; change the logic so that an Ok from
adapter.start() does NOT reset next_retry_delay yet — only reset
next_retry_delay after the stream proves healthy by yielding at least one Some
from stream.next(); if stream.next() returns None on the first poll treat it
like a failed start (fall through to the backoff path and increase
next_retry_delay) so that immediate EOF does not bypass the sleep. Ensure this
change is applied to the branch handling start_result (where adapter.start() is
matched) and the other similar branch around initial_stream handling.
- Around line 118-137: start() races with adapter insertion/fingerprinting
because adapters can be added after you snapshot adapters but before you set
started/launch supervisors, causing start_runtime() to skip supervisor install
while later fingerprinting records it as unchanged; fix by adding a single
startup/reconfigure gate that serializes start(), register*() and
reconcile_configured(): acquire that gate in start() before snapshotting
self.adapters and set self.started under the same gate, then release after all
start_runtime(name, adapter, false) calls complete; ensure register*() and
reconcile_configured() also acquire the same gate when inserting or computing
fingerprints so they cannot insert adapters while start() is in progress (use a
new async Mutex/lock field or reuse an existing write lock to protect adapter
insertion/fingerprinting).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: b8bb49f9-38dc-4f04-816b-7cbd82d16b10
📒 Files selected for processing (3)
- src/config/types.rs
- src/config/watcher.rs
- src/messaging/manager.rs
Follow-up for commit This pass addressed the newest review feedback on PR #372. What changed:
Validation rerun locally after these changes:
Result:
Actionable comments posted: 2
♻️ Duplicate comments (5)
src/messaging/manager.rs (4)
82-93: ⚠️ Potential issue | 🟡 Minor — Don't swallow stop_runtime() failures.
These .ok() calls discard shutdown/join errors from a non-channel Result. If an old runtime fails to stop, the replacement path keeps going and you lose the only signal that supervisors may now overlap.
As per coding guidelines: "Don't silently discard errors. No let _ = on Results. Handle them, log them, or propagate them. The only exception is .ok() on channel sends where the receiver may be dropped."
Also applies to: 102-113, 158-165, 224-230
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/messaging/manager.rs` around lines 82 - 93, The code is currently discarding errors from stop_runtime by calling .ok(); update all occurrences (e.g., the calls inside the adapter registration flow around stop_runtime(&name, old_adapter, false).await.ok()) to handle the Result instead of swallowing it: await the stop_runtime call, check for Err and either log the error with tracing::warn!/error! including the adapter name/context or propagate the error up so callers can react (same treatment for the similar occurrences at the other locations you noted). Ensure you reference the same parameters (name, old_adapter, boolean flag) when logging so supervisors can detect overlapping runtimes.
553-564: ⚠️ Potential issue | 🟠 Major — This doesn't actually seed fingerprints.
Lines 561-563 insert "" for each adapter, so the first reconcile_configured() after startup still sees every configured adapter as changed and restarts them on the next config edit. Either initialize startup adapters through the same fingerprinted ConfiguredAdapter path, or let this method receive the real fingerprint values.
Verify each finding against the current code and only fix it if needed. In `@src/messaging/manager.rs` around lines 553 - 564, seed_configured_fingerprints_from_registered currently writes empty strings into configured_fingerprints which causes reconcile_configured to treat everything as changed; instead populate configured_fingerprints with the actual fingerprints from the registered adapter objects (or reuse the same creation path that produces ConfiguredAdapter fingerprints). Update seed_configured_fingerprints_from_registered to read the registered adapters map (e.g., registered_adapters.read().await or the structure holding adapter state), get each adapter's real fingerprint (e.g., ConfiguredAdapter::fingerprint or the field/method that returns the fingerprint) and insert that value into configured_fingerprints.entry(name).or_insert_with(|| actual_fingerprint), or change the function signature to accept a map of name->fingerprint and seed from that, so reconcile_configured no longer sees all adapters as changed on startup.
285-299: ⚠️ Potential issue | 🟠 Major — Immediate EOFs still collapse to the initial delay.
Line 288 resets next_retry_delay on any successful start(), even if the stream returns None immediately. A connect/EOF loop therefore sleeps only INITIAL_RETRY_DELAY forever instead of backing off across attempts. Reset the delay only after the stream proves healthy.
Also applies to: 327-341
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/messaging/manager.rs` around lines 285 - 299, The code resets next_retry_delay to INITIAL_RETRY_DELAY as soon as start_result returns Ok(stream), but that incorrectly collapses backoff for immediate-EOF/None streams; instead, remove the immediate reset inside the Ok(start) branch and only reset next_retry_delay after the stream has proven healthy (e.g., after receiving the first successful item/event from the stream or otherwise confirming it is not an immediate EOF). Update the logic around the match on start_result (the Ok(stream) branch where stream is used) and the analogous block later (the second match handling start_result) so the reset happens after a confirmed successful read from stream rather than on start success. Ensure you reference and adjust next_retry_delay, INITIAL_RETRY_DELAY, start_result, start(), and stream consistently in both places.
353-379: ⚠️ Potential issue | 🟠 Major — Keep the initial connect under supervisor ownership.
start_runtime() still awaits adapter.start() before an AdapterRuntime exists. If that future hangs, the caller keeps lifecycle_mutex held and remove_adapter()/shutdown() has nothing to abort. The supervisor needs to own the first start attempt too.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/messaging/manager.rs` around lines 353 - 379, Change start_runtime so it does not await adapter.start() directly; instead create/install the supervisor first and let the supervisor perform the initial start attempt so the first connect is owned/abortable by the supervisor. Concretely, replace the adapter.start().await + matching logic in start_runtime with a call to install_supervisor(name.clone(), Arc::clone(&adapter), None) (or install_supervisor(name, adapter, None)) so the supervisor owns the initial start; if you still need to surface the first-start error when surface_start_error is true, have install_supervisor return or expose a future/result for the initial start that start_runtime can optionally await, but do not perform adapter.start() inside start_runtime itself. Ensure references to remove_adapter(), shutdown(), and lifecycle_mutex remain compatible with this change.src/main.rs (1)
2218-2232: ⚠️ Potential issue | 🟠 Major: Drop the old watcher before you spawn the replacement.
`FileWatcherHandle` only begins shutdown in `Drop` (src/config/watcher.rs, Lines 20-29). Keeping it in `_old_watcher` means the setup-mode watcher keeps running until the end of this arm, after the replacement watcher is already live, so both can process the same change burst against different runtime state. At minimum, drop the old handle before `spawn_file_watcher(...)`; ideally wait for the blocking task to exit once the handle exposes a joinable shutdown.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/main.rs` around lines 2218 - 2232, The current code keeps the old FileWatcherHandle in _old_watcher until after calling spacebot::config::spawn_file_watcher, so the old watcher isn't dropped (and shutdown begun) until after the new watcher is running; change the order to take and drop the old handle before calling spawn_file_watcher: call let _old_watcher = _file_watcher.take(); explicitly drop(_old_watcher) (or let _old_watcher = _file_watcher.take(); if you want to then await a joinable shutdown, call its join/await method) before creating the new watcher with spacebot::config::spawn_file_watcher, and if FileWatcherHandle exposes a joinable shutdown API, await that to ensure the previous watcher has fully exited before assigning _file_watcher to the new handle.
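The ordering issue can be illustrated without the real watcher: a handle whose `Drop` begins shutdown must be taken and dropped before its replacement is spawned. A minimal stdlib sketch, with `WatcherHandle` as a hypothetical stand-in for `FileWatcherHandle`:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for FileWatcherHandle: begins shutdown in Drop.
struct WatcherHandle {
    running: Arc<AtomicBool>,
}

impl Drop for WatcherHandle {
    fn drop(&mut self) {
        // The real handle sends a shutdown signal to a blocking task here.
        self.running.store(false, Ordering::SeqCst);
    }
}

fn spawn_watcher() -> WatcherHandle {
    WatcherHandle { running: Arc::new(AtomicBool::new(true)) }
}

/// Replace the watcher so the old one shuts down before the new one starts.
/// Taking and dropping the Option first closes the duplicate-watcher window.
fn replace_watcher(slot: &mut Option<WatcherHandle>) -> Arc<AtomicBool> {
    drop(slot.take()); // old watcher's Drop runs *before* the replacement exists
    let handle = spawn_watcher();
    let flag = Arc::clone(&handle.running);
    *slot = Some(handle);
    flag
}
```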
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/messaging/manager.rs`:
- Around line 203-207: reconcile_configured() is holding lifecycle_mutex and
then awaits remove_adapter(), which deadlocks because remove_adapter() re-locks
lifecycle_mutex; fix by performing the stale-adapter removal without awaiting a
function that re-acquires the lock — either inline the removal logic currently
in remove_adapter() directly in reconcile_configured() while using the existing
lifecycle_mutex guard, or create a private helper (e.g.,
remove_adapter_while_locked or remove_adapter_inner) that assumes the
lifecycle_mutex guard is already held and call that from reconcile_configured();
ensure any async awaits that would drop the guard are avoided or moved outside
the locked section.
- Around line 195-201: current_configured is being populated from self.adapters
(which includes shared adapters like the one registered by register_shared()),
causing reconcile to treat shared adapters as stale; change the snapshot to
iterate only the configured adapter set (the same source used to build desired)
instead of self.adapters. Replace the
self.adapters.read().await.keys().cloned().collect::<Vec<_>>() usage when
setting current_configured with the configured/desired keys (e.g.,
desired.keys().cloned().collect::<Vec<_>>() or the equivalent configured-adapter
collection your code uses) so the stale-sweep only considers adapters that are
actually configured.
---
Duplicate comments:
In `@src/main.rs`:
- Around line 2218-2232: The current code keeps the old FileWatcherHandle in
_old_watcher until after calling spacebot::config::spawn_file_watcher, so the
old watcher isn't dropped (and shutdown begun) until after the new watcher is
running; change the order to take and drop the old handle before calling
spawn_file_watcher: call let _old_watcher = _file_watcher.take(); explicitly
drop(_old_watcher) (or let _old_watcher = _file_watcher.take(); if you want to
then await a joinable shutdown, call its join/await method) before creating the
new watcher with spacebot::config::spawn_file_watcher, and if FileWatcherHandle
exposes a joinable shutdown API, await that to ensure the previous watcher has
fully exited before assigning _file_watcher to the new handle.
In `@src/messaging/manager.rs`:
- Around line 82-93: The code is currently discarding errors from stop_runtime
by calling .ok(); update all occurrences (e.g., the calls inside the adapter
registration flow around stop_runtime(&name, old_adapter, false).await.ok()) to
handle the Result instead of swallowing it: await the stop_runtime call, check
for Err and either log the error with tracing::warn!/error! including the
adapter name/context or propagate the error up so callers can react (same
treatment for the similar occurrences at the other locations you noted). Ensure
you reference the same parameters (name, old_adapter, boolean flag) when logging
so supervisors can detect overlapping runtimes.
- Around line 553-564: seed_configured_fingerprints_from_registered currently
writes empty strings into configured_fingerprints which causes
reconcile_configured to treat everything as changed; instead populate
configured_fingerprints with the actual fingerprints from the registered adapter
objects (or reuse the same creation path that produces ConfiguredAdapter
fingerprints). Update seed_configured_fingerprints_from_registered to read the
registered adapters map (e.g., registered_adapters.read().await or the structure
holding adapter state), get each adapter's real fingerprint (e.g.,
ConfiguredAdapter::fingerprint or the field/method that returns the fingerprint)
and insert that value into configured_fingerprints.entry(name).or_insert_with(||
actual_fingerprint), or change the function signature to accept a map of
name->fingerprint and seed from that, so reconcile_configured no longer sees all
adapters as changed on startup.
- Around line 285-299: The code resets next_retry_delay to INITIAL_RETRY_DELAY
as soon as start_result returns Ok(stream), but that incorrectly collapses
backoff for immediate-EOF/None streams; instead, remove the immediate reset
inside the Ok(start) branch and only reset next_retry_delay after the stream has
proven healthy (e.g., after receiving the first successful item/event from the
stream or otherwise confirming it is not an immediate EOF). Update the logic
around the match on start_result (the Ok(stream) branch where stream is used)
and the analogous block later (the second match handling start_result) so the
reset happens after a confirmed successful read from stream rather than on start
success. Ensure you reference and adjust next_retry_delay, INITIAL_RETRY_DELAY,
start_result, start(), and stream consistently in both places.
- Around line 353-379: Change start_runtime so it does not await adapter.start()
directly; instead create/install the supervisor first and let the supervisor
perform the initial start attempt so the first connect is owned/abortable by the
supervisor. Concretely, replace the adapter.start().await + matching logic in
start_runtime with a call to install_supervisor(name.clone(),
Arc::clone(&adapter), None) (or install_supervisor(name, adapter, None)) so the
supervisor owns the initial start; if you still need to surface the first-start
error when surface_start_error is true, have install_supervisor return or expose
a future/result for the initial start that start_runtime can optionally await,
but do not perform adapter.start() inside start_runtime itself. Ensure
references to remove_adapter(), shutdown(), and lifecycle_mutex remain
compatible with this change.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 1c147ab7-5db5-4f84-93ba-b6d61c98bcd8
📒 Files selected for processing (4)
`src/config.rs`, `src/config/watcher.rs`, `src/main.rs`, `src/messaging/manager.rs`
```rust
let mut first_error = None;
for name in current_configured {
    if !desired_names.contains(&name)
        && let Err(error) = self.remove_adapter(&name).await
    {
```
This stale-removal path deadlocks under the lifecycle lock.
reconcile_configured() already holds lifecycle_mutex, then Line 206 awaits remove_adapter(), which tries to lock the same mutex again. The first stale adapter hangs reconciliation permanently; inline the removal here or add a private helper that assumes the guard is already held.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/messaging/manager.rs` around lines 203 - 207, reconcile_configured() is
holding lifecycle_mutex and then awaits remove_adapter(), which deadlocks
because remove_adapter() re-locks lifecycle_mutex; fix by performing the
stale-adapter removal without awaiting a function that re-acquires the lock —
either inline the removal logic currently in remove_adapter() directly in
reconcile_configured() while using the existing lifecycle_mutex guard, or create
a private helper (e.g., remove_adapter_while_locked or remove_adapter_inner)
that assumes the lifecycle_mutex guard is already held and call that from
reconcile_configured(); ensure any async awaits that would drop the guard are
avoided or moved outside the locked section.
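The inner-helper pattern this prompt suggests can be sketched with a synchronous `std::sync::Mutex` (the real code uses an async lock; `Manager`, `remove_adapter_locked`, and `sweep_stale` are illustrative names):

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Simplified stand-in for the manager's adapter table.
struct Manager {
    lifecycle: Mutex<HashMap<String, String>>, // name -> adapter state
}

impl Manager {
    /// Public entry point: acquires the lifecycle lock, then delegates.
    fn remove_adapter(&self, name: &str) -> Result<(), String> {
        let mut guard = self.lifecycle.lock().expect("lock poisoned");
        Self::remove_adapter_locked(&mut guard, name)
    }

    /// Inner helper that assumes the lock is already held. Callers that
    /// hold the guard (like a reconcile loop) call this directly, so they
    /// never re-acquire the same mutex.
    fn remove_adapter_locked(
        adapters: &mut HashMap<String, String>,
        name: &str,
    ) -> Result<(), String> {
        adapters
            .remove(name)
            .map(|_| ())
            .ok_or_else(|| format!("adapter '{name}' not registered"))
    }

    /// Reconcile-style sweep: holds the lock once and removes stale entries
    /// through the inner helper instead of the re-locking public method.
    fn sweep_stale(&self, desired: &[&str]) -> usize {
        let mut guard = self.lifecycle.lock().expect("lock poisoned");
        let stale: Vec<String> = guard
            .keys()
            .filter(|name| !desired.contains(&name.as_str()))
            .cloned()
            .collect();
        let mut removed = 0;
        for name in stale {
            if Self::remove_adapter_locked(&mut guard, &name).is_ok() {
                removed += 1;
            }
        }
        removed
    }
}
```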
Follow-up for commit: this pass fixes the remaining live review findings. What changed:
Validation rerun locally after the fix:
Result:
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/main.rs (1)
2219-2233: ⚠️ Potential issue | 🟠 Major: Drop the old watcher before spawning the replacement.
`let _old_watcher = _file_watcher.take();` keeps the old handle alive until the end of this match arm, so `FileWatcherHandle::drop()` does not send the shutdown signal until after the new watcher is already running. That still leaves a duplicate-watcher window.
♻️ Proposed fix
```diff
-let _old_watcher = _file_watcher.take();
+drop(_file_watcher.take());
 _file_watcher = Some(spacebot::config::spawn_file_watcher(
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/main.rs` around lines 2219 - 2233, The current code keeps the old FileWatcherHandle alive until after spawn_file_watcher runs by assigning let _old_watcher = _file_watcher.take(); — drop the old watcher immediately before calling spawn_file_watcher so FileWatcherHandle::drop sends the shutdown signal prior to starting the replacement; e.g., take the option from _file_watcher into a temporary (or if let Some(old) = _file_watcher.take() { drop(old); }) then call spacebot::config::spawn_file_watcher(...) and assign its handle to _file_watcher.
♻️ Duplicate comments (2)
src/messaging/manager.rs (2)
206-209: ⚠️ Potential issue | 🔴 Critical: This stale-adapter loop still self-deadlocks.
`reconcile_configured()` already holds `lifecycle_mutex`, then awaits `remove_adapter()`, which tries to lock the same mutex again on Line 533. The first stale adapter will hang reconciliation permanently; this needs an inner removal helper that assumes the lifecycle lock is already held.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/messaging/manager.rs` around lines 206 - 209, reconcile_configured() currently holds lifecycle_mutex and awaits remove_adapter(), which deadlocks because remove_adapter() tries to lock lifecycle_mutex again; extract the core removal logic into a new internal helper (e.g., remove_adapter_locked or remove_adapter_inner) that assumes the lifecycle_mutex is already held, update reconcile_configured() to call this helper directly when it holds the mutex, and change remove_adapter() to acquire lifecycle_mutex then call the same helper so external callers keep existing behavior; ensure the helper is async and returns the same Result/Err shape as remove_adapter() so error handling remains unchanged.
197-203: ⚠️ Potential issue | 🔴 Critical: Stale removal still includes shared adapters.
`current_configured` is collected from `self.adapters`, so reconciliation still sweeps non-config runtimes too. `src/main.rs` also registers `webchat` via `register_shared()`, and it never appears in `desired`, so the next reconcile will tear it down as stale unless this loop uses a configured-only set.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/messaging/manager.rs` around lines 197 - 203, current_configured is currently built from self.adapters and thus includes adapters registered via register_shared (e.g., webchat), causing them to be treated as stale; change the collection to only include configured (non-shared) adapters by filtering out shared registrations when reading self.adapters (e.g., replace the keys().cloned().collect with an iterator that filters by adapter.shared == false or !adapter.is_shared(), then maps to the keys), or alternatively intersect with the desired/configured set if you have one; ensure you reference the same map used in the diff (self.adapters.read().await) and exclude entries registered via register_shared().
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/messaging/manager.rs`:
- Around line 373-395: The initial-start wait currently does a
potentially-unbounded await on the oneshot receiver (first_start_result_rx /
rx.await) inside register_and_start() / reconcile_configured() while holding
lifecycle_mutex, which can deadlock if adapter.start() hangs; wrap the receive
in a bounded timeout (e.g., use tokio::time::timeout with a sensible duration)
when awaiting the receiver returned by install_supervisor, and on timeout return
an Err indicating the initial start timed out so the API/reload path fails fast
while the supervisor continues retrying in the background; update the match
handling for rx.await to handle the timeout case and include the adapter name in
the returned error.
- Around line 294-301: The channel send results on first_start_result_tx are
being ignored with `let _ = ...`; update both send calls (the successful send of
Ok(()) and the error send of Err(error.to_string())) to use `.ok()` instead of
`let _ =` so we explicitly acknowledge a dropped receiver per the repo rule;
locate the sends where `first_start_result_tx.take()` is used in the
stream-starting code path and replace the `let _ = tx.send(...)` patterns with
`tx.send(...).ok()` ensuring the same messages (Ok(()) and
Err(error.to_string())) are sent.
---
Outside diff comments:
In `@src/main.rs`:
- Around line 2219-2233: The current code keeps the old FileWatcherHandle alive
until after spawn_file_watcher runs by assigning let _old_watcher =
_file_watcher.take(); — drop the old watcher immediately before calling
spawn_file_watcher so FileWatcherHandle::drop sends the shutdown signal prior to
starting the replacement; e.g., take the option from _file_watcher into a
temporary (or if let Some(old) = _file_watcher.take() { drop(old); }) then call
spacebot::config::spawn_file_watcher(...) and assign its handle to
_file_watcher.
---
Duplicate comments:
In `@src/messaging/manager.rs`:
- Around line 206-209: reconcile_configured() currently holds lifecycle_mutex
and awaits remove_adapter(), which deadlocks because remove_adapter() tries to
lock lifecycle_mutex again; extract the core removal logic into a new internal
helper (e.g., remove_adapter_locked or remove_adapter_inner) that assumes the
lifecycle_mutex is already held, update reconcile_configured() to call this
helper directly when it holds the mutex, and change remove_adapter() to acquire
lifecycle_mutex then call the same helper so external callers keep existing
behavior; ensure the helper is async and returns the same Result/Err shape as
remove_adapter() so error handling remains unchanged.
- Around line 197-203: current_configured is currently built from self.adapters
and thus includes adapters registered via register_shared (e.g., webchat),
causing them to be treated as stale; change the collection to only include
configured (non-shared) adapters by filtering out shared registrations when
reading self.adapters (e.g., replace the keys().cloned().collect with an
iterator that filters by adapter.shared == false or !adapter.is_shared(), then
maps to the keys), or alternatively intersect with the desired/configured set if
you have one; ensure you reference the same map used in the diff
(self.adapters.read().await) and exclude entries registered via
register_shared().
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 4d45fa55-66c3-4a99-8fa9-6a425d3bc805
📒 Files selected for processing (3)
`src/api/messaging.rs`, `src/main.rs`, `src/messaging/manager.rs`
```rust
if let Some(tx) = first_start_result_tx.take() {
    let _ = tx.send(Ok(()));
}
stream
}
Err(error) => {
    if let Some(tx) = first_start_result_tx.take() {
        let _ = tx.send(Err(error.to_string()));
```
Use the channel-send exception explicitly here.
If dropping the receiver is acceptable, prefer `.ok()` over `let _ =` so this follows the repository's Result-handling rule.
As per coding guidelines "Don't silently discard errors. No let _ = on Results. Handle them, log them, or propagate them. The only exception is .ok() on channel sends where the receiver may be dropped".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/messaging/manager.rs` around lines 294 - 301, The channel send results on
first_start_result_tx are being ignored with `let _ = ...`; update both send
calls (the successful send of Ok(()) and the error send of
Err(error.to_string())) to use `.ok()` instead of `let _ =` so we explicitly
acknowledge a dropped receiver per the repo rule; locate the sends where
`first_start_result_tx.take()` is used in the stream-starting code path and
replace the `let _ = tx.send(...)` patterns with `tx.send(...).ok()` ensuring
the same messages (Ok(()) and Err(error.to_string())) are sent.
```rust
let first_start_result_rx = if surface_start_error {
    let (tx, rx) = oneshot::channel();
    self.install_supervisor(name.clone(), adapter, Some(tx))
        .await;
    Some(rx)
} else {
    self.install_supervisor(name.clone(), adapter, None).await;
    None
};

if let Some(rx) = first_start_result_rx {
    match rx.await {
        Ok(Ok(())) => {}
        Ok(Err(error)) => {
            return Err(anyhow::anyhow!("failed to start adapter '{name}': {error}").into());
        }
        Err(error) => {
            return Err(anyhow::anyhow!(
                "failed to receive initial start result for '{name}': {error}"
            )
            .into());
        }
    }
```
Bound the first-start wait.
register_and_start() and reconcile_configured() call this while holding lifecycle_mutex. If adapter.start() hangs, rx.await never completes, the caller hangs, and remove_adapter() cannot acquire the mutex to stop the supervisor. A timeout here would let the API/reload path fail fast while the supervisor keeps retrying in the background.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/messaging/manager.rs` around lines 373 - 395, The initial-start wait
currently does a potentially-unbounded await on the oneshot receiver
(first_start_result_rx / rx.await) inside register_and_start() /
reconcile_configured() while holding lifecycle_mutex, which can deadlock if
adapter.start() hangs; wrap the receive in a bounded timeout (e.g., use
tokio::time::timeout with a sensible duration) when awaiting the receiver
returned by install_supervisor, and on timeout return an Err indicating the
initial start timed out so the API/reload path fails fast while the supervisor
continues retrying in the background; update the match handling for rx.await to
handle the timeout case and include the adapter name in the returned error.
♻️ Duplicate comments (1)
src/messaging/manager.rs (1)
383-395: ⚠️ Potential issue | 🟠 Major: Bound the first-start wait to prevent deadlock.
This `rx.await` has no timeout. If `adapter.start()` hangs in the supervisor, this blocks indefinitely while holding `lifecycle_mutex`. Since `remove_adapter()` also needs that mutex, there's no way to cancel a hanging adapter, causing a deadlock. Consider wrapping the receive in `tokio::time::timeout` so the API/reload path can fail fast while the supervisor continues retrying in the background.
🛡️ Proposed fix with timeout
```diff
+const FIRST_START_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
+
 if let Some(rx) = first_start_result_rx {
-    match rx.await {
-        Ok(Ok(())) => {}
-        Ok(Err(error)) => {
-            return Err(anyhow::anyhow!("failed to start adapter '{name}': {error}").into());
-        }
-        Err(error) => {
-            return Err(anyhow::anyhow!(
-                "failed to receive initial start result for '{name}': {error}"
-            )
-            .into());
+    match tokio::time::timeout(FIRST_START_TIMEOUT, rx).await {
+        Ok(Ok(Ok(()))) => {}
+        Ok(Ok(Err(error))) => {
+            return Err(anyhow::anyhow!("failed to start adapter '{name}': {error}").into());
+        }
+        Ok(Err(error)) => {
+            return Err(anyhow::anyhow!(
+                "failed to receive initial start result for '{name}': {error}"
+            )
+            .into());
+        }
+        Err(_) => {
+            tracing::warn!(adapter = %name, "initial start timed out; supervisor will keep retrying");
         }
     }
 }
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/messaging/manager.rs` around lines 383 - 395, The await on first_start_result_rx can block indefinitely and cause a deadlock while holding lifecycle_mutex; wrap the receive in tokio::time::timeout (e.g., tokio::time::Duration::from_secs configurable) when awaiting first_start_result_rx in the block that follows adapter.start(), map a timeout (Elapsed) into an Err return that surfaces a fast-failing API/reload error (including the adapter name) while allowing the supervisor to keep retrying in the background; ensure the change references first_start_result_rx and the matching arms (Ok(Ok(())), Ok(Err(error)), Err(error)) so the timeout case is handled alongside those and remove_adapter()/supervisor can still acquire lifecycle_mutex.
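The bounded-wait idea can be sketched with stdlib primitives: `mpsc::recv_timeout` stands in for `tokio::time::timeout` around a oneshot receiver, so a hung first start surfaces as an error instead of blocking the caller (the function and its error strings are illustrative):

```rust
use std::sync::mpsc;
use std::time::Duration;

/// Wait for the supervisor's first-start result, but never indefinitely.
/// Stdlib analogue of wrapping a oneshot receive in tokio::time::timeout.
fn await_first_start(
    rx: mpsc::Receiver<Result<(), String>>,
    name: &str,
    timeout: Duration,
) -> Result<(), String> {
    match rx.recv_timeout(timeout) {
        Ok(Ok(())) => Ok(()),
        Ok(Err(error)) => Err(format!("failed to start adapter '{name}': {error}")),
        // Give up after the deadline: the caller fails fast while the
        // supervisor keeps retrying in the background.
        Err(mpsc::RecvTimeoutError::Timeout) => Err(format!(
            "initial start of '{name}' timed out; supervisor keeps retrying"
        )),
        Err(mpsc::RecvTimeoutError::Disconnected) => {
            Err(format!("supervisor for '{name}' dropped the start channel"))
        }
    }
}
```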
🧹 Nitpick comments (1)
src/messaging/manager.rs (1)
294-301: Use `.ok()` for channel sends per coding guidelines.
These oneshot channel sends use `let _ =` to discard the result. Per coding guidelines, the exception for channel sends where the receiver may be dropped should use `.ok()` explicitly.
♻️ Proposed fix
```diff
 tracing::info!(adapter = %name, "adapter started successfully");
 if let Some(tx) = first_start_result_tx.take() {
-    let _ = tx.send(Ok(()));
+    tx.send(Ok(())).ok();
 }
 stream
 }
 Err(error) => {
     if let Some(tx) = first_start_result_tx.take() {
-        let _ = tx.send(Err(error.to_string()));
+        tx.send(Err(error.to_string())).ok();
     }
```
As per coding guidelines: "Don't silently discard errors. No `let _ =` on Results. The only exception is `.ok()` on channel sends where the receiver may be dropped".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/messaging/manager.rs` around lines 294 - 301, Replace the silent `let _ = tx.send(...)` calls with the explicit `.ok()` pattern for the oneshot channel sends: locate the `first_start_result_tx.take()` branches where `tx.send(Ok(()))` and `tx.send(Err(error.to_string()))` are invoked and change those lines to `tx.send(...).ok();` so the send result is intentionally discarded per the coding guideline.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@src/messaging/manager.rs`:
- Around line 383-395: The await on first_start_result_rx can block indefinitely
and cause a deadlock while holding lifecycle_mutex; wrap the receive in
tokio::time::timeout (e.g., tokio::time::Duration::from_secs configurable) when
awaiting first_start_result_rx in the block that follows adapter.start(), map a
timeout (Elapsed) into an Err return that surfaces a fast-failing API/reload
error (including the adapter name) while allowing the supervisor to keep
retrying in the background; ensure the change references first_start_result_rx
and the matching arms (Ok(Ok(())), Ok(Err(error)), Err(error)) so the timeout
case is handled alongside those and remove_adapter()/supervisor can still
acquire lifecycle_mutex.
---
Nitpick comments:
In `@src/messaging/manager.rs`:
- Around line 294-301: Replace the silent `let _ = tx.send(...)` calls with the
explicit `.ok()` pattern for the oneshot channel sends: locate the
`first_start_result_tx.take()` branches where `tx.send(Ok(()))` and
`tx.send(Err(error.to_string()))` are invoked and change those lines to
`tx.send(...).ok();` so the send result is intentionally discarded per the
coding guideline.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 6dcb3570-2b84-4497-89fd-1667edd643f8
📒 Files selected for processing (1)
src/messaging/manager.rs
What?
This PR turns messaging hot reload from an additive-only "start missing adapters" path into an actual desired-state reconciler.
Before this change, hot reload only started missing adapters; running adapters kept their old detached forwarder and retry tasks behind `ArcSwap`s, with no lifecycle owner in `MessagingManager`.
After this change:

- `MessagingManager` owns runtime supervision for each adapter via a manager-owned supervisor task,
- named Twitch instances no longer derive token filenames from the unstable `DefaultHasher`.

Why?
The old behavior made config reload operationally unsafe: removed or modified adapters kept running with their old configuration, since reload only started missing ones. That means the runtime state could diverge from `config.toml` for long periods. This PR fixes that specific high-risk class of drift.

How?
1. Manager-owned adapter supervision
Files: `src/messaging/manager.rs`, `src/messaging.rs`
`MessagingManager` now owns the per-adapter supervisor runtimes. Each running adapter now gets a supervisor task that starts the adapter's stream, restarts it with capped backoff when it ends unexpectedly, and stops it cleanly on shutdown or removal.
This replaces the earlier split model of detached forwarders plus detached retry tasks with no lifecycle owner.
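A thread-based sketch of that supervisor shape, assuming the real implementation is a tokio task with a watch-channel shutdown signal (all names here are illustrative, not the PR's exact API):

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Spawn a supervisor that retries `start` with capped backoff until it
/// either succeeds cleanly or the shutdown flag is raised. Returns the
/// number of attempts made, for observability.
fn spawn_supervisor(
    shutdown: Arc<AtomicBool>,
    mut start: impl FnMut() -> Result<(), String> + Send + 'static,
) -> JoinHandle<u32> {
    thread::spawn(move || {
        let mut delay = Duration::from_millis(1);
        let max_delay = Duration::from_millis(16);
        let mut attempts = 0;
        while !shutdown.load(Ordering::SeqCst) {
            attempts += 1;
            match start() {
                // Stream ran and ended cleanly: stop supervising.
                Ok(()) => break,
                // Stream failed or ended unexpectedly: back off and retry.
                Err(_) => {
                    thread::sleep(delay);
                    delay = (delay * 2).min(max_delay);
                }
            }
        }
        attempts
    })
}
```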
2. Desired-state reconciliation in the watcher
File: `src/config/watcher.rs`
The watcher now builds a desired set of configurable adapters and calls `MessagingManager::reconcile_configured(...)` instead of only hot-starting missing ones. Reconciliation handles adapters that were added, removed, or changed since the last reload, using a per-adapter fingerprint to detect changes.
The adapter fingerprint includes the runtime-relevant config for that adapter and its effective permissions, so binding-driven permission changes also participate in replacement.
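The reconcile decision itself is a pure diff over fingerprints, which can be sketched independently of the adapter types (field and function names are illustrative, not the PR's exact API):

```rust
use std::collections::HashMap;

/// Reconcile plan derived by diffing desired vs. current fingerprints.
#[derive(Debug, Default, PartialEq)]
struct Plan {
    start: Vec<String>,   // in desired, not running
    stop: Vec<String>,    // running, no longer in desired
    replace: Vec<String>, // in both, but fingerprint changed
}

fn plan_reconcile(
    desired: &HashMap<String, String>, // name -> config fingerprint
    current: &HashMap<String, String>,
) -> Plan {
    let mut plan = Plan::default();
    for (name, fingerprint) in desired {
        match current.get(name) {
            None => plan.start.push(name.clone()),
            Some(running) if running != fingerprint => plan.replace.push(name.clone()),
            Some(_) => {} // unchanged: leave it alone
        }
    }
    for name in current.keys() {
        if !desired.contains_key(name) {
            plan.stop.push(name.clone());
        }
    }
    plan.start.sort();
    plan.stop.sort();
    plan.replace.sort();
    plan
}
```

Unchanged adapters (same name, same fingerprint) are deliberately left untouched, which is what makes reload safe for healthy runtimes.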
3. Stable Twitch token filename helper
Files: `src/config/types.rs`, `src/config/watcher.rs`, `src/main.rs`
Named Twitch adapters now use a stable SHA-256-based filename helper rather than `DefaultHasher`, whose output is not stable across std/compiler releases.

4. File watcher replacement safety
File: `src/main.rs`
When provider setup transitions from setup mode to initialized agents, the old watcher handle is now aborted before a new watcher is installed. This avoids duplicate watcher tasks continuing to operate after replacement.
5. Docs
File: `docs/content/docs/(configuration)/config.mdx`
Updated the hot-reload docs to reflect that messaging adapter runtime is now reconciled live instead of requiring restart for adapter config changes.
Standards / Docs consulted
Current primary references used while designing the lifecycle model:
- `JoinHandle::abort` / task cancellation semantics: docs.rs/tokio 1.49 `JoinHandle`
- `watch` channel behavior for cooperative shutdown signaling: docs.rs/tokio 1.49 `watch`
- `DefaultHasher` is not stable across releases: `std::collections::hash_map::DefaultHasher`

Local repo docs also informed the design:
- `docs/design-docs/named-messaging-adapters.md`
- `docs/content/docs/(configuration)/config.mdx`
- `docs/docker.md` (checked for container/runtime implications; no Docker-specific behavioral changes were needed beyond existing runtime assumptions)

Testing
Added focused manager tests for:
Validation run locally with a pinned local `protoc` binary:

- `cargo fmt`
- `PROTOC=/tmp/protoc-bin/bin/protoc cargo check --all-targets`
- `PROTOC=/tmp/protoc-bin/bin/protoc cargo clippy --all-targets -- -D warnings`
- `PROTOC=/tmp/protoc-bin/bin/protoc cargo test --lib -q`

Result: 494 passed; 0 failed

Scope notes
This PR is intentionally focused on runtime adapter reconciliation/supervision. It does not attempt to redesign unrelated admin-plane or memory-system issues from other branches/PRs.
Note
Messaging adapter lifecycle is now fully reconciled on config reload.
`MessagingManager` owns supervisor tasks for each adapter and restarts them when config changes or streams terminate unexpectedly. Adapters can now be safely added, modified, or removed without requiring a process restart. Twitch named-instance token filenames now use stable SHA-256 hashing instead of platform-dependent `DefaultHasher`. All adapter lifecycle changes are validated with unit tests.

Written by Tembo for commit 0b2b102. This will update automatically on new commits.