s3: Allow unlimited parallelism in pthreadpool
authorVolker Lendecke <vl@samba.org>
Mon, 25 Apr 2011 18:05:31 +0000 (20:05 +0200)
committerVolker Lendecke <vl@samba.org>
Tue, 26 Apr 2011 10:41:56 +0000 (12:41 +0200)
source3/lib/pthreadpool/pthreadpool.c
source3/lib/pthreadpool/pthreadpool.h

index 2a75a5255778a3ef66c529cadf539b50a5bc856e..3cf6cb7045835b78c027c729b621e0e00ce2b1a7 100644 (file)
@@ -85,11 +85,10 @@ struct pthreadpool {
        int num_idle;
 
        /*
-        * An array of threads that require joining, the array has
-        * "max_threads" elements. It contains "num_exited" ids.
+        * An array of threads that require joining.
         */
        int                     num_exited;
-       pthread_t               exited[1]; /* We alloc more */
+       pthread_t               *exited; /* We alloc more */
 };
 
 static pthread_mutex_t pthreadpools_mutex = PTHREAD_MUTEX_INITIALIZER;
@@ -105,13 +104,9 @@ static void pthreadpool_prep_atfork(void);
 int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
 {
        struct pthreadpool *pool;
-       size_t size;
        int ret;
 
-       size = sizeof(struct pthreadpool)
-               + (max_threads-1) * sizeof(pthread_t);
-
-       pool = (struct pthreadpool *)malloc(size);
+       pool = (struct pthreadpool *)malloc(sizeof(struct pthreadpool));
        if (pool == NULL) {
                return ENOMEM;
        }
@@ -140,6 +135,7 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
        pool->jobs = pool->last_job = NULL;
        pool->num_threads = 0;
        pool->num_exited = 0;
+       pool->exited = NULL;
        pool->max_threads = max_threads;
        pool->num_idle = 0;
 
@@ -215,7 +211,11 @@ static void pthreadpool_child(void)
                assert(ret == 0);
 
                pool->num_threads = 0;
+
                pool->num_exited = 0;
+               free(pool->exited);
+               pool->exited = NULL;
+
                pool->num_idle = 0;
 
                while (pool->jobs != NULL) {
@@ -267,6 +267,11 @@ static void pthreadpool_join_children(struct pthreadpool *pool)
                pthread_join(pool->exited[i], NULL);
        }
        pool->num_exited = 0;
+
+       /*
+        * Deliberately not free and NULL pool->exited. That will be
+        * re-used by realloc later.
+        */
 }
 
 /*
@@ -377,6 +382,7 @@ int pthreadpool_destroy(struct pthreadpool *pool)
        close(pool->sig_pipe[1]);
        pool->sig_pipe[1] = -1;
 
+       free(pool->exited);
        free(pool);
 
        return 0;
@@ -387,7 +393,19 @@ int pthreadpool_destroy(struct pthreadpool *pool)
  */
 static void pthreadpool_server_exit(struct pthreadpool *pool)
 {
+       pthread_t *exited;
+
        pool->num_threads -= 1;
+
+       exited = (pthread_t *)realloc(
+               pool->exited, sizeof(pthread_t *) * (pool->num_exited + 1));
+
+       if (exited == NULL) {
+               /* lost a thread status */
+               return;
+       }
+       pool->exited = exited;
+
        pool->exited[pool->num_exited] = pthread_self();
        pool->num_exited += 1;
 }
@@ -559,7 +577,8 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
                return res;
        }
 
-       if (pool->num_threads >= pool->max_threads) {
+       if ((pool->max_threads != 0) &&
+           (pool->num_threads >= pool->max_threads)) {
                /*
                 * No more new threads, we just queue the request
                 */
index 255393377fd8491a7204ab42084cce3e617431a1..79704ea385366f108158566f1a58557888ae6fcf 100644 (file)
@@ -39,6 +39,9 @@ struct pthreadpool;
  * @param[in]  max_threads     Maximum parallelism in this pool
  * @param[out] presult         Pointer to the threadpool returned
  * @return                     success: 0, failure: errno
+ *
+ * max_threads=0 means unlimited parallelism. The caller has to take
+ * care to not overload the system.
  */
 int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult);