97305fdeb3f6e4a56b5917fe52831b42aa951615
[kai/samba-autobuild/.git] / source4 / lib / events / events_aio.c
1 /* 
2    Unix SMB/CIFS implementation.
3
4    main select loop and event handling - aio/epoll hybrid implementation
5
6    Copyright (C) Andrew Tridgell        2006
7
8    based on events_standard.c
9    
10    This program is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2 of the License, or
13    (at your option) any later version.
14    
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19    
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23 */
24 /*
25   this is a very strange beast. The Linux AIO implementation doesn't
26   yet integrate properly with epoll, but there is a kernel patch that
27   allows the aio wait primitives to be used to wait for epoll events,
28   and this can be used to give us a unified event system incorporating
29   both aio events and epoll events
30
31   this is _very_ experimental code
32 */
33
34 #include "includes.h"
35 #include "system/filesys.h"
36 #include "system/select.h" /* needed for WITH_EPOLL */
37 #include "lib/util/dlinklist.h"
38 #include "lib/events/events.h"
39 #include "lib/events/events_internal.h"
40 #include <libaio.h>
41
42 #define MAX_AIO_QUEUE_DEPTH     10
43 #define IOCB_CMD_EPOLL_WAIT     9
44
45 struct aio_event_context {
46         /* a pointer back to the generic event_context */
47         struct event_context *ev;
48
49         /* number of registered fd event handlers */
50         int num_fd_events;
51
52         /* list of timed events */
53         struct timed_event *timed_events;
54
55         uint32_t destruction_count;
56
57         io_context_t ioctx;
58
59         struct io_event events[MAX_AIO_QUEUE_DEPTH];
60         struct epoll_event epevent;
61
62         struct iocb *epoll_iocb;
63
64         int epoll_fd;
65 };
66
67 static void aio_event_loop_timer(struct aio_event_context *aio_ev);
68
69 /*
70   map from EVENT_FD_* to EPOLLIN/EPOLLOUT
71 */
72 static uint32_t epoll_map_flags(uint16_t flags)
73 {
74         uint32_t ret = 0;
75         if (flags & EVENT_FD_READ) ret |= (EPOLLIN | EPOLLERR | EPOLLHUP);
76         if (flags & EVENT_FD_WRITE) ret |= (EPOLLOUT | EPOLLERR | EPOLLHUP);
77         return ret;
78 }
79
80 /*
81  free the epoll fd
82 */
83 static int aio_ctx_destructor(struct aio_event_context *aio_ev)
84 {
85         close(aio_ev->epoll_fd);
86         aio_ev->epoll_fd = -1;
87         return 0;
88 }
89
90 #define EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT      (1<<0)
91 #define EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR   (1<<1)
92 #define EPOLL_ADDITIONAL_FD_FLAG_GOT_ERROR      (1<<2)
93
94 /*
95  add the epoll event to the given fd_event
96 */
97 static void epoll_add_event(struct aio_event_context *aio_ev, struct fd_event *fde)
98 {
99         struct epoll_event event;
100         if (aio_ev->epoll_fd == -1) return;
101
102         fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
103
104         /* if we don't want events yet, don't add an aio_event */
105         if (fde->flags == 0) return;
106
107         ZERO_STRUCT(event);
108         event.events = epoll_map_flags(fde->flags);
109         event.data.ptr = fde;
110         epoll_ctl(aio_ev->epoll_fd, EPOLL_CTL_ADD, fde->fd, &event);
111         fde->additional_flags |= EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT;
112
113         /* only if we want to read we want to tell the event handler about errors */
114         if (fde->flags & EVENT_FD_READ) {
115                 fde->additional_flags |= EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
116         }
117 }
118
119 /*
120  delete the epoll event for given fd_event
121 */
122 static void epoll_del_event(struct aio_event_context *aio_ev, struct fd_event *fde)
123 {
124         struct epoll_event event;
125         if (aio_ev->epoll_fd == -1) return;
126
127         fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
128
129         /* if there's no aio_event, we don't need to delete it */
130         if (!(fde->additional_flags & EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT)) return;
131
132         ZERO_STRUCT(event);
133         event.events = epoll_map_flags(fde->flags);
134         event.data.ptr = fde;
135         epoll_ctl(aio_ev->epoll_fd, EPOLL_CTL_DEL, fde->fd, &event);
136
137         fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT;
138 }
139
140 /*
141  change the epoll event to the given fd_event
142 */
143 static void epoll_mod_event(struct aio_event_context *aio_ev, struct fd_event *fde)
144 {
145         struct epoll_event event;
146         if (aio_ev->epoll_fd == -1) return;
147
148         fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
149
150         ZERO_STRUCT(event);
151         event.events = epoll_map_flags(fde->flags);
152         event.data.ptr = fde;
153         epoll_ctl(aio_ev->epoll_fd, EPOLL_CTL_MOD, fde->fd, &event);
154
155         /* only if we want to read we want to tell the event handler about errors */
156         if (fde->flags & EVENT_FD_READ) {
157                 fde->additional_flags |= EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
158         }
159 }
160
161 static void epoll_change_event(struct aio_event_context *aio_ev, struct fd_event *fde)
162 {
163         BOOL got_error = (fde->additional_flags & EPOLL_ADDITIONAL_FD_FLAG_GOT_ERROR);
164         BOOL want_read = (fde->flags & EVENT_FD_READ);
165         BOOL want_write= (fde->flags & EVENT_FD_WRITE);
166
167         if (aio_ev->epoll_fd == -1) return;
168
169         fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
170
171         /* there's already an event */
172         if (fde->additional_flags & EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT) {
173                 if (want_read || (want_write && !got_error)) {
174                         epoll_mod_event(aio_ev, fde);
175                         return;
176                 }
177                 epoll_del_event(aio_ev, fde);
178                 return;
179         }
180
181         /* there's no aio_event attached to the fde */
182         if (want_read || (want_write && !got_error)) {
183                 epoll_add_event(aio_ev, fde);
184                 return;
185         }
186 }
187
188 static int setup_epoll_wait(struct aio_event_context *aio_ev)
189 {
190         struct io_event r;
191         
192         memset(aio_ev->epoll_iocb, 0, sizeof(*aio_ev->epoll_iocb));
193         aio_ev->epoll_iocb->aio_fildes = aio_ev->epoll_fd;
194         aio_ev->epoll_iocb->aio_lio_opcode = IOCB_CMD_EPOLL_WAIT;
195         aio_ev->epoll_iocb->aio_reqprio = 0;
196
197         aio_ev->epoll_iocb->u.c.nbytes = 1;
198         aio_ev->epoll_iocb->u.c.offset = -1;
199         aio_ev->epoll_iocb->u.c.buf = &aio_ev->epevent;
200
201         if (io_submit(aio_ev->ioctx, 1, &aio_ev->epoll_iocb) != 1) {
202                 return -1;
203         }
204 }
205
206
207 /*
208   event loop handling using aio/epoll hybrid
209 */
210 static int aio_event_loop(struct aio_event_context *aio_ev, struct timeval *tvalp)
211 {
212         int ret, i;
213         uint32_t destruction_count = aio_ev->destruction_count;
214         struct timespec timeout;
215
216         if (aio_ev->epoll_fd == -1) return -1;
217
218         if (tvalp) {
219                 timeout.tv_sec = tvalp->tv_sec;
220                 timeout.tv_nsec = tvalp->tv_usec;
221                 timeout.tv_nsec *= 1000;
222         }
223
224         setup_epoll_wait(aio_ev);
225
226         ret = io_getevents(aio_ev->ioctx, 1, MAX_AIO_QUEUE_DEPTH,
227                            aio_ev->events, tvalp?&timeout:NULL);
228         if (ret == -EINTR) {
229                 return 0;
230         }
231
232         if (ret == 0 && tvalp) {
233                 aio_event_loop_timer(aio_ev);
234                 return 0;
235         }
236
237         for (i=0;i<ret;i++) {
238                 struct iocb *finished = aio_ev->events[i].obj;
239                 switch (finished->aio_lio_opcode) {
240                 case IOCB_CMD_EPOLL_WAIT: {
241                         struct epoll_event *ep = (struct epoll_event *)finished->u.c.buf;
242                         struct fd_event *fde = talloc_get_type(ep->data.ptr, 
243                                                                struct fd_event);
244                         uint16_t flags = 0;
245
246                         if (fde == NULL) {
247                                 return -1;
248                         }
249                         if (ep->events & (EPOLLHUP|EPOLLERR)) {
250                                 fde->additional_flags |= EPOLL_ADDITIONAL_FD_FLAG_GOT_ERROR;
251                                 if (!(fde->additional_flags & EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR)) {
252                                         epoll_del_event(aio_ev, fde);
253                                         continue;
254                                 }
255                                 flags |= EVENT_FD_READ;
256                         }
257                         if (ep->events & EPOLLIN) flags |= EVENT_FD_READ;
258                         if (ep->events & EPOLLOUT) flags |= EVENT_FD_WRITE;
259                         if (flags) {
260                                 fde->handler(aio_ev->ev, fde, flags, fde->private_data);
261                                 if (destruction_count != aio_ev->destruction_count) {
262                                         return 0;
263                                 }
264                         }
265                         break;
266                 }
267                 }
268         }
269
270         return 0;
271 }
272
273 /*
274   create a aio_event_context structure.
275 */
276 static int aio_event_context_init(struct event_context *ev, void *private_data)
277 {
278         struct aio_event_context *aio_ev;
279         
280         aio_ev = talloc_zero(ev, struct aio_event_context);
281         if (!aio_ev) return -1;
282
283         aio_ev->ev = ev;
284         aio_ev->epoll_iocb = talloc(aio_ev, struct iocb);
285
286         if (io_queue_init(MAX_AIO_QUEUE_DEPTH, &aio_ev->ioctx) != 0) {
287                 return -1;
288         }
289
290         aio_ev->epoll_fd = epoll_create(MAX_AIO_QUEUE_DEPTH);
291         if (aio_ev->epoll_fd == -1) return -1;
292
293         talloc_set_destructor(aio_ev, aio_ctx_destructor);
294
295         ev->additional_data = aio_ev;
296         return 0;
297 }
298
299 /*
300   destroy an fd_event
301 */
302 static int aio_event_fd_destructor(struct fd_event *fde)
303 {
304         struct event_context *ev = fde->event_ctx;
305         struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data,
306                                                            struct aio_event_context);
307
308         aio_ev->num_fd_events--;
309         aio_ev->destruction_count++;
310
311         epoll_del_event(aio_ev, fde);
312
313         return 0;
314 }
315
316 /*
317   add a fd based event
318   return NULL on failure (memory allocation error)
319 */
320 static struct fd_event *aio_event_add_fd(struct event_context *ev, TALLOC_CTX *mem_ctx,
321                                          int fd, uint16_t flags,
322                                          event_fd_handler_t handler,
323                                          void *private_data)
324 {
325         struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data,
326                                                            struct aio_event_context);
327         struct fd_event *fde;
328
329         fde = talloc(mem_ctx?mem_ctx:ev, struct fd_event);
330         if (!fde) return NULL;
331
332         fde->event_ctx          = ev;
333         fde->fd                 = fd;
334         fde->flags              = flags;
335         fde->handler            = handler;
336         fde->private_data       = private_data;
337         fde->additional_flags   = 0;
338         fde->additional_data    = NULL;
339
340         aio_ev->num_fd_events++;
341         talloc_set_destructor(fde, aio_event_fd_destructor);
342
343         epoll_add_event(aio_ev, fde);
344
345         return fde;
346 }
347
348
349 /*
350   return the fd event flags
351 */
352 static uint16_t aio_event_get_fd_flags(struct fd_event *fde)
353 {
354         return fde->flags;
355 }
356
357 /*
358   set the fd event flags
359 */
360 static void aio_event_set_fd_flags(struct fd_event *fde, uint16_t flags)
361 {
362         struct event_context *ev;
363         struct aio_event_context *aio_ev;
364
365         if (fde->flags == flags) return;
366
367         ev = fde->event_ctx;
368         aio_ev = talloc_get_type(ev->additional_data, struct aio_event_context);
369
370         fde->flags = flags;
371
372         epoll_change_event(aio_ev, fde);
373 }
374
375 /*
376   destroy a timed event
377 */
378 static int aio_event_timed_destructor(struct timed_event *te)
379 {
380         struct aio_event_context *aio_ev = talloc_get_type(te->event_ctx->additional_data,
381                                                            struct aio_event_context);
382         DLIST_REMOVE(aio_ev->timed_events, te);
383         return 0;
384 }
385
386 static int aio_event_timed_deny_destructor(struct timed_event *te)
387 {
388         return -1;
389 }
390
391 /*
392   add a timed event
393   return NULL on failure (memory allocation error)
394 */
395 static struct timed_event *aio_event_add_timed(struct event_context *ev, TALLOC_CTX *mem_ctx,
396                                                struct timeval next_event, 
397                                                event_timed_handler_t handler, 
398                                                void *private_data) 
399 {
400         struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data,
401                                                            struct aio_event_context);
402         struct timed_event *te, *last_te, *cur_te;
403
404         te = talloc(mem_ctx?mem_ctx:ev, struct timed_event);
405         if (te == NULL) return NULL;
406
407         te->event_ctx           = ev;
408         te->next_event          = next_event;
409         te->handler             = handler;
410         te->private_data        = private_data;
411         te->additional_data     = NULL;
412
413         /* keep the list ordered */
414         last_te = NULL;
415         for (cur_te = aio_ev->timed_events; cur_te; cur_te = cur_te->next) {
416                 /* if the new event comes before the current one break */
417                 if (!timeval_is_zero(&cur_te->next_event) &&
418                     timeval_compare(&te->next_event,
419                                     &cur_te->next_event) < 0) {
420                         break;
421                 }
422
423                 last_te = cur_te;
424         }
425
426         DLIST_ADD_AFTER(aio_ev->timed_events, te, last_te);
427
428         talloc_set_destructor(te, aio_event_timed_destructor);
429
430         return te;
431 }
432
433 /*
434   a timer has gone off - call it
435 */
436 static void aio_event_loop_timer(struct aio_event_context *aio_ev)
437 {
438         struct timeval t = timeval_current();
439         struct timed_event *te = aio_ev->timed_events;
440
441         if (te == NULL) {
442                 return;
443         }
444
445         /* deny the handler to free the event */
446         talloc_set_destructor(te, aio_event_timed_deny_destructor);
447
448         /* We need to remove the timer from the list before calling the
449          * handler because in a semi-async inner event loop called from the
450          * handler we don't want to come across this event again -- vl */
451         DLIST_REMOVE(aio_ev->timed_events, te);
452
453         te->handler(aio_ev->ev, te, t, te->private_data);
454
455         /* The destructor isn't necessary anymore, we've already removed the
456          * event from the list. */
457         talloc_set_destructor(te, NULL);
458
459         talloc_free(te);
460 }
461
462 /*
463   do a single event loop using the events defined in ev 
464 */
465 static int aio_event_loop_once(struct event_context *ev)
466 {
467         struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data,
468                                                            struct aio_event_context);
469         struct timeval tval;
470
471         /* work out the right timeout for all timed events */
472         if (aio_ev->timed_events) {
473                 struct timeval t = timeval_current();
474                 tval = timeval_until(&t, &aio_ev->timed_events->next_event);
475                 if (timeval_is_zero(&tval)) {
476                         aio_event_loop_timer(aio_ev);
477                         return 0;
478                 }
479         } else {
480                 /* have a default tick time of 30 seconds. This guarantees
481                    that code that uses its own timeout checking will be
482                    able to proceeed eventually */
483                 tval = timeval_set(30, 0);
484         }
485
486         return aio_event_loop(aio_ev, &tval);
487 }
488
489 /*
490   return on failure or (with 0) if all fd events are removed
491 */
492 static int aio_event_loop_wait(struct event_context *ev)
493 {
494         struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data,
495                                                            struct aio_event_context);
496         while (aio_ev->num_fd_events) {
497                 if (aio_event_loop_once(ev) != 0) {
498                         break;
499                 }
500         }
501
502         return 0;
503 }
504
505 static const struct event_ops aio_event_ops = {
506         .context_init   = aio_event_context_init,
507         .add_fd         = aio_event_add_fd,
508         .get_fd_flags   = aio_event_get_fd_flags,
509         .set_fd_flags   = aio_event_set_fd_flags,
510         .add_timed      = aio_event_add_timed,
511         .loop_once      = aio_event_loop_once,
512         .loop_wait      = aio_event_loop_wait,
513 };
514
515 const struct event_ops *event_aio_get_ops(void)
516 {
517         return &aio_event_ops;
518 }