Module scenario.voice.testing

Test-harness helpers for voice adapters that need public HTTP/S endpoints (webhooks, WebSockets) — specifically TwilioAgentAdapter.

Not imported by default from `scenario.voice`. Users opt in via `from scenario.voice.testing import CloudflareTunnel, TwilioHarness`.

Expand source code
"""
Test-harness helpers for voice adapters that need public HTTP/S endpoints
(webhooks, WebSockets) — specifically ``TwilioAgentAdapter``.

Not imported by default from ``scenario.voice``. Users opt-in via
``from scenario.voice.testing import CloudflareTunnel, TwilioHarness``.
"""

from __future__ import annotations

from .tunnel import CloudflareTunnel, TunnelUnavailableError
from .twilio_harness import TwilioHarness

__all__ = ["CloudflareTunnel", "TunnelUnavailableError", "TwilioHarness"]

Sub-modules

scenario.voice.testing.tunnel

Code-managed cloudflared "quick tunnel" for local webhook+WebSocket testing …

scenario.voice.testing.twilio_harness

Twilio smoke-test harness: compose CloudflareTunnel + TwilioAgentAdapter setup/teardown into one async context manager …

Classes

class CloudflareTunnel (port: int, *, startup_timeout_s: float = 20.0, edge_ready_timeout_s: float = 300.0)

Async context manager that spawns a cloudflared quick tunnel and exposes its public URL (entering the context returns the tunnel object itself).

The public URL is available as self.public_url after __aenter__.

Expand source code
class CloudflareTunnel:
    """
    Async context manager that spawns a cloudflared quick tunnel and exposes
    its public URL.

    ``__aenter__`` returns the tunnel itself; the public URL is available as
    ``self.public_url`` after ``__aenter__``. Leaving the context — or any
    failure while entering it — terminates the ``cloudflared`` subprocess.

    Args:
        port: Local port the tunnel forwards to (``http://localhost:<port>``).
        startup_timeout_s: Seconds to wait for cloudflared to announce its
            ``trycloudflare.com`` URL.
        edge_ready_timeout_s: Default deadline, in seconds, used by
            ``wait_until_edge_reachable`` when no ``timeout_s`` is passed.
    """

    def __init__(
        self,
        port: int,
        *,
        startup_timeout_s: float = 20.0,
        # 300s accommodates slow Cloudflare quick-tunnel DNS propagation
        # (trycloudflare.com has no SLA; propagation can exceed 2 minutes).
        edge_ready_timeout_s: float = 300.0,
    ) -> None:
        self.port = port
        self.startup_timeout_s = startup_timeout_s
        self.edge_ready_timeout_s = edge_ready_timeout_s
        self.public_url: Optional[str] = None
        self._proc: Optional[asyncio.subprocess.Process] = None
        # Background reader that keeps consuming cloudflared's output after
        # the URL has been found, so the pipe never fills up and blocks it.
        self._drain_task: Optional[asyncio.Task] = None

    async def __aenter__(self) -> "CloudflareTunnel":
        """Start cloudflared and wait for it to announce its public URL.

        Raises:
            TunnelUnavailableError: if ``cloudflared`` is not on ``PATH``,
                exits before printing a URL, or prints none within
                ``startup_timeout_s``.
        """
        if shutil.which("cloudflared") is None:
            raise TunnelUnavailableError(_INSTALL_INSTRUCTIONS)

        self._proc = await asyncio.create_subprocess_exec(
            "cloudflared",
            "tunnel",
            "--url",
            f"http://localhost:{self.port}",
            "--no-autoupdate",
            stdout=asyncio.subprocess.PIPE,
            # Merge stderr into stdout so a single reader sees all output.
            stderr=asyncio.subprocess.STDOUT,
        )

        try:
            self.public_url = await asyncio.wait_for(
                self._read_url(), timeout=self.startup_timeout_s
            )
        except asyncio.TimeoutError as exc:
            await self._terminate()
            raise TunnelUnavailableError(
                f"cloudflared did not announce a trycloudflare.com URL within "
                f"{self.startup_timeout_s}s. Check `cloudflared` output for errors."
            ) from exc
        except BaseException:
            # cloudflared exited early (TunnelUnavailableError from
            # _read_url) or we were cancelled: never leave the subprocess
            # running behind the caller's back.
            await self._terminate()
            raise

        # Nobody else reads the pipe from here on; keep draining it for the
        # lifetime of the process.
        self._drain_task = asyncio.create_task(self._drain_output())
        logger.debug("cloudflared quick tunnel announced %s → localhost:%d", self.public_url, self.port)
        return self

    async def wait_until_edge_reachable(self, *, timeout_s: Optional[float] = None) -> None:
        """Poll until the tunnel hostname is globally resolvable.

        Cloudflared announces the URL as soon as its side connects to
        Cloudflare's network, but DNS + edge caching can take additional
        seconds before the URL is actually reachable from the public
        internet. Without this wait, Twilio's TwiML fetch races and
        silently fails (call completed, duration 0, no notification).

        Resolution strategy: each tick queries several resolvers
        concurrently and succeeds if ANY of them returns an address.
        Empirically, both Cloudflare DoH and the system resolver have been
        observed to lag the other on freshly-minted trycloudflare
        hostnames; either can be authoritative on a given day. We need ANY
        resolver to say yes, because Twilio's edge will hit one of these
        paths too.

        Resolvers tried each tick (a tick waits up to 5s for all of them,
        then retries after 1s):

        - Cloudflare DoH (cloudflare-dns.com) — historically the canonical
          signal.
        - Google DoH (dns.google) — backup public resolver.
        - System getaddrinfo — what local DNS sees; fastest when
          Cloudflare DoH is the laggard (observed 2026-05-13).

        Only DNS resolution is checked; no HTTP request is sent through
        the tunnel. The local HTTP server should still be running before
        Twilio is pointed at the URL, otherwise Cloudflare answers 502
        (origin down). ``TwilioHarness`` connects the adapter before
        calling this.

        Args:
            timeout_s: Overall deadline in seconds. ``None`` (the default)
                uses ``self.edge_ready_timeout_s``; an explicit ``0`` is
                honored instead of being replaced by the default.

        Raises:
            RuntimeError: if called before the tunnel was entered
                (``public_url`` is still ``None``).
            TunnelUnavailableError: if no resolver returns an A record
                before the deadline.
        """
        if self.public_url is None:
            raise RuntimeError(
                "CloudflareTunnel.public_url is not set; enter the tunnel "
                "(async with / __aenter__) before waiting for the edge."
            )
        # ``is None`` rather than ``or``: an explicit 0 must not silently
        # become the (much longer) default.
        budget_s = self.edge_ready_timeout_s if timeout_s is None else timeout_s
        loop = asyncio.get_running_loop()
        deadline = loop.time() + budget_s
        last_error: Optional[str] = None
        host = self.public_url.replace("https://", "").replace("http://", "").rstrip("/").split("/")[0]

        async def _via_doh(client: httpx.AsyncClient, url: str) -> Optional[str]:
            r = await client.get(
                url,
                params={"name": host, "type": "A"},
                headers={"accept": "application/dns-json"},
            )
            # Report HTTP failures as HTTPStatusError instead of an opaque
            # JSON decode error or a misleading "no answer".
            r.raise_for_status()
            data = r.json()
            if data.get("Status") != 0:
                return None
            # The Answer section can hold CNAME records (type 5) ahead of the
            # address; only an A record (type 1) proves the name resolves.
            for record in data.get("Answer") or []:
                if record.get("type") == 1 and record.get("data"):
                    return record["data"]
            return None

        async def _via_system() -> Optional[str]:
            try:
                infos = await loop.getaddrinfo(
                    host, None, family=0, type=0, proto=0, flags=0
                )
            except Exception:
                return None
            return infos[0][4][0] if infos else None

        async with httpx.AsyncClient(timeout=5.0) as doh_client:
            while loop.time() < deadline:
                # Query all resolvers concurrently; succeed if ANY resolves.
                tasks = [
                    asyncio.create_task(_via_doh(doh_client, "https://cloudflare-dns.com/dns-query")),
                    asyncio.create_task(_via_doh(doh_client, "https://dns.google/resolve")),
                    asyncio.create_task(_via_system()),
                ]
                try:
                    # Wait for ALL to complete (or timeout) — fast resolvers
                    # returning None must not cause us to give up before
                    # slower resolvers (which may have the answer) finish.
                    done, pending = await asyncio.wait(
                        tasks, return_when=asyncio.ALL_COMPLETED, timeout=5.0
                    )
                finally:
                    # Cancel stragglers (tick timeout, or we ourselves were
                    # cancelled) and let them unwind before the shared client
                    # is reused or closed.
                    stragglers = [task for task in tasks if not task.done()]
                    for task in stragglers:
                        task.cancel()
                    if stragglers:
                        await asyncio.gather(*stragglers, return_exceptions=True)

                resolved_ip: Optional[str] = None
                fail_reasons: list[str] = []
                for task in done:
                    try:
                        result = task.result()
                    except Exception as e:
                        fail_reasons.append(f"{type(e).__name__}: {e}")
                        continue
                    if result:
                        resolved_ip = result
                        break
                    fail_reasons.append("no answer")
                if resolved_ip:
                    logger.debug(
                        "cloudflared tunnel %s resolves globally (A=%s)",
                        host, resolved_ip,
                    )
                    return
                if pending:
                    fail_reasons.append(f"{len(pending)} resolver(s) timed out")
                last_error = ", ".join(fail_reasons) or "no resolver responded"
                await asyncio.sleep(1.0)

        raise TunnelUnavailableError(
            f"cloudflared tunnel {self.public_url} did not become globally "
            f"resolvable within {budget_s}s. "
            f"last_error={last_error}"
        )

    async def __aexit__(self, exc_type, exc, tb) -> None:
        await self._terminate()

    async def _read_url(self) -> str:
        """Read cloudflared output until a trycloudflare URL appears.

        Raises:
            TunnelUnavailableError: if the output stream ends (cloudflared
                exited) before any URL was seen.
        """
        assert self._proc is not None and self._proc.stdout is not None
        while True:
            line_bytes = await self._proc.stdout.readline()
            if not line_bytes:
                raise TunnelUnavailableError(
                    "cloudflared exited before announcing a tunnel URL."
                )
            line = line_bytes.decode("utf-8", errors="replace")
            match = _TRYCLOUDFLARE_RE.search(line)
            if match:
                return match.group(0)

    async def _drain_output(self) -> None:
        """Consume cloudflared output until EOF, forwarding it to the debug log.

        Left unread, the pipe eventually fills and cloudflared blocks on its
        next log write, which can stall the tunnel mid-test.
        """
        proc = self._proc
        if proc is None or proc.stdout is None:
            return
        while True:
            try:
                line_bytes = await proc.stdout.readline()
            except ValueError:
                # Over-long line: readline() already discarded it; keep going.
                continue
            if not line_bytes:
                return
            logger.debug(
                "cloudflared: %s",
                line_bytes.decode("utf-8", errors="replace").rstrip(),
            )

    async def _terminate(self) -> None:
        """Stop cloudflared (SIGTERM, then SIGKILL after 3s) and reset state.

        Safe to call repeatedly, before the process started, or after it has
        already exited; the output drain task is always cancelled.
        """
        proc, self._proc = self._proc, None
        drain, self._drain_task = self._drain_task, None
        try:
            if proc is not None and proc.returncode is None:
                try:
                    proc.send_signal(signal.SIGTERM)
                    try:
                        await asyncio.wait_for(proc.wait(), timeout=3.0)
                    except asyncio.TimeoutError:
                        logger.debug("cloudflared did not exit on SIGTERM; sending SIGKILL")
                        proc.kill()
                        await proc.wait()
                except ProcessLookupError:
                    # Process already exited between the returncode check
                    # above and our signal; nothing to clean up.
                    pass
        finally:
            if drain is not None:
                drain.cancel()
                await asyncio.gather(drain, return_exceptions=True)

Methods

async def wait_until_edge_reachable(self, *, timeout_s: Optional[float] = None) ‑> None

Poll until the tunnel hostname is globally resolvable via DNS.

Cloudflared announces the URL as soon as its side connects to Cloudflare's network, but DNS + edge caching can take additional seconds before the URL is actually reachable from the public internet. Without this wait, Twilio's TwiML fetch races and silently fails (call completed, duration 0, no notification).

Resolution strategy: each tick queries several resolvers concurrently and succeeds if any of them reports that the host resolves. Each tick waits for all resolvers (up to 5 s), so a fast negative answer cannot end the tick early. Empirically, both Cloudflare DoH and the system resolver have been observed to lag the other on freshly-minted trycloudflare hostnames; either can be authoritative on a given day. We need ANY resolver to say yes, because Twilio's edge will hit one of these paths too.

Resolvers tried each tick:

- Cloudflare DoH (1.1.1.1) — historically the canonical signal.
- Google DoH (dns.google) — backup public resolver.
- System getaddrinfo — what local DNS sees; fastest when Cloudflare DoH is the laggard (observed 2026-05-13).

Only DNS resolution is checked; no HTTP request is sent through the tunnel. The local HTTP server should still be running before Twilio is pointed at the URL, otherwise Cloudflare answers 502 (origin down). When used via TwilioHarness, the harness connects the adapter before calling this.

Expand source code
async def wait_until_edge_reachable(self, *, timeout_s: Optional[float] = None) -> None:
    """Poll until the tunnel hostname is globally resolvable.

    Cloudflared announces the URL as soon as its side connects to
    Cloudflare's network, but DNS + edge caching can take additional
    seconds before the URL is actually reachable from the public
    internet. Without this wait, Twilio's TwiML fetch races and
    silently fails (call completed, duration 0, no notification).

    Resolution strategy: each tick queries several resolvers
    concurrently and succeeds if ANY of them returns an address.
    Empirically, both Cloudflare DoH and the system resolver have been
    observed to lag the other on freshly-minted trycloudflare
    hostnames; either can be authoritative on a given day. We need ANY
    resolver to say yes, because Twilio's edge will hit one of these
    paths too.

    Resolvers tried each tick (a tick waits up to 5s for all of them,
    then retries after 1s):

    - Cloudflare DoH (cloudflare-dns.com) — historically the canonical
      signal.
    - Google DoH (dns.google) — backup public resolver.
    - System getaddrinfo — what local DNS sees; fastest when
      Cloudflare DoH is the laggard (observed 2026-05-13).

    Only DNS resolution is checked; no HTTP request is sent through
    the tunnel. The local HTTP server should still be running before
    Twilio is pointed at the URL, otherwise Cloudflare answers 502
    (origin down). ``TwilioHarness`` connects the adapter before
    calling this.

    Args:
        timeout_s: Overall deadline in seconds. ``None`` (the default)
            uses ``self.edge_ready_timeout_s``; an explicit ``0`` is
            honored instead of being replaced by the default.

    Raises:
        RuntimeError: if called before the tunnel was entered
            (``public_url`` is still ``None``).
        TunnelUnavailableError: if no resolver returns an A record
            before the deadline.
    """
    if self.public_url is None:
        raise RuntimeError(
            "CloudflareTunnel.public_url is not set; enter the tunnel "
            "(async with / __aenter__) before waiting for the edge."
        )
    # ``is None`` rather than ``or``: an explicit 0 must not silently
    # become the (much longer) default.
    budget_s = self.edge_ready_timeout_s if timeout_s is None else timeout_s
    loop = asyncio.get_running_loop()
    deadline = loop.time() + budget_s
    last_error: Optional[str] = None
    host = self.public_url.replace("https://", "").replace("http://", "").rstrip("/").split("/")[0]

    async def _via_doh(client: httpx.AsyncClient, url: str) -> Optional[str]:
        r = await client.get(
            url,
            params={"name": host, "type": "A"},
            headers={"accept": "application/dns-json"},
        )
        # Report HTTP failures as HTTPStatusError instead of an opaque
        # JSON decode error or a misleading "no answer".
        r.raise_for_status()
        data = r.json()
        if data.get("Status") != 0:
            return None
        # The Answer section can hold CNAME records (type 5) ahead of the
        # address; only an A record (type 1) proves the name resolves.
        for record in data.get("Answer") or []:
            if record.get("type") == 1 and record.get("data"):
                return record["data"]
        return None

    async def _via_system() -> Optional[str]:
        try:
            infos = await loop.getaddrinfo(
                host, None, family=0, type=0, proto=0, flags=0
            )
        except Exception:
            return None
        return infos[0][4][0] if infos else None

    async with httpx.AsyncClient(timeout=5.0) as doh_client:
        while loop.time() < deadline:
            # Query all resolvers concurrently; succeed if ANY resolves.
            tasks = [
                asyncio.create_task(_via_doh(doh_client, "https://cloudflare-dns.com/dns-query")),
                asyncio.create_task(_via_doh(doh_client, "https://dns.google/resolve")),
                asyncio.create_task(_via_system()),
            ]
            try:
                # Wait for ALL to complete (or timeout) — fast resolvers
                # returning None must not cause us to give up before
                # slower resolvers (which may have the answer) finish.
                done, pending = await asyncio.wait(
                    tasks, return_when=asyncio.ALL_COMPLETED, timeout=5.0
                )
            finally:
                # Cancel stragglers (tick timeout, or we ourselves were
                # cancelled) and let them unwind before the shared client
                # is reused or closed.
                stragglers = [task for task in tasks if not task.done()]
                for task in stragglers:
                    task.cancel()
                if stragglers:
                    await asyncio.gather(*stragglers, return_exceptions=True)

            resolved_ip: Optional[str] = None
            fail_reasons: list[str] = []
            for task in done:
                try:
                    result = task.result()
                except Exception as e:
                    fail_reasons.append(f"{type(e).__name__}: {e}")
                    continue
                if result:
                    resolved_ip = result
                    break
                fail_reasons.append("no answer")
            if resolved_ip:
                logger.debug(
                    "cloudflared tunnel %s resolves globally (A=%s)",
                    host, resolved_ip,
                )
                return
            if pending:
                fail_reasons.append(f"{len(pending)} resolver(s) timed out")
            last_error = ", ".join(fail_reasons) or "no resolver responded"
            await asyncio.sleep(1.0)

    raise TunnelUnavailableError(
        f"cloudflared tunnel {self.public_url} did not become globally "
        f"resolvable within {budget_s}s. "
        f"last_error={last_error}"
    )
class TunnelUnavailableError (*args, **kwargs)

Raised when cloudflared is missing or the tunnel URL never appears.

Expand source code
class TunnelUnavailableError(RuntimeError):
    """Signals that no usable cloudflared tunnel could be provided.

    Raised when the ``cloudflared`` binary is missing or when the tunnel
    URL never appears.
    """

Ancestors

  • builtins.RuntimeError
  • builtins.Exception
  • builtins.BaseException
class TwilioHarness (*, account_sid: str, auth_token: str, phone_number: str, http_port: int = 8765, allowed_callers: Optional[list[str]] = None, on_dtmf: Optional[Callable[[str], None]] = None, validate_signature: bool = True)

Async context manager yielding a connected TwilioAgentAdapter.

The adapter's public_base_url is set from the cloudflared quick tunnel URL — no DNS or account setup required.

Expand source code
class TwilioHarness:
    """
    Async context manager yielding a connected ``TwilioAgentAdapter``.

    The adapter's ``public_base_url`` is set from the cloudflared quick
    tunnel URL — no DNS or account setup required. Entering the harness:

    1. starts a ``CloudflareTunnel`` pointing at ``http_port``;
    2. builds and connects a ``TwilioAgentAdapter`` on that same port;
    3. waits until the tunnel hostname resolves publicly.

    If any step fails — including task cancellation during the edge wait —
    everything started so far is torn down before the error propagates.
    """

    def __init__(
        self,
        *,
        account_sid: str,
        auth_token: str,
        phone_number: str,
        http_port: int = 8765,
        allowed_callers: Optional[list[str]] = None,
        on_dtmf: Optional[Callable[[str], None]] = None,
        validate_signature: bool = True,
    ) -> None:
        self._account_sid = account_sid
        self._auth_token = auth_token
        self._phone_number = phone_number
        self._http_port = http_port
        self._allowed_callers = allowed_callers
        self._on_dtmf = on_dtmf
        self._validate_signature = validate_signature

        self._tunnel: Optional[CloudflareTunnel] = None
        self._adapter: Optional[TwilioAgentAdapter] = None

    async def __aenter__(self) -> TwilioAgentAdapter:
        """Start the tunnel, connect the adapter, and wait for the edge.

        Returns:
            The connected ``TwilioAgentAdapter``.

        Raises:
            Whatever tunnel startup, adapter construction, ``connect()`` or
            ``wait_until_edge_reachable()`` raised; resources already
            started by the harness are released first.
        """
        tunnel = CloudflareTunnel(port=self._http_port)
        await tunnel.__aenter__()
        # Record the tunnel only once it is up, so a failed start leaves no
        # stale harness state behind.
        self._tunnel = tunnel
        try:
            assert tunnel.public_url is not None
            # Built inside the try: a constructor error must not leak the
            # already-running cloudflared process.
            self._adapter = TwilioAgentAdapter(
                account_sid=self._account_sid,
                auth_token=self._auth_token,
                phone_number=self._phone_number,
                public_base_url=tunnel.public_url,
                allowed_callers=self._allowed_callers,
                on_dtmf=self._on_dtmf,
                http_port=self._http_port,
                validate_signature=self._validate_signature,
            )
            await self._adapter.connect()
            # Now that the FastAPI server is bound on localhost, wait for the
            # Cloudflare edge to actually route inbound traffic to it. This
            # prevents the "Twilio fetched TwiML too early, got a 502, dropped
            # the call with duration=0" race that otherwise bites callers
            # placing outbound calls immediately after harness startup.
            await tunnel.wait_until_edge_reachable()
        except BaseException:
            # BaseException, not Exception: cancellation during the edge
            # wait (default deadline 300s) must also tear down what we
            # started.
            await self._abort_startup()
            raise

        # Don't leak full E.164 number into the workflow log (CI retains
        # for 14 days). Reuse the adapter's redactor — last-4 is enough
        # to identify which test number is in use.
        from ..adapters._twilio_shared import _redact_e164

        logger.info(
            "TwilioHarness ready — tunnel %s → localhost:%d, number %s",
            tunnel.public_url,
            self._http_port,
            _redact_e164(self._phone_number),
        )
        return self._adapter

    async def _abort_startup(self) -> None:
        """Best-effort teardown after a failed ``__aenter__``.

        Ordinary exceptions from the teardown steps are logged, not raised,
        so they cannot mask the original failure the caller re-raises. The
        tunnel is stopped even if the adapter disconnect is interrupted.
        """
        adapter, self._adapter = self._adapter, None
        tunnel, self._tunnel = self._tunnel, None
        try:
            if adapter is not None:
                try:
                    await adapter.disconnect()
                except Exception:
                    logger.exception("TwilioHarness: adapter.disconnect() during aborted startup raised")
        finally:
            if tunnel is not None:
                try:
                    await tunnel.__aexit__(None, None, None)
                except Exception:
                    logger.exception("TwilioHarness: tunnel teardown during aborted startup raised")

    async def __aexit__(self, exc_type, exc, tb) -> None:
        """Disconnect the adapter (restores webhook), then tear down the tunnel.

        Ordinary exceptions from either step are logged rather than raised,
        so they never replace an exception leaving the ``async with`` body.
        The tunnel teardown runs even if the disconnect is cancelled.
        """
        adapter, self._adapter = self._adapter, None
        tunnel, self._tunnel = self._tunnel, None
        try:
            if adapter is not None:
                try:
                    await adapter.disconnect()
                except Exception:
                    logger.exception("TwilioHarness: adapter.disconnect() raised")
        finally:
            if tunnel is not None:
                try:
                    await tunnel.__aexit__(exc_type, exc, tb)
                except Exception:
                    logger.exception("TwilioHarness: tunnel teardown raised")