Merge branch 'master' of ctdb into 'master' of samba
[kai/samba-autobuild/.git] / lib / async_req / async_sock.c
1 /*
2    Unix SMB/CIFS implementation.
3    async socket syscalls
4    Copyright (C) Volker Lendecke 2008
5
6      ** NOTE! The following LGPL license applies to the async_sock
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Library General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public License
21    along with this program.  If not, see <http://www.gnu.org/licenses/>.
22 */
23
24 #include "replace.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include <talloc.h>
28 #include <tevent.h>
29 #include "lib/async_req/async_sock.h"
30
31 /* Note: lib/util/ is currently GPL */
32 #include "lib/util/tevent_unix.h"
33 #include "lib/util/samba_util.h"
34
35 #ifndef TALLOC_FREE
36 #define TALLOC_FREE(ctx) do { talloc_free(ctx); ctx=NULL; } while(0)
37 #endif
38
39 struct sendto_state {
40         int fd;
41         const void *buf;
42         size_t len;
43         int flags;
44         const struct sockaddr_storage *addr;
45         socklen_t addr_len;
46         ssize_t sent;
47 };
48
49 static void sendto_handler(struct tevent_context *ev,
50                                struct tevent_fd *fde,
51                                uint16_t flags, void *private_data);
52
53 struct tevent_req *sendto_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
54                                int fd, const void *buf, size_t len, int flags,
55                                const struct sockaddr_storage *addr)
56 {
57         struct tevent_req *result;
58         struct sendto_state *state;
59         struct tevent_fd *fde;
60
61         result = tevent_req_create(mem_ctx, &state, struct sendto_state);
62         if (result == NULL) {
63                 return result;
64         }
65         state->fd = fd;
66         state->buf = buf;
67         state->len = len;
68         state->flags = flags;
69         state->addr = addr;
70
71         switch (addr->ss_family) {
72         case AF_INET:
73                 state->addr_len = sizeof(struct sockaddr_in);
74                 break;
75 #if defined(HAVE_IPV6)
76         case AF_INET6:
77                 state->addr_len = sizeof(struct sockaddr_in6);
78                 break;
79 #endif
80         case AF_UNIX:
81                 state->addr_len = sizeof(struct sockaddr_un);
82                 break;
83         default:
84                 state->addr_len = sizeof(struct sockaddr_storage);
85                 break;
86         }
87
88         fde = tevent_add_fd(ev, state, fd, TEVENT_FD_WRITE, sendto_handler,
89                             result);
90         if (fde == NULL) {
91                 TALLOC_FREE(result);
92                 return NULL;
93         }
94         return result;
95 }
96
97 static void sendto_handler(struct tevent_context *ev,
98                                struct tevent_fd *fde,
99                                uint16_t flags, void *private_data)
100 {
101         struct tevent_req *req = talloc_get_type_abort(
102                 private_data, struct tevent_req);
103         struct sendto_state *state =
104                 tevent_req_data(req, struct sendto_state);
105
106         state->sent = sendto(state->fd, state->buf, state->len, state->flags,
107                              (const struct sockaddr *)state->addr,
108                              state->addr_len);
109         if ((state->sent == -1) && (errno == EINTR)) {
110                 /* retry */
111                 return;
112         }
113         if (state->sent == -1) {
114                 tevent_req_error(req, errno);
115                 return;
116         }
117         tevent_req_done(req);
118 }
119
120 ssize_t sendto_recv(struct tevent_req *req, int *perrno)
121 {
122         struct sendto_state *state =
123                 tevent_req_data(req, struct sendto_state);
124
125         if (tevent_req_is_unix_error(req, perrno)) {
126                 return -1;
127         }
128         return state->sent;
129 }
130
131 struct recvfrom_state {
132         int fd;
133         void *buf;
134         size_t len;
135         int flags;
136         struct sockaddr_storage *addr;
137         socklen_t *addr_len;
138         ssize_t received;
139 };
140
141 static void recvfrom_handler(struct tevent_context *ev,
142                                struct tevent_fd *fde,
143                                uint16_t flags, void *private_data);
144
145 struct tevent_req *recvfrom_send(TALLOC_CTX *mem_ctx,
146                                  struct tevent_context *ev,
147                                  int fd, void *buf, size_t len, int flags,
148                                  struct sockaddr_storage *addr,
149                                  socklen_t *addr_len)
150 {
151         struct tevent_req *result;
152         struct recvfrom_state *state;
153         struct tevent_fd *fde;
154
155         result = tevent_req_create(mem_ctx, &state, struct recvfrom_state);
156         if (result == NULL) {
157                 return result;
158         }
159         state->fd = fd;
160         state->buf = buf;
161         state->len = len;
162         state->flags = flags;
163         state->addr = addr;
164         state->addr_len = addr_len;
165
166         fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ, recvfrom_handler,
167                             result);
168         if (fde == NULL) {
169                 TALLOC_FREE(result);
170                 return NULL;
171         }
172         return result;
173 }
174
175 static void recvfrom_handler(struct tevent_context *ev,
176                                struct tevent_fd *fde,
177                                uint16_t flags, void *private_data)
178 {
179         struct tevent_req *req = talloc_get_type_abort(
180                 private_data, struct tevent_req);
181         struct recvfrom_state *state =
182                 tevent_req_data(req, struct recvfrom_state);
183
184         state->received = recvfrom(state->fd, state->buf, state->len,
185                                    state->flags, (struct sockaddr *)state->addr,
186                                    state->addr_len);
187         if ((state->received == -1) && (errno == EINTR)) {
188                 /* retry */
189                 return;
190         }
191         if (state->received == 0) {
192                 tevent_req_error(req, EPIPE);
193                 return;
194         }
195         if (state->received == -1) {
196                 tevent_req_error(req, errno);
197                 return;
198         }
199         tevent_req_done(req);
200 }
201
202 ssize_t recvfrom_recv(struct tevent_req *req, int *perrno)
203 {
204         struct recvfrom_state *state =
205                 tevent_req_data(req, struct recvfrom_state);
206
207         if (tevent_req_is_unix_error(req, perrno)) {
208                 return -1;
209         }
210         return state->received;
211 }
212
213 struct async_connect_state {
214         int fd;
215         int result;
216         int sys_errno;
217         long old_sockflags;
218         socklen_t address_len;
219         struct sockaddr_storage address;
220
221         void (*before_connect)(void *private_data);
222         void (*after_connect)(void *private_data);
223         void *private_data;
224 };
225
226 static void async_connect_connected(struct tevent_context *ev,
227                                     struct tevent_fd *fde, uint16_t flags,
228                                     void *priv);
229
230 /**
231  * @brief async version of connect(2)
232  * @param[in] mem_ctx   The memory context to hang the result off
233  * @param[in] ev        The event context to work from
234  * @param[in] fd        The socket to recv from
235  * @param[in] address   Where to connect?
236  * @param[in] address_len Length of *address
237  * @retval The async request
238  *
239  * This function sets the socket into non-blocking state to be able to call
240  * connect in an async state. This will be reset when the request is finished.
241  */
242
243 struct tevent_req *async_connect_send(
244         TALLOC_CTX *mem_ctx, struct tevent_context *ev, int fd,
245         const struct sockaddr *address, socklen_t address_len,
246         void (*before_connect)(void *private_data),
247         void (*after_connect)(void *private_data),
248         void *private_data)
249 {
250         struct tevent_req *result;
251         struct async_connect_state *state;
252         struct tevent_fd *fde;
253
254         result = tevent_req_create(
255                 mem_ctx, &state, struct async_connect_state);
256         if (result == NULL) {
257                 return NULL;
258         }
259
260         /**
261          * We have to set the socket to nonblocking for async connect(2). Keep
262          * the old sockflags around.
263          */
264
265         state->fd = fd;
266         state->sys_errno = 0;
267         state->before_connect = before_connect;
268         state->after_connect = after_connect;
269         state->private_data = private_data;
270
271         state->old_sockflags = fcntl(fd, F_GETFL, 0);
272         if (state->old_sockflags == -1) {
273                 goto post_errno;
274         }
275
276         state->address_len = address_len;
277         if (address_len > sizeof(state->address)) {
278                 errno = EINVAL;
279                 goto post_errno;
280         }
281         memcpy(&state->address, address, address_len);
282
283         set_blocking(fd, false);
284
285         if (state->before_connect != NULL) {
286                 state->before_connect(state->private_data);
287         }
288
289         state->result = connect(fd, address, address_len);
290
291         if (state->after_connect != NULL) {
292                 state->after_connect(state->private_data);
293         }
294
295         if (state->result == 0) {
296                 tevent_req_done(result);
297                 goto done;
298         }
299
300         /**
301          * A number of error messages show that something good is progressing
302          * and that we have to wait for readability.
303          *
304          * If none of them are present, bail out.
305          */
306
307         if (!(errno == EINPROGRESS || errno == EALREADY ||
308 #ifdef EISCONN
309               errno == EISCONN ||
310 #endif
311               errno == EAGAIN || errno == EINTR)) {
312                 state->sys_errno = errno;
313                 goto post_errno;
314         }
315
316         fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ | TEVENT_FD_WRITE,
317                            async_connect_connected, result);
318         if (fde == NULL) {
319                 state->sys_errno = ENOMEM;
320                 goto post_errno;
321         }
322         return result;
323
324  post_errno:
325         tevent_req_error(result, state->sys_errno);
326  done:
327         fcntl(fd, F_SETFL, state->old_sockflags);
328         return tevent_req_post(result, ev);
329 }
330
331 /**
332  * fde event handler for connect(2)
333  * @param[in] ev        The event context that sent us here
334  * @param[in] fde       The file descriptor event associated with the connect
335  * @param[in] flags     Indicate read/writeability of the socket
336  * @param[in] priv      private data, "struct async_req *" in this case
337  */
338
339 static void async_connect_connected(struct tevent_context *ev,
340                                     struct tevent_fd *fde, uint16_t flags,
341                                     void *priv)
342 {
343         struct tevent_req *req = talloc_get_type_abort(
344                 priv, struct tevent_req);
345         struct async_connect_state *state =
346                 tevent_req_data(req, struct async_connect_state);
347         int ret;
348
349         if (state->before_connect != NULL) {
350                 state->before_connect(state->private_data);
351         }
352
353         ret = connect(state->fd, (struct sockaddr *)(void *)&state->address,
354                       state->address_len);
355
356         if (state->after_connect != NULL) {
357                 state->after_connect(state->private_data);
358         }
359
360         if (ret == 0) {
361                 state->sys_errno = 0;
362                 TALLOC_FREE(fde);
363                 tevent_req_done(req);
364                 return;
365         }
366         if (errno == EINPROGRESS) {
367                 /* Try again later, leave the fde around */
368                 return;
369         }
370         state->sys_errno = errno;
371         TALLOC_FREE(fde);
372         tevent_req_error(req, errno);
373         return;
374 }
375
376 int async_connect_recv(struct tevent_req *req, int *perrno)
377 {
378         struct async_connect_state *state =
379                 tevent_req_data(req, struct async_connect_state);
380         int err;
381
382         fcntl(state->fd, F_SETFL, state->old_sockflags);
383
384         if (tevent_req_is_unix_error(req, &err)) {
385                 *perrno = err;
386                 return -1;
387         }
388
389         if (state->sys_errno == 0) {
390                 return 0;
391         }
392
393         *perrno = state->sys_errno;
394         return -1;
395 }
396
397 struct writev_state {
398         struct tevent_context *ev;
399         int fd;
400         struct iovec *iov;
401         int count;
402         size_t total_size;
403         uint16_t flags;
404         bool err_on_readability;
405 };
406
407 static void writev_trigger(struct tevent_req *req, void *private_data);
408 static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde,
409                            uint16_t flags, void *private_data);
410
411 struct tevent_req *writev_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
412                                struct tevent_queue *queue, int fd,
413                                bool err_on_readability,
414                                struct iovec *iov, int count)
415 {
416         struct tevent_req *req;
417         struct writev_state *state;
418
419         req = tevent_req_create(mem_ctx, &state, struct writev_state);
420         if (req == NULL) {
421                 return NULL;
422         }
423         state->ev = ev;
424         state->fd = fd;
425         state->total_size = 0;
426         state->count = count;
427         state->iov = (struct iovec *)talloc_memdup(
428                 state, iov, sizeof(struct iovec) * count);
429         if (state->iov == NULL) {
430                 goto fail;
431         }
432         state->flags = TEVENT_FD_WRITE|TEVENT_FD_READ;
433         state->err_on_readability = err_on_readability;
434
435         if (queue == NULL) {
436                 struct tevent_fd *fde;
437                 fde = tevent_add_fd(state->ev, state, state->fd,
438                                     state->flags, writev_handler, req);
439                 if (tevent_req_nomem(fde, req)) {
440                         return tevent_req_post(req, ev);
441                 }
442                 return req;
443         }
444
445         if (!tevent_queue_add(queue, ev, req, writev_trigger, NULL)) {
446                 goto fail;
447         }
448         return req;
449  fail:
450         TALLOC_FREE(req);
451         return NULL;
452 }
453
454 static void writev_trigger(struct tevent_req *req, void *private_data)
455 {
456         struct writev_state *state = tevent_req_data(req, struct writev_state);
457         struct tevent_fd *fde;
458
459         fde = tevent_add_fd(state->ev, state, state->fd, state->flags,
460                             writev_handler, req);
461         if (fde == NULL) {
462                 tevent_req_error(req, ENOMEM);
463         }
464 }
465
466 static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde,
467                            uint16_t flags, void *private_data)
468 {
469         struct tevent_req *req = talloc_get_type_abort(
470                 private_data, struct tevent_req);
471         struct writev_state *state =
472                 tevent_req_data(req, struct writev_state);
473         size_t to_write, written;
474         int i;
475
476         to_write = 0;
477
478         if ((state->flags & TEVENT_FD_READ) && (flags & TEVENT_FD_READ)) {
479                 int ret, value;
480
481                 if (state->err_on_readability) {
482                         /* Readable and the caller wants an error on read. */
483                         tevent_req_error(req, EPIPE);
484                         return;
485                 }
486
487                 /* Might be an error. Check if there are bytes to read */
488                 ret = ioctl(state->fd, FIONREAD, &value);
489                 /* FIXME - should we also check
490                    for ret == 0 and value == 0 here ? */
491                 if (ret == -1) {
492                         /* There's an error. */
493                         tevent_req_error(req, EPIPE);
494                         return;
495                 }
496                 /* A request for TEVENT_FD_READ will succeed from now and
497                    forevermore until the bytes are read so if there was
498                    an error we'll wait until we do read, then get it in
499                    the read callback function. Until then, remove TEVENT_FD_READ
500                    from the flags we're waiting for. */
501                 state->flags &= ~TEVENT_FD_READ;
502                 TEVENT_FD_NOT_READABLE(fde);
503
504                 /* If not writable, we're done. */
505                 if (!(flags & TEVENT_FD_WRITE)) {
506                         return;
507                 }
508         }
509
510         for (i=0; i<state->count; i++) {
511                 to_write += state->iov[i].iov_len;
512         }
513
514         written = writev(state->fd, state->iov, state->count);
515         if ((written == -1) && (errno == EINTR)) {
516                 /* retry */
517                 return;
518         }
519         if (written == -1) {
520                 tevent_req_error(req, errno);
521                 return;
522         }
523         if (written == 0) {
524                 tevent_req_error(req, EPIPE);
525                 return;
526         }
527         state->total_size += written;
528
529         if (written == to_write) {
530                 tevent_req_done(req);
531                 return;
532         }
533
534         /*
535          * We've written less than we were asked to, drop stuff from
536          * state->iov.
537          */
538
539         while (written > 0) {
540                 if (written < state->iov[0].iov_len) {
541                         state->iov[0].iov_base =
542                                 (char *)state->iov[0].iov_base + written;
543                         state->iov[0].iov_len -= written;
544                         break;
545                 }
546                 written -= state->iov[0].iov_len;
547                 state->iov += 1;
548                 state->count -= 1;
549         }
550 }
551
552 ssize_t writev_recv(struct tevent_req *req, int *perrno)
553 {
554         struct writev_state *state =
555                 tevent_req_data(req, struct writev_state);
556
557         if (tevent_req_is_unix_error(req, perrno)) {
558                 return -1;
559         }
560         return state->total_size;
561 }
562
563 struct read_packet_state {
564         int fd;
565         uint8_t *buf;
566         size_t nread;
567         ssize_t (*more)(uint8_t *buf, size_t buflen, void *private_data);
568         void *private_data;
569 };
570
571 static void read_packet_handler(struct tevent_context *ev,
572                                 struct tevent_fd *fde,
573                                 uint16_t flags, void *private_data);
574
575 struct tevent_req *read_packet_send(TALLOC_CTX *mem_ctx,
576                                     struct tevent_context *ev,
577                                     int fd, size_t initial,
578                                     ssize_t (*more)(uint8_t *buf,
579                                                     size_t buflen,
580                                                     void *private_data),
581                                     void *private_data)
582 {
583         struct tevent_req *result;
584         struct read_packet_state *state;
585         struct tevent_fd *fde;
586
587         result = tevent_req_create(mem_ctx, &state, struct read_packet_state);
588         if (result == NULL) {
589                 return NULL;
590         }
591         state->fd = fd;
592         state->nread = 0;
593         state->more = more;
594         state->private_data = private_data;
595
596         state->buf = talloc_array(state, uint8_t, initial);
597         if (state->buf == NULL) {
598                 goto fail;
599         }
600
601         fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ, read_packet_handler,
602                             result);
603         if (fde == NULL) {
604                 goto fail;
605         }
606         return result;
607  fail:
608         TALLOC_FREE(result);
609         return NULL;
610 }
611
612 static void read_packet_handler(struct tevent_context *ev,
613                                 struct tevent_fd *fde,
614                                 uint16_t flags, void *private_data)
615 {
616         struct tevent_req *req = talloc_get_type_abort(
617                 private_data, struct tevent_req);
618         struct read_packet_state *state =
619                 tevent_req_data(req, struct read_packet_state);
620         size_t total = talloc_get_size(state->buf);
621         ssize_t nread, more;
622         uint8_t *tmp;
623
624         nread = recv(state->fd, state->buf+state->nread, total-state->nread,
625                      0);
626         if ((nread == -1) && (errno == ENOTSOCK)) {
627                 nread = read(state->fd, state->buf+state->nread,
628                              total-state->nread);
629         }
630         if ((nread == -1) && (errno == EINTR)) {
631                 /* retry */
632                 return;
633         }
634         if (nread == -1) {
635                 tevent_req_error(req, errno);
636                 return;
637         }
638         if (nread == 0) {
639                 tevent_req_error(req, EPIPE);
640                 return;
641         }
642
643         state->nread += nread;
644         if (state->nread < total) {
645                 /* Come back later */
646                 return;
647         }
648
649         /*
650          * We got what was initially requested. See if "more" asks for -- more.
651          */
652         if (state->more == NULL) {
653                 /* Nobody to ask, this is a async read_data */
654                 tevent_req_done(req);
655                 return;
656         }
657
658         more = state->more(state->buf, total, state->private_data);
659         if (more == -1) {
660                 /* We got an invalid packet, tell the caller */
661                 tevent_req_error(req, EIO);
662                 return;
663         }
664         if (more == 0) {
665                 /* We're done, full packet received */
666                 tevent_req_done(req);
667                 return;
668         }
669
670         tmp = talloc_realloc(state, state->buf, uint8_t, total+more);
671         if (tevent_req_nomem(tmp, req)) {
672                 return;
673         }
674         state->buf = tmp;
675 }
676
677 ssize_t read_packet_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
678                          uint8_t **pbuf, int *perrno)
679 {
680         struct read_packet_state *state =
681                 tevent_req_data(req, struct read_packet_state);
682
683         if (tevent_req_is_unix_error(req, perrno)) {
684                 return -1;
685         }
686         *pbuf = talloc_move(mem_ctx, &state->buf);
687         return talloc_get_size(*pbuf);
688 }
689
690 struct wait_for_read_state {
691         struct tevent_req *req;
692         struct tevent_fd *fde;
693 };
694
695 static void wait_for_read_done(struct tevent_context *ev,
696                                struct tevent_fd *fde,
697                                uint16_t flags,
698                                void *private_data);
699
700 struct tevent_req *wait_for_read_send(TALLOC_CTX *mem_ctx,
701                                       struct tevent_context *ev,
702                                       int fd)
703 {
704         struct tevent_req *req;
705         struct wait_for_read_state *state;
706
707         req = tevent_req_create(mem_ctx, &state, struct wait_for_read_state);
708         if (req == NULL) {
709                 return NULL;
710         }
711         state->req = req;
712         state->fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ,
713                                    wait_for_read_done, state);
714         if (tevent_req_nomem(state->fde, req)) {
715                 return tevent_req_post(req, ev);
716         }
717         return req;
718 }
719
720 static void wait_for_read_done(struct tevent_context *ev,
721                                struct tevent_fd *fde,
722                                uint16_t flags,
723                                void *private_data)
724 {
725         struct wait_for_read_state *state = talloc_get_type_abort(
726                 private_data, struct wait_for_read_state);
727
728         if (flags & TEVENT_FD_READ) {
729                 TALLOC_FREE(state->fde);
730                 tevent_req_done(state->req);
731         }
732 }
733
734 bool wait_for_read_recv(struct tevent_req *req, int *perr)
735 {
736         int err;
737
738         if (tevent_req_is_unix_error(req, &err)) {
739                 *perr = err;
740                 return false;
741         }
742         return true;
743 }