Skip to content

Run comms on the R thread#1075

Merged
lionel- merged 31 commits intomainfrom
task/sync-comms
Mar 8, 2026
Merged

Run comms on the R thread#1075
lionel- merged 31 commits intomainfrom
task/sync-comms

Conversation

@lionel-
Copy link
Contributor

@lionel- lionel- commented Mar 2, 2026

Progress towards #689
Progress towards #691
Progress towards #1074

Comm handlers (data explorer, variables, connections...) currently run on their own spawned threads and call R through r_task(), which hooks into R's polled events. In practice this makes R evaluations preemptive: a comm handler may force a promise that triggers loadNamespace() while loadNamespace() is already on the call stack. As we move more logic to comm RPCs, the surface area for these reentrancy bugs grows. The data explorer is a good example: it sorts, filters, and profiles columns by calling back into R, all from a background thread.

A second, more subtle problem is message ordering. Because comm handlers run on their own threads, IOPub events (like data-changed notifications or comm_close) race with Idle. Tests had to work around this with complicated buffering and polling in DummyArkFrontend.

This PR introduces a blocking comm path and migrates the data explorer as the first comm to demonstrate the wins:

  • No more R reentrancy risk from r_task() at interrupt time.
  • No more risk of omitting r_task() wrappers around R-related code, which allows a much nicer development experience (and easier code reviews).
  • Much simpler implementation (the dedicated thread and select! loop goes away).
  • 10 data explorers used to spawn 10 comm threads. We now spawn 0 additional threads. In addition to reduced complexity, this reduces memory usage since each thread allocates 2mb for its stack. This reduction will also apply to plot comms, so could be significant in real sessions.
  • Deterministic update ordering (environment change side effects land within the Busy/Idle window of the request that caused them). The tests become deterministic and all the event buffering test infra goes away.

Blocking Shell while comms run on the R thread

The fix is to make comm message handling block Shell. When a comm_msg arrives on the Shell socket, instead of forwarding it to a channel and immediately moving on to the next request, we send the message to the R thread and wait for it to finish. This way no execute_request (or other comm RPC) can start while R code is running for a comm.

Amalthea side

  • Two new default methods on ShellHandler: handle_comm_msg() and handle_comm_close(). They return CommHandled::Handled or CommHandled::NotHandled. The latter falls back to the existing incoming_tx path, so non-migrated comms keep working unchanged. This is the only part that touches the framework; the rest is in ark.

  • In Shell::handle_comm_msg() / handle_comm_close(), the new handler is tried first. If it returns NotHandled, we fall through to the old channel send.

Ark side

The CommHandler trait. This is the new contract for comms that want to run on the R thread:

  • handle_open(): called once on the R thread when the comm is registered.
  • handle_msg(): incoming RPC or data message.
  • handle_close(): frontend-initiated close.
  • handle_environment(): notification after each top-level execution (or debug frame selection), so handlers can react to binding changes.
  • open_metadata(): metadata for the comm_open IOPub message (backend-initiated comms).

A CommHandlerContext gives each handler access to any relevant Console state or methods, to its outgoing_tx for emitting messages to the frontend, and to a close_on_exit() mechanism for self-closing (e.g. when a watched binding disappears).

Console as the comm registry:

Console owns a HashMap<String, ConsoleComm> keyed by comm ID. The comm_register() method handles backend-initiated opens (creates CommSocket, calls handle_open, sends CommEvent::Opened). Frontend-initiated opens go through KernelRequest::CommOpen with a factory closure so the handler is constructed on the R thread (important because it may hold RObjects).

Ordering change in Console::handle_execute_result(): The old code emitted environment_changed before returning to the event loop, which meant the event ran at an unpredictable time via r_task(). Now the sequence is explicit:

  1. Send execute result/error on IOPub
  2. Call comm_notify_environment_changed() synchronously
  3. Then send the execute reply (which unblocks Shell, which sends Idle)

This guarantees all comm side-effects (data explorer updates, closes) land on IOPub within the Busy/Idle window of the execute request that caused them. This is what makes test assertions deterministic.

Data explorer migration

Data explorer is the first comm fully migrated to the new path. The diff is large but the transformation is mechanical:

  • No more spawned thread: RDataExplorer::start()RDataExplorer::new(). Construction returns the struct directly instead of spawning a thread with a select! loop.
  • No more r_task() calls: All the r_task(|| self.r_foo()) wrappers become direct self.foo() calls. The r_ prefix indicating a method is to be called on the R thread is dropped.
  • No more RThreadSafe: Since everything runs on the R thread, Table becomes a plain newtype over RObject (with a manual unsafe impl Send for the idle-task path). The Arc<Mutex<Option<RThreadSafe<RObject>>>> is gone.

Because data explorer messages now arrive deterministically on IOPub, the entire DataExplorerBuffer infrastructure in DummyArkFrontend is removed (~180 lines of buffering, polling, and try_buffer_msg logic). The integration tests now robustly assert comm events sequentially.

@lionel- lionel- requested review from DavisVaughan and jmcphers March 2, 2026 15:53
Copy link
Contributor

@jmcphers jmcphers left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't review this in detail, but the structure is sound. I do think it is useful to have some async comms for cases wherein we don't actually need to talk to R at all and don't need to synchronize ourselves with the busy/idle groupings. But this pattern feels much better for things like the Data Explorer that are primarily interacting with R state.

@lionel-
Copy link
Contributor Author

lionel- commented Mar 3, 2026

I do think it is useful to have some async comms for cases wherein we don't actually need to talk to R at all and don't need to synchronize ourselves with the busy/idle groupings.

Absolutely! I've kept a Shell thread on the Ark side as an intermediate between Amalthea Shell and the Ark Console for that reason. The Ark Shell thread will dispatch asynchronous messages to async comm threads. See also related discussion in posit-dev/positron#7447.

This setup will resemble how the DAP currently works, with a Console side running on the R thread and a server side living in its own thread. Both sides share common state via a mutex, and the server side is also able to run R code via idle tasks.

Comment on lines +219 to +220
/// Channel used to send along messages relayed on the open comms.
comm_event_tx: Sender<CommEvent>,
pub(crate) comm_event_tx: Sender<CommEvent>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought get_comm_event_tx() that doesn't expose this as pub was a nice abstraction :/

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You'd still need a pub crate if you call it from other files

Copy link
Contributor

@DavisVaughan DavisVaughan Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes but you can't modify it from other files because you only get a reference, that's the benefit of the getter and the reason for all of the get_ methods in here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense, I've restored the method returning a reference, and as discussed on Slack used the submodule trick to remove pub(crate) for fields accessed from Console methods implemented in other files.

}
// Safety: `Table` is only accessed on the R thread (or in R idle tasks,
// which also run on the R thread).
unsafe impl Send for Table {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am very uneasy about this

We still ship a Table around via an r_task::spawn_idle() (as you mentioned).

I know that we:

  • Send the task from the main R thread
  • Pick the task up and run it on the main R thread

But I am still extremely nervous about providing anything outside of RThreadSafe that can send across threads. I just don't trust us to get it right every time.

I think I would prefer to keep this wrapped in RThreadSafe, because to me that is The Way to ship across threads, even if you end up running from the main R thread on both sides.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would also like to note that this problem would very likely go away entirely if we had a variant of r_task::spawn_idle() that did not require a Send bound on T.

I believe we are stuck with that as long as we are using crossbeam channels, but everything happening here is all on one thread! The main R thread!

All we really want is to queue up a task within the same thread so that read_console can run it at the next idle iteration. Something like r_task::enque_idle() maybe, just spitballing.

That shouldn't require a crossbeam channel ideally (although compatibility with a crossbeam select! would make it challenging probably). We maybe just need some VecDeque to push and pop from.

Then you should be able to ship a closure around, even if it has an RObject inside, without a Send bound.

But until then, I still like RThreadSafe

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep all task spawning should eventually be done from the R thread, and then we use a simple queue data structure instead of the crossbeam channel.

I'm comfortable with the current setup but since it makes you uneasy I'll restore the RThreadSafe.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a heads up that this forced me to add some IS_TESTING weirdness to RThreadSafe's drop method

Comment on lines +194 to 195
fn update(&mut self, ctx: &CommHandlerContext) -> anyhow::Result<bool> {
// No need to check for updates if we have no binding
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Random thought. Would it be nice to have some kind of assert_r_thread!() macro we could put at the top of functions like this? Panic in debug mode and no-op in release mode? It would be self documenting and would help us with our invariants.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a bad idea but it feels superfluous to me, now that the architecture ensures that comms consistently run on the R thread.

Comment on lines +72 to +80
/// Register a new comm handler on the R thread (frontend-initiated comms).
/// Uses a factory closure so the handler (which may hold `RObject`s) is
/// created on the R thread rather than sent across threads.
CommOpen {
comm_id: String,
comm_name: String,
factory: CommHandlerFactory,
ctx: CommHandlerContext,
done_tx: Sender<()>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is what didn't look used to me, and the factory stuff just felt confusing if we don't have a use for it...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even if you think you'll use it for Variables, I'd be interested in delaying the addition of this to that PR so we can see / justify that we really do need this weird factory thing

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok let's see in the Variables PR how that comes along

Comment on lines +88 to +95
if reg.ctx.is_closed() {
closed_ids.push(comm_id.clone());
}
}
for comm_id in closed_ids {
if let Some(reg) = self.comms.remove(&comm_id) {
self.comm_notify_closed(&comm_id, &reg);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This whole is_closed() thing feels a bit wrong to me.

It feels like after any kind of generic message we have to check if the backend decided to close the comm? Like we do this here and in comm_handle_msg.

Should something else be handling this in a more consistent manner?

Otherwise it feels like if we add any other comm_notify_*() helper to this, then we are going to also need to check is_closed there too, and that feels so easy to forget

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had the same feeling as you but couldn't find a better way to structure this.

Comment on lines +81 to +86
pub enum EnvironmentChanged {
/// A top-level execution completed (user code, debug eval, etc.).
Execution,
/// The user selected a different frame in the call stack during debugging.
FrameSelected,
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i like this framing!

@lionel- lionel- force-pushed the task/sync-comms branch 2 times, most recently from b7a45f2 to ae67c85 Compare March 6, 2026 21:10
@lionel- lionel- merged commit ef09c81 into main Mar 8, 2026
8 checks passed
@lionel- lionel- deleted the task/sync-comms branch March 8, 2026 12:16
@github-actions github-actions bot locked and limited conversation to collaborators Mar 8, 2026
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants