async socket syscalls
Copyright (C) Volker Lendecke 2008
- This program is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
+ ** NOTE! The following LGPL license applies to the async_sock
+ ** library. This does NOT imply that all of Samba is released
+ ** under the LGPL
- This program is distributed in the hope that it will be useful,
+ This library is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 3 of the License, or (at your option) any later version.
+
+ This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Library General Public License for more details.
- You should have received a copy of the GNU General Public License
+ You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#include "includes.h"
-#include "lib/talloc/talloc.h"
-#include "lib/tevent/tevent.h"
-#include "lib/async_req/async_req.h"
+#include "replace.h"
+#include "system/network.h"
+#include "system/filesys.h"
+#include <talloc.h>
+#include <tevent.h>
#include "lib/async_req/async_sock.h"
+
+/* Note: lib/util/ is currently GPL */
#include "lib/util/tevent_unix.h"
-#include <fcntl.h>
+#include "lib/util/util.h"
#ifndef TALLOC_FREE
#define TALLOC_FREE(ctx) do { talloc_free(ctx); ctx=NULL; } while(0)
#endif
-/**
- * @brief Map async_req states to unix-style errnos
- * @param[in] req The async req to get the state from
- * @param[out] err Pointer to take the unix-style errno
- *
- * @return true if the async_req is in an error state, false otherwise
- */
-
-bool async_req_is_errno(struct async_req *req, int *err)
-{
- enum async_req_state state;
- uint64_t error;
-
- if (!async_req_is_error(req, &state, &error)) {
- return false;
- }
-
- switch (state) {
- case ASYNC_REQ_USER_ERROR:
- *err = (int)error;
- break;
- case ASYNC_REQ_TIMED_OUT:
-#ifdef ETIMEDOUT
- *err = ETIMEDOUT;
-#else
- *err = EAGAIN;
-#endif
- break;
- case ASYNC_REQ_NO_MEMORY:
- *err = ENOMEM;
- break;
- default:
- *err = EIO;
- break;
- }
- return true;
-}
-
-int async_req_simple_recv_errno(struct async_req *req)
-{
- int err;
-
- if (async_req_is_errno(req, &err)) {
- return err;
- }
-
- return 0;
-}
-
-struct async_send_state {
+struct sendto_state {
int fd;
const void *buf;
size_t len;
int flags;
+ const struct sockaddr_storage *addr;
+ socklen_t addr_len;
ssize_t sent;
};
-static void async_send_handler(struct tevent_context *ev,
+static void sendto_handler(struct tevent_context *ev,
struct tevent_fd *fde,
uint16_t flags, void *private_data);
-struct tevent_req *async_send_send(TALLOC_CTX *mem_ctx,
- struct tevent_context *ev,
- int fd, const void *buf, size_t len,
- int flags)
+struct tevent_req *sendto_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
+ int fd, const void *buf, size_t len, int flags,
+ const struct sockaddr_storage *addr)
{
struct tevent_req *result;
- struct async_send_state *state;
+ struct sendto_state *state;
struct tevent_fd *fde;
- result = tevent_req_create(mem_ctx, &state, struct async_send_state);
+ result = tevent_req_create(mem_ctx, &state, struct sendto_state);
if (result == NULL) {
return result;
}
state->buf = buf;
state->len = len;
state->flags = flags;
+ state->addr = addr;
+
+ switch (addr->ss_family) {
+ case AF_INET:
+ state->addr_len = sizeof(struct sockaddr_in);
+ break;
+#if defined(HAVE_IPV6)
+ case AF_INET6:
+ state->addr_len = sizeof(struct sockaddr_in6);
+ break;
+#endif
+ case AF_UNIX:
+ state->addr_len = sizeof(struct sockaddr_un);
+ break;
+ default:
+ state->addr_len = sizeof(struct sockaddr_storage);
+ break;
+ }
- fde = tevent_add_fd(ev, state, fd, TEVENT_FD_WRITE, async_send_handler,
+ fde = tevent_add_fd(ev, state, fd, TEVENT_FD_WRITE, sendto_handler,
result);
if (fde == NULL) {
TALLOC_FREE(result);
return result;
}
-static void async_send_handler(struct tevent_context *ev,
+static void sendto_handler(struct tevent_context *ev,
struct tevent_fd *fde,
uint16_t flags, void *private_data)
{
struct tevent_req *req = talloc_get_type_abort(
private_data, struct tevent_req);
- struct async_send_state *state = talloc_get_type_abort(
- req->private_state, struct async_send_state);
-
- state->sent = send(state->fd, state->buf, state->len, state->flags);
+ struct sendto_state *state =
+ tevent_req_data(req, struct sendto_state);
+
+ state->sent = sendto(state->fd, state->buf, state->len, state->flags,
+ (const struct sockaddr *)state->addr,
+ state->addr_len);
+ if ((state->sent == -1) && (errno == EINTR)) {
+ /* retry */
+ return;
+ }
if (state->sent == -1) {
tevent_req_error(req, errno);
return;
tevent_req_done(req);
}
-ssize_t async_send_recv(struct tevent_req *req, int *perrno)
+ssize_t sendto_recv(struct tevent_req *req, int *perrno)
{
- struct async_send_state *state = talloc_get_type_abort(
- req->private_state, struct async_send_state);
+ struct sendto_state *state =
+ tevent_req_data(req, struct sendto_state);
if (tevent_req_is_unix_error(req, perrno)) {
return -1;
return state->sent;
}
-struct async_recv_state {
+struct recvfrom_state {
int fd;
void *buf;
size_t len;
int flags;
+ struct sockaddr_storage *addr;
+ socklen_t *addr_len;
ssize_t received;
};
-static void async_recv_handler(struct tevent_context *ev,
+static void recvfrom_handler(struct tevent_context *ev,
struct tevent_fd *fde,
uint16_t flags, void *private_data);
-struct tevent_req *async_recv_send(TALLOC_CTX *mem_ctx,
- struct tevent_context *ev,
- int fd, void *buf, size_t len, int flags)
+struct tevent_req *recvfrom_send(TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ int fd, void *buf, size_t len, int flags,
+ struct sockaddr_storage *addr,
+ socklen_t *addr_len)
{
struct tevent_req *result;
- struct async_recv_state *state;
+ struct recvfrom_state *state;
struct tevent_fd *fde;
- result = tevent_req_create(mem_ctx, &state, struct async_recv_state);
+ result = tevent_req_create(mem_ctx, &state, struct recvfrom_state);
if (result == NULL) {
return result;
}
state->buf = buf;
state->len = len;
state->flags = flags;
+ state->addr = addr;
+ state->addr_len = addr_len;
- fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ, async_recv_handler,
+ fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ, recvfrom_handler,
result);
if (fde == NULL) {
TALLOC_FREE(result);
return result;
}
-static void async_recv_handler(struct tevent_context *ev,
+static void recvfrom_handler(struct tevent_context *ev,
struct tevent_fd *fde,
uint16_t flags, void *private_data)
{
struct tevent_req *req = talloc_get_type_abort(
private_data, struct tevent_req);
- struct async_recv_state *state = talloc_get_type_abort(
- req->private_state, struct async_recv_state);
-
- state->received = recv(state->fd, state->buf, state->len,
- state->flags);
+ struct recvfrom_state *state =
+ tevent_req_data(req, struct recvfrom_state);
+
+ state->received = recvfrom(state->fd, state->buf, state->len,
+ state->flags, (struct sockaddr *)state->addr,
+ state->addr_len);
+ if ((state->received == -1) && (errno == EINTR)) {
+ /* retry */
+ return;
+ }
+ if (state->received == 0) {
+ tevent_req_error(req, EPIPE);
+ return;
+ }
if (state->received == -1) {
tevent_req_error(req, errno);
return;
tevent_req_done(req);
}
-ssize_t async_recv_recv(struct tevent_req *req, int *perrno)
+ssize_t recvfrom_recv(struct tevent_req *req, int *perrno)
{
- struct async_recv_state *state = talloc_get_type_abort(
- req->private_state, struct async_recv_state);
+ struct recvfrom_state *state =
+ tevent_req_data(req, struct recvfrom_state);
if (tevent_req_is_unix_error(req, perrno)) {
return -1;
int result;
int sys_errno;
long old_sockflags;
+ socklen_t address_len;
+ struct sockaddr_storage address;
};
static void async_connect_connected(struct tevent_context *ev,
goto post_errno;
}
+ state->address_len = address_len;
+ if (address_len > sizeof(state->address)) {
+ errno = EINVAL;
+ goto post_errno;
+ }
+ memcpy(&state->address, address, address_len);
+
set_blocking(fd, false);
state->result = connect(fd, address, address_len);
{
struct tevent_req *req = talloc_get_type_abort(
priv, struct tevent_req);
- struct async_connect_state *state = talloc_get_type_abort(
- req->private_state, struct async_connect_state);
-
- TALLOC_FREE(fde);
+ struct async_connect_state *state =
+ tevent_req_data(req, struct async_connect_state);
/*
* Stevens, Network Programming says that if there's a
*/
if ((flags & (TEVENT_FD_READ|TEVENT_FD_WRITE))
== (TEVENT_FD_READ|TEVENT_FD_WRITE)) {
- int sockerr;
- socklen_t err_len = sizeof(sockerr);
-
- if (getsockopt(state->fd, SOL_SOCKET, SO_ERROR,
- (void *)&sockerr, &err_len) == 0) {
- errno = sockerr;
+ int ret;
+
+ ret = connect(state->fd,
+ (struct sockaddr *)(void *)&state->address,
+ state->address_len);
+ if (ret == 0) {
+ TALLOC_FREE(fde);
+ tevent_req_done(req);
+ return;
}
- state->sys_errno = errno;
-
- DEBUG(10, ("connect returned %s\n", strerror(errno)));
-
- fcntl(state->fd, F_SETFL, state->old_sockflags);
- tevent_req_error(req, state->sys_errno);
+ if (errno == EINPROGRESS) {
+ /* Try again later, leave the fde around */
+ return;
+ }
+ TALLOC_FREE(fde);
+ tevent_req_error(req, errno);
return;
}
int async_connect_recv(struct tevent_req *req, int *perrno)
{
- struct async_connect_state *state = talloc_get_type_abort(
- req->private_state, struct async_connect_state);
+ struct async_connect_state *state =
+ tevent_req_data(req, struct async_connect_state);
int err;
fcntl(state->fd, F_SETFL, state->old_sockflags);
struct iovec *iov;
int count;
size_t total_size;
+ uint16_t flags;
};
+static void writev_trigger(struct tevent_req *req, void *private_data);
static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde,
uint16_t flags, void *private_data);
struct tevent_req *writev_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
- int fd, struct iovec *iov, int count)
+ struct tevent_queue *queue, int fd,
+ bool err_on_readability,
+ struct iovec *iov, int count)
{
- struct tevent_req *result;
+ struct tevent_req *req;
struct writev_state *state;
- struct tevent_fd *fde;
- result = tevent_req_create(mem_ctx, &state, struct writev_state);
- if (result == NULL) {
+ req = tevent_req_create(mem_ctx, &state, struct writev_state);
+ if (req == NULL) {
return NULL;
}
state->ev = ev;
if (state->iov == NULL) {
goto fail;
}
+ state->flags = TEVENT_FD_WRITE;
+ if (err_on_readability) {
+ state->flags |= TEVENT_FD_READ;
+ }
- fde = tevent_add_fd(ev, state, fd, TEVENT_FD_WRITE, writev_handler,
- result);
- if (fde == NULL) {
- goto fail;
+ if (queue == NULL) {
+ struct tevent_fd *fde;
+ fde = tevent_add_fd(state->ev, state, state->fd,
+ state->flags, writev_handler, req);
+ if (tevent_req_nomem(fde, req)) {
+ return tevent_req_post(req, ev);
+ }
+ return req;
}
- return result;
+ if (!tevent_queue_add(queue, ev, req, writev_trigger, NULL)) {
+ goto fail;
+ }
+ return req;
fail:
- TALLOC_FREE(result);
+ TALLOC_FREE(req);
return NULL;
}
+static void writev_trigger(struct tevent_req *req, void *private_data)
+{
+ struct writev_state *state = tevent_req_data(req, struct writev_state);
+ struct tevent_fd *fde;
+
+ fde = tevent_add_fd(state->ev, state, state->fd, state->flags,
+ writev_handler, req);
+ if (fde == NULL) {
+ tevent_req_error(req, ENOMEM);
+ }
+}
+
static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde,
uint16_t flags, void *private_data)
{
struct tevent_req *req = talloc_get_type_abort(
private_data, struct tevent_req);
- struct writev_state *state = talloc_get_type_abort(
- req->private_state, struct writev_state);
+ struct writev_state *state =
+ tevent_req_data(req, struct writev_state);
size_t to_write, written;
int i;
to_write = 0;
+ if ((state->flags & TEVENT_FD_READ) && (flags & TEVENT_FD_READ)) {
+ tevent_req_error(req, EPIPE);
+ return;
+ }
+
for (i=0; i<state->count; i++) {
to_write += state->iov[i].iov_len;
}
- written = sys_writev(state->fd, state->iov, state->count);
+ written = writev(state->fd, state->iov, state->count);
+ if ((written == -1) && (errno == EINTR)) {
+ /* retry */
+ return;
+ }
if (written == -1) {
tevent_req_error(req, errno);
return;
state->iov[0].iov_len -= written;
break;
}
- written = state->iov[0].iov_len;
+ written -= state->iov[0].iov_len;
state->iov += 1;
state->count -= 1;
}
ssize_t writev_recv(struct tevent_req *req, int *perrno)
{
- struct writev_state *state = talloc_get_type_abort(
- req->private_state, struct writev_state);
+ struct writev_state *state =
+ tevent_req_data(req, struct writev_state);
if (tevent_req_is_unix_error(req, perrno)) {
return -1;
{
struct tevent_req *req = talloc_get_type_abort(
private_data, struct tevent_req);
- struct read_packet_state *state = talloc_get_type_abort(
- req->private_state, struct read_packet_state);
+ struct read_packet_state *state =
+ tevent_req_data(req, struct read_packet_state);
size_t total = talloc_get_size(state->buf);
ssize_t nread, more;
uint8_t *tmp;
- nread = read(state->fd, state->buf+state->nread, total-state->nread);
+ nread = recv(state->fd, state->buf+state->nread, total-state->nread,
+ 0);
+ if ((nread == -1) && (errno == EINTR)) {
+ /* retry */
+ return;
+ }
if (nread == -1) {
tevent_req_error(req, errno);
return;
return;
}
- tmp = TALLOC_REALLOC_ARRAY(state, state->buf, uint8_t, total+more);
+ tmp = talloc_realloc(state, state->buf, uint8_t, total+more);
if (tevent_req_nomem(tmp, req)) {
return;
}
ssize_t read_packet_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
uint8_t **pbuf, int *perrno)
{
- struct read_packet_state *state = talloc_get_type_abort(
- req->private_state, struct read_packet_state);
+ struct read_packet_state *state =
+ tevent_req_data(req, struct read_packet_state);
if (tevent_req_is_unix_error(req, perrno)) {
return -1;