643eb2d308f5c42686f310089d5dc1c4ad599b22
[ira/wip.git] / lib / async_req / async_sock.c
1 /*
2    Unix SMB/CIFS implementation.
3    async socket syscalls
4    Copyright (C) Volker Lendecke 2008
5
6      ** NOTE! The following LGPL license applies to the async_sock
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Library General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public License
21    along with this program.  If not, see <http://www.gnu.org/licenses/>.
22 */
23
24 #include "includes.h"
25 #include "lib/talloc/talloc.h"
26 #include "lib/tevent/tevent.h"
27 #include "lib/async_req/async_sock.h"
28 #include "lib/util/tevent_unix.h"
29 #include <fcntl.h>
30
31 #ifndef TALLOC_FREE
32 #define TALLOC_FREE(ctx) do { talloc_free(ctx); ctx=NULL; } while(0)
33 #endif
34
35 struct async_send_state {
36         int fd;
37         const void *buf;
38         size_t len;
39         int flags;
40         ssize_t sent;
41 };
42
43 static void async_send_handler(struct tevent_context *ev,
44                                struct tevent_fd *fde,
45                                uint16_t flags, void *private_data);
46
47 struct tevent_req *async_send_send(TALLOC_CTX *mem_ctx,
48                                    struct tevent_context *ev,
49                                    int fd, const void *buf, size_t len,
50                                    int flags)
51 {
52         struct tevent_req *result;
53         struct async_send_state *state;
54         struct tevent_fd *fde;
55
56         result = tevent_req_create(mem_ctx, &state, struct async_send_state);
57         if (result == NULL) {
58                 return result;
59         }
60         state->fd = fd;
61         state->buf = buf;
62         state->len = len;
63         state->flags = flags;
64
65         fde = tevent_add_fd(ev, state, fd, TEVENT_FD_WRITE, async_send_handler,
66                             result);
67         if (fde == NULL) {
68                 TALLOC_FREE(result);
69                 return NULL;
70         }
71         return result;
72 }
73
74 static void async_send_handler(struct tevent_context *ev,
75                                struct tevent_fd *fde,
76                                uint16_t flags, void *private_data)
77 {
78         struct tevent_req *req = talloc_get_type_abort(
79                 private_data, struct tevent_req);
80         struct async_send_state *state =
81                 tevent_req_data(req, struct async_send_state);
82
83         state->sent = send(state->fd, state->buf, state->len, state->flags);
84         if ((state->sent == -1) && (errno == EINTR)) {
85                 /* retry */
86                 return;
87         }
88         if (state->sent == -1) {
89                 tevent_req_error(req, errno);
90                 return;
91         }
92         tevent_req_done(req);
93 }
94
95 ssize_t async_send_recv(struct tevent_req *req, int *perrno)
96 {
97         struct async_send_state *state =
98                 tevent_req_data(req, struct async_send_state);
99
100         if (tevent_req_is_unix_error(req, perrno)) {
101                 return -1;
102         }
103         return state->sent;
104 }
105
106 struct async_recv_state {
107         int fd;
108         void *buf;
109         size_t len;
110         int flags;
111         ssize_t received;
112 };
113
114 static void async_recv_handler(struct tevent_context *ev,
115                                struct tevent_fd *fde,
116                                uint16_t flags, void *private_data);
117
118 struct tevent_req *async_recv_send(TALLOC_CTX *mem_ctx,
119                                    struct tevent_context *ev,
120                                    int fd, void *buf, size_t len, int flags)
121 {
122         struct tevent_req *result;
123         struct async_recv_state *state;
124         struct tevent_fd *fde;
125
126         result = tevent_req_create(mem_ctx, &state, struct async_recv_state);
127         if (result == NULL) {
128                 return result;
129         }
130         state->fd = fd;
131         state->buf = buf;
132         state->len = len;
133         state->flags = flags;
134
135         fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ, async_recv_handler,
136                             result);
137         if (fde == NULL) {
138                 TALLOC_FREE(result);
139                 return NULL;
140         }
141         return result;
142 }
143
144 static void async_recv_handler(struct tevent_context *ev,
145                                struct tevent_fd *fde,
146                                uint16_t flags, void *private_data)
147 {
148         struct tevent_req *req = talloc_get_type_abort(
149                 private_data, struct tevent_req);
150         struct async_recv_state *state =
151                 tevent_req_data(req, struct async_recv_state);
152
153         state->received = recv(state->fd, state->buf, state->len,
154                                state->flags);
155         if ((state->received == -1) && (errno == EINTR)) {
156                 /* retry */
157                 return;
158         }
159         if (state->received == -1) {
160                 tevent_req_error(req, errno);
161                 return;
162         }
163         tevent_req_done(req);
164 }
165
166 ssize_t async_recv_recv(struct tevent_req *req, int *perrno)
167 {
168         struct async_recv_state *state =
169                 tevent_req_data(req, struct async_recv_state);
170
171         if (tevent_req_is_unix_error(req, perrno)) {
172                 return -1;
173         }
174         return state->received;
175 }
176
177 struct async_connect_state {
178         int fd;
179         int result;
180         int sys_errno;
181         long old_sockflags;
182         socklen_t address_len;
183         struct sockaddr_storage address;
184 };
185
186 static void async_connect_connected(struct tevent_context *ev,
187                                     struct tevent_fd *fde, uint16_t flags,
188                                     void *priv);
189
190 /**
191  * @brief async version of connect(2)
192  * @param[in] mem_ctx   The memory context to hang the result off
193  * @param[in] ev        The event context to work from
194  * @param[in] fd        The socket to recv from
195  * @param[in] address   Where to connect?
196  * @param[in] address_len Length of *address
197  * @retval The async request
198  *
199  * This function sets the socket into non-blocking state to be able to call
200  * connect in an async state. This will be reset when the request is finished.
201  */
202
203 struct tevent_req *async_connect_send(TALLOC_CTX *mem_ctx,
204                                       struct tevent_context *ev,
205                                       int fd, const struct sockaddr *address,
206                                       socklen_t address_len)
207 {
208         struct tevent_req *result;
209         struct async_connect_state *state;
210         struct tevent_fd *fde;
211
212         result = tevent_req_create(
213                 mem_ctx, &state, struct async_connect_state);
214         if (result == NULL) {
215                 return NULL;
216         }
217
218         /**
219          * We have to set the socket to nonblocking for async connect(2). Keep
220          * the old sockflags around.
221          */
222
223         state->fd = fd;
224         state->sys_errno = 0;
225
226         state->old_sockflags = fcntl(fd, F_GETFL, 0);
227         if (state->old_sockflags == -1) {
228                 goto post_errno;
229         }
230
231         state->address_len = address_len;
232         if (address_len > sizeof(state->address)) {
233                 errno = EINVAL;
234                 goto post_errno;
235         }
236         memcpy(&state->address, address, address_len);
237
238         set_blocking(fd, false);
239
240         state->result = connect(fd, address, address_len);
241         if (state->result == 0) {
242                 tevent_req_done(result);
243                 goto done;
244         }
245
246         /**
247          * A number of error messages show that something good is progressing
248          * and that we have to wait for readability.
249          *
250          * If none of them are present, bail out.
251          */
252
253         if (!(errno == EINPROGRESS || errno == EALREADY ||
254 #ifdef EISCONN
255               errno == EISCONN ||
256 #endif
257               errno == EAGAIN || errno == EINTR)) {
258                 state->sys_errno = errno;
259                 goto post_errno;
260         }
261
262         fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ | TEVENT_FD_WRITE,
263                            async_connect_connected, result);
264         if (fde == NULL) {
265                 state->sys_errno = ENOMEM;
266                 goto post_errno;
267         }
268         return result;
269
270  post_errno:
271         tevent_req_error(result, state->sys_errno);
272  done:
273         fcntl(fd, F_SETFL, state->old_sockflags);
274         return tevent_req_post(result, ev);
275 }
276
277 /**
278  * fde event handler for connect(2)
279  * @param[in] ev        The event context that sent us here
280  * @param[in] fde       The file descriptor event associated with the connect
281  * @param[in] flags     Indicate read/writeability of the socket
282  * @param[in] priv      private data, "struct async_req *" in this case
283  */
284
285 static void async_connect_connected(struct tevent_context *ev,
286                                     struct tevent_fd *fde, uint16_t flags,
287                                     void *priv)
288 {
289         struct tevent_req *req = talloc_get_type_abort(
290                 priv, struct tevent_req);
291         struct async_connect_state *state =
292                 tevent_req_data(req, struct async_connect_state);
293
294         /*
295          * Stevens, Network Programming says that if there's a
296          * successful connect, the socket is only writable. Upon an
297          * error, it's both readable and writable.
298          */
299         if ((flags & (TEVENT_FD_READ|TEVENT_FD_WRITE))
300             == (TEVENT_FD_READ|TEVENT_FD_WRITE)) {
301                 int ret;
302
303                 ret = connect(state->fd,
304                               (struct sockaddr *)(void *)&state->address,
305                               state->address_len);
306                 if (ret == 0) {
307                         TALLOC_FREE(fde);
308                         tevent_req_done(req);
309                         return;
310                 }
311
312                 if (errno == EINPROGRESS) {
313                         /* Try again later, leave the fde around */
314                         return;
315                 }
316                 TALLOC_FREE(fde);
317                 tevent_req_error(req, errno);
318                 return;
319         }
320
321         state->sys_errno = 0;
322         tevent_req_done(req);
323 }
324
325 int async_connect_recv(struct tevent_req *req, int *perrno)
326 {
327         struct async_connect_state *state =
328                 tevent_req_data(req, struct async_connect_state);
329         int err;
330
331         fcntl(state->fd, F_SETFL, state->old_sockflags);
332
333         if (tevent_req_is_unix_error(req, &err)) {
334                 *perrno = err;
335                 return -1;
336         }
337
338         if (state->sys_errno == 0) {
339                 return 0;
340         }
341
342         *perrno = state->sys_errno;
343         return -1;
344 }
345
346 struct writev_state {
347         struct tevent_context *ev;
348         int fd;
349         struct iovec *iov;
350         int count;
351         size_t total_size;
352         uint16_t flags;
353 };
354
355 static void writev_trigger(struct tevent_req *req, void *private_data);
356 static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde,
357                            uint16_t flags, void *private_data);
358
359 struct tevent_req *writev_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
360                                struct tevent_queue *queue, int fd,
361                                bool err_on_readability,
362                                struct iovec *iov, int count)
363 {
364         struct tevent_req *req;
365         struct writev_state *state;
366
367         req = tevent_req_create(mem_ctx, &state, struct writev_state);
368         if (req == NULL) {
369                 return NULL;
370         }
371         state->ev = ev;
372         state->fd = fd;
373         state->total_size = 0;
374         state->count = count;
375         state->iov = (struct iovec *)talloc_memdup(
376                 state, iov, sizeof(struct iovec) * count);
377         if (state->iov == NULL) {
378                 goto fail;
379         }
380         state->flags = TEVENT_FD_WRITE;
381         if (err_on_readability) {
382                 state->flags |= TEVENT_FD_READ;
383         }
384
385         if (queue == NULL) {
386                 struct tevent_fd *fde;
387                 fde = tevent_add_fd(state->ev, state, state->fd,
388                                     state->flags, writev_handler, req);
389                 if (tevent_req_nomem(fde, req)) {
390                         return tevent_req_post(req, ev);
391                 }
392                 return req;
393         }
394
395         if (!tevent_queue_add(queue, ev, req, writev_trigger, NULL)) {
396                 goto fail;
397         }
398         return req;
399  fail:
400         TALLOC_FREE(req);
401         return NULL;
402 }
403
404 static void writev_trigger(struct tevent_req *req, void *private_data)
405 {
406         struct writev_state *state = tevent_req_data(req, struct writev_state);
407         struct tevent_fd *fde;
408
409         fde = tevent_add_fd(state->ev, state, state->fd, state->flags,
410                             writev_handler, req);
411         if (fde == NULL) {
412                 tevent_req_error(req, ENOMEM);
413         }
414 }
415
416 static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde,
417                            uint16_t flags, void *private_data)
418 {
419         struct tevent_req *req = talloc_get_type_abort(
420                 private_data, struct tevent_req);
421         struct writev_state *state =
422                 tevent_req_data(req, struct writev_state);
423         size_t to_write, written;
424         int i;
425
426         to_write = 0;
427
428         if ((state->flags & TEVENT_FD_READ) && (flags & TEVENT_FD_READ)) {
429                 tevent_req_error(req, EPIPE);
430                 return;
431         }
432
433         for (i=0; i<state->count; i++) {
434                 to_write += state->iov[i].iov_len;
435         }
436
437         written = writev(state->fd, state->iov, state->count);
438         if ((written == -1) && (errno = EINTR)) {
439                 /* retry */
440                 return;
441         }
442         if (written == -1) {
443                 tevent_req_error(req, errno);
444                 return;
445         }
446         if (written == 0) {
447                 tevent_req_error(req, EPIPE);
448                 return;
449         }
450         state->total_size += written;
451
452         if (written == to_write) {
453                 tevent_req_done(req);
454                 return;
455         }
456
457         /*
458          * We've written less than we were asked to, drop stuff from
459          * state->iov.
460          */
461
462         while (written > 0) {
463                 if (written < state->iov[0].iov_len) {
464                         state->iov[0].iov_base =
465                                 (char *)state->iov[0].iov_base + written;
466                         state->iov[0].iov_len -= written;
467                         break;
468                 }
469                 written -= state->iov[0].iov_len;
470                 state->iov += 1;
471                 state->count -= 1;
472         }
473 }
474
475 ssize_t writev_recv(struct tevent_req *req, int *perrno)
476 {
477         struct writev_state *state =
478                 tevent_req_data(req, struct writev_state);
479
480         if (tevent_req_is_unix_error(req, perrno)) {
481                 return -1;
482         }
483         return state->total_size;
484 }
485
486 struct read_packet_state {
487         int fd;
488         uint8_t *buf;
489         size_t nread;
490         ssize_t (*more)(uint8_t *buf, size_t buflen, void *private_data);
491         void *private_data;
492 };
493
494 static void read_packet_handler(struct tevent_context *ev,
495                                 struct tevent_fd *fde,
496                                 uint16_t flags, void *private_data);
497
498 struct tevent_req *read_packet_send(TALLOC_CTX *mem_ctx,
499                                     struct tevent_context *ev,
500                                     int fd, size_t initial,
501                                     ssize_t (*more)(uint8_t *buf,
502                                                     size_t buflen,
503                                                     void *private_data),
504                                     void *private_data)
505 {
506         struct tevent_req *result;
507         struct read_packet_state *state;
508         struct tevent_fd *fde;
509
510         result = tevent_req_create(mem_ctx, &state, struct read_packet_state);
511         if (result == NULL) {
512                 return NULL;
513         }
514         state->fd = fd;
515         state->nread = 0;
516         state->more = more;
517         state->private_data = private_data;
518
519         state->buf = talloc_array(state, uint8_t, initial);
520         if (state->buf == NULL) {
521                 goto fail;
522         }
523
524         fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ, read_packet_handler,
525                             result);
526         if (fde == NULL) {
527                 goto fail;
528         }
529         return result;
530  fail:
531         TALLOC_FREE(result);
532         return NULL;
533 }
534
535 static void read_packet_handler(struct tevent_context *ev,
536                                 struct tevent_fd *fde,
537                                 uint16_t flags, void *private_data)
538 {
539         struct tevent_req *req = talloc_get_type_abort(
540                 private_data, struct tevent_req);
541         struct read_packet_state *state =
542                 tevent_req_data(req, struct read_packet_state);
543         size_t total = talloc_get_size(state->buf);
544         ssize_t nread, more;
545         uint8_t *tmp;
546
547         nread = recv(state->fd, state->buf+state->nread, total-state->nread,
548                      0);
549         if ((nread == -1) && (errno == EINTR)) {
550                 /* retry */
551                 return;
552         }
553         if (nread == -1) {
554                 tevent_req_error(req, errno);
555                 return;
556         }
557         if (nread == 0) {
558                 tevent_req_error(req, EPIPE);
559                 return;
560         }
561
562         state->nread += nread;
563         if (state->nread < total) {
564                 /* Come back later */
565                 return;
566         }
567
568         /*
569          * We got what was initially requested. See if "more" asks for -- more.
570          */
571         if (state->more == NULL) {
572                 /* Nobody to ask, this is a async read_data */
573                 tevent_req_done(req);
574                 return;
575         }
576
577         more = state->more(state->buf, total, state->private_data);
578         if (more == -1) {
579                 /* We got an invalid packet, tell the caller */
580                 tevent_req_error(req, EIO);
581                 return;
582         }
583         if (more == 0) {
584                 /* We're done, full packet received */
585                 tevent_req_done(req);
586                 return;
587         }
588
589         tmp = talloc_realloc(state, state->buf, uint8_t, total+more);
590         if (tevent_req_nomem(tmp, req)) {
591                 return;
592         }
593         state->buf = tmp;
594 }
595
596 ssize_t read_packet_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
597                          uint8_t **pbuf, int *perrno)
598 {
599         struct read_packet_state *state =
600                 tevent_req_data(req, struct read_packet_state);
601
602         if (tevent_req_is_unix_error(req, perrno)) {
603                 return -1;
604         }
605         *pbuf = talloc_move(mem_ctx, &state->buf);
606         return talloc_get_size(*pbuf);
607 }