#include "system/filesys.h"
#include "pthreadpool.h"
#include "lib/util/dlinklist.h"
-#include "lib/util/blocking.h"
#ifdef NDEBUG
#undef NDEBUG
*/
pthread_cond_t condvar;
- int check_pipefd[2];
-
/*
* Array of jobs
*/
{
struct pthreadpool *pool;
int ret;
- bool ok;
pool = (struct pthreadpool *)malloc(sizeof(struct pthreadpool));
if (pool == NULL) {
return ENOMEM;
}
- ret = pipe(pool->check_pipefd);
- if (ret != 0) {
- free(pool->jobs);
- free(pool);
- return ENOMEM;
- }
-
- ok = smb_set_close_on_exec(pool->check_pipefd[0]);
- if (!ok) {
- close(pool->check_pipefd[0]);
- close(pool->check_pipefd[1]);
- free(pool->jobs);
- free(pool);
- return EINVAL;
- }
- ok = smb_set_close_on_exec(pool->check_pipefd[1]);
- if (!ok) {
- close(pool->check_pipefd[0]);
- close(pool->check_pipefd[1]);
- free(pool->jobs);
- free(pool);
- return EINVAL;
- }
- ret = set_blocking(pool->check_pipefd[0], true);
- if (ret == -1) {
- close(pool->check_pipefd[0]);
- close(pool->check_pipefd[1]);
- free(pool->jobs);
- free(pool);
- return EINVAL;
- }
- ret = set_blocking(pool->check_pipefd[1], false);
- if (ret == -1) {
- close(pool->check_pipefd[0]);
- close(pool->check_pipefd[1]);
- free(pool->jobs);
- free(pool);
- return EINVAL;
- }
-
pool->head = pool->num_jobs = 0;
ret = pthread_mutex_init(&pool->mutex, NULL);
if (ret != 0) {
- close(pool->check_pipefd[0]);
- close(pool->check_pipefd[1]);
free(pool->jobs);
free(pool);
return ret;
ret = pthread_cond_init(&pool->condvar, NULL);
if (ret != 0) {
pthread_mutex_destroy(&pool->mutex);
- close(pool->check_pipefd[0]);
- close(pool->check_pipefd[1]);
free(pool->jobs);
free(pool);
return ret;
if (ret != 0) {
pthread_cond_destroy(&pool->condvar);
pthread_mutex_destroy(&pool->mutex);
- close(pool->check_pipefd[0]);
- close(pool->check_pipefd[1]);
free(pool->jobs);
free(pool);
return ret;
pthread_mutex_destroy(&pool->fork_mutex);
pthread_cond_destroy(&pool->condvar);
pthread_mutex_destroy(&pool->mutex);
- close(pool->check_pipefd[0]);
- close(pool->check_pipefd[1]);
free(pool->jobs);
free(pool);
return ret;
pool->head = 0;
pool->num_jobs = 0;
pool->stopped = true;
- if (pool->check_pipefd[0] != -1) {
- close(pool->check_pipefd[0]);
- pool->check_pipefd[0] = -1;
- }
- if (pool->check_pipefd[1] != -1) {
- close(pool->check_pipefd[1]);
- pool->check_pipefd[1] = -1;
- }
ret = pthread_cond_init(&pool->condvar, NULL);
assert(ret == 0);
return ret2;
}
- if (pool->check_pipefd[0] != -1) {
- close(pool->check_pipefd[0]);
- pool->check_pipefd[0] = -1;
- }
- if (pool->check_pipefd[1] != -1) {
- close(pool->check_pipefd[1]);
- pool->check_pipefd[1] = -1;
- }
free(pool->jobs);
free(pool);
pool->stopped = true;
- if (pool->check_pipefd[0] != -1) {
- close(pool->check_pipefd[0]);
- pool->check_pipefd[0] = -1;
- }
- if (pool->check_pipefd[1] != -1) {
- close(pool->check_pipefd[1]);
- pool->check_pipefd[1] = -1;
- }
-
if (pool->num_threads == 0) {
return 0;
}
free_it = (pool->destroyed && (pool->num_threads == 0));
- while (true) {
- uint8_t c = 0;
- ssize_t nwritten = 0;
-
- if (pool->check_pipefd[1] == -1) {
- break;
- }
-
- nwritten = write(pool->check_pipefd[1], &c, 1);
- if (nwritten == -1) {
- if (errno == EINTR) {
- continue;
- }
- if (errno == EAGAIN) {
- break;
- }
-#ifdef EWOULDBLOCK
- if (errno == EWOULDBLOCK) {
- break;
- }
-#endif
- /* ignore ... */
- }
-
- break;
- }
-
ret = pthread_mutex_unlock(&pool->mutex);
assert(ret == 0);
return res;
}
-int pthreadpool_restart_check(struct pthreadpool *pool)
-{
- int res;
- int unlock_res;
- unsigned possible_threads = 0;
- unsigned missing_threads = 0;
-
- assert(!pool->destroyed);
-
- res = pthread_mutex_lock(&pool->mutex);
- if (res != 0) {
- return res;
- }
-
- if (pool->stopped) {
- /*
- * Protect against the pool being shut down while
- * trying to add a job
- */
- unlock_res = pthread_mutex_unlock(&pool->mutex);
- assert(unlock_res == 0);
- return EINVAL;
- }
-
- if (pool->num_jobs == 0) {
- /*
- * This also handles the pool->max_threads == 0 case as it never
- * calls pthreadpool_put_job()
- */
- unlock_res = pthread_mutex_unlock(&pool->mutex);
- assert(unlock_res == 0);
- return 0;
- }
-
- if (pool->num_idle > 0) {
- /*
- * We have idle threads and pending jobs,
- * this means we better let all threads
- * start and check for pending jobs.
- */
- res = pthread_cond_broadcast(&pool->condvar);
- assert(res == 0);
- }
-
- if (pool->num_threads < pool->max_threads) {
- possible_threads = pool->max_threads - pool->num_threads;
- }
-
- if (pool->num_idle < pool->num_jobs) {
- missing_threads = pool->num_jobs - pool->num_idle;
- }
-
- missing_threads = MIN(missing_threads, possible_threads);
-
- while (missing_threads > 0) {
-
- res = pthreadpool_create_thread(pool);
- if (res != 0) {
- break;
- }
-
- missing_threads--;
- }
-
- if (missing_threads == 0) {
- /*
- * Ok, we recreated all thread we need.
- */
- unlock_res = pthread_mutex_unlock(&pool->mutex);
- assert(unlock_res == 0);
- return 0;
- }
-
- if (pool->num_threads != 0) {
- /*
- * At least one thread is still available, let
- * that one run the queued jobs.
- */
- unlock_res = pthread_mutex_unlock(&pool->mutex);
- assert(unlock_res == 0);
- return 0;
- }
-
- /*
- * There's no thread available to run any pending jobs.
- * The caller may want to cancel the jobs and destroy the pool.
- * But that's up to the caller.
- */
- unlock_res = pthread_mutex_unlock(&pool->mutex);
- assert(unlock_res == 0);
-
- return res;
-}
-
-int pthreadpool_restart_check_monitor_fd(struct pthreadpool *pool)
-{
- int fd;
- int ret;
- bool ok;
-
- if (pool->stopped) {
- errno = EINVAL;
- return -1;
- }
-
- if (pool->check_pipefd[0] == -1) {
- errno = ENOSYS;
- return -1;
- }
-
- fd = dup(pool->check_pipefd[0]);
- if (fd == -1) {
- return -1;
- }
-
- ok = smb_set_close_on_exec(fd);
- if (!ok) {
- int saved_errno = errno;
- close(fd);
- errno = saved_errno;
- return -1;
- }
-
- ret = set_blocking(fd, false);
- if (ret == -1) {
- int saved_errno = errno;
- close(fd);
- errno = saved_errno;
- return -1;
- }
-
- return fd;
-}
-
-int pthreadpool_restart_check_monitor_drain(struct pthreadpool *pool)
-{
- if (pool->stopped) {
- return EINVAL;
- }
-
- if (pool->check_pipefd[0] == -1) {
- return ENOSYS;
- }
-
- while (true) {
- uint8_t buf[128];
- ssize_t nread;
-
- nread = read(pool->check_pipefd[0], buf, sizeof(buf));
- if (nread == -1) {
- if (errno == EINTR) {
- continue;
- }
- if (errno == EAGAIN) {
- return 0;
- }
-#ifdef EWOULDBLOCK
- if (errno == EWOULDBLOCK) {
- return 0;
- }
-#endif
- if (errno == 0) {
- errno = INT_MAX;
- }
-
- return errno;
- }
-
- if (nread < sizeof(buf)) {
- return 0;
- }
- }
-
- abort();
- return INT_MAX;
-}
-
size_t pthreadpool_cancel_job(struct pthreadpool *pool, int job_id,
void (*fn)(void *private_data), void *private_data)
{
int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
void (*fn)(void *private_data), void *private_data);
-/**
- * @brief Check if the pthreadpool needs a restart.
- *
- * This checks if there are enough threads to run the already
- * queued jobs. This should be called only the callers signal_fn
- * (passed to pthreadpool_init()) returned an error, so
- * that the job's worker thread exited.
- *
- * Typically this is called once the file destriptor
- * returned by pthreadpool_restart_check_monitor_fd()
- * became readable and pthreadpool_restart_check_monitor_drain()
- * returned success.
- *
- * This function tries to restart the missing threads.
- *
- * @param[in] pool The pool to run the job on
- * @return success: 0, failure: errno
- *
- * @see pthreadpool_restart_check_monitor_fd
- * @see pthreadpool_restart_check_monitor_drain
- */
-int pthreadpool_restart_check(struct pthreadpool *pool);
-
-/**
- * @brief Return a file destriptor that monitors the pool.
- *
- * If the file destrictor becomes readable,
- * the event handler should call pthreadpool_restart_check_monitor_drain().
- *
- * pthreadpool_restart_check() should also be called once the
- * state is drained.
- *
- * This function returns a fresh fd using dup() each time.
- *
- * If the pool doesn't require restarts, this function
- * returns -1 and sets errno = ENOSYS. The caller
- * may ignore that situation.
- *
- * @param[in] pool The pool to run the job on
- * @return success: 0, failure: -1 (set errno)
- *
- * @see pthreadpool_restart_check_monitor_fd
- * @see pthreadpool_restart_check_monitor_drain
- */
-int pthreadpool_restart_check_monitor_fd(struct pthreadpool *pool);
-
-/**
- * @brief Drain the monitor file destriptor of the pool.
- *
- * If the file destrictor returned by pthreadpool_restart_check_monitor_fd()
- * becomes readable, pthreadpool_restart_check_monitor_drain() should be
- * called before pthreadpool_restart_check().
- *
- * If this function returns an error the caller should close
- * the file destriptor it got from pthreadpool_restart_check_monitor_fd().
- *
- * @param[in] pool The pool to run the job on
- * @return success: 0, failure: errno
- *
- * @see pthreadpool_restart_check_monitor_fd
- * @see pthreadpool_restart_check
- */
-int pthreadpool_restart_check_monitor_drain(struct pthreadpool *pool);
-
/**
* @brief Try to cancel a job in a pthreadpool
*