Module scenario.voice.adapters

Platform-specific voice adapters (Phase 2).

Per the proposal (§7.3), per-platform classes are preferred over a unified VoiceAgent(transport=...). PipecatAgentAdapter means "test my Pipecat agent"; TwilioAgentAdapter means "test via phone call". Each has platform-specific constructor parameters that don't fit cleanly on a generic class.
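
As a rough illustration of that trade-off, the sketch below contrasts a generic class that takes an untyped options bag with per-platform classes whose constructors name what they need. Every class name and constructor parameter here is a hypothetical stand-in chosen for the example; none of them is the real signature of PipecatAgentAdapter or TwilioAgentAdapter.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class GenericVoiceAgent:
    """Hypothetical unified class: platform options collapse into an untyped bag."""

    transport: str
    options: Dict[str, Any]


@dataclass
class WebSocketBotAdapter:
    """Hypothetical per-platform class: the signature states what is required."""

    url: str


@dataclass
class PhoneCallAdapter:
    """Hypothetical per-platform class for a telephony transport."""

    account_id: str
    auth_token: str
    phone_number: str


# Generic form: the misspelled key "ur1" is accepted silently at construction.
generic = GenericVoiceAgent(transport="websocket", options={"ur1": "ws://localhost:8765"})

# Per-platform form: a missing argument fails immediately with a clear message.
bot = WebSocketBotAdapter(url="ws://localhost:8765")
try:
    PhoneCallAdapter(account_id="AC123", auth_token="secret")  # type: ignore[call-arg]
except TypeError as exc:
    construction_error = str(exc)
```

The per-platform form also lets a type checker and editor autocomplete surface each platform's parameters, which an options dictionary cannot do.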

"""
Platform-specific voice adapters (Phase 2).

Per the proposal (§7.3), per-platform classes are preferred over a unified
``VoiceAgent(transport=...)``. ``PipecatAgentAdapter`` means "test my Pipecat agent";
``TwilioAgentAdapter`` means "test via phone call"; each has platform-specific
constructor parameters that don't fit cleanly on a generic class.
"""

from __future__ import annotations

from ._stub import PendingTransportError
from .composable import ComposableVoiceAgent, ElevenLabsVoiceAgent
from .elevenlabs import ElevenLabsAgentAdapter
from .gemini_live import GeminiLiveAgentAdapter
from .livekit import LiveKitAgentAdapter
from .openai_realtime import OpenAIRealtimeAgentAdapter
from .pipecat import PipecatAgentAdapter
from .twilio import TwilioAgentAdapter
from .vapi import VapiAgentAdapter
from .webrtc import WebRTCAgentAdapter
from .websocket import WebSocketAgentAdapter, WebSocketProtocol

__all__ = [
    "ComposableVoiceAgent",
    "ElevenLabsAgentAdapter",
    "ElevenLabsVoiceAgent",
    "GeminiLiveAgentAdapter",
    "LiveKitAgentAdapter",
    "OpenAIRealtimeAgentAdapter",
    "PendingTransportError",
    "PipecatAgentAdapter",
    "TwilioAgentAdapter",
    "VapiAgentAdapter",
    "WebRTCAgentAdapter",
    "WebSocketAgentAdapter",
    "WebSocketProtocol",
]

Sub-modules

scenario.voice.adapters.composable

Composable and provider-branded voice agents …

scenario.voice.adapters.elevenlabs

ElevenLabsAgentAdapter: connect to ElevenLabs Conversational AI via their WebSocket …

scenario.voice.adapters.gemini_live

GeminiLiveAgentAdapter: direct-to-model adapter for Gemini Live native-audio …

scenario.voice.adapters.livekit

LiveKitAgentAdapter: join a LiveKit room as a participant and exchange audio …

scenario.voice.adapters.openai_realtime

OpenAIRealtimeAgentAdapter: direct-to-model adapter — the model IS the agent …

scenario.voice.adapters.pipecat

PipecatAgentAdapter: WebSocket client to a user-run Pipecat bot …

scenario.voice.adapters.twilio

TwilioAgentAdapter: bidirectional real phone transport via Twilio Media Streams …

scenario.voice.adapters.vapi

VapiAgentAdapter: call Vapi's REST API to create a call, then connect to the returned websocketCallUrl …

scenario.voice.adapters.webrtc

Generic WebRTC adapter …

scenario.voice.adapters.websocket

Generic WebSocket adapter: bring-your-own protocol …

Classes

class ComposableVoiceAgent (stt: STTProvider, llm: str, tts: str, *, system_prompt: Optional[str] = None)

Locally-executed STT → LLM → TTS voice agent.

stt transcribes incoming user audio, the result is fed to llm (a litellm model string) along with conversation history, and the response is synthesised via the tts voice string using the existing synthesize() router.

Each seam is independently swappable — change any one without touching the other two. Intermediate results are surfaced on instance attributes so the scenario harness can assert on them.

Attributes

last_user_transcript
Transcript of the most-recent user audio turn.
last_llm_response
Text produced by the LLM for the most-recent turn.

Args

stt
STTProvider implementation for the user's audio.
llm
litellm-style model identifier, e.g. COMPOSABLE_VOICE_LLM_MODEL.
tts
TTS voice string in "provider/voice" format, e.g. "openai/nova" or "elevenlabs/rachel".
system_prompt
Optional system prompt seeded at turn zero so the LLM has guidance before the first user message. Defaults to a generic helpful-assistant prompt.
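
The STT → LLM → TTS flow described above can be pictured with a small stand-alone sketch. It is not the library's implementation: TinyComposableAgent and the three fake_* coroutines are hypothetical stand-ins, and each seam is modelled as a plain async callable instead of an STTProvider, a litellm model string, or a "provider/voice" string.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List, Optional, Tuple


class TinyComposableAgent:
    """Hypothetical stand-in: three swappable async seams plus observable state."""

    def __init__(
        self,
        stt: Callable[[bytes], Awaitable[str]],
        llm: Callable[[List[Dict[str, str]]], Awaitable[str]],
        tts: Callable[[str], Awaitable[bytes]],
        system_prompt: str = "You are a helpful voice assistant.",
    ) -> None:
        self.stt, self.llm, self.tts = stt, llm, tts
        self.last_user_transcript: Optional[str] = None
        self.last_llm_response: Optional[str] = None
        # Seeded so the LLM never receives an empty message list.
        self.history: List[Dict[str, str]] = [{"role": "system", "content": system_prompt}]

    async def send_audio(self, audio: bytes) -> None:
        transcript = await self.stt(audio)
        self.last_user_transcript = transcript
        self.history.append({"role": "user", "content": transcript})

    async def recv_audio(self) -> bytes:
        reply = await self.llm(self.history)
        self.last_llm_response = reply
        self.history.append({"role": "assistant", "content": reply})
        return await self.tts(reply)


async def fake_stt(audio: bytes) -> str:
    return audio.decode()  # pretend the audio bytes are already text


async def fake_llm(history: List[Dict[str, str]]) -> str:
    return "You said: " + history[-1]["content"]


async def fake_tts(text: str) -> bytes:
    return text.upper().encode()  # pretend upper-casing is speech synthesis


async def demo() -> Tuple[TinyComposableAgent, bytes]:
    agent = TinyComposableAgent(fake_stt, fake_llm, fake_tts)
    await agent.send_audio(b"hello")
    return agent, await agent.recv_audio()


agent, audio_out = asyncio.run(demo())
```

Swapping any one of the three callables leaves the other two untouched, and a test can assert on last_user_transcript and last_llm_response without decoding the returned audio.
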
class ComposableVoiceAgent(VoiceAgentAdapter):
    """
    Locally-executed STT → LLM → TTS voice agent.

    ``stt`` transcribes incoming user audio, the result is fed to ``llm``
    (a litellm model string) along with conversation history, and the response
    is synthesised via the ``tts`` voice string using the existing
    ``scenario.voice.synthesize`` router.

    Each seam is independently swappable — change any one without touching the
    other two. Intermediate results are surfaced on instance attributes so the
    scenario harness can assert on them.

    Attributes:
        last_user_transcript: Transcript of the most-recent user audio turn.
        last_llm_response: Text produced by the LLM for the most-recent turn.
    """

    capabilities: ClassVar[AdapterCapabilities] = AdapterCapabilities(
        streaming_transcripts=True,
        native_vad=False,
        dtmf=False,
        input_formats=["pcm16/24000"],
        output_formats=["pcm16/24000"],
    )

    DEFAULT_SYSTEM_PROMPT = (
        "You are a helpful voice assistant. Respond naturally and conversationally "
        "as this is an audio conversation — be concise, friendly, and clear."
    )

    def __init__(
        self,
        stt: STTProvider,
        llm: str,
        tts: str,
        *,
        system_prompt: Optional[str] = None,
    ) -> None:
        """
        Args:
            stt: STTProvider implementation for the user's audio.
            llm: litellm-style model identifier, e.g. ``COMPOSABLE_VOICE_LLM_MODEL``.
            tts: TTS voice string in ``"provider/voice"`` format,
                 e.g. ``"openai/nova"`` or ``"elevenlabs/rachel"``.
            system_prompt: Optional system prompt seeded at turn zero so the
                LLM has guidance before the first user message. Defaults to a
                generic helpful-assistant prompt.
        """
        super().__init__()
        self.stt = stt
        self.llm = llm
        self.tts = tts

        self.last_user_transcript: Optional[str] = None
        self.last_llm_response: Optional[str] = None

        # Seed history with a system prompt so the first recv_audio call (which
        # can happen before any user audio when the agent speaks first) doesn't
        # send an empty messages array to the LLM.
        self._history: List[dict] = [
            {"role": "system", "content": system_prompt or self.DEFAULT_SYSTEM_PROMPT}
        ]
        # Turn-output guard. ``recv_audio`` synthesises ONE chunk per
        # user turn. The default ``call()`` drains by re-calling
        # ``recv_audio`` until tail-silence — on this adapter that would
        # kick a second LLM call, cancelled later by timeout (wasted
        # credits + latency). The guard makes subsequent ``recv_audio``
        # calls in the same turn return an empty chunk, which the drain
        # loop interprets as end-of-stream.
        #
        # Reset boundary: ``send_audio`` (new user audio → new turn).
        # Set boundary: end of ``recv_audio`` (LLM+TTS completed).
        self._turn_output_emitted: bool = False

    def __repr__(self) -> str:
        return f"ComposableVoiceAgent(llm={self.llm!r}, tts={self.tts!r})"

    # ------------------------------------------------------------------ lifecycle

    async def connect(self) -> None:
        """No-op — no external transport to open."""

    async def disconnect(self) -> None:
        """No-op — nothing to tear down."""

    # ------------------------------------------------------------------ I/O

    async def send_audio(self, chunk: AudioChunk) -> None:
        """Transcribe the chunk via STT and store for the next recv_audio call."""
        transcript = await self.stt.transcribe(chunk)
        self.last_user_transcript = transcript
        self._history.append({"role": "user", "content": transcript})
        # New user turn → next recv_audio is allowed to synthesise.
        self._turn_output_emitted = False

    async def recv_audio(self, timeout: float) -> AudioChunk:
        """
        Run the LLM on the current history, synthesise the response via TTS,
        and return the resulting AudioChunk.

        ``timeout`` is honoured for the combined LLM+TTS call via
        ``asyncio.wait_for``. Subsequent calls in the same turn (the
        default ``call()`` drains until tail-silence) return an empty
        chunk so the drain loop exits without billing a second LLM
        round-trip — see ``_turn_output_emitted`` for the guard contract.
        """
        if self._turn_output_emitted:
            return AudioChunk(data=b"")

        import asyncio

        async def _run() -> AudioChunk:
            import litellm  # type: ignore
            from litellm.types.utils import Choices, ModelResponse
            from typing import cast as _cast

            from ..tts import synthesize

            completion = await litellm.acompletion(
                model=self.llm,
                messages=self._history,
            )
            # Non-streaming acompletion returns ModelResponse with Choices;
            # cast satisfies pyright without runtime isinstance overhead.
            completion = _cast(ModelResponse, completion)
            choice = _cast(Choices, completion.choices[0])
            response_text: str = choice.message.content or ""
            self.last_llm_response = response_text
            self._history.append({"role": "assistant", "content": response_text})

            return await synthesize(response_text, self.tts)

        chunk = await asyncio.wait_for(_run(), timeout=timeout)
        self._turn_output_emitted = True
        return chunk

Class variables

var DEFAULT_SYSTEM_PROMPT
var capabilities : ClassVar[AdapterCapabilities]

Methods

async def connect(self) ‑> None

No-op — no external transport to open.

async def connect(self) -> None:
    """No-op — no external transport to open."""
async def disconnect(self) ‑> None

No-op — nothing to tear down.

async def disconnect(self) -> None:
    """No-op — nothing to tear down."""
async def recv_audio(self, timeout: float) ‑> AudioChunk

Run the LLM on the current history, synthesise the response via TTS, and return the resulting AudioChunk.

timeout is honoured for the combined LLM+TTS call via asyncio.wait_for. Subsequent calls in the same turn (the default call() drains until tail-silence) return an empty chunk so the drain loop exits without billing a second LLM round-trip — see _turn_output_emitted for the guard contract.
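
The guard contract can be shown in isolation with a minimal sketch. GuardedAgent and drain are hypothetical stand-ins: the drain loop here simply stops on the first empty chunk, which is a simplification of the tail-silence drain that the default call() performs, and the LLM + TTS round-trip is replaced by a counter.

```python
import asyncio
from typing import List


class GuardedAgent:
    """Hypothetical stand-in that mirrors the one-chunk-per-turn guard."""

    def __init__(self) -> None:
        self.llm_calls = 0
        self._turn_output_emitted = False

    async def send_audio(self, audio: bytes) -> None:
        # New user audio starts a new turn, so the guard is reset.
        self._turn_output_emitted = False

    async def recv_audio(self) -> bytes:
        if self._turn_output_emitted:
            return b""  # empty chunk: the drain loop reads this as end-of-stream
        self.llm_calls += 1  # stands in for the billed LLM + TTS round-trip
        self._turn_output_emitted = True
        return b"\x01\x02"


async def drain(agent: GuardedAgent) -> List[bytes]:
    """Simplified drain loop: keep receiving until an empty chunk arrives."""
    chunks: List[bytes] = []
    while True:
        chunk = await agent.recv_audio()
        if not chunk:
            return chunks
        chunks.append(chunk)


async def demo() -> GuardedAgent:
    agent = GuardedAgent()
    for _ in range(2):  # two user turns
        await agent.send_audio(b"user speech")
        await drain(agent)
    return agent


guarded = asyncio.run(demo())
```

Each turn calls recv_audio twice (one real chunk, one empty chunk), yet the counter advances only once per turn.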

async def recv_audio(self, timeout: float) -> AudioChunk:
    """
    Run the LLM on the current history, synthesise the response via TTS,
    and return the resulting AudioChunk.

    ``timeout`` is honoured for the combined LLM+TTS call via
    ``asyncio.wait_for``. Subsequent calls in the same turn (the
    default ``call()`` drains until tail-silence) return an empty
    chunk so the drain loop exits without billing a second LLM
    round-trip — see ``_turn_output_emitted`` for the guard contract.
    """
    if self._turn_output_emitted:
        return AudioChunk(data=b"")

    import asyncio

    async def _run() -> AudioChunk:
        import litellm  # type: ignore
        from litellm.types.utils import Choices, ModelResponse
        from typing import cast as _cast

        from ..tts import synthesize

        completion = await litellm.acompletion(
            model=self.llm,
            messages=self._history,
        )
        # Non-streaming acompletion returns ModelResponse with Choices;
        # cast satisfies pyright without runtime isinstance overhead.
        completion = _cast(ModelResponse, completion)
        choice = _cast(Choices, completion.choices[0])
        response_text: str = choice.message.content or ""
        self.last_llm_response = response_text
        self._history.append({"role": "assistant", "content": response_text})

        return await synthesize(response_text, self.tts)

    chunk = await asyncio.wait_for(_run(), timeout=timeout)
    self._turn_output_emitted = True
    return chunk
async def send_audio(self, chunk: AudioChunk) ‑> None

Transcribe the chunk via STT and store for the next recv_audio call.

async def send_audio(self, chunk: AudioChunk) -> None:
    """Transcribe the chunk via STT and store for the next recv_audio call."""
    transcript = await self.stt.transcribe(chunk)
    self.last_user_transcript = transcript
    self._history.append({"role": "user", "content": transcript})
    # New user turn → next recv_audio is allowed to synthesise.
    self._turn_output_emitted = False

class ElevenLabsAgentAdapter (agent_id: str, api_key: str, *, system_prompt_override: Optional[str] = None, first_message_override: Optional[str] = None)

ElevenLabs hosted Conversational AI adapter.

Connects to ElevenLabs' hosted endpoint where the STT→LLM→TTS loop runs on their infrastructure. All audio is PCM16 @ 24kHz mono — no conversion needed at either edge.

Not to be confused with ElevenLabsVoiceAgent (in scenario.voice.adapters.composable), which is the typed composable preset that runs locally with separate STT, LLM, and TTS providers. The two complement each other:

  • ElevenLabsAgentAdapter (this class): black-box hosted EL ConvAI; you provide an agent_id provisioned in the EL dashboard and EL runs the whole pipeline server-side.
  • ElevenLabsVoiceAgent: composes ElevenLabsSTTProvider + any LLM + ElevenLabs TTS on your side; you control the prompts, model choice, and tool calls.

Intermediate transcripts are tracked on last_user_transcript and last_agent_transcript for scenario observability.

Example:

adapter = ElevenLabsAgentAdapter(agent_id="abc123", api_key="sk-...")
async with adapter:
    # scenario.run() feeds send_audio / recv_audio here.
    ...
class ElevenLabsAgentAdapter(VoiceAgentAdapter):
    """
    ElevenLabs **hosted** Conversational AI adapter.

    Connects to ElevenLabs' hosted endpoint where the STT→LLM→TTS loop runs
    on their infrastructure. All audio is PCM16 @ 24kHz mono — no conversion
    needed at either edge.

    Not to be confused with :class:`ElevenLabsVoiceAgent` (in
    ``scenario.voice.adapters.composable``), which is the typed composable
    preset that runs locally with separate STT, LLM, and TTS providers. The
    two complement each other:

    - ``ElevenLabsAgentAdapter`` (this class): black-box hosted EL ConvAI;
      you provide an ``agent_id`` provisioned in the EL dashboard and EL
      runs the whole pipeline server-side.
    - :class:`ElevenLabsVoiceAgent`: composes ``ElevenLabsSTTProvider`` +
      any LLM + ElevenLabs TTS on your side; you control the prompts,
      model choice, and tool calls.

    Intermediate transcripts are tracked on ``last_user_transcript`` and
    ``last_agent_transcript`` for scenario observability.

    Example::

        adapter = ElevenLabsAgentAdapter(agent_id="abc123", api_key="sk-...")
        async with adapter:
            # scenario.run() feeds send_audio / recv_audio here.
            ...
    """

    capabilities: ClassVar[AdapterCapabilities] = AdapterCapabilities(
        streaming_transcripts=True,
        native_vad=True,
        dtmf=False,
        input_formats=["pcm16/24000"],
        output_formats=["pcm16/24000"],
    )

    def __init__(
        self,
        agent_id: str,
        api_key: str,
        *,
        system_prompt_override: Optional[str] = None,
        first_message_override: Optional[str] = None,
    ) -> None:
        super().__init__()
        self.agent_id = agent_id
        self.api_key = api_key
        # Per-session overrides applied via conversation_initiation_client_data
        # at the start of every WS connect. Used by demos that need a
        # different prompt shape (e.g. verbose for interrupt demos) without
        # mutating the shared test agent's persistent config.
        self._system_prompt_override = system_prompt_override
        self._first_message_override = first_message_override
        self._ws: Any = None

        # Transcript observability — updated on each transcript event.
        self.last_user_transcript: Optional[str] = None
        self.last_agent_transcript: Optional[str] = None

    @property
    def url(self) -> str:
        return CONVAI_URL_TEMPLATE.format(agent_id=self.agent_id)

    def __repr__(self) -> str:  # redact credentials
        return f"ElevenLabsAgentAdapter(agent_id={self.agent_id!r}, api_key='***')"

    # ------------------------------------------------------------------ lifecycle

    async def connect(self) -> None:
        """Open the WebSocket to ElevenLabs' ConvAI endpoint.

        We send ``conversation_initiation_client_data`` on every connect.
        The EL docs neither require nor forbid this (the reference SDK
        sample sends it unconditionally with an empty body); empirically
        we've seen ``first_message`` skipped on bare connects and reliably
        fire when the init message is sent, even with an empty override
        block. If EL's behavior changes, this is the first thing to
        revisit.
        """
        import websockets

        self._ws = await websockets.connect(
            self.url,
            additional_headers={"xi-api-key": self.api_key},
        )
        logger.debug("ElevenLabsAgentAdapter: connected to %s", self.url)

        agent_override: dict[str, Any] = {}
        if self._system_prompt_override:
            agent_override["prompt"] = {"prompt": self._system_prompt_override}
        if self._first_message_override:
            agent_override["first_message"] = self._first_message_override

        init = {
            "type": "conversation_initiation_client_data",
            "conversation_config_override": {"agent": agent_override},
        }
        await self._ws.send(json.dumps(init))
        logger.debug(
            "ElevenLabsAgentAdapter: sent conversation_initiation_client_data with overrides=%s",
            list(agent_override.keys()) or "none",
        )

    async def disconnect(self) -> None:
        """Close the WebSocket if open."""
        if self._ws is not None:
            try:
                await self._ws.close()
            except Exception:
                # Best-effort: connection may already be half-closed or
                # in an error state when disconnect() is called. We're
                # tearing down regardless — propagating here would just
                # leak the WS reference.
                pass
            finally:
                self._ws = None
            logger.debug("ElevenLabsAgentAdapter: disconnected")

    # ------------------------------------------------------------------ I/O

    async def send_audio(self, chunk: AudioChunk) -> None:
        """Send a PCM16 audio chunk encoded as base64 in a JSON message.

        Empirically, EL ConvAI stops responding to subsequent turns if
        the client sends only a single chunk and never signals end of
        turn. The EL docs document no client-driven end-of-turn signal
        (server-side VAD is supposed to handle it) but in practice the
        VAD only fires after enough silence has been observed. We
        append a fixed-size tail of zero-bytes after every chunk to
        provide that silence signal.

        Tail size: 16000 zero bytes — empirically the sweet spot.
        - Removing the tail entirely: EL stops responding to user
          turns after the greeting.
        - Raising it to 24000 bytes (a "true 500ms" at the provisioned
          pcm_24000 rate): EL stops responding mid-conversation, same
          stall pattern.
        - 16000 bytes at pcm_24000 = ~333ms of silence: reliable.

        If EL ever exposes an explicit end-of-turn message we should
        switch to that instead.
        """
        if self._ws is None:
            raise RuntimeError("ElevenLabsAgentAdapter: not connected")

        # 1. Speech.
        b64 = base64.b64encode(chunk.data).decode()
        await self._ws.send(json.dumps({"user_audio_chunk": b64}))

        # 2. Silence tail. See docstring for size rationale.
        silence = b"\x00" * 16000
        silence_b64 = base64.b64encode(silence).decode()
        await self._ws.send(json.dumps({"user_audio_chunk": silence_b64}))

    async def recv_audio(self, timeout: float) -> AudioChunk:
        """
        Receive the next audio chunk from ElevenLabs.

        ``timeout`` bounds **inter-message silence** — the maximum gap between
        any two received frames — NOT the total duration of the call. Every
        received frame (**keep-alive pings included**) resets the idle
        deadline, so this returns when an ``audio`` event arrives and raises
        :class:`asyncio.TimeoutError` only after ``timeout`` seconds elapse
        with **no message of any kind**. Pings are replied to inline;
        transcript events update instance attributes for observability; all
        other event types are swallowed without error.

        Design decision (issue #493 — intentional, not an oversight): because
        a received ping is treated as proof of liveness, a hosted agent that
        keeps pinging but never sends audio (e.g. a wedged tool/RAG call) will
        make this method wait **indefinitely**. There is deliberately **no
        total-duration ceiling** here — a legitimate 30s+ silent-but-pinging
        stretch must not abort the turn, which a cumulative budget would do.
        The caller's ``response_max_duration`` is checked *between*
        ``recv_audio`` calls and does **not** bound a single in-progress recv.
        (An absolute caller-side backstop for the wedged-agent case is tracked
        as a separate follow-up; it is intentionally not implemented here.)
        """
        if self._ws is None:
            raise RuntimeError("ElevenLabsAgentAdapter: not connected")

        deadline = asyncio.get_running_loop().time() + timeout
        while True:
            remaining = deadline - asyncio.get_running_loop().time()
            if remaining <= 0:
                raise asyncio.TimeoutError("ElevenLabsAgentAdapter: recv_audio timed out")

            raw = await asyncio.wait_for(self._ws.recv(), timeout=remaining)
            # A received message (ping included) proves the socket is alive, so
            # re-arm the idle deadline. Placed BEFORE json.loads so ANY frame —
            # even a non-JSON/malformed one — counts as a liveness signal.
            deadline = asyncio.get_running_loop().time() + timeout
            try:
                event = json.loads(raw) if isinstance(raw, str) else json.loads(raw.decode())
            except Exception:
                logger.debug("ElevenLabsAgentAdapter: non-JSON message, skipping")
                continue

            etype = event.get("type", "")
            logger.debug("ElevenLabsAgentAdapter: recv event %s", etype)

            if etype == "audio":
                audio_event = event.get("audio_event", {})
                b64 = audio_event.get("audio_base_64", "")
                pcm = base64.b64decode(b64)
                # Ensure even byte count (PCM16 invariant).
                if len(pcm) % 2 == 1:
                    pcm = pcm[:-1]
                return AudioChunk(data=pcm)

            elif etype == "ping":
                # Per EL docs, ping wire shape is
                #   {"type": "ping", "ping_event": {"event_id": <int>, "ping_ms": <int>}}
                # Pong must echo the event_id at the top level. The
                # fallback to top-level event_id covers any older shape.
                ping_event = event.get("ping_event") or {}
                event_id = ping_event.get("event_id")
                if event_id is None:
                    event_id = event.get("event_id")
                if event_id is None:
                    logger.debug("ElevenLabsAgentAdapter: ping with no event_id, skipping pong: %r", event)
                    continue
                pong = json.dumps({"type": "pong", "event_id": event_id})
                await self._ws.send(pong)

            elif etype == "user_transcript":
                self.last_user_transcript = event.get("user_transcription_event", {}).get("user_transcript")

            elif etype == "agent_response":
                self.last_agent_transcript = event.get("agent_response_event", {}).get("agent_response")

            elif etype == "agent_response_correction":
                # EL signals a corrected agent reply (post server-side
                # barge-in detection). The corrected text replaces the
                # last_agent_transcript so consumers see what the agent
                # ACTUALLY said after our interrupt landed, not the
                # pre-correction draft.
                #
                # Wire shape:
                #   {"type": "agent_response_correction",
                #    "agent_response_correction_event": {
                #      "original_agent_response": "...",
                #      "corrected_agent_response": "..."}}
                correction = event.get("agent_response_correction_event", {}) or {}
                corrected = correction.get("corrected_agent_response")
                if corrected:
                    self.last_agent_transcript = corrected

            elif etype == "conversation_initiation_metadata":
                # EL reports the agent's actual configured audio formats
                # here. Our adapter capabilities advertise pcm16/24000,
                # matching the test agent we provision. If a caller
                # points the adapter at an agent configured differently,
                # this is where the mismatch becomes visible — warn so
                # the codec mismatch is logged rather than silently
                # garbling audio.
                #
                # Wire shape (per docs):
                #   {"type": "conversation_initiation_metadata",
                #    "conversation_initiation_metadata_event": {
                #      "conversation_id": "...",
                #      "agent_output_audio_format": "pcm_24000",
                #      "user_input_audio_format": "pcm_24000"}}
                meta = event.get("conversation_initiation_metadata_event", {}) or {}
                out_fmt = meta.get("agent_output_audio_format")
                in_fmt = meta.get("user_input_audio_format")
                if out_fmt and out_fmt != "pcm_24000":
                    logger.warning(
                        "ElevenLabsAgentAdapter: agent_output_audio_format=%r "
                        "differs from advertised pcm16/24000 capability; "
                        "audio may pitch-shift or fail to decode.",
                        out_fmt,
                    )
                if in_fmt and in_fmt != "pcm_24000":
                    logger.warning(
                        "ElevenLabsAgentAdapter: user_input_audio_format=%r "
                        "differs from advertised pcm16/24000 capability; "
                        "the agent may not understand audio we send.",
                        in_fmt,
                    )

            elif etype == "interruption":
                pass  # documented non-audio event, no action needed

            else:
                logger.debug("ElevenLabsAgentAdapter: unknown event type %r, skipping", etype)
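
The silence-tail sizing discussed in the send_audio docstring above follows from the PCM16 mono format at 24 kHz, which is 48000 bytes per second. The sketch below redoes that arithmetic and builds the two JSON frames sent per turn. The helper names tail_duration_ms and frames_for_turn are hypothetical; the user_audio_chunk key and the 16000-byte tail are taken from the source above.

```python
import base64
import json
from typing import List

SAMPLE_RATE_HZ = 24_000  # pcm_24000
BYTES_PER_SAMPLE = 2  # PCM16, mono
SILENCE_TAIL_BYTES = 16_000


def tail_duration_ms(num_bytes: int) -> float:
    """Duration of a mono PCM16 buffer at 24 kHz, in milliseconds."""
    return num_bytes / (SAMPLE_RATE_HZ * BYTES_PER_SAMPLE) * 1000


def frames_for_turn(speech: bytes) -> List[str]:
    """Return the two JSON frames for one turn: speech first, then the silence tail."""
    silence = b"\x00" * SILENCE_TAIL_BYTES
    return [
        json.dumps({"user_audio_chunk": base64.b64encode(buf).decode()})
        for buf in (speech, silence)
    ]


frames = frames_for_turn(b"\x01\x00" * 100)
tail = base64.b64decode(json.loads(frames[1])["user_audio_chunk"])
```

With these constants, tail_duration_ms(16000) is roughly 333 ms and tail_duration_ms(24000) is 500 ms, matching the figures quoted in the docstring.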

Class variables

var capabilities : ClassVar[AdapterCapabilities]

Instance variables

var url : str
@property
def url(self) -> str:
    return CONVAI_URL_TEMPLATE.format(agent_id=self.agent_id)

Methods

async def connect(self) ‑> None

Open the WebSocket to ElevenLabs' ConvAI endpoint.

We send conversation_initiation_client_data on every connect. The EL docs neither require nor forbid this (the reference SDK sample sends it unconditionally with an empty body); empirically we've seen first_message skipped on bare connects and reliably fire when the init message is sent, even with an empty override block. If EL's behavior changes, this is the first thing to revisit.
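
For reference, the init message can be built and inspected without opening a socket. The sketch below mirrors the payload construction in connect(); the helper name build_init_message is hypothetical, while the payload keys come from the source shown on this page.

```python
import json
from typing import Any, Dict, Optional


def build_init_message(
    system_prompt_override: Optional[str] = None,
    first_message_override: Optional[str] = None,
) -> str:
    """Build the JSON text sent right after the WebSocket opens."""
    agent_override: Dict[str, Any] = {}
    if system_prompt_override:
        agent_override["prompt"] = {"prompt": system_prompt_override}
    if first_message_override:
        agent_override["first_message"] = first_message_override
    return json.dumps(
        {
            "type": "conversation_initiation_client_data",
            "conversation_config_override": {"agent": agent_override},
        }
    )


# No overrides: the message is still sent, with an empty override block.
bare = json.loads(build_init_message())

# One override: only the key that was provided appears.
custom = json.loads(build_init_message(first_message_override="Hi, how can I help?"))
```

Keeping the construction in a pure function like this makes the "empty override block" case easy to cover in a unit test.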

async def connect(self) -> None:
    """Open the WebSocket to ElevenLabs' ConvAI endpoint.

    We send ``conversation_initiation_client_data`` on every connect.
    The EL docs neither require nor forbid this (the reference SDK
    sample sends it unconditionally with an empty body); empirically
    we've seen ``first_message`` skipped on bare connects and reliably
    fire when the init message is sent, even with an empty override
    block. If EL's behavior changes, this is the first thing to
    revisit.
    """
    import websockets

    self._ws = await websockets.connect(
        self.url,
        additional_headers={"xi-api-key": self.api_key},
    )
    logger.debug("ElevenLabsAgentAdapter: connected to %s", self.url)

    agent_override: dict[str, Any] = {}
    if self._system_prompt_override:
        agent_override["prompt"] = {"prompt": self._system_prompt_override}
    if self._first_message_override:
        agent_override["first_message"] = self._first_message_override

    init = {
        "type": "conversation_initiation_client_data",
        "conversation_config_override": {"agent": agent_override},
    }
    await self._ws.send(json.dumps(init))
    logger.debug(
        "ElevenLabsAgentAdapter: sent conversation_initiation_client_data with overrides=%s",
        list(agent_override.keys()) or "none",
    )
async def disconnect(self) ‑> None

Close the WebSocket if open.

async def disconnect(self) -> None:
    """Close the WebSocket if open."""
    if self._ws is not None:
        try:
            await self._ws.close()
        except Exception:
            # Best-effort: connection may already be half-closed or
            # in an error state when disconnect() is called. We're
            # tearing down regardless — propagating here would just
            # leak the WS reference.
            pass
        finally:
            self._ws = None
        logger.debug("ElevenLabsAgentAdapter: disconnected")
async def recv_audio(self, timeout: float) ‑> AudioChunk

Receive the next audio chunk from ElevenLabs.

timeout bounds inter-message silence (the maximum gap between any two received frames), NOT the total duration of the call. Every received frame (keep-alive pings included) resets the idle deadline, so this returns when an audio event arrives and raises asyncio.TimeoutError only after timeout seconds elapse with no message of any kind. Pings are replied to inline; transcript events update instance attributes for observability; all other event types are swallowed without error.
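
The idle-deadline behaviour can be reproduced with a stand-alone sketch. It assumes an asyncio.Queue in place of the WebSocket and a plain list in place of the outgoing side; recv_audio_event and demo are hypothetical names, and the event handling is trimmed to the audio and ping cases.

```python
import asyncio
import json
from typing import List


async def recv_audio_event(inbox: "asyncio.Queue[str]", outbox: List[str], timeout: float) -> dict:
    """Return the next audio event; timeout bounds the gap between frames."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        remaining = deadline - loop.time()
        if remaining <= 0:
            raise asyncio.TimeoutError("idle timeout")
        raw = await asyncio.wait_for(inbox.get(), timeout=remaining)
        deadline = loop.time() + timeout  # any frame re-arms the idle deadline
        event = json.loads(raw)
        if event.get("type") == "audio":
            return event
        if event.get("type") == "ping":
            event_id = event["ping_event"]["event_id"]
            outbox.append(json.dumps({"type": "pong", "event_id": event_id}))
        # Every other event type is skipped.


async def demo() -> tuple:
    inbox: "asyncio.Queue[str]" = asyncio.Queue()
    outbox: List[str] = []

    async def server() -> None:
        # Five pings 0.05 s apart, then audio: about 0.3 s in total,
        # which is longer than the 0.2 s timeout used below.
        for i in range(5):
            await asyncio.sleep(0.05)
            await inbox.put(json.dumps({"type": "ping", "ping_event": {"event_id": i}}))
        await asyncio.sleep(0.05)
        await inbox.put(json.dumps({"type": "audio", "audio_event": {"audio_base_64": ""}}))

    task = asyncio.create_task(server())
    event = await recv_audio_event(inbox, outbox, timeout=0.2)
    await task

    # With nothing arriving at all, the same timeout does fire.
    timed_out = False
    try:
        await recv_audio_event(asyncio.Queue(), [], timeout=0.2)
    except asyncio.TimeoutError:
        timed_out = True
    return event, outbox, timed_out


event, outbox, timed_out = asyncio.run(demo())
```

The first call succeeds even though the audio frame arrives after more than timeout seconds overall, because each ping resets the deadline; the second call times out because no frame arrives at all.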

Design decision (issue #493 — intentional, not an oversight): because a received ping is treated as proof of liveness, a hosted agent that keeps pinging but never sends audio (e.g. a wedged tool/RAG call) will make this method wait indefinitely. There is deliberately no total-duration ceiling here — a legitimate 30s+ silent-but-pinging stretch must not abort the turn, which a cumulative budget would do. The caller's response_max_duration is checked between recv_audio calls and does not bound a single in-progress recv. (An absolute caller-side backstop for the wedged-agent case is tracked as a separate follow-up; it is intentionally not implemented here.)
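
A caller that needs an absolute bound today could wrap a single receive in its own ceiling. The sketch below shows one possible shape under stated assumptions and is not the follow-up tracked by the project: wedged_recv is a hypothetical stand-in for a recv_audio call whose idle deadline keeps being re-armed, and hard_ceiling is a hypothetical caller-owned parameter.

```python
import asyncio


async def wedged_recv(idle_timeout: float) -> bytes:
    """Hypothetical receive that never returns, like an agent that pings forever."""
    while True:
        # Each wake-up stands in for a ping that re-arms the idle deadline.
        await asyncio.sleep(idle_timeout / 2)


async def recv_with_ceiling(idle_timeout: float, hard_ceiling: float) -> bytes:
    """Bound one in-progress receive with an absolute, caller-owned ceiling."""
    return await asyncio.wait_for(wedged_recv(idle_timeout), timeout=hard_ceiling)


async def demo() -> bool:
    try:
        await recv_with_ceiling(idle_timeout=0.05, hard_ceiling=0.2)
    except asyncio.TimeoutError:
        return True
    return False


ceiling_fired = asyncio.run(demo())
```

asyncio.wait_for cancels the inner coroutine when the ceiling expires, so a ceiling shorter than a legitimate silent-but-pinging stretch would abort a healthy turn; that is the trade-off the design note above is avoiding.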

async def recv_audio(self, timeout: float) -> AudioChunk:
    """
    Receive the next audio chunk from ElevenLabs.

    ``timeout`` bounds **inter-message silence** — the maximum gap between
    any two received frames — NOT the total duration of the call. Every
    received frame (**keep-alive pings included**) resets the idle
    deadline, so this returns when an ``audio`` event arrives and raises
    :class:`asyncio.TimeoutError` only after ``timeout`` seconds elapse
    with **no message of any kind**. Pings are replied to inline;
    transcript events update instance attributes for observability; all
    other event types are swallowed without error.

    Design decision (issue #493 — intentional, not an oversight): because
    a received ping is treated as proof of liveness, a hosted agent that
    keeps pinging but never sends audio (e.g. a wedged tool/RAG call) will
    make this method wait **indefinitely**. There is deliberately **no
    total-duration ceiling** here — a legitimate 30s+ silent-but-pinging
    stretch must not abort the turn, which a cumulative budget would do.
    The caller's ``response_max_duration`` is checked *between*
    ``recv_audio`` calls and does **not** bound a single in-progress recv.
    (An absolute caller-side backstop for the wedged-agent case is tracked
    as a separate follow-up; it is intentionally not implemented here.)
    """
    if self._ws is None:
        raise RuntimeError("ElevenLabsAgentAdapter: not connected")

    deadline = asyncio.get_running_loop().time() + timeout
    while True:
        remaining = deadline - asyncio.get_running_loop().time()
        if remaining <= 0:
            raise asyncio.TimeoutError("ElevenLabsAgentAdapter: recv_audio timed out")

        raw = await asyncio.wait_for(self._ws.recv(), timeout=remaining)
        # A received message (ping included) proves the socket is alive, so
        # re-arm the idle deadline. Placed BEFORE json.loads so ANY frame —
        # even a non-JSON/malformed one — counts as a liveness signal.
        deadline = asyncio.get_running_loop().time() + timeout
        try:
            event = json.loads(raw) if isinstance(raw, str) else json.loads(raw.decode())
        except Exception:
            logger.debug("ElevenLabsAgentAdapter: non-JSON message, skipping")
            continue

        etype = event.get("type", "")
        logger.debug("ElevenLabsAgentAdapter: recv event %s", etype)

        if etype == "audio":
            audio_event = event.get("audio_event", {})
            b64 = audio_event.get("audio_base_64", "")
            pcm = base64.b64decode(b64)
            # Ensure even byte count (PCM16 invariant).
            if len(pcm) % 2 == 1:
                pcm = pcm[:-1]
            return AudioChunk(data=pcm)

        elif etype == "ping":
            # Per EL docs, ping wire shape is
            #   {"type": "ping", "ping_event": {"event_id": <int>, "ping_ms": <int>}}
            # Pong must echo the event_id at the top level. The
            # fallback to top-level event_id covers any older shape.
            ping_event = event.get("ping_event") or {}
            event_id = ping_event.get("event_id")
            if event_id is None:
                event_id = event.get("event_id")
            if event_id is None:
                logger.debug("ElevenLabsAgentAdapter: ping with no event_id, skipping pong: %r", event)
                continue
            pong = json.dumps({"type": "pong", "event_id": event_id})
            await self._ws.send(pong)

        elif etype == "user_transcript":
            self.last_user_transcript = event.get("user_transcription_event", {}).get("user_transcript")

        elif etype == "agent_response":
            self.last_agent_transcript = event.get("agent_response_event", {}).get("agent_response")

        elif etype == "agent_response_correction":
            # EL signals a corrected agent reply (post server-side
            # barge-in detection). The corrected text replaces the
            # last_agent_transcript so consumers see what the agent
            # ACTUALLY said after our interrupt landed, not the
            # pre-correction draft.
            #
            # Wire shape:
            #   {"type": "agent_response_correction",
            #    "agent_response_correction_event": {
            #      "original_agent_response": "...",
            #      "corrected_agent_response": "..."}}
            correction = event.get("agent_response_correction_event", {}) or {}
            corrected = correction.get("corrected_agent_response")
            if corrected:
                self.last_agent_transcript = corrected

        elif etype == "conversation_initiation_metadata":
            # EL reports the agent's actual configured audio formats
            # here. Our adapter capabilities advertise pcm16/24000,
            # matching the test agent we provision. If a caller
            # points the adapter at an agent configured differently,
            # this is where the mismatch becomes visible — warn so
            # the codec mismatch is logged rather than silently
            # garbling audio.
            #
            # Wire shape (per docs):
            #   {"type": "conversation_initiation_metadata",
            #    "conversation_initiation_metadata_event": {
            #      "conversation_id": "...",
            #      "agent_output_audio_format": "pcm_24000",
            #      "user_input_audio_format": "pcm_24000"}}
            meta = event.get("conversation_initiation_metadata_event", {}) or {}
            out_fmt = meta.get("agent_output_audio_format")
            in_fmt = meta.get("user_input_audio_format")
            if out_fmt and out_fmt != "pcm_24000":
                logger.warning(
                    "ElevenLabsAgentAdapter: agent_output_audio_format=%r "
                    "differs from advertised pcm16/24000 capability; "
                    "audio may pitch-shift or fail to decode.",
                    out_fmt,
                )
            if in_fmt and in_fmt != "pcm_24000":
                logger.warning(
                    "ElevenLabsAgentAdapter: user_input_audio_format=%r "
                    "differs from advertised pcm16/24000 capability; "
                    "the agent may not understand audio we send.",
                    in_fmt,
                )

        elif etype == "interruption":
            pass  # documented non-audio event, no action needed

        else:
            logger.debug("ElevenLabsAgentAdapter: unknown event type %r, skipping", etype)
async def send_audio(self, chunk: AudioChunk) -> None

Send a PCM16 audio chunk encoded as base64 in a JSON message.

Empirically, EL ConvAI stops responding to subsequent turns if the client sends only a single chunk and never signals end of turn. The EL docs document no client-driven end-of-turn signal (server-side VAD is supposed to handle it) but in practice the VAD only fires after enough silence has been observed. We append a fixed-size tail of zero-bytes after every chunk to provide that silence signal.

Tail size: 16000 zero bytes, empirically the sweet spot.

- Removing the tail entirely: EL stops responding to user turns after the greeting.
- Raising it to 24000 bytes (a "true 500ms" at the provisioned pcm_24000 rate): EL stops responding mid-conversation, same stall pattern.
- 16000 bytes at pcm_24000 = ~333ms of silence: reliable.

If EL ever exposes an explicit end-of-turn message we should switch to that instead.
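The durations quoted above follow from PCM16 arithmetic: two bytes per sample, so bytes per second = sample rate × channels × 2. A small sketch, assuming mono audio (the source does not state the channel count) and a hypothetical helper name:

```python
def pcm16_duration_ms(num_bytes: int, sample_rate: int, channels: int = 1) -> float:
    """Duration in milliseconds of a PCM16 buffer (2 bytes per sample per channel)."""
    bytes_per_second = sample_rate * channels * 2
    return num_bytes / bytes_per_second * 1000


tail_ms = pcm16_duration_ms(16000, 24000)    # the 16000-byte tail at pcm_24000
longer_ms = pcm16_duration_ms(24000, 24000)  # the 24000-byte variant that stalled
```

At 24 kHz mono this gives roughly 333 ms for the 16000-byte tail and exactly 500 ms for the 24000-byte one, matching the figures in the docstring.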

async def send_audio(self, chunk: AudioChunk) -> None:
    """Send a PCM16 audio chunk encoded as base64 in a JSON message.

    Empirically, EL ConvAI stops responding to subsequent turns if
    the client sends only a single chunk and never signals end of
    turn. The EL docs document no client-driven end-of-turn signal
    (server-side VAD is supposed to handle it) but in practice the
    VAD only fires after enough silence has been observed. We
    append a fixed-size tail of zero-bytes after every chunk to
    provide that silence signal.

    Tail size: 16000 zero bytes — empirically the sweet spot.
    - Removing the tail entirely: EL stops responding to user
      turns after the greeting.
    - Raising it to 24000 bytes (a "true 500ms" at the provisioned
      pcm_24000 rate): EL stops responding mid-conversation, same
      stall pattern.
    - 16000 bytes at pcm_24000 = ~333ms of silence: reliable.

    If EL ever exposes an explicit end-of-turn message we should
    switch to that instead.
    """
    if self._ws is None:
        raise RuntimeError("ElevenLabsAgentAdapter: not connected")

    # 1. Speech.
    b64 = base64.b64encode(chunk.data).decode()
    await self._ws.send(json.dumps({"user_audio_chunk": b64}))

    # 2. Silence tail. See docstring for size rationale.
    silence = b"\x00" * 16000
    silence_b64 = base64.b64encode(silence).decode()
    await self._ws.send(json.dumps({"user_audio_chunk": silence_b64}))

Inherited members

class ElevenLabsVoiceAgent (api_key: str, *, llm: str = 'openai/gpt-5.4-mini', voice: Optional[str] = None, stt: Optional[STTProvider] = None, system_prompt: Optional[str] = None)

Composable voice agent with ElevenLabs-opinionated defaults.

Not to be confused with ElevenLabsAgentAdapter (in scenario.voice.adapters.elevenlabs), which talks to ElevenLabs' hosted Conversational AI endpoint where EL runs the full STT→LLM→TTS loop. This class is local: you compose ElevenLabsSTTProvider + any LLM + ElevenLabs TTS yourself, keeping full control over prompts, model choice, and tool calls.

Instantiate with just an api_key to get an ElevenLabs STT + LLM (default COMPOSABLE_VOICE_LLM_MODEL) + ElevenLabs TTS stack (voice taken from ELEVENLABS_VOICE_ID, otherwise the premade "Sarah" voice). Each piece can be overridden independently without changing the others.

Example::

# Defaults: ElevenLabs STT, default LLM (COMPOSABLE_VOICE_LLM_MODEL), ElevenLabs TTS
agent = ElevenLabsVoiceAgent(api_key="sk-...")

# Override just the LLM
agent = ElevenLabsVoiceAgent(api_key="sk-...", llm="openai/gpt-4o")

# Bring your own STT
agent = ElevenLabsVoiceAgent(api_key="sk-...", stt=MyCustomSTT())

Args

api_key
ElevenLabs API key. Redacted in __repr__.
llm
litellm-style model identifier. Defaults to COMPOSABLE_VOICE_LLM_MODEL.
voice
TTS voice string in "elevenlabs/<voice_id>" format. Defaults to the ELEVENLABS_VOICE_ID environment variable when set, otherwise falls back to "Sarah" ("elevenlabs/EXAVITQu4vr4xnSDxMaL") — premade and accessible on the ElevenLabs free tier as of 2026-05. Other premade voices (e.g. "Rachel" 21m00Tcm4TlvDq8ikWAM) returned 402 paid_plan_required from the EL TTS API; gating differs per voice. Set ELEVENLABS_VOICE_ID to override.
stt
STTProvider override. Defaults to ElevenLabsSTTProvider(api_key=api_key).
system_prompt
Optional system prompt. Defaults to ComposableVoiceAgent.DEFAULT_SYSTEM_PROMPT.
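The voice precedence described in Args (explicit argument, then ELEVENLABS_VOICE_ID, then the "Sarah" fallback) can be sketched as a pure function. resolve_voice and DEFAULT_VOICE are hypothetical names for illustration; the constructor does this inline against os.environ.

```python
from typing import Mapping, Optional

# "Sarah", the fallback voice id named in the Args above.
DEFAULT_VOICE = "elevenlabs/EXAVITQu4vr4xnSDxMaL"


def resolve_voice(voice: Optional[str], env: Mapping[str, str]) -> str:
    """Pick the TTS voice: explicit argument > ELEVENLABS_VOICE_ID > default."""
    if voice is not None:
        return voice
    env_voice_id = env.get("ELEVENLABS_VOICE_ID")
    # An unset or empty variable falls through to the default.
    return f"elevenlabs/{env_voice_id}" if env_voice_id else DEFAULT_VOICE


from_default = resolve_voice(None, {})
from_env = resolve_voice(None, {"ELEVENLABS_VOICE_ID": "abc123"})
from_arg = resolve_voice("elevenlabs/custom", {"ELEVENLABS_VOICE_ID": "abc123"})
```

Note that the environment variable holds a bare voice id, while an explicit voice argument must already carry the "elevenlabs/" prefix.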
class ElevenLabsVoiceAgent(ComposableVoiceAgent):
    """
    Composable voice agent with ElevenLabs-opinionated defaults.

    Not to be confused with :class:`ElevenLabsAgentAdapter` (in
    ``scenario.voice.adapters.elevenlabs``) — that one talks to ElevenLabs'
    **hosted** Conversational AI endpoint where EL runs the full
    STT→LLM→TTS loop. This class is local: you compose ``ElevenLabsSTTProvider``
    + any LLM + ElevenLabs TTS yourself, keeping full control over prompts,
    model choice, and tool calls.

    Instantiate with just an ``api_key`` to get an ElevenLabs STT +
    LLM (default ``COMPOSABLE_VOICE_LLM_MODEL``) + ElevenLabs TTS stack
    (voice taken from ``ELEVENLABS_VOICE_ID``, otherwise the premade
    "Sarah" voice). Each piece can be overridden independently without
    changing the others.

    Example::

        # Defaults: ElevenLabs STT, default LLM (COMPOSABLE_VOICE_LLM_MODEL), ElevenLabs TTS
        agent = ElevenLabsVoiceAgent(api_key="sk-...")

        # Override just the LLM
        agent = ElevenLabsVoiceAgent(api_key="sk-...", llm="openai/gpt-4o")

        # Bring your own STT
        agent = ElevenLabsVoiceAgent(api_key="sk-...", stt=MyCustomSTT())
    """

    def __init__(
        self,
        api_key: str,
        *,
        llm: str = COMPOSABLE_VOICE_LLM_MODEL,
        voice: Optional[str] = None,
        stt: Optional[STTProvider] = None,
        system_prompt: Optional[str] = None,
    ) -> None:
        """
        Args:
            api_key: ElevenLabs API key. Redacted in ``__repr__``.
            llm: litellm-style model identifier. Defaults to
                ``COMPOSABLE_VOICE_LLM_MODEL``.
            voice: TTS voice string in ``"elevenlabs/<voice_id>"`` format.
                Defaults to the ``ELEVENLABS_VOICE_ID`` environment variable
                when set, otherwise falls back to "Sarah"
                (``"elevenlabs/EXAVITQu4vr4xnSDxMaL"``) — premade and
                accessible on the ElevenLabs free tier as of 2026-05.
                Other premade voices (e.g. "Rachel"
                ``21m00Tcm4TlvDq8ikWAM``) returned 402 paid_plan_required
                from the EL TTS API; gating differs per voice.  Set
                ``ELEVENLABS_VOICE_ID`` to override.
            stt: STTProvider override. Defaults to
                ``ElevenLabsSTTProvider(api_key=api_key)``.
            system_prompt: Optional system prompt. Defaults to
                ``ComposableVoiceAgent.DEFAULT_SYSTEM_PROMPT``.
        """
        import os

        if voice is None:
            env_voice_id = os.environ.get("ELEVENLABS_VOICE_ID")
            voice = (
                f"elevenlabs/{env_voice_id}"
                if env_voice_id
                else "elevenlabs/EXAVITQu4vr4xnSDxMaL"  # "Sarah" — free-tier premade
            )
        resolved_stt = stt if stt is not None else ElevenLabsSTTProvider(api_key=api_key)
        super().__init__(stt=resolved_stt, llm=llm, tts=voice, system_prompt=system_prompt)
        self._api_key = api_key
        self.voice = voice

    def __repr__(self) -> str:  # redact credentials
        return (
            f"ElevenLabsVoiceAgent("
            f"api_key='***', llm={self.llm!r}, voice={self.voice!r})"
        )

Ancestors

Inherited members

class GeminiLiveAgentAdapter (model: str = 'gemini-2.5-flash-native-audio-latest', voice: str = 'Algieba', system_instruction: str = '', api_key: Optional[str] = None)

Gemini Live native-audio adapter.

Connects directly to the Gemini Live API via the official google-genai SDK. STT, LLM, and TTS all run on Google's infrastructure; audio flows bidirectionally as raw PCM16.

Example::

adapter = GeminiLiveAgentAdapter(
    model=GEMINI_LIVE_MODEL,
    system_instruction="You are a helpful assistant.",
)
async with adapter:
    ...  # scenario.run() feeds send_audio / recv_audio here

Attributes

last_agent_transcript
Most-recent output transcript received from the server (if transcription is available), for observability.
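The source for this class resamples the framework's canonical 24 kHz audio to Gemini's 16 kHz input format through a helper, _resample_pcm16, whose body is not shown in this section. As a rough idea of what such a conversion involves, here is a minimal linear-interpolation sketch for mono little-endian PCM16. It is a stand-in under those assumptions (no anti-alias filtering, hypothetical function name), not the library's implementation.

```python
import struct


def resample_pcm16_linear(pcm: bytes, src_rate: int, dst_rate: int) -> bytes:
    """Resample mono little-endian PCM16 by linear interpolation (illustrative)."""
    # Drop a trailing odd byte so the buffer holds whole 16-bit samples.
    pcm = pcm[: len(pcm) - (len(pcm) % 2)]
    n_in = len(pcm) // 2
    if n_in == 0 or src_rate == dst_rate:
        return pcm
    samples = struct.unpack(f"<{n_in}h", pcm)
    n_out = n_in * dst_rate // src_rate
    out = []
    for i in range(n_out):
        pos = i * src_rate / dst_rate    # fractional position in the input
        left = int(pos)
        right = min(left + 1, n_in - 1)  # clamp at the last input sample
        frac = pos - left
        out.append(round(samples[left] * (1 - frac) + samples[right] * frac))
    return struct.pack(f"<{n_out}h", *out)


# Six samples at 24 kHz become four samples at 16 kHz.
source = struct.pack("<6h", 0, 300, 600, 900, 1200, 1500)
resampled = struct.unpack("<4h", resample_pcm16_linear(source, 24000, 16000))
```

Interpolated values always lie between two valid int16 samples, so the output cannot overflow; a production resampler would normally low-pass filter before downsampling.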
class GeminiLiveAgentAdapter(VoiceAgentAdapter):
    """
    Gemini Live native-audio adapter.

    Connects directly to the Gemini Live API via the official ``google-genai``
    SDK.  STT, LLM, and TTS all run on Google's infrastructure; audio flows
    bidirectionally as raw PCM16.

    Example::

        adapter = GeminiLiveAgentAdapter(
            model=GEMINI_LIVE_MODEL,
            system_instruction="You are a helpful assistant.",
        )
        async with adapter:
            ...  # scenario.run() feeds send_audio / recv_audio here

    Attributes:
        last_agent_transcript: Most-recent output transcript received from
            the server (if transcription is available), for observability.
    """

    capabilities: ClassVar[AdapterCapabilities] = AdapterCapabilities(
        streaming_transcripts=True,
        native_vad=True,
        dtmf=False,
        # ``interruption=True``: with explicit Activity markers and
        # ``activity_handling=START_OF_ACTIVITY_INTERRUPTS`` (see
        # ``connect``), the next ``activity_start`` we send while the
        # model is replying causes Gemini to cut its in-flight audio.
        # ``interrupt()`` itself just drains stale chunks out of the
        # local queue so the recovery agent turn doesn't replay them.
        interruption=True,
        input_formats=["pcm16/16000"],
        output_formats=["pcm16/24000"],
    )

    def __init__(
        self,
        model: str = GEMINI_LIVE_MODEL,
        voice: str = "Algieba",
        system_instruction: str = "",
        api_key: Optional[str] = None,
    ) -> None:
        super().__init__()
        self.model = model
        self.voice = voice
        self.system_instruction = system_instruction
        # Resolve key: explicit arg > env var.
        self._api_key: str = api_key or os.environ.get("GEMINI_API_KEY", "")

        # Populated when the background session task is live.
        self._session: Optional[Any] = None
        self._session_task: Optional[asyncio.Task[None]] = None
        self._session_ready: Optional[asyncio.Event] = None
        self._shutdown: Optional[asyncio.Event] = None
        self._session_error: Optional[BaseException] = None

        # Cached async iterator on ``session.receive()``. Acquired lazily
        # on the first ``recv_audio`` call so we can iterate the same
        # stream across consecutive agent turns. Without caching, each
        # ``recv_audio`` would call ``session.receive()`` afresh — which
        # the SDK does not support cleanly across turns.
        self._recv_iter: Optional[Any] = None
        # Tracks whether any audio was received on the CURRENT iterator
        # (reset whenever ``_recv_iter`` is recreated). Used by
        # ``recv_audio`` to distinguish a spurious empty-interrupt turn
        # (no audio at all) from a real mid-reply interrupt (audio
        # arrived before the interrupt landed).
        self._iter_had_audio: bool = False

        # Observability.
        self.last_agent_transcript: Optional[str] = None

    def __repr__(self) -> str:
        # Never leak the API key.
        masked = "***" if self._api_key else ""
        return (
            f"GeminiLiveAgentAdapter("
            f"model={self.model!r}, "
            f"voice={self.voice!r}, "
            f"api_key={masked!r})"
        )

    # ------------------------------------------------------------------ lifecycle

    async def connect(self) -> None:
        """Open a Gemini Live session.

        Spawns a background task that holds the ``async with`` SDK context open
        for the adapter's lifetime.  Returns once the session handshake is
        complete and audio can flow.
        """
        from google import genai  # type: ignore[attr-defined]  # noqa: PLC0415 — lazy import
        from google.genai import types  # noqa: PLC0415

        self._session_ready = asyncio.Event()
        self._shutdown = asyncio.Event()
        loop = asyncio.get_running_loop()
        session_future: asyncio.Future[Any] = loop.create_future()

        config = types.LiveConnectConfig(
            response_modalities=[types.Modality.AUDIO],
            system_instruction=self.system_instruction or None,
            speech_config=types.SpeechConfig(
                voice_config=types.VoiceConfig(
                    prebuilt_voice_config=types.PrebuiltVoiceConfig(
                        voice_name=self.voice,
                    )
                )
            ),
            # Disable Automatic Activity Detection. AAD requires a clean
            # trailing silence to fire its end-of-speech detector, which is
            # unreliable across the audio shapes scenario produces (TTS'd
            # user-sim audio, scripted clips, layered interruptions). With
            # AAD off we drive turn boundaries explicitly via
            # ``activity_start`` / ``activity_end`` in ``send_audio``;
            # Gemini replies the moment we close the turn instead of
            # waiting on its own VAD heuristic. Activity handling is left
            # at its default (START_OF_ACTIVITY_INTERRUPTS): when we send
            # a new ``activity_start`` while Gemini is mid-reply, the
            # model treats it as a barge-in and cuts its in-flight audio.
            #
            # Subtlety: even after ``generation_complete`` on turn N, the
            # next ``activity_start`` opening turn N+1 is still treated as
            # a barge-in on the just-completed turn. The server emits a
            # spurious ``interrupted → turn_complete`` pair (with no model
            # output) BEFORE actually producing turn N+1's reply. The
            # ``recv_audio`` loop transparently skips that empty pair and
            # re-enters ``session.receive()`` to read the real reply.
            realtime_input_config=types.RealtimeInputConfig(
                automatic_activity_detection=types.AutomaticActivityDetection(
                    disabled=True,
                ),
            ),
            # Enable transcripts so the recv loop can populate
            # last_agent_transcript / chunk.transcript. Without these,
            # audio still flows but consumers (judge, manifest) get
            # no readable text.
            input_audio_transcription=types.AudioTranscriptionConfig(),
            output_audio_transcription=types.AudioTranscriptionConfig(),
        )

        client = genai.Client(api_key=self._api_key)

        async def _session_lifetime() -> None:
            """Hold the SDK context manager open; expose session via future."""
            try:
                async with client.aio.live.connect(
                    model=self.model, config=config
                ) as session:
                    if not session_future.done():
                        session_future.set_result(session)
                    assert self._session_ready is not None
                    self._session_ready.set()
                    # Stay alive until disconnect() fires the shutdown event.
                    assert self._shutdown is not None
                    await self._shutdown.wait()
            except Exception as exc:
                self._session_error = exc
                if not session_future.done():
                    session_future.set_exception(exc)
                assert self._session_ready is not None
                self._session_ready.set()  # unblock connect() even on error

        self._session_task = asyncio.create_task(_session_lifetime())

        # Wait until the session is ready (or errored).
        assert self._session_ready is not None
        await self._session_ready.wait()

        if self._session_error is not None:
            raise self._session_error

        self._session = await session_future
        self._recv_iter = None
        logger.debug("GeminiLiveAgentAdapter: connected model=%s", self.model)

    async def disconnect(self) -> None:
        """Close the Gemini Live session."""
        if self._recv_iter is not None:
            try:
                await self._recv_iter.aclose()  # type: ignore[attr-defined]
            except Exception:
                # Best-effort teardown: the iterator may already be
                # closed or in an invalid state during shutdown. Any
                # exception here is non-actionable since we're tearing
                # down anyway.
                pass
            self._recv_iter = None
        if self._shutdown is not None:
            self._shutdown.set()
        if self._session_task is not None:
            try:
                await asyncio.wait_for(self._session_task, timeout=5.0)
            except (asyncio.TimeoutError, Exception):
                # Timeout: task didn't finish in 5s — proceed with
                # teardown anyway, can't block disconnect indefinitely.
                # Other Exception: task error during shutdown is
                # non-actionable; we're discarding the session.
                pass
        self._session = None
        self._session_task = None
        self._session_ready = None
        self._shutdown = None
        self._session_error = None
        logger.debug("GeminiLiveAgentAdapter: disconnected")

    # ------------------------------------------------------------------ I/O

    async def send_audio(self, chunk: AudioChunk) -> None:
        """Send a canonical 24kHz AudioChunk to Gemini Live as a complete turn.

        Resamples from 24kHz → 16kHz at the wire boundary so the adapter
        speaks Gemini's expected ``audio/pcm;rate=16000`` format while the rest
        of the framework stays at the canonical 24kHz.

        Wraps the audio in explicit ``activity_start`` / ``activity_end``
        markers because we connect with Automatic Activity Detection
        disabled (see ``connect``). Each ``send_audio`` call is therefore a
        complete user turn from Gemini's perspective: it triggers the
        model to reply immediately on ``activity_end`` instead of waiting
        on its own VAD heuristic to detect end-of-speech. This is critical
        for the interrupt path — when the user barges in, we send a fresh
        turn boundary on top of the agent's in-flight reply, which Gemini
        treats as a deterministic interruption signal.
        """
        if self._session is None:
            raise RuntimeError("GeminiLiveAgentAdapter: not connected")
        from google.genai import types  # noqa: PLC0415

        pcm_16k = _resample_pcm16(chunk.data, CANONICAL_RATE, GEMINI_INPUT_RATE)
        if not pcm_16k:
            return
        # New user turn → reset transcript and the per-turn receive
        # iterator so the next ``recv_audio`` enters
        # ``session.receive()`` fresh for this turn.
        self._reset_turn_transcript()
        if self._recv_iter is not None:
            try:
                await self._recv_iter.aclose()  # type: ignore[attr-defined]
            except Exception:
                # Best-effort: prior turn's receive iterator may already be
                # closed or in an error state. We're resetting to start a new
                # turn — propagating here would block legitimate new turns.
                pass
            self._recv_iter = None
        await self._session.send_realtime_input(activity_start=types.ActivityStart())
        blob = types.Blob(
            data=pcm_16k,
            mime_type="audio/pcm;rate=16000",
        )
        await self._session.send_realtime_input(audio=blob)
        await self._session.send_realtime_input(activity_end=types.ActivityEnd())

    async def recv_audio(self, timeout: float) -> AudioChunk:
        """Receive the next audio fragment from Gemini Live for the current turn.

        The SDK's ``session.receive()`` async generator yields messages
        for ONE model turn then stops at ``turn_complete``. We cache the
        per-turn iterator on ``self._recv_iter`` and reset it when the
        previous turn ended (StopAsyncIteration), so each user turn
        sent via ``send_audio`` can read its full reply across multiple
        ``recv_audio`` calls without us re-entering ``session.receive()``
        mid-turn (which would skip messages already buffered server-side).

        Returns the next non-empty audio chunk as soon as it arrives
        so the executor's ``_drain_agent_response`` can set
        ``_agent_speaking_event`` early — the interruption path depends
        on this.

        On ``turn_complete`` returns an empty AudioChunk so the drain
        loop's tail-silence path exits.

        Raises ``asyncio.TimeoutError`` if no chunk arrives within
        ``timeout`` seconds.
        """
        if self._session is None:
            raise RuntimeError("GeminiLiveAgentAdapter: not connected")
        if self._recv_iter is None:
            self._recv_iter = self._session.receive().__aiter__()  # type: ignore[union-attr]
            self._iter_had_audio = False

        async def _next_chunk() -> AudioChunk:
            pending_delta = ""
            # Local-to-call: detects the spurious empty-interrupt turn
            # pattern (server emits ``interrupted=True`` then
            # ``turn_complete=True`` with no audio at all when a fresh
            # ``activity_start`` arrives during turn N's post-
            # ``generation_complete`` playback delay). Combined with
            # ``self._iter_had_audio`` (iterator-scope) we can tell the
            # difference between a spurious turn (no audio ever, on this
            # iterator) and a real mid-reply interrupt (audio arrived
            # earlier on this iterator).
            saw_interrupted = False
            while True:
                try:
                    assert self._recv_iter is not None
                    message = await self._recv_iter.__anext__()  # type: ignore[union-attr]
                except StopAsyncIteration:
                    # The previous turn ended (turn_complete already
                    # consumed). Surface end-of-turn to the drain loop
                    # and reset the iterator so the next user turn
                    # can re-enter session.receive() afresh.
                    self._recv_iter = None
                    return AudioChunk(
                        data=b"",
                        transcript=pending_delta or None,
                    )

                if message.go_away is not None:
                    raise RuntimeError(
                        f"GeminiLiveAgentAdapter: server sent go_away: {message.go_away}"
                    )

                sc = message.server_content
                if sc is None:
                    continue

                if getattr(sc, "interrupted", None):
                    saw_interrupted = True

                if sc.output_transcription is not None:
                    transcript_text = getattr(sc.output_transcription, "text", None)
                    if transcript_text:
                        pending_delta += transcript_text
                        existing = self.last_agent_transcript or ""
                        self.last_agent_transcript = existing + transcript_text

                if sc.model_turn is not None and sc.model_turn.parts:
                    audio_bytes = b""
                    for part in sc.model_turn.parts:
                        if part.inline_data is not None and part.inline_data.data:
                            audio_bytes += part.inline_data.data
                    if audio_bytes:
                        if len(audio_bytes) % 2 == 1:
                            audio_bytes = audio_bytes[:-1]
                        if audio_bytes:
                            self._iter_had_audio = True
                            return AudioChunk(
                                data=audio_bytes,
                                transcript=pending_delta or None,
                            )

                if sc.turn_complete:
                    # Spurious empty-interrupt turn? When activity_start
                    # opens turn N+1 after turn N's generation_complete,
                    # the server emits ``interrupted → turn_complete`` with
                    # no audio FIRST, then the real reply in a separate
                    # turn. Detect that pattern (saw interrupted=True, no
                    # audio on THIS iterator, no transcript) and re-enter
                    # ``session.receive()`` to read the actual reply.
                    #
                    # We gate on ``self._iter_had_audio`` (iterator-scope)
                    # rather than this call's audio: a real mid-reply
                    # interrupt earlier in the same turn would have yielded
                    # audio chunks before this point, even if THIS call sees
                    # only the trailing ``interrupted → turn_complete`` pair.
                    if (
                        saw_interrupted
                        and not self._iter_had_audio
                        and not pending_delta
                    ):
                        self._recv_iter = self._session.receive().__aiter__()  # type: ignore[union-attr]
                        self._iter_had_audio = False
                        saw_interrupted = False
                        continue
                    # Real end-of-turn — yield empty AudioChunk and reset
                    # the iterator. The next ``recv_audio`` call (for the
                    # next user turn) will re-enter ``session.receive()``.
                    self._recv_iter = None
                    return AudioChunk(
                        data=b"",
                        transcript=pending_delta or None,
                    )

        return await asyncio.wait_for(_next_chunk(), timeout=timeout)

    async def interrupt(self) -> None:
        """Drain leftover chunks from the in-flight agent turn so the
        recovery agent's ``recv_audio`` doesn't pick them up as a fake
        first reply.

        On Gemini Live, when we send a fresh ``activity_start`` (the
        next ``send_audio``) while the model is mid-reply, the server
        cuts its in-flight audio AND emits ``turn_complete`` for that
        cancelled turn. ``session.receive()`` is a one-turn generator,
        so the cancelled turn's tail messages still need to be consumed
        before the next ``session.receive()`` invocation can read the
        recovery turn cleanly. ``interrupt()`` consumes them up to that
        ``turn_complete`` and resets the cached iterator.

        Best-effort: bounded by 2 seconds so a stuck stream doesn't
        block the executor's interrupt sequence.
        """
        if self._session is None or self._recv_iter is None:
            return
        try:
            async with asyncio.timeout(2.0):
                while True:
                    try:
                        message = await self._recv_iter.__anext__()  # type: ignore[union-attr]
                    except StopAsyncIteration:
                        break
                    sc = getattr(message, "server_content", None)
                    if sc is not None and sc.turn_complete:
                        break
        except asyncio.TimeoutError:
            # Bounded drain: if the server doesn't close out the turn within
            # 2s after we sent the activity_end, give up and proceed. The
            # finally block will still close the iterator.
            pass
        finally:
            try:
                await self._recv_iter.aclose()  # type: ignore[attr-defined]
            except Exception:
                # Best-effort close; the iterator may already be exhausted
                # or in an error state. Don't mask the original outcome.
                pass
            self._recv_iter = None

    def _reset_turn_transcript(self) -> None:
        """Clear the running transcript before each new agent turn.

        Called from ``send_audio`` so each turn starts fresh — otherwise
        the agent's first reply text would be permanently prefixed onto
        all subsequent turns' transcripts.
        """
        self.last_agent_transcript = None

Ancestors

Class variables

var capabilities : ClassVar[AdapterCapabilities]

Methods

async def connect(self) ‑> None

Open a Gemini Live session.

Spawns a background task that holds the async with SDK context open for the adapter's lifetime. Returns once the session handshake is complete and audio can flow.
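The lifetime pattern described above can be sketched in isolation. This is a minimal illustration, not the adapter's code: `fake_connect`, `Holder`, and `demo` are hypothetical names, and the fake context manager stands in for the SDK's live-connect context.

```python
import asyncio
from contextlib import asynccontextmanager

events = []  # records lifecycle order for illustration


@asynccontextmanager
async def fake_connect():
    """Hypothetical stand-in for the SDK's live-connect context manager."""
    events.append("open")
    try:
        yield "session"
    finally:
        events.append("close")


class Holder:
    """Keeps an ``async with`` context open in a background task."""

    def __init__(self):
        self.session = None
        self._ready = asyncio.Event()
        self._shutdown = asyncio.Event()
        self._error = None
        self._task = None

    async def connect(self):
        async def lifetime():
            try:
                async with fake_connect() as session:
                    self.session = session
                    self._ready.set()
                    # Park here until disconnect() asks us to leave the context.
                    await self._shutdown.wait()
            except Exception as exc:
                self._error = exc
                self._ready.set()  # unblock connect() on failure too

        self._task = asyncio.create_task(lifetime())
        await self._ready.wait()
        if self._error is not None:
            raise self._error

    async def disconnect(self):
        self._shutdown.set()
        if self._task is not None:
            await self._task
        self.session = None


async def demo():
    holder = Holder()  # created inside the running loop
    await holder.connect()
    events.append(f"use:{holder.session}")
    await holder.disconnect()
    return events
```

Running `asyncio.run(demo())` enters the context, uses the session, and leaves the context only when `disconnect()` fires the shutdown event.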

Expand source code
async def connect(self) -> None:
    """Open a Gemini Live session.

    Spawns a background task that holds the ``async with`` SDK context open
    for the adapter's lifetime.  Returns once the session handshake is
    complete and audio can flow.
    """
    from google import genai  # type: ignore[attr-defined]  # noqa: PLC0415 — lazy import
    from google.genai import types  # noqa: PLC0415

    self._session_ready = asyncio.Event()
    self._shutdown = asyncio.Event()
    loop = asyncio.get_running_loop()
    session_future: asyncio.Future[Any] = loop.create_future()

    config = types.LiveConnectConfig(
        response_modalities=[types.Modality.AUDIO],
        system_instruction=self.system_instruction or None,
        speech_config=types.SpeechConfig(
            voice_config=types.VoiceConfig(
                prebuilt_voice_config=types.PrebuiltVoiceConfig(
                    voice_name=self.voice,
                )
            )
        ),
        # Disable Automatic Activity Detection. AAD requires a clean
        # trailing silence to fire its end-of-speech detector, which is
        # unreliable across the audio shapes scenario produces (TTS'd
        # user-sim audio, scripted clips, layered interruptions). With
        # AAD off we drive turn boundaries explicitly via
        # ``activity_start`` / ``activity_end`` in ``send_audio``;
        # Gemini replies the moment we close the turn instead of
        # waiting on its own VAD heuristic. Activity handling is left
        # at its default (START_OF_ACTIVITY_INTERRUPTS): when we send
        # a new ``activity_start`` while Gemini is mid-reply, the
        # model treats it as a barge-in and cuts its in-flight audio.
        #
        # Subtlety: even after ``generation_complete`` on turn N, the
        # next ``activity_start`` opening turn N+1 is still treated as
        # a barge-in on the just-completed turn. The server emits a
        # spurious ``interrupted → turn_complete`` pair (with no model
        # output) BEFORE actually producing turn N+1's reply. The
        # ``recv_audio`` loop transparently skips that empty pair and
        # re-enters ``session.receive()`` to read the real reply.
        realtime_input_config=types.RealtimeInputConfig(
            automatic_activity_detection=types.AutomaticActivityDetection(
                disabled=True,
            ),
        ),
        # Enable transcripts so the recv loop can populate
        # last_agent_transcript / chunk.transcript. Without these,
        # audio still flows but consumers (judge, manifest) get
        # no readable text.
        input_audio_transcription=types.AudioTranscriptionConfig(),
        output_audio_transcription=types.AudioTranscriptionConfig(),
    )

    client = genai.Client(api_key=self._api_key)

    async def _session_lifetime() -> None:
        """Hold the SDK context manager open; expose session via future."""
        try:
            async with client.aio.live.connect(
                model=self.model, config=config
            ) as session:
                if not session_future.done():
                    session_future.set_result(session)
                assert self._session_ready is not None
                self._session_ready.set()
                # Stay alive until disconnect() fires the shutdown event.
                assert self._shutdown is not None
                await self._shutdown.wait()
        except Exception as exc:
            self._session_error = exc
            if not session_future.done():
                session_future.set_exception(exc)
            assert self._session_ready is not None
            self._session_ready.set()  # unblock connect() even on error

    self._session_task = asyncio.create_task(_session_lifetime())

    # Wait until the session is ready (or errored).
    assert self._session_ready is not None
    await self._session_ready.wait()

    if self._session_error is not None:
        raise self._session_error

    self._session = await session_future
    self._recv_iter = None
    logger.debug("GeminiLiveAgentAdapter: connected model=%s", self.model)
async def disconnect(self) ‑> None

Close the Gemini Live session.

Expand source code
async def disconnect(self) -> None:
    """Close the Gemini Live session."""
    if self._recv_iter is not None:
        try:
            await self._recv_iter.aclose()  # type: ignore[attr-defined]
        except Exception:
            # Best-effort teardown: the iterator may already be
            # closed or in an invalid state during shutdown. Any
            # exception here is non-actionable since we're tearing
            # down anyway.
            pass
        self._recv_iter = None
    if self._shutdown is not None:
        self._shutdown.set()
    if self._session_task is not None:
        try:
            await asyncio.wait_for(self._session_task, timeout=5.0)
        except Exception:  # also covers asyncio.TimeoutError
            # Timeout: task didn't finish in 5s — proceed with
            # teardown anyway, can't block disconnect indefinitely.
            # Other Exception: task error during shutdown is
            # non-actionable; we're discarding the session.
            pass
    self._session = None
    self._session_task = None
    self._session_ready = None
    self._shutdown = None
    self._session_error = None
    logger.debug("GeminiLiveAgentAdapter: disconnected")
async def interrupt(self) ‑> None

Drain leftover chunks from the in-flight agent turn so the recovery agent's recv_audio doesn't pick them up as a fake first reply.

On Gemini Live, when we send a fresh activity_start (the next send_audio) while the model is mid-reply, the server cuts its in-flight audio AND emits turn_complete for that cancelled turn. session.receive() is a one-turn generator, so the cancelled turn's tail messages still need to be consumed before the next session.receive() invocation can read the recovery turn cleanly. interrupt() consumes them up to that turn_complete and resets the cached iterator.

Best-effort: bounded by 2 seconds so a stuck stream doesn't block the executor's interrupt sequence.
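A bounded drain of this kind can be sketched as follows. The names `one_turn` and `drain_until_turn_complete` are hypothetical, messages are plain dicts rather than SDK objects, and the sketch uses `asyncio.wait_for` where the adapter source uses `asyncio.timeout`, so that it also runs on interpreters older than Python 3.11.

```python
import asyncio


async def one_turn(messages, stall=False):
    """Hypothetical one-turn stream: yields messages, optionally never ends."""
    for message in messages:
        yield message
    if stall:
        await asyncio.sleep(3600)  # simulate a server that never closes the turn


async def drain_until_turn_complete(stream, limit=2.0):
    """Consume ``stream`` up to ``turn_complete`` or until ``limit`` seconds pass.

    Returns the number of messages discarded. The iterator is always closed.
    """
    discarded = 0

    async def _drain():
        nonlocal discarded
        async for message in stream:
            discarded += 1
            if message.get("turn_complete"):
                break

    try:
        await asyncio.wait_for(_drain(), timeout=limit)
    except asyncio.TimeoutError:
        pass  # best-effort: give up on a stuck stream
    finally:
        await stream.aclose()
    return discarded
```

The drain stops at the first `turn_complete` message and leaves anything after it unread; a stalled stream costs at most `limit` seconds.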

Expand source code
async def interrupt(self) -> None:
    """Drain leftover chunks from the in-flight agent turn so the
    recovery agent's ``recv_audio`` doesn't pick them up as a fake
    first reply.

    On Gemini Live, when we send a fresh ``activity_start`` (the
    next ``send_audio``) while the model is mid-reply, the server
    cuts its in-flight audio AND emits ``turn_complete`` for that
    cancelled turn. ``session.receive()`` is a one-turn generator,
    so the cancelled turn's tail messages still need to be consumed
    before the next ``session.receive()`` invocation can read the
    recovery turn cleanly. ``interrupt()`` consumes them up to that
    ``turn_complete`` and resets the cached iterator.

    Best-effort: bounded by 2 seconds so a stuck stream doesn't
    block the executor's interrupt sequence.
    """
    if self._session is None or self._recv_iter is None:
        return
    try:
        async with asyncio.timeout(2.0):
            while True:
                try:
                    message = await self._recv_iter.__anext__()  # type: ignore[union-attr]
                except StopAsyncIteration:
                    break
                sc = getattr(message, "server_content", None)
                if sc is not None and sc.turn_complete:
                    break
    except asyncio.TimeoutError:
        # Bounded drain: if the server doesn't close out the turn within
        # 2s after we sent the activity_end, give up and proceed. The
        # finally block will still close the iterator.
        pass
    finally:
        try:
            await self._recv_iter.aclose()  # type: ignore[attr-defined]
        except Exception:
            # Best-effort close; the iterator may already be exhausted
            # or in an error state. Don't mask the original outcome.
            pass
        self._recv_iter = None
async def recv_audio(self, timeout: float) ‑> AudioChunk

Receive the next audio fragment from Gemini Live for the current turn.

The SDK's session.receive() async generator yields messages for ONE model turn then stops at turn_complete. We cache the per-turn iterator on self._recv_iter and reset it when the previous turn ended (StopAsyncIteration), so each user turn sent via send_audio can read its full reply across multiple recv_audio calls without us re-entering session.receive() mid-turn (which would skip messages already buffered server-side).

Returns the next non-empty audio chunk as soon as it arrives so the executor's _drain_agent_response can set _agent_speaking_event early — the interruption path depends on this.

On turn_complete returns an empty AudioChunk so the drain loop's tail-silence path exits.

Raises asyncio.TimeoutError if no chunk arrives within timeout seconds.
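The per-turn iterator caching can be illustrated with a toy session. `FakeSession`, `TurnReader`, and `demo` are hypothetical; the sketch assumes only what the docstring states, namely that each `receive()` call yields one turn and then stops.

```python
import asyncio


class FakeSession:
    """Hypothetical session whose receive() yields exactly one turn per call."""

    def __init__(self, turns):
        self._turns = list(turns)

    async def receive(self):
        turn = self._turns.pop(0) if self._turns else []
        for chunk in turn:
            yield chunk


class TurnReader:
    """Caches the per-turn iterator so one turn spans several recv() calls."""

    def __init__(self, session):
        self._session = session
        self._iter = None

    async def recv(self):
        if self._iter is None:
            # Enter receive() only at a turn boundary, never mid-turn.
            self._iter = self._session.receive().__aiter__()
        try:
            return await self._iter.__anext__()
        except StopAsyncIteration:
            self._iter = None  # next call starts the next turn
            return b""  # empty chunk marks end-of-turn


async def demo():
    reader = TurnReader(FakeSession([[b"a", b"b"], [b"c"]]))
    return [await reader.recv() for _ in range(5)]
```

Each empty chunk in the result marks a turn boundary; the call after it re-enters `receive()` for the next turn.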

Expand source code
async def recv_audio(self, timeout: float) -> AudioChunk:
    """Receive the next audio fragment from Gemini Live for the current turn.

    The SDK's ``session.receive()`` async generator yields messages
    for ONE model turn then stops at ``turn_complete``. We cache the
    per-turn iterator on ``self._recv_iter`` and reset it when the
    previous turn ended (StopAsyncIteration), so each user turn
    sent via ``send_audio`` can read its full reply across multiple
    ``recv_audio`` calls without us re-entering ``session.receive()``
    mid-turn (which would skip messages already buffered server-side).

    Returns the next non-empty audio chunk as soon as it arrives
    so the executor's ``_drain_agent_response`` can set
    ``_agent_speaking_event`` early — the interruption path depends
    on this.

    On ``turn_complete`` returns an empty AudioChunk so the drain
    loop's tail-silence path exits.

    Raises ``asyncio.TimeoutError`` if no chunk arrives within
    ``timeout`` seconds.
    """
    if self._session is None:
        raise RuntimeError("GeminiLiveAgentAdapter: not connected")
    if self._recv_iter is None:
        self._recv_iter = self._session.receive().__aiter__()  # type: ignore[union-attr]
        self._iter_had_audio = False

    async def _next_chunk() -> AudioChunk:
        pending_delta = ""
        # Local-to-call: detects the spurious empty-interrupt turn
        # pattern (server emits ``interrupted=True`` then
        # ``turn_complete=True`` with no audio at all when a fresh
        # ``activity_start`` arrives during turn N's post-
        # ``generation_complete`` playback delay). Combined with
        # ``self._iter_had_audio`` (iterator-scope) we can tell the
        # difference between a spurious turn (no audio ever, on this
        # iterator) and a real mid-reply interrupt (audio arrived
        # earlier on this iterator).
        saw_interrupted = False
        while True:
            try:
                assert self._recv_iter is not None
                message = await self._recv_iter.__anext__()  # type: ignore[union-attr]
            except StopAsyncIteration:
                # The previous turn ended (turn_complete already
                # consumed). Surface end-of-turn to the drain loop
                # and reset the iterator so the next user turn
                # can re-enter session.receive() afresh.
                self._recv_iter = None
                return AudioChunk(
                    data=b"",
                    transcript=pending_delta or None,
                )

            if message.go_away is not None:
                raise RuntimeError(
                    f"GeminiLiveAgentAdapter: server sent go_away: {message.go_away}"
                )

            sc = message.server_content
            if sc is None:
                continue

            if getattr(sc, "interrupted", None):
                saw_interrupted = True

            if sc.output_transcription is not None:
                transcript_text = getattr(sc.output_transcription, "text", None)
                if transcript_text:
                    pending_delta += transcript_text
                    existing = self.last_agent_transcript or ""
                    self.last_agent_transcript = existing + transcript_text

            if sc.model_turn is not None and sc.model_turn.parts:
                audio_bytes = b""
                for part in sc.model_turn.parts:
                    if part.inline_data is not None and part.inline_data.data:
                        audio_bytes += part.inline_data.data
                if audio_bytes:
                    if len(audio_bytes) % 2 == 1:
                        audio_bytes = audio_bytes[:-1]
                    if audio_bytes:
                        self._iter_had_audio = True
                        return AudioChunk(
                            data=audio_bytes,
                            transcript=pending_delta or None,
                        )

            if sc.turn_complete:
                # Spurious empty-interrupt turn? When activity_start
                # opens turn N+1 after turn N's generation_complete,
                # the server emits ``interrupted → turn_complete`` with
                # no audio FIRST, then the real reply in a separate
                # turn. Detect that pattern (saw interrupted=True, no
                # audio on THIS iterator, no transcript) and re-enter
                # ``session.receive()`` to read the actual reply.
                #
                # We gate on ``self._iter_had_audio`` (iterator-scope)
                # rather than this call's audio: a real mid-reply
                # interrupt earlier in the same turn would have yielded
                # audio chunks before this point, even if THIS call sees
                # only the trailing ``interrupted → turn_complete`` pair.
                if (
                    saw_interrupted
                    and not self._iter_had_audio
                    and not pending_delta
                ):
                    self._recv_iter = self._session.receive().__aiter__()  # type: ignore[union-attr]
                    self._iter_had_audio = False
                    saw_interrupted = False
                    continue
                # Real end-of-turn — yield empty AudioChunk and reset
                # the iterator. The next ``recv_audio`` call (for the
                # next user turn) will re-enter ``session.receive()``.
                self._recv_iter = None
                return AudioChunk(
                    data=b"",
                    transcript=pending_delta or None,
                )

    return await asyncio.wait_for(_next_chunk(), timeout=timeout)
async def send_audio(self, chunk: AudioChunk) ‑> None

Send a canonical 24kHz AudioChunk to Gemini Live as a complete turn.

Resamples from 24kHz → 16kHz at the wire boundary so the adapter speaks Gemini's expected audio/pcm;rate=16000 format while the rest of the framework stays at the canonical 24kHz.

Wraps the audio in explicit activity_start / activity_end markers because we connect with Automatic Activity Detection disabled (see connect). Each send_audio call is therefore a complete user turn from Gemini's perspective: it triggers the model to reply immediately on activity_end instead of waiting on its own VAD heuristic to detect end-of-speech. This is critical for the interrupt path — when the user barges in, we send a fresh turn boundary on top of the agent's in-flight reply, which Gemini treats as a deterministic interruption signal.
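The implementation of `_resample_pcm16` is not shown in this section. As a rough idea of what a 24kHz → 16kHz conversion involves, here is a sketch using linear interpolation over mono little-endian PCM16; the function name `resample_pcm16` and the interpolation method are assumptions, and no anti-aliasing filter is applied.

```python
import struct


def resample_pcm16(data: bytes, src_rate: int, dst_rate: int) -> bytes:
    """Resample mono little-endian PCM16 by linear interpolation.

    Illustrative only: a production resampler would low-pass filter first.
    """
    n_src = len(data) // 2  # whole 16-bit samples; a trailing odd byte is dropped
    if n_src == 0 or src_rate == dst_rate:
        return data[: n_src * 2]
    samples = struct.unpack(f"<{n_src}h", data[: n_src * 2])
    n_dst = n_src * dst_rate // src_rate
    out = []
    for i in range(n_dst):
        pos = i * src_rate / dst_rate  # fractional index into the source
        left = int(pos)
        right = min(left + 1, n_src - 1)
        frac = pos - left
        out.append(round(samples[left] * (1 - frac) + samples[right] * frac))
    return struct.pack(f"<{n_dst}h", *out)
```

Six input samples at 24kHz become four output samples at 16kHz, each read at a source position 1.5 samples apart.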

Expand source code
async def send_audio(self, chunk: AudioChunk) -> None:
    """Send a canonical 24kHz AudioChunk to Gemini Live as a complete turn.

    Resamples from 24kHz → 16kHz at the wire boundary so the adapter
    speaks Gemini's expected ``audio/pcm;rate=16000`` format while the rest
    of the framework stays at the canonical 24kHz.

    Wraps the audio in explicit ``activity_start`` / ``activity_end``
    markers because we connect with Automatic Activity Detection
    disabled (see ``connect``). Each ``send_audio`` call is therefore a
    complete user turn from Gemini's perspective: it triggers the
    model to reply immediately on ``activity_end`` instead of waiting
    on its own VAD heuristic to detect end-of-speech. This is critical
    for the interrupt path — when the user barges in, we send a fresh
    turn boundary on top of the agent's in-flight reply, which Gemini
    treats as a deterministic interruption signal.
    """
    if self._session is None:
        raise RuntimeError("GeminiLiveAgentAdapter: not connected")
    from google.genai import types  # noqa: PLC0415

    pcm_16k = _resample_pcm16(chunk.data, CANONICAL_RATE, GEMINI_INPUT_RATE)
    if not pcm_16k:
        return
    # New user turn → reset transcript and the per-turn receive
    # iterator so the next ``recv_audio`` enters
    # ``session.receive()`` fresh for this turn.
    self._reset_turn_transcript()
    if self._recv_iter is not None:
        try:
            await self._recv_iter.aclose()  # type: ignore[attr-defined]
        except Exception:
            # Best-effort: prior turn's receive iterator may already be
            # closed or in an error state. We're resetting to start a new
            # turn — propagating here would block legitimate new turns.
            pass
        self._recv_iter = None
    await self._session.send_realtime_input(activity_start=types.ActivityStart())
    blob = types.Blob(
        data=pcm_16k,
        mime_type="audio/pcm;rate=16000",
    )
    await self._session.send_realtime_input(audio=blob)
    await self._session.send_realtime_input(activity_end=types.ActivityEnd())

Inherited members

class LiveKitAgentAdapter (url: str, api_key: str, api_secret: str, room: str)

Abstract base for voice agents that exchange audio with the agent under test.

Subclasses implement connect, disconnect, send_audio, and recv_audio. The default call implementation threads audio extracted from the last incoming message through the transport and wraps the response back into an assistant message.

Attributes

capabilities
Declaration of what the adapter can and cannot do. Each concrete subclass must set this as a class attribute.
response_timeout

Seconds to wait for agent audio after sending user audio. Defaults to 60 seconds.

60 seconds covers a typical real-world STT → LLM → TTS round trip, including backoff/retry inside each provider, tool calls, and RAG lookups. If you see TimeoutError flakes against a fast LLM-only chain, you can lower this; if your agent does heavy processing (MCP round trips, multi-step tool chains), consider raising it.

Override per adapter instance after construction::

adapter = MyVoiceAdapter()
adapter.response_timeout = 90.0  # slow tool-call chain
Expand source code
class LiveKitAgentAdapter(VoiceAgentAdapter):
    capabilities: ClassVar[AdapterCapabilities] = AdapterCapabilities(
        streaming_transcripts=True,
        native_vad=True,
        dtmf=False,
        input_formats=["pcm16/48000"],
        output_formats=["pcm16/48000"],
    )

    def __init__(self, url: str, api_key: str, api_secret: str, room: str):
        super().__init__()
        self.url = url
        self.api_key = api_key
        self.api_secret = api_secret
        self.room = room
        self._room: Optional[object] = None

    def __repr__(self) -> str:  # redact credentials
        return f"LiveKitAgentAdapter(url={self.url!r}, room={self.room!r}, api_key='***', api_secret='***')"

    async def connect(self) -> None:
        self._room = object()

    async def disconnect(self) -> None:
        self._room = None

    async def send_audio(self, chunk: AudioChunk) -> None:
        if self._room is None:
            raise RuntimeError("LiveKitAgentAdapter: not connected")
        raise PendingTransportError("LiveKitAgentAdapter")

    async def recv_audio(self, timeout: float) -> AudioChunk:
        if self._room is None:
            raise RuntimeError("LiveKitAgentAdapter: not connected")
        raise PendingTransportError("LiveKitAgentAdapter")

Ancestors

Class variables

var capabilities : ClassVar[AdapterCapabilities]

Inherited members

class OpenAIRealtimeAgentAdapter (model: str = 'gpt-realtime-mini', voice: str = 'alloy', instructions: str = '', tools: Optional[List[Any]] = None, *, api_key: Optional[str] = None, role: AgentRole = AgentRole.AGENT, speaks_first: bool = False)

Exercise OpenAI's Realtime API as either the agent under test (role=AGENT, default) or as the voice-enabled user simulator (role=USER, per §7.2 L1164-1171).

When role=USER, scripted user("text") steps route text through the realtime session's text-input channel rather than triggering TTS.

Transcript observability:

- last_user_transcript: set from conversation.item.input_audio_transcription.completed
- last_agent_transcript: accumulated from response.output_audio_transcript.delta, reset on done
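The transcript bookkeeping can be sketched as a small event handler. `TranscriptTracker` is a hypothetical class, events are plain dicts, and the exact reset behaviour on the `done` event is an assumption: here the buffer is cleared so the next reply starts fresh, while the last published transcript stays readable.

```python
class TranscriptTracker:
    """Tracks user and agent transcripts from realtime-style event dicts."""

    def __init__(self):
        self.last_user_transcript = None
        self.last_agent_transcript = None
        self._agent_buf = ""

    def handle(self, event: dict) -> None:
        kind = event.get("type")
        if kind == "conversation.item.input_audio_transcription.completed":
            self.last_user_transcript = event.get("transcript")
        elif kind == "response.output_audio_transcript.delta":
            # Streaming fragment: extend the buffer and publish the running text.
            self._agent_buf += event.get("delta", "")
            self.last_agent_transcript = self._agent_buf
        elif kind == "response.output_audio_transcript.done":
            # Assumed behaviour: clear the buffer so the next reply starts fresh.
            self._agent_buf = ""
```

Feeding two deltas and a `done` event leaves `last_agent_transcript` holding the joined text; the next delta begins a new transcript.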

Example::

adapter = OpenAIRealtimeAgentAdapter(
    model=OPENAI_REALTIME_MODEL,
    voice="alloy",
    instructions="You are a helpful assistant.",
)
async with adapter:
    ...  # scenario.run() feeds send_audio / recv_audio here
Expand source code
class OpenAIRealtimeAgentAdapter(VoiceAgentAdapter):
    """
    Exercise OpenAI's Realtime API as either the agent under test
    (role=AGENT, default) or as the voice-enabled user simulator
    (role=USER, per §7.2 L1164-1171).

    When role=USER, scripted ``user("text")`` steps route text through the
    realtime session's text-input channel rather than triggering TTS.

    Transcript observability:
        - ``last_user_transcript`` — set from
          ``conversation.item.input_audio_transcription.completed``
        - ``last_agent_transcript`` — accumulated from
          ``response.output_audio_transcript.delta`` / reset on done

    Example::

        adapter = OpenAIRealtimeAgentAdapter(
            model=OPENAI_REALTIME_MODEL,
            voice="alloy",
            instructions="You are a helpful assistant.",
        )
        async with adapter:
            ...  # scenario.run() feeds send_audio / recv_audio here
    """

    capabilities: ClassVar[AdapterCapabilities] = AdapterCapabilities(
        streaming_transcripts=True,
        native_vad=True,
        dtmf=False,
        # OpenAI Realtime exposes ``response.cancel`` as a first-class
        # interrupt event — the model stops generating immediately. Mapped
        # below in ``interrupt()``.
        interruption=True,
        input_formats=["pcm16/24000"],
        output_formats=["pcm16/24000"],
    )

    def __init__(
        self,
        model: str = OPENAI_REALTIME_MODEL,
        voice: str = "alloy",
        instructions: str = "",
        tools: Optional[List[Any]] = None,
        *,
        api_key: Optional[str] = None,
        role: AgentRole = AgentRole.AGENT,
        speaks_first: bool = False,
    ):
        super().__init__()
        self.model = model
        self.voice = voice
        self.instructions = instructions
        self.tools = tools or []
        self.role = role  # type: ignore[misc]
        # Resolve API key: explicit param takes precedence over env var.
        self._api_key: str = api_key or os.environ.get("OPENAI_API_KEY", "")
        self._ws: Any = None

        # Transcript observability — updated on incoming transcript events.
        self.last_user_transcript: Optional[str] = None
        self.last_agent_transcript: Optional[str] = None

        # Accumulation buffer for streaming agent transcript deltas.
        self._agent_transcript_buf: str = ""

        # Bytes appended to input_audio_buffer since last commit. Non-zero
        # means recv_audio should commit + request a response before awaiting.
        self._pending_audio_bytes: int = 0

        # Tracks which legacy (pre-GA) event names have already triggered a
        # one-time warning, so the log isn't spammed on every audio frame.
        self._legacy_events_warned: set[str] = set()

        # --- Gap 1: agent-speaks-first support ---
        # When speaks_first=True, the adapter is configured for an agent-initiated
        # scenario where the agent must speak first without any user audio.
        self._speaks_first: bool = speaks_first

        # True while a response is in flight (set on response.created, cleared on
        # response.done / response.cancelled). Prevents double-firing response.create
        # and allows drain re-entries after a completed response to return empty
        # (clean drain exit).
        self._response_active: bool = False

        # Set to True the first time response.created is received. Guards the
        # drain re-entry short-circuit in recv_audio: the empty-chunk early-return
        # must only fire AFTER at least one response has been active and completed,
        # not on the very first recv_audio call (which would break direct-call
        # tests that don't go through notify_agent_turn).
        self._response_ever_active: bool = False

        # Per-turn signal: set by notify_agent_turn() before each agent step so
        # recv_audio knows this is a genuine agent-initiated turn (not a silent
        # proceed()/resume). Consumed (cleared) when the kick fires.
        self._agent_turn_pending: bool = speaks_first  # first turn armed if speaks_first

        # --- Issue #630: realtime function-call (tool-call) surfacing ---
        # In-progress function calls keyed by call_id. Each value is a dict
        # {"name": str|None, "arguments": str} assembled from the streaming
        # `response.function_call_arguments.delta`/`.done` events and the
        # `response.output_item.added`/`.done` (function_call item) events.
        # recv_audio() returns an AudioChunk, so it cannot carry tool calls in
        # its return value — instead the function-call branches accumulate here
        # and the overridden call() drains them into result.messages.
        self._tool_call_accumulators: dict[str, dict[str, Any]] = {}
        # Finalized OpenAI-chat tool_call dicts for the CURRENT turn, in arrival
        # order. Cleared at the start of each call() so calls never leak across
        # turns (mirrors the transcript-state reset in _drain_agent_response).
        # Shape per entry:
        #   {"id": call_id, "type": "function",
        #    "function": {"name": name, "arguments": <json-str>}}
        self._completed_tool_calls: List[dict[str, Any]] = []

    @property
    def url(self) -> str:
        return REALTIME_URL_TEMPLATE.format(model=self.model)

    def __repr__(self) -> str:  # redact credentials
        return (
            f"OpenAIRealtimeAgentAdapter("
            f"model={self.model!r}, "
            f"voice={self.voice!r}, "
            f"role={self.role!r}, "
            f"api_key='***')"
        )

    def _warn_if_legacy(self, received: str, ga_name: str) -> None:
        """Emit a one-time WARNING when a pre-GA (beta) event name is seen.

        Only fires once per distinct legacy name per adapter instance, so a
        multi-chunk audio response doesn't flood the log. The GA-named event
        itself never triggers a warning.
        """
        if received != ga_name and received not in self._legacy_events_warned:
            self._legacy_events_warned.add(received)
            logger.warning(
                "OpenAIRealtimeAgentAdapter: received legacy event %r; "
                "GA uses %r — accepting defensively",
                received,
                ga_name,
            )

    # ------------------------------------------------------------- tool calls
    # Issue #630: surface OpenAI Realtime function-call events as OpenAI-chat
    # tool_calls on the assistant message in result.messages. The Realtime wire
    # describes ONE logical call across several events
    # (`response.function_call_arguments.delta` × N → `.done`, and/or
    # `response.output_item.added`/`.done` carrying the function_call item).
    # We accumulate per call_id, then finalize into self._completed_tool_calls,
    # de-duplicating so each call_id yields exactly one tool_call (AC6).

    def _accumulate_tool_call_delta(self, call_id: str, delta: str) -> None:
        """Append a streaming arguments fragment for ``call_id``."""
        acc = self._tool_call_accumulators.setdefault(
            call_id, {"name": None, "arguments": ""}
        )
        acc["arguments"] = (acc["arguments"] or "") + (delta or "")

    def _note_tool_call_name(self, call_id: str, name: Optional[str]) -> None:
        """Record the function name for ``call_id`` (the item/added event)."""
        if not name:
            return
        acc = self._tool_call_accumulators.setdefault(
            call_id, {"name": None, "arguments": ""}
        )
        acc["name"] = name

    def _finalize_tool_call(
        self,
        call_id: Optional[str],
        *,
        name: Optional[str] = None,
        arguments: Optional[str] = None,
    ) -> None:
        """Resolve one logical function call into ``self._completed_tool_calls``.

        Idempotent on ``call_id`` (AC6): the streaming-args path and the
        output-item path both describe the SAME call, so a second finalize for
        an already-completed ``call_id`` MERGES (fills a missing name / upgrades
        to a more complete arguments string) rather than appending a duplicate.

        Degrades safely (AC7): a finalize with NO ``call_id`` is skipped with a
        DEBUG log and emits nothing. Missing arguments become ``"{}"``; a
        malformed (non-JSON) arguments string is passed through verbatim — the
        adapter never parses-and-reraises.
        """
        if not call_id:
            logger.debug(
                "OpenAIRealtimeAgentAdapter: function-call event with no "
                "call_id; skipping (AC7 degraded path)"
            )
            return

        acc = self._tool_call_accumulators.get(call_id, {})
        resolved_name = name or acc.get("name")
        # Pick the most complete arguments source: explicit (item/done) arg if
        # non-empty, else the accumulated streaming deltas, else "{}".
        candidates = [arguments, acc.get("arguments")]
        resolved_args = next(
            (c for c in candidates if c is not None and c != ""), None
        )
        if resolved_args is None:
            resolved_args = "{}"

        # De-dup / merge on call_id (AC6).
        for existing in self._completed_tool_calls:
            if existing["id"] == call_id:
                fn = existing["function"]
                if not fn.get("name") and resolved_name:
                    fn["name"] = resolved_name
                # Upgrade to a longer/more-complete arguments string if the new
                # source carries more (item arguments arriving after deltas).
                if resolved_args not in ("", "{}") and len(resolved_args) > len(
                    fn.get("arguments") or ""
                ):
                    fn["arguments"] = resolved_args
                return

        self._completed_tool_calls.append(
            {
                "id": call_id,
                "type": "function",
                "function": {
                    "name": resolved_name or "",
                    "arguments": resolved_args,
                },
            }
        )

    def notify_agent_turn(self) -> None:
        """Signal that an agent turn is about to be dispatched.

        Called by the executor before each agent step so recv_audio can fire
        a bare response.create for agent-initiated turns (where no user audio
        has been committed). This per-turn signal handles both turn 1 (opening)
        and subsequent agent turns in multi-turn scripts like [agent(), user(),
        agent()].

        Only meaningful when role=AGENT. Safe to call on every agent step —
        recv_audio consumes and clears the flag.
        """
        if self.role == AgentRole.AGENT:
            self._agent_turn_pending = True

    # ------------------------------------------------------------------ lifecycle

    async def connect(self) -> None:
        """Open the Realtime WebSocket and send the initial session.update."""
        import websockets

        self._ws = await websockets.connect(
            self.url,
            additional_headers={
                "Authorization": f"Bearer {self._api_key}",
            },
        )
        logger.debug("OpenAIRealtimeAgentAdapter: connected to %s", self.url)

        # Configure session: audio formats, voice, instructions, tools.
        # Disable server-side VAD (session.audio.input.turn_detection=None) so
        # we control turn boundaries explicitly: recv_audio sends
        # input_audio_buffer.commit + response.create once per user turn.
        session_config: dict[str, Any] = {
            "type": "realtime",
            "audio": {
                "input": {
                    "format": {"type": "audio/pcm", "rate": 24000},
                    "turn_detection": None,
                    "transcription": {"model": OPENAI_STT_MODEL},
                },
                "output": {
                    "format": {"type": "audio/pcm", "rate": 24000},
                    "voice": self.voice,
                },
            },
        }
        if self.instructions:
            session_config["instructions"] = self.instructions
        if self.tools:
            session_config["tools"] = self.tools

        await self._ws.send(
            json.dumps({"type": "session.update", "session": session_config})
        )
        logger.debug("OpenAIRealtimeAgentAdapter: session.update sent")

    async def disconnect(self) -> None:
        """Close the WebSocket if open."""
        if self._ws is not None:
            try:
                await self._ws.close()
            except Exception:
                # Best-effort: connection may already be half-closed or in an
                # error state when disconnect() is called. We're tearing down
                # regardless — propagating here would just leak the WS reference.
                pass
            finally:
                self._ws = None
            logger.debug("OpenAIRealtimeAgentAdapter: disconnected")

    # ------------------------------------------------------------------ I/O

    async def send_audio(self, chunk: AudioChunk) -> None:
        """
        Append a PCM16 audio chunk to the model's input audio buffer.

        Only emits ``input_audio_buffer.append`` — the commit + response are
        deferred to the next ``recv_audio`` call. The scenario executor may
        call ``send_audio`` many times for a single user turn (TTS streams
        audio as chunks); committing per-chunk would confuse the server with
        sub-second turn boundaries. By deferring commit to recv_audio, we
        get one server turn per user turn.
        """
        if self._ws is None:
            raise RuntimeError("OpenAIRealtimeAgentAdapter: not connected")
        b64 = base64.b64encode(chunk.data).decode()
        await self._ws.send(
            json.dumps({"type": "input_audio_buffer.append", "audio": b64})
        )
        self._pending_audio_bytes += len(chunk.data)

    async def interrupt(self) -> None:
        """Send ``response.cancel`` — the OpenAI Realtime API's first-class
        interrupt. The model stops generating audio and text immediately.
        No timing race against VAD: deterministic stop, then the next user
        turn flows normally through ``send_audio`` + ``recv_audio``.
        """
        if self._ws is None:
            raise RuntimeError("OpenAIRealtimeAgentAdapter: not connected")
        await self._ws.send(json.dumps({"type": "response.cancel"}))
        logger.debug("OpenAIRealtimeAgentAdapter: sent response.cancel (interrupt)")

    async def recv_audio(self, timeout: float) -> AudioChunk:
        """
        Commit any pending audio, request a response, and return the first
        audio chunk the model produces.

        If ``send_audio`` was called since the last ``recv_audio``, this
        method commits the buffer and emits ``response.create`` before
        awaiting the reply. Subsequent recv calls without new send calls
        just await the next audio delta (for multi-chunk responses).

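        Loops over incoming events until a ``response.output_audio.delta``
        event arrives (GA name), then returns decoded PCM16. The legacy
        ``response.audio.delta`` name is accepted defensively with a one-time
        warning. Transcript events update the instance's
        ``last_user_transcript`` / ``last_agent_transcript`` attributes, and
        function-call events are accumulated into the turn's tool calls.
        An ``error`` event raises a ``RuntimeError``. All other housekeeping
        events are ignored and the loop continues.

        Returns an empty chunk (``data=b""``) on a drain re-entry after a
        completed response, or when a response finishes with at least one
        finalized tool call and no audio.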

        Raises:
            asyncio.TimeoutError: if no audio arrives within ``timeout``.
            RuntimeError: if the server sends an error event.
        """
        if self._ws is None:
            raise RuntimeError("OpenAIRealtimeAgentAdapter: not connected")

        # If send_audio was called since last recv, commit and request response.
        if self._pending_audio_bytes > 0:
            await self._ws.send(json.dumps({"type": "input_audio_buffer.commit"}))
            await self._ws.send(json.dumps({"type": "response.create"}))
            self._pending_audio_bytes = 0
            self._agent_turn_pending = False  # user spoke → per-turn signal consumed

        # Gap 1: agent-speaks-first / multi-turn agent initiation.
        # When the executor signals an agent turn via notify_agent_turn(), no
        # user audio has been committed, and no response is already in flight,
        # send a bare, content-free response.create so the model speaks.
        # Opening words come from the session instructions; no text is injected.
        # Self-limiting on user-first scripts: if user audio was committed above,
        # _agent_turn_pending was already cleared.
        # (The clean drain exit for an already-completed response is handled by
        # the next branch.)
        elif self._agent_turn_pending and not self._response_active:
            await self._ws.send(json.dumps({"type": "response.create"}))
            self._agent_turn_pending = False
            self._response_active = True

        elif (
            not self._agent_turn_pending
            and not self._response_active
            and self._response_ever_active
        ):
            # No pending audio, no agent-turn signal, no response in flight,
            # AND at least one response has already completed this session:
            # this is a drain re-entry after a completed response. Return empty
            # chunk so _drain_agent_response's tail-silence loop exits cleanly.
            # Guard on _response_ever_active so that a fresh recv_audio call
            # (before any response.created fires) does NOT short-circuit —
            # that would break direct recv_audio callers (e.g. unit tests that
            # call recv_audio without going through notify_agent_turn).
            return AudioChunk(data=b"")

        deadline = asyncio.get_running_loop().time() + timeout
        while True:
            remaining = deadline - asyncio.get_running_loop().time()
            if remaining <= 0:
                raise asyncio.TimeoutError(
                    "OpenAIRealtimeAgentAdapter: recv_audio timed out"
                )

            raw = await asyncio.wait_for(self._ws.recv(), timeout=remaining)
            try:
                event = json.loads(raw) if isinstance(raw, str) else json.loads(raw.decode())
            except Exception:
                logger.debug(
                    "OpenAIRealtimeAgentAdapter: non-JSON message, skipping"
                )
                continue

            etype = event.get("type", "")

            if etype in ("response.output_audio.delta", "response.audio.delta"):
                # Accept both the GA event name and its retired beta alias —
                # live gpt-realtime* models have been observed still emitting
                # the beta names. These legacy arms should be removed once the
                # GA names are confirmed stable at a live endpoint (issue #602).
                self._warn_if_legacy(etype, "response.output_audio.delta")
                b64 = event.get("delta", "")
                pcm = base64.b64decode(b64)
                # Enforce PCM16 invariant: even byte count.
                if len(pcm) % 2 == 1:
                    pcm = pcm[:-1]
                return AudioChunk(data=pcm)

            elif etype == "response.created":
                # Response is now in flight — mark it so subsequent recv_audio
                # drain re-entries don't fire a spurious second response.create.
                self._response_active = True
                self._response_ever_active = True

            elif etype in (
                "response.output_audio_transcript.delta",
                "response.audio_transcript.delta",
            ):
                # Accumulate streaming agent transcript.
                self._warn_if_legacy(etype, "response.output_audio_transcript.delta")
                self._agent_transcript_buf += event.get("delta", "")

            elif etype in (
                "response.output_audio_transcript.done",
                "response.audio_transcript.done",
            ):
                # Finalise; the `transcript` field may have the full text.
                self._warn_if_legacy(etype, "response.output_audio_transcript.done")
                transcript = event.get("transcript", "")
                if transcript:
                    self.last_agent_transcript = transcript
                elif self._agent_transcript_buf:
                    self.last_agent_transcript = self._agent_transcript_buf
                self._agent_transcript_buf = ""

            elif etype in ("response.done", "response.cancelled"):
                # Response finished or was cancelled — mark it so the next
                # drain re-entry returns an empty chunk (clean exit).
                self._response_active = False
                # Issue #646: a tool-only turn (function call, NO audio delta) would
                # otherwise loop here to the deadline and raise — the accumulated tool
                # call is parsed but never returned. When the response is done and at
                # least one tool call has been finalized this turn, return an empty
                # chunk so the drain exits cleanly and call() surfaces the tool_calls
                # message. A genuinely empty turn (done + EMPTY accumulator) must still
                # fall through to the timeout — the non-empty accumulator is the
                # discriminator, NOT response.done alone.
                # (Distinct from the drain-re-entry empty-chunk path above at the
                # _response_ever_active guard, which handles a COMPLETED prior response.)
                if self._completed_tool_calls:
                    return AudioChunk(data=b"")

            elif etype == "conversation.item.input_audio_transcription.completed":
                # User-side transcript from the input transcription model
                # (OPENAI_STT_MODEL, configured in connect()).
                self.last_user_transcript = event.get("transcript", "")

            elif etype == "response.function_call_arguments.delta":
                # Issue #630: streaming arguments fragment for a function call.
                # Accumulate per call_id; the call is finalized on `.done` or
                # the function_call output-item event.
                self._accumulate_tool_call_delta(
                    event.get("call_id", ""), event.get("delta", "")
                )

            elif etype == "response.function_call_arguments.done":
                # Issue #630: streaming-args path complete. `name` is typically
                # NOT on this event (it arrives via the output_item), so resolve
                # it from the accumulator. A missing call_id degrades safely.
                self._finalize_tool_call(
                    event.get("call_id"),
                    name=event.get("name"),
                    arguments=event.get("arguments"),
                )

            elif etype in ("response.output_item.added", "response.output_item.done"):
                # Issue #630: the output-item form of a function call carries the
                # authoritative `name` + `call_id` (and, on `.done`, the full
                # `arguments`). For non-function items (e.g. an audio message
                # item) this is benign housekeeping — fall through silently.
                item = event.get("item") or {}
                if isinstance(item, dict) and item.get("type") == "function_call":
                    call_id = item.get("call_id")
                    name = item.get("name")
                    if etype == "response.output_item.added":
                        # Shell arrives before args stream — record the name so a
                        # later delta/done can attach to it. Do not finalize yet.
                        self._note_tool_call_name(call_id or "", name)
                    else:
                        # `.done`: authoritative full call. Finalize (idempotent
                        # on call_id — merges with any streaming-args entry, AC6).
                        self._finalize_tool_call(
                            call_id, name=name, arguments=item.get("arguments")
                        )
                else:
                    logger.debug(
                        "OpenAIRealtimeAgentAdapter: ignoring non-function "
                        "output_item event %r",
                        etype,
                    )

            elif etype == "error":
                error_detail = event.get("error", {})
                msg = error_detail.get("message", str(error_detail))
                raise RuntimeError(
                    f"OpenAIRealtimeAgentAdapter: server error — {msg}"
                )

            else:
                # Housekeeping events — session.created, session.updated, etc. —
                # are benign. Log at DEBUG and keep the loop running.
                logger.debug(
                    "OpenAIRealtimeAgentAdapter: ignoring event type %r", etype
                )

    async def call(self, input: AgentInput) -> AgentReturnTypes:
        """Surface realtime tool calls alongside the spoken audio turn (#630).

        The base ``call()`` returns a single assistant audio message and does
        all the recording bookkeeping. We keep that intact and, when the agent
        called any tools this turn, append ONE extra assistant message carrying
        every tool call as OpenAI-chat ``tool_calls`` — the shape
        ``state.has_tool_call`` / ``state.last_tool_call`` consume.

        Returns:
            - the single audio message (dict) when no tools were called — byte
              identical to the base behaviour (AC8 regression), OR
            - ``[audio_message, tool_call_message]`` when ≥1 tool was called
              (AC1/AC9/AC10). ``convert_agent_return_types_to_openai_messages``
              passes a list of dicts through verbatim into result.messages.

        The completed-calls list is reset HERE (turn start) so tool calls never
        leak across turns; the function-call events for THIS turn are consumed
        inside the ``super().call()`` drain and finalized onto the list.
        """
        # Reset per-turn tool-call state so a prior turn's calls don't bleed
        # through (mirrors the transcript reset in _drain_agent_response).
        self._completed_tool_calls = []
        self._tool_call_accumulators = {}

        audio_message = await super().call(input)

        if not self._completed_tool_calls:
            return audio_message

        # One assistant message carrying ALL of this turn's tool calls — the
        # conventional OpenAI shape (one assistant turn, many tool_calls).
        tool_call_message: ChatCompletionMessageParam = cast(
            ChatCompletionMessageParam,
            {
                "role": "assistant",
                "content": None,
                "tool_calls": list(self._completed_tool_calls),
            },
        )
        return [cast(ChatCompletionMessageParam, audio_message), tool_call_message]

    async def _drain_agent_response(
        self, on_first_chunk=None
    ) -> "AudioChunk":
        """Override to surface the spoken transcript after draining.

        The base class drains recv_audio chunks until tail silence. After
        draining, ``response.output_audio_transcript.done`` will have already
        fired (it arrives before ``response.done``), so ``self.last_agent_transcript``
        is populated. We rebuild the merged chunk with ``transcript=`` set so
        ``create_audio_message`` attaches a text part to the assistant message.

        This puts the transcript in ``result.messages`` (AC2/AC5) without
        modifying messages.py, tts.py, or composable.py (AC6).

        The transcript text is an ordinary ``{"type":"text","text":...}`` part —
        no extra keys. Echo-safety is handled in ``_strip_audio_content``
        (user_simulator_agent.py) by detecting the structural pattern: an
        assistant message that carries both an ``input_audio`` part AND a ``text``
        part is a voiced agent turn, so the text is reframed as third-person
        context before ``reverse_roles`` runs (AC4/AC11).
        """
        from ..audio_chunk import AudioChunk as _AudioChunk

        # Clear transcript state at the start of each turn so a stale value
        # from a prior turn doesn't bleed through if this turn's event never
        # fires (AC8: degraded case — absent transcript → no text part).
        self.last_agent_transcript = None
        self._agent_transcript_buf = ""

        merged = await super()._drain_agent_response(on_first_chunk=on_first_chunk)

        transcript = self.last_agent_transcript
        if transcript:
            # Rebuild with transcript attached so create_audio_message adds
            # the text part. The original merged.data is unchanged.
            return _AudioChunk(data=merged.data, transcript=transcript)
        # No transcript (AC8): return the merge unchanged (audio-only).
        return merged

    async def send_text(self, text: str) -> None:
        """
        Inject scripted text into the realtime session as a user message.

        Used when this adapter is the user simulator (role=USER): scripted
        ``user("text")`` steps route through here instead of spawning TTS.
        The model synthesises the text into spoken audio with natural prosody,
        which is then delivered via ``recv_audio``.

        NOTE: per §7.2, OpenAI Realtime cannot populate assistant audio
        messages retroactively; the downstream transcript reflects what the
        model actually emitted, not what was scripted.

        Raises:
            RuntimeError: if called before ``connect()``.
        """
        if self._ws is None:
            raise RuntimeError("OpenAIRealtimeAgentAdapter: not connected")

        # Create a user conversation item with the scripted text.
        await self._ws.send(
            json.dumps(
                {
                    "type": "conversation.item.create",
                    "item": {
                        "type": "message",
                        "role": "user",
                        "content": [{"type": "input_text", "text": text}],
                    },
                }
            )
        )
        # Prompt the model to generate audio output.
        await self._ws.send(json.dumps({"type": "response.create"}))
        logger.debug(
            "OpenAIRealtimeAgentAdapter: send_text injected %r", text[:60]
        )

Ancestors

Class variables

var capabilities : ClassVar[AdapterCapabilities]

Instance variables

var url : str
Expand source code
@property
def url(self) -> str:
    return REALTIME_URL_TEMPLATE.format(model=self.model)

Methods

async def call(self, input: AgentInput) ‑> AgentReturnTypes

Surface realtime tool calls alongside the spoken audio turn (#630).

The base call() returns a single assistant audio message and does all the recording bookkeeping. We keep that intact and, when the agent called any tools this turn, append ONE extra assistant message carrying every tool call as OpenAI-chat tool_calls — the shape state.has_tool_call / state.last_tool_call consume.

Returns

  • the single audio message (dict) when no tools were called — byte identical to the base behaviour (AC8 regression), OR
  • [audio_message, tool_call_message] when ≥1 tool was called (AC1/AC9/AC10). convert_agent_return_types_to_openai_messages passes a list of dicts through verbatim into result.messages.

The completed-calls list is reset HERE (turn start) so tool calls never leak across turns; the function-call events for THIS turn are consumed inside the super().call() drain and finalized onto the list.
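The accumulate-then-finalize flow behind the tool-call message can be sketched in isolation. This is a minimal illustration, not the adapter's code: `ToolCallCollector` and its method names are hypothetical stand-ins for the adapter's private state, and the sample call (`get_weather`, `call_1`) is invented for the example.

```python
from typing import Any, Dict, List, Optional


class ToolCallCollector:
    """Hypothetical stand-in for the adapter's per-turn tool-call state."""

    def __init__(self) -> None:
        self.accumulators: Dict[str, Dict[str, Optional[str]]] = {}
        self.completed: List[Dict[str, Any]] = []

    def add_delta(self, call_id: str, delta: str) -> None:
        # Append one streamed arguments fragment for this call_id.
        acc = self.accumulators.setdefault(call_id, {"name": None, "arguments": ""})
        acc["arguments"] = (acc["arguments"] or "") + (delta or "")

    def note_name(self, call_id: str, name: Optional[str]) -> None:
        # Record the function name when the output item first appears.
        if name:
            acc = self.accumulators.setdefault(call_id, {"name": None, "arguments": ""})
            acc["name"] = name

    def finalize(
        self,
        call_id: Optional[str],
        name: Optional[str] = None,
        arguments: Optional[str] = None,
    ) -> None:
        if not call_id:
            return  # degraded path: nothing to key the call on
        acc = self.accumulators.get(call_id, {})
        resolved_name = name or acc.get("name") or ""
        resolved_args = arguments or acc.get("arguments") or "{}"
        for existing in self.completed:
            if existing["id"] == call_id:
                # Same logical call seen again: merge instead of duplicating.
                fn = existing["function"]
                if not fn["name"] and resolved_name:
                    fn["name"] = resolved_name
                if resolved_args != "{}" and len(resolved_args) > len(fn["arguments"]):
                    fn["arguments"] = resolved_args
                return
        self.completed.append(
            {
                "id": call_id,
                "type": "function",
                "function": {"name": resolved_name, "arguments": resolved_args},
            }
        )

    def as_message(self) -> Optional[Dict[str, Any]]:
        # One assistant message carrying every tool call of the turn.
        if not self.completed:
            return None
        return {"role": "assistant", "content": None, "tool_calls": list(self.completed)}


collector = ToolCallCollector()
collector.note_name("call_1", "get_weather")
collector.add_delta("call_1", '{"city": ')
collector.add_delta("call_1", '"Oslo"}')
collector.finalize("call_1")  # streaming-arguments path
collector.finalize("call_1", name="get_weather", arguments='{"city": "Oslo"}')  # output-item path
collector.finalize(None)  # missing call_id is skipped
message = collector.as_message()
```

Both finalize paths describe the same call, so the resulting message carries a single entry for `call_1`.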
Expand source code
async def call(self, input: AgentInput) -> AgentReturnTypes:
    """Surface realtime tool calls alongside the spoken audio turn (#630).

    The base ``call()`` returns a single assistant audio message and does
    all the recording bookkeeping. We keep that intact and, when the agent
    called any tools this turn, append ONE extra assistant message carrying
    every tool call as OpenAI-chat ``tool_calls`` — the shape
    ``state.has_tool_call`` / ``state.last_tool_call`` consume.

    Returns:
        - the single audio message (dict) when no tools were called — byte
          identical to the base behaviour (AC8 regression), OR
        - ``[audio_message, tool_call_message]`` when ≥1 tool was called
          (AC1/AC9/AC10). ``convert_agent_return_types_to_openai_messages``
          passes a list of dicts through verbatim into result.messages.

    The completed-calls list is reset HERE (turn start) so tool calls never
    leak across turns; the function-call events for THIS turn are consumed
    inside the ``super().call()`` drain and finalized onto the list.
    """
    # Reset per-turn tool-call state so a prior turn's calls don't bleed
    # through (mirrors the transcript reset in _drain_agent_response).
    self._completed_tool_calls = []
    self._tool_call_accumulators = {}

    audio_message = await super().call(input)

    if not self._completed_tool_calls:
        return audio_message

    # One assistant message carrying ALL of this turn's tool calls — the
    # conventional OpenAI shape (one assistant turn, many tool_calls).
    tool_call_message: ChatCompletionMessageParam = cast(
        ChatCompletionMessageParam,
        {
            "role": "assistant",
            "content": None,
            "tool_calls": list(self._completed_tool_calls),
        },
    )
    return [cast(ChatCompletionMessageParam, audio_message), tool_call_message]
async def connect(self) ‑> None

Open the Realtime WebSocket and send the initial session.update.
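The shape of the initial session.update message can be sketched without opening a socket. The helper name `build_session_update` and the sample voice and transcription-model strings below are assumptions made for illustration; the payload layout follows the source listed for this method.

```python
import json
from typing import Any, Dict, List, Optional


def build_session_update(
    voice: str,
    stt_model: str,
    instructions: Optional[str] = None,
    tools: Optional[List[Dict[str, Any]]] = None,
) -> str:
    """Hypothetical helper: serialise a session.update event as JSON text."""
    session: Dict[str, Any] = {
        "type": "realtime",
        "audio": {
            "input": {
                "format": {"type": "audio/pcm", "rate": 24000},
                # None serialises to JSON null, which turns off server-side VAD.
                "turn_detection": None,
                "transcription": {"model": stt_model},
            },
            "output": {
                "format": {"type": "audio/pcm", "rate": 24000},
                "voice": voice,
            },
        },
    }
    # Optional fields are only sent when they carry a value.
    if instructions:
        session["instructions"] = instructions
    if tools:
        session["tools"] = tools
    return json.dumps({"type": "session.update", "session": session})


# Placeholder values, not real voice or model identifiers.
payload = json.loads(
    build_session_update(voice="example-voice", stt_model="example-stt-model")
)
```

Leaving `instructions` and `tools` out of the payload when they are empty keeps the message minimal, which matches the conditional assignments in the listed source.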

Expand source code
async def connect(self) -> None:
    """Open the Realtime WebSocket and send the initial session.update."""
    import websockets

    self._ws = await websockets.connect(
        self.url,
        additional_headers={
            "Authorization": f"Bearer {self._api_key}",
        },
    )
    logger.debug("OpenAIRealtimeAgentAdapter: connected to %s", self.url)

    # Configure session: audio formats, voice, instructions, tools.
    # Disable server-side VAD (session.audio.input.turn_detection=None) so
    # we control turn boundaries explicitly: recv_audio sends
    # input_audio_buffer.commit + response.create once per user turn.
    session_config: dict[str, Any] = {
        "type": "realtime",
        "audio": {
            "input": {
                "format": {"type": "audio/pcm", "rate": 24000},
                "turn_detection": None,
                "transcription": {"model": OPENAI_STT_MODEL},
            },
            "output": {
                "format": {"type": "audio/pcm", "rate": 24000},
                "voice": self.voice,
            },
        },
    }
    if self.instructions:
        session_config["instructions"] = self.instructions
    if self.tools:
        session_config["tools"] = self.tools

    await self._ws.send(
        json.dumps({"type": "session.update", "session": session_config})
    )
    logger.debug("OpenAIRealtimeAgentAdapter: session.update sent")
async def disconnect(self) ‑> None

Close the WebSocket if open.

Expand source code
async def disconnect(self) -> None:
    """Close the WebSocket if open."""
    if self._ws is not None:
        try:
            await self._ws.close()
        except Exception:
            # Best-effort: connection may already be half-closed or in an
            # error state when disconnect() is called. We're tearing down
            # regardless — propagating here would just leak the WS reference.
            pass
        finally:
            self._ws = None
        logger.debug("OpenAIRealtimeAgentAdapter: disconnected")
async def interrupt(self) ‑> None

Send response.cancel — the OpenAI Realtime API's first-class interrupt. The model stops generating audio and text immediately. No timing race against VAD: deterministic stop, then the next user turn flows normally through send_audio + recv_audio.
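As a rough illustration of the wire order this implies, the sketch below records the messages for an interrupt followed by the next user turn. `RecordingSocket` and `interrupt_then_next_turn` are hypothetical names, and the in-memory socket stands in for the real WebSocket; only the event types come from the source.

```python
import asyncio
import base64
import json
from typing import List


class RecordingSocket:
    """Hypothetical in-memory socket that records every message sent."""

    def __init__(self) -> None:
        self.sent: List[str] = []

    async def send(self, message: str) -> None:
        self.sent.append(message)


async def interrupt_then_next_turn(ws: RecordingSocket, pcm: bytes) -> None:
    # 1. Interrupt: cancel the response that is currently being generated.
    await ws.send(json.dumps({"type": "response.cancel"}))
    # 2. Next user turn: append audio, then commit and request a response.
    audio = base64.b64encode(pcm).decode()
    await ws.send(json.dumps({"type": "input_audio_buffer.append", "audio": audio}))
    await ws.send(json.dumps({"type": "input_audio_buffer.commit"}))
    await ws.send(json.dumps({"type": "response.create"}))


socket = RecordingSocket()
asyncio.run(interrupt_then_next_turn(socket, b"\x00\x00" * 4))
sent_types = [json.loads(m)["type"] for m in socket.sent]
```

In the adapter itself the append happens in send_audio and the commit plus response.create happen in the following recv_audio; the sketch only flattens that sequence into one function.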

Expand source code
async def interrupt(self) -> None:
    """Send ``response.cancel`` — the OpenAI Realtime API's first-class
    interrupt. The model stops generating audio and text immediately.
    No timing race against VAD: deterministic stop, then the next user
    turn flows normally through ``send_audio`` + ``recv_audio``.
    """
    if self._ws is None:
        raise RuntimeError("OpenAIRealtimeAgentAdapter: not connected")
    await self._ws.send(json.dumps({"type": "response.cancel"}))
    logger.debug("OpenAIRealtimeAgentAdapter: sent response.cancel (interrupt)")
def notify_agent_turn(self) ‑> None

Signal that an agent turn is about to be dispatched.

Called by the executor before each agent step so recv_audio can fire a bare response.create for agent-initiated turns (where no user audio has been committed). This per-turn signal handles both turn 1 (opening) and subsequent agent turns in multi-turn scripts like [agent(), user(), agent()].

Only meaningful when role=AGENT. Safe to call on every agent step — recv_audio consumes and clears the flag.
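How recv_audio consumes this flag can be summarised as a small decision function. This is a sketch under assumptions: `pre_receive_action` and its string labels are hypothetical, while the four inputs mirror the adapter's private state (`_pending_audio_bytes`, `_agent_turn_pending`, `_response_active`, `_response_ever_active`).

```python
def pre_receive_action(
    pending_audio_bytes: int,
    agent_turn_pending: bool,
    response_active: bool,
    response_ever_active: bool,
) -> str:
    """Hypothetical summary of what recv_audio does before reading events."""
    if pending_audio_bytes > 0:
        # User audio was sent: commit the buffer and request a response.
        return "commit_and_create"
    if agent_turn_pending and not response_active:
        # Agent-initiated turn: request a response with no user audio.
        return "bare_create"
    if not agent_turn_pending and not response_active and response_ever_active:
        # A prior response already completed: let the drain loop exit.
        return "drain_exit"
    # Otherwise just wait for the next event on the socket.
    return "await_events"


opening_agent_turn = pre_receive_action(0, True, False, False)
user_first_turn = pre_receive_action(3200, True, False, False)
after_completed_response = pre_receive_action(0, False, False, True)
fresh_direct_call = pre_receive_action(0, False, False, False)
mid_response = pre_receive_action(0, False, True, True)
```

The user-first case shows why calling notify_agent_turn on every agent step is harmless: pending user audio takes priority and the flag is cleared along with it.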

Expand source code
def notify_agent_turn(self) -> None:
    """Signal that an agent turn is about to be dispatched.

    Called by the executor before each agent step so recv_audio can fire
    a bare response.create for agent-initiated turns (where no user audio
    has been committed). This per-turn signal handles both turn 1 (opening)
    and subsequent agent turns in multi-turn scripts like [agent(), user(),
    agent()].

    Only meaningful when role=AGENT. Safe to call on every agent step —
    recv_audio consumes and clears the flag.
    """
    if self.role == AgentRole.AGENT:
        self._agent_turn_pending = True
async def recv_audio(self, timeout: float) ‑> AudioChunk

Commit any pending audio, request a response, and return the first audio chunk the model produces.

If send_audio was called since the last recv_audio, this method commits the buffer and emits response.create before awaiting the reply. Subsequent recv calls without new send calls just await the next audio delta (for multi-chunk responses).

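Loops over incoming events until a response.output_audio.delta event arrives (GA name), then returns decoded PCM16. The legacy response.audio.delta name is accepted defensively with a one-time warning. Transcript events update the instance's last_user_transcript / last_agent_transcript attributes, and function-call events are accumulated into the turn's tool calls. An error event raises a RuntimeError. All other housekeeping events are ignored and the loop continues.

Returns an empty chunk (data=b"") on a drain re-entry after a completed response, or when a response finishes with at least one finalized tool call and no audio.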
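The audio-delta handling described above can be sketched as a standalone function. `decode_audio_delta` and `AUDIO_DELTA_EVENTS` are hypothetical names; the event names and the even-byte trimming follow the listed source.

```python
import base64
from typing import Any, Dict, Optional

# GA event name first, then the legacy beta alias that is still accepted.
AUDIO_DELTA_EVENTS = ("response.output_audio.delta", "response.audio.delta")


def decode_audio_delta(event: Dict[str, Any]) -> Optional[bytes]:
    """Return PCM16 bytes for an audio delta event, or None for other events."""
    if event.get("type") not in AUDIO_DELTA_EVENTS:
        return None
    pcm = base64.b64decode(event.get("delta", ""))
    # PCM16 samples are two bytes each, so drop a trailing odd byte.
    if len(pcm) % 2 == 1:
        pcm = pcm[:-1]
    return pcm


odd_payload = base64.b64encode(b"\x01\x02\x03").decode()
ga_audio = decode_audio_delta({"type": "response.output_audio.delta", "delta": odd_payload})
legacy_audio = decode_audio_delta({"type": "response.audio.delta", "delta": odd_payload})
not_audio = decode_audio_delta({"type": "session.updated"})
```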

Raises

asyncio.TimeoutError
if no audio arrives within timeout.
RuntimeError
if the server sends an error event.
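The two failure modes can be illustrated with a deadline-bounded receive loop. This is a minimal sketch, assuming an `asyncio.Queue` as a stand-in for the WebSocket; `recv_until_audio` and `demo` are hypothetical names.

```python
import asyncio
import json
from typing import Any, Dict, Tuple


async def recv_until_audio(queue: "asyncio.Queue[str]", timeout: float) -> Dict[str, Any]:
    """Read events until an audio delta arrives or the overall deadline passes."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        # Recompute the budget so skipped events cannot extend the deadline.
        remaining = deadline - loop.time()
        if remaining <= 0:
            raise asyncio.TimeoutError("timed out waiting for audio")
        raw = await asyncio.wait_for(queue.get(), timeout=remaining)
        event = json.loads(raw)
        if event.get("type") == "error":
            detail = event.get("error", {})
            raise RuntimeError(f"server error: {detail.get('message', detail)}")
        if event.get("type") == "response.output_audio.delta":
            return event
        # Any other event is housekeeping: keep looping.


async def demo() -> Tuple[str, bool]:
    queue: "asyncio.Queue[str]" = asyncio.Queue()
    for event in (
        {"type": "session.updated"},
        {"type": "response.created"},
        {"type": "response.output_audio.delta", "delta": ""},
    ):
        queue.put_nowait(json.dumps(event))
    first = await recv_until_audio(queue, timeout=1.0)
    try:
        # The queue is now empty, so this call runs out of time.
        await recv_until_audio(queue, timeout=0.05)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return first["type"], timed_out


result = asyncio.run(demo())
```

Tracking a single deadline, rather than passing the full timeout to each receive, means a stream of housekeeping events cannot keep the call alive indefinitely.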
Expand source code
async def recv_audio(self, timeout: float) -> AudioChunk:
    """
    Commit any pending audio, request a response, and return the first
    audio chunk the model produces.

    If ``send_audio`` was called since the last ``recv_audio``, this
    method commits the buffer and emits ``response.create`` before
    awaiting the reply. Subsequent recv calls without new send calls
    just await the next audio delta (for multi-chunk responses).

    Loops over incoming events until a ``response.output_audio.delta``
    event arrives (GA name), then returns decoded PCM16. The legacy
    ``response.audio.delta`` name is accepted defensively with a one-time
    warning. Transcript events update the instance's
    ``last_user_transcript`` / ``last_agent_transcript`` attributes.
    An ``error`` event raises a ``RuntimeError``. All other housekeeping
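    events are ignored and the loop continues.

    Returns an empty chunk instead of audio in two cases: a drain
    re-entry after a completed response, and a ``response.done`` or
    ``response.cancelled`` that follows at least one finalized tool call.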

    Raises:
        asyncio.TimeoutError: if no audio arrives within ``timeout``.
        RuntimeError: if the server sends an error event.
    """
    if self._ws is None:
        raise RuntimeError("OpenAIRealtimeAgentAdapter: not connected")

    # If send_audio was called since last recv, commit and request response.
    if self._pending_audio_bytes > 0:
        await self._ws.send(json.dumps({"type": "input_audio_buffer.commit"}))
        await self._ws.send(json.dumps({"type": "response.create"}))
        self._pending_audio_bytes = 0
        self._agent_turn_pending = False  # user spoke → per-turn signal consumed

    # Gap 1: agent-speaks-first / multi-turn agent initiation.
    # When the executor signals an agent turn via notify_agent_turn() and
    # no user audio has been committed and no response is already in flight,
    # send a bare content-free response.create so the model speaks.
    # Opening words come from the session instructions — no text is injected.
    # Self-limiting on user-first scripts: if user audio was committed above,
    # _agent_turn_pending was already cleared.
    # When _agent_turn_pending is False and no response is in flight
    # (_response_active was cleared by response.done), this branch is
    # skipped and the drain re-entry branch below returns an empty chunk.
    elif self._agent_turn_pending and not self._response_active:
        await self._ws.send(json.dumps({"type": "response.create"}))
        self._agent_turn_pending = False
        self._response_active = True

    elif not self._agent_turn_pending and not self._response_active and self._response_ever_active:
        # No pending audio, no agent-turn signal, no response in flight,
        # AND at least one response has already completed this session:
        # this is a drain re-entry after a completed response. Return empty
        # chunk so _drain_agent_response's tail-silence loop exits cleanly.
        # Guard on _response_ever_active so that a fresh recv_audio call
        # (before any response.created fires) does NOT short-circuit —
        # that would break direct recv_audio callers (e.g. unit tests that
        # call recv_audio without going through notify_agent_turn).
        return AudioChunk(data=b"")

    deadline = asyncio.get_running_loop().time() + timeout
    while True:
        remaining = deadline - asyncio.get_running_loop().time()
        if remaining <= 0:
            raise asyncio.TimeoutError(
                "OpenAIRealtimeAgentAdapter: recv_audio timed out"
            )

        raw = await asyncio.wait_for(self._ws.recv(), timeout=remaining)
        try:
            event = json.loads(raw) if isinstance(raw, str) else json.loads(raw.decode())
        except Exception:
            logger.debug(
                "OpenAIRealtimeAgentAdapter: non-JSON message, skipping"
            )
            continue

        etype = event.get("type", "")

        if etype in ("response.output_audio.delta", "response.audio.delta"):
            # Accept both the GA event name and its retired beta alias —
            # live gpt-realtime* models have been observed still emitting
            # the beta names. These legacy arms should be removed once the
            # GA names are confirmed stable at a live endpoint (issue #602).
            self._warn_if_legacy(etype, "response.output_audio.delta")
            b64 = event.get("delta", "")
            pcm = base64.b64decode(b64)
            # Enforce PCM16 invariant: even byte count.
            if len(pcm) % 2 == 1:
                pcm = pcm[:-1]
            return AudioChunk(data=pcm)

        elif etype == "response.created":
            # Response is now in flight — mark it so subsequent recv_audio
            # drain re-entries don't fire a spurious second response.create.
            self._response_active = True
            self._response_ever_active = True

        elif etype in (
            "response.output_audio_transcript.delta",
            "response.audio_transcript.delta",
        ):
            # Accumulate streaming agent transcript.
            self._warn_if_legacy(etype, "response.output_audio_transcript.delta")
            self._agent_transcript_buf += event.get("delta", "")

        elif etype in (
            "response.output_audio_transcript.done",
            "response.audio_transcript.done",
        ):
            # Finalise; the `transcript` field may have the full text.
            self._warn_if_legacy(etype, "response.output_audio_transcript.done")
            transcript = event.get("transcript", "")
            if transcript:
                self.last_agent_transcript = transcript
            elif self._agent_transcript_buf:
                self.last_agent_transcript = self._agent_transcript_buf
            self._agent_transcript_buf = ""

        elif etype in ("response.done", "response.cancelled"):
            # Response finished or was cancelled — mark it so the next
            # drain re-entry returns an empty chunk (clean exit).
            self._response_active = False
            # Issue #646: a tool-only turn (function call, NO audio delta) would
            # otherwise loop here to the deadline and raise — the accumulated tool
            # call is parsed but never returned. When the response is done and at
            # least one tool call has been finalized this turn, return an empty
            # chunk so the drain exits cleanly and call() surfaces the tool_calls
            # message. A genuinely empty turn (done + EMPTY accumulator) must still
            # fall through to the timeout — the non-empty accumulator is the
            # discriminator, NOT response.done alone.
            # (Distinct from the drain-re-entry empty-chunk path above at the
            # _response_ever_active guard, which handles a COMPLETED prior response.)
            if self._completed_tool_calls:
                return AudioChunk(data=b"")

        elif etype == "conversation.item.input_audio_transcription.completed":
            # User-side transcript from Whisper.
            self.last_user_transcript = event.get("transcript", "")

        elif etype == "response.function_call_arguments.delta":
            # Issue #630: streaming arguments fragment for a function call.
            # Accumulate per call_id; the call is finalized on `.done` or
            # the function_call output-item event.
            self._accumulate_tool_call_delta(
                event.get("call_id", ""), event.get("delta", "")
            )

        elif etype == "response.function_call_arguments.done":
            # Issue #630: streaming-args path complete. `name` is typically
            # NOT on this event (it arrives via the output_item), so resolve
            # it from the accumulator. A missing call_id degrades safely.
            self._finalize_tool_call(
                event.get("call_id"),
                name=event.get("name"),
                arguments=event.get("arguments"),
            )

        elif etype in ("response.output_item.added", "response.output_item.done"):
            # Issue #630: the output-item form of a function call carries the
            # authoritative `name` + `call_id` (and, on `.done`, the full
            # `arguments`). For non-function items (e.g. an audio message
            # item) this is benign housekeeping — fall through silently.
            item = event.get("item") or {}
            if isinstance(item, dict) and item.get("type") == "function_call":
                call_id = item.get("call_id")
                name = item.get("name")
                if etype == "response.output_item.added":
                    # Shell arrives before args stream — record the name so a
                    # later delta/done can attach to it. Do not finalize yet.
                    self._note_tool_call_name(call_id or "", name)
                else:
                    # `.done`: authoritative full call. Finalize (idempotent
                    # on call_id — merges with any streaming-args entry, AC6).
                    self._finalize_tool_call(
                        call_id, name=name, arguments=item.get("arguments")
                    )
            else:
                logger.debug(
                    "OpenAIRealtimeAgentAdapter: ignoring non-function "
                    "output_item event %r",
                    etype,
                )

        elif etype == "error":
            error_detail = event.get("error", {})
            msg = error_detail.get("message", str(error_detail))
            raise RuntimeError(
                f"OpenAIRealtimeAgentAdapter: server error — {msg}"
            )

        else:
            # Housekeeping events — session.created, session.updated, etc. —
            # are benign. Log at DEBUG and keep the loop running.
            logger.debug(
                "OpenAIRealtimeAgentAdapter: ignoring event type %r", etype
            )
async def send_audio(self, chunk: AudioChunk) -> None

Append a PCM16 audio chunk to the model's input audio buffer.

Only emits input_audio_buffer.append — the commit + response are deferred to the next recv_audio call. The scenario executor may call send_audio many times for a single user turn (TTS streams audio as chunks); committing per-chunk would confuse the server with sub-second turn boundaries. By deferring commit to recv_audio, we get one server turn per user turn.
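The effect of deferring the commit can be seen with a recording stand-in for the socket. This is a sketch under stated assumptions: `RecordingSocket`, `send_turn`, and `event_types_for` are hypothetical names introduced here, and only the three event types come from the source listing.

```python
import asyncio
import base64
import json
from typing import List


class RecordingSocket:
    """Stand-in for a WebSocket: records what would be sent."""

    def __init__(self) -> None:
        self.sent: List[dict] = []

    async def send(self, message: str) -> None:
        self.sent.append(json.loads(message))


async def send_turn(ws: RecordingSocket, chunks: List[bytes]) -> None:
    # One append per chunk, mirroring what send_audio emits.
    for chunk in chunks:
        await ws.send(json.dumps({
            "type": "input_audio_buffer.append",
            "audio": base64.b64encode(chunk).decode(),
        }))
    # One commit and one response request per turn, mirroring recv_audio.
    await ws.send(json.dumps({"type": "input_audio_buffer.commit"}))
    await ws.send(json.dumps({"type": "response.create"}))


def event_types_for(chunks: List[bytes]) -> List[str]:
    ws = RecordingSocket()
    asyncio.run(send_turn(ws, chunks))
    return [event["type"] for event in ws.sent]
```

However many chunks the TTS produces for a turn, the recorded sequence ends with exactly one commit followed by one response request.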

Expand source code
async def send_audio(self, chunk: AudioChunk) -> None:
    """
    Append a PCM16 audio chunk to the model's input audio buffer.

    Only emits ``input_audio_buffer.append`` — the commit + response are
    deferred to the next ``recv_audio`` call. The scenario executor may
    call ``send_audio`` many times for a single user turn (TTS streams
    audio as chunks); committing per-chunk would confuse the server with
    sub-second turn boundaries. By deferring commit to recv_audio, we
    get one server turn per user turn.
    """
    if self._ws is None:
        raise RuntimeError("OpenAIRealtimeAgentAdapter: not connected")
    b64 = base64.b64encode(chunk.data).decode()
    await self._ws.send(
        json.dumps({"type": "input_audio_buffer.append", "audio": b64})
    )
    self._pending_audio_bytes += len(chunk.data)
async def send_text(self, text: str) -> None

Inject scripted text into the realtime session as a user message.

Used when this adapter is the user simulator (role=USER): scripted user("text") steps route through here instead of spawning TTS. The model synthesises the text into spoken audio with natural prosody, which is then delivered via recv_audio.

NOTE: per §7.2, OpenAI Realtime cannot populate assistant audio messages retroactively; the downstream transcript reflects what the model actually emitted, not what was scripted.

Raises

RuntimeError
if called before connect().
Expand source code
async def send_text(self, text: str) -> None:
    """
    Inject scripted text into the realtime session as a user message.

    Used when this adapter is the user simulator (role=USER): scripted
    ``user("text")`` steps route through here instead of spawning TTS.
    The model synthesises the text into spoken audio with natural prosody,
    which is then delivered via ``recv_audio``.

    NOTE: per §7.2, OpenAI Realtime cannot populate assistant audio
    messages retroactively; the downstream transcript reflects what the
    model actually emitted, not what was scripted.

    Raises:
        RuntimeError: if called before ``connect()``.
    """
    if self._ws is None:
        raise RuntimeError("OpenAIRealtimeAgentAdapter: not connected")

    # Create a user conversation item with the scripted text.
    await self._ws.send(
        json.dumps(
            {
                "type": "conversation.item.create",
                "item": {
                    "type": "message",
                    "role": "user",
                    "content": [{"type": "input_text", "text": text}],
                },
            }
        )
    )
    # Prompt the model to generate audio output.
    await self._ws.send(json.dumps({"type": "response.create"}))
    logger.debug(
        "OpenAIRealtimeAgentAdapter: send_text injected %r", text[:60]
    )
class PendingTransportError (adapter_name: str)

Raised by stub adapters when their transport code has not landed yet.

Expand source code
class PendingTransportError(NotImplementedError):
    """Raised by stub adapters when their transport code has not landed yet."""

    def __init__(self, adapter_name: str) -> None:
        super().__init__(
            f"{adapter_name}: transport implementation is not yet wired up. "
            "Options: (1) run this scenario as an @integration test against a "
            f"live endpoint, (2) subclass {adapter_name} and implement "
            "send_audio/recv_audio — and re-audit the inherited "
            "`capabilities` ClassVar so the matrix matches what your subclass "
            "can actually do. Claiming streaming_transcripts=True in a "
            "subclass without a real transcript stream will silently break "
            "after_words interruption."
        )
        self.adapter_name = adapter_name

Ancestors

  • builtins.NotImplementedError
  • builtins.RuntimeError
  • builtins.Exception
  • builtins.BaseException
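Because of the ancestor chain above, a handler written for NotImplementedError or RuntimeError also catches this exception. The sketch below uses a local stand-in class with a shortened message so that it runs on its own; `describe_failure` is a hypothetical helper.

```python
class PendingTransportError(NotImplementedError):
    """Local stand-in mirroring the class shown above (message shortened)."""

    def __init__(self, adapter_name: str) -> None:
        super().__init__(f"{adapter_name}: transport implementation is not yet wired up.")
        self.adapter_name = adapter_name


def describe_failure(adapter_name: str) -> str:
    try:
        raise PendingTransportError(adapter_name)
    except NotImplementedError as exc:
        # Also reachable via `except RuntimeError`, per the ancestor list.
        return f"pending transport: {getattr(exc, 'adapter_name', 'unknown')}"
```

A test suite could use a handler like this to skip scenarios whose transport has not landed, instead of failing them.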
class PipecatAgentAdapter (url: Optional[str] = None, *, signaling_url: Optional[str] = None, transport: Literal['websocket', 'webrtc'] = 'websocket', audio_format: str = 'mulaw', sample_rate: int = 8000, stream_sid: Optional[str] = None, call_sid: Optional[str] = None)

Test a running Pipecat bot via its exposed WebSocket endpoint.

Transport is selected by the transport argument:

  • "websocket" (default): Twilio Media Streams protocol over WS. Scenario sends a synthetic start event, then media frames. Pipecat's TwilioFrameSerializer on the bot side handles the wire format.
  • "webrtc": SmallWebRTC-style negotiation. Raises PendingTransportError; tracked as a follow-up.
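The message sequence on the WebSocket transport can be sketched as follows. The `start` payload matches the one in the source listing. The `media` shape follows the Twilio Media Streams message format, and the 160-byte frame size is an assumption derived from 20 ms of 8 kHz µ-law. The `sketch_*` functions are hypothetical stand-ins, not the library's `build_media_frame` or `iter_mulaw_frames` helpers, which may add further fields.

```python
import base64
import json
import uuid
from typing import Iterator, List

FRAME_MS = 20        # assumed frame duration (20 ms frames, per the source comments)
SAMPLE_RATE = 8000   # mu-law at 8 kHz, one byte per sample
FRAME_BYTES = SAMPLE_RATE * FRAME_MS // 1000  # bytes in one frame


def sketch_start_event(stream_sid: str, call_sid: str) -> str:
    return json.dumps({
        "event": "start",
        "streamSid": stream_sid,
        "start": {
            "streamSid": stream_sid,
            "callSid": call_sid,
            "mediaFormat": {
                "encoding": "audio/x-mulaw",
                "sampleRate": SAMPLE_RATE,
                "channels": 1,
            },
        },
    })


def sketch_media_frames(stream_sid: str, mulaw: bytes) -> Iterator[str]:
    # Slice the mu-law payload into frame-sized pieces, base64-encoded.
    for offset in range(0, len(mulaw), FRAME_BYTES):
        payload = base64.b64encode(mulaw[offset:offset + FRAME_BYTES]).decode()
        yield json.dumps({
            "event": "media",
            "streamSid": stream_sid,
            "media": {"payload": payload},
        })


def sketch_session(mulaw: bytes) -> List[str]:
    # Fabricated SIDs, using the same prefixes as the source listing.
    stream_sid = f"MZ{uuid.uuid4().hex}"
    call_sid = f"CA{uuid.uuid4().hex}"
    return [sketch_start_event(stream_sid, call_sid),
            *sketch_media_frames(stream_sid, mulaw)]
```

The adapter itself additionally sleeps one frame duration between media frames so the bot sees real-time pacing; the sketch omits that to stay synchronous.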

Expand source code
class PipecatAgentAdapter(VoiceAgentAdapter):
    """
    Test a running Pipecat bot via its exposed WebSocket endpoint.

    Transport is selected by the ``transport`` argument:
        - ``"websocket"`` (default): Twilio Media Streams protocol over WS.
          Scenario sends a synthetic ``start`` event, then ``media`` frames.
          Pipecat's ``TwilioFrameSerializer`` on the bot side handles the
          wire format.
        - ``"webrtc"``: SmallWebRTC-style negotiation. Raises
          ``PendingTransportError``; tracked as a follow-up.
    """

    capabilities: ClassVar[AdapterCapabilities] = AdapterCapabilities(
        streaming_transcripts=True,
        native_vad=True,
        dtmf=False,
        # Pipecat over the Twilio WS transport speaks the Twilio Media Streams
        # protocol; the ``clear`` event drops all buffered outbound audio on
        # the bot side. That's first-class interrupt — no VAD timing race.
        interruption=True,
        input_formats=["pcm16/24000", "mulaw/8000", "opus"],
        output_formats=["pcm16/24000", "mulaw/8000", "opus"],
    )

    def __init__(
        self,
        url: Optional[str] = None,
        *,
        signaling_url: Optional[str] = None,
        transport: Literal["websocket", "webrtc"] = "websocket",
        audio_format: str = "mulaw",
        sample_rate: int = 8000,
        stream_sid: Optional[str] = None,
        call_sid: Optional[str] = None,
    ) -> None:
        super().__init__()
        if transport == "websocket" and url is None:
            raise ValueError("PipecatAgentAdapter(transport='websocket') requires url=")
        if transport == "webrtc" and signaling_url is None:
            raise ValueError("PipecatAgentAdapter(transport='webrtc') requires signaling_url=")

        self.url = url
        self.signaling_url = signaling_url
        self.transport = transport
        self.audio_format = audio_format
        self.sample_rate = sample_rate
        # Synthetic SIDs pipecat's TwilioFrameSerializer needs in the `start`
        # event. If the caller doesn't supply them, connect() fabricates
        # UUID-based values. Pipecat uses them for logging and the auto-hangup
        # REST call; both are no-ops when we're not actually going through
        # Twilio.
        self.stream_sid = stream_sid
        self.call_sid = call_sid

        self._ws: Any = None
        self._recv_task: Optional[asyncio.Task] = None
        self._inbound_queue: Optional[asyncio.Queue[AudioChunk]] = None
        # Serialises concurrent send_audio() calls — without it two paced
        # senders would interleave 20-ms mulaw frames on the wire and the
        # bot would receive corrupted audio. Used for the interruption case
        # where the executor calls send_audio() while a previous turn's
        # send is still in flight.
        self._send_lock: Optional[asyncio.Lock] = None

    @property
    def transport_format(self) -> str:
        return f"{self.audio_format}/{self.sample_rate}"

    # ------------------------------------------------------------------ lifecycle

    async def connect(self) -> None:
        if self.transport == "webrtc":
            from ._stub import PendingTransportError

            raise PendingTransportError(
                "PipecatAgentAdapter(transport='webrtc')"
            )

        # Lazy import keeps websockets off the module-load path of
        # `import scenario`. It is already a hard dependency; importing it
        # here simply matches the Twilio adapter's style.
        import websockets

        assert self.url is not None  # validated in __init__
        self._ws = await websockets.connect(
            self.url, ping_interval=None, ping_timeout=None
        )
        self._inbound_queue = asyncio.Queue()
        self._send_lock = asyncio.Lock()

        # Send the synthetic `start` event that pipecat's TwilioFrameSerializer
        # requires to learn the stream/call SIDs and start deserializing
        # media frames.
        if self.stream_sid is None:
            self.stream_sid = f"MZ{uuid.uuid4().hex}"
        if self.call_sid is None:
            self.call_sid = f"CA{uuid.uuid4().hex}"

        await self._ws.send(json.dumps({"event": "connected", "protocol": "Call", "version": "1.0.0"}))
        await self._ws.send(
            json.dumps(
                {
                    "event": "start",
                    "streamSid": self.stream_sid,
                    "start": {
                        "streamSid": self.stream_sid,
                        "callSid": self.call_sid,
                        "mediaFormat": {
                            "encoding": "audio/x-mulaw",
                            "sampleRate": 8000,
                            "channels": 1,
                        },
                    },
                }
            )
        )

        self._recv_task = asyncio.create_task(self._recv_loop())
        logger.debug("PipecatAgentAdapter: connected to %s (stream=%s)", self.url, self.stream_sid)

    async def disconnect(self) -> None:
        ws = self._ws
        if ws is None:
            return

        # Send `stop` event so the bot can clean up its pipeline gracefully.
        try:
            if self.stream_sid:
                await ws.send(json.dumps({"event": "stop", "streamSid": self.stream_sid}))
        except Exception:
            logger.debug("PipecatAgentAdapter: failed to send stop frame", exc_info=True)

        if self._recv_task is not None:
            self._recv_task.cancel()
            try:
                await self._recv_task
            except asyncio.CancelledError:
                # Expected: we just cancelled it.
                pass
            except Exception:
                # Unexpected teardown error — already logging enough context
                # elsewhere; disconnect() is best-effort.
                logger.debug("PipecatAgentAdapter: recv_task raised during cancel", exc_info=True)
            self._recv_task = None

        try:
            await ws.close()
        except Exception:
            # WS may already be closed by the peer; disconnect() is best-effort.
            logger.debug("PipecatAgentAdapter: ws.close() raised", exc_info=True)

        self._ws = None
        self._inbound_queue = None
        self.stream_sid = None
        self.call_sid = None

    # ------------------------------------------------------------------ I/O

    async def send_audio(self, chunk: AudioChunk) -> None:
        # Pace at real-time (TWILIO_FRAME_MS/1000s per 20-ms frame). Matches what
        # a real caller produces over a PSTN line — the SUT sees normal speech
        # rhythm, not a synthetic dump.
        #
        # After the last frame we send a Twilio ``mark`` named "utterance_end".
        # Real-time pacing means TTS-induced inter-phrase pauses survive on the
        # wire, and a stateless inactivity-timer on the receiver can't
        # distinguish "speaker paused after a comma" from "speaker finished
        # their turn." The mark is an explicit, non-ambiguous end-of-turn
        # signal: cooperating SUTs flush on the mark; legacy SUTs fall back to
        # VAD timing.
        self._assert_connected()
        assert self._ws is not None and self.stream_sid is not None and self._send_lock is not None
        mulaw = pcm16_24k_to_mulaw8k(chunk.data)
        frame_secs = TWILIO_FRAME_MS / 1000
        async with self._send_lock:
            for frame in iter_mulaw_frames(mulaw):
                if not frame:
                    continue
                await self._ws.send(build_media_frame(self.stream_sid, frame))
                await asyncio.sleep(frame_secs)
            await self._ws.send(build_mark_frame(self.stream_sid, "utterance_end"))

    async def recv_audio(self, timeout: float) -> AudioChunk:
        self._assert_connected()
        assert self._inbound_queue is not None
        return await asyncio.wait_for(self._inbound_queue.get(), timeout=timeout)

    async def interrupt(self) -> None:
        """Send a Twilio ``clear`` frame — the bot drops all buffered outbound
        audio immediately. Cooperating Pipecat bots (and any code wired to
        the Media Streams protocol) treat ``clear`` as "stop talking now."
        Use this in preference to timing-based barge-in when the SUT
        supports it: it's deterministic, doesn't depend on VAD detection
        windows, and matches the same protocol used in production.
        """
        self._assert_connected()
        assert self._ws is not None and self.stream_sid is not None
        await self._ws.send(build_clear_frame(self.stream_sid))
        logger.debug("PipecatAgentAdapter: sent clear frame (interrupt)")

    # ------------------------------------------------------------------ background

    async def _recv_loop(self) -> None:
        """Read frames from pipecat, decode µ-law → PCM16 24k, enqueue."""
        assert self._ws is not None and self._inbound_queue is not None
        buffered_mulaw = bytearray()
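        # Batch size in ms. 8 kHz µ-law is 8 bytes per ms, hence the
        # `BATCH_MS * 8` byte thresholds below.
        BATCH_MS = 100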

        try:
            async for raw in self._ws:
                if isinstance(raw, bytes):
                    # pipecat sometimes emits binary frames for audio; treat
                    # as raw µ-law payload if we see one.
                    buffered_mulaw.extend(raw)
                    if len(buffered_mulaw) >= (BATCH_MS * 8):
                        pcm = mulaw8k_to_pcm16_24k(bytes(buffered_mulaw))
                        buffered_mulaw.clear()
                        await self._inbound_queue.put(AudioChunk(data=pcm))
                    continue

                frame = parse_media_stream_frame(raw)
                if frame is None:
                    continue
                if frame.event == "media" and frame.payload_mulaw:
                    buffered_mulaw.extend(frame.payload_mulaw)
                    if len(buffered_mulaw) >= (BATCH_MS * 8):
                        pcm = mulaw8k_to_pcm16_24k(bytes(buffered_mulaw))
                        buffered_mulaw.clear()
                        await self._inbound_queue.put(AudioChunk(data=pcm))
                elif frame.event == "stop":
                    if buffered_mulaw:
                        pcm = mulaw8k_to_pcm16_24k(bytes(buffered_mulaw))
                        buffered_mulaw.clear()
                        await self._inbound_queue.put(AudioChunk(data=pcm))
                    return
        except asyncio.CancelledError:
            raise
        except Exception:
            logger.warning("PipecatAgentAdapter: recv loop exited with error", exc_info=True)

    # ------------------------------------------------------------------ assertions

    def _assert_connected(self) -> None:
        if self._ws is None:
            raise RuntimeError(
                "PipecatAgentAdapter: not connected. Did you forget to call connect()?"
            )

Ancestors

Class variables

var capabilities : ClassVar[AdapterCapabilities]

Instance variables

var transport_format : str
Expand source code
@property
def transport_format(self) -> str:
    return f"{self.audio_format}/{self.sample_rate}"

Methods

async def interrupt(self) -> None

Send a Twilio clear frame — the bot drops all buffered outbound audio immediately. Cooperating Pipecat bots (and any code wired to the Media Streams protocol) treat clear as "stop talking now." Use this in preference to timing-based barge-in when the SUT supports it: it's deterministic, doesn't depend on VAD detection windows, and matches the same protocol used in production.
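To make the "stop talking now" semantics concrete, here is a toy receiver that honours a `clear` message. It is a sketch only: `sketch_clear_frame` and `OutboundBuffer` are hypothetical names, the message shape follows the Twilio Media Streams `clear` event, and a real Pipecat bot manages its outbound audio differently.

```python
import json
from collections import deque
from typing import Deque


def sketch_clear_frame(stream_sid: str) -> str:
    # Shape of a Media Streams clear message for the given stream.
    return json.dumps({"event": "clear", "streamSid": stream_sid})


class OutboundBuffer:
    """Toy bot-side playback queue that drops everything on ``clear``."""

    def __init__(self) -> None:
        self.frames: Deque[bytes] = deque()

    def queue(self, frame: bytes) -> None:
        self.frames.append(frame)

    def handle(self, message: str) -> None:
        event = json.loads(message)
        if event.get("event") == "clear":
            # Deterministic interrupt: no VAD window, just an empty queue.
            self.frames.clear()
```

Because the interrupt is a protocol message rather than overlapping speech, the test outcome does not depend on how quickly the bot's VAD reacts.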

Expand source code
async def interrupt(self) -> None:
    """Send a Twilio ``clear`` frame — the bot drops all buffered outbound
    audio immediately. Cooperating Pipecat bots (and any code wired to
    the Media Streams protocol) treat ``clear`` as "stop talking now."
    Use this in preference to timing-based barge-in when the SUT
    supports it: it's deterministic, doesn't depend on VAD detection
    windows, and matches the same protocol used in production.
    """
    self._assert_connected()
    assert self._ws is not None and self.stream_sid is not None
    await self._ws.send(build_clear_frame(self.stream_sid))
    logger.debug("PipecatAgentAdapter: sent clear frame (interrupt)")

Inherited members

class TwilioAgentAdapter (*, account_sid: str, auth_token: str, phone_number: str, public_base_url: Optional[str] = None, allowed_callers: Optional[list[str]] = None, on_dtmf: Optional[Callable[[str], None]] = None, http_port: int = 8765, role: AgentRole = AgentRole.AGENT, validate_signature: bool = True)

Bidirectional Twilio Media Streams adapter.

Same class, same state, either direction:

adapter = TwilioAgentAdapter(
    account_sid=..., auth_token=...,
    phone_number="+14155551234",
)
async with adapter:                   # connect() / disconnect()
    await adapter.place_call(to="+14155557777")  # OR wait_for_call()
    # ... scenario.run(...) feeds send_audio / recv_audio ...

This is the only adapter with dtmf=True. DTMF events received from the callee surface via the on_dtmf callback set at construction time. To send DTMF, use send_dtmf().

interrupt(after_words=N) raises UnsupportedCapabilityError on this adapter — Media Streams delivers raw audio without incremental transcripts. Use interrupt(after=seconds) instead.
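The capability rule above can be sketched as a small gate. Everything here is a local stand-in for illustration: `Caps`, `check_interrupt`, and this `UnsupportedCapabilityError` (including its base class) are assumptions, not the library's definitions. Only the two capability values are taken from the source listing.

```python
from dataclasses import dataclass
from typing import Optional


class UnsupportedCapabilityError(RuntimeError):
    """Local stand-in; the real class lives elsewhere in the library."""


@dataclass(frozen=True)
class Caps:
    streaming_transcripts: bool
    dtmf: bool


# Values taken from the TwilioAgentAdapter capabilities in the source listing.
TWILIO_CAPS = Caps(streaming_transcripts=False, dtmf=True)


def check_interrupt(
    caps: Caps,
    *,
    after_words: Optional[int] = None,
    after: Optional[float] = None,
) -> str:
    if after_words is not None:
        # Word-count triggers need incremental transcripts to count words.
        if not caps.streaming_transcripts:
            raise UnsupportedCapabilityError(
                "after_words needs incremental transcripts; use after=seconds"
            )
        return f"interrupt after {after_words} words"
    if after is not None:
        return f"interrupt after {after:.1f}s"
    return "interrupt now"
```

With the Twilio capabilities, a time-based trigger passes the gate while a word-based one raises.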

Expand source code
class TwilioAgentAdapter(VoiceAgentAdapter):
    """
    Bidirectional Twilio Media Streams adapter.

    Same class, same state, either direction:

        adapter = TwilioAgentAdapter(
            account_sid=..., auth_token=...,
            phone_number="+14155551234",
        )
        async with adapter:                   # connect() / disconnect()
            await adapter.place_call(to="+14155557777")  # OR wait_for_call()
            # ... scenario.run(...) feeds send_audio / recv_audio ...

    This is the *only* adapter with ``dtmf=True``. DTMF events
    received from the callee surface via the ``on_dtmf`` callback set at
    construction time. To send DTMF, use ``send_dtmf()``.

    ``interrupt(after_words=N)`` raises ``UnsupportedCapabilityError`` on this
    adapter — Media Streams delivers raw audio without incremental
    transcripts. Use ``interrupt(after=seconds)`` instead.
    """

    capabilities: ClassVar[AdapterCapabilities] = AdapterCapabilities(
        streaming_transcripts=False,
        native_vad=False,
        dtmf=True,
        # Twilio Media Streams ``clear`` event drops all buffered outbound
        # audio. Used by ``adapter.interrupt()`` already wired below.
        interruption=True,
        input_formats=["mulaw/8000"],
        output_formats=["mulaw/8000"],
    )

    # ------------------------------------------------------------------ init

    def __init__(
        self,
        *,
        account_sid: str,
        auth_token: str,
        phone_number: str,
        public_base_url: Optional[str] = None,
        allowed_callers: Optional[list[str]] = None,
        on_dtmf: Optional[Callable[[str], None]] = None,
        http_port: int = 8765,
        role: AgentRole = AgentRole.AGENT,
        validate_signature: bool = True,
    ) -> None:
        super().__init__()
        validate_e164(phone_number)

        self.account_sid = account_sid
        self.auth_token = auth_token
        self.phone_number = phone_number
        self.public_base_url = public_base_url
        self.allowed_callers = set(allowed_callers) if allowed_callers else None
        self.on_dtmf = on_dtmf
        self.http_port = http_port
        self.role = role  # type: ignore[misc]
        # When True (default), the inbound /twilio/voice route requires a
        # valid X-Twilio-Signature header before accepting the webhook
        # body. The cloudflared tunnel URL is ephemeral and not
        # guessable, but anyone who learns it could otherwise POST fake
        # Twilio webhook events into the harness. Tests that don't have
        # real Twilio credentials disable this with
        # ``validate_signature=False``; production callers should leave
        # it on.
        self.validate_signature = validate_signature
        if not validate_signature:
            logger.warning(
                "TwilioAgentAdapter: validate_signature=False — inbound "
                "webhooks accept any payload without signature checks. "
                "Use only in tests; do not deploy to production."
            )

        # Populated during connect(); None when disconnected.
        self._rest: Optional[TwilioRESTHelper] = None
        self._phone_number_sid: Optional[str] = None
        self._prior_voice_url: Optional[str] = None
        # Set by place_call() when it rewrites the callee's voice_url so
        # B-leg's webhook lands on our harness. Restored in disconnect.
        self._callee_phone_number_sid: Optional[str] = None
        self._prior_callee_voice_url: Optional[str] = None
        # Set by whichever of wait_for_call()/place_call() runs first; calling
        # the other method afterwards raises. Stays "idle" after connect()
        # until either fires.
        self._mode: TwilioAdapterMode = "idle"

        # Server / media stream state.
        self._server_task: Optional[asyncio.Task] = None
        self._server_shutdown: Optional[asyncio.Event] = None
        self._call_sid: Optional[str] = None
        self._stream_sid: Optional[str] = None
        self._stream_connected: Optional[asyncio.Event] = None
        self._stream_ws: Any = None  # starlette WebSocket
        self._inbound_queue: Optional[asyncio.Queue[AudioChunk]] = None


    # ------------------------------------------------------------------ repr

    def __repr__(self) -> str:  # redact credentials
        return (
            f"TwilioAgentAdapter("
            f"phone_number={self.phone_number!r}, "
            f"account_sid='***', auth_token='***', "
            f"public_base_url={self.public_base_url!r})"
        )

    # ------------------------------------------------------------------ lifecycle

    async def connect(self) -> None:
        """Resolve number SID and start the FastAPI webhook + WS server.

        Does NOT modify the Twilio account's ``voice_url``. That side-effect
        only happens when ``wait_for_call()`` is invoked — callers (who will
        use ``place_call()``) never overwrite their number's inbound webhook,
        which makes caller-mode adapters safe to run against a shared pool of
        Twilio numbers without clobbering anyone's prod webhook.

        Idempotent: calling connect() on an already-connected adapter is a
        no-op. This lets the scenario executor's auto-connect step
        (``_voice_connect_all``) coexist with explicit harness-driven
        connects (``TwilioHarness`` ``__aenter__``) that already brought
        the adapter up before scenario.run() was called.
        """
        if self._rest is not None:
            return

        if self.public_base_url is None:
            raise RuntimeError(
                "TwilioAgentAdapter: public_base_url is required. Wrap the "
                "adapter in scenario.voice.testing.TwilioHarness, or supply "
                "a stable public HTTPS URL that routes to this machine."
            )

        self._rest = TwilioRESTHelper(self.account_sid, self.auth_token)
        self._phone_number_sid = self._rest.resolve_phone_number_sid(self.phone_number)

        self._stream_connected = asyncio.Event()
        self._inbound_queue = asyncio.Queue()
        self._server_shutdown = asyncio.Event()
        self._mode = "idle"

        # Webhook server is its own unit — see _twilio_server.py. The
        # adapter only orchestrates lifecycle; the routes, signature
        # validation, and WS framing live in TwilioWebhookServer.
        from ._twilio_server import TwilioWebhookServer
        self._webhook_server: Optional[TwilioWebhookServer] = TwilioWebhookServer(self)
        self._server_task = asyncio.create_task(self._run_server())
        # Give uvicorn a beat to bind the port before Twilio hits it.
        await asyncio.sleep(0.2)

    async def disconnect(self) -> None:
        """Restore any voice_url this adapter rewrote, then tear down the server.

        Best-effort on errors. In answer mode the adapter's own number is
        restored. In call mode the adapter's own number was never touched;
        only the callee's voice_url (rewritten by ``place_call()``) is
        restored.
        """
        if self._rest is None:
            return

        # 1. Restore webhook first so Twilio doesn't keep hitting a dead URL.
        if self._mode == "answer" and self._phone_number_sid is not None:
            with suppress(Exception):
                prior = self._prior_voice_url or ""
                self._rest.write_voice_url(self._phone_number_sid, prior)
                logger.debug(
                    "TwilioAgentAdapter: restored voice_url=%r on %s",
                    prior,
                    self._phone_number_sid,
                )
        # place_call() rewrites the CALLEE's voice_url to attach Media
        # Streams to B-leg. Restore that too.
        if self._mode == "call" and self._callee_phone_number_sid is not None:
            with suppress(Exception):
                prior_b = self._prior_callee_voice_url or ""
                self._rest.write_voice_url(self._callee_phone_number_sid, prior_b)
                logger.debug(
                    "TwilioAgentAdapter: restored callee voice_url=%r on %s",
                    prior_b,
                    self._callee_phone_number_sid,
                )

        # 2. Signal server to shut down, then wait for the task.
        if self._server_shutdown is not None:
            self._server_shutdown.set()
        if self._server_task is not None:
            with suppress(Exception):
                await asyncio.wait_for(self._server_task, timeout=3.0)

        # 3. Reset state.
        self._rest = None
        self._phone_number_sid = None
        self._prior_voice_url = None
        self._callee_phone_number_sid = None
        self._prior_callee_voice_url = None
        self._mode = "idle"
        self._server_task = None
        self._server_shutdown = None
        self._call_sid = None
        self._stream_sid = None
        self._stream_connected = None
        self._stream_ws = None
        self._inbound_queue = None

    # ------------------------------------------------------------------ direction

    async def place_call(
        self,
        to: str,
        *,
        timeout: float = 120.0,
        attach_stream_to_self: bool = True,
    ) -> None:
        """
        Originate an outbound call from this adapter's Twilio number to ``to``.

        Twilio's REST ``Calls.create`` runs TwiML on TWO legs of the
        resulting call:

        - **A-leg** (the originator, ``from_=self.phone_number``): runs the
          inline TwiML passed via ``twiml=`` to ``Calls.create``. We use a
          short ``<Say>`` followed by ``<Pause length="120"/>`` so the
          originator just holds the bridge open while the demo runs.
        - **B-leg** (the callee, ``to=``): when Twilio dials B and B picks
          up, B's number's ``voice_url`` fires — that's where the bridge's
          Media Streams attach. This is identical to the inbound demo's
          flow: B's voice_url returns ``<Connect><Stream>``, the WS opens,
          audio flows.

        So ``place_call`` only makes sense when ``to`` is another Twilio
        number on this account whose ``voice_url`` is set to OUR harness
        webhook. To make that wiring automatic, ``place_call`` temporarily
        rewrites B's ``voice_url`` for the duration of the call and
        restores it on ``disconnect``. The harness on this adapter's own
        number does NOT need to be answer-mode — the Stream attaches to
        B's leg, NOT this adapter's leg, but B's webhook is hosted on this
        adapter's local server, so the WS still lands here.

        The bidirectional audio model is unchanged: ``send_audio`` writes
        frames over the WS (B hears them and bridges to A), ``recv_audio``
        reads inbound frames off the WS (whatever the bridge mixes from
        both legs).

        Limitation: ``to`` MUST be a phone number on this same Twilio
        account. Calling an external PSTN endpoint (a real cell phone)
        requires a different topology (``<Start><Stream>`` + ``<Dial>``)
        which we don't implement here because the inline TwiML route on
        the A-leg can't capture B's audio when B is external.

        Default timeout 120s covers cloudflared cold-start latency.

        Raises:
            RuntimeError: If called after ``wait_for_call()`` (modes are
                exclusive per adapter instance), or if ``to`` is not a
                Twilio number on this account.
            ValueError: If ``to`` is not in E.164 format.
            asyncio.TimeoutError: If the media stream doesn't open within
                ``timeout`` seconds.
        """
        self._assert_connected()
        self._enter_mode("call")
        validate_e164(to)

        assert self.public_base_url is not None
        assert self._rest is not None
        assert self._stream_connected is not None

        if attach_stream_to_self:
            # Resolve B-leg's number SID and snapshot+rewrite its voice_url so
            # B's leg attaches its Media Stream to our harness webhook. We own
            # this number (same Twilio account); disconnect() will restore.
            self._callee_phone_number_sid = self._rest.resolve_phone_number_sid(to)
            self._prior_callee_voice_url = self._rest.read_voice_url(
                self._callee_phone_number_sid
            )
            webhook_url = self.public_base_url.rstrip("/") + "/twilio/voice"
            self._rest.write_voice_url(self._callee_phone_number_sid, webhook_url)
            logger.info(
                "TwilioAgentAdapter: rewrote callee %s voice_url to %s",
                _redact_e164(to),
                webhook_url,
            )

        # A-leg TwiML: play a short deterministic <Say> line, then hold
        # the bridge open. Twilio runs this on the originator side while
        # B's webhook attaches the Media Stream.
        #
        # The <Say> gives the recording a known-good utterance to
        # transcribe. A bare <Pause> alone produces 120s of line silence
        # that Whisper has been observed to hallucinate as non-English
        # text (issue #465 in this PR). The Say is a one-time anchor at
        # call setup; the Media Stream carries the real bidirectional
        # conversation that follows.
        inline_a_leg_twiml = (
            '<?xml version="1.0" encoding="UTF-8"?>'
            "<Response>"
            f'<Say voice="Polly.Joanna">{PLACE_CALL_A_LEG_SAY_TEXT}</Say>'
            '<Pause length="120"/>'
            "</Response>"
        )
        self._call_sid = self._rest.place_call(
            to=to, from_=self.phone_number, twiml=inline_a_leg_twiml
        )
        logger.info(
            "TwilioAgentAdapter: placed call %s from %s to %s",
            self._call_sid,
            _redact_e164(self.phone_number),
            _redact_e164(to),
        )

        if attach_stream_to_self:
            # Wait for OUR webhook to fire — only meaningful when we rewrote
            # the callee's voice_url to point at us. In originator-only mode
            # (attach_stream_to_self=False), there's no stream coming to us;
            # the callee has its own harness which owns the stream.
            await asyncio.wait_for(self._stream_connected.wait(), timeout=timeout)

    async def wait_for_call(self, timeout: float = 120.0) -> None:
        """
        Block until someone dials in and the media stream is live.

        In **default mode** (no conference_room): the number's ``voice_url``
        is overwritten to point at our webhook so inbound calls reach us.
        A caller elsewhere (for example, another adapter using
        ``place_call``) then dials this number.

        In **conference mode**: there's no inbound call to wait for — the
        two-Twilio-number demo can't naturally have one side receive an
        inbound call when both legs need conference TwiML. Instead, we
        ORIGINATE an outbound call FROM this adapter's number TO itself
        with inline TwiML that opens the capture stream and dials into
        the shared conference room. This is the symmetric counterpart to
        ``place_call`` in conference mode — both adapters end up as
        participants in the same room, exchanging audio via the bridge.

        Default timeout 120s covers cloudflared cold-start, conference-room
        formation, and Twilio's webhook ramp.

        Raises:
            RuntimeError: If called after ``place_call()``.
            asyncio.TimeoutError: If nobody dials in within ``timeout``.
        """
        self._assert_connected()
        self._enter_mode("answer")

        assert self.public_base_url is not None
        assert self._rest is not None
        assert self._phone_number_sid is not None
        assert self._stream_connected is not None

        # Snapshot the prior webhook so we can restore it on disconnect, then
        # point the number at our server. Only answer mode does this.
        self._prior_voice_url = self._rest.read_voice_url(self._phone_number_sid)
        webhook_url = self.public_base_url.rstrip("/") + "/twilio/voice"
        self._rest.write_voice_url(self._phone_number_sid, webhook_url)
        logger.info("TwilioAgentAdapter: webhook set to %s", webhook_url)

        await asyncio.wait_for(self._stream_connected.wait(), timeout=timeout)

    def _enter_mode(self, mode: TwilioAdapterMode) -> None:
        """Transition idle → mode, or raise if already in a different mode.

        Modes are exclusive per connected session: an adapter can place a call
        or answer a call, not both. Disconnect + reconnect to reuse the
        instance in the other direction.
        """
        if self._mode == mode:
            return  # idempotent re-entry (e.g. retrying place_call after timeout)
        if self._mode != "idle":
            raise RuntimeError(
                f"TwilioAgentAdapter: already in {self._mode!r} mode; cannot "
                f"switch to {mode!r}. Disconnect and reconnect to reuse this "
                f"adapter in the other direction."
            )
        self._mode = mode

    # ------------------------------------------------------------------ I/O

    async def send_audio(self, chunk: AudioChunk) -> None:
        # Pace at real-time (one frame per TWILIO_FRAME_MS). Without pacing the
        # whole utterance arrives in milliseconds, which trips bots' VAD into
        # a clipped-utterance reading.
        self._assert_stream_live()

        ws = self._stream_ws
        stream_sid = self._stream_sid
        assert ws is not None and stream_sid is not None

        mulaw = pcm16_24k_to_mulaw8k(chunk.data)
        frame_secs = TWILIO_FRAME_MS / 1000
        for frame in iter_mulaw_frames(mulaw):
            if not frame:
                continue
            await ws.send_text(build_media_frame(stream_sid, frame))
            await asyncio.sleep(frame_secs)

    async def recv_audio(self, timeout: float) -> AudioChunk:
        self._assert_stream_live()
        assert self._inbound_queue is not None
        return await asyncio.wait_for(self._inbound_queue.get(), timeout=timeout)

    async def send_dtmf(self, tones: str) -> None:
        """Send DTMF digits on the live call (uses Twilio REST ``<Play digits>``)."""
        if self._rest is None or self._call_sid is None:
            raise RuntimeError("TwilioAgentAdapter: no active call; send_dtmf requires an in-progress call")
        # Run blocking REST call off-thread so we don't stall the event loop.
        await asyncio.to_thread(self._rest.send_dtmf_on_call, self._call_sid, tones)

    async def interrupt(self) -> None:
        """Drop any buffered outbound audio on Twilio's side (``clear`` event)."""
        self._assert_stream_live()
        ws = self._stream_ws
        stream_sid = self._stream_sid
        assert ws is not None and stream_sid is not None
        await ws.send_text(build_clear_frame(stream_sid))

    # ------------------------------------------------------------------ server

    async def _run_server(self) -> None:
        """Thin lifecycle wrapper; the real work lives in TwilioWebhookServer."""
        assert self._webhook_server is not None
        await self._webhook_server.run()

    def _build_app(self) -> Any:
        """Test seam: build the FastAPI app for in-process exercise.

        Production code never calls this — the server's ``run()`` builds
        the app itself when uvicorn starts. Existing unit tests use
        ``TestClient(_build_app())`` to exercise the routes without
        binding a port; the delegation keeps that test surface stable.
        """
        assert self._webhook_server is not None
        return self._webhook_server.build_app()

    async def _media_stream_loop(self, ws: Any) -> None:
        """Test seam: kick off the Media Streams WS loop directly.

        The two-adapter-bridge test in
        ``tests/voice/test_twilio_two_adapter_bridge.py`` uses this to
        drive a loopback WS without going through the FastAPI route
        wrapper. Production code reaches the loop via the ``/twilio/stream``
        WebSocket handler defined in ``_twilio_server.build_app``.
        """
        assert self._webhook_server is not None
        await self._webhook_server.media_stream_loop(ws)

    # ------------------------------------------------------------------ assertions

    def _assert_connected(self) -> None:
        if self._rest is None:
            raise RuntimeError("TwilioAgentAdapter: not connected; call connect() or use `async with`.")

    def _assert_stream_live(self) -> None:
        self._assert_connected()
        if self._stream_ws is None or self._stream_sid is None:
            raise RuntimeError(
                "TwilioAgentAdapter: no live media stream. Call place_call() or "
                "wait_for_call() first."
            )

Ancestors

Class variables

var capabilities : ClassVar[AdapterCapabilities]

Methods

async def connect(self) ‑> None

Resolve number SID and start the FastAPI webhook + WS server.

Does NOT modify the Twilio account's voice_url. That side-effect only happens when wait_for_call() is invoked — callers (who will use place_call()) never overwrite their number's inbound webhook, which makes caller-mode adapters safe to run against a shared pool of Twilio numbers without clobbering anyone's prod webhook.

Idempotent: calling connect() on an already-connected adapter is a no-op. This lets the scenario executor's auto-connect step (_voice_connect_all) coexist with explicit harness-driven connects (TwilioHarness __aenter__) that already brought the adapter up before scenario.run() was called.
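
The idempotency guard described above can be illustrated with a minimal, self-contained sketch. `SketchAdapter` and its `start_count` counter are hypothetical stand-ins and not part of `scenario`; only the "return early when already connected" pattern mirrors the source.

```python
import asyncio
from typing import Optional


class SketchAdapter:
    """Hypothetical stand-in for an adapter with an idempotent connect()."""

    def __init__(self) -> None:
        # Sentinel for "connected", playing the role of self._rest in the source.
        self._rest: Optional[object] = None
        self.start_count = 0  # how many times the server was actually started

    async def connect(self) -> None:
        if self._rest is not None:
            return  # already connected: the second call is a no-op
        self._rest = object()
        self.start_count += 1

    async def disconnect(self) -> None:
        self._rest = None


async def connect_twice() -> int:
    adapter = SketchAdapter()
    await adapter.connect()  # e.g. a harness-driven connect
    await adapter.connect()  # e.g. the executor's auto-connect; does nothing
    return adapter.start_count
```

Because the guard keys off the same field that `disconnect()` clears, a disconnect followed by a connect starts a fresh session rather than being skipped.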

Expand source code
async def connect(self) -> None:
    """Resolve number SID and start the FastAPI webhook + WS server.

    Does NOT modify the Twilio account's ``voice_url``. That side-effect
    only happens when ``wait_for_call()`` is invoked — callers (who will
    use ``place_call()``) never overwrite their number's inbound webhook,
    which makes caller-mode adapters safe to run against a shared pool of
    Twilio numbers without clobbering anyone's prod webhook.

    Idempotent: calling connect() on an already-connected adapter is a
    no-op. This lets the scenario executor's auto-connect step
    (``_voice_connect_all``) coexist with explicit harness-driven
    connects (``TwilioHarness`` ``__aenter__``) that already brought
    the adapter up before scenario.run() was called.
    """
    if self._rest is not None:
        return

    if self.public_base_url is None:
        raise RuntimeError(
            "TwilioAgentAdapter: public_base_url is required. Wrap the "
            "adapter in scenario.voice.testing.TwilioHarness, or supply "
            "a stable public HTTPS URL that routes to this machine."
        )

    self._rest = TwilioRESTHelper(self.account_sid, self.auth_token)
    self._phone_number_sid = self._rest.resolve_phone_number_sid(self.phone_number)

    self._stream_connected = asyncio.Event()
    self._inbound_queue = asyncio.Queue()
    self._server_shutdown = asyncio.Event()
    self._mode = "idle"

    # Webhook server is its own unit — see _twilio_server.py. The
    # adapter only orchestrates lifecycle; the routes, signature
    # validation, and WS framing live in TwilioWebhookServer.
    from ._twilio_server import TwilioWebhookServer
    self._webhook_server: Optional[TwilioWebhookServer] = TwilioWebhookServer(self)
    self._server_task = asyncio.create_task(self._run_server())
    # Give uvicorn a beat to bind the port before Twilio hits it.
    await asyncio.sleep(0.2)
async def disconnect(self) ‑> None

Restore any voice_url this adapter rewrote, then tear down the server.

Best-effort on errors. In answer mode the adapter's own number is restored. In call mode the adapter's own number was never touched; only the callee's voice_url (rewritten by place_call()) is restored.
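
The snapshot, rewrite, and best-effort restore sequence can be sketched in isolation. `FakeRest` is an in-memory stand-in invented for this example (it only borrows the `read_voice_url` / `write_voice_url` method names seen in the source), and the URLs and SID are placeholders.

```python
from contextlib import suppress
from typing import Dict, Optional


class FakeRest:
    """In-memory stand-in for a REST helper (assumption, not the real client)."""

    def __init__(self, urls: Dict[str, str]) -> None:
        self.urls = urls

    def read_voice_url(self, sid: str) -> str:
        return self.urls[sid]

    def write_voice_url(self, sid: str, url: str) -> None:
        self.urls[sid] = url


def rewrite_then_restore(rest: FakeRest, sid: str, temp_url: str) -> str:
    """Point `sid` at temp_url, then restore it; return the URL seen mid-session."""
    prior: Optional[str] = rest.read_voice_url(sid)  # snapshot
    rest.write_voice_url(sid, temp_url)              # rewrite
    during = rest.read_voice_url(sid)
    with suppress(Exception):                        # best-effort restore
        rest.write_voice_url(sid, prior or "")
    return during
```

Restoring before tearing down the server matters for the reason given in the source comments: the provider should not keep posting to a URL that is about to stop answering.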

Expand source code
async def disconnect(self) -> None:
    """Restore any voice_url this adapter rewrote, then tear down the server.

    Best-effort on errors. In answer mode the adapter's own number is
    restored. In call mode the adapter's own number was never touched;
    only the callee's voice_url (rewritten by ``place_call()``) is
    restored.
    """
    if self._rest is None:
        return

    # 1. Restore webhook first so Twilio doesn't keep hitting a dead URL.
    if self._mode == "answer" and self._phone_number_sid is not None:
        with suppress(Exception):
            prior = self._prior_voice_url or ""
            self._rest.write_voice_url(self._phone_number_sid, prior)
            logger.debug(
                "TwilioAgentAdapter: restored voice_url=%r on %s",
                prior,
                self._phone_number_sid,
            )
    # place_call() rewrites the CALLEE's voice_url to attach Media
    # Streams to B-leg. Restore that too.
    if self._mode == "call" and self._callee_phone_number_sid is not None:
        with suppress(Exception):
            prior_b = self._prior_callee_voice_url or ""
            self._rest.write_voice_url(self._callee_phone_number_sid, prior_b)
            logger.debug(
                "TwilioAgentAdapter: restored callee voice_url=%r on %s",
                prior_b,
                self._callee_phone_number_sid,
            )

    # 2. Signal server to shut down, then wait for the task.
    if self._server_shutdown is not None:
        self._server_shutdown.set()
    if self._server_task is not None:
        with suppress(Exception):
            await asyncio.wait_for(self._server_task, timeout=3.0)

    # 3. Reset state.
    self._rest = None
    self._phone_number_sid = None
    self._prior_voice_url = None
    self._callee_phone_number_sid = None
    self._prior_callee_voice_url = None
    self._mode = "idle"
    self._server_task = None
    self._server_shutdown = None
    self._call_sid = None
    self._stream_sid = None
    self._stream_connected = None
    self._stream_ws = None
    self._inbound_queue = None
async def interrupt(self) ‑> None

Drop any buffered outbound audio on Twilio's side (clear event).
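
For orientation, a `clear` message on a Twilio Media Stream is a small JSON text frame carrying the event name and the stream SID. The implementation of `build_clear_frame` is not shown on this page, so the function below is an assumed equivalent with a hypothetical name, based on the message shape in Twilio's Media Streams documentation.

```python
import json


def sketch_clear_frame(stream_sid: str) -> str:
    """Serialize a Media Streams `clear` message for the given stream.

    Hypothetical helper: the real build_clear_frame may differ in detail.
    """
    return json.dumps({"event": "clear", "streamSid": stream_sid})
```

The resulting string would be sent with the WebSocket's text-send call, as `interrupt()` does with `ws.send_text(...)`.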

Expand source code
async def interrupt(self) -> None:
    """Drop any buffered outbound audio on Twilio's side (``clear`` event)."""
    self._assert_stream_live()
    ws = self._stream_ws
    stream_sid = self._stream_sid
    assert ws is not None and stream_sid is not None
    await ws.send_text(build_clear_frame(stream_sid))
async def place_call(self, to: str, *, timeout: float = 120.0, attach_stream_to_self: bool = True) ‑> None

Originate an outbound call from this adapter's Twilio number to to.

Twilio's REST Calls.create runs TwiML on TWO legs of the resulting call:

  • A-leg (the originator, from_=self.phone_number): runs the inline TwiML passed via twiml= to Calls.create. We use a short <Say> followed by <Pause length="120"/> so the originator just holds the bridge open while the demo runs.
  • B-leg (the callee, to=): when Twilio dials B and B picks up, B's number's voice_url fires — that's where the bridge's Media Streams attach. This is identical to the inbound demo's flow: B's voice_url returns <Connect><Stream>, the WS opens, audio flows.

So place_call only makes sense when to is another Twilio number on this account whose voice_url is set to OUR harness webhook. To make that wiring automatic, place_call temporarily rewrites B's voice_url for the duration of the call and restores it on disconnect. The harness on this adapter's own number does NOT need to be answer-mode — the Stream attaches to B's leg, NOT this adapter's leg, but B's webhook is hosted on this adapter's local server, so the WS still lands here.

The bidirectional audio model is unchanged: send_audio writes frames over the WS (B hears them and bridges to A), recv_audio reads inbound frames off the WS (whatever the bridge mixes from both legs).

Limitation: to MUST be a phone number on this same Twilio account. Calling an external PSTN endpoint (a real cell phone) requires a different topology (<Start><Stream> + <Dial>) which we don't implement here because the inline TwiML route on the A-leg can't capture B's audio when B is external.

Default timeout 120s covers cloudflared cold-start latency.
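
As a rough illustration of the A-leg TwiML, the sketch below builds a `<Say>` followed by a `<Pause>` in the same shape as the inline string in the source. The function name is hypothetical, and the XML escaping of the spoken text is an addition made for this example; the source interpolates a constant directly.

```python
import xml.etree.ElementTree as ET
from xml.sax.saxutils import escape


def sketch_a_leg_twiml(say_text: str, pause_seconds: int = 120) -> str:
    """Build A-leg TwiML: speak one line, then hold the call open.

    Hypothetical helper; escaping is an assumption added for safety so that
    characters such as & or < in say_text cannot break the XML.
    """
    return (
        '<?xml version="1.0" encoding="UTF-8"?>'
        "<Response>"
        f'<Say voice="Polly.Joanna">{escape(say_text)}</Say>'
        f'<Pause length="{pause_seconds}"/>'
        "</Response>"
    )
```

Parsing the result with `ET.fromstring(twiml.encode("utf-8"))` is a cheap way to confirm the document is well-formed before handing it to the REST call.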

Raises

RuntimeError
If called after wait_for_call() (modes are exclusive per adapter instance), or if to is not a Twilio number on this account.
ValueError
If to is not in E.164 format.
asyncio.TimeoutError
If the media stream doesn't open within timeout seconds.
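
The `ValueError` case can be pictured with a small E.164 check. The real `validate_e164` is not shown on this page, so the regular expression and function name below are assumptions: a leading `+`, a non-zero first digit, and at most 15 digits in total.

```python
import re

# Assumed pattern: "+", a first digit 1-9, then 1 to 14 further ASCII digits.
_E164_RE = re.compile(r"\+[1-9][0-9]{1,14}")


def sketch_validate_e164(number: str) -> str:
    """Return the number unchanged if it looks like E.164, else raise ValueError."""
    if not _E164_RE.fullmatch(number):
        raise ValueError(f"not an E.164 number: {number!r}")
    return number
```

A format check of this kind only rejects malformed input; whether the number belongs to the same Twilio account is a separate lookup.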
Expand source code
async def place_call(
    self,
    to: str,
    *,
    timeout: float = 120.0,
    attach_stream_to_self: bool = True,
) -> None:
    """
    Originate an outbound call from this adapter's Twilio number to ``to``.

    Twilio's REST ``Calls.create`` runs TwiML on TWO legs of the
    resulting call:

    - **A-leg** (the originator, ``from_=self.phone_number``): runs the
      inline TwiML passed via ``twiml=`` to ``Calls.create``. We use a
      short ``<Say>`` followed by ``<Pause length="120"/>`` so the
      originator just holds the bridge open while the demo runs.
    - **B-leg** (the callee, ``to=``): when Twilio dials B and B picks
      up, B's number's ``voice_url`` fires — that's where the bridge's
      Media Streams attach. This is identical to the inbound demo's
      flow: B's voice_url returns ``<Connect><Stream>``, the WS opens,
      audio flows.

    So ``place_call`` only makes sense when ``to`` is another Twilio
    number on this account whose ``voice_url`` is set to OUR harness
    webhook. To make that wiring automatic, ``place_call`` temporarily
    rewrites B's ``voice_url`` for the duration of the call and
    restores it on ``disconnect``. The harness on this adapter's own
    number does NOT need to be answer-mode — the Stream attaches to
    B's leg, NOT this adapter's leg, but B's webhook is hosted on this
    adapter's local server, so the WS still lands here.

    The bidirectional audio model is unchanged: ``send_audio`` writes
    frames over the WS (B hears them and bridges to A), ``recv_audio``
    reads inbound frames off the WS (whatever the bridge mixes from
    both legs).

    Limitation: ``to`` MUST be a phone number on this same Twilio
    account. Calling an external PSTN endpoint (a real cell phone)
    requires a different topology (``<Start><Stream>`` + ``<Dial>``)
    which we don't implement here because the inline TwiML route on
    the A-leg can't capture B's audio when B is external.

    Default timeout 120s covers cloudflared cold-start latency.

    Raises:
        RuntimeError: If called after ``wait_for_call()`` (modes are
            exclusive per adapter instance), or if ``to`` is not a
            Twilio number on this account.
        ValueError: If ``to`` is not in E.164 format.
        asyncio.TimeoutError: If the media stream doesn't open within
            ``timeout`` seconds.
    """
    self._assert_connected()
    self._enter_mode("call")
    validate_e164(to)

    assert self.public_base_url is not None
    assert self._rest is not None
    assert self._stream_connected is not None

    if attach_stream_to_self:
        # Resolve B-leg's number SID and snapshot+rewrite its voice_url so
        # B's leg attaches its Media Stream to our harness webhook. We own
        # this number (same Twilio account); disconnect() will restore.
        self._callee_phone_number_sid = self._rest.resolve_phone_number_sid(to)
        self._prior_callee_voice_url = self._rest.read_voice_url(
            self._callee_phone_number_sid
        )
        webhook_url = self.public_base_url.rstrip("/") + "/twilio/voice"
        self._rest.write_voice_url(self._callee_phone_number_sid, webhook_url)
        logger.info(
            "TwilioAgentAdapter: rewrote callee %s voice_url to %s",
            _redact_e164(to),
            webhook_url,
        )

    # A-leg TwiML: play a short deterministic <Say> line, then hold
    # the bridge open. Twilio runs this on the originator side while
    # B's webhook attaches the Media Stream.
    #
    # The <Say> gives the recording a known-good utterance to
    # transcribe. A bare <Pause> alone produces 120s of line silence
    # that Whisper has been observed to hallucinate as non-English
    # text (issue #465 in this PR). The Say is a one-time anchor at
    # call setup; the Media Stream carries the real bidirectional
    # conversation that follows.
    inline_a_leg_twiml = (
        '<?xml version="1.0" encoding="UTF-8"?>'
        "<Response>"
        f'<Say voice="Polly.Joanna">{PLACE_CALL_A_LEG_SAY_TEXT}</Say>'
        '<Pause length="120"/>'
        "</Response>"
    )
    self._call_sid = self._rest.place_call(
        to=to, from_=self.phone_number, twiml=inline_a_leg_twiml
    )
    logger.info(
        "TwilioAgentAdapter: placed call %s from %s to %s",
        self._call_sid,
        _redact_e164(self.phone_number),
        _redact_e164(to),
    )

    if attach_stream_to_self:
        # Wait for OUR webhook to fire — only meaningful when we rewrote
        # the callee's voice_url to point at us. In originator-only mode
        # (attach_stream_to_self=False), there's no stream coming to us;
        # the callee has its own harness which owns the stream.
        await asyncio.wait_for(self._stream_connected.wait(), timeout=timeout)
async def send_dtmf(self, tones: str) ‑> None

Send DTMF digits on the live call (uses Twilio REST <Play digits>).
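
The source runs the blocking REST call through `asyncio.to_thread` so the event loop keeps servicing the media stream. The sketch below shows that pattern with a hypothetical `blocking_send` standing in for the REST helper; the concurrent ticker only exists to demonstrate that the loop stays responsive.

```python
import asyncio
import time
from contextlib import suppress
from typing import Tuple


def blocking_send(call_sid: str, tones: str) -> str:
    """Stand-in for a synchronous REST call (hypothetical)."""
    time.sleep(0.05)
    return f"{call_sid}:{tones}"


async def send_without_blocking() -> Tuple[str, int]:
    ticks = 0

    async def ticker() -> None:
        # Counts loop iterations while the blocking call runs in a worker thread.
        nonlocal ticks
        while True:
            ticks += 1
            await asyncio.sleep(0.005)

    task = asyncio.create_task(ticker())
    result = await asyncio.to_thread(blocking_send, "CA123", "1234#")
    task.cancel()
    with suppress(asyncio.CancelledError):
        await task
    return result, ticks
```

Calling `blocking_send` directly inside the coroutine would freeze the ticker for the whole duration of the call, which is the stall the source comment warns about.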

Expand source code
async def send_dtmf(self, tones: str) -> None:
    """Send DTMF digits on the live call (uses Twilio REST ``<Play digits>``)."""
    if self._rest is None or self._call_sid is None:
        raise RuntimeError("TwilioAgentAdapter: no active call; send_dtmf requires an in-progress call")
    # Run blocking REST call off-thread so we don't stall the event loop.
    await asyncio.to_thread(self._rest.send_dtmf_on_call, self._call_sid, tones)
async def wait_for_call(self, timeout: float = 120.0) ‑> None

Block until someone dials in and the media stream is live.

In default mode (no conference_room): the number's voice_url is overwritten to point at our webhook so inbound calls reach us. A caller elsewhere (for example, another adapter using place_call) then dials this number.

In conference mode: there's no inbound call to wait for — the two-Twilio-number demo can't naturally have one side receive an inbound call when both legs need conference TwiML. Instead, we ORIGINATE an outbound call FROM this adapter's number TO itself with inline TwiML that opens the capture stream and dials into the shared conference room. This is the symmetric counterpart to place_call in conference mode — both adapters end up as participants in the same room, exchanging audio via the bridge.

Default timeout 120s covers cloudflared cold-start, conference-room formation, and Twilio's webhook ramp.
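
The timeout behaviour comes from wrapping an `asyncio.Event` wait in `asyncio.wait_for`. The sketch below shows that mechanism on its own; `wait_for_stream` and `demo` are hypothetical names, and unlike the source (which lets `asyncio.TimeoutError` propagate) the helper converts the timeout into a boolean so both outcomes are easy to compare.

```python
import asyncio
from typing import Tuple


async def wait_for_stream(connected: asyncio.Event, timeout: float) -> bool:
    """Return True if the event fires within `timeout` seconds, else False."""
    try:
        await asyncio.wait_for(connected.wait(), timeout=timeout)
    except asyncio.TimeoutError:
        return False
    return True


async def demo() -> Tuple[bool, bool]:
    never = asyncio.Event()  # nobody dials in
    soon = asyncio.Event()   # the stream opens shortly after we start waiting
    asyncio.get_running_loop().call_later(0.01, soon.set)
    timed_out = await wait_for_stream(never, timeout=0.05)
    connected = await wait_for_stream(soon, timeout=1.0)
    return timed_out, connected
```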

Raises

RuntimeError
If called after place_call().
asyncio.TimeoutError
If nobody dials in within timeout.
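
The `RuntimeError` above follows from the mode guard in `_enter_mode`: re-entering the current mode is allowed, while switching direction without a disconnect is rejected. A stripped-down sketch of that state machine, with the hypothetical class name `ModeGuard`, looks like this.

```python
from typing import Literal

Mode = Literal["idle", "call", "answer"]


class ModeGuard:
    """Hypothetical, minimal version of the adapter's mode exclusivity check."""

    def __init__(self) -> None:
        self.mode: Mode = "idle"

    def enter(self, mode: Mode) -> None:
        if self.mode == mode:
            return  # idempotent re-entry, e.g. a retry after a timeout
        if self.mode != "idle":
            raise RuntimeError(
                f"already in {self.mode!r} mode; cannot switch to {mode!r}"
            )
        self.mode = mode
```

In the source the mode returns to "idle" on `disconnect()`, which is why a disconnect and reconnect is the documented way to reuse an instance in the other direction.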
Expand source code
async def wait_for_call(self, timeout: float = 120.0) -> None:
    """
    Block until someone dials in and the media stream is live.

    In **default mode** (no conference_room): the number's ``voice_url``
    is overwritten to point at our webhook so inbound calls reach us.
    A caller elsewhere (for example, another adapter using
    ``place_call``) then dials this number.

    In **conference mode**: there's no inbound call to wait for — the
    two-Twilio-number demo can't naturally have one side receive an
    inbound call when both legs need conference TwiML. Instead, we
    ORIGINATE an outbound call FROM this adapter's number TO itself
    with inline TwiML that opens the capture stream and dials into
    the shared conference room. This is the symmetric counterpart to
    ``place_call`` in conference mode — both adapters end up as
    participants in the same room, exchanging audio via the bridge.

    Default timeout 120s covers cloudflared cold-start, conference-room
    formation, and Twilio's webhook ramp.

    Raises:
        RuntimeError: If called after ``place_call()``.
        asyncio.TimeoutError: If nobody dials in within ``timeout``.
    """
    self._assert_connected()
    self._enter_mode("answer")

    assert self.public_base_url is not None
    assert self._rest is not None
    assert self._phone_number_sid is not None
    assert self._stream_connected is not None

    # Snapshot the prior webhook so we can restore it on disconnect, then
    # point the number at our server. Only answer mode does this.
    self._prior_voice_url = self._rest.read_voice_url(self._phone_number_sid)
    webhook_url = self.public_base_url.rstrip("/") + "/twilio/voice"
    self._rest.write_voice_url(self._phone_number_sid, webhook_url)
    logger.info("TwilioAgentAdapter: webhook set to %s", webhook_url)

    await asyncio.wait_for(self._stream_connected.wait(), timeout=timeout)
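
The final line of wait_for_call is a plain "wait on an event with a deadline" pattern. The sketch below isolates that pattern using only the standard library; the helper name wait_for_stream and the short timeouts are illustrative assumptions, not part of TwilioAgentAdapter.

```python
import asyncio


async def wait_for_stream(stream_connected: asyncio.Event, timeout: float) -> bool:
    """Return True if the event fires within ``timeout`` seconds, else False.

    Hypothetical helper: the adapter itself lets asyncio.TimeoutError
    propagate instead of converting it to a boolean.
    """
    try:
        await asyncio.wait_for(stream_connected.wait(), timeout=timeout)
        return True
    except asyncio.TimeoutError:
        return False


async def demo() -> tuple:
    event = asyncio.Event()

    # Nobody sets the event: the wait gives up after 50 ms.
    timed_out = not await wait_for_stream(event, timeout=0.05)

    # Simulate the media stream coming up 10 ms from now.
    asyncio.get_running_loop().call_later(0.01, event.set)
    connected = await wait_for_stream(event, timeout=1.0)
    return timed_out, connected


result = asyncio.run(demo())
```

A caller that prefers the adapter's behaviour can drop the try/except and handle asyncio.TimeoutError at the test level instead.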

Inherited members

class VapiAgentAdapter (assistant_id: str, api_key: str)

Abstract base for voice agents that exchange audio with the agent under test.

Subclasses implement connect, disconnect, send_audio, and recv_audio. The default call implementation threads audio extracted from the last incoming message through the transport and wraps the response back into an assistant message.

Attributes

capabilities
Declaration of what the adapter can and cannot do. Each concrete subclass must set this as a class attribute.
response_timeout

Seconds to wait for agent audio after sending user audio. Defaults to 60 seconds.

60 seconds covers a typical real-world STT → LLM → TTS round-trip including backoff/retry inside each provider, tool calls, and RAG lookups. If you see TimeoutError flakes against a fast LLM-only chain, you can lower this; if your agent does heavy processing (MCP roundtrips, multi-step tool chains), consider raising it.

Override per adapter instance by setting the attribute after construction:

adapter = MyVoiceAdapter()
adapter.response_timeout = 90.0  # slow tool-call chain
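
As a rough illustration of why a per-instance override is safe, the sketch below uses a stand-in class rather than the real base class. It assumes response_timeout is a class-level default of 60 seconds; StubVoiceAdapter is a hypothetical name and the real attribute may be set differently.

```python
class StubVoiceAdapter:
    """Hypothetical stand-in for a voice adapter with a class-level default."""

    response_timeout: float = 60.0  # assumed default, per the description above


fast = StubVoiceAdapter()
slow = StubVoiceAdapter()

# Assigning on the instance shadows the class attribute for this object only.
slow.response_timeout = 90.0
```

Under that assumption, other adapters created in the same test run keep the 60-second default.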

Expand source code
class VapiAgentAdapter(VoiceAgentAdapter):
    capabilities: ClassVar[AdapterCapabilities] = AdapterCapabilities(
        streaming_transcripts=True,
        native_vad=True,
        dtmf=False,
        input_formats=["pcm16/16000"],
        output_formats=["pcm16/16000"],
    )

    def __init__(self, assistant_id: str, api_key: str):
        super().__init__()
        self.assistant_id = assistant_id
        self.api_key = api_key
        self.websocket_call_url: Optional[str] = None
        self._ws: Optional[object] = None

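    async def connect(self) -> None:
        # Stub: a full integration would POST to the Vapi REST API to obtain
        # websocketCallUrl and then open that WebSocket. For now, record a
        # placeholder URL and a sentinel object so that send_audio/recv_audio
        # can distinguish "connected" from "not connected".
        self.websocket_call_url = f"wss://vapi.ai/ws/{self.assistant_id}"
        self._ws = object()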

    async def disconnect(self) -> None:
        self._ws = None

    async def send_audio(self, chunk: AudioChunk) -> None:
        if self._ws is None:
            raise RuntimeError("VapiAgentAdapter: not connected")
        raise PendingTransportError("VapiAgentAdapter")

    async def recv_audio(self, timeout: float) -> AudioChunk:
        if self._ws is None:
            raise RuntimeError("VapiAgentAdapter: not connected")
        raise PendingTransportError("VapiAgentAdapter")

    def __repr__(self) -> str:  # redact credentials
        return f"VapiAgentAdapter(assistant_id={self.assistant_id!r}, api_key='***')"
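
The capabilities block declares audio formats as strings such as "pcm16/16000". The sketch below shows one way a test harness might read them, assuming the string means "encoding/sample rate in Hz"; that reading, and the helper names parse_format and shared_formats, are assumptions rather than part of the library.

```python
from typing import List, Tuple


def parse_format(spec: str) -> Tuple[str, int]:
    """Split a spec like 'pcm16/16000' into (encoding, sample_rate_hz)."""
    encoding, _, rate = spec.partition("/")
    if not encoding or not rate.isdigit():
        raise ValueError(f"unrecognised audio format: {spec!r}")
    return encoding, int(rate)


def shared_formats(a: List[str], b: List[str]) -> List[str]:
    """Formats both sides declare, in the order given by ``a``."""
    return [fmt for fmt in a if fmt in b]


# Values copied from the capabilities declarations on this page.
vapi_formats = ["pcm16/16000"]
webrtc_formats = ["pcm16/24000"]

parsed = parse_format(vapi_formats[0])
overlap = shared_formats(vapi_formats, webrtc_formats)
```

An empty overlap, as here, would suggest that audio has to be resampled before it can move between the two adapters.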

Ancestors

Class variables

var capabilities : ClassVar[AdapterCapabilities]

Inherited members

class WebRTCAgentAdapter (signaling_url: str)

Generic WebRTC adapter that negotiates via an HTTP signaling URL.

Expand source code
class WebRTCAgentAdapter(VoiceAgentAdapter):
    """Generic WebRTC adapter that negotiates via an HTTP signaling URL."""

    capabilities: ClassVar[AdapterCapabilities] = AdapterCapabilities(
        streaming_transcripts=False,
        native_vad=False,
        dtmf=False,
        input_formats=["pcm16/24000"],
        output_formats=["pcm16/24000"],
    )

    def __init__(self, signaling_url: str):
        super().__init__()
        self.signaling_url = signaling_url
        self._pc: Optional[Any] = None
        self._inbound_audio: "asyncio.Queue[AudioChunk]" = asyncio.Queue()

    async def connect(self) -> None:
        # Deferred: actual SDP exchange requires a reachable signaling server.
        # Tested at @integration level with a loopback aiortc peer.
        self._pc = object()  # sentinel — mark "connected"

    async def disconnect(self) -> None:
        self._pc = None

    async def send_audio(self, chunk: AudioChunk) -> None:
        if self._pc is None:
            raise RuntimeError(f"{type(self).__name__}: not connected")
        raise PendingTransportError(type(self).__name__)

    async def recv_audio(self, timeout: float) -> AudioChunk:
        if self._pc is None:
            raise RuntimeError(f"{type(self).__name__}: not connected")
        # If a subclass populated _inbound_audio, return from it.
        if not self._inbound_audio.empty():
            return await asyncio.wait_for(self._inbound_audio.get(), timeout=timeout)
        raise PendingTransportError(type(self).__name__)

Ancestors

Class variables

var capabilities : ClassVar[AdapterCapabilities]

Inherited members

class WebSocketAgentAdapter (url: str, protocol: WebSocketProtocol)

Connects to an arbitrary WebSocket endpoint using a user-supplied protocol.

The protocol's encode_audio is called before sending; decode_response is called on each inbound frame until an AudioChunk is produced.

Expand source code
class WebSocketAgentAdapter(VoiceAgentAdapter):
    """
    Connects to an arbitrary WebSocket endpoint using a user-supplied protocol.

    The protocol's ``encode_audio`` is called before sending; ``decode_response``
    is called on each inbound frame until an AudioChunk is produced.
    """

    capabilities: ClassVar[AdapterCapabilities] = AdapterCapabilities(
        streaming_transcripts=False,
        native_vad=False,
        dtmf=False,
        input_formats=["pcm16/24000"],
        output_formats=["pcm16/24000"],
    )

    def __init__(self, url: str, protocol: WebSocketProtocol):
        super().__init__()
        self.url = url
        self.protocol = protocol
        self._ws: Optional[Any] = None

    async def connect(self) -> None:
        import websockets  # hard dep

        self._ws = await websockets.connect(self.url)

    async def disconnect(self) -> None:
        if self._ws is not None:
            await self._ws.close()
            self._ws = None

    async def send_audio(self, chunk: AudioChunk) -> None:
        if self._ws is None:
            raise RuntimeError(f"{type(self).__name__}: not connected")
        payload = self.protocol.encode_audio(chunk.data)
        await self._ws.send(payload)

    async def recv_audio(self, timeout: float) -> AudioChunk:
        if self._ws is None:
            raise RuntimeError(f"{type(self).__name__}: not connected")
        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout
        while True:
            remaining = max(0.0, deadline - loop.time())
            message = await asyncio.wait_for(self._ws.recv(), timeout=remaining)
            chunk = self.protocol.decode_response(message)
            if chunk is not None:
                return chunk
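
The loop in recv_audio keeps a single deadline across several receives, so non-audio frames consume part of the budget instead of resetting it. The sketch below reproduces that loop against an asyncio.Queue standing in for the socket; the "audio:" frame prefix and the decode helper are made up for illustration.

```python
import asyncio
from typing import Optional


def decode(message: str) -> Optional[bytes]:
    """Hypothetical decoder: only 'audio:<hex>' frames carry audio."""
    if message.startswith("audio:"):
        return bytes.fromhex(message[len("audio:"):])
    return None


async def recv_audio(frames: "asyncio.Queue[str]", timeout: float) -> bytes:
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout  # one deadline shared by every receive
    while True:
        remaining = max(0.0, deadline - loop.time())
        message = await asyncio.wait_for(frames.get(), timeout=remaining)
        chunk = decode(message)
        if chunk is not None:
            return chunk  # skipped frames never extend the deadline


async def demo() -> tuple:
    frames: "asyncio.Queue[str]" = asyncio.Queue()
    for message in ("transcript:hello", "ping", "audio:0001"):
        frames.put_nowait(message)

    first = await recv_audio(frames, timeout=0.5)

    # The queue is now empty, so the next call runs out of time.
    try:
        await recv_audio(frames, timeout=0.05)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return first, timed_out


first_chunk, timed_out = asyncio.run(demo())
```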

Ancestors

Class variables

var capabilities : ClassVar[AdapterCapabilities]

Inherited members

class WebSocketProtocol

Encoder/decoder pair for a custom WebSocket audio protocol.

Expand source code
class WebSocketProtocol(ABC):
    """Encoder/decoder pair for a custom WebSocket audio protocol."""

    @abstractmethod
    def encode_audio(self, audio: bytes) -> Any:
        """Convert PCM16 bytes into the wire representation the server expects."""

    @abstractmethod
    def decode_response(self, message: Any) -> Optional[AudioChunk]:
        """Parse a server message into an AudioChunk, or None if it's not audio."""

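A concrete protocol might look like the sketch below, which wraps audio in JSON with a base64 payload. The wire format, the JsonBase64Protocol name, and the AudioChunk stand-in (only the data attribute is confirmed by the adapter source) are all assumptions; the abstract base is re-declared so that the example runs on its own.

```python
import base64
import json
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class AudioChunk:
    """Stand-in for the library's AudioChunk; only ``data`` is known to exist."""
    data: bytes


class WebSocketProtocol(ABC):
    """Local copy of the interface documented above."""

    @abstractmethod
    def encode_audio(self, audio: bytes) -> Any: ...

    @abstractmethod
    def decode_response(self, message: Any) -> Optional[AudioChunk]: ...


class JsonBase64Protocol(WebSocketProtocol):
    """Hypothetical wire format: {"type": "audio", "data": "<base64>"}."""

    def encode_audio(self, audio: bytes) -> str:
        payload = base64.b64encode(audio).decode("ascii")
        return json.dumps({"type": "audio", "data": payload})

    def decode_response(self, message: Any) -> Optional[AudioChunk]:
        try:
            frame = json.loads(message)
        except (TypeError, ValueError):
            return None  # not JSON, so not an audio frame
        if not isinstance(frame, dict) or frame.get("type") != "audio":
            return None  # transcripts, pings and similar are skipped
        if "data" not in frame:
            return None
        return AudioChunk(data=base64.b64decode(frame["data"]))


protocol = JsonBase64Protocol()
wire = protocol.encode_audio(b"\x01\x02\x03\x04")
decoded = protocol.decode_response(wire)
ignored = protocol.decode_response('{"type": "transcript", "text": "hi"}')
```

Returning None for anything that is not audio matters because WebSocketAgentAdapter.recv_audio keeps reading frames until decode_response yields a chunk.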
Ancestors

  • abc.ABC

Methods

def decode_response(self, message: Any) ‑> AudioChunk | None

Parse a server message into an AudioChunk, or None if it's not audio.

Expand source code
@abstractmethod
def decode_response(self, message: Any) -> Optional[AudioChunk]:
    """Parse a server message into an AudioChunk, or None if it's not audio."""

def encode_audio(self, audio: bytes) ‑> Any

Convert PCM16 bytes into the wire representation the server expects.

Expand source code
@abstractmethod
def encode_audio(self, audio: bytes) -> Any:
    """Convert PCM16 bytes into the wire representation the server expects."""