lib/async_req/async_sock.c set socket close on exec
[kai/samba-autobuild/.git] / lib / async_req / async_sock.c
1 /*
2    Unix SMB/CIFS implementation.
3    async socket syscalls
4    Copyright (C) Volker Lendecke 2008
5
6      ** NOTE! The following LGPL license applies to the async_sock
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Library General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public License
21    along with this program.  If not, see <http://www.gnu.org/licenses/>.
22 */
23
24 #include "replace.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include <talloc.h>
28 #include <tevent.h>
29 #include "lib/async_req/async_sock.h"
30 #include "lib/util/iov_buf.h"
31
32 /* Note: lib/util/ is currently GPL */
33 #include "lib/util/tevent_unix.h"
34 #include "lib/util/samba_util.h"
35
36 struct async_connect_state {
37         int fd;
38         struct tevent_fd *fde;
39         int result;
40         long old_sockflags;
41         socklen_t address_len;
42         struct sockaddr_storage address;
43
44         void (*before_connect)(void *private_data);
45         void (*after_connect)(void *private_data);
46         void *private_data;
47 };
48
49 static void async_connect_cleanup(struct tevent_req *req,
50                                   enum tevent_req_state req_state);
51 static void async_connect_connected(struct tevent_context *ev,
52                                     struct tevent_fd *fde, uint16_t flags,
53                                     void *priv);
54
55 /**
56  * @brief async version of connect(2)
57  * @param[in] mem_ctx   The memory context to hang the result off
58  * @param[in] ev        The event context to work from
59  * @param[in] fd        The socket to recv from
60  * @param[in] address   Where to connect?
61  * @param[in] address_len Length of *address
62  * @retval The async request
63  *
64  * This function sets the socket into non-blocking state to be able to call
65  * connect in an async state. This will be reset when the request is finished.
66  */
67
68 struct tevent_req *async_connect_send(
69         TALLOC_CTX *mem_ctx, struct tevent_context *ev, int fd,
70         const struct sockaddr *address, socklen_t address_len,
71         void (*before_connect)(void *private_data),
72         void (*after_connect)(void *private_data),
73         void *private_data)
74 {
75         struct tevent_req *req;
76         struct async_connect_state *state;
77         int ret;
78
79         req = tevent_req_create(mem_ctx, &state, struct async_connect_state);
80         if (req == NULL) {
81                 return NULL;
82         }
83
84         /**
85          * We have to set the socket to nonblocking for async connect(2). Keep
86          * the old sockflags around.
87          */
88
89         state->fd = fd;
90         state->before_connect = before_connect;
91         state->after_connect = after_connect;
92         state->private_data = private_data;
93
94         state->old_sockflags = fcntl(fd, F_GETFL, 0);
95         if (state->old_sockflags == -1) {
96                 tevent_req_error(req, errno);
97                 return tevent_req_post(req, ev);
98         }
99
100         tevent_req_set_cleanup_fn(req, async_connect_cleanup);
101
102         state->address_len = address_len;
103         if (address_len > sizeof(state->address)) {
104                 tevent_req_error(req, EINVAL);
105                 return tevent_req_post(req, ev);
106         }
107         memcpy(&state->address, address, address_len);
108
109         ret = set_blocking(fd, false);
110         if (ret == -1) {
111                 tevent_req_error(req, errno);
112                 return tevent_req_post(req, ev);
113         }
114
115         if (state->before_connect != NULL) {
116                 state->before_connect(state->private_data);
117         }
118
119         state->result = connect(fd, address, address_len);
120
121         if (state->after_connect != NULL) {
122                 state->after_connect(state->private_data);
123         }
124
125         if (state->result == 0) {
126                 tevent_req_done(req);
127                 return tevent_req_post(req, ev);
128         }
129
130         /*
131          * The only errno indicating that an initial connect is still
132          * in flight is EINPROGRESS.
133          *
134          * We get EALREADY when someone calls us a second time for a
135          * given fd and the connect is still in flight (and returned
136          * EINPROGRESS the first time).
137          *
138          * This allows callers like open_socket_out_send() to reuse
139          * fds and call us with an fd for which the connect is still
140          * in flight. The proper thing to do for callers would be
141          * closing the fd and starting from scratch with a fresh
142          * socket.
143          */
144
145         if (errno != EINPROGRESS && errno != EALREADY) {
146                 tevent_req_error(req, errno);
147                 return tevent_req_post(req, ev);
148         }
149
150         state->fde = tevent_add_fd(ev, state, fd, TEVENT_FD_WRITE,
151                                    async_connect_connected, req);
152         if (state->fde == NULL) {
153                 tevent_req_error(req, ENOMEM);
154                 return tevent_req_post(req, ev);
155         }
156         return req;
157 }
158
159 static void async_connect_cleanup(struct tevent_req *req,
160                                   enum tevent_req_state req_state)
161 {
162         struct async_connect_state *state =
163                 tevent_req_data(req, struct async_connect_state);
164
165         TALLOC_FREE(state->fde);
166         if (state->fd != -1) {
167                 int ret;
168
169                 ret = fcntl(state->fd, F_SETFL, state->old_sockflags);
170                 if (ret == -1) {
171                         abort();
172                 }
173
174                 state->fd = -1;
175         }
176 }
177
178 /**
179  * fde event handler for connect(2)
180  * @param[in] ev        The event context that sent us here
181  * @param[in] fde       The file descriptor event associated with the connect
182  * @param[in] flags     Indicate read/writeability of the socket
183  * @param[in] priv      private data, "struct async_req *" in this case
184  */
185
186 static void async_connect_connected(struct tevent_context *ev,
187                                     struct tevent_fd *fde, uint16_t flags,
188                                     void *priv)
189 {
190         struct tevent_req *req = talloc_get_type_abort(
191                 priv, struct tevent_req);
192         struct async_connect_state *state =
193                 tevent_req_data(req, struct async_connect_state);
194         int ret;
195         int socket_error = 0;
196         socklen_t slen = sizeof(socket_error);
197
198         ret = getsockopt(state->fd, SOL_SOCKET, SO_ERROR,
199                          &socket_error, &slen);
200
201         if (ret != 0) {
202                 /*
203                  * According to Stevens this is the Solaris behaviour
204                  * in case the connection encountered an error:
205                  * getsockopt() fails, error is in errno
206                  */
207                 tevent_req_error(req, errno);
208                 return;
209         }
210
211         if (socket_error != 0) {
212                 /*
213                  * Berkeley derived implementations (including) Linux
214                  * return the pending error via socket_error.
215                  */
216                 tevent_req_error(req, socket_error);
217                 return;
218         }
219
220         tevent_req_done(req);
221         return;
222 }
223
224 int async_connect_recv(struct tevent_req *req, int *perrno)
225 {
226         int err = tevent_req_simple_recv_unix(req);
227
228         if (err != 0) {
229                 *perrno = err;
230                 return -1;
231         }
232
233         return 0;
234 }
235
236 struct writev_state {
237         struct tevent_context *ev;
238         struct tevent_queue_entry *queue_entry;
239         int fd;
240         struct tevent_fd *fde;
241         struct iovec *iov;
242         int count;
243         size_t total_size;
244         uint16_t flags;
245         bool err_on_readability;
246 };
247
248 static void writev_cleanup(struct tevent_req *req,
249                            enum tevent_req_state req_state);
250 static bool writev_cancel(struct tevent_req *req);
251 static void writev_trigger(struct tevent_req *req, void *private_data);
252 static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde,
253                            uint16_t flags, void *private_data);
254
255 struct tevent_req *writev_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
256                                struct tevent_queue *queue, int fd,
257                                bool err_on_readability,
258                                struct iovec *iov, int count)
259 {
260         struct tevent_req *req;
261         struct writev_state *state;
262
263         req = tevent_req_create(mem_ctx, &state, struct writev_state);
264         if (req == NULL) {
265                 return NULL;
266         }
267         state->ev = ev;
268         state->fd = fd;
269         state->total_size = 0;
270         state->count = count;
271         state->iov = (struct iovec *)talloc_memdup(
272                 state, iov, sizeof(struct iovec) * count);
273         if (tevent_req_nomem(state->iov, req)) {
274                 return tevent_req_post(req, ev);
275         }
276         state->flags = TEVENT_FD_WRITE|TEVENT_FD_READ;
277         state->err_on_readability = err_on_readability;
278
279         tevent_req_set_cleanup_fn(req, writev_cleanup);
280         tevent_req_set_cancel_fn(req, writev_cancel);
281
282         if (queue == NULL) {
283                 state->fde = tevent_add_fd(state->ev, state, state->fd,
284                                     state->flags, writev_handler, req);
285                 if (tevent_req_nomem(state->fde, req)) {
286                         return tevent_req_post(req, ev);
287                 }
288                 return req;
289         }
290
291         state->queue_entry = tevent_queue_add_entry(queue, ev, req,
292                                                     writev_trigger, NULL);
293         if (tevent_req_nomem(state->queue_entry, req)) {
294                 return tevent_req_post(req, ev);
295         }
296         return req;
297 }
298
299 static void writev_cleanup(struct tevent_req *req,
300                            enum tevent_req_state req_state)
301 {
302         struct writev_state *state = tevent_req_data(req, struct writev_state);
303
304         TALLOC_FREE(state->queue_entry);
305         TALLOC_FREE(state->fde);
306 }
307
308 static bool writev_cancel(struct tevent_req *req)
309 {
310         struct writev_state *state = tevent_req_data(req, struct writev_state);
311
312         TALLOC_FREE(state->queue_entry);
313         TALLOC_FREE(state->fde);
314
315         if (state->count == 0) {
316                 /*
317                  * already completed.
318                  */
319                 return false;
320         }
321
322         tevent_req_defer_callback(req, state->ev);
323         if (state->total_size > 0) {
324                 /*
325                  * We've already started to write :-(
326                  */
327                 tevent_req_error(req, EIO);
328                 return false;
329         }
330
331         tevent_req_error(req, ECANCELED);
332         return true;
333 }
334
335 static void writev_trigger(struct tevent_req *req, void *private_data)
336 {
337         struct writev_state *state = tevent_req_data(req, struct writev_state);
338
339         state->queue_entry = NULL;
340
341         state->fde = tevent_add_fd(state->ev, state, state->fd, state->flags,
342                             writev_handler, req);
343         if (tevent_req_nomem(state->fde, req)) {
344                 return;
345         }
346 }
347
348 static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde,
349                            uint16_t flags, void *private_data)
350 {
351         struct tevent_req *req = talloc_get_type_abort(
352                 private_data, struct tevent_req);
353         struct writev_state *state =
354                 tevent_req_data(req, struct writev_state);
355         ssize_t written;
356         bool ok;
357
358         if ((state->flags & TEVENT_FD_READ) && (flags & TEVENT_FD_READ)) {
359                 int ret, value;
360
361                 if (state->err_on_readability) {
362                         /* Readable and the caller wants an error on read. */
363                         tevent_req_error(req, EPIPE);
364                         return;
365                 }
366
367                 /* Might be an error. Check if there are bytes to read */
368                 ret = ioctl(state->fd, FIONREAD, &value);
369                 /* FIXME - should we also check
370                    for ret == 0 and value == 0 here ? */
371                 if (ret == -1) {
372                         /* There's an error. */
373                         tevent_req_error(req, EPIPE);
374                         return;
375                 }
376                 /* A request for TEVENT_FD_READ will succeed from now and
377                    forevermore until the bytes are read so if there was
378                    an error we'll wait until we do read, then get it in
379                    the read callback function. Until then, remove TEVENT_FD_READ
380                    from the flags we're waiting for. */
381                 state->flags &= ~TEVENT_FD_READ;
382                 TEVENT_FD_NOT_READABLE(fde);
383
384                 /* If not writable, we're done. */
385                 if (!(flags & TEVENT_FD_WRITE)) {
386                         return;
387                 }
388         }
389
390         written = writev(state->fd, state->iov, state->count);
391         if ((written == -1) && (errno == EINTR)) {
392                 /* retry */
393                 return;
394         }
395         if (written == -1) {
396                 tevent_req_error(req, errno);
397                 return;
398         }
399         if (written == 0) {
400                 tevent_req_error(req, EPIPE);
401                 return;
402         }
403         state->total_size += written;
404
405         ok = iov_advance(&state->iov, &state->count, written);
406         if (!ok) {
407                 tevent_req_error(req, EIO);
408                 return;
409         }
410
411         if (state->count == 0) {
412                 tevent_req_done(req);
413                 return;
414         }
415 }
416
417 ssize_t writev_recv(struct tevent_req *req, int *perrno)
418 {
419         struct writev_state *state =
420                 tevent_req_data(req, struct writev_state);
421         ssize_t ret;
422
423         if (tevent_req_is_unix_error(req, perrno)) {
424                 tevent_req_received(req);
425                 return -1;
426         }
427         ret = state->total_size;
428         tevent_req_received(req);
429         return ret;
430 }
431
432 struct read_packet_state {
433         int fd;
434         struct tevent_fd *fde;
435         uint8_t *buf;
436         size_t nread;
437         ssize_t (*more)(uint8_t *buf, size_t buflen, void *private_data);
438         void *private_data;
439 };
440
441 static void read_packet_cleanup(struct tevent_req *req,
442                                  enum tevent_req_state req_state);
443 static void read_packet_handler(struct tevent_context *ev,
444                                 struct tevent_fd *fde,
445                                 uint16_t flags, void *private_data);
446
447 struct tevent_req *read_packet_send(TALLOC_CTX *mem_ctx,
448                                     struct tevent_context *ev,
449                                     int fd, size_t initial,
450                                     ssize_t (*more)(uint8_t *buf,
451                                                     size_t buflen,
452                                                     void *private_data),
453                                     void *private_data)
454 {
455         struct tevent_req *req;
456         struct read_packet_state *state;
457
458         req = tevent_req_create(mem_ctx, &state, struct read_packet_state);
459         if (req == NULL) {
460                 return NULL;
461         }
462         state->fd = fd;
463         state->nread = 0;
464         state->more = more;
465         state->private_data = private_data;
466
467         tevent_req_set_cleanup_fn(req, read_packet_cleanup);
468
469         state->buf = talloc_array(state, uint8_t, initial);
470         if (tevent_req_nomem(state->buf, req)) {
471                 return tevent_req_post(req, ev);
472         }
473
474         state->fde = tevent_add_fd(ev, state, fd,
475                                    TEVENT_FD_READ, read_packet_handler,
476                                    req);
477         if (tevent_req_nomem(state->fde, req)) {
478                 return tevent_req_post(req, ev);
479         }
480         return req;
481 }
482
483 static void read_packet_cleanup(struct tevent_req *req,
484                            enum tevent_req_state req_state)
485 {
486         struct read_packet_state *state =
487                 tevent_req_data(req, struct read_packet_state);
488
489         TALLOC_FREE(state->fde);
490 }
491
492 static void read_packet_handler(struct tevent_context *ev,
493                                 struct tevent_fd *fde,
494                                 uint16_t flags, void *private_data)
495 {
496         struct tevent_req *req = talloc_get_type_abort(
497                 private_data, struct tevent_req);
498         struct read_packet_state *state =
499                 tevent_req_data(req, struct read_packet_state);
500         size_t total = talloc_get_size(state->buf);
501         ssize_t nread, more;
502         uint8_t *tmp;
503
504         nread = recv(state->fd, state->buf+state->nread, total-state->nread,
505                      0);
506         if ((nread == -1) && (errno == ENOTSOCK)) {
507                 nread = read(state->fd, state->buf+state->nread,
508                              total-state->nread);
509         }
510         if ((nread == -1) && (errno == EINTR)) {
511                 /* retry */
512                 return;
513         }
514         if (nread == -1) {
515                 tevent_req_error(req, errno);
516                 return;
517         }
518         if (nread == 0) {
519                 tevent_req_error(req, EPIPE);
520                 return;
521         }
522
523         state->nread += nread;
524         if (state->nread < total) {
525                 /* Come back later */
526                 return;
527         }
528
529         /*
530          * We got what was initially requested. See if "more" asks for -- more.
531          */
532         if (state->more == NULL) {
533                 /* Nobody to ask, this is a async read_data */
534                 tevent_req_done(req);
535                 return;
536         }
537
538         more = state->more(state->buf, total, state->private_data);
539         if (more == -1) {
540                 /* We got an invalid packet, tell the caller */
541                 tevent_req_error(req, EIO);
542                 return;
543         }
544         if (more == 0) {
545                 /* We're done, full packet received */
546                 tevent_req_done(req);
547                 return;
548         }
549
550         if (total + more < total) {
551                 tevent_req_error(req, EMSGSIZE);
552                 return;
553         }
554
555         tmp = talloc_realloc(state, state->buf, uint8_t, total+more);
556         if (tevent_req_nomem(tmp, req)) {
557                 return;
558         }
559         state->buf = tmp;
560 }
561
562 ssize_t read_packet_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
563                          uint8_t **pbuf, int *perrno)
564 {
565         struct read_packet_state *state =
566                 tevent_req_data(req, struct read_packet_state);
567
568         if (tevent_req_is_unix_error(req, perrno)) {
569                 tevent_req_received(req);
570                 return -1;
571         }
572         *pbuf = talloc_move(mem_ctx, &state->buf);
573         tevent_req_received(req);
574         return talloc_get_size(*pbuf);
575 }
576
577 struct wait_for_read_state {
578         struct tevent_fd *fde;
579         int fd;
580         bool check_errors;
581 };
582
583 static void wait_for_read_cleanup(struct tevent_req *req,
584                                   enum tevent_req_state req_state);
585 static void wait_for_read_done(struct tevent_context *ev,
586                                struct tevent_fd *fde,
587                                uint16_t flags,
588                                void *private_data);
589
590 struct tevent_req *wait_for_read_send(TALLOC_CTX *mem_ctx,
591                                       struct tevent_context *ev, int fd,
592                                       bool check_errors)
593 {
594         struct tevent_req *req;
595         struct wait_for_read_state *state;
596
597         req = tevent_req_create(mem_ctx, &state, struct wait_for_read_state);
598         if (req == NULL) {
599                 return NULL;
600         }
601
602         tevent_req_set_cleanup_fn(req, wait_for_read_cleanup);
603
604         state->fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ,
605                                    wait_for_read_done, req);
606         if (tevent_req_nomem(state->fde, req)) {
607                 return tevent_req_post(req, ev);
608         }
609
610         state->fd = fd;
611         state->check_errors = check_errors;
612         return req;
613 }
614
615 static void wait_for_read_cleanup(struct tevent_req *req,
616                                   enum tevent_req_state req_state)
617 {
618         struct wait_for_read_state *state =
619                 tevent_req_data(req, struct wait_for_read_state);
620
621         TALLOC_FREE(state->fde);
622 }
623
624 static void wait_for_read_done(struct tevent_context *ev,
625                                struct tevent_fd *fde,
626                                uint16_t flags,
627                                void *private_data)
628 {
629         struct tevent_req *req = talloc_get_type_abort(
630                 private_data, struct tevent_req);
631         struct wait_for_read_state *state =
632             tevent_req_data(req, struct wait_for_read_state);
633         ssize_t nread;
634         char c;
635
636         if ((flags & TEVENT_FD_READ) == 0) {
637                 return;
638         }
639
640         if (!state->check_errors) {
641                 tevent_req_done(req);
642                 return;
643         }
644
645         nread = recv(state->fd, &c, 1, MSG_PEEK);
646
647         if (nread == 0) {
648                 tevent_req_error(req, EPIPE);
649                 return;
650         }
651
652         if ((nread == -1) && (errno == EINTR)) {
653                 /* come back later */
654                 return;
655         }
656
657         if ((nread == -1) && (errno == ENOTSOCK)) {
658                 /* Ignore this specific error on pipes */
659                 tevent_req_done(req);
660                 return;
661         }
662
663         if (nread == -1) {
664                 tevent_req_error(req, errno);
665                 return;
666         }
667
668         tevent_req_done(req);
669 }
670
671 bool wait_for_read_recv(struct tevent_req *req, int *perr)
672 {
673         int err = tevent_req_simple_recv_unix(req);
674
675         if (err != 0) {
676                 *perr = err;
677                 return false;
678         }
679
680         return true;
681 }
682
683 struct accept_state {
684         struct tevent_fd *fde;
685         int listen_sock;
686         socklen_t addrlen;
687         struct sockaddr_storage addr;
688         int sock;
689 };
690
691 static void accept_handler(struct tevent_context *ev, struct tevent_fd *fde,
692                            uint16_t flags, void *private_data);
693
694 struct tevent_req *accept_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
695                                int listen_sock)
696 {
697         struct tevent_req *req;
698         struct accept_state *state;
699
700         req = tevent_req_create(mem_ctx, &state, struct accept_state);
701         if (req == NULL) {
702                 return NULL;
703         }
704
705         state->listen_sock = listen_sock;
706
707         state->fde = tevent_add_fd(ev, state, listen_sock, TEVENT_FD_READ,
708                                    accept_handler, req);
709         if (tevent_req_nomem(state->fde, req)) {
710                 return tevent_req_post(req, ev);
711         }
712         return req;
713 }
714
715 static void accept_handler(struct tevent_context *ev, struct tevent_fd *fde,
716                            uint16_t flags, void *private_data)
717 {
718         struct tevent_req *req = talloc_get_type_abort(
719                 private_data, struct tevent_req);
720         struct accept_state *state = tevent_req_data(req, struct accept_state);
721         int ret;
722
723         TALLOC_FREE(state->fde);
724
725         if ((flags & TEVENT_FD_READ) == 0) {
726                 tevent_req_error(req, EIO);
727                 return;
728         }
729         state->addrlen = sizeof(state->addr);
730
731         ret = accept(state->listen_sock, (struct sockaddr *)&state->addr,
732                      &state->addrlen);
733         if ((ret == -1) && (errno == EINTR)) {
734                 /* retry */
735                 return;
736         }
737         if (ret == -1) {
738                 tevent_req_error(req, errno);
739                 return;
740         }
741         smb_set_close_on_exec(ret);
742         state->sock = ret;
743         tevent_req_done(req);
744 }
745
746 int accept_recv(struct tevent_req *req, struct sockaddr_storage *paddr,
747                 socklen_t *paddrlen, int *perr)
748 {
749         struct accept_state *state = tevent_req_data(req, struct accept_state);
750         int err;
751
752         if (tevent_req_is_unix_error(req, &err)) {
753                 if (perr != NULL) {
754                         *perr = err;
755                 }
756                 return -1;
757         }
758         if (paddr != NULL) {
759                 memcpy(paddr, &state->addr, state->addrlen);
760         }
761         if (paddrlen != NULL) {
762                 *paddrlen = state->addrlen;
763         }
764         return state->sock;
765 }