Module scenario.voice.adapter
VoiceAgentAdapter — base class for voice-capable agents.
Extends AgentAdapter (text-based) with audio send/receive primitives and a
capability matrix. Concrete subclasses live under
scenario.voice.adapters (PipecatAgentAdapter, LiveKitAgentAdapter, etc.).
The scenario executor calls connect() automatically at scenario start and
disconnect() at end — users do not manage lifecycle.
The default call() implementation records the audio it sends and receives
into the executor's VoiceRecording so result.audio is populated without
each adapter needing its own bookkeeping.
Expand source code
"""
VoiceAgentAdapter — base class for voice-capable agents.
Extends AgentAdapter (text-based) with audio send/receive primitives and a
capability matrix. Concrete subclasses live under
``scenario.voice.adapters`` (PipecatAgentAdapter, LiveKitAgentAdapter, etc.).
The scenario executor calls ``connect()`` automatically at scenario start and
``disconnect()`` at end — users do not manage lifecycle.
The default ``call()`` implementation records the audio it sends and receives
into the executor's ``VoiceRecording`` so ``result.audio`` is populated without
each adapter needing its own bookkeeping.
"""
from __future__ import annotations
import asyncio
import logging
import time
from abc import abstractmethod
from typing import Any, Callable, ClassVar, List, Optional
logger = logging.getLogger("scenario.voice")
from ..agent_adapter import AgentAdapter
from ..types import AgentInput, AgentReturnTypes, AgentRole
from .audio_chunk import AudioChunk
from .capabilities import AdapterCapabilities
from .messages import create_audio_message, extract_audio
from .recording import AudioSegment, VoiceEvent
_FIRST_CHUNK_PHASE = "first-chunk"
"""Phase marker for the first-chunk recv timeout (used in FirstChunkTimeoutError)."""


class FirstChunkTimeoutError(asyncio.TimeoutError):
    """Raised when the agent fails to send its first audio chunk within ``response_timeout``.

    WHY this subclass exists: operators could not distinguish a first-chunk hang
    (agent never spoke — wrong endpoint, VAD never fired, response_timeout too
    short) from a tail-silence cutoff (agent finished speaking normally). The
    bare ``asyncio.TimeoutError`` that escaped previously had an empty ``str()``
    and no structured attributes, so log aggregators and re-raise chains had no
    signal. This class embeds the phase marker (``_FIRST_CHUNK_PHASE``) in its
    message and exposes machine-readable ``.timeout`` and ``.phase``
    attributes. Raise it ``from`` the underlying timeout so the original error
    stays reachable via ``__cause__``.

    Instances are picklable (see ``__reduce__``), so the error survives being
    shipped across a process boundary.
    """

    def __init__(self, *, timeout: float) -> None:
        # timeout: the response_timeout (seconds) that elapsed without a chunk.
        self.timeout = timeout
        self.phase = _FIRST_CHUNK_PHASE
        super().__init__(
            f"agent did not send its first audio chunk within {timeout}s "
            f"(phase={_FIRST_CHUNK_PHASE})"
        )

    def __reduce__(self):
        # Default exception pickling re-invokes ``cls(*self.args)`` — i.e. it
        # passes the formatted message positionally — but ``__init__`` only
        # accepts ``timeout`` as a keyword, so unpickling raised TypeError.
        # Rebuild from ``timeout`` instead, then restore any extra instance
        # state (e.g. notes) from ``__dict__``.
        return (_rebuild_first_chunk_timeout, (self.timeout,), self.__dict__)


def _rebuild_first_chunk_timeout(timeout: float) -> FirstChunkTimeoutError:
    """Unpickling hook for ``FirstChunkTimeoutError`` (see its ``__reduce__``)."""
    return FirstChunkTimeoutError(timeout=timeout)
class VoiceAgentAdapter(AgentAdapter):
    """
    Abstract base for voice agents that exchange audio with the agent under test.

    Subclasses implement ``connect``, ``disconnect``, ``send_audio``, and
    ``recv_audio``. The default ``call`` implementation threads audio extracted
    from the last incoming message through the transport and wraps the response
    back into an assistant message.

    Attributes:
        capabilities: Declaration of what the adapter can and cannot do. Each
            concrete subclass must set this as a class attribute.
        response_timeout: Seconds to wait for the first agent audio chunk
            after sending user audio. Defaults to 60 seconds.

            60 seconds covers a typical real-world STT → LLM → TTS round-trip
            including backoff/retry inside each provider, tool calls, and RAG
            lookups. Against a fast LLM-only chain you can lower this so a
            hung agent fails sooner; if you see ``FirstChunkTimeoutError``
            flakes, or your agent does heavy processing (MCP roundtrips,
            multi-step tool chains), raise it.

            Override per-adapter at construction time::

                adapter = MyVoiceAdapter()
                adapter.response_timeout = 90.0  # slow tool-call chain

        response_tail_silence: Seconds with no new chunk after which the
            agent's turn is considered finished. Defaults to 0.6.
        response_max_duration: Cap, in seconds of received audio
            (``duration_seconds``), on a single agent turn. Defaults to 30.0.
    """

    role: ClassVar[AgentRole] = AgentRole.AGENT
    capabilities: ClassVar[AdapterCapabilities] = AdapterCapabilities()
    response_timeout: float = 60.0  # 60s: STT + LLM + TTS budget (see docstring)
    # Tail silence: once the first agent chunk arrives, keep draining recv_audio
    # until no chunk shows up within this many seconds — that's how we detect the
    # agent finished talking. Without this, demos record only the first ~100ms.
    response_tail_silence: float = 0.6
    # Hard cap on a single agent turn's audio. Prevents runaway loops if a
    # transport never signals end-of-stream. 30s = a long sentence.
    response_max_duration: float = 30.0

    def __init__(self) -> None:
        # Per-instance event used by the interruption path to wait until
        # the agent is actually speaking before firing an interrupt — so
        # we don't fire ``clear`` at a silent SUT. Subclasses that
        # override ``__init__`` must call ``super().__init__()``.
        self._agent_speaking = asyncio.Event()

    @property
    def _agent_speaking_event(self) -> asyncio.Event:
        """Event set when the agent emits its first chunk of the current turn."""
        # Safety net for subclasses that pre-date this base ``__init__``
        # contract and didn't call ``super().__init__()``. They get a
        # one-shot lazy event so the interruption path doesn't crash.
        # We emit a single warning per subclass — silent fallback masks
        # bugs, but a warning per call would spam the timing-critical
        # interruption path. New adapters must call super().__init__().
        ev = getattr(self, "_agent_speaking", None)
        if ev is None:
            cls = type(self)
            if not getattr(cls, "_agent_speaking_lazy_warned", False):
                logger.warning(
                    "%s.__init__() did not call super().__init__(); "
                    "lazily initialising _agent_speaking event. "
                    "Add super().__init__() to silence this warning.",
                    cls.__name__,
                )
                # setattr() form: pyright won't infer this dynamic class attr
                # otherwise (reportAttributeAccessIssue). Functionally identical
                # to cls._agent_speaking_lazy_warned = True.
                setattr(cls, "_agent_speaking_lazy_warned", True)
            ev = asyncio.Event()
            self._agent_speaking = ev
        return ev

    @abstractmethod
    async def connect(self) -> None:
        """Open the transport and prepare to exchange audio."""

    @abstractmethod
    async def disconnect(self) -> None:
        """Close the transport and release resources."""

    @abstractmethod
    async def send_audio(self, chunk: AudioChunk) -> None:
        """Transmit an AudioChunk to the agent under test."""

    @abstractmethod
    async def recv_audio(self, timeout: float) -> AudioChunk:
        """Receive the next AudioChunk from the agent.

        The default ``call``/``_drain_agent_response`` flow expects an
        ``asyncio.TimeoutError`` when nothing arrives within ``timeout``.
        """

    async def __aenter__(self):
        # Default async context manager: subclasses don't need to
        # reimplement this — they get connect/disconnect sandwiching
        # for free. Override only if a transport needs extra setup
        # ordering around connect.
        await self.connect()
        return self

    async def __aexit__(self, *exc_info: Any) -> None:
        # Always disconnect; exceptions from the body are not suppressed
        # (this returns None).
        await self.disconnect()

    async def interrupt(self) -> None:
        """Send a first-class interrupt signal to the agent under test.

        Adapters that advertise ``capabilities.interruption=True`` override
        this to send the transport-native interrupt (e.g., Twilio ``clear``,
        OpenAI Realtime ``response.cancel``). The agent stops generating
        audio immediately — much more deterministic than racing VAD against
        a wall-clock sleep.

        The default raises ``UnsupportedCapabilityError``. Callers
        (``scenario.interrupt()``) check ``capabilities.interruption`` and
        fall back to timing-based barge-in (sending audio while the agent
        is speaking) when that flag is False.

        Raises:
            UnsupportedCapabilityError: always, in this base implementation.
        """
        from .capabilities import UnsupportedCapabilityError

        raise UnsupportedCapabilityError(
            type(self).__name__,
            "interruption",
            hint=(
                "This adapter has no native interrupt signal. Use the "
                "timing-based barge-in pattern instead: "
                "agent(wait=False) + sleep(N) + user(content), where the "
                "user audio overlaps with the agent's TTS and the SUT's "
                "VAD detects it."
            ),
        )

    async def call(self, input: AgentInput) -> AgentReturnTypes:
        """
        Default implementation: extract audio from the latest user message,
        send it, drain the agent's full response (multiple recv_audio chunks
        until tail silence), record once, return as one assistant audio message.

        Why drain instead of taking one chunk: TTS and realtime APIs stream
        their response in many small chunks. A single recv_audio() returns the
        first one only — the recorder would log ~100ms of agent audio per turn
        and the judge would receive a truncated response. Draining until
        tail-silence (no new chunk for ``response_tail_silence`` seconds) gives
        the natural "agent finished talking" signal that works across
        adapters without each one needing to know its transport's done event.

        Subclasses may override this for specialised flows but will usually
        inherit it.

        Raises:
            FirstChunkTimeoutError: the agent sent nothing within
                ``response_timeout``.
        """
        # Clear the speaking-event for this turn — set in _drain on first chunk.
        self._agent_speaking_event.clear()
        recorder = _AdapterRecorder(input)
        incoming = extract_audio(input.new_messages[-1]) if input.new_messages else None
        if incoming is not None:
            # Wrap send_audio so user.start = "we began transmitting" and
            # user.end = "we finished transmitting" — both real flow points.
            recorder.mark_user_start()
            await self.send_audio(incoming)
            recorder.record_user(incoming)
        # Drain. Recorder grabs agent.start at first chunk via
        # mark_agent_start, so agent.start is "first chunk on the wire,"
        # not "now minus merged.duration."
        merged = await self._drain_agent_response(on_first_chunk=recorder.mark_agent_start)
        recorder.record_agent(merged)
        return create_audio_message(merged, role="assistant")

    async def _drain_agent_response(
        self, on_first_chunk: Optional[Callable[[], None]] = None
    ) -> AudioChunk:
        """Loop ``recv_audio`` until tail silence or max duration; merge result.

        The first ``recv_audio`` waits up to ``response_timeout``; each later
        one waits ``response_tail_silence``. Draining stops on a tail-silence
        timeout, on an empty chunk after the first, or once the accumulated
        ``duration_seconds`` reaches ``response_max_duration`` (checked before
        each receive, so the total can overshoot by at most one chunk).

        ``on_first_chunk`` is invoked synchronously the moment the first
        non-empty audio chunk arrives — even if the transport's very first
        chunk is empty — so the recorder captures agent.start at a real flow
        point rather than back-computing it from the merged-chunk duration.

        Raises:
            FirstChunkTimeoutError: no chunk arrived within ``response_timeout``.
        """
        try:
            first = await self.recv_audio(timeout=self.response_timeout)
        except asyncio.TimeoutError as err:
            raise FirstChunkTimeoutError(timeout=self.response_timeout) from err

        # Hook still owed to the caller; cleared once it has fired so it runs
        # at most once, on the first chunk that actually carries audio.
        pending_hook: Optional[Callable[[], None]] = on_first_chunk
        if first.data and pending_hook is not None:
            pending_hook()
            pending_hook = None
        # First chunk arrived → agent is now speaking. Wakes anyone awaiting
        # _agent_speaking_event (the interruption path).
        self._agent_speaking_event.set()

        chunks: List[AudioChunk] = [first]
        accumulated = first.duration_seconds
        while accumulated < self.response_max_duration:
            try:
                nxt = await self.recv_audio(timeout=self.response_tail_silence)
            except asyncio.TimeoutError:
                break  # tail silence: agent finished talking
            if not nxt.data:
                break  # empty chunk after the first = end of stream
            if pending_hook is not None:
                # Leading chunk was empty; this is the first real audio.
                pending_hook()
                pending_hook = None
            chunks.append(nxt)
            accumulated += nxt.duration_seconds
        return _merge_chunks(chunks)
class _AdapterRecorder:
    """Bridges a single call() turn's audio and timing into the executor state.

    Kept as a private helper so the default ``VoiceAgentAdapter.call`` stays
    short and each subclass can opt-out by overriding ``call()``.

    Timing model: in the normal flow every segment's start/end is captured at
    a real audio flow point — when transmission begins, when it ends, when the
    first agent chunk arrives — so user and agent segments share a single
    timeline. Only when a ``mark_*`` hook was skipped does a start fall back to
    being back-computed from the chunk's ``duration_seconds``. All times are
    seconds since the executor's ``_voice_recording_started_at`` monotonic
    anchor (0.0 when no anchor is present).
    """

    def __init__(self, input: AgentInput) -> None:
        # ``scenario_state`` is declared on AgentInput, but tests use lightweight
        # _FakeInput stubs that don't carry it. Guard so the recorder
        # degrades to a no-op (segments unwritten) instead of crashing the
        # call(), matching the established test-double seam.
        state = getattr(input, "scenario_state", None)
        executor = getattr(state, "_executor", None) if state is not None else None
        self._executor = executor
        self._user_start: Optional[float] = None
        self._user_end: Optional[float] = None
        self._agent_start: Optional[float] = None

    def _offset(self) -> float:
        """Return seconds since the recording anchor, or 0.0 when there is none."""
        anchor = getattr(self._executor, "_voice_recording_started_at", None)
        if anchor is None:
            return 0.0
        return time.monotonic() - anchor

    def mark_user_start(self) -> None:
        """Capture the moment send_audio begins transmitting user audio."""
        self._user_start = self._offset()

    def record_user(self, chunk: AudioChunk) -> None:
        """Finalise the user segment after send_audio returns.

        Uses real flow timestamps: start = when transmission began,
        end = now (transmission complete). The chunk's intrinsic
        duration is only used as a fallback when ``mark_user_start``
        was not called.
        """
        if self._executor is None or not chunk.data:
            return
        # mark_user_start should have been called; if not (e.g. an adapter
        # subclass invokes record_user directly), fall back to back-
        # computation so the segment is at least roughly placed.
        # ``end`` is stamped before write_user_segment fires the user's
        # on_audio_chunk hook, so callback time is not counted as speech.
        end = self._offset()
        start = self._user_start if self._user_start is not None else max(
            0.0, end - chunk.duration_seconds
        )
        self._user_end = end
        write_user_segment(self._executor, chunk, start, end)

    def mark_agent_start(self) -> None:
        """Capture the moment the first agent chunk arrives.

        Called by ``_drain_agent_response`` synchronously when its first
        non-empty chunk lands, so the agent segment's start reflects when
        audio actually started flowing back from the AUT — not when drain
        eventually returns.
        """
        self._agent_start = self._offset()

    def record_agent(self, chunk: AudioChunk) -> None:
        """Finalise the agent segment after drain completes.

        start = when first chunk arrived (captured by mark_agent_start).
        end = now (drain has settled); this typically includes the
        tail-silence wait that ended the drain.
        latency = agent.start - user.end. Real measurement; no clamp, but
        negative values are dropped (see below).
        """
        if self._executor is None or not chunk.data:
            return
        # Stamp ``end`` BEFORE firing the user's on_audio_chunk hook, matching
        # record_user: a slow callback must not stretch the agent segment.
        end = self._offset()
        _fire_audio_chunk(self._executor, chunk)
        # Fall back if a subclass bypassed the on_first_chunk hook.
        start = self._agent_start if self._agent_start is not None else max(
            0.0, end - chunk.duration_seconds
        )
        _append_segment(self._executor, "agent", start, end, chunk)
        latency: Optional[float] = None
        if self._user_end is not None:
            measured = start - self._user_end
            # Negative latency means the agent began emitting audio before
            # the user audio finished transmitting — which the wire model
            # forbids on serial adapters. Treat as a measurement artefact
            # and skip the record so p50/p95 aren't poisoned.
            if measured >= 0:
                latency = measured
                _record_latency(self._executor, latency)
        _append_event(
            self._executor,
            VoiceEvent(time=start, type="agent_start_speaking", latency=latency),
        )
        _append_event(self._executor, VoiceEvent(time=end, type="agent_stop_speaking"))
def _merge_chunks(chunks: List[AudioChunk]) -> AudioChunk:
    """Combine a turn's drained agent chunks into a single AudioChunk.

    A lone chunk is returned as-is (same object). Otherwise the PCM payloads
    are concatenated in arrival order, and every non-empty per-chunk
    transcript is space-joined — adapters differ on whether they put the
    transcript on the last chunk or spread it incrementally, and joining
    keeps whatever they shipped. With no transcripts the result's
    transcript is None.
    """
    if len(chunks) == 1:
        return chunks[0]
    # NOTE(review): only ``data`` and ``transcript`` are carried over; any
    # other AudioChunk fields (format metadata, if the type has any) take
    # their defaults here, unlike the single-chunk path. Confirm against
    # AudioChunk's definition.
    pcm = b"".join(piece.data for piece in chunks)
    spoken = [piece.transcript for piece in chunks if piece.transcript]
    return AudioChunk(data=pcm, transcript=" ".join(spoken) or None)
def write_user_segment(executor, chunk: AudioChunk, start: float, end: float) -> None:
    """Record one finished user utterance: hook, segment, and two timeline events.

    This is the shared path for both the default ``call()`` flow
    (``_AdapterRecorder.record_user``) and the barge-in flow
    (``ScenarioExecutor._record_interrupt_user_segment``), which bypasses the
    recorder. It replaces two hand-written copies of the same four steps
    (``_fire_audio_chunk``, ``_append_segment``, and two ``_append_event``
    calls) that had drifted apart as the timing model changed.

    Does nothing when there is no executor or the chunk carries no audio.
    """
    if executor is not None and chunk.data:
        _fire_audio_chunk(executor, chunk)
        _append_segment(executor, "user", start, end, chunk)
        for stamp, kind in ((start, "user_start_speaking"), (end, "user_stop_speaking")):
            _append_event(executor, VoiceEvent(time=stamp, type=kind))
def _append_segment(executor, speaker: str, start: float, end: float, chunk: AudioChunk) -> None:
    """Append an AudioSegment to ``executor._voice_recording.segments``.

    Silently does nothing when the executor has no ``_voice_recording``
    (or it is None).
    """
    recording = getattr(executor, "_voice_recording", None)
    if recording is not None:
        segment = AudioSegment(
            speaker=speaker,  # type: ignore[arg-type]
            start_time=start,
            end_time=end,
            audio=chunk.data,
            transcript=chunk.transcript,
        )
        recording.segments.append(segment)
def _append_event(executor, event: VoiceEvent) -> None:
    """Append ``event`` to the executor's voice timeline and notify its hook.

    No-op (the hook is not called either) when the executor has no
    ``_voice_timeline``. A failing ``_on_voice_event`` callback is logged
    and swallowed so user code cannot abort the scenario.
    """
    timeline = getattr(executor, "_voice_timeline", None)
    if timeline is None:
        return
    timeline.append(event)

    on_event = getattr(executor, "_on_voice_event", None)
    if on_event is None:
        return
    try:
        on_event(event)
    except Exception:
        logger.warning(
            "on_voice_event callback raised; continuing scenario.",
            exc_info=True,
        )
def _fire_audio_chunk(executor, chunk: AudioChunk) -> None:
    """Pass ``chunk`` to the executor's ``_on_audio_chunk`` hook, if one is set.

    Exceptions from the hook are logged and swallowed so user callbacks
    cannot abort the scenario.
    """
    on_chunk = getattr(executor, "_on_audio_chunk", None)
    if on_chunk is not None:
        try:
            on_chunk(chunk)
        except Exception:
            logger.warning(
                "on_audio_chunk callback raised; continuing scenario.",
                exc_info=True,
            )
def _record_latency(executor, latency: float) -> None:
    """Append a turn latency to ``executor._voice_latency.measurements``.

    The first latency ever recorded is also stored as
    ``time_to_first_byte``. No-op when the executor has no
    ``_voice_latency`` (or it is None).
    """
    metrics = getattr(executor, "_voice_latency", None)
    if metrics is not None:
        metrics.measurements.append(latency)
        if metrics.time_to_first_byte is None:
            metrics.time_to_first_byte = latency
Functions
def write_user_segment(executor, chunk: AudioChunk, start: float, end: float) ‑> None-
Append a finalised user segment + start/stop timeline events.
Single path that both `_AdapterRecorder.record_user` (the default `call()` flow) and `ScenarioExecutor._record_interrupt_user_segment` (the barge-in flow that bypasses the recorder) call into. Previously those two paths each open-coded the same four-step sequence (`_fire_audio_chunk` + `_append_segment` + two `_append_event` calls), drifting apart as the timing model evolved.
Expand source code
def write_user_segment(executor, chunk: AudioChunk, start: float, end: float) -> None: """Append a finalised user segment + start/stop timeline events. Single path that both ``_AdapterRecorder.record_user`` (the default ``call()`` flow) and ``ScenarioExecutor._record_interrupt_user_segment`` (the barge-in flow that bypasses the recorder) call into. Previously those two paths each open-coded the same four-step sequence (``_fire_audio_chunk`` + ``_append_segment`` + two ``_append_event``s), drifting apart as the timing model evolved. """ if executor is None or not chunk.data: return _fire_audio_chunk(executor, chunk) _append_segment(executor, "user", start, end, chunk) _append_event(executor, VoiceEvent(time=start, type="user_start_speaking")) _append_event(executor, VoiceEvent(time=end, type="user_stop_speaking"))
Classes
class FirstChunkTimeoutError (*, timeout: float)-
Raised when the agent fails to send its first audio chunk within `response_timeout`.
WHY this subclass exists: operators could not distinguish a first-chunk hang (agent never spoke — wrong endpoint, VAD never fired, `response_timeout` too short) from a tail-silence cutoff (agent finished speaking normally). The bare `asyncio.TimeoutError` that escaped previously had an empty `str()` and no structured attributes, so log aggregators and re-raise chains had no signal. This class embeds the phase marker (`_FIRST_CHUNK_PHASE`) in its message and exposes machine-readable `.timeout` and `.phase` attributes; it is raised from the underlying `asyncio.TimeoutError`, which stays reachable via `__cause__`.
Expand source code
class FirstChunkTimeoutError(asyncio.TimeoutError): """Raised when the agent fails to send its first audio chunk within ``response_timeout``. WHY this subclass exists: operators could not distinguish a first-chunk hang (agent never spoke — wrong endpoint, VAD never fired, response_timeout too short) from a tail-silence cutoff (agent finished speaking normally). The bare ``asyncio.TimeoutError`` that escaped previously had an empty ``str()`` and no structured attributes, so log aggregators and re-raise chains had no signal. This class embeds the phase marker (``_FIRST_CHUNK_PHASE``) in its message, a machine-readable ``.timeout`` attribute, and chains the original transport error via ``__cause__``. """ def __init__(self, *, timeout: float) -> None: self.timeout = timeout self.phase = _FIRST_CHUNK_PHASE super().__init__( f"agent did not send its first audio chunk within {timeout}s " f"(phase={_FIRST_CHUNK_PHASE})" )Ancestors
- builtins.TimeoutError
- builtins.OSError
- builtins.Exception
- builtins.BaseException
class VoiceAgentAdapter-
Abstract base for voice agents that exchange audio with the agent under test.
Subclasses implement `connect`, `disconnect`, `send_audio`, and `recv_audio`. The default `call` implementation threads audio extracted from the last incoming message through the transport and wraps the response back into an assistant message.
Attributes
capabilities- Declaration of what the adapter can and cannot do. Each concrete subclass must set this as a class attribute.
response_timeout-
Seconds to wait for agent audio after sending user audio. Defaults to 60 seconds.
60 seconds covers a typical real-world STT → LLM → TTS round-trip including backoff/retry inside each provider, tool calls, and RAG lookups. Against a fast LLM-only chain you can lower this so a hung agent fails sooner; if you see TimeoutError flakes, or your agent does heavy processing (MCP roundtrips, multi-step tool chains), raise it.
Override per-adapter at construction time:
adapter = MyVoiceAdapter()
adapter.response_timeout = 90.0  # slow tool-call chain
Expand source code
class VoiceAgentAdapter(AgentAdapter): """ Abstract base for voice agents that exchange audio with the agent under test. Subclasses implement ``connect``, ``disconnect``, ``send_audio``, and ``recv_audio``. The default ``call`` implementation threads audio extracted from the last incoming message through the transport and wraps the response back into an assistant message. Attributes: capabilities: Declaration of what the adapter can and cannot do. Each concrete subclass must set this as a class attribute. response_timeout: Seconds to wait for agent audio after sending user audio. Defaults to 60 seconds. 60 seconds covers a typical real-world STT → LLM → TTS round-trip including backoff/retry inside each provider, tool calls, and RAG lookups. If you see TimeoutError flakes against a fast LLM-only chain, you can lower this; if your agent does heavy processing (MCP roundtrips, multi-step tool chains), consider raising it. Override per-adapter at construction time:: adapter = MyVoiceAdapter() adapter.response_timeout = 90.0 # slow tool-call chain """ role: ClassVar[AgentRole] = AgentRole.AGENT capabilities: ClassVar[AdapterCapabilities] = AdapterCapabilities() response_timeout: float = 60.0 # 60s: STT + LLM + TTS budget (see docstring) # Tail silence: once the first agent chunk arrives, keep draining recv_audio # until no chunk shows up within this many seconds — that's how we detect the # agent finished talking. Without this, demos record only the first ~100ms. response_tail_silence: float = 0.6 # Hard cap on a single agent turn's audio. Prevents runaway loops if a # transport never signals end-of-stream. 30s = a long sentence. response_max_duration: float = 30.0 def __init__(self) -> None: # Per-instance event used by the interruption path to wait until # the agent is actually speaking before firing an interrupt — so # we don't fire ``clear`` at a silent SUT. Subclasses that # override ``__init__`` must call ``super().__init__()``. 
self._agent_speaking = asyncio.Event() @property def _agent_speaking_event(self) -> asyncio.Event: """Event set when the agent emits its first chunk of the current turn.""" # Safety net for subclasses that pre-date this base ``__init__`` # contract and didn't call ``super().__init__()``. They get a # one-shot lazy event so the interruption path doesn't crash. # We emit a single warning per subclass — silent fallback masks # bugs, but a warning per call would spam the timing-critical # interruption path. New adapters must call super().__init__(). ev = getattr(self, "_agent_speaking", None) if ev is None: cls = type(self) if not getattr(cls, "_agent_speaking_lazy_warned", False): logger.warning( "%s.__init__() did not call super().__init__(); " "lazily initialising _agent_speaking event. " "Add super().__init__() to silence this warning.", cls.__name__, ) # setattr() form: pyright won't infer this dynamic class attr # otherwise (reportAttributeAccessIssue). Functionally identical # to cls._agent_speaking_lazy_warned = True. setattr(cls, "_agent_speaking_lazy_warned", True) ev = asyncio.Event() self._agent_speaking = ev return ev @abstractmethod async def connect(self) -> None: """Open the transport and prepare to exchange audio.""" @abstractmethod async def disconnect(self) -> None: """Close the transport and release resources.""" @abstractmethod async def send_audio(self, chunk: AudioChunk) -> None: """Transmit an AudioChunk to the agent under test.""" @abstractmethod async def recv_audio(self, timeout: float) -> AudioChunk: """Receive the next AudioChunk from the agent.""" async def __aenter__(self): # Default async context manager: subclasses don't need to # reimplement this — they get connect/disconnect sandwiching # for free. Override only if a transport needs extra setup # ordering around connect. 
await self.connect() return self async def __aexit__(self, *exc_info: Any) -> None: await self.disconnect() async def interrupt(self) -> None: """Send a first-class interrupt signal to the agent under test. Adapters that advertise ``capabilities.interruption=True`` override this to send the transport-native interrupt (e.g., Twilio ``clear``, OpenAI Realtime ``response.cancel``). The agent stops generating audio immediately — much more deterministic than racing VAD against a wall-clock sleep. The default raises ``UnsupportedCapabilityError``. Callers (``scenario.interrupt()``) check ``capabilities.interruption`` and fall back to timing-based barge-in (sending audio while the agent is speaking) when this returns False. """ from .capabilities import UnsupportedCapabilityError raise UnsupportedCapabilityError( type(self).__name__, "interruption", hint=( "This adapter has no native interrupt signal. Use the " "timing-based barge-in pattern instead: " "agent(wait=False) + sleep(N) + user(content), where the " "user audio overlaps with the agent's TTS and the SUT's " "VAD detects it." ), ) async def call(self, input: AgentInput) -> AgentReturnTypes: """ Default implementation: extract audio from the latest user message, send it, drain the agent's full response (multiple recv_audio chunks until tail silence), record once, return as one assistant audio message. Why drain instead of taking one chunk: TTS and realtime APIs stream their response in many small chunks. A single recv_audio() returns the first one only — the recorder would log ~100ms of agent audio per turn and the judge would receive a truncated response. Draining until tail-silence (no new chunk for ``response_tail_silence`` seconds) gives the natural "agent finished talking" signal that works across adapters without each one needing to know its transport's done event. Subclasses may override this for specialised flows but will usually inherit it. 
""" # Clear the speaking-event for this turn — set in _drain on first chunk. self._agent_speaking_event.clear() recorder = _AdapterRecorder(input) incoming = extract_audio(input.new_messages[-1]) if input.new_messages else None if incoming is not None: # Wrap send_audio so user.start = "we began transmitting" and # user.end = "we finished transmitting" — both real flow points. recorder.mark_user_start() await self.send_audio(incoming) recorder.record_user(incoming) # Drain. Recorder grabs agent.start at first chunk via # mark_agent_start, so agent.start is "first chunk on the wire," # not "now minus merged.duration." merged = await self._drain_agent_response(on_first_chunk=recorder.mark_agent_start) recorder.record_agent(merged) return create_audio_message(merged, role="assistant") async def _drain_agent_response( self, on_first_chunk: Optional[Callable[[], None]] = None ) -> AudioChunk: """Loop ``recv_audio`` until tail silence or max duration; merge result. ``on_first_chunk`` is invoked synchronously the moment the first non-empty audio chunk arrives — used by the recorder to capture agent.start at a real flow point rather than back-computing from the merged-chunk duration. """ try: first = await self.recv_audio(timeout=self.response_timeout) except asyncio.TimeoutError as err: raise FirstChunkTimeoutError(timeout=self.response_timeout) from err # First chunk arrived → agent is now speaking. Wakes anyone awaiting # _agent_speaking_event (the interruption path). if first.data and on_first_chunk is not None: on_first_chunk() self._agent_speaking_event.set() chunks: List[AudioChunk] = [first] accumulated = first.duration_seconds while accumulated < self.response_max_duration: try: nxt = await self.recv_audio(timeout=self.response_tail_silence) except asyncio.TimeoutError: break if not nxt.data: break chunks.append(nxt) accumulated += nxt.duration_seconds return _merge_chunks(chunks)Ancestors
- AgentAdapter
- abc.ABC
Subclasses
- ComposableVoiceAgent
- ElevenLabsAgentAdapter
- GeminiLiveAgentAdapter
- LiveKitAgentAdapter
- OpenAIRealtimeAgentAdapter
- PipecatAgentAdapter
- TwilioAgentAdapter
- VapiAgentAdapter
- WebRTCAgentAdapter
- WebSocketAgentAdapter
Class variables
var capabilities : ClassVar[AdapterCapabilities]
var response_max_duration : float
var response_tail_silence : float
var response_timeout : float
var role : ClassVar[AgentRole]
Methods
async def call(self, input: AgentInput) ‑> str | openai.types.chat.chat_completion_developer_message_param.ChatCompletionDeveloperMessageParam | openai.types.chat.chat_completion_system_message_param.ChatCompletionSystemMessageParam | openai.types.chat.chat_completion_user_message_param.ChatCompletionUserMessageParam | openai.types.chat.chat_completion_assistant_message_param.ChatCompletionAssistantMessageParam | openai.types.chat.chat_completion_tool_message_param.ChatCompletionToolMessageParam | openai.types.chat.chat_completion_function_message_param.ChatCompletionFunctionMessageParam | List[openai.types.chat.chat_completion_developer_message_param.ChatCompletionDeveloperMessageParam | openai.types.chat.chat_completion_system_message_param.ChatCompletionSystemMessageParam | openai.types.chat.chat_completion_user_message_param.ChatCompletionUserMessageParam | openai.types.chat.chat_completion_assistant_message_param.ChatCompletionAssistantMessageParam | openai.types.chat.chat_completion_tool_message_param.ChatCompletionToolMessageParam | openai.types.chat.chat_completion_function_message_param.ChatCompletionFunctionMessageParam] | ScenarioResult-
Default implementation: extract audio from the latest user message, send it, drain the agent's full response (multiple recv_audio chunks until tail silence), record once, return as one assistant audio message.
Why drain instead of taking one chunk: TTS and realtime APIs stream their response in many small chunks. A single recv_audio() returns the first one only — the recorder would log ~100ms of agent audio per turn and the judge would receive a truncated response. Draining until tail-silence (no new chunk for `response_tail_silence` seconds) gives the natural "agent finished talking" signal that works across adapters without each one needing to know its transport's done event.
Subclasses may override this for specialised flows but will usually inherit it.
Expand source code
async def call(self, input: AgentInput) -> AgentReturnTypes: """ Default implementation: extract audio from the latest user message, send it, drain the agent's full response (multiple recv_audio chunks until tail silence), record once, return as one assistant audio message. Why drain instead of taking one chunk: TTS and realtime APIs stream their response in many small chunks. A single recv_audio() returns the first one only — the recorder would log ~100ms of agent audio per turn and the judge would receive a truncated response. Draining until tail-silence (no new chunk for ``response_tail_silence`` seconds) gives the natural "agent finished talking" signal that works across adapters without each one needing to know its transport's done event. Subclasses may override this for specialised flows but will usually inherit it. """ # Clear the speaking-event for this turn — set in _drain on first chunk. self._agent_speaking_event.clear() recorder = _AdapterRecorder(input) incoming = extract_audio(input.new_messages[-1]) if input.new_messages else None if incoming is not None: # Wrap send_audio so user.start = "we began transmitting" and # user.end = "we finished transmitting" — both real flow points. recorder.mark_user_start() await self.send_audio(incoming) recorder.record_user(incoming) # Drain. Recorder grabs agent.start at first chunk via # mark_agent_start, so agent.start is "first chunk on the wire," # not "now minus merged.duration." merged = await self._drain_agent_response(on_first_chunk=recorder.mark_agent_start) recorder.record_agent(merged) return create_audio_message(merged, role="assistant") async def connect(self) ‑> None-
Open the transport and prepare to exchange audio.
Expand source code
@abstractmethod async def connect(self) -> None: """Open the transport and prepare to exchange audio.""" async def disconnect(self) ‑> None-
Close the transport and release resources.
Expand source code
@abstractmethod async def disconnect(self) -> None: """Close the transport and release resources.""" async def interrupt(self) ‑> None-
Send a first-class interrupt signal to the agent under test.
Adapters that advertise `capabilities.interruption=True` override this to send the transport-native interrupt (e.g., Twilio `clear`, OpenAI Realtime `response.cancel`). The agent stops generating audio immediately — much more deterministic than racing VAD against a wall-clock sleep.
The default raises `UnsupportedCapabilityError`. Callers (`scenario.interrupt()`) check `capabilities.interruption` and fall back to timing-based barge-in (sending audio while the agent is speaking) when that flag is False.
Expand source code
async def interrupt(self) -> None: """Send a first-class interrupt signal to the agent under test. Adapters that advertise ``capabilities.interruption=True`` override this to send the transport-native interrupt (e.g., Twilio ``clear``, OpenAI Realtime ``response.cancel``). The agent stops generating audio immediately — much more deterministic than racing VAD against a wall-clock sleep. The default raises ``UnsupportedCapabilityError``. Callers (``scenario.interrupt()``) check ``capabilities.interruption`` and fall back to timing-based barge-in (sending audio while the agent is speaking) when this returns False. """ from .capabilities import UnsupportedCapabilityError raise UnsupportedCapabilityError( type(self).__name__, "interruption", hint=( "This adapter has no native interrupt signal. Use the " "timing-based barge-in pattern instead: " "agent(wait=False) + sleep(N) + user(content), where the " "user audio overlaps with the agent's TTS and the SUT's " "VAD detects it." ), ) async def recv_audio(self, timeout: float) ‑> AudioChunk-
Receive the next AudioChunk from the agent.
Expand source code
@abstractmethod async def recv_audio(self, timeout: float) -> AudioChunk: """Receive the next AudioChunk from the agent.""" async def send_audio(self, chunk: AudioChunk) ‑> None-
Transmit an AudioChunk to the agent under test.
Expand source code
@abstractmethod async def send_audio(self, chunk: AudioChunk) -> None: """Transmit an AudioChunk to the agent under test."""