s3: piddir creation fix part 2.
[ira/wip.git] / lib / async_req / async_sock.c
1 /*
2    Unix SMB/CIFS implementation.
3    async socket syscalls
4    Copyright (C) Volker Lendecke 2008
5
6      ** NOTE! The following LGPL license applies to the async_sock
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Library General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public License
21    along with this program.  If not, see <http://www.gnu.org/licenses/>.
22 */
23
24 #include "replace.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include <talloc.h>
28 #include <tevent.h>
29 #include "lib/async_req/async_sock.h"
30
31 /* Note: lib/util/ is currently GPL */
32 #include "lib/util/tevent_unix.h"
33 #include "lib/util/samba_util.h"
34
35 #ifndef TALLOC_FREE
36 #define TALLOC_FREE(ctx) do { talloc_free(ctx); ctx=NULL; } while(0)
37 #endif
38
39 struct sendto_state {
40         int fd;
41         const void *buf;
42         size_t len;
43         int flags;
44         const struct sockaddr_storage *addr;
45         socklen_t addr_len;
46         ssize_t sent;
47 };
48
49 static void sendto_handler(struct tevent_context *ev,
50                                struct tevent_fd *fde,
51                                uint16_t flags, void *private_data);
52
53 struct tevent_req *sendto_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
54                                int fd, const void *buf, size_t len, int flags,
55                                const struct sockaddr_storage *addr)
56 {
57         struct tevent_req *result;
58         struct sendto_state *state;
59         struct tevent_fd *fde;
60
61         result = tevent_req_create(mem_ctx, &state, struct sendto_state);
62         if (result == NULL) {
63                 return result;
64         }
65         state->fd = fd;
66         state->buf = buf;
67         state->len = len;
68         state->flags = flags;
69         state->addr = addr;
70
71         switch (addr->ss_family) {
72         case AF_INET:
73                 state->addr_len = sizeof(struct sockaddr_in);
74                 break;
75 #if defined(HAVE_IPV6)
76         case AF_INET6:
77                 state->addr_len = sizeof(struct sockaddr_in6);
78                 break;
79 #endif
80         case AF_UNIX:
81                 state->addr_len = sizeof(struct sockaddr_un);
82                 break;
83         default:
84                 state->addr_len = sizeof(struct sockaddr_storage);
85                 break;
86         }
87
88         fde = tevent_add_fd(ev, state, fd, TEVENT_FD_WRITE, sendto_handler,
89                             result);
90         if (fde == NULL) {
91                 TALLOC_FREE(result);
92                 return NULL;
93         }
94         return result;
95 }
96
97 static void sendto_handler(struct tevent_context *ev,
98                                struct tevent_fd *fde,
99                                uint16_t flags, void *private_data)
100 {
101         struct tevent_req *req = talloc_get_type_abort(
102                 private_data, struct tevent_req);
103         struct sendto_state *state =
104                 tevent_req_data(req, struct sendto_state);
105
106         state->sent = sendto(state->fd, state->buf, state->len, state->flags,
107                              (const struct sockaddr *)state->addr,
108                              state->addr_len);
109         if ((state->sent == -1) && (errno == EINTR)) {
110                 /* retry */
111                 return;
112         }
113         if (state->sent == -1) {
114                 tevent_req_error(req, errno);
115                 return;
116         }
117         tevent_req_done(req);
118 }
119
120 ssize_t sendto_recv(struct tevent_req *req, int *perrno)
121 {
122         struct sendto_state *state =
123                 tevent_req_data(req, struct sendto_state);
124
125         if (tevent_req_is_unix_error(req, perrno)) {
126                 return -1;
127         }
128         return state->sent;
129 }
130
131 struct recvfrom_state {
132         int fd;
133         void *buf;
134         size_t len;
135         int flags;
136         struct sockaddr_storage *addr;
137         socklen_t *addr_len;
138         ssize_t received;
139 };
140
141 static void recvfrom_handler(struct tevent_context *ev,
142                                struct tevent_fd *fde,
143                                uint16_t flags, void *private_data);
144
145 struct tevent_req *recvfrom_send(TALLOC_CTX *mem_ctx,
146                                  struct tevent_context *ev,
147                                  int fd, void *buf, size_t len, int flags,
148                                  struct sockaddr_storage *addr,
149                                  socklen_t *addr_len)
150 {
151         struct tevent_req *result;
152         struct recvfrom_state *state;
153         struct tevent_fd *fde;
154
155         result = tevent_req_create(mem_ctx, &state, struct recvfrom_state);
156         if (result == NULL) {
157                 return result;
158         }
159         state->fd = fd;
160         state->buf = buf;
161         state->len = len;
162         state->flags = flags;
163         state->addr = addr;
164         state->addr_len = addr_len;
165
166         fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ, recvfrom_handler,
167                             result);
168         if (fde == NULL) {
169                 TALLOC_FREE(result);
170                 return NULL;
171         }
172         return result;
173 }
174
175 static void recvfrom_handler(struct tevent_context *ev,
176                                struct tevent_fd *fde,
177                                uint16_t flags, void *private_data)
178 {
179         struct tevent_req *req = talloc_get_type_abort(
180                 private_data, struct tevent_req);
181         struct recvfrom_state *state =
182                 tevent_req_data(req, struct recvfrom_state);
183
184         state->received = recvfrom(state->fd, state->buf, state->len,
185                                    state->flags, (struct sockaddr *)state->addr,
186                                    state->addr_len);
187         if ((state->received == -1) && (errno == EINTR)) {
188                 /* retry */
189                 return;
190         }
191         if (state->received == 0) {
192                 tevent_req_error(req, EPIPE);
193                 return;
194         }
195         if (state->received == -1) {
196                 tevent_req_error(req, errno);
197                 return;
198         }
199         tevent_req_done(req);
200 }
201
202 ssize_t recvfrom_recv(struct tevent_req *req, int *perrno)
203 {
204         struct recvfrom_state *state =
205                 tevent_req_data(req, struct recvfrom_state);
206
207         if (tevent_req_is_unix_error(req, perrno)) {
208                 return -1;
209         }
210         return state->received;
211 }
212
213 struct async_connect_state {
214         int fd;
215         int result;
216         int sys_errno;
217         long old_sockflags;
218         socklen_t address_len;
219         struct sockaddr_storage address;
220 };
221
222 static void async_connect_connected(struct tevent_context *ev,
223                                     struct tevent_fd *fde, uint16_t flags,
224                                     void *priv);
225
226 /**
227  * @brief async version of connect(2)
228  * @param[in] mem_ctx   The memory context to hang the result off
229  * @param[in] ev        The event context to work from
230  * @param[in] fd        The socket to recv from
231  * @param[in] address   Where to connect?
232  * @param[in] address_len Length of *address
233  * @retval The async request
234  *
235  * This function sets the socket into non-blocking state to be able to call
236  * connect in an async state. This will be reset when the request is finished.
237  */
238
239 struct tevent_req *async_connect_send(TALLOC_CTX *mem_ctx,
240                                       struct tevent_context *ev,
241                                       int fd, const struct sockaddr *address,
242                                       socklen_t address_len)
243 {
244         struct tevent_req *result;
245         struct async_connect_state *state;
246         struct tevent_fd *fde;
247
248         result = tevent_req_create(
249                 mem_ctx, &state, struct async_connect_state);
250         if (result == NULL) {
251                 return NULL;
252         }
253
254         /**
255          * We have to set the socket to nonblocking for async connect(2). Keep
256          * the old sockflags around.
257          */
258
259         state->fd = fd;
260         state->sys_errno = 0;
261
262         state->old_sockflags = fcntl(fd, F_GETFL, 0);
263         if (state->old_sockflags == -1) {
264                 goto post_errno;
265         }
266
267         state->address_len = address_len;
268         if (address_len > sizeof(state->address)) {
269                 errno = EINVAL;
270                 goto post_errno;
271         }
272         memcpy(&state->address, address, address_len);
273
274         set_blocking(fd, false);
275
276         state->result = connect(fd, address, address_len);
277         if (state->result == 0) {
278                 tevent_req_done(result);
279                 goto done;
280         }
281
282         /**
283          * A number of error messages show that something good is progressing
284          * and that we have to wait for readability.
285          *
286          * If none of them are present, bail out.
287          */
288
289         if (!(errno == EINPROGRESS || errno == EALREADY ||
290 #ifdef EISCONN
291               errno == EISCONN ||
292 #endif
293               errno == EAGAIN || errno == EINTR)) {
294                 state->sys_errno = errno;
295                 goto post_errno;
296         }
297
298         fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ | TEVENT_FD_WRITE,
299                            async_connect_connected, result);
300         if (fde == NULL) {
301                 state->sys_errno = ENOMEM;
302                 goto post_errno;
303         }
304         return result;
305
306  post_errno:
307         tevent_req_error(result, state->sys_errno);
308  done:
309         fcntl(fd, F_SETFL, state->old_sockflags);
310         return tevent_req_post(result, ev);
311 }
312
313 /**
314  * fde event handler for connect(2)
315  * @param[in] ev        The event context that sent us here
316  * @param[in] fde       The file descriptor event associated with the connect
317  * @param[in] flags     Indicate read/writeability of the socket
318  * @param[in] priv      private data, "struct async_req *" in this case
319  */
320
321 static void async_connect_connected(struct tevent_context *ev,
322                                     struct tevent_fd *fde, uint16_t flags,
323                                     void *priv)
324 {
325         struct tevent_req *req = talloc_get_type_abort(
326                 priv, struct tevent_req);
327         struct async_connect_state *state =
328                 tevent_req_data(req, struct async_connect_state);
329         int ret;
330
331         ret = connect(state->fd, (struct sockaddr *)(void *)&state->address,
332                       state->address_len);
333         if (ret == 0) {
334                 state->sys_errno = 0;
335                 TALLOC_FREE(fde);
336                 tevent_req_done(req);
337                 return;
338         }
339         if (errno == EINPROGRESS) {
340                 /* Try again later, leave the fde around */
341                 return;
342         }
343         state->sys_errno = errno;
344         TALLOC_FREE(fde);
345         tevent_req_error(req, errno);
346         return;
347 }
348
349 int async_connect_recv(struct tevent_req *req, int *perrno)
350 {
351         struct async_connect_state *state =
352                 tevent_req_data(req, struct async_connect_state);
353         int err;
354
355         fcntl(state->fd, F_SETFL, state->old_sockflags);
356
357         if (tevent_req_is_unix_error(req, &err)) {
358                 *perrno = err;
359                 return -1;
360         }
361
362         if (state->sys_errno == 0) {
363                 return 0;
364         }
365
366         *perrno = state->sys_errno;
367         return -1;
368 }
369
370 struct writev_state {
371         struct tevent_context *ev;
372         int fd;
373         struct iovec *iov;
374         int count;
375         size_t total_size;
376         uint16_t flags;
377         bool err_on_readability;
378 };
379
380 static void writev_trigger(struct tevent_req *req, void *private_data);
381 static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde,
382                            uint16_t flags, void *private_data);
383
384 struct tevent_req *writev_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
385                                struct tevent_queue *queue, int fd,
386                                bool err_on_readability,
387                                struct iovec *iov, int count)
388 {
389         struct tevent_req *req;
390         struct writev_state *state;
391
392         req = tevent_req_create(mem_ctx, &state, struct writev_state);
393         if (req == NULL) {
394                 return NULL;
395         }
396         state->ev = ev;
397         state->fd = fd;
398         state->total_size = 0;
399         state->count = count;
400         state->iov = (struct iovec *)talloc_memdup(
401                 state, iov, sizeof(struct iovec) * count);
402         if (state->iov == NULL) {
403                 goto fail;
404         }
405         state->flags = TEVENT_FD_WRITE|TEVENT_FD_READ;
406         state->err_on_readability = err_on_readability;
407
408         if (queue == NULL) {
409                 struct tevent_fd *fde;
410                 fde = tevent_add_fd(state->ev, state, state->fd,
411                                     state->flags, writev_handler, req);
412                 if (tevent_req_nomem(fde, req)) {
413                         return tevent_req_post(req, ev);
414                 }
415                 return req;
416         }
417
418         if (!tevent_queue_add(queue, ev, req, writev_trigger, NULL)) {
419                 goto fail;
420         }
421         return req;
422  fail:
423         TALLOC_FREE(req);
424         return NULL;
425 }
426
427 static void writev_trigger(struct tevent_req *req, void *private_data)
428 {
429         struct writev_state *state = tevent_req_data(req, struct writev_state);
430         struct tevent_fd *fde;
431
432         fde = tevent_add_fd(state->ev, state, state->fd, state->flags,
433                             writev_handler, req);
434         if (fde == NULL) {
435                 tevent_req_error(req, ENOMEM);
436         }
437 }
438
439 static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde,
440                            uint16_t flags, void *private_data)
441 {
442         struct tevent_req *req = talloc_get_type_abort(
443                 private_data, struct tevent_req);
444         struct writev_state *state =
445                 tevent_req_data(req, struct writev_state);
446         size_t to_write, written;
447         int i;
448
449         to_write = 0;
450
451         if ((state->flags & TEVENT_FD_READ) && (flags & TEVENT_FD_READ)) {
452                 int ret, value;
453
454                 if (state->err_on_readability) {
455                         /* Readable and the caller wants an error on read. */
456                         tevent_req_error(req, EPIPE);
457                         return;
458                 }
459
460                 /* Might be an error. Check if there are bytes to read */
461                 ret = ioctl(state->fd, FIONREAD, &value);
462                 /* FIXME - should we also check
463                    for ret == 0 and value == 0 here ? */
464                 if (ret == -1) {
465                         /* There's an error. */
466                         tevent_req_error(req, EPIPE);
467                         return;
468                 }
469                 /* A request for TEVENT_FD_READ will succeed from now and
470                    forevermore until the bytes are read so if there was
471                    an error we'll wait until we do read, then get it in
472                    the read callback function. Until then, remove TEVENT_FD_READ
473                    from the flags we're waiting for. */
474                 state->flags &= ~TEVENT_FD_READ;
475                 TEVENT_FD_NOT_READABLE(fde);
476
477                 /* If not writable, we're done. */
478                 if (!(flags & TEVENT_FD_WRITE)) {
479                         return;
480                 }
481         }
482
483         for (i=0; i<state->count; i++) {
484                 to_write += state->iov[i].iov_len;
485         }
486
487         written = writev(state->fd, state->iov, state->count);
488         if ((written == -1) && (errno == EINTR)) {
489                 /* retry */
490                 return;
491         }
492         if (written == -1) {
493                 tevent_req_error(req, errno);
494                 return;
495         }
496         if (written == 0) {
497                 tevent_req_error(req, EPIPE);
498                 return;
499         }
500         state->total_size += written;
501
502         if (written == to_write) {
503                 tevent_req_done(req);
504                 return;
505         }
506
507         /*
508          * We've written less than we were asked to, drop stuff from
509          * state->iov.
510          */
511
512         while (written > 0) {
513                 if (written < state->iov[0].iov_len) {
514                         state->iov[0].iov_base =
515                                 (char *)state->iov[0].iov_base + written;
516                         state->iov[0].iov_len -= written;
517                         break;
518                 }
519                 written -= state->iov[0].iov_len;
520                 state->iov += 1;
521                 state->count -= 1;
522         }
523 }
524
525 ssize_t writev_recv(struct tevent_req *req, int *perrno)
526 {
527         struct writev_state *state =
528                 tevent_req_data(req, struct writev_state);
529
530         if (tevent_req_is_unix_error(req, perrno)) {
531                 return -1;
532         }
533         return state->total_size;
534 }
535
536 struct read_packet_state {
537         int fd;
538         uint8_t *buf;
539         size_t nread;
540         ssize_t (*more)(uint8_t *buf, size_t buflen, void *private_data);
541         void *private_data;
542 };
543
544 static void read_packet_handler(struct tevent_context *ev,
545                                 struct tevent_fd *fde,
546                                 uint16_t flags, void *private_data);
547
548 struct tevent_req *read_packet_send(TALLOC_CTX *mem_ctx,
549                                     struct tevent_context *ev,
550                                     int fd, size_t initial,
551                                     ssize_t (*more)(uint8_t *buf,
552                                                     size_t buflen,
553                                                     void *private_data),
554                                     void *private_data)
555 {
556         struct tevent_req *result;
557         struct read_packet_state *state;
558         struct tevent_fd *fde;
559
560         result = tevent_req_create(mem_ctx, &state, struct read_packet_state);
561         if (result == NULL) {
562                 return NULL;
563         }
564         state->fd = fd;
565         state->nread = 0;
566         state->more = more;
567         state->private_data = private_data;
568
569         state->buf = talloc_array(state, uint8_t, initial);
570         if (state->buf == NULL) {
571                 goto fail;
572         }
573
574         fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ, read_packet_handler,
575                             result);
576         if (fde == NULL) {
577                 goto fail;
578         }
579         return result;
580  fail:
581         TALLOC_FREE(result);
582         return NULL;
583 }
584
585 static void read_packet_handler(struct tevent_context *ev,
586                                 struct tevent_fd *fde,
587                                 uint16_t flags, void *private_data)
588 {
589         struct tevent_req *req = talloc_get_type_abort(
590                 private_data, struct tevent_req);
591         struct read_packet_state *state =
592                 tevent_req_data(req, struct read_packet_state);
593         size_t total = talloc_get_size(state->buf);
594         ssize_t nread, more;
595         uint8_t *tmp;
596
597         nread = recv(state->fd, state->buf+state->nread, total-state->nread,
598                      0);
599         if ((nread == -1) && (errno == EINTR)) {
600                 /* retry */
601                 return;
602         }
603         if (nread == -1) {
604                 tevent_req_error(req, errno);
605                 return;
606         }
607         if (nread == 0) {
608                 tevent_req_error(req, EPIPE);
609                 return;
610         }
611
612         state->nread += nread;
613         if (state->nread < total) {
614                 /* Come back later */
615                 return;
616         }
617
618         /*
619          * We got what was initially requested. See if "more" asks for -- more.
620          */
621         if (state->more == NULL) {
622                 /* Nobody to ask, this is a async read_data */
623                 tevent_req_done(req);
624                 return;
625         }
626
627         more = state->more(state->buf, total, state->private_data);
628         if (more == -1) {
629                 /* We got an invalid packet, tell the caller */
630                 tevent_req_error(req, EIO);
631                 return;
632         }
633         if (more == 0) {
634                 /* We're done, full packet received */
635                 tevent_req_done(req);
636                 return;
637         }
638
639         tmp = talloc_realloc(state, state->buf, uint8_t, total+more);
640         if (tevent_req_nomem(tmp, req)) {
641                 return;
642         }
643         state->buf = tmp;
644 }
645
646 ssize_t read_packet_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
647                          uint8_t **pbuf, int *perrno)
648 {
649         struct read_packet_state *state =
650                 tevent_req_data(req, struct read_packet_state);
651
652         if (tevent_req_is_unix_error(req, perrno)) {
653                 return -1;
654         }
655         *pbuf = talloc_move(mem_ctx, &state->buf);
656         return talloc_get_size(*pbuf);
657 }
658
659 struct wait_for_read_state {
660         struct tevent_req *req;
661         struct tevent_fd *fde;
662 };
663
664 static void wait_for_read_done(struct tevent_context *ev,
665                                struct tevent_fd *fde,
666                                uint16_t flags,
667                                void *private_data);
668
669 struct tevent_req *wait_for_read_send(TALLOC_CTX *mem_ctx,
670                                       struct tevent_context *ev,
671                                       int fd)
672 {
673         struct tevent_req *req;
674         struct wait_for_read_state *state;
675
676         req = tevent_req_create(mem_ctx, &state, struct wait_for_read_state);
677         if (req == NULL) {
678                 return NULL;
679         }
680         state->req = req;
681         state->fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ,
682                                    wait_for_read_done, state);
683         if (tevent_req_nomem(state->fde, req)) {
684                 return tevent_req_post(req, ev);
685         }
686         return req;
687 }
688
689 static void wait_for_read_done(struct tevent_context *ev,
690                                struct tevent_fd *fde,
691                                uint16_t flags,
692                                void *private_data)
693 {
694         struct wait_for_read_state *state = talloc_get_type_abort(
695                 private_data, struct wait_for_read_state);
696
697         if (flags & TEVENT_FD_READ) {
698                 TALLOC_FREE(state->fde);
699                 tevent_req_done(state->req);
700         }
701 }
702
703 bool wait_for_read_recv(struct tevent_req *req, int *perr)
704 {
705         int err;
706
707         if (tevent_req_is_unix_error(req, &err)) {
708                 *perr = err;
709                 return false;
710         }
711         return true;
712 }