X-Git-Url: http://git.samba.org/?a=blobdiff_plain;f=lib%2Fasync_req%2Fasync_sock.c;h=545d21378fa1b7a082763231ec639b8ba41d200b;hb=05cf2d41cc16cf0ebd3605028a1723102449ccc3;hp=3563421e0e56c62363ab752e55db3ea788db2136;hpb=258ae4cec596631b758fb17c170c4494e4db8a8e;p=samba.git diff --git a/lib/async_req/async_sock.c b/lib/async_req/async_sock.c index 3563421e0e5..545d21378fa 100644 --- a/lib/async_req/async_sock.c +++ b/lib/async_req/async_sock.c @@ -3,403 +3,62 @@ async socket syscalls Copyright (C) Volker Lendecke 2008 - This program is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. + ** NOTE! The following LGPL license applies to the async_sock + ** library. This does NOT imply that all of Samba is released + ** under the LGPL - This program is distributed in the hope that it will be useful, + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 3 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Library General Public License for more details. - You should have received a copy of the GNU General Public License + You should have received a copy of the GNU Lesser General Public License along with this program. If not, see . */ -#include "includes.h" -#include "lib/talloc/talloc.h" -#include "lib/tevent/tevent.h" -#include "lib/async_req/async_req.h" +#include "replace.h" +#include "system/network.h" +#include "system/filesys.h" +#include +#include #include "lib/async_req/async_sock.h" + +/* Note: lib/util/ is currently GPL */ #include "lib/util/tevent_unix.h" -#include +#include "lib/util/samba_util.h" #ifndef TALLOC_FREE #define TALLOC_FREE(ctx) do { talloc_free(ctx); ctx=NULL; } while(0) #endif -/** - * Discriminator for async_syscall_state - */ -enum async_syscall_type { - ASYNC_SYSCALL_SEND, - ASYNC_SYSCALL_RECV, -}; - -/** - * Holder for syscall arguments and the result - */ - -struct async_syscall_state { - enum async_syscall_type syscall_type; - struct tevent_fd *fde; - - union { - struct param_send { - int fd; - const void *buffer; - size_t length; - int flags; - } param_send; - struct param_recv { - int fd; - void *buffer; - size_t length; - int flags; - } param_recv; - } param; - - union { - ssize_t result_ssize_t; - size_t result_size_t; - int result_int; - } result; - int sys_errno; -}; - -/** - * @brief Map async_req states to unix-style errnos - * @param[in] req The async req to get the state from - * @param[out] err Pointer to take the unix-style errno - * - * @return true if the async_req is in an error state, false otherwise - */ - -bool async_req_is_errno(struct async_req *req, int *err) -{ - enum async_req_state state; - uint64_t error; - - if (!async_req_is_error(req, &state, &error)) { - return false; - } - - switch (state) { - case ASYNC_REQ_USER_ERROR: - *err = (int)error; - break; - case ASYNC_REQ_TIMED_OUT: -#ifdef ETIMEDOUT - *err = ETIMEDOUT; -#else - *err = EAGAIN; -#endif - break; - case ASYNC_REQ_NO_MEMORY: - *err = ENOMEM; - break; - default: - *err = EIO; - break; - } - return true; -} - -int async_req_simple_recv_errno(struct async_req *req) -{ - int err; - - if (async_req_is_errno(req, &err)) { - return err; - } - - return 0; -} - -/** - * @brief Create a new async syscall req - * @param[in] mem_ctx The memory context to hang the result off - * @param[in] ev The event context to work from - * @param[in] type Which syscall will this be - * @param[in] pstate Where to put the newly created private_data state - * @retval The new request - * - * This is a helper function to prepare a new struct async_req with an - * associated struct async_syscall_state. The async_syscall_state will be put - * into the async_req as private_data. - */ - -static struct async_req *async_syscall_new(TALLOC_CTX *mem_ctx, - struct tevent_context *ev, - enum async_syscall_type type, - struct async_syscall_state **pstate) -{ - struct async_req *result; - struct async_syscall_state *state; - - if (!async_req_setup(mem_ctx, &result, &state, - struct async_syscall_state)) { - return NULL; - } - state->syscall_type = type; - - result->private_data = state; - - *pstate = state; - - return result; -} - -/** - * @brief Create a new async syscall req based on a fd - * @param[in] mem_ctx The memory context to hang the result off - * @param[in] ev The event context to work from - * @param[in] type Which syscall will this be - * @param[in] fd The file descriptor we work on - * @param[in] fde_flags TEVENT_FD_READ/WRITE -- what are we interested in? - * @param[in] fde_cb The callback function for the file descriptor event - * @param[in] pstate Where to put the newly created private_data state - * @retval The new request - * - * This is a helper function to prepare a new struct async_req with an - * associated struct async_syscall_state and an associated file descriptor - * event. - */ - -static struct async_req *async_fde_syscall_new( - TALLOC_CTX *mem_ctx, - struct tevent_context *ev, - enum async_syscall_type type, - int fd, - uint16_t fde_flags, - void (*fde_cb)(struct tevent_context *ev, - struct tevent_fd *fde, uint16_t flags, - void *priv), - struct async_syscall_state **pstate) -{ - struct async_req *result; - struct async_syscall_state *state; - - result = async_syscall_new(mem_ctx, ev, type, &state); - if (result == NULL) { - return NULL; - } - - state->fde = tevent_add_fd(ev, state, fd, fde_flags, fde_cb, result); - if (state->fde == NULL) { - TALLOC_FREE(result); - return NULL; - } - *pstate = state; - return result; -} - -/** - * Retrieve a ssize_t typed result from an async syscall - * @param[in] req The syscall that has just finished - * @param[out] perrno Where to put the syscall's errno - * @retval The return value from the asynchronously called syscall - */ - -ssize_t async_syscall_result_ssize_t(struct async_req *req, int *perrno) -{ - struct async_syscall_state *state = talloc_get_type_abort( - req->private_data, struct async_syscall_state); - - *perrno = state->sys_errno; - return state->result.result_ssize_t; -} - -/** - * Retrieve a size_t typed result from an async syscall - * @param[in] req The syscall that has just finished - * @param[out] perrno Where to put the syscall's errno - * @retval The return value from the asynchronously called syscall - */ - -size_t async_syscall_result_size_t(struct async_req *req, int *perrno) -{ - struct async_syscall_state *state = talloc_get_type_abort( - req->private_data, struct async_syscall_state); - - *perrno = state->sys_errno; - return state->result.result_size_t; -} - -/** - * Retrieve a int typed result from an async syscall - * @param[in] req The syscall that has just finished - * @param[out] perrno Where to put the syscall's errno - * @retval The return value from the asynchronously called syscall - */ - -int async_syscall_result_int(struct async_req *req, int *perrno) -{ - struct async_syscall_state *state = talloc_get_type_abort( - req->private_data, struct async_syscall_state); - - *perrno = state->sys_errno; - return state->result.result_int; -} - -/** - * fde event handler for the "send" syscall - * @param[in] ev The event context that sent us here - * @param[in] fde The file descriptor event associated with the send - * @param[in] flags Can only be TEVENT_FD_WRITE here - * @param[in] priv private data, "struct async_req *" in this case - */ - -static void async_send_callback(struct tevent_context *ev, - struct tevent_fd *fde, uint16_t flags, - void *priv) -{ - struct async_req *req = talloc_get_type_abort( - priv, struct async_req); - struct async_syscall_state *state = talloc_get_type_abort( - req->private_data, struct async_syscall_state); - struct param_send *p = &state->param.param_send; - - if (state->syscall_type != ASYNC_SYSCALL_SEND) { - async_req_error(req, EIO); - return; - } - - state->result.result_ssize_t = send(p->fd, p->buffer, p->length, - p->flags); - state->sys_errno = errno; - - TALLOC_FREE(state->fde); - - async_req_done(req); -} - -/** - * Async version of send(2) - * @param[in] mem_ctx The memory context to hang the result off - * @param[in] ev The event context to work from - * @param[in] fd The socket to send to - * @param[in] buffer The buffer to send - * @param[in] length How many bytes to send - * @param[in] flags flags passed to send(2) - * - * This function is a direct counterpart of send(2) - */ - -struct async_req *async_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev, - int fd, const void *buffer, size_t length, - int flags) -{ - struct async_req *result; - struct async_syscall_state *state; - - result = async_fde_syscall_new( - mem_ctx, ev, ASYNC_SYSCALL_SEND, - fd, TEVENT_FD_WRITE, async_send_callback, - &state); - if (result == NULL) { - return NULL; - } - - state->param.param_send.fd = fd; - state->param.param_send.buffer = buffer; - state->param.param_send.length = length; - state->param.param_send.flags = flags; - - return result; -} - -/** - * fde event handler for the "recv" syscall - * @param[in] ev The event context that sent us here - * @param[in] fde The file descriptor event associated with the recv - * @param[in] flags Can only be TEVENT_FD_READ here - * @param[in] priv private data, "struct async_req *" in this case - */ - -static void async_recv_callback(struct tevent_context *ev, - struct tevent_fd *fde, uint16_t flags, - void *priv) -{ - struct async_req *req = talloc_get_type_abort( - priv, struct async_req); - struct async_syscall_state *state = talloc_get_type_abort( - req->private_data, struct async_syscall_state); - struct param_recv *p = &state->param.param_recv; - - if (state->syscall_type != ASYNC_SYSCALL_RECV) { - async_req_error(req, EIO); - return; - } - - state->result.result_ssize_t = recv(p->fd, p->buffer, p->length, - p->flags); - state->sys_errno = errno; - - TALLOC_FREE(state->fde); - - async_req_done(req); -} - -/** - * Async version of recv(2) - * @param[in] mem_ctx The memory context to hang the result off - * @param[in] ev The event context to work from - * @param[in] fd The socket to recv from - * @param[in] buffer The buffer to recv into - * @param[in] length How many bytes to recv - * @param[in] flags flags passed to recv(2) - * - * This function is a direct counterpart of recv(2) - */ - -struct async_req *async_recv(TALLOC_CTX *mem_ctx, struct tevent_context *ev, - int fd, void *buffer, size_t length, - int flags) -{ - struct async_req *result; - struct async_syscall_state *state; - - result = async_fde_syscall_new( - mem_ctx, ev, ASYNC_SYSCALL_RECV, - fd, TEVENT_FD_READ, async_recv_callback, - &state); - - if (result == NULL) { - return NULL; - } - - state->param.param_recv.fd = fd; - state->param.param_recv.buffer = buffer; - state->param.param_recv.length = length; - state->param.param_recv.flags = flags; - - return result; -} - -struct async_send_state { +struct sendto_state { int fd; const void *buf; size_t len; int flags; + const struct sockaddr_storage *addr; + socklen_t addr_len; ssize_t sent; }; -static void async_send_handler(struct tevent_context *ev, +static void sendto_handler(struct tevent_context *ev, struct tevent_fd *fde, uint16_t flags, void *private_data); -struct tevent_req *async_send_send(TALLOC_CTX *mem_ctx, - struct tevent_context *ev, - int fd, const void *buf, size_t len, - int flags) +struct tevent_req *sendto_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + int fd, const void *buf, size_t len, int flags, + const struct sockaddr_storage *addr) { struct tevent_req *result; - struct async_send_state *state; + struct sendto_state *state; struct tevent_fd *fde; - result = tevent_req_create(mem_ctx, &state, struct async_send_state); + result = tevent_req_create(mem_ctx, &state, struct sendto_state); if (result == NULL) { return result; } @@ -407,8 +66,26 @@ struct tevent_req *async_send_send(TALLOC_CTX *mem_ctx, state->buf = buf; state->len = len; state->flags = flags; + state->addr = addr; + + switch (addr->ss_family) { + case AF_INET: + state->addr_len = sizeof(struct sockaddr_in); + break; +#if defined(HAVE_IPV6) + case AF_INET6: + state->addr_len = sizeof(struct sockaddr_in6); + break; +#endif + case AF_UNIX: + state->addr_len = sizeof(struct sockaddr_un); + break; + default: + state->addr_len = sizeof(struct sockaddr_storage); + break; + } - fde = tevent_add_fd(ev, state, fd, TEVENT_FD_WRITE, async_send_handler, + fde = tevent_add_fd(ev, state, fd, TEVENT_FD_WRITE, sendto_handler, result); if (fde == NULL) { TALLOC_FREE(result); @@ -417,16 +94,22 @@ struct tevent_req *async_send_send(TALLOC_CTX *mem_ctx, return result; } -static void async_send_handler(struct tevent_context *ev, +static void sendto_handler(struct tevent_context *ev, struct tevent_fd *fde, uint16_t flags, void *private_data) { struct tevent_req *req = talloc_get_type_abort( private_data, struct tevent_req); - struct async_send_state *state = talloc_get_type_abort( - req->private_state, struct async_send_state); - - state->sent = send(state->fd, state->buf, state->len, state->flags); + struct sendto_state *state = + tevent_req_data(req, struct sendto_state); + + state->sent = sendto(state->fd, state->buf, state->len, state->flags, + (const struct sockaddr *)state->addr, + state->addr_len); + if ((state->sent == -1) && (errno == EINTR)) { + /* retry */ + return; + } if (state->sent == -1) { tevent_req_error(req, errno); return; @@ -434,10 +117,10 @@ static void async_send_handler(struct tevent_context *ev, tevent_req_done(req); } -ssize_t async_send_recv(struct tevent_req *req, int *perrno) +ssize_t sendto_recv(struct tevent_req *req, int *perrno) { - struct async_send_state *state = talloc_get_type_abort( - req->private_state, struct async_send_state); + struct sendto_state *state = + tevent_req_data(req, struct sendto_state); if (tevent_req_is_unix_error(req, perrno)) { return -1; @@ -445,27 +128,31 @@ ssize_t async_send_recv(struct tevent_req *req, int *perrno) return state->sent; } -struct async_recv_state { +struct recvfrom_state { int fd; void *buf; size_t len; int flags; + struct sockaddr_storage *addr; + socklen_t *addr_len; ssize_t received; }; -static void async_recv_handler(struct tevent_context *ev, +static void recvfrom_handler(struct tevent_context *ev, struct tevent_fd *fde, uint16_t flags, void *private_data); -struct tevent_req *async_recv_send(TALLOC_CTX *mem_ctx, - struct tevent_context *ev, - int fd, void *buf, size_t len, int flags) +struct tevent_req *recvfrom_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + int fd, void *buf, size_t len, int flags, + struct sockaddr_storage *addr, + socklen_t *addr_len) { struct tevent_req *result; - struct async_recv_state *state; + struct recvfrom_state *state; struct tevent_fd *fde; - result = tevent_req_create(mem_ctx, &state, struct async_recv_state); + result = tevent_req_create(mem_ctx, &state, struct recvfrom_state); if (result == NULL) { return result; } @@ -473,8 +160,10 @@ struct tevent_req *async_recv_send(TALLOC_CTX *mem_ctx, state->buf = buf; state->len = len; state->flags = flags; + state->addr = addr; + state->addr_len = addr_len; - fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ, async_recv_handler, + fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ, recvfrom_handler, result); if (fde == NULL) { TALLOC_FREE(result); @@ -483,17 +172,26 @@ struct tevent_req *async_recv_send(TALLOC_CTX *mem_ctx, return result; } -static void async_recv_handler(struct tevent_context *ev, +static void recvfrom_handler(struct tevent_context *ev, struct tevent_fd *fde, uint16_t flags, void *private_data) { struct tevent_req *req = talloc_get_type_abort( private_data, struct tevent_req); - struct async_recv_state *state = talloc_get_type_abort( - req->private_state, struct async_recv_state); - - state->received = recv(state->fd, state->buf, state->len, - state->flags); + struct recvfrom_state *state = + tevent_req_data(req, struct recvfrom_state); + + state->received = recvfrom(state->fd, state->buf, state->len, + state->flags, (struct sockaddr *)state->addr, + state->addr_len); + if ((state->received == -1) && (errno == EINTR)) { + /* retry */ + return; + } + if (state->received == 0) { + tevent_req_error(req, EPIPE); + return; + } if (state->received == -1) { tevent_req_error(req, errno); return; @@ -501,10 +199,10 @@ static void async_recv_handler(struct tevent_context *ev, tevent_req_done(req); } -ssize_t async_recv_recv(struct tevent_req *req, int *perrno) +ssize_t recvfrom_recv(struct tevent_req *req, int *perrno) { - struct async_recv_state *state = talloc_get_type_abort( - req->private_state, struct async_recv_state); + struct recvfrom_state *state = + tevent_req_data(req, struct recvfrom_state); if (tevent_req_is_unix_error(req, perrno)) { return -1; @@ -517,6 +215,8 @@ struct async_connect_state { int result; int sys_errno; long old_sockflags; + socklen_t address_len; + struct sockaddr_storage address; }; static void async_connect_connected(struct tevent_context *ev, @@ -564,12 +264,19 @@ struct tevent_req *async_connect_send(TALLOC_CTX *mem_ctx, goto post_errno; } + state->address_len = address_len; + if (address_len > sizeof(state->address)) { + errno = EINVAL; + goto post_errno; + } + memcpy(&state->address, address, address_len); + set_blocking(fd, false); state->result = connect(fd, address, address_len); if (state->result == 0) { - errno = 0; - goto post_errno; + tevent_req_done(result); + goto done; } /** @@ -584,25 +291,22 @@ struct tevent_req *async_connect_send(TALLOC_CTX *mem_ctx, errno == EISCONN || #endif errno == EAGAIN || errno == EINTR)) { + state->sys_errno = errno; goto post_errno; } fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ | TEVENT_FD_WRITE, async_connect_connected, result); if (fde == NULL) { - errno = ENOMEM; + state->sys_errno = ENOMEM; goto post_errno; } return result; post_errno: - state->sys_errno = errno; + tevent_req_error(result, state->sys_errno); + done: fcntl(fd, F_SETFL, state->old_sockflags); - if (state->sys_errno == 0) { - tevent_req_done(result); - } else { - tevent_req_error(result, state->sys_errno); - } return tevent_req_post(result, ev); } @@ -620,43 +324,32 @@ static void async_connect_connected(struct tevent_context *ev, { struct tevent_req *req = talloc_get_type_abort( priv, struct tevent_req); - struct async_connect_state *state = talloc_get_type_abort( - req->private_state, struct async_connect_state); - - TALLOC_FREE(fde); - - /* - * Stevens, Network Programming says that if there's a - * successful connect, the socket is only writable. Upon an - * error, it's both readable and writable. - */ - if ((flags & (TEVENT_FD_READ|TEVENT_FD_WRITE)) - == (TEVENT_FD_READ|TEVENT_FD_WRITE)) { - int sockerr; - socklen_t err_len = sizeof(sockerr); - - if (getsockopt(state->fd, SOL_SOCKET, SO_ERROR, - (void *)&sockerr, &err_len) == 0) { - errno = sockerr; - } - - state->sys_errno = errno; - - DEBUG(10, ("connect returned %s\n", strerror(errno))); - - fcntl(state->fd, F_SETFL, state->old_sockflags); - tevent_req_error(req, state->sys_errno); + struct async_connect_state *state = + tevent_req_data(req, struct async_connect_state); + int ret; + + ret = connect(state->fd, (struct sockaddr *)(void *)&state->address, + state->address_len); + if (ret == 0) { + state->sys_errno = 0; + TALLOC_FREE(fde); + tevent_req_done(req); return; } - - state->sys_errno = 0; - tevent_req_done(req); + if (errno == EINPROGRESS) { + /* Try again later, leave the fde around */ + return; + } + state->sys_errno = errno; + TALLOC_FREE(fde); + tevent_req_error(req, errno); + return; } int async_connect_recv(struct tevent_req *req, int *perrno) { - struct async_connect_state *state = talloc_get_type_abort( - req->private_state, struct async_connect_state); + struct async_connect_state *state = + tevent_req_data(req, struct async_connect_state); int err; fcntl(state->fd, F_SETFL, state->old_sockflags); @@ -680,20 +373,24 @@ struct writev_state { struct iovec *iov; int count; size_t total_size; + uint16_t flags; + bool err_on_readability; }; +static void writev_trigger(struct tevent_req *req, void *private_data); static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde, uint16_t flags, void *private_data); struct tevent_req *writev_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev, - int fd, struct iovec *iov, int count) + struct tevent_queue *queue, int fd, + bool err_on_readability, + struct iovec *iov, int count) { - struct tevent_req *result; + struct tevent_req *req; struct writev_state *state; - struct tevent_fd *fde; - result = tevent_req_create(mem_ctx, &state, struct writev_state); - if (result == NULL) { + req = tevent_req_create(mem_ctx, &state, struct writev_state); + if (req == NULL) { return NULL; } state->ev = ev; @@ -705,36 +402,93 @@ struct tevent_req *writev_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev, if (state->iov == NULL) { goto fail; } + state->flags = TEVENT_FD_WRITE|TEVENT_FD_READ; + state->err_on_readability = err_on_readability; - fde = tevent_add_fd(ev, state, fd, TEVENT_FD_WRITE, writev_handler, - result); - if (fde == NULL) { - goto fail; + if (queue == NULL) { + struct tevent_fd *fde; + fde = tevent_add_fd(state->ev, state, state->fd, + state->flags, writev_handler, req); + if (tevent_req_nomem(fde, req)) { + return tevent_req_post(req, ev); + } + return req; } - return result; + if (!tevent_queue_add(queue, ev, req, writev_trigger, NULL)) { + goto fail; + } + return req; fail: - TALLOC_FREE(result); + TALLOC_FREE(req); return NULL; } +static void writev_trigger(struct tevent_req *req, void *private_data) +{ + struct writev_state *state = tevent_req_data(req, struct writev_state); + struct tevent_fd *fde; + + fde = tevent_add_fd(state->ev, state, state->fd, state->flags, + writev_handler, req); + if (fde == NULL) { + tevent_req_error(req, ENOMEM); + } +} + static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde, uint16_t flags, void *private_data) { struct tevent_req *req = talloc_get_type_abort( private_data, struct tevent_req); - struct writev_state *state = talloc_get_type_abort( - req->private_state, struct writev_state); + struct writev_state *state = + tevent_req_data(req, struct writev_state); size_t to_write, written; int i; to_write = 0; + if ((state->flags & TEVENT_FD_READ) && (flags & TEVENT_FD_READ)) { + int ret, value; + + if (state->err_on_readability) { + /* Readable and the caller wants an error on read. */ + tevent_req_error(req, EPIPE); + return; + } + + /* Might be an error. Check if there are bytes to read */ + ret = ioctl(state->fd, FIONREAD, &value); + /* FIXME - should we also check + for ret == 0 and value == 0 here ? */ + if (ret == -1) { + /* There's an error. */ + tevent_req_error(req, EPIPE); + return; + } + /* A request for TEVENT_FD_READ will succeed from now and + forevermore until the bytes are read so if there was + an error we'll wait until we do read, then get it in + the read callback function. Until then, remove TEVENT_FD_READ + from the flags we're waiting for. */ + state->flags &= ~TEVENT_FD_READ; + TEVENT_FD_NOT_READABLE(fde); + + /* If not writable, we're done. */ + if (!(flags & TEVENT_FD_WRITE)) { + return; + } + } + for (i=0; icount; i++) { to_write += state->iov[i].iov_len; } - written = sys_writev(state->fd, state->iov, state->count); + written = writev(state->fd, state->iov, state->count); + if ((written == -1) && (errno == EINTR)) { + /* retry */ + return; + } if (written == -1) { tevent_req_error(req, errno); return; @@ -762,7 +516,7 @@ static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde, state->iov[0].iov_len -= written; break; } - written = state->iov[0].iov_len; + written -= state->iov[0].iov_len; state->iov += 1; state->count -= 1; } @@ -770,8 +524,8 @@ static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde, ssize_t writev_recv(struct tevent_req *req, int *perrno) { - struct writev_state *state = talloc_get_type_abort( - req->private_state, struct writev_state); + struct writev_state *state = + tevent_req_data(req, struct writev_state); if (tevent_req_is_unix_error(req, perrno)) { return -1; @@ -834,13 +588,18 @@ static void read_packet_handler(struct tevent_context *ev, { struct tevent_req *req = talloc_get_type_abort( private_data, struct tevent_req); - struct read_packet_state *state = talloc_get_type_abort( - req->private_state, struct read_packet_state); + struct read_packet_state *state = + tevent_req_data(req, struct read_packet_state); size_t total = talloc_get_size(state->buf); ssize_t nread, more; uint8_t *tmp; - nread = read(state->fd, state->buf+state->nread, total-state->nread); + nread = recv(state->fd, state->buf+state->nread, total-state->nread, + 0); + if ((nread == -1) && (errno == EINTR)) { + /* retry */ + return; + } if (nread == -1) { tevent_req_error(req, errno); return; @@ -877,7 +636,7 @@ static void read_packet_handler(struct tevent_context *ev, return; } - tmp = TALLOC_REALLOC_ARRAY(state, state->buf, uint8_t, total+more); + tmp = talloc_realloc(state, state->buf, uint8_t, total+more); if (tevent_req_nomem(tmp, req)) { return; } @@ -887,8 +646,8 @@ static void read_packet_handler(struct tevent_context *ev, ssize_t read_packet_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx, uint8_t **pbuf, int *perrno) { - struct read_packet_state *state = talloc_get_type_abort( - req->private_state, struct read_packet_state); + struct read_packet_state *state = + tevent_req_data(req, struct read_packet_state); if (tevent_req_is_unix_error(req, perrno)) { return -1; @@ -896,3 +655,58 @@ ssize_t read_packet_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx, *pbuf = talloc_move(mem_ctx, &state->buf); return talloc_get_size(*pbuf); } + +struct wait_for_read_state { + struct tevent_req *req; + struct tevent_fd *fde; +}; + +static void wait_for_read_done(struct tevent_context *ev, + struct tevent_fd *fde, + uint16_t flags, + void *private_data); + +struct tevent_req *wait_for_read_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + int fd) +{ + struct tevent_req *req; + struct wait_for_read_state *state; + + req = tevent_req_create(mem_ctx, &state, struct wait_for_read_state); + if (req == NULL) { + return NULL; + } + state->req = req; + state->fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ, + wait_for_read_done, state); + if (tevent_req_nomem(state->fde, req)) { + return tevent_req_post(req, ev); + } + return req; +} + +static void wait_for_read_done(struct tevent_context *ev, + struct tevent_fd *fde, + uint16_t flags, + void *private_data) +{ + struct wait_for_read_state *state = talloc_get_type_abort( + private_data, struct wait_for_read_state); + + if (flags & TEVENT_FD_READ) { + TALLOC_FREE(state->fde); + tevent_req_done(state->req); + } +} + +bool wait_for_read_recv(struct tevent_req *req, int *perr) +{ + int err; + + if (tevent_req_is_unix_error(req, &err)) { + *perr = err; + return false; + } + return true; +}