feat: queue voice session cover generation jobs

This commit is contained in:
2026-04-20 17:05:01 +08:00
parent 4d7072fb66
commit df4f5f4cdd
6 changed files with 254 additions and 21 deletions

View File

@@ -1241,6 +1241,9 @@ async def execute_generation_job_service(
"""Execute one previously accepted generation job inside the worker.""" """Execute one previously accepted generation job inside the worker."""
try: try:
if job.output_mode == "asset_generation":
response = await _generate_asset_generation_service_with_job(job, db)
else:
request = GenerationRequest.model_validate(job.request_payload or {}) request = GenerationRequest.model_validate(job.request_payload or {})
response = await _generate_generation_service_with_job( response = await _generate_generation_service_with_job(
request, request,
@@ -1294,6 +1297,39 @@ def _build_canceled_generation_response(job: GenerationJob) -> GenerationRespons
) )
def _build_generation_response_from_story(
story: Story,
*,
job_id: str,
) -> GenerationResponse:
"""Build a unified generation response from one persisted story record."""
pages = None
if story.mode == "storybook":
pages = _storybook_pages_to_response(story.pages or [])
return GenerationResponse(
id=story.id,
generation_job_id=job_id,
title=story.title,
mode=story.mode,
story_text=story.story_text,
pages=pages,
cover_prompt=story.cover_prompt,
image_url=story.image_url,
cover_url=story.image_url,
audio_ready=story.audio_status == StoryAssetStatus.READY.value,
generation_status=story.generation_status,
text_status=story.text_status,
image_status=story.image_status,
audio_status=story.audio_status,
last_error=story.last_error,
child_profile_id=story.child_profile_id,
universe_id=story.universe_id,
retryable_assets=story.retryable_assets,
)
async def run_generation_job_service( async def run_generation_job_service(
job_id: str, job_id: str,
db: AsyncSession, db: AsyncSession,
@@ -1309,6 +1345,54 @@ async def run_generation_job_service(
return job return job
async def _generate_asset_generation_service_with_job(
job: GenerationJob,
db: AsyncSession,
) -> GenerationResponse:
"""Run queued asset generation in the background worker."""
payload = job.request_payload or {}
story_id = payload.get("story_id") or job.story_id
requested_assets = list(dict.fromkeys(payload.get("assets") or []))
if story_id is None:
raise HTTPException(status_code=400, detail="资源任务缺少 story_id。")
if not requested_assets:
raise HTTPException(status_code=400, detail="资源任务缺少 assets。")
story = await get_story_detail(int(story_id), job.user_id, db)
if "image" in requested_assets:
if story.mode == "storybook":
await _complete_storybook_image_assets(story, db, job=job)
else:
await _complete_cover_image_asset(
story,
db,
raise_on_failure=True,
log_event="cover_generation_failed",
job=job,
)
if "audio" in requested_assets:
await _complete_audio_asset(
story,
db,
raise_on_failure=True,
job=job,
)
story = await get_story_detail(story.id, job.user_id, db)
await finish_generation_job(
db,
job=job,
story=story,
current_step="asset_generation_completed",
message="Asset generation completed in the background worker.",
metadata={"assets": requested_assets},
)
return _build_generation_response_from_story(story, job_id=job.id)
async def retry_generation_job_service( async def retry_generation_job_service(
job_id: str, job_id: str,
user_id: str, user_id: str,
@@ -1534,6 +1618,46 @@ async def create_story_from_result(
) )
async def queue_story_asset_generation(
story_id: int,
user_id: str,
assets: list[str],
db: AsyncSession,
) -> dict:
"""Queue one asset generation job for an already persisted story."""
await ensure_no_active_story_generation_job(db, story_id=story_id, user_id=user_id)
requested_assets = list(dict.fromkeys(assets))
if not requested_assets:
raise HTTPException(status_code=400, detail="至少需要一个待生成资源")
story = await get_story_detail(story_id, user_id, db)
if "image" in requested_assets:
has_image_prompt = bool(story.cover_prompt)
if story.mode == "storybook":
has_image_prompt = has_image_prompt or any(
isinstance(page, dict) and page.get("image_prompt")
for page in story.pages or []
)
if not has_image_prompt:
raise HTTPException(status_code=400, detail="当前故事没有可生成的图片提示词")
if "audio" in requested_assets and not story.story_text:
raise HTTPException(status_code=400, detail="当前故事没有可生成音频的正文")
job = await create_generation_job(
db,
user_id=user_id,
output_mode="asset_generation",
input_type=",".join(requested_assets),
request_payload={"story_id": story_id, "assets": requested_assets},
story_id=story_id,
)
await _dispatch_generation_job(db, job=job)
await db.refresh(job)
return generation_job_to_summary(job)
async def _retry_cover_image_asset(story: Story, db: AsyncSession, *, job=None) -> None: async def _retry_cover_image_asset(story: Story, db: AsyncSession, *, job=None) -> None:
"""Retry cover generation for a text story.""" """Retry cover generation for a text story."""

View File

@@ -31,7 +31,7 @@ from app.services.memory_service import build_enhanced_memory_context
from app.services.provider_router import generate_story_content, text_to_speech from app.services.provider_router import generate_story_content, text_to_speech
from app.services.story_service import ( from app.services.story_service import (
create_story_from_result, create_story_from_result,
generate_story_cover, queue_story_asset_generation,
validate_profile_and_universe, validate_profile_and_universe,
) )
from app.services.voice_session_safety import ( from app.services.voice_session_safety import (
@@ -1710,15 +1710,21 @@ async def finalize_voice_session_service(
generation_job_id: str | None = None generation_job_id: str | None = None
if request.generate_cover and story.cover_prompt: if request.generate_cover and story.cover_prompt:
try: try:
await generate_story_cover(story.id, user_id, db) cover_job = await queue_story_asset_generation(
story.id,
user_id,
["image"],
db,
)
generation_job_id = str(cover_job["id"])
await _record_session_event( await _record_session_event(
db, db,
session_id=session.id, session_id=session.id,
turn_id=None, turn_id=None,
event_type="session_cover_generation_succeeded", event_type="session_cover_generation_queued",
status="succeeded", status="succeeded",
message="Finalized story cover was generated after session save.", message="Finalized story cover generation was queued after session save.",
metadata={"story_id": story.id}, metadata={"story_id": story.id, "generation_job_id": generation_job_id},
) )
except HTTPException as exc: except HTTPException as exc:
await _record_session_event( await _record_session_event(
@@ -1727,11 +1733,11 @@ async def finalize_voice_session_service(
turn_id=None, turn_id=None,
event_type="session_cover_generation_failed", event_type="session_cover_generation_failed",
status="failed", status="failed",
message="Finalized story cover generation failed after session save.", message="Finalized story cover generation failed before the worker could start.",
metadata={"story_id": story.id, "error": str(exc.detail)}, metadata={"story_id": story.id, "error": str(exc.detail)},
) )
logger.warning( logger.warning(
"voice_session_finalize_cover_failed", "voice_session_finalize_cover_queue_failed",
session_id=session.id, session_id=session.id,
story_id=story.id, story_id=story.id,
error=str(exc.detail), error=str(exc.detail),

View File

@@ -8,7 +8,7 @@ from httpx import ASGITransport, AsyncClient
from sqlalchemy import select from sqlalchemy import select
from app.db.database import get_db from app.db.database import get_db
from app.db.models import GenerationJob, GenerationJobEvent from app.db.models import GenerationJob, GenerationJobEvent, Story
from app.main import app from app.main import app
from app.services.adapters import AdapterConfig from app.services.adapters import AdapterConfig
from app.services.adapters.storybook.primary import Storybook, StorybookPage from app.services.adapters.storybook.primary import Storybook, StorybookPage
@@ -20,7 +20,7 @@ from app.services.generation_jobs import (
mark_stale_generation_jobs, mark_stale_generation_jobs,
record_generation_event, record_generation_event,
) )
from app.services.story_service import run_generation_job_service from app.services.story_service import queue_story_asset_generation, run_generation_job_service
pytestmark = pytest.mark.asyncio pytestmark = pytest.mark.asyncio
@@ -225,6 +225,86 @@ async def test_asset_retry_records_job_events_and_updates_retryable_assets(
app.dependency_overrides.clear() app.dependency_overrides.clear()
async def test_queue_story_asset_generation_dispatches_background_job(
db_session,
test_story,
):
task_delay_path = "app.tasks.generation_workflow.run_generation_workflow_task.delay"
with patch(task_delay_path) as mock_delay:
summary = await queue_story_asset_generation(
test_story.id,
test_story.user_id,
["image"],
db_session,
)
assert summary["output_mode"] == "asset_generation"
assert summary["input_type"] == "image"
assert summary["status"] == "running"
assert summary["current_step"] == "request_accepted"
mock_delay.assert_called_once_with(summary["id"])
job = (
await db_session.execute(
select(GenerationJob).where(GenerationJob.id == summary["id"])
)
).scalar_one()
assert job.story_id == test_story.id
assert job.output_mode == "asset_generation"
async def test_asset_generation_job_worker_completes_cover_image(
db_session,
test_story,
):
job = await create_generation_job(
db_session,
user_id=test_story.user_id,
output_mode="asset_generation",
input_type="image",
request_payload={"story_id": test_story.id, "assets": ["image"]},
story_id=test_story.id,
)
with patch(
"app.services.story_service.generate_image",
new_callable=AsyncMock,
) as mock_generate_image:
mock_generate_image.return_value = "https://example.com/async-cover.png"
await run_generation_job_service(job.id, db_session)
refreshed_job = (
await db_session.execute(select(GenerationJob).where(GenerationJob.id == job.id))
).scalar_one()
assert refreshed_job.status == "completed"
assert refreshed_job.current_step == "asset_generation_completed"
assert refreshed_job.result_snapshot["image_status"] == "ready"
story = (
await db_session.execute(
select(Story).where(Story.id == test_story.id)
)
).scalar_one()
assert story.image_url == "https://example.com/async-cover.png"
events = (
await db_session.execute(
select(GenerationJobEvent)
.where(GenerationJobEvent.job_id == job.id)
.order_by(GenerationJobEvent.id)
)
).scalars().all()
assert [event.event_type for event in events] == [
"request_accepted",
"worker_started",
"cover_image_started",
"cover_image_succeeded",
"asset_generation_completed",
]
async def test_storybook_generation_is_queued_then_worker_records_page_image_events( async def test_storybook_generation_is_queued_then_worker_records_page_image_events(
db_session, db_session,
auth_token, auth_token,

View File

@@ -99,9 +99,9 @@ async def test_voice_session_correct_turn_and_finalize_to_story(
new_callable=AsyncMock, new_callable=AsyncMock,
) as mock_tts, ) as mock_tts,
patch( patch(
"app.services.voice_session_service.generate_story_cover", "app.services.voice_session_service.queue_story_asset_generation",
new_callable=AsyncMock, new_callable=AsyncMock,
) as mock_generate_cover, ) as mock_queue_asset_generation,
): ):
mock_generate.side_effect = [ mock_generate.side_effect = [
StoryOutput( StoryOutput(
@@ -118,7 +118,23 @@ async def test_voice_session_correct_turn_and_finalize_to_story(
), ),
] ]
mock_tts.side_effect = [b"turn-1-audio", b"turn-2-audio"] mock_tts.side_effect = [b"turn-1-audio", b"turn-2-audio"]
mock_generate_cover.return_value = "https://example.com/voice-cover.png" mock_queue_asset_generation.return_value = {
"id": "cover-job-123",
"story_id": 1,
"output_mode": "asset_generation",
"input_type": "image",
"status": "running",
"current_step": "request_accepted",
"progress_percent": 5,
"progress_label": "已接收请求",
"is_terminal": False,
"can_cancel": True,
"can_retry": False,
"result_snapshot": {},
"error_message": None,
"created_at": "2026-04-20T00:00:00Z",
"updated_at": "2026-04-20T00:00:00Z",
}
transport = ASGITransport(app=app) transport = ASGITransport(app=app)
try: try:
@@ -156,6 +172,7 @@ async def test_voice_session_correct_turn_and_finalize_to_story(
finalize_data = response.json() finalize_data = response.json()
story_id = finalize_data["story_id"] story_id = finalize_data["story_id"]
assert finalize_data["status"] == "completed" assert finalize_data["status"] == "completed"
assert finalize_data["generation_job_id"] == "cover-job-123"
response = await client.get(f"/api/stories/{story_id}") response = await client.get(f"/api/stories/{story_id}")
assert response.status_code == 200 assert response.status_code == 200
@@ -172,7 +189,7 @@ async def test_voice_session_correct_turn_and_finalize_to_story(
assert session_data["final_story_id"] == story_id assert session_data["final_story_id"] == story_id
assert session_data["can_continue"] is False assert session_data["can_continue"] is False
assert session_data["story_state"]["final_summary"] assert session_data["story_state"]["final_summary"]
mock_generate_cover.assert_awaited_once() mock_queue_asset_generation.assert_awaited_once()
finally: finally:
app.dependency_overrides.clear() app.dependency_overrides.clear()

View File

@@ -594,6 +594,11 @@ async def resolve_voice_turn_confirmation(...)
async def finalize_voice_session(...) async def finalize_voice_session(...)
``` ```
说明:
-`generate_cover=true` 且已产出 `cover_prompt`finalize 后会额外排队一个 `asset_generation` job
- 前端可通过返回的 `generation_job_id` 继续复用现有 generation trace / job detail 组件
### 获取语音共创 analytics ### 获取语音共创 analytics
```python ```python

View File

@@ -27,8 +27,9 @@
- 前端明确展示“本轮系统理解为”与“建议家长确认后再继续”提示 - 前端明确展示“本轮系统理解为”与“建议家长确认后再继续”提示
- 低置信度确认链路已有后端测试覆盖,可作为下一阶段继续接 ASR 与更细确认交互的基础 - 低置信度确认链路已有后端测试覆盖,可作为下一阶段继续接 ASR 与更细确认交互的基础
- 已新增用户转写安全检查、assistant 输出柔性改写与 `safety_flags` 事件记录 - 已新增用户转写安全检查、assistant 输出柔性改写与 `safety_flags` 事件记录
- finalize 会生成更稳定的标题/摘要,并在条件允许时自动衔接封面补全 - finalize 会生成更稳定的标题/摘要,并在条件允许时自动排队封面补全 job
- 已新增 `voice session analytics` 聚合指标,可跟踪 turn 成功率、ASR/TTS 失败、低置信度触发和 finalize 转化率 - 已新增 `voice session analytics` 聚合指标,可跟踪 turn 成功率、ASR/TTS 失败、低置信度触发和 finalize 转化率
- `voice session finalize` 现在会返回可追踪的 `generation_job_id`,让正式 Story 资产补全重新接回现有 generation trace 主干
Phase A 的核心目标不是做“完全实时的语音陪伴”,而是验证以下最小闭环: Phase A 的核心目标不是做“完全实时的语音陪伴”,而是验证以下最小闭环: