Implement piping between commands in backtick syntax (issue #06)
Add split_pipeline() to split command strings on unquoted | and exec_pipeline() to fork N children connected by inter-stage pipes. Only the last stage's stdout/stderr are captured; middle stages' stderr is inherited. Exit code comes from the last stage.
This commit is contained in:
@@ -1,25 +1,40 @@
|
||||
# 06 — Implement piping between commands
|
||||
|
||||
**Status:** open (post-core)
|
||||
**Blocked by:** #03, #04
|
||||
**Status:** done
|
||||
|
||||
## Syntax
|
||||
|
||||
Pipes are written inside backtick strings using `|`:
|
||||
|
||||
```lua
|
||||
local result = `ls -l` | `grep ".lua"`
|
||||
local r = `ls -l | grep ".lua" | wc -l`
|
||||
```
|
||||
|
||||
The `|` operator between backtick expressions remains Lua's bitwise OR — pipes only work *within* a single backtick command string.
|
||||
|
||||
## Implementation
|
||||
|
||||
- Connect stdout of one child to stdin of the next via `pipe()` + `dup2()`
|
||||
- Build a pipeline of N processes, all forked and connected
|
||||
- Only capture stdout/stderr of the **final** process
|
||||
- `waitpid()` all children, return exit code of last process
|
||||
Implemented in `lcmd.c` with two new functions:
|
||||
|
||||
## Challenge
|
||||
- **`split_pipeline()`** — scans the command string for `|` outside single/double quotes, splits into an array of stage strings (up to 64 stages). Reuses the same quote-tracking logic as `parse_argv()`.
|
||||
|
||||
Lua uses `|` for bitwise OR. The parser needs to disambiguate:
|
||||
- `|` between two command expressions → pipe
|
||||
- `|` between numeric expressions → bitwise OR
|
||||
- **`exec_pipeline()`** — executes a multi-stage pipeline:
|
||||
1. `parse_argv()` each stage
|
||||
2. Creates N-1 inter-stage pipes + stdout/stderr capture pipes for the last stage
|
||||
3. Forks N children with appropriate stdin/stdout wiring
|
||||
4. Captures output from the last stage only
|
||||
5. Returns `{code=last_exit_code, stdout=captured, stderr=captured}`
|
||||
|
||||
This is resolvable because the parser knows the type of each subexpression — a backtick expression is always a command.
|
||||
`luaB_command()` calls `split_pipeline()` first. Single-stage commands (no `|`) fall through to the original single-command codepath unchanged.
|
||||
|
||||
## Behaviour
|
||||
|
||||
- Exit code is from the **last** pipeline stage (like bash)
|
||||
- Only the last stage's stdout/stderr are captured in the result table
|
||||
- Middle stages' stderr is inherited (goes to terminal)
|
||||
- Quoted `|` characters (single or double quotes) are not pipe separators
|
||||
- Empty pipeline stages (e.g. `cmd1 || cmd2` or leading/trailing `|`) are errors
|
||||
|
||||
## Tests
|
||||
|
||||
See `testes/lush/piping.lua`.
|
||||
|
||||
319
lcmd.c
319
lcmd.c
@@ -129,6 +129,82 @@ static int parse_argv (const char *cmd, ParsedArgs *result) {
|
||||
}
|
||||
|
||||
|
||||
/* ===== pipeline splitter ===== */
|
||||
|
||||
#define MAX_PIPELINE_STAGES 64
|
||||
|
||||
/*
|
||||
** Split a command string on unquoted '|' into pipeline stages.
|
||||
** Reuses the same quote-tracking logic as parse_argv().
|
||||
** Returns 0 on success, -1 on error (empty stage, leading/trailing pipe).
|
||||
** Writes stage pointers into stages[] and count into *nstages.
|
||||
** Caller must free stages[0] (the backing buffer).
|
||||
*/
|
||||
static int split_pipeline (const char *cmd, char **stages, int *nstages) {
|
||||
size_t len = strlen(cmd);
|
||||
char *buf = (char *)malloc(len + 1);
|
||||
int count = 0;
|
||||
const char *p = cmd;
|
||||
size_t bp = 0;
|
||||
|
||||
if (buf == NULL) return -1;
|
||||
|
||||
stages[count++] = buf;
|
||||
|
||||
while (*p != '\0') {
|
||||
if (*p == '\'' ) {
|
||||
/* single-quoted string: copy through including quotes */
|
||||
buf[bp++] = *p++;
|
||||
while (*p != '\0' && *p != '\'')
|
||||
buf[bp++] = *p++;
|
||||
if (*p == '\'') buf[bp++] = *p++;
|
||||
}
|
||||
else if (*p == '"') {
|
||||
/* double-quoted string: copy through, respecting backslash */
|
||||
buf[bp++] = *p++;
|
||||
while (*p != '\0' && *p != '"') {
|
||||
if (*p == '\\' && p[1] != '\0') {
|
||||
buf[bp++] = *p++;
|
||||
buf[bp++] = *p++;
|
||||
} else {
|
||||
buf[bp++] = *p++;
|
||||
}
|
||||
}
|
||||
if (*p == '"') buf[bp++] = *p++;
|
||||
}
|
||||
else if (*p == '\\' && p[1] != '\0') {
|
||||
/* backslash escape: copy both chars */
|
||||
buf[bp++] = *p++;
|
||||
buf[bp++] = *p++;
|
||||
}
|
||||
else if (*p == '|') {
|
||||
/* unquoted pipe — split here */
|
||||
buf[bp++] = '\0';
|
||||
if (count >= MAX_PIPELINE_STAGES) { free(buf); return -1; }
|
||||
stages[count++] = buf + bp;
|
||||
p++;
|
||||
}
|
||||
else {
|
||||
buf[bp++] = *p++;
|
||||
}
|
||||
}
|
||||
buf[bp] = '\0';
|
||||
|
||||
/* validate: no empty stages (only for multi-stage pipelines) */
|
||||
if (count > 1) {
|
||||
for (int i = 0; i < count; i++) {
|
||||
const char *s = stages[i];
|
||||
/* skip whitespace */
|
||||
while (*s == ' ' || *s == '\t' || *s == '\n' || *s == '\r') s++;
|
||||
if (*s == '\0') { free(buf); return -1; }
|
||||
}
|
||||
}
|
||||
|
||||
*nstages = count;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
/* ===== pipe reader ===== */
|
||||
|
||||
#define READ_CHUNK 4096
|
||||
@@ -219,10 +295,239 @@ static void read_pipes (int fd_out, int fd_err,
|
||||
}
|
||||
|
||||
|
||||
/* ===== pipeline execution ===== */
|
||||
|
||||
/*
|
||||
** Build result table {code=N, stdout=S, stderr=S} on the Lua stack.
|
||||
*/
|
||||
static void push_result_table (lua_State *L, int code,
|
||||
DynBuf *buf_out, DynBuf *buf_err) {
|
||||
lua_createtable(L, 0, 3);
|
||||
lua_pushinteger(L, code);
|
||||
lua_setfield(L, -2, "code");
|
||||
lua_pushlstring(L, buf_out->data ? buf_out->data : "", buf_out->len);
|
||||
lua_setfield(L, -2, "stdout");
|
||||
lua_pushlstring(L, buf_err->data ? buf_err->data : "", buf_err->len);
|
||||
lua_setfield(L, -2, "stderr");
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
** Execute a multi-stage pipeline.
|
||||
** stages[0..nstages-1] are command strings. Each is parsed with parse_argv().
|
||||
** Inter-stage pipes connect stdout→stdin. Only the last stage's stdout/stderr
|
||||
** are captured. Middle stages' stderr goes to the parent's stderr (inherited).
|
||||
** Returns 1 (one result table on the Lua stack).
|
||||
*/
|
||||
static int exec_pipeline (lua_State *L, char **stages, int nstages) {
|
||||
ParsedArgs *pa = NULL;
|
||||
int (*inter_pipes)[2] = NULL; /* inter_pipes[i] connects stage i → i+1 */
|
||||
int out_pipe[2], err_pipe[2]; /* capture pipes for last stage */
|
||||
pid_t *pids = NULL;
|
||||
int i, last_code = -1;
|
||||
DynBuf buf_out, buf_err;
|
||||
struct sigaction sa_old, sa_new;
|
||||
|
||||
/* parse all stages */
|
||||
pa = (ParsedArgs *)calloc((size_t)nstages, sizeof(ParsedArgs));
|
||||
if (pa == NULL) return luaL_error(L, "out of memory");
|
||||
|
||||
for (i = 0; i < nstages; i++) {
|
||||
if (parse_argv(stages[i], &pa[i]) != 0) {
|
||||
for (int j = 0; j < i; j++) free_argv(&pa[j]);
|
||||
free(pa);
|
||||
return luaL_error(L, "unterminated quote in pipeline stage %d", i + 1);
|
||||
}
|
||||
if (pa[i].argc == 0) {
|
||||
for (int j = 0; j <= i; j++) free_argv(&pa[j]);
|
||||
free(pa);
|
||||
return luaL_error(L, "empty command in pipeline stage %d", i + 1);
|
||||
}
|
||||
}
|
||||
|
||||
/* allocate inter-stage pipes (nstages-1 of them) */
|
||||
if (nstages > 1) {
|
||||
inter_pipes = (int (*)[2])calloc((size_t)(nstages - 1), sizeof(int [2]));
|
||||
if (inter_pipes == NULL) {
|
||||
for (i = 0; i < nstages; i++) free_argv(&pa[i]);
|
||||
free(pa);
|
||||
return luaL_error(L, "out of memory");
|
||||
}
|
||||
for (i = 0; i < nstages - 1; i++) {
|
||||
if (pipe(inter_pipes[i]) != 0) {
|
||||
for (int j = 0; j < i; j++) { close(inter_pipes[j][0]); close(inter_pipes[j][1]); }
|
||||
free(inter_pipes);
|
||||
for (int j = 0; j < nstages; j++) free_argv(&pa[j]);
|
||||
free(pa);
|
||||
return luaL_error(L, "pipe() failed: %s", strerror(errno));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* capture pipes for last stage */
|
||||
if (pipe(out_pipe) != 0) {
|
||||
if (inter_pipes) {
|
||||
for (i = 0; i < nstages - 1; i++) { close(inter_pipes[i][0]); close(inter_pipes[i][1]); }
|
||||
free(inter_pipes);
|
||||
}
|
||||
for (i = 0; i < nstages; i++) free_argv(&pa[i]);
|
||||
free(pa);
|
||||
return luaL_error(L, "pipe() failed: %s", strerror(errno));
|
||||
}
|
||||
if (pipe(err_pipe) != 0) {
|
||||
close(out_pipe[0]); close(out_pipe[1]);
|
||||
if (inter_pipes) {
|
||||
for (i = 0; i < nstages - 1; i++) { close(inter_pipes[i][0]); close(inter_pipes[i][1]); }
|
||||
free(inter_pipes);
|
||||
}
|
||||
for (i = 0; i < nstages; i++) free_argv(&pa[i]);
|
||||
free(pa);
|
||||
return luaL_error(L, "pipe() failed: %s", strerror(errno));
|
||||
}
|
||||
|
||||
/* ignore SIGPIPE in parent */
|
||||
memset(&sa_new, 0, sizeof(sa_new));
|
||||
sa_new.sa_handler = SIG_IGN;
|
||||
sigemptyset(&sa_new.sa_mask);
|
||||
sigaction(SIGPIPE, &sa_new, &sa_old);
|
||||
|
||||
/* allocate pid array */
|
||||
pids = (pid_t *)calloc((size_t)nstages, sizeof(pid_t));
|
||||
if (pids == NULL) {
|
||||
close(out_pipe[0]); close(out_pipe[1]);
|
||||
close(err_pipe[0]); close(err_pipe[1]);
|
||||
if (inter_pipes) {
|
||||
for (i = 0; i < nstages - 1; i++) { close(inter_pipes[i][0]); close(inter_pipes[i][1]); }
|
||||
free(inter_pipes);
|
||||
}
|
||||
for (i = 0; i < nstages; i++) free_argv(&pa[i]);
|
||||
free(pa);
|
||||
sigaction(SIGPIPE, &sa_old, NULL);
|
||||
return luaL_error(L, "out of memory");
|
||||
}
|
||||
|
||||
/* fork all stages */
|
||||
for (i = 0; i < nstages; i++) {
|
||||
pids[i] = fork();
|
||||
if (pids[i] < 0) {
|
||||
/* fork failed — kill already-forked children */
|
||||
for (int j = 0; j < i; j++) {
|
||||
kill(pids[j], SIGTERM);
|
||||
waitpid(pids[j], NULL, 0);
|
||||
}
|
||||
close(out_pipe[0]); close(out_pipe[1]);
|
||||
close(err_pipe[0]); close(err_pipe[1]);
|
||||
if (inter_pipes) {
|
||||
for (int j = 0; j < nstages - 1; j++) { close(inter_pipes[j][0]); close(inter_pipes[j][1]); }
|
||||
free(inter_pipes);
|
||||
}
|
||||
for (int j = 0; j < nstages; j++) free_argv(&pa[j]);
|
||||
free(pa); free(pids);
|
||||
sigaction(SIGPIPE, &sa_old, NULL);
|
||||
return luaL_error(L, "fork() failed: %s", strerror(errno));
|
||||
}
|
||||
|
||||
if (pids[i] == 0) {
|
||||
/* === child process for stage i === */
|
||||
int j;
|
||||
|
||||
/* restore default SIGPIPE */
|
||||
sa_new.sa_handler = SIG_DFL;
|
||||
sigaction(SIGPIPE, &sa_new, NULL);
|
||||
|
||||
/* set up stdin */
|
||||
if (i > 0) {
|
||||
dup2(inter_pipes[i - 1][0], STDIN_FILENO);
|
||||
}
|
||||
|
||||
/* set up stdout */
|
||||
if (i < nstages - 1) {
|
||||
dup2(inter_pipes[i][1], STDOUT_FILENO);
|
||||
} else {
|
||||
/* last stage: capture stdout and stderr */
|
||||
dup2(out_pipe[1], STDOUT_FILENO);
|
||||
dup2(err_pipe[1], STDERR_FILENO);
|
||||
}
|
||||
|
||||
/* close all pipe fds in child */
|
||||
if (inter_pipes) {
|
||||
for (j = 0; j < nstages - 1; j++) {
|
||||
close(inter_pipes[j][0]);
|
||||
close(inter_pipes[j][1]);
|
||||
}
|
||||
}
|
||||
close(out_pipe[0]); close(out_pipe[1]);
|
||||
close(err_pipe[0]); close(err_pipe[1]);
|
||||
|
||||
execvp(pa[i].argv[0], pa[i].argv);
|
||||
|
||||
/* exec failed */
|
||||
{
|
||||
const char *err = strerror(errno);
|
||||
size_t namelen = strlen(pa[i].argv[0]);
|
||||
size_t errlen = strlen(err);
|
||||
(void)write(STDERR_FILENO, pa[i].argv[0], namelen);
|
||||
(void)write(STDERR_FILENO, ": ", 2);
|
||||
(void)write(STDERR_FILENO, err, errlen);
|
||||
(void)write(STDERR_FILENO, "\n", 1);
|
||||
}
|
||||
_exit(127);
|
||||
}
|
||||
}
|
||||
|
||||
/* === parent: close all write ends === */
|
||||
if (inter_pipes) {
|
||||
for (i = 0; i < nstages - 1; i++) {
|
||||
close(inter_pipes[i][0]);
|
||||
close(inter_pipes[i][1]);
|
||||
}
|
||||
free(inter_pipes);
|
||||
}
|
||||
close(out_pipe[1]);
|
||||
close(err_pipe[1]);
|
||||
|
||||
/* free parsed args (children have already exec'd) */
|
||||
for (i = 0; i < nstages; i++) free_argv(&pa[i]);
|
||||
free(pa);
|
||||
|
||||
/* read captured output from last stage */
|
||||
dynbuf_init(&buf_out);
|
||||
dynbuf_init(&buf_err);
|
||||
read_pipes(out_pipe[0], err_pipe[0], &buf_out, &buf_err);
|
||||
|
||||
/* wait for all children, keep last stage's exit code */
|
||||
for (i = 0; i < nstages; i++) {
|
||||
int status;
|
||||
waitpid(pids[i], &status, 0);
|
||||
if (i == nstages - 1) {
|
||||
if (WIFEXITED(status))
|
||||
last_code = WEXITSTATUS(status);
|
||||
else if (WIFSIGNALED(status))
|
||||
last_code = 128 + WTERMSIG(status);
|
||||
else
|
||||
last_code = -1;
|
||||
}
|
||||
}
|
||||
free(pids);
|
||||
|
||||
/* restore SIGPIPE */
|
||||
sigaction(SIGPIPE, &sa_old, NULL);
|
||||
|
||||
/* push result table */
|
||||
push_result_table(L, last_code, &buf_out, &buf_err);
|
||||
dynbuf_free(&buf_out);
|
||||
dynbuf_free(&buf_err);
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
|
||||
/* ===== luaB_command ===== */
|
||||
|
||||
int luaB_command (lua_State *L) {
|
||||
const char *cmd = luaL_checkstring(L, 1);
|
||||
char *stages[MAX_PIPELINE_STAGES];
|
||||
int nstages;
|
||||
ParsedArgs pa;
|
||||
int out_pipe[2], err_pipe[2];
|
||||
pid_t pid;
|
||||
@@ -230,6 +535,20 @@ int luaB_command (lua_State *L) {
|
||||
DynBuf buf_out, buf_err;
|
||||
struct sigaction sa_old, sa_new;
|
||||
|
||||
/* try to split into pipeline stages */
|
||||
if (split_pipeline(cmd, stages, &nstages) != 0)
|
||||
return luaL_error(L, "invalid pipeline syntax in command");
|
||||
|
||||
/* multi-stage pipeline: delegate to exec_pipeline */
|
||||
if (nstages > 1) {
|
||||
int result = exec_pipeline(L, stages, nstages);
|
||||
free(stages[0]); /* free backing buffer */
|
||||
return result;
|
||||
}
|
||||
|
||||
/* single stage: free pipeline buffer and use original path */
|
||||
free(stages[0]);
|
||||
|
||||
/* parse command into argv */
|
||||
if (parse_argv(cmd, &pa) != 0)
|
||||
return luaL_error(L, "unterminated quote in command");
|
||||
|
||||
83
testes/lush/piping.lua
Normal file
83
testes/lush/piping.lua
Normal file
@@ -0,0 +1,83 @@
|
||||
-- testes/lush/piping.lua
|
||||
-- Tests for piping between commands (issue #06).
|
||||
|
||||
print "testing piping"
|
||||
|
||||
-- basic pipe
|
||||
do
|
||||
local r = `echo hello | cat`
|
||||
assert(r.stdout == "hello\n", "basic pipe stdout: " .. r.stdout)
|
||||
assert(r.code == 0)
|
||||
end
|
||||
|
||||
-- multi-stage pipe
|
||||
do
|
||||
local r = `printf "a\nb\nc\n" | grep b | cat`
|
||||
assert(r.stdout == "b\n", "multi-stage pipe stdout: " .. r.stdout)
|
||||
end
|
||||
|
||||
-- exit code is from last stage
|
||||
do
|
||||
local r = `echo hello | sh -c "exit 42"`
|
||||
assert(r.code == 42, "exit code from last stage: " .. r.code)
|
||||
end
|
||||
|
||||
-- pipe with first stage failing: cat gets EOF, succeeds
|
||||
do
|
||||
local r = `nonexistent_cmd_999 | cat`
|
||||
assert(r.code == 0, "cat should succeed with EOF: " .. r.code)
|
||||
end
|
||||
|
||||
-- quoted pipe character is NOT a pipe separator (double quotes)
|
||||
do
|
||||
local r = `echo "a|b"`
|
||||
assert(r.stdout == "a|b\n", "double-quoted pipe: " .. r.stdout)
|
||||
end
|
||||
|
||||
-- single-quoted pipe character is NOT a pipe separator
|
||||
do
|
||||
local r = `echo 'a|b'`
|
||||
assert(r.stdout == "a|b\n", "single-quoted pipe: " .. r.stdout)
|
||||
end
|
||||
|
||||
-- pipe with interpolation
|
||||
do
|
||||
local ext = ".lua"
|
||||
local r = `echo "test.lua test.py" | grep "${ext}"`
|
||||
assert(r.stdout == "test.lua test.py\n", "pipe with interpolation: " .. r.stdout)
|
||||
end
|
||||
|
||||
-- stderr from middle stages goes to terminal, not captured
|
||||
do
|
||||
local r = `sh -c "echo err >&2; echo out" | cat`
|
||||
assert(r.stdout == "out\n", "stdout through pipe: " .. r.stdout)
|
||||
assert(r.stderr == "", "middle stderr not captured: '" .. r.stderr .. "'")
|
||||
end
|
||||
|
||||
-- single command unchanged (regression check)
|
||||
do
|
||||
local r = `echo hello`
|
||||
assert(r.stdout == "hello\n")
|
||||
assert(r.code == 0)
|
||||
end
|
||||
|
||||
-- pipe preserves multi-line output
|
||||
do
|
||||
local r = `printf "line1\nline2\nline3\n" | cat`
|
||||
assert(r.stdout == "line1\nline2\nline3\n", "multiline pipe: " .. r.stdout)
|
||||
end
|
||||
|
||||
-- pipe with whitespace around |
|
||||
do
|
||||
local r = `echo hello | cat`
|
||||
assert(r.stdout == "hello\n", "whitespace around pipe: " .. r.stdout)
|
||||
end
|
||||
|
||||
-- last stage stderr is captured
|
||||
do
|
||||
local r = `echo hello | sh -c "cat; echo piperr >&2"`
|
||||
assert(r.stdout == "hello\n", "last stage stdout: " .. r.stdout)
|
||||
assert(r.stderr == "piperr\n", "last stage stderr: " .. r.stderr)
|
||||
end
|
||||
|
||||
print "OK"
|
||||
Reference in New Issue
Block a user