Unverified 提交 6f53e896 authored 作者: Will Chen's avatar Will Chen 提交者: GitHub

Handle local agent connection recovery (#2853)

Handle connection-drop and retry behavior in local agent IPC handling. Add an end-to-end scenario that verifies recovery after a temporary local agent disconnect. Align test fixtures and snapshots for local-agent reconnection behavior.
上级 709222c5
......@@ -12,6 +12,8 @@ dist/
# playwright
playwright-report/
test-results/
blob-report/
flakiness-report/
# Diagnostic reports (https://nodejs.org/api/report.html)
report.[0-9]*.[0-9]*.[0-9]*.[0-9]*.json
......
import type { LocalAgentFixture } from "../../../../testing/fake-llm-server/localAgentTypes";
/**
* Tests retry behavior when connection drops after tool-call chunks were emitted
* but before the stream is finalized. This simulates an orphaned tool-call retry
* window and ensures we don't duplicate tool execution.
*/
export const fixture: LocalAgentFixture = {
description: "Connection drop after streaming tool-call chunks",
dropConnectionAfterToolCallByTurn: [{ turnIndex: 0, attempts: [1] }],
turns: [
{
text: "I'll create a file for you.",
toolCalls: [
{
name: "write_file",
args: {
path: "src/recovered-after-tool-call.ts",
content: `export const recoveredAfterToolCall = true;\n`,
description: "File created after tool-call termination recovery",
},
},
],
},
{
text: "Successfully created the file after retrying from a tool-call termination.",
},
],
};
import type { LocalAgentFixture } from "../../../../testing/fake-llm-server/localAgentTypes";
/**
* Tests automatic retry after connection drop (e.g., TCP terminated mid-stream).
* This fixture drops the connection on the first attempt of turn 1 (the
* post-tool text turn), which is more realistic than dropping before any
* tool activity. The local agent handler should automatically retry and
* continue without re-running completed work.
*/
export const fixture: LocalAgentFixture = {
description: "Automatic retry after connection drop",
dropConnectionByTurn: [{ turnIndex: 1, attempts: [1] }],
turns: [
{
text: "I'll create a file for you.",
toolCalls: [
{
name: "write_file",
args: {
path: "src/recovered.ts",
content: `export const recovered = true;\n`,
description: "File created after connection recovery",
},
},
],
},
{
text: "Successfully created the file after automatic retry.",
},
],
};
import { expect } from "@playwright/test";
import { testSkipIfWindows } from "./helpers/test_helper";
/**
* E2E test for local-agent connection retry resilience.
* Verifies that the agent automatically recovers from transient connection
* drops (e.g., TCP terminated mid-stream) by retrying the stream.
*/
testSkipIfWindows(
"local-agent - recovers from connection drop",
async ({ po }) => {
await po.setUpDyadPro({ localAgent: true });
await po.importApp("minimal");
await po.chatActions.selectLocalAgentMode();
// The connection-drop fixture drops on turn 1 (after a tool turn already
// completed) to simulate a realistic interrupted follow-up request.
await po.sendPrompt("tc=local-agent/connection-drop");
// Verify the turn still completed and no error box leaked to the UI.
await expect(po.page.getByTestId("chat-error-box")).toHaveCount(0);
const introText = po.page.getByText("I'll create a file for you.");
const completionText = po.page.getByText(
"Successfully created the file after automatic retry.",
);
await expect(introText).toHaveCount(1);
await expect(completionText).toHaveCount(1);
await expect(introText).toBeVisible();
await expect(completionText).toBeVisible();
// Partial chunks from the dropped attempt must not leak into final UI.
await expect(
po.page.getByText("Partial response before connection dr"),
).toHaveCount(0);
// Verify exactly one recovered.ts edit card is shown in chat.
const recoveredEditCard = po.page.getByRole("button", {
name: /recovered\.ts .*src\/recovered\.ts.*Edit/,
});
await expect(recoveredEditCard).toHaveCount(1);
// The replayed conversation order must stay:
// intro assistant text -> tool edit card -> completion assistant text.
const introY = (await introText.boundingBox())?.y;
const editCardY = (await recoveredEditCard.boundingBox())?.y;
const completionY = (await completionText.boundingBox())?.y;
expect(introY).toBeDefined();
expect(editCardY).toBeDefined();
expect(completionY).toBeDefined();
expect(introY!).toBeLessThan(editCardY!);
expect(editCardY!).toBeLessThan(completionY!);
// Snapshot end state for chat + filesystem.
await po.snapshotMessages();
await po.snapshotAppFiles({
name: "after-connection-retry",
files: ["src/recovered.ts"],
});
},
);
testSkipIfWindows(
"local-agent - recovers when drop happens after tool-call stream",
async ({ po }) => {
await po.setUpDyadPro({ localAgent: true });
await po.importApp("minimal");
await po.chatActions.selectLocalAgentMode();
await po.sendPrompt("tc=local-agent/connection-drop-after-tool-call");
await expect(po.page.getByTestId("chat-error-box")).toHaveCount(0);
await expect(
po.page.getByText(
"Successfully created the file after retrying from a tool-call termination.",
),
).toBeVisible();
await expect(
po.page
.getByRole("button", {
name: /recovered-after-tool-call\.ts .*src\/recovered-after-tool-call\.ts.*Edit/,
})
.first(),
).toBeVisible();
await po.snapshotAppFiles({
name: "after-tool-call-connection-retry",
files: ["src/recovered-after-tool-call.ts"],
});
},
);
=== src/recovered-after-tool-call.ts ===
export const recoveredAfterToolCall = true;
- paragraph: /Generate an AI_RULES\.md file for this app\. Describe the tech stack in 5-\d+ bullet points and describe clear rules about what libraries to use for what\./
- button "file1.txt file1.txt Edit":
- img
- text: ""
- button "Edit":
- img
- text: ""
- img
- paragraph: More EOM
- button "Copy":
- img
- img
- text: Approved
- img
- text: claude-opus-4-5
- img
- text: less than a minute ago
- img
- text: (1 files changed)
- button "Copy Request ID":
- img
- text: ""
- paragraph: tc=local-agent/connection-drop
- paragraph: I'll create a file for you.
- 'button "recovered.ts src/recovered.ts Edit Summary: File created after connection recovery"':
- img
- text: ""
- button "Edit":
- img
- text: ""
- img
- text: ""
- paragraph: Successfully created the file after automatic retry.
- button "Copy":
- img
- img
- text: claude-opus-4-5
- img
- text: less than a minute ago
- button "Copy Request ID":
- img
- text: ""
- button "Undo":
- img
- text: ""
- button "Retry":
- img
- text: ""
\ No newline at end of file
......@@ -847,6 +847,184 @@ describe("handleLocalAgentStream", () => {
expect(lastContentUpdate.data.content).toContain("Hello, ");
expect(lastContentUpdate.data.content).toContain("world!");
});
it("should retry and resume when a stream terminates transiently", async () => {
// Arrange
const { event, getMessagesByChannel } = createFakeEvent();
mockSettings = buildTestSettings({ enableDyadPro: true });
mockChatData = buildTestChat();
const streamMessagesByAttempt: any[][] = [];
let attemptCount = 0;
mockStreamTextImpl = (options) => {
attemptCount += 1;
streamMessagesByAttempt.push(options.messages ?? []);
if (attemptCount === 1) {
return {
fullStream: (async function* () {
yield { type: "text-delta", text: "Partial response. " };
throw new TypeError("terminated");
})(),
response: Promise.resolve({ messages: [] }),
steps: Promise.resolve([]),
};
}
return {
fullStream: (async function* () {
yield { type: "text-delta", text: "Recovered output." };
})(),
response: Promise.resolve({
messages: [
{
role: "assistant",
content: [{ type: "text", text: "Recovered output." }],
},
],
}),
steps: Promise.resolve([{ toolCalls: [] }]),
};
};
// Act
await handleLocalAgentStream(
event,
{ chatId: 1, prompt: "test" },
new AbortController(),
{
placeholderMessageId: 10,
systemPrompt: "You are helpful",
dyadRequestId,
},
);
// Assert
expect(attemptCount).toBe(2);
expect(getMessagesByChannel("chat:response:error")).toHaveLength(0);
const contentUpdates = dbOperations.updates.filter(
(u) => u.data.content !== undefined,
);
const finalContent = contentUpdates[contentUpdates.length - 1].data
.content as string;
expect(finalContent).toContain("Partial response.");
expect(finalContent).toContain("Recovered output.");
const continuationInstructionFound = (
streamMessagesByAttempt[1] ?? []
).some(
(message: any) =>
message.role === "user" &&
Array.isArray(message.content) &&
message.content.some(
(part: any) =>
part.type === "text" &&
typeof part.text === "string" &&
part.text.includes(
"previous response stream was interrupted by a transient network error",
),
),
);
expect(continuationInstructionFound).toBe(true);
});
it("should replay emitted tool events before retrying a terminated stream", async () => {
// Arrange
const { event, getMessagesByChannel } = createFakeEvent();
mockSettings = buildTestSettings({ enableDyadPro: true });
mockChatData = buildTestChat();
const streamMessagesByAttempt: any[][] = [];
let attemptCount = 0;
mockStreamTextImpl = (options) => {
attemptCount += 1;
streamMessagesByAttempt.push(options.messages ?? []);
if (attemptCount === 1) {
return {
fullStream: (async function* () {
yield { type: "text-delta", text: "Working with tools. " };
yield {
type: "tool-call",
toolCallId: "call_replay_1",
toolName: "read_file",
input: { path: "README.md" },
};
yield {
type: "tool-result",
toolCallId: "call_replay_1",
toolName: "read_file",
output: "README content",
};
throw new TypeError("terminated");
})(),
response: Promise.resolve({ messages: [] }),
steps: Promise.resolve([]),
};
}
return {
fullStream: (async function* () {
yield { type: "text-delta", text: "Resumed after replay." };
})(),
response: Promise.resolve({
messages: [
{
role: "assistant",
content: [{ type: "text", text: "Resumed after replay." }],
},
],
}),
steps: Promise.resolve([{ toolCalls: [] }]),
};
};
// Act
await handleLocalAgentStream(
event,
{ chatId: 1, prompt: "test" },
new AbortController(),
{
placeholderMessageId: 10,
systemPrompt: "You are helpful",
dyadRequestId,
},
);
// Assert
expect(attemptCount).toBe(2);
expect(getMessagesByChannel("chat:response:error")).toHaveLength(0);
const secondAttemptMessages = streamMessagesByAttempt[1] ?? [];
const hasReplayedToolCall = secondAttemptMessages.some(
(message: any) =>
message.role === "assistant" &&
Array.isArray(message.content) &&
message.content.some(
(part: any) =>
part.type === "tool-call" &&
part.toolCallId === "call_replay_1" &&
part.toolName === "read_file",
),
);
const hasReplayedToolResult = secondAttemptMessages.some(
(message: any) =>
message.role === "tool" &&
Array.isArray(message.content) &&
message.content.some(
(part: any) =>
part.type === "tool-result" &&
part.toolCallId === "call_replay_1" &&
part.toolName === "read_file" &&
part.output?.type === "text" &&
part.output?.value === "README content",
),
);
expect(hasReplayedToolCall).toBe(true);
expect(hasReplayedToolResult).toBe(true);
});
});
describe("Stream processing - reasoning blocks", () => {
......
......@@ -79,6 +79,10 @@ import { getPostCompactionMessages } from "@/ipc/handlers/compaction/compaction_
const logger = log.scope("local_agent_handler");
const PLANNING_QUESTIONNAIRE_TOOL_NAME = "planning_questionnaire";
const MAX_TERMINATED_STREAM_RETRIES = 2;
const STREAM_RETRY_BASE_DELAY_MS = 400;
const STREAM_CONTINUE_MESSAGE =
"[System] Your previous response stream was interrupted by a transient network error. Continue from exactly where you left off and do not repeat text that has already been sent.";
// ============================================================================
// Tool Streaming State Management
......@@ -93,6 +97,24 @@ interface ToolStreamingEntry {
}
const toolStreamingEntries = new Map<string, ToolStreamingEntry>();
type RetryReplayEvent =
| {
type: "assistant-text";
text: string;
}
| {
type: "tool-call";
toolCallId: string;
toolName: string;
input: unknown;
}
| {
type: "tool-result";
toolCallId: string;
toolName: string;
output: unknown;
};
function getOrCreateStreamingEntry(
id: string,
toolName?: string,
......@@ -243,6 +265,7 @@ export async function handleLocalAgentStream(
const settings = readSettings();
let fullResponse = "";
let streamingPreview = ""; // Temporary preview for current tool, not persisted
let activeRetryReplayEvents: RetryReplayEvent[] | null = null;
// Mid-turn compaction inserts a DB summary row for LLM history, but we render
// the user-facing compaction indicator inline in the active assistant turn.
const hiddenMessageIdsForStreaming = new Set<number>();
......@@ -470,7 +493,8 @@ export async function handleLocalAgentStream(
},
onXmlComplete: (finalXml: string) => {
// Write final XML to DB and UI
fullResponse += finalXml + "\n";
const xmlChunk = `${finalXml}\n`;
fullResponse += xmlChunk;
streamingPreview = ""; // Clear preview
updateResponseInDb(placeholderMessageId, fullResponse);
sendResponseChunk(
......@@ -587,321 +611,455 @@ export async function handleLocalAgentStream(
postMidTurnCompactionStartStep = null;
baseMessageHistoryCount = currentMessageHistory.length;
// Stream the response
const streamResult = streamText({
model: modelClient.model,
headers: getAiHeaders({
builtinProviderId: modelClient.builtinProviderId,
}),
providerOptions: getProviderOptions({
dyadAppId: chat.app.id,
dyadRequestId,
dyadDisableFiles: true, // Local agent uses tools, not file injection
files: [],
mentionedAppsCodebases: [],
builtinProviderId: modelClient.builtinProviderId,
settings,
}),
maxOutputTokens,
temperature,
maxRetries: 2,
system: systemPrompt,
messages: currentMessageHistory,
tools: allTools,
stopWhen: [
stepCountIs(25),
hasToolCall(addIntegrationTool.name),
// In plan mode, also stop after writing a plan or exiting plan mode.
...(planModeOnly
? [hasToolCall(writePlanTool.name), hasToolCall(exitPlanTool.name)]
: []),
],
abortSignal: abortController.signal,
// Inject pending user messages (e.g., images from web_crawl) between steps
// We must re-inject all accumulated messages each step because the AI SDK
// doesn't persist dynamically injected messages in its internal state.
// We track the insertion index so messages appear at the same position each step.
prepareStep: async (options) => {
let stepOptions = options;
if (
!messageOverride &&
compactBeforeNextStep &&
!compactedMidTurn &&
settings.enableContextCompaction !== false
) {
compactBeforeNextStep = false;
const inFlightTailMessages = options.messages.slice(
baseMessageHistoryCount,
);
const compacted = await maybePerformPendingCompaction({
showOnTopOfCurrentResponse: true,
force: true,
});
if (compacted) {
compactedMidTurn = true;
// Preserve only messages generated after this compaction boundary.
postMidTurnCompactionStartStep = options.stepNumber;
// Clear stale injected messages — their insertAtIndex values are
// based on the pre-compaction message array which has been rebuilt
// with a different (typically smaller) count. Keeping them would
// cause injectMessagesAtPositions to splice at wrong positions.
allInjectedMessages.length = 0;
const compactedMessageHistory = buildChatMessageHistory(
chat.messages,
{
// Keep the structured in-flight assistant/tool messages from
// the current stream instead of the placeholder DB content.
excludeMessageIds: new Set([placeholderMessageId]),
},
);
baseMessageHistoryCount = compactedMessageHistory.length;
stepOptions = {
...options,
// Preserve in-flight turn messages so same-turn tool loops can
// continue, while later turns are compacted via persisted history.
messages: [...compactedMessageHistory, ...inFlightTailMessages],
};
} else {
// Prevent repeated compaction attempts if the first one fails.
compactionFailedMidTurn = true;
}
let passProducedChatText = false;
let responseMessages: ModelMessage[] = [];
let steps: Array<{
toolCalls: Array<unknown>;
response?: { messages?: ModelMessage[] };
}> = [];
let terminatedRetryCount = 0;
let needsContinuationInstruction = false;
// Retry loop: if the stream terminates with a transient error, captured text/tool events are replayed into message history, a continuation instruction is appended, and the stream is re-opened.
while (!abortController.signal.aborted) {
let streamErrorFromCallback: unknown;
const retryReplayEvents: RetryReplayEvent[] = [];
activeRetryReplayEvents = retryReplayEvents;
const attemptMessages = needsContinuationInstruction
? [
...currentMessageHistory,
buildTerminatedRetryContinuationInstruction(),
]
: currentMessageHistory;
const attemptToolInputIds = new Set<string>();
const cleanupAttemptToolStreamingEntries = () => {
for (const toolCallId of attemptToolInputIds) {
cleanupStreamingEntry(toolCallId);
}
attemptToolInputIds.clear();
};
const preparedStep = prepareStepMessages(
stepOptions,
pendingUserMessages,
allInjectedMessages,
);
// prepareStepMessages returns undefined when it has no additional
// injections/cleanups to apply. If we already replaced the base
// message history (e.g., after mid-turn compaction), we still need
// to return the updated options.
let result =
preparedStep ?? (stepOptions === options ? undefined : stepOptions);
try {
const streamResult = streamText({
model: modelClient.model,
headers: getAiHeaders({
builtinProviderId: modelClient.builtinProviderId,
}),
providerOptions: getProviderOptions({
dyadAppId: chat.app.id,
dyadRequestId,
dyadDisableFiles: true, // Local agent uses tools, not file injection
files: [],
mentionedAppsCodebases: [],
builtinProviderId: modelClient.builtinProviderId,
settings,
}),
maxOutputTokens,
temperature,
maxRetries: 2,
system: systemPrompt,
messages: attemptMessages,
tools: allTools,
stopWhen: [
stepCountIs(25),
hasToolCall(addIntegrationTool.name),
// In plan mode, also stop after writing a plan or exiting plan mode.
...(planModeOnly
? [
hasToolCall(writePlanTool.name),
hasToolCall(exitPlanTool.name),
]
: []),
],
abortSignal: abortController.signal,
// Inject pending user messages (e.g., images from web_crawl) between steps
// We must re-inject all accumulated messages each step because the AI SDK
// doesn't persist dynamically injected messages in its internal state.
// We track the insertion index so messages appear at the same position each step.
prepareStep: async (options) => {
let stepOptions = options;
if (
!messageOverride &&
compactBeforeNextStep &&
!compactedMidTurn &&
settings.enableContextCompaction !== false
) {
compactBeforeNextStep = false;
const inFlightTailMessages = options.messages.slice(
baseMessageHistoryCount,
);
const compacted = await maybePerformPendingCompaction({
showOnTopOfCurrentResponse: true,
force: true,
});
if (compacted) {
compactedMidTurn = true;
// Preserve only messages generated after this compaction boundary.
postMidTurnCompactionStartStep = options.stepNumber;
// Clear stale injected messages — their insertAtIndex values are
// based on the pre-compaction message array which has been rebuilt
// with a different (typically smaller) count. Keeping them would
// cause injectMessagesAtPositions to splice at wrong positions.
allInjectedMessages.length = 0;
const compactedMessageHistory = buildChatMessageHistory(
chat.messages,
{
// Keep the structured in-flight assistant/tool messages from
// the current stream instead of the placeholder DB content.
excludeMessageIds: new Set([placeholderMessageId]),
},
);
baseMessageHistoryCount = compactedMessageHistory.length;
stepOptions = {
...options,
// Preserve in-flight turn messages so same-turn tool loops can
// continue, while later turns are compacted via persisted history.
messages: [
...compactedMessageHistory,
...inFlightTailMessages,
],
};
} else {
// Prevent repeated compaction attempts if the first one fails.
compactionFailedMidTurn = true;
}
}
return result;
},
onStepFinish: async (step) => {
if (!hasInjectedPlanningQuestionnaireReflection) {
const questionnaireError =
getPlanningQuestionnaireErrorFromStep(step);
if (questionnaireError) {
pendingUserMessages.push([
{
type: "text",
text: buildPlanningQuestionnaireReflectionMessage(
questionnaireError,
planModeOnly,
),
},
]);
hasInjectedPlanningQuestionnaireReflection = true;
logger.info(
`Injected synthetic planning_questionnaire reflection message for chat ${req.chatId}`,
const preparedStep = prepareStepMessages(
stepOptions,
pendingUserMessages,
allInjectedMessages,
);
}
}
if (
settings.enableContextCompaction === false ||
compactedMidTurn ||
typeof step.usage.totalTokens !== "number"
) {
return;
}
const shouldCompact = await checkAndMarkForCompaction(
req.chatId,
step.usage.totalTokens,
);
// If this step triggered tool calls, compact before the next step
// in this same user turn instead of waiting for the next message.
// Only attempt mid-turn compaction once per turn.
if (
shouldCompact &&
step.toolCalls.length > 0 &&
!compactionFailedMidTurn
) {
compactBeforeNextStep = true;
}
},
onFinish: async (response) => {
const totalTokens = response.usage?.totalTokens;
const inputTokens = response.usage?.inputTokens;
const cachedInputTokens = response.usage?.cachedInputTokens;
logger.log(
"Total tokens used:",
totalTokens,
"Input tokens:",
inputTokens,
"Cached input tokens:",
cachedInputTokens,
"Cache hit ratio:",
cachedInputTokens
? (cachedInputTokens ?? 0) / (inputTokens ?? 0)
: 0,
);
if (typeof totalTokens === "number") {
await db
.update(messages)
.set({ maxTokensUsed: totalTokens })
.where(eq(messages.id, placeholderMessageId))
.catch((err) => logger.error("Failed to save token count", err));
}
},
onError: (error: any) => {
const errorMessage = error?.error?.message || JSON.stringify(error);
logger.error("Local agent stream error:", errorMessage);
safeSend(event.sender, "chat:response:error", {
chatId: req.chatId,
error: `AI error: ${errorMessage}`,
});
},
});
// Process the stream
let inThinkingBlock = false;
let passProducedChatText = false;
try {
for await (const part of streamResult.fullStream) {
if (abortController.signal.aborted) {
logger.log(`Stream aborted for chat ${req.chatId}`);
// Clean up pending consent/questionnaire requests to prevent stale UI banners
clearPendingConsentsForChat(req.chatId);
clearPendingQuestionnairesForChat(req.chatId);
break;
}
let chunk = "";
// prepareStepMessages returns undefined when it has no additional
// injections/cleanups to apply. If we already replaced the base
// message history (e.g., after mid-turn compaction), we still need
// to return the updated options.
let result =
preparedStep ??
(stepOptions === options ? undefined : stepOptions);
return result;
},
onStepFinish: async (step) => {
if (!hasInjectedPlanningQuestionnaireReflection) {
const questionnaireError =
getPlanningQuestionnaireErrorFromStep(step);
if (questionnaireError) {
pendingUserMessages.push([
{
type: "text",
text: buildPlanningQuestionnaireReflectionMessage(
questionnaireError,
planModeOnly,
),
},
]);
hasInjectedPlanningQuestionnaireReflection = true;
logger.info(
`Injected synthetic planning_questionnaire reflection message for chat ${req.chatId}`,
);
}
}
// Handle thinking block transitions
if (
inThinkingBlock &&
!["reasoning-delta", "reasoning-end", "reasoning-start"].includes(
part.type,
)
) {
chunk = "</think>\n";
inThinkingBlock = false;
}
if (
settings.enableContextCompaction === false ||
compactedMidTurn ||
typeof step.usage.totalTokens !== "number"
) {
return;
}
switch (part.type) {
case "text-delta":
passProducedChatText = true;
chunk += part.text;
break;
const shouldCompact = await checkAndMarkForCompaction(
req.chatId,
step.usage.totalTokens,
);
case "reasoning-start":
if (!inThinkingBlock) {
chunk = "<think>";
inThinkingBlock = true;
// If this step triggered tool calls, compact before the next step
// in this same user turn instead of waiting for the next message.
// Only attempt mid-turn compaction once per turn.
if (
shouldCompact &&
step.toolCalls.length > 0 &&
!compactionFailedMidTurn
) {
compactBeforeNextStep = true;
}
},
onFinish: async (response) => {
const totalTokens = response.usage?.totalTokens;
const inputTokens = response.usage?.inputTokens;
const cachedInputTokens = response.usage?.cachedInputTokens;
logger.log(
"Total tokens used:",
totalTokens,
"Input tokens:",
inputTokens,
"Cached input tokens:",
cachedInputTokens,
"Cache hit ratio:",
cachedInputTokens
? (cachedInputTokens ?? 0) / (inputTokens ?? 0)
: 0,
);
if (typeof totalTokens === "number") {
await db
.update(messages)
.set({ maxTokensUsed: totalTokens })
.where(eq(messages.id, placeholderMessageId))
.catch((err) =>
logger.error("Failed to save token count", err),
);
}
break;
},
onError: (error: any) => {
const normalizedError = unwrapStreamError(error);
streamErrorFromCallback = normalizedError;
logger.error(
"Local agent stream error:",
getErrorMessage(normalizedError),
);
},
});
case "reasoning-delta":
if (!inThinkingBlock) {
chunk = "<think>";
inThinkingBlock = true;
let inThinkingBlock = false;
let streamErrorFromIteration: unknown;
try {
for await (const part of streamResult.fullStream) {
if (abortController.signal.aborted) {
logger.log(`Stream aborted for chat ${req.chatId}`);
// Clean up pending consent/questionnaire requests to prevent stale UI banners
clearPendingConsentsForChat(req.chatId);
clearPendingQuestionnairesForChat(req.chatId);
break;
}
chunk += part.text;
break;
case "reasoning-end":
if (inThinkingBlock) {
let chunk = "";
// Handle thinking block transitions
if (
inThinkingBlock &&
![
"reasoning-delta",
"reasoning-end",
"reasoning-start",
].includes(part.type)
) {
chunk = "</think>\n";
inThinkingBlock = false;
}
break;
case "tool-input-start": {
// Initialize streaming state for this tool call
getOrCreateStreamingEntry(part.id, part.toolName);
break;
}
switch (part.type) {
case "text-delta":
passProducedChatText = true;
chunk += part.text;
maybeCaptureRetryReplayText(
activeRetryReplayEvents,
part.text,
);
break;
case "reasoning-start":
if (!inThinkingBlock) {
chunk = "<think>";
inThinkingBlock = true;
}
break;
case "tool-input-delta": {
// Accumulate args and stream XML preview
const entry = getOrCreateStreamingEntry(part.id);
if (entry) {
entry.argsAccumulated += part.delta;
const toolDef = findToolDefinition(entry.toolName);
if (toolDef?.buildXml) {
const argsPartial = parsePartialJson(entry.argsAccumulated);
const xml = toolDef.buildXml(argsPartial, false);
if (xml) {
ctx.onXmlStream(xml);
case "reasoning-delta":
if (!inThinkingBlock) {
chunk = "<think>";
inThinkingBlock = true;
}
chunk += part.text;
break;
case "reasoning-end":
if (inThinkingBlock) {
chunk = "</think>\n";
inThinkingBlock = false;
}
break;
case "tool-input-start": {
// Initialize streaming state for this tool call
getOrCreateStreamingEntry(part.id, part.toolName);
attemptToolInputIds.add(part.id);
break;
}
case "tool-input-delta": {
// Accumulate args and stream XML preview
const entry = getOrCreateStreamingEntry(part.id);
if (entry) {
entry.argsAccumulated += part.delta;
const toolDef = findToolDefinition(entry.toolName);
if (toolDef?.buildXml) {
const argsPartial = parsePartialJson(
entry.argsAccumulated,
);
const xml = toolDef.buildXml(argsPartial, false);
if (xml) {
ctx.onXmlStream(xml);
}
}
}
break;
}
}
break;
}
case "tool-input-end": {
// Build final XML and persist
const entry = getOrCreateStreamingEntry(part.id);
if (entry) {
const toolDef = findToolDefinition(entry.toolName);
if (toolDef?.buildXml) {
const argsPartial = parsePartialJson(entry.argsAccumulated);
const xml = toolDef.buildXml(argsPartial, true);
if (xml) {
ctx.onXmlComplete(xml);
case "tool-input-end": {
// Build final XML and persist
const entry = getOrCreateStreamingEntry(part.id);
if (entry) {
const toolDef = findToolDefinition(entry.toolName);
if (toolDef?.buildXml) {
const argsPartial = parsePartialJson(
entry.argsAccumulated,
);
const xml = toolDef.buildXml(argsPartial, true);
if (xml) {
ctx.onXmlComplete(xml);
}
}
}
cleanupStreamingEntry(part.id);
attemptToolInputIds.delete(part.id);
break;
}
case "tool-call":
maybeCaptureRetryReplayEvent(retryReplayEvents, part);
// Tool execution happens via execute callbacks
break;
case "tool-result":
maybeCaptureRetryReplayEvent(retryReplayEvents, part);
// Tool results are already handled by the execute callback
break;
}
if (chunk) {
fullResponse += chunk;
await updateResponseInDb(placeholderMessageId, fullResponse);
sendResponseChunk(
event,
req.chatId,
chat,
fullResponse,
placeholderMessageId,
hiddenMessageIdsForStreaming,
);
}
cleanupStreamingEntry(part.id);
break;
}
} catch (error) {
if (!abortController.signal.aborted) {
streamErrorFromIteration = error;
} else {
logger.log(
`Stream interrupted after abort for chat ${req.chatId}`,
);
}
}
case "tool-call":
// Tool execution happens via execute callbacks
break;
// Close thinking block if still open
if (inThinkingBlock) {
const closingThinkBlock = "</think>\n";
fullResponse += closingThinkBlock;
await updateResponseInDb(placeholderMessageId, fullResponse);
}
activeRetryReplayEvents = null;
case "tool-result":
// Tool results are already handled by the execute callback
break;
if (abortController.signal.aborted) {
break;
}
if (chunk) {
fullResponse += chunk;
await updateResponseInDb(placeholderMessageId, fullResponse);
sendResponseChunk(
event,
req.chatId,
chat,
fullResponse,
placeholderMessageId,
hiddenMessageIdsForStreaming,
);
const streamError =
streamErrorFromIteration ?? streamErrorFromCallback;
if (streamError) {
if (
shouldRetryTerminatedStreamError({
error: streamError,
retryCount: terminatedRetryCount,
aborted: abortController.signal.aborted,
})
) {
maybeAppendRetryReplayForRetry({
retryReplayEvents,
currentMessageHistoryRef: currentMessageHistory,
accumulatedAiMessagesRef: accumulatedAiMessages,
onCurrentMessageHistoryUpdate: (next) =>
(currentMessageHistory = next),
});
terminatedRetryCount += 1;
needsContinuationInstruction = true;
const retryDelayMs =
STREAM_RETRY_BASE_DELAY_MS * terminatedRetryCount;
sendTelemetryEvent("local_agent:terminated_stream_retry", {
chatId: req.chatId,
retryCount: terminatedRetryCount,
error: String(streamError),
phase: "stream_iteration",
});
logger.warn(
`Transient stream termination for chat ${req.chatId}; retrying pass (${terminatedRetryCount}/${MAX_TERMINATED_STREAM_RETRIES}) after ${retryDelayMs}ms`,
);
await delay(retryDelayMs);
continue;
}
throw streamError;
}
}
} catch (error) {
if (!abortController.signal.aborted) {
throw error;
}
logger.log(`Stream interrupted after abort for chat ${req.chatId}`);
}
// Close thinking block if still open
if (inThinkingBlock) {
fullResponse += "</think>\n";
await updateResponseInDb(placeholderMessageId, fullResponse);
try {
const response = await streamResult.response;
steps = (await streamResult.steps) ?? [];
responseMessages = response.messages;
} catch (err) {
if (
shouldRetryTerminatedStreamError({
error: err,
retryCount: terminatedRetryCount,
aborted: abortController.signal.aborted,
})
) {
maybeAppendRetryReplayForRetry({
retryReplayEvents,
currentMessageHistoryRef: currentMessageHistory,
accumulatedAiMessagesRef: accumulatedAiMessages,
onCurrentMessageHistoryUpdate: (next) =>
(currentMessageHistory = next),
});
terminatedRetryCount += 1;
needsContinuationInstruction = true;
const retryDelayMs =
STREAM_RETRY_BASE_DELAY_MS * terminatedRetryCount;
sendTelemetryEvent("local_agent:terminated_stream_retry", {
chatId: req.chatId,
retryCount: terminatedRetryCount,
error: String(err),
phase: "response_finalization",
});
logger.warn(
`Transient stream termination while finalizing response for chat ${req.chatId}; retrying pass (${terminatedRetryCount}/${MAX_TERMINATED_STREAM_RETRIES}) after ${retryDelayMs}ms`,
);
await delay(retryDelayMs);
continue;
}
logger.warn("Failed to retrieve stream response messages:", err);
steps = [];
responseMessages = [];
}
break;
} finally {
cleanupAttemptToolStreamingEntries();
}
}
// Get response messages for this pass
let responseMessages: ModelMessage[] = [];
let steps: Awaited<typeof streamResult.steps> = [];
try {
const response = await streamResult.response;
steps = await streamResult.steps;
responseMessages = response.messages;
} catch (err) {
logger.warn("Failed to retrieve stream response messages:", err);
if (abortController.signal.aborted) {
break;
}
if (responseMessages.length > 0) {
......@@ -913,7 +1071,7 @@ export async function handleLocalAgentStream(
// We want the step just before compaction to determine how many
// response messages to skip (they belong to pre-compaction context).
const prevStepMessages =
steps[postMidTurnCompactionStartStep - 1]?.response.messages;
steps[postMidTurnCompactionStartStep - 1]?.response?.messages;
if (!prevStepMessages) {
logger.warn(
`No step data found at index ${postMidTurnCompactionStartStep - 1} for mid-turn compaction slicing; persisting all messages`,
......@@ -1057,6 +1215,257 @@ export async function handleLocalAgentStream(
}
}
function buildTerminatedRetryContinuationInstruction(): ModelMessage {
return {
role: "user",
content: [{ type: "text", text: STREAM_CONTINUE_MESSAGE }],
};
}
function unwrapStreamError(error: unknown): unknown {
if (isRecord(error) && "error" in error) {
return error.error;
}
return error;
}
function getErrorMessage(error: unknown): string {
if (error instanceof Error) {
return `${error.name}: ${error.message}`;
}
if (typeof error === "string") {
return error;
}
try {
return JSON.stringify(error);
} catch {
return String(error);
}
}
function isTerminatedStreamError(error: unknown): boolean {
const normalized = unwrapStreamError(error);
const message = getErrorMessage(normalized).toLowerCase();
if (message.includes("typeerror: terminated") || message === "terminated") {
return true;
}
const cause =
isRecord(normalized) && "cause" in normalized
? normalized.cause
: undefined;
if (cause) {
return isTerminatedStreamError(cause);
}
return false;
}
function shouldRetryTerminatedStreamError(params: {
error: unknown;
retryCount: number;
aborted: boolean;
}): boolean {
const { error, retryCount, aborted } = params;
return (
!aborted &&
retryCount < MAX_TERMINATED_STREAM_RETRIES &&
isTerminatedStreamError(error)
);
}
function maybeCaptureRetryReplayEvent(
retryReplayEvents: RetryReplayEvent[],
part: unknown,
): void {
if (!isRecord(part) || typeof part.type !== "string") {
return;
}
if (
part.type === "tool-call" &&
typeof part.toolCallId === "string" &&
typeof part.toolName === "string"
) {
// Keep one emitted tool-call event per toolCallId.
if (
retryReplayEvents.some(
(event) =>
event.type === "tool-call" && event.toolCallId === part.toolCallId,
)
) {
return;
}
retryReplayEvents.push({
type: "tool-call",
toolCallId: part.toolCallId,
toolName: part.toolName,
input: part.input,
});
return;
}
if (
part.type === "tool-result" &&
typeof part.toolCallId === "string" &&
typeof part.toolName === "string"
) {
// Keep one emitted tool-result event per toolCallId.
if (
retryReplayEvents.some(
(event) =>
event.type === "tool-result" && event.toolCallId === part.toolCallId,
)
) {
return;
}
retryReplayEvents.push({
type: "tool-result",
toolCallId: part.toolCallId,
toolName: part.toolName,
output: part.output,
});
}
}
function maybeAppendRetryReplayForRetry(params: {
retryReplayEvents: RetryReplayEvent[];
currentMessageHistoryRef: ModelMessage[];
accumulatedAiMessagesRef: ModelMessage[];
onCurrentMessageHistoryUpdate: (next: ModelMessage[]) => void;
}) {
const {
retryReplayEvents,
currentMessageHistoryRef,
accumulatedAiMessagesRef,
onCurrentMessageHistoryUpdate,
} = params;
const replayMessages: ModelMessage[] = [];
const pendingAssistantParts: Array<
| { type: "text"; text: string }
| {
type: "tool-call";
toolCallId: string;
toolName: string;
input: unknown;
}
> = [];
const toolCallsWithResult = new Set<string>();
const toolResultsWithCall = new Set<string>();
for (const event of retryReplayEvents) {
if (event.type === "tool-call") {
toolResultsWithCall.add(event.toolCallId);
continue;
}
if (event.type === "tool-result") {
toolCallsWithResult.add(event.toolCallId);
}
}
const completedToolExchangeIds = new Set(
[...toolCallsWithResult].filter((toolCallId) =>
toolResultsWithCall.has(toolCallId),
),
);
const flushPendingAssistantMessage = () => {
if (pendingAssistantParts.length === 0) {
return;
}
replayMessages.push({
role: "assistant",
content: [...pendingAssistantParts],
});
pendingAssistantParts.length = 0;
};
for (const event of retryReplayEvents) {
if (event.type === "assistant-text") {
if (!event.text.trim()) {
continue;
}
pendingAssistantParts.push({ type: "text", text: event.text });
continue;
}
if (event.type === "tool-call") {
if (!completedToolExchangeIds.has(event.toolCallId)) {
continue;
}
pendingAssistantParts.push({
type: "tool-call",
toolCallId: event.toolCallId,
toolName: event.toolName,
input: event.input,
});
continue;
}
if (!completedToolExchangeIds.has(event.toolCallId)) {
continue;
}
flushPendingAssistantMessage();
replayMessages.push({
role: "tool",
content: [
{
type: "tool-result",
toolCallId: event.toolCallId,
toolName: event.toolName,
output: toToolResultOutput(event.output),
},
],
});
}
flushPendingAssistantMessage();
if (replayMessages.length === 0) {
return;
}
onCurrentMessageHistoryUpdate([
...currentMessageHistoryRef,
...replayMessages,
]);
accumulatedAiMessagesRef.push(...replayMessages);
}
function maybeCaptureRetryReplayText(
retryReplayEvents: RetryReplayEvent[] | null,
text: string,
): void {
if (!retryReplayEvents || text.length === 0) {
return;
}
const lastEvent = retryReplayEvents[retryReplayEvents.length - 1];
if (lastEvent?.type === "assistant-text") {
lastEvent.text += text;
return;
}
retryReplayEvents.push({
type: "assistant-text",
text,
});
}
async function delay(ms: number): Promise<void> {
await new Promise<void>((resolve) => setTimeout(resolve, ms));
}
function toToolResultOutput(value: unknown): { type: "text"; value: string } {
if (typeof value === "string") {
return { type: "text", value };
}
try {
return { type: "text", value: JSON.stringify(value) };
} catch {
return { type: "text", value: String(value) };
}
}
async function updateResponseInDb(messageId: number, content: string) {
await db
.update(messages)
......
......@@ -53,11 +53,14 @@ export const createChatCompletionHandler =
// First, check if the LAST user message is a fixture trigger
let localAgentFixture = extractLocalAgentFixture(userTextContent);
// If last message isn't a fixture but contains a todo reminder, search earlier messages
// This handles the outer loop case where a reminder is injected after the original fixture trigger
// Note: This magic string must match the reminder text in prepare_step_utils.ts
// buildTodoReminderMessage(). Update both if the text changes.
if (!localAgentFixture && userTextContent.includes("incomplete todo(s)")) {
// If the last user message is synthetic (e.g., todo reminder or retry
// continuation instruction), search earlier user messages for the original
// fixture trigger.
if (
!localAgentFixture &&
(userTextContent.includes("incomplete todo(s)") ||
userTextContent.includes("previous response stream was interrupted"))
) {
for (const msg of userMessages) {
const textContent = getTextContent(msg);
const fixture = extractLocalAgentFixture(textContent);
......
......@@ -21,6 +21,10 @@ try {
// Cache loaded fixtures to avoid re-importing
const fixtureCache = new Map<string, LocalAgentFixture>();
// Track connection attempts per session+turn for connection drop simulation.
// Key: `${sessionId}-${passIndex}-${turnIndex}`, Value: attempt count
const connectionAttempts = new Map<string, number>();
/**
* Generate a session ID from the first user message
* This allows us to track conversation state across requests
......@@ -246,7 +250,11 @@ async function streamTextResponse(
/**
* Stream a turn with tool calls
*/
async function streamToolCallResponse(res: Response, turn: Turn) {
async function streamToolCallResponse(
res: Response,
turn: Turn,
options?: { dropAfterToolCalls?: boolean },
) {
res.setHeader("Content-Type", "text/event-stream");
res.setHeader("Cache-Control", "no-cache");
res.setHeader("Connection", "keep-alive");
......@@ -320,6 +328,16 @@ async function streamToolCallResponse(res: Response, turn: Turn) {
}
}
if (options?.dropAfterToolCalls) {
console.log(
`[local-agent] Simulating connection drop after streaming tool calls`,
);
// Drop before finish_reason/[DONE] so tool calls were emitted but the
// provider response did not complete.
res.socket?.destroy();
return;
}
// 4) Send finish (with optional usage data)
const finishReason =
turn.toolCalls && turn.toolCalls.length > 0 ? "tool_calls" : "stop";
......@@ -340,6 +358,7 @@ async function streamToolCallResponse(res: Response, turn: Turn) {
finishChunk.usage = turn.usage;
}
res.write(`data: ${JSON.stringify(finishChunk)}\n\n`);
res.write("data: [DONE]\n\n");
res.end();
}
......@@ -413,9 +432,64 @@ export async function handleLocalAgentFixture(
}
}
// Check if we should simulate a connection drop for this attempt
const turnScopedDropAttempts =
fixture.dropConnectionByTurn?.find((rule) => rule.turnIndex === turnIndex)
?.attempts ?? fixture.dropConnectionOnAttempts;
const turnScopedDropAfterToolCallAttempts =
fixture.dropConnectionAfterToolCallByTurn?.find(
(rule) => rule.turnIndex === turnIndex,
)?.attempts;
if (turnScopedDropAttempts && turnScopedDropAttempts.length > 0) {
const attemptKey = `${sessionId}-${passIndex}-${turnIndex}`;
const currentAttempt = (connectionAttempts.get(attemptKey) || 0) + 1;
connectionAttempts.set(attemptKey, currentAttempt);
console.log(
`[local-agent] Connection attempt ${currentAttempt} for ${attemptKey}, ` +
`drop on: [${turnScopedDropAttempts.join(", ")}]`,
);
if (turnScopedDropAttempts.includes(currentAttempt)) {
console.log(
`[local-agent] Simulating connection drop on attempt ${currentAttempt}`,
);
// Stream partial data then destroy the socket to simulate a network interruption
res.setHeader("Content-Type", "text/event-stream");
res.setHeader("Cache-Control", "no-cache");
res.setHeader("Connection", "keep-alive");
res.write(
createStreamChunk(
"Partial response before connection dr",
"assistant",
),
);
// Destroy the underlying socket to trigger a "terminated" error on the client
res.socket?.destroy();
return;
}
}
// If this turn has tool calls, stream them
if (turn.toolCalls && turn.toolCalls.length > 0) {
await streamToolCallResponse(res, turn);
const dropAfterToolCalls =
turnScopedDropAfterToolCallAttempts &&
turnScopedDropAfterToolCallAttempts.length > 0
? (() => {
const attemptKey = `${sessionId}-${passIndex}-${turnIndex}-after-tool-call`;
const currentAttempt =
(connectionAttempts.get(attemptKey) || 0) + 1;
connectionAttempts.set(attemptKey, currentAttempt);
return turnScopedDropAfterToolCallAttempts.includes(
currentAttempt,
);
})()
: false;
await streamToolCallResponse(res, turn, {
dropAfterToolCalls,
});
} else {
// Text-only turn
await streamTextResponse(res, turn.text || "Done.", turn.usage);
......
......@@ -47,4 +47,33 @@ export type LocalAgentFixture = {
* Use this when testing todo follow-up loop behavior.
*/
passes?: Pass[];
/**
* For testing connection resilience: drop the connection on these attempt
* numbers (1-indexed) for the first turn. The fake server will stream partial
* data then destroy the socket, simulating a network interruption.
* E.g., [1] means drop on the 1st attempt, succeed on the 2nd.
*/
dropConnectionOnAttempts?: number[];
/**
* Optional per-turn connection drop configuration.
* Useful for simulating drops after prior tool activity within the same turn.
* Example: [{ turnIndex: 1, attempts: [1] }] drops the first attempt of turn 1.
*/
dropConnectionByTurn?: Array<{
/** 0-based turn index within the active pass */
turnIndex: number;
/** Attempt numbers (1-indexed) to drop for this turn */
attempts: number[];
}>;
/**
* Optional per-turn configuration to drop the connection AFTER streaming
* tool-call chunks for a turn (before [DONE]). This simulates termination in
* the window where a tool call was emitted but no tool result was captured.
*/
dropConnectionAfterToolCallByTurn?: Array<{
/** 0-based turn index within the active pass */
turnIndex: number;
/** Attempt numbers (1-indexed) to drop for this turn */
attempts: number[];
}>;
};
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论