Attempt to fix the merged build
[jra/samba/.git] / lib / tevent / tevent_aio.c
1 /* 
2    Unix SMB/CIFS implementation.
3
4    main select loop and event handling - aio/epoll hybrid implementation
5
6    Copyright (C) Andrew Tridgell        2006
7
8    based on events_standard.c
9    
10    This program is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 3 of the License, or
13    (at your option) any later version.
14    
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19    
20    You should have received a copy of the GNU General Public License
21    along with this program.  If not, see <http://www.gnu.org/licenses/>.
22 */
23 /*
24   this is a very strange beast. The Linux AIO implementation doesn't
25   yet integrate properly with epoll, but there is a kernel patch that
26   allows the aio wait primitives to be used to wait for epoll events,
27   and this can be used to give us a unified event system incorporating
28   both aio events and epoll events
29
30   this is _very_ experimental code
31 */
32
33 #include "replace.h"
34 #include "system/filesys.h"
35 #include "system/select.h"
36 #include "tevent.h"
37 #include "tevent_internal.h"
38 #include "tevent_util.h"
39 #include <libaio.h>
40
41 #define MAX_AIO_QUEUE_DEPTH     100
42 #ifndef IOCB_CMD_EPOLL_WAIT
43 #define IOCB_CMD_EPOLL_WAIT     9
44 #endif
45
46 struct aio_event_context {
47         /* a pointer back to the generic event_context */
48         struct tevent_context *ev;
49
50         uint32_t destruction_count;
51
52         io_context_t ioctx;
53
54         struct epoll_event epevent[MAX_AIO_QUEUE_DEPTH];
55
56         struct iocb *epoll_iocb;
57
58         int epoll_fd;
59         int is_epoll_set;
60         pid_t pid;
61 };
62
63 struct tevent_aio {
64         struct tevent_context *event_ctx;
65         struct iocb iocb;
66         void *private_data;
67         tevent_aio_handler_t handler;
68 };
69
70 /*
71   map from EVENT_FD_* to EPOLLIN/EPOLLOUT
72 */
73 static uint32_t epoll_map_flags(uint16_t flags)
74 {
75         uint32_t ret = 0;
76         if (flags & TEVENT_FD_READ) ret |= (EPOLLIN | EPOLLERR | EPOLLHUP);
77         if (flags & TEVENT_FD_WRITE) ret |= (EPOLLOUT | EPOLLERR | EPOLLHUP);
78         return ret;
79 }
80
81 /*
82  free the epoll fd
83 */
84 static int aio_ctx_destructor(struct aio_event_context *aio_ev)
85 {
86         io_queue_release(aio_ev->ioctx);
87         close(aio_ev->epoll_fd);
88         aio_ev->epoll_fd = -1;
89         return 0;
90 }
91
92 static void epoll_add_event(struct aio_event_context *aio_ev, struct tevent_fd *fde);
93
94 /*
95   reopen the epoll handle when our pid changes
96   see http://junkcode.samba.org/ftp/unpacked/junkcode/epoll_fork.c for an 
97   demonstration of why this is needed
98  */
99 static void epoll_check_reopen(struct aio_event_context *aio_ev)
100 {
101         struct tevent_fd *fde;
102
103         if (aio_ev->pid == getpid()) {
104                 return;
105         }
106
107         close(aio_ev->epoll_fd);
108         aio_ev->epoll_fd = epoll_create(MAX_AIO_QUEUE_DEPTH);
109         if (aio_ev->epoll_fd == -1) {
110                 tevent_debug(aio_ev->ev, TEVENT_DEBUG_FATAL,
111                              "Failed to recreate epoll handle after fork\n");
112                 return;
113         }
114         aio_ev->pid = getpid();
115         for (fde=aio_ev->ev->fd_events;fde;fde=fde->next) {
116                 epoll_add_event(aio_ev, fde);
117         }
118 }
119
120 #define EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT      (1<<0)
121 #define EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR   (1<<1)
122 #define EPOLL_ADDITIONAL_FD_FLAG_GOT_ERROR      (1<<2)
123
124 /*
125  add the epoll event to the given fd_event
126 */
127 static void epoll_add_event(struct aio_event_context *aio_ev, struct tevent_fd *fde)
128 {
129         struct epoll_event event;
130         if (aio_ev->epoll_fd == -1) return;
131
132         fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
133
134         /* if we don't want events yet, don't add an aio_event */
135         if (fde->flags == 0) return;
136
137         memset(&event, 0, sizeof(event));
138         event.events = epoll_map_flags(fde->flags);
139         event.data.ptr = fde;
140         epoll_ctl(aio_ev->epoll_fd, EPOLL_CTL_ADD, fde->fd, &event);
141         fde->additional_flags |= EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT;
142
143         /* only if we want to read we want to tell the event handler about errors */
144         if (fde->flags & TEVENT_FD_READ) {
145                 fde->additional_flags |= EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
146         }
147 }
148
149 /*
150  delete the epoll event for given fd_event
151 */
152 static void epoll_del_event(struct aio_event_context *aio_ev, struct tevent_fd *fde)
153 {
154         struct epoll_event event;
155
156         if (aio_ev->epoll_fd == -1) return;
157
158         fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
159
160         /* if there's no aio_event, we don't need to delete it */
161         if (!(fde->additional_flags & EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT)) return;
162
163         ZERO_STRUCT(event);
164         event.events = epoll_map_flags(fde->flags);
165         event.data.ptr = fde;
166         epoll_ctl(aio_ev->epoll_fd, EPOLL_CTL_DEL, fde->fd, &event);
167
168         fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT;
169 }
170
171 /*
172  change the epoll event to the given fd_event
173 */
174 static void epoll_mod_event(struct aio_event_context *aio_ev, struct tevent_fd *fde)
175 {
176         struct epoll_event event;
177         if (aio_ev->epoll_fd == -1) return;
178
179         fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
180
181         ZERO_STRUCT(event);
182         event.events = epoll_map_flags(fde->flags);
183         event.data.ptr = fde;
184         epoll_ctl(aio_ev->epoll_fd, EPOLL_CTL_MOD, fde->fd, &event);
185
186         /* only if we want to read we want to tell the event handler about errors */
187         if (fde->flags & TEVENT_FD_READ) {
188                 fde->additional_flags |= EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
189         }
190 }
191
192 static void epoll_change_event(struct aio_event_context *aio_ev, struct tevent_fd *fde)
193 {
194         bool got_error = (fde->additional_flags & EPOLL_ADDITIONAL_FD_FLAG_GOT_ERROR);
195         bool want_read = (fde->flags & TEVENT_FD_READ);
196         bool want_write= (fde->flags & TEVENT_FD_WRITE);
197
198         if (aio_ev->epoll_fd == -1) return;
199
200         fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
201
202         /* there's already an event */
203         if (fde->additional_flags & EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT) {
204                 if (want_read || (want_write && !got_error)) {
205                         epoll_mod_event(aio_ev, fde);
206                         return;
207                 }
208                 epoll_del_event(aio_ev, fde);
209                 return;
210         }
211
212         /* there's no aio_event attached to the fde */
213         if (want_read || (want_write && !got_error)) {
214                 epoll_add_event(aio_ev, fde);
215                 return;
216         }
217 }
218
219 static int setup_epoll_wait(struct aio_event_context *aio_ev)
220 {
221         if (aio_ev->is_epoll_set) {
222                 return 0;
223         }
224         memset(aio_ev->epoll_iocb, 0, sizeof(*aio_ev->epoll_iocb));
225         aio_ev->epoll_iocb->aio_fildes = aio_ev->epoll_fd;
226         aio_ev->epoll_iocb->aio_lio_opcode = IOCB_CMD_EPOLL_WAIT;
227         aio_ev->epoll_iocb->aio_reqprio = 0;
228
229         aio_ev->epoll_iocb->u.c.nbytes = MAX_AIO_QUEUE_DEPTH;
230         aio_ev->epoll_iocb->u.c.offset = -1;
231         aio_ev->epoll_iocb->u.c.buf = aio_ev->epevent;
232
233         if (io_submit(aio_ev->ioctx, 1, &aio_ev->epoll_iocb) != 1) {
234                 return -1;
235         }
236         aio_ev->is_epoll_set = 1;
237
238         return 0;
239 }
240
241
242 /*
243   event loop handling using aio/epoll hybrid
244 */
245 static int aio_event_loop(struct aio_event_context *aio_ev, struct timeval *tvalp)
246 {
247         int ret, i;
248         uint32_t destruction_count = ++aio_ev->destruction_count;
249         struct timespec timeout;
250         struct io_event events[8];
251
252         if (aio_ev->epoll_fd == -1) return -1;
253
254         if (aio_ev->ev->signal_events &&
255             tevent_common_check_signal(aio_ev->ev)) {
256                 return 0;
257         }
258
259         if (tvalp) {
260                 timeout.tv_sec = tvalp->tv_sec;
261                 timeout.tv_nsec = tvalp->tv_usec;
262                 timeout.tv_nsec *= 1000;
263         }
264
265         if (setup_epoll_wait(aio_ev) < 0) 
266                 return -1;
267
268         ret = io_getevents(aio_ev->ioctx, 1, 8,
269                            events, tvalp?&timeout:NULL);
270
271         if (ret == -EINTR) {
272                 if (aio_ev->ev->signal_events) {
273                         tevent_common_check_signal(aio_ev->ev);
274                 }
275                 return 0;
276         }
277
278         if (ret == 0 && tvalp) {
279                 /* we don't care about a possible delay here */
280                 tevent_common_loop_timer_delay(aio_ev->ev);
281                 return 0;
282         }
283
284         for (i=0;i<ret;i++) {
285                 struct io_event *event = &events[i];
286                 struct iocb *finished = event->obj;
287
288                 switch (finished->aio_lio_opcode) {
289                 case IO_CMD_PWRITE:
290                 case IO_CMD_PREAD: {
291                         struct tevent_aio *ae = talloc_get_type(finished->data, 
292                                                                struct tevent_aio);
293                         if (ae) {
294                                 talloc_set_destructor(ae, NULL);
295                                 ae->handler(ae->event_ctx, ae, 
296                                             event->res, ae->private_data);
297                                 talloc_free(ae);
298                         }
299                         break;
300                 }
301                 case IOCB_CMD_EPOLL_WAIT: {
302                         struct epoll_event *ep = (struct epoll_event *)finished->u.c.buf;
303                         struct tevent_fd *fde;
304                         uint16_t flags = 0;
305                         int j;
306
307                         aio_ev->is_epoll_set = 0;
308
309                         for (j=0; j<event->res; j++, ep++) {
310                                 fde = talloc_get_type(ep->data.ptr, 
311                                                       struct tevent_fd);
312                                 if (fde == NULL) {
313                                         return -1;
314                                 }
315                                 if (ep->events & (EPOLLHUP|EPOLLERR)) {
316                                         fde->additional_flags |= EPOLL_ADDITIONAL_FD_FLAG_GOT_ERROR;
317                                         if (!(fde->additional_flags & EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR)) {
318                                                 epoll_del_event(aio_ev, fde);
319                                                 continue;
320                                         }
321                                         flags |= TEVENT_FD_READ;
322                                 }
323                                 if (ep->events & EPOLLIN) flags |= TEVENT_FD_READ;
324                                 if (ep->events & EPOLLOUT) flags |= TEVENT_FD_WRITE;
325                                 if (flags) {
326                                         fde->handler(aio_ev->ev, fde, flags, fde->private_data);
327                                 }
328                         }
329                         break;
330                 }
331                 }
332                 if (destruction_count != aio_ev->destruction_count) {
333                         return 0;
334                 }
335         }
336
337         return 0;
338 }
339
340 /*
341   create a aio_event_context structure.
342 */
343 static int aio_event_context_init(struct tevent_context *ev)
344 {
345         struct aio_event_context *aio_ev;
346         
347         aio_ev = talloc_zero(ev, struct aio_event_context);
348         if (!aio_ev) return -1;
349
350         aio_ev->ev = ev;
351         aio_ev->epoll_iocb = talloc(aio_ev, struct iocb);
352
353         if (io_queue_init(MAX_AIO_QUEUE_DEPTH, &aio_ev->ioctx) != 0) {
354                 talloc_free(aio_ev);
355                 return -1;
356         }
357
358         aio_ev->epoll_fd = epoll_create(MAX_AIO_QUEUE_DEPTH);
359         if (aio_ev->epoll_fd == -1) {
360                 talloc_free(aio_ev);
361                 return -1;
362         }
363         aio_ev->pid = getpid();
364
365         talloc_set_destructor(aio_ev, aio_ctx_destructor);
366
367         ev->additional_data = aio_ev;
368
369         if (setup_epoll_wait(aio_ev) < 0) {
370                 talloc_free(aio_ev);
371                 return -1;
372         }
373
374         return 0;
375 }
376
377 /*
378   destroy an fd_event
379 */
380 static int aio_event_fd_destructor(struct tevent_fd *fde)
381 {
382         struct tevent_context *ev = fde->event_ctx;
383         struct aio_event_context *aio_ev = NULL;
384
385         if (ev) {
386                 aio_ev = talloc_get_type(ev->additional_data,
387                                          struct aio_event_context);
388
389                 epoll_check_reopen(aio_ev);
390
391                 aio_ev->destruction_count++;
392
393                 epoll_del_event(aio_ev, fde);
394         }
395
396         return tevent_common_fd_destructor(fde);
397 }
398
399 /*
400   add a fd based event
401   return NULL on failure (memory allocation error)
402 */
403 static struct tevent_fd *aio_event_add_fd(struct tevent_context *ev, TALLOC_CTX *mem_ctx,
404                                           int fd, uint16_t flags,
405                                           tevent_fd_handler_t handler,
406                                           void *private_data,
407                                           const char *handler_name,
408                                           const char *location)
409 {
410         struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data,
411                                                            struct aio_event_context);
412         struct tevent_fd *fde;
413
414         epoll_check_reopen(aio_ev);
415
416         fde = tevent_common_add_fd(ev, mem_ctx, fd, flags,
417                                    handler, private_data,
418                                    handler_name, location);
419         if (!fde) return NULL;
420
421         talloc_set_destructor(fde, aio_event_fd_destructor);
422
423         epoll_add_event(aio_ev, fde);
424
425         return fde;
426 }
427
428 /*
429   set the fd event flags
430 */
431 static void aio_event_set_fd_flags(struct tevent_fd *fde, uint16_t flags)
432 {
433         struct tevent_context *ev;
434         struct aio_event_context *aio_ev;
435
436         if (fde->flags == flags) return;
437
438         ev = fde->event_ctx;
439         aio_ev = talloc_get_type(ev->additional_data, struct aio_event_context);
440
441         fde->flags = flags;
442
443         epoll_check_reopen(aio_ev);
444
445         epoll_change_event(aio_ev, fde);
446 }
447
448 /*
449   do a single event loop using the events defined in ev 
450 */
451 static int aio_event_loop_once(struct tevent_context *ev)
452 {
453         struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data,
454                                                            struct aio_event_context);
455         struct timeval tval;
456
457         tval = tevent_common_loop_timer_delay(ev);
458         if (ev_timeval_is_zero(&tval)) {
459                 return 0;
460         }
461
462         epoll_check_reopen(aio_ev);
463
464         return aio_event_loop(aio_ev, &tval);
465 }
466
467 /*
468   return on failure or (with 0) if all fd events are removed
469 */
470 static int aio_event_loop_wait(struct tevent_context *ev)
471 {
472         struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data,
473                                                            struct aio_event_context);
474         while (aio_ev->ev->fd_events) {
475                 if (aio_event_loop_once(ev) != 0) {
476                         break;
477                 }
478         }
479
480         return 0;
481 }
482
483 /*
484   called when a disk IO event needs to be cancelled
485 */
486 static int aio_destructor(struct tevent_aio *ae)
487 {
488         struct tevent_context *ev = ae->event_ctx;
489         struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data,
490                                                            struct aio_event_context);
491         struct io_event result;
492         io_cancel(aio_ev->ioctx, &ae->iocb, &result);
493         /* TODO: handle errors from io_cancel()! */
494         return 0;
495 }
496
497 /* submit an aio disk IO event */
498 static struct tevent_aio *aio_event_add_aio(struct tevent_context *ev, 
499                                             TALLOC_CTX *mem_ctx,
500                                             struct iocb *iocb,
501                                             tevent_aio_handler_t handler,
502                                             void *private_data,
503                                             const char *handler_name,
504                                             const char *location)
505 {
506         struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data,
507                                                            struct aio_event_context);
508         struct iocb *iocbp;
509         struct tevent_aio *ae = talloc(mem_ctx?mem_ctx:ev, struct tevent_aio);
510         if (ae == NULL) return NULL;
511
512         ae->event_ctx    = ev;
513         ae->iocb         = *iocb;
514         ae->handler      = handler;
515         ae->private_data = private_data;
516         iocbp = &ae->iocb;
517
518         if (io_submit(aio_ev->ioctx, 1, &iocbp) != 1) {
519                 talloc_free(ae);
520                 return NULL;
521         }
522         ae->iocb.data = ae;
523         talloc_set_destructor(ae, aio_destructor);
524
525         return ae;
526 }
527
528 static const struct tevent_ops aio_event_ops = {
529         .context_init   = aio_event_context_init,
530         .add_fd         = aio_event_add_fd,
531         .add_aio        = aio_event_add_aio,
532         .set_fd_close_fn= tevent_common_fd_set_close_fn,
533         .get_fd_flags   = tevent_common_fd_get_flags,
534         .set_fd_flags   = aio_event_set_fd_flags,
535         .add_timer      = tevent_common_add_timer,
536         .add_signal     = tevent_common_add_signal,
537         .loop_once      = aio_event_loop_once,
538         .loop_wait      = aio_event_loop_wait,
539 };
540
541 bool tevent_aio_init(void)
542 {
543         return tevent_register_backend("aio", &aio_event_ops);
544 }
545