*/
#include "replace.h"
-#include "system/network.h"
#include "system/filesys.h"
#include "tsocket.h"
#include "tsocket_internal.h"
-int tsocket_error_from_errno(int ret,
- int sys_errno,
- bool *retry)
+struct tdgram_sendto_queue_state {
+ /* this structs are owned by the caller */
+ struct {
+ struct tevent_context *ev;
+ struct tdgram_context *dgram;
+ const uint8_t *buf;
+ size_t len;
+ const struct tsocket_address *dst;
+ } caller;
+ ssize_t ret;
+};
+
+static void tdgram_sendto_queue_trigger(struct tevent_req *req,
+ void *private_data);
+static void tdgram_sendto_queue_done(struct tevent_req *subreq);
+
+/**
+ * @brief Queue a dgram blob for sending through the socket
+ * @param[in] mem_ctx The memory context for the result
+ * @param[in] ev The event context the operation should work on
+ * @param[in] dgram The tdgram_context to send the message buffer
+ * @param[in] queue The existing dgram queue
+ * @param[in] buf The message buffer
+ * @param[in] len The message length
+ * @param[in] dst The destination socket address
+ * @retval The async request handle
+ *
+ * This function queues a blob for sending to destination through an existing
+ * dgram socket. The async callback is triggered when the whole blob is
+ * delivered to the underlying system socket.
+ *
+ * The caller needs to make sure that all non-scalar input parameters hang
+ * arround for the whole lifetime of the request.
+ */
+struct tevent_req *tdgram_sendto_queue_send(TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ struct tdgram_context *dgram,
+ struct tevent_queue *queue,
+ const uint8_t *buf,
+ size_t len,
+ struct tsocket_address *dst)
{
- *retry = false;
+ struct tevent_req *req;
+ struct tdgram_sendto_queue_state *state;
+ bool ok;
- if (ret >= 0) {
- return 0;
+ req = tevent_req_create(mem_ctx, &state,
+ struct tdgram_sendto_queue_state);
+ if (!req) {
+ return NULL;
}
- if (ret != -1) {
- return EIO;
+ state->caller.ev = ev;
+ state->caller.dgram = dgram;
+ state->caller.buf = buf;
+ state->caller.len = len;
+ state->caller.dst = dst;
+ state->ret = -1;
+
+ ok = tevent_queue_add(queue,
+ ev,
+ req,
+ tdgram_sendto_queue_trigger,
+ NULL);
+ if (!ok) {
+ tevent_req_nomem(NULL, req);
+ goto post;
}
- if (sys_errno == 0) {
- return EIO;
+ return req;
+
+ post:
+ tevent_req_post(req, ev);
+ return req;
+}
+
+static void tdgram_sendto_queue_trigger(struct tevent_req *req,
+ void *private_data)
+{
+ struct tdgram_sendto_queue_state *state = tevent_req_data(req,
+ struct tdgram_sendto_queue_state);
+ struct tevent_req *subreq;
+
+ subreq = tdgram_sendto_send(state,
+ state->caller.ev,
+ state->caller.dgram,
+ state->caller.buf,
+ state->caller.len,
+ state->caller.dst);
+ if (tevent_req_nomem(subreq, req)) {
+ return;
}
+ tevent_req_set_callback(subreq, tdgram_sendto_queue_done, req);
+}
- if (sys_errno == EINTR) {
- *retry = true;
- return sys_errno;
+static void tdgram_sendto_queue_done(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(subreq,
+ struct tevent_req);
+ struct tdgram_sendto_queue_state *state = tevent_req_data(req,
+ struct tdgram_sendto_queue_state);
+ ssize_t ret;
+ int sys_errno;
+
+ ret = tdgram_sendto_recv(subreq, &sys_errno);
+ talloc_free(subreq);
+ if (ret == -1) {
+ tevent_req_error(req, sys_errno);
+ return;
}
+ state->ret = ret;
- if (sys_errno == EINPROGRESS) {
- *retry = true;
- return sys_errno;
+ tevent_req_done(req);
+}
+
+ssize_t tdgram_sendto_queue_recv(struct tevent_req *req, int *perrno)
+{
+ struct tdgram_sendto_queue_state *state = tevent_req_data(req,
+ struct tdgram_sendto_queue_state);
+ ssize_t ret;
+
+ ret = tsocket_simple_int_recv(req, perrno);
+ if (ret == 0) {
+ ret = state->ret;
}
- if (sys_errno == EAGAIN) {
- *retry = true;
- return sys_errno;
+ tevent_req_received(req);
+ return ret;
+}
+
+struct tstream_readv_pdu_state {
+ /* this structs are owned by the caller */
+ struct {
+ struct tevent_context *ev;
+ struct tstream_context *stream;
+ tstream_readv_pdu_next_vector_t next_vector_fn;
+ void *next_vector_private;
+ } caller;
+
+ /*
+ * Each call to the callback resets iov and count
+ * the callback allocated the iov as child of our state,
+ * that means we are allowed to modify and free it.
+ *
+ * we should call the callback every time we filled the given
+ * vector and ask for a new vector. We return if the callback
+ * ask for 0 bytes.
+ */
+ struct iovec *vector;
+ size_t count;
+
+ /*
+ * the total number of bytes we read,
+ * the return value of the _recv function
+ */
+ int total_read;
+};
+
+static void tstream_readv_pdu_ask_for_next_vector(struct tevent_req *req);
+static void tstream_readv_pdu_readv_done(struct tevent_req *subreq);
+
+struct tevent_req *tstream_readv_pdu_send(TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ struct tstream_context *stream,
+ tstream_readv_pdu_next_vector_t next_vector_fn,
+ void *next_vector_private)
+{
+ struct tevent_req *req;
+ struct tstream_readv_pdu_state *state;
+
+ req = tevent_req_create(mem_ctx, &state,
+ struct tstream_readv_pdu_state);
+ if (!req) {
+ return NULL;
}
-#ifdef EWOULDBLOCK
- if (sys_errno == EWOULDBLOCK) {
- *retry = true;
- return sys_errno;
+ state->caller.ev = ev;
+ state->caller.stream = stream;
+ state->caller.next_vector_fn = next_vector_fn;
+ state->caller.next_vector_private = next_vector_private;
+
+ state->vector = NULL;
+ state->count = 0;
+ state->total_read = 0;
+
+ tstream_readv_pdu_ask_for_next_vector(req);
+ if (!tevent_req_is_in_progress(req)) {
+ goto post;
}
-#endif
- return sys_errno;
+ return req;
+
+ post:
+ return tevent_req_post(req, ev);
}
-int tsocket_simple_int_recv(struct tevent_req *req, int *perrno)
+static void tstream_readv_pdu_ask_for_next_vector(struct tevent_req *req)
{
- enum tevent_req_state state;
- uint64_t error;
+ struct tstream_readv_pdu_state *state = tevent_req_data(req,
+ struct tstream_readv_pdu_state);
+ int ret;
+ size_t to_read = 0;
+ size_t i;
+ struct tevent_req *subreq;
+
+ TALLOC_FREE(state->vector);
+ state->count = 0;
+
+ ret = state->caller.next_vector_fn(state->caller.stream,
+ state->caller.next_vector_private,
+ state, &state->vector, &state->count);
+ if (ret == -1) {
+ tevent_req_error(req, errno);
+ return;
+ }
- if (!tevent_req_is_error(req, &state, &error)) {
- return 0;
+ if (state->count == 0) {
+ tevent_req_done(req);
+ return;
}
- switch (state) {
- case TEVENT_REQ_NO_MEMORY:
- *perrno = ENOMEM;
- return -1;
- case TEVENT_REQ_TIMED_OUT:
- *perrno = ETIMEDOUT;
- return -1;
- case TEVENT_REQ_USER_ERROR:
- *perrno = (int)error;
- return -1;
- default:
- *perrno = EIO;
- return -1;
+ for (i=0; i < state->count; i++) {
+ size_t tmp = to_read;
+ tmp += state->vector[i].iov_len;
+
+ if (tmp < to_read) {
+ tevent_req_error(req, EMSGSIZE);
+ return;
+ }
+
+ to_read = tmp;
}
- *perrno = EIO;
- return -1;
+ /*
+ * this is invalid the next vector function should have
+ * reported count == 0.
+ */
+ if (to_read == 0) {
+ tevent_req_error(req, EINVAL);
+ return;
+ }
+
+ if (state->total_read + to_read < state->total_read) {
+ tevent_req_error(req, EMSGSIZE);
+ return;
+ }
+
+ subreq = tstream_readv_send(state,
+ state->caller.ev,
+ state->caller.stream,
+ state->vector,
+ state->count);
+ if (tevent_req_nomem(subreq, req)) {
+ return;
+ }
+ tevent_req_set_callback(subreq, tstream_readv_pdu_readv_done, req);
}
-int tsocket_common_prepare_fd(int fd, bool high_fd)
+static void tstream_readv_pdu_readv_done(struct tevent_req *subreq)
{
- int i;
- int sys_errno = 0;
- int fds[3];
- int num_fds = 0;
+ struct tevent_req *req = tevent_req_callback_data(subreq,
+ struct tevent_req);
+ struct tstream_readv_pdu_state *state = tevent_req_data(req,
+ struct tstream_readv_pdu_state);
+ int ret;
+ int sys_errno;
+
+ ret = tstream_readv_recv(subreq, &sys_errno);
+ if (ret == -1) {
+ tevent_req_error(req, sys_errno);
+ return;
+ }
+
+ state->total_read += ret;
- int result, flags;
+ /* ask the callback for a new vector we should fill */
+ tstream_readv_pdu_ask_for_next_vector(req);
+}
- if (fd == -1) {
- return -1;
+int tstream_readv_pdu_recv(struct tevent_req *req, int *perrno)
+{
+ struct tstream_readv_pdu_state *state = tevent_req_data(req,
+ struct tstream_readv_pdu_state);
+ int ret;
+
+ ret = tsocket_simple_int_recv(req, perrno);
+ if (ret == 0) {
+ ret = state->total_read;
}
- /* first make a fd >= 3 */
- if (high_fd) {
- while (fd < 3) {
- fds[num_fds++] = fd;
- fd = dup(fd);
- if (fd == -1) {
- sys_errno = errno;
- break;
- }
- }
- for (i=0; i<num_fds; i++) {
- close(fds[i]);
- }
- if (fd == -1) {
- errno = sys_errno;
- return fd;
- }
+ tevent_req_received(req);
+ return ret;
+}
+
+struct tstream_readv_pdu_queue_state {
+ /* this structs are owned by the caller */
+ struct {
+ struct tevent_context *ev;
+ struct tstream_context *stream;
+ tstream_readv_pdu_next_vector_t next_vector_fn;
+ void *next_vector_private;
+ } caller;
+ int ret;
+};
+
+static void tstream_readv_pdu_queue_trigger(struct tevent_req *req,
+ void *private_data);
+static void tstream_readv_pdu_queue_done(struct tevent_req *subreq);
+
+/**
+ * @brief Queue a dgram blob for sending through the socket
+ * @param[in] mem_ctx The memory context for the result
+ * @param[in] ev The tevent_context to run on
+ * @param[in] stream The stream to send data through
+ * @param[in] queue The existing send queue
+ * @param[in] next_vector_fn The next vector function
+ * @param[in] next_vector_private The private_data of the next vector function
+ * @retval The async request handle
+ *
+ * This function queues a blob for sending to destination through an existing
+ * dgram socket. The async callback is triggered when the whole blob is
+ * delivered to the underlying system socket.
+ *
+ * The caller needs to make sure that all non-scalar input parameters hang
+ * arround for the whole lifetime of the request.
+ */
+struct tevent_req *tstream_readv_pdu_queue_send(TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ struct tstream_context *stream,
+ struct tevent_queue *queue,
+ tstream_readv_pdu_next_vector_t next_vector_fn,
+ void *next_vector_private)
+{
+ struct tevent_req *req;
+ struct tstream_readv_pdu_queue_state *state;
+ bool ok;
+
+ req = tevent_req_create(mem_ctx, &state,
+ struct tstream_readv_pdu_queue_state);
+ if (!req) {
+ return NULL;
}
- /* fd should be nonblocking. */
+ state->caller.ev = ev;
+ state->caller.stream = stream;
+ state->caller.next_vector_fn = next_vector_fn;
+ state->caller.next_vector_private = next_vector_private;
+ state->ret = -1;
-#ifdef O_NONBLOCK
-#define FLAG_TO_SET O_NONBLOCK
-#else
-#ifdef SYSV
-#define FLAG_TO_SET O_NDELAY
-#else /* BSD */
-#define FLAG_TO_SET FNDELAY
-#endif
-#endif
+ ok = tevent_queue_add(queue,
+ ev,
+ req,
+ tstream_readv_pdu_queue_trigger,
+ NULL);
+ if (!ok) {
+ tevent_req_nomem(NULL, req);
+ goto post;
+ }
+
+ return req;
+
+ post:
+ return tevent_req_post(req, ev);
+}
+
+static void tstream_readv_pdu_queue_trigger(struct tevent_req *req,
+ void *private_data)
+{
+ struct tstream_readv_pdu_queue_state *state = tevent_req_data(req,
+ struct tstream_readv_pdu_queue_state);
+ struct tevent_req *subreq;
- if ((flags = fcntl(fd, F_GETFL)) == -1) {
- goto fail;
+ subreq = tstream_readv_pdu_send(state,
+ state->caller.ev,
+ state->caller.stream,
+ state->caller.next_vector_fn,
+ state->caller.next_vector_private);
+ if (tevent_req_nomem(subreq, req)) {
+ return;
}
+ tevent_req_set_callback(subreq, tstream_readv_pdu_queue_done ,req);
+}
+
+static void tstream_readv_pdu_queue_done(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(subreq,
+ struct tevent_req);
+ struct tstream_readv_pdu_queue_state *state = tevent_req_data(req,
+ struct tstream_readv_pdu_queue_state);
+ int ret;
+ int sys_errno;
- flags |= FLAG_TO_SET;
- if (fcntl(fd, F_SETFL, flags) == -1) {
- goto fail;
+ ret = tstream_readv_pdu_recv(subreq, &sys_errno);
+ talloc_free(subreq);
+ if (ret == -1) {
+ tevent_req_error(req, sys_errno);
+ return;
}
+ state->ret = ret;
-#undef FLAG_TO_SET
+ tevent_req_done(req);
+}
+
+int tstream_readv_pdu_queue_recv(struct tevent_req *req, int *perrno)
+{
+ struct tstream_readv_pdu_queue_state *state = tevent_req_data(req,
+ struct tstream_readv_pdu_queue_state);
+ int ret;
- /* fd should be closed on exec() */
-#ifdef FD_CLOEXEC
- result = flags = fcntl(fd, F_GETFD, 0);
- if (flags >= 0) {
- flags |= FD_CLOEXEC;
- result = fcntl(fd, F_SETFD, flags);
+ ret = tsocket_simple_int_recv(req, perrno);
+ if (ret == 0) {
+ ret = state->ret;
}
- if (result < 0) {
- goto fail;
+
+ tevent_req_received(req);
+ return ret;
+}
+
+struct tstream_writev_queue_state {
+ /* this structs are owned by the caller */
+ struct {
+ struct tevent_context *ev;
+ struct tstream_context *stream;
+ const struct iovec *vector;
+ size_t count;
+ } caller;
+ int ret;
+};
+
+static void tstream_writev_queue_trigger(struct tevent_req *req,
+ void *private_data);
+static void tstream_writev_queue_done(struct tevent_req *subreq);
+
+/**
+ * @brief Queue a dgram blob for sending through the socket
+ * @param[in] mem_ctx The memory context for the result
+ * @param[in] ev The tevent_context to run on
+ * @param[in] stream The stream to send data through
+ * @param[in] queue The existing send queue
+ * @param[in] vector The iovec vector so write
+ * @param[in] count The size of the vector
+ * @retval The async request handle
+ *
+ * This function queues a blob for sending to destination through an existing
+ * dgram socket. The async callback is triggered when the whole blob is
+ * delivered to the underlying system socket.
+ *
+ * The caller needs to make sure that all non-scalar input parameters hang
+ * arround for the whole lifetime of the request.
+ */
+struct tevent_req *tstream_writev_queue_send(TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ struct tstream_context *stream,
+ struct tevent_queue *queue,
+ const struct iovec *vector,
+ size_t count)
+{
+ struct tevent_req *req;
+ struct tstream_writev_queue_state *state;
+ bool ok;
+
+ req = tevent_req_create(mem_ctx, &state,
+ struct tstream_writev_queue_state);
+ if (!req) {
+ return NULL;
+ }
+
+ state->caller.ev = ev;
+ state->caller.stream = stream;
+ state->caller.vector = vector;
+ state->caller.count = count;
+ state->ret = -1;
+
+ ok = tevent_queue_add(queue,
+ ev,
+ req,
+ tstream_writev_queue_trigger,
+ NULL);
+ if (!ok) {
+ tevent_req_nomem(NULL, req);
+ goto post;
+ }
+
+ return req;
+
+ post:
+ return tevent_req_post(req, ev);
+}
+
+static void tstream_writev_queue_trigger(struct tevent_req *req,
+ void *private_data)
+{
+ struct tstream_writev_queue_state *state = tevent_req_data(req,
+ struct tstream_writev_queue_state);
+ struct tevent_req *subreq;
+
+ subreq = tstream_writev_send(state,
+ state->caller.ev,
+ state->caller.stream,
+ state->caller.vector,
+ state->caller.count);
+ if (tevent_req_nomem(subreq, req)) {
+ return;
}
-#endif
- return fd;
+ tevent_req_set_callback(subreq, tstream_writev_queue_done ,req);
+}
+
+static void tstream_writev_queue_done(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(subreq,
+ struct tevent_req);
+ struct tstream_writev_queue_state *state = tevent_req_data(req,
+ struct tstream_writev_queue_state);
+ int ret;
+ int sys_errno;
- fail:
- if (fd != -1) {
- sys_errno = errno;
- close(fd);
- errno = sys_errno;
+ ret = tstream_writev_recv(subreq, &sys_errno);
+ talloc_free(subreq);
+ if (ret == -1) {
+ tevent_req_error(req, sys_errno);
+ return;
}
- return -1;
+ state->ret = ret;
+
+ tevent_req_done(req);
+}
+
+int tstream_writev_queue_recv(struct tevent_req *req, int *perrno)
+{
+ struct tstream_writev_queue_state *state = tevent_req_data(req,
+ struct tstream_writev_queue_state);
+ int ret;
+
+ ret = tsocket_simple_int_recv(req, perrno);
+ if (ret == 0) {
+ ret = state->ret;
+ }
+
+ tevent_req_received(req);
+ return ret;
}