Module scenario.judge_agent

Use the Judge Agent module in Scenario to evaluate conversation quality and LLM reasoning during AI agent testing.

This module provides the JudgeAgent class, which evaluates ongoing conversations between users and agents to determine if success criteria are met. The judge makes real-time decisions about whether scenarios should continue or end with success/failure verdicts.

Expand source code
"""
Use the Judge Agent module in Scenario to evaluate conversation quality and LLM reasoning during AI agent testing.

This module provides the JudgeAgent class, which evaluates ongoing conversations
between users and agents to determine if success criteria are met. The judge
makes real-time decisions about whether scenarios should continue or end with
success/failure verdicts.
"""

import json
import logging
import re
from typing import Any, List, Optional, Sequence, cast

import litellm
from litellm import Choices
from litellm.files.main import ModelResponse

from scenario.cache import scenario_cache
from scenario.agent_adapter import AgentAdapter
from scenario.config import ModelConfig, ScenarioConfig

from ._error_messages import agent_not_configured_error_message
from ._judge import JudgeUtils, judge_span_digest_formatter
from ._judge.estimate_tokens import estimate_tokens, DEFAULT_TOKEN_THRESHOLD
from ._judge.trace_tools import expand_trace, grep_trace
from ._tracing import judge_span_collector, JudgeSpanCollector
from .types import AgentInput, AgentReturnTypes, AgentRole, ScenarioResult


logger = logging.getLogger("scenario")


class JudgeAgent(AgentAdapter):
    """
    Agent that evaluates conversations against success criteria.

    The JudgeAgent watches conversations in real-time and makes decisions about
    whether the agent under test is meeting the specified criteria. It can either
    allow the conversation to continue or end it with a success/failure verdict.

    The judge uses function calling to make structured decisions and provides
    detailed reasoning for its verdicts. It evaluates each criterion independently
    and provides comprehensive feedback about what worked and what didn't.

    Attributes:
        role: Always AgentRole.JUDGE for judge agents
        model: LLM model identifier to use for evaluation
        api_base: Optional base URL where the model is hosted
        api_key: Optional API key for the model provider
        temperature: Sampling temperature for evaluation consistency
        max_tokens: Maximum tokens for judge reasoning
        criteria: List of success criteria to evaluate against
        system_prompt: Custom system prompt to override default judge behavior

    Example:
        ```
        import scenario

        # Basic judge agent with criteria
        judge = scenario.JudgeAgent(
            criteria=[
                "Agent provides helpful responses",
                "Agent asks relevant follow-up questions",
                "Agent does not provide harmful information"
            ]
        )

        # Customized judge with specific model and behavior
        strict_judge = scenario.JudgeAgent(
            model="openai/gpt-4.1-mini",
            criteria=[
                "Code examples are syntactically correct",
                "Explanations are technically accurate",
                "Security best practices are mentioned"
            ],
            temperature=0.0,  # More deterministic evaluation
            system_prompt="You are a strict technical reviewer evaluating code quality."
        )

        # Use in scenario
        result = await scenario.run(
            name="coding assistant test",
            description="User asks for help with Python functions",
            agents=[
                coding_agent,
                scenario.UserSimulatorAgent(),
                judge
            ]
        )

        print(f"Passed criteria: {result.passed_criteria}")
        print(f"Failed criteria: {result.failed_criteria}")
        ```

    Note:
        - Judge agents evaluate conversations continuously, not just at the end
        - They can end scenarios early if clear success/failure conditions are met
        - Provide detailed reasoning for their decisions
        - Support both positive criteria (things that should happen) and negative criteria (things that shouldn't)
    """

    role = AgentRole.JUDGE

    model: str
    api_base: Optional[str]
    api_key: Optional[str]
    temperature: float
    max_tokens: Optional[int]
    criteria: List[str]
    system_prompt: Optional[str]
    _extra_params: dict
    _span_collector: JudgeSpanCollector
    _token_threshold: int
    _max_discovery_steps: int

    def __init__(
        self,
        *,
        criteria: Optional[List[str]] = None,
        model: Optional[str] = None,
        api_base: Optional[str] = None,
        api_key: Optional[str] = None,
        temperature: float = 0.0,
        max_tokens: Optional[int] = None,
        system_prompt: Optional[str] = None,
        span_collector: Optional[JudgeSpanCollector] = None,
        token_threshold: int = DEFAULT_TOKEN_THRESHOLD,
        max_discovery_steps: int = 10,
        **extra_params,
    ):
        """
        Create a judge agent and resolve its model settings.

        Values passed here take precedence. When the global
        ``ScenarioConfig.default_config.default_model`` is a ``ModelConfig``,
        any argument left falsy (``None``, ``0``, ``0.0``, ``""``) is filled in
        from that config, and its remaining non-None fields become default
        extra parameters for the LLM call.

        Args:
            criteria: Success criteria to evaluate the conversation against.
                      May mix positive requirements ("Agent provides helpful
                      responses") and negative constraints ("Agent should not
                      provide personal information"). Defaults to an empty list.
            model: LLM model identifier (e.g., "openai/gpt-4.1-mini"). Falls
                   back to the default model from the global configuration.
            api_base: Optional base URL where the model is hosted. Falls back
                      to the global ``ModelConfig`` when one is configured.
            api_key: API key for the model provider. Falls back to the global
                     ``ModelConfig`` when one is configured.
            temperature: Sampling temperature for evaluation. Lower values
                         (0.0-0.2) are recommended for consistent evaluation.
                         Note that 0.0 is falsy, so it is replaced by the
                         global ``ModelConfig`` temperature when one exists.
            max_tokens: Maximum number of tokens for judge reasoning.
            system_prompt: Custom system prompt that replaces the default
                           judge prompt.
            span_collector: Span collector used to fetch traces. Defaults to
                            the module-level ``judge_span_collector``.
            token_threshold: Estimated token count above which traces switch to
                             structure-only rendering with progressive discovery
                             tools. Defaults to ``DEFAULT_TOKEN_THRESHOLD``.
            max_discovery_steps: Maximum number of discovery rounds the judge
                                 may take before a verdict is forced.
                                 Defaults to 10.
            **extra_params: Additional keyword arguments forwarded verbatim to
                            the LLM completion call (e.g., headers, timeout).
                            Experimental; they override same-named extras from
                            the global ``ModelConfig``.

        Raises:
            Exception: If no model is configured either in parameters or in
                       the global config.

        Example:
            ```
            cs_judge = JudgeAgent(
                criteria=[
                    "Agent replies with the refund policy",
                    "Agent offers next steps for the customer",
                ],
                temperature=0.1
            )
            ```
        """
        # Settings that never consult the global configuration.
        self.criteria = criteria or []
        self.system_prompt = system_prompt
        self._span_collector = span_collector or judge_span_collector
        self._token_threshold = token_threshold
        self._max_discovery_steps = max_discovery_steps

        # Start from the explicit arguments; a global ModelConfig may refine
        # these below.
        self.api_base = api_base
        self.api_key = api_key
        self.temperature = temperature
        self.max_tokens = max_tokens

        if model:
            self.model = model

        global_config = ScenarioConfig.default_config
        default_model = (
            global_config.default_model if global_config is not None else None
        )

        if isinstance(default_model, str):
            # Plain model name: only the model identifier can be inherited.
            self.model = model or default_model
            self._extra_params = extra_params
        elif isinstance(default_model, ModelConfig):
            self.model = model or default_model.model
            self.api_base = api_base or default_model.api_base
            self.api_key = api_key or default_model.api_key
            self.temperature = temperature or default_model.temperature
            self.max_tokens = max_tokens or default_model.max_tokens

            # Whatever else the ModelConfig carries becomes a default extra
            # parameter; the fields consumed above are stripped first.
            inherited_extras = default_model.model_dump(exclude_none=True)
            for consumed_field in (
                "model",
                "api_base",
                "api_key",
                "temperature",
                "max_tokens",
            ):
                inherited_extras.pop(consumed_field, None)

            # Merge: config extras < agent extra_params
            self._extra_params = {**inherited_extras, **extra_params}
        else:
            self._extra_params = extra_params

        # `model` is only annotated on the class, never assigned there, so the
        # attribute is absent unless one of the branches above set it.
        if not hasattr(self, "model"):
            raise Exception(agent_not_configured_error_message("JudgeAgent"))

    @scenario_cache()
    async def call(
        self,
        input: AgentInput,
    ) -> AgentReturnTypes:
        """
        Evaluate the current conversation state against the effective criteria.

        Builds a transcript plus a trace digest for the thread, asks the LLM to
        call either ``continue_test`` or ``finish_test``, and converts that
        tool call into the return value. When the trace digest is classified
        as large, the request is delegated to the multi-step discovery loop,
        which additionally offers the ``expand_trace``/``grep_trace`` tools.

        Args:
            input: AgentInput containing conversation history and scenario
                   context. If ``input.judgment_request`` carries criteria
                   (not None), they replace the agent's own criteria for this
                   call, and a verdict is enforced.

        Returns:
            AgentReturnTypes: Either an empty list (continue scenario) or a
                            ScenarioResult (end scenario with verdict). A
                            failing ScenarioResult is returned without calling
                            the LLM when a judgment is requested but there are
                            no criteria to judge against.

        Raises:
            Exception: If the LLM response has no choices, no tool calls, an
                      unknown tool call, or unparseable tool arguments
                      (raised by the response parser).

        Note:
            - Returns empty list [] to continue the scenario
            - Returns ScenarioResult to end with success/failure
            - ``finish_test`` is forced on the last turn or when a judgment is
              explicitly requested, provided there is at least one criterion
        """

        scenario = input.scenario_state
        # Per-request criteria take precedence over the ones configured on the
        # agent; an explicit empty list is honoured (only None falls back).
        effective_criteria = (
            input.judgment_request.criteria
            if input.judgment_request and input.judgment_request.criteria is not None
            else self.criteria
        )

        # Build transcript and traces digest
        transcript = JudgeUtils.build_transcript_from_messages(input.messages)
        spans = self._span_collector.get_spans_for_thread(input.thread_id)
        digest, is_large_trace = self._build_trace_digest(spans)

        logger.debug(f"OpenTelemetry traces built: {digest[:200]}...")

        # The judge sees the conversation and the trace digest as one user
        # message, each wrapped in its own tag.
        content_for_judge = f"""
<transcript>
{transcript}
</transcript>
<opentelemetry_traces>
{digest}
</opentelemetry_traces>
"""

        # Numbered (1-based) list of criteria for the default system prompt.
        criteria_str = "\n".join(
            [f"{idx + 1}. {criterion}" for idx, criterion in enumerate(effective_criteria)]
        )

        # A custom system_prompt replaces the default prompt entirely, so the
        # scenario description and criteria list are then NOT injected here
        # (the criteria still reach the model via the finish_test schema).
        messages: List[dict] = [
            {
                "role": "system",
                "content": self.system_prompt
                or f"""
<role>
You are an LLM as a judge watching a simulated conversation as it plays out live to determine if the agent under test meets the criteria or not.
</role>

<goal>
Your goal is to determine if you already have enough information to make a verdict of the scenario below, or if the conversation should continue for longer.
If you do have enough information, use the finish_test tool to determine if all the criteria have been met, if not, use the continue_test tool to let the next step play out.
</goal>

<scenario>
{scenario.description}
</scenario>

<criteria>
{criteria_str}
</criteria>

<rules>
- Be strict, do not let the conversation continue if the agent already broke one of the "do not" or "should not" criterias.
- DO NOT make any judgment calls that are not explicitly listed in the success or failure criteria, withhold judgement if necessary
</rules>
""",
            },
            {"role": "user", "content": content_for_judge},
        ]

        # The turn counter is compared against max_turns - 1, so the final
        # allowed turn is the one treated as "last"; max_turns falls back to
        # 10 when unset (or 0).
        max_turns = input.scenario_state.config.max_turns or 10
        is_last_message = (
            input.scenario_state.current_turn >= max_turns - 1
        )

        if is_last_message:
            # Tell the judge explicitly that no further turns will follow.
            messages.append(
                {
                    "role": "user",
                    "content": """
System:

<finish_test>
This is the last message, conversation has reached the maximum number of turns, give your final verdict,
if you don't have enough information to make a verdict, say inconclusive with max turns reached.
</finish_test>
""",
                }
            )

        # Define the tools
        # Each criterion becomes a property name in the finish_test schema:
        # spaces become underscores, apostrophes are dropped, the text is
        # lower-cased, every remaining non-alphanumeric character becomes an
        # underscore, and the result is capped at 70 characters.
        # NOTE(review): two criteria that sanitize to the same name would share
        # a single property in the schema below — confirm whether that can
        # occur in practice.
        criteria_names = [
            re.sub(
                r"[^a-zA-Z0-9]",
                "_",
                criterion.replace(" ", "_").replace("'", "").lower(),
            )[:70]
            for criterion in effective_criteria
        ]
        tools: List[dict] = [
            {
                "type": "function",
                "function": {
                    "name": "continue_test",
                    "description": "Continue the test with the next step",
                    "strict": True,
                    "parameters": {
                        "type": "object",
                        "properties": {},
                        "required": [],
                        "additionalProperties": False,
                    },
                },
            },
            {
                "type": "function",
                "function": {
                    "name": "finish_test",
                    "description": "Complete the test with a final verdict",
                    "strict": True,
                    "parameters": {
                        "type": "object",
                        "properties": {
                            # One tri-state string property per criterion; the
                            # original criterion text is its description.
                            "criteria": {
                                "type": "object",
                                "properties": {
                                    criteria_names[idx]: {
                                        "type": "string",
                                        "enum": ["true", "false", "inconclusive"],
                                        "description": criterion,
                                    }
                                    for idx, criterion in enumerate(effective_criteria)
                                },
                                "required": criteria_names,
                                "additionalProperties": False,
                                "description": "Strict verdict for each criterion",
                            },
                            "reasoning": {
                                "type": "string",
                                "description": "Explanation of what the final verdict should be",
                            },
                            "verdict": {
                                "type": "string",
                                "enum": ["success", "failure", "inconclusive"],
                                "description": "The final verdict of the test",
                            },
                        },
                        "required": ["criteria", "reasoning", "verdict"],
                        "additionalProperties": False,
                    },
                },
            },
        ]

        # Large traces are rendered structure-only, so the judge also gets the
        # tools needed to drill into individual spans.
        if is_large_trace:
            tools = self._build_progressive_discovery_tools() + tools

        enforce_judgment = input.judgment_request is not None
        has_criteria = len(effective_criteria) > 0

        # A requested judgment with nothing to judge is reported as a failure
        # without spending an LLM call.
        if enforce_judgment and not has_criteria:
            return ScenarioResult(
                success=False,
                messages=[],
                reasoning="TestingAgent was called as a judge, but it has no criteria to judge against",
            )

        # Force finish_test when a verdict is mandatory; otherwise the model
        # must call some tool but may pick which one.
        tool_choice: Any = (
            {"type": "function", "function": {"name": "finish_test"}}
            if (is_last_message or enforce_judgment) and has_criteria
            else "required"
        )

        # Multi-step discovery loop for large traces
        if is_large_trace:
            return self._run_discovery_loop(
                messages=messages,
                tools=tools,
                tool_choice=tool_choice,
                spans=spans,
                effective_criteria=effective_criteria,
            )

        # Standard single-call path for small traces
        # NOTE(review): litellm.completion is invoked without `await` inside
        # this async method — confirm that blocking here is acceptable.
        response = cast(
            ModelResponse,
            litellm.completion(
                model=self.model,
                messages=messages,
                temperature=self.temperature,
                api_key=self.api_key,
                api_base=self.api_base,
                max_tokens=self.max_tokens,
                tools=tools,
                tool_choice=tool_choice,
                **self._extra_params,
            ),
        )

        return self._parse_response(response, effective_criteria, messages)

    def _build_trace_digest(self, spans: Sequence[Any]) -> tuple[str, bool]:
        """
        Render the spans for the judge, picking a representation by size.

        The spans are first formatted in full. If there is at least one span
        and the estimated token count of that rendering exceeds
        ``self._token_threshold``, a structure-only rendering is returned
        instead, followed by a hint on how to use the discovery tools.

        Args:
            spans: The spans for this thread.

        Returns:
            Tuple of (digest_string, is_large_trace).
        """
        inline_digest = judge_span_digest_formatter.format(spans)
        # Estimated once and reused for both the size decision and the log.
        token_estimate = estimate_tokens(inline_digest)
        exceeds_threshold = len(spans) > 0 and token_estimate > self._token_threshold

        if exceeds_threshold:
            skeleton = judge_span_digest_formatter.format_structure_only(spans)
            usage_hint = "\n\nUse expand_trace(span_id) to see span details or grep_trace(pattern) to search across spans. Reference spans by the ID shown in brackets."
            chosen_digest = skeleton + usage_hint
        else:
            chosen_digest = inline_digest

        logger.debug(
            "Trace digest built",
            extra={
                "is_large_trace": exceeds_threshold,
                "estimated_tokens": token_estimate,
            },
        )

        return chosen_digest, exceeds_threshold

    def _build_progressive_discovery_tools(self) -> List[dict]:
        """
        Builds the expand_trace and grep_trace tool definitions for litellm.

        Returns:
            List of tool definition dicts for litellm function calling.
        """
        return [
            {
                "type": "function",
                "function": {
                    "name": "expand_trace",
                    "description": (
                        "Expand one or more spans to see their full details "
                        "(attributes, events, content). Use the span ID shown "
                        "in brackets in the trace skeleton."
                    ),
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "span_ids": {
                                "type": "array",
                                "items": {"type": "string"},
                                "description": "Span IDs (or 8-char prefixes) to expand",
                            },
                        },
                        "required": ["span_ids"],
                        "additionalProperties": False,
                    },
                },
            },
            {
                "type": "function",
                "function": {
                    "name": "grep_trace",
                    "description": (
                        "Search across all span attributes, events, and content "
                        "for a pattern (case-insensitive). Returns matching spans with context."
                    ),
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "pattern": {
                                "type": "string",
                                "description": "Search pattern (case-insensitive)",
                            },
                        },
                        "required": ["pattern"],
                        "additionalProperties": False,
                    },
                },
            },
        ]

    def _run_discovery_loop(
        self,
        *,
        messages: List[dict],
        tools: List[dict],
        tool_choice: Any,
        spans: Sequence[Any],
        effective_criteria: List[str],
    ) -> AgentReturnTypes:
        """
        Runs the multi-step discovery loop for large traces.

        The judge can call expand_trace/grep_trace tools multiple times before
        reaching a terminal tool (finish_test/continue_test) or hitting the
        max discovery steps limit.

        On intermediate steps, tool_choice is "required" so the judge can freely
        pick expand_trace/grep_trace. On the final step, the original tool_choice
        (which may force finish_test) is applied.

        When the caller's tool_choice forces finish_test, a verdict is
        mandatory for this call, so continue_test is withheld from the tool
        list for the whole loop; otherwise the judge could use an intermediate
        "required" step to continue and skip the verdict.

        Args:
            messages: The conversation messages so far. Mutated in place: the
                assistant tool-call messages and tool results of every
                discovery step are appended.
            tools: The tool definitions.
            tool_choice: The tool choice constraint for the final step.
            spans: The spans for executing expand/grep tools.
            effective_criteria: The criteria to judge against.

        Returns:
            AgentReturnTypes from the terminal tool call.

        Raises:
            Exception: If a response has no choices, or if the response parser
                rejects it (e.g. no tool calls at all).
        """
        terminal_tool_names = {"finish_test", "continue_test"}

        # A dict tool_choice names one specific function that must be called.
        forced_tool_name = (
            (tool_choice.get("function") or {}).get("name")
            if isinstance(tool_choice, dict)
            else None
        )
        if forced_tool_name == "finish_test":
            # Rebinds the local name only; the caller's list is not modified.
            tools = [
                tool
                for tool in tools
                if (tool.get("function") or {}).get("name") != "continue_test"
            ]

        for step in range(self._max_discovery_steps):
            # Use "required" for intermediate steps so the judge can use
            # discovery tools; only apply the forced tool_choice on the
            # last allowed step.
            is_last_step = step == self._max_discovery_steps - 1
            step_tool_choice = tool_choice if is_last_step else "required"

            response = cast(
                ModelResponse,
                litellm.completion(
                    model=self.model,
                    messages=messages,
                    temperature=self.temperature,
                    api_key=self.api_key,
                    api_base=self.api_base,
                    max_tokens=self.max_tokens,
                    tools=tools,
                    tool_choice=step_tool_choice,
                    **self._extra_params,
                ),
            )

            if not hasattr(response, "choices") or len(response.choices) == 0:
                raise Exception(
                    f"Unexpected response format from LLM: {response.__repr__()}"
                )

            message = cast(Choices, response.choices[0]).message
            if not message.tool_calls:
                # No tool calls - let the parser raise its descriptive error
                return self._parse_response(response, effective_criteria, messages)

            # A terminal tool call ends the loop, even if discovery calls were
            # issued alongside it in the same message.
            if any(
                tc.function.name in terminal_tool_names for tc in message.tool_calls
            ):
                return self._parse_response(response, effective_criteria, messages)

            # Only discovery tools were called: record the assistant turn with
            # its tool calls, then one tool-result message per call.
            messages.append({
                "role": "assistant",
                "content": message.content or "",
                "tool_calls": [
                    {
                        "id": tc.id,
                        "type": "function",
                        "function": {
                            "name": tc.function.name,
                            "arguments": tc.function.arguments,
                        },
                    }
                    for tc in message.tool_calls
                ],
            })

            for tc in message.tool_calls:
                tool_result = self._execute_discovery_tool(tc, spans)
                messages.append({
                    "role": "tool",
                    "tool_call_id": tc.id,
                    "content": tool_result,
                })

        # Hit max steps - force a verdict with whatever information was gathered
        return self._force_verdict(
            messages=messages,
            tools=tools,
            effective_criteria=effective_criteria,
        )

    def _force_verdict(
        self,
        *,
        messages: List[dict],
        tools: List[dict],
        effective_criteria: List[str],
    ) -> AgentReturnTypes:
        """
        Ask the LLM for a verdict after the discovery budget is used up.

        Logs a warning, appends a user message telling the judge to conclude,
        and issues one more completion with tool_choice pinned to finish_test
        so a verdict is produced from the context gathered so far rather than
        failing outright.

        Args:
            messages: The conversation so far; the closing user message is
                appended to this list in place.
            tools: The tool definitions to offer.
            effective_criteria: The criteria to judge against.

        Returns:
            The parsed result of the forced response.
        """
        step_limit = self._max_discovery_steps
        logger.warning(
            f"Progressive discovery hit max steps ({step_limit}), forcing verdict"
        )

        wrap_up_instruction = "You have reached the maximum number of trace exploration steps. Based on the information you have gathered so far, give your final verdict now."
        messages.append({"role": "user", "content": wrap_up_instruction})

        finish_only = {"type": "function", "function": {"name": "finish_test"}}
        raw_response = litellm.completion(
            model=self.model,
            messages=messages,
            temperature=self.temperature,
            api_key=self.api_key,
            api_base=self.api_base,
            max_tokens=self.max_tokens,
            tools=tools,
            tool_choice=finish_only,
            **self._extra_params,
        )
        forced_response = cast(ModelResponse, raw_response)

        return self._parse_response(forced_response, effective_criteria, messages)

    def _execute_discovery_tool(self, tool_call: Any, spans: Sequence[Any]) -> str:
        """
        Executes an expand_trace or grep_trace tool call.

        Args:
            tool_call: The tool call from the LLM response.
            spans: The spans to operate on.

        Returns:
            The tool result string.
        """
        try:
            args = json.loads(tool_call.function.arguments)
        except json.JSONDecodeError:
            return f"Error: could not parse arguments: {tool_call.function.arguments}"

        if tool_call.function.name == "expand_trace":
            return expand_trace(
                spans,
                span_ids=args.get("span_ids", []),
            )
        elif tool_call.function.name == "grep_trace":
            return grep_trace(spans, args.get("pattern", ""))
        else:
            return f"Unknown tool: {tool_call.function.name}"

    def _parse_response(
        self,
        response: Any,
        effective_criteria: List[str],
        messages: List[dict],
    ) -> AgentReturnTypes:
        """
        Parses a litellm response into the appropriate return type.

        Handles finish_test, continue_test, and error cases.

        Args:
            response: The litellm ModelResponse.
            effective_criteria: The criteria to evaluate against.
            messages: The conversation messages (for inclusion in ScenarioResult).

        Returns:
            AgentReturnTypes: Either an empty list (continue) or ScenarioResult.
        """
        if not hasattr(response, "choices") or len(response.choices) == 0:
            raise Exception(
                f"Unexpected response format from LLM: {response.__repr__()}"
            )

        message = cast(Choices, response.choices[0]).message

        if not message.tool_calls:
            raise Exception(
                f"Invalid response from judge agent, tool calls not found: {message.__repr__()}"
            )

        # In multi-step mode, find the terminal tool call
        terminal_names = {"finish_test", "continue_test"}
        terminal_call = next(
            (tc for tc in message.tool_calls if tc.function.name in terminal_names),
            None,
        )
        tool_call = terminal_call or message.tool_calls[0]

        if tool_call.function.name == "continue_test":
            return []

        if tool_call.function.name == "finish_test":
            try:
                args = json.loads(tool_call.function.arguments)
                verdict = args.get("verdict", "inconclusive")
                reasoning = args.get("reasoning", "No reasoning provided")
                criteria_verdicts = args.get("criteria", {})

                passed_criteria = [
                    effective_criteria[idx]
                    for idx, criterion in enumerate(criteria_verdicts.values())
                    if criterion == "true"
                ]
                failed_criteria = [
                    effective_criteria[idx]
                    for idx, criterion in enumerate(criteria_verdicts.values())
                    if criterion == "false" or criterion == "inconclusive"
                ]

                return ScenarioResult(
                    success=verdict == "success" and len(failed_criteria) == 0,
                    messages=cast(Any, messages),
                    reasoning=reasoning,
                    passed_criteria=passed_criteria,
                    failed_criteria=failed_criteria,
                )
            except json.JSONDecodeError:
                raise Exception(
                    f"Failed to parse tool call arguments from judge agent: {tool_call.function.arguments}"
                )

        raise Exception(
            f"Invalid tool call from judge agent: {tool_call.function.name}"
        )

Classes

class JudgeAgent (*, criteria: List[str] | None = None, model: str | None = None, api_base: str | None = None, api_key: str | None = None, temperature: float = 0.0, max_tokens: int | None = None, system_prompt: str | None = None, span_collector: scenario._tracing.judge_span_collector.JudgeSpanCollector | None = None, token_threshold: int = 8192, max_discovery_steps: int = 10, **extra_params)

Agent that evaluates conversations against success criteria.

The JudgeAgent watches conversations in real-time and makes decisions about whether the agent under test is meeting the specified criteria. It can either allow the conversation to continue or end it with a success/failure verdict.

The judge uses function calling to make structured decisions and provides detailed reasoning for its verdicts. It evaluates each criterion independently and provides comprehensive feedback about what worked and what didn't.

Attributes

role
Always AgentRole.JUDGE for judge agents
model
LLM model identifier to use for evaluation
api_base
Optional base URL where the model is hosted
api_key
Optional API key for the model provider
temperature
Sampling temperature for evaluation consistency
max_tokens
Maximum tokens for judge reasoning
criteria
List of success criteria to evaluate against
system_prompt
Custom system prompt to override default judge behavior

Example

import scenario

# Basic judge agent with criteria
judge = scenario.JudgeAgent(
    criteria=[
        "Agent provides helpful responses",
        "Agent asks relevant follow-up questions",
        "Agent does not provide harmful information"
    ]
)

# Customized judge with specific model and behavior
strict_judge = scenario.JudgeAgent(
    model="openai/gpt-4.1-mini",
    criteria=[
        "Code examples are syntactically correct",
        "Explanations are technically accurate",
        "Security best practices are mentioned"
    ],
    temperature=0.0,  # More deterministic evaluation
    system_prompt="You are a strict technical reviewer evaluating code quality."
)

# Use in scenario
result = await scenario.run(
    name="coding assistant test",
    description="User asks for help with Python functions",
    agents=[
        coding_agent,
        scenario.UserSimulatorAgent(),
        judge
    ]
)

print(f"Passed criteria: {result.passed_criteria}")
print(f"Failed criteria: {result.failed_criteria}")

Note

  • Judge agents evaluate conversations continuously, not just at the end
  • They can end scenarios early if clear success/failure conditions are met
  • They provide detailed reasoning for their decisions
  • They support both positive criteria (things that should happen) and negative criteria (things that shouldn't)

Initialize a judge agent with evaluation criteria.

Args

criteria
List of success criteria to evaluate the conversation against. Can include both positive requirements ("Agent provides helpful responses") and negative constraints ("Agent should not provide personal information").
model
LLM model identifier (e.g., "openai/gpt-4.1-mini"). If not provided, uses the default model from global configuration.
api_base
Optional base URL where the model is hosted. If not provided, uses the base URL from global configuration.
api_key
API key for the model provider. If not provided, uses the key from global configuration or environment.
temperature
Sampling temperature for evaluation (0.0-1.0). Lower values (0.0-0.2) recommended for consistent evaluation.
max_tokens
Maximum number of tokens for judge reasoning and explanations.
system_prompt
Custom system prompt to override default judge behavior. Use this to create specialized evaluation perspectives.
span_collector
Optional span collector for telemetry. Defaults to global singleton.
token_threshold
Estimated token count above which traces switch to structure-only rendering with progressive discovery tools. Defaults to 8192.
max_discovery_steps
Maximum number of expand/grep tool calls the judge can make before being forced to return a verdict. Defaults to 10.

Raises

Exception
If no model is configured either in parameters or global config

Example

# Customer service judge
cs_judge = JudgeAgent(
    criteria=[
        "Agent replies with the refund policy",
        "Agent offers next steps for the customer",
    ],
    temperature=0.1
)

# Technical accuracy judge
tech_judge = JudgeAgent(
    criteria=[
        "Agent adds a code review pointing out the code compilation errors",
        "Agent adds a code review about the missing security headers"
    ],
    system_prompt="You are a senior software engineer reviewing code for production use."
)

Note

Advanced usage: Additional parameters can be passed as keyword arguments (e.g., headers, timeout, client) for specialized configurations. These are experimental and may not be supported in future versions.

Expand source code
class JudgeAgent(AgentAdapter):
    """
    Agent that evaluates conversations against success criteria.

    The JudgeAgent watches conversations in real-time and makes decisions about
    whether the agent under test is meeting the specified criteria. It can either
    allow the conversation to continue or end it with a success/failure verdict.

    The judge uses function calling to make structured decisions and provides
    detailed reasoning for its verdicts. It evaluates each criterion independently
    and provides comprehensive feedback about what worked and what didn't.

    Attributes:
        role: Always AgentRole.JUDGE for judge agents
        model: LLM model identifier to use for evaluation
        api_base: Optional base URL where the model is hosted
        api_key: Optional API key for the model provider
        temperature: Sampling temperature for evaluation consistency
        max_tokens: Maximum tokens for judge reasoning
        criteria: List of success criteria to evaluate against
        system_prompt: Custom system prompt to override default judge behavior

    Example:
        ```
        import scenario

        # Basic judge agent with criteria
        judge = scenario.JudgeAgent(
            criteria=[
                "Agent provides helpful responses",
                "Agent asks relevant follow-up questions",
                "Agent does not provide harmful information"
            ]
        )

        # Customized judge with specific model and behavior
        strict_judge = scenario.JudgeAgent(
            model="openai/gpt-4.1-mini",
            criteria=[
                "Code examples are syntactically correct",
                "Explanations are technically accurate",
                "Security best practices are mentioned"
            ],
            temperature=0.0,  # More deterministic evaluation
            system_prompt="You are a strict technical reviewer evaluating code quality."
        )

        # Use in scenario
        result = await scenario.run(
            name="coding assistant test",
            description="User asks for help with Python functions",
            agents=[
                coding_agent,
                scenario.UserSimulatorAgent(),
                judge
            ]
        )

        print(f"Passed criteria: {result.passed_criteria}")
        print(f"Failed criteria: {result.failed_criteria}")
        ```

    Note:
        - Judge agents evaluate conversations continuously, not just at the end
        - They can end scenarios early if clear success/failure conditions are met
        - They provide detailed reasoning for their decisions
        - They support both positive criteria (things that should happen) and
          negative criteria (things that shouldn't)
    """

    # Fixed role marker shared by every JudgeAgent instance.
    role = AgentRole.JUDGE

    # Public configuration, assigned in __init__ (optionally filled from the
    # global ScenarioConfig when not passed explicitly).
    model: str
    api_base: Optional[str]
    api_key: Optional[str]
    temperature: float
    max_tokens: Optional[int]
    criteria: List[str]
    system_prompt: Optional[str]
    # Extra keyword arguments forwarded as-is to every litellm.completion call.
    _extra_params: dict
    # Collector queried for the spans recorded for the thread being judged.
    _span_collector: JudgeSpanCollector
    # Estimated-token limit above which the trace digest is rendered
    # structure-only and the discovery tools are offered to the judge.
    _token_threshold: int
    # Upper bound on discovery-loop LLM calls before a verdict is forced.
    _max_discovery_steps: int

    def __init__(
        self,
        *,
        criteria: Optional[List[str]] = None,
        model: Optional[str] = None,
        api_base: Optional[str] = None,
        api_key: Optional[str] = None,
        temperature: float = 0.0,
        max_tokens: Optional[int] = None,
        system_prompt: Optional[str] = None,
        span_collector: Optional[JudgeSpanCollector] = None,
        token_threshold: int = DEFAULT_TOKEN_THRESHOLD,
        max_discovery_steps: int = 10,
        **extra_params,
    ):
        """
        Create a judge agent, filling unset options from the global configuration.

        Args:
            criteria: Success criteria the conversation is judged against. Both
                      positive requirements ("Agent provides helpful responses") and
                      negative constraints ("Agent should not provide personal
                      information") are accepted. Defaults to an empty list.
            model: LLM model identifier (e.g., "openai/gpt-4.1-mini"). Falls back
                   to the default model of the global configuration.
            api_base: Base URL where the model is hosted. Falls back to the global
                      model configuration when that is a ModelConfig.
            api_key: API key for the model provider. Falls back to the global
                     model configuration when that is a ModelConfig.
            temperature: Sampling temperature for evaluation (0.0-1.0). Lower
                         values (0.0-0.2) are recommended for consistent verdicts.
            max_tokens: Maximum number of tokens for judge reasoning.
            system_prompt: Replacement for the built-in judge system prompt.
            span_collector: Span collector used to read traces. Defaults to the
                            module-level ``judge_span_collector``.
            token_threshold: Estimated token count above which traces switch to
                             structure-only rendering with discovery tools.
            max_discovery_steps: Maximum number of discovery iterations before the
                                 judge is forced to return a verdict.
            **extra_params: Additional keyword arguments (e.g., headers, timeout,
                            client) forwarded to the LLM call. Experimental.

        Raises:
            Exception: If no model is configured either in parameters or in the
                       global configuration.

        Example:
            ```
            cs_judge = JudgeAgent(
                criteria=[
                    "Agent replies with the refund policy",
                    "Agent offers next steps for the customer",
                ],
                temperature=0.1
            )
            ```

        Note:
            When the global default model is a ModelConfig, explicit arguments are
            combined with it using ``or``; a falsy explicit value (for example
            ``temperature=0.0``) therefore yields to the configured value.
        """
        self.criteria = criteria or []
        self.system_prompt = system_prompt
        self._span_collector = span_collector or judge_span_collector
        self._token_threshold = token_threshold
        self._max_discovery_steps = max_discovery_steps

        # Explicit arguments first; the ModelConfig branch below may override them.
        self.api_base = api_base
        self.api_key = api_key
        self.temperature = temperature
        self.max_tokens = max_tokens

        if model:
            self.model = model

        global_config = ScenarioConfig.default_config
        default_model = (
            global_config.default_model if global_config is not None else None
        )

        if isinstance(default_model, str):
            # Plain model name configured globally: only the model can be inherited.
            self.model = model or default_model
            self._extra_params = extra_params
        elif isinstance(default_model, ModelConfig):
            self.model = model or default_model.model
            self.api_base = api_base or default_model.api_base
            self.api_key = api_key or default_model.api_key
            self.temperature = temperature or default_model.temperature
            self.max_tokens = max_tokens or default_model.max_tokens

            # Whatever remains in the ModelConfig after the dedicated fields are
            # removed is treated as extra call parameters.
            inherited_params = default_model.model_dump(exclude_none=True)
            for handled_field in (
                "model",
                "api_base",
                "api_key",
                "temperature",
                "max_tokens",
            ):
                inherited_params.pop(handled_field, None)

            # Agent-level extras take precedence over config-level extras.
            self._extra_params = {**inherited_params, **extra_params}
        else:
            self._extra_params = extra_params

        # ``model`` is only declared (not assigned) on the class, so the attribute
        # is absent unless one of the branches above set it.
        if not hasattr(self, "model"):
            raise Exception(agent_not_configured_error_message("JudgeAgent"))

    @scenario_cache()
    async def call(
        self,
        input: AgentInput,
    ) -> AgentReturnTypes:
        """
        Evaluate the current conversation state against the configured criteria.

        This method analyzes the conversation history and determines whether the
        scenario should continue or end with a verdict. It uses function calling
        to make structured decisions and provides detailed reasoning.

        Args:
            input: AgentInput containing conversation history and scenario context

        Returns:
            AgentReturnTypes: Either an empty list (continue scenario) or a
                            ScenarioResult (end scenario with verdict)

        Raises:
            Exception: If the judge cannot make a valid decision or if there's an
                      error in the evaluation process

        Note:
            - Returns empty list [] to continue the scenario
            - Returns ScenarioResult to end with success/failure
            - Provides detailed reasoning for all decisions
            - Evaluates each criterion independently
            - Can end scenarios early if clear violation or success is detected
        """

        scenario = input.scenario_state
        effective_criteria = (
            input.judgment_request.criteria
            if input.judgment_request and input.judgment_request.criteria is not None
            else self.criteria
        )

        # Build transcript and traces digest
        transcript = JudgeUtils.build_transcript_from_messages(input.messages)
        spans = self._span_collector.get_spans_for_thread(input.thread_id)
        digest, is_large_trace = self._build_trace_digest(spans)

        logger.debug(f"OpenTelemetry traces built: {digest[:200]}...")

        content_for_judge = f"""
<transcript>
{transcript}
</transcript>
<opentelemetry_traces>
{digest}
</opentelemetry_traces>
"""

        criteria_str = "\n".join(
            [f"{idx + 1}. {criterion}" for idx, criterion in enumerate(effective_criteria)]
        )

        messages: List[dict] = [
            {
                "role": "system",
                "content": self.system_prompt
                or f"""
<role>
You are an LLM as a judge watching a simulated conversation as it plays out live to determine if the agent under test meets the criteria or not.
</role>

<goal>
Your goal is to determine if you already have enough information to make a verdict of the scenario below, or if the conversation should continue for longer.
If you do have enough information, use the finish_test tool to determine if all the criteria have been met, if not, use the continue_test tool to let the next step play out.
</goal>

<scenario>
{scenario.description}
</scenario>

<criteria>
{criteria_str}
</criteria>

<rules>
- Be strict, do not let the conversation continue if the agent already broke one of the "do not" or "should not" criterias.
- DO NOT make any judgment calls that are not explicitly listed in the success or failure criteria, withhold judgement if necessary
</rules>
""",
            },
            {"role": "user", "content": content_for_judge},
        ]

        max_turns = input.scenario_state.config.max_turns or 10
        is_last_message = (
            input.scenario_state.current_turn >= max_turns - 1
        )

        if is_last_message:
            messages.append(
                {
                    "role": "user",
                    "content": """
System:

<finish_test>
This is the last message, conversation has reached the maximum number of turns, give your final verdict,
if you don't have enough information to make a verdict, say inconclusive with max turns reached.
</finish_test>
""",
                }
            )

        # Define the tools
        criteria_names = [
            re.sub(
                r"[^a-zA-Z0-9]",
                "_",
                criterion.replace(" ", "_").replace("'", "").lower(),
            )[:70]
            for criterion in effective_criteria
        ]
        tools: List[dict] = [
            {
                "type": "function",
                "function": {
                    "name": "continue_test",
                    "description": "Continue the test with the next step",
                    "strict": True,
                    "parameters": {
                        "type": "object",
                        "properties": {},
                        "required": [],
                        "additionalProperties": False,
                    },
                },
            },
            {
                "type": "function",
                "function": {
                    "name": "finish_test",
                    "description": "Complete the test with a final verdict",
                    "strict": True,
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "criteria": {
                                "type": "object",
                                "properties": {
                                    criteria_names[idx]: {
                                        "type": "string",
                                        "enum": ["true", "false", "inconclusive"],
                                        "description": criterion,
                                    }
                                    for idx, criterion in enumerate(effective_criteria)
                                },
                                "required": criteria_names,
                                "additionalProperties": False,
                                "description": "Strict verdict for each criterion",
                            },
                            "reasoning": {
                                "type": "string",
                                "description": "Explanation of what the final verdict should be",
                            },
                            "verdict": {
                                "type": "string",
                                "enum": ["success", "failure", "inconclusive"],
                                "description": "The final verdict of the test",
                            },
                        },
                        "required": ["criteria", "reasoning", "verdict"],
                        "additionalProperties": False,
                    },
                },
            },
        ]

        if is_large_trace:
            tools = self._build_progressive_discovery_tools() + tools

        enforce_judgment = input.judgment_request is not None
        has_criteria = len(effective_criteria) > 0

        if enforce_judgment and not has_criteria:
            return ScenarioResult(
                success=False,
                messages=[],
                reasoning="TestingAgent was called as a judge, but it has no criteria to judge against",
            )

        tool_choice: Any = (
            {"type": "function", "function": {"name": "finish_test"}}
            if (is_last_message or enforce_judgment) and has_criteria
            else "required"
        )

        # Multi-step discovery loop for large traces
        if is_large_trace:
            return self._run_discovery_loop(
                messages=messages,
                tools=tools,
                tool_choice=tool_choice,
                spans=spans,
                effective_criteria=effective_criteria,
            )

        # Standard single-call path for small traces
        response = cast(
            ModelResponse,
            litellm.completion(
                model=self.model,
                messages=messages,
                temperature=self.temperature,
                api_key=self.api_key,
                api_base=self.api_base,
                max_tokens=self.max_tokens,
                tools=tools,
                tool_choice=tool_choice,
                **self._extra_params,
            ),
        )

        return self._parse_response(response, effective_criteria, messages)

    def _build_trace_digest(self, spans: Sequence[Any]) -> tuple[str, bool]:
        """
        Builds the trace digest, choosing between full inline rendering
        and structure-only mode based on estimated token count.

        The full digest is always rendered first so its size can be estimated.
        It is replaced by the structure-only rendering (plus a usage hint for the
        discovery tools) only when there is at least one span and the estimate
        exceeds the configured token threshold.

        Args:
            spans: The spans for this thread.

        Returns:
            Tuple of (digest_string, is_large_trace).
        """
        full_digest = judge_span_digest_formatter.format(spans)
        # Estimate once and reuse for both the decision and the debug log; the
        # digest can be large, so estimating it twice is wasted work.
        estimated_tokens = estimate_tokens(full_digest)
        is_large_trace = len(spans) > 0 and estimated_tokens > self._token_threshold

        if is_large_trace:
            digest = (
                judge_span_digest_formatter.format_structure_only(spans)
                + "\n\nUse expand_trace(span_id) to see span details or grep_trace(pattern) to search across spans. Reference spans by the ID shown in brackets."
            )
        else:
            digest = full_digest

        logger.debug(
            "Trace digest built",
            extra={
                "is_large_trace": is_large_trace,
                "estimated_tokens": estimated_tokens,
            },
        )

        return digest, is_large_trace

    def _build_progressive_discovery_tools(self) -> List[dict]:
        """
        Builds the expand_trace and grep_trace tool definitions for litellm.

        Returns:
            List of tool definition dicts for litellm function calling.
        """
        return [
            {
                "type": "function",
                "function": {
                    "name": "expand_trace",
                    "description": (
                        "Expand one or more spans to see their full details "
                        "(attributes, events, content). Use the span ID shown "
                        "in brackets in the trace skeleton."
                    ),
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "span_ids": {
                                "type": "array",
                                "items": {"type": "string"},
                                "description": "Span IDs (or 8-char prefixes) to expand",
                            },
                        },
                        "required": ["span_ids"],
                        "additionalProperties": False,
                    },
                },
            },
            {
                "type": "function",
                "function": {
                    "name": "grep_trace",
                    "description": (
                        "Search across all span attributes, events, and content "
                        "for a pattern (case-insensitive). Returns matching spans with context."
                    ),
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "pattern": {
                                "type": "string",
                                "description": "Search pattern (case-insensitive)",
                            },
                        },
                        "required": ["pattern"],
                        "additionalProperties": False,
                    },
                },
            },
        ]

    def _run_discovery_loop(
        self,
        *,
        messages: List[dict],
        tools: List[dict],
        tool_choice: Any,
        spans: Sequence[Any],
        effective_criteria: List[str],
    ) -> AgentReturnTypes:
        """
        Drives the judge through repeated trace exploration until it decides.

        Each iteration asks the model for a tool call. A response that contains
        finish_test/continue_test (or no tool call at all) is handed to
        ``_parse_response`` and ends the loop. Otherwise the requested
        expand_trace/grep_trace calls are executed and their results are appended
        to ``messages`` (which is mutated in place) before the next iteration.

        All iterations except the last use tool_choice "required" so the judge
        may pick any tool; the last one applies the caller's ``tool_choice``,
        which may force finish_test. If the budget of ``_max_discovery_steps``
        iterations is exhausted without a terminal call, a verdict is forced.

        Args:
            messages: The conversation messages so far.
            tools: The tool definitions.
            tool_choice: The tool choice constraint for the final step.
            spans: The spans for executing expand/grep tools.
            effective_criteria: The criteria to judge against.

        Returns:
            AgentReturnTypes from the terminal tool call.
        """
        terminal_tool_names = {"finish_test", "continue_test"}
        final_step = self._max_discovery_steps - 1

        for step in range(self._max_discovery_steps):
            # Only the last allowed step is constrained by the caller's choice.
            step_tool_choice = "required" if step != final_step else tool_choice

            response = cast(
                ModelResponse,
                litellm.completion(
                    model=self.model,
                    messages=messages,
                    temperature=self.temperature,
                    api_key=self.api_key,
                    api_base=self.api_base,
                    max_tokens=self.max_tokens,
                    tools=tools,
                    tool_choice=step_tool_choice,
                    **self._extra_params,
                ),
            )

            if not hasattr(response, "choices") or len(response.choices) == 0:
                raise Exception(
                    f"Unexpected response format from LLM: {response.__repr__()}"
                )

            message = cast(Choices, response.choices[0]).message
            requested_calls = message.tool_calls

            # Either nothing was requested (the parser reports that as an error)
            # or a terminal tool was requested: in both cases parsing ends the loop.
            if not requested_calls or any(
                call.function.name in terminal_tool_names for call in requested_calls
            ):
                return self._parse_response(response, effective_criteria, messages)

            # Echo the assistant turn so the tool results below have a parent.
            assistant_turn = {
                "role": "assistant",
                "content": message.content or "",
                "tool_calls": [
                    {
                        "id": call.id,
                        "type": "function",
                        "function": {
                            "name": call.function.name,
                            "arguments": call.function.arguments,
                        },
                    }
                    for call in requested_calls
                ],
            }
            messages.append(assistant_turn)

            # Run each discovery tool and feed its output back to the model.
            for call in requested_calls:
                outcome = self._execute_discovery_tool(call, spans)
                messages.append(
                    {
                        "role": "tool",
                        "tool_call_id": call.id,
                        "content": outcome,
                    }
                )

        # Hit max steps - force a verdict with whatever information was gathered
        return self._force_verdict(
            messages=messages,
            tools=tools,
            effective_criteria=effective_criteria,
        )

    def _force_verdict(
        self,
        *,
        messages: List[dict],
        tools: List[dict],
        effective_criteria: List[str],
    ) -> AgentReturnTypes:
        """
        Asks the judge for a final verdict after the discovery budget ran out.

        A user message announcing the limit is appended to ``messages`` (mutated
        in place) and one more LLM call is made with tool_choice pinned to
        finish_test, so the judge decides with the context gathered so far
        instead of hard-failing.

        Args:
            messages: The conversation messages accumulated during discovery.
            tools: The tool definitions offered to the model.
            effective_criteria: The criteria to judge against.

        Returns:
            AgentReturnTypes produced by parsing the forced response.
        """
        logger.warning(
            f"Progressive discovery hit max steps ({self._max_discovery_steps}), "
            "forcing verdict"
        )

        limit_notice = (
            "You have reached the maximum number of trace exploration steps. "
            "Based on the information you have gathered so far, give your final verdict now."
        )
        messages.append({"role": "user", "content": limit_notice})

        finish_only = {"type": "function", "function": {"name": "finish_test"}}
        raw_response = litellm.completion(
            model=self.model,
            messages=messages,
            temperature=self.temperature,
            api_key=self.api_key,
            api_base=self.api_base,
            max_tokens=self.max_tokens,
            tools=tools,
            tool_choice=finish_only,
            **self._extra_params,
        )
        forced_response = cast(ModelResponse, raw_response)
        return self._parse_response(forced_response, effective_criteria, messages)

    def _execute_discovery_tool(self, tool_call: Any, spans: Sequence[Any]) -> str:
        """
        Executes an expand_trace or grep_trace tool call.

        Args:
            tool_call: The tool call from the LLM response.
            spans: The spans to operate on.

        Returns:
            The tool result string.
        """
        try:
            args = json.loads(tool_call.function.arguments)
        except json.JSONDecodeError:
            return f"Error: could not parse arguments: {tool_call.function.arguments}"

        if tool_call.function.name == "expand_trace":
            return expand_trace(
                spans,
                span_ids=args.get("span_ids", []),
            )
        elif tool_call.function.name == "grep_trace":
            return grep_trace(spans, args.get("pattern", ""))
        else:
            return f"Unknown tool: {tool_call.function.name}"

    def _parse_response(
        self,
        response: Any,
        effective_criteria: List[str],
        messages: List[dict],
    ) -> AgentReturnTypes:
        """
        Parses a litellm response into the appropriate return type.

        Handles finish_test, continue_test, and error cases. When the message
        carries several tool calls, the first terminal one (finish_test or
        continue_test) is used; otherwise the first tool call is inspected.

        Per-criterion verdicts are attributed to criteria by property name when
        every returned key matches the sanitised name derived from a criterion
        (the same derivation ``call`` uses to build the finish_test schema).
        This keeps the attribution correct even if the model returns the keys in
        a different order. If the keys cannot be matched unambiguously, the
        verdicts are attributed by position, ignoring any surplus entries.

        Args:
            response: The litellm ModelResponse.
            effective_criteria: The criteria to evaluate against.
            messages: The conversation messages (for inclusion in ScenarioResult).

        Returns:
            AgentReturnTypes: Either an empty list (continue) or ScenarioResult.

        Raises:
            Exception: If the response has no choices, carries no tool calls,
                       names an unknown tool, or its finish_test arguments are
                       not a JSON object.
        """
        if not hasattr(response, "choices") or len(response.choices) == 0:
            raise Exception(
                f"Unexpected response format from LLM: {response.__repr__()}"
            )

        message = cast(Choices, response.choices[0]).message

        if not message.tool_calls:
            raise Exception(
                f"Invalid response from judge agent, tool calls not found: {message.__repr__()}"
            )

        # In multi-step mode, find the terminal tool call
        terminal_names = {"finish_test", "continue_test"}
        terminal_call = next(
            (tc for tc in message.tool_calls if tc.function.name in terminal_names),
            None,
        )
        tool_call = terminal_call or message.tool_calls[0]

        if tool_call.function.name == "continue_test":
            return []

        if tool_call.function.name != "finish_test":
            raise Exception(
                f"Invalid tool call from judge agent: {tool_call.function.name}"
            )

        try:
            args = json.loads(tool_call.function.arguments)
        except (json.JSONDecodeError, TypeError) as err:
            raise Exception(
                f"Failed to parse tool call arguments from judge agent: {tool_call.function.arguments}"
            ) from err

        if not isinstance(args, dict):
            # Valid JSON, but not the object the finish_test schema describes.
            raise Exception(
                f"Failed to parse tool call arguments from judge agent: {tool_call.function.arguments}"
            )

        verdict = args.get("verdict", "inconclusive")
        reasoning = args.get("reasoning", "No reasoning provided")
        criteria_verdicts = args.get("criteria", {})
        if not isinstance(criteria_verdicts, dict):
            # e.g. ``"criteria": null`` - treat like a missing verdict map.
            criteria_verdicts = {}

        # Same sanitisation as the property names of the finish_test schema.
        criterion_by_name = {
            re.sub(
                r"[^a-zA-Z0-9]",
                "_",
                criterion.replace(" ", "_").replace("'", "").lower(),
            )[:70]: criterion
            for criterion in effective_criteria
        }
        # Names are only trustworthy when they are unique per criterion and every
        # returned key is one of them; otherwise fall back to positional order.
        match_by_name = len(criterion_by_name) == len(effective_criteria) and all(
            key in criterion_by_name for key in criteria_verdicts
        )

        passed_criteria: List[str] = []
        failed_criteria: List[str] = []
        for idx, (key, raw_value) in enumerate(criteria_verdicts.items()):
            if match_by_name:
                criterion = criterion_by_name[key]
            elif idx < len(effective_criteria):
                criterion = effective_criteria[idx]
            else:
                # More verdicts than criteria: nothing to attribute this one to.
                continue

            # Tolerate booleans / different casing from non-strict providers.
            value = str(raw_value).strip().lower()
            if value == "true":
                passed_criteria.append(criterion)
            elif value in ("false", "inconclusive"):
                # An undecided criterion counts against the scenario.
                failed_criteria.append(criterion)

        return ScenarioResult(
            success=verdict == "success" and len(failed_criteria) == 0,
            messages=cast(Any, messages),
            reasoning=reasoning,
            passed_criteria=passed_criteria,
            failed_criteria=failed_criteria,
        )

Ancestors

Class variables

var api_base : str | None
var api_key : str | None
var criteria : List[str]
var max_tokens : int | None
var model : str
var role : ClassVar[AgentRole]
var system_prompt : str | None
var temperature : float

Methods

async def call(self, input: AgentInput) ‑> str | openai.types.chat.chat_completion_developer_message_param.ChatCompletionDeveloperMessageParam | openai.types.chat.chat_completion_system_message_param.ChatCompletionSystemMessageParam | openai.types.chat.chat_completion_user_message_param.ChatCompletionUserMessageParam | openai.types.chat.chat_completion_assistant_message_param.ChatCompletionAssistantMessageParam | openai.types.chat.chat_completion_tool_message_param.ChatCompletionToolMessageParam | openai.types.chat.chat_completion_function_message_param.ChatCompletionFunctionMessageParam | List[openai.types.chat.chat_completion_developer_message_param.ChatCompletionDeveloperMessageParam | openai.types.chat.chat_completion_system_message_param.ChatCompletionSystemMessageParam | openai.types.chat.chat_completion_user_message_param.ChatCompletionUserMessageParam | openai.types.chat.chat_completion_assistant_message_param.ChatCompletionAssistantMessageParam | openai.types.chat.chat_completion_tool_message_param.ChatCompletionToolMessageParam | openai.types.chat.chat_completion_function_message_param.ChatCompletionFunctionMessageParam] | ScenarioResult

Evaluate the current conversation state against the configured criteria.

This method analyzes the conversation history and determines whether the scenario should continue or end with a verdict. It uses function calling to make structured decisions and provides detailed reasoning.

Args

input
AgentInput containing conversation history and scenario context

Returns

AgentReturnTypes
Either an empty list (continue scenario) or a ScenarioResult (end scenario with verdict)

Raises

Exception
If the judge cannot make a valid decision or if there's an error in the evaluation process

Note

  • Returns empty list [] to continue the scenario
  • Returns ScenarioResult to end with success/failure
  • Provides detailed reasoning for all decisions
  • Evaluates each criterion independently
  • Can end scenarios early if a clear violation or success is detected
Expand source code
    @scenario_cache()
    async def call(
        self,
        input: AgentInput,
    ) -> AgentReturnTypes:
        """
        Evaluate the current conversation state against the configured criteria.

        This method analyzes the conversation history and determines whether the
        scenario should continue or end with a verdict. It uses function calling
        to make structured decisions and provides detailed reasoning.

        Args:
            input: AgentInput containing conversation history and scenario context

        Returns:
            AgentReturnTypes: Either an empty list (continue scenario) or a
                            ScenarioResult (end scenario with verdict)

        Raises:
            Exception: If the judge cannot make a valid decision or if there's an
                      error in the evaluation process

        Note:
            - Returns empty list [] to continue the scenario
            - Returns ScenarioResult to end with success/failure
            - Provides detailed reasoning for all decisions
            - Evaluates each criterion independently
            - Can end scenarios early if clear violation or success is detected
        """

        scenario = input.scenario_state
        effective_criteria = (
            input.judgment_request.criteria
            if input.judgment_request and input.judgment_request.criteria is not None
            else self.criteria
        )

        # Build transcript and traces digest
        transcript = JudgeUtils.build_transcript_from_messages(input.messages)
        spans = self._span_collector.get_spans_for_thread(input.thread_id)
        digest, is_large_trace = self._build_trace_digest(spans)

        logger.debug(f"OpenTelemetry traces built: {digest[:200]}...")

        content_for_judge = f"""
<transcript>
{transcript}
</transcript>
<opentelemetry_traces>
{digest}
</opentelemetry_traces>
"""

        criteria_str = "\n".join(
            [f"{idx + 1}. {criterion}" for idx, criterion in enumerate(effective_criteria)]
        )

        messages: List[dict] = [
            {
                "role": "system",
                "content": self.system_prompt
                or f"""
<role>
You are an LLM as a judge watching a simulated conversation as it plays out live to determine if the agent under test meets the criteria or not.
</role>

<goal>
Your goal is to determine if you already have enough information to make a verdict of the scenario below, or if the conversation should continue for longer.
If you do have enough information, use the finish_test tool to determine if all the criteria have been met, if not, use the continue_test tool to let the next step play out.
</goal>

<scenario>
{scenario.description}
</scenario>

<criteria>
{criteria_str}
</criteria>

<rules>
- Be strict, do not let the conversation continue if the agent already broke one of the "do not" or "should not" criterias.
- DO NOT make any judgment calls that are not explicitly listed in the success or failure criteria, withhold judgement if necessary
</rules>
""",
            },
            {"role": "user", "content": content_for_judge},
        ]

        max_turns = input.scenario_state.config.max_turns or 10
        is_last_message = (
            input.scenario_state.current_turn >= max_turns - 1
        )

        if is_last_message:
            messages.append(
                {
                    "role": "user",
                    "content": """
System:

<finish_test>
This is the last message, conversation has reached the maximum number of turns, give your final verdict,
if you don't have enough information to make a verdict, say inconclusive with max turns reached.
</finish_test>
""",
                }
            )

        # Define the tools
        criteria_names = [
            re.sub(
                r"[^a-zA-Z0-9]",
                "_",
                criterion.replace(" ", "_").replace("'", "").lower(),
            )[:70]
            for criterion in effective_criteria
        ]
        tools: List[dict] = [
            {
                "type": "function",
                "function": {
                    "name": "continue_test",
                    "description": "Continue the test with the next step",
                    "strict": True,
                    "parameters": {
                        "type": "object",
                        "properties": {},
                        "required": [],
                        "additionalProperties": False,
                    },
                },
            },
            {
                "type": "function",
                "function": {
                    "name": "finish_test",
                    "description": "Complete the test with a final verdict",
                    "strict": True,
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "criteria": {
                                "type": "object",
                                "properties": {
                                    criteria_names[idx]: {
                                        "type": "string",
                                        "enum": ["true", "false", "inconclusive"],
                                        "description": criterion,
                                    }
                                    for idx, criterion in enumerate(effective_criteria)
                                },
                                "required": criteria_names,
                                "additionalProperties": False,
                                "description": "Strict verdict for each criterion",
                            },
                            "reasoning": {
                                "type": "string",
                                "description": "Explanation of what the final verdict should be",
                            },
                            "verdict": {
                                "type": "string",
                                "enum": ["success", "failure", "inconclusive"],
                                "description": "The final verdict of the test",
                            },
                        },
                        "required": ["criteria", "reasoning", "verdict"],
                        "additionalProperties": False,
                    },
                },
            },
        ]

        if is_large_trace:
            tools = self._build_progressive_discovery_tools() + tools

        enforce_judgment = input.judgment_request is not None
        has_criteria = len(effective_criteria) > 0

        if enforce_judgment and not has_criteria:
            return ScenarioResult(
                success=False,
                messages=[],
                reasoning="TestingAgent was called as a judge, but it has no criteria to judge against",
            )

        tool_choice: Any = (
            {"type": "function", "function": {"name": "finish_test"}}
            if (is_last_message or enforce_judgment) and has_criteria
            else "required"
        )

        # Multi-step discovery loop for large traces
        if is_large_trace:
            return self._run_discovery_loop(
                messages=messages,
                tools=tools,
                tool_choice=tool_choice,
                spans=spans,
                effective_criteria=effective_criteria,
            )

        # Standard single-call path for small traces
        response = cast(
            ModelResponse,
            litellm.completion(
                model=self.model,
                messages=messages,
                temperature=self.temperature,
                api_key=self.api_key,
                api_base=self.api_base,
                max_tokens=self.max_tokens,
                tools=tools,
                tool_choice=tool_choice,
                **self._extra_params,
            ),
        )

        return self._parse_response(response, effective_criteria, messages)