aio_fork: Fix CID 1273291 Uninitialized scalar variable
[amitay/samba.git] / source3 / modules / vfs_aio_fork.c
1 /*
2  * Simulate the Posix AIO using mmap/fork
3  *
4  * Copyright (C) Volker Lendecke 2008
5  * Copyright (C) Jeremy Allison 2010
6  *
7  * This program is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 2 of the License, or
10  * (at your option) any later version.
11  *
12  * This program is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with this program; if not, write to the Free Software
19  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #include "includes.h"
23 #include "system/filesys.h"
24 #include "system/shmem.h"
25 #include "smbd/smbd.h"
26 #include "smbd/globals.h"
27 #include "lib/async_req/async_sock.h"
28 #include "lib/util/tevent_unix.h"
29 #include "lib/sys_rw.h"
30 #include "lib/sys_rw_data.h"
31 #include "lib/msghdr.h"
32
33 #if !defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL) && !defined(HAVE_STRUCT_MSGHDR_MSG_ACCRIGHTS)
34 # error Can not pass file descriptors
35 #endif
36
37 #undef recvmsg
38
39 #ifndef MAP_FILE
40 #define MAP_FILE 0
41 #endif
42
43 struct aio_fork_config {
44         bool erratic_testing_mode;
45 };
46
47 struct mmap_area {
48         size_t size;
49         volatile void *ptr;
50 };
51
52 static int mmap_area_destructor(struct mmap_area *area)
53 {
54         munmap((void *)area->ptr, area->size);
55         return 0;
56 }
57
58 static struct mmap_area *mmap_area_init(TALLOC_CTX *mem_ctx, size_t size)
59 {
60         struct mmap_area *result;
61         int fd;
62
63         result = talloc(mem_ctx, struct mmap_area);
64         if (result == NULL) {
65                 DEBUG(0, ("talloc failed\n"));
66                 goto fail;
67         }
68
69         fd = open("/dev/zero", O_RDWR);
70         if (fd == -1) {
71                 DEBUG(3, ("open(\"/dev/zero\") failed: %s\n",
72                           strerror(errno)));
73                 goto fail;
74         }
75
76         result->ptr = mmap(NULL, size, PROT_READ|PROT_WRITE,
77                            MAP_SHARED|MAP_FILE, fd, 0);
78         close(fd);
79         if (result->ptr == MAP_FAILED) {
80                 DEBUG(1, ("mmap failed: %s\n", strerror(errno)));
81                 goto fail;
82         }
83
84         result->size = size;
85         talloc_set_destructor(result, mmap_area_destructor);
86
87         return result;
88
89 fail:
90         TALLOC_FREE(result);
91         return NULL;
92 }
93
94 enum cmd_type {
95         READ_CMD,
96         WRITE_CMD,
97         FSYNC_CMD
98 };
99
100 static const char *cmd_type_str(enum cmd_type cmd)
101 {
102         const char *result;
103
104         switch (cmd) {
105         case READ_CMD:
106                 result = "READ";
107                 break;
108         case WRITE_CMD:
109                 result = "WRITE";
110                 break;
111         case FSYNC_CMD:
112                 result = "FSYNC";
113                 break;
114         default:
115                 result = "<UNKNOWN>";
116                 break;
117         }
118         return result;
119 }
120
121 struct rw_cmd {
122         size_t n;
123         off_t offset;
124         enum cmd_type cmd;
125         bool erratic_testing_mode;
126 };
127
128 struct rw_ret {
129         ssize_t size;
130         int ret_errno;
131 };
132
133 struct aio_child_list;
134
135 struct aio_child {
136         struct aio_child *prev, *next;
137         struct aio_child_list *list;
138         pid_t pid;
139         int sockfd;
140         struct mmap_area *map;
141         bool dont_delete;       /* Marked as in use since last cleanup */
142         bool busy;
143 };
144
145 struct aio_child_list {
146         struct aio_child *children;
147         struct tevent_timer *cleanup_event;
148 };
149
150 static void free_aio_children(void **p)
151 {
152         TALLOC_FREE(*p);
153 }
154
155 static ssize_t read_fd(int fd, void *ptr, size_t nbytes, int *recvfd)
156 {
157         struct msghdr msg;
158         struct iovec iov[1];
159         ssize_t n;
160         size_t bufsize = msghdr_prep_recv_fds(NULL, NULL, 0, 1);
161         uint8_t buf[bufsize];
162
163         msghdr_prep_recv_fds(&msg, buf, bufsize, 1);
164
165         msg.msg_name = NULL;
166         msg.msg_namelen = 0;
167
168         iov[0].iov_base = (void *)ptr;
169         iov[0].iov_len = nbytes;
170         msg.msg_iov = iov;
171         msg.msg_iovlen = 1;
172
173         do {
174                 n = recvmsg(fd, &msg, 0);
175         } while ((n == -1) && (errno == EINTR));
176
177         if (n <= 0) {
178                 return n;
179         }
180
181         {
182                 size_t num_fds = msghdr_extract_fds(&msg, NULL, 0);
183                 int fds[num_fds];
184
185                 msghdr_extract_fds(&msg, fds, num_fds);
186
187                 if (num_fds != 1) {
188                         size_t i;
189
190                         for (i=0; i<num_fds; i++) {
191                                 close(fds[i]);
192                         }
193
194                         *recvfd = -1;
195                         return n;
196                 }
197
198                 *recvfd = fds[0];
199         }
200
201         return(n);
202 }
203
204 static ssize_t write_fd(int fd, void *ptr, size_t nbytes, int sendfd)
205 {
206         struct msghdr msg = {0};
207         size_t bufsize = msghdr_prep_fds(NULL, NULL, 0, &sendfd, 1);
208         uint8_t buf[bufsize];
209         struct iovec iov;
210         ssize_t sent;
211
212         msghdr_prep_fds(&msg, buf, bufsize, &sendfd, 1);
213
214         iov.iov_base = (void *)ptr;
215         iov.iov_len = nbytes;
216         msg.msg_iov = &iov;
217         msg.msg_iovlen = 1;
218
219         do {
220                 sent = sendmsg(fd, &msg, 0);
221         } while ((sent == -1) && (errno == EINTR));
222
223         return sent;
224 }
225
226 static void aio_child_cleanup(struct tevent_context *event_ctx,
227                               struct tevent_timer *te,
228                               struct timeval now,
229                               void *private_data)
230 {
231         struct aio_child_list *list = talloc_get_type_abort(
232                 private_data, struct aio_child_list);
233         struct aio_child *child, *next;
234
235         TALLOC_FREE(list->cleanup_event);
236
237         for (child = list->children; child != NULL; child = next) {
238                 next = child->next;
239
240                 if (child->busy) {
241                         DEBUG(10, ("child %d currently active\n",
242                                    (int)child->pid));
243                         continue;
244                 }
245
246                 if (child->dont_delete) {
247                         DEBUG(10, ("Child %d was active since last cleanup\n",
248                                    (int)child->pid));
249                         child->dont_delete = false;
250                         continue;
251                 }
252
253                 DEBUG(10, ("Child %d idle for more than 30 seconds, "
254                            "deleting\n", (int)child->pid));
255
256                 TALLOC_FREE(child);
257                 child = next;
258         }
259
260         if (list->children != NULL) {
261                 /*
262                  * Re-schedule the next cleanup round
263                  */
264                 list->cleanup_event = tevent_add_timer(server_event_context(), list,
265                                                       timeval_add(&now, 30, 0),
266                                                       aio_child_cleanup, list);
267
268         }
269 }
270
271 static struct aio_child_list *init_aio_children(struct vfs_handle_struct *handle)
272 {
273         struct aio_child_list *data = NULL;
274
275         if (SMB_VFS_HANDLE_TEST_DATA(handle)) {
276                 SMB_VFS_HANDLE_GET_DATA(handle, data, struct aio_child_list,
277                                         return NULL);
278         }
279
280         if (data == NULL) {
281                 data = talloc_zero(NULL, struct aio_child_list);
282                 if (data == NULL) {
283                         return NULL;
284                 }
285         }
286
287         /*
288          * Regardless of whether the child_list had been around or not, make
289          * sure that we have a cleanup timed event. This timed event will
290          * delete itself when it finds that no children are around anymore.
291          */
292
293         if (data->cleanup_event == NULL) {
294                 data->cleanup_event = tevent_add_timer(server_event_context(), data,
295                                                       timeval_current_ofs(30, 0),
296                                                       aio_child_cleanup, data);
297                 if (data->cleanup_event == NULL) {
298                         TALLOC_FREE(data);
299                         return NULL;
300                 }
301         }
302
303         if (!SMB_VFS_HANDLE_TEST_DATA(handle)) {
304                 SMB_VFS_HANDLE_SET_DATA(handle, data, free_aio_children,
305                                         struct aio_child_list, return False);
306         }
307
308         return data;
309 }
310
311 static void aio_child_loop(int sockfd, struct mmap_area *map)
312 {
313         while (true) {
314                 int fd = -1;
315                 ssize_t ret;
316                 struct rw_cmd cmd_struct;
317                 struct rw_ret ret_struct;
318
319                 ret = read_fd(sockfd, &cmd_struct, sizeof(cmd_struct), &fd);
320                 if (ret != sizeof(cmd_struct)) {
321                         DEBUG(10, ("read_fd returned %d: %s\n", (int)ret,
322                                    strerror(errno)));
323                         exit(1);
324                 }
325
326                 DEBUG(10, ("aio_child_loop: %s %d bytes at %d from fd %d\n",
327                            cmd_type_str(cmd_struct.cmd),
328                            (int)cmd_struct.n, (int)cmd_struct.offset, fd));
329
330                 if (cmd_struct.erratic_testing_mode) {
331                         /*
332                          * For developer testing, we want erratic behaviour for
333                          * async I/O times
334                          */
335                         uint8_t randval;
336                         unsigned msecs;
337                         /*
338                          * use generate_random_buffer, we just forked from a
339                          * common parent state
340                          */
341                         generate_random_buffer(&randval, sizeof(randval));
342                         msecs = randval + 20;
343                         DEBUG(10, ("delaying for %u msecs\n", msecs));
344                         smb_msleep(msecs);
345                 }
346
347                 ZERO_STRUCT(ret_struct);
348
349                 switch (cmd_struct.cmd) {
350                 case READ_CMD:
351                         ret_struct.size = sys_pread(
352                                 fd, (void *)map->ptr, cmd_struct.n,
353                                 cmd_struct.offset);
354 #if 0
355 /* This breaks "make test" when run with aio_fork module. */
356 #ifdef DEVELOPER
357                         ret_struct.size = MAX(1, ret_struct.size * 0.9);
358 #endif
359 #endif
360                         break;
361                 case WRITE_CMD:
362                         ret_struct.size = sys_pwrite(
363                                 fd, (void *)map->ptr, cmd_struct.n,
364                                 cmd_struct.offset);
365                         break;
366                 case FSYNC_CMD:
367                         ret_struct.size = fsync(fd);
368                         break;
369                 default:
370                         ret_struct.size = -1;
371                         errno = EINVAL;
372                 }
373
374                 DEBUG(10, ("aio_child_loop: syscall returned %d\n",
375                            (int)ret_struct.size));
376
377                 if (ret_struct.size == -1) {
378                         ret_struct.ret_errno = errno;
379                 }
380
381                 /*
382                  * Close the fd before telling our parent we're done. The
383                  * parent might close and re-open the file very quickly, and
384                  * with system-level share modes (GPFS) we would get an
385                  * unjustified SHARING_VIOLATION.
386                  */
387                 close(fd);
388
389                 ret = write_data(sockfd, (char *)&ret_struct,
390                                  sizeof(ret_struct));
391                 if (ret != sizeof(ret_struct)) {
392                         DEBUG(10, ("could not write ret_struct: %s\n",
393                                    strerror(errno)));
394                         exit(2);
395                 }
396         }
397 }
398
399 static int aio_child_destructor(struct aio_child *child)
400 {
401         char c=0;
402
403         SMB_ASSERT(!child->busy);
404
405         DEBUG(10, ("aio_child_destructor: removing child %d on fd %d\n",
406                         child->pid, child->sockfd));
407
408         /*
409          * closing the sockfd makes the child not return from recvmsg() on RHEL
410          * 5.5 so instead force the child to exit by writing bad data to it
411          */
412         write(child->sockfd, &c, sizeof(c));
413         close(child->sockfd);
414         DLIST_REMOVE(child->list->children, child);
415         return 0;
416 }
417
418 /*
419  * We have to close all fd's in open files, we might incorrectly hold a system
420  * level share mode on a file.
421  */
422
423 static struct files_struct *close_fsp_fd(struct files_struct *fsp,
424                                          void *private_data)
425 {
426         if ((fsp->fh != NULL) && (fsp->fh->fd != -1)) {
427                 close(fsp->fh->fd);
428                 fsp->fh->fd = -1;
429         }
430         return NULL;
431 }
432
433 static int create_aio_child(struct smbd_server_connection *sconn,
434                             struct aio_child_list *children,
435                             size_t map_size,
436                             struct aio_child **presult)
437 {
438         struct aio_child *result;
439         int fdpair[2];
440         int ret;
441
442         fdpair[0] = fdpair[1] = -1;
443
444         result = talloc_zero(children, struct aio_child);
445         if (result == NULL) {
446                 return ENOMEM;
447         }
448
449         if (socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair) == -1) {
450                 ret = errno;
451                 DEBUG(10, ("socketpair() failed: %s\n", strerror(errno)));
452                 goto fail;
453         }
454
455         DEBUG(10, ("fdpair = %d/%d\n", fdpair[0], fdpair[1]));
456
457         result->map = mmap_area_init(result, map_size);
458         if (result->map == NULL) {
459                 ret = errno;
460                 DEBUG(0, ("Could not create mmap area\n"));
461                 goto fail;
462         }
463
464         result->pid = fork();
465         if (result->pid == -1) {
466                 ret = errno;
467                 DEBUG(0, ("fork failed: %s\n", strerror(errno)));
468                 goto fail;
469         }
470
471         if (result->pid == 0) {
472                 close(fdpair[0]);
473                 result->sockfd = fdpair[1];
474                 files_forall(sconn, close_fsp_fd, NULL);
475                 aio_child_loop(result->sockfd, result->map);
476         }
477
478         DEBUG(10, ("Child %d created with sockfd %d\n",
479                         result->pid, fdpair[0]));
480
481         result->sockfd = fdpair[0];
482         close(fdpair[1]);
483
484         result->list = children;
485         DLIST_ADD(children->children, result);
486
487         talloc_set_destructor(result, aio_child_destructor);
488
489         *presult = result;
490
491         return 0;
492
493  fail:
494         if (fdpair[0] != -1) close(fdpair[0]);
495         if (fdpair[1] != -1) close(fdpair[1]);
496         TALLOC_FREE(result);
497
498         return ret;
499 }
500
501 static int get_idle_child(struct vfs_handle_struct *handle,
502                           struct aio_child **pchild)
503 {
504         struct aio_child_list *children;
505         struct aio_child *child;
506
507         children = init_aio_children(handle);
508         if (children == NULL) {
509                 return ENOMEM;
510         }
511
512         for (child = children->children; child != NULL; child = child->next) {
513                 if (!child->busy) {
514                         break;
515                 }
516         }
517
518         if (child == NULL) {
519                 int ret;
520
521                 DEBUG(10, ("no idle child found, creating new one\n"));
522
523                 ret = create_aio_child(handle->conn->sconn, children,
524                                           128*1024, &child);
525                 if (ret != 0) {
526                         DEBUG(10, ("create_aio_child failed: %s\n",
527                                    strerror(errno)));
528                         return ret;
529                 }
530         }
531
532         child->dont_delete = true;
533         child->busy = true;
534
535         *pchild = child;
536         return 0;
537 }
538
539 struct aio_fork_pread_state {
540         struct aio_child *child;
541         ssize_t ret;
542         int err;
543 };
544
545 static void aio_fork_pread_done(struct tevent_req *subreq);
546
547 static struct tevent_req *aio_fork_pread_send(struct vfs_handle_struct *handle,
548                                               TALLOC_CTX *mem_ctx,
549                                               struct tevent_context *ev,
550                                               struct files_struct *fsp,
551                                               void *data,
552                                               size_t n, off_t offset)
553 {
554         struct tevent_req *req, *subreq;
555         struct aio_fork_pread_state *state;
556         struct rw_cmd cmd;
557         ssize_t written;
558         int err;
559         struct aio_fork_config *config;
560
561         SMB_VFS_HANDLE_GET_DATA(handle, config,
562                                 struct aio_fork_config,
563                                 return NULL);
564
565         req = tevent_req_create(mem_ctx, &state, struct aio_fork_pread_state);
566         if (req == NULL) {
567                 return NULL;
568         }
569
570         if (n > 128*1024) {
571                 /* TODO: support variable buffers */
572                 tevent_req_error(req, EINVAL);
573                 return tevent_req_post(req, ev);
574         }
575
576         err = get_idle_child(handle, &state->child);
577         if (err != 0) {
578                 tevent_req_error(req, err);
579                 return tevent_req_post(req, ev);
580         }
581
582         ZERO_STRUCT(cmd);
583         cmd.n = n;
584         cmd.offset = offset;
585         cmd.cmd = READ_CMD;
586         cmd.erratic_testing_mode = config->erratic_testing_mode;
587
588         DEBUG(10, ("sending fd %d to child %d\n", fsp->fh->fd,
589                    (int)state->child->pid));
590
591         /*
592          * Not making this async. We're writing into an empty unix
593          * domain socket. This should never block.
594          */
595         written = write_fd(state->child->sockfd, &cmd, sizeof(cmd),
596                            fsp->fh->fd);
597         if (written == -1) {
598                 err = errno;
599
600                 TALLOC_FREE(state->child);
601
602                 DEBUG(10, ("write_fd failed: %s\n", strerror(err)));
603                 tevent_req_error(req, err);
604                 return tevent_req_post(req, ev);
605         }
606
607         subreq = read_packet_send(state, ev, state->child->sockfd,
608                                   sizeof(struct rw_ret), NULL, NULL);
609         if (tevent_req_nomem(subreq, req)) {
610                 TALLOC_FREE(state->child); /* we sent sth down */
611                 return tevent_req_post(req, ev);
612         }
613         tevent_req_set_callback(subreq, aio_fork_pread_done, req);
614         return req;
615 }
616
617 static void aio_fork_pread_done(struct tevent_req *subreq)
618 {
619         struct tevent_req *req = tevent_req_callback_data(
620                 subreq, struct tevent_req);
621         struct aio_fork_pread_state *state = tevent_req_data(
622                 req, struct aio_fork_pread_state);
623         ssize_t nread;
624         uint8_t *buf;
625         int err;
626         struct rw_ret *retbuf;
627
628         nread = read_packet_recv(subreq, talloc_tos(), &buf, &err);
629         TALLOC_FREE(subreq);
630         if (nread == -1) {
631                 TALLOC_FREE(state->child);
632                 tevent_req_error(req, err);
633                 return;
634         }
635
636         state->child->busy = false;
637
638         retbuf = (struct rw_ret *)buf;
639         state->ret = retbuf->size;
640         state->err = retbuf->ret_errno;
641         tevent_req_done(req);
642 }
643
644 static ssize_t aio_fork_pread_recv(struct tevent_req *req, int *err)
645 {
646         struct aio_fork_pread_state *state = tevent_req_data(
647                 req, struct aio_fork_pread_state);
648
649         if (tevent_req_is_unix_error(req, err)) {
650                 return -1;
651         }
652         if (state->ret == -1) {
653                 *err = state->err;
654         }
655         return state->ret;
656 }
657
658 struct aio_fork_pwrite_state {
659         struct aio_child *child;
660         ssize_t ret;
661         int err;
662 };
663
664 static void aio_fork_pwrite_done(struct tevent_req *subreq);
665
666 static struct tevent_req *aio_fork_pwrite_send(
667         struct vfs_handle_struct *handle, TALLOC_CTX *mem_ctx,
668         struct tevent_context *ev, struct files_struct *fsp,
669         const void *data, size_t n, off_t offset)
670 {
671         struct tevent_req *req, *subreq;
672         struct aio_fork_pwrite_state *state;
673         struct rw_cmd cmd;
674         ssize_t written;
675         int err;
676         struct aio_fork_config *config;
677         SMB_VFS_HANDLE_GET_DATA(handle, config,
678                                 struct aio_fork_config,
679                                 return NULL);
680
681         req = tevent_req_create(mem_ctx, &state, struct aio_fork_pwrite_state);
682         if (req == NULL) {
683                 return NULL;
684         }
685
686         if (n > 128*1024) {
687                 /* TODO: support variable buffers */
688                 tevent_req_error(req, EINVAL);
689                 return tevent_req_post(req, ev);
690         }
691
692         err = get_idle_child(handle, &state->child);
693         if (err != 0) {
694                 tevent_req_error(req, err);
695                 return tevent_req_post(req, ev);
696         }
697
698         ZERO_STRUCT(cmd);
699         cmd.n = n;
700         cmd.offset = offset;
701         cmd.cmd = WRITE_CMD;
702         cmd.erratic_testing_mode = config->erratic_testing_mode;
703
704         DEBUG(10, ("sending fd %d to child %d\n", fsp->fh->fd,
705                    (int)state->child->pid));
706
707         /*
708          * Not making this async. We're writing into an empty unix
709          * domain socket. This should never block.
710          */
711         written = write_fd(state->child->sockfd, &cmd, sizeof(cmd),
712                            fsp->fh->fd);
713         if (written == -1) {
714                 err = errno;
715
716                 TALLOC_FREE(state->child);
717
718                 DEBUG(10, ("write_fd failed: %s\n", strerror(err)));
719                 tevent_req_error(req, err);
720                 return tevent_req_post(req, ev);
721         }
722
723         subreq = read_packet_send(state, ev, state->child->sockfd,
724                                   sizeof(struct rw_ret), NULL, NULL);
725         if (tevent_req_nomem(subreq, req)) {
726                 TALLOC_FREE(state->child); /* we sent sth down */
727                 return tevent_req_post(req, ev);
728         }
729         tevent_req_set_callback(subreq, aio_fork_pwrite_done, req);
730         return req;
731 }
732
733 static void aio_fork_pwrite_done(struct tevent_req *subreq)
734 {
735         struct tevent_req *req = tevent_req_callback_data(
736                 subreq, struct tevent_req);
737         struct aio_fork_pwrite_state *state = tevent_req_data(
738                 req, struct aio_fork_pwrite_state);
739         ssize_t nread;
740         uint8_t *buf;
741         int err;
742         struct rw_ret *retbuf;
743
744         nread = read_packet_recv(subreq, talloc_tos(), &buf, &err);
745         TALLOC_FREE(subreq);
746         if (nread == -1) {
747                 TALLOC_FREE(state->child);
748                 tevent_req_error(req, err);
749                 return;
750         }
751
752         state->child->busy = false;
753
754         retbuf = (struct rw_ret *)buf;
755         state->ret = retbuf->size;
756         state->err = retbuf->ret_errno;
757         tevent_req_done(req);
758 }
759
760 static ssize_t aio_fork_pwrite_recv(struct tevent_req *req, int *err)
761 {
762         struct aio_fork_pwrite_state *state = tevent_req_data(
763                 req, struct aio_fork_pwrite_state);
764
765         if (tevent_req_is_unix_error(req, err)) {
766                 return -1;
767         }
768         if (state->ret == -1) {
769                 *err = state->err;
770         }
771         return state->ret;
772 }
773
774 struct aio_fork_fsync_state {
775         struct aio_child *child;
776         ssize_t ret;
777         int err;
778 };
779
780 static void aio_fork_fsync_done(struct tevent_req *subreq);
781
782 static struct tevent_req *aio_fork_fsync_send(
783         struct vfs_handle_struct *handle, TALLOC_CTX *mem_ctx,
784         struct tevent_context *ev, struct files_struct *fsp)
785 {
786         struct tevent_req *req, *subreq;
787         struct aio_fork_fsync_state *state;
788         struct rw_cmd cmd;
789         ssize_t written;
790         int err;
791         struct aio_fork_config *config;
792
793         SMB_VFS_HANDLE_GET_DATA(handle, config,
794                                 struct aio_fork_config,
795                                 return NULL);
796
797         req = tevent_req_create(mem_ctx, &state, struct aio_fork_fsync_state);
798         if (req == NULL) {
799                 return NULL;
800         }
801
802         err = get_idle_child(handle, &state->child);
803         if (err != 0) {
804                 tevent_req_error(req, err);
805                 return tevent_req_post(req, ev);
806         }
807
808         ZERO_STRUCT(cmd);
809         cmd.cmd = FSYNC_CMD;
810         cmd.erratic_testing_mode = config->erratic_testing_mode;
811
812         DEBUG(10, ("sending fd %d to child %d\n", fsp->fh->fd,
813                    (int)state->child->pid));
814
815         /*
816          * Not making this async. We're writing into an empty unix
817          * domain socket. This should never block.
818          */
819         written = write_fd(state->child->sockfd, &cmd, sizeof(cmd),
820                            fsp->fh->fd);
821         if (written == -1) {
822                 err = errno;
823
824                 TALLOC_FREE(state->child);
825
826                 DEBUG(10, ("write_fd failed: %s\n", strerror(err)));
827                 tevent_req_error(req, err);
828                 return tevent_req_post(req, ev);
829         }
830
831         subreq = read_packet_send(state, ev, state->child->sockfd,
832                                   sizeof(struct rw_ret), NULL, NULL);
833         if (tevent_req_nomem(subreq, req)) {
834                 TALLOC_FREE(state->child); /* we sent sth down */
835                 return tevent_req_post(req, ev);
836         }
837         tevent_req_set_callback(subreq, aio_fork_fsync_done, req);
838         return req;
839 }
840
841 static void aio_fork_fsync_done(struct tevent_req *subreq)
842 {
843         struct tevent_req *req = tevent_req_callback_data(
844                 subreq, struct tevent_req);
845         struct aio_fork_fsync_state *state = tevent_req_data(
846                 req, struct aio_fork_fsync_state);
847         ssize_t nread;
848         uint8_t *buf;
849         int err;
850         struct rw_ret *retbuf;
851
852         nread = read_packet_recv(subreq, talloc_tos(), &buf, &err);
853         TALLOC_FREE(subreq);
854         if (nread == -1) {
855                 TALLOC_FREE(state->child);
856                 tevent_req_error(req, err);
857                 return;
858         }
859
860         state->child->busy = false;
861
862         retbuf = (struct rw_ret *)buf;
863         state->ret = retbuf->size;
864         state->err = retbuf->ret_errno;
865         tevent_req_done(req);
866 }
867
868 static int aio_fork_fsync_recv(struct tevent_req *req, int *err)
869 {
870         struct aio_fork_fsync_state *state = tevent_req_data(
871                 req, struct aio_fork_fsync_state);
872
873         if (tevent_req_is_unix_error(req, err)) {
874                 return -1;
875         }
876         if (state->ret == -1) {
877                 *err = state->err;
878         }
879         return state->ret;
880 }
881
882 static int aio_fork_connect(vfs_handle_struct *handle, const char *service,
883                             const char *user)
884 {
885         int ret;
886         struct aio_fork_config *config;
887         ret = SMB_VFS_NEXT_CONNECT(handle, service, user);
888
889         if (ret < 0) {
890                 return ret;
891         }
892
893         config = talloc_zero(handle->conn, struct aio_fork_config);
894         if (!config) {
895                 SMB_VFS_NEXT_DISCONNECT(handle);
896                 DEBUG(0, ("talloc_zero() failed\n"));
897                 return -1;
898         }
899
900         config->erratic_testing_mode = lp_parm_bool(SNUM(handle->conn), "vfs_aio_fork",
901                                                     "erratic_testing_mode", false);
902         
903         SMB_VFS_HANDLE_SET_DATA(handle, config,
904                                 NULL, struct aio_fork_config,
905                                 return -1);
906
907         /*********************************************************************
908          * How many threads to initialize ?
909          * 100 per process seems insane as a default until you realize that
910          * (a) Threads terminate after 1 second when idle.
911          * (b) Throttling is done in SMB2 via the crediting algorithm.
912          * (c) SMB1 clients are limited to max_mux (50) outstanding
913          *     requests and Windows clients don't use this anyway.
914          * Essentially we want this to be unlimited unless smb.conf
915          * says different.
916          *********************************************************************/
917         aio_pending_size = 100;
918         return 0;
919 }
920
921 static struct vfs_fn_pointers vfs_aio_fork_fns = {
922         .connect_fn = aio_fork_connect,
923         .pread_send_fn = aio_fork_pread_send,
924         .pread_recv_fn = aio_fork_pread_recv,
925         .pwrite_send_fn = aio_fork_pwrite_send,
926         .pwrite_recv_fn = aio_fork_pwrite_recv,
927         .fsync_send_fn = aio_fork_fsync_send,
928         .fsync_recv_fn = aio_fork_fsync_recv,
929 };
930
931 NTSTATUS vfs_aio_fork_init(void);
932 NTSTATUS vfs_aio_fork_init(void)
933 {
934         return smb_register_vfs(SMB_VFS_INTERFACE_VERSION,
935                                 "aio_fork", &vfs_aio_fork_fns);
936 }