lib: Move pipe signalling to pthreadpool_pipe.c
authorVolker Lendecke <vl@samba.org>
Mon, 15 Aug 2016 11:59:12 +0000 (13:59 +0200)
committerJeremy Allison <jra@samba.org>
Tue, 23 Aug 2016 23:33:48 +0000 (01:33 +0200)
Signed-off-by: Volker Lendecke <vl@samba.org>
Reviewed-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Jeremy Allison <jra@samba.org>
source3/lib/pthreadpool/pthreadpool.c
source3/lib/pthreadpool/pthreadpool.h
source3/lib/pthreadpool/pthreadpool_pipe.c
source3/lib/pthreadpool/pthreadpool_sync.c

index b071e5393db5b20d86a6064a9465037f1956e3df..4c2858a0dee6f0961fe3364930387f251703f58a 100644 (file)
@@ -19,7 +19,6 @@
 
 #include "replace.h"
 #include "system/time.h"
-#include "system/filesys.h"
 #include "system/wait.h"
 #include "system/threads.h"
 #include "pthreadpool.h"
@@ -58,9 +57,10 @@ struct pthreadpool {
        size_t num_jobs;
 
        /*
-        * pipe for signalling
+        * Indicate job completion
         */
-       int sig_pipe[2];
+       int (*signal_fn)(int jobid, void *private_data);
+       void *signal_private_data;
 
        /*
         * indicator to worker threads that they should shut down
@@ -99,7 +99,9 @@ static void pthreadpool_prep_atfork(void);
  * Initialize a thread pool
  */
 
-int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
+int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
+                    int (*signal_fn)(int jobid, void *private_data),
+                    void *signal_private_data)
 {
        struct pthreadpool *pool;
        int ret;
@@ -108,6 +110,8 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
        if (pool == NULL) {
                return ENOMEM;
        }
+       pool->signal_fn = signal_fn;
+       pool->signal_private_data = signal_private_data;
 
        pool->jobs_array_len = 4;
        pool->jobs = calloc(
@@ -120,18 +124,8 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
 
        pool->head = pool->num_jobs = 0;
 
-       ret = pipe(pool->sig_pipe);
-       if (ret == -1) {
-               int err = errno;
-               free(pool->jobs);
-               free(pool);
-               return err;
-       }
-
        ret = pthread_mutex_init(&pool->mutex, NULL);
        if (ret != 0) {
-               close(pool->sig_pipe[0]);
-               close(pool->sig_pipe[1]);
                free(pool->jobs);
                free(pool);
                return ret;
@@ -140,8 +134,6 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
        ret = pthread_cond_init(&pool->condvar, NULL);
        if (ret != 0) {
                pthread_mutex_destroy(&pool->mutex);
-               close(pool->sig_pipe[0]);
-               close(pool->sig_pipe[1]);
                free(pool->jobs);
                free(pool);
                return ret;
@@ -158,8 +150,6 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
        if (ret != 0) {
                pthread_cond_destroy(&pool->condvar);
                pthread_mutex_destroy(&pool->mutex);
-               close(pool->sig_pipe[0]);
-               close(pool->sig_pipe[1]);
                free(pool->jobs);
                free(pool);
                return ret;
@@ -218,12 +208,6 @@ static void pthreadpool_child(void)
             pool != NULL;
             pool = DLIST_PREV(pool)) {
 
-               close(pool->sig_pipe[0]);
-               close(pool->sig_pipe[1]);
-
-               ret = pipe(pool->sig_pipe);
-               assert(ret == 0);
-
                pool->num_threads = 0;
 
                pool->num_exited = 0;
@@ -248,16 +232,6 @@ static void pthreadpool_prep_atfork(void)
                       pthreadpool_child);
 }
 
-/*
- * Return the file descriptor which becomes readable when a job has
- * finished
- */
-
-int pthreadpool_signal_fd(struct pthreadpool *pool)
-{
-       return pool->sig_pipe[0];
-}
-
 /*
  * Do a pthread_join() on all children that have exited, pool->mutex must be
  * locked
@@ -286,32 +260,6 @@ static void pthreadpool_join_children(struct pthreadpool *pool)
         */
 }
 
-/*
- * Fetch a finished job number from the signal pipe
- */
-
-int pthreadpool_finished_jobs(struct pthreadpool *pool, int *jobids,
-                             unsigned num_jobids)
-{
-       ssize_t to_read, nread;
-
-       nread = -1;
-       errno = EINTR;
-
-       to_read = sizeof(int) * num_jobids;
-
-       while ((nread == -1) && (errno == EINTR)) {
-               nread = read(pool->sig_pipe[0], jobids, to_read);
-       }
-       if (nread == -1) {
-               return -errno;
-       }
-       if ((nread % sizeof(int)) != 0) {
-               return -EINVAL;
-       }
-       return nread / sizeof(int);
-}
-
 /*
  * Destroy a thread pool, finishing all threads working for it
  */
@@ -390,12 +338,6 @@ int pthreadpool_destroy(struct pthreadpool *pool)
        ret = pthread_mutex_unlock(&pthreadpools_mutex);
        assert(ret == 0);
 
-       close(pool->sig_pipe[0]);
-       pool->sig_pipe[0] = -1;
-
-       close(pool->sig_pipe[1]);
-       pool->sig_pipe[1] = -1;
-
        free(pool->exited);
        free(pool->jobs);
        free(pool);
@@ -527,8 +469,7 @@ static void *pthreadpool_server(void *arg)
                }
 
                if (pthreadpool_get_job(pool, &job)) {
-                       ssize_t written;
-                       int sig_pipe = pool->sig_pipe[1];
+                       int ret;
 
                        /*
                         * Do the work with the mutex unlocked
@@ -542,8 +483,9 @@ static void *pthreadpool_server(void *arg)
                        res = pthread_mutex_lock(&pool->mutex);
                        assert(res == 0);
 
-                       written = write(sig_pipe, &job.id, sizeof(job.id));
-                       if (written != sizeof(int)) {
+                       ret = pool->signal_fn(job.id,
+                                             pool->signal_private_data);
+                       if (ret != 0) {
                                pthreadpool_server_exit(pool);
                                pthread_mutex_unlock(&pool->mutex);
                                return NULL;
index adb825a528a06f15abb5753430a31ae1bffbdef2..0b8d6e590c8722038271d58af6e76639101d0069 100644 (file)
@@ -43,7 +43,9 @@ struct pthreadpool;
  * max_threads=0 means unlimited parallelism. The caller has to take
  * care to not overload the system.
  */
-int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult);
+int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
+                    int (*signal_fn)(int jobid, void *private_data),
+                    void *signal_private_data);
 
 /**
  * @brief Destroy a pthreadpool
@@ -60,8 +62,8 @@ int pthreadpool_destroy(struct pthreadpool *pool);
  * @brief Add a job to a pthreadpool
  *
  * This adds a job to a pthreadpool. The job can be identified by
- * job_id. This integer will be returned from
- * pthreadpool_finished_jobs() then the job is completed.
+ * job_id. This integer will be passed to signal_fn() when the
+ * job is completed.
  *
  * @param[in]  pool            The pool to run the job on
  * @param[in]  job_id          A custom identifier
@@ -72,30 +74,4 @@ int pthreadpool_destroy(struct pthreadpool *pool);
 int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
                        void (*fn)(void *private_data), void *private_data);
 
-/**
- * @brief Get the signalling fd from a pthreadpool
- *
- * Completion of a job is indicated by readability of the fd returned
- * by pthreadpool_signal_fd().
- *
- * @param[in]  pool            The pool in question
- * @return                     The fd to listen on for readability
- */
-int pthreadpool_signal_fd(struct pthreadpool *pool);
-
-/**
- * @brief Get the job_ids of finished jobs
- *
- * This blocks until a job has finished unless the fd returned by
- * pthreadpool_signal_fd() is readable.
- *
- * @param[in]  pool            The pool to query for finished jobs
- * @param[out]  jobids         The job_ids of the finished job
- * @param[int]  num_jobids      The job_ids array size
- * @return                     success: >=0, number of finished jobs
- *                              failure: -errno
- */
-int pthreadpool_finished_jobs(struct pthreadpool *pool, int *jobids,
-                             unsigned num_jobids);
-
 #endif
index 76bafa2c3ffb94d843a8ba443a75498a4353e3b2..3eaf5e39bd9d544294040d4a456032fb79130377 100644 (file)
 
 struct pthreadpool_pipe {
        struct pthreadpool *pool;
+       pid_t pid;
+       int pipe_fds[2];
 };
 
+static int pthreadpool_pipe_signal(int jobid, void *private_data);
+
 int pthreadpool_pipe_init(unsigned max_threads,
                          struct pthreadpool_pipe **presult)
 {
-       struct pthreadpool_pipe *p;
+       struct pthreadpool_pipe *pool;
        int ret;
 
-       p = malloc(sizeof(struct pthreadpool_pipe));
-       if (p == NULL) {
+       pool = malloc(sizeof(struct pthreadpool_pipe));
+       if (pool == NULL) {
                return ENOMEM;
        }
+       pool->pid = getpid();
+
+       ret = pipe(pool->pipe_fds);
+       if (ret == -1) {
+               int err = errno;
+               free(pool);
+               return err;
+       }
 
-       ret = pthreadpool_init(max_threads, &p->pool);
+       ret = pthreadpool_init(max_threads, &pool->pool,
+                              pthreadpool_pipe_signal, pool);
        if (ret != 0) {
-               free(p);
+               close(pool->pipe_fds[0]);
+               close(pool->pipe_fds[1]);
+               free(pool);
                return ret;
        }
 
-       *presult = p;
+       *presult = pool;
+       return 0;
+}
+
+static int pthreadpool_pipe_signal(int jobid, void *private_data)
+{
+       struct pthreadpool_pipe *pool = private_data;
+       ssize_t written;
+
+       do {
+               written = write(pool->pipe_fds[1], &jobid, sizeof(jobid));
+       } while ((written == -1) && (errno == EINTR));
+
+       if (written != sizeof(jobid)) {
+               return errno;
+       }
+
        return 0;
 }
 
@@ -55,30 +86,91 @@ int pthreadpool_pipe_destroy(struct pthreadpool_pipe *pool)
        if (ret != 0) {
                return ret;
        }
+
+       close(pool->pipe_fds[0]);
+       pool->pipe_fds[0] = -1;
+
+       close(pool->pipe_fds[1]);
+       pool->pipe_fds[1] = -1;
+
        free(pool);
        return 0;
 }
 
+static int pthreadpool_pipe_reinit(struct pthreadpool_pipe *pool)
+{
+       pid_t pid = getpid();
+       int signal_fd;
+       int ret;
+
+       if (pid == pool->pid) {
+               return 0;
+       }
+
+       signal_fd = pool->pipe_fds[0];
+
+       close(pool->pipe_fds[0]);
+       pool->pipe_fds[0] = -1;
+
+       close(pool->pipe_fds[1]);
+       pool->pipe_fds[1] = -1;
+
+       ret = pipe(pool->pipe_fds);
+       if (ret != 0) {
+               return errno;
+       }
+
+       ret = dup2(pool->pipe_fds[0], signal_fd);
+       if (ret != 0) {
+               return errno;
+       }
+
+       pool->pipe_fds[0] = signal_fd;
+
+       return 0;
+}
+
 int pthreadpool_pipe_add_job(struct pthreadpool_pipe *pool, int job_id,
                             void (*fn)(void *private_data),
                             void *private_data)
 {
        int ret;
+
+       ret = pthreadpool_pipe_reinit(pool);
+       if (ret != 0) {
+               return ret;
+       }
+
        ret = pthreadpool_add_job(pool->pool, job_id, fn, private_data);
        return ret;
 }
 
 int pthreadpool_pipe_signal_fd(struct pthreadpool_pipe *pool)
 {
-       int fd;
-       fd = pthreadpool_signal_fd(pool->pool);
-       return fd;
+       return pool->pipe_fds[0];
 }
 
 int pthreadpool_pipe_finished_jobs(struct pthreadpool_pipe *pool, int *jobids,
                                   unsigned num_jobids)
 {
-       int ret;
-       ret = pthreadpool_finished_jobs(pool->pool, jobids, num_jobids);
-       return ret;
+       ssize_t to_read, nread;
+       pid_t pid = getpid();
+
+       if (pool->pid != pid) {
+               return EINVAL;
+       }
+
+       to_read = sizeof(int) * num_jobids;
+
+       do {
+               nread = read(pool->pipe_fds[0], jobids, to_read);
+       } while ((nread == -1) && (errno == EINTR));
+
+       if (nread == -1) {
+               return -errno;
+       }
+       if ((nread % sizeof(int)) != 0) {
+               return -EINVAL;
+       }
+       return nread / sizeof(int);
 }
index 5f06cae2f8c26d29782191c3235e918f302fba7d..3e78f467179b1847e3748bf61a315669e60b548e 100644 (file)
  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
  */
 
-#include <errno.h>
-#include <stdio.h>
-#include <unistd.h>
-#include <stdlib.h>
-#include <string.h>
-#include <signal.h>
-#include <assert.h>
-#include <fcntl.h>
-#include <sys/time.h>
 
+#include "replace.h"
 #include "pthreadpool.h"
 
 struct pthreadpool {
        /*
-        * pipe for signalling
+        * Indicate job completion
         */
-       int sig_pipe[2];
-
-       /*
-        * Have we sent something into the pipe that has not been
-        * retrieved yet?
-        */
-       int pipe_busy;
-
-       /*
-        * Jobids that we have not sent into the pipe yet
-        */
-       size_t num_ids;
-       int *ids;
+       int (*signal_fn)(int jobid,
+                        void *private_data);
+       void *signal_private_data;
 };
 
-int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
+int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
+                    int (*signal_fn)(int jobid,
+                                     void *private_data),
+                    void *signal_private_data)
 {
        struct pthreadpool *pool;
-       int ret;
 
        pool = (struct pthreadpool *)calloc(1, sizeof(struct pthreadpool));
        if (pool == NULL) {
                return ENOMEM;
        }
-       ret = pipe(pool->sig_pipe);
-       if (ret == -1) {
-               int err = errno;
-               free(pool);
-               return err;
-       }
-       *presult = pool;
-       return 0;
-}
-
-int pthreadpool_signal_fd(struct pthreadpool *pool)
-{
-       return pool->sig_pipe[0];
-}
-
-static int pthreadpool_write_to_pipe(struct pthreadpool *pool)
-{
-       ssize_t written;
-
-       if (pool->pipe_busy) {
-               return 0;
-       }
-       if (pool->num_ids == 0) {
-               return 0;
-       }
+       pool->signal_fn = signal_fn;
+       pool->signal_private_data = signal_private_data;
 
-       written = -1;
-       errno = EINTR;
-
-       while ((written == -1) && (errno == EINTR)) {
-               written = write(pool->sig_pipe[1], &pool->ids[0], sizeof(int));
-       }
-       if (written == -1) {
-               return errno;
-       }
-       if (written != sizeof(int)) {
-               /*
-                * If a single int only partially fits into the pipe,
-                * we can assume ourselves pretty broken
-                */
-               close(pool->sig_pipe[1]);
-               pool->sig_pipe[1] = -1;
-               return EIO;
-       }
-
-       if (pool->num_ids > 1) {
-               memmove(pool->ids, pool->ids+1, sizeof(int) * (pool->num_ids-1));
-       }
-       pool->num_ids -= 1;
-       pool->pipe_busy = 1;
+       *presult = pool;
        return 0;
 }
 
 int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
                        void (*fn)(void *private_data), void *private_data)
 {
-       int *tmp;
-
-       if (pool->sig_pipe[1] == -1) {
-               return EIO;
-       }
-
        fn(private_data);
 
-       tmp = realloc(pool->ids, sizeof(int) * (pool->num_ids+1));
-       if (tmp == NULL) {
-               return ENOMEM;
-       }
-       pool->ids = tmp;
-       pool->ids[pool->num_ids] = job_id;
-       pool->num_ids += 1;
-
-       return pthreadpool_write_to_pipe(pool);
-
-}
-
-int pthreadpool_finished_jobs(struct pthreadpool *pool, int *jobids,
-                             unsigned num_jobids)
-{
-       ssize_t to_read, nread;
-       int ret;
-
-       nread = -1;
-       errno = EINTR;
-
-       to_read = sizeof(int) * num_jobids;
-
-       while ((nread == -1) && (errno == EINTR)) {
-               nread = read(pool->sig_pipe[0], jobids, to_read);
-       }
-       if (nread == -1) {
-               return -errno;
-       }
-       if ((nread % sizeof(int)) != 0) {
-               return -EINVAL;
-       }
-
-       pool->pipe_busy = 0;
-
-       ret = pthreadpool_write_to_pipe(pool);
-       if (ret != 0) {
-               return -ret;
-       }
-
-       return nread / sizeof(int);
+       return pool->signal_fn(job_id, pool->signal_private_data);
 }
 
 int pthreadpool_destroy(struct pthreadpool *pool)
 {
-       if (pool->sig_pipe[0] != -1) {
-               close(pool->sig_pipe[0]);
-               pool->sig_pipe[0] = -1;
-       }
-
-       if (pool->sig_pipe[1] != -1) {
-               close(pool->sig_pipe[1]);
-               pool->sig_pipe[1] = -1;
-       }
-       free(pool->ids);
        free(pool);
        return 0;
 }