Files
dreamweaver/backend/app/services/admin_executor_coverage.py

148 lines
4.9 KiB
Python

"""Admin-only analytics for internal workflow executor coverage."""
from __future__ import annotations

from collections import Counter
from collections.abc import Iterable
from datetime import datetime, timedelta, timezone
from typing import Any

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.db.models import GenerationJob, GenerationJobEvent
def _as_int(value: Any) -> int:
if isinstance(value, bool):
return int(value)
if isinstance(value, int):
return value
if isinstance(value, float):
return int(value)
return 0
def _sorted_count_buckets(counts: dict[str, int], *, key_name: str) -> list[dict[str, Any]]:
return [
{key_name: name, "count": count}
for name, count in sorted(
counts.items(),
key=lambda item: (-item[1], item[0]),
)
]
def _iter_strings(value: Any) -> Iterable[str]:
if not isinstance(value, list | tuple | set):
return
for item in value:
if isinstance(item, str) and item:
yield item
def summarize_executor_coverage_rows(
    rows: Iterable[tuple[GenerationJobEvent, GenerationJob]],
    *,
    days: int | None = None,
    plan_mode: str | None = None,
    scope: str = "admin_internal_executor_coverage",
) -> dict[str, Any]:
    """Aggregate internal executor coverage rows into an admin-only summary.

    Args:
        rows: ``(event, job)`` pairs. Each event's ``event_metadata`` is read
            for plan mode, task counts, task keys, and result assets.
        days: Echoed back as ``window_days``; no filtering is done on it here.
        plan_mode: When given, only events whose metadata ``plan_mode``
            (missing/empty normalised to ``"unknown"``) equals this value
            are counted.
        scope: Label echoed back under ``"scope"``.

    Returns:
        A dict with run and task totals, ``coverage_ratio`` (executed /
        planned tasks rounded to 4 places, ``0.0`` when nothing was planned),
        distinct job/story/user counts, and count buckets sorted by
        descending count and then by name.
    """
    total_runs = 0
    total_planned_tasks = 0
    total_executed_tasks = 0
    total_ignored_tasks = 0
    job_ids: set[str] = set()
    story_ids: set[int] = set()
    user_ids: set[str] = set()
    by_plan_mode: Counter[str] = Counter()
    by_output_mode: Counter[str] = Counter()
    executed_task_keys: Counter[str] = Counter()
    ignored_task_keys: Counter[str] = Counter()
    result_assets: Counter[str] = Counter()

    for event, job in rows:
        raw_metadata = event.event_metadata
        # Metadata is free-form JSON: anything other than an object is treated
        # as empty instead of crashing on ``.get``.
        metadata: dict[str, Any] = raw_metadata if isinstance(raw_metadata, dict) else {}
        event_plan_mode = str(metadata.get("plan_mode") or "unknown")
        if plan_mode is not None and event_plan_mode != plan_mode:
            continue

        total_runs += 1
        job_ids.add(job.id)
        user_ids.add(job.user_id)
        # Prefer the story recorded on the event; fall back to the job's story.
        story_id = event.story_id if event.story_id is not None else job.story_id
        if story_id is not None:
            story_ids.add(int(story_id))

        by_plan_mode[event_plan_mode] += 1
        # Normalise a missing output mode like plan_mode, so the bucket sort
        # never has to compare ``None`` with a string on a count tie.
        by_output_mode[job.output_mode or "unknown"] += 1

        total_planned_tasks += _as_int(metadata.get("planned_task_count"))
        total_executed_tasks += _as_int(metadata.get("executed_task_count"))
        total_ignored_tasks += _as_int(metadata.get("ignored_task_count"))

        # Each occurrence of a key/asset in an event's list counts once.
        executed_task_keys.update(_iter_strings(metadata.get("executed_task_keys")))
        ignored_task_keys.update(_iter_strings(metadata.get("ignored_task_keys")))
        result_assets.update(_iter_strings(metadata.get("result_assets")))

    coverage_ratio = (
        round(total_executed_tasks / total_planned_tasks, 4)
        if total_planned_tasks
        else 0.0
    )
    return {
        "scope": scope,
        "window_days": days,
        "plan_mode": plan_mode,
        "total_runs": total_runs,
        "total_planned_tasks": total_planned_tasks,
        "total_executed_tasks": total_executed_tasks,
        "total_ignored_tasks": total_ignored_tasks,
        "coverage_ratio": coverage_ratio,
        "job_count": len(job_ids),
        "story_count": len(story_ids),
        "user_count": len(user_ids),
        "by_plan_mode": _sorted_count_buckets(by_plan_mode, key_name="plan_mode"),
        "by_output_mode": _sorted_count_buckets(by_output_mode, key_name="output_mode"),
        "executed_task_keys": _sorted_count_buckets(
            executed_task_keys,
            key_name="task_key",
        ),
        "ignored_task_keys": _sorted_count_buckets(
            ignored_task_keys,
            key_name="task_key",
        ),
        "result_assets": _sorted_count_buckets(result_assets, key_name="asset"),
    }
async def get_admin_executor_coverage(
    db: AsyncSession,
    *,
    days: int | None = None,
    plan_mode: str | None = None,
) -> dict[str, Any]:
    """Load ``executor_completed`` events and summarise executor coverage.

    Args:
        db: Async SQLAlchemy session used to run the query.
        days: Optional lookback window. When set, only events whose
            ``created_at`` falls within the last ``days`` days (measured from
            the current UTC time) are loaded.
        plan_mode: Optional plan-mode filter, forwarded to
            ``summarize_executor_coverage_rows``.

    Returns:
        The summary dict produced by ``summarize_executor_coverage_rows``.
    """
    conditions = [GenerationJobEvent.event_type == "executor_completed"]
    if days is not None:
        window_start = datetime.now(timezone.utc) - timedelta(days=days)
        conditions.append(GenerationJobEvent.created_at >= window_start)

    # NOTE: plan_mode is not part of the SQL filter. It is applied in Python
    # to the event metadata, so every executor_completed event in the window
    # is fetched.
    statement = (
        select(GenerationJobEvent, GenerationJob)
        .join(GenerationJob, GenerationJobEvent.job_id == GenerationJob.id)
        .where(*conditions)
        .order_by(GenerationJobEvent.id)
    )
    result = await db.execute(statement)
    return summarize_executor_coverage_rows(result.all(), days=days, plan_mode=plan_mode)