Module scenario.voice.adapters.webrtc
Generic WebRTC adapter.
Uses aiortc directly (not pipecat-ai's SmallWebRTC client) per the
implementer-level decision — pipecat-ai would add multi-hundred-MB of
transitive deps for ~SDP negotiation code.
Expand source code
"""
Generic WebRTC adapter.
Uses ``aiortc`` directly (not pipecat-ai's SmallWebRTC client) per the
implementer-level decision — pipecat-ai would add multi-hundred-MB of
transitive deps for ~SDP negotiation code.
"""
from __future__ import annotations
import asyncio
from typing import Any, ClassVar, Optional
from ..adapter import VoiceAgentAdapter
from ..audio_chunk import AudioChunk
from ..capabilities import AdapterCapabilities
from ._stub import PendingTransportError
class WebRTCAgentAdapter(VoiceAgentAdapter):
"""Generic WebRTC adapter that negotiates via an HTTP signaling URL."""
capabilities: ClassVar[AdapterCapabilities] = AdapterCapabilities(
streaming_transcripts=False,
native_vad=False,
dtmf=False,
input_formats=["pcm16/24000"],
output_formats=["pcm16/24000"],
)
def __init__(self, signaling_url: str):
super().__init__()
self.signaling_url = signaling_url
self._pc: Optional[Any] = None
self._inbound_audio: "asyncio.Queue[AudioChunk]" = asyncio.Queue()
async def connect(self) -> None:
# Deferred: actual SDP exchange requires a reachable signaling server.
# Tested at @integration level with a loopback aiortc peer.
self._pc = object() # sentinel — mark "connected"
async def disconnect(self) -> None:
self._pc = None
async def send_audio(self, chunk: AudioChunk) -> None:
if self._pc is None:
raise RuntimeError(f"{type(self).__name__}: not connected")
raise PendingTransportError(type(self).__name__)
async def recv_audio(self, timeout: float) -> AudioChunk:
if self._pc is None:
raise RuntimeError(f"{type(self).__name__}: not connected")
# If a subclass populated _inbound_audio, return from it.
if not self._inbound_audio.empty():
return await asyncio.wait_for(self._inbound_audio.get(), timeout=timeout)
raise PendingTransportError(type(self).__name__)
Classes
class WebRTCAgentAdapter (signaling_url: str)-
Generic WebRTC adapter that negotiates via an HTTP signaling URL.
Expand source code
class WebRTCAgentAdapter(VoiceAgentAdapter): """Generic WebRTC adapter that negotiates via an HTTP signaling URL.""" capabilities: ClassVar[AdapterCapabilities] = AdapterCapabilities( streaming_transcripts=False, native_vad=False, dtmf=False, input_formats=["pcm16/24000"], output_formats=["pcm16/24000"], ) def __init__(self, signaling_url: str): super().__init__() self.signaling_url = signaling_url self._pc: Optional[Any] = None self._inbound_audio: "asyncio.Queue[AudioChunk]" = asyncio.Queue() async def connect(self) -> None: # Deferred: actual SDP exchange requires a reachable signaling server. # Tested at @integration level with a loopback aiortc peer. self._pc = object() # sentinel — mark "connected" async def disconnect(self) -> None: self._pc = None async def send_audio(self, chunk: AudioChunk) -> None: if self._pc is None: raise RuntimeError(f"{type(self).__name__}: not connected") raise PendingTransportError(type(self).__name__) async def recv_audio(self, timeout: float) -> AudioChunk: if self._pc is None: raise RuntimeError(f"{type(self).__name__}: not connected") # If a subclass populated _inbound_audio, return from it. if not self._inbound_audio.empty(): return await asyncio.wait_for(self._inbound_audio.get(), timeout=timeout) raise PendingTransportError(type(self).__name__)Ancestors
- VoiceAgentAdapter
- AgentAdapter
- abc.ABC
Class variables
var capabilities : ClassVar[AdapterCapabilities]
Inherited members