Module scenario.voice.testing
Test-harness helpers for voice adapters that need public HTTP/S endpoints
(webhooks, WebSockets) — specifically TwilioAgentAdapter.
Not imported by default from scenario.voice. Users opt-in via
from scenario.voice.testing import CloudflareTunnel, TwilioHarness.
Expand source code
"""
Test-harness helpers for voice adapters that need public HTTP/S endpoints
(webhooks, WebSockets) — specifically ``TwilioAgentAdapter``.
Not imported by default from ``scenario.voice``. Users opt-in via
``from scenario.voice.testing import CloudflareTunnel, TwilioHarness``.
"""
from __future__ import annotations
from .tunnel import CloudflareTunnel, TunnelUnavailableError
from .twilio_harness import TwilioHarness
__all__ = ["CloudflareTunnel", "TunnelUnavailableError", "TwilioHarness"]
Sub-modules
scenario.voice.testing.tunnel-
Code-managed cloudflared "quick tunnel" for local webhook+WebSocket testing …
scenario.voice.testing.twilio_harness-
Twilio smoke-test harness: compose CloudflareTunnel + TwilioAgentAdapter setup/teardown into one async context manager …
Classes
class CloudflareTunnel (port: int, *, startup_timeout_s: float = 20.0, edge_ready_timeout_s: float = 300.0)-
Async context manager that spawns a cloudflared quick tunnel and yields its public URL.
The public URL is available as
self.public_urlafter__aenter__.Expand source code
class CloudflareTunnel: """ Async context manager that spawns a cloudflared quick tunnel and yields its public URL. The public URL is available as ``self.public_url`` after ``__aenter__``. """ def __init__( self, port: int, *, startup_timeout_s: float = 20.0, # 300s accommodates slow Cloudflare quick-tunnel DNS propagation # (trycloudflare.com has no SLA; propagation can exceed 2 minutes). edge_ready_timeout_s: float = 300.0, ) -> None: self.port = port self.startup_timeout_s = startup_timeout_s self.edge_ready_timeout_s = edge_ready_timeout_s self.public_url: Optional[str] = None self._proc: Optional[asyncio.subprocess.Process] = None async def __aenter__(self) -> "CloudflareTunnel": if shutil.which("cloudflared") is None: raise TunnelUnavailableError(_INSTALL_INSTRUCTIONS) self._proc = await asyncio.create_subprocess_exec( "cloudflared", "tunnel", "--url", f"http://localhost:{self.port}", "--no-autoupdate", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT, ) try: self.public_url = await asyncio.wait_for( self._read_url(), timeout=self.startup_timeout_s ) except asyncio.TimeoutError: await self._terminate() raise TunnelUnavailableError( f"cloudflared did not announce a trycloudflare.com URL within " f"{self.startup_timeout_s}s. Check `cloudflared` output for errors." ) logger.debug("cloudflared quick tunnel announced %s → localhost:%d", self.public_url, self.port) return self async def wait_until_edge_reachable(self, *, timeout_s: Optional[float] = None) -> None: """Poll until the tunnel URL is globally reachable. Cloudflared announces the URL as soon as its side connects to Cloudflare's network, but DNS + edge caching can take additional seconds before the URL is actually reachable from the public internet. Without this wait, Twilio's TwiML fetch races and silently fails (call completed, duration 0, no notification). Resolution strategy: race multiple resolvers, return on the first one that says "this host resolves." Empirically, both Cloudflare DoH and the system resolver have been observed to lag the other on freshly-minted trycloudflare hostnames; either can be authoritative on a given day. We need ANY resolver to say yes, because Twilio's edge will hit one of these paths too. Resolvers tried each tick: - Cloudflare DoH (1.1.1.1) — historically the canonical signal. - Google DoH (dns.google) — backup public resolver. - System getaddrinfo — what local DNS sees; fastest when Cloudflare DoH is the laggard (observed 2026-05-13). Caller is responsible for starting the local HTTP server *before* calling this — otherwise the HTTP probe sees 502 from Cloudflare (origin down). When used via ``TwilioHarness``, the harness wires this correctly. """ assert self.public_url is not None deadline = asyncio.get_running_loop().time() + (timeout_s or self.edge_ready_timeout_s) last_error: Optional[str] = None host = self.public_url.replace("https://", "").replace("http://", "").rstrip("/").split("/")[0] async def _via_doh(client: httpx.AsyncClient, url: str) -> Optional[str]: r = await client.get( url, params={"name": host, "type": "A"}, headers={"accept": "application/dns-json"}, ) data = r.json() if data.get("Status") == 0 and data.get("Answer"): return data["Answer"][0].get("data") return None async def _via_system() -> Optional[str]: try: infos = await asyncio.get_running_loop().getaddrinfo( host, None, family=0, type=0, proto=0, flags=0 ) if infos: return infos[0][4][0] return None except Exception: return None async with httpx.AsyncClient(timeout=5.0) as doh_client: while asyncio.get_running_loop().time() < deadline: # Race three resolvers — succeed if ANY say "resolved" within this tick. tasks = [ asyncio.create_task(_via_doh(doh_client, "https://cloudflare-dns.com/dns-query")), asyncio.create_task(_via_doh(doh_client, "https://dns.google/resolve")), asyncio.create_task(_via_system()), ] # Wait for ALL to complete (or timeout) — fast resolvers # returning None must not cause us to give up before slower # resolvers (which may have the answer) finish. done, pending = await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED, timeout=5.0) resolved_ip: Optional[str] = None fail_reasons: list[str] = [] for t in done: try: result = t.result() if result: resolved_ip = result break fail_reasons.append("no answer") except Exception as e: fail_reasons.append(f"{type(e).__name__}: {e}") for p in pending: p.cancel() if resolved_ip: logger.debug( "cloudflared tunnel %s resolves globally (A=%s)", host, resolved_ip, ) return last_error = ", ".join(fail_reasons) or "no resolver responded" await asyncio.sleep(1.0) raise TunnelUnavailableError( f"cloudflared tunnel {self.public_url} did not become globally " f"resolvable within {timeout_s or self.edge_ready_timeout_s}s. " f"last_error={last_error}" ) async def __aexit__(self, exc_type, exc, tb) -> None: await self._terminate() async def _read_url(self) -> str: assert self._proc is not None and self._proc.stdout is not None while True: line_bytes = await self._proc.stdout.readline() if not line_bytes: raise TunnelUnavailableError( "cloudflared exited before announcing a tunnel URL." ) line = line_bytes.decode("utf-8", errors="replace") match = _TRYCLOUDFLARE_RE.search(line) if match: return match.group(0) async def _terminate(self) -> None: if self._proc is None or self._proc.returncode is not None: return try: self._proc.send_signal(signal.SIGTERM) try: await asyncio.wait_for(self._proc.wait(), timeout=3.0) except asyncio.TimeoutError: logger.debug("cloudflared did not exit on SIGTERM; sending SIGKILL") self._proc.kill() await self._proc.wait() except ProcessLookupError: # Process already exited between the returncode check above and # our send_signal; nothing to clean up. pass finally: self._proc = NoneMethods
async def wait_until_edge_reachable(self, *, timeout_s: Optional[float] = None) ‑> None-
Poll until the tunnel URL is globally reachable.
Cloudflared announces the URL as soon as its side connects to Cloudflare's network, but DNS + edge caching can take additional seconds before the URL is actually reachable from the public internet. Without this wait, Twilio's TwiML fetch races and silently fails (call completed, duration 0, no notification).
Resolution strategy: race multiple resolvers, return on the first one that says "this host resolves." Empirically, both Cloudflare DoH and the system resolver have been observed to lag the other on freshly-minted trycloudflare hostnames; either can be authoritative on a given day. We need ANY resolver to say yes, because Twilio's edge will hit one of these paths too.
Resolvers tried each tick: - Cloudflare DoH (1.1.1.1) — historically the canonical signal. - Google DoH (dns.google) — backup public resolver. - System getaddrinfo — what local DNS sees; fastest when Cloudflare DoH is the laggard (observed 2026-05-13).
Caller is responsible for starting the local HTTP server before calling this — otherwise the HTTP probe sees 502 from Cloudflare (origin down). When used via
TwilioHarness, the harness wires this correctly.Expand source code
async def wait_until_edge_reachable(self, *, timeout_s: Optional[float] = None) -> None: """Poll until the tunnel URL is globally reachable. Cloudflared announces the URL as soon as its side connects to Cloudflare's network, but DNS + edge caching can take additional seconds before the URL is actually reachable from the public internet. Without this wait, Twilio's TwiML fetch races and silently fails (call completed, duration 0, no notification). Resolution strategy: race multiple resolvers, return on the first one that says "this host resolves." Empirically, both Cloudflare DoH and the system resolver have been observed to lag the other on freshly-minted trycloudflare hostnames; either can be authoritative on a given day. We need ANY resolver to say yes, because Twilio's edge will hit one of these paths too. Resolvers tried each tick: - Cloudflare DoH (1.1.1.1) — historically the canonical signal. - Google DoH (dns.google) — backup public resolver. - System getaddrinfo — what local DNS sees; fastest when Cloudflare DoH is the laggard (observed 2026-05-13). Caller is responsible for starting the local HTTP server *before* calling this — otherwise the HTTP probe sees 502 from Cloudflare (origin down). When used via ``TwilioHarness``, the harness wires this correctly. """ assert self.public_url is not None deadline = asyncio.get_running_loop().time() + (timeout_s or self.edge_ready_timeout_s) last_error: Optional[str] = None host = self.public_url.replace("https://", "").replace("http://", "").rstrip("/").split("/")[0] async def _via_doh(client: httpx.AsyncClient, url: str) -> Optional[str]: r = await client.get( url, params={"name": host, "type": "A"}, headers={"accept": "application/dns-json"}, ) data = r.json() if data.get("Status") == 0 and data.get("Answer"): return data["Answer"][0].get("data") return None async def _via_system() -> Optional[str]: try: infos = await asyncio.get_running_loop().getaddrinfo( host, None, family=0, type=0, proto=0, flags=0 ) if infos: return infos[0][4][0] return None except Exception: return None async with httpx.AsyncClient(timeout=5.0) as doh_client: while asyncio.get_running_loop().time() < deadline: # Race three resolvers — succeed if ANY say "resolved" within this tick. tasks = [ asyncio.create_task(_via_doh(doh_client, "https://cloudflare-dns.com/dns-query")), asyncio.create_task(_via_doh(doh_client, "https://dns.google/resolve")), asyncio.create_task(_via_system()), ] # Wait for ALL to complete (or timeout) — fast resolvers # returning None must not cause us to give up before slower # resolvers (which may have the answer) finish. done, pending = await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED, timeout=5.0) resolved_ip: Optional[str] = None fail_reasons: list[str] = [] for t in done: try: result = t.result() if result: resolved_ip = result break fail_reasons.append("no answer") except Exception as e: fail_reasons.append(f"{type(e).__name__}: {e}") for p in pending: p.cancel() if resolved_ip: logger.debug( "cloudflared tunnel %s resolves globally (A=%s)", host, resolved_ip, ) return last_error = ", ".join(fail_reasons) or "no resolver responded" await asyncio.sleep(1.0) raise TunnelUnavailableError( f"cloudflared tunnel {self.public_url} did not become globally " f"resolvable within {timeout_s or self.edge_ready_timeout_s}s. " f"last_error={last_error}" )
-
Raised when cloudflared is missing or the tunnel URL never appears.
Expand source code
class TunnelUnavailableError(RuntimeError): """Raised when cloudflared is missing or the tunnel URL never appears."""Ancestors
- builtins.RuntimeError
- builtins.Exception
- builtins.BaseException
class TwilioHarness (*, account_sid: str, auth_token: str, phone_number: str, http_port: int = 8765, allowed_callers: Optional[list[str]] = None, on_dtmf: Optional[Callable[[str], None]] = None, validate_signature: bool = True)-
Async context manager yielding a connected
TwilioAgentAdapter.The adapter's
public_base_urlis set from the cloudflared quick tunnel URL — no DNS or account setup required.Expand source code
class TwilioHarness: """ Async context manager yielding a connected ``TwilioAgentAdapter``. The adapter's ``public_base_url`` is set from the cloudflared quick tunnel URL — no DNS or account setup required. """ def __init__( self, *, account_sid: str, auth_token: str, phone_number: str, http_port: int = 8765, allowed_callers: Optional[list[str]] = None, on_dtmf: Optional[Callable[[str], None]] = None, validate_signature: bool = True, ) -> None: self._account_sid = account_sid self._auth_token = auth_token self._phone_number = phone_number self._http_port = http_port self._allowed_callers = allowed_callers self._on_dtmf = on_dtmf self._validate_signature = validate_signature self._tunnel: Optional[CloudflareTunnel] = None self._adapter: Optional[TwilioAgentAdapter] = None async def __aenter__(self) -> TwilioAgentAdapter: self._tunnel = CloudflareTunnel(port=self._http_port) await self._tunnel.__aenter__() assert self._tunnel.public_url is not None self._adapter = TwilioAgentAdapter( account_sid=self._account_sid, auth_token=self._auth_token, phone_number=self._phone_number, public_base_url=self._tunnel.public_url, allowed_callers=self._allowed_callers, on_dtmf=self._on_dtmf, http_port=self._http_port, validate_signature=self._validate_signature, ) try: await self._adapter.connect() # Now that the FastAPI server is bound on localhost, wait for the # Cloudflare edge to actually route inbound traffic to it. This # prevents the "Twilio fetched TwiML too early, got a 502, dropped # the call with duration=0" race that otherwise bites callers # placing outbound calls immediately after harness startup. await self._tunnel.wait_until_edge_reachable() except Exception: if self._adapter is not None: try: await self._adapter.disconnect() except Exception: # Startup-path cleanup is best-effort; a secondary # disconnect error must not mask the original failure # we're about to re-raise. logger.exception("TwilioHarness: adapter.disconnect() during aborted startup raised") await self._tunnel.__aexit__(None, None, None) self._tunnel = None self._adapter = None raise # Don't leak full E.164 number into the workflow log (CI retains # for 14 days). Reuse the adapter's redactor — last-4 is enough # to identify which test number is in use. from ..adapters._twilio_shared import _redact_e164 logger.info( "TwilioHarness ready — tunnel %s → localhost:%d, number %s", self._tunnel.public_url, self._http_port, _redact_e164(self._phone_number), ) return self._adapter async def __aexit__(self, exc_type, exc, tb) -> None: # Adapter disconnect first (restores webhook), then tunnel teardown. if self._adapter is not None: try: await self._adapter.disconnect() except Exception: logger.exception("TwilioHarness: adapter.disconnect() raised") if self._tunnel is not None: try: await self._tunnel.__aexit__(exc_type, exc, tb) except Exception: logger.exception("TwilioHarness: tunnel teardown raised") self._adapter = None self._tunnel = None