s3-prefork: add asynchronous functions
authorSimo Sorce <idra@samba.org>
Tue, 3 May 2011 22:55:25 +0000 (18:55 -0400)
committerAndreas Schneider <asn@samba.org>
Wed, 10 Aug 2011 16:14:03 +0000 (18:14 +0200)
To get a client connection it is now possible to use asynchronous functions.

Signed-off-by: Andreas Schneider <asn@samba.org>
source3/lib/server_prefork.c
source3/lib/server_prefork.h

index 4aa3f482cc0d4d4f596b77cbcbe4b0c302ff6d33..b337fa0c3b4ef0dfe2495ee5df937e951805788d 100644 (file)
@@ -24,6 +24,7 @@
 #include "system/filesys.h"
 #include "server_prefork.h"
 #include "../lib/util/util.h"
+#include "../lib/util/tevent_unix.h"
 
 struct prefork_pool {
 
@@ -94,7 +95,6 @@ bool prefork_create_pool(struct tevent_context *ev_ctx,
                case 0: /* THE CHILD */
 
                        pfp->pool[i].status = PF_WORKER_IDLE;
-
                        ret = pfp->main_fn(ev_ctx, &pfp->pool[i],
                                           pfp->listen_fd, pfp->lock_fd,
                                           pfp->private_data);
@@ -500,3 +500,261 @@ int prefork_wait_for_client(struct pf_worker_data *pf,
        *fd = sd;
        return 0;
 }
+
+/* ==== async code ==== */
+
+#define PF_ASYNC_LOCK_GRAB     0x01
+#define PF_ASYNC_LOCK_RELEASE  0x02
+#define PF_ASYNC_ACTION_MASK   0x03
+#define PF_ASYNC_LOCK_DONE     0x04
+
+struct pf_lock_state {
+       struct pf_worker_data *pf;
+       int lock_fd;
+       int flags;
+};
+
+static void prefork_lock_handler(struct tevent_context *ev,
+                                       struct tevent_timer *te,
+                                       struct timeval curtime, void *pvt);
+
+static struct tevent_req *prefork_lock_send(TALLOC_CTX *mem_ctx,
+                                               struct tevent_context *ev,
+                                               struct pf_worker_data *pf,
+                                               int lock_fd, int action)
+{
+       struct tevent_req *req;
+       struct pf_lock_state *state;
+
+       req = tevent_req_create(mem_ctx, &state, struct pf_lock_state);
+       if (!req) {
+               return NULL;
+       }
+
+       state->pf = pf;
+       state->lock_fd = lock_fd;
+       state->flags = action;
+
+       /* try once immediately */
+       prefork_lock_handler(ev, NULL, tevent_timeval_zero(), req);
+       if (state->flags & PF_ASYNC_LOCK_DONE) {
+               tevent_req_post(req, ev);
+       }
+
+       return req;
+}
+
+static void prefork_lock_handler(struct tevent_context *ev,
+                                struct tevent_timer *te,
+                                struct timeval curtime, void *pvt)
+{
+       struct tevent_req *req;
+       struct pf_lock_state *state;
+       int ret;
+
+       req = talloc_get_type_abort(pvt, struct tevent_req);
+       state = tevent_req_data(req, struct pf_lock_state);
+
+       switch (state->flags & PF_ASYNC_ACTION_MASK) {
+       case PF_ASYNC_LOCK_GRAB:
+               ret = prefork_grab_lock(state->pf, state->lock_fd, 0);
+               break;
+       case PF_ASYNC_LOCK_RELEASE:
+               ret = prefork_release_lock(state->pf, state->lock_fd, 0);
+               break;
+       default:
+               ret = EINVAL;
+               break;
+       }
+
+       switch (ret) {
+       case 0:
+               state->flags |= PF_ASYNC_LOCK_DONE;
+               tevent_req_done(req);
+               return;
+       case -1:
+               te = tevent_add_timer(ev, state,
+                                       tevent_timeval_current_ofs(1, 0),
+                                       prefork_lock_handler, req);
+               tevent_req_nomem(te, req);
+               return;
+       case -2:
+               /* server tells us to stop */
+               state->flags |= PF_ASYNC_LOCK_DONE;
+               tevent_req_error(req, -2);
+               return;
+       default:
+               state->flags |= PF_ASYNC_LOCK_DONE;
+               tevent_req_error(req, ret);
+               return;
+       }
+}
+
+static int prefork_lock_recv(struct tevent_req *req)
+{
+       int ret;
+
+       if (!tevent_req_is_unix_error(req, &ret)) {
+               ret = 0;
+       }
+
+       tevent_req_received(req);
+       return ret;
+}
+
+struct pf_listen_state {
+       struct tevent_context *ev;
+       struct pf_worker_data *pf;
+
+       int lock_fd;
+       int listen_fd;
+
+       struct sockaddr *addr;
+       socklen_t *addrlen;
+
+       int accept_fd;
+
+       int error;
+};
+
+static void prefork_listen_lock_done(struct tevent_req *subreq);
+static void prefork_listen_accept_handler(struct tevent_context *ev,
+                                         struct tevent_fd *fde,
+                                         uint16_t flags, void *pvt);
+static void prefork_listen_release_done(struct tevent_req *subreq);
+
+struct tevent_req *prefork_listen_send(TALLOC_CTX *mem_ctx,
+                                       struct tevent_context *ev,
+                                       struct pf_worker_data *pf,
+                                       int lock_fd, int listen_fd,
+                                       struct sockaddr *addr,
+                                       socklen_t *addrlen)
+{
+       struct tevent_req *req, *subreq;
+       struct pf_listen_state *state;
+
+       req = tevent_req_create(mem_ctx, &state, struct pf_listen_state);
+       if (!req) {
+               return NULL;
+       }
+
+       state->ev = ev;
+       state->pf = pf;
+       state->lock_fd = lock_fd;
+       state->listen_fd = listen_fd;
+       state->addr = addr;
+       state->addrlen = addrlen;
+       state->accept_fd = -1;
+       state->error = 0;
+
+       subreq = prefork_lock_send(state, state->ev, state->pf,
+                                  state->lock_fd, PF_ASYNC_LOCK_GRAB);
+       if (tevent_req_nomem(subreq, req)) {
+               return tevent_req_post(req, ev);
+       }
+
+       tevent_req_set_callback(subreq, prefork_listen_lock_done, req);
+       return req;
+}
+
+static void prefork_listen_lock_done(struct tevent_req *subreq)
+{
+       struct tevent_req *req;
+       struct pf_listen_state *state;
+       struct tevent_fd *fde;
+       int ret;
+
+       req = tevent_req_callback_data(subreq, struct tevent_req);
+       state = tevent_req_data(req, struct pf_listen_state);
+
+       ret = prefork_lock_recv(subreq);
+       if (ret != 0) {
+               tevent_req_error(req, ret);
+               return;
+       }
+
+       /* next step, accept */
+       fde = tevent_add_fd(state->ev, state,
+                           state->listen_fd, TEVENT_FD_READ,
+                           prefork_listen_accept_handler, req);
+       tevent_req_nomem(fde, req);
+}
+
+static void prefork_listen_accept_handler(struct tevent_context *ev,
+                                         struct tevent_fd *fde,
+                                         uint16_t flags, void *pvt)
+{
+       struct pf_listen_state *state;
+       struct tevent_req *req, *subreq;
+       int err = 0;
+       int sd = -1;
+
+       req = talloc_get_type_abort(pvt, struct tevent_req);
+       state = tevent_req_data(req, struct pf_listen_state);
+
+       sd = accept(state->listen_fd, state->addr, state->addrlen);
+       if (sd == -1) {
+               if (errno == EINTR) {
+                       /* keep trying */
+                       return;
+               }
+               err = errno;
+               DEBUG(6, ("Accept failed! (%d, %s)\n", err, strerror(err)));
+
+       }
+
+       /* do not track the listen fd anymore */
+       talloc_free(fde);
+       if (err) {
+               tevent_req_error(req, err);
+               return;
+       }
+
+       state->accept_fd = sd;
+
+       /* release lock now */
+       subreq = prefork_lock_send(state, state->ev, state->pf,
+                                  state->lock_fd, PF_ASYNC_LOCK_RELEASE);
+       if (tevent_req_nomem(subreq, req)) {
+               return;
+       }
+       tevent_req_set_callback(subreq, prefork_listen_release_done, req);
+}
+
+static void prefork_listen_release_done(struct tevent_req *subreq)
+{
+       struct tevent_req *req;
+       int ret;
+
+       req = tevent_req_callback_data(subreq, struct tevent_req);
+
+       ret = prefork_lock_recv(subreq);
+       if (ret != 0) {
+               tevent_req_error(req, ret);
+               return;
+       }
+
+       tevent_req_done(req);
+}
+
+int prefork_listen_recv(struct tevent_req *req, int *fd)
+{
+       struct pf_listen_state *state;
+       int ret;
+
+       state = tevent_req_data(req, struct pf_listen_state);
+
+       if (tevent_req_is_unix_error(req, &ret)) {
+               if (state->accept_fd != -1) {
+                       close(state->accept_fd);
+               }
+       } else {
+               *fd = state->accept_fd;
+               ret = 0;
+               state->pf->status = PF_WORKER_BUSY;
+               state->pf->num_clients++;
+       }
+
+       tevent_req_received(req);
+       return ret;
+}
index d6d7bf95c952da3b904b4cad8f5781e032e46e57..7e95602e814a26bddd28f7f7084ba09f7d2148ff 100644 (file)
@@ -74,3 +74,11 @@ int prefork_wait_for_client(struct pf_worker_data *pf,
                            int lock_fd, int listen_fd,
                            struct sockaddr *addr,
                            socklen_t *addrlen, int *fd);
+
+struct tevent_req *prefork_listen_send(TALLOC_CTX *mem_ctx,
+                                       struct tevent_context *ev,
+                                       struct pf_worker_data *pf,
+                                       int lock_fd, int listen_fd,
+                                       struct sockaddr *addr,
+                                       socklen_t *addrlen);
+int prefork_listen_recv(struct tevent_req *req, int *fd);