Merge branch 'master' of git://git.samba.org/samba
[samba.git] / lib / async_req / async_sock.c
1 /*
2    Unix SMB/CIFS implementation.
3    async socket syscalls
4    Copyright (C) Volker Lendecke 2008
5
6      ** NOTE! The following LGPL license applies to the async_sock
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Library General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public License
21    along with this program.  If not, see <http://www.gnu.org/licenses/>.
22 */
23
24 #include "replace.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include <talloc.h>
28 #include <tevent.h>
29 #include "lib/async_req/async_sock.h"
30
31 /* Note: lib/util/ is currently GPL */
32 #include "lib/util/tevent_unix.h"
33 #include "lib/util/util.h"
34
35 #ifndef TALLOC_FREE
36 #define TALLOC_FREE(ctx) do { talloc_free(ctx); ctx=NULL; } while(0)
37 #endif
38
39 struct async_send_state {
40         int fd;
41         const void *buf;
42         size_t len;
43         int flags;
44         ssize_t sent;
45 };
46
47 static void async_send_handler(struct tevent_context *ev,
48                                struct tevent_fd *fde,
49                                uint16_t flags, void *private_data);
50
51 struct tevent_req *async_send_send(TALLOC_CTX *mem_ctx,
52                                    struct tevent_context *ev,
53                                    int fd, const void *buf, size_t len,
54                                    int flags)
55 {
56         struct tevent_req *result;
57         struct async_send_state *state;
58         struct tevent_fd *fde;
59
60         result = tevent_req_create(mem_ctx, &state, struct async_send_state);
61         if (result == NULL) {
62                 return result;
63         }
64         state->fd = fd;
65         state->buf = buf;
66         state->len = len;
67         state->flags = flags;
68
69         fde = tevent_add_fd(ev, state, fd, TEVENT_FD_WRITE, async_send_handler,
70                             result);
71         if (fde == NULL) {
72                 TALLOC_FREE(result);
73                 return NULL;
74         }
75         return result;
76 }
77
78 static void async_send_handler(struct tevent_context *ev,
79                                struct tevent_fd *fde,
80                                uint16_t flags, void *private_data)
81 {
82         struct tevent_req *req = talloc_get_type_abort(
83                 private_data, struct tevent_req);
84         struct async_send_state *state =
85                 tevent_req_data(req, struct async_send_state);
86
87         state->sent = send(state->fd, state->buf, state->len, state->flags);
88         if ((state->sent == -1) && (errno == EINTR)) {
89                 /* retry */
90                 return;
91         }
92         if (state->sent == -1) {
93                 tevent_req_error(req, errno);
94                 return;
95         }
96         tevent_req_done(req);
97 }
98
99 ssize_t async_send_recv(struct tevent_req *req, int *perrno)
100 {
101         struct async_send_state *state =
102                 tevent_req_data(req, struct async_send_state);
103
104         if (tevent_req_is_unix_error(req, perrno)) {
105                 return -1;
106         }
107         return state->sent;
108 }
109
110 struct async_recv_state {
111         int fd;
112         void *buf;
113         size_t len;
114         int flags;
115         ssize_t received;
116 };
117
118 static void async_recv_handler(struct tevent_context *ev,
119                                struct tevent_fd *fde,
120                                uint16_t flags, void *private_data);
121
122 struct tevent_req *async_recv_send(TALLOC_CTX *mem_ctx,
123                                    struct tevent_context *ev,
124                                    int fd, void *buf, size_t len, int flags)
125 {
126         struct tevent_req *result;
127         struct async_recv_state *state;
128         struct tevent_fd *fde;
129
130         result = tevent_req_create(mem_ctx, &state, struct async_recv_state);
131         if (result == NULL) {
132                 return result;
133         }
134         state->fd = fd;
135         state->buf = buf;
136         state->len = len;
137         state->flags = flags;
138
139         fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ, async_recv_handler,
140                             result);
141         if (fde == NULL) {
142                 TALLOC_FREE(result);
143                 return NULL;
144         }
145         return result;
146 }
147
148 static void async_recv_handler(struct tevent_context *ev,
149                                struct tevent_fd *fde,
150                                uint16_t flags, void *private_data)
151 {
152         struct tevent_req *req = talloc_get_type_abort(
153                 private_data, struct tevent_req);
154         struct async_recv_state *state =
155                 tevent_req_data(req, struct async_recv_state);
156
157         state->received = recv(state->fd, state->buf, state->len,
158                                state->flags);
159         if ((state->received == -1) && (errno == EINTR)) {
160                 /* retry */
161                 return;
162         }
163         if (state->received == 0) {
164                 tevent_req_error(req, EPIPE);
165                 return;
166         }
167         if (state->received == -1) {
168                 tevent_req_error(req, errno);
169                 return;
170         }
171         tevent_req_done(req);
172 }
173
174 ssize_t async_recv_recv(struct tevent_req *req, int *perrno)
175 {
176         struct async_recv_state *state =
177                 tevent_req_data(req, struct async_recv_state);
178
179         if (tevent_req_is_unix_error(req, perrno)) {
180                 return -1;
181         }
182         return state->received;
183 }
184
185 struct async_connect_state {
186         int fd;
187         int result;
188         int sys_errno;
189         long old_sockflags;
190         socklen_t address_len;
191         struct sockaddr_storage address;
192 };
193
194 static void async_connect_connected(struct tevent_context *ev,
195                                     struct tevent_fd *fde, uint16_t flags,
196                                     void *priv);
197
198 /**
199  * @brief async version of connect(2)
200  * @param[in] mem_ctx   The memory context to hang the result off
201  * @param[in] ev        The event context to work from
202  * @param[in] fd        The socket to recv from
203  * @param[in] address   Where to connect?
204  * @param[in] address_len Length of *address
205  * @retval The async request
206  *
207  * This function sets the socket into non-blocking state to be able to call
208  * connect in an async state. This will be reset when the request is finished.
209  */
210
211 struct tevent_req *async_connect_send(TALLOC_CTX *mem_ctx,
212                                       struct tevent_context *ev,
213                                       int fd, const struct sockaddr *address,
214                                       socklen_t address_len)
215 {
216         struct tevent_req *result;
217         struct async_connect_state *state;
218         struct tevent_fd *fde;
219
220         result = tevent_req_create(
221                 mem_ctx, &state, struct async_connect_state);
222         if (result == NULL) {
223                 return NULL;
224         }
225
226         /**
227          * We have to set the socket to nonblocking for async connect(2). Keep
228          * the old sockflags around.
229          */
230
231         state->fd = fd;
232         state->sys_errno = 0;
233
234         state->old_sockflags = fcntl(fd, F_GETFL, 0);
235         if (state->old_sockflags == -1) {
236                 goto post_errno;
237         }
238
239         state->address_len = address_len;
240         if (address_len > sizeof(state->address)) {
241                 errno = EINVAL;
242                 goto post_errno;
243         }
244         memcpy(&state->address, address, address_len);
245
246         set_blocking(fd, false);
247
248         state->result = connect(fd, address, address_len);
249         if (state->result == 0) {
250                 tevent_req_done(result);
251                 goto done;
252         }
253
254         /**
255          * A number of error messages show that something good is progressing
256          * and that we have to wait for readability.
257          *
258          * If none of them are present, bail out.
259          */
260
261         if (!(errno == EINPROGRESS || errno == EALREADY ||
262 #ifdef EISCONN
263               errno == EISCONN ||
264 #endif
265               errno == EAGAIN || errno == EINTR)) {
266                 state->sys_errno = errno;
267                 goto post_errno;
268         }
269
270         fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ | TEVENT_FD_WRITE,
271                            async_connect_connected, result);
272         if (fde == NULL) {
273                 state->sys_errno = ENOMEM;
274                 goto post_errno;
275         }
276         return result;
277
278  post_errno:
279         tevent_req_error(result, state->sys_errno);
280  done:
281         fcntl(fd, F_SETFL, state->old_sockflags);
282         return tevent_req_post(result, ev);
283 }
284
285 /**
286  * fde event handler for connect(2)
287  * @param[in] ev        The event context that sent us here
288  * @param[in] fde       The file descriptor event associated with the connect
289  * @param[in] flags     Indicate read/writeability of the socket
290  * @param[in] priv      private data, "struct async_req *" in this case
291  */
292
293 static void async_connect_connected(struct tevent_context *ev,
294                                     struct tevent_fd *fde, uint16_t flags,
295                                     void *priv)
296 {
297         struct tevent_req *req = talloc_get_type_abort(
298                 priv, struct tevent_req);
299         struct async_connect_state *state =
300                 tevent_req_data(req, struct async_connect_state);
301
302         /*
303          * Stevens, Network Programming says that if there's a
304          * successful connect, the socket is only writable. Upon an
305          * error, it's both readable and writable.
306          */
307         if ((flags & (TEVENT_FD_READ|TEVENT_FD_WRITE))
308             == (TEVENT_FD_READ|TEVENT_FD_WRITE)) {
309                 int ret;
310
311                 ret = connect(state->fd,
312                               (struct sockaddr *)(void *)&state->address,
313                               state->address_len);
314                 if (ret == 0) {
315                         TALLOC_FREE(fde);
316                         tevent_req_done(req);
317                         return;
318                 }
319
320                 if (errno == EINPROGRESS) {
321                         /* Try again later, leave the fde around */
322                         return;
323                 }
324                 TALLOC_FREE(fde);
325                 tevent_req_error(req, errno);
326                 return;
327         }
328
329         state->sys_errno = 0;
330         tevent_req_done(req);
331 }
332
333 int async_connect_recv(struct tevent_req *req, int *perrno)
334 {
335         struct async_connect_state *state =
336                 tevent_req_data(req, struct async_connect_state);
337         int err;
338
339         fcntl(state->fd, F_SETFL, state->old_sockflags);
340
341         if (tevent_req_is_unix_error(req, &err)) {
342                 *perrno = err;
343                 return -1;
344         }
345
346         if (state->sys_errno == 0) {
347                 return 0;
348         }
349
350         *perrno = state->sys_errno;
351         return -1;
352 }
353
354 struct writev_state {
355         struct tevent_context *ev;
356         int fd;
357         struct iovec *iov;
358         int count;
359         size_t total_size;
360         uint16_t flags;
361 };
362
363 static void writev_trigger(struct tevent_req *req, void *private_data);
364 static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde,
365                            uint16_t flags, void *private_data);
366
367 struct tevent_req *writev_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
368                                struct tevent_queue *queue, int fd,
369                                bool err_on_readability,
370                                struct iovec *iov, int count)
371 {
372         struct tevent_req *req;
373         struct writev_state *state;
374
375         req = tevent_req_create(mem_ctx, &state, struct writev_state);
376         if (req == NULL) {
377                 return NULL;
378         }
379         state->ev = ev;
380         state->fd = fd;
381         state->total_size = 0;
382         state->count = count;
383         state->iov = (struct iovec *)talloc_memdup(
384                 state, iov, sizeof(struct iovec) * count);
385         if (state->iov == NULL) {
386                 goto fail;
387         }
388         state->flags = TEVENT_FD_WRITE;
389         if (err_on_readability) {
390                 state->flags |= TEVENT_FD_READ;
391         }
392
393         if (queue == NULL) {
394                 struct tevent_fd *fde;
395                 fde = tevent_add_fd(state->ev, state, state->fd,
396                                     state->flags, writev_handler, req);
397                 if (tevent_req_nomem(fde, req)) {
398                         return tevent_req_post(req, ev);
399                 }
400                 return req;
401         }
402
403         if (!tevent_queue_add(queue, ev, req, writev_trigger, NULL)) {
404                 goto fail;
405         }
406         return req;
407  fail:
408         TALLOC_FREE(req);
409         return NULL;
410 }
411
412 static void writev_trigger(struct tevent_req *req, void *private_data)
413 {
414         struct writev_state *state = tevent_req_data(req, struct writev_state);
415         struct tevent_fd *fde;
416
417         fde = tevent_add_fd(state->ev, state, state->fd, state->flags,
418                             writev_handler, req);
419         if (fde == NULL) {
420                 tevent_req_error(req, ENOMEM);
421         }
422 }
423
424 static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde,
425                            uint16_t flags, void *private_data)
426 {
427         struct tevent_req *req = talloc_get_type_abort(
428                 private_data, struct tevent_req);
429         struct writev_state *state =
430                 tevent_req_data(req, struct writev_state);
431         size_t to_write, written;
432         int i;
433
434         to_write = 0;
435
436         if ((state->flags & TEVENT_FD_READ) && (flags & TEVENT_FD_READ)) {
437                 tevent_req_error(req, EPIPE);
438                 return;
439         }
440
441         for (i=0; i<state->count; i++) {
442                 to_write += state->iov[i].iov_len;
443         }
444
445         written = writev(state->fd, state->iov, state->count);
446         if ((written == -1) && (errno == EINTR)) {
447                 /* retry */
448                 return;
449         }
450         if (written == -1) {
451                 tevent_req_error(req, errno);
452                 return;
453         }
454         if (written == 0) {
455                 tevent_req_error(req, EPIPE);
456                 return;
457         }
458         state->total_size += written;
459
460         if (written == to_write) {
461                 tevent_req_done(req);
462                 return;
463         }
464
465         /*
466          * We've written less than we were asked to, drop stuff from
467          * state->iov.
468          */
469
470         while (written > 0) {
471                 if (written < state->iov[0].iov_len) {
472                         state->iov[0].iov_base =
473                                 (char *)state->iov[0].iov_base + written;
474                         state->iov[0].iov_len -= written;
475                         break;
476                 }
477                 written -= state->iov[0].iov_len;
478                 state->iov += 1;
479                 state->count -= 1;
480         }
481 }
482
483 ssize_t writev_recv(struct tevent_req *req, int *perrno)
484 {
485         struct writev_state *state =
486                 tevent_req_data(req, struct writev_state);
487
488         if (tevent_req_is_unix_error(req, perrno)) {
489                 return -1;
490         }
491         return state->total_size;
492 }
493
494 struct read_packet_state {
495         int fd;
496         uint8_t *buf;
497         size_t nread;
498         ssize_t (*more)(uint8_t *buf, size_t buflen, void *private_data);
499         void *private_data;
500 };
501
502 static void read_packet_handler(struct tevent_context *ev,
503                                 struct tevent_fd *fde,
504                                 uint16_t flags, void *private_data);
505
506 struct tevent_req *read_packet_send(TALLOC_CTX *mem_ctx,
507                                     struct tevent_context *ev,
508                                     int fd, size_t initial,
509                                     ssize_t (*more)(uint8_t *buf,
510                                                     size_t buflen,
511                                                     void *private_data),
512                                     void *private_data)
513 {
514         struct tevent_req *result;
515         struct read_packet_state *state;
516         struct tevent_fd *fde;
517
518         result = tevent_req_create(mem_ctx, &state, struct read_packet_state);
519         if (result == NULL) {
520                 return NULL;
521         }
522         state->fd = fd;
523         state->nread = 0;
524         state->more = more;
525         state->private_data = private_data;
526
527         state->buf = talloc_array(state, uint8_t, initial);
528         if (state->buf == NULL) {
529                 goto fail;
530         }
531
532         fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ, read_packet_handler,
533                             result);
534         if (fde == NULL) {
535                 goto fail;
536         }
537         return result;
538  fail:
539         TALLOC_FREE(result);
540         return NULL;
541 }
542
543 static void read_packet_handler(struct tevent_context *ev,
544                                 struct tevent_fd *fde,
545                                 uint16_t flags, void *private_data)
546 {
547         struct tevent_req *req = talloc_get_type_abort(
548                 private_data, struct tevent_req);
549         struct read_packet_state *state =
550                 tevent_req_data(req, struct read_packet_state);
551         size_t total = talloc_get_size(state->buf);
552         ssize_t nread, more;
553         uint8_t *tmp;
554
555         nread = recv(state->fd, state->buf+state->nread, total-state->nread,
556                      0);
557         if ((nread == -1) && (errno == EINTR)) {
558                 /* retry */
559                 return;
560         }
561         if (nread == -1) {
562                 tevent_req_error(req, errno);
563                 return;
564         }
565         if (nread == 0) {
566                 tevent_req_error(req, EPIPE);
567                 return;
568         }
569
570         state->nread += nread;
571         if (state->nread < total) {
572                 /* Come back later */
573                 return;
574         }
575
576         /*
577          * We got what was initially requested. See if "more" asks for -- more.
578          */
579         if (state->more == NULL) {
580                 /* Nobody to ask, this is a async read_data */
581                 tevent_req_done(req);
582                 return;
583         }
584
585         more = state->more(state->buf, total, state->private_data);
586         if (more == -1) {
587                 /* We got an invalid packet, tell the caller */
588                 tevent_req_error(req, EIO);
589                 return;
590         }
591         if (more == 0) {
592                 /* We're done, full packet received */
593                 tevent_req_done(req);
594                 return;
595         }
596
597         tmp = talloc_realloc(state, state->buf, uint8_t, total+more);
598         if (tevent_req_nomem(tmp, req)) {
599                 return;
600         }
601         state->buf = tmp;
602 }
603
604 ssize_t read_packet_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
605                          uint8_t **pbuf, int *perrno)
606 {
607         struct read_packet_state *state =
608                 tevent_req_data(req, struct read_packet_state);
609
610         if (tevent_req_is_unix_error(req, perrno)) {
611                 return -1;
612         }
613         *pbuf = talloc_move(mem_ctx, &state->buf);
614         return talloc_get_size(*pbuf);
615 }