pthreadpool: Avoid a malloc/free per job
authorVolker Lendecke <vl@samba.org>
Fri, 21 Mar 2014 16:53:26 +0000 (17:53 +0100)
committerJeremy Allison <jra@samba.org>
Thu, 27 Mar 2014 05:06:11 +0000 (06:06 +0100)
pthreadpool_add_job is in our hottest code path for r/w intensive workloads, so
we should avoid anything CPU-intensive. pthreadpool used to malloc each job and
free it in the worker thread. This patch adds a FIFO queue for jobs that helper
threads copy from, avoiding constant malloc/free. This cuts user space
CPU in the local-bench-pthreadpool benchmark by roughly 10% on my system.

Signed-off-by: Volker Lendecke <vl@samba.org>
Reviewed-by: Jeremy Allison <jra@samba.org>
source3/lib/pthreadpool/pthreadpool.c

index 654d420732f41c61b5356a7351aae9de8ea3e281..d51e8083601a6ebd61e5f8c0826279fabe1f601f 100644 (file)
@@ -34,7 +34,6 @@
 #include "lib/util/dlinklist.h"
 
 struct pthreadpool_job {
-       struct pthreadpool_job *next;
        int id;
        void (*fn)(void *private_data);
        void *private_data;
@@ -57,9 +56,13 @@ struct pthreadpool {
        pthread_cond_t condvar;
 
        /*
-        * List of work jobs
+        * Array of jobs
         */
-       struct pthreadpool_job *jobs, *last_job;
+       size_t jobs_array_len;
+       struct pthreadpool_job *jobs;
+
+       size_t head;
+       size_t num_jobs;
 
        /*
         * pipe for signalling
@@ -113,9 +116,21 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
                return ENOMEM;
        }
 
+       pool->jobs_array_len = 4;
+       pool->jobs = calloc(
+               pool->jobs_array_len, sizeof(struct pthreadpool_job));
+
+       if (pool->jobs == NULL) {
+               free(pool);
+               return ENOMEM;
+       }
+
+       pool->head = pool->num_jobs = 0;
+
        ret = pipe(pool->sig_pipe);
        if (ret == -1) {
                int err = errno;
+               free(pool->jobs);
                free(pool);
                return err;
        }
@@ -124,6 +139,7 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
        if (ret != 0) {
                close(pool->sig_pipe[0]);
                close(pool->sig_pipe[1]);
+               free(pool->jobs);
                free(pool);
                return ret;
        }
@@ -133,12 +149,12 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
                pthread_mutex_destroy(&pool->mutex);
                close(pool->sig_pipe[0]);
                close(pool->sig_pipe[1]);
+               free(pool->jobs);
                free(pool);
                return ret;
        }
 
        pool->shutdown = 0;
-       pool->jobs = pool->last_job = NULL;
        pool->num_threads = 0;
        pool->num_exited = 0;
        pool->exited = NULL;
@@ -151,6 +167,7 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
                pthread_mutex_destroy(&pool->mutex);
                close(pool->sig_pipe[0]);
                close(pool->sig_pipe[1]);
+               free(pool->jobs);
                free(pool);
                return ret;
        }
@@ -221,14 +238,8 @@ static void pthreadpool_child(void)
                pool->exited = NULL;
 
                pool->num_idle = 0;
-
-               while (pool->jobs != NULL) {
-                       struct pthreadpool_job *job;
-                       job = pool->jobs;
-                       pool->jobs = job->next;
-                       free(job);
-               }
-               pool->last_job = NULL;
+               pool->head = 0;
+               pool->num_jobs = 0;
 
                ret = pthread_mutex_unlock(&pool->mutex);
                assert(ret == 0);
@@ -311,7 +322,7 @@ int pthreadpool_destroy(struct pthreadpool *pool)
                return ret;
        }
 
-       if ((pool->jobs != NULL) || pool->shutdown) {
+       if ((pool->num_jobs != 0) || pool->shutdown) {
                ret = pthread_mutex_unlock(&pool->mutex);
                assert(ret == 0);
                return EBUSY;
@@ -383,6 +394,7 @@ int pthreadpool_destroy(struct pthreadpool *pool)
        pool->sig_pipe[1] = -1;
 
        free(pool->exited);
+       free(pool->jobs);
        free(pool);
 
        return 0;
@@ -410,6 +422,61 @@ static void pthreadpool_server_exit(struct pthreadpool *pool)
        pool->num_exited += 1;
 }
 
+static bool pthreadpool_get_job(struct pthreadpool *p,
+                               struct pthreadpool_job *job)
+{
+       if (p->num_jobs == 0) {
+               return false;
+       }
+       *job = p->jobs[p->head];
+       p->head = (p->head+1) % p->jobs_array_len;
+       p->num_jobs -= 1;
+       return true;
+}
+
+static bool pthreadpool_put_job(struct pthreadpool *p,
+                               int id,
+                               void (*fn)(void *private_data),
+                               void *private_data)
+{
+       struct pthreadpool_job *job;
+
+       if (p->num_jobs == p->jobs_array_len) {
+               struct pthreadpool_job *tmp;
+               size_t new_len = p->jobs_array_len * 2;
+
+               tmp = realloc(
+                       p->jobs, sizeof(struct pthreadpool_job) * new_len);
+               if (tmp == NULL) {
+                       return false;
+               }
+               p->jobs = tmp;
+
+               /*
+                * We just doubled the jobs array. The array implements a FIFO
+                * queue with a modulo-based wraparound, so we have to memcpy
+                * the jobs that are logically at the queue end but physically
+                * before the queue head into the reallocated area. The new
+                * space starts at the current jobs_array_len, and we have to
+                * copy everything before the current head job into the new
+                * area.
+                */
+               memcpy(&p->jobs[p->jobs_array_len], p->jobs,
+                      sizeof(struct pthreadpool_job) * p->head);
+
+               p->jobs_array_len = new_len;
+       }
+
+       job = &p->jobs[(p->head + p->num_jobs) % p->jobs_array_len];
+       job->id = id;
+       job->fn = fn;
+       job->private_data = private_data;
+
+       p->num_jobs += 1;
+
+       return true;
+}
+
 static void *pthreadpool_server(void *arg)
 {
        struct pthreadpool *pool = (struct pthreadpool *)arg;
@@ -422,7 +489,7 @@ static void *pthreadpool_server(void *arg)
 
        while (1) {
                struct timespec ts;
-               struct pthreadpool_job *job;
+               struct pthreadpool_job job;
 
                /*
                 * idle-wait at most 1 second. If nothing happens in that
@@ -432,7 +499,7 @@ static void *pthreadpool_server(void *arg)
                clock_gettime(CLOCK_REALTIME, &ts);
                ts.tv_sec += 1;
 
-               while ((pool->jobs == NULL) && (pool->shutdown == 0)) {
+               while ((pool->num_jobs == 0) && (pool->shutdown == 0)) {
 
                        pool->num_idle += 1;
                        res = pthread_cond_timedwait(
@@ -441,7 +508,7 @@ static void *pthreadpool_server(void *arg)
 
                        if (res == ETIMEDOUT) {
 
-                               if (pool->jobs == NULL) {
+                               if (pool->num_jobs == 0) {
                                        /*
                                         * we timed out and still no work for
                                         * us. Exit.
@@ -456,19 +523,9 @@ static void *pthreadpool_server(void *arg)
                        assert(res == 0);
                }
 
-               job = pool->jobs;
-
-               if (job != NULL) {
+               if (pthreadpool_get_job(pool, &job)) {
                        ssize_t written;
-
-                       /*
-                        * Ok, there's work for us to do, remove the job from
-                        * the pthreadpool list
-                        */
-                       pool->jobs = job->next;
-                       if (pool->last_job == job) {
-                               pool->last_job = NULL;
-                       }
+                       int sig_pipe = pool->sig_pipe[1];
 
                        /*
                         * Do the work with the mutex unlocked
@@ -477,12 +534,8 @@ static void *pthreadpool_server(void *arg)
                        res = pthread_mutex_unlock(&pool->mutex);
                        assert(res == 0);
 
-                       job->fn(job->private_data);
-
-                       written = write(pool->sig_pipe[1], &job->id,
-                                       sizeof(int));
-
-                       free(job);
+                       job.fn(job.private_data);
+                       written = write(sig_pipe, &job.id, sizeof(job.id));
 
                        res = pthread_mutex_lock(&pool->mutex);
                        assert(res == 0);
@@ -494,7 +547,7 @@ static void *pthreadpool_server(void *arg)
                        }
                }
 
-               if ((pool->jobs == NULL) && (pool->shutdown != 0)) {
+               if ((pool->num_jobs == 0) && (pool->shutdown != 0)) {
                        /*
                         * No more work to do and we're asked to shut down, so
                         * exit
@@ -518,24 +571,12 @@ static void *pthreadpool_server(void *arg)
 int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
                        void (*fn)(void *private_data), void *private_data)
 {
-       struct pthreadpool_job *job;
        pthread_t thread_id;
        int res;
        sigset_t mask, omask;
 
-       job = (struct pthreadpool_job *)malloc(sizeof(struct pthreadpool_job));
-       if (job == NULL) {
-               return ENOMEM;
-       }
-
-       job->fn = fn;
-       job->private_data = private_data;
-       job->id = job_id;
-       job->next = NULL;
-
        res = pthread_mutex_lock(&pool->mutex);
        if (res != 0) {
-               free(job);
                return res;
        }
 
@@ -546,7 +587,6 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
                 */
                res = pthread_mutex_unlock(&pool->mutex);
                assert(res == 0);
-               free(job);
                return EINVAL;
        }
 
@@ -558,13 +598,10 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
        /*
         * Add job to the end of the queue
         */
-       if (pool->jobs == NULL) {
-               pool->jobs = job;
-       }
-       else {
-               pool->last_job->next = job;
+       if (!pthreadpool_put_job(pool, job_id, fn, private_data)) {
+               pthread_mutex_unlock(&pool->mutex);
+               return ENOMEM;
        }
-       pool->last_job = job;
 
        if (pool->num_idle > 0) {
                /*