Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
70f7dd7
Add onChat cursor/mode schemas and caught-up replay metadata
ThomasK33 Feb 13, 2026
46a2733
Implement incremental onChat replay plumbing
ThomasK33 Feb 13, 2026
8e33e88
Add incremental onChat replay cursor handling
ThomasK33 Feb 13, 2026
dad31b6
Fix timing: defer resetChatStateForReplay to after iterator established
ThomasK33 Feb 13, 2026
1aebcf6
Fix incremental reconnect gating and replay tool completion filtering
ThomasK33 Feb 13, 2026
8ac0779
Fix replay retry buffer reset and tool completion timestamp filtering
ThomasK33 Feb 13, 2026
bbcb563
Advance stream cursor with tool completion timestamps
ThomasK33 Feb 13, 2026
ce72f97
Clear stale stream contexts on full replay fallback
ThomasK33 Feb 13, 2026
02d0dde
Defensively initialize tool completion timestamps in stream replay paths
ThomasK33 Feb 13, 2026
1aaf913
Guard incremental replay with oldest history sequence cursor
ThomasK33 Feb 13, 2026
2a148e1
Require history anchor for since-mode replay
ThomasK33 Feb 13, 2026
f0378f7
Re-read stream state before emitting caught-up stream cursor
ThomasK33 Feb 13, 2026
615cfcb
Avoid stale partial emission after stream replay attempts
ThomasK33 Feb 13, 2026
d3823c1
Reset transient queued/live state on full replay fallback
ThomasK33 Feb 13, 2026
1e112f1
Preserve queued message on full replay fallback
ThomasK33 Feb 13, 2026
10f238a
Replay queued-message snapshot during onChat reconnect
ThomasK33 Feb 13, 2026
5abbeca
Emit queued-message snapshot in finally before caught-up
ThomasK33 Feb 13, 2026
04cc601
test: handle replayed queued snapshot in completion tests
ThomasK33 Feb 13, 2026
8289684
fix: preserve live reconnect state for full and live replay
ThomasK33 Feb 13, 2026
414bd92
test: isolate share transcript dialog from onChat retries
ThomasK33 Feb 13, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/browser/components/ShareTranscriptDialog.test.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ describe("ShareTranscriptDialog", () => {
originalGetComputedStyle = globalThis.getComputedStyle;
globalThis.getComputedStyle = globalThis.window.getComputedStyle.bind(globalThis.window);

// Ensure test isolation from other suites that attach a mock ORPC client.
// Share dialog tests operate on local ephemeral messages and should not race
// onChat reconnect loops from unrelated WorkspaceStore tests.
getStore().setClient(null);

spyOn(console, "error").mockImplementation(() => undefined);

spyOn(muxMd, "uploadToMuxMd").mockResolvedValue({
Expand Down
55 changes: 54 additions & 1 deletion src/browser/stores/WorkspaceStore.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -772,8 +772,36 @@ describe("WorkspaceStore", () => {
const live = store.getBashToolLiveOutput(workspaceId, "call-2");
expect(live).toBeNull();
});
});

// Regression test: bash-output events that arrive before "caught-up" must be
// buffered and replayed into live tool state once the full replay completes.
it("replays pre-caught-up bash output after full replay catches up", async () => {
  const workspaceId = "bash-output-workspace-3";

  // The live output event emitted ahead of the caught-up marker.
  const bufferedOutput: WorkspaceChatMessage = {
    type: "bash-output",
    workspaceId,
    toolCallId: "call-3",
    text: "buffered\n",
    isError: false,
    timestamp: 1,
  };

  mockOnChat.mockImplementation(async function* (): AsyncGenerator<
    WorkspaceChatMessage,
    void,
    unknown
  > {
    // Emit the output first, yield to the microtask queue, then signal that
    // the full replay has caught up — exercising the buffering window.
    yield bufferedOutput;
    await Promise.resolve();
    yield { type: "caught-up", replay: "full" };
  });

  createAndAddWorkspace(store, workspaceId);
  // Give the subscription loop a tick to consume both events.
  await new Promise((resolve) => setTimeout(resolve, 10));

  const live = store.getBashToolLiveOutput(workspaceId, "call-3");
  expect(live).not.toBeNull();
  if (!live) throw new Error("Expected buffered live output after caught-up");
  expect(live.stdout).toContain("buffered");
});
});
describe("task-created events", () => {
it("exposes live taskId while the task tool is running", async () => {
const workspaceId = "task-created-workspace-1";
Expand Down Expand Up @@ -833,5 +861,30 @@ describe("WorkspaceStore", () => {

expect(store.getTaskToolLiveTaskId(workspaceId, "call-task-2")).toBeNull();
});

// Regression test: a task-created event emitted before "caught-up" must still
// surface as the live task id for its tool call after a full replay.
it("replays pre-caught-up task-created after full replay catches up", async () => {
  const workspaceId = "task-created-workspace-3";

  mockOnChat.mockImplementation(async function* (): AsyncGenerator<
    WorkspaceChatMessage,
    void,
    unknown
  > {
    // task-created arrives first; a microtask boundary separates it from the
    // caught-up marker so the buffering/replay path is actually exercised.
    const taskCreated: WorkspaceChatMessage = {
      type: "task-created",
      workspaceId,
      toolCallId: "call-task-3",
      taskId: "child-workspace-3",
      timestamp: 1,
    };
    yield taskCreated;
    await Promise.resolve();
    yield { type: "caught-up", replay: "full" };
  });

  createAndAddWorkspace(store, workspaceId);
  // Allow the subscription loop one tick to process both events.
  await new Promise((resolve) => setTimeout(resolve, 10));

  expect(store.getTaskToolLiveTaskId(workspaceId, "call-task-3")).toBe("child-workspace-3");
});
});
});
82 changes: 74 additions & 8 deletions src/browser/stores/WorkspaceStore.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import assert from "@/common/utils/assert";
import type { MuxMessage, DisplayedMessage, QueuedMessage } from "@/common/types/message";
import type { FrontendWorkspaceMetadata } from "@/common/types/workspace";
import type { WorkspaceChatMessage, WorkspaceStatsSnapshot } from "@/common/orpc/types";
import type { WorkspaceChatMessage, WorkspaceStatsSnapshot, OnChatMode } from "@/common/orpc/types";
import type { RouterClient } from "@orpc/server";
import type { AppRouter } from "@/node/orpc/router";
import type { TodoItem } from "@/common/types/tools";
Expand Down Expand Up @@ -1624,13 +1624,35 @@ export class WorkspaceStore {
let lastChatEventAt = Date.now();

try {
// Reconnect incrementally whenever we can build a valid cursor.
// Do not gate on transient.caughtUp here: retry paths may optimistically
// set caughtUp=false to re-enable buffering, but the cursor can still
// represent the latest rendered state for an incremental reconnect.
const aggregator = this.aggregators.get(workspaceId);
let mode: OnChatMode | undefined;

if (aggregator) {
const cursor = aggregator.getOnChatCursor();
if (cursor?.history) {
mode = {
type: "since",
cursor: {
history: cursor.history,
stream: cursor.stream,
},
};
}
}

const iterator = await client.workspace.onChat(
{ workspaceId },
{ workspaceId, mode },
{ signal: attemptController.signal }
);

if (this.pendingReplayReset.delete(workspaceId)) {
// Keep the existing UI visible until the replay can actually start.
// Full replay: clear stale derived/transient state now that the subscription
// is active. Deferred to after the iterator is established so the UI continues
// displaying previous state until replay data actually starts arriving.
if (!mode || mode.type === "full") {
this.resetChatStateForReplay(workspaceId);
}

Expand Down Expand Up @@ -1718,7 +1740,16 @@ export class WorkspaceStore {
}

if (this.isWorkspaceSubscribed(workspaceId)) {
this.pendingReplayReset.add(workspaceId);
const transient = this.chatTransientState.get(workspaceId);
if (transient) {
// Failed reconnect attempts may have buffered partial replay data.
// Clear replay buffers before the next attempt so we don't append a
// second replay copy and duplicate deltas/tool events on caught-up.
transient.caughtUp = false;
transient.replayingHistory = false;
transient.historicalMessages.length = 0;
transient.pendingStreamEvents.length = 0;
}
}

const delayMs = calculateOnChatBackoffMs(attempt);
Expand Down Expand Up @@ -2019,7 +2050,17 @@ export class WorkspaceStore {
* This ensures isStreamEvent() and processStreamEvent() can never fall out of sync.
*/
private isBufferedEvent(data: WorkspaceChatMessage): boolean {
return "type" in data && data.type in this.bufferedEventHandlers;
if (!("type" in data)) {
return false;
}

// Buffer high-frequency stream events (including bash/task live updates) until
// caught-up so full-replay reconnects can deterministically rebuild transient state.
return (
data.type in this.bufferedEventHandlers ||
data.type === "bash-output" ||
data.type === "task-created"
);
}

private handleChatMessage(workspaceId: string, data: WorkspaceChatMessage): void {
Expand All @@ -2029,16 +2070,41 @@ export class WorkspaceStore {
const transient = this.assertChatTransientState(workspaceId);

if (isCaughtUpMessage(data)) {
const replay = data.replay ?? "full";

// Check if there's an active stream in buffered events (reconnection scenario)
const pendingEvents = transient.pendingStreamEvents;
const hasActiveStream = pendingEvents.some(
(event) => "type" in event && event.type === "stream-start"
);

// Load historical messages first
// Defensive cleanup:
// - full replay means backend rebuilt state from scratch, so stale local stream contexts
// must be cleared even if a stream cursor is present in caught-up metadata.
// - no stream cursor means no active stream exists server-side.
if (replay === "full" || !data.cursor?.stream) {
aggregator.clearActiveStreams();
}

if (replay === "full") {
// Full replay replaces backend-derived history state. Reset transient UI-only
// fields before replay hydration so stale values do not survive reconnect fallback.
// queuedMessage is safe to clear because backend now replays a fresh
// queued-message-changed snapshot before caught-up.
transient.queuedMessage = null;
transient.liveBashOutput.clear();
transient.liveTaskIds.clear();
}

if (transient.historicalMessages.length > 0) {
aggregator.loadHistoricalMessages(transient.historicalMessages, hasActiveStream);
const loadMode = replay === "full" ? "replace" : "append";
aggregator.loadHistoricalMessages(transient.historicalMessages, hasActiveStream, {
mode: loadMode,
});
transient.historicalMessages.length = 0;
} else if (replay === "full") {
// Full replay can legitimately contain zero messages (e.g. compacted to empty).
aggregator.loadHistoricalMessages([], hasActiveStream, { mode: "replace" });
}

// Mark that we're replaying buffered history (prevents O(N) scheduling)
Expand Down
115 changes: 99 additions & 16 deletions src/browser/utils/messages/StreamingMessageAggregator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@ import type { LanguageModelV2Usage } from "@ai-sdk/provider";
import type { TodoItem, StatusSetToolResult, NotifyToolResult } from "@/common/types/tools";
import { getToolOutputUiOnly } from "@/common/utils/tools/toolOutputUiOnly";

import type { WorkspaceChatMessage, StreamErrorMessage, DeleteMessage } from "@/common/orpc/types";
import type {
WorkspaceChatMessage,
StreamErrorMessage,
DeleteMessage,
OnChatCursor,
} from "@/common/orpc/types";
import { isInitStart, isInitOutput, isInitEnd, isMuxMessage } from "@/common/orpc/types";
import type {
DynamicToolPart,
Expand Down Expand Up @@ -804,21 +809,29 @@ export class StreamingMessageAggregator {
*
* @param messages - Historical messages to load
* @param hasActiveStream - Whether there's an active stream in buffered events (for reconnection scenario)
* @param opts.mode - "replace" clears existing state first, "append" merges into existing state
*/
loadHistoricalMessages(messages: MuxMessage[], hasActiveStream = false): void {
// Clear existing state to prevent stale messages from persisting.
// This method replaces all messages, not merges them.
this.messages.clear();
this.displayedMessageCache.clear();
this.messageVersions.clear();
this.deltaHistory.clear();
this.activeStreamUsage.clear();
this.loadedSkills.clear();
this.loadedSkillsCache = [];
this.skillLoadErrors.clear();
this.skillLoadErrorsCache = [];

// Add all messages to the map
loadHistoricalMessages(
messages: MuxMessage[],
hasActiveStream = false,
opts?: { mode?: "replace" | "append" }
): void {
const mode = opts?.mode ?? "replace";

if (mode === "replace") {
// Clear existing state to prevent stale messages from persisting.
this.messages.clear();
this.displayedMessageCache.clear();
this.messageVersions.clear();
this.deltaHistory.clear();
this.activeStreamUsage.clear();
this.loadedSkills.clear();
this.loadedSkillsCache = [];
this.skillLoadErrors.clear();
this.skillLoadErrorsCache = [];
}

// Add/overwrite messages in the map
for (const message of messages) {
this.messages.set(message.id, message);
}
Expand Down Expand Up @@ -871,6 +884,61 @@ export class StreamingMessageAggregator {
return this.cache.allMessages;
}

/**
 * Derive the reconnect cursor used for incremental onChat resumption.
 *
 * Scans known messages for the newest and oldest history sequence numbers
 * and, when exactly one stream is active, attaches that stream's last-seen
 * server timestamp. Returns undefined whenever the local state cannot be
 * represented safely, which forces the caller to fall back to a full replay.
 */
getOnChatCursor(): OnChatCursor | undefined {
  // Sentinels: -1 means "no (non-negative) sequence seen yet"; +Infinity
  // means "no lower bound seen yet".
  let newestSequence = -1;
  let newestMessageId: string | undefined;
  let oldestSequence = Number.POSITIVE_INFINITY;

  // Single pass over all messages, tracking both ends of the history range.
  // Messages without a historySequence are ignored.
  for (const message of this.messages.values()) {
    const sequence = message.metadata?.historySequence;
    if (sequence === undefined) {
      continue;
    }
    if (sequence > newestSequence) {
      newestSequence = sequence;
      newestMessageId = message.id;
    }
    if (sequence < oldestSequence) {
      oldestSequence = sequence;
    }
  }

  // Without an anchored newest message (or any finite lower bound) we cannot
  // resume incrementally.
  if (!newestMessageId || !Number.isFinite(oldestSequence)) {
    return undefined;
  }

  // Defensive fallback: more than one active stream is anomalous, so force a
  // full replay rather than guess which stream the cursor should represent.
  if (this.activeStreams.size > 1) {
    return undefined;
  }

  const cursor: OnChatCursor = {
    history: {
      messageId: newestMessageId,
      historySequence: newestSequence,
      oldestHistorySequence: oldestSequence,
    },
  };

  // At most one stream is active here; if present, record its position so the
  // server can resume emission from the right timestamp.
  if (this.activeStreams.size === 1) {
    const entry = this.activeStreams.entries().next().value;
    assert(entry, "activeStreams size reported 1 but no entry found");
    const [streamMessageId, streamContext] = entry;
    cursor.stream = {
      messageId: streamMessageId,
      lastTimestamp: streamContext.lastServerTimestamp,
    };
  }

  return cursor;
}
// Efficient methods to check message state without creating arrays
getMessageCount(): number {
return this.messages.size;
Expand Down Expand Up @@ -1291,8 +1359,23 @@ export class StreamingMessageAggregator {
thinkingLevel: data.thinkingLevel,
};

// For incremental replay: stream-start may be re-emitted to re-establish context.
// If we already have this message with accumulated parts, don't wipe its content.
const existingMessage = this.messages.get(data.messageId);
if (data.replay && existingMessage && existingMessage.parts.length > 0) {
this.activeStreams.set(data.messageId, context);
if (existingMessage.metadata) {
existingMessage.metadata.model = data.model;
existingMessage.metadata.routedThroughGateway = data.routedThroughGateway;
existingMessage.metadata.mode = data.mode;
existingMessage.metadata.thinkingLevel = data.thinkingLevel;
}
this.markMessageDirty(data.messageId);
return;
}

// Use messageId as key - ensures only ONE stream per message
// If called twice (e.g., during replay), second call safely overwrites first
// If called twice, second call safely overwrites first
this.activeStreams.set(data.messageId, context);

// Create initial streaming message with empty parts (deltas will append)
Expand Down
6 changes: 5 additions & 1 deletion src/common/orpc/schemas/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { RuntimeConfigSchema, RuntimeAvailabilitySchema } from "./runtime";
import { SecretSchema } from "./secrets";
import {
CompletedMessagePartSchema,
OnChatModeSchema,
SendMessageOptionsSchema,
StreamEndEventSchema,
UpdateStatusSchema,
Expand Down Expand Up @@ -1002,7 +1003,10 @@ export const workspace = {
},
// Subscriptions
onChat: {
input: z.object({ workspaceId: z.string() }),
input: z.object({
workspaceId: z.string(),
mode: OnChatModeSchema.optional(),
}),
output: eventIterator(WorkspaceChatMessageSchema), // Stream event
},
onMetadata: {
Expand Down
Loading
Loading