Module scenario.voice.tts
Text-to-speech router and cache.
The TTS side uses litellm-style provider/voice_name routing (e.g.
openai/nova, elevenlabs/rachel). Per the TTS cache key locked decision
the cache key is (text, voice) only; audio effects are applied AFTER a
cache hit and are never baked into the cached audio.
Security note: joblib serialises function arguments to disk as part of its
caching fingerprint. To keep raw user-supplied text out of the cache payload,
synthesize() hashes text to a stable SHA-256 digest before handing it
to the cached helper. The effective cache key is (sha256(text), voice) —
equivalent determinism, no plaintext at rest.
Users can register additional providers via register_tts_provider()(…).
The default set covers OpenAI (hard dep) and lazy-imports ElevenLabs /
Google / Cartesia only when their provider prefix is actually used.
Expand source code
"""
Text-to-speech router and cache.
The TTS side uses litellm-style ``provider/voice_name`` routing (e.g.
``openai/nova``, ``elevenlabs/rachel``). Per the TTS cache key locked decision
the cache key is ``(text, voice)`` only; audio effects are applied AFTER a
cache hit and are never baked into the cached audio.
Security note: joblib serialises function arguments to disk as part of its
caching fingerprint. To keep raw user-supplied text out of the cache payload,
``synthesize()`` hashes ``text`` to a stable SHA-256 digest before handing it
to the cached helper. The effective cache key is ``(sha256(text), voice)`` —
equivalent determinism, no plaintext at rest.
Users can register additional providers via ``register_tts_provider(...)``.
The default set covers OpenAI (hard dep) and lazy-imports ElevenLabs /
Google / Cartesia only when their provider prefix is actually used.
"""
from __future__ import annotations
import hashlib
from collections import OrderedDict
from typing import Awaitable, Callable, Dict, Tuple
from ..config.voice_models import OPENAI_TTS_MODEL
from .audio_chunk import AudioChunk, PCM16_SAMPLE_RATE
TTSCallable = Callable[[str, str], Awaitable[bytes]]
"""(text, voice_name) -> PCM16 @ 24kHz mono bytes"""
_PROVIDERS: Dict[str, TTSCallable] = {}
# In-process LRU cache keyed on (sha256(text), voice) → PCM16 bytes. Keeping
# the raw text out of the key avoids persisting user-supplied strings in any
# future on-disk layer (security review finding). Bounded to prevent
# unbounded memory growth in long-running processes — a 5-minute clip is
# ~14 MB, so 64 entries caps the cache at ~900 MB even for long utterances.
_CACHE_MAX_ENTRIES = 64
_CACHE: "OrderedDict[Tuple[str, str], bytes]" = OrderedDict()
def clear_cache() -> None:
"""Clear the in-process TTS cache. Used by tests and long-lived processes."""
_CACHE.clear()
def register_tts_provider(prefix: str, synth: TTSCallable) -> None:
"""Register a TTS backend under the given provider prefix."""
_PROVIDERS[prefix.lower()] = synth
def _split_voice(voice: str) -> Tuple[str, str]:
if "/" not in voice:
raise ValueError(
f"Voice string {voice!r} must be in 'provider/name' format, e.g. 'openai/nova'"
)
provider, name = voice.split("/", 1)
return provider.lower(), name
# ---------------------------------------------------------------- default TTS
async def _openai_tts(text: str, voice: str) -> bytes:
"""Default OpenAI TTS provider. Uses OPENAI_TTS_MODEL for short clips.
OpenAI's ``response_format="pcm"`` is documented as raw PCM16 @ 24kHz mono
— matching our internal AudioChunk. We validate the byte-length-is-even
invariant via ``AudioChunk.__post_init__`` when the result flows through
an AudioChunk; we also trim a trailing odd byte here so a single framing
glitch from the HTTP stream does not poison the cache.
"""
from openai import AsyncOpenAI
client = AsyncOpenAI()
response = await client.audio.speech.create(
model=OPENAI_TTS_MODEL,
voice=voice,
input=text,
response_format="pcm",
)
data = await response.aread()
if len(data) % 2 == 1:
# PCM16 is 2 bytes/sample; an odd trailing byte cannot be played.
data = data[:-1]
return data
async def _elevenlabs_tts(text: str, voice: str) -> bytes:
from elevenlabs.client import AsyncElevenLabs # type: ignore
client = AsyncElevenLabs()
# The convert() call returns an async iterator of PCM chunks directly —
# do NOT `await` it (that's a TypeError on async_generator).
#
# model_id="eleven_v3": v3 is the only EL model that honors inline
# paralinguistic markers like [shouting], [whispering], [sigh],
# [laughs] — the SDK default eleven_multilingual_v2 reads them as
# text, which surfaced in the angry_customer demo where the user
# simulator said the word "angry" aloud instead of sounding angry.
chunks: list[bytes] = []
async for chunk in client.text_to_speech.convert(
voice_id=voice,
text=text,
model_id="eleven_v3",
output_format="pcm_24000",
):
chunks.append(chunk)
return b"".join(chunks)
async def _google_tts(text: str, voice: str) -> bytes:
try:
from google.cloud import texttospeech # type: ignore
except ImportError as exc: # pragma: no cover — depends on host deps
raise ImportError(
"google provider requires `pip install google-cloud-texttospeech`"
) from exc
client = texttospeech.TextToSpeechAsyncClient()
synth_input = texttospeech.SynthesisInput(text=text)
voice_cfg = texttospeech.VoiceSelectionParams(
language_code="-".join(voice.split("-")[:2]) or "en-US",
name=voice,
)
audio_cfg = texttospeech.AudioConfig(
audio_encoding=texttospeech.AudioEncoding.LINEAR16,
sample_rate_hertz=PCM16_SAMPLE_RATE,
)
resp = await client.synthesize_speech(
input=synth_input, voice=voice_cfg, audio_config=audio_cfg
)
return bytes(resp.audio_content)
async def _cartesia_tts(text: str, voice: str) -> bytes:
try:
from cartesia import AsyncCartesia # type: ignore
except ImportError as exc: # pragma: no cover — depends on host deps
raise ImportError(
"cartesia provider requires `pip install cartesia`"
) from exc
client = AsyncCartesia()
return await client.tts.bytes(
model_id="sonic-english",
transcript=text,
voice_id=voice,
output_format={
"container": "raw",
"encoding": "pcm_s16le",
"sample_rate": PCM16_SAMPLE_RATE,
},
)
register_tts_provider("openai", _openai_tts)
register_tts_provider("elevenlabs", _elevenlabs_tts)
register_tts_provider("google", _google_tts)
register_tts_provider("cartesia", _cartesia_tts)
# ------------------------------------------------------------ cached synthesis
async def _synthesize_raw(text: str, voice: str) -> bytes:
provider, name = _split_voice(voice)
if provider not in _PROVIDERS:
raise ValueError(
f"Unknown TTS provider {provider!r}. Known: {sorted(_PROVIDERS)}"
)
return await _PROVIDERS[provider](text, name)
def _hash_text(text: str) -> str:
return hashlib.sha256(text.encode("utf-8")).hexdigest()
async def synthesize(text: str, voice: str) -> AudioChunk:
"""
Synthesize ``text`` into an AudioChunk using the voice provider.
Cache key is ``(sha256(text), voice)`` — equivalent to keying on
``(text, voice)`` but without pinning the raw text in the cache payload.
Effects must be applied by the caller on the returned chunk; they are
never part of the cache key.
"""
cache_key = (_hash_text(text), voice)
cached = _CACHE.get(cache_key)
if cached is not None:
_CACHE.move_to_end(cache_key) # LRU touch
return AudioChunk(data=cached, transcript=text)
pcm = await _synthesize_raw(text, voice)
_CACHE[cache_key] = pcm
_CACHE.move_to_end(cache_key)
while len(_CACHE) > _CACHE_MAX_ENTRIES:
_CACHE.popitem(last=False)
return AudioChunk(data=pcm, transcript=text)
Global variables
var TTSCallable-
(text, voice_name) -> PCM16 @ 24kHz mono bytes
Functions
def clear_cache() ‑> None-
Clear the in-process TTS cache. Used by tests and long-lived processes.
Expand source code
def clear_cache() -> None: """Clear the in-process TTS cache. Used by tests and long-lived processes.""" _CACHE.clear() def register_tts_provider(prefix: str, synth: TTSCallable) ‑> None-
Register a TTS backend under the given provider prefix.
Expand source code
def register_tts_provider(prefix: str, synth: TTSCallable) -> None: """Register a TTS backend under the given provider prefix.""" _PROVIDERS[prefix.lower()] = synth async def synthesize(text: str, voice: str) ‑> AudioChunk-
Synthesize
textinto an AudioChunk using the voice provider.Cache key is
(sha256(text), voice)— equivalent to keying on(text, voice)but without pinning the raw text in the cache payload. Effects must be applied by the caller on the returned chunk; they are never part of the cache key.Expand source code
async def synthesize(text: str, voice: str) -> AudioChunk: """ Synthesize ``text`` into an AudioChunk using the voice provider. Cache key is ``(sha256(text), voice)`` — equivalent to keying on ``(text, voice)`` but without pinning the raw text in the cache payload. Effects must be applied by the caller on the returned chunk; they are never part of the cache key. """ cache_key = (_hash_text(text), voice) cached = _CACHE.get(cache_key) if cached is not None: _CACHE.move_to_end(cache_key) # LRU touch return AudioChunk(data=cached, transcript=text) pcm = await _synthesize_raw(text, voice) _CACHE[cache_key] = pcm _CACHE.move_to_end(cache_key) while len(_CACHE) > _CACHE_MAX_ENTRIES: _CACHE.popitem(last=False) return AudioChunk(data=pcm, transcript=text)