Fix a = vs == error in writev_handler
[ira/wip.git] / lib / async_req / async_sock.c
1 /*
2    Unix SMB/CIFS implementation.
3    async socket syscalls
4    Copyright (C) Volker Lendecke 2008
5
6      ** NOTE! The following LGPL license applies to the async_sock
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Library General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public License
21    along with this program.  If not, see <http://www.gnu.org/licenses/>.
22 */
23
24 #include "replace.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include <talloc.h>
28 #include <tevent.h>
29 #include "lib/async_req/async_sock.h"
30
31 /* Note: lib/util/ is currently GPL */
32 #include "lib/util/tevent_unix.h"
33 #include "lib/util/util.h"
34
35 #ifndef TALLOC_FREE
36 #define TALLOC_FREE(ctx) do { talloc_free(ctx); ctx=NULL; } while(0)
37 #endif
38
39 struct async_send_state {
40         int fd;
41         const void *buf;
42         size_t len;
43         int flags;
44         ssize_t sent;
45 };
46
47 static void async_send_handler(struct tevent_context *ev,
48                                struct tevent_fd *fde,
49                                uint16_t flags, void *private_data);
50
51 struct tevent_req *async_send_send(TALLOC_CTX *mem_ctx,
52                                    struct tevent_context *ev,
53                                    int fd, const void *buf, size_t len,
54                                    int flags)
55 {
56         struct tevent_req *result;
57         struct async_send_state *state;
58         struct tevent_fd *fde;
59
60         result = tevent_req_create(mem_ctx, &state, struct async_send_state);
61         if (result == NULL) {
62                 return result;
63         }
64         state->fd = fd;
65         state->buf = buf;
66         state->len = len;
67         state->flags = flags;
68
69         fde = tevent_add_fd(ev, state, fd, TEVENT_FD_WRITE, async_send_handler,
70                             result);
71         if (fde == NULL) {
72                 TALLOC_FREE(result);
73                 return NULL;
74         }
75         return result;
76 }
77
78 static void async_send_handler(struct tevent_context *ev,
79                                struct tevent_fd *fde,
80                                uint16_t flags, void *private_data)
81 {
82         struct tevent_req *req = talloc_get_type_abort(
83                 private_data, struct tevent_req);
84         struct async_send_state *state =
85                 tevent_req_data(req, struct async_send_state);
86
87         state->sent = send(state->fd, state->buf, state->len, state->flags);
88         if ((state->sent == -1) && (errno == EINTR)) {
89                 /* retry */
90                 return;
91         }
92         if (state->sent == -1) {
93                 tevent_req_error(req, errno);
94                 return;
95         }
96         tevent_req_done(req);
97 }
98
99 ssize_t async_send_recv(struct tevent_req *req, int *perrno)
100 {
101         struct async_send_state *state =
102                 tevent_req_data(req, struct async_send_state);
103
104         if (tevent_req_is_unix_error(req, perrno)) {
105                 return -1;
106         }
107         return state->sent;
108 }
109
110 struct async_recv_state {
111         int fd;
112         void *buf;
113         size_t len;
114         int flags;
115         ssize_t received;
116 };
117
118 static void async_recv_handler(struct tevent_context *ev,
119                                struct tevent_fd *fde,
120                                uint16_t flags, void *private_data);
121
122 struct tevent_req *async_recv_send(TALLOC_CTX *mem_ctx,
123                                    struct tevent_context *ev,
124                                    int fd, void *buf, size_t len, int flags)
125 {
126         struct tevent_req *result;
127         struct async_recv_state *state;
128         struct tevent_fd *fde;
129
130         result = tevent_req_create(mem_ctx, &state, struct async_recv_state);
131         if (result == NULL) {
132                 return result;
133         }
134         state->fd = fd;
135         state->buf = buf;
136         state->len = len;
137         state->flags = flags;
138
139         fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ, async_recv_handler,
140                             result);
141         if (fde == NULL) {
142                 TALLOC_FREE(result);
143                 return NULL;
144         }
145         return result;
146 }
147
148 static void async_recv_handler(struct tevent_context *ev,
149                                struct tevent_fd *fde,
150                                uint16_t flags, void *private_data)
151 {
152         struct tevent_req *req = talloc_get_type_abort(
153                 private_data, struct tevent_req);
154         struct async_recv_state *state =
155                 tevent_req_data(req, struct async_recv_state);
156
157         state->received = recv(state->fd, state->buf, state->len,
158                                state->flags);
159         if ((state->received == -1) && (errno == EINTR)) {
160                 /* retry */
161                 return;
162         }
163         if (state->received == -1) {
164                 tevent_req_error(req, errno);
165                 return;
166         }
167         tevent_req_done(req);
168 }
169
170 ssize_t async_recv_recv(struct tevent_req *req, int *perrno)
171 {
172         struct async_recv_state *state =
173                 tevent_req_data(req, struct async_recv_state);
174
175         if (tevent_req_is_unix_error(req, perrno)) {
176                 return -1;
177         }
178         return state->received;
179 }
180
181 struct async_connect_state {
182         int fd;
183         int result;
184         int sys_errno;
185         long old_sockflags;
186         socklen_t address_len;
187         struct sockaddr_storage address;
188 };
189
190 static void async_connect_connected(struct tevent_context *ev,
191                                     struct tevent_fd *fde, uint16_t flags,
192                                     void *priv);
193
194 /**
195  * @brief async version of connect(2)
196  * @param[in] mem_ctx   The memory context to hang the result off
197  * @param[in] ev        The event context to work from
198  * @param[in] fd        The socket to recv from
199  * @param[in] address   Where to connect?
200  * @param[in] address_len Length of *address
201  * @retval The async request
202  *
203  * This function sets the socket into non-blocking state to be able to call
204  * connect in an async state. This will be reset when the request is finished.
205  */
206
207 struct tevent_req *async_connect_send(TALLOC_CTX *mem_ctx,
208                                       struct tevent_context *ev,
209                                       int fd, const struct sockaddr *address,
210                                       socklen_t address_len)
211 {
212         struct tevent_req *result;
213         struct async_connect_state *state;
214         struct tevent_fd *fde;
215
216         result = tevent_req_create(
217                 mem_ctx, &state, struct async_connect_state);
218         if (result == NULL) {
219                 return NULL;
220         }
221
222         /**
223          * We have to set the socket to nonblocking for async connect(2). Keep
224          * the old sockflags around.
225          */
226
227         state->fd = fd;
228         state->sys_errno = 0;
229
230         state->old_sockflags = fcntl(fd, F_GETFL, 0);
231         if (state->old_sockflags == -1) {
232                 goto post_errno;
233         }
234
235         state->address_len = address_len;
236         if (address_len > sizeof(state->address)) {
237                 errno = EINVAL;
238                 goto post_errno;
239         }
240         memcpy(&state->address, address, address_len);
241
242         set_blocking(fd, false);
243
244         state->result = connect(fd, address, address_len);
245         if (state->result == 0) {
246                 tevent_req_done(result);
247                 goto done;
248         }
249
250         /**
251          * A number of error messages show that something good is progressing
252          * and that we have to wait for readability.
253          *
254          * If none of them are present, bail out.
255          */
256
257         if (!(errno == EINPROGRESS || errno == EALREADY ||
258 #ifdef EISCONN
259               errno == EISCONN ||
260 #endif
261               errno == EAGAIN || errno == EINTR)) {
262                 state->sys_errno = errno;
263                 goto post_errno;
264         }
265
266         fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ | TEVENT_FD_WRITE,
267                            async_connect_connected, result);
268         if (fde == NULL) {
269                 state->sys_errno = ENOMEM;
270                 goto post_errno;
271         }
272         return result;
273
274  post_errno:
275         tevent_req_error(result, state->sys_errno);
276  done:
277         fcntl(fd, F_SETFL, state->old_sockflags);
278         return tevent_req_post(result, ev);
279 }
280
281 /**
282  * fde event handler for connect(2)
283  * @param[in] ev        The event context that sent us here
284  * @param[in] fde       The file descriptor event associated with the connect
285  * @param[in] flags     Indicate read/writeability of the socket
286  * @param[in] priv      private data, "struct async_req *" in this case
287  */
288
289 static void async_connect_connected(struct tevent_context *ev,
290                                     struct tevent_fd *fde, uint16_t flags,
291                                     void *priv)
292 {
293         struct tevent_req *req = talloc_get_type_abort(
294                 priv, struct tevent_req);
295         struct async_connect_state *state =
296                 tevent_req_data(req, struct async_connect_state);
297
298         /*
299          * Stevens, Network Programming says that if there's a
300          * successful connect, the socket is only writable. Upon an
301          * error, it's both readable and writable.
302          */
303         if ((flags & (TEVENT_FD_READ|TEVENT_FD_WRITE))
304             == (TEVENT_FD_READ|TEVENT_FD_WRITE)) {
305                 int ret;
306
307                 ret = connect(state->fd,
308                               (struct sockaddr *)(void *)&state->address,
309                               state->address_len);
310                 if (ret == 0) {
311                         TALLOC_FREE(fde);
312                         tevent_req_done(req);
313                         return;
314                 }
315
316                 if (errno == EINPROGRESS) {
317                         /* Try again later, leave the fde around */
318                         return;
319                 }
320                 TALLOC_FREE(fde);
321                 tevent_req_error(req, errno);
322                 return;
323         }
324
325         state->sys_errno = 0;
326         tevent_req_done(req);
327 }
328
329 int async_connect_recv(struct tevent_req *req, int *perrno)
330 {
331         struct async_connect_state *state =
332                 tevent_req_data(req, struct async_connect_state);
333         int err;
334
335         fcntl(state->fd, F_SETFL, state->old_sockflags);
336
337         if (tevent_req_is_unix_error(req, &err)) {
338                 *perrno = err;
339                 return -1;
340         }
341
342         if (state->sys_errno == 0) {
343                 return 0;
344         }
345
346         *perrno = state->sys_errno;
347         return -1;
348 }
349
350 struct writev_state {
351         struct tevent_context *ev;
352         int fd;
353         struct iovec *iov;
354         int count;
355         size_t total_size;
356         uint16_t flags;
357 };
358
359 static void writev_trigger(struct tevent_req *req, void *private_data);
360 static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde,
361                            uint16_t flags, void *private_data);
362
363 struct tevent_req *writev_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
364                                struct tevent_queue *queue, int fd,
365                                bool err_on_readability,
366                                struct iovec *iov, int count)
367 {
368         struct tevent_req *req;
369         struct writev_state *state;
370
371         req = tevent_req_create(mem_ctx, &state, struct writev_state);
372         if (req == NULL) {
373                 return NULL;
374         }
375         state->ev = ev;
376         state->fd = fd;
377         state->total_size = 0;
378         state->count = count;
379         state->iov = (struct iovec *)talloc_memdup(
380                 state, iov, sizeof(struct iovec) * count);
381         if (state->iov == NULL) {
382                 goto fail;
383         }
384         state->flags = TEVENT_FD_WRITE;
385         if (err_on_readability) {
386                 state->flags |= TEVENT_FD_READ;
387         }
388
389         if (queue == NULL) {
390                 struct tevent_fd *fde;
391                 fde = tevent_add_fd(state->ev, state, state->fd,
392                                     state->flags, writev_handler, req);
393                 if (tevent_req_nomem(fde, req)) {
394                         return tevent_req_post(req, ev);
395                 }
396                 return req;
397         }
398
399         if (!tevent_queue_add(queue, ev, req, writev_trigger, NULL)) {
400                 goto fail;
401         }
402         return req;
403  fail:
404         TALLOC_FREE(req);
405         return NULL;
406 }
407
408 static void writev_trigger(struct tevent_req *req, void *private_data)
409 {
410         struct writev_state *state = tevent_req_data(req, struct writev_state);
411         struct tevent_fd *fde;
412
413         fde = tevent_add_fd(state->ev, state, state->fd, state->flags,
414                             writev_handler, req);
415         if (fde == NULL) {
416                 tevent_req_error(req, ENOMEM);
417         }
418 }
419
420 static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde,
421                            uint16_t flags, void *private_data)
422 {
423         struct tevent_req *req = talloc_get_type_abort(
424                 private_data, struct tevent_req);
425         struct writev_state *state =
426                 tevent_req_data(req, struct writev_state);
427         size_t to_write, written;
428         int i;
429
430         to_write = 0;
431
432         if ((state->flags & TEVENT_FD_READ) && (flags & TEVENT_FD_READ)) {
433                 tevent_req_error(req, EPIPE);
434                 return;
435         }
436
437         for (i=0; i<state->count; i++) {
438                 to_write += state->iov[i].iov_len;
439         }
440
441         written = writev(state->fd, state->iov, state->count);
442         if ((written == -1) && (errno == EINTR)) {
443                 /* retry */
444                 return;
445         }
446         if (written == -1) {
447                 tevent_req_error(req, errno);
448                 return;
449         }
450         if (written == 0) {
451                 tevent_req_error(req, EPIPE);
452                 return;
453         }
454         state->total_size += written;
455
456         if (written == to_write) {
457                 tevent_req_done(req);
458                 return;
459         }
460
461         /*
462          * We've written less than we were asked to, drop stuff from
463          * state->iov.
464          */
465
466         while (written > 0) {
467                 if (written < state->iov[0].iov_len) {
468                         state->iov[0].iov_base =
469                                 (char *)state->iov[0].iov_base + written;
470                         state->iov[0].iov_len -= written;
471                         break;
472                 }
473                 written -= state->iov[0].iov_len;
474                 state->iov += 1;
475                 state->count -= 1;
476         }
477 }
478
479 ssize_t writev_recv(struct tevent_req *req, int *perrno)
480 {
481         struct writev_state *state =
482                 tevent_req_data(req, struct writev_state);
483
484         if (tevent_req_is_unix_error(req, perrno)) {
485                 return -1;
486         }
487         return state->total_size;
488 }
489
490 struct read_packet_state {
491         int fd;
492         uint8_t *buf;
493         size_t nread;
494         ssize_t (*more)(uint8_t *buf, size_t buflen, void *private_data);
495         void *private_data;
496 };
497
498 static void read_packet_handler(struct tevent_context *ev,
499                                 struct tevent_fd *fde,
500                                 uint16_t flags, void *private_data);
501
502 struct tevent_req *read_packet_send(TALLOC_CTX *mem_ctx,
503                                     struct tevent_context *ev,
504                                     int fd, size_t initial,
505                                     ssize_t (*more)(uint8_t *buf,
506                                                     size_t buflen,
507                                                     void *private_data),
508                                     void *private_data)
509 {
510         struct tevent_req *result;
511         struct read_packet_state *state;
512         struct tevent_fd *fde;
513
514         result = tevent_req_create(mem_ctx, &state, struct read_packet_state);
515         if (result == NULL) {
516                 return NULL;
517         }
518         state->fd = fd;
519         state->nread = 0;
520         state->more = more;
521         state->private_data = private_data;
522
523         state->buf = talloc_array(state, uint8_t, initial);
524         if (state->buf == NULL) {
525                 goto fail;
526         }
527
528         fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ, read_packet_handler,
529                             result);
530         if (fde == NULL) {
531                 goto fail;
532         }
533         return result;
534  fail:
535         TALLOC_FREE(result);
536         return NULL;
537 }
538
539 static void read_packet_handler(struct tevent_context *ev,
540                                 struct tevent_fd *fde,
541                                 uint16_t flags, void *private_data)
542 {
543         struct tevent_req *req = talloc_get_type_abort(
544                 private_data, struct tevent_req);
545         struct read_packet_state *state =
546                 tevent_req_data(req, struct read_packet_state);
547         size_t total = talloc_get_size(state->buf);
548         ssize_t nread, more;
549         uint8_t *tmp;
550
551         nread = recv(state->fd, state->buf+state->nread, total-state->nread,
552                      0);
553         if ((nread == -1) && (errno == EINTR)) {
554                 /* retry */
555                 return;
556         }
557         if (nread == -1) {
558                 tevent_req_error(req, errno);
559                 return;
560         }
561         if (nread == 0) {
562                 tevent_req_error(req, EPIPE);
563                 return;
564         }
565
566         state->nread += nread;
567         if (state->nread < total) {
568                 /* Come back later */
569                 return;
570         }
571
572         /*
573          * We got what was initially requested. See if "more" asks for -- more.
574          */
575         if (state->more == NULL) {
576                 /* Nobody to ask, this is a async read_data */
577                 tevent_req_done(req);
578                 return;
579         }
580
581         more = state->more(state->buf, total, state->private_data);
582         if (more == -1) {
583                 /* We got an invalid packet, tell the caller */
584                 tevent_req_error(req, EIO);
585                 return;
586         }
587         if (more == 0) {
588                 /* We're done, full packet received */
589                 tevent_req_done(req);
590                 return;
591         }
592
593         tmp = talloc_realloc(state, state->buf, uint8_t, total+more);
594         if (tevent_req_nomem(tmp, req)) {
595                 return;
596         }
597         state->buf = tmp;
598 }
599
600 ssize_t read_packet_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
601                          uint8_t **pbuf, int *perrno)
602 {
603         struct read_packet_state *state =
604                 tevent_req_data(req, struct read_packet_state);
605
606         if (tevent_req_is_unix_error(req, perrno)) {
607                 return -1;
608         }
609         *pbuf = talloc_move(mem_ctx, &state->buf);
610         return talloc_get_size(*pbuf);
611 }