- Replace 6 compound Likert questions with 12 atomic ones grouped by dimension (syntax, expressiveness, data/IO, errors, overall); drop free-form question. Responses now stored as ints, not strings. - Back-compat layer maps legacy keys to new dimensions so existing results still render. - Parallelize run-all with ThreadPoolExecutor (configurable workers) and add a thread-safe min-request-interval rate limiter to the Anthropic provider. - Add new tasks: path_normalizer, todo_manager, currency_converter, locale_weather_url, network_info_parser, url_normalizer.
255 lines
9.0 KiB
Python
255 lines
9.0 KiB
Python
from __future__ import annotations
|
|
|
|
import argparse
|
|
import sys
|
|
import threading
|
|
import tomllib
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
from lush_bench.agent import solve_task
|
|
from lush_bench.config import Config
|
|
from lush_bench.harness import evaluate
|
|
from lush_bench.models import BenchmarkResult, LanguageResult, Task, TestCase
|
|
from lush_bench.providers.anthropic import AnthropicProvider
|
|
from lush_bench.questionnaire import run_questionnaire
|
|
from lush_bench.export import export_html
|
|
from lush_bench.report import render_report
|
|
from lush_bench.results import save_result
|
|
|
|
_print_lock = threading.Lock()
|
|
|
|
|
|
PROVIDERS = {
|
|
"anthropic": AnthropicProvider,
|
|
}
|
|
|
|
|
|
def load_task(path: Path) -> Task:
|
|
raw = tomllib.loads(path.read_text())
|
|
test_cases = [
|
|
TestCase(
|
|
stdin=tc.get("stdin", ""),
|
|
expected_stdout=tc.get("expected_stdout", ""),
|
|
env=tc.get("env", {}),
|
|
setup_files=tc.get("setup_files", {}),
|
|
expected_files=tc.get("expected_files", {}),
|
|
)
|
|
for tc in raw["test_cases"]
|
|
]
|
|
return Task(
|
|
name=raw["name"],
|
|
category=raw["category"],
|
|
description=raw["description"],
|
|
test_cases=test_cases,
|
|
mode=raw.get("mode", "solve"),
|
|
bash_source=raw.get("bash_source"),
|
|
)
|
|
|
|
|
|
def find_tasks(category: str | None = None, mode: str | None = None) -> list[Path]:
|
|
tasks_dir = Path(__file__).parent / "tasks"
|
|
paths = []
|
|
if category:
|
|
cat_dir = tasks_dir / category
|
|
if cat_dir.exists():
|
|
paths = sorted(cat_dir.glob("*.toml"))
|
|
else:
|
|
for cat_dir in sorted(tasks_dir.iterdir()):
|
|
if cat_dir.is_dir():
|
|
paths.extend(sorted(cat_dir.glob("*.toml")))
|
|
if mode:
|
|
paths = [p for p in paths if load_task(p).mode == mode]
|
|
return paths
|
|
|
|
|
|
def cmd_list_tasks(args: argparse.Namespace) -> None:
|
|
paths = find_tasks(args.category, getattr(args, "mode", None))
|
|
if not paths:
|
|
print("No tasks found.")
|
|
return
|
|
for p in paths:
|
|
task = load_task(p)
|
|
print(f" [{task.category:<12s} {task.mode:<7s}] {task.name:20s} {p.relative_to(Path.cwd())}")
|
|
|
|
|
|
def _log(msg: str) -> None:
|
|
"""Thread-safe print."""
|
|
with _print_lock:
|
|
print(msg)
|
|
|
|
|
|
def _run_task(
|
|
task_path: Path,
|
|
provider_name: str,
|
|
config: Config,
|
|
provider: AnthropicProvider | None = None,
|
|
) -> BenchmarkResult:
|
|
"""Core task runner. Thread-safe — usable from cmd_run or a thread pool."""
|
|
task = load_task(task_path)
|
|
|
|
if provider is None:
|
|
provider_config = config.provider_configs.get(provider_name, {})
|
|
provider = PROVIDERS[provider_name](provider_config)
|
|
|
|
timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
|
|
|
|
_log(f"Running task: {task.name} ({task.category}/{task.mode}) with {provider.model_name}")
|
|
|
|
bash_result = None
|
|
lush_result = None
|
|
|
|
if task.mode == "solve":
|
|
_log(f" [{task.name}] Solving in bash...")
|
|
bash_result = solve_task(provider, task, "bash", config)
|
|
_log(f" [{task.name}] Bash: {'PASS' if bash_result.all_passed else 'FAIL'} ({bash_result.agent_turns} turns)")
|
|
|
|
_log(f" [{task.name}] Solving in lush...")
|
|
lush_result = solve_task(provider, task, "lush", config)
|
|
_log(f" [{task.name}] Lush: {'PASS' if lush_result.all_passed else 'FAIL'} ({lush_result.agent_turns} turns)")
|
|
|
|
elif task.mode == "convert":
|
|
assert task.bash_source, f"Convert-mode task {task.name} missing bash_source"
|
|
_log(f" [{task.name}] Verifying provided bash source...")
|
|
test_results = evaluate(task, task.bash_source, "bash", config)
|
|
all_passed = all(tr.passed for tr in test_results)
|
|
bash_result = LanguageResult(
|
|
language="bash",
|
|
solution_code=task.bash_source,
|
|
test_results=test_results,
|
|
all_passed=all_passed,
|
|
agent_turns=0,
|
|
)
|
|
_log(f" [{task.name}] Bash: {'PASS' if all_passed else 'FAIL'}")
|
|
|
|
_log(f" [{task.name}] Converting to lush...")
|
|
lush_result = solve_task(provider, task, "lush", config)
|
|
_log(f" [{task.name}] Lush: {'PASS' if lush_result.all_passed else 'FAIL'} ({lush_result.agent_turns} turns)")
|
|
|
|
# Run questionnaire for each completed language
|
|
for lang, result in [("bash", bash_result), ("lush", lush_result)]:
|
|
if result and result.solution_code:
|
|
_log(f" [{task.name}] Questionnaire for {lang}...")
|
|
result.questionnaire = run_questionnaire(provider, task.name, lang, result.solution_code)
|
|
|
|
benchmark = BenchmarkResult(
|
|
task_name=task.name,
|
|
category=task.category,
|
|
mode=task.mode,
|
|
provider=provider_name,
|
|
model=provider.model_name,
|
|
timestamp=timestamp,
|
|
bash_result=bash_result,
|
|
lush_result=lush_result,
|
|
)
|
|
|
|
result_dir = save_result(benchmark, config.output_dir)
|
|
_log(f" [{task.name}] Results saved to {result_dir}")
|
|
return benchmark
|
|
|
|
|
|
def cmd_run(args: argparse.Namespace) -> None:
|
|
config = Config.load()
|
|
|
|
provider_name = args.provider
|
|
if provider_name not in PROVIDERS:
|
|
print(f"Unknown provider: {provider_name}. Available: {', '.join(PROVIDERS)}")
|
|
sys.exit(1)
|
|
|
|
provider_config = config.provider_configs.get(provider_name, {})
|
|
provider = PROVIDERS[provider_name](provider_config)
|
|
_run_task(Path(args.task), provider_name, config, provider)
|
|
|
|
|
|
def cmd_run_all(args: argparse.Namespace) -> None:
|
|
config = Config.load()
|
|
paths = find_tasks(args.category, getattr(args, "mode", None))
|
|
if not paths:
|
|
print("No tasks found.")
|
|
return
|
|
|
|
provider_name = args.provider
|
|
if provider_name not in PROVIDERS:
|
|
print(f"Unknown provider: {provider_name}. Available: {', '.join(PROVIDERS)}")
|
|
sys.exit(1)
|
|
|
|
# Share one provider instance across threads (its rate limiter is thread-safe)
|
|
provider_config = config.provider_configs.get(provider_name, {})
|
|
provider = PROVIDERS[provider_name](provider_config)
|
|
|
|
max_workers = args.workers if args.workers is not None else config.max_workers
|
|
print(f"Running {len(paths)} tasks with {max_workers} workers")
|
|
|
|
failed: list[str] = []
|
|
with ThreadPoolExecutor(max_workers=max_workers) as pool:
|
|
futures = {
|
|
pool.submit(_run_task, p, provider_name, config, provider): p
|
|
for p in paths
|
|
}
|
|
for future in as_completed(futures):
|
|
task_path = futures[future]
|
|
try:
|
|
future.result()
|
|
except Exception as exc:
|
|
task_name = task_path.stem
|
|
failed.append(task_name)
|
|
_log(f" [{task_name}] FAILED: {exc}")
|
|
|
|
print(f"\nDone. {len(paths) - len(failed)}/{len(paths)} succeeded.")
|
|
if failed:
|
|
print(f"Failed: {', '.join(failed)}")
|
|
|
|
|
|
def cmd_report(args: argparse.Namespace) -> None:
|
|
print(render_report(Path(args.results_dir)))
|
|
|
|
|
|
def cmd_export(args: argparse.Namespace) -> None:
|
|
output = Path(args.output)
|
|
export_html(Path(args.results_dir), output)
|
|
print(f"Report exported to {output}")
|
|
|
|
|
|
def main() -> None:
|
|
parser = argparse.ArgumentParser(description="Lush vs Bash AI Benchmarking")
|
|
sub = parser.add_subparsers(dest="command", required=True)
|
|
|
|
# list-tasks
|
|
ls = sub.add_parser("list-tasks", help="List available tasks")
|
|
ls.add_argument("--category", choices=["algorithm", "pipeline", "environment", "filesystem", "process"], help="Filter by category")
|
|
ls.add_argument("--mode", choices=["solve", "convert"], help="Filter by mode")
|
|
ls.set_defaults(func=cmd_list_tasks)
|
|
|
|
# run
|
|
run = sub.add_parser("run", help="Run a single task")
|
|
run.add_argument("--task", required=True, help="Path to task TOML file")
|
|
run.add_argument("--provider", default="anthropic", help="LLM provider")
|
|
run.set_defaults(func=cmd_run)
|
|
|
|
# run-all
|
|
ra = sub.add_parser("run-all", help="Run all tasks (optionally filtered)")
|
|
ra.add_argument("--category", choices=["algorithm", "pipeline", "environment", "filesystem", "process"], help="Filter by category")
|
|
ra.add_argument("--mode", choices=["solve", "convert"], help="Filter by mode")
|
|
ra.add_argument("--provider", default="anthropic", help="LLM provider")
|
|
ra.add_argument("--workers", type=int, default=None, help="Max parallel tasks (default: from config, typically 4)")
|
|
ra.set_defaults(func=cmd_run_all)
|
|
|
|
# report
|
|
rpt = sub.add_parser("report", help="Show results report in terminal")
|
|
rpt.add_argument("--results-dir", default="results", help="Results directory")
|
|
rpt.set_defaults(func=cmd_report)
|
|
|
|
# export
|
|
exp = sub.add_parser("export", help="Export HTML report with charts")
|
|
exp.add_argument("--results-dir", default="results", help="Results directory")
|
|
exp.add_argument("--output", "-o", default="report.html", help="Output HTML file")
|
|
exp.set_defaults(func=cmd_export)
|
|
|
|
args = parser.parse_args()
|
|
args.func(args)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|