Unverified 提交 60e7444c authored 作者: Will Chen's avatar Will Chen 提交者: GitHub

fix: retry transient local agent server errors (#3044)

## Summary

- retry local-agent stream passes when providers emit transient server errors like Azure `server_error`
- keep the existing terminated-stream continuation path and apply it to retryable provider-side failures
- add a regression test covering the structured provider error event shape

## Test plan

- npm run fmt
- npm run lint:fix
- npm run ts
- npm test

🤖 Generated with [Claude Code](https://claude.com/claude-code)

<!-- devin-review-badge-begin -->

---

<a href="https://app.devin.ai/review/dyad-sh/dyad/pull/3044" target="_blank">
<picture>
<source media="(prefers-color-scheme: dark)" srcset="https://static.devin.ai/assets/gh-open-in-devin-review-dark.svg?v=1">
<img src="https://static.devin.ai/assets/gh-open-in-devin-review-light.svg?v=1" alt="Open with Devin">
</picture>
</a>
<!-- devin-review-badge-end -->
上级 32f281f6
......@@ -10,3 +10,7 @@ Agent tool definitions live in `src/pro/main/ipc/handlers/local_agent/tools/`. E
## Async I/O
- Use `fs.promises` (not sync `fs` methods) in any code running on the Electron main process (e.g., `todo_persistence.ts`) to avoid blocking the event loop.
## Stream retries
- When extending `handleLocalAgentStream` retry behavior, do not only match transport errors like `"terminated"`. Providers can emit structured stream errors such as `{ type: "error", error: { type: "server_error", ... } }`, and those transient 5xx / rate-limit failures need explicit retry classification too.
......@@ -1026,6 +1026,86 @@ describe("handleLocalAgentStream", () => {
expect(hasReplayedToolCall).toBe(true);
expect(hasReplayedToolResult).toBe(true);
});
// Regression test: the first stream attempt fails with a structured provider
// error object (not a transport-level "terminated" error); the handler is
// expected to call the stream implementation a second time, surface no error
// to the renderer, and include the continuation instruction in the retry.
it("should retry and resume when the provider emits a retryable server error", async () => {
// Arrange
const { event, getMessagesByChannel } = createFakeEvent();
mockSettings = buildTestSettings({ enableDyadPro: true });
mockChatData = buildTestChat();
// Records the `messages` option passed to each stream attempt, in call order.
const streamMessagesByAttempt: any[][] = [];
let attemptCount = 0;
mockStreamTextImpl = (options) => {
attemptCount += 1;
streamMessagesByAttempt.push(options.messages ?? []);
if (attemptCount === 1) {
// First attempt: the stream throws before yielding any part. The thrown
// value is a plain object shaped like a provider error event, with the
// retryable `server_error` details nested under `error`.
return {
fullStream: (async function* () {
throw {
type: "error",
sequence_number: 0,
error: {
type: "server_error",
code: "server_error",
message: "The server had an error processing your request.",
},
};
})(),
response: Promise.resolve({ messages: [] }),
steps: Promise.resolve([]),
};
}
// Any later attempt: a single text delta followed by a normal completion
// with one assistant message and no tool calls.
return {
fullStream: (async function* () {
yield { type: "text-delta", text: "Recovered after retry." };
})(),
response: Promise.resolve({
messages: [
{
role: "assistant",
content: [{ type: "text", text: "Recovered after retry." }],
},
],
}),
steps: Promise.resolve([{ toolCalls: [] }]),
};
};
// Act
await handleLocalAgentStream(
event,
{ chatId: 1, prompt: "test" },
new AbortController(),
{
placeholderMessageId: 10,
systemPrompt: "You are helpful",
dyadRequestId,
},
);
// Assert
// Exactly one retry happened and nothing was sent on the error channel.
expect(attemptCount).toBe(2);
expect(getMessagesByChannel("chat:response:error")).toHaveLength(0);
// The second attempt's messages must contain a user message whose content
// array has a text part carrying the stream-continuation instruction.
const continuationInstructionFound = (
streamMessagesByAttempt[1] ?? []
).some(
(message: any) =>
message.role === "user" &&
Array.isArray(message.content) &&
message.content.some(
(part: any) =>
part.type === "text" &&
typeof part.text === "string" &&
part.text.includes(
"previous response stream was interrupted by a transient network error",
),
),
);
expect(continuationInstructionFound).toBe(true);
});
});
describe("Stream processing - reasoning blocks", () => {
......
......@@ -80,6 +80,7 @@ import {
checkAndMarkForCompaction,
} from "@/ipc/handlers/compaction/compaction_handler";
import { getPostCompactionMessages } from "@/ipc/handlers/compaction/compaction_utils";
import { DEFAULT_MAX_TOOL_CALL_STEPS } from "@/constants/settings_constants";
const logger = log.scope("local_agent_handler");
const PLANNING_QUESTIONNAIRE_TOOL_NAME = "planning_questionnaire";
......@@ -87,7 +88,25 @@ const MAX_TERMINATED_STREAM_RETRIES = 3;
const STREAM_RETRY_BASE_DELAY_MS = 400;
const STREAM_CONTINUE_MESSAGE =
"[System] Your previous response stream was interrupted by a transient network error. Continue from exactly where you left off and do not repeat text that has already been sent.";
import { DEFAULT_MAX_TOOL_CALL_STEPS } from "@/constants/settings_constants";
// HTTP status codes that isRetryableProviderStreamError treats as transient
// when one is found on a stream error.
const RETRYABLE_STREAM_ERROR_STATUS_CODES = new Set<number>([
  408, // Request Timeout
  429, // Too Many Requests
  500, // Internal Server Error
  502, // Bad Gateway
  503, // Service Unavailable
  504, // Gateway Timeout
]);

// Lowercase substrings looked for in the lowercased error text
// (message / code / type) to recognise a transient failure when no retryable
// status code is present.
const RETRYABLE_STREAM_ERROR_PATTERNS: string[] = [
  // Provider- and HTTP-level failure wording
  "server_error",
  "internal server error",
  "service unavailable",
  "bad gateway",
  "gateway timeout",
  "too many requests",
  "rate_limit",
  "overloaded",
  // Lowercased forms of common Node.js system error codes
  "econnrefused",
  "enotfound",
  "econnreset",
  "epipe",
  "etimedout",
];
// ============================================================================
// Tool Streaming State Management
......@@ -994,7 +1013,7 @@ export async function handleLocalAgentStream(
streamErrorFromIteration ?? streamErrorFromCallback;
if (streamError) {
if (
shouldRetryTerminatedStreamError({
shouldRetryTransientStreamError({
error: streamError,
retryCount: terminatedRetryCount,
aborted: abortController.signal.aborted,
......@@ -1043,7 +1062,7 @@ export async function handleLocalAgentStream(
responseMessages = response.messages;
} catch (err) {
if (
shouldRetryTerminatedStreamError({
shouldRetryTransientStreamError({
error: err,
retryCount: terminatedRetryCount,
aborted: abortController.signal.aborted,
......@@ -1329,7 +1348,43 @@ function isTerminatedStreamError(error: unknown): boolean {
return false;
}
function shouldRetryTerminatedStreamError(params: {
/**
 * Reports whether a stream error looks like a transient provider-side failure.
 *
 * The error is first passed through `unwrapStreamError`; anything that is not
 * a record afterwards is never retryable. A record is retryable when either:
 *  - its status (first truthy numeric `statusCode`, else first truthy numeric
 *    `status`, else numeric `response.status`) is >= 500 or is listed in
 *    `RETRYABLE_STREAM_ERROR_STATUS_CODES`; or
 *  - its lowercased text contains one of `RETRYABLE_STREAM_ERROR_PATTERNS`.
 *    The text is the space-joined string `message`, `code` and `type` fields,
 *    falling back to `getErrorMessage` when none of those yields any text.
 *
 * @param error - The raw value thrown or reported by the stream.
 * @returns `true` when the error matches one of the retryable signals above.
 */
function isRetryableProviderStreamError(error: unknown): boolean {
  const candidate = unwrapStreamError(error);
  if (!isRecord(candidate)) {
    return false;
  }

  // Resolve the status in priority order. A zero/NaN `statusCode` or `status`
  // is skipped (falsy), whereas `response.status` is accepted as-is.
  let httpStatus: number | undefined;
  const directStatusCode = candidate.statusCode;
  const directStatus = candidate.status;
  if (typeof directStatusCode === "number" && directStatusCode) {
    httpStatus = directStatusCode;
  } else if (typeof directStatus === "number" && directStatus) {
    httpStatus = directStatus;
  } else {
    const response = candidate.response;
    const nestedStatus = isRecord(response) ? response.status : undefined;
    if (typeof nestedStatus === "number") {
      httpStatus = nestedStatus;
    }
  }

  if (httpStatus !== undefined) {
    if (
      httpStatus >= 500 ||
      RETRYABLE_STREAM_ERROR_STATUS_CODES.has(httpStatus)
    ) {
      return true;
    }
  }

  // Collect the non-empty string fields that may carry a provider error label.
  const fragments: string[] = [];
  for (const field of [candidate.message, candidate.code, candidate.type]) {
    if (typeof field === "string" && field) {
      fragments.push(field);
    }
  }

  const joined = fragments.join(" ").toLowerCase();
  const haystack = joined || getErrorMessage(candidate).toLowerCase();

  for (const pattern of RETRYABLE_STREAM_ERROR_PATTERNS) {
    if (haystack.includes(pattern)) {
      return true;
    }
  }
  return false;
}
function shouldRetryTransientStreamError(params: {
error: unknown;
retryCount: number;
aborted: boolean;
......@@ -1338,7 +1393,7 @@ function shouldRetryTerminatedStreamError(params: {
return (
!aborted &&
retryCount < MAX_TERMINATED_STREAM_RETRIES &&
isTerminatedStreamError(error)
(isTerminatedStreamError(error) || isRetryableProviderStreamError(error))
);
}
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论