diff --git a/Makefile b/Makefile index 270b6f6..6caa3b6 100644 --- a/Makefile +++ b/Makefile @@ -4,7 +4,7 @@ PREFIX=/usr/local BINDIR=$(PREFIX)/bin MANDIR=$(PREFIX)/share/man -ALL_CFLAGS=$(CFLAGS) -std=c99 -Wall -Wextra -Wshadow -Wmissing-prototypes -Wpedantic -Wno-unused-parameter +ALL_CFLAGS=$(CFLAGS) -std=c99 -Wall -Wextra -Wshadow -Wmissing-prototypes -Wpedantic -Wno-unused-parameter -pthread OBJ=\ build.o\ deps.o\ @@ -15,6 +15,7 @@ OBJ=\ parse.o\ samu.o\ scan.o\ + token.o\ tool.o\ tree.o\ util.o @@ -28,6 +29,7 @@ HDR=\ log.h\ parse.h\ scan.h\ + token.h\ tool.h\ tree.h\ util.h diff --git a/build.c b/build.c index 7988423..72cc11f 100644 --- a/build.c +++ b/build.c @@ -2,12 +2,14 @@ #include #include #include +#include #include #include #include #include #include #include +#include #include #include #include @@ -16,6 +18,7 @@ #include "env.h" #include "graph.h" #include "log.h" +#include "token.h" #include "util.h" struct job { @@ -34,6 +37,10 @@ static size_t nstarted, nfinished, ntotal; static bool consoleused; static struct timespec starttime; +/* for signal handling: both flags are written by sighandler(); killall makes build() SIGTERM its running jobs, stopsig holds the number of the signal that stopped the build */ +static volatile sig_atomic_t killall; +static volatile sig_atomic_t stopsig; + void buildreset(void) { @@ -115,6 +122,11 @@ queue(struct edge *e) *front = e; } +static bool have_work(void) +{ + return work && !stopsig; /* edges are still queued and no SIGINT/SIGTERM has arrived */ +} + void buildadd(struct node *n) { @@ -453,7 +465,8 @@ jobdone(struct job *j) j->failed = true; } } else if (WIFSIGNALED(status)) { - warn("job terminated due to signal %d: %s", WTERMSIG(status), j->cmd->s); + if (stopsig != WTERMSIG(status)) /* stay quiet when the job died from the same signal that stopped the build */ + warn("job terminated due to signal %d: %s", WTERMSIG(status), j->cmd->s); j->failed = true; } else { /* cannot happen according to POSIX */ @@ -465,6 +478,8 @@ jobdone(struct job *j) fwrite(j->buf.data, 1, j->buf.len, stdout); j->buf.len = 0; e = j->edge; + j->edge = NULL; + if (e->pool) { p = e->pool; @@ -484,6 +499,14 @@ jobdone(struct job *j) edgedone(e); } +static void +jobkill(struct job *j) +{ + kill(j->pid, SIGTERM); /* ask the job's process to terminate; the job is then marked failed and finished through jobdone() */ + j->failed 
= true; + jobdone(j); +} + /* returns whether a job still has work to do. if not, sets j->failed */ static bool jobwork(struct job *j) @@ -507,16 +530,16 @@ jobwork(struct job *j) j->buf.len += n; return true; } - if (n == 0) - goto done; - warn("read:"); + if (n < 0) { + warn("read:"); + goto kill; + } -kill: - kill(j->pid, SIGTERM); - j->failed = true; -done: jobdone(j); + return false; +kill: + jobkill(j); /* read error: terminate the job rather than wait for it to exit on its own */ return false; } @@ -538,6 +561,20 @@ queryload(void) #endif } +static void sighandler(int sig) +{ + /* + * Both SIGINT and SIGTERM will not start any more processes, + * but only SIGTERM needs to be forwarded to samurai's child + * processes. That's because samurai is a process group leader + * and SIGINT has already been sent to all children, and they + * will stop on their own. + */ + if (sig == SIGTERM) + killall = true; + stopsig = sig; +} + void build(void) { @@ -545,14 +582,27 @@ build(void) struct pollfd *fds = NULL; size_t i, next = 0, jobslen = 0, maxjobs = buildopts.maxjobs, numjobs = 0, numfail = 0; struct edge *e; + int jobserver_rfd; + struct sigaction sa; if (ntotal == 0) { warn("nothing to do"); return; } + memset(&sa, 0, sizeof(sa)); + sa.sa_handler = sighandler; + sa.sa_flags = SA_RESTART; + sigaction(SIGTERM, &sa, NULL); + sigaction(SIGINT, &sa, NULL); + clock_gettime(CLOCK_MONOTONIC, &starttime); formatstatus(NULL, 0); + jobserver_rfd = tokeninit(); /* -1 when tokeninit() found no usable jobserver in MAKEFLAGS */ + + /* if running under job server start as many jobs as allowed. NOTE(review): the local maxjobs was already initialised from buildopts.maxjobs in the declaration above, so this override only reaches the start loop through the maxload branch -- confirm that is intended */ + if (jobserver_rfd != -1) + buildopts.maxjobs = INT_MAX; nstarted = 0; for (;;) { @@ -560,25 +610,28 @@ build(void) if (buildopts.maxload) maxjobs = queryload() > buildopts.maxload ? 
1 : buildopts.maxjobs; /* start ready edges */ - while (work && numjobs < maxjobs && numfail < buildopts.maxfail) { + while (have_work() && numjobs < maxjobs && numfail < buildopts.maxfail) { e = work; - work = work->worknext; if (e->rule != &phonyrule && buildopts.dryrun) { ++nstarted; printstatus(e, edgevar(e, "command", true)); ++nfinished; } if (e->rule == &phonyrule || buildopts.dryrun) { + work = work->worknext; for (i = 0; i < e->nout; ++i) nodedone(e->out[i], false); continue; } + if (!tokenget(e)) /* no token yet: leave e at the head of work and retry after poll() */ + break; + work = work->worknext; if (next == jobslen) { jobslen = jobslen ? jobslen * 2 : 8; if (jobslen > buildopts.maxjobs) jobslen = buildopts.maxjobs; jobs = xreallocarray(jobs, jobslen, sizeof(jobs[0])); - fds = xreallocarray(fds, jobslen, sizeof(fds[0])); + fds = xreallocarray(fds, jobslen + 1, sizeof(fds[0])); for (i = next; i < jobslen; ++i) { jobs[i].buf.data = NULL; jobs[i].buf.len = 0; @@ -587,6 +640,8 @@ build(void) fds[i].fd = -1; fds[i].events = POLLIN; } + fds[jobslen].fd = jobserver_rfd; /* extra slot past the job fds, rewritten each time the arrays grow */ + fds[jobslen].events = POLLIN; } fds[next].fd = jobstart(&jobs[next], e); if (fds[next].fd < 0) { @@ -597,13 +652,23 @@ build(void) ++numjobs; } } - if (numjobs == 0) + if (!numjobs && (!have_work() || numfail >= buildopts.maxfail)) /* nothing is running and nothing more may be started; a failed build can leave ready edges queued, and those must not keep the loop polling forever */ break; - if (poll(fds, jobslen, 5000) < 0) + /* + * the last slot is for the jobserver and is only used to + * kick samurai out of poll() + */ + if (poll(fds, jobslen + 1, 5000) < 0 && errno != EINTR) fatal("poll:"); for (i = 0; i < jobslen; ++i) { - if (!fds[i].revents || jobwork(&jobs[i])) + if (fds[i].fd == -1) + continue; + else if (killall) + jobkill(&jobs[i]); + else if (!fds[i].revents || jobwork(&jobs[i])) continue; + + tokenput(); /* the job in slot i finished or was killed: give its token back */ --numjobs; jobs[i].next = next; fds[i].fd = -1; @@ -619,6 +684,10 @@ build(void) if (numfail > 0) { if (numfail < buildopts.maxfail) fatal("cannot make progress due to previous errors"); + else if (stopsig == SIGINT) + fatal("interrupted by user"); + else if (stopsig) + fatal("interrupted by signal %d", stopsig); else if (numfail > 
1) fatal("subcommands failed"); else diff --git a/graph.h b/graph.h index 5ea808a..ef2c6ee 100644 --- a/graph.h +++ b/graph.h @@ -66,6 +66,9 @@ struct edge { struct edge *worknext; /* used for alledges linked list */ struct edge *allnext; + + /* has the edge requested a token from the job server */ + _Bool reserve; }; void graphinit(void); diff --git a/token.c b/token.c new file mode 100644 index 0000000..b2bb87f --- /dev/null +++ b/token.c @@ -0,0 +1,334 @@ +#define _POSIX_C_SOURCE 200809L + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "graph.h" +#include "token.h" +#include "util.h" + +/* + * The jobserver protocol is simple: samurai can always run one job, + * and needs to read a character from a pipe in order to start more. + * The characters on the pipe are called "tokens" and the extra job is + * referred to as the "free" or "gifted" token; its purpose is to guarantee + * progress of each jobserver client and hence of the overall build. + * When a job finishes, the token must be written back to the pipe, unless + * it was the only running job (in which case you can keep the free token + * for yourself). + * + * One complication is that the pipe should stay in blocking mode. This is + * because traditionally Make had nothing to do while waiting for a token; + * with some tricks to avoid races, SIGCHLD could be used to interrupt + * the blocking read. This does not work well in a more modern event + * loop, therefore samurai uses a thread that handles communication on + * this pipe. A simple mutex/condvar pair is used to send requests to + * the auxiliary thread; any requests submitted during a read will only + * be processed when read() returns, but anyway the auxiliary thread would + * not be able to satisfy them (a blocking read means no available tokens). + * + * Communication from the auxiliary thread to the main thread, instead, + * uses another OS pipe. 
This pipe holds a character for each job that + * samurai can start, and it can be non-blocking for easy integration with + * the main thread's poll loop. Furthermore, this pipe always starts with + * one character, thus making no distinction between the free token and + * those that the auxiliary thread has taken from the cross-process pipe. + * This way, the main loop does not deal with the complications introduced + * by the free token. + */ + +/* Used by jobserver thread. */ +static int parent_rfd = -1; +static int local_wfd = -1; + +/* Used by main thread. */ +static int parent_wfd = -1; +static int local_rfd = -1; +static pthread_t jobserver_thread; + +/* Read/written under lock. */ +static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t request = PTHREAD_COND_INITIALIZER; +static size_t my_tokens = 1; +static size_t pending_edges; +static bool done; + +static bool tokenread(int fd) +{ + char ch; + return read(fd, &ch, 1) > 0; /* false on EOF, on error, and when a non-blocking fd has no data */ +} + +static void tokenwrite(int fd) +{ + while (write(fd, "X", 1) == -1 && errno == EINTR) continue; /* retry when interrupted: a dropped byte is a job slot lost for good */ +} + +static void tokenwait(void) +{ + while (!tokenread(parent_rfd)) { + /* + * just in case someone does not follow the spec and makes + * the pipe non-blocking + */ + struct pollfd pfd = { parent_rfd, POLLIN, 0 }; + poll(&pfd, 1, -1); + } +} + +static void *tokenthread(void *unused) +{ + (void)unused; + + pthread_mutex_lock(&mutex); + for (;;) { + while (!pending_edges && !done) + pthread_cond_wait(&request, &mutex); + + if (done) + break; + + pending_edges--; + + /* + * one or more jobs couldn't find a token in local_rfd. + * get it from the global pool and give it to the main + * thread via local_wfd. 
+ */ + pthread_mutex_unlock(&mutex); + tokenwait(); + pthread_mutex_lock(&mutex); + + my_tokens++; /* counted under the lock before the byte is handed to the main thread */ + tokenwrite(local_wfd); + } + pthread_mutex_unlock(&mutex); + return NULL; +} + +/* return false if there are no available tokens; in that case one has + * been requested, and local_rfd will become POLLIN once it's been + * retrieved */ +bool tokenget(struct edge *e) +{ + bool got_token = false; + + if (parent_wfd == -1) /* no jobserver in use: never hold the caller back */ + return true; + + got_token = tokenread(local_rfd); + if (!got_token && !e->reserve) { + /* + * one increment of pending_edges corresponds to + * one token written to local_wfd. make sure to + * only request a token once per edge! + */ + e->reserve = true; + pthread_mutex_lock(&mutex); + pending_edges++; + pthread_cond_signal(&request); + pthread_mutex_unlock(&mutex); + } + + return got_token; +} + +/* put back a token when a job is done */ +void tokenput(void) +{ + bool freetoken = false; + if (parent_wfd == -1) + return; + + pthread_mutex_lock(&mutex); + if (my_tokens == 1) /* only the free token is left: it goes back to the local pipe, not to the parent */ + freetoken = true; + else + my_tokens--; + pthread_mutex_unlock(&mutex); + + /* + * do not attempt to shortcut tokens directly within samurai: + * apart from the free token, any other token goes to the build + * job and *that* will result in a wakeup of the auxiliary thread. + * This is both simpler and fairer. + */ + tokenwrite(freetoken ? local_wfd : parent_wfd); +} + +static void tokenexit(void) +{ + pthread_mutex_lock(&mutex); + pending_edges = 0; + done = true; + pthread_cond_signal(&request); + pthread_mutex_unlock(&mutex); + pthread_join(jobserver_thread, NULL); /* NOTE(review): a thread blocked in tokenwait() only sees done after its read returns, so this can wait for a token to arrive -- confirm that is acceptable at exit */ + + /* samu could have some tokens if called from fatal(). 
*/ + while (my_tokens > 1) { + my_tokens--; + tokenwrite(parent_wfd); + } +} + +/* --jobserver-auth=fifo:PATH - open named pipe for both reading and writing; returns 0 on success, an errno value if open() fails, -1 if PATH is empty */ +static int parse_makeflags_fifo(const char *makeflags) +{ + char *fname; + size_t len = 0; + int err = 0; + + if (!*makeflags || isspace((unsigned char)*makeflags)) + return -1; + do + len++; + while (makeflags[len] && !isspace((unsigned char)makeflags[len])); /* stop at the terminator too: the argument is a NUL-terminated token */ + + fname = xmemdup(makeflags, len + 1); + fname[len] = '\0'; + parent_rfd = open(fname, O_RDONLY); + if (parent_rfd == -1 || (parent_wfd = open(fname, O_WRONLY)) == -1) { + err = errno; + if (parent_rfd != -1) + close(parent_rfd); /* do not leak the read end when the write end cannot be opened */ + } + free(fname); /* the name is only needed for the two open() calls */ + if (err) + return err; + fcntl(parent_rfd, F_SETFD, FD_CLOEXEC); + fcntl(parent_wfd, F_SETFD, FD_CLOEXEC); + return 0; +} + +static bool c_isdigit(char ch) +{ + return ch >= '0' && ch <= '9'; +} + +/* --jobserver-auth=R,W - file descriptors provided on the command line; returns 0 on success, an errno value if a descriptor is not open, -1 if the argument is malformed */ +static int parse_makeflags_pipe(const char *makeflags) +{ + if (!c_isdigit(*makeflags)) + return -1; /* not 0: 0 means success and would leave both parent fds unset */ + parent_rfd = 0; + do + parent_rfd = parent_rfd * 10 + (*makeflags++ - '0'); + while (c_isdigit(*makeflags)); + if (*makeflags != ',') + return -1; + if (fcntl(parent_rfd, F_GETFL) == -1) + return errno; + + makeflags++; + if (!c_isdigit(*makeflags)) + return -1; + parent_wfd = 0; + do + parent_wfd = parent_wfd * 10 + (*makeflags++ - '0'); + while (c_isdigit(*makeflags)); + if (*makeflags && !isspace((unsigned char)*makeflags)) + return -1; + if (fcntl(parent_wfd, F_GETFL) == -1) + return errno; + + return 0; +} + +#define FLAG1 "--jobserver-auth=" +#define FLAG2 "--jobserver-fds=" + +/* + * return file descriptor signaling build() that it can start a + * new edge + */ +int tokeninit(void) +{ + char *makeflags = getenv("MAKEFLAGS"); + const char *p; + const char *makearg = NULL; + int fd[2]; + int ret = 0; + + sigset_t blocked, old; + + if (!makeflags) + return -1; + + makeflags = strdup(makeflags); + p = makeflags ? strtok(makeflags, " \t") : NULL; /* strdup() can fail; with p NULL the loop is skipped and no jobserver is used */ + while (p) { + /* + * the `MAKEFLAGS` variable may contain multiple instances of + * the option. Only the last instance is relevant. 
+ */ + if (!strncmp(p, FLAG1, strlen(FLAG1))) /* strncmp stops at the end of p; memcmp could read past a token shorter than the flag */ + makearg = p + strlen(FLAG1); + else if (!strncmp(p, FLAG2, strlen(FLAG2))) + makearg = p + strlen(FLAG2); + + p = strtok(NULL, " \t"); + } + + if (!makearg) + goto out; + + if (!strncmp(makearg, "fifo:", 5)) + ret = parse_makeflags_fifo(makearg + 5); + else + ret = parse_makeflags_pipe(makearg); + + if (ret) { + if (ret > 0) /* positive values are errno codes; -1 is a malformed argument and stays silent */ + warn("could not open jobserver pipe: %s\n", strerror(ret)); + goto out; + } + + if (pipe(fd) == -1) { + ret = errno; + warn("could not create jobserver pipe: %s\n", strerror(ret)); + goto out; + } + + local_rfd = fd[0]; + local_wfd = fd[1]; + + /* + * temporarily block all signals so that they are processed in + * the main thread + */ + sigfillset(&blocked); + pthread_sigmask(SIG_SETMASK, &blocked, &old); + ret = pthread_create(&jobserver_thread, NULL, tokenthread, NULL); + pthread_sigmask(SIG_SETMASK, &old, NULL); + if (ret) { + warn("could not create jobserver thread: %s\n", strerror(ret)); + goto out_close; + } + + atexit(tokenexit); + free(makeflags); /* makearg pointed into this copy and is not used past this point */ + /* the parent gives us a free token */ + tokenwrite(local_wfd); + fcntl(local_rfd, F_SETFL, O_NONBLOCK); + return local_rfd; + +out_close: + close(local_rfd); + close(local_wfd); + local_rfd = local_wfd = -1; +out: + parent_rfd = parent_wfd = -1; + free(makeflags); + return -1; +} diff --git a/token.h b/token.h new file mode 100644 index 0000000..99b3626 --- /dev/null +++ b/token.h @@ -0,0 +1,3 @@ +int tokeninit(void); /* returns the fd build() polls for newly available tokens, or -1 when no jobserver is in use */ +bool tokenget(struct edge *e); /* true if a job may start now; false after a token has been requested for e */ +void tokenput(void); /* give back the token of a finished job */