"""Admin-only generation trace detail service.""" from __future__ import annotations from typing import Any from fastapi import HTTPException from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.db.models import GenerationJob, GenerationJobEvent from app.services.admin_executor_coverage import summarize_executor_coverage_rows from app.services.generation_jobs import ( generation_event_to_response, generation_job_to_summary, ) async def get_admin_generation_job_trace( db: AsyncSession, *, job_id: str, ) -> dict[str, Any]: """Return a complete internal generation trace for the admin control plane.""" job = ( await db.execute(select(GenerationJob).where(GenerationJob.id == job_id)) ).scalar_one_or_none() if job is None: raise HTTPException(status_code=404, detail="Generation job not found") events = ( await db.execute( select(GenerationJobEvent) .where(GenerationJobEvent.job_id == job.id) .order_by(GenerationJobEvent.id) ) ).scalars().all() executor_rows = [ (event, job) for event in events if event.event_type == "executor_completed" ] return { **generation_job_to_summary(job), "user_id": job.user_id, "request_payload": job.request_payload or {}, "executor_coverage": summarize_executor_coverage_rows( executor_rows, scope="admin_internal_job_executor_coverage", ), "events": [generation_event_to_response(event) for event in events], }