Merge commit 'release-4-0-0alpha15' into master4-tmp
[ab/samba.git/.git] / lib / async_req / async_sock.c
1 /*
2    Unix SMB/CIFS implementation.
3    async socket syscalls
4    Copyright (C) Volker Lendecke 2008
5
6      ** NOTE! The following LGPL license applies to the async_sock
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Library General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public License
21    along with this program.  If not, see <http://www.gnu.org/licenses/>.
22 */
23
24 #include "replace.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include <talloc.h>
28 #include <tevent.h>
29 #include "lib/async_req/async_sock.h"
30
31 /* Note: lib/util/ is currently GPL */
32 #include "lib/util/tevent_unix.h"
33 #include "lib/util/util.h"
34
35 #ifndef TALLOC_FREE
36 #define TALLOC_FREE(ctx) do { talloc_free(ctx); ctx=NULL; } while(0)
37 #endif
38
39 struct sendto_state {
40         int fd;
41         const void *buf;
42         size_t len;
43         int flags;
44         const struct sockaddr_storage *addr;
45         socklen_t addr_len;
46         ssize_t sent;
47 };
48
49 static void sendto_handler(struct tevent_context *ev,
50                                struct tevent_fd *fde,
51                                uint16_t flags, void *private_data);
52
53 struct tevent_req *sendto_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
54                                int fd, const void *buf, size_t len, int flags,
55                                const struct sockaddr_storage *addr)
56 {
57         struct tevent_req *result;
58         struct sendto_state *state;
59         struct tevent_fd *fde;
60
61         result = tevent_req_create(mem_ctx, &state, struct sendto_state);
62         if (result == NULL) {
63                 return result;
64         }
65         state->fd = fd;
66         state->buf = buf;
67         state->len = len;
68         state->flags = flags;
69         state->addr = addr;
70
71         switch (addr->ss_family) {
72         case AF_INET:
73                 state->addr_len = sizeof(struct sockaddr_in);
74                 break;
75 #if defined(HAVE_IPV6)
76         case AF_INET6:
77                 state->addr_len = sizeof(struct sockaddr_in6);
78                 break;
79 #endif
80         case AF_UNIX:
81                 state->addr_len = sizeof(struct sockaddr_un);
82                 break;
83         default:
84                 state->addr_len = sizeof(struct sockaddr_storage);
85                 break;
86         }
87
88         fde = tevent_add_fd(ev, state, fd, TEVENT_FD_WRITE, sendto_handler,
89                             result);
90         if (fde == NULL) {
91                 TALLOC_FREE(result);
92                 return NULL;
93         }
94         return result;
95 }
96
97 static void sendto_handler(struct tevent_context *ev,
98                                struct tevent_fd *fde,
99                                uint16_t flags, void *private_data)
100 {
101         struct tevent_req *req = talloc_get_type_abort(
102                 private_data, struct tevent_req);
103         struct sendto_state *state =
104                 tevent_req_data(req, struct sendto_state);
105
106         state->sent = sendto(state->fd, state->buf, state->len, state->flags,
107                              (const struct sockaddr *)state->addr,
108                              state->addr_len);
109         if ((state->sent == -1) && (errno == EINTR)) {
110                 /* retry */
111                 return;
112         }
113         if (state->sent == -1) {
114                 tevent_req_error(req, errno);
115                 return;
116         }
117         tevent_req_done(req);
118 }
119
120 ssize_t sendto_recv(struct tevent_req *req, int *perrno)
121 {
122         struct sendto_state *state =
123                 tevent_req_data(req, struct sendto_state);
124
125         if (tevent_req_is_unix_error(req, perrno)) {
126                 return -1;
127         }
128         return state->sent;
129 }
130
131 struct recvfrom_state {
132         int fd;
133         void *buf;
134         size_t len;
135         int flags;
136         struct sockaddr_storage *addr;
137         socklen_t *addr_len;
138         ssize_t received;
139 };
140
141 static void recvfrom_handler(struct tevent_context *ev,
142                                struct tevent_fd *fde,
143                                uint16_t flags, void *private_data);
144
145 struct tevent_req *recvfrom_send(TALLOC_CTX *mem_ctx,
146                                  struct tevent_context *ev,
147                                  int fd, void *buf, size_t len, int flags,
148                                  struct sockaddr_storage *addr,
149                                  socklen_t *addr_len)
150 {
151         struct tevent_req *result;
152         struct recvfrom_state *state;
153         struct tevent_fd *fde;
154
155         result = tevent_req_create(mem_ctx, &state, struct recvfrom_state);
156         if (result == NULL) {
157                 return result;
158         }
159         state->fd = fd;
160         state->buf = buf;
161         state->len = len;
162         state->flags = flags;
163         state->addr = addr;
164         state->addr_len = addr_len;
165
166         fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ, recvfrom_handler,
167                             result);
168         if (fde == NULL) {
169                 TALLOC_FREE(result);
170                 return NULL;
171         }
172         return result;
173 }
174
175 static void recvfrom_handler(struct tevent_context *ev,
176                                struct tevent_fd *fde,
177                                uint16_t flags, void *private_data)
178 {
179         struct tevent_req *req = talloc_get_type_abort(
180                 private_data, struct tevent_req);
181         struct recvfrom_state *state =
182                 tevent_req_data(req, struct recvfrom_state);
183
184         state->received = recvfrom(state->fd, state->buf, state->len,
185                                    state->flags, (struct sockaddr *)state->addr,
186                                    state->addr_len);
187         if ((state->received == -1) && (errno == EINTR)) {
188                 /* retry */
189                 return;
190         }
191         if (state->received == 0) {
192                 tevent_req_error(req, EPIPE);
193                 return;
194         }
195         if (state->received == -1) {
196                 tevent_req_error(req, errno);
197                 return;
198         }
199         tevent_req_done(req);
200 }
201
202 ssize_t recvfrom_recv(struct tevent_req *req, int *perrno)
203 {
204         struct recvfrom_state *state =
205                 tevent_req_data(req, struct recvfrom_state);
206
207         if (tevent_req_is_unix_error(req, perrno)) {
208                 return -1;
209         }
210         return state->received;
211 }
212
213 struct async_connect_state {
214         int fd;
215         int result;
216         int sys_errno;
217         long old_sockflags;
218         socklen_t address_len;
219         struct sockaddr_storage address;
220 };
221
222 static void async_connect_connected(struct tevent_context *ev,
223                                     struct tevent_fd *fde, uint16_t flags,
224                                     void *priv);
225
226 /**
227  * @brief async version of connect(2)
228  * @param[in] mem_ctx   The memory context to hang the result off
229  * @param[in] ev        The event context to work from
230  * @param[in] fd        The socket to recv from
231  * @param[in] address   Where to connect?
232  * @param[in] address_len Length of *address
233  * @retval The async request
234  *
235  * This function sets the socket into non-blocking state to be able to call
236  * connect in an async state. This will be reset when the request is finished.
237  */
238
239 struct tevent_req *async_connect_send(TALLOC_CTX *mem_ctx,
240                                       struct tevent_context *ev,
241                                       int fd, const struct sockaddr *address,
242                                       socklen_t address_len)
243 {
244         struct tevent_req *result;
245         struct async_connect_state *state;
246         struct tevent_fd *fde;
247
248         result = tevent_req_create(
249                 mem_ctx, &state, struct async_connect_state);
250         if (result == NULL) {
251                 return NULL;
252         }
253
254         /**
255          * We have to set the socket to nonblocking for async connect(2). Keep
256          * the old sockflags around.
257          */
258
259         state->fd = fd;
260         state->sys_errno = 0;
261
262         state->old_sockflags = fcntl(fd, F_GETFL, 0);
263         if (state->old_sockflags == -1) {
264                 goto post_errno;
265         }
266
267         state->address_len = address_len;
268         if (address_len > sizeof(state->address)) {
269                 errno = EINVAL;
270                 goto post_errno;
271         }
272         memcpy(&state->address, address, address_len);
273
274         set_blocking(fd, false);
275
276         state->result = connect(fd, address, address_len);
277         if (state->result == 0) {
278                 tevent_req_done(result);
279                 goto done;
280         }
281
282         /**
283          * A number of error messages show that something good is progressing
284          * and that we have to wait for readability.
285          *
286          * If none of them are present, bail out.
287          */
288
289         if (!(errno == EINPROGRESS || errno == EALREADY ||
290 #ifdef EISCONN
291               errno == EISCONN ||
292 #endif
293               errno == EAGAIN || errno == EINTR)) {
294                 state->sys_errno = errno;
295                 goto post_errno;
296         }
297
298         fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ | TEVENT_FD_WRITE,
299                            async_connect_connected, result);
300         if (fde == NULL) {
301                 state->sys_errno = ENOMEM;
302                 goto post_errno;
303         }
304         return result;
305
306  post_errno:
307         tevent_req_error(result, state->sys_errno);
308  done:
309         fcntl(fd, F_SETFL, state->old_sockflags);
310         return tevent_req_post(result, ev);
311 }
312
313 /**
314  * fde event handler for connect(2)
315  * @param[in] ev        The event context that sent us here
316  * @param[in] fde       The file descriptor event associated with the connect
317  * @param[in] flags     Indicate read/writeability of the socket
318  * @param[in] priv      private data, "struct async_req *" in this case
319  */
320
321 static void async_connect_connected(struct tevent_context *ev,
322                                     struct tevent_fd *fde, uint16_t flags,
323                                     void *priv)
324 {
325         struct tevent_req *req = talloc_get_type_abort(
326                 priv, struct tevent_req);
327         struct async_connect_state *state =
328                 tevent_req_data(req, struct async_connect_state);
329
330         /*
331          * Stevens, Network Programming says that if there's a
332          * successful connect, the socket is only writable. Upon an
333          * error, it's both readable and writable.
334          */
335         if ((flags & (TEVENT_FD_READ|TEVENT_FD_WRITE))
336             == (TEVENT_FD_READ|TEVENT_FD_WRITE)) {
337                 int ret;
338
339                 ret = connect(state->fd,
340                               (struct sockaddr *)(void *)&state->address,
341                               state->address_len);
342                 if (ret == 0) {
343                         TALLOC_FREE(fde);
344                         tevent_req_done(req);
345                         return;
346                 }
347
348                 if (errno == EINPROGRESS) {
349                         /* Try again later, leave the fde around */
350                         return;
351                 }
352                 TALLOC_FREE(fde);
353                 tevent_req_error(req, errno);
354                 return;
355         }
356
357         state->sys_errno = 0;
358         tevent_req_done(req);
359 }
360
361 int async_connect_recv(struct tevent_req *req, int *perrno)
362 {
363         struct async_connect_state *state =
364                 tevent_req_data(req, struct async_connect_state);
365         int err;
366
367         fcntl(state->fd, F_SETFL, state->old_sockflags);
368
369         if (tevent_req_is_unix_error(req, &err)) {
370                 *perrno = err;
371                 return -1;
372         }
373
374         if (state->sys_errno == 0) {
375                 return 0;
376         }
377
378         *perrno = state->sys_errno;
379         return -1;
380 }
381
382 struct writev_state {
383         struct tevent_context *ev;
384         int fd;
385         struct iovec *iov;
386         int count;
387         size_t total_size;
388         uint16_t flags;
389         bool err_on_readability;
390 };
391
392 static void writev_trigger(struct tevent_req *req, void *private_data);
393 static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde,
394                            uint16_t flags, void *private_data);
395
396 struct tevent_req *writev_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
397                                struct tevent_queue *queue, int fd,
398                                bool err_on_readability,
399                                struct iovec *iov, int count)
400 {
401         struct tevent_req *req;
402         struct writev_state *state;
403
404         req = tevent_req_create(mem_ctx, &state, struct writev_state);
405         if (req == NULL) {
406                 return NULL;
407         }
408         state->ev = ev;
409         state->fd = fd;
410         state->total_size = 0;
411         state->count = count;
412         state->iov = (struct iovec *)talloc_memdup(
413                 state, iov, sizeof(struct iovec) * count);
414         if (state->iov == NULL) {
415                 goto fail;
416         }
417         state->flags = TEVENT_FD_WRITE|TEVENT_FD_READ;
418         state->err_on_readability = err_on_readability;
419
420         if (queue == NULL) {
421                 struct tevent_fd *fde;
422                 fde = tevent_add_fd(state->ev, state, state->fd,
423                                     state->flags, writev_handler, req);
424                 if (tevent_req_nomem(fde, req)) {
425                         return tevent_req_post(req, ev);
426                 }
427                 return req;
428         }
429
430         if (!tevent_queue_add(queue, ev, req, writev_trigger, NULL)) {
431                 goto fail;
432         }
433         return req;
434  fail:
435         TALLOC_FREE(req);
436         return NULL;
437 }
438
439 static void writev_trigger(struct tevent_req *req, void *private_data)
440 {
441         struct writev_state *state = tevent_req_data(req, struct writev_state);
442         struct tevent_fd *fde;
443
444         fde = tevent_add_fd(state->ev, state, state->fd, state->flags,
445                             writev_handler, req);
446         if (fde == NULL) {
447                 tevent_req_error(req, ENOMEM);
448         }
449 }
450
451 static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde,
452                            uint16_t flags, void *private_data)
453 {
454         struct tevent_req *req = talloc_get_type_abort(
455                 private_data, struct tevent_req);
456         struct writev_state *state =
457                 tevent_req_data(req, struct writev_state);
458         size_t to_write, written;
459         int i;
460
461         to_write = 0;
462
463         if ((state->flags & TEVENT_FD_READ) && (flags & TEVENT_FD_READ)) {
464                 int ret, value;
465
466                 if (state->err_on_readability) {
467                         /* Readable and the caller wants an error on read. */
468                         tevent_req_error(req, EPIPE);
469                         return;
470                 }
471
472                 /* Might be an error. Check if there are bytes to read */
473                 ret = ioctl(state->fd, FIONREAD, &value);
474                 /* FIXME - should we also check
475                    for ret == 0 and value == 0 here ? */
476                 if (ret == -1) {
477                         /* There's an error. */
478                         tevent_req_error(req, EPIPE);
479                         return;
480                 }
481                 /* A request for TEVENT_FD_READ will succeed from now and
482                    forevermore until the bytes are read so if there was
483                    an error we'll wait until we do read, then get it in
484                    the read callback function. Until then, remove TEVENT_FD_READ
485                    from the flags we're waiting for. */
486                 state->flags &= ~TEVENT_FD_READ;
487                 TEVENT_FD_NOT_READABLE(fde);
488
489                 /* If not writable, we're done. */
490                 if (!(flags & TEVENT_FD_WRITE)) {
491                         return;
492                 }
493         }
494
495         for (i=0; i<state->count; i++) {
496                 to_write += state->iov[i].iov_len;
497         }
498
499         written = writev(state->fd, state->iov, state->count);
500         if ((written == -1) && (errno == EINTR)) {
501                 /* retry */
502                 return;
503         }
504         if (written == -1) {
505                 tevent_req_error(req, errno);
506                 return;
507         }
508         if (written == 0) {
509                 tevent_req_error(req, EPIPE);
510                 return;
511         }
512         state->total_size += written;
513
514         if (written == to_write) {
515                 tevent_req_done(req);
516                 return;
517         }
518
519         /*
520          * We've written less than we were asked to, drop stuff from
521          * state->iov.
522          */
523
524         while (written > 0) {
525                 if (written < state->iov[0].iov_len) {
526                         state->iov[0].iov_base =
527                                 (char *)state->iov[0].iov_base + written;
528                         state->iov[0].iov_len -= written;
529                         break;
530                 }
531                 written -= state->iov[0].iov_len;
532                 state->iov += 1;
533                 state->count -= 1;
534         }
535 }
536
537 ssize_t writev_recv(struct tevent_req *req, int *perrno)
538 {
539         struct writev_state *state =
540                 tevent_req_data(req, struct writev_state);
541
542         if (tevent_req_is_unix_error(req, perrno)) {
543                 return -1;
544         }
545         return state->total_size;
546 }
547
548 struct read_packet_state {
549         int fd;
550         uint8_t *buf;
551         size_t nread;
552         ssize_t (*more)(uint8_t *buf, size_t buflen, void *private_data);
553         void *private_data;
554 };
555
556 static void read_packet_handler(struct tevent_context *ev,
557                                 struct tevent_fd *fde,
558                                 uint16_t flags, void *private_data);
559
560 struct tevent_req *read_packet_send(TALLOC_CTX *mem_ctx,
561                                     struct tevent_context *ev,
562                                     int fd, size_t initial,
563                                     ssize_t (*more)(uint8_t *buf,
564                                                     size_t buflen,
565                                                     void *private_data),
566                                     void *private_data)
567 {
568         struct tevent_req *result;
569         struct read_packet_state *state;
570         struct tevent_fd *fde;
571
572         result = tevent_req_create(mem_ctx, &state, struct read_packet_state);
573         if (result == NULL) {
574                 return NULL;
575         }
576         state->fd = fd;
577         state->nread = 0;
578         state->more = more;
579         state->private_data = private_data;
580
581         state->buf = talloc_array(state, uint8_t, initial);
582         if (state->buf == NULL) {
583                 goto fail;
584         }
585
586         fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ, read_packet_handler,
587                             result);
588         if (fde == NULL) {
589                 goto fail;
590         }
591         return result;
592  fail:
593         TALLOC_FREE(result);
594         return NULL;
595 }
596
597 static void read_packet_handler(struct tevent_context *ev,
598                                 struct tevent_fd *fde,
599                                 uint16_t flags, void *private_data)
600 {
601         struct tevent_req *req = talloc_get_type_abort(
602                 private_data, struct tevent_req);
603         struct read_packet_state *state =
604                 tevent_req_data(req, struct read_packet_state);
605         size_t total = talloc_get_size(state->buf);
606         ssize_t nread, more;
607         uint8_t *tmp;
608
609         nread = recv(state->fd, state->buf+state->nread, total-state->nread,
610                      0);
611         if ((nread == -1) && (errno == EINTR)) {
612                 /* retry */
613                 return;
614         }
615         if (nread == -1) {
616                 tevent_req_error(req, errno);
617                 return;
618         }
619         if (nread == 0) {
620                 tevent_req_error(req, EPIPE);
621                 return;
622         }
623
624         state->nread += nread;
625         if (state->nread < total) {
626                 /* Come back later */
627                 return;
628         }
629
630         /*
631          * We got what was initially requested. See if "more" asks for -- more.
632          */
633         if (state->more == NULL) {
634                 /* Nobody to ask, this is a async read_data */
635                 tevent_req_done(req);
636                 return;
637         }
638
639         more = state->more(state->buf, total, state->private_data);
640         if (more == -1) {
641                 /* We got an invalid packet, tell the caller */
642                 tevent_req_error(req, EIO);
643                 return;
644         }
645         if (more == 0) {
646                 /* We're done, full packet received */
647                 tevent_req_done(req);
648                 return;
649         }
650
651         tmp = talloc_realloc(state, state->buf, uint8_t, total+more);
652         if (tevent_req_nomem(tmp, req)) {
653                 return;
654         }
655         state->buf = tmp;
656 }
657
658 ssize_t read_packet_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
659                          uint8_t **pbuf, int *perrno)
660 {
661         struct read_packet_state *state =
662                 tevent_req_data(req, struct read_packet_state);
663
664         if (tevent_req_is_unix_error(req, perrno)) {
665                 return -1;
666         }
667         *pbuf = talloc_move(mem_ctx, &state->buf);
668         return talloc_get_size(*pbuf);
669 }