22 ApprovalRequestId ,
33 type ChatAttachment ,
44 type OrchestrationEvent ,
5+ ThreadId ,
56} from "@t3tools/contracts" ;
67import { Effect , FileSystem , Layer , Option , Path , Stream } from "effect" ;
78import * as SqlClient from "effect/unstable/sql/SqlClient" ;
@@ -89,6 +90,77 @@ function extractActivityRequestId(payload: unknown): ApprovalRequestId | null {
8990 return typeof requestId === "string" ? ApprovalRequestId . make ( requestId ) : null ;
9091}
9192
93+ function derivePendingUserInputCountFromActivities (
94+ activities : ReadonlyArray < ProjectionThreadActivity > ,
95+ ) : number {
96+ const openRequestIds = new Set < string > ( ) ;
97+ const ordered = [ ...activities ] . toSorted (
98+ ( left , right ) =>
99+ left . createdAt . localeCompare ( right . createdAt ) ||
100+ left . activityId . localeCompare ( right . activityId ) ,
101+ ) ;
102+
103+ for ( const activity of ordered ) {
104+ const requestId = extractActivityRequestId ( activity . payload ) ;
105+ if ( requestId === null ) {
106+ continue ;
107+ }
108+ const payload =
109+ typeof activity . payload === "object" && activity . payload !== null
110+ ? ( activity . payload as Record < string , unknown > )
111+ : null ;
112+ const detail = typeof payload ?. detail === "string" ? payload . detail . toLowerCase ( ) : null ;
113+
114+ if ( activity . kind === "user-input.requested" ) {
115+ openRequestIds . add ( requestId ) ;
116+ continue ;
117+ }
118+
119+ if ( activity . kind === "user-input.resolved" ) {
120+ openRequestIds . delete ( requestId ) ;
121+ continue ;
122+ }
123+
124+ if (
125+ activity . kind === "provider.user-input.respond.failed" &&
126+ detail !== null &&
127+ ( detail . includes ( "stale pending user-input request" ) ||
128+ detail . includes ( "unknown pending user-input request" ) )
129+ ) {
130+ openRequestIds . delete ( requestId ) ;
131+ }
132+ }
133+
134+ return openRequestIds . size ;
135+ }
136+
137+ function deriveHasActionableProposedPlan ( input : {
138+ readonly latestTurnId : string | null ;
139+ readonly proposedPlans : ReadonlyArray < ProjectionThreadProposedPlan > ;
140+ } ) : boolean {
141+ const sorted = [ ...input . proposedPlans ] . toSorted (
142+ ( left , right ) =>
143+ left . updatedAt . localeCompare ( right . updatedAt ) || left . planId . localeCompare ( right . planId ) ,
144+ ) ;
145+
146+ let latestForTurn : ProjectionThreadProposedPlan | null = null ;
147+ if ( input . latestTurnId !== null ) {
148+ for ( let index = sorted . length - 1 ; index >= 0 ; index -= 1 ) {
149+ const plan = sorted [ index ] ;
150+ if ( plan ?. turnId === input . latestTurnId ) {
151+ latestForTurn = plan ;
152+ break ;
153+ }
154+ }
155+ }
156+ if ( latestForTurn !== null ) {
157+ return latestForTurn . implementedAt === null ;
158+ }
159+
160+ const latestPlan = sorted . at ( - 1 ) ?? null ;
161+ return latestPlan !== null && latestPlan . implementedAt === null ;
162+ }
163+
92164function retainProjectionMessagesAfterRevert (
93165 messages : ReadonlyArray < ProjectionThreadMessage > ,
94166 turns : ReadonlyArray < ProjectionTurn > ,
@@ -432,6 +504,48 @@ const makeOrchestrationProjectionPipeline = Effect.fn("makeOrchestrationProjecti
432504 }
433505 } ) ;
434506
507+ const refreshThreadShellSummary = Effect . fn ( "refreshThreadShellSummary" ) ( function * (
508+ threadId : ThreadId ,
509+ ) {
510+ const existingRow = yield * projectionThreadRepository . getById ( {
511+ threadId,
512+ } ) ;
513+ if ( Option . isNone ( existingRow ) ) {
514+ return ;
515+ }
516+
517+ const [ messages , proposedPlans , activities , pendingApprovals ] = yield * Effect . all ( [
518+ projectionThreadMessageRepository . listByThreadId ( { threadId } ) ,
519+ projectionThreadProposedPlanRepository . listByThreadId ( { threadId } ) ,
520+ projectionThreadActivityRepository . listByThreadId ( { threadId } ) ,
521+ projectionPendingApprovalRepository . listByThreadId ( { threadId } ) ,
522+ ] ) ;
523+
524+ const latestUserMessageAt =
525+ messages
526+ . filter ( ( message ) => message . role === "user" )
527+ . map ( ( message ) => message . createdAt )
528+ . toSorted ( )
529+ . at ( - 1 ) ?? null ;
530+
531+ const pendingApprovalCount = pendingApprovals . filter (
532+ ( approval ) => approval . status === "pending" ,
533+ ) . length ;
534+ const pendingUserInputCount = derivePendingUserInputCountFromActivities ( activities ) ;
535+ const hasActionableProposedPlan = deriveHasActionableProposedPlan ( {
536+ latestTurnId : existingRow . value . latestTurnId ,
537+ proposedPlans,
538+ } ) ;
539+
540+ yield * projectionThreadRepository . upsert ( {
541+ ...existingRow . value ,
542+ latestUserMessageAt,
543+ pendingApprovalCount,
544+ pendingUserInputCount,
545+ hasActionableProposedPlan : hasActionableProposedPlan ? 1 : 0 ,
546+ } ) ;
547+ } ) ;
548+
435549 const applyThreadsProjection : ProjectorDefinition [ "apply" ] = Effect . fn (
436550 "applyThreadsProjection" ,
437551 ) ( function * ( event , attachmentSideEffects ) {
@@ -450,6 +564,10 @@ const makeOrchestrationProjectionPipeline = Effect.fn("makeOrchestrationProjecti
450564 createdAt : event . payload . createdAt ,
451565 updatedAt : event . payload . updatedAt ,
452566 archivedAt : null ,
567+ latestUserMessageAt : null ,
568+ pendingApprovalCount : 0 ,
569+ pendingUserInputCount : 0 ,
570+ hasActionableProposedPlan : 0 ,
453571 deletedAt : null ,
454572 } ) ;
455573 return ;
@@ -554,7 +672,9 @@ const makeOrchestrationProjectionPipeline = Effect.fn("makeOrchestrationProjecti
554672
555673 case "thread.message-sent" :
556674 case "thread.proposed-plan-upserted" :
557- case "thread.activity-appended" : {
675+ case "thread.activity-appended" :
676+ case "thread.approval-response-requested" :
677+ case "thread.user-input-response-requested" : {
558678 const existingRow = yield * projectionThreadRepository . getById ( {
559679 threadId : event . payload . threadId ,
560680 } ) ;
@@ -565,6 +685,7 @@ const makeOrchestrationProjectionPipeline = Effect.fn("makeOrchestrationProjecti
565685 ...existingRow . value ,
566686 updatedAt : event . occurredAt ,
567687 } ) ;
688+ yield * refreshThreadShellSummary ( event . payload . threadId ) ;
568689 return ;
569690 }
570691
@@ -580,6 +701,7 @@ const makeOrchestrationProjectionPipeline = Effect.fn("makeOrchestrationProjecti
580701 latestTurnId : event . payload . session . activeTurnId ,
581702 updatedAt : event . occurredAt ,
582703 } ) ;
704+ yield * refreshThreadShellSummary ( event . payload . threadId ) ;
583705 return ;
584706 }
585707
@@ -595,6 +717,7 @@ const makeOrchestrationProjectionPipeline = Effect.fn("makeOrchestrationProjecti
595717 latestTurnId : event . payload . turnId ,
596718 updatedAt : event . occurredAt ,
597719 } ) ;
720+ yield * refreshThreadShellSummary ( event . payload . threadId ) ;
598721 return ;
599722 }
600723
@@ -610,6 +733,7 @@ const makeOrchestrationProjectionPipeline = Effect.fn("makeOrchestrationProjecti
610733 latestTurnId : null ,
611734 updatedAt : event . occurredAt ,
612735 } ) ;
736+ yield * refreshThreadShellSummary ( event . payload . threadId ) ;
613737 return ;
614738 }
615739
0 commit comments