Module scenario.judge_agent
Use the Judge Agent module in Scenario to evaluate conversation quality and LLM reasoning during AI agent testing.
This module provides the JudgeAgent class, which evaluates ongoing conversations between users and agents to determine if success criteria are met. The judge makes real-time decisions about whether scenarios should continue or end with success/failure verdicts.
Expand source code
"""
Use the Judge Agent module in Scenario to evaluate conversation quality and LLM reasoning during AI agent testing.
This module provides the JudgeAgent class, which evaluates ongoing conversations
between users and agents to determine if success criteria are met. The judge
makes real-time decisions about whether scenarios should continue or end with
success/failure verdicts.
"""
import json
import logging
import re
from typing import Any, List, Optional, Sequence, cast
import litellm
from litellm import Choices
from litellm.files.main import ModelResponse
from scenario.cache import scenario_cache
from scenario.agent_adapter import AgentAdapter
from scenario.config import ModelConfig, ScenarioConfig
from ._error_messages import agent_not_configured_error_message
from ._judge import JudgeUtils, judge_span_digest_formatter
from ._judge.estimate_tokens import estimate_tokens, DEFAULT_TOKEN_THRESHOLD
from ._judge.trace_tools import expand_trace, grep_trace
from ._tracing import judge_span_collector, JudgeSpanCollector
from .types import AgentInput, AgentReturnTypes, AgentRole, ScenarioResult
logger = logging.getLogger("scenario")
class JudgeAgent(AgentAdapter):
"""
Agent that evaluates conversations against success criteria.
The JudgeAgent watches conversations in real-time and makes decisions about
whether the agent under test is meeting the specified criteria. It can either
allow the conversation to continue or end it with a success/failure verdict.
The judge uses function calling to make structured decisions and provides
detailed reasoning for its verdicts. It evaluates each criterion independently
and provides comprehensive feedback about what worked and what didn't.
Attributes:
role: Always AgentRole.JUDGE for judge agents
model: LLM model identifier to use for evaluation
api_base: Optional base URL where the model is hosted
api_key: Optional API key for the model provider
temperature: Sampling temperature for evaluation consistency
max_tokens: Maximum tokens for judge reasoning
criteria: List of success criteria to evaluate against
system_prompt: Custom system prompt to override default judge behavior
Example:
```
import scenario
# Basic judge agent with criteria
judge = scenario.JudgeAgent(
criteria=[
"Agent provides helpful responses",
"Agent asks relevant follow-up questions",
"Agent does not provide harmful information"
]
)
# Customized judge with specific model and behavior
strict_judge = scenario.JudgeAgent(
model="openai/gpt-4.1-mini",
criteria=[
"Code examples are syntactically correct",
"Explanations are technically accurate",
"Security best practices are mentioned"
],
temperature=0.0, # More deterministic evaluation
system_prompt="You are a strict technical reviewer evaluating code quality."
)
# Use in scenario
result = await scenario.run(
name="coding assistant test",
description="User asks for help with Python functions",
agents=[
coding_agent,
scenario.UserSimulatorAgent(),
judge
]
)
print(f"Passed criteria: {result.passed_criteria}")
print(f"Failed criteria: {result.failed_criteria}")
```
Note:
- Judge agents evaluate conversations continuously, not just at the end
- They can end scenarios early if clear success/failure conditions are met
- They provide detailed reasoning for their decisions
- They support both positive criteria (things that should happen) and negative criteria (things that shouldn't)
"""
role = AgentRole.JUDGE
model: str
api_base: Optional[str]
api_key: Optional[str]
temperature: float
max_tokens: Optional[int]
criteria: List[str]
system_prompt: Optional[str]
_extra_params: dict
_span_collector: JudgeSpanCollector
_token_threshold: int
_max_discovery_steps: int
def __init__(
self,
*,
criteria: Optional[List[str]] = None,
model: Optional[str] = None,
api_base: Optional[str] = None,
api_key: Optional[str] = None,
temperature: float = 0.0,
max_tokens: Optional[int] = None,
system_prompt: Optional[str] = None,
span_collector: Optional[JudgeSpanCollector] = None,
token_threshold: int = DEFAULT_TOKEN_THRESHOLD,
max_discovery_steps: int = 10,
**extra_params,
):
"""
Initialize a judge agent with evaluation criteria.
Args:
criteria: List of success criteria to evaluate the conversation against.
Can include both positive requirements ("Agent provides helpful responses")
and negative constraints ("Agent should not provide personal information").
model: LLM model identifier (e.g., "openai/gpt-4.1-mini").
If not provided, uses the default model from global configuration.
api_base: Optional base URL where the model is hosted. If not provided,
uses the base URL from global configuration.
api_key: API key for the model provider. If not provided,
uses the key from global configuration or environment.
temperature: Sampling temperature for evaluation (0.0-1.0).
Lower values (0.0-0.2) recommended for consistent evaluation.
max_tokens: Maximum number of tokens for judge reasoning and explanations.
system_prompt: Custom system prompt to override default judge behavior.
Use this to create specialized evaluation perspectives.
span_collector: Optional span collector for telemetry. Defaults to global singleton.
token_threshold: Estimated token count above which traces switch to
structure-only rendering with progressive discovery tools.
Defaults to 8192.
max_discovery_steps: Maximum number of expand/grep tool calls the judge
can make before being forced to return a verdict.
Defaults to 10.
Raises:
Exception: If no model is configured either in parameters or global config
Example:
```
# Customer service judge
cs_judge = JudgeAgent(
criteria=[
"Agent replies with the refund policy",
"Agent offers next steps for the customer",
],
temperature=0.1
)
# Technical accuracy judge
tech_judge = JudgeAgent(
criteria=[
"Agent adds a code review pointing out the code compilation errors",
"Agent adds a code review about the missing security headers"
],
system_prompt="You are a senior software engineer reviewing code for production use."
)
```
Note:
Advanced usage: Additional parameters can be passed as keyword arguments
(e.g., headers, timeout, client) for specialized configurations. These are
experimental and may not be supported in future versions.
"""
self.criteria = criteria or []
self.api_base = api_base
self.api_key = api_key
self.temperature = temperature
self.max_tokens = max_tokens
self.system_prompt = system_prompt
self._span_collector = span_collector or judge_span_collector
self._token_threshold = token_threshold
self._max_discovery_steps = max_discovery_steps
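# Resolve the model configuration: explicit constructor arguments take
# precedence over the global ScenarioConfig default model, which may be
# a plain model string or a full ModelConfig.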
if model:
self.model = model
if ScenarioConfig.default_config is not None and isinstance(
ScenarioConfig.default_config.default_model, str
):
self.model = model or ScenarioConfig.default_config.default_model
self._extra_params = extra_params
elif ScenarioConfig.default_config is not None and isinstance(
ScenarioConfig.default_config.default_model, ModelConfig
):
self.model = model or ScenarioConfig.default_config.default_model.model
self.api_base = (
api_base or ScenarioConfig.default_config.default_model.api_base
)
self.api_key = (
api_key or ScenarioConfig.default_config.default_model.api_key
)
self.temperature = (
temperature or ScenarioConfig.default_config.default_model.temperature
)
self.max_tokens = (
max_tokens or ScenarioConfig.default_config.default_model.max_tokens
)
# Extract extra params from ModelConfig
config_dict = ScenarioConfig.default_config.default_model.model_dump(
exclude_none=True
)
config_dict.pop("model", None)
config_dict.pop("api_base", None)
config_dict.pop("api_key", None)
config_dict.pop("temperature", None)
config_dict.pop("max_tokens", None)
# Merge: config extras < agent extra_params
self._extra_params = {**config_dict, **extra_params}
else:
self._extra_params = extra_params
if not hasattr(self, "model"):
raise Exception(agent_not_configured_error_message("JudgeAgent"))
@scenario_cache()
async def call(
self,
input: AgentInput,
) -> AgentReturnTypes:
"""
Evaluate the current conversation state against the configured criteria.
This method analyzes the conversation history and determines whether the
scenario should continue or end with a verdict. It uses function calling
to make structured decisions and provides detailed reasoning.
Args:
input: AgentInput containing conversation history and scenario context
Returns:
AgentReturnTypes: Either an empty list (continue scenario) or a
ScenarioResult (end scenario with verdict)
Raises:
Exception: If the judge cannot make a valid decision or if there's an
error in the evaluation process
Note:
- Returns empty list [] to continue the scenario
- Returns ScenarioResult to end with success/failure
- Provides detailed reasoning for all decisions
- Evaluates each criterion independently
- Can end scenarios early if clear violation or success is detected
"""
scenario = input.scenario_state
effective_criteria = (
input.judgment_request.criteria
if input.judgment_request and input.judgment_request.criteria is not None
else self.criteria
)
# Build transcript and traces digest
transcript = JudgeUtils.build_transcript_from_messages(input.messages)
spans = self._span_collector.get_spans_for_thread(input.thread_id)
digest, is_large_trace = self._build_trace_digest(spans)
logger.debug(f"OpenTelemetry traces built: {digest[:200]}...")
content_for_judge = f"""
<transcript>
{transcript}
</transcript>
<opentelemetry_traces>
{digest}
</opentelemetry_traces>
"""
criteria_str = "\n".join(
[f"{idx + 1}. {criterion}" for idx, criterion in enumerate(effective_criteria)]
)
messages: List[dict] = [
{
"role": "system",
"content": self.system_prompt
or f"""
<role>
You are an LLM-as-a-judge watching a simulated conversation as it plays out live to determine whether the agent under test meets the criteria.
</role>
<goal>
Your goal is to determine whether you already have enough information to reach a verdict on the scenario below, or whether the conversation should continue for longer.
If you do have enough information, use the finish_test tool to determine whether all the criteria have been met; if not, use the continue_test tool to let the next step play out.
</goal>
<scenario>
{scenario.description}
</scenario>
<criteria>
{criteria_str}
</criteria>
<rules>
- Be strict: do not let the conversation continue if the agent has already broken one of the "do not" or "should not" criteria.
- DO NOT make any judgment calls that are not explicitly listed in the success or failure criteria; withhold judgment if necessary
</rules>
""",
},
{"role": "user", "content": content_for_judge},
]
max_turns = input.scenario_state.config.max_turns or 10
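# On the final allowed turn the judge must render a verdict rather than
# let the conversation continue.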
is_last_message = (
input.scenario_state.current_turn >= max_turns - 1
)
if is_last_message:
messages.append(
{
"role": "user",
"content": """
System:
<finish_test>
This is the last message; the conversation has reached the maximum number of turns. Give your final verdict.
If you don't have enough information to make a verdict, say inconclusive with max turns reached.
</finish_test>
""",
}
)
# Define the tools
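# Sanitize each criterion into a valid JSON-schema property name:
# non-alphanumeric characters become underscores, capped at 70 chars.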
criteria_names = [
re.sub(
r"[^a-zA-Z0-9]",
"_",
criterion.replace(" ", "_").replace("'", "").lower(),
)[:70]
for criterion in effective_criteria
]
tools: List[dict] = [
{
"type": "function",
"function": {
"name": "continue_test",
"description": "Continue the test with the next step",
"strict": True,
"parameters": {
"type": "object",
"properties": {},
"required": [],
"additionalProperties": False,
},
},
},
{
"type": "function",
"function": {
"name": "finish_test",
"description": "Complete the test with a final verdict",
"strict": True,
"parameters": {
"type": "object",
"properties": {
"criteria": {
"type": "object",
"properties": {
criteria_names[idx]: {
"type": "string",
"enum": ["true", "false", "inconclusive"],
"description": criterion,
}
for idx, criterion in enumerate(effective_criteria)
},
"required": criteria_names,
"additionalProperties": False,
"description": "Strict verdict for each criterion",
},
"reasoning": {
"type": "string",
"description": "Explanation of what the final verdict should be",
},
"verdict": {
"type": "string",
"enum": ["success", "failure", "inconclusive"],
"description": "The final verdict of the test",
},
},
"required": ["criteria", "reasoning", "verdict"],
"additionalProperties": False,
},
},
},
]
if is_large_trace:
tools = self._build_progressive_discovery_tools() + tools
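# An explicit judgment request means the caller wants a verdict now,
# which is only possible if there are criteria to judge against.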
enforce_judgment = input.judgment_request is not None
has_criteria = len(effective_criteria) > 0
if enforce_judgment and not has_criteria:
return ScenarioResult(
success=False,
messages=[],
reasoning="TestingAgent was called as a judge, but it has no criteria to judge against",
)
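# Force finish_test when a verdict is due (last turn or explicit judgment
# request); otherwise require a tool call but let the judge pick which one.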
tool_choice: Any = (
{"type": "function", "function": {"name": "finish_test"}}
if (is_last_message or enforce_judgment) and has_criteria
else "required"
)
# Multi-step discovery loop for large traces
if is_large_trace:
return self._run_discovery_loop(
messages=messages,
tools=tools,
tool_choice=tool_choice,
spans=spans,
effective_criteria=effective_criteria,
)
# Standard single-call path for small traces
response = cast(
ModelResponse,
litellm.completion(
model=self.model,
messages=messages,
temperature=self.temperature,
api_key=self.api_key,
api_base=self.api_base,
max_tokens=self.max_tokens,
tools=tools,
tool_choice=tool_choice,
**self._extra_params,
),
)
return self._parse_response(response, effective_criteria, messages)
def _build_trace_digest(self, spans: Sequence[Any]) -> tuple[str, bool]:
"""
Builds the trace digest, choosing between full inline rendering
and structure-only mode based on estimated token count.
Args:
spans: The spans for this thread.
Returns:
Tuple of (digest_string, is_large_trace).
"""
full_digest = judge_span_digest_formatter.format(spans)
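# Fall back to a structure-only skeleton when the fully rendered digest
# would exceed the token budget for inline inclusion.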
is_large_trace = (
len(spans) > 0 and estimate_tokens(full_digest) > self._token_threshold
)
if is_large_trace:
digest = (
judge_span_digest_formatter.format_structure_only(spans)
+ "\n\nUse expand_trace(span_id) to see span details or grep_trace(pattern) to search across spans. Reference spans by the ID shown in brackets."
)
else:
digest = full_digest
logger.debug(
"Trace digest built",
extra={
"is_large_trace": is_large_trace,
"estimated_tokens": estimate_tokens(full_digest),
},
)
return digest, is_large_trace
def _build_progressive_discovery_tools(self) -> List[dict]:
"""
Builds the expand_trace and grep_trace tool definitions for litellm.
Returns:
List of tool definition dicts for litellm function calling.
"""
return [
{
"type": "function",
"function": {
"name": "expand_trace",
"description": (
"Expand one or more spans to see their full details "
"(attributes, events, content). Use the span ID shown "
"in brackets in the trace skeleton."
),
"parameters": {
"type": "object",
"properties": {
"span_ids": {
"type": "array",
"items": {"type": "string"},
"description": "Span IDs (or 8-char prefixes) to expand",
},
},
"required": ["span_ids"],
"additionalProperties": False,
},
},
},
{
"type": "function",
"function": {
"name": "grep_trace",
"description": (
"Search across all span attributes, events, and content "
"for a pattern (case-insensitive). Returns matching spans with context."
),
"parameters": {
"type": "object",
"properties": {
"pattern": {
"type": "string",
"description": "Search pattern (case-insensitive)",
},
},
"required": ["pattern"],
"additionalProperties": False,
},
},
},
]
def _run_discovery_loop(
self,
*,
messages: List[dict],
tools: List[dict],
tool_choice: Any,
spans: Sequence[Any],
effective_criteria: List[str],
) -> AgentReturnTypes:
"""
Runs the multi-step discovery loop for large traces.
The judge can call expand_trace/grep_trace tools multiple times before
reaching a terminal tool (finish_test/continue_test) or hitting the
max discovery steps limit.
On intermediate steps, tool_choice is "required" so the judge can freely
pick expand_trace/grep_trace. On the final step, the original tool_choice
(which may force finish_test) is applied.
Args:
messages: The conversation messages so far.
tools: The tool definitions.
tool_choice: The tool choice constraint for the final step.
spans: The spans for executing expand/grep tools.
effective_criteria: The criteria to judge against.
Returns:
AgentReturnTypes from the terminal tool call.
"""
terminal_tool_names = {"finish_test", "continue_test"}
for step in range(self._max_discovery_steps):
# Use "required" for intermediate steps so the judge can use
# discovery tools; only apply the forced tool_choice on the
# last allowed step.
is_last_step = step == self._max_discovery_steps - 1
step_tool_choice = tool_choice if is_last_step else "required"
response = cast(
ModelResponse,
litellm.completion(
model=self.model,
messages=messages,
temperature=self.temperature,
api_key=self.api_key,
api_base=self.api_base,
max_tokens=self.max_tokens,
tools=tools,
tool_choice=step_tool_choice,
**self._extra_params,
),
)
if not hasattr(response, "choices") or len(response.choices) == 0:
raise Exception(
f"Unexpected response format from LLM: {response.__repr__()}"
)
message = cast(Choices, response.choices[0]).message
if not message.tool_calls:
# No tool calls - try to parse as a response
return self._parse_response(response, effective_criteria, messages)
# Check for terminal tool call
terminal_call = next(
(tc for tc in message.tool_calls if tc.function.name in terminal_tool_names),
None,
)
if terminal_call:
return self._parse_response(response, effective_criteria, messages)
# Execute discovery tools and add results to messages
# Add the assistant message with tool calls
messages.append({
"role": "assistant",
"content": message.content or "",
"tool_calls": [
{
"id": tc.id,
"type": "function",
"function": {
"name": tc.function.name,
"arguments": tc.function.arguments,
},
}
for tc in message.tool_calls
],
})
for tc in message.tool_calls:
tool_result = self._execute_discovery_tool(tc, spans)
messages.append({
"role": "tool",
"tool_call_id": tc.id,
"content": tool_result,
})
# Hit max steps - force a verdict with whatever information was gathered
return self._force_verdict(
messages=messages,
tools=tools,
effective_criteria=effective_criteria,
)
def _force_verdict(
self,
*,
messages: List[dict],
tools: List[dict],
effective_criteria: List[str],
) -> AgentReturnTypes:
"""
Makes one final LLM call with tool_choice forced to finish_test,
so the judge renders a verdict with whatever context it accumulated
during discovery instead of hard-failing.
"""
logger.warning(
f"Progressive discovery hit max steps ({self._max_discovery_steps}), "
"forcing verdict"
)
messages.append({
"role": "user",
"content": (
"You have reached the maximum number of trace exploration steps. "
"Based on the information you have gathered so far, give your final verdict now."
),
})
forced_response = cast(
ModelResponse,
litellm.completion(
model=self.model,
messages=messages,
temperature=self.temperature,
api_key=self.api_key,
api_base=self.api_base,
max_tokens=self.max_tokens,
tools=tools,
tool_choice={"type": "function", "function": {"name": "finish_test"}},
**self._extra_params,
),
)
return self._parse_response(forced_response, effective_criteria, messages)
def _execute_discovery_tool(self, tool_call: Any, spans: Sequence[Any]) -> str:
"""
Executes an expand_trace or grep_trace tool call.
Args:
tool_call: The tool call from the LLM response.
spans: The spans to operate on.
Returns:
The tool result string.
"""
try:
args = json.loads(tool_call.function.arguments)
except json.JSONDecodeError:
return f"Error: could not parse arguments: {tool_call.function.arguments}"
if tool_call.function.name == "expand_trace":
return expand_trace(
spans,
span_ids=args.get("span_ids", []),
)
elif tool_call.function.name == "grep_trace":
return grep_trace(spans, args.get("pattern", ""))
else:
return f"Unknown tool: {tool_call.function.name}"
def _parse_response(
self,
response: Any,
effective_criteria: List[str],
messages: List[dict],
) -> AgentReturnTypes:
"""
Parses a litellm response into the appropriate return type.
Handles finish_test, continue_test, and error cases.
Args:
response: The litellm ModelResponse.
effective_criteria: The criteria to evaluate against.
messages: The conversation messages (for inclusion in ScenarioResult).
Returns:
AgentReturnTypes: Either an empty list (continue) or ScenarioResult.
"""
if not hasattr(response, "choices") or len(response.choices) == 0:
raise Exception(
f"Unexpected response format from LLM: {response.__repr__()}"
)
message = cast(Choices, response.choices[0]).message
if not message.tool_calls:
raise Exception(
f"Invalid response from judge agent, tool calls not found: {message.__repr__()}"
)
# In multi-step mode, find the terminal tool call
terminal_names = {"finish_test", "continue_test"}
terminal_call = next(
(tc for tc in message.tool_calls if tc.function.name in terminal_names),
None,
)
tool_call = terminal_call or message.tool_calls[0]
if tool_call.function.name == "continue_test":
return []
if tool_call.function.name == "finish_test":
try:
args = json.loads(tool_call.function.arguments)
verdict = args.get("verdict", "inconclusive")
reasoning = args.get("reasoning", "No reasoning provided")
criteria_verdicts = args.get("criteria", {})
passed_criteria = [
effective_criteria[idx]
for idx, criterion in enumerate(criteria_verdicts.values())
if criterion == "true"
]
failed_criteria = [
effective_criteria[idx]
for idx, criterion in enumerate(criteria_verdicts.values())
if criterion == "false" or criterion == "inconclusive"
]
return ScenarioResult(
success=verdict == "success" and len(failed_criteria) == 0,
messages=cast(Any, messages),
reasoning=reasoning,
passed_criteria=passed_criteria,
failed_criteria=failed_criteria,
)
except json.JSONDecodeError:
raise Exception(
f"Failed to parse tool call arguments from judge agent: {tool_call.function.arguments}"
)
raise Exception(
f"Invalid tool call from judge agent: {tool_call.function.name}"
)
Classes
class JudgeAgent (*, criteria: List[str] | None = None, model: str | None = None, api_base: str | None = None, api_key: str | None = None, temperature: float = 0.0, max_tokens: int | None = None, system_prompt: str | None = None, span_collector: JudgeSpanCollector | None = None, token_threshold: int = 8192, max_discovery_steps: int = 10, **extra_params)
Agent that evaluates conversations against success criteria.
The JudgeAgent watches conversations in real-time and makes decisions about whether the agent under test is meeting the specified criteria. It can either allow the conversation to continue or end it with a success/failure verdict.
The judge uses function calling to make structured decisions and provides detailed reasoning for its verdicts. It evaluates each criterion independently and provides comprehensive feedback about what worked and what didn't.
Attributes
role: Always AgentRole.JUDGE for judge agents
model: LLM model identifier to use for evaluation
api_base: Optional base URL where the model is hosted
api_key: Optional API key for the model provider
temperature: Sampling temperature for evaluation consistency
max_tokens: Maximum tokens for judge reasoning
criteria: List of success criteria to evaluate against
system_prompt: Custom system prompt to override default judge behavior
Example
```
import scenario

# Basic judge agent with criteria
judge = scenario.JudgeAgent(
    criteria=[
        "Agent provides helpful responses",
        "Agent asks relevant follow-up questions",
        "Agent does not provide harmful information"
    ]
)

# Customized judge with specific model and behavior
strict_judge = scenario.JudgeAgent(
    model="openai/gpt-4.1-mini",
    criteria=[
        "Code examples are syntactically correct",
        "Explanations are technically accurate",
        "Security best practices are mentioned"
    ],
    temperature=0.0,  # More deterministic evaluation
    system_prompt="You are a strict technical reviewer evaluating code quality."
)

# Use in scenario
result = await scenario.run(
    name="coding assistant test",
    description="User asks for help with Python functions",
    agents=[
        coding_agent,
        scenario.UserSimulatorAgent(),
        judge
    ]
)

print(f"Passed criteria: {result.passed_criteria}")
print(f"Failed criteria: {result.failed_criteria}")
```
Note
- Judge agents evaluate conversations continuously, not just at the end
- They can end scenarios early if clear success/failure conditions are met
- They provide detailed reasoning for their decisions
- They support both positive criteria (things that should happen) and negative criteria (things that shouldn't)
Initialize a judge agent with evaluation criteria.
Args
criteria: List of success criteria to evaluate the conversation against. Can include both positive requirements ("Agent provides helpful responses") and negative constraints ("Agent should not provide personal information").
model: LLM model identifier (e.g., "openai/gpt-4.1-mini"). If not provided, uses the default model from global configuration.
api_base: Optional base URL where the model is hosted. If not provided, uses the base URL from global configuration.
api_key: API key for the model provider. If not provided, uses the key from global configuration or environment.
temperature: Sampling temperature for evaluation (0.0-1.0). Lower values (0.0-0.2) recommended for consistent evaluation.
max_tokens: Maximum number of tokens for judge reasoning and explanations.
system_prompt: Custom system prompt to override default judge behavior. Use this to create specialized evaluation perspectives.
span_collector: Optional span collector for telemetry. Defaults to global singleton.
token_threshold: Estimated token count above which traces switch to structure-only rendering with progressive discovery tools. Defaults to 8192.
max_discovery_steps: Maximum number of expand/grep tool calls the judge can make before being forced to return a verdict. Defaults to 10.
Raises
Exception: If no model is configured either in parameters or global config
Example
```
# Customer service judge
cs_judge = JudgeAgent(
    criteria=[
        "Agent replies with the refund policy",
        "Agent offers next steps for the customer",
    ],
    temperature=0.1
)

# Technical accuracy judge
tech_judge = JudgeAgent(
    criteria=[
        "Agent adds a code review pointing out the code compilation errors",
        "Agent adds a code review about the missing security headers"
    ],
    system_prompt="You are a senior software engineer reviewing code for production use."
)
```
Note
Advanced usage: Additional parameters can be passed as keyword arguments (e.g., headers, timeout, client) for specialized configurations. These are experimental and may not be supported in future versions.
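The constructor options above cover the common tuning knobs. A minimal sketch of the advanced parameters (the criteria strings, threshold, and timeout values here are illustrative assumptions, not recommendations):

```
import scenario

# Judge tuned for agents that emit large OpenTelemetry traces: switch to
# structure-only trace rendering sooner and cap exploration at 5 tool calls.
trace_judge = scenario.JudgeAgent(
    criteria=["Agent calls the retrieval tool before answering"],
    token_threshold=4096,     # structure-only rendering above ~4096 estimated tokens
    max_discovery_steps=5,    # at most 5 expand_trace/grep_trace calls per verdict
)

# Extra keyword arguments are forwarded to litellm.completion; `timeout` is a
# standard litellm parameter, shown here as an example of the experimental passthrough.
patient_judge = scenario.JudgeAgent(
    criteria=["Agent eventually provides a complete answer"],
    timeout=60,
)
```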
Ancestors
- AgentAdapter
- abc.ABC
Class variables
var api_base : str | None
var api_key : str | None
var criteria : List[str]
var max_tokens : int | None
var model : str
var role : ClassVar[AgentRole]
var system_prompt : str | None
var temperature : float
Methods
async def call(self, input: AgentInput) ‑> AgentReturnTypes
Evaluate the current conversation state against the configured criteria.
This method analyzes the conversation history and determines whether the scenario should continue or end with a verdict. It uses function calling to make structured decisions and provides detailed reasoning.
Args
input: AgentInput containing conversation history and scenario context
Returns
AgentReturnTypes: Either an empty list (continue scenario) or a ScenarioResult (end scenario with verdict)
Raises
Exception: If the judge cannot make a valid decision or if there's an error in the evaluation process
Note
- Returns empty list [] to continue the scenario
- Returns ScenarioResult to end with success/failure
- Provides detailed reasoning for all decisions
- Evaluates each criterion independently
- Can end scenarios early if clear violation or success is detected
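Because `call` returns either an empty list (continue) or a `ScenarioResult`, a caller driving the judge outside of `scenario.run` can branch on the return type. A minimal sketch, assuming you already have a populated `AgentInput` from the scenario runner:

```
from scenario.types import ScenarioResult

result = await judge.call(agent_input)  # agent_input: an AgentInput built by the runner
if isinstance(result, ScenarioResult):
    # The judge reached a verdict; the scenario ends here.
    print(result.success, result.reasoning)
else:
    # Empty list: not enough information yet, let the conversation continue.
    pass
```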