Skip to content

Commit d6c1bc2

Browse files
v0.6.58: queue abort state machine improvement, contributing guide
2 parents: 595c4c3 + d93a6f5 · commit d6c1bc2

10 files changed

Lines changed: 453 additions & 112 deletions

File tree

.github/CONTRIBUTING.md

Lines changed: 166 additions & 93 deletions
Large diffs are not rendered by default.

apps/sim/lib/copilot/request/go/stream.test.ts

Lines changed: 143 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -10,13 +10,24 @@ import {
1010
MothershipStreamV1ToolOutcome,
1111
MothershipStreamV1ToolPhase,
1212
} from '@/lib/copilot/generated/mothership-stream-v1'
13+
14+
vi.mock('@/lib/copilot/request/session', async () => {
15+
const actual = await vi.importActual<typeof import('@/lib/copilot/request/session')>(
16+
'@/lib/copilot/request/session'
17+
)
18+
return {
19+
...actual,
20+
hasAbortMarker: vi.fn().mockResolvedValue(false),
21+
}
22+
})
23+
1324
import {
1425
buildPreviewContentUpdate,
1526
decodeJsonStringPrefix,
1627
extractEditContent,
1728
runStreamLoop,
1829
} from '@/lib/copilot/request/go/stream'
19-
import { createEvent } from '@/lib/copilot/request/session'
30+
import { AbortReason, createEvent, hasAbortMarker } from '@/lib/copilot/request/session'
2031
import { RequestTraceV1Outcome, TraceCollector } from '@/lib/copilot/request/trace'
2132
import type { ExecutionContext, StreamingContext } from '@/lib/copilot/request/types'
2233

@@ -285,6 +296,137 @@ describe('copilot go stream helpers', () => {
285296
).toBe(true)
286297
})
287298

299+
it('reclassifies as aborted when the body closes without terminal but the abort marker is set', async () => {
300+
const textEvent = createEvent({
301+
streamId: 'stream-1',
302+
cursor: '1',
303+
seq: 1,
304+
requestId: 'req-1',
305+
type: MothershipStreamV1EventType.text,
306+
payload: {
307+
channel: 'assistant',
308+
text: 'partial response',
309+
},
310+
})
311+
312+
vi.mocked(fetch).mockResolvedValueOnce(createSseResponse([textEvent]))
313+
vi.mocked(hasAbortMarker).mockResolvedValueOnce(true)
314+
315+
const context = createStreamingContext()
316+
const execContext: ExecutionContext = {
317+
userId: 'user-1',
318+
workflowId: 'workflow-1',
319+
}
320+
321+
await runStreamLoop('https://example.com/mothership/stream', {}, context, execContext, {
322+
timeout: 1000,
323+
})
324+
325+
expect(hasAbortMarker).toHaveBeenCalledWith(context.messageId)
326+
expect(context.wasAborted).toBe(true)
327+
expect(
328+
context.errors.some((message) =>
329+
message.includes('Copilot backend stream ended before a terminal event')
330+
)
331+
).toBe(false)
332+
})
333+
334+
it('invokes onAbortObserved with MarkerObservedAtBodyClose when reclassifying via the abort marker', async () => {
335+
const textEvent = createEvent({
336+
streamId: 'stream-1',
337+
cursor: '1',
338+
seq: 1,
339+
requestId: 'req-1',
340+
type: MothershipStreamV1EventType.text,
341+
payload: {
342+
channel: 'assistant',
343+
text: 'partial response',
344+
},
345+
})
346+
347+
vi.mocked(fetch).mockResolvedValueOnce(createSseResponse([textEvent]))
348+
vi.mocked(hasAbortMarker).mockResolvedValueOnce(true)
349+
350+
const context = createStreamingContext()
351+
const execContext: ExecutionContext = {
352+
userId: 'user-1',
353+
workflowId: 'workflow-1',
354+
}
355+
const onAbortObserved = vi.fn()
356+
357+
await runStreamLoop('https://example.com/mothership/stream', {}, context, execContext, {
358+
timeout: 1000,
359+
onAbortObserved,
360+
})
361+
362+
expect(onAbortObserved).toHaveBeenCalledTimes(1)
363+
expect(onAbortObserved).toHaveBeenCalledWith(AbortReason.MarkerObservedAtBodyClose)
364+
expect(context.wasAborted).toBe(true)
365+
})
366+
367+
it('does not invoke onAbortObserved when no abort marker is present at body close', async () => {
368+
const textEvent = createEvent({
369+
streamId: 'stream-1',
370+
cursor: '1',
371+
seq: 1,
372+
requestId: 'req-1',
373+
type: MothershipStreamV1EventType.text,
374+
payload: {
375+
channel: 'assistant',
376+
text: 'partial response',
377+
},
378+
})
379+
380+
vi.mocked(fetch).mockResolvedValueOnce(createSseResponse([textEvent]))
381+
vi.mocked(hasAbortMarker).mockResolvedValueOnce(false)
382+
383+
const context = createStreamingContext()
384+
const execContext: ExecutionContext = {
385+
userId: 'user-1',
386+
workflowId: 'workflow-1',
387+
}
388+
const onAbortObserved = vi.fn()
389+
390+
await expect(
391+
runStreamLoop('https://example.com/mothership/stream', {}, context, execContext, {
392+
timeout: 1000,
393+
onAbortObserved,
394+
})
395+
).rejects.toThrow('Copilot backend stream ended before a terminal event')
396+
397+
expect(onAbortObserved).not.toHaveBeenCalled()
398+
})
399+
400+
it('still fails closed when the body closes without terminal and the abort marker check throws', async () => {
401+
const textEvent = createEvent({
402+
streamId: 'stream-1',
403+
cursor: '1',
404+
seq: 1,
405+
requestId: 'req-1',
406+
type: MothershipStreamV1EventType.text,
407+
payload: {
408+
channel: 'assistant',
409+
text: 'partial response',
410+
},
411+
})
412+
413+
vi.mocked(fetch).mockResolvedValueOnce(createSseResponse([textEvent]))
414+
vi.mocked(hasAbortMarker).mockRejectedValueOnce(new Error('redis unavailable'))
415+
416+
const context = createStreamingContext()
417+
const execContext: ExecutionContext = {
418+
userId: 'user-1',
419+
workflowId: 'workflow-1',
420+
}
421+
422+
await expect(
423+
runStreamLoop('https://example.com/mothership/stream', {}, context, execContext, {
424+
timeout: 1000,
425+
})
426+
).rejects.toThrow('Copilot backend stream ended before a terminal event')
427+
expect(context.wasAborted).toBe(false)
428+
})
429+
288430
it('fails closed when the shared stream receives an invalid event', async () => {
289431
vi.mocked(fetch).mockResolvedValueOnce(
290432
createSseResponse([

apps/sim/lib/copilot/request/go/stream.ts

Lines changed: 28 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -30,7 +30,9 @@ import {
3030
} from '@/lib/copilot/request/handlers/types'
3131
import { getCopilotTracer } from '@/lib/copilot/request/otel'
3232
import {
33+
AbortReason,
3334
eventToStreamEvent,
35+
hasAbortMarker,
3436
isSubagentSpanStreamEvent,
3537
parsePersistedStreamEventEnvelope,
3638
} from '@/lib/copilot/request/session'
@@ -436,16 +438,32 @@ export async function runStreamLoop(
436438
})
437439

438440
if (!context.streamComplete && !abortSignal?.aborted && !context.wasAborted) {
439-
const streamPath = new URL(fetchUrl).pathname
440-
const message = `Copilot backend stream ended before a terminal event on ${streamPath}`
441-
context.errors.push(message)
442-
logger.error('Copilot backend stream ended before a terminal event', {
443-
path: streamPath,
444-
requestId: context.requestId,
445-
messageId: context.messageId,
446-
})
447-
endedOn = CopilotSseCloseReason.ClosedNoTerminal
448-
throw new CopilotBackendError(message, { status: 503 })
441+
let abortRequested = false
442+
try {
443+
abortRequested = await hasAbortMarker(context.messageId)
444+
} catch (error) {
445+
logger.warn('Failed to read abort marker at body close', {
446+
streamId: context.messageId,
447+
error: error instanceof Error ? error.message : String(error),
448+
})
449+
}
450+
451+
if (abortRequested) {
452+
options.onAbortObserved?.(AbortReason.MarkerObservedAtBodyClose)
453+
context.wasAborted = true
454+
endedOn = CopilotSseCloseReason.Aborted
455+
} else {
456+
const streamPath = new URL(fetchUrl).pathname
457+
const message = `Copilot backend stream ended before a terminal event on ${streamPath}`
458+
context.errors.push(message)
459+
logger.error('Copilot backend stream ended before a terminal event', {
460+
path: streamPath,
461+
requestId: context.requestId,
462+
messageId: context.messageId,
463+
})
464+
endedOn = CopilotSseCloseReason.ClosedNoTerminal
465+
throw new CopilotBackendError(message, { status: 503 })
466+
}
449467
}
450468
} catch (error) {
451469
if (error instanceof FatalSseEventError && !context.errors.includes(error.message)) {

apps/sim/lib/copilot/request/lifecycle/headless.ts

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -53,10 +53,10 @@ export async function runHeadlessCopilotLifecycle(
5353
simRequestId,
5454
otelContext,
5555
})
56-
outcome = options.abortSignal?.aborted
57-
? RequestTraceV1Outcome.cancelled
58-
: result.success
59-
? RequestTraceV1Outcome.success
56+
outcome = result.success
57+
? RequestTraceV1Outcome.success
58+
: options.abortSignal?.aborted || result.cancelled
59+
? RequestTraceV1Outcome.cancelled
6060
: RequestTraceV1Outcome.error
6161
return result
6262
} catch (error) {

apps/sim/lib/copilot/request/lifecycle/start.test.ts

Lines changed: 45 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -6,7 +6,10 @@ import { propagation, trace } from '@opentelemetry/api'
66
import { W3CTraceContextPropagator } from '@opentelemetry/core'
77
import { BasicTracerProvider } from '@opentelemetry/sdk-trace-base'
88
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
9-
import { MothershipStreamV1EventType } from '@/lib/copilot/generated/mothership-stream-v1'
9+
import {
10+
MothershipStreamV1CompletionStatus,
11+
MothershipStreamV1EventType,
12+
} from '@/lib/copilot/generated/mothership-stream-v1'
1013

1114
const {
1215
runCopilotLifecycle,
@@ -60,6 +63,7 @@ vi.mock('@/lib/copilot/request/session', () => ({
6063
registerActiveStream: vi.fn(),
6164
unregisterActiveStream: vi.fn(),
6265
startAbortPoller: vi.fn().mockReturnValue(setInterval(() => {}, 999999)),
66+
isExplicitStopReason: vi.fn().mockReturnValue(false),
6367
SSE_RESPONSE_HEADERS: {},
6468
StreamWriter: vi.fn().mockImplementation(() => ({
6569
attach: vi.fn().mockImplementation((ctrl: ReadableStreamDefaultController) => {
@@ -211,6 +215,46 @@ describe('createSSEStream terminal error handling', () => {
211215
expect(scheduleBufferCleanup).toHaveBeenCalledWith('stream-1')
212216
})
213217

218+
it('publishes a cancelled completion (not an error) when the orchestrator reports cancelled without abortSignal aborted', async () => {
219+
runCopilotLifecycle.mockResolvedValue({
220+
success: false,
221+
cancelled: true,
222+
content: '',
223+
contentBlocks: [],
224+
toolCalls: [],
225+
})
226+
227+
const stream = createSSEStream({
228+
requestPayload: { message: 'hello' },
229+
userId: 'user-1',
230+
streamId: 'stream-1',
231+
executionId: 'exec-1',
232+
runId: 'run-1',
233+
currentChat: null,
234+
isNewChat: false,
235+
message: 'hello',
236+
titleModel: 'gpt-5.4',
237+
requestId: 'req-cancelled',
238+
orchestrateOptions: {},
239+
})
240+
241+
await drainStream(stream)
242+
243+
expect(appendEvent).not.toHaveBeenCalledWith(
244+
expect.objectContaining({
245+
type: MothershipStreamV1EventType.error,
246+
})
247+
)
248+
expect(appendEvent).toHaveBeenCalledWith(
249+
expect.objectContaining({
250+
type: MothershipStreamV1EventType.complete,
251+
payload: expect.objectContaining({
252+
status: MothershipStreamV1CompletionStatus.cancelled,
253+
}),
254+
})
255+
)
256+
})
257+
214258
it('passes an OTel context into the streaming lifecycle', async () => {
215259
let lifecycleTraceparent = ''
216260
runCopilotLifecycle.mockImplementation(async (_payload, options) => {

apps/sim/lib/copilot/request/lifecycle/start.ts

Lines changed: 6 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -249,6 +249,11 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS
249249
onEvent: async (event) => {
250250
await publisher.publish(event)
251251
},
252+
onAbortObserved: (reason) => {
253+
if (!abortController.signal.aborted) {
254+
abortController.abort(reason)
255+
}
256+
},
252257
})
253258

254259
lifecycleResult = result
@@ -266,7 +271,7 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS
266271
// 3. Otherwise → error.
267272
outcome = result.success
268273
? RequestTraceV1Outcome.success
269-
: abortController.signal.aborted || publisher.clientDisconnected
274+
: result.cancelled || abortController.signal.aborted || publisher.clientDisconnected
270275
? RequestTraceV1Outcome.cancelled
271276
: RequestTraceV1Outcome.error
272277
if (outcome === RequestTraceV1Outcome.cancelled) {

apps/sim/lib/copilot/request/session/abort-reason.ts

Lines changed: 11 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -22,6 +22,12 @@ export const AbortReason = {
2222
* that the node that DID receive it wrote, and aborts on the poll.
2323
*/
2424
RedisPoller: 'redis_abort_marker:poller',
25+
/**
26+
* Cross-process stop: same root cause as `RedisPoller`, but observed
27+
* by `runStreamLoop` at body close (the Go body ended before the
28+
* 250ms poller's next tick) rather than by the polling timer.
29+
*/
30+
MarkerObservedAtBodyClose: 'redis_abort_marker:body_close',
2531
/** Internal timeout on the outbound explicit-abort fetch to Go. */
2632
ExplicitAbortFetchTimeout: 'timeout:go_explicit_abort_fetch',
2733
} as const
@@ -38,5 +44,9 @@ export type AbortReasonValue = (typeof AbortReason)[keyof typeof AbortReason]
3844
* stops, mirroring `requestctx.IsExplicitUserStop` on the Go side.
3945
*/
4046
export function isExplicitStopReason(reason: unknown): boolean {
41-
return reason === AbortReason.UserStop || reason === AbortReason.RedisPoller
47+
return (
48+
reason === AbortReason.UserStop ||
49+
reason === AbortReason.RedisPoller ||
50+
reason === AbortReason.MarkerObservedAtBodyClose
51+
)
4252
}

apps/sim/lib/copilot/request/session/abort.test.ts

Lines changed: 41 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -98,6 +98,47 @@ describe('startAbortPoller heartbeat', () => {
9898
}
9999
})
100100

101+
it('aborts the controller before clearing the marker so the marker is never observable as cleared while the signal is still unaborted', async () => {
102+
const controller = new AbortController()
103+
const streamId = 'stream-order-1'
104+
105+
let signalAbortedWhenMarkerCleared: boolean | null = null
106+
mockClearAbortMarker.mockImplementationOnce(async () => {
107+
signalAbortedWhenMarkerCleared = controller.signal.aborted
108+
})
109+
mockHasAbortMarker.mockResolvedValueOnce(true)
110+
111+
const interval = startAbortPoller(streamId, controller, {})
112+
113+
try {
114+
await vi.advanceTimersByTimeAsync(300)
115+
116+
expect(mockClearAbortMarker).toHaveBeenCalledWith(streamId)
117+
expect(signalAbortedWhenMarkerCleared).toBe(true)
118+
expect(controller.signal.aborted).toBe(true)
119+
} finally {
120+
clearInterval(interval)
121+
}
122+
})
123+
124+
it('does not clear the marker when the signal is already aborted (no double abort)', async () => {
125+
const controller = new AbortController()
126+
controller.abort('preexisting')
127+
const streamId = 'stream-order-2'
128+
129+
mockHasAbortMarker.mockResolvedValueOnce(true)
130+
131+
const interval = startAbortPoller(streamId, controller, {})
132+
133+
try {
134+
await vi.advanceTimersByTimeAsync(300)
135+
136+
expect(mockClearAbortMarker).not.toHaveBeenCalled()
137+
} finally {
138+
clearInterval(interval)
139+
}
140+
})
141+
101142
it('stops heartbeating after ownership is lost', async () => {
102143
const controller = new AbortController()
103144
const streamId = 'stream-lost'

0 commit comments

Comments
 (0)