Unverified commit 54444ddf, authored by wwwillchen-bot, committed by GitHub

feat: ensure local agent completes todos before ending turn (#2601)

## Summary

- When a local agent ends its turn with incomplete todos (pending or in_progress), a reminder message is now injected telling it to continue and complete the remaining tasks
- This only happens once per turn to avoid infinite loops
- Added `hasIncompleteTodos()` and `buildTodoReminderMessage()` helpers to prepare_step_utils.ts
- Added `TodoReminderState` to track whether a reminder has already been sent this turn

Fixes #2600

## Test plan

- Unit tests added for:
  - `hasIncompleteTodos()` - correctly detects pending/in_progress todos
  - `buildTodoReminderMessage()` - builds proper reminder message listing incomplete todos
  - `prepareStepMessages()` with todoContext:
    - Injects reminder when agent finishes with incomplete todos
    - Does not inject reminder when already reminded this turn
    - Does not inject reminder when all todos are completed
    - Does not inject reminder when agent has pending tool calls
    - Combines reminder with existing injected messages

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---

## Summary by cubic

Ensures the local agent completes remaining todos before ending a turn by running a one-time outer-loop follow-up pass that adds a reminder. The reminder is not persisted. Meets #2600.

- **New Features**
  - Outer-loop detection via shouldRunTodoFollowUpPass: runs one follow-up pass when the final step has no tool calls and incomplete todos remain; skips in read-only and plan modes.
  - Helpers hasIncompleteTodos() and buildTodoReminderMessage(); multi-pass E2E fixture and test; fake LLM server scans all user messages and counts todo reminders to drive passes.
- **Refactors**
  - Removed inner-loop reminder injection from prepareStepMessages; tests cleaned up.
  - Restructured local_agent_handler into a controlled pass loop with createdAt guards, baseMessageHistoryCount and compaction state reset each pass, AI messages persisted across passes, and synthetic todo reminders excluded from aiMessagesJson; updated compaction test to include toolCalls in mock steps.

Written for commit 70b9c5a6595c5b024d25665e785d93dd77a3076f. Summary will update on new commits.

---------

Co-authored-by: Will Chen <willchen90@gmail.com>
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
Parent 30ed10aa
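The two helpers named in the commit message can be sketched as follows. This is a minimal illustration inferred from the unit tests in this commit, not the code in prepare_step_utils.ts. The `Todo` shape and the exact reminder wording are assumptions; only the `N incomplete todo(s)` count and the `[status] content` lines come from the tests.

```typescript
// Assumed shape of a todo item, inferred from the tests in this commit.
type TodoStatus = "pending" | "in_progress" | "completed";

interface Todo {
  id: string;
  content: string;
  status: TodoStatus;
}

// True when at least one todo is still pending or in progress.
function hasIncompleteTodos(todos: Todo[]): boolean {
  return todos.some((t) => t.status !== "completed");
}

// Builds a reminder that lists only the incomplete todos.
// The sentence wording is illustrative, not the real message text.
function buildTodoReminderMessage(todos: Todo[]): string {
  const incomplete = todos.filter((t) => t.status !== "completed");
  const lines = incomplete.map((t) => `- [${t.status}] ${t.content}`);
  return [
    `You have ${incomplete.length} incomplete todo(s). Please continue and complete them:`,
    ...lines,
  ].join("\n");
}
```

Completed todos are filtered out before counting, so the reminder never mentions work that is already done.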
import type { LocalAgentFixture } from "../../../../testing/fake-llm-server/localAgentTypes";
/**
* Fixture that tests the outer loop todo follow-up behavior:
*
* Pass 1: Agent creates 3 todos, completes only 1 of them, then emits chat text.
* The outer loop detects incomplete todos and sends a reminder.
*
* Pass 2: After receiving the todo reminder, agent completes the remaining 2 todos.
*
* This tests that the outer loop correctly:
* 1. Detects incomplete todos after a pass
* 2. Injects a reminder message
* 3. Runs another pass to allow the agent to complete remaining work
*/
export const fixture: LocalAgentFixture = {
description: "Test outer loop todo follow-up when todos are partially complete",
passes: [
{
// First pass: Create todos and partially complete them
turns: [
{
text: "I'll create a todo list to track these tasks.",
toolCalls: [
{
name: "update_todos",
args: {
merge: false,
todos: [
{
id: "todo-1",
content: "Create utility function",
status: "in_progress",
},
{
id: "todo-2",
content: "Write unit tests",
status: "pending",
},
{
id: "todo-3",
content: "Update documentation",
status: "pending",
},
],
},
},
],
},
{
text: "Let me create the utility function first.",
toolCalls: [
{
name: "write_file",
args: {
path: "src/utils/helper.ts",
content:
"export function helper(x: number): number {\n return x * 2;\n}\n",
description: "Create helper utility function",
},
},
],
},
{
text: "Now marking the first task as done.",
toolCalls: [
{
name: "update_todos",
args: {
merge: true,
todos: [
{
id: "todo-1",
status: "completed",
},
],
},
},
],
},
{
// This text-only response triggers the outer loop check.
// Since there are still incomplete todos, it will inject a reminder.
text: "I've completed the utility function. Let me continue with the remaining tasks.",
},
],
},
{
// Second pass: After receiving todo reminder, complete remaining tasks
turns: [
{
text: "I see there are still incomplete todos. Let me write the unit tests.",
toolCalls: [
{
name: "write_file",
args: {
path: "src/utils/helper.test.ts",
content:
'import { helper } from "./helper";\n\ntest("helper doubles input", () => {\n expect(helper(5)).toBe(10);\n});\n',
description: "Create unit tests for helper",
},
},
],
},
{
text: "Marking tests as done.",
toolCalls: [
{
name: "update_todos",
args: {
merge: true,
todos: [
{
id: "todo-2",
status: "completed",
},
],
},
},
],
},
{
text: "Now updating the documentation.",
toolCalls: [
{
name: "write_file",
args: {
path: "src/utils/README.md",
content:
"# Utils\n\n## helper(x)\n\nDoubles the input number.\n",
description: "Update documentation",
},
},
],
},
{
text: "Marking documentation as done.",
toolCalls: [
{
name: "update_todos",
args: {
merge: true,
todos: [
{
id: "todo-3",
status: "completed",
},
],
},
},
],
},
{
// All todos complete - no more follow-up passes
text: "All tasks are now complete! I've created the utility function, written unit tests, and updated the documentation.",
},
],
},
],
};
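The commit message notes that the fake LLM server counts todo reminders among the user messages to decide which pass of a fixture to replay. A minimal sketch of that idea is shown below. The message shape, the `TODO_REMINDER_MARKER` substring, and the `selectPassIndex` name are all hypothetical and are not taken from the actual server code.

```typescript
// Simplified, hypothetical message shape; the real server parses provider-format requests.
interface ChatMessage {
  role: "system" | "user" | "assistant" | "tool";
  content: string;
}

// Assumed marker text; the real reminder wording may differ.
const TODO_REMINDER_MARKER = "incomplete todo(s)";

// Each reminder in the history means one pass has already finished,
// so the number of reminders is the index of the pass to replay next.
function selectPassIndex(messages: ChatMessage[], passCount: number): number {
  const reminders = messages.filter(
    (m) => m.role === "user" && m.content.includes(TODO_REMINDER_MARKER),
  ).length;
  // Clamp so an unexpected extra reminder replays the last pass instead of failing.
  return Math.max(0, Math.min(reminders, passCount - 1));
}
```

With the two-pass fixture above, a history without reminders would select pass 0, and a history containing one reminder would select pass 1.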
import { testSkipIfWindows } from "./helpers/test_helper";
/**
* E2E test for the outer loop todo follow-up behavior.
*
* This tests that when an agent creates a todo list but only partially
* completes it in the first pass, the outer loop will:
* 1. Detect incomplete todos
* 2. Inject a reminder message
* 3. Run another pass to complete the remaining todos
*
* Related to issue #2600 (PR #2601)
*/
testSkipIfWindows("local-agent - todo follow-up loop", async ({ po }) => {
await po.setUpDyadPro({ localAgent: true });
await po.importApp("minimal");
await po.chatActions.selectLocalAgentMode();
// Send prompt that triggers the todo follow-up loop fixture
await po.sendPrompt("tc=local-agent/todo-followup-loop");
// Snapshot the final messages to verify:
// 1. All todos were created and completed across two passes
// 2. The todo reminder was injected between passes
// 3. Files were created in both passes
await po.snapshotMessages();
// Verify files were created in both passes
await po.snapshotAppFiles({
name: "after-todo-followup",
files: [
"src/utils/helper.ts", // Created in pass 1
"src/utils/helper.test.ts", // Created in pass 2
"src/utils/README.md", // Created in pass 2
],
});
});
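The detect, remind, and re-run sequence that this test exercises can be sketched as a small loop. The field names of `shouldRunTodoFollowUpPass` mirror its call site in the diff, but its body is an assumption, since the diff does not show it. `runPass` and `runWithTodoFollowUp` are hypothetical stand-ins for the streaming pass and the handler loop.

```typescript
type TodoStatus = "pending" | "in_progress" | "completed";

interface Todo {
  id: string;
  content: string;
  status: TodoStatus;
}

// Field names mirror the call site in the diff; the logic below is assumed.
interface FollowUpCheck {
  readOnly: boolean;
  planModeOnly: boolean;
  passEndedWithText: boolean;
  todos: Todo[];
  todoFollowUpLoops: number;
  maxTodoFollowUpLoops: number;
}

function shouldRunTodoFollowUpPass(check: FollowUpCheck): boolean {
  if (check.readOnly || check.planModeOnly) return false;
  if (!check.passEndedWithText) return false;
  if (check.todoFollowUpLoops >= check.maxTodoFollowUpLoops) return false;
  return check.todos.some((t) => t.status !== "completed");
}

// Hypothetical stand-in for one generation pass. The real pass streams
// asynchronously; it is synchronous here to keep the sketch short.
type RunPass = (history: string[], todos: Todo[]) => { endedWithText: boolean };

// Runs passes until no follow-up is needed and returns how many passes ran.
function runWithTodoFollowUp(
  runPass: RunPass,
  todos: Todo[],
  maxTodoFollowUpLoops = 1,
): number {
  let history: string[] = [];
  let todoFollowUpLoops = 0;
  let passes = 0;
  for (;;) {
    const { endedWithText } = runPass(history, todos);
    passes += 1;
    const followUp = shouldRunTodoFollowUpPass({
      readOnly: false,
      planModeOnly: false,
      passEndedWithText: endedWithText,
      todos,
      todoFollowUpLoops,
      maxTodoFollowUpLoops,
    });
    if (!followUp) break;
    todoFollowUpLoops += 1;
    // The reminder is appended to the in-memory history only, never persisted.
    history = [...history, "reminder: incomplete todos remain"];
  }
  return passes;
}
```

Because the follow-up counter is capped, a model that never finishes its todos would still stop after one extra pass instead of looping indefinitely.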
=== src/utils/helper.test.ts ===
import { helper } from "./helper";
test("helper doubles input", () => {
expect(helper(5)).toBe(10);
});
=== src/utils/helper.ts ===
export function helper(x: number): number {
return x * 2;
}
=== src/utils/README.md ===
# Utils
## helper(x)
Doubles the input number.
- paragraph: /Generate an AI_RULES\.md file for this app\. Describe the tech stack in 5-\d+ bullet points and describe clear rules about what libraries to use for what\./
- button "file1.txt file1.txt Edit":
- img
- text: ""
- button "Edit":
- img
- text: ""
- img
- paragraph: More EOM
- button "Copy":
- img
- img
- text: Approved
- img
- text: claude-opus-4-5
- img
- text: less than a minute ago
- button "Copy Request ID":
- img
- text: ""
- paragraph: tc=local-agent/todo-followup-loop
- paragraph: I'll create a todo list to track these tasks.Let me create the utility function first.
- 'button "helper.ts src/utils/helper.ts Edit Summary: Create helper utility function"':
- img
- text: ""
- button "Edit":
- img
- text: ""
- img
- text: ""
- paragraph: Now marking the first task as done.I've completed the utility function. Let me continue with the remaining tasks.I see there are still incomplete todos. Let me write the unit tests.
- 'button "helper.test.ts src/utils/helper.test.ts Edit Summary: Create unit tests for helper"':
- img
- text: ""
- button "Edit":
- img
- text: ""
- img
- text: ""
- paragraph: Marking tests as done.Now updating the documentation.
- 'button "README.md src/utils/README.md Edit Summary: Update documentation"':
- img
- text: ""
- button "Edit":
- img
- text: ""
- img
- text: ""
- paragraph: Marking documentation as done.All tasks are now complete! I've created the utility function, written unit tests, and updated the documentation.
- button "Copy":
- img
- img
- text: claude-opus-4-5
- img
- text: less than a minute ago
- button "Copy Request ID":
- img
- text: ""
- button "Undo":
- img
- text: ""
- button "Retry":
- img
- text: ""
\ No newline at end of file
@@ -83,6 +83,7 @@ The stashed changes will be automatically merged back after the rebase completes
 - When rebasing documentation/table conflicts (e.g., workflow README tables), prefer keeping **both** additions from HEAD and upstream - merge new rows/content from both branches rather than choosing one side
 - **Complementary additions**: When both sides added new sections at the end of a file (e.g., both added different documentation tips), keep both sections rather than choosing one — they're not truly conflicting, just different additions
 - **React component wrapper conflicts**: When rebasing UI changes that conflict on wrapper div classes (e.g., `flex items-start space-x-2` vs `flex items-end gap-1`), keep the newer styling from the incoming commit but preserve any functional components (like dialogs or modals) that exist in HEAD but not in the incoming change
+- **Refactoring conflicts**: When incoming commits refactor code (e.g., extracting inline logic into helper functions), and HEAD has new features in the same area, integrate HEAD's features into the new structure. Example: if incoming code moves streaming logic to `runSingleStreamPass()` and HEAD adds mid-turn compaction to the inline code, add compaction support to the new function rather than keeping the old inline version
 ## Rebasing with uncommitted changes
......
@@ -760,6 +760,7 @@ describe("handleLocalAgentStream", () => {
 response: {
 messages: [...preCompactionGenerated],
 },
+toolCalls: [{}], // First step has tool calls
 },
 {
 response: {
@@ -768,6 +769,7 @@ describe("handleLocalAgentStream", () => {
 ...postCompactionGenerated,
 ],
 },
+toolCalls: [], // Last step has no tool calls (ended with text)
 },
 ]),
 };
......
@@ -4,9 +4,14 @@ import {
 processPendingMessages,
 injectMessagesAtPositions,
 prepareStepMessages,
+hasIncompleteTodos,
+buildTodoReminderMessage,
 type InjectedMessage,
 } from "@/pro/main/ipc/handlers/local_agent/prepare_step_utils";
-import type { UserMessageContentPart } from "@/pro/main/ipc/handlers/local_agent/tools/types";
+import type {
+UserMessageContentPart,
+Todo,
+} from "@/pro/main/ipc/handlers/local_agent/tools/types";
 import { ImagePart, ModelMessage } from "ai";
 describe("prepare_step_utils", () => {
@@ -798,4 +803,99 @@ describe("prepare_step_utils", () => {
 ).toBe("encrypted-data");
 });
 });
+describe("hasIncompleteTodos", () => {
+it("returns true when there are pending todos", () => {
+const todos: Todo[] = [
+{ id: "1", content: "Task 1", status: "pending" },
+{ id: "2", content: "Task 2", status: "completed" },
+];
+expect(hasIncompleteTodos(todos)).toBe(true);
+});
+it("returns true when there are in_progress todos", () => {
+const todos: Todo[] = [
+{ id: "1", content: "Task 1", status: "in_progress" },
+{ id: "2", content: "Task 2", status: "completed" },
+];
+expect(hasIncompleteTodos(todos)).toBe(true);
+});
+it("returns false when all todos are completed", () => {
+const todos: Todo[] = [
+{ id: "1", content: "Task 1", status: "completed" },
+{ id: "2", content: "Task 2", status: "completed" },
+];
+expect(hasIncompleteTodos(todos)).toBe(false);
+});
+it("returns false when there are no todos", () => {
+const todos: Todo[] = [];
+expect(hasIncompleteTodos(todos)).toBe(false);
+});
+});
+describe("buildTodoReminderMessage", () => {
+it("builds a message listing incomplete todos", () => {
+const todos: Todo[] = [
+{ id: "1", content: "Implement feature A", status: "in_progress" },
+{ id: "2", content: "Write tests", status: "pending" },
+{ id: "3", content: "Setup project", status: "completed" },
+];
+const message = buildTodoReminderMessage(todos);
+expect(message).toContain("2 incomplete todo(s)");
+expect(message).toContain("[in_progress] Implement feature A");
+expect(message).toContain("[pending] Write tests");
+expect(message).not.toContain("Setup project");
+});
+it("handles a single incomplete todo", () => {
+const todos: Todo[] = [
+{ id: "1", content: "Last task", status: "pending" },
+];
+const message = buildTodoReminderMessage(todos);
+expect(message).toContain("1 incomplete todo(s)");
+expect(message).toContain("[pending] Last task");
+});
+});
+describe("prepareStepMessages with injected messages", () => {
+it("works with existing injected messages", () => {
+const pendingUserMessages: UserMessageContentPart[][] = [];
+const allInjectedMessages: InjectedMessage[] = [
+{
+insertAtIndex: 1,
+sequence: 0,
+message: {
+role: "user",
+content: [{ type: "text", text: "Screenshot from crawl" }],
+},
+},
+];
+const messages: ModelMessage[] = [
+{ role: "user", content: "Build an app" },
+{ role: "assistant", content: "I analyzed the screenshot." },
+];
+const result = prepareStepMessages(
+{ messages },
+pendingUserMessages,
+allInjectedMessages,
+);
+expect(result).toBeDefined();
+// Should have: user message, injected screenshot, assistant message
+expect(result!.messages).toHaveLength(3);
+expect(result!.messages[0].role).toBe("user");
+expect((result!.messages[1].content as { text: string }[])[0].text).toBe(
+"Screenshot from crawl",
+);
+expect(result!.messages[2].role).toBe("assistant");
+});
+});
 });
@@ -53,6 +53,8 @@ import {
 import { sendTelemetryEvent } from "@/ipc/utils/telemetry";
 import {
 prepareStepMessages,
+buildTodoReminderMessage,
+hasIncompleteTodos,
 type InjectedMessage,
 } from "./prepare_step_utils";
 import { TOOL_DEFINITIONS } from "./tool_definitions";
...@@ -505,312 +507,406 @@ export async function handleLocalAgentStream( ...@@ -505,312 +507,406 @@ export async function handleLocalAgentStream(
let compactedMidTurn = false; let compactedMidTurn = false;
let compactionFailedMidTurn = false; let compactionFailedMidTurn = false;
// Stream the response const maxOutputTokens = await getMaxTokens(settings.selectedModel);
const streamResult = streamText({ const temperature = await getTemperature(settings.selectedModel);
model: modelClient.model,
headers: getAiHeaders({ // Run one or more generation passes. If the model emits a chat message while
builtinProviderId: modelClient.builtinProviderId, // there are still incomplete todos, we append a reminder and do another pass.
}), const maxTodoFollowUpLoops = 1;
providerOptions: getProviderOptions({ let todoFollowUpLoops = 0;
dyadAppId: chat.app.id, let currentMessageHistory = messageHistory;
dyadRequestId, const accumulatedAiMessages: ModelMessage[] = [];
dyadDisableFiles: true, // Local agent uses tools, not file injection
files: [], while (!abortController.signal.aborted) {
mentionedAppsCodebases: [], // Reset mid-turn compaction state at the start of each pass.
builtinProviderId: modelClient.builtinProviderId, // These flags track compaction within a single pass and must not persist
settings, // across passes (e.g., todo follow-up passes).
}), compactedMidTurn = false;
maxOutputTokens: await getMaxTokens(settings.selectedModel), compactionFailedMidTurn = false;
temperature: await getTemperature(settings.selectedModel), compactBeforeNextStep = false;
maxRetries: 2, postMidTurnCompactionStartStep = null;
system: systemPrompt, baseMessageHistoryCount = currentMessageHistory.length;
messages: messageHistory,
tools: allTools, // Stream the response
stopWhen: [ const streamResult = streamText({
stepCountIs(25), model: modelClient.model,
hasToolCall(addIntegrationTool.name), headers: getAiHeaders({
// In plan mode, stop immediately after presenting a questionnaire, builtinProviderId: modelClient.builtinProviderId,
// writing a plan, or exiting plan mode so the agent yields control }),
// back to the user. Without this, some models (e.g. Gemini Pro 3) providerOptions: getProviderOptions({
// ignore the prompt-level "STOP" instruction and keep calling tools dyadAppId: chat.app.id,
// in a loop. dyadRequestId,
...(planModeOnly dyadDisableFiles: true, // Local agent uses tools, not file injection
? [ files: [],
hasToolCall(planningQuestionnaireTool.name), mentionedAppsCodebases: [],
hasToolCall(writePlanTool.name), builtinProviderId: modelClient.builtinProviderId,
hasToolCall(exitPlanTool.name), settings,
] }),
: []), maxOutputTokens,
], temperature,
abortSignal: abortController.signal, maxRetries: 2,
// Inject pending user messages (e.g., images from web_crawl) between steps system: systemPrompt,
// We must re-inject all accumulated messages each step because the AI SDK messages: currentMessageHistory,
// doesn't persist dynamically injected messages in its internal state. tools: allTools,
// We track the insertion index so messages appear at the same position each step. stopWhen: [
prepareStep: async (options) => { stepCountIs(25),
let stepOptions = options; hasToolCall(addIntegrationTool.name),
// In plan mode, stop immediately after presenting a questionnaire,
if ( // writing a plan, or exiting plan mode so the agent yields control
!messageOverride && // back to the user. Without this, some models (e.g. Gemini Pro 3)
compactBeforeNextStep && // ignore the prompt-level "STOP" instruction and keep calling tools
!compactedMidTurn && // in a loop.
settings.enableContextCompaction !== false ...(planModeOnly
) { ? [
compactBeforeNextStep = false; hasToolCall(planningQuestionnaireTool.name),
const inFlightTailMessages = options.messages.slice( hasToolCall(writePlanTool.name),
baseMessageHistoryCount, hasToolCall(exitPlanTool.name),
); ]
const compacted = await maybePerformPendingCompaction({ : []),
showOnTopOfCurrentResponse: true, ],
force: true, abortSignal: abortController.signal,
}); // Inject pending user messages (e.g., images from web_crawl) between steps
// We must re-inject all accumulated messages each step because the AI SDK
// doesn't persist dynamically injected messages in its internal state.
// We track the insertion index so messages appear at the same position each step.
prepareStep: async (options) => {
let stepOptions = options;
if (compacted) { if (
compactedMidTurn = true; !messageOverride &&
// Preserve only messages generated after this compaction boundary. compactBeforeNextStep &&
postMidTurnCompactionStartStep = options.stepNumber; !compactedMidTurn &&
// Clear stale injected messages — their insertAtIndex values are settings.enableContextCompaction !== false
// based on the pre-compaction message array which has been rebuilt ) {
// with a different (typically smaller) count. Keeping them would compactBeforeNextStep = false;
// cause injectMessagesAtPositions to splice at wrong positions. const inFlightTailMessages = options.messages.slice(
allInjectedMessages.length = 0; baseMessageHistoryCount,
const compactedMessageHistory = buildChatMessageHistory(
chat.messages,
{
// Keep the structured in-flight assistant/tool messages from
// the current stream instead of the placeholder DB content.
excludeMessageIds: new Set([placeholderMessageId]),
},
); );
baseMessageHistoryCount = compactedMessageHistory.length; const compacted = await maybePerformPendingCompaction({
stepOptions = { showOnTopOfCurrentResponse: true,
...options, force: true,
// Preserve in-flight turn messages so same-turn tool loops can });
// continue, while later turns are compacted via persisted history.
messages: [...compactedMessageHistory, ...inFlightTailMessages], if (compacted) {
}; compactedMidTurn = true;
} else { // Preserve only messages generated after this compaction boundary.
// Prevent repeated compaction attempts if the first one fails. postMidTurnCompactionStartStep = options.stepNumber;
compactionFailedMidTurn = true; // Clear stale injected messages — their insertAtIndex values are
// based on the pre-compaction message array which has been rebuilt
// with a different (typically smaller) count. Keeping them would
// cause injectMessagesAtPositions to splice at wrong positions.
allInjectedMessages.length = 0;
const compactedMessageHistory = buildChatMessageHistory(
chat.messages,
{
// Keep the structured in-flight assistant/tool messages from
// the current stream instead of the placeholder DB content.
excludeMessageIds: new Set([placeholderMessageId]),
},
);
baseMessageHistoryCount = compactedMessageHistory.length;
stepOptions = {
...options,
// Preserve in-flight turn messages so same-turn tool loops can
// continue, while later turns are compacted via persisted history.
messages: [...compactedMessageHistory, ...inFlightTailMessages],
};
} else {
// Prevent repeated compaction attempts if the first one fails.
compactionFailedMidTurn = true;
}
} }
}
const preparedStep = prepareStepMessages(
stepOptions,
pendingUserMessages,
allInjectedMessages,
);
// prepareStepMessages returns undefined when it has no additional
// injections/cleanups to apply. If we already replaced the base
// message history (e.g., after mid-turn compaction), we still need
// to return the updated options.
if (preparedStep) {
return preparedStep;
}
return stepOptions === options ? undefined : stepOptions;
},
onStepFinish: async (step) => {
if (
settings.enableContextCompaction === false ||
compactedMidTurn ||
typeof step.usage.totalTokens !== "number"
) {
return;
}
const shouldCompact = await checkAndMarkForCompaction(
req.chatId,
step.usage.totalTokens,
);
// If this step triggered tool calls, compact before the next step
// in this same user turn instead of waiting for the next message.
// Only attempt mid-turn compaction once per turn.
if (
shouldCompact &&
step.toolCalls.length > 0 &&
!compactionFailedMidTurn
) {
compactBeforeNextStep = true;
}
},
onFinish: async (response) => {
const totalTokens = response.usage?.totalTokens;
const inputTokens = response.usage?.inputTokens;
const cachedInputTokens = response.usage?.cachedInputTokens;
logger.log(
"Total tokens used:",
totalTokens,
"Input tokens:",
inputTokens,
"Cached input tokens:",
cachedInputTokens,
"Cache hit ratio:",
cachedInputTokens ? (cachedInputTokens ?? 0) / (inputTokens ?? 0) : 0,
);
if (typeof totalTokens === "number") {
await db
.update(messages)
.set({ maxTokensUsed: totalTokens })
.where(eq(messages.id, placeholderMessageId))
.catch((err) => logger.error("Failed to save token count", err));
}
},
onError: (error: any) => {
const errorMessage = error?.error?.message || JSON.stringify(error);
logger.error("Local agent stream error:", errorMessage);
safeSend(event.sender, "chat:response:error", {
chatId: req.chatId,
error: `AI error: ${errorMessage}`,
});
},
});
// Process the stream const preparedStep = prepareStepMessages(
let inThinkingBlock = false; stepOptions,
pendingUserMessages,
for await (const part of streamResult.fullStream) { allInjectedMessages,
if (abortController.signal.aborted) { );
logger.log(`Stream aborted for chat ${req.chatId}`);
// Clean up pending consent requests to prevent stale UI banners
clearPendingConsentsForChat(req.chatId);
break;
}
let chunk = ""; // prepareStepMessages returns undefined when it has no additional
// injections/cleanups to apply. If we already replaced the base
// message history (e.g., after mid-turn compaction), we still need
// to return the updated options.
if (preparedStep) {
return preparedStep;
}
// Handle thinking block transitions return stepOptions === options ? undefined : stepOptions;
if ( },
inThinkingBlock && onStepFinish: async (step) => {
!["reasoning-delta", "reasoning-end", "reasoning-start"].includes( if (
part.type, settings.enableContextCompaction === false ||
) compactedMidTurn ||
) { typeof step.usage.totalTokens !== "number"
chunk = "</think>\n"; ) {
inThinkingBlock = false; return;
} }
switch (part.type) { const shouldCompact = await checkAndMarkForCompaction(
case "text-delta": req.chatId,
chunk += part.text; step.usage.totalTokens,
break; );
case "reasoning-start": // If this step triggered tool calls, compact before the next step
if (!inThinkingBlock) { // in this same user turn instead of waiting for the next message.
chunk = "<think>"; // Only attempt mid-turn compaction once per turn.
inThinkingBlock = true; if (
shouldCompact &&
step.toolCalls.length > 0 &&
!compactionFailedMidTurn
) {
compactBeforeNextStep = true;
} }
break; },
onFinish: async (response) => {
case "reasoning-delta": const totalTokens = response.usage?.totalTokens;
if (!inThinkingBlock) { const inputTokens = response.usage?.inputTokens;
chunk = "<think>"; const cachedInputTokens = response.usage?.cachedInputTokens;
inThinkingBlock = true; logger.log(
"Total tokens used:",
totalTokens,
"Input tokens:",
inputTokens,
"Cached input tokens:",
cachedInputTokens,
"Cache hit ratio:",
cachedInputTokens
? (cachedInputTokens ?? 0) / (inputTokens ?? 0)
: 0,
);
if (typeof totalTokens === "number") {
await db
.update(messages)
.set({ maxTokensUsed: totalTokens })
.where(eq(messages.id, placeholderMessageId))
.catch((err) => logger.error("Failed to save token count", err));
} }
chunk += part.text; },
break; onError: (error: any) => {
const errorMessage = error?.error?.message || JSON.stringify(error);
logger.error("Local agent stream error:", errorMessage);
safeSend(event.sender, "chat:response:error", {
chatId: req.chatId,
error: `AI error: ${errorMessage}`,
});
},
});
// Process the stream
let inThinkingBlock = false;
let passProducedChatText = false;
try {
for await (const part of streamResult.fullStream) {
if (abortController.signal.aborted) {
logger.log(`Stream aborted for chat ${req.chatId}`);
// Clean up pending consent requests to prevent stale UI banners
clearPendingConsentsForChat(req.chatId);
break;
}
let chunk = "";
case "reasoning-end": // Handle thinking block transitions
if (inThinkingBlock) { if (
inThinkingBlock &&
!["reasoning-delta", "reasoning-end", "reasoning-start"].includes(
part.type,
)
) {
chunk = "</think>\n"; chunk = "</think>\n";
inThinkingBlock = false; inThinkingBlock = false;
} }
break;
case "tool-input-start": { switch (part.type) {
// Initialize streaming state for this tool call case "text-delta":
getOrCreateStreamingEntry(part.id, part.toolName); passProducedChatText = true;
break; chunk += part.text;
} break;
case "tool-input-delta": { case "reasoning-start":
// Accumulate args and stream XML preview if (!inThinkingBlock) {
const entry = getOrCreateStreamingEntry(part.id); chunk = "<think>";
if (entry) { inThinkingBlock = true;
entry.argsAccumulated += part.delta;
const toolDef = findToolDefinition(entry.toolName);
if (toolDef?.buildXml) {
const argsPartial = parsePartialJson(entry.argsAccumulated);
const xml = toolDef.buildXml(argsPartial, false);
if (xml) {
ctx.onXmlStream(xml);
} }
break;
case "reasoning-delta":
if (!inThinkingBlock) {
chunk = "<think>";
inThinkingBlock = true;
}
chunk += part.text;
break;
case "reasoning-end":
if (inThinkingBlock) {
chunk = "</think>\n";
inThinkingBlock = false;
}
break;
case "tool-input-start": {
// Initialize streaming state for this tool call
getOrCreateStreamingEntry(part.id, part.toolName);
break;
} }
}
break;
}
case "tool-input-end": { case "tool-input-delta": {
// Build final XML and persist // Accumulate args and stream XML preview
const entry = getOrCreateStreamingEntry(part.id); const entry = getOrCreateStreamingEntry(part.id);
if (entry) { if (entry) {
const toolDef = findToolDefinition(entry.toolName); entry.argsAccumulated += part.delta;
if (toolDef?.buildXml) { const toolDef = findToolDefinition(entry.toolName);
const argsPartial = parsePartialJson(entry.argsAccumulated); if (toolDef?.buildXml) {
-            const xml = toolDef.buildXml(argsPartial, true);
-            if (xml) {
-              ctx.onXmlComplete(xml);
-            }
-          }
-        }
-        cleanupStreamingEntry(part.id);
-        break;
-      }
-      case "tool-call":
-        // Tool execution happens via execute callbacks
-        break;
-      case "tool-result":
-        // Tool results are already handled by the execute callback
-        break;
-    }
-    if (chunk) {
-      fullResponse += chunk;
-      await updateResponseInDb(placeholderMessageId, fullResponse);
-      sendResponseChunk(
-        event,
-        req.chatId,
-        chat,
-        fullResponse,
-        placeholderMessageId,
-        hiddenMessageIdsForStreaming,
-      );
-    }
-  }
+                const argsPartial = parsePartialJson(entry.argsAccumulated);
+                const xml = toolDef.buildXml(argsPartial, false);
+                if (xml) {
+                  ctx.onXmlStream(xml);
+                }
+              }
+            }
+            break;
+          }
+          case "tool-input-end": {
+            // Build final XML and persist
+            const entry = getOrCreateStreamingEntry(part.id);
+            if (entry) {
+              const toolDef = findToolDefinition(entry.toolName);
+              if (toolDef?.buildXml) {
+                const argsPartial = parsePartialJson(entry.argsAccumulated);
+                const xml = toolDef.buildXml(argsPartial, true);
+                if (xml) {
+                  ctx.onXmlComplete(xml);
+                }
+              }
+            }
+            cleanupStreamingEntry(part.id);
+            break;
+          }
+          case "tool-call":
+            // Tool execution happens via execute callbacks
+            break;
+          case "tool-result":
+            // Tool results are already handled by the execute callback
+            break;
+        }
+        if (chunk) {
+          fullResponse += chunk;
+          await updateResponseInDb(placeholderMessageId, fullResponse);
+          sendResponseChunk(
+            event,
+            req.chatId,
+            chat,
+            fullResponse,
+            placeholderMessageId,
+            hiddenMessageIdsForStreaming,
+          );
+        }
+      }
+    } catch (error) {
+      if (!abortController.signal.aborted) {
+        throw error;
+      }
+      logger.log(`Stream interrupted after abort for chat ${req.chatId}`);
+    }
-  // Close thinking block if still open
-  if (inThinkingBlock) {
-    fullResponse += "</think>\n";
-    await updateResponseInDb(placeholderMessageId, fullResponse);
-  }
+    // Close thinking block if still open
+    if (inThinkingBlock) {
+      fullResponse += "</think>\n";
+      await updateResponseInDb(placeholderMessageId, fullResponse);
+    }
+    // Get response messages for this pass
+    let responseMessages: ModelMessage[] = [];
+    let steps: Awaited<typeof streamResult.steps> = [];
+    try {
+      const response = await streamResult.response;
+      steps = await streamResult.steps;
+      responseMessages = response.messages;
+    } catch (err) {
+      logger.warn("Failed to retrieve stream response messages:", err);
+    }
+    if (responseMessages.length > 0) {
+      // For mid-turn compaction, slice off pre-compaction messages
+      const messagesToAccumulate =
+        compactedMidTurn && postMidTurnCompactionStartStep !== null
+          ? (() => {
+              // stepNumber is 0-indexed (from AI SDK: stepNumber = steps.length).
+              // We want the step just before compaction to determine how many
+              // response messages to skip (they belong to pre-compaction context).
+              const prevStepMessages =
+                steps[postMidTurnCompactionStartStep - 1]?.response.messages;
+              if (!prevStepMessages) {
+                logger.warn(
+                  `No step data found at index ${postMidTurnCompactionStartStep - 1} for mid-turn compaction slicing; persisting all messages`,
+                );
+              }
+              return responseMessages.slice(prevStepMessages?.length ?? 0);
+            })()
+          : responseMessages;
+      accumulatedAiMessages.push(...messagesToAccumulate);
+      currentMessageHistory = [
+        ...currentMessageHistory,
+        ...messagesToAccumulate,
+      ];
+    }
+    // Check if the model ended with text only (no tool calls in the final step).
+    // This is more reliable than passProducedChatText which is set on any text-delta
+    // during the stream (including preambles before tool calls).
+    const lastStep = steps.length > 0 ? steps[steps.length - 1] : null;
+    const passEndedWithText =
+      passProducedChatText && (!lastStep || lastStep.toolCalls.length === 0);
+    if (
+      !shouldRunTodoFollowUpPass({
+        readOnly,
+        planModeOnly,
+        passEndedWithText,
+        todos: ctx.todos,
+        todoFollowUpLoops,
+        maxTodoFollowUpLoops,
+      })
+    ) {
+      break;
+    }
+    todoFollowUpLoops += 1;
+    const reminderText = buildTodoReminderMessage(ctx.todos);
+    const reminderMessage: ModelMessage = {
+      role: "user",
+      content: [{ type: "text", text: reminderText }],
+    };
+    currentMessageHistory = [...currentMessageHistory, reminderMessage];
+    // Note: Do NOT push reminderMessage to accumulatedAiMessages.
+    // It is a synthetic message that should not be persisted to aiMessagesJson,
+    // as it would pollute future conversation history with stale todo state.
+    logger.info(
+      `Starting todo follow-up pass ${todoFollowUpLoops}/${maxTodoFollowUpLoops} for chat ${req.chatId}`,
+    );
+  }
+  // Handle cancellation paths where stream processing exits cleanly after abort.
+  if (abortController.signal.aborted) {
+    if (fullResponse) {
+      await db
+        .update(messages)
+        .set({ content: `${fullResponse}\n\n[Response cancelled by user]` })
+        .where(eq(messages.id, placeholderMessageId));
+    }
+    return false; // Cancelled - don't consume quota
+  }
   // Save the AI SDK messages for multi-turn tool call preservation
   try {
-    const response = await streamResult.response;
-    const steps = await streamResult.steps;
-    const aiMessagesForPersistence =
-      compactedMidTurn && postMidTurnCompactionStartStep !== null
-        ? (() => {
-            // stepNumber is 0-indexed (from AI SDK: stepNumber = steps.length).
-            // We want the step just before compaction to determine how many
-            // response messages to skip (they belong to pre-compaction context).
-            const prevStepMessages =
-              steps[postMidTurnCompactionStartStep - 1]?.response.messages;
-            if (!prevStepMessages) {
-              logger.warn(
-                `No step data found at index ${postMidTurnCompactionStartStep - 1} for mid-turn compaction slicing; persisting all messages`,
-              );
-            }
-            return response.messages.slice(prevStepMessages?.length ?? 0);
-          })()
-        : response.messages;
     const aiMessagesJson = getAiMessagesJsonIfWithinLimit(
-      aiMessagesForPersistence,
+      accumulatedAiMessages,
     );
     if (aiMessagesJson) {
       await db
@@ -922,6 +1018,31 @@ function sendResponseChunk(
   });
 }
+function shouldRunTodoFollowUpPass(params: {
+  readOnly: boolean;
+  planModeOnly: boolean;
+  passEndedWithText: boolean;
+  todos: AgentContext["todos"];
+  todoFollowUpLoops: number;
+  maxTodoFollowUpLoops: number;
+}): boolean {
+  const {
+    readOnly,
+    planModeOnly,
+    passEndedWithText,
+    todos,
+    todoFollowUpLoops,
+    maxTodoFollowUpLoops,
+  } = params;
+  return (
+    !readOnly &&
+    !planModeOnly &&
+    passEndedWithText &&
+    hasIncompleteTodos(todos) &&
+    todoFollowUpLoops < maxTodoFollowUpLoops
+  );
+}
 async function getMcpTools(
   event: IpcMainInvokeEvent,
   ctx: AgentContext,
......
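The guard above is what keeps the outer loop bounded. The following is a minimal, self-contained sketch of that behaviour, not the handler's actual code: the `Todo` shape (including the `"completed"` status), the `countPasses` driver, its `runPass` callback, and the default limit of 1 are all assumptions made for illustration.

```typescript
// Assumed Todo shape; only `content` and `status` appear in the diff.
type Todo = { content: string; status: "pending" | "in_progress" | "completed" };

const hasIncompleteTodos = (todos: Todo[]): boolean =>
  todos.some((t) => t.status === "pending" || t.status === "in_progress");

function shouldRunTodoFollowUpPass(params: {
  readOnly: boolean;
  planModeOnly: boolean;
  passEndedWithText: boolean;
  todos: Todo[];
  todoFollowUpLoops: number;
  maxTodoFollowUpLoops: number;
}): boolean {
  return (
    !params.readOnly &&
    !params.planModeOnly &&
    params.passEndedWithText &&
    hasIncompleteTodos(params.todos) &&
    params.todoFollowUpLoops < params.maxTodoFollowUpLoops
  );
}

// Hypothetical driver: `runPass` stands in for one streaming pass and returns
// whether that pass ended with text only (no tool calls in its final step).
function countPasses(
  todos: Todo[],
  runPass: () => boolean,
  maxTodoFollowUpLoops = 1, // assumed limit, chosen for this sketch
): number {
  let todoFollowUpLoops = 0;
  let passes = 0;
  while (true) {
    const passEndedWithText = runPass();
    passes += 1;
    const runAgain = shouldRunTodoFollowUpPass({
      readOnly: false,
      planModeOnly: false,
      passEndedWithText,
      todos,
      todoFollowUpLoops,
      maxTodoFollowUpLoops,
    });
    if (!runAgain) break;
    todoFollowUpLoops += 1;
  }
  return passes;
}

// The stubbed model never completes the todo, yet the loop still terminates.
const stuck = countPasses([{ content: "Write tests", status: "pending" }], () => true);
// With nothing left to do, no follow-up pass is started.
const done = countPasses([{ content: "Write tests", status: "completed" }], () => true);
console.log(stuck, done); // 2 1
```

Because the counter is checked inside the guard rather than in the loop condition, a model that never finishes its todos costs at most `maxTodoFollowUpLoops` extra passes.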
@@ -6,9 +6,38 @@
  */
 import { ImagePart, ModelMessage, TextPart, UserModelMessage } from "ai";
-import type { UserMessageContentPart } from "./tools/types";
+import type { UserMessageContentPart, Todo } from "./tools/types";
 import { cleanMessageForOpenAI } from "@/ipc/utils/ai_messages_utils";
+/**
+ * Check if a single todo is incomplete (pending or in_progress).
+ */
+const isIncompleteTodo = (todo: Todo): boolean =>
+  todo.status === "pending" || todo.status === "in_progress";
+/**
+ * Check if there are incomplete todos (pending or in_progress).
+ */
+export function hasIncompleteTodos(todos: Todo[]): boolean {
+  return todos.some(isIncompleteTodo);
+}
+/**
+ * Build a reminder message for incomplete todos.
+ */
+export function buildTodoReminderMessage(todos: Todo[]): string {
+  const incompleteTodos = todos.filter(isIncompleteTodo);
+  const todoList = incompleteTodos
+    .map((t) => `- [${t.status}] ${t.content}`)
+    .join("\n");
+  // Note: The "incomplete todo(s)" substring is used as a detection marker by test
+  // infrastructure in testing/fake-llm-server/ (chatCompletionHandler.ts and
+  // localAgentHandler.ts). Update those files if this text changes.
+  return `You have ${incompleteTodos.length} incomplete todo(s). Please continue and complete them:\n\n${todoList}`;
+}
 /**
  * A message that has been processed and is ready to inject.
  */
......
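To make the reminder format concrete, here is a small standalone sketch that copies the two helpers from the hunk above and runs them on sample data. The local `Todo` type is an assumption (the real one lives in `./tools/types` and may carry more fields), and the sample todos are invented.

```typescript
// Assumed minimal Todo shape; the real type is imported from ./tools/types.
type Todo = { content: string; status: "pending" | "in_progress" | "completed" };

const isIncompleteTodo = (todo: Todo): boolean =>
  todo.status === "pending" || todo.status === "in_progress";

function buildTodoReminderMessage(todos: Todo[]): string {
  const incompleteTodos = todos.filter(isIncompleteTodo);
  const todoList = incompleteTodos
    .map((t) => `- [${t.status}] ${t.content}`)
    .join("\n");
  return `You have ${incompleteTodos.length} incomplete todo(s). Please continue and complete them:\n\n${todoList}`;
}

// Invented sample data: one finished todo and two unfinished ones.
const sampleTodos: Todo[] = [
  { content: "Scaffold component", status: "completed" },
  { content: "Add unit tests", status: "pending" },
  { content: "Wire up handler", status: "in_progress" },
];

const reminder = buildTodoReminderMessage(sampleTodos);
console.log(reminder);
// You have 2 incomplete todo(s). Please continue and complete them:
//
// - [pending] Add unit tests
// - [in_progress] Wire up handler
```

Completed todos are filtered out before counting, so the number in the first line always matches the length of the bullet list.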
@@ -28,38 +28,56 @@ export const createChatCompletionHandler =
     }
     // Check for local-agent fixture requests (tc=local-agent/*)
-    // This needs to be checked on the first user message, not the last (which might be tool results)
-    const lastUserMessage = messages
-      .slice()
-      .reverse()
-      .find((m: any) => m.role === "user");
-    // Extract text content from last user message (handles both string and array content)
-    let userTextContent = "";
-    if (lastUserMessage) {
-      if (typeof lastUserMessage.content === "string") {
-        userTextContent = lastUserMessage.content;
-      } else if (Array.isArray(lastUserMessage.content)) {
-        const textPart = lastUserMessage.content.find(
-          (p: any) => p.type === "text",
-        );
-        if (textPart) {
-          userTextContent = textPart.text;
-        }
-      }
-      const localAgentFixture = extractLocalAgentFixture(userTextContent);
-      console.error(
-        `[local-agent] Checking message: "${userTextContent.slice(0, 50)}", fixture: ${localAgentFixture}`,
-      );
-      if (localAgentFixture) {
-        return handleLocalAgentFixture(req, res, localAgentFixture);
-      }
-      // Route plan acceptance message to exit-plan fixture
-      if (userTextContent.includes("I accept this plan")) {
-        return handleLocalAgentFixture(req, res, "exit-plan");
-      }
-    }
+    // We need to check ALL user messages, not just the last one, because
+    // outer loop follow-up requests inject a todo reminder as the last user message.
+    // The fixture trigger (tc=local-agent/...) will be in an earlier user message.
+    const userMessages = messages.filter((m: any) => m.role === "user");
+    // Helper to extract text content from a message (handles both string and array content)
+    const getTextContent = (msg: any): string => {
+      if (typeof msg.content === "string") {
+        return msg.content;
+      } else if (Array.isArray(msg.content)) {
+        const textPart = msg.content.find((p: any) => p.type === "text");
+        return textPart ? textPart.text : "";
+      }
+      return "";
+    };
+    // Get the last user message's text content for other checks
+    const lastUserMessage = userMessages[userMessages.length - 1];
+    const userTextContent = lastUserMessage
+      ? getTextContent(lastUserMessage)
+      : "";
+    // First, check if the LAST user message is a fixture trigger
+    let localAgentFixture = extractLocalAgentFixture(userTextContent);
+    // If last message isn't a fixture but contains a todo reminder, search earlier messages
+    // This handles the outer loop case where a reminder is injected after the original fixture trigger
+    // Note: This magic string must match the reminder text in prepare_step_utils.ts
+    // buildTodoReminderMessage(). Update both if the text changes.
+    if (!localAgentFixture && userTextContent.includes("incomplete todo(s)")) {
+      for (const msg of userMessages) {
+        const textContent = getTextContent(msg);
+        const fixture = extractLocalAgentFixture(textContent);
+        if (fixture) {
+          localAgentFixture = fixture;
+          break; // Use the first (original) fixture trigger found
+        }
+      }
+    }
+    console.error(
+      `[local-agent] Checking message: "${userTextContent.slice(0, 50)}", fixture: ${localAgentFixture}`,
+    );
+    if (localAgentFixture) {
+      return handleLocalAgentFixture(req, res, localAgentFixture);
+    }
+    // Route plan acceptance message to exit-plan fixture
+    if (userTextContent.includes("I accept this plan")) {
+      return handleLocalAgentFixture(req, res, "exit-plan");
+    }
     let messageContent = CANNED_MESSAGE;
......
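The fixture lookup in the hunk above can be exercised in isolation. The sketch below is an approximation under stated assumptions: `extractLocalAgentFixture` is not shown in this diff, so the regex-based version here is a hypothetical stand-in, and `resolveFixture`, `ChatMessage`, and the fixture name `todo-follow-up` are invented for the example.

```typescript
type ChatMessage = {
  role: string;
  content: string | Array<{ type: string; text?: string }>;
};

// Hypothetical stand-in: assumes the trigger looks like "tc=local-agent/<name>".
function extractLocalAgentFixture(text: string): string | null {
  const match = text.match(/tc=local-agent\/([\w-]+)/);
  return match?.[1] ?? null;
}

// Handles both string content and array-of-parts content.
function getTextContent(msg: ChatMessage): string {
  if (typeof msg.content === "string") return msg.content;
  const textPart = msg.content.find((p) => p.type === "text");
  return textPart?.text ?? "";
}

function resolveFixture(messages: ChatMessage[]): string | null {
  const userMessages = messages.filter((m) => m.role === "user");
  const last = userMessages[userMessages.length - 1];
  const lastText = last ? getTextContent(last) : "";
  const direct = extractLocalAgentFixture(lastText);
  // Only fall back to earlier messages when the last one is a todo reminder.
  if (direct || !lastText.includes("incomplete todo(s)")) return direct;
  for (const msg of userMessages) {
    const fixture = extractLocalAgentFixture(getTextContent(msg));
    if (fixture) return fixture; // first (original) trigger wins
  }
  return null;
}

const followUpRequest: ChatMessage[] = [
  { role: "user", content: "tc=local-agent/todo-follow-up" },
  { role: "assistant", content: "Done for now." },
  { role: "user", content: [{ type: "text", text: "You have 1 incomplete todo(s). Please continue and complete them:\n\n- [pending] Write tests" }] },
];
const unrelatedRequest: ChatMessage[] = [
  { role: "user", content: "tc=local-agent/todo-follow-up" },
  { role: "user", content: "Thanks, that is all." },
];

console.log(resolveFixture(followUpRequest)); // todo-follow-up
console.log(resolveFixture(unrelatedRequest)); // null
```

Gating the backward search on the reminder marker keeps an old trigger from hijacking later, unrelated user messages in the same conversation.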
@@ -37,6 +37,30 @@ function getSessionId(messages: any[]): string {
     .digest("hex");
 }
+/**
+ * Check if a message content contains a todo reminder pattern.
+ * The todo reminder is injected by the outer loop when there are incomplete todos.
+ */
+function isTodoReminderMessage(msg: any): boolean {
+  if (msg?.role !== "user") return false;
+  const content = Array.isArray(msg.content)
+    ? msg.content.find((p: any) => p.type === "text")?.text
+    : typeof msg.content === "string"
+      ? msg.content
+      : null;
+  // Note: This magic string must match the reminder text in prepare_step_utils.ts
+  // buildTodoReminderMessage(). Update both if the text changes.
+  return content?.includes("incomplete todo(s)") ?? false;
+}
+/**
+ * Count the number of todo reminder messages in the conversation.
+ * This determines which outer loop pass we're on.
+ */
+function countTodoReminderMessages(messages: any[]): number {
+  return messages.filter(isTodoReminderMessage).length;
+}
 /**
  * Count the number of tool result messages AFTER the last user message
  * to determine which turn we're on for the current fixture.
@@ -99,9 +123,9 @@ async function loadFixture(fixtureName: string): Promise<LocalAgentFixture> {
     const module = require(fixturePath);
     const fixture = module.fixture as LocalAgentFixture;
-    if (!fixture || !fixture.turns) {
+    if (!fixture || (!fixture.turns && !fixture.passes)) {
       throw new Error(
-        `Invalid fixture: missing 'fixture' export or 'turns' array`,
+        `Invalid fixture: missing 'fixture' export or 'turns'/'passes' array`,
       );
     }
@@ -113,6 +137,30 @@ async function loadFixture(fixtureName: string): Promise<LocalAgentFixture> {
   }
 }
+/**
+ * Get the turns for the current pass from a fixture.
+ * Supports both simple fixtures (with `turns`) and multi-pass fixtures (with `passes`).
+ */
+function getTurnsForPass(
+  fixture: LocalAgentFixture,
+  passIndex: number,
+): Turn[] {
+  // If fixture uses passes, get the appropriate pass
+  if (fixture.passes && fixture.passes.length > 0) {
+    if (passIndex >= fixture.passes.length) {
+      // All passes exhausted
+      return [];
+    }
+    return fixture.passes[passIndex].turns;
+  }
+  // Simple fixture with turns - only valid for pass 0
+  if (passIndex > 0) {
+    return [];
+  }
+  return fixture.turns || [];
+}
 /**
  * Create a streaming chunk in OpenAI format
  */
@@ -292,26 +340,37 @@ export async function handleLocalAgentFixture(
   const fixture = await loadFixture(fixtureName);
   const sessionId = getSessionId(messages);
-  // Determine which turn we're on based on tool result rounds
+  // Determine which outer loop pass we're on based on todo reminder messages
+  const passIndex = countTodoReminderMessages(messages);
+  // Determine which turn we're on within the current pass
   const toolResultRounds = countToolResultRounds(messages);
   const turnIndex = toolResultRounds;
+  // Get the turns for the current pass
+  const turns = getTurnsForPass(fixture, passIndex);
   console.error(
-    `[local-agent] Loaded fixture: ${fixtureName}, Session: ${sessionId}, Turn: ${turnIndex}, Tool rounds: ${toolResultRounds}`,
+    `[local-agent] Loaded fixture: ${fixtureName}, Session: ${sessionId}, Pass: ${passIndex}, Turn: ${turnIndex}, Tool rounds: ${toolResultRounds}`,
   );
-  if (turnIndex >= fixture.turns.length) {
-    // All turns exhausted, send a simple completion message
-    console.log(`[local-agent] All turns exhausted, sending completion`);
+  if (turnIndex >= turns.length) {
+    // All turns exhausted for this pass, send a simple completion message
+    console.log(
+      `[local-agent] All turns exhausted for pass ${passIndex}, sending completion`,
+    );
     await streamTextResponse(res, "Task completed.");
     return;
   }
-  const turn = fixture.turns[turnIndex];
-  console.log(`[local-agent] Executing turn ${turnIndex}:`, {
-    hasText: !!turn.text,
-    toolCallCount: turn.toolCalls?.length ?? 0,
-  });
+  const turn = turns[turnIndex];
+  console.log(
+    `[local-agent] Executing pass ${passIndex}, turn ${turnIndex}:`,
+    {
+      hasText: !!turn.text,
+      toolCallCount: turn.toolCalls?.length ?? 0,
+    },
+  );
   // If this turn has tool calls, stream them
   if (turn.toolCalls && turn.toolCalls.length > 0) {
......
@@ -24,9 +24,27 @@ export type Turn = {
   };
 };
+/**
+ * Represents a single outer loop pass.
+ * The outer loop runs when todos are incomplete after a chat response.
+ */
+export type Pass = {
+  /** Ordered turns within this pass */
+  turns: Turn[];
+};
 export type LocalAgentFixture = {
   /** Description for debugging */
   description?: string;
-  /** Ordered turns in the conversation */
-  turns: Turn[];
+  /**
+   * Ordered turns in the conversation.
+   * For simple fixtures without outer loop testing.
+   */
+  turns?: Turn[];
+  /**
+   * Ordered passes for testing outer loop behavior.
+   * Each pass contains turns that execute within that outer loop iteration.
+   * Use this when testing todo follow-up loop behavior.
+   */
+  passes?: Pass[];
 };
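As an illustration of how a multi-pass fixture might be written and consumed, the sketch below pairs the new `passes` field with a pass-selection helper modelled on `getTurnsForPass`. It is a simplified approximation: `Turn` is reduced to its `text` field (the real type also carries `toolCalls`), the fixture contents are invented, and the real fixture files expose the object as a `fixture` export.

```typescript
// Simplified stand-ins for the types in the diff; the real Turn has more fields.
type Turn = { text?: string };
type Pass = { turns: Turn[] };
type LocalAgentFixture = { description?: string; turns?: Turn[]; passes?: Pass[] };

// Invented example fixture: pass 0 is the initial response, pass 1 is the
// response served after the todo reminder has been injected.
const multiPassFixture: LocalAgentFixture = {
  description: "Agent stops early, then finishes after the todo reminder",
  passes: [
    { turns: [{ text: "I created the todo list." }] },
    { turns: [{ text: "All todos are now complete." }] },
  ],
};

const simpleFixture: LocalAgentFixture = {
  turns: [{ text: "Single-pass response." }],
};

function getTurnsForPass(fixture: LocalAgentFixture, passIndex: number): Turn[] {
  if (fixture.passes && fixture.passes.length > 0) {
    // Out-of-range pass indexes yield no turns.
    return fixture.passes[passIndex]?.turns ?? [];
  }
  // Simple fixtures only describe pass 0.
  return passIndex > 0 ? [] : fixture.turns ?? [];
}

console.log(getTurnsForPass(multiPassFixture, 1).map((t) => t.text)); // [ 'All todos are now complete.' ]
console.log(getTurnsForPass(multiPassFixture, 2).length); // 0
console.log(getTurnsForPass(simpleFixture, 1).length); // 0
```

Since the pass index is derived from the number of reminder messages in the request, a fixture with two passes covers exactly one follow-up; any further pass falls through to the handler's generic completion response.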