diff --git a/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts b/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts index 0796cb4905..7a08765647 100644 --- a/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts +++ b/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts @@ -1505,6 +1505,329 @@ it.layer(BaseTestLayer)("OrchestrationProjectionPipeline", (it) => { }), ); + it.effect("clears stale pending approvals from projected shell summaries", () => + Effect.gen(function* () { + const projectionPipeline = yield* OrchestrationProjectionPipeline; + const eventStore = yield* OrchestrationEventStore; + const sql = yield* SqlClient.SqlClient; + const appendAndProject = (event: Parameters[0]) => + eventStore + .append(event) + .pipe(Effect.flatMap((savedEvent) => projectionPipeline.projectEvent(savedEvent))); + + yield* appendAndProject({ + type: "project.created", + eventId: EventId.make("evt-stale-approval-1"), + aggregateKind: "project", + aggregateId: ProjectId.make("project-stale-approval"), + occurredAt: "2026-02-26T12:30:00.000Z", + commandId: CommandId.make("cmd-stale-approval-1"), + causationEventId: null, + correlationId: CorrelationId.make("cmd-stale-approval-1"), + metadata: {}, + payload: { + projectId: ProjectId.make("project-stale-approval"), + title: "Project Stale Approval", + workspaceRoot: "/tmp/project-stale-approval", + defaultModelSelection: null, + scripts: [], + createdAt: "2026-02-26T12:30:00.000Z", + updatedAt: "2026-02-26T12:30:00.000Z", + }, + }); + + yield* appendAndProject({ + type: "thread.created", + eventId: EventId.make("evt-stale-approval-2"), + aggregateKind: "thread", + aggregateId: ThreadId.make("thread-stale-approval"), + occurredAt: "2026-02-26T12:30:01.000Z", + commandId: CommandId.make("cmd-stale-approval-2"), + causationEventId: null, + correlationId: CorrelationId.make("cmd-stale-approval-2"), + metadata: {}, + payload: { + threadId: ThreadId.make("thread-stale-approval"), + projectId: ProjectId.make("project-stale-approval"), + title: "Thread Stale Approval", + modelSelection: { + provider: "codex", + model: "gpt-5-codex", + }, + runtimeMode: "approval-required", + interactionMode: "default", + branch: null, + worktreePath: null, + createdAt: "2026-02-26T12:30:01.000Z", + updatedAt: "2026-02-26T12:30:01.000Z", + }, + }); + + yield* appendAndProject({ + type: "thread.activity-appended", + eventId: EventId.make("evt-stale-approval-3"), + aggregateKind: "thread", + aggregateId: ThreadId.make("thread-stale-approval"), + occurredAt: "2026-02-26T12:30:02.000Z", + commandId: CommandId.make("cmd-stale-approval-3"), + causationEventId: null, + correlationId: CorrelationId.make("cmd-stale-approval-3"), + metadata: {}, + payload: { + threadId: ThreadId.make("thread-stale-approval"), + activity: { + id: EventId.make("activity-stale-approval-requested"), + tone: "approval", + kind: "approval.requested", + summary: "Command approval requested", + payload: { + requestId: "approval-request-stale-1", + requestKind: "command", + }, + turnId: null, + createdAt: "2026-02-26T12:30:02.000Z", + }, + }, + }); + + yield* appendAndProject({ + type: "thread.activity-appended", + eventId: EventId.make("evt-stale-approval-4"), + aggregateKind: "thread", + aggregateId: ThreadId.make("thread-stale-approval"), + occurredAt: "2026-02-26T12:30:03.000Z", + commandId: CommandId.make("cmd-stale-approval-4"), + causationEventId: null, + correlationId: CorrelationId.make("cmd-stale-approval-4"), + metadata: {}, + payload: { + threadId: ThreadId.make("thread-stale-approval"), + activity: { + id: EventId.make("activity-stale-approval-failed"), + tone: "error", + kind: "provider.approval.respond.failed", + summary: "Provider approval response failed", + payload: { + requestId: "approval-request-stale-1", + detail: "Unknown pending permission request: approval-request-stale-1", + }, + turnId: null, + createdAt: "2026-02-26T12:30:03.000Z", + }, + }, + }); + + const approvalRows = yield* sql<{ + readonly requestId: string; + readonly status: string; + readonly resolvedAt: string | null; + }>` + SELECT + request_id AS "requestId", + status, + resolved_at AS "resolvedAt" + FROM projection_pending_approvals + WHERE request_id = 'approval-request-stale-1' + `; + assert.deepEqual(approvalRows, [ + { + requestId: "approval-request-stale-1", + status: "resolved", + resolvedAt: "2026-02-26T12:30:03.000Z", + }, + ]); + + const threadRows = yield* sql<{ + readonly pendingApprovalCount: number; + }>` + SELECT pending_approval_count AS "pendingApprovalCount" + FROM projection_threads + WHERE thread_id = 'thread-stale-approval' + `; + assert.deepEqual(threadRows, [{ pendingApprovalCount: 0 }]); + }), + ); + + it.effect("ignores non-stale provider approval response failures", () => + Effect.gen(function* () { + const projectionPipeline = yield* OrchestrationProjectionPipeline; + const eventStore = yield* OrchestrationEventStore; + const sql = yield* SqlClient.SqlClient; + const appendAndProject = (event: Parameters[0]) => + eventStore + .append(event) + .pipe(Effect.flatMap((savedEvent) => projectionPipeline.projectEvent(savedEvent))); + + yield* appendAndProject({ + type: "project.created", + eventId: EventId.make("evt-nonstale-approval-1"), + aggregateKind: "project", + aggregateId: ProjectId.make("project-nonstale-approval"), + occurredAt: "2026-02-26T12:45:00.000Z", + commandId: CommandId.make("cmd-nonstale-approval-1"), + causationEventId: null, + correlationId: CorrelationId.make("cmd-nonstale-approval-1"), + metadata: {}, + payload: { + projectId: ProjectId.make("project-nonstale-approval"), + title: "Project Non-Stale Approval", + workspaceRoot: "/tmp/project-nonstale-approval", + defaultModelSelection: null, + scripts: [], + createdAt: "2026-02-26T12:45:00.000Z", + updatedAt: "2026-02-26T12:45:00.000Z", + }, + }); + + yield* appendAndProject({ + type: "thread.created", + eventId: EventId.make("evt-nonstale-approval-2"), + aggregateKind: "thread", + aggregateId: ThreadId.make("thread-nonstale-approval"), + occurredAt: "2026-02-26T12:45:01.000Z", + commandId: CommandId.make("cmd-nonstale-approval-2"), + causationEventId: null, + correlationId: CorrelationId.make("cmd-nonstale-approval-2"), + metadata: {}, + payload: { + threadId: ThreadId.make("thread-nonstale-approval"), + projectId: ProjectId.make("project-nonstale-approval"), + title: "Thread Non-Stale Approval", + modelSelection: { + provider: "codex", + model: "gpt-5-codex", + }, + runtimeMode: "approval-required", + interactionMode: "default", + branch: null, + worktreePath: null, + createdAt: "2026-02-26T12:45:01.000Z", + updatedAt: "2026-02-26T12:45:01.000Z", + }, + }); + + yield* appendAndProject({ + type: "thread.activity-appended", + eventId: EventId.make("evt-nonstale-approval-3"), + aggregateKind: "thread", + aggregateId: ThreadId.make("thread-nonstale-approval"), + occurredAt: "2026-02-26T12:45:02.000Z", + commandId: CommandId.make("cmd-nonstale-approval-3"), + causationEventId: null, + correlationId: CorrelationId.make("cmd-nonstale-approval-3"), + metadata: {}, + payload: { + threadId: ThreadId.make("thread-nonstale-approval"), + activity: { + id: EventId.make("activity-nonstale-approval-requested"), + tone: "approval", + kind: "approval.requested", + summary: "Command approval requested", + payload: { + requestId: "approval-request-nonstale-existing", + requestKind: "command", + }, + turnId: null, + createdAt: "2026-02-26T12:45:02.000Z", + }, + }, + }); + + yield* appendAndProject({ + type: "thread.activity-appended", + eventId: EventId.make("evt-nonstale-approval-4"), + aggregateKind: "thread", + aggregateId: ThreadId.make("thread-nonstale-approval"), + occurredAt: "2026-02-26T12:45:03.000Z", + commandId: CommandId.make("cmd-nonstale-approval-4"), + causationEventId: null, + correlationId: CorrelationId.make("cmd-nonstale-approval-4"), + metadata: {}, + payload: { + threadId: ThreadId.make("thread-nonstale-approval"), + activity: { + id: EventId.make("activity-nonstale-approval-failed-existing"), + tone: "error", + kind: "provider.approval.respond.failed", + summary: "Provider approval response failed", + payload: { + requestId: "approval-request-nonstale-existing", + detail: "Provider timed out while responding to approval request", + }, + turnId: TurnId.make("turn-nonstale-failure"), + createdAt: "2026-02-26T12:45:03.000Z", + }, + }, + }); + + yield* appendAndProject({ + type: "thread.activity-appended", + eventId: EventId.make("evt-nonstale-approval-5"), + aggregateKind: "thread", + aggregateId: ThreadId.make("thread-nonstale-approval"), + occurredAt: "2026-02-26T12:45:04.000Z", + commandId: CommandId.make("cmd-nonstale-approval-5"), + causationEventId: null, + correlationId: CorrelationId.make("cmd-nonstale-approval-5"), + metadata: {}, + payload: { + threadId: ThreadId.make("thread-nonstale-approval"), + activity: { + id: EventId.make("activity-nonstale-approval-failed-missing"), + tone: "error", + kind: "provider.approval.respond.failed", + summary: "Provider approval response failed", + payload: { + requestId: "approval-request-nonstale-missing", + detail: "Provider timed out while responding to approval request", + }, + turnId: null, + createdAt: "2026-02-26T12:45:04.000Z", + }, + }, + }); + + const approvalRows = yield* sql<{ + readonly requestId: string; + readonly status: string; + readonly turnId: string | null; + readonly createdAt: string; + readonly resolvedAt: string | null; + }>` + SELECT + request_id AS "requestId", + status, + turn_id AS "turnId", + created_at AS "createdAt", + resolved_at AS "resolvedAt" + FROM projection_pending_approvals + WHERE request_id IN ( + 'approval-request-nonstale-existing', + 'approval-request-nonstale-missing' + ) + ORDER BY request_id + `; + assert.deepEqual(approvalRows, [ + { + requestId: "approval-request-nonstale-existing", + status: "pending", + turnId: null, + createdAt: "2026-02-26T12:45:02.000Z", + resolvedAt: null, + }, + ]); + + const threadRows = yield* sql<{ + readonly pendingApprovalCount: number; + }>` + SELECT pending_approval_count AS "pendingApprovalCount" + FROM projection_threads + WHERE thread_id = 'thread-nonstale-approval' + `; + assert.deepEqual(threadRows, [{ pendingApprovalCount: 1 }]); + }), + ); + it.effect("does not fallback-retain messages whose turnId is removed by revert", () => Effect.gen(function* () { const projectionPipeline = yield* OrchestrationProjectionPipeline; diff --git a/apps/server/src/orchestration/Layers/ProjectionPipeline.ts b/apps/server/src/orchestration/Layers/ProjectionPipeline.ts index d7e62c39bd..aa1109a4d1 100644 --- a/apps/server/src/orchestration/Layers/ProjectionPipeline.ts +++ b/apps/server/src/orchestration/Layers/ProjectionPipeline.ts @@ -90,6 +90,17 @@ function extractActivityRequestId(payload: unknown): ApprovalRequestId | null { return typeof requestId === "string" ? ApprovalRequestId.make(requestId) : null; } +function isStalePendingApprovalFailureDetail(detail: string | null): boolean { + if (detail === null) { + return false; + } + return ( + detail.includes("stale pending approval request") || + detail.includes("unknown pending approval request") || + detail.includes("unknown pending permission request") + ); +} + function derivePendingUserInputCountFromActivities( activities: ReadonlyArray, ): number { @@ -1245,6 +1256,34 @@ const makeOrchestrationProjectionPipeline = Effect.fn("makeOrchestrationProjecti }); return; } + if (event.payload.activity.kind === "provider.approval.respond.failed") { + const payload = + typeof event.payload.activity.payload === "object" && + event.payload.activity.payload !== null + ? (event.payload.activity.payload as Record) + : null; + const detail = + typeof payload?.detail === "string" ? payload.detail.toLowerCase() : null; + if (isStalePendingApprovalFailureDetail(detail)) { + if (Option.isNone(existingRow)) { + return; + } + if (existingRow.value.status === "resolved") { + return; + } + yield* projectionPendingApprovalRepository.upsert({ + requestId, + threadId: existingRow.value.threadId, + turnId: existingRow.value.turnId, + status: "resolved", + decision: null, + createdAt: existingRow.value.createdAt, + resolvedAt: event.payload.activity.createdAt, + }); + return; + } + return; + } if (Option.isSome(existingRow) && existingRow.value.status === "resolved") { return; } diff --git a/apps/server/src/persistence/Migrations.ts b/apps/server/src/persistence/Migrations.ts index 6c2885a372..01c649f7e9 100644 --- a/apps/server/src/persistence/Migrations.ts +++ b/apps/server/src/persistence/Migrations.ts @@ -36,6 +36,7 @@ import Migration0020 from "./Migrations/020_AuthAccessManagement.ts"; import Migration0021 from "./Migrations/021_AuthSessionClientMetadata.ts"; import Migration0022 from "./Migrations/022_AuthSessionLastConnectedAt.ts"; import Migration0023 from "./Migrations/023_ProjectionThreadShellSummary.ts"; +import Migration0024 from "./Migrations/024_BackfillProjectionThreadShellSummary.ts"; /** * Migration loader with all migrations defined inline. @@ -71,6 +72,7 @@ export const migrationEntries = [ [21, "AuthSessionClientMetadata", Migration0021], [22, "AuthSessionLastConnectedAt", Migration0022], [23, "ProjectionThreadShellSummary", Migration0023], + [24, "BackfillProjectionThreadShellSummary", Migration0024], ] as const; export const makeMigrationLoader = (throughId?: number) => diff --git a/apps/server/src/persistence/Migrations/024_BackfillProjectionThreadShellSummary.test.ts b/apps/server/src/persistence/Migrations/024_BackfillProjectionThreadShellSummary.test.ts new file mode 100644 index 0000000000..cc911d2469 --- /dev/null +++ b/apps/server/src/persistence/Migrations/024_BackfillProjectionThreadShellSummary.test.ts @@ -0,0 +1,218 @@ +import { assert, it } from "@effect/vitest"; +import { Effect, Layer } from "effect"; +import * as SqlClient from "effect/unstable/sql/SqlClient"; + +import { runMigrations } from "../Migrations.ts"; +import * as NodeSqliteClient from "../NodeSqliteClient.ts"; + +const layer = it.layer(Layer.mergeAll(NodeSqliteClient.layerMemory())); + +layer("024_BackfillProjectionThreadShellSummary", (it) => { + it.effect("backfills thread shell summary fields and clears stale projected approvals", () => + Effect.gen(function* () { + const sql = yield* SqlClient.SqlClient; + + yield* runMigrations({ toMigrationInclusive: 23 }); + + yield* sql` + INSERT INTO projection_threads ( + thread_id, + project_id, + title, + model_selection_json, + runtime_mode, + interaction_mode, + branch, + worktree_path, + latest_turn_id, + created_at, + updated_at, + archived_at, + latest_user_message_at, + pending_approval_count, + pending_user_input_count, + has_actionable_proposed_plan, + deleted_at + ) + VALUES ( + 'thread-1', + 'project-1', + 'Thread 1', + '{"provider":"codex","model":"gpt-5-codex"}', + 'approval-required', + 'plan', + NULL, + NULL, + 'turn-1', + '2026-02-24T00:00:00.000Z', + '2026-02-24T00:00:00.000Z', + NULL, + NULL, + 0, + 0, + 0, + NULL + ) + `; + + yield* sql` + INSERT INTO projection_thread_messages ( + message_id, + thread_id, + turn_id, + role, + text, + attachments_json, + is_streaming, + created_at, + updated_at + ) + VALUES ( + 'message-user-1', + 'thread-1', + 'turn-1', + 'user', + 'Need help', + NULL, + 0, + '2026-02-24T00:01:00.000Z', + '2026-02-24T00:01:00.000Z' + ) + `; + + yield* sql` + INSERT INTO projection_thread_activities ( + activity_id, + thread_id, + turn_id, + tone, + kind, + summary, + payload_json, + sequence, + created_at + ) + VALUES + ( + 'activity-approval-requested', + 'thread-1', + 'turn-1', + 'approval', + 'approval.requested', + 'Command approval requested', + '{"requestId":"approval-1","requestKind":"command"}', + NULL, + '2026-02-24T00:02:00.000Z' + ), + ( + 'activity-approval-stale', + 'thread-1', + 'turn-1', + 'error', + 'provider.approval.respond.failed', + 'Provider approval response failed', + '{"requestId":"approval-1","detail":"Unknown pending permission request: approval-1"}', + NULL, + '2026-02-24T00:03:00.000Z' + ), + ( + 'activity-user-input-requested', + 'thread-1', + 'turn-1', + 'info', + 'user-input.requested', + 'User input requested', + '{"requestId":"input-1","questions":[{"id":"area","header":"Area","question":"Which repo area should I inspect next?","options":[{"label":"Server","description":"Server orchestration."}]}]}', + NULL, + '2026-02-24T00:04:00.000Z' + ) + `; + + yield* sql` + INSERT INTO projection_thread_proposed_plans ( + plan_id, + thread_id, + turn_id, + plan_markdown, + implemented_at, + implementation_thread_id, + created_at, + updated_at + ) + VALUES ( + 'plan-1', + 'thread-1', + 'turn-1', + '# Do the thing', + NULL, + NULL, + '2026-02-24T00:05:00.000Z', + '2026-02-24T00:05:00.000Z' + ) + `; + + yield* sql` + INSERT INTO projection_pending_approvals ( + request_id, + thread_id, + turn_id, + status, + decision, + created_at, + resolved_at + ) + VALUES ( + 'approval-1', + 'thread-1', + 'turn-1', + 'pending', + NULL, + '2026-02-24T00:02:00.000Z', + NULL + ) + `; + + yield* runMigrations({ toMigrationInclusive: 24 }); + + const threadRows = yield* sql<{ + readonly latestUserMessageAt: string | null; + readonly pendingApprovalCount: number; + readonly pendingUserInputCount: number; + readonly hasActionableProposedPlan: number; + }>` + SELECT + latest_user_message_at AS "latestUserMessageAt", + pending_approval_count AS "pendingApprovalCount", + pending_user_input_count AS "pendingUserInputCount", + has_actionable_proposed_plan AS "hasActionableProposedPlan" + FROM projection_threads + WHERE thread_id = 'thread-1' + `; + assert.deepStrictEqual(threadRows, [ + { + latestUserMessageAt: "2026-02-24T00:01:00.000Z", + pendingApprovalCount: 0, + pendingUserInputCount: 1, + hasActionableProposedPlan: 1, + }, + ]); + + const approvalRows = yield* sql<{ + readonly status: string; + readonly resolvedAt: string | null; + }>` + SELECT + status, + resolved_at AS "resolvedAt" + FROM projection_pending_approvals + WHERE request_id = 'approval-1' + `; + assert.deepStrictEqual(approvalRows, [ + { + status: "resolved", + resolvedAt: "2026-02-24T00:03:00.000Z", + }, + ]); + }), + ); +}); diff --git a/apps/server/src/persistence/Migrations/024_BackfillProjectionThreadShellSummary.ts b/apps/server/src/persistence/Migrations/024_BackfillProjectionThreadShellSummary.ts new file mode 100644 index 0000000000..549906dfb0 --- /dev/null +++ b/apps/server/src/persistence/Migrations/024_BackfillProjectionThreadShellSummary.ts @@ -0,0 +1,277 @@ +import * as SqlClient from "effect/unstable/sql/SqlClient"; +import * as Effect from "effect/Effect"; + +export default Effect.gen(function* () { + const sql = yield* SqlClient.SqlClient; + + yield* sql` + INSERT OR IGNORE INTO projection_pending_approvals ( + request_id, + thread_id, + turn_id, + status, + decision, + created_at, + resolved_at + ) + SELECT + requested.request_id, + requested.thread_id, + requested.turn_id, + 'pending', + NULL, + requested.created_at, + NULL + FROM ( + SELECT + json_extract(payload_json, '$.requestId') AS request_id, + thread_id, + turn_id, + created_at, + ROW_NUMBER() OVER ( + PARTITION BY json_extract(payload_json, '$.requestId') + ORDER BY created_at ASC, activity_id ASC + ) AS row_number + FROM projection_thread_activities + WHERE kind = 'approval.requested' + AND json_extract(payload_json, '$.requestId') IS NOT NULL + ) AS requested + WHERE requested.row_number = 1 + `; + + yield* sql` + WITH latest_resolutions AS ( + SELECT + resolved.request_id, + resolved.resolved_at, + resolved.decision + FROM ( + SELECT + json_extract(payload_json, '$.requestId') AS request_id, + created_at AS resolved_at, + CASE + WHEN json_extract(payload_json, '$.decision') IN ( + 'accept', + 'acceptForSession', + 'decline', + 'cancel' + ) + THEN json_extract(payload_json, '$.decision') + ELSE NULL + END AS decision, + ROW_NUMBER() OVER ( + PARTITION BY json_extract(payload_json, '$.requestId') + ORDER BY created_at DESC, activity_id DESC + ) AS row_number + FROM projection_thread_activities + WHERE kind = 'approval.resolved' + AND json_extract(payload_json, '$.requestId') IS NOT NULL + ) AS resolved + WHERE resolved.row_number = 1 + ) + UPDATE projection_pending_approvals + SET + status = 'resolved', + decision = ( + SELECT latest_resolutions.decision + FROM latest_resolutions + WHERE latest_resolutions.request_id = projection_pending_approvals.request_id + ), + resolved_at = ( + SELECT latest_resolutions.resolved_at + FROM latest_resolutions + WHERE latest_resolutions.request_id = projection_pending_approvals.request_id + ) + WHERE EXISTS ( + SELECT 1 + FROM latest_resolutions + WHERE latest_resolutions.request_id = projection_pending_approvals.request_id + ) + `; + + yield* sql` + WITH latest_response_events AS ( + SELECT + response.request_id, + response.resolved_at, + response.decision + FROM ( + SELECT + json_extract(payload_json, '$.requestId') AS request_id, + occurred_at AS resolved_at, + CASE + WHEN json_extract(payload_json, '$.decision') IN ( + 'accept', + 'acceptForSession', + 'decline', + 'cancel' + ) + THEN json_extract(payload_json, '$.decision') + ELSE NULL + END AS decision, + ROW_NUMBER() OVER ( + PARTITION BY json_extract(payload_json, '$.requestId') + ORDER BY occurred_at DESC, sequence DESC + ) AS row_number + FROM orchestration_events + WHERE event_type = 'thread.approval-response-requested' + AND json_extract(payload_json, '$.requestId') IS NOT NULL + ) AS response + WHERE response.row_number = 1 + ) + UPDATE projection_pending_approvals + SET + status = 'resolved', + decision = ( + SELECT latest_response_events.decision + FROM latest_response_events + WHERE latest_response_events.request_id = projection_pending_approvals.request_id + ), + resolved_at = ( + SELECT latest_response_events.resolved_at + FROM latest_response_events + WHERE latest_response_events.request_id = projection_pending_approvals.request_id + ) + WHERE EXISTS ( + SELECT 1 + FROM latest_response_events + WHERE latest_response_events.request_id = projection_pending_approvals.request_id + ) + `; + + yield* sql` + WITH latest_stale_failures AS ( + SELECT + failure.request_id, + failure.resolved_at + FROM ( + SELECT + json_extract(payload_json, '$.requestId') AS request_id, + created_at AS resolved_at, + ROW_NUMBER() OVER ( + PARTITION BY json_extract(payload_json, '$.requestId') + ORDER BY created_at DESC, activity_id DESC + ) AS row_number + FROM projection_thread_activities + WHERE kind = 'provider.approval.respond.failed' + AND json_extract(payload_json, '$.requestId') IS NOT NULL + AND ( + lower(COALESCE(json_extract(payload_json, '$.detail'), '')) + LIKE '%stale pending approval request%' + OR lower(COALESCE(json_extract(payload_json, '$.detail'), '')) + LIKE '%unknown pending approval request%' + OR lower(COALESCE(json_extract(payload_json, '$.detail'), '')) + LIKE '%unknown pending permission request%' + ) + ) AS failure + WHERE failure.row_number = 1 + ) + UPDATE projection_pending_approvals + SET + status = 'resolved', + decision = NULL, + resolved_at = ( + SELECT latest_stale_failures.resolved_at + FROM latest_stale_failures + WHERE latest_stale_failures.request_id = projection_pending_approvals.request_id + ) + WHERE status = 'pending' + AND EXISTS ( + SELECT 1 + FROM latest_stale_failures + WHERE latest_stale_failures.request_id = projection_pending_approvals.request_id + ) + `; + + yield* sql` + UPDATE projection_threads + SET + latest_user_message_at = ( + SELECT MAX(message.created_at) + FROM projection_thread_messages AS message + WHERE message.thread_id = projection_threads.thread_id + AND message.role = 'user' + ), + pending_approval_count = COALESCE(( + SELECT COUNT(*) + FROM projection_pending_approvals + WHERE projection_pending_approvals.thread_id = projection_threads.thread_id + AND projection_pending_approvals.status = 'pending' + ), 0), + pending_user_input_count = COALESCE(( + WITH latest_user_input_states AS ( + SELECT + latest.request_id, + latest.kind, + latest.detail + FROM ( + SELECT + json_extract(activity.payload_json, '$.requestId') AS request_id, + activity.kind, + lower(COALESCE(json_extract(activity.payload_json, '$.detail'), '')) AS detail, + ROW_NUMBER() OVER ( + PARTITION BY json_extract(activity.payload_json, '$.requestId') + ORDER BY activity.created_at DESC, activity.activity_id DESC + ) AS row_number + FROM projection_thread_activities AS activity + WHERE activity.thread_id = projection_threads.thread_id + AND json_extract(activity.payload_json, '$.requestId') IS NOT NULL + AND activity.kind IN ( + 'user-input.requested', + 'user-input.resolved', + 'provider.user-input.respond.failed' + ) + ) AS latest + WHERE latest.row_number = 1 + ) + SELECT COUNT(*) + FROM latest_user_input_states + WHERE latest_user_input_states.kind = 'user-input.requested' + OR ( + latest_user_input_states.kind = 'provider.user-input.respond.failed' + AND latest_user_input_states.detail NOT LIKE '%stale pending user-input request%' + AND latest_user_input_states.detail NOT LIKE '%unknown pending user-input request%' + ) + ), 0), + has_actionable_proposed_plan = COALESCE(( + SELECT CASE + WHEN projection_threads.latest_turn_id IS NOT NULL + AND EXISTS ( + SELECT 1 + FROM projection_thread_proposed_plans AS latest_turn_plan_exists + WHERE latest_turn_plan_exists.thread_id = projection_threads.thread_id + AND latest_turn_plan_exists.turn_id = projection_threads.latest_turn_id + ) + THEN CASE + WHEN ( + SELECT latest_turn_plan.implemented_at + FROM projection_thread_proposed_plans AS latest_turn_plan + WHERE latest_turn_plan.thread_id = projection_threads.thread_id + AND latest_turn_plan.turn_id = projection_threads.latest_turn_id + ORDER BY latest_turn_plan.updated_at DESC, latest_turn_plan.plan_id DESC + LIMIT 1 + ) IS NULL + THEN 1 + ELSE 0 + END + WHEN EXISTS ( + SELECT 1 + FROM projection_thread_proposed_plans AS any_plan + WHERE any_plan.thread_id = projection_threads.thread_id + ) + THEN CASE + WHEN ( + SELECT latest_plan.implemented_at + FROM projection_thread_proposed_plans AS latest_plan + WHERE latest_plan.thread_id = projection_threads.thread_id + ORDER BY latest_plan.updated_at DESC, latest_plan.plan_id DESC + LIMIT 1 + ) IS NULL + THEN 1 + ELSE 0 + END + ELSE 0 + END + ), 0) + `; +});