Conversation
jmcphers
left a comment
There was a problem hiding this comment.
I didn't review this in detail, but the structure is sound. I do think it is useful to have some async comms for cases wherein we don't actually need to talk to R at all and don't need to synchronize ourselves with the busy/idle groupings. But this pattern feels much better for things like the Data Explorer that are primarily interacting with R state.
Absolutely! I've kept a Shell thread on the Ark side as an intermediate between Amalthea Shell and the Ark Console for that reason. The Ark Shell thread will dispatch asynchronous messages to async comm threads. See also related discussion in posit-dev/positron#7447. This setup will resemble how the DAP currently works, with a Console side running on the R thread and a server side living in its own thread. Both sides share common state via a mutex, and the server side is also able to run R code via idle tasks. |
crates/ark/src/console.rs
Outdated
| /// Channel used to send along messages relayed on the open comms. | ||
| comm_event_tx: Sender<CommEvent>, | ||
| pub(crate) comm_event_tx: Sender<CommEvent>, |
There was a problem hiding this comment.
I thought get_comm_event_tx() that doesn't expose this as pub was a nice abstraction :/
There was a problem hiding this comment.
You'd still need a pub crate if you call it from other files
There was a problem hiding this comment.
Yes but you can't modify it from other files because you only get a reference, that's the benefit of the getter and the reason for all of the get_ methods in here
There was a problem hiding this comment.
That makes sense, I've restored the method returning a reference, and as discussed on Slack used the submodule trick to remove pub(crate) for fields accessed from Console methods implemented in other files.
| } | ||
| // Safety: `Table` is only accessed on the R thread (or in R idle tasks, | ||
| // which also run on the R thread). | ||
| unsafe impl Send for Table {} |
There was a problem hiding this comment.
I am very uneasy about this
We still ship a Table around via an r_task::spawn_idle() (as you mentioned).
I know that we:
- Send the task from the main R thread
- Pick the task up and run it on the main R thread
But I am still extremely nervous about providing anything outside of RThreadSafe that can send across threads. I just don't trust us to get it right every time.
I think I would prefer to keep this wrapped in RThreadSafe, because to me that is The Way to ship across threads, even if you end up running from the main R thread on both sides.
There was a problem hiding this comment.
I would also like to note that this problem would very likely go away entirely if we had a variant of r_task::spawn_idle() that did not require a Send bound on T.
I believe we are stuck with that as long as we are using crossbeam channels, but everything happening here is all on one thread! The main R thread!
All we really want is to queue up a task within the same thread so that read_console can run it at the next idle iteration. Something like r_task::enque_idle() maybe, just spitballing.
That shouldn't require a crossbeam channel ideally (although compatibility with a crossbeam select! would make it challenging probably). We maybe just need some VecDeque to push and pop from.
Then you should be able to ship a closure around, even if it has an RObject inside, without a Send bound.
But until then, I still like RThreadSafe
There was a problem hiding this comment.
yep all task spawning should eventually be done from the R thread, and then we use a simple queue data structure instead of the crossbeam channel.
I'm comfortable with the current setup but since it makes you uneasy I'll restore the RThreadSafe.
There was a problem hiding this comment.
Just a heads up that this forced me to add some IS_TESTING weirdness to RThreadSafe's drop method
| fn update(&mut self, ctx: &CommHandlerContext) -> anyhow::Result<bool> { | ||
| // No need to check for updates if we have no binding |
There was a problem hiding this comment.
Random thought. Would it be nice to have some kind of assert_r_thread!() macro we could put at the top of functions like this? Panic in debug mode and no-op in release mode? It would be self documenting and would help us with our invariants.
There was a problem hiding this comment.
Not a bad idea but it feels superfluous to me, now that the architecture ensures that comms consistently run on the R thread.
crates/ark/src/request.rs
Outdated
| /// Register a new comm handler on the R thread (frontend-initiated comms). | ||
| /// Uses a factory closure so the handler (which may hold `RObject`s) is | ||
| /// created on the R thread rather than sent across threads. | ||
| CommOpen { | ||
| comm_id: String, | ||
| comm_name: String, | ||
| factory: CommHandlerFactory, | ||
| ctx: CommHandlerContext, | ||
| done_tx: Sender<()>, |
There was a problem hiding this comment.
This is what didn't look used to me, and the factory stuff just felt confusing if we don't have a use for it...
There was a problem hiding this comment.
Even if you think you'll use it for Variables, I'd be interested in delaying the addition of this to that PR so we can see / justify that we really do need this weird factory thing
There was a problem hiding this comment.
ok let's see in the Variables PR how that comes along
| if reg.ctx.is_closed() { | ||
| closed_ids.push(comm_id.clone()); | ||
| } | ||
| } | ||
| for comm_id in closed_ids { | ||
| if let Some(reg) = self.comms.remove(&comm_id) { | ||
| self.comm_notify_closed(&comm_id, ®); | ||
| } |
There was a problem hiding this comment.
This whole is_closed() thing feels a bit wrong to me.
It feels like after any kind of generic message we have to check if the backend decided to close the comm? Like we do this here and in comm_handle_msg.
Should something else be handling this in a more consistent manner?
Otherwise it feels like if we add any other comm_notify_*() helper to this, then we are going to also need to check is_closed there too, and that feels so easy to forget
There was a problem hiding this comment.
I had the same feeling as you but couldn't find a better way to structure this.
| pub enum EnvironmentChanged { | ||
| /// A top-level execution completed (user code, debug eval, etc.). | ||
| Execution, | ||
| /// The user selected a different frame in the call stack during debugging. | ||
| FrameSelected, | ||
| } |
b7a45f2 to
ae67c85
Compare
Progress towards #689
Progress towards #691
Progress towards #1074
Comm handlers (data explorer, variables, connections...) currently run on their own spawned threads and call R through
r_task(), which hooks into R's polled events. In practice this makes R evaluations preemptive: a comm handler may force a promise that triggersloadNamespace()whileloadNamespace()is already on the call stack. As we move more logic to comm RPCs, the surface area for these reentrancy bugs grows. The data explorer is a good example: it sorts, filters, and profiles columns by calling back into R, all from a background thread.A second, more subtle problem is message ordering. Because comm handlers run on their own threads, IOPub events (like data-changed notifications or
comm_close) race with Idle. Tests had to work around this with complicated buffering and polling inDummyArkFrontend.This PR introduces a blocking comm path and migrates the data explorer as the first comm to demonstrate the wins:
r_task()at interrupt time.r_task()wrappers around R-related code, which allows a much nicer development experience (and easier code reviews).select!loop goes away).Blocking Shell while comms run on the R thread
The fix is to make comm message handling block Shell. When a
comm_msgarrives on the Shell socket, instead of forwarding it to a channel and immediately moving on to the next request, we send the message to the R thread and wait for it to finish. This way noexecute_request(or other comm RPC) can start while R code is running for a comm.Amalthea side
Two new default methods on
ShellHandler:handle_comm_msg()andhandle_comm_close(). They returnCommHandled::HandledorCommHandled::NotHandled. The latter falls back to the existingincoming_txpath, so non-migrated comms keep working unchanged. This is the only part that touches the framework; the rest is in ark.In
Shell::handle_comm_msg()/handle_comm_close(), the new handler is tried first. If it returnsNotHandled, we fall through to the old channel send.Ark side
The
CommHandlertrait. This is the new contract for comms that want to run on the R thread:handle_open(): called once on the R thread when the comm is registered.handle_msg(): incoming RPC or data message.handle_close(): frontend-initiated close.handle_environment(): notification after each top-level execution (or debug frame selection), so handlers can react to binding changes.open_metadata(): metadata for thecomm_openIOPub message (backend-initiated comms).A
CommHandlerContextgives each handler access to any relevant Console state or methods, to itsoutgoing_txfor emitting messages to the frontend, and to aclose_on_exit()mechanism for self-closing (e.g. when a watched binding disappears).Console as the comm registry:
Console owns a
HashMap<String, ConsoleComm>keyed by comm ID. Thecomm_register()method handles backend-initiated opens (createsCommSocket, callshandle_open, sendsCommEvent::Opened). Frontend-initiated opens go throughKernelRequest::CommOpenwith a factory closure so the handler is constructed on the R thread (important because it may holdRObjects).Ordering change in
Console::handle_execute_result(): The old code emittedenvironment_changedbefore returning to the event loop, which meant the event ran at an unpredictable time viar_task(). Now the sequence is explicit:comm_notify_environment_changed()synchronouslyThis guarantees all comm side-effects (data explorer updates, closes) land on IOPub within the Busy/Idle window of the execute request that caused them. This is what makes test assertions deterministic.
Data explorer migration
Data explorer is the first comm fully migrated to the new path. The diff is large but the transformation is mechanical:
RDataExplorer::start()→RDataExplorer::new(). Construction returns the struct directly instead of spawning a thread with aselect!loop.r_task()calls: All ther_task(|| self.r_foo())wrappers become directself.foo()calls. Ther_prefix indicating a method is to be called on the R thread is dropped.RThreadSafe: Since everything runs on the R thread,Tablebecomes a plain newtype overRObject(with a manualunsafe impl Sendfor the idle-task path). TheArc<Mutex<Option<RThreadSafe<RObject>>>>is gone.Because data explorer messages now arrive deterministically on IOPub, the entire
DataExplorerBufferinfrastructure inDummyArkFrontendis removed (~180 lines of buffering, polling, andtry_buffer_msglogic). The integration tests now robustly assert comm events sequentially.