Streaming Response Pattern

Test agents that stream responses in chunks using HTTP chunked transfer encoding. This provides progressive output as the LLM generates text.

Use Cases

  • Long-form responses where you want progressive output
  • Better perceived performance for users
  • Agents that use streaming LLM APIs
  • Real-time feel for generated content

Complete Working Example

This example demonstrates:

  • Creating a server that streams real LLM responses
  • Building an adapter that collects streamed chunks into a complete response
  • Reading the stream chunk by chunk with aiohttp's iter_any()
  • Full scenario test with streaming
test_testing_remote_agents_streaming.py
# Source: https://github.com/langwatch/scenario/blob/main/python/examples/test_testing_remote_agents_streaming.py
"""
Example: Testing an agent that returns streaming responses
 
This test demonstrates handling agents that stream their responses in chunks
rather than returning a complete message at once. The server uses real LLM streaming.
"""
 
import asyncio
import json
from aiohttp import web
import aiohttp
import pytest
import pytest_asyncio
import scenario
from openai import AsyncOpenAI
 
# Base URL for the test server (set during server startup)
base_url = ""
 
 
class StreamingAgentAdapter(scenario.AgentAdapter):
    """
    Adapter for testing agents that stream responses in chunks.
 
    This adapter:
    1. Makes an HTTP POST request to the streaming endpoint
    2. Collects all chunks as they arrive
    3. Returns the complete response after streaming completes
    """
 
    async def call(self, input: scenario.AgentInput) -> scenario.AgentReturnTypes:
        # Request streaming response from your agent
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{base_url}/chat/stream",
                json={"messages": input.messages},
            ) as response:
                # Collect all chunks into a single response
                full_response = ""
 
                # Read stream chunk by chunk
                async for chunk in response.content.iter_any():
                    # Decode chunk and append to full response
                    full_response += chunk.decode("utf-8")
 
                # Return complete response after all chunks received
                return full_response
 
 
# OpenAI client for LLM
client = AsyncOpenAI()
 
 
async def stream_handler(request: web.Request) -> web.StreamResponse:
    """
    HTTP endpoint that streams LLM responses chunk by chunk.
 
    This uses chunked transfer encoding to send the response progressively.
    """
    data = await request.json()
    messages = data["messages"]
 
    # Determine last user message content
    last_msg = messages[-1]
    content = last_msg["content"]
    if not isinstance(content, str):
        content = ""
 
    # Set up streaming response
    response = web.StreamResponse()
    response.headers["Content-Type"] = "text/plain"
    response.headers["Transfer-Encoding"] = "chunked"
    await response.prepare(request)
 
    # Stream response using real LLM
    stream = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {
                "role": "system",
                "content": "You are a helpful weather assistant. Provide brief, friendly responses, immediately. Pretend like you have access to a weather API and make up the weather.",
            },
            {"role": "user", "content": content},
        ],
        temperature=0.7,
        stream=True,
    )
 
    # Stream chunks to client
    async for chunk in stream:
        if chunk.choices[0].delta.content:
            await response.write(chunk.choices[0].delta.content.encode("utf-8"))
 
    await response.write_eof()
    return response
 
 
@pytest_asyncio.fixture
async def test_server():
    """
    Start a test HTTP server before tests and shut it down after.
 
    This server simulates a deployed agent endpoint with streaming.
    """
    global base_url
 
    # Create web application
    app = web.Application()
    app.router.add_post("/chat/stream", stream_handler)
 
    # Start server on random available port
    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, "localhost", 0)
    await site.start()
 
    # Get the actual port assigned
    server = site._server
    assert server is not None
    port = server.sockets[0].getsockname()[1]  # type: ignore[union-attr]
    base_url = f"http://localhost:{port}"
 
    yield
 
    # Cleanup: stop server
    await runner.cleanup()
 
 
@pytest.mark.asyncio
async def test_streaming_response(test_server):
    """
    Test agent via HTTP endpoint with streaming response.
 
    This test verifies:
    - Adapter correctly handles streaming chunks
    - Complete response is assembled from chunks
    - Agent provides relevant weather information
    - Full scenario flow works with streaming
    """
    result = await scenario.run(
        name="Streaming weather response",
        description="User asks about weather and receives streamed response",
        agents=[
            scenario.UserSimulatorAgent(model="openai/gpt-4o-mini"),
            StreamingAgentAdapter(),
            scenario.JudgeAgent(
                model="openai/gpt-4o-mini",
                criteria=[
                    "Agent should provide weather information",
                    "Response should be complete and coherent",
                ],
            ),
        ],
        script=[
            scenario.user("What's the weather forecast in Amsterdam?"),
            scenario.agent(),
            scenario.judge(),
        ],
        set_id="python-examples",
    )
 
    assert result.success

Key Points

  1. Server streams chunks: The handler calls the OpenAI client with stream=True and writes each delta to the response as it arrives
  2. Transfer-Encoding: chunked: The HTTP header that lets the response be delivered progressively
  3. Collect all chunks: The adapter reads the stream to completion before returning
  4. Decode bytes to text: Each chunk arrives as bytes and is decoded as UTF-8
  5. Return complete response: Scenario expects the full text, not individual chunks
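
One caveat: decoding each chunk separately, as the adapter above does, can raise UnicodeDecodeError if a multi-byte UTF-8 character happens to be split across chunk boundaries. Below is a minimal sketch of a more defensive read loop; the class name RobustStreamingAdapter and the 1024-byte read size are illustrative choices, not part of the example.

class RobustStreamingAdapter(scenario.AgentAdapter):
    """Variant that buffers raw bytes and decodes once after the stream ends."""

    async def call(self, input: scenario.AgentInput) -> scenario.AgentReturnTypes:
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{base_url}/chat/stream",
                json={"messages": input.messages},
            ) as response:
                buffer = bytearray()
                # iter_chunked() yields fixed-size byte chunks; 1024 is arbitrary
                async for chunk in response.content.iter_chunked(1024):
                    buffer.extend(chunk)
                # Decoding once avoids errors when a character spans two chunks
                return bytes(buffer).decode("utf-8")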

Adapter Pattern

The adapter collects all streamed chunks:

class StreamingAgentAdapter(scenario.AgentAdapter):
    async def call(self, input: scenario.AgentInput) -> scenario.AgentReturnTypes:
        # Request streaming response
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{base_url}/chat/stream",
                json={"messages": input.messages},
            ) as response:
                # Collect chunks as they arrive
                full_response = ""
                async for chunk in response.content.iter_any():
                    full_response += chunk.decode("utf-8")

                # Return the complete text once the stream ends
                return full_response

Why Not Return Chunks?

Scenario expects complete responses because:

  • Judge needs full context: Can't evaluate partial responses
  • Message history: Stores complete messages for conversation context
  • Simplicity: One response per turn

If you need to test streaming behavior specifically (latency, chunk timing), consider integration tests outside Scenario.
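
For example, a plain pytest integration test outside scenario.run can hit the same endpoint and assert on time-to-first-chunk. The sketch below reuses the test_server fixture and base_url from the example above; the test name and thresholds are illustrative.

import time

import aiohttp
import pytest


@pytest.mark.asyncio
async def test_stream_timing(test_server):
    """Illustrative timing check: verifies that the endpoint actually streams."""
    start = time.monotonic()
    first_chunk_at = None
    chunk_count = 0

    async with aiohttp.ClientSession() as session:
        async with session.post(
            f"{base_url}/chat/stream",
            json={"messages": [{"role": "user", "content": "Weather in Amsterdam?"}]},
        ) as response:
            async for chunk in response.content.iter_any():
                if first_chunk_at is None:
                    first_chunk_at = time.monotonic() - start
                chunk_count += 1

    # Thresholds are placeholders; tune them to your latency budget
    assert first_chunk_at is not None and first_chunk_at < 10.0
    assert chunk_count >= 1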

Testing Your Own Agent

To test your streaming agent:

class MyStreamingAdapter(scenario.AgentAdapter):
    async def call(self, input: scenario.AgentInput) -> scenario.AgentReturnTypes:
        async with aiohttp.ClientSession() as session:
            async with session.post(
                "https://my-agent.com/stream",
                json={"query": input.messages},
            ) as response:
                # Collect the streamed chunks into a single string
                result = ""
                async for chunk in response.content.iter_any():
                    result += chunk.decode("utf-8")
                return result
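
If your endpoint streams Server-Sent Events rather than raw text chunks, strip the SSE framing before concatenating. A minimal sketch, assuming each event carries a plain-text data: payload (the class name and URL are placeholders; adapt the parsing to your actual event format):

class SSEStreamingAdapter(scenario.AgentAdapter):
    async def call(self, input: scenario.AgentInput) -> scenario.AgentReturnTypes:
        async with aiohttp.ClientSession() as session:
            async with session.post(
                "https://my-agent.com/stream",  # placeholder URL
                json={"query": input.messages},
            ) as response:
                result = ""
                # Iterate the stream line by line and keep only the data payloads
                async for raw_line in response.content:
                    line = raw_line.decode("utf-8").rstrip("\r\n")
                    if line.startswith("data:"):
                        payload = line[len("data:"):]
                        # Per the SSE spec, one leading space after "data:" is framing
                        if payload.startswith(" "):
                            payload = payload[1:]
                        result += payload
                return result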

See Also