s3:vfs_aio_pthread: make use of pthreadpool_tevent instead of pthreadpool_pipe
authorStefan Metzmacher <metze@samba.org>
Fri, 9 Mar 2018 14:02:04 +0000 (15:02 +0100)
committerVolker Lendecke <vl@samba.org>
Mon, 23 Apr 2018 11:30:06 +0000 (13:30 +0200)
pthreadpool_tevent provides a much simpler api and avoids an extra
pipe for the completion notification.

This means we now have just one thread pool, that's shared for
all async pread, pwrite, fsync and openat() calls, instead of having
an extra pool for openat() with the same possible number of threads.

Signed-off-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Volker Lendecke <vl@samba.org>
Autobuild-User(master): Volker Lendecke <vl@samba.org>
Autobuild-Date(master): Mon Apr 23 13:30:06 CEST 2018 on sn-devel-144

source3/modules/vfs_aio_pthread.c

index d78dd110e3cc8ccbf13b2a267fc7118f4c127704..88794bb7acc7e2787a4e406f337ac475426a07c0 100644 (file)
 #include "system/shmem.h"
 #include "smbd/smbd.h"
 #include "smbd/globals.h"
-#include "../lib/pthreadpool/pthreadpool_pipe.h"
+#include "../lib/pthreadpool/pthreadpool_tevent.h"
 #ifdef HAVE_LINUX_FALLOC_H
 #include <linux/falloc.h>
 #endif
 
 #if defined(HAVE_OPENAT) && defined(USE_LINUX_THREAD_CREDENTIALS)
 
-/************************************************************************
- Ensure thread pool is initialized.
-***********************************************************************/
-
-static bool init_aio_threadpool(struct tevent_context *ev_ctx,
-                               struct pthreadpool_pipe **pp_pool,
-                               void (*completion_fn)(struct tevent_context *,
-                                               struct tevent_fd *,
-                                               uint16_t,
-                                               void *))
-{
-       struct tevent_fd *sock_event = NULL;
-       int ret = 0;
-
-       if (*pp_pool) {
-               return true;
-       }
-
-       ret = pthreadpool_pipe_init(lp_aio_max_threads(), pp_pool);
-       if (ret) {
-               errno = ret;
-               return false;
-       }
-       sock_event = tevent_add_fd(ev_ctx,
-                               NULL,
-                               pthreadpool_pipe_signal_fd(*pp_pool),
-                               TEVENT_FD_READ,
-                               completion_fn,
-                               NULL);
-       if (sock_event == NULL) {
-               pthreadpool_pipe_destroy(*pp_pool);
-               *pp_pool = NULL;
-               return false;
-       }
-
-       DEBUG(10,("init_aio_threadpool: initialized with up to %d threads\n",
-                 (int)lp_aio_max_threads()));
-
-       return true;
-}
-
 /*
  * We must have openat() to do any thread-based
  * asynchronous opens. We also must be using
@@ -81,19 +40,9 @@ static bool init_aio_threadpool(struct tevent_context *ev_ctx,
  * for now).
  */
 
-/*
- * NB. This threadpool is shared over all
- * instances of this VFS module in this
- * process, as is the current jobid.
- */
-
-static struct pthreadpool_pipe *open_pool;
-static int aio_pthread_open_jobid;
-
 struct aio_open_private_data {
        struct aio_open_private_data *prev, *next;
        /* Inputs. */
-       int jobid;
        int dir_fd;
        int flags;
        mode_t mode;
@@ -112,23 +61,6 @@ struct aio_open_private_data {
 /* List of outstanding requests we have. */
 static struct aio_open_private_data *open_pd_list;
 
-/************************************************************************
- Find the open private data by jobid.
-***********************************************************************/
-
-static struct aio_open_private_data *find_open_private_data_by_jobid(int jobid)
-{
-       struct aio_open_private_data *opd;
-
-       for (opd = open_pd_list; opd != NULL; opd = opd->next) {
-               if (opd->jobid == jobid) {
-                       return opd;
-               }
-       }
-
-       return NULL;
-}
-
 /************************************************************************
  Find the open private data by mid.
 ***********************************************************************/
@@ -150,42 +82,24 @@ static struct aio_open_private_data *find_open_private_data_by_mid(uint64_t mid)
  Callback when an open completes.
 ***********************************************************************/
 
-static void aio_open_handle_completion(struct tevent_context *event_ctx,
-                               struct tevent_fd *event,
-                               uint16_t flags,
-                               void *p)
+static void aio_open_handle_completion(struct tevent_req *subreq)
 {
-       struct aio_open_private_data *opd = NULL;
-       int jobid = 0;
+       struct aio_open_private_data *opd =
+               tevent_req_callback_data(subreq,
+               struct aio_open_private_data);
        int ret;
        struct smbXsrv_connection *xconn;
 
-       DEBUG(10, ("aio_open_handle_completion called with flags=%d\n",
-               (int)flags));
-
-       if ((flags & TEVENT_FD_READ) == 0) {
-               return;
-       }
-
-       ret = pthreadpool_pipe_finished_jobs(open_pool, &jobid, 1);
-       if (ret != 1) {
+       ret = pthreadpool_tevent_job_recv(subreq);
+       TALLOC_FREE(subreq);
+       if (ret != 0) {
                smb_panic("aio_open_handle_completion");
                /* notreached. */
                return;
        }
 
-       opd = find_open_private_data_by_jobid(jobid);
-       if (opd == NULL) {
-               DEBUG(0, ("aio_open_handle_completion cannot find jobid %d\n",
-                       jobid));
-               smb_panic("aio_open_handle_completion - no jobid");
-               /* notreached. */
-               return;
-       }
-
-       DEBUG(10,("aio_open_handle_completion: jobid %d mid %llu "
+       DEBUG(10,("aio_open_handle_completion: mid %llu "
                "for file %s/%s completed\n",
-               jobid,
                (unsigned long long)opd->mid,
                opd->dname,
                opd->fname));
@@ -295,7 +209,6 @@ static struct aio_open_private_data *create_private_open_data(const files_struct
                return NULL;
        }
 
-       opd->jobid = aio_pthread_open_jobid++;
        opd->dir_fd = -1;
        opd->ret_fd = -1;
        opd->ret_errno = EINPROGRESS;
@@ -354,13 +267,7 @@ static int open_async(const files_struct *fsp,
                        mode_t mode)
 {
        struct aio_open_private_data *opd = NULL;
-       int ret;
-
-       if (!init_aio_threadpool(fsp->conn->sconn->ev_ctx,
-                       &open_pool,
-                       aio_open_handle_completion)) {
-               return -1;
-       }
+       struct tevent_req *subreq = NULL;
 
        opd = create_private_open_data(fsp, flags, mode);
        if (opd == NULL) {
@@ -368,18 +275,17 @@ static int open_async(const files_struct *fsp,
                return -1;
        }
 
-       ret = pthreadpool_pipe_add_job(open_pool,
-                                      opd->jobid,
-                                      aio_open_worker,
-                                      (void *)opd);
-       if (ret) {
-               errno = ret;
+       subreq = pthreadpool_tevent_job_send(opd,
+                                            fsp->conn->sconn->ev_ctx,
+                                            fsp->conn->sconn->pool,
+                                            aio_open_worker, opd);
+       if (subreq == NULL) {
                return -1;
        }
+       tevent_req_set_callback(subreq, aio_open_handle_completion, opd);
 
-       DEBUG(5,("open_async: mid %llu jobid %d created for file %s/%s\n",
+       DEBUG(5,("open_async: mid %llu created for file %s/%s\n",
                (unsigned long long)opd->mid,
-               opd->jobid,
                opd->dname,
                opd->fname));
 
@@ -406,10 +312,9 @@ static bool find_completed_open(files_struct *fsp,
 
        if (opd->in_progress) {
                DEBUG(0,("find_completed_open: mid %llu "
-                       "jobid %d still in progress for "
+                       "still in progress for "
                        "file %s/%s. PANIC !\n",
                        (unsigned long long)opd->mid,
-                       opd->jobid,
                        opd->dname,
                        opd->fname));
                /* Disaster ! This is an open timeout. Just panic. */
@@ -423,12 +328,11 @@ static bool find_completed_open(files_struct *fsp,
 
        DEBUG(5,("find_completed_open: mid %llu returning "
                "fd = %d, errno = %d (%s) "
-               "jobid (%d) for file %s\n",
+               "for file %s\n",
                (unsigned long long)opd->mid,
                opd->ret_fd,
                opd->ret_errno,
                strerror(opd->ret_errno),
-               opd->jobid,
                smb_fname_str_dbg(fsp->fsp_name)));
 
        /* Now we can free the opd. */