net lua
[ira/wip.git] / lib / tevent / tevent_aio.c
1 /* 
2    Unix SMB/CIFS implementation.
3
4    main select loop and event handling - aio/epoll hybrid implementation
5
6    Copyright (C) Andrew Tridgell        2006
7
8    based on events_standard.c
9    
10    This program is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 3 of the License, or
13    (at your option) any later version.
14    
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19    
20    You should have received a copy of the GNU General Public License
21    along with this program.  If not, see <http://www.gnu.org/licenses/>.
22 */
23 /*
24   this is a very strange beast. The Linux AIO implementation doesn't
25   yet integrate properly with epoll, but there is a kernel patch that
26   allows the aio wait primitives to be used to wait for epoll events,
27   and this can be used to give us a unified event system incorporating
28   both aio events and epoll events
29
30   this is _very_ experimental code
31 */
32
33 #include "system/filesys.h"
34 #include "replace.h"
35 #include "events.h"
36 #include "events_internal.h"
37 #include "events_util.h"
38 #include <sys/epoll.h>
39 #include <libaio.h>
40
41 #define MAX_AIO_QUEUE_DEPTH     100
42 #ifndef IOCB_CMD_EPOLL_WAIT
43 #define IOCB_CMD_EPOLL_WAIT     9
44 #endif
45
46 struct aio_event_context {
47         /* a pointer back to the generic event_context */
48         struct event_context *ev;
49
50         /* list of filedescriptor events */
51         struct fd_event *fd_events;
52
53         /* number of registered fd event handlers */
54         int num_fd_events;
55
56         uint32_t destruction_count;
57
58         io_context_t ioctx;
59
60         struct epoll_event epevent[MAX_AIO_QUEUE_DEPTH];
61
62         struct iocb *epoll_iocb;
63
64         int epoll_fd;
65         int is_epoll_set;
66         pid_t pid;
67 };
68
69 struct aio_event {
70         struct event_context *event_ctx;
71         struct iocb iocb;
72         void *private_data;
73         event_aio_handler_t handler;
74 };
75
76 /*
77   map from EVENT_FD_* to EPOLLIN/EPOLLOUT
78 */
79 static uint32_t epoll_map_flags(uint16_t flags)
80 {
81         uint32_t ret = 0;
82         if (flags & EVENT_FD_READ) ret |= (EPOLLIN | EPOLLERR | EPOLLHUP);
83         if (flags & EVENT_FD_WRITE) ret |= (EPOLLOUT | EPOLLERR | EPOLLHUP);
84         return ret;
85 }
86
87 /*
88  free the epoll fd
89 */
90 static int aio_ctx_destructor(struct aio_event_context *aio_ev)
91 {
92         io_queue_release(aio_ev->ioctx);
93         close(aio_ev->epoll_fd);
94         aio_ev->epoll_fd = -1;
95         return 0;
96 }
97
98 static void epoll_add_event(struct aio_event_context *aio_ev, struct fd_event *fde);
99
100 /*
101   reopen the epoll handle when our pid changes
102   see http://junkcode.samba.org/ftp/unpacked/junkcode/epoll_fork.c for an 
103   demonstration of why this is needed
104  */
105 static void epoll_check_reopen(struct aio_event_context *aio_ev)
106 {
107         struct fd_event *fde;
108
109         if (aio_ev->pid == getpid()) {
110                 return;
111         }
112
113         close(aio_ev->epoll_fd);
114         aio_ev->epoll_fd = epoll_create(MAX_AIO_QUEUE_DEPTH);
115         if (aio_ev->epoll_fd == -1) {
116                 ev_debug(aio_ev->ev, EV_DEBUG_FATAL, "Failed to recreate epoll handle after fork\n");
117                 return;
118         }
119         aio_ev->pid = getpid();
120         for (fde=aio_ev->fd_events;fde;fde=fde->next) {
121                 epoll_add_event(aio_ev, fde);
122         }
123 }
124
125 #define EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT      (1<<0)
126 #define EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR   (1<<1)
127 #define EPOLL_ADDITIONAL_FD_FLAG_GOT_ERROR      (1<<2)
128
129 /*
130  add the epoll event to the given fd_event
131 */
132 static void epoll_add_event(struct aio_event_context *aio_ev, struct fd_event *fde)
133 {
134         struct epoll_event event;
135         if (aio_ev->epoll_fd == -1) return;
136
137         fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
138
139         /* if we don't want events yet, don't add an aio_event */
140         if (fde->flags == 0) return;
141
142         memset(&event, 0, sizeof(event));
143         event.events = epoll_map_flags(fde->flags);
144         event.data.ptr = fde;
145         epoll_ctl(aio_ev->epoll_fd, EPOLL_CTL_ADD, fde->fd, &event);
146         fde->additional_flags |= EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT;
147
148         /* only if we want to read we want to tell the event handler about errors */
149         if (fde->flags & EVENT_FD_READ) {
150                 fde->additional_flags |= EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
151         }
152 }
153
154 /*
155  delete the epoll event for given fd_event
156 */
157 static void epoll_del_event(struct aio_event_context *aio_ev, struct fd_event *fde)
158 {
159         struct epoll_event event;
160
161         DLIST_REMOVE(aio_ev->fd_events, fde);
162
163         if (aio_ev->epoll_fd == -1) return;
164
165         fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
166
167         /* if there's no aio_event, we don't need to delete it */
168         if (!(fde->additional_flags & EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT)) return;
169
170         ZERO_STRUCT(event);
171         event.events = epoll_map_flags(fde->flags);
172         event.data.ptr = fde;
173         epoll_ctl(aio_ev->epoll_fd, EPOLL_CTL_DEL, fde->fd, &event);
174
175         fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT;
176 }
177
178 /*
179  change the epoll event to the given fd_event
180 */
181 static void epoll_mod_event(struct aio_event_context *aio_ev, struct fd_event *fde)
182 {
183         struct epoll_event event;
184         if (aio_ev->epoll_fd == -1) return;
185
186         fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
187
188         ZERO_STRUCT(event);
189         event.events = epoll_map_flags(fde->flags);
190         event.data.ptr = fde;
191         epoll_ctl(aio_ev->epoll_fd, EPOLL_CTL_MOD, fde->fd, &event);
192
193         /* only if we want to read we want to tell the event handler about errors */
194         if (fde->flags & EVENT_FD_READ) {
195                 fde->additional_flags |= EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
196         }
197 }
198
199 static void epoll_change_event(struct aio_event_context *aio_ev, struct fd_event *fde)
200 {
201         bool got_error = (fde->additional_flags & EPOLL_ADDITIONAL_FD_FLAG_GOT_ERROR);
202         bool want_read = (fde->flags & EVENT_FD_READ);
203         bool want_write= (fde->flags & EVENT_FD_WRITE);
204
205         if (aio_ev->epoll_fd == -1) return;
206
207         fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
208
209         /* there's already an event */
210         if (fde->additional_flags & EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT) {
211                 if (want_read || (want_write && !got_error)) {
212                         epoll_mod_event(aio_ev, fde);
213                         return;
214                 }
215                 epoll_del_event(aio_ev, fde);
216                 return;
217         }
218
219         /* there's no aio_event attached to the fde */
220         if (want_read || (want_write && !got_error)) {
221                 DLIST_ADD(aio_ev->fd_events, fde);
222                 epoll_add_event(aio_ev, fde);
223                 return;
224         }
225 }
226
227 static int setup_epoll_wait(struct aio_event_context *aio_ev)
228 {
229         if (aio_ev->is_epoll_set) {
230                 return 0;
231         }
232         memset(aio_ev->epoll_iocb, 0, sizeof(*aio_ev->epoll_iocb));
233         aio_ev->epoll_iocb->aio_fildes = aio_ev->epoll_fd;
234         aio_ev->epoll_iocb->aio_lio_opcode = IOCB_CMD_EPOLL_WAIT;
235         aio_ev->epoll_iocb->aio_reqprio = 0;
236
237         aio_ev->epoll_iocb->u.c.nbytes = MAX_AIO_QUEUE_DEPTH;
238         aio_ev->epoll_iocb->u.c.offset = -1;
239         aio_ev->epoll_iocb->u.c.buf = aio_ev->epevent;
240
241         if (io_submit(aio_ev->ioctx, 1, &aio_ev->epoll_iocb) != 1) {
242                 return -1;
243         }
244         aio_ev->is_epoll_set = 1;
245
246         return 0;
247 }
248
249
250 /*
251   event loop handling using aio/epoll hybrid
252 */
253 static int aio_event_loop(struct aio_event_context *aio_ev, struct timeval *tvalp)
254 {
255         int ret, i;
256         uint32_t destruction_count = ++aio_ev->destruction_count;
257         struct timespec timeout;
258         struct io_event events[8];
259
260         if (aio_ev->epoll_fd == -1) return -1;
261
262         if (aio_ev->ev->num_signal_handlers && 
263             common_event_check_signal(aio_ev->ev)) {
264                 return 0;
265         }
266
267         if (tvalp) {
268                 timeout.tv_sec = tvalp->tv_sec;
269                 timeout.tv_nsec = tvalp->tv_usec;
270                 timeout.tv_nsec *= 1000;
271         }
272
273         if (setup_epoll_wait(aio_ev) < 0) 
274                 return -1;
275
276         ret = io_getevents(aio_ev->ioctx, 1, 8,
277                            events, tvalp?&timeout:NULL);
278
279         if (ret == -EINTR) {
280                 if (aio_ev->ev->num_signal_handlers) {
281                         common_event_check_signal(aio_ev->ev);
282                 }
283                 return 0;
284         }
285
286         if (ret == 0 && tvalp) {
287                 /* we don't care about a possible delay here */
288                 common_event_loop_timer_delay(aio_ev->ev);
289                 return 0;
290         }
291
292         for (i=0;i<ret;i++) {
293                 struct io_event *event = &events[i];
294                 struct iocb *finished = event->obj;
295
296                 switch (finished->aio_lio_opcode) {
297                 case IO_CMD_PWRITE:
298                 case IO_CMD_PREAD: {
299                         struct aio_event *ae = talloc_get_type(finished->data, 
300                                                                struct aio_event);
301                         if (ae) {
302                                 talloc_set_destructor(ae, NULL);
303                                 ae->handler(ae->event_ctx, ae, 
304                                             event->res, ae->private_data);
305                                 talloc_free(ae);
306                         }
307                         break;
308                 }
309                 case IOCB_CMD_EPOLL_WAIT: {
310                         struct epoll_event *ep = (struct epoll_event *)finished->u.c.buf;
311                         struct fd_event *fde;
312                         uint16_t flags = 0;
313                         int j;
314
315                         aio_ev->is_epoll_set = 0;
316
317                         for (j=0; j<event->res; j++, ep++) {
318                                 fde = talloc_get_type(ep->data.ptr, 
319                                                       struct fd_event);
320                                 if (fde == NULL) {
321                                         return -1;
322                                 }
323                                 if (ep->events & (EPOLLHUP|EPOLLERR)) {
324                                         fde->additional_flags |= EPOLL_ADDITIONAL_FD_FLAG_GOT_ERROR;
325                                         if (!(fde->additional_flags & EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR)) {
326                                                 epoll_del_event(aio_ev, fde);
327                                                 continue;
328                                         }
329                                         flags |= EVENT_FD_READ;
330                                 }
331                                 if (ep->events & EPOLLIN) flags |= EVENT_FD_READ;
332                                 if (ep->events & EPOLLOUT) flags |= EVENT_FD_WRITE;
333                                 if (flags) {
334                                         fde->handler(aio_ev->ev, fde, flags, fde->private_data);
335                                 }
336                         }
337                         break;
338                 }
339                 }
340                 if (destruction_count != aio_ev->destruction_count) {
341                         return 0;
342                 }
343         }
344
345         return 0;
346 }
347
348 /*
349   create a aio_event_context structure.
350 */
351 static int aio_event_context_init(struct event_context *ev)
352 {
353         struct aio_event_context *aio_ev;
354         
355         aio_ev = talloc_zero(ev, struct aio_event_context);
356         if (!aio_ev) return -1;
357
358         aio_ev->ev = ev;
359         aio_ev->epoll_iocb = talloc(aio_ev, struct iocb);
360
361         if (io_queue_init(MAX_AIO_QUEUE_DEPTH, &aio_ev->ioctx) != 0) {
362                 talloc_free(aio_ev);
363                 return -1;
364         }
365
366         aio_ev->epoll_fd = epoll_create(MAX_AIO_QUEUE_DEPTH);
367         if (aio_ev->epoll_fd == -1) {
368                 talloc_free(aio_ev);
369                 return -1;
370         }
371         aio_ev->pid = getpid();
372
373         talloc_set_destructor(aio_ev, aio_ctx_destructor);
374
375         ev->additional_data = aio_ev;
376
377         if (setup_epoll_wait(aio_ev) < 0) {
378                 talloc_free(aio_ev);
379                 return -1;
380         }
381
382         return 0;
383 }
384
385 /*
386   destroy an fd_event
387 */
388 static int aio_event_fd_destructor(struct fd_event *fde)
389 {
390         struct event_context *ev = fde->event_ctx;
391         struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data,
392                                                            struct aio_event_context);
393
394         epoll_check_reopen(aio_ev);
395
396         aio_ev->num_fd_events--;
397         aio_ev->destruction_count++;
398
399         epoll_del_event(aio_ev, fde);
400
401         if (fde->flags & EVENT_FD_AUTOCLOSE) {
402                 close(fde->fd);
403                 fde->fd = -1;
404         }
405
406         return 0;
407 }
408
409 /*
410   add a fd based event
411   return NULL on failure (memory allocation error)
412 */
413 static struct fd_event *aio_event_add_fd(struct event_context *ev, TALLOC_CTX *mem_ctx,
414                                          int fd, uint16_t flags,
415                                          event_fd_handler_t handler,
416                                          void *private_data)
417 {
418         struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data,
419                                                            struct aio_event_context);
420         struct fd_event *fde;
421
422         epoll_check_reopen(aio_ev);
423
424         fde = talloc(mem_ctx?mem_ctx:ev, struct fd_event);
425         if (!fde) return NULL;
426
427         fde->event_ctx          = ev;
428         fde->fd                 = fd;
429         fde->flags              = flags;
430         fde->handler            = handler;
431         fde->private_data       = private_data;
432         fde->additional_flags   = 0;
433         fde->additional_data    = NULL;
434
435         aio_ev->num_fd_events++;
436         talloc_set_destructor(fde, aio_event_fd_destructor);
437
438         DLIST_ADD(aio_ev->fd_events, fde);
439         epoll_add_event(aio_ev, fde);
440
441         return fde;
442 }
443
444
445 /*
446   return the fd event flags
447 */
448 static uint16_t aio_event_get_fd_flags(struct fd_event *fde)
449 {
450         return fde->flags;
451 }
452
453 /*
454   set the fd event flags
455 */
456 static void aio_event_set_fd_flags(struct fd_event *fde, uint16_t flags)
457 {
458         struct event_context *ev;
459         struct aio_event_context *aio_ev;
460
461         if (fde->flags == flags) return;
462
463         ev = fde->event_ctx;
464         aio_ev = talloc_get_type(ev->additional_data, struct aio_event_context);
465
466         fde->flags = flags;
467
468         epoll_check_reopen(aio_ev);
469
470         epoll_change_event(aio_ev, fde);
471 }
472
473 /*
474   do a single event loop using the events defined in ev 
475 */
476 static int aio_event_loop_once(struct event_context *ev)
477 {
478         struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data,
479                                                            struct aio_event_context);
480         struct timeval tval;
481
482         tval = common_event_loop_timer_delay(ev);
483         if (ev_timeval_is_zero(&tval)) {
484                 return 0;
485         }
486
487         epoll_check_reopen(aio_ev);
488
489         return aio_event_loop(aio_ev, &tval);
490 }
491
492 /*
493   return on failure or (with 0) if all fd events are removed
494 */
495 static int aio_event_loop_wait(struct event_context *ev)
496 {
497         struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data,
498                                                            struct aio_event_context);
499         while (aio_ev->num_fd_events) {
500                 if (aio_event_loop_once(ev) != 0) {
501                         break;
502                 }
503         }
504
505         return 0;
506 }
507
508 /*
509   called when a disk IO event needs to be cancelled
510 */
511 static int aio_destructor(struct aio_event *ae)
512 {
513         struct event_context *ev = ae->event_ctx;
514         struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data,
515                                                            struct aio_event_context);
516         struct io_event result;
517         io_cancel(aio_ev->ioctx, &ae->iocb, &result);
518         /* TODO: handle errors from io_cancel()! */
519         return 0;
520 }
521
522 /* submit an aio disk IO event */
523 static struct aio_event *aio_event_add_aio(struct event_context *ev, 
524                                            TALLOC_CTX *mem_ctx,
525                                            struct iocb *iocb,
526                                            event_aio_handler_t handler,
527                                            void *private_data)
528 {
529         struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data,
530                                                            struct aio_event_context);
531         struct iocb *iocbp;
532         struct aio_event *ae = talloc(mem_ctx?mem_ctx:ev, struct aio_event);
533         if (ae == NULL) return NULL;
534
535         ae->event_ctx    = ev;
536         ae->iocb         = *iocb;
537         ae->handler      = handler;
538         ae->private_data = private_data;
539         iocbp = &ae->iocb;
540
541         if (io_submit(aio_ev->ioctx, 1, &iocbp) != 1) {
542                 talloc_free(ae);
543                 return NULL;
544         }
545         ae->iocb.data = ae;
546         talloc_set_destructor(ae, aio_destructor);
547
548         return ae;
549 }
550
551 static const struct event_ops aio_event_ops = {
552         .context_init   = aio_event_context_init,
553         .add_fd         = aio_event_add_fd,
554         .add_aio        = aio_event_add_aio,
555         .get_fd_flags   = aio_event_get_fd_flags,
556         .set_fd_flags   = aio_event_set_fd_flags,
557         .add_timed      = common_event_add_timed,
558         .add_signal     = common_event_add_signal,
559         .loop_once      = aio_event_loop_once,
560         .loop_wait      = aio_event_loop_wait,
561 };
562
563 bool events_aio_init(void)
564 {
565         return event_register_backend("aio", &aio_event_ops);
566 }
567