lib: Use pthreadpool_pipe instead of pthreadpool
authorVolker Lendecke <vl@samba.org>
Mon, 15 Aug 2016 11:57:20 +0000 (13:57 +0200)
committerJeremy Allison <jra@samba.org>
Tue, 23 Aug 2016 23:33:48 +0000 (01:33 +0200)
Signed-off-by: Volker Lendecke <vl@samba.org>
Reviewed-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Jeremy Allison <jra@samba.org>
source3/lib/asys/asys.c
source3/lib/fncall.c
source3/lib/pthreadpool/tests.c
source3/lib/unix_msg/unix_msg.c
source3/modules/vfs_aio_pthread.c
source3/torture/bench_pthreadpool.c

index 670be01192db38ed4f644c438a69ec28e9fbcbce..594d47004baf8174d413e1edc57b01fe25fbc133 100644 (file)
@@ -19,7 +19,7 @@
 #include "asys.h"
 #include <stdlib.h>
 #include <errno.h>
-#include "../pthreadpool/pthreadpool.h"
+#include "../pthreadpool/pthreadpool_pipe.h"
 #include "lib/util/time.h"
 #include "smbprofile.h"
 
@@ -59,8 +59,8 @@ struct asys_job {
 };
 
 struct asys_context {
-       struct pthreadpool *pool;
-       int pthreadpool_fd;
+       struct pthreadpool_pipe *pool;
+       int pthreadpool_pipe_fd;
 
        unsigned num_jobs;
        struct asys_job **jobs;
@@ -79,12 +79,12 @@ int asys_context_init(struct asys_context **pctx, unsigned max_parallel)
        if (ctx == NULL) {
                return ENOMEM;
        }
-       ret = pthreadpool_init(max_parallel, &ctx->pool);
+       ret = pthreadpool_pipe_init(max_parallel, &ctx->pool);
        if (ret != 0) {
                free(ctx);
                return ret;
        }
-       ctx->pthreadpool_fd = pthreadpool_signal_fd(ctx->pool);
+       ctx->pthreadpool_pipe_fd = pthreadpool_pipe_signal_fd(ctx->pool);
 
        *pctx = ctx;
        return 0;
@@ -92,7 +92,7 @@ int asys_context_init(struct asys_context **pctx, unsigned max_parallel)
 
 int asys_signalfd(struct asys_context *ctx)
 {
-       return ctx->pthreadpool_fd;
+       return ctx->pthreadpool_pipe_fd;
 }
 
 int asys_context_destroy(struct asys_context *ctx)
@@ -106,7 +106,7 @@ int asys_context_destroy(struct asys_context *ctx)
                }
        }
 
-       ret = pthreadpool_destroy(ctx->pool);
+       ret = pthreadpool_pipe_destroy(ctx->pool);
        if (ret != 0) {
                return ret;
        }
@@ -179,7 +179,7 @@ int asys_pwrite(struct asys_context *ctx, int fildes, const void *buf,
        args->nbyte = nbyte;
        args->offset = offset;
 
-       ret = pthreadpool_add_job(ctx->pool, jobid, asys_pwrite_do, job);
+       ret = pthreadpool_pipe_add_job(ctx->pool, jobid, asys_pwrite_do, job);
        if (ret != 0) {
                return ret;
        }
@@ -224,7 +224,7 @@ int asys_pread(struct asys_context *ctx, int fildes, void *buf,
        args->nbyte = nbyte;
        args->offset = offset;
 
-       ret = pthreadpool_add_job(ctx->pool, jobid, asys_pread_do, job);
+       ret = pthreadpool_pipe_add_job(ctx->pool, jobid, asys_pread_do, job);
        if (ret != 0) {
                return ret;
        }
@@ -265,7 +265,7 @@ int asys_fsync(struct asys_context *ctx, int fildes, void *private_data)
        args = &job->args.fsync_args;
        args->fildes = fildes;
 
-       ret = pthreadpool_add_job(ctx->pool, jobid, asys_fsync_do, job);
+       ret = pthreadpool_pipe_add_job(ctx->pool, jobid, asys_fsync_do, job);
        if (ret != 0) {
                return ret;
        }
@@ -307,7 +307,7 @@ int asys_results(struct asys_context *ctx, struct asys_result *results,
        int jobids[num_results];
        int i, ret;
 
-       ret = pthreadpool_finished_jobs(ctx->pool, jobids, num_results);
+       ret = pthreadpool_pipe_finished_jobs(ctx->pool, jobids, num_results);
        if (ret <= 0) {
                return ret;
        }
index 88304d6961cb9e931479ce3eb4c7a4d4df71ddce..0923c148dba45e4c95bcbb216d0f7be47c2fa245 100644 (file)
@@ -20,7 +20,7 @@
 #include "includes.h"
 #include "../lib/util/tevent_unix.h"
 
-#include "lib/pthreadpool/pthreadpool.h"
+#include "lib/pthreadpool/pthreadpool_pipe.h"
 
 struct fncall_state {
        struct fncall_context *ctx;
@@ -32,7 +32,7 @@ struct fncall_state {
 };
 
 struct fncall_context {
-       struct pthreadpool *pool;
+       struct pthreadpool_pipe *pool;
        int next_job_id;
        int sig_fd;
        struct tevent_req **pending;
@@ -61,7 +61,7 @@ static int fncall_context_destructor(struct fncall_context *ctx)
                fncall_handler(NULL, NULL, TEVENT_FD_READ, ctx);
        }
 
-       pthreadpool_destroy(ctx->pool);
+       pthreadpool_pipe_destroy(ctx->pool);
        ctx->pool = NULL;
 
        return 0;
@@ -78,14 +78,14 @@ struct fncall_context *fncall_context_init(TALLOC_CTX *mem_ctx,
                return NULL;
        }
 
-       ret = pthreadpool_init(max_threads, &ctx->pool);
+       ret = pthreadpool_pipe_init(max_threads, &ctx->pool);
        if (ret != 0) {
                TALLOC_FREE(ctx);
                return NULL;
        }
        talloc_set_destructor(ctx, fncall_context_destructor);
 
-       ctx->sig_fd = pthreadpool_signal_fd(ctx->pool);
+       ctx->sig_fd = pthreadpool_pipe_signal_fd(ctx->pool);
        if (ctx->sig_fd == -1) {
                TALLOC_FREE(ctx);
                return NULL;
@@ -266,8 +266,8 @@ struct tevent_req *fncall_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
        state->private_parent = talloc_parent(private_data);
        state->job_private = talloc_move(state, &private_data);
 
-       ret = pthreadpool_add_job(state->ctx->pool, state->job_id, fn,
-                                 state->job_private);
+       ret = pthreadpool_pipe_add_job(state->ctx->pool, state->job_id, fn,
+                                      state->job_private);
        if (ret == -1) {
                tevent_req_error(req, errno);
                return tevent_req_post(req, ev);
@@ -287,7 +287,7 @@ static void fncall_handler(struct tevent_context *ev, struct tevent_fd *fde,
        int i, num_pending;
        int job_id;
 
-       if (pthreadpool_finished_jobs(ctx->pool, &job_id, 1) < 0) {
+       if (pthreadpool_pipe_finished_jobs(ctx->pool, &job_id, 1) < 0) {
                return;
        }
 
index 847471297fabfd65714d06e10ad246ad92a00bbc..0b48b41a7f0d0425518ba267b47fd2992b840122 100644 (file)
@@ -7,22 +7,22 @@
 #include <unistd.h>
 #include <sys/types.h>
 #include <sys/wait.h>
-#include "pthreadpool.h"
+#include "pthreadpool_pipe.h"
 
 static int test_init(void)
 {
-       struct pthreadpool *p;
+       struct pthreadpool_pipe *p;
        int ret;
 
-       ret = pthreadpool_init(1, &p);
+       ret = pthreadpool_pipe_init(1, &p);
        if (ret != 0) {
-               fprintf(stderr, "pthreadpool_init failed: %s\n",
+               fprintf(stderr, "pthreadpool_pipe_init failed: %s\n",
                        strerror(ret));
                return -1;
        }
-       ret = pthreadpool_destroy(p);
+       ret = pthreadpool_pipe_destroy(p);
        if (ret != 0) {
-               fprintf(stderr, "pthreadpool_init failed: %s\n",
+               fprintf(stderr, "pthreadpool_pipe_init failed: %s\n",
                        strerror(ret));
                return -1;
        }
@@ -43,7 +43,7 @@ static void test_sleep(void *ptr)
 static int test_jobs(int num_threads, int num_jobs)
 {
        char *finished;
-       struct pthreadpool *p;
+       struct pthreadpool_pipe *p;
        int timeout = 1;
        int i, ret;
 
@@ -53,25 +53,25 @@ static int test_jobs(int num_threads, int num_jobs)
                return -1;
        }
 
-       ret = pthreadpool_init(num_threads, &p);
+       ret = pthreadpool_pipe_init(num_threads, &p);
        if (ret != 0) {
-               fprintf(stderr, "pthreadpool_init failed: %s\n",
+               fprintf(stderr, "pthreadpool_pipe_init failed: %s\n",
                        strerror(ret));
                return -1;
        }
 
        for (i=0; i<num_jobs; i++) {
-               ret = pthreadpool_add_job(p, i, test_sleep, &timeout);
+               ret = pthreadpool_pipe_add_job(p, i, test_sleep, &timeout);
                if (ret != 0) {
-                       fprintf(stderr, "pthreadpool_add_job failed: %s\n",
-                               strerror(ret));
+                       fprintf(stderr, "pthreadpool_pipe_add_job failed: "
+                               "%s\n", strerror(ret));
                        return -1;
                }
        }
 
        for (i=0; i<num_jobs; i++) {
                int jobid = -1;
-               ret = pthreadpool_finished_jobs(p, &jobid, 1);
+               ret = pthreadpool_pipe_finished_jobs(p, &jobid, 1);
                if ((ret != 1) || (jobid >= num_jobs)) {
                        fprintf(stderr, "invalid job number %d\n", jobid);
                        return -1;
@@ -87,9 +87,9 @@ static int test_jobs(int num_threads, int num_jobs)
                }
        }
 
-       ret = pthreadpool_destroy(p);
+       ret = pthreadpool_pipe_destroy(p);
        if (ret != 0) {
-               fprintf(stderr, "pthreadpool_destroy failed: %s\n",
+               fprintf(stderr, "pthreadpool_pipe_destroy failed: %s\n",
                        strerror(ret));
                return -1;
        }
@@ -100,37 +100,37 @@ static int test_jobs(int num_threads, int num_jobs)
 
 static int test_busydestroy(void)
 {
-       struct pthreadpool *p;
+       struct pthreadpool_pipe *p;
        int timeout = 50;
        struct pollfd pfd;
        int ret;
 
-       ret = pthreadpool_init(1, &p);
+       ret = pthreadpool_pipe_init(1, &p);
        if (ret != 0) {
-               fprintf(stderr, "pthreadpool_init failed: %s\n",
+               fprintf(stderr, "pthreadpool_pipe_init failed: %s\n",
                        strerror(ret));
                return -1;
        }
-       ret = pthreadpool_add_job(p, 1, test_sleep, &timeout);
+       ret = pthreadpool_pipe_add_job(p, 1, test_sleep, &timeout);
        if (ret != 0) {
-               fprintf(stderr, "pthreadpool_add_job failed: %s\n",
+               fprintf(stderr, "pthreadpool_pipe_add_job failed: %s\n",
                        strerror(ret));
                return -1;
        }
-       ret = pthreadpool_destroy(p);
+       ret = pthreadpool_pipe_destroy(p);
        if (ret != EBUSY) {
                fprintf(stderr, "Could destroy a busy pool\n");
                return -1;
        }
 
-       pfd.fd = pthreadpool_signal_fd(p);
+       pfd.fd = pthreadpool_pipe_signal_fd(p);
        pfd.events = POLLIN|POLLERR;
 
        poll(&pfd, 1, -1);
 
-       ret = pthreadpool_destroy(p);
+       ret = pthreadpool_pipe_destroy(p);
        if (ret != 0) {
-               fprintf(stderr, "pthreadpool_destroy failed: %s\n",
+               fprintf(stderr, "pthreadpool_pipe_destroy failed: %s\n",
                        strerror(ret));
                return -1;
        }
@@ -139,7 +139,7 @@ static int test_busydestroy(void)
 
 struct threaded_state {
        pthread_t tid;
-       struct pthreadpool *p;
+       struct pthreadpool_pipe *p;
        int start_job;
        int num_jobs;
        int timeout;
@@ -151,11 +151,12 @@ static void *test_threaded_worker(void *p)
        int i;
 
        for (i=0; i<state->num_jobs; i++) {
-               int ret = pthreadpool_add_job(state->p, state->start_job + i,
-                                             test_sleep, &state->timeout);
+               int ret = pthreadpool_pipe_add_job(
+                       state->p, state->start_job + i,
+                       test_sleep, &state->timeout);
                if (ret != 0) {
-                       fprintf(stderr, "pthreadpool_add_job failed: %s\n",
-                               strerror(ret));
+                       fprintf(stderr, "pthreadpool_pipe_add_job failed: "
+                               "%s\n", strerror(ret));
                        return NULL;
                }
        }
@@ -165,7 +166,7 @@ static void *test_threaded_worker(void *p)
 static int test_threaded_addjob(int num_pools, int num_threads, int poolsize,
                                int num_jobs)
 {
-       struct pthreadpool **pools;
+       struct pthreadpool_pipe **pools;
        struct threaded_state *states;
        struct threaded_state *state;
        struct pollfd *pfds;
@@ -186,7 +187,7 @@ static int test_threaded_addjob(int num_pools, int num_threads, int poolsize,
                return -1;
        }
 
-       pools = calloc(num_pools, sizeof(struct pthreadpool *));
+       pools = calloc(num_pools, sizeof(struct pthreadpool_pipe *));
        if (pools == NULL) {
                fprintf(stderr, "calloc failed\n");
                return -1;
@@ -199,13 +200,13 @@ static int test_threaded_addjob(int num_pools, int num_threads, int poolsize,
        }
 
        for (i=0; i<num_pools; i++) {
-               ret = pthreadpool_init(poolsize, &pools[i]);
+               ret = pthreadpool_pipe_init(poolsize, &pools[i]);
                if (ret != 0) {
-                       fprintf(stderr, "pthreadpool_init failed: %s\n",
+                       fprintf(stderr, "pthreadpool_pipe_init failed: %s\n",
                                strerror(ret));
                        return -1;
                }
-               pfds[i].fd = pthreadpool_signal_fd(pools[i]);
+               pfds[i].fd = pthreadpool_pipe_signal_fd(pools[i]);
                pfds[i].events = POLLIN|POLLHUP;
        }
 
@@ -241,10 +242,10 @@ static int test_threaded_addjob(int num_pools, int num_threads, int poolsize,
        }
        if (child == 0) {
                for (i=0; i<num_pools; i++) {
-                       ret = pthreadpool_destroy(pools[i]);
+                       ret = pthreadpool_pipe_destroy(pools[i]);
                        if (ret != 0) {
-                               fprintf(stderr, "pthreadpool_destroy failed: "
-                                       "%s\n", strerror(ret));
+                               fprintf(stderr, "pthreadpool_pipe_destroy "
+                                       "failed: %s\n", strerror(ret));
                                exit(1);
                        }
                }
@@ -284,7 +285,8 @@ static int test_threaded_addjob(int num_pools, int num_threads, int poolsize,
                                continue;
                        }
 
-                       ret = pthreadpool_finished_jobs(pools[j], &jobid, 1);
+                       ret = pthreadpool_pipe_finished_jobs(
+                               pools[j], &jobid, 1);
                        if ((ret != 1) || (jobid >= num_jobs * num_threads)) {
                                fprintf(stderr, "invalid job number %d\n",
                                        jobid);
@@ -304,10 +306,10 @@ static int test_threaded_addjob(int num_pools, int num_threads, int poolsize,
        }
 
        for (i=0; i<num_pools; i++) {
-               ret = pthreadpool_destroy(pools[i]);
+               ret = pthreadpool_pipe_destroy(pools[i]);
                if (ret != 0) {
-                       fprintf(stderr, "pthreadpool_destroy failed: %s\n",
-                               strerror(ret));
+                       fprintf(stderr, "pthreadpool_pipe_destroy failed: "
+                               "%s\n", strerror(ret));
                        return -1;
                }
        }
@@ -322,19 +324,19 @@ static int test_threaded_addjob(int num_pools, int num_threads, int poolsize,
 
 static int test_fork(void)
 {
-       struct pthreadpool *p;
+       struct pthreadpool_pipe *p;
        pid_t child, waited;
        int status, ret;
 
-       ret = pthreadpool_init(1, &p);
+       ret = pthreadpool_pipe_init(1, &p);
        if (ret != 0) {
-               fprintf(stderr, "pthreadpool_init failed: %s\n",
+               fprintf(stderr, "pthreadpool_pipe_init failed: %s\n",
                        strerror(ret));
                return -1;
        }
-       ret = pthreadpool_destroy(p);
+       ret = pthreadpool_pipe_destroy(p);
        if (ret != 0) {
-               fprintf(stderr, "pthreadpool_destroy failed: %s\n",
+               fprintf(stderr, "pthreadpool_pipe_destroy failed: %s\n",
                        strerror(ret));
                return -1;
        }
index aed1f7560b59c2a46caafa01953ee8f58dfc9158..5fac68b7172d2c20c063323de36f786b006635bb 100644 (file)
@@ -22,7 +22,7 @@
 #include "system/time.h"
 #include "system/network.h"
 #include "lib/util/dlinklist.h"
-#include "pthreadpool/pthreadpool.h"
+#include "pthreadpool/pthreadpool_pipe.h"
 #include "lib/util/iov_buf.h"
 #include "lib/util/msghdr.h"
 #include <fcntl.h>
@@ -69,7 +69,7 @@ struct unix_dgram_ctx {
        struct poll_watch *sock_read_watch;
        struct unix_dgram_send_queue *send_queues;
 
-       struct pthreadpool *send_pool;
+       struct pthreadpool_pipe *send_pool;
        struct poll_watch *pool_read_watch;
 
        uint8_t *recv_buf;
@@ -341,18 +341,18 @@ static int unix_dgram_init_pthreadpool(struct unix_dgram_ctx *ctx)
                return 0;
        }
 
-       ret = pthreadpool_init(0, &ctx->send_pool);
+       ret = pthreadpool_pipe_init(0, &ctx->send_pool);
        if (ret != 0) {
                return ret;
        }
 
-       signalfd = pthreadpool_signal_fd(ctx->send_pool);
+       signalfd = pthreadpool_pipe_signal_fd(ctx->send_pool);
 
        ctx->pool_read_watch = ctx->ev_funcs->watch_new(
                ctx->ev_funcs, signalfd, POLLIN,
                unix_dgram_job_finished, ctx);
        if (ctx->pool_read_watch == NULL) {
-               pthreadpool_destroy(ctx->send_pool);
+               pthreadpool_pipe_destroy(ctx->send_pool);
                ctx->send_pool = NULL;
                return ENOMEM;
        }
@@ -525,7 +525,7 @@ static void unix_dgram_job_finished(struct poll_watch *w, int fd, short events,
        struct unix_dgram_msg *msg;
        int ret, job;
 
-       ret = pthreadpool_finished_jobs(ctx->send_pool, &job, 1);
+       ret = pthreadpool_pipe_finished_jobs(ctx->send_pool, &job, 1);
        if (ret != 1) {
                return;
        }
@@ -547,8 +547,8 @@ static void unix_dgram_job_finished(struct poll_watch *w, int fd, short events,
        free(msg);
 
        if (q->msgs != NULL) {
-               ret = pthreadpool_add_job(ctx->send_pool, q->sock,
-                                         unix_dgram_send_job, q->msgs);
+               ret = pthreadpool_pipe_add_job(ctx->send_pool, q->sock,
+                                              unix_dgram_send_job, q->msgs);
                if (ret == 0) {
                        return;
                }
@@ -654,8 +654,8 @@ static int unix_dgram_send(struct unix_dgram_ctx *ctx,
                unix_dgram_send_queue_free(q);
                return ret;
        }
-       ret = pthreadpool_add_job(ctx->send_pool, q->sock,
-                                 unix_dgram_send_job, q->msgs);
+       ret = pthreadpool_pipe_add_job(ctx->send_pool, q->sock,
+                                      unix_dgram_send_job, q->msgs);
        if (ret != 0) {
                unix_dgram_send_queue_free(q);
                return ret;
@@ -675,7 +675,7 @@ static int unix_dgram_free(struct unix_dgram_ctx *ctx)
        }
 
        if (ctx->send_pool != NULL) {
-               int ret = pthreadpool_destroy(ctx->send_pool);
+               int ret = pthreadpool_pipe_destroy(ctx->send_pool);
                if (ret != 0) {
                        return ret;
                }
index 7037b633e5e19e653c80f93bc0972cbe37d5cf76..6edf250ebdc7561349788b0e6873d677b838ede5 100644 (file)
@@ -26,7 +26,7 @@
 #include "system/shmem.h"
 #include "smbd/smbd.h"
 #include "smbd/globals.h"
-#include "lib/pthreadpool/pthreadpool.h"
+#include "lib/pthreadpool/pthreadpool_pipe.h"
 #ifdef HAVE_LINUX_FALLOC_H
 #include <linux/falloc.h>
 #endif
@@ -38,7 +38,7 @@
 ***********************************************************************/
 
 static bool init_aio_threadpool(struct tevent_context *ev_ctx,
-                               struct pthreadpool **pp_pool,
+                               struct pthreadpool_pipe **pp_pool,
                                void (*completion_fn)(struct tevent_context *,
                                                struct tevent_fd *,
                                                uint16_t,
@@ -51,19 +51,19 @@ static bool init_aio_threadpool(struct tevent_context *ev_ctx,
                return true;
        }
 
-       ret = pthreadpool_init(lp_aio_max_threads(), pp_pool);
+       ret = pthreadpool_pipe_init(lp_aio_max_threads(), pp_pool);
        if (ret) {
                errno = ret;
                return false;
        }
        sock_event = tevent_add_fd(ev_ctx,
                                NULL,
-                               pthreadpool_signal_fd(*pp_pool),
+                               pthreadpool_pipe_signal_fd(*pp_pool),
                                TEVENT_FD_READ,
                                completion_fn,
                                NULL);
        if (sock_event == NULL) {
-               pthreadpool_destroy(*pp_pool);
+               pthreadpool_pipe_destroy(*pp_pool);
                *pp_pool = NULL;
                return false;
        }
@@ -87,7 +87,7 @@ static bool init_aio_threadpool(struct tevent_context *ev_ctx,
  * process, as is the current jobid.
  */
 
-static struct pthreadpool *open_pool;
+static struct pthreadpool_pipe *open_pool;
 static int aio_pthread_open_jobid;
 
 struct aio_open_private_data {
@@ -167,7 +167,7 @@ static void aio_open_handle_completion(struct tevent_context *event_ctx,
                return;
        }
 
-       ret = pthreadpool_finished_jobs(open_pool, &jobid, 1);
+       ret = pthreadpool_pipe_finished_jobs(open_pool, &jobid, 1);
        if (ret != 1) {
                smb_panic("aio_open_handle_completion");
                /* notreached. */
@@ -368,10 +368,10 @@ static int open_async(const files_struct *fsp,
                return -1;
        }
 
-       ret = pthreadpool_add_job(open_pool,
-                               opd->jobid,
-                               aio_open_worker,
-                               (void *)opd);
+       ret = pthreadpool_pipe_add_job(open_pool,
+                                      opd->jobid,
+                                      aio_open_worker,
+                                      (void *)opd);
        if (ret) {
                errno = ret;
                return -1;
index 247063d969463aae0d346cf1061ac6f3e1803cbc..82a84cf53cd744a9ce8b75e2fbc19b542017c899 100644 (file)
@@ -19,7 +19,7 @@
  */
 
 #include "includes.h"
-#include "lib/pthreadpool/pthreadpool.h"
+#include "lib/pthreadpool/pthreadpool_pipe.h"
 #include "proto.h"
 
 extern int torture_numops;
@@ -31,12 +31,12 @@ static void null_job(void *private_data)
 
 bool run_bench_pthreadpool(int dummy)
 {
-       struct pthreadpool *pool;
+       struct pthreadpool_pipe *pool;
        int i, ret;
 
-       ret = pthreadpool_init(1, &pool);
+       ret = pthreadpool_pipe_init(1, &pool);
        if (ret != 0) {
-               d_fprintf(stderr, "pthreadpool_init failed: %s\n",
+               d_fprintf(stderr, "pthreadpool_pipe_init failed: %s\n",
                          strerror(ret));
                return false;
        }
@@ -44,21 +44,21 @@ bool run_bench_pthreadpool(int dummy)
        for (i=0; i<torture_numops; i++) {
                int jobid;
 
-               ret = pthreadpool_add_job(pool, 0, null_job, NULL);
+               ret = pthreadpool_pipe_add_job(pool, 0, null_job, NULL);
                if (ret != 0) {
-                       d_fprintf(stderr, "pthreadpool_add_job failed: %s\n",
-                                 strerror(ret));
+                       d_fprintf(stderr, "pthreadpool_pipe_add_job "
+                                 "failed: %s\n", strerror(ret));
                        break;
                }
-               ret = pthreadpool_finished_jobs(pool, &jobid, 1);
+               ret = pthreadpool_pipe_finished_jobs(pool, &jobid, 1);
                if (ret < 0) {
-                       d_fprintf(stderr, "pthreadpool_finished_job failed: %s\n",
-                                 strerror(-ret));
+                       d_fprintf(stderr, "pthreadpool_pipe_finished_job "
+                                 "failed: %s\n", strerror(-ret));
                        break;
                }
        }
 
-       pthreadpool_destroy(pool);
+       pthreadpool_pipe_destroy(pool);
 
        return (ret == 1);
 }