selftest: add a test for async_connect_send()
[kai/samba-autobuild/.git] / lib / async_req / async_sock.c
1 /*
2    Unix SMB/CIFS implementation.
3    async socket syscalls
4    Copyright (C) Volker Lendecke 2008
5
6      ** NOTE! The following LGPL license applies to the async_sock
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Library General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public License
21    along with this program.  If not, see <http://www.gnu.org/licenses/>.
22 */
23
24 #include "replace.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include <talloc.h>
28 #include <tevent.h>
29 #include "lib/async_req/async_sock.h"
30 #include "lib/util/iov_buf.h"
31
32 /* Note: lib/util/ is currently GPL */
33 #include "lib/util/tevent_unix.h"
34 #include "lib/util/samba_util.h"
35
36 struct async_connect_state {
37         int fd;
38         struct tevent_fd *fde;
39         int result;
40         long old_sockflags;
41         socklen_t address_len;
42         struct sockaddr_storage address;
43
44         void (*before_connect)(void *private_data);
45         void (*after_connect)(void *private_data);
46         void *private_data;
47 };
48
49 static void async_connect_cleanup(struct tevent_req *req,
50                                   enum tevent_req_state req_state);
51 static void async_connect_connected(struct tevent_context *ev,
52                                     struct tevent_fd *fde, uint16_t flags,
53                                     void *priv);
54
55 /**
56  * @brief async version of connect(2)
57  * @param[in] mem_ctx   The memory context to hang the result off
58  * @param[in] ev        The event context to work from
59  * @param[in] fd        The socket to recv from
60  * @param[in] address   Where to connect?
61  * @param[in] address_len Length of *address
62  * @retval The async request
63  *
64  * This function sets the socket into non-blocking state to be able to call
65  * connect in an async state. This will be reset when the request is finished.
66  */
67
68 struct tevent_req *async_connect_send(
69         TALLOC_CTX *mem_ctx, struct tevent_context *ev, int fd,
70         const struct sockaddr *address, socklen_t address_len,
71         void (*before_connect)(void *private_data),
72         void (*after_connect)(void *private_data),
73         void *private_data)
74 {
75         struct tevent_req *req;
76         struct async_connect_state *state;
77         int ret;
78
79         req = tevent_req_create(mem_ctx, &state, struct async_connect_state);
80         if (req == NULL) {
81                 return NULL;
82         }
83
84         /**
85          * We have to set the socket to nonblocking for async connect(2). Keep
86          * the old sockflags around.
87          */
88
89         state->fd = fd;
90         state->before_connect = before_connect;
91         state->after_connect = after_connect;
92         state->private_data = private_data;
93
94         state->old_sockflags = fcntl(fd, F_GETFL, 0);
95         if (state->old_sockflags == -1) {
96                 tevent_req_error(req, errno);
97                 return tevent_req_post(req, ev);
98         }
99
100         tevent_req_set_cleanup_fn(req, async_connect_cleanup);
101
102         state->address_len = address_len;
103         if (address_len > sizeof(state->address)) {
104                 tevent_req_error(req, EINVAL);
105                 return tevent_req_post(req, ev);
106         }
107         memcpy(&state->address, address, address_len);
108
109         ret = set_blocking(fd, false);
110         if (ret == -1) {
111                 tevent_req_error(req, errno);
112                 return tevent_req_post(req, ev);
113         }
114
115         if (state->before_connect != NULL) {
116                 state->before_connect(state->private_data);
117         }
118
119         state->result = connect(fd, address, address_len);
120
121         if (state->after_connect != NULL) {
122                 state->after_connect(state->private_data);
123         }
124
125         if (state->result == 0) {
126                 tevent_req_done(req);
127                 return tevent_req_post(req, ev);
128         }
129
130         /**
131          * A number of error messages show that something good is progressing
132          * and that we have to wait for readability.
133          *
134          * If none of them are present, bail out.
135          */
136
137         if (!(errno == EINPROGRESS || errno == EALREADY ||
138 #ifdef EISCONN
139               errno == EISCONN ||
140 #endif
141               errno == EAGAIN || errno == EINTR)) {
142                 tevent_req_error(req, errno);
143                 return tevent_req_post(req, ev);
144         }
145
146         state->fde = tevent_add_fd(ev, state, fd,
147                                    TEVENT_FD_READ | TEVENT_FD_WRITE,
148                                    async_connect_connected, req);
149         if (state->fde == NULL) {
150                 tevent_req_error(req, ENOMEM);
151                 return tevent_req_post(req, ev);
152         }
153         return req;
154 }
155
156 static void async_connect_cleanup(struct tevent_req *req,
157                                   enum tevent_req_state req_state)
158 {
159         struct async_connect_state *state =
160                 tevent_req_data(req, struct async_connect_state);
161
162         TALLOC_FREE(state->fde);
163         if (state->fd != -1) {
164                 int ret;
165
166                 ret = fcntl(state->fd, F_SETFL, state->old_sockflags);
167                 if (ret == -1) {
168                         abort();
169                 }
170
171                 state->fd = -1;
172         }
173 }
174
175 /**
176  * fde event handler for connect(2)
177  * @param[in] ev        The event context that sent us here
178  * @param[in] fde       The file descriptor event associated with the connect
179  * @param[in] flags     Indicate read/writeability of the socket
180  * @param[in] priv      private data, "struct async_req *" in this case
181  */
182
183 static void async_connect_connected(struct tevent_context *ev,
184                                     struct tevent_fd *fde, uint16_t flags,
185                                     void *priv)
186 {
187         struct tevent_req *req = talloc_get_type_abort(
188                 priv, struct tevent_req);
189         struct async_connect_state *state =
190                 tevent_req_data(req, struct async_connect_state);
191         int ret;
192
193         if (state->before_connect != NULL) {
194                 state->before_connect(state->private_data);
195         }
196
197         ret = connect(state->fd, (struct sockaddr *)(void *)&state->address,
198                       state->address_len);
199
200         if (state->after_connect != NULL) {
201                 state->after_connect(state->private_data);
202         }
203
204         if (ret == 0) {
205                 tevent_req_done(req);
206                 return;
207         }
208         if (errno == EINPROGRESS) {
209                 /* Try again later, leave the fde around */
210                 return;
211         }
212         tevent_req_error(req, errno);
213         return;
214 }
215
216 int async_connect_recv(struct tevent_req *req, int *perrno)
217 {
218         int err = tevent_req_simple_recv_unix(req);
219
220         if (err != 0) {
221                 *perrno = err;
222                 return -1;
223         }
224
225         return 0;
226 }
227
228 struct writev_state {
229         struct tevent_context *ev;
230         int fd;
231         struct tevent_fd *fde;
232         struct iovec *iov;
233         int count;
234         size_t total_size;
235         uint16_t flags;
236         bool err_on_readability;
237 };
238
239 static void writev_cleanup(struct tevent_req *req,
240                            enum tevent_req_state req_state);
241 static void writev_trigger(struct tevent_req *req, void *private_data);
242 static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde,
243                            uint16_t flags, void *private_data);
244
245 struct tevent_req *writev_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
246                                struct tevent_queue *queue, int fd,
247                                bool err_on_readability,
248                                struct iovec *iov, int count)
249 {
250         struct tevent_req *req;
251         struct writev_state *state;
252
253         req = tevent_req_create(mem_ctx, &state, struct writev_state);
254         if (req == NULL) {
255                 return NULL;
256         }
257         state->ev = ev;
258         state->fd = fd;
259         state->total_size = 0;
260         state->count = count;
261         state->iov = (struct iovec *)talloc_memdup(
262                 state, iov, sizeof(struct iovec) * count);
263         if (tevent_req_nomem(state->iov, req)) {
264                 return tevent_req_post(req, ev);
265         }
266         state->flags = TEVENT_FD_WRITE|TEVENT_FD_READ;
267         state->err_on_readability = err_on_readability;
268
269         tevent_req_set_cleanup_fn(req, writev_cleanup);
270
271         if (queue == NULL) {
272                 state->fde = tevent_add_fd(state->ev, state, state->fd,
273                                     state->flags, writev_handler, req);
274                 if (tevent_req_nomem(state->fde, req)) {
275                         return tevent_req_post(req, ev);
276                 }
277                 return req;
278         }
279
280         if (!tevent_queue_add(queue, ev, req, writev_trigger, NULL)) {
281                 tevent_req_oom(req);
282                 return tevent_req_post(req, ev);
283         }
284         return req;
285 }
286
287 static void writev_cleanup(struct tevent_req *req,
288                            enum tevent_req_state req_state)
289 {
290         struct writev_state *state = tevent_req_data(req, struct writev_state);
291
292         TALLOC_FREE(state->fde);
293 }
294
295 static void writev_trigger(struct tevent_req *req, void *private_data)
296 {
297         struct writev_state *state = tevent_req_data(req, struct writev_state);
298
299         state->fde = tevent_add_fd(state->ev, state, state->fd, state->flags,
300                             writev_handler, req);
301         if (tevent_req_nomem(state->fde, req)) {
302                 return;
303         }
304 }
305
306 static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde,
307                            uint16_t flags, void *private_data)
308 {
309         struct tevent_req *req = talloc_get_type_abort(
310                 private_data, struct tevent_req);
311         struct writev_state *state =
312                 tevent_req_data(req, struct writev_state);
313         size_t written;
314         bool ok;
315
316         if ((state->flags & TEVENT_FD_READ) && (flags & TEVENT_FD_READ)) {
317                 int ret, value;
318
319                 if (state->err_on_readability) {
320                         /* Readable and the caller wants an error on read. */
321                         tevent_req_error(req, EPIPE);
322                         return;
323                 }
324
325                 /* Might be an error. Check if there are bytes to read */
326                 ret = ioctl(state->fd, FIONREAD, &value);
327                 /* FIXME - should we also check
328                    for ret == 0 and value == 0 here ? */
329                 if (ret == -1) {
330                         /* There's an error. */
331                         tevent_req_error(req, EPIPE);
332                         return;
333                 }
334                 /* A request for TEVENT_FD_READ will succeed from now and
335                    forevermore until the bytes are read so if there was
336                    an error we'll wait until we do read, then get it in
337                    the read callback function. Until then, remove TEVENT_FD_READ
338                    from the flags we're waiting for. */
339                 state->flags &= ~TEVENT_FD_READ;
340                 TEVENT_FD_NOT_READABLE(fde);
341
342                 /* If not writable, we're done. */
343                 if (!(flags & TEVENT_FD_WRITE)) {
344                         return;
345                 }
346         }
347
348         written = writev(state->fd, state->iov, state->count);
349         if ((written == -1) && (errno == EINTR)) {
350                 /* retry */
351                 return;
352         }
353         if (written == -1) {
354                 tevent_req_error(req, errno);
355                 return;
356         }
357         if (written == 0) {
358                 tevent_req_error(req, EPIPE);
359                 return;
360         }
361         state->total_size += written;
362
363         ok = iov_advance(&state->iov, &state->count, written);
364         if (!ok) {
365                 tevent_req_error(req, EIO);
366                 return;
367         }
368
369         if (state->count == 0) {
370                 tevent_req_done(req);
371                 return;
372         }
373 }
374
375 ssize_t writev_recv(struct tevent_req *req, int *perrno)
376 {
377         struct writev_state *state =
378                 tevent_req_data(req, struct writev_state);
379         ssize_t ret;
380
381         if (tevent_req_is_unix_error(req, perrno)) {
382                 tevent_req_received(req);
383                 return -1;
384         }
385         ret = state->total_size;
386         tevent_req_received(req);
387         return ret;
388 }
389
390 struct read_packet_state {
391         int fd;
392         struct tevent_fd *fde;
393         uint8_t *buf;
394         size_t nread;
395         ssize_t (*more)(uint8_t *buf, size_t buflen, void *private_data);
396         void *private_data;
397 };
398
399 static void read_packet_cleanup(struct tevent_req *req,
400                                  enum tevent_req_state req_state);
401 static void read_packet_handler(struct tevent_context *ev,
402                                 struct tevent_fd *fde,
403                                 uint16_t flags, void *private_data);
404
405 struct tevent_req *read_packet_send(TALLOC_CTX *mem_ctx,
406                                     struct tevent_context *ev,
407                                     int fd, size_t initial,
408                                     ssize_t (*more)(uint8_t *buf,
409                                                     size_t buflen,
410                                                     void *private_data),
411                                     void *private_data)
412 {
413         struct tevent_req *req;
414         struct read_packet_state *state;
415
416         req = tevent_req_create(mem_ctx, &state, struct read_packet_state);
417         if (req == NULL) {
418                 return NULL;
419         }
420         state->fd = fd;
421         state->nread = 0;
422         state->more = more;
423         state->private_data = private_data;
424
425         tevent_req_set_cleanup_fn(req, read_packet_cleanup);
426
427         state->buf = talloc_array(state, uint8_t, initial);
428         if (tevent_req_nomem(state->buf, req)) {
429                 return tevent_req_post(req, ev);
430         }
431
432         state->fde = tevent_add_fd(ev, state, fd,
433                                    TEVENT_FD_READ, read_packet_handler,
434                                    req);
435         if (tevent_req_nomem(state->fde, req)) {
436                 return tevent_req_post(req, ev);
437         }
438         return req;
439 }
440
441 static void read_packet_cleanup(struct tevent_req *req,
442                            enum tevent_req_state req_state)
443 {
444         struct read_packet_state *state =
445                 tevent_req_data(req, struct read_packet_state);
446
447         TALLOC_FREE(state->fde);
448 }
449
450 static void read_packet_handler(struct tevent_context *ev,
451                                 struct tevent_fd *fde,
452                                 uint16_t flags, void *private_data)
453 {
454         struct tevent_req *req = talloc_get_type_abort(
455                 private_data, struct tevent_req);
456         struct read_packet_state *state =
457                 tevent_req_data(req, struct read_packet_state);
458         size_t total = talloc_get_size(state->buf);
459         ssize_t nread, more;
460         uint8_t *tmp;
461
462         nread = recv(state->fd, state->buf+state->nread, total-state->nread,
463                      0);
464         if ((nread == -1) && (errno == ENOTSOCK)) {
465                 nread = read(state->fd, state->buf+state->nread,
466                              total-state->nread);
467         }
468         if ((nread == -1) && (errno == EINTR)) {
469                 /* retry */
470                 return;
471         }
472         if (nread == -1) {
473                 tevent_req_error(req, errno);
474                 return;
475         }
476         if (nread == 0) {
477                 tevent_req_error(req, EPIPE);
478                 return;
479         }
480
481         state->nread += nread;
482         if (state->nread < total) {
483                 /* Come back later */
484                 return;
485         }
486
487         /*
488          * We got what was initially requested. See if "more" asks for -- more.
489          */
490         if (state->more == NULL) {
491                 /* Nobody to ask, this is a async read_data */
492                 tevent_req_done(req);
493                 return;
494         }
495
496         more = state->more(state->buf, total, state->private_data);
497         if (more == -1) {
498                 /* We got an invalid packet, tell the caller */
499                 tevent_req_error(req, EIO);
500                 return;
501         }
502         if (more == 0) {
503                 /* We're done, full packet received */
504                 tevent_req_done(req);
505                 return;
506         }
507
508         if (total + more < total) {
509                 tevent_req_error(req, EMSGSIZE);
510                 return;
511         }
512
513         tmp = talloc_realloc(state, state->buf, uint8_t, total+more);
514         if (tevent_req_nomem(tmp, req)) {
515                 return;
516         }
517         state->buf = tmp;
518 }
519
520 ssize_t read_packet_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
521                          uint8_t **pbuf, int *perrno)
522 {
523         struct read_packet_state *state =
524                 tevent_req_data(req, struct read_packet_state);
525
526         if (tevent_req_is_unix_error(req, perrno)) {
527                 tevent_req_received(req);
528                 return -1;
529         }
530         *pbuf = talloc_move(mem_ctx, &state->buf);
531         tevent_req_received(req);
532         return talloc_get_size(*pbuf);
533 }
534
535 struct wait_for_read_state {
536         struct tevent_fd *fde;
537         int fd;
538         bool check_errors;
539 };
540
541 static void wait_for_read_cleanup(struct tevent_req *req,
542                                   enum tevent_req_state req_state);
543 static void wait_for_read_done(struct tevent_context *ev,
544                                struct tevent_fd *fde,
545                                uint16_t flags,
546                                void *private_data);
547
548 struct tevent_req *wait_for_read_send(TALLOC_CTX *mem_ctx,
549                                       struct tevent_context *ev, int fd,
550                                       bool check_errors)
551 {
552         struct tevent_req *req;
553         struct wait_for_read_state *state;
554
555         req = tevent_req_create(mem_ctx, &state, struct wait_for_read_state);
556         if (req == NULL) {
557                 return NULL;
558         }
559
560         tevent_req_set_cleanup_fn(req, wait_for_read_cleanup);
561
562         state->fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ,
563                                    wait_for_read_done, req);
564         if (tevent_req_nomem(state->fde, req)) {
565                 return tevent_req_post(req, ev);
566         }
567
568         state->fd = fd;
569         state->check_errors = check_errors;
570         return req;
571 }
572
573 static void wait_for_read_cleanup(struct tevent_req *req,
574                                   enum tevent_req_state req_state)
575 {
576         struct wait_for_read_state *state =
577                 tevent_req_data(req, struct wait_for_read_state);
578
579         TALLOC_FREE(state->fde);
580 }
581
582 static void wait_for_read_done(struct tevent_context *ev,
583                                struct tevent_fd *fde,
584                                uint16_t flags,
585                                void *private_data)
586 {
587         struct tevent_req *req = talloc_get_type_abort(
588                 private_data, struct tevent_req);
589         struct wait_for_read_state *state =
590             tevent_req_data(req, struct wait_for_read_state);
591         ssize_t nread;
592         char c;
593
594         if ((flags & TEVENT_FD_READ) == 0) {
595                 return;
596         }
597
598         if (!state->check_errors) {
599                 tevent_req_done(req);
600                 return;
601         }
602
603         nread = recv(state->fd, &c, 1, MSG_PEEK);
604
605         if (nread == 0) {
606                 tevent_req_error(req, EPIPE);
607                 return;
608         }
609
610         if ((nread == -1) && (errno == EINTR)) {
611                 /* come back later */
612                 return;
613         }
614
615         if ((nread == -1) && (errno == ENOTSOCK)) {
616                 /* Ignore this specific error on pipes */
617                 tevent_req_done(req);
618                 return;
619         }
620
621         if (nread == -1) {
622                 tevent_req_error(req, errno);
623                 return;
624         }
625
626         tevent_req_done(req);
627 }
628
629 bool wait_for_read_recv(struct tevent_req *req, int *perr)
630 {
631         int err = tevent_req_simple_recv_unix(req);
632
633         if (err != 0) {
634                 *perr = err;
635                 return false;
636         }
637
638         return true;
639 }