Skip to content

Commit 75a0a9f

Browse files
authored
Merge pull request #2492 from simonferquel-clanker/fix/steer-queue-top-of-turn-drain
fix(runtime): drain steerQueue at top of RunStream loop to close idle-window race
2 parents 77abf88 + 584d8b7 commit 75a0a9f

2 files changed

Lines changed: 417 additions & 11 deletions

File tree

pkg/runtime/loop.go

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,12 @@ func (r *LocalRuntime) registerDefaultTools() {
4141
})
4242
}
4343

44+
// appendSteerAndEmit adds a steer message to the session and emits the corresponding event.
45+
func (r *LocalRuntime) appendSteerAndEmit(sess *session.Session, sm QueuedMessage, events chan<- Event) {
46+
sess.AddMessage(session.UserMessage(sm.Content, sm.MultiContent...))
47+
events <- UserMessage(sm.Content, sess.ID, sm.MultiContent, len(sess.Messages)-1)
48+
}
49+
4450
// finalizeEventChannel performs cleanup at the end of a RunStream goroutine:
4551
// restores the previous elicitation channel, emits the StreamStopped event,
4652
// fires hooks, and closes the events channel.
@@ -294,6 +300,16 @@ func (r *LocalRuntime) RunStream(ctx context.Context, sess *session.Session) <-c
294300
}
295301
}
296302

303+
// Drain steer messages queued while idle or before the first model call
304+
// (covers idle-window and first-turn-miss races).
305+
if steered := r.steerQueue.Drain(ctx); len(steered) > 0 {
306+
messageCountBeforeSteer := len(sess.GetAllMessages())
307+
for _, sm := range steered {
308+
r.appendSteerAndEmit(sess, sm, events)
309+
}
310+
r.compactIfNeeded(ctx, sess, a, m, contextLimit, messageCountBeforeSteer, events)
311+
}
312+
297313
messages := sess.GetMessages(a)
298314
slog.Debug("Retrieved messages for processing", "agent", a.Name(), "message_count", len(messages))
299315

@@ -418,19 +434,10 @@ func (r *LocalRuntime) RunStream(ctx context.Context, sess *session.Session) <-c
418434
// Record per-toolset model override for the next LLM turn.
419435
toolModelOverride = resolveToolCallModelOverride(res.Calls, agentTools)
420436

421-
// --- STEERING: mid-turn injection ---
422-
// Drain ALL pending steer messages. These are urgent course-
423-
// corrections that the model should see on the very next
424-
// iteration, wrapped in <system-reminder> tags.
437+
// Drain steer messages that arrived during tool calls.
425438
if steered := r.steerQueue.Drain(ctx); len(steered) > 0 {
426439
for _, sm := range steered {
427-
wrapped := fmt.Sprintf(
428-
"<system-reminder>\nThe user sent the following message while you were working:\n%s\n\nPlease address this in your next response while continuing with your current tasks.\n</system-reminder>",
429-
sm.Content,
430-
)
431-
userMsg := session.UserMessage(wrapped, sm.MultiContent...)
432-
sess.AddMessage(userMsg)
433-
events <- UserMessage(sm.Content, sess.ID, sm.MultiContent, len(sess.Messages)-1)
440+
r.appendSteerAndEmit(sess, sm, events)
434441
}
435442

436443
r.compactIfNeeded(ctx, sess, a, m, contextLimit, messageCountBeforeTools, events)
@@ -441,6 +448,15 @@ func (r *LocalRuntime) RunStream(ctx context.Context, sess *session.Session) <-c
441448
slog.Debug("Conversation stopped", "agent", a.Name())
442449
r.executeStopHooks(ctx, sess, a, res.Content, events)
443450

451+
// Re-check steer queue: closes the race between the mid-loop drain and this stop.
452+
if steered := r.steerQueue.Drain(ctx); len(steered) > 0 {
453+
for _, sm := range steered {
454+
r.appendSteerAndEmit(sess, sm, events)
455+
}
456+
r.compactIfNeeded(ctx, sess, a, m, contextLimit, messageCountBeforeTools, events)
457+
continue
458+
}
459+
444460
// --- FOLLOW-UP: end-of-turn injection ---
445461
// Pop exactly one follow-up message. Unlike steered
446462
// messages, follow-ups are plain user messages that start

0 commit comments

Comments
 (0)