Module scenario.voice.adapters.openai_realtime

OpenAIRealtimeAgentAdapter: direct-to-model adapter — the model IS the agent.

Source §5.6 + §7.2 L1164-1171. Unlike the other adapters, which wrap a user's running agent, this one IS the agent under test (or, when role=AgentRole.USER, the voice-enabled user simulator).

Wire protocol (GA, post-2026-05-12):

- Endpoint: wss://api.openai.com/v1/realtime?model=<model>
- Headers: Authorization: Bearer <api_key> only (no OpenAI-Beta header).
- On connect: emit session.update to configure session type, audio formats, voice, instructions, and tools. session.type="realtime"; audio config is nested under session.audio.{input,output} with object format descriptors (e.g. {"type": "audio/pcm", "rate": 24000}).
- Send audio: input_audio_buffer.append with base64-encoded PCM16.
- Receive audio: loop over server events until response.output_audio.delta (GA name); the legacy response.audio.delta is accepted defensively with a one-time warning. Return decoded PCM16.
- Transcript events: response.output_audio_transcript.delta / response.output_audio_transcript.done update instance attributes.
- Send text (role=USER): conversation.item.create (input_text), then response.create.

Expand source code
"""
OpenAIRealtimeAgentAdapter: direct-to-model adapter — the model IS the agent.

Source §5.6 + §7.2 L1164-1171. Unlike the other adapters which wrap a user's
running agent, this one IS the agent under test (or, when
``role=AgentRole.USER``, the voice-enabled user simulator).

Wire protocol (GA, post-2026-05-12):
- Endpoint: ``wss://api.openai.com/v1/realtime?model=<model>``
- Headers: ``Authorization: Bearer <api_key>`` only (no ``OpenAI-Beta`` header).
- On connect: emit ``session.update`` to configure session type, audio formats,
  voice, instructions, and tools. ``session.type="realtime"``; audio config
  nested under ``session.audio.{input,output}`` with object format descriptors
  (e.g. ``{"type": "audio/pcm", "rate": 24000}``).
- Send audio: ``input_audio_buffer.append`` with base64-encoded PCM16.
- Receive audio: loop over server events until ``response.output_audio.delta``
  (GA name); legacy ``response.audio.delta`` accepted defensively with a
  one-time warning. Return decoded PCM16.
- Transcript events: ``response.output_audio_transcript.delta`` /
  ``response.output_audio_transcript.done`` update instance attributes.
- Send text (role=USER): ``conversation.item.create`` (input_text) then
  ``response.create``.
"""

from __future__ import annotations

import asyncio
import base64
import json
import logging
import os
from typing import Any, ClassVar, List, Optional

from ...config.voice_models import OPENAI_REALTIME_MODEL, OPENAI_STT_MODEL
from ...types import AgentRole
from ..adapter import VoiceAgentAdapter
from ..audio_chunk import AudioChunk
from ..capabilities import AdapterCapabilities


logger = logging.getLogger("scenario.voice.openai_realtime")

REALTIME_URL_TEMPLATE = "wss://api.openai.com/v1/realtime?model={model}"


class OpenAIRealtimeAgentAdapter(VoiceAgentAdapter):
    """
    Exercise OpenAI's Realtime API as either the agent under test
    (role=AGENT, default) or as the voice-enabled user simulator
    (role=USER, per §7.2 L1164-1171).

    When role=USER, scripted ``user("text")`` steps route text through the
    realtime session's text-input channel rather than triggering TTS.

    Transcript observability (updated only while ``recv_audio`` drains
    server events):
        - ``last_user_transcript`` — set from
          ``conversation.item.input_audio_transcription.completed``
        - ``last_agent_transcript`` — set on
          ``response.output_audio_transcript.done``, from the event's
          ``transcript`` field or, if that is empty, from the text
          accumulated across ``response.output_audio_transcript.delta``

    Example::

        adapter = OpenAIRealtimeAgentAdapter(
            model=OPENAI_REALTIME_MODEL,
            voice="alloy",
            instructions="You are a helpful assistant.",
        )
        async with adapter:
            # scenario.run() feeds send_audio / recv_audio ...
    """

    capabilities: ClassVar[AdapterCapabilities] = AdapterCapabilities(
        streaming_transcripts=True,
        # NOTE(review): connect() disables server-side VAD
        # (turn_detection=None) and commits turns explicitly; confirm that
        # native_vad=True describes the backend rather than this session.
        native_vad=True,
        dtmf=False,
        # OpenAI Realtime exposes ``response.cancel`` as a first-class
        # interrupt event — the model stops generating immediately. Mapped
        # below in ``interrupt()``.
        interruption=True,
        input_formats=["pcm16/24000"],
        output_formats=["pcm16/24000"],
    )

    # Server ``error`` codes that recv_audio logs and skips instead of
    # raising. ``response_cancel_not_active`` is returned when interrupt()
    # sends ``response.cancel`` after the response has already finished;
    # that race is harmless and must not abort the next turn.
    _BENIGN_ERROR_CODES: ClassVar[frozenset[str]] = frozenset(
        {"response_cancel_not_active"}
    )

    def __init__(
        self,
        model: str = OPENAI_REALTIME_MODEL,
        voice: str = "alloy",
        instructions: str = "",
        tools: Optional[List[Any]] = None,
        *,
        api_key: Optional[str] = None,
        role: AgentRole = AgentRole.AGENT,
    ):
        """
        Args:
            model: Realtime model name, interpolated into the endpoint URL.
            voice: Output voice sent as ``session.audio.output.voice``.
            instructions: Session instructions; omitted from
                ``session.update`` when empty.
            tools: Tool definitions forwarded verbatim as ``session.tools``;
                omitted when empty.
            api_key: OpenAI API key; falls back to ``$OPENAI_API_KEY``.
            role: Whether this adapter plays the agent or the user simulator.
        """
        super().__init__()
        self.model = model
        self.voice = voice
        self.instructions = instructions
        self.tools = tools or []
        self.role = role  # type: ignore[misc]
        # Resolve API key: explicit param takes precedence over env var.
        self._api_key: str = api_key or os.environ.get("OPENAI_API_KEY", "")
        self._ws: Any = None

        # Transcript observability — updated on incoming transcript events.
        self.last_user_transcript: Optional[str] = None
        self.last_agent_transcript: Optional[str] = None

        # Accumulation buffer for streaming agent transcript deltas.
        self._agent_transcript_buf: str = ""

        # Bytes appended to input_audio_buffer since last commit. Non-zero
        # means recv_audio should commit + request a response before awaiting.
        self._pending_audio_bytes: int = 0

        # Tracks which legacy (pre-GA) event names have already triggered a
        # one-time warning, so the log isn't spammed on every audio frame.
        self._legacy_events_warned: set[str] = set()

    @property
    def url(self) -> str:
        """WebSocket endpoint for ``self.model``."""
        return REALTIME_URL_TEMPLATE.format(model=self.model)

    def __repr__(self) -> str:  # redact credentials
        return (
            f"OpenAIRealtimeAgentAdapter("
            f"model={self.model!r}, "
            f"voice={self.voice!r}, "
            f"role={self.role!r}, "
            f"api_key='***')"
        )

    def _warn_if_legacy(self, received: str, ga_name: str) -> None:
        """Emit a one-time WARNING when a pre-GA (beta) event name is seen.

        Only fires once per distinct legacy name per adapter instance, so a
        multi-chunk audio response doesn't flood the log. The GA-named event
        itself never triggers a warning.
        """
        if received != ga_name and received not in self._legacy_events_warned:
            self._legacy_events_warned.add(received)
            logger.warning(
                "OpenAIRealtimeAgentAdapter: received legacy event %r; "
                "GA uses %r — accepting defensively",
                received,
                ga_name,
            )

    # ------------------------------------------------------------------ lifecycle

    async def connect(self) -> None:
        """Open the Realtime WebSocket and send the initial ``session.update``.

        If the adapter is already connected, the previous socket is closed
        first instead of being orphaned.

        Raises:
            Any error from ``websockets.connect`` (e.g. a rejected handshake)
            or from sending ``session.update``. In the latter case the new
            socket is closed before the error propagates.
        """
        import websockets

        if self._ws is not None:
            # Reconnecting: release the old socket rather than leaking it.
            await self.disconnect()

        self._ws = await websockets.connect(
            self.url,
            additional_headers={
                "Authorization": f"Bearer {self._api_key}",
            },
        )
        logger.debug("OpenAIRealtimeAgentAdapter: connected to %s", self.url)

        # Configure session: audio formats, voice, instructions, tools.
        # Disable server-side VAD (session.audio.input.turn_detection=None) so
        # turn boundaries are explicit: send_audio only appends, and
        # recv_audio sends input_audio_buffer.commit + response.create before
        # awaiting the reply.
        session_config: dict[str, Any] = {
            "type": "realtime",
            "audio": {
                "input": {
                    "format": {"type": "audio/pcm", "rate": 24000},
                    "turn_detection": None,
                    "transcription": {"model": OPENAI_STT_MODEL},
                },
                "output": {
                    "format": {"type": "audio/pcm", "rate": 24000},
                    "voice": self.voice,
                },
            },
        }
        if self.instructions:
            session_config["instructions"] = self.instructions
        if self.tools:
            session_config["tools"] = self.tools

        try:
            await self._ws.send(
                json.dumps({"type": "session.update", "session": session_config})
            )
        except BaseException:
            # Don't leave a connected-but-unconfigured socket behind.
            await self.disconnect()
            raise
        logger.debug("OpenAIRealtimeAgentAdapter: session.update sent")

    async def disconnect(self) -> None:
        """Close the WebSocket if open; a no-op when not connected.

        Best-effort: errors from ``close()`` are logged at DEBUG and
        suppressed, and the socket reference is always cleared.
        """
        if self._ws is not None:
            try:
                await self._ws.close()
            except Exception:
                # The connection may already be half-closed or in an error
                # state. We're tearing down regardless, so record and move on.
                logger.debug(
                    "OpenAIRealtimeAgentAdapter: error while closing websocket "
                    "(ignored)",
                    exc_info=True,
                )
            finally:
                self._ws = None
            logger.debug("OpenAIRealtimeAgentAdapter: disconnected")

    # ------------------------------------------------------------------ I/O

    async def send_audio(self, chunk: AudioChunk) -> None:
        """
        Append a PCM16 audio chunk to the model's input audio buffer.

        Only emits ``input_audio_buffer.append`` — the commit + response are
        deferred to the next ``recv_audio`` call. The scenario executor may
        call ``send_audio`` many times for a single user turn (TTS streams
        audio as chunks); committing per-chunk would confuse the server with
        sub-second turn boundaries. By deferring commit to recv_audio, we
        get one server turn per user turn.

        Raises:
            RuntimeError: if called before ``connect()``.
        """
        if self._ws is None:
            raise RuntimeError("OpenAIRealtimeAgentAdapter: not connected")
        b64 = base64.b64encode(chunk.data).decode()
        await self._ws.send(
            json.dumps({"type": "input_audio_buffer.append", "audio": b64})
        )
        self._pending_audio_bytes += len(chunk.data)

    async def interrupt(self) -> None:
        """Send ``response.cancel`` — the OpenAI Realtime API's first-class
        interrupt. The model stops generating audio and text immediately.
        No timing race against VAD: deterministic stop, then the next user
        turn flows normally through ``send_audio`` + ``recv_audio``.

        If the response already finished, the server answers with an
        ``error`` event (code ``response_cancel_not_active``); ``recv_audio``
        logs and skips that error rather than raising.

        Raises:
            RuntimeError: if called before ``connect()``.
        """
        if self._ws is None:
            raise RuntimeError("OpenAIRealtimeAgentAdapter: not connected")
        await self._ws.send(json.dumps({"type": "response.cancel"}))
        logger.debug("OpenAIRealtimeAgentAdapter: sent response.cancel (interrupt)")

    async def recv_audio(self, timeout: float) -> AudioChunk:
        """
        Commit any pending audio, request a response, and return the first
        audio chunk the model produces.

        If ``send_audio`` was called since the last ``recv_audio``, this
        method commits the buffer and emits ``response.create`` before
        awaiting the reply. Subsequent recv calls without new send calls
        just await the next audio delta (for multi-chunk responses).

        Loops over incoming events until a ``response.output_audio.delta``
        event arrives (GA name), then returns decoded PCM16. The legacy
        ``response.audio.delta`` name is accepted defensively with a one-time
        warning. Transcript events update the instance's
        ``last_user_transcript`` / ``last_agent_transcript`` attributes.
        An ``error`` event raises a ``RuntimeError``, except for the benign
        codes in ``_BENIGN_ERROR_CODES``. Non-JSON frames, non-object JSON,
        and all other housekeeping events are skipped.

        Raises:
            asyncio.TimeoutError: if no audio arrives within ``timeout``.
            RuntimeError: if not connected, or the server sends a non-benign
                error event.
        """
        if self._ws is None:
            raise RuntimeError("OpenAIRealtimeAgentAdapter: not connected")

        # If send_audio was called since last recv, commit and request response.
        if self._pending_audio_bytes > 0:
            await self._ws.send(json.dumps({"type": "input_audio_buffer.commit"}))
            await self._ws.send(json.dumps({"type": "response.create"}))
            self._pending_audio_bytes = 0

        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout
        while True:
            # One overall deadline across all events, not a per-event timeout.
            remaining = deadline - loop.time()
            if remaining <= 0:
                raise asyncio.TimeoutError(
                    "OpenAIRealtimeAgentAdapter: recv_audio timed out"
                )

            raw = await asyncio.wait_for(self._ws.recv(), timeout=remaining)
            event = self._parse_event(raw)
            if event is None:
                continue
            chunk = self._handle_event(event)
            if chunk is not None:
                return chunk

    @staticmethod
    def _parse_event(raw: Any) -> Optional[dict[str, Any]]:
        """Decode one WebSocket frame into an event dict.

        Returns None (after a DEBUG log) for frames that are not JSON or
        whose JSON is not an object, so the receive loop can skip them.
        """
        try:
            # json.loads accepts both str and bytes frames.
            event = json.loads(raw)
        except (TypeError, ValueError):
            logger.debug(
                "OpenAIRealtimeAgentAdapter: non-JSON message, skipping"
            )
            return None
        if not isinstance(event, dict):
            logger.debug(
                "OpenAIRealtimeAgentAdapter: non-object JSON message, skipping"
            )
            return None
        return event

    def _handle_event(self, event: dict[str, Any]) -> Optional[AudioChunk]:
        """Apply one server event to adapter state.

        Returns the decoded PCM16 chunk for audio-delta events, and None for
        every other event.

        Raises:
            RuntimeError: on an ``error`` event whose code is not benign.
        """
        etype = event.get("type", "")

        if etype in ("response.output_audio.delta", "response.audio.delta"):
            # Accept both the GA event name and its retired beta alias —
            # live gpt-realtime* models have been observed still emitting
            # the beta names. These legacy arms should be removed once the
            # GA names are confirmed stable at a live endpoint (issue #602).
            self._warn_if_legacy(etype, "response.output_audio.delta")
            pcm = base64.b64decode(event.get("delta") or "")
            # Enforce PCM16 invariant: even byte count.
            if len(pcm) % 2 == 1:
                pcm = pcm[:-1]
            return AudioChunk(data=pcm)

        if etype in (
            "response.output_audio_transcript.delta",
            "response.audio_transcript.delta",
        ):
            # Accumulate streaming agent transcript.
            self._warn_if_legacy(etype, "response.output_audio_transcript.delta")
            self._agent_transcript_buf += event.get("delta") or ""

        elif etype in (
            "response.output_audio_transcript.done",
            "response.audio_transcript.done",
        ):
            # Finalise; prefer the event's full `transcript`, fall back to the
            # accumulated deltas, then reset the buffer for the next response.
            self._warn_if_legacy(etype, "response.output_audio_transcript.done")
            transcript = event.get("transcript", "")
            if transcript:
                self.last_agent_transcript = transcript
            elif self._agent_transcript_buf:
                self.last_agent_transcript = self._agent_transcript_buf
            self._agent_transcript_buf = ""

        elif etype == "conversation.item.input_audio_transcription.completed":
            # User-side transcript from the session's input transcription
            # model (OPENAI_STT_MODEL).
            self.last_user_transcript = event.get("transcript", "")

        elif etype == "error":
            error_detail = event.get("error", {})
            if not isinstance(error_detail, dict):
                # Defensive: tolerate a non-object `error` payload instead of
                # crashing with AttributeError on .get().
                error_detail = {"message": str(error_detail)}
            if error_detail.get("code") in self._BENIGN_ERROR_CODES:
                logger.debug(
                    "OpenAIRealtimeAgentAdapter: ignoring benign server error %r: %s",
                    error_detail.get("code"),
                    error_detail.get("message", ""),
                )
                return None
            msg = error_detail.get("message", str(error_detail))
            raise RuntimeError(
                f"OpenAIRealtimeAgentAdapter: server error — {msg}"
            )

        else:
            # Housekeeping events — session.created, session.updated,
            # response.created, response.output_item.added, etc. — are
            # benign. Log at DEBUG and keep the loop running.
            logger.debug(
                "OpenAIRealtimeAgentAdapter: ignoring event type %r", etype
            )
        return None

    async def send_text(self, text: str) -> None:
        """
        Inject scripted text into the realtime session as a user message.

        Used when this adapter is the user simulator (role=USER): scripted
        ``user("text")`` steps route through here instead of spawning TTS.
        Sends a ``conversation.item.create`` user message followed by
        ``response.create``; the model's spoken output is then delivered via
        ``recv_audio``.

        NOTE: per §7.2, OpenAI Realtime cannot populate assistant audio
        messages retroactively; the downstream transcript reflects what the
        model actually emitted, not what was scripted.

        Raises:
            RuntimeError: if called before ``connect()``.
        """
        if self._ws is None:
            raise RuntimeError("OpenAIRealtimeAgentAdapter: not connected")

        # Create a user conversation item with the scripted text.
        await self._ws.send(
            json.dumps(
                {
                    "type": "conversation.item.create",
                    "item": {
                        "type": "message",
                        "role": "user",
                        "content": [{"type": "input_text", "text": text}],
                    },
                }
            )
        )
        # Prompt the model to generate audio output.
        await self._ws.send(json.dumps({"type": "response.create"}))
        logger.debug(
            "OpenAIRealtimeAgentAdapter: send_text injected %r", text[:60]
        )

Classes

class OpenAIRealtimeAgentAdapter (model: str = 'gpt-realtime-mini', voice: str = 'alloy', instructions: str = '', tools: Optional[List[Any]] = None, *, api_key: Optional[str] = None, role: AgentRole = AgentRole.AGENT)

Exercise OpenAI's Realtime API as either the agent under test (role=AGENT, default) or as the voice-enabled user simulator (role=USER, per §7.2 L1164-1171).

When role=USER, scripted user("text") steps route text through the realtime session's text-input channel rather than triggering TTS.

Transcript observability:

- last_user_transcript — set from conversation.item.input_audio_transcription.completed
- last_agent_transcript — set on response.output_audio_transcript.done, from its transcript field or, if that is empty, from the text accumulated across response.output_audio_transcript.delta events

Example::

adapter = OpenAIRealtimeAgentAdapter(
    model=OPENAI_REALTIME_MODEL,
    voice="alloy",
    instructions="You are a helpful assistant.",
)
async with adapter:
    # scenario.run() feeds send_audio / recv_audio ...
Expand source code
class OpenAIRealtimeAgentAdapter(VoiceAgentAdapter):
    """
    Exercise OpenAI's Realtime API as either the agent under test
    (role=AGENT, default) or as the voice-enabled user simulator
    (role=USER, per §7.2 L1164-1171).

    When role=USER, scripted ``user("text")`` steps route text through the
    realtime session's text-input channel rather than triggering TTS.

    Transcript observability (updated only while ``recv_audio`` drains
    server events):
        - ``last_user_transcript`` — set from
          ``conversation.item.input_audio_transcription.completed``
        - ``last_agent_transcript`` — set on
          ``response.output_audio_transcript.done``, from the event's
          ``transcript`` field or, if that is empty, from the text
          accumulated across ``response.output_audio_transcript.delta``

    Example::

        adapter = OpenAIRealtimeAgentAdapter(
            model=OPENAI_REALTIME_MODEL,
            voice="alloy",
            instructions="You are a helpful assistant.",
        )
        async with adapter:
            # scenario.run() feeds send_audio / recv_audio ...
    """

    capabilities: ClassVar[AdapterCapabilities] = AdapterCapabilities(
        streaming_transcripts=True,
        # NOTE(review): connect() disables server-side VAD
        # (turn_detection=None) and commits turns explicitly; confirm that
        # native_vad=True describes the backend rather than this session.
        native_vad=True,
        dtmf=False,
        # OpenAI Realtime exposes ``response.cancel`` as a first-class
        # interrupt event — the model stops generating immediately. Mapped
        # below in ``interrupt()``.
        interruption=True,
        input_formats=["pcm16/24000"],
        output_formats=["pcm16/24000"],
    )

    # Server ``error`` codes that recv_audio logs and skips instead of
    # raising. ``response_cancel_not_active`` is returned when interrupt()
    # sends ``response.cancel`` after the response has already finished;
    # that race is harmless and must not abort the next turn.
    _BENIGN_ERROR_CODES: ClassVar[frozenset[str]] = frozenset(
        {"response_cancel_not_active"}
    )

    def __init__(
        self,
        model: str = OPENAI_REALTIME_MODEL,
        voice: str = "alloy",
        instructions: str = "",
        tools: Optional[List[Any]] = None,
        *,
        api_key: Optional[str] = None,
        role: AgentRole = AgentRole.AGENT,
    ):
        """
        Args:
            model: Realtime model name, interpolated into the endpoint URL.
            voice: Output voice sent as ``session.audio.output.voice``.
            instructions: Session instructions; omitted from
                ``session.update`` when empty.
            tools: Tool definitions forwarded verbatim as ``session.tools``;
                omitted when empty.
            api_key: OpenAI API key; falls back to ``$OPENAI_API_KEY``.
            role: Whether this adapter plays the agent or the user simulator.
        """
        super().__init__()
        self.model = model
        self.voice = voice
        self.instructions = instructions
        self.tools = tools or []
        self.role = role  # type: ignore[misc]
        # Resolve API key: explicit param takes precedence over env var.
        self._api_key: str = api_key or os.environ.get("OPENAI_API_KEY", "")
        self._ws: Any = None

        # Transcript observability — updated on incoming transcript events.
        self.last_user_transcript: Optional[str] = None
        self.last_agent_transcript: Optional[str] = None

        # Accumulation buffer for streaming agent transcript deltas.
        self._agent_transcript_buf: str = ""

        # Bytes appended to input_audio_buffer since last commit. Non-zero
        # means recv_audio should commit + request a response before awaiting.
        self._pending_audio_bytes: int = 0

        # Tracks which legacy (pre-GA) event names have already triggered a
        # one-time warning, so the log isn't spammed on every audio frame.
        self._legacy_events_warned: set[str] = set()

    @property
    def url(self) -> str:
        """WebSocket endpoint for ``self.model``."""
        return REALTIME_URL_TEMPLATE.format(model=self.model)

    def __repr__(self) -> str:  # redact credentials
        return (
            f"OpenAIRealtimeAgentAdapter("
            f"model={self.model!r}, "
            f"voice={self.voice!r}, "
            f"role={self.role!r}, "
            f"api_key='***')"
        )

    def _warn_if_legacy(self, received: str, ga_name: str) -> None:
        """Emit a one-time WARNING when a pre-GA (beta) event name is seen.

        Only fires once per distinct legacy name per adapter instance, so a
        multi-chunk audio response doesn't flood the log. The GA-named event
        itself never triggers a warning.
        """
        if received != ga_name and received not in self._legacy_events_warned:
            self._legacy_events_warned.add(received)
            logger.warning(
                "OpenAIRealtimeAgentAdapter: received legacy event %r; "
                "GA uses %r — accepting defensively",
                received,
                ga_name,
            )

    # ------------------------------------------------------------------ lifecycle

    async def connect(self) -> None:
        """Open the Realtime WebSocket and send the initial ``session.update``.

        If the adapter is already connected, the previous socket is closed
        first instead of being orphaned.

        Raises:
            Any error from ``websockets.connect`` (e.g. a rejected handshake)
            or from sending ``session.update``. In the latter case the new
            socket is closed before the error propagates.
        """
        import websockets

        if self._ws is not None:
            # Reconnecting: release the old socket rather than leaking it.
            await self.disconnect()

        self._ws = await websockets.connect(
            self.url,
            additional_headers={
                "Authorization": f"Bearer {self._api_key}",
            },
        )
        logger.debug("OpenAIRealtimeAgentAdapter: connected to %s", self.url)

        # Configure session: audio formats, voice, instructions, tools.
        # Disable server-side VAD (session.audio.input.turn_detection=None) so
        # turn boundaries are explicit: send_audio only appends, and
        # recv_audio sends input_audio_buffer.commit + response.create before
        # awaiting the reply.
        session_config: dict[str, Any] = {
            "type": "realtime",
            "audio": {
                "input": {
                    "format": {"type": "audio/pcm", "rate": 24000},
                    "turn_detection": None,
                    "transcription": {"model": OPENAI_STT_MODEL},
                },
                "output": {
                    "format": {"type": "audio/pcm", "rate": 24000},
                    "voice": self.voice,
                },
            },
        }
        if self.instructions:
            session_config["instructions"] = self.instructions
        if self.tools:
            session_config["tools"] = self.tools

        try:
            await self._ws.send(
                json.dumps({"type": "session.update", "session": session_config})
            )
        except BaseException:
            # Don't leave a connected-but-unconfigured socket behind.
            await self.disconnect()
            raise
        logger.debug("OpenAIRealtimeAgentAdapter: session.update sent")

    async def disconnect(self) -> None:
        """Close the WebSocket if open; a no-op when not connected.

        Best-effort: errors from ``close()`` are logged at DEBUG and
        suppressed, and the socket reference is always cleared.
        """
        if self._ws is not None:
            try:
                await self._ws.close()
            except Exception:
                # The connection may already be half-closed or in an error
                # state. We're tearing down regardless, so record and move on.
                logger.debug(
                    "OpenAIRealtimeAgentAdapter: error while closing websocket "
                    "(ignored)",
                    exc_info=True,
                )
            finally:
                self._ws = None
            logger.debug("OpenAIRealtimeAgentAdapter: disconnected")

    # ------------------------------------------------------------------ I/O

    async def send_audio(self, chunk: AudioChunk) -> None:
        """
        Append a PCM16 audio chunk to the model's input audio buffer.

        Only emits ``input_audio_buffer.append`` — the commit + response are
        deferred to the next ``recv_audio`` call. The scenario executor may
        call ``send_audio`` many times for a single user turn (TTS streams
        audio as chunks); committing per-chunk would confuse the server with
        sub-second turn boundaries. By deferring commit to recv_audio, we
        get one server turn per user turn.

        Raises:
            RuntimeError: if called before ``connect()``.
        """
        if self._ws is None:
            raise RuntimeError("OpenAIRealtimeAgentAdapter: not connected")
        b64 = base64.b64encode(chunk.data).decode()
        await self._ws.send(
            json.dumps({"type": "input_audio_buffer.append", "audio": b64})
        )
        self._pending_audio_bytes += len(chunk.data)

    async def interrupt(self) -> None:
        """Send ``response.cancel`` — the OpenAI Realtime API's first-class
        interrupt. The model stops generating audio and text immediately.
        No timing race against VAD: deterministic stop, then the next user
        turn flows normally through ``send_audio`` + ``recv_audio``.

        If the response already finished, the server answers with an
        ``error`` event (code ``response_cancel_not_active``); ``recv_audio``
        logs and skips that error rather than raising.

        Raises:
            RuntimeError: if called before ``connect()``.
        """
        if self._ws is None:
            raise RuntimeError("OpenAIRealtimeAgentAdapter: not connected")
        await self._ws.send(json.dumps({"type": "response.cancel"}))
        logger.debug("OpenAIRealtimeAgentAdapter: sent response.cancel (interrupt)")

    async def recv_audio(self, timeout: float) -> AudioChunk:
        """
        Commit any pending audio, request a response, and return the first
        audio chunk the model produces.

        If ``send_audio`` was called since the last ``recv_audio``, this
        method commits the buffer and emits ``response.create`` before
        awaiting the reply. Subsequent recv calls without new send calls
        just await the next audio delta (for multi-chunk responses).

        Loops over incoming events until a ``response.output_audio.delta``
        event arrives (GA name), then returns decoded PCM16. The legacy
        ``response.audio.delta`` name is accepted defensively with a one-time
        warning. Transcript events update the instance's
        ``last_user_transcript`` / ``last_agent_transcript`` attributes.
        An ``error`` event raises a ``RuntimeError``, except for the benign
        codes in ``_BENIGN_ERROR_CODES``. Non-JSON frames, non-object JSON,
        and all other housekeeping events are skipped.

        Raises:
            asyncio.TimeoutError: if no audio arrives within ``timeout``.
            RuntimeError: if not connected, or the server sends a non-benign
                error event.
        """
        if self._ws is None:
            raise RuntimeError("OpenAIRealtimeAgentAdapter: not connected")

        # If send_audio was called since last recv, commit and request response.
        if self._pending_audio_bytes > 0:
            await self._ws.send(json.dumps({"type": "input_audio_buffer.commit"}))
            await self._ws.send(json.dumps({"type": "response.create"}))
            self._pending_audio_bytes = 0

        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout
        while True:
            # One overall deadline across all events, not a per-event timeout.
            remaining = deadline - loop.time()
            if remaining <= 0:
                raise asyncio.TimeoutError(
                    "OpenAIRealtimeAgentAdapter: recv_audio timed out"
                )

            raw = await asyncio.wait_for(self._ws.recv(), timeout=remaining)
            event = self._parse_event(raw)
            if event is None:
                continue
            chunk = self._handle_event(event)
            if chunk is not None:
                return chunk

    @staticmethod
    def _parse_event(raw: Any) -> Optional[dict[str, Any]]:
        """Decode one WebSocket frame into an event dict.

        Returns None (after a DEBUG log) for frames that are not JSON or
        whose JSON is not an object, so the receive loop can skip them.
        """
        try:
            # json.loads accepts both str and bytes frames.
            event = json.loads(raw)
        except (TypeError, ValueError):
            logger.debug(
                "OpenAIRealtimeAgentAdapter: non-JSON message, skipping"
            )
            return None
        if not isinstance(event, dict):
            logger.debug(
                "OpenAIRealtimeAgentAdapter: non-object JSON message, skipping"
            )
            return None
        return event

    def _handle_event(self, event: dict[str, Any]) -> Optional[AudioChunk]:
        """Apply one server event to adapter state.

        Returns the decoded PCM16 chunk for audio-delta events, and None for
        every other event.

        Raises:
            RuntimeError: on an ``error`` event whose code is not benign.
        """
        etype = event.get("type", "")

        if etype in ("response.output_audio.delta", "response.audio.delta"):
            # Accept both the GA event name and its retired beta alias —
            # live gpt-realtime* models have been observed still emitting
            # the beta names. These legacy arms should be removed once the
            # GA names are confirmed stable at a live endpoint (issue #602).
            self._warn_if_legacy(etype, "response.output_audio.delta")
            pcm = base64.b64decode(event.get("delta") or "")
            # Enforce PCM16 invariant: even byte count.
            if len(pcm) % 2 == 1:
                pcm = pcm[:-1]
            return AudioChunk(data=pcm)

        if etype in (
            "response.output_audio_transcript.delta",
            "response.audio_transcript.delta",
        ):
            # Accumulate streaming agent transcript.
            self._warn_if_legacy(etype, "response.output_audio_transcript.delta")
            self._agent_transcript_buf += event.get("delta") or ""

        elif etype in (
            "response.output_audio_transcript.done",
            "response.audio_transcript.done",
        ):
            # Finalise; prefer the event's full `transcript`, fall back to the
            # accumulated deltas, then reset the buffer for the next response.
            self._warn_if_legacy(etype, "response.output_audio_transcript.done")
            transcript = event.get("transcript", "")
            if transcript:
                self.last_agent_transcript = transcript
            elif self._agent_transcript_buf:
                self.last_agent_transcript = self._agent_transcript_buf
            self._agent_transcript_buf = ""

        elif etype == "conversation.item.input_audio_transcription.completed":
            # User-side transcript from the session's input transcription
            # model (OPENAI_STT_MODEL).
            self.last_user_transcript = event.get("transcript", "")

        elif etype == "error":
            error_detail = event.get("error", {})
            if not isinstance(error_detail, dict):
                # Defensive: tolerate a non-object `error` payload instead of
                # crashing with AttributeError on .get().
                error_detail = {"message": str(error_detail)}
            if error_detail.get("code") in self._BENIGN_ERROR_CODES:
                logger.debug(
                    "OpenAIRealtimeAgentAdapter: ignoring benign server error %r: %s",
                    error_detail.get("code"),
                    error_detail.get("message", ""),
                )
                return None
            msg = error_detail.get("message", str(error_detail))
            raise RuntimeError(
                f"OpenAIRealtimeAgentAdapter: server error — {msg}"
            )

        else:
            # Housekeeping events — session.created, session.updated,
            # response.created, response.output_item.added, etc. — are
            # benign. Log at DEBUG and keep the loop running.
            logger.debug(
                "OpenAIRealtimeAgentAdapter: ignoring event type %r", etype
            )
        return None

    async def send_text(self, text: str) -> None:
        """
        Inject scripted text into the realtime session as a user message.

        Used when this adapter is the user simulator (role=USER): scripted
        ``user("text")`` steps route through here instead of spawning TTS.
        Sends a ``conversation.item.create`` user message followed by
        ``response.create``; the model's spoken output is then delivered via
        ``recv_audio``.

        NOTE: per §7.2, OpenAI Realtime cannot populate assistant audio
        messages retroactively; the downstream transcript reflects what the
        model actually emitted, not what was scripted.

        Raises:
            RuntimeError: if called before ``connect()``.
        """
        if self._ws is None:
            raise RuntimeError("OpenAIRealtimeAgentAdapter: not connected")

        # Create a user conversation item with the scripted text.
        await self._ws.send(
            json.dumps(
                {
                    "type": "conversation.item.create",
                    "item": {
                        "type": "message",
                        "role": "user",
                        "content": [{"type": "input_text", "text": text}],
                    },
                }
            )
        )
        # Prompt the model to generate audio output.
        await self._ws.send(json.dumps({"type": "response.create"}))
        logger.debug(
            "OpenAIRealtimeAgentAdapter: send_text injected %r", text[:60]
        )

Ancestors

- VoiceAgentAdapter

Class variables

var capabilities : ClassVar[AdapterCapabilities]

Instance variables

var url : str
Expand source code
@property
def url(self) -> str:
    """Realtime WebSocket endpoint for ``self.model``.

    Built by formatting ``REALTIME_URL_TEMPLATE`` with the model name.
    """
    endpoint = REALTIME_URL_TEMPLATE.format(model=self.model)
    return endpoint

Methods

async def connect(self) -> None

Open the Realtime WebSocket and send the initial session.update.

Expand source code
async def connect(self) -> None:
    """
    Open the Realtime WebSocket and send the initial ``session.update``.

    The session is configured for PCM16 at 24000 Hz on both input and
    output, with ``self.voice`` and an input transcription model.
    Server-side VAD is disabled (``session.audio.input.turn_detection``
    is ``None``) so turn boundaries are controlled explicitly via
    ``input_audio_buffer.commit`` + ``response.create`` after each
    ``send_audio``. ``instructions`` and ``tools`` are included only when
    they are set (truthy).

    If building or sending ``session.update`` fails (e.g. ``tools`` is not
    JSON-serialisable, or the socket is already closed), the freshly
    opened socket is closed on a best-effort basis and ``self._ws`` is
    reset to ``None`` before the error propagates. The adapter therefore
    never reports itself as connected with an unconfigured session.
    """
    import websockets

    self._ws = await websockets.connect(
        self.url,
        additional_headers={
            "Authorization": f"Bearer {self._api_key}",
        },
    )
    logger.debug("OpenAIRealtimeAgentAdapter: connected to %s", self.url)

    # Configure session: audio formats, voice, instructions, tools.
    # Disable server-side VAD (session.audio.input.turn_detection=None) so
    # we control turn boundaries explicitly via input_audio_buffer.commit +
    # response.create after each send_audio.
    session_config: dict[str, Any] = {
        "type": "realtime",
        "audio": {
            "input": {
                "format": {"type": "audio/pcm", "rate": 24000},
                "turn_detection": None,
                "transcription": {"model": OPENAI_STT_MODEL},
            },
            "output": {
                "format": {"type": "audio/pcm", "rate": 24000},
                "voice": self.voice,
            },
        },
    }
    if self.instructions:
        session_config["instructions"] = self.instructions
    if self.tools:
        session_config["tools"] = self.tools

    try:
        await self._ws.send(
            json.dumps({"type": "session.update", "session": session_config})
        )
    except Exception:
        # Don't leave a half-initialised socket behind: with _ws still set,
        # later send/recv calls would talk to an unconfigured session.
        ws, self._ws = self._ws, None
        try:
            await ws.close()
        except Exception:
            logger.debug(
                "OpenAIRealtimeAgentAdapter: error closing socket after "
                "failed session.update (ignored)",
                exc_info=True,
            )
        raise
    logger.debug("OpenAIRealtimeAgentAdapter: session.update sent")
async def disconnect(self) -> None

Close the WebSocket if open.

Expand source code
async def disconnect(self) -> None:
    """
    Close the WebSocket if one is open; a no-op when not connected.

    Closing is best-effort: the connection may already be half-closed or
    in an error state at teardown, so an exception from ``close()`` is
    logged at DEBUG (with traceback) rather than raised. ``self._ws`` is
    always reset to ``None``, so the adapter reports "not connected"
    afterwards.
    """
    ws = self._ws
    if ws is None:
        return
    # Drop the reference before awaiting close(): concurrent callers then
    # get "not connected" instead of sending on a closing socket, and a
    # second concurrent disconnect() doesn't close the same socket twice.
    self._ws = None
    try:
        await ws.close()
    except Exception:
        logger.debug(
            "OpenAIRealtimeAgentAdapter: error while closing websocket (ignored)",
            exc_info=True,
        )
    logger.debug("OpenAIRealtimeAgentAdapter: disconnected")
async def interrupt(self) -> None

Send response.cancel — the OpenAI Realtime API's first-class interrupt. The model stops generating audio and text immediately. No timing race against VAD: deterministic stop, then the next user turn flows normally through send_audio + recv_audio.

Expand source code
async def interrupt(self) -> None:
    """
    Cancel the in-flight model response by sending ``response.cancel``.

    ``response.cancel`` is the OpenAI Realtime API's first-class interrupt:
    the model stops generating audio and text immediately. There is no
    timing race against VAD; the stop is deterministic, and the next user
    turn proceeds normally through ``send_audio`` + ``recv_audio``.

    Raises:
        RuntimeError: if called before ``connect()``.
    """
    if self._ws is not None:
        cancel_event = json.dumps({"type": "response.cancel"})
        await self._ws.send(cancel_event)
        logger.debug("OpenAIRealtimeAgentAdapter: sent response.cancel (interrupt)")
        return
    raise RuntimeError("OpenAIRealtimeAgentAdapter: not connected")
async def recv_audio(self, timeout: float) -> AudioChunk

Commit any pending audio, request a response, and return the first audio chunk the model produces.

If send_audio was called since the last recv_audio, this method commits the buffer and emits response.create before awaiting the reply. Subsequent recv calls without new send calls just await the next audio delta (for multi-chunk responses).

Loops over incoming events until a response.output_audio.delta event arrives (GA name), then returns decoded PCM16. The legacy response.audio.delta name is accepted defensively with a one-time warning. Transcript events update the instance's last_user_transcript / last_agent_transcript attributes. An error event raises a RuntimeError. All other housekeeping events are ignored and the loop continues.

Raises

asyncio.TimeoutError
if no audio arrives within timeout.
RuntimeError
if the server sends an error event.
Expand source code
async def recv_audio(self, timeout: float) -> AudioChunk:
    """
    Commit any pending audio, request a response, and return the first
    audio chunk the model produces.

    If ``send_audio`` was called since the last ``recv_audio``, this
    method commits the buffer and emits ``response.create`` before
    awaiting the reply. Subsequent recv calls without new send calls
    just await the next audio delta (for multi-chunk responses).

    Loops over incoming events until a ``response.output_audio.delta``
    event arrives (GA name), then returns decoded PCM16 (truncated to an
    even byte count). The legacy ``response.audio.delta`` name is accepted
    defensively with a one-time warning. Transcript events update the
    instance's ``last_user_transcript`` / ``last_agent_transcript``
    attributes. An ``error`` event raises a ``RuntimeError``. Messages that
    are not JSON objects, and all other housekeeping events, are logged at
    DEBUG and skipped.

    Args:
        timeout: Overall budget in seconds for this call, shared across
            every event read while waiting for audio.

    Raises:
        RuntimeError: if not connected, or if the server sends an
            ``error`` event.
        asyncio.TimeoutError: if no audio arrives within ``timeout``.
    """
    if self._ws is None:
        raise RuntimeError("OpenAIRealtimeAgentAdapter: not connected")

    # If send_audio was called since last recv, commit and request response.
    if self._pending_audio_bytes > 0:
        await self._ws.send(json.dumps({"type": "input_audio_buffer.commit"}))
        await self._ws.send(json.dumps({"type": "response.create"}))
        self._pending_audio_bytes = 0

    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        remaining = deadline - loop.time()
        if remaining <= 0:
            raise asyncio.TimeoutError(
                "OpenAIRealtimeAgentAdapter: recv_audio timed out"
            )

        raw = await asyncio.wait_for(self._ws.recv(), timeout=remaining)
        try:
            event = json.loads(raw if isinstance(raw, str) else raw.decode())
        except ValueError:
            # json.JSONDecodeError and UnicodeDecodeError both subclass
            # ValueError; anything else is a real bug and should surface.
            logger.debug(
                "OpenAIRealtimeAgentAdapter: non-JSON message, skipping"
            )
            continue
        if not isinstance(event, dict):
            # Valid JSON but not an event object (list, string, number):
            # there is no "type" to dispatch on.
            logger.debug(
                "OpenAIRealtimeAgentAdapter: non-object JSON message, skipping"
            )
            continue

        etype = event.get("type", "")

        if etype in ("response.output_audio.delta", "response.audio.delta"):
            # Accept both the GA event name and its retired beta alias —
            # live gpt-realtime* models have been observed still emitting
            # the beta names. These legacy arms should be removed once the
            # GA names are confirmed stable at a live endpoint (issue #602).
            self._warn_if_legacy(etype, "response.output_audio.delta")
            pcm = base64.b64decode(event.get("delta") or "")
            # Enforce PCM16 invariant: even byte count.
            if len(pcm) % 2 == 1:
                pcm = pcm[:-1]
            return AudioChunk(data=pcm)

        elif etype in (
            "response.output_audio_transcript.delta",
            "response.audio_transcript.delta",
        ):
            # Accumulate streaming agent transcript.
            self._warn_if_legacy(etype, "response.output_audio_transcript.delta")
            self._agent_transcript_buf += event.get("delta") or ""

        elif etype in (
            "response.output_audio_transcript.done",
            "response.audio_transcript.done",
        ):
            # Finalise; the `transcript` field may have the full text,
            # otherwise fall back to what the deltas accumulated.
            self._warn_if_legacy(etype, "response.output_audio_transcript.done")
            transcript = event.get("transcript", "")
            if transcript:
                self.last_agent_transcript = transcript
            elif self._agent_transcript_buf:
                self.last_agent_transcript = self._agent_transcript_buf
            self._agent_transcript_buf = ""

        elif etype == "conversation.item.input_audio_transcription.completed":
            # User-side transcript of the committed input audio; a null
            # transcript is normalised to "" so the attribute stays a str.
            self.last_user_transcript = event.get("transcript") or ""

        elif etype == "error":
            error_detail = event.get("error", {})
            # Normally an object with a "message"; tolerate other shapes so
            # the caller still gets the documented RuntimeError.
            if isinstance(error_detail, dict):
                msg = error_detail.get("message", str(error_detail))
            else:
                msg = str(error_detail)
            raise RuntimeError(
                f"OpenAIRealtimeAgentAdapter: server error — {msg}"
            )

        else:
            # Housekeeping events — session.created, session.updated,
            # response.created, response.output_item.added, etc. — are
            # benign. Log at DEBUG and keep the loop running.
            logger.debug(
                "OpenAIRealtimeAgentAdapter: ignoring event type %r", etype
            )
async def send_audio(self, chunk: AudioChunk) -> None

Append a PCM16 audio chunk to the model's input audio buffer.

Only emits input_audio_buffer.append — the commit + response are deferred to the next recv_audio call. The scenario executor may call send_audio many times for a single user turn (TTS streams audio as chunks); committing per-chunk would confuse the server with sub-second turn boundaries. By deferring commit to recv_audio, we get one server turn per user turn.

Expand source code
async def send_audio(self, chunk: AudioChunk) -> None:
    """
    Stream one PCM16 chunk into the model's input audio buffer.

    Only ``input_audio_buffer.append`` is sent here; committing the buffer
    and requesting a response are left to the next ``recv_audio`` call.
    The scenario executor may deliver a single user turn as many chunks
    (TTS streams its output), and committing each chunk would present the
    server with a series of sub-second turns. Deferring the commit yields
    one server turn per user turn.

    Raises:
        RuntimeError: if called before ``connect()``.
    """
    if self._ws is None:
        raise RuntimeError("OpenAIRealtimeAgentAdapter: not connected")
    payload = chunk.data
    append_event = {
        "type": "input_audio_buffer.append",
        "audio": base64.b64encode(payload).decode(),
    }
    await self._ws.send(json.dumps(append_event))
    # Non-zero signals recv_audio that there is uncommitted audio to commit.
    self._pending_audio_bytes += len(payload)
async def send_text(self, text: str) -> None

Inject scripted text into the realtime session as a user message.

Used when this adapter is the user simulator (role=USER): scripted user("text") steps route through here instead of spawning TTS. The model synthesises the text into spoken audio with natural prosody, which is then delivered via recv_audio.

NOTE: per §7.2, OpenAI Realtime cannot populate assistant audio messages retroactively; the downstream transcript reflects what the model actually emitted, not what was scripted.

Raises

RuntimeError
if called before connect().
Expand source code
async def send_text(self, text: str) -> None:
    """
    Speak a scripted user line into the realtime session.

    When this adapter plays the user simulator (role=USER), scripted
    ``user("text")`` steps are routed here rather than through TTS. The
    text is added to the conversation as a user message and a response
    is requested; the model's spoken output (with natural prosody) is
    then read back through ``recv_audio``.

    NOTE: per §7.2, OpenAI Realtime cannot populate assistant audio
    messages retroactively, so the downstream transcript records what
    the model actually emitted rather than the scripted text.

    Raises:
        RuntimeError: if called before ``connect()``.
    """
    if self._ws is None:
        raise RuntimeError("OpenAIRealtimeAgentAdapter: not connected")

    user_message = {
        "type": "message",
        "role": "user",
        "content": [{"type": "input_text", "text": text}],
    }
    # Two client events, in order: add the user message to the
    # conversation, then ask the model to generate (audio) output.
    outgoing = (
        {"type": "conversation.item.create", "item": user_message},
        {"type": "response.create"},
    )
    for client_event in outgoing:
        await self._ws.send(json.dumps(client_event))
    logger.debug(
        "OpenAIRealtimeAgentAdapter: send_text injected %r", text[:60]
    )

Inherited members