🤖 feat: incremental workspace.onChat subscription (cursor + live-only) #2413
Conversation
Move the full-replay reset to after the `await` of `client.workspace.onChat()` to preserve the original timing behavior. The reset must happen after an async boundary so the UI keeps displaying the previous state until replay data starts arriving, and to avoid spurious store emissions during synchronous workspace setup.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: dad31b6df3
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
💡 Codex Review
Reviewed commit: 1aebcf65b1

💡 Codex Review
Reviewed commit: 8ac07793db

💡 Codex Review
Reviewed commit: bbcb56364d
Codex Review: Didn't find any major issues. 🚀
💡 Codex Review
Reviewed commit: 02d0dde54f

💡 Codex Review
Reviewed commit: 1aaf91377d

💡 Codex Review
Reviewed commit: 2a148e1448

💡 Codex Review
Reviewed commit: f0378f722a

💡 Codex Review
Reviewed commit: 615cfcb9db

💡 Codex Review
Reviewed commit: d3823c194b

💡 Codex Review
Reviewed commit: 1e112f1a87

💡 Codex Review
Reviewed commit: 10f238a5db

💡 Codex Review
Reviewed commit: 5abbecae8b
@codex review

@codex review
💡 Codex Review
Reviewed commit: 414bd92d58
```ts
// Replay init state only for full replay. Incremental reconnects already have init state.
if (replayMode === "full") {
  await this.initStateManager.replayInit(this.workspaceId);
```
Replay init events when reconnecting with since/live mode

Skipping `initStateManager.replayInit()` for non-full replays means reconnects can miss init-state transitions that happened while the client was offline. In particular, if a client disconnects after `init-start` and the hook finishes before reconnect, `StreamingMessageAggregator` only clears the running init state when it receives `init-end`, so the UI can remain stuck showing a stale "running" workspace-init message until a later full replay. This regression is introduced by gating init replay on `replayMode === "full"`.
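One way to address the review finding above — an illustrative sketch, not the PR's actual fix — is to replay init events for any history-bearing reconnect, not just full replays. The `shouldReplayInit` helper and its `initRunning` parameter are hypothetical names introduced here for illustration:

```typescript
// Sketch only: decide whether init events should be re-sent on (re)subscribe,
// so a "since" reconnect cannot get stuck on a stale running-init message.
type ReplayMode = "full" | "since" | "live";

function shouldReplayInit(mode: ReplayMode, initRunning: boolean): boolean {
  if (mode === "full") return true;          // current behavior: always replay init
  if (mode === "since") return initRunning;  // resync a possibly-stale init state
  return false;                              // live: no history replay at all
}
```

The `since` branch re-sends init state only when an init hook is (or was recently) running, which is exactly the window the review comment identifies as lossy.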
Summary

Add incremental `workspace.onChat` subscription support so clients can reconnect with a cursor instead of replaying full history. This eliminates redundant data transfer after transient disconnects and enables live-only subscriptions.

Background
Today `workspace.onChat` always does a full history replay on subscribe/reconnect — re-sending all messages from `chat.jsonl`, replaying active streams from scratch, and replaying init state. For browser/remote connections, this resends a lot of redundant data after transient disconnects. This PR adds three subscription modes:
- `full` (default): current behavior, unchanged.
- `since` (with cursor): replay only messages newer than the client's cursor, plus incremental stream replay filtered by timestamp.
- `live`: skip all history replay; receive only future events.

Implementation
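The three modes can be sketched as a discriminated union. This is a hand-written approximation of the PR's zod schemas (`OnChatModeSchema`, `OnChatCursorSchema`, etc.); the exact field names are assumptions inferred from the plan below:

```typescript
// Sketch only: plain types approximating the PR's schema shapes.
type OnChatHistoryCursor = { messageId: string; historySequence: number };
type OnChatStreamCursor = { messageId: string; lastTimestamp: number };
type OnChatCursor = { history?: OnChatHistoryCursor; stream?: OnChatStreamCursor };

type OnChatMode =
  | { type: "full" }
  | { type: "since"; cursor: OnChatCursor }
  | { type: "live" };

// An absent mode means full replay, which keeps old clients working unchanged.
function resolveMode(mode?: OnChatMode): OnChatMode {
  return mode ?? { type: "full" };
}
```

Defaulting an omitted `mode` to `full` is what makes the new input backward-compatible.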
Shared schemas (`src/common/orpc/schemas/stream.ts`, `api.ts`, `types.ts`)
- New `OnChatHistoryCursorSchema`, `OnChatStreamCursorSchema`, `OnChatCursorSchema`, `OnChatModeSchema`
- `CaughtUpMessageSchema` extended with optional `replay` (enum) and `cursor` fields for backward compatibility
- `workspace.onChat.input` now accepts an optional `mode` parameter

Backend (`router.ts`, `agentSession.ts`, `streamManager.ts`, `aiService.ts`)
- The router passes `input.mode` to `session.replayHistory()`
- `AgentSession.emitHistoricalEvents()` implements mode-aware replay: it validates the cursor (`id + historySequence`), filters messages, and passes `afterTimestamp` for incremental stream replay
- The `caught-up` event now always includes `{ replay, cursor }` — the server declares which strategy it actually used
- `StreamManager.replayStream()` accepts an optional `afterTimestamp` to filter stream parts while always emitting `stream-start`
- Init state is replayed only when `replay === "full"`

Frontend (`StreamingMessageAggregator.ts`, `WorkspaceStore.ts`)
- `StreamingMessageAggregator`: new `getOnChatCursor()` method computes the cursor from the max `historySequence` + active stream state
- `loadHistoricalMessages()` supports `{ mode: "append" }` for incremental loads (skips clearing)
- `handleStreamStart()` is replay-idempotent: it won't wipe existing parts when `data.replay === true`
- `WorkspaceStore`: `runOnChatSubscription()` computes the cursor from aggregator state on reconnect
- The `handleChatMessage()` caught-up handler uses the server's `replay` field to decide replace vs. append
- Replaces the `pendingReplayReset` pattern with mode-aware reset timing

Validation
- `make static-check` passes (typecheck, lint, fmt)
- `make test`: 3788 pass, 4 fail (all pre-existing: 2 PolicyContext timeouts, 1 flaky ProjectAddForm, 1 error)

Risks
- Default behavior (`mode=undefined` → full replay) is preserved exactly; incremental modes are opt-in from the frontend.
- `CaughtUpMessageSchema` changes are backward-compatible (new fields are optional).

📋 Implementation Plan
Plan: Incremental `workspace.onChat` subscription (cursor + live-only)

Context / Why

Today `workspace.onChat` always does a full history replay on subscribe/reconnect (messages from `chat.jsonl` + optional stream replay + init replay), and the frontend resets its chat state to accept that replay.

For browser/remote connections this can resend a lot of redundant data after transient disconnects. The goal is to support:
Key constraints:
Evidence (current code)
- `src/node/orpc/router.ts`: `workspace.onChat` (async generator; sets up relay + calls `session.replayHistory()`; sends heartbeats).
- `src/node/services/agentSession.ts`: `replayHistory()` / `emitHistoricalEvents()` (reads `getHistoryFromLatestBoundary(skip=1)`, optionally calls `aiService.replayStream()`, replays init, always emits `{type:"caught-up"}`).
- `src/node/services/streamManager.ts`: `replayStream()` emits `stream-start` + replays `parts` with `replay: true`.
- `src/node/orpc/replayBufferedStreamMessageRelay.ts` (buffers live deltas during replay; dedupes by `[type, messageId, timestamp, delta]`).
- `src/browser/stores/WorkspaceStore.ts`: `runOnChatSubscription()` + `resetChatStateForReplay()`. `WorkspaceStore.handleChatMessage()` buffers until `caught-up`, then calls `StreamingMessageAggregator.loadHistoricalMessages()`.
- `StreamingMessageAggregator.loadHistoricalMessages()` currently clears all state.
- `StreamingMessageAggregator.handleStreamStart()` currently overwrites the streaming message with empty parts, even during replay.

Proposed API / contract
1) Extend `workspace.onChat` input

Add a `mode` discriminant. Default remains full replay.

Semantics:
- (For `live`, an active stream is announced with a `stream-start` snapshot but no prior deltas — see Backend step 5.)

2) Extend the `caught-up` event to carry replay info + new cursor

We need the server to tell the client whether it honored `since` or fell back to full replay.

Rules:
- The server always ends replay with `caught-up` (existing invariant).
- `replay` reflects the actual replay strategy used (even if `since` was requested).
- `cursor` represents the server's best "current" cursor at the end of replay.
- `cursor.stream` should be present only when the server believes a stream is currently active.

Recommended approach (with phases)
Approach A (MVP): history cursor + live-only, keep stream replay full
- Implement `mode=live`.
- Implement `mode=since` for persisted messages only.
- Stream replay stays `full` (or the frontend requests `full`).

Net LoC estimate (product code): ~250–400
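Approach A's history-cursor path, combined with the defensive fallback rules detailed later in this plan, might look like the following. This is a sketch; `planReplay` and the message shapes are illustrative names, not the PR's actual API:

```typescript
// Sketch of since-mode message filtering with fallback to full replay.
type HistoryMsg = { id: string; historySequence: number };
type HistoryCursor = { messageId: string; historySequence: number };

// Returns the messages to send plus the strategy actually used, so the
// server can report an honest `replay` value in the caught-up event.
function planReplay(
  history: HistoryMsg[],
  cursor?: HistoryCursor,
): { replay: "full" | "since"; messages: HistoryMsg[] } {
  if (!cursor) return { replay: "full", messages: history };
  const anchor = history.find((m) => m.id === cursor.messageId);
  // Unknown id or mismatched sequence: the client's view diverged — fall back to full.
  if (!anchor || anchor.historySequence !== cursor.historySequence) {
    return { replay: "full", messages: history };
  }
  return {
    replay: "since",
    messages: history.filter((m) => m.historySequence > cursor.historySequence),
  };
}
```

Returning the effective strategy alongside the messages is what lets the server keep the "declare what you actually did" contract from the Rules section.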
Approach B (recommended): history cursor + stream timestamp cursor (full incremental reconnect)

Includes everything in Approach A, plus:
- `cursor.stream.lastTimestamp`.
- Filter replayed stream parts by `part.timestamp > lastTimestamp`.
- Make `handleStreamStart(replay=true)` idempotent (no wiping existing parts).

Net LoC estimate (product code): ~450–700
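The B-only timestamp filter, including the mismatched-stream escape hatch described in Backend step 4, can be sketched as follows (shapes and the `partsToReplay` name are illustrative):

```typescript
// Sketch of B-only stream replay filtering by timestamp cursor.
type StreamPart = { timestamp: number; delta: string };
type ActiveStream = { messageId: string; parts: StreamPart[] };

// stream-start is always emitted separately; this only selects the parts after it.
function partsToReplay(
  stream: ActiveStream,
  cursor?: { messageId: string; lastTimestamp: number },
): StreamPart[] {
  // Cursor from a different stream: the client's timestamps are meaningless here,
  // so replay the full stream.
  if (!cursor || cursor.messageId !== stream.messageId) return stream.parts;
  return stream.parts.filter((p) => p.timestamp > cursor.lastTimestamp);
}
```

The strict `>` comparison assumes the client's cursor records the timestamp of the last part it fully received.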
The rest of this plan details Approach B. (Approach A is a subset, obtained by skipping the steps marked "(B-only)".)
Implementation details
Backend
1) Schemas
- `src/common/orpc/schemas/stream.ts`: add `OnChatHistoryCursorSchema`, `OnChatStreamCursorSchema`, `OnChatCursorSchema`; extend `CaughtUpMessageSchema` to include `{ replay, cursor? }`.
- `src/common/orpc/schemas/api.ts`: extend `workspace.onChat.input` to accept `{ mode?: OnChatMode }`; export `OnChatModeSchema` (or keep it private and infer types via `schemas`).
- `src/common/orpc/types.ts`: currently `caught-up` has only `{type}`.
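The extended caught-up payload and the client-side decision it enables might look like this sketch (plain types standing in for the zod schemas; field names are assumptions from this plan):

```typescript
// Sketch: extended caught-up payload. Optional fields keep old servers compatible.
type CaughtUpMessage = {
  type: "caught-up";
  replay?: "full" | "since" | "live";
  cursor?: { history?: { messageId: string; historySequence: number } };
};

// Client rule: only a server-declared "since" replay may be appended; anything
// else — including a missing field from an older server — resets local state.
function shouldAppend(msg: CaughtUpMessage): boolean {
  return (msg.replay ?? "full") === "since";
}
```

Defaulting a missing `replay` to `"full"` is the conservative choice: an older server that never sends the field gets the pre-existing replace behavior.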
modethroughsrc/node/orpc/router.tsworkspace.onChathandlerinput.modethrough tosession.replayHistory(...).createReplayBufferedStreamMessageRelay+ heartbeat logic intact.Pseudo-shape:
3) AgentSession: implement replay modes + cursor validation
src/node/services/agentSession.tsreplayHistory(listener)→replayHistory(listener, mode?: OnChatMode).emitHistoricalEvents(listener)→emitHistoricalEvents(listener, mode?: OnChatMode).Core logic outline:
Validation rules (defensive):
mode.type !== "since"→ full.since.cursor.historymissing → fallback to full (we can’t safely filter).messageIdand confirminghistorySequencematches.This keeps “crash and burn” logic localized to replay only; it should never crash the app.
4) (B-only) Stream replay filtering by timestamp
src/node/services/aiService.ts+src/node/services/streamManager.tsreplayStream(workspaceId, opts?: { afterTimestamp?: number }).StreamManager.replayStream():Also:
sincecursor includescursor.stream.messageIdand it doesn’t matchstreamInfo.messageId, ignoreafterTimestampand replay full stream (client cursor is for a different stream).5) Live-only mode behavior
Decide one of:
caught-up(replay:"live").stream-start(replay:true)but no prior deltas so the subscriber can receive subsequent live deltas without dropping them.(Plan assumes “useful live-only”.)
Frontend
6) Add a reconnect cursor and pass it on resubscribe
src/browser/stores/WorkspaceStore.tsrunOnChatSubscription()history: latest{ messageId, historySequence }fromaggregator.getAllMessages().stream: ifaggregatorhas an active stream, use itslastServerTimestampand messageId.Implementation shape:
Notes:
sincewhen we already have local state (e.g.,transient.caughtUp === trueandaggregator.hasMessages()), otherwise request full.7) Stop pre-emptively clearing state on reconnect; clear only when server says “full”
WorkspaceStore:pendingReplayResetusage.resetChatStateForReplay()as a utility, but trigger it based oncaught-up.replay === "full".Concrete change:
runOnChatSubscriptionremove:if (this.pendingReplayReset.delete(workspaceId)) this.resetChatStateForReplay(workspaceId);this.pendingReplayReset.add(workspaceId);in the retry pathInstead, in
handleChatMessagewhen receivingcaught-up, decide whether to reset/replace or append.8) Extend
caught-uphandling to support append + stream reconciliationWorkspaceStore.handleChatMessage()isCaughtUpMessagebranchNew logic:
data.replay(default to"full"if missing).data.cursor?.streamis absent, callaggregator.clearActiveStreams()to recover from “stream ended while disconnected”.replay === "full"→ clear + replace (current behavior)replay === "since"→ append-only (new)replay === "live"→ should be no historical messages; just mark caught upPseudo:
(See next step for
loadHistoricalMessageschanges.)9) Update
StreamingMessageAggregator.loadHistoricalMessagesto support append modesrc/browser/utils/messages/StreamingMessageAggregator.tsChange signature:
Behavior:
replace= current behavior.append:this.messages(usethis.messages.set(id, msg)to avoid theaddMessage“parts length” heuristic).historySequenceis greater than the prior max sequence.invalidateCache()once.This keeps the performance win of batch loading without corrupting derived state.
10) (B-only) Make
handleStreamStart(replay=true)idempotentStreamingMessageAggregator.handleStreamStart()Current behavior always overwrites message with empty parts. For incremental replay we must not wipe already-received parts.
Change:
data.replay === trueand we already havethis.messages.get(data.messageId):activeStreamscontext so the UI knows it’s streaming.Pseudo:
11) Cursor helper on aggregator
StreamingMessageAggregatoradd:getOnChatCursor(): { history?: {messageId; historySequence}; stream?: {messageId; lastTimestamp} } | undefinedImplementation details:
history: pick message with maxmetadata.historySequence.stream: ifactiveStreams.size > 0, pick the (only) stream messageId and use itslastServerTimestamp.Tests / validation
Backend tests
Add coverage for:
mode=sincefilters persisted messages correctly.replay:"full".replayStream({afterTimestamp})only emits parts after timestamp.caught-upalways emitted with{ replay, cursor }.Good starting points:
src/node/services/historyService.test.ts(real file I/O patterns)src/cli/server.test.ts(spins up ORPC server/client; can test subscription end-to-end)Frontend tests
src/browser/stores/WorkspaceStore.test.tsfor caught-up behavior (replace vs append)src/browser/utils/messages/StreamingMessageAggregator.test.tsfor:handleStreamStart(replay=true)not wiping partsLocal validation checklist
make typecheckmake testmake lintRollout / safety
sinceafter it has successfully loaded once (caughtUp === true).caught-up.replayto keep clients correct.Why we need server-indicated replay mode (vs always append)
If the client’s cursor is stale (history truncated/edited) and the server must send a full replay, append-mode would create duplicates and/or keep stale messages that should have been deleted. Having the server declare `replay:"full"` lets the client reset deterministically.Generated with
mux• Model:anthropic:claude-opus-4-6• Thinking:xhigh• Cost:$1.88