Compare commits

...

3 Commits

9 changed files with 371 additions and 24 deletions

View File

@@ -21,7 +21,7 @@ def _is_terminal_status(status: str) -> bool:
def _job_supports_queue_control(job: GenerationJob) -> bool: def _job_supports_queue_control(job: GenerationJob) -> bool:
return job.output_mode in {"story", "storybook"} return job.output_mode in {"story", "storybook", "asset_generation"}
def generation_job_can_cancel(job: GenerationJob) -> bool: def generation_job_can_cancel(job: GenerationJob) -> bool:

View File

@@ -1241,6 +1241,9 @@ async def execute_generation_job_service(
"""Execute one previously accepted generation job inside the worker.""" """Execute one previously accepted generation job inside the worker."""
try: try:
if job.output_mode == "asset_generation":
response = await _generate_asset_generation_service_with_job(job, db)
else:
request = GenerationRequest.model_validate(job.request_payload or {}) request = GenerationRequest.model_validate(job.request_payload or {})
response = await _generate_generation_service_with_job( response = await _generate_generation_service_with_job(
request, request,
@@ -1294,6 +1297,39 @@ def _build_canceled_generation_response(job: GenerationJob) -> GenerationRespons
) )
def _build_generation_response_from_story(
    story: Story,
    *,
    job_id: str,
) -> GenerationResponse:
    """Translate one persisted story record into the unified generation response.

    Args:
        story: Story record whose fields are copied into the response.
        job_id: Value reported as ``generation_job_id`` on the response.

    Returns:
        A ``GenerationResponse`` built from ``story``. ``pages`` is only
        populated when ``story.mode`` is ``"storybook"``; ``image_url`` is
        reported both as ``image_url`` and as ``cover_url``.
    """
    if story.mode == "storybook":
        # Storybook pages may be stored as None; normalise to an empty list.
        response_pages = _storybook_pages_to_response(story.pages or [])
    else:
        response_pages = None

    response_fields = {
        "id": story.id,
        "generation_job_id": job_id,
        "title": story.title,
        "mode": story.mode,
        "story_text": story.story_text,
        "pages": response_pages,
        "cover_prompt": story.cover_prompt,
        "image_url": story.image_url,
        "cover_url": story.image_url,
        # Audio counts as ready only when its status equals the READY enum value.
        "audio_ready": story.audio_status == StoryAssetStatus.READY.value,
        "generation_status": story.generation_status,
        "text_status": story.text_status,
        "image_status": story.image_status,
        "audio_status": story.audio_status,
        "last_error": story.last_error,
        "child_profile_id": story.child_profile_id,
        "universe_id": story.universe_id,
        "retryable_assets": story.retryable_assets,
    }
    return GenerationResponse(**response_fields)
async def run_generation_job_service( async def run_generation_job_service(
job_id: str, job_id: str,
db: AsyncSession, db: AsyncSession,
@@ -1309,6 +1345,54 @@ async def run_generation_job_service(
return job return job
async def _generate_asset_generation_service_with_job(
    job: GenerationJob,
    db: AsyncSession,
) -> GenerationResponse:
    """Run a queued asset generation job inside the background worker.

    Reads ``story_id`` and ``assets`` from the job's request payload (falling
    back to ``job.story_id`` for the story), completes the requested image
    and/or audio assets, marks the job finished, and returns the response
    built from the reloaded story.

    Args:
        job: The generation job being executed.
        db: Database session passed through to every helper.

    Returns:
        The generation response for the story after asset work finished.

    Raises:
        HTTPException: 400 when no story id can be resolved or when the
            payload lists no assets.
    """
    job_payload = job.request_payload or {}
    target_story_id = job_payload.get("story_id") or job.story_id
    # dict.fromkeys removes duplicates while keeping first-seen order.
    asset_kinds = list(dict.fromkeys(job_payload.get("assets") or []))

    if target_story_id is None:
        raise HTTPException(status_code=400, detail="资源任务缺少 story_id。")
    if not asset_kinds:
        raise HTTPException(status_code=400, detail="资源任务缺少 assets。")

    story = await get_story_detail(int(target_story_id), job.user_id, db)

    wants_image = "image" in asset_kinds
    if wants_image and story.mode == "storybook":
        await _complete_storybook_image_assets(story, db, job=job)
    elif wants_image:
        await _complete_cover_image_asset(
            story,
            db,
            raise_on_failure=True,
            log_event="cover_generation_failed",
            job=job,
        )

    if "audio" in asset_kinds:
        await _complete_audio_asset(
            story,
            db,
            raise_on_failure=True,
            job=job,
        )

    # Reload the story before finishing the job and building the response.
    story = await get_story_detail(story.id, job.user_id, db)
    await finish_generation_job(
        db,
        job=job,
        story=story,
        current_step="asset_generation_completed",
        message="Asset generation completed in the background worker.",
        metadata={"assets": asset_kinds},
    )
    return _build_generation_response_from_story(story, job_id=job.id)
async def retry_generation_job_service( async def retry_generation_job_service(
job_id: str, job_id: str,
user_id: str, user_id: str,
@@ -1534,6 +1618,46 @@ async def create_story_from_result(
) )
async def queue_story_asset_generation(
    story_id: int,
    user_id: str,
    assets: list[str],
    db: AsyncSession,
) -> dict:
    """Create and dispatch one asset generation job for a persisted story.

    Args:
        story_id: Identifier of the story that needs assets.
        user_id: Owner used for the active-job check, the story lookup and
            the new job.
        assets: Requested asset kinds; duplicates are dropped, order is kept.
        db: Database session passed through to every helper.

    Returns:
        The job summary produced by ``generation_job_to_summary``.

    Raises:
        HTTPException: 400 when ``assets`` is empty, when ``"image"`` is
            requested but neither a cover prompt nor (for storybooks) a page
            image prompt exists, or when ``"audio"`` is requested for a story
            without text.
    """
    await ensure_no_active_story_generation_job(db, story_id=story_id, user_id=user_id)

    unique_assets = list(dict.fromkeys(assets))
    if not unique_assets:
        raise HTTPException(status_code=400, detail="至少需要一个待生成资源")

    story = await get_story_detail(story_id, user_id, db)

    if "image" in unique_assets:
        prompt_available = bool(story.cover_prompt)
        if not prompt_available and story.mode == "storybook":
            # Without a cover prompt, a storybook still qualifies when any
            # page dict carries its own image prompt.
            for page in story.pages or []:
                if isinstance(page, dict) and page.get("image_prompt"):
                    prompt_available = True
                    break
        if not prompt_available:
            raise HTTPException(status_code=400, detail="当前故事没有可生成的图片提示词")

    if "audio" in unique_assets and not story.story_text:
        raise HTTPException(status_code=400, detail="当前故事没有可生成音频的正文")

    job = await create_generation_job(
        db,
        user_id=user_id,
        output_mode="asset_generation",
        input_type=",".join(unique_assets),
        request_payload={"story_id": story_id, "assets": unique_assets},
        story_id=story_id,
    )
    await _dispatch_generation_job(db, job=job)
    # Refresh the job after dispatch so the summary uses its current state.
    await db.refresh(job)
    return generation_job_to_summary(job)
async def _retry_cover_image_asset(story: Story, db: AsyncSession, *, job=None) -> None: async def _retry_cover_image_asset(story: Story, db: AsyncSession, *, job=None) -> None:
"""Retry cover generation for a text story.""" """Retry cover generation for a text story."""

View File

@@ -31,7 +31,7 @@ from app.services.memory_service import build_enhanced_memory_context
from app.services.provider_router import generate_story_content, text_to_speech from app.services.provider_router import generate_story_content, text_to_speech
from app.services.story_service import ( from app.services.story_service import (
create_story_from_result, create_story_from_result,
generate_story_cover, queue_story_asset_generation,
validate_profile_and_universe, validate_profile_and_universe,
) )
from app.services.voice_session_safety import ( from app.services.voice_session_safety import (
@@ -1710,15 +1710,21 @@ async def finalize_voice_session_service(
generation_job_id: str | None = None generation_job_id: str | None = None
if request.generate_cover and story.cover_prompt: if request.generate_cover and story.cover_prompt:
try: try:
await generate_story_cover(story.id, user_id, db) cover_job = await queue_story_asset_generation(
story.id,
user_id,
["image"],
db,
)
generation_job_id = str(cover_job["id"])
await _record_session_event( await _record_session_event(
db, db,
session_id=session.id, session_id=session.id,
turn_id=None, turn_id=None,
event_type="session_cover_generation_succeeded", event_type="session_cover_generation_queued",
status="succeeded", status="succeeded",
message="Finalized story cover was generated after session save.", message="Finalized story cover generation was queued after session save.",
metadata={"story_id": story.id}, metadata={"story_id": story.id, "generation_job_id": generation_job_id},
) )
except HTTPException as exc: except HTTPException as exc:
await _record_session_event( await _record_session_event(
@@ -1727,11 +1733,11 @@ async def finalize_voice_session_service(
turn_id=None, turn_id=None,
event_type="session_cover_generation_failed", event_type="session_cover_generation_failed",
status="failed", status="failed",
message="Finalized story cover generation failed after session save.", message="Finalized story cover generation failed before the worker could start.",
metadata={"story_id": story.id, "error": str(exc.detail)}, metadata={"story_id": story.id, "error": str(exc.detail)},
) )
logger.warning( logger.warning(
"voice_session_finalize_cover_failed", "voice_session_finalize_cover_queue_failed",
session_id=session.id, session_id=session.id,
story_id=story.id, story_id=story.id,
error=str(exc.detail), error=str(exc.detail),

View File

@@ -8,7 +8,7 @@ from httpx import ASGITransport, AsyncClient
from sqlalchemy import select from sqlalchemy import select
from app.db.database import get_db from app.db.database import get_db
from app.db.models import GenerationJob, GenerationJobEvent from app.db.models import GenerationJob, GenerationJobEvent, Story
from app.main import app from app.main import app
from app.services.adapters import AdapterConfig from app.services.adapters import AdapterConfig
from app.services.adapters.storybook.primary import Storybook, StorybookPage from app.services.adapters.storybook.primary import Storybook, StorybookPage
@@ -20,7 +20,7 @@ from app.services.generation_jobs import (
mark_stale_generation_jobs, mark_stale_generation_jobs,
record_generation_event, record_generation_event,
) )
from app.services.story_service import run_generation_job_service from app.services.story_service import queue_story_asset_generation, run_generation_job_service
pytestmark = pytest.mark.asyncio pytestmark = pytest.mark.asyncio
@@ -225,6 +225,174 @@ async def test_asset_retry_records_job_events_and_updates_retryable_assets(
app.dependency_overrides.clear() app.dependency_overrides.clear()
async def test_queue_story_asset_generation_dispatches_background_job(
    db_session,
    test_story,
):
    """Queueing an image asset job returns a running summary and calls the task's delay."""
    with patch(
        "app.tasks.generation_workflow.run_generation_workflow_task.delay"
    ) as mock_delay:
        job_summary = await queue_story_asset_generation(
            test_story.id,
            test_story.user_id,
            ["image"],
            db_session,
        )

    # The summary describes a freshly accepted, cancelable asset job.
    assert job_summary["output_mode"] == "asset_generation"
    assert job_summary["input_type"] == "image"
    assert job_summary["status"] == "running"
    assert job_summary["current_step"] == "request_accepted"
    assert job_summary["can_cancel"] is True
    assert job_summary["can_retry"] is False
    mock_delay.assert_called_once_with(job_summary["id"])

    # The persisted job row is linked to the story it was queued for.
    job_query = select(GenerationJob).where(GenerationJob.id == job_summary["id"])
    stored_job = (await db_session.execute(job_query)).scalar_one()
    assert stored_job.story_id == test_story.id
    assert stored_job.output_mode == "asset_generation"
async def test_asset_generation_job_worker_completes_cover_image(
    db_session,
    test_story,
):
    """Running an image asset job stores the cover URL and records the expected events."""
    cover_url = "https://example.com/async-cover.png"
    queued_job = await create_generation_job(
        db_session,
        user_id=test_story.user_id,
        output_mode="asset_generation",
        input_type="image",
        request_payload={"story_id": test_story.id, "assets": ["image"]},
        story_id=test_story.id,
    )

    with patch(
        "app.services.story_service.generate_image",
        new_callable=AsyncMock,
    ) as mock_generate_image:
        mock_generate_image.return_value = cover_url
        await run_generation_job_service(queued_job.id, db_session)

    job_query = select(GenerationJob).where(GenerationJob.id == queued_job.id)
    finished_job = (await db_session.execute(job_query)).scalar_one()
    assert finished_job.status == "completed"
    assert finished_job.current_step == "asset_generation_completed"
    assert finished_job.result_snapshot["image_status"] == "ready"

    story_query = select(Story).where(Story.id == test_story.id)
    stored_story = (await db_session.execute(story_query)).scalar_one()
    assert stored_story.image_url == cover_url

    # Events are compared in insertion order (ordered by event id).
    event_query = (
        select(GenerationJobEvent)
        .where(GenerationJobEvent.job_id == queued_job.id)
        .order_by(GenerationJobEvent.id)
    )
    recorded_events = (await db_session.execute(event_query)).scalars().all()
    recorded_types = [event.event_type for event in recorded_events]
    assert recorded_types == [
        "request_accepted",
        "worker_started",
        "cover_image_started",
        "cover_image_succeeded",
        "asset_generation_completed",
    ]
async def test_cancel_queued_asset_generation_job_marks_it_canceled(
    db_session,
    auth_token,
    degraded_story_with_text,
):
    """Canceling an asset generation job through the API reports it as canceled and retryable."""

    async def override_get_db():
        # Route the app's DB dependency to the test session.
        yield db_session

    app.dependency_overrides[get_db] = override_get_db
    transport = ASGITransport(app=app)

    story = degraded_story_with_text
    queued_job = await create_generation_job(
        db_session,
        user_id=story.user_id,
        output_mode="asset_generation",
        input_type="image",
        request_payload={"story_id": story.id, "assets": ["image"]},
        story_id=story.id,
    )

    try:
        async with AsyncClient(transport=transport, base_url="http://test") as client:
            client.cookies.set("access_token", auth_token)
            response = await client.post(
                f"/api/generations/jobs/{queued_job.id}/cancel"
            )

        assert response.status_code == 200
        body = response.json()
        assert body["status"] == "canceled"
        assert body["current_step"] == "generation_canceled"
        assert body["can_cancel"] is False
        assert body["can_retry"] is True
    finally:
        # Always drop the override so other tests see the real dependency.
        app.dependency_overrides.clear()
async def test_retry_failed_asset_generation_job_requeues_new_worker_job(
    db_session,
    auth_token,
    degraded_story_with_text,
):
    """Retrying a failed asset generation job returns a new running job and calls the task's delay."""

    async def override_get_db():
        # Route the app's DB dependency to the test session.
        yield db_session

    app.dependency_overrides[get_db] = override_get_db
    transport = ASGITransport(app=app)

    story = degraded_story_with_text
    original_job = await create_generation_job(
        db_session,
        user_id=story.user_id,
        output_mode="asset_generation",
        input_type="image",
        request_payload={"story_id": story.id, "assets": ["image"]},
        story_id=story.id,
    )
    # Put the job into a failed terminal state before asking for a retry.
    await finish_generation_job(
        db_session,
        job=original_job,
        story=story,
        status="failed",
        current_step="asset_generation_failed",
        error_message="cover timeout",
        message="Cover image generation failed.",
    )

    try:
        with patch(
            "app.tasks.generation_workflow.run_generation_workflow_task.delay"
        ) as mock_delay:
            async with AsyncClient(
                transport=transport, base_url="http://test"
            ) as client:
                client.cookies.set("access_token", auth_token)
                response = await client.post(
                    f"/api/generations/jobs/{original_job.id}/retry"
                )

            assert response.status_code == 200
            body = response.json()
            # The retry is a distinct job, not the failed one revived.
            assert body["id"] != original_job.id
            assert body["output_mode"] == "asset_generation"
            assert body["status"] == "running"
            assert body["current_step"] == "retry_queued"
            assert body["can_cancel"] is True
            assert body["can_retry"] is False
            mock_delay.assert_called_once_with(body["id"])
    finally:
        # Always drop the override so other tests see the real dependency.
        app.dependency_overrides.clear()
async def test_storybook_generation_is_queued_then_worker_records_page_image_events( async def test_storybook_generation_is_queued_then_worker_records_page_image_events(
db_session, db_session,
auth_token, auth_token,

View File

@@ -99,9 +99,9 @@ async def test_voice_session_correct_turn_and_finalize_to_story(
new_callable=AsyncMock, new_callable=AsyncMock,
) as mock_tts, ) as mock_tts,
patch( patch(
"app.services.voice_session_service.generate_story_cover", "app.services.voice_session_service.queue_story_asset_generation",
new_callable=AsyncMock, new_callable=AsyncMock,
) as mock_generate_cover, ) as mock_queue_asset_generation,
): ):
mock_generate.side_effect = [ mock_generate.side_effect = [
StoryOutput( StoryOutput(
@@ -118,7 +118,23 @@ async def test_voice_session_correct_turn_and_finalize_to_story(
), ),
] ]
mock_tts.side_effect = [b"turn-1-audio", b"turn-2-audio"] mock_tts.side_effect = [b"turn-1-audio", b"turn-2-audio"]
mock_generate_cover.return_value = "https://example.com/voice-cover.png" mock_queue_asset_generation.return_value = {
"id": "cover-job-123",
"story_id": 1,
"output_mode": "asset_generation",
"input_type": "image",
"status": "running",
"current_step": "request_accepted",
"progress_percent": 5,
"progress_label": "已接收请求",
"is_terminal": False,
"can_cancel": True,
"can_retry": False,
"result_snapshot": {},
"error_message": None,
"created_at": "2026-04-20T00:00:00Z",
"updated_at": "2026-04-20T00:00:00Z",
}
transport = ASGITransport(app=app) transport = ASGITransport(app=app)
try: try:
@@ -156,6 +172,7 @@ async def test_voice_session_correct_turn_and_finalize_to_story(
finalize_data = response.json() finalize_data = response.json()
story_id = finalize_data["story_id"] story_id = finalize_data["story_id"]
assert finalize_data["status"] == "completed" assert finalize_data["status"] == "completed"
assert finalize_data["generation_job_id"] == "cover-job-123"
response = await client.get(f"/api/stories/{story_id}") response = await client.get(f"/api/stories/{story_id}")
assert response.status_code == 200 assert response.status_code == 200
@@ -172,7 +189,7 @@ async def test_voice_session_correct_turn_and_finalize_to_story(
assert session_data["final_story_id"] == story_id assert session_data["final_story_id"] == story_id
assert session_data["can_continue"] is False assert session_data["can_continue"] is False
assert session_data["story_state"]["final_summary"] assert session_data["story_state"]["final_summary"]
mock_generate_cover.assert_awaited_once() mock_queue_asset_generation.assert_awaited_once()
finally: finally:
app.dependency_overrides.clear() app.dependency_overrides.clear()

View File

@@ -594,6 +594,11 @@ async def resolve_voice_turn_confirmation(...)
async def finalize_voice_session(...) async def finalize_voice_session(...)
``` ```
说明:
- `generate_cover=true` 且已产出 `cover_prompt` 时,finalize 后会额外排队一个 `asset_generation` job
- 前端可通过返回的 `generation_job_id` 继续复用现有 generation trace / job detail 组件
### 获取语音共创 analytics ### 获取语音共创 analytics
```python ```python

View File

@@ -27,8 +27,10 @@
- 前端明确展示“本轮系统理解为”与“建议家长确认后再继续”提示 - 前端明确展示“本轮系统理解为”与“建议家长确认后再继续”提示
- 低置信度确认链路已有后端测试覆盖,可作为下一阶段继续接 ASR 与更细确认交互的基础 - 低置信度确认链路已有后端测试覆盖,可作为下一阶段继续接 ASR 与更细确认交互的基础
- 已新增用户转写安全检查、assistant 输出柔性改写与 `safety_flags` 事件记录 - 已新增用户转写安全检查、assistant 输出柔性改写与 `safety_flags` 事件记录
- finalize 会生成更稳定的标题/摘要,并在条件允许时自动衔接封面补全 - finalize 会生成更稳定的标题/摘要,并在条件允许时自动排队封面补全 job
- 已新增 `voice session analytics` 聚合指标,可跟踪 turn 成功率、ASR/TTS 失败、低置信度触发和 finalize 转化率 - 已新增 `voice session analytics` 聚合指标,可跟踪 turn 成功率、ASR/TTS 失败、低置信度触发和 finalize 转化率
- `voice session finalize` 现在会返回可追踪的 `generation_job_id`,让正式 Story 资产补全重新接回现有 generation trace 主干
- 语音共创触发的 `asset_generation` job 现在也支持沿用统一 generation job 的取消 / 重试控制
Phase A 的核心目标不是做“完全实时的语音陪伴”,而是验证以下最小闭环: Phase A 的核心目标不是做“完全实时的语音陪伴”,而是验证以下最小闭环:

View File

@@ -333,7 +333,13 @@ defineExpose({ refresh })
> >
<div class="flex items-center justify-between gap-2"> <div class="flex items-center justify-between gap-2">
<span class="text-sm font-semibold"> <span class="text-sm font-semibold">
{{ job.output_mode === 'asset_retry' ? '资源重试' : '内容生成' }} {{
job.output_mode === 'asset_retry'
? '资源重试'
: job.output_mode === 'asset_generation'
? '资源生成'
: '内容生成'
}}
</span> </span>
<span class="rounded-full border px-2 py-0.5 text-xs" :class="getJobStatusClass(job.status)"> <span class="rounded-full border px-2 py-0.5 text-xs" :class="getJobStatusClass(job.status)">
{{ getJobStatusLabel(job.status) }} {{ getJobStatusLabel(job.status) }}
@@ -353,7 +359,13 @@ defineExpose({ refresh })
<div class="flex flex-wrap items-center justify-between gap-3"> <div class="flex flex-wrap items-center justify-between gap-3">
<div> <div>
<div class="text-sm font-semibold"> <div class="text-sm font-semibold">
{{ activeJob.output_mode === 'asset_retry' ? '资源重试事件' : '生成事件' }} {{
activeJob.output_mode === 'asset_retry'
? '资源重试事件'
: activeJob.output_mode === 'asset_generation'
? '资源生成事件'
: '生成事件'
}}
</div> </div>
<div class="mt-1 text-xs" :class="mutedTextClass"> <div class="mt-1 text-xs" :class="mutedTextClass">
当前步骤{{ getEventLabel(activeJob.current_step) }} 当前步骤{{ getEventLabel(activeJob.current_step) }}

View File

@@ -16,6 +16,7 @@ import BaseButton from '../components/ui/BaseButton.vue'
import BaseCard from '../components/ui/BaseCard.vue' import BaseCard from '../components/ui/BaseCard.vue'
import BaseSelect from '../components/ui/BaseSelect.vue' import BaseSelect from '../components/ui/BaseSelect.vue'
import BaseTextarea from '../components/ui/BaseTextarea.vue' import BaseTextarea from '../components/ui/BaseTextarea.vue'
import GenerationTrace from '../components/GenerationTrace.vue'
import LoadingSpinner from '../components/ui/LoadingSpinner.vue' import LoadingSpinner from '../components/ui/LoadingSpinner.vue'
import EmptyState from '../components/ui/EmptyState.vue' import EmptyState from '../components/ui/EmptyState.vue'
import { import {
@@ -88,6 +89,8 @@ const finalStorySummary = computed(() => {
const value = activeSession.value?.story_state?.final_summary const value = activeSession.value?.story_state?.final_summary
return typeof value === 'string' ? value : null return typeof value === 'string' ? value : null
}) })
const finalStoryId = computed(() => activeSession.value?.final_story_id ?? null)
const finalStoryHasAssetWork = computed(() => Boolean(finalStoryId.value))
const turnSuccessRateLabel = computed(() => { const turnSuccessRateLabel = computed(() => {
if (!voiceAnalytics.value) return '0%' if (!voiceAnalytics.value) return '0%'
return `${Math.round(voiceAnalytics.value.turn_success_rate * 100)}%` return `${Math.round(voiceAnalytics.value.turn_success_rate * 100)}%`
@@ -859,6 +862,9 @@ onBeforeUnmount(() => {
<p v-if="finalStorySummary" class="mt-2 text-sm text-emerald-700"> <p v-if="finalStorySummary" class="mt-2 text-sm text-emerald-700">
摘要{{ finalStorySummary }} 摘要{{ finalStorySummary }}
</p> </p>
<p class="mt-2 text-sm text-emerald-700">
保存后的封面补全与后续资源任务会继续挂到正式故事的生成轨迹里
</p>
<div class="mt-3"> <div class="mt-3">
<BaseButton size="sm" variant="secondary" @click="viewFinalStory"> <BaseButton size="sm" variant="secondary" @click="viewFinalStory">
<BookOpenIcon class="h-4 w-4" /> <BookOpenIcon class="h-4 w-4" />
@@ -1088,6 +1094,13 @@ onBeforeUnmount(() => {
</div> </div>
<div class="space-y-6"> <div class="space-y-6">
<GenerationTrace
v-if="finalStoryHasAssetWork"
:story-id="finalStoryId"
title="正式故事资产轨迹"
description="语音共创保存后触发的封面补全、资源生成与后续恢复任务,会继续记录在这里。"
/>
<div class="rounded-2xl border border-gray-100 bg-white p-4"> <div class="rounded-2xl border border-gray-100 bg-white p-4">
<h3 class="font-semibold text-gray-900">故事状态快照</h3> <h3 class="font-semibold text-gray-900">故事状态快照</h3>
<pre class="mt-4 overflow-x-auto rounded-xl bg-gray-950 p-4 text-xs leading-6 text-emerald-200">{{ JSON.stringify(activeSession.story_state, null, 2) }}</pre> <pre class="mt-4 overflow-x-auto rounded-xl bg-gray-950 p-4 text-xs leading-6 text-emerald-200">{{ JSON.stringify(activeSession.story_state, null, 2) }}</pre>