--- adns-orig.c 2007-08-24 15:14:43.000000000 -0400 +++ adns-thr.c 2007-08-27 14:38:23.000000000 -0400 @@ -57,7 +57,12 @@ #include <sys/wait.h> #include <netinet/in.h> #include <arpa/nameser.h> -#include <resolv.h> +#ifdef DNSSEC_LOCAL_VALIDATION +# include <validator/validator.h> +# include <pthread.h> +#else +# include <resolv.h> +#endif #include <netdb.h> /* ??? for h_errno */ #include <openswan.h> @@ -78,6 +83,43 @@ static bool debug = FALSE; +#ifndef MAX_WORKERS +# define MAX_WORKERS 10 /* number of in-flight queries */ +#endif + +struct worker_info { + int qfd; /* query pipe's file descriptor */ + int afd; /* answer pipe's file descriptor */ +#ifndef ADNS_THREADS + pid_t pid; +#else + int wqfd; /* worker query pipe's file descriptor */ + int wafd; /* worker answer pipe's file descriptor */ + pthread_t thread; +#endif + bool busy; + void *continuation; /* of outstanding request */ +}; + + +static void send_eof(struct worker_info *w); + + +#ifndef TESTING +#define ERROR syslog +#else +#define ERROR(level, format, args...) do { \ + if (debug) { \ + fprintf(stderr, format , ## args); \ + fprintf(stderr, "\n"); \ + } \ + else \ + syslog(level, format , ## args); \ + } while (0) +#endif + +/**************** read/write funcs **************/ + /* Read a variable-length record from a pipe (and no more!). * First bytes must be a size_t containing the length. * HES_CONTINUE if record read @@ -91,32 +133,54 @@ size_t n = 0; size_t goal = minlen; + if (debug) + fprintf(stderr, "read_pipe: fd %d, expect %lu-%lu\n", fd + ,(u_long)minlen, (u_long)maxlen); do { - ssize_t m = read(fd, stuff + n, goal - n); + ssize_t m; +#ifdef TESTING + m = read(fd, stuff + n, maxlen - n); + if (debug) + fprintf(stderr, "read_pipe: fd %d, m %d\n", fd, m); +#else + m = read(fd, stuff + n, goal - n); +#endif if (m == -1) { if (errno != EINTR) { - syslog(LOG_ERR, "Input error on pipe: %s", strerror(errno)); + ERROR(LOG_ERR, "Input error on pipe: %s", strerror(errno)); return HES_IO_ERROR_IN; } } else if (m == 0) { + if(debug) + fprintf(stderr,"fd %d, empty message\n", fd); return HES_OK; /* treat empty message as EOF */ } else { +#ifdef TESTING + if(stuff[n+m-1] == '\n') { + stuff[n+m-1] = 0; + --m; + if (m == 0) + return HES_BAD_LEN; + } +#endif n += m; if (n >= sizeof(size_t)) { +#ifndef TESTING goal = *(size_t *)(void *)stuff; +#endif if (goal < minlen || maxlen < goal) { if (debug) - fprintf(stderr, "%lu : [%lu, %lu]\n" - , (unsigned long)goal + fprintf(stderr, "fd %d, %lu : [%lu, %lu]\n", fd + , (unsigned long)goal , (unsigned long)minlen, (unsigned long)maxlen); return HES_BAD_LEN; } @@ -124,6 +188,9 @@ } } while (n < goal); + if (debug) + fprintf(stderr, "read_pipe: fd %d, got %lu/%lu\n", fd, (unsigned long)n + , (unsigned long)maxlen); return HES_CONTINUE; } @@ -138,6 +205,8 @@ size_t len = *(const size_t *)(const void *)stuff; size_t n = 0; + if(debug) + fprintf(stderr, "write_pipe: %d bytes to %d\n",len,fd); do { ssize_t m = write(fd, stuff + n, len - n); @@ -146,7 +215,7 @@ /* error, but ignore and retry if EINTR */ if (errno != EINTR) { - syslog(LOG_ERR, "Output error from master: %s", strerror(errno)); + ERROR(LOG_ERR, "Output error from master: %s", strerror(errno)); return HES_IO_ERROR_OUT; } } @@ -174,7 +243,14 @@ # define OLD_RESOLVER 1 #endif -#ifdef OLD_RESOLVER +#if defined ( VALIDATOR_H ) + +# define res_ninit(statp) 0 +# define res_nquery(statp, dname, class, type, answer, anslen) \ + val_res_query(NULL, (char*)dname, class, type, answer, anslen, &val_status) +# define res_nclose(statp) 0 + +#elif defined( OLD_RESOLVER ) # define res_ninit(statp) res_init() # define res_nquery(statp, dname, class, type, answer, anslen) \ @@ -190,47 +266,107 @@ #endif /* !OLD_RESOLVER */ +#ifdef ADNS_THREADS +static void +worker_exit(enum helper_exit_status r, struct worker_info * w) +{ + if (debug) + fprintf(stderr, "worker exiting: %p\n", (void*)w->thread); + + /** close fds so master knows we've quit */ + if (NULL != w) { + close(w->wqfd); + close(w->wafd); + w->wqfd = w->wafd = NULL_FD; + } + + pthread_exit((void*)r); +} + +static void * +worker(void *args) +#else + +#define worker_exit(r, w) return(r) + static int worker(int qfd, int afd) +#endif { +#ifdef ADNS_THREADS + struct worker_info * w = (struct worker_info *)args; + int qfd = w->wqfd, afd = w->wafd; +#endif + + if (debug) + fprintf(stderr, "worker: starting worker, pipes %d/%d\n", qfd, afd); { int r = res_ninit(statp); if (r != 0) { - syslog(LOG_ERR, "cannot initialize resolver"); - return HES_RES_INIT; + ERROR(LOG_ERR, "cannot initialize resolver"); + worker_exit( HES_RES_INIT, w); } +#ifndef VALIDATOR_H #ifndef OLD_RESOLVER statp->options |= RES_ROTATE; #endif statp->options |= RES_DEBUG; +#endif } for (;;) { struct adns_query q; struct adns_answer a; +#ifdef VALIDATOR_H + val_status_t val_status; +#endif enum helper_exit_status r = read_pipe(qfd, (unsigned char *)&q , sizeof(q), sizeof(q)); + if (debug) + fprintf(stderr, "worker: data from master pipe\n"); if (r != HES_CONTINUE) - return r; /* some kind of exit */ + worker_exit(r, w); /* some kind of exit */ if (q.qmagic != ADNS_Q_MAGIC) { - syslog(LOG_ERR, "error in input from master: bad magic"); - return HES_BAD_MAGIC; + ERROR(LOG_ERR, "error in input from master: bad magic"); + worker_exit(HES_BAD_MAGIC, w); } a.amagic = ADNS_A_MAGIC; a.serial = q.serial; + if (debug) + fprintf(stderr, "worker: sending query to resolver\n"); a.result = res_nquery(statp, q.name_buf, C_IN, q.type, a.ans, sizeof(a.ans)); a.h_errno_val = h_errno; a.len = offsetof(struct adns_answer, ans) + (a.result < 0? 0 : a.result); +#ifdef VALIDATOR_H + /** val_status not set for internal errors */ + if ((a.result == -1) && (NETDB_INTERNAL == h_errno)) { + if (debug) + fprintf(stderr, "internal err resolving %s\n", q.name_buf); + } + else { + /** log validation status */ + if (debug) + fprintf(stderr, "ValStatus: %strusted:%s\n", + val_istrusted(val_status) ? "" : "not ", + p_val_status(val_status)); + + if((a.result >= 0) && ((size_t)a.result > sizeof(a.ans))) { + if (debug) + fprintf(stderr, "packet size err resolving %s\n" + , q.name_buf); + } + } +#endif #ifdef DEBUG if (((q.debugging & IMPAIR_DELAY_ADNS_KEY_ANSWER) && q.type == T_KEY) @@ -242,7 +378,7 @@ r = write_pipe(afd, (const unsigned char *)&a); if (r != HES_CONTINUE) - return r; /* some kind of exit */ + worker_exit(r,w); /* some kind of exit */ } } @@ -252,18 +388,6 @@ #define PLUTO_QFD 0 /* queries come on stdin */ #define PLUTO_AFD 1 /* answers go out on stdout */ -#ifndef MAX_WORKERS -# define MAX_WORKERS 10 /* number of in-flight queries */ -#endif - -struct worker_info { - int qfd; /* query pipe's file descriptor */ - int afd; /* answer pipe's file descriptor */ - pid_t pid; - bool busy; - void *continuation; /* of outstanding request */ -}; - static struct worker_info wi[MAX_WORKERS]; static struct worker_info *wi_roof = wi; @@ -278,30 +402,70 @@ static struct query_list *newest_query; /* undefined when oldest == NULL */ static struct query_list *free_queries = NULL; +#ifdef ADNS_THREADS +static void +master_exit(enum helper_exit_status r) +{ + int i; + + if (debug) + fprintf(stderr, "master exiting\n"); + + /** for each worker, send eof and wait for worker termination */ + for(i=0; i < MAX_WORKERS; ++i) { + send_eof(&wi[i]); + } + pthread_exit((void*)r); +} +#else +#define master_exit(r) exit(r) +#endif + static bool spawn_worker(void) { int qfds[2]; int afds[2]; pid_t p; +#ifdef ADNS_THREADS + int rc; +#endif if (pipe(qfds) != 0 || pipe(afds) != 0) { - syslog(LOG_ERR, "pipe(2) failed: %s", strerror(errno)); - exit(HES_PIPE); + ERROR(LOG_ERR, "pipe(2) failed: %s", strerror(errno)); + master_exit(HES_PIPE); } wi_roof->qfd = qfds[1]; /* write end of query pipe */ wi_roof->afd = afds[0]; /* read end of answer pipe */ +#ifdef ADNS_THREADS + wi_roof->wqfd = qfds[0]; /* write end of query pipe */ + wi_roof->wafd = afds[1]; /* read end of answer pipe */ + + if(debug) + fprintf(stderr,"spawn_worker: starting worker thread\n"); + + rc = pthread_create(&wi_roof->thread, NULL, worker, + (void *)wi_roof); + if (rc) + p = -1; + else + p = 1; +#else + if(debug) + fprintf(stderr,"spawn_worker: starting worker process\n"); + p = fork(); +#endif if (p == -1) { /* fork failed: ignore if at least one worker exists */ if (wi_roof == wi) { - syslog(LOG_ERR, "fork(2) error creating first worker: %s", strerror(errno)); - exit(HES_FORK); + ERROR(LOG_ERR, "error creating first worker: %s", strerror(errno)); + master_exit(HES_FORK); } close(qfds[0]); close(qfds[1]); @@ -309,6 +473,7 @@ close(afds[1]); return FALSE; } +#ifndef ADNS_THREADS else if (p == 0) { /* child */ @@ -322,17 +487,20 @@ close(w->qfd); close(w->afd); } - exit(worker(qfds[0], afds[1])); + master_exit(worker(qfds[0], afds[1])); } +#endif else { /* parent */ struct worker_info *w = wi_roof++; - w->pid = p; w->busy = FALSE; +#ifndef ADNS_THREADS + w->pid = p; close(qfds[0]); close(afds[1]); +#endif return TRUE; } } @@ -340,8 +508,13 @@ static void send_eof(struct worker_info *w) { +#ifndef ADNS_THREADS pid_t p; int status; +#endif + + if (debug) + fprintf(stderr,"closing pipes to worker fds %d/%d\n", w->qfd, w->afd); close(w->qfd); w->qfd = NULL_FD; @@ -350,7 +523,11 @@ w->afd = NULL_FD; /* reap child */ +#ifndef ADNS_THREADS p = waitpid(w->pid, &status, 0); +#else + pthread_join(w->thread, NULL); +#endif /* ignore result -- what could we do with it? */ } @@ -361,16 +538,22 @@ if (q == NULL) { + if (debug) + fprintf(stderr, "forward_query: no pending queries\n"); if (eof_from_pluto) send_eof(w); } else { - enum helper_exit_status r - = write_pipe(w->qfd, (const unsigned char *) &q->aq); + enum helper_exit_status r; + + if (debug) + fprintf(stderr, "forward_query: sending query to worker\n"); + + r = write_pipe(w->qfd, (const unsigned char *) &q->aq); if (r != HES_CONTINUE) - exit(r); + master_exit(r); w->busy = TRUE; @@ -386,14 +569,17 @@ struct query_list *q = free_queries; enum helper_exit_status r; + if (debug) + fprintf(stderr, "query: reading pipe\n"); + /* find an unused queue entry */ if (q == NULL) { q = malloc(sizeof(*q)); if (q == NULL) { - syslog(LOG_ERR, "malloc(3) failed"); - exit(HES_MALLOC); + ERROR(LOG_ERR, "malloc(3) failed"); + master_exit(HES_MALLOC); } } else @@ -401,14 +587,28 @@ free_queries = q->next; } +#ifdef TESTING + { + r = read_pipe(PLUTO_QFD, q->aq.name_buf, 7, 100); //sizeof(q->aq.name_buf)); + if (r == HES_BAD_LEN) + return; + q->aq.name_buf[7] = 0; + q->aq.qmagic = ADNS_Q_MAGIC; + q->aq.len = sizeof(q->aq); + q->aq.type = ns_t_a; + } +#else r = read_pipe(PLUTO_QFD, (unsigned char *)&q->aq , sizeof(q->aq), sizeof(q->aq)); +#endif if (r == HES_OK) { /* EOF: we're done, except for unanswered queries */ struct worker_info *w; + if (debug) + fprintf(stderr, "query: EOF from pluto\n"); eof_from_pluto = TRUE; q->next = free_queries; free_queries = q; @@ -423,18 +623,20 @@ } else if (r != HES_CONTINUE) { - exit(r); + master_exit(r); } else if (q->aq.qmagic != ADNS_Q_MAGIC) { - syslog(LOG_ERR, "error in query from Pluto: bad magic"); - exit(HES_BAD_MAGIC); + ERROR(LOG_ERR, "error in query from Pluto: bad magic"); + master_exit(HES_BAD_MAGIC); } else { struct worker_info *w; /* got a query */ + if (debug) + fprintf(stderr, "query: got a query; looking for worker\n"); /* add it to FIFO */ q->next = NULL; @@ -450,15 +652,23 @@ if (w == wi_roof) { /* no free worker */ - if (w == wi + MAX_WORKERS) + if (w == wi + MAX_WORKERS) { + if (debug) + fprintf(stderr, "query: no free worker\n"); break; /* no more to be created */ + } /* make a new one */ + if (debug) + fprintf(stderr, "query: spawning new worker\n"); if (!spawn_worker()) break; /* cannot create one at this time */ } if (!w->busy) { /* assign first to free worker */ + if (debug) + fprintf(stderr, "query: forwarding '%s' to worker\n", + q->aq.name_buf); forward_query(w); break; } @@ -471,38 +681,48 @@ answer(struct worker_info *w) { struct adns_answer a; - enum helper_exit_status r = read_pipe(w->afd, (unsigned char *)&a - , offsetof(struct adns_answer, ans), sizeof(a)); + enum helper_exit_status r; + + if (debug) + fprintf(stderr, "answer: reading from workeron fd %d\n", w->afd); + r = read_pipe(w->afd, (unsigned char *)&a + , offsetof(struct adns_answer, ans), sizeof(a)); if (r == HES_OK) { /* unexpected EOF */ - syslog(LOG_ERR, "unexpected EOF from worker"); - exit(HES_IO_ERROR_IN); + ERROR(LOG_ERR, "unexpected EOF from worker"); + master_exit(HES_IO_ERROR_IN); } else if (r != HES_CONTINUE) { - exit(r); + master_exit(r); } else if (a.amagic != ADNS_A_MAGIC) { - syslog(LOG_ERR, "Input from worker error: bad magic"); - exit(HES_BAD_MAGIC); + ERROR(LOG_ERR, "Input from worker error: bad magic"); + master_exit(HES_BAD_MAGIC); } else if (a.continuation != w->continuation) { /* answer doesn't match query */ - syslog(LOG_ERR, "Input from worker error: continuation mismatch"); - exit(HES_SYNC); + ERROR(LOG_ERR, "Input from worker error: continuation mismatch"); + master_exit(HES_SYNC); } else { /* pass the answer on to Pluto */ - enum helper_exit_status r - = write_pipe(PLUTO_AFD, (const unsigned char *) &a); + enum helper_exit_status r; + + if (debug) + fprintf(stderr, "answer: sending answer to pluto\n"); + r = write_pipe(PLUTO_AFD, (const unsigned char *) &a); if (r != HES_CONTINUE) - exit(r); + master_exit(r); + + if (debug) + fprintf(stderr, "answer: worker now available\n"); w->busy = FALSE; forward_query(w); } @@ -512,6 +732,8 @@ static int master(void) { + if (debug) + fprintf(stderr, "master: loop\n"); for (;;) { fd_set readfds; @@ -529,6 +751,8 @@ { if (w->busy) { + if (debug) + fprintf(stderr, "master: selecting on busy worker fd %d\n",w->afd); FD_SET(w->afd, &readfds); ndes++; if (maxfd < w->afd) @@ -536,21 +760,28 @@ } } - if (ndes == 0) + if (ndes == 0) { + if (debug) + fprintf(stderr, "master: no busy workers or pluto fd, done!\n"); return HES_OK; /* done! */ + } do { ndes = select(maxfd + 1, &readfds, NULL, NULL, NULL); } while (ndes == -1 && errno == EINTR); if (ndes == -1) { - syslog(LOG_ERR, "select(2) error: %s", strerror(errno)); - exit(HES_IO_ERROR_SELECT); + if (debug) + fprintf(stderr, "master: selecting on %d fds, done!\n", ndes); + ERROR(LOG_ERR, "select(2) error: %s", strerror(errno)); + master_exit(HES_IO_ERROR_SELECT); } else if (ndes > 0) { if (FD_ISSET(PLUTO_QFD, &readfds)) { + if (debug) + fprintf(stderr, "master: query from pluto!\n"); query(); ndes--; } @@ -558,6 +789,8 @@ { if (w->busy && FD_ISSET(w->afd, &readfds)) { + if (debug) + fprintf(stderr, "master: answer from worker fd %d\n",w->afd); answer(w); ndes--; } @@ -584,7 +817,7 @@ for (; *sp != NULL; sp++) fprintf(stderr, "%s\n", *sp); - syslog(LOG_ERR, fmt, arg); + ERROR(LOG_ERR, fmt, arg); exit(HES_INVOCATION); } @@ -609,6 +842,12 @@ } } +#ifdef VALIDATOR_H + if(debug) { + val_log_t *logp = val_log_add_optarg("5:stderr", 1); + } +#endif + return master(); }