ctdb-common: Add special monitor handling to run_event abstraction
authorAmitay Isaacs <amitay@gmail.com>
Wed, 8 Nov 2017 08:31:05 +0000 (19:31 +1100)
committerMartin Schwenke <martins@samba.org>
Fri, 24 Nov 2017 10:49:19 +0000 (11:49 +0100)
Signed-off-by: Amitay Isaacs <amitay@gmail.com>
Reviewed-by: Martin Schwenke <martin@meltin.net>
ctdb/common/run_event.c

index 0961d657a743fabfd31b4e629a4fd70e2081fdca..e230b12f151ab0b16a3b0792d9ffa1aab23e4b6a 100644 (file)
@@ -271,6 +271,10 @@ struct run_event_context {
        const char *script_dir;
        const char *debug_prog;
        bool debug_running;
+
+       struct tevent_queue *queue;
+       struct tevent_req *current_req;
+       bool monitor_running;
 };
 
 
@@ -321,6 +325,14 @@ int run_event_init(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
 
        run_ctx->debug_running = false;
 
+       run_ctx->queue = tevent_queue_create(run_ctx, "run event queue");
+       if (run_ctx->queue == NULL) {
+               talloc_free(run_ctx);
+               return ENOMEM;
+       }
+
+       run_ctx->monitor_running = false;
+
        *out = run_ctx;
        return 0;
 }
@@ -341,6 +353,32 @@ static const char *run_event_debug_prog(struct run_event_context *run_ctx)
        return run_ctx->debug_prog;
 }
 
+static struct tevent_queue *run_event_queue(struct run_event_context *run_ctx)
+{
+       return run_ctx->queue;
+}
+
+static void run_event_start_running(struct run_event_context *run_ctx,
+                                   struct tevent_req *req, bool is_monitor)
+{
+       run_ctx->current_req = req;
+       run_ctx->monitor_running = is_monitor;
+}
+
+static void run_event_stop_running(struct run_event_context *run_ctx)
+{
+       run_ctx->current_req = NULL;
+       run_ctx->monitor_running = false;
+}
+
+static struct tevent_req *run_event_get_running(
+                               struct run_event_context *run_ctx,
+                               bool *is_monitor)
+{
+       *is_monitor = run_ctx->monitor_running;
+       return run_ctx->current_req;
+}
+
 static int run_event_script_status(struct run_event_script *script)
 {
        int ret;
@@ -583,14 +621,18 @@ struct run_event_state {
        struct tevent_context *ev;
        struct run_event_context *run_ctx;
        const char *event_str;
+       const char *arg_str;
        struct timeval timeout;
 
        struct run_event_script_list *script_list;
        const char **argv;
+       struct tevent_req *script_subreq;
        int index;
-       int status;
+       bool cancelled;
 };
 
+static void run_event_cancel(struct tevent_req *req);
+static void run_event_trigger(struct tevent_req *req, void *private_data);
 static struct tevent_req *run_event_run_script(struct tevent_req *req);
 static void run_event_next_script(struct tevent_req *subreq);
 static void run_event_debug(struct tevent_req *req, pid_t pid);
@@ -603,9 +645,9 @@ struct tevent_req *run_event_send(TALLOC_CTX *mem_ctx,
                                  const char *arg_str,
                                  struct timeval timeout)
 {
-       struct tevent_req *req, *subreq;
+       struct tevent_req *req, *current_req;
        struct run_event_state *state;
-       int ret;
+       bool monitor_running, status;
 
        req = tevent_req_create(mem_ctx, &state, struct run_event_state);
        if (req == NULL) {
@@ -618,39 +660,113 @@ struct tevent_req *run_event_send(TALLOC_CTX *mem_ctx,
        if (tevent_req_nomem(state->event_str, req)) {
                return tevent_req_post(req, ev);
        }
+       if (arg_str != NULL) {
+               state->arg_str = talloc_strdup(state, arg_str);
+               if (tevent_req_nomem(state->arg_str, req)) {
+                       return tevent_req_post(req, ev);
+               }
+       }
        state->timeout = timeout;
+       state->cancelled = false;
+
+       /*
+        * If monitor event is running,
+        *   cancel the running monitor event and run new event
+        *
+        * If any other event is running,
+        *   if new event is monitor, cancel that event
+        *   else add new event to the queue
+        */
+
+       current_req = run_event_get_running(run_ctx, &monitor_running);
+       if (current_req != NULL) {
+               if (monitor_running) {
+                       run_event_cancel(current_req);
+               } else if (strcmp(event_str, "monitor") == 0) {
+                       state->script_list = talloc_zero(
+                               state, struct run_event_script_list);
+                       if (tevent_req_nomem(state->script_list, req)) {
+                               return tevent_req_post(req, ev);
+                       }
+                       state->script_list->summary = -ECANCELED;
+                       tevent_req_done(req);
+                       return tevent_req_post(req, ev);
+               }
+       }
+
+       status = tevent_queue_add(run_event_queue(run_ctx), ev, req,
+                                 run_event_trigger, NULL);
+       if (! status) {
+               tevent_req_error(req, ENOMEM);
+               return tevent_req_post(req, ev);
+       }
+
+       return req;
+}
 
-       ret = get_script_list(state, run_event_script_dir(run_ctx),
+static void run_event_cancel(struct tevent_req *req)
+{
+       struct run_event_state *state = tevent_req_data(
+               req, struct run_event_state);
+
+       run_event_stop_running(state->run_ctx);
+
+       state->script_list->summary = -ECANCELED;
+       state->cancelled = true;
+
+       TALLOC_FREE(state->script_subreq);
+
+       tevent_req_done(req);
+}
+
+static void run_event_trigger(struct tevent_req *req, void *private_data)
+{
+       struct tevent_req *subreq;
+       struct run_event_state *state = tevent_req_data(
+               req, struct run_event_state);
+       int ret;
+       bool is_monitor = false;
+
+       D_DEBUG("Running event %s with args \"%s\"\n", state->event_str,
+               state->arg_str == NULL ? "(null)" : state->arg_str);
+
+       ret = get_script_list(state, run_event_script_dir(state->run_ctx),
                              &state->script_list);
        if (ret != 0) {
                D_ERR("get_script_list() failed, ret=%d\n", ret);
                tevent_req_error(req, ret);
-               return tevent_req_post(req, ev);
+               return;
        }
 
        /* No scripts */
        if (state->script_list == NULL ||
            state->script_list->num_scripts == 0) {
                tevent_req_done(req);
-               return tevent_req_post(req, ev);
+               return;
        }
 
-       ret = script_args(state, event_str, arg_str, &state->argv);
+       ret = script_args(state, state->event_str, state->arg_str,
+                         &state->argv);
        if (ret != 0) {
                D_ERR("script_args() failed, ret=%d\n", ret);
                tevent_req_error(req, ret);
-               return tevent_req_post(req, ev);
+               return;
        }
 
        state->index = 0;
 
        subreq = run_event_run_script(req);
        if (tevent_req_nomem(subreq, req)) {
-               return tevent_req_post(req, ev);
+               return;
        }
        tevent_req_set_callback(subreq, run_event_next_script, req);
 
-       return req;
+       state->script_subreq = subreq;
+
+       if (strcmp(state->event_str, "monitor") == 0) {
+               is_monitor = true;
+       }
+       run_event_start_running(state->run_ctx, req, is_monitor);
 }
 
 static struct tevent_req *run_event_run_script(struct tevent_req *req)
@@ -702,12 +818,17 @@ static void run_event_next_script(struct tevent_req *subreq)
        status = run_proc_recv(subreq, &ret, &script->result, &pid,
                               state->script_list, &script->output);
        TALLOC_FREE(subreq);
+       state->script_subreq = NULL;
        if (! status) {
                D_ERR("run_proc failed for %s, ret=%d\n", script->name, ret);
                tevent_req_error(req, ret);
                return;
        }
 
+       if (state->cancelled) {
+               return;
+       }
+
        /* Log output */
        if (script->output != NULL) {
                debug_log(DEBUG_ERR, script->output, script->name);
@@ -731,6 +852,7 @@ static void run_event_next_script(struct tevent_req *subreq)
                D_NOTICE("%s event %s\n", state->event_str,
                         (script->summary == -ETIME) ? "timed out" : "failed");
 
+               run_event_stop_running(state->run_ctx);
                tevent_req_done(req);
                return;
        }
@@ -739,6 +861,7 @@ static void run_event_next_script(struct tevent_req *subreq)
 
        /* All scripts executed */
        if (state->index >= state->script_list->num_scripts) {
+               run_event_stop_running(state->run_ctx);
                tevent_req_done(req);
                return;
        }
@@ -748,6 +871,8 @@ static void run_event_next_script(struct tevent_req *subreq)
                return;
        }
        tevent_req_set_callback(subreq, run_event_next_script, req);
+
+       state->script_subreq = subreq;
 }
 
 static void run_event_debug(struct tevent_req *req, pid_t pid)