Add async read_packet
[kai/samba-autobuild/.git] / lib / async_req / async_sock.c
1 /*
2    Unix SMB/CIFS implementation.
3    async socket syscalls
4    Copyright (C) Volker Lendecke 2008
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/talloc/talloc.h"
22 #include "lib/tevent/tevent.h"
23 #include "lib/async_req/async_req.h"
24 #include "lib/async_req/async_sock.h"
25 #include "lib/util/tevent_unix.h"
26 #include <fcntl.h>
27
28 #ifndef TALLOC_FREE
29 #define TALLOC_FREE(ctx) do { talloc_free(ctx); ctx=NULL; } while(0)
30 #endif
31
32 /**
33  * Discriminator for async_syscall_state
34  */
35 enum async_syscall_type {
36         ASYNC_SYSCALL_SEND,
37         ASYNC_SYSCALL_RECV,
38         ASYNC_SYSCALL_RECVALL,
39         ASYNC_SYSCALL_CONNECT
40 };
41
42 /**
43  * Holder for syscall arguments and the result
44  */
45
46 struct async_syscall_state {
47         enum async_syscall_type syscall_type;
48         struct tevent_fd *fde;
49
50         union {
51                 struct param_send {
52                         int fd;
53                         const void *buffer;
54                         size_t length;
55                         int flags;
56                 } param_send;
57                 struct param_recv {
58                         int fd;
59                         void *buffer;
60                         size_t length;
61                         int flags;
62                 } param_recv;
63                 struct param_recvall {
64                         int fd;
65                         void *buffer;
66                         size_t length;
67                         int flags;
68                         size_t received;
69                 } param_recvall;
70                 struct param_connect {
71                         /**
72                          * connect needs to be done on a nonblocking
73                          * socket. Keep the old flags around
74                          */
75                         long old_sockflags;
76                         int fd;
77                         const struct sockaddr *address;
78                         socklen_t address_len;
79                 } param_connect;
80         } param;
81
82         union {
83                 ssize_t result_ssize_t;
84                 size_t result_size_t;
85                 int result_int;
86         } result;
87         int sys_errno;
88 };
89
90 /**
91  * @brief Map async_req states to unix-style errnos
92  * @param[in]  req      The async req to get the state from
93  * @param[out] err      Pointer to take the unix-style errno
94  *
95  * @return true if the async_req is in an error state, false otherwise
96  */
97
98 bool async_req_is_errno(struct async_req *req, int *err)
99 {
100         enum async_req_state state;
101         uint64_t error;
102
103         if (!async_req_is_error(req, &state, &error)) {
104                 return false;
105         }
106
107         switch (state) {
108         case ASYNC_REQ_USER_ERROR:
109                 *err = (int)error;
110                 break;
111         case ASYNC_REQ_TIMED_OUT:
112 #ifdef ETIMEDOUT
113                 *err = ETIMEDOUT;
114 #else
115                 *err = EAGAIN;
116 #endif
117                 break;
118         case ASYNC_REQ_NO_MEMORY:
119                 *err = ENOMEM;
120                 break;
121         default:
122                 *err = EIO;
123                 break;
124         }
125         return true;
126 }
127
128 int async_req_simple_recv_errno(struct async_req *req)
129 {
130         int err;
131
132         if (async_req_is_errno(req, &err)) {
133                 return err;
134         }
135
136         return 0;
137 }
138
139 /**
140  * @brief Create a new async syscall req
141  * @param[in] mem_ctx   The memory context to hang the result off
142  * @param[in] ev        The event context to work from
143  * @param[in] type      Which syscall will this be
144  * @param[in] pstate    Where to put the newly created private_data state
145  * @retval The new request
146  *
147  * This is a helper function to prepare a new struct async_req with an
148  * associated struct async_syscall_state. The async_syscall_state will be put
149  * into the async_req as private_data.
150  */
151
152 static struct async_req *async_syscall_new(TALLOC_CTX *mem_ctx,
153                                            struct tevent_context *ev,
154                                            enum async_syscall_type type,
155                                            struct async_syscall_state **pstate)
156 {
157         struct async_req *result;
158         struct async_syscall_state *state;
159
160         if (!async_req_setup(mem_ctx, &result, &state,
161                              struct async_syscall_state)) {
162                 return NULL;
163         }
164         state->syscall_type = type;
165
166         result->private_data = state;
167
168         *pstate = state;
169
170         return result;
171 }
172
173 /**
174  * @brief Create a new async syscall req based on a fd
175  * @param[in] mem_ctx   The memory context to hang the result off
176  * @param[in] ev        The event context to work from
177  * @param[in] type      Which syscall will this be
178  * @param[in] fd        The file descriptor we work on
179  * @param[in] fde_flags TEVENT_FD_READ/WRITE -- what are we interested in?
180  * @param[in] fde_cb    The callback function for the file descriptor event
181  * @param[in] pstate    Where to put the newly created private_data state
182  * @retval The new request
183  *
184  * This is a helper function to prepare a new struct async_req with an
185  * associated struct async_syscall_state and an associated file descriptor
186  * event.
187  */
188
189 static struct async_req *async_fde_syscall_new(
190         TALLOC_CTX *mem_ctx,
191         struct tevent_context *ev,
192         enum async_syscall_type type,
193         int fd,
194         uint16_t fde_flags,
195         void (*fde_cb)(struct tevent_context *ev,
196                        struct tevent_fd *fde, uint16_t flags,
197                        void *priv),
198         struct async_syscall_state **pstate)
199 {
200         struct async_req *result;
201         struct async_syscall_state *state;
202
203         result = async_syscall_new(mem_ctx, ev, type, &state);
204         if (result == NULL) {
205                 return NULL;
206         }
207
208         state->fde = tevent_add_fd(ev, state, fd, fde_flags, fde_cb, result);
209         if (state->fde == NULL) {
210                 TALLOC_FREE(result);
211                 return NULL;
212         }
213         *pstate = state;
214         return result;
215 }
216
217 /**
218  * Retrieve a ssize_t typed result from an async syscall
219  * @param[in] req       The syscall that has just finished
220  * @param[out] perrno   Where to put the syscall's errno
221  * @retval The return value from the asynchronously called syscall
222  */
223
224 ssize_t async_syscall_result_ssize_t(struct async_req *req, int *perrno)
225 {
226         struct async_syscall_state *state = talloc_get_type_abort(
227                 req->private_data, struct async_syscall_state);
228
229         *perrno = state->sys_errno;
230         return state->result.result_ssize_t;
231 }
232
233 /**
234  * Retrieve a size_t typed result from an async syscall
235  * @param[in] req       The syscall that has just finished
236  * @param[out] perrno   Where to put the syscall's errno
237  * @retval The return value from the asynchronously called syscall
238  */
239
240 size_t async_syscall_result_size_t(struct async_req *req, int *perrno)
241 {
242         struct async_syscall_state *state = talloc_get_type_abort(
243                 req->private_data, struct async_syscall_state);
244
245         *perrno = state->sys_errno;
246         return state->result.result_size_t;
247 }
248
249 /**
250  * Retrieve a int typed result from an async syscall
251  * @param[in] req       The syscall that has just finished
252  * @param[out] perrno   Where to put the syscall's errno
253  * @retval The return value from the asynchronously called syscall
254  */
255
256 int async_syscall_result_int(struct async_req *req, int *perrno)
257 {
258         struct async_syscall_state *state = talloc_get_type_abort(
259                 req->private_data, struct async_syscall_state);
260
261         *perrno = state->sys_errno;
262         return state->result.result_int;
263 }
264
265 /**
266  * fde event handler for the "send" syscall
267  * @param[in] ev        The event context that sent us here
268  * @param[in] fde       The file descriptor event associated with the send
269  * @param[in] flags     Can only be TEVENT_FD_WRITE here
270  * @param[in] priv      private data, "struct async_req *" in this case
271  */
272
273 static void async_send_callback(struct tevent_context *ev,
274                                 struct tevent_fd *fde, uint16_t flags,
275                                 void *priv)
276 {
277         struct async_req *req = talloc_get_type_abort(
278                 priv, struct async_req);
279         struct async_syscall_state *state = talloc_get_type_abort(
280                 req->private_data, struct async_syscall_state);
281         struct param_send *p = &state->param.param_send;
282
283         if (state->syscall_type != ASYNC_SYSCALL_SEND) {
284                 async_req_error(req, EIO);
285                 return;
286         }
287
288         state->result.result_ssize_t = send(p->fd, p->buffer, p->length,
289                                             p->flags);
290         state->sys_errno = errno;
291
292         TALLOC_FREE(state->fde);
293
294         async_req_done(req);
295 }
296
297 /**
298  * Async version of send(2)
299  * @param[in] mem_ctx   The memory context to hang the result off
300  * @param[in] ev        The event context to work from
301  * @param[in] fd        The socket to send to
302  * @param[in] buffer    The buffer to send
303  * @param[in] length    How many bytes to send
304  * @param[in] flags     flags passed to send(2)
305  *
306  * This function is a direct counterpart of send(2)
307  */
308
309 struct async_req *async_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
310                              int fd, const void *buffer, size_t length,
311                              int flags)
312 {
313         struct async_req *result;
314         struct async_syscall_state *state;
315
316         result = async_fde_syscall_new(
317                 mem_ctx, ev, ASYNC_SYSCALL_SEND,
318                 fd, TEVENT_FD_WRITE, async_send_callback,
319                 &state);
320         if (result == NULL) {
321                 return NULL;
322         }
323
324         state->param.param_send.fd = fd;
325         state->param.param_send.buffer = buffer;
326         state->param.param_send.length = length;
327         state->param.param_send.flags = flags;
328
329         return result;
330 }
331
332 /**
333  * fde event handler for the "recv" syscall
334  * @param[in] ev        The event context that sent us here
335  * @param[in] fde       The file descriptor event associated with the recv
336  * @param[in] flags     Can only be TEVENT_FD_READ here
337  * @param[in] priv      private data, "struct async_req *" in this case
338  */
339
340 static void async_recv_callback(struct tevent_context *ev,
341                                 struct tevent_fd *fde, uint16_t flags,
342                                 void *priv)
343 {
344         struct async_req *req = talloc_get_type_abort(
345                 priv, struct async_req);
346         struct async_syscall_state *state = talloc_get_type_abort(
347                 req->private_data, struct async_syscall_state);
348         struct param_recv *p = &state->param.param_recv;
349
350         if (state->syscall_type != ASYNC_SYSCALL_RECV) {
351                 async_req_error(req, EIO);
352                 return;
353         }
354
355         state->result.result_ssize_t = recv(p->fd, p->buffer, p->length,
356                                             p->flags);
357         state->sys_errno = errno;
358
359         TALLOC_FREE(state->fde);
360
361         async_req_done(req);
362 }
363
364 /**
365  * Async version of recv(2)
366  * @param[in] mem_ctx   The memory context to hang the result off
367  * @param[in] ev        The event context to work from
368  * @param[in] fd        The socket to recv from
369  * @param[in] buffer    The buffer to recv into
370  * @param[in] length    How many bytes to recv
371  * @param[in] flags     flags passed to recv(2)
372  *
373  * This function is a direct counterpart of recv(2)
374  */
375
376 struct async_req *async_recv(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
377                              int fd, void *buffer, size_t length,
378                              int flags)
379 {
380         struct async_req *result;
381         struct async_syscall_state *state;
382
383         result = async_fde_syscall_new(
384                 mem_ctx, ev, ASYNC_SYSCALL_RECV,
385                 fd, TEVENT_FD_READ, async_recv_callback,
386                 &state);
387
388         if (result == NULL) {
389                 return NULL;
390         }
391
392         state->param.param_recv.fd = fd;
393         state->param.param_recv.buffer = buffer;
394         state->param.param_recv.length = length;
395         state->param.param_recv.flags = flags;
396
397         return result;
398 }
399
400 /**
401  * fde event handler for the "recvall" syscall group
402  * @param[in] ev        The event context that sent us here
403  * @param[in] fde       The file descriptor event associated with the recv
404  * @param[in] flags     Can only be TEVENT_FD_READ here
405  * @param[in] priv      private data, "struct async_req *" in this case
406  */
407
408 static void async_recvall_callback(struct tevent_context *ev,
409                                    struct tevent_fd *fde, uint16_t flags,
410                                    void *priv)
411 {
412         struct async_req *req = talloc_get_type_abort(
413                 priv, struct async_req);
414         struct async_syscall_state *state = talloc_get_type_abort(
415                 req->private_data, struct async_syscall_state);
416         struct param_recvall *p = &state->param.param_recvall;
417
418         if (state->syscall_type != ASYNC_SYSCALL_RECVALL) {
419                 async_req_error(req, EIO);
420                 return;
421         }
422
423         state->result.result_ssize_t = recv(p->fd,
424                                             (char *)p->buffer + p->received,
425                                             p->length - p->received, p->flags);
426         state->sys_errno = errno;
427
428         if (state->result.result_ssize_t == -1) {
429                 async_req_error(req, state->sys_errno);
430                 return;
431         }
432
433         if (state->result.result_ssize_t == 0) {
434                 async_req_error(req, EIO);
435                 return;
436         }
437
438         p->received += state->result.result_ssize_t;
439         if (p->received > p->length) {
440                 async_req_error(req, EIO);
441                 return;
442         }
443
444         if (p->received == p->length) {
445                 TALLOC_FREE(state->fde);
446                 async_req_done(req);
447         }
448 }
449
450 /**
451  * Receive a specified number of bytes from a socket
452  * @param[in] mem_ctx   The memory context to hang the result off
453  * @param[in] ev        The event context to work from
454  * @param[in] fd        The socket to recv from
455  * @param[in] buffer    The buffer to recv into
456  * @param[in] length    How many bytes to recv
457  * @param[in] flags     flags passed to recv(2)
458  *
459  * async_recvall will call recv(2) until "length" bytes are received
460  */
461
462 struct async_req *recvall_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
463                                int fd, void *buffer, size_t length,
464                                int flags)
465 {
466         struct async_req *result;
467         struct async_syscall_state *state;
468
469         result = async_fde_syscall_new(
470                 mem_ctx, ev, ASYNC_SYSCALL_RECVALL,
471                 fd, TEVENT_FD_READ, async_recvall_callback,
472                 &state);
473         if (result == NULL) {
474                 return NULL;
475         }
476
477         state->param.param_recvall.fd = fd;
478         state->param.param_recvall.buffer = buffer;
479         state->param.param_recvall.length = length;
480         state->param.param_recvall.flags = flags;
481         state->param.param_recvall.received = 0;
482
483         return result;
484 }
485
486 ssize_t recvall_recv(struct async_req *req, int *perr)
487 {
488         struct async_syscall_state *state = talloc_get_type_abort(
489                 req->private_data, struct async_syscall_state);
490         int err;
491
492         err = async_req_simple_recv_errno(req);
493
494         if (err != 0) {
495                 *perr = err;
496                 return -1;
497         }
498
499         return state->result.result_ssize_t;
500 }
501
502 struct async_connect_state {
503         int fd;
504         int result;
505         int sys_errno;
506         long old_sockflags;
507 };
508
509 static void async_connect_connected(struct tevent_context *ev,
510                                     struct tevent_fd *fde, uint16_t flags,
511                                     void *priv);
512
513 /**
514  * @brief async version of connect(2)
515  * @param[in] mem_ctx   The memory context to hang the result off
516  * @param[in] ev        The event context to work from
517  * @param[in] fd        The socket to recv from
518  * @param[in] address   Where to connect?
519  * @param[in] address_len Length of *address
520  * @retval The async request
521  *
522  * This function sets the socket into non-blocking state to be able to call
523  * connect in an async state. This will be reset when the request is finished.
524  */
525
526 struct tevent_req *async_connect_send(TALLOC_CTX *mem_ctx,
527                                       struct tevent_context *ev,
528                                       int fd, const struct sockaddr *address,
529                                       socklen_t address_len)
530 {
531         struct tevent_req *result;
532         struct async_connect_state *state;
533         struct tevent_fd *fde;
534
535         result = tevent_req_create(
536                 mem_ctx, &state, struct async_connect_state);
537         if (result == NULL) {
538                 return NULL;
539         }
540
541         /**
542          * We have to set the socket to nonblocking for async connect(2). Keep
543          * the old sockflags around.
544          */
545
546         state->fd = fd;
547         state->sys_errno = 0;
548
549         state->old_sockflags = fcntl(fd, F_GETFL, 0);
550         if (state->old_sockflags == -1) {
551                 goto post_errno;
552         }
553
554         set_blocking(fd, false);
555
556         state->result = connect(fd, address, address_len);
557         if (state->result == 0) {
558                 errno = 0;
559                 goto post_errno;
560         }
561
562         /**
563          * A number of error messages show that something good is progressing
564          * and that we have to wait for readability.
565          *
566          * If none of them are present, bail out.
567          */
568
569         if (!(errno == EINPROGRESS || errno == EALREADY ||
570 #ifdef EISCONN
571               errno == EISCONN ||
572 #endif
573               errno == EAGAIN || errno == EINTR)) {
574                 goto post_errno;
575         }
576
577         fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ | TEVENT_FD_WRITE,
578                            async_connect_connected, result);
579         if (fde == NULL) {
580                 errno = ENOMEM;
581                 goto post_errno;
582         }
583         return result;
584
585  post_errno:
586         state->sys_errno = errno;
587         fcntl(fd, F_SETFL, state->old_sockflags);
588         if (state->sys_errno == 0) {
589                 tevent_req_done(result);
590         } else {
591                 tevent_req_error(result, state->sys_errno);
592         }
593         return tevent_req_post(result, ev);
594 }
595
596 /**
597  * fde event handler for connect(2)
598  * @param[in] ev        The event context that sent us here
599  * @param[in] fde       The file descriptor event associated with the connect
600  * @param[in] flags     Indicate read/writeability of the socket
601  * @param[in] priv      private data, "struct async_req *" in this case
602  */
603
604 static void async_connect_connected(struct tevent_context *ev,
605                                     struct tevent_fd *fde, uint16_t flags,
606                                     void *priv)
607 {
608         struct tevent_req *req = talloc_get_type_abort(
609                 priv, struct tevent_req);
610         struct async_connect_state *state = talloc_get_type_abort(
611                 req->private_state, struct async_connect_state);
612
613         TALLOC_FREE(fde);
614
615         /*
616          * Stevens, Network Programming says that if there's a
617          * successful connect, the socket is only writable. Upon an
618          * error, it's both readable and writable.
619          */
620         if ((flags & (TEVENT_FD_READ|TEVENT_FD_WRITE))
621             == (TEVENT_FD_READ|TEVENT_FD_WRITE)) {
622                 int sockerr;
623                 socklen_t err_len = sizeof(sockerr);
624
625                 if (getsockopt(state->fd, SOL_SOCKET, SO_ERROR,
626                                (void *)&sockerr, &err_len) == 0) {
627                         errno = sockerr;
628                 }
629
630                 state->sys_errno = errno;
631
632                 DEBUG(10, ("connect returned %s\n", strerror(errno)));
633
634                 fcntl(state->fd, F_SETFL, state->old_sockflags);
635                 tevent_req_error(req, state->sys_errno);
636                 return;
637         }
638
639         state->sys_errno = 0;
640         tevent_req_done(req);
641 }
642
643 int async_connect_recv(struct tevent_req *req, int *perrno)
644 {
645         struct async_connect_state *state = talloc_get_type_abort(
646                 req->private_state, struct async_connect_state);
647         int err;
648
649         fcntl(state->fd, F_SETFL, state->old_sockflags);
650
651         if (tevent_req_is_unix_error(req, &err)) {
652                 *perrno = err;
653                 return -1;
654         }
655
656         if (state->sys_errno == 0) {
657                 return 0;
658         }
659
660         *perrno = state->sys_errno;
661         return -1;
662 }
663
664 struct writev_state {
665         struct tevent_context *ev;
666         int fd;
667         struct iovec *iov;
668         int count;
669         size_t total_size;
670 };
671
672 static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde,
673                            uint16_t flags, void *private_data);
674
675 struct tevent_req *writev_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
676                                int fd, struct iovec *iov, int count)
677 {
678         struct tevent_req *result;
679         struct writev_state *state;
680         struct tevent_fd *fde;
681
682         result = tevent_req_create(mem_ctx, &state, struct writev_state);
683         if (result == NULL) {
684                 return NULL;
685         }
686         state->ev = ev;
687         state->fd = fd;
688         state->total_size = 0;
689         state->count = count;
690         state->iov = (struct iovec *)talloc_memdup(
691                 state, iov, sizeof(struct iovec) * count);
692         if (state->iov == NULL) {
693                 goto fail;
694         }
695
696         fde = tevent_add_fd(ev, state, fd, TEVENT_FD_WRITE, writev_handler,
697                             result);
698         if (fde == NULL) {
699                 goto fail;
700         }
701         return result;
702
703  fail:
704         TALLOC_FREE(result);
705         return NULL;
706 }
707
708 static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde,
709                            uint16_t flags, void *private_data)
710 {
711         struct tevent_req *req = talloc_get_type_abort(
712                 private_data, struct tevent_req);
713         struct writev_state *state = talloc_get_type_abort(
714                 req->private_state, struct writev_state);
715         size_t to_write, written;
716         int i;
717
718         to_write = 0;
719
720         for (i=0; i<state->count; i++) {
721                 to_write += state->iov[i].iov_len;
722         }
723
724         written = sys_writev(state->fd, state->iov, state->count);
725         if (written == -1) {
726                 tevent_req_error(req, errno);
727                 return;
728         }
729         if (written == 0) {
730                 tevent_req_error(req, EPIPE);
731                 return;
732         }
733         state->total_size += written;
734
735         if (written == to_write) {
736                 tevent_req_done(req);
737                 return;
738         }
739
740         /*
741          * We've written less than we were asked to, drop stuff from
742          * state->iov.
743          */
744
745         while (written > 0) {
746                 if (written < state->iov[0].iov_len) {
747                         state->iov[0].iov_base =
748                                 (char *)state->iov[0].iov_base + written;
749                         state->iov[0].iov_len -= written;
750                         break;
751                 }
752                 written = state->iov[0].iov_len;
753                 state->iov += 1;
754                 state->count -= 1;
755         }
756 }
757
758 ssize_t writev_recv(struct tevent_req *req, int *perrno)
759 {
760         struct writev_state *state = talloc_get_type_abort(
761                 req->private_state, struct writev_state);
762
763         if (tevent_req_is_unix_error(req, perrno)) {
764                 return -1;
765         }
766         return state->total_size;
767 }
768
769 struct read_packet_state {
770         int fd;
771         uint8_t *buf;
772         size_t nread;
773         ssize_t (*more)(uint8_t *buf, size_t buflen, void *private_data);
774         void *private_data;
775 };
776
777 static void read_packet_handler(struct tevent_context *ev,
778                                 struct tevent_fd *fde,
779                                 uint16_t flags, void *private_data);
780
781 struct tevent_req *read_packet_send(TALLOC_CTX *mem_ctx,
782                                     struct tevent_context *ev,
783                                     int fd, size_t initial,
784                                     ssize_t (*more)(uint8_t *buf,
785                                                     size_t buflen,
786                                                     void *private_data),
787                                     void *private_data)
788 {
789         struct tevent_req *result;
790         struct read_packet_state *state;
791         struct tevent_fd *fde;
792
793         result = tevent_req_create(mem_ctx, &state, struct read_packet_state);
794         if (result == NULL) {
795                 return NULL;
796         }
797         state->fd = fd;
798         state->nread = 0;
799         state->more = more;
800         state->private_data = private_data;
801
802         state->buf = talloc_array(state, uint8_t, initial);
803         if (state->buf == NULL) {
804                 goto fail;
805         }
806
807         fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ, read_packet_handler,
808                             result);
809         if (fde == NULL) {
810                 goto fail;
811         }
812         return result;
813  fail:
814         TALLOC_FREE(result);
815         return NULL;
816 }
817
818 static void read_packet_handler(struct tevent_context *ev,
819                                 struct tevent_fd *fde,
820                                 uint16_t flags, void *private_data)
821 {
822         struct tevent_req *req = talloc_get_type_abort(
823                 private_data, struct tevent_req);
824         struct read_packet_state *state = talloc_get_type_abort(
825                 req->private_state, struct read_packet_state);
826         size_t total = talloc_get_size(state->buf);
827         ssize_t nread, more;
828         uint8_t *tmp;
829
830         nread = read(state->fd, state->buf+state->nread, total-state->nread);
831         if (nread == -1) {
832                 tevent_req_error(req, errno);
833                 return;
834         }
835         if (nread == 0) {
836                 tevent_req_error(req, EPIPE);
837                 return;
838         }
839
840         state->nread += nread;
841         if (state->nread < total) {
842                 /* Come back later */
843                 return;
844         }
845
846         /*
847          * We got what was initially requested. See if "more" asks for -- more.
848          */
849         if (state->more == NULL) {
850                 /* Nobody to ask, this is a async read_data */
851                 tevent_req_done(req);
852                 return;
853         }
854
855         more = state->more(state->buf, total, state->private_data);
856         if (more == -1) {
857                 /* We got an invalid packet, tell the caller */
858                 tevent_req_error(req, EIO);
859                 return;
860         }
861         if (more == 0) {
862                 /* We're done, full packet received */
863                 tevent_req_done(req);
864                 return;
865         }
866
867         tmp = TALLOC_REALLOC_ARRAY(state, state->buf, uint8_t, total+more);
868         if (tevent_req_nomem(tmp, req)) {
869                 return;
870         }
871         state->buf = tmp;
872 }
873
874 ssize_t read_packet_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
875                          uint8_t **pbuf, int *perrno)
876 {
877         struct read_packet_state *state = talloc_get_type_abort(
878                 req->private_state, struct read_packet_state);
879
880         if (tevent_req_is_unix_error(req, perrno)) {
881                 return -1;
882         }
883         *pbuf = talloc_move(mem_ctx, &state->buf);
884         return talloc_get_size(*pbuf);
885 }