Unverified 提交 f2265f83 authored 作者: Nour Zakhma's avatar Nour Zakhma 提交者: GitHub

Feature/pause queue (#3077)

closes #3075 Implements pause/resume functionality for the message queue processing system, allowing users to pause automatic processing of queued messages and selectively resume. Includes session-level persistence via sessionStorage. <img width="594" height="694" alt="Capture d&#39;écran 2026-03-26 045040" src="https://github.com/user-attachments/assets/2cf9648f-7be1-466c-971c-e2e96faccd0d" /> 9 comprehensive E2E test cases covering: Pause/Resume button visibility Pause functionality and message blocking Resume processing SessionStorage persistence (empty & non-empty cases) Multiple pause/resume cycles Per-chat isolation Visual styling
上级 f7b8748f
import { test, Timeout } from "./helpers/test_helper";
import { expect } from "@playwright/test";
test.describe("pause queue", () => {
test.beforeEach(async ({ po }) => {
await po.setUp();
});
test("pause queue prevents dequeuing after current stream completes", async ({
po,
}) => {
const page = po.page;
const chatInput = po.chatActions.getChatInput();
await po.sendPrompt("tc=1 [sleep=medium]", { skipWaitForCompletion: true });
const stopButton = page.getByRole("button", { name: /cancel generation/i });
await expect(stopButton).toBeVisible({ timeout: Timeout.MEDIUM });
for (let i = 1; i <= 4; i++) {
await chatInput.fill(`message ${i}`);
await chatInput.press("Enter");
await expect(page.getByTestId("queue-header")).toContainText(
new RegExp(`^${i}\\s+Queued`),
{
timeout: Timeout.SHORT,
},
);
}
const queueHeader = page.getByTestId("queue-header");
await expect(queueHeader).toContainText(/4\s+Queued/, {
timeout: Timeout.SHORT,
});
const pauseButton = queueHeader.getByRole("button", {
name: /pause queue/i,
});
await expect(pauseButton).toBeVisible({ timeout: Timeout.SHORT });
await pauseButton.click();
await expect(page.getByText("Paused")).toBeVisible();
await po.chatActions.waitForChatCompletion();
await expect(queueHeader).toContainText(/4\s+Queued/);
});
test("stopping while paused keeps queue and resume starts sending", async ({
po,
}) => {
const page = po.page;
const chatInput = po.chatActions.getChatInput();
await po.sendPrompt("tc=1 [sleep=medium]", { skipWaitForCompletion: true });
const stopButton = page.getByRole("button", { name: /cancel generation/i });
await expect(stopButton).toBeVisible({ timeout: Timeout.MEDIUM });
for (let i = 1; i <= 4; i++) {
await chatInput.fill(`queued ${i} [sleep=medium]`);
await chatInput.press("Enter");
}
const queueHeader = page.getByTestId("queue-header");
await expect(queueHeader).toContainText(/4\s+Queued/, {
timeout: Timeout.SHORT,
});
const pauseButton = queueHeader.getByRole("button", {
name: /pause queue/i,
});
await expect(pauseButton).toBeVisible({ timeout: Timeout.SHORT });
await pauseButton.click();
await expect(page.getByText("Paused")).toBeVisible();
await stopButton.click();
await expect(queueHeader).toContainText(/4\s+Queued/);
const resumeButton = queueHeader.getByRole("button", {
name: /resume queue/i,
});
await expect(resumeButton).toBeVisible({ timeout: Timeout.SHORT });
await resumeButton.click();
await expect(page.getByText("Paused")).not.toBeVisible({
timeout: Timeout.MEDIUM,
});
await expect
.poll(
async () => {
const text = (await queueHeader.textContent()) ?? "";
const match = text.match(/(\d+)\s+Queued/i);
return match ? Number(match[1]) : 0;
},
{ timeout: Timeout.LONG },
)
.toBeLessThan(4);
});
test("sending while stopped with paused queue sends immediately and keeps queue", async ({
po,
}) => {
const page = po.page;
const chatInput = po.chatActions.getChatInput();
await po.sendPrompt("tc=1 [sleep=medium]", { skipWaitForCompletion: true });
const stopButton = page.getByRole("button", { name: /cancel generation/i });
await expect(stopButton).toBeVisible({ timeout: Timeout.MEDIUM });
for (let i = 1; i <= 3; i++) {
await chatInput.fill(`queued ${i} [sleep=medium]`);
await chatInput.press("Enter");
}
const queueHeader = page.getByTestId("queue-header");
await expect(queueHeader).toContainText(/3\s+Queued/, {
timeout: Timeout.SHORT,
});
await queueHeader.getByRole("button", { name: /pause queue/i }).click();
await expect(page.getByText("Paused")).toBeVisible();
await stopButton.click();
await expect(queueHeader).toContainText(/3\s+Queued/);
await chatInput.fill("should send immediately");
await chatInput.press("Enter");
const messagesList = page.getByTestId("messages-list");
await expect(messagesList.getByText("should send immediately")).toBeVisible(
{
timeout: Timeout.SHORT,
},
);
await expect(queueHeader).toContainText(/3\s+Queued/);
await expect(page.getByText("Paused")).toBeVisible();
});
});
...@@ -260,3 +260,6 @@ export const queuedMessagesByIdAtom = atom<Map<number, QueuedMessageItem[]>>( ...@@ -260,3 +260,6 @@ export const queuedMessagesByIdAtom = atom<Map<number, QueuedMessageItem[]>>(
export const streamCompletedSuccessfullyByIdAtom = atom<Map<number, boolean>>( export const streamCompletedSuccessfullyByIdAtom = atom<Map<number, boolean>>(
new Map(), new Map(),
); );
// Tracks if the queue is paused for each chat (Map<chatId, isPaused>)
export const queuePausedByIdAtom = atom<Map<number, boolean>>(new Map());
...@@ -127,6 +127,11 @@ export function ChatInput({ chatId }: { chatId?: number }) { ...@@ -127,6 +127,11 @@ export function ChatInput({ chatId }: { chatId?: number }) {
removeQueuedMessage, removeQueuedMessage,
reorderQueuedMessages, reorderQueuedMessages,
clearAllQueuedMessages, clearAllQueuedMessages,
isPaused,
pauseQueue,
clearPauseOnly,
resumeQueue,
clearCompletionFlag,
} = useStreamChat(); } = useStreamChat();
const [showError, setShowError] = useState(true); const [showError, setShowError] = useState(true);
const [isApproving, setIsApproving] = useState(false); // State for approving const [isApproving, setIsApproving] = useState(false); // State for approving
...@@ -358,6 +363,14 @@ export function ChatInput({ chatId }: { chatId?: number }) { ...@@ -358,6 +363,14 @@ export function ChatInput({ chatId }: { chatId?: number }) {
// eslint-disable-next-line react-hooks/exhaustive-deps // eslint-disable-next-line react-hooks/exhaustive-deps
}, []); }, []);
// Auto-clear pause state when queue becomes empty (Users expect that deleting all queued messages returns them to normal send mode)
useEffect(() => {
if (chatId && isPaused && queuedMessages.length === 0) {
clearPauseOnly();
}
}, [chatId, isPaused, queuedMessages.length, clearPauseOnly]);
// Queue management handlers // Queue management handlers
const handleEditQueuedMessage = useCallback( const handleEditQueuedMessage = useCallback(
(id: string) => { (id: string) => {
...@@ -510,7 +523,8 @@ export function ChatInput({ chatId }: { chatId?: number }) { ...@@ -510,7 +523,8 @@ export function ChatInput({ chatId }: { chatId?: number }) {
return; return;
} }
// If streaming, queue the message instead of sending immediately // Queue while actively streaming. If we're paused but currently idle,
// send the new message immediately and keep existing queued items paused.
if (isStreaming) { if (isStreaming) {
const queued = queueMessage({ const queued = queueMessage({
prompt: currentInput, prompt: currentInput,
...@@ -561,17 +575,17 @@ export function ChatInput({ chatId }: { chatId?: number }) { ...@@ -561,17 +575,17 @@ export function ChatInput({ chatId }: { chatId?: number }) {
}; };
const handleCancel = () => { const handleCancel = () => {
// Clear all queued messages first, BEFORE the IPC call, to ensure // Only clear the queue if NOT paused
// the queue is empty even if the backend response arrives quickly. if (!isPaused) {
// This prevents race conditions where the queue-processing effect clearAllQueuedMessages();
// could potentially run if the backend responds before queue clearing. }
clearAllQueuedMessages(); // Always reset editing state when cancelling, regardless of pause state
// Reset editing state so the "Editing queued message" banner is dismissed
// and restored attachments/components are cleared
if (editingQueuedMessageId) { if (editingQueuedMessageId) {
resetEditingState(); resetEditingState();
} }
// Do NOT reset pause state here; queued messages should remain paused after stopping
if (chatId) { if (chatId) {
clearCompletionFlag();
ipc.chat.cancelStream(chatId); ipc.chat.cancelStream(chatId);
} }
setIsStreaming(false); setIsStreaming(false);
...@@ -760,6 +774,9 @@ export function ChatInput({ chatId }: { chatId?: number }) { ...@@ -760,6 +774,9 @@ export function ChatInput({ chatId }: { chatId?: number }) {
onMoveDown={handleMoveDown} onMoveDown={handleMoveDown}
isStreaming={isStreaming} isStreaming={isStreaming}
hasError={!!error} hasError={!!error}
isPaused={isPaused}
onPauseQueue={pauseQueue}
onResumeQueue={resumeQueue}
/> />
)} )}
{/* Show editing indicator when editing a queued message */} {/* Show editing indicator when editing a queued message */}
......
...@@ -9,6 +9,8 @@ import { ...@@ -9,6 +9,8 @@ import {
ArrowUp, ArrowUp,
ArrowDown, ArrowDown,
Paperclip, Paperclip,
PlayIcon,
PauseIcon,
} from "lucide-react"; } from "lucide-react";
import { cn } from "@/lib/utils"; import { cn } from "@/lib/utils";
...@@ -20,6 +22,9 @@ interface QueuedMessagesListProps { ...@@ -20,6 +22,9 @@ interface QueuedMessagesListProps {
onMoveDown: (id: string) => void; onMoveDown: (id: string) => void;
isStreaming: boolean; isStreaming: boolean;
hasError: boolean; hasError: boolean;
isPaused: boolean;
onPauseQueue: () => void;
onResumeQueue: () => void;
} }
interface QueuedMessageItemRowProps { interface QueuedMessageItemRowProps {
...@@ -106,6 +111,9 @@ export function QueuedMessagesList({ ...@@ -106,6 +111,9 @@ export function QueuedMessagesList({
onMoveDown, onMoveDown,
isStreaming, isStreaming,
hasError, hasError,
isPaused,
onPauseQueue,
onResumeQueue,
}: QueuedMessagesListProps) { }: QueuedMessagesListProps) {
const [isExpanded, setIsExpanded] = useState(true); const [isExpanded, setIsExpanded] = useState(true);
...@@ -118,25 +126,70 @@ export function QueuedMessagesList({ ...@@ -118,25 +126,70 @@ export function QueuedMessagesList({
: "ready to send"; : "ready to send";
return ( return (
<div className="border-b border-border bg-muted/30"> <div
<button data-testid="queue-header"
type="button" className={cn(
onClick={() => setIsExpanded(!isExpanded)} "border-b border-border bg-muted/30",
className="w-full flex items-center justify-between px-3 py-2 hover:bg-muted/50 transition-colors" isPaused && "bg-yellow-500/10 border-yellow-500/50",
> )}
<div className="flex items-center gap-2.5 min-w-0 flex-1"> >
<div className="w-full flex items-center justify-between px-3 py-2">
{/* Make left header area clickable for expand/collapse */}
<div
role="button"
tabIndex={0}
onKeyDown={(e) => {
if (e.key === "Enter" || e.key === " ") {
e.preventDefault();
setIsExpanded((v) => !v);
}
}}
className="flex items-center gap-2.5 min-w-0 flex-1 cursor-pointer select-none focus-visible:outline-none focus-visible:ring-2 focus-visible:ring-ring rounded-sm"
onClick={() => setIsExpanded((v) => !v)}
title={isExpanded ? "Collapse" : "Expand"}
>
<ListOrdered className="w-4 h-4 text-muted-foreground flex-shrink-0" /> <ListOrdered className="w-4 h-4 text-muted-foreground flex-shrink-0" />
<span className="text-sm">{messages.length} Queued</span> <span className="text-sm">{messages.length} Queued</span>
<span className="text-xs text-muted-foreground">- {statusText}</span> {!isPaused && (
<span className="text-xs text-muted-foreground">
- {statusText}
</span>
)}
{isPaused && (
<span className="text-xs px-2 py-0.5 rounded bg-yellow-500/20 text-yellow-700 dark:text-yellow-400 font-medium">
Paused
</span>
)}
</div> </div>
<div className="flex items-center gap-2 flex-shrink-0 ml-3"> <div className="flex items-center gap-2 flex-shrink-0 ml-3">
{isExpanded ? ( <button
<ChevronUp className="w-4 h-4 text-muted-foreground" /> type="button"
) : ( onClick={isPaused ? onResumeQueue : onPauseQueue}
<ChevronDown className="w-4 h-4 text-muted-foreground" /> aria-label={isPaused ? "Resume queue" : "Pause queue"}
)} title={isPaused ? "Resume queue" : "Pause queue"}
className={cn(
"ml-2 px-2 py-1 rounded-lg transition-colors duration-150 cursor-pointer",
isPaused
? "bg-yellow-500/20 text-yellow-700 dark:text-yellow-400 hover:bg-yellow-500/30"
: "text-muted-foreground hover:text-primary hover:bg-muted/50",
)}
>
{isPaused ? <PlayIcon size={18} /> : <PauseIcon size={18} />}
</button>
<button
type="button"
onClick={() => setIsExpanded(!isExpanded)}
className="p-1 hover:bg-muted rounded cursor-pointer"
title={isExpanded ? "Collapse" : "Expand"}
>
{isExpanded ? (
<ChevronUp className="w-4 h-4 text-muted-foreground" />
) : (
<ChevronDown className="w-4 h-4 text-muted-foreground" />
)}
</button>
</div> </div>
</button> </div>
<div <div
className="grid transition-[grid-template-rows] duration-200 ease-in-out" className="grid transition-[grid-template-rows] duration-200 ease-in-out"
......
...@@ -3,6 +3,8 @@ import { useAtom } from "jotai"; ...@@ -3,6 +3,8 @@ import { useAtom } from "jotai";
import { import {
queuedMessagesByIdAtom, queuedMessagesByIdAtom,
streamCompletedSuccessfullyByIdAtom, streamCompletedSuccessfullyByIdAtom,
queuePausedByIdAtom,
isStreamingByIdAtom,
type QueuedMessageItem, type QueuedMessageItem,
} from "@/atoms/chatAtoms"; } from "@/atoms/chatAtoms";
import { useStreamChat } from "./useStreamChat"; import { useStreamChat } from "./useStreamChat";
...@@ -20,6 +22,8 @@ export function useQueueProcessor() { ...@@ -20,6 +22,8 @@ export function useQueueProcessor() {
); );
const [streamCompletedSuccessfullyById, setStreamCompletedSuccessfullyById] = const [streamCompletedSuccessfullyById, setStreamCompletedSuccessfullyById] =
useAtom(streamCompletedSuccessfullyByIdAtom); useAtom(streamCompletedSuccessfullyByIdAtom);
const [queuePausedById] = useAtom(queuePausedByIdAtom);
const [isStreamingById] = useAtom(isStreamingByIdAtom);
const posthog = usePostHog(); const posthog = usePostHog();
const { settings } = useSettings(); const { settings } = useSettings();
...@@ -28,8 +32,16 @@ export function useQueueProcessor() { ...@@ -28,8 +32,16 @@ export function useQueueProcessor() {
for (const [chatId, queuedMessages] of queuedMessagesById) { for (const [chatId, queuedMessages] of queuedMessagesById) {
if (queuedMessages.length === 0) continue; if (queuedMessages.length === 0) continue;
const isPaused = queuePausedById.get(chatId) ?? false;
if (isPaused) continue;
const isStreaming = isStreamingById.get(chatId) ?? false;
// Never dequeue while a stream is active for this chat
if (isStreaming) continue;
const completedSuccessfully = const completedSuccessfully =
streamCompletedSuccessfullyById.get(chatId) ?? false; streamCompletedSuccessfullyById.get(chatId) ?? false;
// Only dequeue if the previous stream completed successfully
if (!completedSuccessfully) continue; if (!completedSuccessfully) continue;
// Clear the successful completion flag first to prevent loops // Clear the successful completion flag first to prevent loops
...@@ -74,6 +86,8 @@ export function useQueueProcessor() { ...@@ -74,6 +86,8 @@ export function useQueueProcessor() {
}, [ }, [
queuedMessagesById, queuedMessagesById,
streamCompletedSuccessfullyById, streamCompletedSuccessfullyById,
queuePausedById,
isStreamingById,
streamMessage, streamMessage,
setQueuedMessagesById, setQueuedMessagesById,
setStreamCompletedSuccessfullyById, setStreamCompletedSuccessfullyById,
......
...@@ -13,6 +13,7 @@ import { ...@@ -13,6 +13,7 @@ import {
recentStreamChatIdsAtom, recentStreamChatIdsAtom,
queuedMessagesByIdAtom, queuedMessagesByIdAtom,
streamCompletedSuccessfullyByIdAtom, streamCompletedSuccessfullyByIdAtom,
queuePausedByIdAtom,
type QueuedMessageItem, type QueuedMessageItem,
} from "@/atoms/chatAtoms"; } from "@/atoms/chatAtoms";
import { ipc } from "@/ipc/types"; import { ipc } from "@/ipc/types";
...@@ -69,6 +70,8 @@ export function useStreamChat({ ...@@ -69,6 +70,8 @@ export function useStreamChat({
const setStreamCompletedSuccessfullyById = useSetAtom( const setStreamCompletedSuccessfullyById = useSetAtom(
streamCompletedSuccessfullyByIdAtom, streamCompletedSuccessfullyByIdAtom,
); );
const queuePausedById = useAtomValue(queuePausedByIdAtom);
const setQueuePausedById = useSetAtom(queuePausedByIdAtom);
const posthog = usePostHog(); const posthog = usePostHog();
const queryClient = useQueryClient(); const queryClient = useQueryClient();
...@@ -402,34 +405,10 @@ export function useStreamChat({ ...@@ -402,34 +405,10 @@ export function useStreamChat({
], ],
); );
return { // Memoize queue management functions to prevent unnecessary re-renders
streamMessage, // in components that depend on these functions (e.g., restore effect)
isStreaming: const queueMessage = useCallback(
hasChatId && chatId !== undefined (message: Omit<QueuedMessageItem, "id">): boolean => {
? (isStreamingById.get(chatId) ?? false)
: false,
error:
hasChatId && chatId !== undefined
? (errorById.get(chatId) ?? null)
: null,
setError: (value: string | null) =>
setErrorById((prev) => {
const next = new Map(prev);
if (chatId !== undefined) next.set(chatId, value);
return next;
}),
setIsStreaming: (value: boolean) =>
setIsStreamingById((prev) => {
const next = new Map(prev);
if (chatId !== undefined) next.set(chatId, value);
return next;
}),
// Multi-message queue support
queuedMessages:
hasChatId && chatId !== undefined
? (queuedMessagesById.get(chatId) ?? [])
: [],
queueMessage: (message: Omit<QueuedMessageItem, "id">): boolean => {
if (chatId === undefined) return false; if (chatId === undefined) return false;
const newItem: QueuedMessageItem = { const newItem: QueuedMessageItem = {
...message, ...message,
...@@ -443,7 +422,11 @@ export function useStreamChat({ ...@@ -443,7 +422,11 @@ export function useStreamChat({
}); });
return true; return true;
}, },
updateQueuedMessage: ( [chatId, setQueuedMessagesById],
);
const updateQueuedMessage = useCallback(
(
id: string, id: string,
updates: Partial< updates: Partial<
Pick<QueuedMessageItem, "prompt" | "attachments" | "selectedComponents"> Pick<QueuedMessageItem, "prompt" | "attachments" | "selectedComponents">
...@@ -460,7 +443,11 @@ export function useStreamChat({ ...@@ -460,7 +443,11 @@ export function useStreamChat({
return next; return next;
}); });
}, },
removeQueuedMessage: (id: string) => { [chatId, setQueuedMessagesById],
);
const removeQueuedMessage = useCallback(
(id: string) => {
if (chatId === undefined) return; if (chatId === undefined) return;
setQueuedMessagesById((prev) => { setQueuedMessagesById((prev) => {
const next = new Map(prev); const next = new Map(prev);
...@@ -474,7 +461,11 @@ export function useStreamChat({ ...@@ -474,7 +461,11 @@ export function useStreamChat({
return next; return next;
}); });
}, },
reorderQueuedMessages: (fromIndex: number, toIndex: number) => { [chatId, setQueuedMessagesById],
);
const reorderQueuedMessages = useCallback(
(fromIndex: number, toIndex: number) => {
if (chatId === undefined) return; if (chatId === undefined) return;
setQueuedMessagesById((prev) => { setQueuedMessagesById((prev) => {
const next = new Map(prev); const next = new Map(prev);
...@@ -493,13 +484,93 @@ export function useStreamChat({ ...@@ -493,13 +484,93 @@ export function useStreamChat({
return next; return next;
}); });
}, },
clearAllQueuedMessages: () => { [chatId, setQueuedMessagesById],
);
const clearAllQueuedMessages = useCallback(() => {
if (chatId === undefined) return;
setQueuedMessagesById((prev) => {
const next = new Map(prev);
next.delete(chatId);
return next;
});
}, [chatId, setQueuedMessagesById]);
return {
streamMessage,
isStreaming:
hasChatId && chatId !== undefined
? (isStreamingById.get(chatId) ?? false)
: false,
error:
hasChatId && chatId !== undefined
? (errorById.get(chatId) ?? null)
: null,
setError: (value: string | null) =>
setErrorById((prev) => {
const next = new Map(prev);
if (chatId !== undefined) next.set(chatId, value);
return next;
}),
setIsStreaming: (value: boolean) =>
setIsStreamingById((prev) => {
const next = new Map(prev);
if (chatId !== undefined) next.set(chatId, value);
return next;
}),
// Multi-message queue support
queuedMessages:
hasChatId && chatId !== undefined
? (queuedMessagesById.get(chatId) ?? [])
: [],
queueMessage,
updateQueuedMessage,
removeQueuedMessage,
reorderQueuedMessages,
clearAllQueuedMessages,
isPaused:
hasChatId && chatId !== undefined
? (queuePausedById.get(chatId) ?? false)
: false,
pauseQueue: useCallback(() => {
if (chatId === undefined) return; if (chatId === undefined) return;
setQueuedMessagesById((prev) => { setQueuePausedById((prev) => {
const next = new Map(prev);
next.set(chatId, true);
return next;
});
}, [chatId, setQueuePausedById]),
clearPauseOnly: useCallback(() => {
if (chatId === undefined) return;
setQueuePausedById((prev) => {
const next = new Map(prev);
next.set(chatId, false);
return next;
});
}, [chatId, setQueuePausedById]),
resumeQueue: useCallback(() => {
if (chatId === undefined) return;
setQueuePausedById((prev) => {
const next = new Map(prev);
next.set(chatId, false);
return next;
});
// Signal the queue processor if we're not currently streaming
if (!pendingStreamChatIds.has(chatId)) {
setStreamCompletedSuccessfullyById((prev) => {
const next = new Map(prev);
next.set(chatId, true);
return next;
});
}
}, [chatId, setQueuePausedById, setStreamCompletedSuccessfullyById]),
clearCompletionFlag: useCallback(() => {
if (chatId === undefined) return;
setStreamCompletedSuccessfullyById((prev) => {
const next = new Map(prev); const next = new Map(prev);
next.delete(chatId); next.delete(chatId);
return next; return next;
}); });
}, }, [chatId, setStreamCompletedSuccessfullyById]),
}; };
} }
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论