pthreadpool: add pthreadpool_restart_check[_monitor_{fd,drain}]()
authorStefan Metzmacher <metze@samba.org>
Mon, 16 Jul 2018 12:43:01 +0000 (14:43 +0200)
committerRalph Boehme <slow@samba.org>
Tue, 24 Jul 2018 15:38:28 +0000 (17:38 +0200)
This makes it possible to monitor the pthreadpool for exited worker
threads and may restart new threads from the main thread again.

Signed-off-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Ralph Boehme <slow@samba.org>
lib/pthreadpool/pthreadpool.c
lib/pthreadpool/pthreadpool.h
lib/pthreadpool/pthreadpool_sync.c

index 127e684c63e34c27e25aa7624700f5c40bd57134..db3837cbda37076dab99a6d7e86fe575fff1015d 100644 (file)
@@ -23,6 +23,7 @@
 #include "system/threads.h"
 #include "pthreadpool.h"
 #include "lib/util/dlinklist.h"
+#include "lib/util/blocking.h"
 
 #ifdef NDEBUG
 #undef NDEBUG
@@ -52,6 +53,8 @@ struct pthreadpool {
         */
        pthread_cond_t condvar;
 
+       int check_pipefd[2];
+
        /*
         * Array of jobs
         */
@@ -136,6 +139,7 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
 {
        struct pthreadpool *pool;
        int ret;
+       bool ok;
 
        pool = (struct pthreadpool *)malloc(sizeof(struct pthreadpool));
        if (pool == NULL) {
@@ -153,10 +157,52 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
                return ENOMEM;
        }
 
+       ret = pipe(pool->check_pipefd);
+       if (ret != 0) {
+               free(pool->jobs);
+               free(pool);
+               return ENOMEM;
+       }
+
+       ok = smb_set_close_on_exec(pool->check_pipefd[0]);
+       if (!ok) {
+               close(pool->check_pipefd[0]);
+               close(pool->check_pipefd[1]);
+               free(pool->jobs);
+               free(pool);
+               return EINVAL;
+       }
+       ok = smb_set_close_on_exec(pool->check_pipefd[1]);
+       if (!ok) {
+               close(pool->check_pipefd[0]);
+               close(pool->check_pipefd[1]);
+               free(pool->jobs);
+               free(pool);
+               return EINVAL;
+       }
+       ret = set_blocking(pool->check_pipefd[0], true);
+       if (ret == -1) {
+               close(pool->check_pipefd[0]);
+               close(pool->check_pipefd[1]);
+               free(pool->jobs);
+               free(pool);
+               return EINVAL;
+       }
+       ret = set_blocking(pool->check_pipefd[1], false);
+       if (ret == -1) {
+               close(pool->check_pipefd[0]);
+               close(pool->check_pipefd[1]);
+               free(pool->jobs);
+               free(pool);
+               return EINVAL;
+       }
+
        pool->head = pool->num_jobs = 0;
 
        ret = pthread_mutex_init(&pool->mutex, NULL);
        if (ret != 0) {
+               close(pool->check_pipefd[0]);
+               close(pool->check_pipefd[1]);
                free(pool->jobs);
                free(pool);
                return ret;
@@ -165,6 +211,8 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
        ret = pthread_cond_init(&pool->condvar, NULL);
        if (ret != 0) {
                pthread_mutex_destroy(&pool->mutex);
+               close(pool->check_pipefd[0]);
+               close(pool->check_pipefd[1]);
                free(pool->jobs);
                free(pool);
                return ret;
@@ -174,6 +222,8 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
        if (ret != 0) {
                pthread_cond_destroy(&pool->condvar);
                pthread_mutex_destroy(&pool->mutex);
+               close(pool->check_pipefd[0]);
+               close(pool->check_pipefd[1]);
                free(pool->jobs);
                free(pool);
                return ret;
@@ -196,6 +246,8 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
                pthread_mutex_destroy(&pool->fork_mutex);
                pthread_cond_destroy(&pool->condvar);
                pthread_mutex_destroy(&pool->mutex);
+               close(pool->check_pipefd[0]);
+               close(pool->check_pipefd[1]);
                free(pool->jobs);
                free(pool);
                return ret;
@@ -359,6 +411,14 @@ static void pthreadpool_child(void)
                pool->head = 0;
                pool->num_jobs = 0;
                pool->stopped = true;
+               if (pool->check_pipefd[0] != -1) {
+                       close(pool->check_pipefd[0]);
+                       pool->check_pipefd[0] = -1;
+               }
+               if (pool->check_pipefd[1] != -1) {
+                       close(pool->check_pipefd[1]);
+                       pool->check_pipefd[1] = -1;
+               }
 
                ret = pthread_cond_init(&pool->condvar, NULL);
                assert(ret == 0);
@@ -421,6 +481,14 @@ static int pthreadpool_free(struct pthreadpool *pool)
                return ret2;
        }
 
+       if (pool->check_pipefd[0] != -1) {
+               close(pool->check_pipefd[0]);
+               pool->check_pipefd[0] = -1;
+       }
+       if (pool->check_pipefd[1] != -1) {
+               close(pool->check_pipefd[1]);
+               pool->check_pipefd[1] = -1;
+       }
        free(pool->jobs);
        free(pool);
 
@@ -437,6 +505,15 @@ static int pthreadpool_stop_locked(struct pthreadpool *pool)
 
        pool->stopped = true;
 
+       if (pool->check_pipefd[0] != -1) {
+               close(pool->check_pipefd[0]);
+               pool->check_pipefd[0] = -1;
+       }
+       if (pool->check_pipefd[1] != -1) {
+               close(pool->check_pipefd[1]);
+               pool->check_pipefd[1] = -1;
+       }
+
        if (pool->num_threads == 0) {
                return 0;
        }
@@ -521,6 +598,33 @@ static void pthreadpool_server_exit(struct pthreadpool *pool)
 
        free_it = (pool->destroyed && (pool->num_threads == 0));
 
+       while (true) {
+               uint8_t c = 0;
+               ssize_t nwritten = 0;
+
+               if (pool->check_pipefd[1] == -1) {
+                       break;
+               }
+
+               nwritten = write(pool->check_pipefd[1], &c, 1);
+               if (nwritten == -1) {
+                       if (errno == EINTR) {
+                               continue;
+                       }
+                       if (errno == EAGAIN) {
+                               break;
+                       }
+#ifdef EWOULDBLOCK
+                       if (errno == EWOULDBLOCK) {
+                               break;
+                       }
+#endif
+                       /* ignore ... */
+               }
+
+               break;
+       }
+
        ret = pthread_mutex_unlock(&pool->mutex);
        assert(ret == 0);
 
@@ -851,6 +955,183 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
        return res;
 }
 
+int pthreadpool_restart_check(struct pthreadpool *pool)
+{
+       int res;
+       int unlock_res;
+       unsigned possible_threads = 0;
+       unsigned missing_threads = 0;
+
+       assert(!pool->destroyed);
+
+       res = pthread_mutex_lock(&pool->mutex);
+       if (res != 0) {
+               return res;
+       }
+
+       if (pool->stopped) {
+               /*
+                * Protect against the pool being shut down while
+                * trying to add a job
+                */
+               unlock_res = pthread_mutex_unlock(&pool->mutex);
+               assert(unlock_res == 0);
+               return EINVAL;
+       }
+
+       if (pool->num_jobs == 0) {
+               /*
+                * This also handles the pool->max_threads == 0 case as it never
+                * calls pthreadpool_put_job()
+                */
+               unlock_res = pthread_mutex_unlock(&pool->mutex);
+               assert(unlock_res == 0);
+               return 0;
+       }
+
+       if (pool->num_idle > 0) {
+               /*
+                * We have idle threads and pending jobs,
+                * this means we better let all threads
+                * start and check for pending jobs.
+                */
+               res = pthread_cond_broadcast(&pool->condvar);
+               assert(res == 0);
+       }
+
+       if (pool->num_threads < pool->max_threads) {
+               possible_threads = pool->max_threads - pool->num_threads;
+       }
+
+       if (pool->num_idle < pool->num_jobs) {
+               missing_threads = pool->num_jobs - pool->num_idle;
+       }
+
+       missing_threads = MIN(missing_threads, possible_threads);
+
+       while (missing_threads > 0) {
+
+               res = pthreadpool_create_thread(pool);
+               if (res != 0) {
+                       break;
+               }
+
+               missing_threads--;
+       }
+
+       if (missing_threads == 0) {
+               /*
+                * Ok, we recreated all thread we need.
+                */
+               unlock_res = pthread_mutex_unlock(&pool->mutex);
+               assert(unlock_res == 0);
+               return 0;
+       }
+
+       if (pool->num_threads != 0) {
+               /*
+                * At least one thread is still available, let
+                * that one run the queued jobs.
+                */
+               unlock_res = pthread_mutex_unlock(&pool->mutex);
+               assert(unlock_res == 0);
+               return 0;
+       }
+
+       /*
+        * There's no thread available to run any pending jobs.
+        * The caller may want to cancel the jobs and destroy the pool.
+        * But that's up to the caller.
+        */
+       unlock_res = pthread_mutex_unlock(&pool->mutex);
+       assert(unlock_res == 0);
+
+       return res;
+}
+
+int pthreadpool_restart_check_monitor_fd(struct pthreadpool *pool)
+{
+       int fd;
+       int ret;
+       bool ok;
+
+       if (pool->stopped) {
+               errno = EINVAL;
+               return -1;
+       }
+
+       if (pool->check_pipefd[0] == -1) {
+               errno = ENOSYS;
+               return -1;
+       }
+
+       fd = dup(pool->check_pipefd[0]);
+       if (fd == -1) {
+               return -1;
+       }
+
+       ok = smb_set_close_on_exec(fd);
+       if (!ok) {
+               int saved_errno = errno;
+               close(fd);
+               errno = saved_errno;
+               return -1;
+       }
+
+       ret = set_blocking(fd, false);
+       if (ret == -1) {
+               int saved_errno = errno;
+               close(fd);
+               errno = saved_errno;
+               return -1;
+       }
+
+       return fd;
+}
+
+int pthreadpool_restart_check_monitor_drain(struct pthreadpool *pool)
+{
+       if (pool->stopped) {
+               return EINVAL;
+       }
+
+       if (pool->check_pipefd[0] == -1) {
+               return ENOSYS;
+       }
+
+       while (true) {
+               uint8_t buf[128];
+               ssize_t nread;
+
+               nread = read(pool->check_pipefd[0], buf, sizeof(buf));
+               if (nread == -1) {
+                       if (errno == EINTR) {
+                               continue;
+                       }
+                       if (errno == EAGAIN) {
+                               return 0;
+                       }
+#ifdef EWOULDBLOCK
+                       if (errno == EWOULDBLOCK) {
+                               return 0;
+                       }
+#endif
+                       if (errno == 0) {
+                               errno = INT_MAX;
+                       }
+
+                       return errno;
+               }
+
+               if (nread < sizeof(buf)) {
+                       return 0;
+               }
+       }
+
+       abort();
+       return INT_MAX;
+}
+
 size_t pthreadpool_cancel_job(struct pthreadpool *pool, int job_id,
                              void (*fn)(void *private_data), void *private_data)
 {
index d8daf9e4519b95e68b17fb1d085c5e1611e9ba9e..543567ceaf78e177e0eb909fc2f92f2e7d419d5c 100644 (file)
@@ -144,6 +144,70 @@ int pthreadpool_destroy(struct pthreadpool *pool);
 int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
                        void (*fn)(void *private_data), void *private_data);
 
+/**
+ * @brief Check if the pthreadpool needs a restart.
+ *
+ * This checks if there are enough threads to run the already
+ * queued jobs. This should be called only the callers signal_fn
+ * (passed to pthreadpool_init()) returned an error, so
+ * that the job's worker thread exited.
+ *
+ * Typically this is called once the file destriptor
+ * returned by pthreadpool_restart_check_monitor_fd()
+ * became readable and pthreadpool_restart_check_monitor_drain()
+ * returned success.
+ *
+ * This function tries to restart the missing threads.
+ *
+ * @param[in]  pool            The pool to run the job on
+ * @return                     success: 0, failure: errno
+ *
+ * @see pthreadpool_restart_check_monitor_fd
+ * @see pthreadpool_restart_check_monitor_drain
+ */
+int pthreadpool_restart_check(struct pthreadpool *pool);
+
+/**
+ * @brief Return a file destriptor that monitors the pool.
+ *
+ * If the file destrictor becomes readable,
+ * the event handler should call pthreadpool_restart_check_monitor_drain().
+ *
+ * pthreadpool_restart_check() should also be called once the
+ * state is drained.
+ *
+ * This function returns a fresh fd using dup() each time.
+ *
+ * If the pool doesn't require restarts, this function
+ * returns -1 and sets errno = ENOSYS. The caller
+ * may ignore that situation.
+ *
+ * @param[in]  pool            The pool to run the job on
+ * @return                     success: 0, failure: -1 (set errno)
+ *
+ * @see pthreadpool_restart_check_monitor_fd
+ * @see pthreadpool_restart_check_monitor_drain
+ */
+int pthreadpool_restart_check_monitor_fd(struct pthreadpool *pool);
+
+/**
+ * @brief Drain the monitor file destriptor of the pool.
+ *
+ * If the file destrictor returned by pthreadpool_restart_check_monitor_fd()
+ * becomes readable, pthreadpool_restart_check_monitor_drain() should be
+ * called before pthreadpool_restart_check().
+ *
+ * If this function returns an error the caller should close
+ * the file destriptor it got from pthreadpool_restart_check_monitor_fd().
+ *
+ * @param[in]  pool            The pool to run the job on
+ * @return                     success: 0, failure: errno
+ *
+ * @see pthreadpool_restart_check_monitor_fd
+ * @see pthreadpool_restart_check
+ */
+int pthreadpool_restart_check_monitor_drain(struct pthreadpool *pool);
+
 /**
  * @brief Try to cancel a job in a pthreadpool
  *
index 2ed6f36dbbc70745cf46ba5ca01296b6ab156db9..a476ea712c3a586fb40b736af8f2a701cd24aa54 100644 (file)
@@ -83,6 +83,26 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
                               pool->signal_fn_private_data);
 }
 
+int pthreadpool_restart_check(struct pthreadpool *pool)
+{
+       if (pool->stopped) {
+               return EINVAL;
+       }
+
+       return 0;
+}
+
+int pthreadpool_restart_check_monitor_fd(struct pthreadpool *pool)
+{
+       errno = ENOSYS;
+       return -1;
+}
+
+int pthreadpool_restart_check_monitor_drain(struct pthreadpool *pool)
+{
+       return EINVAL;
+}
+
 size_t pthreadpool_cancel_job(struct pthreadpool *pool, int job_id,
                              void (*fn)(void *private_data), void *private_data)
 {