2 Unix SMB/CIFS implementation.
4 main select loop and event handling - aio/epoll hybrid implementation
6 Copyright (C) Andrew Tridgell 2006
8 based on events_standard.c
10 This program is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 3 of the License, or
13 (at your option) any later version.
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
20 You should have received a copy of the GNU General Public License
21 along with this program. If not, see <http://www.gnu.org/licenses/>.
24 this is a very strange beast. The Linux AIO implementation doesn't
25 yet integrate properly with epoll, but there is a kernel patch that
26 allows the aio wait primitives to be used to wait for epoll events,
27 and this can be used to give us a unified event system incorporating
28 both aio events and epoll events
30 this is _very_ experimental code
34 #include "system/filesys.h"
35 #include "system/select.h"
37 #include "tevent_internal.h"
38 #include "tevent_util.h"
/* maximum number of epoll events fetched per aio wait; also sizes the
   kernel aio queue and the epoll_create() hint */
#define MAX_AIO_QUEUE_DEPTH	100

/* opcode from the (experimental) kernel patch that lets an aio iocb wait
   on an epoll fd; not in mainline libaio headers, so define if missing.
   NOTE(review): value 9 matches the patch this file targets — confirm
   against the kernel patch actually deployed. */
#ifndef IOCB_CMD_EPOLL_WAIT
#define IOCB_CMD_EPOLL_WAIT	9
#endif
46 struct aio_event_context {
47 /* a pointer back to the generic event_context */
48 struct tevent_context *ev;
50 uint32_t destruction_count;
54 struct epoll_event epevent[MAX_AIO_QUEUE_DEPTH];
56 struct iocb *epoll_iocb;
64 struct tevent_context *event_ctx;
67 tevent_aio_handler_t handler;
71 map from EVENT_FD_* to EPOLLIN/EPOLLOUT
73 static uint32_t epoll_map_flags(uint16_t flags)
76 if (flags & TEVENT_FD_READ) ret |= (EPOLLIN | EPOLLERR | EPOLLHUP);
77 if (flags & TEVENT_FD_WRITE) ret |= (EPOLLOUT | EPOLLERR | EPOLLHUP);
84 static int aio_ctx_destructor(struct aio_event_context *aio_ev)
86 io_queue_release(aio_ev->ioctx);
87 close(aio_ev->epoll_fd);
88 aio_ev->epoll_fd = -1;
92 static void epoll_add_event(struct aio_event_context *aio_ev, struct tevent_fd *fde);
95 reopen the epoll handle when our pid changes
96 see http://junkcode.samba.org/ftp/unpacked/junkcode/epoll_fork.c for an
97 demonstration of why this is needed
99 static void epoll_check_reopen(struct aio_event_context *aio_ev)
101 struct tevent_fd *fde;
103 if (aio_ev->pid == getpid()) {
107 close(aio_ev->epoll_fd);
108 aio_ev->epoll_fd = epoll_create(MAX_AIO_QUEUE_DEPTH);
109 if (aio_ev->epoll_fd == -1) {
110 tevent_debug(aio_ev->ev, TEVENT_DEBUG_FATAL,
111 "Failed to recreate epoll handle after fork\n");
114 aio_ev->pid = getpid();
115 for (fde=aio_ev->ev->fd_events;fde;fde=fde->next) {
116 epoll_add_event(aio_ev, fde);
120 #define EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT (1<<0)
121 #define EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR (1<<1)
122 #define EPOLL_ADDITIONAL_FD_FLAG_GOT_ERROR (1<<2)
125 add the epoll event to the given fd_event
127 static void epoll_add_event(struct aio_event_context *aio_ev, struct tevent_fd *fde)
129 struct epoll_event event;
130 if (aio_ev->epoll_fd == -1) return;
132 fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
134 /* if we don't want events yet, don't add an aio_event */
135 if (fde->flags == 0) return;
137 memset(&event, 0, sizeof(event));
138 event.events = epoll_map_flags(fde->flags);
139 event.data.ptr = fde;
140 epoll_ctl(aio_ev->epoll_fd, EPOLL_CTL_ADD, fde->fd, &event);
141 fde->additional_flags |= EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT;
143 /* only if we want to read we want to tell the event handler about errors */
144 if (fde->flags & TEVENT_FD_READ) {
145 fde->additional_flags |= EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
150 delete the epoll event for given fd_event
152 static void epoll_del_event(struct aio_event_context *aio_ev, struct tevent_fd *fde)
154 struct epoll_event event;
156 if (aio_ev->epoll_fd == -1) return;
158 fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
160 /* if there's no aio_event, we don't need to delete it */
161 if (!(fde->additional_flags & EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT)) return;
164 event.events = epoll_map_flags(fde->flags);
165 event.data.ptr = fde;
166 epoll_ctl(aio_ev->epoll_fd, EPOLL_CTL_DEL, fde->fd, &event);
168 fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT;
172 change the epoll event to the given fd_event
174 static void epoll_mod_event(struct aio_event_context *aio_ev, struct tevent_fd *fde)
176 struct epoll_event event;
177 if (aio_ev->epoll_fd == -1) return;
179 fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
182 event.events = epoll_map_flags(fde->flags);
183 event.data.ptr = fde;
184 epoll_ctl(aio_ev->epoll_fd, EPOLL_CTL_MOD, fde->fd, &event);
186 /* only if we want to read we want to tell the event handler about errors */
187 if (fde->flags & TEVENT_FD_READ) {
188 fde->additional_flags |= EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
192 static void epoll_change_event(struct aio_event_context *aio_ev, struct tevent_fd *fde)
194 bool got_error = (fde->additional_flags & EPOLL_ADDITIONAL_FD_FLAG_GOT_ERROR);
195 bool want_read = (fde->flags & TEVENT_FD_READ);
196 bool want_write= (fde->flags & TEVENT_FD_WRITE);
198 if (aio_ev->epoll_fd == -1) return;
200 fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
202 /* there's already an event */
203 if (fde->additional_flags & EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT) {
204 if (want_read || (want_write && !got_error)) {
205 epoll_mod_event(aio_ev, fde);
208 epoll_del_event(aio_ev, fde);
212 /* there's no aio_event attached to the fde */
213 if (want_read || (want_write && !got_error)) {
214 epoll_add_event(aio_ev, fde);
219 static int setup_epoll_wait(struct aio_event_context *aio_ev)
221 if (aio_ev->is_epoll_set) {
224 memset(aio_ev->epoll_iocb, 0, sizeof(*aio_ev->epoll_iocb));
225 aio_ev->epoll_iocb->aio_fildes = aio_ev->epoll_fd;
226 aio_ev->epoll_iocb->aio_lio_opcode = IOCB_CMD_EPOLL_WAIT;
227 aio_ev->epoll_iocb->aio_reqprio = 0;
229 aio_ev->epoll_iocb->u.c.nbytes = MAX_AIO_QUEUE_DEPTH;
230 aio_ev->epoll_iocb->u.c.offset = -1;
231 aio_ev->epoll_iocb->u.c.buf = aio_ev->epevent;
233 if (io_submit(aio_ev->ioctx, 1, &aio_ev->epoll_iocb) != 1) {
236 aio_ev->is_epoll_set = 1;
243 event loop handling using aio/epoll hybrid
245 static int aio_event_loop(struct aio_event_context *aio_ev, struct timeval *tvalp)
248 uint32_t destruction_count = ++aio_ev->destruction_count;
249 struct timespec timeout;
250 struct io_event events[8];
252 if (aio_ev->epoll_fd == -1) return -1;
254 if (aio_ev->ev->signal_events &&
255 tevent_common_check_signal(aio_ev->ev)) {
260 timeout.tv_sec = tvalp->tv_sec;
261 timeout.tv_nsec = tvalp->tv_usec;
262 timeout.tv_nsec *= 1000;
265 if (setup_epoll_wait(aio_ev) < 0)
268 ret = io_getevents(aio_ev->ioctx, 1, 8,
269 events, tvalp?&timeout:NULL);
272 if (aio_ev->ev->signal_events) {
273 tevent_common_check_signal(aio_ev->ev);
278 if (ret == 0 && tvalp) {
279 /* we don't care about a possible delay here */
280 tevent_common_loop_timer_delay(aio_ev->ev);
284 for (i=0;i<ret;i++) {
285 struct io_event *event = &events[i];
286 struct iocb *finished = event->obj;
288 switch (finished->aio_lio_opcode) {
291 struct tevent_aio *ae = talloc_get_type(finished->data,
294 talloc_set_destructor(ae, NULL);
295 ae->handler(ae->event_ctx, ae,
296 event->res, ae->private_data);
301 case IOCB_CMD_EPOLL_WAIT: {
302 struct epoll_event *ep = (struct epoll_event *)finished->u.c.buf;
303 struct tevent_fd *fde;
307 aio_ev->is_epoll_set = 0;
309 for (j=0; j<event->res; j++, ep++) {
310 fde = talloc_get_type(ep->data.ptr,
315 if (ep->events & (EPOLLHUP|EPOLLERR)) {
316 fde->additional_flags |= EPOLL_ADDITIONAL_FD_FLAG_GOT_ERROR;
317 if (!(fde->additional_flags & EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR)) {
318 epoll_del_event(aio_ev, fde);
321 flags |= TEVENT_FD_READ;
323 if (ep->events & EPOLLIN) flags |= TEVENT_FD_READ;
324 if (ep->events & EPOLLOUT) flags |= TEVENT_FD_WRITE;
326 fde->handler(aio_ev->ev, fde, flags, fde->private_data);
332 if (destruction_count != aio_ev->destruction_count) {
341 create a aio_event_context structure.
343 static int aio_event_context_init(struct tevent_context *ev)
345 struct aio_event_context *aio_ev;
347 aio_ev = talloc_zero(ev, struct aio_event_context);
348 if (!aio_ev) return -1;
351 aio_ev->epoll_iocb = talloc(aio_ev, struct iocb);
353 if (io_queue_init(MAX_AIO_QUEUE_DEPTH, &aio_ev->ioctx) != 0) {
358 aio_ev->epoll_fd = epoll_create(MAX_AIO_QUEUE_DEPTH);
359 if (aio_ev->epoll_fd == -1) {
363 aio_ev->pid = getpid();
365 talloc_set_destructor(aio_ev, aio_ctx_destructor);
367 ev->additional_data = aio_ev;
369 if (setup_epoll_wait(aio_ev) < 0) {
380 static int aio_event_fd_destructor(struct tevent_fd *fde)
382 struct tevent_context *ev = fde->event_ctx;
383 struct aio_event_context *aio_ev = NULL;
386 aio_ev = talloc_get_type(ev->additional_data,
387 struct aio_event_context);
389 epoll_check_reopen(aio_ev);
391 aio_ev->destruction_count++;
393 epoll_del_event(aio_ev, fde);
396 return tevent_common_fd_destructor(fde);
401 return NULL on failure (memory allocation error)
403 static struct tevent_fd *aio_event_add_fd(struct tevent_context *ev, TALLOC_CTX *mem_ctx,
404 int fd, uint16_t flags,
405 tevent_fd_handler_t handler,
407 const char *handler_name,
408 const char *location)
410 struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data,
411 struct aio_event_context);
412 struct tevent_fd *fde;
414 epoll_check_reopen(aio_ev);
416 fde = tevent_common_add_fd(ev, mem_ctx, fd, flags,
417 handler, private_data,
418 handler_name, location);
419 if (!fde) return NULL;
421 talloc_set_destructor(fde, aio_event_fd_destructor);
423 epoll_add_event(aio_ev, fde);
429 set the fd event flags
431 static void aio_event_set_fd_flags(struct tevent_fd *fde, uint16_t flags)
433 struct tevent_context *ev;
434 struct aio_event_context *aio_ev;
436 if (fde->flags == flags) return;
439 aio_ev = talloc_get_type(ev->additional_data, struct aio_event_context);
443 epoll_check_reopen(aio_ev);
445 epoll_change_event(aio_ev, fde);
449 do a single event loop using the events defined in ev
451 static int aio_event_loop_once(struct tevent_context *ev)
453 struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data,
454 struct aio_event_context);
457 tval = tevent_common_loop_timer_delay(ev);
458 if (ev_timeval_is_zero(&tval)) {
462 epoll_check_reopen(aio_ev);
464 return aio_event_loop(aio_ev, &tval);
468 return on failure or (with 0) if all fd events are removed
470 static int aio_event_loop_wait(struct tevent_context *ev)
472 struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data,
473 struct aio_event_context);
474 while (aio_ev->ev->fd_events) {
475 if (aio_event_loop_once(ev) != 0) {
484 called when a disk IO event needs to be cancelled
486 static int aio_destructor(struct tevent_aio *ae)
488 struct tevent_context *ev = ae->event_ctx;
489 struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data,
490 struct aio_event_context);
491 struct io_event result;
492 io_cancel(aio_ev->ioctx, &ae->iocb, &result);
493 /* TODO: handle errors from io_cancel()! */
497 /* submit an aio disk IO event */
498 static struct tevent_aio *aio_event_add_aio(struct tevent_context *ev,
501 tevent_aio_handler_t handler,
503 const char *handler_name,
504 const char *location)
506 struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data,
507 struct aio_event_context);
509 struct tevent_aio *ae = talloc(mem_ctx?mem_ctx:ev, struct tevent_aio);
510 if (ae == NULL) return NULL;
514 ae->handler = handler;
515 ae->private_data = private_data;
518 if (io_submit(aio_ev->ioctx, 1, &iocbp) != 1) {
523 talloc_set_destructor(ae, aio_destructor);
528 static const struct tevent_ops aio_event_ops = {
529 .context_init = aio_event_context_init,
530 .add_fd = aio_event_add_fd,
531 .add_aio = aio_event_add_aio,
532 .set_fd_close_fn= tevent_common_fd_set_close_fn,
533 .get_fd_flags = tevent_common_fd_get_flags,
534 .set_fd_flags = aio_event_set_fd_flags,
535 .add_timer = tevent_common_add_timer,
536 .add_signal = tevent_common_add_signal,
537 .loop_once = aio_event_loop_once,
538 .loop_wait = aio_event_loop_wait,
541 bool tevent_aio_init(void)
543 return tevent_register_backend("aio", &aio_event_ops);