Module scenario.voice.effects

Audio effects pipeline for the voice user simulator (§4.5).

Per the locked decision on the TTS cache key: effects are applied AFTER the TTS cache hit and are never baked into the cached audio.

Each effect is a Callable[[bytes], bytes] that takes PCM16 @ 24kHz mono and returns PCM16 @ 24kHz mono. This keeps them trivially composable.

Accents are handled via TTS voice selection (voice="elevenlabs/raj_indian_english"), not via post-processing. There is no accent effect — by design (§4.5 L536-544).

Expand source code
"""
Audio effects pipeline for the voice user simulator (§4.5).

Per the TTS cache key locked decision: effects are applied AFTER the TTS
cache hit and are never baked into the cached audio.

Each effect is a ``Callable[[bytes], bytes]`` that takes PCM16 @ 24kHz mono
and returns PCM16 @ 24kHz mono. This keeps them trivially composable.

Accents are handled via TTS voice selection (``voice="elevenlabs/raj_indian_english"``),
not via post-processing. There is no ``accent`` effect — by design (§4.5 L536-544).
"""

from __future__ import annotations

from .custom import custom
from .noise import background_noise, multiple_voices, static
from .prosody import high_volume, low_volume, speaking_fast, speaking_slow
from .quality import (
    breaking_up,
    echo,
    low_quality,
    packet_loss,
    phone_quality,
    robotic,
)

__all__ = [
    "background_noise",
    "breaking_up",
    "custom",
    "echo",
    "high_volume",
    "low_quality",
    "low_volume",
    "multiple_voices",
    "packet_loss",
    "phone_quality",
    "robotic",
    "speaking_fast",
    "speaking_slow",
    "static",
]

Sub-modules

scenario.voice.effects.noise

Noise-class effects: background_noise, static, multiple_voices.

scenario.voice.effects.prosody

Prosody effects: volume scaling and time-stretching speech.

scenario.voice.effects.quality

Quality-degradation effects: phone_quality, low_quality, packet_loss, echo, robotic, breaking_up.

Functions

def background_noise(preset_or_path: str, volume: float = 0.3) ‑> Callable[[bytes], bytes]

Overlay ambient noise. preset_or_path is one of the bundled presets (cafe, street, office, airport) or a path to a WAV file.

Expand source code
def background_noise(preset_or_path: str, volume: float = 0.3) -> EffectFn:
    """
    Build an effect that mixes ambient noise underneath the input audio.

    Args:
        preset_or_path: Either the name of a bundled preset (``cafe``,
            ``street``, ``office``, ``airport``) or a path to a WAV file.
            A non-preset value is only accepted as a path when it contains
            a forward slash or backslash, or ends with ``.wav``
            (case-insensitive).
        volume: Gain applied to the noise before it is added to the signal.

    Returns:
        A ``bytes -> bytes`` effect. The noise is looped to cover the whole
        input; if either the noise sample or the input is empty the input
        is returned unchanged.

    Raises:
        ValueError: If ``preset_or_path`` is neither a known preset nor
            path-like, or if the path does not exist.
    """
    if preset_or_path in _BACKGROUND_PRESETS:
        sample = _load_sample(preset_or_path)
    else:
        # A bare word that is not a preset is almost certainly a typo
        # ("cfae" for "cafe"); refusing it avoids silently picking up a
        # stray file of that name from the current working directory.
        has_separator = any(sep in preset_or_path for sep in ("/", "\\"))
        is_wav_name = preset_or_path.lower().endswith(".wav")
        if not (has_separator or is_wav_name):
            raise ValueError(
                f"background_noise: preset {preset_or_path!r} is not one of "
                f"{sorted(_BACKGROUND_PRESETS)}. To load a custom WAV pass a "
                "path containing a separator or ending with .wav."
            )
        wav_path = Path(preset_or_path)
        if not wav_path.exists():
            raise ValueError(
                f"background_noise: path {preset_or_path!r} does not exist"
            )
        sample = _wav_to_np(wav_path.read_bytes())

    def _apply(audio: bytes) -> bytes:
        speech = pcm16_to_np(audio).astype(np.float32)
        noise_len = len(sample)
        speech_len = len(speech)
        if noise_len == 0 or speech_len == 0:
            return audio
        # Ceiling division: number of whole copies of the noise needed to
        # cover the speech, trimmed to the exact length afterwards.
        repeats = -(-speech_len // noise_len)
        looped = np.tile(sample.astype(np.float32), repeats)[:speech_len]
        return np_to_pcm16(speech + looped * float(volume))

    return _apply
def breaking_up() ‑> Callable[[bytes], bytes]

Simulate intermittent connection: larger and more frequent dropouts than packet_loss.

Expand source code
def breaking_up() -> EffectFn:
    """
    Simulate an intermittent connection: larger and more frequent dropouts
    than ``packet_loss``.

    Returns:
        A ``bytes -> bytes`` effect that silences each 100 ms window of the
        input with probability 0.2. Empty input is returned unchanged. A
        fresh, unseeded random generator is created on every call, so the
        dropout pattern differs from call to call.
    """

    def _apply(audio: bytes) -> bytes:
        # Work on a copy so zeroing windows never touches the decoded input
        # (pcm16_to_np may return a view of the caller's buffer — TODO confirm).
        arr = pcm16_to_np(audio).copy()
        if len(arr) == 0:
            return audio
        # 100 ms windows, clamped to at least one sample exactly as
        # packet_loss does: a zero step would make range() raise.
        chunk_samples = max(1, (rate() * 100) // 1000)
        rng = np.random.default_rng()
        for i in range(0, len(arr), chunk_samples):
            if rng.random() < 0.2:
                arr[i : i + chunk_samples] = 0
        return np_to_pcm16(arr)

    return _apply
def custom(fn: EffectFn) ‑> Callable[[bytes], bytes]

Wrap a user-supplied fn(audio_bytes) -> audio_bytes.

Expand source code
def custom(fn: EffectFn) -> EffectFn:
    """
    Adapt a user-supplied ``fn(audio_bytes) -> audio_bytes`` into an effect.

    Args:
        fn: Callable that receives the audio bytes and returns the processed
            audio as ``bytes`` or ``bytearray``.

    Returns:
        A ``bytes -> bytes`` effect that invokes ``fn`` and normalises its
        result to an immutable ``bytes`` object.

    Raises:
        TypeError: Immediately if ``fn`` is not callable; and from the
            returned effect if ``fn`` returns something other than
            ``bytes``/``bytearray``.
    """
    if callable(fn):

        def _apply(audio: bytes) -> bytes:
            output = fn(audio)
            if isinstance(output, (bytes, bytearray)):
                # bytearray results are copied into an immutable bytes object.
                return bytes(output)
            raise TypeError("custom effect function must return bytes")

        return _apply

    raise TypeError("custom() requires a callable that takes and returns bytes")
def echo(delay_ms: int = 200, decay: float = 0.5) ‑> Callable[[bytes], bytes]

Overlay a delayed/attenuated copy of the signal.

Expand source code
def echo(delay_ms: int = 200, decay: float = 0.5) -> EffectFn:
    """
    Overlay a delayed, attenuated copy of the signal on top of itself.

    Args:
        delay_ms: Delay of the echo in milliseconds; must be >= 0.
        decay: Gain applied to the delayed copy before it is mixed in.

    Returns:
        A ``bytes -> bytes`` effect. Empty input, or input shorter than the
        delay, is returned unchanged.

    Raises:
        ValueError: If ``delay_ms`` is negative.
    """
    if delay_ms < 0:
        # A negative delay would turn into a negative slice offset below and
        # wrap part of the signal around to the end of the buffer.
        raise ValueError("echo delay_ms must be >= 0")

    def _apply(audio: bytes) -> bytes:
        arr = pcm16_to_np(audio).astype(np.float32)
        if len(arr) == 0:
            return audio
        delay_samples = (rate() * delay_ms) // 1000
        if delay_samples >= len(arr):
            return audio
        delayed = np.zeros_like(arr)
        # Slice by explicit length rather than arr[:-delay_samples]: with a
        # zero delay, arr[:-0] is an empty slice and the assignment fails to
        # broadcast.
        delayed[delay_samples:] = arr[: len(arr) - delay_samples] * decay
        return np_to_pcm16(arr + delayed)

    return _apply
def high_volume(factor: float = 1.5) ‑> Callable[[bytes], bytes]

Scale amplitude up by factor (>= 1). Clips at int16 bounds.

Expand source code
def high_volume(factor: float = 1.5) -> EffectFn:
    """
    Make the audio louder by multiplying every sample by ``factor``.

    Args:
        factor: Amplitude multiplier; must be >= 1.

    Returns:
        A ``bytes -> bytes`` effect. Samples pushed past the int16 range are
        clipped when converted back to PCM16.

    Raises:
        ValueError: If ``factor`` is below 1.
    """
    if factor < 1:
        raise ValueError("high_volume factor must be >= 1")

    def _apply(audio: bytes) -> bytes:
        # Scale in float32 so the multiplication cannot wrap around in int16.
        samples = pcm16_to_np(audio).astype(np.float32)
        louder = samples * factor
        return np_to_pcm16(louder)

    return _apply
def low_quality(bitrate: int = 8000) ‑> Callable[[bytes], bytes]

Downsample to bitrate Hz and back, simulating a low-bitrate codec.

Expand source code
def low_quality(bitrate: int = 8000) -> EffectFn:
    """
    Degrade audio by resampling down to ``bitrate`` Hz and back up again.

    Despite its name, ``bitrate`` is compared against ``rate()`` and used as
    a target sample rate in Hz.

    Args:
        bitrate: Intermediate sample rate in Hz.

    Returns:
        A ``bytes -> bytes`` effect with the same number of samples as the
        input. Empty input, or a ``bitrate`` at or above the pipeline rate,
        leaves the audio unchanged.
    """

    def _apply(audio: bytes) -> bytes:
        samples = pcm16_to_np(audio)
        total = len(samples)
        if total == 0:
            return audio
        if bitrate >= rate():
            return audio
        # Keep an evenly spaced subset of samples (no anti-alias filter), then
        # stretch that subset back to the original length by repeating
        # entries — this is what introduces the aliasing / stair-step noise.
        reduced_len = max(1, int(total * bitrate / rate()))
        keep = np.linspace(0, total - 1, reduced_len).astype(np.int64)
        reduced = samples[keep]
        spread = np.linspace(0, len(reduced) - 1, total).astype(np.int64)
        return np_to_pcm16(reduced[spread])

    return _apply
def low_volume(factor: float = 0.5) ‑> Callable[[bytes], bytes]

Scale amplitude down by factor (0 < factor <= 1).

Expand source code
def low_volume(factor: float = 0.5) -> EffectFn:
    """
    Scale amplitude down by ``factor`` (0 < factor <= 1).

    Args:
        factor: Amplitude multiplier in the half-open interval ``(0, 1]``.

    Returns:
        A ``bytes -> bytes`` effect that multiplies every sample by
        ``factor``.

    Raises:
        ValueError: If ``factor`` is not strictly positive, or is greater
            than 1 (use ``high_volume`` to amplify).
    """
    if factor <= 0:
        raise ValueError("low_volume factor must be > 0")
    if factor > 1:
        # Enforce the documented upper bound; mirrors high_volume, which
        # rejects factors below 1.
        raise ValueError("low_volume factor must be <= 1")

    def _apply(audio: bytes) -> bytes:
        arr = pcm16_to_np(audio).astype(np.float32) * factor
        return np_to_pcm16(arr)

    return _apply
def multiple_voices(background_audio: Optional[str] = None) ‑> Callable[[bytes], bytes]

Mix with a babble speech sample to simulate background conversation.

Expand source code
def multiple_voices(background_audio: Optional[str] = None) -> EffectFn:
    """
    Build an effect that mixes background chatter into the input audio.

    Args:
        background_audio: Path to a WAV file to use as the chatter. When
            ``None``, the bundled ``babble`` sample is loaded instead.

    Returns:
        A ``bytes -> bytes`` effect. The chatter is looped to cover the whole
        input and mixed in at a fixed gain of 0.3; if either the chatter or
        the input is empty the input is returned unchanged.
    """
    if background_audio is None:
        sample = _load_sample("babble")
    else:
        sample = _wav_to_np(Path(background_audio).read_bytes())

    def _apply(audio: bytes) -> bytes:
        speech = pcm16_to_np(audio).astype(np.float32)
        chatter_len = len(sample)
        speech_len = len(speech)
        if chatter_len == 0 or speech_len == 0:
            return audio
        # Ceiling division: whole copies of the chatter needed to cover the
        # speech, trimmed to the exact length afterwards.
        repeats = -(-speech_len // chatter_len)
        chatter = np.tile(sample.astype(np.float32), repeats)[:speech_len]
        mixed = speech + chatter * 0.3
        return np_to_pcm16(mixed)

    return _apply
def packet_loss(probability: float = 0.05, chunk_ms: int = 20) ‑> Callable[[bytes], bytes]

Zero out random chunk_ms windows at the given probability.

Expand source code
def packet_loss(probability: float = 0.05, chunk_ms: int = 20) -> EffectFn:
    """
    Build an effect that silences random ``chunk_ms``-long windows.

    Args:
        probability: Chance, in ``[0, 1]``, that any given window is dropped.
        chunk_ms: Window length in milliseconds.

    Returns:
        A ``bytes -> bytes`` effect. Empty input is returned unchanged. A
        fresh, unseeded random generator is created on every call, so the
        loss pattern differs from call to call.

    Raises:
        ValueError: If ``probability`` is outside ``[0, 1]``.
    """
    if not (0 <= probability <= 1):
        raise ValueError("packet_loss probability must be in [0, 1]")

    def _apply(audio: bytes) -> bytes:
        samples = pcm16_to_np(audio).copy()
        total = len(samples)
        if total == 0:
            return audio
        # Never let the window shrink below one sample, otherwise range()
        # would be handed a zero step.
        window = max(1, (rate() * chunk_ms) // 1000)
        rng = np.random.default_rng()
        for start in range(0, total, window):
            dropped = rng.random() < probability
            if dropped:
                samples[start : start + window] = 0
        return np_to_pcm16(samples)

    return _apply
def phone_quality() ‑> Callable[[bytes], bytes]

Bandpass 300Hz-3.4kHz + amplitude compression to mimic a phone line.

Expand source code
def phone_quality() -> EffectFn:
    """
    Build an effect that mimics a phone line.

    The signal is band-limited to 300 Hz - 3.4 kHz and then passed through a
    gentle ``tanh`` saturation.

    Returns:
        A ``bytes -> bytes`` effect. Empty input is returned unchanged.
    """

    def _apply(audio: bytes) -> bytes:
        samples = pcm16_to_np(audio).astype(np.float32)
        total = len(samples)
        if total == 0:
            return audio
        # Brick-wall bandpass in the frequency domain: zero every FFT bin
        # outside the pass band, then transform back. Needs nothing beyond
        # NumPy.
        bin_hz = np.fft.rfftfreq(total, d=1.0 / rate())
        passband = (bin_hz >= 300) & (bin_hz <= 3400)
        spectrum = np.fft.rfft(samples) * passband
        band_limited = np.fft.irfft(spectrum, n=total)
        # Soft saturation: roughly linear for small amplitudes, levelling off
        # towards +/-16000.
        ceiling = 16000.0
        squashed = np.tanh(band_limited / ceiling) * ceiling
        return np_to_pcm16(squashed)

    return _apply
def robotic() ‑> Callable[[bytes], bytes]

Crude vocoder-ish effect: ring-modulate the signal with a low-freq carrier.

Expand source code
def robotic() -> EffectFn:
    """
    Build a crude "robot voice" effect via ring modulation.

    Every sample is multiplied by a 100 Hz sine carrier.

    Returns:
        A ``bytes -> bytes`` effect. Empty input is returned unchanged.
    """

    def _apply(audio: bytes) -> bytes:
        samples = pcm16_to_np(audio).astype(np.float32)
        total = len(samples)
        if total == 0:
            return audio
        carrier_hz = 100
        # Timestamp of each sample in seconds, used as the carrier's time axis.
        seconds = np.arange(total) / rate()
        modulator = np.sin(2 * np.pi * carrier_hz * seconds)
        return np_to_pcm16(samples * modulator)

    return _apply
def speaking_fast(factor: float = 1.3) ‑> Callable[[bytes], bytes]

Time-stretch to speak faster (factor > 1). Linear resample, so pitch shifts.

The source table says "time-stretch without pitch change", but true pitch-preserving stretching needs a phase vocoder. Our implementation prioritises simplicity and zero extra dependencies; the pitch shift is a documented tradeoff.

Expand source code
def speaking_fast(factor: float = 1.3) -> EffectFn:
    """
    Speed speech up by ``factor`` (must be > 1).

    The work is delegated to ``_resample_factor``. Because that is a plain
    linear resample, the pitch rises along with the tempo. The source table
    asks for "time-stretch without pitch change", but a pitch-preserving
    stretch needs a phase vocoder; this implementation deliberately trades
    that for simplicity and zero extra dependencies.

    Args:
        factor: Speed multiplier, strictly greater than 1.

    Returns:
        The ``bytes -> bytes`` effect produced by ``_resample_factor``.

    Raises:
        ValueError: If ``factor`` is not greater than 1.
    """
    not_faster = factor <= 1
    if not_faster:
        raise ValueError("speaking_fast factor must be > 1")
    return _resample_factor(factor)
def speaking_slow(factor: float = 0.7) ‑> Callable[[bytes], bytes]

Time-stretch to speak slower (factor < 1). Same pitch tradeoff as speaking_fast.

Expand source code
def speaking_slow(factor: float = 0.7) -> EffectFn:
    """
    Time-stretch to speak slower (0 < factor < 1). Same pitch tradeoff as
    ``speaking_fast``: the resample shifts pitch along with tempo.

    Args:
        factor: Speed multiplier, strictly between 0 and 1.

    Returns:
        The ``bytes -> bytes`` effect produced by ``_resample_factor``.

    Raises:
        ValueError: If ``factor`` is >= 1, or is zero or negative.
    """
    if factor >= 1:
        raise ValueError("speaking_slow factor must be < 1")
    if factor <= 0:
        # A zero or negative speed multiplier has no meaningful
        # interpretation; reject it here rather than handing it to the
        # resampler.
        raise ValueError("speaking_slow factor must be > 0")
    return _resample_factor(factor)
def static(intensity: float = 0.05) ‑> Callable[[bytes], bytes]

Overlay white-noise static at the given intensity (fraction of full scale).

Expand source code
def static(intensity: float = 0.05) -> EffectFn:
    """
    Build an effect that adds white-noise hiss on top of the audio.

    Args:
        intensity: Noise level as a fraction of int16 full scale (32767); it
            scales standard-normal noise, so individual samples can exceed
            this level.

    Returns:
        A ``bytes -> bytes`` effect. Empty input is returned unchanged. A
        fresh, unseeded random generator is created on every call, so the
        noise differs from call to call.
    """

    def _apply(audio: bytes) -> bytes:
        speech = pcm16_to_np(audio).astype(np.float32)
        total = len(speech)
        if total == 0:
            return audio
        rng = np.random.default_rng()
        hiss = rng.standard_normal(total) * 32767 * intensity
        return np_to_pcm16(speech + hiss)

    return _apply