s3:modules: s/event_add_timed/tevent_add_timer
[vlendec/samba-autobuild/.git] / source3 / modules / vfs_aio_fork.c
1 /*
2  * Simulate the Posix AIO using mmap/fork
3  *
4  * Copyright (C) Volker Lendecke 2008
5  * Copyright (C) Jeremy Allison 2010
6  *
7  * This program is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 2 of the License, or
10  * (at your option) any later version.
11  *
12  * This program is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with this program; if not, write to the Free Software
19  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #include "includes.h"
23 #include "system/filesys.h"
24 #include "system/shmem.h"
25 #include "smbd/smbd.h"
26 #include "smbd/globals.h"
27 #include "lib/async_req/async_sock.h"
28 #include "lib/util/tevent_unix.h"
29
30 #undef recvmsg
31
32 #ifndef MAP_FILE
33 #define MAP_FILE 0
34 #endif
35
36 struct aio_fork_config {
37         bool erratic_testing_mode;
38 };
39
40 struct mmap_area {
41         size_t size;
42         volatile void *ptr;
43 };
44
45 static int mmap_area_destructor(struct mmap_area *area)
46 {
47         munmap((void *)area->ptr, area->size);
48         return 0;
49 }
50
51 static struct mmap_area *mmap_area_init(TALLOC_CTX *mem_ctx, size_t size)
52 {
53         struct mmap_area *result;
54         int fd;
55
56         result = talloc(mem_ctx, struct mmap_area);
57         if (result == NULL) {
58                 DEBUG(0, ("talloc failed\n"));
59                 goto fail;
60         }
61
62         fd = open("/dev/zero", O_RDWR);
63         if (fd == -1) {
64                 DEBUG(3, ("open(\"/dev/zero\") failed: %s\n",
65                           strerror(errno)));
66                 goto fail;
67         }
68
69         result->ptr = mmap(NULL, size, PROT_READ|PROT_WRITE,
70                            MAP_SHARED|MAP_FILE, fd, 0);
71         if (result->ptr == MAP_FAILED) {
72                 DEBUG(1, ("mmap failed: %s\n", strerror(errno)));
73                 goto fail;
74         }
75
76         close(fd);
77
78         result->size = size;
79         talloc_set_destructor(result, mmap_area_destructor);
80
81         return result;
82
83 fail:
84         TALLOC_FREE(result);
85         return NULL;
86 }
87
88 enum cmd_type {
89         READ_CMD,
90         WRITE_CMD,
91         FSYNC_CMD
92 };
93
94 static const char *cmd_type_str(enum cmd_type cmd)
95 {
96         const char *result;
97
98         switch (cmd) {
99         case READ_CMD:
100                 result = "READ";
101                 break;
102         case WRITE_CMD:
103                 result = "WRITE";
104                 break;
105         case FSYNC_CMD:
106                 result = "FSYNC";
107                 break;
108         default:
109                 result = "<UNKNOWN>";
110                 break;
111         }
112         return result;
113 }
114
115 struct rw_cmd {
116         size_t n;
117         off_t offset;
118         enum cmd_type cmd;
119         bool erratic_testing_mode;
120 };
121
122 struct rw_ret {
123         ssize_t size;
124         int ret_errno;
125 };
126
127 struct aio_child_list;
128
129 struct aio_child {
130         struct aio_child *prev, *next;
131         struct aio_child_list *list;
132         pid_t pid;
133         int sockfd;
134         struct mmap_area *map;
135         bool dont_delete;       /* Marked as in use since last cleanup */
136         bool busy;
137 };
138
139 struct aio_child_list {
140         struct aio_child *children;
141         struct tevent_timer *cleanup_event;
142 };
143
144 static void free_aio_children(void **p)
145 {
146         TALLOC_FREE(*p);
147 }
148
149 static ssize_t read_fd(int fd, void *ptr, size_t nbytes, int *recvfd)
150 {
151         struct msghdr msg;
152         struct iovec iov[1];
153         ssize_t n;
154 #ifndef HAVE_MSGHDR_MSG_CONTROL
155         int newfd;
156 #endif
157
158 #ifdef  HAVE_MSGHDR_MSG_CONTROL
159         union {
160           struct cmsghdr        cm;
161           char                          control[CMSG_SPACE(sizeof(int))];
162         } control_un;
163         struct cmsghdr  *cmptr;
164
165         msg.msg_control = control_un.control;
166         msg.msg_controllen = sizeof(control_un.control);
167 #else
168 #if HAVE_MSGHDR_MSG_ACCTRIGHTS
169         msg.msg_accrights = (caddr_t) &newfd;
170         msg.msg_accrightslen = sizeof(int);
171 #else
172 #error Can not pass file descriptors
173 #endif
174 #endif
175
176         msg.msg_name = NULL;
177         msg.msg_namelen = 0;
178         msg.msg_flags = 0;
179
180         iov[0].iov_base = (void *)ptr;
181         iov[0].iov_len = nbytes;
182         msg.msg_iov = iov;
183         msg.msg_iovlen = 1;
184
185         if ( (n = recvmsg(fd, &msg, 0)) <= 0) {
186                 return(n);
187         }
188
189 #ifdef  HAVE_MSGHDR_MSG_CONTROL
190         if ((cmptr = CMSG_FIRSTHDR(&msg)) != NULL
191             && cmptr->cmsg_len == CMSG_LEN(sizeof(int))) {
192                 if (cmptr->cmsg_level != SOL_SOCKET) {
193                         DEBUG(10, ("control level != SOL_SOCKET"));
194                         errno = EINVAL;
195                         return -1;
196                 }
197                 if (cmptr->cmsg_type != SCM_RIGHTS) {
198                         DEBUG(10, ("control type != SCM_RIGHTS"));
199                         errno = EINVAL;
200                         return -1;
201                 }
202                 memcpy(recvfd, CMSG_DATA(cmptr), sizeof(*recvfd));
203         } else {
204                 *recvfd = -1;           /* descriptor was not passed */
205         }
206 #else
207         if (msg.msg_accrightslen == sizeof(int)) {
208                 *recvfd = newfd;
209         }
210         else {
211                 *recvfd = -1;           /* descriptor was not passed */
212         }
213 #endif
214
215         return(n);
216 }
217
218 static ssize_t write_fd(int fd, void *ptr, size_t nbytes, int sendfd)
219 {
220         struct msghdr   msg;
221         struct iovec    iov[1];
222
223 #ifdef  HAVE_MSGHDR_MSG_CONTROL
224         union {
225                 struct cmsghdr  cm;
226                 char control[CMSG_SPACE(sizeof(int))];
227         } control_un;
228         struct cmsghdr  *cmptr;
229
230         ZERO_STRUCT(msg);
231         ZERO_STRUCT(control_un);
232
233         msg.msg_control = control_un.control;
234         msg.msg_controllen = sizeof(control_un.control);
235
236         cmptr = CMSG_FIRSTHDR(&msg);
237         cmptr->cmsg_len = CMSG_LEN(sizeof(int));
238         cmptr->cmsg_level = SOL_SOCKET;
239         cmptr->cmsg_type = SCM_RIGHTS;
240         memcpy(CMSG_DATA(cmptr), &sendfd, sizeof(sendfd));
241 #else
242         ZERO_STRUCT(msg);
243         msg.msg_accrights = (caddr_t) &sendfd;
244         msg.msg_accrightslen = sizeof(int);
245 #endif
246
247         msg.msg_name = NULL;
248         msg.msg_namelen = 0;
249
250         ZERO_STRUCT(iov);
251         iov[0].iov_base = (void *)ptr;
252         iov[0].iov_len = nbytes;
253         msg.msg_iov = iov;
254         msg.msg_iovlen = 1;
255
256         return (sendmsg(fd, &msg, 0));
257 }
258
259 static void aio_child_cleanup(struct tevent_context *event_ctx,
260                               struct tevent_timer *te,
261                               struct timeval now,
262                               void *private_data)
263 {
264         struct aio_child_list *list = talloc_get_type_abort(
265                 private_data, struct aio_child_list);
266         struct aio_child *child, *next;
267
268         TALLOC_FREE(list->cleanup_event);
269
270         for (child = list->children; child != NULL; child = next) {
271                 next = child->next;
272
273                 if (child->busy) {
274                         DEBUG(10, ("child %d currently active\n",
275                                    (int)child->pid));
276                         continue;
277                 }
278
279                 if (child->dont_delete) {
280                         DEBUG(10, ("Child %d was active since last cleanup\n",
281                                    (int)child->pid));
282                         child->dont_delete = false;
283                         continue;
284                 }
285
286                 DEBUG(10, ("Child %d idle for more than 30 seconds, "
287                            "deleting\n", (int)child->pid));
288
289                 TALLOC_FREE(child);
290                 child = next;
291         }
292
293         if (list->children != NULL) {
294                 /*
295                  * Re-schedule the next cleanup round
296                  */
297                 list->cleanup_event = tevent_add_timer(server_event_context(), list,
298                                                       timeval_add(&now, 30, 0),
299                                                       aio_child_cleanup, list);
300
301         }
302 }
303
304 static struct aio_child_list *init_aio_children(struct vfs_handle_struct *handle)
305 {
306         struct aio_child_list *data = NULL;
307
308         if (SMB_VFS_HANDLE_TEST_DATA(handle)) {
309                 SMB_VFS_HANDLE_GET_DATA(handle, data, struct aio_child_list,
310                                         return NULL);
311         }
312
313         if (data == NULL) {
314                 data = talloc_zero(NULL, struct aio_child_list);
315                 if (data == NULL) {
316                         return NULL;
317                 }
318         }
319
320         /*
321          * Regardless of whether the child_list had been around or not, make
322          * sure that we have a cleanup timed event. This timed event will
323          * delete itself when it finds that no children are around anymore.
324          */
325
326         if (data->cleanup_event == NULL) {
327                 data->cleanup_event = tevent_add_timer(server_event_context(), data,
328                                                       timeval_current_ofs(30, 0),
329                                                       aio_child_cleanup, data);
330                 if (data->cleanup_event == NULL) {
331                         TALLOC_FREE(data);
332                         return NULL;
333                 }
334         }
335
336         if (!SMB_VFS_HANDLE_TEST_DATA(handle)) {
337                 SMB_VFS_HANDLE_SET_DATA(handle, data, free_aio_children,
338                                         struct aio_child_list, return False);
339         }
340
341         return data;
342 }
343
344 static void aio_child_loop(int sockfd, struct mmap_area *map)
345 {
346         while (true) {
347                 int fd = -1;
348                 ssize_t ret;
349                 struct rw_cmd cmd_struct;
350                 struct rw_ret ret_struct;
351
352                 ret = read_fd(sockfd, &cmd_struct, sizeof(cmd_struct), &fd);
353                 if (ret != sizeof(cmd_struct)) {
354                         DEBUG(10, ("read_fd returned %d: %s\n", (int)ret,
355                                    strerror(errno)));
356                         exit(1);
357                 }
358
359                 DEBUG(10, ("aio_child_loop: %s %d bytes at %d from fd %d\n",
360                            cmd_type_str(cmd_struct.cmd),
361                            (int)cmd_struct.n, (int)cmd_struct.offset, fd));
362
363                 if (cmd_struct.erratic_testing_mode) {
364                         /*
365                          * For developer testing, we want erratic behaviour for
366                          * async I/O times
367                          */
368                         uint8_t randval;
369                         unsigned msecs;
370                         /*
371                          * use generate_random_buffer, we just forked from a
372                          * common parent state
373                          */
374                         generate_random_buffer(&randval, sizeof(randval));
375                         msecs = randval + 20;
376                         DEBUG(10, ("delaying for %u msecs\n", msecs));
377                         smb_msleep(msecs);
378                 }
379
380                 ZERO_STRUCT(ret_struct);
381
382                 switch (cmd_struct.cmd) {
383                 case READ_CMD:
384                         ret_struct.size = sys_pread(
385                                 fd, (void *)map->ptr, cmd_struct.n,
386                                 cmd_struct.offset);
387 #if 0
388 /* This breaks "make test" when run with aio_fork module. */
389 #ifdef DEVELOPER
390                         ret_struct.size = MAX(1, ret_struct.size * 0.9);
391 #endif
392 #endif
393                         break;
394                 case WRITE_CMD:
395                         ret_struct.size = sys_pwrite(
396                                 fd, (void *)map->ptr, cmd_struct.n,
397                                 cmd_struct.offset);
398                         break;
399                 case FSYNC_CMD:
400                         ret_struct.size = fsync(fd);
401                         break;
402                 default:
403                         ret_struct.size = -1;
404                         errno = EINVAL;
405                 }
406
407                 DEBUG(10, ("aio_child_loop: syscall returned %d\n",
408                            (int)ret_struct.size));
409
410                 if (ret_struct.size == -1) {
411                         ret_struct.ret_errno = errno;
412                 }
413
414                 /*
415                  * Close the fd before telling our parent we're done. The
416                  * parent might close and re-open the file very quickly, and
417                  * with system-level share modes (GPFS) we would get an
418                  * unjustified SHARING_VIOLATION.
419                  */
420                 close(fd);
421
422                 ret = write_data(sockfd, (char *)&ret_struct,
423                                  sizeof(ret_struct));
424                 if (ret != sizeof(ret_struct)) {
425                         DEBUG(10, ("could not write ret_struct: %s\n",
426                                    strerror(errno)));
427                         exit(2);
428                 }
429         }
430 }
431
432 static int aio_child_destructor(struct aio_child *child)
433 {
434         char c=0;
435
436         SMB_ASSERT(!child->busy);
437
438         DEBUG(10, ("aio_child_destructor: removing child %d on fd %d\n",
439                         child->pid, child->sockfd));
440
441         /*
442          * closing the sockfd makes the child not return from recvmsg() on RHEL
443          * 5.5 so instead force the child to exit by writing bad data to it
444          */
445         write(child->sockfd, &c, sizeof(c));
446         close(child->sockfd);
447         DLIST_REMOVE(child->list->children, child);
448         return 0;
449 }
450
451 /*
452  * We have to close all fd's in open files, we might incorrectly hold a system
453  * level share mode on a file.
454  */
455
456 static struct files_struct *close_fsp_fd(struct files_struct *fsp,
457                                          void *private_data)
458 {
459         if ((fsp->fh != NULL) && (fsp->fh->fd != -1)) {
460                 close(fsp->fh->fd);
461                 fsp->fh->fd = -1;
462         }
463         return NULL;
464 }
465
466 static int create_aio_child(struct smbd_server_connection *sconn,
467                             struct aio_child_list *children,
468                             size_t map_size,
469                             struct aio_child **presult)
470 {
471         struct aio_child *result;
472         int fdpair[2];
473         int ret;
474
475         fdpair[0] = fdpair[1] = -1;
476
477         result = talloc_zero(children, struct aio_child);
478         if (result == NULL) {
479                 return ENOMEM;
480         }
481
482         if (socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair) == -1) {
483                 ret = errno;
484                 DEBUG(10, ("socketpair() failed: %s\n", strerror(errno)));
485                 goto fail;
486         }
487
488         DEBUG(10, ("fdpair = %d/%d\n", fdpair[0], fdpair[1]));
489
490         result->map = mmap_area_init(result, map_size);
491         if (result->map == NULL) {
492                 ret = errno;
493                 DEBUG(0, ("Could not create mmap area\n"));
494                 goto fail;
495         }
496
497         result->pid = fork();
498         if (result->pid == -1) {
499                 ret = errno;
500                 DEBUG(0, ("fork failed: %s\n", strerror(errno)));
501                 goto fail;
502         }
503
504         if (result->pid == 0) {
505                 close(fdpair[0]);
506                 result->sockfd = fdpair[1];
507                 files_forall(sconn, close_fsp_fd, NULL);
508                 aio_child_loop(result->sockfd, result->map);
509         }
510
511         DEBUG(10, ("Child %d created with sockfd %d\n",
512                         result->pid, fdpair[0]));
513
514         result->sockfd = fdpair[0];
515         close(fdpair[1]);
516
517         result->list = children;
518         DLIST_ADD(children->children, result);
519
520         talloc_set_destructor(result, aio_child_destructor);
521
522         *presult = result;
523
524         return 0;
525
526  fail:
527         if (fdpair[0] != -1) close(fdpair[0]);
528         if (fdpair[1] != -1) close(fdpair[1]);
529         TALLOC_FREE(result);
530
531         return ret;
532 }
533
534 static int get_idle_child(struct vfs_handle_struct *handle,
535                           struct aio_child **pchild)
536 {
537         struct aio_child_list *children;
538         struct aio_child *child;
539
540         children = init_aio_children(handle);
541         if (children == NULL) {
542                 return ENOMEM;
543         }
544
545         for (child = children->children; child != NULL; child = child->next) {
546                 if (!child->busy) {
547                         break;
548                 }
549         }
550
551         if (child == NULL) {
552                 int ret;
553
554                 DEBUG(10, ("no idle child found, creating new one\n"));
555
556                 ret = create_aio_child(handle->conn->sconn, children,
557                                           128*1024, &child);
558                 if (ret != 0) {
559                         DEBUG(10, ("create_aio_child failed: %s\n",
560                                    strerror(errno)));
561                         return ret;
562                 }
563         }
564
565         child->dont_delete = true;
566         child->busy = true;
567
568         *pchild = child;
569         return 0;
570 }
571
572 struct aio_fork_pread_state {
573         struct aio_child *child;
574         ssize_t ret;
575         int err;
576 };
577
578 static void aio_fork_pread_done(struct tevent_req *subreq);
579
580 static struct tevent_req *aio_fork_pread_send(struct vfs_handle_struct *handle,
581                                               TALLOC_CTX *mem_ctx,
582                                               struct tevent_context *ev,
583                                               struct files_struct *fsp,
584                                               void *data,
585                                               size_t n, off_t offset)
586 {
587         struct tevent_req *req, *subreq;
588         struct aio_fork_pread_state *state;
589         struct rw_cmd cmd;
590         ssize_t written;
591         int err;
592         struct aio_fork_config *config;
593
594         SMB_VFS_HANDLE_GET_DATA(handle, config,
595                                 struct aio_fork_config,
596                                 return NULL);
597
598         req = tevent_req_create(mem_ctx, &state, struct aio_fork_pread_state);
599         if (req == NULL) {
600                 return NULL;
601         }
602
603         if (n > 128*1024) {
604                 /* TODO: support variable buffers */
605                 tevent_req_error(req, EINVAL);
606                 return tevent_req_post(req, ev);
607         }
608
609         err = get_idle_child(handle, &state->child);
610         if (err != 0) {
611                 tevent_req_error(req, err);
612                 return tevent_req_post(req, ev);
613         }
614
615         ZERO_STRUCT(cmd);
616         cmd.n = n;
617         cmd.offset = offset;
618         cmd.cmd = READ_CMD;
619         cmd.erratic_testing_mode = config->erratic_testing_mode;
620
621         DEBUG(10, ("sending fd %d to child %d\n", fsp->fh->fd,
622                    (int)state->child->pid));
623
624         /*
625          * Not making this async. We're writing into an empty unix
626          * domain socket. This should never block.
627          */
628         written = write_fd(state->child->sockfd, &cmd, sizeof(cmd),
629                            fsp->fh->fd);
630         if (written == -1) {
631                 err = errno;
632
633                 TALLOC_FREE(state->child);
634
635                 DEBUG(10, ("write_fd failed: %s\n", strerror(err)));
636                 tevent_req_error(req, err);
637                 return tevent_req_post(req, ev);
638         }
639
640         subreq = read_packet_send(state, ev, state->child->sockfd,
641                                   sizeof(struct rw_ret), NULL, NULL);
642         if (tevent_req_nomem(subreq, req)) {
643                 TALLOC_FREE(state->child); /* we sent sth down */
644                 return tevent_req_post(req, ev);
645         }
646         tevent_req_set_callback(subreq, aio_fork_pread_done, req);
647         return req;
648 }
649
650 static void aio_fork_pread_done(struct tevent_req *subreq)
651 {
652         struct tevent_req *req = tevent_req_callback_data(
653                 subreq, struct tevent_req);
654         struct aio_fork_pread_state *state = tevent_req_data(
655                 req, struct aio_fork_pread_state);
656         ssize_t nread;
657         uint8_t *buf;
658         int err;
659         struct rw_ret *retbuf;
660
661         nread = read_packet_recv(subreq, talloc_tos(), &buf, &err);
662         TALLOC_FREE(subreq);
663         if (nread == -1) {
664                 TALLOC_FREE(state->child);
665                 tevent_req_error(req, err);
666                 return;
667         }
668
669         state->child->busy = false;
670
671         retbuf = (struct rw_ret *)buf;
672         state->ret = retbuf->size;
673         state->err = retbuf->ret_errno;
674         tevent_req_done(req);
675 }
676
677 static ssize_t aio_fork_pread_recv(struct tevent_req *req, int *err)
678 {
679         struct aio_fork_pread_state *state = tevent_req_data(
680                 req, struct aio_fork_pread_state);
681
682         if (tevent_req_is_unix_error(req, err)) {
683                 return -1;
684         }
685         if (state->ret == -1) {
686                 *err = state->err;
687         }
688         return state->ret;
689 }
690
691 struct aio_fork_pwrite_state {
692         struct aio_child *child;
693         ssize_t ret;
694         int err;
695 };
696
697 static void aio_fork_pwrite_done(struct tevent_req *subreq);
698
699 static struct tevent_req *aio_fork_pwrite_send(
700         struct vfs_handle_struct *handle, TALLOC_CTX *mem_ctx,
701         struct tevent_context *ev, struct files_struct *fsp,
702         const void *data, size_t n, off_t offset)
703 {
704         struct tevent_req *req, *subreq;
705         struct aio_fork_pwrite_state *state;
706         struct rw_cmd cmd;
707         ssize_t written;
708         int err;
709         struct aio_fork_config *config;
710         SMB_VFS_HANDLE_GET_DATA(handle, config,
711                                 struct aio_fork_config,
712                                 return NULL);
713
714         req = tevent_req_create(mem_ctx, &state, struct aio_fork_pwrite_state);
715         if (req == NULL) {
716                 return NULL;
717         }
718
719         if (n > 128*1024) {
720                 /* TODO: support variable buffers */
721                 tevent_req_error(req, EINVAL);
722                 return tevent_req_post(req, ev);
723         }
724
725         err = get_idle_child(handle, &state->child);
726         if (err != 0) {
727                 tevent_req_error(req, err);
728                 return tevent_req_post(req, ev);
729         }
730
731         ZERO_STRUCT(cmd);
732         cmd.n = n;
733         cmd.offset = offset;
734         cmd.cmd = WRITE_CMD;
735         cmd.erratic_testing_mode = config->erratic_testing_mode;
736
737         DEBUG(10, ("sending fd %d to child %d\n", fsp->fh->fd,
738                    (int)state->child->pid));
739
740         /*
741          * Not making this async. We're writing into an empty unix
742          * domain socket. This should never block.
743          */
744         written = write_fd(state->child->sockfd, &cmd, sizeof(cmd),
745                            fsp->fh->fd);
746         if (written == -1) {
747                 err = errno;
748
749                 TALLOC_FREE(state->child);
750
751                 DEBUG(10, ("write_fd failed: %s\n", strerror(err)));
752                 tevent_req_error(req, err);
753                 return tevent_req_post(req, ev);
754         }
755
756         subreq = read_packet_send(state, ev, state->child->sockfd,
757                                   sizeof(struct rw_ret), NULL, NULL);
758         if (tevent_req_nomem(subreq, req)) {
759                 TALLOC_FREE(state->child); /* we sent sth down */
760                 return tevent_req_post(req, ev);
761         }
762         tevent_req_set_callback(subreq, aio_fork_pwrite_done, req);
763         return req;
764 }
765
766 static void aio_fork_pwrite_done(struct tevent_req *subreq)
767 {
768         struct tevent_req *req = tevent_req_callback_data(
769                 subreq, struct tevent_req);
770         struct aio_fork_pwrite_state *state = tevent_req_data(
771                 req, struct aio_fork_pwrite_state);
772         ssize_t nread;
773         uint8_t *buf;
774         int err;
775         struct rw_ret *retbuf;
776
777         nread = read_packet_recv(subreq, talloc_tos(), &buf, &err);
778         TALLOC_FREE(subreq);
779         if (nread == -1) {
780                 TALLOC_FREE(state->child);
781                 tevent_req_error(req, err);
782                 return;
783         }
784
785         state->child->busy = false;
786
787         retbuf = (struct rw_ret *)buf;
788         state->ret = retbuf->size;
789         state->err = retbuf->ret_errno;
790         tevent_req_done(req);
791 }
792
793 static ssize_t aio_fork_pwrite_recv(struct tevent_req *req, int *err)
794 {
795         struct aio_fork_pwrite_state *state = tevent_req_data(
796                 req, struct aio_fork_pwrite_state);
797
798         if (tevent_req_is_unix_error(req, err)) {
799                 return -1;
800         }
801         if (state->ret == -1) {
802                 *err = state->err;
803         }
804         return state->ret;
805 }
806
807 struct aio_fork_fsync_state {
808         struct aio_child *child;
809         ssize_t ret;
810         int err;
811 };
812
813 static void aio_fork_fsync_done(struct tevent_req *subreq);
814
815 static struct tevent_req *aio_fork_fsync_send(
816         struct vfs_handle_struct *handle, TALLOC_CTX *mem_ctx,
817         struct tevent_context *ev, struct files_struct *fsp)
818 {
819         struct tevent_req *req, *subreq;
820         struct aio_fork_fsync_state *state;
821         struct rw_cmd cmd;
822         ssize_t written;
823         int err;
824         struct aio_fork_config *config;
825
826         SMB_VFS_HANDLE_GET_DATA(handle, config,
827                                 struct aio_fork_config,
828                                 return NULL);
829
830         req = tevent_req_create(mem_ctx, &state, struct aio_fork_fsync_state);
831         if (req == NULL) {
832                 return NULL;
833         }
834
835         err = get_idle_child(handle, &state->child);
836         if (err != 0) {
837                 tevent_req_error(req, err);
838                 return tevent_req_post(req, ev);
839         }
840
841         ZERO_STRUCT(cmd);
842         cmd.cmd = FSYNC_CMD;
843         cmd.erratic_testing_mode = config->erratic_testing_mode;
844
845         DEBUG(10, ("sending fd %d to child %d\n", fsp->fh->fd,
846                    (int)state->child->pid));
847
848         /*
849          * Not making this async. We're writing into an empty unix
850          * domain socket. This should never block.
851          */
852         written = write_fd(state->child->sockfd, &cmd, sizeof(cmd),
853                            fsp->fh->fd);
854         if (written == -1) {
855                 err = errno;
856
857                 TALLOC_FREE(state->child);
858
859                 DEBUG(10, ("write_fd failed: %s\n", strerror(err)));
860                 tevent_req_error(req, err);
861                 return tevent_req_post(req, ev);
862         }
863
864         subreq = read_packet_send(state, ev, state->child->sockfd,
865                                   sizeof(struct rw_ret), NULL, NULL);
866         if (tevent_req_nomem(subreq, req)) {
867                 TALLOC_FREE(state->child); /* we sent sth down */
868                 return tevent_req_post(req, ev);
869         }
870         tevent_req_set_callback(subreq, aio_fork_fsync_done, req);
871         return req;
872 }
873
874 static void aio_fork_fsync_done(struct tevent_req *subreq)
875 {
876         struct tevent_req *req = tevent_req_callback_data(
877                 subreq, struct tevent_req);
878         struct aio_fork_fsync_state *state = tevent_req_data(
879                 req, struct aio_fork_fsync_state);
880         ssize_t nread;
881         uint8_t *buf;
882         int err;
883         struct rw_ret *retbuf;
884
885         nread = read_packet_recv(subreq, talloc_tos(), &buf, &err);
886         TALLOC_FREE(subreq);
887         if (nread == -1) {
888                 TALLOC_FREE(state->child);
889                 tevent_req_error(req, err);
890                 return;
891         }
892
893         state->child->busy = false;
894
895         retbuf = (struct rw_ret *)buf;
896         state->ret = retbuf->size;
897         state->err = retbuf->ret_errno;
898         tevent_req_done(req);
899 }
900
901 static int aio_fork_fsync_recv(struct tevent_req *req, int *err)
902 {
903         struct aio_fork_fsync_state *state = tevent_req_data(
904                 req, struct aio_fork_fsync_state);
905
906         if (tevent_req_is_unix_error(req, err)) {
907                 return -1;
908         }
909         if (state->ret == -1) {
910                 *err = state->err;
911         }
912         return state->ret;
913 }
914
915 static int aio_fork_connect(vfs_handle_struct *handle, const char *service,
916                             const char *user)
917 {
918         int ret;
919         struct aio_fork_config *config;
920         ret = SMB_VFS_NEXT_CONNECT(handle, service, user);
921
922         if (ret < 0) {
923                 return ret;
924         }
925
926         config = talloc_zero(handle->conn, struct aio_fork_config);
927         if (!config) {
928                 SMB_VFS_NEXT_DISCONNECT(handle);
929                 DEBUG(0, ("talloc_zero() failed\n"));
930                 return -1;
931         }
932
933         config->erratic_testing_mode = lp_parm_bool(SNUM(handle->conn), "vfs_aio_fork",
934                                                     "erratic_testing_mode", false);
935         
936         SMB_VFS_HANDLE_SET_DATA(handle, config,
937                                 NULL, struct aio_fork_config,
938                                 return -1);
939
940         /*********************************************************************
941          * How many threads to initialize ?
942          * 100 per process seems insane as a default until you realize that
943          * (a) Threads terminate after 1 second when idle.
944          * (b) Throttling is done in SMB2 via the crediting algorithm.
945          * (c) SMB1 clients are limited to max_mux (50) outstanding
946          *     requests and Windows clients don't use this anyway.
947          * Essentially we want this to be unlimited unless smb.conf
948          * says different.
949          *********************************************************************/
950         aio_pending_size = 100;
951         return 0;
952 }
953
954 static struct vfs_fn_pointers vfs_aio_fork_fns = {
955         .connect_fn = aio_fork_connect,
956         .pread_send_fn = aio_fork_pread_send,
957         .pread_recv_fn = aio_fork_pread_recv,
958         .pwrite_send_fn = aio_fork_pwrite_send,
959         .pwrite_recv_fn = aio_fork_pwrite_recv,
960         .fsync_send_fn = aio_fork_fsync_send,
961         .fsync_recv_fn = aio_fork_fsync_recv,
962 };
963
964 NTSTATUS vfs_aio_fork_init(void);
965 NTSTATUS vfs_aio_fork_init(void)
966 {
967         return smb_register_vfs(SMB_VFS_INTERFACE_VERSION,
968                                 "aio_fork", &vfs_aio_fork_fns);
969 }