d0cb06b0638258030e2e3791acb79e263467fd5b
[samba.git] / lib / async_req / async_sock.c
1 /*
2    Unix SMB/CIFS implementation.
3    async socket syscalls
4    Copyright (C) Volker Lendecke 2008
5
6      ** NOTE! The following LGPL license applies to the async_sock
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Library General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public License
21    along with this program.  If not, see <http://www.gnu.org/licenses/>.
22 */
23
24 #include "replace.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include <talloc.h>
28 #include <tevent.h>
29 #include "lib/async_req/async_sock.h"
30 #include "lib/util/iov_buf.h"
31
32 /* Note: lib/util/ is currently GPL */
33 #include "lib/util/tevent_unix.h"
34 #include "lib/util/samba_util.h"
35
36 struct async_connect_state {
37         int fd;
38         struct tevent_fd *fde;
39         int result;
40         long old_sockflags;
41         socklen_t address_len;
42         struct sockaddr_storage address;
43
44         void (*before_connect)(void *private_data);
45         void (*after_connect)(void *private_data);
46         void *private_data;
47 };
48
49 static void async_connect_cleanup(struct tevent_req *req,
50                                   enum tevent_req_state req_state);
51 static void async_connect_connected(struct tevent_context *ev,
52                                     struct tevent_fd *fde, uint16_t flags,
53                                     void *priv);
54
55 /**
56  * @brief async version of connect(2)
57  * @param[in] mem_ctx   The memory context to hang the result off
58  * @param[in] ev        The event context to work from
59  * @param[in] fd        The socket to recv from
60  * @param[in] address   Where to connect?
61  * @param[in] address_len Length of *address
62  * @retval The async request
63  *
64  * This function sets the socket into non-blocking state to be able to call
65  * connect in an async state. This will be reset when the request is finished.
66  */
67
68 struct tevent_req *async_connect_send(
69         TALLOC_CTX *mem_ctx, struct tevent_context *ev, int fd,
70         const struct sockaddr *address, socklen_t address_len,
71         void (*before_connect)(void *private_data),
72         void (*after_connect)(void *private_data),
73         void *private_data)
74 {
75         struct tevent_req *req;
76         struct async_connect_state *state;
77         int ret;
78
79         req = tevent_req_create(mem_ctx, &state, struct async_connect_state);
80         if (req == NULL) {
81                 return NULL;
82         }
83
84         /**
85          * We have to set the socket to nonblocking for async connect(2). Keep
86          * the old sockflags around.
87          */
88
89         state->fd = fd;
90         state->before_connect = before_connect;
91         state->after_connect = after_connect;
92         state->private_data = private_data;
93
94         state->old_sockflags = fcntl(fd, F_GETFL, 0);
95         if (state->old_sockflags == -1) {
96                 tevent_req_error(req, errno);
97                 return tevent_req_post(req, ev);
98         }
99
100         tevent_req_set_cleanup_fn(req, async_connect_cleanup);
101
102         state->address_len = address_len;
103         if (address_len > sizeof(state->address)) {
104                 tevent_req_error(req, EINVAL);
105                 return tevent_req_post(req, ev);
106         }
107         memcpy(&state->address, address, address_len);
108
109         ret = set_blocking(fd, false);
110         if (ret == -1) {
111                 tevent_req_error(req, errno);
112                 return tevent_req_post(req, ev);
113         }
114
115         if (state->before_connect != NULL) {
116                 state->before_connect(state->private_data);
117         }
118
119         state->result = connect(fd, address, address_len);
120
121         if (state->after_connect != NULL) {
122                 state->after_connect(state->private_data);
123         }
124
125         if (state->result == 0) {
126                 tevent_req_done(req);
127                 return tevent_req_post(req, ev);
128         }
129
130         /*
131          * The only errno indicating that an initial connect is still
132          * in flight is EINPROGRESS.
133          *
134          * We get EALREADY when someone calls us a second time for a
135          * given fd and the connect is still in flight (and returned
136          * EINPROGRESS the first time).
137          *
138          * This allows callers like open_socket_out_send() to reuse
139          * fds and call us with an fd for which the connect is still
140          * in flight. The proper thing to do for callers would be
141          * closing the fd and starting from scratch with a fresh
142          * socket.
143          */
144
145         if (errno != EINPROGRESS && errno != EALREADY) {
146                 tevent_req_error(req, errno);
147                 return tevent_req_post(req, ev);
148         }
149
150         /*
151          * Note for historic reasons TEVENT_FD_WRITE is not enough
152          * to get notified for POLLERR or EPOLLHUP even if they
153          * come together with POLLOUT. That means we need to
154          * use TEVENT_FD_READ in addition until we have
155          * TEVENT_FD_ERROR.
156          */
157         state->fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ|TEVENT_FD_WRITE,
158                                    async_connect_connected, req);
159         if (state->fde == NULL) {
160                 tevent_req_error(req, ENOMEM);
161                 return tevent_req_post(req, ev);
162         }
163         return req;
164 }
165
166 static void async_connect_cleanup(struct tevent_req *req,
167                                   enum tevent_req_state req_state)
168 {
169         struct async_connect_state *state =
170                 tevent_req_data(req, struct async_connect_state);
171
172         TALLOC_FREE(state->fde);
173         if (state->fd != -1) {
174                 int ret;
175
176                 ret = fcntl(state->fd, F_SETFL, state->old_sockflags);
177                 if (ret == -1) {
178                         abort();
179                 }
180
181                 state->fd = -1;
182         }
183 }
184
185 /**
186  * fde event handler for connect(2)
187  * @param[in] ev        The event context that sent us here
188  * @param[in] fde       The file descriptor event associated with the connect
189  * @param[in] flags     Indicate read/writeability of the socket
190  * @param[in] priv      private data, "struct async_req *" in this case
191  */
192
193 static void async_connect_connected(struct tevent_context *ev,
194                                     struct tevent_fd *fde, uint16_t flags,
195                                     void *priv)
196 {
197         struct tevent_req *req = talloc_get_type_abort(
198                 priv, struct tevent_req);
199         struct async_connect_state *state =
200                 tevent_req_data(req, struct async_connect_state);
201         int ret;
202         int socket_error = 0;
203         socklen_t slen = sizeof(socket_error);
204
205         ret = getsockopt(state->fd, SOL_SOCKET, SO_ERROR,
206                          &socket_error, &slen);
207
208         if (ret != 0) {
209                 /*
210                  * According to Stevens this is the Solaris behaviour
211                  * in case the connection encountered an error:
212                  * getsockopt() fails, error is in errno
213                  */
214                 tevent_req_error(req, errno);
215                 return;
216         }
217
218         if (socket_error != 0) {
219                 /*
220                  * Berkeley derived implementations (including) Linux
221                  * return the pending error via socket_error.
222                  */
223                 tevent_req_error(req, socket_error);
224                 return;
225         }
226
227         tevent_req_done(req);
228         return;
229 }
230
231 int async_connect_recv(struct tevent_req *req, int *perrno)
232 {
233         int err = tevent_req_simple_recv_unix(req);
234
235         if (err != 0) {
236                 *perrno = err;
237                 return -1;
238         }
239
240         return 0;
241 }
242
243 struct writev_state {
244         struct tevent_context *ev;
245         struct tevent_queue_entry *queue_entry;
246         int fd;
247         struct tevent_fd *fde;
248         struct iovec *iov;
249         int count;
250         size_t total_size;
251         uint16_t flags;
252         bool err_on_readability;
253 };
254
255 static void writev_cleanup(struct tevent_req *req,
256                            enum tevent_req_state req_state);
257 static bool writev_cancel(struct tevent_req *req);
258 static void writev_trigger(struct tevent_req *req, void *private_data);
259 static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde,
260                            uint16_t flags, void *private_data);
261
262 struct tevent_req *writev_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
263                                struct tevent_queue *queue, int fd,
264                                bool err_on_readability,
265                                struct iovec *iov, int count)
266 {
267         struct tevent_req *req;
268         struct writev_state *state;
269
270         req = tevent_req_create(mem_ctx, &state, struct writev_state);
271         if (req == NULL) {
272                 return NULL;
273         }
274         state->ev = ev;
275         state->fd = fd;
276         state->total_size = 0;
277         state->count = count;
278         state->iov = (struct iovec *)talloc_memdup(
279                 state, iov, sizeof(struct iovec) * count);
280         if (tevent_req_nomem(state->iov, req)) {
281                 return tevent_req_post(req, ev);
282         }
283         state->flags = TEVENT_FD_WRITE|TEVENT_FD_READ;
284         state->err_on_readability = err_on_readability;
285
286         tevent_req_set_cleanup_fn(req, writev_cleanup);
287         tevent_req_set_cancel_fn(req, writev_cancel);
288
289         if (queue == NULL) {
290                 state->fde = tevent_add_fd(state->ev, state, state->fd,
291                                     state->flags, writev_handler, req);
292                 if (tevent_req_nomem(state->fde, req)) {
293                         return tevent_req_post(req, ev);
294                 }
295                 return req;
296         }
297
298         state->queue_entry = tevent_queue_add_entry(queue, ev, req,
299                                                     writev_trigger, NULL);
300         if (tevent_req_nomem(state->queue_entry, req)) {
301                 return tevent_req_post(req, ev);
302         }
303         return req;
304 }
305
306 static void writev_cleanup(struct tevent_req *req,
307                            enum tevent_req_state req_state)
308 {
309         struct writev_state *state = tevent_req_data(req, struct writev_state);
310
311         TALLOC_FREE(state->queue_entry);
312         TALLOC_FREE(state->fde);
313 }
314
315 static bool writev_cancel(struct tevent_req *req)
316 {
317         struct writev_state *state = tevent_req_data(req, struct writev_state);
318
319         TALLOC_FREE(state->queue_entry);
320         TALLOC_FREE(state->fde);
321
322         if (state->count == 0) {
323                 /*
324                  * already completed.
325                  */
326                 return false;
327         }
328
329         tevent_req_defer_callback(req, state->ev);
330         if (state->total_size > 0) {
331                 /*
332                  * We've already started to write :-(
333                  */
334                 tevent_req_error(req, EIO);
335                 return false;
336         }
337
338         tevent_req_error(req, ECANCELED);
339         return true;
340 }
341
342 static void writev_trigger(struct tevent_req *req, void *private_data)
343 {
344         struct writev_state *state = tevent_req_data(req, struct writev_state);
345
346         state->queue_entry = NULL;
347
348         state->fde = tevent_add_fd(state->ev, state, state->fd, state->flags,
349                             writev_handler, req);
350         if (tevent_req_nomem(state->fde, req)) {
351                 return;
352         }
353 }
354
355 static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde,
356                            uint16_t flags, void *private_data)
357 {
358         struct tevent_req *req = talloc_get_type_abort(
359                 private_data, struct tevent_req);
360         struct writev_state *state =
361                 tevent_req_data(req, struct writev_state);
362         ssize_t written;
363         bool ok;
364
365         if ((state->flags & TEVENT_FD_READ) && (flags & TEVENT_FD_READ)) {
366                 int ret, value;
367
368                 if (state->err_on_readability) {
369                         /* Readable and the caller wants an error on read. */
370                         tevent_req_error(req, EPIPE);
371                         return;
372                 }
373
374                 /* Might be an error. Check if there are bytes to read */
375                 ret = ioctl(state->fd, FIONREAD, &value);
376                 /* FIXME - should we also check
377                    for ret == 0 and value == 0 here ? */
378                 if (ret == -1) {
379                         /* There's an error. */
380                         tevent_req_error(req, EPIPE);
381                         return;
382                 }
383                 /* A request for TEVENT_FD_READ will succeed from now and
384                    forevermore until the bytes are read so if there was
385                    an error we'll wait until we do read, then get it in
386                    the read callback function. Until then, remove TEVENT_FD_READ
387                    from the flags we're waiting for. */
388                 state->flags &= ~TEVENT_FD_READ;
389                 TEVENT_FD_NOT_READABLE(fde);
390
391                 /* If not writable, we're done. */
392                 if (!(flags & TEVENT_FD_WRITE)) {
393                         return;
394                 }
395         }
396
397         written = writev(state->fd, state->iov, state->count);
398         if ((written == -1) && (errno == EINTR)) {
399                 /* retry */
400                 return;
401         }
402         if (written == -1) {
403                 tevent_req_error(req, errno);
404                 return;
405         }
406         if (written == 0) {
407                 tevent_req_error(req, EPIPE);
408                 return;
409         }
410         state->total_size += written;
411
412         ok = iov_advance(&state->iov, &state->count, written);
413         if (!ok) {
414                 tevent_req_error(req, EIO);
415                 return;
416         }
417
418         if (state->count == 0) {
419                 tevent_req_done(req);
420                 return;
421         }
422 }
423
424 ssize_t writev_recv(struct tevent_req *req, int *perrno)
425 {
426         struct writev_state *state =
427                 tevent_req_data(req, struct writev_state);
428         ssize_t ret;
429
430         if (tevent_req_is_unix_error(req, perrno)) {
431                 tevent_req_received(req);
432                 return -1;
433         }
434         ret = state->total_size;
435         tevent_req_received(req);
436         return ret;
437 }
438
439 struct read_packet_state {
440         int fd;
441         struct tevent_fd *fde;
442         uint8_t *buf;
443         size_t nread;
444         ssize_t (*more)(uint8_t *buf, size_t buflen, void *private_data);
445         void *private_data;
446 };
447
448 static void read_packet_cleanup(struct tevent_req *req,
449                                  enum tevent_req_state req_state);
450 static void read_packet_handler(struct tevent_context *ev,
451                                 struct tevent_fd *fde,
452                                 uint16_t flags, void *private_data);
453
454 struct tevent_req *read_packet_send(TALLOC_CTX *mem_ctx,
455                                     struct tevent_context *ev,
456                                     int fd, size_t initial,
457                                     ssize_t (*more)(uint8_t *buf,
458                                                     size_t buflen,
459                                                     void *private_data),
460                                     void *private_data)
461 {
462         struct tevent_req *req;
463         struct read_packet_state *state;
464
465         req = tevent_req_create(mem_ctx, &state, struct read_packet_state);
466         if (req == NULL) {
467                 return NULL;
468         }
469         state->fd = fd;
470         state->nread = 0;
471         state->more = more;
472         state->private_data = private_data;
473
474         tevent_req_set_cleanup_fn(req, read_packet_cleanup);
475
476         state->buf = talloc_array(state, uint8_t, initial);
477         if (tevent_req_nomem(state->buf, req)) {
478                 return tevent_req_post(req, ev);
479         }
480
481         state->fde = tevent_add_fd(ev, state, fd,
482                                    TEVENT_FD_READ, read_packet_handler,
483                                    req);
484         if (tevent_req_nomem(state->fde, req)) {
485                 return tevent_req_post(req, ev);
486         }
487         return req;
488 }
489
490 static void read_packet_cleanup(struct tevent_req *req,
491                            enum tevent_req_state req_state)
492 {
493         struct read_packet_state *state =
494                 tevent_req_data(req, struct read_packet_state);
495
496         TALLOC_FREE(state->fde);
497 }
498
499 static void read_packet_handler(struct tevent_context *ev,
500                                 struct tevent_fd *fde,
501                                 uint16_t flags, void *private_data)
502 {
503         struct tevent_req *req = talloc_get_type_abort(
504                 private_data, struct tevent_req);
505         struct read_packet_state *state =
506                 tevent_req_data(req, struct read_packet_state);
507         size_t total = talloc_get_size(state->buf);
508         ssize_t nread, more;
509         uint8_t *tmp;
510
511         nread = recv(state->fd, state->buf+state->nread, total-state->nread,
512                      0);
513         if ((nread == -1) && (errno == ENOTSOCK)) {
514                 nread = read(state->fd, state->buf+state->nread,
515                              total-state->nread);
516         }
517         if ((nread == -1) && (errno == EINTR)) {
518                 /* retry */
519                 return;
520         }
521         if (nread == -1) {
522                 tevent_req_error(req, errno);
523                 return;
524         }
525         if (nread == 0) {
526                 tevent_req_error(req, EPIPE);
527                 return;
528         }
529
530         state->nread += nread;
531         if (state->nread < total) {
532                 /* Come back later */
533                 return;
534         }
535
536         /*
537          * We got what was initially requested. See if "more" asks for -- more.
538          */
539         if (state->more == NULL) {
540                 /* Nobody to ask, this is a async read_data */
541                 tevent_req_done(req);
542                 return;
543         }
544
545         more = state->more(state->buf, total, state->private_data);
546         if (more == -1) {
547                 /* We got an invalid packet, tell the caller */
548                 tevent_req_error(req, EIO);
549                 return;
550         }
551         if (more == 0) {
552                 /* We're done, full packet received */
553                 tevent_req_done(req);
554                 return;
555         }
556
557         if (total + more < total) {
558                 tevent_req_error(req, EMSGSIZE);
559                 return;
560         }
561
562         tmp = talloc_realloc(state, state->buf, uint8_t, total+more);
563         if (tevent_req_nomem(tmp, req)) {
564                 return;
565         }
566         state->buf = tmp;
567 }
568
569 ssize_t read_packet_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
570                          uint8_t **pbuf, int *perrno)
571 {
572         struct read_packet_state *state =
573                 tevent_req_data(req, struct read_packet_state);
574
575         if (tevent_req_is_unix_error(req, perrno)) {
576                 tevent_req_received(req);
577                 return -1;
578         }
579         *pbuf = talloc_move(mem_ctx, &state->buf);
580         tevent_req_received(req);
581         return talloc_get_size(*pbuf);
582 }
583
584 struct wait_for_read_state {
585         struct tevent_fd *fde;
586         int fd;
587         bool check_errors;
588 };
589
590 static void wait_for_read_cleanup(struct tevent_req *req,
591                                   enum tevent_req_state req_state);
592 static void wait_for_read_done(struct tevent_context *ev,
593                                struct tevent_fd *fde,
594                                uint16_t flags,
595                                void *private_data);
596
597 struct tevent_req *wait_for_read_send(TALLOC_CTX *mem_ctx,
598                                       struct tevent_context *ev, int fd,
599                                       bool check_errors)
600 {
601         struct tevent_req *req;
602         struct wait_for_read_state *state;
603
604         req = tevent_req_create(mem_ctx, &state, struct wait_for_read_state);
605         if (req == NULL) {
606                 return NULL;
607         }
608
609         tevent_req_set_cleanup_fn(req, wait_for_read_cleanup);
610
611         state->fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ,
612                                    wait_for_read_done, req);
613         if (tevent_req_nomem(state->fde, req)) {
614                 return tevent_req_post(req, ev);
615         }
616
617         state->fd = fd;
618         state->check_errors = check_errors;
619         return req;
620 }
621
622 static void wait_for_read_cleanup(struct tevent_req *req,
623                                   enum tevent_req_state req_state)
624 {
625         struct wait_for_read_state *state =
626                 tevent_req_data(req, struct wait_for_read_state);
627
628         TALLOC_FREE(state->fde);
629 }
630
631 static void wait_for_read_done(struct tevent_context *ev,
632                                struct tevent_fd *fde,
633                                uint16_t flags,
634                                void *private_data)
635 {
636         struct tevent_req *req = talloc_get_type_abort(
637                 private_data, struct tevent_req);
638         struct wait_for_read_state *state =
639             tevent_req_data(req, struct wait_for_read_state);
640         ssize_t nread;
641         char c;
642
643         if ((flags & TEVENT_FD_READ) == 0) {
644                 return;
645         }
646
647         if (!state->check_errors) {
648                 tevent_req_done(req);
649                 return;
650         }
651
652         nread = recv(state->fd, &c, 1, MSG_PEEK);
653
654         if (nread == 0) {
655                 tevent_req_error(req, EPIPE);
656                 return;
657         }
658
659         if ((nread == -1) && (errno == EINTR)) {
660                 /* come back later */
661                 return;
662         }
663
664         if ((nread == -1) && (errno == ENOTSOCK)) {
665                 /* Ignore this specific error on pipes */
666                 tevent_req_done(req);
667                 return;
668         }
669
670         if (nread == -1) {
671                 tevent_req_error(req, errno);
672                 return;
673         }
674
675         tevent_req_done(req);
676 }
677
678 bool wait_for_read_recv(struct tevent_req *req, int *perr)
679 {
680         int err = tevent_req_simple_recv_unix(req);
681
682         if (err != 0) {
683                 *perr = err;
684                 return false;
685         }
686
687         return true;
688 }
689
690 struct accept_state {
691         struct tevent_fd *fde;
692         int listen_sock;
693         socklen_t addrlen;
694         struct sockaddr_storage addr;
695         int sock;
696 };
697
698 static void accept_handler(struct tevent_context *ev, struct tevent_fd *fde,
699                            uint16_t flags, void *private_data);
700
701 struct tevent_req *accept_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
702                                int listen_sock)
703 {
704         struct tevent_req *req;
705         struct accept_state *state;
706
707         req = tevent_req_create(mem_ctx, &state, struct accept_state);
708         if (req == NULL) {
709                 return NULL;
710         }
711
712         state->listen_sock = listen_sock;
713
714         state->fde = tevent_add_fd(ev, state, listen_sock, TEVENT_FD_READ,
715                                    accept_handler, req);
716         if (tevent_req_nomem(state->fde, req)) {
717                 return tevent_req_post(req, ev);
718         }
719         return req;
720 }
721
722 static void accept_handler(struct tevent_context *ev, struct tevent_fd *fde,
723                            uint16_t flags, void *private_data)
724 {
725         struct tevent_req *req = talloc_get_type_abort(
726                 private_data, struct tevent_req);
727         struct accept_state *state = tevent_req_data(req, struct accept_state);
728         int ret;
729
730         TALLOC_FREE(state->fde);
731
732         if ((flags & TEVENT_FD_READ) == 0) {
733                 tevent_req_error(req, EIO);
734                 return;
735         }
736         state->addrlen = sizeof(state->addr);
737
738         ret = accept(state->listen_sock, (struct sockaddr *)&state->addr,
739                      &state->addrlen);
740         if ((ret == -1) && (errno == EINTR)) {
741                 /* retry */
742                 return;
743         }
744         if (ret == -1) {
745                 tevent_req_error(req, errno);
746                 return;
747         }
748         smb_set_close_on_exec(ret);
749         state->sock = ret;
750         tevent_req_done(req);
751 }
752
753 int accept_recv(struct tevent_req *req, struct sockaddr_storage *paddr,
754                 socklen_t *paddrlen, int *perr)
755 {
756         struct accept_state *state = tevent_req_data(req, struct accept_state);
757         int err;
758
759         if (tevent_req_is_unix_error(req, &err)) {
760                 if (perr != NULL) {
761                         *perr = err;
762                 }
763                 return -1;
764         }
765         if (paddr != NULL) {
766                 memcpy(paddr, &state->addr, state->addrlen);
767         }
768         if (paddrlen != NULL) {
769                 *paddrlen = state->addrlen;
770         }
771         return state->sock;
772 }