Add more conventional async_recv
[kai/samba-autobuild/.git] / lib / async_req / async_sock.c
1 /*
2    Unix SMB/CIFS implementation.
3    async socket syscalls
4    Copyright (C) Volker Lendecke 2008
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/talloc/talloc.h"
22 #include "lib/tevent/tevent.h"
23 #include "lib/async_req/async_req.h"
24 #include "lib/async_req/async_sock.h"
25 #include "lib/util/tevent_unix.h"
26 #include <fcntl.h>
27
28 #ifndef TALLOC_FREE
29 #define TALLOC_FREE(ctx) do { talloc_free(ctx); ctx=NULL; } while(0)
30 #endif
31
32 /**
33  * Discriminator for async_syscall_state
34  */
35 enum async_syscall_type {
36         ASYNC_SYSCALL_SEND,
37         ASYNC_SYSCALL_RECV,
38 };
39
40 /**
41  * Holder for syscall arguments and the result
42  */
43
44 struct async_syscall_state {
45         enum async_syscall_type syscall_type;
46         struct tevent_fd *fde;
47
48         union {
49                 struct param_send {
50                         int fd;
51                         const void *buffer;
52                         size_t length;
53                         int flags;
54                 } param_send;
55                 struct param_recv {
56                         int fd;
57                         void *buffer;
58                         size_t length;
59                         int flags;
60                 } param_recv;
61         } param;
62
63         union {
64                 ssize_t result_ssize_t;
65                 size_t result_size_t;
66                 int result_int;
67         } result;
68         int sys_errno;
69 };
70
71 /**
72  * @brief Map async_req states to unix-style errnos
73  * @param[in]  req      The async req to get the state from
74  * @param[out] err      Pointer to take the unix-style errno
75  *
76  * @return true if the async_req is in an error state, false otherwise
77  */
78
79 bool async_req_is_errno(struct async_req *req, int *err)
80 {
81         enum async_req_state state;
82         uint64_t error;
83
84         if (!async_req_is_error(req, &state, &error)) {
85                 return false;
86         }
87
88         switch (state) {
89         case ASYNC_REQ_USER_ERROR:
90                 *err = (int)error;
91                 break;
92         case ASYNC_REQ_TIMED_OUT:
93 #ifdef ETIMEDOUT
94                 *err = ETIMEDOUT;
95 #else
96                 *err = EAGAIN;
97 #endif
98                 break;
99         case ASYNC_REQ_NO_MEMORY:
100                 *err = ENOMEM;
101                 break;
102         default:
103                 *err = EIO;
104                 break;
105         }
106         return true;
107 }
108
109 int async_req_simple_recv_errno(struct async_req *req)
110 {
111         int err;
112
113         if (async_req_is_errno(req, &err)) {
114                 return err;
115         }
116
117         return 0;
118 }
119
120 /**
121  * @brief Create a new async syscall req
122  * @param[in] mem_ctx   The memory context to hang the result off
123  * @param[in] ev        The event context to work from
124  * @param[in] type      Which syscall will this be
125  * @param[in] pstate    Where to put the newly created private_data state
126  * @retval The new request
127  *
128  * This is a helper function to prepare a new struct async_req with an
129  * associated struct async_syscall_state. The async_syscall_state will be put
130  * into the async_req as private_data.
131  */
132
133 static struct async_req *async_syscall_new(TALLOC_CTX *mem_ctx,
134                                            struct tevent_context *ev,
135                                            enum async_syscall_type type,
136                                            struct async_syscall_state **pstate)
137 {
138         struct async_req *result;
139         struct async_syscall_state *state;
140
141         if (!async_req_setup(mem_ctx, &result, &state,
142                              struct async_syscall_state)) {
143                 return NULL;
144         }
145         state->syscall_type = type;
146
147         result->private_data = state;
148
149         *pstate = state;
150
151         return result;
152 }
153
154 /**
155  * @brief Create a new async syscall req based on a fd
156  * @param[in] mem_ctx   The memory context to hang the result off
157  * @param[in] ev        The event context to work from
158  * @param[in] type      Which syscall will this be
159  * @param[in] fd        The file descriptor we work on
160  * @param[in] fde_flags TEVENT_FD_READ/WRITE -- what are we interested in?
161  * @param[in] fde_cb    The callback function for the file descriptor event
162  * @param[in] pstate    Where to put the newly created private_data state
163  * @retval The new request
164  *
165  * This is a helper function to prepare a new struct async_req with an
166  * associated struct async_syscall_state and an associated file descriptor
167  * event.
168  */
169
170 static struct async_req *async_fde_syscall_new(
171         TALLOC_CTX *mem_ctx,
172         struct tevent_context *ev,
173         enum async_syscall_type type,
174         int fd,
175         uint16_t fde_flags,
176         void (*fde_cb)(struct tevent_context *ev,
177                        struct tevent_fd *fde, uint16_t flags,
178                        void *priv),
179         struct async_syscall_state **pstate)
180 {
181         struct async_req *result;
182         struct async_syscall_state *state;
183
184         result = async_syscall_new(mem_ctx, ev, type, &state);
185         if (result == NULL) {
186                 return NULL;
187         }
188
189         state->fde = tevent_add_fd(ev, state, fd, fde_flags, fde_cb, result);
190         if (state->fde == NULL) {
191                 TALLOC_FREE(result);
192                 return NULL;
193         }
194         *pstate = state;
195         return result;
196 }
197
198 /**
199  * Retrieve a ssize_t typed result from an async syscall
200  * @param[in] req       The syscall that has just finished
201  * @param[out] perrno   Where to put the syscall's errno
202  * @retval The return value from the asynchronously called syscall
203  */
204
205 ssize_t async_syscall_result_ssize_t(struct async_req *req, int *perrno)
206 {
207         struct async_syscall_state *state = talloc_get_type_abort(
208                 req->private_data, struct async_syscall_state);
209
210         *perrno = state->sys_errno;
211         return state->result.result_ssize_t;
212 }
213
214 /**
215  * Retrieve a size_t typed result from an async syscall
216  * @param[in] req       The syscall that has just finished
217  * @param[out] perrno   Where to put the syscall's errno
218  * @retval The return value from the asynchronously called syscall
219  */
220
221 size_t async_syscall_result_size_t(struct async_req *req, int *perrno)
222 {
223         struct async_syscall_state *state = talloc_get_type_abort(
224                 req->private_data, struct async_syscall_state);
225
226         *perrno = state->sys_errno;
227         return state->result.result_size_t;
228 }
229
230 /**
231  * Retrieve a int typed result from an async syscall
232  * @param[in] req       The syscall that has just finished
233  * @param[out] perrno   Where to put the syscall's errno
234  * @retval The return value from the asynchronously called syscall
235  */
236
237 int async_syscall_result_int(struct async_req *req, int *perrno)
238 {
239         struct async_syscall_state *state = talloc_get_type_abort(
240                 req->private_data, struct async_syscall_state);
241
242         *perrno = state->sys_errno;
243         return state->result.result_int;
244 }
245
246 /**
247  * fde event handler for the "send" syscall
248  * @param[in] ev        The event context that sent us here
249  * @param[in] fde       The file descriptor event associated with the send
250  * @param[in] flags     Can only be TEVENT_FD_WRITE here
251  * @param[in] priv      private data, "struct async_req *" in this case
252  */
253
254 static void async_send_callback(struct tevent_context *ev,
255                                 struct tevent_fd *fde, uint16_t flags,
256                                 void *priv)
257 {
258         struct async_req *req = talloc_get_type_abort(
259                 priv, struct async_req);
260         struct async_syscall_state *state = talloc_get_type_abort(
261                 req->private_data, struct async_syscall_state);
262         struct param_send *p = &state->param.param_send;
263
264         if (state->syscall_type != ASYNC_SYSCALL_SEND) {
265                 async_req_error(req, EIO);
266                 return;
267         }
268
269         state->result.result_ssize_t = send(p->fd, p->buffer, p->length,
270                                             p->flags);
271         state->sys_errno = errno;
272
273         TALLOC_FREE(state->fde);
274
275         async_req_done(req);
276 }
277
278 /**
279  * Async version of send(2)
280  * @param[in] mem_ctx   The memory context to hang the result off
281  * @param[in] ev        The event context to work from
282  * @param[in] fd        The socket to send to
283  * @param[in] buffer    The buffer to send
284  * @param[in] length    How many bytes to send
285  * @param[in] flags     flags passed to send(2)
286  *
287  * This function is a direct counterpart of send(2)
288  */
289
290 struct async_req *async_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
291                              int fd, const void *buffer, size_t length,
292                              int flags)
293 {
294         struct async_req *result;
295         struct async_syscall_state *state;
296
297         result = async_fde_syscall_new(
298                 mem_ctx, ev, ASYNC_SYSCALL_SEND,
299                 fd, TEVENT_FD_WRITE, async_send_callback,
300                 &state);
301         if (result == NULL) {
302                 return NULL;
303         }
304
305         state->param.param_send.fd = fd;
306         state->param.param_send.buffer = buffer;
307         state->param.param_send.length = length;
308         state->param.param_send.flags = flags;
309
310         return result;
311 }
312
313 /**
314  * fde event handler for the "recv" syscall
315  * @param[in] ev        The event context that sent us here
316  * @param[in] fde       The file descriptor event associated with the recv
317  * @param[in] flags     Can only be TEVENT_FD_READ here
318  * @param[in] priv      private data, "struct async_req *" in this case
319  */
320
321 static void async_recv_callback(struct tevent_context *ev,
322                                 struct tevent_fd *fde, uint16_t flags,
323                                 void *priv)
324 {
325         struct async_req *req = talloc_get_type_abort(
326                 priv, struct async_req);
327         struct async_syscall_state *state = talloc_get_type_abort(
328                 req->private_data, struct async_syscall_state);
329         struct param_recv *p = &state->param.param_recv;
330
331         if (state->syscall_type != ASYNC_SYSCALL_RECV) {
332                 async_req_error(req, EIO);
333                 return;
334         }
335
336         state->result.result_ssize_t = recv(p->fd, p->buffer, p->length,
337                                             p->flags);
338         state->sys_errno = errno;
339
340         TALLOC_FREE(state->fde);
341
342         async_req_done(req);
343 }
344
345 /**
346  * Async version of recv(2)
347  * @param[in] mem_ctx   The memory context to hang the result off
348  * @param[in] ev        The event context to work from
349  * @param[in] fd        The socket to recv from
350  * @param[in] buffer    The buffer to recv into
351  * @param[in] length    How many bytes to recv
352  * @param[in] flags     flags passed to recv(2)
353  *
354  * This function is a direct counterpart of recv(2)
355  */
356
357 struct async_req *async_recv(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
358                              int fd, void *buffer, size_t length,
359                              int flags)
360 {
361         struct async_req *result;
362         struct async_syscall_state *state;
363
364         result = async_fde_syscall_new(
365                 mem_ctx, ev, ASYNC_SYSCALL_RECV,
366                 fd, TEVENT_FD_READ, async_recv_callback,
367                 &state);
368
369         if (result == NULL) {
370                 return NULL;
371         }
372
373         state->param.param_recv.fd = fd;
374         state->param.param_recv.buffer = buffer;
375         state->param.param_recv.length = length;
376         state->param.param_recv.flags = flags;
377
378         return result;
379 }
380
381 struct async_send_state {
382         int fd;
383         const void *buf;
384         size_t len;
385         int flags;
386         ssize_t sent;
387 };
388
389 static void async_send_handler(struct tevent_context *ev,
390                                struct tevent_fd *fde,
391                                uint16_t flags, void *private_data);
392
393 struct tevent_req *async_send_send(TALLOC_CTX *mem_ctx,
394                                    struct tevent_context *ev,
395                                    int fd, const void *buf, size_t len,
396                                    int flags)
397 {
398         struct tevent_req *result;
399         struct async_send_state *state;
400         struct tevent_fd *fde;
401
402         result = tevent_req_create(mem_ctx, &state, struct async_send_state);
403         if (result == NULL) {
404                 return result;
405         }
406         state->fd = fd;
407         state->buf = buf;
408         state->len = len;
409         state->flags = flags;
410
411         fde = tevent_add_fd(ev, state, fd, TEVENT_FD_WRITE, async_send_handler,
412                             result);
413         if (fde == NULL) {
414                 TALLOC_FREE(result);
415                 return NULL;
416         }
417         return result;
418 }
419
420 static void async_send_handler(struct tevent_context *ev,
421                                struct tevent_fd *fde,
422                                uint16_t flags, void *private_data)
423 {
424         struct tevent_req *req = talloc_get_type_abort(
425                 private_data, struct tevent_req);
426         struct async_send_state *state = talloc_get_type_abort(
427                 req->private_state, struct async_send_state);
428
429         state->sent = send(state->fd, state->buf, state->len, state->flags);
430         if (state->sent == -1) {
431                 tevent_req_error(req, errno);
432                 return;
433         }
434         tevent_req_done(req);
435 }
436
437 ssize_t async_send_recv(struct tevent_req *req, int *perrno)
438 {
439         struct async_send_state *state = talloc_get_type_abort(
440                 req->private_state, struct async_send_state);
441
442         if (tevent_req_is_unix_error(req, perrno)) {
443                 return -1;
444         }
445         return state->sent;
446 }
447
448 struct async_recv_state {
449         int fd;
450         void *buf;
451         size_t len;
452         int flags;
453         ssize_t received;
454 };
455
456 static void async_recv_handler(struct tevent_context *ev,
457                                struct tevent_fd *fde,
458                                uint16_t flags, void *private_data);
459
460 struct tevent_req *async_recv_send(TALLOC_CTX *mem_ctx,
461                                    struct tevent_context *ev,
462                                    int fd, void *buf, size_t len, int flags)
463 {
464         struct tevent_req *result;
465         struct async_recv_state *state;
466         struct tevent_fd *fde;
467
468         result = tevent_req_create(mem_ctx, &state, struct async_recv_state);
469         if (result == NULL) {
470                 return result;
471         }
472         state->fd = fd;
473         state->buf = buf;
474         state->len = len;
475         state->flags = flags;
476
477         fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ, async_recv_handler,
478                             result);
479         if (fde == NULL) {
480                 TALLOC_FREE(result);
481                 return NULL;
482         }
483         return result;
484 }
485
486 static void async_recv_handler(struct tevent_context *ev,
487                                struct tevent_fd *fde,
488                                uint16_t flags, void *private_data)
489 {
490         struct tevent_req *req = talloc_get_type_abort(
491                 private_data, struct tevent_req);
492         struct async_recv_state *state = talloc_get_type_abort(
493                 req->private_state, struct async_recv_state);
494
495         state->received = recv(state->fd, state->buf, state->len,
496                                state->flags);
497         if (state->received == -1) {
498                 tevent_req_error(req, errno);
499                 return;
500         }
501         tevent_req_done(req);
502 }
503
504 ssize_t async_recv_recv(struct tevent_req *req, int *perrno)
505 {
506         struct async_recv_state *state = talloc_get_type_abort(
507                 req->private_state, struct async_recv_state);
508
509         if (tevent_req_is_unix_error(req, perrno)) {
510                 return -1;
511         }
512         return state->received;
513 }
514
515 struct async_connect_state {
516         int fd;
517         int result;
518         int sys_errno;
519         long old_sockflags;
520 };
521
522 static void async_connect_connected(struct tevent_context *ev,
523                                     struct tevent_fd *fde, uint16_t flags,
524                                     void *priv);
525
526 /**
527  * @brief async version of connect(2)
528  * @param[in] mem_ctx   The memory context to hang the result off
529  * @param[in] ev        The event context to work from
530  * @param[in] fd        The socket to recv from
531  * @param[in] address   Where to connect?
532  * @param[in] address_len Length of *address
533  * @retval The async request
534  *
535  * This function sets the socket into non-blocking state to be able to call
536  * connect in an async state. This will be reset when the request is finished.
537  */
538
539 struct tevent_req *async_connect_send(TALLOC_CTX *mem_ctx,
540                                       struct tevent_context *ev,
541                                       int fd, const struct sockaddr *address,
542                                       socklen_t address_len)
543 {
544         struct tevent_req *result;
545         struct async_connect_state *state;
546         struct tevent_fd *fde;
547
548         result = tevent_req_create(
549                 mem_ctx, &state, struct async_connect_state);
550         if (result == NULL) {
551                 return NULL;
552         }
553
554         /**
555          * We have to set the socket to nonblocking for async connect(2). Keep
556          * the old sockflags around.
557          */
558
559         state->fd = fd;
560         state->sys_errno = 0;
561
562         state->old_sockflags = fcntl(fd, F_GETFL, 0);
563         if (state->old_sockflags == -1) {
564                 goto post_errno;
565         }
566
567         set_blocking(fd, false);
568
569         state->result = connect(fd, address, address_len);
570         if (state->result == 0) {
571                 errno = 0;
572                 goto post_errno;
573         }
574
575         /**
576          * A number of error messages show that something good is progressing
577          * and that we have to wait for readability.
578          *
579          * If none of them are present, bail out.
580          */
581
582         if (!(errno == EINPROGRESS || errno == EALREADY ||
583 #ifdef EISCONN
584               errno == EISCONN ||
585 #endif
586               errno == EAGAIN || errno == EINTR)) {
587                 goto post_errno;
588         }
589
590         fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ | TEVENT_FD_WRITE,
591                            async_connect_connected, result);
592         if (fde == NULL) {
593                 errno = ENOMEM;
594                 goto post_errno;
595         }
596         return result;
597
598  post_errno:
599         state->sys_errno = errno;
600         fcntl(fd, F_SETFL, state->old_sockflags);
601         if (state->sys_errno == 0) {
602                 tevent_req_done(result);
603         } else {
604                 tevent_req_error(result, state->sys_errno);
605         }
606         return tevent_req_post(result, ev);
607 }
608
609 /**
610  * fde event handler for connect(2)
611  * @param[in] ev        The event context that sent us here
612  * @param[in] fde       The file descriptor event associated with the connect
613  * @param[in] flags     Indicate read/writeability of the socket
614  * @param[in] priv      private data, "struct async_req *" in this case
615  */
616
617 static void async_connect_connected(struct tevent_context *ev,
618                                     struct tevent_fd *fde, uint16_t flags,
619                                     void *priv)
620 {
621         struct tevent_req *req = talloc_get_type_abort(
622                 priv, struct tevent_req);
623         struct async_connect_state *state = talloc_get_type_abort(
624                 req->private_state, struct async_connect_state);
625
626         TALLOC_FREE(fde);
627
628         /*
629          * Stevens, Network Programming says that if there's a
630          * successful connect, the socket is only writable. Upon an
631          * error, it's both readable and writable.
632          */
633         if ((flags & (TEVENT_FD_READ|TEVENT_FD_WRITE))
634             == (TEVENT_FD_READ|TEVENT_FD_WRITE)) {
635                 int sockerr;
636                 socklen_t err_len = sizeof(sockerr);
637
638                 if (getsockopt(state->fd, SOL_SOCKET, SO_ERROR,
639                                (void *)&sockerr, &err_len) == 0) {
640                         errno = sockerr;
641                 }
642
643                 state->sys_errno = errno;
644
645                 DEBUG(10, ("connect returned %s\n", strerror(errno)));
646
647                 fcntl(state->fd, F_SETFL, state->old_sockflags);
648                 tevent_req_error(req, state->sys_errno);
649                 return;
650         }
651
652         state->sys_errno = 0;
653         tevent_req_done(req);
654 }
655
656 int async_connect_recv(struct tevent_req *req, int *perrno)
657 {
658         struct async_connect_state *state = talloc_get_type_abort(
659                 req->private_state, struct async_connect_state);
660         int err;
661
662         fcntl(state->fd, F_SETFL, state->old_sockflags);
663
664         if (tevent_req_is_unix_error(req, &err)) {
665                 *perrno = err;
666                 return -1;
667         }
668
669         if (state->sys_errno == 0) {
670                 return 0;
671         }
672
673         *perrno = state->sys_errno;
674         return -1;
675 }
676
677 struct writev_state {
678         struct tevent_context *ev;
679         int fd;
680         struct iovec *iov;
681         int count;
682         size_t total_size;
683 };
684
685 static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde,
686                            uint16_t flags, void *private_data);
687
688 struct tevent_req *writev_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
689                                int fd, struct iovec *iov, int count)
690 {
691         struct tevent_req *result;
692         struct writev_state *state;
693         struct tevent_fd *fde;
694
695         result = tevent_req_create(mem_ctx, &state, struct writev_state);
696         if (result == NULL) {
697                 return NULL;
698         }
699         state->ev = ev;
700         state->fd = fd;
701         state->total_size = 0;
702         state->count = count;
703         state->iov = (struct iovec *)talloc_memdup(
704                 state, iov, sizeof(struct iovec) * count);
705         if (state->iov == NULL) {
706                 goto fail;
707         }
708
709         fde = tevent_add_fd(ev, state, fd, TEVENT_FD_WRITE, writev_handler,
710                             result);
711         if (fde == NULL) {
712                 goto fail;
713         }
714         return result;
715
716  fail:
717         TALLOC_FREE(result);
718         return NULL;
719 }
720
721 static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde,
722                            uint16_t flags, void *private_data)
723 {
724         struct tevent_req *req = talloc_get_type_abort(
725                 private_data, struct tevent_req);
726         struct writev_state *state = talloc_get_type_abort(
727                 req->private_state, struct writev_state);
728         size_t to_write, written;
729         int i;
730
731         to_write = 0;
732
733         for (i=0; i<state->count; i++) {
734                 to_write += state->iov[i].iov_len;
735         }
736
737         written = sys_writev(state->fd, state->iov, state->count);
738         if (written == -1) {
739                 tevent_req_error(req, errno);
740                 return;
741         }
742         if (written == 0) {
743                 tevent_req_error(req, EPIPE);
744                 return;
745         }
746         state->total_size += written;
747
748         if (written == to_write) {
749                 tevent_req_done(req);
750                 return;
751         }
752
753         /*
754          * We've written less than we were asked to, drop stuff from
755          * state->iov.
756          */
757
758         while (written > 0) {
759                 if (written < state->iov[0].iov_len) {
760                         state->iov[0].iov_base =
761                                 (char *)state->iov[0].iov_base + written;
762                         state->iov[0].iov_len -= written;
763                         break;
764                 }
765                 written = state->iov[0].iov_len;
766                 state->iov += 1;
767                 state->count -= 1;
768         }
769 }
770
771 ssize_t writev_recv(struct tevent_req *req, int *perrno)
772 {
773         struct writev_state *state = talloc_get_type_abort(
774                 req->private_state, struct writev_state);
775
776         if (tevent_req_is_unix_error(req, perrno)) {
777                 return -1;
778         }
779         return state->total_size;
780 }
781
782 struct read_packet_state {
783         int fd;
784         uint8_t *buf;
785         size_t nread;
786         ssize_t (*more)(uint8_t *buf, size_t buflen, void *private_data);
787         void *private_data;
788 };
789
790 static void read_packet_handler(struct tevent_context *ev,
791                                 struct tevent_fd *fde,
792                                 uint16_t flags, void *private_data);
793
794 struct tevent_req *read_packet_send(TALLOC_CTX *mem_ctx,
795                                     struct tevent_context *ev,
796                                     int fd, size_t initial,
797                                     ssize_t (*more)(uint8_t *buf,
798                                                     size_t buflen,
799                                                     void *private_data),
800                                     void *private_data)
801 {
802         struct tevent_req *result;
803         struct read_packet_state *state;
804         struct tevent_fd *fde;
805
806         result = tevent_req_create(mem_ctx, &state, struct read_packet_state);
807         if (result == NULL) {
808                 return NULL;
809         }
810         state->fd = fd;
811         state->nread = 0;
812         state->more = more;
813         state->private_data = private_data;
814
815         state->buf = talloc_array(state, uint8_t, initial);
816         if (state->buf == NULL) {
817                 goto fail;
818         }
819
820         fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ, read_packet_handler,
821                             result);
822         if (fde == NULL) {
823                 goto fail;
824         }
825         return result;
826  fail:
827         TALLOC_FREE(result);
828         return NULL;
829 }
830
831 static void read_packet_handler(struct tevent_context *ev,
832                                 struct tevent_fd *fde,
833                                 uint16_t flags, void *private_data)
834 {
835         struct tevent_req *req = talloc_get_type_abort(
836                 private_data, struct tevent_req);
837         struct read_packet_state *state = talloc_get_type_abort(
838                 req->private_state, struct read_packet_state);
839         size_t total = talloc_get_size(state->buf);
840         ssize_t nread, more;
841         uint8_t *tmp;
842
843         nread = read(state->fd, state->buf+state->nread, total-state->nread);
844         if (nread == -1) {
845                 tevent_req_error(req, errno);
846                 return;
847         }
848         if (nread == 0) {
849                 tevent_req_error(req, EPIPE);
850                 return;
851         }
852
853         state->nread += nread;
854         if (state->nread < total) {
855                 /* Come back later */
856                 return;
857         }
858
859         /*
860          * We got what was initially requested. See if "more" asks for -- more.
861          */
862         if (state->more == NULL) {
863                 /* Nobody to ask, this is a async read_data */
864                 tevent_req_done(req);
865                 return;
866         }
867
868         more = state->more(state->buf, total, state->private_data);
869         if (more == -1) {
870                 /* We got an invalid packet, tell the caller */
871                 tevent_req_error(req, EIO);
872                 return;
873         }
874         if (more == 0) {
875                 /* We're done, full packet received */
876                 tevent_req_done(req);
877                 return;
878         }
879
880         tmp = TALLOC_REALLOC_ARRAY(state, state->buf, uint8_t, total+more);
881         if (tevent_req_nomem(tmp, req)) {
882                 return;
883         }
884         state->buf = tmp;
885 }
886
887 ssize_t read_packet_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
888                          uint8_t **pbuf, int *perrno)
889 {
890         struct read_packet_state *state = talloc_get_type_abort(
891                 req->private_state, struct read_packet_state);
892
893         if (tevent_req_is_unix_error(req, perrno)) {
894                 return -1;
895         }
896         *pbuf = talloc_move(mem_ctx, &state->buf);
897         return talloc_get_size(*pbuf);
898 }