Module scenario.voice.adapters.websocket
Generic WebSocket adapter: bring-your-own protocol.
Users either subclass WebSocketAgentAdapter or pass a WebSocketProtocol that
encodes audio on the wire and decodes responses. This is the escape hatch for
custom voice backends that don't fit one of the platform-specific adapters.
Expand source code
"""
Generic WebSocket adapter: bring-your-own protocol.
Users either subclass ``WebSocketAgentAdapter`` or pass a ``WebSocketProtocol`` that
encodes audio on the wire and decodes responses. This is the escape hatch for
custom voice backends that don't fit one of the platform-specific adapters.
"""
from __future__ import annotations
import asyncio
from abc import ABC, abstractmethod
from typing import Any, ClassVar, Optional
from ..adapter import VoiceAgentAdapter
from ..audio_chunk import AudioChunk
from ..capabilities import AdapterCapabilities
class WebSocketProtocol(ABC):
"""Encoder/decoder pair for a custom WebSocket audio protocol."""
@abstractmethod
def encode_audio(self, audio: bytes) -> Any:
"""Convert PCM16 bytes into the wire representation the server expects."""
@abstractmethod
def decode_response(self, message: Any) -> Optional[AudioChunk]:
"""Parse a server message into an AudioChunk, or None if it's not audio."""
class WebSocketAgentAdapter(VoiceAgentAdapter):
"""
Connects to an arbitrary WebSocket endpoint using a user-supplied protocol.
The protocol's ``encode_audio`` is called before sending; ``decode_response``
is called on each inbound frame until an AudioChunk is produced.
"""
capabilities: ClassVar[AdapterCapabilities] = AdapterCapabilities(
streaming_transcripts=False,
native_vad=False,
dtmf=False,
input_formats=["pcm16/24000"],
output_formats=["pcm16/24000"],
)
def __init__(self, url: str, protocol: WebSocketProtocol):
super().__init__()
self.url = url
self.protocol = protocol
self._ws: Optional[Any] = None
async def connect(self) -> None:
import websockets # hard dep
self._ws = await websockets.connect(self.url)
async def disconnect(self) -> None:
if self._ws is not None:
await self._ws.close()
self._ws = None
async def send_audio(self, chunk: AudioChunk) -> None:
if self._ws is None:
raise RuntimeError(f"{type(self).__name__}: not connected")
payload = self.protocol.encode_audio(chunk.data)
await self._ws.send(payload)
async def recv_audio(self, timeout: float) -> AudioChunk:
if self._ws is None:
raise RuntimeError(f"{type(self).__name__}: not connected")
loop = asyncio.get_running_loop()
deadline = loop.time() + timeout
while True:
remaining = max(0.0, deadline - loop.time())
message = await asyncio.wait_for(self._ws.recv(), timeout=remaining)
chunk = self.protocol.decode_response(message)
if chunk is not None:
return chunk
Classes
class WebSocketAgentAdapter (url: str, protocol: WebSocketProtocol)-
Connects to an arbitrary WebSocket endpoint using a user-supplied protocol.
The protocol's
encode_audiois called before sending;decode_responseis called on each inbound frame until an AudioChunk is produced.Expand source code
class WebSocketAgentAdapter(VoiceAgentAdapter): """ Connects to an arbitrary WebSocket endpoint using a user-supplied protocol. The protocol's ``encode_audio`` is called before sending; ``decode_response`` is called on each inbound frame until an AudioChunk is produced. """ capabilities: ClassVar[AdapterCapabilities] = AdapterCapabilities( streaming_transcripts=False, native_vad=False, dtmf=False, input_formats=["pcm16/24000"], output_formats=["pcm16/24000"], ) def __init__(self, url: str, protocol: WebSocketProtocol): super().__init__() self.url = url self.protocol = protocol self._ws: Optional[Any] = None async def connect(self) -> None: import websockets # hard dep self._ws = await websockets.connect(self.url) async def disconnect(self) -> None: if self._ws is not None: await self._ws.close() self._ws = None async def send_audio(self, chunk: AudioChunk) -> None: if self._ws is None: raise RuntimeError(f"{type(self).__name__}: not connected") payload = self.protocol.encode_audio(chunk.data) await self._ws.send(payload) async def recv_audio(self, timeout: float) -> AudioChunk: if self._ws is None: raise RuntimeError(f"{type(self).__name__}: not connected") loop = asyncio.get_running_loop() deadline = loop.time() + timeout while True: remaining = max(0.0, deadline - loop.time()) message = await asyncio.wait_for(self._ws.recv(), timeout=remaining) chunk = self.protocol.decode_response(message) if chunk is not None: return chunkAncestors
- VoiceAgentAdapter
- AgentAdapter
- abc.ABC
Class variables
var capabilities : ClassVar[AdapterCapabilities]
Inherited members
class WebSocketProtocol-
Encoder/decoder pair for a custom WebSocket audio protocol.
Expand source code
class WebSocketProtocol(ABC): """Encoder/decoder pair for a custom WebSocket audio protocol.""" @abstractmethod def encode_audio(self, audio: bytes) -> Any: """Convert PCM16 bytes into the wire representation the server expects.""" @abstractmethod def decode_response(self, message: Any) -> Optional[AudioChunk]: """Parse a server message into an AudioChunk, or None if it's not audio."""Ancestors
- abc.ABC
Methods
def decode_response(self, message: Any) ‑> AudioChunk | None-
Parse a server message into an AudioChunk, or None if it's not audio.
Expand source code
@abstractmethod def decode_response(self, message: Any) -> Optional[AudioChunk]: """Parse a server message into an AudioChunk, or None if it's not audio.""" def encode_audio(self, audio: bytes) ‑> Any-
Convert PCM16 bytes into the wire representation the server expects.
Expand source code
@abstractmethod def encode_audio(self, audio: bytes) -> Any: """Convert PCM16 bytes into the wire representation the server expects."""