3eaa26774f5b0f364cd3c33d8a450607ac152b9a
[nivanova/samba-autobuild/.git] / source3 / modules / vfs_aio_fork.c
1 /*
2  * Simulate the Posix AIO using mmap/fork
3  *
4  * Copyright (C) Volker Lendecke 2008
5  * Copyright (C) Jeremy Allison 2010
6  *
7  * This program is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 2 of the License, or
10  * (at your option) any later version.
11  *
12  * This program is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with this program; if not, write to the Free Software
19  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #include "includes.h"
23 #include "system/filesys.h"
24 #include "system/shmem.h"
25 #include "smbd/smbd.h"
26 #include "smbd/globals.h"
27 #include "lib/async_req/async_sock.h"
28 #include "lib/util/tevent_unix.h"
29 #include "lib/util/sys_rw.h"
30 #include "lib/util/sys_rw_data.h"
31 #include "lib/util/msghdr.h"
32 #include "smbprofile.h"
33
34 #if !defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL) && !defined(HAVE_STRUCT_MSGHDR_MSG_ACCRIGHTS)
35 # error Can not pass file descriptors
36 #endif
37
38 #undef recvmsg
39
40 #ifndef MAP_FILE
41 #define MAP_FILE 0
42 #endif
43
44 struct aio_child_list;
45
46 struct aio_fork_config {
47         bool erratic_testing_mode;
48         struct aio_child_list *children;
49 };
50
51 struct mmap_area {
52         size_t size;
53         void *ptr;
54 };
55
56 static int mmap_area_destructor(struct mmap_area *area)
57 {
58         munmap(discard_const(area->ptr), area->size);
59         return 0;
60 }
61
62 static struct mmap_area *mmap_area_init(TALLOC_CTX *mem_ctx, size_t size)
63 {
64         struct mmap_area *result;
65         int fd;
66
67         result = talloc(mem_ctx, struct mmap_area);
68         if (result == NULL) {
69                 DEBUG(0, ("talloc failed\n"));
70                 goto fail;
71         }
72
73         fd = open("/dev/zero", O_RDWR);
74         if (fd == -1) {
75                 DEBUG(3, ("open(\"/dev/zero\") failed: %s\n",
76                           strerror(errno)));
77                 goto fail;
78         }
79
80         result->ptr = mmap(NULL, size, PROT_READ|PROT_WRITE,
81                            MAP_SHARED|MAP_FILE, fd, 0);
82         close(fd);
83         if (result->ptr == MAP_FAILED) {
84                 DEBUG(1, ("mmap failed: %s\n", strerror(errno)));
85                 goto fail;
86         }
87
88         result->size = size;
89         talloc_set_destructor(result, mmap_area_destructor);
90
91         return result;
92
93 fail:
94         TALLOC_FREE(result);
95         return NULL;
96 }
97
98 enum cmd_type {
99         READ_CMD,
100         WRITE_CMD,
101         FSYNC_CMD
102 };
103
104 static const char *cmd_type_str(enum cmd_type cmd)
105 {
106         const char *result;
107
108         switch (cmd) {
109         case READ_CMD:
110                 result = "READ";
111                 break;
112         case WRITE_CMD:
113                 result = "WRITE";
114                 break;
115         case FSYNC_CMD:
116                 result = "FSYNC";
117                 break;
118         default:
119                 result = "<UNKNOWN>";
120                 break;
121         }
122         return result;
123 }
124
125 struct rw_cmd {
126         size_t n;
127         off_t offset;
128         enum cmd_type cmd;
129         bool erratic_testing_mode;
130 };
131
132 struct rw_ret {
133         ssize_t size;
134         int ret_errno;
135         uint64_t duration;
136 };
137
138 struct aio_child_list;
139
140 struct aio_child {
141         struct aio_child *prev, *next;
142         struct aio_child_list *list;
143         pid_t pid;
144         int sockfd;
145         struct mmap_area *map;
146         bool dont_delete;       /* Marked as in use since last cleanup */
147         bool busy;
148 };
149
150 struct aio_child_list {
151         struct aio_child *children;
152         struct tevent_timer *cleanup_event;
153 };
154
155 static ssize_t read_fd(int fd, void *ptr, size_t nbytes, int *recvfd)
156 {
157         struct iovec iov[1];
158         struct msghdr msg = { .msg_iov = iov, .msg_iovlen = 1 };
159         ssize_t n;
160         size_t bufsize = msghdr_prep_recv_fds(NULL, NULL, 0, 1);
161         uint8_t buf[bufsize];
162
163         msghdr_prep_recv_fds(&msg, buf, bufsize, 1);
164
165         iov[0].iov_base = (void *)ptr;
166         iov[0].iov_len = nbytes;
167
168         do {
169                 n = recvmsg(fd, &msg, 0);
170         } while ((n == -1) && (errno == EINTR));
171
172         if (n <= 0) {
173                 return n;
174         }
175
176         {
177                 size_t num_fds = msghdr_extract_fds(&msg, NULL, 0);
178                 int fds[num_fds];
179
180                 msghdr_extract_fds(&msg, fds, num_fds);
181
182                 if (num_fds != 1) {
183                         size_t i;
184
185                         for (i=0; i<num_fds; i++) {
186                                 close(fds[i]);
187                         }
188
189                         *recvfd = -1;
190                         return n;
191                 }
192
193                 *recvfd = fds[0];
194         }
195
196         return(n);
197 }
198
199 static ssize_t write_fd(int fd, void *ptr, size_t nbytes, int sendfd)
200 {
201         struct msghdr msg = {0};
202         size_t bufsize = msghdr_prep_fds(NULL, NULL, 0, &sendfd, 1);
203         uint8_t buf[bufsize];
204         struct iovec iov;
205         ssize_t sent;
206
207         msghdr_prep_fds(&msg, buf, bufsize, &sendfd, 1);
208
209         iov.iov_base = (void *)ptr;
210         iov.iov_len = nbytes;
211         msg.msg_iov = &iov;
212         msg.msg_iovlen = 1;
213
214         do {
215                 sent = sendmsg(fd, &msg, 0);
216         } while ((sent == -1) && (errno == EINTR));
217
218         return sent;
219 }
220
221 static void aio_child_cleanup(struct tevent_context *event_ctx,
222                               struct tevent_timer *te,
223                               struct timeval now,
224                               void *private_data)
225 {
226         struct aio_child_list *list = talloc_get_type_abort(
227                 private_data, struct aio_child_list);
228         struct aio_child *child, *next;
229
230         TALLOC_FREE(list->cleanup_event);
231
232         for (child = list->children; child != NULL; child = next) {
233                 next = child->next;
234
235                 if (child->busy) {
236                         DEBUG(10, ("child %d currently active\n",
237                                    (int)child->pid));
238                         continue;
239                 }
240
241                 if (child->dont_delete) {
242                         DEBUG(10, ("Child %d was active since last cleanup\n",
243                                    (int)child->pid));
244                         child->dont_delete = false;
245                         continue;
246                 }
247
248                 DEBUG(10, ("Child %d idle for more than 30 seconds, "
249                            "deleting\n", (int)child->pid));
250
251                 TALLOC_FREE(child);
252                 child = next;
253         }
254
255         if (list->children != NULL) {
256                 /*
257                  * Re-schedule the next cleanup round
258                  */
259                 list->cleanup_event = tevent_add_timer(server_event_context(), list,
260                                                       timeval_add(&now, 30, 0),
261                                                       aio_child_cleanup, list);
262
263         }
264 }
265
266 static struct aio_child_list *init_aio_children(struct vfs_handle_struct *handle)
267 {
268         struct aio_fork_config *config;
269         struct aio_child_list *children;
270
271         SMB_VFS_HANDLE_GET_DATA(handle, config, struct aio_fork_config,
272                                 return NULL);
273
274         if (config->children == NULL) {
275                 config->children = talloc_zero(config, struct aio_child_list);
276                 if (config->children == NULL) {
277                         return NULL;
278                 }
279         }
280         children = config->children;
281
282         /*
283          * Regardless of whether the child_list had been around or not, make
284          * sure that we have a cleanup timed event. This timed event will
285          * delete itself when it finds that no children are around anymore.
286          */
287
288         if (children->cleanup_event == NULL) {
289                 children->cleanup_event =
290                         tevent_add_timer(server_event_context(), children,
291                                          timeval_current_ofs(30, 0),
292                                          aio_child_cleanup, children);
293                 if (children->cleanup_event == NULL) {
294                         TALLOC_FREE(config->children);
295                         return NULL;
296                 }
297         }
298
299         return children;
300 }
301
302 static void aio_child_loop(int sockfd, struct mmap_area *map)
303 {
304         while (true) {
305                 int fd = -1;
306                 ssize_t ret;
307                 struct rw_cmd cmd_struct;
308                 struct rw_ret ret_struct;
309                 struct timespec start, end;
310
311                 ret = read_fd(sockfd, &cmd_struct, sizeof(cmd_struct), &fd);
312                 if (ret != sizeof(cmd_struct)) {
313                         DEBUG(10, ("read_fd returned %d: %s\n", (int)ret,
314                                    strerror(errno)));
315                         exit(1);
316                 }
317
318                 DEBUG(10, ("aio_child_loop: %s %d bytes at %d from fd %d\n",
319                            cmd_type_str(cmd_struct.cmd),
320                            (int)cmd_struct.n, (int)cmd_struct.offset, fd));
321
322                 if (cmd_struct.erratic_testing_mode) {
323                         /*
324                          * For developer testing, we want erratic behaviour for
325                          * async I/O times
326                          */
327                         uint8_t randval;
328                         unsigned msecs;
329                         /*
330                          * use generate_random_buffer, we just forked from a
331                          * common parent state
332                          */
333                         generate_random_buffer(&randval, sizeof(randval));
334                         msecs = randval + 20;
335                         DEBUG(10, ("delaying for %u msecs\n", msecs));
336                         smb_msleep(msecs);
337                 }
338
339                 ZERO_STRUCT(ret_struct);
340
341                 PROFILE_TIMESTAMP(&start);
342
343                 switch (cmd_struct.cmd) {
344                 case READ_CMD:
345                         ret_struct.size = sys_pread(
346                                 fd, discard_const(map->ptr), cmd_struct.n,
347                                 cmd_struct.offset);
348 #if 0
349 /* This breaks "make test" when run with aio_fork module. */
350 #ifdef DEVELOPER
351                         ret_struct.size = MAX(1, ret_struct.size * 0.9);
352 #endif
353 #endif
354                         break;
355                 case WRITE_CMD:
356                         ret_struct.size = sys_pwrite(
357                                 fd, discard_const(map->ptr), cmd_struct.n,
358                                 cmd_struct.offset);
359                         break;
360                 case FSYNC_CMD:
361                         ret_struct.size = fsync(fd);
362                         break;
363                 default:
364                         ret_struct.size = -1;
365                         errno = EINVAL;
366                 }
367
368                 PROFILE_TIMESTAMP(&end);
369                 ret_struct.duration = nsec_time_diff(&end, &start);
370                 DEBUG(10, ("aio_child_loop: syscall returned %d\n",
371                            (int)ret_struct.size));
372
373                 if (ret_struct.size == -1) {
374                         ret_struct.ret_errno = errno;
375                 }
376
377                 /*
378                  * Close the fd before telling our parent we're done. The
379                  * parent might close and re-open the file very quickly, and
380                  * with system-level share modes (GPFS) we would get an
381                  * unjustified SHARING_VIOLATION.
382                  */
383                 close(fd);
384
385                 ret = write_data(sockfd, (char *)&ret_struct,
386                                  sizeof(ret_struct));
387                 if (ret != sizeof(ret_struct)) {
388                         DEBUG(10, ("could not write ret_struct: %s\n",
389                                    strerror(errno)));
390                         exit(2);
391                 }
392         }
393 }
394
395 static int aio_child_destructor(struct aio_child *child)
396 {
397         char c=0;
398
399         SMB_ASSERT(!child->busy);
400
401         DEBUG(10, ("aio_child_destructor: removing child %d on fd %d\n",
402                    (int)child->pid, child->sockfd));
403
404         /*
405          * closing the sockfd makes the child not return from recvmsg() on RHEL
406          * 5.5 so instead force the child to exit by writing bad data to it
407          */
408         sys_write_v(child->sockfd, &c, sizeof(c));
409         close(child->sockfd);
410         DLIST_REMOVE(child->list->children, child);
411         return 0;
412 }
413
414 /*
415  * We have to close all fd's in open files, we might incorrectly hold a system
416  * level share mode on a file.
417  */
418
419 static struct files_struct *close_fsp_fd(struct files_struct *fsp,
420                                          void *private_data)
421 {
422         if ((fsp->fh != NULL) && (fsp->fh->fd != -1)) {
423                 close(fsp->fh->fd);
424                 fsp->fh->fd = -1;
425         }
426         return NULL;
427 }
428
429 static int create_aio_child(struct smbd_server_connection *sconn,
430                             struct aio_child_list *children,
431                             size_t map_size,
432                             struct aio_child **presult)
433 {
434         struct aio_child *result;
435         int fdpair[2];
436         int ret;
437
438         fdpair[0] = fdpair[1] = -1;
439
440         result = talloc_zero(children, struct aio_child);
441         if (result == NULL) {
442                 return ENOMEM;
443         }
444
445         if (socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair) == -1) {
446                 ret = errno;
447                 DEBUG(10, ("socketpair() failed: %s\n", strerror(errno)));
448                 goto fail;
449         }
450
451         DEBUG(10, ("fdpair = %d/%d\n", fdpair[0], fdpair[1]));
452
453         result->map = mmap_area_init(result, map_size);
454         if (result->map == NULL) {
455                 ret = errno;
456                 DEBUG(0, ("Could not create mmap area\n"));
457                 goto fail;
458         }
459
460         result->pid = fork();
461         if (result->pid == -1) {
462                 ret = errno;
463                 DEBUG(0, ("fork failed: %s\n", strerror(errno)));
464                 goto fail;
465         }
466
467         if (result->pid == 0) {
468                 close(fdpair[0]);
469                 result->sockfd = fdpair[1];
470                 files_forall(sconn, close_fsp_fd, NULL);
471                 aio_child_loop(result->sockfd, result->map);
472         }
473
474         DEBUG(10, ("Child %d created with sockfd %d\n",
475                    (int)result->pid, fdpair[0]));
476
477         result->sockfd = fdpair[0];
478         close(fdpair[1]);
479
480         result->list = children;
481         DLIST_ADD(children->children, result);
482
483         talloc_set_destructor(result, aio_child_destructor);
484
485         *presult = result;
486
487         return 0;
488
489  fail:
490         if (fdpair[0] != -1) close(fdpair[0]);
491         if (fdpair[1] != -1) close(fdpair[1]);
492         TALLOC_FREE(result);
493
494         return ret;
495 }
496
497 static int get_idle_child(struct vfs_handle_struct *handle,
498                           struct aio_child **pchild)
499 {
500         struct aio_child_list *children;
501         struct aio_child *child;
502
503         children = init_aio_children(handle);
504         if (children == NULL) {
505                 return ENOMEM;
506         }
507
508         for (child = children->children; child != NULL; child = child->next) {
509                 if (!child->busy) {
510                         break;
511                 }
512         }
513
514         if (child == NULL) {
515                 int ret;
516
517                 DEBUG(10, ("no idle child found, creating new one\n"));
518
519                 ret = create_aio_child(handle->conn->sconn, children,
520                                           128*1024, &child);
521                 if (ret != 0) {
522                         DEBUG(10, ("create_aio_child failed: %s\n",
523                                    strerror(errno)));
524                         return ret;
525                 }
526         }
527
528         child->dont_delete = true;
529         child->busy = true;
530
531         *pchild = child;
532         return 0;
533 }
534
535 struct aio_fork_pread_state {
536         struct aio_child *child;
537         ssize_t ret;
538         struct vfs_aio_state vfs_aio_state;
539 };
540
541 static void aio_fork_pread_done(struct tevent_req *subreq);
542
543 static struct tevent_req *aio_fork_pread_send(struct vfs_handle_struct *handle,
544                                               TALLOC_CTX *mem_ctx,
545                                               struct tevent_context *ev,
546                                               struct files_struct *fsp,
547                                               void *data,
548                                               size_t n, off_t offset)
549 {
550         struct tevent_req *req, *subreq;
551         struct aio_fork_pread_state *state;
552         struct rw_cmd cmd;
553         ssize_t written;
554         int err;
555         struct aio_fork_config *config;
556
557         SMB_VFS_HANDLE_GET_DATA(handle, config,
558                                 struct aio_fork_config,
559                                 return NULL);
560
561         req = tevent_req_create(mem_ctx, &state, struct aio_fork_pread_state);
562         if (req == NULL) {
563                 return NULL;
564         }
565
566         if (n > 128*1024) {
567                 /* TODO: support variable buffers */
568                 tevent_req_error(req, EINVAL);
569                 return tevent_req_post(req, ev);
570         }
571
572         err = get_idle_child(handle, &state->child);
573         if (err != 0) {
574                 tevent_req_error(req, err);
575                 return tevent_req_post(req, ev);
576         }
577
578         ZERO_STRUCT(cmd);
579         cmd.n = n;
580         cmd.offset = offset;
581         cmd.cmd = READ_CMD;
582         cmd.erratic_testing_mode = config->erratic_testing_mode;
583
584         DEBUG(10, ("sending fd %d to child %d\n", fsp->fh->fd,
585                    (int)state->child->pid));
586
587         /*
588          * Not making this async. We're writing into an empty unix
589          * domain socket. This should never block.
590          */
591         written = write_fd(state->child->sockfd, &cmd, sizeof(cmd),
592                            fsp->fh->fd);
593         if (written == -1) {
594                 err = errno;
595
596                 TALLOC_FREE(state->child);
597
598                 DEBUG(10, ("write_fd failed: %s\n", strerror(err)));
599                 tevent_req_error(req, err);
600                 return tevent_req_post(req, ev);
601         }
602
603         subreq = read_packet_send(state, ev, state->child->sockfd,
604                                   sizeof(struct rw_ret), NULL, NULL);
605         if (tevent_req_nomem(subreq, req)) {
606                 TALLOC_FREE(state->child); /* we sent sth down */
607                 return tevent_req_post(req, ev);
608         }
609         tevent_req_set_callback(subreq, aio_fork_pread_done, req);
610         return req;
611 }
612
613 static void aio_fork_pread_done(struct tevent_req *subreq)
614 {
615         struct tevent_req *req = tevent_req_callback_data(
616                 subreq, struct tevent_req);
617         struct aio_fork_pread_state *state = tevent_req_data(
618                 req, struct aio_fork_pread_state);
619         ssize_t nread;
620         uint8_t *buf;
621         int err;
622         struct rw_ret *retbuf;
623
624         nread = read_packet_recv(subreq, talloc_tos(), &buf, &err);
625         TALLOC_FREE(subreq);
626         if (nread == -1) {
627                 TALLOC_FREE(state->child);
628                 tevent_req_error(req, err);
629                 return;
630         }
631
632         state->child->busy = false;
633
634         retbuf = (struct rw_ret *)buf;
635         state->ret = retbuf->size;
636         state->vfs_aio_state.error = retbuf->ret_errno;
637         state->vfs_aio_state.duration = retbuf->duration;
638         tevent_req_done(req);
639 }
640
641 static ssize_t aio_fork_pread_recv(struct tevent_req *req,
642                                    struct vfs_aio_state *vfs_aio_state)
643 {
644         struct aio_fork_pread_state *state = tevent_req_data(
645                 req, struct aio_fork_pread_state);
646
647         if (tevent_req_is_unix_error(req, &vfs_aio_state->error)) {
648                 return -1;
649         }
650         *vfs_aio_state = state->vfs_aio_state;
651         return state->ret;
652 }
653
654 struct aio_fork_pwrite_state {
655         struct aio_child *child;
656         ssize_t ret;
657         struct vfs_aio_state vfs_aio_state;
658 };
659
660 static void aio_fork_pwrite_done(struct tevent_req *subreq);
661
662 static struct tevent_req *aio_fork_pwrite_send(
663         struct vfs_handle_struct *handle, TALLOC_CTX *mem_ctx,
664         struct tevent_context *ev, struct files_struct *fsp,
665         const void *data, size_t n, off_t offset)
666 {
667         struct tevent_req *req, *subreq;
668         struct aio_fork_pwrite_state *state;
669         struct rw_cmd cmd;
670         ssize_t written;
671         int err;
672         struct aio_fork_config *config;
673         SMB_VFS_HANDLE_GET_DATA(handle, config,
674                                 struct aio_fork_config,
675                                 return NULL);
676
677         req = tevent_req_create(mem_ctx, &state, struct aio_fork_pwrite_state);
678         if (req == NULL) {
679                 return NULL;
680         }
681
682         if (n > 128*1024) {
683                 /* TODO: support variable buffers */
684                 tevent_req_error(req, EINVAL);
685                 return tevent_req_post(req, ev);
686         }
687
688         err = get_idle_child(handle, &state->child);
689         if (err != 0) {
690                 tevent_req_error(req, err);
691                 return tevent_req_post(req, ev);
692         }
693
694         ZERO_STRUCT(cmd);
695         cmd.n = n;
696         cmd.offset = offset;
697         cmd.cmd = WRITE_CMD;
698         cmd.erratic_testing_mode = config->erratic_testing_mode;
699
700         DEBUG(10, ("sending fd %d to child %d\n", fsp->fh->fd,
701                    (int)state->child->pid));
702
703         /*
704          * Not making this async. We're writing into an empty unix
705          * domain socket. This should never block.
706          */
707         written = write_fd(state->child->sockfd, &cmd, sizeof(cmd),
708                            fsp->fh->fd);
709         if (written == -1) {
710                 err = errno;
711
712                 TALLOC_FREE(state->child);
713
714                 DEBUG(10, ("write_fd failed: %s\n", strerror(err)));
715                 tevent_req_error(req, err);
716                 return tevent_req_post(req, ev);
717         }
718
719         subreq = read_packet_send(state, ev, state->child->sockfd,
720                                   sizeof(struct rw_ret), NULL, NULL);
721         if (tevent_req_nomem(subreq, req)) {
722                 TALLOC_FREE(state->child); /* we sent sth down */
723                 return tevent_req_post(req, ev);
724         }
725         tevent_req_set_callback(subreq, aio_fork_pwrite_done, req);
726         return req;
727 }
728
729 static void aio_fork_pwrite_done(struct tevent_req *subreq)
730 {
731         struct tevent_req *req = tevent_req_callback_data(
732                 subreq, struct tevent_req);
733         struct aio_fork_pwrite_state *state = tevent_req_data(
734                 req, struct aio_fork_pwrite_state);
735         ssize_t nread;
736         uint8_t *buf;
737         int err;
738         struct rw_ret *retbuf;
739
740         nread = read_packet_recv(subreq, talloc_tos(), &buf, &err);
741         TALLOC_FREE(subreq);
742         if (nread == -1) {
743                 TALLOC_FREE(state->child);
744                 tevent_req_error(req, err);
745                 return;
746         }
747
748         state->child->busy = false;
749
750         retbuf = (struct rw_ret *)buf;
751         state->ret = retbuf->size;
752         state->vfs_aio_state.error = retbuf->ret_errno;
753         state->vfs_aio_state.duration = retbuf->duration;
754         tevent_req_done(req);
755 }
756
757 static ssize_t aio_fork_pwrite_recv(struct tevent_req *req,
758                                     struct vfs_aio_state *vfs_aio_state)
759 {
760         struct aio_fork_pwrite_state *state = tevent_req_data(
761                 req, struct aio_fork_pwrite_state);
762
763         if (tevent_req_is_unix_error(req, &vfs_aio_state->error)) {
764                 return -1;
765         }
766         *vfs_aio_state = state->vfs_aio_state;
767         return state->ret;
768 }
769
770 struct aio_fork_fsync_state {
771         struct aio_child *child;
772         ssize_t ret;
773         struct vfs_aio_state vfs_aio_state;
774 };
775
776 static void aio_fork_fsync_done(struct tevent_req *subreq);
777
778 static struct tevent_req *aio_fork_fsync_send(
779         struct vfs_handle_struct *handle, TALLOC_CTX *mem_ctx,
780         struct tevent_context *ev, struct files_struct *fsp)
781 {
782         struct tevent_req *req, *subreq;
783         struct aio_fork_fsync_state *state;
784         struct rw_cmd cmd;
785         ssize_t written;
786         int err;
787         struct aio_fork_config *config;
788
789         SMB_VFS_HANDLE_GET_DATA(handle, config,
790                                 struct aio_fork_config,
791                                 return NULL);
792
793         req = tevent_req_create(mem_ctx, &state, struct aio_fork_fsync_state);
794         if (req == NULL) {
795                 return NULL;
796         }
797
798         err = get_idle_child(handle, &state->child);
799         if (err != 0) {
800                 tevent_req_error(req, err);
801                 return tevent_req_post(req, ev);
802         }
803
804         ZERO_STRUCT(cmd);
805         cmd.cmd = FSYNC_CMD;
806         cmd.erratic_testing_mode = config->erratic_testing_mode;
807
808         DEBUG(10, ("sending fd %d to child %d\n", fsp->fh->fd,
809                    (int)state->child->pid));
810
811         /*
812          * Not making this async. We're writing into an empty unix
813          * domain socket. This should never block.
814          */
815         written = write_fd(state->child->sockfd, &cmd, sizeof(cmd),
816                            fsp->fh->fd);
817         if (written == -1) {
818                 err = errno;
819
820                 TALLOC_FREE(state->child);
821
822                 DEBUG(10, ("write_fd failed: %s\n", strerror(err)));
823                 tevent_req_error(req, err);
824                 return tevent_req_post(req, ev);
825         }
826
827         subreq = read_packet_send(state, ev, state->child->sockfd,
828                                   sizeof(struct rw_ret), NULL, NULL);
829         if (tevent_req_nomem(subreq, req)) {
830                 TALLOC_FREE(state->child); /* we sent sth down */
831                 return tevent_req_post(req, ev);
832         }
833         tevent_req_set_callback(subreq, aio_fork_fsync_done, req);
834         return req;
835 }
836
837 static void aio_fork_fsync_done(struct tevent_req *subreq)
838 {
839         struct tevent_req *req = tevent_req_callback_data(
840                 subreq, struct tevent_req);
841         struct aio_fork_fsync_state *state = tevent_req_data(
842                 req, struct aio_fork_fsync_state);
843         ssize_t nread;
844         uint8_t *buf;
845         int err;
846         struct rw_ret *retbuf;
847
848         nread = read_packet_recv(subreq, talloc_tos(), &buf, &err);
849         TALLOC_FREE(subreq);
850         if (nread == -1) {
851                 TALLOC_FREE(state->child);
852                 tevent_req_error(req, err);
853                 return;
854         }
855
856         state->child->busy = false;
857
858         retbuf = (struct rw_ret *)buf;
859         state->ret = retbuf->size;
860         state->vfs_aio_state.error = retbuf->ret_errno;
861         state->vfs_aio_state.duration = retbuf->duration;
862         tevent_req_done(req);
863 }
864
865 static int aio_fork_fsync_recv(struct tevent_req *req,
866                                struct vfs_aio_state *vfs_aio_state)
867 {
868         struct aio_fork_fsync_state *state = tevent_req_data(
869                 req, struct aio_fork_fsync_state);
870
871         if (tevent_req_is_unix_error(req, &vfs_aio_state->error)) {
872                 return -1;
873         }
874         *vfs_aio_state = state->vfs_aio_state;
875         return state->ret;
876 }
877
878 static int aio_fork_connect(vfs_handle_struct *handle, const char *service,
879                             const char *user)
880 {
881         int ret;
882         struct aio_fork_config *config;
883         ret = SMB_VFS_NEXT_CONNECT(handle, service, user);
884
885         if (ret < 0) {
886                 return ret;
887         }
888
889         config = talloc_zero(handle->conn, struct aio_fork_config);
890         if (!config) {
891                 SMB_VFS_NEXT_DISCONNECT(handle);
892                 DEBUG(0, ("talloc_zero() failed\n"));
893                 return -1;
894         }
895
896         config->erratic_testing_mode = lp_parm_bool(SNUM(handle->conn), "vfs_aio_fork",
897                                                     "erratic_testing_mode", false);
898         
899         SMB_VFS_HANDLE_SET_DATA(handle, config,
900                                 NULL, struct aio_fork_config,
901                                 return -1);
902
903         return 0;
904 }
905
906 static struct vfs_fn_pointers vfs_aio_fork_fns = {
907         .connect_fn = aio_fork_connect,
908         .pread_send_fn = aio_fork_pread_send,
909         .pread_recv_fn = aio_fork_pread_recv,
910         .pwrite_send_fn = aio_fork_pwrite_send,
911         .pwrite_recv_fn = aio_fork_pwrite_recv,
912         .fsync_send_fn = aio_fork_fsync_send,
913         .fsync_recv_fn = aio_fork_fsync_recv,
914 };
915
916 NTSTATUS vfs_aio_fork_init(TALLOC_CTX *);
917 NTSTATUS vfs_aio_fork_init(TALLOC_CTX *ctx)
918 {
919         return smb_register_vfs(SMB_VFS_INTERFACE_VERSION,
920                                 "aio_fork", &vfs_aio_fork_fns);
921 }