X-Git-Url: http://git.samba.org/samba.git/?p=kai%2Fsamba-autobuild%2F.git;a=blobdiff_plain;f=lib%2Fasync_req%2Fasync_sock.c;h=ebcedb5cbd1faae0c7bf9267b7eef72c2e0c7b5e;hp=02ae88068380406366af07c4f6f665f668e31a73;hb=4f05f68abc1d756bb114260e80d3532f3f959fec;hpb=1696298aad3f84b6cd9008bb2684db572b34dfb5
diff --git a/lib/async_req/async_sock.c b/lib/async_req/async_sock.c
index 02ae8806838..ebcedb5cbd1 100644
--- a/lib/async_req/async_sock.c
+++ b/lib/async_req/async_sock.c
@@ -3,771 +3,562 @@
async socket syscalls
Copyright (C) Volker Lendecke 2008
- This program is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
+ ** NOTE! The following LGPL license applies to the async_sock
+ ** library. This does NOT imply that all of Samba is released
+ ** under the LGPL
- This program is distributed in the hope that it will be useful,
+ This library is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 3 of the License, or (at your option) any later version.
+
+ This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Library General Public License for more details.
- You should have received a copy of the GNU General Public License
+ You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see .
*/
-#include "includes.h"
-#include "lib/talloc/talloc.h"
-#include "lib/tevent/tevent.h"
-#include "lib/async_req/async_req.h"
+#include "replace.h"
+#include "system/network.h"
+#include "system/filesys.h"
+#include
+#include
#include "lib/async_req/async_sock.h"
-#include
-
-#ifndef TALLOC_FREE
-#define TALLOC_FREE(ctx) do { talloc_free(ctx); ctx=NULL; } while(0)
-#endif
+#include "lib/util/iov_buf.h"
-/**
- * Discriminator for async_syscall_state
- */
-enum async_syscall_type {
- ASYNC_SYSCALL_SEND,
- ASYNC_SYSCALL_SENDALL,
- ASYNC_SYSCALL_RECV,
- ASYNC_SYSCALL_RECVALL,
- ASYNC_SYSCALL_CONNECT
-};
+/* Note: lib/util/ is currently GPL */
+#include "lib/util/tevent_unix.h"
+#include "lib/util/samba_util.h"
-/**
- * Holder for syscall arguments and the result
- */
-
-struct async_syscall_state {
- enum async_syscall_type syscall_type;
+struct async_connect_state {
+ int fd;
struct tevent_fd *fde;
+ int result;
+ long old_sockflags;
+ socklen_t address_len;
+ struct sockaddr_storage address;
- union {
- struct param_send {
- int fd;
- const void *buffer;
- size_t length;
- int flags;
- } param_send;
- struct param_sendall {
- int fd;
- const void *buffer;
- size_t length;
- int flags;
- size_t sent;
- } param_sendall;
- struct param_recv {
- int fd;
- void *buffer;
- size_t length;
- int flags;
- } param_recv;
- struct param_recvall {
- int fd;
- void *buffer;
- size_t length;
- int flags;
- size_t received;
- } param_recvall;
- struct param_connect {
- /**
- * connect needs to be done on a nonblocking
- * socket. Keep the old flags around
- */
- long old_sockflags;
- int fd;
- const struct sockaddr *address;
- socklen_t address_len;
- } param_connect;
- } param;
-
- union {
- ssize_t result_ssize_t;
- size_t result_size_t;
- int result_int;
- } result;
- int sys_errno;
+ void (*before_connect)(void *private_data);
+ void (*after_connect)(void *private_data);
+ void *private_data;
};
-/**
- * @brief Map async_req states to unix-style errnos
- * @param[in] req The async req to get the state from
- * @param[out] err Pointer to take the unix-style errno
- *
- * @return true if the async_req is in an error state, false otherwise
- */
-
-bool async_req_is_errno(struct async_req *req, int *err)
-{
- enum async_req_state state;
- uint64_t error;
-
- if (!async_req_is_error(req, &state, &error)) {
- return false;
- }
-
- switch (state) {
- case ASYNC_REQ_USER_ERROR:
- *err = (int)error;
- break;
- case ASYNC_REQ_TIMED_OUT:
-#ifdef ETIMEDOUT
- *err = ETIMEDOUT;
-#else
- *err = EAGAIN;
-#endif
- break;
- case ASYNC_REQ_NO_MEMORY:
- *err = ENOMEM;
- break;
- default:
- *err = EIO;
- break;
- }
- return true;
-}
-
-int async_req_simple_recv_errno(struct async_req *req)
-{
- int err;
-
- if (async_req_is_errno(req, &err)) {
- return err;
- }
-
- return 0;
-}
+static void async_connect_cleanup(struct tevent_req *req,
+ enum tevent_req_state req_state);
+static void async_connect_connected(struct tevent_context *ev,
+ struct tevent_fd *fde, uint16_t flags,
+ void *priv);
/**
- * @brief Create a new async syscall req
+ * @brief async version of connect(2)
* @param[in] mem_ctx The memory context to hang the result off
* @param[in] ev The event context to work from
- * @param[in] type Which syscall will this be
- * @param[in] pstate Where to put the newly created private_data state
- * @retval The new request
+ * @param[in] fd The socket to recv from
+ * @param[in] address Where to connect?
+ * @param[in] address_len Length of *address
+ * @retval The async request
*
- * This is a helper function to prepare a new struct async_req with an
- * associated struct async_syscall_state. The async_syscall_state will be put
- * into the async_req as private_data.
+ * This function sets the socket into non-blocking state to be able to call
+ * connect in an async state. This will be reset when the request is finished.
*/
-static struct async_req *async_syscall_new(TALLOC_CTX *mem_ctx,
- struct tevent_context *ev,
- enum async_syscall_type type,
- struct async_syscall_state **pstate)
+struct tevent_req *async_connect_send(
+ TALLOC_CTX *mem_ctx, struct tevent_context *ev, int fd,
+ const struct sockaddr *address, socklen_t address_len,
+ void (*before_connect)(void *private_data),
+ void (*after_connect)(void *private_data),
+ void *private_data)
{
- struct async_req *result;
- struct async_syscall_state *state;
+ struct tevent_req *req;
+ struct async_connect_state *state;
- if (!async_req_setup(mem_ctx, &result, &state,
- struct async_syscall_state)) {
+ req = tevent_req_create(mem_ctx, &state, struct async_connect_state);
+ if (req == NULL) {
return NULL;
}
- state->syscall_type = type;
-
- result->private_data = state;
- *pstate = state;
-
- return result;
-}
-
-/**
- * @brief Create a new async syscall req based on a fd
- * @param[in] mem_ctx The memory context to hang the result off
- * @param[in] ev The event context to work from
- * @param[in] type Which syscall will this be
- * @param[in] fd The file descriptor we work on
- * @param[in] fde_flags TEVENT_FD_READ/WRITE -- what are we interested in?
- * @param[in] fde_cb The callback function for the file descriptor event
- * @param[in] pstate Where to put the newly created private_data state
- * @retval The new request
- *
- * This is a helper function to prepare a new struct async_req with an
- * associated struct async_syscall_state and an associated file descriptor
- * event.
- */
+ /**
+ * We have to set the socket to nonblocking for async connect(2). Keep
+ * the old sockflags around.
+ */
-static struct async_req *async_fde_syscall_new(
- TALLOC_CTX *mem_ctx,
- struct tevent_context *ev,
- enum async_syscall_type type,
- int fd,
- uint16_t fde_flags,
- void (*fde_cb)(struct tevent_context *ev,
- struct tevent_fd *fde, uint16_t flags,
- void *priv),
- struct async_syscall_state **pstate)
-{
- struct async_req *result;
- struct async_syscall_state *state;
+ state->fd = fd;
+ state->before_connect = before_connect;
+ state->after_connect = after_connect;
+ state->private_data = private_data;
- result = async_syscall_new(mem_ctx, ev, type, &state);
- if (result == NULL) {
- return NULL;
+ state->old_sockflags = fcntl(fd, F_GETFL, 0);
+ if (state->old_sockflags == -1) {
+ tevent_req_error(req, errno);
+ return tevent_req_post(req, ev);
}
- state->fde = tevent_add_fd(ev, state, fd, fde_flags, fde_cb, result);
- if (state->fde == NULL) {
- TALLOC_FREE(result);
- return NULL;
- }
- *pstate = state;
- return result;
-}
+ tevent_req_set_cleanup_fn(req, async_connect_cleanup);
-/**
- * Retrieve a ssize_t typed result from an async syscall
- * @param[in] req The syscall that has just finished
- * @param[out] perrno Where to put the syscall's errno
- * @retval The return value from the asynchronously called syscall
- */
+ state->address_len = address_len;
+ if (address_len > sizeof(state->address)) {
+ tevent_req_error(req, EINVAL);
+ return tevent_req_post(req, ev);
+ }
+ memcpy(&state->address, address, address_len);
-ssize_t async_syscall_result_ssize_t(struct async_req *req, int *perrno)
-{
- struct async_syscall_state *state = talloc_get_type_abort(
- req->private_data, struct async_syscall_state);
+ set_blocking(fd, false);
- *perrno = state->sys_errno;
- return state->result.result_ssize_t;
-}
+ if (state->before_connect != NULL) {
+ state->before_connect(state->private_data);
+ }
-/**
- * Retrieve a size_t typed result from an async syscall
- * @param[in] req The syscall that has just finished
- * @param[out] perrno Where to put the syscall's errno
- * @retval The return value from the asynchronously called syscall
- */
+ state->result = connect(fd, address, address_len);
-size_t async_syscall_result_size_t(struct async_req *req, int *perrno)
-{
- struct async_syscall_state *state = talloc_get_type_abort(
- req->private_data, struct async_syscall_state);
+ if (state->after_connect != NULL) {
+ state->after_connect(state->private_data);
+ }
- *perrno = state->sys_errno;
- return state->result.result_size_t;
-}
+ if (state->result == 0) {
+ tevent_req_done(req);
+ return tevent_req_post(req, ev);
+ }
-/**
- * Retrieve a int typed result from an async syscall
- * @param[in] req The syscall that has just finished
- * @param[out] perrno Where to put the syscall's errno
- * @retval The return value from the asynchronously called syscall
- */
+ /**
+ * A number of error messages show that something good is progressing
+ * and that we have to wait for readability.
+ *
+ * If none of them are present, bail out.
+ */
-int async_syscall_result_int(struct async_req *req, int *perrno)
-{
- struct async_syscall_state *state = talloc_get_type_abort(
- req->private_data, struct async_syscall_state);
+ if (!(errno == EINPROGRESS || errno == EALREADY ||
+#ifdef EISCONN
+ errno == EISCONN ||
+#endif
+ errno == EAGAIN || errno == EINTR)) {
+ tevent_req_error(req, errno);
+ return tevent_req_post(req, ev);
+ }
- *perrno = state->sys_errno;
- return state->result.result_int;
+ state->fde = tevent_add_fd(ev, state, fd,
+ TEVENT_FD_READ | TEVENT_FD_WRITE,
+ async_connect_connected, req);
+ if (state->fde == NULL) {
+ tevent_req_error(req, ENOMEM);
+ return tevent_req_post(req, ev);
+ }
+ return req;
}
-/**
- * fde event handler for the "send" syscall
- * @param[in] ev The event context that sent us here
- * @param[in] fde The file descriptor event associated with the send
- * @param[in] flags Can only be TEVENT_FD_WRITE here
- * @param[in] priv private data, "struct async_req *" in this case
- */
-
-static void async_send_callback(struct tevent_context *ev,
- struct tevent_fd *fde, uint16_t flags,
- void *priv)
+static void async_connect_cleanup(struct tevent_req *req,
+ enum tevent_req_state req_state)
{
- struct async_req *req = talloc_get_type_abort(
- priv, struct async_req);
- struct async_syscall_state *state = talloc_get_type_abort(
- req->private_data, struct async_syscall_state);
- struct param_send *p = &state->param.param_send;
-
- if (state->syscall_type != ASYNC_SYSCALL_SEND) {
- async_req_error(req, EIO);
- return;
- }
-
- state->result.result_ssize_t = send(p->fd, p->buffer, p->length,
- p->flags);
- state->sys_errno = errno;
+ struct async_connect_state *state =
+ tevent_req_data(req, struct async_connect_state);
TALLOC_FREE(state->fde);
-
- async_req_done(req);
-}
-
-/**
- * Async version of send(2)
- * @param[in] mem_ctx The memory context to hang the result off
- * @param[in] ev The event context to work from
- * @param[in] fd The socket to send to
- * @param[in] buffer The buffer to send
- * @param[in] length How many bytes to send
- * @param[in] flags flags passed to send(2)
- *
- * This function is a direct counterpart of send(2)
- */
-
-struct async_req *async_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
- int fd, const void *buffer, size_t length,
- int flags)
-{
- struct async_req *result;
- struct async_syscall_state *state;
-
- result = async_fde_syscall_new(
- mem_ctx, ev, ASYNC_SYSCALL_SEND,
- fd, TEVENT_FD_WRITE, async_send_callback,
- &state);
- if (result == NULL) {
- return NULL;
+ if (state->fd != -1) {
+ fcntl(state->fd, F_SETFL, state->old_sockflags);
+ state->fd = -1;
}
-
- state->param.param_send.fd = fd;
- state->param.param_send.buffer = buffer;
- state->param.param_send.length = length;
- state->param.param_send.flags = flags;
-
- return result;
}
/**
- * fde event handler for the "sendall" syscall group
+ * fde event handler for connect(2)
* @param[in] ev The event context that sent us here
- * @param[in] fde The file descriptor event associated with the send
- * @param[in] flags Can only be TEVENT_FD_WRITE here
+ * @param[in] fde The file descriptor event associated with the connect
+ * @param[in] flags Indicate read/writeability of the socket
* @param[in] priv private data, "struct async_req *" in this case
*/
-static void async_sendall_callback(struct tevent_context *ev,
- struct tevent_fd *fde, uint16_t flags,
- void *priv)
+static void async_connect_connected(struct tevent_context *ev,
+ struct tevent_fd *fde, uint16_t flags,
+ void *priv)
{
- struct async_req *req = talloc_get_type_abort(
- priv, struct async_req);
- struct async_syscall_state *state = talloc_get_type_abort(
- req->private_data, struct async_syscall_state);
- struct param_sendall *p = &state->param.param_sendall;
-
- if (state->syscall_type != ASYNC_SYSCALL_SENDALL) {
- async_req_error(req, EIO);
- return;
+ struct tevent_req *req = talloc_get_type_abort(
+ priv, struct tevent_req);
+ struct async_connect_state *state =
+ tevent_req_data(req, struct async_connect_state);
+ int ret;
+
+ if (state->before_connect != NULL) {
+ state->before_connect(state->private_data);
}
- state->result.result_ssize_t = send(p->fd,
- (const char *)p->buffer + p->sent,
- p->length - p->sent, p->flags);
- state->sys_errno = errno;
+ ret = connect(state->fd, (struct sockaddr *)(void *)&state->address,
+ state->address_len);
- if (state->result.result_ssize_t == -1) {
- async_req_error(req, state->sys_errno);
- return;
+ if (state->after_connect != NULL) {
+ state->after_connect(state->private_data);
}
- if (state->result.result_ssize_t == 0) {
- async_req_error(req, EOF);
+ if (ret == 0) {
+ tevent_req_done(req);
return;
}
-
- p->sent += state->result.result_ssize_t;
- if (p->sent > p->length) {
- async_req_error(req, EIO);
+ if (errno == EINPROGRESS) {
+ /* Try again later, leave the fde around */
return;
}
-
- if (p->sent == p->length) {
- TALLOC_FREE(state->fde);
- async_req_done(req);
- }
-}
-
-/**
- * @brief Send all bytes to a socket
- * @param[in] mem_ctx The memory context to hang the result off
- * @param[in] ev The event context to work from
- * @param[in] fd The socket to send to
- * @param[in] buffer The buffer to send
- * @param[in] length How many bytes to send
- * @param[in] flags flags passed to send(2)
- *
- * async_sendall calls send(2) as long as it is necessary to send all of the
- * "length" bytes
- */
-
-struct async_req *sendall_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
- int fd, const void *buffer, size_t length,
- int flags)
-{
- struct async_req *result;
- struct async_syscall_state *state;
-
- result = async_fde_syscall_new(
- mem_ctx, ev, ASYNC_SYSCALL_SENDALL,
- fd, TEVENT_FD_WRITE, async_sendall_callback,
- &state);
- if (result == NULL) {
- return NULL;
- }
-
- state->param.param_sendall.fd = fd;
- state->param.param_sendall.buffer = buffer;
- state->param.param_sendall.length = length;
- state->param.param_sendall.flags = flags;
- state->param.param_sendall.sent = 0;
-
- return result;
+ tevent_req_error(req, errno);
+ return;
}
-ssize_t sendall_recv(struct async_req *req, int *perr)
+int async_connect_recv(struct tevent_req *req, int *perrno)
{
- struct async_syscall_state *state = talloc_get_type_abort(
- req->private_data, struct async_syscall_state);
- int err;
-
- err = async_req_simple_recv_errno(req);
+ int err = tevent_req_simple_recv_unix(req);
if (err != 0) {
- *perr = err;
+ *perrno = err;
return -1;
}
- return state->result.result_ssize_t;
+ return 0;
}
-/**
- * fde event handler for the "recv" syscall
- * @param[in] ev The event context that sent us here
- * @param[in] fde The file descriptor event associated with the recv
- * @param[in] flags Can only be TEVENT_FD_READ here
- * @param[in] priv private data, "struct async_req *" in this case
- */
+struct writev_state {
+ struct tevent_context *ev;
+ int fd;
+ struct tevent_fd *fde;
+ struct iovec *iov;
+ int count;
+ size_t total_size;
+ uint16_t flags;
+ bool err_on_readability;
+};
+
+static void writev_cleanup(struct tevent_req *req,
+ enum tevent_req_state req_state);
+static void writev_trigger(struct tevent_req *req, void *private_data);
+static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde,
+ uint16_t flags, void *private_data);
-static void async_recv_callback(struct tevent_context *ev,
- struct tevent_fd *fde, uint16_t flags,
- void *priv)
+struct tevent_req *writev_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
+ struct tevent_queue *queue, int fd,
+ bool err_on_readability,
+ struct iovec *iov, int count)
{
- struct async_req *req = talloc_get_type_abort(
- priv, struct async_req);
- struct async_syscall_state *state = talloc_get_type_abort(
- req->private_data, struct async_syscall_state);
- struct param_recv *p = &state->param.param_recv;
-
- if (state->syscall_type != ASYNC_SYSCALL_RECV) {
- async_req_error(req, EIO);
- return;
+ struct tevent_req *req;
+ struct writev_state *state;
+
+ req = tevent_req_create(mem_ctx, &state, struct writev_state);
+ if (req == NULL) {
+ return NULL;
+ }
+ state->ev = ev;
+ state->fd = fd;
+ state->total_size = 0;
+ state->count = count;
+ state->iov = (struct iovec *)talloc_memdup(
+ state, iov, sizeof(struct iovec) * count);
+ if (tevent_req_nomem(state->iov, req)) {
+ return tevent_req_post(req, ev);
+ }
+ state->flags = TEVENT_FD_WRITE|TEVENT_FD_READ;
+ state->err_on_readability = err_on_readability;
+
+ tevent_req_set_cleanup_fn(req, writev_cleanup);
+
+ if (queue == NULL) {
+ state->fde = tevent_add_fd(state->ev, state, state->fd,
+ state->flags, writev_handler, req);
+ if (tevent_req_nomem(state->fde, req)) {
+ return tevent_req_post(req, ev);
+ }
+ return req;
}
- state->result.result_ssize_t = recv(p->fd, p->buffer, p->length,
- p->flags);
- state->sys_errno = errno;
+ if (!tevent_queue_add(queue, ev, req, writev_trigger, NULL)) {
+ tevent_req_nomem(NULL, req);
+ return tevent_req_post(req, ev);
+ }
+ return req;
+}
- TALLOC_FREE(state->fde);
+static void writev_cleanup(struct tevent_req *req,
+ enum tevent_req_state req_state)
+{
+ struct writev_state *state = tevent_req_data(req, struct writev_state);
- async_req_done(req);
+ TALLOC_FREE(state->fde);
}
-/**
- * Async version of recv(2)
- * @param[in] mem_ctx The memory context to hang the result off
- * @param[in] ev The event context to work from
- * @param[in] fd The socket to recv from
- * @param[in] buffer The buffer to recv into
- * @param[in] length How many bytes to recv
- * @param[in] flags flags passed to recv(2)
- *
- * This function is a direct counterpart of recv(2)
- */
-
-struct async_req *async_recv(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
- int fd, void *buffer, size_t length,
- int flags)
+static void writev_trigger(struct tevent_req *req, void *private_data)
{
- struct async_req *result;
- struct async_syscall_state *state;
+ struct writev_state *state = tevent_req_data(req, struct writev_state);
- result = async_fde_syscall_new(
- mem_ctx, ev, ASYNC_SYSCALL_RECV,
- fd, TEVENT_FD_READ, async_recv_callback,
- &state);
-
- if (result == NULL) {
- return NULL;
+ state->fde = tevent_add_fd(state->ev, state, state->fd, state->flags,
+ writev_handler, req);
+ if (tevent_req_nomem(state->fde, req)) {
+ return;
}
-
- state->param.param_recv.fd = fd;
- state->param.param_recv.buffer = buffer;
- state->param.param_recv.length = length;
- state->param.param_recv.flags = flags;
-
- return result;
}
-/**
- * fde event handler for the "recvall" syscall group
- * @param[in] ev The event context that sent us here
- * @param[in] fde The file descriptor event associated with the recv
- * @param[in] flags Can only be TEVENT_FD_READ here
- * @param[in] priv private data, "struct async_req *" in this case
- */
-
-static void async_recvall_callback(struct tevent_context *ev,
- struct tevent_fd *fde, uint16_t flags,
- void *priv)
+static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde,
+ uint16_t flags, void *private_data)
{
- struct async_req *req = talloc_get_type_abort(
- priv, struct async_req);
- struct async_syscall_state *state = talloc_get_type_abort(
- req->private_data, struct async_syscall_state);
- struct param_recvall *p = &state->param.param_recvall;
-
- if (state->syscall_type != ASYNC_SYSCALL_RECVALL) {
- async_req_error(req, EIO);
- return;
- }
+ struct tevent_req *req = talloc_get_type_abort(
+ private_data, struct tevent_req);
+ struct writev_state *state =
+ tevent_req_data(req, struct writev_state);
+ size_t written;
+ bool ok;
+
+ if ((state->flags & TEVENT_FD_READ) && (flags & TEVENT_FD_READ)) {
+ int ret, value;
+
+ if (state->err_on_readability) {
+ /* Readable and the caller wants an error on read. */
+ tevent_req_error(req, EPIPE);
+ return;
+ }
- state->result.result_ssize_t = recv(p->fd,
- (char *)p->buffer + p->received,
- p->length - p->received, p->flags);
- state->sys_errno = errno;
+ /* Might be an error. Check if there are bytes to read */
+ ret = ioctl(state->fd, FIONREAD, &value);
+ /* FIXME - should we also check
+ for ret == 0 and value == 0 here ? */
+ if (ret == -1) {
+ /* There's an error. */
+ tevent_req_error(req, EPIPE);
+ return;
+ }
+ /* A request for TEVENT_FD_READ will succeed from now and
+ forevermore until the bytes are read so if there was
+ an error we'll wait until we do read, then get it in
+ the read callback function. Until then, remove TEVENT_FD_READ
+ from the flags we're waiting for. */
+ state->flags &= ~TEVENT_FD_READ;
+ TEVENT_FD_NOT_READABLE(fde);
+
+ /* If not writable, we're done. */
+ if (!(flags & TEVENT_FD_WRITE)) {
+ return;
+ }
+ }
- if (state->result.result_ssize_t == -1) {
- async_req_error(req, state->sys_errno);
+ written = writev(state->fd, state->iov, state->count);
+ if ((written == -1) && (errno == EINTR)) {
+ /* retry */
return;
}
-
- if (state->result.result_ssize_t == 0) {
- async_req_error(req, EIO);
+ if (written == -1) {
+ tevent_req_error(req, errno);
return;
}
-
- p->received += state->result.result_ssize_t;
- if (p->received > p->length) {
- async_req_error(req, EIO);
+ if (written == 0) {
+ tevent_req_error(req, EPIPE);
return;
}
+ state->total_size += written;
- if (p->received == p->length) {
- TALLOC_FREE(state->fde);
- async_req_done(req);
+ ok = iov_advance(&state->iov, &state->count, written);
+ if (!ok) {
+ tevent_req_error(req, EIO);
+ return;
}
-}
-/**
- * Receive a specified number of bytes from a socket
- * @param[in] mem_ctx The memory context to hang the result off
- * @param[in] ev The event context to work from
- * @param[in] fd The socket to recv from
- * @param[in] buffer The buffer to recv into
- * @param[in] length How many bytes to recv
- * @param[in] flags flags passed to recv(2)
- *
- * async_recvall will call recv(2) until "length" bytes are received
- */
-
-struct async_req *recvall_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
- int fd, void *buffer, size_t length,
- int flags)
-{
- struct async_req *result;
- struct async_syscall_state *state;
-
- result = async_fde_syscall_new(
- mem_ctx, ev, ASYNC_SYSCALL_RECVALL,
- fd, TEVENT_FD_READ, async_recvall_callback,
- &state);
- if (result == NULL) {
- return NULL;
+ if (state->count == 0) {
+ tevent_req_done(req);
+ return;
}
-
- state->param.param_recvall.fd = fd;
- state->param.param_recvall.buffer = buffer;
- state->param.param_recvall.length = length;
- state->param.param_recvall.flags = flags;
- state->param.param_recvall.received = 0;
-
- return result;
}
-ssize_t recvall_recv(struct async_req *req, int *perr)
+ssize_t writev_recv(struct tevent_req *req, int *perrno)
{
- struct async_syscall_state *state = talloc_get_type_abort(
- req->private_data, struct async_syscall_state);
- int err;
-
- err = async_req_simple_recv_errno(req);
+ struct writev_state *state =
+ tevent_req_data(req, struct writev_state);
+ ssize_t ret;
- if (err != 0) {
- *perr = err;
+ if (tevent_req_is_unix_error(req, perrno)) {
+ tevent_req_received(req);
return -1;
}
-
- return state->result.result_ssize_t;
+ ret = state->total_size;
+ tevent_req_received(req);
+ return ret;
}
-struct async_connect_state {
+struct read_packet_state {
int fd;
- int result;
- int sys_errno;
- long old_sockflags;
+ uint8_t *buf;
+ size_t nread;
+ ssize_t (*more)(uint8_t *buf, size_t buflen, void *private_data);
+ void *private_data;
};
-static void async_connect_connected(struct tevent_context *ev,
- struct tevent_fd *fde, uint16_t flags,
- void *priv);
-
-/**
- * @brief async version of connect(2)
- * @param[in] mem_ctx The memory context to hang the result off
- * @param[in] ev The event context to work from
- * @param[in] fd The socket to recv from
- * @param[in] address Where to connect?
- * @param[in] address_len Length of *address
- * @retval The async request
- *
- * This function sets the socket into non-blocking state to be able to call
- * connect in an async state. This will be reset when the request is finished.
- */
-
-struct async_req *async_connect_send(TALLOC_CTX *mem_ctx,
- struct tevent_context *ev,
- int fd, const struct sockaddr *address,
- socklen_t address_len)
+static void read_packet_handler(struct tevent_context *ev,
+ struct tevent_fd *fde,
+ uint16_t flags, void *private_data);
+
+struct tevent_req *read_packet_send(TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ int fd, size_t initial,
+ ssize_t (*more)(uint8_t *buf,
+ size_t buflen,
+ void *private_data),
+ void *private_data)
{
- struct async_req *result;
- struct async_connect_state *state;
+ struct tevent_req *req;
+ struct read_packet_state *state;
struct tevent_fd *fde;
- if (!async_req_setup(mem_ctx, &result, &state,
- struct async_connect_state)) {
+ req = tevent_req_create(mem_ctx, &state, struct read_packet_state);
+ if (req == NULL) {
return NULL;
}
-
- /**
- * We have to set the socket to nonblocking for async connect(2). Keep
- * the old sockflags around.
- */
-
state->fd = fd;
- state->sys_errno = 0;
+ state->nread = 0;
+ state->more = more;
+ state->private_data = private_data;
- state->old_sockflags = fcntl(fd, F_GETFL, 0);
- if (state->old_sockflags == -1) {
- goto post_errno;
- }
-
- set_blocking(fd, false);
-
- state->result = connect(fd, address, address_len);
- if (state->result == 0) {
- state->sys_errno = 0;
- goto post_status;
- }
-
- /**
- * A number of error messages show that something good is progressing
- * and that we have to wait for readability.
- *
- * If none of them are present, bail out.
- */
-
- if (!(errno == EINPROGRESS || errno == EALREADY ||
-#ifdef EISCONN
- errno == EISCONN ||
-#endif
- errno == EAGAIN || errno == EINTR)) {
- goto post_errno;
+ state->buf = talloc_array(state, uint8_t, initial);
+ if (state->buf == NULL) {
+ goto fail;
}
- fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ | TEVENT_FD_WRITE,
- async_connect_connected, result);
+ fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ, read_packet_handler,
+ req);
if (fde == NULL) {
- state->sys_errno = ENOMEM;
- goto post_status;
- }
- return result;
-
- post_errno:
- state->sys_errno = errno;
- post_status:
- fcntl(fd, F_SETFL, state->old_sockflags);
- if (!async_post_error(result, ev, state->sys_errno)) {
goto fail;
}
- return result;
+ return req;
fail:
- TALLOC_FREE(result);
+ TALLOC_FREE(req);
return NULL;
}
-/**
- * fde event handler for connect(2)
- * @param[in] ev The event context that sent us here
- * @param[in] fde The file descriptor event associated with the connect
- * @param[in] flags Indicate read/writeability of the socket
- * @param[in] priv private data, "struct async_req *" in this case
- */
-
-static void async_connect_connected(struct tevent_context *ev,
- struct tevent_fd *fde, uint16_t flags,
- void *priv)
+static void read_packet_handler(struct tevent_context *ev,
+ struct tevent_fd *fde,
+ uint16_t flags, void *private_data)
{
- struct async_req *req = talloc_get_type_abort(
- priv, struct async_req);
- struct async_connect_state *state = talloc_get_type_abort(
- req->private_data, struct async_connect_state);
+ struct tevent_req *req = talloc_get_type_abort(
+ private_data, struct tevent_req);
+ struct read_packet_state *state =
+ tevent_req_data(req, struct read_packet_state);
+ size_t total = talloc_get_size(state->buf);
+ ssize_t nread, more;
+ uint8_t *tmp;
+
+ nread = recv(state->fd, state->buf+state->nread, total-state->nread,
+ 0);
+ if ((nread == -1) && (errno == ENOTSOCK)) {
+ nread = read(state->fd, state->buf+state->nread,
+ total-state->nread);
+ }
+ if ((nread == -1) && (errno == EINTR)) {
+ /* retry */
+ return;
+ }
+ if (nread == -1) {
+ tevent_req_error(req, errno);
+ return;
+ }
+ if (nread == 0) {
+ tevent_req_error(req, EPIPE);
+ return;
+ }
- TALLOC_FREE(fde);
+ state->nread += nread;
+ if (state->nread < total) {
+ /* Come back later */
+ return;
+ }
/*
- * Stevens, Network Programming says that if there's a
- * successful connect, the socket is only writable. Upon an
- * error, it's both readable and writable.
+ * We got what was initially requested. See if "more" asks for -- more.
*/
- if ((flags & (TEVENT_FD_READ|TEVENT_FD_WRITE))
- == (TEVENT_FD_READ|TEVENT_FD_WRITE)) {
- int sockerr;
- socklen_t err_len = sizeof(sockerr);
-
- if (getsockopt(state->fd, SOL_SOCKET, SO_ERROR,
- (void *)&sockerr, &err_len) == 0) {
- errno = sockerr;
- }
-
- state->sys_errno = errno;
+ if (state->more == NULL) {
+ /* Nobody to ask, this is a async read_data */
+ tevent_req_done(req);
+ return;
+ }
- DEBUG(10, ("connect returned %s\n", strerror(errno)));
+ more = state->more(state->buf, total, state->private_data);
+ if (more == -1) {
+ /* We got an invalid packet, tell the caller */
+ tevent_req_error(req, EIO);
+ return;
+ }
+ if (more == 0) {
+ /* We're done, full packet received */
+ tevent_req_done(req);
+ return;
+ }
- fcntl(state->fd, F_SETFL, state->old_sockflags);
- async_req_error(req, state->sys_errno);
+ if (total + more < total) {
+ tevent_req_error(req, EMSGSIZE);
return;
}
- state->sys_errno = 0;
- async_req_done(req);
+ tmp = talloc_realloc(state, state->buf, uint8_t, total+more);
+ if (tevent_req_nomem(tmp, req)) {
+ return;
+ }
+ state->buf = tmp;
}
-int async_connect_recv(struct async_req *req, int *perrno)
+ssize_t read_packet_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
+ uint8_t **pbuf, int *perrno)
{
- struct async_connect_state *state = talloc_get_type_abort(
- req->private_data, struct async_connect_state);
- int err;
+ struct read_packet_state *state =
+ tevent_req_data(req, struct read_packet_state);
+
+ if (tevent_req_is_unix_error(req, perrno)) {
+ return -1;
+ }
+ *pbuf = talloc_move(mem_ctx, &state->buf);
+ return talloc_get_size(*pbuf);
+}
+
+struct wait_for_read_state {
+ struct tevent_req *req;
+ struct tevent_fd *fde;
+};
- fcntl(state->fd, F_SETFL, state->old_sockflags);
+static void wait_for_read_done(struct tevent_context *ev,
+ struct tevent_fd *fde,
+ uint16_t flags,
+ void *private_data);
+struct tevent_req *wait_for_read_send(TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ int fd)
+{
+ struct tevent_req *req;
+ struct wait_for_read_state *state;
- if (async_req_is_errno(req, &err)) {
- *perrno = err;
- return -1;
+ req = tevent_req_create(mem_ctx, &state, struct wait_for_read_state);
+ if (req == NULL) {
+ return NULL;
+ }
+ state->req = req;
+ state->fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ,
+ wait_for_read_done, state);
+ if (tevent_req_nomem(state->fde, req)) {
+ return tevent_req_post(req, ev);
}
- if (state->sys_errno == 0) {
- return 0;
+ return req;
+}
+
+static void wait_for_read_done(struct tevent_context *ev,
+ struct tevent_fd *fde,
+ uint16_t flags,
+ void *private_data)
+{
+ struct wait_for_read_state *state = talloc_get_type_abort(
+ private_data, struct wait_for_read_state);
+
+ if (flags & TEVENT_FD_READ) {
+ TALLOC_FREE(state->fde);
+ tevent_req_done(state->req);
}
+}
+
+bool wait_for_read_recv(struct tevent_req *req, int *perr)
+{
+ int err;
- *perrno = state->sys_errno;
- return -1;
+ if (tevent_req_is_unix_error(req, &err)) {
+ *perr = err;
+ return false;
+ }
+ return true;
}