Unverified 提交 376a59c6 authored 作者: wwwillchen-bot's avatar wwwillchen-bot 提交者: GitHub

perf: send incremental streaming updates via IPC instead of full messages array (#2988)

## Summary - During AI streaming, the full messages array (potentially 500KB+ with many messages) was serialized and sent through IPC on every text-delta chunk, causing UI sluggishness - Changed to send only the streaming message ID and updated content for high-frequency streaming updates - Full messages are still sent for initial loads and state changes (compaction, lazy edits) to maintain correctness - Updated all consumers (`useStreamChat`, `usePlanImplementation`, `useResolveMergeConflictsWithAI`) to handle both full and incremental chunk modes ## Test plan - [x] TypeScript compilation passes - [x] All 897 unit tests pass (39 files) - [x] E2E tests pass (234 passed — 20 pre-existing failures unrelated to this change, confirmed by running on clean main) - [x] Specifically tested: `new_chat`, `chat_input`, `local_agent_advanced`, `local_agent_code_search`, `local_agent_grep`, `local_agent_search_replace`, `local_agent_list_files`, `local_agent_summarize`, `local_agent_connection_retry`, `local_agent_read_logs`, `local_agent_consent`, `local_agent_persistent_todos`, `local_agent_todo_followup`, `concurrent_chat`, `chat_history` 🤖 Generated with [Claude Code](https://claude.com/claude-code) <!-- devin-review-badge-begin --> --- <a href="https://app.devin.ai/review/dyad-sh/dyad/pull/2988" target="_blank"> <picture> <source media="(prefers-color-scheme: dark)" srcset="https://static.devin.ai/assets/gh-open-in-devin-review-dark.svg?v=1"> <img src="https://static.devin.ai/assets/gh-open-in-devin-review-light.svg?v=1" alt="Open with Devin"> </picture> </a> <!-- devin-review-badge-end --> --------- Co-authored-by: 's avatarWill Chen <willchen90@gmail.com> Co-authored-by: 's avatarClaude Opus 4.6 <noreply@anthropic.com>
上级 d7d4e84f
...@@ -100,6 +100,17 @@ queryClient.invalidateQueries({ queryKey: queryKeys.apps.all }); ...@@ -100,6 +100,17 @@ queryClient.invalidateQueries({ queryKey: queryKeys.apps.all });
**Adding new keys:** Add entries to the appropriate domain in `queryKeys.ts`. Follow the existing pattern with `all` for the base key and factory functions using object parameters for parameterized keys. **Adding new keys:** Add entries to the appropriate domain in `queryKeys.ts`. Follow the existing pattern with `all` for the base key and factory functions using object parameters for parameterized keys.
## Streaming chunk optimizations
The `chat:response:chunk` event supports two modes:
1. **Full update**`messages` field contains the complete messages array. Used for initial message load, post-compaction refresh, and lazy-edit completions.
2. **Incremental update**`streamingMessageId` + `streamingContent` fields update only the actively streaming message's content. Used for high-frequency text-delta streaming to avoid serializing the full messages array on every chunk.
When modifying `ChatResponseChunkSchema` or adding new `safeSend("chat:response:chunk", ...)` call sites, decide which mode is appropriate. All frontend consumers (`useStreamChat`, `usePlanImplementation`, `useResolveMergeConflictsWithAI`) must handle both modes.
**Zod schema contract changes:** Making a field optional (e.g., `messages``messages.optional()`) causes TypeScript errors in all consumers that assume the field is always present. Search for all destructuring/usage sites and add guards before committing.
## React + IPC integration pattern ## React + IPC integration pattern
When creating hooks/components that call IPC handlers: When creating hooks/components that call IPC handlers:
......
...@@ -98,14 +98,39 @@ export function usePlanImplementation() { ...@@ -98,14 +98,39 @@ export function usePlanImplementation() {
selectedComponents: [], selectedComponents: [],
}, },
{ {
onChunk: ({ messages: updatedMessages }) => { onChunk: ({
messages: updatedMessages,
streamingMessageId,
streamingContent,
}) => {
if (!isMountedRef.current) return; if (!isMountedRef.current) return;
// Update the messages so the UI shows the streaming response
if (updatedMessages) {
// Full messages update (initial load, post-compaction, etc.)
setMessagesById((prev) => { setMessagesById((prev) => {
const next = new Map(prev); const next = new Map(prev);
next.set(chatId, updatedMessages); next.set(chatId, updatedMessages);
return next; return next;
}); });
} else if (
streamingMessageId !== undefined &&
streamingContent !== undefined
) {
// Incremental update: only update the streaming message's content
setMessagesById((prev) => {
const existingMessages = prev.get(chatId);
if (!existingMessages) return prev;
const next = new Map(prev);
const updated = existingMessages.map((msg) =>
msg.id === streamingMessageId
? { ...msg, content: streamingContent }
: msg,
);
next.set(chatId, updated);
return next;
});
}
}, },
onEnd: () => { onEnd: () => {
if (!isMountedRef.current) return; if (!isMountedRef.current) return;
......
...@@ -97,7 +97,7 @@ For each file, review the conflict markers (<<<<<<<, =======, >>>>>>>) and choos ...@@ -97,7 +97,7 @@ For each file, review the conflict markers (<<<<<<<, =======, >>>>>>>) and choos
prompt, prompt,
}, },
{ {
onChunk: ({ messages }) => { onChunk: ({ messages, streamingMessageId, streamingContent }) => {
if (!hasIncrementedStreamCount) { if (!hasIncrementedStreamCount) {
setStreamCountById((prev) => { setStreamCountById((prev) => {
const next = new Map(prev); const next = new Map(prev);
...@@ -106,11 +106,33 @@ For each file, review the conflict markers (<<<<<<<, =======, >>>>>>>) and choos ...@@ -106,11 +106,33 @@ For each file, review the conflict markers (<<<<<<<, =======, >>>>>>>) and choos
}); });
hasIncrementedStreamCount = true; hasIncrementedStreamCount = true;
} }
if (messages) {
// Full messages update (initial load, post-compaction, etc.)
setMessagesById((prev) => { setMessagesById((prev) => {
const next = new Map(prev); const next = new Map(prev);
next.set(newChatId, messages); next.set(newChatId, messages);
return next; return next;
}); });
} else if (
streamingMessageId !== undefined &&
streamingContent !== undefined
) {
// Incremental update: only update the streaming message's content
setMessagesById((prev) => {
const existingMessages = prev.get(newChatId);
if (!existingMessages) return prev;
const next = new Map(prev);
const updated = existingMessages.map((msg) =>
msg.id === streamingMessageId
? { ...msg, content: streamingContent }
: msg,
);
next.set(newChatId, updated);
return next;
});
}
}, },
onEnd: () => { onEnd: () => {
setIsStreamingById((prev) => { setIsStreamingById((prev) => {
......
...@@ -172,7 +172,11 @@ export function useStreamChat({ ...@@ -172,7 +172,11 @@ export function useStreamChat({
selectedComponents: selectedComponents ?? [], selectedComponents: selectedComponents ?? [],
}, },
{ {
onChunk: ({ messages: updatedMessages }) => { onChunk: ({
messages: updatedMessages,
streamingMessageId,
streamingContent,
}) => {
if (!hasIncrementedStreamCount) { if (!hasIncrementedStreamCount) {
setStreamCountById((prev) => { setStreamCountById((prev) => {
const next = new Map(prev); const next = new Map(prev);
...@@ -182,11 +186,32 @@ export function useStreamChat({ ...@@ -182,11 +186,32 @@ export function useStreamChat({
hasIncrementedStreamCount = true; hasIncrementedStreamCount = true;
} }
if (updatedMessages) {
// Full messages update (initial load, post-compaction, etc.)
setMessagesById((prev) => { setMessagesById((prev) => {
const next = new Map(prev); const next = new Map(prev);
next.set(chatId, updatedMessages); next.set(chatId, updatedMessages);
return next; return next;
}); });
} else if (
streamingMessageId !== undefined &&
streamingContent !== undefined
) {
// Incremental update: only update the streaming message's content
setMessagesById((prev) => {
const existingMessages = prev.get(chatId);
if (!existingMessages) return prev;
const next = new Map(prev);
const updated = existingMessages.map((msg) =>
msg.id === streamingMessageId
? { ...msg, content: streamingContent }
: msg,
);
next.set(chatId, updated);
return next;
});
}
}, },
onEnd: (response: ChatResponseEnd) => { onEnd: (response: ChatResponseEnd) => {
// Remove from pending set now that stream is complete // Remove from pending set now that stream is complete
......
...@@ -1081,19 +1081,12 @@ This conversation includes one or more image attachments. When the user uploads ...@@ -1081,19 +1081,12 @@ This conversation includes one or more image attachments. When the user uploads
lastDbSaveAt = now; lastDbSaveAt = now;
} }
// Update the placeholder assistant message content in the messages array // Send incremental update with only the streaming message content
const currentMessages = [...updatedChat.messages]; // instead of the full messages array to reduce IPC overhead
if (
currentMessages.length > 0 &&
currentMessages[currentMessages.length - 1].role === "assistant"
) {
currentMessages[currentMessages.length - 1].content = fullResponse;
}
// Update the assistant message in the database
safeSend(event.sender, "chat:response:chunk", { safeSend(event.sender, "chat:response:chunk", {
chatId: req.chatId, chatId: req.chatId,
messages: currentMessages, streamingMessageId: placeholderAssistantMessage.id,
streamingContent: fullResponse,
}); });
return fullResponse; return fullResponse;
}; };
......
...@@ -92,10 +92,18 @@ export type ChatStreamParams = z.infer<typeof ChatStreamParamsSchema>; ...@@ -92,10 +92,18 @@ export type ChatStreamParams = z.infer<typeof ChatStreamParamsSchema>;
/** /**
* Schema for chat response chunk event. * Schema for chat response chunk event.
*
* Supports two modes:
* 1. Full update: `messages` is set with the complete messages array
* 2. Incremental update: `streamingMessageId` + `streamingContent` are set
* to update only the content of a single message being streamed.
* This avoids serializing the entire messages array on every text delta.
*/ */
export const ChatResponseChunkSchema = z.object({ export const ChatResponseChunkSchema = z.object({
chatId: z.number(), chatId: z.number(),
messages: z.array(MessageSchema), messages: z.array(MessageSchema).optional(),
streamingMessageId: z.number().optional(),
streamingContent: z.string().optional(),
}); });
/** /**
......
...@@ -375,6 +375,7 @@ export async function handleLocalAgentStream( ...@@ -375,6 +375,7 @@ export async function handleLocalAgentStream(
previewContent, previewContent,
placeholderMessageId, placeholderMessageId,
hiddenMessageIdsForStreaming, hiddenMessageIdsForStreaming,
true, // Full messages: compaction changes message list
); );
}, },
{ {
...@@ -425,6 +426,7 @@ export async function handleLocalAgentStream( ...@@ -425,6 +426,7 @@ export async function handleLocalAgentStream(
fullResponse + streamingPreview, fullResponse + streamingPreview,
placeholderMessageId, placeholderMessageId,
hiddenMessageIdsForStreaming, hiddenMessageIdsForStreaming,
true, // Full messages: post-compaction refresh
); );
} }
...@@ -1550,13 +1552,13 @@ function sendResponseChunk( ...@@ -1550,13 +1552,13 @@ function sendResponseChunk(
fullResponse: string, fullResponse: string,
placeholderMessageId: number, placeholderMessageId: number,
hiddenMessageIds?: Set<number>, hiddenMessageIds?: Set<number>,
/** When true, sends the full messages array instead of an incremental update */
sendFullMessages?: boolean,
) { ) {
if (sendFullMessages) {
const currentMessages = [...chat.messages].filter( const currentMessages = [...chat.messages].filter(
(message) => !hiddenMessageIds?.has(message.id), (message) => !hiddenMessageIds?.has(message.id),
); );
// Find the placeholder message by ID rather than assuming it's the last
// assistant message. After compaction, a compaction summary message may
// exist after the placeholder and we must not overwrite it.
const placeholderMsg = currentMessages.find( const placeholderMsg = currentMessages.find(
(m) => m.id === placeholderMessageId, (m) => m.id === placeholderMessageId,
); );
...@@ -1567,6 +1569,15 @@ function sendResponseChunk( ...@@ -1567,6 +1569,15 @@ function sendResponseChunk(
chatId, chatId,
messages: currentMessages, messages: currentMessages,
}); });
} else {
// Send incremental update with only the streaming message content
// to reduce IPC overhead during high-frequency streaming
safeSend(event.sender, "chat:response:chunk", {
chatId,
streamingMessageId: placeholderMessageId,
streamingContent: fullResponse,
});
}
} }
function getPlanningQuestionnaireErrorFromStep(step: { function getPlanningQuestionnaireErrorFromStep(step: {
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论