- Replace 6 compound Likert questions with 12 atomic ones grouped by dimension (syntax, expressiveness, data/IO, errors, overall); drop free-form question. Responses now stored as ints, not strings. - Back-compat layer maps legacy keys to new dimensions so existing results still render. - Parallelize run-all with ThreadPoolExecutor (configurable workers) and add a thread-safe min-request-interval rate limiter to the Anthropic provider. - Add new tasks: path_normalizer, todo_manager, currency_converter, locale_weather_url, network_info_parser, url_normalizer.
342 lines · 13 KiB · Python
from __future__ import annotations
|
|
|
|
import json
|
|
from pathlib import Path
|
|
|
|
from .models import BenchmarkResult
|
|
|
|
# New 12-item question list: (key, label, dimension)
LIKERT_QUESTIONS = [
    # Syntax & Readability
    ("syntax_clarity", "Syntax clarity", "Syntax & Readability"),
    ("signal_to_noise", "Signal-to-noise", "Syntax & Readability"),
    ("familiar_conventions", "Familiar conventions", "Syntax & Readability"),
    # Expressiveness
    ("builtin_ops", "Built-in operations", "Expressiveness"),
    ("string_ops", "String operations", "Expressiveness"),
    ("composition", "Composition", "Expressiveness"),
    # Data & I/O
    ("io_ergonomics", "I/O ergonomics", "Data & I/O"),
    ("data_structures", "Data structures", "Data & I/O"),
    # Error Handling
    ("error_model", "Error model", "Error Handling"),
    ("edge_case_support", "Edge case support", "Error Handling"),
    # Overall
    ("learnability", "Learnability", "Overall"),
    ("fitness", "Fitness for task", "Overall"),
]

# Map old 6 legacy keys to new keys for back-compat with existing results.
# Legacy answers are matched by question-text prefix; the old score is copied
# to every mapped new key that does not already have a value.
LEGACY_KEY_MAP = {
    "Readability": ["syntax_clarity", "signal_to_noise", "familiar_conventions"],
    "Expressiveness": ["builtin_ops", "string_ops", "composition"],
    "Conciseness": ["signal_to_noise"],
    "Error handling": ["error_model", "edge_case_support"],
    "Overall preference": ["fitness"],
    "Learning curve": ["learnability"],
}
|
|
|
|
|
|
def load_latest_results(results_dir: Path) -> list[BenchmarkResult]:
    """Load results, keeping only the latest run per task name.

    Run directories are visited in sorted (chronological) order, so a later
    run for the same task overwrites an earlier one. Directories without a
    result.json are skipped.
    """
    by_task: dict[str, BenchmarkResult] = {}
    for run_dir in sorted(results_dir.iterdir()):
        result_path = run_dir / "result.json"
        if not result_path.exists():
            continue
        result = BenchmarkResult.from_dict(json.loads(result_path.read_text()))
        by_task[result.task_name] = result
    return sorted(by_task.values(), key=lambda r: (r.category, r.task_name))
|
|
|
|
|
|
def _parse_likert(selected: str | int) -> int | None:
|
|
"""Extract numeric value from a likert response. Handles int directly or string like '4 - Agree'."""
|
|
if isinstance(selected, int):
|
|
return selected
|
|
s = str(selected).strip()
|
|
if s and s[0].isdigit():
|
|
return int(s[0])
|
|
return None
|
|
|
|
|
|
def _get_likert_scores(result: BenchmarkResult) -> dict[str, dict[str, float | None]]:
    """Extract Likert scores per language. Returns {question_key: {bash: N, lush: N}}.

    Handles both new-format results (exact id match on the 12 question keys)
    and legacy results (prefix match on the old question text, spread across
    the mapped new keys).
    """
    scores: dict[str, dict[str, float | None]] = {
        key: {"bash": None, "lush": None} for key, _, _ in LIKERT_QUESTIONS
    }

    for lang, lang_result in (("bash", result.bash_result), ("lush", result.lush_result)):
        if not lang_result:
            continue
        for answer in lang_result.questionnaire:
            # New format: question text is exactly one of the new ids.
            if answer.question in scores:
                value = _parse_likert(answer.selected)
                if value is not None:
                    scores[answer.question][lang] = float(value)
                continue

            # Legacy format: copy the old score onto each mapped new key,
            # never clobbering a score that is already set.
            for legacy_prefix, new_keys in LEGACY_KEY_MAP.items():
                if answer.question.startswith(legacy_prefix):
                    value = _parse_likert(answer.selected)
                    if value is not None:
                        for new_key in new_keys:
                            if scores[new_key][lang] is None:
                                scores[new_key][lang] = float(value)
                    break
    return scores
|
|
|
|
|
|
def _bar(value: float, max_val: float = 5.0, width: int = 20) -> str:
|
|
"""Render a small horizontal bar."""
|
|
filled = int(round(value / max_val * width))
|
|
return "\u2588" * filled + "\u2591" * (width - filled)
|
|
|
|
|
|
def render_summary_table(results: list[BenchmarkResult]) -> str:
    """Render the pass/fail + turns overview table.

    One row per task (PASS/FAIL and agent turns for each language, "-" when a
    language has no result), followed by totals: pass counts and average
    turns over runs that recorded a positive turn count.
    """
    out: list[str] = []
    out.append("")
    out.append("=" * 78)
    out.append(" BENCHMARK RESULTS SUMMARY")
    out.append("=" * 78)
    out.append("")

    out.append(f" {'Task':<22s} {'Category':<12s} {'Mode':<8s} {'Bash':^14s} {'Lush':^14s}")
    out.append(f" {'':<22s} {'':<12s} {'':<8s} {'pass turns':^14s} {'pass turns':^14s}")
    out.append(" " + "-" * 74)

    for r in results:
        bash, lush = r.bash_result, r.lush_result
        bash_pass = ("PASS" if bash.all_passed else "FAIL") if bash else "-"
        lush_pass = ("PASS" if lush.all_passed else "FAIL") if lush else "-"
        bash_turns = str(bash.agent_turns) if bash else "-"
        lush_turns = str(lush.agent_turns) if lush else "-"
        out.append(f" {r.task_name:<22s} {r.category:<12s} {r.mode:<8s} {bash_pass:>4s} {bash_turns:>5s} {lush_pass:>4s} {lush_turns:>5s}")

    # Totals row: pass/total counts plus average turns (ignoring zero-turn runs).
    bash_passed = sum(1 for r in results if r.bash_result and r.bash_result.all_passed)
    lush_passed = sum(1 for r in results if r.lush_result and r.lush_result.all_passed)
    bash_total = sum(1 for r in results if r.bash_result)
    lush_total = sum(1 for r in results if r.lush_result)

    bash_turn_list = [r.bash_result.agent_turns for r in results if r.bash_result and r.bash_result.agent_turns > 0]
    lush_turn_list = [r.lush_result.agent_turns for r in results if r.lush_result and r.lush_result.agent_turns > 0]
    bash_turns_avg = sum(bash_turn_list) / len(bash_turn_list) if bash_turn_list else 0.0
    lush_turns_avg = sum(lush_turn_list) / len(lush_turn_list) if lush_turn_list else 0.0

    out.append(" " + "-" * 74)
    out.append(f" {'TOTAL':<22s} {'':<12s} {'':<8s} {bash_passed}/{bash_total:>2d} {bash_turns_avg:>5.1f} {lush_passed}/{lush_total:>2d} {lush_turns_avg:>5.1f}")
    out.append(f" {'':<22s} {'':<12s} {'':<8s} {'pass avg turns':^14s} {'pass avg turns':^14s}")
    out.append("")
    return "\n".join(out)
|
|
|
|
|
|
def render_questionnaire_comparison(results: list[BenchmarkResult]) -> str:
    """Render aggregated questionnaire scores with bar charts, grouped by dimension."""

    def _avg(vals: list[float]) -> float:
        # Mean, or 0.0 for an empty sample (renders as an empty bar).
        return sum(vals) / len(vals) if vals else 0.0

    def _diff_label(diff: float) -> str:
        # Signed lush-minus-bash delta; " 0.0" keeps column alignment at zero.
        if diff > 0:
            return f"+{diff:.1f}"
        if diff < 0:
            return f"{diff:.1f}"
        return " 0.0"

    out: list[str] = []
    out.append("=" * 78)
    out.append(" QUESTIONNAIRE SCORES (1-5 Likert, higher = better)")
    out.append("=" * 78)
    out.append("")

    # Pool every observed score per question key across all tasks.
    agg: dict[str, dict[str, list[float]]] = {
        key: {"bash": [], "lush": []} for key, _, _ in LIKERT_QUESTIONS
    }
    for result in results:
        per_task = _get_likert_scores(result)
        for key, by_lang in per_task.items():
            for lang in ("bash", "lush"):
                if by_lang[lang] is not None:
                    agg[key][lang].append(by_lang[lang])

    # One bar pair per question, with a header whenever the dimension changes.
    current_dim = None
    for key, label, dimension in LIKERT_QUESTIONS:
        if dimension != current_dim:
            if current_dim is not None:
                out.append("")
            out.append(f" [{dimension}]")
            current_dim = dimension

        b_avg = _avg(agg[key]["bash"])
        l_avg = _avg(agg[key]["lush"])
        out.append(f" {label}")
        out.append(f" bash {_bar(b_avg)} {b_avg:.1f}")
        out.append(f" lush {_bar(l_avg)} {l_avg:.1f} ({_diff_label(l_avg - b_avg)})")

    # Grand average over every individual answer, both languages.
    all_bash = [v for key in agg for v in agg[key]["bash"]]
    all_lush = [v for key in agg for v in agg[key]["lush"]]
    b_overall = _avg(all_bash)
    l_overall = _avg(all_lush)

    out.append("")
    out.append(" " + "-" * 50)
    out.append(" Overall average")
    out.append(f" bash {_bar(b_overall)} {b_overall:.1f}")
    out.append(f" lush {_bar(l_overall)} {l_overall:.1f} ({_diff_label(l_overall - b_overall)})")
    out.append("")
    return "\n".join(out)
|
|
|
|
|
|
def render_per_category_summary(results: list[BenchmarkResult]) -> str:
    """Render per-category breakdown: pass rates, avg turns, avg questionnaire scores."""
    from collections import defaultdict

    out: list[str] = []
    out.append("=" * 78)
    out.append(" PER-CATEGORY SUMMARY")
    out.append("=" * 78)
    out.append("")

    # Bucket results by category.
    by_cat: dict[str, list[BenchmarkResult]] = defaultdict(list)
    for result in results:
        by_cat[result.category].append(result)

    out.append(f" {'Category':<12s} {'Bash pass':>9s} {'Lush pass':>9s} {'B turns':>7s} {'L turns':>7s} {'B score':>7s} {'L score':>7s}")
    out.append(" " + "-" * 70)

    for cat in sorted(by_cat):
        group = by_cat[cat]
        b_passed = sum(1 for r in group if r.bash_result and r.bash_result.all_passed)
        l_passed = sum(1 for r in group if r.lush_result and r.lush_result.all_passed)
        b_total = sum(1 for r in group if r.bash_result)
        l_total = sum(1 for r in group if r.lush_result)

        # Average turns, ignoring runs with no recorded (zero) turn count.
        b_turns = [r.bash_result.agent_turns for r in group if r.bash_result and r.bash_result.agent_turns > 0]
        l_turns = [r.lush_result.agent_turns for r in group if r.lush_result and r.lush_result.agent_turns > 0]
        b_turns_avg = sum(b_turns) / len(b_turns) if b_turns else 0.0
        l_turns_avg = sum(l_turns) / len(l_turns) if l_turns else 0.0

        # Pool every answered question across the category's tasks.
        b_scores: list[float] = []
        l_scores: list[float] = []
        for r in group:
            for by_lang in _get_likert_scores(r).values():
                if by_lang["bash"] is not None:
                    b_scores.append(by_lang["bash"])
                if by_lang["lush"] is not None:
                    l_scores.append(by_lang["lush"])
        b_avg_score = sum(b_scores) / len(b_scores) if b_scores else 0.0
        l_avg_score = sum(l_scores) / len(l_scores) if l_scores else 0.0

        out.append(
            f" {cat:<12s} {b_passed}/{b_total:>2d} {l_passed}/{l_total:>2d} "
            f"{b_turns_avg:>5.1f} {l_turns_avg:>5.1f} {b_avg_score:>5.1f} {l_avg_score:>5.1f}"
        )

    out.append("")
    return "\n".join(out)
|
|
|
|
|
|
def render_per_category_questionnaire(results: list[BenchmarkResult]) -> str:
    """Render per-category Likert averages."""
    from collections import defaultdict

    out: list[str] = []
    out.append("=" * 78)
    out.append(" PER-CATEGORY QUESTIONNAIRE AVERAGES")
    out.append("=" * 78)
    out.append("")

    # Bucket results by category.
    by_cat: dict[str, list[BenchmarkResult]] = defaultdict(list)
    for result in results:
        by_cat[result.category].append(result)

    for cat in sorted(by_cat):
        group = by_cat[cat]
        out.append(f" {cat}")

        # Pool per-question scores across the category's tasks.
        agg: dict[str, dict[str, list[float]]] = {
            key: {"bash": [], "lush": []} for key, _, _ in LIKERT_QUESTIONS
        }
        for r in group:
            per_task = _get_likert_scores(r)
            for key, by_lang in per_task.items():
                for lang in ("bash", "lush"):
                    if by_lang[lang] is not None:
                        agg[key][lang].append(by_lang[lang])

        for key, label, _ in LIKERT_QUESTIONS:
            b_vals = agg[key]["bash"]
            l_vals = agg[key]["lush"]
            b_avg = sum(b_vals) / len(b_vals) if b_vals else 0.0
            l_avg = sum(l_vals) / len(l_vals) if l_vals else 0.0
            diff = l_avg - b_avg
            diff_str = f"+{diff:.1f}" if diff > 0 else f"{diff:.1f}" if diff < 0 else " 0.0"
            out.append(f" {label:<22s} bash={b_avg:.1f} lush={l_avg:.1f} ({diff_str})")
        out.append("")

    return "\n".join(out)
|
|
|
|
|
|
def render_per_task_detail(results: list[BenchmarkResult]) -> str:
    """Render per-task questionnaire breakdown.

    For each task: a status line (PASS/FAIL per language, "-" when that
    language has no result — matching the summary table's convention),
    followed by a per-question table of Likert scores and the lush-minus-bash
    diff (only when both sides answered).
    """
    lines: list[str] = []
    lines.append("=" * 78)
    lines.append(" PER-TASK DETAIL")
    lines.append("=" * 78)

    for r in results:
        lines.append("")
        b = r.bash_result
        l = r.lush_result
        # Fix: previously a missing result rendered as FAIL; show "-" instead,
        # consistent with render_summary_table.
        b_status = ("PASS" if b.all_passed else "FAIL") if b else "-"
        l_status = ("PASS" if l.all_passed else "FAIL") if l else "-"
        lines.append(f" {r.task_name} [{r.category}/{r.mode}] bash={b_status} lush={l_status}")
        lines.append("")

        scores = _get_likert_scores(r)
        lines.append(f" {'Metric':<22s} {'Bash':>4s} {'Lush':>4s} {'Diff':>5s}")
        lines.append(" " + "-" * 40)
        for key, label, _ in LIKERT_QUESTIONS:
            b_val = scores[key]["bash"]
            l_val = scores[key]["lush"]
            b_str = f"{b_val:.0f}" if b_val is not None else "-"
            l_str = f"{l_val:.0f}" if l_val is not None else "-"
            if b_val is not None and l_val is not None:
                d = l_val - b_val
                d_str = f"+{d:.0f}" if d > 0 else f"{d:.0f}" if d < 0 else "0"
            else:
                d_str = "-"
            lines.append(f" {label:<22s} {b_str:>4s} {l_str:>4s} {d_str:>5s}")

    lines.append("")
    return "\n".join(lines)
|
|
|
|
|
|
def render_report(results_dir: Path) -> str:
    """Generate full report."""
    results = load_latest_results(results_dir)
    if not results:
        return "No results found."

    # Render each section in order and join with newlines.
    sections = (
        render_summary_table,
        render_per_category_summary,
        render_questionnaire_comparison,
        render_per_category_questionnaire,
        render_per_task_detail,
    )
    return "\n".join(section(results) for section in sections)
|