lush_grading/lush_bench/report.py
Cormac Shannon 18ce7e57cf Revamp questionnaire, parallelize run-all, add new tasks
- Replace the 6 compound Likert questions with 12 atomic ones grouped by
  dimension (syntax, expressiveness, data/IO, errors, overall); drop the
  free-form question. Responses are now stored as ints, not strings.
- Back-compat layer maps legacy keys to new dimensions so existing
  results still render.
- Parallelize run-all with ThreadPoolExecutor (configurable workers)
  and add a thread-safe min-request-interval rate limiter to the
  Anthropic provider (sketched below).
- Add new tasks: path_normalizer, todo_manager, currency_converter,
  locale_weather_url, network_info_parser, url_normalizer.
2026-04-07 19:07:21 +01:00
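
The parallel runner and rate limiter live in other files; the following is a
minimal sketch of the pattern the commit describes (MinIntervalLimiter,
run_all, and run_one are illustrative names, not the actual provider API):

import threading
import time
from concurrent.futures import ThreadPoolExecutor

class MinIntervalLimiter:
    """Allow at most one request per `interval` seconds across all threads."""

    def __init__(self, interval: float) -> None:
        self.interval = interval
        self._lock = threading.Lock()
        self._next_allowed = 0.0

    def wait(self) -> None:
        # Reserve the next slot under the lock, then sleep outside it so
        # other threads can queue up without blocking on the lock.
        with self._lock:
            now = time.monotonic()
            delay = max(0.0, self._next_allowed - now)
            self._next_allowed = max(now, self._next_allowed) + self.interval
        if delay > 0:
            time.sleep(delay)

def run_all(tasks, run_one, max_workers: int = 4):
    # run_one is expected to call the limiter's wait() before each request.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(run_one, tasks))

Reserving the slot inside the lock but sleeping outside it enforces the
minimum spacing between requests without serializing the workers on the lock.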

342 lines
13 KiB
Python

from __future__ import annotations

import json
from collections import defaultdict
from pathlib import Path

from .models import BenchmarkResult

# New 12-item question list: (key, label, dimension)
LIKERT_QUESTIONS = [
    ("syntax_clarity", "Syntax clarity", "Syntax & Readability"),
    ("signal_to_noise", "Signal-to-noise", "Syntax & Readability"),
    ("familiar_conventions", "Familiar conventions", "Syntax & Readability"),
    ("builtin_ops", "Built-in operations", "Expressiveness"),
    ("string_ops", "String operations", "Expressiveness"),
    ("composition", "Composition", "Expressiveness"),
    ("io_ergonomics", "I/O ergonomics", "Data & I/O"),
    ("data_structures", "Data structures", "Data & I/O"),
    ("error_model", "Error model", "Error Handling"),
    ("edge_case_support", "Edge case support", "Error Handling"),
    ("learnability", "Learnability", "Overall"),
    ("fitness", "Fitness for task", "Overall"),
]

# Map the 6 legacy question keys to new keys for back-compat with existing results
LEGACY_KEY_MAP = {
    "Readability": ["syntax_clarity", "signal_to_noise", "familiar_conventions"],
    "Expressiveness": ["builtin_ops", "string_ops", "composition"],
    "Conciseness": ["signal_to_noise"],
    "Error handling": ["error_model", "edge_case_support"],
    "Overall preference": ["fitness"],
    "Learning curve": ["learnability"],
}
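
# Note: "signal_to_noise" is mapped from both "Readability" and "Conciseness".
# _get_likert_scores below only fills a key while it is still None, so for
# legacy results the first matching answer in the questionnaire wins.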


def load_latest_results(results_dir: Path) -> list[BenchmarkResult]:
    """Load results, keeping only the latest run per task name."""
    latest: dict[str, BenchmarkResult] = {}
    for d in sorted(results_dir.iterdir()):
        result_file = d / "result.json"
        if not result_file.exists():
            continue
        with open(result_file) as f:
            r = BenchmarkResult.from_dict(json.load(f))
        latest[r.task_name] = r
    return sorted(latest.values(), key=lambda r: (r.category, r.task_name))


def _parse_likert(selected: str | int) -> int | None:
    """Extract the numeric value from a Likert response.

    Handles an int directly or a string like '4 - Agree'.
    """
    if isinstance(selected, int):
        return selected
    s = str(selected).strip()
    if s and s[0].isdigit():
        return int(s[0])
    return None


def _get_likert_scores(result: BenchmarkResult) -> dict[str, dict[str, float | None]]:
    """Extract Likert scores per language. Returns {question_key: {bash: N, lush: N}}.

    Handles both new-format results (exact id match) and legacy results
    (startswith match mapped to new keys).
    """
    scores: dict[str, dict[str, float | None]] = {}
    for key, _, _ in LIKERT_QUESTIONS:
        scores[key] = {"bash": None, "lush": None}
    for lang_name, lang_result in [("bash", result.bash_result), ("lush", result.lush_result)]:
        if not lang_result:
            continue
        for q in lang_result.questionnaire:
            # Try exact match on new question ids
            if q.question in scores:
                val = _parse_likert(q.selected)
                if val is not None:
                    scores[q.question][lang_name] = float(val)
                continue
            # Legacy: map old key to new keys (spread the score)
            for legacy_prefix, new_keys in LEGACY_KEY_MAP.items():
                if q.question.startswith(legacy_prefix):
                    val = _parse_likert(q.selected)
                    if val is not None:
                        for nk in new_keys:
                            if scores[nk][lang_name] is None:
                                scores[nk][lang_name] = float(val)
                    break
    return scores


def _bar(value: float, max_val: float = 5.0, width: int = 20) -> str:
    """Render a small horizontal bar."""
    filled = int(round(value / max_val * width))
    return "\u2588" * filled + "\u2591" * (width - filled)


def render_summary_table(results: list[BenchmarkResult]) -> str:
    """Render the pass/fail + turns overview table."""
    lines: list[str] = []
    lines.append("")
    lines.append("=" * 78)
    lines.append(" BENCHMARK RESULTS SUMMARY")
    lines.append("=" * 78)
    lines.append("")
    header = f" {'Task':<22s} {'Category':<12s} {'Mode':<8s} {'Bash':^14s} {'Lush':^14s}"
    lines.append(header)
    sub = f" {'':<22s} {'':<12s} {'':<8s} {'pass turns':^14s} {'pass turns':^14s}"
    lines.append(sub)
    lines.append(" " + "-" * 74)
    for r in results:
        b = r.bash_result
        l = r.lush_result
        b_pass = "PASS" if b and b.all_passed else "FAIL" if b else "-"
        l_pass = "PASS" if l and l.all_passed else "FAIL" if l else "-"
        b_turns = str(b.agent_turns) if b else "-"
        l_turns = str(l.agent_turns) if l else "-"
        lines.append(f" {r.task_name:<22s} {r.category:<12s} {r.mode:<8s} {b_pass:>4s} {b_turns:>5s} {l_pass:>4s} {l_turns:>5s}")
    # Totals
    b_passed = sum(1 for r in results if r.bash_result and r.bash_result.all_passed)
    l_passed = sum(1 for r in results if r.lush_result and r.lush_result.all_passed)
    b_total = sum(1 for r in results if r.bash_result)
    l_total = sum(1 for r in results if r.lush_result)
    b_turns_avg = 0.0
    l_turns_avg = 0.0
    b_turn_counts = [r.bash_result.agent_turns for r in results if r.bash_result and r.bash_result.agent_turns > 0]
    l_turn_counts = [r.lush_result.agent_turns for r in results if r.lush_result and r.lush_result.agent_turns > 0]
    if b_turn_counts:
        b_turns_avg = sum(b_turn_counts) / len(b_turn_counts)
    if l_turn_counts:
        l_turns_avg = sum(l_turn_counts) / len(l_turn_counts)
    lines.append(" " + "-" * 74)
    lines.append(f" {'TOTAL':<22s} {'':<12s} {'':<8s} {b_passed}/{b_total:>2d} {b_turns_avg:>5.1f} {l_passed}/{l_total:>2d} {l_turns_avg:>5.1f}")
    lines.append(f" {'':<22s} {'':<12s} {'':<8s} {'pass avg turns':^14s} {'pass avg turns':^14s}")
    lines.append("")
    return "\n".join(lines)


def render_questionnaire_comparison(results: list[BenchmarkResult]) -> str:
    """Render aggregated questionnaire scores with bar charts, grouped by dimension."""
    lines: list[str] = []
    lines.append("=" * 78)
    lines.append(" QUESTIONNAIRE SCORES (1-5 Likert, higher = better)")
    lines.append("=" * 78)
    lines.append("")
    # Aggregate scores across all tasks
    agg: dict[str, dict[str, list[float]]] = {}
    for key, _, _ in LIKERT_QUESTIONS:
        agg[key] = {"bash": [], "lush": []}
    for r in results:
        scores = _get_likert_scores(r)
        for key in scores:
            for lang in ("bash", "lush"):
                val = scores[key][lang]
                if val is not None:
                    agg[key][lang].append(val)
    # Group by dimension
    current_dim = None
    for key, label, dimension in LIKERT_QUESTIONS:
        if dimension != current_dim:
            if current_dim is not None:
                lines.append("")
            lines.append(f" [{dimension}]")
            current_dim = dimension
        b_vals = agg[key]["bash"]
        l_vals = agg[key]["lush"]
        b_avg = sum(b_vals) / len(b_vals) if b_vals else 0.0
        l_avg = sum(l_vals) / len(l_vals) if l_vals else 0.0
        diff = l_avg - b_avg
        diff_str = f"+{diff:.1f}" if diff > 0 else f"{diff:.1f}" if diff < 0 else " 0.0"
        lines.append(f" {label}")
        lines.append(f" bash {_bar(b_avg)} {b_avg:.1f}")
        lines.append(f" lush {_bar(l_avg)} {l_avg:.1f} ({diff_str})")
    # Overall average
    all_bash = [v for key in agg for v in agg[key]["bash"]]
    all_lush = [v for key in agg for v in agg[key]["lush"]]
    b_overall = sum(all_bash) / len(all_bash) if all_bash else 0.0
    l_overall = sum(all_lush) / len(all_lush) if all_lush else 0.0
    diff = l_overall - b_overall
    diff_str = f"+{diff:.1f}" if diff > 0 else f"{diff:.1f}" if diff < 0 else " 0.0"
    lines.append("")
    lines.append(" " + "-" * 50)
    lines.append(" Overall average")
    lines.append(f" bash {_bar(b_overall)} {b_overall:.1f}")
    lines.append(f" lush {_bar(l_overall)} {l_overall:.1f} ({diff_str})")
    lines.append("")
    return "\n".join(lines)


def render_per_category_summary(results: list[BenchmarkResult]) -> str:
    """Render per-category breakdown: pass rates, avg turns, avg questionnaire scores."""
    lines: list[str] = []
    lines.append("=" * 78)
    lines.append(" PER-CATEGORY SUMMARY")
    lines.append("=" * 78)
    lines.append("")
    # Group by category
    by_cat: dict[str, list[BenchmarkResult]] = defaultdict(list)
    for r in results:
        by_cat[r.category].append(r)
    header = f" {'Category':<12s} {'Bash pass':>9s} {'Lush pass':>9s} {'B turns':>7s} {'L turns':>7s} {'B score':>7s} {'L score':>7s}"
    lines.append(header)
    lines.append(" " + "-" * 70)
    for cat in sorted(by_cat):
        cat_results = by_cat[cat]
        b_passed = sum(1 for r in cat_results if r.bash_result and r.bash_result.all_passed)
        l_passed = sum(1 for r in cat_results if r.lush_result and r.lush_result.all_passed)
        b_total = sum(1 for r in cat_results if r.bash_result)
        l_total = sum(1 for r in cat_results if r.lush_result)
        b_turn_vals = [r.bash_result.agent_turns for r in cat_results if r.bash_result and r.bash_result.agent_turns > 0]
        l_turn_vals = [r.lush_result.agent_turns for r in cat_results if r.lush_result and r.lush_result.agent_turns > 0]
        b_turns_avg = sum(b_turn_vals) / len(b_turn_vals) if b_turn_vals else 0.0
        l_turns_avg = sum(l_turn_vals) / len(l_turn_vals) if l_turn_vals else 0.0
        # Avg questionnaire scores
        b_scores: list[float] = []
        l_scores: list[float] = []
        for r in cat_results:
            scores = _get_likert_scores(r)
            for key in scores:
                if scores[key]["bash"] is not None:
                    b_scores.append(scores[key]["bash"])
                if scores[key]["lush"] is not None:
                    l_scores.append(scores[key]["lush"])
        b_avg_score = sum(b_scores) / len(b_scores) if b_scores else 0.0
        l_avg_score = sum(l_scores) / len(l_scores) if l_scores else 0.0
        lines.append(
            f" {cat:<12s} {b_passed}/{b_total:>2d} {l_passed}/{l_total:>2d} "
            f"{b_turns_avg:>5.1f} {l_turns_avg:>5.1f} {b_avg_score:>5.1f} {l_avg_score:>5.1f}"
        )
    lines.append("")
    return "\n".join(lines)


def render_per_category_questionnaire(results: list[BenchmarkResult]) -> str:
    """Render per-category Likert averages."""
    lines: list[str] = []
    lines.append("=" * 78)
    lines.append(" PER-CATEGORY QUESTIONNAIRE AVERAGES")
    lines.append("=" * 78)
    lines.append("")
    by_cat: dict[str, list[BenchmarkResult]] = defaultdict(list)
    for r in results:
        by_cat[r.category].append(r)
    for cat in sorted(by_cat):
        cat_results = by_cat[cat]
        lines.append(f" {cat}")
        agg: dict[str, dict[str, list[float]]] = {}
        for key, _, _ in LIKERT_QUESTIONS:
            agg[key] = {"bash": [], "lush": []}
        for r in cat_results:
            scores = _get_likert_scores(r)
            for key in scores:
                for lang in ("bash", "lush"):
                    val = scores[key][lang]
                    if val is not None:
                        agg[key][lang].append(val)
        for key, label, _ in LIKERT_QUESTIONS:
            b_vals = agg[key]["bash"]
            l_vals = agg[key]["lush"]
            b_avg = sum(b_vals) / len(b_vals) if b_vals else 0.0
            l_avg = sum(l_vals) / len(l_vals) if l_vals else 0.0
            diff = l_avg - b_avg
            diff_str = f"+{diff:.1f}" if diff > 0 else f"{diff:.1f}" if diff < 0 else " 0.0"
            lines.append(f" {label:<22s} bash={b_avg:.1f} lush={l_avg:.1f} ({diff_str})")
        lines.append("")
    return "\n".join(lines)


def render_per_task_detail(results: list[BenchmarkResult]) -> str:
    """Render per-task questionnaire breakdown."""
    lines: list[str] = []
    lines.append("=" * 78)
    lines.append(" PER-TASK DETAIL")
    lines.append("=" * 78)
    for r in results:
        lines.append("")
        # Show "-" (not FAIL) when a language was not run, matching the summary table.
        b_status = "PASS" if r.bash_result and r.bash_result.all_passed else "FAIL" if r.bash_result else "-"
        l_status = "PASS" if r.lush_result and r.lush_result.all_passed else "FAIL" if r.lush_result else "-"
        lines.append(f" {r.task_name} [{r.category}/{r.mode}] bash={b_status} lush={l_status}")
        lines.append("")
        scores = _get_likert_scores(r)
        lines.append(f" {'Metric':<22s} {'Bash':>4s} {'Lush':>4s} {'Diff':>5s}")
        lines.append(" " + "-" * 40)
        for key, label, _ in LIKERT_QUESTIONS:
            b_val = scores[key]["bash"]
            l_val = scores[key]["lush"]
            b_str = f"{b_val:.0f}" if b_val is not None else "-"
            l_str = f"{l_val:.0f}" if l_val is not None else "-"
            if b_val is not None and l_val is not None:
                d = l_val - b_val
                d_str = f"+{d:.0f}" if d > 0 else f"{d:.0f}" if d < 0 else "0"
            else:
                d_str = "-"
            lines.append(f" {label:<22s} {b_str:>4s} {l_str:>4s} {d_str:>5s}")
    lines.append("")
    return "\n".join(lines)


def render_report(results_dir: Path) -> str:
    """Generate full report."""
    results = load_latest_results(results_dir)
    if not results:
        return "No results found."
    parts = [
        render_summary_table(results),
        render_per_category_summary(results),
        render_questionnaire_comparison(results),
        render_per_category_questionnaire(results),
        render_per_task_detail(results),
    ]
    return "\n".join(parts)