Module scenario.voice.adapters
Platform-specific voice adapters (Phase 2).
Per the proposal (§7.3): per-platform classes over a unified
VoiceAgent(transport=...). PipecatAgentAdapter means "test my Pipecat agent";
TwilioAgentAdapter means "test via phone call"; each has platform-specific
constructor parameters that don't fit cleanly on a generic class.
"""
Platform-specific voice adapters (Phase 2).
Per the proposal (§7.3): per-platform classes over a unified
``VoiceAgent(transport=...)``. ``PipecatAgentAdapter`` means "test my Pipecat agent";
``TwilioAgentAdapter`` means "test via phone call"; each has platform-specific
constructor parameters that don't fit cleanly on a generic class.
"""
from __future__ import annotations
from ._stub import PendingTransportError
from .composable import ComposableVoiceAgent, ElevenLabsVoiceAgent
from .elevenlabs import ElevenLabsAgentAdapter
from .gemini_live import GeminiLiveAgentAdapter
from .livekit import LiveKitAgentAdapter
from .openai_realtime import OpenAIRealtimeAgentAdapter
from .pipecat import PipecatAgentAdapter
from .twilio import TwilioAgentAdapter
from .vapi import VapiAgentAdapter
from .webrtc import WebRTCAgentAdapter
from .websocket import WebSocketAgentAdapter, WebSocketProtocol
__all__ = [
    "ComposableVoiceAgent",
    "ElevenLabsAgentAdapter",
    "ElevenLabsVoiceAgent",
    "GeminiLiveAgentAdapter",
    "LiveKitAgentAdapter",
    "OpenAIRealtimeAgentAdapter",
    "PendingTransportError",
    "PipecatAgentAdapter",
    "TwilioAgentAdapter",
    "VapiAgentAdapter",
    "WebRTCAgentAdapter",
    "WebSocketAgentAdapter",
    "WebSocketProtocol",
]
Sub-modules
scenario.voice.adapters.composable-
Composable and provider-branded voice agents …
scenario.voice.adapters.elevenlabs-
ElevenLabsAgentAdapter: connect to ElevenLabs Conversational AI via their WebSocket …
scenario.voice.adapters.gemini_live-
GeminiLiveAgentAdapter: direct-to-model adapter for Gemini Live native-audio …
scenario.voice.adapters.livekit-
LiveKitAgentAdapter: join a LiveKit room as a participant and exchange audio …
scenario.voice.adapters.openai_realtime-
OpenAIRealtimeAgentAdapter: direct-to-model adapter — the model IS the agent …
scenario.voice.adapters.pipecat-
PipecatAgentAdapter: WebSocket client to a user-run Pipecat bot …
scenario.voice.adapters.twilio-
TwilioAgentAdapter: bidirectional real phone transport via Twilio Media Streams …
scenario.voice.adapters.vapi-
VapiAgentAdapter: call Vapi's REST API to create a call, then connect to the returned websocketCallUrl …
scenario.voice.adapters.webrtc-
Generic WebRTC adapter …
scenario.voice.adapters.websocket-
Generic WebSocket adapter: bring-your-own protocol …
Classes
class ComposableVoiceAgent (stt: STTProvider, llm: str, tts: str, *, system_prompt: Optional[str] = None)-
Locally-executed STT → LLM → TTS voice agent.
stt transcribes incoming user audio, the result is fed to llm (a litellm model string) along with conversation history, and the response is synthesised via the tts voice string using the existing synthesize() router.
Each seam is independently swappable: change any one without touching the other two. Intermediate results are surfaced on instance attributes so the scenario harness can assert on them.
Attributes
last_user_transcript- Transcript of the most-recent user audio turn.
last_llm_response- Text produced by the LLM for the most-recent turn.
Args
stt- STTProvider implementation for the user's audio.
llm- litellm-style model identifier, e.g. COMPOSABLE_VOICE_LLM_MODEL.
tts- TTS voice string in "provider/voice" format, e.g. "openai/nova" or "elevenlabs/rachel".
system_prompt- Optional system prompt seeded at turn zero so the LLM has guidance before the first user message. Defaults to a generic helpful-assistant prompt.
class ComposableVoiceAgent(VoiceAgentAdapter):
    """
    Locally-executed STT → LLM → TTS voice agent.

    ``stt`` transcribes incoming user audio, the result is fed to ``llm`` (a
    litellm model string) along with conversation history, and the response is
    synthesised via the ``tts`` voice string using the existing
    ``scenario.voice.synthesize`` router.

    Each seam is independently swappable: change any one without touching the
    other two. Intermediate results are surfaced on instance attributes so the
    scenario harness can assert on them.

    Attributes:
        last_user_transcript: Transcript of the most-recent user audio turn.
        last_llm_response: Text produced by the LLM for the most-recent turn.
    """

    capabilities: ClassVar[AdapterCapabilities] = AdapterCapabilities(
        streaming_transcripts=True,
        native_vad=False,
        dtmf=False,
        input_formats=["pcm16/24000"],
        output_formats=["pcm16/24000"],
    )

    DEFAULT_SYSTEM_PROMPT = (
        "You are a helpful voice assistant. Respond naturally and conversationally "
        "as this is an audio conversation — be concise, friendly, and clear."
    )

    def __init__(
        self,
        stt: STTProvider,
        llm: str,
        tts: str,
        *,
        system_prompt: Optional[str] = None,
    ) -> None:
        """
        Args:
            stt: STTProvider implementation for the user's audio.
            llm: litellm-style model identifier, e.g. ``COMPOSABLE_VOICE_LLM_MODEL``.
            tts: TTS voice string in ``"provider/voice"`` format, e.g.
                ``"openai/nova"`` or ``"elevenlabs/rachel"``.
            system_prompt: Optional system prompt seeded at turn zero so the LLM
                has guidance before the first user message. Defaults to a
                generic helpful-assistant prompt.
        """
        super().__init__()
        self.stt = stt
        self.llm = llm
        self.tts = tts
        self.last_user_transcript: Optional[str] = None
        self.last_llm_response: Optional[str] = None
        # Seed history with a system prompt so the first recv_audio call (which
        # can happen before any user audio when the agent speaks first) doesn't
        # send an empty messages array to the LLM.
        self._history: List[dict] = [
            {"role": "system", "content": system_prompt or self.DEFAULT_SYSTEM_PROMPT}
        ]
        # Turn-output guard. ``recv_audio`` synthesises ONE chunk per
        # user turn. The default ``call()`` drains by re-calling
        # ``recv_audio`` until tail-silence; on this adapter that would
        # kick a second LLM call, cancelled later by timeout (wasted
        # credits + latency). The guard makes subsequent ``recv_audio``
        # calls in the same turn return an empty chunk, which the drain
        # loop interprets as end-of-stream.
        #
        # Reset boundary: ``send_audio`` (new user audio → new turn).
        # Set boundary: end of ``recv_audio`` (LLM+TTS completed).
        self._turn_output_emitted: bool = False

    def __repr__(self) -> str:
        return f"ComposableVoiceAgent(llm={self.llm!r}, tts={self.tts!r})"

    # ------------------------------------------------------------------ lifecycle

    async def connect(self) -> None:
        """No-op: no external transport to open."""

    async def disconnect(self) -> None:
        """No-op: nothing to tear down."""

    # ------------------------------------------------------------------ I/O

    async def send_audio(self, chunk: AudioChunk) -> None:
        """Transcribe the chunk via STT and store for the next recv_audio call."""
        transcript = await self.stt.transcribe(chunk)
        self.last_user_transcript = transcript
        self._history.append({"role": "user", "content": transcript})
        # New user turn → next recv_audio is allowed to synthesise.
        self._turn_output_emitted = False

    async def recv_audio(self, timeout: float) -> AudioChunk:
        """
        Run the LLM on the current history, synthesise the response via TTS,
        and return the resulting AudioChunk.

        ``timeout`` is honoured for the combined LLM+TTS call via
        ``asyncio.wait_for``. Subsequent calls in the same turn (the default
        ``call()`` drains until tail-silence) return an empty chunk so the
        drain loop exits without billing a second LLM round-trip; see
        ``_turn_output_emitted`` for the guard contract.
        """
        if self._turn_output_emitted:
            return AudioChunk(data=b"")

        import asyncio

        async def _run() -> AudioChunk:
            import litellm  # type: ignore
            from litellm.types.utils import Choices, ModelResponse
            from typing import cast as _cast

            from ..tts import synthesize

            completion = await litellm.acompletion(
                model=self.llm,
                messages=self._history,
            )
            # Non-streaming acompletion returns ModelResponse with Choices;
            # cast satisfies pyright without runtime isinstance overhead.
            completion = _cast(ModelResponse, completion)
            choice = _cast(Choices, completion.choices[0])
            response_text: str = choice.message.content or ""
            self.last_llm_response = response_text
            self._history.append({"role": "assistant", "content": response_text})
            return await synthesize(response_text, self.tts)

        chunk = await asyncio.wait_for(_run(), timeout=timeout)
        self._turn_output_emitted = True
        return chunk
Ancestors
- VoiceAgentAdapter
- AgentAdapter
- abc.ABC
Subclasses
Class variables
var DEFAULT_SYSTEM_PROMPT
var capabilities : ClassVar[AdapterCapabilities]
Methods
async def connect(self) ‑> None-
No-op — no external transport to open.
async def disconnect(self) ‑> None-
No-op — nothing to tear down.
async def recv_audio(self, timeout: float) ‑> AudioChunk-
Run the LLM on the current history, synthesise the response via TTS, and return the resulting AudioChunk.
timeout is honoured for the combined LLM+TTS call via asyncio.wait_for. Subsequent calls in the same turn (the default call() drains until tail-silence) return an empty chunk so the drain loop exits without billing a second LLM round-trip; see _turn_output_emitted for the guard contract.
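The guard contract described above can be sketched in isolation. This is a minimal illustration, not the adapter itself: `OneShotTurnSource` and `drain` are hypothetical names, raw `bytes` stand in for `AudioChunk`, and the LLM and TTS calls are replaced by a counter so the example runs without any provider.

```python
import asyncio


class OneShotTurnSource:
    """Toy stand-in for an adapter that produces one audio chunk per user turn."""

    def __init__(self) -> None:
        self.expensive_calls = 0           # counts simulated LLM+TTS round-trips
        self._turn_output_emitted = False  # the guard flag

    async def send_audio(self, data: bytes) -> None:
        # New user audio starts a new turn, so the next recv_audio may synthesise.
        self._turn_output_emitted = False

    async def recv_audio(self, timeout: float) -> bytes:
        if self._turn_output_emitted:
            return b""  # empty chunk: the drain loop reads this as end-of-stream

        async def _run() -> bytes:
            self.expensive_calls += 1
            await asyncio.sleep(0)    # placeholder for the LLM and TTS awaits
            return b"\x01\x00" * 4    # fake PCM16 samples

        chunk = await asyncio.wait_for(_run(), timeout=timeout)
        self._turn_output_emitted = True
        return chunk


async def drain(agent: OneShotTurnSource) -> list:
    """Hypothetical drain loop: keep receiving until an empty chunk arrives."""
    chunks = []
    while True:
        chunk = await agent.recv_audio(timeout=1.0)
        if not chunk:
            return chunks
        chunks.append(chunk)


async def demo() -> tuple:
    agent = OneShotTurnSource()
    await agent.send_audio(b"user turn 1")
    first = await drain(agent)
    await agent.send_audio(b"user turn 2")
    second = await drain(agent)
    return len(first) + len(second), agent.expensive_calls
```

Each drain calls `recv_audio` twice, but only the first call in a turn does the expensive work; the second returns the empty chunk that ends the loop.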
async def send_audio(self, chunk: AudioChunk) ‑> None-
Transcribe the chunk via STT and store for the next recv_audio call.
Inherited members
class ElevenLabsAgentAdapter (agent_id: str, api_key: str, *, system_prompt_override: Optional[str] = None, first_message_override: Optional[str] = None)-
ElevenLabs hosted Conversational AI adapter.
Connects to ElevenLabs' hosted endpoint where the STT→LLM→TTS loop runs on their infrastructure. All audio is PCM16 @ 24kHz mono — no conversion needed at either edge.
Not to be confused with ElevenLabsVoiceAgent (in scenario.voice.adapters.composable), which is the typed composable preset that runs locally with separate STT, LLM, and TTS providers. The two complement each other:
- ElevenLabsAgentAdapter (this class): black-box hosted EL ConvAI; you provide an agent_id provisioned in the EL dashboard and EL runs the whole pipeline server-side.
- ElevenLabsVoiceAgent: composes ElevenLabsSTTProvider + any LLM + ElevenLabs TTS on your side; you control the prompts, model choice, and tool calls.
Intermediate transcripts are tracked on last_user_transcript and last_agent_transcript for scenario observability.
Example:
    adapter = ElevenLabsAgentAdapter(agent_id="abc123", api_key="sk-...")
    async with adapter:
        # scenario.run() feeds send_audio / recv_audio
        ...
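The transcript tracking mentioned above can be sketched as a small event handler. This is an illustration under assumptions: `TranscriptTracker` is a hypothetical name, and the event shapes are the ones quoted in the adapter's own source comments rather than checked against the ElevenLabs documentation.

```python
from typing import Any, Dict, Optional


class TranscriptTracker:
    """Hypothetical helper mirroring how transcript events update attributes."""

    def __init__(self) -> None:
        self.last_user_transcript: Optional[str] = None
        self.last_agent_transcript: Optional[str] = None

    def handle(self, event: Dict[str, Any]) -> None:
        etype = event.get("type", "")
        if etype == "user_transcript":
            payload = event.get("user_transcription_event", {})
            self.last_user_transcript = payload.get("user_transcript")
        elif etype == "agent_response":
            payload = event.get("agent_response_event", {})
            self.last_agent_transcript = payload.get("agent_response")
        elif etype == "agent_response_correction":
            # A correction replaces the earlier draft only when it is non-empty.
            payload = event.get("agent_response_correction_event") or {}
            corrected = payload.get("corrected_agent_response")
            if corrected:
                self.last_agent_transcript = corrected


tracker = TranscriptTracker()
tracker.handle({"type": "agent_response",
                "agent_response_event": {"agent_response": "Sure, I can help with"}})
tracker.handle({"type": "agent_response_correction",
                "agent_response_correction_event": {
                    "original_agent_response": "Sure, I can help with",
                    "corrected_agent_response": "Sure, I"}})
```

After the two events, `tracker.last_agent_transcript` holds the corrected text, which is what a scenario assertion would see after an interruption.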
class ElevenLabsAgentAdapter(VoiceAgentAdapter):
    """
    ElevenLabs **hosted** Conversational AI adapter.

    Connects to ElevenLabs' hosted endpoint where the STT→LLM→TTS loop runs on
    their infrastructure. All audio is PCM16 @ 24kHz mono; no conversion needed
    at either edge.

    Not to be confused with :class:`ElevenLabsVoiceAgent` (in
    ``scenario.voice.adapters.composable``), which is the typed composable
    preset that runs locally with separate STT, LLM, and TTS providers. The two
    complement each other:

    - ``ElevenLabsAgentAdapter`` (this class): black-box hosted EL ConvAI; you
      provide an ``agent_id`` provisioned in the EL dashboard and EL runs the
      whole pipeline server-side.
    - :class:`ElevenLabsVoiceAgent`: composes ``ElevenLabsSTTProvider`` + any
      LLM + ElevenLabs TTS on your side; you control the prompts, model choice,
      and tool calls.

    Intermediate transcripts are tracked on ``last_user_transcript`` and
    ``last_agent_transcript`` for scenario observability.

    Example::

        adapter = ElevenLabsAgentAdapter(agent_id="abc123", api_key="sk-...")
        async with adapter:
            # scenario.run() feeds send_audio / recv_audio
            ...
    """

    capabilities: ClassVar[AdapterCapabilities] = AdapterCapabilities(
        streaming_transcripts=True,
        native_vad=True,
        dtmf=False,
        input_formats=["pcm16/24000"],
        output_formats=["pcm16/24000"],
    )

    def __init__(
        self,
        agent_id: str,
        api_key: str,
        *,
        system_prompt_override: Optional[str] = None,
        first_message_override: Optional[str] = None,
    ) -> None:
        super().__init__()
        self.agent_id = agent_id
        self.api_key = api_key
        # Per-session overrides applied via conversation_initiation_client_data
        # at the start of every WS connect. Used by demos that need a
        # different prompt shape (e.g. verbose for interrupt demos) without
        # mutating the shared test agent's persistent config.
        self._system_prompt_override = system_prompt_override
        self._first_message_override = first_message_override
        self._ws: Any = None
        # Transcript observability: updated on each transcript event.
        self.last_user_transcript: Optional[str] = None
        self.last_agent_transcript: Optional[str] = None

    @property
    def url(self) -> str:
        return CONVAI_URL_TEMPLATE.format(agent_id=self.agent_id)

    def __repr__(self) -> str:
        # redact credentials
        return f"ElevenLabsAgentAdapter(agent_id={self.agent_id!r}, api_key='***')"

    # ------------------------------------------------------------------ lifecycle

    async def connect(self) -> None:
        """Open the WebSocket to ElevenLabs' ConvAI endpoint.

        We send ``conversation_initiation_client_data`` on every connect.
        The EL docs neither require nor forbid this (the reference SDK
        sample sends it unconditionally with an empty body); empirically
        we've seen ``first_message`` skipped on bare connects and reliably
        fire when the init message is sent, even with an empty override
        block. If EL's behavior changes, this is the first thing to revisit.
        """
        import websockets

        self._ws = await websockets.connect(
            self.url,
            additional_headers={"xi-api-key": self.api_key},
        )
        logger.debug("ElevenLabsAgentAdapter: connected to %s", self.url)

        agent_override: dict[str, Any] = {}
        if self._system_prompt_override:
            agent_override["prompt"] = {"prompt": self._system_prompt_override}
        if self._first_message_override:
            agent_override["first_message"] = self._first_message_override
        init = {
            "type": "conversation_initiation_client_data",
            "conversation_config_override": {"agent": agent_override},
        }
        await self._ws.send(json.dumps(init))
        logger.debug(
            "ElevenLabsAgentAdapter: sent conversation_initiation_client_data with overrides=%s",
            list(agent_override.keys()) or "none",
        )

    async def disconnect(self) -> None:
        """Close the WebSocket if open."""
        if self._ws is not None:
            try:
                await self._ws.close()
            except Exception:
                # Best-effort: connection may already be half-closed or
                # in an error state when disconnect() is called. We're
                # tearing down regardless; propagating here would just
                # leak the WS reference.
                pass
            finally:
                self._ws = None
        logger.debug("ElevenLabsAgentAdapter: disconnected")

    # ------------------------------------------------------------------ I/O

    async def send_audio(self, chunk: AudioChunk) -> None:
        """Send a PCM16 audio chunk encoded as base64 in a JSON message.

        Empirically, EL ConvAI stops responding to subsequent turns if the
        client sends only a single chunk and never signals end of turn.
        The EL docs document no client-driven end-of-turn signal
        (server-side VAD is supposed to handle it) but in practice the
        VAD only fires after enough silence has been observed. We append
        a fixed-size tail of zero-bytes after every chunk to provide that
        silence signal.

        Tail size: 16000 zero bytes, empirically the sweet spot.
          - Removing the tail entirely: EL stops responding to user turns
            after the greeting.
          - Increasing to 24000 bytes (a "true 500ms" at the provisioned
            pcm_24000 rate): EL stops responding mid-conversation, same
            stall pattern.
          - 16000 bytes at pcm_24000 = ~333ms of silence: reliable.

        If EL ever exposes an explicit end-of-turn message we should
        switch to that instead.
        """
        if self._ws is None:
            raise RuntimeError("ElevenLabsAgentAdapter: not connected")
        # 1. Speech.
        b64 = base64.b64encode(chunk.data).decode()
        await self._ws.send(json.dumps({"user_audio_chunk": b64}))
        # 2. Silence tail. See docstring for size rationale.
        silence = b"\x00" * 16000
        silence_b64 = base64.b64encode(silence).decode()
        await self._ws.send(json.dumps({"user_audio_chunk": silence_b64}))

    async def recv_audio(self, timeout: float) -> AudioChunk:
        """
        Receive the next audio chunk from ElevenLabs.

        ``timeout`` bounds **inter-message silence** (the maximum gap between
        any two received frames), NOT the total duration of the call. Every
        received frame (**keep-alive pings included**) resets the idle
        deadline, so this returns when an ``audio`` event arrives and raises
        :class:`asyncio.TimeoutError` only after ``timeout`` seconds elapse
        with **no message of any kind**. Pings are replied to inline;
        transcript events update instance attributes for observability; all
        other event types are swallowed without error.

        Design decision (issue #493; intentional, not an oversight): because a
        received ping is treated as proof of liveness, a hosted agent that
        keeps pinging but never sends audio (e.g. a wedged tool/RAG call) will
        make this method wait **indefinitely**. There is deliberately **no
        total-duration ceiling** here: a legitimate 30s+ silent-but-pinging
        stretch must not abort the turn, which a cumulative budget would do.
        The caller's ``response_max_duration`` is checked *between*
        ``recv_audio`` calls and does **not** bound a single in-progress recv.
        (An absolute caller-side backstop for the wedged-agent case is tracked
        as a separate follow-up; it is intentionally not implemented here.)
        """
        if self._ws is None:
            raise RuntimeError("ElevenLabsAgentAdapter: not connected")

        deadline = asyncio.get_running_loop().time() + timeout
        while True:
            remaining = deadline - asyncio.get_running_loop().time()
            if remaining <= 0:
                raise asyncio.TimeoutError("ElevenLabsAgentAdapter: recv_audio timed out")
            raw = await asyncio.wait_for(self._ws.recv(), timeout=remaining)
            # A received message (ping included) proves the socket is alive, so
            # re-arm the idle deadline. Placed BEFORE json.loads so ANY frame,
            # even a non-JSON/malformed one, counts as a liveness signal.
            deadline = asyncio.get_running_loop().time() + timeout
            try:
                event = json.loads(raw) if isinstance(raw, str) else json.loads(raw.decode())
            except Exception:
                logger.debug("ElevenLabsAgentAdapter: non-JSON message, skipping")
                continue

            etype = event.get("type", "")
            logger.debug("ElevenLabsAgentAdapter: recv event %s", etype)

            if etype == "audio":
                audio_event = event.get("audio_event", {})
                b64 = audio_event.get("audio_base_64", "")
                pcm = base64.b64decode(b64)
                # Ensure even byte count (PCM16 invariant).
                if len(pcm) % 2 == 1:
                    pcm = pcm[:-1]
                return AudioChunk(data=pcm)
            elif etype == "ping":
                # Per EL docs, ping wire shape is
                #   {"type": "ping", "ping_event": {"event_id": <int>, "ping_ms": <int>}}
                # Pong must echo the event_id at the top level. The
                # fallback to top-level event_id covers any older shape.
                ping_event = event.get("ping_event") or {}
                event_id = ping_event.get("event_id")
                if event_id is None:
                    event_id = event.get("event_id")
                if event_id is None:
                    logger.debug("ElevenLabsAgentAdapter: ping with no event_id, skipping pong: %r", event)
                    continue
                pong = json.dumps({"type": "pong", "event_id": event_id})
                await self._ws.send(pong)
            elif etype == "user_transcript":
                self.last_user_transcript = event.get("user_transcription_event", {}).get("user_transcript")
            elif etype == "agent_response":
                self.last_agent_transcript = event.get("agent_response_event", {}).get("agent_response")
            elif etype == "agent_response_correction":
                # EL signals a corrected agent reply (post server-side
                # barge-in detection). The corrected text replaces the
                # last_agent_transcript so consumers see what the agent
                # ACTUALLY said after our interrupt landed, not the
                # pre-correction draft.
                #
                # Wire shape:
                #   {"type": "agent_response_correction",
                #    "agent_response_correction_event": {
                #        "original_agent_response": "...",
                #        "corrected_agent_response": "..."}}
                correction = event.get("agent_response_correction_event", {}) or {}
                corrected = correction.get("corrected_agent_response")
                if corrected:
                    self.last_agent_transcript = corrected
            elif etype == "conversation_initiation_metadata":
                # EL reports the agent's actual configured audio formats
                # here. Our adapter capabilities advertise pcm16/24000,
                # matching the test agent we provision. If a caller
                # points the adapter at an agent configured differently,
                # this is where the mismatch becomes visible; warn so
                # the codec mismatch is logged rather than silently
                # garbling audio.
                #
                # Wire shape (per docs):
                #   {"type": "conversation_initiation_metadata",
                #    "conversation_initiation_metadata_event": {
                #        "conversation_id": "...",
                #        "agent_output_audio_format": "pcm_24000",
                #        "user_input_audio_format": "pcm_24000"}}
                meta = event.get("conversation_initiation_metadata_event", {}) or {}
                out_fmt = meta.get("agent_output_audio_format")
                in_fmt = meta.get("user_input_audio_format")
                if out_fmt and out_fmt != "pcm_24000":
                    logger.warning(
                        "ElevenLabsAgentAdapter: agent_output_audio_format=%r "
                        "differs from advertised pcm16/24000 capability; "
                        "audio may pitch-shift or fail to decode.",
                        out_fmt,
                    )
                if in_fmt and in_fmt != "pcm_24000":
                    logger.warning(
                        "ElevenLabsAgentAdapter: user_input_audio_format=%r "
                        "differs from advertised pcm16/24000 capability; "
                        "the agent may not understand audio we send.",
                        in_fmt,
                    )
            elif etype == "interruption":
                pass  # documented non-audio event, no action needed
            else:
                logger.debug("ElevenLabsAgentAdapter: unknown event type %r, skipping", etype)
Ancestors
- VoiceAgentAdapter
- AgentAdapter
- abc.ABC
Class variables
var capabilities : ClassVar[AdapterCapabilities]
Instance variables
var url : str-
Methods
async def connect(self) ‑> None-
Open the WebSocket to ElevenLabs' ConvAI endpoint.
We send conversation_initiation_client_data on every connect. The EL docs neither require nor forbid this (the reference SDK sample sends it unconditionally with an empty body); empirically we've seen first_message skipped on bare connects and reliably fire when the init message is sent, even with an empty override block. If EL's behavior changes, this is the first thing to revisit.
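The init frame described above can be built without opening a socket. This is a minimal sketch: `build_init_message` is a hypothetical helper name, and the field names are copied from the adapter's own source rather than verified against the ElevenLabs documentation.

```python
import json
from typing import Any, Dict, Optional


def build_init_message(
    system_prompt_override: Optional[str] = None,
    first_message_override: Optional[str] = None,
) -> str:
    """Serialise the init frame; an empty override block is still sent."""
    agent_override: Dict[str, Any] = {}
    if system_prompt_override:
        agent_override["prompt"] = {"prompt": system_prompt_override}
    if first_message_override:
        agent_override["first_message"] = first_message_override
    return json.dumps(
        {
            "type": "conversation_initiation_client_data",
            "conversation_config_override": {"agent": agent_override},
        }
    )


# With no overrides the frame still carries an empty "agent" block.
bare = json.loads(build_init_message())
custom = json.loads(build_init_message(first_message_override="Hi there!"))
```

Keeping the builder separate from the connection makes the "always send, even when empty" behaviour easy to assert on in a unit test.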
async def disconnect(self) ‑> None-
Close the WebSocket if open.
async def recv_audio(self, timeout: float) ‑> AudioChunk-
Receive the next audio chunk from ElevenLabs.
timeout bounds inter-message silence (the maximum gap between any two received frames), NOT the total duration of the call. Every received frame (keep-alive pings included) resets the idle deadline, so this returns when an audio event arrives and raises asyncio.TimeoutError only after timeout seconds elapse with no message of any kind. Pings are replied to inline; transcript events update instance attributes for observability; all other event types are swallowed without error.
Design decision (issue #493; intentional, not an oversight): because a received ping is treated as proof of liveness, a hosted agent that keeps pinging but never sends audio (e.g. a wedged tool/RAG call) will make this method wait indefinitely. There is deliberately no total-duration ceiling here: a legitimate 30s+ silent-but-pinging stretch must not abort the turn, which a cumulative budget would do. The caller's response_max_duration is checked between recv_audio calls and does not bound a single in-progress recv. (An absolute caller-side backstop for the wedged-agent case is tracked as a separate follow-up; it is intentionally not implemented here.)
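The idle-deadline behaviour can be sketched with an `asyncio.Queue` standing in for the WebSocket. The names `recv_audio_sketch`, `inbox`, and `outbox` are hypothetical, the function returns raw `bytes` instead of an `AudioChunk`, and the extra `isinstance` check is an addition for the toy frames used here.

```python
import asyncio
import base64
import json


async def recv_audio_sketch(inbox: asyncio.Queue, outbox: list, timeout: float) -> bytes:
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        remaining = deadline - loop.time()
        if remaining <= 0:
            raise asyncio.TimeoutError("recv_audio timed out")
        raw = await asyncio.wait_for(inbox.get(), timeout=remaining)
        # Any frame, even a malformed one, re-arms the idle deadline.
        deadline = loop.time() + timeout
        try:
            event = json.loads(raw)
        except ValueError:
            continue  # non-JSON frame: counted as liveness, otherwise ignored
        if not isinstance(event, dict):
            continue
        etype = event.get("type", "")
        if etype == "audio":
            pcm = base64.b64decode(event.get("audio_event", {}).get("audio_base_64", ""))
            # PCM16 samples are two bytes each, so drop a trailing odd byte.
            return pcm[:-1] if len(pcm) % 2 else pcm
        if etype == "ping":
            event_id = (event.get("ping_event") or {}).get("event_id")
            if event_id is not None:
                outbox.append(json.dumps({"type": "pong", "event_id": event_id}))


async def demo():
    inbox: asyncio.Queue = asyncio.Queue()
    outbox: list = []
    audio = base64.b64encode(b"\x01\x02\x03").decode()
    for frame in (
        "not json",
        json.dumps({"type": "ping", "ping_event": {"event_id": 7, "ping_ms": 10}}),
        json.dumps({"type": "audio", "audio_event": {"audio_base_64": audio}}),
    ):
        inbox.put_nowait(frame)
    pcm = await recv_audio_sketch(inbox, outbox, timeout=0.5)
    try:
        # The queue is now empty, so the idle timeout is the only way out.
        await recv_audio_sketch(inbox, outbox, timeout=0.05)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return pcm, outbox, timed_out
```

The first call skips the malformed frame, answers the ping, and returns the audio with its odd trailing byte trimmed; the second call sees no frames at all and times out.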
async def send_audio(self, chunk: AudioChunk) ‑> None-
Send a PCM16 audio chunk encoded as base64 in a JSON message.
Empirically, EL ConvAI stops responding to subsequent turns if the client sends only a single chunk and never signals end of turn. The EL docs document no client-driven end-of-turn signal (server-side VAD is supposed to handle it) but in practice the VAD only fires after enough silence has been observed. We append a fixed-size tail of zero-bytes after every chunk to provide that silence signal.
Tail size: 16000 zero bytes, empirically the sweet spot.
- Removing the tail entirely: EL stops responding to user turns after the greeting.
- Increasing to 24000 bytes (a "true 500ms" at the provisioned pcm_24000 rate): EL stops responding mid-conversation, same stall pattern.
- 16000 bytes at pcm_24000 = ~333ms of silence: reliable.
If EL ever exposes an explicit end-of-turn message we should switch to that instead.
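The tail-size arithmetic above can be checked with a short sketch. It assumes mono PCM16 (2 bytes per sample) at the 24000 Hz rate named in the docstring; the helper names `silence_ms` and `silence_tail` are hypothetical and not part of the adapter.

```python
BYTES_PER_SAMPLE = 2  # PCM16, mono (assumption for this sketch)


def silence_ms(num_bytes: int, sample_rate: int = 24000) -> float:
    """Duration in milliseconds of num_bytes of mono PCM16 at sample_rate."""
    return num_bytes / (sample_rate * BYTES_PER_SAMPLE) * 1000


def silence_tail(num_bytes: int = 16000) -> bytes:
    """Zero-filled tail; the length is rounded down to an even byte count
    so that no 16-bit sample is split."""
    return b"\x00" * (num_bytes - num_bytes % 2)


print(round(silence_ms(16000)))  # 333
print(round(silence_ms(24000)))  # 500
```

At 24000 Hz, PCM16 consumes 48000 bytes per second, which is why 16000 bytes corresponds to roughly a third of a second rather than half a second.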
async def send_audio(self, chunk: AudioChunk) -> None:
    """Send a PCM16 audio chunk encoded as base64 in a JSON message.

    Empirically, EL ConvAI stops responding to subsequent turns if the
    client sends only a single chunk and never signals end of turn. The
    EL docs document no client-driven end-of-turn signal (server-side VAD
    is supposed to handle it) but in practice the VAD only fires after
    enough silence has been observed. We append a fixed-size tail of
    zero-bytes after every chunk to provide that silence signal.

    Tail size: 16000 zero bytes, empirically the sweet spot.
    - Removing the tail entirely: EL stops responding to user turns
      after the greeting.
    - Increasing to 24000 bytes (a "true 500ms" at the provisioned
      pcm_24000 rate): EL stops responding mid-conversation, same
      stall pattern.
    - 16000 bytes at pcm_24000 = ~333ms of silence: reliable.

    If EL ever exposes an explicit end-of-turn message we should
    switch to that instead.
    """
    if self._ws is None:
        raise RuntimeError("ElevenLabsAgentAdapter: not connected")
    # 1. Speech.
    b64 = base64.b64encode(chunk.data).decode()
    await self._ws.send(json.dumps({"user_audio_chunk": b64}))
    # 2. Silence tail. See docstring for size rationale.
    silence = b"\x00" * 16000
    silence_b64 = base64.b64encode(silence).decode()
    await self._ws.send(json.dumps({"user_audio_chunk": silence_b64}))
Inherited members
class ElevenLabsVoiceAgent (api_key: str, *, llm: str = 'openai/gpt-5.4-mini', voice: Optional[str] = None, stt: Optional[STTProvider] = None, system_prompt: Optional[str] = None)-
Composable voice agent with ElevenLabs-opinionated defaults.
Not to be confused with ElevenLabsAgentAdapter (in scenario.voice.adapters.elevenlabs). That one talks to ElevenLabs' hosted Conversational AI endpoint, where EL runs the full STT→LLM→TTS loop. This class is local: you compose ElevenLabsSTTProvider + any LLM + ElevenLabs TTS yourself, keeping full control over prompts, model choice, and tool calls.

Instantiate with just an api_key to get an ElevenLabs STT + LLM (default COMPOSABLE_VOICE_LLM_MODEL) + ElevenLabs TTS stack (default voice "Sarah"; see voice under Args). Each piece can be overridden independently without changing the others.

Example::

    # Defaults: ElevenLabs STT, the default LLM (COMPOSABLE_VOICE_LLM_MODEL), ElevenLabs TTS
    agent = ElevenLabsVoiceAgent(api_key="sk-...")
    # Override just the LLM
    agent = ElevenLabsVoiceAgent(api_key="sk-...", llm="openai/gpt-4o")
    # Bring your own STT
    agent = ElevenLabsVoiceAgent(api_key="sk-...", stt=MyCustomSTT())

Args
api_key- ElevenLabs API key. Redacted in __repr__.
llm- litellm-style model identifier. Defaults to COMPOSABLE_VOICE_LLM_MODEL.
voice- TTS voice string in "elevenlabs/<voice_id>" format. Defaults to the ELEVENLABS_VOICE_ID environment variable when set, otherwise falls back to "Sarah" ("elevenlabs/EXAVITQu4vr4xnSDxMaL"), which is premade and accessible on the ElevenLabs free tier as of 2026-05. Other premade voices (e.g. "Rachel", 21m00Tcm4TlvDq8ikWAM) returned 402 paid_plan_required from the EL TTS API; gating differs per voice. Set ELEVENLABS_VOICE_ID to override.
stt- STTProvider override. Defaults to ElevenLabsSTTProvider(api_key=api_key).
system_prompt- Optional system prompt. Defaults to ComposableVoiceAgent.DEFAULT_SYSTEM_PROMPT.
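The precedence described for voice (explicit argument, then the ELEVENLABS_VOICE_ID environment variable, then the "Sarah" fallback) can be sketched as a standalone function. `resolve_voice` and its `env` parameter are hypothetical names introduced only for illustration; the class performs this resolution inside its constructor.

```python
import os
from typing import Mapping, Optional

# Fallback voice quoted in the Args above ("Sarah").
DEFAULT_VOICE = "elevenlabs/EXAVITQu4vr4xnSDxMaL"


def resolve_voice(
    voice: Optional[str] = None,
    env: Optional[Mapping[str, str]] = None,
) -> str:
    """Hypothetical helper mirroring the documented precedence."""
    if voice is not None:
        return voice  # an explicit argument always wins
    env = os.environ if env is None else env
    env_voice_id = env.get("ELEVENLABS_VOICE_ID")
    # The environment variable holds a bare voice ID, so the
    # "elevenlabs/" prefix is added here.
    return f"elevenlabs/{env_voice_id}" if env_voice_id else DEFAULT_VOICE
```

Passing `env` explicitly keeps the sketch testable without mutating the process environment.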
class ElevenLabsVoiceAgent(ComposableVoiceAgent):
    """
    Composable voice agent with ElevenLabs-opinionated defaults.

    Not to be confused with :class:`ElevenLabsAgentAdapter` (in
    ``scenario.voice.adapters.elevenlabs``). That one talks to ElevenLabs'
    **hosted** Conversational AI endpoint where EL runs the full
    STT→LLM→TTS loop. This class is local: you compose
    ``ElevenLabsSTTProvider`` + any LLM + ElevenLabs TTS yourself, keeping
    full control over prompts, model choice, and tool calls.

    Instantiate with just an ``api_key`` to get an ElevenLabs STT + LLM
    (default ``COMPOSABLE_VOICE_LLM_MODEL``) + ElevenLabs TTS stack
    (default voice "Sarah"; see ``voice`` in ``__init__``). Each piece can
    be overridden independently without changing the others.

    Example::

        # Defaults: ElevenLabs STT, the default LLM, ElevenLabs TTS
        agent = ElevenLabsVoiceAgent(api_key="sk-...")

        # Override just the LLM
        agent = ElevenLabsVoiceAgent(api_key="sk-...", llm="openai/gpt-4o")

        # Bring your own STT
        agent = ElevenLabsVoiceAgent(api_key="sk-...", stt=MyCustomSTT())
    """

    def __init__(
        self,
        api_key: str,
        *,
        llm: str = COMPOSABLE_VOICE_LLM_MODEL,
        voice: Optional[str] = None,
        stt: Optional[STTProvider] = None,
        system_prompt: Optional[str] = None,
    ) -> None:
        """
        Args:
            api_key: ElevenLabs API key. Redacted in ``__repr__``.
            llm: litellm-style model identifier. Defaults to
                ``COMPOSABLE_VOICE_LLM_MODEL``.
            voice: TTS voice string in ``"elevenlabs/<voice_id>"`` format.
                Defaults to the ``ELEVENLABS_VOICE_ID`` environment variable
                when set, otherwise falls back to "Sarah"
                (``"elevenlabs/EXAVITQu4vr4xnSDxMaL"``), which is premade and
                accessible on the ElevenLabs free tier as of 2026-05. Other
                premade voices (e.g. "Rachel", ``21m00Tcm4TlvDq8ikWAM``)
                returned 402 paid_plan_required from the EL TTS API; gating
                differs per voice. Set ``ELEVENLABS_VOICE_ID`` to override.
            stt: STTProvider override. Defaults to
                ``ElevenLabsSTTProvider(api_key=api_key)``.
            system_prompt: Optional system prompt. Defaults to
                ``ComposableVoiceAgent.DEFAULT_SYSTEM_PROMPT``.
        """
        import os

        if voice is None:
            env_voice_id = os.environ.get("ELEVENLABS_VOICE_ID")
            voice = (
                f"elevenlabs/{env_voice_id}"
                if env_voice_id
                else "elevenlabs/EXAVITQu4vr4xnSDxMaL"  # "Sarah", free-tier premade
            )
        resolved_stt = stt if stt is not None else ElevenLabsSTTProvider(api_key=api_key)
        super().__init__(stt=resolved_stt, llm=llm, tts=voice, system_prompt=system_prompt)
        self._api_key = api_key
        self.voice = voice

    def __repr__(self) -> str:
        # redact credentials
        return (
            f"ElevenLabsVoiceAgent("
            f"api_key='***', llm={self.llm!r}, voice={self.voice!r})"
        )
Ancestors
Inherited members
class GeminiLiveAgentAdapter (model: str = 'gemini-2.5-flash-native-audio-latest', voice: str = 'Algieba', system_instruction: str = '', api_key: Optional[str] = None)-
Gemini Live native-audio adapter.
Connects directly to the Gemini Live API via the official google-genai SDK. STT, LLM, and TTS all run on Google's infrastructure; audio flows bidirectionally as raw PCM16.

Example::

    adapter = GeminiLiveAgentAdapter(
        model=GEMINI_LIVE_MODEL,
        system_instruction="You are a helpful assistant.",
    )
    async with adapter:
        # scenario.run() feeds send_audio / recv_audio
        ...

Attributes
last_agent_transcript- Most-recent output transcript received from the server (if transcription is available), for observability.
class GeminiLiveAgentAdapter(VoiceAgentAdapter): """ Gemini Live native-audio adapter. Connects directly to the Gemini Live API via the official ``google-genai`` SDK. STT, LLM, and TTS all run on Google's infrastructure; audio flows bidirectionally as raw PCM16. Example:: adapter = GeminiLiveAgentAdapter( model=GEMINI_LIVE_MODEL, system_instruction="You are a helpful assistant.", ) async with adapter: # scenario.run() feeds send_audio / recv_audio ... Attributes: last_agent_transcript: Most-recent output transcript received from the server (if transcription is available), for observability. """ capabilities: ClassVar[AdapterCapabilities] = AdapterCapabilities( streaming_transcripts=True, native_vad=True, dtmf=False, # ``interruption=True``: with explicit Activity markers and # ``activity_handling=START_OF_ACTIVITY_INTERRUPTS`` (see # ``connect``), the next ``activity_start`` we send while the # model is replying causes Gemini to cut its in-flight audio. # ``interrupt()`` itself just drains stale chunks out of the # local queue so the recovery agent turn doesn't replay them. interruption=True, input_formats=["pcm16/16000"], output_formats=["pcm16/24000"], ) def __init__( self, model: str = GEMINI_LIVE_MODEL, voice: str = "Algieba", system_instruction: str = "", api_key: Optional[str] = None, ) -> None: super().__init__() self.model = model self.voice = voice self.system_instruction = system_instruction # Resolve key: explicit arg > env var. self._api_key: str = api_key or os.environ.get("GEMINI_API_KEY", "") # Populated when the background session task is live. self._session: Optional[Any] = None self._session_task: Optional[asyncio.Task[None]] = None self._session_ready: Optional[asyncio.Event] = None self._shutdown: Optional[asyncio.Event] = None self._session_error: Optional[BaseException] = None # Cached async iterator on ``session.receive()``. Acquired lazily # on the first ``recv_audio`` call so we can iterate the same # stream across consecutive agent turns. 
Without caching, each # ``recv_audio`` would call ``session.receive()`` afresh — which # the SDK does not support cleanly across turns. self._recv_iter: Optional[Any] = None # Tracks whether any audio was received on the CURRENT iterator # (reset whenever ``_recv_iter`` is recreated). Used by # ``recv_audio`` to distinguish a spurious empty-interrupt turn # (no audio at all) from a real mid-reply interrupt (audio # arrived before the interrupt landed). self._iter_had_audio: bool = False # Observability. self.last_agent_transcript: Optional[str] = None def __repr__(self) -> str: # Never leak the API key. masked = "***" if self._api_key else "" return ( f"GeminiLiveAgentAdapter(" f"model={self.model!r}, " f"voice={self.voice!r}, " f"api_key={masked!r})" ) # ------------------------------------------------------------------ lifecycle async def connect(self) -> None: """Open a Gemini Live session. Spawns a background task that holds the ``async with`` SDK context open for the adapter's lifetime. Returns once the session handshake is complete and audio can flow. """ from google import genai # type: ignore[attr-defined] # noqa: PLC0415 — lazy import from google.genai import types # noqa: PLC0415 self._session_ready = asyncio.Event() self._shutdown = asyncio.Event() loop = asyncio.get_running_loop() session_future: asyncio.Future[Any] = loop.create_future() config = types.LiveConnectConfig( response_modalities=[types.Modality.AUDIO], system_instruction=self.system_instruction or None, speech_config=types.SpeechConfig( voice_config=types.VoiceConfig( prebuilt_voice_config=types.PrebuiltVoiceConfig( voice_name=self.voice, ) ) ), # Disable Automatic Activity Detection. AAD requires a clean # trailing silence to fire its end-of-speech detector, which is # unreliable across the audio shapes scenario produces (TTS'd # user-sim audio, scripted clips, layered interruptions). 
With # AAD off we drive turn boundaries explicitly via # ``activity_start`` / ``activity_end`` in ``send_audio``; # Gemini replies the moment we close the turn instead of # waiting on its own VAD heuristic. Activity handling is left # at its default (START_OF_ACTIVITY_INTERRUPTS): when we send # a new ``activity_start`` while Gemini is mid-reply, the # model treats it as a barge-in and cuts its in-flight audio. # # Subtlety: even after ``generation_complete`` on turn N, the # next ``activity_start`` opening turn N+1 is still treated as # a barge-in on the just-completed turn. The server emits a # spurious ``interrupted → turn_complete`` pair (with no model # output) BEFORE actually producing turn N+1's reply. The # ``recv_audio`` loop transparently skips that empty pair and # re-enters ``session.receive()`` to read the real reply. realtime_input_config=types.RealtimeInputConfig( automatic_activity_detection=types.AutomaticActivityDetection( disabled=True, ), ), # Enable transcripts so the recv loop can populate # last_agent_transcript / chunk.transcript. Without these, # audio still flows but consumers (judge, manifest) get # no readable text. input_audio_transcription=types.AudioTranscriptionConfig(), output_audio_transcription=types.AudioTranscriptionConfig(), ) client = genai.Client(api_key=self._api_key) async def _session_lifetime() -> None: """Hold the SDK context manager open; expose session via future.""" try: async with client.aio.live.connect( model=self.model, config=config ) as session: if not session_future.done(): session_future.set_result(session) assert self._session_ready is not None self._session_ready.set() # Stay alive until disconnect() fires the shutdown event. 
assert self._shutdown is not None await self._shutdown.wait() except Exception as exc: self._session_error = exc if not session_future.done(): session_future.set_exception(exc) assert self._session_ready is not None self._session_ready.set() # unblock connect() even on error self._session_task = asyncio.create_task(_session_lifetime()) # Wait until the session is ready (or errored). assert self._session_ready is not None await self._session_ready.wait() if self._session_error is not None: raise self._session_error self._session = await session_future self._recv_iter = None logger.debug("GeminiLiveAgentAdapter: connected model=%s", self.model) async def disconnect(self) -> None: """Close the Gemini Live session.""" if self._recv_iter is not None: try: await self._recv_iter.aclose() # type: ignore[attr-defined] except Exception: # Best-effort teardown: the iterator may already be # closed or in an invalid state during shutdown. Any # exception here is non-actionable since we're tearing # down anyway. pass self._recv_iter = None if self._shutdown is not None: self._shutdown.set() if self._session_task is not None: try: await asyncio.wait_for(self._session_task, timeout=5.0) except (asyncio.TimeoutError, Exception): # Timeout: task didn't finish in 5s — proceed with # teardown anyway, can't block disconnect indefinitely. # Other Exception: task error during shutdown is # non-actionable; we're discarding the session. pass self._session = None self._session_task = None self._session_ready = None self._shutdown = None self._session_error = None logger.debug("GeminiLiveAgentAdapter: disconnected") # ------------------------------------------------------------------ I/O async def send_audio(self, chunk: AudioChunk) -> None: """Send a canonical 24kHz AudioChunk to Gemini Live as a complete turn. 
Resamples from 24kHz → 16kHz at the wire boundary so the adapter speaks Gemini's expected ``audio/pcm;rate=16000`` format while the rest of the framework stays at the canonical 24kHz. Wraps the audio in explicit ``activity_start`` / ``activity_end`` markers because we connect with Automatic Activity Detection disabled (see ``connect``). Each ``send_audio`` call is therefore a complete user turn from Gemini's perspective: it triggers the model to reply immediately on ``activity_end`` instead of waiting on its own VAD heuristic to detect end-of-speech. This is critical for the interrupt path — when the user barges in, we send a fresh turn boundary on top of the agent's in-flight reply, which Gemini treats as a deterministic interruption signal. """ if self._session is None: raise RuntimeError("GeminiLiveAgentAdapter: not connected") from google.genai import types # noqa: PLC0415 pcm_16k = _resample_pcm16(chunk.data, CANONICAL_RATE, GEMINI_INPUT_RATE) if not pcm_16k: return # New user turn → reset transcript and the per-turn receive # iterator so the next ``recv_audio`` enters # ``session.receive()`` fresh for this turn. self._reset_turn_transcript() if self._recv_iter is not None: try: await self._recv_iter.aclose() # type: ignore[attr-defined] except Exception: # Best-effort: prior turn's receive iterator may already be # closed or in an error state. We're resetting to start a new # turn — propagating here would block legitimate new turns. pass self._recv_iter = None await self._session.send_realtime_input(activity_start=types.ActivityStart()) blob = types.Blob( data=pcm_16k, mime_type="audio/pcm;rate=16000", ) await self._session.send_realtime_input(audio=blob) await self._session.send_realtime_input(activity_end=types.ActivityEnd()) async def recv_audio(self, timeout: float) -> AudioChunk: """Receive the next audio fragment from Gemini Live for the current turn. 
The SDK's ``session.receive()`` async generator yields messages for ONE model turn then stops at ``turn_complete``. We cache the per-turn iterator on ``self._recv_iter`` and reset it when the previous turn ended (StopAsyncIteration), so each user turn sent via ``send_audio`` can read its full reply across multiple ``recv_audio`` calls without us re-entering ``session.receive()`` mid-turn (which would skip messages already buffered server-side). Returns the next non-empty audio chunk as soon as it arrives so the executor's ``_drain_agent_response`` can set ``_agent_speaking_event`` early — the interruption path depends on this. On ``turn_complete`` returns an empty AudioChunk so the drain loop's tail-silence path exits. Raises ``asyncio.TimeoutError`` if no chunk arrives within ``timeout`` seconds. """ if self._session is None: raise RuntimeError("GeminiLiveAgentAdapter: not connected") if self._recv_iter is None: self._recv_iter = self._session.receive().__aiter__() # type: ignore[union-attr] self._iter_had_audio = False async def _next_chunk() -> AudioChunk: pending_delta = "" # Local-to-call: detects the spurious empty-interrupt turn # pattern (server emits ``interrupted=True`` then # ``turn_complete=True`` with no audio at all when a fresh # ``activity_start`` arrives during turn N's post- # ``generation_complete`` playback delay). Combined with # ``self._iter_had_audio`` (iterator-scope) we can tell the # difference between a spurious turn (no audio ever, on this # iterator) and a real mid-reply interrupt (audio arrived # earlier on this iterator). saw_interrupted = False while True: try: assert self._recv_iter is not None message = await self._recv_iter.__anext__() # type: ignore[union-attr] except StopAsyncIteration: # The previous turn ended (turn_complete already # consumed). Surface end-of-turn to the drain loop # and reset the iterator so the next user turn # can re-enter session.receive() afresh. 
self._recv_iter = None return AudioChunk( data=b"", transcript=pending_delta or None, ) if message.go_away is not None: raise RuntimeError( f"GeminiLiveAgentAdapter: server sent go_away: {message.go_away}" ) sc = message.server_content if sc is None: continue if getattr(sc, "interrupted", None): saw_interrupted = True if sc.output_transcription is not None: transcript_text = getattr(sc.output_transcription, "text", None) if transcript_text: pending_delta += transcript_text existing = self.last_agent_transcript or "" self.last_agent_transcript = existing + transcript_text if sc.model_turn is not None and sc.model_turn.parts: audio_bytes = b"" for part in sc.model_turn.parts: if part.inline_data is not None and part.inline_data.data: audio_bytes += part.inline_data.data if audio_bytes: if len(audio_bytes) % 2 == 1: audio_bytes = audio_bytes[:-1] if audio_bytes: self._iter_had_audio = True return AudioChunk( data=audio_bytes, transcript=pending_delta or None, ) if sc.turn_complete: # Spurious empty-interrupt turn? When activity_start # opens turn N+1 after turn N's generation_complete, # the server emits ``interrupted → turn_complete`` with # no audio FIRST, then the real reply in a separate # turn. Detect that pattern (saw interrupted=True, no # audio on THIS iterator, no transcript) and re-enter # ``session.receive()`` to read the actual reply. # # We gate on ``self._iter_had_audio`` (iterator-scope) # rather than this call's audio: a real mid-reply # interrupt earlier in the same turn would have yielded # audio chunks before this point, even if THIS call sees # only the trailing ``interrupted → turn_complete`` pair. if ( saw_interrupted and not self._iter_had_audio and not pending_delta ): self._recv_iter = self._session.receive().__aiter__() # type: ignore[union-attr] self._iter_had_audio = False saw_interrupted = False continue # Real end-of-turn — yield empty AudioChunk and reset # the iterator. 
The next ``recv_audio`` call (for the # next user turn) will re-enter ``session.receive()``. self._recv_iter = None return AudioChunk( data=b"", transcript=pending_delta or None, ) return await asyncio.wait_for(_next_chunk(), timeout=timeout) async def interrupt(self) -> None: """Drain leftover chunks from the in-flight agent turn so the recovery agent's ``recv_audio`` doesn't pick them up as a fake first reply. On Gemini Live, when we send a fresh ``activity_start`` (the next ``send_audio``) while the model is mid-reply, the server cuts its in-flight audio AND emits ``turn_complete`` for that cancelled turn. ``session.receive()`` is a one-turn generator, so the cancelled turn's tail messages still need to be consumed before the next ``session.receive()`` invocation can read the recovery turn cleanly. ``interrupt()`` consumes them up to that ``turn_complete`` and resets the cached iterator. Best-effort: bounded by 2 seconds so a stuck stream doesn't block the executor's interrupt sequence. """ if self._session is None or self._recv_iter is None: return try: async with asyncio.timeout(2.0): while True: try: message = await self._recv_iter.__anext__() # type: ignore[union-attr] except StopAsyncIteration: break sc = getattr(message, "server_content", None) if sc is not None and sc.turn_complete: break except asyncio.TimeoutError: # Bounded drain: if the server doesn't close out the turn within # 2s after we sent the activity_end, give up and proceed. The # finally block will still close the iterator. pass finally: try: await self._recv_iter.aclose() # type: ignore[attr-defined] except Exception: # Best-effort close; the iterator may already be exhausted # or in an error state. Don't mask the original outcome. pass self._recv_iter = None def _reset_turn_transcript(self) -> None: """Clear the running transcript before each new agent turn. 
Called from ``send_audio`` so each turn starts fresh; otherwise the agent's first reply text would be permanently prefixed onto all subsequent turns' transcripts. """ self.last_agent_transcript = None
Ancestors
- VoiceAgentAdapter
- AgentAdapter
- abc.ABC
Class variables
var capabilities : ClassVar[AdapterCapabilities]
Methods
async def connect(self) ‑> None-
Open a Gemini Live session.
Spawns a background task that holds the async with SDK context open for the adapter's lifetime. Returns once the session handshake is complete and audio can flow.
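The pattern described for connect, a background task that keeps an async context manager open until shutdown, can be sketched in isolation. This is a simplified illustration under stated assumptions: `open_session`, `SessionHolder`, and `demo` are hypothetical stand-ins, and the real adapter wraps the google-genai SDK's connect call rather than a toy context manager.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, Optional, Tuple


@asynccontextmanager
async def open_session() -> AsyncIterator[str]:
    """Hypothetical stand-in for an SDK call used via ``async with``."""
    yield "session"


class SessionHolder:
    def __init__(self) -> None:
        self.session: Optional[str] = None
        self._task: Optional[asyncio.Task] = None
        self._shutdown: Optional[asyncio.Event] = None

    async def connect(self) -> None:
        shutdown = asyncio.Event()
        self._shutdown = shutdown
        ready: asyncio.Future = asyncio.get_running_loop().create_future()

        async def lifetime() -> None:
            try:
                async with open_session() as session:
                    ready.set_result(session)  # handshake done
                    await shutdown.wait()      # keep the context open
            except Exception as exc:
                if not ready.done():
                    ready.set_exception(exc)   # surface errors to connect()

        self._task = asyncio.create_task(lifetime())
        self.session = await ready

    async def disconnect(self) -> None:
        if self._shutdown is not None:
            self._shutdown.set()  # lets lifetime() leave the async with
        if self._task is not None:
            await self._task
        self.session = None
        self._task = None


async def demo() -> Tuple[Optional[str], Optional[str]]:
    holder = SessionHolder()
    await holder.connect()
    opened = holder.session
    await holder.disconnect()
    return opened, holder.session
```

Holding the context in its own task means the context manager is entered and exited by the same task, regardless of which caller invokes disconnect.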
async def connect(self) -> None: """Open a Gemini Live session. Spawns a background task that holds the ``async with`` SDK context open for the adapter's lifetime. Returns once the session handshake is complete and audio can flow. """ from google import genai # type: ignore[attr-defined] # noqa: PLC0415 — lazy import from google.genai import types # noqa: PLC0415 self._session_ready = asyncio.Event() self._shutdown = asyncio.Event() loop = asyncio.get_running_loop() session_future: asyncio.Future[Any] = loop.create_future() config = types.LiveConnectConfig( response_modalities=[types.Modality.AUDIO], system_instruction=self.system_instruction or None, speech_config=types.SpeechConfig( voice_config=types.VoiceConfig( prebuilt_voice_config=types.PrebuiltVoiceConfig( voice_name=self.voice, ) ) ), # Disable Automatic Activity Detection. AAD requires a clean # trailing silence to fire its end-of-speech detector, which is # unreliable across the audio shapes scenario produces (TTS'd # user-sim audio, scripted clips, layered interruptions). With # AAD off we drive turn boundaries explicitly via # ``activity_start`` / ``activity_end`` in ``send_audio``; # Gemini replies the moment we close the turn instead of # waiting on its own VAD heuristic. Activity handling is left # at its default (START_OF_ACTIVITY_INTERRUPTS): when we send # a new ``activity_start`` while Gemini is mid-reply, the # model treats it as a barge-in and cuts its in-flight audio. # # Subtlety: even after ``generation_complete`` on turn N, the # next ``activity_start`` opening turn N+1 is still treated as # a barge-in on the just-completed turn. The server emits a # spurious ``interrupted → turn_complete`` pair (with no model # output) BEFORE actually producing turn N+1's reply. The # ``recv_audio`` loop transparently skips that empty pair and # re-enters ``session.receive()`` to read the real reply. 
realtime_input_config=types.RealtimeInputConfig( automatic_activity_detection=types.AutomaticActivityDetection( disabled=True, ), ), # Enable transcripts so the recv loop can populate # last_agent_transcript / chunk.transcript. Without these, # audio still flows but consumers (judge, manifest) get # no readable text. input_audio_transcription=types.AudioTranscriptionConfig(), output_audio_transcription=types.AudioTranscriptionConfig(), ) client = genai.Client(api_key=self._api_key) async def _session_lifetime() -> None: """Hold the SDK context manager open; expose session via future.""" try: async with client.aio.live.connect( model=self.model, config=config ) as session: if not session_future.done(): session_future.set_result(session) assert self._session_ready is not None self._session_ready.set() # Stay alive until disconnect() fires the shutdown event. assert self._shutdown is not None await self._shutdown.wait() except Exception as exc: self._session_error = exc if not session_future.done(): session_future.set_exception(exc) assert self._session_ready is not None self._session_ready.set() # unblock connect() even on error self._session_task = asyncio.create_task(_session_lifetime()) # Wait until the session is ready (or errored). assert self._session_ready is not None await self._session_ready.wait() if self._session_error is not None: raise self._session_error self._session = await session_future self._recv_iter = None logger.debug("GeminiLiveAgentAdapter: connected model=%s", self.model) async def disconnect(self) ‑> None-
Close the Gemini Live session.
async def disconnect(self) -> None: """Close the Gemini Live session.""" if self._recv_iter is not None: try: await self._recv_iter.aclose() # type: ignore[attr-defined] except Exception: # Best-effort teardown: the iterator may already be # closed or in an invalid state during shutdown. Any # exception here is non-actionable since we're tearing # down anyway. pass self._recv_iter = None if self._shutdown is not None: self._shutdown.set() if self._session_task is not None: try: await asyncio.wait_for(self._session_task, timeout=5.0) except (asyncio.TimeoutError, Exception): # Timeout: task didn't finish in 5s — proceed with # teardown anyway, can't block disconnect indefinitely. # Other Exception: task error during shutdown is # non-actionable; we're discarding the session. pass self._session = None self._session_task = None self._session_ready = None self._shutdown = None self._session_error = None logger.debug("GeminiLiveAgentAdapter: disconnected") async def interrupt(self) ‑> None-
Drain leftover chunks from the in-flight agent turn so the recovery agent's recv_audio doesn't pick them up as a fake first reply.

On Gemini Live, when we send a fresh activity_start (the next send_audio) while the model is mid-reply, the server cuts its in-flight audio AND emits turn_complete for that cancelled turn. session.receive() is a one-turn generator, so the cancelled turn's tail messages still need to be consumed before the next session.receive() invocation can read the recovery turn cleanly. interrupt() consumes them up to that turn_complete and resets the cached iterator.

Best-effort: bounded by 2 seconds so a stuck stream doesn't block the executor's interrupt sequence.
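A bounded, best-effort drain of this kind can be sketched with a toy stream. The names `fake_turn` and `drain_until_turn_complete` are hypothetical, messages are plain strings rather than SDK objects, and the sketch uses `asyncio.wait_for` for the time bound, which is an assumption made here so that it also runs on Python versions without `asyncio.timeout`.

```python
import asyncio
from typing import AsyncGenerator, List


async def fake_turn(
    messages: List[str], stall: bool = False
) -> AsyncGenerator[str, None]:
    """Hypothetical one-turn stream; optionally never closes the turn."""
    for message in messages:
        yield message
    if stall:
        await asyncio.sleep(3600)  # simulate a server that goes quiet


async def drain_until_turn_complete(
    stream: AsyncGenerator[str, None], budget: float
) -> int:
    """Consume messages until 'turn_complete', exhaustion, or the budget."""
    drained = 0

    async def _drain() -> None:
        nonlocal drained
        async for message in stream:
            drained += 1
            if message == "turn_complete":
                break

    try:
        await asyncio.wait_for(_drain(), timeout=budget)
    except asyncio.TimeoutError:
        pass  # best-effort: give up and let the caller proceed
    finally:
        await stream.aclose()  # always release the iterator
    return drained
```

For example, `asyncio.run(drain_until_turn_complete(fake_turn(["audio", "turn_complete", "next"]), 1.0))` stops after the second message and leaves `"next"` unread.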
async def interrupt(self) -> None: """Drain leftover chunks from the in-flight agent turn so the recovery agent's ``recv_audio`` doesn't pick them up as a fake first reply. On Gemini Live, when we send a fresh ``activity_start`` (the next ``send_audio``) while the model is mid-reply, the server cuts its in-flight audio AND emits ``turn_complete`` for that cancelled turn. ``session.receive()`` is a one-turn generator, so the cancelled turn's tail messages still need to be consumed before the next ``session.receive()`` invocation can read the recovery turn cleanly. ``interrupt()`` consumes them up to that ``turn_complete`` and resets the cached iterator. Best-effort: bounded by 2 seconds so a stuck stream doesn't block the executor's interrupt sequence. """ if self._session is None or self._recv_iter is None: return try: async with asyncio.timeout(2.0): while True: try: message = await self._recv_iter.__anext__() # type: ignore[union-attr] except StopAsyncIteration: break sc = getattr(message, "server_content", None) if sc is not None and sc.turn_complete: break except asyncio.TimeoutError: # Bounded drain: if the server doesn't close out the turn within # 2s after we sent the activity_end, give up and proceed. The # finally block will still close the iterator. pass finally: try: await self._recv_iter.aclose() # type: ignore[attr-defined] except Exception: # Best-effort close; the iterator may already be exhausted # or in an error state. Don't mask the original outcome. pass self._recv_iter = None async def recv_audio(self, timeout: float) ‑> AudioChunk-
Receive the next audio fragment from Gemini Live for the current turn.
The SDK's session.receive() async generator yields messages for ONE model turn then stops at turn_complete. We cache the per-turn iterator on self._recv_iter and reset it when the previous turn ended (StopAsyncIteration), so each user turn sent via send_audio can read its full reply across multiple recv_audio calls without us re-entering session.receive() mid-turn (which would skip messages already buffered server-side).

Returns the next non-empty audio chunk as soon as it arrives so the executor's _drain_agent_response can set _agent_speaking_event early; the interruption path depends on this.

On turn_complete returns an empty AudioChunk so the drain loop's tail-silence path exits.

Raises asyncio.TimeoutError if no chunk arrives within timeout seconds.
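The per-turn iterator caching described above can be illustrated with a toy session. This is a simplified sketch, not the adapter's code: `FakeSession`, `TurnReader`, and `demo` are hypothetical, chunks are raw bytes, and transcript handling and the spurious empty-interrupt case are omitted.

```python
import asyncio
from typing import AsyncGenerator, List, Optional


class FakeSession:
    """Hypothetical session: each receive() call yields one turn, then stops."""

    def __init__(self, turns: List[List[bytes]]) -> None:
        self._turns = list(turns)

    async def receive(self) -> AsyncGenerator[bytes, None]:
        turn = self._turns.pop(0) if self._turns else []
        for chunk in turn:
            yield chunk


class TurnReader:
    def __init__(self, session: FakeSession) -> None:
        self._session = session
        self._recv_iter: Optional[AsyncGenerator[bytes, None]] = None

    async def recv(self, timeout: float) -> bytes:
        if self._recv_iter is None:
            # One cached iterator per turn, reused across recv() calls.
            self._recv_iter = self._session.receive()
        stream = self._recv_iter

        async def _next() -> bytes:
            try:
                return await stream.__anext__()
            except StopAsyncIteration:
                self._recv_iter = None  # next call starts the next turn
                return b""              # empty chunk marks end of turn

        return await asyncio.wait_for(_next(), timeout=timeout)


async def demo() -> List[bytes]:
    reader = TurnReader(FakeSession([[b"a1", b"a2"], [b"b1"]]))
    return [await reader.recv(1.0) for _ in range(5)]
```

Running `asyncio.run(demo())` reads both chunks of the first turn, an empty end-of-turn marker, then the second turn and its marker.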
async def recv_audio(self, timeout: float) -> AudioChunk:
    """Receive the next audio fragment from Gemini Live for the current turn.

    The SDK's ``session.receive()`` async generator yields messages for
    ONE model turn then stops at ``turn_complete``. We cache the per-turn
    iterator on ``self._recv_iter`` and reset it when the previous turn
    ended (StopAsyncIteration), so each user turn sent via ``send_audio``
    can read its full reply across multiple ``recv_audio`` calls without
    us re-entering ``session.receive()`` mid-turn (which would skip
    messages already buffered server-side).

    Returns the next non-empty audio chunk as soon as it arrives so the
    executor's ``_drain_agent_response`` can set ``_agent_speaking_event``
    early; the interruption path depends on this. On ``turn_complete``
    returns an empty AudioChunk so the drain loop's tail-silence path
    exits.

    Raises ``asyncio.TimeoutError`` if no chunk arrives within
    ``timeout`` seconds.
    """
    if self._session is None:
        raise RuntimeError("GeminiLiveAgentAdapter: not connected")

    if self._recv_iter is None:
        self._recv_iter = self._session.receive().__aiter__()  # type: ignore[union-attr]
        self._iter_had_audio = False

    async def _next_chunk() -> AudioChunk:
        pending_delta = ""
        # Local-to-call: detects the spurious empty-interrupt turn
        # pattern (server emits ``interrupted=True`` then
        # ``turn_complete=True`` with no audio at all when a fresh
        # ``activity_start`` arrives during turn N's post-
        # ``generation_complete`` playback delay). Combined with
        # ``self._iter_had_audio`` (iterator-scope) we can tell the
        # difference between a spurious turn (no audio ever, on this
        # iterator) and a real mid-reply interrupt (audio arrived
        # earlier on this iterator).
        saw_interrupted = False
        while True:
            try:
                assert self._recv_iter is not None
                message = await self._recv_iter.__anext__()  # type: ignore[union-attr]
            except StopAsyncIteration:
                # The previous turn ended (turn_complete already
                # consumed). Surface end-of-turn to the drain loop
                # and reset the iterator so the next user turn
                # can re-enter session.receive() afresh.
                self._recv_iter = None
                return AudioChunk(
                    data=b"",
                    transcript=pending_delta or None,
                )

            if message.go_away is not None:
                raise RuntimeError(
                    f"GeminiLiveAgentAdapter: server sent go_away: {message.go_away}"
                )

            sc = message.server_content
            if sc is None:
                continue

            if getattr(sc, "interrupted", None):
                saw_interrupted = True

            if sc.output_transcription is not None:
                transcript_text = getattr(sc.output_transcription, "text", None)
                if transcript_text:
                    pending_delta += transcript_text
                    existing = self.last_agent_transcript or ""
                    self.last_agent_transcript = existing + transcript_text

            if sc.model_turn is not None and sc.model_turn.parts:
                audio_bytes = b""
                for part in sc.model_turn.parts:
                    if part.inline_data is not None and part.inline_data.data:
                        audio_bytes += part.inline_data.data
                if audio_bytes:
                    if len(audio_bytes) % 2 == 1:
                        audio_bytes = audio_bytes[:-1]
                    if audio_bytes:
                        self._iter_had_audio = True
                        return AudioChunk(
                            data=audio_bytes,
                            transcript=pending_delta or None,
                        )

            if sc.turn_complete:
                # Spurious empty-interrupt turn? When activity_start
                # opens turn N+1 after turn N's generation_complete,
                # the server emits ``interrupted → turn_complete`` with
                # no audio FIRST, then the real reply in a separate
                # turn. Detect that pattern (saw interrupted=True, no
                # audio on THIS iterator, no transcript) and re-enter
                # ``session.receive()`` to read the actual reply.
                #
                # We gate on ``self._iter_had_audio`` (iterator-scope)
                # rather than this call's audio: a real mid-reply
                # interrupt earlier in the same turn would have yielded
                # audio chunks before this point, even if THIS call sees
                # only the trailing ``interrupted → turn_complete`` pair.
                if (
                    saw_interrupted
                    and not self._iter_had_audio
                    and not pending_delta
                ):
                    self._recv_iter = self._session.receive().__aiter__()  # type: ignore[union-attr]
                    self._iter_had_audio = False
                    saw_interrupted = False
                    continue
                # Real end-of-turn: yield empty AudioChunk and reset
                # the iterator. The next ``recv_audio`` call (for the
                # next user turn) will re-enter ``session.receive()``.
                self._recv_iter = None
                return AudioChunk(
                    data=b"",
                    transcript=pending_delta or None,
                )

    return await asyncio.wait_for(_next_chunk(), timeout=timeout)
async def send_audio(self, chunk: AudioChunk) ‑> None-
Send a canonical 24kHz AudioChunk to Gemini Live as a complete turn.
Resamples from 24kHz → 16kHz at the wire boundary so the adapter speaks Gemini's expected audio/pcm;rate=16000 format while the rest of the framework stays at the canonical 24kHz.
Wraps the audio in explicit activity_start / activity_end markers because we connect with Automatic Activity Detection disabled (see connect). Each send_audio call is therefore a complete user turn from Gemini's perspective: it triggers the model to reply immediately on activity_end instead of waiting on its own VAD heuristic to detect end-of-speech. This is critical for the interrupt path: when the user barges in, we send a fresh turn boundary on top of the agent's in-flight reply, which Gemini treats as a deterministic interruption signal.
async def send_audio(self, chunk: AudioChunk) -> None:
    """Send a canonical 24kHz AudioChunk to Gemini Live as a complete turn.

    Resamples from 24kHz → 16kHz at the wire boundary so the adapter
    speaks Gemini's expected ``audio/pcm;rate=16000`` format while the
    rest of the framework stays at the canonical 24kHz.

    Wraps the audio in explicit ``activity_start`` / ``activity_end``
    markers because we connect with Automatic Activity Detection
    disabled (see ``connect``). Each ``send_audio`` call is therefore a
    complete user turn from Gemini's perspective: it triggers the model
    to reply immediately on ``activity_end`` instead of waiting on its
    own VAD heuristic to detect end-of-speech. This is critical for the
    interrupt path: when the user barges in, we send a fresh turn
    boundary on top of the agent's in-flight reply, which Gemini treats
    as a deterministic interruption signal.
    """
    if self._session is None:
        raise RuntimeError("GeminiLiveAgentAdapter: not connected")

    from google.genai import types  # noqa: PLC0415

    pcm_16k = _resample_pcm16(chunk.data, CANONICAL_RATE, GEMINI_INPUT_RATE)
    if not pcm_16k:
        return

    # New user turn → reset transcript and the per-turn receive
    # iterator so the next ``recv_audio`` enters
    # ``session.receive()`` fresh for this turn.
    self._reset_turn_transcript()
    if self._recv_iter is not None:
        try:
            await self._recv_iter.aclose()  # type: ignore[attr-defined]
        except Exception:
            # Best-effort: prior turn's receive iterator may already be
            # closed or in an error state. We're resetting to start a new
            # turn, so propagating here would block legitimate new turns.
            pass
        self._recv_iter = None

    await self._session.send_realtime_input(activity_start=types.ActivityStart())
    blob = types.Blob(
        data=pcm_16k,
        mime_type="audio/pcm;rate=16000",
    )
    await self._session.send_realtime_input(audio=blob)
    await self._session.send_realtime_input(activity_end=types.ActivityEnd())
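The source above calls `_resample_pcm16` without showing it. As a rough illustration of what a 24kHz → 16kHz conversion at the wire boundary involves, here is a minimal sketch using plain linear interpolation. The function name `resample_pcm16` and its algorithm are assumptions for illustration only; the adapter's real helper may use a different method (for example a filtered resampler), and this sketch assumes mono little-endian PCM16.

```python
import struct


def resample_pcm16(data: bytes, src_rate: int, dst_rate: int) -> bytes:
    """Linearly resample mono little-endian PCM16 (hypothetical helper)."""
    # Keep whole samples only: PCM16 needs an even byte count.
    data = data[: len(data) - (len(data) % 2)]
    if not data or src_rate == dst_rate:
        return data

    n_in = len(data) // 2
    src = struct.unpack(f"<{n_in}h", data)
    n_out = n_in * dst_rate // src_rate

    out = []
    for i in range(n_out):
        # Position of output sample i on the input timeline.
        pos = i * src_rate / dst_rate
        left = int(pos)
        right = min(left + 1, n_in - 1)
        frac = pos - left
        out.append(round(src[left] * (1 - frac) + src[right] * frac))
    return struct.pack(f"<{n_out}h", *out)
```

Six input samples at 24kHz become four output samples at 16kHz. Linear interpolation does no anti-alias filtering, so this is only suitable as a demonstration of the rate change, not as a quality reference.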
Inherited members
class LiveKitAgentAdapter (url: str, api_key: str, api_secret: str, room: str)-
Abstract base for voice agents that exchange audio with the agent under test.
Subclasses implement connect, disconnect, send_audio, and recv_audio. The default call implementation threads audio extracted from the last incoming message through the transport and wraps the response back into an assistant message.
Attributes
capabilities-
Declaration of what the adapter can and cannot do. Each concrete subclass must set this as a class attribute.
response_timeout-
Seconds to wait for agent audio after sending user audio. Defaults to 60 seconds.
60 seconds covers a typical real-world STT → LLM → TTS round-trip including backoff/retry inside each provider, tool calls, and RAG lookups. If you see TimeoutError flakes against a fast LLM-only chain, you can lower this; if your agent does heavy processing (MCP roundtrips, multi-step tool chains), consider raising it.
Override per-adapter at construction time::
    adapter = MyVoiceAdapter()
    adapter.response_timeout = 90.0  # slow tool-call chain
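To make the four-method contract and the role of response_timeout concrete, here is a minimal loopback sketch. Everything in it is a stand-in: the AudioChunk dataclass and LoopbackVoiceAdapter are hypothetical, and a real adapter would subclass VoiceAgentAdapter and declare capabilities rather than stand alone.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class AudioChunk:
    """Stand-in for the framework's AudioChunk (assumed shape)."""

    data: bytes
    transcript: Optional[str] = None


class LoopbackVoiceAdapter:
    """Hypothetical adapter that echoes sent audio back to the caller."""

    response_timeout: float = 60.0

    def __init__(self) -> None:
        self._queue: Optional[asyncio.Queue] = None

    async def connect(self) -> None:
        self._queue = asyncio.Queue()

    async def disconnect(self) -> None:
        self._queue = None

    async def send_audio(self, chunk: AudioChunk) -> None:
        if self._queue is None:
            raise RuntimeError("LoopbackVoiceAdapter: not connected")
        await self._queue.put(chunk)

    async def recv_audio(self, timeout: float) -> AudioChunk:
        if self._queue is None:
            raise RuntimeError("LoopbackVoiceAdapter: not connected")
        # Raises asyncio.TimeoutError when nothing arrives in time.
        return await asyncio.wait_for(self._queue.get(), timeout=timeout)


async def demo() -> Tuple[bytes, bool]:
    adapter = LoopbackVoiceAdapter()
    adapter.response_timeout = 0.05  # short, so the demo fails fast
    await adapter.connect()
    await adapter.send_audio(AudioChunk(data=b"\x00\x01"))
    echoed = await adapter.recv_audio(adapter.response_timeout)
    try:
        await adapter.recv_audio(adapter.response_timeout)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    await adapter.disconnect()
    return echoed.data, timed_out
```

The second recv_audio call has nothing to read, so it times out after response_timeout seconds; that is the failure a caller sees when the agent under test never answers.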
class LiveKitAgentAdapter(VoiceAgentAdapter):
    capabilities: ClassVar[AdapterCapabilities] = AdapterCapabilities(
        streaming_transcripts=True,
        native_vad=True,
        dtmf=False,
        input_formats=["pcm16/48000"],
        output_formats=["pcm16/48000"],
    )

    def __init__(self, url: str, api_key: str, api_secret: str, room: str):
        super().__init__()
        self.url = url
        self.api_key = api_key
        self.api_secret = api_secret
        self.room = room
        self._room: Optional[object] = None

    def __repr__(self) -> str:  # redact credentials
        return f"LiveKitAgentAdapter(url={self.url!r}, room={self.room!r}, api_key='***', api_secret='***')"

    async def connect(self) -> None:
        self._room = object()

    async def disconnect(self) -> None:
        self._room = None

    async def send_audio(self, chunk: AudioChunk) -> None:
        if self._room is None:
            raise RuntimeError("LiveKitAgentAdapter: not connected")
        raise PendingTransportError("LiveKitAgentAdapter")

    async def recv_audio(self, timeout: float) -> AudioChunk:
        if self._room is None:
            raise RuntimeError("LiveKitAgentAdapter: not connected")
        raise PendingTransportError("LiveKitAgentAdapter")
Ancestors
- VoiceAgentAdapter
- AgentAdapter
- abc.ABC
Class variables
var capabilities : ClassVar[AdapterCapabilities]
Inherited members
class OpenAIRealtimeAgentAdapter (model: str = 'gpt-realtime-mini', voice: str = 'alloy', instructions: str = '', tools: Optional[List[Any]] = None, *, api_key: Optional[str] = None, role: AgentRole = AgentRole.AGENT, speaks_first: bool = False)-
Exercise OpenAI's Realtime API as either the agent under test (role=AGENT, default) or as the voice-enabled user simulator (role=USER, per §7.2 L1164-1171).
When role=USER, scripted user("text") steps route text through the realtime session's text-input channel rather than triggering TTS.
Transcript observability:
- last_user_transcript: set from conversation.item.input_audio_transcription.completed
- last_agent_transcript: accumulated from response.output_audio_transcript.delta / reset on done
Example::
    adapter = OpenAIRealtimeAgentAdapter(
        model=OPENAI_REALTIME_MODEL,
        voice="alloy",
        instructions="You are a helpful assistant.",
    )
    async with adapter:
        # scenario.run() feeds send_audio / recv_audio
        ...
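The transcript bookkeeping described above can be sketched as a small event handler. TranscriptTracker is a hypothetical helper, not part of the adapter; it only mirrors the documented behaviour for the three GA event names listed and ignores the legacy aliases the adapter also accepts.

```python
from typing import Optional


class TranscriptTracker:
    """Hypothetical helper mirroring the documented transcript attributes."""

    def __init__(self) -> None:
        self.last_user_transcript: Optional[str] = None
        self.last_agent_transcript: Optional[str] = None
        self._buf = ""  # streaming agent transcript deltas

    def handle(self, event: dict) -> None:
        etype = event.get("type", "")
        if etype == "conversation.item.input_audio_transcription.completed":
            self.last_user_transcript = event.get("transcript", "")
        elif etype == "response.output_audio_transcript.delta":
            self._buf += event.get("delta", "")
        elif etype == "response.output_audio_transcript.done":
            # Prefer the full transcript on the event; fall back to the
            # accumulated deltas; then reset the buffer for the next reply.
            final = event.get("transcript", "") or self._buf
            if final:
                self.last_agent_transcript = final
            self._buf = ""


tracker = TranscriptTracker()
tracker.handle({"type": "response.output_audio_transcript.delta", "delta": "Hel"})
tracker.handle({"type": "response.output_audio_transcript.delta", "delta": "lo"})
tracker.handle({"type": "response.output_audio_transcript.done"})
```

After the done event, `tracker.last_agent_transcript` holds the joined deltas and the buffer is empty, so the next reply starts from a clean state.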
class OpenAIRealtimeAgentAdapter(VoiceAgentAdapter): """ Exercise OpenAI's Realtime API as either the agent under test (role=AGENT, default) or as the voice-enabled user simulator (role=USER, per §7.2 L1164-1171). When role=USER, scripted ``user("text")`` steps route text through the realtime session's text-input channel rather than triggering TTS. Transcript observability: - ``last_user_transcript`` — set from ``conversation.item.input_audio_transcription.completed`` - ``last_agent_transcript`` — accumulated from ``response.output_audio_transcript.delta`` / reset on done Example:: adapter = OpenAIRealtimeAgentAdapter( model=OPENAI_REALTIME_MODEL, voice="alloy", instructions="You are a helpful assistant.", ) async with adapter: # scenario.run() feeds send_audio / recv_audio ... """ capabilities: ClassVar[AdapterCapabilities] = AdapterCapabilities( streaming_transcripts=True, native_vad=True, dtmf=False, # OpenAI Realtime exposes ``response.cancel`` as a first-class # interrupt event — the model stops generating immediately. Mapped # below in ``interrupt()``. interruption=True, input_formats=["pcm16/24000"], output_formats=["pcm16/24000"], ) def __init__( self, model: str = OPENAI_REALTIME_MODEL, voice: str = "alloy", instructions: str = "", tools: Optional[List[Any]] = None, *, api_key: Optional[str] = None, role: AgentRole = AgentRole.AGENT, speaks_first: bool = False, ): super().__init__() self.model = model self.voice = voice self.instructions = instructions self.tools = tools or [] self.role = role # type: ignore[misc] # Resolve API key: explicit param takes precedence over env var. self._api_key: str = api_key or os.environ.get("OPENAI_API_KEY", "") self._ws: Any = None # Transcript observability — updated on incoming transcript events. self.last_user_transcript: Optional[str] = None self.last_agent_transcript: Optional[str] = None # Accumulation buffer for streaming agent transcript deltas. 
self._agent_transcript_buf: str = "" # Bytes appended to input_audio_buffer since last commit. Non-zero # means recv_audio should commit + request a response before awaiting. self._pending_audio_bytes: int = 0 # Tracks which legacy (pre-GA) event names have already triggered a # one-time warning, so the log isn't spammed on every audio frame. self._legacy_events_warned: set[str] = set() # --- Gap 1: agent-speaks-first support --- # When speaks_first=True, the adapter is configured for an agent-initiated # scenario where the agent must speak first without any user audio. self._speaks_first: bool = speaks_first # True while a response is in flight (set on response.created, cleared on # response.done / response.cancelled). Prevents double-firing response.create # and allows drain re-entries after a completed response to return empty # (clean drain exit). self._response_active: bool = False # Set to True the first time response.created is received. Guards the # drain re-entry short-circuit in recv_audio: the empty-chunk early-return # must only fire AFTER at least one response has been active and completed, # not on the very first recv_audio call (which would break direct-call # tests that don't go through notify_agent_turn). self._response_ever_active: bool = False # Per-turn signal: set by notify_agent_turn() before each agent step so # recv_audio knows this is a genuine agent-initiated turn (not a silent # proceed()/resume). Consumed (cleared) when the kick fires. self._agent_turn_pending: bool = speaks_first # first turn armed if speaks_first # --- Issue #630: realtime function-call (tool-call) surfacing --- # In-progress function calls keyed by call_id. Each value is a dict # {"name": str|None, "arguments": str} assembled from the streaming # `response.function_call_arguments.delta`/`.done` events and the # `response.output_item.added`/`.done` (function_call item) events. 
# recv_audio() returns an AudioChunk, so it cannot carry tool calls in # its return value — instead the function-call branches accumulate here # and the overridden call() drains them into result.messages. self._tool_call_accumulators: dict[str, dict[str, Any]] = {} # Finalized OpenAI-chat tool_call dicts for the CURRENT turn, in arrival # order. Cleared at the start of each call() so calls never leak across # turns (mirrors the transcript-state reset in _drain_agent_response). # Shape per entry: # {"id": call_id, "type": "function", # "function": {"name": name, "arguments": <json-str>}} self._completed_tool_calls: List[dict[str, Any]] = [] @property def url(self) -> str: return REALTIME_URL_TEMPLATE.format(model=self.model) def __repr__(self) -> str: # redact credentials return ( f"OpenAIRealtimeAgentAdapter(" f"model={self.model!r}, " f"voice={self.voice!r}, " f"role={self.role!r}, " f"api_key='***')" ) def _warn_if_legacy(self, received: str, ga_name: str) -> None: """Emit a one-time WARNING when a pre-GA (beta) event name is seen. Only fires once per distinct legacy name per adapter instance, so a multi-chunk audio response doesn't flood the log. The GA-named event itself never triggers a warning. """ if received != ga_name and received not in self._legacy_events_warned: self._legacy_events_warned.add(received) logger.warning( "OpenAIRealtimeAgentAdapter: received legacy event %r; " "GA uses %r — accepting defensively", received, ga_name, ) # ------------------------------------------------------------- tool calls # Issue #630: surface OpenAI Realtime function-call events as OpenAI-chat # tool_calls on the assistant message in result.messages. The Realtime wire # describes ONE logical call across several events # (`response.function_call_arguments.delta` × N → `.done`, and/or # `response.output_item.added`/`.done` carrying the function_call item). 
# We accumulate per call_id, then finalize into self._completed_tool_calls, # de-duplicating so each call_id yields exactly one tool_call (AC6). def _accumulate_tool_call_delta(self, call_id: str, delta: str) -> None: """Append a streaming arguments fragment for ``call_id``.""" acc = self._tool_call_accumulators.setdefault( call_id, {"name": None, "arguments": ""} ) acc["arguments"] = (acc["arguments"] or "") + (delta or "") def _note_tool_call_name(self, call_id: str, name: Optional[str]) -> None: """Record the function name for ``call_id`` (the item/added event).""" if not name: return acc = self._tool_call_accumulators.setdefault( call_id, {"name": None, "arguments": ""} ) acc["name"] = name def _finalize_tool_call( self, call_id: Optional[str], *, name: Optional[str] = None, arguments: Optional[str] = None, ) -> None: """Resolve one logical function call into ``self._completed_tool_calls``. Idempotent on ``call_id`` (AC6): the streaming-args path and the output-item path both describe the SAME call, so a second finalize for an already-completed ``call_id`` MERGES (fills a missing name / upgrades to a more complete arguments string) rather than appending a duplicate. Degrades safely (AC7): a finalize with NO ``call_id`` is skipped with a DEBUG log and emits nothing. Missing arguments become ``"{}"``; a malformed (non-JSON) arguments string is passed through verbatim — the adapter never parses-and-reraises. """ if not call_id: logger.debug( "OpenAIRealtimeAgentAdapter: function-call event with no " "call_id; skipping (AC7 degraded path)" ) return acc = self._tool_call_accumulators.get(call_id, {}) resolved_name = name or acc.get("name") # Pick the most complete arguments source: explicit (item/done) arg if # non-empty, else the accumulated streaming deltas, else "{}". 
candidates = [arguments, acc.get("arguments")] resolved_args = next( (c for c in candidates if c is not None and c != ""), None ) if resolved_args is None: resolved_args = "{}" # De-dup / merge on call_id (AC6). for existing in self._completed_tool_calls: if existing["id"] == call_id: fn = existing["function"] if not fn.get("name") and resolved_name: fn["name"] = resolved_name # Upgrade to a longer/more-complete arguments string if the new # source carries more (item arguments arriving after deltas). if resolved_args not in ("", "{}") and len(resolved_args) > len( fn.get("arguments") or "" ): fn["arguments"] = resolved_args return self._completed_tool_calls.append( { "id": call_id, "type": "function", "function": { "name": resolved_name or "", "arguments": resolved_args, }, } ) def notify_agent_turn(self) -> None: """Signal that an agent turn is about to be dispatched. Called by the executor before each agent step so recv_audio can fire a bare response.create for agent-initiated turns (where no user audio has been committed). This per-turn signal handles both turn 1 (opening) and subsequent agent turns in multi-turn scripts like [agent(), user(), agent()]. Only meaningful when role=AGENT. Safe to call on every agent step — recv_audio consumes and clears the flag. """ if self.role == AgentRole.AGENT: self._agent_turn_pending = True # ------------------------------------------------------------------ lifecycle async def connect(self) -> None: """Open the Realtime WebSocket and send the initial session.update.""" import websockets self._ws = await websockets.connect( self.url, additional_headers={ "Authorization": f"Bearer {self._api_key}", }, ) logger.debug("OpenAIRealtimeAgentAdapter: connected to %s", self.url) # Configure session: audio formats, voice, instructions, tools. # Disable server-side VAD (session.audio.input.turn_detection=None) so # we control turn boundaries explicitly via input_audio_buffer.commit + # response.create after each send_audio. 
session_config: dict[str, Any] = { "type": "realtime", "audio": { "input": { "format": {"type": "audio/pcm", "rate": 24000}, "turn_detection": None, "transcription": {"model": OPENAI_STT_MODEL}, }, "output": { "format": {"type": "audio/pcm", "rate": 24000}, "voice": self.voice, }, }, } if self.instructions: session_config["instructions"] = self.instructions if self.tools: session_config["tools"] = self.tools await self._ws.send( json.dumps({"type": "session.update", "session": session_config}) ) logger.debug("OpenAIRealtimeAgentAdapter: session.update sent") async def disconnect(self) -> None: """Close the WebSocket if open.""" if self._ws is not None: try: await self._ws.close() except Exception: # Best-effort: connection may already be half-closed or in an # error state when disconnect() is called. We're tearing down # regardless — propagating here would just leak the WS reference. pass finally: self._ws = None logger.debug("OpenAIRealtimeAgentAdapter: disconnected") # ------------------------------------------------------------------ I/O async def send_audio(self, chunk: AudioChunk) -> None: """ Append a PCM16 audio chunk to the model's input audio buffer. Only emits ``input_audio_buffer.append`` — the commit + response are deferred to the next ``recv_audio`` call. The scenario executor may call ``send_audio`` many times for a single user turn (TTS streams audio as chunks); committing per-chunk would confuse the server with sub-second turn boundaries. By deferring commit to recv_audio, we get one server turn per user turn. """ if self._ws is None: raise RuntimeError("OpenAIRealtimeAgentAdapter: not connected") b64 = base64.b64encode(chunk.data).decode() await self._ws.send( json.dumps({"type": "input_audio_buffer.append", "audio": b64}) ) self._pending_audio_bytes += len(chunk.data) async def interrupt(self) -> None: """Send ``response.cancel`` — the OpenAI Realtime API's first-class interrupt. The model stops generating audio and text immediately. 
No timing race against VAD: deterministic stop, then the next user turn flows normally through ``send_audio`` + ``recv_audio``. """ if self._ws is None: raise RuntimeError("OpenAIRealtimeAgentAdapter: not connected") await self._ws.send(json.dumps({"type": "response.cancel"})) logger.debug("OpenAIRealtimeAgentAdapter: sent response.cancel (interrupt)") async def recv_audio(self, timeout: float) -> AudioChunk: """ Commit any pending audio, request a response, and return the first audio chunk the model produces. If ``send_audio`` was called since the last ``recv_audio``, this method commits the buffer and emits ``response.create`` before awaiting the reply. Subsequent recv calls without new send calls just await the next audio delta (for multi-chunk responses). Loops over incoming events until a ``response.output_audio.delta`` event arrives (GA name), then returns decoded PCM16. The legacy ``response.audio.delta`` name is accepted defensively with a one-time warning. Transcript events update the instance's ``last_user_transcript`` / ``last_agent_transcript`` attributes. An ``error`` event raises a ``RuntimeError``. All other housekeeping events are ignored and the loop continues. Raises: asyncio.TimeoutError: if no audio arrives within ``timeout``. RuntimeError: if the server sends an error event. """ if self._ws is None: raise RuntimeError("OpenAIRealtimeAgentAdapter: not connected") # If send_audio was called since last recv, commit and request response. if self._pending_audio_bytes > 0: await self._ws.send(json.dumps({"type": "input_audio_buffer.commit"})) await self._ws.send(json.dumps({"type": "response.create"})) self._pending_audio_bytes = 0 self._agent_turn_pending = False # user spoke → per-turn signal consumed # Gap 1: agent-speaks-first / multi-turn agent initiation. 
# When the executor signals an agent turn via notify_agent_turn() and # no user audio has been committed and no response is already in flight, # send a bare content-free response.create so the model speaks. # Opening words come from the session instructions — no text is injected. # Self-limiting on user-first scripts: if user audio was committed above, # _agent_turn_pending was already cleared. # Also serves as the clean drain exit: if a response already completed # (_response_active is False after response.done cleared it) and # _agent_turn_pending is False, a drain re-entry returns an empty chunk. elif self._agent_turn_pending and not self._response_active: await self._ws.send(json.dumps({"type": "response.create"})) self._agent_turn_pending = False self._response_active = True elif not self._agent_turn_pending and not self._response_active and self._response_ever_active: # No pending audio, no agent-turn signal, no response in flight, # AND at least one response has already completed this session: # this is a drain re-entry after a completed response. Return empty # chunk so _drain_agent_response's tail-silence loop exits cleanly. # Guard on _response_ever_active so that a fresh recv_audio call # (before any response.created fires) does NOT short-circuit — # that would break direct recv_audio callers (e.g. unit tests that # call recv_audio without going through notify_agent_turn). 
return AudioChunk(data=b"") deadline = asyncio.get_running_loop().time() + timeout while True: remaining = deadline - asyncio.get_running_loop().time() if remaining <= 0: raise asyncio.TimeoutError( "OpenAIRealtimeAgentAdapter: recv_audio timed out" ) raw = await asyncio.wait_for(self._ws.recv(), timeout=remaining) try: event = json.loads(raw) if isinstance(raw, str) else json.loads(raw.decode()) except Exception: logger.debug( "OpenAIRealtimeAgentAdapter: non-JSON message, skipping" ) continue etype = event.get("type", "") if etype in ("response.output_audio.delta", "response.audio.delta"): # Accept both the GA event name and its retired beta alias — # live gpt-realtime* models have been observed still emitting # the beta names. These legacy arms should be removed once the # GA names are confirmed stable at a live endpoint (issue #602). self._warn_if_legacy(etype, "response.output_audio.delta") b64 = event.get("delta", "") pcm = base64.b64decode(b64) # Enforce PCM16 invariant: even byte count. if len(pcm) % 2 == 1: pcm = pcm[:-1] return AudioChunk(data=pcm) elif etype == "response.created": # Response is now in flight — mark it so subsequent recv_audio # drain re-entries don't fire a spurious second response.create. self._response_active = True self._response_ever_active = True elif etype in ( "response.output_audio_transcript.delta", "response.audio_transcript.delta", ): # Accumulate streaming agent transcript. self._warn_if_legacy(etype, "response.output_audio_transcript.delta") self._agent_transcript_buf += event.get("delta", "") elif etype in ( "response.output_audio_transcript.done", "response.audio_transcript.done", ): # Finalise; the `transcript` field may have the full text. 
self._warn_if_legacy(etype, "response.output_audio_transcript.done") transcript = event.get("transcript", "") if transcript: self.last_agent_transcript = transcript elif self._agent_transcript_buf: self.last_agent_transcript = self._agent_transcript_buf self._agent_transcript_buf = "" elif etype in ("response.done", "response.cancelled"): # Response finished or was cancelled — mark it so the next # drain re-entry returns an empty chunk (clean exit). self._response_active = False # Issue #646: a tool-only turn (function call, NO audio delta) would # otherwise loop here to the deadline and raise — the accumulated tool # call is parsed but never returned. When the response is done and at # least one tool call has been finalized this turn, return an empty # chunk so the drain exits cleanly and call() surfaces the tool_calls # message. A genuinely empty turn (done + EMPTY accumulator) must still # fall through to the timeout — the non-empty accumulator is the # discriminator, NOT response.done alone. # (Distinct from the drain-re-entry empty-chunk path above at the # _response_ever_active guard, which handles a COMPLETED prior response.) if self._completed_tool_calls: return AudioChunk(data=b"") elif etype == "conversation.item.input_audio_transcription.completed": # User-side transcript from Whisper. self.last_user_transcript = event.get("transcript", "") elif etype == "response.function_call_arguments.delta": # Issue #630: streaming arguments fragment for a function call. # Accumulate per call_id; the call is finalized on `.done` or # the function_call output-item event. self._accumulate_tool_call_delta( event.get("call_id", ""), event.get("delta", "") ) elif etype == "response.function_call_arguments.done": # Issue #630: streaming-args path complete. `name` is typically # NOT on this event (it arrives via the output_item), so resolve # it from the accumulator. A missing call_id degrades safely. 
self._finalize_tool_call( event.get("call_id"), name=event.get("name"), arguments=event.get("arguments"), ) elif etype in ("response.output_item.added", "response.output_item.done"): # Issue #630: the output-item form of a function call carries the # authoritative `name` + `call_id` (and, on `.done`, the full # `arguments`). For non-function items (e.g. an audio message # item) this is benign housekeeping — fall through silently. item = event.get("item") or {} if isinstance(item, dict) and item.get("type") == "function_call": call_id = item.get("call_id") name = item.get("name") if etype == "response.output_item.added": # Shell arrives before args stream — record the name so a # later delta/done can attach to it. Do not finalize yet. self._note_tool_call_name(call_id or "", name) else: # `.done`: authoritative full call. Finalize (idempotent # on call_id — merges with any streaming-args entry, AC6). self._finalize_tool_call( call_id, name=name, arguments=item.get("arguments") ) else: logger.debug( "OpenAIRealtimeAgentAdapter: ignoring non-function " "output_item event %r", etype, ) elif etype == "error": error_detail = event.get("error", {}) msg = error_detail.get("message", str(error_detail)) raise RuntimeError( f"OpenAIRealtimeAgentAdapter: server error — {msg}" ) else: # Housekeeping events — session.created, session.updated, etc. — # are benign. Log at DEBUG and keep the loop running. logger.debug( "OpenAIRealtimeAgentAdapter: ignoring event type %r", etype ) async def call(self, input: AgentInput) -> AgentReturnTypes: """Surface realtime tool calls alongside the spoken audio turn (#630). The base ``call()`` returns a single assistant audio message and does all the recording bookkeeping. We keep that intact and, when the agent called any tools this turn, append ONE extra assistant message carrying every tool call as OpenAI-chat ``tool_calls`` — the shape ``state.has_tool_call`` / ``state.last_tool_call`` consume. 
Returns: - the single audio message (dict) when no tools were called — byte identical to the base behaviour (AC8 regression), OR - ``[audio_message, tool_call_message]`` when ≥1 tool was called (AC1/AC9/AC10). ``convert_agent_return_types_to_openai_messages`` passes a list of dicts through verbatim into result.messages. The completed-calls list is reset HERE (turn start) so tool calls never leak across turns; the function-call events for THIS turn are consumed inside the ``super().call()`` drain and finalized onto the list. """ # Reset per-turn tool-call state so a prior turn's calls don't bleed # through (mirrors the transcript reset in _drain_agent_response). self._completed_tool_calls = [] self._tool_call_accumulators = {} audio_message = await super().call(input) if not self._completed_tool_calls: return audio_message # One assistant message carrying ALL of this turn's tool calls — the # conventional OpenAI shape (one assistant turn, many tool_calls). tool_call_message: ChatCompletionMessageParam = cast( ChatCompletionMessageParam, { "role": "assistant", "content": None, "tool_calls": list(self._completed_tool_calls), }, ) return [cast(ChatCompletionMessageParam, audio_message), tool_call_message] async def _drain_agent_response( self, on_first_chunk=None ) -> "AudioChunk": """Override to surface the spoken transcript after draining. The base class drains recv_audio chunks until tail silence. After draining, ``response.output_audio_transcript.done`` will have already fired (it arrives before ``response.done``), so ``self.last_agent_transcript`` is populated. We rebuild the merged chunk with ``transcript=`` set so ``create_audio_message`` attaches a text part to the assistant message. This puts the transcript in ``result.messages`` (AC2/AC5) without modifying messages.py, tts.py, or composable.py (AC6). The transcript text is an ordinary ``{"type":"text","text":...}`` part — no extra keys. 
Echo-safety is handled in ``_strip_audio_content`` (user_simulator_agent.py) by detecting the structural pattern: an assistant message that carries both an ``input_audio`` part AND a ``text`` part is a voiced agent turn, so the text is reframed as third-person context before ``reverse_roles`` runs (AC4/AC11). """ from ..audio_chunk import AudioChunk as _AudioChunk # Clear transcript state at the start of each turn so a stale value # from a prior turn doesn't bleed through if this turn's event never # fires (AC8: degraded case — absent transcript → no text part). self.last_agent_transcript = None self._agent_transcript_buf = "" merged = await super()._drain_agent_response(on_first_chunk=on_first_chunk) transcript = self.last_agent_transcript if transcript: # Rebuild with transcript attached so create_audio_message adds # the text part. The original merged.data is unchanged. return _AudioChunk(data=merged.data, transcript=transcript) # No transcript (AC8): return the merge unchanged (audio-only). return merged async def send_text(self, text: str) -> None: """ Inject scripted text into the realtime session as a user message. Used when this adapter is the user simulator (role=USER): scripted ``user("text")`` steps route through here instead of spawning TTS. The model synthesises the text into spoken audio with natural prosody, which is then delivered via ``recv_audio``. NOTE: per §7.2, OpenAI Realtime cannot populate assistant audio messages retroactively; the downstream transcript reflects what the model actually emitted, not what was scripted. Raises: RuntimeError: if called before ``connect()``. """ if self._ws is None: raise RuntimeError("OpenAIRealtimeAgentAdapter: not connected") # Create a user conversation item with the scripted text. await self._ws.send( json.dumps( { "type": "conversation.item.create", "item": { "type": "message", "role": "user", "content": [{"type": "input_text", "text": text}], }, } ) ) # Prompt the model to generate audio output. 
        await self._ws.send(json.dumps({"type": "response.create"}))
        logger.debug(
            "OpenAIRealtimeAgentAdapter: send_text injected %r", text[:60]
        )
Ancestors
- VoiceAgentAdapter
- AgentAdapter
- abc.ABC
Class variables
var capabilities : ClassVar[AdapterCapabilities]
Instance variables
var url : str-
@property
def url(self) -> str:
    return REALTIME_URL_TEMPLATE.format(model=self.model)
Methods
async def call(self, input: AgentInput) ‑> str | openai.types.chat.chat_completion_developer_message_param.ChatCompletionDeveloperMessageParam | openai.types.chat.chat_completion_system_message_param.ChatCompletionSystemMessageParam | openai.types.chat.chat_completion_user_message_param.ChatCompletionUserMessageParam | openai.types.chat.chat_completion_assistant_message_param.ChatCompletionAssistantMessageParam | openai.types.chat.chat_completion_tool_message_param.ChatCompletionToolMessageParam | openai.types.chat.chat_completion_function_message_param.ChatCompletionFunctionMessageParam | List[openai.types.chat.chat_completion_developer_message_param.ChatCompletionDeveloperMessageParam | openai.types.chat.chat_completion_system_message_param.ChatCompletionSystemMessageParam | openai.types.chat.chat_completion_user_message_param.ChatCompletionUserMessageParam | openai.types.chat.chat_completion_assistant_message_param.ChatCompletionAssistantMessageParam | openai.types.chat.chat_completion_tool_message_param.ChatCompletionToolMessageParam | openai.types.chat.chat_completion_function_message_param.ChatCompletionFunctionMessageParam] | ScenarioResult-
Surface realtime tool calls alongside the spoken audio turn (#630).
The base call() returns a single assistant audio message and does all the recording bookkeeping. We keep that intact and, when the agent called any tools this turn, append ONE extra assistant message carrying every tool call as OpenAI-chat tool_calls, the shape that state.has_tool_call / state.last_tool_call consume.
Returns
- the single audio message (dict) when no tools were called, byte identical to the base behaviour (AC8 regression), OR
- [audio_message, tool_call_message] when ≥1 tool was called (AC1/AC9/AC10).
convert_agent_return_types_to_openai_messages passes a list of dicts through verbatim into result.messages. The completed-calls list is reset HERE (turn start) so tool calls never leak across turns; the function-call events for THIS turn are consumed inside the super().call() drain and finalized onto the list.
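A caller has to cope with both return shapes described above. The following is a minimal sketch of one way to do that; the helper name split_turn and the sample messages in the usage note are hypothetical and not part of the adapter.

```python
from typing import Any, Union

Message = dict[str, Any]


def split_turn(
    result: Union[Message, list[Message]],
) -> tuple[Message, list[dict[str, Any]]]:
    """Return (audio_message, tool_calls) for either return shape of call()."""
    if isinstance(result, dict):
        # No tools were called: the audio message came back on its own.
        return result, []
    # Tools were called: the result is [audio_message, tool_call_message].
    audio_message, tool_call_message = result
    return audio_message, list(tool_call_message.get("tool_calls") or [])
```

With this helper, code that inspects a turn can always read the audio message first and then iterate over a (possibly empty) list of tool calls, without branching on the return type at every call site.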
async def call(self, input: AgentInput) -> AgentReturnTypes:
    """Surface realtime tool calls alongside the spoken audio turn (#630).

    The base ``call()`` returns a single assistant audio message and does
    all the recording bookkeeping. We keep that intact and, when the agent
    called any tools this turn, append ONE extra assistant message carrying
    every tool call as OpenAI-chat ``tool_calls`` — the shape
    ``state.has_tool_call`` / ``state.last_tool_call`` consume.

    Returns:
      - the single audio message (dict) when no tools were called — byte
        identical to the base behaviour (AC8 regression), OR
      - ``[audio_message, tool_call_message]`` when ≥1 tool was called
        (AC1/AC9/AC10). ``convert_agent_return_types_to_openai_messages``
        passes a list of dicts through verbatim into result.messages.

    The completed-calls list is reset HERE (turn start) so tool calls never
    leak across turns; the function-call events for THIS turn are consumed
    inside the ``super().call()`` drain and finalized onto the list.
    """
    # Reset per-turn tool-call state so a prior turn's calls don't bleed
    # through (mirrors the transcript reset in _drain_agent_response).
    self._completed_tool_calls = []
    self._tool_call_accumulators = {}

    audio_message = await super().call(input)

    if not self._completed_tool_calls:
        return audio_message

    # One assistant message carrying ALL of this turn's tool calls — the
    # conventional OpenAI shape (one assistant turn, many tool_calls).
    tool_call_message: ChatCompletionMessageParam = cast(
        ChatCompletionMessageParam,
        {
            "role": "assistant",
            "content": None,
            "tool_calls": list(self._completed_tool_calls),
        },
    )
    return [cast(ChatCompletionMessageParam, audio_message), tool_call_message]
async def connect(self) ‑> None-
Open the Realtime WebSocket and send the initial session.update.
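As a rough illustration of the initial session.update, the sketch below builds the same payload shape as the source listing for this method. The function name build_session_update and the placeholder values for the voice and transcription model are assumptions for illustration; the adapter reads these from its own attributes and from OPENAI_STT_MODEL, whose value is not shown on this page.

```python
import json
from typing import Any, Optional


def build_session_update(
    voice: str,
    stt_model: str,
    instructions: Optional[str] = None,
    tools: Optional[list[dict[str, Any]]] = None,
) -> str:
    """Serialize a session.update event with server-side VAD disabled."""
    session: dict[str, Any] = {
        "type": "realtime",
        "audio": {
            "input": {
                "format": {"type": "audio/pcm", "rate": 24000},
                # None disables server VAD; the client marks turn boundaries.
                "turn_detection": None,
                "transcription": {"model": stt_model},
            },
            "output": {
                "format": {"type": "audio/pcm", "rate": 24000},
                "voice": voice,
            },
        },
    }
    # Optional keys are only sent when they carry a value.
    if instructions:
        session["instructions"] = instructions
    if tools:
        session["tools"] = tools
    return json.dumps({"type": "session.update", "session": session})
```

Leaving turn_detection as None is what lets the adapter decide when a user turn ends, by sending input_audio_buffer.commit followed by response.create.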
async def connect(self) -> None:
    """Open the Realtime WebSocket and send the initial session.update."""
    import websockets

    self._ws = await websockets.connect(
        self.url,
        additional_headers={
            "Authorization": f"Bearer {self._api_key}",
        },
    )
    logger.debug("OpenAIRealtimeAgentAdapter: connected to %s", self.url)

    # Configure session: audio formats, voice, instructions, tools.
    # Disable server-side VAD (session.audio.input.turn_detection=None) so
    # we control turn boundaries explicitly via input_audio_buffer.commit +
    # response.create after each send_audio.
    session_config: dict[str, Any] = {
        "type": "realtime",
        "audio": {
            "input": {
                "format": {"type": "audio/pcm", "rate": 24000},
                "turn_detection": None,
                "transcription": {"model": OPENAI_STT_MODEL},
            },
            "output": {
                "format": {"type": "audio/pcm", "rate": 24000},
                "voice": self.voice,
            },
        },
    }
    if self.instructions:
        session_config["instructions"] = self.instructions
    if self.tools:
        session_config["tools"] = self.tools

    await self._ws.send(
        json.dumps({"type": "session.update", "session": session_config})
    )
    logger.debug("OpenAIRealtimeAgentAdapter: session.update sent")
async def disconnect(self) ‑> None-
Close the WebSocket if open.
async def disconnect(self) -> None:
    """Close the WebSocket if open."""
    if self._ws is not None:
        try:
            await self._ws.close()
        except Exception:
            # Best-effort: connection may already be half-closed or in an
            # error state when disconnect() is called. We're tearing down
            # regardless — propagating here would just leak the WS reference.
            pass
        finally:
            self._ws = None
        logger.debug("OpenAIRealtimeAgentAdapter: disconnected")
async def interrupt(self) ‑> None-
Send response.cancel, the OpenAI Realtime API's first-class interrupt. The model stops generating audio and text immediately. No timing race against VAD: deterministic stop, then the next user turn flows normally through send_audio + recv_audio.
async def interrupt(self) -> None:
    """Send ``response.cancel`` — the OpenAI Realtime API's first-class
    interrupt. The model stops generating audio and text immediately. No
    timing race against VAD: deterministic stop, then the next user turn
    flows normally through ``send_audio`` + ``recv_audio``.
    """
    if self._ws is None:
        raise RuntimeError("OpenAIRealtimeAgentAdapter: not connected")
    await self._ws.send(json.dumps({"type": "response.cancel"}))
    logger.debug("OpenAIRealtimeAgentAdapter: sent response.cancel (interrupt)")
def notify_agent_turn(self) ‑> None-
Signal that an agent turn is about to be dispatched.
Called by the executor before each agent step so recv_audio can fire a bare response.create for agent-initiated turns (where no user audio has been committed). This per-turn signal handles both turn 1 (opening) and subsequent agent turns in multi-turn scripts like [agent(), user(), agent()].
Only meaningful when role=AGENT. Safe to call on every agent step — recv_audio consumes and clears the flag.
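How recv_audio consumes this flag can be sketched as a pure decision function that mirrors the branch order in the recv_audio source listing. The function name next_action and the returned labels are hypothetical; the real method performs the WebSocket sends inline instead of returning a label.

```python
def next_action(
    pending_audio_bytes: int,
    agent_turn_pending: bool,
    response_active: bool,
    response_ever_active: bool,
) -> str:
    """Decide what recv_audio does before it starts reading events."""
    if pending_audio_bytes > 0:
        # User audio is buffered: commit it and ask for a response.
        return "commit_and_respond"
    if agent_turn_pending and not response_active:
        # Agent-initiated turn: send a bare response.create.
        return "bare_response_create"
    if not agent_turn_pending and not response_active and response_ever_active:
        # A response already completed: return an empty chunk so the drain exits.
        return "drain_exit"
    # Otherwise just keep reading events from the socket.
    return "await_events"
```

The first branch wins on user-first scripts, which is why a stale agent-turn flag cannot trigger a second response.create once user audio has been committed.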
def notify_agent_turn(self) -> None:
    """Signal that an agent turn is about to be dispatched.

    Called by the executor before each agent step so recv_audio can fire
    a bare response.create for agent-initiated turns (where no user audio
    has been committed). This per-turn signal handles both turn 1
    (opening) and subsequent agent turns in multi-turn scripts like
    [agent(), user(), agent()].

    Only meaningful when role=AGENT. Safe to call on every agent step —
    recv_audio consumes and clears the flag.
    """
    if self.role == AgentRole.AGENT:
        self._agent_turn_pending = True
async def recv_audio(self, timeout: float) ‑> AudioChunk-
Commit any pending audio, request a response, and return the first audio chunk the model produces.
If send_audio was called since the last recv_audio, this method commits the buffer and emits response.create before awaiting the reply. Subsequent recv calls without new send calls just await the next audio delta (for multi-chunk responses).
Loops over incoming events until a response.output_audio.delta event arrives (GA name), then returns decoded PCM16. The legacy response.audio.delta name is accepted defensively with a one-time warning. Transcript events update the instance's last_user_transcript / last_agent_transcript attributes. An error event raises a RuntimeError. All other housekeeping events are ignored and the loop continues.
Raises
asyncio.TimeoutError: if no audio arrives within timeout.
RuntimeError: if the server sends an error event.
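The audio-delta handling described above can be illustrated in isolation. This is a minimal sketch, assuming the event carries base64 PCM16 in a delta field as in the source listing; decode_audio_delta is a hypothetical helper and it leaves out the transcript, tool-call, and error branches.

```python
import base64
import json
from typing import Optional, Union

AUDIO_DELTA_TYPES = ("response.output_audio.delta", "response.audio.delta")


def decode_audio_delta(raw: Union[str, bytes]) -> Optional[bytes]:
    """Return PCM16 bytes for an audio delta event, or None for anything else."""
    event = json.loads(raw if isinstance(raw, str) else raw.decode())
    if event.get("type") not in AUDIO_DELTA_TYPES:
        return None
    pcm = base64.b64decode(event.get("delta", ""))
    # PCM16 samples are two bytes each, so drop a trailing odd byte.
    if len(pcm) % 2 == 1:
        pcm = pcm[:-1]
    return pcm
```

Trimming the odd byte keeps downstream consumers from misaligning samples when a delta happens to split mid-sample.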
async def recv_audio(self, timeout: float) -> AudioChunk:
    """
    Commit any pending audio, request a response, and return the first
    audio chunk the model produces.

    If ``send_audio`` was called since the last ``recv_audio``, this method
    commits the buffer and emits ``response.create`` before awaiting the
    reply. Subsequent recv calls without new send calls just await the
    next audio delta (for multi-chunk responses).

    Loops over incoming events until a ``response.output_audio.delta``
    event arrives (GA name), then returns decoded PCM16. The legacy
    ``response.audio.delta`` name is accepted defensively with a one-time
    warning. Transcript events update the instance's
    ``last_user_transcript`` / ``last_agent_transcript`` attributes. An
    ``error`` event raises a ``RuntimeError``. All other housekeeping
    events are ignored and the loop continues.

    Raises:
        asyncio.TimeoutError: if no audio arrives within ``timeout``.
        RuntimeError: if the server sends an error event.
    """
    if self._ws is None:
        raise RuntimeError("OpenAIRealtimeAgentAdapter: not connected")

    # If send_audio was called since last recv, commit and request response.
    if self._pending_audio_bytes > 0:
        await self._ws.send(json.dumps({"type": "input_audio_buffer.commit"}))
        await self._ws.send(json.dumps({"type": "response.create"}))
        self._pending_audio_bytes = 0
        self._agent_turn_pending = False  # user spoke → per-turn signal consumed
    # Gap 1: agent-speaks-first / multi-turn agent initiation.
    # When the executor signals an agent turn via notify_agent_turn() and
    # no user audio has been committed and no response is already in flight,
    # send a bare content-free response.create so the model speaks.
    # Opening words come from the session instructions — no text is injected.
    # Self-limiting on user-first scripts: if user audio was committed above,
    # _agent_turn_pending was already cleared.
    # Also serves as the clean drain exit: if a response already completed
    # (_response_active is False after response.done cleared it) and
    # _agent_turn_pending is False, a drain re-entry returns an empty chunk.
    elif self._agent_turn_pending and not self._response_active:
        await self._ws.send(json.dumps({"type": "response.create"}))
        self._agent_turn_pending = False
        self._response_active = True
    elif not self._agent_turn_pending and not self._response_active and self._response_ever_active:
        # No pending audio, no agent-turn signal, no response in flight,
        # AND at least one response has already completed this session:
        # this is a drain re-entry after a completed response. Return empty
        # chunk so _drain_agent_response's tail-silence loop exits cleanly.
        # Guard on _response_ever_active so that a fresh recv_audio call
        # (before any response.created fires) does NOT short-circuit —
        # that would break direct recv_audio callers (e.g. unit tests that
        # call recv_audio without going through notify_agent_turn).
        return AudioChunk(data=b"")

    deadline = asyncio.get_running_loop().time() + timeout

    while True:
        remaining = deadline - asyncio.get_running_loop().time()
        if remaining <= 0:
            raise asyncio.TimeoutError(
                "OpenAIRealtimeAgentAdapter: recv_audio timed out"
            )

        raw = await asyncio.wait_for(self._ws.recv(), timeout=remaining)

        try:
            event = json.loads(raw) if isinstance(raw, str) else json.loads(raw.decode())
        except Exception:
            logger.debug(
                "OpenAIRealtimeAgentAdapter: non-JSON message, skipping"
            )
            continue

        etype = event.get("type", "")

        if etype in ("response.output_audio.delta", "response.audio.delta"):
            # Accept both the GA event name and its retired beta alias —
            # live gpt-realtime* models have been observed still emitting
            # the beta names. These legacy arms should be removed once the
            # GA names are confirmed stable at a live endpoint (issue #602).
            self._warn_if_legacy(etype, "response.output_audio.delta")
            b64 = event.get("delta", "")
            pcm = base64.b64decode(b64)
            # Enforce PCM16 invariant: even byte count.
            if len(pcm) % 2 == 1:
                pcm = pcm[:-1]
            return AudioChunk(data=pcm)

        elif etype == "response.created":
            # Response is now in flight — mark it so subsequent recv_audio
            # drain re-entries don't fire a spurious second response.create.
            self._response_active = True
            self._response_ever_active = True

        elif etype in (
            "response.output_audio_transcript.delta",
            "response.audio_transcript.delta",
        ):
            # Accumulate streaming agent transcript.
            self._warn_if_legacy(etype, "response.output_audio_transcript.delta")
            self._agent_transcript_buf += event.get("delta", "")

        elif etype in (
            "response.output_audio_transcript.done",
            "response.audio_transcript.done",
        ):
            # Finalise; the `transcript` field may have the full text.
            self._warn_if_legacy(etype, "response.output_audio_transcript.done")
            transcript = event.get("transcript", "")
            if transcript:
                self.last_agent_transcript = transcript
            elif self._agent_transcript_buf:
                self.last_agent_transcript = self._agent_transcript_buf
            self._agent_transcript_buf = ""

        elif etype in ("response.done", "response.cancelled"):
            # Response finished or was cancelled — mark it so the next
            # drain re-entry returns an empty chunk (clean exit).
            self._response_active = False
            # Issue #646: a tool-only turn (function call, NO audio delta) would
            # otherwise loop here to the deadline and raise — the accumulated tool
            # call is parsed but never returned. When the response is done and at
            # least one tool call has been finalized this turn, return an empty
            # chunk so the drain exits cleanly and call() surfaces the tool_calls
            # message. A genuinely empty turn (done + EMPTY accumulator) must still
            # fall through to the timeout — the non-empty accumulator is the
            # discriminator, NOT response.done alone.
            # (Distinct from the drain-re-entry empty-chunk path above at the
            # _response_ever_active guard, which handles a COMPLETED prior response.)
            if self._completed_tool_calls:
                return AudioChunk(data=b"")

        elif etype == "conversation.item.input_audio_transcription.completed":
            # User-side transcript from Whisper.
            self.last_user_transcript = event.get("transcript", "")

        elif etype == "response.function_call_arguments.delta":
            # Issue #630: streaming arguments fragment for a function call.
            # Accumulate per call_id; the call is finalized on `.done` or
            # the function_call output-item event.
            self._accumulate_tool_call_delta(
                event.get("call_id", ""), event.get("delta", "")
            )

        elif etype == "response.function_call_arguments.done":
            # Issue #630: streaming-args path complete. `name` is typically
            # NOT on this event (it arrives via the output_item), so resolve
            # it from the accumulator. A missing call_id degrades safely.
            self._finalize_tool_call(
                event.get("call_id"),
                name=event.get("name"),
                arguments=event.get("arguments"),
            )

        elif etype in ("response.output_item.added", "response.output_item.done"):
            # Issue #630: the output-item form of a function call carries the
            # authoritative `name` + `call_id` (and, on `.done`, the full
            # `arguments`). For non-function items (e.g. an audio message
            # item) this is benign housekeeping — fall through silently.
            item = event.get("item") or {}
            if isinstance(item, dict) and item.get("type") == "function_call":
                call_id = item.get("call_id")
                name = item.get("name")
                if etype == "response.output_item.added":
                    # Shell arrives before args stream — record the name so a
                    # later delta/done can attach to it. Do not finalize yet.
                    self._note_tool_call_name(call_id or "", name)
                else:
                    # `.done`: authoritative full call. Finalize (idempotent
                    # on call_id — merges with any streaming-args entry, AC6).
                    self._finalize_tool_call(
                        call_id, name=name, arguments=item.get("arguments")
                    )
            else:
                logger.debug(
                    "OpenAIRealtimeAgentAdapter: ignoring non-function "
                    "output_item event %r",
                    etype,
                )

        elif etype == "error":
            error_detail = event.get("error", {})
            msg = error_detail.get("message", str(error_detail))
            raise RuntimeError(
                f"OpenAIRealtimeAgentAdapter: server error — {msg}"
            )

        else:
            # Housekeeping events — session.created, session.updated, etc. —
            # are benign. Log at DEBUG and keep the loop running.
            logger.debug(
                "OpenAIRealtimeAgentAdapter: ignoring event type %r", etype
            )
async def send_audio(self, chunk: AudioChunk) ‑> None-
Append a PCM16 audio chunk to the model's input audio buffer.
Only emits input_audio_buffer.append; the commit + response are deferred to the next recv_audio call. The scenario executor may call send_audio many times for a single user turn (TTS streams audio as chunks); committing per-chunk would confuse the server with sub-second turn boundaries. By deferring commit to recv_audio, we get one server turn per user turn.
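The deferred-commit idea can be sketched without a live socket. In the example below, TurnBuffer and its outbox list are hypothetical stand-ins for the adapter and its WebSocket; only the event names and the byte counter follow the source listings on this page.

```python
import base64
import json


class TurnBuffer:
    """Collect the events one user turn would put on the wire."""

    def __init__(self) -> None:
        self.outbox: list[str] = []
        self.pending_bytes = 0

    def send_audio(self, pcm: bytes) -> None:
        # Each chunk is appended; nothing is committed yet.
        b64 = base64.b64encode(pcm).decode()
        self.outbox.append(
            json.dumps({"type": "input_audio_buffer.append", "audio": b64})
        )
        self.pending_bytes += len(pcm)

    def flush_turn(self) -> None:
        # Called where recv_audio would start: one commit per user turn.
        if self.pending_bytes > 0:
            self.outbox.append(json.dumps({"type": "input_audio_buffer.commit"}))
            self.outbox.append(json.dumps({"type": "response.create"}))
            self.pending_bytes = 0
```

However many chunks the TTS produces, the server sees a single commit and a single response.create for the turn, and a second flush with nothing pending sends nothing.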
async def send_audio(self, chunk: AudioChunk) -> None:
    """
    Append a PCM16 audio chunk to the model's input audio buffer.

    Only emits ``input_audio_buffer.append`` — the commit + response are
    deferred to the next ``recv_audio`` call. The scenario executor may
    call ``send_audio`` many times for a single user turn (TTS streams
    audio as chunks); committing per-chunk would confuse the server with
    sub-second turn boundaries. By deferring commit to recv_audio, we get
    one server turn per user turn.
    """
    if self._ws is None:
        raise RuntimeError("OpenAIRealtimeAgentAdapter: not connected")
    b64 = base64.b64encode(chunk.data).decode()
    await self._ws.send(
        json.dumps({"type": "input_audio_buffer.append", "audio": b64})
    )
    self._pending_audio_bytes += len(chunk.data)
async def send_text(self, text: str) ‑> None-
Inject scripted text into the realtime session as a user message.
Used when this adapter is the user simulator (role=USER): scripted user("text") steps route through here instead of spawning TTS. The model synthesises the text into spoken audio with natural prosody, which is then delivered via recv_audio.
NOTE: per §7.2, OpenAI Realtime cannot populate assistant audio messages retroactively; the downstream transcript reflects what the model actually emitted, not what was scripted.
Raises
RuntimeError: if called before connect().
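For reference, the two events this method puts on the wire can be built as follows. This is only a sketch of the payload shapes taken from the source listing; build_text_turn is a hypothetical helper, and the real method sends each event over the open WebSocket.

```python
import json


def build_text_turn(text: str) -> list[str]:
    """Return the JSON events for one scripted user text turn, in send order."""
    item_create = {
        "type": "conversation.item.create",
        "item": {
            "type": "message",
            "role": "user",
            "content": [{"type": "input_text", "text": text}],
        },
    }
    # The second event asks the model to respond to the new item.
    return [json.dumps(item_create), json.dumps({"type": "response.create"})]
```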
async def send_text(self, text: str) -> None:
    """
    Inject scripted text into the realtime session as a user message.

    Used when this adapter is the user simulator (role=USER): scripted
    ``user("text")`` steps route through here instead of spawning TTS.
    The model synthesises the text into spoken audio with natural
    prosody, which is then delivered via ``recv_audio``.

    NOTE: per §7.2, OpenAI Realtime cannot populate assistant audio
    messages retroactively; the downstream transcript reflects what the
    model actually emitted, not what was scripted.

    Raises:
        RuntimeError: if called before ``connect()``.
    """
    if self._ws is None:
        raise RuntimeError("OpenAIRealtimeAgentAdapter: not connected")

    # Create a user conversation item with the scripted text.
    await self._ws.send(
        json.dumps(
            {
                "type": "conversation.item.create",
                "item": {
                    "type": "message",
                    "role": "user",
                    "content": [{"type": "input_text", "text": text}],
                },
            }
        )
    )
    # Prompt the model to generate audio output.
    await self._ws.send(json.dumps({"type": "response.create"}))
    logger.debug(
        "OpenAIRealtimeAgentAdapter: send_text injected %r", text[:60]
    )
class PendingTransportError (adapter_name: str)-
Raised by stub adapters when their transport code has not landed yet.
class PendingTransportError(NotImplementedError):
    """Raised by stub adapters when their transport code has not landed yet."""

    def __init__(self, adapter_name: str) -> None:
        super().__init__(
            f"{adapter_name}: transport implementation is not yet wired up. "
            "Options: (1) run this scenario as an @integration test against a "
            f"live endpoint, (2) subclass {adapter_name} and implement "
            "send_audio/recv_audio — and re-audit the inherited "
            "`capabilities` ClassVar so the matrix matches what your subclass "
            "can actually do. Claiming streaming_transcripts=True in a "
            "subclass without a real transcript stream will silently break "
            "after_words interruption."
        )
        self.adapter_name = adapter_name
Ancestors
- builtins.NotImplementedError
- builtins.RuntimeError
- builtins.Exception
- builtins.BaseException
class PipecatAgentAdapter (url: Optional[str] = None, *, signaling_url: Optional[str] = None, transport: Literal['websocket', 'webrtc'] = 'websocket', audio_format: str = 'mulaw', sample_rate: int = 8000, stream_sid: Optional[str] = None, call_sid: Optional[str] = None)-
Test a running Pipecat bot via its exposed WebSocket endpoint.
Transport is selected by the transport argument:
- "websocket" (default): Twilio Media Streams protocol over WS. Scenario sends a synthetic start event, then media frames. Pipecat's TwilioFrameSerializer on the bot side handles the wire format.
- "webrtc": SmallWebRTC-style negotiation. Raises PendingTransportError; tracked as a follow-up.
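The synthetic handshake for the "websocket" transport can be sketched as below. The payload shapes and the MZ/CA SID prefixes follow the connect() source listing for this class; build_handshake itself is a hypothetical helper, not part of the adapter.

```python
import json
import uuid
from typing import Optional


def build_handshake(
    stream_sid: Optional[str] = None, call_sid: Optional[str] = None
) -> list[str]:
    """Return the JSON text frames sent right after the WebSocket opens."""
    # Fabricate Twilio-style SIDs when the caller supplies none.
    stream_sid = stream_sid or f"MZ{uuid.uuid4().hex}"
    call_sid = call_sid or f"CA{uuid.uuid4().hex}"
    connected = {"event": "connected", "protocol": "Call", "version": "1.0.0"}
    start = {
        "event": "start",
        "streamSid": stream_sid,
        "start": {
            "streamSid": stream_sid,
            "callSid": call_sid,
            "mediaFormat": {
                "encoding": "audio/x-mulaw",
                "sampleRate": 8000,
                "channels": 1,
            },
        },
    }
    return [json.dumps(connected), json.dumps(start)]
```

The bot-side serializer learns the stream and call SIDs from the start event, so it has to arrive before any media frame.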
class PipecatAgentAdapter(VoiceAgentAdapter):
    """
    Test a running Pipecat bot via its exposed WebSocket endpoint.

    Transport is selected by the ``transport`` argument:
      - ``"websocket"`` (default): Twilio Media Streams protocol over WS.
        Scenario sends a synthetic ``start`` event, then ``media`` frames.
        Pipecat's ``TwilioFrameSerializer`` on the bot side handles the
        wire format.
      - ``"webrtc"``: SmallWebRTC-style negotiation. Raises
        ``PendingTransportError``; tracked as a follow-up.
    """

    capabilities: ClassVar[AdapterCapabilities] = AdapterCapabilities(
        streaming_transcripts=True,
        native_vad=True,
        dtmf=False,
        # Pipecat over the Twilio WS transport speaks the Twilio Media Streams
        # protocol; the ``clear`` event drops all buffered outbound audio on
        # the bot side. That's first-class interrupt — no VAD timing race.
        interruption=True,
        input_formats=["pcm16/24000", "mulaw/8000", "opus"],
        output_formats=["pcm16/24000", "mulaw/8000", "opus"],
    )

    def __init__(
        self,
        url: Optional[str] = None,
        *,
        signaling_url: Optional[str] = None,
        transport: Literal["websocket", "webrtc"] = "websocket",
        audio_format: str = "mulaw",
        sample_rate: int = 8000,
        stream_sid: Optional[str] = None,
        call_sid: Optional[str] = None,
    ) -> None:
        super().__init__()
        if transport == "websocket" and url is None:
            raise ValueError("PipecatAgentAdapter(transport='websocket') requires url=")
        if transport == "webrtc" and signaling_url is None:
            raise ValueError("PipecatAgentAdapter(transport='webrtc') requires signaling_url=")
        self.url = url
        self.signaling_url = signaling_url
        self.transport = transport
        self.audio_format = audio_format
        self.sample_rate = sample_rate
        # Synthetic SIDs pipecat's TwilioFrameSerializer needs in the `start`
        # event. If caller doesn't supply them, we fabricate UUIDs. Pipecat
        # uses them for logging and the auto-hangup REST call; both are no-ops
        # when we're not actually going through Twilio.
        self.stream_sid = stream_sid
        self.call_sid = call_sid
        self._ws: Any = None
        self._recv_task: Optional[asyncio.Task] = None
        self._inbound_queue: Optional[asyncio.Queue[AudioChunk]] = None
        # Serialises concurrent send_audio() calls — without it two paced
        # senders would interleave 20-ms mulaw frames on the wire and the
        # bot would receive corrupted audio. Used for the interruption case
        # where the executor calls send_audio() while a previous turn's
        # send is still in flight.
        self._send_lock: Optional[asyncio.Lock] = None

    @property
    def transport_format(self) -> str:
        return f"{self.audio_format}/{self.sample_rate}"

    # ------------------------------------------------------------------ lifecycle

    async def connect(self) -> None:
        if self.transport == "webrtc":
            from ._stub import PendingTransportError

            raise PendingTransportError(
                "PipecatAgentAdapter(transport='webrtc')"
            )

        # Lazy import so `import scenario` doesn't require websockets at the
        # top of the module-load path (it's already a hard dep, but being
        # consistent with the Twilio adapter style).
        import websockets

        assert self.url is not None  # validated in __init__
        self._ws = await websockets.connect(
            self.url, ping_interval=None, ping_timeout=None
        )
        self._inbound_queue = asyncio.Queue()
        self._send_lock = asyncio.Lock()

        # Send the synthetic `start` event that pipecat's TwilioFrameSerializer
        # requires to learn the stream/call SIDs and start deserializing
        # media frames.
        if self.stream_sid is None:
            self.stream_sid = f"MZ{uuid.uuid4().hex}"
        if self.call_sid is None:
            self.call_sid = f"CA{uuid.uuid4().hex}"
        await self._ws.send(json.dumps({"event": "connected", "protocol": "Call", "version": "1.0.0"}))
        await self._ws.send(
            json.dumps(
                {
                    "event": "start",
                    "streamSid": self.stream_sid,
                    "start": {
                        "streamSid": self.stream_sid,
                        "callSid": self.call_sid,
                        "mediaFormat": {
                            "encoding": "audio/x-mulaw",
                            "sampleRate": 8000,
                            "channels": 1,
                        },
                    },
                }
            )
        )

        self._recv_task = asyncio.create_task(self._recv_loop())
        logger.debug("PipecatAgentAdapter: connected to %s (stream=%s)", self.url, self.stream_sid)

    async def disconnect(self) -> None:
        ws = self._ws
        if ws is None:
            return
        # Send `stop` event so the bot can clean up its pipeline gracefully.
        try:
            if self.stream_sid:
                await ws.send(json.dumps({"event": "stop", "streamSid": self.stream_sid}))
        except Exception:
            logger.debug("PipecatAgentAdapter: failed to send stop frame", exc_info=True)
        if self._recv_task is not None:
            self._recv_task.cancel()
            try:
                await self._recv_task
            except asyncio.CancelledError:
                # Expected: we just cancelled it.
                pass
            except Exception:
                # Unexpected teardown error — already logging enough context
                # elsewhere; disconnect() is best-effort.
                logger.debug("PipecatAgentAdapter: recv_task raised during cancel", exc_info=True)
            self._recv_task = None
        try:
            await ws.close()
        except Exception:
            # WS may already be closed by the peer; disconnect() is best-effort.
            logger.debug("PipecatAgentAdapter: ws.close() raised", exc_info=True)
        self._ws = None
        self._inbound_queue = None
        self.stream_sid = None
        self.call_sid = None

    # ------------------------------------------------------------------ I/O

    async def send_audio(self, chunk: AudioChunk) -> None:
        # Pace at real-time (TWILIO_FRAME_MS/1000s per 20-ms frame). Matches what
        # a real caller produces over a PSTN line — the SUT sees normal speech
        # rhythm, not a synthetic dump.
        #
        # After the last frame we send a Twilio ``mark`` named "utterance_end".
        # Real-time pacing means TTS-induced inter-phrase pauses survive on the
        # wire, and a stateless inactivity-timer on the receiver can't
        # distinguish "speaker paused after a comma" from "speaker finished
        # their turn." The mark is an explicit, non-ambiguous end-of-turn
        # signal: cooperating SUTs flush on the mark; legacy SUTs fall back to
        # VAD timing.
        self._assert_connected()
        assert self._ws is not None and self.stream_sid is not None and self._send_lock is not None
        mulaw = pcm16_24k_to_mulaw8k(chunk.data)
        frame_secs = TWILIO_FRAME_MS / 1000
        async with self._send_lock:
            for frame in iter_mulaw_frames(mulaw):
                if not frame:
                    continue
                await self._ws.send(build_media_frame(self.stream_sid, frame))
                await asyncio.sleep(frame_secs)
            await self._ws.send(build_mark_frame(self.stream_sid, "utterance_end"))

    async def recv_audio(self, timeout: float) -> AudioChunk:
        self._assert_connected()
        assert self._inbound_queue is not None
        return await asyncio.wait_for(self._inbound_queue.get(), timeout=timeout)

    async def interrupt(self) -> None:
        """Send a Twilio ``clear`` frame — the bot drops all buffered outbound
        audio immediately. Cooperating Pipecat bots (and any code wired to
        the Media Streams protocol) treat ``clear`` as "stop talking now."

        Use this in preference to timing-based barge-in when the SUT
        supports it: it's deterministic, doesn't depend on VAD detection
        windows, and matches the same protocol used in production.
        """
        self._assert_connected()
        assert self._ws is not None and self.stream_sid is not None
        await self._ws.send(build_clear_frame(self.stream_sid))
        logger.debug("PipecatAgentAdapter: sent clear frame (interrupt)")

    # ------------------------------------------------------------------ background

    async def _recv_loop(self) -> None:
        """Read frames from pipecat, decode µ-law → PCM16 24k, enqueue."""
        assert self._ws is not None and self._inbound_queue is not None
        buffered_mulaw = bytearray()
        BATCH_MS = 100
        try:
            async for raw in self._ws:
                if isinstance(raw, bytes):
                    # pipecat sometimes emits binary frames for audio; treat
                    # as raw µ-law payload if we see one.
                    buffered_mulaw.extend(raw)
                    if len(buffered_mulaw) >= (BATCH_MS * 8):
                        pcm = mulaw8k_to_pcm16_24k(bytes(buffered_mulaw))
                        buffered_mulaw.clear()
                        await self._inbound_queue.put(AudioChunk(data=pcm))
                    continue
                frame = parse_media_stream_frame(raw)
                if frame is None:
                    continue
                if frame.event == "media" and frame.payload_mulaw:
                    buffered_mulaw.extend(frame.payload_mulaw)
                    if len(buffered_mulaw) >= (BATCH_MS * 8):
                        pcm = mulaw8k_to_pcm16_24k(bytes(buffered_mulaw))
                        buffered_mulaw.clear()
                        await self._inbound_queue.put(AudioChunk(data=pcm))
                elif frame.event == "stop":
                    if buffered_mulaw:
                        pcm = mulaw8k_to_pcm16_24k(bytes(buffered_mulaw))
                        buffered_mulaw.clear()
                        await self._inbound_queue.put(AudioChunk(data=pcm))
                    return
        except asyncio.CancelledError:
            raise
        except Exception:
            logger.warning("PipecatAgentAdapter: recv loop exited with error", exc_info=True)

    # ------------------------------------------------------------------ assertions

    def _assert_connected(self) -> None:
        if self._ws is None:
            raise RuntimeError(
                "PipecatAgentAdapter: not connected. Did you forget to call connect()?"
            )
Ancestors
- VoiceAgentAdapter
- AgentAdapter
- abc.ABC
Class variables
var capabilities : ClassVar[AdapterCapabilities]
Instance variables
var transport_format : str-
@property
def transport_format(self) -> str:
    return f"{self.audio_format}/{self.sample_rate}"
Methods
async def interrupt(self) ‑> None-
Send a Twilio clear frame; the bot drops all buffered outbound audio immediately. Cooperating Pipecat bots (and any code wired to the Media Streams protocol) treat clear as "stop talking now."
Use this in preference to timing-based barge-in when the SUT supports it: it's deterministic, doesn't depend on VAD detection windows, and matches the same protocol used in production.
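The library's own build_clear_frame and build_mark_frame helpers are not shown on this page. As a rough guide to what goes on the wire, the sketch below builds the clear message, and the mark message that send_audio uses for "utterance_end", following the Twilio Media Streams message format. The sketch_ function names are hypothetical stand-ins.

```python
import json


def sketch_clear_frame(stream_sid: str) -> str:
    """Ask the receiver to drop any audio it has buffered for playback."""
    return json.dumps({"event": "clear", "streamSid": stream_sid})


def sketch_mark_frame(stream_sid: str, name: str) -> str:
    """Label a point in the stream, e.g. the end of an utterance."""
    return json.dumps(
        {"event": "mark", "streamSid": stream_sid, "mark": {"name": name}}
    )
```

Both messages are plain JSON text frames keyed by streamSid, so an interrupt is a single small send on the same WebSocket that carries the media frames.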
async def interrupt(self) -> None:
    """Send a Twilio ``clear`` frame — the bot drops all buffered outbound
    audio immediately. Cooperating Pipecat bots (and any code wired to
    the Media Streams protocol) treat ``clear`` as "stop talking now."

    Use this in preference to timing-based barge-in when the SUT
    supports it: it's deterministic, doesn't depend on VAD detection
    windows, and matches the same protocol used in production.
    """
    self._assert_connected()
    assert self._ws is not None and self.stream_sid is not None
    await self._ws.send(build_clear_frame(self.stream_sid))
    logger.debug("PipecatAgentAdapter: sent clear frame (interrupt)")
Inherited members
class TwilioAgentAdapter (*, account_sid: str, auth_token: str, phone_number: str, public_base_url: Optional[str] = None, allowed_callers: Optional[list[str]] = None, on_dtmf: Optional[Callable[[str], None]] = None, http_port: int = 8765, role: AgentRole = AgentRole.AGENT, validate_signature: bool = True)-
Bidirectional Twilio Media Streams adapter.
Same class, same state, either direction:
adapter = TwilioAgentAdapter(
    account_sid=...,
    auth_token=...,
    phone_number="+14155551234",
)
async with adapter:  # connect() / disconnect()
    await adapter.place_call(to="+14155557777")  # OR wait_for_call()
    # ... scenario.run(...) feeds send_audio / recv_audio ...
This is the only adapter with dtmf=True. DTMF events received from the callee surface via the on_dtmf callback set at construction time. To send DTMF, use send_dtmf().
interrupt(after_words=N) raises UnsupportedCapabilityError on this adapter, because Media Streams delivers raw audio without incremental transcripts. Use interrupt(after=seconds) instead.
class TwilioAgentAdapter(VoiceAgentAdapter): """ Bidirectional Twilio Media Streams adapter. Same class, same state, either direction: adapter = TwilioAgentAdapter( account_sid=..., auth_token=..., phone_number="+14155551234", ) async with adapter: # connect() / disconnect() await adapter.place_call(to="+14155557777") # OR wait_for_call() # ... scenario.run(...) feeds send_audio / recv_audio ... The adapter is the *only* adapter with ``dtmf=True``. DTMF events received from the callee surface via the ``on_dtmf`` callback set at construction time. To send DTMF, use ``send_dtmf()``. ``interrupt(after_words=N)`` raises ``UnsupportedCapabilityError`` on this adapter — Media Streams delivers raw audio without incremental transcripts. Use ``interrupt(after=seconds)`` instead. """ capabilities: ClassVar[AdapterCapabilities] = AdapterCapabilities( streaming_transcripts=False, native_vad=False, dtmf=True, # Twilio Media Streams ``clear`` event drops all buffered outbound # audio. Used by ``adapter.interrupt()`` already wired below. interruption=True, input_formats=["mulaw/8000"], output_formats=["mulaw/8000"], ) # ------------------------------------------------------------------ init def __init__( self, *, account_sid: str, auth_token: str, phone_number: str, public_base_url: Optional[str] = None, allowed_callers: Optional[list[str]] = None, on_dtmf: Optional[Callable[[str], None]] = None, http_port: int = 8765, role: AgentRole = AgentRole.AGENT, validate_signature: bool = True, ) -> None: super().__init__() validate_e164(phone_number) self.account_sid = account_sid self.auth_token = auth_token self.phone_number = phone_number self.public_base_url = public_base_url self.allowed_callers = set(allowed_callers) if allowed_callers else None self.on_dtmf = on_dtmf self.http_port = http_port self.role = role # type: ignore[misc] # When True (default), the inbound /twilio/voice route requires a # valid X-Twilio-Signature header before accepting the webhook # body. 
The cloudflared tunnel URL is ephemeral and not # guessable, but anyone who learns it could otherwise POST fake # Twilio webhook events into the harness. Tests that don't have # real Twilio credentials disable this with # ``validate_signature=False``; production callers should leave # it on. self.validate_signature = validate_signature if not validate_signature: logger.warning( "TwilioAgentAdapter: validate_signature=False — inbound " "webhooks accept any payload without signature checks. " "Use only in tests; do not deploy to production." ) # Populated during connect(); None when disconnected. self._rest: Optional[TwilioRESTHelper] = None self._phone_number_sid: Optional[str] = None self._prior_voice_url: Optional[str] = None # Set by place_call() when it rewrites the callee's voice_url so # B-leg's webhook lands on our harness. Restored in disconnect. self._callee_phone_number_sid: Optional[str] = None self._prior_callee_voice_url: Optional[str] = None # Set by the first of wait_for_call()/place_call(); subsequent calls to # the other method raise. "idle" after connect() before either fires. self._mode: TwilioAdapterMode = "idle" # Server / media stream state. self._server_task: Optional[asyncio.Task] = None self._server_shutdown: Optional[asyncio.Event] = None self._call_sid: Optional[str] = None self._stream_sid: Optional[str] = None self._stream_connected: Optional[asyncio.Event] = None self._stream_ws: Any = None # starlette WebSocket self._inbound_queue: Optional[asyncio.Queue[AudioChunk]] = None # ------------------------------------------------------------------ repr def __repr__(self) -> str: # redact credentials return ( f"TwilioAgentAdapter(" f"phone_number={self.phone_number!r}, " f"account_sid='***', auth_token='***', " f"public_base_url={self.public_base_url!r})" ) # ------------------------------------------------------------------ lifecycle async def connect(self) -> None: """Resolve number SID and start the FastAPI webhook + WS server. 
Does NOT modify the Twilio account's ``voice_url``. That side-effect only happens when ``wait_for_call()`` is invoked — callers (who will use ``place_call()``) never overwrite their number's inbound webhook, which makes caller-mode adapters safe to run against a shared pool of Twilio numbers without clobbering anyone's prod webhook. Idempotent: calling connect() on an already-connected adapter is a no-op. This lets the scenario executor's auto-connect step (``_voice_connect_all``) coexist with explicit harness-driven connects (``TwilioHarness`` ``__aenter__``) that already brought the adapter up before scenario.run() was called. """ if self._rest is not None: return if self.public_base_url is None: raise RuntimeError( "TwilioAgentAdapter: public_base_url is required. Wrap the " "adapter in scenario.voice.testing.TwilioHarness, or supply " "a stable public HTTPS URL that routes to this machine." ) self._rest = TwilioRESTHelper(self.account_sid, self.auth_token) self._phone_number_sid = self._rest.resolve_phone_number_sid(self.phone_number) self._stream_connected = asyncio.Event() self._inbound_queue = asyncio.Queue() self._server_shutdown = asyncio.Event() self._mode = "idle" # Webhook server is its own unit — see _twilio_server.py. The # adapter only orchestrates lifecycle; the routes, signature # validation, and WS framing live in TwilioWebhookServer. from ._twilio_server import TwilioWebhookServer self._webhook_server: Optional[TwilioWebhookServer] = TwilioWebhookServer(self) self._server_task = asyncio.create_task(self._run_server()) # Give uvicorn a beat to bind the port before Twilio hits it. await asyncio.sleep(0.2) async def disconnect(self) -> None: """Restore prior voice_url (answer mode only), tear down server. Best-effort on errors. In caller mode we never touched the Twilio number's voice_url, so there's nothing to restore. """ if self._rest is None: return # 1. Restore webhook first so Twilio doesn't keep hitting a dead URL. 
if self._mode == "answer" and self._phone_number_sid is not None: with suppress(Exception): prior = self._prior_voice_url or "" self._rest.write_voice_url(self._phone_number_sid, prior) logger.debug( "TwilioAgentAdapter: restored voice_url=%r on %s", prior, self._phone_number_sid, ) # place_call() rewrites the CALLEE's voice_url to attach Media # Streams to B-leg. Restore that too. if self._mode == "call" and self._callee_phone_number_sid is not None: with suppress(Exception): prior_b = self._prior_callee_voice_url or "" self._rest.write_voice_url(self._callee_phone_number_sid, prior_b) logger.debug( "TwilioAgentAdapter: restored callee voice_url=%r on %s", prior_b, self._callee_phone_number_sid, ) # 2. Signal server to shut down, then wait for the task. if self._server_shutdown is not None: self._server_shutdown.set() if self._server_task is not None: with suppress(Exception): await asyncio.wait_for(self._server_task, timeout=3.0) # 3. Reset state. self._rest = None self._phone_number_sid = None self._prior_voice_url = None self._callee_phone_number_sid = None self._prior_callee_voice_url = None self._mode = "idle" self._server_task = None self._server_shutdown = None self._call_sid = None self._stream_sid = None self._stream_connected = None self._stream_ws = None self._inbound_queue = None # ------------------------------------------------------------------ direction async def place_call( self, to: str, *, timeout: float = 120.0, attach_stream_to_self: bool = True, ) -> None: """ Originate an outbound call from this adapter's Twilio number to ``to``. Twilio's REST ``Calls.create`` runs TwiML on TWO legs of the resulting call: - **A-leg** (the originator, ``from_=self.phone_number``): runs the inline TwiML passed via ``twiml=`` to ``Calls.create``. We use ``<Pause length=120>`` so the originator just holds the bridge open while the demo runs. 
- **B-leg** (the callee, ``to=``): when Twilio dials B and B picks up, B's number's ``voice_url`` fires — that's where the bridge's Media Streams attach. This is identical to the inbound demo's flow: B's voice_url returns ``<Connect><Stream>``, the WS opens, audio flows. So ``place_call`` only makes sense when ``to`` is another Twilio number on this account whose ``voice_url`` is set to OUR harness webhook. To make that wiring automatic, ``place_call`` temporarily rewrites B's ``voice_url`` for the duration of the call and restores it on ``disconnect``. The harness on this adapter's own number does NOT need to be answer-mode — the Stream attaches to B's leg, NOT this adapter's leg, but B's webhook is hosted on this adapter's local server, so the WS still lands here. The bidirectional audio model is unchanged: ``send_audio`` writes frames over the WS (B hears them and bridges to A), ``recv_audio`` reads inbound frames off the WS (whatever the bridge mixes from both legs). Limitation: ``to`` MUST be a phone number on this same Twilio account. Calling an external PSTN endpoint (a real cell phone) requires a different topology (``<Start><Stream>`` + ``<Dial>``) which we don't implement here because the inline TwiML route on the A-leg can't capture B's audio when B is external. Default timeout 120s covers cloudflared cold-start latency. Raises: RuntimeError: If called after ``wait_for_call()`` (modes are exclusive per adapter instance), or if ``to`` is not a Twilio number on this account. ValueError: If ``to`` is not in E.164 format. asyncio.TimeoutError: If the media stream doesn't open within ``timeout`` seconds. """ self._assert_connected() self._enter_mode("call") validate_e164(to) assert self.public_base_url is not None assert self._rest is not None assert self._stream_connected is not None if attach_stream_to_self: # Resolve B-leg's number SID and snapshot+rewrite its voice_url so # B's leg attaches its Media Stream to our harness webhook. 
We own # this number (same Twilio account); disconnect() will restore. self._callee_phone_number_sid = self._rest.resolve_phone_number_sid(to) self._prior_callee_voice_url = self._rest.read_voice_url( self._callee_phone_number_sid ) webhook_url = self.public_base_url.rstrip("/") + "/twilio/voice" self._rest.write_voice_url(self._callee_phone_number_sid, webhook_url) logger.info( "TwilioAgentAdapter: rewrote callee %s voice_url to %s", _redact_e164(to), webhook_url, ) # A-leg TwiML: play a short deterministic <Say> line, then hold # the bridge open. Twilio runs this on the originator side while # B's webhook attaches the Media Stream. # # The <Say> gives the recording a known-good utterance to # transcribe. A bare <Pause> alone produces 120s of line silence # that Whisper has been observed to hallucinate as non-English # text (issue #465 in this PR). The Say is a one-time anchor at # call setup; the Media Stream carries the real bidirectional # conversation that follows. inline_a_leg_twiml = ( '<?xml version="1.0" encoding="UTF-8"?>' "<Response>" f'<Say voice="Polly.Joanna">{PLACE_CALL_A_LEG_SAY_TEXT}</Say>' '<Pause length="120"/>' "</Response>" ) self._call_sid = self._rest.place_call( to=to, from_=self.phone_number, twiml=inline_a_leg_twiml ) logger.info( "TwilioAgentAdapter: placed call %s from %s to %s", self._call_sid, _redact_e164(self.phone_number), _redact_e164(to), ) if attach_stream_to_self: # Wait for OUR webhook to fire — only meaningful when we rewrote # the callee's voice_url to point at us. In originator-only mode # (attach_stream_to_self=False), there's no stream coming to us; # the callee has its own harness which owns the stream. await asyncio.wait_for(self._stream_connected.wait(), timeout=timeout) async def wait_for_call(self, timeout: float = 120.0) -> None: """ Block until someone dials in and the media stream is live. 
In **default mode** (no conference_room): the number's ``voice_url`` is overwritten to point at our webhook so inbound calls reach us. Caller (``place_call``) elsewhere will dial this number. In **conference mode**: there's no inbound call to wait for — the two-Twilio-number demo can't naturally have one side receive an inbound call when both legs need conference TwiML. Instead, we ORIGINATE an outbound call FROM this adapter's number TO itself with inline TwiML that opens the capture stream and dials into the shared conference room. This is the symmetric counterpart to ``place_call`` in conference mode — both adapters end up as participants in the same room, exchanging audio via the bridge. Default timeout 120s covers cloudflared cold-start, conference-room formation, and Twilio's webhook ramp. Raises: RuntimeError: If called after ``place_call()``. asyncio.TimeoutError: If nobody dials in within ``timeout``. """ self._assert_connected() self._enter_mode("answer") assert self.public_base_url is not None assert self._rest is not None assert self._phone_number_sid is not None assert self._stream_connected is not None # Snapshot the prior webhook so we can restore it on disconnect, then # point the number at our server. Only answer mode does this. self._prior_voice_url = self._rest.read_voice_url(self._phone_number_sid) webhook_url = self.public_base_url.rstrip("/") + "/twilio/voice" self._rest.write_voice_url(self._phone_number_sid, webhook_url) logger.info("TwilioAgentAdapter: webhook set to %s", webhook_url) await asyncio.wait_for(self._stream_connected.wait(), timeout=timeout) def _enter_mode(self, mode: TwilioAdapterMode) -> None: """Transition idle → mode, or raise if already in a different mode. Modes are exclusive per connected session: an adapter can place a call or answer a call, not both. Disconnect + reconnect to reuse the instance in the other direction. """ if self._mode == mode: return # idempotent re-entry (e.g. 
retrying place_call after timeout) if self._mode != "idle": raise RuntimeError( f"TwilioAgentAdapter: already in {self._mode!r} mode; cannot " f"switch to {mode!r}. Disconnect and reconnect to reuse this " f"adapter in the other direction." ) self._mode = mode # ------------------------------------------------------------------ I/O async def send_audio(self, chunk: AudioChunk) -> None: # Pace at real-time (one frame per TWILIO_FRAME_MS). Without pacing the # whole utterance arrives in milliseconds, which trips bots' VAD into # a clipped-utterance reading. self._assert_stream_live() ws = self._stream_ws stream_sid = self._stream_sid assert ws is not None and stream_sid is not None mulaw = pcm16_24k_to_mulaw8k(chunk.data) frame_secs = TWILIO_FRAME_MS / 1000 for frame in iter_mulaw_frames(mulaw): if not frame: continue await ws.send_text(build_media_frame(stream_sid, frame)) await asyncio.sleep(frame_secs) async def recv_audio(self, timeout: float) -> AudioChunk: self._assert_stream_live() assert self._inbound_queue is not None return await asyncio.wait_for(self._inbound_queue.get(), timeout=timeout) async def send_dtmf(self, tones: str) -> None: """Send DTMF digits on the live call (uses Twilio REST ``<Play digits>``).""" if self._rest is None or self._call_sid is None: raise RuntimeError("TwilioAgentAdapter: no active call; send_dtmf requires an in-progress call") # Run blocking REST call off-thread so we don't stall the event loop. 
await asyncio.to_thread(self._rest.send_dtmf_on_call, self._call_sid, tones) async def interrupt(self) -> None: """Drop any buffered outbound audio on Twilio's side (``clear`` event).""" self._assert_stream_live() ws = self._stream_ws stream_sid = self._stream_sid assert ws is not None and stream_sid is not None await ws.send_text(build_clear_frame(stream_sid)) # ------------------------------------------------------------------ server async def _run_server(self) -> None: """Thin lifecycle wrapper; the real work lives in TwilioWebhookServer.""" assert self._webhook_server is not None await self._webhook_server.run() def _build_app(self) -> Any: """Test seam: build the FastAPI app for in-process exercise. Production code never calls this — the server's ``run()`` builds the app itself when uvicorn starts. Existing unit tests use ``TestClient(_build_app())`` to exercise the routes without binding a port; the delegation keeps that test surface stable. """ assert self._webhook_server is not None return self._webhook_server.build_app() async def _media_stream_loop(self, ws: Any) -> None: """Test seam: kick off the Media Streams WS loop directly. The two-adapter-bridge test in ``tests/voice/test_twilio_two_adapter_bridge.py`` uses this to drive a loopback WS without going through the FastAPI route wrapper. Production code reaches the loop via the ``/twilio/stream`` WebSocket handler defined in ``_twilio_server.build_app``. """ assert self._webhook_server is not None await self._webhook_server.media_stream_loop(ws) # ------------------------------------------------------------------ assertions def _assert_connected(self) -> None: if self._rest is None: raise RuntimeError("TwilioAgentAdapter: not connected; call connect() or use `async with`.") def _assert_stream_live(self) -> None: self._assert_connected() if self._stream_ws is None or self._stream_sid is None: raise RuntimeError( "TwilioAgentAdapter: no live media stream. Call place_call() or " "wait_for_call() first." 
)
Ancestors
- VoiceAgentAdapter
- AgentAdapter
- abc.ABC
Class variables
var capabilities : ClassVar[AdapterCapabilities]
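The capabilities in the class source declare mulaw/8000 in both directions, and send_audio paces outbound frames in real time so the bot's VAD does not see a whole utterance arrive at once. The arithmetic behind that pacing can be sketched as follows. The 20 ms frame length is an assumption standing in for TWILIO_FRAME_MS, whose value is not shown here, and iter_frames and send_paced are hypothetical names.

```python
import asyncio
from typing import Awaitable, Callable, Iterator

SAMPLE_RATE_HZ = 8000  # from the "mulaw/8000" format string
FRAME_MS = 20          # assumption: stand-in for TWILIO_FRAME_MS
# mu-law uses one byte per sample, so bytes per frame equals samples per frame.
FRAME_BYTES = SAMPLE_RATE_HZ * FRAME_MS // 1000


def iter_frames(mulaw: bytes, size: int = FRAME_BYTES) -> Iterator[bytes]:
    """Yield consecutive fixed-size slices; the last one may be shorter."""
    for start in range(0, len(mulaw), size):
        yield mulaw[start:start + size]


async def send_paced(mulaw: bytes, send: Callable[[bytes], Awaitable[None]]) -> int:
    """Send one frame, then sleep one frame duration, and repeat."""
    sent = 0
    for frame in iter_frames(mulaw):
        await send(frame)
        sent += 1
        await asyncio.sleep(FRAME_MS / 1000)
    return sent
```

Under these assumptions one second of audio is 8000 bytes, split into 50 frames of 160 bytes, and takes roughly one second to send.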
Methods
async def connect(self) ‑> None-
Resolve number SID and start the FastAPI webhook + WS server.
Does NOT modify the Twilio account's voice_url. That side-effect only happens when wait_for_call() is invoked. Callers (who will use place_call()) never overwrite their number's inbound webhook, which makes caller-mode adapters safe to run against a shared pool of Twilio numbers without clobbering anyone's prod webhook.
Idempotent: calling connect() on an already-connected adapter is a no-op. This lets the scenario executor's auto-connect step (_voice_connect_all) coexist with explicit harness-driven connects (TwilioHarness __aenter__) that already brought the adapter up before scenario.run() was called.
async def connect(self) -> None:
    """Resolve number SID and start the FastAPI webhook + WS server.

    Does NOT modify the Twilio account's ``voice_url``. That side-effect
    only happens when ``wait_for_call()`` is invoked — callers (who will
    use ``place_call()``) never overwrite their number's inbound webhook,
    which makes caller-mode adapters safe to run against a shared pool of
    Twilio numbers without clobbering anyone's prod webhook.

    Idempotent: calling connect() on an already-connected adapter is a
    no-op. This lets the scenario executor's auto-connect step
    (``_voice_connect_all``) coexist with explicit harness-driven connects
    (``TwilioHarness`` ``__aenter__``) that already brought the adapter up
    before scenario.run() was called.
    """
    if self._rest is not None:
        return
    if self.public_base_url is None:
        raise RuntimeError(
            "TwilioAgentAdapter: public_base_url is required. Wrap the "
            "adapter in scenario.voice.testing.TwilioHarness, or supply "
            "a stable public HTTPS URL that routes to this machine."
        )
    self._rest = TwilioRESTHelper(self.account_sid, self.auth_token)
    self._phone_number_sid = self._rest.resolve_phone_number_sid(self.phone_number)
    self._stream_connected = asyncio.Event()
    self._inbound_queue = asyncio.Queue()
    self._server_shutdown = asyncio.Event()
    self._mode = "idle"
    # Webhook server is its own unit — see _twilio_server.py. The
    # adapter only orchestrates lifecycle; the routes, signature
    # validation, and WS framing live in TwilioWebhookServer.
    from ._twilio_server import TwilioWebhookServer

    self._webhook_server: Optional[TwilioWebhookServer] = TwilioWebhookServer(self)
    self._server_task = asyncio.create_task(self._run_server())
    # Give uvicorn a beat to bind the port before Twilio hits it.
    await asyncio.sleep(0.2)

async def disconnect(self) ‑> None-
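Restore the prior voice_url and tear down the server.
Best-effort on errors. In answer mode, the adapter's own number gets its prior voice_url back. In caller mode, the adapter's own number was never touched, so only the callee's voice_url (rewritten by place_call()) is restored.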
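The snapshot-then-restore pattern described here can be sketched in isolation. FakeRest is a hypothetical in-memory stand-in for the REST helper; only the method names read_voice_url and write_voice_url come from the source, and the URLs are placeholders.

```python
from contextlib import suppress
from typing import Dict, Optional


class FakeRest:
    """Hypothetical in-memory stand-in for the Twilio REST helper."""

    def __init__(self, urls: Dict[str, str]) -> None:
        self.urls = urls

    def read_voice_url(self, sid: str) -> Optional[str]:
        return self.urls.get(sid)

    def write_voice_url(self, sid: str, url: str) -> None:
        self.urls[sid] = url


def point_at_harness(rest: FakeRest, sid: str, webhook_url: str) -> Optional[str]:
    """Snapshot the current voice_url, then overwrite it. Returns the snapshot."""
    prior = rest.read_voice_url(sid)
    rest.write_voice_url(sid, webhook_url)
    return prior


def restore(rest: FakeRest, sid: str, prior: Optional[str]) -> None:
    """Best-effort restore: a missing snapshot becomes an empty URL."""
    with suppress(Exception):
        rest.write_voice_url(sid, prior or "")


rest = FakeRest({"PN1": "https://prod.example/voice"})
prior = point_at_harness(rest, "PN1", "https://tunnel.example/twilio/voice")
restore(rest, "PN1", prior)
print(rest.urls["PN1"])
```

Suppressing exceptions during the restore keeps teardown from aborting halfway, at the cost of a silently stale webhook if the write fails, so a production variant may want to log the failure.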
async def disconnect(self) -> None:
    """Restore prior voice_url (answer mode only), tear down server.

    Best-effort on errors. In caller mode we never touched the Twilio
    number's voice_url, so there's nothing to restore.
    """
    if self._rest is None:
        return
    # 1. Restore webhook first so Twilio doesn't keep hitting a dead URL.
    if self._mode == "answer" and self._phone_number_sid is not None:
        with suppress(Exception):
            prior = self._prior_voice_url or ""
            self._rest.write_voice_url(self._phone_number_sid, prior)
            logger.debug(
                "TwilioAgentAdapter: restored voice_url=%r on %s",
                prior,
                self._phone_number_sid,
            )
    # place_call() rewrites the CALLEE's voice_url to attach Media
    # Streams to B-leg. Restore that too.
    if self._mode == "call" and self._callee_phone_number_sid is not None:
        with suppress(Exception):
            prior_b = self._prior_callee_voice_url or ""
            self._rest.write_voice_url(self._callee_phone_number_sid, prior_b)
            logger.debug(
                "TwilioAgentAdapter: restored callee voice_url=%r on %s",
                prior_b,
                self._callee_phone_number_sid,
            )
    # 2. Signal server to shut down, then wait for the task.
    if self._server_shutdown is not None:
        self._server_shutdown.set()
    if self._server_task is not None:
        with suppress(Exception):
            await asyncio.wait_for(self._server_task, timeout=3.0)
    # 3. Reset state.
    self._rest = None
    self._phone_number_sid = None
    self._prior_voice_url = None
    self._callee_phone_number_sid = None
    self._prior_callee_voice_url = None
    self._mode = "idle"
    self._server_task = None
    self._server_shutdown = None
    self._call_sid = None
    self._stream_sid = None
    self._stream_connected = None
    self._stream_ws = None
    self._inbound_queue = None

async def interrupt(self) ‑> None-
Drop any buffered outbound audio on Twilio's side (clear event).
async def interrupt(self) -> None:
    """Drop any buffered outbound audio on Twilio's side (``clear`` event)."""
    self._assert_stream_live()
    ws = self._stream_ws
    stream_sid = self._stream_sid
    assert ws is not None and stream_sid is not None
    await ws.send_text(build_clear_frame(stream_sid))

async def place_call(self, to: str, *, timeout: float = 120.0, attach_stream_to_self: bool = True) ‑> None-
Originate an outbound call from this adapter's Twilio number to to.
Twilio's REST Calls.create runs TwiML on TWO legs of the resulting call:
- A-leg (the originator, from_=self.phone_number): runs the inline TwiML passed via twiml= to Calls.create. The inline TwiML plays a short <Say> line and then a <Pause length="120"/>, so the originator just holds the bridge open while the demo runs.
- B-leg (the callee, to=): when Twilio dials B and B picks up, B's number's voice_url fires, and that is where the bridge's Media Streams attach. This is identical to the inbound demo's flow: B's voice_url returns <Connect><Stream>, the WS opens, audio flows.
So place_call only makes sense when to is another Twilio number on this account whose voice_url is set to OUR harness webhook. To make that wiring automatic, place_call temporarily rewrites B's voice_url for the duration of the call and restores it on disconnect. The harness on this adapter's own number does NOT need to be answer-mode: the Stream attaches to B's leg, NOT this adapter's leg, but B's webhook is hosted on this adapter's local server, so the WS still lands here.
With attach_stream_to_self=False, B's voice_url is left untouched and the method returns without waiting for a stream, on the assumption that the callee runs its own harness and owns the stream.
The bidirectional audio model is unchanged: send_audio writes frames over the WS (B hears them and bridges to A), recv_audio reads inbound frames off the WS (whatever the bridge mixes from both legs).
Limitation: to MUST be a phone number on this same Twilio account. Calling an external PSTN endpoint (a real cell phone) requires a different topology (<Start><Stream> + <Dial>) which we don't implement here because the inline TwiML route on the A-leg can't capture B's audio when B is external.
Default timeout 120s covers cloudflared cold-start latency.
Raises
RuntimeError: If called after wait_for_call() (modes are exclusive per adapter instance), or if to is not a Twilio number on this account.
ValueError: If to is not in E.164 format.
asyncio.TimeoutError: If the media stream doesn't open within timeout seconds.
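The A-leg TwiML described above is a small XML document: a Say element followed by a Pause. The sketch below builds it and parses it back to check that it is well-formed. build_a_leg_twiml is a hypothetical helper, the spoken text is a placeholder, and the XML escaping is an addition for safety with arbitrary text; the source interpolates a module constant directly.

```python
from xml.etree import ElementTree
from xml.sax.saxutils import escape


def build_a_leg_twiml(say_text: str, hold_seconds: int = 120) -> str:
    """Build inline TwiML: speak one line, then hold the leg open."""
    return (
        '<?xml version="1.0" encoding="UTF-8"?>'
        "<Response>"
        # escape() guards against &, < and > in the spoken text.
        f'<Say voice="Polly.Joanna">{escape(say_text)}</Say>'
        f'<Pause length="{hold_seconds}"/>'
        "</Response>"
    )


twiml = build_a_leg_twiml("Connecting you to the test agent & bridge.")
# Parse the bytes to confirm the document is well-formed XML.
root = ElementTree.fromstring(twiml.encode("utf-8"))
print([child.tag for child in root])
```

Parsing the generated string in a unit test is a cheap way to catch malformed TwiML before Twilio rejects it at call time.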
async def place_call(
    self,
    to: str,
    *,
    timeout: float = 120.0,
    attach_stream_to_self: bool = True,
) -> None:
    """
    Originate an outbound call from this adapter's Twilio number to ``to``.

    Twilio's REST ``Calls.create`` runs TwiML on TWO legs of the resulting
    call:

    - **A-leg** (the originator, ``from_=self.phone_number``): runs the
      inline TwiML passed via ``twiml=`` to ``Calls.create``. We use
      ``<Pause length=120>`` so the originator just holds the bridge open
      while the demo runs.
    - **B-leg** (the callee, ``to=``): when Twilio dials B and B picks up,
      B's number's ``voice_url`` fires — that's where the bridge's Media
      Streams attach. This is identical to the inbound demo's flow: B's
      voice_url returns ``<Connect><Stream>``, the WS opens, audio flows.

    So ``place_call`` only makes sense when ``to`` is another Twilio number
    on this account whose ``voice_url`` is set to OUR harness webhook. To
    make that wiring automatic, ``place_call`` temporarily rewrites B's
    ``voice_url`` for the duration of the call and restores it on
    ``disconnect``. The harness on this adapter's own number does NOT need
    to be answer-mode — the Stream attaches to B's leg, NOT this adapter's
    leg, but B's webhook is hosted on this adapter's local server, so the
    WS still lands here.

    The bidirectional audio model is unchanged: ``send_audio`` writes
    frames over the WS (B hears them and bridges to A), ``recv_audio``
    reads inbound frames off the WS (whatever the bridge mixes from both
    legs).

    Limitation: ``to`` MUST be a phone number on this same Twilio account.
    Calling an external PSTN endpoint (a real cell phone) requires a
    different topology (``<Start><Stream>`` + ``<Dial>``) which we don't
    implement here because the inline TwiML route on the A-leg can't
    capture B's audio when B is external.

    Default timeout 120s covers cloudflared cold-start latency.

    Raises:
        RuntimeError: If called after ``wait_for_call()`` (modes are
            exclusive per adapter instance), or if ``to`` is not a Twilio
            number on this account.
        ValueError: If ``to`` is not in E.164 format.
        asyncio.TimeoutError: If the media stream doesn't open within
            ``timeout`` seconds.
    """
    self._assert_connected()
    self._enter_mode("call")
    validate_e164(to)
    assert self.public_base_url is not None
    assert self._rest is not None
    assert self._stream_connected is not None
    if attach_stream_to_self:
        # Resolve B-leg's number SID and snapshot+rewrite its voice_url so
        # B's leg attaches its Media Stream to our harness webhook. We own
        # this number (same Twilio account); disconnect() will restore.
        self._callee_phone_number_sid = self._rest.resolve_phone_number_sid(to)
        self._prior_callee_voice_url = self._rest.read_voice_url(
            self._callee_phone_number_sid
        )
        webhook_url = self.public_base_url.rstrip("/") + "/twilio/voice"
        self._rest.write_voice_url(self._callee_phone_number_sid, webhook_url)
        logger.info(
            "TwilioAgentAdapter: rewrote callee %s voice_url to %s",
            _redact_e164(to),
            webhook_url,
        )
    # A-leg TwiML: play a short deterministic <Say> line, then hold
    # the bridge open. Twilio runs this on the originator side while
    # B's webhook attaches the Media Stream.
    #
    # The <Say> gives the recording a known-good utterance to
    # transcribe. A bare <Pause> alone produces 120s of line silence
    # that Whisper has been observed to hallucinate as non-English
    # text (issue #465 in this PR). The Say is a one-time anchor at
    # call setup; the Media Stream carries the real bidirectional
    # conversation that follows.
    inline_a_leg_twiml = (
        '<?xml version="1.0" encoding="UTF-8"?>'
        "<Response>"
        f'<Say voice="Polly.Joanna">{PLACE_CALL_A_LEG_SAY_TEXT}</Say>'
        '<Pause length="120"/>'
        "</Response>"
    )
    self._call_sid = self._rest.place_call(
        to=to, from_=self.phone_number, twiml=inline_a_leg_twiml
    )
    logger.info(
        "TwilioAgentAdapter: placed call %s from %s to %s",
        self._call_sid,
        _redact_e164(self.phone_number),
        _redact_e164(to),
    )
    if attach_stream_to_self:
        # Wait for OUR webhook to fire — only meaningful when we rewrote
        # the callee's voice_url to point at us. In originator-only mode
        # (attach_stream_to_self=False), there's no stream coming to us;
        # the callee has its own harness which owns the stream.
        await asyncio.wait_for(self._stream_connected.wait(), timeout=timeout)
async def send_dtmf(self, tones: str) ‑> None-
Send DTMF digits on the live call (uses Twilio REST <Play digits>).
async def send_dtmf(self, tones: str) -> None:
    """Send DTMF digits on the live call (uses Twilio REST ``<Play digits>``)."""
    if self._rest is None or self._call_sid is None:
        raise RuntimeError("TwilioAgentAdapter: no active call; send_dtmf requires an in-progress call")
    # Run blocking REST call off-thread so we don't stall the event loop.
    await asyncio.to_thread(self._rest.send_dtmf_on_call, self._call_sid, tones)

async def wait_for_call(self, timeout: float = 120.0) ‑> None-
Block until someone dials in and the media stream is live.
In default mode (no conference_room): the number's voice_url is overwritten to point at our webhook so inbound calls reach us. A caller (place_call) elsewhere will dial this number.
In conference mode: there's no inbound call to wait for, because the two-Twilio-number demo can't naturally have one side receive an inbound call when both legs need conference TwiML. Instead, we ORIGINATE an outbound call FROM this adapter's number TO itself with inline TwiML that opens the capture stream and dials into the shared conference room. This is the symmetric counterpart to place_call in conference mode: both adapters end up as participants in the same room, exchanging audio via the bridge.
Default timeout 120s covers cloudflared cold-start, conference-room formation, and Twilio's webhook ramp.
Raises
RuntimeError: If called after place_call().
asyncio.TimeoutError: If nobody dials in within timeout.
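The blocking behaviour and the asyncio.TimeoutError above come from waiting on an asyncio.Event under asyncio.wait_for. A minimal sketch of that pattern follows. wait_for_stream and demo are hypothetical names, and unlike the adapter, which lets the timeout propagate, this sketch converts it to a boolean so both outcomes can be shown.

```python
import asyncio
from typing import Tuple


async def wait_for_stream(connected: asyncio.Event, timeout: float) -> bool:
    """Return True once the event is set, False if the timeout expires first."""
    try:
        await asyncio.wait_for(connected.wait(), timeout=timeout)
    except asyncio.TimeoutError:
        return False
    return True


async def demo() -> Tuple[bool, bool]:
    connected = asyncio.Event()
    # Nobody sets the event, so the first wait times out.
    missed = await wait_for_stream(connected, timeout=0.05)
    # Simulate the webhook handler marking the stream live shortly after.
    asyncio.get_running_loop().call_later(0.01, connected.set)
    answered = await wait_for_stream(connected, timeout=1.0)
    return missed, answered


print(asyncio.run(demo()))  # (False, True)
```

Using an event rather than polling means the waiter wakes as soon as the stream handler signals, and the timeout bounds how long a test can hang when no call arrives.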
async def wait_for_call(self, timeout: float = 120.0) -> None:
    """
    Block until someone dials in and the media stream is live.

    In **default mode** (no conference_room): the number's ``voice_url``
    is overwritten to point at our webhook so inbound calls reach us.
    Caller (``place_call``) elsewhere will dial this number.

    In **conference mode**: there's no inbound call to wait for — the
    two-Twilio-number demo can't naturally have one side receive an
    inbound call when both legs need conference TwiML. Instead, we
    ORIGINATE an outbound call FROM this adapter's number TO itself with
    inline TwiML that opens the capture stream and dials into the shared
    conference room. This is the symmetric counterpart to ``place_call``
    in conference mode — both adapters end up as participants in the same
    room, exchanging audio via the bridge.

    Default timeout 120s covers cloudflared cold-start, conference-room
    formation, and Twilio's webhook ramp.

    Raises:
        RuntimeError: If called after ``place_call()``.
        asyncio.TimeoutError: If nobody dials in within ``timeout``.
    """
    self._assert_connected()
    self._enter_mode("answer")
    assert self.public_base_url is not None
    assert self._rest is not None
    assert self._phone_number_sid is not None
    assert self._stream_connected is not None
    # Snapshot the prior webhook so we can restore it on disconnect, then
    # point the number at our server. Only answer mode does this.
    self._prior_voice_url = self._rest.read_voice_url(self._phone_number_sid)
    webhook_url = self.public_base_url.rstrip("/") + "/twilio/voice"
    self._rest.write_voice_url(self._phone_number_sid, webhook_url)
    logger.info("TwilioAgentAdapter: webhook set to %s", webhook_url)
    await asyncio.wait_for(self._stream_connected.wait(), timeout=timeout)
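Both wait_for_call and place_call start by claiming a mode through _enter_mode, which is what makes the two directions mutually exclusive within one connected session. The rule can be sketched on its own as a tiny state holder. ModeGuard is a hypothetical name used only for this illustration.

```python
from typing import Literal

Mode = Literal["idle", "call", "answer"]


class ModeGuard:
    """Tracks which direction a session has committed to."""

    def __init__(self) -> None:
        self.mode: Mode = "idle"

    def enter(self, mode: Mode) -> None:
        # Re-entering the current mode is allowed, e.g. a retry after timeout.
        if self.mode == mode:
            return
        # Any other transition must start from idle.
        if self.mode != "idle":
            raise RuntimeError(
                f"already in {self.mode!r} mode; cannot switch to {mode!r}"
            )
        self.mode = mode

    def reset(self) -> None:
        # Mirrors the state reset on disconnect: back to idle.
        self.mode = "idle"


guard = ModeGuard()
guard.enter("answer")
guard.enter("answer")  # idempotent re-entry
try:
    guard.enter("call")
except RuntimeError as exc:
    print(exc)
```

Resetting to idle on disconnect is what lets the same instance be reused in the other direction after a reconnect.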
Inherited members
class VapiAgentAdapter (assistant_id: str, api_key: str)-
Abstract base for voice agents that exchange audio with the agent under test.
Subclasses implement connect, disconnect, send_audio, and recv_audio. The default call implementation threads audio extracted from the last incoming message through the transport and wraps the response back into an assistant message.
Attributes
capabilities- Declaration of what the adapter can and cannot do. Each concrete subclass must set this as a class attribute.
response_timeout-
Seconds to wait for agent audio after sending user audio. Defaults to 60 seconds.
60 seconds covers a typical real-world STT → LLM → TTS round-trip including backoff/retry inside each provider, tool calls, and RAG lookups. If you see TimeoutError flakes against a fast LLM-only chain, you can lower this; if your agent does heavy processing (MCP roundtrips, multi-step tool chains), consider raising it.
Override per-adapter at construction time::
    adapter = MyVoiceAdapter()
    adapter.response_timeout = 90.0  # slow tool-call chain
Expand source code
class VapiAgentAdapter(VoiceAgentAdapter):
    capabilities: ClassVar[AdapterCapabilities] = AdapterCapabilities(
        streaming_transcripts=True,
        native_vad=True,
        dtmf=False,
        input_formats=["pcm16/16000"],
        output_formats=["pcm16/16000"],
    )

    def __init__(self, assistant_id: str, api_key: str):
        super().__init__()
        self.assistant_id = assistant_id
        self.api_key = api_key
        self.websocket_call_url: Optional[str] = None
        self._ws: Optional[object] = None

    async def connect(self) -> None:
        # Integration: POST to Vapi REST API to get websocketCallUrl, then
        # open websocket.
        self.websocket_call_url = f"wss://vapi.ai/ws/{self.assistant_id}"
        self._ws = object()

    async def disconnect(self) -> None:
        self._ws = None

    async def send_audio(self, chunk: AudioChunk) -> None:
        if self._ws is None:
            raise RuntimeError("VapiAgentAdapter: not connected")
        raise PendingTransportError("VapiAgentAdapter")

    async def recv_audio(self, timeout: float) -> AudioChunk:
        if self._ws is None:
            raise RuntimeError("VapiAgentAdapter: not connected")
        raise PendingTransportError("VapiAgentAdapter")

    def __repr__(self) -> str:
        # redact credentials
        return f"VapiAgentAdapter(assistant_id={self.assistant_id!r}, api_key='***')"
Ancestors
- VoiceAgentAdapter
- AgentAdapter
- abc.ABC
Class variables
var capabilities : ClassVar[AdapterCapabilities]
Inherited members
class WebRTCAgentAdapter (signaling_url: str)-
Generic WebRTC adapter that negotiates via an HTTP signaling URL.
Expand source code
class WebRTCAgentAdapter(VoiceAgentAdapter):
    """Generic WebRTC adapter that negotiates via an HTTP signaling URL."""

    capabilities: ClassVar[AdapterCapabilities] = AdapterCapabilities(
        streaming_transcripts=False,
        native_vad=False,
        dtmf=False,
        input_formats=["pcm16/24000"],
        output_formats=["pcm16/24000"],
    )

    def __init__(self, signaling_url: str):
        super().__init__()
        self.signaling_url = signaling_url
        self._pc: Optional[Any] = None
        self._inbound_audio: "asyncio.Queue[AudioChunk]" = asyncio.Queue()

    async def connect(self) -> None:
        # Deferred: actual SDP exchange requires a reachable signaling server.
        # Tested at @integration level with a loopback aiortc peer.
        self._pc = object()  # sentinel: mark "connected"

    async def disconnect(self) -> None:
        self._pc = None

    async def send_audio(self, chunk: AudioChunk) -> None:
        if self._pc is None:
            raise RuntimeError(f"{type(self).__name__}: not connected")
        raise PendingTransportError(type(self).__name__)

    async def recv_audio(self, timeout: float) -> AudioChunk:
        if self._pc is None:
            raise RuntimeError(f"{type(self).__name__}: not connected")
        # If a subclass populated _inbound_audio, return from it.
        if not self._inbound_audio.empty():
            return await asyncio.wait_for(self._inbound_audio.get(), timeout=timeout)
        raise PendingTransportError(type(self).__name__)
Ancestors
- VoiceAgentAdapter
- AgentAdapter
- abc.ABC
Class variables
var capabilities : ClassVar[AdapterCapabilities]
Inherited members
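The ``recv_audio`` source above returns from ``_inbound_audio`` when a subclass has populated it and raises ``PendingTransportError`` otherwise. The sketch below illustrates that subclass hook in isolation. It does not import the library: ``AudioChunk``, ``PendingTransportError``, and ``QueueBackedAdapter`` are local stand-ins that mirror only what the source above shows, and ``LoopbackAdapter`` is a hypothetical subclass invented for the demo.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional


@dataclass
class AudioChunk:
    """Stand-in for the library's AudioChunk; only a ``data`` field is assumed."""
    data: bytes


class PendingTransportError(Exception):
    """Stand-in for the library's PendingTransportError (real base class unknown)."""


class QueueBackedAdapter:
    """Trimmed copy of the connect/recv_audio logic shown above, without the base class."""

    def __init__(self) -> None:
        self._pc: Optional[object] = None
        self._inbound_audio: "asyncio.Queue[AudioChunk]" = asyncio.Queue()

    async def connect(self) -> None:
        self._pc = object()  # sentinel marking "connected"

    async def recv_audio(self, timeout: float) -> AudioChunk:
        if self._pc is None:
            raise RuntimeError(f"{type(self).__name__}: not connected")
        if not self._inbound_audio.empty():
            return await asyncio.wait_for(self._inbound_audio.get(), timeout=timeout)
        raise PendingTransportError(type(self).__name__)


class LoopbackAdapter(QueueBackedAdapter):
    """Hypothetical subclass that feeds the inbound queue itself on connect."""

    async def connect(self) -> None:
        await super().connect()
        self._inbound_audio.put_nowait(AudioChunk(data=b"\x00\x01" * 4))


async def demo() -> tuple:
    adapter = LoopbackAdapter()
    await adapter.connect()
    first = await adapter.recv_audio(timeout=1.0)  # served from the queue
    try:
        await adapter.recv_audio(timeout=1.0)      # queue drained: no transport yet
        drained = False
    except PendingTransportError:
        drained = True
    return first.data, drained
```

Note that an empty queue raises immediately rather than waiting for ``timeout``, so a subclass that fills the queue asynchronously would likely need to override ``recv_audio`` as well.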
class WebSocketAgentAdapter (url: str, protocol: WebSocketProtocol)-
Connects to an arbitrary WebSocket endpoint using a user-supplied protocol.
The protocol's encode_audio is called before sending; decode_response is called on each inbound frame until an AudioChunk is produced.
Expand source code
class WebSocketAgentAdapter(VoiceAgentAdapter):
    """
    Connects to an arbitrary WebSocket endpoint using a user-supplied protocol.

    The protocol's ``encode_audio`` is called before sending; ``decode_response``
    is called on each inbound frame until an AudioChunk is produced.
    """

    capabilities: ClassVar[AdapterCapabilities] = AdapterCapabilities(
        streaming_transcripts=False,
        native_vad=False,
        dtmf=False,
        input_formats=["pcm16/24000"],
        output_formats=["pcm16/24000"],
    )

    def __init__(self, url: str, protocol: WebSocketProtocol):
        super().__init__()
        self.url = url
        self.protocol = protocol
        self._ws: Optional[Any] = None

    async def connect(self) -> None:
        import websockets  # hard dep

        self._ws = await websockets.connect(self.url)

    async def disconnect(self) -> None:
        if self._ws is not None:
            await self._ws.close()
            self._ws = None

    async def send_audio(self, chunk: AudioChunk) -> None:
        if self._ws is None:
            raise RuntimeError(f"{type(self).__name__}: not connected")
        payload = self.protocol.encode_audio(chunk.data)
        await self._ws.send(payload)

    async def recv_audio(self, timeout: float) -> AudioChunk:
        if self._ws is None:
            raise RuntimeError(f"{type(self).__name__}: not connected")
        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout
        while True:
            remaining = max(0.0, deadline - loop.time())
            message = await asyncio.wait_for(self._ws.recv(), timeout=remaining)
            chunk = self.protocol.decode_response(message)
            if chunk is not None:
                return chunk
Ancestors
- VoiceAgentAdapter
- AgentAdapter
- abc.ABC
Class variables
var capabilities : ClassVar[AdapterCapabilities]
Inherited members
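The ``recv_audio`` loop in ``WebSocketAgentAdapter`` treats ``timeout`` as one budget shared by all inbound frames: each non-audio frame consumes part of it rather than resetting it. A minimal sketch of that deadline loop, assuming an ``asyncio.Queue`` as a stand-in for the socket; ``recv_until_decoded``, the ``"audio:"`` frame prefix, and ``demo`` are hypothetical and exist only for illustration.

```python
import asyncio
from typing import Any, Callable, Optional


async def recv_until_decoded(
    recv: Callable[[], Any],                    # returns an awaitable yielding one frame
    decode: Callable[[Any], Optional[bytes]],   # returns None for non-audio frames
    timeout: float,
) -> bytes:
    """Read frames until one decodes, spending at most ``timeout`` seconds in total."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        # Shrinking budget: skipped frames do not extend the deadline.
        remaining = max(0.0, deadline - loop.time())
        message = await asyncio.wait_for(recv(), timeout=remaining)
        decoded = decode(message)
        if decoded is not None:
            return decoded


async def demo() -> tuple:
    frames: "asyncio.Queue[str]" = asyncio.Queue()
    for frame in ("ping", "transcript", "audio:abc"):
        frames.put_nowait(frame)

    def decode(frame: str) -> Optional[bytes]:
        return frame[len("audio:"):].encode() if frame.startswith("audio:") else None

    # Two non-audio frames are skipped before the audio frame is returned.
    got = await recv_until_decoded(frames.get, decode, timeout=0.5)

    # Only a non-audio frame arrives this time, so the shared budget runs out.
    frames.put_nowait("ping")
    try:
        await recv_until_decoded(frames.get, decode, timeout=0.05)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return got, timed_out
```

A per-frame timeout would instead let a server that sends frequent keep-alives hold the caller indefinitely, which is presumably why the adapter computes ``remaining`` on every iteration.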
class WebSocketProtocol-
Encoder/decoder pair for a custom WebSocket audio protocol.
Expand source code
class WebSocketProtocol(ABC):
    """Encoder/decoder pair for a custom WebSocket audio protocol."""

    @abstractmethod
    def encode_audio(self, audio: bytes) -> Any:
        """Convert PCM16 bytes into the wire representation the server expects."""

    @abstractmethod
    def decode_response(self, message: Any) -> Optional[AudioChunk]:
        """Parse a server message into an AudioChunk, or None if it's not audio."""
Ancestors
- abc.ABC
Methods
def decode_response(self, message: Any) ‑> AudioChunk | None-
Parse a server message into an AudioChunk, or None if it's not audio.
Expand source code
@abstractmethod
def decode_response(self, message: Any) -> Optional[AudioChunk]:
    """Parse a server message into an AudioChunk, or None if it's not audio."""
def encode_audio(self, audio: bytes) ‑> Any-
Convert PCM16 bytes into the wire representation the server expects.
Expand source code
@abstractmethod
def encode_audio(self, audio: bytes) -> Any:
    """Convert PCM16 bytes into the wire representation the server expects."""
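A concrete protocol only has to supply the two methods above. The following sketch shows one possible implementation for a made-up wire format, JSON text frames carrying base64-encoded PCM16. Everything here is an assumption for illustration: the frame shape ``{"type": "audio", "data": ...}`` is invented, ``JsonBase64Protocol`` is a hypothetical name, and ``AudioChunk`` and ``WebSocketProtocol`` are local stand-ins so the block runs without the library (the real ``AudioChunk`` may require more than ``data``).

```python
import base64
import json
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class AudioChunk:
    """Stand-in for the library's AudioChunk; only a ``data`` field is assumed."""
    data: bytes


class WebSocketProtocol(ABC):
    """Local mirror of the abstract interface documented above."""

    @abstractmethod
    def encode_audio(self, audio: bytes) -> Any: ...

    @abstractmethod
    def decode_response(self, message: Any) -> Optional[AudioChunk]: ...


class JsonBase64Protocol(WebSocketProtocol):
    """Hypothetical wire format: {"type": "audio", "data": "<base64 PCM16>"}."""

    def encode_audio(self, audio: bytes) -> str:
        payload = base64.b64encode(audio).decode("ascii")
        return json.dumps({"type": "audio", "data": payload})

    def decode_response(self, message: Any) -> Optional[AudioChunk]:
        try:
            frame = json.loads(message)
        except (TypeError, ValueError):
            return None  # not JSON at all, so not an audio frame
        if not isinstance(frame, dict) or frame.get("type") != "audio":
            return None  # e.g. transcripts or keep-alives
        data = frame.get("data")
        if not isinstance(data, str):
            return None  # audio frame without a usable payload
        return AudioChunk(data=base64.b64decode(data))


proto = JsonBase64Protocol()
pcm = b"\x01\x00\xff\x7f"          # two PCM16 samples
wire = proto.encode_audio(pcm)      # JSON text frame ready to send
roundtrip = proto.decode_response(wire)
```

With the real classes, an instance of such a protocol would be passed as the ``protocol`` argument of ``WebSocketAgentAdapter(url, protocol)``. Returning ``None`` for anything that is not audio matters because ``recv_audio`` keeps reading frames until ``decode_response`` yields a chunk.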