#include "system/shmem.h"
#include "smbd/smbd.h"
#include "smbd/globals.h"
-#include "../lib/pthreadpool/pthreadpool_pipe.h"
+#include "../lib/pthreadpool/pthreadpool_tevent.h"
#ifdef HAVE_LINUX_FALLOC_H
#include <linux/falloc.h>
#endif
#if defined(HAVE_OPENAT) && defined(USE_LINUX_THREAD_CREDENTIALS)
-/************************************************************************
- Ensure thread pool is initialized.
-***********************************************************************/
-
-static bool init_aio_threadpool(struct tevent_context *ev_ctx,
- struct pthreadpool_pipe **pp_pool,
- void (*completion_fn)(struct tevent_context *,
- struct tevent_fd *,
- uint16_t,
- void *))
-{
- struct tevent_fd *sock_event = NULL;
- int ret = 0;
-
- if (*pp_pool) {
- return true;
- }
-
- ret = pthreadpool_pipe_init(lp_aio_max_threads(), pp_pool);
- if (ret) {
- errno = ret;
- return false;
- }
- sock_event = tevent_add_fd(ev_ctx,
- NULL,
- pthreadpool_pipe_signal_fd(*pp_pool),
- TEVENT_FD_READ,
- completion_fn,
- NULL);
- if (sock_event == NULL) {
- pthreadpool_pipe_destroy(*pp_pool);
- *pp_pool = NULL;
- return false;
- }
-
- DEBUG(10,("init_aio_threadpool: initialized with up to %d threads\n",
- (int)lp_aio_max_threads()));
-
- return true;
-}
-
/*
* We must have openat() to do any thread-based
* asynchronous opens. We also must be using
* for now).
*/
-/*
- * NB. This threadpool is shared over all
- * instances of this VFS module in this
- * process, as is the current jobid.
- */
-
-static struct pthreadpool_pipe *open_pool;
-static int aio_pthread_open_jobid;
-
struct aio_open_private_data {
struct aio_open_private_data *prev, *next;
/* Inputs. */
- int jobid;
int dir_fd;
int flags;
mode_t mode;
/* List of outstanding requests we have. */
static struct aio_open_private_data *open_pd_list;
-/************************************************************************
- Find the open private data by jobid.
-***********************************************************************/
-
-static struct aio_open_private_data *find_open_private_data_by_jobid(int jobid)
-{
- struct aio_open_private_data *opd;
-
- for (opd = open_pd_list; opd != NULL; opd = opd->next) {
- if (opd->jobid == jobid) {
- return opd;
- }
- }
-
- return NULL;
-}
-
/************************************************************************
Find the open private data by mid.
***********************************************************************/
Callback when an open completes.
***********************************************************************/
-static void aio_open_handle_completion(struct tevent_context *event_ctx,
- struct tevent_fd *event,
- uint16_t flags,
- void *p)
+static void aio_open_handle_completion(struct tevent_req *subreq)
{
- struct aio_open_private_data *opd = NULL;
- int jobid = 0;
+ struct aio_open_private_data *opd =
+ tevent_req_callback_data(subreq,
+ struct aio_open_private_data);
int ret;
struct smbXsrv_connection *xconn;
- DEBUG(10, ("aio_open_handle_completion called with flags=%d\n",
- (int)flags));
-
- if ((flags & TEVENT_FD_READ) == 0) {
- return;
- }
-
- ret = pthreadpool_pipe_finished_jobs(open_pool, &jobid, 1);
- if (ret != 1) {
+ ret = pthreadpool_tevent_job_recv(subreq);
+ TALLOC_FREE(subreq);
+ if (ret != 0) {
smb_panic("aio_open_handle_completion");
/* notreached. */
return;
}
- opd = find_open_private_data_by_jobid(jobid);
- if (opd == NULL) {
- DEBUG(0, ("aio_open_handle_completion cannot find jobid %d\n",
- jobid));
- smb_panic("aio_open_handle_completion - no jobid");
- /* notreached. */
- return;
- }
-
- DEBUG(10,("aio_open_handle_completion: jobid %d mid %llu "
+ DEBUG(10,("aio_open_handle_completion: mid %llu "
"for file %s/%s completed\n",
- jobid,
(unsigned long long)opd->mid,
opd->dname,
opd->fname));
return NULL;
}
- opd->jobid = aio_pthread_open_jobid++;
opd->dir_fd = -1;
opd->ret_fd = -1;
opd->ret_errno = EINPROGRESS;
mode_t mode)
{
struct aio_open_private_data *opd = NULL;
- int ret;
-
- if (!init_aio_threadpool(fsp->conn->sconn->ev_ctx,
- &open_pool,
- aio_open_handle_completion)) {
- return -1;
- }
+ struct tevent_req *subreq = NULL;
opd = create_private_open_data(fsp, flags, mode);
if (opd == NULL) {
return -1;
}
- ret = pthreadpool_pipe_add_job(open_pool,
- opd->jobid,
- aio_open_worker,
- (void *)opd);
- if (ret) {
- errno = ret;
+ subreq = pthreadpool_tevent_job_send(opd,
+ fsp->conn->sconn->ev_ctx,
+ fsp->conn->sconn->pool,
+ aio_open_worker, opd);
+ if (subreq == NULL) {
return -1;
}
+ tevent_req_set_callback(subreq, aio_open_handle_completion, opd);
- DEBUG(5,("open_async: mid %llu jobid %d created for file %s/%s\n",
+ DEBUG(5,("open_async: mid %llu created for file %s/%s\n",
(unsigned long long)opd->mid,
- opd->jobid,
opd->dname,
opd->fname));
if (opd->in_progress) {
DEBUG(0,("find_completed_open: mid %llu "
- "jobid %d still in progress for "
+ "still in progress for "
"file %s/%s. PANIC !\n",
(unsigned long long)opd->mid,
- opd->jobid,
opd->dname,
opd->fname));
/* Disaster ! This is an open timeout. Just panic. */
DEBUG(5,("find_completed_open: mid %llu returning "
"fd = %d, errno = %d (%s) "
- "jobid (%d) for file %s\n",
+ "for file %s\n",
(unsigned long long)opd->mid,
opd->ret_fd,
opd->ret_errno,
strerror(opd->ret_errno),
- opd->jobid,
smb_fname_str_dbg(fsp->fsp_name)));
/* Now we can free the opd. */