Module scenario.voice.adapters.pipecat

PipecatAgentAdapter: WebSocket client to a user-run Pipecat bot.

Source §5.1. Pipecat is a framework for BUILDING voice agents. The user runs their Pipecat bot separately (e.g. python bot.py -t twilio --port 8765) and this adapter connects as a client to exchange audio with it.

Wire protocol. A pipecat bot configured with the -t twilio transport uses TwilioFrameSerializer on its WebSocket — the same Twilio Media Streams JSON protocol scenario's TwilioAgentAdapter already speaks. Scenario impersonates Twilio: sends a synthetic start event with fake stream/call SIDs, then exchanges media events carrying base64-encoded µ-law 8kHz audio.

transport="webrtc" (SmallWebRTC) is not implemented yet — connect() raises PendingTransportError, and support is tracked in a follow-up issue.

Expand source code
"""
PipecatAgentAdapter: WebSocket client to a user-run Pipecat bot.

Source §5.1. Pipecat is a framework for BUILDING voice agents. The user
runs their Pipecat bot separately (e.g. ``python bot.py -t twilio --port 8765``)
and this adapter connects as a client to exchange audio with it.

**Wire protocol.** A pipecat bot configured with the ``-t twilio`` transport
uses ``TwilioFrameSerializer`` on its WebSocket — the same Twilio Media
Streams JSON protocol scenario's ``TwilioAgentAdapter`` already speaks.
Scenario impersonates Twilio: sends a synthetic ``start`` event with fake
stream/call SIDs, then exchanges ``media`` events carrying base64-encoded
µ-law 8kHz audio.

``transport="webrtc"`` (SmallWebRTC) is not implemented yet — ``connect()``
raises ``PendingTransportError``, and support is tracked in a follow-up issue.
"""

from __future__ import annotations

import asyncio
import json
import logging
import uuid
from typing import Any, ClassVar, Literal, Optional

from ..adapter import VoiceAgentAdapter
from ..audio_chunk import AudioChunk
from ..capabilities import AdapterCapabilities
from ._twilio_shared import (
    TWILIO_FRAME_MS,
    build_clear_frame,
    build_mark_frame,
    build_media_frame,
    iter_mulaw_frames,
    mulaw8k_to_pcm16_24k,
    parse_media_stream_frame,
    pcm16_24k_to_mulaw8k,
)


logger = logging.getLogger("scenario.voice.pipecat")


class PipecatAgentAdapter(VoiceAgentAdapter):
    """
    Test a running Pipecat bot via its exposed WebSocket endpoint.

    Transport is selected by the ``transport`` argument:
        - ``"websocket"`` (default): Twilio Media Streams protocol over WS.
          Scenario sends a synthetic ``start`` event, then ``media`` frames.
          Pipecat's ``TwilioFrameSerializer`` on the bot side handles the
          wire format.
        - ``"webrtc"``: SmallWebRTC-style negotiation. Raises
          ``PendingTransportError``; tracked as a follow-up.

    Args:
        url: WebSocket URL of the bot. Required when
            ``transport="websocket"``.
        signaling_url: Required when ``transport="webrtc"``; currently only
            stored, because that transport is not implemented.
        transport: ``"websocket"`` or ``"webrtc"``.
        audio_format: Format label reported by ``transport_format``. The
            encoding actually put on the wire is fixed to µ-law by
            ``connect()`` and ``send_audio()``.
        sample_rate: Sample-rate label reported by ``transport_format``. The
            ``start`` event always advertises 8000.
        stream_sid: Stream SID to announce in the ``start`` event. When
            omitted, a fresh one is fabricated on every ``connect()``.
        call_sid: Call SID to announce in the ``start`` event. When omitted,
            a fresh one is fabricated on every ``connect()``.

    Raises:
        ValueError: If the URL required by the selected transport is missing.
    """

    capabilities: ClassVar[AdapterCapabilities] = AdapterCapabilities(
        streaming_transcripts=True,
        native_vad=True,
        dtmf=False,
        # Pipecat over the Twilio WS transport speaks the Twilio Media Streams
        # protocol; the ``clear`` event drops all buffered outbound audio on
        # the bot side. That's first-class interrupt — no VAD timing race.
        interruption=True,
        input_formats=["pcm16/24000", "mulaw/8000", "opus"],
        output_formats=["pcm16/24000", "mulaw/8000", "opus"],
    )

    def __init__(
        self,
        url: Optional[str] = None,
        *,
        signaling_url: Optional[str] = None,
        transport: Literal["websocket", "webrtc"] = "websocket",
        audio_format: str = "mulaw",
        sample_rate: int = 8000,
        stream_sid: Optional[str] = None,
        call_sid: Optional[str] = None,
    ) -> None:
        super().__init__()
        if transport == "websocket" and url is None:
            raise ValueError("PipecatAgentAdapter(transport='websocket') requires url=")
        if transport == "webrtc" and signaling_url is None:
            raise ValueError("PipecatAgentAdapter(transport='webrtc') requires signaling_url=")

        self.url = url
        self.signaling_url = signaling_url
        self.transport = transport
        self.audio_format = audio_format
        self.sample_rate = sample_rate
        # Synthetic SIDs pipecat's TwilioFrameSerializer needs in the `start`
        # event. If caller doesn't supply them, we fabricate UUIDs. Pipecat
        # uses them for logging and the auto-hangup REST call; both are no-ops
        # when we're not actually going through Twilio.
        self.stream_sid = stream_sid
        self.call_sid = call_sid
        # Remember what the caller configured so disconnect() can restore it:
        # fabricated SIDs are per-connection, caller-supplied ones must
        # survive a disconnect()/connect() cycle.
        self._initial_stream_sid: Optional[str] = stream_sid
        self._initial_call_sid: Optional[str] = call_sid

        self._ws: Any = None
        self._recv_task: Optional[asyncio.Task] = None
        self._inbound_queue: Optional[asyncio.Queue[AudioChunk]] = None
        # Serialises concurrent send_audio() calls — without it two paced
        # senders would interleave 20-ms mulaw frames on the wire and the
        # bot would receive corrupted audio. Used for the interruption case
        # where the executor calls send_audio() while a previous turn's
        # send is still in flight.
        self._send_lock: Optional[asyncio.Lock] = None

    @property
    def transport_format(self) -> str:
        """Return the format label ``"<audio_format>/<sample_rate>"``."""
        return f"{self.audio_format}/{self.sample_rate}"

    # ------------------------------------------------------------------ lifecycle

    async def connect(self) -> None:
        """Open the WebSocket, perform the Twilio handshake, start receiving.

        Sends the ``connected`` and ``start`` events, then spawns the
        background receive task. If the adapter is already connected, the
        previous connection is torn down first. If the handshake fails, the
        freshly opened socket is closed and the adapter stays disconnected.

        Raises:
            PendingTransportError: If ``transport="webrtc"``.
        """
        if self.transport == "webrtc":
            from ._stub import PendingTransportError

            raise PendingTransportError(
                "PipecatAgentAdapter(transport='webrtc')"
            )

        # A second connect() must not orphan the previous socket or its recv
        # task; the orphaned task would keep feeding the new inbound queue.
        if self._ws is not None:
            await self.disconnect()

        # Lazy import so `import scenario` doesn't require websockets at the
        # top of the module-load path (it's already a hard dep, but being
        # consistent with the Twilio adapter style).
        import websockets

        assert self.url is not None  # validated in __init__
        ws = await websockets.connect(
            self.url, ping_interval=None, ping_timeout=None
        )

        try:
            # Send the synthetic `start` event that pipecat's
            # TwilioFrameSerializer requires to learn the stream/call SIDs
            # and start deserializing media frames.
            if self.stream_sid is None:
                self.stream_sid = f"MZ{uuid.uuid4().hex}"
            if self.call_sid is None:
                self.call_sid = f"CA{uuid.uuid4().hex}"

            await ws.send(json.dumps({"event": "connected", "protocol": "Call", "version": "1.0.0"}))
            await ws.send(
                json.dumps(
                    {
                        "event": "start",
                        "streamSid": self.stream_sid,
                        "start": {
                            "streamSid": self.stream_sid,
                            "callSid": self.call_sid,
                            "mediaFormat": {
                                "encoding": "audio/x-mulaw",
                                "sampleRate": 8000,
                                "channels": 1,
                            },
                        },
                    }
                )
            )
        except BaseException:
            # Handshake failed (or we were cancelled): don't leak the socket
            # and don't leave the adapter looking half-connected.
            self.stream_sid = self._initial_stream_sid
            self.call_sid = self._initial_call_sid
            try:
                await ws.close()
            except Exception:
                logger.debug("PipecatAgentAdapter: ws.close() raised", exc_info=True)
            raise

        # Publish connection state only once the handshake has succeeded.
        self._ws = ws
        self._inbound_queue = asyncio.Queue()
        self._send_lock = asyncio.Lock()
        self._recv_task = asyncio.create_task(self._recv_loop())
        logger.debug("PipecatAgentAdapter: connected to %s (stream=%s)", self.url, self.stream_sid)

    async def disconnect(self) -> None:
        """Best-effort teardown; a no-op when not connected.

        Sends a ``stop`` event, cancels the receive task, closes the socket
        and resets connection state. Failures while doing so are logged at
        debug level rather than raised.
        """
        ws = self._ws
        if ws is None:
            return

        # Send `stop` event so the bot can clean up its pipeline gracefully.
        try:
            if self.stream_sid:
                await ws.send(json.dumps({"event": "stop", "streamSid": self.stream_sid}))
        except Exception:
            logger.debug("PipecatAgentAdapter: failed to send stop frame", exc_info=True)

        if self._recv_task is not None:
            self._recv_task.cancel()
            try:
                await self._recv_task
            except asyncio.CancelledError:
                # Expected: we just cancelled it.
                pass
            except Exception:
                # Unexpected teardown error — already logging enough context
                # elsewhere; disconnect() is best-effort.
                logger.debug("PipecatAgentAdapter: recv_task raised during cancel", exc_info=True)
            self._recv_task = None

        try:
            await ws.close()
        except Exception:
            # WS may already be closed by the peer; disconnect() is best-effort.
            logger.debug("PipecatAgentAdapter: ws.close() raised", exc_info=True)

        self._ws = None
        self._inbound_queue = None
        self._send_lock = None
        # Fabricated SIDs are dropped (both fall back to None) so the next
        # connect() makes fresh ones; caller-supplied SIDs are restored.
        self.stream_sid = self._initial_stream_sid
        self.call_sid = self._initial_call_sid

    # ------------------------------------------------------------------ I/O

    async def send_audio(self, chunk: AudioChunk) -> None:
        """Stream ``chunk`` to the bot as paced µ-law frames, then a mark.

        Raises:
            RuntimeError: If the adapter is not connected.
        """
        # Pace at real-time (TWILIO_FRAME_MS/1000s per 20-ms frame). Matches what
        # a real caller produces over a PSTN line — the SUT sees normal speech
        # rhythm, not a synthetic dump.
        #
        # After the last frame we send a Twilio ``mark`` named "utterance_end".
        # Real-time pacing means TTS-induced inter-phrase pauses survive on the
        # wire, and a stateless inactivity-timer on the receiver can't
        # distinguish "speaker paused after a comma" from "speaker finished
        # their turn." The mark is an explicit, non-ambiguous end-of-turn
        # signal: cooperating SUTs flush on the mark; legacy SUTs fall back to
        # VAD timing.
        self._assert_connected()
        # Snapshot connection state: disconnect() nulls these attributes, and
        # re-reading them after each await would fail on None mid-utterance
        # instead of surfacing the socket's own closed-connection error.
        ws, stream_sid, send_lock = self._ws, self.stream_sid, self._send_lock
        assert ws is not None and stream_sid is not None and send_lock is not None
        mulaw = pcm16_24k_to_mulaw8k(chunk.data)
        frame_secs = TWILIO_FRAME_MS / 1000
        async with send_lock:
            for frame in iter_mulaw_frames(mulaw):
                if not frame:
                    continue
                await ws.send(build_media_frame(stream_sid, frame))
                await asyncio.sleep(frame_secs)
            await ws.send(build_mark_frame(stream_sid, "utterance_end"))

    async def recv_audio(self, timeout: float) -> AudioChunk:
        """Return the next decoded audio chunk received from the bot.

        Args:
            timeout: Maximum number of seconds to wait for a chunk.

        Raises:
            RuntimeError: If the adapter is not connected.
            asyncio.TimeoutError: If no chunk arrives within ``timeout``.
        """
        self._assert_connected()
        assert self._inbound_queue is not None
        return await asyncio.wait_for(self._inbound_queue.get(), timeout=timeout)

    async def interrupt(self) -> None:
        """Send a Twilio ``clear`` frame — the bot drops all buffered outbound
        audio immediately. Cooperating Pipecat bots (and any code wired to
        the Media Streams protocol) treat ``clear`` as "stop talking now."
        Use this in preference to timing-based barge-in when the SUT
        supports it: it's deterministic, doesn't depend on VAD detection
        windows, and matches the same protocol used in production.
        """
        self._assert_connected()
        assert self._ws is not None and self.stream_sid is not None
        await self._ws.send(build_clear_frame(self.stream_sid))
        logger.debug("PipecatAgentAdapter: sent clear frame (interrupt)")

    # ------------------------------------------------------------------ background

    async def _recv_loop(self) -> None:
        """Read frames from pipecat, decode µ-law → PCM16 24k, enqueue.

        Inbound µ-law is accumulated and emitted in batches of at least
        ``BATCH_MS`` milliseconds. Whatever is still buffered when the stream
        ends — a ``stop`` event or the peer closing the socket cleanly — is
        flushed so the tail of the bot's last utterance is not lost.
        """
        assert self._ws is not None and self._inbound_queue is not None
        # Bind once: this task belongs to the connection it was started for.
        ws = self._ws
        inbound = self._inbound_queue
        buffered_mulaw = bytearray()
        BATCH_MS = 100
        # 8 kHz µ-law is one byte per sample, i.e. 8 bytes per millisecond.
        batch_bytes = BATCH_MS * 8

        async def flush() -> None:
            """Decode and enqueue everything buffered so far, if anything."""
            if not buffered_mulaw:
                return
            pcm = mulaw8k_to_pcm16_24k(bytes(buffered_mulaw))
            buffered_mulaw.clear()
            await inbound.put(AudioChunk(data=pcm))

        try:
            async for raw in ws:
                if isinstance(raw, bytes):
                    # pipecat sometimes emits binary frames for audio; treat
                    # as raw µ-law payload if we see one.
                    payload = raw
                else:
                    frame = parse_media_stream_frame(raw)
                    if frame is None:
                        continue
                    if frame.event == "stop":
                        break
                    if frame.event != "media" or not frame.payload_mulaw:
                        continue
                    payload = frame.payload_mulaw

                buffered_mulaw.extend(payload)
                if len(buffered_mulaw) >= batch_bytes:
                    await flush()

            # Stream finished without an error: emit the trailing partial batch.
            await flush()
        except asyncio.CancelledError:
            raise
        except Exception:
            logger.warning("PipecatAgentAdapter: recv loop exited with error", exc_info=True)

    # ------------------------------------------------------------------ assertions

    def _assert_connected(self) -> None:
        """Raise ``RuntimeError`` unless ``connect()`` has set up a socket."""
        if self._ws is None:
            raise RuntimeError(
                "PipecatAgentAdapter: not connected. Did you forget to call connect()?"
            )

Classes

class PipecatAgentAdapter (url: Optional[str] = None, *, signaling_url: Optional[str] = None, transport: "Literal['websocket', 'webrtc']" = 'websocket', audio_format: str = 'mulaw', sample_rate: int = 8000, stream_sid: Optional[str] = None, call_sid: Optional[str] = None)

Test a running Pipecat bot via its exposed WebSocket endpoint.

Transport is selected by the transport argument:

- "websocket" (default): Twilio Media Streams protocol over WS. Scenario sends a synthetic start event, then media frames. Pipecat's TwilioFrameSerializer on the bot side handles the wire format.
- "webrtc": SmallWebRTC-style negotiation. Raises PendingTransportError; tracked as a follow-up.

Expand source code
class PipecatAgentAdapter(VoiceAgentAdapter):
    """
    Test a running Pipecat bot via its exposed WebSocket endpoint.

    Transport is selected by the ``transport`` argument:
        - ``"websocket"`` (default): Twilio Media Streams protocol over WS.
          Scenario sends a synthetic ``start`` event, then ``media`` frames.
          Pipecat's ``TwilioFrameSerializer`` on the bot side handles the
          wire format.
        - ``"webrtc"``: SmallWebRTC-style negotiation. Raises
          ``PendingTransportError``; tracked as a follow-up.

    Args:
        url: WebSocket URL of the bot. Required when
            ``transport="websocket"``.
        signaling_url: Required when ``transport="webrtc"``; currently only
            stored, because that transport is not implemented.
        transport: ``"websocket"`` or ``"webrtc"``.
        audio_format: Format label reported by ``transport_format``. The
            encoding actually put on the wire is fixed to µ-law by
            ``connect()`` and ``send_audio()``.
        sample_rate: Sample-rate label reported by ``transport_format``. The
            ``start`` event always advertises 8000.
        stream_sid: Stream SID to announce in the ``start`` event. When
            omitted, a fresh one is fabricated on every ``connect()``.
        call_sid: Call SID to announce in the ``start`` event. When omitted,
            a fresh one is fabricated on every ``connect()``.

    Raises:
        ValueError: If the URL required by the selected transport is missing.
    """

    capabilities: ClassVar[AdapterCapabilities] = AdapterCapabilities(
        streaming_transcripts=True,
        native_vad=True,
        dtmf=False,
        # Pipecat over the Twilio WS transport speaks the Twilio Media Streams
        # protocol; the ``clear`` event drops all buffered outbound audio on
        # the bot side. That's first-class interrupt — no VAD timing race.
        interruption=True,
        input_formats=["pcm16/24000", "mulaw/8000", "opus"],
        output_formats=["pcm16/24000", "mulaw/8000", "opus"],
    )

    def __init__(
        self,
        url: Optional[str] = None,
        *,
        signaling_url: Optional[str] = None,
        transport: Literal["websocket", "webrtc"] = "websocket",
        audio_format: str = "mulaw",
        sample_rate: int = 8000,
        stream_sid: Optional[str] = None,
        call_sid: Optional[str] = None,
    ) -> None:
        super().__init__()
        if transport == "websocket" and url is None:
            raise ValueError("PipecatAgentAdapter(transport='websocket') requires url=")
        if transport == "webrtc" and signaling_url is None:
            raise ValueError("PipecatAgentAdapter(transport='webrtc') requires signaling_url=")

        self.url = url
        self.signaling_url = signaling_url
        self.transport = transport
        self.audio_format = audio_format
        self.sample_rate = sample_rate
        # Synthetic SIDs pipecat's TwilioFrameSerializer needs in the `start`
        # event. If caller doesn't supply them, we fabricate UUIDs. Pipecat
        # uses them for logging and the auto-hangup REST call; both are no-ops
        # when we're not actually going through Twilio.
        self.stream_sid = stream_sid
        self.call_sid = call_sid
        # Remember what the caller configured so disconnect() can restore it:
        # fabricated SIDs are per-connection, caller-supplied ones must
        # survive a disconnect()/connect() cycle.
        self._initial_stream_sid: Optional[str] = stream_sid
        self._initial_call_sid: Optional[str] = call_sid

        self._ws: Any = None
        self._recv_task: Optional[asyncio.Task] = None
        self._inbound_queue: Optional[asyncio.Queue[AudioChunk]] = None
        # Serialises concurrent send_audio() calls — without it two paced
        # senders would interleave 20-ms mulaw frames on the wire and the
        # bot would receive corrupted audio. Used for the interruption case
        # where the executor calls send_audio() while a previous turn's
        # send is still in flight.
        self._send_lock: Optional[asyncio.Lock] = None

    @property
    def transport_format(self) -> str:
        """Return the format label ``"<audio_format>/<sample_rate>"``."""
        return f"{self.audio_format}/{self.sample_rate}"

    # ------------------------------------------------------------------ lifecycle

    async def connect(self) -> None:
        """Open the WebSocket, perform the Twilio handshake, start receiving.

        Sends the ``connected`` and ``start`` events, then spawns the
        background receive task. If the adapter is already connected, the
        previous connection is torn down first. If the handshake fails, the
        freshly opened socket is closed and the adapter stays disconnected.

        Raises:
            PendingTransportError: If ``transport="webrtc"``.
        """
        if self.transport == "webrtc":
            from ._stub import PendingTransportError

            raise PendingTransportError(
                "PipecatAgentAdapter(transport='webrtc')"
            )

        # A second connect() must not orphan the previous socket or its recv
        # task; the orphaned task would keep feeding the new inbound queue.
        if self._ws is not None:
            await self.disconnect()

        # Lazy import so `import scenario` doesn't require websockets at the
        # top of the module-load path (it's already a hard dep, but being
        # consistent with the Twilio adapter style).
        import websockets

        assert self.url is not None  # validated in __init__
        ws = await websockets.connect(
            self.url, ping_interval=None, ping_timeout=None
        )

        try:
            # Send the synthetic `start` event that pipecat's
            # TwilioFrameSerializer requires to learn the stream/call SIDs
            # and start deserializing media frames.
            if self.stream_sid is None:
                self.stream_sid = f"MZ{uuid.uuid4().hex}"
            if self.call_sid is None:
                self.call_sid = f"CA{uuid.uuid4().hex}"

            await ws.send(json.dumps({"event": "connected", "protocol": "Call", "version": "1.0.0"}))
            await ws.send(
                json.dumps(
                    {
                        "event": "start",
                        "streamSid": self.stream_sid,
                        "start": {
                            "streamSid": self.stream_sid,
                            "callSid": self.call_sid,
                            "mediaFormat": {
                                "encoding": "audio/x-mulaw",
                                "sampleRate": 8000,
                                "channels": 1,
                            },
                        },
                    }
                )
            )
        except BaseException:
            # Handshake failed (or we were cancelled): don't leak the socket
            # and don't leave the adapter looking half-connected.
            self.stream_sid = self._initial_stream_sid
            self.call_sid = self._initial_call_sid
            try:
                await ws.close()
            except Exception:
                logger.debug("PipecatAgentAdapter: ws.close() raised", exc_info=True)
            raise

        # Publish connection state only once the handshake has succeeded.
        self._ws = ws
        self._inbound_queue = asyncio.Queue()
        self._send_lock = asyncio.Lock()
        self._recv_task = asyncio.create_task(self._recv_loop())
        logger.debug("PipecatAgentAdapter: connected to %s (stream=%s)", self.url, self.stream_sid)

    async def disconnect(self) -> None:
        """Best-effort teardown; a no-op when not connected.

        Sends a ``stop`` event, cancels the receive task, closes the socket
        and resets connection state. Failures while doing so are logged at
        debug level rather than raised.
        """
        ws = self._ws
        if ws is None:
            return

        # Send `stop` event so the bot can clean up its pipeline gracefully.
        try:
            if self.stream_sid:
                await ws.send(json.dumps({"event": "stop", "streamSid": self.stream_sid}))
        except Exception:
            logger.debug("PipecatAgentAdapter: failed to send stop frame", exc_info=True)

        if self._recv_task is not None:
            self._recv_task.cancel()
            try:
                await self._recv_task
            except asyncio.CancelledError:
                # Expected: we just cancelled it.
                pass
            except Exception:
                # Unexpected teardown error — already logging enough context
                # elsewhere; disconnect() is best-effort.
                logger.debug("PipecatAgentAdapter: recv_task raised during cancel", exc_info=True)
            self._recv_task = None

        try:
            await ws.close()
        except Exception:
            # WS may already be closed by the peer; disconnect() is best-effort.
            logger.debug("PipecatAgentAdapter: ws.close() raised", exc_info=True)

        self._ws = None
        self._inbound_queue = None
        self._send_lock = None
        # Fabricated SIDs are dropped (both fall back to None) so the next
        # connect() makes fresh ones; caller-supplied SIDs are restored.
        self.stream_sid = self._initial_stream_sid
        self.call_sid = self._initial_call_sid

    # ------------------------------------------------------------------ I/O

    async def send_audio(self, chunk: AudioChunk) -> None:
        """Stream ``chunk`` to the bot as paced µ-law frames, then a mark.

        Raises:
            RuntimeError: If the adapter is not connected.
        """
        # Pace at real-time (TWILIO_FRAME_MS/1000s per 20-ms frame). Matches what
        # a real caller produces over a PSTN line — the SUT sees normal speech
        # rhythm, not a synthetic dump.
        #
        # After the last frame we send a Twilio ``mark`` named "utterance_end".
        # Real-time pacing means TTS-induced inter-phrase pauses survive on the
        # wire, and a stateless inactivity-timer on the receiver can't
        # distinguish "speaker paused after a comma" from "speaker finished
        # their turn." The mark is an explicit, non-ambiguous end-of-turn
        # signal: cooperating SUTs flush on the mark; legacy SUTs fall back to
        # VAD timing.
        self._assert_connected()
        # Snapshot connection state: disconnect() nulls these attributes, and
        # re-reading them after each await would fail on None mid-utterance
        # instead of surfacing the socket's own closed-connection error.
        ws, stream_sid, send_lock = self._ws, self.stream_sid, self._send_lock
        assert ws is not None and stream_sid is not None and send_lock is not None
        mulaw = pcm16_24k_to_mulaw8k(chunk.data)
        frame_secs = TWILIO_FRAME_MS / 1000
        async with send_lock:
            for frame in iter_mulaw_frames(mulaw):
                if not frame:
                    continue
                await ws.send(build_media_frame(stream_sid, frame))
                await asyncio.sleep(frame_secs)
            await ws.send(build_mark_frame(stream_sid, "utterance_end"))

    async def recv_audio(self, timeout: float) -> AudioChunk:
        """Return the next decoded audio chunk received from the bot.

        Args:
            timeout: Maximum number of seconds to wait for a chunk.

        Raises:
            RuntimeError: If the adapter is not connected.
            asyncio.TimeoutError: If no chunk arrives within ``timeout``.
        """
        self._assert_connected()
        assert self._inbound_queue is not None
        return await asyncio.wait_for(self._inbound_queue.get(), timeout=timeout)

    async def interrupt(self) -> None:
        """Send a Twilio ``clear`` frame — the bot drops all buffered outbound
        audio immediately. Cooperating Pipecat bots (and any code wired to
        the Media Streams protocol) treat ``clear`` as "stop talking now."
        Use this in preference to timing-based barge-in when the SUT
        supports it: it's deterministic, doesn't depend on VAD detection
        windows, and matches the same protocol used in production.
        """
        self._assert_connected()
        assert self._ws is not None and self.stream_sid is not None
        await self._ws.send(build_clear_frame(self.stream_sid))
        logger.debug("PipecatAgentAdapter: sent clear frame (interrupt)")

    # ------------------------------------------------------------------ background

    async def _recv_loop(self) -> None:
        """Read frames from pipecat, decode µ-law → PCM16 24k, enqueue.

        Inbound µ-law is accumulated and emitted in batches of at least
        ``BATCH_MS`` milliseconds. Whatever is still buffered when the stream
        ends — a ``stop`` event or the peer closing the socket cleanly — is
        flushed so the tail of the bot's last utterance is not lost.
        """
        assert self._ws is not None and self._inbound_queue is not None
        # Bind once: this task belongs to the connection it was started for.
        ws = self._ws
        inbound = self._inbound_queue
        buffered_mulaw = bytearray()
        BATCH_MS = 100
        # 8 kHz µ-law is one byte per sample, i.e. 8 bytes per millisecond.
        batch_bytes = BATCH_MS * 8

        async def flush() -> None:
            """Decode and enqueue everything buffered so far, if anything."""
            if not buffered_mulaw:
                return
            pcm = mulaw8k_to_pcm16_24k(bytes(buffered_mulaw))
            buffered_mulaw.clear()
            await inbound.put(AudioChunk(data=pcm))

        try:
            async for raw in ws:
                if isinstance(raw, bytes):
                    # pipecat sometimes emits binary frames for audio; treat
                    # as raw µ-law payload if we see one.
                    payload = raw
                else:
                    frame = parse_media_stream_frame(raw)
                    if frame is None:
                        continue
                    if frame.event == "stop":
                        break
                    if frame.event != "media" or not frame.payload_mulaw:
                        continue
                    payload = frame.payload_mulaw

                buffered_mulaw.extend(payload)
                if len(buffered_mulaw) >= batch_bytes:
                    await flush()

            # Stream finished without an error: emit the trailing partial batch.
            await flush()
        except asyncio.CancelledError:
            raise
        except Exception:
            logger.warning("PipecatAgentAdapter: recv loop exited with error", exc_info=True)

    # ------------------------------------------------------------------ assertions

    def _assert_connected(self) -> None:
        """Raise ``RuntimeError`` unless ``connect()`` has set up a socket."""
        if self._ws is None:
            raise RuntimeError(
                "PipecatAgentAdapter: not connected. Did you forget to call connect()?"
            )

Ancestors

Class variables

var capabilities : ClassVar[AdapterCapabilities]

Instance variables

var transport_format : str
Expand source code
@property
def transport_format(self) -> str:
    """Return the format label ``"<audio_format>/<sample_rate>"``."""
    return "{}/{}".format(self.audio_format, self.sample_rate)

Methods

async def interrupt(self) ‑> None

Send a Twilio clear frame — the bot drops all buffered outbound audio immediately. Cooperating Pipecat bots (and any code wired to the Media Streams protocol) treat clear as "stop talking now." Use this in preference to timing-based barge-in when the SUT supports it: it's deterministic, doesn't depend on VAD detection windows, and matches the same protocol used in production.

Expand source code
async def interrupt(self) -> None:
    """Tell the bot to stop talking by sending a Twilio ``clear`` frame.

    The bot drops all buffered outbound audio immediately. Cooperating
    Pipecat bots (and any code wired to the Media Streams protocol) treat
    ``clear`` as "stop talking now." Prefer this over timing-based barge-in
    when the SUT supports it: it's deterministic, doesn't depend on VAD
    detection windows, and matches the same protocol used in production.

    Raises:
        RuntimeError: If the adapter is not connected.
    """
    self._assert_connected()
    socket, sid = self._ws, self.stream_sid
    assert socket is not None and sid is not None
    clear_frame = build_clear_frame(sid)
    await socket.send(clear_frame)
    logger.debug("PipecatAgentAdapter: sent clear frame (interrupt)")

Inherited members