@@ -3,23 +3,30 @@ import type { Neuri, NeuriContext } from 'neuri'
33
44import type { EventManager } from '../perception/event-manager'
55import type { BotEvent , MineflayerWithAgents , UserIntentPayload } from '../types'
6+ import type { CancellationToken } from './task-state'
67
78import { withRetry } from '@moeru/std'
89import { system , user } from 'neuri/openai'
910
11+ import { DebugServer } from '../../debug-server'
1012import { handleLLMCompletion } from './completion'
1113import { generateStatusPrompt } from './prompt'
14+ import { TaskManager } from './task-manager'
1215
1316export class Orchestrator {
14- private isProcessing = false
17+ private taskManager : TaskManager
18+ private eventQueue : Array < BotEvent < UserIntentPayload > > = [ ]
19+ private isProcessingQueue = false
1520
  constructor(
    private readonly deps: {
      eventManager: EventManager
      neuri: Neuri
      logger: Logg
    },
  ) {
    // Task lifecycle (creation, cancellation, completion, history) is
    // delegated to TaskManager; it reuses the orchestrator's logger.
    this.taskManager = new TaskManager(deps.logger)
  }
2330
2431 public init ( bot : MineflayerWithAgents ) : void {
2532 this . deps . eventManager . on < UserIntentPayload > ( 'user_intent' , async ( event ) => {
@@ -38,54 +45,49 @@ export class Orchestrator {
3845 return
3946 }
4047
41- if ( this . isProcessing ) {
42- this . deps . logger . warn ( 'Still processing previous intent, skipping or queuing (TBD)' )
43- // For now, let's just abort or we could queue. Implementation plan said we'd decide.
44- // Let's implement a simple queue or at least a lock.
45- return
48+ // Check if there's a current task
49+ if ( this . taskManager . hasCurrentTask ( ) ) {
50+ // Determine if we should cancel current task or queue this event
51+ if ( this . shouldCancelCurrentTask ( event ) ) {
52+ this . deps . logger
53+ . withFields ( { currentTaskId : this . taskManager . getCurrentTask ( ) ?. id , priority : event . priority } )
54+ . log ( 'Orchestrator: Cancelling current task for high-priority event' )
55+ this . taskManager . cancelCurrentTask ( 'High-priority event received' )
56+ this . broadcastTaskStatus ( )
57+ }
58+ else {
59+ // Queue the event for later processing
60+ this . eventQueue . push ( event )
61+ this . deps . logger
62+ . withFields ( { queueSize : this . eventQueue . length , username, event : content } )
63+ . log ( 'Orchestrator: Event queued' )
64+ this . broadcastTaskStatus ( )
65+
66+ // Notify user that we're busy
67+ const busyMessage = 'I\'m busy right now, I\'ll get to that in a moment!'
68+ if ( source . reply ) {
69+ source . reply ( busyMessage )
70+ }
71+ else {
72+ bot . bot . chat ( busyMessage )
73+ }
74+ return
75+ }
4676 }
4777
48- this . isProcessing = true
49- this . deps . logger . withFields ( { username, content } ) . log ( 'Orchestrator: Handling user intent' )
78+ // Create new task
79+ const task = this . taskManager . createTask ( content )
80+ this . deps . logger
81+ . withFields ( { username, content, taskId : task . id } )
82+ . log ( 'Orchestrator: Starting new task' )
83+ this . broadcastTaskStatus ( )
5084
5185 try {
5286 // 1. Update memory
5387 bot . memory . chatHistory . push ( user ( `${ username } : ${ content } ` ) )
5488
55- // 2. Planning
56- const plan = await bot . planning . createPlan ( content )
57- this . deps . logger . withFields ( { plan } ) . log ( 'Orchestrator: Plan created' )
58-
59- // 3. Execution
60- await bot . planning . executePlan ( plan )
61- this . deps . logger . log ( 'Orchestrator: Plan executed successfully' )
62-
63- // 4. Response Generation
64- const statusPrompt = await generateStatusPrompt ( bot )
65- const response = await this . deps . neuri . handleStateless (
66- [ ...bot . memory . chatHistory , system ( statusPrompt ) ] ,
67- async ( c : NeuriContext ) => {
68- this . deps . logger . log ( 'Orchestrator: thinking...' )
69- return withRetry < NeuriContext , string > (
70- ctx => handleLLMCompletion ( ctx , bot , this . deps . logger ) ,
71- {
72- retry : 3 ,
73- retryDelay : 1000 ,
74- } ,
75- ) ( c )
76- } ,
77- )
78-
79- // 5. Reply
80- if ( response ) {
81- this . deps . logger . withFields ( { response } ) . log ( 'Orchestrator: Responded' )
82- if ( source . reply ) {
83- source . reply ( response )
84- }
85- else {
86- bot . bot . chat ( response )
87- }
88- }
89+ // 2. Execute task with cancellation support
90+ await this . executeTaskWithCancellation ( bot , event , task . cancellationToken )
8991 }
9092 catch ( error ) {
9193 this . deps . logger . withError ( error ) . warn ( 'Orchestrator: Failed to process intent' )
@@ -98,7 +100,114 @@ export class Orchestrator {
98100 }
99101 }
100102 finally {
101- this . isProcessing = false
103+ this . taskManager . completeCurrentTask ( )
104+ this . broadcastTaskStatus ( )
105+ // Process next queued event
106+ this . processNextQueuedEvent ( bot )
107+ }
108+ }
109+
110+ private async executeTaskWithCancellation (
111+ bot : MineflayerWithAgents ,
112+ event : BotEvent < UserIntentPayload > ,
113+ cancellationToken : CancellationToken ,
114+ ) : Promise < void > {
115+ const { payload, source } = event
116+ const { content } = payload
117+
118+ // Planning phase
119+ if ( cancellationToken . isCancelled )
120+ return
121+ this . taskManager . updateTaskStatus ( 'planning' )
122+ this . broadcastTaskStatus ( )
123+ this . deps . logger . log ( 'Orchestrator: Starting planning phase' )
124+
125+ const plan = await bot . planning . createPlan ( content )
126+ this . taskManager . setTaskPlan ( plan )
127+ this . deps . logger . withFields ( { steps : plan . steps . length } ) . log ( 'Orchestrator: Plan created' )
128+
129+ // Execution phase
130+ if ( cancellationToken . isCancelled )
131+ return
132+ this . taskManager . updateTaskStatus ( 'executing' )
133+ this . broadcastTaskStatus ( )
134+ this . deps . logger . log ( 'Orchestrator: Executing plan' )
135+
136+ await bot . planning . executePlan ( plan , cancellationToken )
137+ this . deps . logger . log ( 'Orchestrator: Plan executed successfully' )
138+
139+ // Response generation phase
140+ if ( cancellationToken . isCancelled )
141+ return
142+ this . taskManager . updateTaskStatus ( 'responding' )
143+ this . broadcastTaskStatus ( )
144+ this . deps . logger . log ( 'Orchestrator: Generating response' )
145+
146+ const statusPrompt = await generateStatusPrompt ( bot )
147+ const taskContext = this . taskManager . getTaskContextForLLM ( )
148+
149+ const response = await this . deps . neuri . handleStateless (
150+ [
151+ ...bot . memory . chatHistory ,
152+ system ( statusPrompt ) ,
153+ system ( `Task Context: ${ taskContext } ` ) ,
154+ ] ,
155+ async ( c : NeuriContext ) => {
156+ this . deps . logger . log ( 'Orchestrator: thinking...' )
157+ return withRetry < NeuriContext , string > (
158+ ctx => handleLLMCompletion ( ctx , bot , this . deps . logger ) ,
159+ {
160+ retry : 3 ,
161+ retryDelay : 1000 ,
162+ } ,
163+ ) ( c )
164+ } ,
165+ )
166+
167+ // Reply
168+ if ( cancellationToken . isCancelled )
169+ return
170+ if ( response ) {
171+ this . deps . logger . withFields ( { response } ) . log ( 'Orchestrator: Responded' )
172+ if ( source . reply ) {
173+ source . reply ( response )
174+ }
175+ else {
176+ bot . bot . chat ( response )
177+ }
178+ }
179+ }
180+
181+ private shouldCancelCurrentTask ( event : BotEvent < UserIntentPayload > ) : boolean {
182+ // High priority events should cancel current task
183+ const HIGH_PRIORITY_THRESHOLD = 8
184+ return ( event . priority ?? 5 ) >= HIGH_PRIORITY_THRESHOLD
185+ }
186+
187+ private async processNextQueuedEvent ( bot : MineflayerWithAgents ) : Promise < void > {
188+ // Prevent concurrent queue processing
189+ if ( this . isProcessingQueue || this . eventQueue . length === 0 ) {
190+ return
191+ }
192+
193+ this . isProcessingQueue = true
194+ const nextEvent = this . eventQueue . shift ( )
195+
196+ if ( nextEvent ) {
197+ this . deps . logger . log ( 'Orchestrator: Processing next queued event' )
198+ await this . handleUserIntent ( bot , nextEvent )
102199 }
200+
201+ this . isProcessingQueue = false
202+ }
203+
204+ private broadcastTaskStatus ( ) : void {
205+ const debugServer = DebugServer . getInstance ( )
206+ debugServer . broadcast ( 'task-status' , {
207+ currentTask : this . taskManager . getCurrentTask ( ) ,
208+ queueSize : this . eventQueue . length ,
209+ queue : this . eventQueue ,
210+ history : this . taskManager . getTaskHistory ( ) ,
211+ } )
103212 }
104213}
0 commit comments