tsocket/bsd: more correctly check if the cached tevent_fd is still valid
[ira/wip.git] / lib / async_req / async_sock.c
index 1f48697f26f825a729c618165b36f0846de4fd9e..f5a0dfde708bd8b14065dcdbc09ad9e642a1fd3a 100644 (file)
@@ -3,81 +3,39 @@
    async socket syscalls
    Copyright (C) Volker Lendecke 2008
 
-   This program is free software; you can redistribute it and/or modify
-   it under the terms of the GNU General Public License as published by
-   the Free Software Foundation; either version 3 of the License, or
-   (at your option) any later version.
+     ** NOTE! The following LGPL license applies to the async_sock
+     ** library. This does NOT imply that all of Samba is released
+     ** under the LGPL
 
-   This program is distributed in the hope that it will be useful,
+   This library is free software; you can redistribute it and/or
+   modify it under the terms of the GNU Lesser General Public
+   License as published by the Free Software Foundation; either
+   version 3 of the License, or (at your option) any later version.
+
+   This library is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
-   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-   GNU General Public License for more details.
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+   Library General Public License for more details.
 
-   You should have received a copy of the GNU General Public License
+   You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
 
-#include "includes.h"
-#include "lib/talloc/talloc.h"
-#include "lib/tevent/tevent.h"
-#include "lib/async_req/async_req.h"
+#include "replace.h"
+#include "system/network.h"
+#include "system/filesys.h"
+#include <talloc.h>
+#include <tevent.h>
 #include "lib/async_req/async_sock.h"
+
+/* Note: lib/util/ is currently GPL */
 #include "lib/util/tevent_unix.h"
-#include <fcntl.h>
+#include "lib/util/util.h"
 
 #ifndef TALLOC_FREE
 #define TALLOC_FREE(ctx) do { talloc_free(ctx); ctx=NULL; } while(0)
 #endif
 
-/**
- * @brief Map async_req states to unix-style errnos
- * @param[in]  req     The async req to get the state from
- * @param[out] err     Pointer to take the unix-style errno
- *
- * @return true if the async_req is in an error state, false otherwise
- */
-
-bool async_req_is_errno(struct async_req *req, int *err)
-{
-       enum async_req_state state;
-       uint64_t error;
-
-       if (!async_req_is_error(req, &state, &error)) {
-               return false;
-       }
-
-       switch (state) {
-       case ASYNC_REQ_USER_ERROR:
-               *err = (int)error;
-               break;
-       case ASYNC_REQ_TIMED_OUT:
-#ifdef ETIMEDOUT
-               *err = ETIMEDOUT;
-#else
-               *err = EAGAIN;
-#endif
-               break;
-       case ASYNC_REQ_NO_MEMORY:
-               *err = ENOMEM;
-               break;
-       default:
-               *err = EIO;
-               break;
-       }
-       return true;
-}
-
-int async_req_simple_recv_errno(struct async_req *req)
-{
-       int err;
-
-       if (async_req_is_errno(req, &err)) {
-               return err;
-       }
-
-       return 0;
-}
-
 struct async_send_state {
        int fd;
        const void *buf;
@@ -123,10 +81,14 @@ static void async_send_handler(struct tevent_context *ev,
 {
        struct tevent_req *req = talloc_get_type_abort(
                private_data, struct tevent_req);
-       struct async_send_state *state = talloc_get_type_abort(
-               req->private_state, struct async_send_state);
+       struct async_send_state *state =
+               tevent_req_data(req, struct async_send_state);
 
        state->sent = send(state->fd, state->buf, state->len, state->flags);
+       if ((state->sent == -1) && (errno == EINTR)) {
+               /* retry */
+               return;
+       }
        if (state->sent == -1) {
                tevent_req_error(req, errno);
                return;
@@ -136,8 +98,8 @@ static void async_send_handler(struct tevent_context *ev,
 
 ssize_t async_send_recv(struct tevent_req *req, int *perrno)
 {
-       struct async_send_state *state = talloc_get_type_abort(
-               req->private_state, struct async_send_state);
+       struct async_send_state *state =
+               tevent_req_data(req, struct async_send_state);
 
        if (tevent_req_is_unix_error(req, perrno)) {
                return -1;
@@ -189,11 +151,15 @@ static void async_recv_handler(struct tevent_context *ev,
 {
        struct tevent_req *req = talloc_get_type_abort(
                private_data, struct tevent_req);
-       struct async_recv_state *state = talloc_get_type_abort(
-               req->private_state, struct async_recv_state);
+       struct async_recv_state *state =
+               tevent_req_data(req, struct async_recv_state);
 
        state->received = recv(state->fd, state->buf, state->len,
                               state->flags);
+       if ((state->received == -1) && (errno == EINTR)) {
+               /* retry */
+               return;
+       }
        if (state->received == -1) {
                tevent_req_error(req, errno);
                return;
@@ -203,8 +169,8 @@ static void async_recv_handler(struct tevent_context *ev,
 
 ssize_t async_recv_recv(struct tevent_req *req, int *perrno)
 {
-       struct async_recv_state *state = talloc_get_type_abort(
-               req->private_state, struct async_recv_state);
+       struct async_recv_state *state =
+               tevent_req_data(req, struct async_recv_state);
 
        if (tevent_req_is_unix_error(req, perrno)) {
                return -1;
@@ -217,6 +183,8 @@ struct async_connect_state {
        int result;
        int sys_errno;
        long old_sockflags;
+       socklen_t address_len;
+       struct sockaddr_storage address;
 };
 
 static void async_connect_connected(struct tevent_context *ev,
@@ -264,6 +232,13 @@ struct tevent_req *async_connect_send(TALLOC_CTX *mem_ctx,
                goto post_errno;
        }
 
+       state->address_len = address_len;
+       if (address_len > sizeof(state->address)) {
+               errno = EINVAL;
+               goto post_errno;
+       }
+       memcpy(&state->address, address, address_len);
+
        set_blocking(fd, false);
 
        state->result = connect(fd, address, address_len);
@@ -317,10 +292,8 @@ static void async_connect_connected(struct tevent_context *ev,
 {
        struct tevent_req *req = talloc_get_type_abort(
                priv, struct tevent_req);
-       struct async_connect_state *state = talloc_get_type_abort(
-               req->private_state, struct async_connect_state);
-
-       TALLOC_FREE(fde);
+       struct async_connect_state *state =
+               tevent_req_data(req, struct async_connect_state);
 
        /*
         * Stevens, Network Programming says that if there's a
@@ -329,20 +302,23 @@ static void async_connect_connected(struct tevent_context *ev,
         */
        if ((flags & (TEVENT_FD_READ|TEVENT_FD_WRITE))
            == (TEVENT_FD_READ|TEVENT_FD_WRITE)) {
-               int sockerr;
-               socklen_t err_len = sizeof(sockerr);
-
-               if (getsockopt(state->fd, SOL_SOCKET, SO_ERROR,
-                              (void *)&sockerr, &err_len) == 0) {
-                       errno = sockerr;
+               int ret;
+
+               ret = connect(state->fd,
+                             (struct sockaddr *)(void *)&state->address,
+                             state->address_len);
+               if (ret == 0) {
+                       TALLOC_FREE(fde);
+                       tevent_req_done(req);
+                       return;
                }
 
-               state->sys_errno = errno;
-
-               DEBUG(10, ("connect returned %s\n", strerror(errno)));
-
-               fcntl(state->fd, F_SETFL, state->old_sockflags);
-               tevent_req_error(req, state->sys_errno);
+               if (errno == EINPROGRESS) {
+                       /* Try again later, leave the fde around */
+                       return;
+               }
+               TALLOC_FREE(fde);
+               tevent_req_error(req, errno);
                return;
        }
 
@@ -352,8 +328,8 @@ static void async_connect_connected(struct tevent_context *ev,
 
 int async_connect_recv(struct tevent_req *req, int *perrno)
 {
-       struct async_connect_state *state = talloc_get_type_abort(
-               req->private_state, struct async_connect_state);
+       struct async_connect_state *state =
+               tevent_req_data(req, struct async_connect_state);
        int err;
 
        fcntl(state->fd, F_SETFL, state->old_sockflags);
@@ -377,20 +353,23 @@ struct writev_state {
        struct iovec *iov;
        int count;
        size_t total_size;
+       uint16_t flags;
 };
 
+static void writev_trigger(struct tevent_req *req, void *private_data);
 static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde,
                           uint16_t flags, void *private_data);
 
 struct tevent_req *writev_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
-                              int fd, struct iovec *iov, int count)
+                              struct tevent_queue *queue, int fd,
+                              bool err_on_readability,
+                              struct iovec *iov, int count)
 {
-       struct tevent_req *result;
+       struct tevent_req *req;
        struct writev_state *state;
-       struct tevent_fd *fde;
 
-       result = tevent_req_create(mem_ctx, &state, struct writev_state);
-       if (result == NULL) {
+       req = tevent_req_create(mem_ctx, &state, struct writev_state);
+       if (req == NULL) {
                return NULL;
        }
        state->ev = ev;
@@ -402,36 +381,68 @@ struct tevent_req *writev_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
        if (state->iov == NULL) {
                goto fail;
        }
+       state->flags = TEVENT_FD_WRITE;
+       if (err_on_readability) {
+               state->flags |= TEVENT_FD_READ;
+       }
 
-       fde = tevent_add_fd(ev, state, fd, TEVENT_FD_WRITE, writev_handler,
-                           result);
-       if (fde == NULL) {
-               goto fail;
+       if (queue == NULL) {
+               struct tevent_fd *fde;
+               fde = tevent_add_fd(state->ev, state, state->fd,
+                                   state->flags, writev_handler, req);
+               if (tevent_req_nomem(fde, req)) {
+                       return tevent_req_post(req, ev);
+               }
+               return req;
        }
-       return result;
 
+       if (!tevent_queue_add(queue, ev, req, writev_trigger, NULL)) {
+               goto fail;
+       }
+       return req;
  fail:
-       TALLOC_FREE(result);
+       TALLOC_FREE(req);
        return NULL;
 }
 
+static void writev_trigger(struct tevent_req *req, void *private_data)
+{
+       struct writev_state *state = tevent_req_data(req, struct writev_state);
+       struct tevent_fd *fde;
+
+       fde = tevent_add_fd(state->ev, state, state->fd, state->flags,
+                           writev_handler, req);
+       if (fde == NULL) {
+               tevent_req_error(req, ENOMEM);
+       }
+}
+
 static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde,
                           uint16_t flags, void *private_data)
 {
        struct tevent_req *req = talloc_get_type_abort(
                private_data, struct tevent_req);
-       struct writev_state *state = talloc_get_type_abort(
-               req->private_state, struct writev_state);
+       struct writev_state *state =
+               tevent_req_data(req, struct writev_state);
        size_t to_write, written;
        int i;
 
        to_write = 0;
 
+       if ((state->flags & TEVENT_FD_READ) && (flags & TEVENT_FD_READ)) {
+               tevent_req_error(req, EPIPE);
+               return;
+       }
+
        for (i=0; i<state->count; i++) {
                to_write += state->iov[i].iov_len;
        }
 
-       written = sys_writev(state->fd, state->iov, state->count);
+       written = writev(state->fd, state->iov, state->count);
+       if ((written == -1) && (errno == EINTR)) {
+               /* retry */
+               return;
+       }
        if (written == -1) {
                tevent_req_error(req, errno);
                return;
@@ -459,7 +470,7 @@ static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde,
                        state->iov[0].iov_len -= written;
                        break;
                }
-               written = state->iov[0].iov_len;
+               written -= state->iov[0].iov_len;
                state->iov += 1;
                state->count -= 1;
        }
@@ -467,8 +478,8 @@ static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde,
 
 ssize_t writev_recv(struct tevent_req *req, int *perrno)
 {
-       struct writev_state *state = talloc_get_type_abort(
-               req->private_state, struct writev_state);
+       struct writev_state *state =
+               tevent_req_data(req, struct writev_state);
 
        if (tevent_req_is_unix_error(req, perrno)) {
                return -1;
@@ -531,13 +542,18 @@ static void read_packet_handler(struct tevent_context *ev,
 {
        struct tevent_req *req = talloc_get_type_abort(
                private_data, struct tevent_req);
-       struct read_packet_state *state = talloc_get_type_abort(
-               req->private_state, struct read_packet_state);
+       struct read_packet_state *state =
+               tevent_req_data(req, struct read_packet_state);
        size_t total = talloc_get_size(state->buf);
        ssize_t nread, more;
        uint8_t *tmp;
 
-       nread = read(state->fd, state->buf+state->nread, total-state->nread);
+       nread = recv(state->fd, state->buf+state->nread, total-state->nread,
+                    0);
+       if ((nread == -1) && (errno == EINTR)) {
+               /* retry */
+               return;
+       }
        if (nread == -1) {
                tevent_req_error(req, errno);
                return;
@@ -574,7 +590,7 @@ static void read_packet_handler(struct tevent_context *ev,
                return;
        }
 
-       tmp = TALLOC_REALLOC_ARRAY(state, state->buf, uint8_t, total+more);
+       tmp = talloc_realloc(state, state->buf, uint8_t, total+more);
        if (tevent_req_nomem(tmp, req)) {
                return;
        }
@@ -584,8 +600,8 @@ static void read_packet_handler(struct tevent_context *ev,
 ssize_t read_packet_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
                         uint8_t **pbuf, int *perrno)
 {
-       struct read_packet_state *state = talloc_get_type_abort(
-               req->private_state, struct read_packet_state);
+       struct read_packet_state *state =
+               tevent_req_data(req, struct read_packet_state);
 
        if (tevent_req_is_unix_error(req, perrno)) {
                return -1;