From e48b0084e19eacc30a2d8a8e274cc284973e0a2c Mon Sep 17 00:00:00 2001 From: Luis Ontanon Date: Thu, 27 Jun 2013 03:41:48 +0000 Subject: [PATCH] MS: Pong from the dispatcher! svn path=/trunk/; revision=50183 --- echld/child.c | 2 +- echld/common.c | 153 ++++++++++++++------------ echld/dispatcher.c | 263 +++++++++++++++++++++++++++++++++++---------- echld/echld-int.h | 22 +++- echld/echld-util.c | 7 +- echld/parent.c | 181 +++++++++++++++++++------------ echld_test.c | 20 ++-- 7 files changed, 441 insertions(+), 207 deletions(-) diff --git a/echld/child.c b/echld/child.c index b603d3464a..5b8e89021f 100644 --- a/echld/child.c +++ b/echld/child.c @@ -60,7 +60,7 @@ static echld_child_t child; #define CHILD_RESP(BYTEARR,TYPE) echld_write_frame(child.fds.pipe_to_parent, BYTEARR, child.chld_id, TYPE, child.reqh_id, NULL) #ifdef DEBUG_CHILD -static int dbg_level = 0; +static int dbg_level = DEBUG_CHILD; void child_debug(int level, const char* fmt, ...) { va_list ap; diff --git a/echld/common.c b/echld/common.c index c078af5fd7..2071fc6bd0 100644 --- a/echld/common.c +++ b/echld/common.c @@ -26,6 +26,37 @@ #include "echld-int.h" +#ifdef DEBUG_BASE + +static int dbg_level = DEBUG_BASE; +static FILE* dbg_fp; + +static void common_dbg(int level, const char* fmt, ...) { + va_list ap; + char str[1024]; + + if (level > dbg_level) return; + + va_start(ap,fmt); + g_vsnprintf(str,1024,fmt,ap); + va_end(ap); + + if (dbg_fp) { + fprintf(dbg_fp,"Common: level=%d msg='%s'\n",level,str); + fflush(dbg_fp); + } +} + +#define DBG(attrs) ( common_dbg attrs ) +#else +#define DBG(attrs) +#endif + + +extern void echld_common_set_dbg(int level, FILE* fp) { + dbg_level = level; + dbg_fp = fp; +} /** @@ -39,6 +70,8 @@ static void child_realloc_buff(echld_reader_t* r, size_t needed) { size_t s = r->len; long rp_off = r->rp - r->data; + DBG((2,"REALLOC BUFF needed=%d",needed)); + if ( a < (s + needed) ) { guint8* data = r->data; @@ -68,19 +101,6 @@ static void parent_realloc_buff(echld_reader_t* b, size_t needed) { -void echld_init_reader(echld_reader_t* r, int fd, size_t initial) { - r->fd = fd; - if (fd >= 0) fcntl(fd, F_SETFL, O_NONBLOCK); - - if (r->data == NULL) { - r->actual_len = initial; - r->data = (guint8*)g_malloc0(initial); - r->wp = r->data; - r->rp = NULL; - r->len = 0; - } -} - void echld_reset_reader(echld_reader_t* r, int fd, size_t initial) { r->fd = fd; fcntl(fd, F_SETFL, O_NONBLOCK); @@ -89,15 +109,21 @@ void echld_reset_reader(echld_reader_t* r, int fd, size_t initial) { r->actual_len = initial; r->data =(guint8*) g_malloc0(initial); r->wp = r->data; - r->rp = NULL; + r->rp = r->data; r->len = 0; } else { r->wp = r->data; - r->rp = NULL; + r->rp = r->data; r->len = 0; } } +void echld_init_reader(echld_reader_t* r, int fd, size_t initial) { + echld_reset_reader(r,fd,initial); +} + + + void free_reader(echld_reader_t* r) { free(r->data); } @@ -106,26 +132,31 @@ static long reader_readv(echld_reader_t* r, size_t len) { struct iovec iov; long nread; + DBG((2,"READV needed=%d",len)); + if ( (r->actual_len - r->len) < len ) reader_realloc_buff(r, len); iov.iov_base = r->wp; iov.iov_len = len; - nread = readv(0, - &iov, - (guint)len); + nread = readv(r->fd, &iov, 1); + + DBG((2,"READV nread=%d msg='%s'",nread, (nread<0) ? strerror(errno) : "-" )); if (nread >= 0) { r->wp += nread; r->len += nread; } + if (errno == EAGAIN) return 0; + return nread; }; long echld_read_frame(echld_reader_t* r, read_cb_t cb, void* cb_data) { + DBG((4,"READ = echld_read_frame fd=%d",r->fd)); // it will use shared memory instead of inband communication do { @@ -135,6 +166,8 @@ long echld_read_frame(echld_reader_t* r, read_cb_t cb, void* cb_data) { size_t missing; long off; + DBG((5,"READ reader_len=%d",r->len)); + if ( r->len < ECHLD_HDR_LEN) { /* read the header */ goto incomplete_header; @@ -143,29 +176,28 @@ long echld_read_frame(echld_reader_t* r, read_cb_t cb, void* cb_data) { goto incomplete_frame; } - /* we've got a frame! */ - off = (fr_len = HDR_LEN(h)) + ECHLD_HDR_LEN; - + DBG((5,"READ we've got a frame! fr_len=%d ch=%d t='%c' rh=%d",fr_len, h->h.chld_id, HDR_TYPE(h), h->h.reqh_id)); + + cb( &(r->rp[sizeof(hdr_t)]), HDR_LEN(h), h->h.chld_id, HDR_TYPE(h), h->h.reqh_id, cb_data); - if ( ((long)r->len) >= off ) { - /* shift the consumed frame */ - r->len -= off; - memcpy(r->rp ,r->rp + off ,r->len); - r->wp -= off; - r->rp -= off; - } + r->len = 0; + r->wp = r->data; + r->rp = r->data; - continue; + DBG((5,"READ consumed frame!")); + + goto again; incomplete_header: missing = ECHLD_HDR_LEN - (r->len); + DBG((5,"READ incomplete_header missing=%d",missing)); nread = reader_readv(r,missing); - if (nread < 0) { + if (nread < 0 && errno != EAGAIN) { goto kaput; /*XXX*/ } else if (nread < (long)missing) { goto again; @@ -177,16 +209,21 @@ long echld_read_frame(echld_reader_t* r, read_cb_t cb, void* cb_data) { fr_len = HDR_LEN(h) + ECHLD_HDR_LEN; missing = fr_len - r->len; - nread = reader_readv(r,missing); + DBG((5,"READ incomplete_frame fr_len=%d missing=%d",fr_len ,missing)); + if (missing) { + nread = reader_readv(r,missing); - if (nread < 0) { - goto kaput; /*XXX*/ - } else if (nread <= (long)missing) { - goto again; + if (nread < 0 && errno != EAGAIN) { + goto kaput; /*XXX*/ + } else if (nread < (long)missing) { + goto again; + } } } while(1); + + DBG((1,"READ incomplete_frame Cannot happen")); return 0; again: return 1; @@ -196,44 +233,26 @@ long echld_read_frame(echld_reader_t* r, read_cb_t cb, void* cb_data) { -long echld_write_frame(int fd, GByteArray* ba, guint16 chld_id, echld_msg_type_t type, guint16 reqh_id, void* data) { - static guint8* write_buf = NULL; - static long wb_len = 4096; - hdr_t* h; - struct iovec iov; - long fr_len = ba->len+ECHLD_HDR_LEN; - - data = data; // +long echld_write_frame(int fd, GByteArray* ba, guint16 chld_id, echld_msg_type_t type, guint16 reqh_id, void* data _U_) { + hdr_t h; + struct iovec iov[2]; + int iov_cnt = 1; - // it will use shared memory instead of inband communication - if (! write_buf) { - // lock if needed - write_buf = (guint8*)g_malloc0(wb_len); - // unlock if needed - } + h.h.type_len = (type<<24) | ((ba?ba->len:0) & 0x00ffffff) ; + h.h.chld_id = chld_id; + h.h.reqh_id = reqh_id; - if (fr_len > wb_len) { - do { - wb_len *= 2; - } while (fr_len > wb_len); + iov[0].iov_base = &h; + iov[0].iov_len = 8; - // lock if needed - write_buf = (guint8*)g_realloc(write_buf,wb_len); - // unlock if needed + if ( ba && ba->len > 0 ) { + iov[1].iov_base = ba->data; + iov[1].iov_len = ba->len; + iov_cnt++; } - h = (hdr_t*)write_buf; - h->h.type_len = (type<<24) | (((guint32)ba->len) & 0x00ffffff) ; - h->h.chld_id = chld_id; - h->h.reqh_id = reqh_id; - - memcpy(write_buf+ECHLD_HDR_LEN,ba->data,ba->len); - - iov.iov_base = write_buf; - iov.iov_len = fr_len; - - return (long) writev(fd, &iov, (unsigned)fr_len); + return (long) writev(fd, iov, iov_cnt); } diff --git a/echld/dispatcher.c b/echld/dispatcher.c index 53dbbda9d7..6edff8ae5e 100644 --- a/echld/dispatcher.c +++ b/echld/dispatcher.c @@ -62,27 +62,40 @@ struct dispatcher { struct dispatcher* dispatcher; -#define DISP_RESP(B,T) (echld_write_frame( dispatcher->parent_out, (B), 0, (T), dispatcher->reqh_id, NULL)) #ifdef DEBUG_DISPATCHER -static int dbg_level = 0; +static int debug_lvl = DEBUG_DISPATCHER; +static FILE* debug_fp = NULL; -void dispatcher_debug(int level, const char* fmt, ...) { +#define DCOM echld_common_set_dbg(debug_lvl,debug_fp) +#define DFL fflush(debug_fp) + +int dispatcher_debug(int level, const char* fmt, ...) { va_list ap; char* str; - if (dbg_levelpid, dispatcher->reqh_id, level, str); + if (dispatcher) { + fprintf(debug_fp, "dispatcher[%d]: reqh_id=%d dbg_level=%d message='%s'\n", dispatcher->pid, dispatcher->reqh_id, level, str); + } else { + fprintf(debug_fp, "dispatcher: dbg_level=%d message='%s'\n", level, str); + } + + fflush(debug_fp); + g_free(str); + + return 1; } + static char* param_get_dbg_level(char** err _U_) { - return g_strdup_printf("%d",dbg_level); + return g_strdup_printf("%d",debug_lvl); } static echld_bool_t param_set_dbg_level(char* val , char** err ) { @@ -97,15 +110,51 @@ static echld_bool_t param_set_dbg_level(char* val , char** err ) { return FALSE; } - dbg_level = lvl; + debug_lvl = lvl; + DCOM; return TRUE; } +static long dbg_r = 0; + #define DISP_DBG(attrs) ( dispatcher_debug attrs ) +#define DISP_DBG_INIT() do { debug_fp = stderr; DCOM; } while(0) +#define DISP_DBG_START(fname) do { debug_fp = fopen(fname,"a"); DCOM; DISP_DBG((0,"Log Started")); } while(0) +#define DISP_WRITE(FD,BA,CH,T,RH) ( dbg_r = echld_write_frame(FD,BA,CH,T,RH,NULL), DISP_DBG((1,"SND fd=%d ch=%d ty='%c' rh=%d msg='%s'",FD,CH,T,RH, (dbg_r>0?"ok":strerror(errno)))), dbg_r ) #else #define DISP_DBG(attrs) +#define DISP_DBG_INIT() +#define DISP_DBG_START(fname) +#define DISP_WRITE(FD,BA,CH,T,RH) echld_write_frame(FD,BA,CH,T,RH,NULL) #endif +#define DISP_RESP(B,T) (DISP_WRITE( dispatcher->parent_out, (B), 0, (T), dispatcher->reqh_id)) + +static void dispatcher_fatal(int cause, const char* fmt, ...) { + size_t len= 1024; + gchar err_str[len]; + va_list ap; + int i; + struct dispatcher_child* cc = dispatcher->children; + int max_children = dispatcher->max_children; + + va_start(ap, fmt); + g_vsnprintf(err_str,len,fmt,ap); + va_end(ap); + + DISP_DBG((0,"fatal cause=%d msg=\"%s\"",cause ,err_str)); + + /* the massacre */ + for(i = 0; i < max_children; i++) { + struct dispatcher_child* c = &(cc[i]); + if (c->chld_id > 0) kill(c->pid,SIGTERM); + } + + exit(cause); +} + +#define DISP_FATAL(attrs) dispatcher_fatal attrs + static void dispatcher_err(int errnum, const char* fmt, ...) { size_t len= 1024; gchar err_str[len]; @@ -222,12 +271,33 @@ static char* param_get_interfaces(char** err) { return s; } +static struct timeval disp_loop_timeout; + +static char* param_get_loop_to(char** err _U_) { + return g_strdup_printf("%d.%6ds",(int)disp_loop_timeout.tv_sec, (int)disp_loop_timeout.tv_usec ); +} + +static echld_bool_t param_set_loop_to(char* val , char** err ) { + char* p; + int usec = (int)strtol(val, &p, 10); /*XXX: "10ms" or "500us" or "1s" */ + + if (p<=val) { + *err = g_strdup("not an integer"); + return FALSE; + } + + disp_loop_timeout.tv_sec = usec / 1000000; + disp_loop_timeout.tv_usec = usec % 1000000; + + return TRUE; +} static param_t disp_params[] = { #ifdef DEBUG_DISPATCHER {"dbg_level", param_get_dbg_level, param_set_dbg_level}, # endif + {"loop_timeout",param_get_loop_to,param_set_loop_to}, {"interfaces",param_get_interfaces,NULL}, {NULL,NULL,NULL} }; @@ -237,7 +307,7 @@ static param_t* get_paramset(char* name) { if (strcmp(name,disp_params[i].name) == 0 ) return &(disp_params[i]); } return NULL; -} +} static struct dispatcher_child* dispatcher_get_child(struct dispatcher* d, guint16 chld_id) { @@ -255,6 +325,7 @@ static struct dispatcher_child* dispatcher_get_child(struct dispatcher* d, guint static void dispatcher_clear_child(struct dispatcher_child* c) { + DISP_DBG((5,"dispatcher_clear_child chld_id=%d",c->chld_id)); echld_reset_reader(&(c->reader), -1, 4096); c->chld_id = 0; c->write_fd = 0; @@ -263,14 +334,22 @@ static void dispatcher_clear_child(struct dispatcher_child* c) { } static void preinit_epan(void) { + DISP_DBG((2,"preinit_epan")); /* Here we do initialization of parts of epan that will be the same for every child we fork */ } static void dispatcher_clear(void) { + DISP_DBG((2,"Child chld_id=%d ->CAPTURING")); /* remove unnecessary stuff for the working child */ } +void dispatcher_sig(int sig) { + DISP_FATAL((TERMINATED,"SIG sig=%d",sig)); + exit(1); + +} + void dispatcher_reaper(int sig) { int status; int i; @@ -316,7 +395,7 @@ void dispatcher_reaper(int sig) { if (s) g_free(s); } - echld_write_frame(dispatcher->parent_out, em, c->chld_id, ECHLD_CHILD_DEAD, 0, NULL); + DISP_WRITE(dispatcher->parent_out, em, c->chld_id, ECHLD_CHILD_DEAD, 0); dispatcher_clear_child(c); g_byte_array_free(em,TRUE); return; @@ -369,19 +448,38 @@ static long dispatch_to_parent(guint8* b, size_t len, echld_chld_id_t chld_id, e switch(type) { case ECHLD_ERROR: break; case ECHLD_TIMED_OUT: break; - case ECHLD_HELLO: c->state = IDLE; break; - case ECHLD_CLOSING: c->closing = TRUE; c->state = CLOSED; break; + case ECHLD_HELLO: + c->state = IDLE; + DISP_DBG((2,"Child chld_id=%d ->IDLE",c->chld_id)); + break; + case ECHLD_CLOSING: + c->closing = TRUE; + c->state = CLOSED; + DISP_DBG((2,"Child chld_id=%d ->CLOSED",c->chld_id)); + break; case ECHLD_PARAM: break; case ECHLD_PONG: break; - case ECHLD_FILE_OPENED: c->state = READING; break; - case ECHLD_INTERFACE_OPENED: c->state = READY; break; - case ECHLD_CAPTURE_STARTED: c->state = CAPTURING; break; + case ECHLD_FILE_OPENED: + c->state = READING; + DISP_DBG((2,"Child chld_id=%d ->READING",c->chld_id)); + break; + case ECHLD_INTERFACE_OPENED: + c->state = READY; + DISP_DBG((2,"Child chld_id=%d ->READY",c->chld_id)); + break; + case ECHLD_CAPTURE_STARTED: + c->state = CAPTURING; + DISP_DBG((2,"Child chld_id=%d ->CAPTURING",c->chld_id)); + break; case ECHLD_NOTIFY: break; // notify(pre-encoded) case ECHLD_PACKET_SUM: break; // packet_sum(pre-encoded) case ECHLD_TREE: break; //tree(framenum, tree(pre-encoded) ) case ECHLD_BUFFER: break; // buffer (name,range,totlen,data) - case ECHLD_EOF: c->state = DONE; break; - case ECHLD_CAPTURE_STOPPED: c->state = DONE; break; + case ECHLD_EOF: + case ECHLD_CAPTURE_STOPPED: + c->state = DONE; + DISP_DBG((2,"Child chld_id=%d ->DONE",c->chld_id)); + break; case ECHLD_NOTE_ADDED: break; case ECHLD_PACKET_LIST: break; // packet_list(name,filter,range); case ECHLD_FILE_SAVED: break; @@ -389,8 +487,9 @@ static long dispatch_to_parent(guint8* b, size_t len, echld_chld_id_t chld_id, e default: goto misbehabing; } - - return echld_write_frame(dispatcher->parent_out, &in_ba, chld_id, type, reqh_id, NULL); + + DISP_DBG((4,"Dispatching to child reqh_id=%d chld_id=%d type='%c'",reqh_id,c->chld_id,type)); + return DISP_WRITE(dispatcher->parent_out, &in_ba, chld_id, type, reqh_id); misbehabing: c->state = ERRORED; @@ -415,6 +514,7 @@ void dispatch_new_child(struct dispatcher* dd) { int pipe_to_child; int pipe_from_child; + DISP_DBG((5,"new_child pipe(parent)")); if( pipe(parent_pipe_fds) < 0) { dispatcher_err(ECHLD_ERR_CANNOT_FORK,"CANNOT OPEN PARENT PIPE: %s",strerror(errno)); return; @@ -423,6 +523,7 @@ void dispatch_new_child(struct dispatcher* dd) { pipe_from_parent = parent_pipe_fds[0]; pipe_to_child = parent_pipe_fds[1]; + DISP_DBG((5,"new_child pipe(child)")); if( pipe(child_pipe_fds) < 0) { close(pipe_from_parent); close(pipe_to_child); @@ -433,6 +534,7 @@ void dispatch_new_child(struct dispatcher* dd) { pipe_from_child = child_pipe_fds[0]; pipe_to_parent = child_pipe_fds[1]; + DISP_DBG((4,"New Child Forking()")); switch (( pid = fork() )) { case -1: { close(pipe_to_child); @@ -477,10 +579,12 @@ void dispatch_new_child(struct dispatcher* dd) { echld_reset_reader(&(c->reader), pipe_from_child,4096); c->write_fd = pipe_to_child; c->pid = pid; - dispatcher->nchildren++; + c->chld_id = dispatcher->nchildren++; + + DISP_DBG((4,"Child Forked pid=%d chld_id=%d",pid,c->chld_id)); /* configure child */ - echld_write_frame(pipe_to_child, &out_ba, c->chld_id, ECHLD_NEW_CHILD, dispatcher->reqh_id, NULL); + DISP_WRITE(pipe_to_child, &out_ba, c->chld_id, ECHLD_NEW_CHILD, dispatcher->reqh_id); return; } } @@ -494,21 +598,19 @@ void dispatch_new_child(struct dispatcher* dd) { /* process signals sent from parent */ static long dispatch_to_child(guint8* b, size_t len, echld_chld_id_t chld_id, echld_msg_type_t type, echld_reqh_id_t reqh_id, void* data) { struct dispatcher* disp = (struct dispatcher*)data; - GByteArray in_ba; disp->reqh_id = reqh_id; - in_ba.data = b; - in_ba.len = (guint)len; if (chld_id == 0) { /* these are messages to the dispatcher itself */ + DISP_DBG((2,"Message to Dispatcher")); switch(type) { case ECHLD_CLOSE_CHILD: dispatcher_destroy(); return 0; case ECHLD_PING: - echld_write_frame(disp->parent_out, &in_ba, chld_id, ECHLD_PONG, reqh_id, NULL); - + DISP_DBG((2,"PONG reqh_id=%d",reqh_id)); + DISP_WRITE(disp->parent_out, NULL, chld_id, ECHLD_PONG, reqh_id); return 0; case ECHLD_NEW_CHILD: dispatch_new_child(disp); @@ -589,6 +691,8 @@ static long dispatch_to_child(guint8* b, size_t len, echld_chld_id_t chld_id, ec } else { struct dispatcher_child* c; + DISP_DBG((2,"Message to Child")); + if (! (c = dispatcher_get_child(dispatcher, chld_id)) ) { dispatcher_err(ECHLD_ERR_NO_SUCH_CHILD, "wrong chld_id %d", chld_id); return 0; @@ -598,21 +702,26 @@ static long dispatch_to_child(guint8* b, size_t len, echld_chld_id_t chld_id, ec case ECHLD_CLOSE_CHILD: c->closing = TRUE; c->state = CLOSED; + DISP_DBG((2,"Child chld_id=%d ->CLOSED",chld_id)); goto relay_frame; case ECHLD_OPEN_FILE: c->state = READING; + DISP_DBG((2,"Child chld_id=%d ->READING",chld_id)); goto relay_frame; case ECHLD_OPEN_INTERFACE: c->state = READY; + DISP_DBG((2,"Child chld_id=%d ->READY",chld_id)); goto relay_frame; case ECHLD_START_CAPTURE: + DISP_DBG((2,"Child chld_id=%d ->CAPTURING",chld_id)); c->state = CAPTURING; goto relay_frame; case ECHLD_STOP_CAPTURE: + DISP_DBG((2,"Child chld_id=%d ->DONE",chld_id)); c->state = DONE; goto relay_frame; @@ -625,9 +734,15 @@ static long dispatch_to_child(guint8* b, size_t len, echld_chld_id_t chld_id, ec case ECHLD_GET_TREE: case ECHLD_GET_BUFFER: case ECHLD_ADD_NOTE: - relay_frame: - return echld_write_frame(c->write_fd, &in_ba, chld_id, type, reqh_id, NULL); + relay_frame: { + GByteArray in_ba; + + in_ba.data = b; + in_ba.len = (guint)len; + DISP_DBG((3,"Relay to Child chld_id=%d type='%c' req_id=%d",chld_id, type, reqh_id)); + return DISP_WRITE(c->write_fd, &in_ba, chld_id, type, reqh_id); + } default: dispatcher_err(ECHLD_ERR_WRONG_MSG, "wrong message %d %c", reqh_id, type); return 0; @@ -636,18 +751,23 @@ static long dispatch_to_child(guint8* b, size_t len, echld_chld_id_t chld_id, ec } + + int dispatcher_loop(void) { int parent_out = dispatcher->parent_out; int parent_in = dispatcher->parent_in.fd; - struct dispatcher_child* children = dispatcher->children; + volatile int pforce = 0; + + DISP_DBG((5,"LOOP in_fd=%d out_fd=%d",parent_in, parent_out)); do { fd_set rfds; fd_set efds; - struct timeval timeout; struct dispatcher_child* c; int nfds; + int nchld = 0; + FD_ZERO(&rfds); FD_ZERO(&efds); @@ -656,42 +776,58 @@ int dispatcher_loop(void) { FD_SET(parent_in,&efds); FD_SET(parent_out,&efds); - for (c = children, nfds = 0; c->pid; c++) { - if (c->chld_id) { + for (c = children; c->pid; c++) { + if (c->chld_id > 0) { + nchld++; FD_SET(c->reader.fd, &rfds); FD_SET(c->reader.fd, &efds); } - nfds++; } - nfds = select(nfds, &rfds, NULL, &efds, &timeout); + DISP_DBG((5,"Select()ing nchld=%d usecs=%d",nchld,disp_loop_timeout.tv_usec)); - if ( FD_ISSET(parent_in, &efds) || FD_ISSET(parent_out, &efds) ) { - /* XXX deep shit */ - break; + nfds = select(FD_SETSIZE, &rfds, NULL, &efds, &disp_loop_timeout); + + if (nfds < 0) { + DISP_DBG((1,"select error='%s'",strerror(errno) )); + continue; } - if (FD_ISSET(parent_in, &rfds)) { + if ( pforce || FD_ISSET(parent_in, &rfds)) { long st = echld_read_frame(&(dispatcher->parent_in), dispatch_to_child, dispatcher); if (st < 0) { - /* XXX */ + DISP_DBG((1,"read frame returning < 0 for parent")); + /* XXX: ??? */ continue; } } + if ( FD_ISSET(parent_in, &efds) ) { + DISP_DBG((1,"Parent In Pipe Errored!")); + continue; + } + + if ( FD_ISSET(parent_out, &efds) ) { + DISP_DBG((1,"Parent Out Pipe Errored!")); + continue; + } + + for (c=children; c->pid; c++) { if (c->chld_id) { - if ( FD_ISSET(c->reader.fd,&efds) ) { - /* XXX cleanup child and report */ - continue; - } + // if ( FD_ISSET(c->reader.fd,&efds) ) { + // DISP_DBG((1,"errored child pipe chld_id=%d",c->chld_id)); + // dispatcher_clear_child(c); + // continue; + // } if (FD_ISSET(c->reader.fd,&rfds)) { long st = echld_read_frame(&(c->reader), dispatch_to_parent, c); if (st < 0) { - /* XXX cleanup child and report */ + DISP_DBG((1,"read_frame returned < 0 for chld_id=%d",c->chld_id)); + /* XXX */ continue; } continue; @@ -704,17 +840,34 @@ int dispatcher_loop(void) { return 1; } + void echld_dispatcher_start(int* in_pipe_fds, int* out_pipe_fds) { static struct dispatcher d; - int fdt_len = getdtablesize(); - int i; +#ifdef DEBUG_DISPATCHER + int dbg_fd; +#endif + + disp_loop_timeout.tv_sec = 0; + disp_loop_timeout.tv_usec = DISPATCHER_WAIT_INITIAL; + + DISP_DBG_INIT(); + DISP_DBG_START("dispatcher.debug"); +#ifdef DEBUG_DISPATCHER + dbg_fd = fileno(debug_fp); +#endif + DISP_DBG((2,"Dispatcher Starting")); - preinit_epan(); signal(SIGCHLD,dispatcher_reaper); + signal(SIGTERM,dispatcher_sig); + signal(SIGPIPE,dispatcher_sig); + signal(SIGINT,SIG_IGN); + signal(SIGCONT,SIG_IGN); + dispatcher = &d; + echld_init_reader(&(d.parent_in),in_pipe_fds[0],4096); d.parent_out = out_pipe_fds[1]; d.children = g_new0(struct dispatcher_child,ECHLD_MAX_CHILDREN); @@ -723,19 +876,19 @@ void echld_dispatcher_start(int* in_pipe_fds, int* out_pipe_fds) { d.reqh_id = -1; d.pid = getpid(); + close(out_pipe_fds[0]); + close(in_pipe_fds[1]); + echld_get_all_codecs(&(d.enc.to_parent), &(d.dec.from_parent), &(d.enc.to_child), &(d.dec.from_child)); - dispatcher_clear(); + DISP_DBG((2,"Dispatcher Configured pid=%d parent_in=%d parent_out=%d",d.pid,in_pipe_fds[0],d.parent_out)); - /* close all fds but those used */ - for(i=0;itv),&t); + break; default: ret = -1; + break; } if (p->cb) p->cb(ret, p->cb_data); + g_free(p); + return TRUE; } @@ -68,7 +73,7 @@ extern echld_state_t echld_ping(int chld_id, echld_ping_cb_t pcb, void* cb_data) p->cb_data = cb_data; gettimeofday(&(p->tv),NULL); - return echld_reqh(chld_id, 0, ECHLD_PING, NULL, pong, p); + return echld_reqh(chld_id, ECHLD_PING, 0, NULL, pong, p); } diff --git a/echld/parent.c b/echld/parent.c index e74125abf3..53028121bb 100644 --- a/echld/parent.c +++ b/echld/parent.c @@ -67,11 +67,13 @@ struct _echld_parent { parent_decoder_t* dec; } parent = {NULL,{NULL,0,NULL,-1,NULL,0},-1,-1,1,NULL,0,NULL,NULL}; -#define PARENT_SEND(BYTEARR,CHILDNUM,TYPE) echld_write_frame(parent.dispatcher_fd, BYTEARR, CHILDNUM, TYPE, parent.reqh_id++, NULL) + +static int reqh_ids = 1; + #ifdef DEBUG_PARENT -static int dbg_level = 0; +static int dbg_level = DEBUG_PARENT; static void parent_dbg(int level, const char* fmt, ...) { va_list ap; @@ -88,13 +90,18 @@ static void parent_dbg(int level, const char* fmt, ...) { } #define PARENT_DBG(attrs) parent_dbg attrs +#define PARENT_SEND(BYTEARR,CHILDNUM,TYPE,R_ID) do { long st = echld_write_frame(parent.dispatcher_fd, BYTEARR, CHILDNUM, TYPE, R_ID, NULL); PARENT_DBG((1,"SEND type='%c' chld_id=%d reqh_id=%d msg='%s'",TYPE,CHILDNUM,R_ID, ( st >= 8 ? "ok" : ((st<0)?strerror(errno):"?") ) )); } while(0) #else #define PARENT_DBG(attrs) +#define PARENT_SEND(BYTEARR,CHILDNUM,TYPE,R_ID) echld_write_frame(parent.dispatcher_fd, BYTEARR, CHILDNUM, TYPE, R_ID, NULL) #endif extern void echld_set_parent_dbg_level(int lvl) { (dbg_level = lvl); + if (lvl > 6) { + echld_common_set_dbg(lvl,stderr); + } PARENT_DBG((0,"Debug Level Set: %d",lvl)); } @@ -115,31 +122,25 @@ static void parent_fatal(int exit_code, const char* fmt, ...) { fprintf(stderr,"Fatal error: exit_code=%d str=%s",exit_code,str); #endif + kill(parent.dispatcher_pid,SIGTERM); exit(exit_code); } static void echld_cleanup(void) { - int i; - char b[4]; - GByteArray ba; + // int i; - ba.data = b; - ba.len = 0; + PARENT_DBG((4,"echld_cleanup starting")); - PARENT_DBG((0,"echld_cleanup starting")); - PARENT_SEND(&ba,0,ECHLD_CLOSE_CHILD); + // for (i=0;ireqs->len; - - for(i=0; i < imax ; i++) { - if (((reqh_t*)&g_array_index (c->reqs, reqh_t, i))->reqh_id == reqh_id) return i; - } - - return -1; -} @@ -283,30 +282,54 @@ static echld_t* get_child(int id) { /* send a request */ -static int reqh_ids = 1; +int reqh_id_idx(echld_t* c, int reqh_id) { + int i; + int imax = c->reqs->len; + reqh_t* rr = (reqh_t*)c->reqs->data; + + for(i=0; i < imax ; i++) { + if (rr[i].reqh_id == reqh_id) + return i; + } + + return -1; +} + static echld_state_t reqh_snd(echld_t* c, echld_msg_type_t t, GByteArray* ba, echld_msg_cb_t resp_cb, void* cb_data) { - reqh_t req; + int idx; + reqh_t* r; + int reqh_id = reqh_ids++; if (!c) { PARENT_DBG((1,"REQH_SND: No such child")); return 1; } - req.reqh_id = reqh_ids++; - req.cb = resp_cb; - req.cb_data = cb_data; - gettimeofday(&(req.tv),NULL); + idx = reqh_id_idx(c,-1); + if (idx < 0) { + reqh_t req; + idx = c->reqs->len; + g_array_append_val(c->reqs,req); + } + + r = &(((reqh_t*)c->reqs->data)[idx]); + + r->reqh_id = reqh_id; + r->cb = resp_cb; + r->cb_data = cb_data; + + gettimeofday(&(r->tv),NULL); - g_array_append_val(c->reqs,req); + PARENT_DBG((4,"reqh_add: idx='%d'",idx)); - PARENT_DBG((1,"REQH_SND: type='%c' chld_id=%d reqh_id=%d",t,c->chld_id,req.reqh_id)); + PARENT_DBG((1,"REQH_SND: type='%c' chld_id=%d reqh_id=%d",t, c->chld_id,reqh_id)); - PARENT_SEND(ba,c->chld_id,t); + PARENT_SEND(ba,c->chld_id,t,reqh_id); - if (ba) g_byte_array_free(ba,TRUE); + if (ba) g_byte_array_free(ba,TRUE); /* do we? */ - return req.reqh_id; + return reqh_id; } @@ -699,6 +722,7 @@ static long parent_read_frame(guint8* b, size_t len, echld_chld_id_t chld_id, ec echld_t* c = get_child(chld_id); GByteArray* ba = g_byte_array_new(); + PARENT_DBG((3,"parent_read_frame ch=%d t='%c' rh=%d",chld_id,t,reqh_id)); g_byte_array_append(ba,b, (guint)len); if (c) { @@ -708,14 +732,27 @@ static long parent_read_frame(guint8* b, size_t len, echld_chld_id_t chld_id, ec gboolean go_ahead = TRUE; if (r) { /* got that reqh_id */ - go_ahead = r->cb ? r->cb(t,ba,r->cb_data) : TRUE; + if (r->cb) { + go_ahead = r->cb(t,ba,r->cb_data); + } + + r->reqh_id = -1; + r->cb = NULL; + r->cb_data = 0; + r->tv.tv_sec = 0; + r->tv.tv_usec = 0; + + PARENT_DBG((2,"hanlded by reqh_id=%d msg='%s'",reqh_id,go_ahead?"retrying":"done")); } while(go_ahead && ( h = get_next_hdlr_for_type(c,t,&i))) { - go_ahead = h->cb(t,ba,r->cb_data); - } + if (h->cb) + go_ahead = h->cb(t,ba,h->cb_data); + + PARENT_DBG((2,"hanlded by t='%c' msgh_id=%d msg='%s'",h->type, h->id,go_ahead?"retrying":"done")); + } } else { - /* no such child??? */ + PARENT_DBG((1,"parent_read_frame: No such child")); } g_byte_array_free(ba,TRUE); @@ -733,31 +770,33 @@ extern int echld_fd_read(fd_set* rfds, fd_set* efds) { int r_nfds=0; if (FD_ISSET(parent.reader.fd,efds) || FD_ISSET(parent.dispatcher_fd,efds) ) { /* Handle errored dispatcher */ - r_nfds--; + PARENT_DBG((1,"parent errored")); return -1; } if (FD_ISSET(parent.reader.fd,rfds)) { - r_nfds++; + PARENT_DBG((1,"reading from dispatcher")); echld_read_frame(&(parent.reader),parent_read_frame,&(parent)); } return r_nfds; } -extern int echld_select(int nfds, fd_set* rfds, fd_set* wfds, fd_set* efds, struct timeval* timeout) { +extern int echld_select(int nfds _U_, fd_set* rfds, fd_set* wfds, fd_set* efds, struct timeval* timeout) { fd_set my_rfds, my_wfds, my_efds; int r_nfds; + if (rfds == NULL) { rfds = &my_rfds; FD_ZERO(rfds); } if (wfds == NULL) { wfds = &my_wfds; FD_ZERO(wfds); } if (efds == NULL) { efds = &my_efds; FD_ZERO(efds); } - nfds += echld_fdset(rfds,efds); + echld_fdset(rfds,efds); - r_nfds = select(nfds, rfds, wfds, efds, timeout); + PARENT_DBG((2,"Select()")); + r_nfds = select(FD_SETSIZE, rfds, wfds, efds, timeout); - r_nfds += echld_fd_read(rfds,efds); + echld_fd_read(rfds,efds); return r_nfds ; } diff --git a/echld_test.c b/echld_test.c index ccf7c73be9..872d53adfb 100644 --- a/echld_test.c +++ b/echld_test.c @@ -58,8 +58,10 @@ int pings = 0; int errors = 0; void ping_cb(long usec, void* data _U_) { - if (usec > 0) { - fprintf(stderr, "ping=%d usec=%d\n", pings ,(int)usec ); + + fprintf(stderr, "Ping ping=%d usec=%d\n", pings ,(int)usec ); + + if (usec >= 0) { pings++; } else { errors++; @@ -70,18 +72,16 @@ void ping_cb(long usec, void* data _U_) { int main(int argc, char** argv) { struct timeval tv; int tot_cycles = 0; + int max_cycles = 30; int npings; -// GString* str = g_string_new(""); tv.tv_sec = 0; tv.tv_usec = 250000; - - - echld_set_parent_dbg_level(4); + echld_set_parent_dbg_level(5); switch(argc) { case 1: - npings = 10; + npings = 3; break; case 2: npings = (int)atoi(argv[1]); @@ -91,14 +91,16 @@ int main(int argc, char** argv) { return 1; } + max_cycles = npings*4; + echld_initialize(ECHLD_ENCODING_JSON); do { - if (tot_cycles < npings) echld_ping(0,ping_cb,NULL); + if ( (tot_cycles > npings) && npings && npings-- ) echld_ping(0,ping_cb,NULL); tot_cycles++; echld_wait(&tv); - } while( (pings < 10) && (tot_cycles < 25)); + } while( (pings < npings) || (tot_cycles < max_cycles)); fprintf(stderr, "Done: pings=%d errors=%d tot_cycles=%d\n", pings, errors ,tot_cycles ); -- 2.34.1