# File: dreamweaver/backend/app/services/admin_evaluation_analytics.py
# (205 lines, 6.8 KiB, Python)

"""Admin-only analytics for internal generation evaluation events."""
from __future__ import annotations
from datetime import datetime, timedelta, timezone
from typing import Any
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.db.models import GenerationJob, GenerationJobEvent
def _as_float(value: Any) -> float | None:
if isinstance(value, int | float):
return float(value)
return None
def _sorted_count_buckets(counts: dict[str, int], *, key_name: str) -> list[dict[str, Any]]:
return [
{key_name: name, "count": count}
for name, count in sorted(
counts.items(),
key=lambda item: (-item[1], item[0]),
)
]
def _average_bucket(
totals: dict[str, float],
counts: dict[str, int],
*,
key_name: str,
) -> list[dict[str, Any]]:
rows = [
{
key_name: name,
"average_score": round(totals[name] / counts[name], 4),
"count": counts[name],
}
for name in totals
if counts.get(name)
]
rows.sort(key=lambda item: (-int(item["count"]), str(item[key_name])))
return rows
def _score_band(score: float) -> str:
if score >= 0.9:
return "excellent"
if score >= 0.8:
return "good"
if score >= 0.7:
return "pass"
if score > 0:
return "blocked_low_score"
return "blocked_quality_gate"
def _metadata_scores(metadata: dict[str, Any]) -> list[dict[str, Any]]:
raw_scores = metadata.get("scores")
if not isinstance(raw_scores, list):
return []
return [score for score in raw_scores if isinstance(score, dict)]
def _quality_gate_issues(metadata: dict[str, Any]) -> list[dict[str, Any]]:
quality_gate = metadata.get("quality_gate")
if not isinstance(quality_gate, dict):
return []
raw_issues = quality_gate.get("issues")
if not isinstance(raw_issues, list):
return []
return [issue for issue in raw_issues if isinstance(issue, dict)]
async def get_admin_evaluation_analytics(
    db: AsyncSession,
    *,
    days: int | None = None,
    artifact: str | None = None,
) -> dict[str, Any]:
    """Summarise ``evaluation_completed`` job events for the admin control plane.

    Args:
        db: Async SQLAlchemy session used to run the query.
        days: When given, only events created within the last ``days`` days
            (measured from the current UTC time) are considered.
        artifact: When given, only events whose metadata ``"artifact"`` value
            equals this string are counted; events without an artifact are
            labelled ``"unknown"``.

    Returns:
        A dict holding the echoed filters, evaluation totals, pass rate,
        average overall score, distinct job/story/user counts, and bucketed
        breakdowns by artifact, output mode, score band, dimension,
        quality-gate issue code, failure category and warning message.
    """

    def _bump(tally: dict[str, int], label: str) -> None:
        # Increment the occurrence count for ``label`` in place.
        tally[label] = tally.get(label, 0) + 1

    window_start = None
    if days is not None:
        window_start = datetime.now(timezone.utc) - timedelta(days=days)

    stmt = select(GenerationJobEvent, GenerationJob).join(
        GenerationJob,
        GenerationJobEvent.job_id == GenerationJob.id,
    )
    stmt = stmt.where(GenerationJobEvent.event_type == "evaluation_completed")
    stmt = stmt.order_by(GenerationJobEvent.id)
    if window_start is not None:
        stmt = stmt.where(GenerationJobEvent.created_at >= window_start)
    result = await db.execute(stmt)

    # Scalar counters.
    evaluation_total = 0
    passed_total = 0
    blocked_total = 0
    overall_sum = 0.0
    overall_samples = 0

    # Distinct entities touched by the matching events.
    seen_jobs: set[str] = set()
    seen_stories: set[int] = set()
    seen_users: set[str] = set()

    # Per-label tallies.
    artifact_tally: dict[str, int] = {}
    output_mode_tally: dict[str, int] = {}
    band_tally: dict[str, int] = {}
    dimension_sums: dict[str, float] = {}
    dimension_samples: dict[str, int] = {}
    issue_code_tally: dict[str, int] = {}
    failure_category_tally: dict[str, int] = {}
    warning_tally: dict[str, int] = {}

    for event, job in result.all():
        details = event.event_metadata or {}
        artifact_label = str(details.get("artifact") or "unknown")
        # The artifact label is read from the event metadata, so this filter
        # is applied here in Python rather than in the SQL statement.
        if artifact is not None and artifact_label != artifact:
            continue

        evaluation_total += 1
        seen_jobs.add(job.id)
        seen_users.add(job.user_id)

        # Prefer the story recorded on the event; fall back to the job's.
        story_ref = event.story_id if event.story_id is not None else job.story_id
        if story_ref is not None:
            seen_stories.add(int(story_ref))

        _bump(artifact_tally, artifact_label)
        _bump(output_mode_tally, job.output_mode)

        # Only a literal ``True`` counts; other truthy values are ignored.
        if details.get("passed") is True:
            passed_total += 1
        if details.get("blocking") is True:
            blocked_total += 1

        overall = _as_float(details.get("overall_score"))
        if overall is not None:
            overall_sum += overall
            overall_samples += 1
            _bump(band_tally, _score_band(overall))

        for entry in _metadata_scores(details):
            dimension_name = entry.get("dimension")
            dimension_value = _as_float(entry.get("score"))
            if isinstance(dimension_name, str) and dimension_value is not None:
                dimension_sums[dimension_name] = (
                    dimension_sums.get(dimension_name, 0.0) + dimension_value
                )
                _bump(dimension_samples, dimension_name)

        for issue in _quality_gate_issues(details):
            issue_code = issue.get("code")
            if isinstance(issue_code, str) and issue_code:
                _bump(issue_code_tally, issue_code)
            category = issue.get("failure_category")
            if isinstance(category, str) and category:
                _bump(failure_category_tally, category)

        raw_warnings = details.get("warnings")
        if not isinstance(raw_warnings, list):
            continue
        for message in raw_warnings:
            if isinstance(message, str) and message:
                _bump(warning_tally, message)

    if evaluation_total:
        pass_rate = round(passed_total / evaluation_total, 4)
    else:
        pass_rate = 0.0

    if overall_samples:
        average_score = round(overall_sum / overall_samples, 4)
    else:
        average_score = None

    return {
        "scope": "admin_internal_evaluations",
        "window_days": days,
        "artifact": artifact,
        "total_evaluations": evaluation_total,
        "passed_evaluations": passed_total,
        "blocked_evaluations": blocked_total,
        "pass_rate": pass_rate,
        "average_score": average_score,
        "job_count": len(seen_jobs),
        "story_count": len(seen_stories),
        "user_count": len(seen_users),
        "by_artifact": _sorted_count_buckets(artifact_tally, key_name="artifact"),
        "by_output_mode": _sorted_count_buckets(output_mode_tally, key_name="output_mode"),
        "score_bands": _sorted_count_buckets(band_tally, key_name="band"),
        "dimension_scores": _average_bucket(
            dimension_sums,
            dimension_samples,
            key_name="dimension",
        ),
        "quality_gate_issues": _sorted_count_buckets(
            issue_code_tally,
            key_name="code",
        ),
        "failure_categories": _sorted_count_buckets(
            failure_category_tally,
            key_name="category",
        ),
        "warnings": _sorted_count_buckets(warning_tally, key_name="message"),
    }