build: fix the test and define for msg_accrights
[nivanova/samba-autobuild/.git] / source3 / modules / vfs_aio_fork.c
1 /*
2  * Simulate the Posix AIO using mmap/fork
3  *
4  * Copyright (C) Volker Lendecke 2008
5  * Copyright (C) Jeremy Allison 2010
6  *
7  * This program is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 2 of the License, or
10  * (at your option) any later version.
11  *
12  * This program is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with this program; if not, write to the Free Software
19  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #include "includes.h"
23 #include "system/filesys.h"
24 #include "system/shmem.h"
25 #include "smbd/smbd.h"
26 #include "smbd/globals.h"
27 #include "lib/async_req/async_sock.h"
28 #include "lib/util/tevent_unix.h"
29
30 #if !defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL) && !defined(HAVE_STRUCT_MSGHDR_MSG_ACCRIGHTS)
31 # error Can not pass file descriptors
32 #endif
33
34 #undef recvmsg
35
36 #ifndef MAP_FILE
37 #define MAP_FILE 0
38 #endif
39
40 struct aio_fork_config {
41         bool erratic_testing_mode;
42 };
43
44 struct mmap_area {
45         size_t size;
46         volatile void *ptr;
47 };
48
49 static int mmap_area_destructor(struct mmap_area *area)
50 {
51         munmap((void *)area->ptr, area->size);
52         return 0;
53 }
54
55 static struct mmap_area *mmap_area_init(TALLOC_CTX *mem_ctx, size_t size)
56 {
57         struct mmap_area *result;
58         int fd;
59
60         result = talloc(mem_ctx, struct mmap_area);
61         if (result == NULL) {
62                 DEBUG(0, ("talloc failed\n"));
63                 goto fail;
64         }
65
66         fd = open("/dev/zero", O_RDWR);
67         if (fd == -1) {
68                 DEBUG(3, ("open(\"/dev/zero\") failed: %s\n",
69                           strerror(errno)));
70                 goto fail;
71         }
72
73         result->ptr = mmap(NULL, size, PROT_READ|PROT_WRITE,
74                            MAP_SHARED|MAP_FILE, fd, 0);
75         close(fd);
76         if (result->ptr == MAP_FAILED) {
77                 DEBUG(1, ("mmap failed: %s\n", strerror(errno)));
78                 goto fail;
79         }
80
81         result->size = size;
82         talloc_set_destructor(result, mmap_area_destructor);
83
84         return result;
85
86 fail:
87         TALLOC_FREE(result);
88         return NULL;
89 }
90
91 enum cmd_type {
92         READ_CMD,
93         WRITE_CMD,
94         FSYNC_CMD
95 };
96
97 static const char *cmd_type_str(enum cmd_type cmd)
98 {
99         const char *result;
100
101         switch (cmd) {
102         case READ_CMD:
103                 result = "READ";
104                 break;
105         case WRITE_CMD:
106                 result = "WRITE";
107                 break;
108         case FSYNC_CMD:
109                 result = "FSYNC";
110                 break;
111         default:
112                 result = "<UNKNOWN>";
113                 break;
114         }
115         return result;
116 }
117
118 struct rw_cmd {
119         size_t n;
120         off_t offset;
121         enum cmd_type cmd;
122         bool erratic_testing_mode;
123 };
124
125 struct rw_ret {
126         ssize_t size;
127         int ret_errno;
128 };
129
130 struct aio_child_list;
131
132 struct aio_child {
133         struct aio_child *prev, *next;
134         struct aio_child_list *list;
135         pid_t pid;
136         int sockfd;
137         struct mmap_area *map;
138         bool dont_delete;       /* Marked as in use since last cleanup */
139         bool busy;
140 };
141
142 struct aio_child_list {
143         struct aio_child *children;
144         struct tevent_timer *cleanup_event;
145 };
146
147 static void free_aio_children(void **p)
148 {
149         TALLOC_FREE(*p);
150 }
151
152 static ssize_t read_fd(int fd, void *ptr, size_t nbytes, int *recvfd)
153 {
154         struct msghdr msg;
155         struct iovec iov[1];
156         ssize_t n;
157 #ifndef HAVE_STRUCT_MSGHDR_MSG_CONTROL
158         int newfd;
159
160         msg.msg_accrights = (caddr_t) &newfd;
161         msg.msg_accrightslen = sizeof(int);
162 #else
163
164         union {
165           struct cmsghdr        cm;
166           char                          control[CMSG_SPACE(sizeof(int))];
167         } control_un;
168         struct cmsghdr  *cmptr;
169
170         msg.msg_control = control_un.control;
171         msg.msg_controllen = sizeof(control_un.control);
172 #endif
173
174         msg.msg_name = NULL;
175         msg.msg_namelen = 0;
176         msg.msg_flags = 0;
177
178         iov[0].iov_base = (void *)ptr;
179         iov[0].iov_len = nbytes;
180         msg.msg_iov = iov;
181         msg.msg_iovlen = 1;
182
183         if ( (n = recvmsg(fd, &msg, 0)) <= 0) {
184                 return(n);
185         }
186
187 #ifdef  HAVE_STRUCT_MSGHDR_MSG_CONTROL
188         if ((cmptr = CMSG_FIRSTHDR(&msg)) != NULL
189             && cmptr->cmsg_len == CMSG_LEN(sizeof(int))) {
190                 if (cmptr->cmsg_level != SOL_SOCKET) {
191                         DEBUG(10, ("control level != SOL_SOCKET"));
192                         errno = EINVAL;
193                         return -1;
194                 }
195                 if (cmptr->cmsg_type != SCM_RIGHTS) {
196                         DEBUG(10, ("control type != SCM_RIGHTS"));
197                         errno = EINVAL;
198                         return -1;
199                 }
200                 memcpy(recvfd, CMSG_DATA(cmptr), sizeof(*recvfd));
201         } else {
202                 *recvfd = -1;           /* descriptor was not passed */
203         }
204 #else
205         if (msg.msg_accrightslen == sizeof(int)) {
206                 *recvfd = newfd;
207         }
208         else {
209                 *recvfd = -1;           /* descriptor was not passed */
210         }
211 #endif
212
213         return(n);
214 }
215
216 static ssize_t write_fd(int fd, void *ptr, size_t nbytes, int sendfd)
217 {
218         struct msghdr   msg;
219         struct iovec    iov[1];
220
221 #ifdef  HAVE_STRUCT_MSGHDR_MSG_CONTROL
222         union {
223                 struct cmsghdr  cm;
224                 char control[CMSG_SPACE(sizeof(int))];
225         } control_un;
226         struct cmsghdr  *cmptr;
227
228         ZERO_STRUCT(msg);
229         ZERO_STRUCT(control_un);
230
231         msg.msg_control = control_un.control;
232         msg.msg_controllen = sizeof(control_un.control);
233
234         cmptr = CMSG_FIRSTHDR(&msg);
235         cmptr->cmsg_len = CMSG_LEN(sizeof(int));
236         cmptr->cmsg_level = SOL_SOCKET;
237         cmptr->cmsg_type = SCM_RIGHTS;
238         memcpy(CMSG_DATA(cmptr), &sendfd, sizeof(sendfd));
239 #else
240         ZERO_STRUCT(msg);
241         msg.msg_accrights = (caddr_t) &sendfd;
242         msg.msg_accrightslen = sizeof(int);
243 #endif
244
245         msg.msg_name = NULL;
246         msg.msg_namelen = 0;
247
248         ZERO_STRUCT(iov);
249         iov[0].iov_base = (void *)ptr;
250         iov[0].iov_len = nbytes;
251         msg.msg_iov = iov;
252         msg.msg_iovlen = 1;
253
254         return (sendmsg(fd, &msg, 0));
255 }
256
257 static void aio_child_cleanup(struct tevent_context *event_ctx,
258                               struct tevent_timer *te,
259                               struct timeval now,
260                               void *private_data)
261 {
262         struct aio_child_list *list = talloc_get_type_abort(
263                 private_data, struct aio_child_list);
264         struct aio_child *child, *next;
265
266         TALLOC_FREE(list->cleanup_event);
267
268         for (child = list->children; child != NULL; child = next) {
269                 next = child->next;
270
271                 if (child->busy) {
272                         DEBUG(10, ("child %d currently active\n",
273                                    (int)child->pid));
274                         continue;
275                 }
276
277                 if (child->dont_delete) {
278                         DEBUG(10, ("Child %d was active since last cleanup\n",
279                                    (int)child->pid));
280                         child->dont_delete = false;
281                         continue;
282                 }
283
284                 DEBUG(10, ("Child %d idle for more than 30 seconds, "
285                            "deleting\n", (int)child->pid));
286
287                 TALLOC_FREE(child);
288                 child = next;
289         }
290
291         if (list->children != NULL) {
292                 /*
293                  * Re-schedule the next cleanup round
294                  */
295                 list->cleanup_event = tevent_add_timer(server_event_context(), list,
296                                                       timeval_add(&now, 30, 0),
297                                                       aio_child_cleanup, list);
298
299         }
300 }
301
302 static struct aio_child_list *init_aio_children(struct vfs_handle_struct *handle)
303 {
304         struct aio_child_list *data = NULL;
305
306         if (SMB_VFS_HANDLE_TEST_DATA(handle)) {
307                 SMB_VFS_HANDLE_GET_DATA(handle, data, struct aio_child_list,
308                                         return NULL);
309         }
310
311         if (data == NULL) {
312                 data = talloc_zero(NULL, struct aio_child_list);
313                 if (data == NULL) {
314                         return NULL;
315                 }
316         }
317
318         /*
319          * Regardless of whether the child_list had been around or not, make
320          * sure that we have a cleanup timed event. This timed event will
321          * delete itself when it finds that no children are around anymore.
322          */
323
324         if (data->cleanup_event == NULL) {
325                 data->cleanup_event = tevent_add_timer(server_event_context(), data,
326                                                       timeval_current_ofs(30, 0),
327                                                       aio_child_cleanup, data);
328                 if (data->cleanup_event == NULL) {
329                         TALLOC_FREE(data);
330                         return NULL;
331                 }
332         }
333
334         if (!SMB_VFS_HANDLE_TEST_DATA(handle)) {
335                 SMB_VFS_HANDLE_SET_DATA(handle, data, free_aio_children,
336                                         struct aio_child_list, return False);
337         }
338
339         return data;
340 }
341
342 static void aio_child_loop(int sockfd, struct mmap_area *map)
343 {
344         while (true) {
345                 int fd = -1;
346                 ssize_t ret;
347                 struct rw_cmd cmd_struct;
348                 struct rw_ret ret_struct;
349
350                 ret = read_fd(sockfd, &cmd_struct, sizeof(cmd_struct), &fd);
351                 if (ret != sizeof(cmd_struct)) {
352                         DEBUG(10, ("read_fd returned %d: %s\n", (int)ret,
353                                    strerror(errno)));
354                         exit(1);
355                 }
356
357                 DEBUG(10, ("aio_child_loop: %s %d bytes at %d from fd %d\n",
358                            cmd_type_str(cmd_struct.cmd),
359                            (int)cmd_struct.n, (int)cmd_struct.offset, fd));
360
361                 if (cmd_struct.erratic_testing_mode) {
362                         /*
363                          * For developer testing, we want erratic behaviour for
364                          * async I/O times
365                          */
366                         uint8_t randval;
367                         unsigned msecs;
368                         /*
369                          * use generate_random_buffer, we just forked from a
370                          * common parent state
371                          */
372                         generate_random_buffer(&randval, sizeof(randval));
373                         msecs = randval + 20;
374                         DEBUG(10, ("delaying for %u msecs\n", msecs));
375                         smb_msleep(msecs);
376                 }
377
378                 ZERO_STRUCT(ret_struct);
379
380                 switch (cmd_struct.cmd) {
381                 case READ_CMD:
382                         ret_struct.size = sys_pread(
383                                 fd, (void *)map->ptr, cmd_struct.n,
384                                 cmd_struct.offset);
385 #if 0
386 /* This breaks "make test" when run with aio_fork module. */
387 #ifdef DEVELOPER
388                         ret_struct.size = MAX(1, ret_struct.size * 0.9);
389 #endif
390 #endif
391                         break;
392                 case WRITE_CMD:
393                         ret_struct.size = sys_pwrite(
394                                 fd, (void *)map->ptr, cmd_struct.n,
395                                 cmd_struct.offset);
396                         break;
397                 case FSYNC_CMD:
398                         ret_struct.size = fsync(fd);
399                         break;
400                 default:
401                         ret_struct.size = -1;
402                         errno = EINVAL;
403                 }
404
405                 DEBUG(10, ("aio_child_loop: syscall returned %d\n",
406                            (int)ret_struct.size));
407
408                 if (ret_struct.size == -1) {
409                         ret_struct.ret_errno = errno;
410                 }
411
412                 /*
413                  * Close the fd before telling our parent we're done. The
414                  * parent might close and re-open the file very quickly, and
415                  * with system-level share modes (GPFS) we would get an
416                  * unjustified SHARING_VIOLATION.
417                  */
418                 close(fd);
419
420                 ret = write_data(sockfd, (char *)&ret_struct,
421                                  sizeof(ret_struct));
422                 if (ret != sizeof(ret_struct)) {
423                         DEBUG(10, ("could not write ret_struct: %s\n",
424                                    strerror(errno)));
425                         exit(2);
426                 }
427         }
428 }
429
430 static int aio_child_destructor(struct aio_child *child)
431 {
432         char c=0;
433
434         SMB_ASSERT(!child->busy);
435
436         DEBUG(10, ("aio_child_destructor: removing child %d on fd %d\n",
437                         child->pid, child->sockfd));
438
439         /*
440          * closing the sockfd makes the child not return from recvmsg() on RHEL
441          * 5.5 so instead force the child to exit by writing bad data to it
442          */
443         write(child->sockfd, &c, sizeof(c));
444         close(child->sockfd);
445         DLIST_REMOVE(child->list->children, child);
446         return 0;
447 }
448
449 /*
450  * We have to close all fd's in open files, we might incorrectly hold a system
451  * level share mode on a file.
452  */
453
454 static struct files_struct *close_fsp_fd(struct files_struct *fsp,
455                                          void *private_data)
456 {
457         if ((fsp->fh != NULL) && (fsp->fh->fd != -1)) {
458                 close(fsp->fh->fd);
459                 fsp->fh->fd = -1;
460         }
461         return NULL;
462 }
463
464 static int create_aio_child(struct smbd_server_connection *sconn,
465                             struct aio_child_list *children,
466                             size_t map_size,
467                             struct aio_child **presult)
468 {
469         struct aio_child *result;
470         int fdpair[2];
471         int ret;
472
473         fdpair[0] = fdpair[1] = -1;
474
475         result = talloc_zero(children, struct aio_child);
476         if (result == NULL) {
477                 return ENOMEM;
478         }
479
480         if (socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair) == -1) {
481                 ret = errno;
482                 DEBUG(10, ("socketpair() failed: %s\n", strerror(errno)));
483                 goto fail;
484         }
485
486         DEBUG(10, ("fdpair = %d/%d\n", fdpair[0], fdpair[1]));
487
488         result->map = mmap_area_init(result, map_size);
489         if (result->map == NULL) {
490                 ret = errno;
491                 DEBUG(0, ("Could not create mmap area\n"));
492                 goto fail;
493         }
494
495         result->pid = fork();
496         if (result->pid == -1) {
497                 ret = errno;
498                 DEBUG(0, ("fork failed: %s\n", strerror(errno)));
499                 goto fail;
500         }
501
502         if (result->pid == 0) {
503                 close(fdpair[0]);
504                 result->sockfd = fdpair[1];
505                 files_forall(sconn, close_fsp_fd, NULL);
506                 aio_child_loop(result->sockfd, result->map);
507         }
508
509         DEBUG(10, ("Child %d created with sockfd %d\n",
510                         result->pid, fdpair[0]));
511
512         result->sockfd = fdpair[0];
513         close(fdpair[1]);
514
515         result->list = children;
516         DLIST_ADD(children->children, result);
517
518         talloc_set_destructor(result, aio_child_destructor);
519
520         *presult = result;
521
522         return 0;
523
524  fail:
525         if (fdpair[0] != -1) close(fdpair[0]);
526         if (fdpair[1] != -1) close(fdpair[1]);
527         TALLOC_FREE(result);
528
529         return ret;
530 }
531
532 static int get_idle_child(struct vfs_handle_struct *handle,
533                           struct aio_child **pchild)
534 {
535         struct aio_child_list *children;
536         struct aio_child *child;
537
538         children = init_aio_children(handle);
539         if (children == NULL) {
540                 return ENOMEM;
541         }
542
543         for (child = children->children; child != NULL; child = child->next) {
544                 if (!child->busy) {
545                         break;
546                 }
547         }
548
549         if (child == NULL) {
550                 int ret;
551
552                 DEBUG(10, ("no idle child found, creating new one\n"));
553
554                 ret = create_aio_child(handle->conn->sconn, children,
555                                           128*1024, &child);
556                 if (ret != 0) {
557                         DEBUG(10, ("create_aio_child failed: %s\n",
558                                    strerror(errno)));
559                         return ret;
560                 }
561         }
562
563         child->dont_delete = true;
564         child->busy = true;
565
566         *pchild = child;
567         return 0;
568 }
569
570 struct aio_fork_pread_state {
571         struct aio_child *child;
572         ssize_t ret;
573         int err;
574 };
575
576 static void aio_fork_pread_done(struct tevent_req *subreq);
577
578 static struct tevent_req *aio_fork_pread_send(struct vfs_handle_struct *handle,
579                                               TALLOC_CTX *mem_ctx,
580                                               struct tevent_context *ev,
581                                               struct files_struct *fsp,
582                                               void *data,
583                                               size_t n, off_t offset)
584 {
585         struct tevent_req *req, *subreq;
586         struct aio_fork_pread_state *state;
587         struct rw_cmd cmd;
588         ssize_t written;
589         int err;
590         struct aio_fork_config *config;
591
592         SMB_VFS_HANDLE_GET_DATA(handle, config,
593                                 struct aio_fork_config,
594                                 return NULL);
595
596         req = tevent_req_create(mem_ctx, &state, struct aio_fork_pread_state);
597         if (req == NULL) {
598                 return NULL;
599         }
600
601         if (n > 128*1024) {
602                 /* TODO: support variable buffers */
603                 tevent_req_error(req, EINVAL);
604                 return tevent_req_post(req, ev);
605         }
606
607         err = get_idle_child(handle, &state->child);
608         if (err != 0) {
609                 tevent_req_error(req, err);
610                 return tevent_req_post(req, ev);
611         }
612
613         ZERO_STRUCT(cmd);
614         cmd.n = n;
615         cmd.offset = offset;
616         cmd.cmd = READ_CMD;
617         cmd.erratic_testing_mode = config->erratic_testing_mode;
618
619         DEBUG(10, ("sending fd %d to child %d\n", fsp->fh->fd,
620                    (int)state->child->pid));
621
622         /*
623          * Not making this async. We're writing into an empty unix
624          * domain socket. This should never block.
625          */
626         written = write_fd(state->child->sockfd, &cmd, sizeof(cmd),
627                            fsp->fh->fd);
628         if (written == -1) {
629                 err = errno;
630
631                 TALLOC_FREE(state->child);
632
633                 DEBUG(10, ("write_fd failed: %s\n", strerror(err)));
634                 tevent_req_error(req, err);
635                 return tevent_req_post(req, ev);
636         }
637
638         subreq = read_packet_send(state, ev, state->child->sockfd,
639                                   sizeof(struct rw_ret), NULL, NULL);
640         if (tevent_req_nomem(subreq, req)) {
641                 TALLOC_FREE(state->child); /* we sent sth down */
642                 return tevent_req_post(req, ev);
643         }
644         tevent_req_set_callback(subreq, aio_fork_pread_done, req);
645         return req;
646 }
647
648 static void aio_fork_pread_done(struct tevent_req *subreq)
649 {
650         struct tevent_req *req = tevent_req_callback_data(
651                 subreq, struct tevent_req);
652         struct aio_fork_pread_state *state = tevent_req_data(
653                 req, struct aio_fork_pread_state);
654         ssize_t nread;
655         uint8_t *buf;
656         int err;
657         struct rw_ret *retbuf;
658
659         nread = read_packet_recv(subreq, talloc_tos(), &buf, &err);
660         TALLOC_FREE(subreq);
661         if (nread == -1) {
662                 TALLOC_FREE(state->child);
663                 tevent_req_error(req, err);
664                 return;
665         }
666
667         state->child->busy = false;
668
669         retbuf = (struct rw_ret *)buf;
670         state->ret = retbuf->size;
671         state->err = retbuf->ret_errno;
672         tevent_req_done(req);
673 }
674
675 static ssize_t aio_fork_pread_recv(struct tevent_req *req, int *err)
676 {
677         struct aio_fork_pread_state *state = tevent_req_data(
678                 req, struct aio_fork_pread_state);
679
680         if (tevent_req_is_unix_error(req, err)) {
681                 return -1;
682         }
683         if (state->ret == -1) {
684                 *err = state->err;
685         }
686         return state->ret;
687 }
688
689 struct aio_fork_pwrite_state {
690         struct aio_child *child;
691         ssize_t ret;
692         int err;
693 };
694
695 static void aio_fork_pwrite_done(struct tevent_req *subreq);
696
697 static struct tevent_req *aio_fork_pwrite_send(
698         struct vfs_handle_struct *handle, TALLOC_CTX *mem_ctx,
699         struct tevent_context *ev, struct files_struct *fsp,
700         const void *data, size_t n, off_t offset)
701 {
702         struct tevent_req *req, *subreq;
703         struct aio_fork_pwrite_state *state;
704         struct rw_cmd cmd;
705         ssize_t written;
706         int err;
707         struct aio_fork_config *config;
708         SMB_VFS_HANDLE_GET_DATA(handle, config,
709                                 struct aio_fork_config,
710                                 return NULL);
711
712         req = tevent_req_create(mem_ctx, &state, struct aio_fork_pwrite_state);
713         if (req == NULL) {
714                 return NULL;
715         }
716
717         if (n > 128*1024) {
718                 /* TODO: support variable buffers */
719                 tevent_req_error(req, EINVAL);
720                 return tevent_req_post(req, ev);
721         }
722
723         err = get_idle_child(handle, &state->child);
724         if (err != 0) {
725                 tevent_req_error(req, err);
726                 return tevent_req_post(req, ev);
727         }
728
729         ZERO_STRUCT(cmd);
730         cmd.n = n;
731         cmd.offset = offset;
732         cmd.cmd = WRITE_CMD;
733         cmd.erratic_testing_mode = config->erratic_testing_mode;
734
735         DEBUG(10, ("sending fd %d to child %d\n", fsp->fh->fd,
736                    (int)state->child->pid));
737
738         /*
739          * Not making this async. We're writing into an empty unix
740          * domain socket. This should never block.
741          */
742         written = write_fd(state->child->sockfd, &cmd, sizeof(cmd),
743                            fsp->fh->fd);
744         if (written == -1) {
745                 err = errno;
746
747                 TALLOC_FREE(state->child);
748
749                 DEBUG(10, ("write_fd failed: %s\n", strerror(err)));
750                 tevent_req_error(req, err);
751                 return tevent_req_post(req, ev);
752         }
753
754         subreq = read_packet_send(state, ev, state->child->sockfd,
755                                   sizeof(struct rw_ret), NULL, NULL);
756         if (tevent_req_nomem(subreq, req)) {
757                 TALLOC_FREE(state->child); /* we sent sth down */
758                 return tevent_req_post(req, ev);
759         }
760         tevent_req_set_callback(subreq, aio_fork_pwrite_done, req);
761         return req;
762 }
763
764 static void aio_fork_pwrite_done(struct tevent_req *subreq)
765 {
766         struct tevent_req *req = tevent_req_callback_data(
767                 subreq, struct tevent_req);
768         struct aio_fork_pwrite_state *state = tevent_req_data(
769                 req, struct aio_fork_pwrite_state);
770         ssize_t nread;
771         uint8_t *buf;
772         int err;
773         struct rw_ret *retbuf;
774
775         nread = read_packet_recv(subreq, talloc_tos(), &buf, &err);
776         TALLOC_FREE(subreq);
777         if (nread == -1) {
778                 TALLOC_FREE(state->child);
779                 tevent_req_error(req, err);
780                 return;
781         }
782
783         state->child->busy = false;
784
785         retbuf = (struct rw_ret *)buf;
786         state->ret = retbuf->size;
787         state->err = retbuf->ret_errno;
788         tevent_req_done(req);
789 }
790
791 static ssize_t aio_fork_pwrite_recv(struct tevent_req *req, int *err)
792 {
793         struct aio_fork_pwrite_state *state = tevent_req_data(
794                 req, struct aio_fork_pwrite_state);
795
796         if (tevent_req_is_unix_error(req, err)) {
797                 return -1;
798         }
799         if (state->ret == -1) {
800                 *err = state->err;
801         }
802         return state->ret;
803 }
804
805 struct aio_fork_fsync_state {
806         struct aio_child *child;
807         ssize_t ret;
808         int err;
809 };
810
811 static void aio_fork_fsync_done(struct tevent_req *subreq);
812
813 static struct tevent_req *aio_fork_fsync_send(
814         struct vfs_handle_struct *handle, TALLOC_CTX *mem_ctx,
815         struct tevent_context *ev, struct files_struct *fsp)
816 {
817         struct tevent_req *req, *subreq;
818         struct aio_fork_fsync_state *state;
819         struct rw_cmd cmd;
820         ssize_t written;
821         int err;
822         struct aio_fork_config *config;
823
824         SMB_VFS_HANDLE_GET_DATA(handle, config,
825                                 struct aio_fork_config,
826                                 return NULL);
827
828         req = tevent_req_create(mem_ctx, &state, struct aio_fork_fsync_state);
829         if (req == NULL) {
830                 return NULL;
831         }
832
833         err = get_idle_child(handle, &state->child);
834         if (err != 0) {
835                 tevent_req_error(req, err);
836                 return tevent_req_post(req, ev);
837         }
838
839         ZERO_STRUCT(cmd);
840         cmd.cmd = FSYNC_CMD;
841         cmd.erratic_testing_mode = config->erratic_testing_mode;
842
843         DEBUG(10, ("sending fd %d to child %d\n", fsp->fh->fd,
844                    (int)state->child->pid));
845
846         /*
847          * Not making this async. We're writing into an empty unix
848          * domain socket. This should never block.
849          */
850         written = write_fd(state->child->sockfd, &cmd, sizeof(cmd),
851                            fsp->fh->fd);
852         if (written == -1) {
853                 err = errno;
854
855                 TALLOC_FREE(state->child);
856
857                 DEBUG(10, ("write_fd failed: %s\n", strerror(err)));
858                 tevent_req_error(req, err);
859                 return tevent_req_post(req, ev);
860         }
861
862         subreq = read_packet_send(state, ev, state->child->sockfd,
863                                   sizeof(struct rw_ret), NULL, NULL);
864         if (tevent_req_nomem(subreq, req)) {
865                 TALLOC_FREE(state->child); /* we sent sth down */
866                 return tevent_req_post(req, ev);
867         }
868         tevent_req_set_callback(subreq, aio_fork_fsync_done, req);
869         return req;
870 }
871
872 static void aio_fork_fsync_done(struct tevent_req *subreq)
873 {
874         struct tevent_req *req = tevent_req_callback_data(
875                 subreq, struct tevent_req);
876         struct aio_fork_fsync_state *state = tevent_req_data(
877                 req, struct aio_fork_fsync_state);
878         ssize_t nread;
879         uint8_t *buf;
880         int err;
881         struct rw_ret *retbuf;
882
883         nread = read_packet_recv(subreq, talloc_tos(), &buf, &err);
884         TALLOC_FREE(subreq);
885         if (nread == -1) {
886                 TALLOC_FREE(state->child);
887                 tevent_req_error(req, err);
888                 return;
889         }
890
891         state->child->busy = false;
892
893         retbuf = (struct rw_ret *)buf;
894         state->ret = retbuf->size;
895         state->err = retbuf->ret_errno;
896         tevent_req_done(req);
897 }
898
899 static int aio_fork_fsync_recv(struct tevent_req *req, int *err)
900 {
901         struct aio_fork_fsync_state *state = tevent_req_data(
902                 req, struct aio_fork_fsync_state);
903
904         if (tevent_req_is_unix_error(req, err)) {
905                 return -1;
906         }
907         if (state->ret == -1) {
908                 *err = state->err;
909         }
910         return state->ret;
911 }
912
913 static int aio_fork_connect(vfs_handle_struct *handle, const char *service,
914                             const char *user)
915 {
916         int ret;
917         struct aio_fork_config *config;
918         ret = SMB_VFS_NEXT_CONNECT(handle, service, user);
919
920         if (ret < 0) {
921                 return ret;
922         }
923
924         config = talloc_zero(handle->conn, struct aio_fork_config);
925         if (!config) {
926                 SMB_VFS_NEXT_DISCONNECT(handle);
927                 DEBUG(0, ("talloc_zero() failed\n"));
928                 return -1;
929         }
930
931         config->erratic_testing_mode = lp_parm_bool(SNUM(handle->conn), "vfs_aio_fork",
932                                                     "erratic_testing_mode", false);
933         
934         SMB_VFS_HANDLE_SET_DATA(handle, config,
935                                 NULL, struct aio_fork_config,
936                                 return -1);
937
938         /*********************************************************************
939          * How many threads to initialize ?
940          * 100 per process seems insane as a default until you realize that
941          * (a) Threads terminate after 1 second when idle.
942          * (b) Throttling is done in SMB2 via the crediting algorithm.
943          * (c) SMB1 clients are limited to max_mux (50) outstanding
944          *     requests and Windows clients don't use this anyway.
945          * Essentially we want this to be unlimited unless smb.conf
946          * says different.
947          *********************************************************************/
948         aio_pending_size = 100;
949         return 0;
950 }
951
952 static struct vfs_fn_pointers vfs_aio_fork_fns = {
953         .connect_fn = aio_fork_connect,
954         .pread_send_fn = aio_fork_pread_send,
955         .pread_recv_fn = aio_fork_pread_recv,
956         .pwrite_send_fn = aio_fork_pwrite_send,
957         .pwrite_recv_fn = aio_fork_pwrite_recv,
958         .fsync_send_fn = aio_fork_fsync_send,
959         .fsync_recv_fn = aio_fork_fsync_recv,
960 };
961
962 NTSTATUS vfs_aio_fork_init(void);
963 NTSTATUS vfs_aio_fork_init(void)
964 {
965         return smb_register_vfs(SMB_VFS_INTERFACE_VERSION,
966                                 "aio_fork", &vfs_aio_fork_fns);
967 }