Unverified 提交 b7bcb988 authored 作者: wwwillchen-bot's avatar wwwillchen-bot 提交者: GitHub

fix: group parallel tool results in stream retry replay (#3072)

## Summary - Fixes #3070 (and likely related to #2879) - When retrying after a transient stream termination, parallel tool-call results were split into separate `tool` messages instead of being grouped. This violated the Anthropic API constraint that every `tool_use` in an assistant message must have its `tool_result` in the immediately following message, causing `400 invalid_request_error`. - Extracted replay logic from `local_agent_handler.ts` into `retry_replay_utils.ts` for testability, and fixed `buildRetryReplayMessages` to merge consecutive tool-result entries into a single tool message. ## Test plan - [x] Added 18 new tests in `retry_replay_utils.test.ts` covering: - Parallel tool results grouped into single message (the core fix) - Sequential + parallel mixed scenarios - Incomplete tool exchanges excluded - Event capture deduplication - Edge cases (empty events, whitespace text, null inputs) - [x] All 18 existing `local_agent_handler.test.ts` tests still pass (including stream retry tests) 🤖 Generated with [Claude Code](https://claude.com/claude-code) <!-- devin-review-badge-begin --> --- <a href="https://app.devin.ai/review/dyad-sh/dyad/pull/3072" target="_blank"> <picture> <source media="(prefers-color-scheme: dark)" srcset="https://static.devin.ai/assets/gh-open-in-devin-review-dark.svg?v=1"> <img src="https://static.devin.ai/assets/gh-open-in-devin-review-light.svg?v=1" alt="Open with Devin"> </picture> </a> <!-- devin-review-badge-end --> --------- Co-authored-by: 's avatarWill Chen <7344640+wwwillchen@users.noreply.github.com> Co-authored-by: 's avatarClaude Opus 4.6 (1M context) <noreply@anthropic.com>
上级 9a6a9bad
差异被折叠。
...@@ -82,6 +82,12 @@ import { ...@@ -82,6 +82,12 @@ import {
import { getPostCompactionMessages } from "@/ipc/handlers/compaction/compaction_utils"; import { getPostCompactionMessages } from "@/ipc/handlers/compaction/compaction_utils";
import { DEFAULT_MAX_TOOL_CALL_STEPS } from "@/constants/settings_constants"; import { DEFAULT_MAX_TOOL_CALL_STEPS } from "@/constants/settings_constants";
import { DyadError, DyadErrorKind } from "@/errors/dyad_error"; import { DyadError, DyadErrorKind } from "@/errors/dyad_error";
import {
type RetryReplayEvent,
maybeCaptureRetryReplayEvent,
maybeCaptureRetryReplayText,
maybeAppendRetryReplayForRetry,
} from "./retry_replay_utils";
const logger = log.scope("local_agent_handler"); const logger = log.scope("local_agent_handler");
const PLANNING_QUESTIONNAIRE_TOOL_NAME = "planning_questionnaire"; const PLANNING_QUESTIONNAIRE_TOOL_NAME = "planning_questionnaire";
...@@ -122,24 +128,6 @@ interface ToolStreamingEntry { ...@@ -122,24 +128,6 @@ interface ToolStreamingEntry {
} }
const toolStreamingEntries = new Map<string, ToolStreamingEntry>(); const toolStreamingEntries = new Map<string, ToolStreamingEntry>();
type RetryReplayEvent =
| {
type: "assistant-text";
text: string;
}
| {
type: "tool-call";
toolCallId: string;
toolName: string;
input: unknown;
}
| {
type: "tool-result";
toolCallId: string;
toolName: string;
output: unknown;
};
function getOrCreateStreamingEntry( function getOrCreateStreamingEntry(
id: string, id: string,
toolName?: string, toolName?: string,
...@@ -1401,201 +1389,10 @@ function shouldRetryTransientStreamError(params: { ...@@ -1401,201 +1389,10 @@ function shouldRetryTransientStreamError(params: {
); );
} }
function maybeCaptureRetryReplayEvent(
retryReplayEvents: RetryReplayEvent[],
part: unknown,
): void {
if (!isRecord(part) || typeof part.type !== "string") {
return;
}
if (
part.type === "tool-call" &&
typeof part.toolCallId === "string" &&
typeof part.toolName === "string"
) {
// Keep one emitted tool-call event per toolCallId.
if (
retryReplayEvents.some(
(event) =>
event.type === "tool-call" && event.toolCallId === part.toolCallId,
)
) {
return;
}
retryReplayEvents.push({
type: "tool-call",
toolCallId: part.toolCallId,
toolName: part.toolName,
input:
typeof part.input === "object" && part.input !== null ? part.input : {},
});
return;
}
if (
part.type === "tool-result" &&
typeof part.toolCallId === "string" &&
typeof part.toolName === "string"
) {
// Keep one emitted tool-result event per toolCallId.
if (
retryReplayEvents.some(
(event) =>
event.type === "tool-result" && event.toolCallId === part.toolCallId,
)
) {
return;
}
retryReplayEvents.push({
type: "tool-result",
toolCallId: part.toolCallId,
toolName: part.toolName,
output: part.output,
});
}
}
function maybeAppendRetryReplayForRetry(params: {
retryReplayEvents: RetryReplayEvent[];
currentMessageHistoryRef: ModelMessage[];
accumulatedAiMessagesRef: ModelMessage[];
onCurrentMessageHistoryUpdate: (next: ModelMessage[]) => void;
}) {
const {
retryReplayEvents,
currentMessageHistoryRef,
accumulatedAiMessagesRef,
onCurrentMessageHistoryUpdate,
} = params;
const replayMessages: ModelMessage[] = [];
const pendingAssistantParts: Array<
| { type: "text"; text: string }
| {
type: "tool-call";
toolCallId: string;
toolName: string;
input: unknown;
}
> = [];
const toolCallsWithResult = new Set<string>();
const toolResultsWithCall = new Set<string>();
for (const event of retryReplayEvents) {
if (event.type === "tool-call") {
toolResultsWithCall.add(event.toolCallId);
continue;
}
if (event.type === "tool-result") {
toolCallsWithResult.add(event.toolCallId);
}
}
const completedToolExchangeIds = new Set(
[...toolCallsWithResult].filter((toolCallId) =>
toolResultsWithCall.has(toolCallId),
),
);
const flushPendingAssistantMessage = () => {
if (pendingAssistantParts.length === 0) {
return;
}
replayMessages.push({
role: "assistant",
content: [...pendingAssistantParts],
});
pendingAssistantParts.length = 0;
};
for (const event of retryReplayEvents) {
if (event.type === "assistant-text") {
if (!event.text.trim()) {
continue;
}
pendingAssistantParts.push({ type: "text", text: event.text });
continue;
}
if (event.type === "tool-call") {
if (!completedToolExchangeIds.has(event.toolCallId)) {
continue;
}
pendingAssistantParts.push({
type: "tool-call",
toolCallId: event.toolCallId,
toolName: event.toolName,
input: event.input,
});
continue;
}
if (!completedToolExchangeIds.has(event.toolCallId)) {
continue;
}
flushPendingAssistantMessage();
replayMessages.push({
role: "tool",
content: [
{
type: "tool-result",
toolCallId: event.toolCallId,
toolName: event.toolName,
output: toToolResultOutput(event.output),
},
],
});
}
flushPendingAssistantMessage();
if (replayMessages.length === 0) {
return;
}
onCurrentMessageHistoryUpdate([
...currentMessageHistoryRef,
...replayMessages,
]);
accumulatedAiMessagesRef.push(...replayMessages);
}
function maybeCaptureRetryReplayText(
retryReplayEvents: RetryReplayEvent[] | null,
text: string,
): void {
if (!retryReplayEvents || text.length === 0) {
return;
}
const lastEvent = retryReplayEvents[retryReplayEvents.length - 1];
if (lastEvent?.type === "assistant-text") {
lastEvent.text += text;
return;
}
retryReplayEvents.push({
type: "assistant-text",
text,
});
}
async function delay(ms: number): Promise<void> { async function delay(ms: number): Promise<void> {
await new Promise<void>((resolve) => setTimeout(resolve, ms)); await new Promise<void>((resolve) => setTimeout(resolve, ms));
} }
function toToolResultOutput(value: unknown): { type: "text"; value: string } {
if (typeof value === "string") {
return { type: "text", value };
}
try {
return { type: "text", value: JSON.stringify(value) };
} catch {
return { type: "text", value: String(value) };
}
}
async function updateResponseInDb(messageId: number, content: string) { async function updateResponseInDb(messageId: number, content: string) {
await db await db
.update(messages) .update(messages)
......
/**
* Utilities for building replay messages when retrying after a transient
* stream termination. Extracted for testability.
*/
import type { ModelMessage } from "ai";
export type RetryReplayEvent =
| {
type: "assistant-text";
text: string;
}
| {
type: "tool-call";
toolCallId: string;
toolName: string;
input: unknown;
}
| {
type: "tool-result";
toolCallId: string;
toolName: string;
output: unknown;
};
export function toToolResultOutput(value: unknown): {
type: "text";
value: string;
} {
if (typeof value === "string") {
return { type: "text", value };
}
try {
return { type: "text", value: JSON.stringify(value) };
} catch {
return { type: "text", value: String(value) };
}
}
export function maybeCaptureRetryReplayEvent(
retryReplayEvents: RetryReplayEvent[],
part: unknown,
): void {
if (
!part ||
typeof part !== "object" ||
!("type" in part) ||
typeof (part as Record<string, unknown>).type !== "string"
) {
return;
}
const record = part as Record<string, unknown>;
if (
record.type === "tool-call" &&
typeof record.toolCallId === "string" &&
typeof record.toolName === "string"
) {
if (
retryReplayEvents.some(
(event) =>
event.type === "tool-call" && event.toolCallId === record.toolCallId,
)
) {
return;
}
retryReplayEvents.push({
type: "tool-call",
toolCallId: record.toolCallId,
toolName: record.toolName,
input:
typeof record.input === "object" && record.input !== null
? record.input
: {},
});
return;
}
if (
record.type === "tool-result" &&
typeof record.toolCallId === "string" &&
typeof record.toolName === "string"
) {
if (
retryReplayEvents.some(
(event) =>
event.type === "tool-result" &&
event.toolCallId === record.toolCallId,
)
) {
return;
}
retryReplayEvents.push({
type: "tool-result",
toolCallId: record.toolCallId,
toolName: record.toolName,
output: record.output,
});
}
}
export function maybeCaptureRetryReplayText(
retryReplayEvents: RetryReplayEvent[] | null,
text: string,
): void {
if (!retryReplayEvents || text.length === 0) {
return;
}
const lastEvent = retryReplayEvents[retryReplayEvents.length - 1];
if (lastEvent?.type === "assistant-text") {
lastEvent.text += text;
return;
}
retryReplayEvents.push({
type: "assistant-text",
text,
});
}
/**
* Builds replay messages from captured stream events for retry after a
* transient stream termination. Only includes completed tool exchanges
* (tool-call + tool-result pairs).
*/
export function buildRetryReplayMessages(
retryReplayEvents: RetryReplayEvent[],
): ModelMessage[] {
const replayMessages: ModelMessage[] = [];
const pendingAssistantParts: Array<
| { type: "text"; text: string }
| {
type: "tool-call";
toolCallId: string;
toolName: string;
input: unknown;
}
> = [];
const toolCallsWithResult = new Set<string>();
const toolResultsWithCall = new Set<string>();
for (const event of retryReplayEvents) {
if (event.type === "tool-call") {
toolResultsWithCall.add(event.toolCallId);
continue;
}
if (event.type === "tool-result") {
toolCallsWithResult.add(event.toolCallId);
}
}
const completedToolExchangeIds = new Set(
[...toolCallsWithResult].filter((toolCallId) =>
toolResultsWithCall.has(toolCallId),
),
);
const flushPendingAssistantMessage = () => {
if (pendingAssistantParts.length === 0) {
return;
}
replayMessages.push({
role: "assistant",
content: [...pendingAssistantParts],
});
pendingAssistantParts.length = 0;
};
for (const event of retryReplayEvents) {
if (event.type === "assistant-text") {
if (!event.text.trim()) {
continue;
}
pendingAssistantParts.push({ type: "text", text: event.text });
continue;
}
if (event.type === "tool-call") {
if (!completedToolExchangeIds.has(event.toolCallId)) {
continue;
}
pendingAssistantParts.push({
type: "tool-call",
toolCallId: event.toolCallId,
toolName: event.toolName,
input: event.input,
});
continue;
}
if (!completedToolExchangeIds.has(event.toolCallId)) {
continue;
}
flushPendingAssistantMessage();
// Merge consecutive tool-result messages so parallel tool results stay
// grouped with the preceding assistant message's tool-call blocks.
// The Anthropic API requires every tool_use in an assistant message to
// have its tool_result in the immediately following message.
const lastReplayMsg = replayMessages[replayMessages.length - 1];
if (
lastReplayMsg?.role === "tool" &&
Array.isArray(lastReplayMsg.content)
) {
lastReplayMsg.content.push({
type: "tool-result",
toolCallId: event.toolCallId,
toolName: event.toolName,
output: toToolResultOutput(event.output),
});
} else {
replayMessages.push({
role: "tool",
content: [
{
type: "tool-result",
toolCallId: event.toolCallId,
toolName: event.toolName,
output: toToolResultOutput(event.output),
},
],
});
}
}
flushPendingAssistantMessage();
return replayMessages;
}
export function maybeAppendRetryReplayForRetry(params: {
retryReplayEvents: RetryReplayEvent[];
currentMessageHistoryRef: ModelMessage[];
accumulatedAiMessagesRef: ModelMessage[];
onCurrentMessageHistoryUpdate: (next: ModelMessage[]) => void;
}) {
const {
retryReplayEvents,
currentMessageHistoryRef,
accumulatedAiMessagesRef,
onCurrentMessageHistoryUpdate,
} = params;
const replayMessages = buildRetryReplayMessages(retryReplayEvents);
if (replayMessages.length === 0) {
return;
}
onCurrentMessageHistoryUpdate([
...currentMessageHistoryRef,
...replayMessages,
]);
accumulatedAiMessagesRef.push(...replayMessages);
}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论