598a126467d75411206bc3d3271a0ca7b32de5a9
[ira/wip.git] / lib / async_req / async_sock.c
1 /*
2    Unix SMB/CIFS implementation.
3    async socket syscalls
4    Copyright (C) Volker Lendecke 2008
5
6      ** NOTE! The following LGPL license applies to the async_sock
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Library General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public License
21    along with this program.  If not, see <http://www.gnu.org/licenses/>.
22 */
23
24 #include "includes.h"
25 #include "lib/talloc/talloc.h"
26 #include "lib/tevent/tevent.h"
27 #include "lib/async_req/async_sock.h"
28 #include "lib/util/tevent_unix.h"
29 #include <fcntl.h>
30
31 #ifndef TALLOC_FREE
32 #define TALLOC_FREE(ctx) do { talloc_free(ctx); ctx=NULL; } while(0)
33 #endif
34
35 struct async_send_state {
36         int fd;
37         const void *buf;
38         size_t len;
39         int flags;
40         ssize_t sent;
41 };
42
43 static void async_send_handler(struct tevent_context *ev,
44                                struct tevent_fd *fde,
45                                uint16_t flags, void *private_data);
46
47 struct tevent_req *async_send_send(TALLOC_CTX *mem_ctx,
48                                    struct tevent_context *ev,
49                                    int fd, const void *buf, size_t len,
50                                    int flags)
51 {
52         struct tevent_req *result;
53         struct async_send_state *state;
54         struct tevent_fd *fde;
55
56         result = tevent_req_create(mem_ctx, &state, struct async_send_state);
57         if (result == NULL) {
58                 return result;
59         }
60         state->fd = fd;
61         state->buf = buf;
62         state->len = len;
63         state->flags = flags;
64
65         fde = tevent_add_fd(ev, state, fd, TEVENT_FD_WRITE, async_send_handler,
66                             result);
67         if (fde == NULL) {
68                 TALLOC_FREE(result);
69                 return NULL;
70         }
71         return result;
72 }
73
74 static void async_send_handler(struct tevent_context *ev,
75                                struct tevent_fd *fde,
76                                uint16_t flags, void *private_data)
77 {
78         struct tevent_req *req = talloc_get_type_abort(
79                 private_data, struct tevent_req);
80         struct async_send_state *state =
81                 tevent_req_data(req, struct async_send_state);
82
83         state->sent = send(state->fd, state->buf, state->len, state->flags);
84         if (state->sent == -1) {
85                 tevent_req_error(req, errno);
86                 return;
87         }
88         tevent_req_done(req);
89 }
90
91 ssize_t async_send_recv(struct tevent_req *req, int *perrno)
92 {
93         struct async_send_state *state =
94                 tevent_req_data(req, struct async_send_state);
95
96         if (tevent_req_is_unix_error(req, perrno)) {
97                 return -1;
98         }
99         return state->sent;
100 }
101
102 struct async_recv_state {
103         int fd;
104         void *buf;
105         size_t len;
106         int flags;
107         ssize_t received;
108 };
109
110 static void async_recv_handler(struct tevent_context *ev,
111                                struct tevent_fd *fde,
112                                uint16_t flags, void *private_data);
113
114 struct tevent_req *async_recv_send(TALLOC_CTX *mem_ctx,
115                                    struct tevent_context *ev,
116                                    int fd, void *buf, size_t len, int flags)
117 {
118         struct tevent_req *result;
119         struct async_recv_state *state;
120         struct tevent_fd *fde;
121
122         result = tevent_req_create(mem_ctx, &state, struct async_recv_state);
123         if (result == NULL) {
124                 return result;
125         }
126         state->fd = fd;
127         state->buf = buf;
128         state->len = len;
129         state->flags = flags;
130
131         fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ, async_recv_handler,
132                             result);
133         if (fde == NULL) {
134                 TALLOC_FREE(result);
135                 return NULL;
136         }
137         return result;
138 }
139
140 static void async_recv_handler(struct tevent_context *ev,
141                                struct tevent_fd *fde,
142                                uint16_t flags, void *private_data)
143 {
144         struct tevent_req *req = talloc_get_type_abort(
145                 private_data, struct tevent_req);
146         struct async_recv_state *state =
147                 tevent_req_data(req, struct async_recv_state);
148
149         state->received = recv(state->fd, state->buf, state->len,
150                                state->flags);
151         if (state->received == -1) {
152                 tevent_req_error(req, errno);
153                 return;
154         }
155         tevent_req_done(req);
156 }
157
158 ssize_t async_recv_recv(struct tevent_req *req, int *perrno)
159 {
160         struct async_recv_state *state =
161                 tevent_req_data(req, struct async_recv_state);
162
163         if (tevent_req_is_unix_error(req, perrno)) {
164                 return -1;
165         }
166         return state->received;
167 }
168
169 struct async_connect_state {
170         int fd;
171         int result;
172         int sys_errno;
173         long old_sockflags;
174         socklen_t address_len;
175         struct sockaddr_storage address;
176 };
177
178 static void async_connect_connected(struct tevent_context *ev,
179                                     struct tevent_fd *fde, uint16_t flags,
180                                     void *priv);
181
182 /**
183  * @brief async version of connect(2)
184  * @param[in] mem_ctx   The memory context to hang the result off
185  * @param[in] ev        The event context to work from
186  * @param[in] fd        The socket to recv from
187  * @param[in] address   Where to connect?
188  * @param[in] address_len Length of *address
189  * @retval The async request
190  *
191  * This function sets the socket into non-blocking state to be able to call
192  * connect in an async state. This will be reset when the request is finished.
193  */
194
195 struct tevent_req *async_connect_send(TALLOC_CTX *mem_ctx,
196                                       struct tevent_context *ev,
197                                       int fd, const struct sockaddr *address,
198                                       socklen_t address_len)
199 {
200         struct tevent_req *result;
201         struct async_connect_state *state;
202         struct tevent_fd *fde;
203
204         result = tevent_req_create(
205                 mem_ctx, &state, struct async_connect_state);
206         if (result == NULL) {
207                 return NULL;
208         }
209
210         /**
211          * We have to set the socket to nonblocking for async connect(2). Keep
212          * the old sockflags around.
213          */
214
215         state->fd = fd;
216         state->sys_errno = 0;
217
218         state->address_len = address_len;
219         if (address_len > sizeof(state->address)) {
220                 errno = EINVAL;
221                 goto post_errno;
222         }
223         memcpy(&state->address, address, address_len);
224
225         state->old_sockflags = fcntl(fd, F_GETFL, 0);
226         if (state->old_sockflags == -1) {
227                 goto post_errno;
228         }
229
230         set_blocking(fd, false);
231
232         state->result = connect(fd, address, address_len);
233         if (state->result == 0) {
234                 tevent_req_done(result);
235                 goto done;
236         }
237
238         /**
239          * A number of error messages show that something good is progressing
240          * and that we have to wait for readability.
241          *
242          * If none of them are present, bail out.
243          */
244
245         if (!(errno == EINPROGRESS || errno == EALREADY ||
246 #ifdef EISCONN
247               errno == EISCONN ||
248 #endif
249               errno == EAGAIN || errno == EINTR)) {
250                 state->sys_errno = errno;
251                 goto post_errno;
252         }
253
254         fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ | TEVENT_FD_WRITE,
255                            async_connect_connected, result);
256         if (fde == NULL) {
257                 state->sys_errno = ENOMEM;
258                 goto post_errno;
259         }
260         return result;
261
262  post_errno:
263         tevent_req_error(result, state->sys_errno);
264  done:
265         fcntl(fd, F_SETFL, state->old_sockflags);
266         return tevent_req_post(result, ev);
267 }
268
269 /**
270  * fde event handler for connect(2)
271  * @param[in] ev        The event context that sent us here
272  * @param[in] fde       The file descriptor event associated with the connect
273  * @param[in] flags     Indicate read/writeability of the socket
274  * @param[in] priv      private data, "struct async_req *" in this case
275  */
276
277 static void async_connect_connected(struct tevent_context *ev,
278                                     struct tevent_fd *fde, uint16_t flags,
279                                     void *priv)
280 {
281         struct tevent_req *req = talloc_get_type_abort(
282                 priv, struct tevent_req);
283         struct async_connect_state *state =
284                 tevent_req_data(req, struct async_connect_state);
285
286         /*
287          * Stevens, Network Programming says that if there's a
288          * successful connect, the socket is only writable. Upon an
289          * error, it's both readable and writable.
290          */
291         if ((flags & (TEVENT_FD_READ|TEVENT_FD_WRITE))
292             == (TEVENT_FD_READ|TEVENT_FD_WRITE)) {
293                 int ret;
294
295                 ret = connect(state->fd,
296                               (struct sockaddr *)(void *)&state->address,
297                               state->address_len);
298                 if (ret == 0) {
299                         TALLOC_FREE(fde);
300                         tevent_req_done(req);
301                         return;
302                 }
303
304                 if (errno == EINPROGRESS) {
305                         /* Try again later, leave the fde around */
306                         return;
307                 }
308                 TALLOC_FREE(fde);
309                 tevent_req_error(req, errno);
310                 return;
311         }
312
313         state->sys_errno = 0;
314         tevent_req_done(req);
315 }
316
317 int async_connect_recv(struct tevent_req *req, int *perrno)
318 {
319         struct async_connect_state *state =
320                 tevent_req_data(req, struct async_connect_state);
321         int err;
322
323         fcntl(state->fd, F_SETFL, state->old_sockflags);
324
325         if (tevent_req_is_unix_error(req, &err)) {
326                 *perrno = err;
327                 return -1;
328         }
329
330         if (state->sys_errno == 0) {
331                 return 0;
332         }
333
334         *perrno = state->sys_errno;
335         return -1;
336 }
337
338 struct writev_state {
339         struct tevent_context *ev;
340         int fd;
341         struct iovec *iov;
342         int count;
343         size_t total_size;
344         uint16_t flags;
345 };
346
347 static void writev_trigger(struct tevent_req *req, void *private_data);
348 static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde,
349                            uint16_t flags, void *private_data);
350
351 struct tevent_req *writev_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
352                                struct tevent_queue *queue, int fd,
353                                bool err_on_readability,
354                                struct iovec *iov, int count)
355 {
356         struct tevent_req *req;
357         struct writev_state *state;
358
359         req = tevent_req_create(mem_ctx, &state, struct writev_state);
360         if (req == NULL) {
361                 return NULL;
362         }
363         state->ev = ev;
364         state->fd = fd;
365         state->total_size = 0;
366         state->count = count;
367         state->iov = (struct iovec *)talloc_memdup(
368                 state, iov, sizeof(struct iovec) * count);
369         if (state->iov == NULL) {
370                 goto fail;
371         }
372         state->flags = TEVENT_FD_WRITE;
373         if (err_on_readability) {
374                 state->flags |= TEVENT_FD_READ;
375         }
376
377         if (queue == NULL) {
378                 struct tevent_fd *fde;
379                 fde = tevent_add_fd(state->ev, state, state->fd,
380                                     state->flags, writev_handler, req);
381                 if (tevent_req_nomem(fde, req)) {
382                         return tevent_req_post(req, ev);
383                 }
384                 return req;
385         }
386
387         if (!tevent_queue_add(queue, ev, req, writev_trigger, NULL)) {
388                 goto fail;
389         }
390         return req;
391  fail:
392         TALLOC_FREE(req);
393         return NULL;
394 }
395
396 static void writev_trigger(struct tevent_req *req, void *private_data)
397 {
398         struct writev_state *state = tevent_req_data(req, struct writev_state);
399         struct tevent_fd *fde;
400
401         fde = tevent_add_fd(state->ev, state, state->fd, state->flags,
402                             writev_handler, req);
403         if (fde == NULL) {
404                 tevent_req_error(req, ENOMEM);
405         }
406 }
407
408 static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde,
409                            uint16_t flags, void *private_data)
410 {
411         struct tevent_req *req = talloc_get_type_abort(
412                 private_data, struct tevent_req);
413         struct writev_state *state =
414                 tevent_req_data(req, struct writev_state);
415         size_t to_write, written;
416         int i;
417
418         to_write = 0;
419
420         if (flags & TEVENT_FD_READ) {
421                 tevent_req_error(req, EPIPE);
422                 return;
423         }
424
425         for (i=0; i<state->count; i++) {
426                 to_write += state->iov[i].iov_len;
427         }
428
429         written = sys_writev(state->fd, state->iov, state->count);
430         if (written == -1) {
431                 tevent_req_error(req, errno);
432                 return;
433         }
434         if (written == 0) {
435                 tevent_req_error(req, EPIPE);
436                 return;
437         }
438         state->total_size += written;
439
440         if (written == to_write) {
441                 tevent_req_done(req);
442                 return;
443         }
444
445         /*
446          * We've written less than we were asked to, drop stuff from
447          * state->iov.
448          */
449
450         while (written > 0) {
451                 if (written < state->iov[0].iov_len) {
452                         state->iov[0].iov_base =
453                                 (char *)state->iov[0].iov_base + written;
454                         state->iov[0].iov_len -= written;
455                         break;
456                 }
457                 written -= state->iov[0].iov_len;
458                 state->iov += 1;
459                 state->count -= 1;
460         }
461 }
462
463 ssize_t writev_recv(struct tevent_req *req, int *perrno)
464 {
465         struct writev_state *state =
466                 tevent_req_data(req, struct writev_state);
467
468         if (tevent_req_is_unix_error(req, perrno)) {
469                 return -1;
470         }
471         return state->total_size;
472 }
473
474 struct read_packet_state {
475         int fd;
476         uint8_t *buf;
477         size_t nread;
478         ssize_t (*more)(uint8_t *buf, size_t buflen, void *private_data);
479         void *private_data;
480 };
481
482 static void read_packet_handler(struct tevent_context *ev,
483                                 struct tevent_fd *fde,
484                                 uint16_t flags, void *private_data);
485
486 struct tevent_req *read_packet_send(TALLOC_CTX *mem_ctx,
487                                     struct tevent_context *ev,
488                                     int fd, size_t initial,
489                                     ssize_t (*more)(uint8_t *buf,
490                                                     size_t buflen,
491                                                     void *private_data),
492                                     void *private_data)
493 {
494         struct tevent_req *result;
495         struct read_packet_state *state;
496         struct tevent_fd *fde;
497
498         result = tevent_req_create(mem_ctx, &state, struct read_packet_state);
499         if (result == NULL) {
500                 return NULL;
501         }
502         state->fd = fd;
503         state->nread = 0;
504         state->more = more;
505         state->private_data = private_data;
506
507         state->buf = talloc_array(state, uint8_t, initial);
508         if (state->buf == NULL) {
509                 goto fail;
510         }
511
512         fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ, read_packet_handler,
513                             result);
514         if (fde == NULL) {
515                 goto fail;
516         }
517         return result;
518  fail:
519         TALLOC_FREE(result);
520         return NULL;
521 }
522
523 static void read_packet_handler(struct tevent_context *ev,
524                                 struct tevent_fd *fde,
525                                 uint16_t flags, void *private_data)
526 {
527         struct tevent_req *req = talloc_get_type_abort(
528                 private_data, struct tevent_req);
529         struct read_packet_state *state =
530                 tevent_req_data(req, struct read_packet_state);
531         size_t total = talloc_get_size(state->buf);
532         ssize_t nread, more;
533         uint8_t *tmp;
534
535         nread = recv(state->fd, state->buf+state->nread, total-state->nread,
536                      0);
537         if (nread == -1) {
538                 tevent_req_error(req, errno);
539                 return;
540         }
541         if (nread == 0) {
542                 tevent_req_error(req, EPIPE);
543                 return;
544         }
545
546         state->nread += nread;
547         if (state->nread < total) {
548                 /* Come back later */
549                 return;
550         }
551
552         /*
553          * We got what was initially requested. See if "more" asks for -- more.
554          */
555         if (state->more == NULL) {
556                 /* Nobody to ask, this is a async read_data */
557                 tevent_req_done(req);
558                 return;
559         }
560
561         more = state->more(state->buf, total, state->private_data);
562         if (more == -1) {
563                 /* We got an invalid packet, tell the caller */
564                 tevent_req_error(req, EIO);
565                 return;
566         }
567         if (more == 0) {
568                 /* We're done, full packet received */
569                 tevent_req_done(req);
570                 return;
571         }
572
573         tmp = TALLOC_REALLOC_ARRAY(state, state->buf, uint8_t, total+more);
574         if (tevent_req_nomem(tmp, req)) {
575                 return;
576         }
577         state->buf = tmp;
578 }
579
580 ssize_t read_packet_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
581                          uint8_t **pbuf, int *perrno)
582 {
583         struct read_packet_state *state =
584                 tevent_req_data(req, struct read_packet_state);
585
586         if (tevent_req_is_unix_error(req, perrno)) {
587                 return -1;
588         }
589         *pbuf = talloc_move(mem_ctx, &state->buf);
590         return talloc_get_size(*pbuf);
591 }