ctdb-common: Add wait_send/wait_recv to sock_daemon_funcs
authorAmitay Isaacs <amitay@gmail.com>
Wed, 11 Jan 2017 09:37:00 +0000 (20:37 +1100)
committerStefan Metzmacher <metze@samba.org>
Mon, 16 Jan 2017 20:16:51 +0000 (21:16 +0100)
BUG: https://bugzilla.samba.org/show_bug.cgi?id=12510

To be able to terminate the daemon from within the implementation,
create a subreq using wait_send() provided by the implementation.
When the subreq is finished, it signals the sock_daemon code to terminate
the daemon.

This avoids the need to keep track of the top level tevent_req causing
layer violation and keeps the code flow straighforward.

Signed-off-by: Amitay Isaacs <amitay@gmail.com>
Reviewed-by: Stefan Metzmacher <metze@samba.org>
Autobuild-User(master): Stefan Metzmacher <metze@samba.org>
Autobuild-Date(master): Mon Jan 16 21:16:51 CET 2017 on sn-devel-144

ctdb/common/sock_daemon.c
ctdb/common/sock_daemon.h
ctdb/tests/cunit/sock_daemon_test_001.sh
ctdb/tests/src/sock_daemon_test.c

index 3b679ab133263de6520b50b457fdc88174736b0e..b53b4d85333f087dcc8a67e2b8fae279ed786fc6 100644 (file)
@@ -70,7 +70,6 @@ struct sock_daemon_context {
 
        struct pidfile_context *pid_ctx;
        struct sock_socket *socket_list;
 
        struct pidfile_context *pid_ctx;
        struct sock_socket *socket_list;
-       struct tevent_req *req;
 };
 
 /*
 };
 
 /*
@@ -451,8 +450,6 @@ bool sock_socket_write_recv(struct tevent_req *req, int *perr)
  * Socket daemon
  */
 
  * Socket daemon
  */
 
-static int sock_daemon_context_destructor(struct sock_daemon_context *sockd);
-
 int sock_daemon_setup(TALLOC_CTX *mem_ctx, const char *daemon_name,
                      const char *logging, const char *debug_level,
                      const char *pidfile,
 int sock_daemon_setup(TALLOC_CTX *mem_ctx, const char *daemon_name,
                      const char *logging, const char *debug_level,
                      const char *pidfile,
@@ -487,21 +484,10 @@ int sock_daemon_setup(TALLOC_CTX *mem_ctx, const char *daemon_name,
                }
        }
 
                }
        }
 
-       talloc_set_destructor(sockd, sock_daemon_context_destructor);
-
        *out = sockd;
        return 0;
 }
 
        *out = sockd;
        return 0;
 }
 
-static int sock_daemon_context_destructor(struct sock_daemon_context *sockd)
-{
-       if (sockd->req != NULL) {
-               tevent_req_done(sockd->req);
-       }
-
-       return 0;
-}
-
 int sock_daemon_add_unix(struct sock_daemon_context *sockd,
                         const char *sockpath,
                         struct sock_socket_funcs *funcs,
 int sock_daemon_add_unix(struct sock_daemon_context *sockd,
                         const char *sockpath,
                         struct sock_socket_funcs *funcs,
@@ -546,6 +532,7 @@ static void sock_daemon_run_reconfigure(struct tevent_req *req);
 static void sock_daemon_run_shutdown(struct tevent_req *req);
 static void sock_daemon_run_socket_fail(struct tevent_req *subreq);
 static void sock_daemon_run_watch_pid(struct tevent_req *subreq);
 static void sock_daemon_run_shutdown(struct tevent_req *req);
 static void sock_daemon_run_socket_fail(struct tevent_req *subreq);
 static void sock_daemon_run_watch_pid(struct tevent_req *subreq);
+static void sock_daemon_run_wait_done(struct tevent_req *subreq);
 
 struct tevent_req *sock_daemon_run_send(TALLOC_CTX *mem_ctx,
                                        struct tevent_context *ev,
 
 struct tevent_req *sock_daemon_run_send(TALLOC_CTX *mem_ctx,
                                        struct tevent_context *ev,
@@ -620,7 +607,16 @@ struct tevent_req *sock_daemon_run_send(TALLOC_CTX *mem_ctx,
                                        req);
        }
 
                                        req);
        }
 
-       sockd->req = req;
+       if (sockd->funcs != NULL && sockd->funcs->wait_send != NULL &&
+           sockd->funcs->wait_recv != NULL) {
+               subreq = sockd->funcs->wait_send(state, ev,
+                                                sockd->private_data);
+               if (tevent_req_nomem(subreq, req)) {
+                       return tevent_req_post(req, ev);
+               }
+               tevent_req_set_callback(subreq, sock_daemon_run_wait_done,
+                                       req);
+       }
 
        return req;
 }
 
        return req;
 }
@@ -748,6 +744,26 @@ static void sock_daemon_run_watch_pid(struct tevent_req *subreq)
        tevent_req_set_callback(subreq, sock_daemon_run_watch_pid, req);
 }
 
        tevent_req_set_callback(subreq, sock_daemon_run_watch_pid, req);
 }
 
+static void sock_daemon_run_wait_done(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(
+               subreq, struct tevent_req);
+       struct sock_daemon_run_state *state = tevent_req_data(
+               req, struct sock_daemon_run_state);
+       struct sock_daemon_context *sockd = state->sockd;
+       int ret;
+       bool status;
+
+       status = sockd->funcs->wait_recv(subreq, &ret);
+       TALLOC_FREE(subreq);
+       sock_daemon_run_shutdown(req);
+       if (! status) {
+               tevent_req_error(req, ret);
+       } else {
+               tevent_req_done(req);
+       }
+}
+
 bool sock_daemon_run_recv(struct tevent_req *req, int *perr)
 {
        int ret;
 bool sock_daemon_run_recv(struct tevent_req *req, int *perr)
 {
        int ret;
@@ -778,7 +794,6 @@ int sock_daemon_run(struct tevent_context *ev,
        tevent_req_poll(req, ev);
 
        status = sock_daemon_run_recv(req, &ret);
        tevent_req_poll(req, ev);
 
        status = sock_daemon_run_recv(req, &ret);
-       sockd->req = NULL;
        TALLOC_FREE(req);
        if (! status) {
                return ret;
        TALLOC_FREE(req);
        if (! status) {
                return ret;
index 6c474acd021f21b3471514fda7e65639f36fec81..81853f66446aa42814a91ab065ca6ee9f2b067c5 100644 (file)
@@ -50,12 +50,24 @@ struct sock_client_context;
  * startup() is called when the daemon starts running
  *            either via sock_daemon_run() or via sock_daemon_run_send()
  * reconfigure() is called when process receives SIGUSR1 or SIGHUP
  * startup() is called when the daemon starts running
  *            either via sock_daemon_run() or via sock_daemon_run_send()
  * reconfigure() is called when process receives SIGUSR1 or SIGHUP
- * shutdown() is called when process receives SIGINT or SIGTERM
+ * shutdown() is called when process receives SIGINT or SIGTERM or
+ *             when wait computation has finished
+ *
+ * wait_send() starts the async computation to keep running the daemon
+ * wait_recv() ends the async computation to keep running the daemon
+ *
+ * If wait_send()/wait_recv() is NULL, then daemon will keep running forever.
+ * If wait_send() returns req, then when req is over, daemon will shutdown.
  */
 struct sock_daemon_funcs {
        void (*startup)(void *private_data);
        void (*reconfigure)(void *private_data);
        void (*shutdown)(void *private_data);
  */
 struct sock_daemon_funcs {
        void (*startup)(void *private_data);
        void (*reconfigure)(void *private_data);
        void (*shutdown)(void *private_data);
+
+       struct tevent_req * (*wait_send)(TALLOC_CTX *mem_ctx,
+                                        struct tevent_context *ev,
+                                        void *private_data);
+       bool (*wait_recv)(struct tevent_req *req, int *perr);
 };
 
 /**
 };
 
 /**
index 036b6ac6f26f25cf935198de97d36ab4a28bd031..72e5532a3321e3d01d9dc930b2c6bd45f31627b4 100755 (executable)
@@ -47,6 +47,7 @@ unit_test sock_daemon_test "$pidfile" "$sockpath" 3
 
 ok <<EOF
 test4[PID]: daemon started, pid=PID
 
 ok <<EOF
 test4[PID]: daemon started, pid=PID
+test4[PID]: Shutting down
 EOF
 unit_test sock_daemon_test "$pidfile" "$sockpath" 4
 
 EOF
 unit_test sock_daemon_test "$pidfile" "$sockpath" 4
 
@@ -61,5 +62,6 @@ unit_test sock_daemon_test "$pidfile" "$sockpath" 5
 ok <<EOF
 test6[PID]: listening on $sockpath
 test6[PID]: daemon started, pid=PID
 ok <<EOF
 test6[PID]: listening on $sockpath
 test6[PID]: daemon started, pid=PID
+test6[PID]: Shutting down
 EOF
 unit_test sock_daemon_test "$pidfile" "$sockpath" 6
 EOF
 unit_test sock_daemon_test "$pidfile" "$sockpath" 6
index 4a085c0a55a0b19408a9e661fe2d52c0ddde29ed..278dcab8ec071b9e059cd673780f298775f1e43b 100644 (file)
@@ -254,17 +254,68 @@ static void test3(TALLOC_CTX *mem_ctx, const char *pidfile,
        assert(ret == -1);
 }
 
        assert(ret == -1);
 }
 
-static void test4_handler(struct tevent_context *ev,
-                         struct tevent_timer *te,
-                         struct timeval curtime,
-                         void *private_data)
+struct test4_wait_state {
+};
+
+static void test4_wait_done(struct tevent_req *subreq);
+
+static struct tevent_req *test4_wait_send(TALLOC_CTX *mem_ctx,
+                                         struct tevent_context *ev,
+                                         void *private_data)
 {
 {
-       struct sock_daemon_context *sockd = talloc_get_type_abort(
-               private_data, struct sock_daemon_context);
+       struct tevent_req *req, *subreq;
+       struct test4_wait_state *state;
 
 
-       talloc_free(sockd);
+       req = tevent_req_create(mem_ctx, &state, struct test4_wait_state);
+       if (req == NULL) {
+               return NULL;
+       }
+
+       subreq = tevent_wakeup_send(state, ev,
+                                   tevent_timeval_current_ofs(10,0));
+       if (tevent_req_nomem(subreq, req)) {
+               return tevent_req_post(req, ev);
+       }
+       tevent_req_set_callback(subreq, test4_wait_done, req);
+
+       return req;
 }
 
 }
 
+static void test4_wait_done(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(
+               subreq, struct tevent_req);
+       bool status;
+
+       status = tevent_wakeup_recv(subreq);
+       TALLOC_FREE(subreq);
+
+       if (! status) {
+               tevent_req_error(req, EIO);
+       } else {
+               tevent_req_done(req);
+       }
+}
+
+static bool test4_wait_recv(struct tevent_req *req, int *perr)
+{
+       int ret;
+
+       if (tevent_req_is_unix_error(req, &ret)) {
+               if (perr != NULL) {
+                       *perr = ret;
+               }
+               return false;
+       }
+
+       return true;
+}
+
+static struct sock_daemon_funcs test4_funcs = {
+       .wait_send = test4_wait_send,
+       .wait_recv = test4_wait_recv,
+};
+
 static void test4(TALLOC_CTX *mem_ctx, const char *pidfile,
                  const char *sockpath)
 {
 static void test4(TALLOC_CTX *mem_ctx, const char *pidfile,
                  const char *sockpath)
 {
@@ -278,19 +329,14 @@ static void test4(TALLOC_CTX *mem_ctx, const char *pidfile,
        if (pid == 0) {
                struct tevent_context *ev;
                struct sock_daemon_context *sockd;
        if (pid == 0) {
                struct tevent_context *ev;
                struct sock_daemon_context *sockd;
-               struct tevent_timer *te;
 
                ev = tevent_context_init(mem_ctx);
                assert(ev != NULL);
 
                ret = sock_daemon_setup(mem_ctx, "test4", "file:", "NOTICE",
 
                ev = tevent_context_init(mem_ctx);
                assert(ev != NULL);
 
                ret = sock_daemon_setup(mem_ctx, "test4", "file:", "NOTICE",
-                                       NULL, NULL, NULL, &sockd);
+                                       pidfile, &test4_funcs, NULL, &sockd);
                assert(ret == 0);
 
                assert(ret == 0);
 
-               te = tevent_add_timer(ev, ev, tevent_timeval_current_ofs(10,0),
-                                     test4_handler, sockd);
-               assert(te != NULL);
-
                ret = sock_daemon_run(ev, sockd, -1);
                assert(ret == 0);
 
                ret = sock_daemon_run(ev, sockd, -1);
                assert(ret == 0);
 
@@ -666,7 +712,7 @@ static void test6_client(const char *sockpath)
 
 struct test6_server_state {
        struct sock_daemon_context *sockd;
 
 struct test6_server_state {
        struct sock_daemon_context *sockd;
-       int done;
+       int fd, done;
 };
 
 struct test6_read_state {
 };
 
 struct test6_read_state {
@@ -752,35 +798,90 @@ static struct sock_socket_funcs test6_client_funcs = {
 
 static void test6_startup(void *private_data)
 {
 
 static void test6_startup(void *private_data)
 {
-       int fd = *(int *)private_data;
+       struct test6_server_state *server_state =
+               (struct test6_server_state *)private_data;
        int ret = 1;
        ssize_t nwritten;
 
        int ret = 1;
        ssize_t nwritten;
 
-       nwritten = write(fd, &ret, sizeof(ret));
+       nwritten = write(server_state->fd, &ret, sizeof(ret));
        assert(nwritten == sizeof(ret));
        assert(nwritten == sizeof(ret));
-       close(fd);
+       close(server_state->fd);
+       server_state->fd = -1;
 }
 
 }
 
-static struct sock_daemon_funcs test6_funcs = {
-       .startup = test6_startup,
+struct test6_wait_state {
+       struct test6_server_state *server_state;
 };
 
 };
 
-static void test6_handler(struct tevent_context *ev,
-                         struct tevent_timer *te,
-                         struct timeval curtime,
-                         void *private_data)
+static void test6_wait_done(struct tevent_req *subreq);
+
+static struct tevent_req *test6_wait_send(TALLOC_CTX *mem_ctx,
+                                         struct tevent_context *ev,
+                                         void *private_data)
 {
 {
-       struct test6_server_state *state =
-               (struct test6_server_state *)private_data;
+       struct tevent_req *req, *subreq;
+       struct test6_wait_state *state;
+
+       req = tevent_req_create(mem_ctx, &state, struct test6_wait_state);
+       if (req == NULL) {
+               return NULL;
+       }
+
+       state->server_state = (struct test6_server_state *)private_data;
 
 
-       if (state->done == 0) {
-               kill(0, SIGTERM);
+       subreq = tevent_wakeup_send(state, ev,
+                                   tevent_timeval_current_ofs(10,0));
+       if (tevent_req_nomem(subreq, req)) {
+               return tevent_req_post(req, ev);
+       }
+       tevent_req_set_callback(subreq, test6_wait_done, req);
+
+       return req;
+}
+
+static void test6_wait_done(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(
+               subreq, struct tevent_req);
+       struct test6_wait_state *state = tevent_req_data(
+               req, struct test6_wait_state);
+       bool status;
+
+       status = tevent_wakeup_recv(subreq);
+       TALLOC_FREE(subreq);
+       if (! status) {
+               tevent_req_error(req, EIO);
                return;
        }
 
                return;
        }
 
-       talloc_free(state->sockd);
+       if (state->server_state->done == 0) {
+               tevent_req_error(req, EIO);
+               return;
+       }
+
+       tevent_req_done(req);
 }
 
 }
 
+static bool test6_wait_recv(struct tevent_req *req, int *perr)
+{
+       int ret;
+
+       if (tevent_req_is_unix_error(req, &ret)) {
+               if (perr != NULL) {
+                       *perr = ret;
+               }
+               return false;
+       }
+
+       return true;
+}
+
+static struct sock_daemon_funcs test6_funcs = {
+       .startup = test6_startup,
+       .wait_send = test6_wait_send,
+       .wait_recv = test6_wait_recv,
+};
+
 static void test6(TALLOC_CTX *mem_ctx, const char *pidfile,
                  const char *sockpath)
 {
 static void test6(TALLOC_CTX *mem_ctx, const char *pidfile,
                  const char *sockpath)
 {
@@ -799,29 +900,27 @@ static void test6(TALLOC_CTX *mem_ctx, const char *pidfile,
        if (pid_server == 0) {
                struct tevent_context *ev;
                struct sock_daemon_context *sockd;
        if (pid_server == 0) {
                struct tevent_context *ev;
                struct sock_daemon_context *sockd;
-               struct test6_server_state state;
-               struct tevent_timer *te;
+               struct test6_server_state server_state = { 0 };
 
                close(fd[0]);
 
                ev = tevent_context_init(mem_ctx);
                assert(ev != NULL);
 
 
                close(fd[0]);
 
                ev = tevent_context_init(mem_ctx);
                assert(ev != NULL);
 
+               server_state.fd = fd[1];
+
                ret = sock_daemon_setup(mem_ctx, "test6", "file:", "NOTICE",
                ret = sock_daemon_setup(mem_ctx, "test6", "file:", "NOTICE",
-                                       pidfile, &test6_funcs, &fd[1], &sockd);
+                                       pidfile, &test6_funcs, &server_state,
+                                       &sockd);
                assert(ret == 0);
 
                assert(ret == 0);
 
-               state.sockd = sockd;
-               state.done = 0;
+               server_state.sockd = sockd;
+               server_state.done = 0;
 
                ret = sock_daemon_add_unix(sockd, sockpath,
 
                ret = sock_daemon_add_unix(sockd, sockpath,
-                                          &test6_client_funcs, &state);
+                                          &test6_client_funcs, &server_state);
                assert(ret == 0);
 
                assert(ret == 0);
 
-               te = tevent_add_timer(ev, ev, tevent_timeval_current_ofs(10,0),
-                                     test6_handler, &state);
-               assert(te != NULL);
-
                ret = sock_daemon_run(ev, sockd, pid);
                assert(ret == 0);
 
                ret = sock_daemon_run(ev, sockd, pid);
                assert(ret == 0);