fix(messaging): reconcile adapter runtime state #372

Open

PyRo1121 wants to merge 5 commits into spacedriveapp:main from PyRo1121:codex/messaging-reconciler

Conversation


@PyRo1121 PyRo1121 commented Mar 8, 2026

What?

This PR turns messaging hot reload from an additive-only "start missing adapters" path into an actual desired-state reconciler.

Before this change:

  • config reload only updated some shared permission ArcSwaps,
  • newly-enabled adapters could be hot-started,
  • existing adapters with changed config were skipped,
  • removed/disabled adapters stayed alive,
  • failed-start retry tasks and ended adapter streams were not lifecycle-owned by MessagingManager.

After this change:

  • MessagingManager owns runtime supervision for each adapter via a manager-owned supervisor task,
  • adapter streams are restarted after failed starts and unexpected stream termination,
  • config reload reconciles the desired set of configurable adapters against the running set,
  • changed adapters are replaced, removed adapters are shut down, and missing adapters are started,
  • Twitch named-instance token filenames use a stable hash instead of DefaultHasher,
  • provider-setup transition now aborts the old file watcher before replacing it.

Why?

The old behavior made config reload operationally unsafe:

  • changing a token, Slack commands, Twitch channels, or email config often did nothing until restart,
  • disabling or deleting adapters in config did not stop already-running adapters,
  • named-instance permission changes were under-reconciled,
  • an adapter that died after startup could remain permanently down,
  • failed retry tasks were not owned by the manager and could outlive lifecycle changes.

That means the runtime state could diverge from config.toml for long periods. This PR fixes that specific high-risk class of drift.

How?

1. Manager-owned adapter supervision

Files:

  • src/messaging/manager.rs
  • src/messaging.rs

MessagingManager now owns:

  • the adapter registry,
  • per-runtime-key supervisor tasks,
  • retry/restart loops for adapters,
  • configured-adapter fingerprint tracking for reconciler decisions.

Each running adapter now gets a supervisor task that:

  • attempts startup,
  • retries failed starts with exponential backoff,
  • forwards inbound messages into the shared fan-in channel,
  • restarts the adapter when the inbound stream ends unexpectedly,
  • can be stopped authoritatively via manager-owned shutdown/watch state.

This replaces the earlier split model of detached forwarders plus detached retry tasks with no lifecycle owner.
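The retry behavior described above can be sketched in std-only Rust. The constants and function names below are illustrative assumptions, not the PR's actual values in src/messaging/manager.rs:

```rust
use std::time::Duration;

// Assumed values for illustration; the real constants may differ.
const INITIAL_RETRY_DELAY: Duration = Duration::from_secs(1);
const MAX_RETRY_DELAY: Duration = Duration::from_secs(60);

/// Double the delay after each failed start, capped at the maximum.
fn next_backoff(current: Duration) -> Duration {
    (current * 2).min(MAX_RETRY_DELAY)
}

/// The first `attempts` delays (in seconds) a supervisor would sleep
/// between consecutive failed start attempts.
fn backoff_schedule(attempts: usize) -> Vec<u64> {
    let mut delay = INITIAL_RETRY_DELAY;
    let mut schedule = Vec::with_capacity(attempts);
    for _ in 0..attempts {
        schedule.push(delay.as_secs());
        delay = next_backoff(delay);
    }
    schedule
}
```

With a 1s initial delay and 60s cap, the schedule is 1, 2, 4, 8, 16, 32, then capped at 60.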

2. Desired-state reconciliation in the watcher

File:

  • src/config/watcher.rs

The watcher now builds a desired set of configurable adapters and calls MessagingManager::reconcile_configured(...) instead of only hot-starting missing ones.

Reconciliation handles:

  • add missing adapters,
  • replace adapters whose runtime fingerprint changed,
  • remove adapters no longer present in config.

The adapter fingerprint includes the runtime-relevant config for that adapter and its effective permissions, so binding-driven permission changes also participate in replacement.
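The three-way reconcile decision amounts to a fingerprint diff. A minimal sketch (the function name and string fingerprints are illustrative; the PR's actual types differ):

```rust
use std::collections::HashMap;

/// Classify adapters into start/replace/remove by comparing desired
/// fingerprints against the currently running set.
fn diff_adapters(
    running: &HashMap<String, String>, // name -> fingerprint
    desired: &HashMap<String, String>,
) -> (Vec<String>, Vec<String>, Vec<String>) {
    let mut to_start = Vec::new();
    let mut to_replace = Vec::new();
    let mut to_remove = Vec::new();
    for (name, fingerprint) in desired {
        match running.get(name) {
            None => to_start.push(name.clone()),
            Some(old) if old != fingerprint => to_replace.push(name.clone()),
            Some(_) => {} // unchanged: leave the runtime alone
        }
    }
    for name in running.keys() {
        if !desired.contains_key(name) {
            to_remove.push(name.clone());
        }
    }
    (to_start, to_replace, to_remove)
}
```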

3. Stable Twitch token filename helper

Files:

  • src/config/types.rs
  • src/config/watcher.rs
  • src/main.rs

Named Twitch adapters now use a stable SHA-256-based filename helper rather than DefaultHasher, whose output is not stable across std/compiler releases.
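For illustration, here is a std-only stand-in for the idea. The PR itself uses sha2::Sha256; the FNV-1a hash, truncation lengths, and exact filename layout below are placeholders chosen only to show the "stable across releases" property:

```rust
/// Stand-in for a stable digest (the real helper uses sha2::Sha256).
/// FNV-1a is fully specified, so its output never changes across
/// compiler or std releases, unlike DefaultHasher.
fn stable_hash_hex(input: &str) -> String {
    let mut hash: u64 = 0xcbf29ce484222325;
    for byte in input.bytes() {
        hash ^= u64::from(byte);
        hash = hash.wrapping_mul(0x100000001b3);
    }
    format!("{hash:016x}")
}

/// Hypothetical filename builder: sanitize and bound the human-readable
/// prefix, then append a stable hash for uniqueness.
fn named_twitch_token_file_name(name: &str) -> String {
    let safe_name: String = name
        .chars()
        .map(|ch| if ch.is_ascii_alphanumeric() { ch } else { '_' })
        .take(32) // bound length so the filename stays within filesystem limits
        .collect();
    format!("twitch_token_{safe_name}_{}.json", &stable_hash_hex(name)[..12])
}
```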

4. File watcher replacement safety

File:

  • src/main.rs

When provider setup transitions from setup mode to initialized agents, the old watcher handle is now aborted before a new watcher is installed. This avoids duplicate watcher tasks continuing to operate after replacement.
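The take-then-abort pattern can be sketched with a flag-backed mock. FileWatcherHandle here is a stand-in, not the PR's real type; "abort" just flips a shutdown flag the watcher loop would observe:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for the PR's FileWatcherHandle.
struct FileWatcherHandle {
    shutdown: Arc<AtomicBool>,
}

impl FileWatcherHandle {
    fn abort(&self) {
        self.shutdown.store(true, Ordering::SeqCst);
    }
}

/// Abort the old watcher before installing its replacement, so two
/// watcher tasks never react to the same config file at once.
fn replace_watcher(slot: &mut Option<FileWatcherHandle>, new_handle: FileWatcherHandle) {
    if let Some(old_watcher) = slot.take() {
        old_watcher.abort();
    }
    *slot = Some(new_handle);
}
```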

5. Docs

File:

  • docs/content/docs/(configuration)/config.mdx

Updated the hot-reload docs to reflect that messaging adapter runtime is now reconciled live instead of requiring restart for adapter config changes.

Standards / Docs consulted

Current primary references used while designing the lifecycle model:

Local repo docs also informed the design:

  • docs/design-docs/named-messaging-adapters.md
  • docs/content/docs/(configuration)/config.mdx
  • docs/docker.md (checked for container/runtime implications; no Docker-specific behavioral changes were needed beyond existing runtime assumptions)

Testing

Added focused manager tests for:

  • retry/restart after failed start and stream end,
  • stopping retry supervision when an adapter is removed.

Validation run locally with a pinned local protoc binary:

  • cargo fmt
  • PROTOC=/tmp/protoc-bin/bin/protoc cargo check --all-targets
  • PROTOC=/tmp/protoc-bin/bin/protoc cargo clippy --all-targets -- -D warnings
  • PROTOC=/tmp/protoc-bin/bin/protoc cargo test --lib -q

Result:

  • 494 passed; 0 failed

Scope notes

This PR is intentionally focused on runtime adapter reconciliation/supervision. It does not attempt to redesign unrelated admin-plane or memory-system issues from other branches/PRs.


Note

Messaging adapter lifecycle is now fully reconciled on config reload. MessagingManager owns supervisor tasks for each adapter and restarts them when config changes or streams terminate unexpectedly. Adapters can now be safely added, modified, or removed without requiring a process restart. Twitch named-instance token filenames now use stable SHA-256 hashing instead of platform-dependent DefaultHasher. All adapter lifecycle changes are validated with unit tests.

Written by Tembo for commit 0b2b102. This will update automatically on new commits.


coderabbitai bot commented Mar 8, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.


Walkthrough

Refactors adapter hot-reload into a reconcile model: adds FileWatcherHandle, deterministic Twitch token naming, adapter fingerprinting, per-adapter supervisor runtimes with backoff, build_desired_configured_adapters(), and MessagingManager::reconcile_configured to apply changes atomically.

Changes

Cohort / File(s) Summary
Documentation
docs/content/docs/(configuration)/config.mdx
Replaced "Discord/Slack permissions" row with "Messaging adapters (Discord/Slack/Telegram/Twitch/Email/Webhook)" and documented live adapter reconciliation and per-instance restart/removal behavior.
Config types & token naming
src/config/types.rs
Added named_twitch_token_file_name(name: &str) and use sha2::{Digest, Sha256}; to generate safe, truncated, hashed per-instance Twitch token filenames.
File watcher & reconciliation
src/config/watcher.rs
Added FileWatcherHandle (Drop-based shutdown); spawn_file_watcher now returns the handle; introduced shutdown channel and responsive debounce loop; added build_desired_configured_adapters(), deterministic fingerprinting helpers (secret_fingerprint, sorted_* utilities), and now calls MessagingManager::reconcile_configured(...) on config changes.
Main wiring
src/main.rs
File watcher stored as Option<FileWatcherHandle>; spawn calls updated and wrapped in Some(...); Twitch token path now uses named_twitch_token_file_name(); seeds messaging fingerprints after setup; watcher reinitialized on provider reload.
Messaging exports
src/messaging.rs
Re-exported ConfiguredAdapter alongside MessagingManager, expanding the public API surface used for reconciliation.
Messaging manager & runtimes
src/messaging/manager.rs
Added public ConfiguredAdapter and reconcile_configured(...); introduced per-adapter runtimes, supervisors with exponential backoff, runtime tracking (start/stop/remove), seed_configured_fingerprints_from_registered(), and tests covering supervisor/reconciliation behavior.
Config re-exports
src/config.rs
Now re-exports FileWatcherHandle in addition to spawn_file_watcher.
API token cleanup
src/api/messaging.rs
Replaced ad-hoc Twitch token filename construction with crate::config::named_twitch_token_file_name(name) in multiple endpoints.

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes

Possibly related PRs

  • spacedriveapp/spacebot PR 306: Overlaps on src/config/watcher.rs API changes (spawn_file_watcher → FileWatcherHandle), fingerprinting, and the token-naming refactor.
  • spacedriveapp/spacebot PR 249: Related per-instance messaging adapter work and Twitch token filename generation; touches adapter registration/reconciliation helpers.
  • spacedriveapp/spacebot PR 271: Related adapter surface changes and public API modifications around adapter metadata and messaging exports.
🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
Check name Status Explanation
Title check ✅ Passed The title 'fix(messaging): reconcile adapter runtime state' directly describes the main change: implementing runtime adapter state reconciliation for messaging hot-reload, which aligns with the core objective of transforming messaging from additive-only to desired-state reconciliation.
Description check ✅ Passed The PR description is comprehensive and directly related to the changeset, covering the what/why/how of the reconciliation system, manager-owned supervision, fingerprinting, Twitch token handling, and file watcher safety with supporting evidence from testing and scope notes.
Docstring Coverage ✅ Passed Docstring coverage is 83.67% which is sufficient. The required threshold is 80.00%.


if !email_config.imap_host.is_empty() {
let fingerprint = format!(
"imap_host={};imap_port={};imap_username={};imap_password={};imap_use_tls={};smtp_host={};smtp_port={};smtp_username={};smtp_password={};smtp_use_starttls={};from_address={};from_name={:?};poll_interval_secs={};folders={:?};allowed_senders={:?};max_body_bytes={};max_attachment_bytes={}",
email_config.imap_host,
These fingerprints embed raw secrets (passwords/tokens/auth tokens). Since the fingerprint is stored in MessagingManager (and can be easy to accidentally log/surface later), it’d be safer to hash/redact sensitive fields (e.g. sha256(token)), rather than including them verbatim.
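A sketch of that suggestion: include only a digest of secret fields in the fingerprint. The reviewer suggests sha256; a std-only FNV-1a hash stands in below to keep the example dependency-free, and the field set is a hypothetical fragment, not the full fingerprint:

```rust
/// Stand-in digest (the review suggests sha256(token) via the sha2 crate).
fn secret_fingerprint(secret: &str) -> String {
    let mut hash: u64 = 0xcbf29ce484222325;
    for byte in secret.bytes() {
        hash ^= u64::from(byte);
        hash = hash.wrapping_mul(0x100000001b3);
    }
    format!("{hash:016x}")
}

/// Hypothetical fragment of the email fingerprint with the password redacted:
/// config changes still alter the fingerprint, but the stored string never
/// contains the raw secret.
fn email_fingerprint(imap_host: &str, imap_password: &str) -> String {
    format!(
        "imap_host={};imap_password_digest={}",
        imap_host,
        secret_fingerprint(imap_password)
    )
}
```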

/// Build the persisted token filename for a named Twitch adapter instance.
pub fn named_twitch_token_file_name(name: &str) -> String {
let safe_name: String = name
.chars()

Worth bounding safe_name length to avoid path/filename length issues if someone gives a very long instance name.

Suggested change:

let safe_name: String = name
    .chars()
    .map(|ch| if ch.is_ascii_alphanumeric() { ch } else { '_' })
    .take(64)
    .collect();

.await
.insert(name.clone(), Arc::clone(&adapter));
if self.started.load(Ordering::SeqCst) {
self.stop_runtime(&name, old_adapter, false).await.ok();
Minor: started is loaded twice here. Caching the value once reads cleaner and avoids any future footguns if started ever changes semantics.

Suggested change:

let started = self.started.load(Ordering::SeqCst);
if started {
    self.stop_runtime(&name, old_adapter, false).await.ok();
}
if started && let Err(error) = self.start_runtime(name.clone(), adapter, false).await {
    tracing::warn!(adapter = %name, %error, "failed to start adapter registered after manager start");
}

let _stream = manager.start().await.expect("start manager");

tokio::time::sleep(std::time::Duration::from_millis(40)).await;
let before_remove = adapter.start_calls.load(Ordering::SeqCst);

This test looks a bit timing-sensitive (sleep(40ms) / sleep(50ms)) and could flake on slow/loaded CI. It might be more robust to have TestAdapter::start() signal each call via a channel/Notify, then assert no further signals arrive after remove_adapter().
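A sketch of that suggestion, using a std mpsc channel as a stand-in for the async Notify the real tokio tests would use (TestAdapter here is a mock, not the PR's actual test type):

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::time::Duration;

/// Mock adapter that signals every start() call over a channel, so the
/// test can assert on observed starts instead of guessing with sleeps.
struct TestAdapter {
    start_signal: mpsc::Sender<()>,
}

impl TestAdapter {
    fn start(&self) {
        // Notify the test that a start attempt happened; .ok() is fine
        // here because the receiver may already be gone.
        self.start_signal.send(()).ok();
    }
}
```

A test then waits on the receiver with a generous timeout for the expected starts, and asserts that no further signals arrive after the adapter is removed: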

.cloned()
.collect::<Vec<_>>();

let mut first_error = None;

One edge case here: stale-removal only considers configured_fingerprints keys. Since startup registers adapters but doesn’t seed fingerprints, the first config reload that disables/removes an adapter won’t remove it (because current_configured is empty and the adapter isn’t in desired).

Seems worth either seeding configured_fingerprints during initial adapter setup, or basing the removal sweep on the running self.adapters set (at least on the first reconcile).

src/main.rs Outdated
// Restart file watcher with the new agent data
_file_watcher = spacebot::config::spawn_file_watcher(
if let Some(old_watcher) = _file_watcher.take() {
old_watcher.abort();

Heads up: spawn_file_watcher() uses tokio::task::spawn_blocking, so JoinHandle::abort() likely won’t actually stop the blocking watcher loop once it’s running. If the goal is to avoid duplicate watchers, you may need an explicit shutdown signal inside spawn_file_watcher (e.g. watch/CancellationToken + periodic check) so the loop can exit and drop the notify watcher.
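A std-only sketch of the cooperative shutdown the comment describes: the blocking loop polls a shared flag each tick, because abort() cannot interrupt code running inside spawn_blocking. Names and the tick interval are illustrative, not the PR's actual implementation:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Blocking watcher loop that exits when the shutdown flag is set.
/// A plain thread stands in for tokio::task::spawn_blocking here.
fn spawn_blocking_watcher(shutdown: Arc<AtomicBool>) -> thread::JoinHandle<u32> {
    thread::spawn(move || {
        let mut ticks = 0;
        while !shutdown.load(Ordering::SeqCst) {
            // Real code would debounce notify events here.
            ticks += 1;
            thread::sleep(Duration::from_millis(5));
        }
        ticks
    })
}
```

The replacing code then sets the flag for the old watcher and joins it before spawning the new one, guaranteeing no overlap.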

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 5

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/main.rs (1)

2221-2234: ⚠️ Potential issue | 🟠 Major

Update api_state's LlmManager when config reloads with new provider credentials.

When providers are reconfigured, new_llm_manager is created and passed to agents and the file watcher, but api_state retains the startup instance. API handlers calling state.llm_manager.read().await will continue using stale credentials until restart.

🩹 Minimal fix
                            Ok(new_llm) => {
                                let new_llm_manager = Arc::new(new_llm);
+                               api_state.set_llm_manager(new_llm_manager.clone()).await;
                                // Update agent_humans from the reloaded config
                                // before initialize_agents so agents see the
                                // latest [[humans]] entries.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/main.rs` around lines 2221 - 2234, The api_state's LlmManager must be
replaced when config reloads so API handlers stop using stale credentials: after
creating new_llm_manager (the value passed into agents and spawn_file_watcher)
acquire a write lock on api_state.llm_manager (or the shared state variable
named api_state/state) and assign/replace it with new_llm_manager.clone() (or
swap the Arc/Box underlying the RwLock) so subsequent
state.llm_manager.read().await returns the updated manager; ensure cloning/Arc
semantics match existing LlmManager type to avoid ownership issues.
🧹 Nitpick comments (3)
src/config/watcher.rs (1)

46-60: Prefer .ok() for the channel send here.

Dropping the send error is fine if the receiver is gone, but the repo guideline calls out channel sends as the case that should use .ok() rather than let _ =.

♻️ Small cleanup
-                            let _ = tx.send(event);
+                            tx.send(event).ok();
...
-                            let _ = tx.send(event);
+                            tx.send(event).ok();

As per coding guidelines, "Don't silently discard errors. No let _ = on Results. Handle them, log them, or propagate them. The only exception is .ok() on channel sends where the receiver may be dropped"

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/config/watcher.rs` around lines 46 - 60, The channel send results in this
watcher closure use `let _ = tx.send(event);` which silently discards the
Result; replace each `let _ = tx.send(event);` in the notify closure (the
closure passed to notify::recommended_watcher in src/config/watcher.rs that
matches on EventKind::Create, Modify(Data), Remove and Modify(Any)) with
`tx.send(event).ok();` so the send error is explicitly converted to Option and
dropped per repo guideline.
src/messaging/manager.rs (2)

85-85: Don't discard stop_runtime() results at these call sites.

These .ok() calls make future lifecycle regressions invisible here. Please either log unexpected Errs explicitly or split stop_runtime() into strict/non-strict APIs instead of dropping the Result.

As per coding guidelines, "Don't silently discard errors. No let _ = on Results. Handle them, log them, or propagate them. The only exception is .ok() on channel sends where the receiver may be dropped"

Also applies to: 105-105, 159-159, 223-225

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/messaging/manager.rs` at line 85, The calls that end with .ok() on the
Result returned by self.stop_runtime(...) must not silently discard errors;
replace those .ok() uses (the calls to stop_runtime with the signature
stop_runtime(&name, old_adapter, false).await) with explicit handling: await the
Result and either propagate the error or log it (e.g., if let Err(e) =
self.stop_runtime(...).await { tracing::error!(%name, adapter=?old_adapter,
error=%e, "failed to stop runtime"); } ). Do this at every call site flagged
(the multiple invocations of stop_runtime in manager.rs) — alternatively, if you
need two behaviors, provide two API variants (e.g., stop_runtime_strict /
stop_runtime_best_effort) and use the appropriate one instead of dropping the
Result.

11-12: Use full sender/receiver names in this lifecycle code.

tx/rx make this already-concurrent path harder to scan and don't match the repo naming rule. Renaming them while this API is still new will make the supervisor/shutdown flow easier to follow.

As per coding guidelines, "Don't abbreviate variable names. Use queue not q, message not msg, channel not ch. Common abbreviations like config are fine"

Also applies to: 25-26, 55-57, 259-260, 361-366

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/config/types.rs`:
- Around line 1558-1567: The sanitized filename prefix generated in
named_twitch_token_file_name can be arbitrarily long; truncate the safe_name
after sanitizing (e.g., limit to a fixed number of characters like 32) before
combining it with hash_prefix so the resulting filename stays within filesystem
limits while preserving uniqueness via the hash; keep the existing sanitization
logic (safe_name), then slice/truncate that string and use the truncated value
when building the final format!("twitch_token_{safe_name}_{hash_prefix}.json").

In `@src/main.rs`:
- Around line 2218-2220: The current code calls JoinHandle::abort() on
_file_watcher which doesn't stop a spawn_blocking watcher; change to cooperative
shutdown by adding a cancellation signal (e.g., an Arc<AtomicBool> or
tokio::sync::watch/oneshot) that you pass into spawn_file_watcher when creating
the blocking task, have the blocking closure (inside spawn_file_watcher)
periodically check that signal and exit cleanly when set, and when replacing the
watcher set the signal for the old watcher and then await its JoinHandle (await
_file_watcher.take().unwrap().await) to ensure it has exited before spawning the
new watcher; reference spawn_file_watcher, _file_watcher, and the JoinHandle
previously aborted.

In `@src/messaging/manager.rs`:
- Around line 121-139: The start() code snapshots adapters before setting
self.started, allowing concurrent register*/reconcile_configured to insert
adapters that see started==false and get skipped; fix by flipping the order so
the started flag is set before taking the adapters snapshot: set
self.started.store(true, Ordering::SeqCst) (and still validate fan_in_rx with
the same context check) prior to reading self.adapters and collecting into
adapters, then iterate and call start_runtime(name, adapter, false). Reference:
the symbols self.started, adapters snapshot, start_runtime, fan_in_rx, and
register*/reconcile_configured.
- Around line 277-316: The supervisor currently resets next_retry_delay to
INITIAL_RETRY_DELAY immediately when start_result is Ok(stream), which allows a
hot loop if the stream ends instantly; change the logic so that next_retry_delay
is only reset after the stream has proven healthy (e.g., after receiving the
first message or surviving a short "healthy" interval) and treat an immediate
None from stream like a start failure by applying exponential backoff (using
next_retry_delay = (next_retry_delay * 2).min(MAX_RETRY_DELAY)) before
attempting start() again; update the branches around start_result, the loop
handling maybe_message/None, and the handling of stream to reference
start_result, stream, maybe_message, next_retry_delay, INITIAL_RETRY_DELAY,
MAX_RETRY_DELAY and fan_in_tx so the delay is not reset on short-lived streams.
- Around line 390-410: The current code returns Err(error) immediately when
adapter.shutdown().await fails (propagate_shutdown_error), which allows
remove_adapter()/reconcile_configured() to run before the old supervisor task is
aborted/joined; change the shutdown sequence so you always finish tearing down
the runtime supervisor before returning the shutdown error: after
adapter.shutdown().await returns Err(error) still proceed to abort the runtime
task (runtime.task.abort()) and await runtime.task.await (logging any join
failures), and only then, if propagate_shutdown_error is true, return the
original shutdown error from adapter.shutdown(); reference adapter.shutdown(),
runtime.task.abort(), runtime.task.await, and propagate_shutdown_error in your
change.

---

Outside diff comments:
In `@src/main.rs`:
- Around line 2221-2234: The api_state's LlmManager must be replaced when config
reloads so API handlers stop using stale credentials: after creating
new_llm_manager (the value passed into agents and spawn_file_watcher) acquire a
write lock on api_state.llm_manager (or the shared state variable named
api_state/state) and assign/replace it with new_llm_manager.clone() (or swap the
Arc/Box underlying the RwLock) so subsequent state.llm_manager.read().await
returns the updated manager; ensure cloning/Arc semantics match existing
LlmManager type to avoid ownership issues.

---

Nitpick comments:
In `@src/config/watcher.rs`:
- Around line 46-60: The channel send results in this watcher closure use `let _
= tx.send(event);` which silently discards the Result; replace each `let _ =
tx.send(event);` in the notify closure (the closure passed to
notify::recommended_watcher in src/config/watcher.rs that matches on
EventKind::Create, Modify(Data), Remove and Modify(Any)) with
`tx.send(event).ok();` so the send error is explicitly converted to Option and
dropped per repo guideline.

In `@src/messaging/manager.rs`:
- Line 85: The calls that end with .ok() on the Result returned by
self.stop_runtime(...) must not silently discard errors; replace those .ok()
uses (the calls to stop_runtime with the signature stop_runtime(&name,
old_adapter, false).await) with explicit handling: await the Result and either
propagate the error or log it (e.g., if let Err(e) =
self.stop_runtime(...).await { tracing::error!(%name, adapter=?old_adapter,
error=%e, "failed to stop runtime"); } ). Do this at every call site flagged
(the multiple invocations of stop_runtime in manager.rs) — alternatively, if you
need two behaviors, provide two API variants (e.g., stop_runtime_strict /
stop_runtime_best_effort) and use the appropriate one instead of dropping the
Result.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 635ff3c5-7999-411b-8610-1de492c1e32f

📥 Commits

Reviewing files that changed from the base of the PR and between 203c9d9 and 0b2b102.

📒 Files selected for processing (6)
  • docs/content/docs/(configuration)/config.mdx
  • src/config/types.rs
  • src/config/watcher.rs
  • src/main.rs
  • src/messaging.rs
  • src/messaging/manager.rs

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/config/watcher.rs (1)

244-272: ⚠️ Potential issue | 🟠 Major

Run reconciliation serially instead of detaching it.

This block fires reconcile_configured() in the background and returns immediately. Because reconciliation awaits adapter shutdown()/start(), a later file event can launch a second desired-state transition against the same manager before the first one finishes, and the detached task also survives when the watcher itself is aborted during provider transitions. Either block_on this from the watcher thread or guard reconciliation behind a single mutex.

💡 One straightforward way to keep reconciles ordered
-                    rt.spawn(async move {
-                        match build_desired_configured_adapters(
-                            &config,
-                            &instance_dir,
-                            discord_permissions,
-                            slack_permissions,
-                            telegram_permissions,
-                            twitch_permissions,
-                        ) {
-                            Ok(desired) => {
-                                if let Err(error) = manager.reconcile_configured(desired).await {
-                                    tracing::warn!(%error, "messaging adapter reconciliation encountered errors");
-                                }
-                            }
-                            Err(error) => {
-                                tracing::error!(%error, "failed to build desired messaging adapters from config change");
-                            }
-                        }
-                    });
+                    match build_desired_configured_adapters(
+                        &config,
+                        &instance_dir,
+                        discord_permissions,
+                        slack_permissions,
+                        telegram_permissions,
+                        twitch_permissions,
+                    ) {
+                        Ok(desired) => {
+                            rt.block_on(async {
+                                if let Err(error) = manager.reconcile_configured(desired).await {
+                                    tracing::warn!(%error, "messaging adapter reconciliation encountered errors");
+                                }
+                            });
+                        }
+                        Err(error) => {
+                            tracing::error!(%error, "failed to build desired messaging adapters from config change");
+                        }
+                    }
♻️ Duplicate comments (3)
src/messaging/manager.rs (3)

388-405: ⚠️ Potential issue | 🟠 Major

Always tear down the runtime before returning shutdown errors.

Line 392 returns before Lines 397-405 abort/join the supervisor. A failed adapter.shutdown() during removal or replacement can therefore leave the old task alive under the same adapter name. Capture the shutdown error, finish the runtime teardown, and only then return it.

💡 Suggested teardown ordering
-        if let Some(adapter) = adapter
-            && let Err(error) = adapter.shutdown().await
-        {
-            if propagate_shutdown_error {
-                return Err(error);
-            }
-            tracing::warn!(adapter = %name, %error, "failed to shut down adapter");
-        }
+        let shutdown_error = if let Some(adapter) = adapter {
+            match adapter.shutdown().await {
+                Ok(()) => None,
+                Err(error) => {
+                    if !propagate_shutdown_error {
+                        tracing::warn!(adapter = %name, %error, "failed to shut down adapter");
+                    }
+                    Some(error)
+                }
+            }
+        } else {
+            None
+        };
 
         if let Some(runtime) = runtime {
             runtime.task.abort();
             match runtime.task.await {
                 Ok(()) => {}
@@
                 }
             }
         }
 
+        if let Some(error) = shutdown_error
+            && propagate_shutdown_error
+        {
+            return Err(error);
+        }
+
         Ok(())
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/messaging/manager.rs` around lines 388 - 405, The current code returns
immediately when adapter.shutdown().await returns Err(error), which can bypass
tearing down the supervisor task; instead, capture the shutdown error (e.g., let
shutdown_err = Some(error)) but do not return yet, always proceed to the runtime
teardown logic (call runtime.task.abort() and await runtime.task) and only after
the runtime join completes, if propagate_shutdown_error is true return the
captured error; otherwise log/warn as before. Update the block around
adapter.shutdown(), propagate_shutdown_error, runtime.task.abort(), and
runtime.task.await to follow this ordering and reference adapter.shutdown(),
propagate_shutdown_error, runtime.task.abort(), runtime.task.await, and the
adapter name variable (name).

266-279: ⚠️ Potential issue | 🟠 Major

Back off after immediate stream termination.

Lines 267 and 278 reset next_retry_delay as soon as start() returns Ok(_). If stream.next() then yields None on the first poll, Lines 312-314 jump straight back to adapter.start() with no sleep, which can hot-loop against a flaky provider. Only reset the delay after the stream proves healthy, and treat immediate EOF like a failed start.

Also applies to: 298-315

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/messaging/manager.rs` around lines 266 - 279, The code resets
next_retry_delay = INITIAL_RETRY_DELAY as soon as adapter.start() returns
Ok(stream), which allows a hot-loop if stream.next() yields None immediately;
change the logic so that an Ok from adapter.start() does NOT reset
next_retry_delay yet — only reset next_retry_delay after the stream proves
healthy by yielding at least one Some from stream.next(); if stream.next()
returns None on the first poll treat it like a failed start (fall through to the
backoff path and increase next_retry_delay) so that immediate EOF does not
bypass the sleep. Ensure this change is applied to the branch handling
start_result (where adapter.start() is matched) and the other similar branch
around initial_stream handling.
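Under assumed constants (the real delays live in manager.rs), the policy in this finding — reset only after the stream proves healthy, treat immediate EOF like a failed start — reduces to a small pure function. All names and values here are illustrative:

```rust
use std::time::Duration;

// Illustrative constants; the real values come from the manager.
const INITIAL_RETRY_DELAY: Duration = Duration::from_secs(1);
const MAX_RETRY_DELAY: Duration = Duration::from_secs(60);

/// What one supervise iteration observed, as seen by the backoff policy.
enum Attempt {
    /// start() failed, or the stream yielded None before any item.
    FailedOrImmediateEof,
    /// The stream yielded at least one Some(_) before ending.
    Healthy,
}

/// Only a proven-healthy stream resets the delay; an immediate EOF keeps
/// backing off exactly like a failed start, so a connect/EOF loop cannot
/// collapse to the initial delay forever.
fn next_retry_delay(current: Duration, outcome: Attempt) -> Duration {
    match outcome {
        Attempt::Healthy => INITIAL_RETRY_DELAY,
        Attempt::FailedOrImmediateEof => (current * 2).min(MAX_RETRY_DELAY),
    }
}
```

With this shape, the supervisor loop only feeds `Attempt::Healthy` into the policy after `stream.next()` has yielded at least one item.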

118-137: ⚠️ Potential issue | 🟠 Major

Serialize start() with adapter insertion/fingerprinting.

Any adapter inserted between Lines 119-125 and Line 133 sees started == false, so start_runtime() returns early, while Lines 231-234 can still record its fingerprint. After that, later reconciles treat it as unchanged even though no supervisor was ever installed. start(), register*(), and reconcile_configured() need one shared startup/reconfiguration gate here.

Also applies to: 217-234

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/messaging/manager.rs` around lines 118 - 137, start() races with adapter
insertion/fingerprinting because adapters can be added after you snapshot
adapters but before you set started/launch supervisors, causing start_runtime()
to skip supervisor install while later fingerprinting records it as unchanged;
fix by adding a single startup/reconfigure gate that serializes start(),
register*() and reconcile_configured(): acquire that gate in start() before
snapshotting self.adapters and set self.started under the same gate, then
release after all start_runtime(name, adapter, false) calls complete; ensure
register*() and reconcile_configured() also acquire the same gate when inserting
or computing fingerprints so they cannot insert adapters while start() is in
progress (use a new async Mutex/lock field or reuse an existing write lock to
protect adapter insertion/fingerprinting).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/config/types.rs`:
- Around line 1558-1568: Replace all manual constructions/deletions of
"twitch_token_{safe_name}.json" in messaging logic with the new helper
crate::config::named_twitch_token_file_name(name): use that function when
computing filenames for read/write/delete, and when migrating existing data,
detect the old filename variant and either move/rename it to the new name or
fall back to loading from the old file if the new one is absent; also ensure
cleanup removes both old and new filenames. Specifically, wherever code builds
or unlinks twitch token filenames (the sites that currently form
"twitch_token_{safe_name}.json"), call named_twitch_token_file_name(name) for
all operations and add a small compatibility step to check for and migrate the
legacy file to the new hashed name before writing or deleting.

In `@src/messaging/manager.rs`:
- Around line 324-349: start_runtime currently awaits adapter.start() before
calling install_supervisor, leaving the initial connect outside supervisor
control; change the flow so the supervisor owns the initial start attempt:
either call install_supervisor(name, Arc::clone(&adapter), None).await first to
publish an AdapterRuntime and have the supervisor spawn the task that performs
adapter.start() and stores the resulting stream, or create an abortable task
(e.g., tokio::spawn with an AbortHandle) for adapter.start() and pass that
handle/placeholder into install_supervisor so remove_adapter can abort it; apply
the same change to the other similar block that creates the initial_stream and
calls install_supervisor(name, adapter, Some(initial_stream)).await so no start
attempt happens outside supervisor ownership.
- Around line 79-90: The call to stop_runtime(&name, old_adapter,
false).await.ok() silently discards errors; change the call sites (the
stop_runtime invocations near the adapter registration and other noted ranges)
to handle the Result instead of calling .ok() — either propagate the error
(return/? from the surrounding async function) or log it with
tracing::warn!/error! including context (adapter name and the returned error) so
shutdown failures are visible; apply the same treatment to the other
stop_runtime calls and ensure start_runtime(...) error handling remains
consistent (the existing tracing::warn for start_runtime can be used as a
pattern).
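The filename point in the `src/config/types.rs` note above (the PR summary says Twitch named-instance token filenames now use a stable hash instead of `DefaultHasher`) can be illustrated with a deterministic hash. FNV-1a and the exact filename format here are stand-ins — the real scheme lives in `crate::config::named_twitch_token_file_name`:

```rust
/// A process- and version-stable 64-bit hash (FNV-1a), unlike std's
/// DefaultHasher, whose output is not guaranteed stable across runs
/// or compiler releases.
fn fnv1a_64(bytes: &[u8]) -> u64 {
    let mut hash: u64 = 0xcbf2_9ce4_8422_2325;
    for &b in bytes {
        hash ^= u64::from(b);
        hash = hash.wrapping_mul(0x0000_0100_0000_01b3);
    }
    hash
}

/// Illustrative filename helper: the same instance name always maps to
/// the same file, across restarts and across machines, which is what
/// makes migration/cleanup of legacy filenames tractable.
fn named_twitch_token_file_name(instance: &str) -> String {
    format!("twitch_token_{:016x}.json", fnv1a_64(instance.as_bytes()))
}
```

A `DefaultHasher`-based name can silently change between runs, orphaning token files; a stable hash keeps read/write/delete sites in agreement.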


ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: b8bb49f9-38dc-4f04-816b-7cbd82d16b10

📥 Commits

Reviewing files that changed from the base of the PR and between 0b2b102 and f7ea53a.

📒 Files selected for processing (3)
  • src/config/types.rs
  • src/config/watcher.rs
  • src/messaging/manager.rs

@PyRo1121
Contributor Author

PyRo1121 commented Mar 8, 2026

Follow-up for commit c3bae3e

This pass addressed the newest review feedback on PR #372.

What changed:

  • Migrated the remaining named Twitch token-file call sites in src/api/messaging.rs to the shared named_twitch_token_file_name(...) helper so create/toggle/delete paths all agree on the same filename scheme.
  • Updated the provider-reconfiguration path in src/main.rs to refresh api_state's LlmManager when a new manager is built, so API handlers don't keep stale provider credentials after reload.
  • Tightened MessagingManager lifecycle handling in src/messaging/manager.rs:
    • serialized register/start/reconcile/remove transitions with the manager lifecycle mutex,
    • based stale-removal on the running adapter set,
    • moved the initial start() attempt under supervisor ownership via a first-start result channel,
    • added backoff for short-lived streams before restart,
    • ensured teardown completes before propagating shutdown errors,
    • kept configured_fingerprints seeded for startup-registered adapters.

Validation rerun locally after these changes:

  • cargo fmt
  • PROTOC=/tmp/protoc-bin/bin/protoc cargo check --all-targets
  • PROTOC=/tmp/protoc-bin/bin/protoc cargo clippy --all-targets -- -D warnings
  • PROTOC=/tmp/protoc-bin/bin/protoc cargo test --lib -q

Result:

  • 494 passed; 0 failed

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

♻️ Duplicate comments (5)
src/messaging/manager.rs (4)

82-93: ⚠️ Potential issue | 🟡 Minor

Don't swallow stop_runtime() failures.

These .ok() calls discard shutdown/join errors from a non-channel Result. If an old runtime fails to stop, the replacement path keeps going and you lose the only signal that supervisors may now overlap.

As per coding guidelines "Don't silently discard errors. No let _ = on Results. Handle them, log them, or propagate them. The only exception is .ok() on channel sends where the receiver may be dropped".

Also applies to: 102-113, 158-165, 224-230

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/messaging/manager.rs` around lines 82 - 93, the code is currently
discarding errors from stop_runtime by calling .ok(); update all occurrences
(e.g., the calls inside the adapter registration flow around stop_runtime(&name,
old_adapter, false).await.ok()) to handle the Result instead of swallowing it:
await the stop_runtime call, check for Err and either log the error with
tracing::warn!/error! including the adapter name/context or propagate the error
up so callers can react (same treatment for the similar occurrences at the other
locations you noted). Ensure you reference the same parameters (name,
old_adapter, boolean flag) when logging so supervisors can detect overlapping
runtimes.
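The guideline quoted above can be shown as a small pattern. `eprintln!` stands in for `tracing::warn!` to keep the example dependency-free, and the function and its signature are hypothetical, not the actual replacement flow:

```rust
/// Hypothetical replacement flow: the old runtime's stop Result is
/// inspected and logged with context instead of being dropped via .ok().
fn stop_old_runtime_logged(
    name: &str,
    stop_old: impl FnOnce() -> Result<(), String>, // stands in for stop_runtime(...)
) -> bool {
    match stop_old() {
        Ok(()) => true,
        Err(error) => {
            // Real code: tracing::warn!(adapter = %name, %error, "failed to stop old runtime");
            eprintln!("failed to stop old adapter runtime `{name}`: {error}");
            false // the caller now knows supervisors may overlap
        }
    }
}
```

Returning a boolean (or propagating the error) keeps the "did the old runtime actually die?" signal available to the replacement path instead of discarding it.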

553-564: ⚠️ Potential issue | 🟠 Major

This doesn't actually seed fingerprints.

Lines 561-563 insert "" for each adapter, so the first reconcile_configured() after startup still sees every configured adapter as changed and restarts them on the next config edit. Either initialize startup adapters through the same fingerprinted ConfiguredAdapter path, or let this method receive the real fingerprint values.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/messaging/manager.rs` around lines 553 - 564,
seed_configured_fingerprints_from_registered currently writes empty strings into
configured_fingerprints which causes reconcile_configured to treat everything as
changed; instead populate configured_fingerprints with the actual fingerprints
from the registered adapter objects (or reuse the same creation path that
produces ConfiguredAdapter fingerprints). Update
seed_configured_fingerprints_from_registered to read the registered adapters map
(e.g., registered_adapters.read().await or the structure holding adapter state),
get each adapter's real fingerprint (e.g., ConfiguredAdapter::fingerprint or the
field/method that returns the fingerprint) and insert that value into
configured_fingerprints.entry(name).or_insert_with(|| actual_fingerprint), or
change the function signature to accept a map of name->fingerprint and seed from
that, so reconcile_configured no longer sees all adapters as changed on startup.

285-299: ⚠️ Potential issue | 🟠 Major

Immediate EOFs still collapse to the initial delay.

Line 288 resets next_retry_delay on any successful start(), even if the stream returns None immediately. A connect/EOF loop therefore sleeps only INITIAL_RETRY_DELAY forever instead of backing off across attempts. Reset the delay only after the stream proves healthy.

Also applies to: 327-341

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/messaging/manager.rs` around lines 285 - 299, the code resets
next_retry_delay to INITIAL_RETRY_DELAY as soon as start_result returns
Ok(stream), but that incorrectly collapses backoff for immediate-EOF/None
streams; instead, remove the immediate reset inside the Ok(start) branch and
only reset next_retry_delay after the stream has proven healthy (e.g., after
receiving the first successful item/event from the stream or otherwise
confirming it is not an immediate EOF). Update the logic around the match on
start_result (the Ok(stream) branch where stream is used) and the analogous
block later (the second match handling start_result) so the reset happens after
a confirmed successful read from stream rather than on start success. Ensure you
reference and adjust next_retry_delay, INITIAL_RETRY_DELAY, start_result,
start(), and stream consistently in both places.

353-379: ⚠️ Potential issue | 🟠 Major

Keep the initial connect under supervisor ownership.

start_runtime() still awaits adapter.start() before an AdapterRuntime exists. If that future hangs, the caller keeps lifecycle_mutex held and remove_adapter()/shutdown() has nothing to abort. The supervisor needs to own the first start attempt too.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/messaging/manager.rs` around lines 353 - 379, change start_runtime so it
does not await adapter.start() directly; instead create/install the supervisor
first and let the supervisor perform the initial start attempt so the first
connect is owned/abortable by the supervisor. Concretely, replace the
adapter.start().await + matching logic in start_runtime with a call to
install_supervisor(name.clone(), Arc::clone(&adapter), None) (or
install_supervisor(name, adapter, None)) so the supervisor owns the initial
start; if you still need to surface the first-start error when
surface_start_error is true, have install_supervisor return or expose a
future/result for the initial start that start_runtime can optionally await, but
do not perform adapter.start() inside start_runtime itself. Ensure references to
remove_adapter(), shutdown(), and lifecycle_mutex remain compatible with this
change.
src/main.rs (1)

2218-2232: ⚠️ Potential issue | 🟠 Major

Drop the old watcher before you spawn the replacement.

FileWatcherHandle only begins shutdown in Drop (src/config/watcher.rs, Lines 20-29). Keeping it in _old_watcher means the setup-mode watcher keeps running until the end of this arm, after the replacement watcher is already live, so both can process the same change burst against different runtime state. At minimum, drop the old handle before spawn_file_watcher(...); ideally wait for the blocking task to exit once the handle exposes a joinable shutdown.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/main.rs` around lines 2218 - 2232, The current code keeps the old
FileWatcherHandle in _old_watcher until after calling
spacebot::config::spawn_file_watcher, so the old watcher isn't dropped (and
shutdown begun) until after the new watcher is running; change the order to take
and drop the old handle before calling spawn_file_watcher: call let _old_watcher
= _file_watcher.take(); explicitly drop(_old_watcher) (or let _old_watcher =
_file_watcher.take(); if you want to then await a joinable shutdown, call its
join/await method) before creating the new watcher with
spacebot::config::spawn_file_watcher, and if FileWatcherHandle exposes a
joinable shutdown API, await that to ensure the previous watcher has fully
exited before assigning _file_watcher to the new handle.
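The ordering bug is easy to see with a Drop-based sketch. The handle type below is an illustrative stand-in for `FileWatcherHandle`, logging spawn/shutdown events so the order becomes observable:

```rust
use std::cell::RefCell;
use std::rc::Rc;

type EventLog = Rc<RefCell<Vec<String>>>;

/// Stand-in for FileWatcherHandle: shutdown begins in Drop.
struct WatcherHandle {
    name: &'static str,
    log: EventLog,
}

impl Drop for WatcherHandle {
    fn drop(&mut self) {
        self.log.borrow_mut().push(format!("shutdown {}", self.name));
    }
}

fn spawn_watcher(name: &'static str, log: &EventLog) -> WatcherHandle {
    log.borrow_mut().push(format!("spawn {}", name));
    WatcherHandle { name, log: Rc::clone(log) }
}

/// The fix: drop the old handle *before* spawning the replacement, so
/// there is no window where both watchers process the same change burst.
fn replace_watcher(slot: &mut Option<WatcherHandle>, log: &EventLog) {
    drop(slot.take()); // old watcher's Drop sends its shutdown signal now
    *slot = Some(spawn_watcher("replacement", log));
}
```

Binding the taken handle to a named variable instead (`let _old_watcher = slot.take();`) would defer the `shutdown` entry until end of scope, after `spawn replacement` — exactly the overlap window the review describes.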
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/messaging/manager.rs`:
- Around line 195-201: current_configured is being populated from self.adapters
(which includes shared adapters like the one registered by register_shared()),
causing reconcile to treat shared adapters as stale; change the snapshot to
iterate only the configured adapter set (the same source used to build desired)
instead of self.adapters. Replace the
self.adapters.read().await.keys().cloned().collect::<Vec<_>>() usage when
setting current_configured with the configured/desired keys (e.g.,
desired.keys().cloned().collect::<Vec<_>>() or the equivalent configured-adapter
collection your code uses) so the stale-sweep only considers adapters that are
actually configured.


ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 1c147ab7-5db5-4f84-93ba-b6d61c98bcd8

📥 Commits

Reviewing files that changed from the base of the PR and between f7ea53a and bbc3d4b.

📒 Files selected for processing (4)
  • src/config.rs
  • src/config/watcher.rs
  • src/main.rs
  • src/messaging/manager.rs

Comment on lines +203 to +207
let mut first_error = None;
for name in current_configured {
    if !desired_names.contains(&name)
        && let Err(error) = self.remove_adapter(&name).await
    {

⚠️ Potential issue | 🔴 Critical

This stale-removal path deadlocks under the lifecycle lock.

reconcile_configured() already holds lifecycle_mutex, then Line 206 awaits remove_adapter(), which tries to lock the same mutex again. The first stale adapter hangs reconciliation permanently; inline the removal here or add a private helper that assumes the guard is already held.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/messaging/manager.rs` around lines 203 - 207, reconcile_configured() is
holding lifecycle_mutex and then awaits remove_adapter(), which deadlocks
because remove_adapter() re-locks lifecycle_mutex; fix by performing the
stale-adapter removal without awaiting a function that re-acquires the lock —
either inline the removal logic currently in remove_adapter() directly in
reconcile_configured() while using the existing lifecycle_mutex guard, or create
a private helper (e.g., remove_adapter_while_locked or remove_adapter_inner)
that assumes the lifecycle_mutex guard is already held and call that from
reconcile_configured(); ensure any async awaits that would drop the guard are
avoided or moved outside the locked section.
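The inner-helper pattern this comment asks for can be sketched with a guard-by-reference signature, so "lock already held" is encoded in the types. `std::sync::Mutex` stands in for the async lifecycle mutex, and all names are illustrative:

```rust
use std::collections::HashMap;
use std::sync::{Mutex, MutexGuard};

struct LockedManager {
    lifecycle_mutex: Mutex<()>,
    adapters: Mutex<HashMap<String, ()>>,
}

impl LockedManager {
    fn new(names: &[&str]) -> Self {
        LockedManager {
            lifecycle_mutex: Mutex::new(()),
            adapters: Mutex::new(names.iter().map(|n| (n.to_string(), ())).collect()),
        }
    }

    /// Public entry point: takes the lifecycle lock, then delegates.
    fn remove_adapter(&self, name: &str) -> bool {
        let guard = self.lifecycle_mutex.lock().unwrap();
        self.remove_adapter_inner(&guard, name)
    }

    /// Assumes the lifecycle lock is held; requiring &MutexGuard makes it
    /// impossible to call without one.
    fn remove_adapter_inner(&self, _held: &MutexGuard<'_, ()>, name: &str) -> bool {
        self.adapters.lock().unwrap().remove(name).is_some()
    }

    /// Already holds the lock, so it calls the inner helper directly —
    /// calling remove_adapter() here would self-deadlock.
    fn reconcile_stale(&self, stale: &[&str]) -> usize {
        let guard = self.lifecycle_mutex.lock().unwrap();
        stale
            .iter()
            .filter(|name| self.remove_adapter_inner(&guard, name))
            .count()
    }
}
```

Passing the guard by reference costs nothing at runtime but turns "caller must hold the lifecycle lock" from a comment into a compile-time requirement.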


PyRo1121 commented Mar 8, 2026

Follow-up for commit 433ee3f

This pass fixes the remaining live src/messaging/manager.rs review findings.

What changed:

  • Scoped reconciliation stale-removal to configured_fingerprints instead of the full registered adapter map, so shared adapters like webchat are no longer treated as stale during config reconcile.
  • Split adapter removal into a private remove_adapter_inner(...) helper used while the lifecycle mutex is already held, which removes the stale-reconciliation deadlock from reconcile_configured().

Validation rerun locally after the fix:

  • cargo fmt
  • PROTOC=/tmp/protoc-bin/bin/protoc cargo check --all-targets
  • PROTOC=/tmp/protoc-bin/bin/protoc cargo clippy --all-targets -- -D warnings
  • PROTOC=/tmp/protoc-bin/bin/protoc cargo test --lib -q

Result:

  • 494 passed; 0 failed

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/main.rs (1)

2219-2233: ⚠️ Potential issue | 🟠 Major

Drop the old watcher before spawning the replacement.

let _old_watcher = _file_watcher.take(); keeps the old handle alive until the end of this match arm, so FileWatcherHandle::drop() does not send the shutdown signal until after the new watcher is already running. That still leaves a duplicate-watcher window.

♻️ Proposed fix
-                                        let _old_watcher = _file_watcher.take();
+                                        drop(_file_watcher.take());
                                         _file_watcher = Some(spacebot::config::spawn_file_watcher(
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/main.rs` around lines 2219 - 2233, the current code keeps the old
FileWatcherHandle alive until after spawn_file_watcher runs by assigning let
_old_watcher = _file_watcher.take(); — drop the old watcher immediately before
calling spawn_file_watcher so FileWatcherHandle::drop sends the shutdown signal
prior to starting the replacement; e.g., take the option from _file_watcher into
a temporary (or if let Some(old) = _file_watcher.take() { drop(old); }) then
call spacebot::config::spawn_file_watcher(...) and assign its handle to
_file_watcher.
♻️ Duplicate comments (2)
src/messaging/manager.rs (2)

206-209: ⚠️ Potential issue | 🔴 Critical

This stale-adapter loop still self-deadlocks.

reconcile_configured() already holds lifecycle_mutex, then awaits remove_adapter(), which tries to lock the same mutex again on Line 533. The first stale adapter will hang reconciliation permanently; this needs an inner removal helper that assumes the lifecycle lock is already held.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/messaging/manager.rs` around lines 206 - 209, reconcile_configured()
currently holds lifecycle_mutex and awaits remove_adapter(), which deadlocks
because remove_adapter() tries to lock lifecycle_mutex again; extract the core
removal logic into a new internal helper (e.g., remove_adapter_locked or
remove_adapter_inner) that assumes the lifecycle_mutex is already held, update
reconcile_configured() to call this helper directly when it holds the mutex, and
change remove_adapter() to acquire lifecycle_mutex then call the same helper so
external callers keep existing behavior; ensure the helper is async and returns
the same Result/Err shape as remove_adapter() so error handling remains
unchanged.

197-203: ⚠️ Potential issue | 🔴 Critical

Stale removal still includes shared adapters.

current_configured is collected from self.adapters, so reconciliation still sweeps non-config runtimes too. src/main.rs also registers webchat via register_shared(), and it never appears in desired, so the next reconcile will tear it down as stale unless this loop uses a configured-only set.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/messaging/manager.rs` around lines 197 - 203, current_configured is
currently built from self.adapters and thus includes adapters registered via
register_shared (e.g., webchat), causing them to be treated as stale; change the
collection to only include configured (non-shared) adapters by filtering out
shared registrations when reading self.adapters (e.g., replace the
keys().cloned().collect with an iterator that filters by adapter.shared == false
or !adapter.is_shared(), then maps to the keys), or alternatively intersect with
the desired/configured set if you have one; ensure you reference the same map
used in the diff (self.adapters.read().await) and exclude entries registered via
register_shared().
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/messaging/manager.rs`:
- Around line 373-395: The initial-start wait currently does a
potentially-unbounded await on the oneshot receiver (first_start_result_rx /
rx.await) inside register_and_start() / reconcile_configured() while holding
lifecycle_mutex, which can deadlock if adapter.start() hangs; wrap the receive
in a bounded timeout (e.g., use tokio::time::timeout with a sensible duration)
when awaiting the receiver returned by install_supervisor, and on timeout return
an Err indicating the initial start timed out so the API/reload path fails fast
while the supervisor continues retrying in the background; update the match
handling for rx.await to handle the timeout case and include the adapter name in
the returned error.
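The bounded-wait suggestion for the initial-start result can be sketched with a timeout around the receive. `mpsc::recv_timeout` stands in for `tokio::time::timeout` over a oneshot receiver, and the names and budget are illustrative:

```rust
use std::sync::mpsc;
use std::time::Duration;

/// Fail fast if the first start() doesn't report within the budget, so a
/// hung connect can't stall a caller holding the lifecycle lock; the
/// supervisor keeps retrying in the background either way.
fn await_first_start(
    rx: mpsc::Receiver<Result<(), String>>,
    adapter: &str,
    budget: Duration,
) -> Result<(), String> {
    match rx.recv_timeout(budget) {
        Ok(result) => result,
        Err(_) => Err(format!(
            "adapter `{adapter}` did not report its initial start within {budget:?}"
        )),
    }
}
```

Including the adapter name in the timeout error keeps the API/reload failure message actionable, as the comment above requests.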


ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 4d45fa55-66c3-4a99-8fa9-6a425d3bc805

📥 Commits

Reviewing files that changed from the base of the PR and between bbc3d4b and c3bae3e.

📒 Files selected for processing (3)
  • src/api/messaging.rs
  • src/main.rs
  • src/messaging/manager.rs

Comment on lines +294 to +301
if let Some(tx) = first_start_result_tx.take() {
let _ = tx.send(Ok(()));
}
stream
}
Err(error) => {
if let Some(tx) = first_start_result_tx.take() {
let _ = tx.send(Err(error.to_string()));
⚠️ Potential issue | 🟡 Minor

Use the channel-send exception explicitly here.

If dropping the receiver is acceptable, prefer .ok() over let _ = so this follows the repository’s Result-handling rule.

As per coding guidelines "Don't silently discard errors. No let _ = on Results. Handle them, log them, or propagate them. The only exception is .ok() on channel sends where the receiver may be dropped".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/messaging/manager.rs` around lines 294 - 301, The channel send results on
first_start_result_tx are being ignored with `let _ = ...`; update both send
calls (the successful send of Ok(()) and the error send of
Err(error.to_string())) to use `.ok()` instead of `let _ =` so we explicitly
acknowledge a dropped receiver per the repo rule; locate the sends where
`first_start_result_tx.take()` is used in the stream-starting code path and
replace the `let _ = tx.send(...)` patterns with `tx.send(...).ok()` ensuring
the same messages (Ok(()) and Err(error.to_string())) are sent.

Comment on lines +373 to 395
let first_start_result_rx = if surface_start_error {
let (tx, rx) = oneshot::channel();
self.install_supervisor(name.clone(), adapter, Some(tx))
.await;
Some(rx)
} else {
self.install_supervisor(name.clone(), adapter, None).await;
None
};

if let Some(rx) = first_start_result_rx {
match rx.await {
Ok(Ok(())) => {}
Ok(Err(error)) => {
return Err(anyhow::anyhow!("failed to start adapter '{name}': {error}").into());
}
Err(error) => {
return Err(anyhow::anyhow!(
"failed to receive initial start result for '{name}': {error}"
)
.into());
}
}
⚠️ Potential issue | 🟠 Major

Bound the first-start wait.

register_and_start() and reconcile_configured() call this while holding lifecycle_mutex. If adapter.start() hangs, rx.await never completes, the caller hangs, and remove_adapter() cannot acquire the mutex to stop the supervisor. A timeout here would let the API/reload path fail fast while the supervisor keeps retrying in the background.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/messaging/manager.rs` around lines 373 - 395, The initial-start wait
currently does a potentially-unbounded await on the oneshot receiver
(first_start_result_rx / rx.await) inside register_and_start() /
reconcile_configured() while holding lifecycle_mutex, which can deadlock if
adapter.start() hangs; wrap the receive in a bounded timeout (e.g., use
tokio::time::timeout with a sensible duration) when awaiting the receiver
returned by install_supervisor, and on timeout return an Err indicating the
initial start timed out so the API/reload path fails fast while the supervisor
continues retrying in the background; update the match handling for rx.await to
handle the timeout case and include the adapter name in the returned error.

@coderabbitai coderabbitai bot left a comment

♻️ Duplicate comments (1)
src/messaging/manager.rs (1)

383-395: ⚠️ Potential issue | 🟠 Major

Bound the first-start wait to prevent deadlock.

This rx.await has no timeout. If adapter.start() hangs in the supervisor, this blocks indefinitely while holding lifecycle_mutex. Since remove_adapter() also needs that mutex, there's no way to cancel a hanging adapter — causing a deadlock.

Consider wrapping the receive in tokio::time::timeout so the API/reload path can fail fast while the supervisor continues retrying in the background.

🛡️ Proposed fix with timeout
+        const FIRST_START_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
+
         if let Some(rx) = first_start_result_rx {
-            match rx.await {
-                Ok(Ok(())) => {}
-                Ok(Err(error)) => {
-                    return Err(anyhow::anyhow!("failed to start adapter '{name}': {error}").into());
-                }
-                Err(error) => {
-                    return Err(anyhow::anyhow!(
-                        "failed to receive initial start result for '{name}': {error}"
-                    )
-                    .into());
+            match tokio::time::timeout(FIRST_START_TIMEOUT, rx).await {
+                Ok(Ok(Ok(()))) => {}
+                Ok(Ok(Err(error))) => {
+                    return Err(anyhow::anyhow!("failed to start adapter '{name}': {error}").into());
+                }
+                Ok(Err(error)) => {
+                    return Err(anyhow::anyhow!(
+                        "failed to receive initial start result for '{name}': {error}"
+                    )
+                    .into());
+                }
+                Err(_) => {
+                    tracing::warn!(adapter = %name, "initial start timed out; supervisor will keep retrying");
                 }
             }
         }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/messaging/manager.rs` around lines 383 - 395, The await on
first_start_result_rx can block indefinitely and cause a deadlock while holding
lifecycle_mutex; wrap the receive in tokio::time::timeout (e.g.,
tokio::time::Duration::from_secs configurable) when awaiting
first_start_result_rx in the block that follows adapter.start(), map a timeout
(Elapsed) into an Err return that surfaces a fast-failing API/reload error
(including the adapter name) while allowing the supervisor to keep retrying in
the background; ensure the change references first_start_result_rx and the
matching arms (Ok(Ok(())), Ok(Err(error)), Err(error)) so the timeout case is
handled alongside those and remove_adapter()/supervisor can still acquire
lifecycle_mutex.
🧹 Nitpick comments (1)
src/messaging/manager.rs (1)

294-301: Use .ok() for channel sends per coding guidelines.

These oneshot channel sends use let _ = to discard the result. Per coding guidelines, the exception for channel sends where the receiver may be dropped should use .ok() explicitly.

♻️ Proposed fix
                     tracing::info!(adapter = %name, "adapter started successfully");
                     if let Some(tx) = first_start_result_tx.take() {
-                        let _ = tx.send(Ok(()));
+                        tx.send(Ok(())).ok();
                     }
                     stream
                 }
                 Err(error) => {
                     if let Some(tx) = first_start_result_tx.take() {
-                        let _ = tx.send(Err(error.to_string()));
+                        tx.send(Err(error.to_string())).ok();
                     }

As per coding guidelines: "Don't silently discard errors. No let _ = on Results. The only exception is .ok() on channel sends where the receiver may be dropped".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/messaging/manager.rs` around lines 294 - 301, Replace the silent `let _ =
tx.send(...)` calls with the explicit `.ok()` pattern for the oneshot channel
sends: locate the `first_start_result_tx.take()` branches where
`tx.send(Ok(()))` and `tx.send(Err(error.to_string()))` are invoked and change
those lines to `tx.send(...).ok();` so the send result is intentionally
discarded per the coding guideline.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Duplicate comments:
In `@src/messaging/manager.rs`:
- Around line 383-395: The await on first_start_result_rx can block indefinitely
and cause a deadlock while holding lifecycle_mutex; wrap the receive in
tokio::time::timeout (e.g., tokio::time::Duration::from_secs configurable) when
awaiting first_start_result_rx in the block that follows adapter.start(), map a
timeout (Elapsed) into an Err return that surfaces a fast-failing API/reload
error (including the adapter name) while allowing the supervisor to keep
retrying in the background; ensure the change references first_start_result_rx
and the matching arms (Ok(Ok(())), Ok(Err(error)), Err(error)) so the timeout
case is handled alongside those and remove_adapter()/supervisor can still
acquire lifecycle_mutex.

---

Nitpick comments:
In `@src/messaging/manager.rs`:
- Around line 294-301: Replace the silent `let _ = tx.send(...)` calls with the
explicit `.ok()` pattern for the oneshot channel sends: locate the
`first_start_result_tx.take()` branches where `tx.send(Ok(()))` and
`tx.send(Err(error.to_string()))` are invoked and change those lines to
`tx.send(...).ok();` so the send result is intentionally discarded per the
coding guideline.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 6dcb3570-2b84-4497-89fd-1667edd643f8

📥 Commits

Reviewing files that changed from the base of the PR and between c3bae3e and 433ee3f.

📒 Files selected for processing (1)
  • src/messaging/manager.rs
