r1578: the first stage of the async client rewrite.
authorAndrew Tridgell <tridge@samba.org>
Fri, 23 Jul 2004 06:40:49 +0000 (06:40 +0000)
committerGerald (Jerry) Carter <jerry@samba.org>
Wed, 10 Oct 2007 17:57:42 +0000 (12:57 -0500)
Up to now the client code has had an async API, and operated
asynchronously at the packet level, but was not truly async in that it
assumed that it could always write to the socket and when a partial
packet came in that it could block waiting for the rest of the packet.

This change makes the SMB client library full async, by adding a
separate outgoing packet queue, using non-blocking socket IO and
having a input buffer that can fill asynchonously until the full
packet has arrived.

The main complexity was in dealing with the events structure when
using the CIFS proxy backend. In that case the same events structure
needs to be used in both the client library and the main smbd server,
so that when the client library is waiting for a reply that the main
server keeps processing packets. This required some changes in the
events library code.

Next step is to make the generated rpc client code use these new
capabilities.
(This used to be commit 96bf4da3edc4d64b0f58ef520269f3b385b8da02)

13 files changed:
source4/client/client.c
source4/include/cli_context.h
source4/include/dlinklist.h
source4/include/events.h
source4/include/rewrite.h
source4/lib/events.c
source4/libcli/raw/clisocket.c
source4/libcli/raw/clitransport.c
source4/libcli/raw/rawrequest.c
source4/ntvfs/cifs/vfs_cifs.c
source4/smb_server/smb_server.c
source4/torture/basic/scanner.c
source4/torture/gentest.c

index 0ad3eed889690778595b1d357ecf20a3bf8b7934..abc4033f2905a7f36c8c67d38c9f7ba47512f1a7 100644 (file)
@@ -2677,8 +2677,6 @@ make sure we swallow keepalives during idle time
 ****************************************************************************/
 static void readline_callback(void)
 {
-       fd_set fds;
-       struct timeval timeout;
        static time_t last_t;
        time_t t;
 
@@ -2688,28 +2686,7 @@ static void readline_callback(void)
 
        last_t = t;
 
- again:
-       if (cli->transport->socket->fd == -1)
-               return;
-
-       FD_ZERO(&fds);
-       FD_SET(cli->transport->socket->fd, &fds);
-
-       timeout.tv_sec = 0;
-       timeout.tv_usec = 0;
-       sys_select_intr(cli->transport->socket->fd+1,&fds,NULL,NULL,&timeout);
-               
-       /* We deliberately use cli_swallow_keepalives instead of
-          client_receive_smb as we want to receive
-          session keepalives and then drop them here.
-       */
-       if (FD_ISSET(cli->transport->socket->fd, &fds)) {
-               if (!cli_request_receive_next(cli->transport)) {
-                       d_printf("Lost connection to server\n");
-                       exit(1);
-               }
-               goto again;
-       }
+       cli_transport_process(cli->transport);
 
        if (cli->tree) {
                cli_chkpath(cli->tree, "\\");
index 22a98981888f58f980db1daf2901cd3944df19f6..e0bf1689ad6db9fa7a2801cc7701668c750639cb 100644 (file)
@@ -134,7 +134,7 @@ struct cli_transport {
        uint_t readbraw_pending:1;
        
        /* an idle function - if this is defined then it will be
-          called once every period milliseconds while we are waiting
+          called once every period seconds while we are waiting
           for a packet */
        struct {
                void (*func)(struct cli_transport *, void *);
@@ -151,7 +151,11 @@ struct cli_transport {
                                uint16_t ecode;
                        } dos;
                        NTSTATUS nt_status;
-                       enum socket_error socket_error;
+                       enum {SOCKET_READ_TIMEOUT,
+                             SOCKET_READ_EOF,
+                             SOCKET_READ_ERROR,
+                             SOCKET_WRITE_ERROR,
+                             SOCKET_READ_BAD_SIG} socket_error;
                        uint_t nbt_error;
                } e;
        } error;
@@ -164,12 +168,30 @@ struct cli_transport {
                void *private;
        } oplock;
 
-       /* a list of async requests that are pending on this connection */
-       struct cli_request *pending_requests;
+       /* a list of async requests that are pending for send on this connection */
+       struct cli_request *pending_send;
+
+       /* a list of async requests that are pending for receive on this connection */
+       struct cli_request *pending_recv;
 
        /* remember the called name - some sub-protocols require us to
           know the server name */
        struct nmb_name called;
+
+       /* a buffer for partially received SMB packets. */
+       struct {
+               uint8_t header[NBT_HDR_SIZE];
+               size_t req_size;
+               size_t received;
+               uint8_t *buffer;
+       } recv_buffer;
+
+       /* the event handle for waiting for socket IO */
+       struct {
+               struct event_context *ctx;
+               struct fd_event *fde;
+               struct timed_event *te;
+       } event;
 };
 
 /* this is the context for the user */
@@ -216,6 +238,15 @@ struct cli_tree {
 };
 
 
+/*
+  a client request moves between the following 4 states.
+*/
+enum cli_request_state {CLI_REQUEST_INIT, /* we are creating the request */
+                       CLI_REQUEST_SEND, /* the request is in the outgoing socket Q */
+                       CLI_REQUEST_RECV, /* we are waiting for a matching reply */
+                       CLI_REQUEST_DONE, /* the request is finished */
+                       CLI_REQUEST_ERROR}; /* a packet or transport level error has occurred */
+
 /* the context for a single SMB request. This is passed to any request-context 
  * functions (similar to context.h, the server version).
  * This will allow requests to be multi-threaded. */
@@ -226,6 +257,9 @@ struct cli_request {
        /* a talloc context for the lifetime of this request */
        TALLOC_CTX *mem_ctx;
        
+       /* each request is in one of 4 possible states */
+       enum cli_request_state state;
+       
        /* a request always has a transport context, nearly always has
           a session context and usually has a tree context */
        struct cli_transport *transport;
index 6191299384270a4c90a24bda5892fa874a52ab02..40f7f0a0c79e2e29930fe4b0a95ed1b2dc424cf4 100644 (file)
@@ -77,3 +77,17 @@ do { \
                DLIST_REMOVE(list, p); \
                DLIST_ADD_END(list, p, tmp); \
 } while (0)
+
+/* concatenate two lists - putting all elements of the 2nd list at the
+   end of the first list */
+#define DLIST_CONCATENATE(list1, list2, type) \
+do { \
+               if (!(list1)) { \
+                       (list1) = (list2); \
+               } else { \
+                       type tmp; \
+                       for (tmp = (list1); tmp->next; tmp = tmp->next) ; \
+                       tmp->next = (list2); \
+                       (list2)->prev = tmp; \
+               } \
+} while (0)
index 7dde3b2ba0f50bac0bb5b0cc042adc0fb7ba4ccd..edded2632bb09fcfa010a37ddde4fced46c535b0 100644 (file)
@@ -67,6 +67,8 @@ struct event_context {
                BOOL exit_now;
                int code;
        } exit;
+
+       int ref_count;
 };
 
 
index c8587f5e4e25e9cf81c7f292c8911b9eaac4c508..21cc7342d1c06674dfdbfe9ac151c28a3ea6a356 100644 (file)
@@ -50,13 +50,6 @@ typedef int BOOL;
 /* Debugging stuff */
 #include "debug.h"
 
-/* types of socket errors */
-enum socket_error {SOCKET_READ_TIMEOUT,
-                  SOCKET_READ_EOF,
-                  SOCKET_READ_ERROR,
-                  SOCKET_WRITE_ERROR,
-                  SOCKET_READ_BAD_SIG};
-
 #include "doserr.h"
 
 /*
index 13a9a444e84cc7e4fa0e8988f55e4f90593bec1b..a6099db5c59bd7c6917098c1c2d8f37488340206 100644 (file)
@@ -81,25 +81,50 @@ struct event_context *event_context_init(void)
        /* start off with no events */
        ZERO_STRUCTP(ev);
 
+       ev->ref_count = 1;
+
        return ev;
 }
 
-
-
 /*
-  add a fd based event
-  return NULL on failure (memory allocation error)
+  destroy an events context, also destroying any remaining events
 */
-struct fd_event *event_add_fd(struct event_context *ev, struct fd_event *e) 
+void event_context_destroy(struct event_context *ev)
 {
-       e = memdup(e, sizeof(*e));
-       if (!e) return NULL;
-       DLIST_ADD(ev->fd_events, e);
-       e->ref_count = 1;
-       if (e->fd > ev->maxfd) {
-               ev->maxfd = e->fd;
+       struct fd_event *fde;
+       struct timed_event *te;
+       struct loop_event *le;
+
+       ev->ref_count--;
+       if (ev->ref_count != 0) {
+               return;
        }
-       return e;
+
+       for (fde=ev->fd_events; fde;) {
+               struct fd_event *next = fde->next;
+               event_remove_fd(ev, fde);
+               if (fde->ref_count == 0) {
+                       free(fde);
+               }
+               fde=next;
+       }
+       for (te=ev->timed_events; te;) {
+               struct timed_event *next = te->next;
+               event_remove_timed(ev, te);
+               if (te->ref_count == 0) {
+                       free(te);
+               }
+               te=next;
+       }
+       for (le=ev->loop_events; le;) {
+               struct loop_event *next = le->next;
+               event_remove_loop(ev, le);
+               if (le->ref_count == 0) {
+                       free(le);
+               }
+               le=next;
+       }
+       free(ev);
 }
 
 
@@ -118,6 +143,47 @@ static void calc_maxfd(struct event_context *ev)
        }
 }
 
+/*
+  move the event structures from ev2 into ev, upping the reference
+  count on ev. The event context ev2 is then destroyed.
+
+  this is used by modules that need to call on the events of a lower module
+*/
+void event_context_merge(struct event_context *ev, struct event_context *ev2)
+{
+       DLIST_CONCATENATE(ev->fd_events, ev2->fd_events, struct fd_event *);
+       DLIST_CONCATENATE(ev->timed_events, ev2->timed_events, struct timed_event *);
+       DLIST_CONCATENATE(ev->loop_events, ev2->loop_events, struct loop_event *);
+
+       ev->ref_count++;
+
+       ev2->fd_events = NULL;
+       ev2->timed_events = NULL;
+       ev2->loop_events = NULL;
+
+       event_context_destroy(ev2);
+
+       calc_maxfd(ev);
+}
+
+
+/*
+  add a fd based event
+  return NULL on failure (memory allocation error)
+*/
+struct fd_event *event_add_fd(struct event_context *ev, struct fd_event *e) 
+{
+       e = memdup(e, sizeof(*e));
+       if (!e) return NULL;
+       DLIST_ADD(ev->fd_events, e);
+       e->ref_count = 1;
+       if (e->fd > ev->maxfd) {
+               ev->maxfd = e->fd;
+       }
+       return e;
+}
+
+
 /* to mark the ev->maxfd invalid
  * this means we need to recalculate it
  */
@@ -242,150 +308,158 @@ void event_loop_exit(struct event_context *ev, int code)
 }
 
 /*
-  go into an event loop using the events defined in ev this function
-  will return with the specified code if one of the handlers calls
-  event_loop_exit()
-
-  also return (with code 0) if all fd events are removed
+  do a single event loop using the events defined in ev this function
 */
-int event_loop_wait(struct event_context *ev)
+void event_loop_once(struct event_context *ev)
 {
        time_t t;
-
-       ZERO_STRUCT(ev->exit);
-       ev->maxfd = EVENT_INVALID_MAXFD;
+       fd_set r_fds, w_fds;
+       struct fd_event *fe;
+       struct loop_event *le;
+       struct timed_event *te;
+       int selrtn;
+       struct timeval tval;
 
        t = time(NULL);
 
-       while (ev->fd_events && !ev->exit.exit_now) {
-               fd_set r_fds, w_fds;
-               struct fd_event *fe;
-               struct loop_event *le;
-               struct timed_event *te;
-               int selrtn;
-               struct timeval tval;
-
-               /* the loop events are called on each loop. Be careful to allow the 
-                  event to remove itself */
-               for (le=ev->loop_events;le;) {
-                       struct loop_event *next = le->next;
-                       if (le->ref_count == 0) {
-                               DLIST_REMOVE(ev->loop_events, le);
-                               free(le);
-                       } else {
-                               le->ref_count++;
-                               le->handler(ev, le, t);
-                               le->ref_count--;
-                       }
-                       le = next;
+       /* the loop events are called on each loop. Be careful to allow the 
+          event to remove itself */
+       for (le=ev->loop_events;le;) {
+               struct loop_event *next = le->next;
+               if (le->ref_count == 0) {
+                       DLIST_REMOVE(ev->loop_events, le);
+                       free(le);
+               } else {
+                       le->ref_count++;
+                       le->handler(ev, le, t);
+                       le->ref_count--;
                }
+               le = next;
+       }
 
-               ZERO_STRUCT(tval);
-               FD_ZERO(&r_fds);
-               FD_ZERO(&w_fds);
-
-               /* setup any fd events */
-               for (fe=ev->fd_events; fe; ) {
-                       struct fd_event *next = fe->next;
-                       if (fe->ref_count == 0) {
-                               DLIST_REMOVE(ev->fd_events, fe);
-                               if (ev->maxfd == fe->fd) {
-                                       ev->maxfd = EVENT_INVALID_MAXFD;
-                               }
-                               free(fe);
-                       } else {
-                               if (fe->flags & EVENT_FD_READ) {
-                                       FD_SET(fe->fd, &r_fds);
-                               }
-                               if (fe->flags & EVENT_FD_WRITE) {
-                                       FD_SET(fe->fd, &w_fds);
-                               }
+       ZERO_STRUCT(tval);
+       FD_ZERO(&r_fds);
+       FD_ZERO(&w_fds);
+
+       /* setup any fd events */
+       for (fe=ev->fd_events; fe; ) {
+               struct fd_event *next = fe->next;
+               if (fe->ref_count == 0) {
+                       DLIST_REMOVE(ev->fd_events, fe);
+                       if (ev->maxfd == fe->fd) {
+                               ev->maxfd = EVENT_INVALID_MAXFD;
+                       }
+                       free(fe);
+               } else {
+                       if (fe->flags & EVENT_FD_READ) {
+                               FD_SET(fe->fd, &r_fds);
+                       }
+                       if (fe->flags & EVENT_FD_WRITE) {
+                               FD_SET(fe->fd, &w_fds);
                        }
-                       fe = next;
                }
+               fe = next;
+       }
 
-               /* start with a reasonable max timeout */
-               tval.tv_sec = 600;
+       /* start with a reasonable max timeout */
+       tval.tv_sec = 600;
                
-               /* work out the right timeout for all timed events */
-               for (te=ev->timed_events;te;te=te->next) {
-                       int timeout = te->next_event - t;
-                       if (timeout < 0) {
-                               timeout = 0;
-                       }
-                       if (te->ref_count &&
-                           timeout < tval.tv_sec) {
-                               tval.tv_sec = timeout;
-                       }
+       /* work out the right timeout for all timed events */
+       for (te=ev->timed_events;te;te=te->next) {
+               int timeout = te->next_event - t;
+               if (timeout < 0) {
+                       timeout = 0;
                }
+               if (te->ref_count &&
+                   timeout < tval.tv_sec) {
+                       tval.tv_sec = timeout;
+               }
+       }
 
-               /* only do a select() if there're fd_events
-                * otherwise we would block for a the time in tval,
-                * and if there're no fd_events present anymore we want to
-                * leave the event loop directly
+       /* only do a select() if there're fd_events
+        * otherwise we would block for a the time in tval,
+        * and if there're no fd_events present anymore we want to
+        * leave the event loop directly
+        */
+       if (ev->fd_events) {
+               /* we maybe need to recalculate the maxfd */
+               if (ev->maxfd == EVENT_INVALID_MAXFD) {
+                       calc_maxfd(ev);
+               }
+               
+               /* TODO:
+                * we don't use sys_select() as it isn't thread
+                * safe. We need to replace the magic pipe handling in
+                * sys_select() with something in the events
+                * structure - for now just use select() 
                 */
-               if (ev->fd_events) {
-                       /* we maybe need to recalculate the maxfd */
-                       if (ev->maxfd == EVENT_INVALID_MAXFD) {
-                               calc_maxfd(ev);
-                       }
-
-                       /* TODO:
-                        * we don't use sys_select() as it isn't thread
-                        * safe. We need to replace the magic pipe handling in
-                        * sys_select() with something in the events
-                        * structure - for now just use select() 
-                        */
-                       selrtn = select(ev->maxfd+1, &r_fds, &w_fds, NULL, &tval);
-
-                       t = time(NULL);
-
-                       if (selrtn == -1 && errno == EBADF) {
-                               /* the socket is dead! this should never
-                                  happen as the socket should have first been
-                                  made readable and that should have removed
-                                  the event, so this must be a bug. This is a
-                                  fatal error. */
-                               DEBUG(0,("EBADF on event_loop_wait - exiting\n"));
-                               return -1;
-                       }
-
-                       if (selrtn > 0) {
-                               /* at least one file descriptor is ready - check
-                                  which ones and call the handler, being careful to allow
-                                  the handler to remove itself when called */
-                               for (fe=ev->fd_events; fe; fe=fe->next) {
-                                       uint16_t flags = 0;
-                                       if (FD_ISSET(fe->fd, &r_fds)) flags |= EVENT_FD_READ;
-                                       if (FD_ISSET(fe->fd, &w_fds)) flags |= EVENT_FD_WRITE;
-                                       if (fe->ref_count && flags) {
-                                               fe->ref_count++;
-                                               fe->handler(ev, fe, t, flags);
-                                               fe->ref_count--;
-                                       }
+               selrtn = select(ev->maxfd+1, &r_fds, &w_fds, NULL, &tval);
+               
+               t = time(NULL);
+               
+               if (selrtn == -1 && errno == EBADF) {
+                       /* the socket is dead! this should never
+                          happen as the socket should have first been
+                          made readable and that should have removed
+                          the event, so this must be a bug. This is a
+                          fatal error. */
+                       DEBUG(0,("EBADF on event_loop_wait - exiting\n"));
+                       return;
+               }
+               
+               if (selrtn > 0) {
+                       /* at least one file descriptor is ready - check
+                          which ones and call the handler, being careful to allow
+                          the handler to remove itself when called */
+                       for (fe=ev->fd_events; fe; fe=fe->next) {
+                               uint16_t flags = 0;
+                               if (FD_ISSET(fe->fd, &r_fds)) flags |= EVENT_FD_READ;
+                               if (FD_ISSET(fe->fd, &w_fds)) flags |= EVENT_FD_WRITE;
+                               if (fe->ref_count && flags) {
+                                       fe->ref_count++;
+                                       fe->handler(ev, fe, t, flags);
+                                       fe->ref_count--;
                                }
                        }
                }
+       }
 
-               /* call any timed events that are now due */
-               for (te=ev->timed_events;te;) {
-                       struct timed_event *next = te->next;
-                       if (te->ref_count == 0) {
-                               DLIST_REMOVE(ev->timed_events, te);
-                               free(te);
-                       } else if (te->next_event <= t) {
-                               te->ref_count++;
-                               te->handler(ev, te, t);
-                               te->ref_count--;
-                               if (te->next_event <= t) {
-                                       /* the handler didn't set a time for the 
-                                          next event - remove the event */
-                                       event_remove_timed(ev, te);
-                               }
+       /* call any timed events that are now due */
+       for (te=ev->timed_events;te;) {
+               struct timed_event *next = te->next;
+               if (te->ref_count == 0) {
+                       DLIST_REMOVE(ev->timed_events, te);
+                       free(te);
+               } else if (te->next_event <= t) {
+                       te->ref_count++;
+                       te->handler(ev, te, t);
+                       te->ref_count--;
+                       if (te->next_event <= t) {
+                               /* the handler didn't set a time for the 
+                                  next event - remove the event */
+                               event_remove_timed(ev, te);
                        }
-                       te = next;
-               }               
+               }
+               te = next;
+       }               
+}
+
+/*
+  go into an event loop using the events defined in ev this function
+  will return with the specified code if one of the handlers calls
+  event_loop_exit()
+
+  also return (with code 0) if all fd events are removed
+*/
+int event_loop_wait(struct event_context *ev)
+{
+       ZERO_STRUCT(ev->exit);
+       ev->maxfd = EVENT_INVALID_MAXFD;
+
+       ev->exit.exit_now = False;
 
+       while (ev->fd_events && !ev->exit.exit_now) {
+               event_loop_once(ev);
        }
 
        return ev->exit.code;
index 5cd6f33689afb8a8620ab66f65027465839efa45..eb5d3c034259e3f2b517b38f8ed8589ff92b870c 100644 (file)
@@ -72,7 +72,13 @@ BOOL cli_sock_connect(struct cli_socket *sock, struct in_addr *ip, int port)
                                   &sock->dest_ip,
                                   sock->port, 
                                   LONG_CONNECT_TIMEOUT);
-       return (sock->fd != -1);
+       if (sock->fd == -1) {
+               return False;
+       }
+
+       set_blocking(sock->fd, False);
+
+       return True;
 }
 
 
index 18784fe33ac951580dea7ca100cfece8c42491f8..96d5a18a71956e6a8ff79110ebe534c117e00c18 100644 (file)
 
 #include "includes.h"
 
+/*
+  an event has happened on the socket
+*/
+static void cli_transport_event_handler(struct event_context *ev, struct fd_event *fde, 
+                                       time_t t, uint16_t flags)
+{
+       struct cli_transport *transport = fde->private;
+
+       cli_transport_process(transport);
+}
+
 /*
   create a transport structure based on an established socket
 */
@@ -28,6 +39,7 @@ struct cli_transport *cli_transport_init(struct cli_socket *sock)
 {
        TALLOC_CTX *mem_ctx;
        struct cli_transport *transport;
+       struct fd_event fde;
 
        mem_ctx = talloc_init("cli_transport");
        if (!mem_ctx) return NULL;
@@ -35,6 +47,12 @@ struct cli_transport *cli_transport_init(struct cli_socket *sock)
        transport = talloc_zero(mem_ctx, sizeof(*transport));
        if (!transport) return NULL;
 
+       transport->event.ctx = event_context_init();
+       if (transport->event.ctx == NULL) {
+               talloc_destroy(mem_ctx);
+               return NULL;
+       }
+
        transport->mem_ctx = mem_ctx;
        transport->socket = sock;
        transport->negotiate.protocol = PROTOCOL_NT1;
@@ -47,6 +65,14 @@ struct cli_transport *cli_transport_init(struct cli_socket *sock)
 
        ZERO_STRUCT(transport->called);
 
+       fde.fd = sock->fd;
+       fde.flags = EVENT_FD_READ;
+       fde.handler = cli_transport_event_handler;
+       fde.private = transport;
+       fde.ref_count = 1;
+
+       transport->event.fde = event_add_fd(transport->event.ctx, &fde);
+
        return transport;
 }
 
@@ -59,6 +85,9 @@ void cli_transport_close(struct cli_transport *transport)
        transport->reference_count--;
        if (transport->reference_count <= 0) {
                cli_sock_close(transport->socket);
+               event_remove_fd(transport->event.ctx, transport->event.fde);
+               event_remove_timed(transport->event.ctx, transport->event.te);
+               event_context_destroy(transport->event.ctx);
                talloc_destroy(transport->mem_ctx);
        }
 }
@@ -72,6 +101,21 @@ void cli_transport_dead(struct cli_transport *transport)
 }
 
 
+/*
+  enable select for write on a transport
+*/
+static void cli_transport_write_enable(struct cli_transport *transport)
+{
+       transport->event.fde->flags |= EVENT_FD_WRITE;
+}
+
+/*
+  disable select for write on a transport
+*/
+static void cli_transport_write_disable(struct cli_transport *transport)
+{
+       transport->event.fde->flags &= ~EVENT_FD_WRITE;
+}
 
 /****************************************************************************
 send a session request (if appropriate)
@@ -145,7 +189,7 @@ again:
        /* the zero mid is reserved for requests that don't have a mid */
        if (mid == 0) mid = 1;
 
-       for (req=transport->pending_requests; req; req=req->next) {
+       for (req=transport->pending_recv; req; req=req->next) {
                if (req->mid == mid) {
                        mid++;
                        goto again;
@@ -156,81 +200,284 @@ again:
        return mid;
 }
 
+static void idle_handler(struct event_context *ev, 
+                        struct timed_event *te, time_t t)
+{
+       struct cli_transport *transport = te->private;
+       te->next_event = t + transport->idle.period;
+       transport->idle.func(transport, transport->idle.private);
+}
+
 /*
   setup the idle handler for a transport
+  the period is in seconds
 */
 void cli_transport_idle_handler(struct cli_transport *transport, 
                                void (*idle_func)(struct cli_transport *, void *),
                                uint_t period,
                                void *private)
 {
+       struct timed_event te;
        transport->idle.func = idle_func;
        transport->idle.private = private;
        transport->idle.period = period;
-}
 
+       if (transport->event.te != NULL) {
+               event_remove_timed(transport->event.ctx, transport->event.te);
+       }
+
+       te.next_event = time(NULL) + period;
+       te.handler = idle_handler;
+       te.private = transport;
+       transport->event.te = event_add_timed(transport->event.ctx, &te);
+}
 
 /*
-  determine if a packet is pending for receive on a transport
+  process some pending sends
 */
-BOOL cli_transport_pending(struct cli_transport *transport)
+static void cli_transport_process_send(struct cli_transport *transport)
 {
-       return socket_pending(transport->socket->fd);
-}
-
+       while (transport->pending_send) {
+               struct cli_request *req = transport->pending_send;
+               ssize_t ret;
+               ret = cli_sock_write(transport->socket, req->out.buffer, req->out.size);
+               if (ret == -1) {
+                       if (errno == EAGAIN || errno == EINTR) {
+                               return;
+                       }
+                       cli_transport_dead(transport);
+               }
+               req->out.buffer += ret;
+               req->out.size -= ret;
+               if (req->out.size == 0) {
+                       req->state = CLI_REQUEST_RECV;
+                       DLIST_REMOVE(transport->pending_send, req);
+                       DLIST_ADD(transport->pending_recv, req);
+               }
+       }
 
+       /* we're out of requests to send, so don't wait for write
+          events any more */
+       cli_transport_write_disable(transport);
+}
 
 /*
-  wait for data on a transport, periodically calling a wait function
-  if one has been defined
-  return True if a packet is received
-*/
-BOOL cli_transport_select(struct cli_transport *transport)
+  we have a full request in our receive buffer - match it to a pending request
+  and process
+ */
+static void cli_transport_finish_recv(struct cli_transport *transport)
 {
-       fd_set fds;
-       int selrtn;
-       int fd;
-       struct timeval timeout;
+       uint8_t *buffer, *hdr, *vwv;
+       int len;
+       uint16_t wct, mid = 0;
+       struct cli_request *req;
 
-       fd = transport->socket->fd;
+       buffer = transport->recv_buffer.buffer;
+       len = transport->recv_buffer.req_size;
 
-       if (fd == -1) {
-               return False;
+       hdr = buffer+NBT_HDR_SIZE;
+       vwv = hdr + HDR_VWV;
+
+       /* see if it could be an oplock break request */
+       if (handle_oplock_break(transport, len, hdr, vwv)) {
+               talloc_free(transport->mem_ctx, buffer);
+               ZERO_STRUCT(transport->recv_buffer);
+               return;
        }
 
-       do {
-               uint_t period = 1000;
+       /* at this point we need to check for a readbraw reply, as
+          these can be any length */
+       if (transport->readbraw_pending) {
+               transport->readbraw_pending = 0;
+
+               /* it must match the first entry in the pending queue
+                  as the client is not allowed to have outstanding
+                  readbraw requests */
+               req = transport->pending_recv;
+               if (!req) goto error;
+
+               req->in.buffer = buffer;
+               talloc_steal(transport->mem_ctx, req->mem_ctx, buffer);
+               req->in.size = len + NBT_HDR_SIZE;
+               req->in.allocated = req->in.size;
+               goto async;
+       }
 
-               FD_ZERO(&fds);
-               FD_SET(fd,&fds);
-       
-               if (transport->idle.func) {
-                       period = transport->idle.period;
+       if (len >= MIN_SMB_SIZE) {
+               /* extract the mid for matching to pending requests */
+               mid = SVAL(hdr, HDR_MID);
+               wct = CVAL(hdr, HDR_WCT);
+       }
+
+       /* match the incoming request against the list of pending requests */
+       for (req=transport->pending_recv; req; req=req->next) {
+               if (req->mid == mid) break;
+       }
+
+       if (!req) {
+               DEBUG(1,("Discarding unmatched reply with mid %d\n", mid));
+               goto error;
+       }
+
+       /* fill in the 'in' portion of the matching request */
+       req->in.buffer = buffer;
+       talloc_steal(transport->mem_ctx, req->mem_ctx, buffer);
+       req->in.size = len + NBT_HDR_SIZE;
+       req->in.allocated = req->in.size;
+
+       /* handle non-SMB replies */
+       if (req->in.size < NBT_HDR_SIZE + MIN_SMB_SIZE) {
+               req->state = CLI_REQUEST_ERROR;
+               goto error;
+       }
+
+       if (req->in.size < NBT_HDR_SIZE + MIN_SMB_SIZE + VWV(wct)) {
+               DEBUG(2,("bad reply size for mid %d\n", mid));
+               req->status = NT_STATUS_UNSUCCESSFUL;
+               req->state = CLI_REQUEST_ERROR;
+               goto error;
+       }
+
+       req->in.hdr = hdr;
+       req->in.vwv = vwv;
+       req->in.wct = wct;
+       if (req->in.size >= NBT_HDR_SIZE + MIN_SMB_SIZE + VWV(wct)) {
+               req->in.data = req->in.vwv + VWV(wct) + 2;
+               req->in.data_size = SVAL(req->in.vwv, VWV(wct));
+               if (req->in.size < NBT_HDR_SIZE + MIN_SMB_SIZE + VWV(wct) + req->in.data_size) {
+                       DEBUG(3,("bad data size for mid %d\n", mid));
+                       /* blergh - w2k3 gives a bogus data size values in some
+                          openX replies */
+                       req->in.data_size = req->in.size - (NBT_HDR_SIZE + MIN_SMB_SIZE + VWV(wct));
                }
+       }
+       req->in.ptr = req->in.data;
+       req->flags2 = SVAL(req->in.hdr, HDR_FLG2);
+
+       if (!(req->flags2 & FLAGS2_32_BIT_ERROR_CODES)) {
+               transport->error.etype = ETYPE_DOS;
+               transport->error.e.dos.eclass = CVAL(req->in.hdr,HDR_RCLS);
+               transport->error.e.dos.ecode = SVAL(req->in.hdr,HDR_ERR);
+               req->status = dos_to_ntstatus(transport->error.e.dos.eclass, 
+                                             transport->error.e.dos.ecode);
+       } else {
+               transport->error.etype = ETYPE_NT;
+               transport->error.e.nt_status = NT_STATUS(IVAL(req->in.hdr, HDR_RCLS));
+               req->status = transport->error.e.nt_status;
+       }
+
+       if (!cli_request_check_sign_mac(req)) {
+               transport->error.etype = ETYPE_SOCKET;
+               transport->error.e.socket_error = SOCKET_READ_BAD_SIG;
+               req->state = CLI_REQUEST_ERROR;
+               goto error;
+       };
+
+async:
+       /* if this request has an async handler then call that to
+          notify that the reply has been received. This might destroy
+          the request so it must happen last */
+       ZERO_STRUCT(transport->recv_buffer);
+       DLIST_REMOVE(transport->pending_recv, req);
+       req->state = CLI_REQUEST_DONE;
+       if (req->async.fn) {
+               req->async.fn(req);
+       }
+       return;
+
+error:
+       if (req) {
+               DLIST_REMOVE(transport->pending_recv, req);
+               req->state = CLI_REQUEST_ERROR;
+       }
+       ZERO_STRUCT(transport->recv_buffer);
+}
 
-               timeout.tv_sec = period / 1000;
-               timeout.tv_usec = 1000*(period%1000);
-               
-               selrtn = sys_select_intr(fd+1,&fds,NULL,NULL,&timeout);
-               
-               if (selrtn == 1) {
-                       /* the fd is readable */
-                       return True;
+/*
+  process some pending receives
+*/
+static void cli_transport_process_recv(struct cli_transport *transport)
+{
+       /* a incoming packet goes through 2 stages - first we read the
+          4 byte header, which tells us how much more is coming. Then
+          we read the rest */
+       if (transport->recv_buffer.received < NBT_HDR_SIZE) {
+               ssize_t ret;
+               ret = cli_sock_read(transport->socket, 
+                                   transport->recv_buffer.header + 
+                                   transport->recv_buffer.received,
+                                   NBT_HDR_SIZE - transport->recv_buffer.received);
+               if (ret == -1) {
+                       if (errno == EINTR || errno == EAGAIN) {
+                               return;
+                       }
+                       cli_transport_dead(transport);
+                       return;
                }
-               
-               if (selrtn == -1) {
-                       /* sys_select_intr() already handles EINTR, so this
-                          is an error. The socket is probably dead */
-                       return False;
+
+               transport->recv_buffer.received += ret;
+
+               if (transport->recv_buffer.received == NBT_HDR_SIZE) {
+                       /* we've got a full header */
+                       transport->recv_buffer.req_size = smb_len(transport->recv_buffer.header) + NBT_HDR_SIZE;
+                       transport->recv_buffer.buffer = talloc(transport->mem_ctx,
+                                                              NBT_HDR_SIZE+transport->recv_buffer.req_size);
+                       if (transport->recv_buffer.buffer == NULL) {
+                               cli_transport_dead(transport);
+                               return;
+                       }
+                       memcpy(transport->recv_buffer.buffer, transport->recv_buffer.header, NBT_HDR_SIZE);
                }
-               
-               /* only other possibility is that we timed out - call the idle function
-                  if there is one */
-               if (transport->idle.func) {
-                       transport->idle.func(transport, transport->idle.private);
+       }
+
+       if (transport->recv_buffer.received < transport->recv_buffer.req_size) {
+               ssize_t ret;
+               ret = cli_sock_read(transport->socket, 
+                                   transport->recv_buffer.buffer + 
+                                   transport->recv_buffer.received,
+                                   transport->recv_buffer.req_size - 
+                                   transport->recv_buffer.received);
+               if (ret == -1) {
+                       if (errno == EINTR || errno == EAGAIN) {
+                               return;
+                       }
+                       cli_transport_dead(transport);
+                       return;
                }
-       } while (selrtn == 0);
+               transport->recv_buffer.received += ret;
+       }
+
+       if (transport->recv_buffer.received != 0 &&
+           transport->recv_buffer.received == transport->recv_buffer.req_size) {
+               cli_transport_finish_recv(transport);
+       }
+}
 
+/*
+  process some read/write requests that are pending
+  return False if the socket is dead
+*/
+BOOL cli_transport_process(struct cli_transport *transport)
+{
+       cli_transport_process_send(transport);
+       cli_transport_process_recv(transport);
+       if (transport->socket->fd == -1) {
+               return False;
+       }
        return True;
 }
 
+
+
+/*
+  put a request into the send queue
+*/
+void cli_transport_send(struct cli_request *req)
+{
+       /* put it on the outgoing socket queue */
+       req->state = CLI_REQUEST_SEND;
+       DLIST_ADD(req->transport->pending_send, req);
+
+       /* make sure we look for write events */
+       cli_transport_write_enable(req->transport);
+}
index c31f07505fd935443b7bcf1f7c151e775060b915..ce6cd0a1a49c567a908f4b0d583cd32fcfb32010 100644 (file)
@@ -43,7 +43,7 @@ NTSTATUS cli_request_destroy(struct cli_request *req)
        if (req->transport) {
                /* remove it from the list of pending requests (a null op if
                   its not in the list) */
-               DLIST_REMOVE(req->transport->pending_requests, req);
+               DLIST_REMOVE(req->transport->pending_recv, req);
        }
 
        /* ahh, its so nice to destroy a complex structure in such a
@@ -79,6 +79,7 @@ struct cli_request *cli_request_setup_nonsmb(struct cli_transport *transport, ui
        ZERO_STRUCTP(req);
 
        /* setup the request context */
+       req->state = CLI_REQUEST_INIT;
        req->mem_ctx = mem_ctx;
        req->transport = transport;
        req->session = NULL;
@@ -266,32 +267,20 @@ static void cli_req_grow_data(struct cli_request *req, uint_t new_size)
        SSVAL(req->out.vwv, VWV(req->out.wct), new_size);
 }
 
+
 /*
   send a message
 */
 BOOL cli_request_send(struct cli_request *req)
 {
-       uint_t ret;
-
        if (IVAL(req->out.buffer, 0) == 0) {
                _smb_setlen(req->out.buffer, req->out.size - NBT_HDR_SIZE);
        }
 
        cli_request_calculate_sign_mac(req);
 
-       ret = cli_sock_write(req->transport->socket, req->out.buffer, req->out.size);
-
-       if (req->out.size != ret) {
-               req->transport->error.etype = ETYPE_SOCKET;
-               req->transport->error.e.socket_error = SOCKET_WRITE_ERROR;
-               DEBUG(0,("Error writing %d bytes to server - %s\n",
-                        (int)req->out.size, strerror(errno)));
-               return False;
-       }
+       cli_transport_send(req);
 
-       /* add it to the list of pending requests */
-       DLIST_ADD(req->transport->pending_requests, req);
-       
        return True;
 }
 
@@ -306,17 +295,8 @@ BOOL cli_request_receive(struct cli_request *req)
        if (!req) return False;
 
        /* keep receiving packets until this one is replied to */
-       while (!req->in.buffer) {
-               if (!cli_transport_select(req->transport)) {
-                       req->status = NT_STATUS_UNSUCCESSFUL;
-                       return False;
-               }
-
-               if (!cli_request_receive_next(req->transport)) {
-                       cli_transport_dead(req->transport);
-                       req->status = NT_STATUS_UNEXPECTED_NETWORK_ERROR;
-                       return False;
-               }
+       while (req->state <= CLI_REQUEST_RECV) {
+               event_loop_once(req->transport->event.ctx);
        }
 
        return True;
@@ -327,7 +307,7 @@ BOOL cli_request_receive(struct cli_request *req)
   handle oplock break requests from the server - return True if the request was
   an oplock break
 */
-static BOOL handle_oplock_break(struct cli_transport *transport, uint_t len, const char *hdr, const char *vwv)
+BOOL handle_oplock_break(struct cli_transport *transport, uint_t len, const char *hdr, const char *vwv)
 {
        /* we must be very fussy about what we consider an oplock break to avoid
           matching readbraw replies */
@@ -350,158 +330,6 @@ static BOOL handle_oplock_break(struct cli_transport *transport, uint_t len, con
        return True;
 }
 
-
-/*
-  receive an async message from the server
-  this function assumes that the caller already knows that the socket is readable
-  and that there is a packet waiting
-
-  The packet is not actually returned by this function, instead any
-  registered async message handlers are called
-
-  return True if a packet was successfully received and processed
-  return False if the socket appears to be dead
-*/
-BOOL cli_request_receive_next(struct cli_transport *transport)
-{
-       BOOL ret;
-       int len;
-       char header[NBT_HDR_SIZE];
-       char *buffer, *hdr, *vwv;
-       TALLOC_CTX *mem_ctx;
-       struct cli_request *req;
-       uint16_t wct, mid = 0;
-
-       len = cli_sock_read(transport->socket, header, 4);
-       if (len != 4) {
-               return False;
-       }
-       
-       len = smb_len(header);
-
-       mem_ctx = talloc_init("cli_request_receive_next");
-       
-       /* allocate the incoming buffer at the right size */
-       buffer = talloc(mem_ctx, len+NBT_HDR_SIZE);
-       if (!buffer) {
-               talloc_destroy(mem_ctx);
-               return False;
-       }
-
-       /* fill in the already received header */
-       memcpy(buffer, header, NBT_HDR_SIZE);
-
-       ret = cli_sock_read(transport->socket, buffer + NBT_HDR_SIZE, len);
-       /* If the server is not responding, note that now */
-       if (ret != len) {
-               return False;
-       }
-
-       hdr = buffer+NBT_HDR_SIZE;
-       vwv = hdr + HDR_VWV;
-
-       /* see if it could be an oplock break request */
-       if (handle_oplock_break(transport, len, hdr, vwv)) {
-               goto done;
-       }
-
-       /* at this point we need to check for a readbraw reply, as these can be any length */
-       if (transport->readbraw_pending) {
-               transport->readbraw_pending = 0;
-
-               /* it must match the first entry in the pending queue as the client is not allowed
-                  to have outstanding readbraw requests */
-               req = transport->pending_requests;
-               if (!req) goto done;
-
-               req->in.buffer = buffer;
-               talloc_steal(mem_ctx, req->mem_ctx, buffer);
-               req->in.size = len + NBT_HDR_SIZE;
-               req->in.allocated = req->in.size;
-               goto async;
-       }
-
-       if (len >= MIN_SMB_SIZE) {
-               /* extract the mid for matching to pending requests */
-               mid = SVAL(hdr, HDR_MID);
-               wct = CVAL(hdr, HDR_WCT);
-       }
-
-       /* match the incoming request against the list of pending requests */
-       for (req=transport->pending_requests; req; req=req->next) {
-               if (req->mid == mid) break;
-       }
-
-       if (!req) {
-               DEBUG(3,("Discarding unmatched reply with mid %d\n", mid));
-               goto done;
-       }
-
-       /* fill in the 'in' portion of the matching request */
-       req->in.buffer = buffer;
-       talloc_steal(mem_ctx, req->mem_ctx, buffer);
-       req->in.size = len + NBT_HDR_SIZE;
-       req->in.allocated = req->in.size;
-
-       /* handle non-SMB replies */
-       if (req->in.size < NBT_HDR_SIZE + MIN_SMB_SIZE) {
-               goto done;
-       }
-
-       if (req->in.size < NBT_HDR_SIZE + MIN_SMB_SIZE + VWV(wct)) {
-               DEBUG(2,("bad reply size for mid %d\n", mid));
-               req->status = NT_STATUS_UNSUCCESSFUL;
-               goto done;
-       }
-
-       req->in.hdr = hdr;
-       req->in.vwv = vwv;
-       req->in.wct = wct;
-       if (req->in.size >= NBT_HDR_SIZE + MIN_SMB_SIZE + VWV(wct)) {
-               req->in.data = req->in.vwv + VWV(wct) + 2;
-               req->in.data_size = SVAL(req->in.vwv, VWV(wct));
-               if (req->in.size < NBT_HDR_SIZE + MIN_SMB_SIZE + VWV(wct) + req->in.data_size) {
-                       DEBUG(3,("bad data size for mid %d\n", mid));
-                       /* blergh - w2k3 gives a bogus data size values in some
-                          openX replies */
-                       req->in.data_size = req->in.size - (NBT_HDR_SIZE + MIN_SMB_SIZE + VWV(wct));
-               }
-       }
-       req->in.ptr = req->in.data;
-       req->flags2 = SVAL(req->in.hdr, HDR_FLG2);
-
-       if (!(req->flags2 & FLAGS2_32_BIT_ERROR_CODES)) {
-               transport->error.etype = ETYPE_DOS;
-               transport->error.e.dos.eclass = CVAL(req->in.hdr,HDR_RCLS);
-               transport->error.e.dos.ecode = SVAL(req->in.hdr,HDR_ERR);
-               req->status = dos_to_ntstatus(transport->error.e.dos.eclass, 
-                                             transport->error.e.dos.ecode);
-       } else {
-               transport->error.etype = ETYPE_NT;
-               transport->error.e.nt_status = NT_STATUS(IVAL(req->in.hdr, HDR_RCLS));
-               req->status = transport->error.e.nt_status;
-       }
-
-       if (!cli_request_check_sign_mac(req)) {
-               transport->error.etype = ETYPE_SOCKET;
-               transport->error.e.socket_error = SOCKET_READ_BAD_SIG;
-               return False;
-       };
-
-async:
-       /* if this request has an async handler then call that to
-          notify that the reply has been received. This might destroy
-          the request so it must happen last */
-       if (req->async.fn) {
-               req->async.fn(req);
-       }
-
-done:
-       talloc_destroy(mem_ctx);
-       return True;
-}
-
-
 /*
   wait for a reply to be received for a packet that just returns an error
   code and nothing more
index b6d3486ad8c18c8c44197710d2bc2880298f98e5..fd94a923c90cbf6d03e0a229ace8b65661d90a8f 100644 (file)
@@ -68,17 +68,17 @@ static BOOL oplock_handler(struct cli_transport *transport, uint16_t tid, uint16
        return req_send_oplock_break(private->tcon, fnum, level);
 }
 
-/*
+ /*
   a handler for read events on a connection to a backend server
 */
 static void cifs_socket_handler(struct event_context *ev, struct fd_event *fde, time_t t, uint16_t flags)
 {
        struct cvfs_private *private = fde->private;
        struct smbsrv_tcon *tcon = private->tcon;
-
+       
        DEBUG(5,("cifs_socket_handler event on fd %d\n", fde->fd));
-
-       if (!cli_request_receive_next(private->transport)) {
+       
+       if (!cli_transport_process(private->transport)) {
                /* the connection to our server is dead */
                close_cnum(tcon);
        }
@@ -93,7 +93,6 @@ static NTSTATUS cvfs_connect(struct smbsrv_request *req, const char *sharename)
        NTSTATUS status;
        struct cvfs_private *private;
        const char *map_calls;
-       struct fd_event fde;
        const char *host, *user, *pass, *domain, *remote_share;
 
        /* Here we need to determine which server to connect to.
@@ -157,18 +156,17 @@ static NTSTATUS cvfs_connect(struct smbsrv_request *req, const char *sharename)
                tcon->ntvfs_ops = ops;
        }         
 
-       /* we need to tell the event loop that we wish to receive read events
-          on our SMB connection to the server */
-       fde.fd = private->transport->socket->fd;
-       fde.flags = EVENT_FD_READ;
-       fde.private = private;
-       fde.handler = cifs_socket_handler;
-
-       event_add_fd(tcon->smb_conn->connection->event.ctx, &fde);
-
        /* we need to receive oplock break requests from the server */
        cli_oplock_handler(private->transport, oplock_handler, private);
-       cli_transport_idle_handler(private->transport, idle_func, 100, private);
+       cli_transport_idle_handler(private->transport, idle_func, 1, private);
+
+       private->transport->event.fde->handler = cifs_socket_handler;
+       private->transport->event.fde->private = private;
+
+       event_context_merge(tcon->smb_conn->connection->event.ctx,
+                           private->transport->event.ctx);
+
+       private->transport->event.ctx = tcon->smb_conn->connection->event.ctx;
 
        return NT_STATUS_OK;
 }
@@ -180,7 +178,6 @@ static NTSTATUS cvfs_disconnect(struct smbsrv_tcon *tcon)
 {
        struct cvfs_private *private = tcon->ntvfs_private;
 
-       event_remove_fd_all(tcon->smb_conn->connection->event.ctx, private->transport->socket->fd);
        smb_tree_disconnect(private->tree);
        cli_tree_close(private->tree);
 
index fd9e35074d6448f67a07d1ebd75a9b0b23761e8d..ca36dc3aa94cafa947edfd1eece4528bade578fa 100644 (file)
@@ -745,6 +745,7 @@ static void smbsrv_recv(struct server_connection *conn, time_t t, uint16_t flags
 
        req = receive_smb_request(smb_conn);
        if (!req) {
+               conn->event.fde->flags = 0;
                smbsrv_terminate_connection(smb_conn, "receive error");
                return;
        }
index cf513414e8d953337b7bd0d35035442fcbae6687..20a467100b0fc25c45633fbad39dc17d23d8e2d6 100644 (file)
@@ -537,7 +537,8 @@ BOOL torture_smb_scan(int dummy)
                }
 
                usleep(10000);
-               if (cli_transport_pending(cli->transport)) {
+               cli_transport_process(cli->transport);
+               if (req->state > CLI_REQUEST_RECV) {
                        status = cli_request_simple_recv(req);
                        printf("op=0x%x status=%s\n", op, nt_errstr(status));
                        torture_close_connection(cli);
@@ -545,7 +546,8 @@ BOOL torture_smb_scan(int dummy)
                }
 
                sleep(1);
-               if (cli_transport_pending(cli->transport)) {
+               cli_transport_process(cli->transport);
+               if (req->state > CLI_REQUEST_RECV) {
                        status = cli_request_simple_recv(req);
                        printf("op=0x%x status=%s\n", op, nt_errstr(status));
                } else {
index 016c19fd5b1b97616cd549ea3ae008af6094b1fe..e45d9f0124f4ba6d4853b4424c420c78ce5f3d16 100644 (file)
@@ -185,7 +185,7 @@ static BOOL connect_servers(void)
                        }
 
                        cli_oplock_handler(servers[i].cli[j]->transport, oplock_handler, NULL);
-                       cli_transport_idle_handler(servers[i].cli[j]->transport, idle_func, 10, NULL);
+                       cli_transport_idle_handler(servers[i].cli[j]->transport, idle_func, 1, NULL);
                }
        }
 
@@ -764,13 +764,8 @@ static void idle_func(struct cli_transport *transport, void *private)
        for (i=0;i<NSERVERS;i++) {
                for (j=0;j<NINSTANCES;j++) {
                        if (servers[i].cli[j] &&
-                           transport != servers[i].cli[j]->transport &&
-                           cli_transport_pending(servers[i].cli[j]->transport)) {
-                               if (!cli_request_receive_next(servers[i].cli[j]->transport)) {
-                                       printf("Connection to server %d instance %d died!\n",
-                                              i, j);
-                                       exit(1);
-                               }
+                           transport != servers[i].cli[j]->transport) {
+                               cli_transport_process(servers[i].cli[j]->transport);
                        }
                }
        }
@@ -808,13 +803,7 @@ static void check_pending(void)
 
        for (j=0;j<NINSTANCES;j++) {
                for (i=0;i<NSERVERS;i++) {
-                       if (cli_transport_pending(servers[i].cli[j]->transport)) {
-                               if (!cli_request_receive_next(servers[i].cli[j]->transport)) {
-                                       printf("Connection to server %d instance %d died!\n",
-                                              i, j);
-                                       exit(1);                                        
-                               }
-                       }
+                       cli_transport_process(servers[i].cli[j]->transport);
                }
        }       
 }