Module scenario.voice

Voice agent support for Scenario.

Per the source proposal (§1): voice testing uses the same run() entrypoint, the same script DSL, and the same judge. What changes is the medium — audio instead of text.

Public surface: - VoiceAgentAdapter — base class for voice-capable agents - AudioChunk — canonical internal audio (PCM16 @ 24kHz mono) - AdapterCapabilities / UnsupportedCapabilityError — capability matrix - FirstChunkTimeoutError — attributable first-chunk recv timeout - VoiceRecording / VoiceEvent / LatencyMetrics — result-side types - AudioSegment — per-speaker slice of the recording - synthesize / STTProvider / set_stt_provider / get_stt_provider — TTS + STT plumbing - transcribe_segments — post-hoc STT over a VoiceRecording (judge fallback path for non-multimodal judges) - WebRTCVadFallback — SDK-side VAD for adapters without native VAD - create_audio_message / extract_audio / message_has_audio — message helpers

Expand source code
"""
Voice agent support for Scenario.

Per the source proposal (§1): voice testing uses the same ``scenario.run()``
entrypoint, the same script DSL, and the same judge. What changes is the
medium — audio instead of text.

Public surface:
    - VoiceAgentAdapter — base class for voice-capable agents
    - AudioChunk — canonical internal audio (PCM16 @ 24kHz mono)
    - AdapterCapabilities / UnsupportedCapabilityError — capability matrix
    - FirstChunkTimeoutError — attributable first-chunk recv timeout
    - VoiceRecording / VoiceEvent / LatencyMetrics — result-side types
    - AudioSegment — per-speaker slice of the recording
    - synthesize / STTProvider / set_stt_provider / get_stt_provider —
      TTS + STT plumbing
    - transcribe_segments — post-hoc STT over a VoiceRecording (judge
      fallback path for non-multimodal judges)
    - WebRTCVadFallback — SDK-side VAD for adapters without native VAD
    - create_audio_message / extract_audio / message_has_audio — message helpers
"""

from __future__ import annotations

from .adapter import FirstChunkTimeoutError, VoiceAgentAdapter
from .adapters import (
    ComposableVoiceAgent,
    ElevenLabsAgentAdapter,
    ElevenLabsVoiceAgent,
    GeminiLiveAgentAdapter,
    LiveKitAgentAdapter,
    OpenAIRealtimeAgentAdapter,
    PipecatAgentAdapter,
    TwilioAgentAdapter,
    VapiAgentAdapter,
    WebRTCAgentAdapter,
    WebSocketAgentAdapter,
    WebSocketProtocol,
)
from .audio_chunk import AudioChunk, silent_chunk
from .capabilities import AdapterCapabilities, UnsupportedCapabilityError
from .interruption import CONTEXTUAL_PROMPT, InterruptionConfig
from .messages import create_audio_message, extract_audio, message_has_audio
from .recording import AudioSegment, LatencyMetrics, VoiceEvent, VoiceRecording
from .stt import (
    ElevenLabsSTTProvider,
    OpenAISTTProvider,
    STTProvider,
    get_stt_provider,
    set_stt_provider,
    transcribe,
)
from ._transcribe import transcribe_segments
from .tts import register_tts_provider, synthesize
from .vad import WebRTCVadFallback

__all__ = [
    "AdapterCapabilities",
    "AudioChunk",
    "AudioSegment",
    "CONTEXTUAL_PROMPT",
    "ComposableVoiceAgent",
    "ElevenLabsAgentAdapter",
    "ElevenLabsSTTProvider",
    "ElevenLabsVoiceAgent",
    "FirstChunkTimeoutError",
    "GeminiLiveAgentAdapter",
    "InterruptionConfig",
    "LatencyMetrics",
    "LiveKitAgentAdapter",
    "OpenAIRealtimeAgentAdapter",
    "OpenAISTTProvider",
    "PipecatAgentAdapter",
    "STTProvider",
    "TwilioAgentAdapter",
    "UnsupportedCapabilityError",
    "VapiAgentAdapter",
    "VoiceAgentAdapter",
    "VoiceEvent",
    "VoiceRecording",
    "WebRTCAgentAdapter",
    "WebRTCVadFallback",
    "WebSocketAgentAdapter",
    "WebSocketProtocol",
    "create_audio_message",
    "extract_audio",
    "get_stt_provider",
    "message_has_audio",
    "register_tts_provider",
    "set_stt_provider",
    "silent_chunk",
    "synthesize",
    "transcribe",
    "transcribe_segments",
]

Sub-modules

scenario.voice.adapter

VoiceAgentAdapter — base class for voice-capable agents …

scenario.voice.adapters

Platform-specific voice adapters (Phase 2) …

scenario.voice.audio_chunk

AudioChunk — the canonical internal audio representation …

scenario.voice.capabilities

Adapter capability matrix …

scenario.voice.effects

Audio effects pipeline for the voice user simulator (§4.5) …

scenario.voice.interruption

Interruption configuration for proceed(interruptions=…) …

scenario.voice.messages

Audio message helpers …

scenario.voice.playback

Local audio playback via the bundled ffmpeg binary …

scenario.voice.recording

Voice recording, timeline events, and latency metrics …

scenario.voice.script_steps

Voice-specific script steps: sleep, silence, audio, dtmf, interrupt …

scenario.voice.stt

Speech-to-text: pluggable STTProvider interface with an OpenAI default …

scenario.voice.testing

Test-harness helpers for voice adapters that need public HTTP/S endpoints (webhooks, WebSockets) — specifically TwilioAgentAdapter

scenario.voice.tts

Text-to-speech router and cache …

scenario.voice.vad

Voice-activity detection fallback for adapters without native VAD …

Functions

def create_audio_message(chunk: AudioChunk, role: str = 'user') ‑> openai.types.chat.chat_completion_developer_message_param.ChatCompletionDeveloperMessageParam | openai.types.chat.chat_completion_system_message_param.ChatCompletionSystemMessageParam | openai.types.chat.chat_completion_user_message_param.ChatCompletionUserMessageParam | openai.types.chat.chat_completion_assistant_message_param.ChatCompletionAssistantMessageParam | openai.types.chat.chat_completion_tool_message_param.ChatCompletionToolMessageParam | openai.types.chat.chat_completion_function_message_param.ChatCompletionFunctionMessageParam

Turn an AudioChunk into an OpenAI-compatible message.

The content is a list with an input_audio part carrying base64-encoded WAV. If the chunk has a transcript, it is added as a text part alongside the audio — this is what lets the judge's text-only path still read the content.

Audio travels cleanly in any role (user or assistant) per the locked design — there is no forceUserRole workaround.

Expand source code
def create_audio_message(
    chunk: AudioChunk,
    role: str = "user",
) -> ChatCompletionMessageParam:
    """
    Turn an AudioChunk into an OpenAI-compatible message.

    The content is a list with an input_audio part carrying base64-encoded WAV.
    If the chunk has a transcript, it is added as a text part alongside the
    audio — this is what lets the judge's text-only path still read the content.

    Audio travels cleanly in any role (user or assistant) per the locked design
    — there is no forceUserRole workaround.
    """
    wav = _pcm16_to_wav_bytes(chunk.data)
    b64 = base64.b64encode(wav).decode()
    parts: list[dict[str, Any]] = [
        {
            "type": "input_audio",
            "input_audio": {"data": b64, "format": "wav"},
        }
    ]
    if chunk.transcript:
        parts.insert(0, {"type": "text", "text": chunk.transcript})
    return cast(ChatCompletionMessageParam, {"role": role, "content": parts})
def extract_audio(message: ChatCompletionMessageParam) ‑> AudioChunk | None

Pull the first audio chunk out of an OpenAI-format message.

Returns None if the message has no audio content part. Accepts both 'input_audio' (OpenAI API convention) and 'audio' (alternate providers).

Expand source code
def extract_audio(message: ChatCompletionMessageParam) -> Optional[AudioChunk]:
    """
    Pull the first audio chunk out of an OpenAI-format message.

    Returns None if the message has no audio content part. Accepts both
    'input_audio' (OpenAI API convention) and 'audio' (alternate providers).
    """
    content = message.get("content") if isinstance(message, dict) else None
    if not isinstance(content, list):
        return None

    transcript: Optional[str] = None
    for part in content:
        if not isinstance(part, dict):
            continue
        if part.get("type") == "text":
            transcript = part.get("text") or transcript
        if part.get("type") in ("input_audio", "audio"):
            data_obj = part.get("input_audio") or part.get("audio") or {}
            b64 = data_obj.get("data") if isinstance(data_obj, dict) else None
            if not b64:
                continue
            raw = base64.b64decode(b64)
            # We expect WAV by convention. If it's raw PCM, bytes pass through.
            if raw[:4] == b"RIFF":
                pcm = _wav_bytes_to_pcm16(raw)
            else:
                pcm = raw
            return AudioChunk(data=pcm, transcript=transcript)
    return None
def get_stt_provider() ‑> STTProvider
Expand source code
def get_stt_provider() -> STTProvider:
    return _provider
def message_has_audio(message: ChatCompletionMessageParam) ‑> bool

True if the message contains any audio content part.

Expand source code
def message_has_audio(message: ChatCompletionMessageParam) -> bool:
    """True if the message contains any audio content part."""
    return extract_audio(message) is not None
def register_tts_provider(prefix: str, synth: TTSCallable) ‑> None

Register a TTS backend under the given provider prefix.

Expand source code
def register_tts_provider(prefix: str, synth: TTSCallable) -> None:
    """Register a TTS backend under the given provider prefix."""
    _PROVIDERS[prefix.lower()] = synth
def set_stt_provider(provider: STTProvider) ‑> None

Install a custom STT provider. Invoked by scenario.configure(stt=…).

Expand source code
def set_stt_provider(provider: STTProvider) -> None:
    """Install a custom STT provider. Invoked by scenario.configure(stt=...)."""
    global _provider
    _provider = provider
def silent_chunk(duration_seconds: float) ‑> AudioChunk

Generate a PCM16 silent AudioChunk of the given duration.

Expand source code
def silent_chunk(duration_seconds: float) -> AudioChunk:
    """Generate a PCM16 silent AudioChunk of the given duration."""
    num_samples = int(duration_seconds * PCM16_SAMPLE_RATE)
    return AudioChunk(data=b"\x00\x00" * num_samples)
async def synthesize(text: str, voice: str) ‑> AudioChunk

Synthesize text into an AudioChunk using the voice provider.

Cache key is (sha256(text), voice) — equivalent to keying on (text, voice) but without pinning the raw text in the cache payload. Effects must be applied by the caller on the returned chunk; they are never part of the cache key.

Expand source code
async def synthesize(text: str, voice: str) -> AudioChunk:
    """
    Synthesize ``text`` into an AudioChunk using the voice provider.

    Cache key is ``(sha256(text), voice)`` — equivalent to keying on
    ``(text, voice)`` but without pinning the raw text in the cache payload.
    Effects must be applied by the caller on the returned chunk; they are
    never part of the cache key.
    """
    cache_key = (_hash_text(text), voice)
    cached = _CACHE.get(cache_key)
    if cached is not None:
        _CACHE.move_to_end(cache_key)  # LRU touch
        return AudioChunk(data=cached, transcript=text)
    pcm = await _synthesize_raw(text, voice)
    _CACHE[cache_key] = pcm
    _CACHE.move_to_end(cache_key)
    while len(_CACHE) > _CACHE_MAX_ENTRIES:
        _CACHE.popitem(last=False)
    return AudioChunk(data=pcm, transcript=text)
async def transcribe(audio: AudioChunk) ‑> str

Convenience wrapper around the globally configured provider.

Expand source code
async def transcribe(audio: AudioChunk) -> str:
    """Convenience wrapper around the globally configured provider."""
    if audio.transcript:
        return audio.transcript
    return await _provider.transcribe(audio)
async def transcribe_segments(recording: VoiceRecording, provider: Optional[STTProvider] = None, only_missing: bool = True) ‑> None

Run STT over recording.segments, mutating .transcript in place.

Args

recording
VoiceRecording to enrich.
provider
STTProvider to use. Defaults to scenario.configure-set provider (OpenAI by default). Pass explicitly to override.
only_missing
If True (default), skip segments whose transcript is already set. If False, re-transcribe everything (e.g. to overwrite adapter-side STT with a different provider).

Concurrency: transcribes segments concurrently with asyncio.gather. Each segment's STT call is independent. Empty-data segments are skipped.

Errors are caught per-segment and logged as warnings; never raised. Result: transcript stays None on failed segments, caller sees partial coverage.

Expand source code
async def transcribe_segments(
    recording: VoiceRecording,
    provider: Optional[STTProvider] = None,
    only_missing: bool = True,
) -> None:
    """
    Run STT over recording.segments, mutating .transcript in place.

    Args:
        recording: VoiceRecording to enrich.
        provider: STTProvider to use. Defaults to scenario.configure-set provider
            (OpenAI by default). Pass explicitly to override.
        only_missing: If True (default), skip segments whose transcript is
            already set. If False, re-transcribe everything (e.g. to overwrite
            adapter-side STT with a different provider).

    Concurrency: transcribes segments concurrently with asyncio.gather. Each
    segment's STT call is independent. Empty-data segments are skipped.

    Errors are caught per-segment and logged as warnings; never raised. Result:
    transcript stays None on failed segments, caller sees partial coverage.
    """
    if not recording.segments:
        return
    p = provider or _try_get_provider()
    if p is None:
        return  # already warned
    targets = [
        s for s in recording.segments
        if s.audio and (not only_missing or s.transcript is None)
    ]
    if not targets:
        return
    await asyncio.gather(*(_transcribe_one(p, s) for s in targets))

Classes

class AdapterCapabilities (streaming_transcripts: bool = False, native_vad: bool = False, dtmf: bool = False, interruption: bool = False, input_formats: List[str] = <factory>, output_formats: List[str] = <factory>)

Declaration of what a voice adapter can and cannot do.

Attributes

streaming_transcripts
True if the adapter emits incremental transcript updates as the agent speaks. Required for interrupt(after_words=N).
native_vad
True if the adapter itself provides voice-activity-detection events (user_start_speaking / user_stop_speaking). When False, the SDK falls back to webrtcvad on the incoming audio stream.
dtmf
True if the adapter can transmit DTMF tones (telephony).
interruption
True if the adapter can send a first-class interrupt signal to the agent under test (e.g., Twilio clear, OpenAI Realtime response.cancel). When True, interrupt() uses the signal path; when False, it falls back to timing-based barge-in (audio sent over the wire while the agent is speaking, which the SUT detects via VAD).
input_formats
Wire formats the adapter can accept from the SDK for outgoing user audio (e.g., ["pcm16/24000", "mulaw/8000"]).
output_formats
Wire formats the adapter emits for incoming agent audio. The SDK converts these to internal PCM16/24000 mono.
Expand source code
@dataclass(frozen=True)
class AdapterCapabilities:
    """
    Declaration of what a voice adapter can and cannot do.

    Attributes:
        streaming_transcripts: True if the adapter emits incremental transcript
            updates as the agent speaks. Required for interrupt(after_words=N).
        native_vad: True if the adapter itself provides voice-activity-detection
            events (user_start_speaking / user_stop_speaking). When False, the
            SDK falls back to webrtcvad on the incoming audio stream.
        dtmf: True if the adapter can transmit DTMF tones (telephony).
        interruption: True if the adapter can send a first-class interrupt
            signal to the agent under test (e.g., Twilio ``clear``, OpenAI
            Realtime ``response.cancel``). When True, ``scenario.interrupt()``
            uses the signal path; when False, it falls back to timing-based
            barge-in (audio sent over the wire while the agent is speaking,
            which the SUT detects via VAD).
        input_formats: Wire formats the adapter can accept from the SDK for
            outgoing user audio (e.g., ["pcm16/24000", "mulaw/8000"]).
        output_formats: Wire formats the adapter emits for incoming agent
            audio. The SDK converts these to internal PCM16/24000 mono.
    """

    streaming_transcripts: bool = False
    native_vad: bool = False
    dtmf: bool = False
    interruption: bool = False
    input_formats: List[str] = field(default_factory=list)
    output_formats: List[str] = field(default_factory=list)

Instance variables

var dtmf : bool
var input_formats : List[str]
var interruption : bool
var native_vad : bool
var output_formats : List[str]
var streaming_transcripts : bool
class AudioChunk (data: bytes, transcript: Optional[str] = None, start_time: Optional[float] = None, end_time: Optional[float] = None)

A chunk of audio in the canonical internal format: PCM16, 24kHz, mono.

Attributes

data
Raw PCM16 little-endian bytes, mono, sample rate = 24000 Hz.
transcript
Optional transcript text (may be populated by streaming STT).
start_time
Optional wall-clock offset from scenario start, in seconds.
end_time
Optional wall-clock offset from scenario start, in seconds.
Expand source code
@dataclass
class AudioChunk:
    """
    A chunk of audio in the canonical internal format: PCM16, 24kHz, mono.

    Attributes:
        data: Raw PCM16 little-endian bytes, mono, sample rate = 24000 Hz.
        transcript: Optional transcript text (may be populated by streaming STT).
        start_time: Optional wall-clock offset from scenario start, in seconds.
        end_time: Optional wall-clock offset from scenario start, in seconds.
    """

    data: bytes
    transcript: Optional[str] = None
    start_time: Optional[float] = None
    end_time: Optional[float] = None

    def __post_init__(self) -> None:
        # PCM16 samples are 2 bytes each. An odd-length buffer means a WebSocket
        # framing boundary split a sample — downstream code (np.frombuffer,
        # duration_seconds) silently truncates and produces off-by-one drift.
        # Catch it at the canonical boundary instead.
        if len(self.data) % PCM16_SAMPLE_WIDTH_BYTES != 0:
            raise ValueError(
                f"AudioChunk.data length ({len(self.data)} bytes) is not a "
                f"multiple of {PCM16_SAMPLE_WIDTH_BYTES} — not valid PCM16. "
                "This usually indicates a partial transport frame; adapters "
                "must buffer until a complete sample is available."
            )

    @property
    def sample_rate(self) -> int:
        return PCM16_SAMPLE_RATE

    @property
    def channels(self) -> int:
        return PCM16_CHANNELS

    @property
    def duration_seconds(self) -> float:
        """Length of the chunk in seconds (from bytes, assuming PCM16 mono)."""
        if not self.data:
            return 0.0
        num_samples = len(self.data) // PCM16_SAMPLE_WIDTH_BYTES
        return num_samples / PCM16_SAMPLE_RATE

Instance variables

var channels : int
Expand source code
@property
def channels(self) -> int:
    return PCM16_CHANNELS
var data : bytes
var duration_seconds : float

Length of the chunk in seconds (from bytes, assuming PCM16 mono).

Expand source code
@property
def duration_seconds(self) -> float:
    """Length of the chunk in seconds (from bytes, assuming PCM16 mono)."""
    if not self.data:
        return 0.0
    num_samples = len(self.data) // PCM16_SAMPLE_WIDTH_BYTES
    return num_samples / PCM16_SAMPLE_RATE
var end_time : float | None
var sample_rate : int
Expand source code
@property
def sample_rate(self) -> int:
    return PCM16_SAMPLE_RATE
var start_time : float | None
var transcript : str | None
class AudioSegment (speaker: SpeakerRole, start_time: float, end_time: float, audio: bytes, transcript: Optional[str] = None, transcript_truncated: bool = False)

A contiguous span of audio attributed to one speaker.

transcript_truncated is True when this agent segment was cut short by a user_interrupt event during the run — the audio bytes are authoritative; the transcript may reflect what the agent INTENDED to say, not what the user actually heard. Tools that care about wire truth should re-transcribe the audio (transcribe_segments with only_missing=False) on truncated segments.

Expand source code
@dataclass
class AudioSegment:
    """A contiguous span of audio attributed to one speaker.

    ``transcript_truncated`` is True when this agent segment was cut short
    by a user_interrupt event during the run — the audio bytes are
    authoritative; the transcript may reflect what the agent INTENDED to
    say, not what the user actually heard. Tools that care about wire
    truth should re-transcribe the audio (transcribe_segments with
    ``only_missing=False``) on truncated segments.
    """

    speaker: SpeakerRole
    start_time: float
    end_time: float
    audio: bytes  # PCM16 bytes
    transcript: Optional[str] = None
    transcript_truncated: bool = False

Instance variables

var audio : bytes
var end_time : float
var speaker : Literal['user', 'agent']
var start_time : float
var transcript : str | None
var transcript_truncated : bool
class ComposableVoiceAgent (stt: STTProvider, llm: str, tts: str, *, system_prompt: Optional[str] = None)

Locally-executed STT → LLM → TTS voice agent.

scenario.voice.stt transcribes incoming user audio, the result is fed to llm (a litellm model string) along with conversation history, and the response is synthesised via the scenario.voice.tts voice string using the existing synthesize() router.

Each seam is independently swappable — change any one without touching the other two. Intermediate results are surfaced on instance attributes so the scenario harness can assert on them.

Attributes

last_user_transcript
Transcript of the most-recent user audio turn.
last_llm_response
Text produced by the LLM for the most-recent turn.

Args

stt
STTProvider implementation for the user's audio.
llm
litellm-style model identifier, e.g. COMPOSABLE_VOICE_LLM_MODEL.
tts
TTS voice string in "provider/voice" format, e.g. "openai/nova" or "elevenlabs/rachel".
system_prompt
Optional system prompt seeded at turn zero so the LLM has guidance before the first user message. Defaults to a generic helpful-assistant prompt.
Expand source code
class ComposableVoiceAgent(VoiceAgentAdapter):
    """
    Locally-executed STT → LLM → TTS voice agent.

    ``stt`` transcribes incoming user audio, the result is fed to ``llm``
    (a litellm model string) along with conversation history, and the response
    is synthesised via the ``tts`` voice string using the existing
    ``scenario.voice.synthesize`` router.

    Each seam is independently swappable — change any one without touching the
    other two. Intermediate results are surfaced on instance attributes so the
    scenario harness can assert on them.

    Attributes:
        last_user_transcript: Transcript of the most-recent user audio turn.
        last_llm_response: Text produced by the LLM for the most-recent turn.
    """

    capabilities: ClassVar[AdapterCapabilities] = AdapterCapabilities(
        streaming_transcripts=True,
        native_vad=False,
        dtmf=False,
        input_formats=["pcm16/24000"],
        output_formats=["pcm16/24000"],
    )

    DEFAULT_SYSTEM_PROMPT = (
        "You are a helpful voice assistant. Respond naturally and conversationally "
        "as this is an audio conversation — be concise, friendly, and clear."
    )

    def __init__(
        self,
        stt: STTProvider,
        llm: str,
        tts: str,
        *,
        system_prompt: Optional[str] = None,
    ) -> None:
        """
        Args:
            stt: STTProvider implementation for the user's audio.
            llm: litellm-style model identifier, e.g. ``COMPOSABLE_VOICE_LLM_MODEL``.
            tts: TTS voice string in ``"provider/voice"`` format,
                 e.g. ``"openai/nova"`` or ``"elevenlabs/rachel"``.
            system_prompt: Optional system prompt seeded at turn zero so the
                LLM has guidance before the first user message. Defaults to a
                generic helpful-assistant prompt.
        """
        super().__init__()
        self.stt = stt
        self.llm = llm
        self.tts = tts

        self.last_user_transcript: Optional[str] = None
        self.last_llm_response: Optional[str] = None

        # Seed history with a system prompt so the first recv_audio call (which
        # can happen before any user audio when the agent speaks first) doesn't
        # send an empty messages array to the LLM.
        self._history: List[dict] = [
            {"role": "system", "content": system_prompt or self.DEFAULT_SYSTEM_PROMPT}
        ]
        # Turn-output guard. ``recv_audio`` synthesises ONE chunk per
        # user turn. The default ``call()`` drains by re-calling
        # ``recv_audio`` until tail-silence — on this adapter that would
        # kick a second LLM call, cancelled later by timeout (wasted
        # credits + latency). The guard makes subsequent ``recv_audio``
        # calls in the same turn return an empty chunk, which the drain
        # loop interprets as end-of-stream.
        #
        # Reset boundary: ``send_audio`` (new user audio → new turn).
        # Set boundary: end of ``recv_audio`` (LLM+TTS completed).
        self._turn_output_emitted: bool = False

    def __repr__(self) -> str:
        return f"ComposableVoiceAgent(llm={self.llm!r}, tts={self.tts!r})"

    # ------------------------------------------------------------------ lifecycle

    async def connect(self) -> None:
        """No-op — no external transport to open."""

    async def disconnect(self) -> None:
        """No-op — nothing to tear down."""

    # ------------------------------------------------------------------ I/O

    async def send_audio(self, chunk: AudioChunk) -> None:
        """Transcribe the chunk via STT and store for the next recv_audio call."""
        transcript = await self.stt.transcribe(chunk)
        self.last_user_transcript = transcript
        self._history.append({"role": "user", "content": transcript})
        # New user turn → next recv_audio is allowed to synthesise.
        self._turn_output_emitted = False

    async def recv_audio(self, timeout: float) -> AudioChunk:
        """
        Run the LLM on the current history, synthesise the response via TTS,
        and return the resulting AudioChunk.

        ``timeout`` is honoured for the combined LLM+TTS call via
        ``asyncio.wait_for``. Subsequent calls in the same turn (the
        default ``call()`` drains until tail-silence) return an empty
        chunk so the drain loop exits without billing a second LLM
        round-trip — see ``_turn_output_emitted`` for the guard contract.
        """
        if self._turn_output_emitted:
            return AudioChunk(data=b"")

        import asyncio

        async def _run() -> AudioChunk:
            import litellm  # type: ignore
            from litellm.types.utils import Choices, ModelResponse
            from typing import cast as _cast

            from ..tts import synthesize

            completion = await litellm.acompletion(
                model=self.llm,
                messages=self._history,
            )
            # Non-streaming acompletion returns ModelResponse with Choices;
            # cast satisfies pyright without runtime isinstance overhead.
            completion = _cast(ModelResponse, completion)
            choice = _cast(Choices, completion.choices[0])
            response_text: str = choice.message.content or ""
            self.last_llm_response = response_text
            self._history.append({"role": "assistant", "content": response_text})

            return await synthesize(response_text, self.tts)

        chunk = await asyncio.wait_for(_run(), timeout=timeout)
        self._turn_output_emitted = True
        return chunk

Ancestors

Subclasses

Class variables

var DEFAULT_SYSTEM_PROMPT
var capabilities : ClassVar[AdapterCapabilities]

Methods

async def connect(self) ‑> None

No-op — no external transport to open.

Expand source code
async def connect(self) -> None:
    """No-op — no external transport to open."""
async def disconnect(self) ‑> None

No-op — nothing to tear down.

Expand source code
async def disconnect(self) -> None:
    """No-op — nothing to tear down."""
async def recv_audio(self, timeout: float) ‑> AudioChunk

Run the LLM on the current history, synthesise the response via TTS, and return the resulting AudioChunk.

timeout is honoured for the combined LLM+TTS call via asyncio.wait_for. Subsequent calls in the same turn (the default call() drains until tail-silence) return an empty chunk so the drain loop exits without billing a second LLM round-trip — see _turn_output_emitted for the guard contract.

Expand source code
async def recv_audio(self, timeout: float) -> AudioChunk:
    """
    Run the LLM on the current history, synthesise the response via TTS,
    and return the resulting AudioChunk.

    ``timeout`` is honoured for the combined LLM+TTS call via
    ``asyncio.wait_for``. Subsequent calls in the same turn (the
    default ``call()`` drains until tail-silence) return an empty
    chunk so the drain loop exits without billing a second LLM
    round-trip — see ``_turn_output_emitted`` for the guard contract.
    """
    if self._turn_output_emitted:
        return AudioChunk(data=b"")

    import asyncio

    async def _run() -> AudioChunk:
        import litellm  # type: ignore
        from litellm.types.utils import Choices, ModelResponse
        from typing import cast as _cast

        from ..tts import synthesize

        completion = await litellm.acompletion(
            model=self.llm,
            messages=self._history,
        )
        # Non-streaming acompletion returns ModelResponse with Choices;
        # cast satisfies pyright without runtime isinstance overhead.
        completion = _cast(ModelResponse, completion)
        choice = _cast(Choices, completion.choices[0])
        response_text: str = choice.message.content or ""
        self.last_llm_response = response_text
        self._history.append({"role": "assistant", "content": response_text})

        return await synthesize(response_text, self.tts)

    chunk = await asyncio.wait_for(_run(), timeout=timeout)
    self._turn_output_emitted = True
    return chunk
async def send_audio(self, chunk: AudioChunk) ‑> None

Transcribe the chunk via STT and store for the next recv_audio call.

Expand source code
async def send_audio(self, chunk: AudioChunk) -> None:
    """Transcribe the chunk via STT and store for the next recv_audio call."""
    transcript = await self.stt.transcribe(chunk)
    self.last_user_transcript = transcript
    self._history.append({"role": "user", "content": transcript})
    # New user turn → next recv_audio is allowed to synthesise.
    self._turn_output_emitted = False

Inherited members

class ElevenLabsAgentAdapter (agent_id: str, api_key: str, *, system_prompt_override: Optional[str] = None, first_message_override: Optional[str] = None)

ElevenLabs hosted Conversational AI adapter.

Connects to ElevenLabs' hosted endpoint where the STT→LLM→TTS loop runs on their infrastructure. All audio is PCM16 @ 24kHz mono — no conversion needed at either edge.

Not to be confused with :class:ElevenLabsVoiceAgent (in scenario.voice.adapters.composable), which is the typed composable preset that runs locally with separate STT, LLM, and TTS providers. The two complement each other:

  • ElevenLabsAgentAdapter (this class): black-box hosted EL ConvAI; you provide an agent_id provisioned in the EL dashboard and EL runs the whole pipeline server-side.
  • :class:ElevenLabsVoiceAgent: composes ElevenLabsSTTProvider + any LLM + ElevenLabs TTS on your side; you control the prompts, model choice, and tool calls.

Intermediate transcripts are tracked on last_user_transcript and last_agent_transcript for scenario observability.

Example::

adapter = ElevenLabsAgentAdapter(agent_id="abc123", api_key="sk-...")
async with adapter:
    # scenario.run() feeds send_audio / recv_audio ...
Expand source code
class ElevenLabsAgentAdapter(VoiceAgentAdapter):
    """
    ElevenLabs **hosted** Conversational AI adapter.

    Connects to ElevenLabs' hosted endpoint where the STT→LLM→TTS loop runs
    on their infrastructure. All audio is PCM16 @ 24kHz mono — no conversion
    needed at either edge.

    Not to be confused with :class:`ElevenLabsVoiceAgent` (in
    ``scenario.voice.adapters.composable``), which is the typed composable
    preset that runs locally with separate STT, LLM, and TTS providers. The
    two complement each other:

    - ``ElevenLabsAgentAdapter`` (this class): black-box hosted EL ConvAI;
      you provide an ``agent_id`` provisioned in the EL dashboard and EL
      runs the whole pipeline server-side.
    - :class:`ElevenLabsVoiceAgent`: composes ``ElevenLabsSTTProvider`` +
      any LLM + ElevenLabs TTS on your side; you control the prompts,
      model choice, and tool calls.

    Intermediate transcripts are tracked on ``last_user_transcript`` and
    ``last_agent_transcript`` for scenario observability.

    Example::

        adapter = ElevenLabsAgentAdapter(agent_id="abc123", api_key="sk-...")
        async with adapter:
            # scenario.run() feeds send_audio / recv_audio ...
    """

    capabilities: ClassVar[AdapterCapabilities] = AdapterCapabilities(
        streaming_transcripts=True,
        native_vad=True,
        dtmf=False,
        input_formats=["pcm16/24000"],
        output_formats=["pcm16/24000"],
    )

    def __init__(
        self,
        agent_id: str,
        api_key: str,
        *,
        system_prompt_override: Optional[str] = None,
        first_message_override: Optional[str] = None,
    ) -> None:
        super().__init__()
        self.agent_id = agent_id
        self.api_key = api_key
        # Per-session overrides applied via conversation_initiation_client_data
        # at the start of every WS connect. Used by demos that need a
        # different prompt shape (e.g. verbose for interrupt demos) without
        # mutating the shared test agent's persistent config.
        self._system_prompt_override = system_prompt_override
        self._first_message_override = first_message_override
        self._ws: Any = None

        # Transcript observability — updated on each transcript event.
        self.last_user_transcript: Optional[str] = None
        self.last_agent_transcript: Optional[str] = None

    @property
    def url(self) -> str:
        return CONVAI_URL_TEMPLATE.format(agent_id=self.agent_id)

    def __repr__(self) -> str:  # redact credentials
        return f"ElevenLabsAgentAdapter(agent_id={self.agent_id!r}, api_key='***')"

    # ------------------------------------------------------------------ lifecycle

    async def connect(self) -> None:
        """Open the WebSocket to ElevenLabs' ConvAI endpoint.

        We send ``conversation_initiation_client_data`` on every connect.
        The EL docs neither require nor forbid this (the reference SDK
        sample sends it unconditionally with an empty body); empirically
        we've seen ``first_message`` skipped on bare connects and reliably
        fire when the init message is sent, even with an empty override
        block. If EL's behavior changes, this is the first thing to
        revisit.
        """
        import websockets

        self._ws = await websockets.connect(
            self.url,
            additional_headers={"xi-api-key": self.api_key},
        )
        logger.debug("ElevenLabsAgentAdapter: connected to %s", self.url)

        agent_override: dict[str, Any] = {}
        if self._system_prompt_override:
            agent_override["prompt"] = {"prompt": self._system_prompt_override}
        if self._first_message_override:
            agent_override["first_message"] = self._first_message_override

        init = {
            "type": "conversation_initiation_client_data",
            "conversation_config_override": {"agent": agent_override},
        }
        await self._ws.send(json.dumps(init))
        logger.debug(
            "ElevenLabsAgentAdapter: sent conversation_initiation_client_data with overrides=%s",
            list(agent_override.keys()) or "none",
        )

    async def disconnect(self) -> None:
        """Close the WebSocket if open."""
        if self._ws is not None:
            try:
                await self._ws.close()
            except Exception:
                # Best-effort: connection may already be half-closed or
                # in an error state when disconnect() is called. We're
                # tearing down regardless — propagating here would just
                # leak the WS reference.
                pass
            finally:
                self._ws = None
            logger.debug("ElevenLabsAgentAdapter: disconnected")

    # ------------------------------------------------------------------ I/O

    async def send_audio(self, chunk: AudioChunk) -> None:
        """Send a PCM16 audio chunk encoded as base64 in a JSON message.

        Empirically, EL ConvAI stops responding to subsequent turns if
        the client sends only a single chunk and never signals end of
        turn. The EL docs document no client-driven end-of-turn signal
        (server-side VAD is supposed to handle it) but in practice the
        VAD only fires after enough silence has been observed. We
        append a fixed-size tail of zero-bytes after every chunk to
        provide that silence signal.

        Tail size: 16000 zero bytes — empirically the sweet spot.
        - Removing the tail entirely: EL stops responding to user
          turns after the greeting.
        - Doubling to 24000 bytes (a "true 500ms" at the provisioned
          pcm_24000 rate): EL stops responding mid-conversation, same
          stall pattern.
        - 16000 bytes at pcm_24000 = ~333ms of silence: reliable.

        If EL ever exposes an explicit end-of-turn message we should
        switch to that instead.
        """
        if self._ws is None:
            raise RuntimeError("ElevenLabsAgentAdapter: not connected")

        # 1. Speech.
        b64 = base64.b64encode(chunk.data).decode()
        await self._ws.send(json.dumps({"user_audio_chunk": b64}))

        # 2. Silence tail. See docstring for size rationale.
        silence = b"\x00" * 16000
        silence_b64 = base64.b64encode(silence).decode()
        await self._ws.send(json.dumps({"user_audio_chunk": silence_b64}))

    async def recv_audio(self, timeout: float) -> AudioChunk:
        """
        Receive the next audio chunk from ElevenLabs.

        ``timeout`` bounds **inter-message silence** — the maximum gap between
        any two received frames — NOT the total duration of the call. Every
        received frame (**keep-alive pings included**) resets the idle
        deadline, so this returns when an ``audio`` event arrives and raises
        :class:`asyncio.TimeoutError` only after ``timeout`` seconds elapse
        with **no message of any kind**. Pings are replied to inline;
        transcript events update instance attributes for observability; all
        other event types are swallowed without error.

        Design decision (issue #493 — intentional, not an oversight): because
        a received ping is treated as proof of liveness, a hosted agent that
        keeps pinging but never sends audio (e.g. a wedged tool/RAG call) will
        make this method wait **indefinitely**. There is deliberately **no
        total-duration ceiling** here — a legitimate 30s+ silent-but-pinging
        stretch must not abort the turn, which a cumulative budget would do.
        The caller's ``response_max_duration`` is checked *between*
        ``recv_audio`` calls and does **not** bound a single in-progress recv.
        (An absolute caller-side backstop for the wedged-agent case is tracked
        as a separate follow-up; it is intentionally not implemented here.)
        """
        if self._ws is None:
            raise RuntimeError("ElevenLabsAgentAdapter: not connected")

        deadline = asyncio.get_running_loop().time() + timeout
        while True:
            remaining = deadline - asyncio.get_running_loop().time()
            if remaining <= 0:
                raise asyncio.TimeoutError("ElevenLabsAgentAdapter: recv_audio timed out")

            raw = await asyncio.wait_for(self._ws.recv(), timeout=remaining)
            # A received message (ping included) proves the socket is alive, so
            # re-arm the idle deadline. Placed BEFORE json.loads so ANY frame —
            # even a non-JSON/malformed one — counts as a liveness signal.
            deadline = asyncio.get_running_loop().time() + timeout
            try:
                event = json.loads(raw) if isinstance(raw, str) else json.loads(raw.decode())
            except Exception:
                logger.debug("ElevenLabsAgentAdapter: non-JSON message, skipping")
                continue

            etype = event.get("type", "")
            logger.debug("ElevenLabsAgentAdapter: recv event %s", etype)

            if etype == "audio":
                audio_event = event.get("audio_event", {})
                b64 = audio_event.get("audio_base_64", "")
                pcm = base64.b64decode(b64)
                # Ensure even byte count (PCM16 invariant).
                if len(pcm) % 2 == 1:
                    pcm = pcm[:-1]
                return AudioChunk(data=pcm)

            elif etype == "ping":
                # Per EL docs, ping wire shape is
                #   {"type": "ping", "ping_event": {"event_id": <int>, "ping_ms": <int>}}
                # Pong must echo the event_id at the top level. The
                # fallback to top-level event_id covers any older shape.
                ping_event = event.get("ping_event") or {}
                event_id = ping_event.get("event_id")
                if event_id is None:
                    event_id = event.get("event_id")
                if event_id is None:
                    logger.debug("ElevenLabsAgentAdapter: ping with no event_id, skipping pong: %r", event)
                    continue
                pong = json.dumps({"type": "pong", "event_id": event_id})
                await self._ws.send(pong)

            elif etype == "user_transcript":
                self.last_user_transcript = event.get("user_transcription_event", {}).get("user_transcript")

            elif etype == "agent_response":
                self.last_agent_transcript = event.get("agent_response_event", {}).get("agent_response")

            elif etype == "agent_response_correction":
                # EL signals a corrected agent reply (post server-side
                # barge-in detection). The corrected text replaces the
                # last_agent_transcript so consumers see what the agent
                # ACTUALLY said after our interrupt landed, not the
                # pre-correction draft.
                #
                # Wire shape:
                #   {"type": "agent_response_correction",
                #    "agent_response_correction_event": {
                #      "original_agent_response": "...",
                #      "corrected_agent_response": "..."}}
                correction = event.get("agent_response_correction_event", {}) or {}
                corrected = correction.get("corrected_agent_response")
                if corrected:
                    self.last_agent_transcript = corrected

            elif etype == "conversation_initiation_metadata":
                # EL reports the agent's actual configured audio formats
                # here. Our adapter capabilities advertise pcm16/24000,
                # matching the test agent we provision. If a caller
                # points the adapter at an agent configured differently,
                # this is where the mismatch becomes visible — warn so
                # the codec mismatch is logged rather than silently
                # garbling audio.
                #
                # Wire shape (per docs):
                #   {"type": "conversation_initiation_metadata",
                #    "conversation_initiation_metadata_event": {
                #      "conversation_id": "...",
                #      "agent_output_audio_format": "pcm_24000",
                #      "user_input_audio_format": "pcm_24000"}}
                meta = event.get("conversation_initiation_metadata_event", {}) or {}
                out_fmt = meta.get("agent_output_audio_format")
                in_fmt = meta.get("user_input_audio_format")
                if out_fmt and out_fmt != "pcm_24000":
                    logger.warning(
                        "ElevenLabsAgentAdapter: agent_output_audio_format=%r "
                        "differs from advertised pcm16/24000 capability; "
                        "audio may pitch-shift or fail to decode.",
                        out_fmt,
                    )
                if in_fmt and in_fmt != "pcm_24000":
                    logger.warning(
                        "ElevenLabsAgentAdapter: user_input_audio_format=%r "
                        "differs from advertised pcm16/24000 capability; "
                        "the agent may not understand audio we send.",
                        in_fmt,
                    )

            elif etype == "interruption":
                pass  # documented non-audio event, no action needed

            else:
                logger.debug("ElevenLabsAgentAdapter: unknown event type %r, skipping", etype)

Ancestors

Class variables

var capabilities : ClassVar[AdapterCapabilities]

Instance variables

var url : str
Expand source code
@property
def url(self) -> str:
    return CONVAI_URL_TEMPLATE.format(agent_id=self.agent_id)

Methods

async def connect(self) ‑> None

Open the WebSocket to ElevenLabs' ConvAI endpoint.

We send conversation_initiation_client_data on every connect. The EL docs neither require nor forbid this (the reference SDK sample sends it unconditionally with an empty body); empirically we've seen first_message skipped on bare connects and reliably fire when the init message is sent, even with an empty override block. If EL's behavior changes, this is the first thing to revisit.

Expand source code
async def connect(self) -> None:
    """Open the WebSocket to ElevenLabs' ConvAI endpoint.

    We send ``conversation_initiation_client_data`` on every connect.
    The EL docs neither require nor forbid this (the reference SDK
    sample sends it unconditionally with an empty body); empirically
    we've seen ``first_message`` skipped on bare connects and reliably
    fire when the init message is sent, even with an empty override
    block. If EL's behavior changes, this is the first thing to
    revisit.
    """
    import websockets

    self._ws = await websockets.connect(
        self.url,
        additional_headers={"xi-api-key": self.api_key},
    )
    logger.debug("ElevenLabsAgentAdapter: connected to %s", self.url)

    agent_override: dict[str, Any] = {}
    if self._system_prompt_override:
        agent_override["prompt"] = {"prompt": self._system_prompt_override}
    if self._first_message_override:
        agent_override["first_message"] = self._first_message_override

    init = {
        "type": "conversation_initiation_client_data",
        "conversation_config_override": {"agent": agent_override},
    }
    await self._ws.send(json.dumps(init))
    logger.debug(
        "ElevenLabsAgentAdapter: sent conversation_initiation_client_data with overrides=%s",
        list(agent_override.keys()) or "none",
    )
async def disconnect(self) ‑> None

Close the WebSocket if open.

Expand source code
async def disconnect(self) -> None:
    """Close the WebSocket if open."""
    if self._ws is not None:
        try:
            await self._ws.close()
        except Exception:
            # Best-effort: connection may already be half-closed or
            # in an error state when disconnect() is called. We're
            # tearing down regardless — propagating here would just
            # leak the WS reference.
            pass
        finally:
            self._ws = None
        logger.debug("ElevenLabsAgentAdapter: disconnected")
async def recv_audio(self, timeout: float) ‑> AudioChunk

Receive the next audio chunk from ElevenLabs.

timeout bounds inter-message silence — the maximum gap between any two received frames — NOT the total duration of the call. Every received frame (keep-alive pings included) resets the idle deadline, so this returns when an audio event arrives and raises :class:asyncio.TimeoutError only after timeout seconds elapse with no message of any kind. Pings are replied to inline; transcript events update instance attributes for observability; all other event types are swallowed without error.

Design decision (issue #493 — intentional, not an oversight): because a received ping is treated as proof of liveness, a hosted agent that keeps pinging but never sends audio (e.g. a wedged tool/RAG call) will make this method wait indefinitely. There is deliberately no total-duration ceiling here — a legitimate 30s+ silent-but-pinging stretch must not abort the turn, which a cumulative budget would do. The caller's response_max_duration is checked between recv_audio calls and does not bound a single in-progress recv. (An absolute caller-side backstop for the wedged-agent case is tracked as a separate follow-up; it is intentionally not implemented here.)

Expand source code
async def recv_audio(self, timeout: float) -> AudioChunk:
    """
    Receive the next audio chunk from ElevenLabs.

    ``timeout`` bounds **inter-message silence** — the maximum gap between
    any two received frames — NOT the total duration of the call. Every
    received frame (**keep-alive pings included**) resets the idle
    deadline, so this returns when an ``audio`` event arrives and raises
    :class:`asyncio.TimeoutError` only after ``timeout`` seconds elapse
    with **no message of any kind**. Pings are replied to inline;
    transcript events update instance attributes for observability; all
    other event types are swallowed without error.

    Design decision (issue #493 — intentional, not an oversight): because
    a received ping is treated as proof of liveness, a hosted agent that
    keeps pinging but never sends audio (e.g. a wedged tool/RAG call) will
    make this method wait **indefinitely**. There is deliberately **no
    total-duration ceiling** here — a legitimate 30s+ silent-but-pinging
    stretch must not abort the turn, which a cumulative budget would do.
    The caller's ``response_max_duration`` is checked *between*
    ``recv_audio`` calls and does **not** bound a single in-progress recv.
    (An absolute caller-side backstop for the wedged-agent case is tracked
    as a separate follow-up; it is intentionally not implemented here.)
    """
    if self._ws is None:
        raise RuntimeError("ElevenLabsAgentAdapter: not connected")

    deadline = asyncio.get_running_loop().time() + timeout
    while True:
        remaining = deadline - asyncio.get_running_loop().time()
        if remaining <= 0:
            raise asyncio.TimeoutError("ElevenLabsAgentAdapter: recv_audio timed out")

        raw = await asyncio.wait_for(self._ws.recv(), timeout=remaining)
        # A received message (ping included) proves the socket is alive, so
        # re-arm the idle deadline. Placed BEFORE json.loads so ANY frame —
        # even a non-JSON/malformed one — counts as a liveness signal.
        deadline = asyncio.get_running_loop().time() + timeout
        try:
            event = json.loads(raw) if isinstance(raw, str) else json.loads(raw.decode())
        except Exception:
            logger.debug("ElevenLabsAgentAdapter: non-JSON message, skipping")
            continue

        etype = event.get("type", "")
        logger.debug("ElevenLabsAgentAdapter: recv event %s", etype)

        if etype == "audio":
            audio_event = event.get("audio_event", {})
            b64 = audio_event.get("audio_base_64", "")
            pcm = base64.b64decode(b64)
            # Ensure even byte count (PCM16 invariant).
            if len(pcm) % 2 == 1:
                pcm = pcm[:-1]
            return AudioChunk(data=pcm)

        elif etype == "ping":
            # Per EL docs, ping wire shape is
            #   {"type": "ping", "ping_event": {"event_id": <int>, "ping_ms": <int>}}
            # Pong must echo the event_id at the top level. The
            # fallback to top-level event_id covers any older shape.
            ping_event = event.get("ping_event") or {}
            event_id = ping_event.get("event_id")
            if event_id is None:
                event_id = event.get("event_id")
            if event_id is None:
                logger.debug("ElevenLabsAgentAdapter: ping with no event_id, skipping pong: %r", event)
                continue
            pong = json.dumps({"type": "pong", "event_id": event_id})
            await self._ws.send(pong)

        elif etype == "user_transcript":
            self.last_user_transcript = event.get("user_transcription_event", {}).get("user_transcript")

        elif etype == "agent_response":
            self.last_agent_transcript = event.get("agent_response_event", {}).get("agent_response")

        elif etype == "agent_response_correction":
            # EL signals a corrected agent reply (post server-side
            # barge-in detection). The corrected text replaces the
            # last_agent_transcript so consumers see what the agent
            # ACTUALLY said after our interrupt landed, not the
            # pre-correction draft.
            #
            # Wire shape:
            #   {"type": "agent_response_correction",
            #    "agent_response_correction_event": {
            #      "original_agent_response": "...",
            #      "corrected_agent_response": "..."}}
            correction = event.get("agent_response_correction_event", {}) or {}
            corrected = correction.get("corrected_agent_response")
            if corrected:
                self.last_agent_transcript = corrected

        elif etype == "conversation_initiation_metadata":
            # EL reports the agent's actual configured audio formats
            # here. Our adapter capabilities advertise pcm16/24000,
            # matching the test agent we provision. If a caller
            # points the adapter at an agent configured differently,
            # this is where the mismatch becomes visible — warn so
            # the codec mismatch is logged rather than silently
            # garbling audio.
            #
            # Wire shape (per docs):
            #   {"type": "conversation_initiation_metadata",
            #    "conversation_initiation_metadata_event": {
            #      "conversation_id": "...",
            #      "agent_output_audio_format": "pcm_24000",
            #      "user_input_audio_format": "pcm_24000"}}
            meta = event.get("conversation_initiation_metadata_event", {}) or {}
            out_fmt = meta.get("agent_output_audio_format")
            in_fmt = meta.get("user_input_audio_format")
            if out_fmt and out_fmt != "pcm_24000":
                logger.warning(
                    "ElevenLabsAgentAdapter: agent_output_audio_format=%r "
                    "differs from advertised pcm16/24000 capability; "
                    "audio may pitch-shift or fail to decode.",
                    out_fmt,
                )
            if in_fmt and in_fmt != "pcm_24000":
                logger.warning(
                    "ElevenLabsAgentAdapter: user_input_audio_format=%r "
                    "differs from advertised pcm16/24000 capability; "
                    "the agent may not understand audio we send.",
                    in_fmt,
                )

        elif etype == "interruption":
            pass  # documented non-audio event, no action needed

        else:
            logger.debug("ElevenLabsAgentAdapter: unknown event type %r, skipping", etype)
async def send_audio(self, chunk: AudioChunk) ‑> None

Send a PCM16 audio chunk encoded as base64 in a JSON message.

Empirically, EL ConvAI stops responding to subsequent turns if the client sends only a single chunk and never signals end of turn. The EL docs document no client-driven end-of-turn signal (server-side VAD is supposed to handle it) but in practice the VAD only fires after enough silence has been observed. We append a fixed-size tail of zero-bytes after every chunk to provide that silence signal.

Tail size: 16000 zero bytes — empirically the sweet spot. - Removing the tail entirely: EL stops responding to user turns after the greeting. - Doubling to 24000 bytes (a "true 500ms" at the provisioned pcm_24000 rate): EL stops responding mid-conversation, same stall pattern. - 16000 bytes at pcm_24000 = ~333ms of silence: reliable.

If EL ever exposes an explicit end-of-turn message we should switch to that instead.

Expand source code
async def send_audio(self, chunk: AudioChunk) -> None:
    """Send a PCM16 audio chunk encoded as base64 in a JSON message.

    Empirically, EL ConvAI stops responding to subsequent turns if
    the client sends only a single chunk and never signals end of
    turn. The EL docs document no client-driven end-of-turn signal
    (server-side VAD is supposed to handle it) but in practice the
    VAD only fires after enough silence has been observed. We
    append a fixed-size tail of zero-bytes after every chunk to
    provide that silence signal.

    Tail size: 16000 zero bytes — empirically the sweet spot.
    - Removing the tail entirely: EL stops responding to user
      turns after the greeting.
    - Doubling to 24000 bytes (a "true 500ms" at the provisioned
      pcm_24000 rate): EL stops responding mid-conversation, same
      stall pattern.
    - 16000 bytes at pcm_24000 = ~333ms of silence: reliable.

    If EL ever exposes an explicit end-of-turn message we should
    switch to that instead.
    """
    if self._ws is None:
        raise RuntimeError("ElevenLabsAgentAdapter: not connected")

    # 1. Speech.
    b64 = base64.b64encode(chunk.data).decode()
    await self._ws.send(json.dumps({"user_audio_chunk": b64}))

    # 2. Silence tail. See docstring for size rationale.
    silence = b"\x00" * 16000
    silence_b64 = base64.b64encode(silence).decode()
    await self._ws.send(json.dumps({"user_audio_chunk": silence_b64}))

Inherited members

class ElevenLabsSTTProvider (api_key: Optional[str] = None)

STT implementation backed by the ElevenLabs REST speech-to-text API.

Uses the scribe_v1 model. Audio is converted from the canonical PCM16/24kHz AudioChunk to a WAV byte payload before posting.

Reads ELEVENLABS_API_KEY from the environment when api_key is not supplied explicitly.

Only text is returned — no ElevenLabs-specific types cross the STTProvider interface boundary.

Expand source code
class ElevenLabsSTTProvider(STTProvider):
    """
    STT implementation backed by the ElevenLabs REST speech-to-text API.

    Uses the ``scribe_v1`` model. Audio is converted from the canonical
    PCM16/24kHz AudioChunk to a WAV byte payload before posting.

    Reads ``ELEVENLABS_API_KEY`` from the environment when ``api_key`` is not
    supplied explicitly.

    Only ``text`` is returned — no ElevenLabs-specific types cross the
    ``STTProvider`` interface boundary.
    """

    def __init__(self, api_key: Optional[str] = None) -> None:
        self.api_key = api_key or os.environ.get("ELEVENLABS_API_KEY", "")

    def __repr__(self) -> str:  # redact credentials
        return "ElevenLabsSTTProvider(api_key='***')"

    async def transcribe(self, audio: AudioChunk) -> str:
        import logging

        import httpx

        from .messages import _pcm16_to_wav_bytes

        wav_bytes = _pcm16_to_wav_bytes(audio.data)
        async with httpx.AsyncClient() as client:
            response = await client.post(
                ELEVENLABS_STT_ENDPOINT,
                headers={"xi-api-key": self.api_key},
                files={"file": ("audio.wav", wav_bytes, "audio/wav")},
                data={"model_id": ELEVENLABS_STT_MODEL},
            )
            if response.status_code >= 400:
                # Log detail at DEBUG; keep exception message minimal so response
                # body doesn't end up embedded in trace tooling output.
                logging.getLogger("scenario.voice.stt").debug(
                    "ElevenLabs STT %d: %s",
                    response.status_code,
                    response.text[:300],
                )
                raise RuntimeError(
                    f"ElevenLabs STT HTTP {response.status_code} "
                    "(see DEBUG log for response body)"
                )
            return response.json().get("text", "")

Ancestors

Inherited members

class ElevenLabsVoiceAgent (api_key: str, *, llm: str = 'openai/gpt-5.4-mini', voice: Optional[str] = None, stt: Optional[STTProvider] = None, system_prompt: Optional[str] = None)

Composable voice agent with ElevenLabs-opinionated defaults.

Not to be confused with :class:ElevenLabsAgentAdapter (in scenario.voice.adapters.elevenlabs) — that one talks to ElevenLabs' hosted Conversational AI endpoint where EL runs the full STT→LLM→TTS loop. This class is local: you compose ElevenLabsSTTProvider + any LLM + ElevenLabs TTS yourself, keeping full control over prompts, model choice, and tool calls.

Instantiate with just an api_key to get an ElevenLabs STT + LLM (default COMPOSABLE_VOICE_LLM_MODEL) + elevenlabs/rachel TTS stack. Each piece can be overridden independently without changing the others.

Example::

# Defaults — all ElevenLabs STT, GPT-4o-mini, ElevenLabs TTS
agent = ElevenLabsVoiceAgent(api_key="sk-...")

# Override just the LLM
agent = ElevenLabsVoiceAgent(api_key="sk-...", llm="openai/gpt-4o")

# Bring your own STT
agent = ElevenLabsVoiceAgent(api_key="sk-...", stt=MyCustomSTT())

Args

api_key
ElevenLabs API key. Redacted in __repr__.
llm
litellm-style model identifier. Defaults to COMPOSABLE_VOICE_LLM_MODEL.
voice
TTS voice string in "elevenlabs/<voice_id>" format. Defaults to the ELEVENLABS_VOICE_ID environment variable when set, otherwise falls back to "Sarah" ("elevenlabs/EXAVITQu4vr4xnSDxMaL") — premade and accessible on the ElevenLabs free tier as of 2026-05. Other premade voices (e.g. "Rachel" 21m00Tcm4TlvDq8ikWAM) returned 402 paid_plan_required from the EL TTS API; gating differs per voice. Set ELEVENLABS_VOICE_ID to override.
stt
STTProvider override. Defaults to ElevenLabsSTTProvider(api_key=api_key).
system_prompt
Optional system prompt. Defaults to ComposableVoiceAgent.DEFAULT_SYSTEM_PROMPT.
Expand source code
class ElevenLabsVoiceAgent(ComposableVoiceAgent):
    """
    Composable voice agent with ElevenLabs-opinionated defaults.

    Not to be confused with :class:`ElevenLabsAgentAdapter` (in
    ``scenario.voice.adapters.elevenlabs``) — that one talks to ElevenLabs'
    **hosted** Conversational AI endpoint where EL runs the full
    STT→LLM→TTS loop. This class is local: you compose ``ElevenLabsSTTProvider``
    + any LLM + ElevenLabs TTS yourself, keeping full control over prompts,
    model choice, and tool calls.

    Instantiate with just an ``api_key`` to get an ElevenLabs STT +
    LLM (default ``COMPOSABLE_VOICE_LLM_MODEL``) + ``elevenlabs/rachel`` TTS stack. Each piece
    can be overridden independently without changing the others.

    Example::

        # Defaults — all ElevenLabs STT, GPT-4o-mini, ElevenLabs TTS
        agent = ElevenLabsVoiceAgent(api_key="sk-...")

        # Override just the LLM
        agent = ElevenLabsVoiceAgent(api_key="sk-...", llm="openai/gpt-4o")

        # Bring your own STT
        agent = ElevenLabsVoiceAgent(api_key="sk-...", stt=MyCustomSTT())
    """

    def __init__(
        self,
        api_key: str,
        *,
        llm: str = COMPOSABLE_VOICE_LLM_MODEL,
        voice: Optional[str] = None,
        stt: Optional[STTProvider] = None,
        system_prompt: Optional[str] = None,
    ) -> None:
        """
        Args:
            api_key: ElevenLabs API key. Redacted in ``__repr__``.
            llm: litellm-style model identifier. Defaults to
                ``COMPOSABLE_VOICE_LLM_MODEL``.
            voice: TTS voice string in ``"elevenlabs/<voice_id>"`` format.
                Defaults to the ``ELEVENLABS_VOICE_ID`` environment variable
                when set, otherwise falls back to "Sarah"
                (``"elevenlabs/EXAVITQu4vr4xnSDxMaL"``) — premade and
                accessible on the ElevenLabs free tier as of 2026-05.
                Other premade voices (e.g. "Rachel"
                ``21m00Tcm4TlvDq8ikWAM``) returned 402 paid_plan_required
                from the EL TTS API; gating differs per voice.  Set
                ``ELEVENLABS_VOICE_ID`` to override.
            stt: STTProvider override. Defaults to
                ``ElevenLabsSTTProvider(api_key=api_key)``.
            system_prompt: Optional system prompt. Defaults to
                ``ComposableVoiceAgent.DEFAULT_SYSTEM_PROMPT``.
        """
        import os

        if voice is None:
            env_voice_id = os.environ.get("ELEVENLABS_VOICE_ID")
            voice = (
                f"elevenlabs/{env_voice_id}"
                if env_voice_id
                else "elevenlabs/EXAVITQu4vr4xnSDxMaL"  # "Sarah" — free-tier premade
            )
        resolved_stt = stt if stt is not None else ElevenLabsSTTProvider(api_key=api_key)
        super().__init__(stt=resolved_stt, llm=llm, tts=voice, system_prompt=system_prompt)
        self._api_key = api_key
        self.voice = voice

    def __repr__(self) -> str:  # redact credentials
        return (
            f"ElevenLabsVoiceAgent("
            f"api_key='***', llm={self.llm!r}, voice={self.voice!r})"
        )

Ancestors

Inherited members

class FirstChunkTimeoutError (*, timeout: float)

Raised when the agent fails to send its first audio chunk within response_timeout.

WHY this subclass exists: operators could not distinguish a first-chunk hang (agent never spoke — wrong endpoint, VAD never fired, response_timeout too short) from a tail-silence cutoff (agent finished speaking normally). The bare asyncio.TimeoutError that escaped previously had an empty str() and no structured attributes, so log aggregators and re-raise chains had no signal. This class embeds the phase marker (_FIRST_CHUNK_PHASE) in its message, a machine-readable .timeout attribute, and chains the original transport error via __cause__.

Expand source code
class FirstChunkTimeoutError(asyncio.TimeoutError):
    """Raised when the agent fails to send its first audio chunk within ``response_timeout``.

    WHY this subclass exists: operators could not distinguish a first-chunk hang
    (agent never spoke — wrong endpoint, VAD never fired, response_timeout too
    short) from a tail-silence cutoff (agent finished speaking normally).  The
    bare ``asyncio.TimeoutError`` that escaped previously had an empty ``str()``
    and no structured attributes, so log aggregators and re-raise chains had no
    signal.  This class embeds the phase marker (``_FIRST_CHUNK_PHASE``) in its
    message, a machine-readable ``.timeout`` attribute, and chains the original
    transport error via ``__cause__``.
    """

    def __init__(self, *, timeout: float) -> None:
        self.timeout = timeout
        self.phase = _FIRST_CHUNK_PHASE
        super().__init__(
            f"agent did not send its first audio chunk within {timeout}s "
            f"(phase={_FIRST_CHUNK_PHASE})"
        )

Ancestors

  • builtins.TimeoutError
  • builtins.OSError
  • builtins.Exception
  • builtins.BaseException
class GeminiLiveAgentAdapter (model: str = 'gemini-2.5-flash-native-audio-latest', voice: str = 'Algieba', system_instruction: str = '', api_key: Optional[str] = None)

Gemini Live native-audio adapter.

Connects directly to the Gemini Live API via the official google-genai SDK. STT, LLM, and TTS all run on Google's infrastructure; audio flows bidirectionally as raw PCM16.

Example::

adapter = GeminiLiveAgentAdapter(
    model=GEMINI_LIVE_MODEL,
    system_instruction="You are a helpful assistant.",
)
async with adapter:
    # scenario.run() feeds send_audio / recv_audio ...

Attributes

last_agent_transcript
Most-recent output transcript received from the server (if transcription is available), for observability.
Expand source code
class GeminiLiveAgentAdapter(VoiceAgentAdapter):
    """
    Gemini Live native-audio adapter.

    Connects directly to the Gemini Live API via the official ``google-genai``
    SDK.  STT, LLM, and TTS all run on Google's infrastructure; audio flows
    bidirectionally as raw PCM16.

    Example::

        adapter = GeminiLiveAgentAdapter(
            model=GEMINI_LIVE_MODEL,
            system_instruction="You are a helpful assistant.",
        )
        async with adapter:
            # scenario.run() feeds send_audio / recv_audio ...

    Attributes:
        last_agent_transcript: Most-recent output transcript received from
            the server (if transcription is available), for observability.
    """

    capabilities: ClassVar[AdapterCapabilities] = AdapterCapabilities(
        streaming_transcripts=True,
        native_vad=True,
        dtmf=False,
        # ``interruption=True``: with explicit Activity markers and
        # ``activity_handling=START_OF_ACTIVITY_INTERRUPTS`` (see
        # ``connect``), the next ``activity_start`` we send while the
        # model is replying causes Gemini to cut its in-flight audio.
        # ``interrupt()`` itself just drains stale chunks out of the
        # local queue so the recovery agent turn doesn't replay them.
        interruption=True,
        input_formats=["pcm16/16000"],
        output_formats=["pcm16/24000"],
    )

    def __init__(
        self,
        model: str = GEMINI_LIVE_MODEL,
        voice: str = "Algieba",
        system_instruction: str = "",
        api_key: Optional[str] = None,
    ) -> None:
        super().__init__()
        self.model = model
        self.voice = voice
        self.system_instruction = system_instruction
        # Resolve key: explicit arg > env var.
        self._api_key: str = api_key or os.environ.get("GEMINI_API_KEY", "")

        # Populated when the background session task is live.
        self._session: Optional[Any] = None
        self._session_task: Optional[asyncio.Task[None]] = None
        self._session_ready: Optional[asyncio.Event] = None
        self._shutdown: Optional[asyncio.Event] = None
        self._session_error: Optional[BaseException] = None

        # Cached async iterator on ``session.receive()``. Acquired lazily
        # on the first ``recv_audio`` call so we can iterate the same
        # stream across consecutive agent turns. Without caching, each
        # ``recv_audio`` would call ``session.receive()`` afresh — which
        # the SDK does not support cleanly across turns.
        self._recv_iter: Optional[Any] = None
        # Tracks whether any audio was received on the CURRENT iterator
        # (reset whenever ``_recv_iter`` is recreated). Used by
        # ``recv_audio`` to distinguish a spurious empty-interrupt turn
        # (no audio at all) from a real mid-reply interrupt (audio
        # arrived before the interrupt landed).
        self._iter_had_audio: bool = False

        # Observability.
        self.last_agent_transcript: Optional[str] = None

    def __repr__(self) -> str:
        # Never leak the API key.
        masked = "***" if self._api_key else ""
        return (
            f"GeminiLiveAgentAdapter("
            f"model={self.model!r}, "
            f"voice={self.voice!r}, "
            f"api_key={masked!r})"
        )

    # ------------------------------------------------------------------ lifecycle

    async def connect(self) -> None:
        """Open a Gemini Live session.

        Spawns a background task that holds the ``async with`` SDK context open
        for the adapter's lifetime.  Returns once the session handshake is
        complete and audio can flow.
        """
        from google import genai  # type: ignore[attr-defined]  # noqa: PLC0415 — lazy import
        from google.genai import types  # noqa: PLC0415

        self._session_ready = asyncio.Event()
        self._shutdown = asyncio.Event()
        loop = asyncio.get_running_loop()
        session_future: asyncio.Future[Any] = loop.create_future()

        config = types.LiveConnectConfig(
            response_modalities=[types.Modality.AUDIO],
            system_instruction=self.system_instruction or None,
            speech_config=types.SpeechConfig(
                voice_config=types.VoiceConfig(
                    prebuilt_voice_config=types.PrebuiltVoiceConfig(
                        voice_name=self.voice,
                    )
                )
            ),
            # Disable Automatic Activity Detection. AAD requires a clean
            # trailing silence to fire its end-of-speech detector, which is
            # unreliable across the audio shapes scenario produces (TTS'd
            # user-sim audio, scripted clips, layered interruptions). With
            # AAD off we drive turn boundaries explicitly via
            # ``activity_start`` / ``activity_end`` in ``send_audio``;
            # Gemini replies the moment we close the turn instead of
            # waiting on its own VAD heuristic. Activity handling is left
            # at its default (START_OF_ACTIVITY_INTERRUPTS): when we send
            # a new ``activity_start`` while Gemini is mid-reply, the
            # model treats it as a barge-in and cuts its in-flight audio.
            #
            # Subtlety: even after ``generation_complete`` on turn N, the
            # next ``activity_start`` opening turn N+1 is still treated as
            # a barge-in on the just-completed turn. The server emits a
            # spurious ``interrupted → turn_complete`` pair (with no model
            # output) BEFORE actually producing turn N+1's reply. The
            # ``recv_audio`` loop transparently skips that empty pair and
            # re-enters ``session.receive()`` to read the real reply.
            realtime_input_config=types.RealtimeInputConfig(
                automatic_activity_detection=types.AutomaticActivityDetection(
                    disabled=True,
                ),
            ),
            # Enable transcripts so the recv loop can populate
            # last_agent_transcript / chunk.transcript. Without these,
            # audio still flows but consumers (judge, manifest) get
            # no readable text.
            input_audio_transcription=types.AudioTranscriptionConfig(),
            output_audio_transcription=types.AudioTranscriptionConfig(),
        )

        client = genai.Client(api_key=self._api_key)

        async def _session_lifetime() -> None:
            """Hold the SDK context manager open; expose session via future."""
            try:
                async with client.aio.live.connect(
                    model=self.model, config=config
                ) as session:
                    if not session_future.done():
                        session_future.set_result(session)
                    assert self._session_ready is not None
                    self._session_ready.set()
                    # Stay alive until disconnect() fires the shutdown event.
                    assert self._shutdown is not None
                    await self._shutdown.wait()
            except Exception as exc:
                self._session_error = exc
                if not session_future.done():
                    session_future.set_exception(exc)
                assert self._session_ready is not None
                self._session_ready.set()  # unblock connect() even on error

        self._session_task = asyncio.create_task(_session_lifetime())

        # Wait until the session is ready (or errored).
        assert self._session_ready is not None
        await self._session_ready.wait()

        if self._session_error is not None:
            raise self._session_error

        self._session = await session_future
        self._recv_iter = None
        logger.debug("GeminiLiveAgentAdapter: connected model=%s", self.model)

    async def disconnect(self) -> None:
        """Close the Gemini Live session."""
        if self._recv_iter is not None:
            try:
                await self._recv_iter.aclose()  # type: ignore[attr-defined]
            except Exception:
                # Best-effort teardown: the iterator may already be
                # closed or in an invalid state during shutdown. Any
                # exception here is non-actionable since we're tearing
                # down anyway.
                pass
            self._recv_iter = None
        if self._shutdown is not None:
            self._shutdown.set()
        if self._session_task is not None:
            try:
                await asyncio.wait_for(self._session_task, timeout=5.0)
            except (asyncio.TimeoutError, Exception):
                # Timeout: task didn't finish in 5s — proceed with
                # teardown anyway, can't block disconnect indefinitely.
                # Other Exception: task error during shutdown is
                # non-actionable; we're discarding the session.
                pass
        self._session = None
        self._session_task = None
        self._session_ready = None
        self._shutdown = None
        self._session_error = None
        logger.debug("GeminiLiveAgentAdapter: disconnected")

    # ------------------------------------------------------------------ I/O

    async def send_audio(self, chunk: AudioChunk) -> None:
        """Send a canonical 24kHz AudioChunk to Gemini Live as a complete turn.

        Resamples from 24kHz → 16kHz at the wire boundary so the adapter
        speaks Gemini's expected ``audio/pcm;rate=16000`` format while the rest
        of the framework stays at the canonical 24kHz.

        Wraps the audio in explicit ``activity_start`` / ``activity_end``
        markers because we connect with Automatic Activity Detection
        disabled (see ``connect``). Each ``send_audio`` call is therefore a
        complete user turn from Gemini's perspective: it triggers the
        model to reply immediately on ``activity_end`` instead of waiting
        on its own VAD heuristic to detect end-of-speech. This is critical
        for the interrupt path — when the user barges in, we send a fresh
        turn boundary on top of the agent's in-flight reply, which Gemini
        treats as a deterministic interruption signal.
        """
        if self._session is None:
            raise RuntimeError("GeminiLiveAgentAdapter: not connected")
        from google.genai import types  # noqa: PLC0415

        pcm_16k = _resample_pcm16(chunk.data, CANONICAL_RATE, GEMINI_INPUT_RATE)
        if not pcm_16k:
            return
        # New user turn → reset transcript and the per-turn receive
        # iterator so the next ``recv_audio`` enters
        # ``session.receive()`` fresh for this turn.
        self._reset_turn_transcript()
        if self._recv_iter is not None:
            try:
                await self._recv_iter.aclose()  # type: ignore[attr-defined]
            except Exception:
                # Best-effort: prior turn's receive iterator may already be
                # closed or in an error state. We're resetting to start a new
                # turn — propagating here would block legitimate new turns.
                pass
            self._recv_iter = None
        await self._session.send_realtime_input(activity_start=types.ActivityStart())
        blob = types.Blob(
            data=pcm_16k,
            mime_type="audio/pcm;rate=16000",
        )
        await self._session.send_realtime_input(audio=blob)
        await self._session.send_realtime_input(activity_end=types.ActivityEnd())

    async def recv_audio(self, timeout: float) -> AudioChunk:
        """Receive the next audio fragment from Gemini Live for the current turn.

        The SDK's ``session.receive()`` async generator yields messages
        for ONE model turn then stops at ``turn_complete``. We cache the
        per-turn iterator on ``self._recv_iter`` and reset it when the
        previous turn ended (StopAsyncIteration), so each user turn
        sent via ``send_audio`` can read its full reply across multiple
        ``recv_audio`` calls without us re-entering ``session.receive()``
        mid-turn (which would skip messages already buffered server-side).

        Returns the next non-empty audio chunk as soon as it arrives
        so the executor's ``_drain_agent_response`` can set
        ``_agent_speaking_event`` early — the interruption path depends
        on this.

        On ``turn_complete`` returns an empty AudioChunk so the drain
        loop's tail-silence path exits.

        Raises ``asyncio.TimeoutError`` if no chunk arrives within
        ``timeout`` seconds.
        """
        if self._session is None:
            raise RuntimeError("GeminiLiveAgentAdapter: not connected")
        if self._recv_iter is None:
            self._recv_iter = self._session.receive().__aiter__()  # type: ignore[union-attr]
            self._iter_had_audio = False

        async def _next_chunk() -> AudioChunk:
            pending_delta = ""
            # Local-to-call: detects the spurious empty-interrupt turn
            # pattern (server emits ``interrupted=True`` then
            # ``turn_complete=True`` with no audio at all when a fresh
            # ``activity_start`` arrives during turn N's post-
            # ``generation_complete`` playback delay). Combined with
            # ``self._iter_had_audio`` (iterator-scope) we can tell the
            # difference between a spurious turn (no audio ever, on this
            # iterator) and a real mid-reply interrupt (audio arrived
            # earlier on this iterator).
            saw_interrupted = False
            while True:
                try:
                    assert self._recv_iter is not None
                    message = await self._recv_iter.__anext__()  # type: ignore[union-attr]
                except StopAsyncIteration:
                    # The previous turn ended (turn_complete already
                    # consumed). Surface end-of-turn to the drain loop
                    # and reset the iterator so the next user turn
                    # can re-enter session.receive() afresh.
                    self._recv_iter = None
                    return AudioChunk(
                        data=b"",
                        transcript=pending_delta or None,
                    )

                if message.go_away is not None:
                    raise RuntimeError(
                        f"GeminiLiveAgentAdapter: server sent go_away: {message.go_away}"
                    )

                sc = message.server_content
                if sc is None:
                    continue

                if getattr(sc, "interrupted", None):
                    saw_interrupted = True

                if sc.output_transcription is not None:
                    transcript_text = getattr(sc.output_transcription, "text", None)
                    if transcript_text:
                        pending_delta += transcript_text
                        existing = self.last_agent_transcript or ""
                        self.last_agent_transcript = existing + transcript_text

                if sc.model_turn is not None and sc.model_turn.parts:
                    audio_bytes = b""
                    for part in sc.model_turn.parts:
                        if part.inline_data is not None and part.inline_data.data:
                            audio_bytes += part.inline_data.data
                    if audio_bytes:
                        if len(audio_bytes) % 2 == 1:
                            audio_bytes = audio_bytes[:-1]
                        if audio_bytes:
                            self._iter_had_audio = True
                            return AudioChunk(
                                data=audio_bytes,
                                transcript=pending_delta or None,
                            )

                if sc.turn_complete:
                    # Spurious empty-interrupt turn? When activity_start
                    # opens turn N+1 after turn N's generation_complete,
                    # the server emits ``interrupted → turn_complete`` with
                    # no audio FIRST, then the real reply in a separate
                    # turn. Detect that pattern (saw interrupted=True, no
                    # audio on THIS iterator, no transcript) and re-enter
                    # ``session.receive()`` to read the actual reply.
                    #
                    # We gate on ``self._iter_had_audio`` (iterator-scope)
                    # rather than this call's audio: a real mid-reply
                    # interrupt earlier in the same turn would have yielded
                    # audio chunks before this point, even if THIS call sees
                    # only the trailing ``interrupted → turn_complete`` pair.
                    if (
                        saw_interrupted
                        and not self._iter_had_audio
                        and not pending_delta
                    ):
                        self._recv_iter = self._session.receive().__aiter__()  # type: ignore[union-attr]
                        self._iter_had_audio = False
                        saw_interrupted = False
                        continue
                    # Real end-of-turn — yield empty AudioChunk and reset
                    # the iterator. The next ``recv_audio`` call (for the
                    # next user turn) will re-enter ``session.receive()``.
                    self._recv_iter = None
                    return AudioChunk(
                        data=b"",
                        transcript=pending_delta or None,
                    )

        return await asyncio.wait_for(_next_chunk(), timeout=timeout)

    async def interrupt(self) -> None:
        """Drain leftover chunks from the in-flight agent turn so the
        recovery agent's ``recv_audio`` doesn't pick them up as a fake
        first reply.

        On Gemini Live, when we send a fresh ``activity_start`` (the
        next ``send_audio``) while the model is mid-reply, the server
        cuts its in-flight audio AND emits ``turn_complete`` for that
        cancelled turn. ``session.receive()`` is a one-turn generator,
        so the cancelled turn's tail messages still need to be consumed
        before the next ``session.receive()`` invocation can read the
        recovery turn cleanly. ``interrupt()`` consumes them up to that
        ``turn_complete`` and resets the cached iterator.

        Best-effort: bounded by 2 seconds so a stuck stream doesn't
        block the executor's interrupt sequence.
        """
        if self._session is None or self._recv_iter is None:
            return
        try:
            async with asyncio.timeout(2.0):
                while True:
                    try:
                        message = await self._recv_iter.__anext__()  # type: ignore[union-attr]
                    except StopAsyncIteration:
                        break
                    sc = getattr(message, "server_content", None)
                    if sc is not None and sc.turn_complete:
                        break
        except asyncio.TimeoutError:
            # Bounded drain: if the server doesn't close out the turn within
            # 2s after we sent the activity_end, give up and proceed. The
            # finally block will still close the iterator.
            pass
        finally:
            try:
                await self._recv_iter.aclose()  # type: ignore[attr-defined]
            except Exception:
                # Best-effort close; the iterator may already be exhausted
                # or in an error state. Don't mask the original outcome.
                pass
            self._recv_iter = None

    def _reset_turn_transcript(self) -> None:
        """Clear the running transcript before each new agent turn.

        Called from ``send_audio`` so each turn starts fresh — otherwise
        the agent's first reply text would be permanently prefixed onto
        all subsequent turns' transcripts.
        """
        self.last_agent_transcript = None

Ancestors

Class variables

var capabilities : ClassVar[AdapterCapabilities]

Methods

async def connect(self) ‑> None

Open a Gemini Live session.

Spawns a background task that holds the async with SDK context open for the adapter's lifetime. Returns once the session handshake is complete and audio can flow.

Expand source code
async def connect(self) -> None:
    """Open a Gemini Live session.

    Spawns a background task that holds the ``async with`` SDK context open
    for the adapter's lifetime.  Returns once the session handshake is
    complete and audio can flow.
    """
    from google import genai  # type: ignore[attr-defined]  # noqa: PLC0415 — lazy import
    from google.genai import types  # noqa: PLC0415

    self._session_ready = asyncio.Event()
    self._shutdown = asyncio.Event()
    loop = asyncio.get_running_loop()
    session_future: asyncio.Future[Any] = loop.create_future()

    config = types.LiveConnectConfig(
        response_modalities=[types.Modality.AUDIO],
        system_instruction=self.system_instruction or None,
        speech_config=types.SpeechConfig(
            voice_config=types.VoiceConfig(
                prebuilt_voice_config=types.PrebuiltVoiceConfig(
                    voice_name=self.voice,
                )
            )
        ),
        # Disable Automatic Activity Detection. AAD requires a clean
        # trailing silence to fire its end-of-speech detector, which is
        # unreliable across the audio shapes scenario produces (TTS'd
        # user-sim audio, scripted clips, layered interruptions). With
        # AAD off we drive turn boundaries explicitly via
        # ``activity_start`` / ``activity_end`` in ``send_audio``;
        # Gemini replies the moment we close the turn instead of
        # waiting on its own VAD heuristic. Activity handling is left
        # at its default (START_OF_ACTIVITY_INTERRUPTS): when we send
        # a new ``activity_start`` while Gemini is mid-reply, the
        # model treats it as a barge-in and cuts its in-flight audio.
        #
        # Subtlety: even after ``generation_complete`` on turn N, the
        # next ``activity_start`` opening turn N+1 is still treated as
        # a barge-in on the just-completed turn. The server emits a
        # spurious ``interrupted → turn_complete`` pair (with no model
        # output) BEFORE actually producing turn N+1's reply. The
        # ``recv_audio`` loop transparently skips that empty pair and
        # re-enters ``session.receive()`` to read the real reply.
        realtime_input_config=types.RealtimeInputConfig(
            automatic_activity_detection=types.AutomaticActivityDetection(
                disabled=True,
            ),
        ),
        # Enable transcripts so the recv loop can populate
        # last_agent_transcript / chunk.transcript. Without these,
        # audio still flows but consumers (judge, manifest) get
        # no readable text.
        input_audio_transcription=types.AudioTranscriptionConfig(),
        output_audio_transcription=types.AudioTranscriptionConfig(),
    )

    client = genai.Client(api_key=self._api_key)

    async def _session_lifetime() -> None:
        """Hold the SDK context manager open; expose session via future."""
        try:
            async with client.aio.live.connect(
                model=self.model, config=config
            ) as session:
                if not session_future.done():
                    session_future.set_result(session)
                assert self._session_ready is not None
                self._session_ready.set()
                # Stay alive until disconnect() fires the shutdown event.
                assert self._shutdown is not None
                await self._shutdown.wait()
        except Exception as exc:
            self._session_error = exc
            if not session_future.done():
                session_future.set_exception(exc)
            assert self._session_ready is not None
            self._session_ready.set()  # unblock connect() even on error

    self._session_task = asyncio.create_task(_session_lifetime())

    # Wait until the session is ready (or errored).
    assert self._session_ready is not None
    await self._session_ready.wait()

    if self._session_error is not None:
        raise self._session_error

    self._session = await session_future
    self._recv_iter = None
    logger.debug("GeminiLiveAgentAdapter: connected model=%s", self.model)
async def disconnect(self) ‑> None

Close the Gemini Live session.

Expand source code
async def disconnect(self) -> None:
    """Close the Gemini Live session."""
    if self._recv_iter is not None:
        try:
            await self._recv_iter.aclose()  # type: ignore[attr-defined]
        except Exception:
            # Best-effort teardown: the iterator may already be
            # closed or in an invalid state during shutdown. Any
            # exception here is non-actionable since we're tearing
            # down anyway.
            pass
        self._recv_iter = None
    if self._shutdown is not None:
        self._shutdown.set()
    if self._session_task is not None:
        try:
            await asyncio.wait_for(self._session_task, timeout=5.0)
        except (asyncio.TimeoutError, Exception):
            # Timeout: task didn't finish in 5s — proceed with
            # teardown anyway, can't block disconnect indefinitely.
            # Other Exception: task error during shutdown is
            # non-actionable; we're discarding the session.
            pass
    self._session = None
    self._session_task = None
    self._session_ready = None
    self._shutdown = None
    self._session_error = None
    logger.debug("GeminiLiveAgentAdapter: disconnected")
async def interrupt(self) ‑> None

Drain leftover chunks from the in-flight agent turn so the recovery agent's recv_audio doesn't pick them up as a fake first reply.

On Gemini Live, when we send a fresh activity_start (the next send_audio) while the model is mid-reply, the server cuts its in-flight audio AND emits turn_complete for that cancelled turn. session.receive() is a one-turn generator, so the cancelled turn's tail messages still need to be consumed before the next session.receive() invocation can read the recovery turn cleanly. interrupt() consumes them up to that turn_complete and resets the cached iterator.

Best-effort: bounded by 2 seconds so a stuck stream doesn't block the executor's interrupt sequence.

Expand source code
async def interrupt(self) -> None:
    """Drain leftover chunks from the in-flight agent turn so the
    recovery agent's ``recv_audio`` doesn't pick them up as a fake
    first reply.

    On Gemini Live, when we send a fresh ``activity_start`` (the
    next ``send_audio``) while the model is mid-reply, the server
    cuts its in-flight audio AND emits ``turn_complete`` for that
    cancelled turn. ``session.receive()`` is a one-turn generator,
    so the cancelled turn's tail messages still need to be consumed
    before the next ``session.receive()`` invocation can read the
    recovery turn cleanly. ``interrupt()`` consumes them up to that
    ``turn_complete`` and resets the cached iterator.

    Best-effort: bounded by 2 seconds so a stuck stream doesn't
    block the executor's interrupt sequence.
    """
    if self._session is None or self._recv_iter is None:
        return
    try:
        async with asyncio.timeout(2.0):
            while True:
                try:
                    message = await self._recv_iter.__anext__()  # type: ignore[union-attr]
                except StopAsyncIteration:
                    break
                sc = getattr(message, "server_content", None)
                if sc is not None and sc.turn_complete:
                    break
    except asyncio.TimeoutError:
        # Bounded drain: if the server doesn't close out the turn within
        # 2s after we sent the activity_end, give up and proceed. The
        # finally block will still close the iterator.
        pass
    finally:
        try:
            await self._recv_iter.aclose()  # type: ignore[attr-defined]
        except Exception:
            # Best-effort close; the iterator may already be exhausted
            # or in an error state. Don't mask the original outcome.
            pass
        self._recv_iter = None
async def recv_audio(self, timeout: float) ‑> AudioChunk

Receive the next audio fragment from Gemini Live for the current turn.

The SDK's session.receive() async generator yields messages for ONE model turn then stops at turn_complete. We cache the per-turn iterator on self._recv_iter and reset it when the previous turn ended (StopAsyncIteration), so each user turn sent via send_audio can read its full reply across multiple recv_audio calls without us re-entering session.receive() mid-turn (which would skip messages already buffered server-side).

Returns the next non-empty audio chunk as soon as it arrives so the executor's _drain_agent_response can set _agent_speaking_event early — the interruption path depends on this.

On turn_complete returns an empty AudioChunk so the drain loop's tail-silence path exits.

Raises asyncio.TimeoutError if no chunk arrives within timeout seconds.

Expand source code
async def recv_audio(self, timeout: float) -> AudioChunk:
    """Receive the next audio fragment from Gemini Live for the current turn.

    The SDK's ``session.receive()`` async generator yields messages
    for ONE model turn then stops at ``turn_complete``. We cache the
    per-turn iterator on ``self._recv_iter`` and reset it when the
    previous turn ended (StopAsyncIteration), so each user turn
    sent via ``send_audio`` can read its full reply across multiple
    ``recv_audio`` calls without us re-entering ``session.receive()``
    mid-turn (which would skip messages already buffered server-side).

    Returns the next non-empty audio chunk as soon as it arrives
    so the executor's ``_drain_agent_response`` can set
    ``_agent_speaking_event`` early — the interruption path depends
    on this.

    On ``turn_complete`` returns an empty AudioChunk so the drain
    loop's tail-silence path exits.

    Raises ``asyncio.TimeoutError`` if no chunk arrives within
    ``timeout`` seconds.
    """
    if self._session is None:
        raise RuntimeError("GeminiLiveAgentAdapter: not connected")
    if self._recv_iter is None:
        self._recv_iter = self._session.receive().__aiter__()  # type: ignore[union-attr]
        self._iter_had_audio = False

    async def _next_chunk() -> AudioChunk:
        pending_delta = ""
        # Local-to-call: detects the spurious empty-interrupt turn
        # pattern (server emits ``interrupted=True`` then
        # ``turn_complete=True`` with no audio at all when a fresh
        # ``activity_start`` arrives during turn N's post-
        # ``generation_complete`` playback delay). Combined with
        # ``self._iter_had_audio`` (iterator-scope) we can tell the
        # difference between a spurious turn (no audio ever, on this
        # iterator) and a real mid-reply interrupt (audio arrived
        # earlier on this iterator).
        saw_interrupted = False
        while True:
            try:
                assert self._recv_iter is not None
                message = await self._recv_iter.__anext__()  # type: ignore[union-attr]
            except StopAsyncIteration:
                # The previous turn ended (turn_complete already
                # consumed). Surface end-of-turn to the drain loop
                # and reset the iterator so the next user turn
                # can re-enter session.receive() afresh.
                self._recv_iter = None
                return AudioChunk(
                    data=b"",
                    transcript=pending_delta or None,
                )

            if message.go_away is not None:
                raise RuntimeError(
                    f"GeminiLiveAgentAdapter: server sent go_away: {message.go_away}"
                )

            sc = message.server_content
            if sc is None:
                continue

            if getattr(sc, "interrupted", None):
                saw_interrupted = True

            if sc.output_transcription is not None:
                transcript_text = getattr(sc.output_transcription, "text", None)
                if transcript_text:
                    pending_delta += transcript_text
                    existing = self.last_agent_transcript or ""
                    self.last_agent_transcript = existing + transcript_text

            if sc.model_turn is not None and sc.model_turn.parts:
                audio_bytes = b""
                for part in sc.model_turn.parts:
                    if part.inline_data is not None and part.inline_data.data:
                        audio_bytes += part.inline_data.data
                if audio_bytes:
                    if len(audio_bytes) % 2 == 1:
                        audio_bytes = audio_bytes[:-1]
                    if audio_bytes:
                        self._iter_had_audio = True
                        return AudioChunk(
                            data=audio_bytes,
                            transcript=pending_delta or None,
                        )

            if sc.turn_complete:
                # Spurious empty-interrupt turn? When activity_start
                # opens turn N+1 after turn N's generation_complete,
                # the server emits ``interrupted → turn_complete`` with
                # no audio FIRST, then the real reply in a separate
                # turn. Detect that pattern (saw interrupted=True, no
                # audio on THIS iterator, no transcript) and re-enter
                # ``session.receive()`` to read the actual reply.
                #
                # We gate on ``self._iter_had_audio`` (iterator-scope)
                # rather than this call's audio: a real mid-reply
                # interrupt earlier in the same turn would have yielded
                # audio chunks before this point, even if THIS call sees
                # only the trailing ``interrupted → turn_complete`` pair.
                if (
                    saw_interrupted
                    and not self._iter_had_audio
                    and not pending_delta
                ):
                    self._recv_iter = self._session.receive().__aiter__()  # type: ignore[union-attr]
                    self._iter_had_audio = False
                    saw_interrupted = False
                    continue
                # Real end-of-turn — yield empty AudioChunk and reset
                # the iterator. The next ``recv_audio`` call (for the
                # next user turn) will re-enter ``session.receive()``.
                self._recv_iter = None
                return AudioChunk(
                    data=b"",
                    transcript=pending_delta or None,
                )

    return await asyncio.wait_for(_next_chunk(), timeout=timeout)
async def send_audio(self, chunk: AudioChunk) ‑> None

Send a canonical 24kHz AudioChunk to Gemini Live as a complete turn.

Resamples from 24kHz → 16kHz at the wire boundary so the adapter speaks Gemini's expected audio/pcm;rate=16000 format while the rest of the framework stays at the canonical 24kHz.

Wraps the audio in explicit activity_start / activity_end markers because we connect with Automatic Activity Detection disabled (see connect). Each send_audio call is therefore a complete user turn from Gemini's perspective: it triggers the model to reply immediately on activity_end instead of waiting on its own VAD heuristic to detect end-of-speech. This is critical for the interrupt path — when the user barges in, we send a fresh turn boundary on top of the agent's in-flight reply, which Gemini treats as a deterministic interruption signal.

Expand source code
async def send_audio(self, chunk: AudioChunk) -> None:
    """Send a canonical 24kHz AudioChunk to Gemini Live as a complete turn.

    Resamples from 24kHz → 16kHz at the wire boundary so the adapter
    speaks Gemini's expected ``audio/pcm;rate=16000`` format while the rest
    of the framework stays at the canonical 24kHz.

    Wraps the audio in explicit ``activity_start`` / ``activity_end``
    markers because we connect with Automatic Activity Detection
    disabled (see ``connect``). Each ``send_audio`` call is therefore a
    complete user turn from Gemini's perspective: it triggers the
    model to reply immediately on ``activity_end`` instead of waiting
    on its own VAD heuristic to detect end-of-speech. This is critical
    for the interrupt path — when the user barges in, we send a fresh
    turn boundary on top of the agent's in-flight reply, which Gemini
    treats as a deterministic interruption signal.
    """
    if self._session is None:
        raise RuntimeError("GeminiLiveAgentAdapter: not connected")
    from google.genai import types  # noqa: PLC0415

    pcm_16k = _resample_pcm16(chunk.data, CANONICAL_RATE, GEMINI_INPUT_RATE)
    if not pcm_16k:
        return
    # New user turn → reset transcript and the per-turn receive
    # iterator so the next ``recv_audio`` enters
    # ``session.receive()`` fresh for this turn.
    self._reset_turn_transcript()
    if self._recv_iter is not None:
        try:
            await self._recv_iter.aclose()  # type: ignore[attr-defined]
        except Exception:
            # Best-effort: prior turn's receive iterator may already be
            # closed or in an error state. We're resetting to start a new
            # turn — propagating here would block legitimate new turns.
            pass
        self._recv_iter = None
    await self._session.send_realtime_input(activity_start=types.ActivityStart())
    blob = types.Blob(
        data=pcm_16k,
        mime_type="audio/pcm;rate=16000",
    )
    await self._session.send_realtime_input(audio=blob)
    await self._session.send_realtime_input(activity_end=types.ActivityEnd())

Inherited members

class InterruptionConfig (probability: float = 0.3, delay_range: Tuple[float, float] = (0.5, 3.0), strategy: "Literal['contextual', 'random_phrase']" = 'random_phrase', phrases: Sequence[str] = <factory>)

Configuration for random interruptions during proceed().

Expand source code
@dataclass
class InterruptionConfig:
    """Configuration for random interruptions during ``proceed()``."""

    probability: float = 0.3
    delay_range: Tuple[float, float] = (0.5, 3.0)
    strategy: Literal["contextual", "random_phrase"] = "random_phrase"
    phrases: Sequence[str] = field(default_factory=lambda: _CANNED_PHRASES)

    def should_interrupt(self, rng: random.Random | None = None) -> bool:
        r = rng or random
        return r.random() < self.probability

    def sample_delay(self, rng: random.Random | None = None) -> float:
        r = rng or random
        lo, hi = self.delay_range
        return r.uniform(lo, hi)

    def pick_random_phrase(self, rng: random.Random | None = None) -> str:
        r = rng or random
        return r.choice(list(self.phrases))

Instance variables

var delay_range : Tuple[float, float]
var phrases : Sequence[str]
var probability : float
var strategy : Literal['contextual', 'random_phrase']

Methods

def pick_random_phrase(self, rng: random.Random | None = None) ‑> str
Expand source code
def pick_random_phrase(self, rng: random.Random | None = None) -> str:
    r = rng or random
    return r.choice(list(self.phrases))
def sample_delay(self, rng: random.Random | None = None) ‑> float
Expand source code
def sample_delay(self, rng: random.Random | None = None) -> float:
    r = rng or random
    lo, hi = self.delay_range
    return r.uniform(lo, hi)
def should_interrupt(self, rng: random.Random | None = None) ‑> bool
Expand source code
def should_interrupt(self, rng: random.Random | None = None) -> bool:
    r = rng or random
    return r.random() < self.probability
class LatencyMetrics (measurements: List[float] = <factory>, time_to_first_byte: Optional[float] = None, interrupt_response_time: Optional[float] = None)

Summary of agent response timing across the conversation.

Expand source code
@dataclass
class LatencyMetrics:
    """Summary of agent response timing across the conversation."""

    measurements: List[float] = field(default_factory=list)
    time_to_first_byte: Optional[float] = None
    interrupt_response_time: Optional[float] = None

    @property
    def avg_response_time(self) -> Optional[float]:
        if not self.measurements:
            return None
        return sum(self.measurements) / len(self.measurements)

    @property
    def p50_response_time(self) -> Optional[float]:
        if not self.measurements:
            return None
        return median(self.measurements)

    @property
    def p95_response_time(self) -> Optional[float]:
        if not self.measurements:
            return None
        import math
        sorted_ms = sorted(self.measurements)
        # Ceiling-style: round up so p95 reflects the tail, not the body.
        idx = min(len(sorted_ms) - 1, math.ceil(0.95 * (len(sorted_ms) - 1)))
        return sorted_ms[idx]

Instance variables

var avg_response_time : Optional[float]
Expand source code
@property
def avg_response_time(self) -> Optional[float]:
    if not self.measurements:
        return None
    return sum(self.measurements) / len(self.measurements)
var interrupt_response_time : float | None
var measurements : List[float]
var p50_response_time : Optional[float]
Expand source code
@property
def p50_response_time(self) -> Optional[float]:
    if not self.measurements:
        return None
    return median(self.measurements)
var p95_response_time : Optional[float]
Expand source code
@property
def p95_response_time(self) -> Optional[float]:
    if not self.measurements:
        return None
    import math
    sorted_ms = sorted(self.measurements)
    # Ceiling-style: round up so p95 reflects the tail, not the body.
    idx = min(len(sorted_ms) - 1, math.ceil(0.95 * (len(sorted_ms) - 1)))
    return sorted_ms[idx]
var time_to_first_byte : float | None
class LiveKitAgentAdapter (url: str, api_key: str, api_secret: str, room: str)

Abstract base for voice agents that exchange audio with the agent under test.

Subclasses implement connect, disconnect, send_audio, and recv_audio. The default call implementation threads audio extracted from the last incoming message through the transport and wraps the response back into an assistant message.

Attributes

capabilities
Declaration of what the adapter can and cannot do. Each concrete subclass must set this as a class attribute.
response_timeout

Seconds to wait for agent audio after sending user audio. Defaults to 60 seconds.

60 seconds covers a typical real-world STT → LLM → TTS round-trip including backoff/retry inside each provider, tool calls, and RAG lookups. If you see TimeoutError flakes against a fast LLM-only chain, you can lower this; if your agent does heavy processing (MCP roundtrips, multi-step tool chains), consider raising it.

Override per-adapter at construction time::

adapter = MyVoiceAdapter()
adapter.response_timeout = 90.0  # slow tool-call chain
Expand source code
class LiveKitAgentAdapter(VoiceAgentAdapter):
    capabilities: ClassVar[AdapterCapabilities] = AdapterCapabilities(
        streaming_transcripts=True,
        native_vad=True,
        dtmf=False,
        input_formats=["pcm16/48000"],
        output_formats=["pcm16/48000"],
    )

    def __init__(self, url: str, api_key: str, api_secret: str, room: str):
        super().__init__()
        self.url = url
        self.api_key = api_key
        self.api_secret = api_secret
        self.room = room
        self._room: Optional[object] = None

    def __repr__(self) -> str:  # redact credentials
        return f"LiveKitAgentAdapter(url={self.url!r}, room={self.room!r}, api_key='***', api_secret='***')"

    async def connect(self) -> None:
        self._room = object()

    async def disconnect(self) -> None:
        self._room = None

    async def send_audio(self, chunk: AudioChunk) -> None:
        if self._room is None:
            raise RuntimeError("LiveKitAgentAdapter: not connected")
        raise PendingTransportError("LiveKitAgentAdapter")

    async def recv_audio(self, timeout: float) -> AudioChunk:
        if self._room is None:
            raise RuntimeError("LiveKitAgentAdapter: not connected")
        raise PendingTransportError("LiveKitAgentAdapter")

Ancestors

Class variables

var capabilities : ClassVar[AdapterCapabilities]

Inherited members

class OpenAIRealtimeAgentAdapter (model: str = 'gpt-realtime-mini', voice: str = 'alloy', instructions: str = '', tools: Optional[List[Any]] = None, *, api_key: Optional[str] = None, role: AgentRole = AgentRole.AGENT, speaks_first: bool = False)

Exercise OpenAI's Realtime API as either the agent under test (role=AGENT, default) or as the voice-enabled user simulator (role=USER, per §7.2 L1164-1171).

When role=USER, scripted user("text") steps route text through the realtime session's text-input channel rather than triggering TTS.

Transcript observability: - last_user_transcript — set from conversation.item.input_audio_transcription.completed - last_agent_transcript — accumulated from response.output_audio_transcript.delta / reset on done

Example::

adapter = OpenAIRealtimeAgentAdapter(
    model=OPENAI_REALTIME_MODEL,
    voice="alloy",
    instructions="You are a helpful assistant.",
)
async with adapter:
    # scenario.run() feeds send_audio / recv_audio ...
Expand source code
class OpenAIRealtimeAgentAdapter(VoiceAgentAdapter):
    """
    Exercise OpenAI's Realtime API as either the agent under test
    (role=AGENT, default) or as the voice-enabled user simulator
    (role=USER, per §7.2 L1164-1171).

    When role=USER, scripted ``user("text")`` steps route text through the
    realtime session's text-input channel rather than triggering TTS.

    Transcript observability:
        - ``last_user_transcript`` — set from
          ``conversation.item.input_audio_transcription.completed``
        - ``last_agent_transcript`` — accumulated from
          ``response.output_audio_transcript.delta`` / reset on done

    Example::

        adapter = OpenAIRealtimeAgentAdapter(
            model=OPENAI_REALTIME_MODEL,
            voice="alloy",
            instructions="You are a helpful assistant.",
        )
        async with adapter:
            # scenario.run() feeds send_audio / recv_audio ...
    """

    capabilities: ClassVar[AdapterCapabilities] = AdapterCapabilities(
        streaming_transcripts=True,
        native_vad=True,
        dtmf=False,
        # OpenAI Realtime exposes ``response.cancel`` as a first-class
        # interrupt event — the model stops generating immediately. Mapped
        # below in ``interrupt()``.
        interruption=True,
        input_formats=["pcm16/24000"],
        output_formats=["pcm16/24000"],
    )

    def __init__(
        self,
        model: str = OPENAI_REALTIME_MODEL,
        voice: str = "alloy",
        instructions: str = "",
        tools: Optional[List[Any]] = None,
        *,
        api_key: Optional[str] = None,
        role: AgentRole = AgentRole.AGENT,
        speaks_first: bool = False,
    ):
        super().__init__()
        self.model = model
        self.voice = voice
        self.instructions = instructions
        self.tools = tools or []
        self.role = role  # type: ignore[misc]
        # Resolve API key: explicit param takes precedence over env var.
        self._api_key: str = api_key or os.environ.get("OPENAI_API_KEY", "")
        self._ws: Any = None

        # Transcript observability — updated on incoming transcript events.
        self.last_user_transcript: Optional[str] = None
        self.last_agent_transcript: Optional[str] = None

        # Accumulation buffer for streaming agent transcript deltas.
        self._agent_transcript_buf: str = ""

        # Bytes appended to input_audio_buffer since last commit. Non-zero
        # means recv_audio should commit + request a response before awaiting.
        self._pending_audio_bytes: int = 0

        # Tracks which legacy (pre-GA) event names have already triggered a
        # one-time warning, so the log isn't spammed on every audio frame.
        self._legacy_events_warned: set[str] = set()

        # --- Gap 1: agent-speaks-first support ---
        # When speaks_first=True, the adapter is configured for an agent-initiated
        # scenario where the agent must speak first without any user audio.
        self._speaks_first: bool = speaks_first

        # True while a response is in flight (set on response.created, cleared on
        # response.done / response.cancelled). Prevents double-firing response.create
        # and allows drain re-entries after a completed response to return empty
        # (clean drain exit).
        self._response_active: bool = False

        # Set to True the first time response.created is received. Guards the
        # drain re-entry short-circuit in recv_audio: the empty-chunk early-return
        # must only fire AFTER at least one response has been active and completed,
        # not on the very first recv_audio call (which would break direct-call
        # tests that don't go through notify_agent_turn).
        self._response_ever_active: bool = False

        # Per-turn signal: set by notify_agent_turn() before each agent step so
        # recv_audio knows this is a genuine agent-initiated turn (not a silent
        # proceed()/resume). Consumed (cleared) when the kick fires.
        self._agent_turn_pending: bool = speaks_first  # first turn armed if speaks_first

        # --- Issue #630: realtime function-call (tool-call) surfacing ---
        # In-progress function calls keyed by call_id. Each value is a dict
        # {"name": str|None, "arguments": str} assembled from the streaming
        # `response.function_call_arguments.delta`/`.done` events and the
        # `response.output_item.added`/`.done` (function_call item) events.
        # recv_audio() returns an AudioChunk, so it cannot carry tool calls in
        # its return value — instead the function-call branches accumulate here
        # and the overridden call() drains them into result.messages.
        self._tool_call_accumulators: dict[str, dict[str, Any]] = {}
        # Finalized OpenAI-chat tool_call dicts for the CURRENT turn, in arrival
        # order. Cleared at the start of each call() so calls never leak across
        # turns (mirrors the transcript-state reset in _drain_agent_response).
        # Shape per entry:
        #   {"id": call_id, "type": "function",
        #    "function": {"name": name, "arguments": <json-str>}}
        self._completed_tool_calls: List[dict[str, Any]] = []

    @property
    def url(self) -> str:
        return REALTIME_URL_TEMPLATE.format(model=self.model)

    def __repr__(self) -> str:  # redact credentials
        return (
            f"OpenAIRealtimeAgentAdapter("
            f"model={self.model!r}, "
            f"voice={self.voice!r}, "
            f"role={self.role!r}, "
            f"api_key='***')"
        )

    def _warn_if_legacy(self, received: str, ga_name: str) -> None:
        """Emit a one-time WARNING when a pre-GA (beta) event name is seen.

        Only fires once per distinct legacy name per adapter instance, so a
        multi-chunk audio response doesn't flood the log. The GA-named event
        itself never triggers a warning.
        """
        if received != ga_name and received not in self._legacy_events_warned:
            self._legacy_events_warned.add(received)
            logger.warning(
                "OpenAIRealtimeAgentAdapter: received legacy event %r; "
                "GA uses %r — accepting defensively",
                received,
                ga_name,
            )

    # ------------------------------------------------------------- tool calls
    # Issue #630: surface OpenAI Realtime function-call events as OpenAI-chat
    # tool_calls on the assistant message in result.messages. The Realtime wire
    # describes ONE logical call across several events
    # (`response.function_call_arguments.delta` × N → `.done`, and/or
    # `response.output_item.added`/`.done` carrying the function_call item).
    # We accumulate per call_id, then finalize into self._completed_tool_calls,
    # de-duplicating so each call_id yields exactly one tool_call (AC6).

    def _accumulate_tool_call_delta(self, call_id: str, delta: str) -> None:
        """Append a streaming arguments fragment for ``call_id``."""
        acc = self._tool_call_accumulators.setdefault(
            call_id, {"name": None, "arguments": ""}
        )
        acc["arguments"] = (acc["arguments"] or "") + (delta or "")

    def _note_tool_call_name(self, call_id: str, name: Optional[str]) -> None:
        """Record the function name for ``call_id`` (the item/added event)."""
        if not name:
            return
        acc = self._tool_call_accumulators.setdefault(
            call_id, {"name": None, "arguments": ""}
        )
        acc["name"] = name

    def _finalize_tool_call(
        self,
        call_id: Optional[str],
        *,
        name: Optional[str] = None,
        arguments: Optional[str] = None,
    ) -> None:
        """Resolve one logical function call into ``self._completed_tool_calls``.

        Idempotent on ``call_id`` (AC6): the streaming-args path and the
        output-item path both describe the SAME call, so a second finalize for
        an already-completed ``call_id`` MERGES (fills a missing name / upgrades
        to a more complete arguments string) rather than appending a duplicate.

        Degrades safely (AC7): a finalize with NO ``call_id`` is skipped with a
        DEBUG log and emits nothing. Missing arguments become ``"{}"``; a
        malformed (non-JSON) arguments string is passed through verbatim — the
        adapter never parses-and-reraises.
        """
        if not call_id:
            logger.debug(
                "OpenAIRealtimeAgentAdapter: function-call event with no "
                "call_id; skipping (AC7 degraded path)"
            )
            return

        acc = self._tool_call_accumulators.get(call_id, {})
        resolved_name = name or acc.get("name")
        # Pick the most complete arguments source: explicit (item/done) arg if
        # non-empty, else the accumulated streaming deltas, else "{}".
        candidates = [arguments, acc.get("arguments")]
        resolved_args = next(
            (c for c in candidates if c is not None and c != ""), None
        )
        if resolved_args is None:
            resolved_args = "{}"

        # De-dup / merge on call_id (AC6).
        for existing in self._completed_tool_calls:
            if existing["id"] == call_id:
                fn = existing["function"]
                if not fn.get("name") and resolved_name:
                    fn["name"] = resolved_name
                # Upgrade to a longer/more-complete arguments string if the new
                # source carries more (item arguments arriving after deltas).
                if resolved_args not in ("", "{}") and len(resolved_args) > len(
                    fn.get("arguments") or ""
                ):
                    fn["arguments"] = resolved_args
                return

        self._completed_tool_calls.append(
            {
                "id": call_id,
                "type": "function",
                "function": {
                    "name": resolved_name or "",
                    "arguments": resolved_args,
                },
            }
        )

    def notify_agent_turn(self) -> None:
        """Signal that an agent turn is about to be dispatched.

        Called by the executor before each agent step so recv_audio can fire
        a bare response.create for agent-initiated turns (where no user audio
        has been committed). This per-turn signal handles both turn 1 (opening)
        and subsequent agent turns in multi-turn scripts like [agent(), user(),
        agent()].

        Only meaningful when role=AGENT. Safe to call on every agent step —
        recv_audio consumes and clears the flag.
        """
        if self.role == AgentRole.AGENT:
            self._agent_turn_pending = True

    # ------------------------------------------------------------------ lifecycle

    async def connect(self) -> None:
        """Open the Realtime WebSocket and send the initial session.update."""
        import websockets

        self._ws = await websockets.connect(
            self.url,
            additional_headers={
                "Authorization": f"Bearer {self._api_key}",
            },
        )
        logger.debug("OpenAIRealtimeAgentAdapter: connected to %s", self.url)

        # Configure session: audio formats, voice, instructions, tools.
        # Disable server-side VAD (session.audio.input.turn_detection=None) so
        # we control turn boundaries explicitly via input_audio_buffer.commit +
        # response.create after each send_audio.
        session_config: dict[str, Any] = {
            "type": "realtime",
            "audio": {
                "input": {
                    "format": {"type": "audio/pcm", "rate": 24000},
                    "turn_detection": None,
                    "transcription": {"model": OPENAI_STT_MODEL},
                },
                "output": {
                    "format": {"type": "audio/pcm", "rate": 24000},
                    "voice": self.voice,
                },
            },
        }
        if self.instructions:
            session_config["instructions"] = self.instructions
        if self.tools:
            session_config["tools"] = self.tools

        await self._ws.send(
            json.dumps({"type": "session.update", "session": session_config})
        )
        logger.debug("OpenAIRealtimeAgentAdapter: session.update sent")

    async def disconnect(self) -> None:
        """Close the WebSocket if open."""
        if self._ws is not None:
            try:
                await self._ws.close()
            except Exception:
                # Best-effort: connection may already be half-closed or in an
                # error state when disconnect() is called. We're tearing down
                # regardless — propagating here would just leak the WS reference.
                pass
            finally:
                self._ws = None
            logger.debug("OpenAIRealtimeAgentAdapter: disconnected")

    # ------------------------------------------------------------------ I/O

    async def send_audio(self, chunk: AudioChunk) -> None:
        """
        Append a PCM16 audio chunk to the model's input audio buffer.

        Only emits ``input_audio_buffer.append`` — the commit + response are
        deferred to the next ``recv_audio`` call. The scenario executor may
        call ``send_audio`` many times for a single user turn (TTS streams
        audio as chunks); committing per-chunk would confuse the server with
        sub-second turn boundaries. By deferring commit to recv_audio, we
        get one server turn per user turn.
        """
        if self._ws is None:
            raise RuntimeError("OpenAIRealtimeAgentAdapter: not connected")
        b64 = base64.b64encode(chunk.data).decode()
        await self._ws.send(
            json.dumps({"type": "input_audio_buffer.append", "audio": b64})
        )
        self._pending_audio_bytes += len(chunk.data)

    async def interrupt(self) -> None:
        """Send ``response.cancel`` — the OpenAI Realtime API's first-class
        interrupt. The model stops generating audio and text immediately.
        No timing race against VAD: deterministic stop, then the next user
        turn flows normally through ``send_audio`` + ``recv_audio``.
        """
        if self._ws is None:
            raise RuntimeError("OpenAIRealtimeAgentAdapter: not connected")
        await self._ws.send(json.dumps({"type": "response.cancel"}))
        logger.debug("OpenAIRealtimeAgentAdapter: sent response.cancel (interrupt)")

    async def recv_audio(self, timeout: float) -> AudioChunk:
        """
        Commit any pending audio, request a response, and return the first
        audio chunk the model produces.

        If ``send_audio`` was called since the last ``recv_audio``, this
        method commits the buffer and emits ``response.create`` before
        awaiting the reply. Subsequent recv calls without new send calls
        just await the next audio delta (for multi-chunk responses).

        Loops over incoming events until a ``response.output_audio.delta``
        event arrives (GA name), then returns decoded PCM16. The legacy
        ``response.audio.delta`` name is accepted defensively with a one-time
        warning. Transcript events update the instance's
        ``last_user_transcript`` / ``last_agent_transcript`` attributes.
        An ``error`` event raises a ``RuntimeError``. All other housekeeping
        events are ignored and the loop continues.

        Raises:
            asyncio.TimeoutError: if no audio arrives within ``timeout``.
            RuntimeError: if the server sends an error event.
        """
        if self._ws is None:
            raise RuntimeError("OpenAIRealtimeAgentAdapter: not connected")

        # If send_audio was called since last recv, commit and request response.
        if self._pending_audio_bytes > 0:
            await self._ws.send(json.dumps({"type": "input_audio_buffer.commit"}))
            await self._ws.send(json.dumps({"type": "response.create"}))
            self._pending_audio_bytes = 0
            self._agent_turn_pending = False  # user spoke → per-turn signal consumed

        # Gap 1: agent-speaks-first / multi-turn agent initiation.
        # When the executor signals an agent turn via notify_agent_turn() and
        # no user audio has been committed and no response is already in flight,
        # send a bare content-free response.create so the model speaks.
        # Opening words come from the session instructions — no text is injected.
        # Self-limiting on user-first scripts: if user audio was committed above,
        # _agent_turn_pending was already cleared.
        # Also serves as the clean drain exit: if a response already completed
        # (_response_active is False after response.done cleared it) and
        # _agent_turn_pending is False, a drain re-entry returns an empty chunk.
        elif self._agent_turn_pending and not self._response_active:
            await self._ws.send(json.dumps({"type": "response.create"}))
            self._agent_turn_pending = False
            self._response_active = True

        elif not self._agent_turn_pending and not self._response_active and self._response_ever_active:
            # No pending audio, no agent-turn signal, no response in flight,
            # AND at least one response has already completed this session:
            # this is a drain re-entry after a completed response. Return empty
            # chunk so _drain_agent_response's tail-silence loop exits cleanly.
            # Guard on _response_ever_active so that a fresh recv_audio call
            # (before any response.created fires) does NOT short-circuit —
            # that would break direct recv_audio callers (e.g. unit tests that
            # call recv_audio without going through notify_agent_turn).
            return AudioChunk(data=b"")

        deadline = asyncio.get_running_loop().time() + timeout
        while True:
            remaining = deadline - asyncio.get_running_loop().time()
            if remaining <= 0:
                raise asyncio.TimeoutError(
                    "OpenAIRealtimeAgentAdapter: recv_audio timed out"
                )

            raw = await asyncio.wait_for(self._ws.recv(), timeout=remaining)
            try:
                event = json.loads(raw) if isinstance(raw, str) else json.loads(raw.decode())
            except Exception:
                logger.debug(
                    "OpenAIRealtimeAgentAdapter: non-JSON message, skipping"
                )
                continue

            etype = event.get("type", "")

            if etype in ("response.output_audio.delta", "response.audio.delta"):
                # Accept both the GA event name and its retired beta alias —
                # live gpt-realtime* models have been observed still emitting
                # the beta names. These legacy arms should be removed once the
                # GA names are confirmed stable at a live endpoint (issue #602).
                self._warn_if_legacy(etype, "response.output_audio.delta")
                b64 = event.get("delta", "")
                pcm = base64.b64decode(b64)
                # Enforce PCM16 invariant: even byte count.
                if len(pcm) % 2 == 1:
                    pcm = pcm[:-1]
                return AudioChunk(data=pcm)

            elif etype == "response.created":
                # Response is now in flight — mark it so subsequent recv_audio
                # drain re-entries don't fire a spurious second response.create.
                self._response_active = True
                self._response_ever_active = True

            elif etype in (
                "response.output_audio_transcript.delta",
                "response.audio_transcript.delta",
            ):
                # Accumulate streaming agent transcript.
                self._warn_if_legacy(etype, "response.output_audio_transcript.delta")
                self._agent_transcript_buf += event.get("delta", "")

            elif etype in (
                "response.output_audio_transcript.done",
                "response.audio_transcript.done",
            ):
                # Finalise; the `transcript` field may have the full text.
                self._warn_if_legacy(etype, "response.output_audio_transcript.done")
                transcript = event.get("transcript", "")
                if transcript:
                    self.last_agent_transcript = transcript
                elif self._agent_transcript_buf:
                    self.last_agent_transcript = self._agent_transcript_buf
                self._agent_transcript_buf = ""

            elif etype in ("response.done", "response.cancelled"):
                # Response finished or was cancelled — mark it so the next
                # drain re-entry returns an empty chunk (clean exit).
                self._response_active = False
                # Issue #646: a tool-only turn (function call, NO audio delta) would
                # otherwise loop here to the deadline and raise — the accumulated tool
                # call is parsed but never returned. When the response is done and at
                # least one tool call has been finalized this turn, return an empty
                # chunk so the drain exits cleanly and call() surfaces the tool_calls
                # message. A genuinely empty turn (done + EMPTY accumulator) must still
                # fall through to the timeout — the non-empty accumulator is the
                # discriminator, NOT response.done alone.
                # (Distinct from the drain-re-entry empty-chunk path above at the
                # _response_ever_active guard, which handles a COMPLETED prior response.)
                if self._completed_tool_calls:
                    return AudioChunk(data=b"")

            elif etype == "conversation.item.input_audio_transcription.completed":
                # User-side transcript from Whisper.
                self.last_user_transcript = event.get("transcript", "")

            elif etype == "response.function_call_arguments.delta":
                # Issue #630: streaming arguments fragment for a function call.
                # Accumulate per call_id; the call is finalized on `.done` or
                # the function_call output-item event.
                self._accumulate_tool_call_delta(
                    event.get("call_id", ""), event.get("delta", "")
                )

            elif etype == "response.function_call_arguments.done":
                # Issue #630: streaming-args path complete. `name` is typically
                # NOT on this event (it arrives via the output_item), so resolve
                # it from the accumulator. A missing call_id degrades safely.
                self._finalize_tool_call(
                    event.get("call_id"),
                    name=event.get("name"),
                    arguments=event.get("arguments"),
                )

            elif etype in ("response.output_item.added", "response.output_item.done"):
                # Issue #630: the output-item form of a function call carries the
                # authoritative `name` + `call_id` (and, on `.done`, the full
                # `arguments`). For non-function items (e.g. an audio message
                # item) this is benign housekeeping — fall through silently.
                item = event.get("item") or {}
                if isinstance(item, dict) and item.get("type") == "function_call":
                    call_id = item.get("call_id")
                    name = item.get("name")
                    if etype == "response.output_item.added":
                        # Shell arrives before args stream — record the name so a
                        # later delta/done can attach to it. Do not finalize yet.
                        self._note_tool_call_name(call_id or "", name)
                    else:
                        # `.done`: authoritative full call. Finalize (idempotent
                        # on call_id — merges with any streaming-args entry, AC6).
                        self._finalize_tool_call(
                            call_id, name=name, arguments=item.get("arguments")
                        )
                else:
                    logger.debug(
                        "OpenAIRealtimeAgentAdapter: ignoring non-function "
                        "output_item event %r",
                        etype,
                    )

            elif etype == "error":
                error_detail = event.get("error", {})
                msg = error_detail.get("message", str(error_detail))
                raise RuntimeError(
                    f"OpenAIRealtimeAgentAdapter: server error — {msg}"
                )

            else:
                # Housekeeping events — session.created, session.updated, etc. —
                # are benign. Log at DEBUG and keep the loop running.
                logger.debug(
                    "OpenAIRealtimeAgentAdapter: ignoring event type %r", etype
                )

    async def call(self, input: AgentInput) -> AgentReturnTypes:
        """Surface realtime tool calls alongside the spoken audio turn (#630).

        The base ``call()`` returns a single assistant audio message and does
        all the recording bookkeeping. We keep that intact and, when the agent
        called any tools this turn, append ONE extra assistant message carrying
        every tool call as OpenAI-chat ``tool_calls`` — the shape
        ``state.has_tool_call`` / ``state.last_tool_call`` consume.

        Returns:
            - the single audio message (dict) when no tools were called — byte
              identical to the base behaviour (AC8 regression), OR
            - ``[audio_message, tool_call_message]`` when ≥1 tool was called
              (AC1/AC9/AC10). ``convert_agent_return_types_to_openai_messages``
              passes a list of dicts through verbatim into result.messages.

        The completed-calls list is reset HERE (turn start) so tool calls never
        leak across turns; the function-call events for THIS turn are consumed
        inside the ``super().call()`` drain and finalized onto the list.
        """
        # Reset per-turn tool-call state so a prior turn's calls don't bleed
        # through (mirrors the transcript reset in _drain_agent_response).
        self._completed_tool_calls = []
        self._tool_call_accumulators = {}

        audio_message = await super().call(input)

        if not self._completed_tool_calls:
            return audio_message

        # One assistant message carrying ALL of this turn's tool calls — the
        # conventional OpenAI shape (one assistant turn, many tool_calls).
        tool_call_message: ChatCompletionMessageParam = cast(
            ChatCompletionMessageParam,
            {
                "role": "assistant",
                "content": None,
                "tool_calls": list(self._completed_tool_calls),
            },
        )
        return [cast(ChatCompletionMessageParam, audio_message), tool_call_message]

    async def _drain_agent_response(
        self, on_first_chunk=None
    ) -> "AudioChunk":
        """Override to surface the spoken transcript after draining.

        The base class drains recv_audio chunks until tail silence. After
        draining, ``response.output_audio_transcript.done`` will have already
        fired (it arrives before ``response.done``), so ``self.last_agent_transcript``
        is populated. We rebuild the merged chunk with ``transcript=`` set so
        ``create_audio_message`` attaches a text part to the assistant message.

        This puts the transcript in ``result.messages`` (AC2/AC5) without
        modifying messages.py, tts.py, or composable.py (AC6).

        The transcript text is an ordinary ``{"type":"text","text":...}`` part —
        no extra keys. Echo-safety is handled in ``_strip_audio_content``
        (user_simulator_agent.py) by detecting the structural pattern: an
        assistant message that carries both an ``input_audio`` part AND a ``text``
        part is a voiced agent turn, so the text is reframed as third-person
        context before ``reverse_roles`` runs (AC4/AC11).
        """
        from ..audio_chunk import AudioChunk as _AudioChunk

        # Clear transcript state at the start of each turn so a stale value
        # from a prior turn doesn't bleed through if this turn's event never
        # fires (AC8: degraded case — absent transcript → no text part).
        self.last_agent_transcript = None
        self._agent_transcript_buf = ""

        merged = await super()._drain_agent_response(on_first_chunk=on_first_chunk)

        transcript = self.last_agent_transcript
        if transcript:
            # Rebuild with transcript attached so create_audio_message adds
            # the text part. The original merged.data is unchanged.
            return _AudioChunk(data=merged.data, transcript=transcript)
        # No transcript (AC8): return the merge unchanged (audio-only).
        return merged

    async def send_text(self, text: str) -> None:
        """
        Inject scripted text into the realtime session as a user message.

        Used when this adapter is the user simulator (role=USER): scripted
        ``user("text")`` steps route through here instead of spawning TTS.
        The model synthesises the text into spoken audio with natural prosody,
        which is then delivered via ``recv_audio``.

        NOTE: per §7.2, OpenAI Realtime cannot populate assistant audio
        messages retroactively; the downstream transcript reflects what the
        model actually emitted, not what was scripted.

        Raises:
            RuntimeError: if called before ``connect()``.
        """
        if self._ws is None:
            raise RuntimeError("OpenAIRealtimeAgentAdapter: not connected")

        # Create a user conversation item with the scripted text.
        await self._ws.send(
            json.dumps(
                {
                    "type": "conversation.item.create",
                    "item": {
                        "type": "message",
                        "role": "user",
                        "content": [{"type": "input_text", "text": text}],
                    },
                }
            )
        )
        # Prompt the model to generate audio output.
        await self._ws.send(json.dumps({"type": "response.create"}))
        logger.debug(
            "OpenAIRealtimeAgentAdapter: send_text injected %r", text[:60]
        )

Ancestors

Class variables

var capabilities : ClassVar[AdapterCapabilities]

Instance variables

var url : str
Expand source code
@property
def url(self) -> str:
    return REALTIME_URL_TEMPLATE.format(model=self.model)

Methods

async def call(self, input: AgentInput) ‑> str | openai.types.chat.chat_completion_developer_message_param.ChatCompletionDeveloperMessageParam | openai.types.chat.chat_completion_system_message_param.ChatCompletionSystemMessageParam | openai.types.chat.chat_completion_user_message_param.ChatCompletionUserMessageParam | openai.types.chat.chat_completion_assistant_message_param.ChatCompletionAssistantMessageParam | openai.types.chat.chat_completion_tool_message_param.ChatCompletionToolMessageParam | openai.types.chat.chat_completion_function_message_param.ChatCompletionFunctionMessageParam | List[openai.types.chat.chat_completion_developer_message_param.ChatCompletionDeveloperMessageParam | openai.types.chat.chat_completion_system_message_param.ChatCompletionSystemMessageParam | openai.types.chat.chat_completion_user_message_param.ChatCompletionUserMessageParam | openai.types.chat.chat_completion_assistant_message_param.ChatCompletionAssistantMessageParam | openai.types.chat.chat_completion_tool_message_param.ChatCompletionToolMessageParam | openai.types.chat.chat_completion_function_message_param.ChatCompletionFunctionMessageParam] | ScenarioResult

Surface realtime tool calls alongside the spoken audio turn (#630).

The base call() returns a single assistant audio message and does all the recording bookkeeping. We keep that intact and, when the agent called any tools this turn, append ONE extra assistant message carrying every tool call as OpenAI-chat tool_calls — the shape state.has_tool_call / state.last_tool_call consume.

Returns

  • the single audio message (dict) when no tools were called — byte identical to the base behaviour (AC8 regression), OR
  • [audio_message, tool_call_message] when ≥1 tool was called (AC1/AC9/AC10). convert_agent_return_types_to_openai_messages passes a list of dicts through verbatim into result.messages. The completed-calls list is reset HERE (turn start) so tool calls never leak across turns; the function-call events for THIS turn are consumed inside the super().call() drain and finalized onto the list.
Expand source code
async def call(self, input: AgentInput) -> AgentReturnTypes:
    """Surface realtime tool calls alongside the spoken audio turn (#630).

    The base ``call()`` returns a single assistant audio message and does
    all the recording bookkeeping. We keep that intact and, when the agent
    called any tools this turn, append ONE extra assistant message carrying
    every tool call as OpenAI-chat ``tool_calls`` — the shape
    ``state.has_tool_call`` / ``state.last_tool_call`` consume.

    Returns:
        - the single audio message (dict) when no tools were called — byte
          identical to the base behaviour (AC8 regression), OR
        - ``[audio_message, tool_call_message]`` when ≥1 tool was called
          (AC1/AC9/AC10). ``convert_agent_return_types_to_openai_messages``
          passes a list of dicts through verbatim into result.messages.

    The completed-calls list is reset HERE (turn start) so tool calls never
    leak across turns; the function-call events for THIS turn are consumed
    inside the ``super().call()`` drain and finalized onto the list.
    """
    # Reset per-turn tool-call state so a prior turn's calls don't bleed
    # through (mirrors the transcript reset in _drain_agent_response).
    self._completed_tool_calls = []
    self._tool_call_accumulators = {}

    audio_message = await super().call(input)

    if not self._completed_tool_calls:
        return audio_message

    # One assistant message carrying ALL of this turn's tool calls — the
    # conventional OpenAI shape (one assistant turn, many tool_calls).
    tool_call_message: ChatCompletionMessageParam = cast(
        ChatCompletionMessageParam,
        {
            "role": "assistant",
            "content": None,
            "tool_calls": list(self._completed_tool_calls),
        },
    )
    return [cast(ChatCompletionMessageParam, audio_message), tool_call_message]
async def connect(self) ‑> None

Open the Realtime WebSocket and send the initial session.update.

Expand source code
async def connect(self) -> None:
    """Open the Realtime WebSocket and send the initial session.update."""
    import websockets

    self._ws = await websockets.connect(
        self.url,
        additional_headers={
            "Authorization": f"Bearer {self._api_key}",
        },
    )
    logger.debug("OpenAIRealtimeAgentAdapter: connected to %s", self.url)

    # Configure session: audio formats, voice, instructions, tools.
    # Disable server-side VAD (session.audio.input.turn_detection=None) so
    # we control turn boundaries explicitly via input_audio_buffer.commit +
    # response.create after each send_audio.
    session_config: dict[str, Any] = {
        "type": "realtime",
        "audio": {
            "input": {
                "format": {"type": "audio/pcm", "rate": 24000},
                "turn_detection": None,
                "transcription": {"model": OPENAI_STT_MODEL},
            },
            "output": {
                "format": {"type": "audio/pcm", "rate": 24000},
                "voice": self.voice,
            },
        },
    }
    if self.instructions:
        session_config["instructions"] = self.instructions
    if self.tools:
        session_config["tools"] = self.tools

    await self._ws.send(
        json.dumps({"type": "session.update", "session": session_config})
    )
    logger.debug("OpenAIRealtimeAgentAdapter: session.update sent")
async def disconnect(self) ‑> None

Close the WebSocket if open.

Expand source code
async def disconnect(self) -> None:
    """Close the WebSocket if open."""
    if self._ws is not None:
        try:
            await self._ws.close()
        except Exception:
            # Best-effort: connection may already be half-closed or in an
            # error state when disconnect() is called. We're tearing down
            # regardless — propagating here would just leak the WS reference.
            pass
        finally:
            self._ws = None
        logger.debug("OpenAIRealtimeAgentAdapter: disconnected")
async def interrupt(self) ‑> None

Send response.cancel — the OpenAI Realtime API's first-class interrupt. The model stops generating audio and text immediately. No timing race against VAD: deterministic stop, then the next user turn flows normally through send_audio + recv_audio.

Expand source code
async def interrupt(self) -> None:
    """Send ``response.cancel`` — the OpenAI Realtime API's first-class
    interrupt. The model stops generating audio and text immediately.
    No timing race against VAD: deterministic stop, then the next user
    turn flows normally through ``send_audio`` + ``recv_audio``.
    """
    if self._ws is None:
        raise RuntimeError("OpenAIRealtimeAgentAdapter: not connected")
    await self._ws.send(json.dumps({"type": "response.cancel"}))
    logger.debug("OpenAIRealtimeAgentAdapter: sent response.cancel (interrupt)")
def notify_agent_turn(self) ‑> None

Signal that an agent turn is about to be dispatched.

Called by the executor before each agent step so recv_audio can fire a bare response.create for agent-initiated turns (where no user audio has been committed). This per-turn signal handles both turn 1 (opening) and subsequent agent turns in multi-turn scripts like [agent(), user(), agent()].

Only meaningful when role=AGENT. Safe to call on every agent step — recv_audio consumes and clears the flag.

Expand source code
def notify_agent_turn(self) -> None:
    """Signal that an agent turn is about to be dispatched.

    Called by the executor before each agent step so recv_audio can fire
    a bare response.create for agent-initiated turns (where no user audio
    has been committed). This per-turn signal handles both turn 1 (opening)
    and subsequent agent turns in multi-turn scripts like [agent(), user(),
    agent()].

    Only meaningful when role=AGENT. Safe to call on every agent step —
    recv_audio consumes and clears the flag.
    """
    if self.role == AgentRole.AGENT:
        self._agent_turn_pending = True
async def recv_audio(self, timeout: float) ‑> AudioChunk

Commit any pending audio, request a response, and return the first audio chunk the model produces.

If send_audio was called since the last recv_audio, this method commits the buffer and emits response.create before awaiting the reply. Subsequent recv calls without new send calls just await the next audio delta (for multi-chunk responses).

Loops over incoming events until a response.output_audio.delta event arrives (GA name), then returns decoded PCM16. The legacy response.audio.delta name is accepted defensively with a one-time warning. Transcript events update the instance's last_user_transcript / last_agent_transcript attributes. An error event raises a RuntimeError. All other housekeeping events are ignored and the loop continues.

Raises

asyncio.TimeoutError
if no audio arrives within timeout.
RuntimeError
if the server sends an error event.
Expand source code
async def recv_audio(self, timeout: float) -> AudioChunk:
    """
    Commit any pending audio, request a response, and return the first
    audio chunk the model produces.

    If ``send_audio`` was called since the last ``recv_audio``, this
    method commits the buffer and emits ``response.create`` before
    awaiting the reply. Subsequent recv calls without new send calls
    just await the next audio delta (for multi-chunk responses).

    Loops over incoming events until a ``response.output_audio.delta``
    event arrives (GA name), then returns decoded PCM16. The legacy
    ``response.audio.delta`` name is accepted defensively with a one-time
    warning. Transcript events update the instance's
    ``last_user_transcript`` / ``last_agent_transcript`` attributes.
    An ``error`` event raises a ``RuntimeError``. All other housekeeping
    events are ignored and the loop continues.

    Raises:
        asyncio.TimeoutError: if no audio arrives within ``timeout``.
        RuntimeError: if the server sends an error event.
    """
    if self._ws is None:
        raise RuntimeError("OpenAIRealtimeAgentAdapter: not connected")

    # If send_audio was called since last recv, commit and request response.
    if self._pending_audio_bytes > 0:
        await self._ws.send(json.dumps({"type": "input_audio_buffer.commit"}))
        await self._ws.send(json.dumps({"type": "response.create"}))
        self._pending_audio_bytes = 0
        self._agent_turn_pending = False  # user spoke → per-turn signal consumed

    # Gap 1: agent-speaks-first / multi-turn agent initiation.
    # When the executor signals an agent turn via notify_agent_turn() and
    # no user audio has been committed and no response is already in flight,
    # send a bare content-free response.create so the model speaks.
    # Opening words come from the session instructions — no text is injected.
    # Self-limiting on user-first scripts: if user audio was committed above,
    # _agent_turn_pending was already cleared.
    # Also serves as the clean drain exit: if a response already completed
    # (_response_active is False after response.done cleared it) and
    # _agent_turn_pending is False, a drain re-entry returns an empty chunk.
    elif self._agent_turn_pending and not self._response_active:
        await self._ws.send(json.dumps({"type": "response.create"}))
        self._agent_turn_pending = False
        self._response_active = True

    elif not self._agent_turn_pending and not self._response_active and self._response_ever_active:
        # No pending audio, no agent-turn signal, no response in flight,
        # AND at least one response has already completed this session:
        # this is a drain re-entry after a completed response. Return empty
        # chunk so _drain_agent_response's tail-silence loop exits cleanly.
        # Guard on _response_ever_active so that a fresh recv_audio call
        # (before any response.created fires) does NOT short-circuit —
        # that would break direct recv_audio callers (e.g. unit tests that
        # call recv_audio without going through notify_agent_turn).
        return AudioChunk(data=b"")

    deadline = asyncio.get_running_loop().time() + timeout
    while True:
        remaining = deadline - asyncio.get_running_loop().time()
        if remaining <= 0:
            raise asyncio.TimeoutError(
                "OpenAIRealtimeAgentAdapter: recv_audio timed out"
            )

        raw = await asyncio.wait_for(self._ws.recv(), timeout=remaining)
        try:
            event = json.loads(raw) if isinstance(raw, str) else json.loads(raw.decode())
        except Exception:
            logger.debug(
                "OpenAIRealtimeAgentAdapter: non-JSON message, skipping"
            )
            continue

        etype = event.get("type", "")

        if etype in ("response.output_audio.delta", "response.audio.delta"):
            # Accept both the GA event name and its retired beta alias —
            # live gpt-realtime* models have been observed still emitting
            # the beta names. These legacy arms should be removed once the
            # GA names are confirmed stable at a live endpoint (issue #602).
            self._warn_if_legacy(etype, "response.output_audio.delta")
            b64 = event.get("delta", "")
            pcm = base64.b64decode(b64)
            # Enforce PCM16 invariant: even byte count.
            if len(pcm) % 2 == 1:
                pcm = pcm[:-1]
            return AudioChunk(data=pcm)

        elif etype == "response.created":
            # Response is now in flight — mark it so subsequent recv_audio
            # drain re-entries don't fire a spurious second response.create.
            self._response_active = True
            self._response_ever_active = True

        elif etype in (
            "response.output_audio_transcript.delta",
            "response.audio_transcript.delta",
        ):
            # Accumulate streaming agent transcript.
            self._warn_if_legacy(etype, "response.output_audio_transcript.delta")
            self._agent_transcript_buf += event.get("delta", "")

        elif etype in (
            "response.output_audio_transcript.done",
            "response.audio_transcript.done",
        ):
            # Finalise; the `transcript` field may have the full text.
            self._warn_if_legacy(etype, "response.output_audio_transcript.done")
            transcript = event.get("transcript", "")
            if transcript:
                self.last_agent_transcript = transcript
            elif self._agent_transcript_buf:
                self.last_agent_transcript = self._agent_transcript_buf
            self._agent_transcript_buf = ""

        elif etype in ("response.done", "response.cancelled"):
            # Response finished or was cancelled — mark it so the next
            # drain re-entry returns an empty chunk (clean exit).
            self._response_active = False
            # Issue #646: a tool-only turn (function call, NO audio delta) would
            # otherwise loop here to the deadline and raise — the accumulated tool
            # call is parsed but never returned. When the response is done and at
            # least one tool call has been finalized this turn, return an empty
            # chunk so the drain exits cleanly and call() surfaces the tool_calls
            # message. A genuinely empty turn (done + EMPTY accumulator) must still
            # fall through to the timeout — the non-empty accumulator is the
            # discriminator, NOT response.done alone.
            # (Distinct from the drain-re-entry empty-chunk path above at the
            # _response_ever_active guard, which handles a COMPLETED prior response.)
            if self._completed_tool_calls:
                return AudioChunk(data=b"")

        elif etype == "conversation.item.input_audio_transcription.completed":
            # User-side transcript from Whisper.
            self.last_user_transcript = event.get("transcript", "")

        elif etype == "response.function_call_arguments.delta":
            # Issue #630: streaming arguments fragment for a function call.
            # Accumulate per call_id; the call is finalized on `.done` or
            # the function_call output-item event.
            self._accumulate_tool_call_delta(
                event.get("call_id", ""), event.get("delta", "")
            )

        elif etype == "response.function_call_arguments.done":
            # Issue #630: streaming-args path complete. `name` is typically
            # NOT on this event (it arrives via the output_item), so resolve
            # it from the accumulator. A missing call_id degrades safely.
            self._finalize_tool_call(
                event.get("call_id"),
                name=event.get("name"),
                arguments=event.get("arguments"),
            )

        elif etype in ("response.output_item.added", "response.output_item.done"):
            # Issue #630: the output-item form of a function call carries the
            # authoritative `name` + `call_id` (and, on `.done`, the full
            # `arguments`). For non-function items (e.g. an audio message
            # item) this is benign housekeeping — fall through silently.
            item = event.get("item") or {}
            if isinstance(item, dict) and item.get("type") == "function_call":
                call_id = item.get("call_id")
                name = item.get("name")
                if etype == "response.output_item.added":
                    # Shell arrives before args stream — record the name so a
                    # later delta/done can attach to it. Do not finalize yet.
                    self._note_tool_call_name(call_id or "", name)
                else:
                    # `.done`: authoritative full call. Finalize (idempotent
                    # on call_id — merges with any streaming-args entry, AC6).
                    self._finalize_tool_call(
                        call_id, name=name, arguments=item.get("arguments")
                    )
            else:
                logger.debug(
                    "OpenAIRealtimeAgentAdapter: ignoring non-function "
                    "output_item event %r",
                    etype,
                )

        elif etype == "error":
            error_detail = event.get("error", {})
            msg = error_detail.get("message", str(error_detail))
            raise RuntimeError(
                f"OpenAIRealtimeAgentAdapter: server error — {msg}"
            )

        else:
            # Housekeeping events — session.created, session.updated, etc. —
            # are benign. Log at DEBUG and keep the loop running.
            logger.debug(
                "OpenAIRealtimeAgentAdapter: ignoring event type %r", etype
            )
async def send_audio(self, chunk: AudioChunk) ‑> None

Append a PCM16 audio chunk to the model's input audio buffer.

Only emits input_audio_buffer.append — the commit + response are deferred to the next recv_audio call. The scenario executor may call send_audio many times for a single user turn (TTS streams audio as chunks); committing per-chunk would confuse the server with sub-second turn boundaries. By deferring commit to recv_audio, we get one server turn per user turn.

Expand source code
async def send_audio(self, chunk: AudioChunk) -> None:
    """
    Append a PCM16 audio chunk to the model's input audio buffer.

    Only emits ``input_audio_buffer.append`` — the commit + response are
    deferred to the next ``recv_audio`` call. The scenario executor may
    call ``send_audio`` many times for a single user turn (TTS streams
    audio as chunks); committing per-chunk would confuse the server with
    sub-second turn boundaries. By deferring commit to recv_audio, we
    get one server turn per user turn.
    """
    if self._ws is None:
        raise RuntimeError("OpenAIRealtimeAgentAdapter: not connected")
    b64 = base64.b64encode(chunk.data).decode()
    await self._ws.send(
        json.dumps({"type": "input_audio_buffer.append", "audio": b64})
    )
    self._pending_audio_bytes += len(chunk.data)
async def send_text(self, text: str) ‑> None

Inject scripted text into the realtime session as a user message.

Used when this adapter is the user simulator (role=USER): scripted user("text") steps route through here instead of spawning TTS. The model synthesises the text into spoken audio with natural prosody, which is then delivered via recv_audio.

NOTE: per §7.2, OpenAI Realtime cannot populate assistant audio messages retroactively; the downstream transcript reflects what the model actually emitted, not what was scripted.

Raises

RuntimeError
if called before connect().
Expand source code
async def send_text(self, text: str) -> None:
    """
    Inject scripted text into the realtime session as a user message.

    Used when this adapter is the user simulator (role=USER): scripted
    ``user("text")`` steps route through here instead of spawning TTS.
    The model synthesises the text into spoken audio with natural prosody,
    which is then delivered via ``recv_audio``.

    NOTE: per §7.2, OpenAI Realtime cannot populate assistant audio
    messages retroactively; the downstream transcript reflects what the
    model actually emitted, not what was scripted.

    Raises:
        RuntimeError: if called before ``connect()``.
    """
    if self._ws is None:
        raise RuntimeError("OpenAIRealtimeAgentAdapter: not connected")

    # Create a user conversation item with the scripted text.
    await self._ws.send(
        json.dumps(
            {
                "type": "conversation.item.create",
                "item": {
                    "type": "message",
                    "role": "user",
                    "content": [{"type": "input_text", "text": text}],
                },
            }
        )
    )
    # Prompt the model to generate audio output.
    await self._ws.send(json.dumps({"type": "response.create"}))
    logger.debug(
        "OpenAIRealtimeAgentAdapter: send_text injected %r", text[:60]
    )
class OpenAISTTProvider (model: str = 'gpt-4o-transcribe')

Default STT implementation using OpenAI's gpt-4o-transcribe model.

Chunks audio exceeding 25 minutes per request (API hard limit). Chunks are transcribed independently and concatenated with single spaces.

Expand source code
class OpenAISTTProvider(STTProvider):
    """
    Default STT implementation using OpenAI's ``gpt-4o-transcribe`` model.

    Chunks audio exceeding 25 minutes per request (API hard limit). Chunks are
    transcribed independently and concatenated with single spaces.
    """

    def __init__(self, model: str = OPENAI_STT_MODEL):
        self.model = model

    async def transcribe(self, audio: AudioChunk) -> str:
        if audio.duration_seconds <= OPENAI_TRANSCRIBE_LIMIT_SECONDS:
            return await self._transcribe_single(audio)

        # Chunk: split by sample count into <25min slices.
        samples_per_chunk = OPENAI_TRANSCRIBE_LIMIT_SECONDS * PCM16_SAMPLE_RATE
        bytes_per_chunk = samples_per_chunk * 2  # PCM16 = 2 bytes/sample
        parts: list[str] = []
        for i in range(0, len(audio.data), bytes_per_chunk):
            sub = AudioChunk(data=audio.data[i : i + bytes_per_chunk])
            parts.append(await self._transcribe_single(sub))
        return " ".join(p for p in parts if p)

    async def _transcribe_single(self, audio: AudioChunk) -> str:
        import io

        from openai import AsyncOpenAI

        from .messages import _pcm16_to_wav_bytes

        wav_bytes = _pcm16_to_wav_bytes(audio.data)
        client = AsyncOpenAI()
        buf = io.BytesIO(wav_bytes)
        buf.name = "audio.wav"
        resp = await client.audio.transcriptions.create(
            model=self.model,
            file=buf,
        )
        return getattr(resp, "text", "") or ""

Ancestors

Inherited members

class PipecatAgentAdapter (url: Optional[str] = None, *, signaling_url: Optional[str] = None, transport: "Literal['websocket', 'webrtc']" = 'websocket', audio_format: str = 'mulaw', sample_rate: int = 8000, stream_sid: Optional[str] = None, call_sid: Optional[str] = None)

Test a running Pipecat bot via its exposed WebSocket endpoint.

Transport is selected by the transport argument: - "websocket" (default): Twilio Media Streams protocol over WS. Scenario sends a synthetic start event, then media frames. Pipecat's TwilioFrameSerializer on the bot side handles the wire format. - "webrtc": SmallWebRTC-style negotiation. Raises PendingTransportError; tracked as a follow-up.

Expand source code
class PipecatAgentAdapter(VoiceAgentAdapter):
    """
    Test a running Pipecat bot via its exposed WebSocket endpoint.

    Transport is selected by the ``transport`` argument:
        - ``"websocket"`` (default): Twilio Media Streams protocol over WS.
          Scenario sends a synthetic ``start`` event, then ``media`` frames.
          Pipecat's ``TwilioFrameSerializer`` on the bot side handles the
          wire format.
        - ``"webrtc"``: SmallWebRTC-style negotiation. Raises
          ``PendingTransportError``; tracked as a follow-up.
    """

    capabilities: ClassVar[AdapterCapabilities] = AdapterCapabilities(
        streaming_transcripts=True,
        native_vad=True,
        dtmf=False,
        # Pipecat over the Twilio WS transport speaks the Twilio Media Streams
        # protocol; the ``clear`` event drops all buffered outbound audio on
        # the bot side. That's first-class interrupt — no VAD timing race.
        interruption=True,
        input_formats=["pcm16/24000", "mulaw/8000", "opus"],
        output_formats=["pcm16/24000", "mulaw/8000", "opus"],
    )

    def __init__(
        self,
        url: Optional[str] = None,
        *,
        signaling_url: Optional[str] = None,
        transport: Literal["websocket", "webrtc"] = "websocket",
        audio_format: str = "mulaw",
        sample_rate: int = 8000,
        stream_sid: Optional[str] = None,
        call_sid: Optional[str] = None,
    ) -> None:
        super().__init__()
        if transport == "websocket" and url is None:
            raise ValueError("PipecatAgentAdapter(transport='websocket') requires url=")
        if transport == "webrtc" and signaling_url is None:
            raise ValueError("PipecatAgentAdapter(transport='webrtc') requires signaling_url=")

        self.url = url
        self.signaling_url = signaling_url
        self.transport = transport
        self.audio_format = audio_format
        self.sample_rate = sample_rate
        # Synthetic SIDs pipecat's TwilioFrameSerializer needs in the `start`
        # event. If caller doesn't supply them, we fabricate UUIDs. Pipecat
        # uses them for logging and the auto-hangup REST call; both are no-ops
        # when we're not actually going through Twilio.
        self.stream_sid = stream_sid
        self.call_sid = call_sid

        self._ws: Any = None
        self._recv_task: Optional[asyncio.Task] = None
        self._inbound_queue: Optional[asyncio.Queue[AudioChunk]] = None
        # Serialises concurrent send_audio() calls — without it two paced
        # senders would interleave 20-ms mulaw frames on the wire and the
        # bot would receive corrupted audio. Used for the interruption case
        # where the executor calls send_audio() while a previous turn's
        # send is still in flight.
        self._send_lock: Optional[asyncio.Lock] = None

    @property
    def transport_format(self) -> str:
        return f"{self.audio_format}/{self.sample_rate}"

    # ------------------------------------------------------------------ lifecycle

    async def connect(self) -> None:
        if self.transport == "webrtc":
            from ._stub import PendingTransportError

            raise PendingTransportError(
                "PipecatAgentAdapter(transport='webrtc')"
            )

        # Lazy import so `import scenario` doesn't require websockets at the
        # top of the module-load path (it's already a hard dep, but being
        # consistent with the Twilio adapter style).
        import websockets

        assert self.url is not None  # validated in __init__
        self._ws = await websockets.connect(
            self.url, ping_interval=None, ping_timeout=None
        )
        self._inbound_queue = asyncio.Queue()
        self._send_lock = asyncio.Lock()

        # Send the synthetic `start` event that pipecat's TwilioFrameSerializer
        # requires to learn the stream/call SIDs and start deserializing
        # media frames.
        if self.stream_sid is None:
            self.stream_sid = f"MZ{uuid.uuid4().hex}"
        if self.call_sid is None:
            self.call_sid = f"CA{uuid.uuid4().hex}"

        await self._ws.send(json.dumps({"event": "connected", "protocol": "Call", "version": "1.0.0"}))
        await self._ws.send(
            json.dumps(
                {
                    "event": "start",
                    "streamSid": self.stream_sid,
                    "start": {
                        "streamSid": self.stream_sid,
                        "callSid": self.call_sid,
                        "mediaFormat": {
                            "encoding": "audio/x-mulaw",
                            "sampleRate": 8000,
                            "channels": 1,
                        },
                    },
                }
            )
        )

        self._recv_task = asyncio.create_task(self._recv_loop())
        logger.debug("PipecatAgentAdapter: connected to %s (stream=%s)", self.url, self.stream_sid)

    async def disconnect(self) -> None:
        ws = self._ws
        if ws is None:
            return

        # Send `stop` event so the bot can clean up its pipeline gracefully.
        try:
            if self.stream_sid:
                await ws.send(json.dumps({"event": "stop", "streamSid": self.stream_sid}))
        except Exception:
            logger.debug("PipecatAgentAdapter: failed to send stop frame", exc_info=True)

        if self._recv_task is not None:
            self._recv_task.cancel()
            try:
                await self._recv_task
            except asyncio.CancelledError:
                # Expected: we just cancelled it.
                pass
            except Exception:
                # Unexpected teardown error — already logging enough context
                # elsewhere; disconnect() is best-effort.
                logger.debug("PipecatAgentAdapter: recv_task raised during cancel", exc_info=True)
            self._recv_task = None

        try:
            await ws.close()
        except Exception:
            # WS may already be closed by the peer; disconnect() is best-effort.
            logger.debug("PipecatAgentAdapter: ws.close() raised", exc_info=True)

        self._ws = None
        self._inbound_queue = None
        self.stream_sid = None
        self.call_sid = None

    # ------------------------------------------------------------------ I/O

    async def send_audio(self, chunk: AudioChunk) -> None:
        # Pace at real-time (TWILIO_FRAME_MS/1000s per 20-ms frame). Matches what
        # a real caller produces over a PSTN line — the SUT sees normal speech
        # rhythm, not a synthetic dump.
        #
        # After the last frame we send a Twilio ``mark`` named "utterance_end".
        # Real-time pacing means TTS-induced inter-phrase pauses survive on the
        # wire, and a stateless inactivity-timer on the receiver can't
        # distinguish "speaker paused after a comma" from "speaker finished
        # their turn." The mark is an explicit, non-ambiguous end-of-turn
        # signal: cooperating SUTs flush on the mark; legacy SUTs fall back to
        # VAD timing.
        self._assert_connected()
        assert self._ws is not None and self.stream_sid is not None and self._send_lock is not None
        mulaw = pcm16_24k_to_mulaw8k(chunk.data)
        frame_secs = TWILIO_FRAME_MS / 1000
        async with self._send_lock:
            for frame in iter_mulaw_frames(mulaw):
                if not frame:
                    continue
                await self._ws.send(build_media_frame(self.stream_sid, frame))
                await asyncio.sleep(frame_secs)
            await self._ws.send(build_mark_frame(self.stream_sid, "utterance_end"))

    async def recv_audio(self, timeout: float) -> AudioChunk:
        self._assert_connected()
        assert self._inbound_queue is not None
        return await asyncio.wait_for(self._inbound_queue.get(), timeout=timeout)

    async def interrupt(self) -> None:
        """Send a Twilio ``clear`` frame — the bot drops all buffered outbound
        audio immediately. Cooperating Pipecat bots (and any code wired to
        the Media Streams protocol) treat ``clear`` as "stop talking now."
        Use this in preference to timing-based barge-in when the SUT
        supports it: it's deterministic, doesn't depend on VAD detection
        windows, and matches the same protocol used in production.
        """
        self._assert_connected()
        assert self._ws is not None and self.stream_sid is not None
        await self._ws.send(build_clear_frame(self.stream_sid))
        logger.debug("PipecatAgentAdapter: sent clear frame (interrupt)")

    # ------------------------------------------------------------------ background

    async def _recv_loop(self) -> None:
        """Read frames from pipecat, decode µ-law → PCM16 24k, enqueue."""
        assert self._ws is not None and self._inbound_queue is not None
        buffered_mulaw = bytearray()
        BATCH_MS = 100

        try:
            async for raw in self._ws:
                if isinstance(raw, bytes):
                    # pipecat sometimes emits binary frames for audio; treat
                    # as raw µ-law payload if we see one.
                    buffered_mulaw.extend(raw)
                    if len(buffered_mulaw) >= (BATCH_MS * 8):
                        pcm = mulaw8k_to_pcm16_24k(bytes(buffered_mulaw))
                        buffered_mulaw.clear()
                        await self._inbound_queue.put(AudioChunk(data=pcm))
                    continue

                frame = parse_media_stream_frame(raw)
                if frame is None:
                    continue
                if frame.event == "media" and frame.payload_mulaw:
                    buffered_mulaw.extend(frame.payload_mulaw)
                    if len(buffered_mulaw) >= (BATCH_MS * 8):
                        pcm = mulaw8k_to_pcm16_24k(bytes(buffered_mulaw))
                        buffered_mulaw.clear()
                        await self._inbound_queue.put(AudioChunk(data=pcm))
                elif frame.event == "stop":
                    if buffered_mulaw:
                        pcm = mulaw8k_to_pcm16_24k(bytes(buffered_mulaw))
                        buffered_mulaw.clear()
                        await self._inbound_queue.put(AudioChunk(data=pcm))
                    return
        except asyncio.CancelledError:
            raise
        except Exception:
            logger.warning("PipecatAgentAdapter: recv loop exited with error", exc_info=True)

    # ------------------------------------------------------------------ assertions

    def _assert_connected(self) -> None:
        if self._ws is None:
            raise RuntimeError(
                "PipecatAgentAdapter: not connected. Did you forget to call connect()?"
            )

Ancestors

Class variables

var capabilities : ClassVar[AdapterCapabilities]

Instance variables

var transport_format : str
Expand source code
@property
def transport_format(self) -> str:
    return f"{self.audio_format}/{self.sample_rate}"

Methods

async def interrupt(self) ‑> None

Send a Twilio clear frame — the bot drops all buffered outbound audio immediately. Cooperating Pipecat bots (and any code wired to the Media Streams protocol) treat clear as "stop talking now." Use this in preference to timing-based barge-in when the SUT supports it: it's deterministic, doesn't depend on VAD detection windows, and matches the same protocol used in production.

Expand source code
async def interrupt(self) -> None:
    """Send a Twilio ``clear`` frame — the bot drops all buffered outbound
    audio immediately. Cooperating Pipecat bots (and any code wired to
    the Media Streams protocol) treat ``clear`` as "stop talking now."
    Use this in preference to timing-based barge-in when the SUT
    supports it: it's deterministic, doesn't depend on VAD detection
    windows, and matches the same protocol used in production.
    """
    self._assert_connected()
    assert self._ws is not None and self.stream_sid is not None
    await self._ws.send(build_clear_frame(self.stream_sid))
    logger.debug("PipecatAgentAdapter: sent clear frame (interrupt)")

Inherited members

class STTProvider

Abstract base for speech-to-text providers.

Expand source code
class STTProvider(ABC):
    """Abstract base for speech-to-text providers."""

    @abstractmethod
    async def transcribe(self, audio: AudioChunk) -> str:
        """Return a text transcript of the audio chunk."""

Ancestors

  • abc.ABC

Subclasses

Methods

async def transcribe(self, audio: AudioChunk) ‑> str

Return a text transcript of the audio chunk.

Expand source code
@abstractmethod
async def transcribe(self, audio: AudioChunk) -> str:
    """Return a text transcript of the audio chunk."""
class TwilioAgentAdapter (*, account_sid: str, auth_token: str, phone_number: str, public_base_url: Optional[str] = None, allowed_callers: Optional[list[str]] = None, on_dtmf: Optional[Callable[[str], None]] = None, http_port: int = 8765, role: AgentRole = AgentRole.AGENT, validate_signature: bool = True)

Bidirectional Twilio Media Streams adapter.

Same class, same state, either direction:

adapter = TwilioAgentAdapter(
    account_sid=..., auth_token=...,
    phone_number="+14155551234",
)
async with adapter:                   # connect() / disconnect()
    await adapter.place_call(to="+14155557777")  # OR wait_for_call()
    # ... scenario.run(...) feeds send_audio / recv_audio ...

The adapter is the only adapter with dtmf=True. DTMF events received from the callee surface via the on_dtmf callback set at construction time. To send DTMF, use send_dtmf().

interrupt(after_words=N) raises UnsupportedCapabilityError on this adapter — Media Streams delivers raw audio without incremental transcripts. Use interrupt(after=seconds) instead.

Expand source code
class TwilioAgentAdapter(VoiceAgentAdapter):
    """
    Bidirectional Twilio Media Streams adapter.

    Same class, same state, either direction:

        adapter = TwilioAgentAdapter(
            account_sid=..., auth_token=...,
            phone_number="+14155551234",
        )
        async with adapter:                   # connect() / disconnect()
            await adapter.place_call(to="+14155557777")  # OR wait_for_call()
            # ... scenario.run(...) feeds send_audio / recv_audio ...

    The adapter is the *only* adapter with ``dtmf=True``. DTMF events
    received from the callee surface via the ``on_dtmf`` callback set at
    construction time. To send DTMF, use ``send_dtmf()``.

    ``interrupt(after_words=N)`` raises ``UnsupportedCapabilityError`` on this
    adapter — Media Streams delivers raw audio without incremental
    transcripts. Use ``interrupt(after=seconds)`` instead.
    """

    capabilities: ClassVar[AdapterCapabilities] = AdapterCapabilities(
        streaming_transcripts=False,
        native_vad=False,
        dtmf=True,
        # Twilio Media Streams ``clear`` event drops all buffered outbound
        # audio. Used by ``adapter.interrupt()`` already wired below.
        interruption=True,
        input_formats=["mulaw/8000"],
        output_formats=["mulaw/8000"],
    )

    # ------------------------------------------------------------------ init

    def __init__(
        self,
        *,
        account_sid: str,
        auth_token: str,
        phone_number: str,
        public_base_url: Optional[str] = None,
        allowed_callers: Optional[list[str]] = None,
        on_dtmf: Optional[Callable[[str], None]] = None,
        http_port: int = 8765,
        role: AgentRole = AgentRole.AGENT,
        validate_signature: bool = True,
    ) -> None:
        super().__init__()
        validate_e164(phone_number)

        self.account_sid = account_sid
        self.auth_token = auth_token
        self.phone_number = phone_number
        self.public_base_url = public_base_url
        self.allowed_callers = set(allowed_callers) if allowed_callers else None
        self.on_dtmf = on_dtmf
        self.http_port = http_port
        self.role = role  # type: ignore[misc]
        # When True (default), the inbound /twilio/voice route requires a
        # valid X-Twilio-Signature header before accepting the webhook
        # body. The cloudflared tunnel URL is ephemeral and not
        # guessable, but anyone who learns it could otherwise POST fake
        # Twilio webhook events into the harness. Tests that don't have
        # real Twilio credentials disable this with
        # ``validate_signature=False``; production callers should leave
        # it on.
        self.validate_signature = validate_signature
        if not validate_signature:
            logger.warning(
                "TwilioAgentAdapter: validate_signature=False — inbound "
                "webhooks accept any payload without signature checks. "
                "Use only in tests; do not deploy to production."
            )

        # Populated during connect(); None when disconnected.
        self._rest: Optional[TwilioRESTHelper] = None
        self._phone_number_sid: Optional[str] = None
        self._prior_voice_url: Optional[str] = None
        # Set by place_call() when it rewrites the callee's voice_url so
        # B-leg's webhook lands on our harness. Restored in disconnect.
        self._callee_phone_number_sid: Optional[str] = None
        self._prior_callee_voice_url: Optional[str] = None
        # Set by the first of wait_for_call()/place_call(); subsequent calls to
        # the other method raise. "idle" after connect() before either fires.
        self._mode: TwilioAdapterMode = "idle"

        # Server / media stream state.
        self._server_task: Optional[asyncio.Task] = None
        self._server_shutdown: Optional[asyncio.Event] = None
        self._call_sid: Optional[str] = None
        self._stream_sid: Optional[str] = None
        self._stream_connected: Optional[asyncio.Event] = None
        self._stream_ws: Any = None  # starlette WebSocket
        self._inbound_queue: Optional[asyncio.Queue[AudioChunk]] = None


    # ------------------------------------------------------------------ repr

    def __repr__(self) -> str:  # redact credentials
        return (
            f"TwilioAgentAdapter("
            f"phone_number={self.phone_number!r}, "
            f"account_sid='***', auth_token='***', "
            f"public_base_url={self.public_base_url!r})"
        )

    # ------------------------------------------------------------------ lifecycle

    async def connect(self) -> None:
        """Resolve number SID and start the FastAPI webhook + WS server.

        Does NOT modify the Twilio account's ``voice_url``. That side-effect
        only happens when ``wait_for_call()`` is invoked — callers (who will
        use ``place_call()``) never overwrite their number's inbound webhook,
        which makes caller-mode adapters safe to run against a shared pool of
        Twilio numbers without clobbering anyone's prod webhook.

        Idempotent: calling connect() on an already-connected adapter is a
        no-op. This lets the scenario executor's auto-connect step
        (``_voice_connect_all``) coexist with explicit harness-driven
        connects (``TwilioHarness`` ``__aenter__``) that already brought
        the adapter up before scenario.run() was called.
        """
        if self._rest is not None:
            return

        if self.public_base_url is None:
            raise RuntimeError(
                "TwilioAgentAdapter: public_base_url is required. Wrap the "
                "adapter in scenario.voice.testing.TwilioHarness, or supply "
                "a stable public HTTPS URL that routes to this machine."
            )

        self._rest = TwilioRESTHelper(self.account_sid, self.auth_token)
        self._phone_number_sid = self._rest.resolve_phone_number_sid(self.phone_number)

        self._stream_connected = asyncio.Event()
        self._inbound_queue = asyncio.Queue()
        self._server_shutdown = asyncio.Event()
        self._mode = "idle"

        # Webhook server is its own unit — see _twilio_server.py. The
        # adapter only orchestrates lifecycle; the routes, signature
        # validation, and WS framing live in TwilioWebhookServer.
        from ._twilio_server import TwilioWebhookServer
        self._webhook_server: Optional[TwilioWebhookServer] = TwilioWebhookServer(self)
        self._server_task = asyncio.create_task(self._run_server())
        # Give uvicorn a beat to bind the port before Twilio hits it.
        await asyncio.sleep(0.2)

    async def disconnect(self) -> None:
        """Restore prior voice_url (answer mode only), tear down server.

        Best-effort on errors. In caller mode we never touched the Twilio
        number's voice_url, so there's nothing to restore.
        """
        if self._rest is None:
            return

        # 1. Restore webhook first so Twilio doesn't keep hitting a dead URL.
        if self._mode == "answer" and self._phone_number_sid is not None:
            with suppress(Exception):
                prior = self._prior_voice_url or ""
                self._rest.write_voice_url(self._phone_number_sid, prior)
                logger.debug(
                    "TwilioAgentAdapter: restored voice_url=%r on %s",
                    prior,
                    self._phone_number_sid,
                )
        # place_call() rewrites the CALLEE's voice_url to attach Media
        # Streams to B-leg. Restore that too.
        if self._mode == "call" and self._callee_phone_number_sid is not None:
            with suppress(Exception):
                prior_b = self._prior_callee_voice_url or ""
                self._rest.write_voice_url(self._callee_phone_number_sid, prior_b)
                logger.debug(
                    "TwilioAgentAdapter: restored callee voice_url=%r on %s",
                    prior_b,
                    self._callee_phone_number_sid,
                )

        # 2. Signal server to shut down, then wait for the task.
        if self._server_shutdown is not None:
            self._server_shutdown.set()
        if self._server_task is not None:
            with suppress(Exception):
                await asyncio.wait_for(self._server_task, timeout=3.0)

        # 3. Reset state.
        self._rest = None
        self._phone_number_sid = None
        self._prior_voice_url = None
        self._callee_phone_number_sid = None
        self._prior_callee_voice_url = None
        self._mode = "idle"
        self._server_task = None
        self._server_shutdown = None
        self._call_sid = None
        self._stream_sid = None
        self._stream_connected = None
        self._stream_ws = None
        self._inbound_queue = None

    # ------------------------------------------------------------------ direction

    async def place_call(
        self,
        to: str,
        *,
        timeout: float = 120.0,
        attach_stream_to_self: bool = True,
    ) -> None:
        """
        Originate an outbound call from this adapter's Twilio number to ``to``.

        Twilio's REST ``Calls.create`` runs TwiML on TWO legs of the
        resulting call:

        - **A-leg** (the originator, ``from_=self.phone_number``): runs the
          inline TwiML passed via ``twiml=`` to ``Calls.create``. We use
          ``<Pause length=120>`` so the originator just holds the bridge
          open while the demo runs.
        - **B-leg** (the callee, ``to=``): when Twilio dials B and B picks
          up, B's number's ``voice_url`` fires — that's where the bridge's
          Media Streams attach. This is identical to the inbound demo's
          flow: B's voice_url returns ``<Connect><Stream>``, the WS opens,
          audio flows.

        So ``place_call`` only makes sense when ``to`` is another Twilio
        number on this account whose ``voice_url`` is set to OUR harness
        webhook. To make that wiring automatic, ``place_call`` temporarily
        rewrites B's ``voice_url`` for the duration of the call and
        restores it on ``disconnect``. The harness on this adapter's own
        number does NOT need to be answer-mode — the Stream attaches to
        B's leg, NOT this adapter's leg, but B's webhook is hosted on this
        adapter's local server, so the WS still lands here.

        The bidirectional audio model is unchanged: ``send_audio`` writes
        frames over the WS (B hears them and bridges to A), ``recv_audio``
        reads inbound frames off the WS (whatever the bridge mixes from
        both legs).

        Limitation: ``to`` MUST be a phone number on this same Twilio
        account. Calling an external PSTN endpoint (a real cell phone)
        requires a different topology (``<Start><Stream>`` + ``<Dial>``)
        which we don't implement here because the inline TwiML route on
        the A-leg can't capture B's audio when B is external.

        Default timeout 120s covers cloudflared cold-start latency.

        Raises:
            RuntimeError: If called after ``wait_for_call()`` (modes are
                exclusive per adapter instance), or if ``to`` is not a
                Twilio number on this account.
            ValueError: If ``to`` is not in E.164 format.
            asyncio.TimeoutError: If the media stream doesn't open within
                ``timeout`` seconds.
        """
        self._assert_connected()
        self._enter_mode("call")
        validate_e164(to)

        assert self.public_base_url is not None
        assert self._rest is not None
        assert self._stream_connected is not None

        if attach_stream_to_self:
            # Resolve B-leg's number SID and snapshot+rewrite its voice_url so
            # B's leg attaches its Media Stream to our harness webhook. We own
            # this number (same Twilio account); disconnect() will restore.
            self._callee_phone_number_sid = self._rest.resolve_phone_number_sid(to)
            self._prior_callee_voice_url = self._rest.read_voice_url(
                self._callee_phone_number_sid
            )
            webhook_url = self.public_base_url.rstrip("/") + "/twilio/voice"
            self._rest.write_voice_url(self._callee_phone_number_sid, webhook_url)
            logger.info(
                "TwilioAgentAdapter: rewrote callee %s voice_url to %s",
                _redact_e164(to),
                webhook_url,
            )

        # A-leg TwiML: play a short deterministic <Say> line, then hold
        # the bridge open. Twilio runs this on the originator side while
        # B's webhook attaches the Media Stream.
        #
        # The <Say> gives the recording a known-good utterance to
        # transcribe. A bare <Pause> alone produces 120s of line silence
        # that Whisper has been observed to hallucinate as non-English
        # text (issue #465 in this PR). The Say is a one-time anchor at
        # call setup; the Media Stream carries the real bidirectional
        # conversation that follows.
        inline_a_leg_twiml = (
            '<?xml version="1.0" encoding="UTF-8"?>'
            "<Response>"
            f'<Say voice="Polly.Joanna">{PLACE_CALL_A_LEG_SAY_TEXT}</Say>'
            '<Pause length="120"/>'
            "</Response>"
        )
        self._call_sid = self._rest.place_call(
            to=to, from_=self.phone_number, twiml=inline_a_leg_twiml
        )
        logger.info(
            "TwilioAgentAdapter: placed call %s from %s to %s",
            self._call_sid,
            _redact_e164(self.phone_number),
            _redact_e164(to),
        )

        if attach_stream_to_self:
            # Wait for OUR webhook to fire — only meaningful when we rewrote
            # the callee's voice_url to point at us. In originator-only mode
            # (attach_stream_to_self=False), there's no stream coming to us;
            # the callee has its own harness which owns the stream.
            await asyncio.wait_for(self._stream_connected.wait(), timeout=timeout)

    async def wait_for_call(self, timeout: float = 120.0) -> None:
        """
        Block until someone dials in and the media stream is live.

        In **default mode** (no conference_room): the number's ``voice_url``
        is overwritten to point at our webhook so inbound calls reach us.
        Caller (``place_call``) elsewhere will dial this number.

        In **conference mode**: there's no inbound call to wait for — the
        two-Twilio-number demo can't naturally have one side receive an
        inbound call when both legs need conference TwiML. Instead, we
        ORIGINATE an outbound call FROM this adapter's number TO itself
        with inline TwiML that opens the capture stream and dials into
        the shared conference room. This is the symmetric counterpart to
        ``place_call`` in conference mode — both adapters end up as
        participants in the same room, exchanging audio via the bridge.

        Default timeout 120s covers cloudflared cold-start, conference-room
        formation, and Twilio's webhook ramp.

        Raises:
            RuntimeError: If called after ``place_call()``.
            asyncio.TimeoutError: If nobody dials in within ``timeout``.
        """
        self._assert_connected()
        self._enter_mode("answer")

        assert self.public_base_url is not None
        assert self._rest is not None
        assert self._phone_number_sid is not None
        assert self._stream_connected is not None

        # Snapshot the prior webhook so we can restore it on disconnect, then
        # point the number at our server. Only answer mode does this.
        self._prior_voice_url = self._rest.read_voice_url(self._phone_number_sid)
        webhook_url = self.public_base_url.rstrip("/") + "/twilio/voice"
        self._rest.write_voice_url(self._phone_number_sid, webhook_url)
        logger.info("TwilioAgentAdapter: webhook set to %s", webhook_url)

        await asyncio.wait_for(self._stream_connected.wait(), timeout=timeout)

    def _enter_mode(self, mode: TwilioAdapterMode) -> None:
        """Transition idle → mode, or raise if already in a different mode.

        Modes are exclusive per connected session: an adapter can place a call
        or answer a call, not both. Disconnect + reconnect to reuse the
        instance in the other direction.
        """
        if self._mode == mode:
            return  # idempotent re-entry (e.g. retrying place_call after timeout)
        if self._mode != "idle":
            raise RuntimeError(
                f"TwilioAgentAdapter: already in {self._mode!r} mode; cannot "
                f"switch to {mode!r}. Disconnect and reconnect to reuse this "
                f"adapter in the other direction."
            )
        self._mode = mode

    # ------------------------------------------------------------------ I/O

    async def send_audio(self, chunk: AudioChunk) -> None:
        # Pace at real-time (one frame per TWILIO_FRAME_MS). Without pacing the
        # whole utterance arrives in milliseconds, which trips bots' VAD into
        # a clipped-utterance reading.
        self._assert_stream_live()

        ws = self._stream_ws
        stream_sid = self._stream_sid
        assert ws is not None and stream_sid is not None

        mulaw = pcm16_24k_to_mulaw8k(chunk.data)
        frame_secs = TWILIO_FRAME_MS / 1000
        for frame in iter_mulaw_frames(mulaw):
            if not frame:
                continue
            await ws.send_text(build_media_frame(stream_sid, frame))
            await asyncio.sleep(frame_secs)

    async def recv_audio(self, timeout: float) -> AudioChunk:
        self._assert_stream_live()
        assert self._inbound_queue is not None
        return await asyncio.wait_for(self._inbound_queue.get(), timeout=timeout)

    async def send_dtmf(self, tones: str) -> None:
        """Send DTMF digits on the live call (uses Twilio REST ``<Play digits>``)."""
        if self._rest is None or self._call_sid is None:
            raise RuntimeError("TwilioAgentAdapter: no active call; send_dtmf requires an in-progress call")
        # Run blocking REST call off-thread so we don't stall the event loop.
        await asyncio.to_thread(self._rest.send_dtmf_on_call, self._call_sid, tones)

    async def interrupt(self) -> None:
        """Drop any buffered outbound audio on Twilio's side (``clear`` event)."""
        self._assert_stream_live()
        ws = self._stream_ws
        stream_sid = self._stream_sid
        assert ws is not None and stream_sid is not None
        await ws.send_text(build_clear_frame(stream_sid))

    # ------------------------------------------------------------------ server

    async def _run_server(self) -> None:
        """Thin lifecycle wrapper; the real work lives in TwilioWebhookServer."""
        assert self._webhook_server is not None
        await self._webhook_server.run()

    def _build_app(self) -> Any:
        """Test seam: build the FastAPI app for in-process exercise.

        Production code never calls this — the server's ``run()`` builds
        the app itself when uvicorn starts. Existing unit tests use
        ``TestClient(_build_app())`` to exercise the routes without
        binding a port; the delegation keeps that test surface stable.
        """
        assert self._webhook_server is not None
        return self._webhook_server.build_app()

    async def _media_stream_loop(self, ws: Any) -> None:
        """Test seam: kick off the Media Streams WS loop directly.

        The two-adapter-bridge test in
        ``tests/voice/test_twilio_two_adapter_bridge.py`` uses this to
        drive a loopback WS without going through the FastAPI route
        wrapper. Production code reaches the loop via the ``/twilio/stream``
        WebSocket handler defined in ``_twilio_server.build_app``.
        """
        assert self._webhook_server is not None
        await self._webhook_server.media_stream_loop(ws)

    # ------------------------------------------------------------------ assertions

    def _assert_connected(self) -> None:
        if self._rest is None:
            raise RuntimeError("TwilioAgentAdapter: not connected; call connect() or use `async with`.")

    def _assert_stream_live(self) -> None:
        self._assert_connected()
        if self._stream_ws is None or self._stream_sid is None:
            raise RuntimeError(
                "TwilioAgentAdapter: no live media stream. Call place_call() or "
                "wait_for_call() first."
            )

Ancestors

Class variables

var capabilities : ClassVar[AdapterCapabilities]

Methods

async def connect(self) ‑> None

Resolve number SID and start the FastAPI webhook + WS server.

Does NOT modify the Twilio account's voice_url. That side-effect only happens when wait_for_call() is invoked — callers (who will use place_call()) never overwrite their number's inbound webhook, which makes caller-mode adapters safe to run against a shared pool of Twilio numbers without clobbering anyone's prod webhook.

Idempotent: calling connect() on an already-connected adapter is a no-op. This lets the scenario executor's auto-connect step (_voice_connect_all) coexist with explicit harness-driven connects (TwilioHarness __aenter__) that already brought the adapter up before scenario.run() was called.

Expand source code
async def connect(self) -> None:
    """Resolve number SID and start the FastAPI webhook + WS server.

    Does NOT modify the Twilio account's ``voice_url``. That side-effect
    only happens when ``wait_for_call()`` is invoked — callers (who will
    use ``place_call()``) never overwrite their number's inbound webhook,
    which makes caller-mode adapters safe to run against a shared pool of
    Twilio numbers without clobbering anyone's prod webhook.

    Idempotent: calling connect() on an already-connected adapter is a
    no-op. This lets the scenario executor's auto-connect step
    (``_voice_connect_all``) coexist with explicit harness-driven
    connects (``TwilioHarness`` ``__aenter__``) that already brought
    the adapter up before scenario.run() was called.
    """
    if self._rest is not None:
        return

    if self.public_base_url is None:
        raise RuntimeError(
            "TwilioAgentAdapter: public_base_url is required. Wrap the "
            "adapter in scenario.voice.testing.TwilioHarness, or supply "
            "a stable public HTTPS URL that routes to this machine."
        )

    self._rest = TwilioRESTHelper(self.account_sid, self.auth_token)
    self._phone_number_sid = self._rest.resolve_phone_number_sid(self.phone_number)

    self._stream_connected = asyncio.Event()
    self._inbound_queue = asyncio.Queue()
    self._server_shutdown = asyncio.Event()
    self._mode = "idle"

    # Webhook server is its own unit — see _twilio_server.py. The
    # adapter only orchestrates lifecycle; the routes, signature
    # validation, and WS framing live in TwilioWebhookServer.
    from ._twilio_server import TwilioWebhookServer
    self._webhook_server: Optional[TwilioWebhookServer] = TwilioWebhookServer(self)
    self._server_task = asyncio.create_task(self._run_server())
    # Give uvicorn a beat to bind the port before Twilio hits it.
    await asyncio.sleep(0.2)
async def disconnect(self) ‑> None

Restore prior voice_url (answer mode only), tear down server.

Best-effort on errors. In caller mode we never touched the Twilio number's voice_url, so there's nothing to restore.

Expand source code
async def disconnect(self) -> None:
    """Restore prior voice_url (answer mode only), tear down server.

    Best-effort on errors. In caller mode we never touched the Twilio
    number's voice_url, so there's nothing to restore.
    """
    if self._rest is None:
        return

    # 1. Restore webhook first so Twilio doesn't keep hitting a dead URL.
    if self._mode == "answer" and self._phone_number_sid is not None:
        with suppress(Exception):
            prior = self._prior_voice_url or ""
            self._rest.write_voice_url(self._phone_number_sid, prior)
            logger.debug(
                "TwilioAgentAdapter: restored voice_url=%r on %s",
                prior,
                self._phone_number_sid,
            )
    # place_call() rewrites the CALLEE's voice_url to attach Media
    # Streams to B-leg. Restore that too.
    if self._mode == "call" and self._callee_phone_number_sid is not None:
        with suppress(Exception):
            prior_b = self._prior_callee_voice_url or ""
            self._rest.write_voice_url(self._callee_phone_number_sid, prior_b)
            logger.debug(
                "TwilioAgentAdapter: restored callee voice_url=%r on %s",
                prior_b,
                self._callee_phone_number_sid,
            )

    # 2. Signal server to shut down, then wait for the task.
    if self._server_shutdown is not None:
        self._server_shutdown.set()
    if self._server_task is not None:
        with suppress(Exception):
            await asyncio.wait_for(self._server_task, timeout=3.0)

    # 3. Reset state.
    self._rest = None
    self._phone_number_sid = None
    self._prior_voice_url = None
    self._callee_phone_number_sid = None
    self._prior_callee_voice_url = None
    self._mode = "idle"
    self._server_task = None
    self._server_shutdown = None
    self._call_sid = None
    self._stream_sid = None
    self._stream_connected = None
    self._stream_ws = None
    self._inbound_queue = None
async def interrupt(self) ‑> None

Drop any buffered outbound audio on Twilio's side (clear event).

Expand source code
async def interrupt(self) -> None:
    """Drop any buffered outbound audio on Twilio's side (``clear`` event)."""
    self._assert_stream_live()
    ws = self._stream_ws
    stream_sid = self._stream_sid
    assert ws is not None and stream_sid is not None
    await ws.send_text(build_clear_frame(stream_sid))
async def place_call(self, to: str, *, timeout: float = 120.0, attach_stream_to_self: bool = True) ‑> None

Originate an outbound call from this adapter's Twilio number to to.

Twilio's REST Calls.create runs TwiML on TWO legs of the resulting call:

  • A-leg (the originator, from_=self.phone_number): runs the inline TwiML passed via twiml= to Calls.create. We use <Pause length=120> so the originator just holds the bridge open while the demo runs.
  • B-leg (the callee, to=): when Twilio dials B and B picks up, B's number's voice_url fires — that's where the bridge's Media Streams attach. This is identical to the inbound demo's flow: B's voice_url returns <Connect><Stream>, the WS opens, audio flows.

So place_call only makes sense when to is another Twilio number on this account whose voice_url is set to OUR harness webhook. To make that wiring automatic, place_call temporarily rewrites B's voice_url for the duration of the call and restores it on disconnect. The harness on this adapter's own number does NOT need to be answer-mode — the Stream attaches to B's leg, NOT this adapter's leg, but B's webhook is hosted on this adapter's local server, so the WS still lands here.

The bidirectional audio model is unchanged: send_audio writes frames over the WS (B hears them and bridges to A), recv_audio reads inbound frames off the WS (whatever the bridge mixes from both legs).

Limitation: to MUST be a phone number on this same Twilio account. Calling an external PSTN endpoint (a real cell phone) requires a different topology (<Start><Stream> + <Dial>) which we don't implement here because the inline TwiML route on the A-leg can't capture B's audio when B is external.

Default timeout 120s covers cloudflared cold-start latency.

Raises

RuntimeError
If called after wait_for_call() (modes are exclusive per adapter instance), or if to is not a Twilio number on this account.
ValueError
If to is not in E.164 format.
asyncio.TimeoutError
If the media stream doesn't open within timeout seconds.
Expand source code
async def place_call(
    self,
    to: str,
    *,
    timeout: float = 120.0,
    attach_stream_to_self: bool = True,
) -> None:
    """
    Originate an outbound call from this adapter's Twilio number to ``to``.

    Twilio's REST ``Calls.create`` runs TwiML on TWO legs of the
    resulting call:

    - **A-leg** (the originator, ``from_=self.phone_number``): runs the
      inline TwiML passed via ``twiml=`` to ``Calls.create``. We use
      ``<Pause length=120>`` so the originator just holds the bridge
      open while the demo runs.
    - **B-leg** (the callee, ``to=``): when Twilio dials B and B picks
      up, B's number's ``voice_url`` fires — that's where the bridge's
      Media Streams attach. This is identical to the inbound demo's
      flow: B's voice_url returns ``<Connect><Stream>``, the WS opens,
      audio flows.

    So ``place_call`` only makes sense when ``to`` is another Twilio
    number on this account whose ``voice_url`` is set to OUR harness
    webhook. To make that wiring automatic, ``place_call`` temporarily
    rewrites B's ``voice_url`` for the duration of the call and
    restores it on ``disconnect``. The harness on this adapter's own
    number does NOT need to be answer-mode — the Stream attaches to
    B's leg, NOT this adapter's leg, but B's webhook is hosted on this
    adapter's local server, so the WS still lands here.

    The bidirectional audio model is unchanged: ``send_audio`` writes
    frames over the WS (B hears them and bridges to A), ``recv_audio``
    reads inbound frames off the WS (whatever the bridge mixes from
    both legs).

    Limitation: ``to`` MUST be a phone number on this same Twilio
    account. Calling an external PSTN endpoint (a real cell phone)
    requires a different topology (``<Start><Stream>`` + ``<Dial>``)
    which we don't implement here because the inline TwiML route on
    the A-leg can't capture B's audio when B is external.

    Default timeout 120s covers cloudflared cold-start latency.

    Raises:
        RuntimeError: If called after ``wait_for_call()`` (modes are
            exclusive per adapter instance), or if ``to`` is not a
            Twilio number on this account.
        ValueError: If ``to`` is not in E.164 format.
        asyncio.TimeoutError: If the media stream doesn't open within
            ``timeout`` seconds.
    """
    self._assert_connected()
    self._enter_mode("call")
    validate_e164(to)

    assert self.public_base_url is not None
    assert self._rest is not None
    assert self._stream_connected is not None

    if attach_stream_to_self:
        # Resolve B-leg's number SID and snapshot+rewrite its voice_url so
        # B's leg attaches its Media Stream to our harness webhook. We own
        # this number (same Twilio account); disconnect() will restore.
        self._callee_phone_number_sid = self._rest.resolve_phone_number_sid(to)
        self._prior_callee_voice_url = self._rest.read_voice_url(
            self._callee_phone_number_sid
        )
        webhook_url = self.public_base_url.rstrip("/") + "/twilio/voice"
        self._rest.write_voice_url(self._callee_phone_number_sid, webhook_url)
        logger.info(
            "TwilioAgentAdapter: rewrote callee %s voice_url to %s",
            _redact_e164(to),
            webhook_url,
        )

    # A-leg TwiML: play a short deterministic <Say> line, then hold
    # the bridge open. Twilio runs this on the originator side while
    # B's webhook attaches the Media Stream.
    #
    # The <Say> gives the recording a known-good utterance to
    # transcribe. A bare <Pause> alone produces 120s of line silence
    # that Whisper has been observed to hallucinate as non-English
    # text (issue #465 in this PR). The Say is a one-time anchor at
    # call setup; the Media Stream carries the real bidirectional
    # conversation that follows.
    inline_a_leg_twiml = (
        '<?xml version="1.0" encoding="UTF-8"?>'
        "<Response>"
        f'<Say voice="Polly.Joanna">{PLACE_CALL_A_LEG_SAY_TEXT}</Say>'
        '<Pause length="120"/>'
        "</Response>"
    )
    self._call_sid = self._rest.place_call(
        to=to, from_=self.phone_number, twiml=inline_a_leg_twiml
    )
    logger.info(
        "TwilioAgentAdapter: placed call %s from %s to %s",
        self._call_sid,
        _redact_e164(self.phone_number),
        _redact_e164(to),
    )

    if attach_stream_to_self:
        # Wait for OUR webhook to fire — only meaningful when we rewrote
        # the callee's voice_url to point at us. In originator-only mode
        # (attach_stream_to_self=False), there's no stream coming to us;
        # the callee has its own harness which owns the stream.
        await asyncio.wait_for(self._stream_connected.wait(), timeout=timeout)
async def send_dtmf(self, tones: str) ‑> None

Send DTMF digits on the live call (uses Twilio REST <Play digits>).

Expand source code
async def send_dtmf(self, tones: str) -> None:
    """Send DTMF digits on the live call (uses Twilio REST ``<Play digits>``)."""
    if self._rest is None or self._call_sid is None:
        raise RuntimeError("TwilioAgentAdapter: no active call; send_dtmf requires an in-progress call")
    # Run blocking REST call off-thread so we don't stall the event loop.
    await asyncio.to_thread(self._rest.send_dtmf_on_call, self._call_sid, tones)
async def wait_for_call(self, timeout: float = 120.0) ‑> None

Block until someone dials in and the media stream is live.

In default mode (no conference_room): the number's voice_url is overwritten to point at our webhook so inbound calls reach us. Caller (place_call) elsewhere will dial this number.

In conference mode: there's no inbound call to wait for — the two-Twilio-number demo can't naturally have one side receive an inbound call when both legs need conference TwiML. Instead, we ORIGINATE an outbound call FROM this adapter's number TO itself with inline TwiML that opens the capture stream and dials into the shared conference room. This is the symmetric counterpart to place_call in conference mode — both adapters end up as participants in the same room, exchanging audio via the bridge.

Default timeout 120s covers cloudflared cold-start, conference-room formation, and Twilio's webhook ramp.

Raises

RuntimeError
If called after place_call().
asyncio.TimeoutError
If nobody dials in within timeout.
Expand source code
async def wait_for_call(self, timeout: float = 120.0) -> None:
    """
    Block until someone dials in and the media stream is live.

    In **default mode** (no conference_room): the number's ``voice_url``
    is overwritten to point at our webhook so inbound calls reach us.
    Caller (``place_call``) elsewhere will dial this number.

    In **conference mode**: there's no inbound call to wait for — the
    two-Twilio-number demo can't naturally have one side receive an
    inbound call when both legs need conference TwiML. Instead, we
    ORIGINATE an outbound call FROM this adapter's number TO itself
    with inline TwiML that opens the capture stream and dials into
    the shared conference room. This is the symmetric counterpart to
    ``place_call`` in conference mode — both adapters end up as
    participants in the same room, exchanging audio via the bridge.

    Default timeout 120s covers cloudflared cold-start, conference-room
    formation, and Twilio's webhook ramp.

    Raises:
        RuntimeError: If called after ``place_call()``.
        asyncio.TimeoutError: If nobody dials in within ``timeout``.
    """
    self._assert_connected()
    self._enter_mode("answer")

    assert self.public_base_url is not None
    assert self._rest is not None
    assert self._phone_number_sid is not None
    assert self._stream_connected is not None

    # Snapshot the prior webhook so we can restore it on disconnect, then
    # point the number at our server. Only answer mode does this.
    self._prior_voice_url = self._rest.read_voice_url(self._phone_number_sid)
    webhook_url = self.public_base_url.rstrip("/") + "/twilio/voice"
    self._rest.write_voice_url(self._phone_number_sid, webhook_url)
    logger.info("TwilioAgentAdapter: webhook set to %s", webhook_url)

    await asyncio.wait_for(self._stream_connected.wait(), timeout=timeout)

Inherited members

class UnsupportedCapabilityError (adapter_name: str, capability: str, hint: str = '')

Raised when a script step requests a capability the adapter does not advertise. The message names the adapter and the missing capability so users can pick a different adapter or fall back to a capability-free alternative (e.g., interrupt(after=seconds) instead of after_words).

Expand source code
class UnsupportedCapabilityError(RuntimeError):
    """
    Raised when a script step requests a capability the adapter does not
    advertise. The message names the adapter and the missing capability so
    users can pick a different adapter or fall back to a capability-free
    alternative (e.g., interrupt(after=seconds) instead of after_words).
    """

    def __init__(self, adapter_name: str, capability: str, hint: str = ""):
        self.adapter_name = adapter_name
        self.capability = capability
        suffix = f" {hint}" if hint else ""
        super().__init__(
            f"Adapter {adapter_name!r} does not support capability {capability!r}. "
            f"See the adapter capability matrix at https://scenario-docs.langwatch.ai/voice/capability-matrix.{suffix}"
        )

Ancestors

  • builtins.RuntimeError
  • builtins.Exception
  • builtins.BaseException
class VapiAgentAdapter (assistant_id: str, api_key: str)

Abstract base for voice agents that exchange audio with the agent under test.

Subclasses implement connect, disconnect, send_audio, and recv_audio. The default call implementation threads audio extracted from the last incoming message through the transport and wraps the response back into an assistant message.

Attributes

capabilities
Declaration of what the adapter can and cannot do. Each concrete subclass must set this as a class attribute.
response_timeout

Seconds to wait for agent audio after sending user audio. Defaults to 60 seconds.

60 seconds covers a typical real-world STT → LLM → TTS round-trip including backoff/retry inside each provider, tool calls, and RAG lookups. If you see TimeoutError flakes against a fast LLM-only chain, you can lower this; if your agent does heavy processing (MCP roundtrips, multi-step tool chains), consider raising it.

Override per-adapter at construction time::

adapter = MyVoiceAdapter()
adapter.response_timeout = 90.0  # slow tool-call chain
Expand source code
class VapiAgentAdapter(VoiceAgentAdapter):
    capabilities: ClassVar[AdapterCapabilities] = AdapterCapabilities(
        streaming_transcripts=True,
        native_vad=True,
        dtmf=False,
        input_formats=["pcm16/16000"],
        output_formats=["pcm16/16000"],
    )

    def __init__(self, assistant_id: str, api_key: str):
        super().__init__()
        self.assistant_id = assistant_id
        self.api_key = api_key
        self.websocket_call_url: Optional[str] = None
        self._ws: Optional[object] = None

    async def connect(self) -> None:
        # Integration: POST to Vapi REST API to get websocketCallUrl, then
        # open websocket.
        self.websocket_call_url = f"wss://vapi.ai/ws/{self.assistant_id}"
        self._ws = object()

    async def disconnect(self) -> None:
        self._ws = None

    async def send_audio(self, chunk: AudioChunk) -> None:
        if self._ws is None:
            raise RuntimeError("VapiAgentAdapter: not connected")
        raise PendingTransportError("VapiAgentAdapter")

    async def recv_audio(self, timeout: float) -> AudioChunk:
        if self._ws is None:
            raise RuntimeError("VapiAgentAdapter: not connected")
        raise PendingTransportError("VapiAgentAdapter")

    def __repr__(self) -> str:  # redact credentials
        return f"VapiAgentAdapter(assistant_id={self.assistant_id!r}, api_key='***')"

Ancestors

Class variables

var capabilities : ClassVar[AdapterCapabilities]

Inherited members

class VoiceAgentAdapter

Abstract base for voice agents that exchange audio with the agent under test.

Subclasses implement connect, disconnect, send_audio, and recv_audio. The default call implementation threads audio extracted from the last incoming message through the transport and wraps the response back into an assistant message.

Attributes

capabilities
Declaration of what the adapter can and cannot do. Each concrete subclass must set this as a class attribute.
response_timeout

Seconds to wait for agent audio after sending user audio. Defaults to 60 seconds.

60 seconds covers a typical real-world STT → LLM → TTS round-trip including backoff/retry inside each provider, tool calls, and RAG lookups. If you see TimeoutError flakes against a fast LLM-only chain, you can lower this; if your agent does heavy processing (MCP roundtrips, multi-step tool chains), consider raising it.

Override per-adapter at construction time::

adapter = MyVoiceAdapter()
adapter.response_timeout = 90.0  # slow tool-call chain
Expand source code
class VoiceAgentAdapter(AgentAdapter):
    """
    Abstract base for voice agents that exchange audio with the agent under test.

    Subclasses implement ``connect``, ``disconnect``, ``send_audio``, and
    ``recv_audio``. The default ``call`` implementation threads audio extracted
    from the last incoming message through the transport and wraps the response
    back into an assistant message.

    Attributes:
        capabilities: Declaration of what the adapter can and cannot do. Each
            concrete subclass must set this as a class attribute.
        response_timeout: Seconds to wait for agent audio after sending user
            audio. Defaults to 60 seconds.

            60 seconds covers a typical real-world STT → LLM → TTS round-trip
            including backoff/retry inside each provider, tool calls, and RAG
            lookups. If you see TimeoutError flakes against a fast LLM-only
            chain, you can lower this; if your agent does heavy processing
            (MCP roundtrips, multi-step tool chains), consider raising it.

            Override per-adapter at construction time::

                adapter = MyVoiceAdapter()
                adapter.response_timeout = 90.0  # slow tool-call chain
    """

    role: ClassVar[AgentRole] = AgentRole.AGENT
    capabilities: ClassVar[AdapterCapabilities] = AdapterCapabilities()
    response_timeout: float = 60.0  # 60s: STT + LLM + TTS budget (see docstring)
    # Tail silence: once the first agent chunk arrives, keep draining recv_audio
    # until no chunk shows up within this many seconds — that's how we detect the
    # agent finished talking. Without this, demos record only the first ~100ms.
    response_tail_silence: float = 0.6
    # Hard cap on a single agent turn's audio. Prevents runaway loops if a
    # transport never signals end-of-stream. 30s = a long sentence.
    response_max_duration: float = 30.0

    def __init__(self) -> None:
        # Per-instance event used by the interruption path to wait until
        # the agent is actually speaking before firing an interrupt — so
        # we don't fire ``clear`` at a silent SUT. Subclasses that
        # override ``__init__`` must call ``super().__init__()``.
        self._agent_speaking = asyncio.Event()

    @property
    def _agent_speaking_event(self) -> asyncio.Event:
        """Event set when the agent emits its first chunk of the current turn."""
        # Safety net for subclasses that pre-date this base ``__init__``
        # contract and didn't call ``super().__init__()``. They get a
        # one-shot lazy event so the interruption path doesn't crash.
        # We emit a single warning per subclass — silent fallback masks
        # bugs, but a warning per call would spam the timing-critical
        # interruption path. New adapters must call super().__init__().
        ev = getattr(self, "_agent_speaking", None)
        if ev is None:
            cls = type(self)
            if not getattr(cls, "_agent_speaking_lazy_warned", False):
                logger.warning(
                    "%s.__init__() did not call super().__init__(); "
                    "lazily initialising _agent_speaking event. "
                    "Add super().__init__() to silence this warning.",
                    cls.__name__,
                )
                # setattr() form: pyright won't infer this dynamic class attr
                # otherwise (reportAttributeAccessIssue). Functionally identical
                # to cls._agent_speaking_lazy_warned = True.
                setattr(cls, "_agent_speaking_lazy_warned", True)
            ev = asyncio.Event()
            self._agent_speaking = ev
        return ev

    @abstractmethod
    async def connect(self) -> None:
        """Open the transport and prepare to exchange audio."""

    @abstractmethod
    async def disconnect(self) -> None:
        """Close the transport and release resources."""

    @abstractmethod
    async def send_audio(self, chunk: AudioChunk) -> None:
        """Transmit an AudioChunk to the agent under test."""

    @abstractmethod
    async def recv_audio(self, timeout: float) -> AudioChunk:
        """Receive the next AudioChunk from the agent."""

    async def __aenter__(self):
        # Default async context manager: subclasses don't need to
        # reimplement this — they get connect/disconnect sandwiching
        # for free. Override only if a transport needs extra setup
        # ordering around connect.
        await self.connect()
        return self

    async def __aexit__(self, *exc_info: Any) -> None:
        await self.disconnect()

    async def interrupt(self) -> None:
        """Send a first-class interrupt signal to the agent under test.

        Adapters that advertise ``capabilities.interruption=True`` override
        this to send the transport-native interrupt (e.g., Twilio ``clear``,
        OpenAI Realtime ``response.cancel``). The agent stops generating
        audio immediately — much more deterministic than racing VAD against
        a wall-clock sleep.

        The default raises ``UnsupportedCapabilityError``. Callers
        (``scenario.interrupt()``) check ``capabilities.interruption`` and
        fall back to timing-based barge-in (sending audio while the agent
        is speaking) when this returns False.
        """
        from .capabilities import UnsupportedCapabilityError

        raise UnsupportedCapabilityError(
            type(self).__name__,
            "interruption",
            hint=(
                "This adapter has no native interrupt signal. Use the "
                "timing-based barge-in pattern instead: "
                "agent(wait=False) + sleep(N) + user(content), where the "
                "user audio overlaps with the agent's TTS and the SUT's "
                "VAD detects it."
            ),
        )

    async def call(self, input: AgentInput) -> AgentReturnTypes:
        """
        Default implementation: extract audio from the latest user message,
        send it, drain the agent's full response (multiple recv_audio chunks
        until tail silence), record once, return as one assistant audio message.

        Why drain instead of taking one chunk: TTS and realtime APIs stream
        their response in many small chunks. A single recv_audio() returns the
        first one only — the recorder would log ~100ms of agent audio per turn
        and the judge would receive a truncated response. Draining until
        tail-silence (no new chunk for ``response_tail_silence`` seconds) gives
        the natural "agent finished talking" signal that works across
        adapters without each one needing to know its transport's done event.

        Subclasses may override this for specialised flows but will usually
        inherit it.
        """
        # Clear the speaking-event for this turn — set in _drain on first chunk.
        self._agent_speaking_event.clear()
        recorder = _AdapterRecorder(input)
        incoming = extract_audio(input.new_messages[-1]) if input.new_messages else None
        if incoming is not None:
            # Wrap send_audio so user.start = "we began transmitting" and
            # user.end = "we finished transmitting" — both real flow points.
            recorder.mark_user_start()
            await self.send_audio(incoming)
            recorder.record_user(incoming)
        # Drain. Recorder grabs agent.start at first chunk via
        # mark_agent_start, so agent.start is "first chunk on the wire,"
        # not "now minus merged.duration."
        merged = await self._drain_agent_response(on_first_chunk=recorder.mark_agent_start)
        recorder.record_agent(merged)
        return create_audio_message(merged, role="assistant")

    async def _drain_agent_response(
        self, on_first_chunk: Optional[Callable[[], None]] = None
    ) -> AudioChunk:
        """Loop ``recv_audio`` until tail silence or max duration; merge result.

        ``on_first_chunk`` is invoked synchronously the moment the first
        non-empty audio chunk arrives — used by the recorder to capture
        agent.start at a real flow point rather than back-computing from
        the merged-chunk duration.
        """
        try:
            first = await self.recv_audio(timeout=self.response_timeout)
        except asyncio.TimeoutError as err:
            raise FirstChunkTimeoutError(timeout=self.response_timeout) from err
        # First chunk arrived → agent is now speaking. Wakes anyone awaiting
        # _agent_speaking_event (the interruption path).
        if first.data and on_first_chunk is not None:
            on_first_chunk()
        self._agent_speaking_event.set()
        chunks: List[AudioChunk] = [first]
        accumulated = first.duration_seconds
        while accumulated < self.response_max_duration:
            try:
                nxt = await self.recv_audio(timeout=self.response_tail_silence)
            except asyncio.TimeoutError:
                break
            if not nxt.data:
                break
            chunks.append(nxt)
            accumulated += nxt.duration_seconds
        return _merge_chunks(chunks)

Ancestors

Subclasses

Class variables

var capabilities : ClassVar[AdapterCapabilities]
var response_max_duration : float
var response_tail_silence : float
var response_timeout : float
var role : ClassVar[AgentRole]

Methods

async def call(self, input: AgentInput) ‑> str | openai.types.chat.chat_completion_developer_message_param.ChatCompletionDeveloperMessageParam | openai.types.chat.chat_completion_system_message_param.ChatCompletionSystemMessageParam | openai.types.chat.chat_completion_user_message_param.ChatCompletionUserMessageParam | openai.types.chat.chat_completion_assistant_message_param.ChatCompletionAssistantMessageParam | openai.types.chat.chat_completion_tool_message_param.ChatCompletionToolMessageParam | openai.types.chat.chat_completion_function_message_param.ChatCompletionFunctionMessageParam | List[openai.types.chat.chat_completion_developer_message_param.ChatCompletionDeveloperMessageParam | openai.types.chat.chat_completion_system_message_param.ChatCompletionSystemMessageParam | openai.types.chat.chat_completion_user_message_param.ChatCompletionUserMessageParam | openai.types.chat.chat_completion_assistant_message_param.ChatCompletionAssistantMessageParam | openai.types.chat.chat_completion_tool_message_param.ChatCompletionToolMessageParam | openai.types.chat.chat_completion_function_message_param.ChatCompletionFunctionMessageParam] | ScenarioResult

Default implementation: extract audio from the latest user message, send it, drain the agent's full response (multiple recv_audio chunks until tail silence), record once, return as one assistant audio message.

Why drain instead of taking one chunk: TTS and realtime APIs stream their response in many small chunks. A single recv_audio() returns the first one only — the recorder would log ~100ms of agent audio per turn and the judge would receive a truncated response. Draining until tail-silence (no new chunk for response_tail_silence seconds) gives the natural "agent finished talking" signal that works across adapters without each one needing to know its transport's done event.

Subclasses may override this for specialised flows but will usually inherit it.

Expand source code
async def call(self, input: AgentInput) -> AgentReturnTypes:
    """
    Default implementation: extract audio from the latest user message,
    send it, drain the agent's full response (multiple recv_audio chunks
    until tail silence), record once, return as one assistant audio message.

    Why drain instead of taking one chunk: TTS and realtime APIs stream
    their response in many small chunks. A single recv_audio() returns the
    first one only — the recorder would log ~100ms of agent audio per turn
    and the judge would receive a truncated response. Draining until
    tail-silence (no new chunk for ``response_tail_silence`` seconds) gives
    the natural "agent finished talking" signal that works across
    adapters without each one needing to know its transport's done event.

    Subclasses may override this for specialised flows but will usually
    inherit it.
    """
    # Clear the speaking-event for this turn — set in _drain on first chunk.
    self._agent_speaking_event.clear()
    recorder = _AdapterRecorder(input)
    incoming = extract_audio(input.new_messages[-1]) if input.new_messages else None
    if incoming is not None:
        # Wrap send_audio so user.start = "we began transmitting" and
        # user.end = "we finished transmitting" — both real flow points.
        recorder.mark_user_start()
        await self.send_audio(incoming)
        recorder.record_user(incoming)
    # Drain. Recorder grabs agent.start at first chunk via
    # mark_agent_start, so agent.start is "first chunk on the wire,"
    # not "now minus merged.duration."
    merged = await self._drain_agent_response(on_first_chunk=recorder.mark_agent_start)
    recorder.record_agent(merged)
    return create_audio_message(merged, role="assistant")
async def connect(self) ‑> None

Open the transport and prepare to exchange audio.

Expand source code
@abstractmethod
async def connect(self) -> None:
    """Open the transport and prepare to exchange audio."""
async def disconnect(self) ‑> None

Close the transport and release resources.

Expand source code
@abstractmethod
async def disconnect(self) -> None:
    """Close the transport and release resources."""
async def interrupt(self) ‑> None

Send a first-class interrupt signal to the agent under test.

Adapters that advertise capabilities.interruption=True override this to send the transport-native interrupt (e.g., Twilio clear, OpenAI Realtime response.cancel). The agent stops generating audio immediately — much more deterministic than racing VAD against a wall-clock sleep.

The default raises UnsupportedCapabilityError. Callers (interrupt()) check capabilities.interruption and fall back to timing-based barge-in (sending audio while the agent is speaking) when this returns False.

Expand source code
async def interrupt(self) -> None:
    """Send a first-class interrupt signal to the agent under test.

    Adapters that advertise ``capabilities.interruption=True`` override
    this to send the transport-native interrupt (e.g., Twilio ``clear``,
    OpenAI Realtime ``response.cancel``). The agent stops generating
    audio immediately — much more deterministic than racing VAD against
    a wall-clock sleep.

    The default raises ``UnsupportedCapabilityError``. Callers
    (``scenario.interrupt()``) check ``capabilities.interruption`` and
    fall back to timing-based barge-in (sending audio while the agent
    is speaking) when this returns False.
    """
    from .capabilities import UnsupportedCapabilityError

    raise UnsupportedCapabilityError(
        type(self).__name__,
        "interruption",
        hint=(
            "This adapter has no native interrupt signal. Use the "
            "timing-based barge-in pattern instead: "
            "agent(wait=False) + sleep(N) + user(content), where the "
            "user audio overlaps with the agent's TTS and the SUT's "
            "VAD detects it."
        ),
    )
async def recv_audio(self, timeout: float) ‑> AudioChunk

Receive the next AudioChunk from the agent.

Expand source code
@abstractmethod
async def recv_audio(self, timeout: float) -> AudioChunk:
    """Receive the next AudioChunk from the agent."""
async def send_audio(self, chunk: AudioChunk) ‑> None

Transmit an AudioChunk to the agent under test.

Expand source code
@abstractmethod
async def send_audio(self, chunk: AudioChunk) -> None:
    """Transmit an AudioChunk to the agent under test."""
class VoiceEvent (time: float, type: str, name: Optional[str] = None, args: Optional[Dict[str, Any]] = None, result: Optional[Any] = None, latency: Optional[float] = None, metadata: Optional[Dict[str, Any]] = None)

One timestamped event on the voice conversation timeline.

Types (from §4.6 L600-615): user_start_speaking, user_stop_speaking, agent_start_speaking, agent_stop_speaking, tool_call, tool_result, user_interrupt.

latency is populated for agent_start_speaking events and measures the response time from the preceding user_stop_speaking event.

metadata is a free-form dict for type-specific context. Examples: - user_interrupt: {"adapter": "PipecatAgentAdapter", "native": True} - tool_call: {"call_id": "…"}

Expand source code
@dataclass
class VoiceEvent:
    """
    One timestamped event on the voice conversation timeline.

    Types (from §4.6 L600-615):
        user_start_speaking, user_stop_speaking, agent_start_speaking,
        agent_stop_speaking, tool_call, tool_result, user_interrupt.

    `latency` is populated for ``agent_start_speaking`` events and measures
    the response time from the preceding user_stop_speaking event.

    `metadata` is a free-form dict for type-specific context. Examples:
        - user_interrupt: {"adapter": "PipecatAgentAdapter", "native": True}
        - tool_call:      {"call_id": "..."}
    """

    time: float
    type: str
    name: Optional[str] = None
    args: Optional[Dict[str, Any]] = None
    result: Optional[Any] = None
    latency: Optional[float] = None
    metadata: Optional[Dict[str, Any]] = None

Instance variables

var args : Dict[str, Any] | None
var latency : float | None
var metadata : Dict[str, Any] | None
var name : str | None
var result : Any | None
var time : float
var type : str
class VoiceRecording (segments: List[AudioSegment] = <factory>, timeline: "List['VoiceEvent']" = <factory>)

The full audio record of a voice scenario, segmented by speaker.

Usage (§4.6): result.audio.save("conversation.wav") result.audio.save("conversation.mp3", format="mp3") for seg in result.audio.segments: …

timeline mirrors result.timeline so save_segments() can write timestamped events (user_interrupt, etc.) into the manifest. Populated by the executor at end-of-scenario via _attach_voice_output.

Expand source code
@dataclass
class VoiceRecording:
    """
    The full audio record of a voice scenario, segmented by speaker.

    Usage (§4.6):
        result.audio.save("conversation.wav")
        result.audio.save("conversation.mp3", format="mp3")
        for seg in result.audio.segments: ...

    ``timeline`` mirrors result.timeline so save_segments() can write
    timestamped events (user_interrupt, etc.) into the manifest. Populated
    by the executor at end-of-scenario via _attach_voice_output.
    """

    segments: List[AudioSegment] = field(default_factory=list)
    timeline: List["VoiceEvent"] = field(default_factory=list)

    @property
    def duration(self) -> float:
        if not self.segments:
            return 0.0
        return max(s.end_time for s in self.segments)

    @property
    def full_wav(self) -> bytes:
        """Full mixed/concatenated conversation audio as a WAV byte string."""
        from io import BytesIO
        import wave

        buf = BytesIO()
        with wave.open(buf, "wb") as w:
            w.setnchannels(PCM16_CHANNELS)
            w.setsampwidth(2)
            w.setframerate(PCM16_SAMPLE_RATE)
            for seg in sorted(self.segments, key=lambda s: s.start_time):
                w.writeframes(seg.audio)
        return buf.getvalue()

    _ALLOWED_FORMATS = frozenset({"wav", "mp3", "ogg", "flac"})

    def save(self, path: Union[str, Path], format: Optional[str] = None) -> Path:
        """
        Save the conversation audio to a file.

        By default the format is inferred from the path suffix. ``format="mp3"``
        (or any non-wav format) uses the bundled ffmpeg binary via imageio-ffmpeg
        to transcode from the internal WAV representation.

        Security: ``path`` is resolved (``Path.resolve()``) before writing, and
        ``format`` is validated against an allowlist of supported formats. This
        prevents passing arbitrary ffmpeg muxer names or relying on ambiguous
        path semantics.
        """
        resolved = Path(path).resolve()
        fmt = (format or resolved.suffix.lstrip(".")).lower() or "wav"
        if fmt not in self._ALLOWED_FORMATS:
            raise ValueError(
                f"save(format={fmt!r}) not supported; allowed: "
                f"{sorted(self._ALLOWED_FORMATS)}"
            )
        wav_bytes = self.full_wav
        if fmt == "wav":
            resolved.write_bytes(wav_bytes)
            return resolved

        import subprocess

        import imageio_ffmpeg

        ffmpeg = imageio_ffmpeg.get_ffmpeg_exe()
        # -protocol_whitelist file,pipe — defence in depth. Input here is
        # our own WAV bytes (not user-controlled), but the whitelist costs
        # nothing and forecloses future regressions if a caller pipes in
        # externally sourced container bytes through this path.
        proc = subprocess.run(
            [
                ffmpeg,
                "-protocol_whitelist", "file,pipe",
                "-loglevel", "error",
                "-y",
                "-f", "wav",
                "-i", "pipe:0",
                "-f", fmt,
                str(resolved),
            ],
            input=wav_bytes,
            capture_output=True,
        )
        if proc.returncode != 0:
            raise RuntimeError(
                f"ffmpeg transcode to {fmt!r} failed: {proc.stderr.decode(errors='replace')}"
            )
        return resolved

    def save_segments(self, dir: Union[str, Path], manifest: bool = True) -> Path:
        """
        Write each segment as its own WAV file plus the full mixed conversation,
        optionally with a JSON manifest pairing files to transcripts/timestamps.

        Layout::

            <dir>/
                segments/
                    00-user-0000ms.wav
                    01-agent-0312ms.wav
                    ...
                full.wav
                manifest.json   # iff manifest=True

        Segment file names: zero-padded index, role, start_time in ms.
        Manifest schema::

            {
              "generated_at": "<ISO 8601 UTC>",
              "duration": <float seconds>,
              "segment_count": <int>,
              "segments": [
                {"idx": 0, "file": "segments/00-user-0000ms.wav",
                 "role": "user", "start_time": 0.0, "end_time": 1.2,
                 "duration": 1.2, "transcript": "..."}
              ]
            }

        The directory is created (parents=True, exist_ok=True). Existing
        contents in the target directory are NOT cleared — caller decides
        retention.  Returns the resolved directory path.
        """
        from io import BytesIO
        import wave

        target = Path(dir).resolve()
        segments_dir = target / "segments"
        target.mkdir(parents=True, exist_ok=True)
        segments_dir.mkdir(parents=True, exist_ok=True)

        ordered = sorted(self.segments, key=lambda s: s.start_time)
        segment_entries: List[Dict[str, Any]] = []

        for idx, seg in enumerate(ordered):
            start_ms = int(seg.start_time * 1000)
            filename = f"{idx:02d}-{seg.speaker}-{start_ms:04d}ms.wav"
            seg_path = segments_dir / filename

            buf = BytesIO()
            with wave.open(buf, "wb") as w:
                w.setnchannels(PCM16_CHANNELS)
                w.setsampwidth(2)
                w.setframerate(PCM16_SAMPLE_RATE)
                w.writeframes(seg.audio)
            seg_path.write_bytes(buf.getvalue())

            rel_file = f"segments/{filename}"
            entry: Dict[str, Any] = {
                "idx": idx,
                "file": rel_file,
                "role": seg.speaker,
                "start_time": seg.start_time,
                "end_time": seg.end_time,
                "duration": seg.end_time - seg.start_time,
                "transcript": seg.transcript,
            }
            if seg.transcript_truncated:
                entry["transcript_truncated"] = True
            segment_entries.append(entry)

        # Write the full mixed WAV.
        (target / "full.wav").write_bytes(self.full_wav)

        if manifest:
            event_entries: List[Dict[str, Any]] = []
            for evt in sorted(self.timeline, key=lambda e: e.time):
                entry: Dict[str, Any] = {"time": evt.time, "type": evt.type}
                if evt.latency is not None:
                    entry["latency"] = evt.latency
                if evt.name is not None:
                    entry["name"] = evt.name
                if evt.args is not None:
                    entry["args"] = evt.args
                if evt.result is not None:
                    entry["result"] = evt.result
                if evt.metadata is not None:
                    entry["metadata"] = evt.metadata
                event_entries.append(entry)
            manifest_data: Dict[str, Any] = {
                "generated_at": datetime.now(timezone.utc).isoformat(),
                "duration": self.duration,
                "segment_count": len(ordered),
                "segments": segment_entries,
                "events": event_entries,
            }
            (target / "manifest.json").write_text(
                json.dumps(manifest_data, indent=2), encoding="utf-8"
            )

        return target

Instance variables

var duration : float
Expand source code
@property
def duration(self) -> float:
    if not self.segments:
        return 0.0
    return max(s.end_time for s in self.segments)
var full_wav : bytes

Full mixed/concatenated conversation audio as a WAV byte string.

Expand source code
@property
def full_wav(self) -> bytes:
    """Full mixed/concatenated conversation audio as a WAV byte string."""
    from io import BytesIO
    import wave

    buf = BytesIO()
    with wave.open(buf, "wb") as w:
        w.setnchannels(PCM16_CHANNELS)
        w.setsampwidth(2)
        w.setframerate(PCM16_SAMPLE_RATE)
        for seg in sorted(self.segments, key=lambda s: s.start_time):
            w.writeframes(seg.audio)
    return buf.getvalue()
var segments : List[AudioSegment]
var timeline : List[VoiceEvent]

Methods

def save(self, path: Union[str, Path], format: Optional[str] = None) ‑> pathlib.Path

Save the conversation audio to a file.

By default the format is inferred from the path suffix. format="mp3" (or any non-wav format) uses the bundled ffmpeg binary via imageio-ffmpeg to transcode from the internal WAV representation.

Security: path is resolved (Path.resolve()) before writing, and format is validated against an allowlist of supported formats. This prevents passing arbitrary ffmpeg muxer names or relying on ambiguous path semantics.

Expand source code
def save(self, path: Union[str, Path], format: Optional[str] = None) -> Path:
    """
    Save the conversation audio to a file.

    By default the format is inferred from the path suffix. ``format="mp3"``
    (or any non-wav format) uses the bundled ffmpeg binary via imageio-ffmpeg
    to transcode from the internal WAV representation.

    Security: ``path`` is resolved (``Path.resolve()``) before writing, and
    ``format`` is validated against an allowlist of supported formats. This
    prevents passing arbitrary ffmpeg muxer names or relying on ambiguous
    path semantics.
    """
    resolved = Path(path).resolve()
    fmt = (format or resolved.suffix.lstrip(".")).lower() or "wav"
    if fmt not in self._ALLOWED_FORMATS:
        raise ValueError(
            f"save(format={fmt!r}) not supported; allowed: "
            f"{sorted(self._ALLOWED_FORMATS)}"
        )
    wav_bytes = self.full_wav
    if fmt == "wav":
        resolved.write_bytes(wav_bytes)
        return resolved

    import subprocess

    import imageio_ffmpeg

    ffmpeg = imageio_ffmpeg.get_ffmpeg_exe()
    # -protocol_whitelist file,pipe — defence in depth. Input here is
    # our own WAV bytes (not user-controlled), but the whitelist costs
    # nothing and forecloses future regressions if a caller pipes in
    # externally sourced container bytes through this path.
    proc = subprocess.run(
        [
            ffmpeg,
            "-protocol_whitelist", "file,pipe",
            "-loglevel", "error",
            "-y",
            "-f", "wav",
            "-i", "pipe:0",
            "-f", fmt,
            str(resolved),
        ],
        input=wav_bytes,
        capture_output=True,
    )
    if proc.returncode != 0:
        raise RuntimeError(
            f"ffmpeg transcode to {fmt!r} failed: {proc.stderr.decode(errors='replace')}"
        )
    return resolved
def save_segments(self, dir: Union[str, Path], manifest: bool = True) ‑> pathlib.Path

Write each segment as its own WAV file plus the full mixed conversation, optionally with a JSON manifest pairing files to transcripts/timestamps.

Layout::

<dir>/
    segments/
        00-user-0000ms.wav
        01-agent-0312ms.wav
        ...
    full.wav
    manifest.json   # iff manifest=True

Segment file names: zero-padded index, role, start_time in ms. Manifest schema::

{
  "generated_at": "<ISO 8601 UTC>",
  "duration": <float seconds>,
  "segment_count": <int>,
  "segments": [
    {"idx": 0, "file": "segments/00-user-0000ms.wav",
     "role": "user", "start_time": 0.0, "end_time": 1.2,
     "duration": 1.2, "transcript": "..."}
  ]
}

The directory is created (parents=True, exist_ok=True). Existing contents in the target directory are NOT cleared — caller decides retention. Returns the resolved directory path.

Expand source code
def save_segments(self, dir: Union[str, Path], manifest: bool = True) -> Path:
    """
    Write each segment as its own WAV file plus the full mixed conversation,
    optionally with a JSON manifest pairing files to transcripts/timestamps.

    Layout::

        <dir>/
            segments/
                00-user-0000ms.wav
                01-agent-0312ms.wav
                ...
            full.wav
            manifest.json   # iff manifest=True

    Segment file names: zero-padded index, role, start_time in ms.
    Manifest schema::

        {
          "generated_at": "<ISO 8601 UTC>",
          "duration": <float seconds>,
          "segment_count": <int>,
          "segments": [
            {"idx": 0, "file": "segments/00-user-0000ms.wav",
             "role": "user", "start_time": 0.0, "end_time": 1.2,
             "duration": 1.2, "transcript": "..."}
          ]
        }

    The directory is created (parents=True, exist_ok=True). Existing
    contents in the target directory are NOT cleared — caller decides
    retention.  Returns the resolved directory path.
    """
    from io import BytesIO
    import wave

    target = Path(dir).resolve()
    segments_dir = target / "segments"
    target.mkdir(parents=True, exist_ok=True)
    segments_dir.mkdir(parents=True, exist_ok=True)

    ordered = sorted(self.segments, key=lambda s: s.start_time)
    segment_entries: List[Dict[str, Any]] = []

    for idx, seg in enumerate(ordered):
        start_ms = int(seg.start_time * 1000)
        filename = f"{idx:02d}-{seg.speaker}-{start_ms:04d}ms.wav"
        seg_path = segments_dir / filename

        buf = BytesIO()
        with wave.open(buf, "wb") as w:
            w.setnchannels(PCM16_CHANNELS)
            w.setsampwidth(2)
            w.setframerate(PCM16_SAMPLE_RATE)
            w.writeframes(seg.audio)
        seg_path.write_bytes(buf.getvalue())

        rel_file = f"segments/{filename}"
        entry: Dict[str, Any] = {
            "idx": idx,
            "file": rel_file,
            "role": seg.speaker,
            "start_time": seg.start_time,
            "end_time": seg.end_time,
            "duration": seg.end_time - seg.start_time,
            "transcript": seg.transcript,
        }
        if seg.transcript_truncated:
            entry["transcript_truncated"] = True
        segment_entries.append(entry)

    # Write the full mixed WAV.
    (target / "full.wav").write_bytes(self.full_wav)

    if manifest:
        event_entries: List[Dict[str, Any]] = []
        for evt in sorted(self.timeline, key=lambda e: e.time):
            entry: Dict[str, Any] = {"time": evt.time, "type": evt.type}
            if evt.latency is not None:
                entry["latency"] = evt.latency
            if evt.name is not None:
                entry["name"] = evt.name
            if evt.args is not None:
                entry["args"] = evt.args
            if evt.result is not None:
                entry["result"] = evt.result
            if evt.metadata is not None:
                entry["metadata"] = evt.metadata
            event_entries.append(entry)
        manifest_data: Dict[str, Any] = {
            "generated_at": datetime.now(timezone.utc).isoformat(),
            "duration": self.duration,
            "segment_count": len(ordered),
            "segments": segment_entries,
            "events": event_entries,
        }
        (target / "manifest.json").write_text(
            json.dumps(manifest_data, indent=2), encoding="utf-8"
        )

    return target
class WebRTCAgentAdapter (signaling_url: str)

Generic WebRTC adapter that negotiates via an HTTP signaling URL.

Expand source code
class WebRTCAgentAdapter(VoiceAgentAdapter):
    """Generic WebRTC adapter that negotiates via an HTTP signaling URL."""

    capabilities: ClassVar[AdapterCapabilities] = AdapterCapabilities(
        streaming_transcripts=False,
        native_vad=False,
        dtmf=False,
        input_formats=["pcm16/24000"],
        output_formats=["pcm16/24000"],
    )

    def __init__(self, signaling_url: str):
        super().__init__()
        self.signaling_url = signaling_url
        self._pc: Optional[Any] = None
        self._inbound_audio: "asyncio.Queue[AudioChunk]" = asyncio.Queue()

    async def connect(self) -> None:
        # Deferred: actual SDP exchange requires a reachable signaling server.
        # Tested at @integration level with a loopback aiortc peer.
        self._pc = object()  # sentinel — mark "connected"

    async def disconnect(self) -> None:
        self._pc = None

    async def send_audio(self, chunk: AudioChunk) -> None:
        if self._pc is None:
            raise RuntimeError(f"{type(self).__name__}: not connected")
        raise PendingTransportError(type(self).__name__)

    async def recv_audio(self, timeout: float) -> AudioChunk:
        if self._pc is None:
            raise RuntimeError(f"{type(self).__name__}: not connected")
        # If a subclass populated _inbound_audio, return from it.
        if not self._inbound_audio.empty():
            return await asyncio.wait_for(self._inbound_audio.get(), timeout=timeout)
        raise PendingTransportError(type(self).__name__)

Ancestors

Class variables

var capabilities : ClassVar[AdapterCapabilities]

Inherited members

class WebRTCVadFallback (adapter_name: str, aggressiveness: int = 2, on_speech_start: Optional[Callable[[], None]] = None, on_speech_end: Optional[Callable[[], None]] = None)

Incremental VAD over PCM16 @ 24kHz mono audio.

Feed chunks via process(chunk); the callbacks on_speech_start and on_speech_end fire when the speech/silence transitions stabilise.

Aggressiveness level 0–3 (3 = most aggressive filter). Frame size 10, 20, or 30 ms; we use 30 ms.

Expand source code
class WebRTCVadFallback:
    """
    Incremental VAD over PCM16 @ 24kHz mono audio.

    Feed chunks via ``process(chunk)``; the callbacks ``on_speech_start`` and
    ``on_speech_end`` fire when the speech/silence transitions stabilise.

    Aggressiveness level 0–3 (3 = most aggressive filter). Frame size 10, 20,
    or 30 ms; we use 30 ms.
    """

    FRAME_MS = 30
    SAMPLES_PER_FRAME = PCM16_SAMPLE_RATE * FRAME_MS // 1000
    BYTES_PER_FRAME = SAMPLES_PER_FRAME * 2

    _warned_adapters: "set[str]" = set()

    @classmethod
    def reset_warnings(cls) -> None:
        """Clear the per-adapter warning memoization. Intended for tests."""
        cls._warned_adapters = set()

    @classmethod
    def _emit_fallback_warning_once(cls, adapter_name: str) -> None:
        if adapter_name in cls._warned_adapters:
            return
        cls._warned_adapters.add(adapter_name)
        warnings.warn(
            f"Adapter {adapter_name!r} has no native VAD — using SDK-side webrtcvad. "
            f"Accuracy may differ from native VAD.",
            UserWarning,
            stacklevel=3,
        )

    def __init__(
        self,
        adapter_name: str,
        aggressiveness: int = 2,
        on_speech_start: Optional[Callable[[], None]] = None,
        on_speech_end: Optional[Callable[[], None]] = None,
    ):
        self._emit_fallback_warning_once(adapter_name)
        import webrtcvad  # hard dep via webrtcvad-wheels

        self._vad = webrtcvad.Vad(aggressiveness)
        self._speaking = False
        self._buf = bytearray()
        self._on_start = on_speech_start or (lambda: None)
        self._on_end = on_speech_end or (lambda: None)
        # webrtcvad supports 8000/16000/32000/48000 Hz. 24000 is NOT supported,
        # so we downsample frames to 16kHz for classification while keeping the
        # original audio as PCM16 @ 24kHz.
        self._vad_rate = 16000

    def _resample_24k_to_16k(self, pcm24k: bytes) -> bytes:
        """Naive 24k -> 16k downsample (ratio 3:2) for VAD classification only."""
        import numpy as np

        samples = np.frombuffer(pcm24k, dtype=np.int16)
        if len(samples) == 0:
            return b""
        # Simple linear resampling: output = round(len * 2 / 3)
        new_len = max(1, int(round(len(samples) * 2 / 3)))
        idx = np.linspace(0, len(samples) - 1, new_len).astype(np.int64)
        out = samples[idx].astype(np.int16)
        return out.tobytes()

    def process(self, chunk: AudioChunk) -> None:
        """Feed audio to the detector; callbacks fire on transitions."""
        self._buf.extend(chunk.data)
        while len(self._buf) >= self.BYTES_PER_FRAME:
            frame = bytes(self._buf[: self.BYTES_PER_FRAME])
            del self._buf[: self.BYTES_PER_FRAME]
            # Resample to 16kHz for webrtcvad (24kHz not supported).
            frame_16k = self._resample_24k_to_16k(frame)
            bytes_per_frame_16k = (self._vad_rate * self.FRAME_MS // 1000) * 2
            if len(frame_16k) < bytes_per_frame_16k:
                continue
            frame_16k = frame_16k[:bytes_per_frame_16k]
            try:
                is_speech = self._vad.is_speech(frame_16k, self._vad_rate)
            except Exception:  # pragma: no cover — defensive
                is_speech = False
            if is_speech and not self._speaking:
                self._speaking = True
                self._on_start()
            elif not is_speech and self._speaking:
                self._speaking = False
                self._on_end()

    @property
    def is_speaking(self) -> bool:
        return self._speaking

Class variables

var BYTES_PER_FRAME
var FRAME_MS
var SAMPLES_PER_FRAME

Static methods

def reset_warnings() ‑> None

Clear the per-adapter warning memoization. Intended for tests.

Instance variables

var is_speaking : bool
Expand source code
@property
def is_speaking(self) -> bool:
    return self._speaking

Methods

def process(self, chunk: AudioChunk) ‑> None

Feed audio to the detector; callbacks fire on transitions.

Expand source code
def process(self, chunk: AudioChunk) -> None:
    """Feed audio to the detector; callbacks fire on transitions."""
    self._buf.extend(chunk.data)
    while len(self._buf) >= self.BYTES_PER_FRAME:
        frame = bytes(self._buf[: self.BYTES_PER_FRAME])
        del self._buf[: self.BYTES_PER_FRAME]
        # Resample to 16kHz for webrtcvad (24kHz not supported).
        frame_16k = self._resample_24k_to_16k(frame)
        bytes_per_frame_16k = (self._vad_rate * self.FRAME_MS // 1000) * 2
        if len(frame_16k) < bytes_per_frame_16k:
            continue
        frame_16k = frame_16k[:bytes_per_frame_16k]
        try:
            is_speech = self._vad.is_speech(frame_16k, self._vad_rate)
        except Exception:  # pragma: no cover — defensive
            is_speech = False
        if is_speech and not self._speaking:
            self._speaking = True
            self._on_start()
        elif not is_speech and self._speaking:
            self._speaking = False
            self._on_end()
class WebSocketAgentAdapter (url: str, protocol: WebSocketProtocol)

Connects to an arbitrary WebSocket endpoint using a user-supplied protocol.

The protocol's encode_audio is called before sending; decode_response is called on each inbound frame until an AudioChunk is produced.

Expand source code
class WebSocketAgentAdapter(VoiceAgentAdapter):
    """
    Connects to an arbitrary WebSocket endpoint using a user-supplied protocol.

    The protocol's ``encode_audio`` is called before sending; ``decode_response``
    is called on each inbound frame until an AudioChunk is produced.
    """

    capabilities: ClassVar[AdapterCapabilities] = AdapterCapabilities(
        streaming_transcripts=False,
        native_vad=False,
        dtmf=False,
        input_formats=["pcm16/24000"],
        output_formats=["pcm16/24000"],
    )

    def __init__(self, url: str, protocol: WebSocketProtocol):
        super().__init__()
        self.url = url
        self.protocol = protocol
        self._ws: Optional[Any] = None

    async def connect(self) -> None:
        import websockets  # hard dep

        self._ws = await websockets.connect(self.url)

    async def disconnect(self) -> None:
        if self._ws is not None:
            await self._ws.close()
            self._ws = None

    async def send_audio(self, chunk: AudioChunk) -> None:
        if self._ws is None:
            raise RuntimeError(f"{type(self).__name__}: not connected")
        payload = self.protocol.encode_audio(chunk.data)
        await self._ws.send(payload)

    async def recv_audio(self, timeout: float) -> AudioChunk:
        if self._ws is None:
            raise RuntimeError(f"{type(self).__name__}: not connected")
        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout
        while True:
            remaining = max(0.0, deadline - loop.time())
            message = await asyncio.wait_for(self._ws.recv(), timeout=remaining)
            chunk = self.protocol.decode_response(message)
            if chunk is not None:
                return chunk

Ancestors

Class variables

var capabilities : ClassVar[AdapterCapabilities]

Inherited members

class WebSocketProtocol

Encoder/decoder pair for a custom WebSocket audio protocol.

Expand source code
class WebSocketProtocol(ABC):
    """Encoder/decoder pair for a custom WebSocket audio protocol."""

    @abstractmethod
    def encode_audio(self, audio: bytes) -> Any:
        """Convert PCM16 bytes into the wire representation the server expects."""

    @abstractmethod
    def decode_response(self, message: Any) -> Optional[AudioChunk]:
        """Parse a server message into an AudioChunk, or None if it's not audio."""

Ancestors

  • abc.ABC

Methods

def decode_response(self, message: Any) ‑> AudioChunk | None

Parse a server message into an AudioChunk, or None if it's not audio.

Expand source code
@abstractmethod
def decode_response(self, message: Any) -> Optional[AudioChunk]:
    """Parse a server message into an AudioChunk, or None if it's not audio."""
def encode_audio(self, audio: bytes) ‑> Any

Convert PCM16 bytes into the wire representation the server expects.

Expand source code
@abstractmethod
def encode_audio(self, audio: bytes) -> Any:
    """Convert PCM16 bytes into the wire representation the server expects."""