#include "asys.h"
#include <stdlib.h>
#include <errno.h>
-#include "../pthreadpool/pthreadpool.h"
+#include "../pthreadpool/pthreadpool_pipe.h"
#include "lib/util/time.h"
#include "smbprofile.h"
};
struct asys_context {
- struct pthreadpool *pool;
- int pthreadpool_fd;
+ struct pthreadpool_pipe *pool;
+ int pthreadpool_pipe_fd;
unsigned num_jobs;
struct asys_job **jobs;
if (ctx == NULL) {
return ENOMEM;
}
- ret = pthreadpool_init(max_parallel, &ctx->pool);
+ ret = pthreadpool_pipe_init(max_parallel, &ctx->pool);
if (ret != 0) {
free(ctx);
return ret;
}
- ctx->pthreadpool_fd = pthreadpool_signal_fd(ctx->pool);
+ ctx->pthreadpool_pipe_fd = pthreadpool_pipe_signal_fd(ctx->pool);
*pctx = ctx;
return 0;
int asys_signalfd(struct asys_context *ctx)
{
- return ctx->pthreadpool_fd;
+ return ctx->pthreadpool_pipe_fd;
}
int asys_context_destroy(struct asys_context *ctx)
}
}
- ret = pthreadpool_destroy(ctx->pool);
+ ret = pthreadpool_pipe_destroy(ctx->pool);
if (ret != 0) {
return ret;
}
args->nbyte = nbyte;
args->offset = offset;
- ret = pthreadpool_add_job(ctx->pool, jobid, asys_pwrite_do, job);
+ ret = pthreadpool_pipe_add_job(ctx->pool, jobid, asys_pwrite_do, job);
if (ret != 0) {
return ret;
}
args->nbyte = nbyte;
args->offset = offset;
- ret = pthreadpool_add_job(ctx->pool, jobid, asys_pread_do, job);
+ ret = pthreadpool_pipe_add_job(ctx->pool, jobid, asys_pread_do, job);
if (ret != 0) {
return ret;
}
args = &job->args.fsync_args;
args->fildes = fildes;
- ret = pthreadpool_add_job(ctx->pool, jobid, asys_fsync_do, job);
+ ret = pthreadpool_pipe_add_job(ctx->pool, jobid, asys_fsync_do, job);
if (ret != 0) {
return ret;
}
int jobids[num_results];
int i, ret;
- ret = pthreadpool_finished_jobs(ctx->pool, jobids, num_results);
+ ret = pthreadpool_pipe_finished_jobs(ctx->pool, jobids, num_results);
if (ret <= 0) {
return ret;
}
#include "includes.h"
#include "../lib/util/tevent_unix.h"
-#include "lib/pthreadpool/pthreadpool.h"
+#include "lib/pthreadpool/pthreadpool_pipe.h"
struct fncall_state {
struct fncall_context *ctx;
};
struct fncall_context {
- struct pthreadpool *pool;
+ struct pthreadpool_pipe *pool;
int next_job_id;
int sig_fd;
struct tevent_req **pending;
fncall_handler(NULL, NULL, TEVENT_FD_READ, ctx);
}
- pthreadpool_destroy(ctx->pool);
+ pthreadpool_pipe_destroy(ctx->pool);
ctx->pool = NULL;
return 0;
return NULL;
}
- ret = pthreadpool_init(max_threads, &ctx->pool);
+ ret = pthreadpool_pipe_init(max_threads, &ctx->pool);
if (ret != 0) {
TALLOC_FREE(ctx);
return NULL;
}
talloc_set_destructor(ctx, fncall_context_destructor);
- ctx->sig_fd = pthreadpool_signal_fd(ctx->pool);
+ ctx->sig_fd = pthreadpool_pipe_signal_fd(ctx->pool);
if (ctx->sig_fd == -1) {
TALLOC_FREE(ctx);
return NULL;
state->private_parent = talloc_parent(private_data);
state->job_private = talloc_move(state, &private_data);
- ret = pthreadpool_add_job(state->ctx->pool, state->job_id, fn,
- state->job_private);
+ ret = pthreadpool_pipe_add_job(state->ctx->pool, state->job_id, fn,
+ state->job_private);
if (ret == -1) {
tevent_req_error(req, errno);
return tevent_req_post(req, ev);
int i, num_pending;
int job_id;
- if (pthreadpool_finished_jobs(ctx->pool, &job_id, 1) < 0) {
+ if (pthreadpool_pipe_finished_jobs(ctx->pool, &job_id, 1) < 0) {
return;
}
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
-#include "pthreadpool.h"
+#include "pthreadpool_pipe.h"
static int test_init(void)
{
- struct pthreadpool *p;
+ struct pthreadpool_pipe *p;
int ret;
- ret = pthreadpool_init(1, &p);
+ ret = pthreadpool_pipe_init(1, &p);
if (ret != 0) {
- fprintf(stderr, "pthreadpool_init failed: %s\n",
+ fprintf(stderr, "pthreadpool_pipe_init failed: %s\n",
strerror(ret));
return -1;
}
- ret = pthreadpool_destroy(p);
+ ret = pthreadpool_pipe_destroy(p);
if (ret != 0) {
- fprintf(stderr, "pthreadpool_init failed: %s\n",
+ fprintf(stderr, "pthreadpool_pipe_init failed: %s\n",
strerror(ret));
return -1;
}
static int test_jobs(int num_threads, int num_jobs)
{
char *finished;
- struct pthreadpool *p;
+ struct pthreadpool_pipe *p;
int timeout = 1;
int i, ret;
return -1;
}
- ret = pthreadpool_init(num_threads, &p);
+ ret = pthreadpool_pipe_init(num_threads, &p);
if (ret != 0) {
- fprintf(stderr, "pthreadpool_init failed: %s\n",
+ fprintf(stderr, "pthreadpool_pipe_init failed: %s\n",
strerror(ret));
return -1;
}
for (i=0; i<num_jobs; i++) {
- ret = pthreadpool_add_job(p, i, test_sleep, &timeout);
+ ret = pthreadpool_pipe_add_job(p, i, test_sleep, &timeout);
if (ret != 0) {
- fprintf(stderr, "pthreadpool_add_job failed: %s\n",
- strerror(ret));
+ fprintf(stderr, "pthreadpool_pipe_add_job failed: "
+ "%s\n", strerror(ret));
return -1;
}
}
for (i=0; i<num_jobs; i++) {
int jobid = -1;
- ret = pthreadpool_finished_jobs(p, &jobid, 1);
+ ret = pthreadpool_pipe_finished_jobs(p, &jobid, 1);
if ((ret != 1) || (jobid >= num_jobs)) {
fprintf(stderr, "invalid job number %d\n", jobid);
return -1;
}
}
- ret = pthreadpool_destroy(p);
+ ret = pthreadpool_pipe_destroy(p);
if (ret != 0) {
- fprintf(stderr, "pthreadpool_destroy failed: %s\n",
+ fprintf(stderr, "pthreadpool_pipe_destroy failed: %s\n",
strerror(ret));
return -1;
}
static int test_busydestroy(void)
{
- struct pthreadpool *p;
+ struct pthreadpool_pipe *p;
int timeout = 50;
struct pollfd pfd;
int ret;
- ret = pthreadpool_init(1, &p);
+ ret = pthreadpool_pipe_init(1, &p);
if (ret != 0) {
- fprintf(stderr, "pthreadpool_init failed: %s\n",
+ fprintf(stderr, "pthreadpool_pipe_init failed: %s\n",
strerror(ret));
return -1;
}
- ret = pthreadpool_add_job(p, 1, test_sleep, &timeout);
+ ret = pthreadpool_pipe_add_job(p, 1, test_sleep, &timeout);
if (ret != 0) {
- fprintf(stderr, "pthreadpool_add_job failed: %s\n",
+ fprintf(stderr, "pthreadpool_pipe_add_job failed: %s\n",
strerror(ret));
return -1;
}
- ret = pthreadpool_destroy(p);
+ ret = pthreadpool_pipe_destroy(p);
if (ret != EBUSY) {
fprintf(stderr, "Could destroy a busy pool\n");
return -1;
}
- pfd.fd = pthreadpool_signal_fd(p);
+ pfd.fd = pthreadpool_pipe_signal_fd(p);
pfd.events = POLLIN|POLLERR;
poll(&pfd, 1, -1);
- ret = pthreadpool_destroy(p);
+ ret = pthreadpool_pipe_destroy(p);
if (ret != 0) {
- fprintf(stderr, "pthreadpool_destroy failed: %s\n",
+ fprintf(stderr, "pthreadpool_pipe_destroy failed: %s\n",
strerror(ret));
return -1;
}
struct threaded_state {
pthread_t tid;
- struct pthreadpool *p;
+ struct pthreadpool_pipe *p;
int start_job;
int num_jobs;
int timeout;
int i;
for (i=0; i<state->num_jobs; i++) {
- int ret = pthreadpool_add_job(state->p, state->start_job + i,
- test_sleep, &state->timeout);
+ int ret = pthreadpool_pipe_add_job(
+ state->p, state->start_job + i,
+ test_sleep, &state->timeout);
if (ret != 0) {
- fprintf(stderr, "pthreadpool_add_job failed: %s\n",
- strerror(ret));
+ fprintf(stderr, "pthreadpool_pipe_add_job failed: "
+ "%s\n", strerror(ret));
return NULL;
}
}
static int test_threaded_addjob(int num_pools, int num_threads, int poolsize,
int num_jobs)
{
- struct pthreadpool **pools;
+ struct pthreadpool_pipe **pools;
struct threaded_state *states;
struct threaded_state *state;
struct pollfd *pfds;
return -1;
}
- pools = calloc(num_pools, sizeof(struct pthreadpool *));
+ pools = calloc(num_pools, sizeof(struct pthreadpool_pipe *));
if (pools == NULL) {
fprintf(stderr, "calloc failed\n");
return -1;
}
for (i=0; i<num_pools; i++) {
- ret = pthreadpool_init(poolsize, &pools[i]);
+ ret = pthreadpool_pipe_init(poolsize, &pools[i]);
if (ret != 0) {
- fprintf(stderr, "pthreadpool_init failed: %s\n",
+ fprintf(stderr, "pthreadpool_pipe_init failed: %s\n",
strerror(ret));
return -1;
}
- pfds[i].fd = pthreadpool_signal_fd(pools[i]);
+ pfds[i].fd = pthreadpool_pipe_signal_fd(pools[i]);
pfds[i].events = POLLIN|POLLHUP;
}
}
if (child == 0) {
for (i=0; i<num_pools; i++) {
- ret = pthreadpool_destroy(pools[i]);
+ ret = pthreadpool_pipe_destroy(pools[i]);
if (ret != 0) {
- fprintf(stderr, "pthreadpool_destroy failed: "
- "%s\n", strerror(ret));
+ fprintf(stderr, "pthreadpool_pipe_destroy "
+ "failed: %s\n", strerror(ret));
exit(1);
}
}
continue;
}
- ret = pthreadpool_finished_jobs(pools[j], &jobid, 1);
+ ret = pthreadpool_pipe_finished_jobs(
+ pools[j], &jobid, 1);
if ((ret != 1) || (jobid >= num_jobs * num_threads)) {
fprintf(stderr, "invalid job number %d\n",
jobid);
}
for (i=0; i<num_pools; i++) {
- ret = pthreadpool_destroy(pools[i]);
+ ret = pthreadpool_pipe_destroy(pools[i]);
if (ret != 0) {
- fprintf(stderr, "pthreadpool_destroy failed: %s\n",
- strerror(ret));
+ fprintf(stderr, "pthreadpool_pipe_destroy failed: "
+ "%s\n", strerror(ret));
return -1;
}
}
static int test_fork(void)
{
- struct pthreadpool *p;
+ struct pthreadpool_pipe *p;
pid_t child, waited;
int status, ret;
- ret = pthreadpool_init(1, &p);
+ ret = pthreadpool_pipe_init(1, &p);
if (ret != 0) {
- fprintf(stderr, "pthreadpool_init failed: %s\n",
+ fprintf(stderr, "pthreadpool_pipe_init failed: %s\n",
strerror(ret));
return -1;
}
- ret = pthreadpool_destroy(p);
+ ret = pthreadpool_pipe_destroy(p);
if (ret != 0) {
- fprintf(stderr, "pthreadpool_destroy failed: %s\n",
+ fprintf(stderr, "pthreadpool_pipe_destroy failed: %s\n",
strerror(ret));
return -1;
}
#include "system/time.h"
#include "system/network.h"
#include "lib/util/dlinklist.h"
-#include "pthreadpool/pthreadpool.h"
+#include "pthreadpool/pthreadpool_pipe.h"
#include "lib/util/iov_buf.h"
#include "lib/util/msghdr.h"
#include <fcntl.h>
struct poll_watch *sock_read_watch;
struct unix_dgram_send_queue *send_queues;
- struct pthreadpool *send_pool;
+ struct pthreadpool_pipe *send_pool;
struct poll_watch *pool_read_watch;
uint8_t *recv_buf;
return 0;
}
- ret = pthreadpool_init(0, &ctx->send_pool);
+ ret = pthreadpool_pipe_init(0, &ctx->send_pool);
if (ret != 0) {
return ret;
}
- signalfd = pthreadpool_signal_fd(ctx->send_pool);
+ signalfd = pthreadpool_pipe_signal_fd(ctx->send_pool);
ctx->pool_read_watch = ctx->ev_funcs->watch_new(
ctx->ev_funcs, signalfd, POLLIN,
unix_dgram_job_finished, ctx);
if (ctx->pool_read_watch == NULL) {
- pthreadpool_destroy(ctx->send_pool);
+ pthreadpool_pipe_destroy(ctx->send_pool);
ctx->send_pool = NULL;
return ENOMEM;
}
struct unix_dgram_msg *msg;
int ret, job;
- ret = pthreadpool_finished_jobs(ctx->send_pool, &job, 1);
+ ret = pthreadpool_pipe_finished_jobs(ctx->send_pool, &job, 1);
if (ret != 1) {
return;
}
free(msg);
if (q->msgs != NULL) {
- ret = pthreadpool_add_job(ctx->send_pool, q->sock,
- unix_dgram_send_job, q->msgs);
+ ret = pthreadpool_pipe_add_job(ctx->send_pool, q->sock,
+ unix_dgram_send_job, q->msgs);
if (ret == 0) {
return;
}
unix_dgram_send_queue_free(q);
return ret;
}
- ret = pthreadpool_add_job(ctx->send_pool, q->sock,
- unix_dgram_send_job, q->msgs);
+ ret = pthreadpool_pipe_add_job(ctx->send_pool, q->sock,
+ unix_dgram_send_job, q->msgs);
if (ret != 0) {
unix_dgram_send_queue_free(q);
return ret;
}
if (ctx->send_pool != NULL) {
- int ret = pthreadpool_destroy(ctx->send_pool);
+ int ret = pthreadpool_pipe_destroy(ctx->send_pool);
if (ret != 0) {
return ret;
}
#include "system/shmem.h"
#include "smbd/smbd.h"
#include "smbd/globals.h"
-#include "lib/pthreadpool/pthreadpool.h"
+#include "lib/pthreadpool/pthreadpool_pipe.h"
#ifdef HAVE_LINUX_FALLOC_H
#include <linux/falloc.h>
#endif
***********************************************************************/
static bool init_aio_threadpool(struct tevent_context *ev_ctx,
- struct pthreadpool **pp_pool,
+ struct pthreadpool_pipe **pp_pool,
void (*completion_fn)(struct tevent_context *,
struct tevent_fd *,
uint16_t,
return true;
}
- ret = pthreadpool_init(lp_aio_max_threads(), pp_pool);
+ ret = pthreadpool_pipe_init(lp_aio_max_threads(), pp_pool);
if (ret) {
errno = ret;
return false;
}
sock_event = tevent_add_fd(ev_ctx,
NULL,
- pthreadpool_signal_fd(*pp_pool),
+ pthreadpool_pipe_signal_fd(*pp_pool),
TEVENT_FD_READ,
completion_fn,
NULL);
if (sock_event == NULL) {
- pthreadpool_destroy(*pp_pool);
+ pthreadpool_pipe_destroy(*pp_pool);
*pp_pool = NULL;
return false;
}
* process, as is the current jobid.
*/
-static struct pthreadpool *open_pool;
+static struct pthreadpool_pipe *open_pool;
static int aio_pthread_open_jobid;
struct aio_open_private_data {
return;
}
- ret = pthreadpool_finished_jobs(open_pool, &jobid, 1);
+ ret = pthreadpool_pipe_finished_jobs(open_pool, &jobid, 1);
if (ret != 1) {
smb_panic("aio_open_handle_completion");
/* notreached. */
return -1;
}
- ret = pthreadpool_add_job(open_pool,
- opd->jobid,
- aio_open_worker,
- (void *)opd);
+ ret = pthreadpool_pipe_add_job(open_pool,
+ opd->jobid,
+ aio_open_worker,
+ (void *)opd);
if (ret) {
errno = ret;
return -1;
*/
#include "includes.h"
-#include "lib/pthreadpool/pthreadpool.h"
+#include "lib/pthreadpool/pthreadpool_pipe.h"
#include "proto.h"
extern int torture_numops;
bool run_bench_pthreadpool(int dummy)
{
- struct pthreadpool *pool;
+ struct pthreadpool_pipe *pool;
int i, ret;
- ret = pthreadpool_init(1, &pool);
+ ret = pthreadpool_pipe_init(1, &pool);
if (ret != 0) {
- d_fprintf(stderr, "pthreadpool_init failed: %s\n",
+ d_fprintf(stderr, "pthreadpool_pipe_init failed: %s\n",
strerror(ret));
return false;
}
for (i=0; i<torture_numops; i++) {
int jobid;
- ret = pthreadpool_add_job(pool, 0, null_job, NULL);
+ ret = pthreadpool_pipe_add_job(pool, 0, null_job, NULL);
if (ret != 0) {
- d_fprintf(stderr, "pthreadpool_add_job failed: %s\n",
- strerror(ret));
+ d_fprintf(stderr, "pthreadpool_pipe_add_job "
+ "failed: %s\n", strerror(ret));
break;
}
- ret = pthreadpool_finished_jobs(pool, &jobid, 1);
+ ret = pthreadpool_pipe_finished_jobs(pool, &jobid, 1);
if (ret < 0) {
- d_fprintf(stderr, "pthreadpool_finished_job failed: %s\n",
- strerror(-ret));
+ d_fprintf(stderr, "pthreadpool_pipe_finished_job "
+ "failed: %s\n", strerror(-ret));
break;
}
}
- pthreadpool_destroy(pool);
+ pthreadpool_pipe_destroy(pool);
return (ret == 1);
}