Files
dreamweaver/backend/app/services/voice_session_service.py

1985 lines
70 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Voice co-creation session service."""
from __future__ import annotations
from datetime import datetime, timedelta, timezone
from typing import Any
from fastapi import HTTPException
from sqlalchemy import case, desc, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.config import settings
from app.core.logging import get_logger
from app.db.models import VoiceSession, VoiceSessionEvent, VoiceTurn
from app.schemas.voice_session_schemas import (
VoiceSessionAbandonRequest,
VoiceSessionAnalyticsResponse,
VoiceSessionCreateRequest,
VoiceSessionDetailResponse,
VoiceSessionFinalizeRequest,
VoiceSessionFinalizeResponse,
VoiceSessionSummaryResponse,
VoiceTurnAcceptedResponse,
VoiceTurnConfirmRequest,
VoiceTurnCreateFallbackRequest,
VoiceTurnSummaryResponse,
VoiceTurnUploadAcceptedResponse,
)
from app.services.adapters.text.models import StoryOutput
from app.services.memory_service import build_enhanced_memory_context
from app.services.provider_router import generate_story_content, text_to_speech
from app.services.story_service import (
create_story_from_result,
queue_story_asset_generation,
validate_profile_and_universe,
)
from app.services.voice_session_safety import (
check_assistant_output_safety,
check_user_transcript_safety,
)
from app.services.voice_session_storage import (
build_turn_assistant_audio_path,
read_session_audio,
session_audio_exists,
write_session_audio,
write_uploaded_user_audio,
)
from app.services.voice_transcription_service import transcribe_voice_audio
# Module-level logger obtained from the project's logging helper.
logger = get_logger(__name__)
# Session statuses in which another voice turn may still be accepted
# (checked before a new turn is created and when listing "active" sessions).
CONTINUABLE_SESSION_STATUSES = {"draft", "active", "waiting_user"}
# Statuses treated as final for a session.
# NOTE(review): not referenced in the visible part of this module — confirm usage.
FINAL_SESSION_STATUSES = {"completed", "abandoned"}
def _default_story_state() -> dict[str, Any]:
return {
"premise": None,
"latest_direction": None,
"cover_prompt": None,
"narrative_segments": [],
"safety_flags": [],
"last_intent": None,
"final_summary": None,
}
def _session_can_continue(session: VoiceSession) -> bool:
    """Tell whether *session* is in a status that accepts another turn."""
    current_status = session.status
    return current_status in CONTINUABLE_SESSION_STATUSES
def _session_can_finalize(session: VoiceSession) -> bool:
segments = list((session.story_state or {}).get("narrative_segments") or [])
return bool(segments) and session.status in {"active", "waiting_user"}
def _utcnow() -> datetime:
return datetime.now(timezone.utc)
def _assistant_audio_url(session_id: str, turn_id: str, audio_path: str | None) -> str | None:
    """Return the API URL of a turn's assistant audio, or None if no audio exists.

    Existence is decided by ``session_audio_exists(audio_path)``.
    """
    if session_audio_exists(audio_path):
        return f"/api/voice-sessions/{session_id}/turns/{turn_id}/audio"
    return None
def _user_audio_url(session_id: str, turn_id: str, audio_path: str | None) -> str | None:
    """Return the API URL of a turn's uploaded user audio, or None if no audio exists.

    Existence is decided by ``session_audio_exists(audio_path)``.
    """
    if session_audio_exists(audio_path):
        return f"/api/voice-sessions/{session_id}/turns/{turn_id}/user-audio"
    return None
def _format_intent_label(intent: str | None) -> str:
labels = {
"start_story": "开启故事",
"continue_story": "继续讲述",
"correct_story": "修正走向",
"end_story": "先停在这里",
"save_story": "保存当前故事",
"unknown": "待确认",
}
return labels.get(intent or "", "待确认")
def _build_understanding_summary(
*,
transcript_text: str | None,
detected_intent: str,
) -> str | None:
normalized_transcript = (transcript_text or "").strip()
if detected_intent == "unknown":
if normalized_transcript:
return f"本轮系统暂时还没完全理解:{normalized_transcript}"
return "本轮系统暂时还没完全理解孩子刚才的表达。"
if normalized_transcript:
return f"本轮系统理解为「{_format_intent_label(detected_intent)}」:{normalized_transcript}"
return f"本轮系统理解为「{_format_intent_label(detected_intent)}"
def _build_confirmation_message(
*,
transcript_text: str | None,
detected_intent: str,
reasons: list[str],
) -> str:
natural_understanding = ""
normalized_transcript = (transcript_text or "").strip()
if detected_intent != "unknown":
if normalized_transcript:
natural_understanding = (
f"我现在先理解成你想「{_format_intent_label(detected_intent)}」:"
f"{normalized_transcript}"
)
else:
natural_understanding = (
f"我现在先理解成你想「{_format_intent_label(detected_intent)}」。"
)
if "intent_unknown" in reasons:
prefix = "我这一次还没有完全听懂。"
elif {
"low_transcript_confidence",
"low_intent_confidence",
}.issubset(set(reasons)):
prefix = "我这一次听得还不够清楚,也不太确定该怎么接剧情。"
elif "low_transcript_confidence" in reasons:
prefix = "我这一次可能没有完全听清。"
else:
prefix = "我这一次还不太确定你是想继续讲,还是想改一下剧情。"
return (
f"{prefix}{natural_understanding}"
"请家长帮忙确认一下;如果不对,可以换一种说法再说一遍,我们再继续编下去。"
)
def _merge_unique_items(*values: list[str] | tuple[str, ...]) -> list[str]:
merged: list[str] = []
for value in values:
for item in value:
normalized = str(item).strip()
if normalized and normalized not in merged:
merged.append(normalized)
return merged
def _confirmation_state_from_patch(story_patch: dict[str, Any] | None = None) -> str:
patch = story_patch or {}
if isinstance(patch.get("confirmation_state"), str):
return str(patch["confirmation_state"])
if patch.get("requires_confirmation"):
return "pending"
return "not_needed"
def _resolve_turn_safety_state(story_patch: dict[str, Any] | None = None) -> dict[str, Any]:
patch = story_patch or {}
return {
"safety_flags": list(patch.get("safety_flags") or []),
"safety_blocked": bool(patch.get("safety_blocked") or False),
"safety_message": patch.get("safety_message"),
}
def _resolve_turn_confirmation_state(
    *,
    transcript_text: str | None,
    transcript_confidence: float | None,
    detected_intent: str,
    intent_confidence: float | None,
    story_patch: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Work out whether a turn needs confirmation and the texts that go with it.

    Values already stored in *story_patch* take precedence. Anything missing
    (``None``) is derived from the intent and the two confidence scores, which
    are compared against the thresholds in ``settings``.

    Returns:
        A dict with the keys ``understanding_summary``,
        ``requires_confirmation`` (bool), ``confirmation_state``,
        ``confirmation_reason`` and ``confirmation_message``.
    """
    patch = story_patch or {}
    state_label = _confirmation_state_from_patch(patch)

    # Collect every independent reason why this turn looks uncertain.
    detected_reasons: list[str] = []
    if detected_intent == "unknown":
        detected_reasons.append("intent_unknown")
    transcript_is_weak = (
        transcript_confidence is not None
        and transcript_confidence < settings.voice_turn_low_transcript_confidence
    )
    if transcript_is_weak:
        detected_reasons.append("low_transcript_confidence")
    intent_is_weak = (
        intent_confidence is not None
        and intent_confidence < settings.voice_turn_low_intent_confidence
    )
    if intent_is_weak:
        detected_reasons.append("low_intent_confidence")

    needs_confirmation = patch.get("requires_confirmation")
    if needs_confirmation is None:
        needs_confirmation = bool(detected_reasons)

    reason_text = patch.get("confirmation_reason")
    if reason_text is None and detected_reasons:
        reason_text = ",".join(detected_reasons)

    summary = patch.get("understanding_summary")
    if summary is None:
        summary = _build_understanding_summary(
            transcript_text=transcript_text,
            detected_intent=detected_intent,
        )

    message = patch.get("confirmation_message")
    if message is None and needs_confirmation:
        # A patch may force confirmation without any detected reason; the
        # message then falls back to the "intent unknown" wording.
        message = _build_confirmation_message(
            transcript_text=transcript_text,
            detected_intent=detected_intent,
            reasons=detected_reasons or ["intent_unknown"],
        )

    return {
        "understanding_summary": summary,
        "requires_confirmation": bool(needs_confirmation),
        "confirmation_state": state_label,
        "confirmation_reason": reason_text,
        "confirmation_message": message,
    }
def _turn_has_pending_confirmation(turn: VoiceTurn) -> bool:
    """Tell whether *turn* still waits for a confirmation decision.

    True only when the resolved state both requires confirmation and is in
    the ``"pending"`` confirmation state.
    """
    resolved = _resolve_turn_confirmation_state(
        transcript_text=turn.user_transcript,
        transcript_confidence=turn.transcript_confidence,
        detected_intent=turn.detected_intent,
        intent_confidence=turn.intent_confidence,
        story_patch=turn.story_patch or {},
    )
    if not resolved["requires_confirmation"]:
        return False
    return resolved["confirmation_state"] == "pending"
def _extract_first_sentence(text: str | None) -> str:
normalized = (text or "").strip().replace("\n", " ")
if not normalized:
return ""
for separator in ("", "", "", ".", "!", "?"):
if separator in normalized:
return normalized.split(separator, 1)[0].strip()
return normalized
def _build_final_story_title(session: VoiceSession) -> str:
    """Pick a title for the finalized story, at most 24 characters long.

    Candidates are tried in order: the session's working title, the story
    premise, the first sentence of the first narrative segment, and finally a
    fixed default title. The first candidate that is non-empty after trimming
    whitespace and punctuation wins.
    """
    fallback_title = "一起编织的睡前故事"
    story_state = session.story_state or {}
    opening_segment = (story_state.get("narrative_segments") or [None])[0]
    candidates = (
        session.working_title,
        story_state.get("premise"),
        _extract_first_sentence(opening_segment),
        fallback_title,
    )
    trimmed = (str(candidate or "").strip(" \n\t。!?:-") for candidate in candidates)
    return next((title[:24] for title in trimmed if title), fallback_title)
def _build_final_story_summary(session: VoiceSession) -> str:
story_state = session.story_state or {}
segments = [
segment.strip()
for segment in list(story_state.get("narrative_segments") or [])
if str(segment).strip()
]
if not segments:
return "这是一段由孩子和 DreamWeaver 一起共创的温柔故事。"
first_sentence = _extract_first_sentence(segments[0])
last_sentence = _extract_first_sentence(segments[-1])
if first_sentence and last_sentence and first_sentence != last_sentence:
return f"{first_sentence}。后来,{last_sentence}"
if first_sentence:
return f"{first_sentence}"
return "这是一段由孩子和 DreamWeaver 一起共创的温柔故事。"
def _turn_counts_as_success(turn: VoiceTurn) -> bool:
    """Tell whether *turn* should be counted as a successful turn.

    A turn is unsuccessful when it failed, was blocked for safety, or has an
    unresolved confirmation state (pending, retry recording, switch to text).
    Otherwise it succeeds only with status ``"audio_ready"`` or
    ``"narrative_ready"``.
    """
    patch = turn.story_patch or {}
    confirmation = _confirmation_state_from_patch(patch)
    unresolved_states = {"pending", "retry_recording", "switch_to_text"}
    disqualified = (
        turn.status == "failed"
        or bool(patch.get("safety_blocked"))
        or confirmation in unresolved_states
    )
    if disqualified:
        return False
    return turn.status in {"audio_ready", "narrative_ready"}
def _can_finalize_with_latest_turn(
    session: VoiceSession,
    latest_turn: VoiceTurn | None,
) -> bool:
    """Tell whether the session may be finalized given its latest turn.

    Requires the session itself to be finalizable and the latest turn, if
    there is one, not to be waiting for confirmation.
    """
    if not _session_can_finalize(session):
        return False
    awaiting_confirmation = bool(
        latest_turn and _turn_has_pending_confirmation(latest_turn)
    )
    return not awaiting_confirmation
def _turn_to_summary(turn: VoiceTurn) -> VoiceTurnSummaryResponse:
    """Convert a ``VoiceTurn`` row into its API summary response.

    Confirmation and safety fields are resolved from the turn's story patch;
    audio readiness and URLs depend on whether the stored audio paths exist.
    """
    patch = turn.story_patch or {}
    confirmation = _resolve_turn_confirmation_state(
        transcript_text=turn.user_transcript,
        transcript_confidence=turn.transcript_confidence,
        detected_intent=turn.detected_intent,
        intent_confidence=turn.intent_confidence,
        story_patch=patch,
    )
    safety = _resolve_turn_safety_state(patch)

    # Fields copied one-to-one from the ORM row.
    copied_fields = (
        "id",
        "session_id",
        "turn_index",
        "status",
        "user_transcript",
        "transcript_confidence",
        "user_audio_duration_ms",
        "detected_intent",
        "intent_confidence",
        "assistant_text",
        "assistant_audio_duration_ms",
        "error_message",
        "created_at",
        "updated_at",
    )
    payload = {field: getattr(turn, field) for field in copied_fields}

    assistant_path = turn.assistant_audio_path
    user_path = turn.user_audio_path
    return VoiceTurnSummaryResponse(
        **payload,
        transcription_provider=patch.get("transcription_provider"),
        understanding_summary=confirmation["understanding_summary"],
        requires_confirmation=confirmation["requires_confirmation"],
        confirmation_state=confirmation["confirmation_state"],
        confirmation_reason=confirmation["confirmation_reason"],
        confirmation_message=confirmation["confirmation_message"],
        safety_flags=safety["safety_flags"],
        safety_blocked=safety["safety_blocked"],
        safety_message=safety["safety_message"],
        assistant_audio_ready=session_audio_exists(assistant_path),
        assistant_audio_url=_assistant_audio_url(turn.session_id, turn.id, assistant_path),
        user_audio_ready=session_audio_exists(user_path),
        user_audio_url=_user_audio_url(turn.session_id, turn.id, user_path),
    )
def _session_to_summary(
    session: VoiceSession,
    *,
    latest_turn: VoiceTurn | None = None,
    total_turns: int | None = None,
) -> VoiceSessionSummaryResponse:
    """Convert a ``VoiceSession`` row into its API summary response.

    Args:
        session: The session to summarise.
        latest_turn: Most recent turn of the session, if any. Its
            confirmation/safety state feeds the ``latest_*`` fields.
        total_turns: Explicit turn count. When omitted, the latest turn's
            index is used, or the session's current turn index if there is
            no latest turn.
    """
    if total_turns is None:
        total_turns = (
            session.current_turn_index if latest_turn is None else latest_turn.turn_index
        )

    if latest_turn is None:
        # No turn yet: nothing to confirm and no safety findings.
        confirmation = {
            "understanding_summary": None,
            "requires_confirmation": False,
            "confirmation_state": None,
            "confirmation_message": None,
        }
        safety = {
            "safety_flags": [],
            "safety_message": None,
        }
    else:
        confirmation = _resolve_turn_confirmation_state(
            transcript_text=latest_turn.user_transcript,
            transcript_confidence=latest_turn.transcript_confidence,
            detected_intent=latest_turn.detected_intent,
            intent_confidence=latest_turn.intent_confidence,
            story_patch=latest_turn.story_patch or {},
        )
        safety = _resolve_turn_safety_state(latest_turn.story_patch or {})

    turn_status = latest_turn.status if latest_turn else None
    attention_reasons = _build_session_attention_reasons(
        latest_requires_confirmation=confirmation["requires_confirmation"],
        latest_safety_flags=safety["safety_flags"],
        last_turn_status=turn_status,
        last_error=session.last_error,
    )
    turn_intent = latest_turn.detected_intent if latest_turn else None
    audio_ready = (
        session_audio_exists(latest_turn.assistant_audio_path) if latest_turn else False
    )

    return VoiceSessionSummaryResponse(
        id=session.id,
        child_profile_id=session.child_profile_id,
        universe_id=session.universe_id,
        final_story_id=session.final_story_id,
        target_mode=session.target_mode,
        status=session.status,
        current_turn_index=session.current_turn_index,
        total_turns=total_turns or 0,
        working_title=session.working_title,
        story_state=session.story_state or {},
        latest_user_transcript=session.latest_user_transcript,
        latest_assistant_text=session.latest_assistant_text,
        latest_detected_intent=turn_intent,
        latest_understanding_summary=confirmation["understanding_summary"],
        latest_requires_confirmation=confirmation["requires_confirmation"],
        latest_confirmation_state=confirmation["confirmation_state"],
        latest_confirmation_message=confirmation["confirmation_message"],
        latest_safety_flags=safety["safety_flags"],
        latest_safety_message=safety["safety_message"],
        latest_assistant_audio_ready=audio_ready,
        last_turn_status=turn_status,
        attention_reasons=attention_reasons,
        transcription_mode_hint=settings.voice_transcription_mode,
        can_continue=_session_can_continue(session),
        can_finalize=_can_finalize_with_latest_turn(session, latest_turn),
        last_error=session.last_error,
        created_at=session.created_at,
        updated_at=session.updated_at,
    )
def _build_session_attention_reasons(
*,
latest_requires_confirmation: bool,
latest_safety_flags: list[str] | None,
last_turn_status: str | None,
last_error: str | None,
) -> list[str]:
reasons: list[str] = []
if latest_requires_confirmation:
reasons.append("pending_confirmation")
if latest_safety_flags:
reasons.append("safety_intervention")
if last_turn_status == "failed" or last_error:
reasons.append("failed_turn")
return reasons
def _session_summary_needs_attention(summary: VoiceSessionSummaryResponse) -> bool:
return bool(summary.attention_reasons)
def _session_summary_matches_attention_reason(
summary: VoiceSessionSummaryResponse,
attention_reason: str | None,
) -> bool:
if attention_reason is None:
return True
return attention_reason in summary.attention_reasons
async def _build_session_summary(
    db: AsyncSession,
    session: VoiceSession,
) -> VoiceSessionSummaryResponse:
    """Load the session's latest turn and build the session summary.

    The total turn count reported is the session's ``current_turn_index``.
    """
    most_recent_turn = await _get_latest_turn(db, session_id=session.id)
    summary = _session_to_summary(
        session,
        latest_turn=most_recent_turn,
        total_turns=session.current_turn_index,
    )
    return summary
async def _record_session_event(
    db: AsyncSession,
    *,
    session_id: str,
    turn_id: str | None,
    event_type: str,
    status: str,
    message: str | None = None,
    metadata: dict[str, Any] | None = None,
) -> VoiceSessionEvent:
    """Persist one ``VoiceSessionEvent`` and return the refreshed row.

    The event is added, committed and refreshed immediately, so each call
    ends with its own commit on *db*. A missing *metadata* is stored as an
    empty dict.
    """
    event_fields: dict[str, Any] = {
        "session_id": session_id,
        "turn_id": turn_id,
        "event_type": event_type,
        "status": status,
        "message": message,
        "event_metadata": metadata or {},
    }
    new_event = VoiceSessionEvent(**event_fields)
    db.add(new_event)
    await db.commit()
    await db.refresh(new_event)
    return new_event
async def _get_owned_session(
    db: AsyncSession,
    *,
    session_id: str,
    user_id: str,
) -> VoiceSession:
    """Fetch the voice session with *session_id* that belongs to *user_id*.

    Raises:
        HTTPException: 404 when no such session exists for that user.
    """
    ownership_filter = (
        VoiceSession.id == session_id,
        VoiceSession.user_id == user_id,
    )
    statement = select(VoiceSession).where(*ownership_filter)
    found = (await db.execute(statement)).scalar_one_or_none()
    if not found:
        raise HTTPException(status_code=404, detail="Voice session not found")
    return found
async def _get_latest_turn(
    db: AsyncSession,
    *,
    session_id: str,
) -> VoiceTurn | None:
    """Return the turn with the highest ``turn_index`` in the session, if any."""
    statement = select(VoiceTurn).where(VoiceTurn.session_id == session_id)
    statement = statement.order_by(desc(VoiceTurn.turn_index)).limit(1)
    rows = await db.execute(statement)
    return rows.scalar_one_or_none()
async def _get_owned_turn(
    db: AsyncSession,
    *,
    session_id: str,
    turn_id: str,
    user_id: str,
) -> VoiceTurn:
    """Fetch one turn, checking it sits in *session_id* owned by *user_id*.

    Ownership is enforced by joining the turn to its session and filtering on
    the session's user.

    Raises:
        HTTPException: 404 when no matching turn exists.
    """
    statement = select(VoiceTurn).join(
        VoiceSession, VoiceTurn.session_id == VoiceSession.id
    )
    statement = statement.where(
        VoiceTurn.id == turn_id,
        VoiceTurn.session_id == session_id,
        VoiceSession.user_id == user_id,
    )
    found = (await db.execute(statement)).scalar_one_or_none()
    if not found:
        raise HTTPException(status_code=404, detail="Voice turn not found")
    return found
def _detect_intent(
transcript_text: str,
*,
current_turn_index: int,
) -> tuple[str, float]:
normalized = transcript_text.replace(" ", "")
if any(keyword in normalized for keyword in ("保存故事", "存起来", "保存吧", "保存到故事库")):
return "save_story", 0.96
if any(keyword in normalized for keyword in ("先到这里", "讲完了", "结束吧", "停在这里")):
return "end_story", 0.90
if len(normalized) <= 1 or normalized in {"", "", "", "", "这个", "那个", "不知道"}:
return "unknown", 0.25
if current_turn_index == 0:
return "start_story", 0.84
if any(
keyword in normalized
for keyword in (
"不要",
"改成",
"换成",
"我想让",
"让它",
"改一下",
"改一改",
"其实",
)
):
return "correct_story", 0.78
return "continue_story", 0.74
def _recent_story_text(session: VoiceSession) -> str:
story_state = session.story_state or {}
segments = list(story_state.get("narrative_segments") or [])
if not segments:
return ""
return "\n\n".join(segments[-2:])
def _build_generation_prompt(
    *,
    session: VoiceSession,
    transcript_text: str,
    intent: str,
) -> str:
    """Assemble the text-generation prompt for one assistant turn.

    ``"start_story"`` asks for a story opening, ``"correct_story"`` asks to
    absorb the child's correction, and every other intent asks for a plain
    continuation. The latter two include the two most recent segments.
    """
    recent_story = _recent_story_text(session)
    persona = "你是 DreamWeaver 的儿童故事共创助手。"

    if intent == "start_story":
        opening_parts = [
            persona,
            "请为 3-8 岁儿童写一个温暖、安全、适合继续接龙的故事开头。",
            f"孩子刚刚说:{transcript_text}",
            "请只输出一小段自然的中文故事,不要分点,不要解释,不要写“故事开始”。",
        ]
        return "".join(opening_parts)

    story_so_far = f"当前故事最近两段如下:{recent_story or '(暂时还没有已讲述内容)'}"
    if intent == "correct_story":
        correction_parts = [
            persona,
            story_so_far,
            f"孩子希望修正故事走向:{transcript_text}",
            "请顺着已有内容自然接住这个修改,继续写一小段新故事。",
            "不要从头重讲,不要解释规则。",
        ]
        return "".join(correction_parts)

    continuation_parts = [
        persona,
        story_so_far,
        f"孩子接着说:{transcript_text}",
        "请继续写一小段新的儿童故事内容,让故事自然往下发展。",
        "不要分点,不要做旁白说明。",
    ]
    return "".join(continuation_parts)
async def _generate_assistant_turn(
    db: AsyncSession,
    *,
    session: VoiceSession,
    transcript_text: str,
    intent: str,
) -> StoryOutput:
    """Generate the assistant's story text for one turn.

    Builds the memory context for the session's child profile and universe,
    assembles the prompt for *intent*, and passes both to
    ``generate_story_content`` as a ``"full_story"`` request.
    """
    child_memory = await build_enhanced_memory_context(
        session.child_profile_id,
        session.universe_id,
        db,
    )
    generation_prompt = _build_generation_prompt(
        session=session,
        transcript_text=transcript_text,
        intent=intent,
    )
    story_output = await generate_story_content(
        input_type="full_story",
        data=generation_prompt,
        memory_context=child_memory,
        db=db,
        user_id=session.user_id,
    )
    return story_output
def _merge_story_state(
    session: VoiceSession,
    *,
    transcript_text: str,
    intent: str,
    assistant_result: StoryOutput | None,
    safety_flags: list[str] | None = None,
) -> tuple[dict[str, Any], dict[str, Any]]:
    """Fold one turn into the session's story state.

    The session object itself is not modified; a new state dict is returned
    together with a patch dict describing what this turn contributed.

    Returns:
        ``(new_story_state, story_patch)``.
    """
    new_state = {**_default_story_state(), **(session.story_state or {})}
    segments = list(new_state.get("narrative_segments") or [])

    # The first "start_story" transcript becomes the premise, once.
    if intent == "start_story" and not new_state.get("premise"):
        new_state["premise"] = transcript_text

    has_new_segment = bool(assistant_result and assistant_result.story_text)
    if has_new_segment:
        segments.append(assistant_result.story_text.strip())

    new_state["narrative_segments"] = segments
    new_state["latest_direction"] = transcript_text
    new_state["last_intent"] = intent
    new_state["safety_flags"] = _merge_unique_items(
        list(new_state.get("safety_flags") or []),
        list(safety_flags or []),
    )
    if assistant_result and assistant_result.cover_prompt_suggestion:
        new_state["cover_prompt"] = assistant_result.cover_prompt_suggestion

    if assistant_result:
        patch_title = assistant_result.title
    else:
        patch_title = session.working_title
    story_patch = {
        "intent": intent,
        "transcript_text": transcript_text,
        "segment_added": has_new_segment,
        "working_title": patch_title,
        "cover_prompt": new_state.get("cover_prompt"),
        "narrative_segments_count": len(segments),
        "safety_flags": list(new_state.get("safety_flags") or []),
    }
    return new_state, story_patch
async def _ensure_no_pending_confirmation(
    db: AsyncSession,
    *,
    session: VoiceSession,
) -> None:
    """Refuse to proceed while the session's latest turn awaits confirmation.

    Raises:
        HTTPException: 409 when the latest turn has a pending confirmation.
    """
    previous_turn = await _get_latest_turn(db, session_id=session.id)
    is_blocked = bool(previous_turn and _turn_has_pending_confirmation(previous_turn))
    if is_blocked:
        raise HTTPException(
            status_code=409,
            detail="请先确认上一轮系统理解,或选择重说 / 改成文本输入后再继续。",
        )
async def _create_pending_turn(
    db: AsyncSession,
    *,
    session: VoiceSession,
    transcript_text: str,
    transcript_confidence: float | None,
    transcription_provider: str | None,
    user_audio_path: str | None = None,
    user_audio_mime_type: str | None = None,
    user_audio_duration_ms: int | None = None,
) -> tuple[VoiceSession, VoiceTurn]:
    """Create the next turn of a session and move the session to processing.

    The new turn is stored with status ``"transcribing"`` and the intent
    detected from *transcript_text*; the session is switched to
    ``"processing_turn"`` and its turn counter advanced. Audit events are then
    recorded: ``turn_received``, ``turn_audio_uploaded`` (only when a user
    audio path is given) and ``turn_transcribed``.

    Returns:
        The refreshed ``(session, turn)`` pair.

    Raises:
        HTTPException: 409 when the session status does not allow another
            turn, or (via ``_ensure_no_pending_confirmation``) when the
            previous turn still awaits confirmation.
    """
    if session.status not in CONTINUABLE_SESSION_STATUSES:
        raise HTTPException(
            status_code=409,
            detail="Voice session is not ready for another turn.",
        )
    await _ensure_no_pending_confirmation(db, session=session)
    next_turn_index = session.current_turn_index + 1
    # Intent detection receives the index *before* the increment, so the very
    # first turn of a session is seen with current_turn_index == 0.
    detected_intent, intent_confidence = _detect_intent(
        transcript_text,
        current_turn_index=session.current_turn_index,
    )
    turn = VoiceTurn(
        session_id=session.id,
        turn_index=next_turn_index,
        status="transcribing",
        user_audio_path=user_audio_path,
        user_audio_mime_type=user_audio_mime_type,
        user_audio_duration_ms=user_audio_duration_ms,
        user_transcript=transcript_text,
        transcript_confidence=transcript_confidence,
        detected_intent=detected_intent,
        intent_confidence=intent_confidence,
        # The patch starts with only the provider; later stages add to it.
        story_patch={"transcription_provider": transcription_provider},
    )
    # Session bookkeeping: mark as busy, advance the counter, clear old errors.
    session.status = "processing_turn"
    session.current_turn_index = next_turn_index
    session.latest_user_transcript = transcript_text
    session.last_error = None
    session.updated_at = _utcnow()
    db.add(turn)
    await db.commit()
    await db.refresh(session)
    await db.refresh(turn)
    await _record_session_event(
        db,
        session_id=session.id,
        turn_id=turn.id,
        event_type="turn_received",
        status="received",
        message="Voice turn received.",
        metadata={
            "turn_index": turn.turn_index,
            "has_user_audio": bool(user_audio_path),
            "transcription_provider": transcription_provider,
        },
    )
    # The upload event is only meaningful when audio was actually provided.
    if user_audio_path:
        await _record_session_event(
            db,
            session_id=session.id,
            turn_id=turn.id,
            event_type="turn_audio_uploaded",
            status="succeeded",
            message="User audio uploaded for one voice turn.",
            metadata={
                "mime_type": user_audio_mime_type,
                "audio_path": user_audio_path,
            },
        )
    await _record_session_event(
        db,
        session_id=session.id,
        turn_id=turn.id,
        event_type="turn_transcribed",
        status="succeeded",
        message="Voice turn transcript is available.",
        metadata={
            "transcript_confidence": transcript_confidence,
            "transcription_provider": transcription_provider,
        },
    )
    return session, turn
async def _process_pending_turn(
    db: AsyncSession,
    *,
    session: VoiceSession,
    turn: VoiceTurn,
    transcript_text: str,
    user_id: str,
) -> str:
    """Run one pending turn through the narrative stage and the audio stage.

    Narrative stage (first ``try``) picks exactly one of these paths:

    * confirmation needed: reply with the clarification prompt and leave the
      story state untouched;
    * unsafe transcript: reply with the safety redirect text and only merge
      the safety flags into the story state;
    * ``save_story`` / ``end_story``: reply with a fixed sentence;
    * anything else: generate story text, run the output safety check and,
      if needed, swap in the replacement text.

    For the last two paths the turn is then merged into the story state.
    Any exception in this stage marks the turn ``"failed"``, records a
    ``session_failed`` event and returns immediately (no audio stage).

    Audio stage (second ``try``) synthesises speech for the assistant text.
    On success the turn becomes ``"audio_ready"``; on failure the text reply
    is kept and the turn stays ``"narrative_ready"``.

    Returns:
        The turn's status at the end of processing.
    """
    assistant_text: str | None = None
    assistant_result: StoryOutput | None = None
    detected_intent = turn.detected_intent
    intent_confidence = turn.intent_confidence
    # Snapshot of the patch as it was when processing started.
    turn_patch = dict(turn.story_patch or {})
    confirmation_state = _resolve_turn_confirmation_state(
        transcript_text=transcript_text,
        transcript_confidence=turn.transcript_confidence,
        detected_intent=detected_intent,
        intent_confidence=intent_confidence,
        story_patch=turn_patch,
    )
    transcript_safety = check_user_transcript_safety(transcript_text)
    assistant_safety_message: str | None = None
    safety_flags: list[str] = []
    transcript_blocked = False
    try:
        await _record_session_event(
            db,
            session_id=session.id,
            turn_id=turn.id,
            event_type="intent_resolved",
            status="succeeded",
            message="Turn intent resolved.",
            metadata={
                "detected_intent": detected_intent,
                "intent_confidence": intent_confidence,
            },
        )
        if confirmation_state["requires_confirmation"]:
            # Path 1: ask for confirmation. The story state is only read here
            # (for the patch counters); session.story_state is not reassigned.
            current_state = _default_story_state() | (session.story_state or {})
            assistant_text = confirmation_state["confirmation_message"]
            turn.story_patch = {
                **(turn.story_patch or {}),
                "intent": detected_intent,
                "transcript_text": transcript_text,
                "segment_added": False,
                "working_title": session.working_title,
                "cover_prompt": current_state.get("cover_prompt"),
                "narrative_segments_count": len(
                    list(current_state.get("narrative_segments") or [])
                ),
                "requires_confirmation": True,
                "confirmation_reason": confirmation_state["confirmation_reason"],
                "confirmation_message": confirmation_state["confirmation_message"],
                "understanding_summary": confirmation_state["understanding_summary"],
            }
            turn.assistant_text = assistant_text
            turn.status = "narrative_ready"
            session.latest_assistant_text = assistant_text
            session.status = "waiting_user"
            session.updated_at = _utcnow()
            await db.commit()
            await db.refresh(session)
            await db.refresh(turn)
            await _record_session_event(
                db,
                session_id=session.id,
                turn_id=turn.id,
                event_type="turn_confirmation_requested",
                status="needs_confirmation",
                message="Voice turn needs parent confirmation before the story continues.",
                metadata={
                    "detected_intent": detected_intent,
                    "transcript_confidence": turn.transcript_confidence,
                    "intent_confidence": intent_confidence,
                    "confirmation_reason": confirmation_state["confirmation_reason"],
                },
            )
            await _record_session_event(
                db,
                session_id=session.id,
                turn_id=turn.id,
                event_type="assistant_text_ready",
                status="succeeded",
                message="Assistant clarification prompt generated.",
                metadata={
                    "assistant_text_length": len(assistant_text or ""),
                    "working_title": session.working_title,
                    "requires_confirmation": True,
                },
            )
        elif not transcript_safety.is_safe:
            # Path 2: the child's transcript failed the safety check. No story
            # segment is added; only the safety flags are merged into state.
            transcript_blocked = True
            safety_flags = list(transcript_safety.flags)
            current_state = _default_story_state() | (session.story_state or {})
            current_state["safety_flags"] = _merge_unique_items(
                list(current_state.get("safety_flags") or []),
                safety_flags,
            )
            # Prefer the replacement text; fall back to the safety message.
            assistant_text = transcript_safety.replacement_text or transcript_safety.message
            turn.story_patch = {
                **turn_patch,
                "intent": detected_intent,
                "transcript_text": transcript_text,
                "segment_added": False,
                "working_title": session.working_title,
                "cover_prompt": current_state.get("cover_prompt"),
                "narrative_segments_count": len(
                    list(current_state.get("narrative_segments") or [])
                ),
                "requires_confirmation": False,
                "confirmation_state": turn_patch.get("confirmation_state", "not_needed"),
                "understanding_summary": confirmation_state["understanding_summary"],
                "safety_flags": safety_flags,
                "safety_blocked": True,
                "safety_message": transcript_safety.message,
            }
            turn.assistant_text = assistant_text
            turn.status = "narrative_ready"
            turn.error_message = None
            session.story_state = current_state
            session.latest_assistant_text = assistant_text
            session.status = "waiting_user"
            session.last_error = None
            session.updated_at = _utcnow()
            await db.commit()
            await db.refresh(session)
            await db.refresh(turn)
            await _record_session_event(
                db,
                session_id=session.id,
                turn_id=turn.id,
                event_type="safety_intervention_requested",
                status="blocked",
                message="Unsafe user transcript was redirected to a child-friendly path.",
                metadata={
                    "stage": "user_input",
                    "safety_flags": safety_flags,
                },
            )
            await _record_session_event(
                db,
                session_id=session.id,
                turn_id=turn.id,
                event_type="assistant_text_ready",
                status="succeeded",
                message="Assistant safety redirect generated.",
                metadata={
                    "assistant_text_length": len(assistant_text or ""),
                    "working_title": session.working_title,
                    "requires_confirmation": False,
                    "safety_flags": safety_flags,
                },
            )
        elif detected_intent == "save_story":
            # Path 3a: fixed acknowledgement, no text generation.
            assistant_text = "好的,这个故事已经准备好保存到故事库了。"
        elif detected_intent == "end_story":
            # Path 3b: fixed acknowledgement, no text generation.
            assistant_text = "好的,我们先把故事停在这里。想保存的话,现在就可以保存到故事库。"
        else:
            # Path 4: generate new story text, then screen the output.
            assistant_result = await _generate_assistant_turn(
                db,
                session=session,
                transcript_text=transcript_text,
                intent=detected_intent,
            )
            assistant_text = assistant_result.story_text.strip()
            output_safety = check_assistant_output_safety(
                assistant_text,
                premise=str((session.story_state or {}).get("premise") or ""),
            )
            if not output_safety.is_safe:
                safety_flags = _merge_unique_items(safety_flags, output_safety.flags)
                assistant_safety_message = output_safety.message
                # Keep the generated text if no replacement was supplied.
                assistant_text = output_safety.replacement_text or assistant_text
                # Rebuild the result so the text merged into the story state
                # is the screened version.
                assistant_result = StoryOutput(
                    mode=assistant_result.mode,
                    title=assistant_result.title,
                    story_text=assistant_text,
                    cover_prompt_suggestion=assistant_result.cover_prompt_suggestion,
                )
        # Shared tail for paths 3 and 4 (paths 1 and 2 already persisted
        # their own state above): merge the turn into the story state.
        if not confirmation_state["requires_confirmation"] and not transcript_blocked:
            merged_state, story_patch = _merge_story_state(
                session,
                transcript_text=transcript_text,
                intent=detected_intent,
                assistant_result=assistant_result,
                safety_flags=safety_flags,
            )
            story_patch["transcription_provider"] = turn_patch.get("transcription_provider")
            story_patch["requires_confirmation"] = False
            story_patch["confirmation_state"] = turn_patch.get("confirmation_state", "not_needed")
            story_patch["understanding_summary"] = confirmation_state["understanding_summary"]
            if turn_patch.get("confirmation_reason"):
                story_patch["confirmation_reason"] = turn_patch.get("confirmation_reason")
            story_patch["confirmation_message"] = None
            # Overrides the flags from _merge_story_state with this turn's
            # own flags only.
            story_patch["safety_flags"] = safety_flags
            story_patch["safety_blocked"] = False
            story_patch["safety_message"] = assistant_safety_message
            turn.story_patch = story_patch
            turn.assistant_text = assistant_text
            turn.status = "narrative_ready"
            turn.error_message = None
            session.story_state = merged_state
            session.latest_assistant_text = assistant_text
            session.status = "waiting_user"
            session.last_error = None
            session.updated_at = _utcnow()
            # The working title is set once and never overwritten here.
            if assistant_result and assistant_result.title and not session.working_title:
                session.working_title = assistant_result.title
            await db.commit()
            await db.refresh(session)
            await db.refresh(turn)
            await _record_session_event(
                db,
                session_id=session.id,
                turn_id=turn.id,
                event_type="story_patch_applied",
                status="succeeded",
                message="Story state updated after one turn.",
                metadata=story_patch,
            )
            if safety_flags:
                await _record_session_event(
                    db,
                    session_id=session.id,
                    turn_id=turn.id,
                    event_type="safety_intervention_requested",
                    status="rewritten",
                    message="Assistant output was rewritten to keep the story child-friendly.",
                    metadata={
                        "stage": "assistant_output",
                        "safety_flags": safety_flags,
                    },
                )
            await _record_session_event(
                db,
                session_id=session.id,
                turn_id=turn.id,
                event_type="assistant_text_ready",
                status="succeeded",
                message="Assistant text response generated.",
                metadata={
                    "assistant_text_length": len(assistant_text or ""),
                    "working_title": session.working_title,
                    "requires_confirmation": False,
                    "safety_flags": safety_flags,
                },
            )
    except Exception as exc:
        # Narrative stage failed: mark the turn failed but hand the session
        # back to the user so another turn can be attempted.
        # NOTE(review): the commit below runs without a prior rollback; if the
        # failure originated in the DB session itself it may fail too — confirm.
        turn.status = "failed"
        turn.error_message = str(exc)
        session.status = "waiting_user"
        session.last_error = str(exc)
        session.updated_at = _utcnow()
        await db.commit()
        await db.refresh(session)
        await db.refresh(turn)
        await _record_session_event(
            db,
            session_id=session.id,
            turn_id=turn.id,
            event_type="session_failed",
            status="failed",
            message="Assistant narrative generation failed for one voice turn.",
            metadata={"error": str(exc), "turn_index": turn.turn_index},
        )
        logger.warning(
            "voice_turn_generation_failed",
            session_id=session.id,
            turn_id=turn.id,
            error=str(exc),
        )
        return turn.status
    # Audio stage: runs for every path that produced assistant text.
    if assistant_text:
        try:
            audio_bytes = await text_to_speech(
                assistant_text,
                db=db,
                user_id=user_id,
            )
            saved_path = write_session_audio(
                build_turn_assistant_audio_path(session.id, turn.turn_index),
                audio_bytes,
            )
            turn.assistant_audio_path = saved_path
            # The audio duration is not computed here.
            turn.assistant_audio_duration_ms = None
            turn.status = "audio_ready"
            await db.commit()
            await db.refresh(turn)
            await _record_session_event(
                db,
                session_id=session.id,
                turn_id=turn.id,
                event_type="assistant_audio_ready",
                status="succeeded",
                message="Assistant audio response generated.",
                metadata={"audio_path": saved_path},
            )
        except Exception as exc:
            # Audio is best-effort: the text reply stays valid, so the turn
            # remains "narrative_ready" and no error is stored on the turn
            # or session; the failure is only recorded as an event and a log.
            turn.status = "narrative_ready"
            turn.error_message = None
            session.last_error = None
            session.updated_at = _utcnow()
            await db.commit()
            await db.refresh(turn)
            await db.refresh(session)
            await _record_session_event(
                db,
                session_id=session.id,
                turn_id=turn.id,
                event_type="assistant_audio_failed",
                status="failed",
                message="Assistant audio generation failed, text response kept.",
                metadata={"error": str(exc)},
            )
            logger.warning(
                "voice_turn_audio_failed",
                session_id=session.id,
                turn_id=turn.id,
                error=str(exc),
            )
    return turn.status
def _confirmation_resolution_text(action: str) -> str:
if action == "retry_recording":
return "好的,我们把这一轮先撤回,你可以重新录一遍,我会重新认真听。"
return "好的,我们先切换成文本输入。你可以直接在下面把这一轮想法改写清楚,我们再继续讲。"
async def list_voice_sessions_service(
    user_id: str,
    db: AsyncSession,
    *,
    limit: int | None = None,
    active_only: bool = False,
    needs_attention: bool = False,
    attention_reason: str | None = None,
    active_first: bool = False,
) -> list[VoiceSessionSummaryResponse]:
    """List one user's voice sessions as summaries, newest first.

    Args:
        user_id: Owner of the sessions to list.
        db: Async database session.
        limit: Requested page size; a falsy value uses the configured
            default. The result is clamped to ``[1, configured maximum]``.
        active_only: Restrict to sessions whose status is continuable.
        needs_attention: Keep only summaries flagged by
            ``_session_summary_needs_attention``.
        attention_reason: Forwarded to
            ``_session_summary_matches_attention_reason`` for every summary.
        active_first: Sort continuable sessions ahead of the others.

    Returns:
        At most the clamped ``limit`` summaries.
    """
    page_size = max(
        1,
        min(
            limit or settings.voice_session_default_list_limit,
            settings.voice_session_max_list_limit,
        ),
    )
    # Attention filters run on built summaries, so the SQL LIMIT can only be
    # applied when neither of them is requested.
    filters_after_query = needs_attention or attention_reason is not None

    recency_order = (desc(VoiceSession.updated_at), desc(VoiceSession.created_at))
    stmt = select(VoiceSession).where(VoiceSession.user_id == user_id)
    if active_only:
        stmt = stmt.where(VoiceSession.status.in_(CONTINUABLE_SESSION_STATUSES))
    if active_first:
        continuable_rank = case(
            (VoiceSession.status.in_(CONTINUABLE_SESSION_STATUSES), 1),
            else_=0,
        )
        stmt = stmt.order_by(desc(continuable_rank), *recency_order)
    else:
        stmt = stmt.order_by(*recency_order)
    if not filters_after_query:
        stmt = stmt.limit(page_size)

    rows = (await db.execute(stmt)).scalars().all()

    matched: list[VoiceSessionSummaryResponse] = []
    for row in rows:
        candidate = await _build_session_summary(db, row)
        if needs_attention and not _session_summary_needs_attention(candidate):
            continue
        if not _session_summary_matches_attention_reason(candidate, attention_reason):
            continue
        matched.append(candidate)
        # Stop early once the page is full when filtering happened in Python.
        if filters_after_query and len(matched) >= page_size:
            break
    return matched
async def get_latest_active_voice_session_service(
    user_id: str,
    db: AsyncSession,
) -> VoiceSessionSummaryResponse | None:
    """Return the most recently updated continuable session of a user.

    Returns ``None`` when the user has no session whose status is in
    ``CONTINUABLE_SESSION_STATUSES``.
    """
    stmt = select(VoiceSession).where(
        VoiceSession.user_id == user_id,
        VoiceSession.status.in_(CONTINUABLE_SESSION_STATUSES),
    )
    stmt = stmt.order_by(
        desc(VoiceSession.updated_at),
        desc(VoiceSession.created_at),
    ).limit(1)

    newest = (await db.execute(stmt)).scalar_one_or_none()
    return None if newest is None else await _build_session_summary(db, newest)
async def get_voice_session_analytics_service(
    user_id: str,
    db: AsyncSession,
    *,
    days: int | None = 30,
    provider: str | None = None,
    session_status: str | None = None,
) -> VoiceSessionAnalyticsResponse:
    """Aggregate voice-session usage and failure metrics for one user.

    Args:
        user_id: Owner whose sessions, turns and events are analysed.
        db: Async database session used for the three read queries.
        days: Look-back window applied to each row's ``created_at``;
            ``None`` disables the time filter.
        provider: Optional transcription provider to restrict to. Blank
            strings are treated as "no filter".
        session_status: Optional session status to restrict to. Blank
            strings are treated as "no filter".

    Returns:
        A ``VoiceSessionAnalyticsResponse`` with counts, rates and averages.
        Every rate and average is ``0.0`` when its denominator is empty.
    """

    def _ratio(numerator: int, denominator: int) -> float:
        # Four-decimal ratio, 0.0 for an empty denominator.
        return round(numerator / denominator, 4) if denominator else 0.0

    def _mean(values: list[Any], digits: int) -> float:
        return round(sum(values) / len(values), digits) if values else 0.0

    def _provider_of(turn: Any) -> str:
        return (turn.story_patch or {}).get("transcription_provider") or "unknown"

    window_start = (
        datetime.now(timezone.utc) - timedelta(days=days) if days is not None else None
    )
    wanted_provider = (provider or "").strip() or None
    wanted_status = (session_status or "").strip() or None

    # --- Build the three queries (sessions, turns, events) -----------------
    session_stmt = select(VoiceSession).where(VoiceSession.user_id == user_id)
    turn_stmt = (
        select(VoiceTurn)
        .join(VoiceSession, VoiceTurn.session_id == VoiceSession.id)
        .where(VoiceSession.user_id == user_id)
    )
    event_stmt = (
        select(VoiceSessionEvent)
        .join(VoiceSession, VoiceSessionEvent.session_id == VoiceSession.id)
        .where(VoiceSession.user_id == user_id)
    )
    if window_start is not None:
        session_stmt = session_stmt.where(VoiceSession.created_at >= window_start)
        turn_stmt = turn_stmt.where(VoiceTurn.created_at >= window_start)
        event_stmt = event_stmt.where(VoiceSessionEvent.created_at >= window_start)
    if wanted_status is not None:
        session_stmt = session_stmt.where(VoiceSession.status == wanted_status)
        turn_stmt = turn_stmt.where(VoiceSession.status == wanted_status)
        event_stmt = event_stmt.where(VoiceSession.status == wanted_status)

    sessions = (await db.execute(session_stmt)).scalars().all()
    turns = (await db.execute(turn_stmt)).scalars().all()
    events = (await db.execute(event_stmt)).scalars().all()

    # --- Optional provider filter, applied in Python -----------------------
    if wanted_provider is not None:
        kept_turn_ids = {
            turn.id for turn in turns if _provider_of(turn) == wanted_provider
        }
        kept_session_ids = {
            turn.session_id for turn in turns if turn.id in kept_turn_ids
        }
        sessions = [item for item in sessions if item.id in kept_session_ids]
        turns = [turn for turn in turns if turn.id in kept_turn_ids]
        # Session-level events (no turn) are kept when their session survived.
        events = [
            event
            for event in events
            if event.turn_id in kept_turn_ids
            or (event.turn_id is None and event.session_id in kept_session_ids)
        ]

    # --- Session-level metrics ---------------------------------------------
    summaries = [await _build_session_summary(db, item) for item in sessions]

    def _sessions_with_reason(reason: str) -> int:
        return len([item for item in summaries if reason in item.attention_reasons])

    total_sessions = len(sessions)
    attention_sessions = len(
        [item for item in summaries if _session_summary_needs_attention(item)]
    )
    active_sessions = len(
        [item for item in sessions if item.status in CONTINUABLE_SESSION_STATUSES]
    )
    finalized_sessions = len([item for item in sessions if item.status == "completed"])
    abandoned_sessions = len([item for item in sessions if item.status == "abandoned"])

    # --- Turn-level metrics ------------------------------------------------
    total_turns = len(turns)
    successful_turns = len([turn for turn in turns if _turn_counts_as_success(turn)])
    failed_turns = len([turn for turn in turns if turn.status == "failed"])
    text_fallback_turns = len(
        [turn for turn in turns if _provider_of(turn) == "fallback"]
    )
    uploaded_audio_turns = len([turn for turn in turns if turn.user_audio_path])
    assistant_audio_ready_turns = len(
        [turn for turn in turns if session_audio_exists(turn.assistant_audio_path)]
    )

    user_audio_durations = [
        turn.user_audio_duration_ms
        for turn in turns
        if turn.user_audio_duration_ms is not None
    ]
    assistant_audio_durations = [
        turn.assistant_audio_duration_ms
        for turn in turns
        if turn.assistant_audio_duration_ms is not None
    ]
    transcript_confidences = [
        turn.transcript_confidence
        for turn in turns
        if turn.transcript_confidence is not None
    ]
    intent_confidences = [
        turn.intent_confidence for turn in turns if turn.intent_confidence is not None
    ]
    total_user_audio_duration_ms = sum(user_audio_durations)
    total_assistant_audio_duration_ms = sum(assistant_audio_durations)

    provider_tally: dict[str, int] = {}
    for turn in turns:
        provider_name = _provider_of(turn)
        provider_tally[provider_name] = provider_tally.get(provider_name, 0) + 1

    # --- Event-level metrics -----------------------------------------------
    event_tally: dict[Any, int] = {}
    failed_event_tally: dict[str, int] = {}
    for event in events:
        kind = event.event_type
        event_tally[kind] = event_tally.get(kind, 0) + 1
        if event.status == "failed":
            failed_event_tally[kind] = failed_event_tally.get(kind, 0) + 1

    asr_failures = event_tally.get("turn_transcription_failed", 0)
    tts_failures = event_tally.get("assistant_audio_failed", 0) + event_tally.get(
        "assistant_audio_retry_failed", 0
    )
    low_confidence_turns = event_tally.get("turn_confirmation_requested", 0)
    safety_interventions = event_tally.get("safety_intervention_requested", 0)

    return VoiceSessionAnalyticsResponse(
        window_days=days,
        provider=wanted_provider,
        session_status=wanted_status,
        total_sessions=total_sessions,
        attention_sessions=attention_sessions,
        confirmation_attention_sessions=_sessions_with_reason("pending_confirmation"),
        safety_attention_sessions=_sessions_with_reason("safety_intervention"),
        failed_attention_sessions=_sessions_with_reason("failed_turn"),
        active_sessions=active_sessions,
        finalized_sessions=finalized_sessions,
        abandoned_sessions=abandoned_sessions,
        total_turns=total_turns,
        successful_turns=successful_turns,
        failed_turns=failed_turns,
        asr_failures=asr_failures,
        tts_failures=tts_failures,
        low_confidence_turns=low_confidence_turns,
        safety_interventions=safety_interventions,
        text_fallback_turns=text_fallback_turns,
        uploaded_audio_turns=uploaded_audio_turns,
        user_audio_turn_rate=_ratio(uploaded_audio_turns, total_turns),
        assistant_audio_ready_turns=assistant_audio_ready_turns,
        assistant_audio_ready_rate=_ratio(assistant_audio_ready_turns, successful_turns),
        asr_success_rate=_ratio(uploaded_audio_turns, uploaded_audio_turns + asr_failures),
        tts_success_rate=_ratio(
            assistant_audio_ready_turns,
            assistant_audio_ready_turns + tts_failures,
        ),
        avg_transcript_confidence=_mean(transcript_confidences, 4),
        avg_intent_confidence=_mean(intent_confidences, 4),
        safety_intervention_rate=_ratio(safety_interventions, total_turns),
        failure_event_counts=failed_event_tally,
        total_user_audio_duration_ms=total_user_audio_duration_ms,
        avg_user_audio_duration_ms=_mean(user_audio_durations, 2),
        total_assistant_audio_turns=len(assistant_audio_durations),
        total_assistant_audio_duration_ms=total_assistant_audio_duration_ms,
        avg_assistant_audio_duration_ms=_mean(assistant_audio_durations, 2),
        transcription_provider_counts=provider_tally,
        confirmation_request_rate=_ratio(low_confidence_turns, total_turns),
        turn_success_rate=_ratio(successful_turns, total_turns),
        finalize_conversion_rate=_ratio(finalized_sessions, total_sessions),
    )
async def create_voice_session_service(
    request: VoiceSessionCreateRequest,
    user_id: str,
    db: AsyncSession,
) -> VoiceSessionSummaryResponse:
    """Create a new ``draft`` voice session and record its creation event.

    The requested child profile and universe are first passed through
    ``validate_profile_and_universe``; the values it returns are the ones
    stored on the session.
    """
    profile_id, universe_id = await validate_profile_and_universe(
        request.child_profile_id,
        request.universe_id,
        user_id,
        db,
    )

    new_session = VoiceSession(
        user_id=user_id,
        child_profile_id=profile_id,
        universe_id=universe_id,
        target_mode=request.target_mode,
        status="draft",
        story_state=_default_story_state(),
    )
    db.add(new_session)
    await db.commit()
    await db.refresh(new_session)

    creation_metadata = {
        "child_profile_id": new_session.child_profile_id,
        "universe_id": new_session.universe_id,
        "target_mode": new_session.target_mode,
    }
    await _record_session_event(
        db,
        session_id=new_session.id,
        turn_id=None,
        event_type="session_created",
        status="succeeded",
        message="Voice co-creation session created.",
        metadata=creation_metadata,
    )

    # Reload once more after the event has been recorded.
    await db.refresh(new_session)
    return _session_to_summary(new_session)
async def get_voice_session_detail_service(
    session_id: str,
    user_id: str,
    db: AsyncSession,
) -> VoiceSessionDetailResponse:
    """Return a session summary plus its recent turns and events.

    The response carries the 10 turns with the highest ``turn_index`` and the
    50 events with the highest ``id``, both in ascending order.
    """
    session = await _get_owned_session(db, session_id=session_id, user_id=user_id)

    turn_stmt = (
        select(VoiceTurn)
        .where(VoiceTurn.session_id == session.id)
        .order_by(desc(VoiceTurn.turn_index))
        .limit(10)
    )
    recent_turns = list((await db.execute(turn_stmt)).scalars().all())
    recent_turns.reverse()  # fetched newest-first, presented oldest-first

    event_stmt = (
        select(VoiceSessionEvent)
        .where(VoiceSessionEvent.session_id == session.id)
        .order_by(desc(VoiceSessionEvent.id))
        .limit(50)
    )
    recent_events = list((await db.execute(event_stmt)).scalars().all())
    recent_events.reverse()

    summary = _session_to_summary(
        session,
        latest_turn=recent_turns[-1] if recent_turns else None,
        total_turns=session.current_turn_index,
    )

    event_payloads = []
    for record in recent_events:
        event_payloads.append(
            {
                "id": record.id,
                "session_id": record.session_id,
                "turn_id": record.turn_id,
                "event_type": record.event_type,
                "status": record.status,
                "message": record.message,
                "event_metadata": record.event_metadata or {},
                "created_at": record.created_at,
            }
        )

    return VoiceSessionDetailResponse(
        **summary.model_dump(),
        recent_turns=[_turn_to_summary(item) for item in recent_turns],
        events=event_payloads,
    )
async def create_voice_turn_from_text_service(
    session_id: str,
    request: VoiceTurnCreateFallbackRequest,
    user_id: str,
    db: AsyncSession,
) -> VoiceTurnAcceptedResponse:
    """Create and process a turn from typed text instead of uploaded audio.

    The stripped transcript is stored with confidence ``1.0`` and the
    transcription provider ``"fallback"``, then handed to
    ``_process_pending_turn``.
    """
    owned_session = await _get_owned_session(db, session_id=session_id, user_id=user_id)
    cleaned_text = request.transcript_text.strip()

    owned_session, new_turn = await _create_pending_turn(
        db,
        session=owned_session,
        transcript_text=cleaned_text,
        transcript_confidence=1.0,
        transcription_provider="fallback",
        user_audio_duration_ms=request.duration_ms,
    )
    turn_status = await _process_pending_turn(
        db,
        session=owned_session,
        turn=new_turn,
        transcript_text=cleaned_text,
        user_id=user_id,
    )
    return VoiceTurnAcceptedResponse(
        turn_id=new_turn.id,
        session_id=owned_session.id,
        status=turn_status,
    )
async def create_voice_turn_from_upload_service(
    *,
    session_id: str,
    user_id: str,
    audio_bytes: bytes,
    file_name: str,
    mime_type: str | None,
    duration_ms: int | None,
    transcript_hint: str | None,
    db: AsyncSession,
) -> VoiceTurnUploadAcceptedResponse:
    """Store uploaded audio, transcribe it, then create and process a turn.

    Raises:
        HTTPException: 409 when the session status is not continuable,
            400 when the upload is empty, 413 when it exceeds
            ``settings.voice_turn_max_upload_bytes``. An ``HTTPException``
            raised by transcription is recorded as a
            ``turn_transcription_failed`` event and re-raised unchanged.
    """
    session = await _get_owned_session(db, session_id=session_id, user_id=user_id)

    # --- Preconditions -----------------------------------------------------
    if session.status not in CONTINUABLE_SESSION_STATUSES:
        raise HTTPException(
            status_code=409,
            detail="Voice session is not ready for another turn.",
        )
    await _ensure_no_pending_confirmation(db, session=session)
    if not audio_bytes:
        raise HTTPException(status_code=400, detail="上传音频为空,请重新录音后再试。")
    if len(audio_bytes) > settings.voice_turn_max_upload_bytes:
        raise HTTPException(
            status_code=413,
            detail="上传音频过大,请缩短单轮录音时长后再试。",
        )

    # --- Persist the raw upload under the index the new turn will use ------
    stored_audio_path = write_uploaded_user_audio(
        session_id=session.id,
        turn_index=session.current_turn_index + 1,
        file_name=file_name,
        mime_type=mime_type,
        audio_data=audio_bytes,
    )

    # --- Transcribe --------------------------------------------------------
    try:
        transcription = await transcribe_voice_audio(
            audio_bytes=audio_bytes,
            file_name=file_name,
            mime_type=mime_type,
            transcript_hint=transcript_hint,
            db=db,
            user_id=user_id,
        )
    except HTTPException as exc:
        failure_detail = str(exc.detail)
        session.last_error = failure_detail
        session.updated_at = _utcnow()
        await db.commit()
        await db.refresh(session)
        await _record_session_event(
            db,
            session_id=session.id,
            turn_id=None,
            event_type="turn_transcription_failed",
            status="failed",
            message="Voice transcription failed before one turn could be created.",
            metadata={
                "mime_type": mime_type,
                "audio_path": stored_audio_path,
                "error": failure_detail,
            },
        )
        raise

    # --- Create and process the turn ---------------------------------------
    recognized_text = transcription.transcript_text
    session, new_turn = await _create_pending_turn(
        db,
        session=session,
        transcript_text=recognized_text,
        transcript_confidence=transcription.confidence,
        transcription_provider=transcription.provider,
        user_audio_path=stored_audio_path,
        user_audio_mime_type=mime_type,
        user_audio_duration_ms=duration_ms,
    )
    turn_status = await _process_pending_turn(
        db,
        session=session,
        turn=new_turn,
        transcript_text=recognized_text,
        user_id=user_id,
    )
    return VoiceTurnUploadAcceptedResponse(
        turn_id=new_turn.id,
        session_id=session.id,
        status=turn_status,
        transcription_provider=transcription.provider,
    )
async def retry_voice_turn_service(
    session_id: str,
    turn_id: str,
    user_id: str,
    db: AsyncSession,
) -> VoiceTurnAcceptedResponse:
    """Retry a failed turn by resubmitting its transcript as a text turn.

    Raises:
        HTTPException: 409 when the turn is not in ``failed`` status or has
            no stored transcript.
    """
    failed_turn = await _get_owned_turn(
        db,
        session_id=session_id,
        turn_id=turn_id,
        user_id=user_id,
    )
    if failed_turn.status != "failed":
        raise HTTPException(status_code=409, detail="Only failed turns can be retried.")
    if not failed_turn.user_transcript:
        raise HTTPException(status_code=409, detail="This turn has no transcript to retry.")

    # The retry goes through the text path, which creates a new turn.
    resubmission = VoiceTurnCreateFallbackRequest(
        transcript_text=failed_turn.user_transcript,
        duration_ms=failed_turn.user_audio_duration_ms,
    )
    return await create_voice_turn_from_text_service(
        session_id,
        resubmission,
        user_id,
        db,
    )
async def resolve_voice_turn_confirmation_service(
    session_id: str,
    turn_id: str,
    request: VoiceTurnConfirmRequest,
    user_id: str,
    db: AsyncSession,
) -> VoiceTurnSummaryResponse:
    """Resolve a pending confirmation on the latest turn of a session.

    With action ``"accept"`` the turn is processed with its stored
    transcript. Any other action replaces the assistant reply with guidance
    text and puts the session back into ``waiting_user``.

    Raises:
        HTTPException: 409 when the turn is not the latest one, has no
            pending confirmation, or has no transcript.
    """
    session = await _get_owned_session(db, session_id=session_id, user_id=user_id)
    turn = await _get_owned_turn(
        db,
        session_id=session_id,
        turn_id=turn_id,
        user_id=user_id,
    )

    if turn.turn_index != session.current_turn_index:
        raise HTTPException(status_code=409, detail="Only the latest turn can be confirmed.")
    if not _turn_has_pending_confirmation(turn):
        raise HTTPException(status_code=409, detail="This turn does not need confirmation.")
    if not turn.user_transcript:
        raise HTTPException(status_code=409, detail="This turn has no transcript to confirm.")

    action = request.action
    accepted = action == "accept"

    # Clear the confirmation flags on a copy so the patch is reassigned as a
    # new object instead of being mutated in place.
    turn.story_patch = {
        **(turn.story_patch or {}),
        "requires_confirmation": False,
        "confirmation_state": "accepted" if accepted else action,
        "confirmation_message": None,
    }
    turn.error_message = None
    session.last_error = None
    session.updated_at = _utcnow()

    if not accepted:
        # Declined: show guidance instead of continuing the story.
        guidance = _confirmation_resolution_text(action)
        turn.assistant_text = guidance
        turn.assistant_audio_path = None
        turn.assistant_audio_duration_ms = None
        turn.status = "narrative_ready"
        session.status = "waiting_user"
        session.latest_assistant_text = guidance
        await db.commit()
        await db.refresh(session)
        await db.refresh(turn)
        await _record_session_event(
            db,
            session_id=session.id,
            turn_id=turn.id,
            event_type=f"turn_confirmation_{action}",
            status="succeeded",
            message="Pending confirmation was resolved without continuing the current transcript.",
            metadata={"turn_index": turn.turn_index, "action": action},
        )
        return _turn_to_summary(turn)

    # Accepted: continue the story with the stored transcript.
    session.status = "processing_turn"
    await db.commit()
    await db.refresh(session)
    await db.refresh(turn)
    await _record_session_event(
        db,
        session_id=session.id,
        turn_id=turn.id,
        event_type="turn_confirmation_accepted",
        status="succeeded",
        message=(
            "Parent confirmed the current interpretation "
            "and allowed the story to continue."
        ),
        metadata={"turn_index": turn.turn_index},
    )
    await _process_pending_turn(
        db,
        session=session,
        turn=turn,
        transcript_text=turn.user_transcript,
        user_id=user_id,
    )
    await db.refresh(turn)
    return _turn_to_summary(turn)
async def retry_voice_turn_audio_service(
    session_id: str,
    turn_id: str,
    user_id: str,
    db: AsyncSession,
) -> VoiceTurnSummaryResponse:
    """Regenerate the assistant audio of a turn that has text but no audio.

    Args:
        session_id: Session the turn belongs to.
        turn_id: Turn whose assistant text should be spoken again.
        user_id: Owner used for the ownership lookup and the TTS call.
        db: Async database session.

    Returns:
        The refreshed turn summary, including the new audio path.

    Raises:
        HTTPException: 409 when the turn has no assistant text or already
            has an audio file; 503 when speech synthesis or writing the
            audio file fails.
    """
    turn = await _get_owned_turn(
        db,
        session_id=session_id,
        turn_id=turn_id,
        user_id=user_id,
    )
    if not turn.assistant_text:
        raise HTTPException(status_code=409, detail="This turn has no assistant text to speak.")
    if session_audio_exists(turn.assistant_audio_path):
        raise HTTPException(status_code=409, detail="Assistant audio already exists for this turn.")

    # Only synthesis and the file write are guarded. A failure while
    # persisting the result must not be logged as a TTS failure (the
    # "assistant_audio_retry_failed" event counts towards TTS failure
    # analytics) nor answered with the 503 "try again later" response.
    try:
        audio_bytes = await text_to_speech(
            turn.assistant_text,
            db=db,
            user_id=user_id,
        )
        saved_path = write_session_audio(
            build_turn_assistant_audio_path(turn.session_id, turn.turn_index),
            audio_bytes,
        )
    except Exception as exc:
        await _record_session_event(
            db,
            session_id=turn.session_id,
            turn_id=turn.id,
            event_type="assistant_audio_retry_failed",
            status="failed",
            message="Assistant audio retry failed.",
            metadata={"error": str(exc)},
        )
        raise HTTPException(status_code=503, detail="语音补发失败,请稍后再试。") from exc

    turn.assistant_audio_path = saved_path
    turn.assistant_audio_duration_ms = None
    # Promote only turns that were waiting for audio; other statuses are kept.
    if turn.status == "narrative_ready":
        turn.status = "audio_ready"
    await db.commit()
    await db.refresh(turn)
    await _record_session_event(
        db,
        session_id=turn.session_id,
        turn_id=turn.id,
        event_type="assistant_audio_retry_succeeded",
        status="succeeded",
        message="Assistant audio regenerated for one voice turn.",
        metadata={"audio_path": saved_path},
    )
    return _turn_to_summary(turn)
async def get_voice_turn_service(
    session_id: str,
    turn_id: str,
    user_id: str,
    db: AsyncSession,
) -> VoiceTurnSummaryResponse:
    """Return the summary of one turn looked up via ``_get_owned_turn``."""
    owned_turn = await _get_owned_turn(
        db,
        session_id=session_id,
        turn_id=turn_id,
        user_id=user_id,
    )
    summary = _turn_to_summary(owned_turn)
    return summary
async def get_voice_turn_audio_service(
    session_id: str,
    turn_id: str,
    user_id: str,
    db: AsyncSession,
) -> bytes:
    """Return the stored assistant audio bytes of one turn.

    Raises:
        HTTPException: 404 when no assistant audio exists for the turn.
    """
    owned_turn = await _get_owned_turn(
        db,
        session_id=session_id,
        turn_id=turn_id,
        user_id=user_id,
    )
    audio_path = owned_turn.assistant_audio_path
    if session_audio_exists(audio_path):
        return read_session_audio(audio_path)
    raise HTTPException(status_code=404, detail="Voice turn audio not found")
async def get_voice_turn_user_audio_service(
    session_id: str,
    turn_id: str,
    user_id: str,
    db: AsyncSession,
) -> tuple[bytes, str]:
    """Return the uploaded user audio of one turn with its MIME type.

    The MIME type falls back to ``"audio/webm"`` when none was stored.

    Raises:
        HTTPException: 404 when no uploaded audio exists for the turn.
    """
    owned_turn = await _get_owned_turn(
        db,
        session_id=session_id,
        turn_id=turn_id,
        user_id=user_id,
    )
    audio_path = owned_turn.user_audio_path
    if not session_audio_exists(audio_path):
        raise HTTPException(status_code=404, detail="Uploaded user audio not found")

    payload = read_session_audio(audio_path)
    content_type = owned_turn.user_audio_mime_type or "audio/webm"
    return payload, content_type
async def finalize_voice_session_service(
    session_id: str,
    request: VoiceSessionFinalizeRequest,
    user_id: str,
    db: AsyncSession,
) -> VoiceSessionFinalizeResponse:
    """Turn a voice session's narrative into a saved story and close it.

    Args:
        session_id: Session to finalize.
        request: Finalize options; ``save_story`` must be true, and
            ``generate_cover`` controls whether a cover job is queued.
        user_id: Owner of the session and of the story that is created.
        db: Async database session.

    Returns:
        The session id and final status, the new story id and, when a cover
        job was queued, its generation job id.

    Raises:
        HTTPException: 400 when ``save_story`` is false; 409 when the
            session is already closed, is not ready to finalize, or has no
            narrative text to save.
    """
    if not request.save_story:
        raise HTTPException(
            status_code=400,
            detail="Voice session finalize requires save_story=true in Phase A.",
        )
    session = await _get_owned_session(db, session_id=session_id, user_id=user_id)
    latest_turn = await _get_latest_turn(db, session_id=session.id)
    if session.status in FINAL_SESSION_STATUSES:
        raise HTTPException(status_code=409, detail="Voice session is already closed.")
    if not _can_finalize_with_latest_turn(session, latest_turn):
        raise HTTPException(status_code=409, detail="Voice session is not ready to finalize.")

    # Validate the narrative BEFORE the status changes. Raising after the
    # session was committed as "finalizing_story" would leave it in a status
    # that is neither continuable nor final.
    story_state = session.story_state or {}
    narrative_segments = list(story_state.get("narrative_segments") or [])
    final_story_text = "\n\n".join(
        segment.strip() for segment in narrative_segments if segment.strip()
    )
    if not final_story_text:
        raise HTTPException(status_code=409, detail="Voice session has no narrative to save.")

    session.status = "finalizing_story"
    session.updated_at = _utcnow()
    await db.commit()
    await db.refresh(session)
    await _record_session_event(
        db,
        session_id=session.id,
        turn_id=None,
        event_type="session_finalizing",
        status="running",
        message="Voice session is being finalized into a story.",
        metadata={"generate_cover": request.generate_cover},
    )

    final_title = _build_final_story_title(session)
    final_summary = _build_final_story_summary(session)
    story_state = {
        **story_state,
        "final_summary": final_summary,
        "final_title": final_title,
    }
    session.story_state = story_state
    session.working_title = final_title

    story_result = StoryOutput(
        mode="generated",
        title=final_title,
        story_text=final_story_text,
        # An empty suggestion is passed when no cover was requested.
        cover_prompt_suggestion=(
            (story_state.get("cover_prompt") or "") if request.generate_cover else ""
        ),
    )
    # NOTE(review): if create_story_from_result raises, the session stays in
    # "finalizing_story"; a recovery path may be needed - confirm with owners.
    story = await create_story_from_result(
        result=story_result,
        user_id=user_id,
        profile_id=session.child_profile_id,
        universe_id=session.universe_id,
        db=db,
    )

    generation_job_id: str | None = None
    if request.generate_cover and story.cover_prompt:
        # Cover generation is best-effort: a queueing failure is recorded and
        # logged, but the story is still saved and the session completed.
        try:
            cover_job = await queue_story_asset_generation(
                story.id,
                user_id,
                ["image"],
                db,
            )
            generation_job_id = str(cover_job["id"])
            await _record_session_event(
                db,
                session_id=session.id,
                turn_id=None,
                event_type="session_cover_generation_queued",
                status="succeeded",
                message="Finalized story cover generation was queued after session save.",
                metadata={"story_id": story.id, "generation_job_id": generation_job_id},
            )
        except HTTPException as exc:
            await _record_session_event(
                db,
                session_id=session.id,
                turn_id=None,
                event_type="session_cover_generation_failed",
                status="failed",
                message="Finalized story cover generation failed before the worker could start.",
                metadata={"story_id": story.id, "error": str(exc.detail)},
            )
            logger.warning(
                "voice_session_finalize_cover_queue_failed",
                session_id=session.id,
                story_id=story.id,
                error=str(exc.detail),
            )

    session.final_story_id = story.id
    session.status = "completed"
    session.last_error = None
    session.updated_at = _utcnow()
    await db.commit()
    await db.refresh(session)
    await _record_session_event(
        db,
        session_id=session.id,
        turn_id=None,
        event_type="session_saved_as_story",
        status="succeeded",
        message="Voice session finalized into a story.",
        metadata={
            "story_id": story.id,
            "final_title": final_title,
            "final_summary": final_summary,
        },
    )
    return VoiceSessionFinalizeResponse(
        session_id=session.id,
        status=session.status,
        story_id=story.id,
        generation_job_id=generation_job_id,
    )
async def abandon_voice_session_service(
    session_id: str,
    request: VoiceSessionAbandonRequest,
    user_id: str,
    db: AsyncSession,
) -> VoiceSessionSummaryResponse:
    """Mark a session that is still open as ``abandoned``.

    The reason given in the request is stored in ``last_error`` and in the
    metadata of the ``session_abandoned`` event.

    Raises:
        HTTPException: 409 when the session is already completed or
            abandoned.
    """
    session = await _get_owned_session(db, session_id=session_id, user_id=user_id)
    if session.status in FINAL_SESSION_STATUSES:
        raise HTTPException(status_code=409, detail="Voice session is already closed.")

    abandon_reason = request.reason
    session.status = "abandoned"
    session.last_error = abandon_reason
    session.updated_at = _utcnow()
    await db.commit()
    await db.refresh(session)

    await _record_session_event(
        db,
        session_id=session.id,
        turn_id=None,
        event_type="session_abandoned",
        status="succeeded",
        message="Voice session abandoned by the user.",
        metadata={"reason": request.reason},
    )
    await db.refresh(session)

    newest_turn = await _get_latest_turn(db, session_id=session.id)
    return _session_to_summary(
        session,
        latest_turn=newest_turn,
        total_turns=session.current_turn_index,
    )