Unverified 提交 9205717e authored 作者: Mohamed Aziz Mejri's avatar Mohamed Aziz Mejri 提交者: GitHub

Fix: firing queued messages when in other chat (#2931)

closes #2905 <!-- devin-review-badge-begin --> --- <a href="https://app.devin.ai/review/dyad-sh/dyad/pull/2931" target="_blank"> <picture> <source media="(prefers-color-scheme: dark)" srcset="https://static.devin.ai/assets/gh-open-in-devin-review-dark.svg?v=1"> <img src="https://static.devin.ai/assets/gh-open-in-devin-review-light.svg?v=1" alt="Open with Devin"> </picture> </a> <!-- devin-review-badge-end -->
上级 87beccf7
...@@ -109,4 +109,44 @@ test.describe("queued messages", () => { ...@@ -109,4 +109,44 @@ test.describe("queued messages", () => {
// "tc=second" was deleted, so it should NOT appear // "tc=second" was deleted, so it should NOT appear
await expect(messagesList.getByText("tc=second")).not.toBeVisible(); await expect(messagesList.getByText("tc=second")).not.toBeVisible();
}); });
test("fires queued message while on another page", async ({ po }) => {
// Send a message with a medium sleep to simulate a slow response
await po.sendPrompt("tc=1 [sleep=medium]", {
skipWaitForCompletion: true,
});
// Wait for chat input to appear (indicates we're in chat view and streaming)
await expect(chatInput).toBeVisible();
// While streaming, queue a second message
await chatInput.fill("tc=2");
await chatInput.press("Enter");
// Verify the queued message indicator is visible
await expect(
po.page.getByText(/\d+ Queued.*will send after current response/),
).toBeVisible();
// Navigate away from the chat page while streaming + queue are active
await po.sleep(1_000);
await po.navigation.goToAppsTab();
// Wait for the in-progress indicator to disappear, meaning both the
// first stream and the queued message have completed in the background
await expect(
po.page.locator('[aria-label="Chat in progress"]'),
).not.toBeVisible({ timeout: 30_000 });
// Navigate back to the chat to verify both messages were sent
const chatTab = po.page
.locator("button")
.filter({ hasText: /Chat/ })
.first();
await chatTab.click();
const messagesList = po.page.locator('[data-testid="messages-list"]');
await expect(messagesList.getByText("tc=1 [sleep=medium]")).toBeVisible();
await expect(messagesList.getByText("tc=2")).toBeVisible();
});
}); });
...@@ -18,6 +18,7 @@ import { selectedComponentsPreviewAtom } from "@/atoms/previewAtoms"; ...@@ -18,6 +18,7 @@ import { selectedComponentsPreviewAtom } from "@/atoms/previewAtoms";
import { chatInputValueAtom } from "@/atoms/chatAtoms"; import { chatInputValueAtom } from "@/atoms/chatAtoms";
import { usePlanEvents } from "@/hooks/usePlanEvents"; import { usePlanEvents } from "@/hooks/usePlanEvents";
import { useZoomShortcuts } from "@/hooks/useZoomShortcuts"; import { useZoomShortcuts } from "@/hooks/useZoomShortcuts";
import { useQueueProcessor } from "@/hooks/useQueueProcessor";
import i18n from "@/i18n"; import i18n from "@/i18n";
import { LanguageSchema } from "@/lib/schemas"; import { LanguageSchema } from "@/lib/schemas";
...@@ -40,6 +41,9 @@ export default function RootLayout({ children }: { children: ReactNode }) { ...@@ -40,6 +41,9 @@ export default function RootLayout({ children }: { children: ReactNode }) {
// Zoom keyboard shortcuts (Ctrl/Cmd + =/- /0) // Zoom keyboard shortcuts (Ctrl/Cmd + =/- /0)
useZoomShortcuts(); useZoomShortcuts();
// Process queued messages globally (even when not on chat page)
useQueueProcessor();
useEffect(() => { useEffect(() => {
const zoomLevel = settings?.zoomLevel ?? DEFAULT_ZOOM_LEVEL; const zoomLevel = settings?.zoomLevel ?? DEFAULT_ZOOM_LEVEL;
const zoomFactor = Number(zoomLevel) / 100; const zoomFactor = Number(zoomLevel) / 100;
......
...@@ -120,7 +120,7 @@ export function ChatInput({ chatId }: { chatId?: number }) { ...@@ -120,7 +120,7 @@ export function ChatInput({ chatId }: { chatId?: number }) {
removeQueuedMessage, removeQueuedMessage,
reorderQueuedMessages, reorderQueuedMessages,
clearAllQueuedMessages, clearAllQueuedMessages,
} = useStreamChat({ shouldProcessQueue: true }); } = useStreamChat();
const [showError, setShowError] = useState(true); const [showError, setShowError] = useState(true);
const [isApproving, setIsApproving] = useState(false); // State for approving const [isApproving, setIsApproving] = useState(false); // State for approving
const [isRejecting, setIsRejecting] = useState(false); // State for rejecting const [isRejecting, setIsRejecting] = useState(false); // State for rejecting
......
import { useEffect } from "react";
import { useAtom } from "jotai";
import {
queuedMessagesByIdAtom,
streamCompletedSuccessfullyByIdAtom,
type QueuedMessageItem,
} from "@/atoms/chatAtoms";
import { useStreamChat } from "./useStreamChat";
import { usePostHog } from "posthog-js/react";
import { useSettings } from "./useSettings";
/**
* Root-level hook that processes queued messages for any chat,
* even when the user is not on the chat page.
*/
export function useQueueProcessor() {
const { streamMessage } = useStreamChat({ hasChatId: false });
const [queuedMessagesById, setQueuedMessagesById] = useAtom(
queuedMessagesByIdAtom,
);
const [streamCompletedSuccessfullyById, setStreamCompletedSuccessfullyById] =
useAtom(streamCompletedSuccessfullyByIdAtom);
const posthog = usePostHog();
const { settings } = useSettings();
useEffect(() => {
// Find any chatId that has both completed successfully and has queued messages
for (const [chatId, queuedMessages] of queuedMessagesById) {
if (queuedMessages.length === 0) continue;
const completedSuccessfully =
streamCompletedSuccessfullyById.get(chatId) ?? false;
if (!completedSuccessfully) continue;
// Clear the successful completion flag first to prevent loops
setStreamCompletedSuccessfullyById((prev) => {
const next = new Map(prev);
next.set(chatId, false);
return next;
});
// Get and remove the first message atomically
let messageToSend: QueuedMessageItem | undefined;
setQueuedMessagesById((prev) => {
const next = new Map(prev);
const current = prev.get(chatId) ?? [];
const [first, ...remainingMessages] = current;
messageToSend = first;
if (remainingMessages.length > 0) {
next.set(chatId, remainingMessages);
} else {
next.delete(chatId);
}
return next;
});
if (!messageToSend) return;
posthog.capture("chat:submit", {
chatMode: settings?.selectedChatMode,
});
streamMessage({
prompt: messageToSend.prompt,
chatId,
redo: false,
attachments: messageToSend.attachments,
selectedComponents: messageToSend.selectedComponents,
});
// Only process one chatId per effect run
break;
}
}, [
queuedMessagesById,
streamCompletedSuccessfullyById,
streamMessage,
setQueuedMessagesById,
setStreamCompletedSuccessfullyById,
posthog,
settings?.selectedChatMode,
]);
}
import { useCallback, useEffect } from "react"; import { useCallback } from "react";
import type { import type {
ComponentSelection, ComponentSelection,
FileAttachment, FileAttachment,
...@@ -44,8 +44,7 @@ const pendingStreamChatIds = new Set<number>(); ...@@ -44,8 +44,7 @@ const pendingStreamChatIds = new Set<number>();
export function useStreamChat({ export function useStreamChat({
hasChatId = true, hasChatId = true,
shouldProcessQueue = false, }: { hasChatId?: boolean } = {}) {
}: { hasChatId?: boolean; shouldProcessQueue?: boolean } = {}) {
const setMessagesById = useSetAtom(chatMessagesByIdAtom); const setMessagesById = useSetAtom(chatMessagesByIdAtom);
const isStreamingById = useAtomValue(isStreamingByIdAtom); const isStreamingById = useAtomValue(isStreamingByIdAtom);
const setIsStreamingById = useSetAtom(isStreamingByIdAtom); const setIsStreamingById = useSetAtom(isStreamingByIdAtom);
...@@ -66,8 +65,9 @@ export function useStreamChat({ ...@@ -66,8 +65,9 @@ export function useStreamChat({
const [queuedMessagesById, setQueuedMessagesById] = useAtom( const [queuedMessagesById, setQueuedMessagesById] = useAtom(
queuedMessagesByIdAtom, queuedMessagesByIdAtom,
); );
const [streamCompletedSuccessfullyById, setStreamCompletedSuccessfullyById] = const setStreamCompletedSuccessfullyById = useSetAtom(
useAtom(streamCompletedSuccessfullyByIdAtom); streamCompletedSuccessfullyByIdAtom,
);
const posthog = usePostHog(); const posthog = usePostHog();
const queryClient = useQueryClient(); const queryClient = useQueryClient();
...@@ -337,66 +337,6 @@ export function useStreamChat({ ...@@ -337,66 +337,6 @@ export function useStreamChat({
], ],
); );
// Process first queued message when streaming ends successfully
useEffect(() => {
if (!chatId || !shouldProcessQueue) return;
const queuedMessages = queuedMessagesById.get(chatId) ?? [];
const completedSuccessfully =
streamCompletedSuccessfullyById.get(chatId) ?? false;
// Only process queue if we have confirmation that the stream completed successfully
// This prevents race conditions where the queue might be processed during cancellation
if (queuedMessages.length > 0 && completedSuccessfully) {
// Clear the successful completion flag first to prevent loops
setStreamCompletedSuccessfullyById((prev) => {
const next = new Map(prev);
next.set(chatId, false);
return next;
});
// Get and remove the first message atomically by extracting it inside the setter
// This prevents race conditions where the queue might be modified between
// reading firstMessage and updating the queue
let messageToSend: QueuedMessageItem | undefined;
setQueuedMessagesById((prev) => {
const next = new Map(prev);
const current = prev.get(chatId) ?? [];
const [first, ...remainingMessages] = current;
messageToSend = first;
if (remainingMessages.length > 0) {
next.set(chatId, remainingMessages);
} else {
next.delete(chatId);
}
return next;
});
if (!messageToSend) return;
posthog.capture("chat:submit", { chatMode: settings?.selectedChatMode });
// Send the first message
streamMessage({
prompt: messageToSend.prompt,
chatId,
redo: false,
attachments: messageToSend.attachments,
selectedComponents: messageToSend.selectedComponents,
});
}
}, [
chatId,
queuedMessagesById,
streamCompletedSuccessfullyById,
streamMessage,
setQueuedMessagesById,
setStreamCompletedSuccessfullyById,
posthog,
settings?.selectedChatMode,
shouldProcessQueue,
]);
return { return {
streamMessage, streamMessage,
isStreaming: isStreaming:
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论