Initial commit: Lush vs Bash AI benchmarking framework

Benchmark harness that uses LLM agents to solve shell scripting tasks
in both Bash and Lush, then compares correctness and code quality.

- CLI with run, run-all, list-tasks, report, and export commands
- Agent loop with retry support via Anthropic Claude provider
- Test harness executing solutions in sandboxed subprocesses
- LLM-driven questionnaire for subjective code quality evaluation
- HTML report export with charts (matplotlib)
- 8 Category A tasks (write-from-scratch in both languages)
- 4 Category B tasks (verify provided Bash, convert to Lush)
- Lush language reference for agent context
Cormac Shannon
2026-03-29 17:56:30 +01:00
commit be8d657b24
33 changed files with 3302 additions and 0 deletions

0
lush_bench/__init__.py Normal file

164
lush_bench/agent.py Normal file

@@ -0,0 +1,164 @@
from __future__ import annotations
import re
from pathlib import Path
from .config import Config
from .harness import evaluate
from .models import LanguageResult, Task, TestCase
from .providers.base import LLMProvider, Message
from .questionnaire import run_questionnaire
LUSH_REFERENCE_PATH = Path(__file__).parent.parent / "lush_reference.md"
def load_lush_reference() -> str:
return LUSH_REFERENCE_PATH.read_text()
def extract_code(response: str, language: str) -> str | None:
"""Extract the last fenced code block from the response."""
if language == "bash":
patterns = [r"```(?:bash|sh)\n(.*?)```", r"```\n(.*?)```"]
else:
patterns = [r"```(?:lua|lush)\n(.*?)```", r"```\n(.*?)```"]
for pattern in patterns:
matches = re.findall(pattern, response, re.DOTALL)
if matches:
return matches[-1].strip()
return None
def build_system_prompt(language: str) -> str:
base = (
"You are a skilled programmer. Write solutions that read from stdin and write to stdout. "
"Output ONLY the code in a single fenced code block. No explanations."
)
if language == "lush":
ref = load_lush_reference()
return f"{base}\n\nYou are writing in lush, a Lua-based shell language. Here is the language reference:\n\n{ref}"
return f"{base}\n\nYou are writing in bash."
def _describe_test_case(tc: TestCase, index: int) -> str:
"""Build a human-readable description of a test case for the agent."""
parts = [f"Test case {index}:"]
if tc.stdin:
parts.append(f" Input (stdin):\n{tc.stdin}")
if tc.env:
parts.append(f" Environment variables: {tc.env}")
if tc.setup_files:
for fname, content in tc.setup_files.items():
parts.append(f" File in working directory ({fname}):\n{content}")
if tc.expected_stdout:
parts.append(f" Expected stdout:\n{tc.expected_stdout}")
if tc.expected_files:
for fname, content in tc.expected_files.items():
parts.append(f" Expected file ({fname}):\n{content}")
return "\n".join(parts)
def build_task_prompt(task: Task, language: str) -> str:
prompt = f"Task: {task.name}\n\n{task.description}\n\n"
prompt += "Your script runs in an isolated working directory. "
prompt += "Any files listed as setup files will exist in that directory before your script runs.\n\n"
prompt += "Example test cases:\n"
for i, tc in enumerate(task.test_cases[:2]): # Show first 2 as examples
prompt += "\n" + _describe_test_case(tc, i) + "\n"
lang_label = "bash" if language == "bash" else "lua"
prompt += f"\nWrite the solution in a ```{lang_label} code block."
return prompt
def build_conversion_prompt(task: Task) -> str:
prompt = f"Task: {task.name}\n\n{task.description}\n\n"
prompt += f"Here is the bash source to convert to lush (Lua-based shell):\n\n```bash\n{task.bash_source}\n```\n\n"
prompt += "Example test cases:\n"
for i, tc in enumerate(task.test_cases[:2]):
prompt += f"\nInput:\n{tc.stdin}\nExpected output:\n{tc.expected_stdout}\n"
prompt += "\nConvert this to lush. Write the solution in a ```lua code block."
return prompt
def build_failure_feedback(test_results: list, task: Task) -> str:
lines = ["Your solution failed some test cases:\n"]
for tr in test_results:
if not tr.passed:
tc = task.test_cases[tr.test_case_index]
lines.append(f"Test case {tr.test_case_index}:")
lines.append(f" Input: {tc.stdin!r}")
lines.append(f" Expected stdout: {tc.expected_stdout!r}")
lines.append(f" Got stdout: {tr.actual_stdout!r}")
if tr.stderr:
lines.append(f" Stderr: {tr.stderr!r}")
if tc.env:
lines.append(f" Environment vars: {tc.env}")
if tc.setup_files:
lines.append(f" Files in working directory: {list(tc.setup_files.keys())}")
for fname, mismatch in tr.file_mismatches.items():
lines.append(f" File {fname!r}: expected {mismatch['expected']!r}, got {mismatch['actual']!r}")
lines.append("")
lines.append("Please fix your solution. Output ONLY the corrected code in a fenced code block.")
return "\n".join(lines)
def solve_task(
provider: LLMProvider,
task: Task,
language: str,
config: Config,
) -> LanguageResult:
"""Run the agent loop: prompt -> code -> test -> retry."""
system = build_system_prompt(language)
if task.category == "b" and language == "lush":
user_prompt = build_conversion_prompt(task)
else:
user_prompt = build_task_prompt(task, language)
messages: list[Message] = [Message(role="user", content=user_prompt)]
turns = 0
for attempt in range(1 + config.max_retries):
turns += 1
response = provider.send(messages, system=system)
messages.append(Message(role="assistant", content=response))
code = extract_code(response, language)
if code is None:
if attempt < config.max_retries:
feedback = "I couldn't find a code block in your response. Please provide your solution in a fenced code block."
messages.append(Message(role="user", content=feedback))
continue
return LanguageResult(
language=language,
solution_code="",
test_results=[],
all_passed=False,
agent_turns=turns,
)
test_results = evaluate(task, code, language, config)
all_passed = all(tr.passed for tr in test_results)
if all_passed or attempt == config.max_retries:
return LanguageResult(
language=language,
solution_code=code,
test_results=test_results,
all_passed=all_passed,
agent_turns=turns,
)
feedback = build_failure_feedback(test_results, task)
messages.append(Message(role="user", content=feedback))
# Unreachable in practice: the loop above always returns. Kept as a safe
# fallback with empty values so no possibly-unbound name is referenced.
return LanguageResult(
language=language,
solution_code="",
test_results=[],
all_passed=False,
agent_turns=turns,
)
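
For orientation, a minimal sketch of how a caller could wire `solve_task` together; the provider import path and the toy task below are assumptions for illustration, not code from this commit:

```python
# Hedged sketch: wiring a provider, a task, and a config through solve_task.
# The "echo-upper" task is hypothetical; the provider module path is assumed.
from lush_bench.agent import solve_task
from lush_bench.config import Config
from lush_bench.models import Task, TestCase
from lush_bench.providers.anthropic import AnthropicProvider  # path assumed

config = Config.load()
provider = AnthropicProvider(config.provider_configs.get("anthropic", {}))

task = Task(
    name="echo-upper",
    category="a",
    description="Read stdin and print it upper-cased.",
    test_cases=[TestCase(stdin="hello\n", expected_stdout="HELLO\n")],
)

result = solve_task(provider, task, "bash", config)
print(result.all_passed, result.agent_turns)
```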

39
lush_bench/config.py Normal file

@@ -0,0 +1,39 @@
from __future__ import annotations
import tomllib
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
@dataclass
class Config:
lush_binary: Path
max_retries: int = 3
timeout_seconds: float = 10.0
normalize_whitespace: bool = True
output_dir: Path = Path("results")
provider_configs: dict[str, dict[str, Any]] = field(default_factory=dict)
@classmethod
def load(cls, path: Path | None = None) -> Config:
if path is None:
path = Path(__file__).parent.parent / "config.toml"
raw = tomllib.loads(path.read_text())
lush = raw.get("lush", {})
agent = raw.get("agent", {})
results = raw.get("results", {})
# Collect provider configs (any top-level section not in known keys)
known_sections = {"lush", "agent", "results"}
provider_configs = {k: v for k, v in raw.items() if k not in known_sections and isinstance(v, dict)}
return cls(
lush_binary=Path(lush["binary"]),
max_retries=agent.get("max_retries", 3),
timeout_seconds=agent.get("timeout_seconds", 10.0),
normalize_whitespace=agent.get("normalize_whitespace", True),
output_dir=Path(results.get("output_dir", "results")),
provider_configs=provider_configs,
)
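
To make the expected layout of `config.toml` concrete, a sketch that writes one and loads it back; the `[lush]`, `[agent]`, and `[results]` keys mirror what `Config.load` reads, while the `[anthropic]` section name and its values are assumptions:

```python
# Hedged sketch: a config.toml that Config.load would accept, written to a
# temporary file. Paths and the [anthropic] section contents are examples.
import tempfile
from pathlib import Path
from lush_bench.config import Config

EXAMPLE_TOML = """
[lush]
binary = "/usr/local/bin/lush"   # required: path to the lush interpreter

[agent]
max_retries = 3
timeout_seconds = 10.0
normalize_whitespace = true

[results]
output_dir = "results"

[anthropic]                      # any extra table becomes a provider config
model = "claude-sonnet-4-20250514"
max_tokens = 4096
"""

with tempfile.NamedTemporaryFile("w", suffix=".toml", delete=False) as f:
    f.write(EXAMPLE_TOML)

cfg = Config.load(Path(f.name))
print(cfg.lush_binary, cfg.max_retries, list(cfg.provider_configs))
```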

307
lush_bench/export.py Normal file

@@ -0,0 +1,307 @@
from __future__ import annotations
import base64
import html
import io
from pathlib import Path
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
from .models import BenchmarkResult
from .report import (
LIKERT_QUESTIONS,
_get_freeform,
_get_likert_scores,
_parse_likert,
load_latest_results,
)
BASH_COLOR = "#4E79A7"
LUSH_COLOR = "#E15759"
NEUTRAL_COLOR = "#999999"
def _fig_to_base64(fig: plt.Figure) -> str:
buf = io.BytesIO()
fig.savefig(buf, format="png", dpi=150, bbox_inches="tight", facecolor="white")
plt.close(fig)
buf.seek(0)
return base64.b64encode(buf.read()).decode()
def _aggregate_likert(results: list[BenchmarkResult]) -> dict[str, dict[str, float]]:
"""Return {question_key: {bash: avg, lush: avg}}."""
agg: dict[str, dict[str, list[float]]] = {}
for key, _ in LIKERT_QUESTIONS:
agg[key] = {"bash": [], "lush": []}
for r in results:
scores = _get_likert_scores(r)
for key in scores:
for lang in ("bash", "lush"):
val = scores[key][lang]
if val is not None:
agg[key][lang].append(val)
return {
key: {
lang: (sum(vals) / len(vals)) if vals else 0.0
for lang, vals in agg[key].items()
}
for key in agg
}
def chart_questionnaire_comparison(results: list[BenchmarkResult]) -> str:
"""Grouped horizontal bar chart comparing bash vs lush on each Likert metric."""
avgs = _aggregate_likert(results)
labels = [label for _, label in LIKERT_QUESTIONS]
bash_vals = [avgs[key]["bash"] for key, _ in LIKERT_QUESTIONS]
lush_vals = [avgs[key]["lush"] for key, _ in LIKERT_QUESTIONS]
fig, ax = plt.subplots(figsize=(8, 4.5))
y = range(len(labels))
bar_h = 0.35
bars_bash = ax.barh([i + bar_h / 2 for i in y], bash_vals, bar_h, label="bash", color=BASH_COLOR)
bars_lush = ax.barh([i - bar_h / 2 for i in y], lush_vals, bar_h, label="lush", color=LUSH_COLOR)
ax.set_yticks(list(y))
ax.set_yticklabels(labels)
ax.set_xlim(0, 5.5)
ax.xaxis.set_major_locator(ticker.MultipleLocator(1))
ax.set_xlabel("Score (1-5)")
ax.set_title("Questionnaire Scores: Bash vs Lush")
ax.legend(loc="lower right")
ax.invert_yaxis()
for bar in bars_bash:
w = bar.get_width()
ax.text(w + 0.08, bar.get_y() + bar.get_height() / 2, f"{w:.1f}", va="center", fontsize=8)
for bar in bars_lush:
w = bar.get_width()
ax.text(w + 0.08, bar.get_y() + bar.get_height() / 2, f"{w:.1f}", va="center", fontsize=8)
ax.grid(axis="x", alpha=0.3)
return _fig_to_base64(fig)
def chart_turns_comparison(results: list[BenchmarkResult]) -> str:
"""Bar chart of agent turns per task for bash vs lush."""
# Category A (write-from-scratch) tasks only; a missing language result charts as 0 turns
cat_a = [r for r in results if r.category == "a"]
names = [r.task_name for r in cat_a]
bash_turns = [r.bash_result.agent_turns if r.bash_result else 0 for r in cat_a]
lush_turns = [r.lush_result.agent_turns if r.lush_result else 0 for r in cat_a]
fig, ax = plt.subplots(figsize=(8, 4))
x = range(len(names))
bar_w = 0.35
ax.bar([i - bar_w / 2 for i in x], bash_turns, bar_w, label="bash", color=BASH_COLOR)
ax.bar([i + bar_w / 2 for i in x], lush_turns, bar_w, label="lush", color=LUSH_COLOR)
ax.set_xticks(list(x))
ax.set_xticklabels(names, rotation=35, ha="right", fontsize=8)
ax.set_ylabel("Agent Turns")
ax.set_title("Agent Turns to Solve (Category A)")
ax.yaxis.set_major_locator(ticker.MaxNLocator(integer=True))
ax.legend()
ax.grid(axis="y", alpha=0.3)
return _fig_to_base64(fig)
def chart_per_task_heatmap(results: list[BenchmarkResult]) -> str:
"""Heatmap showing lush-minus-bash score diff per task and metric."""
labels = [label for _, label in LIKERT_QUESTIONS]
tasks = [r.task_name for r in results]
data: list[list[float]] = []
for r in results:
scores = _get_likert_scores(r)
row = []
for key, _ in LIKERT_QUESTIONS:
b = scores[key]["bash"]
l = scores[key]["lush"]
if b is not None and l is not None:
row.append(l - b)
else:
row.append(0.0)
data.append(row)
fig, ax = plt.subplots(figsize=(8, max(4, len(tasks) * 0.45 + 1)))
im = ax.imshow(data, cmap="RdYlGn", aspect="auto", vmin=-3, vmax=3)
ax.set_xticks(range(len(labels)))
ax.set_xticklabels(labels, rotation=35, ha="right", fontsize=8)
ax.set_yticks(range(len(tasks)))
ax.set_yticklabels(tasks, fontsize=8)
for i in range(len(tasks)):
for j in range(len(labels)):
val = data[i][j]
text = f"+{val:.0f}" if val > 0 else f"{val:.0f}" if val < 0 else "0"
ax.text(j, i, text, ha="center", va="center", fontsize=8,
color="white" if abs(val) >= 2 else "black")
ax.set_title("Score Difference (Lush - Bash)")
fig.colorbar(im, ax=ax, shrink=0.8, label="Lush advantage")
return _fig_to_base64(fig)
def _build_summary_html(results: list[BenchmarkResult]) -> str:
rows = []
for r in results:
b = r.bash_result
l = r.lush_result
b_cls = "pass" if b and b.all_passed else "fail"
l_cls = "pass" if l and l.all_passed else "fail"
b_pass = "PASS" if b and b.all_passed else "FAIL"
l_pass = "PASS" if l and l.all_passed else "FAIL"
b_turns = str(b.agent_turns) if b else "-"
l_turns = str(l.agent_turns) if l else "-"
rows.append(f"""<tr>
<td>{html.escape(r.task_name)}</td><td>{r.category.upper()}</td>
<td class="{b_cls}">{b_pass}</td><td>{b_turns}</td>
<td class="{l_cls}">{l_pass}</td><td>{l_turns}</td>
</tr>""")
b_passed = sum(1 for r in results if r.bash_result and r.bash_result.all_passed)
l_passed = sum(1 for r in results if r.lush_result and r.lush_result.all_passed)
total = len(results)
return f"""<table>
<thead><tr>
<th>Task</th><th>Cat</th>
<th>Bash</th><th>Turns</th>
<th>Lush</th><th>Turns</th>
</tr></thead>
<tbody>{"".join(rows)}</tbody>
<tfoot><tr>
<td><strong>Total</strong></td><td></td>
<td><strong>{b_passed}/{total}</strong></td><td></td>
<td><strong>{l_passed}/{total}</strong></td><td></td>
</tr></tfoot>
</table>"""
def _build_detail_html(results: list[BenchmarkResult]) -> str:
sections = []
for r in results:
b_status = "PASS" if r.bash_result and r.bash_result.all_passed else "FAIL"
l_status = "PASS" if r.lush_result and r.lush_result.all_passed else "FAIL"
scores = _get_likert_scores(r)
score_rows = []
for key, label in LIKERT_QUESTIONS:
b_val = scores[key]["bash"]
l_val = scores[key]["lush"]
b_str = f"{b_val:.0f}" if b_val is not None else "-"
l_str = f"{l_val:.0f}" if l_val is not None else "-"
if b_val is not None and l_val is not None:
d = l_val - b_val
d_str = f"+{d:.0f}" if d > 0 else f"{d:.0f}"
d_cls = "pos" if d > 0 else "neg" if d < 0 else ""
else:
d_str = "-"
d_cls = ""
score_rows.append(f'<tr><td>{html.escape(label)}</td>'
f'<td>{b_str}</td><td>{l_str}</td>'
f'<td class="{d_cls}">{d_str}</td></tr>')
obs = _get_freeform(r)
obs_html = ""
for lang, text in obs.items():
obs_html += f'<p><strong>{lang}:</strong> {html.escape(text)}</p>\n'
sections.append(f"""
<div class="task-detail">
<h3>{html.escape(r.task_name)} <span class="cat">[{r.category}]</span>
<span class="{"pass" if b_status == "PASS" else "fail"}">bash={b_status}</span>
<span class="{"pass" if l_status == "PASS" else "fail"}">lush={l_status}</span>
</h3>
<table class="scores">
<thead><tr><th>Metric</th><th>Bash</th><th>Lush</th><th>Diff</th></tr></thead>
<tbody>{"".join(score_rows)}</tbody>
</table>
<div class="observations">{obs_html}</div>
</div>""")
return "\n".join(sections)
def export_html(results_dir: Path, output_path: Path) -> None:
results = load_latest_results(results_dir)
if not results:
output_path.write_text("<html><body><p>No results found.</p></body></html>")
return
chart_questionnaire = chart_questionnaire_comparison(results)
chart_turns = chart_turns_comparison(results)
chart_heatmap = chart_per_task_heatmap(results)
summary_table = _build_summary_html(results)
detail_html = _build_detail_html(results)
model = results[0].model if results else "unknown"
timestamp = max(r.timestamp for r in results)
page = f"""<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Lush vs Bash Benchmark Report</title>
<style>
:root {{ --bash: {BASH_COLOR}; --lush: {LUSH_COLOR}; }}
* {{ box-sizing: border-box; margin: 0; padding: 0; }}
body {{ font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, sans-serif;
max-width: 960px; margin: 40px auto; padding: 0 20px; color: #1a1a1a; line-height: 1.5; }}
h1 {{ font-size: 1.8rem; margin-bottom: 4px; }}
h2 {{ font-size: 1.3rem; margin: 32px 0 16px; border-bottom: 2px solid #e0e0e0; padding-bottom: 6px; }}
h3 {{ font-size: 1.05rem; margin-bottom: 10px; }}
.meta {{ color: #666; font-size: 0.9rem; margin-bottom: 24px; }}
table {{ border-collapse: collapse; width: 100%; margin: 12px 0 20px; font-size: 0.9rem; }}
th, td {{ padding: 8px 12px; text-align: left; border-bottom: 1px solid #e0e0e0; }}
th {{ background: #f5f5f5; font-weight: 600; }}
td.pass {{ color: #2d8a4e; font-weight: 600; }}
td.fail {{ color: #d32f2f; font-weight: 600; }}
td.pos {{ color: #2d8a4e; }}
td.neg {{ color: #d32f2f; }}
tfoot td {{ font-weight: 600; border-top: 2px solid #333; }}
.chart {{ text-align: center; margin: 20px 0; }}
.chart img {{ max-width: 100%; height: auto; border: 1px solid #e0e0e0; border-radius: 4px; }}
.task-detail {{ margin: 20px 0 30px; padding: 16px; background: #fafafa; border-radius: 6px; border: 1px solid #e8e8e8; }}
.task-detail h3 {{ margin-bottom: 12px; }}
.task-detail .cat {{ color: #888; font-weight: normal; }}
.task-detail .pass {{ color: #2d8a4e; font-size: 0.85rem; margin-left: 8px; }}
.task-detail .fail {{ color: #d32f2f; font-size: 0.85rem; margin-left: 8px; }}
.scores {{ width: auto; }}
.scores td:nth-child(n+2) {{ text-align: center; min-width: 50px; }}
.scores th:nth-child(n+2) {{ text-align: center; }}
.observations {{ margin-top: 12px; font-size: 0.85rem; color: #444; }}
.observations p {{ margin-bottom: 6px; }}
</style>
</head>
<body>
<h1>Lush vs Bash Benchmark Report</h1>
<p class="meta">Model: {html.escape(model)} &middot; Latest run: {html.escape(timestamp)} &middot; Tasks: {len(results)}</p>
<h2>Summary</h2>
{summary_table}
<h2>Questionnaire Scores</h2>
<div class="chart"><img src="data:image/png;base64,{chart_questionnaire}" alt="Questionnaire comparison"></div>
<h2>Agent Turns (Category A)</h2>
<div class="chart"><img src="data:image/png;base64,{chart_turns}" alt="Turns comparison"></div>
<h2>Score Difference Heatmap (Lush - Bash)</h2>
<div class="chart"><img src="data:image/png;base64,{chart_heatmap}" alt="Score heatmap"></div>
<h2>Per-Task Detail</h2>
{detail_html}
</body>
</html>"""
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(page)
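
Rendering the report is a single call; a short sketch assuming results were already saved under `results/`:

```python
# Hedged sketch: build the standalone HTML report from saved benchmark results.
from pathlib import Path
from lush_bench.export import export_html

export_html(Path("results"), Path("results/report.html"))
```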

156
lush_bench/harness.py Normal file

@@ -0,0 +1,156 @@
from __future__ import annotations
import os
import shutil
import subprocess
import tempfile
import time
from pathlib import Path
from .config import Config
from .models import RunOutput, Task, TestCase, TestResult
# Minimal base env — keeps scripts deterministic
BASE_ENV_KEYS = {"PATH", "HOME", "USER", "LANG", "TERM", "TMPDIR"}
def _build_env(test_case: TestCase) -> dict[str, str]:
"""Build a controlled environment: base host vars + test-specific vars."""
env = {k: v for k, v in os.environ.items() if k in BASE_ENV_KEYS}
env.update(test_case.env)
return env
def run_script(
command: list[str],
script: Path,
stdin: str,
timeout: float,
cwd: Path,
env: dict[str, str],
) -> RunOutput:
start = time.monotonic()
try:
result = subprocess.run(
[*command, str(script)],
input=stdin,
capture_output=True,
text=True,
timeout=timeout,
cwd=cwd,
env=env,
)
elapsed_ms = (time.monotonic() - start) * 1000
return RunOutput(
stdout=result.stdout,
stderr=result.stderr,
exit_code=result.returncode,
runtime_ms=elapsed_ms,
)
except subprocess.TimeoutExpired:
elapsed_ms = (time.monotonic() - start) * 1000
return RunOutput(
stdout="",
stderr="Timeout exceeded",
exit_code=-1,
runtime_ms=elapsed_ms,
)
def normalize(s: str) -> str:
return s.strip()
def _setup_sandbox(tc: TestCase) -> Path:
"""Create a temp directory and populate it with setup files."""
sandbox = Path(tempfile.mkdtemp(prefix="lush_bench_"))
for filename, content in tc.setup_files.items():
filepath = sandbox / filename
filepath.parent.mkdir(parents=True, exist_ok=True)
filepath.write_text(content)
return sandbox
def _check_expected_files(
sandbox: Path,
tc: TestCase,
do_normalize: bool,
) -> dict[str, dict[str, str]]:
"""Compare expected files against sandbox contents. Returns mismatches."""
mismatches: dict[str, dict[str, str]] = {}
for filename, expected_content in tc.expected_files.items():
filepath = sandbox / filename
if not filepath.exists():
mismatches[filename] = {
"expected": expected_content,
"actual": "<file not found>",
}
continue
actual_content = filepath.read_text()
expected = expected_content
actual = actual_content
if do_normalize:
expected = normalize(expected)
actual = normalize(actual)
if actual != expected:
mismatches[filename] = {
"expected": expected_content,
"actual": actual_content,
}
return mismatches
def evaluate(
task: Task,
code: str,
language: str,
config: Config,
) -> list[TestResult]:
suffix = ".sh" if language == "bash" else ".lua"
results: list[TestResult] = []
for i, tc in enumerate(task.test_cases):
sandbox = _setup_sandbox(tc)
try:
# Write script into the sandbox
script_path = sandbox / f"solution{suffix}"
script_path.write_text(code)
env = _build_env(tc)
if language == "bash":
command = ["bash"]
else:
command = [str(config.lush_binary)]
output = run_script(
command, script_path, tc.stdin, config.timeout_seconds, sandbox, env
)
actual = output.stdout
expected = tc.expected_stdout
if config.normalize_whitespace:
actual = normalize(actual)
expected = normalize(expected)
stdout_ok = actual == expected
file_mismatches = _check_expected_files(
sandbox, tc, config.normalize_whitespace
)
passed = stdout_ok and not file_mismatches
results.append(
TestResult(
test_case_index=i,
passed=passed,
actual_stdout=output.stdout,
expected_stdout=tc.expected_stdout,
stderr=output.stderr,
exit_code=output.exit_code,
file_mismatches=file_mismatches,
)
)
finally:
shutil.rmtree(sandbox, ignore_errors=True)
return results
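
To see the sandbox flow end to end, a small self-check that runs `evaluate` on a throwaway bash task; the task and script below are made up for illustration and only need `bash` on PATH (the lush binary path is unused for bash runs):

```python
# Hedged sketch: exercise evaluate() directly with a throwaway bash task.
from pathlib import Path
from lush_bench.config import Config
from lush_bench.harness import evaluate
from lush_bench.models import Task, TestCase

config = Config(lush_binary=Path("/usr/local/bin/lush"))  # path is illustrative

task = Task(
    name="count-lines",
    category="a",
    description="Print the number of lines on stdin, or of in.txt if present.",
    test_cases=[
        TestCase(stdin="a\nb\nc\n", expected_stdout="3\n"),
        TestCase(
            stdin="",
            expected_stdout="done\n",
            setup_files={"in.txt": "x\ny\n"},
            expected_files={"out.txt": "2\n"},
        ),
    ],
)

code = "if [ -f in.txt ]; then wc -l < in.txt > out.txt; echo done; else wc -l; fi"
for tr in evaluate(task, code, "bash", config):
    print(tr.test_case_index, tr.passed, tr.file_mismatches)
```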

210
lush_bench/models.py Normal file

@@ -0,0 +1,210 @@
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
@dataclass
class TestCase:
stdin: str
expected_stdout: str
env: dict[str, str] = field(default_factory=dict)
setup_files: dict[str, str] = field(default_factory=dict)
expected_files: dict[str, str] = field(default_factory=dict)
def to_dict(self) -> dict[str, Any]:
d: dict[str, Any] = {"stdin": self.stdin, "expected_stdout": self.expected_stdout}
if self.env:
d["env"] = self.env
if self.setup_files:
d["setup_files"] = self.setup_files
if self.expected_files:
d["expected_files"] = self.expected_files
return d
@classmethod
def from_dict(cls, d: dict[str, Any]) -> TestCase:
return cls(
stdin=d["stdin"],
expected_stdout=d["expected_stdout"],
env=d.get("env", {}),
setup_files=d.get("setup_files", {}),
expected_files=d.get("expected_files", {}),
)
@dataclass
class Task:
name: str
category: str # "a" or "b"
description: str
test_cases: list[TestCase]
bash_source: str | None = None # category B only
def to_dict(self) -> dict[str, Any]:
d: dict[str, Any] = {
"name": self.name,
"category": self.category,
"description": self.description,
"test_cases": [tc.to_dict() for tc in self.test_cases],
}
if self.bash_source is not None:
d["bash_source"] = self.bash_source
return d
@classmethod
def from_dict(cls, d: dict[str, Any]) -> Task:
return cls(
name=d["name"],
category=d["category"],
description=d["description"],
test_cases=[TestCase.from_dict(tc) for tc in d["test_cases"]],
bash_source=d.get("bash_source"),
)
@dataclass
class RunOutput:
stdout: str
stderr: str
exit_code: int
runtime_ms: float
def to_dict(self) -> dict[str, Any]:
return {
"stdout": self.stdout,
"stderr": self.stderr,
"exit_code": self.exit_code,
"runtime_ms": self.runtime_ms,
}
@classmethod
def from_dict(cls, d: dict[str, Any]) -> RunOutput:
return cls(
stdout=d["stdout"],
stderr=d["stderr"],
exit_code=d["exit_code"],
runtime_ms=d["runtime_ms"],
)
@dataclass
class TestResult:
test_case_index: int
passed: bool
actual_stdout: str
expected_stdout: str
stderr: str
exit_code: int
file_mismatches: dict[str, dict[str, str]] = field(default_factory=dict)
def to_dict(self) -> dict[str, Any]:
d: dict[str, Any] = {
"test_case_index": self.test_case_index,
"passed": self.passed,
"actual_stdout": self.actual_stdout,
"expected_stdout": self.expected_stdout,
"stderr": self.stderr,
"exit_code": self.exit_code,
}
if self.file_mismatches:
d["file_mismatches"] = self.file_mismatches
return d
@classmethod
def from_dict(cls, d: dict[str, Any]) -> TestResult:
return cls(
test_case_index=d["test_case_index"],
passed=d["passed"],
actual_stdout=d["actual_stdout"],
expected_stdout=d["expected_stdout"],
stderr=d["stderr"],
exit_code=d["exit_code"],
file_mismatches=d.get("file_mismatches", {}),
)
@dataclass
class LanguageResult:
language: str
solution_code: str
test_results: list[TestResult]
all_passed: bool
agent_turns: int
questionnaire: list[QuestionnaireResponse] = field(default_factory=list)
def to_dict(self) -> dict[str, Any]:
return {
"language": self.language,
"solution_code": self.solution_code,
"test_results": [tr.to_dict() for tr in self.test_results],
"all_passed": self.all_passed,
"agent_turns": self.agent_turns,
"questionnaire": [q.to_dict() for q in self.questionnaire],
}
@classmethod
def from_dict(cls, d: dict[str, Any]) -> LanguageResult:
return cls(
language=d["language"],
solution_code=d["solution_code"],
test_results=[TestResult.from_dict(tr) for tr in d["test_results"]],
all_passed=d["all_passed"],
agent_turns=d["agent_turns"],
questionnaire=[QuestionnaireResponse.from_dict(q) for q in d.get("questionnaire", [])],
)
@dataclass
class QuestionnaireResponse:
question: str
selected: str | int
choices: list[str] | None = None
def to_dict(self) -> dict[str, Any]:
d: dict[str, Any] = {"question": self.question, "selected": self.selected}
if self.choices is not None:
d["choices"] = self.choices
return d
@classmethod
def from_dict(cls, d: dict[str, Any]) -> QuestionnaireResponse:
return cls(
question=d["question"],
selected=d["selected"],
choices=d.get("choices"),
)
@dataclass
class BenchmarkResult:
task_name: str
category: str
provider: str
model: str
timestamp: str
bash_result: LanguageResult | None
lush_result: LanguageResult | None
def to_dict(self) -> dict[str, Any]:
return {
"task_name": self.task_name,
"category": self.category,
"provider": self.provider,
"model": self.model,
"timestamp": self.timestamp,
"bash_result": self.bash_result.to_dict() if self.bash_result else None,
"lush_result": self.lush_result.to_dict() if self.lush_result else None,
}
@classmethod
def from_dict(cls, d: dict[str, Any]) -> BenchmarkResult:
return cls(
task_name=d["task_name"],
category=d["category"],
provider=d["provider"],
model=d["model"],
timestamp=d["timestamp"],
bash_result=LanguageResult.from_dict(d["bash_result"]) if d.get("bash_result") else None,
lush_result=LanguageResult.from_dict(d["lush_result"]) if d.get("lush_result") else None,
)
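
The dataclasses round-trip through plain dicts, which is what the results layer persists; a quick sketch:

```python
# Hedged sketch: serialize a TestResult to JSON and back via the dict helpers.
import json
from lush_bench.models import TestResult

tr = TestResult(
    test_case_index=0,
    passed=True,
    actual_stdout="3\n",
    expected_stdout="3\n",
    stderr="",
    exit_code=0,
)
payload = json.dumps(tr.to_dict(), indent=2)
assert TestResult.from_dict(json.loads(payload)) == tr
```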


@@ -0,0 +1,35 @@
from __future__ import annotations
import os
from typing import Any
import anthropic
from .base import Message
class AnthropicProvider:
def __init__(self, config: dict[str, Any]) -> None:
api_key_env = config.get("api_key_env", "ANTHROPIC_API_KEY")
api_key = os.environ.get(api_key_env)
if not api_key:
raise RuntimeError(f"Set {api_key_env} environment variable")
self._client = anthropic.Anthropic(api_key=api_key)
self._model = config.get("model", "claude-sonnet-4-20250514")
self._max_tokens = config.get("max_tokens", 4096)
def send(self, messages: list[Message], system: str = "") -> str:
api_messages = [{"role": m.role, "content": m.content} for m in messages]
kwargs: dict[str, Any] = {
"model": self._model,
"max_tokens": self._max_tokens,
"messages": api_messages,
}
if system:
kwargs["system"] = system
response = self._client.messages.create(**kwargs)
return response.content[0].text
@property
def model_name(self) -> str:
return self._model
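
A short usage sketch, assuming `ANTHROPIC_API_KEY` is exported; the import path `lush_bench.providers.anthropic` is a guess, since this file's name is not shown in this view:

```python
# Hedged sketch: drive the provider directly with a one-off message.
from lush_bench.providers.anthropic import AnthropicProvider  # path assumed
from lush_bench.providers.base import Message

provider = AnthropicProvider({"model": "claude-sonnet-4-20250514", "max_tokens": 1024})
reply = provider.send(
    [Message(role="user", content="Say hello in one word.")],
    system="You are terse.",
)
print(provider.model_name, reply)
```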

17
lush_bench/providers/base.py Normal file

@@ -0,0 +1,17 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Protocol
@dataclass
class Message:
role: str # "user" or "assistant"
content: str
class LLMProvider(Protocol):
def send(self, messages: list[Message], system: str = "") -> str: ...
@property
def model_name(self) -> str: ...
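
Because `LLMProvider` is a structural `Protocol`, tests can substitute an offline stub; a minimal sketch of such a stub (not part of this commit):

```python
# Hedged sketch: a canned-response provider that satisfies the LLMProvider
# protocol, useful for exercising the agent loop without network calls.
from lush_bench.providers.base import LLMProvider, Message


class CannedProvider:
    def __init__(self, replies: list[str]) -> None:
        self._replies = replies
        self._i = 0

    def send(self, messages: list[Message], system: str = "") -> str:
        reply = self._replies[min(self._i, len(self._replies) - 1)]
        self._i += 1
        return reply

    @property
    def model_name(self) -> str:
        return "canned"


fence = "`" * 3  # avoid writing a literal code fence inside this example
provider: LLMProvider = CannedProvider([f"{fence}bash\ncat\n{fence}"])
```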

91
lush_bench/questionnaire.py Normal file

@@ -0,0 +1,91 @@
from __future__ import annotations
import json
import re
from .models import QuestionnaireResponse
from .providers.base import LLMProvider, Message
QUESTIONS = [
{
"question": "Readability: The solution is easy to read and understand",
"choices": ["1 - Strongly disagree", "2 - Disagree", "3 - Neutral", "4 - Agree", "5 - Strongly agree"],
},
{
"question": "Expressiveness: The language provided sufficient constructs to solve the problem naturally",
"choices": ["1 - Strongly disagree", "2 - Disagree", "3 - Neutral", "4 - Agree", "5 - Strongly agree"],
},
{
"question": "Conciseness: The solution required minimal boilerplate",
"choices": ["1 - Strongly disagree", "2 - Disagree", "3 - Neutral", "4 - Agree", "5 - Strongly agree"],
},
{
"question": "Error handling: Error handling was straightforward",
"choices": ["1 - Strongly disagree", "2 - Disagree", "3 - Neutral", "4 - Agree", "5 - Strongly agree"],
},
{
"question": "Overall preference: I would prefer this language for similar tasks",
"choices": ["1 - Strongly disagree", "2 - Disagree", "3 - Neutral", "4 - Agree", "5 - Strongly agree"],
},
{
"question": "Learning curve: An unfamiliar developer could understand the solution quickly",
"choices": ["1 - Strongly disagree", "2 - Disagree", "3 - Neutral", "4 - Agree", "5 - Strongly agree"],
},
]
def build_questionnaire_prompt(
task_name: str,
language: str,
solution_code: str,
) -> str:
questions_text = ""
for q in QUESTIONS:
choices_str = ", ".join(f'"{c}"' for c in q["choices"])
questions_text += f' {{"question": "{q["question"]}", "choices": [{choices_str}], "selected": <your choice>}},\n'
return f"""You just solved the task "{task_name}" in {language}. Here is your solution:
```
{solution_code}
```
Please evaluate your experience by answering the following questionnaire. Respond with ONLY a JSON array — no other text.
[
{questions_text} {{"question": "Free-form observation about using {language} for this task", "selected": "<your observation>"}}
]"""
def parse_questionnaire_response(response: str) -> list[QuestionnaireResponse]:
# Try to extract JSON array from response
json_match = re.search(r"\[.*\]", response, re.DOTALL)
if not json_match:
return [QuestionnaireResponse(question="raw_response", selected=response)]
try:
data = json.loads(json_match.group())
except json.JSONDecodeError:
return [QuestionnaireResponse(question="raw_response", selected=response)]
results = []
for item in data:
results.append(
QuestionnaireResponse(
question=item.get("question", ""),
selected=item.get("selected", ""),
choices=item.get("choices"),
)
)
return results
def run_questionnaire(
provider: LLMProvider,
task_name: str,
language: str,
solution_code: str,
) -> list[QuestionnaireResponse]:
prompt = build_questionnaire_prompt(task_name, language, solution_code)
response = provider.send([Message(role="user", content=prompt)])
return parse_questionnaire_response(response)
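
The parser tolerates prose around the JSON array; a quick sketch of the happy path with a made-up reply:

```python
# Hedged sketch: parse a model reply that wraps the JSON array in extra text.
from lush_bench.questionnaire import parse_questionnaire_response

reply = """Here are my answers:
[
  {"question": "Readability: The solution is easy to read and understand",
   "choices": ["1 - Strongly disagree", "5 - Strongly agree"],
   "selected": "4 - Agree"},
  {"question": "Free-form observation about using bash for this task",
   "selected": "Pipes made this trivial."}
]
"""
for resp in parse_questionnaire_response(reply):
    print(resp.question, "->", resp.selected)
```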

228
lush_bench/report.py Normal file

@@ -0,0 +1,228 @@
from __future__ import annotations
import json
from pathlib import Path
from .models import BenchmarkResult
# Likert questions in order (must match questionnaire.py QUESTIONS)
LIKERT_QUESTIONS = [
("Readability", "Readability"),
("Expressiveness", "Expressiveness"),
("Conciseness", "Conciseness"),
("Error handling", "Error handling"),
("Overall preference", "Overall preference"),
("Learning curve", "Learning curve"),
]
def load_latest_results(results_dir: Path) -> list[BenchmarkResult]:
"""Load results, keeping only the latest run per task name."""
latest: dict[str, BenchmarkResult] = {}
for d in sorted(results_dir.iterdir()):
result_file = d / "result.json"
if not result_file.exists():
continue
with open(result_file) as f:
r = BenchmarkResult.from_dict(json.load(f))
latest[r.task_name] = r
return sorted(latest.values(), key=lambda r: (r.category, r.task_name))
def _parse_likert(selected: str | int) -> int | None:
"""Extract numeric value from a likert response like '4 - Agree'."""
if isinstance(selected, int):
return selected
s = str(selected).strip()
if s and s[0].isdigit():
return int(s[0])
return None
def _get_likert_scores(result: BenchmarkResult) -> dict[str, dict[str, float | None]]:
"""Extract likert scores per language. Returns {question_key: {bash: N, lush: N}}."""
scores: dict[str, dict[str, float | None]] = {}
for key, _ in LIKERT_QUESTIONS:
scores[key] = {"bash": None, "lush": None}
for lang_name, lang_result in [("bash", result.bash_result), ("lush", result.lush_result)]:
if not lang_result:
continue
for q in lang_result.questionnaire:
for key, _ in LIKERT_QUESTIONS:
if q.question.startswith(key):
val = _parse_likert(q.selected)
if val is not None:
scores[key][lang_name] = float(val)
break
return scores
def _bar(value: float, max_val: float = 5.0, width: int = 20) -> str:
"""Render a small horizontal bar."""
filled = int(round(value / max_val * width))
return "\u2588" * filled + "\u2591" * (width - filled)
def _get_freeform(result: BenchmarkResult) -> dict[str, str]:
"""Extract free-form observations per language."""
obs: dict[str, str] = {}
for lang_name, lang_result in [("bash", result.bash_result), ("lush", result.lush_result)]:
if not lang_result:
continue
for q in lang_result.questionnaire:
if q.question.startswith("Free-form"):
obs[lang_name] = str(q.selected)
break
return obs
def render_summary_table(results: list[BenchmarkResult]) -> str:
"""Render the pass/fail + turns overview table."""
lines: list[str] = []
lines.append("")
lines.append("=" * 78)
lines.append(" BENCHMARK RESULTS SUMMARY")
lines.append("=" * 78)
lines.append("")
header = f" {'Task':<22s} {'Cat':>3s} {'Bash':^14s} {'Lush':^14s}"
lines.append(header)
sub = f" {'':<22s} {'':>3s} {'pass turns':^14s} {'pass turns':^14s}"
lines.append(sub)
lines.append(" " + "-" * 60)
for r in results:
b = r.bash_result
l = r.lush_result
b_pass = "PASS" if b and b.all_passed else "FAIL" if b else "-"
l_pass = "PASS" if l and l.all_passed else "FAIL" if l else "-"
b_turns = str(b.agent_turns) if b else "-"
l_turns = str(l.agent_turns) if l else "-"
lines.append(f" {r.task_name:<22s} [{r.category}] {b_pass:>4s} {b_turns:>5s} {l_pass:>4s} {l_turns:>5s}")
# Totals
b_passed = sum(1 for r in results if r.bash_result and r.bash_result.all_passed)
l_passed = sum(1 for r in results if r.lush_result and r.lush_result.all_passed)
b_total = sum(1 for r in results if r.bash_result)
l_total = sum(1 for r in results if r.lush_result)
b_turns_avg = 0.0
l_turns_avg = 0.0
b_turn_counts = [r.bash_result.agent_turns for r in results if r.bash_result and r.bash_result.agent_turns > 0]
l_turn_counts = [r.lush_result.agent_turns for r in results if r.lush_result and r.lush_result.agent_turns > 0]
if b_turn_counts:
b_turns_avg = sum(b_turn_counts) / len(b_turn_counts)
if l_turn_counts:
l_turns_avg = sum(l_turn_counts) / len(l_turn_counts)
lines.append(" " + "-" * 60)
lines.append(f" {'TOTAL':<22s} {b_passed}/{b_total:>2d} {b_turns_avg:>5.1f} {l_passed}/{l_total:>2d} {l_turns_avg:>5.1f}")
lines.append(f" {'':27s}{'pass avg turns':^14s} {'pass avg turns':^14s}")
lines.append("")
return "\n".join(lines)
def render_questionnaire_comparison(results: list[BenchmarkResult]) -> str:
"""Render aggregated questionnaire scores with bar charts."""
lines: list[str] = []
lines.append("=" * 78)
lines.append(" QUESTIONNAIRE SCORES (1-5 Likert, higher = better)")
lines.append("=" * 78)
lines.append("")
# Aggregate scores across all tasks
agg: dict[str, dict[str, list[float]]] = {}
for key, _ in LIKERT_QUESTIONS:
agg[key] = {"bash": [], "lush": []}
for r in results:
scores = _get_likert_scores(r)
for key in scores:
for lang in ("bash", "lush"):
val = scores[key][lang]
if val is not None:
agg[key][lang].append(val)
for key, label in LIKERT_QUESTIONS:
b_vals = agg[key]["bash"]
l_vals = agg[key]["lush"]
b_avg = sum(b_vals) / len(b_vals) if b_vals else 0.0
l_avg = sum(l_vals) / len(l_vals) if l_vals else 0.0
diff = l_avg - b_avg
diff_str = f"+{diff:.1f}" if diff > 0 else f"{diff:.1f}" if diff < 0 else " 0.0"
lines.append(f" {label}")
lines.append(f" bash {_bar(b_avg)} {b_avg:.1f}")
lines.append(f" lush {_bar(l_avg)} {l_avg:.1f} ({diff_str})")
lines.append("")
# Overall average
all_bash = [v for key in agg for v in agg[key]["bash"]]
all_lush = [v for key in agg for v in agg[key]["lush"]]
b_overall = sum(all_bash) / len(all_bash) if all_bash else 0.0
l_overall = sum(all_lush) / len(all_lush) if all_lush else 0.0
diff = l_overall - b_overall
diff_str = f"+{diff:.1f}" if diff > 0 else f"{diff:.1f}" if diff < 0 else " 0.0"
lines.append(" " + "-" * 50)
lines.append(f" Overall average")
lines.append(f" bash {_bar(b_overall)} {b_overall:.1f}")
lines.append(f" lush {_bar(l_overall)} {l_overall:.1f} ({diff_str})")
lines.append("")
return "\n".join(lines)
def render_per_task_detail(results: list[BenchmarkResult]) -> str:
"""Render per-task questionnaire breakdown."""
lines: list[str] = []
lines.append("=" * 78)
lines.append(" PER-TASK DETAIL")
lines.append("=" * 78)
for r in results:
lines.append("")
b_status = "PASS" if r.bash_result and r.bash_result.all_passed else "FAIL"
l_status = "PASS" if r.lush_result and r.lush_result.all_passed else "FAIL"
lines.append(f" {r.task_name} [{r.category}] bash={b_status} lush={l_status}")
lines.append("")
scores = _get_likert_scores(r)
lines.append(f" {'Metric':<22s} {'Bash':>4s} {'Lush':>4s} {'Diff':>5s}")
lines.append(" " + "-" * 40)
for key, label in LIKERT_QUESTIONS:
b_val = scores[key]["bash"]
l_val = scores[key]["lush"]
b_str = f"{b_val:.0f}" if b_val is not None else "-"
l_str = f"{l_val:.0f}" if l_val is not None else "-"
if b_val is not None and l_val is not None:
d = l_val - b_val
d_str = f"+{d:.0f}" if d > 0 else f"{d:.0f}" if d < 0 else "0"
else:
d_str = "-"
lines.append(f" {label:<22s} {b_str:>4s} {l_str:>4s} {d_str:>5s}")
# Free-form observations
obs = _get_freeform(r)
if obs:
lines.append("")
for lang, text in obs.items():
# Wrap long text
wrapped = text[:120] + ("..." if len(text) > 120 else "")
lines.append(f" {lang}: {wrapped}")
lines.append("")
return "\n".join(lines)
def render_report(results_dir: Path) -> str:
"""Generate full report."""
results = load_latest_results(results_dir)
if not results:
return "No results found."
parts = [
render_summary_table(results),
render_questionnaire_comparison(results),
render_per_task_detail(results),
]
return "\n".join(parts)

29
lush_bench/results.py Normal file

@@ -0,0 +1,29 @@
from __future__ import annotations
import json
from pathlib import Path
from .models import BenchmarkResult
def save_result(result: BenchmarkResult, output_dir: Path) -> Path:
dir_name = f"{result.timestamp}_{result.task_name}_{result.provider}"
result_dir = output_dir / dir_name
result_dir.mkdir(parents=True, exist_ok=True)
# Save JSON
with open(result_dir / "result.json", "w") as f:
json.dump(result.to_dict(), f, indent=2)
# Save solution files
if result.bash_result and result.bash_result.solution_code:
(result_dir / "solution.sh").write_text(result.bash_result.solution_code)
if result.lush_result and result.lush_result.solution_code:
(result_dir / "solution.lua").write_text(result.lush_result.solution_code)
return result_dir
def load_result(path: Path) -> BenchmarkResult:
with open(path / "result.json") as f:
return BenchmarkResult.from_dict(json.load(f))
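
Saving and re-loading a result is symmetric; a short sketch with placeholder `BenchmarkResult` values:

```python
# Hedged sketch: persist a (mostly empty) BenchmarkResult and read it back.
from pathlib import Path
from lush_bench.models import BenchmarkResult
from lush_bench.results import load_result, save_result

result = BenchmarkResult(
    task_name="echo-upper",
    category="a",
    provider="anthropic",
    model="claude-sonnet-4-20250514",
    timestamp="2026-03-29T17-56-30",
    bash_result=None,
    lush_result=None,
)
result_dir = save_result(result, Path("results"))
assert load_result(result_dir).task_name == "echo-upper"
```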