"""Child profile APIs.""" from datetime import date from typing import Literal from fastapi import APIRouter, Depends, HTTPException, status from pydantic import BaseModel, Field from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession from app.core.deps import require_user from app.db.database import get_db from app.db.models import ChildProfile, Story, StoryUniverse, User router = APIRouter() MAX_PROFILES_PER_USER = 5 class ChildProfileCreate(BaseModel): """Create profile payload.""" name: str = Field(..., min_length=1, max_length=50) birth_date: date | None = None gender: str | None = Field(default=None, pattern="^(male|female|other)$") interests: list[str] = Field(default_factory=list) growth_themes: list[str] = Field(default_factory=list) avatar_url: str | None = None class ChildProfileUpdate(BaseModel): """Update profile payload.""" name: str | None = Field(default=None, min_length=1, max_length=50) birth_date: date | None = None gender: str | None = Field(default=None, pattern="^(male|female|other)$") interests: list[str] | None = None growth_themes: list[str] | None = None avatar_url: str | None = None class ChildProfileResponse(BaseModel): """Profile response.""" id: str name: str avatar_url: str | None birth_date: date | None gender: str | None age: int | None interests: list[str] growth_themes: list[str] stories_count: int total_reading_time: int class Config: from_attributes = True class ChildProfileListResponse(BaseModel): """Profile list response.""" profiles: list[ChildProfileResponse] total: int class TimelineEvent(BaseModel): """Timeline event item.""" date: str type: Literal["story", "achievement", "milestone"] title: str description: str | None = None image_url: str | None = None metadata: dict | None = None class TimelineResponse(BaseModel): """Timeline response.""" events: list[TimelineEvent] @router.get("/profiles", response_model=ChildProfileListResponse) async def list_profiles( user: User = Depends(require_user), db: AsyncSession = Depends(get_db), ): """List child profiles for current user.""" result = await db.execute( select(ChildProfile) .where(ChildProfile.user_id == user.id) .order_by(ChildProfile.created_at.desc()) ) profiles = result.scalars().all() return ChildProfileListResponse(profiles=profiles, total=len(profiles)) @router.post("/profiles", response_model=ChildProfileResponse, status_code=status.HTTP_201_CREATED) async def create_profile( payload: ChildProfileCreate, user: User = Depends(require_user), db: AsyncSession = Depends(get_db), ): """Create a new child profile.""" count = await db.scalar( select(func.count(ChildProfile.id)).where(ChildProfile.user_id == user.id) ) if count and count >= MAX_PROFILES_PER_USER: raise HTTPException(status_code=400, detail="最多只能创建 5 个孩子档案") existing = await db.scalar( select(ChildProfile.id).where( ChildProfile.user_id == user.id, ChildProfile.name == payload.name, ) ) if existing: raise HTTPException(status_code=409, detail="该档案名称已存在") profile = ChildProfile(user_id=user.id, **payload.model_dump()) db.add(profile) await db.commit() await db.refresh(profile) return profile @router.get("/profiles/{profile_id}", response_model=ChildProfileResponse) async def get_profile( profile_id: str, user: User = Depends(require_user), db: AsyncSession = Depends(get_db), ): """Get one child profile.""" result = await db.execute( select(ChildProfile).where( ChildProfile.id == profile_id, ChildProfile.user_id == user.id, ) ) profile = result.scalar_one_or_none() if not profile: raise HTTPException(status_code=404, detail="档案不存在") return profile @router.put("/profiles/{profile_id}", response_model=ChildProfileResponse) async def update_profile( profile_id: str, payload: ChildProfileUpdate, user: User = Depends(require_user), db: AsyncSession = Depends(get_db), ): """Update a child profile.""" result = await db.execute( select(ChildProfile).where( ChildProfile.id == profile_id, ChildProfile.user_id == user.id, ) ) profile = result.scalar_one_or_none() if not profile: raise HTTPException(status_code=404, detail="档案不存在") updates = payload.model_dump(exclude_unset=True) if "name" in updates: existing = await db.scalar( select(ChildProfile.id).where( ChildProfile.user_id == user.id, ChildProfile.name == updates["name"], ChildProfile.id != profile_id, ) ) if existing: raise HTTPException(status_code=409, detail="该档案名称已存在") for key, value in updates.items(): setattr(profile, key, value) await db.commit() await db.refresh(profile) return profile @router.delete("/profiles/{profile_id}") async def delete_profile( profile_id: str, user: User = Depends(require_user), db: AsyncSession = Depends(get_db), ): """Delete a child profile.""" result = await db.execute( select(ChildProfile).where( ChildProfile.id == profile_id, ChildProfile.user_id == user.id, ) ) profile = result.scalar_one_or_none() if not profile: raise HTTPException(status_code=404, detail="档案不存在") await db.delete(profile) await db.commit() return {"message": "Deleted"} @router.get("/profiles/{profile_id}/timeline", response_model=TimelineResponse) async def get_profile_timeline( profile_id: str, user: User = Depends(require_user), db: AsyncSession = Depends(get_db), ): """Get profile growth timeline.""" result = await db.execute( select(ChildProfile).where( ChildProfile.id == profile_id, ChildProfile.user_id == user.id, ) ) profile = result.scalar_one_or_none() if not profile: raise HTTPException(status_code=404, detail="档案不存在") events: list[TimelineEvent] = [] # 1. Milestone: Profile Created events.append(TimelineEvent( date=profile.created_at.isoformat(), type="milestone", title="初次相遇", description=f"创建了档案 {profile.name}" )) # 2. Stories stories_result = await db.execute( select(Story).where(Story.child_profile_id == profile_id) ) for s in stories_result.scalars(): events.append(TimelineEvent( date=s.created_at.isoformat(), type="story", title=s.title, image_url=s.image_url, metadata={"story_id": s.id, "mode": s.mode} )) # 3. Achievements (from Universe) universes_result = await db.execute( select(StoryUniverse).where(StoryUniverse.child_profile_id == profile_id) ) for u in universes_result.scalars(): if u.achievements: for ach in u.achievements: if isinstance(ach, dict): obt_at = ach.get("obtained_at") # Fallback if not obt_at: obt_at = u.updated_at.isoformat() events.append(TimelineEvent( date=obt_at, type="achievement", title=f"获得成就:{ach.get('type')}", description=ach.get('description'), metadata={"universe_id": u.id, "source_story_id": ach.get("source_story_id")} )) # Sort by date desc events.sort(key=lambda x: x.date, reverse=True) return TimelineResponse(events=events)