"""OpenAI ASR adapter.""" from __future__ import annotations from io import BytesIO from fastapi import HTTPException from openai import AsyncOpenAI from app.core.logging import get_logger from app.services.adapters.asr.models import TranscriptionOutput from app.services.adapters.base import BaseAdapter from app.services.adapters.registry import AdapterRegistry logger = get_logger(__name__) @AdapterRegistry.register("asr", "openai_asr") class OpenAIASRAdapter(BaseAdapter[TranscriptionOutput]): """Transcribe uploaded voice turn audio with OpenAI audio transcription.""" adapter_type = "asr" adapter_name = "openai_asr" async def execute( self, audio_bytes: bytes, file_name: str | None = None, mime_type: str | None = None, transcript_hint: str | None = None, language: str | None = None, **kwargs, ) -> TranscriptionOutput: if not self.config.api_key: raise HTTPException( status_code=503, detail="OPENAI_API_KEY 未配置,无法使用 OpenAI 语音转写。", ) client = AsyncOpenAI(api_key=self.config.api_key) audio_file = BytesIO(audio_bytes) audio_file.name = file_name or "voice-turn.webm" prompt = transcript_hint.strip() if transcript_hint else None model = self.config.model or "gpt-4o-mini-transcribe" try: response = await client.audio.transcriptions.create( model=model, file=audio_file, language=language, prompt=prompt, ) except Exception as exc: logger.warning("openai_asr_failed", error=str(exc)) raise HTTPException( status_code=503, detail="语音转写服务暂时不可用,请稍后重试。", ) from exc transcript_text = (getattr(response, "text", "") or "").strip() if not transcript_text: raise HTTPException(status_code=502, detail="语音转写结果为空,请重试。") return TranscriptionOutput( transcript_text=transcript_text, confidence=None, provider=self.adapter_name, ) async def health_check(self) -> bool: return bool(self.config.api_key) @property def estimated_cost(self) -> float: return 0.006